# Soniq - Full canonical docs (for LLM context)

> This file concatenates the six canonical Soniq pages so an agent can load the whole working surface in one fetch. For the indexed list of all docs (including tutorials, guides, recipes, production, migration), see [llms.txt](llms.txt).

Pages included, in order:

1. Quickstart - canonical end-to-end shape
2. API: Jobs - decorator, enqueue, schedule, JobContext, cron DSL
3. API: Worker - CLI, run_worker, env-var configuration
4. API: Hooks - before_job, after_job, on_error
5. Guide: Transactional enqueue - the killer feature, four working patterns
6. Reference: Glossary - one-paragraph definitions of every term the docs use

---

# Page 1: Quickstart

*(source: docs/quickstart.md)*

# Quickstart

Get a job running in under 5 minutes.

## 1. Install

```bash
pip install soniq
```

You will need a running PostgreSQL server. If you do not have one handy, `docker run -p 5432:5432 -e POSTGRES_PASSWORD=postgres postgres:16` will do.

## 2. Define a job

```python
# jobs.py
import asyncio

from soniq import Soniq

app = Soniq(database_url="postgresql://localhost/myapp")

@app.job()
async def send_welcome(to: str):
    print(f"Sending welcome email to {to}")

if __name__ == "__main__":
    asyncio.run(app.enqueue(send_welcome, to="dev@example.com"))
```

## 3. Set up the database

```bash
soniq setup
```

Creates the tables Soniq needs. The command is idempotent - safe to run more than once.

## 4. Start a worker

```bash
SONIQ_DATABASE_URL="postgresql://localhost/myapp" \
SONIQ_JOBS_MODULES="jobs" \
soniq worker --concurrency 4
```

`SONIQ_JOBS_MODULES` tells the worker which Python modules to import on startup, so the `@app.job` decorators have a chance to run and register the functions. Without it, the worker has no idea which jobs exist.

The worker process also needs to be able to actually import your job code - see [Job module discovery](getting-started/installation.md#job-module-discovery) for cross-service setups and per-worker overrides.

## 5. Enqueue a job

In another terminal:

```bash
python jobs.py
```

The worker prints "Sending welcome email to dev@example.com". You have a working background queue.

## What changes in production

The code above works, but you will want to tighten a few things before deploying for real.

> **New to job queues?** The terms below (idempotent, at-least-once) are explained inline as they appear, and the [tutorial](tutorial/01-defining-jobs.md) covers each in depth. The quickstart is just here to show the mechanics - do not feel like you have to internalize all of this in one sitting.

- **Use environment variables** instead of hardcoding `database_url`. Soniq reads `SONIQ_DATABASE_URL` automatically.
- **Set `SONIQ_JOBS_MODULES`** so workers can import your job functions on startup.
- **Run `soniq setup` only once per deploy**, not from every replica's startup. See [going to production](production/going-to-production.md).
- **Make handlers idempotent** - meaning safe to run more than once with the same end result. Soniq guarantees *at-least-once* delivery: if a worker crashes after running your function but before marking the job done, the job will run again on another worker. This is normal for every job queue. The fix is to design your handler so a re-run is harmless: use database upserts (`INSERT ... ON CONFLICT DO UPDATE`) instead of plain inserts, check whether you already sent the email before sending it again, or store an idempotency token alongside the side effect. A sketch of this pattern follows this list.
- **Tune timeouts.** Every job has a default 300-second timeout. Override per-job with `@app.job(timeout=600)`.
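To make the idempotency advice concrete, here is a minimal check-before-act sketch. The `sent_emails` table and `deliver_welcome_email` helper are illustrative stand-ins for your own schema and mail code, not part of Soniq:

```python
import asyncpg

from soniq import Soniq

app = Soniq(database_url="postgresql://localhost/myapp")

@app.job()
async def send_welcome(to: str):
    conn = await asyncpg.connect("postgresql://localhost/myapp")
    try:
        # Check-before-act: a re-run of a job that already sent the
        # email becomes a no-op instead of a duplicate send.
        already_sent = await conn.fetchval(
            "SELECT 1 FROM sent_emails WHERE recipient = $1 AND kind = 'welcome'",
            to,
        )
        if not already_sent:
            await deliver_welcome_email(to)  # hypothetical mail call
            # Record the side effect; ON CONFLICT keeps this re-run-safe too.
            await conn.execute(
                "INSERT INTO sent_emails (recipient, kind) VALUES ($1, 'welcome') "
                "ON CONFLICT (recipient, kind) DO NOTHING",
                to,
            )
    finally:
        await conn.close()
```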
## Where to go next

**Just starting out?** Work through the [tutorial](tutorial/01-defining-jobs.md) in order. It is six chapters, takes about 30 minutes, and walks through every concept you will need.

**Already comfortable with job queues?** Skip ahead based on what you are doing:

- [FastAPI guide](guides/fastapi.md) - the most common producer shape
- [Going to production](production/going-to-production.md) - the eight things that matter for a healthy deploy
- [Migrating from Celery](migration/from-celery.md) or [from RQ](migration/from-rq.md)

---

# Page 2: API - Jobs

*(source: docs/api/jobs.md)*

# Jobs

Everything about defining, enqueueing, scheduling, and inspecting jobs.

## @app.job decorator

Registers a function as a job. Always called with parentheses, even with no kwargs.

```python
app = Soniq(database_url="postgresql://localhost/myapp")

@app.job()
async def send_email(to: str, subject: str, body: str):
    ...

@app.job(retries=5, queue="urgent")
async def send_password_reset(to: str, token: str):
    ...
```

### Parameters

All parameters are optional.

| Parameter | Type | Default | Description |
|---|---|---|---|
| `name` | `str \| None` | `None` | Explicit task name. When omitted, derived from `f"{module}.{qualname}"` (Celery-style). Pass an explicit value for cross-service deployments where the name is a wire-protocol identifier. |
| `retries` | `int` | `3` | Maximum retry attempts on failure. Alias: `max_retries`. |
| `priority` | `int` | `100` | Lower number = higher priority. Range: 1--1000. |
| `queue` | `str` | `"default"` | Queue name for this job. |
| `unique` | `bool` | `False` | Deduplicate by arguments hash. If a matching job is already queued, the enqueue is skipped. |
| `retry_delay` | `int \| float \| list[int \| float]` | `0` | Seconds to wait before each retry. Pass a list to set per-attempt delays (e.g. `[1, 5, 30]`). |
| `retry_backoff` | `bool` | `False` | Apply exponential backoff to `retry_delay`. |
| `retry_max_delay` | `int \| float \| None` | `None` | Cap on retry delay in seconds. |
| `timeout` | `int \| float \| None` | `None` | Per-job timeout in seconds. `None` uses the global `job_timeout` setting (default 300s). |
| `validate` | `type[BaseModel] \| None` | `None` | Pydantic model for argument validation at enqueue time. Alias: `args_model`. |

```python
@app.job(
    retries=5,
    priority=10,
    queue="urgent",
    retry_delay=[1, 5, 30, 60],
    timeout=120,
)
async def process_payment(order_id: int, amount: float):
    ...
```
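`validate` pairs naturally with a small Pydantic model. A sketch - the `EmailArgs` model is illustrative; the table above only guarantees that validation runs at enqueue time:

```python
from pydantic import BaseModel

class EmailArgs(BaseModel):
    to: str
    subject: str
    body: str

@app.job(validate=EmailArgs)
async def send_email(to: str, subject: str, body: str):
    ...

# Bad arguments fail fast in the producer, before a job row is written,
# instead of surfacing later as a worker-side failure.
await app.enqueue(send_email, to="a@b.com", subject="Hi", body="Hello")
```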
## enqueue()

Dispatches a registered job for processing. Three input shapes:

```python
# 1. Callable (single-repo)
job_id = await app.enqueue(send_email, to="a@b.com", subject="Hi", body="Hello")

# 2. String task name (cross-service / by-name)
job_id = await app.enqueue(
    "users.send_email",
    args={"to": "a@b.com", "subject": "Hi", "body": "Hello"},
)

# 3. TaskRef (typed cross-repo stub)
job_id = await app.enqueue(send_email_ref, args={"to": "a@b.com", "subject": "Hi", "body": "Hello"})
```

### Signature

```python
async def enqueue(
    target,                                 # Callable, string task name, or TaskRef
    *,
    args: dict | None = None,               # Function args (string / TaskRef shapes)
    priority: int | None = None,            # Override the job's default priority
    queue: str | None = None,               # Override the job's default queue
    scheduled_at: datetime | None = None,   # Run at a specific time (UTC)
    unique: bool | None = None,             # Override the job's default uniqueness
    dedup_key: str | None = None,           # Custom deduplication key (instead of args hash)
    connection=None,                        # Asyncpg connection for transactional enqueue
    **func_kwargs,                          # Function args (callable shape)
) -> str                                    # Returns job UUID
```

`target` is the first positional argument and selects the input shape:

- **Callable**: function args travel as `**func_kwargs`. Don't pass `args=`.
- **String task name**: function args travel in `args=dict`. Don't pass `**func_kwargs` (they would collide with enqueue options).
- **`TaskRef`**: function args travel in `args=dict` and are validated against `ref.args_model` if set.

All option parameters are optional. When omitted, the values from the `@app.job` registration apply (or system defaults if no local registration).

### Transactional enqueue

Pass a `connection` to enqueue a job inside an existing database transaction. If the transaction rolls back, the job is never created.

```python
await app.ensure_initialized()

async with app.backend.acquire() as conn:
    async with conn.transaction():
        await conn.execute("INSERT INTO orders (id) VALUES ($1)", order_id)
        await app.enqueue(fulfill_order, connection=conn, order_id=order_id)
```

Transactional enqueue requires the PostgreSQL backend.

## schedule()

Schedule a job for future execution.

```python
from datetime import datetime, timezone

# Absolute UTC datetime
await app.schedule(
    send_report,
    run_at=datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc),
    user_id=42,
)
```

```python
async def schedule(
    target,
    run_at,                      # UTC datetime, or seconds-from-now (int/float)
    *,
    args: dict | None = None,
    **kwargs,
) -> str                         # Returns job UUID
```

`app.schedule()` is a thin wrapper around `app.enqueue()` that sets `scheduled_at=run_at`.
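`run_at` also accepts a plain number, interpreted as seconds from now. A sketch, assuming the signature above:

```python
# Relative scheduling: run send_report roughly 15 minutes from now.
job_id = await app.schedule(send_report, run_at=900, user_id=42)
```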
## @app.periodic()

Declares a job that runs on a recurring schedule. Single decorator: registers the function as a regular `@app.job` and stamps the schedule on it. The scheduler process (`soniq scheduler`) picks up all `@periodic` functions automatically.

```python
from datetime import timedelta

from soniq import cron, daily, every

@app.periodic(cron=daily().at("09:00"), name="reports.daily")
async def daily_report():
    ...

@app.periodic(cron=every(10).minutes(), queue="maintenance", name="cleanup")
async def cleanup_old_sessions():
    ...

@app.periodic(every=timedelta(seconds=30), name="metrics.flush")
async def flush_metrics():
    ...
```

### Parameters

| Parameter | Type | Description |
|---|---|---|
| `cron` | `str` or builder | A 5-field cron expression, or any object whose `__str__` is one (e.g. `daily().at("09:00")` from `soniq.schedules`). |
| `every` | `timedelta` or `int`/`float` | Interval between runs. Use `timedelta` for clarity; ints are treated as seconds. |
| `**job_kwargs` | | Any parameter accepted by `@app.job` (`name`, `queue`, `priority`, `retries`, etc.). `name` is optional and falls back to `f"{module}.{qualname}"`. |

Rules:

- Specify exactly one of `cron=` or `every=`; they cannot be combined.

Requires a running `soniq scheduler` process to actually fire the jobs.

## JobContext

Runtime metadata injected into your job function. Declare a parameter with type annotation `JobContext` and Soniq fills it in automatically.

```python
from soniq import JobContext

@app.job()
async def process_order(order_id: int, ctx: JobContext):
    print(f"Job {ctx.job_id}, attempt {ctx.attempt} of {ctx.max_attempts}")
```

### Attributes

| Attribute | Type | Description |
|---|---|---|
| `job_id` | `str` | UUID of this job. |
| `job_name` | `str` | Fully qualified name (`module.function`). |
| `attempt` | `int` | Current attempt number (starts at 1). |
| `max_attempts` | `int` | Total allowed attempts (`retries + 1`). |
| `queue` | `str` | Queue this job is running in. |
| `worker_id` | `str \| None` | UUID of the worker processing this job. |
| `scheduled_at` | `datetime \| None` | When the job was scheduled to run, if it was delayed. |
| `created_at` | `datetime \| None` | When the job was created. |

`JobContext` is a frozen dataclass. It is read-only.
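A common use of these fields is doing something different on the final attempt. A sketch - `pull_remote_account` and `notify_oncall` are hypothetical application helpers:

```python
from soniq import JobContext

@app.job(retries=3)
async def sync_account(account_id: int, ctx: JobContext):
    try:
        await pull_remote_account(account_id)  # hypothetical upstream call
    except Exception:
        if ctx.attempt >= ctx.max_attempts:
            # Final attempt: this failure will send the job to the
            # dead-letter queue, so page someone.
            await notify_oncall(f"sync_account job {ctx.job_id} exhausted retries")
        raise  # re-raise so Soniq records the failure and retries/dead-letters
```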
## JobStatus

Enum of the lifecycle states a `soniq_jobs` row can hold.

```python
from soniq import JobStatus
```

| Value | Meaning |
|---|---|
| `JobStatus.QUEUED` | Waiting to be picked up by a worker. Retries also re-enter this state. |
| `JobStatus.PROCESSING` | Currently being executed. |
| `JobStatus.DONE` | Completed successfully. |
| `JobStatus.CANCELLED` | Cancelled before execution. |

Jobs that exhaust all retries are moved into the `soniq_dead_letter_jobs` table; they do not remain in `soniq_jobs`. See [Dead-letter queue](../reference/dead-letter.md).

## Imperative scheduling: `app.scheduler`

For schedules computed at runtime (per-tenant, per-flag, ...) use the `Scheduler` service exposed on the Soniq instance:

```python
await app.scheduler.add(
    target=cleanup,          # callable, task-name string, or pass name=
    cron="0 9 * * *",        # OR: every=timedelta(...)
    args={"region": "US"},
    queue="reports",
    priority=10,
)

await app.scheduler.pause("reports.daily")
await app.scheduler.resume("reports.daily")
await app.scheduler.remove("reports.daily")

schedules = await app.scheduler.list(status="active")
sched = await app.scheduler.get("reports.daily")
```

Schedules are keyed by the resolved task name. Re-adding the same name updates the schedule in place rather than creating a duplicate.

## Cron-string DSL

`soniq.schedules` (also re-exported from the `soniq` package root) is a small, pure-Python builder layer that returns plain cron strings:

```python
from datetime import timedelta

from soniq import cron, daily, every, monthly, weekly

every(5).minutes()                  # "*/5 * * * *"
every(2).hours()                    # "0 */2 * * *"
every(30).seconds()                 # timedelta(seconds=30)
daily().at("09:00")                 # "0 9 * * *"
weekly().on("monday").at("09:00")   # "0 9 * * 1"
monthly().on_day(15).at("12:00")    # "0 12 15 * *"
cron("*/15 * * * *")                # identity passthrough
```

Each terminal returns a `str`, so `cron=daily().at("09:00")` plugs straight into `@app.periodic(cron=...)` or `app.scheduler.add(cron=...)`.

---

# Page 3: API - Worker

*(source: docs/api/worker.md)*

# Worker

Workers fetch jobs from the database, execute them, and update their status.

**The primary way to run a worker is the `soniq worker` CLI** -- run it from your process manager (systemd, Kubernetes, supervisord) and let the manager handle restarts and scaling. The `Worker` class and `app.run_worker()` are documented here for advanced use cases (tests, embedding, custom orchestration).

## soniq worker

```bash
soniq worker
soniq worker --concurrency 8 --queues emails,billing
soniq worker --run-once    # process available jobs once and exit
```

Reads `SONIQ_DATABASE_URL` (and other `SONIQ_*` settings) from the environment. See the CLI reference for the full flag list.

## run_worker()

The in-process entry point. Use this for tests or when embedding Soniq inside a larger Python application that owns the lifecycle.

```python
app = Soniq(database_url="postgresql://localhost/myapp")

await app.run_worker(
    concurrency=4,
    run_once=False,
    queues=None,
)
```

### Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `concurrency` | `int` | `4` | Number of concurrent asyncio tasks fetching and executing jobs. |
| `run_once` | `bool` | `False` | Process all available jobs and exit. Useful for testing and cron-driven setups. |
| `queues` | `list[str] \| None` | `None` | Restrict to these queue names. `None` means process all queues. This is the in-process default; the `soniq worker` CLI also defaults to all queues when `--queues` is omitted. |

### What happens during run_worker

1. The app auto-initializes if needed (connects to the database, runs lazy setup).
2. A `Worker` instance is created with the app's backend, job registry, settings, and hooks.
3. In continuous mode, the worker registers itself in the database, starts a heartbeat loop, subscribes to `LISTEN/NOTIFY` for instant job pickup, and launches `concurrency` processing tasks.
4. In `run_once` mode, the worker processes available jobs sequentially until the queue is empty, then returns.

## Worker configuration via environment variables

These environment variables control worker behavior when using the CLI or default settings:

| Env var | Default | Description |
|---|---|---|
| `SONIQ_CONCURRENCY` | `4` | Default concurrency (overridden by `--concurrency` flag). |
| `SONIQ_POLL_INTERVAL` | `5.0` | Seconds to wait when no jobs are available before polling again. Also the `LISTEN/NOTIFY` timeout. |
| `SONIQ_HEARTBEAT_INTERVAL` | `5.0` | Seconds between heartbeat updates. |
| `SONIQ_HEARTBEAT_TIMEOUT` | `300.0` | Seconds after which a worker with no heartbeat is considered stale. |
| `SONIQ_CLEANUP_INTERVAL` | `300.0` | Seconds between expired-job and stale-worker cleanup runs. |
| `SONIQ_ERROR_RETRY_DELAY` | `5.0` | Seconds to sleep after an unexpected worker-level error before resuming. |
| `SONIQ_JOBS_MODULES` | (empty) | Comma-separated list of modules to import on worker startup. Required by the CLI. See [Job module discovery](../getting-started/installation.md#job-module-discovery). |

!!! note "Queue selection"
    The `soniq worker` CLI processes **all queues** when `--queues` is not passed. There is no env-var equivalent on the CLI entrypoint; pass `--queues=name1,name2` to scope a worker. `SONIQ_QUEUES` only affects the programmatic `Soniq(queues=...)` setting, not the CLI worker default.
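`run_once` makes in-process testing straightforward: enqueue, drain, assert, with no separate worker process. A minimal sketch, assuming a local test database:

```python
import asyncio

from soniq import Soniq

app = Soniq(database_url="postgresql://localhost/myapp_test")
processed: list[int] = []

@app.job()
async def record(value: int):
    processed.append(value)

async def main():
    await app.enqueue(record, value=1)
    # Drain the queue in-process and return when it is empty.
    await app.run_worker(run_once=True)
    assert processed == [1]

asyncio.run(main())
```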
## Worker class (advanced)

Most users never instantiate `Worker` directly. It is documented here for contributors and users who need fine-grained control.

```python
from soniq.worker import Worker
from soniq.core.registry import JobRegistry
from soniq.settings import SoniqSettings

worker = Worker(
    backend=backend,      # A StorageBackend instance
    registry=registry,    # A JobRegistry with registered jobs
    settings=settings,    # SoniqSettings (optional, uses global defaults)
    hooks=hooks,          # Dict of hook lists (optional)
)
```

### Constructor parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| `backend` | `StorageBackend` | (required) | The storage backend (PostgresBackend, SQLiteBackend, or MemoryBackend). |
| `registry` | `JobRegistry` | (required) | Job registry containing all `@app.job()` registrations. |
| `settings` | `SoniqSettings \| None` | `None` | Settings instance. Falls back to global settings when `None`. |
| `hooks` | `dict \| None` | `None` | Hook dictionary with keys `"before_job"`, `"after_job"`, `"on_error"`, each mapping to a list of callables. |

### run()

```python
async def run(
    self,
    concurrency: int = 4,
    run_once: bool = False,
    queues: list[str] | None = None,
) -> bool
```

Returns `True` if any jobs were processed.

### run_once()

```python
async def run_once(
    self,
    queues: list[str] | None = None,
    max_jobs: int | None = None,
) -> bool
```

Process available jobs and return. Pass `max_jobs` to cap how many jobs are processed in one call.

## Graceful shutdown

In continuous mode, the worker installs signal handlers for `SIGINT` and `SIGTERM`. On receiving either signal:

1. The shutdown event is set.
2. Running job tasks are cancelled.
3. The worker deregisters itself from the database.
4. The `LISTEN/NOTIFY` connection is released.

A second signal during shutdown forces an immediate exit.

## LISTEN/NOTIFY

When the backend supports push notifications (PostgreSQL), the worker subscribes to the `soniq_new_job` channel. When a job is enqueued, the worker wakes up immediately instead of waiting for the next poll cycle. This keeps latency low without hammering the database with frequent polls.

If `LISTEN/NOTIFY` setup fails (for example behind PgBouncer in transaction mode), the worker falls back to polling at `poll_interval`.

---

# Page 4: API - Hooks

*(source: docs/api/hooks.md)*

# Hooks

Hooks let you run code before and after every job, and when a job fails. They are useful for logging, metrics, tracing, and alerting without modifying your job functions.

## Registering hooks

Use the decorator methods on your `Soniq` instance:

```python
app = Soniq(database_url="postgresql://localhost/myapp")

@app.before_job
async def log_start(job_name: str, job_id: str, attempt: int):
    print(f"Starting {job_name} ({job_id}), attempt {attempt}")

@app.after_job
async def log_success(job_name: str, job_id: str, duration_ms: float):
    print(f"Completed {job_name} ({job_id}) in {duration_ms}ms")

@app.on_error
async def log_failure(job_name: str, job_id: str, error: str, attempt: int):
    print(f"Failed {job_name} ({job_id}): {error} (attempt {attempt})")
```

You can register multiple hooks for the same event. They run in registration order.

## Hook signatures

### @app.before_job

Called immediately before a job function executes.

```python
async def before_job_hook(job_name: str, job_id: str, attempt: int) -> None
```

| Argument | Type | Description |
|---|---|---|
| `job_name` | `str` | Fully qualified job name (`module.function`). |
| `job_id` | `str` | UUID of the job. |
| `attempt` | `int` | Current attempt number. |

### @app.after_job

Called after a job completes successfully.
```python
async def after_job_hook(job_name: str, job_id: str, duration_ms: float) -> None
```

| Argument | Type | Description |
|---|---|---|
| `job_name` | `str` | Fully qualified job name. |
| `job_id` | `str` | UUID of the job. |
| `duration_ms` | `float` | Wall-clock execution time in milliseconds. |

### @app.on_error

Called when a job raises an exception. Runs before the retry/dead-letter decision.

```python
async def on_error_hook(job_name: str, job_id: str, error: str, attempt: int) -> None
```

| Argument | Type | Description |
|---|---|---|
| `job_name` | `str` | Fully qualified job name. |
| `job_id` | `str` | UUID of the job. |
| `error` | `str` | String representation of the exception. |
| `attempt` | `int` | The attempt number that failed. |

## Execution order

For a successful job:

```
before_job -> job function -> after_job
```

For a failed job:

```
before_job -> job function (raises) -> on_error
```

`after_job` and `on_error` are mutually exclusive for a given execution. `before_job` always runs, even if the job will ultimately fail.

## Sync and async hooks

Hooks can be either sync or async. Soniq inspects each hook at call time and awaits it if it is a coroutine function.

```python
# Async hook
@app.before_job
async def async_hook(job_name, job_id, attempt):
    await some_async_operation()

# Sync hook works too
@app.before_job
def sync_hook(job_name, job_id, attempt):
    some_sync_operation()
```

## Error handling in hooks

If a hook raises an exception, Soniq logs a warning and continues. A broken hook never prevents a job from running or its result from being recorded.

```
WARNING - Hook before_job failed: ConnectionError(...)
```

This means hooks are safe for non-critical operations like metrics and logging. If you need a hook failure to stop job execution, handle that logic inside the job function itself.
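To see that isolation in one place - a sketch; the raised error and log line follow the behavior described above:

```python
@app.before_job
async def flaky_metrics_hook(job_name, job_id, attempt):
    # If the metrics backend is down, this raises...
    raise ConnectionError("metrics backend unreachable")

# ...but the job still executes normally. Soniq swallows the hook
# exception and logs:
#   WARNING - Hook before_job failed: ConnectionError(...)
```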
## Practical examples

### Structured logging

```python
import logging
import json

logger = logging.getLogger("soniq.hooks")

@app.before_job
async def structured_log_start(job_name, job_id, attempt):
    logger.info(json.dumps({
        "event": "job_started",
        "job_name": job_name,
        "job_id": job_id,
        "attempt": attempt,
    }))

@app.after_job
async def structured_log_done(job_name, job_id, duration_ms):
    logger.info(json.dumps({
        "event": "job_completed",
        "job_name": job_name,
        "job_id": job_id,
        "duration_ms": duration_ms,
    }))
```

### Prometheus metrics

```python
from prometheus_client import Counter, Histogram

jobs_started = Counter("soniq_jobs_started", "Jobs started", ["job_name"])
jobs_completed = Histogram("soniq_jobs_duration_ms", "Job duration", ["job_name"])
jobs_failed = Counter("soniq_jobs_failed", "Jobs failed", ["job_name"])

@app.before_job
async def track_start(job_name, job_id, attempt):
    jobs_started.labels(job_name=job_name).inc()

@app.after_job
async def track_duration(job_name, job_id, duration_ms):
    jobs_completed.labels(job_name=job_name).observe(duration_ms)

@app.on_error
async def track_failure(job_name, job_id, error, attempt):
    jobs_failed.labels(job_name=job_name).inc()
```

### Slack alerting on dead-letter

```python
@app.on_error
async def alert_on_final_failure(job_name, job_id, error, attempt):
    # Look up the job's max_attempts from the registry
    job_meta = app._get_job_registry().get_job(job_name)
    if job_meta and attempt >= job_meta["max_retries"] + 1:
        await send_slack_message(
            f"Job {job_name} ({job_id}) moved to dead-letter queue: {error}"
        )
```

### OpenTelemetry tracing

```python
from opentelemetry import trace

tracer = trace.get_tracer("soniq")
_spans = {}

@app.before_job
async def start_span(job_name, job_id, attempt):
    span = tracer.start_span(f"job:{job_name}", attributes={
        "job.id": job_id,
        "job.attempt": attempt,
    })
    _spans[job_id] = span

@app.after_job
async def end_span_success(job_name, job_id, duration_ms):
    span = _spans.pop(job_id, None)
    if span:
        span.set_attribute("job.duration_ms", duration_ms)
        span.end()

@app.on_error
async def end_span_error(job_name, job_id, error, attempt):
    span = _spans.pop(job_id, None)
    if span:
        span.set_status(trace.StatusCode.ERROR, error)
        span.end()
```

---

# Page 5: Guide - Transactional enqueue

*(source: docs/guides/transactional-enqueue.md)*

# Transactional Enqueue

Enqueue a job inside a database transaction. If the transaction rolls back, the job never enters the queue.

## How it works

When you pass `connection=conn` to `enqueue()`, the job row is inserted into `soniq_jobs` using that connection. The INSERT is part of your transaction, so the job only becomes visible to workers after `COMMIT`. Either both your business data and the job are committed, or neither is.
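The all-or-nothing behavior is easy to demonstrate end-to-end. A sketch, borrowing a connection from Soniq's own pool (the first pattern below); `fulfill_order` is the job from the API page:

```python
await eq.ensure_initialized()

try:
    async with eq.backend.acquire() as conn:
        async with conn.transaction():
            await conn.execute("INSERT INTO orders (id) VALUES ($1)", 1)
            await eq.enqueue(fulfill_order, connection=conn, order_id=1)
            raise RuntimeError("boom")  # force a rollback
except RuntimeError:
    pass

# Neither the order row nor the job row exists; workers never saw the job.
```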
## Whose pool is this?

Soniq manages an internal `asyncpg` connection pool that it uses for its own writes (claiming jobs, heartbeats, etc.). When you write transactional enqueue code, the question of *which pool* you should use comes up almost immediately.

The short answer is: **any raw `asyncpg` connection inside an active transaction works**. It can be Soniq's pool, your own app's pool, or one extracted from your ORM session. Soniq doesn't care -- it just inserts the job row on the connection you hand it.

That gives you four working patterns, depending on what your app already uses:

1. **Raw `asyncpg`, Soniq's pool** -- simplest. No extra setup. Good for scripts and small services that don't already manage a pool.
2. **Raw `asyncpg`, your own pool** -- you manage your own `asyncpg.Pool` and pass acquired connections directly. Best when you already have a pool for your application queries.
3. **SQLAlchemy async** -- extract the underlying `asyncpg` connection from an `AsyncSession`. The most common FastAPI shape.
4. **Tortoise ORM** -- pull the `asyncpg` connection from `in_transaction()` via a private attribute (caveat below).

The four patterns are interchangeable. If your team uses SQLAlchemy, use the SQLAlchemy pattern. If you write raw SQL, the asyncpg pattern is fine.

## Pattern 1: Raw asyncpg via Soniq's pool

The simplest pattern. Borrow a connection from Soniq's own backend pool. No second pool, no extraction step:

```python
await eq.ensure_initialized()

async with eq.backend.acquire() as conn:
    async with conn.transaction():
        await conn.execute("INSERT INTO orders (id, total) VALUES ($1, $2)", order_id, total)
        await eq.enqueue(send_invoice, connection=conn, order_id=order_id)
```

The `connection=conn` parameter is the only thing that changes from a normal enqueue call. Everything else works the same -- job options, priority, scheduling.

## FastAPI route example

A real-world order creation endpoint where the order record and the follow-up job are committed atomically:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

from soniq import Soniq

eq = Soniq(database_url="postgresql://localhost/myapp")

@asynccontextmanager
async def lifespan(app: FastAPI):
    yield
    await eq.close()

app = FastAPI(lifespan=lifespan)

@eq.job(queue="invoices", max_retries=5)
async def send_invoice(order_id: int):
    order = await get_order(order_id)
    await generate_and_send_invoice(order)

@app.post("/orders")
async def create_order(product_id: int, quantity: int):
    await eq.ensure_initialized()
    async with eq.backend.acquire() as conn:
        async with conn.transaction():
            order_id = await conn.fetchval(
                "INSERT INTO orders (product_id, quantity) VALUES ($1, $2) RETURNING id",
                product_id, quantity,
            )
            await eq.enqueue(send_invoice, connection=conn, order_id=order_id)
    return {"order_id": order_id}
```

If the INSERT fails or anything else raises inside the transaction block, both the order row and the job are rolled back.

## Pattern 2: Bring your own asyncpg pool

If your app already manages its own `asyncpg.Pool` (common in apps that predate Soniq), you don't need to involve Soniq's pool at all. Pass the connection from *your* pool to `enqueue` and the job row goes through your transaction:

```python
import os

import asyncpg
from fastapi import FastAPI

from soniq import Soniq

app = FastAPI()
eq = Soniq(database_url=os.environ["DATABASE_URL"])

db_pool: asyncpg.Pool  # your application's pool

@app.on_event("startup")
async def _on_startup():
    global db_pool
    db_pool = await asyncpg.create_pool(os.environ["DATABASE_URL"])

@app.post("/orders")
async def create_order(product_id: int, quantity: int):
    async with db_pool.acquire() as conn:
        async with conn.transaction():
            order_id = await conn.fetchval(
                "INSERT INTO orders (product_id, quantity) VALUES ($1, $2) RETURNING id",
                product_id, quantity,
            )
            await eq.enqueue(send_invoice, connection=conn, order_id=order_id)
    return {"order_id": order_id}
```

This is the cleanest pattern when you have an existing pool: there is no extraction step and no second pool to size. Soniq is a passenger on a connection your app already owned.

> Both pools must point at the **same Postgres database**. Soniq's job tables and your application tables share one database; the connection just has to be in a transaction on that database.
## Pattern 3: SQLAlchemy async

Most FastAPI apps use SQLAlchemy. Soniq does not have a native SQLAlchemy integration, but you can extract the underlying `asyncpg` connection from an `AsyncSession` and use it as you would any other:

```python
import os

from fastapi import Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession

from soniq import Soniq

app = FastAPI()
eq = Soniq(database_url=os.environ["DATABASE_URL"])

@app.post("/orders")
async def create_order(
    product_id: int,
    quantity: int,
    db: AsyncSession = Depends(get_db),
):
    async with db.begin():
        order = Order(product_id=product_id, quantity=quantity)
        db.add(order)
        await db.flush()  # populate order.id without committing

        # Reach down to the raw asyncpg connection.
        raw_conn = await db.connection()
        asyncpg_conn = raw_conn.sync_connection.connection.driver_connection

        await eq.enqueue(send_invoice, connection=asyncpg_conn, order_id=order.id)
    # SQLAlchemy commits here. The job becomes visible to workers at the same moment.
```

Two things to know:

- **You must use the asyncpg driver.** `create_async_engine("postgresql+asyncpg://...")`. The connection-extraction path does not work with `psycopg3` async; the attribute chain is different.
- **SQLAlchemy stays in charge.** Soniq does not open a separate transaction. It writes onto the same connection, inside the same transaction, that SQLAlchemy is managing. When SQLAlchemy commits, the job becomes visible.
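If you use this pattern in more than one route, the extraction step is worth a helper. A sketch, assuming the asyncpg driver and the attribute chain shown above:

```python
import asyncpg
from sqlalchemy.ext.asyncio import AsyncSession

async def raw_asyncpg_connection(db: AsyncSession) -> asyncpg.Connection:
    """Return the asyncpg connection under an active AsyncSession transaction."""
    raw_conn = await db.connection()
    return raw_conn.sync_connection.connection.driver_connection

# In a route:
#   asyncpg_conn = await raw_asyncpg_connection(db)
#   await eq.enqueue(send_invoice, connection=asyncpg_conn, order_id=order.id)
```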
## Pattern 4: Tortoise ORM

Tortoise exposes the raw `asyncpg` connection on the transaction context, but only via a private attribute. It works today; treat it as a soft API.

```python
from tortoise.transactions import in_transaction

@app.post("/orders")
async def create_order(product_id: int, quantity: int):
    async with in_transaction() as conn:
        order = await Order.create(
            using_db=conn,
            product_id=product_id,
            quantity=quantity,
        )
        # conn._connection is the raw asyncpg connection. Private attribute,
        # works in current Tortoise releases. A first-class Tortoise integration
        # is on the roadmap.
        await eq.enqueue(send_invoice, connection=conn._connection, order_id=order.id)
```

If you'd like first-class Tortoise support that does not depend on a private attribute, please open an issue -- it's not a hard integration to write, we just want to gauge demand before adding the dependency.

## Use cases

**Order + invoice.** Create the order and enqueue the invoice generation in one transaction. No orphaned orders without invoices.

**User signup + welcome email.** Insert the user row and enqueue the welcome email together. If the INSERT hits a unique constraint, no phantom email gets sent.

**Payment + receipt.** Record the payment and enqueue the receipt delivery atomically. No "payment recorded but receipt never sent" bugs.

The common thread: any workflow where "row exists but job is missing" would be a data integrity bug.

## Delivery semantics

Transactional enqueue guarantees the job enters the queue if and only if the transaction commits. It does not guarantee single execution.

Soniq provides **at-least-once delivery**. If a worker crashes after executing the job but before marking it done, stale worker recovery will re-queue it. Design your job functions to be idempotent -- use upserts, deduplication keys, or check-before-act patterns for side effects like sending emails or charging payments.

## What transactional enqueue does NOT guarantee

- **Single execution.** The guarantee applies to enqueue only. Re-execution after worker crashes is still possible.
- **Rollback after commit.** Once committed, the job is in the queue. You can cancel it with `eq.cancel_job(job_id)`, but a fast worker might pick it up first.

> **Note:** Transactional enqueue requires PostgreSQL. It is not available with the SQLite or Memory backends. Calling `enqueue(..., connection=conn)` on a non-PostgreSQL backend raises a `ValueError`.

---

# Page 6: Reference - Glossary

*(source: docs/reference/glossary.md)*

# Glossary

One-paragraph definitions for the words Soniq's docs use.

## Job

A unit of work. Either the function you decorate with `@app.job` (the **handler**), or a specific row in `soniq_jobs` representing one scheduled invocation of that function (the **job instance**). Most of the docs say "job" for both and the meaning is clear from context. When ambiguity matters, "handler" and "job row" are the disambiguating terms.

## Handler

The Python function that runs when a job is processed. Registered via `@app.job`.

## Task name

The string identifier under which a handler is registered. By default Soniq derives it from `f"{module}.{qualname}"` (Celery-style). Override with `@app.job(name="...")`. The task name is the wire protocol for cross-service deployments - producers refer to handlers by name, not by import.

## TaskRef

A typed stub that lets a producer enqueue a job by name without importing the consumer's code. Created via `task_ref("billing.send_invoice", InvoiceArgs)`. Use it for cross-service producer/consumer setups where the producer should not own the handler implementation. See [cross-service jobs](../guides/cross-service-jobs.md).

## Worker

A long-running process that polls (or receives `NOTIFY` for) jobs, claims them via `SELECT ... FOR UPDATE SKIP LOCKED`, and runs the handler. Started with `soniq worker`. Multiple workers can compete for the same queue safely.

## Scheduler

A separate long-running process (`soniq scheduler`) that evaluates `@app.periodic` jobs and writes due instances into `soniq_jobs` for workers to pick up. The CLI worker does not evaluate recurring jobs - that is intentional, so worker scaling does not duplicate scheduler work. Multiple scheduler instances coordinate via a Postgres advisory lock; only one does work at a time.

## Queue

A `varchar` column on `soniq_jobs` used for routing. Workers can be started with `--queues=foo,bar` to consume from a subset. Default queue name is `"default"`. Queues are not separate tables - just a routing tag.

## Concurrency

The number of in-flight jobs a single worker process will run at once. Default is 4. Tune up for I/O-bound workloads, down for CPU-bound. Set with `--concurrency` or `SONIQ_CONCURRENCY`.

## Heartbeat

Periodic write each worker performs to `soniq_workers` to advertise liveness. If a worker stops heartbeating for `SONIQ_HEARTBEAT_TIMEOUT` seconds (default 300), surviving workers detect it as stale, reset its in-flight jobs to `queued`, and remove its row.

## Dead-letter queue (DLQ)

The `soniq_dead_letter_jobs` table. Jobs that exhaust all retries land here instead of staying in `soniq_jobs`. Use `app.dead_letter.list()`, `app.dead_letter.replay(job_id)`, or the `soniq dead-letter` CLI to inspect and recover them.

## Replay

Re-running a dead-letter job by minting a fresh `soniq_jobs` row from it. The original DLQ row stays put with `resurrection_count` incremented; replay does not delete the dead-letter record.

## At-least-once delivery

Soniq's delivery guarantee. A job will run at least once but may run more than once if a worker crashes mid-handler before marking the row done.
Handlers must be **idempotent** - rerunning them must produce the same end state. Soniq does not offer an exactly-once mode; nothing on Postgres alone can.

## Transactional enqueue

Enqueueing a job inside the same Postgres transaction as your business writes by passing `connection=conn` to `enqueue()`. If the transaction rolls back, the job row is also rolled back. The reason most teams pick a Postgres-backed queue. Postgres-only - SQLite and the in-memory backend do not support it.

## Backend

The storage layer. The production backend is `asyncpg` against PostgreSQL. SQLite and an in-memory backend exist for local development and tests; both have hard limitations (single-writer, no transactional enqueue, polling-only) and are not for production.