Skip to content

Commit 248dc3a

Browse files
authored
Separate importScheduledCommisisons (dubinc#3688)
1 parent 343ba40 commit 248dc3a

9 files changed

Lines changed: 146 additions & 90 deletions

File tree

apps/web/app/(ee)/api/cron/import/partnerstack/route.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { importCustomers } from "@/lib/partnerstack/import-customers";
55
import { importGroups } from "@/lib/partnerstack/import-groups";
66
import { importLinks } from "@/lib/partnerstack/import-links";
77
import { importPartners } from "@/lib/partnerstack/import-partners";
8+
import { importScheduledCommissions } from "@/lib/partnerstack/import-scheduled-commissions";
89
import { partnerStackImportPayloadSchema } from "@/lib/partnerstack/schemas";
910
import { updateStripeCustomers } from "@/lib/partnerstack/update-stripe-customers";
1011
import { NextResponse } from "next/server";
@@ -38,6 +39,9 @@ export async function POST(req: Request) {
3839
case "import-commissions":
3940
await importCommissions(payload);
4041
break;
42+
case "import-scheduled-commissions":
43+
await importScheduledCommissions(payload);
44+
break;
4145
case "update-stripe-customers":
4246
await updateStripeCustomers(payload);
4347
break;

apps/web/lib/firstpromoter/api.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as z from "zod/v4";
2-
import { PAGE_LIMIT } from "./importer";
2+
import { FIRSTPROMOTER_PAGE_LIMIT } from "./importer";
33
import {
44
firstPromoterCampaignSchema,
55
firstPromoterCommissionSchema,
@@ -46,7 +46,7 @@ export class FirstPromoterApi {
4646

4747
async listCampaigns({ page }: { page?: number }) {
4848
const searchParams = new URLSearchParams({
49-
per_page: PAGE_LIMIT.toString(),
49+
per_page: FIRSTPROMOTER_PAGE_LIMIT.toString(),
5050
...(page ? { page: page.toString() } : {}),
5151
});
5252

@@ -61,7 +61,7 @@ export class FirstPromoterApi {
6161
const searchParams = new URLSearchParams();
6262
searchParams.set("filters[state]", "accepted");
6363
searchParams.set("filters[referrals_count][from]", "1");
64-
searchParams.set("per_page", PAGE_LIMIT.toString());
64+
searchParams.set("per_page", FIRSTPROMOTER_PAGE_LIMIT.toString());
6565
if (page) searchParams.set("page", page.toString());
6666

6767
const response = await this.fetch(`/promoters?${searchParams.toString()}`);
@@ -77,7 +77,7 @@ export class FirstPromoterApi {
7777

7878
async listCustomers({ page }: { page?: number }) {
7979
const searchParams = new URLSearchParams({
80-
per_page: PAGE_LIMIT.toString(),
80+
per_page: FIRSTPROMOTER_PAGE_LIMIT.toString(),
8181
...(page ? { page: page.toString() } : {}),
8282
});
8383

@@ -95,7 +95,7 @@ export class FirstPromoterApi {
9595

9696
async listCommissions({ page }: { page?: number }) {
9797
const searchParams = new URLSearchParams({
98-
per_page: PAGE_LIMIT.toString(),
98+
per_page: FIRSTPROMOTER_PAGE_LIMIT.toString(),
9999
...(page ? { page: page.toString() } : {}),
100100
});
101101

apps/web/lib/firstpromoter/importer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { redis } from "@/lib/upstash";
33
import { APP_DOMAIN_WITH_NGROK } from "@dub/utils";
44
import { FirstPromoterCredentials, FirstPromoterImportPayload } from "./types";
55

6-
export const PAGE_LIMIT = 100;
6+
export const FIRSTPROMOTER_PAGE_LIMIT = 100;
77
export const MAX_BATCHES = 10;
88
export const CACHE_EXPIRY = 60 * 60 * 24;
99
export const CACHE_KEY_PREFIX = "firstpromoter:import";

apps/web/lib/partnerstack/api.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
PartnerStackPartner,
1515
} from "./types";
1616

17-
const PAGE_LIMIT = 100;
17+
export const PARTNERSTACK_PAGE_LIMIT = 100;
1818

1919
export class PartnerStackApi {
2020
private readonly baseUrl = "https://api.partnerstack.com/api/v2";
@@ -77,7 +77,7 @@ export class PartnerStackApi {
7777
async listPartners({ startingAfter }: { startingAfter?: string }) {
7878
const searchParams = new URLSearchParams();
7979
searchParams.append("approved_status", "approved");
80-
searchParams.append("limit", PAGE_LIMIT.toString());
80+
searchParams.append("limit", PARTNERSTACK_PAGE_LIMIT.toString());
8181

8282
if (startingAfter) {
8383
searchParams.append("starting_after", startingAfter);
@@ -104,7 +104,7 @@ export class PartnerStackApi {
104104

105105
async listCustomers({ startingAfter }: { startingAfter?: string }) {
106106
const searchParams = new URLSearchParams();
107-
searchParams.append("limit", PAGE_LIMIT.toString());
107+
searchParams.append("limit", PARTNERSTACK_PAGE_LIMIT.toString());
108108

109109
if (startingAfter) {
110110
searchParams.append("starting_after", startingAfter);
@@ -127,7 +127,7 @@ export class PartnerStackApi {
127127
status?: PartnerStackCommission["reward_status"];
128128
}) {
129129
const searchParams = new URLSearchParams();
130-
searchParams.append("limit", PAGE_LIMIT.toString());
130+
searchParams.append("limit", PARTNERSTACK_PAGE_LIMIT.toString());
131131

132132
if (startingAfter) {
133133
searchParams.append("starting_after", startingAfter);

apps/web/lib/partnerstack/import-commissions.ts

Lines changed: 4 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ export async function importCommissions(payload: PartnerStackImportPayload) {
103103

104104
await Promise.allSettled(
105105
commissions.map((commission) =>
106-
createCommission({
106+
createCommissionFromPS({
107107
program,
108108
commission,
109109
fxRates,
@@ -114,86 +114,20 @@ export async function importCommissions(payload: PartnerStackImportPayload) {
114114
),
115115
);
116116

117-
await new Promise((resolve) => setTimeout(resolve, 1000));
118-
119117
currentStartingAfter = commissions[commissions.length - 1].key;
120118
processedBatches++;
121119
}
122120

123-
// Import scheduled commissions
124-
if (!hasMore) {
125-
const commissions = await partnerStackApi.listCommissions({
126-
status: "scheduled",
127-
});
128-
129-
if (commissions.length > 0) {
130-
const customersData = await prisma.customer.findMany({
131-
where: {
132-
projectId: program.workspaceId,
133-
OR: [
134-
{
135-
email: {
136-
in: commissions
137-
.map((commission) => commission.customer?.email)
138-
.filter(
139-
(email): email is string =>
140-
email !== null && email !== undefined,
141-
),
142-
},
143-
},
144-
{
145-
externalId: {
146-
in: commissions
147-
.map((commission) => commission.customer?.external_key)
148-
.filter(
149-
(externalKey): externalKey is string =>
150-
externalKey !== null && externalKey !== undefined,
151-
),
152-
},
153-
},
154-
],
155-
},
156-
include: {
157-
link: true,
158-
},
159-
orderBy: {
160-
createdAt: "asc",
161-
},
162-
});
163-
164-
const customerLeadEvents = await getLeadEvents({
165-
customerIds: customersData.map((customer) => customer.id),
166-
}).then((res) => res.data);
167-
168-
await Promise.allSettled(
169-
commissions.map((commission) =>
170-
createCommission({
171-
program,
172-
commission,
173-
fxRates,
174-
importId,
175-
customersData,
176-
customerLeadEvents,
177-
}),
178-
),
179-
);
180-
181-
await new Promise((resolve) => setTimeout(resolve, 1000));
182-
}
183-
}
184-
185-
if (!hasMore) {
186-
await partnerStackImporter.deleteCredentials(program.workspaceId);
187-
}
121+
await new Promise((resolve) => setTimeout(resolve, 1000));
188122

189123
await partnerStackImporter.queue({
190124
...payload,
191125
startingAfter: hasMore ? currentStartingAfter : undefined,
192-
action: hasMore ? "import-commissions" : "update-stripe-customers",
126+
action: hasMore ? "import-commissions" : "import-scheduled-commissions",
193127
});
194128
}
195129

196-
async function createCommission({
130+
export async function createCommissionFromPS({
197131
program,
198132
commission,
199133
fxRates,
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
import { prisma } from "@dub/prisma";
2+
import { getLeadEvents } from "../tinybird/get-lead-events";
3+
import { redis } from "../upstash";
4+
import { PartnerStackApi } from "./api";
5+
import { createCommissionFromPS } from "./import-commissions";
6+
import { MAX_BATCHES, partnerStackImporter } from "./importer";
7+
import { PartnerStackImportPayload } from "./types";
8+
9+
export async function importScheduledCommissions(
10+
payload: PartnerStackImportPayload,
11+
) {
12+
const { importId, programId, startingAfter } = payload;
13+
14+
const program = await prisma.program.findUniqueOrThrow({
15+
where: {
16+
id: programId,
17+
},
18+
});
19+
20+
const { publicKey, secretKey } = await partnerStackImporter.getCredentials(
21+
program.workspaceId,
22+
);
23+
24+
const partnerStackApi = new PartnerStackApi({
25+
publicKey,
26+
secretKey,
27+
});
28+
29+
const fxRates = await redis.hgetall<Record<string, string>>("fxRates:usd");
30+
31+
let hasMore = true;
32+
let processedBatches = 0;
33+
let currentStartingAfter = startingAfter;
34+
35+
while (hasMore && processedBatches < MAX_BATCHES) {
36+
const scheduledCommissions = await partnerStackApi.listCommissions({
37+
status: "scheduled",
38+
startingAfter: currentStartingAfter,
39+
});
40+
41+
if (scheduledCommissions.length === 0) {
42+
hasMore = false;
43+
break;
44+
}
45+
46+
const customersData = await prisma.customer.findMany({
47+
where: {
48+
projectId: program.workspaceId,
49+
OR: [
50+
{
51+
email: {
52+
in: scheduledCommissions
53+
.map((commission) => commission.customer?.email)
54+
.filter(
55+
(email): email is string =>
56+
email !== null && email !== undefined,
57+
),
58+
},
59+
},
60+
{
61+
externalId: {
62+
in: scheduledCommissions
63+
.map((commission) => commission.customer?.external_key)
64+
.filter(
65+
(externalKey): externalKey is string =>
66+
externalKey !== null && externalKey !== undefined,
67+
),
68+
},
69+
},
70+
],
71+
},
72+
include: {
73+
link: true,
74+
},
75+
orderBy: {
76+
createdAt: "asc",
77+
},
78+
});
79+
80+
const customerLeadEvents = await getLeadEvents({
81+
customerIds: customersData.map((customer) => customer.id),
82+
}).then((res) => res.data);
83+
84+
await Promise.allSettled(
85+
scheduledCommissions.map((commission) =>
86+
createCommissionFromPS({
87+
program,
88+
commission,
89+
fxRates,
90+
importId,
91+
customersData,
92+
customerLeadEvents,
93+
}),
94+
),
95+
);
96+
97+
currentStartingAfter =
98+
scheduledCommissions[scheduledCommissions.length - 1].key;
99+
processedBatches++;
100+
}
101+
102+
// If there are more scheduled commissions to import, sleep for 1 second and continue the loop
103+
if (hasMore) {
104+
await new Promise((resolve) => setTimeout(resolve, 1000));
105+
} else {
106+
// else, delete the credentials and exit the loop
107+
await partnerStackImporter.deleteCredentials(program.workspaceId);
108+
}
109+
110+
await partnerStackImporter.queue({
111+
...payload,
112+
startingAfter: hasMore ? currentStartingAfter : undefined,
113+
action: hasMore
114+
? "import-scheduled-commissions"
115+
: "update-stripe-customers",
116+
});
117+
}

apps/web/lib/partnerstack/schemas.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export const partnerStackImportSteps = z.enum([
66
"import-links",
77
"import-customers",
88
"import-commissions",
9+
"import-scheduled-commissions",
910
"update-stripe-customers",
1011
]);
1112

apps/web/lib/rewardful/api.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
RewardfulReferral,
88
} from "./types";
99

10-
const PAGE_LIMIT = 100;
10+
const REWARDFUL_PAGE_LIMIT = 100;
1111

1212
class RewardfulApiError extends DubApiError {
1313
constructor(message: string) {
@@ -57,7 +57,7 @@ export class RewardfulApi {
5757
searchParams.append("expand[]", "campaign");
5858
searchParams.append("expand[]", "links");
5959
searchParams.append("page", page.toString());
60-
searchParams.append("limit", PAGE_LIMIT.toString());
60+
searchParams.append("limit", REWARDFUL_PAGE_LIMIT.toString());
6161

6262
const { data } = await this.fetch<{ data: RewardfulAffiliate[] }>(
6363
`${this.baseUrl}/affiliates?${searchParams.toString()}`,
@@ -72,7 +72,7 @@ export class RewardfulApi {
7272
searchParams.append("conversion_state[]", "lead");
7373
searchParams.append("conversion_state[]", "conversion");
7474
searchParams.append("page", page.toString());
75-
searchParams.append("limit", PAGE_LIMIT.toString());
75+
searchParams.append("limit", REWARDFUL_PAGE_LIMIT.toString());
7676

7777
const { data } = await this.fetch<{ data: RewardfulReferral[] }>(
7878
`${this.baseUrl}/referrals?${searchParams.toString()}`,
@@ -86,7 +86,7 @@ export class RewardfulApi {
8686
searchParams.append("expand[]", "sale");
8787
searchParams.append("expand[]", "campaign");
8888
searchParams.append("page", page.toString());
89-
searchParams.append("limit", PAGE_LIMIT.toString());
89+
searchParams.append("limit", REWARDFUL_PAGE_LIMIT.toString());
9090

9191
const { data } = await this.fetch<{ data: RewardfulCommission[] }>(
9292
`${this.baseUrl}/commissions?${searchParams.toString()}`,
@@ -98,7 +98,7 @@ export class RewardfulApi {
9898
async listAffiliateCoupons({ page = 1 }: { page?: number }) {
9999
const searchParams = new URLSearchParams();
100100
searchParams.append("page", page.toString());
101-
searchParams.append("limit", PAGE_LIMIT.toString());
101+
searchParams.append("limit", REWARDFUL_PAGE_LIMIT.toString());
102102

103103
const { data } = await this.fetch<{ data: RewardfulCoupon[] }>(
104104
`${this.baseUrl}/affiliate_coupons?${searchParams.toString()}`,

0 commit comments

Comments
 (0)