mirror of
https://codeberg.org/forgejo/forgejo
synced 2024-11-25 03:06:10 +01:00
Fixed race condition when deleting documents by repoId in ElasticSearch (#32185)
Resolves #32184 --------- Signed-off-by: Bruno Sofiato <bruno.sofiato@gmail.com> (cherry picked from commit d266d190bd744b7b6f572bf69a42013e21b9be62)
This commit is contained in:
parent
00e5c68060
commit
4cb10ff28a
|
@ -20,6 +20,7 @@ import (
|
||||||
indexer_internal "code.gitea.io/gitea/modules/indexer/internal"
|
indexer_internal "code.gitea.io/gitea/modules/indexer/internal"
|
||||||
inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch"
|
inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch"
|
||||||
"code.gitea.io/gitea/modules/json"
|
"code.gitea.io/gitea/modules/json"
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
"code.gitea.io/gitea/modules/setting"
|
||||||
"code.gitea.io/gitea/modules/timeutil"
|
"code.gitea.io/gitea/modules/timeutil"
|
||||||
"code.gitea.io/gitea/modules/typesniffer"
|
"code.gitea.io/gitea/modules/typesniffer"
|
||||||
|
@ -197,8 +198,33 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes indexes by ids
|
// Delete entries by repoId
|
||||||
func (b *Indexer) Delete(ctx context.Context, repoID int64) error {
|
func (b *Indexer) Delete(ctx context.Context, repoID int64) error {
|
||||||
|
if err := b.doDelete(ctx, repoID); err != nil {
|
||||||
|
// Maybe there is a conflict during the delete operation, so we should retry after a refresh
|
||||||
|
log.Warn("Deletion of entries of repo %v within index %v was erroneus. Trying to refresh index before trying again", repoID, b.inner.VersionedIndexName(), err)
|
||||||
|
if err := b.refreshIndex(ctx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := b.doDelete(ctx, repoID); err != nil {
|
||||||
|
log.Error("Could not delete entries of repo %v within index %v", repoID, b.inner.VersionedIndexName())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Indexer) refreshIndex(ctx context.Context) error {
|
||||||
|
if _, err := b.inner.Client.Refresh(b.inner.VersionedIndexName()).Do(ctx); err != nil {
|
||||||
|
log.Error("Error while trying to refresh index %v", b.inner.VersionedIndexName(), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete entries by repoId
|
||||||
|
func (b *Indexer) doDelete(ctx context.Context, repoID int64) error {
|
||||||
_, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()).
|
_, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()).
|
||||||
Query(elastic.NewTermsQuery("repo_id", repoID)).
|
Query(elastic.NewTermsQuery("repo_id", repoID)).
|
||||||
Do(ctx)
|
Do(ctx)
|
||||||
|
|
Loading…
Reference in a new issue