# JobWeaver

Distributed, event-driven job execution platform built on Spring Boot 4, Apache Kafka, and PostgreSQL.
- System Overview
- Module Decomposition
- Request Lifecycle
- Data Model
- Kafka Topology
- Scheduling Engine
- Worker Execution Model
- Retry and Dead-Letter Strategy
- Concurrency and Thread Model
- Offset Management
- Idempotency Guarantees
- Exception Architecture
- Observability
- Infrastructure
## System Overview

JobWeaver is a multi-service system designed to accept, schedule, execute, and track simulation jobs. The platform follows an event-driven architecture where services communicate exclusively through Apache Kafka topics, with each service maintaining its own isolated PostgreSQL database (database-per-service pattern).
## Module Decomposition

The system is composed of four backend modules and a frontend scaffold:
| Module | Role | Port |
|---|---|---|
| `jobweaver-api` | REST gateway; accepts and persists job requests | 8080 |
| `jobweaver-scheduler` | Consumes job events, schedules dispatch, manages retries | 8081 |
| `jobweaver-worker` | Executes simulation instructions, reports outcomes | 8082 |
| `jobweaver-common` | Shared domain objects, events, and exception base classes | N/A (library) |
| `jobweaver-dashboard` | React/Vite frontend (scaffold) | -- |
### `jobweaver-common`

A pure library module (no Spring Boot application context) that defines the shared contracts consumed by all three services:

- **Event records:** `JobCreatedEvent`, `RunJobEvent`, `JobCompletedEvent`, `JobFailedEvent`, `DeadLetterEvent` -- all implemented as Java records for immutability.
- **Simulation model:** A sealed interface `SimulationStep` with Jackson polymorphic deserialization (`@JsonTypeInfo`/`@JsonSubTypes`). Subtypes: `SleepStep`, `LogStep`, `ComputeStep`, `HttpCallStep`, `FailStep`. The `SimulationInstruction` record wraps a `List<SimulationStep>`.
- **Exception base:** `BaseDomainException` (abstract `RuntimeException`) and the `DomainErrorCode` interface, providing a consistent error structure across all services.
- **Enums:** `JobType` (`SIMULATION`), `ExecutionOutcome` (`RUNNING`, `SUCCESS`, `FAILURE`).
- **Error response:** The `ErrorResponse` record standardises the API error body (`timestamp`, `status`, `errorCode`, `message`, `jobId`).
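As an illustration, a `SimulationInstruction` might arrive as JSON like the following (a hypothetical shape: the wrapper field name `steps` and the discriminator property `type` are assumptions; the per-step field names are those mentioned elsewhere in this document):

```json
{
  "steps": [
    { "type": "SLEEP", "durationMs": 500 },
    { "type": "LOG", "message": "halfway there" },
    { "type": "COMPUTE", "iterations": 100000 },
    { "type": "HTTP_CALL", "latencyMs": 250 }
  ]
}
```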
### `jobweaver-api`

The REST API gateway. Responsibilities:

- Expose CRUD endpoints under `/api/jobs`.
- Validate incoming `JobRequest` payloads.
- Persist the `Job` entity (with JSONB-serialised `SimulationInstruction`) to the API database.
- Publish a `JobCreatedEvent` to the `job-created` Kafka topic (synchronous send with `acks=all`).
- Assign and propagate a `traceId` (UUID) via MDC and Kafka headers.

Key classes: `JobController`, `JobService`, `JobEventPublisher`, `MdcFilter`, `GlobalExceptionHandler`.
### `jobweaver-scheduler`

The scheduling and retry engine. Responsibilities:

- Consume `JobCreatedEvent` messages and persist `JobExecution` records in `PENDING` state.
- Poll the database on a fixed interval (10 seconds) for jobs whose `next_run_at` has elapsed.
- Dispatch ready jobs by publishing `RunJobEvent` messages to the `run-job` topic.
- Consume `JobCompletedEvent` and `JobFailedEvent` outcomes from the worker.
- On failure: apply exponential backoff retry or route to the dead-letter topic.

Key classes: `IngestionService`, `SchedulerService`, `JobDispatchService`, `DispatchScheduler`, `RunJobPublisher`, `DeadLetterQueuePublisher`.
### `jobweaver-worker`

The job execution engine. Responsibilities:

- Consume `RunJobEvent` messages from the `run-job` topic (3 consumer threads).
- Submit simulation execution to a bounded thread pool (12 threads).
- Execute the `SimulationInstruction` step-by-step via the `SimulationExecutor`.
- Persist `ExecutionAttempt` records with idempotency guarantees (event ID as primary key).
- Publish outcome events (`JobCompletedEvent` or `JobFailedEvent`) back to the scheduler.
- Manage Kafka offsets asynchronously to allow out-of-order job completion.

Key classes: `RunJobListener`, `WorkerService`, `ExecutionAttemptProcessor`, `SimulationExecutor`, `PartitionState`, `OffsetCommitCoordinator`.
## Request Lifecycle

A job request flows through the system in the following stages:

### Submission (API)

1. The client sends `POST /api/jobs` with a `JobRequest` body containing `jobType`, `payload` (the simulation instruction), and `maxRetryCount`.
2. `MdcFilter` (highest precedence) attaches or generates a `traceId`, placing it in MDC and echoing it as a response header.
3. `JobService.submitJob()` validates the request, generates a `traceId`, persists the `Job` entity to PostgreSQL, and publishes a `JobCreatedEvent` synchronously to the `job-created` topic.
4. The client receives a `202 Accepted` response with `jobId` and `traceId`.
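A submission round-trip might look like the following (illustrative only: the response-header name `X-Trace-Id`, the response body shape, and all identifier values are assumptions):

```http
POST /api/jobs HTTP/1.1
Content-Type: application/json

{
  "jobType": "SIMULATION",
  "payload": { "steps": [ { "type": "SLEEP", "durationMs": 500 } ] },
  "maxRetryCount": 3
}
```

and the response:

```http
HTTP/1.1 202 Accepted
X-Trace-Id: 6b1f0a9c-...

{ "jobId": "3fa85f64-...", "traceId": "6b1f0a9c-..." }
```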
### Ingestion (Scheduler)

1. `JobCreatedListener` receives the event, extracts the `traceId` header, and delegates to `IngestionService`.
2. `IngestionService.persistIfNotExists()` creates a `JobExecution` record in `PENDING` state with `retryCount = 0` and `nextRunAt = Instant.now()`. Duplicate events (same `jobId`) are silently ignored via a `DataIntegrityViolationException` catch.
3. The consumer acknowledges the message manually.
### Dispatch (Scheduler)

1. `DispatchScheduler` fires every 10 seconds, invoking `JobDispatchService.dispatchPendingJobs()`.
2. A `SELECT ... FOR UPDATE SKIP LOCKED` query retrieves up to 50 jobs where `status = PENDING` and `next_run_at <= now`.
3. Each job is marked `RUNNING`, and a `RunJobEvent` is published asynchronously to the `run-job` topic (3 partitions, keyed by `jobId`).
### Execution (Worker)

1. `RunJobListener` receives the event on one of 3 consumer threads and extracts `traceId` and `eventId` from the Kafka headers.
2. The offset is registered as in-flight in the `PartitionState`, and the job is submitted to the `jobProcessorExecutor` thread pool.
3. `ExecutionAttemptProcessor.executeTransaction()` performs an idempotency check on `eventId`, creates the `ExecutionAttempt` record, and invokes `SimulationExecutor`.
4. `SimulationExecutor` iterates through the instruction steps: `SLEEP`, `LOG`, `COMPUTE`, `HTTP_CALL`, `FAIL`.
5. The attempt is marked `SUCCESS` or `FAILURE` in the database.
### Outcome Reporting (Worker)

1. `WorkerService` publishes either a `JobCompletedEvent` or a `JobFailedEvent` synchronously to the corresponding Kafka topic.
2. The offset is marked completed in `PartitionState`; on the next poll cycle, contiguous completed offsets are committed.
### Completion or Retry (Scheduler)

1. `JobCompletedListener` transitions the `JobExecution` from `RUNNING` to `COMPLETED`.
2. `JobFailedListener` invokes `SchedulerService.handleFailure()`:
   - If `retryCount < maxRetries`: increments the retry count, computes the next backoff delay, and resets the status to `PENDING` with an updated `nextRunAt`. The job re-enters dispatch on the next scheduler tick.
   - If `retryCount >= maxRetries`: marks the job `FAILED` and publishes a `DeadLetterEvent` to the `job-dead-letter` topic.
## Data Model

### `jobs` (API database)

```
jobs
├── id           UUID PK
├── type         VARCHAR(50) NOT NULL    -- JobType enum
├── instruction  JSONB NOT NULL          -- SimulationInstruction
├── trace_id     VARCHAR(100) NOT NULL
├── created_at   TIMESTAMPTZ NOT NULL    -- @CreatedDate
└── updated_at   TIMESTAMPTZ NOT NULL    -- @LastModifiedDate
```

Indexes: `idx_jobs_trace_id`, `idx_jobs_created_at`
### `job_executions` (Scheduler database)

```
job_executions
├── job_id       UUID PK
├── trace_id     VARCHAR(100) NOT NULL
├── instruction  JSONB NOT NULL
├── job_status   VARCHAR(30) NOT NULL    -- PENDING | RUNNING | COMPLETED | FAILED
├── retry_count  INTEGER NOT NULL DEFAULT 0
├── max_retries  INTEGER NOT NULL
├── next_run_at  TIMESTAMPTZ NOT NULL
├── last_error   TEXT
├── updated_at   TIMESTAMPTZ NOT NULL
└── version      BIGINT NOT NULL DEFAULT 0  -- Optimistic locking
```

Indexes: `idx_job_executions_status`, `idx_job_executions_next_run`, `idx_job_executions_trace_id`
### `execution_attempts` (Worker database)

```
execution_attempts
├── event_id       UUID PK               -- Kafka eventId (idempotency key)
├── job_id         UUID NOT NULL
├── trace_id       VARCHAR(64) NOT NULL
├── started_at     TIMESTAMPTZ NOT NULL
├── finished_at    TIMESTAMPTZ
├── outcome        VARCHAR(20)           -- RUNNING | SUCCESS | FAILURE
└── error_message  TEXT
```

Indexes: `idx_execution_attempts_job_id`, `idx_execution_attempts_started_at`
All databases use Flyway for schema migration management.
## Kafka Topology

| Topic | Partitions | Replicas | Producer | Consumer | Event Type |
|---|---|---|---|---|---|
| `job-created` | 1 | 1 | API | Scheduler | `JobCreatedEvent` |
| `run-job` | 3 | 1 | Scheduler | Worker | `RunJobEvent` |
| `job-completed` | 3 | 1 | Worker | Scheduler | `JobCompletedEvent` |
| `job-failed` | 3 | 1 | Worker | Scheduler | `JobFailedEvent` |
| `job-dead-letter` | 1 | 1 | Scheduler | (none) | `DeadLetterEvent` |
Producer configuration (uniform across all modules):

- `acks=all` -- full ISR acknowledgment.
- `enable.idempotence=true` -- exactly-once semantics at the producer level.
- `retries=5` -- automatic retry on transient failures.
- Serialization: `StringSerializer` (key) + `JacksonJsonSerializer` (value), type-info headers disabled.

Consumer configuration:

- `enable.auto.commit=false` -- all modules use manual acknowledgment.
- Deserialization: `JacksonJsonDeserializer` with `setUseTypeHeaders(false)` and explicit target types.
- Worker-specific: `isolation.level=read_committed`, `max.poll.interval.ms=900000` (15 minutes).

Message headers (all topics): `traceId` (correlation), `eventId` (deduplication).
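Translated into Spring Boot configuration, the settings above might look like this `application.yml` fragment (a sketch: the exact placement of keys -- especially which ones live under `properties` -- and the `JacksonJsonSerializer` package path are assumptions):

```yaml
spring:
  kafka:
    producer:
      acks: all
      retries: 5
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JacksonJsonSerializer
      properties:
        enable.idempotence: true
    consumer:
      enable-auto-commit: false
      properties:
        isolation.level: read_committed
        max.poll.interval.ms: 900000
```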
## Scheduling Engine

The scheduler module employs a database-backed polling strategy with pessimistic locking:

- **Polling interval:** `@Scheduled(fixedDelay = 10000)` -- every 10 seconds after the previous dispatch completes.
- **Query:** `SELECT * FROM job_executions WHERE job_status = 'PENDING' AND next_run_at <= :now ORDER BY next_run_at LIMIT 50 FOR UPDATE SKIP LOCKED`
- **SKIP LOCKED:** Enables horizontal scaling of scheduler instances without double-dispatching; rows locked by one scheduler instance are simply skipped by the others.
- **Batch size:** Up to 50 jobs per dispatch cycle.
- **Error isolation:** Failures while dispatching individual jobs are caught and logged; the remaining jobs in the batch continue processing.

The `JobExecution` entity uses `@Version` for optimistic locking, so races between the dispatch thread and the Kafka listener threads cannot produce conflicting state transitions: the later writer fails with an optimistic-lock exception instead of silently overwriting.
## Worker Execution Model

The worker implements the multi-threaded Kafka consumer pattern as described in the Confluent architecture guide:

- **Consumer threads:** 3 (configured via `spring.kafka.listener.concurrency`), each running an independent Kafka consumer within the same consumer group (`jobweaver-workers`).
- **Processing thread pool:** 12 threads (`ArrayBlockingQueue(100)`, `CallerRunsPolicy`).
- **Simulation execution:** The `SimulationExecutor` processes `SimulationInstruction` steps sequentially using Java 21 pattern matching (`switch` expressions over sealed types):
  - `SleepStep`: `Thread.sleep(durationMs)`
  - `LogStep`: Logs the message at INFO level
  - `ComputeStep`: CPU-bound loop accumulating a sum for `iterations` cycles
  - `HttpCallStep`: Simulated latency via `Thread.sleep(latencyMs)`
  - `FailStep`: Throws `SimulationFailureException` immediately, halting execution
- **Transaction boundary:** `ExecutionAttemptProcessor` is a separate Spring bean from `WorkerService` so that `@Transactional` proxying functions correctly (avoids self-invocation).
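The sealed-type dispatch described above can be sketched as a compact, self-contained example (a simplified model: only three of the five step types are shown, logging and the real `SimulationFailureException` are replaced with stand-ins, and any detail beyond the names in this document is an assumption):

```java
// Simplified sketch of the SimulationExecutor dispatch: a Java 21 switch
// expression over a sealed hierarchy. The switch is exhaustive, so adding a
// new step type is a compile error until a case handles it.
sealed interface SimulationStep permits SleepStep, LogStep, FailStep {}
record SleepStep(long durationMs) implements SimulationStep {}
record LogStep(String message) implements SimulationStep {}
record FailStep(String reason) implements SimulationStep {}

public class SimulationExecutorSketch {

    public static String execute(SimulationStep step) {
        return switch (step) {
            case SleepStep s -> {
                try {
                    Thread.sleep(s.durationMs());   // simulated work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                yield "slept " + s.durationMs() + "ms";
            }
            case LogStep l -> "logged: " + l.message();
            // Stand-in for SimulationFailureException, which halts execution.
            case FailStep f -> throw new IllegalStateException("simulated failure: " + f.reason());
        };
    }

    public static void main(String[] args) {
        System.out.println(execute(new SleepStep(10)));
        System.out.println(execute(new LogStep("hello")));
    }
}
```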
## Retry and Dead-Letter Strategy

The retry mechanism is managed entirely by the scheduler module:

1. On receiving a `JobFailedEvent`, the scheduler increments `retryCount` and evaluates it against `maxRetries`.
2. If retries remain, an exponential backoff delay is computed: `delay = min(5 * 2^retryCount, 300)` seconds.

   | Retry | Delay |
   |---|---|
   | 1 | 10s |
   | 2 | 20s |
   | 3 | 40s |
   | 4 | 80s |
   | 5 | 160s |
   | 6+ | 300s (cap) |

3. The `JobExecution` status is reset to `PENDING` with `nextRunAt` set to `Instant.now() + delay`. The next dispatch cycle picks it up once the delay has elapsed.
4. If `retryCount > maxRetries`, the job is marked `FAILED` and a `DeadLetterEvent` is published to the `job-dead-letter` topic containing `jobId`, `reason`, `finalRetryCount`, and `failedAt`.
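The backoff formula above can be expressed directly (a minimal sketch; the class and constant names are illustrative, and the production scheduler presumably applies the result via `nextRunAt`):

```java
// Exponential backoff per the documented formula:
//   delay = min(5 * 2^retryCount, 300) seconds
public class BackoffSketch {
    static final long BASE_SECONDS = 5;
    static final long CAP_SECONDS = 300;

    public static long backoffSeconds(int retryCount) {
        // 1L << retryCount is 2^retryCount; for counts >= 7 the doubling
        // already exceeds the cap, which also sidesteps shift overflow.
        if (retryCount >= 7) return CAP_SECONDS;
        return Math.min(BASE_SECONDS * (1L << retryCount), CAP_SECONDS);
    }

    public static void main(String[] args) {
        for (int retry = 1; retry <= 6; retry++) {
            System.out.println("retry " + retry + " -> " + backoffSeconds(retry) + "s");
        }
    }
}
```

Running the sketch reproduces the delay table above: 10s, 20s, 40s, 80s, 160s, then the 300s cap.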
## Concurrency and Thread Model

| Component | Mechanism | Purpose |
|---|---|---|
| Scheduler dispatch query | `FOR UPDATE SKIP LOCKED` | Prevents double-dispatch across scheduler instances |
| `JobExecution.version` | Optimistic locking (`@Version`) | Prevents concurrent state transition conflicts |
| Worker consumer threads | 3 Kafka consumers (same group) | Parallelises message consumption across partitions |
| Worker processing pool | 12 threads, bounded queue (100) | Decouples I/O-bound simulation from Kafka poll loop |
| Back-pressure | `CallerRunsPolicy` | When the pool is saturated, the listener thread executes the task, naturally slowing `poll()` |
| `PartitionState` | `synchronized` methods | Thread-safe offset tracking across consumer and pool threads |
| `PartitionStateRegistry` | `ConcurrentHashMap` | Thread-safe partition-to-state mapping |
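The worker pool and back-pressure rows above correspond to a standard JDK executor configuration, sketched here (bean wiring, keep-alive value, and method names are assumptions):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkerPoolSketch {
    // 12 worker threads, a bounded queue of 100 tasks, and CallerRunsPolicy:
    // when pool and queue are both full, execute() runs the task on the
    // calling (listener) thread, which stalls the Kafka poll loop and
    // throttles consumption naturally.
    public static ThreadPoolExecutor jobProcessorExecutor() {
        return new ThreadPoolExecutor(
                12, 12,                        // fixed-size pool
                60, TimeUnit.SECONDS,          // idle keep-alive (illustrative)
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = jobProcessorExecutor();
        System.out.println("max threads = " + pool.getMaximumPoolSize());
        System.out.println("queue capacity = " + pool.getQueue().remainingCapacity());
        pool.shutdown();
    }
}
```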
## Offset Management

The worker module implements a custom asynchronous offset commit strategy to handle out-of-order job completion:

- **In-flight tracking:** When a record is received, its offset is registered in `PartitionState.inFlight` (a `TreeSet<Long>`).
- **Completion marking:** When a job finishes (success or failure), its offset moves from `inFlight` to `completed`.
- **Contiguous watermark:** `tryAdvanceCommit()` computes the highest contiguous completed offset from `lastCommittedOffset`. For example, if `lastCommittedOffset = 4` and `completed = {5, 6, 8}`, the watermark advances to 6 (the gap at 7 blocks further advancement).
- **Commit piggybacking:** Offset commits occur on the consumer thread during the next `poll()` cycle, via `OffsetCommitCoordinator.commitIfReady()`.
- **Rebalance handling:** `ConsumerRebalanceHandler` flushes pending commits for revoked partitions and resets partition state.
- **Stuck offset detection:** A warning is logged if the completed set exceeds 10,000 entries, indicating a potentially stuck offset.
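The contiguous-watermark computation can be illustrated in isolation (a simplified model of `PartitionState`; the real class also tracks in-flight offsets and commit metadata):

```java
import java.util.TreeSet;

// Minimal model of the contiguous-watermark advance: offsets complete out of
// order, but the commit position only moves across gap-free runs.
public class PartitionStateSketch {
    private long lastCommittedOffset;
    private final TreeSet<Long> completed = new TreeSet<>();

    public PartitionStateSketch(long lastCommittedOffset) {
        this.lastCommittedOffset = lastCommittedOffset;
    }

    public synchronized void markCompleted(long offset) {
        completed.add(offset);
    }

    // Advances past every contiguous completed offset and returns the new
    // watermark; a gap stops the advance until the missing offset completes.
    public synchronized long tryAdvanceCommit() {
        while (completed.remove(lastCommittedOffset + 1)) {
            lastCommittedOffset++;
        }
        return lastCommittedOffset;
    }

    public static void main(String[] args) {
        PartitionStateSketch state = new PartitionStateSketch(4);
        state.markCompleted(5);
        state.markCompleted(6);
        state.markCompleted(8);
        System.out.println(state.tryAdvanceCommit()); // 6 -- the gap at 7 blocks 8
        state.markCompleted(7);
        System.out.println(state.tryAdvanceCommit()); // 8
    }
}
```

The `main` method walks through the exact example from the text: with `lastCommittedOffset = 4` and `completed = {5, 6, 8}`, the watermark stops at 6 until offset 7 arrives.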
## Idempotency Guarantees

| Boundary | Mechanism |
|---|---|
| API to Scheduler | `jobId` (UUID) as primary key in `job_executions`. Duplicate `JobCreatedEvent` messages produce a `DataIntegrityViolationException`, which is caught silently. |
| Scheduler to Worker | `eventId` (UUID, generated per Kafka message) as primary key in `execution_attempts`. Duplicate `RunJobEvent` messages are detected, and the existing outcome is re-published without re-execution. |
| Producer idempotence | All Kafka producers use `enable.idempotence=true`, preventing broker-side duplicates from producer retries. |
## Exception Architecture

Each module defines a local exception hierarchy extending `BaseDomainException` from `jobweaver-common`:

```
BaseDomainException (common)
├── ApiException (api)
│   ├── JobNotFoundException (404)
│   ├── InvalidJobRequestException (400)
│   ├── DuplicateJobException (409)
│   ├── InvalidStateTransitionException (409)
│   └── EventPublishException (500)
├── SchedulerException (scheduler)
│   ├── JobNotFoundException (404)
│   ├── InvalidStateTransitionException (409)
│   ├── DispatchException (500)
│   └── EventPublishException (500)
└── WorkerException (worker)
    ├── SimulationFailureException (500)
    ├── SimulationInterruptedException (500)
    ├── EventPublishException (500)
    ├── MalformedRecordException (400)
    └── OffsetCommitException (500)
```
Each module supplies an error-code enum implementing the `DomainErrorCode` interface from `jobweaver-common`, producing namespaced error codes (e.g., `API.JOB_NOT_FOUND`, `SCHEDULER.DISPATCH_FAILED`, `WORKER.SIMULATION_FAILED`).

The API module includes a `GlobalExceptionHandler` (`@RestControllerAdvice`) that maps domain exceptions to standardised `ErrorResponse` payloads with appropriate HTTP status codes.
## Observability

- **Structured logging:** All services use Logback with a consistent pattern: `[service=<name>] [traceId=%X{traceId}] [jobId=%X{jobId}]`.
- **Trace propagation:** A UUID-based `traceId` is generated at the API layer, propagated through Kafka headers, and restored into MDC at each service boundary.
- **MDC lifecycle:** `MdcFilter` (API) and all Kafka listeners set and clear MDC fields in `try`/`finally` blocks.
- **Health checks:** All Docker services expose the Spring Boot Actuator `/actuator/health` endpoint, used for Docker Compose health checks.
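The MDC set/clear discipline mentioned above follows this shape (a sketch: the `put`/`get`/`clear` helpers here are a `ThreadLocal` stand-in for `org.slf4j.MDC` so the example stays self-contained; the real filter and listeners use SLF4J directly):

```java
import java.util.HashMap;
import java.util.Map;

public class MdcLifecycleSketch {
    // Stand-in for org.slf4j.MDC: a per-thread map of logging context fields.
    static final ThreadLocal<Map<String, String>> MDC =
            ThreadLocal.withInitial(HashMap::new);

    static void put(String key, String value) { MDC.get().put(key, value); }
    static String get(String key) { return MDC.get().get(key); }
    static void clear() { MDC.get().clear(); }

    // The pattern used by MdcFilter and every Kafka listener: set the fields,
    // do the work, and always clear in finally so pooled threads do not leak
    // a stale traceId into the next message's logs.
    public static String handle(String traceId, String jobId) {
        put("traceId", traceId);
        put("jobId", jobId);
        try {
            return "[traceId=" + get("traceId") + "] [jobId=" + get("jobId") + "] processed";
        } finally {
            clear();
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("t-123", "j-456"));
        System.out.println("after: traceId=" + get("traceId")); // null -- context cleared
    }
}
```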
## Infrastructure

| Service | Image | Exposed Port |
|---|---|---|
| `postgres-api` | `postgres:16` | 5432 |
| `postgres-scheduler` | `postgres:16` | 5433 |
| `postgres-worker` | `postgres:16` | 5434 |
| `zookeeper` | `confluentinc/cp-zookeeper:7.6.1` | 2181 |
| `kafka` | `confluentinc/cp-kafka:7.6.1` | 9092 |
| `jobweaver-api` | Custom (Dockerfile) | 8080 |
| `jobweaver-scheduler` | Custom (Dockerfile) | 8081 |
| `jobweaver-worker` | Custom (Dockerfile) | 8082 |
The application images (built from each module's Dockerfile) share the following characteristics:

- Base image: `eclipse-temurin:21-jre-alpine`
- Non-root execution (`appuser:appgroup`)
- JVM flags: `-Duser.timezone=Asia/Kolkata`, `-XX:+UseContainerSupport`, `-XX:MaxRAMPercentage=75.0`
- Spring profile: `docker` (overrides database hosts and Kafka bootstrap servers)
Each service maintains its own PostgreSQL instance with an independent schema, enforcing strict data ownership boundaries. Cross-service data access is performed exclusively through Kafka events.