SQLite job queue · Node.js

Workmatic

A persistent job queue for Node.js using SQLite. Simple, reliable, and zero external dependencies beyond SQLite.

Full reference (mirrors the README); open this file locally or from the repo. Requires Node.js 18+.

Why?

fastq is fast and simple, but in-memory only. Workmatic combines that API feel with SQLite persistence: no Redis, no extra services, just a single file that keeps jobs safe for small-to-medium workloads.

Fits well when…

  • You need durability without new infrastructure
  • Single-machine or embedded SQLite deployments
  • TypeScript + predictable queue semantics

Core ideas

  • Lease-based claiming (at-least-once)
  • Optional worker state persisted in DB
  • Kysely + better-sqlite3 under the hood
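
Lease-based claiming means a worker marks a job as leased until a deadline; if the worker dies, the lease expires and the job becomes claimable again. A simplified in-memory sketch of the idea (illustrative only, not Workmatic's actual SQL, which operates on the jobs table):

```typescript
// Simplified sketch of lease-based claiming; an array stands in for SQLite.
interface LeasedJob {
  id: string;
  leaseUntil: number; // 0 = unclaimed; otherwise leased until this timestamp (ms)
}

// Claim the first job whose lease has expired (or was never claimed).
function claim(jobs: LeasedJob[], now: number, leaseMs: number): LeasedJob | undefined {
  const job = jobs.find((j) => j.leaseUntil <= now);
  if (job) job.leaseUntil = now + leaseMs; // extend the lease; other workers skip it
  return job;
}

const jobs: LeasedJob[] = [{ id: 'a', leaseUntil: 0 }];
const first = claim(jobs, 1_000, 30_000);  // claims 'a', leased until 31_000
const second = claim(jobs, 2_000, 30_000); // undefined: 'a' is still leased
const third = claim(jobs, 40_000, 30_000); // lease expired, 'a' is claimable again
```

Because an expired lease makes the job claimable again even if the original worker eventually finishes, delivery is at-least-once rather than exactly-once.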

Installation

Requires Node.js 18+.

npm install workmatic

Quick start

import { createDatabase, createClient, createWorker } from 'workmatic';

// Create database (use file path for persistence)
const db = createDatabase({ filename: './jobs.db' });

// Create a client to add jobs
const client = createClient({ db, queue: 'emails' });

// Add a job
const { id } = await client.add({ 
  to: 'user@example.com', 
  subject: 'Hello!' 
});
console.log(`Job created: ${id}`);

// Create a worker to process jobs
const worker = createWorker({
  db,
  queue: 'emails',
  concurrency: 4,
});

// Define the processor
worker.process(async (job) => {
  console.log(`Sending email to ${job.payload.to}`);
  await sendEmail(job.payload);
});

// Start processing
worker.start();

API Reference — createDatabase(options)

Initialize the database connection and schema.

import { createDatabase, getUnderlyingDb } from 'workmatic';

// Option 1: File path (creates or opens existing)
const db = createDatabase({ filename: './jobs.db' });

// Option 2: In-memory (for testing)
const dbMem = createDatabase({ filename: ':memory:' });

// Option 3: Existing better-sqlite3 instance
const dbExisting = createDatabase({ db: existingSqliteInstance });

const sqlite = getUnderlyingDb(db);

createClient(options)

Create a client for adding jobs to a queue.

const client = createClient({
  db,                    // Required: Database instance
  queue: 'default',      // Optional: Queue name (default: 'default')
});

client.add(payload, options?)

Add a job to the queue.

const { ok, id } = await client.add(
  { email: 'user@example.com' },  // Payload (must be JSON-serializable)
  {
    priority: 0,      // Lower = higher priority (default: 0)
    delayMs: 5000,    // Delay before job becomes available (default: 0)
    maxAttempts: 3,   // Max retry attempts (default: 3)
  }
);

client.stats()

Get job statistics for the queue.

const stats = await client.stats();
// { ready: 5, running: 2, done: 100, dead: 1, total: 108 }

client.addMany(payloads, options?)

Add many jobs in one transaction; the same priority, delay, and maxAttempts apply to all.

const { ok, ids } = await client.addMany(
  [{ email: 'a@test.com' }, { email: 'b@test.com' }],
  { priority: 1, maxAttempts: 5 }
);

client.clear(options?)

Clear all jobs from the queue.

// Clear all jobs
const deletedAll = await client.clear();
console.log(`Deleted ${deletedAll} jobs`);

// Clear only jobs with a specific status
const deletedDone = await client.clear({ status: 'done' });
console.log(`Deleted ${deletedDone} done jobs`);

createWorker(options)

Create a worker to process jobs from a queue.

const worker = createWorker({
  db,                     // Required: Database instance
  queue: 'default',       // Optional: Queue name (default: 'default')
  concurrency: 1,         // Optional: Parallel job count (default: 1)
  leaseMs: 30000,         // Optional: Job lease duration in ms (default: 30000)
  pollMs: 1000,           // Optional: Poll interval when idle (default: 1000)
  timeoutMs: 60000,       // Optional: Job execution timeout in ms (default: 60000; 0 disables)
  backoff: (n) => 1000 * Math.pow(2, n),  // Optional: Retry backoff function
  persistState: false,    // Optional: Persist worker state to database (default: false)
  autoRestore: true,      // Optional: Auto-restore state on creation (default: true)
  pauseCheckIntervalMs: 300,      // Optional: Min ms between DB pause checks in the pump
  requeueExpiredIntervalMs: 0,    // Optional: Min ms between expired-lease requeue scans (0 = every pump)
  onPumpError: (err) => {},       // Optional: Called after the default pump error log
});
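
The backoff option maps the attempt number to a retry delay in milliseconds. The exponential function shown above yields 1 s, 2 s, 4 s, and so on; the jittered variant below is a common tweak to avoid retry stampedes, shown here as an assumption, not a Workmatic built-in:

```typescript
// Exponential backoff as in the options above: delay doubles per attempt.
const backoff = (n: number): number => 1000 * Math.pow(2, n);

// Delays for the first four retries (attempt numbers 0..3).
const schedule = [0, 1, 2, 3].map(backoff); // [1000, 2000, 4000, 8000]

// Jittered, capped variant (a common pattern, not built into Workmatic):
// cap at 60 s, then pick a random delay in [base/2, base].
const jittered = (n: number): number => {
  const base = Math.min(1000 * 2 ** n, 60_000);
  return base / 2 + Math.random() * (base / 2);
};
```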

worker.process(fn)

worker.process(async (job) => {
  console.log(`Processing job ${job.id}`);
  console.log(`Payload:`, job.payload);
  console.log(`Attempt ${job.attempts + 1} of ${job.maxAttempts}`);
  
  // Do work here
  // Throw an error to trigger retry
});

worker.start()

worker.start();

worker.stop()

await worker.stop();

worker.pause() / worker.resume()

worker.pause();   // Stop claiming new jobs
worker.resume();  // Resume claiming jobs

worker.stats()

const stats = await worker.stats();

worker.clear(options?)

// Clear all jobs in the queue
const deletedAll = await worker.clear();
console.log(`Deleted ${deletedAll} jobs`);

// Clear only jobs with a specific status
const deletedDead = await worker.clear({ status: 'dead' });
console.log(`Deleted ${deletedDead} dead jobs`);

Worker properties

worker.isRunning;  // boolean
worker.isPaused;   // boolean
worker.queue;      // string

Persist state mode

When persistState: true, worker state (running/paused/stopped) is saved to the database, so the worker can remember its state across restarts.

const worker = createWorker({
  db,
  queue: 'emails',
  persistState: true,   // Save state to database
  autoRestore: true,    // Auto-restore on creation (default: true)
});

worker.process(async (job) => {
  // ...
});

// Start the worker - state is saved as "running"
worker.start();

// Later, stop the worker - state is saved as "stopped"
await worker.stop();

// If the app restarts, the worker will NOT auto-start
// because the last saved state was "stopped"

With autoRestore: false, restore manually:

const worker = createWorker({
  db,
  persistState: true,
  autoRestore: false,  // Don't auto-restore
});

worker.process(async (job) => { /* ... */ });

// Manually check and restore saved state
const savedState = await worker.restoreState();
console.log(`Restored state: ${savedState}`); // 'running' | 'paused' | 'stopped' | null

createDashboard(options)

Standalone dashboard server.

const dashboard = createDashboard({
  db,                    // Required: Database instance
  port: 3000,            // Optional: HTTP port (default: 3000)
  workers: [worker1],    // Optional: Workers to control
});

console.log(`Dashboard at http://localhost:${dashboard.port}`);

// Later, close the server
await dashboard.close();

createDashboardMiddleware(options)

Express-compatible middleware to mount on an existing app.

import express from 'express';
import { createDashboardMiddleware } from 'workmatic';

const app = express();

// Mount dashboard at /workmatic
app.use(createDashboardMiddleware({
  db,                        // Required: Database instance
  basePath: '/workmatic',    // Optional: URL prefix (default: '')
  workers: [worker],         // Optional: Workers to control
}));

app.listen(3000);
// Dashboard available at http://localhost:3000/workmatic

Works with any stack that supports (req, res, next) middleware:

// Fastify
import fastify from 'fastify';
import middie from '@fastify/middie';

const app = fastify();
await app.register(middie);
app.use(createDashboardMiddleware({ db, basePath: '/jobs' }));

// Hono
import { Hono } from 'hono';

const app = new Hono();
// Create the middleware once, not per request
const dashboard = createDashboardMiddleware({ db, basePath: '/workmatic' });
app.use('/workmatic/*', (c) => {
  // On @hono/node-server, c.env exposes the underlying Node request/response
  return new Promise((resolve) => {
    dashboard(c.env.incoming, c.env.outgoing, resolve);
  });
});

Job object

The job passed to processors:

interface Job<TPayload> {
  id: string;           // Unique public ID (nanoid)
  queue: string;        // Queue name
  payload: TPayload;    // Your job data
  status: JobStatus;    // 'ready' | 'running' | 'done' | 'dead'
  priority: number;     // Priority value
  attempts: number;     // Current attempt count (starts at 0)
  maxAttempts: number;  // Maximum attempts allowed
  createdAt: number;    // Unix timestamp (ms)
  lastError: string | null;  // Last error message
}

Durability model

Workmatic provides at-least-once delivery: a job may run more than once (for example, when a lease expires mid-processing), so processors should be idempotent.

Idempotency recommendation

// checkProcessed/markProcessed are your own app-level helpers
// (e.g. backed by a separate table), not part of the Workmatic API
worker.process(async (job) => {
  // Skip if this job already ran on a previous delivery
  const exists = await checkProcessed(job.id);
  if (exists) return;

  // Process the job
  await processPayment(job.payload);

  // Record success so redeliveries become no-ops
  await markProcessed(job.id);
});
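
A minimal sketch of such an idempotency ledger, using an in-memory Set (in production you would back this with a table in the same SQLite file so the check survives restarts):

```typescript
// In-memory idempotency ledger: a stand-in for a processed-jobs table.
const processed = new Set<string>();

// Run `work` only if this job id has not completed before.
async function handleOnce(jobId: string, work: () => Promise<void>): Promise<boolean> {
  if (processed.has(jobId)) return false; // duplicate delivery: skip
  await work();
  processed.add(jobId); // record only after the work succeeded
  return true;
}

// A redelivered job runs its work exactly once.
let runs = 0;
await handleOnce('job_1', async () => { runs++; });
await handleOnce('job_1', async () => { runs++; }); // skipped
```

Recording the id only after the work succeeds means a crash mid-job leads to a retry rather than a silently dropped job, which matches the at-least-once model.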

Job lifecycle

┌─────────┐     add()      ┌─────────┐
│  NEW    │ ─────────────▶ │  READY  │
└─────────┘                └────┬────┘
                                │ claim
                                ▼
                           ┌─────────┐
                           │ RUNNING │
                           └────┬────┘
                                │
               ┌────────────────┼────────────────┐
               │                │                │
            success          failure          failure
               │         (retries left)  (max attempts)
               ▼                │                │
          ┌─────────┐           │                ▼
          │  DONE   │           │           ┌─────────┐
          └─────────┘           │           │  DEAD   │
                                │           └─────────┘
                                ▼
                       ┌─────────────────┐
                       │  READY (retry)  │
                       │  with backoff   │
                       └─────────────────┘
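
The branching in the diagram reduces to a small transition function. A sketch (field names follow the Job object below; this is illustrative, not Workmatic's internal code):

```typescript
type JobStatus = 'ready' | 'running' | 'done' | 'dead';

// Outcome of one processing attempt, per the lifecycle diagram:
// success -> done; failure with retries left -> ready (after backoff);
// failure on the final allowed attempt -> dead.
// `attempts` is the count before this attempt (starts at 0).
function afterAttempt(ok: boolean, attempts: number, maxAttempts: number): JobStatus {
  if (ok) return 'done';
  return attempts + 1 < maxAttempts ? 'ready' : 'dead';
}
```

With the default maxAttempts of 3, a job fails attempts 0 and 1 back to ready and goes dead only after its third failure.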

Options glossary

Option                     Default       Description
queue                      'default'     Queue name for job isolation
concurrency                1             Parallel job count
leaseMs                    30000         Lease duration during processing (ms)
pollMs                     1000          Poll interval when idle (ms)
timeoutMs                  60000         Execution timeout in ms; 0 disables
priority                   0             Lower = processed first
delayMs                    0             Delay before job becomes available (ms)
maxAttempts                3             Max processing attempts
backoff                    2^n * 1000    Retry delay function (returns ms)
pauseCheckIntervalMs       300           Min ms between DB pause checks in pump
requeueExpiredIntervalMs   0             Min ms between lease requeue scans (0 = each pump)
onPumpError                (none)        Optional callback after default pump error log

Dashboard (built-in UI)

The README references dashboard-screenshot.png. Add that image at the repository root if you want a screenshot in the README; this HTML docs page omits it until the asset exists.

Examples

npx tsx examples/basic.ts
npx tsx examples/with-dashboard.ts

Benchmarks

# In-memory (fastest, for testing)
npm run bench

# File-based (realistic, persistent)
npm run bench -- --file

Micro benchmarks

Short suite: 2,000 sequential add() calls plus 1,000 client.stats() calls (queue already has jobs). Same paths as the full runner, less time.

npm run bench:micro
npm run bench:micro -- --file

Benchmark                         In-Memory    File-based
Micro Sequential Insert (2,000)   ~27,000/s    ~9,300/s
Stats Query (×1,000)              ~9,200/s     ~5,500/s

Rounded indicative numbers; real throughput depends on disk, CPU, and DB “warmth” after heavier benchmarks.

Results comparison

Benchmark                         In-Memory    File-based
Sequential Insert                 27,000/s     13,000/s
Parallel Insert                   23,000/s     12,000/s
Process (concurrency=1)           1,100/s      1,100/s
Process (concurrency=4)           4,800/s      4,800/s
Process (concurrency=8)           10,000/s     8,300/s
Process (concurrency=16)          18,000/s     5,700/s
Mixed Insert+Process              7,500/s      3,500/s
Micro Sequential Insert (2,000)   ~27,000/s    ~9,300/s
Stats Query (×1,000)              ~9,200/s     ~5,500/s
Claim + Process Batch             23,600/s     11,800/s

Note: File-based throughput drops at high concurrency due to disk I/O; concurrency=8 is often a sweet spot.

CLI

Manage jobs directly from the database file:

# Show job statistics
npx workmatic stats ./jobs.db

# List queues with pause status
npx workmatic queues ./jobs.db

# Pause/resume a queue (workers stop/start claiming new jobs)
npx workmatic pause ./jobs.db emails
npx workmatic resume ./jobs.db emails

# List jobs (with filters)
npx workmatic list ./jobs.db --status=dead --limit=10

# Export jobs to CSV
npx workmatic export ./jobs.db backup.csv
npx workmatic export ./jobs.db --status=dead > dead.csv

# Import jobs from CSV
npx workmatic import ./jobs.db backup.csv

# Delete jobs by status
npx workmatic purge ./jobs.db --status=done

# Retry dead jobs (reset to ready)
npx workmatic retry ./jobs.db --status=dead

CLI commands

Command                 Description
stats <db>              Job counts by status and queue
queues <db>             List queues + pause status
pause <db> <queue>      Pause a queue
resume <db> <queue>     Resume a queue
list <db>               List jobs (filters optional)
export <db> [file]      CSV export (stdout if no file)
import <db> <file>      Import from CSV
purge <db> --status=X   Delete by status
retry <db> --status=X   Reset jobs to ready

CLI options

Option              Description
--status=<status>   ready / running / done / dead
--queue=<queue>     Filter by queue name
--limit=<n>         Limit results (default 100)

CSV import (AI workflows)

public_id,queue,payload,status,priority,run_at,attempts,max_attempts,lease_until,created_at,updated_at,last_error
job_001,emails,"{""to"":""user@example.com"",""template"":""welcome""}",ready,0,1704067200000,0,3,0,1704067200000,1704067200000,
job_002,emails,"{""to"":""other@example.com"",""template"":""reminder""}",ready,5,1704067200000,0,3,0,1704067200000,1704067200000,

npx workmatic import ./jobs.db jobs.csv
cat ai-generated-jobs.csv | npx workmatic import ./jobs.db /dev/stdin
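
For scripted or AI-driven workflows, rows like the sample above can be generated programmatically. A sketch of the quoting rules (column order and the status/priority defaults are taken from the sample; double quotes inside the JSON payload are escaped by doubling them):

```typescript
// Build one CSV row in the column order of the sample above:
// public_id,queue,payload,status,priority,run_at,attempts,max_attempts,
// lease_until,created_at,updated_at,last_error
function csvRow(id: string, queue: string, payload: unknown, now: number): string {
  // Escape embedded double quotes by doubling them, then quote the field.
  const json = JSON.stringify(payload).replace(/"/g, '""');
  return [id, queue, `"${json}"`, 'ready', 0, now, 0, 3, 0, now, now, ''].join(',');
}

const row = csvRow('job_001', 'emails', { to: 'user@example.com' }, 1704067200000);
// → job_001,emails,"{""to"":""user@example.com""}",ready,0,1704067200000,0,3,0,1704067200000,1704067200000,
```

Write the header line plus one such row per job, then feed the file to `npx workmatic import`.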

Architecture

┌──────────────────────────────────────────────────────────┐
│                        Your App                          │
├────────────────────┬─────────────────────────────────────┤
│      Client        │              Worker                 │
│  ┌─────────────┐   │   ┌─────────────┐  ┌─────────────┐  │
│  │   add()     │   │   │   pump()    │  │   fastq     │  │
│  │   stats()   │   │   │   claim()   │  │   pool      │  │
│  └─────────────┘   │   └─────────────┘  └─────────────┘  │
├────────────────────┴─────────────────────────────────────┤
│                    Kysely (Query Builder)                │
├──────────────────────────────────────────────────────────┤
│                  better-sqlite3 (SQLite)                 │
├──────────────────────────────────────────────────────────┤
│                      jobs.db (File)                      │
└──────────────────────────────────────────────────────────┘

Development & testing

The repository uses Vitest for unit tests.

npm test
npm run test:watch

Build before publishing with npm run build (tsup); generate type declarations with npm run build:types.