siyuan/kernel/plugin/api_client.go
Yingyi / 颖逸 e564ce7b1f Support kernel plugin system (#17487)
* ♻️ Add/update indirect Go dependencies in kernel

Update kernel/go.mod and kernel/go.sum to add multiple indirect modules and checksum entries. Notable additions include github.com/fastschema/qjs, github.com/filecoin-project/go-jsonrpc, github.com/ipfs/go-log/v2, go.opencensus.io, go.uber.org/{atomic,multierr,zap}, golang.org/x/xerrors and github.com/golang/groupcache among many transitive entries. Changes ensure transitive dependencies are pinned and go.sum checksums are present (likely produced by `go mod tidy`) to make builds reproducible.

* refactor: export bazaar.GetCurrentBackend for kernel plugin platform matching

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* build: promote qjs to direct dependency for kernel plugin system

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(plugin): add KernelPlugin struct with QJS runtime lifecycle and state machine

Introduces plugin/plugin.go with KernelPlugin owning an isolated QuickJS
runtime, a mutex-serialized call path, RPC method registration/dispatch,
Promise awaiting, JSON round-trip result conversion, and WebSocket tracking.
Adds sandbox_stub.go as a temporary no-op stub for injectSandboxGlobals.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(plugin): add PluginManager singleton for kernel plugin discovery and lifecycle

* feat(plugin): add sandbox injection scaffold with siyuan.log

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(plugin): implement siyuan.storage CRUD scoped to petal storage directory

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(plugin): implement siyuan.fetch with browser-like Response interface

* feat(plugin): implement siyuan.socket with browser-compatible WebSocket API

- Add sync import for mutex-protected WebSocket connection tracking
- Implement __siyuan_socket Go function that creates browser-compatible WebSocket objects
- Support send() method with queueing for messages sent before connection opens
- Support close() method for closing the WebSocket connection
- Track connection state via readyState property (0=CONNECTING, 1=OPEN, 3=CLOSED)
- Connect to kernel WebSocket endpoint with automatic auth token injection
- Run WebSocket I/O in background goroutine with proper cleanup
- Wire up siyuan.socket JS API

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
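
The send-before-open queueing and the readyState values listed above could be sketched roughly as follows. This is only an illustration under stated assumptions, not the plugin's actual code: the `queuedSocket` type, its methods, and the in-memory `sent` slice (standing in for a real connection) are all hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// readyState values mirroring the browser WebSocket constants named in the commit.
const (
	stateConnecting = 0
	stateOpen       = 1
	stateClosed     = 3
)

// queuedSocket is a hypothetical stand-in for a WebSocket wrapper.
type queuedSocket struct {
	mu         sync.Mutex
	readyState int
	pending    []string // messages queued while CONNECTING
	sent       []string // messages "written" to the connection
}

// Send queues the message while connecting, writes it when open,
// and fails once the socket is closed.
func (s *queuedSocket) Send(msg string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch s.readyState {
	case stateConnecting:
		s.pending = append(s.pending, msg)
		return nil
	case stateOpen:
		s.sent = append(s.sent, msg)
		return nil
	default:
		return errors.New("socket is closed")
	}
}

// Open marks the socket as open and flushes the queue in FIFO order.
func (s *queuedSocket) Open() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.readyState != stateConnecting {
		return
	}
	s.readyState = stateOpen
	s.sent = append(s.sent, s.pending...)
	s.pending = nil
}

// Close marks the socket as closed and drops anything still queued.
func (s *queuedSocket) Close() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.readyState = stateClosed
	s.pending = nil
}

func main() {
	s := &queuedSocket{}
	_ = s.Send("a") // queued: the socket is still CONNECTING
	s.Open()        // flushes "a"
	_ = s.Send("b") // written directly
	s.Close()
	fmt.Println(s.sent, s.Send("c"))
}
```

A single mutex guards both the state and the queue here, so a message can never be queued after the flush has already happened.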

* feat(plugin): implement siyuan.rpc.register for JSON-RPC method registration

* feat(plugin): add JSON-RPC 2.0 handler for kernel plugin method dispatch

* feat(plugin): register /api/plugin/rpc/:name and /ws/plugin/rpc/:name routes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(plugin): wire kernel plugin manager start/stop into main lifecycle

* feat(plugin): hook SetPetalEnabled to start/stop kernel plugins on enable/disable

* test(plugin): add unit tests for kernel plugin state machine and eligibility

* test(plugin): add comprehensive unit tests for manager, sandbox, and RPC handlers

* refactor(plugin): Export IsTargetSupported and update usages

Rename isTargetSupported to exported IsTargetSupported and adjust its comment. Replace local calls with bazaar.IsTargetSupported in kernel/bazaar and kernel/plugin/manager, removing the duplicated isKernelEligible helper. Update tests to import bazaar, call the new function, and change expectations to reflect that nil/empty kernel slices are treated as supported (i.e. supported on all platforms).
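
The "nil/empty means supported everywhere" rule described above can be illustrated with a small sketch. The function below is hypothetical: its name, parameters, and plain string comparison are assumptions made for the example and do not reflect the real signature of bazaar.IsTargetSupported.

```go
package main

import "fmt"

// isTargetSupported is a hypothetical helper, NOT the real bazaar function.
// A nil or empty target list is treated as "supported on all platforms";
// otherwise the current platform must appear in the list.
func isTargetSupported(targets []string, current string) bool {
	if len(targets) == 0 {
		return true
	}
	for _, t := range targets {
		if t == current {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isTargetSupported(nil, "linux"))
	fmt.Println(isTargetSupported([]string{}, "linux"))
	fmt.Println(isTargetSupported([]string{"windows"}, "linux"))
	fmt.Println(isTargetSupported([]string{"windows", "linux"}, "linux"))
}
```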

* refactor(plugin): initialize PluginManager in main and update related usages

* refactor(plugin): update JWT handling and plugin initialization for kernel plugins

* refactor(plugin): enhance plugin initialization and improve sandbox global injections

* refactor(kernel-plugin): Refactor plugin RPC registration and sandbox integration

- Removed deprecated tests and refactored existing tests for clarity and efficiency.
- Updated RPC method registration to use `bind` and `unbind` methods for better clarity.
- Enhanced the `injectSandboxGlobals` function to include additional properties for the plugin.
- Improved error handling in RPC methods and ensured proper state management for plugins.
- Added benchmarks for map to JS conversion performance.
- Cleaned up unused imports and organized code structure for better readability.

* refactor(plugin): enhance concurrency handling and improve WebSocket integration

* refactor(kernel-plugin): enhance RPC method handling and improve function registration

* feat(kernel-plugin): add RPC method info retrieval and enhance plugin management

* refactor(plugin): add plugin management endpoints and enhance plugin info retrieval

* refactor(kernel-plugin): enhance RPC method handling and improve plugin info retrieval

* refactor(kernel-plugin): improve error handling and response structures in RPC methods

* refactor(kernel-plugin): improve error handling in RPC methods and enhance WebSocket closure management

* fix(kernel-plugin): initialize sockets and socketMus maps in NewKernelPlugin

* feat(kernel-plugin): add wsWrite helper and fix PushNotification omitempty

Add wsWrite method on KernelPlugin that acquires the per-connection write
mutex before sending a text frame, returning nil for untracked connections.
Fix PushNotification's Params field to use omitempty for JSON-RPC 2.0 §4.2
compliance. Add rpc_test.go with newTestWsPair helper and tests for wsWrite.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(kernel-plugin): add BroadcastNotification and per-connection write mutex

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(kernel-plugin): expose siyuan.rpc.broadcast in plugin sandbox

Add rpc.broadcast(method, params) binding in injectRpc so JS plugins
can push JSON-RPC 2.0 notifications to all connected server clients.
Fix deadlock by introducing a dedicated socketsMu RWMutex for the
sockets map, decoupling socket tracking from the main plugin mutex
that is held during Start()/Eval().

* fix(kernel-plugin): double-unlock in send handler and document PushNotification write-safety

Remove spurious mu.Unlock() inside the nil-conn branch of injectSocket's
CONNECTING-state send handler; the outer unconditional unlock is sufficient,
so the inner one causes a panic under concurrent load.

Document that PushNotification bypasses per-connection write serialization
and must not be called concurrently with BroadcastNotification/wsWrite on
the same connection without external locking.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* style(kernel-plugin): align struct field declarations in KernelPlugin

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix(kernel-plugin): omit params field from JsonRpcRequest when nil (JSON-RPC 2.0 §4.1)

Per spec, params MAY be omitted; add omitempty so marshaled requests
with no parameters do not emit "params":null.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(kernel-plugin): change JsonRpcRequest.Params to *json.RawMessage

A pointer correctly models the three-way distinction:
- nil      → params key absent (omitted from marshal output via omitempty)
- non-nil → params present (null, array, or object)

The previous []byte omitempty omitted the key only for nil/empty slices
and could not distinguish absent from explicit null on the wire.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(kernel-plugin): unify method naming conventions and improve JSON-RPC request handling

* fix(kernel-plugin): improve WebSocket message handling and ensure thread safety with mutexes

* fix(kernel-plugin): enhance WebSocket handling and improve error management in storage methods

* fix(kernel-plugin): rename JsonRpcRequestRaw to JsonRpcInboundRequest and update related methods

* fix(kernel-plugin): improve plugin management and error handling in kernel plugin methods

* fix(kernel-plugin): rename kernel field to kernels and update related references

* feat(kernel-plugin): implement logging and improve concurrency handling in plugin manager and storage methods

* feat(kernel-plugin): enhance RPC parameter handling and add JSON array parsing support

* refactor(kernel-plugin): refactor RPC handling and improve logging functionality

* refactor(kernel-plugin): streamline loggerWrapper function and improve error handling in injectFetch

* refactor(kernel-plugin): optimize injectFetch function and enhance error handling

* feat(kernel-plugin): add onLoaded hook and enhance plugin lifecycle management

* feat(kernel-plugin): add ObjectFreeze and ObjectSeal functions to enhance API security

* feat(kernel-plugin): add InitJwtKey function to generate JWT signing key

* refactor(kernel-plugin): enhance error handling and logging in plugin lifecycle methods

* feat(kernel-plugin): improve WebSocket error handling and add concurrency support in BroadcastNotification

* feat(kernel-plugin): enhance error handling in storage and fetch methods with panic recovery

* feat(kernel-plugin): enhance PluginManager concurrency and error handling with sync.Map and atomic operations

* feat(kernel-plugin): refactor PluginState to use atomic operations for improved concurrency

* feat(kernel-plugin): add PluginStateLoaded and update state management in plugin lifecycle

* refactor(kernel-plugin): update logging level in loadPetals and refactor loggerWrapper return values

* feat(kernel-plugin): simplify invokeHook and enhance error handling in Object methods

* feat(kernel-plugin): remove obsolete test files for plugin functionality

* refactor(kernel-plugin): implement loggerWrapper and rpcParamsToJsValue functions for improved logging and RPC parameter handling

* feat(kernel-plugin): introduce Worker for serializing plugin tasks and enhance context management

* refactor(worker): enhance task execution with callback support and graceful shutdown

- Introduced a callback mechanism in the Task struct to handle results and errors.
- Updated the Run method to accept a callback, allowing immediate handling of task results.
- Added a RunSync method for synchronous task execution with result retrieval.
- Implemented atomic closure state management to prevent task submission after closure.
- Enhanced the Close method to ensure graceful shutdown and wait for the worker to finish processing.

* feat(kernel-plugin): refactor storage and RPC methods to use PromiseRun for better error handling

* feat(kernel-plugin): enhance plugin event handling with lifecycle and RPC event subscriptions

* refactor(kernel-plugin): replace PromiseRun with worker.Run for improved error handling in event and storage methods

* chore(kernel-plugin): add goja dependency, drop qjs

* chore(kernel-plugin): delete KernelPluginLogger (qjs stdout/stderr only)

* refactor(kernel-plugin): replace qjs runtime with goja in plugin.go

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(kernel-plugin): add sandbox utility tests (pre-rewrite)

* refactor(kernel-plugin): rewrite sandbox utility functions for goja

Replace goValueToJsValue, getJsContextValue, dispatchEvent with goja
implementations; add convertJsonNumbers helper; stub ObjectFreeze and
ObjectSeal as no-ops; delete dead qjs-only helpers (invokeRpcMethod,
PromiseAwait, rpcParamsToJsValue, parseJsonArrayStringToJsValueArray,
parseJsonStringToJsValue, loggerWrapper, ObjectSetDataMethods).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* refactor(kernel-plugin): rewrite sandbox.go inject functions for goja

Replace all qjs-based inject functions (injectGlobalContext, injectPlugin,
injectLogger, injectEvent, injectStorage, injectFetch, injectSocket, injectRpc)
with goja equivalents. Add ObjectSetDataMethods and loggerWrapper helpers.
Remove all remaining qjs dead code; ObjectFreeze/ObjectSeal now call
Object.freeze/seal via goja AssertFunction.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* test(kernel-plugin): add plugin lifecycle and RPC integration tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* chore(kernel-plugin): go mod tidy after qjs removal

Remove fastschema/qjs from go.mod and go.sum, and add go-sourcemap as an
indirect dependency (a transitive dependency of dop251/goja).

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>

* fix(kernel-plugin): fix invokeHook early-return on subscribe failure, safe await extraction, and goja value cross-goroutine access in socket methods

* refactor(kernel-plugin): replace goValueToJsValue with goValueToJsValueSafely in sandbox functions and tests

* feat(plugin): enhance plugin management and error handling

- Added GetLoadedPlugin method to retrieve loaded plugin info by name.
- Introduced file path for kernel.js in KernelPlugin struct.
- Updated Eval method to use the new file path for script execution.
- Improved error handling in injectGlobalContext and other injection functions using recover.
- Refactored task execution in Worker to use clearer types for task executors and callbacks.
- Enhanced storage methods to ensure proper error handling and logging.
- Updated loggerWrapper to handle errors more gracefully.
- Ensured consistent use of error handling patterns across various plugin methods.

* refactor(worker): enhance task execution with goja runtime integration

- Updated TaskExecutor and TaskCallback signatures to accept *goja.Runtime.
- Modified Worker to start processing tasks with an event loop.
- Improved error handling in task execution to catch panics from both executor and callback.
- Renamed Close method to Stop for clarity on worker shutdown behavior.

* refactor(kernel-plugin): streamline worker implementation and update context handling in plugin methods

* refactor(kernel-plugin): update event handler to use byte slices and improve event dispatching

* refactor(worker): simplify RunSync method by removing unnecessary select statement

* refactor(kernel-plugin): enhance plugin lifecycle management and improve RPC method binding

* refactor(kernel-plugin): improve error logging in data methods for better debugging

* refactor(kernel-plugin): add version field to plugin data structures and update related methods

* refactor(kernel-plugin): replace JsonRpcInboundRequest with JsonRpcRequest and update related methods

* refactor(kernel-plugin): enhance plugin lifecycle hooks and improve RPC method invocation

* feat(kernel-plugin): improve error handling and response processing in fetch and socket methods

* refactor(kernel-plugin): update invokeFunction to handle promise results correctly

* refactor(kernel-plugin): streamline event handling and remove unused JSON marshaling functions

* refactor(kernel-plugin): improve error handling in start method and add event publishing for lifecycle states

* refactor(kernel-plugin): move logging to separate function and execute in goroutines for improved performance

* feat(kernel-plugin): add unique ID generation for start and stop events

* refactor(kernel-plugin): enhance error handling and concurrency in storage operations

Co-authored-by: Copilot <copilot@github.com>

* fix(kernel-plugin): remove unexpected resolve in fetch function

* feat(kernel-plugin): enhance JSON-RPC request handling with optional parameters and improved error reporting

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): rename await to async in dispatchEvent function for clarity

Co-authored-by: Copilot <copilot@github.com>

* fix(kernel-plugin): improve error handling in RPC method execution and hook invocation

* feat(kernel-plugin): implement custom JSON marshaling for JsonRpcRequest to handle optional parameters

* feat(kernel-plugin): add error codes for plugin state and improve error handling in RPC responses

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): clean up context usage and improve error logging for RPC methods

* feat(kernel-plugin): add buffer method to object for asynchronous data processing

* fix(kernel-plugin): fix blocking when a plugin lifecycle function is not bound

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): implement public and private web server handlers and enhance request handling

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): enhance server request handling and introduce server handler invocation

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): enhance response handling and add jsValueToBytes conversion utility

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): comment out public web server route in router

* feat(kernel-plugin): add WebSocket and EventSource proxy handlers and update sandbox integration

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): implement HTTP proxy handler with response header forwarding

* refactor(kernel-plugin): refactor siyuan.client.* methods

* feat(kernel-plugin): add support for EventSource with SSE handling and response header forwarding

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): add SSE support using r3labs/sse library for EventSource handling

* feat(kernel-plugin): enhance SSE client with onclose event handling

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): implement SSE event handling and error management in server-sent events

* feat(kernel-plugin): refactor SSE handling and introduce request handler utility functions

Co-authored-by: Copilot <copilot@github.com>

* feat(kernel-plugin): enhance WebSocket message handling with buffered amount tracking and cleanup

Co-authored-by: Copilot <copilot@github.com>

* perf(kernel-plugin): improve WebSocket message handling with channel-based message sending and error management

Co-Authored-By: Copilot <copilot@github.com>

* refactor(kernel-plugin): remove invokeServerHandler

Co-Authored-By: Copilot <copilot@github.com>

* feat(kernel-plugin): implement WebSocket message handling with improved structure and error management

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): Refactor code structure for improved readability and maintainability

* refactor(kernel-plugin): streamline HTTP client creation and enhance event source state management

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): enhance WebSocket and SSE handling with improved closure management and error handling

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): optimize WebSocket handling by restructuring state management and improving closure logic

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): simplify header setting and improve null checks in WebSocket and SSE handling

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): update WebSocket request handling to improve error management and consistency

* refactor(kernel-plugin): improve WebSocket error handling by adding close message management

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): Refactor WebSocket handling to use gws library

- Replaced gorilla/websocket with lxzan/gws for WebSocket connections.
- Introduced gwsEventHandler to manage WebSocket events with customizable callbacks.
- Updated KernelPlugin to track gws connections and handle message broadcasting.
- Refactored RPC WebSocket handling to accommodate new gws structure.
- Simplified message sending and connection management logic.
- Added utility function to check for undefined JavaScript values.

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): integrate gws library for improved WebSocket handling and error management

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): remove unnecessary error handling in WebSocket request processing

* refactor(kernel-plugin): enhance error logging in WebSocket message handling

Co-Authored-By: Copilot <copilot@github.com>

* refactor(kernel-plugin): replace gwsEventHandler with WsEventHandler and improve WebSocket management

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): integrate chanx for improved event handling in SSE

* refactor(kernel-plugin): update handleHttpRequest signature to include gin.Context for improved request handling

Co-authored-by: Copilot <copilot@github.com>

* refactor(kernel-plugin): optimize WebSocket connection management with context and sync mechanisms

* refactor(kernel-plugin): improve error handling and context management in WebSocket and HTTP request handling

* refactor(kernel-plugin): enhance WebSocket management with context handling and improved error reporting

* fix(kernel-plugin): streamline header export and enhance error handling in injectClient function

Co-authored-by: Copilot <copilot@github.com>

* perf(kernel-plugin): enhance httpProxy and esProxy functions with improved error handling and content management

Co-authored-by: Copilot <copilot@github.com>

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Copilot <copilot@github.com>
2026-05-09 11:26:37 +08:00

717 lines
22 KiB
Go

// SiYuan - Refactor your thinking
// Copyright (c) 2020-present, b3log.org
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package plugin
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
"sync/atomic"
"github.com/dop251/goja"
"github.com/lxzan/gws"
sse "github.com/r3labs/sse/v2"
"github.com/samber/lo"
"github.com/siyuan-note/logging"
"github.com/siyuan-note/siyuan/kernel/model"
"github.com/siyuan-note/siyuan/kernel/util"
)
// injectClient adds siyuan.client (fetch, socket, ...) to the goja context.
func injectClient(p *KernelPlugin, rt *goja.Runtime, siyuan *goja.Object) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("injectClient: %v", r)
}
}()
client := rt.NewObject()
lo.Must0(client.Set("fetch", rt.ToValue(func(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
promise, resolve, reject := rt.NewPromise()
runErr := p.worker.Run(func(rt *goja.Runtime) (_ any, err error) {
var path string
if goja.IsString(call.Argument(0)) {
path = call.Argument(0).String()
} else {
err = fmt.Errorf("path required")
return
}
if !strings.HasPrefix(path, "/") {
err = fmt.Errorf("path must start with /")
return
}
method := "GET"
headers := map[string]string{}
var bodyString string
var bodyBytes []byte
if init := call.Argument(1); isJsValueNotNull(init) {
if initObj := init.ToObject(rt); initObj != nil {
if m := initObj.Get("method"); goja.IsString(m) {
method = m.String()
}
if h := initObj.Get("headers"); isJsValueNotNull(h) {
if exportErr := rt.ExportTo(h, &headers); exportErr != nil {
err = fmt.Errorf("failed to export headers: %w", exportErr)
return
}
}
if b := initObj.Get("body"); isJsValueNotNull(b) {
if goja.IsString(b) {
bodyString = b.String()
} else {
body := b.Export()
if arrayBuffer, ok := body.(goja.ArrayBuffer); ok {
src := arrayBuffer.Bytes()
bodyBytes = make([]byte, len(src))
copy(bodyBytes, src)
}
}
}
}
}
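// Perform the HTTP round trip in its own goroutine so the worker is not blocked;
// the promise is settled later through p.worker.Run.
go func() {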
var err error
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic during siyuan.client.fetch: %v", r)
}
if err != nil {
p.worker.Run(func(rt *goja.Runtime) (_ any, _ error) {
if rejectErr := reject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.fetch reject: %v", p.Name, rejectErr)
}
return
}, nil)
}
}()
targetURL := fmt.Sprintf("http://127.0.0.1:%s%s", util.ServerPort, path)
r := httpClient.R()
for k, v := range headers {
r.SetHeader(k, v)
}
r.SetHeader(model.XAuthTokenKey, p.token)
if bodyString != "" {
r.SetBody(bodyString)
} else if len(bodyBytes) > 0 {
r.SetBody(bodyBytes)
}
resp, sendErr := r.Send(method, targetURL)
if sendErr != nil {
err = sendErr
return
}
defer resp.Body.Close()
body, readErr := io.ReadAll(resp.Body)
if readErr != nil {
err = fmt.Errorf("failed to read response body: %w", readErr)
return
}
responseHeader := map[string]string{}
for k, vs := range resp.Header {
responseHeader[k] = strings.Join(vs, ", ")
}
runErr := p.worker.Run(func(rt *goja.Runtime) (result any, err error) {
response := rt.NewObject()
lo.Must0(response.Set("url", rt.ToValue(path)))
lo.Must0(response.Set("ok", rt.ToValue(resp.StatusCode >= 200 && resp.StatusCode < 300)))
lo.Must0(response.Set("status", rt.ToValue(resp.StatusCode)))
lo.Must0(response.Set("statusText", rt.ToValue(resp.Status)))
lo.Must0(response.Set("headers", rt.ToValue(responseHeader)))
lo.Must0(ObjectSetDataMethods(p, rt, response, body))
result = response
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if resolveErr := resolve(result); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.fetch resolve: %v", p.Name, resolveErr)
}
} else {
if rejectErr := reject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.fetch reject: %v", p.Name, rejectErr)
}
}
})
if runErr != nil {
err = runErr
return
}
}()
return
}, func(rt *goja.Runtime, _ any, err error) {
if !lo.IsNil(err) {
if rejectErr := reject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.fetch reject: %v", p.Name, rejectErr)
}
}
})
if runErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.fetch worker run: %v", p.Name, runErr)
if rejectErr := reject(rt.NewGoError(runErr)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.fetch reject: %v", p.Name, rejectErr)
}
}
return rt.ToValue(promise)
})))
lo.Must0(client.Set("socket", rt.ToValue(func(call goja.FunctionCall, rt *goja.Runtime) goja.Value {
promise, resolve, reject := rt.NewPromise()
runErr := p.worker.Run(func(rt *goja.Runtime) (result any, err error) {
var path string
if goja.IsString(call.Argument(0)) {
path = call.Argument(0).String()
} else {
err = fmt.Errorf("path required")
return
}
if !strings.HasPrefix(path, "/") {
err = fmt.Errorf("path must start with /")
return
}
var protocols []string
if proto := call.Argument(1); isJsValueNotNull(proto) {
if protoObj := proto.ToObject(rt); protoObj != nil && protoObj.ClassName() == "Array" {
if arr, ok := proto.Export().([]interface{}); ok {
for _, v := range arr {
protocols = append(protocols, fmt.Sprintf("%v", v))
}
}
} else {
protocols = []string{proto.String()}
}
}
var gwsConn atomic.Pointer[gws.Conn]
var readyState atomic.Int64
var bufferedAmount atomic.Int64
readyState.Store(int64(WebSocketReadyStateConnecting))
wsURL := fmt.Sprintf("ws://127.0.0.1:%s%s", util.ServerPort, path)
wsHeader := http.Header{}
wsHeader.Set(model.XAuthTokenKey, p.token)
if len(protocols) > 0 {
wsHeader.Set("Sec-WebSocket-Protocol", strings.Join(protocols, ", "))
}
wsObj := rt.NewObject()
invokeHook := func(_ *goja.Runtime, name string, args ...goja.Value) {
hook := wsObj.Get(name)
if fn, ok := goja.AssertFunction(hook); ok {
if _, callErr := fn(wsObj, args...); callErr != nil {
logging.LogErrorf("[plugin:%s] ws hook %q: %v", p.Name, name, callErr)
}
}
}
setProtocol := func(rt *goja.Runtime, protocol string) {
wsObj.Set("protocol", rt.ToValue(protocol))
}
setReadyState := func(rt *goja.Runtime, state WebSocketState) {
readyState.Store(int64(state))
wsObj.Set("readyState", rt.ToValue(state))
}
updateBufferedAmount := func(rt *goja.Runtime, delta int) {
bufferedAmount.Add(int64(delta))
wsObj.Set("bufferedAmount", rt.ToValue(bufferedAmount.Load()))
}
h := &WsEventHandler{p: p}
manager := &WsManager{
BufferedAmount: &bufferedAmount,
InvokeHook: invokeHook,
SetProtocol: setProtocol,
SetReadyState: setReadyState,
}
h.BindOnOpen(manager)
h.BindOnClose(manager)
h.BindOnPing(manager)
h.BindOnPong(manager)
h.BindOnMessage(manager)
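// openOnce ensures the connection is dialed at most once per socket object.
var openOnce sync.Once
ctx, cancel := context.WithCancel(p.context)
// closeOnce ensures ctx is cancelled exactly once, whichever path closes the socket first.
var closeOnce sync.Once
doClose := func() {
closeOnce.Do(func() {
cancel()
})
}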
ws_open := rt.ToValue(func(openCall goja.FunctionCall, rt *goja.Runtime) goja.Value {
openPromise, openResolve, openReject := rt.NewPromise()
openRunErr := p.worker.Run(func(rt *goja.Runtime) (opening any, err error) {
opening = false
openOnce.Do(func() {
opening = true
go func() {
conn, _, dialErr := gws.NewClient(h, &gws.ClientOption{
Addr: wsURL,
RequestHeader: wsHeader,
})
if dialErr != nil {
p.worker.Run(func(rt *goja.Runtime) (_ any, _ error) {
event := rt.NewObject()
event.Set("type", rt.ToValue("error"))
event.Set("error", rt.NewGoError(dialErr))
invokeHook(rt, "onerror", event)
return
}, nil)
p.worker.Run(func(rt *goja.Runtime) (_ any, dialErr2 error) {
dialErr2 = dialErr
return
}, func(rt *goja.Runtime, _ any, dialErr2 error) {
if rejectErr := openReject(rt.NewGoError(dialErr2)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.open reject: %v", p.Name, rejectErr)
}
})
doClose()
return
}
gwsConn.Store(conn)
go func() {
<-ctx.Done()
conn.NetConn().Close()
}()
// Resolve the open promise before starting ReadLoop so the caller
// can await open() and then rely on onopen for additional setup.
p.worker.Run(func(rt *goja.Runtime) (_ any, _ error) {
if resolveErr := openResolve(nil); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.open resolve: %v", p.Name, resolveErr)
}
return
}, nil)
conn.ReadLoop()
doClose()
}()
})
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if opening, ok := result.(bool); !ok || !opening {
if resolveErr := openResolve(nil); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.open resolve: %v", p.Name, resolveErr)
}
}
} else {
if rejectErr := openReject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.open reject: %v", p.Name, rejectErr)
}
}
})
if openRunErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.open worker run: %v", p.Name, openRunErr)
}
return rt.ToValue(openPromise)
})
ws_send := rt.ToValue(func(sendCall goja.FunctionCall, rt *goja.Runtime) goja.Value {
sendPromise, sendResolve, sendReject := rt.NewPromise()
sendRunErr := p.worker.Run(func(rt *goja.Runtime) (_ any, err error) {
var messageData []byte
var opcode gws.Opcode
if data := sendCall.Argument(0); isJsValueNotNull(data) {
if arrayBuffer, ok := data.Export().(goja.ArrayBuffer); ok {
opcode = gws.OpcodeBinary
b := arrayBuffer.Bytes()
messageData = make([]byte, len(b))
copy(messageData, b) // ArrayBuffer.Bytes() points into JS engine memory; copy before async send
} else {
opcode = gws.OpcodeText
messageData = []byte(data.String())
}
}
state := WebSocketState(readyState.Load())
if state == WebSocketReadyStateClosing || state == WebSocketReadyStateClosed {
err = fmt.Errorf("WebSocket is not open (state: %d)", state)
return
}
c := gwsConn.Load()
if c == nil {
err = fmt.Errorf("WebSocket not yet connected")
return
}
updateBufferedAmount(rt, len(messageData))
c.WriteAsync(opcode, messageData, func(writeErr error) {
writeRunErr := p.worker.Run(func(rt *goja.Runtime) (_ any, err error) {
if writeErr == nil {
updateBufferedAmount(rt, -len(messageData))
} else {
err = writeErr
}
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if resolveErr := sendResolve(result); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.send resolve: %v", p.Name, resolveErr)
}
} else {
if rejectErr := sendReject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.send reject: %v", p.Name, rejectErr)
}
}
})
// If the worker refuses the job, the send promise never settles; log it like the other call sites.
if writeRunErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.send worker run: %v", p.Name, writeRunErr)
}
})
return
}, func(rt *goja.Runtime, _ any, err error) {
if !lo.IsNil(err) {
if rejectErr := sendReject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.send reject: %v", p.Name, rejectErr)
}
}
})
if sendRunErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.send worker run: %v", p.Name, sendRunErr)
}
return rt.ToValue(sendPromise)
})
// ping(data): writes a ping control frame with an optional payload.
ws_ping := rt.ToValue(func(pingCall goja.FunctionCall, rt *goja.Runtime) goja.Value {
pingPromise, pingResolve, pingReject := rt.NewPromise()
pingRunErr := p.worker.Run(func(rt *goja.Runtime) (result any, err error) {
var pingData string
if isJsValueNotNull(pingCall.Argument(0)) {
pingData = pingCall.Argument(0).String()
}
if c := gwsConn.Load(); c != nil {
err = c.WritePing([]byte(pingData))
} else {
err = fmt.Errorf("WebSocket not yet connected")
}
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if resolveErr := pingResolve(result); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.ping resolve: %v", p.Name, resolveErr)
}
} else {
if rejectErr := pingReject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.ping reject: %v", p.Name, rejectErr)
}
}
})
if pingRunErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.ping worker run: %v", p.Name, pingRunErr)
}
return rt.ToValue(pingPromise)
})
// pong(data): writes a pong control frame with an optional payload.
ws_pong := rt.ToValue(func(pongCall goja.FunctionCall, rt *goja.Runtime) goja.Value {
pongPromise, pongResolve, pongReject := rt.NewPromise()
pongRunErr := p.worker.Run(func(rt *goja.Runtime) (result any, err error) {
var pongData string
if isJsValueNotNull(pongCall.Argument(0)) {
pongData = pongCall.Argument(0).String()
}
if c := gwsConn.Load(); c != nil {
err = c.WritePong([]byte(pongData))
} else {
err = fmt.Errorf("WebSocket not yet connected")
}
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if resolveErr := pongResolve(result); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.pong resolve: %v", p.Name, resolveErr)
}
} else {
if rejectErr := pongReject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.pong reject: %v", p.Name, rejectErr)
}
}
})
if pongRunErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.pong worker run: %v", p.Name, pongRunErr)
}
return rt.ToValue(pongPromise)
})
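// close(code, reason): starts the closing handshake. The code defaults to 1000
// (normal closure); the call is a no-op if no connection has been established.
ws_close := rt.ToValue(func(closeCall goja.FunctionCall, rt *goja.Runtime) goja.Value {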
closePromise, closeResolve, closeReject := rt.NewPromise()
closeRunErr := p.worker.Run(func(rt *goja.Runtime) (result any, err error) {
code := uint16(1000)
var reason []byte
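if isJsValueNotNull(closeCall.Argument(0)) {
// Reject codes that do not fit in 16 bits instead of silently wrapping them.
rawCode := closeCall.Argument(0).ToInteger()
if rawCode < 0 || rawCode > 65535 {
err = fmt.Errorf("invalid WebSocket close code: %d", rawCode)
return
}
code = uint16(rawCode)
}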
if isJsValueNotNull(closeCall.Argument(1)) {
reason = []byte(closeCall.Argument(1).String())
}
if c := gwsConn.Load(); c != nil {
setReadyState(rt, WebSocketReadyStateClosing)
err = c.WriteClose(code, reason)
}
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if resolveErr := closeResolve(result); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.close resolve: %v", p.Name, resolveErr)
}
} else {
if rejectErr := closeReject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.close reject: %v", p.Name, rejectErr)
}
}
})
if closeRunErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket.close worker run: %v", p.Name, closeRunErr)
}
return rt.ToValue(closePromise)
})
// Expose the browser-like WebSocket surface, then seal the object.
lo.Must0(wsObj.Set("binaryType", rt.ToValue("arraybuffer")))
lo.Must0(wsObj.Set("bufferedAmount", rt.ToValue(bufferedAmount.Load())))
lo.Must0(wsObj.Set("extensions", rt.ToValue("")))
lo.Must0(wsObj.Set("protocol", rt.ToValue("")))
lo.Must0(wsObj.Set("readyState", rt.ToValue(readyState.Load())))
lo.Must0(wsObj.Set("url", rt.ToValue(wsURL)))
lo.Must0(wsObj.Set("onopen", goja.Null()))
lo.Must0(wsObj.Set("onmessage", goja.Null()))
lo.Must0(wsObj.Set("onping", goja.Null()))
lo.Must0(wsObj.Set("onpong", goja.Null()))
lo.Must0(wsObj.Set("onclose", goja.Null()))
lo.Must0(wsObj.Set("onerror", goja.Null()))
lo.Must0(wsObj.Set("open", ws_open))
lo.Must0(wsObj.Set("send", ws_send))
lo.Must0(wsObj.Set("ping", ws_ping))
lo.Must0(wsObj.Set("pong", ws_pong))
lo.Must0(wsObj.Set("close", ws_close))
lo.Must0(ObjectSeal(rt, wsObj))
result = wsObj
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if resolveErr := resolve(result); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket resolve: %v", p.Name, resolveErr)
}
} else {
if rejectErr := reject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket reject: %v", p.Name, rejectErr)
}
}
})
if runErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.socket worker run: %v", p.Name, runErr)
}
return rt.ToValue(promise)
})))
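// siyuan.client.event(path): subscribes to a server-sent events endpoint of the
// local kernel (127.0.0.1) and resolves to an EventSource-like object.
lo.Must0(client.Set("event", rt.ToValue(func(call goja.FunctionCall, rt *goja.Runtime) goja.Value {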
promise, resolve, reject := rt.NewPromise()
runErr := p.worker.Run(func(rt *goja.Runtime) (result any, err error) {
var path string
if goja.IsString(call.Argument(0)) {
path = call.Argument(0).String()
} else {
err = fmt.Errorf("path required")
return
}
if !strings.HasPrefix(path, "/") {
err = fmt.Errorf("path must start with /")
return
}
var readyState atomic.Int64
readyState.Store(int64(EventSourceConnecting))
esURL := fmt.Sprintf("http://127.0.0.1:%s%s", util.ServerPort, path)
ctx, cancel := context.WithCancel(p.context)
var closeOnce sync.Once
doClose := func() {
closeOnce.Do(func() {
cancel()
})
}
esObj := rt.NewObject()
setReadyState := func(state EventSourceState) {
readyState.Store(int64(state))
esObj.Set("readyState", rt.ToValue(state))
}
invokeEsHook := func(name string, args ...goja.Value) {
hook := esObj.Get(name)
if fn, ok := goja.AssertFunction(hook); ok {
if _, callErr := fn(esObj, args...); callErr != nil {
logging.LogErrorf("[plugin:%s] es hook %q: %v", p.Name, name, callErr)
}
}
}
es_close := rt.ToValue(func(goja.FunctionCall) goja.Value {
setReadyState(EventSourceClosed)
doClose()
return goja.Undefined()
})
lo.Must0(esObj.Set("readyState", rt.ToValue(readyState.Load())))
lo.Must0(esObj.Set("url", rt.ToValue(path)))
lo.Must0(esObj.Set("onopen", goja.Null()))
lo.Must0(esObj.Set("onmessage", goja.Null()))
lo.Must0(esObj.Set("onclose", goja.Null()))
lo.Must0(esObj.Set("onerror", goja.Null()))
lo.Must0(esObj.Set("close", es_close))
lo.Must0(ObjectSeal(rt, esObj))
setReadyState(EventSourceConnecting)
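// Run the blocking SSE subscription in its own goroutine; each callback hands
// its work back to the plugin worker before touching the JS runtime.
go func() {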
var err error
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic during siyuan.client.event: %v", r)
}
doClose()
p.worker.Run(func(rt *goja.Runtime) (_ any, _ error) {
if err != nil && !errors.Is(err, context.Canceled) {
event := rt.NewObject()
event.Set("type", rt.ToValue("error"))
event.Set("error", rt.NewGoError(err))
invokeEsHook("onerror", event)
}
if EventSourceState(readyState.Load()) != EventSourceClosed {
setReadyState(EventSourceClosed)
}
return
}, nil)
}()
sseClient := sse.NewClient(esURL)
sseClient.Headers[model.XAuthTokenKey] = p.token
sseClient.OnConnect(func(_ *sse.Client) {
p.worker.Run(func(rt *goja.Runtime) (_ any, _ error) {
setReadyState(EventSourceOpen)
event := rt.NewObject()
event.Set("type", rt.ToValue("open"))
invokeEsHook("onopen", event)
return
}, nil)
})
sseClient.OnDisconnect(func(_ *sse.Client) {
p.worker.Run(func(rt *goja.Runtime) (_ any, _ error) {
setReadyState(EventSourceClosed)
event := rt.NewObject()
event.Set("type", rt.ToValue("close"))
invokeEsHook("onclose", event)
return
}, nil)
})
err = sseClient.SubscribeRawWithContext(ctx, func(msg *sse.Event) {
p.worker.Run(func(rt *goja.Runtime) (_ any, _ error) {
typ := "message"
if len(msg.Event) > 0 {
typ = string(msg.Event)
}
event := rt.NewObject()
event.Set("type", rt.ToValue(typ))
event.Set("data", rt.ToValue(string(msg.Data)))
event.Set("lastEventId", rt.ToValue(string(msg.ID)))
invokeEsHook("onmessage", event)
return
}, nil)
})
}()
result = esObj
return
}, func(rt *goja.Runtime, result any, err error) {
if lo.IsNil(err) {
if resolveErr := resolve(result); resolveErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.event resolve: %v", p.Name, resolveErr)
}
} else {
if rejectErr := reject(rt.NewGoError(err)); rejectErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.event reject: %v", p.Name, rejectErr)
}
}
})
if runErr != nil {
logging.LogErrorf("[plugin:%s] siyuan.client.event worker run: %v", p.Name, runErr)
}
return rt.ToValue(promise)
})))
lo.Must0(ObjectFreeze(rt, client))
lo.Must0(siyuan.Set("client", client))
return
}