You've got something wonky going on with that query plan for the second `PARTITION BY` attempt. In particular, the seq scan on tasks to do the `(tasks.id = eligible_tasks.id)` hash join seems odd. The filter on queued status in `CTE eligible_tasks` (and not in the last join) also seems weird. Is that plan for the same query in the article?<p>If you add an index on `group_key, id WHERE status = 'queued'` and remove the second `WHERE tasks."status" = 'QUEUED'` (I believe that's redundant?), you might get a better plan. You'd also want something to make sure you're not preferring one group_key.<p>I think you should be able to solve your problem with workers having zero tasks by moving the LIMIT into the second CTE?<p>It's also useful in practice to record something like a worker_id and timestamp rather than just setting status to RUNNING, in case a worker gets stuck or dies and you need to unclaim the work.
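For reference, a partial index along those lines might look like the following (column names and the status casing are assumed from the snippets above, so adjust to the actual schema):<p><pre><code> CREATE INDEX tasks_queued_group_idx
 ON tasks (group_key, id)
 WHERE status = 'QUEUED';
</code></pre><p>A partial index like this stays small (only queued rows) and lets the planner satisfy the group_key lookup and the status filter from the index alone.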
At a previous job we did something similar but ended up having workers first poll another table to determine which tenant to query against. We called these items tokens, and they represented a finite amount of dedicated thread time for processing a specific tenant's queue.<p>What this looked like was a worker thread would first query the token table for which tenant to process eligible tasks from, then update the token to take a timed lock, and during that time it would solely process eligible tasks from that specific tenant.<p>This has some nice properties:<p>1. You can scale different tenants using different numbers of tokens, which means different but controlled amounts of thread time.<p>2. It allows for locality on your worker thread. Within a specific tenant the processing was usually similar, so any shared resources could be cached and reused after polling for additional eligible tasks from the tenant's queue.
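Roughly, the token-claim step could look something like this (table and column names are illustrative, not the actual schema we used):<p><pre><code> UPDATE tokens
 SET locked_until = now() + interval '30 seconds',
     worker_id = $1
 WHERE id = (
   SELECT id FROM tokens
   WHERE locked_until &lt; now()
   ORDER BY locked_until
   LIMIT 1
   FOR UPDATE SKIP LOCKED
 )
 RETURNING tenant_id;
</code></pre><p>The timed lock (locked_until) doubles as the stuck-worker recovery mechanism: if a worker dies, its token becomes claimable again once the lock expires.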
Is the `FOR UPDATE SKIP LOCKED` in the CTE necessary? Granted my understanding of Postgres row-level locking and their interaction with CTEs may be a bit lacking, but according to the docs[1]:<p><pre><code> The sub-statements in WITH are executed concurrently with each other and with the main query. Therefore, when using data-modifying statements in WITH, the order in which the specified updates actually happen is unpredictable. All the statements are executed with the same snapshot (see Chapter 13), so they cannot “see” one another's effects on the target tables.
</code></pre>
1. <a href="https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-MODIFYING" rel="nofollow">https://www.postgresql.org/docs/current/queries-with.html#QU...</a>
Is a fair queue worth it vs spinning up more capacity? I've worked on multiple projects where we've ended up ripping out a queue and just spinning up more machines to handle the load synchronously instead.
Very cool. Bookmarked in case I ever need to do this.<p>I have implemented a transactional outbox in postgres using a simpler version of this plus a trigger to notify listening workers. It worked well and vastly outpaced the inserting processes. It easily handled several million tasks per hour without issue.<p>It is also nice the article showed the correct CTE based form of the query. It is possible to naively write the query without it and sometimes get way more tasks than you asked for when there are concurrent workers. I discovered that pretty quickly but it had me pulling my hair out…
I have my queue workers maintain a list of task owners they've already processed, and prefer to get a task from an owner they've least-recently seen (or haven't seen) using `ORDER BY array_position(:seen_owner_ids, owner_id) desc`. Each new task's owner_id is inserted into the front of the list (and removed elsewhere if it exists).<p>But I have a relatively small number of possible `owner_id` values at any given time.
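As a sketch, the claim query looks roughly like this (:seen_owner_ids is the worker's most-recent-first list; table and column names are illustrative):<p><pre><code> SELECT * FROM tasks
 WHERE status = 'queued'
 ORDER BY array_position(:seen_owner_ids, owner_id) DESC
 LIMIT 1
 FOR UPDATE SKIP LOCKED;
</code></pre><p>array_position returns NULL for owners not in the list, and NULLS FIRST is the default for DESC ordering in Postgres, so never-seen owners sort ahead of everything else.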
Interesting read, but it seems like the grouping challenge could be fairly trivially solved in the application layer by registering job types (groups) with the poller, and having the round robin logic there.
This is pretty fancy stuff! Sorry if I'm just not reading carefully enough, but does this approach account for tenants whose messages take longer to process, as opposed to a tenant that sends a larger volume of messages?
Thanks for this write-up. Really interesting! I've built queues using Postgres before, but never anything this complex, so I'm sure this article will come in handy in the future!
Super cool! I was looking at the self-hosted quickstart, and it looks like Docker Compose installs both Hatchet and RabbitMQ. Does Hatchet use RabbitMQ alongside Postgres?