diff --git a/apps/server/src/lib/prompts.ts b/apps/server/src/lib/prompts.ts index d624e966f..7f63dbe5c 100644 --- a/apps/server/src/lib/prompts.ts +++ b/apps/server/src/lib/prompts.ts @@ -394,7 +394,7 @@ export const AiChatPrompt = () => Add/remove labels from threads - Get label IDs first with getUserLabels + Always use the label names, not the IDs modifyLabels({ threadIds: [...], options: { addLabels: [...], removeLabels: [...] } }) diff --git a/apps/server/src/pipelines.effect.ts b/apps/server/src/pipelines.effect.ts index bdb804d07..ecc97711f 100644 --- a/apps/server/src/pipelines.effect.ts +++ b/apps/server/src/pipelines.effect.ts @@ -53,6 +53,7 @@ ${prompt} const appendContext = (prompt: string, context?: Record) => { if (!context) return prompt; return dedent` + use sequential thinking to solve the user's problem when the user asks about "this" thread or "this" email, use the threadId to get the thread details when the user asks about "this" folder, use the currentFolder to get the folder details diff --git a/apps/server/src/routes/agent/db/index.ts b/apps/server/src/routes/agent/db/index.ts index 1c67dc4d6..132551cec 100644 --- a/apps/server/src/routes/agent/db/index.ts +++ b/apps/server/src/routes/agent/db/index.ts @@ -20,7 +20,6 @@ const threadSelect = { latestSender: threads.latestSender, latestReceivedOn: threads.latestReceivedOn, latestSubject: threads.latestSubject, - latestLabelIds: threads.latestLabelIds, } as const; async function createMissingLabels(db: DB, labelIds: string[]): Promise { diff --git a/apps/server/src/routes/agent/db/schema.ts b/apps/server/src/routes/agent/db/schema.ts index 35c82379c..3901d9fdf 100644 --- a/apps/server/src/routes/agent/db/schema.ts +++ b/apps/server/src/routes/agent/db/schema.ts @@ -11,7 +11,6 @@ export const threads = sqliteTable( latestSender: text('latest_sender', { mode: 'json' }).$type(), latestReceivedOn: text('latest_received_on'), latestSubject: text('latest_subject'), - latestLabelIds: text('latest_label_ids', { mode: 'json' }).$type(), }, (table) => [ index('threads_thread_id_idx').on(table.threadId), diff --git a/apps/server/src/routes/agent/index.ts b/apps/server/src/routes/agent/index.ts index 8d18a487c..654997845 100644 --- a/apps/server/src/routes/agent/index.ts +++ b/apps/server/src/routes/agent/index.ts @@ -34,7 +34,7 @@ import { type ParsedMessage, } from '../../types'; import type { IGetThreadResponse, IGetThreadsResponse, MailManager } from '../../lib/driver/types'; -import { countThreads, countThreadsByLabel, create, get, modifyThreadLabels, type DB } from './db'; +import { countThreads, countThreadsByLabel, create, get, getThreadLabels, modifyThreadLabels, type DB } from './db'; import { generateWhatUserCaresAbout, type UserTopic } from '../../lib/analyze/interests'; import { DurableObjectOAuthClientProvider } from 'agents/mcp/do-oauth-client-provider'; import { AiChatPrompt, GmailSearchAssistantSystemPrompt } from '../../lib/prompts'; @@ -60,6 +60,7 @@ import { openai } from '@ai-sdk/openai'; import * as schema from './db/schema'; import { threads } from './db/schema'; import { Effect, pipe } from 'effect'; +import { groq } from '@ai-sdk/groq'; import { createDb } from '../../db'; import type { Message } from 'ai'; import { eq } from 'drizzle-orm'; @@ -952,17 +953,16 @@ export class ZeroDriver extends DurableObject { // Update database yield* Effect.tryPromise(() => create( - this.db, - { - id: threadId, - threadId, - providerId: 'google', - latestSender: latest.sender, - latestReceivedOn: normalizedReceivedOn, - latestSubject: latest.subject, - latestLabelIds: latest.tags.map((tag) => tag.id), - }, - latest.tags.map((tag) => tag.id), + this.db, + { + id: threadId, + threadId, + providerId: 'google', + latestSender: latest.sender, + latestReceivedOn: normalizedReceivedOn, + latestSubject: latest.subject, + }, + latest.tags.map((tag) => tag.id), ), ).pipe( Effect.tap(() => @@ -1610,25 +1610,20 @@ export class ZeroDriver extends DurableObject { async modifyThreadLabelsInDB(threadId: string, addLabels: string[], removeLabels: string[]) { try { // Get current labels before modification - const currentThread = await get(this.db, { id: threadId }); + let currentThread = await get(this.db, { id: threadId }); if (!currentThread) { - throw new Error(`Thread ${threadId} not found in database`); + await this.syncThread({ threadId }); + currentThread = await get(this.db, { id: threadId }); } - let currentLabels: string[]; - try { - const labelIds = currentThread.latestLabelIds; - if (Array.isArray(labelIds)) { - currentLabels = labelIds; - } else { - currentLabels = []; - } - } catch (error) { - console.error(`Invalid JSON in latest_label_ids for thread ${threadId}:`, error); - currentLabels = []; + if (!currentThread) { + throw new Error(`Thread ${threadId} not found in database and could not be synced`); } + const currentLabelsData = await getThreadLabels(this.db, threadId); + const currentLabels = currentLabelsData.map((l) => l.id); + // Use the new database operations to modify labels const result = await modifyThreadLabels(this.db, threadId, addLabels, removeLabels); @@ -1668,7 +1663,6 @@ export class ZeroDriver extends DurableObject { labels: [], } satisfies IGetThreadResponse; } - const row = result; const storedThread = await this.env.THREADS_BUCKET.get(this.getThreadKey(id)); let messages: ParsedMessage[] = storedThread @@ -1681,14 +1675,21 @@ export class ZeroDriver extends DurableObject { messages = messages.filter((e) => e.isDraft !== true); } - const latestLabelIds = row.latestLabelIds; + const labelsList = await getThreadLabels(this.db, id); + const labelIds = labelsList.map((l) => l.id); + + console.log( + '[getThreadFromDB] storedThread:', + labelIds, + messages.findLast((e) => e.isDraft !== true), + ); return { messages, latest: messages.findLast((e) => e.isDraft !== true), - hasUnread: latestLabelIds?.includes('UNREAD') || false, + hasUnread: labelIds.includes('UNREAD'), totalReplies: messages.filter((e) => e.isDraft !== true).length, - labels: latestLabelIds?.map((id: string) => ({ id, name: id })) || [], + labels: labelsList, isLatestDraft, } satisfies IGetThreadResponse; } catch (error) { @@ -1813,7 +1814,7 @@ export class ZeroAgent extends AIChatAgent { const model = this.env.USE_OPENAI === 'true' - ? openai(this.env.OPENAI_MODEL || 'gpt-4o') + ? groq('openai/gpt-oss-120b') : anthropic(this.env.OPENAI_MODEL || 'claude-3-7-sonnet-20250219'); const result = streamText({ diff --git a/apps/server/src/routes/agent/tools.ts b/apps/server/src/routes/agent/tools.ts index b18a80afd..8581a410c 100644 --- a/apps/server/src/routes/agent/tools.ts +++ b/apps/server/src/routes/agent/tools.ts @@ -242,8 +242,14 @@ const modifyLabels = (connectionId: string) => parameters: z.object({ threadIds: z.array(z.string()).describe('The IDs of the threads to modify'), options: z.object({ - addLabels: z.array(z.string()).default([]).describe('The labels to add'), - removeLabels: z.array(z.string()).default([]).describe('The labels to remove'), + addLabels: z + .array(z.string()) + .default([]) + .describe('The labels to add, an array of label names'), + removeLabels: z + .array(z.string()) + .default([]) + .describe('The labels to remove, an array of label names'), }), }), execute: async ({ threadIds, options }) => { @@ -482,6 +488,7 @@ export const tools = async (connectionId: string, ragEffect: boolean = false) => [Tools.DeleteLabel]: deleteLabel(connectionId), [Tools.BuildGmailSearchQuery]: buildGmailSearchQuery(), [Tools.GetCurrentDate]: getCurrentDate(), + [Tools.WebSearch]: webSearch(), [Tools.InboxRag]: tool({ description: 'Search the inbox for emails using natural language. Returns only an array of threadIds.',