forgejo-federation/modules/indexer/issues/indexer.go
Jason Song beb71f5ef6
Include public repos in doer's dashboard for issue search (#28304)
It will fix #28268 .

<img width="1313" alt="image"
src="https://github.com/go-gitea/gitea/assets/9418365/cb1e07d5-7a12-4691-a054-8278ba255bfc">

<img width="1318" alt="image"
src="https://github.com/go-gitea/gitea/assets/9418365/4fd60820-97f1-4c2c-a233-d3671a5039e9">

## ⚠️ BREAKING ⚠️

But need to give up some features:

<img width="1312" alt="image"
src="https://github.com/go-gitea/gitea/assets/9418365/281c0d51-0e7d-473f-bbed-216e2f645610">

However, such abandonment may fix #28055 .

## Backgroud

When the user switches the dashboard context to an org, it means they
want to search issues in the repos that belong to the org. However, when
they switch to themselves, it means all repos they can access because
they may have created an issue in a public repo that they don't own.

<img width="286" alt="image"
src="https://github.com/go-gitea/gitea/assets/9418365/182dcd5b-1c20-4725-93af-96e8dfae5b97">

It's a confusing design. Think about this: What does "In your
repositories" mean when the user switches to an org? Repos belong to the
user or the org?

Whatever, it has been broken by #26012 and its following PRs. After the
PR, it searches for issues in repos that the dashboard context user owns
or has been explicitly granted access to, so it causes #28268.

## How to fix it

It's not really difficult to fix it. Just extend the repo scope to
search issues when the dashboard context user is the doer. Since the
user may create issues or be mentioned in any public repo, we can just
set `AllPublic` to true, which is already supported by indexers. The DB
condition will also support it in this PR.

But the real difficulty is how to count the search results grouped by
repos. It's something like "search issues with this keyword and those
filters, and return the total number and the top results. **Then, group
all of them by repo and return the counts of each group.**"

<img width="314" alt="image"
src="https://github.com/go-gitea/gitea/assets/9418365/5206eb20-f8f5-49b9-b45a-1be2fcf679f4">

