PgQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PgQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.
Does the celery SQLAlchemy broker support PostgreSQL's LISTEN/NOTIFY features?<p>Similar support in SQLite would simplify testing applications built with celery.<p>How to add table event messages to SQLite so that the SQLite broker has the same features as AMQP? Could a vtable facade send messages on tablet events?<p>Are there sqlite Triggers?<p>Celery > Backends and Brokers:
<a href="https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html" rel="nofollow">https://docs.celeryq.dev/en/stable/getting-started/backends-...</a><p>/? sqlalchemy listen notify:
<a href="https://www.google.com/search?q=sqlalchemy+listen+notify" rel="nofollow">https://www.google.com/search?q=sqlalchemy+listen+notify</a> :<p>asyncpg.Connection.add_listener<p>sqlalchemy.event.listen, @listen_for<p>psychopg2 conn.poll(), while connection.notifies<p>psychopg2 > docs > advanced > Advanced notifications:
<a href="https://www.psycopg.org/docs/advanced.html#asynchronous-notifications" rel="nofollow">https://www.psycopg.org/docs/advanced.html#asynchronous-noti...</a><p>PgQueuer.db, PgQueuer.listeners.add_listener; asyncpg add_listener: <a href="https://github.com/janbjorge/PgQueuer/blob/main/src/PgQueuer/db.py">https://github.com/janbjorge/PgQueuer/blob/main/src/PgQueuer...</a><p>asyncpg/tests/test_listeners.py:
<a href="https://github.com/MagicStack/asyncpg/blob/master/tests/test_listeners.py">https://github.com/MagicStack/asyncpg/blob/master/tests/test...</a><p>/? sqlite LISTEN NOTIFY: <a href="https://www.google.com/search?q=sqlite+listen+notify" rel="nofollow">https://www.google.com/search?q=sqlite+listen+notify</a><p>sqlite3 update_hook: <a href="https://www.sqlite.org/c3ref/update_hook.html" rel="nofollow">https://www.sqlite.org/c3ref/update_hook.html</a>
We use listen notify extensively and it is great. The things it lacks most for us is guaranteed single recipient. All subscribers get all notifications which leads to problems in determining who should act on the message n our case.
How does LISTEN/NOTIFY compare to using select for update skip locked? I thought listen/notify can lose queue items when the process crashes? Is that true? Do you need to code for those cases in some manner?
I am going to go the other direction on this... to anyone reading this, please consider using a backend-generic queueing system for your Python project.<p>Why? Mainly because those systems offer good affordances for testing and running locally in an operationally simple way. They also tend to have decent default answers for various futzy questions around disconnects at various parts of the workflow.<p>We all know Celery is a buggy pain in the butt, but rolling your own job queue likely ends up with you just writing a similary-buggy pain in the butt. We've already done "Celery but simpler", it's stuff like Dramatiq!<p>If you have backend-specific needs, you won't listen to this advice. But think deeply how important your needs are. Computers are fast, and you can deal with a lot of events with most systems.<p>Meanwhile if you use a backend-generic system... well you could write a backend using PgQueuer!
I've been referring to this post about issues with Celery: <a href="https://docs.hatchet.run/blog/problems-with-celery">https://docs.hatchet.run/blog/problems-with-celery</a><p>Does PgQueuer address any of them?
Broadcaster is one we use in production for PUB/SUB stuff with OPA/OPAL. <a href="https://pypi.org/project/broadcaster/" rel="nofollow">https://pypi.org/project/broadcaster/</a>
How does this compare to the popular Graphile Worker library?<p><a href="https://github.com/graphile/worker">https://github.com/graphile/worker</a>
Although I am more of a MySQL guy, I have been exploring PostgreSQL from sometime. Seems it has lot of features out of box.<p>This is very interesting tool.
I’ve been thinking about the potential for PostgreSQL-backed job queue libraries to share a common schema. For instance, I’m a big fan of Oban in Elixir: <a href="https://github.com/sorentwo/oban">https://github.com/sorentwo/oban</a><p>Given that there are many Sidekiq-compatible libraries across various languages, it might be beneficial to have a similar approach for PostgreSQL-based job queues. This could allow for job processing in different languages while maintaining compatibility.<p>Alternatively, we could consider developing a core job queue library in Rust, with language-specific bindings. This would provide a robust, cross-language solution while leveraging the performance and safety benefits of Rust.
This looks like a great task queue, I'm a massive proponent of "Postgres is all you need" [0] and doubling down on it with my project that takes it to the extreme.<p>What I would love is a Postgres task queue that does multi-step pipelines, with fan out and accumulation. In my view a structured relational database is a particularly good backend for that as it inherently can model the structure. Is that something you have considered exploring?<p>The one thing with listen/notify that I find lacking is the max payload size of 8k, it somewhat limits its capability without having to start saving stuff to tables. What I would really like is a streaming table, with a schema and all the rich type support... maybe one day.<p>0: <a href="https://www.amazingcto.com/postgres-for-everything/" rel="nofollow">https://www.amazingcto.com/postgres-for-everything/</a>
BTW: Good PostgresFM episode on implementing queues in Postgres, various caveats etc: <a href="https://www.youtube.com/watch?v=mW5z5NYpGeA" rel="nofollow">https://www.youtube.com/watch?v=mW5z5NYpGeA</a> .
The Symfony framework (PHP) provides a similar feature, which also relies on LISTEN/NOTIFY and FOR UPDATE SKIP LOCKED: <a href="https://symfony.com/doc/current/messenger.html#doctrine-transport" rel="nofollow">https://symfony.com/doc/current/messenger.html#doctrine-tran...</a><p>It also supports many other backends including AMQP, Beanstalkd, Redis and various cloud services.<p>This component, called Messenger, can be installed as a standalone library in any PHP project.<p>(Disclaimer: I’m the author of the PostgreSQL transport for Symfony Messenger).
You might also want to look at River (<a href="https://github.com/riverqueue/river">https://github.com/riverqueue/river</a>) for inspiration as they support scheduled jobs, etc.<p>From an end-user perspective, they also have a UI which is nice to have for debugging.
There is also Procrastinate: <a href="https://procrastinate.readthedocs.io/en/stable/index.html" rel="nofollow">https://procrastinate.readthedocs.io/en/stable/index.html</a><p>Procrastinate also uses PostgreSQL's LISTEN/NOTIFY (but can optionally be turned off and use polling). It also supports many features (and more are planned), like sync and async jobs (it uses asyncio under the hood), periodic tasks, retries, task locks, priorities, job cancellation/aborting, Django integration (optional).<p>DISCLAIMER: I am a co-maintainer of Procrastinate.
Good Job does the same for Rails<p><a href="https://github.com/bensheldon/good_job">https://github.com/bensheldon/good_job</a>
I really like the emergence of simple queuing tools for robust database management systems. Keep things simple and remove infrastructure complexity. Definitely a +1 from me!<p>For handling straightforward asynchronous tasks like sending opt-in emails, we've developed a similar library at All Quiet for C# and MongoDB: <a href="https://allquiet.app/open-source/mongo-queueing" rel="nofollow">https://allquiet.app/open-source/mongo-queueing</a><p>In this context:<p><pre><code> LISTEN/NOTIFY in PostgreSQL is comparable to MongoDB's change streams.
SELECT FOR UPDATE SKIP LOCKED in PostgreSQL can be likened to MongoDB's atomic read/update operations.</code></pre>
Most of the small python alternatives I've seen use Redis as backend:<p>- <a href="https://github.com/rq/rq">https://github.com/rq/rq</a>
- <a href="https://github.com/coleifer/huey">https://github.com/coleifer/huey</a>
- <a href="https://github.com/resque/resque">https://github.com/resque/resque</a>
The most simple job queue in MySQL:<p><pre><code> update job_table set key=value where ... limit 1
</code></pre>
It's simple and atomic. Unfortunately PG doesn't allow `update ... limit` syntax