How it works
Job creation
When the frontend submits a POST /jobs request:
flowchart TD
A[POST /jobs] --> B[Validate request]
B --> C[Persist job to CockroachDB]
C --> D[Publish to RabbitMQ print.jobs]
D --> E[Return 201 with job ID]
The job is written to the database before the RabbitMQ publish. If the broker is temporarily unavailable, the job record still exists and the watchdog can requeue it later.
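The ordering above can be illustrated with a minimal, language-neutral sketch in Python. It is not the service's code: the in-memory `db` dict, the `publish` callable, the `BrokerUnavailable` exception, and the `photos` request field are all hypothetical stand-ins. Whether the real endpoint still returns 201 when the publish fails is not stated in this document; the sketch assumes it does.

```python
import uuid


class BrokerUnavailable(Exception):
    """Hypothetical error raised when the broker cannot be reached."""


def create_job(request, db, publish):
    """Persist first, publish second, so a broker outage never loses a job."""
    # "photos" is an assumed request field, used only to have something to validate.
    if not request.get("photos"):
        raise ValueError("request must list at least one photo")

    job_id = str(uuid.uuid4())
    # Step 1: the row exists before anything is sent to the broker.
    db[job_id] = {"status": "queued", "photos": list(request["photos"])}

    # Step 2: a failed publish leaves the stored row untouched.
    try:
        publish("print.jobs", {"jobId": job_id})
    except BrokerUnavailable:
        pass  # assumption: the record is picked up again by a later requeue

    return 201, job_id
```

Reversing the two steps would allow a message to reach the printer service for a job that has no database row yet, which is the failure mode this ordering avoids.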
Job status flow
A job moves through the following states:
stateDiagram-v2
[*] --> Queued : POST /jobs
Queued --> Printing : print-svc picks up job
Printing --> Done : print-svc reports completion
Printing --> Error : max retries reached
Printing --> Requeued : watchdog requeues stuck job
Requeued --> Printing : print-svc picks up again
Status values are lowercase strings in the API: queued, printing, requeued, done, error.
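The transitions in the diagram can be written down as a lookup table. The following Python sketch is illustrative only; the names `TRANSITIONS`, `TERMINAL`, and `can_transition` are hypothetical and do not come from the service.

```python
# Allowed transitions, copied from the state diagram above.
TRANSITIONS = {
    "queued": {"printing"},
    "printing": {"done", "error", "requeued"},
    "requeued": {"printing"},
    "done": set(),
    "error": set(),
}

# States with no outgoing transitions.
TERMINAL = {state for state, targets in TRANSITIONS.items() if not targets}


def can_transition(current, new):
    """Return True if the diagram allows moving from `current` to `new`."""
    return new in TRANSITIONS.get(current, set())
```

With this table, `can_transition("printing", "requeued")` is true, while `can_transition("done", "printing")` is false because done and error are terminal.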
SSE streaming
The GET /jobs/{jobId}/stream endpoint streams job status updates to the client using Server-Sent Events.
The internal pipeline is:
RabbitMQ print.status
--> PrintStatusConsumer
--> CockroachDB (update job row)
--> pg_notify('job_status', payload)
--> JobStatusListener (LISTEN/NOTIFY)
--> Channel<StatusUpdate>
--> SSE endpoint
--> client
- PrintStatusConsumer receives a print.status message from RabbitMQ, updates the job row in the database, and issues a pg_notify call in the same transaction.
- JobStatusListener holds a persistent Npgsql connection listening on the job_status channel. When a notification arrives it writes a StatusUpdate to an in-process Channel<StatusUpdate>.
- Each active SSE connection reads from that channel and forwards updates matching its jobId to the HTTP response.
The SSE endpoint sends one initial event with the current job state immediately on connect. The connection closes automatically when the status reaches done or error.
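The per-connection behaviour (initial event, filtering by jobId, closing on a terminal status) can be sketched as a Python generator. This is an illustration under assumptions, not the endpoint's implementation: the payload shape `{"jobId", "status"}` and the function name `sse_frames` are hypothetical. The only fixed part is the Server-Sent Events wire format, where each event is a `data:` line followed by a blank line.

```python
import json

TERMINAL = {"done", "error"}


def sse_frames(job_id, current_status, updates):
    """Yield SSE frames for one job until a terminal status has been sent."""

    def frame(status):
        # One SSE event: a "data:" line terminated by a blank line.
        return "data: " + json.dumps({"jobId": job_id, "status": status}) + "\n\n"

    # Initial event with the current state, sent immediately on connect.
    yield frame(current_status)
    if current_status in TERMINAL:
        return

    for update in updates:
        if update["jobId"] != job_id:
            continue  # update belongs to another connection's job
        yield frame(update["status"])
        if update["status"] in TERMINAL:
            return  # the stream closes after done or error
```

Here `updates` stands in for the stream of StatusUpdate values read from the in-process channel.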
JobStatusListener reconnects automatically with exponential backoff (1 s to 30 s) if the database connection is lost.
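A backoff schedule bounded by 1 s and 30 s could look like the following Python sketch. The doubling factor and the absence of jitter are assumptions; this document only states the lower and upper bounds.

```python
def backoff_delays(initial=1.0, maximum=30.0, factor=2.0):
    """Yield reconnect delays in seconds, growing by `factor` up to `maximum`."""
    delay = initial
    while True:
        yield delay
        delay = min(delay * factor, maximum)
```

Under these assumptions the first six delays are 1, 2, 4, 8, 16, and 30 seconds, and every later delay stays at 30 seconds. A listener would typically restart the schedule after a successful reconnect.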
Stuck jobs watchdog
StuckJobsWatchdog runs on a configurable interval (default: every 5 minutes). Each run:
- Queries for jobs with status printing whose updatedAt is older than StaleThresholdMinutes.
- For each stale job:
  - If retryCount < MaxRetries: sets status to requeued, increments retryCount, and republishes the job to print.jobs starting from the last printed index.
  - If retryCount >= MaxRetries: sets status to error with message "max retries reached".
- Emits a pg_notify for each updated job so connected SSE clients receive the new status immediately.
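One watchdog run, as described in the steps above, can be sketched in Python over an in-memory list of jobs. The field names `id`, `lastPrintedIndex`, and `startIndex`, the callables `publish` and `notify`, and the example values (a 10 minute threshold, 3 retries) are hypothetical; the real service works against the database and RabbitMQ, and the exact message it republishes is not shown in this document.

```python
from datetime import datetime, timedelta, timezone


def sweep(jobs, now, stale_threshold_minutes, max_retries, publish, notify):
    """One watchdog pass: requeue or fail every stale job in `jobs`."""
    cutoff = now - timedelta(minutes=stale_threshold_minutes)
    for job in jobs:
        if job["status"] != "printing" or job["updatedAt"] >= cutoff:
            continue  # not printing, or updated recently enough
        if job["retryCount"] < max_retries:
            job["status"] = "requeued"
            job["retryCount"] += 1
            # Assumed message shape: resume from the last printed index.
            publish("print.jobs", {"jobId": job["id"], "startIndex": job["lastPrintedIndex"]})
        else:
            job["status"] = "error"
            job["message"] = "max retries reached"
        job["updatedAt"] = now
        # One notification per updated job so SSE clients see the change.
        notify("job_status", {"jobId": job["id"], "status": job["status"]})


# Usage with made-up data: "a" is stale, "b" is out of retries, "c" is fresh.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
old = now - timedelta(minutes=30)
jobs = [
    {"id": "a", "status": "printing", "updatedAt": old, "retryCount": 0, "lastPrintedIndex": 4},
    {"id": "b", "status": "printing", "updatedAt": old, "retryCount": 3, "lastPrintedIndex": 1},
    {"id": "c", "status": "printing", "updatedAt": now, "retryCount": 0, "lastPrintedIndex": 0},
]
published, notified = [], []
sweep(jobs, now, 10, 3,
      lambda queue, msg: published.append((queue, msg)),
      lambda channel, payload: notified.append((channel, payload)))
```

After this run, job "a" is requeued with retryCount 1, job "b" is marked error, and job "c" is left untouched.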
Photo listing
GET /photos lists all objects under the low/ prefix in MinIO, applies pagination, and returns a pre-signed URL (valid 1 hour) for each object. The object keys are sorted lexicographically before pagination.
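The sort-then-paginate order matters because it keeps page contents stable between requests. A minimal Python sketch, assuming 1-based `page` and `page_size` parameters (the actual query parameters are not named in this document) and a hypothetical `presign(key, expires_seconds)` callable standing in for the MinIO client:

```python
def list_photos(keys, page, page_size, presign):
    """Return one page of objects under low/ with a pre-signed URL each."""
    # Keep only the low/ prefix, then sort lexicographically before slicing.
    low = sorted(key for key in keys if key.startswith("low/"))
    start = (page - 1) * page_size
    return [
        {"key": key, "url": presign(key, 3600)}  # 3600 s = 1 hour validity
        for key in low[start:start + page_size]
    ]
```

Requesting a page beyond the last object returns an empty list in this sketch; how the real endpoint responds in that case is not specified here.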
Logging
All log entries are structured and use named parameters.
Key log points:
| Event | Level |
|---|---|
| Database migrations applied | Information |
| Job created and published | Information |
| print.status message processed | Information |
| PrintStatusConsumer started | Information |
| JobStatusListener connected | Information |
| JobStatusListener connection lost | Warning |
| Watchdog found stale jobs | Warning |
| Job requeued by watchdog | Information |
| Job marked as error by watchdog | Warning |
| Invalid or null message received | Warning |
| Processing error (no requeue) | Error |