diff --git a/apps/server/src/pipelines.ts b/apps/server/src/pipelines.ts index 3d7e29ca4..a26540378 100644 --- a/apps/server/src/pipelines.ts +++ b/apps/server/src/pipelines.ts @@ -82,7 +82,13 @@ export class MainWorkflow extends WorkflowEntrypoint { }); } else { log('[MAIN_WORKFLOW] Creating workflow instance with current history'); + const existingInstance = await env.ZERO_WORKFLOW.get(`${connectionId}__${historyId}`); + if (existingInstance) { + log('[MAIN_WORKFLOW] History already processing:', existingInstance.id); + return; + } const instance = await env.ZERO_WORKFLOW.create({ + id: `${connectionId}__${historyId}`, params: { connectionId, historyId: historyId, @@ -267,7 +273,15 @@ export class ZeroWorkflow extends WorkflowEntrypoint { continue; } await env.gmail_processing_threads.put(threadId.toString(), 'true'); + const existingInstance = await env.THREAD_WORKFLOW.get( + `${threadId.toString()}__${connectionId.toString()}`, + ); + if (existingInstance) { + log('[ZERO_WORKFLOW] Thread already processing:', isProcessing, threadId); + continue; + } const instance = await env.THREAD_WORKFLOW.create({ + id: `${threadId.toString()}__${connectionId.toString()}`, params: { connectionId, threadId, providerId: foundConnection.providerId }, }); log('[ZERO_WORKFLOW] Created instance:', {