diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 5490341ef..a2f6c835f 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -14,8 +14,8 @@ import { userSettings, writingStyleMatrix, } from './db/schema'; +import { env, DurableObject, RpcTarget, WorkerEntrypoint } from 'cloudflare:workers'; import { EProviders, type ISubscribeBatch, type IThreadBatch } from './types'; -import { env, DurableObject, RpcTarget } from 'cloudflare:workers'; import { oAuthDiscoveryMetadata } from 'better-auth/plugins'; import { getZeroDB, verifyToken } from './lib/server-utils'; import { eq, and, desc, asc, inArray } from 'drizzle-orm'; @@ -26,7 +26,6 @@ import { defaultUserSettings } from './lib/schemas'; import { createLocalJWKSet, jwtVerify } from 'jose'; import { getZeroAgent } from './lib/server-utils'; import { enableBrainFunction } from './lib/brain'; -import { withSentry } from '@sentry/cloudflare'; import { trpcServer } from '@hono/trpc-server'; import { agentsMiddleware } from 'hono-agents'; import { ZeroMCP } from './routes/agent/mcp'; @@ -718,150 +717,145 @@ const app = new Hono() return c.json({ message: 'OK' }, { status: 200 }); } }); -export default withSentry( - () => ({ - dsn: 'https://54d9ec6795f10e5c6d1c4851523d4888@o4509328786915328.ingest.us.sentry.io/4509753563938816', - }), - { - async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise { - return app.fetch(request, env, ctx); - }, - async queue(batch: MessageBatch) { - switch (true) { - case batch.queue.startsWith('subscribe-queue'): { - console.log('batch', batch); - await Promise.all( - batch.messages.map(async (msg: Message) => { - const connectionId = msg.body.connectionId; - const providerId = msg.body.providerId; - try { - await enableBrainFunction({ id: connectionId, providerId }); - } catch (error) { - console.error( - `Failed to enable brain function for connection ${connectionId}:`, - error, - ); - } - }), - ); - console.log('[SUBSCRIBE_QUEUE] batch done'); - return; - } - case batch.queue.startsWith('thread-queue'): { - await Promise.all( - batch.messages.map(async (msg: Message) => { - const providerId = msg.body.providerId; - const historyId = msg.body.historyId; - const subscriptionName = msg.body.subscriptionName; - - try { - const workflowRunner = env.WORKFLOW_RUNNER.get(env.WORKFLOW_RUNNER.newUniqueId()); - const result = await workflowRunner.runMainWorkflow({ - providerId, - historyId, - subscriptionName, - }); - console.log('[THREAD_QUEUE] result', result); - } catch (error) { - console.error('Error running workflow', error); - } - }), - ); - break; - } - } - }, - async scheduled() { - console.log('[SCHEDULED] Checking for expired subscriptions...'); - const { db, conn } = createDb(env.HYPERDRIVE.connectionString); - const allAccounts = await db.query.connection.findMany({ - where: (fields, { isNotNull, and }) => - and(isNotNull(fields.accessToken), isNotNull(fields.refreshToken)), - }); - await conn.end(); - console.log('[SCHEDULED] allAccounts', allAccounts.length); - const now = new Date(); - const fiveDaysAgo = new Date(now.getTime() - 5 * 24 * 60 * 60 * 1000); - - const expiredSubscriptions: Array<{ connectionId: string; providerId: EProviders }> = []; - - const nowTs = Date.now(); - - const unsnoozeMap: Record = {}; - - let cursor: string | undefined = undefined; - do { - const listResp: { - keys: { name: string; metadata?: { wakeAt?: string } }[]; - cursor?: string; - } = await env.snoozed_emails.list({ cursor, limit: 1000 }); - cursor = listResp.cursor; - - for (const key of listResp.keys) { - try { - const wakeAtIso = (key as any).metadata?.wakeAt as string | undefined; - if (!wakeAtIso) continue; - const wakeAt = new Date(wakeAtIso).getTime(); - if (wakeAt > nowTs) continue; - - const [threadId, connectionId] = key.name.split('__'); - if (!threadId || !connectionId) continue; - - if (!unsnoozeMap[connectionId]) { - unsnoozeMap[connectionId] = { threadIds: [], keyNames: [] }; - } - unsnoozeMap[connectionId].threadIds.push(threadId); - unsnoozeMap[connectionId].keyNames.push(key.name); - } catch (error) { - console.error('Failed to prepare unsnooze for key', key.name, error); - } - } - } while (cursor); - - await Promise.all( - Object.entries(unsnoozeMap).map(async ([connectionId, { threadIds, keyNames }]) => { - try { - const agent = await getZeroAgent(connectionId); - await agent.queue('unsnoozeThreadsHandler', { connectionId, threadIds, keyNames }); - } catch (error) { - console.error('Failed to enqueue unsnooze tasks', { connectionId, threadIds, error }); - } - }), - ); - - await Promise.all( - allAccounts.map(async ({ id, providerId }) => { - const lastSubscribed = await env.gmail_sub_age.get(`${id}__${providerId}`); - - if (lastSubscribed) { - const subscriptionDate = new Date(lastSubscribed); - if (subscriptionDate < fiveDaysAgo) { - console.log(`[SCHEDULED] Found expired Google subscription for connection: ${id}`); - expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders }); - } - } else { - expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders }); - } - }), - ); - - // Send expired subscriptions to queue for renewal - if (expiredSubscriptions.length > 0) { - console.log( - `[SCHEDULED] Sending ${expiredSubscriptions.length} expired subscriptions to renewal queue`, - ); +export default class Entry extends WorkerEntrypoint { + async fetch(request: Request): Promise { + return app.fetch(request, this.env, this.ctx); + } + async queue(batch: MessageBatch) { + switch (true) { + case batch.queue.startsWith('subscribe-queue'): { + console.log('batch', batch); await Promise.all( - expiredSubscriptions.map(async ({ connectionId, providerId }) => { - await env.subscribe_queue.send({ connectionId, providerId }); + batch.messages.map(async (msg: Message) => { + const connectionId = msg.body.connectionId; + const providerId = msg.body.providerId; + try { + await enableBrainFunction({ id: connectionId, providerId }); + } catch (error) { + console.error( + `Failed to enable brain function for connection ${connectionId}:`, + error, + ); + } }), ); + console.log('[SUBSCRIBE_QUEUE] batch done'); + return; } + case batch.queue.startsWith('thread-queue'): { + await Promise.all( + batch.messages.map(async (msg: Message) => { + const providerId = msg.body.providerId; + const historyId = msg.body.historyId; + const subscriptionName = msg.body.subscriptionName; + try { + const workflowRunner = env.WORKFLOW_RUNNER.get(env.WORKFLOW_RUNNER.newUniqueId()); + const result = await workflowRunner.runMainWorkflow({ + providerId, + historyId, + subscriptionName, + }); + console.log('[THREAD_QUEUE] result', result); + } catch (error) { + console.error('Error running workflow', error); + } + }), + ); + break; + } + } + } + async scheduled() { + console.log('[SCHEDULED] Checking for expired subscriptions...'); + const { db, conn } = createDb(env.HYPERDRIVE.connectionString); + const allAccounts = await db.query.connection.findMany({ + where: (fields, { isNotNull, and }) => + and(isNotNull(fields.accessToken), isNotNull(fields.refreshToken)), + }); + await conn.end(); + console.log('[SCHEDULED] allAccounts', allAccounts.length); + const now = new Date(); + const fiveDaysAgo = new Date(now.getTime() - 5 * 24 * 60 * 60 * 1000); + + const expiredSubscriptions: Array<{ connectionId: string; providerId: EProviders }> = []; + + const nowTs = Date.now(); + + const unsnoozeMap: Record = {}; + + let cursor: string | undefined = undefined; + do { + const listResp: { + keys: { name: string; metadata?: { wakeAt?: string } }[]; + cursor?: string; + } = await env.snoozed_emails.list({ cursor, limit: 1000 }); + cursor = listResp.cursor; + + for (const key of listResp.keys) { + try { + const wakeAtIso = (key as any).metadata?.wakeAt as string | undefined; + if (!wakeAtIso) continue; + const wakeAt = new Date(wakeAtIso).getTime(); + if (wakeAt > nowTs) continue; + + const [threadId, connectionId] = key.name.split('__'); + if (!threadId || !connectionId) continue; + + if (!unsnoozeMap[connectionId]) { + unsnoozeMap[connectionId] = { threadIds: [], keyNames: [] }; + } + unsnoozeMap[connectionId].threadIds.push(threadId); + unsnoozeMap[connectionId].keyNames.push(key.name); + } catch (error) { + console.error('Failed to prepare unsnooze for key', key.name, error); + } + } + } while (cursor); + + await Promise.all( + Object.entries(unsnoozeMap).map(async ([connectionId, { threadIds, keyNames }]) => { + try { + const agent = await getZeroAgent(connectionId); + await agent.queue('unsnoozeThreadsHandler', { connectionId, threadIds, keyNames }); + } catch (error) { + console.error('Failed to enqueue unsnooze tasks', { connectionId, threadIds, error }); + } + }), + ); + + await Promise.all( + allAccounts.map(async ({ id, providerId }) => { + const lastSubscribed = await env.gmail_sub_age.get(`${id}__${providerId}`); + + if (lastSubscribed) { + const subscriptionDate = new Date(lastSubscribed); + if (subscriptionDate < fiveDaysAgo) { + console.log(`[SCHEDULED] Found expired Google subscription for connection: ${id}`); + expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders }); + } + } else { + expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders }); + } + }), + ); + + // Send expired subscriptions to queue for renewal + if (expiredSubscriptions.length > 0) { console.log( - `[SCHEDULED] Processed ${allAccounts.keys.length} accounts, found ${expiredSubscriptions.length} expired subscriptions`, + `[SCHEDULED] Sending ${expiredSubscriptions.length} expired subscriptions to renewal queue`, ); - }, - } satisfies ExportedHandler, -); + await Promise.all( + expiredSubscriptions.map(async ({ connectionId, providerId }) => { + await env.subscribe_queue.send({ connectionId, providerId }); + }), + ); + } + + console.log( + `[SCHEDULED] Processed ${allAccounts.keys.length} accounts, found ${expiredSubscriptions.length} expired subscriptions`, + ); + } +} export { ZeroAgent, ZeroMCP, ZeroDB, ZeroDriver, ThinkingMCP, WorkflowRunner };