Merge db.Iterate and IterateObjects (#21641)
These two functions are similiar, merge them.
This commit is contained in:
parent
4ae3f76217
commit
9a70a12a34
12 changed files with 77 additions and 51 deletions
|
@ -83,35 +83,35 @@ var CmdMigrateStorage = cli.Command{
|
||||||
}
|
}
|
||||||
|
|
||||||
func migrateAttachments(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
func migrateAttachments(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
||||||
return db.IterateObjects(ctx, func(attach *repo_model.Attachment) error {
|
return db.Iterate(ctx, nil, func(ctx context.Context, attach *repo_model.Attachment) error {
|
||||||
_, err := storage.Copy(dstStorage, attach.RelativePath(), storage.Attachments, attach.RelativePath())
|
_, err := storage.Copy(dstStorage, attach.RelativePath(), storage.Attachments, attach.RelativePath())
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func migrateLFS(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
func migrateLFS(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
||||||
return db.IterateObjects(ctx, func(mo *git_model.LFSMetaObject) error {
|
return db.Iterate(ctx, nil, func(ctx context.Context, mo *git_model.LFSMetaObject) error {
|
||||||
_, err := storage.Copy(dstStorage, mo.RelativePath(), storage.LFS, mo.RelativePath())
|
_, err := storage.Copy(dstStorage, mo.RelativePath(), storage.LFS, mo.RelativePath())
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func migrateAvatars(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
func migrateAvatars(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
||||||
return db.IterateObjects(ctx, func(user *user_model.User) error {
|
return db.Iterate(ctx, nil, func(ctx context.Context, user *user_model.User) error {
|
||||||
_, err := storage.Copy(dstStorage, user.CustomAvatarRelativePath(), storage.Avatars, user.CustomAvatarRelativePath())
|
_, err := storage.Copy(dstStorage, user.CustomAvatarRelativePath(), storage.Avatars, user.CustomAvatarRelativePath())
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func migrateRepoAvatars(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
func migrateRepoAvatars(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
||||||
return db.IterateObjects(ctx, func(repo *repo_model.Repository) error {
|
return db.Iterate(ctx, nil, func(ctx context.Context, repo *repo_model.Repository) error {
|
||||||
_, err := storage.Copy(dstStorage, repo.CustomAvatarRelativePath(), storage.RepoAvatars, repo.CustomAvatarRelativePath())
|
_, err := storage.Copy(dstStorage, repo.CustomAvatarRelativePath(), storage.RepoAvatars, repo.CustomAvatarRelativePath())
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func migrateRepoArchivers(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
func migrateRepoArchivers(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
||||||
return db.IterateObjects(ctx, func(archiver *repo_model.RepoArchiver) error {
|
return db.Iterate(ctx, nil, func(ctx context.Context, archiver *repo_model.RepoArchiver) error {
|
||||||
p := archiver.RelativePath()
|
p := archiver.RelativePath()
|
||||||
_, err := storage.Copy(dstStorage, p, storage.RepoArchives, p)
|
_, err := storage.Copy(dstStorage, p, storage.RepoArchives, p)
|
||||||
return err
|
return err
|
||||||
|
@ -119,7 +119,7 @@ func migrateRepoArchivers(ctx context.Context, dstStorage storage.ObjectStorage)
|
||||||
}
|
}
|
||||||
|
|
||||||
func migratePackages(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
func migratePackages(ctx context.Context, dstStorage storage.ObjectStorage) error {
|
||||||
return db.IterateObjects(ctx, func(pb *packages_model.PackageBlob) error {
|
return db.Iterate(ctx, nil, func(ctx context.Context, pb *packages_model.PackageBlob) error {
|
||||||
p := packages_module.KeyToRelativePath(packages_module.BlobHash256Key(pb.HashSHA256))
|
p := packages_module.KeyToRelativePath(packages_module.BlobHash256Key(pb.HashSHA256))
|
||||||
_, err := storage.Copy(dstStorage, p, storage.Packages, p)
|
_, err := storage.Copy(dstStorage, p, storage.Packages, p)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -8,9 +8,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
|
||||||
"code.gitea.io/gitea/modules/setting"
|
|
||||||
|
|
||||||
"xorm.io/builder"
|
|
||||||
"xorm.io/xorm/schemas"
|
"xorm.io/xorm/schemas"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -121,13 +118,6 @@ func WithTx(f func(ctx context.Context) error, stdCtx ...context.Context) error
|
||||||
return sess.Commit()
|
return sess.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate iterates the databases and doing something
|
|
||||||
func Iterate(ctx context.Context, tableBean interface{}, cond builder.Cond, fun func(idx int, bean interface{}) error) error {
|
|
||||||
return GetEngine(ctx).Where(cond).
|
|
||||||
BufferSize(setting.Database.IterateBufferSize).
|
|
||||||
Iterate(tableBean, fun)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Insert inserts records into database
|
// Insert inserts records into database
|
||||||
func Insert(ctx context.Context, beans ...interface{}) error {
|
func Insert(ctx context.Context, beans ...interface{}) error {
|
||||||
_, err := GetEngine(ctx).Insert(beans...)
|
_, err := GetEngine(ctx).Insert(beans...)
|
||||||
|
|
|
@ -8,25 +8,30 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
|
|
||||||
|
"xorm.io/builder"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IterateObjects iterate all the Bean object
|
// Iterate iterate all the Bean object
|
||||||
func IterateObjects[Object any](ctx context.Context, f func(repo *Object) error) error {
|
func Iterate[Bean any](ctx context.Context, cond builder.Cond, f func(ctx context.Context, bean *Bean) error) error {
|
||||||
var start int
|
var start int
|
||||||
batchSize := setting.Database.IterateBufferSize
|
batchSize := setting.Database.IterateBufferSize
|
||||||
sess := GetEngine(ctx)
|
sess := GetEngine(ctx)
|
||||||
for {
|
for {
|
||||||
repos := make([]*Object, 0, batchSize)
|
beans := make([]*Bean, 0, batchSize)
|
||||||
if err := sess.Limit(batchSize, start).Find(&repos); err != nil {
|
if cond != nil {
|
||||||
|
sess = sess.Where(cond)
|
||||||
|
}
|
||||||
|
if err := sess.Limit(batchSize, start).Find(&beans); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if len(repos) == 0 {
|
if len(beans) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
start += len(repos)
|
start += len(beans)
|
||||||
|
|
||||||
for _, repo := range repos {
|
for _, bean := range beans {
|
||||||
if err := f(repo); err != nil {
|
if err := f(ctx, bean); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
44
models/db/iterate_test.go
Normal file
44
models/db/iterate_test.go
Normal file
|
@ -0,0 +1,44 @@
|
||||||
|
// Copyright 2022 The Gitea Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT-style
|
||||||
|
// license that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package db_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/models/db"
|
||||||
|
repo_model "code.gitea.io/gitea/models/repo"
|
||||||
|
"code.gitea.io/gitea/models/unittest"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestIterate(t *testing.T) {
|
||||||
|
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||||
|
xe := unittest.GetXORMEngine()
|
||||||
|
assert.NoError(t, xe.Sync(&repo_model.RepoUnit{}))
|
||||||
|
|
||||||
|
var repoCnt int
|
||||||
|
err := db.Iterate(db.DefaultContext, nil, func(ctx context.Context, repo *repo_model.RepoUnit) error {
|
||||||
|
repoCnt++
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.EqualValues(t, 79, repoCnt)
|
||||||
|
|
||||||
|
err = db.Iterate(db.DefaultContext, nil, func(ctx context.Context, repoUnit *repo_model.RepoUnit) error {
|
||||||
|
reopUnit2 := repo_model.RepoUnit{ID: repoUnit.ID}
|
||||||
|
has, err := db.GetByBean(ctx, &reopUnit2)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
} else if !has {
|
||||||
|
return db.ErrNotExist{Resource: "repo_unit", ID: repoUnit.ID}
|
||||||
|
}
|
||||||
|
assert.EqualValues(t, repoUnit.RepoID, repoUnit.RepoID)
|
||||||
|
assert.EqualValues(t, repoUnit.CreatedUnix, repoUnit.CreatedUnix)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
|
@ -18,10 +18,9 @@ import (
|
||||||
func iterateUserAccounts(ctx context.Context, each func(*user.User) error) error {
|
func iterateUserAccounts(ctx context.Context, each func(*user.User) error) error {
|
||||||
err := db.Iterate(
|
err := db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(user.User),
|
|
||||||
builder.Gt{"id": 0},
|
builder.Gt{"id": 0},
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, bean *user.User) error {
|
||||||
return each(bean.(*user.User))
|
return each(bean)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -269,13 +269,10 @@ func fixBrokenRepoUnits16961(ctx context.Context, logger log.Logger, autofix boo
|
||||||
|
|
||||||
err := db.Iterate(
|
err := db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(RepoUnit),
|
|
||||||
builder.Gt{
|
builder.Gt{
|
||||||
"id": 0,
|
"id": 0,
|
||||||
},
|
},
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, unit *RepoUnit) error {
|
||||||
unit := bean.(*RepoUnit)
|
|
||||||
|
|
||||||
bs := unit.Config
|
bs := unit.Config
|
||||||
repoUnit := &repo_model.RepoUnit{
|
repoUnit := &repo_model.RepoUnit{
|
||||||
ID: unit.ID,
|
ID: unit.ID,
|
||||||
|
|
|
@ -21,10 +21,9 @@ import (
|
||||||
func iteratePRs(ctx context.Context, repo *repo_model.Repository, each func(*repo_model.Repository, *issues_model.PullRequest) error) error {
|
func iteratePRs(ctx context.Context, repo *repo_model.Repository, each func(*repo_model.Repository, *issues_model.PullRequest) error) error {
|
||||||
return db.Iterate(
|
return db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(issues_model.PullRequest),
|
|
||||||
builder.Eq{"base_repo_id": repo.ID},
|
builder.Eq{"base_repo_id": repo.ID},
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, bean *issues_model.PullRequest) error {
|
||||||
return each(repo, bean.(*issues_model.PullRequest))
|
return each(repo, bean)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,10 +30,9 @@ import (
|
||||||
func iterateRepositories(ctx context.Context, each func(*repo_model.Repository) error) error {
|
func iterateRepositories(ctx context.Context, each func(*repo_model.Repository) error) error {
|
||||||
err := db.Iterate(
|
err := db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(repo_model.Repository),
|
|
||||||
builder.Gt{"id": 0},
|
builder.Gt{"id": 0},
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, bean *repo_model.Repository) error {
|
||||||
return each(bean.(*repo_model.Repository))
|
return each(bean)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -5,6 +5,7 @@
|
||||||
package private
|
package private
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
stdCtx "context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -60,7 +61,7 @@ func SendEmail(ctx *context.PrivateContext) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
err := db.IterateObjects(ctx, func(user *user_model.User) error {
|
err := db.Iterate(ctx, nil, func(ctx stdCtx.Context, user *user_model.User) error {
|
||||||
if len(user.Email) > 0 && user.IsActive {
|
if len(user.Email) > 0 && user.IsActive {
|
||||||
emails = append(emails, user.Email)
|
emails = append(emails, user.Email)
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,7 +96,7 @@ func DeleteAvatar(repo *repo_model.Repository) error {
|
||||||
|
|
||||||
// RemoveRandomAvatars removes the randomly generated avatars that were created for repositories
|
// RemoveRandomAvatars removes the randomly generated avatars that were created for repositories
|
||||||
func RemoveRandomAvatars(ctx context.Context) error {
|
func RemoveRandomAvatars(ctx context.Context) error {
|
||||||
return db.IterateObjects(ctx, func(repository *repo_model.Repository) error {
|
return db.Iterate(ctx, nil, func(ctx context.Context, repository *repo_model.Repository) error {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return db.ErrCancelledf("before random avatars removed for %s", repository.FullName())
|
return db.ErrCancelledf("before random avatars removed for %s", repository.FullName())
|
||||||
|
|
|
@ -29,10 +29,8 @@ func GitFsck(ctx context.Context, timeout time.Duration, args []git.CmdArg) erro
|
||||||
|
|
||||||
if err := db.Iterate(
|
if err := db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(repo_model.Repository),
|
|
||||||
builder.Expr("id>0 AND is_fsck_enabled=?", true),
|
builder.Expr("id>0 AND is_fsck_enabled=?", true),
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, repo *repo_model.Repository) error {
|
||||||
repo := bean.(*repo_model.Repository)
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return db.ErrCancelledf("before fsck of %s", repo.FullName())
|
return db.ErrCancelledf("before fsck of %s", repo.FullName())
|
||||||
|
@ -64,10 +62,8 @@ func GitGcRepos(ctx context.Context, timeout time.Duration, args ...git.CmdArg)
|
||||||
|
|
||||||
if err := db.Iterate(
|
if err := db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(repo_model.Repository),
|
|
||||||
builder.Gt{"id": 0},
|
builder.Gt{"id": 0},
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, repo *repo_model.Repository) error {
|
||||||
repo := bean.(*repo_model.Repository)
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return db.ErrCancelledf("before GC of %s", repo.FullName())
|
return db.ErrCancelledf("before GC of %s", repo.FullName())
|
||||||
|
@ -113,10 +109,8 @@ func gatherMissingRepoRecords(ctx context.Context) ([]*repo_model.Repository, er
|
||||||
repos := make([]*repo_model.Repository, 0, 10)
|
repos := make([]*repo_model.Repository, 0, 10)
|
||||||
if err := db.Iterate(
|
if err := db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(repo_model.Repository),
|
|
||||||
builder.Gt{"id": 0},
|
builder.Gt{"id": 0},
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, repo *repo_model.Repository) error {
|
||||||
repo := bean.(*repo_model.Repository)
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return db.ErrCancelledf("during gathering missing repo records before checking %s", repo.FullName())
|
return db.ErrCancelledf("during gathering missing repo records before checking %s", repo.FullName())
|
||||||
|
|
|
@ -25,10 +25,8 @@ func SyncRepositoryHooks(ctx context.Context) error {
|
||||||
|
|
||||||
if err := db.Iterate(
|
if err := db.Iterate(
|
||||||
ctx,
|
ctx,
|
||||||
new(repo_model.Repository),
|
|
||||||
builder.Gt{"id": 0},
|
builder.Gt{"id": 0},
|
||||||
func(idx int, bean interface{}) error {
|
func(ctx context.Context, repo *repo_model.Repository) error {
|
||||||
repo := bean.(*repo_model.Repository)
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return db.ErrCancelledf("before sync repository hooks for %s", repo.FullName())
|
return db.ErrCancelledf("before sync repository hooks for %s", repo.FullName())
|
||||||
|
|
Loading…
Reference in a new issue