From 1e3b851e19af5701c9091355bdb2142c70adc58d Mon Sep 17 00:00:00 2001 From: Aaron Liu Date: Wed, 11 Feb 2026 16:05:09 +0800 Subject: [PATCH] feat: full-text search and RAG powered search --- application/constants/constants.go | 2 +- application/dependency/dependency.go | 80 +++- application/dependency/options.go | 15 + assets | 2 +- go.mod | 3 +- go.sum | 6 +- inventory/common.go | 5 +- inventory/file.go | 19 +- inventory/setting.go | 13 + pkg/filemanager/fs/dbfs/file.go | 2 + pkg/filemanager/fs/dbfs/manage.go | 207 ++++++--- pkg/filemanager/fs/dbfs/upload.go | 23 +- pkg/filemanager/fs/fs.go | 82 +++- pkg/filemanager/manager/entity.go | 10 +- pkg/filemanager/manager/fulltextindex.go | 529 +++++++++++++++++++++++ pkg/filemanager/manager/manager.go | 12 + pkg/filemanager/manager/operation.go | 20 +- pkg/filemanager/manager/recycle.go | 1 - pkg/filemanager/manager/upload.go | 16 +- pkg/queue/task.go | 5 + pkg/searcher/extractor/noop.go | 15 + pkg/searcher/extractor/tika.go | 82 ++++ pkg/searcher/indexer.go | 49 +++ pkg/searcher/indexer/chunker.go | 101 +++++ pkg/searcher/indexer/chunker_test.go | 84 ++++ pkg/searcher/indexer/meilisearch.go | 384 ++++++++++++++++ pkg/searcher/indexer/noop.go | 46 ++ pkg/setting/provider.go | 47 ++ pkg/setting/types.go | 29 ++ routers/controllers/file.go | 14 + routers/router.go | 10 + service/basic/site.go | 2 + service/explorer/file.go | 22 + service/explorer/response.go | 22 + 34 files changed, 1851 insertions(+), 108 deletions(-) create mode 100644 pkg/filemanager/manager/fulltextindex.go create mode 100644 pkg/searcher/extractor/noop.go create mode 100644 pkg/searcher/extractor/tika.go create mode 100644 pkg/searcher/indexer.go create mode 100644 pkg/searcher/indexer/chunker.go create mode 100644 pkg/searcher/indexer/chunker_test.go create mode 100644 pkg/searcher/indexer/meilisearch.go create mode 100644 pkg/searcher/indexer/noop.go diff --git a/application/constants/constants.go b/application/constants/constants.go index 
28afbf5c..bf97df7f 100644 --- a/application/constants/constants.go +++ b/application/constants/constants.go @@ -3,7 +3,7 @@ package constants // These values will be injected at build time, DO NOT EDIT. // BackendVersion 当前后端版本号 -var BackendVersion = "4.13.0" +var BackendVersion = "4.14.0" // IsPro 是否为Pro版本 var IsPro = "false" diff --git a/application/dependency/dependency.go b/application/dependency/dependency.go index d17e0500..aaab6e18 100644 --- a/application/dependency/dependency.go +++ b/application/dependency/dependency.go @@ -26,6 +26,9 @@ import ( "github.com/cloudreve/Cloudreve/v4/pkg/mediameta" "github.com/cloudreve/Cloudreve/v4/pkg/queue" "github.com/cloudreve/Cloudreve/v4/pkg/request" + "github.com/cloudreve/Cloudreve/v4/pkg/searcher" + "github.com/cloudreve/Cloudreve/v4/pkg/searcher/extractor" + "github.com/cloudreve/Cloudreve/v4/pkg/searcher/indexer" "github.com/cloudreve/Cloudreve/v4/pkg/setting" "github.com/cloudreve/Cloudreve/v4/pkg/thumb" "github.com/cloudreve/Cloudreve/v4/pkg/util" @@ -139,6 +142,10 @@ type Dep interface { EncryptorFactory(ctx context.Context) encrypt.CryptorFactory // EventHub Get a singleton eventhub.EventHub instance for event publishing. EventHub() eventhub.EventHub + // SearchIndexer Get a singleton searcher.SearchIndexer instance for full-text search indexing. + SearchIndexer(ctx context.Context) searcher.SearchIndexer + // TextExtractor Get a singleton searcher.TextExtractor instance for text extraction. 
+ TextExtractor(ctx context.Context) searcher.TextExtractor } type dependency struct { @@ -187,6 +194,8 @@ type dependency struct { cron *cron.Cron masterEncryptKeyVault encrypt.MasterEncryptKeyVault eventHub eventhub.EventHub + searchIndexer searcher.SearchIndexer + textExtractor searcher.TextExtractor configPath string isPro bool @@ -380,6 +389,63 @@ func (d *dependency) EventHub() eventhub.EventHub { return d.eventHub } +func (d *dependency) SearchIndexer(ctx context.Context) searcher.SearchIndexer { + d.mu.Lock() + defer d.mu.Unlock() + + _, reload := ctx.Value(ReloadCtx{}).(bool) + if d.searchIndexer != nil && !reload { + return d.searchIndexer + } + + sp := d.SettingProvider() + if !sp.FTSEnabled(ctx) || sp.FTSIndexType(ctx) != setting.FTSIndexTypeMeilisearch { + d.searchIndexer = &indexer.NoopIndexer{} + return d.searchIndexer + } + + msCfg := sp.FTSIndexMeilisearch(ctx) + if msCfg.Endpoint == "" { + d.searchIndexer = &indexer.NoopIndexer{} + return d.searchIndexer + } + + idx := indexer.NewMeilisearchIndexer(msCfg, sp.FTSChunkSize(ctx), d.Logger()) + if err := idx.EnsureIndex(ctx); err != nil { + d.Logger().Warning("Failed to ensure Meilisearch index: %s, falling back to noop", err) + d.searchIndexer = &indexer.NoopIndexer{} + return d.searchIndexer + } + + d.searchIndexer = idx + return d.searchIndexer +} + +func (d *dependency) TextExtractor(ctx context.Context) searcher.TextExtractor { + d.mu.Lock() + defer d.mu.Unlock() + + _, reload := ctx.Value(ReloadCtx{}).(bool) + if d.textExtractor != nil && !reload { + return d.textExtractor + } + + sp := d.SettingProvider() + if sp.FTSExtractorType(ctx) != setting.FTSExtractorTypeTika { + d.textExtractor = &extractor.NoopExtractor{} + return d.textExtractor + } + + tikaCfg := sp.FTSTikaExtractor(ctx) + if tikaCfg.Endpoint == "" { + d.textExtractor = &extractor.NoopExtractor{} + return d.textExtractor + } + + d.textExtractor = extractor.NewTikaExtractor(d.RequestClient(), d.SettingProvider(), d.Logger(), tikaCfg) 
+ return d.textExtractor +} + func (d *dependency) FsEventClient() inventory.FsEventClient { if d.fsEventClient != nil { return d.fsEventClient @@ -578,7 +644,13 @@ func (d *dependency) MediaMetaQueue(ctx context.Context) queue.Queue { queue.WithWorkerCount(queueSetting.WorkerNum), queue.WithName("MediaMetadataQueue"), queue.WithMaxTaskExecution(queueSetting.MaxExecution), - queue.WithResumeTaskType(queue.MediaMetaTaskType), + queue.WithResumeTaskType( + queue.MediaMetaTaskType, + queue.FullTextIndexTaskType, + queue.FullTextDeleteTaskType, + queue.FullTextCopyTaskType, + queue.FullTextChangeOwnerTaskType, + ), ) return d.mediaMetaQueue } @@ -900,6 +972,12 @@ func (d *dependency) Shutdown(ctx context.Context) error { }() } + if d.searchIndexer != nil { + if err := d.searchIndexer.Close(); err != nil { + d.Logger().Warning("Failed to close search indexer: %s", err) + } + } + d.mu.Unlock() wg.Wait() diff --git a/application/dependency/options.go b/application/dependency/options.go index 7046670e..2f052527 100644 --- a/application/dependency/options.go +++ b/application/dependency/options.go @@ -11,6 +11,7 @@ import ( "github.com/cloudreve/Cloudreve/v4/pkg/email" "github.com/cloudreve/Cloudreve/v4/pkg/hashid" "github.com/cloudreve/Cloudreve/v4/pkg/logging" + "github.com/cloudreve/Cloudreve/v4/pkg/searcher" "github.com/cloudreve/Cloudreve/v4/pkg/setting" "github.com/gin-contrib/static" ) @@ -158,3 +159,17 @@ func WithShareClient(s inventory.ShareClient) Option { o.shareClient = s }) } + +// WithSearchIndexer Set the default search indexer +func WithSearchIndexer(s searcher.SearchIndexer) Option { + return optionFunc(func(o *dependency) { + o.searchIndexer = s + }) +} + +// WithTextExtractor Set the default text extractor +func WithTextExtractor(s searcher.TextExtractor) Option { + return optionFunc(func(o *dependency) { + o.textExtractor = s + }) +} diff --git a/assets b/assets index 21a98194..4a38a946 160000 --- a/assets +++ b/assets @@ -1 +1 @@ -Subproject commit 
21a98194339c89e375ac7e34fdb93d4aa0d213ef +Subproject commit 4a38a946cb40c3fe9878af0945c33a8b6987637b diff --git a/go.mod b/go.mod index e98e0101..1630ad15 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/go-webauthn/webauthn v0.11.2 github.com/gofrs/uuid v4.0.0+incompatible - github.com/golang-jwt/jwt/v5 v5.2.2 + github.com/golang-jwt/jwt/v5 v5.3.0 github.com/gomodule/redigo v1.9.2 github.com/google/go-querystring v1.1.0 github.com/google/uuid v1.6.0 @@ -42,6 +42,7 @@ require ( github.com/juju/ratelimit v1.0.1 github.com/ks3sdklib/aws-sdk-go v1.6.2 github.com/lib/pq v1.10.9 + github.com/meilisearch/meilisearch-go v0.36.0 github.com/mholt/archives v0.1.3 github.com/mojocn/base64Captcha v0.0.0-20190801020520-752b1cd608b2 github.com/pquerna/otp v1.2.0 diff --git a/go.sum b/go.sum index a81cc4db..1cedf0d5 100644 --- a/go.sum +++ b/go.sum @@ -379,8 +379,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= -github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo= +github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= @@ -689,6 +689,8 @@ 
github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwp github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg= github.com/mattn/go-zglob v0.0.1/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/meilisearch/meilisearch-go v0.36.0 h1:N1etykTektXt5KPcSbhBO0d5Xx5NaKj4pJWEM7WA5dI= +github.com/meilisearch/meilisearch-go v0.36.0/go.mod h1:HBfHzKMxcSbTOvqdfuRA/yf6Vk9IivcwKocWRuW7W78= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= github.com/mholt/archives v0.1.3 h1:aEAaOtNra78G+TvV5ohmXrJOAzf++dIlYeDW3N9q458= github.com/mholt/archives v0.1.3/go.mod h1:LUCGp++/IbV/I0Xq4SzcIR6uwgeh2yjnQWamjRQfLTU= diff --git a/inventory/common.go b/inventory/common.go index 5db42fed..36305390 100644 --- a/inventory/common.go +++ b/inventory/common.go @@ -3,11 +3,12 @@ package inventory import ( "encoding/base64" "encoding/json" - "entgo.io/ent/dialect/sql" "fmt" + "time" + + "entgo.io/ent/dialect/sql" "github.com/cloudreve/Cloudreve/v4/pkg/conf" "github.com/cloudreve/Cloudreve/v4/pkg/hashid" - "time" ) type ( diff --git a/inventory/file.go b/inventory/file.go index 4d5734d9..ed7047aa 100644 --- a/inventory/file.go +++ b/inventory/file.go @@ -139,6 +139,12 @@ type ( ParentFiles []int PrimaryEntityParentFiles []int } + + CopyParameter struct { + Files []*ent.File + DstMap map[int][]*ent.File + ExcludedMetadataKeys []string + } ) type FileClient interface { @@ -187,7 +193,7 @@ type FileClient interface { // UpsertMetadata update or insert metadata UpsertMetadata(ctx context.Context, file *ent.File, data map[string]string, privateMask map[string]bool) error // Copy copies a layer of file to its corresponding destination folder. dstMap is a map from src parent ID to dst parent Files. 
- Copy(ctx context.Context, files []*ent.File, dstMap map[int][]*ent.File) (map[int][]*ent.File, StorageDiff, error) + Copy(ctx context.Context, args *CopyParameter) (map[int][]*ent.File, StorageDiff, error) // Delete deletes a group of files (and related models) with given entity recycle option Delete(ctx context.Context, files []*ent.File, options *types.EntityProps) ([]*ent.Entity, StorageDiff, error) // StaleEntities returns stale entities of a given file. If ID is not provided, all entities @@ -562,7 +568,9 @@ func (f *fileClient) Delete(ctx context.Context, files []*ent.File, options *typ return toBeRecycled, storageReduced, nil } -func (f *fileClient) Copy(ctx context.Context, files []*ent.File, dstMap map[int][]*ent.File) (map[int][]*ent.File, StorageDiff, error) { +func (f *fileClient) Copy(ctx context.Context, args *CopyParameter) (map[int][]*ent.File, StorageDiff, error) { + files := args.Files + dstMap := args.DstMap pageSize := capPageSize(f.maxSQlParam, intsets.MaxInt, 10) // 1. Copy files and metadata copyFileStm := lo.Map(files, func(file *ent.File, index int) *ent.FileCreate { @@ -603,12 +611,15 @@ func (f *fileClient) Copy(ctx context.Context, files []*ent.File, dstMap map[int return nil, nil, fmt.Errorf("failed to get metadata of file: %w", err) } - metadataStm = append(metadataStm, lo.Map(fileMetadata, func(metadata *ent.Metadata, index int) *ent.MetadataCreate { + metadataStm = append(metadataStm, lo.FilterMap(fileMetadata, func(metadata *ent.Metadata, index int) (*ent.MetadataCreate, bool) { + if lo.Contains(args.ExcludedMetadataKeys, metadata.Name) { + return nil, false + } return f.client.Metadata.Create(). SetName(metadata.Name). SetValue(metadata.Value). SetFile(newFile). - SetIsPublic(metadata.IsPublic) + SetIsPublic(metadata.IsPublic), true })...) 
fileEntities, err := files[index].Edges.EntitiesOrErr() diff --git a/inventory/setting.go b/inventory/setting.go index 307ddcac..65b70765 100644 --- a/inventory/setting.go +++ b/inventory/setting.go @@ -672,6 +672,19 @@ var DefaultSettings = map[string]string{ "fs_event_push_enabled": "1", "fs_event_push_max_age": "1209600", "fs_event_push_debounce": "5", + "fts_enabled": "0", + "fts_index_type": "", + "fts_extractor_type": "", + "fts_meilisearch_endpoint": "", + "fts_meilisearch_api_key": "", + "fts_meilisearch_page_size": "5", + "fts_meilisearch_embed_enabled": "0", + "fts_meilisearch_embed_config": "{}", + "fts_tika_endpoint": "", + "fts_tika_max_response_size": "10485760", + "fts_tika_exts": "pdf,doc,docx,xls,xlsx,ppt,pptx,odt,ods,odp,rtf,txt,md,html,htm,epub,csv", + "fts_tika_max_file_size": "26214400", + "fts_chunk_size": "2000", } var RedactedSettings = map[string]struct{}{ diff --git a/pkg/filemanager/fs/dbfs/file.go b/pkg/filemanager/fs/dbfs/file.go index 804598e0..a3a26b90 100644 --- a/pkg/filemanager/fs/dbfs/file.go +++ b/pkg/filemanager/fs/dbfs/file.go @@ -66,6 +66,8 @@ const ( ThumbMetadataPrefix = "thumb:" ThumbDisabledKey = ThumbMetadataPrefix + "disabled" + FullTextIndexKey = MetadataSysPrefix + "fulltext_index" + pathIndexRoot = 0 pathIndexUser = 1 ) diff --git a/pkg/filemanager/fs/dbfs/manage.go b/pkg/filemanager/fs/dbfs/manage.go index 240b76c8..acec701a 100644 --- a/pkg/filemanager/fs/dbfs/manage.go +++ b/pkg/filemanager/fs/dbfs/manage.go @@ -147,47 +147,48 @@ func (f *DBFS) Create(ctx context.Context, path *fs.URI, fileType types.FileType return ancestor, nil } -func (f *DBFS) Rename(ctx context.Context, path *fs.URI, newName string) (fs.File, error) { +func (f *DBFS) Rename(ctx context.Context, path *fs.URI, newName string) (fs.File, *fs.IndexDiff, error) { // Get navigator navigator, err := f.getNavigator(ctx, path, NavigatorCapabilityRenameFile, NavigatorCapabilityLockFile) if err != nil { - return nil, err + return nil, nil, err } // Get 
target file + ctx = context.WithValue(ctx, inventory.LoadFileMetadata{}, true) target, err := f.getFileByPath(ctx, navigator, path) if err != nil { - return nil, fmt.Errorf("failed to get target file: %w", err) + return nil, nil, fmt.Errorf("failed to get target file: %w", err) } oldName := target.Name() if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && target.Owner().ID != f.user.ID { - return nil, fs.ErrOwnerOnly + return nil, nil, fs.ErrOwnerOnly } // Root folder cannot be modified if target.IsRootFolder() { - return nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("cannot modify root folder")) + return nil, nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("cannot modify root folder")) } // Validate new name if err := validateFileName(newName); err != nil { - return nil, fs.ErrIllegalObjectName.WithError(err) + return nil, nil, fs.ErrIllegalObjectName.WithError(err) } // If target is a file, validate file extension policy, err := f.getPreferredPolicy(ctx, target) if err != nil { - return nil, err + return nil, nil, err } if target.Type() == types.FileTypeFile { if err := validateExtension(newName, policy); err != nil { - return nil, fs.ErrIllegalObjectName.WithError(err) + return nil, nil, fs.ErrIllegalObjectName.WithError(err) } if err := validateFileNameRegexp(newName, policy); err != nil { - return nil, fs.ErrIllegalObjectName.WithError(err) + return nil, nil, fs.ErrIllegalObjectName.WithError(err) } } @@ -196,39 +197,54 @@ func (f *DBFS) Rename(ctx context.Context, path *fs.URI, newName string) (fs.Fil &LockByPath{target.Uri(true), target, target.Type(), ""}) defer func() { _ = f.Release(ctx, ls) }() if err != nil { - return nil, err + return nil, nil, err } // Rename target fc, tx, ctx, err := inventory.WithTx(ctx, f.fileClient) if err != nil { - return nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err) } 
updated, err := fc.Rename(ctx, target.Model, newName) if err != nil { _ = inventory.Rollback(tx) if ent.IsConstraintError(err) { - return nil, fs.ErrFileExisted.WithError(err) + return nil, nil, fs.ErrFileExisted.WithError(err) } - return nil, serializer.NewError(serializer.CodeDBError, "failed to update file", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "failed to update file", err) } if target.Type() == types.FileTypeFile && !strings.EqualFold(filepath.Ext(newName), filepath.Ext(oldName)) { if err := fc.RemoveMetadata(ctx, target.Model, ThumbDisabledKey); err != nil { _ = inventory.Rollback(tx) - return nil, serializer.NewError(serializer.CodeDBError, "failed to remove disabled thumbnail mark", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "failed to remove disabled thumbnail mark", err) } } if err := inventory.Commit(tx); err != nil { - return nil, serializer.NewError(serializer.CodeDBError, "Failed to commit rename change", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to commit rename change", err) } f.emitFileRenamed(ctx, target, newName) - return target.Replace(updated), nil + originalMetadata := target.Metadata() + newFile := target.Replace(updated) + var diff *fs.IndexDiff + if _, ok := originalMetadata[FullTextIndexKey]; ok { + diff = &fs.IndexDiff{ + IndexToRename: []fs.IndexDiffRenameDetails{ + { + Uri: *newFile.Uri(false), + FileID: newFile.ID(), + EntityID: newFile.PrimaryEntityID(), + }, + }, + } + } + + return newFile, diff, nil } func (f *DBFS) SoftDelete(ctx context.Context, path ...*fs.URI) error { @@ -311,7 +327,7 @@ func (f *DBFS) SoftDelete(ctx context.Context, path ...*fs.URI) error { return ae.Aggregate() } -func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([]fs.Entity, error) { +func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([]fs.Entity, *fs.IndexDiff, error) { o := newDbfsOption() for _, opt := range opts 
{ o.apply(opt) @@ -362,7 +378,7 @@ func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([ targets := lo.Flatten(lo.Values(fileNavGroup)) if len(targets) == 0 { - return nil, ae.Aggregate() + return nil, nil, ae.Aggregate() } // Lock all targets lockTargets := lo.Map(targets, func(value *File, key int) *LockByPath { @@ -371,50 +387,55 @@ func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([ ls, err := f.acquireByPath(ctx, -1, f.user, false, fs.LockApp(fs.ApplicationDelete), lockTargets...) defer func() { _ = f.Release(ctx, ls) }() if err != nil { - return nil, err + return nil, nil, err } fc, tx, ctx, err := inventory.WithTx(ctx, f.fileClient) if err != nil { - return nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err) } // Delete targets - newStaleEntities, storageDiff, err := f.deleteFiles(ctx, fileNavGroup, fc, opt) + newStaleEntities, storageDiff, indexToDelete, err := f.deleteFiles(ctx, fileNavGroup, fc, opt) if err != nil { _ = inventory.Rollback(tx) - return nil, serializer.NewError(serializer.CodeDBError, "failed to delete files", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "failed to delete files", err) } tx.AppendStorageDiff(storageDiff) if err := inventory.CommitWithStorageDiff(ctx, tx, f.l, f.userClient); err != nil { - return nil, serializer.NewError(serializer.CodeDBError, "Failed to commit delete change", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to commit delete change", err) } f.emitFileDeleted(ctx, targets...) 
- return newStaleEntities, ae.Aggregate() + return newStaleEntities, &fs.IndexDiff{ + IndexToDelete: indexToDelete, + }, ae.Aggregate() } -func (f *DBFS) VersionControl(ctx context.Context, path *fs.URI, versionId int, delete bool) error { +func (f *DBFS) VersionControl(ctx context.Context, path *fs.URI, versionId int, delete bool) (*fs.IndexDiff, error) { // Get navigator navigator, err := f.getNavigator(ctx, path, NavigatorCapabilityVersionControl) if err != nil { - return err + return nil, err } // Get target file ctx = context.WithValue(ctx, inventory.LoadFileEntity{}, true) + if !delete { + ctx = context.WithValue(ctx, inventory.LoadFileMetadata{}, true) + } target, err := f.getFileByPath(ctx, navigator, path) if err != nil { - return fmt.Errorf("failed to get target file: %w", err) + return nil, fmt.Errorf("failed to get target file: %w", err) } if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && target.Owner().ID != f.user.ID { - return fs.ErrOwnerOnly + return nil, fs.ErrOwnerOnly } // Target must be a file if target.Type() != types.FileTypeFile { - return fs.ErrNotSupportedAction.WithError(fmt.Errorf("target must be a valid file")) + return nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("target must be a valid file")) } // Lock file @@ -422,21 +443,38 @@ func (f *DBFS) VersionControl(ctx context.Context, path *fs.URI, versionId int, &LockByPath{target.Uri(true), target, target.Type(), ""}) defer func() { _ = f.Release(ctx, ls) }() if err != nil { - return err + return nil, err } if delete { storageDiff, err := f.deleteEntity(ctx, target, versionId) if err != nil { - return err + return nil, err } if err := f.userClient.ApplyStorageDiff(ctx, storageDiff); err != nil { f.l.Error("Failed to apply storage diff after deleting version: %s", err) } - return nil + return nil, nil } else { - return f.setCurrentVersion(ctx, target, versionId) + if err := f.setCurrentVersion(ctx, target, versionId); err != nil { + return nil, err + } + + if _, ok := 
target.Metadata()[FullTextIndexKey]; ok { + return &fs.IndexDiff{ + IndexToUpdate: []fs.IndexDiffUpdateDetails{ + { + Uri: *target.Uri(false), + FileID: target.ID(), + OwnerID: target.Owner().ID, + EntityID: versionId, + }, + }, + }, nil + } + + return nil, nil } } @@ -484,7 +522,7 @@ func (f *DBFS) Restore(ctx context.Context, path ...*fs.URI) error { // Copy each file to its original location for _, uris := range allTrashUriStr { - if err := f.MoveOrCopy(ctx, []*fs.URI{uris[0]}, uris[1], false); err != nil { + if _, err := f.MoveOrCopy(ctx, []*fs.URI{uris[0]}, uris[1], false); err != nil { if !ae.Merge(err) { ae.Add(uris[0].String(), err) } @@ -495,26 +533,26 @@ func (f *DBFS) Restore(ctx context.Context, path ...*fs.URI) error { } -func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCopy bool) error { +func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCopy bool) (*fs.IndexDiff, error) { targets := make([]*File, 0, len(path)) dstNavigator, err := f.getNavigator(ctx, dst, NavigatorCapabilityLockFile) if err != nil { - return err + return nil, err } // Get destination file destination, err := f.getFileByPath(ctx, dstNavigator, dst) if err != nil { - return fmt.Errorf("faield to get destination folder: %w", err) + return nil, fmt.Errorf("faield to get destination folder: %w", err) } if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && destination.Owner().ID != f.user.ID { - return fs.ErrOwnerOnly + return nil, fs.ErrOwnerOnly } // Target must be a folder if !destination.CanHaveChildren() { - return fs.ErrNotSupportedAction.WithError(fmt.Errorf("destination must be a valid folder")) + return nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("destination must be a valid folder")) } ae := serializer.NewAggregateError() @@ -571,6 +609,7 @@ func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCo } } + indexDiff := &fs.IndexDiff{} if len(targets) > 0 { // Lock all targets lockTargets 
:= lo.Map(targets, func(value *File, key int) *LockByPath { @@ -598,33 +637,35 @@ func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCo ls, err := f.acquireByPath(ctx, -1, f.user, false, fs.LockApp(fs.ApplicationMoveCopy), allLockTargets...) defer func() { _ = f.Release(ctx, ls) }() if err != nil { - return err + return nil, err } // Start transaction to move files fc, tx, ctx, err := inventory.WithTx(ctx, f.fileClient) if err != nil { - return serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err) + return nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err) } var ( copiedNewTargetsMap map[int]*ent.File storageDiff inventory.StorageDiff + indexDiffBatch *fs.IndexDiff ) if isCopy { - copiedNewTargetsMap, storageDiff, err = f.copyFiles(ctx, fileNavGroup, destination, fc) + copiedNewTargetsMap, storageDiff, indexDiffBatch, err = f.copyFiles(ctx, fileNavGroup, destination, fc) } else { - storageDiff, err = f.moveFiles(ctx, targets, destination, fc, dstNavigator) + storageDiff, indexDiffBatch, err = f.moveFiles(ctx, targets, destination, fc, dstNavigator) } if err != nil { _ = inventory.Rollback(tx) - return err + return nil, err } + indexDiff.Merge(indexDiffBatch) tx.AppendStorageDiff(storageDiff) if err := inventory.CommitWithStorageDiff(ctx, tx, f.l, f.userClient); err != nil { - return serializer.NewError(serializer.CodeDBError, "Failed to commit move change", err) + return nil, serializer.NewError(serializer.CodeDBError, "Failed to commit move change", err) } for _, target := range targets { @@ -638,7 +679,7 @@ func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCo // TODO: after move, dbfs cache should be cleared } - return ae.Aggregate() + return indexDiff, ae.Aggregate() } func (f *DBFS) GetFileFromDirectLink(ctx context.Context, dl *ent.DirectLink) (fs.File, error) { @@ -773,17 +814,18 @@ func (f *DBFS) setCurrentVersion(ctx context.Context, target 
*File, versionId in return nil } -func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, fc inventory.FileClient, opt *types.EntityProps) ([]fs.Entity, inventory.StorageDiff, error) { +func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, fc inventory.FileClient, opt *types.EntityProps) ([]fs.Entity, inventory.StorageDiff, []int, error) { if f.user.Edges.Group == nil { - return nil, nil, fmt.Errorf("user group not loaded") + return nil, nil, nil, fmt.Errorf("user group not loaded") } allStaleEntities := make([]fs.Entity, 0, len(targets)) storageDiff := make(inventory.StorageDiff) + indexToDelete := make([]int, 0) for n, files := range targets { // Let navigator use tx reset, err := n.FollowTx(ctx) if err != nil { - return nil, nil, err + return nil, nil, nil, err } defer reset() @@ -792,9 +834,12 @@ func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, f toBeDeletedFiles := make([]*File, 0, len(files)) if err := n.Walk(ctx, files, intsets.MaxInt, intsets.MaxInt, func(targets []*File, level int) error { toBeDeletedFiles = append(toBeDeletedFiles, targets...) + indexToDelete = append(indexToDelete, lo.Map(targets, func(item *File, index int) int { + return item.ID() + })...) return nil }); err != nil { - return nil, nil, fmt.Errorf("failed to walk files: %w", err) + return nil, nil, nil, fmt.Errorf("failed to walk files: %w", err) } // Delete files @@ -802,7 +847,7 @@ func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, f return item.Model }), opt) if err != nil { - return nil, nil, fmt.Errorf("failed to delete files: %w", err) + return nil, nil, nil, fmt.Errorf("failed to delete files: %w", err) } storageDiff.Merge(diff) allStaleEntities = append(allStaleEntities, lo.Map(staleEntities, func(item *ent.Entity, index int) fs.Entity { @@ -810,17 +855,17 @@ func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, f })...) 
} - return allStaleEntities, storageDiff, nil + return allStaleEntities, storageDiff, indexToDelete, nil } -func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, destination *File, fc inventory.FileClient) (map[int]*ent.File, inventory.StorageDiff, error) { +func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, destination *File, fc inventory.FileClient) (map[int]*ent.File, inventory.StorageDiff, *fs.IndexDiff, error) { if f.user.Edges.Group == nil { - return nil, nil, fmt.Errorf("user group not loaded") + return nil, nil, nil, fmt.Errorf("user group not loaded") } limit := max(f.user.Edges.Group.Settings.MaxWalkedFiles, 1) capacity, err := f.Capacity(ctx, destination.Owner()) if err != nil { - return nil, nil, fmt.Errorf("copy files: failed to destination owner capacity: %w", err) + return nil, nil, nil, fmt.Errorf("copy files: failed to destination owner capacity: %w", err) } dstAncestors := lo.Map(destination.AncestorsChain(), func(item *File, index int) *ent.File { @@ -830,6 +875,7 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des // newTargetsMap is the map of between new target files in first layer, and its src file ID. 
newTargetsMap := make(map[int]*ent.File) storageDiff := make(inventory.StorageDiff) + indexToCopy := make([]fs.IndexDiffCopyDetails, 0) var diff inventory.StorageDiff for n, files := range targets { initialDstMap := make(map[int][]*ent.File) @@ -841,7 +887,7 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des // Let navigator use tx reset, err := n.FollowTx(ctx) if err != nil { - return nil, nil, err + return nil, nil, nil, err } defer reset() @@ -858,9 +904,13 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des } limit -= len(targets) - initialDstMap, diff, err = fc.Copy(ctx, lo.Map(targets, func(item *File, index int) *ent.File { - return item.Model - }), initialDstMap) + initialDstMap, diff, err = fc.Copy(ctx, &inventory.CopyParameter{ + Files: lo.Map(targets, func(item *File, index int) *ent.File { + return item.Model + }), + ExcludedMetadataKeys: []string{FullTextIndexKey}, + DstMap: initialDstMap, + }) if err != nil { if ent.IsConstraintError(err) { return fs.ErrFileExisted.WithError(err) @@ -870,26 +920,45 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des } storageDiff.Merge(diff) - if firstLayer { for k, v := range initialDstMap { newTargetsMap[k] = v[0] } } + for _, file := range targets { + if _, ok := file.Metadata()[FullTextIndexKey]; ok { + copiedFile := newTargetsMap[file.ID()] + indexToCopy = append(indexToCopy, fs.IndexDiffCopyDetails{ + OriginalFileID: file.ID(), + FileID: copiedFile.ID, + Uri: *destination.Uri(false).Join(file.Name()), + EntityID: copiedFile.PrimaryEntity, + OwnerID: destination.OwnerID(), + }) + } + } + capacity.Used += sizeTotal firstLayer = false return nil }); err != nil { - return nil, nil, fmt.Errorf("failed to walk files: %w", err) + return nil, nil, nil, fmt.Errorf("failed to walk files: %w", err) } } - return newTargetsMap, storageDiff, nil + var indexDiff *fs.IndexDiff + if len(indexToCopy) > 0 { + indexDiff = &fs.IndexDiff{ 
+ IndexToCopy: indexToCopy, + } + } + + return newTargetsMap, storageDiff, indexDiff, nil } -func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File, fc inventory.FileClient, n Navigator) (inventory.StorageDiff, error) { +func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File, fc inventory.FileClient, n Navigator) (inventory.StorageDiff, *fs.IndexDiff, error) { models := lo.Map(targets, func(value *File, key int) *ent.File { return value.Model }) @@ -897,10 +966,10 @@ func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File // Change targets' parent if err := fc.SetParent(ctx, models, destination.Model); err != nil { if ent.IsConstraintError(err) { - return nil, fs.ErrFileExisted.WithError(err) + return nil, nil, fs.ErrFileExisted.WithError(err) } - return nil, serializer.NewError(serializer.CodeDBError, "Failed to move file", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to move file", err) } var ( @@ -916,17 +985,17 @@ func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File // renaming it to its original name if _, err := fc.Rename(ctx, file.Model, file.DisplayName()); err != nil { if ent.IsConstraintError(err) { - return nil, fs.ErrFileExisted.WithError(err) + return nil, nil, fs.ErrFileExisted.WithError(err) } - return storageDiff, serializer.NewError(serializer.CodeDBError, "Failed to rename file from trash bin to its original name", err) + return storageDiff, nil, serializer.NewError(serializer.CodeDBError, "Failed to rename file from trash bin to its original name", err) } // Remove trash bin metadata if err := fc.RemoveMetadata(ctx, file.Model, MetadataRestoreUri, MetadataExpectedCollectTime); err != nil { - return storageDiff, serializer.NewError(serializer.CodeDBError, "Failed to remove trash related metadata", err) + return storageDiff, nil, serializer.NewError(serializer.CodeDBError, "Failed to remove trash related metadata", 
err) } } - return storageDiff, nil + return storageDiff, nil, nil } diff --git a/pkg/filemanager/fs/dbfs/upload.go b/pkg/filemanager/fs/dbfs/upload.go index c95b8184..e446213b 100644 --- a/pkg/filemanager/fs/dbfs/upload.go +++ b/pkg/filemanager/fs/dbfs/upload.go @@ -369,18 +369,18 @@ func (f *DBFS) CompleteUpload(ctx context.Context, session *fs.UploadSession) (f // - File still locked by uplaod session // - File unlocked, upload session valid // - File unlocked, upload session not valid -func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID string, session *fs.UploadSession) ([]fs.Entity, error) { +func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID string, session *fs.UploadSession) ([]fs.Entity, *fs.IndexDiff, error) { // Get placeholder file file, err := f.Get(ctx, path, WithFileEntities(), WithNotRoot()) if err != nil { - return nil, fmt.Errorf("failed to get placeholder file: %w", err) + return nil, nil, fmt.Errorf("failed to get placeholder file: %w", err) } filePrivate := file.(*File) // Make sure presented upload session is valid if session != nil && (session.UID != f.user.ID || session.FileID != file.ID()) { - return nil, serializer.NewError(serializer.CodeNotFound, "Upload session not found", nil) + return nil, nil, serializer.NewError(serializer.CodeNotFound, "Upload session not found", nil) } // Confirm locks on placeholder file @@ -393,7 +393,7 @@ func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID } if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && filePrivate.OwnerID() != f.user.ID { - return nil, fs.ErrOwnerOnly + return nil, nil, fs.ErrOwnerOnly } // Lock file @@ -402,7 +402,7 @@ func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID defer func() { _ = f.Release(ctx, ls) }() ctx = fs.LockSessionToContext(ctx, ls) if err != nil { - return nil, err + return nil, nil, err } // Find placeholder entity @@ -416,12 +416,12 @@ func (f 
*DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID // Remove upload session metadata if err := f.fileClient.RemoveMetadata(ctx, filePrivate.Model, MetadataUploadSessionID, ThumbDisabledKey); err != nil { - return nil, serializer.NewError(serializer.CodeDBError, "Failed to remove upload session metadata", err) + return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to remove upload session metadata", err) } if entity == nil { // Given upload session does not exist - return nil, nil + return nil, nil, nil } if session != nil && session.LockToken != "" { @@ -434,18 +434,19 @@ func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID if len(filePrivate.Entities()) == 1 { // Only one placeholder entity, just delete this file - return f.Delete(ctx, []*fs.URI{path}) + entities, indexDiff, err := f.Delete(ctx, []*fs.URI{path}) + return entities, indexDiff, err } // Delete place holder entity storageDiff, err := f.deleteEntity(ctx, filePrivate, entity.ID()) if err != nil { - return nil, fmt.Errorf("failed to delete placeholder entity: %w", err) + return nil, nil, fmt.Errorf("failed to delete placeholder entity: %w", err) } if err := f.userClient.ApplyStorageDiff(ctx, storageDiff); err != nil { - return nil, fmt.Errorf("failed to apply storage diff: %w", err) + return nil, nil, fmt.Errorf("failed to apply storage diff: %w", err) } - return nil, nil + return nil, nil, nil } diff --git a/pkg/filemanager/fs/fs.go b/pkg/filemanager/fs/fs.go index 76a6195e..a7ea6ff4 100644 --- a/pkg/filemanager/fs/fs.go +++ b/pkg/filemanager/fs/fs.go @@ -76,11 +76,11 @@ type ( // List lists files under give path. List(ctx context.Context, path *URI, opts ...Option) (File, *ListFileResult, error) // Rename renames a file. - Rename(ctx context.Context, path *URI, newName string) (File, error) + Rename(ctx context.Context, path *URI, newName string) (File, *IndexDiff, error) // Move moves files to dst. 
- MoveOrCopy(ctx context.Context, path []*URI, dst *URI, isCopy bool) error + MoveOrCopy(ctx context.Context, path []*URI, dst *URI, isCopy bool) (*IndexDiff, error) // Delete performs hard-delete for given paths, return newly generated stale entities in this delete operation. - Delete(ctx context.Context, path []*URI, opts ...Option) ([]Entity, error) + Delete(ctx context.Context, path []*URI, opts ...Option) ([]Entity, *IndexDiff, error) // GetEntitiesFromFileID returns all entities of a given file. GetEntity(ctx context.Context, entityID int) (Entity, error) // UpsertMetadata update or insert metadata of a file. @@ -92,7 +92,7 @@ type ( // VersionControl performs version control on given file. // - `delete` is false: set version as current version; // - `delete` is true: delete version. - VersionControl(ctx context.Context, path *URI, versionId int, delete bool) error + VersionControl(ctx context.Context, path *URI, versionId int, delete bool) (*IndexDiff, error) // GetFileFromDirectLink gets a file from a direct link. GetFileFromDirectLink(ctx context.Context, dl *ent.DirectLink) (File, error) // TraverseFile traverses a file to its root file, return the file with linked root. @@ -108,7 +108,7 @@ type ( // CompleteUpload completes an upload session. CompleteUpload(ctx context.Context, session *UploadSession) (File, error) // CancelUploadSession cancels an upload session. Delete the placeholder file if no other entity is created. - CancelUploadSession(ctx context.Context, path *URI, sessionID string, session *UploadSession) ([]Entity, error) + CancelUploadSession(ctx context.Context, path *URI, sessionID string, session *UploadSession) ([]Entity, *IndexDiff, error) // PreValidateUpload pre-validates an upload request. 
PreValidateUpload(ctx context.Context, dst *URI, files ...PreValidateFile) error } @@ -808,3 +808,75 @@ func NewEmptyEntity(u *ent.User) Entity { }, } } + +type ( + // IndexDiff is the difference between the old and new full-text index after file operation. + IndexDiff struct { + IndexToCopy []IndexDiffCopyDetails + IndexToChangeOwner []IndexDiffOwnerChangeDetails + IndexToRename []IndexDiffRenameDetails + IndexToDelete []int + IndexToUpdate []IndexDiffUpdateDetails + } + IndexDiffUpdateDetails struct { + Uri URI + FileID int + OwnerID int + EntityID int + } + IndexDiffCopyDetails struct { + Uri URI + OriginalFileID int + FileID int + OwnerID int + EntityID int + } + IndexDiffOwnerChangeDetails struct { + Uri URI + FileID int + EntityID int + OriginalOwnerID int + NewOwnerID int + } + IndexDiffRenameDetails struct { + Uri URI + FileID int + EntityID int + } +) + +func (i *IndexDiff) Merge(d *IndexDiff) { + if i.IndexToCopy == nil { + i.IndexToCopy = make([]IndexDiffCopyDetails, 0) + } + if i.IndexToChangeOwner == nil { + i.IndexToChangeOwner = make([]IndexDiffOwnerChangeDetails, 0) + } + if i.IndexToDelete == nil { + i.IndexToDelete = make([]int, 0) + } + if i.IndexToRename == nil { + i.IndexToRename = make([]IndexDiffRenameDetails, 0) + } + if i.IndexToUpdate == nil { + i.IndexToUpdate = make([]IndexDiffUpdateDetails, 0) + } + if d == nil { + return + } + if len(d.IndexToCopy) > 0 { + i.IndexToCopy = append(i.IndexToCopy, d.IndexToCopy...) + } + if len(d.IndexToChangeOwner) > 0 { + i.IndexToChangeOwner = append(i.IndexToChangeOwner, d.IndexToChangeOwner...) + } + if len(d.IndexToDelete) > 0 { + i.IndexToDelete = append(i.IndexToDelete, d.IndexToDelete...) + } + if len(d.IndexToRename) > 0 { + i.IndexToRename = append(i.IndexToRename, d.IndexToRename...) + } + if len(d.IndexToUpdate) > 0 { + i.IndexToUpdate = append(i.IndexToUpdate, d.IndexToUpdate...) 
+ } +} diff --git a/pkg/filemanager/manager/entity.go b/pkg/filemanager/manager/entity.go index a452d943..27ce4108 100644 --- a/pkg/filemanager/manager/entity.go +++ b/pkg/filemanager/manager/entity.go @@ -346,11 +346,15 @@ func (m *manager) GetEntitySource(ctx context.Context, entityID int, opts ...fs. } func (l *manager) SetCurrentVersion(ctx context.Context, path *fs.URI, version int) error { - return l.fs.VersionControl(ctx, path, version, false) + indexDiff, err := l.fs.VersionControl(ctx, path, version, false) + + l.processIndexDiff(ctx, indexDiff) + return err } func (l *manager) DeleteVersion(ctx context.Context, path *fs.URI, version int) error { - return l.fs.VersionControl(ctx, path, version, true) + _, err := l.fs.VersionControl(ctx, path, version, true) + return err } func (l *manager) ListPhysical(ctx context.Context, path string, policyID int, recursive bool, progress driver.ListProgressFunc) ([]fs.PhysicalObject, error) { @@ -392,7 +396,7 @@ func (l *manager) ImportPhysical(ctx context.Context, dst *fs.URI, policyId int, return err } - l.onNewEntityUploaded(ctx, uploadSession, d) + l.onNewEntityUploaded(ctx, uploadSession, d, l.user.ID) } return nil diff --git a/pkg/filemanager/manager/fulltextindex.go b/pkg/filemanager/manager/fulltextindex.go new file mode 100644 index 00000000..4c6e8d91 --- /dev/null +++ b/pkg/filemanager/manager/fulltextindex.go @@ -0,0 +1,529 @@ +package manager + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/cloudreve/Cloudreve/v4/application/dependency" + "github.com/cloudreve/Cloudreve/v4/ent" + "github.com/cloudreve/Cloudreve/v4/ent/task" + "github.com/cloudreve/Cloudreve/v4/inventory" + "github.com/cloudreve/Cloudreve/v4/inventory/types" + "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs" + "github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs" + "github.com/cloudreve/Cloudreve/v4/pkg/hashid" + "github.com/cloudreve/Cloudreve/v4/pkg/logging" + "github.com/cloudreve/Cloudreve/v4/pkg/queue" + 
"github.com/cloudreve/Cloudreve/v4/pkg/searcher" + "github.com/cloudreve/Cloudreve/v4/pkg/util" + "github.com/samber/lo" +) + +type ( + FullTextIndexTask struct { + *queue.DBTask + } + + FullTextIndexTaskState struct { + Uri *fs.URI `json:"uri"` + EntityID int `json:"entity_id"` + FileID int `json:"file_id"` + OwnerID int `json:"owner_id"` + } + + ftsFileInfo struct { + FileID int + OwnerID int + EntityID int + FileName string + } +) + +func (m *manager) SearchFullText(ctx context.Context, query string, offset int) (*FullTextSearchResults, error) { + indexer := m.dep.SearchIndexer(ctx) + results, total, err := indexer.Search(ctx, m.user.ID, query, offset) + if err != nil { + return nil, fmt.Errorf("failed to search full text: %w", err) + } + + if len(results) == 0 { + // No results. + return &FullTextSearchResults{}, nil + } + + // Traverse each file in result + files := lo.FilterMap(results, func(result searcher.SearchResult, _ int) (FullTextSearchResult, bool) { + file, err := m.TraverseFile(ctx, result.FileID) + if err != nil { + m.l.Debug("Failed to traverse file %d for full text search: %s, skipping.", result.FileID, err) + return FullTextSearchResult{}, false + } + + return FullTextSearchResult{ + File: file, + Content: result.Text, + }, true + }) + + if len(files) == 0 { + // No valid files, run next offset + return m.SearchFullText(ctx, query, offset+len(results)) + } + + return &FullTextSearchResults{ + Hits: files, + Total: total, + }, nil +} + +func init() { + queue.RegisterResumableTaskFactory(queue.FullTextIndexTaskType, NewFullTextIndexTaskFromModel) + queue.RegisterResumableTaskFactory(queue.FullTextCopyTaskType, NewFullTextCopyTaskFromModel) + queue.RegisterResumableTaskFactory(queue.FullTextChangeOwnerTaskType, NewFullTextChangeOwnerTaskFromModel) + queue.RegisterResumableTaskFactory(queue.FullTextDeleteTaskType, NewFullTextDeleteTaskFromModel) +} + +func NewFullTextIndexTask(ctx context.Context, uri *fs.URI, entityID, fileID, ownerID int, creator 
*ent.User) (*FullTextIndexTask, error) { + state := &FullTextIndexTaskState{ + Uri: uri, + EntityID: entityID, + FileID: fileID, + OwnerID: ownerID, + } + stateBytes, err := json.Marshal(state) + if err != nil { + return nil, fmt.Errorf("failed to marshal state: %w", err) + } + + return &FullTextIndexTask{ + DBTask: &queue.DBTask{ + DirectOwner: creator, + Task: &ent.Task{ + Type: queue.FullTextIndexTaskType, + CorrelationID: logging.CorrelationID(ctx), + PrivateState: string(stateBytes), + PublicState: &types.TaskPublicState{}, + }, + }, + }, nil +} + +func NewFullTextIndexTaskFromModel(t *ent.Task) queue.Task { + return &FullTextIndexTask{ + DBTask: &queue.DBTask{ + Task: t, + }, + } +} + +type ( + FullTextCopyTask struct { + *queue.DBTask + } + + FullTextCopyTaskState struct { + Uri *fs.URI `json:"uri"` + OriginalFileID int `json:"original_file_id"` + FileID int `json:"file_id"` + OwnerID int `json:"owner_id"` + EntityID int `json:"entity_id"` + } +) + +func NewFullTextCopyTask(ctx context.Context, uri *fs.URI, originalFileID, fileID, ownerID, entityID int, creator *ent.User) (*FullTextCopyTask, error) { + state := &FullTextCopyTaskState{ + Uri: uri, + OriginalFileID: originalFileID, + FileID: fileID, + OwnerID: ownerID, + EntityID: entityID, + } + stateBytes, err := json.Marshal(state) + if err != nil { + return nil, fmt.Errorf("failed to marshal state: %w", err) + } + + return &FullTextCopyTask{ + DBTask: &queue.DBTask{ + DirectOwner: creator, + Task: &ent.Task{ + Type: queue.FullTextCopyTaskType, + CorrelationID: logging.CorrelationID(ctx), + PrivateState: string(stateBytes), + PublicState: &types.TaskPublicState{}, + }, + }, + }, nil +} + +func NewFullTextCopyTaskFromModel(t *ent.Task) queue.Task { + return &FullTextCopyTask{ + DBTask: &queue.DBTask{ + Task: t, + }, + } +} + +func (t *FullTextCopyTask) Do(ctx context.Context) (task.Status, error) { + dep := dependency.FromContext(ctx) + l := dep.Logger() + fm := NewFileManager(dep, 
inventory.UserFromContext(ctx)).(*manager) + + if !fm.settings.FTSEnabled(ctx) { + l.Debug("FTS disabled, skipping full text copy task.") + return task.StatusCompleted, nil + } + + var state FullTextCopyTaskState + if err := json.Unmarshal([]byte(t.State()), &state); err != nil { + return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr) + } + + // Get fresh file to make sure task is not stale. + file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata()) + if err != nil { + return task.StatusError, fmt.Errorf("failed to get latest file: %w", err) + } + + if file.PrimaryEntityID() != state.EntityID { + l.Debug("File %d entity changed, skipping copy index.", state.FileID) + return task.StatusCompleted, nil + } + + indexer := dep.SearchIndexer(ctx) + if err := indexer.CopyByFileID(ctx, state.OriginalFileID, state.FileID, state.OwnerID, state.EntityID); err != nil { + l.Warning("Failed to copy index from file %d to %d, falling back to full indexing: %s", state.OriginalFileID, state.FileID, err) + return performIndexing(ctx, fm, state.Uri, state.EntityID, state.FileID, state.OwnerID, file.Name(), false) + } + + // Patch metadata to mark file as indexed. 
+ if err := fm.fs.PatchMetadata(ctx, []*fs.URI{state.Uri}, fs.MetadataPatch{ + Key: dbfs.FullTextIndexKey, + Value: hashid.EncodeEntityID(fm.hasher, state.EntityID), + }); err != nil { + return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err) + } + + l.Debug("Successfully copied index from file %d to %d.", state.OriginalFileID, state.FileID) + return task.StatusCompleted, nil +} + +type ( + FullTextChangeOwnerTask struct { + *queue.DBTask + } + + FullTextChangeOwnerTaskState struct { + Uri *fs.URI `json:"uri"` + EntityID int `json:"entity_id"` + FileID int `json:"file_id"` + OriginalOwnerID int `json:"original_owner_id"` + NewOwnerID int `json:"new_owner_id"` + } +) + +func NewFullTextChangeOwnerTask(ctx context.Context, uri *fs.URI, entityID, fileID, originalOwnerID, newOwnerID int, creator *ent.User) (*FullTextChangeOwnerTask, error) { + state := &FullTextChangeOwnerTaskState{ + Uri: uri, + EntityID: entityID, + FileID: fileID, + OriginalOwnerID: originalOwnerID, + NewOwnerID: newOwnerID, + } + stateBytes, err := json.Marshal(state) + if err != nil { + return nil, fmt.Errorf("failed to marshal state: %w", err) + } + + return &FullTextChangeOwnerTask{ + DBTask: &queue.DBTask{ + DirectOwner: creator, + Task: &ent.Task{ + Type: queue.FullTextChangeOwnerTaskType, + CorrelationID: logging.CorrelationID(ctx), + PrivateState: string(stateBytes), + PublicState: &types.TaskPublicState{}, + }, + }, + }, nil +} + +func NewFullTextChangeOwnerTaskFromModel(t *ent.Task) queue.Task { + return &FullTextChangeOwnerTask{ + DBTask: &queue.DBTask{ + Task: t, + }, + } +} + +func (t *FullTextChangeOwnerTask) Do(ctx context.Context) (task.Status, error) { + dep := dependency.FromContext(ctx) + l := dep.Logger() + fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager) + + if !fm.settings.FTSEnabled(ctx) { + l.Debug("FTS disabled, skipping full text change owner task.") + return task.StatusCompleted, nil + } + + var state FullTextChangeOwnerTaskState + if 
err := json.Unmarshal([]byte(t.State()), &state); err != nil { + return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr) + } + + // Get fresh file to make sure task is not stale. + file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata()) + if err != nil { + return task.StatusError, fmt.Errorf("failed to get latest file: %w", err) + } + + if file.PrimaryEntityID() != state.EntityID { + l.Debug("File %d entity changed, skipping owner change.", state.FileID) + return task.StatusCompleted, nil + } + + indexer := dep.SearchIndexer(ctx) + if err := indexer.ChangeOwner(ctx, state.FileID, state.OriginalOwnerID, state.NewOwnerID); err != nil { + return task.StatusError, fmt.Errorf("failed to change owner for file %d: %w", state.FileID, err) + } + + l.Debug("Successfully changed index owner for file %d from %d to %d.", state.FileID, state.OriginalOwnerID, state.NewOwnerID) + return task.StatusCompleted, nil +} + +type ( + FullTextDeleteTask struct { + *queue.DBTask + } + + FullTextDeleteTaskState struct { + FileIDs []int `json:"file_ids"` + } +) + +func NewFullTextDeleteTask(ctx context.Context, fileIDs []int, creator *ent.User) (*FullTextDeleteTask, error) { + state := &FullTextDeleteTaskState{ + FileIDs: fileIDs, + } + stateBytes, err := json.Marshal(state) + if err != nil { + return nil, fmt.Errorf("failed to marshal state: %w", err) + } + + return &FullTextDeleteTask{ + DBTask: &queue.DBTask{ + DirectOwner: creator, + Task: &ent.Task{ + Type: queue.FullTextDeleteTaskType, + CorrelationID: logging.CorrelationID(ctx), + PrivateState: string(stateBytes), + PublicState: &types.TaskPublicState{}, + }, + }, + }, nil +} + +func NewFullTextDeleteTaskFromModel(t *ent.Task) queue.Task { + return &FullTextDeleteTask{ + DBTask: &queue.DBTask{ + Task: t, + }, + } +} + +func (t *FullTextDeleteTask) Do(ctx context.Context) (task.Status, error) { + dep := dependency.FromContext(ctx) + l := dep.Logger() + + var state 
FullTextDeleteTaskState + if err := json.Unmarshal([]byte(t.State()), &state); err != nil { + return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr) + } + + indexer := dep.SearchIndexer(ctx) + if err := indexer.DeleteByFileIDs(ctx, state.FileIDs...); err != nil { + return task.StatusError, fmt.Errorf("failed to delete index for %d file(s): %w", len(state.FileIDs), err) + } + + l.Debug("Successfully deleted index for %d file(s).", len(state.FileIDs)) + return task.StatusCompleted, nil +} + +func (t *FullTextIndexTask) Do(ctx context.Context) (task.Status, error) { + dep := dependency.FromContext(ctx) + l := dep.Logger() + fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager) + + // Check FTS enabled + if !fm.settings.FTSEnabled(ctx) { + l.Debug("FTS disabled, skipping full text index task.") + return task.StatusCompleted, nil + } + + // Unmarshal state + var state FullTextIndexTaskState + if err := json.Unmarshal([]byte(t.State()), &state); err != nil { + return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr) + } + + // Get fresh file to make sure task is not stale + file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata()) + if err != nil { + return task.StatusError, fmt.Errorf("failed to get latest file: %w", err) + } + + if file.PrimaryEntityID() != state.EntityID { + l.Debug("File %d is not the latest version, skipping indexing.", state.FileID) + return task.StatusCompleted, nil + } + + deleteOldChunks := false + if _, ok := file.Metadata()[dbfs.FullTextIndexKey]; ok { + deleteOldChunks = true + } + + return performIndexing(ctx, fm, state.Uri, state.EntityID, state.FileID, state.OwnerID, state.Uri.Name(), deleteOldChunks) +} + +// performIndexing extracts text from the entity and indexes it. This is shared between +// the regular index task and the copy task (as a fallback when copy fails). 
+func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fileID, ownerID int, fileName string, deleteOldChunks bool) (task.Status, error) { + dep := fm.dep + l := dep.Logger() + + // Get entity source + source, err := fm.GetEntitySource(ctx, entityID) + if err != nil { + return task.StatusError, fmt.Errorf("failed to get entity source: %w", err) + } + defer source.Close() + + // Extract text + var text string + if source.Entity().Size() > 0 { + extractor := dep.TextExtractor(ctx) + text, err = extractor.Extract(ctx, source) + if err != nil { + l.Warning("Failed to extract text for file %d: %s", fileID, err) + return task.StatusCompleted, nil + } + } + + indexer := dep.SearchIndexer(ctx) + + // Delete old chunks first so that stale chunks from a previously longer + // version of the file are removed before upserting the new (possibly fewer) + // chunks. + if deleteOldChunks { + if err := indexer.DeleteByFileIDs(ctx, fileID); err != nil { + l.Warning("Failed to delete old index chunks for file %d: %s", fileID, err) + } + } + + // Index via SearchIndexer + if err := indexer.IndexFile(ctx, ownerID, fileID, entityID, fileName, text); err != nil { + return task.StatusError, fmt.Errorf("failed to index file %d: %w", fileID, err) + } + + // Upsert metadata + if err := fm.fs.PatchMetadata(ctx, []*fs.URI{uri}, fs.MetadataPatch{ + Key: dbfs.FullTextIndexKey, + Value: hashid.EncodeEntityID(fm.hasher, entityID), + }); err != nil { + return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err) + } + + l.Debug("Successfully indexed file %d for owner %d.", fileID, ownerID) + return task.StatusCompleted, nil +} + +// shouldIndexFullText checks if a file should be indexed for full-text search. 
+func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size int64) bool { + if !m.settings.FTSEnabled(ctx) { + return false + } + + extractor := m.dep.TextExtractor(ctx) + return util.IsInExtensionList(extractor.Exts(), fileName) && extractor.MaxFileSize() > size +} + +// fullTextIndexForNewEntity creates and queues a full text index task for a newly uploaded entity. +func (m *manager) fullTextIndexForNewEntity(ctx context.Context, session *fs.UploadSession, owner int) { + if session.Props.EntityType != nil && *session.Props.EntityType != types.EntityTypeVersion { + return + } + + if !m.shouldIndexFullText(ctx, session.Props.Uri.Name(), session.Props.Size) { + return + } + + t, err := NewFullTextIndexTask(ctx, session.Props.Uri, session.EntityID, session.FileID, owner, m.user) + if err != nil { + m.l.Warning("Failed to create full text index task: %s", err) + return + } + if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil { + m.l.Warning("Failed to queue full text index task: %s", err) + } +} + +func (m *manager) processIndexDiff(ctx context.Context, diff *fs.IndexDiff) { + if diff == nil { + return + } + + for _, update := range diff.IndexToUpdate { + t, err := NewFullTextIndexTask(ctx, &update.Uri, update.EntityID, update.FileID, update.OwnerID, m.user) + if err != nil { + m.l.Warning("Failed to create full text update task: %s", err) + continue + } + if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil { + m.l.Warning("Failed to queue full text update task: %s", err) + } + } + + for _, cp := range diff.IndexToCopy { + t, err := NewFullTextCopyTask(ctx, &cp.Uri, cp.OriginalFileID, cp.FileID, cp.OwnerID, cp.EntityID, m.user) + if err != nil { + m.l.Warning("Failed to create full text copy task: %s", err) + continue + } + if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil { + m.l.Warning("Failed to queue full text copy task: %s", err) + } + } + + for _, change := range diff.IndexToChangeOwner { + t, 
err := NewFullTextChangeOwnerTask(ctx, &change.Uri, change.EntityID, change.FileID, change.OriginalOwnerID, change.NewOwnerID, m.user) + if err != nil { + m.l.Warning("Failed to create full text change owner task: %s", err) + continue + } + if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil { + m.l.Warning("Failed to queue full text change owner task: %s", err) + } + } + + if len(diff.IndexToDelete) > 0 && m.dep.SettingProvider().FTSEnabled(ctx) { + t, err := NewFullTextDeleteTask(ctx, diff.IndexToDelete, m.user) + if err != nil { + m.l.Warning("Failed to create full text delete task: %s", err) + return + } + if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil { + m.l.Warning("Failed to queue full text delete task: %s", err) + } + } + + ctx = context.WithoutCancel(ctx) + indexer := m.dep.SearchIndexer(ctx) + go func() { + for _, rename := range diff.IndexToRename { + if err := indexer.Rename(ctx, rename.FileID, rename.EntityID, rename.Uri.Name()); err != nil { + m.l.Warning("Failed to rename index for file %d: %s", rename.FileID, err) + } + } + }() +} diff --git a/pkg/filemanager/manager/manager.go b/pkg/filemanager/manager/manager.go index 4e314f7a..17fa18e5 100644 --- a/pkg/filemanager/manager/manager.go +++ b/pkg/filemanager/manager/manager.go @@ -57,6 +57,8 @@ type ( CreateViewerSession(ctx context.Context, uri *fs.URI, version string, viewer *types.Viewer) (*ViewerSession, error) // TraverseFile traverses a file to its root file, return the file with linked root. 
TraverseFile(ctx context.Context, fileID int) (fs.File, error) + // SearchFullText searches full text for given query and offset + SearchFullText(ctx context.Context, query string, offset int) (*FullTextSearchResults, error) } FsManagement interface { @@ -120,6 +122,16 @@ type ( ShareView bool ShowReadMe bool } + + FullTextSearchResults struct { + Hits []FullTextSearchResult + Total int64 + } + + FullTextSearchResult struct { + File fs.File + Content string + } ) type manager struct { diff --git a/pkg/filemanager/manager/operation.go b/pkg/filemanager/manager/operation.go index 83891f12..bb1fdcd3 100644 --- a/pkg/filemanager/manager/operation.go +++ b/pkg/filemanager/manager/operation.go @@ -138,11 +138,19 @@ func (m *manager) Create(ctx context.Context, path *fs.URI, fileType types.FileT } func (m *manager) Rename(ctx context.Context, path *fs.URI, newName string) (fs.File, error) { - return m.fs.Rename(ctx, path, newName) + file, indexDiff, err := m.fs.Rename(ctx, path, newName) + m.processIndexDiff(ctx, indexDiff) + return file, err } func (m *manager) MoveOrCopy(ctx context.Context, src []*fs.URI, dst *fs.URI, isCopy bool) error { - return m.fs.MoveOrCopy(ctx, src, dst, isCopy) + indexDiff, err := m.fs.MoveOrCopy(ctx, src, dst, isCopy) + if err != nil { + return err + } + + m.processIndexDiff(ctx, indexDiff) + return nil } func (m *manager) SoftDelete(ctx context.Context, path ...*fs.URI) error { @@ -159,7 +167,7 @@ func (m *manager) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) return m.SoftDelete(ctx, path...) 
} - staleEntities, err := m.fs.Delete(ctx, path, fs.WithUnlinkOnly(o.UnlinkOnly), fs.WithSysSkipSoftDelete(o.SysSkipSoftDelete)) + staleEntities, indexDiff, err := m.fs.Delete(ctx, path, fs.WithUnlinkOnly(o.UnlinkOnly), fs.WithSysSkipSoftDelete(o.SysSkipSoftDelete)) if err != nil { return err } @@ -179,6 +187,12 @@ func (m *manager) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) return fmt.Errorf("failed to queue explicit entity recycle task: %w", err) } } + + // Process index diff + if indexDiff != nil { + m.processIndexDiff(ctx, indexDiff) + } + return nil } diff --git a/pkg/filemanager/manager/recycle.go b/pkg/filemanager/manager/recycle.go index 419f3c55..e01abfbd 100644 --- a/pkg/filemanager/manager/recycle.go +++ b/pkg/filemanager/manager/recycle.go @@ -282,7 +282,6 @@ func (m *manager) RecycleEntities(ctx context.Context, force bool, entityIDs ... if err := inventory.CommitWithStorageDiff(ctx, tx, m.l, m.dep.UserClient()); err != nil { return fmt.Errorf("failed to commit delete change: %w", err) } - } } diff --git a/pkg/filemanager/manager/upload.go b/pkg/filemanager/manager/upload.go index ba578b06..a7d375a0 100644 --- a/pkg/filemanager/manager/upload.go +++ b/pkg/filemanager/manager/upload.go @@ -231,11 +231,12 @@ func (m *manager) CancelUploadSession(ctx context.Context, path *fs.URI, session var ( staleEntities []fs.Entity + indexDiff *fs.IndexDiff err error ) if !m.stateless { - staleEntities, err = m.fs.CancelUploadSession(ctx, path, sessionID, session) + staleEntities, indexDiff, err = m.fs.CancelUploadSession(ctx, path, sessionID, session) if err != nil { return err } @@ -277,6 +278,11 @@ func (m *manager) CancelUploadSession(ctx context.Context, path *fs.URI, session } } + // Process index diff + if indexDiff != nil { + m.processIndexDiff(ctx, indexDiff) + } + return nil } @@ -308,7 +314,7 @@ func (m *manager) CompleteUpload(ctx context.Context, session *fs.UploadSession) } } - m.onNewEntityUploaded(ctx, session, d) + 
m.onNewEntityUploaded(ctx, session, d, file.OwnerID()) // Remove upload session _ = m.kv.Delete(UploadSessionCachePrefix, session.Props.UploadSessionID) return file, nil @@ -371,7 +377,7 @@ func (m *manager) OnUploadFailed(ctx context.Context, session *fs.UploadSession) m.l.Warning("OnUploadFailed hook failed to delete file: %s", err) } } else if !session.Importing { - if err := m.fs.VersionControl(ctx, session.Props.Uri, session.EntityID, true); err != nil { + if _, err := m.fs.VersionControl(ctx, session.Props.Uri, session.EntityID, true); err != nil { m.l.Warning("OnUploadFailed hook failed to version control: %s", err) } } @@ -426,10 +432,12 @@ func (m *manager) updateStateless(ctx context.Context, req *fs.UploadRequest, o return nil, nil } -func (m *manager) onNewEntityUploaded(ctx context.Context, session *fs.UploadSession, d driver.Handler) { +func (m *manager) onNewEntityUploaded(ctx context.Context, session *fs.UploadSession, d driver.Handler, owner int) { if !m.stateless { // Submit media meta task for new entity m.mediaMetaForNewEntity(ctx, session, d) + // Submit full text index task for new entity + m.fullTextIndexForNewEntity(ctx, session, owner) } } diff --git a/pkg/queue/task.go b/pkg/queue/task.go index 90bb7429..ee845126 100644 --- a/pkg/queue/task.go +++ b/pkg/queue/task.go @@ -105,6 +105,11 @@ const ( RemoteDownloadTaskType = "remote_download" ImportTaskType = "import" + FullTextIndexTaskType = "full_text_index" + FullTextCopyTaskType = "full_text_copy" + FullTextChangeOwnerTaskType = "full_text_change_owner" + FullTextDeleteTaskType = "full_text_delete" + SlaveCreateArchiveTaskType = "slave_create_archive" SlaveUploadTaskType = "slave_upload" SlaveExtractArchiveType = "slave_extract_archive" diff --git a/pkg/searcher/extractor/noop.go b/pkg/searcher/extractor/noop.go new file mode 100644 index 00000000..43c64936 --- /dev/null +++ b/pkg/searcher/extractor/noop.go @@ -0,0 +1,15 @@ +package extractor + +import ( + "context" + "io" +) + +// 
NoopExtractor is a no-op implementation of TextExtractor, used when text extraction is disabled. +type NoopExtractor struct{} + +func (n *NoopExtractor) Exts() []string { return nil } +func (n *NoopExtractor) MaxFileSize() int64 { return 0 } +func (n *NoopExtractor) Extract(ctx context.Context, reader io.Reader) (string, error) { + return "", nil +} diff --git a/pkg/searcher/extractor/tika.go b/pkg/searcher/extractor/tika.go new file mode 100644 index 00000000..ec0b66ff --- /dev/null +++ b/pkg/searcher/extractor/tika.go @@ -0,0 +1,82 @@ +package extractor + +import ( + "context" + "fmt" + "io" + "strings" + + "github.com/cloudreve/Cloudreve/v4/pkg/logging" + "github.com/cloudreve/Cloudreve/v4/pkg/request" + "github.com/cloudreve/Cloudreve/v4/pkg/setting" +) + +// TikaExtractor extracts text from documents using Apache Tika. +type TikaExtractor struct { + client request.Client + settings setting.Provider + l logging.Logger + exts []string + maxFileSize int64 +} + +// NewTikaExtractor creates a new TikaExtractor. +func NewTikaExtractor(client request.Client, settings setting.Provider, l logging.Logger, cfg *setting.FTSTikaExtractorSetting) *TikaExtractor { + exts := cfg.Exts + return &TikaExtractor{ + client: client, + settings: settings, + l: l, + exts: exts, + maxFileSize: cfg.MaxFileSize, + } +} + +// Exts returns the list of supported file extensions. +func (t *TikaExtractor) Exts() []string { + return t.exts +} + +// MaxFileSize returns the maximum file size for text extraction. +func (t *TikaExtractor) MaxFileSize() int64 { + return t.maxFileSize +} + +// Extract sends the document to Tika and returns the extracted plain text. 
+func (t *TikaExtractor) Extract(ctx context.Context, reader io.Reader) (string, error) { + tikaCfg := t.settings.FTSTikaExtractor(ctx) + if tikaCfg.Endpoint == "" { + return "", fmt.Errorf("tika endpoint not configured") + } + + endpoint := strings.TrimRight(tikaCfg.Endpoint, "/") + "/tika" + resp := t.client.Request( + "PUT", + endpoint, + reader, + request.WithHeader(map[string][]string{ + "Accept": {"text/plain"}, + }), + ) + if resp.Err != nil { + return "", fmt.Errorf("tika request failed: %w", resp.Err) + } + defer resp.Response.Body.Close() + + if resp.Response.StatusCode != 200 { + return "", fmt.Errorf("tika returned status %d", resp.Response.StatusCode) + } + + maxSize := tikaCfg.MaxResponseSize + if maxSize <= 0 { + maxSize = 10 * 1024 * 1024 // default 10MB + } + + limited := io.LimitReader(resp.Response.Body, maxSize) + body, err := io.ReadAll(limited) + if err != nil { + return "", fmt.Errorf("failed to read tika response: %w", err) + } + + return strings.TrimSpace(string(body)), nil +} diff --git a/pkg/searcher/indexer.go b/pkg/searcher/indexer.go new file mode 100644 index 00000000..0e7b6518 --- /dev/null +++ b/pkg/searcher/indexer.go @@ -0,0 +1,49 @@ +package searcher + +import ( + "context" + "io" +) + +type SearchDocument struct { + ID string `json:"id"` + FileID int `json:"file_id"` + OwnerID int `json:"owner_id"` + EntityID int `json:"entity_id"` + ChunkIdx int `json:"chunk_idx"` + FileName string `json:"file_name"` + Text string `json:"text"` + Formated *FormatedHit `json:"_formatted,omitempty"` +} + +type FormatedHit struct { + Text string `json:"text"` +} + +type SearchResult struct { + FileID int `json:"file_id"` + OwnerID int `json:"owner_id"` + EntityID int `json:"entity_id"` + FileName string `json:"file_name"` + Text string `json:"text"` +} + +type SearchIndexer interface { + IndexFile(ctx context.Context, ownerID, fileID, entityID int, fileName, text string) error + DeleteByFileIDs(ctx context.Context, fileID ...int) error + 
ChangeOwner(ctx context.Context, fileID, oldOwnerID, newOwnerID int) error + CopyByFileID(ctx context.Context, srcFileID, dstFileID, dstOwnerID, dstEntityID int) error + Rename(ctx context.Context, fileID, entityID int, newFileName string) error + Search(ctx context.Context, ownerID int, query string, offset int) ([]SearchResult, int64, error) + // IndexReady reports whether the search index exists and has the required + // configuration (filterable/searchable attributes, etc.). + IndexReady(ctx context.Context) (bool, error) + EnsureIndex(ctx context.Context) error + Close() error +} + +type TextExtractor interface { + Exts() []string + MaxFileSize() int64 + Extract(ctx context.Context, reader io.Reader) (string, error) +} diff --git a/pkg/searcher/indexer/chunker.go b/pkg/searcher/indexer/chunker.go new file mode 100644 index 00000000..ac2a330d --- /dev/null +++ b/pkg/searcher/indexer/chunker.go @@ -0,0 +1,101 @@ +package indexer + +import "strings" + +const defaultMaxBytes = 2000 + +// ChunkText splits text into chunks of approximately maxBytes bytes each. +// It splits on paragraph breaks (\n\n), combines small paragraphs until the +// byte limit is reached, and splits large paragraphs at word boundaries. +func ChunkText(text string, maxBytes int) []string { + if maxBytes <= 0 { + maxBytes = defaultMaxBytes + } + + text = strings.TrimSpace(text) + if text == "" { + return nil + } + + paragraphs := strings.Split(text, "\n\n") + var chunks []string + var current []string + currentBytes := 0 + + for _, para := range paragraphs { + para = strings.TrimSpace(para) + if para == "" { + continue + } + + paraBytes := len(para) + + // If a single paragraph exceeds maxBytes, split it at word boundaries + if paraBytes > maxBytes { + // Flush accumulated content first + if currentBytes > 0 { + chunks = append(chunks, strings.Join(current, "\n\n")) + current = nil + currentBytes = 0 + } + chunks = append(chunks, splitByBytes(para, maxBytes)...) 
+ continue + } + + // If adding this paragraph (plus separator) would exceed the limit, flush + joinerLen := 0 + if currentBytes > 0 { + joinerLen = 2 // "\n\n" + } + if currentBytes+joinerLen+paraBytes > maxBytes && currentBytes > 0 { + chunks = append(chunks, strings.Join(current, "\n\n")) + current = nil + currentBytes = 0 + } + + if currentBytes > 0 { + currentBytes += 2 // account for "\n\n" joiner + } + current = append(current, para) + currentBytes += paraBytes + } + + // Flush remaining + if currentBytes > 0 { + chunks = append(chunks, strings.Join(current, "\n\n")) + } + + return chunks +} + +// splitByBytes splits text into chunks at word boundaries, each at most maxBytes bytes. +func splitByBytes(text string, maxBytes int) []string { + words := strings.Fields(text) + var chunks []string + var current []string + currentBytes := 0 + + for _, w := range words { + wLen := len(w) + spaceLen := 0 + if currentBytes > 0 { + spaceLen = 1 + } + + if currentBytes+spaceLen+wLen > maxBytes && currentBytes > 0 { + chunks = append(chunks, strings.Join(current, " ")) + current = nil + currentBytes = 0 + spaceLen = 0 + } + + current = append(current, w) + currentBytes += spaceLen + wLen + } + + if len(current) > 0 { + chunks = append(chunks, strings.Join(current, " ")) + } + + return chunks +} diff --git a/pkg/searcher/indexer/chunker_test.go b/pkg/searcher/indexer/chunker_test.go new file mode 100644 index 00000000..398784c9 --- /dev/null +++ b/pkg/searcher/indexer/chunker_test.go @@ -0,0 +1,84 @@ +package indexer + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChunkText_Empty(t *testing.T) { + assert.Nil(t, ChunkText("", 500)) + assert.Nil(t, ChunkText(" ", 500)) +} + +func TestChunkText_SingleSmallParagraph(t *testing.T) { + chunks := ChunkText("Hello world", 500) + assert.Equal(t, []string{"Hello world"}, chunks) +} + +func TestChunkText_MultipleParagraphsCombined(t *testing.T) { + text := "First paragraph.\n\nSecond 
paragraph.\n\nThird paragraph." + chunks := ChunkText(text, 500) + assert.Len(t, chunks, 1) + assert.Contains(t, chunks[0], "First paragraph.") + assert.Contains(t, chunks[0], "Third paragraph.") +} + +func TestChunkText_SplitOnParagraphBoundary(t *testing.T) { + // Create two paragraphs each ~300 bytes + para := strings.Repeat("abcde ", 50) // 300 bytes + para = strings.TrimSpace(para) + text := para + "\n\n" + para + + chunks := ChunkText(text, 500) + assert.Len(t, chunks, 2) +} + +func TestChunkText_LargeParagraphSplit(t *testing.T) { + // Create a single paragraph of 1200 bytes (240 words * 5 bytes each) + words := make([]string, 240) + for i := range words { + words[i] = "word" // 4 bytes + 1 space = 5 per word + } + text := strings.Join(words, " ") // 240*4 + 239 = 1199 bytes + + chunks := ChunkText(text, 500) + assert.Len(t, chunks, 3) + assert.LessOrEqual(t, len(chunks[0]), 500) + assert.LessOrEqual(t, len(chunks[1]), 500) + assert.Greater(t, len(chunks[2]), 0) +} + +func TestChunkText_DefaultMaxBytes(t *testing.T) { + chunks := ChunkText("hello", 0) + assert.Equal(t, []string{"hello"}, chunks) +} + +func TestChunkText_EmptyParagraphsIgnored(t *testing.T) { + text := "First.\n\n\n\n\n\nSecond." 
+ chunks := ChunkText(text, 500) + assert.Len(t, chunks, 1) + assert.Equal(t, "First.\n\nSecond.", chunks[0]) +} + +func TestChunkText_ParagraphKeptWhole(t *testing.T) { + // A paragraph under the limit should not be split + para := strings.Repeat("x", 400) + chunks := ChunkText(para, 500) + assert.Len(t, chunks, 1) + assert.Equal(t, para, chunks[0]) +} + +func TestChunkText_JoinerAccountedInLimit(t *testing.T) { + // Two paragraphs that fit individually but exceed the limit when joined with "\n\n" + p1 := strings.Repeat("a", 250) + p2 := strings.Repeat("b", 250) + text := p1 + "\n\n" + p2 + + chunks := ChunkText(text, 500) + // 250 + 2 + 250 = 502 > 500, so they should be split + assert.Len(t, chunks, 2) + assert.Equal(t, p1, chunks[0]) + assert.Equal(t, p2, chunks[1]) +} diff --git a/pkg/searcher/indexer/meilisearch.go b/pkg/searcher/indexer/meilisearch.go new file mode 100644 index 00000000..22c34393 --- /dev/null +++ b/pkg/searcher/indexer/meilisearch.go @@ -0,0 +1,384 @@ +package indexer + +import ( + "context" + "encoding/json" + "fmt" + "slices" + "strings" + + "github.com/cloudreve/Cloudreve/v4/pkg/logging" + "github.com/cloudreve/Cloudreve/v4/pkg/searcher" + "github.com/cloudreve/Cloudreve/v4/pkg/setting" + "github.com/meilisearch/meilisearch-go" +) + +const ( + indexName = "cloudreve_files" + embedderName = "cr-text" +) + +// MeilisearchIndexer implements SearchIndexer using Meilisearch. +type MeilisearchIndexer struct { + client meilisearch.ServiceManager + l logging.Logger + pageSize int + chunkSize int + cfg *setting.FTSIndexMeilisearchSetting +} + +// NewMeilisearchIndexer creates a new MeilisearchIndexer. 
+func NewMeilisearchIndexer(msCfg *setting.FTSIndexMeilisearchSetting, chunkSize int, l logging.Logger) *MeilisearchIndexer { + client := meilisearch.New(msCfg.Endpoint, meilisearch.WithAPIKey(msCfg.APIKey)) + return &MeilisearchIndexer{ + client: client, + l: l, + pageSize: msCfg.PageSize, + chunkSize: chunkSize, + cfg: msCfg, + } +} + +var ( + requiredFilterable = []string{"owner_id", "file_id", "entity_id"} + requiredSearchable = []string{"text", "file_name"} + requiredDistinct = "file_id" +) + +func (m *MeilisearchIndexer) IndexReady(ctx context.Context) (bool, error) { + index := m.client.Index(indexName) + + settings, err := index.GetSettingsWithContext(ctx) + if err != nil { + // If the index doesn't exist, Meilisearch returns an error. + return false, nil + } + + // Check filterable attributes. + for _, attr := range requiredFilterable { + if !slices.Contains(settings.FilterableAttributes, attr) { + return false, nil + } + } + + // Check searchable attributes. + for _, attr := range requiredSearchable { + if !slices.Contains(settings.SearchableAttributes, attr) { + return false, nil + } + } + + // Check distinct attribute. + if settings.DistinctAttribute == nil || *settings.DistinctAttribute != requiredDistinct { + return false, nil + } + + // Check embedder if embedding is enabled. 
+ if m.cfg.EmbeddingEnbaled { + if settings.Embedders == nil { + return false, nil + } + if _, ok := settings.Embedders[embedderName]; !ok { + return false, nil + } + } + + return true, nil +} + +func (m *MeilisearchIndexer) EnsureIndex(ctx context.Context) error { + _, err := m.client.CreateIndexWithContext(ctx, &meilisearch.IndexConfig{ + Uid: indexName, + PrimaryKey: "id", + }) + if err != nil { + m.l.Debug("Create index returned (may already exist): %s", err) + } + + index := m.client.Index(indexName) + + filterableAttrs := []any{"owner_id", "file_id", "entity_id"} + if _, err := index.UpdateFilterableAttributesWithContext(ctx, &filterableAttrs); err != nil { + return fmt.Errorf("failed to set filterable attributes: %w", err) + } + + searchableAttrs := []string{"text", "file_name"} + if _, err := index.UpdateSearchableAttributesWithContext(ctx, &searchableAttrs); err != nil { + return fmt.Errorf("failed to set searchable attributes: %w", err) + } + + _, err = index.UpdateDistinctAttributeWithContext(ctx, "file_id") + if err != nil { + return fmt.Errorf("failed to set distinct attribute: %w", err) + } + + if m.cfg.EmbeddingEnbaled { + var embedder meilisearch.Embedder + if err := json.Unmarshal([]byte(m.cfg.EmbeddingSetting), &embedder); err != nil { + m.cfg.EmbeddingEnbaled = false + m.l.Warning("Failed to unmarshal embedding setting: %s, fallback to disable embedding", err) + return nil + } + + _, err := index.UpdateEmbeddersWithContext(ctx, map[string]meilisearch.Embedder{ + embedderName: embedder, + }) + if err != nil { + return fmt.Errorf("failed to set embedders: %w", err) + } + } else { + _, err := index.ResetEmbeddersWithContext(ctx) + if err != nil { + m.l.Warning("Failed to reset embedder: %w", err) + } + } + + return nil +} + +func (m *MeilisearchIndexer) IndexFile(ctx context.Context, ownerID, fileID, entityID int, fileName, text string) error { + chunks := ChunkText(text, m.chunkSize) + if len(chunks) == 0 { + return nil + } + + docs := 
make([]searcher.SearchDocument, 0, len(chunks)) + for i, chunk := range chunks { + docs = append(docs, searcher.SearchDocument{ + ID: fmt.Sprintf("%d_%d", fileID, i), + FileID: fileID, + OwnerID: ownerID, + EntityID: entityID, + ChunkIdx: i, + FileName: fileName, + Text: chunk, + }) + } + + index := m.client.Index(indexName) + pk := "id" + if _, err := index.AddDocumentsWithContext(ctx, docs, &meilisearch.DocumentOptions{PrimaryKey: &pk}); err != nil { + return fmt.Errorf("failed to add documents: %w", err) + } + + return nil +} + +func (m *MeilisearchIndexer) DeleteByFileIDs(ctx context.Context, fileID ...int) error { + if len(fileID) == 0 { + return nil + } + + index := m.client.Index(indexName) + strs := make([]string, len(fileID)) + for i, id := range fileID { + strs[i] = fmt.Sprintf("%d", id) + } + filter := fmt.Sprintf("file_id IN [%s]", strings.Join(strs, ", ")) + if _, err := index.DeleteDocumentsByFilterWithContext(ctx, filter, nil); err != nil { + return fmt.Errorf("failed to delete documents by file_ids: %w", err) + } + return nil +} + +func (m *MeilisearchIndexer) ChangeOwner(ctx context.Context, fileID, oldOwnerID, newOwnerID int) error { + index := m.client.Index(indexName) + filter := fmt.Sprintf("file_id = %d AND owner_id = %d", fileID, oldOwnerID) + + // Fetch all existing document chunks in batches. 
+	const batchSize int64 = 100
+	var allDocs []searcher.SearchDocument
+	for offset := int64(0); ; offset += batchSize {
+		var result meilisearch.DocumentsResult
+		if err := index.GetDocumentsWithContext(ctx, &meilisearch.DocumentsQuery{
+			Filter: filter,
+			Limit:  batchSize,
+			Offset: offset,
+		}, &result); err != nil {
+			return fmt.Errorf("failed to get documents: %w", err)
+		}
+
+		for _, hit := range result.Results {
+			var doc searcher.SearchDocument
+			if err := hit.DecodeInto(&doc); err != nil {
+				m.l.Warning("Failed to decode document during owner change: %s", err)
+				continue
+			}
+			allDocs = append(allDocs, doc)
+		}
+
+		if int64(len(result.Results)) < batchSize {
+			break
+		}
+	}
+
+	if len(allDocs) == 0 {
+		return nil
+	}
+
+	// Update owner_id in place — primary key is {fileID}_{chunkIdx} so it stays the same.
+	for i := range allDocs {
+		allDocs[i].OwnerID = newOwnerID
+	}
+
+	if _, err := index.UpdateDocumentsInBatchesWithContext(ctx, allDocs, 100, nil); err != nil {
+		return fmt.Errorf("failed to update documents with new owner: %w", err)
+	}
+
+	return nil
+}
+
+func (m *MeilisearchIndexer) CopyByFileID(ctx context.Context, srcFileID, dstFileID, dstOwnerID, dstEntityID int) error {
+	index := m.client.Index(indexName)
+	filter := fmt.Sprintf("file_id = %d", srcFileID)
+
+	const batchSize int64 = 100
+	var allDocs []searcher.SearchDocument
+	for offset := int64(0); ; offset += batchSize {
+		var result meilisearch.DocumentsResult
+		if err := index.GetDocumentsWithContext(ctx, &meilisearch.DocumentsQuery{
+			Filter: filter,
+			Limit:  batchSize,
+			Offset: offset,
+		}, &result); err != nil {
+			return fmt.Errorf("failed to get source documents: %w", err)
+		}
+
+		for _, hit := range result.Results {
+			var doc searcher.SearchDocument
+			if err := hit.DecodeInto(&doc); err != nil {
+				m.l.Warning("Failed to decode document during copy: %s", err)
+				continue
+			}
+			allDocs = append(allDocs, doc)
+		}
+
+		if int64(len(result.Results)) < batchSize {
+			break
+		}
+	}
+
+	if len(allDocs) == 0 {
+		return fmt.Errorf("no source documents found for file %d", srcFileID)
+	}
+
+	// Rewrite each chunk for the destination file. Chunks whose entity does not
+	// match the destination entity are excluded entirely: re-adding them with
+	// their original (source) document ID would silently re-index the source
+	// document instead of creating a copy.
+	dstDocs := make([]searcher.SearchDocument, 0, len(allDocs))
+	for _, doc := range allDocs {
+		if doc.EntityID != dstEntityID {
+			m.l.Warning("Entity id mismatch for file %d, original: %d, destination: %d", srcFileID, doc.EntityID, dstEntityID)
+			continue
+		}
+
+		doc.ID = fmt.Sprintf("%d_%d", dstFileID, doc.ChunkIdx)
+		doc.FileID = dstFileID
+		doc.OwnerID = dstOwnerID
+		doc.EntityID = dstEntityID
+		dstDocs = append(dstDocs, doc)
+	}
+
+	if len(dstDocs) == 0 {
+		return fmt.Errorf("no matching source documents to copy for file %d", srcFileID)
+	}
+
+	pk := "id"
+	if _, err := index.AddDocumentsWithContext(ctx, dstDocs, &meilisearch.DocumentOptions{PrimaryKey: &pk}); err != nil {
+		return fmt.Errorf("failed to add copied documents: %w", err)
+	}
+
+	return nil
+}
+
+func (m *MeilisearchIndexer) Rename(ctx context.Context, fileID, entityID int, newFileName string) error {
+	index := m.client.Index(indexName)
+	filter := fmt.Sprintf("file_id = %d AND entity_id = %d", fileID, entityID)
+
+	const batchSize int64 = 100
+	var allDocs []searcher.SearchDocument
+	for offset := int64(0); ; offset += batchSize {
+		var result meilisearch.DocumentsResult
+		if err := index.GetDocumentsWithContext(ctx, &meilisearch.DocumentsQuery{
+			Filter: filter,
+			Limit:  batchSize,
+			Offset: offset,
+		}, &result); err != nil {
+			return fmt.Errorf("failed to get documents for rename: %w", err)
+		}
+
+		for _, hit := range result.Results {
+			var doc searcher.SearchDocument
+			if err := hit.DecodeInto(&doc); err != nil {
+				m.l.Warning("Failed to decode document during rename: %s", err)
+				continue
+			}
+			doc.FileName = newFileName
+			allDocs = append(allDocs, doc)
+		}
+
+		if int64(len(result.Results)) < batchSize {
+			break
+		}
+	}
+
+	if len(allDocs) == 0 {
+		return nil
+	}
+
+	if _, err := index.UpdateDocumentsInBatchesWithContext(ctx, allDocs, 100, nil); err != nil {
+		return fmt.Errorf("failed to update documents with new file name: %w", err)
+	}
+
+	return nil
+}
+ +func (m *MeilisearchIndexer) Search(ctx context.Context, ownerID int, query string, offset int) ([]searcher.SearchResult, int64, error) { + index := m.client.Index(indexName) + + searchReq := &meilisearch.SearchRequest{ + Filter: fmt.Sprintf("owner_id = %d", ownerID), + Limit: int64(m.pageSize), + Offset: int64(offset), + AttributesToHighlight: []string{"text"}, + } + + if m.cfg.EmbeddingEnbaled { + searchReq.Hybrid = &meilisearch.SearchRequestHybrid{ + Embedder: embedderName, + } + } + + resp, err := index.SearchWithContext(ctx, query, searchReq) + if err != nil { + return nil, 0, fmt.Errorf("search failed: %w", err) + } + + results := make([]searcher.SearchResult, 0, len(resp.Hits)) + seen := make(map[int]struct{}) + for _, hit := range resp.Hits { + var doc searcher.SearchDocument + if err := hit.DecodeInto(&doc); err != nil { + continue + } + + if _, exists := seen[doc.FileID]; exists { + continue + } + seen[doc.FileID] = struct{}{} + + // Extract text from raw JSON for display + textStr := doc.Text + if doc.Formated != nil { + textStr = doc.Formated.Text + } + + results = append(results, searcher.SearchResult{ + FileID: doc.FileID, + OwnerID: doc.OwnerID, + FileName: doc.FileName, + Text: textStr, + }) + } + + return results, resp.EstimatedTotalHits, nil +} + +func (m *MeilisearchIndexer) Close() error { + return nil +} diff --git a/pkg/searcher/indexer/noop.go b/pkg/searcher/indexer/noop.go new file mode 100644 index 00000000..3a4b608b --- /dev/null +++ b/pkg/searcher/indexer/noop.go @@ -0,0 +1,46 @@ +package indexer + +import ( + "context" + + "github.com/cloudreve/Cloudreve/v4/pkg/searcher" +) + +// NoopIndexer is a no-op implementation of SearchIndexer, used when FTS is disabled. 
+type NoopIndexer struct{} + +func (n *NoopIndexer) IndexFile(ctx context.Context, ownerID, fileID, entityID int, fileName, text string) error { + return nil +} + +func (n *NoopIndexer) DeleteByFileIDs(ctx context.Context, fileID ...int) error { + return nil +} + +func (n *NoopIndexer) ChangeOwner(ctx context.Context, fileID, oldOwnerID, newOwnerID int) error { + return nil +} + +func (n *NoopIndexer) CopyByFileID(ctx context.Context, srcFileID, dstFileID, dstOwnerID, dstEntityID int) error { + return nil +} + +func (n *NoopIndexer) Rename(ctx context.Context, fileID, entityID int, newFileName string) error { + return nil +} + +func (n *NoopIndexer) Search(ctx context.Context, ownerID int, query string, offset int) ([]searcher.SearchResult, int64, error) { + return nil, 0, nil +} + +func (n *NoopIndexer) IndexReady(ctx context.Context) (bool, error) { + return true, nil +} + +func (n *NoopIndexer) EnsureIndex(ctx context.Context) error { + return nil +} + +func (n *NoopIndexer) Close() error { + return nil +} diff --git a/pkg/setting/provider.go b/pkg/setting/provider.go index 21722af8..5bc37775 100644 --- a/pkg/setting/provider.go +++ b/pkg/setting/provider.go @@ -222,6 +222,18 @@ type ( EventHubEnabled(ctx context.Context) bool // EventHubDebounceDelay returns the debounce delay of event hub. EventHubDebounceDelay(ctx context.Context) time.Duration + // FTSEnabled returns true if full-text search is enabled. + FTSEnabled(ctx context.Context) bool + // FTSIndexType returns the full-text search index type. + FTSIndexType(ctx context.Context) FTSIndexType + // FTSExtractorType returns the full-text search extractor type. + FTSExtractorType(ctx context.Context) FTSExtractorType + // FTSIndexMeilisearch returns Meilisearch index settings. + FTSIndexMeilisearch(ctx context.Context) *FTSIndexMeilisearchSetting + // FTSTikaExtractor returns Tika extractor settings. 
+ FTSTikaExtractor(ctx context.Context) *FTSTikaExtractorSetting + // FTSChunkSize returns the maximum chunk size in bytes for full-text search indexing. + FTSChunkSize(ctx context.Context) int } UseFirstSiteUrlCtxKey = struct{} ) @@ -600,6 +612,41 @@ func (s *settingProvider) EventHubEnabled(ctx context.Context) bool { return s.getBoolean(ctx, "fs_event_push_enabled", true) } +func (s *settingProvider) FTSEnabled(ctx context.Context) bool { + return s.getBoolean(ctx, "fts_enabled", false) +} + +func (s *settingProvider) FTSIndexType(ctx context.Context) FTSIndexType { + return FTSIndexType(s.getString(ctx, "fts_index_type", "")) +} + +func (s *settingProvider) FTSExtractorType(ctx context.Context) FTSExtractorType { + return FTSExtractorType(s.getString(ctx, "fts_extractor_type", "")) +} + +func (s *settingProvider) FTSIndexMeilisearch(ctx context.Context) *FTSIndexMeilisearchSetting { + return &FTSIndexMeilisearchSetting{ + Endpoint: s.getString(ctx, "fts_meilisearch_endpoint", ""), + APIKey: s.getString(ctx, "fts_meilisearch_api_key", ""), + PageSize: s.getInt(ctx, "fts_meilisearch_page_size", 5), + EmbeddingEnbaled: s.getBoolean(ctx, "fts_meilisearch_embed_enabled", false), + EmbeddingSetting: s.getString(ctx, "fts_meilisearch_embed_config", "{}"), + } +} + +func (s *settingProvider) FTSTikaExtractor(ctx context.Context) *FTSTikaExtractorSetting { + return &FTSTikaExtractorSetting{ + Endpoint: s.getString(ctx, "fts_tika_endpoint", ""), + MaxResponseSize: s.getInt64(ctx, "fts_tika_max_response_size", 10485760), + Exts: s.getStringList(ctx, "fts_tika_exts", []string{}), + MaxFileSize: s.getInt64(ctx, "fts_tika_max_file_size_remote", 52428800), + } +} + +func (s *settingProvider) FTSChunkSize(ctx context.Context) int { + return s.getInt(ctx, "fts_chunk_size", 2000) +} + func (s *settingProvider) Queue(ctx context.Context, queueType QueueType) *QueueSetting { queueTypeStr := string(queueType) return &QueueSetting{ diff --git a/pkg/setting/types.go 
b/pkg/setting/types.go index 77d83458..db75bc79 100644 --- a/pkg/setting/types.go +++ b/pkg/setting/types.go @@ -225,6 +225,35 @@ type CustomHTML struct { SidebarBottom string `json:"sidebar_bottom,omitempty"` } +type FTSIndexType string + +const ( + FTSIndexTypeNone = FTSIndexType("") + FTSIndexTypeMeilisearch = FTSIndexType("meilisearch") +) + +type FTSExtractorType string + +const ( + FTSExtractorTypeNone = FTSExtractorType("") + FTSExtractorTypeTika = FTSExtractorType("tika") +) + +type FTSIndexMeilisearchSetting struct { + Endpoint string + APIKey string + PageSize int + EmbeddingEnbaled bool + EmbeddingSetting string +} + +type FTSTikaExtractorSetting struct { + Endpoint string + MaxResponseSize int64 + Exts []string + MaxFileSize int64 +} + type MasterEncryptKeyVaultType string const ( diff --git a/routers/controllers/file.go b/routers/controllers/file.go index 317d23c9..cc20b102 100644 --- a/routers/controllers/file.go +++ b/routers/controllers/file.go @@ -436,3 +436,17 @@ func HandleExplorerEventsPush(c *gin.Context) { return } } + +func FulltextSearch(c *gin.Context) { + service := ParametersFromContext[*explorer.FulltextSearchService](c, explorer.FulltextSearchParamCtx{}) + resp, err := service.Search(c) + if err != nil { + c.JSON(200, serializer.Err(c, err)) + c.Abort() + return + } + + c.JSON(200, serializer.Response{ + Data: resp, + }) +} diff --git a/routers/router.go b/routers/router.go index 45bdec91..1164b8ab 100644 --- a/routers/router.go +++ b/routers/router.go @@ -785,6 +785,16 @@ func initMasterRouter(dep dependency.Dep) *gin.Engine { controllers.FromQuery[explorer.ExplorerEventService](explorer.ExplorerEventParamCtx{}), controllers.HandleExplorerEventsPush, ) + + // Full text search + file.GET("search", + middleware.LoginRequired(), + middleware.IsFunctionEnabled(func(c *gin.Context) bool { + return dep.SettingProvider().FTSEnabled(c) + }), + controllers.FromQuery[explorer.FulltextSearchService](explorer.FulltextSearchParamCtx{}), + 
controllers.FulltextSearch, + ) } // 分享相关 diff --git a/service/basic/site.go b/service/basic/site.go index 6fb7276a..2e222524 100644 --- a/service/basic/site.go +++ b/service/basic/site.go @@ -54,6 +54,7 @@ type SiteConfig struct { ThumbnailHeight int `json:"thumbnail_height,omitempty"` CustomProps []types.CustomProps `json:"custom_props,omitempty"` ShowEncryptionStatus bool `json:"show_encryption_status,omitempty"` + FullTextSearch bool `json:"full_text_search,omitempty"` // Thumbnail section ThumbExts []string `json:"thumb_exts,omitempty"` @@ -120,6 +121,7 @@ func (s *GetSettingService) GetSiteConfig(c *gin.Context) (*SiteConfig, error) { ThumbnailHeight: h, CustomProps: customProps, ShowEncryptionStatus: showEncryptionStatus, + FullTextSearch: settings.FTSEnabled(c), }, nil case "emojis": emojis := settings.EmojiPresets(c) diff --git a/service/explorer/file.go b/service/explorer/file.go index 05120b84..98dbe63b 100644 --- a/service/explorer/file.go +++ b/service/explorer/file.go @@ -747,3 +747,25 @@ func (s *ArchiveListFilesService) List(c *gin.Context) (*ArchiveListFilesRespons return BuildArchiveListFilesResponse(files), nil } + +type ( + FulltextSearchParamCtx struct{} + FulltextSearchService struct { + Query string `form:"query" binding:"required"` + Offset int `form:"offset"` + } +) + +func (s *FulltextSearchService) Search(c *gin.Context) (*FullTextSearchResults, error) { + dep := dependency.FromContext(c) + user := inventory.UserFromContext(c) + m := manager.NewFileManager(dep, user) + defer m.Recycle() + + results, err := m.SearchFullText(c, s.Query, s.Offset) + if err != nil { + return nil, serializer.NewError(serializer.CodeInternalSetting, "failed to search full text", err) + } + + return BuildFullTextSearchResults(c, user, dep.HashIDEncoder(), results), nil +} diff --git a/service/explorer/response.go b/service/explorer/response.go index d3887924..0afa97ea 100644 --- a/service/explorer/response.go +++ b/service/explorer/response.go @@ -26,6 +26,28 @@ 
import ( "github.com/samber/lo" ) +type FullTextSearchResults struct { + Hits []FullTextSearchResult `json:"hits"` + Total int64 `json:"total"` +} + +func BuildFullTextSearchResults(ctx context.Context, user *ent.User, hasher hashid.Encoder, results *manager.FullTextSearchResults) *FullTextSearchResults { + return &FullTextSearchResults{ + Hits: lo.Map(results.Hits, func(result manager.FullTextSearchResult, index int) FullTextSearchResult { + return FullTextSearchResult{ + File: *BuildFileResponse(ctx, user, result.File, hasher, nil), + Content: result.Content, + } + }), + Total: results.Total, + } +} + +type FullTextSearchResult struct { + File FileResponse `json:"file"` + Content string `json:"content"` +} + type ArchiveListFilesResponse struct { Files []manager.ArchivedFile `json:"files"` }