From 315124b4693dd56d94d52cda3015611184a49a27 Mon Sep 17 00:00:00 2001 From: Lunny Xiao Date: Mon, 5 Jun 2023 18:33:47 +0800 Subject: [PATCH] Fix parallelly generating index failure with Mysql (#24567) --- models/db/index.go | 23 ++++++++++++- models/git/commit_status.go | 34 +++++++++++++++---- tests/integration/api_issue_test.go | 45 ++++++++++++++++++++++++++ tests/integration/repo_commits_test.go | 4 --- 4 files changed, 95 insertions(+), 11 deletions(-) diff --git a/models/db/index.go b/models/db/index.go index 7609d8fb6..259ddd6ad 100644 --- a/models/db/index.go +++ b/models/db/index.go @@ -71,10 +71,31 @@ func postgresGetNextResourceIndex(ctx context.Context, tableName string, groupID return strconv.ParseInt(string(res[0]["max_index"]), 10, 64) } +func mysqlGetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) { + if _, err := GetEngine(ctx).Exec(fmt.Sprintf("INSERT INTO %s (group_id, max_index) "+ + "VALUES (?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1", + tableName), groupID); err != nil { + return 0, err + } + + var idx int64 + _, err := GetEngine(ctx).SQL(fmt.Sprintf("SELECT max_index FROM %s WHERE group_id = ?", tableName), groupID).Get(&idx) + if err != nil { + return 0, err + } + if idx == 0 { + return 0, errors.New("cannot get the correct index") + } + return idx, nil +} + // GetNextResourceIndex generates a resource index, it must run in the same transaction where the resource is created func GetNextResourceIndex(ctx context.Context, tableName string, groupID int64) (int64, error) { - if setting.Database.Type.IsPostgreSQL() { + switch { + case setting.Database.Type.IsPostgreSQL(): return postgresGetNextResourceIndex(ctx, tableName, groupID) + case setting.Database.Type.IsMySQL(): + return mysqlGetNextResourceIndex(ctx, tableName, groupID) } e := GetEngine(ctx) diff --git a/models/git/commit_status.go b/models/git/commit_status.go index 6028e4664..a018bb055 100644 --- a/models/git/commit_status.go +++ b/models/git/commit_status.go @@ -64,10 +64,32 @@ func postgresGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) return strconv.ParseInt(string(res[0]["max_index"]), 10, 64) } +func mysqlGetCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) { + if _, err := db.GetEngine(ctx).Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) "+ + "VALUES (?,?,1) ON DUPLICATE KEY UPDATE max_index = max_index+1", + repoID, sha); err != nil { + return 0, err + } + + var idx int64 + _, err := db.GetEngine(ctx).SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id = ? AND sha = ?", + repoID, sha).Get(&idx) + if err != nil { + return 0, err + } + if idx == 0 { + return 0, errors.New("cannot get the correct index") + } + return idx, nil +} + // GetNextCommitStatusIndex retried 3 times to generate a resource index func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (int64, error) { - if setting.Database.Type.IsPostgreSQL() { + switch { + case setting.Database.Type.IsPostgreSQL(): return postgresGetCommitStatusIndex(ctx, repoID, sha) + case setting.Database.Type.IsMySQL(): + return mysqlGetCommitStatusIndex(ctx, repoID, sha) } e := db.GetEngine(ctx) @@ -75,7 +97,7 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in // try to update the max_index to next value, and acquire the write-lock for the record res, err := e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha) if err != nil { - return 0, err + return 0, fmt.Errorf("update failed: %w", err) } affected, err := res.RowsAffected() if err != nil { @@ -86,18 +108,18 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in _, errIns := e.Exec("INSERT INTO `commit_status_index` (repo_id, sha, max_index) VALUES (?, ?, 0)", repoID, sha) res, err = e.Exec("UPDATE `commit_status_index` SET max_index=max_index+1 WHERE repo_id=? AND sha=?", repoID, sha) if err != nil { - return 0, err + return 0, fmt.Errorf("update2 failed: %w", err) } affected, err = res.RowsAffected() if err != nil { - return 0, err + return 0, fmt.Errorf("RowsAffected failed: %w", err) } // if the update still can not update any records, the record must not exist and there must be some errors (insert error) if affected == 0 { if errIns == nil { return 0, errors.New("impossible error when GetNextCommitStatusIndex, insert and update both succeeded but no record is updated") } - return 0, errIns + return 0, fmt.Errorf("insert failed: %w", errIns) } } @@ -105,7 +127,7 @@ func GetNextCommitStatusIndex(ctx context.Context, repoID int64, sha string) (in var newIdx int64 has, err := e.SQL("SELECT max_index FROM `commit_status_index` WHERE repo_id=? AND sha=?", repoID, sha).Get(&newIdx) if err != nil { - return 0, err + return 0, fmt.Errorf("select failed: %w", err) } if !has { return 0, errors.New("impossible error when GetNextCommitStatusIndex, upsert succeeded but no record can be selected") diff --git a/tests/integration/api_issue_test.go b/tests/integration/api_issue_test.go index 324f5ddbe..8b02342d8 100644 --- a/tests/integration/api_issue_test.go +++ b/tests/integration/api_issue_test.go @@ -7,6 +7,8 @@ import ( "fmt" "net/http" "net/url" + "strconv" + "sync" "testing" "time" @@ -106,6 +108,49 @@ func TestAPICreateIssue(t *testing.T) { assert.Equal(t, repoBefore.NumClosedIssues, repoAfter.NumClosedIssues) } +func TestAPICreateIssueParallel(t *testing.T) { + defer tests.PrepareTestEnv(t)() + const body, title = "apiTestBody", "apiTestTitle" + + repoBefore := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: 1}) + owner := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: repoBefore.OwnerID}) + + session := loginUser(t, owner.Name) + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteIssue) + urlStr := fmt.Sprintf("/api/v1/repos/%s/%s/issues?state=all&token=%s", owner.Name, repoBefore.Name, token) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(parentT *testing.T, i int) { + parentT.Run(fmt.Sprintf("ParallelCreateIssue_%d", i), func(t *testing.T) { + newTitle := title + strconv.Itoa(i) + newBody := body + strconv.Itoa(i) + req := NewRequestWithJSON(t, "POST", urlStr, &api.CreateIssueOption{ + Body: newBody, + Title: newTitle, + Assignee: owner.Name, + }) + resp := MakeRequest(t, req, http.StatusCreated) + var apiIssue api.Issue + DecodeJSON(t, resp, &apiIssue) + assert.Equal(t, newBody, apiIssue.Body) + assert.Equal(t, newTitle, apiIssue.Title) + + unittest.AssertExistsAndLoadBean(t, &issues_model.Issue{ + RepoID: repoBefore.ID, + AssigneeID: owner.ID, + Content: newBody, + Title: newTitle, + }) + + wg.Done() + }) + }(t, i) + } + wg.Wait() +} + func TestAPIEditIssue(t *testing.T) { defer tests.PrepareTestEnv(t)() diff --git a/tests/integration/repo_commits_test.go b/tests/integration/repo_commits_test.go index 99927f192..5f580a026 100644 --- a/tests/integration/repo_commits_test.go +++ b/tests/integration/repo_commits_test.go @@ -7,7 +7,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "os" "path" "sync" "testing" @@ -135,9 +134,6 @@ func TestRepoCommitsWithStatusRunning(t *testing.T) { } func TestRepoCommitsStatusParallel(t *testing.T) { - if os.Getenv("CI") != "" { - t.Skip("Skipping because test is flaky on CI") - } defer tests.PrepareTestEnv(t)() session := loginUser(t, "user2")