Background tasks (pgqueuer)

kiln-generated apps integrate with pgqueuer — a PostgreSQL-backed job queue that uses LISTEN / NOTIFY for low-latency dispatch. The queue lives in the same database as your app, so producer-side enqueue can join the request’s transaction (transactional outbox) and you don’t run a separate broker.

be contributes two helpers in ingot.queue that bridge SQLAlchemy and pgqueuer:

  • ingot.queue.get_queue() — wraps a SQLAlchemy AsyncSession so the request can enqueue jobs inside its own transaction.

  • ingot.queue.open_worker_driver() — opens an asyncpg connection from a DSN, coercing SQLAlchemy’s postgresql+asyncpg:// prefix to plain postgresql:// so the same env var works for both halves.

Everything else (defining tasks, running the worker, tuning entrypoints) is straight pgqueuer — be does not wrap, scaffold, or rename pgqueuer’s API.

Prerequisites

pgqueuer[asyncpg] is already a runtime dep of kiln-generator, pulled in transitively when you install the package. No extra install is needed.

You need PostgreSQL 9.5 or newer (any version with LISTEN / NOTIFY).

Schema setup

pgqueuer ships its own schema: one table for jobs, one for stats, plus an enum, an index, and a notification trigger. The schema is not part of your alembic chain — pgqueuer’s CLI owns it.

Run the install once per environment, before the worker first starts:

pgq install --pg-dsn "$DATABASE_URL"

If you upgrade pgqueuer and the new version ships schema changes, re-run pgq install. Uninstall with pgq uninstall --pg-dsn ....

Note

The DSN passed to pgq install should be a plain postgresql://... URL. If your env var is SQLAlchemy-shaped (postgresql+asyncpg://...), strip the +asyncpg first or point pgq install at a separate plain-DSN env var.

Defining tasks

A pgqueuer task is just an async function taking a pgqueuer.Job. Put them anywhere importable:

# blog/queue/tasks.py
from pgqueuer import Job


async def index_article(job: Job) -> None:
    article_id = job.payload.decode()
    # ... look up the article, push to search index, etc.


async def send_welcome(job: Job) -> None:
    user_id = job.payload.decode()
    # ... send the email

job.payload is bytes | None. JSON-encode if you need structured data: json.dumps({"id": ...}).encode().

Worker factory

The factory creates a pgqueuer.PgQueuer, registers each entrypoint, and returns it. Use ingot.queue.open_worker_driver() to open the connection — it strips the +asyncpg prefix so the same DATABASE_URL env var works for both the request path and the worker.

# blog/queue/main.py
import os
from datetime import timedelta

from pgqueuer import PgQueuer

from ingot.queue import open_worker_driver

from blog.queue.tasks import index_article, send_welcome


async def main() -> PgQueuer:
    """Worker factory invoked by ``pgq run blog.queue.main:main``."""
    async with open_worker_driver(os.environ["DATABASE_URL"]) as driver:
        pgq = PgQueuer(driver)

        pgq.entrypoint("index_article", concurrency_limit=4)(
            index_article
        )
        pgq.entrypoint(
            "send_welcome",
            concurrency_limit=2,
            requests_per_second=10.0,
            retry_timer=timedelta(seconds=30),
        )(send_welcome)

        return pgq

You can also use @pgq.entrypoint(...) as a decorator inline:

async def main() -> PgQueuer:
    async with open_worker_driver(os.environ["DATABASE_URL"]) as driver:
        pgq = PgQueuer(driver)

        @pgq.entrypoint("index_article", concurrency_limit=4)
        async def index_article(job): ...

        return pgq

Available entrypoint kwargs (from pgqueuer):

  • concurrency_limit: int — max parallel jobs (default 0 = unlimited).

  • requests_per_second: float — token-bucket rate limit (default inf).

  • retry_timer: timedelta — re-queue jobs stuck in-progress longer than this (default timedelta(0) = no retry).

  • serialized_dispatch: bool — force sequential per dedupe_key (default False).

