Jobs¶
Everything about defining, enqueueing, scheduling, and inspecting jobs.
@app.job decorator¶
Registers a function as a job. Always called with parentheses, even with no kwargs.
from soniq import Soniq

app = Soniq(database_url="postgresql://localhost/myapp")

@app.job()
async def send_email(to: str, subject: str, body: str):
    ...

@app.job(retries=5, queue="urgent")
async def send_password_reset(to: str, token: str):
    ...
Parameters¶
All parameters are optional.
| Parameter | Type | Default | Description |
|---|---|---|---|
| name | str \| None | None | Explicit task name. When omitted, derived from f"{module}.{qualname}" (Celery-style). Pass an explicit value for cross-service deployments where the name is a wire-protocol identifier. |
| retries | int | 3 | Maximum retry attempts on failure. Alias: max_retries. |
| priority | int | 100 | Lower number = higher priority. Range: 1–1000. |
| queue | str | "default" | Queue name for this job. |
| unique | bool | False | Deduplicate by arguments hash. If a matching job is already queued, the enqueue is skipped. |
| retry_delay | int \| float \| list[int \| float] | 0 | Seconds to wait before each retry. Pass a list to set per-attempt delays (e.g. [1, 5, 30]). |
| retry_backoff | bool | False | Apply exponential backoff to retry_delay. |
| retry_max_delay | int \| float \| None | None | Cap on retry delay in seconds. |
| timeout | int \| float \| None | None | Per-job timeout in seconds. None uses the global job_timeout setting (default 300s). |
| validate | type[BaseModel] \| None | None | Pydantic model for argument validation at enqueue time. Alias: args_model. |
@app.job(
    retries=5,
    priority=10,
    queue="urgent",
    retry_delay=[1, 5, 30, 60],
    timeout=120,
)
async def process_payment(order_id: int, amount: float):
    ...
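For validate, a minimal sketch of enqueue-time argument validation (the PaymentArgs model and refund_payment job are illustrative):

from pydantic import BaseModel

class PaymentArgs(BaseModel):
    order_id: int
    amount: float

@app.job(validate=PaymentArgs)
async def refund_payment(order_id: int, amount: float):
    ...

With this in place, an enqueue with a missing or mistyped argument fails immediately instead of inside the worker.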
enqueue()¶
Dispatches a registered job for processing. Three input shapes:
# 1. Callable (single-repo)
job_id = await app.enqueue(send_email, to="a@b.com", subject="Hi", body="Hello")
# 2. String task name (cross-service / by-name)
job_id = await app.enqueue(
    "users.send_email",
    args={"to": "a@b.com", "subject": "Hi", "body": "Hello"},
)
# 3. TaskRef (typed cross-repo stub)
job_id = await app.enqueue(send_email_ref, args={"to": "a@b.com", "subject": "Hi", "body": "Hello"})
Signature¶
async def enqueue(
    target,                                # Callable, string task name, or TaskRef
    *,
    args: dict | None = None,              # Function args (string / TaskRef shapes)
    priority: int | None = None,           # Override the job's default priority
    queue: str | None = None,              # Override the job's default queue
    scheduled_at: datetime | None = None,  # Run at a specific time (UTC)
    unique: bool | None = None,            # Override the job's default uniqueness
    dedup_key: str | None = None,          # Custom deduplication key (instead of args hash)
    connection=None,                       # Asyncpg connection for transactional enqueue
    **func_kwargs,                         # Function args (callable shape)
) -> str                                   # Returns job UUID
target is the first positional argument and selects the input shape:
- Callable: function args travel as **func_kwargs. Don't pass args=.
- String task name: function args travel in args= (a dict). Don't pass **func_kwargs (they would collide with enqueue options).
- TaskRef: function args travel in args= (a dict) and are validated against ref.args_model if set.
All option parameters are optional; when omitted, the values from the @app.job registration apply (or system defaults if the task has no local registration).
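As a sketch of per-call overrides (the values are arbitrary):

from datetime import datetime, timedelta, timezone

# Override the registered queue/priority and delay execution by 10 minutes
job_id = await app.enqueue(
    send_email,
    queue="bulk",
    priority=500,
    scheduled_at=datetime.now(timezone.utc) + timedelta(minutes=10),
    to="a@b.com", subject="Hi", body="Hello",
)

# Deduplicate on a custom key instead of the args hash
job_id = await app.enqueue(
    send_email,
    unique=True,
    dedup_key="welcome:42",
    to="a@b.com", subject="Welcome", body="Hello",
)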
Transactional enqueue¶
Pass a connection to enqueue a job inside an existing database transaction.
If the transaction rolls back, the job is never created.
await app.ensure_initialized()

async with app.backend.acquire() as conn:
    async with conn.transaction():
        await conn.execute("INSERT INTO orders (id) VALUES ($1)", order_id)
        await app.enqueue(fulfill_order, connection=conn, order_id=order_id)
Transactional enqueue requires the PostgreSQL backend.
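A sketch of the rollback guarantee (the forced error is illustrative):

async with app.backend.acquire() as conn:
    try:
        async with conn.transaction():
            await conn.execute("INSERT INTO orders (id) VALUES ($1)", order_id)
            await app.enqueue(fulfill_order, connection=conn, order_id=order_id)
            raise RuntimeError("boom")  # anything that aborts the transaction
    except RuntimeError:
        pass  # the INSERT was rolled back and the job was never created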
enqueue_many()¶
Bulk-enqueue many jobs that share the same target. Returns a list of job IDs in input order.
ids = await app.enqueue_many(
    send_email,
    [
        {"to": "a@example.com", "subject": "Hi", "body": "Hello"},
        {"to": "b@example.com", "subject": "Hi", "body": "Hello"},
        {"to": "c@example.com", "subject": "Hi", "body": "Hello"},
    ],
    queue="emails",   # optional, applies to all rows
    priority=50,      # optional, applies to all rows
)
Signature¶
async def enqueue_many(
    target,                                # Callable, string task name, or TaskRef
    args_list: list[dict],                 # One args dict per job
    *,
    queue: str | None = None,              # Shared override for all rows
    priority: int | None = None,           # Shared override for all rows
    scheduled_at: datetime | None = None,  # Shared override for all rows
) -> list[str]
On Postgres this issues one batched INSERT round-trip; on SQLite/Memory it loops over create_job. Each args_list[i] is validated against the registered args_model (if any) before any row is written.
enqueue_many() does not support unique=True or dedup_key=. If the registered job declares unique=True, or you need per-row dedup, call enqueue() in a loop instead.
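A sketch of that fallback (recipients is an illustrative list of addresses):

ids = []
for to in recipients:
    ids.append(
        await app.enqueue(
            send_email,
            unique=True,
            dedup_key=f"digest:{to}",   # per-row deduplication
            to=to, subject="Digest", body="Hello",
        )
    )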
schedule()¶
Schedule a job for future execution.
from datetime import datetime, timezone
# Absolute UTC datetime
await app.schedule(
    send_report,
    run_at=datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc),
    user_id=42,
)
Signature¶
async def schedule(
    target,                    # Callable, string task name, or TaskRef
    run_at,                    # UTC datetime, or seconds-from-now (int/float)
    *,
    args: dict | None = None,
    **kwargs,                  # Any option accepted by enqueue(), plus callable-shape args
) -> str                       # Returns job UUID
app.schedule() is a thin wrapper around app.enqueue() that sets scheduled_at=run_at.
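Because it only sets scheduled_at, these two calls are equivalent when run_at is a datetime:

from datetime import datetime, timezone

when = datetime(2025, 1, 1, 9, 0, tzinfo=timezone.utc)
await app.schedule(send_report, run_at=when, user_id=42)
await app.enqueue(send_report, scheduled_at=when, user_id=42)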
@app.periodic()¶
Declares a job that runs on a recurring schedule. One decorator does both: it registers the function as a regular @app.job and stamps the schedule on it. The scheduler process (soniq scheduler) picks up all @app.periodic functions automatically.
from datetime import timedelta
from soniq import cron, daily, every
@app.periodic(cron=daily().at("09:00"), name="reports.daily")
async def daily_report():
    ...

@app.periodic(cron=every(10).minutes(), queue="maintenance", name="cleanup")
async def cleanup_old_sessions():
    ...

@app.periodic(every=timedelta(seconds=30), name="metrics.flush")
async def flush_metrics():
    ...
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| cron | str or builder | A 5-field cron expression, or any object whose __str__ is one (e.g. daily().at("09:00") from soniq.schedules). |
| every | timedelta or int/float | Interval between runs. Use timedelta for clarity; ints are treated as seconds. |
| **job_kwargs | | Any parameter accepted by @app.job (name, queue, priority, retries, etc.). name is optional and falls back to f"{module}.{qualname}". |
Rules:
- Specify exactly one of cron= or every=; passing both (or neither) is an error.
Requires a running soniq scheduler process to actually fire the jobs.
JobContext¶
Runtime metadata injected into your job function. Declare a parameter with
type annotation JobContext and Soniq fills it in automatically.
from soniq import JobContext
@app.job()
async def process_order(order_id: int, ctx: JobContext):
    print(f"Job {ctx.job_id}, attempt {ctx.attempt} of {ctx.max_attempts}")
Attributes¶
| Attribute | Type | Description |
|---|---|---|
| job_id | str | UUID of this job. |
| job_name | str | Fully qualified name (module.function). |
| attempt | int | Current attempt number (starts at 1). |
| max_attempts | int | Total allowed attempts (retries + 1). |
| queue | str | Queue this job is running in. |
| worker_id | str \| None | UUID of the worker processing this job. |
| scheduled_at | datetime \| None | When the job was scheduled to run, if it was delayed. |
| created_at | datetime \| None | When the job was created. |
JobContext is a frozen dataclass and therefore read-only.
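A sketch of attempt-aware behavior built on the context (the sync_from_* helpers are hypothetical):

@app.job(retries=3)
async def sync_inventory(sku: str, ctx: JobContext):
    if ctx.attempt == ctx.max_attempts:
        await sync_from_cache(sku)  # hypothetical degraded path on the final attempt
    else:
        await sync_from_api(sku)    # hypothetical primary path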
JobStatus¶
Enum of the lifecycle states a soniq_jobs row can hold.
| Value | Meaning |
|---|---|
| JobStatus.QUEUED | Waiting to be picked up by a worker. Retries also re-enter this state. |
| JobStatus.PROCESSING | Currently being executed. |
| JobStatus.DONE | Completed successfully. |
| JobStatus.CANCELLED | Cancelled before execution. |
Jobs that exhaust all retries are moved into the soniq_dead_letter_jobs
table; they do not remain in soniq_jobs. See
Dead-letter queue.
Imperative scheduling: app.scheduler¶
For schedules computed at runtime (per-tenant, per-flag, ...) use the
Scheduler service exposed on the Soniq instance:
await app.scheduler.add(
    target=cleanup,        # callable, task-name string, or pass name=
    cron="0 9 * * *",      # OR: every=timedelta(...)
    args={"region": "US"},
    queue="reports",
    priority=10,
)
await app.scheduler.pause("reports.daily")
await app.scheduler.resume("reports.daily")
await app.scheduler.remove("reports.daily")
schedules = await app.scheduler.list(status="active")
sched = await app.scheduler.get("reports.daily")
Schedules are keyed by the resolved task name. Re-adding the same name updates the schedule in place rather than creating a duplicate.
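For example, re-adding "reports.daily" with a new expression rewrites the existing row (a sketch):

# Same resolved name as before, so this updates rather than duplicates
await app.scheduler.add(target="reports.daily", cron="30 8 * * *")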
Cron-string DSL¶
soniq.schedules (also re-exported from the soniq package root) is a
small, pure-Python builder layer that returns plain cron strings:
from datetime import timedelta
from soniq import cron, daily, every, monthly, weekly
every(5).minutes() # "*/5 * * * *"
every(2).hours() # "0 */2 * * *"
every(30).seconds() # timedelta(seconds=30)
daily().at("09:00") # "0 9 * * *"
weekly().on("monday").at("09:00") # "0 9 * * 1"
monthly().on_day(15).at("12:00") # "0 12 15 * *"
cron("*/15 * * * *") # identity passthrough
Minute-and-coarser terminals return a plain str, so cron=daily().at("09:00") plugs straight
into @app.periodic(cron=...) or app.scheduler.add(cron=...). The exception is sub-minute
intervals: every(30).seconds() returns a timedelta (5-field cron has no seconds field), which
goes to every= instead.
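Because those terminals are ordinary strings, they compare equal to the raw expressions:

assert daily().at("09:00") == "0 9 * * *"
assert every(5).minutes() == "*/5 * * * *"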