
@Robban1980 (Contributor) commented Apr 11, 2025

Pull Request type

  • Bugfix
  • Feature
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • WHOSUSING.md
  • Other (please describe):

NOTE: Please remember to run ./gradlew spotlessApply to fix any format violations.

Changes in this PR

This PR fixes a bug in PostgresQueueDAO.popMessage(): the SQL query does not restrict the update to the provided queue_name, so it can unintentionally pop messages from other queues and return more rows than requested.

The issue stems from the SQL itself: it updates every row whose message_id appears in the subquery result, without checking the associated queue_name. Since the table uses a composite primary key of (queue_name, message_id), both fields must be provided to uniquely identify a message.
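A minimal in-memory sketch in Java (the language of the DAO) makes the composite-key point concrete. It is not Conductor code: KeyMatchDemo, its nested Row record, and both methods are hypothetical names that model the two ways of matching the rows chosen by the subquery against the whole table.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class KeyMatchDemo {
    // Hypothetical stand-in for a queue_message row; (queueName, messageId) is the composite key.
    record Row(String queueName, String messageId) {}

    // Models the original "WHERE message_id IN (subquery)": only message_id is compared,
    // so a row from another queue with the same message_id also matches.
    static List<Row> matchByIdOnly(List<Row> table, List<Row> selected) {
        Set<String> ids = selected.stream().map(Row::messageId).collect(Collectors.toSet());
        return table.stream()
                .filter(r -> ids.contains(r.messageId()))
                .collect(Collectors.toList());
    }

    // Models the fixed join on both key columns; record equality compares both fields.
    static List<Row> matchByCompositeKey(List<Row> table, List<Row> selected) {
        Set<Row> keys = Set.copyOf(selected);
        return table.stream()
                .filter(keys::contains)
                .collect(Collectors.toList());
    }
}
```

For a toy table in which m1 and m2 each exist in both _callbackFailureQueue and _callbackFinalizeQueue, with the subquery selecting only the two _callbackFailureQueue rows, matchByIdOnly returns four rows while matchByCompositeKey returns two. That is the same over-selection the original query exhibits.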

To reproduce the bug:

  1. Enable conductor.workflow-status-listener.type=queue_publisher but do not configure any event handlers to consume the events.
  2. Run a few workflows that publish events to the queue, allowing it to fill with data.
  3. Execute the original query (note that queue_name is only included in RETURNING for debugging):
```sql
BEGIN TRANSACTION;
UPDATE queue_message SET popped = true
WHERE  message_id IN (SELECT message_id
                    FROM queue_message
                    WHERE queue_name = '_callbackFailureQueue' -- Only messages from the _callbackFailureQueue should be popped
                    AND popped = false
                    AND deliver_on <= (current_timestamp + (1000 || ' microseconds')::interval)
                    ORDER BY priority DESC, deliver_on, created_on
                    LIMIT 2 -- Limiting to 2 results here
                    FOR UPDATE SKIP LOCKED)
RETURNING queue_name, message_id, priority, payload;
ROLLBACK; -- Undo the test update so the queue is left unchanged
```

Note: The queue_name column in the RETURNING clause is not part of the original query; it was added here only to show the issue.

This query may return rows like:
```
queue_name              message_id                            priority
_callbackFinalizeQueue  98936225-c0d7-4963-a140-550d0ee4d57f  0
_callbackFailureQueue   856b1b6f-e6ea-4dc7-bd91-6248e5ef4ae9  0
_callbackFinalizeQueue  856b1b6f-e6ea-4dc7-bd91-6248e5ef4ae9  0
_callbackFailureQueue   98936225-c0d7-4963-a140-550d0ee4d57f  0
```

We intended to pop messages only from _callbackFailureQueue, but ended up updating rows from multiple queues, because the outer UPDATE matched on message_id alone and the same message_id exists in more than one queue.

Fixed query (with CTE):

```sql
BEGIN TRANSACTION;
WITH cte AS (
    SELECT queue_name, message_id
    FROM queue_message
    WHERE
        queue_name = '_callbackFailureQueue'
        AND popped = false
        AND deliver_on <= (current_timestamp + (1000 || ' microseconds')::interval)
    ORDER BY priority DESC, deliver_on, created_on
    LIMIT 2
    FOR UPDATE SKIP LOCKED
)
UPDATE queue_message
SET popped = true
FROM cte
WHERE
    queue_message.queue_name = cte.queue_name
    AND queue_message.message_id = cte.message_id
    AND queue_message.popped = false
RETURNING queue_message.queue_name, queue_message.message_id, queue_message.priority, queue_message.payload;
ROLLBACK; -- Undo the test update so the queue is left unchanged
```

Note: The queue_name column in the RETURNING clause is not part of the original query; it was added here only to show that the issue is fixed.

This query correctly returns:
```
queue_name              message_id                            priority
_callbackFailureQueue   856b1b6f-e6ea-4dc7-bd91-6248e5ef4ae9  0
_callbackFailureQueue   98936225-c0d7-4963-a140-550d0ee4d57f  0
```

Only the intended messages in the specified queue are updated.
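The semantics of the fixed query can be modeled in memory as a two-step pop. This Java sketch is illustrative only: CtePopSketch, Msg and pop are hypothetical names, the query's 1000-microsecond deliver_on tolerance is assumed to be folded into the now argument, and row locking (FOR UPDATE SKIP LOCKED) is not modeled.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class CtePopSketch {
    // Hypothetical mutable row; fields mirror the queue_message columns the query uses.
    static final class Msg {
        final String queueName;
        final String messageId;
        final int priority;
        final Instant deliverOn;
        final Instant createdOn;
        boolean popped;

        Msg(String queueName, String messageId, int priority, Instant deliverOn, Instant createdOn) {
            this.queueName = queueName;
            this.messageId = messageId;
            this.priority = priority;
            this.deliverOn = deliverOn;
            this.createdOn = createdOn;
        }
    }

    // ORDER BY priority DESC, deliver_on, created_on
    private static final Comparator<Msg> QUERY_ORDER =
            Comparator.comparingInt((Msg m) -> m.priority).reversed()
                    .thenComparing((Msg m) -> m.deliverOn)
                    .thenComparing((Msg m) -> m.createdOn);

    static List<Msg> pop(List<Msg> table, String queueName, int limit, Instant now) {
        // Step 1 (the CTE): up to `limit` unpopped, deliverable rows from this queue only.
        List<Msg> candidates = table.stream()
                .filter(m -> m.queueName.equals(queueName) && !m.popped && !m.deliverOn.isAfter(now))
                .sorted(QUERY_ORDER)
                .limit(limit)
                .collect(Collectors.toList());

        // Step 2 (the UPDATE): match on both key columns and re-check popped = false.
        List<Msg> popped = new ArrayList<>();
        for (Msg c : candidates) {
            for (Msg row : table) {
                if (row.queueName.equals(c.queueName) && row.messageId.equals(c.messageId) && !row.popped) {
                    row.popped = true;
                    popped.add(row);
                }
            }
        }
        return popped;
    }
}
```

Calling pop(table, "_callbackFailureQueue", 2, now) on a table where message IDs repeat across queues marks at most two rows, all from _callbackFailureQueue, and leaves the rows with the same IDs in other queues untouched.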

Another advantage of using the CTE is that it allows PostgreSQL to optimize the query plan more effectively as the dataset grows.

The extra queue_message.popped = false condition in the CTE-based UPDATE also ensures that a message popped by another process while the CTE was running is not updated again, preventing unnecessary updates in high-concurrency scenarios.
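The popped = false guard behaves like a compare-and-set: the update only takes effect for rows that are still unpopped when it runs. The following Java sketch uses ConcurrentHashMap.replace(key, false, true) as a stand-in for that conditional UPDATE; PoppedGuardSketch and its methods are hypothetical names, and PostgreSQL's row locking is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PoppedGuardSketch {
    // Hypothetical store: "queue_name/message_id" -> popped flag.
    final Map<String, Boolean> popped = new ConcurrentHashMap<>();

    static String key(String queueName, String messageId) {
        return queueName + "/" + messageId;
    }

    // Stand-in for "SET popped = true ... AND popped = false": atomically flips the flag
    // only if it is currently false, and reports whether this caller made the change.
    boolean markPoppedIfUnpopped(String queueName, String messageId) {
        return popped.replace(key(queueName, messageId), false, true);
    }

    // Applies the guarded update to previously selected message IDs and returns
    // only the IDs this caller actually popped.
    List<String> applyUpdate(String queueName, List<String> candidateIds) {
        List<String> won = new ArrayList<>();
        for (String id : candidateIds) {
            if (markPoppedIfUnpopped(queueName, id)) {
                won.add(id);
            }
        }
        return won;
    }
}
```

If another worker pops m1 after this caller selected m1 and m2 but before it applies the update, applyUpdate returns only m2, so the same message is not handed out twice.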

Issue #369

…n the specific queue and only non-popped messages.

Switching to CTE for
- better performance at scale
- more consistent query planning at scale
@Robban1980 changed the title from "[BUG] PostgresQueueDAO method popMessage does not honor only updating messages in queue x and only returning y number of messages" to "[BUG] PostgreQueueDAO.popMessage may pop messages from incorrect queues and return more messages than expected" on Apr 11, 2025
@manan164 (Contributor) commented:

Hi @Robban1980, good find. Can you please write a small test to assert the behavior?

@Robban1980 (Contributor, Author) commented:

Hi @manan164, I have pushed a new test that checks for the issue and confirms that the new SQL works as intended.

@Robban1980 (Contributor, Author) commented:

@manan164, could you merge this one?

@v1r3n merged commit c866da7 into conductor-oss:main on Apr 18, 2025 (2 checks passed)