mirror of
https://github.com/Mail-0/Zero.git
synced 2026-06-28 14:56:48 +00:00
Add email syncing status indicators and optimize folder synchronization (#1915)
# Email Syncing Status Indicators ## Description Added real-time email syncing status indicators to the mail UI that show when emails are being synchronized, which folders are currently syncing, and the current storage size. This provides users with better visibility into background processes. ## Type of Change - [x] ✨ New feature (non-breaking change which adds functionality) - [x] ⚡ Performance improvement ## Areas Affected - [x] Email Integration (Gmail, IMAP, etc.) - [x] User Interface/Experience ## Testing Done - [x] Manual testing performed ## Checklist - [x] I have performed a self-review of my code - [x] My changes generate no new warnings ## Additional Notes This PR implements a state management system for email synchronization status using Jotai atoms. The server now broadcasts syncing status through Party, and the UI displays this information in a non-intrusive way at the top of the mail interface. Key changes: - Created a new `useDoState` hook to manage syncing state - Added status indicators in the mail layout component - Modified the server to track and broadcast syncing status - Improved folder synchronization logic to be more efficient - Removed unnecessary delays in the synchronization process --- _By submitting this pull request, I confirm that my contribution is made under the terms of the project's license._ <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added a real-time syncing status indicator in the mail interface, displaying current sync progress, folders being synced, and storage usage. * Introduced live syncing state updates communicated between server and client for improved sync transparency. * **Improvements** * Enhanced background synchronization logic to provide clearer feedback on syncing activity. * Real-time updates now reflect the latest syncing state without delays. * Removed artificial delays in syncing processes for faster synchronization. * Triggered folder syncing asynchronously when fetching threads from the database. * **Bug Fixes** * Removed unused and obsolete code related to database table management and sync rate-limiting. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
@@ -30,6 +30,7 @@ import AIToggleButton from '../ai-toggle-button';
|
||||
import { useIsMobile } from '@/hooks/use-mobile';
|
||||
import { Button } from '@/components/ui/button';
|
||||
import { useSession } from '@/lib/auth-client';
|
||||
import { useDoState } from './use-do-state';
|
||||
import { m } from '@/paraglide/messages';
|
||||
import { useQueryState } from 'nuqs';
|
||||
import { cn } from '@/lib/utils';
|
||||
@@ -325,6 +326,7 @@ export function MailLayout() {
|
||||
const { data: activeConnection } = useActiveConnection();
|
||||
const { activeFilters, clearAllFilters } = useCommandPalette();
|
||||
const [, setIsCommandPaletteOpen] = useQueryState('isCommandPaletteOpen');
|
||||
const [{ isSyncing, syncingFolders, storageSize }] = useDoState();
|
||||
|
||||
useEffect(() => {
|
||||
if (prevFolderRef.current !== folder && mail.bulkSelected.length > 0) {
|
||||
@@ -394,6 +396,15 @@ export function MailLayout() {
|
||||
|
||||
return (
|
||||
<TooltipProvider delayDuration={0}>
|
||||
<div className="fixed right-1 top-1 z-10 flex w-full justify-end">
|
||||
<p className="w-fit rounded bg-purple-800 p-1 text-xs">
|
||||
{isSyncing ? 'Syncing your emails...' : 'Synced your emails'}
|
||||
</p>
|
||||
{storageSize && <p className="w-fit rounded bg-purple-800 p-1 text-xs">{storageSize}</p>}
|
||||
{syncingFolders.length > 0 && (
|
||||
<p className="w-fit rounded bg-purple-800 p-1 text-xs">{syncingFolders.join(', ')}</p>
|
||||
)}
|
||||
</div>
|
||||
<PricingDialog />
|
||||
<div className="rounded-inherit z-5 relative flex p-0 md:mr-0.5 md:mt-1">
|
||||
<ResizablePanelGroup
|
||||
|
||||
34
apps/mail/components/mail/use-do-state.ts
Normal file
34
apps/mail/components/mail/use-do-state.ts
Normal file
@@ -0,0 +1,34 @@
|
||||
import { atom, useAtom } from 'jotai';
|
||||
|
||||
export type State = {
|
||||
isSyncing: boolean;
|
||||
syncingFolders: string[];
|
||||
storageSize: number;
|
||||
};
|
||||
|
||||
const stateAtom = atom<State>({
|
||||
isSyncing: false,
|
||||
syncingFolders: [],
|
||||
storageSize: 0,
|
||||
});
|
||||
|
||||
function useDoState() {
|
||||
return useAtom(stateAtom);
|
||||
}
|
||||
|
||||
const setIsSyncingAtom = atom(null, (get, set, isSyncing: boolean) => {
|
||||
const current = get(stateAtom);
|
||||
set(stateAtom, { ...current, isSyncing });
|
||||
});
|
||||
|
||||
const setSyncingFoldersAtom = atom(null, (get, set, syncingFolders: string[]) => {
|
||||
const current = get(stateAtom);
|
||||
set(stateAtom, { ...current, syncingFolders });
|
||||
});
|
||||
|
||||
const setStorageSizeAtom = atom(null, (get, set, storageSize: number) => {
|
||||
const current = get(stateAtom);
|
||||
set(stateAtom, { ...current, storageSize });
|
||||
});
|
||||
|
||||
export { setIsSyncingAtom, setSyncingFoldersAtom, setStorageSizeAtom, useDoState };
|
||||
@@ -4,6 +4,7 @@ import useSearchLabels from '@/hooks/use-labels-search';
|
||||
import { useQueryClient } from '@tanstack/react-query';
|
||||
import { useTRPC } from '@/providers/query-provider';
|
||||
import { usePartySocket } from 'partysocket/react';
|
||||
import { useDoState } from './mail/use-do-state';
|
||||
|
||||
// 10 seconds is appropriate for real-time notifications
|
||||
|
||||
@@ -15,6 +16,7 @@ export enum IncomingMessageType {
|
||||
Mail_List = 'zero_mail_list_threads',
|
||||
Mail_Get = 'zero_mail_get_thread',
|
||||
User_Topics = 'zero_user_topics',
|
||||
Do_State = 'zero_do_state',
|
||||
}
|
||||
|
||||
export enum OutgoingMessageType {
|
||||
@@ -31,6 +33,7 @@ export const NotificationProvider = () => {
|
||||
const { data: activeConnection } = useActiveConnection();
|
||||
const [searchValue] = useSearchValue();
|
||||
const { labels } = useSearchLabels();
|
||||
const [, setDoState] = useDoState();
|
||||
|
||||
usePartySocket({
|
||||
party: 'zero-agent',
|
||||
@@ -59,6 +62,9 @@ export const NotificationProvider = () => {
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: trpc.labels.list.queryKey(),
|
||||
});
|
||||
} else if (type === IncomingMessageType.Do_State) {
|
||||
const { isSyncing, syncingFolders, storageSize } = JSON.parse(message.data);
|
||||
setDoState({ isSyncing, syncingFolders, storageSize });
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('error parsing party message', error);
|
||||
|
||||
@@ -23,8 +23,6 @@ export const getZeroClient = async (connectionId: string, executionCtx: Executio
|
||||
await agent.setName(connectionId);
|
||||
await agent.setupAuth();
|
||||
|
||||
executionCtx.waitUntil(agent.syncFolders());
|
||||
|
||||
return agent;
|
||||
};
|
||||
|
||||
|
||||
@@ -60,8 +60,6 @@ import type { Message } from 'ai';
|
||||
import { eq } from 'drizzle-orm';
|
||||
|
||||
const decoder = new TextDecoder();
|
||||
|
||||
const shouldDropTables = false;
|
||||
const maxCount = 20;
|
||||
const shouldLoop = env.THREAD_SYNC_LOOP !== 'false';
|
||||
|
||||
@@ -312,13 +310,18 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
private agent: DurableObjectStub<ZeroAgent> | null = null;
|
||||
constructor(ctx: DurableObjectState, env: ZeroEnv) {
|
||||
super(ctx, env);
|
||||
if (shouldDropTables) this.dropTables();
|
||||
}
|
||||
|
||||
getDatabaseSize() {
|
||||
return this.ctx.storage.sql.databaseSize;
|
||||
}
|
||||
|
||||
isSyncing(): string[] {
|
||||
return Array.from(this.foldersInSync.entries())
|
||||
.filter(([, syncing]) => syncing)
|
||||
.map(([folder]) => folder);
|
||||
}
|
||||
|
||||
getAllSubjects() {
|
||||
const subjects = this.sql`
|
||||
SELECT latest_subject FROM threads
|
||||
@@ -844,12 +847,6 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
}
|
||||
}
|
||||
|
||||
async dropTables() {
|
||||
console.log('Dropping tables');
|
||||
return this.sql`
|
||||
DROP TABLE IF EXISTS threads;`;
|
||||
}
|
||||
|
||||
async deleteThread(id: string) {
|
||||
void this.sql`
|
||||
DELETE FROM threads WHERE thread_id = ${id};
|
||||
@@ -1063,6 +1060,15 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
return count[0]['COUNT(*)'] as number;
|
||||
}
|
||||
|
||||
async sendDoState() {
|
||||
return this.agent?.broadcastChatMessage({
|
||||
type: OutgoingMessageType.Do_State,
|
||||
isSyncing: this.isSyncing().length > 0,
|
||||
syncingFolders: this.isSyncing(),
|
||||
storageSize: this.getDatabaseSize(),
|
||||
});
|
||||
}
|
||||
|
||||
async syncThreads(folder: string): Promise<FolderSyncResult> {
|
||||
// Skip sync for aggregate instances - they should only mirror primary operations
|
||||
if (this.name.includes('aggregate')) {
|
||||
@@ -1095,6 +1101,7 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
|
||||
if (this.foldersInSync.has(folder)) {
|
||||
console.log(`[syncThreads] Sync already in progress for folder ${folder}, skipping...`);
|
||||
await this.sendDoState();
|
||||
return {
|
||||
synced: 0,
|
||||
message: 'Sync already in progress',
|
||||
@@ -1144,7 +1151,6 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
// Sync single thread function
|
||||
const syncSingleThread = (threadId: string) =>
|
||||
Effect.gen(this, function* () {
|
||||
yield* Effect.sleep(150); // Rate limiting delay
|
||||
const syncResult = yield* Effect.tryPromise(() => this.syncThread({ threadId })).pipe(
|
||||
Effect.tap(() =>
|
||||
Effect.sync(() =>
|
||||
@@ -1174,13 +1180,11 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
// Main sync program
|
||||
let pageToken: string | null = null;
|
||||
let hasMore = true;
|
||||
let firstPageProcessed = false;
|
||||
|
||||
while (hasMore) {
|
||||
result.pagesProcessed++;
|
||||
|
||||
// Rate limiting delay between pages
|
||||
yield* Effect.sleep(1000);
|
||||
|
||||
console.log(
|
||||
`[syncThreads] Processing page ${result.pagesProcessed} for folder ${folder}`,
|
||||
);
|
||||
@@ -1230,6 +1234,12 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
result.synced += listResult.threads.length;
|
||||
pageToken = listResult.nextPageToken;
|
||||
hasMore = pageToken !== null && shouldLoop;
|
||||
|
||||
// Send state update after first page is processed to give accurate feedback
|
||||
if (!firstPageProcessed) {
|
||||
firstPageProcessed = true;
|
||||
yield* Effect.tryPromise(() => this.sendDoState());
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast completion if agent exists
|
||||
@@ -1259,6 +1269,7 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
}
|
||||
|
||||
this.foldersInSync.delete(folder);
|
||||
yield* Effect.tryPromise(() => this.sendDoState());
|
||||
|
||||
console.log(`[syncThreads] Completed sync for folder: ${folder}`, {
|
||||
synced: result.synced,
|
||||
@@ -1285,6 +1296,7 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
broadcastSent: false,
|
||||
});
|
||||
}),
|
||||
Effect.tap(() => this.sendDoState()),
|
||||
),
|
||||
);
|
||||
}
|
||||
@@ -1460,22 +1472,23 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
// Handle folder + labelIds combination (supports pagination)
|
||||
if (folder && labelIds.length > 0 && !q) {
|
||||
const folderLabel = folder.toUpperCase();
|
||||
|
||||
|
||||
// De-duplicate labelIds and remove folder label if it's already included
|
||||
// Cap labelIds length to prevent resource exhaustion
|
||||
const maxLabelIds = 5;
|
||||
const uniqueLabelIds = [...new Set(labelIds
|
||||
.filter(id => id.toUpperCase() !== folderLabel)
|
||||
.slice(0, maxLabelIds)
|
||||
)];
|
||||
|
||||
console.log('[queryThreads] Case: folder + labelIds', {
|
||||
folderLabel,
|
||||
originalLabelIds: labelIds,
|
||||
const uniqueLabelIds = [
|
||||
...new Set(
|
||||
labelIds.filter((id) => id.toUpperCase() !== folderLabel).slice(0, maxLabelIds),
|
||||
),
|
||||
];
|
||||
|
||||
console.log('[queryThreads] Case: folder + labelIds', {
|
||||
folderLabel,
|
||||
originalLabelIds: labelIds,
|
||||
uniqueLabelIds,
|
||||
pageToken
|
||||
pageToken,
|
||||
});
|
||||
|
||||
|
||||
if (uniqueLabelIds.length === 0) {
|
||||
// Only folder filter needed, handle separately
|
||||
return this.sql`
|
||||
@@ -1488,7 +1501,7 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
LIMIT ${maxResults}
|
||||
`;
|
||||
}
|
||||
|
||||
|
||||
// Use improved JSON-based approach that handles any number of labelIds
|
||||
const labelsJson = JSON.stringify(uniqueLabelIds);
|
||||
return this.sql`
|
||||
@@ -1566,6 +1579,7 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
maxResults?: number;
|
||||
pageToken?: string;
|
||||
}): Promise<IGetThreadsResponse> {
|
||||
this.ctx.waitUntil(this.syncFolders());
|
||||
const { maxResults = 50 } = params;
|
||||
const normalizedParams = {
|
||||
...params,
|
||||
@@ -1832,7 +1846,6 @@ export class ZeroDriver extends Agent<ZeroEnv> {
|
||||
|
||||
export class ZeroAgent extends AIChatAgent<ZeroEnv> {
|
||||
private chatMessageAbortControllers: Map<string, AbortController> = new Map();
|
||||
private connectionThreadIds: Map<string, string | null> = new Map();
|
||||
|
||||
async registerZeroMCP() {
|
||||
await this.mcp.connect(this.env.VITE_PUBLIC_BACKEND_URL + '/sse', {
|
||||
|
||||
@@ -16,6 +16,7 @@ export enum OutgoingMessageType {
|
||||
Mail_List = 'zero_mail_list_threads',
|
||||
Mail_Get = 'zero_mail_get_thread',
|
||||
User_Topics = 'zero_user_topics',
|
||||
Do_State = 'zero_do_state',
|
||||
}
|
||||
|
||||
export type IncomingMessage =
|
||||
@@ -72,6 +73,12 @@ export type OutgoingMessage =
|
||||
}
|
||||
| {
|
||||
type: OutgoingMessageType.User_Topics;
|
||||
}
|
||||
| {
|
||||
type: OutgoingMessageType.Do_State;
|
||||
isSyncing: boolean;
|
||||
syncingFolders: string[];
|
||||
storageSize: number;
|
||||
};
|
||||
|
||||
export type QueueFunc = (name: string, payload: unknown) => Promise<unknown>;
|
||||
|
||||
Reference in New Issue
Block a user