diff --git a/apps/mail/components/mail/mail.tsx b/apps/mail/components/mail/mail.tsx index 7fb7cb7a1..93b11b65d 100644 --- a/apps/mail/components/mail/mail.tsx +++ b/apps/mail/components/mail/mail.tsx @@ -30,6 +30,7 @@ import AIToggleButton from '../ai-toggle-button'; import { useIsMobile } from '@/hooks/use-mobile'; import { Button } from '@/components/ui/button'; import { useSession } from '@/lib/auth-client'; +import { useDoState } from './use-do-state'; import { m } from '@/paraglide/messages'; import { useQueryState } from 'nuqs'; import { cn } from '@/lib/utils'; @@ -325,6 +326,7 @@ export function MailLayout() { const { data: activeConnection } = useActiveConnection(); const { activeFilters, clearAllFilters } = useCommandPalette(); const [, setIsCommandPaletteOpen] = useQueryState('isCommandPaletteOpen'); + const [{ isSyncing, syncingFolders, storageSize }] = useDoState(); useEffect(() => { if (prevFolderRef.current !== folder && mail.bulkSelected.length > 0) { @@ -394,6 +396,15 @@ export function MailLayout() { return ( +
+

+ {isSyncing ? 'Syncing your emails...' : 'Synced your emails'} +

+ {storageSize &&

{storageSize}

} + {syncingFolders.length > 0 && ( +

{syncingFolders.join(', ')}

+ )} +
({ + isSyncing: false, + syncingFolders: [], + storageSize: 0, +}); + +function useDoState() { + return useAtom(stateAtom); +} + +const setIsSyncingAtom = atom(null, (get, set, isSyncing: boolean) => { + const current = get(stateAtom); + set(stateAtom, { ...current, isSyncing }); +}); + +const setSyncingFoldersAtom = atom(null, (get, set, syncingFolders: string[]) => { + const current = get(stateAtom); + set(stateAtom, { ...current, syncingFolders }); +}); + +const setStorageSizeAtom = atom(null, (get, set, storageSize: number) => { + const current = get(stateAtom); + set(stateAtom, { ...current, storageSize }); +}); + +export { setIsSyncingAtom, setSyncingFoldersAtom, setStorageSizeAtom, useDoState }; diff --git a/apps/mail/components/party.tsx b/apps/mail/components/party.tsx index d53b1211b..50ce6fa89 100644 --- a/apps/mail/components/party.tsx +++ b/apps/mail/components/party.tsx @@ -4,6 +4,7 @@ import useSearchLabels from '@/hooks/use-labels-search'; import { useQueryClient } from '@tanstack/react-query'; import { useTRPC } from '@/providers/query-provider'; import { usePartySocket } from 'partysocket/react'; +import { useDoState } from './mail/use-do-state'; // 10 seconds is appropriate for real-time notifications @@ -15,6 +16,7 @@ export enum IncomingMessageType { Mail_List = 'zero_mail_list_threads', Mail_Get = 'zero_mail_get_thread', User_Topics = 'zero_user_topics', + Do_State = 'zero_do_state', } export enum OutgoingMessageType { @@ -31,6 +33,7 @@ export const NotificationProvider = () => { const { data: activeConnection } = useActiveConnection(); const [searchValue] = useSearchValue(); const { labels } = useSearchLabels(); + const [, setDoState] = useDoState(); usePartySocket({ party: 'zero-agent', @@ -59,6 +62,9 @@ export const NotificationProvider = () => { queryClient.invalidateQueries({ queryKey: trpc.labels.list.queryKey(), }); + } else if (type === IncomingMessageType.Do_State) { + const { isSyncing, syncingFolders, storageSize } = JSON.parse(message.data); + setDoState({ isSyncing, syncingFolders, storageSize }); } } catch (error) { console.error('error parsing party message', error); diff --git a/apps/server/src/lib/server-utils.ts b/apps/server/src/lib/server-utils.ts index c41a0e2d1..226249830 100644 --- a/apps/server/src/lib/server-utils.ts +++ b/apps/server/src/lib/server-utils.ts @@ -23,8 +23,6 @@ export const getZeroClient = async (connectionId: string, executionCtx: Executio await agent.setName(connectionId); await agent.setupAuth(); - executionCtx.waitUntil(agent.syncFolders()); - return agent; }; diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 34682c30e..12161bc56 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -60,8 +60,6 @@ import type { Message } from 'ai'; import { eq } from 'drizzle-orm'; const decoder = new TextDecoder(); - -const shouldDropTables = false; const maxCount = 20; const shouldLoop = env.THREAD_SYNC_LOOP !== 'false'; @@ -312,13 +310,18 @@ export class ZeroDriver extends Agent { private agent: DurableObjectStub | null = null; constructor(ctx: DurableObjectState, env: ZeroEnv) { super(ctx, env); - if (shouldDropTables) this.dropTables(); } getDatabaseSize() { return this.ctx.storage.sql.databaseSize; } + isSyncing(): string[] { + return Array.from(this.foldersInSync.entries()) + .filter(([, syncing]) => syncing) + .map(([folder]) => folder); + } + getAllSubjects() { const subjects = this.sql` SELECT latest_subject FROM threads @@ -844,12 +847,6 @@ export class ZeroDriver extends Agent { } } - async dropTables() { - console.log('Dropping tables'); - return this.sql` - DROP TABLE IF EXISTS threads;`; - } - async deleteThread(id: string) { void this.sql` DELETE FROM threads WHERE thread_id = ${id}; @@ -1063,6 +1060,15 @@ export class ZeroDriver extends Agent { return count[0]['COUNT(*)'] as number; } + async sendDoState() { + return this.agent?.broadcastChatMessage({ + type: OutgoingMessageType.Do_State, + isSyncing: this.isSyncing().length > 0, + syncingFolders: this.isSyncing(), + storageSize: this.getDatabaseSize(), + }); + } + async syncThreads(folder: string): Promise { // Skip sync for aggregate instances - they should only mirror primary operations if (this.name.includes('aggregate')) { @@ -1095,6 +1101,7 @@ export class ZeroDriver extends Agent { if (this.foldersInSync.has(folder)) { console.log(`[syncThreads] Sync already in progress for folder ${folder}, skipping...`); + await this.sendDoState(); return { synced: 0, message: 'Sync already in progress', @@ -1144,7 +1151,6 @@ export class ZeroDriver extends Agent { // Sync single thread function const syncSingleThread = (threadId: string) => Effect.gen(this, function* () { - yield* Effect.sleep(150); // Rate limiting delay const syncResult = yield* Effect.tryPromise(() => this.syncThread({ threadId })).pipe( Effect.tap(() => Effect.sync(() => @@ -1174,13 +1180,11 @@ export class ZeroDriver extends Agent { // Main sync program let pageToken: string | null = null; let hasMore = true; + let firstPageProcessed = false; while (hasMore) { result.pagesProcessed++; - // Rate limiting delay between pages - yield* Effect.sleep(1000); - console.log( `[syncThreads] Processing page ${result.pagesProcessed} for folder ${folder}`, ); @@ -1230,6 +1234,12 @@ export class ZeroDriver extends Agent { result.synced += listResult.threads.length; pageToken = listResult.nextPageToken; hasMore = pageToken !== null && shouldLoop; + + // Send state update after first page is processed to give accurate feedback + if (!firstPageProcessed) { + firstPageProcessed = true; + yield* Effect.tryPromise(() => this.sendDoState()); + } } // Broadcast completion if agent exists @@ -1259,6 +1269,7 @@ export class ZeroDriver extends Agent { } this.foldersInSync.delete(folder); + yield* Effect.tryPromise(() => this.sendDoState()); console.log(`[syncThreads] Completed sync for folder: ${folder}`, { synced: result.synced, @@ -1285,6 +1296,7 @@ export class ZeroDriver extends Agent { broadcastSent: false, }); }), + Effect.tap(() => this.sendDoState()), ), ); } @@ -1460,22 +1472,23 @@ export class ZeroDriver extends Agent { // Handle folder + labelIds combination (supports pagination) if (folder && labelIds.length > 0 && !q) { const folderLabel = folder.toUpperCase(); - + // De-duplicate labelIds and remove folder label if it's already included // Cap labelIds length to prevent resource exhaustion const maxLabelIds = 5; - const uniqueLabelIds = [...new Set(labelIds - .filter(id => id.toUpperCase() !== folderLabel) - .slice(0, maxLabelIds) - )]; - - console.log('[queryThreads] Case: folder + labelIds', { - folderLabel, - originalLabelIds: labelIds, + const uniqueLabelIds = [ + ...new Set( + labelIds.filter((id) => id.toUpperCase() !== folderLabel).slice(0, maxLabelIds), + ), + ]; + + console.log('[queryThreads] Case: folder + labelIds', { + folderLabel, + originalLabelIds: labelIds, uniqueLabelIds, - pageToken + pageToken, }); - + if (uniqueLabelIds.length === 0) { // Only folder filter needed, handle separately return this.sql` @@ -1488,7 +1501,7 @@ export class ZeroDriver extends Agent { LIMIT ${maxResults} `; } - + // Use improved JSON-based approach that handles any number of labelIds const labelsJson = JSON.stringify(uniqueLabelIds); return this.sql` @@ -1566,6 +1579,7 @@ export class ZeroDriver extends Agent { maxResults?: number; pageToken?: string; }): Promise { + this.ctx.waitUntil(this.syncFolders()); const { maxResults = 50 } = params; const normalizedParams = { ...params, @@ -1832,7 +1846,6 @@ export class ZeroDriver extends Agent { export class ZeroAgent extends AIChatAgent { private chatMessageAbortControllers: Map = new Map(); - private connectionThreadIds: Map = new Map(); async registerZeroMCP() { await this.mcp.connect(this.env.VITE_PUBLIC_BACKEND_URL + '/sse', { diff --git a/apps/server/src/routes/agent/types.ts b/apps/server/src/routes/agent/types.ts index ce3dd33be..693836815 100644 --- a/apps/server/src/routes/agent/types.ts +++ b/apps/server/src/routes/agent/types.ts @@ -16,6 +16,7 @@ export enum OutgoingMessageType { Mail_List = 'zero_mail_list_threads', Mail_Get = 'zero_mail_get_thread', User_Topics = 'zero_user_topics', + Do_State = 'zero_do_state', } export type IncomingMessage = @@ -72,6 +73,12 @@ export type OutgoingMessage = } | { type: OutgoingMessageType.User_Topics; + } + | { + type: OutgoingMessageType.Do_State; + isSyncing: boolean; + syncingFolders: string[]; + storageSize: number; }; export type QueueFunc = (name: string, payload: unknown) => Promise;