diff --git a/apps/mail/components/ui/app-sidebar.tsx b/apps/mail/components/ui/app-sidebar.tsx index 9da9cf000..5237a403a 100644 --- a/apps/mail/components/ui/app-sidebar.tsx +++ b/apps/mail/components/ui/app-sidebar.tsx @@ -20,8 +20,8 @@ import { useSession } from '@/lib/auth-client'; import { useAIFullScreen } from './ai-sidebar'; import { useStats } from '@/hooks/use-stats'; import { useLocation } from 'react-router'; +import { cn, FOLDERS } from '@/lib/utils'; import { m } from '@/paraglide/messages'; -import { FOLDERS } from '@/lib/utils'; import { Video } from 'lucide-react'; import { NavUser } from './nav-user'; import { NavMain } from './nav-main'; @@ -39,11 +39,8 @@ export function AppSidebar({ ...props }: React.ComponentProps) { return true; }); const [, setPricingDialog] = useQueryState('pricingDialog'); - const { isFullScreen } = useAIFullScreen(); - const { data: stats } = useStats(); - const location = useLocation(); const { data: session } = useSession(); const { currentSection, navItems } = useMemo(() => { @@ -107,15 +104,17 @@ export function AppSidebar({ ...props }: React.ComponentProps) { {showComposeButton && (
-
+
- + {isPro ? ( + + ) : null}
)} diff --git a/apps/mail/hooks/use-billing.ts b/apps/mail/hooks/use-billing.ts index beeda4ef3..b99ddd2c9 100644 --- a/apps/mail/hooks/use-billing.ts +++ b/apps/mail/hooks/use-billing.ts @@ -1,5 +1,6 @@ import { useAutumn, useCustomer } from 'autumn-js/react'; import { signOut } from '@/lib/auth-client'; +import { isProCustomer } from '@/lib/utils'; import { useEffect, useMemo } from 'react'; type FeatureState = { @@ -58,8 +59,6 @@ const FEATURE_IDS = { BRAIN: 'brain-activity', } as const; -const PRO_PLANS = ['pro-example', 'pro_annual', 'team', 'enterprise'] as const; - export const useBilling = () => { const { customer, refetch, isLoading, error } = useCustomer(); const { attach, track, openBillingPortal } = useAutumn(); @@ -69,12 +68,7 @@ export const useBilling = () => { }, [error]); const { isPro, ...customerFeatures } = useMemo(() => { - const isPro = - customer?.products && Array.isArray(customer.products) - ? customer.products.some((product) => - PRO_PLANS.some((plan) => product.id?.includes(plan) || product.name?.includes(plan)), - ) - : false; + const isPro = customer ? isProCustomer(customer) : false; if (!customer?.features) return { isPro, ...DEFAULT_FEATURES }; diff --git a/apps/mail/lib/utils.ts b/apps/mail/lib/utils.ts index 9185497eb..89533e499 100644 --- a/apps/mail/lib/utils.ts +++ b/apps/mail/lib/utils.ts @@ -3,6 +3,7 @@ import { getBrowserTimezone } from './timezones'; import { formatInTimeZone } from 'date-fns-tz'; import { MAX_URL_LENGTH } from './constants'; import { clsx, type ClassValue } from 'clsx'; +import type { Customer } from 'autumn-js'; import { twMerge } from 'tailwind-merge'; import type { Sender } from '@/types'; import LZString from 'lz-string'; @@ -617,3 +618,13 @@ export const withExponentialBackoff = async ( } } }; + +const PRO_PLANS = ['pro-example', 'pro_annual', 'team', 'enterprise'] as const; + +export const isProCustomer = (customer: Customer) => { + return customer?.products && Array.isArray(customer.products) + ? customer.products.some((product) => + PRO_PLANS.some((plan) => product.id?.includes(plan) || product.name?.includes(plan)), + ) + : false; +}; diff --git a/apps/server/src/ctx.ts b/apps/server/src/ctx.ts index d1144f18e..681f3b8f6 100644 --- a/apps/server/src/ctx.ts +++ b/apps/server/src/ctx.ts @@ -1,13 +1,13 @@ -import type { Env } from './env'; import type { Autumn } from 'autumn-js'; import type { Auth } from './lib/auth'; +import type { ZeroEnv } from './env'; export type SessionUser = NonNullable>>['user']; export type HonoVariables = { auth: Auth; sessionUser?: SessionUser; - autumn: Autumn; + autumn?: Autumn; }; -export type HonoContext = { Variables: HonoVariables; Bindings: Env }; +export type HonoContext = { Variables: HonoVariables; Bindings: ZeroEnv }; diff --git a/apps/server/src/lib/auth.ts b/apps/server/src/lib/auth.ts index 4a560b3bd..9ef2de89a 100644 --- a/apps/server/src/lib/auth.ts +++ b/apps/server/src/lib/auth.ts @@ -13,18 +13,17 @@ import { getBrowserTimezone, isValidTimezone } from './timezones'; import { drizzleAdapter } from 'better-auth/adapters/drizzle'; import { getSocialProviders } from './auth-providers'; import { redis, resend, twilio } from './services'; -import { getContext } from 'hono/context-storage'; import { dubAnalytics } from '@dub/better-auth'; import { defaultUserSettings } from './schemas'; import { disableBrainFunction } from './brain'; import { APIError } from 'better-auth/api'; import { getZeroDB } from './server-utils'; import { type EProviders } from '../types'; -import type { HonoContext } from '../ctx'; -import { env } from '../env'; import { createDriver } from './driver'; +import { Autumn } from 'autumn-js'; import { createDb } from '../db'; import { Effect } from 'effect'; +import { env } from '../env'; import { Dub } from 'dub'; const scheduleCampaign = (userInfo: { address: string; name: string }) => @@ -191,9 +190,9 @@ export const createAuth = () => { if (!request) throw new APIError('BAD_REQUEST', { message: 'Request object is missing' }); const db = await getZeroDB(user.id); const connections = await db.findManyConnections(); - const context = getContext(); + const autumn = new Autumn({ secretKey: env.AUTUMN_SECRET_KEY }); try { - await context.var.autumn.customers.delete(user.id); + await autumn.customers.delete(user.id); } catch (error) { console.error('Failed to delete Autumn customer:', error); // Continue with deletion process despite Autumn failure diff --git a/apps/server/src/lib/server-utils.ts b/apps/server/src/lib/server-utils.ts index 4ce8e4e53..1dc3a85cb 100644 --- a/apps/server/src/lib/server-utils.ts +++ b/apps/server/src/lib/server-utils.ts @@ -364,11 +364,7 @@ export const forceReSync = async (connectionId: string) => { await agent.stub.forceReSync(); }; -type GetThreadsAccumulator = { - threads: any[]; - nextPageToken: string | null; - maxResults: number; -}; + export const getThreadsFromDB = async ( connectionId: string, @@ -380,80 +376,40 @@ export const getThreadsFromDB = async ( pageToken?: string; }, ): Promise => { - console.log(`[getThreadsFromDB] Called with connectionId: ${connectionId}, params:`, params); - await sendDoState(connectionId); + // Fire and forget - don't block the thread query on state updates + void sendDoState(connectionId); const maxResults = params.maxResults ?? 20; return Effect.runPromise( - aggregateShardDataSequentialEffect( + aggregateShardDataEffect( connectionId, - (shard, shardId, accumulator) => - Effect.gen(function* () { - if (accumulator.threads.length >= accumulator.maxResults) { - console.log( - `[getThreadsFromDB] Reached maxResults (${accumulator.maxResults}), breaking loop`, - ); - return { shouldContinue: false, accumulator }; - } + (shard) => + Effect.promise(() => + shard.stub.getThreadsFromDB({ + ...params, + maxResults: maxResults * 2, // Request more from each shard to ensure we have enough + }), + ), + (shardResults) => { + // Combine all threads from all shards + const allThreads = shardResults.flatMap((result) => result.threads); + + // Sort by some criteria if needed (assuming threads have a sortable field) + // allThreads.sort((a, b) => new Date(b.updated_at).getTime() - new Date(a.updated_at).getTime()); + + // Take only the requested amount + const threads = allThreads.slice(0, maxResults); + + // Determine if there's a next page token (simplified logic) + const hasMoreResults = allThreads.length > maxResults; + const nextPageToken = hasMoreResults + ? shardResults.find(r => r.nextPageToken)?.nextPageToken || null + : null; - const remainingResults = accumulator.maxResults - accumulator.threads.length; - console.log( - `[getThreadsFromDB] Querying shard ${shardId} for up to ${remainingResults} threads`, - ); - - const shardResult = (yield* Effect.promise(() => - shard.stub.getThreadsFromDB({ - ...params, - maxResults: remainingResults, - }), - )) as IGetThreadsResponse; - - console.log( - `[getThreadsFromDB] Shard ${shardId} returned ${shardResult.threads.length} threads, nextPageToken: ${shardResult.nextPageToken}`, - ); - - const newThreads = [...accumulator.threads, ...shardResult.threads]; - let newNextPageToken = accumulator.nextPageToken; - - if (shardResult.nextPageToken) { - newNextPageToken = shardResult.nextPageToken; - console.log( - `[getThreadsFromDB] Setting nextPageToken from shard ${shardId}: ${newNextPageToken}`, - ); - } - - const shouldContinue = - newThreads.length < accumulator.maxResults && - shardResult.threads.length >= remainingResults; - - if (!shouldContinue) { - console.log( - `[getThreadsFromDB] Stopping after shard ${shardId} (threads.length: ${newThreads.length}, shardResult.threads.length: ${shardResult.threads.length}, remainingResults: ${remainingResults})`, - ); - } - - return { - shouldContinue, - accumulator: { - threads: newThreads, - nextPageToken: newNextPageToken, - maxResults: accumulator.maxResults, - }, - }; - }), - { threads: [], nextPageToken: null, maxResults }, - (accumulator) => { - const slicedThreads = accumulator.threads.slice( - 0, - maxResults === Infinity ? accumulator.threads.length : maxResults, - ); - console.log( - `[getThreadsFromDB] Returning ${slicedThreads.length} threads, nextPageToken: ${accumulator.nextPageToken}`, - ); return { - threads: slicedThreads, - nextPageToken: accumulator.nextPageToken, + threads, + nextPageToken, }; }, ), diff --git a/apps/server/src/lib/utils.ts b/apps/server/src/lib/utils.ts index c5bdb1625..249d5ac3e 100644 --- a/apps/server/src/lib/utils.ts +++ b/apps/server/src/lib/utils.ts @@ -1,4 +1,5 @@ import type { AppContext, EProviders, Sender } from '../types'; +import type { Customer } from 'autumn-js'; import { env } from '../env'; export const parseHeaders = (token: string) => { @@ -365,3 +366,13 @@ export const cleanSearchValue = (q: string): string => { .replace(/\s+/g, ' ') .trim(); }; + +const PRO_PLANS = ['pro-example', 'pro_annual', 'team', 'enterprise'] as const; + +export const isProCustomer = (customer: Customer) => { + return customer?.products && Array.isArray(customer.products) + ? customer.products.some((product) => + PRO_PLANS.some((plan) => product.id?.includes(plan) || product.name?.includes(plan)), + ) + : false; +}; diff --git a/apps/server/src/main.ts b/apps/server/src/main.ts index 07214f3f1..ad34b89f0 100644 --- a/apps/server/src/main.ts +++ b/apps/server/src/main.ts @@ -45,7 +45,6 @@ import type { HonoContext } from './ctx'; import { createDb, type DB } from './db'; import { createAuth } from './lib/auth'; import { aiRouter } from './routes/ai'; -import { Autumn } from 'autumn-js'; import { appRouter } from './trpc'; import { cors } from 'hono/cors'; import { Hono } from 'hono'; @@ -587,13 +586,9 @@ const api = new Hono() } } - const autumn = new Autumn({ secretKey: env.AUTUMN_SECRET_KEY }); - c.set('autumn', autumn); - await next(); c.set('sessionUser', undefined); - c.set('autumn', undefined as any); c.set('auth', undefined as any); }) .route('/ai', aiRouter) diff --git a/apps/server/src/routes/autumn.ts b/apps/server/src/routes/autumn.ts index d2bb1113a..f129085b2 100644 --- a/apps/server/src/routes/autumn.ts +++ b/apps/server/src/routes/autumn.ts @@ -1,4 +1,4 @@ -import { fetchPricingTable } from 'autumn-js'; +import { Autumn, fetchPricingTable } from 'autumn-js'; import type { HonoContext } from '../ctx'; import { env } from '../env'; import { Hono } from 'hono'; @@ -38,6 +38,7 @@ export const autumnApi = new Hono() }, }, ); + c.set('autumn', new Autumn({ secretKey: env.AUTUMN_SECRET_KEY })); await next(); }) .post('/customers', async (c) => { @@ -46,7 +47,7 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn.customers + await autumn!.customers .create({ id: customerData.customerId, ...customerData.customerData, @@ -62,7 +63,7 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn + await autumn! .attach({ ...sanitizedBody, customer_id: customerData.customerId, @@ -78,7 +79,7 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn + await autumn! .cancel({ ...sanitizedBody, customer_id: customerData.customerId, @@ -93,7 +94,7 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn + await autumn! .check({ ...sanitizedBody, customer_id: customerData.customerId, @@ -109,7 +110,7 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn + await autumn! .track({ ...sanitizedBody, customer_id: customerData.customerId, @@ -124,7 +125,9 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn.customers.billingPortal(customerData.customerId, body).then((data) => data.data), + await autumn!.customers + .billingPortal(customerData.customerId, body) + .then((data) => data.data), ); }) .post('/openBillingPortal', async (c) => { @@ -133,7 +136,7 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn.customers + await autumn!.customers .billingPortal(customerData.customerId, { ...body, return_url: `${env.VITE_PUBLIC_APP_URL}`, @@ -147,7 +150,7 @@ export const autumnApi = new Hono() if (!customerData) return c.json({ error: 'No customer ID found' }, 401); return c.json( - await autumn.entities.create(customerData.customerId, body).then((data) => data.data), + await autumn!.entities.create(customerData.customerId, body).then((data) => data.data), ); }) .get('/entities/:entityId', async (c) => { @@ -168,7 +171,7 @@ export const autumnApi = new Hono() } return c.json( - await autumn.entities + await autumn!.entities .get(customerData.customerId, entityId, { expand }) .then((data) => data.data), ); @@ -190,7 +193,7 @@ export const autumnApi = new Hono() } return c.json( - await autumn.entities.delete(customerData.customerId, entityId).then((data) => data.data), + await autumn!.entities.delete(customerData.customerId, entityId).then((data) => data.data), ); }) .get('/components/pricing_table', async (c) => { @@ -198,7 +201,7 @@ export const autumnApi = new Hono() return c.json( await fetchPricingTable({ - instance: autumn, + instance: autumn!, params: { customer_id: customerData?.customerId || undefined, }, diff --git a/apps/server/src/trpc/index.ts b/apps/server/src/trpc/index.ts index 3a18133ee..5a5a433b8 100644 --- a/apps/server/src/trpc/index.ts +++ b/apps/server/src/trpc/index.ts @@ -47,6 +47,5 @@ export const serverTrpc = () => { c, sessionUser: c.var.sessionUser, auth: c.var.auth, - autumn: c.var.autumn, }); }; diff --git a/apps/server/src/trpc/routes/meet.ts b/apps/server/src/trpc/routes/meet.ts index ff4294b7a..944891a7a 100644 --- a/apps/server/src/trpc/routes/meet.ts +++ b/apps/server/src/trpc/routes/meet.ts @@ -1,5 +1,8 @@ -import { activeDriverProcedure, router } from '../trpc'; +import { activeDriverProcedure, createRateLimiterMiddleware, router } from '../trpc'; +import { isProCustomer } from '../../lib/utils'; +import { Ratelimit } from '@upstash/ratelimit'; import { TRPCError } from '@trpc/server'; +import { Autumn } from 'autumn-js'; import { env } from '../../env'; type MeetResponse = { @@ -18,22 +21,42 @@ type MeetResponse = { }; export const meetRouter = router({ - create: activeDriverProcedure.mutation(async () => { - const AuthHeader = env.MEET_AUTH_HEADER; - const response = await fetch(env.MEET_API_URL + '/meetings', { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - Authorization: AuthHeader, - }, - }); + create: activeDriverProcedure + .use( + createRateLimiterMiddleware({ + limiter: Ratelimit.slidingWindow(10, '1m'), + generatePrefix: ({ sessionUser }) => `ratelimit:meet-create-${sessionUser?.id}`, + }), + ) + .mutation(async ({ ctx }) => { + const autumn = new Autumn({ secretKey: env.AUTUMN_SECRET_KEY }); + const customer = await autumn.customers.get(ctx.sessionUser?.id); + if (!customer.data) { + throw new TRPCError({ code: 'UNAUTHORIZED', message: 'Customer not found' }); + } - if (!response.ok) { - console.error(await response.text()); - throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: 'Failed to create meeting' }); - } + if (!isProCustomer(customer.data)) { + throw new TRPCError({ + code: 'UNAUTHORIZED', + message: 'Customer is not a pro customer, please upgrade to a pro plan', + }); + } - const data = await response.json(); - return data; - }), + const AuthHeader = env.MEET_AUTH_HEADER; + const response = await fetch(env.MEET_API_URL + '/meetings', { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + Authorization: AuthHeader, + }, + }); + + if (!response.ok) { + console.error(await response.text()); + throw new TRPCError({ code: 'INTERNAL_SERVER_ERROR', message: 'Failed to create meeting' }); + } + + const data = await response.json(); + return data; + }), });