mirror of
https://github.com/cloudreve/cloudreve.git
synced 2026-03-03 03:07:01 +00:00
feat(dashboard): add setting option for event push
This commit is contained in:
@@ -376,7 +376,7 @@ func (d *dependency) EventHub() eventhub.EventHub {
|
||||
if d.eventHub != nil {
|
||||
return d.eventHub
|
||||
}
|
||||
d.eventHub = eventhub.NewEventHub(d.UserClient(), d.FsEventClient())
|
||||
d.eventHub = eventhub.NewEventHub(d.UserClient(), d.FsEventClient(), d.SettingProvider())
|
||||
return d.eventHub
|
||||
}
|
||||
|
||||
|
||||
@@ -669,6 +669,9 @@ var DefaultSettings = map[string]string{
|
||||
"encrypt_master_key_file": "",
|
||||
"show_encryption_status": "1",
|
||||
"show_desktop_app_promotion": "1",
|
||||
"fs_event_push_enabled": "1",
|
||||
"fs_event_push_max_age": "1209600",
|
||||
"fs_event_push_debounce": "5",
|
||||
}
|
||||
|
||||
var RedactedSettings = map[string]struct{}{
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/cloudreve/Cloudreve/v4/inventory"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
|
||||
"github.com/cloudreve/Cloudreve/v4/pkg/setting"
|
||||
)
|
||||
|
||||
type (
|
||||
@@ -36,16 +37,18 @@ type eventHub struct {
|
||||
topics map[int]map[string]*subscriber
|
||||
userClient inventory.UserClient
|
||||
fsEventClient inventory.FsEventClient
|
||||
settings setting.Provider
|
||||
closed bool
|
||||
closeCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewEventHub(userClient inventory.UserClient, fsEventClient inventory.FsEventClient) EventHub {
|
||||
func NewEventHub(userClient inventory.UserClient, fsEventClient inventory.FsEventClient, settings setting.Provider) EventHub {
|
||||
e := &eventHub{
|
||||
topics: make(map[int]map[string]*subscriber),
|
||||
userClient: userClient,
|
||||
fsEventClient: fsEventClient,
|
||||
settings: settings,
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
@@ -139,7 +142,7 @@ func (e *eventHub) Subscribe(ctx context.Context, topic int, id string) (chan *E
|
||||
}
|
||||
}
|
||||
|
||||
sub, err := newSubscriber(ctx, id, e.userClient, e.fsEventClient)
|
||||
sub, err := newSubscriber(ctx, id, e.userClient, e.fsEventClient, e.settings.EventHubMaxOfflineDuration(ctx), e.settings.EventHubDebounceDelay(ctx))
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
@@ -31,9 +31,7 @@ type Subscriber interface {
|
||||
}
|
||||
|
||||
const (
|
||||
debounceDelay = 5 * time.Second
|
||||
userCacheTTL = 1 * time.Hour
|
||||
offlineMaxAge = 14 * 24 * time.Hour // 14 days
|
||||
userCacheTTL = 1 * time.Hour
|
||||
)
|
||||
|
||||
type subscriber struct {
|
||||
@@ -50,8 +48,10 @@ type subscriber struct {
|
||||
offlineSince time.Time
|
||||
|
||||
// Debounce buffer for pending events
|
||||
buffer []*Event
|
||||
timer *time.Timer
|
||||
buffer []*Event
|
||||
timer *time.Timer
|
||||
offlineMaxAge time.Duration
|
||||
debounceDelay time.Duration
|
||||
|
||||
// Owner info
|
||||
ownerCached *ent.User
|
||||
@@ -62,7 +62,7 @@ type subscriber struct {
|
||||
closedCh chan struct{}
|
||||
}
|
||||
|
||||
func newSubscriber(ctx context.Context, id string, userClient inventory.UserClient, fsEventClient inventory.FsEventClient) (*subscriber, error) {
|
||||
func newSubscriber(ctx context.Context, id string, userClient inventory.UserClient, fsEventClient inventory.FsEventClient, maxAge, debounceDelay time.Duration) (*subscriber, error) {
|
||||
user := inventory.UserFromContext(ctx)
|
||||
if user == nil || inventory.IsAnonymousUser(user) {
|
||||
return nil, errors.New("user not found")
|
||||
@@ -73,11 +73,14 @@ func newSubscriber(ctx context.Context, id string, userClient inventory.UserClie
|
||||
ch: make(chan *Event, bufSize),
|
||||
userClient: userClient,
|
||||
fsEventClient: fsEventClient,
|
||||
|
||||
ownerCached: user,
|
||||
uid: user.ID,
|
||||
cachedAt: time.Now(),
|
||||
online: true,
|
||||
closedCh: make(chan struct{}),
|
||||
offlineMaxAge: maxAge,
|
||||
debounceDelay: debounceDelay,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -142,7 +145,7 @@ func (s *subscriber) publishLocked(evt Event) {
|
||||
if s.timer != nil {
|
||||
s.timer.Stop()
|
||||
}
|
||||
s.timer = time.AfterFunc(debounceDelay, s.flush)
|
||||
s.timer = time.AfterFunc(s.debounceDelay, s.flush)
|
||||
}
|
||||
|
||||
// flush sends all buffered events to the channel.
|
||||
@@ -236,7 +239,7 @@ func (s *subscriber) setOnline(ctx context.Context) {
|
||||
if s.timer != nil {
|
||||
s.timer.Stop()
|
||||
}
|
||||
s.timer = time.AfterFunc(debounceDelay, s.flush)
|
||||
s.timer = time.AfterFunc(s.debounceDelay, s.flush)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -297,7 +300,7 @@ func (s *subscriber) isClosed() bool {
|
||||
func (s *subscriber) shouldExpire() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return !s.online && !s.offlineSince.IsZero() && time.Since(s.offlineSince) > offlineMaxAge
|
||||
return !s.online && !s.offlineSince.IsZero() && time.Since(s.offlineSince) > s.offlineMaxAge
|
||||
}
|
||||
|
||||
// Buffer returns a copy of the current buffered events.
|
||||
|
||||
@@ -216,6 +216,12 @@ type (
|
||||
MasterEncryptKeyFile(ctx context.Context) string
|
||||
// ShowEncryptionStatus returns true if encryption status is shown.
|
||||
ShowEncryptionStatus(ctx context.Context) bool
|
||||
// EventHubMaxOfflineDuration returns the maximum offline duration of event hub.
|
||||
EventHubMaxOfflineDuration(ctx context.Context) time.Duration
|
||||
// EventHubEnabled returns true if event hub is enabled.
|
||||
EventHubEnabled(ctx context.Context) bool
|
||||
// EventHubDebounceDelay returns the debounce delay of event hub.
|
||||
EventHubDebounceDelay(ctx context.Context) time.Duration
|
||||
}
|
||||
UseFirstSiteUrlCtxKey = struct{}
|
||||
)
|
||||
@@ -582,6 +588,18 @@ func (s *settingProvider) EntityUrlValidDuration(ctx context.Context) time.Durat
|
||||
return time.Duration(s.getInt(ctx, "entity_url_default_ttl", 3600)) * time.Second
|
||||
}
|
||||
|
||||
func (s *settingProvider) EventHubMaxOfflineDuration(ctx context.Context) time.Duration {
|
||||
return time.Duration(s.getInt(ctx, "fs_event_push_max_age", 1209600)) * time.Second
|
||||
}
|
||||
|
||||
func (s *settingProvider) EventHubDebounceDelay(ctx context.Context) time.Duration {
|
||||
return time.Duration(s.getInt(ctx, "fs_event_push_debounce", 5)) * time.Second
|
||||
}
|
||||
|
||||
func (s *settingProvider) EventHubEnabled(ctx context.Context) bool {
|
||||
return s.getBoolean(ctx, "fs_event_push_enabled", true)
|
||||
}
|
||||
|
||||
func (s *settingProvider) Queue(ctx context.Context, queueType QueueType) *QueueSetting {
|
||||
queueTypeStr := string(queueType)
|
||||
return &QueueSetting{
|
||||
|
||||
@@ -778,6 +778,9 @@ func initMasterRouter(dep dependency.Dep) *gin.Engine {
|
||||
// Server event push
|
||||
file.GET("events",
|
||||
middleware.LoginRequired(),
|
||||
middleware.IsFunctionEnabled(func(c *gin.Context) bool {
|
||||
return dep.SettingProvider().EventHubEnabled(c)
|
||||
}),
|
||||
controllers.FromQuery[explorer.ExplorerEventService](explorer.ExplorerEventParamCtx{}),
|
||||
controllers.HandleExplorerEventsPush,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user