diff --git a/README.md b/README.md index dbd458a7..80ca509c 100644 --- a/README.md +++ b/README.md @@ -193,6 +193,7 @@ You can set up Zero in two ways: - Go to the [Twilio](https://www.twilio.com/) - Create a Twilio account if you don’t already have one - From the dashboard, locate your: + - Account SID - Auth Token - Phone Number @@ -265,6 +266,15 @@ Zero uses PostgreSQL for storing data. Here's how to set it up: ``` > If you run `pnpm dev` in your terminal, the studio command should be automatically running with the app. +### Sync + +Background: https://x.com/cmdhaus/status/1940886269950902362 +We're now storing the user's emails in their Durable Object & an R2 bucket. This allow us to speed things up, a lot. +This also introduces 3 environment variables, `DROP_AGENT_TABLES`,`THREAD_SYNC_MAX_COUNT`, `THREAD_SYNC_LOOP`. +`DROP_AGENT_TABLES`: should the durable object drop the threads table before starting a sync +`THREAD_SYNC_MAX_COUNT`: how many threads should we sync? max `500` because it's using the same number for the maxResults number from the driver. i.e 500 results per page. +`THREAD_SYNC_LOOP`: should make sure to sync all of the items inside a folder? i.e if THREAD_SYNC_MAX_COUNT=500 it will sync 500 threads per request until the folder is fully synced. (should be true in production) + ## Contribute Please refer to the [contributing guide](.github/CONTRIBUTING.md). diff --git a/apps/mail/components/party.tsx b/apps/mail/components/party.tsx index 09bcd42e..36218329 100644 --- a/apps/mail/components/party.tsx +++ b/apps/mail/components/party.tsx @@ -1,4 +1,6 @@ import { useActiveConnection } from '@/hooks/use-connections'; +import { useSearchValue } from '@/hooks/use-search-value'; +import useSearchLabels from '@/hooks/use-labels-search'; import { useQueryClient } from '@tanstack/react-query'; import { useTRPC } from '@/providers/query-provider'; import { usePartySocket } from 'partysocket/react'; @@ -27,6 +29,8 @@ export const NotificationProvider = () => { const trpc = useTRPC(); const queryClient = useQueryClient(); const { data: activeConnection } = useActiveConnection(); + const [searchValue] = useSearchValue(); + const { labels } = useSearchLabels(); const labelsDebouncer = funnel( () => queryClient.invalidateQueries({ queryKey: trpc.labels.list.queryKey() }), @@ -41,14 +45,37 @@ export const NotificationProvider = () => { party: 'zero-agent', room: activeConnection?.id ? String(activeConnection.id) : 'general', prefix: 'agents', - maxRetries: 1, + maxRetries: 3, host: import.meta.env.VITE_PUBLIC_BACKEND_URL!, onMessage: async (message: MessageEvent) => { try { - const { threadIds, type } = JSON.parse(message.data); + const { type } = JSON.parse(message.data); if (type === IncomingMessageType.Mail_Get) { - const { threadId, result } = JSON.parse(message.data); - // queryClient.setQueryData(trpc.mail.get.queryKey({ id: threadId }), result); + const { threadId } = JSON.parse(message.data); + queryClient.invalidateQueries({ + queryKey: trpc.mail.get.queryKey({ id: threadId }), + refetchType: 'active', + exact: true, + predicate: (query) => { + const queryAge = Date.now() - (query.state.dataUpdatedAt || 0); + console.log({ queryAge, query: query.queryKey }); + return queryAge > 60000; // 1 minute in milliseconds + }, + }); + } else if (type === IncomingMessageType.Mail_List) { + const { folder } = JSON.parse(message.data); + console.log({ + folder, + labelIds: labels, + q: searchValue.value, + }); + queryClient.invalidateQueries({ + queryKey: trpc.mail.listThreads.infiniteQueryKey({ + folder, + labelIds: labels, + q: searchValue.value, + }), + }); } } catch (error) { console.error('error parsing party message', error); diff --git a/apps/server/src/pipelines.ts b/apps/server/src/pipelines.ts index a7712c70..c6eef90e 100644 --- a/apps/server/src/pipelines.ts +++ b/apps/server/src/pipelines.ts @@ -313,6 +313,20 @@ export class ZeroWorkflow extends WorkflowEntrypoint { }, ); + await step.do(`[ZERO_WORKFLOW] Sync Threads ${historyProcessingKey}`, async () => { + const agent = env.ZERO_AGENT.get(env.ZERO_AGENT.idFromName(connectionId.toString())); + for (const threadId of threadsToProcess) { + try { + await agent.syncThread(threadId.toString()); + } catch (error) { + log('[ZERO_WORKFLOW] Failed to sync thread:', { + threadId, + error: error instanceof Error ? error.message : String(error), + }); + } + } + }); + await step.do( `[ZERO_WORKFLOW] Send Thread Workflow Instances ${connectionId}`, async () => { @@ -462,11 +476,11 @@ export class ThreadWorkflow extends WorkflowEntrypoint { async () => { log('[THREAD_WORKFLOW] Getting thread:', threadId); const thread = await driver.get(threadId.toString()); - await notifyUser({ - connectionId: connectionId.toString(), - result: thread, - threadId: threadId.toString(), - }); + // await notifyUser({ + // connectionId: connectionId.toString(), + // result: thread, + // threadId: threadId.toString(), + // }); log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length); return thread; }, diff --git a/apps/server/src/routes/chat.ts b/apps/server/src/routes/chat.ts index 07224322..8d6ea44e 100644 --- a/apps/server/src/routes/chat.ts +++ b/apps/server/src/routes/chat.ts @@ -114,17 +114,10 @@ export type OutgoingMessage = } | { type: OutgoingMessageType.Mail_List; - result: { - threads: { - id: string; - historyId: string | null; - }[]; - nextPageToken: string | null; - }; + folder: string; } | { type: OutgoingMessageType.Mail_Get; - result: IGetThreadResponse; threadId: string; }; @@ -174,6 +167,16 @@ export class AgentRpcDO extends RpcTarget { return await this.mainDo.buildGmailSearchQuery(query); } + async rawListThreads(params: { + folder: string; + query?: string; + maxResults?: number; + labelIds?: string[]; + pageToken?: string; + }) { + return await this.mainDo.rawListThreads(params); + } + async listThreads(params: { folder: string; query?: string; @@ -190,19 +193,19 @@ export class AgentRpcDO extends RpcTarget { async markThreadsRead(threadIds: string[]) { const result = await this.mainDo.markThreadsRead(threadIds); - // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } async markThreadsUnread(threadIds: string[]) { const result = await this.mainDo.markThreadsUnread(threadIds); - // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } async modifyLabels(threadIds: string[], addLabelIds: string[], removeLabelIds: string[]) { const result = await this.mainDo.modifyLabels(threadIds, addLabelIds, removeLabelIds); - // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } @@ -234,13 +237,13 @@ export class AgentRpcDO extends RpcTarget { async markAsRead(threadIds: string[]) { const result = await this.mainDo.markAsRead(threadIds); - // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } async markAsUnread(threadIds: string[]) { const result = await this.mainDo.markAsUnread(threadIds); - // await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); + await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id))); return result; } @@ -305,25 +308,27 @@ const shouldLoop = env.THREAD_SYNC_LOOP !== 'false'; export class ZeroAgent extends AIChatAgent { private chatMessageAbortControllers: Map = new Map(); - private foldersInSync: string[] = []; + private foldersInSync: Map = new Map(); + private syncThreadsInProgress: Map = new Map(); private currentFolder: string | null = 'inbox'; driver: MailManager | null = null; constructor(ctx: DurableObjectState, env: Env) { super(ctx, env); if (shouldDropTables) this.dropTables(); - // this.sql` - // CREATE TABLE IF NOT EXISTS threads ( - // id TEXT PRIMARY KEY, - // created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - // updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - // thread_id TEXT NOT NULL, - // provider_id TEXT NOT NULL, - // latest_sender TEXT, - // latest_received_on TEXT, - // latest_subject TEXT, - // latest_label_ids TEXT - // ); - // `; + this.sql` + CREATE TABLE IF NOT EXISTS threads ( + id TEXT PRIMARY KEY, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + thread_id TEXT NOT NULL, + provider_id TEXT NOT NULL, + latest_sender TEXT, + latest_received_on TEXT, + latest_subject TEXT, + latest_label_ids TEXT, + categories TEXT + ); + `; } async dropTables() { @@ -394,7 +399,7 @@ export class ZeroAgent extends AIChatAgent { }); if (_connection) this.driver = connectionToDriver(_connection); this.ctx.waitUntil(conn.end()); - // this.ctx.waitUntil(this.syncThreads('inbox')); + this.ctx.waitUntil(this.syncThreads('inbox')); // this.ctx.waitUntil(this.syncThreads('sent')); // this.ctx.waitUntil(this.syncThreads('spam')); // this.ctx.waitUntil(this.syncThreads('archive')); @@ -602,6 +607,19 @@ export class ZeroAgent extends AIChatAgent { maxResults?: number; labelIds?: string[]; pageToken?: string; + }) { + if (!this.driver) { + throw new Error('No driver available'); + } + return await this.getThreadsFromDB(params); + } + + async rawListThreads(params: { + folder: string; + query?: string; + maxResults?: number; + labelIds?: string[]; + pageToken?: string; }) { if (!this.driver) { throw new Error('No driver available'); @@ -613,7 +631,7 @@ export class ZeroAgent extends AIChatAgent { if (!this.driver) { throw new Error('No driver available'); } - return await this.driver.get(threadId); + return await this.getThreadFromDB(threadId); } async markThreadsRead(threadIds: string[]) { @@ -758,7 +776,7 @@ export class ZeroAgent extends AIChatAgent { if (!this.driver) { throw new Error('No driver available'); } - return await this.driver.list(params); + return await this.getThreadsFromDB(params); } async markAsRead(threadIds: string[]) { @@ -786,7 +804,7 @@ export class ZeroAgent extends AIChatAgent { if (!this.driver) { throw new Error('No driver available'); } - return await this.driver.get(id); + return await this.getThreadFromDB(id); } async sendDraft(id: string, data: IOutgoingMessage) { @@ -835,6 +853,12 @@ export class ZeroAgent extends AIChatAgent { throw new Error('No driver available'); } + if (this.syncThreadsInProgress.has(threadId)) { + console.log(`Sync already in progress for thread ${threadId}, skipping...`); + return; + } + this.syncThreadsInProgress.set(threadId, true); + try { const threadData = await this.driver.get(threadId); const latest = threadData.latest; @@ -843,10 +867,7 @@ export class ZeroAgent extends AIChatAgent { // Convert receivedOn to ISO format for proper sorting const normalizedReceivedOn = new Date(latest.receivedOn).toISOString(); - await env.THREADS_BUCKET.put( - this.getThreadKey(threadId), - JSON.stringify(threadData.messages), - ); + await env.THREADS_BUCKET.put(this.getThreadKey(threadId), JSON.stringify(threadData)); this.sql` INSERT OR REPLACE INTO threads ( @@ -872,10 +893,10 @@ export class ZeroAgent extends AIChatAgent { if (this.currentFolder === 'inbox') { this.broadcastChatMessage({ type: OutgoingMessageType.Mail_Get, - result: threadData, threadId, }); } + this.syncThreadsInProgress.delete(threadId); return { success: true, threadId, threadData }; } else { console.log(`Skipping thread ${threadId} - no latest message`); @@ -888,7 +909,7 @@ export class ZeroAgent extends AIChatAgent { } getThreadKey(threadId: string) { - return `${this.name}/${threadId}`; + return `${this.name}/${threadId}.json`; } async syncThreads(folder: string) { @@ -897,7 +918,7 @@ export class ZeroAgent extends AIChatAgent { throw new Error('No driver available'); } - if (this.foldersInSync.includes(folder)) { + if (this.foldersInSync.has(folder)) { console.log('Sync already in progress, skipping...'); return { synced: 0, message: 'Sync already in progress' }; } @@ -908,7 +929,7 @@ export class ZeroAgent extends AIChatAgent { return { synced: 0, message: 'Threads already synced' }; } - this.foldersInSync.push(folder); + this.foldersInSync.set(folder, true); try { let totalSynced = 0; @@ -933,6 +954,11 @@ export class ZeroAgent extends AIChatAgent { } } + this.broadcastChatMessage({ + type: OutgoingMessageType.Mail_List, + folder, + }); + totalSynced += result.threads.length; pageToken = result.nextPageToken; hasMore = pageToken !== null && shouldLoop; @@ -944,221 +970,225 @@ export class ZeroAgent extends AIChatAgent { throw error; } finally { console.log('Setting isSyncing to false'); - this.foldersInSync = this.foldersInSync.filter((f) => f !== folder); + this.foldersInSync.delete(folder); + this.broadcastChatMessage({ + type: OutgoingMessageType.Mail_List, + folder, + }); } } - // async getThreadsFromDB(params: { - // labelIds?: string[]; - // folder?: string; - // q?: string; - // max?: number; - // cursor?: string; - // }) { - // const { labelIds = [], folder, q, max = 50, cursor } = params; + async getThreadsFromDB(params: { + labelIds?: string[]; + folder?: string; + q?: string; + max?: number; + pageToken?: string; + }) { + const { labelIds = [], folder, q, max = 50, pageToken } = params; - // try { - // // Build WHERE conditions - // const whereConditions: string[] = []; + try { + // Build WHERE conditions + const whereConditions: string[] = []; - // // Add folder condition (maps to specific label) - // if (folder) { - // const folderLabel = folder.toUpperCase(); - // whereConditions.push(`EXISTS ( - // SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${folderLabel}' - // )`); - // } + // Add folder condition (maps to specific label) + if (folder) { + const folderLabel = folder.toUpperCase(); + whereConditions.push(`EXISTS ( + SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${folderLabel}' + )`); + } - // // Add label conditions (OR logic for multiple labels) - // if (labelIds.length > 0) { - // if (labelIds.length === 1) { - // whereConditions.push(`EXISTS ( - // SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelIds[0]}' - // )`); - // } else { - // // Multiple labels with OR logic - // const multiLabelCondition = labelIds - // .map( - // (labelId) => - // `EXISTS (SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelId}')`, - // ) - // .join(' OR '); - // whereConditions.push(`(${multiLabelCondition})`); - // } - // } + // Add label conditions (OR logic for multiple labels) + if (labelIds.length > 0) { + if (labelIds.length === 1) { + whereConditions.push(`EXISTS ( + SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelIds[0]}' + )`); + } else { + // Multiple labels with OR logic + const multiLabelCondition = labelIds + .map( + (labelId) => + `EXISTS (SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelId}')`, + ) + .join(' OR '); + whereConditions.push(`(${multiLabelCondition})`); + } + } - // // // Add search query condition - // // if (q) { - // // const searchTerm = q.replace(/'/g, "''"); // Escape single quotes - // // whereConditions.push(`( - // // latest_subject LIKE '%${searchTerm}%' OR - // // latest_sender LIKE '%${searchTerm}%' OR - // // messages LIKE '%${searchTerm}%' - // // )`); - // // } + // // Add search query condition + if (q) { + const searchTerm = q.replace(/'/g, "''"); // Escape single quotes + whereConditions.push(`( + latest_subject LIKE '%${searchTerm}%' OR + latest_sender LIKE '%${searchTerm}%' + )`); + } - // // Add cursor condition - // if (cursor) { - // whereConditions.push(`latest_received_on < '${cursor}'`); - // } + // Add cursor condition + if (pageToken) { + whereConditions.push(`latest_received_on < '${pageToken}'`); + } - // // Execute query based on conditions - // let result; + // Execute query based on conditions + let result; - // if (whereConditions.length === 0) { - // // No conditions - // result = await this.sql` - // SELECT id, latest_received_on - // FROM threads - // ORDER BY latest_received_on DESC - // LIMIT ${max} - // `; - // } else if (whereConditions.length === 1) { - // // Single condition - // const condition = whereConditions[0]; - // if (condition.includes('latest_received_on <')) { - // const cursorValue = cursor!; - // result = await this.sql` - // SELECT id, latest_received_on - // FROM threads - // WHERE latest_received_on < ${cursorValue} - // ORDER BY latest_received_on DESC - // LIMIT ${max} - // `; - // } else if (folder) { - // // Folder condition - // const folderLabel = folder.toUpperCase(); - // result = await this.sql` - // SELECT id, latest_received_on - // FROM threads - // WHERE EXISTS ( - // SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel} - // ) - // ORDER BY latest_received_on DESC - // LIMIT ${max} - // `; - // } else { - // // Single label condition - // const labelId = labelIds[0]; - // result = await this.sql` - // SELECT id, latest_received_on - // FROM threads - // WHERE EXISTS ( - // SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId} - // ) - // ORDER BY latest_received_on DESC - // LIMIT ${max} - // `; - // } - // } else { - // // Multiple conditions - handle combinations - // if (folder && labelIds.length === 0 && cursor) { - // // Folder + cursor - // const folderLabel = folder.toUpperCase(); - // result = await this.sql` - // SELECT id, latest_received_on - // FROM threads - // WHERE EXISTS ( - // SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel} - // ) AND latest_received_on < ${cursor} - // ORDER BY latest_received_on DESC - // LIMIT ${max} - // `; - // } else if (labelIds.length === 1 && cursor && !folder) { - // // Single label + cursor - // const labelId = labelIds[0]; - // result = await this.sql` - // SELECT id, latest_received_on - // FROM threads - // WHERE EXISTS ( - // SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId} - // ) AND latest_received_on < ${cursor} - // ORDER BY latest_received_on DESC - // LIMIT ${max} - // `; - // } else { - // // For now, fallback to just cursor if complex combinations - // const cursorValue = cursor || ''; - // result = await this.sql` - // SELECT id, latest_received_on - // FROM threads - // WHERE latest_received_on < ${cursorValue} - // ORDER BY latest_received_on DESC - // LIMIT ${max} - // `; - // } - // } + if (whereConditions.length === 0) { + // No conditions + result = await this.sql` + SELECT id, latest_received_on + FROM threads + ORDER BY latest_received_on DESC + LIMIT ${max} + `; + } else if (whereConditions.length === 1) { + // Single condition + const condition = whereConditions[0]; + if (condition.includes('latest_received_on <')) { + const cursorValue = pageToken!; + result = await this.sql` + SELECT id, latest_received_on + FROM threads + WHERE latest_received_on < ${cursorValue} + ORDER BY latest_received_on DESC + LIMIT ${max} + `; + } else if (folder) { + // Folder condition + const folderLabel = folder.toUpperCase(); + result = await this.sql` + SELECT id, latest_received_on + FROM threads + WHERE EXISTS ( + SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel} + ) + ORDER BY latest_received_on DESC + LIMIT ${max} + `; + } else { + // Single label condition + const labelId = labelIds[0]; + result = await this.sql` + SELECT id, latest_received_on + FROM threads + WHERE EXISTS ( + SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId} + ) + ORDER BY latest_received_on DESC + LIMIT ${max} + `; + } + } else { + // Multiple conditions - handle combinations + if (folder && labelIds.length === 0 && pageToken) { + // Folder + cursor + const folderLabel = folder.toUpperCase(); + result = await this.sql` + SELECT id, latest_received_on + FROM threads + WHERE EXISTS ( + SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel} + ) AND latest_received_on < ${pageToken} + ORDER BY latest_received_on DESC + LIMIT ${max} + `; + } else if (labelIds.length === 1 && pageToken && !folder) { + // Single label + cursor + const labelId = labelIds[0]; + result = await this.sql` + SELECT id, latest_received_on + FROM threads + WHERE EXISTS ( + SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId} + ) AND latest_received_on < ${pageToken} + ORDER BY latest_received_on DESC + LIMIT ${max} + `; + } else { + // For now, fallback to just cursor if complex combinations + const cursorValue = pageToken || ''; + result = await this.sql` + SELECT id, latest_received_on + FROM threads + WHERE latest_received_on < ${cursorValue} + ORDER BY latest_received_on DESC + LIMIT ${max} + `; + } + } - // const threads = result.map((row: any) => ({ - // id: row.id, - // historyId: null, - // })); + const threads = result.map((row: any) => ({ + id: row.id, + historyId: null, + })); - // // Use latest_received_on for pagination cursor - // const nextPageToken = - // threads.length === max && result.length > 0 - // ? result[result.length - 1].latest_received_on - // : null; + // Use latest_received_on for pagination cursor + const nextPageToken = + threads.length === max && result.length > 0 + ? result[result.length - 1].latest_received_on + : null; - // return { - // threads, - // nextPageToken, - // }; - // } catch (error) { - // console.error('Failed to get threads from database:', error); - // throw error; - // } - // } + return { + threads, + nextPageToken, + }; + } catch (error) { + console.error('Failed to get threads from database:', error); + throw error; + } + } - // async getThreadFromDB(id: string): Promise { - // try { - // const result = this.sql` - // SELECT - // id, - // thread_id, - // provider_id, - // latest_sender, - // latest_received_on, - // latest_subject, - // latest_label_ids, - // created_at, - // updated_at - // FROM threads - // WHERE id = ${id} - // LIMIT 1 - // `; + async getThreadFromDB(id: string): Promise { + try { + const result = this.sql` + SELECT + id, + thread_id, + provider_id, + latest_sender, + latest_received_on, + latest_subject, + latest_label_ids, + created_at, + updated_at + FROM threads + WHERE id = ${id} + LIMIT 1 + `; - // if (result.length === 0) { - // this.ctx.waitUntil(this.syncThread(id)); - // return { - // messages: [], - // latest: undefined, - // hasUnread: false, - // totalReplies: 0, - // labels: [], - // } satisfies IGetThreadResponse; - // } + if (result.length === 0) { + this.ctx.waitUntil(this.syncThread(id)); + return { + messages: [], + latest: undefined, + hasUnread: false, + totalReplies: 0, + labels: [], + } satisfies IGetThreadResponse; + } - // const row = result[0] as any; - // const storedMessages = await env.THREADS_BUCKET.get(this.getThreadKey(id)); - // const latestLabelIds = JSON.parse(row.latest_label_ids || '[]'); + const row = result[0] as any; + const storedThread = await env.THREADS_BUCKET.get(this.getThreadKey(id)); - // const messages: ParsedMessage[] = storedMessages - // ? JSON.parse(await storedMessages.text()) - // : []; + const messages: ParsedMessage[] = storedThread + ? (JSON.parse(await storedThread.text()) as IGetThreadResponse).messages + : []; - // return { - // messages, - // latest: messages.length > 0 ? messages[messages.length - 1] : undefined, - // hasUnread: latestLabelIds.includes('UNREAD'), - // totalReplies: messages.length, - // labels: latestLabelIds.map((id: string) => ({ id, name: id })), - // } satisfies IGetThreadResponse; - // } catch (error) { - // console.error('Failed to get thread from database:', error); - // throw error; - // } - // } + const latestLabelIds = JSON.parse(row.latest_label_ids || '[]'); + + return { + messages, + latest: messages.length > 0 ? messages[messages.length - 1] : undefined, + hasUnread: latestLabelIds.includes('UNREAD'), + totalReplies: messages.length, + labels: latestLabelIds.map((id: string) => ({ id, name: id })), + } satisfies IGetThreadResponse; + } catch (error) { + console.error('Failed to get thread from database:', error); + throw error; + } + } } export class ZeroMCP extends McpAgent { diff --git a/apps/server/src/trpc/routes/mail.ts b/apps/server/src/trpc/routes/mail.ts index ec9a6d72..f56b643e 100644 --- a/apps/server/src/trpc/routes/mail.ts +++ b/apps/server/src/trpc/routes/mail.ts @@ -82,24 +82,26 @@ export const mailRouter = router({ }); return drafts; } - // if (q) { + if (q) { + const threadsResponse = await agent.rawListThreads({ + labelIds: labelIds, + maxResults: max, + pageToken: cursor, + query: q, + folder, + }); + return threadsResponse; + } + const folderLabelId = getFolderLabelId(folder); + const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds; const threadsResponse = await agent.listThreads({ - labelIds: labelIds, + labelIds: labelIdsToUse, maxResults: max, pageToken: cursor, query: q, folder, }); return threadsResponse; - // } - // const folderLabelId = getFolderLabelId(folder); - // const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds; - // const threadsResponse = await agent.getThreadsFromDB({ - // labelIds: labelIdsToUse, - // max: max, - // cursor: cursor, - // }); - // return threadsResponse; }), markAsRead: activeDriverProcedure .input( diff --git a/apps/server/wrangler.jsonc b/apps/server/wrangler.jsonc index 5a08cf3f..be7a84a5 100644 --- a/apps/server/wrangler.jsonc +++ b/apps/server/wrangler.jsonc @@ -116,7 +116,7 @@ "DISABLE_CALLS": "true", "VOICE_SECRET": "1234567890", "GOOGLE_S_ACCOUNT": "{}", - "DROP_AGENT_TABLES": "false", + "DROP_AGENT_TABLES": "true", "THREAD_SYNC_MAX_COUNT": "40", "THREAD_SYNC_LOOP": "false", }, @@ -260,9 +260,9 @@ "VITE_PUBLIC_BACKEND_URL": "https://sapi.0.email", "VITE_PUBLIC_APP_URL": "https://staging.0.email", "DISABLE_CALLS": "", - "DROP_AGENT_TABLES": "false", + "DROP_AGENT_TABLES": "true", "THREAD_SYNC_MAX_COUNT": "40", - "THREAD_SYNC_LOOP": "false", + "THREAD_SYNC_LOOP": "true", }, "kv_namespaces": [ { @@ -401,9 +401,9 @@ "VITE_PUBLIC_BACKEND_URL": "https://api.0.email", "VITE_PUBLIC_APP_URL": "https://0.email", "DISABLE_CALLS": "true", - "DROP_AGENT_TABLES": "false", + "DROP_AGENT_TABLES": "true", "THREAD_SYNC_MAX_COUNT": "40", - "THREAD_SYNC_LOOP": "false", + "THREAD_SYNC_LOOP": "true", }, "kv_namespaces": [ {