mirror of
https://github.com/Mail-0/Zero.git
synced 2026-06-29 07:16:19 +00:00
Replace Sentry with Cloudflare WorkerEntrypoint class (#1865)
# READ CAREFULLY THEN REMOVE Remove bullet points that are not relevant. PLEASE REFRAIN FROM USING AI TO WRITE YOUR CODE AND PR DESCRIPTION. IF YOU DO USE AI TO WRITE YOUR CODE PLEASE PROVIDE A DESCRIPTION AND REVIEW IT CAREFULLY. MAKE SURE YOU UNDERSTAND THE CODE YOU ARE SUBMITTING USING AI. - Pull requests that do not follow these guidelines will be closed without review or comment. - If you use AI to write your PR description your pr will be close without review or comment. - If you are unsure about anything, feel free to ask for clarification. ## Description Please provide a clear description of your changes. --- ## Type of Change Please delete options that are not relevant. - [ ] 🐛 Bug fix (non-breaking change which fixes an issue) - [ ] ✨ New feature (non-breaking change which adds functionality) - [ ] 💥 Breaking change (fix or feature with breaking changes) - [ ] 📝 Documentation update - [ ] 🎨 UI/UX improvement - [ ] 🔒 Security enhancement - [ ] ⚡ Performance improvement ## Areas Affected Please check all that apply: - [ ] Email Integration (Gmail, IMAP, etc.) - [ ] User Interface/Experience - [ ] Authentication/Authorization - [ ] Data Storage/Management - [ ] API Endpoints - [ ] Documentation - [ ] Testing Infrastructure - [ ] Development Workflow - [ ] Deployment/Infrastructure ## Testing Done Describe the tests you've done: - [ ] Unit tests added/updated - [ ] Integration tests added/updated - [ ] Manual testing performed - [ ] Cross-browser testing (if UI changes) - [ ] Mobile responsiveness verified (if UI changes) ## Security Considerations For changes involving data or authentication: - [ ] No sensitive data is exposed - [ ] Authentication checks are in place - [ ] Input validation is implemented - [ ] Rate limiting is considered (if applicable) ## Checklist - [ ] I have read the [CONTRIBUTING](https://github.com/Mail-0/Zero/blob/staging/.github/CONTRIBUTING.md) document - [ ] My code follows the project's style guidelines - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in complex areas - [ ] I have updated the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix/feature works - [ ] All tests pass locally - [ ] Any dependent changes are merged and published ## Additional Notes Add any other context about the pull request here. ## Screenshots/Recordings Add screenshots or recordings here if applicable. --- _By submitting this pull request, I confirm that my contribution is made under the terms of the project's license._ <!-- This is an auto-generated description by cubic. --> --- ## Summary by cubic Replaced Sentry integration with a Cloudflare WorkerEntrypoint class to handle fetch, queue, and scheduled events directly. - **Refactors** - Removed withSentry wrapper and migrated all event handling logic into a WorkerEntrypoint class. <!-- End of auto-generated description by cubic. -->
This commit is contained in:
@@ -14,8 +14,8 @@ import {
|
||||
userSettings,
|
||||
writingStyleMatrix,
|
||||
} from './db/schema';
|
||||
import { env, DurableObject, RpcTarget, WorkerEntrypoint } from 'cloudflare:workers';
|
||||
import { EProviders, type ISubscribeBatch, type IThreadBatch } from './types';
|
||||
import { env, DurableObject, RpcTarget } from 'cloudflare:workers';
|
||||
import { oAuthDiscoveryMetadata } from 'better-auth/plugins';
|
||||
import { getZeroDB, verifyToken } from './lib/server-utils';
|
||||
import { eq, and, desc, asc, inArray } from 'drizzle-orm';
|
||||
@@ -26,7 +26,6 @@ import { defaultUserSettings } from './lib/schemas';
|
||||
import { createLocalJWKSet, jwtVerify } from 'jose';
|
||||
import { getZeroAgent } from './lib/server-utils';
|
||||
import { enableBrainFunction } from './lib/brain';
|
||||
import { withSentry } from '@sentry/cloudflare';
|
||||
import { trpcServer } from '@hono/trpc-server';
|
||||
import { agentsMiddleware } from 'hono-agents';
|
||||
import { ZeroMCP } from './routes/agent/mcp';
|
||||
@@ -718,150 +717,145 @@ const app = new Hono<HonoContext>()
|
||||
return c.json({ message: 'OK' }, { status: 200 });
|
||||
}
|
||||
});
|
||||
export default withSentry(
|
||||
() => ({
|
||||
dsn: 'https://54d9ec6795f10e5c6d1c4851523d4888@o4509328786915328.ingest.us.sentry.io/4509753563938816',
|
||||
}),
|
||||
{
|
||||
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
|
||||
return app.fetch(request, env, ctx);
|
||||
},
|
||||
async queue(batch: MessageBatch<any>) {
|
||||
switch (true) {
|
||||
case batch.queue.startsWith('subscribe-queue'): {
|
||||
console.log('batch', batch);
|
||||
await Promise.all(
|
||||
batch.messages.map(async (msg: Message<ISubscribeBatch>) => {
|
||||
const connectionId = msg.body.connectionId;
|
||||
const providerId = msg.body.providerId;
|
||||
try {
|
||||
await enableBrainFunction({ id: connectionId, providerId });
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`Failed to enable brain function for connection ${connectionId}:`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
}),
|
||||
);
|
||||
console.log('[SUBSCRIBE_QUEUE] batch done');
|
||||
return;
|
||||
}
|
||||
case batch.queue.startsWith('thread-queue'): {
|
||||
await Promise.all(
|
||||
batch.messages.map(async (msg: Message<IThreadBatch>) => {
|
||||
const providerId = msg.body.providerId;
|
||||
const historyId = msg.body.historyId;
|
||||
const subscriptionName = msg.body.subscriptionName;
|
||||
|
||||
try {
|
||||
const workflowRunner = env.WORKFLOW_RUNNER.get(env.WORKFLOW_RUNNER.newUniqueId());
|
||||
const result = await workflowRunner.runMainWorkflow({
|
||||
providerId,
|
||||
historyId,
|
||||
subscriptionName,
|
||||
});
|
||||
console.log('[THREAD_QUEUE] result', result);
|
||||
} catch (error) {
|
||||
console.error('Error running workflow', error);
|
||||
}
|
||||
}),
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
async scheduled() {
|
||||
console.log('[SCHEDULED] Checking for expired subscriptions...');
|
||||
const { db, conn } = createDb(env.HYPERDRIVE.connectionString);
|
||||
const allAccounts = await db.query.connection.findMany({
|
||||
where: (fields, { isNotNull, and }) =>
|
||||
and(isNotNull(fields.accessToken), isNotNull(fields.refreshToken)),
|
||||
});
|
||||
await conn.end();
|
||||
console.log('[SCHEDULED] allAccounts', allAccounts.length);
|
||||
const now = new Date();
|
||||
const fiveDaysAgo = new Date(now.getTime() - 5 * 24 * 60 * 60 * 1000);
|
||||
|
||||
const expiredSubscriptions: Array<{ connectionId: string; providerId: EProviders }> = [];
|
||||
|
||||
const nowTs = Date.now();
|
||||
|
||||
const unsnoozeMap: Record<string, { threadIds: string[]; keyNames: string[] }> = {};
|
||||
|
||||
let cursor: string | undefined = undefined;
|
||||
do {
|
||||
const listResp: {
|
||||
keys: { name: string; metadata?: { wakeAt?: string } }[];
|
||||
cursor?: string;
|
||||
} = await env.snoozed_emails.list({ cursor, limit: 1000 });
|
||||
cursor = listResp.cursor;
|
||||
|
||||
for (const key of listResp.keys) {
|
||||
try {
|
||||
const wakeAtIso = (key as any).metadata?.wakeAt as string | undefined;
|
||||
if (!wakeAtIso) continue;
|
||||
const wakeAt = new Date(wakeAtIso).getTime();
|
||||
if (wakeAt > nowTs) continue;
|
||||
|
||||
const [threadId, connectionId] = key.name.split('__');
|
||||
if (!threadId || !connectionId) continue;
|
||||
|
||||
if (!unsnoozeMap[connectionId]) {
|
||||
unsnoozeMap[connectionId] = { threadIds: [], keyNames: [] };
|
||||
}
|
||||
unsnoozeMap[connectionId].threadIds.push(threadId);
|
||||
unsnoozeMap[connectionId].keyNames.push(key.name);
|
||||
} catch (error) {
|
||||
console.error('Failed to prepare unsnooze for key', key.name, error);
|
||||
}
|
||||
}
|
||||
} while (cursor);
|
||||
|
||||
await Promise.all(
|
||||
Object.entries(unsnoozeMap).map(async ([connectionId, { threadIds, keyNames }]) => {
|
||||
try {
|
||||
const agent = await getZeroAgent(connectionId);
|
||||
await agent.queue('unsnoozeThreadsHandler', { connectionId, threadIds, keyNames });
|
||||
} catch (error) {
|
||||
console.error('Failed to enqueue unsnooze tasks', { connectionId, threadIds, error });
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
allAccounts.map(async ({ id, providerId }) => {
|
||||
const lastSubscribed = await env.gmail_sub_age.get(`${id}__${providerId}`);
|
||||
|
||||
if (lastSubscribed) {
|
||||
const subscriptionDate = new Date(lastSubscribed);
|
||||
if (subscriptionDate < fiveDaysAgo) {
|
||||
console.log(`[SCHEDULED] Found expired Google subscription for connection: ${id}`);
|
||||
expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders });
|
||||
}
|
||||
} else {
|
||||
expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders });
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
// Send expired subscriptions to queue for renewal
|
||||
if (expiredSubscriptions.length > 0) {
|
||||
console.log(
|
||||
`[SCHEDULED] Sending ${expiredSubscriptions.length} expired subscriptions to renewal queue`,
|
||||
);
|
||||
export default class Entry extends WorkerEntrypoint<Env> {
|
||||
async fetch(request: Request): Promise<Response> {
|
||||
return app.fetch(request, this.env, this.ctx);
|
||||
}
|
||||
async queue(batch: MessageBatch<any>) {
|
||||
switch (true) {
|
||||
case batch.queue.startsWith('subscribe-queue'): {
|
||||
console.log('batch', batch);
|
||||
await Promise.all(
|
||||
expiredSubscriptions.map(async ({ connectionId, providerId }) => {
|
||||
await env.subscribe_queue.send({ connectionId, providerId });
|
||||
batch.messages.map(async (msg: Message<ISubscribeBatch>) => {
|
||||
const connectionId = msg.body.connectionId;
|
||||
const providerId = msg.body.providerId;
|
||||
try {
|
||||
await enableBrainFunction({ id: connectionId, providerId });
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`Failed to enable brain function for connection ${connectionId}:`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
}),
|
||||
);
|
||||
console.log('[SUBSCRIBE_QUEUE] batch done');
|
||||
return;
|
||||
}
|
||||
case batch.queue.startsWith('thread-queue'): {
|
||||
await Promise.all(
|
||||
batch.messages.map(async (msg: Message<IThreadBatch>) => {
|
||||
const providerId = msg.body.providerId;
|
||||
const historyId = msg.body.historyId;
|
||||
const subscriptionName = msg.body.subscriptionName;
|
||||
|
||||
try {
|
||||
const workflowRunner = env.WORKFLOW_RUNNER.get(env.WORKFLOW_RUNNER.newUniqueId());
|
||||
const result = await workflowRunner.runMainWorkflow({
|
||||
providerId,
|
||||
historyId,
|
||||
subscriptionName,
|
||||
});
|
||||
console.log('[THREAD_QUEUE] result', result);
|
||||
} catch (error) {
|
||||
console.error('Error running workflow', error);
|
||||
}
|
||||
}),
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
async scheduled() {
|
||||
console.log('[SCHEDULED] Checking for expired subscriptions...');
|
||||
const { db, conn } = createDb(env.HYPERDRIVE.connectionString);
|
||||
const allAccounts = await db.query.connection.findMany({
|
||||
where: (fields, { isNotNull, and }) =>
|
||||
and(isNotNull(fields.accessToken), isNotNull(fields.refreshToken)),
|
||||
});
|
||||
await conn.end();
|
||||
console.log('[SCHEDULED] allAccounts', allAccounts.length);
|
||||
const now = new Date();
|
||||
const fiveDaysAgo = new Date(now.getTime() - 5 * 24 * 60 * 60 * 1000);
|
||||
|
||||
const expiredSubscriptions: Array<{ connectionId: string; providerId: EProviders }> = [];
|
||||
|
||||
const nowTs = Date.now();
|
||||
|
||||
const unsnoozeMap: Record<string, { threadIds: string[]; keyNames: string[] }> = {};
|
||||
|
||||
let cursor: string | undefined = undefined;
|
||||
do {
|
||||
const listResp: {
|
||||
keys: { name: string; metadata?: { wakeAt?: string } }[];
|
||||
cursor?: string;
|
||||
} = await env.snoozed_emails.list({ cursor, limit: 1000 });
|
||||
cursor = listResp.cursor;
|
||||
|
||||
for (const key of listResp.keys) {
|
||||
try {
|
||||
const wakeAtIso = (key as any).metadata?.wakeAt as string | undefined;
|
||||
if (!wakeAtIso) continue;
|
||||
const wakeAt = new Date(wakeAtIso).getTime();
|
||||
if (wakeAt > nowTs) continue;
|
||||
|
||||
const [threadId, connectionId] = key.name.split('__');
|
||||
if (!threadId || !connectionId) continue;
|
||||
|
||||
if (!unsnoozeMap[connectionId]) {
|
||||
unsnoozeMap[connectionId] = { threadIds: [], keyNames: [] };
|
||||
}
|
||||
unsnoozeMap[connectionId].threadIds.push(threadId);
|
||||
unsnoozeMap[connectionId].keyNames.push(key.name);
|
||||
} catch (error) {
|
||||
console.error('Failed to prepare unsnooze for key', key.name, error);
|
||||
}
|
||||
}
|
||||
} while (cursor);
|
||||
|
||||
await Promise.all(
|
||||
Object.entries(unsnoozeMap).map(async ([connectionId, { threadIds, keyNames }]) => {
|
||||
try {
|
||||
const agent = await getZeroAgent(connectionId);
|
||||
await agent.queue('unsnoozeThreadsHandler', { connectionId, threadIds, keyNames });
|
||||
} catch (error) {
|
||||
console.error('Failed to enqueue unsnooze tasks', { connectionId, threadIds, error });
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
await Promise.all(
|
||||
allAccounts.map(async ({ id, providerId }) => {
|
||||
const lastSubscribed = await env.gmail_sub_age.get(`${id}__${providerId}`);
|
||||
|
||||
if (lastSubscribed) {
|
||||
const subscriptionDate = new Date(lastSubscribed);
|
||||
if (subscriptionDate < fiveDaysAgo) {
|
||||
console.log(`[SCHEDULED] Found expired Google subscription for connection: ${id}`);
|
||||
expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders });
|
||||
}
|
||||
} else {
|
||||
expiredSubscriptions.push({ connectionId: id, providerId: providerId as EProviders });
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
// Send expired subscriptions to queue for renewal
|
||||
if (expiredSubscriptions.length > 0) {
|
||||
console.log(
|
||||
`[SCHEDULED] Processed ${allAccounts.keys.length} accounts, found ${expiredSubscriptions.length} expired subscriptions`,
|
||||
`[SCHEDULED] Sending ${expiredSubscriptions.length} expired subscriptions to renewal queue`,
|
||||
);
|
||||
},
|
||||
} satisfies ExportedHandler<Env>,
|
||||
);
|
||||
await Promise.all(
|
||||
expiredSubscriptions.map(async ({ connectionId, providerId }) => {
|
||||
await env.subscribe_queue.send({ connectionId, providerId });
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
console.log(
|
||||
`[SCHEDULED] Processed ${allAccounts.keys.length} accounts, found ${expiredSubscriptions.length} expired subscriptions`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export { ZeroAgent, ZeroMCP, ZeroDB, ZeroDriver, ThinkingMCP, WorkflowRunner };
|
||||
|
||||
Reference in New Issue
Block a user