Communication platform¶
kiln-generated apps can ship a typed, durable communication platform: declare your communication types (with a Pydantic context schema and templates), point at your transports (SMTP, Twilio, FCM, …), and the generator wires up a registry, a producer wrapper, and a pgqueuer-backed worker dispatch handler. The platform reuses the same transactional-outbox bridge described in Background tasks (pgqueuer) – a send is durable iff the request transaction commits.
The platform is fully opt-in. Without project.comms set,
the generator emits zero references to it; the runtime cost is
exactly zero.
No HTTP routes are emitted. Triggering a send is your concern – typically inside an action handler, a webhook receiver, or a scheduled job. See Sending.
What you get¶
The generator produces _generated/comms.py exposing three
symbols:
registryA populated
ingot.comms.CommRegistry– oneCommTypeper declared entry.send_comm(...)Thin wrapper over
ingot.comms.send_communication()with your message / recipient ORM classes and the registry pre-bound. Call it from any request handler.dispatchA pre-built worker handler from
ingot.comms.make_dispatch_entrypoint(). Register it againstingot.comms.DISPATCH_ENTRYPOINTon your pgqueuer instance; the handler routes each job to the right transport and stamps success / failure on the recipient row.
The runtime helpers (mixins, transport / preference / renderer
protocols, the producer + dispatch builders) live in
ingot.comms.
Enabling¶
Add a comms block to your project config using the
be/comms/comms.libsonnet helper:
local comms = import 'be/comms/comms.libsonnet';
local db = import 'be/db/databases.libsonnet';
{
databases: [db.postgres('primary', { default: true })],
comms: comms.platform({
message_model: 'myapp.models.CommMessage',
recipient_model: 'myapp.models.CommRecipient',
transports: {
email: 'myapp.comms.transports.email_transport',
sms: 'myapp.comms.transports.sms_transport',
},
preferences: 'myapp.comms.prefs.resolver', // optional
types: [
comms.type({
name: 'order_shipped',
context_schema: 'myapp.comms.contexts.OrderShipped',
subject_template: 'Order {{ order_id }} shipped',
body_template: comms.path('templates/order_shipped.html'),
default_methods: ['email', 'sms'],
}),
],
}),
apps: [...],
}
The full schema lives in be.config.schema.CommsConfig and
be.config.schema.CommTypeConfig.
Models¶
You own the three tables; ingot owns the columns. Same idiom as
ingot.files.FileMixin and
ingot.rate_limit.RateLimitBucketMixin.
# myapp/models.py
import uuid
from sqlalchemy import ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from ingot.comms import (
MessageMixin,
NotificationPreferenceMixin,
RecipientMixin,
)
class Base(DeclarativeBase):
pass
class CommMessage(Base, MessageMixin):
__tablename__ = 'comm_messages'
class CommRecipient(Base, RecipientMixin):
__tablename__ = 'comm_recipients'
# The mixin leaves message_id FK-free so it works against
# any message table name; bolt on the FK in your subclass.
message_id: Mapped[uuid.UUID] = mapped_column(
ForeignKey('comm_messages.id', ondelete='CASCADE'),
nullable=False,
index=True,
)
class NotificationPreference(Base, NotificationPreferenceMixin):
__tablename__ = 'comm_preferences'
Migrations are yours – be doesn’t generate alembic. Run
alembic revision --autogenerate against your Base.metadata;
all three tables come out as standard CREATE TABLE.
What’s stored:
MessageMixin– the intent.comm_type, the JSON-dumped context, the renderedsubject/body,created_at. Storing the rendered output (not just the context) means template churn doesn’t invalidate the audit log.RecipientMixin– one row per(message, method, address). Carriesstatus,sent_at,error. The pgqueuer job payload is the recipient’s id.NotificationPreferenceMixin–(subject_key, comm_type, method) -> enabled. Looked up by yourPreferenceResolverimplementation; the mixin only supplies the columns.
Comm types¶
A comm type binds a name (the registry key) to:
A Pydantic context schema – validated before render.
A subject template and a body template.
A list of default methods (
email,sms, …) – a documentation hint today; the platform always uses caller-supplied recipients.
# myapp/comms/contexts.py
from pydantic import BaseModel
class OrderShipped(BaseModel):
order_id: str
customer_name: str
class PasswordReset(BaseModel):
reset_link: str
The context_schema field in the jsonnet config is a dotted
import path to one of these classes. The generator wires the import
into _generated/comms.py; you don’t import them yourself.
Templates: inline or file¶
subject_template and body_template accept either an inline
string or a file reference:
// Inline (most common -- short subjects, SMS bodies).
subject_template: 'Order {{ order_id }} shipped',
// File-backed (HTML email bodies, MJML, anything multi-line).
body_template: comms.path('templates/order_shipped.html'),
Resolution semantics for comms.path(...):
Path is interpreted relative to the directory in which
codegen generateis invoked (or absolute).The file is read once at build time.
Its contents are inlined into
_generated/comms.pyas a string literal.
The generated tree carries no runtime file dependency. Re-running
codegen generate is the only way template-file edits propagate.
A missing file raises FileNotFoundError with the comm-type
name and the offending field (subject vs body) in the
message.
Transports¶
A transport is a small adapter implementing the
Transport protocol – one async send
method that takes the message + recipient rows and either delivers
or raises.
# myapp/comms/transports.py
import smtplib
from email.message import EmailMessage
from ingot.comms import MessageMixin, RecipientMixin
class SmtpTransport:
def __init__(self, host: str, sender: str) -> None:
self._host = host
self._sender = sender
async def send(
self,
*,
message: MessageMixin,
recipient: RecipientMixin,
) -> None:
msg = EmailMessage()
msg['From'] = self._sender
msg['To'] = recipient.address
msg['Subject'] = message.subject or ''
msg.set_content(message.body)
with smtplib.SMTP(self._host) as smtp:
smtp.send_message(msg)
# Module-level instance -- the dotted path in project.jsonnet
# points here.
email_transport = SmtpTransport(
host='smtp.internal',
sender='[email protected]',
)
The dotted paths in the transports map of your config resolve
to instances, not classes – you construct them once at
import time.
For tests and local development, ingot ships
LoggingTransport – it appends every send
into an in-memory list you can assert against.
Renderer¶
By default the platform uses the in-process
JinjaRenderer (Jinja2 against the validated
context dump). Swap it out by setting the optional renderer
field in your config:
comms: comms.platform({
...,
renderer: 'myapp.comms.renderers.node_renderer',
})
The dotted path resolves to an instance implementing the
single-method Renderer protocol:
from ingot.comms import CommType, RenderedMessage
from pydantic import BaseModel
class NodeRenderer:
"""HTTP-call into a separate Node template service."""
async def render(
self,
comm_type: CommType,
context: BaseModel,
) -> RenderedMessage:
# POST context to your renderer service, return its result.
...
The same shape fits any out-of-process renderer (MJML compiler, React-email service, …). Producer-side code never changes; only the configured instance.
Preferences¶
Per-recipient opt-in is implemented behind the
PreferenceResolver protocol. The platform
calls the resolver once per recipient that has a non-None
subject_key; recipients with subject_key=None skip the
check entirely (typical for non-user addresses like a billing
inbox).
A minimal database-backed resolver:
# myapp/comms/prefs.py
from sqlalchemy import select
from myapp.db import SessionLocal
from myapp.models import NotificationPreference
class DbPreferenceResolver:
async def is_enabled(
self,
*,
subject_key: str,
comm_type: str,
method: str,
) -> bool:
async with SessionLocal() as session:
row = (
await session.execute(
select(NotificationPreference.enabled).where(
NotificationPreference.subject_key == subject_key,
NotificationPreference.comm_type == comm_type,
NotificationPreference.method == method,
)
)
).scalar_one_or_none()
# Absent row defaults to opt-in.
return True if row is None else bool(row)
resolver = DbPreferenceResolver()
An opted-out recipient yields no RecipientMixin row and
no pgqueuer job. The MessageMixin row
still records the attempt so the audit trail is honest.
be_root scaffold for the preference layer¶
If you bootstrapped with be_root and set
notification_preferences: true (requires comms: true), the
generator emits all of this for you:
The stub
PreferenceResolverincomms.pyis replaced by a realDbPreferenceResolverthat queries{module}.models.NotificationPreference. Default policy: absent row reads as opt-in – flip the fallback inis_enabledif your project defaults to opt-out.The per-app
config/{module}.jsonnetgains a full-CRUD resource at/notification-preferencesso users can manage their own preferences over HTTP. The resource is auth-required but otherwise open; add acan_listguard scoped tosession.subto restrict each session to its own rows.
You still own the NotificationPreference SQLAlchemy class
itself – subclass NotificationPreferenceMixin
on your project’s Base and migrate the table. The bootstrap
points the resolver and the resource at
{module}.models.NotificationPreference; place the class there
or update both dotted paths to match.
Sending¶
send_comm runs inside the request handler’s transaction. The
single await does five things:
Validate
contextagainst the comm type’scontext_schema.Render the templates with the configured renderer.
Insert the
MessageMixinrow.For each recipient, consult the
PreferenceResolver(if configured) and insert aRecipientMixinrow for each one that passes.Enqueue one pgqueuer job per surviving recipient under
ingot.comms.DISPATCH_ENTRYPOINT, payload = recipient id.
Everything rides the request session’s transaction. When the request commits, the message row, recipient rows, and pgqueuer jobs all become durable atomically. Roll back and the comm never happened.
from sqlalchemy.ext.asyncio import AsyncSession
from ingot.comms import RecipientSpec
from ingot.queue import get_queue
from myapp.comms.contexts import OrderShipped
from _generated.comms import send_comm
async def notify_shipped(order, *, db: AsyncSession) -> None:
queue = await get_queue(db) # rides the same transaction
await send_comm(
session=db,
queue=queue,
comm_type='order_shipped',
context=OrderShipped(
order_id=order.id,
customer_name=order.customer.name,
),
recipients=[
RecipientSpec(
method='email',
address=order.customer.email,
subject_key=str(order.customer.id), # opens prefs
),
RecipientSpec(
method='sms',
address=order.customer.phone,
subject_key=str(order.customer.id),
),
],
)
The context argument also accepts a plain dict – the registry
validates it against the declared Pydantic schema before rendering.
Worker¶
Register the generated dispatch handler against
ingot.comms.DISPATCH_ENTRYPOINT in your pgqueuer worker
factory. Any other entrypoints you have go alongside it – the
comms handler is just one more entry in the same factory.
# myapp/worker.py
import os
from pgqueuer import PgQueuer
from ingot.comms import DISPATCH_ENTRYPOINT
from ingot.queue import open_worker_driver
from _generated.comms import dispatch
async def main() -> PgQueuer:
async with open_worker_driver(os.environ['DATABASE_URL']) as driver:
pgq = PgQueuer(driver)
pgq.entrypoint(DISPATCH_ENTRYPOINT)(dispatch)
return pgq
Run with pgq run myapp.worker:main (see Background tasks (pgqueuer) for the
full worker story including pgq install, retry, concurrency).
The dispatch handler:
Decodes the recipient id from the job payload.
Loads the recipient + message rows.
Skips if the recipient is missing or already past
PENDING(re-fire idempotency).Looks up the configured transport for the recipient’s method; marks the row failed if no transport is registered for that method.
Calls
transport.send. Stampsstatus='sent'+sent_aton success, orstatus='failed'+ truncatederroron raise.
The audit trail lives on the row. No ad-hoc logging is required.
Lifecycle in one diagram¶
request handler
-> send_comm(session, queue, ...)
-> validate(context)
-> render(subject, body)
-> INSERT comm_messages
-> for recipient in recipients:
-> [preference check]
-> INSERT comm_recipients
-> queue.enqueue(DISPATCH_ENTRYPOINT, recipient.id)
-> session.commit() ← atomic; nothing has been *sent* yet
pgqueuer notifies the worker
-> dispatch(job)
-> load recipient + message
-> transport.send(...)
-> UPDATE comm_recipients SET status, sent_at | error
Pitfalls¶
payload is bytes¶
The platform handles encoding internally
(str(uuid).encode("utf-8")). You don’t construct payloads
yourself; you pass RecipientSpec instances.
Method names match transport keys¶
A RecipientSpec with method='fax' requires
a transport keyed 'fax' in your config. Mismatches are caught
at dispatch time, not config time – the recipient row gets stamped
status='failed' with error='no transport for method ...'.
This is intentional: a comm-type’s
default_methods is just
a hint, and you may want to add new methods independently of the
comm-type registry.
Re-rendering¶
The MessageMixin row stores the rendered
subject and body. If you change a template, the audit log
continues to reflect what the recipient actually saw. To re-send
with the new template, send a new comm.
Schema migrations¶
The mixins supply columns; you own the table. Run alembic against
your Base.metadata as usual. The columns themselves don’t
churn between kiln releases unless explicitly noted in the
changelog – the platform’s contract is the columns, not the
table.
Forgot pgq install¶
The dispatch worker is just another pgqueuer worker. pgq
install is required once per environment for the queue to
work at all – see Background tasks (pgqueuer).