Template
1
0
Fork 0
mirror of https://codeberg.org/forgejo/forgejo synced 2024-11-22 09:54:24 +01:00

Fix parallelly generating index failure with Mysql (#24567)

This commit is contained in:
Lunny Xiao 2023-06-05 18:33:47 +08:00 committed by GitHub
parent 3d1fda737b
commit 315124b469
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 11 deletions

View file

@ -71,10 +71,31 @@ func postgresGetNextResourceIndex(ctx context.Context, tableName string, groupID
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64) return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
} }
func mysqlGetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if _, err := GetEngine(ctx).Exec(fmt.Sprintf("INSERT INTO %s (group_id, max_index) "+
"VALUES (?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
tableName), groupID); err != nil {
return 0, err
}
var idx int64
_, err := GetEngine(ctx).SQL(fmt.Sprintf("SELECT max_index FROM %s WHERE group_id = ?", tableName), groupID).Get(&idx)
if err != nil {
return 0, err
}
if idx == 0 {
return 0, errors.New("cannot get the correct index")
}
return idx, nil
}
// GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created // GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created
func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) { func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) {
if setting.Database.Type.IsPostgreSQL() { switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetNextResourceIndex(ctx, tableName, groupID) return postgresGetNextResourceIndex(ctx, tableName, groupID)
case setting.Database.Type.IsMySQL():
return mysqlGetNextResourceIndex(ctx, tableName, groupID)
} }
e := GetEngine(ctx) e := GetEngine(ctx)

View file

@ -64,10 +64,32 @@ func postgresGetCommitStatusIndex(ctx context.Context, repoID int64, sha string)
return strconv.ParseInt(string(res[0]["max_index"]), 10, 64) return strconv.ParseInt(string(res[0]["max_index"]), 10, 64)
} }
func mysqlGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if _, err := db.GetEngine(ctx).Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+
"VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1",
repoID, sha); err != nil {
return 0, err
}
var idx int64
_, err := db.GetEngine(ctx).SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?",
repoID, sha).Get(&idx)
if err != nil {
return 0, err
}
if idx == 0 {
return 0, errors.New("cannot get the correct index")
}
return idx, nil
}
// GetNextCommitStatusIndex retried 3 times to generate a resource index // GetNextCommitStatusIndex retried 3 times to generate a resource index
func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) { func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) {
if setting.Database.Type.IsPostgreSQL() { switch {
case setting.Database.Type.IsPostgreSQL():
return postgresGetCommitStatusIndex(ctx, repoID, sha) return postgresGetCommitStatusIndex(ctx, repoID, sha)
case setting.Database.Type.IsMySQL():
return mysqlGetCommitStatusIndex(ctx, repoID, sha)
} }
e := db.GetEngine(ctx) e := db.GetEngine(ctx)
@ -75,7 +97,7 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
// try to update the max_index to next value, and acquire the write-lock for the record // try to update the max_index to next value, and acquire the write-lock for the record
res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha) res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("update failed: %w", err)
} }
affected, err := res.RowsAffected() affected, err := res.RowsAffected()
if err != nil { if err != nil {
@ -86,18 +108,18 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
_, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha) _, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha)
res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha) res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("update2 failed: %w", err)
} }
affected, err = res.RowsAffected() affected, err = res.RowsAffected()
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("RowsAffected failed: %w", err)
} }
// if the update still can not update any records, the record must not exist and there must be some errors (insert error) // if the update still can not update any records, the record must not exist and there must be some errors (insert error)
if affected == 0 { if affected == 0 {
if errIns == nil { if errIns == nil {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated") return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated")
} }
return 0, errIns return 0, fmt.Errorf("insert failed: %w", errIns)
} }
} }
@ -105,7 +127,7 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in
var newIdx int64 var newIdx int64
has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx) has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx)
if err != nil { if err != nil {
return 0, err return 0, fmt.Errorf("select failed: %w", err)
} }
if !has { if !has {
return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected") return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected")

View file

@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"sync"
"testing" "testing"
"time" "time"
@ -106,6 +108,49 @@ func TestAPICreateIssue(t *testing.T) {
assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues) assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues)
} }
func TestAPICreateIssueParallel(t *testing.T) {
defer tests.PrepareTestEnv(t)()
const body, title = "apiTestBody", "apiTestTitle"
repoBefore := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1})
owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repoBefore.OwnerID})
session := loginUser(t, owner.Name)
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteIssue)
urlStr := fmt.Sprintf("/api/v1/repos/%s/%s/issues?state=all&token=%s", owner.Name, repoBefore.Name, token)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(parentT *testing.T, i int) {
parentT.Run(fmt.Sprintf("ParallelCreateIssue_%d", i), func(t *testing.T) {
newTitle := title + strconv.Itoa(i)
newBody := body + strconv.Itoa(i)
req := NewRequestWithJSON(t, "POST", urlStr, &api.CreateIssueOption{
Body: newBody,
Title: newTitle,
Assignee: owner.Name,
})
resp := MakeRequest(t, req, http.StatusCreated)
var apiIssue api.Issue
DecodeJSON(t, resp, &apiIssue)
assert.Equal(t, newBody, apiIssue.Body)
assert.Equal(t, newTitle, apiIssue.Title)
unittest.AssertExistsAndLoadBean(t, &issues_model.Issue{
RepoID: repoBefore.ID,
AssigneeID: owner.ID,
Content: newBody,
Title: newTitle,
})
wg.Done()
})
}(t, i)
}
wg.Wait()
}
func TestAPIEditIssue(t *testing.T) { func TestAPIEditIssue(t *testing.T) {
defer tests.PrepareTestEnv(t)() defer tests.PrepareTestEnv(t)()

View file

@ -7,7 +7,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os"
"path" "path"
"sync" "sync"
"testing" "testing"
@ -135,9 +134,6 @@ func TestRepoCommitsWithStatusRunning(t *testing.T) {
} }
func TestRepoCommitsStatusParallel(t *testing.T) { func TestRepoCommitsStatusParallel(t *testing.T) {
if os.Getenv("CI") != "" {
t.Skip("Skipping because test is flaky on CI")
}
defer tests.PrepareTestEnv(t)() defer tests.PrepareTestEnv(t)()
session := loginUser(t, "user2") session := loginUser(t, "user2")