Merge pull request '[TEST] make the indexer and pull tasks cancellable (without shutdown)' (#3141) from oliverpool/forgejo:cancellable_queues into forgejo
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/3141 Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org>
This commit is contained in:
commit
91617c30ca
2 changed files with 11 additions and 3 deletions
|
@ -120,9 +120,12 @@ func Init() {
|
||||||
case "bleve", "elasticsearch":
|
case "bleve", "elasticsearch":
|
||||||
handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
|
handler := func(items ...*internal.IndexerData) (unhandled []*internal.IndexerData) {
|
||||||
indexer := *globalIndexer.Load()
|
indexer := *globalIndexer.Load()
|
||||||
|
// make it a process to allow for cancellation (especially during integration tests where no global shutdown happens)
|
||||||
|
batchCtx, _, finished := process.GetManager().AddContext(ctx, "CodeIndexer batch")
|
||||||
|
defer finished()
|
||||||
for _, indexerData := range items {
|
for _, indexerData := range items {
|
||||||
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
|
log.Trace("IndexerData Process Repo: %d", indexerData.RepoID)
|
||||||
if err := index(ctx, indexer, indexerData.RepoID); err != nil {
|
if err := index(batchCtx, indexer, indexerData.RepoID); err != nil {
|
||||||
unhandled = append(unhandled, indexerData)
|
unhandled = append(unhandled, indexerData)
|
||||||
if !setting.IsInTesting {
|
if !setting.IsInTesting {
|
||||||
log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
|
log.Error("Codes indexer handler: index error for repo %v: %v", indexerData.RepoID, err)
|
||||||
|
|
|
@ -26,6 +26,7 @@ import (
|
||||||
"code.gitea.io/gitea/modules/graceful"
|
"code.gitea.io/gitea/modules/graceful"
|
||||||
"code.gitea.io/gitea/modules/json"
|
"code.gitea.io/gitea/modules/json"
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
"code.gitea.io/gitea/modules/process"
|
||||||
repo_module "code.gitea.io/gitea/modules/repository"
|
repo_module "code.gitea.io/gitea/modules/repository"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
"code.gitea.io/gitea/modules/sync"
|
"code.gitea.io/gitea/modules/sync"
|
||||||
|
@ -296,8 +297,12 @@ func checkForInvalidation(ctx context.Context, requests issues_model.PullRequest
|
||||||
// AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch,
|
// AddTestPullRequestTask adds new test tasks by given head/base repository and head/base branch,
|
||||||
// and generate new patch for testing as needed.
|
// and generate new patch for testing as needed.
|
||||||
func AddTestPullRequestTask(doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) {
|
func AddTestPullRequestTask(doer *user_model.User, repoID int64, branch string, isSync bool, oldCommitID, newCommitID string) {
|
||||||
log.Trace("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch)
|
description := fmt.Sprintf("AddTestPullRequestTask [head_repo_id: %d, head_branch: %s]: finding pull requests", repoID, branch)
|
||||||
graceful.GetManager().RunWithShutdownContext(func(ctx context.Context) {
|
log.Trace(description)
|
||||||
|
graceful.GetManager().RunWithShutdownContext(func(shutdownCtx context.Context) {
|
||||||
|
// make it a process to allow for cancellation (especially during integration tests where no global shutdown happens)
|
||||||
|
ctx, _, finished := process.GetManager().AddContext(shutdownCtx, description)
|
||||||
|
defer finished()
|
||||||
// There is no sensible way to shut this down ":-("
|
// There is no sensible way to shut this down ":-("
|
||||||
// If you don't let it run all the way then you will lose data
|
// If you don't let it run all the way then you will lose data
|
||||||
// TODO: graceful: AddTestPullRequestTask needs to become a queue!
|
// TODO: graceful: AddTestPullRequestTask needs to become a queue!
|
||||||
|
|
Loading…
Reference in a new issue