Fix data race in bleve indexer (#16474)
* Fix data race in bleve indexer
This commit is contained in:
parent
bb7a7c4292
commit
43262226db
3 changed files with 69 additions and 6 deletions
59
modules/indexer/bleve/batch.go
Normal file
59
modules/indexer/bleve/batch.go
Normal file
|
@ -0,0 +1,59 @@
|
||||||
|
// Copyright 2021 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package bleve
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/blevesearch/bleve/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FlushingBatch is a batch of operations that automatically flushes to the
|
||||||
|
// underlying index once it reaches a certain size.
|
||||||
|
type FlushingBatch struct {
|
||||||
|
maxBatchSize int
|
||||||
|
batch *bleve.Batch
|
||||||
|
index bleve.Index
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewFlushingBatch creates a new flushing batch for the specified index. Once
|
||||||
|
// the number of operations in the batch reaches the specified limit, the batch
|
||||||
|
// automatically flushes its operations to the index.
|
||||||
|
func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
|
||||||
|
return &FlushingBatch{
|
||||||
|
maxBatchSize: maxBatchSize,
|
||||||
|
batch: index.NewBatch(),
|
||||||
|
index: index,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Index add a new index to batch
|
||||||
|
func (b *FlushingBatch) Index(id string, data interface{}) error {
|
||||||
|
if err := b.batch.Index(id, data); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return b.flushIfFull()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete add a delete index to batch
|
||||||
|
func (b *FlushingBatch) Delete(id string) error {
|
||||||
|
b.batch.Delete(id)
|
||||||
|
return b.flushIfFull()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *FlushingBatch) flushIfFull() error {
|
||||||
|
if b.batch.Size() < b.maxBatchSize {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return b.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush submit the batch and create a new one
|
||||||
|
func (b *FlushingBatch) Flush() error {
|
||||||
|
err := b.index.Batch(b.batch)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
b.batch = b.index.NewBatch()
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -18,6 +18,7 @@ import (
|
||||||
"code.gitea.io/gitea/modules/analyze"
|
"code.gitea.io/gitea/modules/analyze"
|
||||||
"code.gitea.io/gitea/modules/charset"
|
"code.gitea.io/gitea/modules/charset"
|
||||||
"code.gitea.io/gitea/modules/git"
|
"code.gitea.io/gitea/modules/git"
|
||||||
|
gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
"code.gitea.io/gitea/modules/timeutil"
|
"code.gitea.io/gitea/modules/timeutil"
|
||||||
|
@ -176,7 +177,8 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
|
||||||
return indexer, created, err
|
return indexer, created, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
|
func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string,
|
||||||
|
update fileUpdate, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
|
||||||
// Ignore vendored files in code search
|
// Ignore vendored files in code search
|
||||||
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
|
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
|
||||||
return nil
|
return nil
|
||||||
|
@ -229,7 +231,7 @@ func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
|
func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
|
||||||
id := filenameIndexerID(repo.ID, filename)
|
id := filenameIndexerID(repo.ID, filename)
|
||||||
return batch.Delete(id)
|
return batch.Delete(id)
|
||||||
}
|
}
|
||||||
|
@ -267,7 +269,7 @@ func (b *BleveIndexer) Close() {
|
||||||
|
|
||||||
// Index indexes the data
|
// Index indexes the data
|
||||||
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
|
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
|
||||||
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
|
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
|
||||||
if len(changes.Updates) > 0 {
|
if len(changes.Updates) > 0 {
|
||||||
|
|
||||||
batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
|
batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
|
||||||
|
@ -296,7 +298,7 @@ func (b *BleveIndexer) Delete(repoID int64) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
|
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
|
||||||
for _, hit := range result.Hits {
|
for _, hit := range result.Hits {
|
||||||
if err = batch.Delete(hit.ID); err != nil {
|
if err = batch.Delete(hit.ID); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -9,8 +9,10 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
|
gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/util"
|
"code.gitea.io/gitea/modules/util"
|
||||||
|
|
||||||
"github.com/blevesearch/bleve/v2"
|
"github.com/blevesearch/bleve/v2"
|
||||||
"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
|
"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
|
||||||
"github.com/blevesearch/bleve/v2/analysis/token/lowercase"
|
"github.com/blevesearch/bleve/v2/analysis/token/lowercase"
|
||||||
|
@ -197,7 +199,7 @@ func (b *BleveIndexer) Close() {
|
||||||
|
|
||||||
// Index will save the index data
|
// Index will save the index data
|
||||||
func (b *BleveIndexer) Index(issues []*IndexerData) error {
|
func (b *BleveIndexer) Index(issues []*IndexerData) error {
|
||||||
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
|
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
|
||||||
for _, issue := range issues {
|
for _, issue := range issues {
|
||||||
if err := batch.Index(indexerID(issue.ID), struct {
|
if err := batch.Index(indexerID(issue.ID), struct {
|
||||||
RepoID int64
|
RepoID int64
|
||||||
|
@ -218,7 +220,7 @@ func (b *BleveIndexer) Index(issues []*IndexerData) error {
|
||||||
|
|
||||||
// Delete deletes indexes by ids
|
// Delete deletes indexes by ids
|
||||||
func (b *BleveIndexer) Delete(ids ...int64) error {
|
func (b *BleveIndexer) Delete(ids ...int64) error {
|
||||||
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
|
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
if err := batch.Delete(indexerID(id)); err != nil {
|
if err := batch.Delete(indexerID(id)); err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
Loading…
Reference in a new issue