SQLite job queue · Node.js
A persistent job queue for Node.js using SQLite. Simple, reliable, and zero external dependencies beyond SQLite.
Full reference (mirrors README) — open this file locally or from the repo. Requires Node.js 18+.
fastq is fast and simple—but in-memory only. Workmatic combines that API feel with SQLite persistence: no Redis, no extra services—just a single file keeping jobs safe for small-to-medium workloads.
Requires Node.js 18+.
npm install workmatic
import { createDatabase, createClient, createWorker } from 'workmatic';
// Create database (use file path for persistence)
const db = createDatabase({ filename: './jobs.db' });
// Create a client to add jobs
const client = createClient({ db, queue: 'emails' });
// Add a job
const { id } = await client.add({
to: 'user@example.com',
subject: 'Hello!'
});
console.log(`Job created: ${id}`);
// Create a worker to process jobs
const worker = createWorker({
db,
queue: 'emails',
concurrency: 4,
});
// Define the processor
worker.process(async (job) => {
console.log(`Sending email to ${job.payload.to}`);
await sendEmail(job.payload);
});
// Start processing
worker.start();
createDatabase(options) — Initialize the database connection and schema.
import { createDatabase, getUnderlyingDb } from 'workmatic';
const db = createDatabase({
// Option 1: File path (creates or opens existing)
filename: './jobs.db',
// Option 2: In-memory (for testing)
filename: ':memory:',
// Option 3: Existing better-sqlite3 instance
db: existingSqliteInstance,
});
const sqlite = getUnderlyingDb(db);
createClient(options) — Create a client for adding jobs to a queue.
const client = createClient({
db, // Required: Database instance
queue: 'default', // Optional: Queue name (default: 'default')
});
client.add(payload, options?) — Add a job to the queue.
const { ok, id } = await client.add(
{ email: 'user@example.com' }, // Payload (must be JSON-serializable)
{
priority: 0, // Lower = higher priority (default: 0)
delayMs: 5000, // Delay before job becomes available (default: 0)
maxAttempts: 3, // Max retry attempts (default: 3)
}
);
client.stats() — Get job statistics for the queue.
const stats = await client.stats();
// { ready: 5, running: 2, done: 100, dead: 1, total: 108 }
client.addMany(payloads, options?) — Add many jobs in one transaction; the same priority, delay, and maxAttempts apply to all.
const { ok, ids } = await client.addMany(
[{ email: 'a@test.com' }, { email: 'b@test.com' }],
{ priority: 1, maxAttempts: 5 }
);
client.clear(options?) — Clear all jobs from the queue.
// Clear all jobs
const deleted = await client.clear();
console.log(`Deleted ${deleted} jobs`);
// Clear only jobs with specific status
const deletedDone = await client.clear({ status: 'done' });
console.log(`Deleted ${deletedDone} done jobs`);
createWorker(options) — Create a worker to process jobs from a queue.
const worker = createWorker({
db, // Required: Database instance
queue: 'default', // Optional: Queue name (default: 'default')
concurrency: 1, // Optional: Parallel job count (default: 1)
leaseMs: 30000, // Optional: Job lease duration in ms (default: 30000)
pollMs: 1000, // Optional: Poll interval when idle (default: 1000)
timeoutMs: 60000, // Optional: default 60000; set 0 to disable
backoff: (n) => 1000 * Math.pow(2, n), // Optional: Retry backoff function
persistState: false, // Optional: Persist worker state to database (default: false)
autoRestore: true, // Optional: Auto-restore state on creation (default: true)
pauseCheckIntervalMs: 300, // Optional: throttle DB pause checks from pump
requeueExpiredIntervalMs: 0, // Optional: throttle lease requeue (0 = every pump)
onPumpError: (err) => { /* after default log */ },
});
worker.process(fn) — Register the job processor.
worker.process(async (job) => {
console.log(`Processing job ${job.id}`);
console.log(`Payload:`, job.payload);
console.log(`Attempt ${job.attempts + 1} of ${job.maxAttempts}`);
// Do work here
// Throw an error to trigger retry
});
worker.start() — Begin claiming and processing jobs.
worker.start();
worker.stop() — Stop the worker and wait for in-flight jobs to finish.
await worker.stop();
worker.pause() / worker.resume() — Pause or resume job claiming.
worker.pause(); // Stop claiming new jobs
worker.resume(); // Resume claiming jobs
worker.stats() — Get job statistics for the worker's queue.
const stats = await worker.stats();
worker.clear(options?) — Clear jobs from the worker's queue.
// Clear all jobs in the queue
const deleted = await worker.clear();
console.log(`Deleted ${deleted} jobs`);
// Clear only jobs with specific status
const deletedDead = await worker.clear({ status: 'dead' });
console.log(`Deleted ${deletedDead} dead jobs`);
worker.isRunning; // boolean
worker.isPaused; // boolean
worker.queue; // string
When persistState: true, worker state (running/paused/stopped) is saved—useful to remember state across restarts.
const worker = createWorker({
db,
queue: 'emails',
persistState: true, // Save state to database
autoRestore: true, // Auto-restore on creation (default: true)
});
worker.process(async (job) => {
// ...
});
// Start the worker - state is saved as "running"
worker.start();
// Later, stop the worker - state is saved as "stopped"
await worker.stop();
// If the app restarts, the worker will NOT auto-start
// because the last saved state was "stopped"
With autoRestore: false, restore manually:
const worker = createWorker({
db,
persistState: true,
autoRestore: false, // Don't auto-restore
});
worker.process(async (job) => { /* ... */ });
// Manually check and restore saved state
const savedState = await worker.restoreState();
console.log(`Restored state: ${savedState}`); // 'running' | 'paused' | 'stopped' | null
createDashboard(options) — Start a standalone dashboard server.
const dashboard = createDashboard({
db, // Required: Database instance
port: 3000, // Optional: HTTP port (default: 3000)
workers: [worker1], // Optional: Workers to control
});
console.log(`Dashboard at http://localhost:${dashboard.port}`);
// Later, close the server
await dashboard.close();
createDashboardMiddleware(options) — Express-compatible middleware to mount the dashboard on an existing app.
import express from 'express';
import { createDashboardMiddleware } from 'workmatic';
const app = express();
// Mount dashboard at /workmatic
app.use(createDashboardMiddleware({
db, // Required: Database instance
basePath: '/workmatic', // Optional: URL prefix (default: '')
workers: [worker], // Optional: Workers to control
}));
app.listen(3000);
// Dashboard available at http://localhost:3000/workmatic
Works with any stack that supports (req, res, next) middleware:
// Fastify
import fastify from 'fastify';
import middie from '@fastify/middie';
const app = fastify();
await app.register(middie);
app.use(createDashboardMiddleware({ db, basePath: '/jobs' }));
// Hono
import { Hono } from 'hono';
const app = new Hono();
app.use('/workmatic/*', (c) => {
return new Promise((resolve) => {
const middleware = createDashboardMiddleware({ db, basePath: '/workmatic' });
middleware(c.env.incoming, c.env.outgoing, resolve);
});
});
The job passed to processors:
interface Job<TPayload> {
id: string; // Unique public ID (nanoid)
queue: string; // Queue name
payload: TPayload; // Your job data
status: JobStatus; // 'ready' | 'running' | 'done' | 'dead'
priority: number; // Priority value
attempts: number; // Current attempt count (starts at 0)
maxAttempts: number; // Maximum attempts allowed
createdAt: number; // Unix timestamp (ms)
lastError: string | null; // Last error message
}
Workmatic provides at-least-once delivery:
Jobs are persisted before add() returns, and are marked done only after successful processing. A job can therefore be delivered more than once (e.g. after a crash or an expired lease), so make processors idempotent:
worker.process(async (job) => {
// Check if already processed
const exists = await db.checkProcessed(job.id);
if (exists) return;
// Process the job
await processPayment(job.payload);
// Mark as processed
await db.markProcessed(job.id);
});
┌─────────┐ add() ┌─────────┐
│ NEW │ ─────────────▶ │ READY │
└─────────┘ └────┬────┘
│
claim │
▼
┌─────────┐
│ RUNNING │
└────┬────┘
│
┌────────────────┼────────────────┐
│ │ │
success failure failure
│ (retries (max
│ left) attempts)
▼ │ │
┌─────────┐ │ ▼
│ DONE │ │ ┌─────────┐
└─────────┘ │ │ DEAD │
│ └─────────┘
│
▼
┌─────────────────┐
│ READY (retry) │
│ with backoff │
└─────────────────┘
| Option | Default | Description |
|---|---|---|
queue | 'default' | Queue name for job isolation |
concurrency | 1 | Parallel job count |
leaseMs | 30000 | Lease duration during processing (ms) |
pollMs | 1000 | Poll interval when idle (ms) |
timeoutMs | 60000 | Execution timeout in ms; 0 disables |
priority | 0 | Lower = processed first |
delayMs | 0 | Delay before job becomes available |
maxAttempts | 3 | Max processing attempts |
backoff | 2^n * 1000 | Retry delay function (returns ms) |
pauseCheckIntervalMs | 300 | Min ms between DB pause checks in pump |
requeueExpiredIntervalMs | 0 | Min ms between lease requeue scans (0 = each pump) |
onPumpError | — | Optional callback after default pump error log |
README references dashboard-screenshot.png. Add that image at the repository root if you want a screenshot in README; this HTML docs page omits it until the asset exists.
basic.ts — Simple job processing
advanced.ts — Priority, delays, retries
with-dashboard.ts — Dashboard monitoring
npx tsx examples/basic.ts
npx tsx examples/with-dashboard.ts
# In-memory (fastest, for testing)
npm run bench
# File-based (realistic, persistent)
npm run bench -- --file
Short suite: 2,000 sequential add() calls plus 1,000 client.stats() calls (queue already has jobs). Same paths as the full runner, less time.
npm run bench:micro
npm run bench:micro -- --file
| Benchmark | In-Memory | File-based |
|---|---|---|
| Micro Sequential Insert (2,000) | ~27,000/s | ~9,300/s |
| Stats Query (×1,000) | ~9,200/s | ~5,500/s |
Rounded indicative numbers; real throughput depends on disk, CPU, and DB “warmth” after heavier benchmarks.
| Benchmark | In-Memory | File-based |
|---|---|---|
| Sequential Insert | 27,000/s | 13,000/s |
| Parallel Insert | 23,000/s | 12,000/s |
| Process (concurrency=1) | 1,100/s | 1,100/s |
| Process (concurrency=4) | 4,800/s | 4,800/s |
| Process (concurrency=8) | 10,000/s | 8,300/s |
| Process (concurrency=16) | 18,000/s | 5,700/s |
| Mixed Insert+Process | 7,500/s | 3,500/s |
| Micro Sequential Insert (2,000) | ~27,000/s | ~9,300/s |
| Stats Query (×1,000) | ~9,200/s | ~5,500/s |
| Claim + Process Batch | 23,600/s | 11,800/s |
Note: File-based throughput drops at high concurrency due to disk I/O; concurrency=8 is often a sweet spot.
Manage jobs directly from the database file:
# Show job statistics
npx workmatic stats ./jobs.db
# List queues with pause status
npx workmatic queues ./jobs.db
# Pause/resume a queue (workers stop/start claiming new jobs)
npx workmatic pause ./jobs.db emails
npx workmatic resume ./jobs.db emails
# List jobs (with filters)
npx workmatic list ./jobs.db --status=dead --limit=10
# Export jobs to CSV
npx workmatic export ./jobs.db backup.csv
npx workmatic export ./jobs.db --status=dead > dead.csv
# Import jobs from CSV
npx workmatic import ./jobs.db backup.csv
# Delete jobs by status
npx workmatic purge ./jobs.db --status=done
# Retry dead jobs (reset to ready)
npx workmatic retry ./jobs.db --status=dead
| Command | Description |
|---|---|
stats <db> | Job counts by status and queue |
queues <db> | List queues + pause status |
pause <db> <queue> | Pause a queue |
resume <db> <queue> | Resume a queue |
list <db> | List jobs (filters optional) |
export <db> [file] | CSV export (stdout if no file) |
import <db> <file> | Import from CSV |
purge <db> --status=X | Delete by status |
retry <db> --status=X | Reset jobs to ready |
| Option | Description |
|---|---|
--status=<status> | ready / running / done / dead |
--queue=<queue> | Filter by queue name |
--limit=<n> | Limit results (default 100) |
resume restores claiming. The CSV format uses these columns:
public_id,queue,payload,status,priority,run_at,attempts,max_attempts,lease_until,created_at,updated_at,last_error
job_001,emails,"{""to"":""user@example.com"",""template"":""welcome""}",ready,0,1704067200000,0,3,0,1704067200000,1704067200000,
job_002,emails,"{""to"":""other@example.com"",""template"":""reminder""}",ready,5,1704067200000,0,3,0,1704067200000,1704067200000,
npx workmatic import ./jobs.db jobs.csv
cat ai-generated-jobs.csv | npx workmatic import ./jobs.db /dev/stdin
┌──────────────────────────────────────────────────────────┐ │ Your App │ ├────────────────────┬─────────────────────────────────────┤ │ Client │ Worker │ │ ┌─────────────┐ │ ┌─────────────┐ ┌─────────────┐ │ │ │ add() │ │ │ pump() │ │ fastq │ │ │ │ stats() │ │ │ claim() │ │ pool │ │ │ └─────────────┘ │ └─────────────┘ └─────────────┘ │ ├────────────────────┴─────────────────────────────────────┤ │ Kysely (Query Builder) │ ├──────────────────────────────────────────────────────────┤ │ better-sqlite3 (SQLite) │ ├──────────────────────────────────────────────────────────┤ │ jobs.db (File) │ └──────────────────────────────────────────────────────────┘
The repository uses Vitest for unit tests.
npm test
npm run test:watch
Build before publish: npm run build (tsup), types: npm run build:types.