
@m3k0813 (Contributor) commented Aug 6, 2025

Closes #3031

Summary

This pull request introduces a mechanism to defer the shutdown of a SimpleMessageListenerContainer to allow in-flight request/reply messages to be received before the consumer is cancelled.

Problem

Currently, when using request/reply messaging with a fixed reply queue, a graceful shutdown can cause the listener container to stop immediately. If a reply message arrives shortly after, it is lost because the consumer is already gone. This forces the application to treat the operation as a timeout or error, as described in the issue.

Solution

The solution introduces new configuration on SimpleMessageListenerContainer to make it aware of pending replies from a provider, such as a RabbitTemplate.

  • A new functional interface, PendingReplyProvider, is introduced to decouple the container from the template.
  • RabbitTemplate now exposes a getPendingReplyCount() method, which can be used as a PendingReplyProvider.
  • SimpleMessageListenerContainer has two new properties:
    • setPendingReplyProvider(PendingReplyProvider): When set, this enables the shutdown delay logic.
    • setPendingReplyCheckInterval(long): Configures the polling interval while waiting for replies during shutdown.

With this configuration, the container's shutdown logic (shutdownAndWaitOrCallback) now checks for pending replies and waits for up to the configured shutdownTimeout before proceeding with the normal shutdown process.
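The polling approach described above, which the review later replaced, can be sketched as follows. This is a minimal, self-contained illustration rather than the PR's code: `PendingReplyProvider` mirrors the interface named in the description, while `PendingReplyWaiter`, `waitForPendingReplies` and its parameters are hypothetical names.

```java
/** Mirrors the functional interface described in the PR: reports outstanding replies. */
@FunctionalInterface
interface PendingReplyProvider {

    int getPendingReplyCount();

}

/** Hypothetical helper: polls the provider until no replies are pending or the timeout elapses. */
final class PendingReplyWaiter {

    private PendingReplyWaiter() {
    }

    /**
     * Returns true if the pending count reached zero before the timeout; false if the
     * timeout elapsed (or the thread was interrupted) and the normal shutdown should proceed.
     */
    static boolean waitForPendingReplies(PendingReplyProvider provider, long shutdownTimeoutMillis,
            long checkIntervalMillis) {

        long deadline = System.currentTimeMillis() + shutdownTimeoutMillis;
        while (provider.getPendingReplyCount() > 0) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try {
                // Poll again after the check interval, but never sleep past the deadline
                Thread.sleep(Math.min(checkIntervalMillis, remaining));
            }
            catch (InterruptedException ex) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

}
```

Any `int`-returning method reference, such as a template's `getPendingReplyCount`, can serve as the provider.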

Testing

  • A new test case, testShutdownWithPendingReplies(), has been added to SimpleMessageListenerContainerTests to verify this new behavior.


private @Nullable PendingReplyProvider pendingReplyProvider;


Double empty line; please remove line 138.

*/
int getPendingReplyCount();

}

Can you add one more empty line at the end of this file?

Contributor Author

Thanks for the review, @injae-kim! I'll apply these changes right away.

@m3k0813 changed the title from "issue-3031: Defer SMLC shutdown for pending replies" to "GH-3031: Defer SMLC shutdown for pending replies" on Aug 6, 2025
@artembilan (Member) left a comment

Please review our Contribution Guidelines: https://github.com/spring-projects/spring-amqp/blob/main/CONTRIBUTING.adoc.

It looks like you have not agreed to the DCO.
Without that, we cannot review your PR.

Thanks

This commit introduces a mechanism to delay the shutdown of a
SimpleMessageListenerContainer if there are pending replies for
request/reply operations.

A new functional interface, `PendingReplyProvider`, is introduced and
can be set on the container. `RabbitTemplate` now exposes a
`getPendingReplyCount()` method to serve as this provider.

When the provider is set, the container will wait up to the configured
`shutdownTimeout` for the pending reply count to drop to zero before
proceeding with the consumer cancellation.

Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
@m3k0813 (Contributor Author) commented Aug 6, 2025

Hi @artembilan, sorry about that! I've amended the commit with the DCO sign-off. PTAL. Thanks!

Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
@artembilan (Member) left a comment

Let me know if my review makes sense!
Thank you!

* @since 4.0
* @see org.springframework.amqp.rabbit.core.RabbitTemplate#getPendingReplyCount()
*/
public void setPendingReplyProvider(PendingReplyProvider pendingReplyProvider) {
Member

I don't think we need this extra abstraction.
We already have a ListenerContainerAware implemented on the RabbitTemplate.
So, that getPendingReplyCount() could be moved into that contract.
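Moving the method into an existing contract does not break other implementers if it is added as a `default` method. A minimal sketch, where `ContainerAwareContract`, `PlainListener` and `ReplyingTemplate` are hypothetical stand-ins for `ListenerContainerAware`, an unrelated listener and `RabbitTemplate`:

```java
import java.util.Set;

/** Hypothetical existing contract, standing in for ListenerContainerAware. */
interface ContainerAwareContract {

    Set<String> queueNames();

    // New capability added as a default method, so existing implementers still compile
    default int getPendingReplyCount() {
        return 0;
    }

}

/** An existing implementer that knows nothing about replies keeps working unchanged. */
final class PlainListener implements ContainerAwareContract {

    @Override
    public Set<String> queueNames() {
        return Set.of("orders");
    }

}

/** A template-like implementer overrides the default (not thread-safe; illustration only). */
final class ReplyingTemplate implements ContainerAwareContract {

    private int pending;

    void send() {
        this.pending++;
    }

    void replyReceived() {
        this.pending--;
    }

    @Override
    public Set<String> queueNames() {
        return Set.of("replies");
    }

    @Override
    public int getPendingReplyCount() {
        return this.pending;
    }

}
```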

@@ -652,6 +679,8 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process

@Override
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
waitForPendingReplies();
Member

I don't think this is the right place for such logic.
I believe it has to be part of that awaitShutdown runnable at the end of this shutdownAndWaitOrCallback() method.

}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for pending replies.");
Member

I don't think we need so much logic here.
Currently, we utilize an ActiveObjectCounter for the consumers to be cancelled in this listener container.
I believe we can use it in the RabbitTemplate as well for those pending replies to be fulfilled.
Then in that awaitShutdown hook we could take this ActiveObjectCounter from the ListenerContainerAware and wait on it as we do for consumers.
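The idea of counting pending replies the same way consumers are counted can be illustrated with a simplified, hypothetical stand-in for `ActiveObjectCounter`. This is not Spring AMQP's implementation; it only shows the pattern described above: the template would `add` a token per outstanding request, `release` it when the reply arrives, and the container would `await` the counter during shutdown.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified counter of in-flight objects (e.g. one token per outstanding request). */
final class SimpleActiveObjectCounter<T> {

    private final Set<T> active = new HashSet<>();

    /** Registers an in-flight object, e.g. when a request is sent. */
    synchronized void add(T object) {
        this.active.add(object);
    }

    /** Releases an object, e.g. when its reply arrives, and wakes up any waiting thread. */
    synchronized void release(T object) {
        if (this.active.remove(object)) {
            notifyAll();
        }
    }

    synchronized int getCount() {
        return this.active.size();
    }

    /**
     * Blocks until every object has been released or the timeout elapses.
     * Returns true if nothing is left in flight.
     */
    synchronized boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!this.active.isEmpty()) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Releases this monitor while waiting; the loop guards against spurious wake-ups
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

}
```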


RabbitTemplate template = new RabbitTemplate(connectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
Member

Please, no foo/bar names in the project.

Refactor the shutdown deferral mechanism based on pull request feedback:

- Replace the custom `PendingReplyProvider` with the existing
  `ListenerContainerAware` interface and `ActiveObjectCounter`.
- Move the waiting logic into the `awaitShutdown` runnable for
  better consistency with the existing shutdown process.

Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
@m3k0813 (Contributor Author) commented Aug 6, 2025

Hi @artembilan,

Thanks for the feedback! I've updated the PR with all the changes you suggested.
It should be ready for another look now. PTAL.

Thank you!

@artembilan (Member) left a comment

Please add your name to all the affected classes.
Also, consider adding an entry to whats-new.adoc.

Thanks

* @since 4.0
*/
@Nullable
default ActiveObjectCounter<Object> getPendingReplyCounter() {
Member

This annotation has to be next to the type, not the method itself.
Therefore, this style is preferred:

default @Nullable ActiveObjectCounter<Object>
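Why the position matters can be shown with a small, self-contained example. It declares a local `TYPE_USE` annotation as a stand-in for a type-use `@Nullable` such as JSpecify's (an assumption about which annotation the project uses) and a hypothetical `ReplyCounterAware` contract. Placed next to the type, the annotation qualifies the return type instead of reading like a method modifier; for a nested type it must sit right before the simple name, so the type-adjacent style is the one that works everywhere.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Map;

/** Local stand-in for a TYPE_USE nullness annotation such as JSpecify's @Nullable. */
@Target(ElementType.TYPE_USE)
@Retention(RetentionPolicy.RUNTIME)
@interface Nullable {
}

/** Hypothetical contract used only to show annotation placement. */
interface ReplyCounterAware {

    // Preferred style: the annotation sits right next to the type it qualifies
    default @Nullable List<Object> getPendingReplies() {
        return null;
    }

    // For a nested type, a type-use annotation must precede the simple name
    default Map.@Nullable Entry<String, Object> firstPendingReply() {
        return null;
    }

}
```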

@@ -692,6 +692,24 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
Runnable awaitShutdown = () -> {
logger.info("Waiting for workers to finish.");
try {
ActiveObjectCounter<Object> replyCounter = null;
Object listener = getMessageListener();
if (listener instanceof ListenerContainerAware) {
Member

We can just do this: if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) {
Then use that pattern-matching variable in the block.
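The explicit cast and the pattern-matching form (Java 16+) can be compared side by side. `PendingReplyAware` and `ListenerInspector` are hypothetical names standing in for `ListenerContainerAware` and the container code.

```java
/** Hypothetical contract standing in for ListenerContainerAware. */
@FunctionalInterface
interface PendingReplyAware {

    int pendingReplies();

}

final class ListenerInspector {

    private ListenerInspector() {
    }

    // Before: instanceof check followed by an explicit cast
    static int pendingRepliesWithCast(Object listener) {
        if (listener instanceof PendingReplyAware) {
            return ((PendingReplyAware) listener).pendingReplies();
        }
        return 0;
    }

    // After: the pattern variable is bound (and in scope) only when the test succeeds
    static int pendingReplies(Object listener) {
        if (listener instanceof PendingReplyAware aware) {
            return aware.pendingReplies();
        }
        return 0;
    }

}
```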

replyCounter = ((ListenerContainerAware) listener).getPendingReplyCounter();
}

if (replyCounter != null) {
Member

This should be inside the previous if.
Plus, I believe this still has to be conditional on replyCounter.getCount() > 0.
There is no reason to log anything and wait if there is really nothing to wait for.

And we can probably just extract this into a separate method to make this code easier to read.
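Folding these suggestions together (the null check nested under the `instanceof`, no wait when the count is zero, and an extracted method) might look like the sketch below. It is self-contained and hypothetical: `ReplyCounter`, `ReplyCounterSource`, `ShutdownSketch`, `awaitPendingReplies` and the log messages are illustrative names, not the merged code.

```java
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/** Hypothetical counter contract, e.g. backed by an ActiveObjectCounter-like class. */
interface ReplyCounter {

    int getCount();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

}

/** Hypothetical listener contract exposing its pending-reply counter (may return null). */
@FunctionalInterface
interface ReplyCounterSource {

    ReplyCounter getPendingReplyCounter();

}

final class ShutdownSketch {

    private static final Logger LOGGER = Logger.getLogger(ShutdownSketch.class.getName());

    private final long shutdownTimeoutMillis;

    ShutdownSketch(long shutdownTimeoutMillis) {
        this.shutdownTimeoutMillis = shutdownTimeoutMillis;
    }

    /** Returns false if the wait ended (timeout or interrupt) while replies were still pending. */
    boolean awaitPendingReplies(Object listener) {
        if (listener instanceof ReplyCounterSource source) {
            ReplyCounter counter = source.getPendingReplyCounter();
            // Nothing to log or wait for without a counter or outstanding replies
            if (counter != null && counter.getCount() > 0) {
                LOGGER.info("Waiting for " + counter.getCount() + " pending replies");
                try {
                    if (!counter.await(this.shutdownTimeoutMillis, TimeUnit.MILLISECONDS)) {
                        LOGGER.warning("Shutdown timeout elapsed with pending replies");
                        return false;
                    }
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }

}
```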

@m3k0813 (Contributor Author) commented Aug 7, 2025

Thanks for the thorough review!
I’ve applied the suggestions and pushed the changes.

- Move @Nullable next to the type
- Use pattern matching for ListenerContainerAware
- Skip waiting when replyCounter is null or count is zero
- Extract shutdown wait logic into separate method
- Add author tag and update whats-new.adoc

Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
@m3k0813 requested a review from @artembilan on August 7, 2025 10:41
container.setShutdownTimeout(shutdownTimeout);

ActiveObjectCounter<Object> replyCounter =
(ActiveObjectCounter<Object>) ReflectionTestUtils.getField(template, "pendingRepliesCounter");
Member

I don't think we need to use reflection here, since RabbitTemplate already provides getPendingReplyCounter() for us.


template.setReplyAddress(container.getQueueNames()[0]);

long shutdownTimeout = 2000L;
Member

I don't think we need to wait so long for a reply that never comes.
We deliberately expect that the reply never comes back in this test scenario.
So, what is the point of blocking this test for those 2 seconds?
Now imagine that every one of the 3000 tests in the project blocked for a similar time.
That would be a disaster for the development feedback loop.

I think we should make it 500 millis at most.


long startTime = System.currentTimeMillis();
container.stop();
long stopDuration = System.currentTimeMillis() - startTime;
Member

Timing is always problematic in tests.

Let's see if we can spy on the logger of the SimpleMessageListenerContainer and await().untilAsserted() for the new warn log message you have introduced in this change!
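The suggestion relies on Mockito (to spy on the container's logger) and Awaitility (`await().untilAsserted()`). To stay self-contained, the sketch below hand-rolls hypothetical equivalents: `RecordingLog` records warnings the way a spied logger would, and `Polling.awaitUntilAsserted` re-runs an assertion until it passes or an upper bound elapses. Because it returns as soon as the assertion passes, a generous upper bound costs nothing when the code under test behaves. The warning text used in the check is an assumption, not the message introduced by the PR.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a spied logger: records every warning it receives. */
final class RecordingLog {

    final List<String> warnings = new CopyOnWriteArrayList<>();

    void warn(String message) {
        this.warnings.add(message);
    }

}

final class Polling {

    private Polling() {
    }

    /**
     * Re-runs the assertion every 10 ms until it passes (returning immediately) or the
     * upper bound elapses (rethrowing the last failure), similar in spirit to Awaitility.
     */
    static void awaitUntilAsserted(Duration atMost, Runnable assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return;
            }
            catch (AssertionError ex) {
                if (System.nanoTime() - deadline >= 0) {
                    throw ex;
                }
            }
            try {
                Thread.sleep(10);
            }
            catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while polling", ex);
            }
        }
    }

}
```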


[[x40-enhancements]]
=== Enhancements
Member

Let's name it "MessageListenerContainer Changes"!

=== Enhancements

Defer `SimpleMessageListenerContainer` shutdown for pending `RabbitTemplate` replies.
Member

This info is not enough.
Let's see if my sentence would be better!

The `SimpleMessageListenerContainer` now waits at most `shutdownTimeout` for pending replies from the provided `RabbitTemplate` listener during its shutdown.

This commit applies the final polishing suggestions from the review.

- The test case is refactored to be more robust and efficient. It now
  verifies the warning log message via a spy instead of relying on
  unstable timing assertions. The shutdown timeout is also reduced to
  speed up the build.

- The `whats-new.adoc` document is updated with the title and
  description suggested in the review.

Signed-off-by: Jeongjun Min <m3k0813@gmail.com>

replyCounter.release(pending);
await().atMost(Duration.ofMillis(500)).untilAsserted(() ->
Member

I think the default maximum of 30 seconds is fine for our tests.
The test is not really going to block that long.
Realistically, it takes at most those 500 millis.
However, if we set the limit to exactly the time we expect, there is no guarantee that CPU resources will be available in time to satisfy this expectation.


assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500);
replyCounter.release(pending);
Member

I don't think this call makes any difference or sense for the test logic.
Therefore, let's remove it altogether to avoid extra reading noise!

Improve test robustness by increasing the await timeout and remove
unnecessary cleanup code per review feedback.

Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
@m3k0813 (Contributor Author) commented Aug 7, 2025

Thank you for your detailed and helpful feedback!
I've pushed the final commit reflecting all of your suggestions, PTAL when you get a chance.

@artembilan (Member) left a comment

LGTM.
Will merge when build is green.
Thanks

@artembilan merged commit 275868e into spring-projects:main on Aug 7, 2025 (3 checks passed)
@artembilan (Member)

This was a great contribution, @m3k0813; looking forward to more!

@injae-kim

Nice work @m3k0813 👍👍👍

@m3k0813 (Contributor Author) commented Aug 7, 2025

Thank you for the kind words and for merging!
Glad I could contribute — looking forward to helping more in the future!

@artembilan (Member)

Right. So, even that 1 second is not enough.
The test has failed: https://ge.spring.io/s/55ld5jaabojio/tests/task/:spring-rabbit:test/details/org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainerTests/testShutdownWithPendingReplies()?top-execution=1.

I still believe we have to leave it at the default 30 seconds.
Will fix shortly...

@artembilan (Member)

Had to rework logic in the test: 521ff0a.

For me, in debug mode, it looks like the logger is not spied 🤷

@m3k0813 (Contributor Author) commented Aug 8, 2025

The new test logic makes a lot of sense and looks great.
Thanks for the fix and the explanation!

3 participants