OmniQ is a Redis + Lua, language-agnostic job queue. This package is the Python client for OmniQ v1.
Key ideas:
- Hybrid lanes: ungrouped jobs by default, optional grouped jobs (FIFO per group + per-group concurrency).
- Lease-based execution: workers reserve a job with a time-limited lease.
- Token-gated ACK/heartbeat:
reserve()returns alease_tokenthat must be used byheartbeat()andack_*(). - Pause / resume (flag-only): pausing a queue prevents new reserves; it does not move jobs or stop running jobs.
Core project / docs: https://github.com/not-empty/omniq
pip install omniqfrom omniq.client import OmniqClient
uq = OmniqClient(
host="omniq-redis",
port=6379,
)
job_id = uq.publish(
queue="demo",
payload={"hello": "world"},
timeout_ms=30_000,
)
print("OK", job_id)import time
from omniq.client import OmniqClient
def handler(ctx):
print("Waiting 2 seconds")
time.sleep(2)
print("Done")
uq = OmniqClient(
host="omniq-redis",
port=6379,
)
uq.consume(
queue="demo",
handler=handler,
verbose=True,
drain=False,
)You can connect using a redis URL or the standard host/port fields.
from omniq.client import OmniqClient
# Option A: host/port
uq = OmniqClient(host="localhost", port=6379, db=0)
# Option B: Redis URL (recommended for TLS / auth)
uq = OmniqClient(redis_url="redis://:password@localhost:6379/0")The client automatically loads the Lua scripts on first use (or during init).
All methods below are on OmniqClient.
Enqueue a job.
job_id = uq.publish(
queue="demo",
payload={"hello": "world"}, # must be dict or list
job_id=None, # optional ULID (generated if omitted)
max_attempts=3,
timeout_ms=60_000,
backoff_ms=5_000,
due_ms=0, # schedule in the future (ms since epoch)
gid=None, # optional group id (string)
group_limit=0, # lazily initialize per-group limit (>0)
)Notes:
payloadmust be adictorlist(structured JSON). Passing a raw string is an error.- If
gidis provided, the job goes into that group’s FIFO lane. Group concurrency is controlled bygroup_limit(first writer wins).
Pause/resume is a queue-level flag.
uq.pause(queue="demo")
print(uq.is_paused(queue="demo")) # True
uq.resume(queue="demo")Behavior:
- Pause does not move jobs.
- Pause does not affect running jobs.
- Pause only blocks new reserves (reserve returns
PAUSED).
consume() is a convenience loop that:
- periodically runs
promote_delayed+reap_expired - calls
reserve() - runs your
handler(ctx) - heartbeats while the handler runs
- ACKs success / fail using the job’s
lease_token
uq.consume(
queue="demo",
handler=handler, # handler(ctx)
poll_interval_s=0.05,
promote_interval_s=1.0,
promote_batch=1000,
reap_interval_s=1.0,
reap_batch=1000,
heartbeat_interval_s=None, # None => derived from timeout_ms/2 (clamped)
verbose=False,
drain=True, # drain=True => finish current job on Ctrl+C then exit
)Your handler receives a ctx object with:
queuejob_idpayload_raw(JSON string)payload(parsed dict/list)attemptlock_until_mslease_tokengid
If you pass a gid when publishing, jobs are routed to that group.
uq.publish(queue="demo", payload={"i": 1}, gid="company:acme", group_limit=2)
uq.publish(queue="demo", payload={"i": 2}, gid="company:acme")- FIFO ordering is preserved within each group.
- Groups can run concurrently with each other.
- Concurrency within a group is limited by
group_limit(or the queue default set in the core config).
OmniQ v1 supports per-job overrides and queue defaults such as:
timeout_ms(lease duration)max_attemptsbackoff_mscompleted_keep(retention size)
See the core docs for the full contract and configuration details: https://github.com/not-empty/omniq
See the repository license.