Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/shared/utils/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export const env = createEnv({
QUEUE_FAIL_HISTORY_COUNT: z.coerce.number().default(10_000),
// Sets the number of recent nonces to map to queue IDs.
NONCE_MAP_COUNT: z.coerce.number().default(10_000),
// Sets the estimated number of blocks to query per contract subscription job. Defaults to 1 block (real-time).
CONTRACT_SUBSCRIPTION_BLOCK_RANGE: z.coerce.number().default(1),

ENABLE_KEYPAIR_AUTH: boolEnvSchema(false),
ENABLE_CUSTOM_HMAC_AUTH: boolEnvSchema(false),
Expand Down Expand Up @@ -136,6 +138,8 @@ export const env = createEnv({
QUEUE_COMPLETE_HISTORY_COUNT: process.env.QUEUE_COMPLETE_HISTORY_COUNT,
QUEUE_FAIL_HISTORY_COUNT: process.env.QUEUE_FAIL_HISTORY_COUNT,
NONCE_MAP_COUNT: process.env.NONCE_MAP_COUNT,
CONTRACT_SUBSCRIPTION_BLOCK_RANGE:
process.env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS:
process.env.EXPERIMENTAL__MINE_WORKER_TIMEOUT_SECONDS,
EXPERIMENTAL__MAX_GAS_PRICE_WEI:
Expand Down
24 changes: 19 additions & 5 deletions src/worker/indexers/chain-indexer-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import cron from "node-cron";
import { getBlockTimeSeconds } from "../../shared/utils/indexer/get-block-time";
import { logger } from "../../shared/utils/logger";
import { handleContractSubscriptions } from "../tasks/chain-indexer";
import { env } from "../../shared/utils/env";

// @TODO: Move all worker logic to Bullmq to better handle multiple hosts.
export const INDEXER_REGISTRY = {} as Record<number, cron.ScheduledTask>;
Expand All @@ -24,9 +25,10 @@ export const addChainIndexer = async (chainId: number) => {
});
blockTimeSeconds = 2;
}
const cronSchedule = createScheduleSeconds(
Math.max(Math.round(blockTimeSeconds), 1),
);
const cronSchedule = createScheduleSeconds({
blockTimeSeconds,
numBlocks: env.CONTRACT_SUBSCRIPTION_BLOCK_RANGE,
});
logger({
service: "worker",
level: "info",
Expand Down Expand Up @@ -74,5 +76,17 @@ export const removeChainIndexer = async (chainId: number) => {
delete INDEXER_REGISTRY[chainId];
};

export const createScheduleSeconds = (seconds: number) =>
`*/${seconds} * * * * *`;
/**
* Returns the cron schedule given the chain's block time and the number of blocks to batch per job.
* Minimum is every 2 seconds.
*/
function createScheduleSeconds({
blockTimeSeconds,
numBlocks,
}: { blockTimeSeconds: number; numBlocks: number }) {
const pollFrequencySeconds = Math.max(
Math.round(blockTimeSeconds * numBlocks),
2,
);
return `*/${pollFrequencySeconds} * * * * *`;
}