/*
 * Files
 * linkwarden/apps/worker/lib/getLinkBatchFairly.ts
 *
 * 180 lines
 * 4.8 KiB
 * TypeScript
 */

import { prisma } from "@linkwarden/prisma";
import { Prisma } from "@linkwarden/prisma/client";
/** Options accepted by `getLinkBatchFairly`. */
type PickLinksOptions = {
/** Upper bound on the number of links in the returned batch; a value <= 0 yields an empty batch. */
maxBatchLinks: number;
/**
 * Which kind of work to pick links for:
 * - "links": links that have a URL and no `lastPreserved` value yet.
 * - "tags": links of type "url" that have a `lastPreserved` value, are not
 *   yet `aiTagged`, and whose collection owner has not set
 *   `aiTaggingMethod` to "DISABLED".
 */
mode: "links" | "tags";
};
// Trial length in days, taken from NEXT_PUBLIC_TRIAL_PERIOD_DAYS and falling
// back to 14 when the variable is unset or empty. The value is either the raw
// env string or the number 14, so it is converted with Number() where it is used.
const TRIAL_PERIOD_DAYS = process.env.NEXT_PUBLIC_TRIAL_PERIOD_DAYS || 14;
// True only when NEXT_PUBLIC_REQUIRE_CC is exactly the string "true". When
// true, getLinkBatchFairly omits the "account created within the trial
// period" alternative from its user filter.
const REQUIRE_CC = process.env.NEXT_PUBLIC_REQUIRE_CC === "true";
/**
 * Picks the next batch of links to work on, spreading the batch across users
 * round-robin so that a single user with many pending links cannot fill it.
 *
 * How the batch is built:
 *  1. Up to `maxBatchLinks` users that own at least one matching link are
 *     loaded, least recently picked first (`lastPickedAt` ascending, nulls
 *     first, then `id`). When `STRIPE_SECRET_KEY` is set, a user must have an
 *     active subscription, an active parent subscription, or (unless
 *     `REQUIRE_CC` is true) an account created within the trial period. When
 *     `NEXT_PUBLIC_EMAIL_PROVIDER` is "true", the user's email must be verified.
 *  2. Users are visited in rounds; each visit takes up to
 *     `max(1, floor(maxBatchLinks / users.length))` further links from that
 *     user, until the batch is full or nobody has links left.
 *  3. The picked links are loaded with their collection (including its owner)
 *     and tags, and `lastPickedAt` is set to the current time on every user
 *     that contributed at least one link.
 *
 * @param maxBatchLinks Maximum number of links to return; `<= 0` returns `[]`.
 * @param mode "links" picks links with a URL and no `lastPreserved`; "tags"
 *   picks preserved links of type "url" that are not yet `aiTagged` and whose
 *   collection owner has not disabled AI tagging.
 * @returns The picked links in pick order, or `[]` when nothing is eligible.
 */
export default async function getLinkBatchFairly({
  maxBatchLinks,
  mode,
}: PickLinksOptions) {
  if (maxBatchLinks <= 0) return [];

  const baseLinkWhere: Prisma.LinkWhereInput =
    mode === "tags"
      ? {
          url: { not: null },
          type: "url",
          lastPreserved: { not: null },
          aiTagged: false,
          collection: {
            is: {
              owner: {
                is: {
                  aiTaggingMethod: { not: "DISABLED" },
                },
              },
            },
          },
        }
      : {
          url: { not: null },
          lastPreserved: null,
        };

  // Both orderings end with `id` as a tie-breaker: the round-robin below pages
  // with skip/take, which is only stable when the ordering is total.
  const userLinksOrderBy: Prisma.LinkOrderByWithRelationInput[] =
    mode === "tags"
      ? [{ lastPreserved: "desc" }, { id: "desc" }]
      : [{ createdAt: "desc" }, { id: "desc" }];

  const MS_PER_DAY = 86400000;
  const trialCutoff = new Date(
    Date.now() - Number(TRIAL_PERIOD_DAYS) * MS_PER_DAY
  );

  const users = await prisma.user.findMany({
    where: {
      createdLinks: {
        some: {
          ...baseLinkWhere,
        },
      },
      ...(process.env.STRIPE_SECRET_KEY
        ? {
            OR: [
              { subscriptions: { is: { active: true } } },
              { parentSubscription: { is: { active: true } } },
              // Without a required credit card, accounts still inside the
              // trial window are eligible too.
              ...(REQUIRE_CC ? [] : [{ createdAt: { gte: trialCutoff } }]),
            ],
          }
        : {}),
      ...(process.env.NEXT_PUBLIC_EMAIL_PROVIDER === "true"
        ? {
            emailVerified: { not: null },
          }
        : {}),
    },
    orderBy: [{ lastPickedAt: { sort: "asc", nulls: "first" } }, { id: "asc" }],
    select: { id: true, lastPickedAt: true },
    take: maxBatchLinks,
  });

  if (users.length === 0) return [];

  // Take `linksPerUser` links from each user per round, repeating rounds until
  // the batch holds `maxBatchLinks` links or every user has run out.
  const linksPerUser = Math.max(1, Math.floor(maxBatchLinks / users.length));
  const nextOffset = new Map<number, number>();
  const exhaustedUserIds = new Set<number>();
  const contributingUserIds = new Set<number>();
  const picked = new Set<number>();

  while (picked.size < maxBatchLinks) {
    let addedThisRound = 0;

    for (const { id: userId } of users) {
      const remaining = maxBatchLinks - picked.size;
      if (remaining <= 0) break;

      // Users that already returned a short page have nothing more to give.
      if (exhaustedUserIds.has(userId)) continue;

      const toTake = Math.min(linksPerUser, remaining);
      const skip = nextOffset.get(userId) ?? 0;

      const userLinks = await prisma.link.findMany({
        where: { ...baseLinkWhere, createdBy: { id: userId } },
        orderBy: userLinksOrderBy,
        skip,
        take: toTake,
        select: { id: true },
      });

      // Fewer rows than requested means this user ran out of links.
      if (userLinks.length < toTake) exhaustedUserIds.add(userId);
      if (userLinks.length === 0) continue;

      nextOffset.set(userId, skip + userLinks.length);

      for (const { id } of userLinks) {
        if (!picked.has(id)) {
          picked.add(id);
          contributingUserIds.add(userId);
          addedThisRound++;
        }
      }
    }

    // Nobody contributed anything — avoid an infinite loop.
    if (addedThisRound === 0) break;
  }

  if (picked.size === 0) return [];

  const pickedIds = Array.from(picked);
  const pickedUserIds = Array.from(contributingUserIds);

  const batch = await prisma.link.findMany({
    where: { id: { in: pickedIds } },
    include: {
      collection: { include: { owner: true } },
      tags: true,
    },
  });

  // Stamp only the users that actually contributed to this batch.
  await prisma.user.updateMany({
    where: { id: { in: pickedUserIds } },
    data: { lastPickedAt: new Date() },
  });

  // `findMany` with `in` does not promise an order, so restore pick order.
  const order = new Map<number, number>();
  pickedIds.forEach((id, i) => order.set(id, i));
  batch.sort((a, b) => (order.get(a.id) ?? 0) - (order.get(b.id) ?? 0));

  console.log(
    "\x1b[34m%s\x1b[0m",
    `${mode === "tags" ? "Auto-tagging" : "Processing"} ${batch.length} ${
      batch.length > 1 ? "links" : "link"
    } for the following ${
      pickedUserIds.length > 1 ? "userIds" : "userId"
    }: ${pickedUserIds.join(", ")}`
  );

  return batch;
}