Before #26012, it was being done in the DB, but it caused the results to
be incomplete (see the description of #26012).

And to keep this, #26012 implement it in an inefficient way, just count
the issues by repo one by one, so it cannot work when `AllPublic` is
true because it's almost impossible to do this for all public repos.


1bfcdeef4c/modules/indexer/issues/indexer.go (L318-L338)

## Give up unnecessary features

We may can resovle `TODO: use "group by" of the indexer engines to
implement it`, I'm sure it can be done with Elasticsearch, but IIRC,
Bleve and Meilisearch don't support "group by".

And the real question is, does it worth it? Why should we need to know
the counts grouped by repos?

Let me show you my search dashboard on gitea.com.

<img width="1304" alt="image"
src="https://github.com/go-gitea/gitea/assets/9418365/2bca2d46-6c71-4de1-94cb-0c9af27c62ff">

I never think the long repo list helps anything.

And if we agree to abandon it, things will be much easier. That is this
PR.

## TODO

I know it's important to filter by repos when searching issues. However,
it shouldn't be the way we have it now. It could be implemented like
this.

<img width="1316" alt="image"
src="https://github.com/go-gitea/gitea/assets/9418365/99ee5f21-cbb5-4dfe-914d-cb796cb79fbe">

The indexers support it well now, but it requires some frontend work,
which I'm not good at. So, I think someone could help do that in another
PR and merge this one to fix the bug first.

Or please block this PR and help to complete it.

Finally, "Switch dashboard context" is also a design that needs
improvement. In my opinion, it can be accomplished by adding filtering
conditions instead of "switching".
2023-12-07 13:26:18 +08:00

315 lines
11 KiB
Go

// Copyright 2018 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package issues
import (
"context"
"fmt"
"os"
"runtime/pprof"
"sync/atomic"
"time"
db_model "code.gitea.io/gitea/models/db"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/indexer/issues/bleve"
"code.gitea.io/gitea/modules/indexer/issues/db"
"code.gitea.io/gitea/modules/indexer/issues/elasticsearch"
"code.gitea.io/gitea/modules/indexer/issues/internal"
"code.gitea.io/gitea/modules/indexer/issues/meilisearch"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/process"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/util"
)
// IndexerMetadata is used to send data to the queue, so it contains only the ids.
// It may look weired, because it has to be compatible with the old queue data format.
// If the IsDelete flag is true, the IDs specify the issues to delete from the index without querying the database.
// If the IsDelete flag is false, the ID specify the issue to index, so Indexer will query the database to get the issue data.
// It should be noted that if the id is not existing in the database, it's index will be deleted too even if IsDelete is false.
// Valid values:
// - IsDelete = true, IDs = [1, 2, 3], and ID will be ignored
// - IsDelete = false, ID = 1, and IDs will be ignored
type IndexerMetadata struct {
ID int64 `json:"id"`
IsDelete bool `json:"is_delete"`
IDs []int64 `json:"ids"`
}
var (
// issueIndexerQueue queue of issue ids to be updated
issueIndexerQueue *queue.WorkerPoolQueue[*IndexerMetadata]
// globalIndexer is the global indexer, it cannot be nil.
// When the real indexer is not ready, it will be a dummy indexer which will return error to explain it's not ready.
// So it's always safe use it as *globalIndexer.Load() and call its methods.
globalIndexer atomic.Pointer[internal.Indexer]
dummyIndexer *internal.Indexer
)
func init() {
i := internal.NewDummyIndexer()
dummyIndexer = &i
globalIndexer.Store(dummyIndexer)
}
// InitIssueIndexer initialize issue indexer, syncReindex is true then reindex until
// all issue index done.
func InitIssueIndexer(syncReindex bool) {
ctx, _, finished := process.GetManager().AddTypedContext(context.Background(), "Service: IssueIndexer", process.SystemProcessType, false)
indexerInitWaitChannel := make(chan time.Duration, 1)
// Create the Queue
issueIndexerQueue = queue.CreateUniqueQueue(ctx, "issue_indexer", getIssueIndexerQueueHandler(ctx))
graceful.GetManager().RunAtTerminate(finished)
// Create the Indexer
go func() {
pprof.SetGoroutineLabels(ctx)
start := time.Now()
log.Info("PID %d: Initializing Issue Indexer: %s", os.Getpid(), setting.Indexer.IssueType)
var (
issueIndexer internal.Indexer
existed bool
err error
)
switch setting.Indexer.IssueType {
case "bleve":
defer func() {
if err := recover(); err != nil {
log.Error("PANIC whilst initializing issue indexer: %v\nStacktrace: %s", err, log.Stack(2))
log.Error("The indexer files are likely corrupted and may need to be deleted")
log.Error("You can completely remove the %q directory to make Gitea recreate the indexes", setting.Indexer.IssuePath)
globalIndexer.Store(dummyIndexer)
log.Fatal("PID: %d Unable to initialize the Bleve Issue Indexer at path: %s Error: %v", os.Getpid(), setting.Indexer.IssuePath, err)
}
}()
issueIndexer = bleve.NewIndexer(setting.Indexer.IssuePath)
existed, err = issueIndexer.Init(ctx)
if err != nil {
log.Fatal("Unable to initialize Bleve Issue Indexer at path: %s Error: %v", setting.Indexer.IssuePath, err)
}
case "elasticsearch":
issueIndexer = elasticsearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName)
existed, err = issueIndexer.Init(ctx)
if err != nil {
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
}
case "db":
issueIndexer = db.NewIndexer()
case "meilisearch":
issueIndexer = meilisearch.NewIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueConnAuth, setting.Indexer.IssueIndexerName)
existed, err = issueIndexer.Init(ctx)
if err != nil {
log.Fatal("Unable to issueIndexer.Init with connection %s Error: %v", setting.Indexer.IssueConnStr, err)
}
default:
log.Fatal("Unknown issue indexer type: %s", setting.Indexer.IssueType)
}
globalIndexer.Store(&issueIndexer)
graceful.GetManager().RunAtTerminate(func() {
log.Debug("Closing issue indexer")
(*globalIndexer.Load()).Close()
log.Info("PID: %d Issue Indexer closed", os.Getpid())
})
// Start processing the queue
go graceful.GetManager().RunWithCancel(issueIndexerQueue)
// Populate the index
if !existed {
if syncReindex {
graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
} else {
go graceful.GetManager().RunWithShutdownContext(populateIssueIndexer)
}
}
indexerInitWaitChannel <- time.Since(start)
close(indexerInitWaitChannel)
}()
if syncReindex {
select {
case <-indexerInitWaitChannel:
case <-graceful.GetManager().IsShutdown():
}
} else if setting.Indexer.StartupTimeout > 0 {
go func() {
pprof.SetGoroutineLabels(ctx)
timeout := setting.Indexer.StartupTimeout
if graceful.GetManager().IsChild() && setting.GracefulHammerTime > 0 {
timeout += setting.GracefulHammerTime
}
select {
case duration := <-indexerInitWaitChannel:
log.Info("Issue Indexer Initialization took %v", duration)
case <-graceful.GetManager().IsShutdown():
log.Warn("Shutdown occurred before issue index initialisation was complete")
case <-time.After(timeout):
issueIndexerQueue.ShutdownWait(5 * time.Second)
log.Fatal("Issue Indexer Initialization timed-out after: %v", timeout)
}
}()
}
}
func getIssueIndexerQueueHandler(ctx context.Context) func(items ...*IndexerMetadata) []*IndexerMetadata {
return func(items ...*IndexerMetadata) []*IndexerMetadata {
var unhandled []*IndexerMetadata
indexer := *globalIndexer.Load()
for _, item := range items {
log.Trace("IndexerMetadata Process: %d %v %t", item.ID, item.IDs, item.IsDelete)
if item.IsDelete {
if err := indexer.Delete(ctx, item.IDs...); err != nil {
log.Error("Issue indexer handler: failed to from index: %v Error: %v", item.IDs, err)
unhandled = append(unhandled, item)
}
continue
}
data, existed, err := getIssueIndexerData(ctx, item.ID)
if err != nil {
log.Error("Issue indexer handler: failed to get issue data of %d: %v", item.ID, err)
unhandled = append(unhandled, item)
continue
}
if !existed {
if err := indexer.Delete(ctx, item.ID); err != nil {
log.Error("Issue indexer handler: failed to delete issue %d from index: %v", item.ID, err)
unhandled = append(unhandled, item)
}
continue
}
if err := indexer.Index(ctx, data); err != nil {
log.Error("Issue indexer handler: failed to index issue %d: %v", item.ID, err)
unhandled = append(unhandled, item)
continue
}
}
return unhandled
}
}
// populateIssueIndexer populate the issue indexer with issue data
func populateIssueIndexer(ctx context.Context) {
ctx, _, finished := process.GetManager().AddTypedContext(ctx, "Service: PopulateIssueIndexer", process.SystemProcessType, true)
defer finished()
ctx = contextWithKeepRetry(ctx) // keep retrying since it's a background task
if err := PopulateIssueIndexer(ctx); err != nil {
log.Error("Issue indexer population failed: %v", err)
}
}
func PopulateIssueIndexer(ctx context.Context) error {
for page := 1; ; page++ {
select {
case <-ctx.Done():
return fmt.Errorf("shutdown before completion: %w", ctx.Err())
default:
}
repos, _, err := repo_model.SearchRepositoryByName(ctx, &repo_model.SearchRepoOptions{
ListOptions: db_model.ListOptions{Page: page, PageSize: repo_model.RepositoryListDefaultPageSize},
OrderBy: db_model.SearchOrderByID,
Private: true,
Collaborate: util.OptionalBoolFalse,
})
if err != nil {
log.Error("SearchRepositoryByName: %v", err)
continue
}
if len(repos) == 0 {
log.Debug("Issue Indexer population complete")
return nil
}
for _, repo := range repos {
if err := updateRepoIndexer(ctx, repo.ID); err != nil {
return fmt.Errorf("populate issue indexer for repo %d: %v", repo.ID, err)
}
}
}
}
// UpdateRepoIndexer add/update all issues of the repositories
func UpdateRepoIndexer(ctx context.Context, repoID int64) {
if err := updateRepoIndexer(ctx, repoID); err != nil {
log.Error("Unable to push repo %d to issue indexer: %v", repoID, err)
}
}
// UpdateIssueIndexer add/update an issue to the issue indexer
func UpdateIssueIndexer(ctx context.Context, issueID int64) {
if err := updateIssueIndexer(ctx, issueID); err != nil {
log.Error("Unable to push issue %d to issue indexer: %v", issueID, err)
}
}
// DeleteRepoIssueIndexer deletes repo's all issues indexes
func DeleteRepoIssueIndexer(ctx context.Context, repoID int64) {
if err := deleteRepoIssueIndexer(ctx, repoID); err != nil {
log.Error("Unable to push deleted repo %d to issue indexer: %v", repoID, err)
}
}
// IsAvailable checks if issue indexer is available
func IsAvailable(ctx context.Context) bool {
return (*globalIndexer.Load()).Ping(ctx) == nil
}
// SearchOptions indicates the options for searching issues
type SearchOptions = internal.SearchOptions
const (
SortByCreatedDesc = internal.SortByCreatedDesc
SortByUpdatedDesc = internal.SortByUpdatedDesc
SortByCommentsDesc = internal.SortByCommentsDesc
SortByDeadlineDesc = internal.SortByDeadlineDesc
SortByCreatedAsc = internal.SortByCreatedAsc
SortByUpdatedAsc = internal.SortByUpdatedAsc
SortByCommentsAsc = internal.SortByCommentsAsc
SortByDeadlineAsc = internal.SortByDeadlineAsc
)
// SearchIssues search issues by options.
func SearchIssues(ctx context.Context, opts *SearchOptions) ([]int64, int64, error) {
indexer := *globalIndexer.Load()
if opts.Keyword == "" {
// This is a conservative shortcut.
// If the keyword is empty, db has better (at least not worse) performance to filter issues.
// When the keyword is empty, it tends to listing rather than searching issues.
// So if the user creates an issue and list issues immediately, the issue may not be listed because the indexer needs time to index the issue.
// Even worse, the external indexer like elastic search may not be available for a while,
// and the user may not be able to list issues completely until it is available again.
indexer = db.NewIndexer()
}
result, err := indexer.Search(ctx, opts)
if err != nil {
return nil, 0, err
}
ret := make([]int64, 0, len(result.Hits))
for _, hit := range result.Hits {
ret = append(ret, hit.ID)
}
return ret, result.Total, nil
}
// CountIssues counts issues by options. It is a shortcut of SearchIssues(ctx, opts) but only returns the total count.
func CountIssues(ctx context.Context, opts *SearchOptions) (int64, error) {
opts = opts.Copy(func(options *SearchOptions) { opts.Paginator = &db_model.ListOptions{PageSize: 0} })
_, total, err := SearchIssues(ctx, opts)
return total, err
}