Job queue (battery/queue)

battery/queue is a pluggable job queue with three backends — in-memory,
SQL (SQLite + Postgres), and Redis. It handles enqueue, dequeue, retry
with optional exponential backoff, dead-letter capture, inspection, and
replay, and pairs with a Scheduler for recurring jobs.

Backends at a glance

Feature                       MemoryQueue                DBQueue                    RedisQueue
Durable across restart        No                         Yes                        Yes
Multiple workers              Yes                        Yes                        Manual
Worker loop built in          Yes                        Yes                        No (bring your own, or use Start)
Auto-reclaim crashed workers  -                          Yes (lease expiry in SQL)  Yes (visibility timeout + Start)
Dead-letter capture           Yes (bounded, 1 000 jobs)  Yes (status failed)        Yes (Redis list)
Browsable                     Yes                        Yes                        Yes (dead-letter only)
Replayable                    Yes                        Yes                        Yes
Scheduler integration         Yes                        Yes                        Yes

Pick MemoryQueue for tests and single-process prototypes. Use DBQueue
when you need durability and multi-replica safety (Postgres FOR UPDATE
SKIP LOCKED). Use RedisQueue when you already run Redis and want the
visibility-timeout model.

Quickstart — MemoryQueue

    import "github.com/DonaldMurillo/gofastr/battery/queue"

    q := queue.NewMemoryQueue(4) // 4 worker goroutines

    q.RegisterHandler("send-email", func(ctx context.Context, job queue.Job) error {
        return sendEmail(ctx, job.Payload)
    })

    q.Start()
    defer q.Close()

    _ = q.Enqueue(ctx, queue.Job{
        Type:    "send-email",
        Payload: json.RawMessage(`{"to":"user@example.com"}`),
    })

Quickstart — DBQueue

    db, _ := sql.Open("postgres", dsn)

    q, err := queue.NewDBQueue(db,
        queue.WithWorkers(4),
        queue.WithLeaseTimeout(2*time.Minute),
        queue.WithBackoff(5*time.Second, 5*time.Minute),
    )
    if err != nil {
        log.Fatal(err)
    }

    q.RegisterHandler("process-upload", func(ctx context.Context, job queue.Job) error {
        return processUpload(ctx, job.Payload)
    })

    q.Start(ctx)
    defer q.Close()

NewDBQueue creates the queue_jobs table and its index if they do not
exist. Pass WithTable("my_jobs") to use a custom table name.

Quickstart — RedisQueue

    // client implements queue.RedisClient — wrap go-redis, redigo, etc.
    q := queue.NewRedisQueue(client, "myapp:jobs")
    q.SetVisibilityTimeout(30 * time.Second)

    // Launch the auto-reclaim ticker (re-delivers crashed-worker jobs).
    q.Start(ctx, 30*time.Second)

    // Enqueue
    _ = q.Enqueue(ctx, queue.Job{Type: "notify", Payload: payload})

    // Dequeue + process manually (no built-in worker pool for Redis)
    for {
        job, err := q.Dequeue(ctx)
        if errors.Is(err, queue.ErrNoJob) {
            time.Sleep(time.Second)
            continue
        }
        if err != nil {
            // Any other error (for example a connection failure): log and retry
            // rather than handing an empty job to the handler.
            log.Printf("dequeue: %v", err)
            time.Sleep(time.Second)
            continue
        }
        if err := handle(ctx, job); err != nil {
            _ = q.Nack(ctx, job.ID)
        } else {
            _ = q.Ack(ctx, job.ID)
        }
    }

RedisQueue does not include a built-in worker loop — you drive
Dequeue/Ack/Nack yourself, or integrate with a third-party pool. Call
Start to enable the auto-reclaim ticker (see "Crash safety" below).

Job struct

    type Job struct {
        ID          string          // auto-filled by Enqueue if empty
        Type        string          // required — selects the handler
        Payload     json.RawMessage // arbitrary JSON for the handler
        Priority    int             // higher = dequeued first (DBQueue only)
        Attempts    int             // incremented on each claim
        MaxAttempts int             // auto-defaults to 3; 0 means 3
        CreatedAt   time.Time       // auto-filled if zero
        ScheduledAt time.Time       // auto-filled to now; set to delay execution
    }

Scheduled jobs (future ScheduledAt) are invisible to Dequeue until
the moment passes. This lets you implement delayed processing without a
separate scheduler.

Retry and backoff

By default, a Nack with attempts remaining makes the job immediately
eligible again (next Dequeue can pick it up).

WithBackoff(base, max) turns on exponential backoff for DBQueue:

    queue.WithBackoff(5*time.Second, 5*time.Minute)

The n-th retry delay is base × 2^(n-1), capped at max. A job that
Nacks on attempt 1 waits ~5s; attempt 2 waits ~10s; attempt 3 waits
~20s; etc., up to 5m.
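
The formula above can be worked through in a few lines of Go. This is a sketch of the arithmetic only: backoffDelay is a hypothetical helper written for this example, not a function exported by battery/queue, and the "~" in the figures above suggests the real delays may differ slightly from these exact values.

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay returns base * 2^(n-1) for the n-th retry (n starts at 1),
// capped at max. Hypothetical helper, not part of battery/queue.
func backoffDelay(base, max time.Duration, n int) time.Duration {
	d := base
	for i := 1; i < n; i++ {
		// Stop before doubling would exceed max; this also avoids overflow.
		if d > max/2 {
			return max
		}
		d *= 2
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	// Same settings as queue.WithBackoff(5*time.Second, 5*time.Minute).
	for n := 1; n <= 8; n++ {
		fmt.Printf("retry %d waits %v\n", n, backoffDelay(5*time.Second, 5*time.Minute, n))
	}
}
```

With a 5s base and a 5m cap, the delay doubles through 5s, 10s, 20s, 40s, 80s, and 160s, and every later retry waits the full 5m.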

Once Attempts >= MaxAttempts, the job moves to the dead-letter state
instead of being retried.

Dead-letter and replay

When a job exhausts MaxAttempts, it is retained as a terminally-failed
job (never silently dropped):

  • MemoryQueue: stored in a bounded in-memory slice (cap 1 000; oldest
    evicted on overflow).
  • DBQueue: row status set to 'failed'.
  • RedisQueue: appended to the <queue>:dead Redis list.
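
To make the MemoryQueue behaviour in the list above concrete, here is a minimal sketch of a bounded store that evicts its oldest entry on overflow. The deadLetters type is an assumption made for illustration; the actual MemoryQueue internals are not shown in this document and may differ.

```go
package main

import "fmt"

// deadLetters is a hypothetical bounded store of failed job IDs, oldest first.
type deadLetters struct {
	limit int
	ids   []string
}

// add appends id, evicting the oldest entry once the store is full.
func (d *deadLetters) add(id string) {
	if d.limit <= 0 {
		return
	}
	if len(d.ids) >= d.limit {
		// Shift left by one to drop the oldest entry, reusing the backing array.
		copy(d.ids, d.ids[1:])
		d.ids = d.ids[:len(d.ids)-1]
	}
	d.ids = append(d.ids, id)
}

func main() {
	// The document states a cap of 1 000; a cap of 3 keeps the demo short.
	d := &deadLetters{limit: 3}
	for i := 1; i <= 5; i++ {
		d.add(fmt.Sprintf("job-%d", i))
	}
	fmt.Println(d.ids)
}
```

The practical consequence is that a MemoryQueue under sustained failure keeps only the most recent failed jobs, so older ones can no longer be inspected or replayed.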

Replay a failed job (reset attempts to 0 and re-enqueue):

    // Type-assert the Replayable capability (all three backends implement it).
    if r, ok := q.(queue.Replayable); ok {
        if err := r.Replay(ctx, jobID); err != nil {
            log.Printf("replay failed: %v", err)
        }
    }

Replay is idempotent: replaying an unknown ID or a non-failed job is a
no-op (returns nil, no side effect).
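
The replay semantics described above can be modelled in a few lines. This is a sketch under stated assumptions: the record and store types are invented for the example and say nothing about how any backend stores jobs.

```go
package main

import "fmt"

// record is a hypothetical view of a stored job: its status and attempt count.
type record struct {
	Status   string // "pending" or "failed"
	Attempts int
}

// store maps job ID to record.
type store map[string]*record

// replay resets a failed job to pending with zero attempts. Unknown IDs and
// non-failed jobs are left untouched, and the call still returns nil.
func (s store) replay(id string) error {
	r, ok := s[id]
	if !ok || r.Status != "failed" {
		return nil
	}
	r.Status = "pending"
	r.Attempts = 0
	return nil
}

func main() {
	s := store{
		"a": {Status: "failed", Attempts: 3},
		"b": {Status: "pending", Attempts: 1},
	}
	// Replaying "a" twice, a pending job, and an unknown ID are all safe.
	for _, id := range []string{"a", "a", "b", "missing"} {
		_ = s.replay(id)
	}
	fmt.Println(s["a"].Status, s["a"].Attempts, s["b"].Attempts)
}
```

Because a second replay of the same ID finds the job already pending, retrying a replay request after a timeout cannot enqueue the job twice in this model.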

Inspecting jobs (Browsable)

All three backends implement Browsable:

    if b, ok := q.(queue.Browsable); ok {
        jobs, _ := b.ListJobs(ctx, "failed", 50)
        stats, _ := b.Stats(ctx)
        fmt.Println("listed:", len(jobs))
        fmt.Println("failed:", stats["failed"])
    }

ListJobs accepts a status string ("pending", "failed", "" for
all) and a limit. Jobs are returned newest-first. Stats returns a
JobStats map (status → count).

MemoryQueue and RedisQueue can only enumerate their dead-letter store,
so only "failed" (or "") returns results. DBQueue can enumerate any
status.

Crash safety and auto-reclaim

DBQueue reclaims stale-claimed jobs automatically inside Dequeue:
a row in claimed status whose claimed_at has passed the configured
lease timeout (default 5 min) becomes eligible again. No extra
configuration needed.

RedisQueue uses a visibility timeout: while a job is in-flight it
sits in a processing hash with an expiry timestamp. Call
RedisQueue.Start(ctx, interval) to run an auto-reclaim ticker:

    q.Start(ctx, 30*time.Second) // checks every 30 s; 0 defaults to 30 s

The ticker calls q.Reclaim(ctx) on each tick, which scans the
processing hash and re-enqueues any job whose expiresAt has passed.
Without Start, crashed-worker jobs strand silently until you call
Reclaim manually.

You can also call Reclaim directly from your own ticker:

    n, err := q.Reclaim(ctx)
    if err != nil {
        log.Printf("reclaim failed: %v", err)
    }
    fmt.Printf("reclaimed %d jobs\n", n)
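
For readers unfamiliar with the visibility-timeout model, the reclaim pass described in this section can be sketched with an in-memory stand-in. The model type and its methods are hypothetical; the real RedisQueue keeps this state in Redis, and its storage layout is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// model is a hypothetical in-memory stand-in for the Redis-side state.
type model struct {
	pending    []string             // job IDs ready for Dequeue
	processing map[string]time.Time // job ID -> expiresAt
}

// claim marks a job as in-flight with a visibility deadline.
func (m *model) claim(id string, now time.Time, timeout time.Duration) {
	m.processing[id] = now.Add(timeout)
}

// reclaim re-enqueues every in-flight job whose deadline has passed and
// returns how many jobs it moved.
func (m *model) reclaim(now time.Time) int {
	n := 0
	for id, expiresAt := range m.processing {
		if now.After(expiresAt) {
			delete(m.processing, id) // deleting during range is safe in Go
			m.pending = append(m.pending, id)
			n++
		}
	}
	return n
}

// demo claims two jobs 20 s apart and runs one reclaim pass at t = 31 s.
func demo() (reclaimed int, pending []string, inFlight int) {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	m := &model{processing: map[string]time.Time{}}
	m.claim("crashed", start, 30*time.Second)
	m.claim("still-running", start.Add(20*time.Second), 30*time.Second)
	reclaimed = m.reclaim(start.Add(31 * time.Second))
	return reclaimed, m.pending, len(m.processing)
}

func main() {
	fmt.Println(demo())
}
```

In this model a slow but healthy handler is also re-delivered if it outlives the timeout, so the visibility timeout should comfortably exceed the longest expected handler run.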

Scheduler

Scheduler enqueues recurring jobs onto one or more queue backends:

    sched := queue.NewScheduler(q) // or NewSchedulerWithLogger(q, logger)

    // Fixed interval — fires every 5 minutes.
    sched.Every(5 * time.Minute).
        Job("send-digest", json.RawMessage(`{}`)).
        Register()

    // Cron expression — fires every day at 02:00.
    if err := sched.Cron("0 2 * * *").
        Job("nightly-rollup", nil).
        Register(); err != nil {
        log.Fatalf("bad cron spec: %v", err)
    }

    go sched.Start(ctx) // blocks until ctx is cancelled

Every(d) schedules fire on a fixed interval; Cron(spec) schedules
fire when the cron expression's next time arrives — use it for
time-of-day work like "every day at 02:00" that an interval cannot
express. The spec is parsed by framework/cron (cron.Parse),
so the queue does not carry a second cron parser; it accepts the same
5-field syntax and @shortcuts (e.g. @daily). The two kinds coexist
in one scheduler.

Register() returns an error only when a Cron spec is invalid —
Every schedules never error, so existing callers that ignore the
return value are unaffected. RegisterAt(base) is the deterministic
variant: it anchors the first run to base instead of time.Now(),
which is handy for tests and replayed fixtures.

When the scheduler runs, the wake interval is the smallest of the
interval schedules and one minute (cron resolution); a cron-only
scheduler wakes once per minute.
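
Read literally, the wake-interval rule above is a minimum over the registered intervals and one minute. The sketch below encodes that reading; wakeInterval is a hypothetical helper, and the scheduler's real calculation may treat edge cases (such as an interval-only scheduler) differently.

```go
package main

import (
	"fmt"
	"time"
)

// wakeInterval returns the smallest positive interval, never more than one
// minute (the cron resolution). Hypothetical helper, not the scheduler's code.
func wakeInterval(intervals ...time.Duration) time.Duration {
	wake := time.Minute
	for _, d := range intervals {
		if d > 0 && d < wake {
			wake = d
		}
	}
	return wake
}

func main() {
	fmt.Println(wakeInterval())                              // cron-only scheduler
	fmt.Println(wakeInterval(5 * time.Minute))               // interval longer than a minute
	fmt.Println(wakeInterval(10*time.Second, 5*time.Minute)) // a short interval wins
}
```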

Multiple queues can be passed to NewScheduler — the job is enqueued
onto all of them. Enqueue errors are logged via slog.Default().
NewSchedulerWithLogger lets you supply a custom *slog.Logger.

Scheduler fires in-process, not via a distributed lock. On multiple
replicas, either run the scheduler on one instance only or gate the
handler behind a lock so the actual work is done once.

Handler registration

Handlers are registered by job type. Unregistered types are acknowledged
(dropped) so they never loop. Handlers are safe to register concurrently
with a running worker loop.

    q.RegisterHandler("resize-image", func(ctx context.Context, job queue.Job) error {
        // Return a non-nil error to Nack (retry or dead-letter).
        return resizeImage(ctx, job.Payload)
    })

A handler panic is recovered and treated as an error — the job follows the
normal retry path and the worker goroutine is respawned, so a poison
message cannot drain the worker pool.
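
The usual Go technique for turning a handler panic into an ordinary error is a deferred recover that assigns to a named return value. The sketch below shows that technique in isolation; safeCall and the handler type are assumptions for illustration, and the library's worker loop (including how it respawns goroutines) is not reproduced here.

```go
package main

import (
	"context"
	"fmt"
)

// handler mirrors the shape of a job handler, simplified to a raw payload.
type handler func(ctx context.Context, payload []byte) error

// safeCall runs h and converts a panic into an error, so the caller can
// treat it like any other failed attempt.
func safeCall(ctx context.Context, h handler, payload []byte) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("handler panic: %v", r)
		}
	}()
	return h(ctx, payload)
}

// demo runs one panicking handler and one well-behaved handler.
func demo() (panicErr, okErr error) {
	ctx := context.Background()
	panicErr = safeCall(ctx, func(context.Context, []byte) error {
		panic("poison message")
	}, nil)
	okErr = safeCall(ctx, func(context.Context, []byte) error {
		return nil
	}, nil)
	return panicErr, okErr
}

func main() {
	panicErr, okErr := demo()
	fmt.Println(panicErr)
	fmt.Println(okErr)
}
```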

RedisClient interface

RedisQueue accepts any client that implements queue.RedisClient:

    type RedisClient interface {
        LPush(ctx context.Context, key string, values ...interface{}) error
        RPop(ctx context.Context, key string) (string, error)
        HSet(ctx context.Context, key string, values ...interface{}) error
        HGet(ctx context.Context, key, field string) (string, error)
        HGetAll(ctx context.Context, key string) (map[string]string, error)
        HDel(ctx context.Context, key string, fields ...string) error
        Del(ctx context.Context, keys ...string) error
        LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
        LRem(ctx context.Context, key string, count int64, value interface{}) (int64, error)
    }

Wrap your preferred driver (go-redis, redigo, etc.) with a thin adapter
that maps to this interface.

Sentinel errors

    queue.ErrNoJob       // Dequeue: nothing ready right now
    queue.ErrQueueClosed // Enqueue: queue was already closed

Common mistakes

  • Not calling q.Start(ctx, interval) on RedisQueue. Without it,
    crashed-worker jobs strand in the processing hash indefinitely.
  • Closing MemoryQueue before workers drain. Close waits for
    in-flight handlers to finish — call it after all producers are done.
  • Replaying a job that is still pending. Replay only touches
    terminal (failed) entries — replaying a pending job is a no-op.
  • Running the Scheduler on every replica. Multiple replicas fire
    the same tick. Either pin the scheduler to one instance or use a DB
    advisory lock to ensure the enqueued work is done once.
  • Ignoring Nack errors. A Nack failure means the job stays in
    the processing hash (Redis) or claimed state (DB) and will be
    auto-reclaimed later — but log the error so you can spot connection
    issues early.