mirror of
https://github.com/siyuan-note/siyuan.git
synced 2026-06-28 06:46:12 +00:00
* ♻️ Add/update indirect Go dependencies in kernel Update kernel/go.mod and kernel/go.sum to add multiple indirect modules and checksum entries. Notable additions include github.com/fastschema/qjs, github.com/filecoin-project/go-jsonrpc, github.com/ipfs/go-log/v2, go.opencensus.io, go.uber.org/{atomic,multierr,zap}, golang.org/x/xerrors and github.com/golang/groupcache among many transitive entries. Changes ensure transitive dependencies are pinned and go.sum checksums are present (likely produced by `go mod tidy`) to make builds reproducible. * refactor: export bazaar.GetCurrentBackend for kernel plugin platform matching Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * build: promote qjs to direct dependency for kernel plugin system Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(plugin): add KernelPlugin struct with QJS runtime lifecycle and state machine Introduces plugin/plugin.go with KernelPlugin owning an isolated QuickJS runtime, a mutex-serialized call path, RPC method registration/dispatch, Promise awaiting, JSON round-trip result conversion, and WebSocket tracking. Adds sandbox_stub.go as a temporary no-op stub for injectSandboxGlobals. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(plugin): add PluginManager singleton for kernel plugin discovery and lifecycle * feat(plugin): add sandbox injection scaffold with siyuan.log Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(plugin): implement siyuan.storage CRUD scoped to petal storage directory Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(plugin): implement siyuan.fetch with browser-like Response interface * feat(plugin): implement siyuan.socket with browser-compatible WebSocket API - Add sync import for mutex-protected WebSocket connection tracking - Implement __siyuan_socket Go function that creates browser-compatible WebSocket objects - Support send() method with queueing for messages sent before connection opens - Support close() method for closing the WebSocket connection - Track connection state via readyState property (0=CONNECTING, 1=OPEN, 3=CLOSED) - Connect to kernel WebSocket endpoint with automatic auth token injection - Run WebSocket I/O in background goroutine with proper cleanup - Wire up siyuan.socket JS API Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(plugin): implement siyuan.rpc.register for JSON-RPC method registration * feat(plugin): add JSON-RPC 2.0 handler for kernel plugin method dispatch * feat(plugin): register /api/plugin/rpc/:name and /ws/plugin/rpc/:name routes Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(plugin): wire kernel plugin manager start/stop into main lifecycle * feat(plugin): hook SetPetalEnabled to start/stop kernel plugins on enable/disable * test(plugin): add unit tests for kernel plugin state machine and eligibility * test(plugin): add comprehensive unit tests for manager, sandbox, and RPC handlers * refactor(plugin): Export IsTargetSupported and update usages Rename isTargetSupported to exported IsTargetSupported and adjust its comment. Replace local calls with bazaar.IsTargetSupported in kernel/bazaar and kernel/plugin/manager, removing the duplicated isKernelEligible helper. Update tests to import bazaar, call the new function, and change expectations to reflect that nil/empty kernel slices are treated as supported (i.e. supported on all platforms). * refactor(plugin): initialize PluginManager in main and update related usages * refactor(plugin): update JWT handling and plugin initialization for kernel plugins * refactor(plugin): enhance plugin initialization and improve sandbox global injections * refactor(kernel-plugin): Refactor plugin RPC registration and sandbox integration - Removed deprecated tests and refactored existing tests for clarity and efficiency. - Updated RPC method registration to use `bind` and `unbind` methods for better clarity. - Enhanced the `injectSandboxGlobals` function to include additional properties for the plugin. - Improved error handling in RPC methods and ensured proper state management for plugins. - Added benchmarks for map to JS conversion performance. - Cleaned up unused imports and organized code structure for better readability. * refactor(plugin): enhance concurrency handling and improve WebSocket integration * refactor(kernel-plugin): enhance RPC method handling and improve function registration * feat(kernel-plugin): add RPC method info retrieval and enhance plugin management * refactor(plugin): add plugin management endpoints and enhance plugin info retrieval * refactor(kernel-plugin): enhance RPC method handling and improve plugin info retrieval * refactor(kernel-plugin): improve error handling and response structures in RPC methods * refactor(kernel-plugin): improve error handling in RPC methods and enhance WebSocket closure management * fix(kernel-plugin): initialize sockets and socketMus maps in NewKernelPlugin * feat(kernel-plugin): add wsWrite helper and fix PushNotification omitempty Add wsWrite method on KernelPlugin that acquires the per-connection write mutex before sending a text frame, returning nil for untracked connections. Fix PushNotification's Params field to use omitempty for JSON-RPC 2.0 §4.2 compliance. Add rpc_test.go with newTestWsPair helper and tests for wsWrite. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(kernel-plugin): add BroadcastNotification and per-connection write mutex Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(kernel-plugin): expose siyuan.rpc.broadcast in plugin sandbox Add rpc.broadcast(method, params) binding in injectRpc so JS plugins can push JSON-RPC 2.0 notifications to all connected server clients. Fix deadlock by introducing a dedicated socketsMu RWMutex for the sockets map, decoupling socket tracking from the main plugin mutex that is held during Start()/Eval(). * fix(kernel-plugin): double-unlock in send handler and document PushNotification write-safety Remove spurious mu.Unlock() inside the nil-conn branch of injectSocket's CONNECTING-state send handler; the outer unconditional unlock is sufficient, so the inner one causes a panic under concurrent load. Document that PushNotification bypasses per-connection write serialization and must not be called concurrently with BroadcastNotification/wsWrite on the same connection without external locking. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * style(kernel-plugin): align struct field declarations in KernelPlugin Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(kernel-plugin): omit params field from JsonRpcRequest when nil (JSON-RPC 2.0 §4.1) Per spec, params MAY be omitted; add omitempty so marshaled requests with no parameters do not emit "params":null. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(kernel-plugin): change JsonRpcRequest.Params to *json.RawMessage A pointer correctly models the three-way distinction: - nil → params key absent (omitted from marshal output via omitempty) - non-nil → params present (null, array, or object) The previous []byte omitempty omitted the key only for nil/empty slices and could not distinguish absent from explicit null on the wire. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(kernel-plugin): unify method naming conventions and improve JSON-RPC request handling * fix(kernel-plugin): improve WebSocket message handling and ensure thread safety with mutexes * fix(kernel-plugin): enhance WebSocket handling and improve error management in storage methods * fix(kernel-plugin): rename JsonRpcRequestRaw to JsonRpcInboundRequest and update related methods * fix(kernel-plugin): improve plugin management and error handling in kernel plugin methods * fix(kernel-plugin): rename kernel field to kernels and update related references * feat(kernel-plugin): implement logging and improve concurrency handling in plugin manager and storage methods * feat(kernel-plugin): enhance RPC parameter handling and add JSON array parsing support * refactor(kernel-plugin): refactor RPC handling and improve logging functionality * refactor(kernel-plugin): streamline loggerWrapper function and improve error handling in injectFetch * refactor(kernel-plugin): optimize injectFetch function and enhance error handling * feat(kernel-plugin): add onLoaded hook and enhance plugin lifecycle management * feat(kernel-plugin): add ObjectFreeze and ObjectSeal functions to enhance API security * feat(kernel-plugin): add InitJwtKey function to generate JWT signing key * refactor(kernel-plugin): enhance error handling and logging in plugin lifecycle methods * feat(kernel-plugin): improve WebSocket error handling and add concurrency support in BroadcastNotification * feat(kernel-plugin): enhance error handling in storage and fetch methods with panic recovery * feat(kernel-plugin): enhance PluginManager concurrency and error handling with sync.Map and atomic operations * feat(kernel-plugin): refactor PluginState to use atomic operations for improved concurrency * feat(kernel-plugin): add PluginStateLoaded and update state management in plugin lifecycle * refactor(kernel-plugin): update logging level in loadPetals and refactor loggerWrapper return values * feat(kernel-plugin): simplify invokeHook and enhance error handling in Object methods * feat(kernel-plugin): remove obsolete test files for plugin functionality * refactor(kernel-plugin): implement loggerWrapper and rpcParamsToJsValue functions for improved logging and RPC parameter handling * feat(kernel-plugin): introduce Worker for serializing plugin tasks and enhance context management * refactor(worker): enhance task execution with callback support and graceful shutdown - Introduced a callback mechanism in the Task struct to handle results and errors. - Updated the Run method to accept a callback, allowing immediate handling of task results. - Added a RunSync method for synchronous task execution with result retrieval. - Implemented atomic closure state management to prevent task submission after closure. - Enhanced the Close method to ensure graceful shutdown and wait for the worker to finish processing. * feat(kernel-plugin): refactor storage and RPC methods to use PromiseRun for better error handling * feat(kernel-plugin): enhance plugin event handling with lifecycle and RPC event subscriptions * refactor(kernel-plugin): replace PromiseRun with worker.Run for improved error handling in event and storage methods * chore(kernel-plugin): add goja dependency, drop qjs * chore(kernel-plugin): delete KernelPluginLogger (qjs stdout/stderr only) * refactor(kernel-plugin): replace qjs runtime with goja in plugin.go Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test(kernel-plugin): add sandbox utility tests (pre-rewrite) * refactor(kernel-plugin): rewrite sandbox utility functions for goja Replace goValueToJsValue, getJsContextValue, dispatchEvent with goja implementations; add convertJsonNumbers helper; stub ObjectFreeze and ObjectSeal as no-ops; delete dead qjs-only helpers (invokeRpcMethod, PromiseAwait, rpcParamsToJsValue, parseJsonArrayStringToJsValueArray, parseJsonStringToJsValue, loggerWrapper, ObjectSetDataMethods). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * refactor(kernel-plugin): rewrite sandbox.go inject functions for goja Replace all qjs-based inject functions (injectGlobalContext, injectPlugin, injectLogger, injectEvent, injectStorage, injectFetch, injectSocket, injectRpc) with goja equivalents. Add ObjectSetDataMethods and loggerWrapper helpers. Remove all remaining qjs dead code; ObjectFreeze/ObjectSeal now call Object.freeze/seal via goja AssertFunction. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * test(kernel-plugin): add plugin lifecycle and RPC integration tests Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * chore(kernel-plugin): go mod tidy after qjs removal Remove fastschema/qjs from go.mod and go.sum, add go-sourcemap as indirect (transitive dep of dop251/goja), mark go-sourcemap indirect. Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com> * fix(kernel-plugin): fix invokeHook early-return on subscribe failure, safe await extraction, and goja value cross-goroutine access in socket methods * refactor(kernel-plugin): replace goValueToJsValue with goValueToJsValueSafely in sandbox functions and tests * feat(plugin): enhance plugin management and error handling - Added GetLoadedPlugin method to retrieve loaded plugin info by name. - Introduced file path for kernel.js in KernelPlugin struct. - Updated Eval method to use the new file path for script execution. - Improved error handling in injectGlobalContext and other injection functions using recover. - Refactored task execution in Worker to use clearer types for task executors and callbacks. - Enhanced storage methods to ensure proper error handling and logging. - Updated loggerWrapper to handle errors more gracefully. - Ensured consistent use of error handling patterns across various plugin methods. * refactor(worker): enhance task execution with goja runtime integration - Updated TaskExecutor and TaskCallback signatures to accept *goja.Runtime. - Modified Worker to start processing tasks with an event loop. - Improved error handling in task execution to catch panics from both executor and callback. - Renamed Close method to Stop for clarity on worker shutdown behavior. * refactor(kernel-plugin): streamline worker implementation and update context handling in plugin methods * refactor(kernel-plugin): update event handler to use byte slices and improve event dispatching * refactor(worker): simplify RunSync method by removing unnecessary select statement * refactor(kernel-plugin): enhance plugin lifecycle management and improve RPC method binding * refactor(kernel-plugin): improve error logging in data methods for better debugging * refactor(kernel-plugin): add version field to plugin data structures and update related methods * refactor(kernel-plugin): replace JsonRpcInboundRequest with JsonRpcRequest and update related methods * refactor(kernel-plugin): enhance plugin lifecycle hooks and improve RPC method invocation * feat(kernel-plugin): improve error handling and response processing in fetch and socket methods * refactor(kernel-plugin): update invokeFunction to handle promise results correctly * refactor(kernel-plugin): streamline event handling and remove unused JSON marshaling functions * refactor(kernel-plugin): improve error handling in start method and add event publishing for lifecycle states * refactor(kernel-plugin): move logging to separate function and execute in goroutines for improved performance * feat(kernel-plugin): add unique ID generation for start and stop events * refactor(kernel-plugin): enhance error handling and concurrency in storage operations Co-authored-by: Copilot <copilot@github.com> * fix(kernel-plugin): remove unexpected resolve in fetch function * feat(kernel-plugin): enhance JSON-RPC request handling with optional parameters and improved error reporting Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): rename await to async in dispatchEvent function for clarity Co-authored-by: Copilot <copilot@github.com> * fix(kernel-plugin): improve error handling in RPC method execution and hook invocation * feat(kernel-plugin): implement custom JSON marshaling for JsonRpcRequest to handle optional parameters * feat(kernel-plugin): add error codes for plugin state and improve error handling in RPC responses Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): clean up context usage and improve error logging for RPC methods * feat(kernel-plugin): add buffer method to object for asynchronous data processing * fix(kernel-plugin): Fixed the problem of blocking when plug-in life cycle function is not bound Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): implement public and private web server handlers and enhance request handling Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): enhance server request handling and introduce server handler invocation Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): enhance response handling and add jsValueToBytes conversion utility Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): comment out public web server route in router * feat(kernel-plugin): add WebSocket and EventSource proxy handlers and update sandbox integration Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): implement HTTP proxy handler with response header forwarding * refactor(kernel-plugin): refactor siyuan.client.* methods * feat(kernel-plugin): add support for EventSource with SSE handling and response header forwarding Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): add SSE support using r3labs/sse library for EventSource handling * feat(kernel-plugin): enhance SSE client with onclose event handling Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): implement SSE event handling and error management in server-sent events * feat(kernel-plugin): refactor SSE handling and introduce request handler utility functions Co-authored-by: Copilot <copilot@github.com> * feat(kernel-plugin): enhance WebSocket message handling with buffered amount tracking and cleanup Co-authored-by: Copilot <copilot@github.com> * perf(kernel-plugin): improve WebSocket message handling with channel-based message sending and error management Co-Authored-By: Copilot <copilot@github.com> * refactor(kernel-plugin): remove invokeServerHandler Co-Authored-By: Copilot <copilot@github.com> * feat(kernel-plugin): implement WebSocket message handling with improved structure and error management Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): Refactor code structure for improved readability and maintainability * refactor(kernel-plugin): streamline HTTP client creation and enhance event source state management Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): enhance WebSocket and SSE handling with improved closure management and error handling Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): optimize WebSocket handling by restructuring state management and improving closure logic Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): simplify header setting and improve null checks in WebSocket and SSE handling Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): update WebSocket request handling to improve error management and consistency * refactor(kernel-plugin): improve WebSocket error handling by adding close message management Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): Refactor WebSocket handling to use gws library - Replaced gorilla/websocket with lxzan/gws for WebSocket connections. - Introduced gwsEventHandler to manage WebSocket events with customizable callbacks. - Updated KernelPlugin to track gws connections and handle message broadcasting. - Refactored RPC WebSocket handling to accommodate new gws structure. - Simplified message sending and connection management logic. - Added utility function to check for undefined JavaScript values. Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): integrate gws library for improved WebSocket handling and error management Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): remove unnecessary error handling in WebSocket request processing * refactor(kernel-plugin): enhance error logging in WebSocket message handling Co-Authored-By: Copilot <copilot@github.com> * refactor(kernel-plugin): replace gwsEventHandler with WsEventHandler and improve WebSocket management Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): integrate chanx for improved event handling in SSE * refactor(kernel-plugin): update handleHttpRequest signature to include gin.Context for improved request handling Co-authored-by: Copilot <copilot@github.com> * refactor(kernel-plugin): optimize WebSocket connection management with context and sync mechanisms * refactor(kernel-plugin): improve error handling and context management in WebSocket and HTTP request handling * refactor(kernel-plugin): enhance WebSocket management with context handling and improved error reporting * fix(kernel-plugin): streamline header export and enhance error handling in injectClient function Co-authored-by: Copilot <copilot@github.com> * perf(kernel-plugin): enhance httpProxy and esProxy functions with improved error handling and content management Co-authored-by: Copilot <copilot@github.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Copilot <copilot@github.com>
835 lines
20 KiB
Go
835 lines
20 KiB
Go
// SiYuan - Refactor your thinking
|
|
// Copyright (c) 2020-present, b3log.org
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package api
|
|
|
|
import (
|
|
"net/http"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/88250/gulu"
|
|
"github.com/asaskevich/EventBus"
|
|
"github.com/gin-contrib/sse"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/olahol/melody"
|
|
"github.com/siyuan-note/logging"
|
|
"github.com/siyuan-note/siyuan/kernel/util"
|
|
)
|
|
|
|
const (
|
|
MessageTypeString MessageType = "string"
|
|
MessageTypeBinary MessageType = "binary"
|
|
MessageTypeClose MessageType = "close"
|
|
|
|
EvtBroadcastMessage = "broadcast.message"
|
|
)
|
|
|
|
var (
|
|
BroadcastChannels = sync.Map{} // [string (channel-name)] -> *BroadcastChannel
|
|
UnifiedSSE = &EventSourceServer{
|
|
EventBus: EventBus.New(),
|
|
WaitGroup: &sync.WaitGroup{},
|
|
Subscriber: &EventSourceSubscriber{
|
|
lock: &sync.Mutex{},
|
|
count: 0,
|
|
},
|
|
}
|
|
messageID = &MessageID{
|
|
lock: &sync.Mutex{},
|
|
id: 0,
|
|
}
|
|
)
|
|
|
|
type MessageType string
|
|
type MessageEventChannel chan *MessageEvent
|
|
|
|
type MessageID struct {
|
|
lock *sync.Mutex
|
|
id uint64
|
|
}
|
|
|
|
func (m *MessageID) Next() uint64 {
|
|
m.lock.Lock()
|
|
defer m.lock.Unlock()
|
|
|
|
m.id++
|
|
return m.id
|
|
}
|
|
|
|
type MessageEvent struct {
|
|
ID string // event ID
|
|
Type MessageType
|
|
Name string // channel name
|
|
Data []byte
|
|
}
|
|
|
|
type BroadcastSubscriber struct {
|
|
Count int // SEE subscriber count
|
|
}
|
|
|
|
type BroadcastChannel struct {
|
|
Name string // channel name
|
|
WebSocket *melody.Melody
|
|
Subscriber *BroadcastSubscriber // SEE subscriber
|
|
}
|
|
|
|
// SubscriberCount gets the total number of subscribers
|
|
func (b *BroadcastChannel) SubscriberCount() int {
|
|
return b.WebSocket.Len() + b.Subscriber.Count + UnifiedSSE.Subscriber.Count()
|
|
}
|
|
|
|
// BroadcastString broadcast string message to all subscribers
|
|
func (b *BroadcastChannel) BroadcastString(message string) (sent bool, err error) {
|
|
data := []byte(message)
|
|
sent = UnifiedSSE.SendEvent(&MessageEvent{
|
|
Type: MessageTypeString,
|
|
Name: b.Name,
|
|
Data: data,
|
|
})
|
|
err = b.WebSocket.Broadcast(data)
|
|
return
|
|
}
|
|
|
|
// BroadcastBinary broadcast binary message to all subscribers
|
|
func (b *BroadcastChannel) BroadcastBinary(data []byte) (sent bool, err error) {
|
|
sent = UnifiedSSE.SendEvent(&MessageEvent{
|
|
Type: MessageTypeBinary,
|
|
Name: b.Name,
|
|
Data: data,
|
|
})
|
|
err = b.WebSocket.BroadcastBinary(data)
|
|
return
|
|
}
|
|
|
|
func (b *BroadcastChannel) HandleRequest(c *gin.Context) {
|
|
if err := b.WebSocket.HandleRequestWithKeys(
|
|
c.Writer,
|
|
c.Request,
|
|
map[string]any{
|
|
"channel": b.Name,
|
|
},
|
|
); err != nil {
|
|
logging.LogErrorf("create broadcast channel failed: %s", err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func (b *BroadcastChannel) Subscribed() bool {
|
|
return b.SubscriberCount() > 0
|
|
}
|
|
|
|
func (b *BroadcastChannel) Destroy(force bool) bool {
|
|
if force || !b.Subscribed() {
|
|
b.WebSocket.Close()
|
|
UnifiedSSE.SendEvent(&MessageEvent{
|
|
Type: MessageTypeClose,
|
|
Name: b.Name,
|
|
})
|
|
logging.LogInfof("destroy broadcast channel [%s]", b.Name)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
type EventSourceSubscriber struct {
|
|
lock *sync.Mutex
|
|
count int
|
|
}
|
|
|
|
func (s *EventSourceSubscriber) updateCount(delta int) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.count += delta
|
|
}
|
|
|
|
func (s *EventSourceSubscriber) Count() int {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
return s.count
|
|
}
|
|
|
|
type EventSourceServer struct {
|
|
EventBus EventBus.Bus
|
|
WaitGroup *sync.WaitGroup
|
|
Subscriber *EventSourceSubscriber
|
|
}
|
|
|
|
// SendEvent sends a message to all subscribers
|
|
func (s *EventSourceServer) SendEvent(event *MessageEvent) bool {
|
|
if event.ID == "" {
|
|
switch event.Type {
|
|
case MessageTypeClose:
|
|
default:
|
|
event.ID = strconv.FormatUint(messageID.Next(), 10)
|
|
}
|
|
}
|
|
|
|
s.EventBus.Publish(EvtBroadcastMessage, event)
|
|
return s.EventBus.HasCallback(EvtBroadcastMessage)
|
|
}
|
|
|
|
// Subscribe subscribes to specified broadcast channels
|
|
func (s *EventSourceServer) Subscribe(c *gin.Context, retry uint, channels ...string) {
|
|
wg := sync.WaitGroup{}
|
|
for _, channel := range channels {
|
|
wg.Go(func() {
|
|
var broadcastChannel *BroadcastChannel
|
|
_broadcastChannel, exist := BroadcastChannels.Load(channel)
|
|
if exist { // channel exists, use it
|
|
broadcastChannel = _broadcastChannel.(*BroadcastChannel)
|
|
} else {
|
|
broadcastChannel = ConstructBroadcastChannel(channel)
|
|
}
|
|
broadcastChannel.Subscriber.Count++
|
|
})
|
|
}
|
|
wg.Wait()
|
|
|
|
channelSet := make(map[string]bool)
|
|
for _, channel := range channels {
|
|
channelSet[channel] = true
|
|
}
|
|
|
|
c.Writer.Flush()
|
|
s.Stream(c, func(event *MessageEvent, ok bool) bool {
|
|
if ok {
|
|
if _, exists := channelSet[event.Name]; exists {
|
|
switch event.Type {
|
|
case MessageTypeClose:
|
|
return false
|
|
case MessageTypeString:
|
|
s.SSEvent(c, &sse.Event{
|
|
Id: event.ID,
|
|
Event: event.Name,
|
|
Retry: retry,
|
|
Data: string(event.Data),
|
|
})
|
|
default:
|
|
s.SSEvent(c, &sse.Event{
|
|
Id: event.ID,
|
|
Event: event.Name,
|
|
Retry: retry,
|
|
Data: event.Data,
|
|
})
|
|
}
|
|
c.Writer.Flush()
|
|
return true
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
|
|
wg.Add(len(channels))
|
|
for _, channel := range channels {
|
|
go func() {
|
|
defer wg.Done()
|
|
_broadcastChannel, exist := BroadcastChannels.Load(channel)
|
|
if exist {
|
|
broadcastChannel := _broadcastChannel.(*BroadcastChannel)
|
|
broadcastChannel.Subscriber.Count--
|
|
if !broadcastChannel.Subscribed() {
|
|
BroadcastChannels.Delete(channel)
|
|
broadcastChannel.Destroy(true)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// SubscribeAll subscribes to all broadcast channels
|
|
func (s *EventSourceServer) SubscribeAll(c *gin.Context, retry uint) {
|
|
s.Subscriber.updateCount(1)
|
|
|
|
c.Writer.Flush()
|
|
s.Stream(c, func(event *MessageEvent, ok bool) bool {
|
|
if ok {
|
|
switch event.Type {
|
|
case MessageTypeClose:
|
|
return true
|
|
case MessageTypeString:
|
|
s.SSEvent(c, &sse.Event{
|
|
Id: event.ID,
|
|
Event: event.Name,
|
|
Retry: retry,
|
|
Data: string(event.Data),
|
|
})
|
|
default:
|
|
s.SSEvent(c, &sse.Event{
|
|
Id: event.ID,
|
|
Event: event.Name,
|
|
Retry: retry,
|
|
Data: event.Data,
|
|
})
|
|
}
|
|
c.Writer.Flush()
|
|
return true
|
|
}
|
|
return false
|
|
})
|
|
|
|
s.Subscriber.updateCount(-1)
|
|
PruneBroadcastChannels()
|
|
}
|
|
|
|
// GetRetry gets the retry interval
|
|
//
|
|
// If the retry interval is not specified, it will return 0
|
|
func (s *EventSourceServer) GetRetry(c *gin.Context) uint {
|
|
value := c.DefaultQuery("retry", "")
|
|
retry, err := strconv.ParseUint(value, 10, 0)
|
|
if err == nil {
|
|
return uint(retry)
|
|
}
|
|
return 0
|
|
}
|
|
|
|
// Stream streams message to client
|
|
//
|
|
// If the client is gone, it will return true
|
|
func (s *EventSourceServer) Stream(c *gin.Context, step func(event *MessageEvent, ok bool) bool) bool {
|
|
channel := make(MessageEventChannel)
|
|
defer close(channel)
|
|
|
|
subscriber := func(event *MessageEvent) {
|
|
channel <- event
|
|
}
|
|
s.EventBus.Subscribe(EvtBroadcastMessage, subscriber)
|
|
defer s.EventBus.Unsubscribe(EvtBroadcastMessage, subscriber)
|
|
|
|
clientGone := c.Writer.CloseNotify()
|
|
for {
|
|
select {
|
|
case <-clientGone:
|
|
logging.LogInfof("event source connection is closed by client")
|
|
return true
|
|
case event, ok := <-channel:
|
|
if step(event, ok) {
|
|
continue
|
|
}
|
|
logging.LogInfof("event source connection is closed by server")
|
|
return false
|
|
}
|
|
}
|
|
}
|
|
|
|
// SSEvent writes a Server-Sent Event into the body stream.
|
|
func (s *EventSourceServer) SSEvent(c *gin.Context, event *sse.Event) {
|
|
c.Render(-1, event)
|
|
}
|
|
|
|
// Subscribed checks whether the SSE server is subscribed
|
|
func (s *EventSourceServer) Subscribed() bool {
|
|
return s.Subscriber.Count() > 0
|
|
}
|
|
|
|
type ChannelInfo struct {
|
|
Name string `json:"name"`
|
|
Count int `json:"count"`
|
|
}
|
|
|
|
type PublishMessage struct {
|
|
Type MessageType `json:"type"` // "string" | "binary"
|
|
Size int `json:"size"` // message size
|
|
Filename string `json:"filename"` // empty string for string-message
|
|
}
|
|
|
|
type PublishResult struct {
|
|
Code int `json:"code"` // 0: success
|
|
Msg string `json:"msg"` // error message
|
|
|
|
Channel ChannelInfo `json:"channel"`
|
|
Message PublishMessage `json:"message"`
|
|
}
|
|
|
|
// broadcast create a broadcast channel WebSocket connection
|
|
//
|
|
// @param
|
|
//
|
|
// {
|
|
// channel: string, // channel name
|
|
// }
|
|
//
|
|
// @example
|
|
//
|
|
// "ws://localhost:6806/ws/broadcast?channel=test"
|
|
func broadcast(c *gin.Context) {
|
|
var (
|
|
channel = c.Query("channel")
|
|
broadcastChannel *BroadcastChannel
|
|
)
|
|
|
|
_broadcastChannel, exist := BroadcastChannels.Load(channel)
|
|
if exist { // channel exists, use it
|
|
broadcastChannel = _broadcastChannel.(*BroadcastChannel)
|
|
if broadcastChannel.WebSocket.IsClosed() { // channel is closed
|
|
// delete channel before creating a new one
|
|
DestroyBroadcastChannel(channel, true)
|
|
} else { // channel is open
|
|
// connect to the existing channel
|
|
broadcastChannel.HandleRequest(c)
|
|
return
|
|
}
|
|
}
|
|
|
|
// create a new channel
|
|
broadcastChannel = ConstructBroadcastChannel(channel)
|
|
broadcastChannel.HandleRequest(c)
|
|
}
|
|
|
|
// GetBroadcastChannel gets a broadcast channel
|
|
//
|
|
// If the channel does not exist but the SSE server is subscribed, it will create a new broadcast channel.
|
|
// If the SSE server is not subscribed, it will return nil.
|
|
func GetBroadcastChannel(channel string) *BroadcastChannel {
|
|
_broadcastChannel, exist := BroadcastChannels.Load(channel)
|
|
if exist {
|
|
return _broadcastChannel.(*BroadcastChannel)
|
|
}
|
|
if UnifiedSSE.Subscribed() {
|
|
return ConstructBroadcastChannel(channel)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ConstructBroadcastChannel creates a broadcast channel
|
|
func ConstructBroadcastChannel(channel string) *BroadcastChannel {
|
|
websocket := melody.New()
|
|
websocket.Config.MaxMessageSize = 1024 * 1024 * 128 // 128 MiB
|
|
|
|
// broadcast string message to other session
|
|
websocket.HandleMessage(func(s *melody.Session, msg []byte) {
|
|
UnifiedSSE.SendEvent(&MessageEvent{
|
|
Type: MessageTypeString,
|
|
Name: channel,
|
|
Data: msg,
|
|
})
|
|
websocket.BroadcastOthers(msg, s)
|
|
})
|
|
|
|
// broadcast binary message to other session
|
|
websocket.HandleMessageBinary(func(s *melody.Session, msg []byte) {
|
|
UnifiedSSE.SendEvent(&MessageEvent{
|
|
Type: MessageTypeBinary,
|
|
Name: channel,
|
|
Data: msg,
|
|
})
|
|
websocket.BroadcastBinaryOthers(msg, s)
|
|
})
|
|
|
|
// client close the connection
|
|
websocket.HandleClose(func(s *melody.Session, status int, reason string) error {
|
|
channel := s.Keys["channel"].(string)
|
|
logging.LogInfof("close broadcast session in channel [%s] with status code %d: %s", channel, status, reason)
|
|
|
|
DestroyBroadcastChannel(channel, false)
|
|
return nil
|
|
})
|
|
|
|
var broadcastChannel *BroadcastChannel
|
|
for {
|
|
// Melody Initialization is an asynchronous process, so we need to wait for it to complete
|
|
if websocket.IsClosed() {
|
|
time.Sleep(1 * time.Nanosecond)
|
|
} else {
|
|
newBroadcastChannel := &BroadcastChannel{
|
|
Name: channel,
|
|
WebSocket: websocket,
|
|
Subscriber: &BroadcastSubscriber{
|
|
Count: 0,
|
|
},
|
|
}
|
|
_broadcastChannel, loaded := BroadcastChannels.LoadOrStore(channel, newBroadcastChannel)
|
|
broadcastChannel = _broadcastChannel.(*BroadcastChannel)
|
|
if loaded { // channel exists
|
|
if broadcastChannel.WebSocket.IsClosed() { // channel is closed, replace it
|
|
BroadcastChannels.Store(channel, newBroadcastChannel)
|
|
broadcastChannel = newBroadcastChannel
|
|
} else { // channel is open, destroy the new one
|
|
newBroadcastChannel.Destroy(true)
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
return broadcastChannel
|
|
}
|
|
|
|
// DestroyBroadcastChannel tries to destroy a broadcast channel
|
|
//
|
|
// Return true if the channel destroy successfully, otherwise false
|
|
func DestroyBroadcastChannel(channel string, force bool) bool {
|
|
_broadcastChannel, exist := BroadcastChannels.Load(channel)
|
|
if !exist {
|
|
return true
|
|
}
|
|
|
|
broadcastChannel := _broadcastChannel.(*BroadcastChannel)
|
|
if force || !broadcastChannel.Subscribed() {
|
|
BroadcastChannels.Delete(channel)
|
|
broadcastChannel.Destroy(true)
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// PruneBroadcastChannels prunes all broadcast channels without subscribers
|
|
func PruneBroadcastChannels() []string {
|
|
channels := []string{}
|
|
BroadcastChannels.Range(func(key, value any) bool {
|
|
channel := key.(string)
|
|
broadcastChannel := value.(*BroadcastChannel)
|
|
if !broadcastChannel.Subscribed() {
|
|
BroadcastChannels.Delete(channel)
|
|
broadcastChannel.Destroy(true)
|
|
channels = append(channels, channel)
|
|
}
|
|
return true
|
|
})
|
|
return channels
|
|
}
|
|
|
|
// broadcastSubscribe subscribe to a broadcast channel by SSE
|
|
//
|
|
// If the channel-name does not specified, the client will subscribe to all broadcast channels.
|
|
//
|
|
// @param
|
|
//
|
|
// {
|
|
// retry: string, // retry interval (ms) (optional)
|
|
// channel: string, // channel name (optional, multiple)
|
|
// }
|
|
//
|
|
// @example
|
|
//
|
|
// "http://localhost:6806/es/broadcast/subscribe?retry=1000&channel=test1&channel=test2"
|
|
func broadcastSubscribe(c *gin.Context) {
|
|
// REF: https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go
|
|
c.Header("Content-Type", "text/event-stream")
|
|
c.Header("Cache-Control", "no-cache")
|
|
c.Header("Connection", "keep-alive")
|
|
c.Header("Transfer-Encoding", "chunked")
|
|
|
|
defer UnifiedSSE.WaitGroup.Done()
|
|
UnifiedSSE.WaitGroup.Add(1)
|
|
|
|
retry := UnifiedSSE.GetRetry(c)
|
|
channels, ok := c.GetQueryArray("channel")
|
|
if ok { // subscribe specified broadcast channels
|
|
UnifiedSSE.Subscribe(c, retry, channels...)
|
|
} else { // subscribe all broadcast channels
|
|
UnifiedSSE.SubscribeAll(c, retry)
|
|
}
|
|
}
|
|
|
|
// broadcastPublish push multiple binary messages to multiple broadcast channels
|
|
//
|
|
// @param
|
|
//
|
|
// MultipartForm: [name] -> [values]
|
|
// - name: string // channel name
|
|
// - values:
|
|
// - string[] // string-messages to the same channel
|
|
// - File[] // binary-messages to the same channel
|
|
// - filename: string // message key
|
|
//
|
|
// @returns
|
|
//
|
|
// {
|
|
// code: int,
|
|
// msg: string,
|
|
// data: {
|
|
// results: {
|
|
// code: int, // 0: success
|
|
// msg: string, // error message
|
|
// channel: {
|
|
// name: string, // channel name
|
|
// count: string, // subscriber count
|
|
// },
|
|
// message: {
|
|
// type: string, // "string" | "binary"
|
|
// size: int, // message size (Bytes)
|
|
// filename: string, // empty string for string-message
|
|
// },
|
|
// }[],
|
|
// },
|
|
// }
|
|
func broadcastPublish(c *gin.Context) {
|
|
ret := gulu.Ret.NewResult()
|
|
defer c.JSON(http.StatusOK, ret)
|
|
|
|
results := []*PublishResult{}
|
|
|
|
// Multipart form
|
|
form, err := c.MultipartForm()
|
|
if err != nil {
|
|
ret.Code = -2
|
|
ret.Msg = err.Error()
|
|
return
|
|
}
|
|
|
|
// Broadcast string messages
|
|
for name, values := range form.Value {
|
|
channel := ChannelInfo{
|
|
Name: name,
|
|
Count: 0,
|
|
}
|
|
|
|
// Get broadcast channel
|
|
broadcastChannel := GetBroadcastChannel(channel.Name)
|
|
if broadcastChannel == nil {
|
|
channel.Count = 0
|
|
} else {
|
|
channel.Count = broadcastChannel.SubscriberCount()
|
|
}
|
|
|
|
// Broadcast each string message to the same channel
|
|
for _, value := range values {
|
|
result := &PublishResult{
|
|
Code: 0,
|
|
Msg: "",
|
|
Channel: channel,
|
|
Message: PublishMessage{
|
|
Type: MessageTypeString,
|
|
Size: len(value),
|
|
Filename: "",
|
|
},
|
|
}
|
|
results = append(results, result)
|
|
|
|
if broadcastChannel != nil {
|
|
_, err := broadcastChannel.BroadcastString(value)
|
|
if err != nil {
|
|
logging.LogErrorf("broadcast message failed: %s", err)
|
|
result.Code = -2
|
|
result.Msg = err.Error()
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Broadcast binary message
|
|
for name, files := range form.File {
|
|
channel := ChannelInfo{
|
|
Name: name,
|
|
Count: 0,
|
|
}
|
|
|
|
// Get broadcast channel
|
|
broadcastChannel := GetBroadcastChannel(channel.Name)
|
|
if broadcastChannel == nil {
|
|
channel.Count = 0
|
|
} else {
|
|
channel.Count = broadcastChannel.SubscriberCount()
|
|
}
|
|
|
|
// Broadcast each binary message to the same channel
|
|
for _, file := range files {
|
|
result := &PublishResult{
|
|
Code: 0,
|
|
Msg: "",
|
|
Channel: channel,
|
|
Message: PublishMessage{
|
|
Type: MessageTypeBinary,
|
|
Size: int(file.Size),
|
|
Filename: file.Filename,
|
|
},
|
|
}
|
|
results = append(results, result)
|
|
|
|
if broadcastChannel != nil {
|
|
value, err := file.Open()
|
|
if err != nil {
|
|
logging.LogErrorf("open multipart form file [%s] failed: %s", file.Filename, err)
|
|
result.Code = -4
|
|
result.Msg = err.Error()
|
|
continue
|
|
}
|
|
|
|
content := make([]byte, file.Size)
|
|
if _, err := value.Read(content); err != nil {
|
|
logging.LogErrorf("read multipart form file [%s] failed: %s", file.Filename, err)
|
|
result.Code = -3
|
|
result.Msg = err.Error()
|
|
continue
|
|
}
|
|
|
|
if _, err := broadcastChannel.BroadcastBinary(content); err != nil {
|
|
logging.LogErrorf("broadcast binary message failed: %s", err)
|
|
result.Code = -2
|
|
result.Msg = err.Error()
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
ret.Data = map[string]any{
|
|
"results": results,
|
|
}
|
|
}
|
|
|
|
// postMessage send string message to a broadcast channel
|
|
//
|
|
// @param
|
|
//
|
|
// {
|
|
// channel: string // channel name
|
|
// message: string // message payload
|
|
// }
|
|
//
|
|
// @returns
|
|
//
|
|
// {
|
|
// code: int,
|
|
// msg: string,
|
|
// data: {
|
|
// channel: {
|
|
// name: string, //channel name
|
|
// count: string, //listener count
|
|
// },
|
|
// },
|
|
// }
|
|
func postMessage(c *gin.Context) {
|
|
ret := gulu.Ret.NewResult()
|
|
defer c.JSON(http.StatusOK, ret)
|
|
|
|
arg, ok := util.JsonArg(c, ret)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var message, channelName string
|
|
if !util.ParseJsonArgs(arg, ret,
|
|
util.BindJsonArg("message", &message, true, true),
|
|
util.BindJsonArg("channel", &channelName, true, true),
|
|
) {
|
|
return
|
|
}
|
|
|
|
channel := &ChannelInfo{
|
|
Name: channelName,
|
|
Count: 0,
|
|
}
|
|
|
|
broadcastChannel := GetBroadcastChannel(channel.Name)
|
|
if broadcastChannel == nil {
|
|
channel.Count = 0
|
|
} else {
|
|
channel.Count = broadcastChannel.SubscriberCount()
|
|
if _, err := broadcastChannel.BroadcastString(message); err != nil {
|
|
logging.LogErrorf("broadcast message failed: %s", err)
|
|
|
|
ret.Code = -2
|
|
ret.Msg = err.Error()
|
|
return
|
|
}
|
|
}
|
|
ret.Data = map[string]any{
|
|
"channel": channel,
|
|
}
|
|
}
|
|
|
|
// getChannelInfo gets the information of a broadcast channel
|
|
//
|
|
// @param
|
|
//
|
|
// {
|
|
// name: string, // channel name
|
|
// }
|
|
//
|
|
// @returns
|
|
//
|
|
// {
|
|
// code: int,
|
|
// msg: string,
|
|
// data: {
|
|
// channel: {
|
|
// name: string, //channel name
|
|
// count: string, //listener count
|
|
// },
|
|
// },
|
|
// }
|
|
func getChannelInfo(c *gin.Context) {
|
|
ret := gulu.Ret.NewResult()
|
|
defer c.JSON(http.StatusOK, ret)
|
|
|
|
arg, ok := util.JsonArg(c, ret)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
var name string
|
|
if !util.ParseJsonArgs(arg, ret, util.BindJsonArg("name", &name, true, true)) {
|
|
return
|
|
}
|
|
|
|
channel := &ChannelInfo{
|
|
Name: name,
|
|
Count: 0,
|
|
}
|
|
|
|
if _broadcastChannel, ok := BroadcastChannels.Load(channel.Name); !ok {
|
|
channel.Count = 0
|
|
} else {
|
|
var broadcastChannel = _broadcastChannel.(*BroadcastChannel)
|
|
channel.Count = broadcastChannel.SubscriberCount()
|
|
}
|
|
|
|
ret.Data = map[string]any{
|
|
"channel": channel,
|
|
}
|
|
}
|
|
|
|
// getChannels gets the channel name and lintener number of all broadcast chanel
|
|
//
|
|
// @returns
|
|
//
|
|
// {
|
|
// code: int,
|
|
// msg: string,
|
|
// data: {
|
|
// channels: {
|
|
// name: string, //channel name
|
|
// count: string, //listener count
|
|
// }[],
|
|
// },
|
|
// }
|
|
func getChannels(c *gin.Context) {
|
|
ret := gulu.Ret.NewResult()
|
|
defer c.JSON(http.StatusOK, ret)
|
|
|
|
channels := []*ChannelInfo{}
|
|
BroadcastChannels.Range(func(key, value any) bool {
|
|
broadcastChannel := value.(*BroadcastChannel)
|
|
channels = append(channels, &ChannelInfo{
|
|
Name: key.(string),
|
|
Count: broadcastChannel.SubscriberCount(),
|
|
})
|
|
return true
|
|
})
|
|
ret.Data = map[string]any{
|
|
"channels": channels,
|
|
}
|
|
}
|