Deployment shapes

Soniq is one class for both producers and consumers. Producer-vs-consumer is a deployment convention, not a class hierarchy or a constructor flag. The same Soniq class serves both roles; what differs is which modules the deployment imports and which entry points it runs.

Producer service

A producer service connects to the queue, enqueues jobs by name, and never starts a worker. It does not need to import the handler modules: the canonical task name is the contract.

from soniq import Soniq

app = Soniq(database_url="postgresql://...")

# Inside an async function: enqueue by canonical name, no handler import needed.
await app.enqueue("billing.send", args={"order_id": 7})

If a producer accidentally calls run_worker(...), it polls the queue with zero registered handlers and dead-letters jobs with a "Job not registered" error. That's a fast, actionable failure: the misconfiguration shows up on the worker's first poll, not at 3am.
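To make that failure mode concrete, here is a minimal in-memory sketch of name-based dispatch. ToyQueue and its methods are hypothetical stand-ins invented for illustration, not Soniq's API or implementation. The sketch only assumes that a worker resolves handlers by task name and dead-letters any job whose name it cannot resolve.

```python
class ToyQueue:
    """Hypothetical in-memory queue; illustrates name-based dispatch only."""

    def __init__(self):
        self.handlers = {}      # canonical task name -> handler function
        self.pending = []       # (name, args) pairs waiting to run
        self.dead_letters = []  # (name, args, reason) for jobs that could not run

    def job(self, name):
        """Register a handler under a canonical task name."""
        def register(fn):
            self.handlers[name] = fn
            return fn
        return register

    def enqueue(self, name, args):
        # Only the name and the args are stored; no handler is needed to enqueue.
        self.pending.append((name, args))

    def run_once(self):
        """Drain the pending jobs, dead-lettering any with an unknown name."""
        while self.pending:
            name, args = self.pending.pop(0)
            handler = self.handlers.get(name)
            if handler is None:
                self.dead_letters.append((name, args, "Job not registered"))
                continue
            handler(**args)


# A "producer" that mistakenly runs a worker: no handlers are registered.
producer = ToyQueue()
producer.enqueue("billing.send", {"order_id": 7})
producer.run_once()

# A "consumer" that registered the handler processes the same job normally.
consumer = ToyQueue()
handled = []

@consumer.job("billing.send")
def send(order_id):
    handled.append(order_id)

consumer.enqueue("billing.send", {"order_id": 7})
consumer.run_once()
```

In this toy model the producer's first drain moves the job straight to dead_letters, while the consumer runs it, which mirrors why the misconfiguration surfaces immediately rather than silently.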

Consumer service

A consumer service registers handlers with @app.job and runs a worker:

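from soniq import Soniq

app = Soniq(database_url="postgresql://...")

# Registering under the canonical name is what makes this deployment a consumer.
@app.job(name="billing.send")
async def send(order_id: int): ...

# Inside an async function: start polling for jobs.
await app.run_worker()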

The same instance can also enqueue (the worker process itself often re-enqueues follow-up jobs). The "consumer" label is just shorthand for "this deployment runs a worker."

When the producer and the consumer live in different repos or services, share a small stub package that declares the canonical task name and an args model. Both sides import it; neither side needs to know about the other's internals.

# billing_tasks/__init__.py - imported by both producer and consumer
from soniq import task_ref
from pydantic import BaseModel

class SendInvoiceArgs(BaseModel):
    order_id: str
    customer: str

SEND_INVOICE = task_ref(
    name="billing.send_invoice",
    args_model=SendInvoiceArgs,
    default_queue="billing",
)

Producer service:

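from soniq import Soniq
from billing_tasks import SEND_INVOICE, SendInvoiceArgs

app = Soniq(database_url="postgresql://...")

# Inside an async function: build the payload from the shared args model.
await app.enqueue(SEND_INVOICE, args=SendInvoiceArgs(
    order_id="o1", customer="acme"
).model_dump())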

Consumer service:

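from soniq import Soniq
from billing_tasks import SEND_INVOICE, SendInvoiceArgs

app = Soniq(database_url="postgresql://...")

@app.job(name=SEND_INVOICE.name, validate=SendInvoiceArgs)
async def send_invoice(order_id: str, customer: str): ...

# Inside an async function: poll only the billing queue.
await app.run_worker(queues=["billing"])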

The TaskRef carries the canonical name, the args model, and the default queue across the wire. The producer gets validation at enqueue time; the consumer gets validation again at dispatch time. No shared runtime, no class hierarchy.
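The two validation points can be sketched without any queue at all. The example below is a standard-library illustration under stated assumptions: ToyTaskRef, enqueue, and dispatch are hypothetical names, and a dataclass with a hand-written type check stands in for the pydantic model. It is not how Soniq implements task_ref; it only shows the same args model being applied once on the producer side and once on the consumer side.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class SendInvoiceArgs:
    """Stand-in for the shared args model (a dataclass instead of pydantic)."""
    order_id: str
    customer: str

    def __post_init__(self):
        # Hand-written type check; a pydantic model would do this for us.
        for field_name in ("order_id", "customer"):
            if not isinstance(getattr(self, field_name), str):
                raise TypeError(f"{field_name} must be a str")


@dataclass(frozen=True)
class ToyTaskRef:
    """Hypothetical reference object: name, args model, default queue."""
    name: str
    args_model: type
    default_queue: str


SEND_INVOICE = ToyTaskRef(
    name="billing.send_invoice",
    args_model=SendInvoiceArgs,
    default_queue="billing",
)


def enqueue(ref, args):
    """Producer side: validate, then build the row that would be stored."""
    ref.args_model(**args)  # raises TypeError on missing, extra, or mistyped fields
    return {"name": ref.name, "queue": ref.default_queue, "args": dict(args)}


def dispatch(ref, row, handler):
    """Consumer side: validate the stored args again before calling the handler."""
    model = ref.args_model(**row["args"])
    return handler(**asdict(model))


def send_invoice(order_id, customer):
    return f"invoice for {order_id} sent to {customer}"


row = enqueue(SEND_INVOICE, {"order_id": "o1", "customer": "acme"})
result = dispatch(SEND_INVOICE, row, send_invoice)
```

Because both functions depend only on the shared reference object, a bad payload such as an integer order_id is rejected before it is stored, and a row written by an outdated producer is still caught on the consumer side.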