mirror of
https://github.com/Mail-0/Zero.git
synced 2026-03-03 00:27:01 +00:00
SyncStuff (#1657)
# READ CAREFULLY THEN REMOVE Remove bullet points that are not relevant. PLEASE REFRAIN FROM USING AI TO WRITE YOUR CODE AND PR DESCRIPTION. IF YOU DO USE AI TO WRITE YOUR CODE PLEASE PROVIDE A DESCRIPTION AND REVIEW IT CAREFULLY. MAKE SURE YOU UNDERSTAND THE CODE YOU ARE SUBMITTING USING AI. - Pull requests that do not follow these guidelines will be closed without review or comment. - If you use AI to write your PR description your pr will be close without review or comment. - If you are unsure about anything, feel free to ask for clarification. ## Description Please provide a clear description of your changes. --- ## Type of Change Please delete options that are not relevant. - [ ] 🐛 Bug fix (non-breaking change which fixes an issue) - [ ] ✨ New feature (non-breaking change which adds functionality) - [ ] 💥 Breaking change (fix or feature with breaking changes) - [ ] 📝 Documentation update - [ ] 🎨 UI/UX improvement - [ ] 🔒 Security enhancement - [ ] ⚡ Performance improvement ## Areas Affected Please check all that apply: - [ ] Email Integration (Gmail, IMAP, etc.) - [ ] User Interface/Experience - [ ] Authentication/Authorization - [ ] Data Storage/Management - [ ] API Endpoints - [ ] Documentation - [ ] Testing Infrastructure - [ ] Development Workflow - [ ] Deployment/Infrastructure ## Testing Done Describe the tests you've done: - [ ] Unit tests added/updated - [ ] Integration tests added/updated - [ ] Manual testing performed - [ ] Cross-browser testing (if UI changes) - [ ] Mobile responsiveness verified (if UI changes) ## Security Considerations For changes involving data or authentication: - [ ] No sensitive data is exposed - [ ] Authentication checks are in place - [ ] Input validation is implemented - [ ] Rate limiting is considered (if applicable) ## Checklist - [ ] I have read the [CONTRIBUTING](https://github.com/Mail-0/Zero/blob/staging/.github/CONTRIBUTING.md) document - [ ] My code follows the project's style guidelines - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in complex areas - [ ] I have updated the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix/feature works - [ ] All tests pass locally - [ ] Any dependent changes are merged and published ## Additional Notes Add any other context about the pull request here. ## Screenshots/Recordings Add screenshots or recordings here if applicable. --- _By submitting this pull request, I confirm that my contribution is made under the terms of the project's license._ <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Improved email thread synchronization for enhanced performance and reliability. * Added support for advanced search and label filtering when listing email threads. * **Bug Fixes** * Refined cache invalidation logic to ensure up-to-date thread and message lists in the user interface. * **Chores** * Updated environment variables and documentation to reflect new sync options and configuration changes. * Enhanced internal sync tracking and reduced redundant synchronization operations. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
10
README.md
10
README.md
@@ -193,6 +193,7 @@ You can set up Zero in two ways:
|
||||
- Go to the [Twilio](https://www.twilio.com/)
|
||||
- Create a Twilio account if you don’t already have one
|
||||
- From the dashboard, locate your:
|
||||
|
||||
- Account SID
|
||||
- Auth Token
|
||||
- Phone Number
|
||||
@@ -265,6 +266,15 @@ Zero uses PostgreSQL for storing data. Here's how to set it up:
|
||||
```
|
||||
> If you run `pnpm dev` in your terminal, the studio command should be automatically running with the app.
|
||||
|
||||
### Sync
|
||||
|
||||
Background: https://x.com/cmdhaus/status/1940886269950902362
|
||||
We're now storing the user's emails in their Durable Object & an R2 bucket. This allow us to speed things up, a lot.
|
||||
This also introduces 3 environment variables, `DROP_AGENT_TABLES`,`THREAD_SYNC_MAX_COUNT`, `THREAD_SYNC_LOOP`.
|
||||
`DROP_AGENT_TABLES`: should the durable object drop the threads table before starting a sync
|
||||
`THREAD_SYNC_MAX_COUNT`: how many threads should we sync? max `500` because it's using the same number for the maxResults number from the driver. i.e 500 results per page.
|
||||
`THREAD_SYNC_LOOP`: should make sure to sync all of the items inside a folder? i.e if THREAD_SYNC_MAX_COUNT=500 it will sync 500 threads per request until the folder is fully synced. (should be true in production)
|
||||
|
||||
## Contribute
|
||||
|
||||
Please refer to the [contributing guide](.github/CONTRIBUTING.md).
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import { useActiveConnection } from '@/hooks/use-connections';
|
||||
import { useSearchValue } from '@/hooks/use-search-value';
|
||||
import useSearchLabels from '@/hooks/use-labels-search';
|
||||
import { useQueryClient } from '@tanstack/react-query';
|
||||
import { useTRPC } from '@/providers/query-provider';
|
||||
import { usePartySocket } from 'partysocket/react';
|
||||
@@ -27,6 +29,8 @@ export const NotificationProvider = () => {
|
||||
const trpc = useTRPC();
|
||||
const queryClient = useQueryClient();
|
||||
const { data: activeConnection } = useActiveConnection();
|
||||
const [searchValue] = useSearchValue();
|
||||
const { labels } = useSearchLabels();
|
||||
|
||||
const labelsDebouncer = funnel(
|
||||
() => queryClient.invalidateQueries({ queryKey: trpc.labels.list.queryKey() }),
|
||||
@@ -41,14 +45,37 @@ export const NotificationProvider = () => {
|
||||
party: 'zero-agent',
|
||||
room: activeConnection?.id ? String(activeConnection.id) : 'general',
|
||||
prefix: 'agents',
|
||||
maxRetries: 1,
|
||||
maxRetries: 3,
|
||||
host: import.meta.env.VITE_PUBLIC_BACKEND_URL!,
|
||||
onMessage: async (message: MessageEvent<string>) => {
|
||||
try {
|
||||
const { threadIds, type } = JSON.parse(message.data);
|
||||
const { type } = JSON.parse(message.data);
|
||||
if (type === IncomingMessageType.Mail_Get) {
|
||||
const { threadId, result } = JSON.parse(message.data);
|
||||
// queryClient.setQueryData(trpc.mail.get.queryKey({ id: threadId }), result);
|
||||
const { threadId } = JSON.parse(message.data);
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: trpc.mail.get.queryKey({ id: threadId }),
|
||||
refetchType: 'active',
|
||||
exact: true,
|
||||
predicate: (query) => {
|
||||
const queryAge = Date.now() - (query.state.dataUpdatedAt || 0);
|
||||
console.log({ queryAge, query: query.queryKey });
|
||||
return queryAge > 60000; // 1 minute in milliseconds
|
||||
},
|
||||
});
|
||||
} else if (type === IncomingMessageType.Mail_List) {
|
||||
const { folder } = JSON.parse(message.data);
|
||||
console.log({
|
||||
folder,
|
||||
labelIds: labels,
|
||||
q: searchValue.value,
|
||||
});
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: trpc.mail.listThreads.infiniteQueryKey({
|
||||
folder,
|
||||
labelIds: labels,
|
||||
q: searchValue.value,
|
||||
}),
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('error parsing party message', error);
|
||||
|
||||
@@ -313,6 +313,20 @@ export class ZeroWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
},
|
||||
);
|
||||
|
||||
await step.do(`[ZERO_WORKFLOW] Sync Threads ${historyProcessingKey}`, async () => {
|
||||
const agent = env.ZERO_AGENT.get(env.ZERO_AGENT.idFromName(connectionId.toString()));
|
||||
for (const threadId of threadsToProcess) {
|
||||
try {
|
||||
await agent.syncThread(threadId.toString());
|
||||
} catch (error) {
|
||||
log('[ZERO_WORKFLOW] Failed to sync thread:', {
|
||||
threadId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
await step.do(
|
||||
`[ZERO_WORKFLOW] Send Thread Workflow Instances ${connectionId}`,
|
||||
async () => {
|
||||
@@ -462,11 +476,11 @@ export class ThreadWorkflow extends WorkflowEntrypoint<Env, Params> {
|
||||
async () => {
|
||||
log('[THREAD_WORKFLOW] Getting thread:', threadId);
|
||||
const thread = await driver.get(threadId.toString());
|
||||
await notifyUser({
|
||||
connectionId: connectionId.toString(),
|
||||
result: thread,
|
||||
threadId: threadId.toString(),
|
||||
});
|
||||
// await notifyUser({
|
||||
// connectionId: connectionId.toString(),
|
||||
// result: thread,
|
||||
// threadId: threadId.toString(),
|
||||
// });
|
||||
log('[THREAD_WORKFLOW] Found thread with messages:', thread.messages.length);
|
||||
return thread;
|
||||
},
|
||||
|
||||
@@ -114,17 +114,10 @@ export type OutgoingMessage =
|
||||
}
|
||||
| {
|
||||
type: OutgoingMessageType.Mail_List;
|
||||
result: {
|
||||
threads: {
|
||||
id: string;
|
||||
historyId: string | null;
|
||||
}[];
|
||||
nextPageToken: string | null;
|
||||
};
|
||||
folder: string;
|
||||
}
|
||||
| {
|
||||
type: OutgoingMessageType.Mail_Get;
|
||||
result: IGetThreadResponse;
|
||||
threadId: string;
|
||||
};
|
||||
|
||||
@@ -174,6 +167,16 @@ export class AgentRpcDO extends RpcTarget {
|
||||
return await this.mainDo.buildGmailSearchQuery(query);
|
||||
}
|
||||
|
||||
async rawListThreads(params: {
|
||||
folder: string;
|
||||
query?: string;
|
||||
maxResults?: number;
|
||||
labelIds?: string[];
|
||||
pageToken?: string;
|
||||
}) {
|
||||
return await this.mainDo.rawListThreads(params);
|
||||
}
|
||||
|
||||
async listThreads(params: {
|
||||
folder: string;
|
||||
query?: string;
|
||||
@@ -190,19 +193,19 @@ export class AgentRpcDO extends RpcTarget {
|
||||
|
||||
async markThreadsRead(threadIds: string[]) {
|
||||
const result = await this.mainDo.markThreadsRead(threadIds);
|
||||
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
return result;
|
||||
}
|
||||
|
||||
async markThreadsUnread(threadIds: string[]) {
|
||||
const result = await this.mainDo.markThreadsUnread(threadIds);
|
||||
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
return result;
|
||||
}
|
||||
|
||||
async modifyLabels(threadIds: string[], addLabelIds: string[], removeLabelIds: string[]) {
|
||||
const result = await this.mainDo.modifyLabels(threadIds, addLabelIds, removeLabelIds);
|
||||
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -234,13 +237,13 @@ export class AgentRpcDO extends RpcTarget {
|
||||
|
||||
async markAsRead(threadIds: string[]) {
|
||||
const result = await this.mainDo.markAsRead(threadIds);
|
||||
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
return result;
|
||||
}
|
||||
|
||||
async markAsUnread(threadIds: string[]) {
|
||||
const result = await this.mainDo.markAsUnread(threadIds);
|
||||
// await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
await Promise.all(threadIds.map((id) => this.mainDo.syncThread(id)));
|
||||
return result;
|
||||
}
|
||||
|
||||
@@ -305,25 +308,27 @@ const shouldLoop = env.THREAD_SYNC_LOOP !== 'false';
|
||||
|
||||
export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
private chatMessageAbortControllers: Map<string, AbortController> = new Map();
|
||||
private foldersInSync: string[] = [];
|
||||
private foldersInSync: Map<string, boolean> = new Map();
|
||||
private syncThreadsInProgress: Map<string, boolean> = new Map();
|
||||
private currentFolder: string | null = 'inbox';
|
||||
driver: MailManager | null = null;
|
||||
constructor(ctx: DurableObjectState, env: Env) {
|
||||
super(ctx, env);
|
||||
if (shouldDropTables) this.dropTables();
|
||||
// this.sql`
|
||||
// CREATE TABLE IF NOT EXISTS threads (
|
||||
// id TEXT PRIMARY KEY,
|
||||
// created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
// updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
// thread_id TEXT NOT NULL,
|
||||
// provider_id TEXT NOT NULL,
|
||||
// latest_sender TEXT,
|
||||
// latest_received_on TEXT,
|
||||
// latest_subject TEXT,
|
||||
// latest_label_ids TEXT
|
||||
// );
|
||||
// `;
|
||||
this.sql`
|
||||
CREATE TABLE IF NOT EXISTS threads (
|
||||
id TEXT PRIMARY KEY,
|
||||
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
thread_id TEXT NOT NULL,
|
||||
provider_id TEXT NOT NULL,
|
||||
latest_sender TEXT,
|
||||
latest_received_on TEXT,
|
||||
latest_subject TEXT,
|
||||
latest_label_ids TEXT,
|
||||
categories TEXT
|
||||
);
|
||||
`;
|
||||
}
|
||||
|
||||
async dropTables() {
|
||||
@@ -394,7 +399,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
});
|
||||
if (_connection) this.driver = connectionToDriver(_connection);
|
||||
this.ctx.waitUntil(conn.end());
|
||||
// this.ctx.waitUntil(this.syncThreads('inbox'));
|
||||
this.ctx.waitUntil(this.syncThreads('inbox'));
|
||||
// this.ctx.waitUntil(this.syncThreads('sent'));
|
||||
// this.ctx.waitUntil(this.syncThreads('spam'));
|
||||
// this.ctx.waitUntil(this.syncThreads('archive'));
|
||||
@@ -602,6 +607,19 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
maxResults?: number;
|
||||
labelIds?: string[];
|
||||
pageToken?: string;
|
||||
}) {
|
||||
if (!this.driver) {
|
||||
throw new Error('No driver available');
|
||||
}
|
||||
return await this.getThreadsFromDB(params);
|
||||
}
|
||||
|
||||
async rawListThreads(params: {
|
||||
folder: string;
|
||||
query?: string;
|
||||
maxResults?: number;
|
||||
labelIds?: string[];
|
||||
pageToken?: string;
|
||||
}) {
|
||||
if (!this.driver) {
|
||||
throw new Error('No driver available');
|
||||
@@ -613,7 +631,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
if (!this.driver) {
|
||||
throw new Error('No driver available');
|
||||
}
|
||||
return await this.driver.get(threadId);
|
||||
return await this.getThreadFromDB(threadId);
|
||||
}
|
||||
|
||||
async markThreadsRead(threadIds: string[]) {
|
||||
@@ -758,7 +776,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
if (!this.driver) {
|
||||
throw new Error('No driver available');
|
||||
}
|
||||
return await this.driver.list(params);
|
||||
return await this.getThreadsFromDB(params);
|
||||
}
|
||||
|
||||
async markAsRead(threadIds: string[]) {
|
||||
@@ -786,7 +804,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
if (!this.driver) {
|
||||
throw new Error('No driver available');
|
||||
}
|
||||
return await this.driver.get(id);
|
||||
return await this.getThreadFromDB(id);
|
||||
}
|
||||
|
||||
async sendDraft(id: string, data: IOutgoingMessage) {
|
||||
@@ -835,6 +853,12 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
throw new Error('No driver available');
|
||||
}
|
||||
|
||||
if (this.syncThreadsInProgress.has(threadId)) {
|
||||
console.log(`Sync already in progress for thread ${threadId}, skipping...`);
|
||||
return;
|
||||
}
|
||||
this.syncThreadsInProgress.set(threadId, true);
|
||||
|
||||
try {
|
||||
const threadData = await this.driver.get(threadId);
|
||||
const latest = threadData.latest;
|
||||
@@ -843,10 +867,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
// Convert receivedOn to ISO format for proper sorting
|
||||
const normalizedReceivedOn = new Date(latest.receivedOn).toISOString();
|
||||
|
||||
await env.THREADS_BUCKET.put(
|
||||
this.getThreadKey(threadId),
|
||||
JSON.stringify(threadData.messages),
|
||||
);
|
||||
await env.THREADS_BUCKET.put(this.getThreadKey(threadId), JSON.stringify(threadData));
|
||||
|
||||
this.sql`
|
||||
INSERT OR REPLACE INTO threads (
|
||||
@@ -872,10 +893,10 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
if (this.currentFolder === 'inbox') {
|
||||
this.broadcastChatMessage({
|
||||
type: OutgoingMessageType.Mail_Get,
|
||||
result: threadData,
|
||||
threadId,
|
||||
});
|
||||
}
|
||||
this.syncThreadsInProgress.delete(threadId);
|
||||
return { success: true, threadId, threadData };
|
||||
} else {
|
||||
console.log(`Skipping thread ${threadId} - no latest message`);
|
||||
@@ -888,7 +909,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
}
|
||||
|
||||
getThreadKey(threadId: string) {
|
||||
return `${this.name}/${threadId}`;
|
||||
return `${this.name}/${threadId}.json`;
|
||||
}
|
||||
|
||||
async syncThreads(folder: string) {
|
||||
@@ -897,7 +918,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
throw new Error('No driver available');
|
||||
}
|
||||
|
||||
if (this.foldersInSync.includes(folder)) {
|
||||
if (this.foldersInSync.has(folder)) {
|
||||
console.log('Sync already in progress, skipping...');
|
||||
return { synced: 0, message: 'Sync already in progress' };
|
||||
}
|
||||
@@ -908,7 +929,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
return { synced: 0, message: 'Threads already synced' };
|
||||
}
|
||||
|
||||
this.foldersInSync.push(folder);
|
||||
this.foldersInSync.set(folder, true);
|
||||
|
||||
try {
|
||||
let totalSynced = 0;
|
||||
@@ -933,6 +954,11 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
}
|
||||
}
|
||||
|
||||
this.broadcastChatMessage({
|
||||
type: OutgoingMessageType.Mail_List,
|
||||
folder,
|
||||
});
|
||||
|
||||
totalSynced += result.threads.length;
|
||||
pageToken = result.nextPageToken;
|
||||
hasMore = pageToken !== null && shouldLoop;
|
||||
@@ -944,221 +970,225 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
|
||||
throw error;
|
||||
} finally {
|
||||
console.log('Setting isSyncing to false');
|
||||
this.foldersInSync = this.foldersInSync.filter((f) => f !== folder);
|
||||
this.foldersInSync.delete(folder);
|
||||
this.broadcastChatMessage({
|
||||
type: OutgoingMessageType.Mail_List,
|
||||
folder,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// async getThreadsFromDB(params: {
|
||||
// labelIds?: string[];
|
||||
// folder?: string;
|
||||
// q?: string;
|
||||
// max?: number;
|
||||
// cursor?: string;
|
||||
// }) {
|
||||
// const { labelIds = [], folder, q, max = 50, cursor } = params;
|
||||
async getThreadsFromDB(params: {
|
||||
labelIds?: string[];
|
||||
folder?: string;
|
||||
q?: string;
|
||||
max?: number;
|
||||
pageToken?: string;
|
||||
}) {
|
||||
const { labelIds = [], folder, q, max = 50, pageToken } = params;
|
||||
|
||||
// try {
|
||||
// // Build WHERE conditions
|
||||
// const whereConditions: string[] = [];
|
||||
try {
|
||||
// Build WHERE conditions
|
||||
const whereConditions: string[] = [];
|
||||
|
||||
// // Add folder condition (maps to specific label)
|
||||
// if (folder) {
|
||||
// const folderLabel = folder.toUpperCase();
|
||||
// whereConditions.push(`EXISTS (
|
||||
// SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${folderLabel}'
|
||||
// )`);
|
||||
// }
|
||||
// Add folder condition (maps to specific label)
|
||||
if (folder) {
|
||||
const folderLabel = folder.toUpperCase();
|
||||
whereConditions.push(`EXISTS (
|
||||
SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${folderLabel}'
|
||||
)`);
|
||||
}
|
||||
|
||||
// // Add label conditions (OR logic for multiple labels)
|
||||
// if (labelIds.length > 0) {
|
||||
// if (labelIds.length === 1) {
|
||||
// whereConditions.push(`EXISTS (
|
||||
// SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelIds[0]}'
|
||||
// )`);
|
||||
// } else {
|
||||
// // Multiple labels with OR logic
|
||||
// const multiLabelCondition = labelIds
|
||||
// .map(
|
||||
// (labelId) =>
|
||||
// `EXISTS (SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelId}')`,
|
||||
// )
|
||||
// .join(' OR ');
|
||||
// whereConditions.push(`(${multiLabelCondition})`);
|
||||
// }
|
||||
// }
|
||||
// Add label conditions (OR logic for multiple labels)
|
||||
if (labelIds.length > 0) {
|
||||
if (labelIds.length === 1) {
|
||||
whereConditions.push(`EXISTS (
|
||||
SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelIds[0]}'
|
||||
)`);
|
||||
} else {
|
||||
// Multiple labels with OR logic
|
||||
const multiLabelCondition = labelIds
|
||||
.map(
|
||||
(labelId) =>
|
||||
`EXISTS (SELECT 1 FROM json_each(latest_label_ids) WHERE value = '${labelId}')`,
|
||||
)
|
||||
.join(' OR ');
|
||||
whereConditions.push(`(${multiLabelCondition})`);
|
||||
}
|
||||
}
|
||||
|
||||
// // // Add search query condition
|
||||
// // if (q) {
|
||||
// // const searchTerm = q.replace(/'/g, "''"); // Escape single quotes
|
||||
// // whereConditions.push(`(
|
||||
// // latest_subject LIKE '%${searchTerm}%' OR
|
||||
// // latest_sender LIKE '%${searchTerm}%' OR
|
||||
// // messages LIKE '%${searchTerm}%'
|
||||
// // )`);
|
||||
// // }
|
||||
// // Add search query condition
|
||||
if (q) {
|
||||
const searchTerm = q.replace(/'/g, "''"); // Escape single quotes
|
||||
whereConditions.push(`(
|
||||
latest_subject LIKE '%${searchTerm}%' OR
|
||||
latest_sender LIKE '%${searchTerm}%'
|
||||
)`);
|
||||
}
|
||||
|
||||
// // Add cursor condition
|
||||
// if (cursor) {
|
||||
// whereConditions.push(`latest_received_on < '${cursor}'`);
|
||||
// }
|
||||
// Add cursor condition
|
||||
if (pageToken) {
|
||||
whereConditions.push(`latest_received_on < '${pageToken}'`);
|
||||
}
|
||||
|
||||
// // Execute query based on conditions
|
||||
// let result;
|
||||
// Execute query based on conditions
|
||||
let result;
|
||||
|
||||
// if (whereConditions.length === 0) {
|
||||
// // No conditions
|
||||
// result = await this.sql`
|
||||
// SELECT id, latest_received_on
|
||||
// FROM threads
|
||||
// ORDER BY latest_received_on DESC
|
||||
// LIMIT ${max}
|
||||
// `;
|
||||
// } else if (whereConditions.length === 1) {
|
||||
// // Single condition
|
||||
// const condition = whereConditions[0];
|
||||
// if (condition.includes('latest_received_on <')) {
|
||||
// const cursorValue = cursor!;
|
||||
// result = await this.sql`
|
||||
// SELECT id, latest_received_on
|
||||
// FROM threads
|
||||
// WHERE latest_received_on < ${cursorValue}
|
||||
// ORDER BY latest_received_on DESC
|
||||
// LIMIT ${max}
|
||||
// `;
|
||||
// } else if (folder) {
|
||||
// // Folder condition
|
||||
// const folderLabel = folder.toUpperCase();
|
||||
// result = await this.sql`
|
||||
// SELECT id, latest_received_on
|
||||
// FROM threads
|
||||
// WHERE EXISTS (
|
||||
// SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel}
|
||||
// )
|
||||
// ORDER BY latest_received_on DESC
|
||||
// LIMIT ${max}
|
||||
// `;
|
||||
// } else {
|
||||
// // Single label condition
|
||||
// const labelId = labelIds[0];
|
||||
// result = await this.sql`
|
||||
// SELECT id, latest_received_on
|
||||
// FROM threads
|
||||
// WHERE EXISTS (
|
||||
// SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId}
|
||||
// )
|
||||
// ORDER BY latest_received_on DESC
|
||||
// LIMIT ${max}
|
||||
// `;
|
||||
// }
|
||||
// } else {
|
||||
// // Multiple conditions - handle combinations
|
||||
// if (folder && labelIds.length === 0 && cursor) {
|
||||
// // Folder + cursor
|
||||
// const folderLabel = folder.toUpperCase();
|
||||
// result = await this.sql`
|
||||
// SELECT id, latest_received_on
|
||||
// FROM threads
|
||||
// WHERE EXISTS (
|
||||
// SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel}
|
||||
// ) AND latest_received_on < ${cursor}
|
||||
// ORDER BY latest_received_on DESC
|
||||
// LIMIT ${max}
|
||||
// `;
|
||||
// } else if (labelIds.length === 1 && cursor && !folder) {
|
||||
// // Single label + cursor
|
||||
// const labelId = labelIds[0];
|
||||
// result = await this.sql`
|
||||
// SELECT id, latest_received_on
|
||||
// FROM threads
|
||||
// WHERE EXISTS (
|
||||
// SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId}
|
||||
// ) AND latest_received_on < ${cursor}
|
||||
// ORDER BY latest_received_on DESC
|
||||
// LIMIT ${max}
|
||||
// `;
|
||||
// } else {
|
||||
// // For now, fallback to just cursor if complex combinations
|
||||
// const cursorValue = cursor || '';
|
||||
// result = await this.sql`
|
||||
// SELECT id, latest_received_on
|
||||
// FROM threads
|
||||
// WHERE latest_received_on < ${cursorValue}
|
||||
// ORDER BY latest_received_on DESC
|
||||
// LIMIT ${max}
|
||||
// `;
|
||||
// }
|
||||
// }
|
||||
if (whereConditions.length === 0) {
|
||||
// No conditions
|
||||
result = await this.sql`
|
||||
SELECT id, latest_received_on
|
||||
FROM threads
|
||||
ORDER BY latest_received_on DESC
|
||||
LIMIT ${max}
|
||||
`;
|
||||
} else if (whereConditions.length === 1) {
|
||||
// Single condition
|
||||
const condition = whereConditions[0];
|
||||
if (condition.includes('latest_received_on <')) {
|
||||
const cursorValue = pageToken!;
|
||||
result = await this.sql`
|
||||
SELECT id, latest_received_on
|
||||
FROM threads
|
||||
WHERE latest_received_on < ${cursorValue}
|
||||
ORDER BY latest_received_on DESC
|
||||
LIMIT ${max}
|
||||
`;
|
||||
} else if (folder) {
|
||||
// Folder condition
|
||||
const folderLabel = folder.toUpperCase();
|
||||
result = await this.sql`
|
||||
SELECT id, latest_received_on
|
||||
FROM threads
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel}
|
||||
)
|
||||
ORDER BY latest_received_on DESC
|
||||
LIMIT ${max}
|
||||
`;
|
||||
} else {
|
||||
// Single label condition
|
||||
const labelId = labelIds[0];
|
||||
result = await this.sql`
|
||||
SELECT id, latest_received_on
|
||||
FROM threads
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId}
|
||||
)
|
||||
ORDER BY latest_received_on DESC
|
||||
LIMIT ${max}
|
||||
`;
|
||||
}
|
||||
} else {
|
||||
// Multiple conditions - handle combinations
|
||||
if (folder && labelIds.length === 0 && pageToken) {
|
||||
// Folder + cursor
|
||||
const folderLabel = folder.toUpperCase();
|
||||
result = await this.sql`
|
||||
SELECT id, latest_received_on
|
||||
FROM threads
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${folderLabel}
|
||||
) AND latest_received_on < ${pageToken}
|
||||
ORDER BY latest_received_on DESC
|
||||
LIMIT ${max}
|
||||
`;
|
||||
} else if (labelIds.length === 1 && pageToken && !folder) {
|
||||
// Single label + cursor
|
||||
const labelId = labelIds[0];
|
||||
result = await this.sql`
|
||||
SELECT id, latest_received_on
|
||||
FROM threads
|
||||
WHERE EXISTS (
|
||||
SELECT 1 FROM json_each(latest_label_ids) WHERE value = ${labelId}
|
||||
) AND latest_received_on < ${pageToken}
|
||||
ORDER BY latest_received_on DESC
|
||||
LIMIT ${max}
|
||||
`;
|
||||
} else {
|
||||
// For now, fallback to just cursor if complex combinations
|
||||
const cursorValue = pageToken || '';
|
||||
result = await this.sql`
|
||||
SELECT id, latest_received_on
|
||||
FROM threads
|
||||
WHERE latest_received_on < ${cursorValue}
|
||||
ORDER BY latest_received_on DESC
|
||||
LIMIT ${max}
|
||||
`;
|
||||
}
|
||||
}
|
||||
|
||||
// const threads = result.map((row: any) => ({
|
||||
// id: row.id,
|
||||
// historyId: null,
|
||||
// }));
|
||||
const threads = result.map((row: any) => ({
|
||||
id: row.id,
|
||||
historyId: null,
|
||||
}));
|
||||
|
||||
// // Use latest_received_on for pagination cursor
|
||||
// const nextPageToken =
|
||||
// threads.length === max && result.length > 0
|
||||
// ? result[result.length - 1].latest_received_on
|
||||
// : null;
|
||||
// Use latest_received_on for pagination cursor
|
||||
const nextPageToken =
|
||||
threads.length === max && result.length > 0
|
||||
? result[result.length - 1].latest_received_on
|
||||
: null;
|
||||
|
||||
// return {
|
||||
// threads,
|
||||
// nextPageToken,
|
||||
// };
|
||||
// } catch (error) {
|
||||
// console.error('Failed to get threads from database:', error);
|
||||
// throw error;
|
||||
// }
|
||||
// }
|
||||
return {
|
||||
threads,
|
||||
nextPageToken,
|
||||
};
|
||||
} catch (error) {
|
||||
console.error('Failed to get threads from database:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
// async getThreadFromDB(id: string): Promise<IGetThreadResponse> {
|
||||
// try {
|
||||
// const result = this.sql`
|
||||
// SELECT
|
||||
// id,
|
||||
// thread_id,
|
||||
// provider_id,
|
||||
// latest_sender,
|
||||
// latest_received_on,
|
||||
// latest_subject,
|
||||
// latest_label_ids,
|
||||
// created_at,
|
||||
// updated_at
|
||||
// FROM threads
|
||||
// WHERE id = ${id}
|
||||
// LIMIT 1
|
||||
// `;
|
||||
async getThreadFromDB(id: string): Promise<IGetThreadResponse> {
|
||||
try {
|
||||
const result = this.sql`
|
||||
SELECT
|
||||
id,
|
||||
thread_id,
|
||||
provider_id,
|
||||
latest_sender,
|
||||
latest_received_on,
|
||||
latest_subject,
|
||||
latest_label_ids,
|
||||
created_at,
|
||||
updated_at
|
||||
FROM threads
|
||||
WHERE id = ${id}
|
||||
LIMIT 1
|
||||
`;
|
||||
|
||||
// if (result.length === 0) {
|
||||
// this.ctx.waitUntil(this.syncThread(id));
|
||||
// return {
|
||||
// messages: [],
|
||||
// latest: undefined,
|
||||
// hasUnread: false,
|
||||
// totalReplies: 0,
|
||||
// labels: [],
|
||||
// } satisfies IGetThreadResponse;
|
||||
// }
|
||||
if (result.length === 0) {
|
||||
this.ctx.waitUntil(this.syncThread(id));
|
||||
return {
|
||||
messages: [],
|
||||
latest: undefined,
|
||||
hasUnread: false,
|
||||
totalReplies: 0,
|
||||
labels: [],
|
||||
} satisfies IGetThreadResponse;
|
||||
}
|
||||
|
||||
// const row = result[0] as any;
|
||||
// const storedMessages = await env.THREADS_BUCKET.get(this.getThreadKey(id));
|
||||
// const latestLabelIds = JSON.parse(row.latest_label_ids || '[]');
|
||||
const row = result[0] as any;
|
||||
const storedThread = await env.THREADS_BUCKET.get(this.getThreadKey(id));
|
||||
|
||||
// const messages: ParsedMessage[] = storedMessages
|
||||
// ? JSON.parse(await storedMessages.text())
|
||||
// : [];
|
||||
const messages: ParsedMessage[] = storedThread
|
||||
? (JSON.parse(await storedThread.text()) as IGetThreadResponse).messages
|
||||
: [];
|
||||
|
||||
// return {
|
||||
// messages,
|
||||
// latest: messages.length > 0 ? messages[messages.length - 1] : undefined,
|
||||
// hasUnread: latestLabelIds.includes('UNREAD'),
|
||||
// totalReplies: messages.length,
|
||||
// labels: latestLabelIds.map((id: string) => ({ id, name: id })),
|
||||
// } satisfies IGetThreadResponse;
|
||||
// } catch (error) {
|
||||
// console.error('Failed to get thread from database:', error);
|
||||
// throw error;
|
||||
// }
|
||||
// }
|
||||
const latestLabelIds = JSON.parse(row.latest_label_ids || '[]');
|
||||
|
||||
return {
|
||||
messages,
|
||||
latest: messages.length > 0 ? messages[messages.length - 1] : undefined,
|
||||
hasUnread: latestLabelIds.includes('UNREAD'),
|
||||
totalReplies: messages.length,
|
||||
labels: latestLabelIds.map((id: string) => ({ id, name: id })),
|
||||
} satisfies IGetThreadResponse;
|
||||
} catch (error) {
|
||||
console.error('Failed to get thread from database:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export class ZeroMCP extends McpAgent<typeof env, {}, { userId: string }> {
|
||||
|
||||
@@ -82,24 +82,26 @@ export const mailRouter = router({
|
||||
});
|
||||
return drafts;
|
||||
}
|
||||
// if (q) {
|
||||
if (q) {
|
||||
const threadsResponse = await agent.rawListThreads({
|
||||
labelIds: labelIds,
|
||||
maxResults: max,
|
||||
pageToken: cursor,
|
||||
query: q,
|
||||
folder,
|
||||
});
|
||||
return threadsResponse;
|
||||
}
|
||||
const folderLabelId = getFolderLabelId(folder);
|
||||
const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds;
|
||||
const threadsResponse = await agent.listThreads({
|
||||
labelIds: labelIds,
|
||||
labelIds: labelIdsToUse,
|
||||
maxResults: max,
|
||||
pageToken: cursor,
|
||||
query: q,
|
||||
folder,
|
||||
});
|
||||
return threadsResponse;
|
||||
// }
|
||||
// const folderLabelId = getFolderLabelId(folder);
|
||||
// const labelIdsToUse = folderLabelId ? [...labelIds, folderLabelId] : labelIds;
|
||||
// const threadsResponse = await agent.getThreadsFromDB({
|
||||
// labelIds: labelIdsToUse,
|
||||
// max: max,
|
||||
// cursor: cursor,
|
||||
// });
|
||||
// return threadsResponse;
|
||||
}),
|
||||
markAsRead: activeDriverProcedure
|
||||
.input(
|
||||
|
||||
@@ -116,7 +116,7 @@
|
||||
"DISABLE_CALLS": "true",
|
||||
"VOICE_SECRET": "1234567890",
|
||||
"GOOGLE_S_ACCOUNT": "{}",
|
||||
"DROP_AGENT_TABLES": "false",
|
||||
"DROP_AGENT_TABLES": "true",
|
||||
"THREAD_SYNC_MAX_COUNT": "40",
|
||||
"THREAD_SYNC_LOOP": "false",
|
||||
},
|
||||
@@ -260,9 +260,9 @@
|
||||
"VITE_PUBLIC_BACKEND_URL": "https://sapi.0.email",
|
||||
"VITE_PUBLIC_APP_URL": "https://staging.0.email",
|
||||
"DISABLE_CALLS": "",
|
||||
"DROP_AGENT_TABLES": "false",
|
||||
"DROP_AGENT_TABLES": "true",
|
||||
"THREAD_SYNC_MAX_COUNT": "40",
|
||||
"THREAD_SYNC_LOOP": "false",
|
||||
"THREAD_SYNC_LOOP": "true",
|
||||
},
|
||||
"kv_namespaces": [
|
||||
{
|
||||
@@ -401,9 +401,9 @@
|
||||
"VITE_PUBLIC_BACKEND_URL": "https://api.0.email",
|
||||
"VITE_PUBLIC_APP_URL": "https://0.email",
|
||||
"DISABLE_CALLS": "true",
|
||||
"DROP_AGENT_TABLES": "false",
|
||||
"DROP_AGENT_TABLES": "true",
|
||||
"THREAD_SYNC_MAX_COUNT": "40",
|
||||
"THREAD_SYNC_LOOP": "false",
|
||||
"THREAD_SYNC_LOOP": "true",
|
||||
},
|
||||
"kv_namespaces": [
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user