Running the worker

pgqueuer ships its own CLI; nothing kiln-specific:

pgq run blog.queue.main:main

Long-running. Run it as a service / pod / supervised process.

To split work across pod classes — e.g. a small fast pool and a larger slow pool — write multiple factories and point each deployment at the right one:

# pod class A
pgq run blog.queue.fast:main

# pod class B
pgq run blog.queue.slow:main

Both pods share one binary; only the entrypoints registered in the factory each pod uses are pulled.

Enqueueing from a request

This is where be’s get_queue() shines. It returns a pgqueuer.Queries bound to the asyncpg connection underlying your SQLAlchemy session — so enqueue runs in the same transaction as your other writes. If the request commits, the job is durable; if it rolls back, the job never existed.

In an action body (kiln-generated routes pass session automatically):

# blog/actions.py
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession

from ingot.queue import get_queue


async def publish(article_id: UUID, session: AsyncSession) -> None:
    # ... whatever business logic updates the article row ...
    article = await session.get(Article, article_id)
    article.status = "published"

    queue = await get_queue(session)
    await queue.enqueue(
        ["index_article"],
        [str(article_id).encode()],
    )
    # The session commit (handled by be) makes both the row
    # update AND the job insert durable atomically.  Roll back
    # and neither one happened.

pgqueuer.Queries.enqueue accepts:

await queue.enqueue(
    entrypoint,         # str or list[str] — task name(s)
    payload,            # bytes | None or list[bytes | None]
    priority=0,         # int or list[int] — higher runs first
    execute_after=None, # timedelta — delay job start
    dedupe_key=None,    # str — drop duplicates within window
    headers=None,       # dict[str, str] — arbitrary metadata
)

Pass lists to enqueue many jobs at once in a single round-trip.

Testing

Tasks are plain async functions; test them directly with a fake pgqueuer.Job:

from datetime import datetime, UTC
from uuid import uuid4

import pytest
from pgqueuer import Job

from blog.queue.tasks import index_article


@pytest.mark.asyncio
async def test_index_article():
    article_id = uuid4()
    fake_job = Job(
        id=1,
        priority=0,
        created=datetime.now(UTC),
        updated=datetime.now(UTC),
        status="picked",
        entrypoint="index_article",
        payload=str(article_id).encode(),
        queue_manager_id=uuid4(),
    )
    await index_article(fake_job)
    # ... assertions

For end-to-end tests of enqueue-from-request, use a real Postgres (testcontainers / docker-compose). get_queue needs a live asyncpg connection underlying the session.

Common pitfalls

SQLAlchemy URL prefix

Your DATABASE_URL env var probably starts with postgresql+asyncpg:// because that’s what SQLAlchemy wants. open_worker_driver() strips the +asyncpg automatically. If you bypass it and call asyncpg.connect yourself, strip the prefix first or asyncpg will reject the DSN.

Worker connection ≠ request pool

The worker needs its own long-lived connection so pgqueuer can LISTEN on it. Don’t try to share the SQLAlchemy pool — you will lose notifications. open_worker_driver() opens a fresh connection for exactly this reason.

Session not yet checked out

get_queue() walks session.connection() get_raw_connection() driver_connection. If the session has never been used, the underlying driver connection may be None and get_queue raises RuntimeError. Doing any read first (or await session.connection()) before enqueue is enough.

payload is bytes

Not dict, not str. JSON-encode if you need structured data: payload=json.dumps({"id": ...}).encode().

Forgot pgq install

Worker will start and then immediately error trying to query a missing table. Run the install once per fresh database (and again after upgrading pgqueuer).

Schema migrations

be’s alembic chain doesn’t manage pgqueuer’s tables. Don’t try to put them in there — let pgqueuer’s CLI own the schema. This is the pattern recommended by pgqueuer itself.