mirror of
https://github.com/cloudreve/cloudreve.git
synced 2026-03-02 22:47:01 +00:00
feat: full-text search and RAG powered search
This commit is contained in:
@@ -3,7 +3,7 @@ package constants
|
||||
// These values will be injected at build time, DO NOT EDIT.
|
||||
|
||||
// BackendVersion 当前后端版本号
|
||||
var BackendVersion = "4.13.0"
|
||||
var BackendVersion = "4.14.0"
|
||||
|
||||
// IsPro 是否为Pro版本
|
||||
var IsPro = "false"
|
||||
|
||||
@@ -26,6 +26,9 @@ import (
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/mediameta"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/request"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/searcher/extractor"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/searcher/indexer"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/thumb"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/util"
|
||||
@@ -139,6 +142,10 @@ type Dep interface {
|
||||
EncryptorFactory(ctx context.Context) encrypt.CryptorFactory
|
||||
// EventHub Get a singleton eventhub.EventHub instance for event publishing.
|
||||
EventHub() eventhub.EventHub
|
||||
// SearchIndexer Get a singleton searcher.SearchIndexer instance for full-text search indexing.
|
||||
SearchIndexer(ctx context.Context) searcher.SearchIndexer
|
||||
// TextExtractor Get a singleton searcher.TextExtractor instance for text extraction.
|
||||
TextExtractor(ctx context.Context) searcher.TextExtractor
|
||||
}
|
||||
|
||||
type dependency struct {
|
||||
@@ -187,6 +194,8 @@ type dependency struct {
|
||||
cron *cron.Cron
|
||||
masterEncryptKeyVault encrypt.MasterEncryptKeyVault
|
||||
eventHub eventhub.EventHub
|
||||
searchIndexer searcher.SearchIndexer
|
||||
textExtractor searcher.TextExtractor
|
||||
|
||||
configPath string
|
||||
isPro bool
|
||||
@@ -380,6 +389,63 @@ func (d *dependency) EventHub() eventhub.EventHub {
|
||||
return d.eventHub
|
||||
}
|
||||
|
||||
func (d *dependency) SearchIndexer(ctx context.Context) searcher.SearchIndexer {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
||||
if d.searchIndexer != nil && !reload {
|
||||
return d.searchIndexer
|
||||
}
|
||||
|
||||
sp := d.SettingProvider()
|
||||
if !sp.FTSEnabled(ctx) || sp.FTSIndexType(ctx) != setting.FTSIndexTypeMeilisearch {
|
||||
d.searchIndexer = &indexer.NoopIndexer{}
|
||||
return d.searchIndexer
|
||||
}
|
||||
|
||||
msCfg := sp.FTSIndexMeilisearch(ctx)
|
||||
if msCfg.Endpoint == "" {
|
||||
d.searchIndexer = &indexer.NoopIndexer{}
|
||||
return d.searchIndexer
|
||||
}
|
||||
|
||||
idx := indexer.NewMeilisearchIndexer(msCfg, sp.FTSChunkSize(ctx), d.Logger())
|
||||
if err := idx.EnsureIndex(ctx); err != nil {
|
||||
d.Logger().Warning("Failed to ensure Meilisearch index: %s, falling back to noop", err)
|
||||
d.searchIndexer = &indexer.NoopIndexer{}
|
||||
return d.searchIndexer
|
||||
}
|
||||
|
||||
d.searchIndexer = idx
|
||||
return d.searchIndexer
|
||||
}
|
||||
|
||||
func (d *dependency) TextExtractor(ctx context.Context) searcher.TextExtractor {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
_, reload := ctx.Value(ReloadCtx{}).(bool)
|
||||
if d.textExtractor != nil && !reload {
|
||||
return d.textExtractor
|
||||
}
|
||||
|
||||
sp := d.SettingProvider()
|
||||
if sp.FTSExtractorType(ctx) != setting.FTSExtractorTypeTika {
|
||||
d.textExtractor = &extractor.NoopExtractor{}
|
||||
return d.textExtractor
|
||||
}
|
||||
|
||||
tikaCfg := sp.FTSTikaExtractor(ctx)
|
||||
if tikaCfg.Endpoint == "" {
|
||||
d.textExtractor = &extractor.NoopExtractor{}
|
||||
return d.textExtractor
|
||||
}
|
||||
|
||||
d.textExtractor = extractor.NewTikaExtractor(d.RequestClient(), d.SettingProvider(), d.Logger(), tikaCfg)
|
||||
return d.textExtractor
|
||||
}
|
||||
|
||||
func (d *dependency) FsEventClient() inventory.FsEventClient {
|
||||
if d.fsEventClient != nil {
|
||||
return d.fsEventClient
|
||||
@@ -578,7 +644,13 @@ func (d *dependency) MediaMetaQueue(ctx context.Context) queue.Queue {
|
||||
queue.WithWorkerCount(queueSetting.WorkerNum),
|
||||
queue.WithName("MediaMetadataQueue"),
|
||||
queue.WithMaxTaskExecution(queueSetting.MaxExecution),
|
||||
queue.WithResumeTaskType(queue.MediaMetaTaskType),
|
||||
queue.WithResumeTaskType(
|
||||
queue.MediaMetaTaskType,
|
||||
queue.FullTextIndexTaskType,
|
||||
queue.FullTextDeleteTaskType,
|
||||
queue.FullTextCopyTaskType,
|
||||
queue.FullTextChangeOwnerTaskType,
|
||||
),
|
||||
)
|
||||
return d.mediaMetaQueue
|
||||
}
|
||||
@@ -900,6 +972,12 @@ func (d *dependency) Shutdown(ctx context.Context) error {
|
||||
}()
|
||||
}
|
||||
|
||||
if d.searchIndexer != nil {
|
||||
if err := d.searchIndexer.Close(); err != nil {
|
||||
d.Logger().Warning("Failed to close search indexer: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
d.mu.Unlock()
|
||||
wg.Wait()
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/email"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
||||
"github.com/gin-contrib/static"
|
||||
)
|
||||
@@ -158,3 +159,17 @@ func WithShareClient(s inventory.ShareClient) Option {
|
||||
o.shareClient = s
|
||||
})
|
||||
}
|
||||
|
||||
// WithSearchIndexer Set the default search indexer
|
||||
func WithSearchIndexer(s searcher.SearchIndexer) Option {
|
||||
return optionFunc(func(o *dependency) {
|
||||
o.searchIndexer = s
|
||||
})
|
||||
}
|
||||
|
||||
// WithTextExtractor Set the default text extractor
|
||||
func WithTextExtractor(s searcher.TextExtractor) Option {
|
||||
return optionFunc(func(o *dependency) {
|
||||
o.textExtractor = s
|
||||
})
|
||||
}
|
||||
|
||||
2
assets
2
assets
Submodule assets updated: 21a9819433...4a38a946cb
3
go.mod
3
go.mod
@@ -29,7 +29,7 @@ require (
|
||||
github.com/go-sql-driver/mysql v1.6.0
|
||||
github.com/go-webauthn/webauthn v0.11.2
|
||||
github.com/gofrs/uuid v4.0.0+incompatible
|
||||
github.com/golang-jwt/jwt/v5 v5.2.2
|
||||
github.com/golang-jwt/jwt/v5 v5.3.0
|
||||
github.com/gomodule/redigo v1.9.2
|
||||
github.com/google/go-querystring v1.1.0
|
||||
github.com/google/uuid v1.6.0
|
||||
@@ -42,6 +42,7 @@ require (
|
||||
github.com/juju/ratelimit v1.0.1
|
||||
github.com/ks3sdklib/aws-sdk-go v1.6.2
|
||||
github.com/lib/pq v1.10.9
|
||||
github.com/meilisearch/meilisearch-go v0.36.0
|
||||
github.com/mholt/archives v0.1.3
|
||||
github.com/mojocn/base64Captcha v0.0.0-20190801020520-752b1cd608b2
|
||||
github.com/pquerna/otp v1.2.0
|
||||
|
||||
6
go.sum
6
go.sum
@@ -379,8 +379,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
|
||||
github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
|
||||
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
|
||||
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
|
||||
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
|
||||
github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
|
||||
github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
|
||||
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI=
|
||||
@@ -689,6 +689,8 @@ github.com/mattn/go-sqlite3 v1.14.16 h1:yOQRA0RpS5PFz/oikGwBEqvAWhWg5ufRz4ETLjwp
|
||||
github.com/mattn/go-sqlite3 v1.14.16/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
|
||||
github.com/mattn/go-zglob v0.0.1/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo=
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
|
||||
github.com/meilisearch/meilisearch-go v0.36.0 h1:N1etykTektXt5KPcSbhBO0d5Xx5NaKj4pJWEM7WA5dI=
|
||||
github.com/meilisearch/meilisearch-go v0.36.0/go.mod h1:HBfHzKMxcSbTOvqdfuRA/yf6Vk9IivcwKocWRuW7W78=
|
||||
github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
|
||||
github.com/mholt/archives v0.1.3 h1:aEAaOtNra78G+TvV5ohmXrJOAzf++dIlYeDW3N9q458=
|
||||
github.com/mholt/archives v0.1.3/go.mod h1:LUCGp++/IbV/I0Xq4SzcIR6uwgeh2yjnQWamjRQfLTU=
|
||||
|
||||
@@ -3,11 +3,12 @@ package inventory
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"entgo.io/ent/dialect/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"entgo.io/ent/dialect/sql"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/conf"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
|
||||
@@ -139,6 +139,12 @@ type (
|
||||
ParentFiles []int
|
||||
PrimaryEntityParentFiles []int
|
||||
}
|
||||
|
||||
CopyParameter struct {
|
||||
Files []*ent.File
|
||||
DstMap map[int][]*ent.File
|
||||
ExcludedMetadataKeys []string
|
||||
}
|
||||
)
|
||||
|
||||
type FileClient interface {
|
||||
@@ -187,7 +193,7 @@ type FileClient interface {
|
||||
// UpsertMetadata update or insert metadata
|
||||
UpsertMetadata(ctx context.Context, file *ent.File, data map[string]string, privateMask map[string]bool) error
|
||||
// Copy copies a layer of file to its corresponding destination folder. dstMap is a map from src parent ID to dst parent Files.
|
||||
Copy(ctx context.Context, files []*ent.File, dstMap map[int][]*ent.File) (map[int][]*ent.File, StorageDiff, error)
|
||||
Copy(ctx context.Context, args *CopyParameter) (map[int][]*ent.File, StorageDiff, error)
|
||||
// Delete deletes a group of files (and related models) with given entity recycle option
|
||||
Delete(ctx context.Context, files []*ent.File, options *types.EntityProps) ([]*ent.Entity, StorageDiff, error)
|
||||
// StaleEntities returns stale entities of a given file. If ID is not provided, all entities
|
||||
@@ -562,7 +568,9 @@ func (f *fileClient) Delete(ctx context.Context, files []*ent.File, options *typ
|
||||
return toBeRecycled, storageReduced, nil
|
||||
}
|
||||
|
||||
func (f *fileClient) Copy(ctx context.Context, files []*ent.File, dstMap map[int][]*ent.File) (map[int][]*ent.File, StorageDiff, error) {
|
||||
func (f *fileClient) Copy(ctx context.Context, args *CopyParameter) (map[int][]*ent.File, StorageDiff, error) {
|
||||
files := args.Files
|
||||
dstMap := args.DstMap
|
||||
pageSize := capPageSize(f.maxSQlParam, intsets.MaxInt, 10)
|
||||
// 1. Copy files and metadata
|
||||
copyFileStm := lo.Map(files, func(file *ent.File, index int) *ent.FileCreate {
|
||||
@@ -603,12 +611,15 @@ func (f *fileClient) Copy(ctx context.Context, files []*ent.File, dstMap map[int
|
||||
return nil, nil, fmt.Errorf("failed to get metadata of file: %w", err)
|
||||
}
|
||||
|
||||
metadataStm = append(metadataStm, lo.Map(fileMetadata, func(metadata *ent.Metadata, index int) *ent.MetadataCreate {
|
||||
metadataStm = append(metadataStm, lo.FilterMap(fileMetadata, func(metadata *ent.Metadata, index int) (*ent.MetadataCreate, bool) {
|
||||
if lo.Contains(args.ExcludedMetadataKeys, metadata.Name) {
|
||||
return nil, false
|
||||
}
|
||||
return f.client.Metadata.Create().
|
||||
SetName(metadata.Name).
|
||||
SetValue(metadata.Value).
|
||||
SetFile(newFile).
|
||||
SetIsPublic(metadata.IsPublic)
|
||||
SetIsPublic(metadata.IsPublic), true
|
||||
})...)
|
||||
|
||||
fileEntities, err := files[index].Edges.EntitiesOrErr()
|
||||
|
||||
@@ -672,6 +672,19 @@ var DefaultSettings = map[string]string{
|
||||
"fs_event_push_enabled": "1",
|
||||
"fs_event_push_max_age": "1209600",
|
||||
"fs_event_push_debounce": "5",
|
||||
"fts_enabled": "0",
|
||||
"fts_index_type": "",
|
||||
"fts_extractor_type": "",
|
||||
"fts_meilisearch_endpoint": "",
|
||||
"fts_meilisearch_api_key": "",
|
||||
"fts_meilisearch_page_size": "5",
|
||||
"fts_meilisearch_embed_enabled": "0",
|
||||
"fts_meilisearch_embed_config": "{}",
|
||||
"fts_tika_endpoint": "",
|
||||
"fts_tika_max_response_size": "10485760",
|
||||
"fts_tika_exts": "pdf,doc,docx,xls,xlsx,ppt,pptx,odt,ods,odp,rtf,txt,md,html,htm,epub,csv",
|
||||
"fts_tika_max_file_size": "26214400",
|
||||
"fts_chunk_size": "2000",
|
||||
}
|
||||
|
||||
var RedactedSettings = map[string]struct{}{
|
||||
|
||||
@@ -66,6 +66,8 @@ const (
|
||||
ThumbMetadataPrefix = "thumb:"
|
||||
ThumbDisabledKey = ThumbMetadataPrefix + "disabled"
|
||||
|
||||
FullTextIndexKey = MetadataSysPrefix + "fulltext_index"
|
||||
|
||||
pathIndexRoot = 0
|
||||
pathIndexUser = 1
|
||||
)
|
||||
|
||||
@@ -147,47 +147,48 @@ func (f *DBFS) Create(ctx context.Context, path *fs.URI, fileType types.FileType
|
||||
return ancestor, nil
|
||||
}
|
||||
|
||||
func (f *DBFS) Rename(ctx context.Context, path *fs.URI, newName string) (fs.File, error) {
|
||||
func (f *DBFS) Rename(ctx context.Context, path *fs.URI, newName string) (fs.File, *fs.IndexDiff, error) {
|
||||
// Get navigator
|
||||
navigator, err := f.getNavigator(ctx, path, NavigatorCapabilityRenameFile, NavigatorCapabilityLockFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Get target file
|
||||
ctx = context.WithValue(ctx, inventory.LoadFileMetadata{}, true)
|
||||
target, err := f.getFileByPath(ctx, navigator, path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get target file: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to get target file: %w", err)
|
||||
}
|
||||
oldName := target.Name()
|
||||
|
||||
if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && target.Owner().ID != f.user.ID {
|
||||
return nil, fs.ErrOwnerOnly
|
||||
return nil, nil, fs.ErrOwnerOnly
|
||||
}
|
||||
|
||||
// Root folder cannot be modified
|
||||
if target.IsRootFolder() {
|
||||
return nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("cannot modify root folder"))
|
||||
return nil, nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("cannot modify root folder"))
|
||||
}
|
||||
|
||||
// Validate new name
|
||||
if err := validateFileName(newName); err != nil {
|
||||
return nil, fs.ErrIllegalObjectName.WithError(err)
|
||||
return nil, nil, fs.ErrIllegalObjectName.WithError(err)
|
||||
}
|
||||
|
||||
// If target is a file, validate file extension
|
||||
policy, err := f.getPreferredPolicy(ctx, target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if target.Type() == types.FileTypeFile {
|
||||
if err := validateExtension(newName, policy); err != nil {
|
||||
return nil, fs.ErrIllegalObjectName.WithError(err)
|
||||
return nil, nil, fs.ErrIllegalObjectName.WithError(err)
|
||||
}
|
||||
|
||||
if err := validateFileNameRegexp(newName, policy); err != nil {
|
||||
return nil, fs.ErrIllegalObjectName.WithError(err)
|
||||
return nil, nil, fs.ErrIllegalObjectName.WithError(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -196,39 +197,54 @@ func (f *DBFS) Rename(ctx context.Context, path *fs.URI, newName string) (fs.Fil
|
||||
&LockByPath{target.Uri(true), target, target.Type(), ""})
|
||||
defer func() { _ = f.Release(ctx, ls) }()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Rename target
|
||||
fc, tx, ctx, err := inventory.WithTx(ctx, f.fileClient)
|
||||
if err != nil {
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err)
|
||||
}
|
||||
|
||||
updated, err := fc.Rename(ctx, target.Model, newName)
|
||||
if err != nil {
|
||||
_ = inventory.Rollback(tx)
|
||||
if ent.IsConstraintError(err) {
|
||||
return nil, fs.ErrFileExisted.WithError(err)
|
||||
return nil, nil, fs.ErrFileExisted.WithError(err)
|
||||
}
|
||||
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "failed to update file", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "failed to update file", err)
|
||||
}
|
||||
|
||||
if target.Type() == types.FileTypeFile && !strings.EqualFold(filepath.Ext(newName), filepath.Ext(oldName)) {
|
||||
if err := fc.RemoveMetadata(ctx, target.Model, ThumbDisabledKey); err != nil {
|
||||
_ = inventory.Rollback(tx)
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "failed to remove disabled thumbnail mark", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "failed to remove disabled thumbnail mark", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := inventory.Commit(tx); err != nil {
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to commit rename change", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to commit rename change", err)
|
||||
}
|
||||
|
||||
f.emitFileRenamed(ctx, target, newName)
|
||||
|
||||
return target.Replace(updated), nil
|
||||
originalMetadata := target.Metadata()
|
||||
newFile := target.Replace(updated)
|
||||
var diff *fs.IndexDiff
|
||||
if _, ok := originalMetadata[FullTextIndexKey]; ok {
|
||||
diff = &fs.IndexDiff{
|
||||
IndexToRename: []fs.IndexDiffRenameDetails{
|
||||
{
|
||||
Uri: *newFile.Uri(false),
|
||||
FileID: newFile.ID(),
|
||||
EntityID: newFile.PrimaryEntityID(),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return newFile, diff, nil
|
||||
}
|
||||
|
||||
func (f *DBFS) SoftDelete(ctx context.Context, path ...*fs.URI) error {
|
||||
@@ -311,7 +327,7 @@ func (f *DBFS) SoftDelete(ctx context.Context, path ...*fs.URI) error {
|
||||
return ae.Aggregate()
|
||||
}
|
||||
|
||||
func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([]fs.Entity, error) {
|
||||
func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([]fs.Entity, *fs.IndexDiff, error) {
|
||||
o := newDbfsOption()
|
||||
for _, opt := range opts {
|
||||
o.apply(opt)
|
||||
@@ -362,7 +378,7 @@ func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([
|
||||
|
||||
targets := lo.Flatten(lo.Values(fileNavGroup))
|
||||
if len(targets) == 0 {
|
||||
return nil, ae.Aggregate()
|
||||
return nil, nil, ae.Aggregate()
|
||||
}
|
||||
// Lock all targets
|
||||
lockTargets := lo.Map(targets, func(value *File, key int) *LockByPath {
|
||||
@@ -371,50 +387,55 @@ func (f *DBFS) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option) ([
|
||||
ls, err := f.acquireByPath(ctx, -1, f.user, false, fs.LockApp(fs.ApplicationDelete), lockTargets...)
|
||||
defer func() { _ = f.Release(ctx, ls) }()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
fc, tx, ctx, err := inventory.WithTx(ctx, f.fileClient)
|
||||
if err != nil {
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err)
|
||||
}
|
||||
|
||||
// Delete targets
|
||||
newStaleEntities, storageDiff, err := f.deleteFiles(ctx, fileNavGroup, fc, opt)
|
||||
newStaleEntities, storageDiff, indexToDelete, err := f.deleteFiles(ctx, fileNavGroup, fc, opt)
|
||||
if err != nil {
|
||||
_ = inventory.Rollback(tx)
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "failed to delete files", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "failed to delete files", err)
|
||||
}
|
||||
|
||||
tx.AppendStorageDiff(storageDiff)
|
||||
if err := inventory.CommitWithStorageDiff(ctx, tx, f.l, f.userClient); err != nil {
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to commit delete change", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to commit delete change", err)
|
||||
}
|
||||
f.emitFileDeleted(ctx, targets...)
|
||||
return newStaleEntities, ae.Aggregate()
|
||||
return newStaleEntities, &fs.IndexDiff{
|
||||
IndexToDelete: indexToDelete,
|
||||
}, ae.Aggregate()
|
||||
}
|
||||
|
||||
func (f *DBFS) VersionControl(ctx context.Context, path *fs.URI, versionId int, delete bool) error {
|
||||
func (f *DBFS) VersionControl(ctx context.Context, path *fs.URI, versionId int, delete bool) (*fs.IndexDiff, error) {
|
||||
// Get navigator
|
||||
navigator, err := f.getNavigator(ctx, path, NavigatorCapabilityVersionControl)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get target file
|
||||
ctx = context.WithValue(ctx, inventory.LoadFileEntity{}, true)
|
||||
if !delete {
|
||||
ctx = context.WithValue(ctx, inventory.LoadFileMetadata{}, true)
|
||||
}
|
||||
target, err := f.getFileByPath(ctx, navigator, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get target file: %w", err)
|
||||
return nil, fmt.Errorf("failed to get target file: %w", err)
|
||||
}
|
||||
|
||||
if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && target.Owner().ID != f.user.ID {
|
||||
return fs.ErrOwnerOnly
|
||||
return nil, fs.ErrOwnerOnly
|
||||
}
|
||||
|
||||
// Target must be a file
|
||||
if target.Type() != types.FileTypeFile {
|
||||
return fs.ErrNotSupportedAction.WithError(fmt.Errorf("target must be a valid file"))
|
||||
return nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("target must be a valid file"))
|
||||
}
|
||||
|
||||
// Lock file
|
||||
@@ -422,21 +443,38 @@ func (f *DBFS) VersionControl(ctx context.Context, path *fs.URI, versionId int,
|
||||
&LockByPath{target.Uri(true), target, target.Type(), ""})
|
||||
defer func() { _ = f.Release(ctx, ls) }()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if delete {
|
||||
storageDiff, err := f.deleteEntity(ctx, target, versionId)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := f.userClient.ApplyStorageDiff(ctx, storageDiff); err != nil {
|
||||
f.l.Error("Failed to apply storage diff after deleting version: %s", err)
|
||||
}
|
||||
return nil
|
||||
return nil, nil
|
||||
} else {
|
||||
return f.setCurrentVersion(ctx, target, versionId)
|
||||
if err := f.setCurrentVersion(ctx, target, versionId); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, ok := target.Metadata()[FullTextIndexKey]; ok {
|
||||
return &fs.IndexDiff{
|
||||
IndexToUpdate: []fs.IndexDiffUpdateDetails{
|
||||
{
|
||||
Uri: *target.Uri(false),
|
||||
FileID: target.ID(),
|
||||
OwnerID: target.Owner().ID,
|
||||
EntityID: versionId,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -484,7 +522,7 @@ func (f *DBFS) Restore(ctx context.Context, path ...*fs.URI) error {
|
||||
|
||||
// Copy each file to its original location
|
||||
for _, uris := range allTrashUriStr {
|
||||
if err := f.MoveOrCopy(ctx, []*fs.URI{uris[0]}, uris[1], false); err != nil {
|
||||
if _, err := f.MoveOrCopy(ctx, []*fs.URI{uris[0]}, uris[1], false); err != nil {
|
||||
if !ae.Merge(err) {
|
||||
ae.Add(uris[0].String(), err)
|
||||
}
|
||||
@@ -495,26 +533,26 @@ func (f *DBFS) Restore(ctx context.Context, path ...*fs.URI) error {
|
||||
|
||||
}
|
||||
|
||||
func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCopy bool) error {
|
||||
func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCopy bool) (*fs.IndexDiff, error) {
|
||||
targets := make([]*File, 0, len(path))
|
||||
dstNavigator, err := f.getNavigator(ctx, dst, NavigatorCapabilityLockFile)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get destination file
|
||||
destination, err := f.getFileByPath(ctx, dstNavigator, dst)
|
||||
if err != nil {
|
||||
return fmt.Errorf("faield to get destination folder: %w", err)
|
||||
return nil, fmt.Errorf("faield to get destination folder: %w", err)
|
||||
}
|
||||
|
||||
if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && destination.Owner().ID != f.user.ID {
|
||||
return fs.ErrOwnerOnly
|
||||
return nil, fs.ErrOwnerOnly
|
||||
}
|
||||
|
||||
// Target must be a folder
|
||||
if !destination.CanHaveChildren() {
|
||||
return fs.ErrNotSupportedAction.WithError(fmt.Errorf("destination must be a valid folder"))
|
||||
return nil, fs.ErrNotSupportedAction.WithError(fmt.Errorf("destination must be a valid folder"))
|
||||
}
|
||||
|
||||
ae := serializer.NewAggregateError()
|
||||
@@ -571,6 +609,7 @@ func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCo
|
||||
}
|
||||
}
|
||||
|
||||
indexDiff := &fs.IndexDiff{}
|
||||
if len(targets) > 0 {
|
||||
// Lock all targets
|
||||
lockTargets := lo.Map(targets, func(value *File, key int) *LockByPath {
|
||||
@@ -598,33 +637,35 @@ func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCo
|
||||
ls, err := f.acquireByPath(ctx, -1, f.user, false, fs.LockApp(fs.ApplicationMoveCopy), allLockTargets...)
|
||||
defer func() { _ = f.Release(ctx, ls) }()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start transaction to move files
|
||||
fc, tx, ctx, err := inventory.WithTx(ctx, f.fileClient)
|
||||
if err != nil {
|
||||
return serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err)
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to start transaction", err)
|
||||
}
|
||||
|
||||
var (
|
||||
copiedNewTargetsMap map[int]*ent.File
|
||||
storageDiff inventory.StorageDiff
|
||||
indexDiffBatch *fs.IndexDiff
|
||||
)
|
||||
if isCopy {
|
||||
copiedNewTargetsMap, storageDiff, err = f.copyFiles(ctx, fileNavGroup, destination, fc)
|
||||
copiedNewTargetsMap, storageDiff, indexDiffBatch, err = f.copyFiles(ctx, fileNavGroup, destination, fc)
|
||||
} else {
|
||||
storageDiff, err = f.moveFiles(ctx, targets, destination, fc, dstNavigator)
|
||||
storageDiff, indexDiffBatch, err = f.moveFiles(ctx, targets, destination, fc, dstNavigator)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
_ = inventory.Rollback(tx)
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
indexDiff.Merge(indexDiffBatch)
|
||||
tx.AppendStorageDiff(storageDiff)
|
||||
if err := inventory.CommitWithStorageDiff(ctx, tx, f.l, f.userClient); err != nil {
|
||||
return serializer.NewError(serializer.CodeDBError, "Failed to commit move change", err)
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to commit move change", err)
|
||||
}
|
||||
|
||||
for _, target := range targets {
|
||||
@@ -638,7 +679,7 @@ func (f *DBFS) MoveOrCopy(ctx context.Context, path []*fs.URI, dst *fs.URI, isCo
|
||||
// TODO: after move, dbfs cache should be cleared
|
||||
}
|
||||
|
||||
return ae.Aggregate()
|
||||
return indexDiff, ae.Aggregate()
|
||||
}
|
||||
|
||||
func (f *DBFS) GetFileFromDirectLink(ctx context.Context, dl *ent.DirectLink) (fs.File, error) {
|
||||
@@ -773,17 +814,18 @@ func (f *DBFS) setCurrentVersion(ctx context.Context, target *File, versionId in
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, fc inventory.FileClient, opt *types.EntityProps) ([]fs.Entity, inventory.StorageDiff, error) {
|
||||
func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, fc inventory.FileClient, opt *types.EntityProps) ([]fs.Entity, inventory.StorageDiff, []int, error) {
|
||||
if f.user.Edges.Group == nil {
|
||||
return nil, nil, fmt.Errorf("user group not loaded")
|
||||
return nil, nil, nil, fmt.Errorf("user group not loaded")
|
||||
}
|
||||
allStaleEntities := make([]fs.Entity, 0, len(targets))
|
||||
storageDiff := make(inventory.StorageDiff)
|
||||
indexToDelete := make([]int, 0)
|
||||
for n, files := range targets {
|
||||
// Let navigator use tx
|
||||
reset, err := n.FollowTx(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
defer reset()
|
||||
@@ -792,9 +834,12 @@ func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, f
|
||||
toBeDeletedFiles := make([]*File, 0, len(files))
|
||||
if err := n.Walk(ctx, files, intsets.MaxInt, intsets.MaxInt, func(targets []*File, level int) error {
|
||||
toBeDeletedFiles = append(toBeDeletedFiles, targets...)
|
||||
indexToDelete = append(indexToDelete, lo.Map(targets, func(item *File, index int) int {
|
||||
return item.ID()
|
||||
})...)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to walk files: %w", err)
|
||||
return nil, nil, nil, fmt.Errorf("failed to walk files: %w", err)
|
||||
}
|
||||
|
||||
// Delete files
|
||||
@@ -802,7 +847,7 @@ func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, f
|
||||
return item.Model
|
||||
}), opt)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to delete files: %w", err)
|
||||
return nil, nil, nil, fmt.Errorf("failed to delete files: %w", err)
|
||||
}
|
||||
storageDiff.Merge(diff)
|
||||
allStaleEntities = append(allStaleEntities, lo.Map(staleEntities, func(item *ent.Entity, index int) fs.Entity {
|
||||
@@ -810,17 +855,17 @@ func (f *DBFS) deleteFiles(ctx context.Context, targets map[Navigator][]*File, f
|
||||
})...)
|
||||
}
|
||||
|
||||
return allStaleEntities, storageDiff, nil
|
||||
return allStaleEntities, storageDiff, indexToDelete, nil
|
||||
}
|
||||
|
||||
func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, destination *File, fc inventory.FileClient) (map[int]*ent.File, inventory.StorageDiff, error) {
|
||||
func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, destination *File, fc inventory.FileClient) (map[int]*ent.File, inventory.StorageDiff, *fs.IndexDiff, error) {
|
||||
if f.user.Edges.Group == nil {
|
||||
return nil, nil, fmt.Errorf("user group not loaded")
|
||||
return nil, nil, nil, fmt.Errorf("user group not loaded")
|
||||
}
|
||||
limit := max(f.user.Edges.Group.Settings.MaxWalkedFiles, 1)
|
||||
capacity, err := f.Capacity(ctx, destination.Owner())
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("copy files: failed to destination owner capacity: %w", err)
|
||||
return nil, nil, nil, fmt.Errorf("copy files: failed to destination owner capacity: %w", err)
|
||||
}
|
||||
|
||||
dstAncestors := lo.Map(destination.AncestorsChain(), func(item *File, index int) *ent.File {
|
||||
@@ -830,6 +875,7 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des
|
||||
// newTargetsMap is the map of between new target files in first layer, and its src file ID.
|
||||
newTargetsMap := make(map[int]*ent.File)
|
||||
storageDiff := make(inventory.StorageDiff)
|
||||
indexToCopy := make([]fs.IndexDiffCopyDetails, 0)
|
||||
var diff inventory.StorageDiff
|
||||
for n, files := range targets {
|
||||
initialDstMap := make(map[int][]*ent.File)
|
||||
@@ -841,7 +887,7 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des
|
||||
// Let navigator use tx
|
||||
reset, err := n.FollowTx(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
defer reset()
|
||||
@@ -858,9 +904,13 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des
|
||||
}
|
||||
|
||||
limit -= len(targets)
|
||||
initialDstMap, diff, err = fc.Copy(ctx, lo.Map(targets, func(item *File, index int) *ent.File {
|
||||
return item.Model
|
||||
}), initialDstMap)
|
||||
initialDstMap, diff, err = fc.Copy(ctx, &inventory.CopyParameter{
|
||||
Files: lo.Map(targets, func(item *File, index int) *ent.File {
|
||||
return item.Model
|
||||
}),
|
||||
ExcludedMetadataKeys: []string{FullTextIndexKey},
|
||||
DstMap: initialDstMap,
|
||||
})
|
||||
if err != nil {
|
||||
if ent.IsConstraintError(err) {
|
||||
return fs.ErrFileExisted.WithError(err)
|
||||
@@ -870,26 +920,45 @@ func (f *DBFS) copyFiles(ctx context.Context, targets map[Navigator][]*File, des
|
||||
}
|
||||
|
||||
storageDiff.Merge(diff)
|
||||
|
||||
if firstLayer {
|
||||
for k, v := range initialDstMap {
|
||||
newTargetsMap[k] = v[0]
|
||||
}
|
||||
}
|
||||
|
||||
for _, file := range targets {
|
||||
if _, ok := file.Metadata()[FullTextIndexKey]; ok {
|
||||
copiedFile := newTargetsMap[file.ID()]
|
||||
indexToCopy = append(indexToCopy, fs.IndexDiffCopyDetails{
|
||||
OriginalFileID: file.ID(),
|
||||
FileID: copiedFile.ID,
|
||||
Uri: *destination.Uri(false).Join(file.Name()),
|
||||
EntityID: copiedFile.PrimaryEntity,
|
||||
OwnerID: destination.OwnerID(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
capacity.Used += sizeTotal
|
||||
firstLayer = false
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to walk files: %w", err)
|
||||
return nil, nil, nil, fmt.Errorf("failed to walk files: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return newTargetsMap, storageDiff, nil
|
||||
var indexDiff *fs.IndexDiff
|
||||
if len(indexToCopy) > 0 {
|
||||
indexDiff = &fs.IndexDiff{
|
||||
IndexToCopy: indexToCopy,
|
||||
}
|
||||
}
|
||||
|
||||
return newTargetsMap, storageDiff, indexDiff, nil
|
||||
}
|
||||
|
||||
func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File, fc inventory.FileClient, n Navigator) (inventory.StorageDiff, error) {
|
||||
func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File, fc inventory.FileClient, n Navigator) (inventory.StorageDiff, *fs.IndexDiff, error) {
|
||||
models := lo.Map(targets, func(value *File, key int) *ent.File {
|
||||
return value.Model
|
||||
})
|
||||
@@ -897,10 +966,10 @@ func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File
|
||||
// Change targets' parent
|
||||
if err := fc.SetParent(ctx, models, destination.Model); err != nil {
|
||||
if ent.IsConstraintError(err) {
|
||||
return nil, fs.ErrFileExisted.WithError(err)
|
||||
return nil, nil, fs.ErrFileExisted.WithError(err)
|
||||
}
|
||||
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to move file", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to move file", err)
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -916,17 +985,17 @@ func (f *DBFS) moveFiles(ctx context.Context, targets []*File, destination *File
|
||||
// renaming it to its original name
|
||||
if _, err := fc.Rename(ctx, file.Model, file.DisplayName()); err != nil {
|
||||
if ent.IsConstraintError(err) {
|
||||
return nil, fs.ErrFileExisted.WithError(err)
|
||||
return nil, nil, fs.ErrFileExisted.WithError(err)
|
||||
}
|
||||
|
||||
return storageDiff, serializer.NewError(serializer.CodeDBError, "Failed to rename file from trash bin to its original name", err)
|
||||
return storageDiff, nil, serializer.NewError(serializer.CodeDBError, "Failed to rename file from trash bin to its original name", err)
|
||||
}
|
||||
|
||||
// Remove trash bin metadata
|
||||
if err := fc.RemoveMetadata(ctx, file.Model, MetadataRestoreUri, MetadataExpectedCollectTime); err != nil {
|
||||
return storageDiff, serializer.NewError(serializer.CodeDBError, "Failed to remove trash related metadata", err)
|
||||
return storageDiff, nil, serializer.NewError(serializer.CodeDBError, "Failed to remove trash related metadata", err)
|
||||
}
|
||||
}
|
||||
|
||||
return storageDiff, nil
|
||||
return storageDiff, nil, nil
|
||||
}
|
||||
|
||||
@@ -369,18 +369,18 @@ func (f *DBFS) CompleteUpload(ctx context.Context, session *fs.UploadSession) (f
|
||||
// - File still locked by uplaod session
|
||||
// - File unlocked, upload session valid
|
||||
// - File unlocked, upload session not valid
|
||||
func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID string, session *fs.UploadSession) ([]fs.Entity, error) {
|
||||
func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID string, session *fs.UploadSession) ([]fs.Entity, *fs.IndexDiff, error) {
|
||||
// Get placeholder file
|
||||
file, err := f.Get(ctx, path, WithFileEntities(), WithNotRoot())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get placeholder file: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to get placeholder file: %w", err)
|
||||
}
|
||||
|
||||
filePrivate := file.(*File)
|
||||
|
||||
// Make sure presented upload session is valid
|
||||
if session != nil && (session.UID != f.user.ID || session.FileID != file.ID()) {
|
||||
return nil, serializer.NewError(serializer.CodeNotFound, "Upload session not found", nil)
|
||||
return nil, nil, serializer.NewError(serializer.CodeNotFound, "Upload session not found", nil)
|
||||
}
|
||||
|
||||
// Confirm locks on placeholder file
|
||||
@@ -393,7 +393,7 @@ func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID
|
||||
}
|
||||
|
||||
if _, ok := ctx.Value(ByPassOwnerCheckCtxKey{}).(bool); !ok && filePrivate.OwnerID() != f.user.ID {
|
||||
return nil, fs.ErrOwnerOnly
|
||||
return nil, nil, fs.ErrOwnerOnly
|
||||
}
|
||||
|
||||
// Lock file
|
||||
@@ -402,7 +402,7 @@ func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID
|
||||
defer func() { _ = f.Release(ctx, ls) }()
|
||||
ctx = fs.LockSessionToContext(ctx, ls)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Find placeholder entity
|
||||
@@ -416,12 +416,12 @@ func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID
|
||||
|
||||
// Remove upload session metadata
|
||||
if err := f.fileClient.RemoveMetadata(ctx, filePrivate.Model, MetadataUploadSessionID, ThumbDisabledKey); err != nil {
|
||||
return nil, serializer.NewError(serializer.CodeDBError, "Failed to remove upload session metadata", err)
|
||||
return nil, nil, serializer.NewError(serializer.CodeDBError, "Failed to remove upload session metadata", err)
|
||||
}
|
||||
|
||||
if entity == nil {
|
||||
// Given upload session does not exist
|
||||
return nil, nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
if session != nil && session.LockToken != "" {
|
||||
@@ -434,18 +434,19 @@ func (f *DBFS) CancelUploadSession(ctx context.Context, path *fs.URI, sessionID
|
||||
|
||||
if len(filePrivate.Entities()) == 1 {
|
||||
// Only one placeholder entity, just delete this file
|
||||
return f.Delete(ctx, []*fs.URI{path})
|
||||
entities, indexDiff, err := f.Delete(ctx, []*fs.URI{path})
|
||||
return entities, indexDiff, err
|
||||
}
|
||||
|
||||
// Delete place holder entity
|
||||
storageDiff, err := f.deleteEntity(ctx, filePrivate, entity.ID())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to delete placeholder entity: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to delete placeholder entity: %w", err)
|
||||
}
|
||||
|
||||
if err := f.userClient.ApplyStorageDiff(ctx, storageDiff); err != nil {
|
||||
return nil, fmt.Errorf("failed to apply storage diff: %w", err)
|
||||
return nil, nil, fmt.Errorf("failed to apply storage diff: %w", err)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
@@ -76,11 +76,11 @@ type (
|
||||
// List lists files under give path.
|
||||
List(ctx context.Context, path *URI, opts ...Option) (File, *ListFileResult, error)
|
||||
// Rename renames a file.
|
||||
Rename(ctx context.Context, path *URI, newName string) (File, error)
|
||||
Rename(ctx context.Context, path *URI, newName string) (File, *IndexDiff, error)
|
||||
// Move moves files to dst.
|
||||
MoveOrCopy(ctx context.Context, path []*URI, dst *URI, isCopy bool) error
|
||||
MoveOrCopy(ctx context.Context, path []*URI, dst *URI, isCopy bool) (*IndexDiff, error)
|
||||
// Delete performs hard-delete for given paths, return newly generated stale entities in this delete operation.
|
||||
Delete(ctx context.Context, path []*URI, opts ...Option) ([]Entity, error)
|
||||
Delete(ctx context.Context, path []*URI, opts ...Option) ([]Entity, *IndexDiff, error)
|
||||
// GetEntitiesFromFileID returns all entities of a given file.
|
||||
GetEntity(ctx context.Context, entityID int) (Entity, error)
|
||||
// UpsertMetadata update or insert metadata of a file.
|
||||
@@ -92,7 +92,7 @@ type (
|
||||
// VersionControl performs version control on given file.
|
||||
// - `delete` is false: set version as current version;
|
||||
// - `delete` is true: delete version.
|
||||
VersionControl(ctx context.Context, path *URI, versionId int, delete bool) error
|
||||
VersionControl(ctx context.Context, path *URI, versionId int, delete bool) (*IndexDiff, error)
|
||||
// GetFileFromDirectLink gets a file from a direct link.
|
||||
GetFileFromDirectLink(ctx context.Context, dl *ent.DirectLink) (File, error)
|
||||
// TraverseFile traverses a file to its root file, return the file with linked root.
|
||||
@@ -108,7 +108,7 @@ type (
|
||||
// CompleteUpload completes an upload session.
|
||||
CompleteUpload(ctx context.Context, session *UploadSession) (File, error)
|
||||
// CancelUploadSession cancels an upload session. Delete the placeholder file if no other entity is created.
|
||||
CancelUploadSession(ctx context.Context, path *URI, sessionID string, session *UploadSession) ([]Entity, error)
|
||||
CancelUploadSession(ctx context.Context, path *URI, sessionID string, session *UploadSession) ([]Entity, *IndexDiff, error)
|
||||
// PreValidateUpload pre-validates an upload request.
|
||||
PreValidateUpload(ctx context.Context, dst *URI, files ...PreValidateFile) error
|
||||
}
|
||||
@@ -808,3 +808,75 @@ func NewEmptyEntity(u *ent.User) Entity {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
// IndexDiff is the difference between the old and new full-text index after file operation.
|
||||
IndexDiff struct {
|
||||
IndexToCopy []IndexDiffCopyDetails
|
||||
IndexToChangeOwner []IndexDiffOwnerChangeDetails
|
||||
IndexToRename []IndexDiffRenameDetails
|
||||
IndexToDelete []int
|
||||
IndexToUpdate []IndexDiffUpdateDetails
|
||||
}
|
||||
IndexDiffUpdateDetails struct {
|
||||
Uri URI
|
||||
FileID int
|
||||
OwnerID int
|
||||
EntityID int
|
||||
}
|
||||
IndexDiffCopyDetails struct {
|
||||
Uri URI
|
||||
OriginalFileID int
|
||||
FileID int
|
||||
OwnerID int
|
||||
EntityID int
|
||||
}
|
||||
IndexDiffOwnerChangeDetails struct {
|
||||
Uri URI
|
||||
FileID int
|
||||
EntityID int
|
||||
OriginalOwnerID int
|
||||
NewOwnerID int
|
||||
}
|
||||
IndexDiffRenameDetails struct {
|
||||
Uri URI
|
||||
FileID int
|
||||
EntityID int
|
||||
}
|
||||
)
|
||||
|
||||
func (i *IndexDiff) Merge(d *IndexDiff) {
|
||||
if i.IndexToCopy == nil {
|
||||
i.IndexToCopy = make([]IndexDiffCopyDetails, 0)
|
||||
}
|
||||
if i.IndexToChangeOwner == nil {
|
||||
i.IndexToChangeOwner = make([]IndexDiffOwnerChangeDetails, 0)
|
||||
}
|
||||
if i.IndexToDelete == nil {
|
||||
i.IndexToDelete = make([]int, 0)
|
||||
}
|
||||
if i.IndexToRename == nil {
|
||||
i.IndexToRename = make([]IndexDiffRenameDetails, 0)
|
||||
}
|
||||
if i.IndexToUpdate == nil {
|
||||
i.IndexToUpdate = make([]IndexDiffUpdateDetails, 0)
|
||||
}
|
||||
if d == nil {
|
||||
return
|
||||
}
|
||||
if len(d.IndexToCopy) > 0 {
|
||||
i.IndexToCopy = append(i.IndexToCopy, d.IndexToCopy...)
|
||||
}
|
||||
if len(d.IndexToChangeOwner) > 0 {
|
||||
i.IndexToChangeOwner = append(i.IndexToChangeOwner, d.IndexToChangeOwner...)
|
||||
}
|
||||
if len(d.IndexToDelete) > 0 {
|
||||
i.IndexToDelete = append(i.IndexToDelete, d.IndexToDelete...)
|
||||
}
|
||||
if len(d.IndexToRename) > 0 {
|
||||
i.IndexToRename = append(i.IndexToRename, d.IndexToRename...)
|
||||
}
|
||||
if len(d.IndexToUpdate) > 0 {
|
||||
i.IndexToUpdate = append(i.IndexToUpdate, d.IndexToUpdate...)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -346,11 +346,15 @@ func (m *manager) GetEntitySource(ctx context.Context, entityID int, opts ...fs.
|
||||
}
|
||||
|
||||
func (l *manager) SetCurrentVersion(ctx context.Context, path *fs.URI, version int) error {
|
||||
return l.fs.VersionControl(ctx, path, version, false)
|
||||
indexDiff, err := l.fs.VersionControl(ctx, path, version, false)
|
||||
|
||||
l.processIndexDiff(ctx, indexDiff)
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *manager) DeleteVersion(ctx context.Context, path *fs.URI, version int) error {
|
||||
return l.fs.VersionControl(ctx, path, version, true)
|
||||
_, err := l.fs.VersionControl(ctx, path, version, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *manager) ListPhysical(ctx context.Context, path string, policyID int, recursive bool, progress driver.ListProgressFunc) ([]fs.PhysicalObject, error) {
|
||||
@@ -392,7 +396,7 @@ func (l *manager) ImportPhysical(ctx context.Context, dst *fs.URI, policyId int,
|
||||
return err
|
||||
}
|
||||
|
||||
l.onNewEntityUploaded(ctx, uploadSession, d)
|
||||
l.onNewEntityUploaded(ctx, uploadSession, d, l.user.ID)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
529
pkg/filemanager/manager/fulltextindex.go
Normal file
529
pkg/filemanager/manager/fulltextindex.go
Normal file
@@ -0,0 +1,529 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/cloudreve/Cloudreve/v4/application/dependency"
|
||||
"github.com/cloudreve/Cloudreve/v4/ent"
|
||||
"github.com/cloudreve/Cloudreve/v4/ent/task"
|
||||
"github.com/cloudreve/Cloudreve/v4/inventory"
|
||||
"github.com/cloudreve/Cloudreve/v4/inventory/types"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/util"
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
type (
|
||||
FullTextIndexTask struct {
|
||||
*queue.DBTask
|
||||
}
|
||||
|
||||
FullTextIndexTaskState struct {
|
||||
Uri *fs.URI `json:"uri"`
|
||||
EntityID int `json:"entity_id"`
|
||||
FileID int `json:"file_id"`
|
||||
OwnerID int `json:"owner_id"`
|
||||
}
|
||||
|
||||
ftsFileInfo struct {
|
||||
FileID int
|
||||
OwnerID int
|
||||
EntityID int
|
||||
FileName string
|
||||
}
|
||||
)
|
||||
|
||||
func (m *manager) SearchFullText(ctx context.Context, query string, offset int) (*FullTextSearchResults, error) {
|
||||
indexer := m.dep.SearchIndexer(ctx)
|
||||
results, total, err := indexer.Search(ctx, m.user.ID, query, offset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to search full text: %w", err)
|
||||
}
|
||||
|
||||
if len(results) == 0 {
|
||||
// No results.
|
||||
return &FullTextSearchResults{}, nil
|
||||
}
|
||||
|
||||
// Traverse each file in result
|
||||
files := lo.FilterMap(results, func(result searcher.SearchResult, _ int) (FullTextSearchResult, bool) {
|
||||
file, err := m.TraverseFile(ctx, result.FileID)
|
||||
if err != nil {
|
||||
m.l.Debug("Failed to traverse file %d for full text search: %s, skipping.", result.FileID, err)
|
||||
return FullTextSearchResult{}, false
|
||||
}
|
||||
|
||||
return FullTextSearchResult{
|
||||
File: file,
|
||||
Content: result.Text,
|
||||
}, true
|
||||
})
|
||||
|
||||
if len(files) == 0 {
|
||||
// No valid files, run next offset
|
||||
return m.SearchFullText(ctx, query, offset+len(results))
|
||||
}
|
||||
|
||||
return &FullTextSearchResults{
|
||||
Hits: files,
|
||||
Total: total,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
queue.RegisterResumableTaskFactory(queue.FullTextIndexTaskType, NewFullTextIndexTaskFromModel)
|
||||
queue.RegisterResumableTaskFactory(queue.FullTextCopyTaskType, NewFullTextCopyTaskFromModel)
|
||||
queue.RegisterResumableTaskFactory(queue.FullTextChangeOwnerTaskType, NewFullTextChangeOwnerTaskFromModel)
|
||||
queue.RegisterResumableTaskFactory(queue.FullTextDeleteTaskType, NewFullTextDeleteTaskFromModel)
|
||||
}
|
||||
|
||||
func NewFullTextIndexTask(ctx context.Context, uri *fs.URI, entityID, fileID, ownerID int, creator *ent.User) (*FullTextIndexTask, error) {
|
||||
state := &FullTextIndexTaskState{
|
||||
Uri: uri,
|
||||
EntityID: entityID,
|
||||
FileID: fileID,
|
||||
OwnerID: ownerID,
|
||||
}
|
||||
stateBytes, err := json.Marshal(state)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
||||
}
|
||||
|
||||
return &FullTextIndexTask{
|
||||
DBTask: &queue.DBTask{
|
||||
DirectOwner: creator,
|
||||
Task: &ent.Task{
|
||||
Type: queue.FullTextIndexTaskType,
|
||||
CorrelationID: logging.CorrelationID(ctx),
|
||||
PrivateState: string(stateBytes),
|
||||
PublicState: &types.TaskPublicState{},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewFullTextIndexTaskFromModel(t *ent.Task) queue.Task {
|
||||
return &FullTextIndexTask{
|
||||
DBTask: &queue.DBTask{
|
||||
Task: t,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
FullTextCopyTask struct {
|
||||
*queue.DBTask
|
||||
}
|
||||
|
||||
FullTextCopyTaskState struct {
|
||||
Uri *fs.URI `json:"uri"`
|
||||
OriginalFileID int `json:"original_file_id"`
|
||||
FileID int `json:"file_id"`
|
||||
OwnerID int `json:"owner_id"`
|
||||
EntityID int `json:"entity_id"`
|
||||
}
|
||||
)
|
||||
|
||||
func NewFullTextCopyTask(ctx context.Context, uri *fs.URI, originalFileID, fileID, ownerID, entityID int, creator *ent.User) (*FullTextCopyTask, error) {
|
||||
state := &FullTextCopyTaskState{
|
||||
Uri: uri,
|
||||
OriginalFileID: originalFileID,
|
||||
FileID: fileID,
|
||||
OwnerID: ownerID,
|
||||
EntityID: entityID,
|
||||
}
|
||||
stateBytes, err := json.Marshal(state)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
||||
}
|
||||
|
||||
return &FullTextCopyTask{
|
||||
DBTask: &queue.DBTask{
|
||||
DirectOwner: creator,
|
||||
Task: &ent.Task{
|
||||
Type: queue.FullTextCopyTaskType,
|
||||
CorrelationID: logging.CorrelationID(ctx),
|
||||
PrivateState: string(stateBytes),
|
||||
PublicState: &types.TaskPublicState{},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewFullTextCopyTaskFromModel(t *ent.Task) queue.Task {
|
||||
return &FullTextCopyTask{
|
||||
DBTask: &queue.DBTask{
|
||||
Task: t,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (t *FullTextCopyTask) Do(ctx context.Context) (task.Status, error) {
|
||||
dep := dependency.FromContext(ctx)
|
||||
l := dep.Logger()
|
||||
fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)
|
||||
|
||||
if !fm.settings.FTSEnabled(ctx) {
|
||||
l.Debug("FTS disabled, skipping full text copy task.")
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
var state FullTextCopyTaskState
|
||||
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
||||
}
|
||||
|
||||
// Get fresh file to make sure task is not stale.
|
||||
file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata())
|
||||
if err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
|
||||
}
|
||||
|
||||
if file.PrimaryEntityID() != state.EntityID {
|
||||
l.Debug("File %d entity changed, skipping copy index.", state.FileID)
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
indexer := dep.SearchIndexer(ctx)
|
||||
if err := indexer.CopyByFileID(ctx, state.OriginalFileID, state.FileID, state.OwnerID, state.EntityID); err != nil {
|
||||
l.Warning("Failed to copy index from file %d to %d, falling back to full indexing: %s", state.OriginalFileID, state.FileID, err)
|
||||
return performIndexing(ctx, fm, state.Uri, state.EntityID, state.FileID, state.OwnerID, file.Name(), false)
|
||||
}
|
||||
|
||||
// Patch metadata to mark file as indexed.
|
||||
if err := fm.fs.PatchMetadata(ctx, []*fs.URI{state.Uri}, fs.MetadataPatch{
|
||||
Key: dbfs.FullTextIndexKey,
|
||||
Value: hashid.EncodeEntityID(fm.hasher, state.EntityID),
|
||||
}); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err)
|
||||
}
|
||||
|
||||
l.Debug("Successfully copied index from file %d to %d.", state.OriginalFileID, state.FileID)
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
type (
|
||||
FullTextChangeOwnerTask struct {
|
||||
*queue.DBTask
|
||||
}
|
||||
|
||||
FullTextChangeOwnerTaskState struct {
|
||||
Uri *fs.URI `json:"uri"`
|
||||
EntityID int `json:"entity_id"`
|
||||
FileID int `json:"file_id"`
|
||||
OriginalOwnerID int `json:"original_owner_id"`
|
||||
NewOwnerID int `json:"new_owner_id"`
|
||||
}
|
||||
)
|
||||
|
||||
func NewFullTextChangeOwnerTask(ctx context.Context, uri *fs.URI, entityID, fileID, originalOwnerID, newOwnerID int, creator *ent.User) (*FullTextChangeOwnerTask, error) {
|
||||
state := &FullTextChangeOwnerTaskState{
|
||||
Uri: uri,
|
||||
EntityID: entityID,
|
||||
FileID: fileID,
|
||||
OriginalOwnerID: originalOwnerID,
|
||||
NewOwnerID: newOwnerID,
|
||||
}
|
||||
stateBytes, err := json.Marshal(state)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
||||
}
|
||||
|
||||
return &FullTextChangeOwnerTask{
|
||||
DBTask: &queue.DBTask{
|
||||
DirectOwner: creator,
|
||||
Task: &ent.Task{
|
||||
Type: queue.FullTextChangeOwnerTaskType,
|
||||
CorrelationID: logging.CorrelationID(ctx),
|
||||
PrivateState: string(stateBytes),
|
||||
PublicState: &types.TaskPublicState{},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewFullTextChangeOwnerTaskFromModel(t *ent.Task) queue.Task {
|
||||
return &FullTextChangeOwnerTask{
|
||||
DBTask: &queue.DBTask{
|
||||
Task: t,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (t *FullTextChangeOwnerTask) Do(ctx context.Context) (task.Status, error) {
|
||||
dep := dependency.FromContext(ctx)
|
||||
l := dep.Logger()
|
||||
fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)
|
||||
|
||||
if !fm.settings.FTSEnabled(ctx) {
|
||||
l.Debug("FTS disabled, skipping full text change owner task.")
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
var state FullTextChangeOwnerTaskState
|
||||
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
||||
}
|
||||
|
||||
// Get fresh file to make sure task is not stale.
|
||||
file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata())
|
||||
if err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
|
||||
}
|
||||
|
||||
if file.PrimaryEntityID() != state.EntityID {
|
||||
l.Debug("File %d entity changed, skipping owner change.", state.FileID)
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
indexer := dep.SearchIndexer(ctx)
|
||||
if err := indexer.ChangeOwner(ctx, state.FileID, state.OriginalOwnerID, state.NewOwnerID); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to change owner for file %d: %w", state.FileID, err)
|
||||
}
|
||||
|
||||
l.Debug("Successfully changed index owner for file %d from %d to %d.", state.FileID, state.OriginalOwnerID, state.NewOwnerID)
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
type (
|
||||
FullTextDeleteTask struct {
|
||||
*queue.DBTask
|
||||
}
|
||||
|
||||
FullTextDeleteTaskState struct {
|
||||
FileIDs []int `json:"file_ids"`
|
||||
}
|
||||
)
|
||||
|
||||
func NewFullTextDeleteTask(ctx context.Context, fileIDs []int, creator *ent.User) (*FullTextDeleteTask, error) {
|
||||
state := &FullTextDeleteTaskState{
|
||||
FileIDs: fileIDs,
|
||||
}
|
||||
stateBytes, err := json.Marshal(state)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal state: %w", err)
|
||||
}
|
||||
|
||||
return &FullTextDeleteTask{
|
||||
DBTask: &queue.DBTask{
|
||||
DirectOwner: creator,
|
||||
Task: &ent.Task{
|
||||
Type: queue.FullTextDeleteTaskType,
|
||||
CorrelationID: logging.CorrelationID(ctx),
|
||||
PrivateState: string(stateBytes),
|
||||
PublicState: &types.TaskPublicState{},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewFullTextDeleteTaskFromModel(t *ent.Task) queue.Task {
|
||||
return &FullTextDeleteTask{
|
||||
DBTask: &queue.DBTask{
|
||||
Task: t,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (t *FullTextDeleteTask) Do(ctx context.Context) (task.Status, error) {
|
||||
dep := dependency.FromContext(ctx)
|
||||
l := dep.Logger()
|
||||
|
||||
var state FullTextDeleteTaskState
|
||||
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
||||
}
|
||||
|
||||
indexer := dep.SearchIndexer(ctx)
|
||||
if err := indexer.DeleteByFileIDs(ctx, state.FileIDs...); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to delete index for %d file(s): %w", len(state.FileIDs), err)
|
||||
}
|
||||
|
||||
l.Debug("Successfully deleted index for %d file(s).", len(state.FileIDs))
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
func (t *FullTextIndexTask) Do(ctx context.Context) (task.Status, error) {
|
||||
dep := dependency.FromContext(ctx)
|
||||
l := dep.Logger()
|
||||
fm := NewFileManager(dep, inventory.UserFromContext(ctx)).(*manager)
|
||||
|
||||
// Check FTS enabled
|
||||
if !fm.settings.FTSEnabled(ctx) {
|
||||
l.Debug("FTS disabled, skipping full text index task.")
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
// Unmarshal state
|
||||
var state FullTextIndexTaskState
|
||||
if err := json.Unmarshal([]byte(t.State()), &state); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to unmarshal state: %s (%w)", err, queue.CriticalErr)
|
||||
}
|
||||
|
||||
// Get fresh file to make sure task is not stale
|
||||
file, err := fm.Get(ctx, state.Uri, dbfs.WithFilePublicMetadata())
|
||||
if err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to get latest file: %w", err)
|
||||
}
|
||||
|
||||
if file.PrimaryEntityID() != state.EntityID {
|
||||
l.Debug("File %d is not the latest version, skipping indexing.", state.FileID)
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
deleteOldChunks := false
|
||||
if _, ok := file.Metadata()[dbfs.FullTextIndexKey]; ok {
|
||||
deleteOldChunks = true
|
||||
}
|
||||
|
||||
return performIndexing(ctx, fm, state.Uri, state.EntityID, state.FileID, state.OwnerID, state.Uri.Name(), deleteOldChunks)
|
||||
}
|
||||
|
||||
// performIndexing extracts text from the entity and indexes it. This is shared between
|
||||
// the regular index task and the copy task (as a fallback when copy fails).
|
||||
func performIndexing(ctx context.Context, fm *manager, uri *fs.URI, entityID, fileID, ownerID int, fileName string, deleteOldChunks bool) (task.Status, error) {
|
||||
dep := fm.dep
|
||||
l := dep.Logger()
|
||||
|
||||
// Get entity source
|
||||
source, err := fm.GetEntitySource(ctx, entityID)
|
||||
if err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to get entity source: %w", err)
|
||||
}
|
||||
defer source.Close()
|
||||
|
||||
// Extract text
|
||||
var text string
|
||||
if source.Entity().Size() > 0 {
|
||||
extractor := dep.TextExtractor(ctx)
|
||||
text, err = extractor.Extract(ctx, source)
|
||||
if err != nil {
|
||||
l.Warning("Failed to extract text for file %d: %s", fileID, err)
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
}
|
||||
|
||||
indexer := dep.SearchIndexer(ctx)
|
||||
|
||||
// Delete old chunks first so that stale chunks from a previously longer
|
||||
// version of the file are removed before upserting the new (possibly fewer)
|
||||
// chunks.
|
||||
if deleteOldChunks {
|
||||
if err := indexer.DeleteByFileIDs(ctx, fileID); err != nil {
|
||||
l.Warning("Failed to delete old index chunks for file %d: %s", fileID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Index via SearchIndexer
|
||||
if err := indexer.IndexFile(ctx, ownerID, fileID, entityID, fileName, text); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to index file %d: %w", fileID, err)
|
||||
}
|
||||
|
||||
// Upsert metadata
|
||||
if err := fm.fs.PatchMetadata(ctx, []*fs.URI{uri}, fs.MetadataPatch{
|
||||
Key: dbfs.FullTextIndexKey,
|
||||
Value: hashid.EncodeEntityID(fm.hasher, entityID),
|
||||
}); err != nil {
|
||||
return task.StatusError, fmt.Errorf("failed to patch metadata: %w", err)
|
||||
}
|
||||
|
||||
l.Debug("Successfully indexed file %d for owner %d.", fileID, ownerID)
|
||||
return task.StatusCompleted, nil
|
||||
}
|
||||
|
||||
// shouldIndexFullText checks if a file should be indexed for full-text search.
|
||||
func (m *manager) shouldIndexFullText(ctx context.Context, fileName string, size int64) bool {
|
||||
if !m.settings.FTSEnabled(ctx) {
|
||||
return false
|
||||
}
|
||||
|
||||
extractor := m.dep.TextExtractor(ctx)
|
||||
return util.IsInExtensionList(extractor.Exts(), fileName) && extractor.MaxFileSize() > size
|
||||
}
|
||||
|
||||
// fullTextIndexForNewEntity creates and queues a full text index task for a newly uploaded entity.
|
||||
func (m *manager) fullTextIndexForNewEntity(ctx context.Context, session *fs.UploadSession, owner int) {
|
||||
if session.Props.EntityType != nil && *session.Props.EntityType != types.EntityTypeVersion {
|
||||
return
|
||||
}
|
||||
|
||||
if !m.shouldIndexFullText(ctx, session.Props.Uri.Name(), session.Props.Size) {
|
||||
return
|
||||
}
|
||||
|
||||
t, err := NewFullTextIndexTask(ctx, session.Props.Uri, session.EntityID, session.FileID, owner, m.user)
|
||||
if err != nil {
|
||||
m.l.Warning("Failed to create full text index task: %s", err)
|
||||
return
|
||||
}
|
||||
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
||||
m.l.Warning("Failed to queue full text index task: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *manager) processIndexDiff(ctx context.Context, diff *fs.IndexDiff) {
|
||||
if diff == nil {
|
||||
return
|
||||
}
|
||||
|
||||
for _, update := range diff.IndexToUpdate {
|
||||
t, err := NewFullTextIndexTask(ctx, &update.Uri, update.EntityID, update.FileID, update.OwnerID, m.user)
|
||||
if err != nil {
|
||||
m.l.Warning("Failed to create full text update task: %s", err)
|
||||
continue
|
||||
}
|
||||
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
||||
m.l.Warning("Failed to queue full text update task: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, cp := range diff.IndexToCopy {
|
||||
t, err := NewFullTextCopyTask(ctx, &cp.Uri, cp.OriginalFileID, cp.FileID, cp.OwnerID, cp.EntityID, m.user)
|
||||
if err != nil {
|
||||
m.l.Warning("Failed to create full text copy task: %s", err)
|
||||
continue
|
||||
}
|
||||
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
||||
m.l.Warning("Failed to queue full text copy task: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, change := range diff.IndexToChangeOwner {
|
||||
t, err := NewFullTextChangeOwnerTask(ctx, &change.Uri, change.EntityID, change.FileID, change.OriginalOwnerID, change.NewOwnerID, m.user)
|
||||
if err != nil {
|
||||
m.l.Warning("Failed to create full text change owner task: %s", err)
|
||||
continue
|
||||
}
|
||||
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
||||
m.l.Warning("Failed to queue full text change owner task: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(diff.IndexToDelete) > 0 && m.dep.SettingProvider().FTSEnabled(ctx) {
|
||||
t, err := NewFullTextDeleteTask(ctx, diff.IndexToDelete, m.user)
|
||||
if err != nil {
|
||||
m.l.Warning("Failed to create full text delete task: %s", err)
|
||||
return
|
||||
}
|
||||
if err := m.dep.MediaMetaQueue(ctx).QueueTask(ctx, t); err != nil {
|
||||
m.l.Warning("Failed to queue full text delete task: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
ctx = context.WithoutCancel(ctx)
|
||||
indexer := m.dep.SearchIndexer(ctx)
|
||||
go func() {
|
||||
for _, rename := range diff.IndexToRename {
|
||||
if err := indexer.Rename(ctx, rename.FileID, rename.EntityID, rename.Uri.Name()); err != nil {
|
||||
m.l.Warning("Failed to rename index for file %d: %s", rename.FileID, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -57,6 +57,8 @@ type (
|
||||
CreateViewerSession(ctx context.Context, uri *fs.URI, version string, viewer *types.Viewer) (*ViewerSession, error)
|
||||
// TraverseFile traverses a file to its root file, return the file with linked root.
|
||||
TraverseFile(ctx context.Context, fileID int) (fs.File, error)
|
||||
// SearchFullText searches full text for given query and offset
|
||||
SearchFullText(ctx context.Context, query string, offset int) (*FullTextSearchResults, error)
|
||||
}
|
||||
|
||||
FsManagement interface {
|
||||
@@ -120,6 +122,16 @@ type (
|
||||
ShareView bool
|
||||
ShowReadMe bool
|
||||
}
|
||||
|
||||
FullTextSearchResults struct {
|
||||
Hits []FullTextSearchResult
|
||||
Total int64
|
||||
}
|
||||
|
||||
FullTextSearchResult struct {
|
||||
File fs.File
|
||||
Content string
|
||||
}
|
||||
)
|
||||
|
||||
type manager struct {
|
||||
|
||||
@@ -138,11 +138,19 @@ func (m *manager) Create(ctx context.Context, path *fs.URI, fileType types.FileT
|
||||
}
|
||||
|
||||
func (m *manager) Rename(ctx context.Context, path *fs.URI, newName string) (fs.File, error) {
|
||||
return m.fs.Rename(ctx, path, newName)
|
||||
file, indexDiff, err := m.fs.Rename(ctx, path, newName)
|
||||
m.processIndexDiff(ctx, indexDiff)
|
||||
return file, err
|
||||
}
|
||||
|
||||
func (m *manager) MoveOrCopy(ctx context.Context, src []*fs.URI, dst *fs.URI, isCopy bool) error {
|
||||
return m.fs.MoveOrCopy(ctx, src, dst, isCopy)
|
||||
indexDiff, err := m.fs.MoveOrCopy(ctx, src, dst, isCopy)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.processIndexDiff(ctx, indexDiff)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *manager) SoftDelete(ctx context.Context, path ...*fs.URI) error {
|
||||
@@ -159,7 +167,7 @@ func (m *manager) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option)
|
||||
return m.SoftDelete(ctx, path...)
|
||||
}
|
||||
|
||||
staleEntities, err := m.fs.Delete(ctx, path, fs.WithUnlinkOnly(o.UnlinkOnly), fs.WithSysSkipSoftDelete(o.SysSkipSoftDelete))
|
||||
staleEntities, indexDiff, err := m.fs.Delete(ctx, path, fs.WithUnlinkOnly(o.UnlinkOnly), fs.WithSysSkipSoftDelete(o.SysSkipSoftDelete))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -179,6 +187,12 @@ func (m *manager) Delete(ctx context.Context, path []*fs.URI, opts ...fs.Option)
|
||||
return fmt.Errorf("failed to queue explicit entity recycle task: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Process index diff
|
||||
if indexDiff != nil {
|
||||
m.processIndexDiff(ctx, indexDiff)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -282,7 +282,6 @@ func (m *manager) RecycleEntities(ctx context.Context, force bool, entityIDs ...
|
||||
if err := inventory.CommitWithStorageDiff(ctx, tx, m.l, m.dep.UserClient()); err != nil {
|
||||
return fmt.Errorf("failed to commit delete change: %w", err)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -231,11 +231,12 @@ func (m *manager) CancelUploadSession(ctx context.Context, path *fs.URI, session
|
||||
|
||||
var (
|
||||
staleEntities []fs.Entity
|
||||
indexDiff *fs.IndexDiff
|
||||
err error
|
||||
)
|
||||
|
||||
if !m.stateless {
|
||||
staleEntities, err = m.fs.CancelUploadSession(ctx, path, sessionID, session)
|
||||
staleEntities, indexDiff, err = m.fs.CancelUploadSession(ctx, path, sessionID, session)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -277,6 +278,11 @@ func (m *manager) CancelUploadSession(ctx context.Context, path *fs.URI, session
|
||||
}
|
||||
}
|
||||
|
||||
// Process index diff
|
||||
if indexDiff != nil {
|
||||
m.processIndexDiff(ctx, indexDiff)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -308,7 +314,7 @@ func (m *manager) CompleteUpload(ctx context.Context, session *fs.UploadSession)
|
||||
}
|
||||
}
|
||||
|
||||
m.onNewEntityUploaded(ctx, session, d)
|
||||
m.onNewEntityUploaded(ctx, session, d, file.OwnerID())
|
||||
// Remove upload session
|
||||
_ = m.kv.Delete(UploadSessionCachePrefix, session.Props.UploadSessionID)
|
||||
return file, nil
|
||||
@@ -371,7 +377,7 @@ func (m *manager) OnUploadFailed(ctx context.Context, session *fs.UploadSession)
|
||||
m.l.Warning("OnUploadFailed hook failed to delete file: %s", err)
|
||||
}
|
||||
} else if !session.Importing {
|
||||
if err := m.fs.VersionControl(ctx, session.Props.Uri, session.EntityID, true); err != nil {
|
||||
if _, err := m.fs.VersionControl(ctx, session.Props.Uri, session.EntityID, true); err != nil {
|
||||
m.l.Warning("OnUploadFailed hook failed to version control: %s", err)
|
||||
}
|
||||
}
|
||||
@@ -426,10 +432,12 @@ func (m *manager) updateStateless(ctx context.Context, req *fs.UploadRequest, o
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *manager) onNewEntityUploaded(ctx context.Context, session *fs.UploadSession, d driver.Handler) {
|
||||
func (m *manager) onNewEntityUploaded(ctx context.Context, session *fs.UploadSession, d driver.Handler, owner int) {
|
||||
if !m.stateless {
|
||||
// Submit media meta task for new entity
|
||||
m.mediaMetaForNewEntity(ctx, session, d)
|
||||
// Submit full text index task for new entity
|
||||
m.fullTextIndexForNewEntity(ctx, session, owner)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -105,6 +105,11 @@ const (
|
||||
RemoteDownloadTaskType = "remote_download"
|
||||
ImportTaskType = "import"
|
||||
|
||||
FullTextIndexTaskType = "full_text_index"
|
||||
FullTextCopyTaskType = "full_text_copy"
|
||||
FullTextChangeOwnerTaskType = "full_text_change_owner"
|
||||
FullTextDeleteTaskType = "full_text_delete"
|
||||
|
||||
SlaveCreateArchiveTaskType = "slave_create_archive"
|
||||
SlaveUploadTaskType = "slave_upload"
|
||||
SlaveExtractArchiveType = "slave_extract_archive"
|
||||
|
||||
15
pkg/searcher/extractor/noop.go
Normal file
15
pkg/searcher/extractor/noop.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package extractor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
)
|
||||
|
||||
// NoopExtractor is a no-op implementation of TextExtractor, used when text extraction is disabled.
// It advertises no supported extensions, a zero size limit, and always
// extracts an empty string without touching the reader.
type NoopExtractor struct{}

// Exts reports no supported file extensions.
func (n *NoopExtractor) Exts() []string { return nil }

// MaxFileSize reports a zero maximum file size.
func (n *NoopExtractor) MaxFileSize() int64 { return 0 }

// Extract ignores its arguments and yields an empty result.
func (n *NoopExtractor) Extract(_ context.Context, _ io.Reader) (string, error) {
	return "", nil
}
|
||||
82
pkg/searcher/extractor/tika.go
Normal file
82
pkg/searcher/extractor/tika.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package extractor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/request"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
||||
)
|
||||
|
||||
// TikaExtractor extracts text from documents using Apache Tika.
type TikaExtractor struct {
	client   request.Client   // HTTP client used to call the Tika server
	settings setting.Provider // settings source; the Tika endpoint is re-read per Extract call
	l        logging.Logger
	// exts is the list of supported file extensions, captured from config at
	// construction time.
	exts []string
	// maxFileSize is the advertised maximum input size in bytes, exposed via
	// MaxFileSize. NOTE(review): enforcement is not visible here — presumably
	// the caller checks it before invoking Extract; confirm.
	maxFileSize int64
}

// NewTikaExtractor creates a new TikaExtractor from static extractor config.
// Only exts and maxFileSize are captured here; the Tika endpoint and response
// limit are read from settings at extraction time, so they can change without
// reconstructing the extractor.
func NewTikaExtractor(client request.Client, settings setting.Provider, l logging.Logger, cfg *setting.FTSTikaExtractorSetting) *TikaExtractor {
	exts := cfg.Exts
	return &TikaExtractor{
		client:      client,
		settings:    settings,
		l:           l,
		exts:        exts,
		maxFileSize: cfg.MaxFileSize,
	}
}
|
||||
|
||||
// Exts returns the list of supported file extensions.
func (t *TikaExtractor) Exts() []string {
	return t.exts
}

// MaxFileSize returns the maximum file size for text extraction.
func (t *TikaExtractor) MaxFileSize() int64 {
	return t.maxFileSize
}
|
||||
|
||||
// Extract sends the document to Tika and returns the extracted plain text.
|
||||
func (t *TikaExtractor) Extract(ctx context.Context, reader io.Reader) (string, error) {
|
||||
tikaCfg := t.settings.FTSTikaExtractor(ctx)
|
||||
if tikaCfg.Endpoint == "" {
|
||||
return "", fmt.Errorf("tika endpoint not configured")
|
||||
}
|
||||
|
||||
endpoint := strings.TrimRight(tikaCfg.Endpoint, "/") + "/tika"
|
||||
resp := t.client.Request(
|
||||
"PUT",
|
||||
endpoint,
|
||||
reader,
|
||||
request.WithHeader(map[string][]string{
|
||||
"Accept": {"text/plain"},
|
||||
}),
|
||||
)
|
||||
if resp.Err != nil {
|
||||
return "", fmt.Errorf("tika request failed: %w", resp.Err)
|
||||
}
|
||||
defer resp.Response.Body.Close()
|
||||
|
||||
if resp.Response.StatusCode != 200 {
|
||||
return "", fmt.Errorf("tika returned status %d", resp.Response.StatusCode)
|
||||
}
|
||||
|
||||
maxSize := tikaCfg.MaxResponseSize
|
||||
if maxSize <= 0 {
|
||||
maxSize = 10 * 1024 * 1024 // default 10MB
|
||||
}
|
||||
|
||||
limited := io.LimitReader(resp.Response.Body, maxSize)
|
||||
body, err := io.ReadAll(limited)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to read tika response: %w", err)
|
||||
}
|
||||
|
||||
return strings.TrimSpace(string(body)), nil
|
||||
}
|
||||
49
pkg/searcher/indexer.go
Normal file
49
pkg/searcher/indexer.go
Normal file
@@ -0,0 +1,49 @@
|
||||
package searcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
)
|
||||
|
||||
// SearchDocument is the unit stored in the search index: one chunk of a
// file's extracted text plus the identifiers needed for filtering.
type SearchDocument struct {
	ID       string `json:"id"`        // primary key, "{file_id}_{chunk_idx}"
	FileID   int    `json:"file_id"`   // owning file
	OwnerID  int    `json:"owner_id"`  // owning user, used to scope searches
	EntityID int    `json:"entity_id"` // file version the text was extracted from
	ChunkIdx int    `json:"chunk_idx"` // position of this chunk within the file's text
	FileName string `json:"file_name"`
	Text     string `json:"text"`
	// Formated carries the engine's highlighted variant of Text when a search
	// requested highlighting; nil otherwise. NOTE(review): "Formated" is a
	// misspelling of "Formatted", but the JSON tag "_formatted" is what the
	// engine keys on, so renaming the Go field is safe only if all users are
	// updated together.
	Formated *FormatedHit `json:"_formatted,omitempty"`
}

// FormatedHit holds the highlighted text variant returned by the engine.
type FormatedHit struct {
	Text string `json:"text"`
}

// SearchResult is one per-file hit returned to callers of SearchIndexer.Search.
type SearchResult struct {
	FileID   int    `json:"file_id"`
	OwnerID  int    `json:"owner_id"`
	EntityID int    `json:"entity_id"`
	FileName string `json:"file_name"`
	Text     string `json:"text"` // matched chunk text, highlighted when available
}

// SearchIndexer maintains and queries the full-text search index.
type SearchIndexer interface {
	// IndexFile indexes the extracted text of one file version under the given owner.
	IndexFile(ctx context.Context, ownerID, fileID, entityID int, fileName, text string) error
	// DeleteByFileIDs removes all indexed chunks for the given files.
	DeleteByFileIDs(ctx context.Context, fileID ...int) error
	// ChangeOwner re-assigns all chunks of a file from one owner to another.
	ChangeOwner(ctx context.Context, fileID, oldOwnerID, newOwnerID int) error
	// CopyByFileID duplicates a file's chunks under a new file/owner/entity.
	CopyByFileID(ctx context.Context, srcFileID, dstFileID, dstOwnerID, dstEntityID int) error
	// Rename updates the stored file name on a file version's chunks.
	Rename(ctx context.Context, fileID, entityID int, newFileName string) error
	// Search returns per-file hits for the query scoped to ownerID, starting
	// at offset, plus the total hit estimate.
	Search(ctx context.Context, ownerID int, query string, offset int) ([]SearchResult, int64, error)
	// IndexReady reports whether the search index exists and has the required
	// configuration (filterable/searchable attributes, etc.).
	IndexReady(ctx context.Context) (bool, error)
	// EnsureIndex creates and configures the index if necessary.
	EnsureIndex(ctx context.Context) error
	// Close releases any resources held by the indexer.
	Close() error
}

// TextExtractor extracts plain text from file content for indexing.
type TextExtractor interface {
	// Exts lists the file extensions this extractor supports.
	Exts() []string
	// MaxFileSize is the advertised maximum input size in bytes.
	// NOTE(review): enforcement appears to be the caller's responsibility.
	MaxFileSize() int64
	// Extract reads the document from reader and returns its plain text.
	Extract(ctx context.Context, reader io.Reader) (string, error)
}
|
||||
101
pkg/searcher/indexer/chunker.go
Normal file
101
pkg/searcher/indexer/chunker.go
Normal file
@@ -0,0 +1,101 @@
|
||||
package indexer
|
||||
|
||||
import (
	"strings"
	"unicode/utf8"
)
|
||||
|
||||
const defaultMaxBytes = 2000
|
||||
|
||||
// ChunkText splits text into chunks of approximately maxBytes bytes each.
|
||||
// It splits on paragraph breaks (\n\n), combines small paragraphs until the
|
||||
// byte limit is reached, and splits large paragraphs at word boundaries.
|
||||
func ChunkText(text string, maxBytes int) []string {
|
||||
if maxBytes <= 0 {
|
||||
maxBytes = defaultMaxBytes
|
||||
}
|
||||
|
||||
text = strings.TrimSpace(text)
|
||||
if text == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
paragraphs := strings.Split(text, "\n\n")
|
||||
var chunks []string
|
||||
var current []string
|
||||
currentBytes := 0
|
||||
|
||||
for _, para := range paragraphs {
|
||||
para = strings.TrimSpace(para)
|
||||
if para == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
paraBytes := len(para)
|
||||
|
||||
// If a single paragraph exceeds maxBytes, split it at word boundaries
|
||||
if paraBytes > maxBytes {
|
||||
// Flush accumulated content first
|
||||
if currentBytes > 0 {
|
||||
chunks = append(chunks, strings.Join(current, "\n\n"))
|
||||
current = nil
|
||||
currentBytes = 0
|
||||
}
|
||||
chunks = append(chunks, splitByBytes(para, maxBytes)...)
|
||||
continue
|
||||
}
|
||||
|
||||
// If adding this paragraph (plus separator) would exceed the limit, flush
|
||||
joinerLen := 0
|
||||
if currentBytes > 0 {
|
||||
joinerLen = 2 // "\n\n"
|
||||
}
|
||||
if currentBytes+joinerLen+paraBytes > maxBytes && currentBytes > 0 {
|
||||
chunks = append(chunks, strings.Join(current, "\n\n"))
|
||||
current = nil
|
||||
currentBytes = 0
|
||||
}
|
||||
|
||||
if currentBytes > 0 {
|
||||
currentBytes += 2 // account for "\n\n" joiner
|
||||
}
|
||||
current = append(current, para)
|
||||
currentBytes += paraBytes
|
||||
}
|
||||
|
||||
// Flush remaining
|
||||
if currentBytes > 0 {
|
||||
chunks = append(chunks, strings.Join(current, "\n\n"))
|
||||
}
|
||||
|
||||
return chunks
|
||||
}
|
||||
|
||||
// splitByBytes splits text into chunks at word boundaries, each at most maxBytes bytes.
|
||||
func splitByBytes(text string, maxBytes int) []string {
|
||||
words := strings.Fields(text)
|
||||
var chunks []string
|
||||
var current []string
|
||||
currentBytes := 0
|
||||
|
||||
for _, w := range words {
|
||||
wLen := len(w)
|
||||
spaceLen := 0
|
||||
if currentBytes > 0 {
|
||||
spaceLen = 1
|
||||
}
|
||||
|
||||
if currentBytes+spaceLen+wLen > maxBytes && currentBytes > 0 {
|
||||
chunks = append(chunks, strings.Join(current, " "))
|
||||
current = nil
|
||||
currentBytes = 0
|
||||
spaceLen = 0
|
||||
}
|
||||
|
||||
current = append(current, w)
|
||||
currentBytes += spaceLen + wLen
|
||||
}
|
||||
|
||||
if len(current) > 0 {
|
||||
chunks = append(chunks, strings.Join(current, " "))
|
||||
}
|
||||
|
||||
return chunks
|
||||
}
|
||||
84
pkg/searcher/indexer/chunker_test.go
Normal file
84
pkg/searcher/indexer/chunker_test.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package indexer
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
// TestChunkText_Empty: blank or whitespace-only input yields no chunks.
func TestChunkText_Empty(t *testing.T) {
	assert.Nil(t, ChunkText("", 500))
	assert.Nil(t, ChunkText("   ", 500))
}

// TestChunkText_SingleSmallParagraph: input under the limit stays one chunk.
func TestChunkText_SingleSmallParagraph(t *testing.T) {
	chunks := ChunkText("Hello world", 500)
	assert.Equal(t, []string{"Hello world"}, chunks)
}

// TestChunkText_MultipleParagraphsCombined: small paragraphs are merged into
// a single chunk when their combined size fits the limit.
func TestChunkText_MultipleParagraphsCombined(t *testing.T) {
	text := "First paragraph.\n\nSecond paragraph.\n\nThird paragraph."
	chunks := ChunkText(text, 500)
	assert.Len(t, chunks, 1)
	assert.Contains(t, chunks[0], "First paragraph.")
	assert.Contains(t, chunks[0], "Third paragraph.")
}

// TestChunkText_SplitOnParagraphBoundary: two paragraphs that each fit but
// not together are emitted as separate chunks.
func TestChunkText_SplitOnParagraphBoundary(t *testing.T) {
	// Create two paragraphs each ~300 bytes
	para := strings.Repeat("abcde ", 50) // 300 bytes
	para = strings.TrimSpace(para)
	text := para + "\n\n" + para

	chunks := ChunkText(text, 500)
	assert.Len(t, chunks, 2)
}

// TestChunkText_LargeParagraphSplit: a single oversized paragraph is split at
// word boundaries into limit-sized chunks plus a remainder.
func TestChunkText_LargeParagraphSplit(t *testing.T) {
	// Create a single paragraph of 1200 bytes (240 words * 5 bytes each)
	words := make([]string, 240)
	for i := range words {
		words[i] = "word" // 4 bytes + 1 space = 5 per word
	}
	text := strings.Join(words, " ") // 240*4 + 239 = 1199 bytes

	chunks := ChunkText(text, 500)
	assert.Len(t, chunks, 3)
	assert.LessOrEqual(t, len(chunks[0]), 500)
	assert.LessOrEqual(t, len(chunks[1]), 500)
	assert.Greater(t, len(chunks[2]), 0)
}

// TestChunkText_DefaultMaxBytes: a non-positive limit falls back to the
// package default rather than failing.
func TestChunkText_DefaultMaxBytes(t *testing.T) {
	chunks := ChunkText("hello", 0)
	assert.Equal(t, []string{"hello"}, chunks)
}

// TestChunkText_EmptyParagraphsIgnored: runs of blank lines collapse; empty
// paragraphs contribute nothing to the output.
func TestChunkText_EmptyParagraphsIgnored(t *testing.T) {
	text := "First.\n\n\n\n\n\nSecond."
	chunks := ChunkText(text, 500)
	assert.Len(t, chunks, 1)
	assert.Equal(t, "First.\n\nSecond.", chunks[0])
}

// TestChunkText_ParagraphKeptWhole: a paragraph under the limit is never
// split internally.
func TestChunkText_ParagraphKeptWhole(t *testing.T) {
	// A paragraph under the limit should not be split
	para := strings.Repeat("x", 400)
	chunks := ChunkText(para, 500)
	assert.Len(t, chunks, 1)
	assert.Equal(t, para, chunks[0])
}

// TestChunkText_JoinerAccountedInLimit: the 2-byte "\n\n" joiner counts
// toward the limit when deciding whether to merge paragraphs.
func TestChunkText_JoinerAccountedInLimit(t *testing.T) {
	// Two paragraphs that fit individually but exceed the limit when joined with "\n\n"
	p1 := strings.Repeat("a", 250)
	p2 := strings.Repeat("b", 250)
	text := p1 + "\n\n" + p2

	chunks := ChunkText(text, 500)
	// 250 + 2 + 250 = 502 > 500, so they should be split
	assert.Len(t, chunks, 2)
	assert.Equal(t, p1, chunks[0])
	assert.Equal(t, p2, chunks[1])
}
|
||||
384
pkg/searcher/indexer/meilisearch.go
Normal file
384
pkg/searcher/indexer/meilisearch.go
Normal file
@@ -0,0 +1,384 @@
|
||||
package indexer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
||||
"github.com/meilisearch/meilisearch-go"
|
||||
)
|
||||
|
||||
const (
	// indexName is the single Meilisearch index all file chunks live in.
	indexName = "cloudreve_files"
	// embedderName identifies the configured embedder used for hybrid search.
	embedderName = "cr-text"
)

// MeilisearchIndexer implements SearchIndexer using Meilisearch.
type MeilisearchIndexer struct {
	client    meilisearch.ServiceManager
	l         logging.Logger
	pageSize  int // number of hits returned per Search page
	chunkSize int // maximum chunk size in bytes handed to ChunkText
	cfg       *setting.FTSIndexMeilisearchSetting
}

// NewMeilisearchIndexer creates a new MeilisearchIndexer.
// The client is constructed eagerly but no network call is made here;
// connectivity problems surface on the first index/search operation.
func NewMeilisearchIndexer(msCfg *setting.FTSIndexMeilisearchSetting, chunkSize int, l logging.Logger) *MeilisearchIndexer {
	client := meilisearch.New(msCfg.Endpoint, meilisearch.WithAPIKey(msCfg.APIKey))
	return &MeilisearchIndexer{
		client:    client,
		l:         l,
		pageSize:  msCfg.PageSize,
		chunkSize: chunkSize,
		cfg:       msCfg,
	}
}

// Index attributes that must be present for IndexReady to report true; kept
// in sync with what EnsureIndex configures.
var (
	requiredFilterable = []string{"owner_id", "file_id", "entity_id"}
	requiredSearchable = []string{"text", "file_name"}
	requiredDistinct   = "file_id"
)
|
||||
|
||||
func (m *MeilisearchIndexer) IndexReady(ctx context.Context) (bool, error) {
|
||||
index := m.client.Index(indexName)
|
||||
|
||||
settings, err := index.GetSettingsWithContext(ctx)
|
||||
if err != nil {
|
||||
// If the index doesn't exist, Meilisearch returns an error.
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check filterable attributes.
|
||||
for _, attr := range requiredFilterable {
|
||||
if !slices.Contains(settings.FilterableAttributes, attr) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check searchable attributes.
|
||||
for _, attr := range requiredSearchable {
|
||||
if !slices.Contains(settings.SearchableAttributes, attr) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Check distinct attribute.
|
||||
if settings.DistinctAttribute == nil || *settings.DistinctAttribute != requiredDistinct {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check embedder if embedding is enabled.
|
||||
if m.cfg.EmbeddingEnbaled {
|
||||
if settings.Embedders == nil {
|
||||
return false, nil
|
||||
}
|
||||
if _, ok := settings.Embedders[embedderName]; !ok {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (m *MeilisearchIndexer) EnsureIndex(ctx context.Context) error {
|
||||
_, err := m.client.CreateIndexWithContext(ctx, &meilisearch.IndexConfig{
|
||||
Uid: indexName,
|
||||
PrimaryKey: "id",
|
||||
})
|
||||
if err != nil {
|
||||
m.l.Debug("Create index returned (may already exist): %s", err)
|
||||
}
|
||||
|
||||
index := m.client.Index(indexName)
|
||||
|
||||
filterableAttrs := []any{"owner_id", "file_id", "entity_id"}
|
||||
if _, err := index.UpdateFilterableAttributesWithContext(ctx, &filterableAttrs); err != nil {
|
||||
return fmt.Errorf("failed to set filterable attributes: %w", err)
|
||||
}
|
||||
|
||||
searchableAttrs := []string{"text", "file_name"}
|
||||
if _, err := index.UpdateSearchableAttributesWithContext(ctx, &searchableAttrs); err != nil {
|
||||
return fmt.Errorf("failed to set searchable attributes: %w", err)
|
||||
}
|
||||
|
||||
_, err = index.UpdateDistinctAttributeWithContext(ctx, "file_id")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set distinct attribute: %w", err)
|
||||
}
|
||||
|
||||
if m.cfg.EmbeddingEnbaled {
|
||||
var embedder meilisearch.Embedder
|
||||
if err := json.Unmarshal([]byte(m.cfg.EmbeddingSetting), &embedder); err != nil {
|
||||
m.cfg.EmbeddingEnbaled = false
|
||||
m.l.Warning("Failed to unmarshal embedding setting: %s, fallback to disable embedding", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := index.UpdateEmbeddersWithContext(ctx, map[string]meilisearch.Embedder{
|
||||
embedderName: embedder,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set embedders: %w", err)
|
||||
}
|
||||
} else {
|
||||
_, err := index.ResetEmbeddersWithContext(ctx)
|
||||
if err != nil {
|
||||
m.l.Warning("Failed to reset embedder: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MeilisearchIndexer) IndexFile(ctx context.Context, ownerID, fileID, entityID int, fileName, text string) error {
|
||||
chunks := ChunkText(text, m.chunkSize)
|
||||
if len(chunks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
docs := make([]searcher.SearchDocument, 0, len(chunks))
|
||||
for i, chunk := range chunks {
|
||||
docs = append(docs, searcher.SearchDocument{
|
||||
ID: fmt.Sprintf("%d_%d", fileID, i),
|
||||
FileID: fileID,
|
||||
OwnerID: ownerID,
|
||||
EntityID: entityID,
|
||||
ChunkIdx: i,
|
||||
FileName: fileName,
|
||||
Text: chunk,
|
||||
})
|
||||
}
|
||||
|
||||
index := m.client.Index(indexName)
|
||||
pk := "id"
|
||||
if _, err := index.AddDocumentsWithContext(ctx, docs, &meilisearch.DocumentOptions{PrimaryKey: &pk}); err != nil {
|
||||
return fmt.Errorf("failed to add documents: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MeilisearchIndexer) DeleteByFileIDs(ctx context.Context, fileID ...int) error {
|
||||
if len(fileID) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
index := m.client.Index(indexName)
|
||||
strs := make([]string, len(fileID))
|
||||
for i, id := range fileID {
|
||||
strs[i] = fmt.Sprintf("%d", id)
|
||||
}
|
||||
filter := fmt.Sprintf("file_id IN [%s]", strings.Join(strs, ", "))
|
||||
if _, err := index.DeleteDocumentsByFilterWithContext(ctx, filter, nil); err != nil {
|
||||
return fmt.Errorf("failed to delete documents by file_ids: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ChangeOwner re-assigns every indexed chunk of fileID from oldOwnerID to
// newOwnerID. Chunks are fetched fully before any write, then updated in
// place — the primary key does not encode the owner, so IDs are stable.
// Returns nil when no matching chunks exist.
func (m *MeilisearchIndexer) ChangeOwner(ctx context.Context, fileID, oldOwnerID, newOwnerID int) error {
	index := m.client.Index(indexName)
	filter := fmt.Sprintf("file_id = %d AND owner_id = %d", fileID, oldOwnerID)

	// Fetch all existing document chunks in batches.
	// NOTE(review): offset pagination assumes the result set is stable while
	// we read; the update happens only after the full fetch, so this method
	// does not invalidate its own pagination.
	const batchSize int64 = 100
	var allDocs []searcher.SearchDocument
	for offset := int64(0); ; offset += batchSize {
		var result meilisearch.DocumentsResult
		if err := index.GetDocumentsWithContext(ctx, &meilisearch.DocumentsQuery{
			Filter: filter,
			Limit:  batchSize,
			Offset: offset,
		}, &result); err != nil {
			return fmt.Errorf("failed to get documents: %w", err)
		}

		for _, hit := range result.Results {
			var doc searcher.SearchDocument
			if err := hit.DecodeInto(&doc); err != nil {
				// Skip undecodable chunks rather than failing the whole
				// ownership change.
				m.l.Warning("Failed to decode document during owner change: %s", err)
				continue
			}
			allDocs = append(allDocs, doc)
		}

		// A short page means we have reached the end.
		if int64(len(result.Results)) < batchSize {
			break
		}
	}

	if len(allDocs) == 0 {
		return nil
	}

	// Update owner_id in place — primary key is {fileID}_{chunkIdx} so it stays the same.
	for i := range allDocs {
		allDocs[i].OwnerID = newOwnerID
	}

	if _, err := index.UpdateDocumentsInBatchesWithContext(ctx, allDocs, 100, nil); err != nil {
		return fmt.Errorf("failed to update documents with new owner: %w", err)
	}

	return nil
}
|
||||
|
||||
func (m *MeilisearchIndexer) CopyByFileID(ctx context.Context, srcFileID, dstFileID, dstOwnerID, dstEntityID int) error {
|
||||
index := m.client.Index(indexName)
|
||||
filter := fmt.Sprintf("file_id = %d", srcFileID)
|
||||
|
||||
const batchSize int64 = 100
|
||||
var allDocs []searcher.SearchDocument
|
||||
for offset := int64(0); ; offset += batchSize {
|
||||
var result meilisearch.DocumentsResult
|
||||
if err := index.GetDocumentsWithContext(ctx, &meilisearch.DocumentsQuery{
|
||||
Filter: filter,
|
||||
Limit: batchSize,
|
||||
Offset: offset,
|
||||
}, &result); err != nil {
|
||||
return fmt.Errorf("failed to get source documents: %w", err)
|
||||
}
|
||||
|
||||
for _, hit := range result.Results {
|
||||
var doc searcher.SearchDocument
|
||||
if err := hit.DecodeInto(&doc); err != nil {
|
||||
m.l.Warning("Failed to decode document during copy: %s", err)
|
||||
continue
|
||||
}
|
||||
allDocs = append(allDocs, doc)
|
||||
}
|
||||
|
||||
if int64(len(result.Results)) < batchSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(allDocs) == 0 {
|
||||
return fmt.Errorf("no source documents found for file %d", srcFileID)
|
||||
}
|
||||
|
||||
for i := range allDocs {
|
||||
if allDocs[i].EntityID != dstEntityID {
|
||||
m.l.Warning("Entity id mismatch for file %d, original: %d, destination: %d", srcFileID, allDocs[i].EntityID, dstEntityID)
|
||||
continue
|
||||
}
|
||||
|
||||
allDocs[i].ID = fmt.Sprintf("%d_%d", dstFileID, allDocs[i].ChunkIdx)
|
||||
allDocs[i].FileID = dstFileID
|
||||
allDocs[i].OwnerID = dstOwnerID
|
||||
allDocs[i].EntityID = dstEntityID
|
||||
}
|
||||
|
||||
if len(allDocs) == 0 {
|
||||
return fmt.Errorf("no source documents found for file %d", srcFileID)
|
||||
}
|
||||
|
||||
pk := "id"
|
||||
if _, err := index.AddDocumentsWithContext(ctx, allDocs, &meilisearch.DocumentOptions{PrimaryKey: &pk}); err != nil {
|
||||
return fmt.Errorf("failed to add copied documents: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Rename updates the stored file name on every indexed chunk belonging to
// the given (fileID, entityID) pair. Chunk text and identifiers are left
// untouched; a file with no indexed chunks is a no-op.
func (m *MeilisearchIndexer) Rename(ctx context.Context, fileID, entityID int, newFileName string) error {
	index := m.client.Index(indexName)
	filter := fmt.Sprintf("file_id = %d AND entity_id = %d", fileID, entityID)

	// Fetch all matching chunks in batches before writing, so pagination does
	// not run against a result set we are concurrently mutating.
	const batchSize int64 = 100
	var allDocs []searcher.SearchDocument
	for offset := int64(0); ; offset += batchSize {
		var result meilisearch.DocumentsResult
		if err := index.GetDocumentsWithContext(ctx, &meilisearch.DocumentsQuery{
			Filter: filter,
			Limit:  batchSize,
			Offset: offset,
		}, &result); err != nil {
			return fmt.Errorf("failed to get documents for rename: %w", err)
		}

		for _, hit := range result.Results {
			var doc searcher.SearchDocument
			if err := hit.DecodeInto(&doc); err != nil {
				// Skip undecodable chunks; renaming the rest beats failing
				// the whole operation.
				m.l.Warning("Failed to decode document during rename: %s", err)
				continue
			}
			doc.FileName = newFileName
			allDocs = append(allDocs, doc)
		}

		// A short page means we have reached the end.
		if int64(len(result.Results)) < batchSize {
			break
		}
	}

	if len(allDocs) == 0 {
		return nil
	}

	// Primary keys are unchanged, so this is an in-place update.
	if _, err := index.UpdateDocumentsInBatchesWithContext(ctx, allDocs, 100, nil); err != nil {
		return fmt.Errorf("failed to update documents with new file name: %w", err)
	}

	return nil
}
|
||||
|
||||
func (m *MeilisearchIndexer) Search(ctx context.Context, ownerID int, query string, offset int) ([]searcher.SearchResult, int64, error) {
|
||||
index := m.client.Index(indexName)
|
||||
|
||||
searchReq := &meilisearch.SearchRequest{
|
||||
Filter: fmt.Sprintf("owner_id = %d", ownerID),
|
||||
Limit: int64(m.pageSize),
|
||||
Offset: int64(offset),
|
||||
AttributesToHighlight: []string{"text"},
|
||||
}
|
||||
|
||||
if m.cfg.EmbeddingEnbaled {
|
||||
searchReq.Hybrid = &meilisearch.SearchRequestHybrid{
|
||||
Embedder: embedderName,
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := index.SearchWithContext(ctx, query, searchReq)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("search failed: %w", err)
|
||||
}
|
||||
|
||||
results := make([]searcher.SearchResult, 0, len(resp.Hits))
|
||||
seen := make(map[int]struct{})
|
||||
for _, hit := range resp.Hits {
|
||||
var doc searcher.SearchDocument
|
||||
if err := hit.DecodeInto(&doc); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, exists := seen[doc.FileID]; exists {
|
||||
continue
|
||||
}
|
||||
seen[doc.FileID] = struct{}{}
|
||||
|
||||
// Extract text from raw JSON for display
|
||||
textStr := doc.Text
|
||||
if doc.Formated != nil {
|
||||
textStr = doc.Formated.Text
|
||||
}
|
||||
|
||||
results = append(results, searcher.SearchResult{
|
||||
FileID: doc.FileID,
|
||||
OwnerID: doc.OwnerID,
|
||||
FileName: doc.FileName,
|
||||
Text: textStr,
|
||||
})
|
||||
}
|
||||
|
||||
return results, resp.EstimatedTotalHits, nil
|
||||
}
|
||||
|
||||
// Close satisfies SearchIndexer. The HTTP-based Meilisearch client holds
// nothing that needs explicit teardown, so this is a no-op.
func (m *MeilisearchIndexer) Close() error {
	return nil
}
|
||||
46
pkg/searcher/indexer/noop.go
Normal file
46
pkg/searcher/indexer/noop.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package indexer
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/searcher"
|
||||
)
|
||||
|
||||
// NoopIndexer is a no-op implementation of SearchIndexer, used when FTS is disabled.
|
||||
type NoopIndexer struct{}
|
||||
|
||||
func (n *NoopIndexer) IndexFile(ctx context.Context, ownerID, fileID, entityID int, fileName, text string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) DeleteByFileIDs(ctx context.Context, fileID ...int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) ChangeOwner(ctx context.Context, fileID, oldOwnerID, newOwnerID int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) CopyByFileID(ctx context.Context, srcFileID, dstFileID, dstOwnerID, dstEntityID int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) Rename(ctx context.Context, fileID, entityID int, newFileName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) Search(ctx context.Context, ownerID int, query string, offset int) ([]searcher.SearchResult, int64, error) {
|
||||
return nil, 0, nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) IndexReady(ctx context.Context) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) EnsureIndex(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *NoopIndexer) Close() error {
|
||||
return nil
|
||||
}
|
||||
@@ -222,6 +222,18 @@ type (
|
||||
EventHubEnabled(ctx context.Context) bool
|
||||
// EventHubDebounceDelay returns the debounce delay of event hub.
|
||||
EventHubDebounceDelay(ctx context.Context) time.Duration
|
||||
// FTSEnabled returns true if full-text search is enabled.
|
||||
FTSEnabled(ctx context.Context) bool
|
||||
// FTSIndexType returns the full-text search index type.
|
||||
FTSIndexType(ctx context.Context) FTSIndexType
|
||||
// FTSExtractorType returns the full-text search extractor type.
|
||||
FTSExtractorType(ctx context.Context) FTSExtractorType
|
||||
// FTSIndexMeilisearch returns Meilisearch index settings.
|
||||
FTSIndexMeilisearch(ctx context.Context) *FTSIndexMeilisearchSetting
|
||||
// FTSTikaExtractor returns Tika extractor settings.
|
||||
FTSTikaExtractor(ctx context.Context) *FTSTikaExtractorSetting
|
||||
// FTSChunkSize returns the maximum chunk size in bytes for full-text search indexing.
|
||||
FTSChunkSize(ctx context.Context) int
|
||||
}
|
||||
UseFirstSiteUrlCtxKey = struct{}
|
||||
)
|
||||
@@ -600,6 +612,41 @@ func (s *settingProvider) EventHubEnabled(ctx context.Context) bool {
|
||||
return s.getBoolean(ctx, "fs_event_push_enabled", true)
|
||||
}
|
||||
|
||||
func (s *settingProvider) FTSEnabled(ctx context.Context) bool {
|
||||
return s.getBoolean(ctx, "fts_enabled", false)
|
||||
}
|
||||
|
||||
func (s *settingProvider) FTSIndexType(ctx context.Context) FTSIndexType {
|
||||
return FTSIndexType(s.getString(ctx, "fts_index_type", ""))
|
||||
}
|
||||
|
||||
func (s *settingProvider) FTSExtractorType(ctx context.Context) FTSExtractorType {
|
||||
return FTSExtractorType(s.getString(ctx, "fts_extractor_type", ""))
|
||||
}
|
||||
|
||||
func (s *settingProvider) FTSIndexMeilisearch(ctx context.Context) *FTSIndexMeilisearchSetting {
|
||||
return &FTSIndexMeilisearchSetting{
|
||||
Endpoint: s.getString(ctx, "fts_meilisearch_endpoint", ""),
|
||||
APIKey: s.getString(ctx, "fts_meilisearch_api_key", ""),
|
||||
PageSize: s.getInt(ctx, "fts_meilisearch_page_size", 5),
|
||||
EmbeddingEnbaled: s.getBoolean(ctx, "fts_meilisearch_embed_enabled", false),
|
||||
EmbeddingSetting: s.getString(ctx, "fts_meilisearch_embed_config", "{}"),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *settingProvider) FTSTikaExtractor(ctx context.Context) *FTSTikaExtractorSetting {
|
||||
return &FTSTikaExtractorSetting{
|
||||
Endpoint: s.getString(ctx, "fts_tika_endpoint", ""),
|
||||
MaxResponseSize: s.getInt64(ctx, "fts_tika_max_response_size", 10485760),
|
||||
Exts: s.getStringList(ctx, "fts_tika_exts", []string{}),
|
||||
MaxFileSize: s.getInt64(ctx, "fts_tika_max_file_size_remote", 52428800),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *settingProvider) FTSChunkSize(ctx context.Context) int {
|
||||
return s.getInt(ctx, "fts_chunk_size", 2000)
|
||||
}
|
||||
|
||||
func (s *settingProvider) Queue(ctx context.Context, queueType QueueType) *QueueSetting {
|
||||
queueTypeStr := string(queueType)
|
||||
return &QueueSetting{
|
||||
|
||||
@@ -225,6 +225,35 @@ type CustomHTML struct {
|
||||
SidebarBottom string `json:"sidebar_bottom,omitempty"`
|
||||
}
|
||||
|
||||
// FTSIndexType identifies the backend that stores the full-text search index.
type FTSIndexType string

const (
	// FTSIndexTypeNone means no index backend is configured.
	FTSIndexTypeNone = FTSIndexType("")
	// FTSIndexTypeMeilisearch uses a Meilisearch server as the index backend.
	FTSIndexTypeMeilisearch = FTSIndexType("meilisearch")
)

// FTSExtractorType identifies the backend that extracts text from files.
type FTSExtractorType string

const (
	// FTSExtractorTypeNone means no extractor backend is configured.
	FTSExtractorTypeNone = FTSExtractorType("")
	// FTSExtractorTypeTika uses an Apache Tika server for text extraction.
	FTSExtractorTypeTika = FTSExtractorType("tika")
)

// FTSIndexMeilisearchSetting holds connection and paging options for the
// Meilisearch index backend.
type FTSIndexMeilisearchSetting struct {
	Endpoint string // Meilisearch server URL
	APIKey   string // API key used to authenticate against the server
	PageSize int    // number of hits per search page
	// EmbeddingEnbaled toggles RAG/embedding support.
	// NOTE(review): name is a typo for "EmbeddingEnabled"; renaming requires
	// updating all references in one coordinated change.
	EmbeddingEnbaled bool
	// EmbeddingSetting is the raw embedder configuration (JSON string).
	EmbeddingSetting string
}

// FTSTikaExtractorSetting holds connection and size-limit options for the
// Tika text extractor backend.
type FTSTikaExtractorSetting struct {
	Endpoint        string   // Tika server URL
	MaxResponseSize int64    // maximum accepted extraction response, in bytes
	Exts            []string // file extensions eligible for extraction
	MaxFileSize     int64    // maximum remote file size to submit, in bytes
}
|
||||
|
||||
type MasterEncryptKeyVaultType string
|
||||
|
||||
const (
|
||||
|
||||
@@ -436,3 +436,17 @@ func HandleExplorerEventsPush(c *gin.Context) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func FulltextSearch(c *gin.Context) {
|
||||
service := ParametersFromContext[*explorer.FulltextSearchService](c, explorer.FulltextSearchParamCtx{})
|
||||
resp, err := service.Search(c)
|
||||
if err != nil {
|
||||
c.JSON(200, serializer.Err(c, err))
|
||||
c.Abort()
|
||||
return
|
||||
}
|
||||
|
||||
c.JSON(200, serializer.Response{
|
||||
Data: resp,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -785,6 +785,16 @@ func initMasterRouter(dep dependency.Dep) *gin.Engine {
|
||||
controllers.FromQuery[explorer.ExplorerEventService](explorer.ExplorerEventParamCtx{}),
|
||||
controllers.HandleExplorerEventsPush,
|
||||
)
|
||||
|
||||
// Full text search
|
||||
file.GET("search",
|
||||
middleware.LoginRequired(),
|
||||
middleware.IsFunctionEnabled(func(c *gin.Context) bool {
|
||||
return dep.SettingProvider().FTSEnabled(c)
|
||||
}),
|
||||
controllers.FromQuery[explorer.FulltextSearchService](explorer.FulltextSearchParamCtx{}),
|
||||
controllers.FulltextSearch,
|
||||
)
|
||||
}
|
||||
|
||||
// 分享相关
|
||||
|
||||
@@ -54,6 +54,7 @@ type SiteConfig struct {
|
||||
ThumbnailHeight int `json:"thumbnail_height,omitempty"`
|
||||
CustomProps []types.CustomProps `json:"custom_props,omitempty"`
|
||||
ShowEncryptionStatus bool `json:"show_encryption_status,omitempty"`
|
||||
FullTextSearch bool `json:"full_text_search,omitempty"`
|
||||
|
||||
// Thumbnail section
|
||||
ThumbExts []string `json:"thumb_exts,omitempty"`
|
||||
@@ -120,6 +121,7 @@ func (s *GetSettingService) GetSiteConfig(c *gin.Context) (*SiteConfig, error) {
|
||||
ThumbnailHeight: h,
|
||||
CustomProps: customProps,
|
||||
ShowEncryptionStatus: showEncryptionStatus,
|
||||
FullTextSearch: settings.FTSEnabled(c),
|
||||
}, nil
|
||||
case "emojis":
|
||||
emojis := settings.EmojiPresets(c)
|
||||
|
||||
@@ -747,3 +747,25 @@ func (s *ArchiveListFilesService) List(c *gin.Context) (*ArchiveListFilesRespons
|
||||
|
||||
return BuildArchiveListFilesResponse(files), nil
|
||||
}
|
||||
|
||||
// FulltextSearchParamCtx is the context key under which the bound
// FulltextSearchService parameters are stored.
type FulltextSearchParamCtx struct{}

// FulltextSearchService carries the query-string parameters of a
// full-text search request.
type FulltextSearchService struct {
	// Query is the search phrase; required.
	Query string `form:"query" binding:"required"`
	// Offset is the result offset used for pagination.
	Offset int `form:"offset"`
}
|
||||
|
||||
func (s *FulltextSearchService) Search(c *gin.Context) (*FullTextSearchResults, error) {
|
||||
dep := dependency.FromContext(c)
|
||||
user := inventory.UserFromContext(c)
|
||||
m := manager.NewFileManager(dep, user)
|
||||
defer m.Recycle()
|
||||
|
||||
results, err := m.SearchFullText(c, s.Query, s.Offset)
|
||||
if err != nil {
|
||||
return nil, serializer.NewError(serializer.CodeInternalSetting, "failed to search full text", err)
|
||||
}
|
||||
|
||||
return BuildFullTextSearchResults(c, user, dep.HashIDEncoder(), results), nil
|
||||
}
|
||||
|
||||
@@ -26,6 +26,28 @@ import (
|
||||
"github.com/samber/lo"
|
||||
)
|
||||
|
||||
type FullTextSearchResults struct {
|
||||
Hits []FullTextSearchResult `json:"hits"`
|
||||
Total int64 `json:"total"`
|
||||
}
|
||||
|
||||
func BuildFullTextSearchResults(ctx context.Context, user *ent.User, hasher hashid.Encoder, results *manager.FullTextSearchResults) *FullTextSearchResults {
|
||||
return &FullTextSearchResults{
|
||||
Hits: lo.Map(results.Hits, func(result manager.FullTextSearchResult, index int) FullTextSearchResult {
|
||||
return FullTextSearchResult{
|
||||
File: *BuildFileResponse(ctx, user, result.File, hasher, nil),
|
||||
Content: result.Content,
|
||||
}
|
||||
}),
|
||||
Total: results.Total,
|
||||
}
|
||||
}
|
||||
|
||||
type FullTextSearchResult struct {
|
||||
File FileResponse `json:"file"`
|
||||
Content string `json:"content"`
|
||||
}
|
||||
|
||||
type ArchiveListFilesResponse struct {
|
||||
Files []manager.ArchivedFile `json:"files"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user