GH-3031: Defer SMLC shutdown for pending replies #3147
private @Nullable PendingReplyProvider pendingReplyProvider;
double empty line, remove L138
 */
int getPendingReplyCount();

}
Thanks for the review, @injae-kim! I'll apply these changes right away.
Please review our Contribution Guidelines: https://github.com/spring-projects/spring-amqp/blob/main/CONTRIBUTING.adoc.
It looks like you have not yet agreed to the DCO.
Without that, we cannot review your PR.
Thanks
This commit introduces a mechanism to delay the shutdown of a `SimpleMessageListenerContainer` if there are pending replies for request/reply operations. A new functional interface, `PendingReplyProvider`, is introduced and can be set on the container. `RabbitTemplate` now exposes a `getPendingReplyCount()` method to serve as this provider. When the provider is set, the container will wait up to the configured `shutdownTimeout` for the pending reply count to drop to zero before proceeding with the consumer cancellation.
Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
Hi @artembilan, sorry about that! I've amended the commit with the DCO sign-off. PTAL. Thanks!
Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
Let me know if my review makes sense!
Thank you!
 * @since 4.0
 * @see org.springframework.amqp.rabbit.core.RabbitTemplate#getPendingReplyCount()
 */
public void setPendingReplyProvider(PendingReplyProvider pendingReplyProvider) {
I don't think we need this extra abstraction.
We already have a `ListenerContainerAware` implemented on the `RabbitTemplate`.
So, that `getPendingReplyCount()` could be moved into that contract.
@@ -652,6 +679,8 @@ private void waitForConsumersToStart(Set<AsyncMessageProcessingConsumer> process

@Override
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
    waitForPendingReplies();
I don't think this is the right place for such logic.
I believe it has to be part of that `awaitShutdown` at the end of this `shutdownAndWaitOrCallback()` method.
}
catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    logger.warn("Interrupted while waiting for pending replies.");
I don't think we need so much logic here.
Currently, we utilize an `ActiveObjectCounter` for the consumers to be cancelled in this listener container.
I believe we can use it in the `RabbitTemplate` as well for those pending replies to be fulfilled.
Then, in that `awaitShutdown` hook, we could take this `ActiveObjectCounter` from the `ListenerContainerAware` and wait on it as we do for consumers.
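To make the counter-based idea in this comment concrete, here is a minimal, dependency-free sketch. `PendingCounterSketch` is a hypothetical stand-in written for this note, not Spring AMQP's `ActiveObjectCounter`; it only assumes the add/release/await shape that such a counter would need so that a shutdown hook can block until all tracked replies are released or a timeout expires.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a pending-reply counter (NOT the Spring AMQP class).
final class PendingCounterSketch<T> {

    private final Set<T> pending = ConcurrentHashMap.newKeySet();

    private final Object monitor = new Object();

    // Track an in-flight request whose reply has not arrived yet.
    void add(T item) {
        this.pending.add(item);
    }

    // Mark a request as fulfilled and wake up any waiting shutdown thread.
    void release(T item) {
        this.pending.remove(item);
        synchronized (this.monitor) {
            this.monitor.notifyAll();
        }
    }

    int getCount() {
        return this.pending.size();
    }

    // Block until nothing is pending or the timeout expires.
    // Returns true if the counter drained in time, false on timeout.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (this.monitor) {
            while (!this.pending.isEmpty()) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this.monitor, remainingNanos);
            }
        }
        return true;
    }
}
```

With a structure like this, the template side would call `add(...)` when a request is sent and `release(...)` when its reply arrives, while the container side only needs a single `await(shutdownTimeout, ...)` call instead of a polling loop.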
RabbitTemplate template = new RabbitTemplate(connectionFactory);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("foo");
No `foo`/`bar` language in the project, please.
Refactor the shutdown deferral mechanism based on pull request feedback:
- Replace the custom `PendingReplyProvider` with the existing `ListenerContainerAware` interface and `ActiveObjectCounter`.
- Move the waiting logic into the `awaitShutdown` runnable for better consistency with the existing shutdown process.
Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
Hi @artembilan, thanks for the feedback! I've updated the PR with all the changes you suggested. Thank you!
Please add your name to all the affected classes.
Also, consider adding an entry to `whats-new.adoc`.
Thanks
 * @since 4.0
 */
@Nullable
default ActiveObjectCounter<Object> getPendingReplyCounter() {
This annotation has to be next to the type, not the method itself.
Therefore, this style is preferred:
`default @Nullable ActiveObjectCounter<Object>`
@@ -692,6 +692,24 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
Runnable awaitShutdown = () -> {
    logger.info("Waiting for workers to finish.");
    try {
        ActiveObjectCounter<Object> replyCounter = null;
        Object listener = getMessageListener();
        if (listener instanceof ListenerContainerAware) {
We can just do this: `if (getMessageListener() instanceof ListenerContainerAware listenerContainerAware) {`
Then use that pattern-matching variable in the block.
            replyCounter = ((ListenerContainerAware) listener).getPendingReplyCounter();
        }

        if (replyCounter != null) {
This should be inside the previous `if`.
Plus, I believe this still has to be conditional on `replyCounter.getCount() > 0`.
There is no reason to log anything and wait if there is really nothing to wait for.
And we can probably just extract this into a separate method to make this code easier to read.
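A rough sketch of how the points in this comment could fit together, using only the JDK so it can be run in isolation. The nested `ReplyCounter` and `ReplyAware` interfaces are hypothetical stand-ins for `ActiveObjectCounter` and `ListenerContainerAware`, and the method name `awaitPendingReplies` is an assumption for illustration, not the name used in the merged change.

```java
import java.util.concurrent.TimeUnit;

final class ShutdownWaitSketch {

    // Hypothetical stand-in for the counter type discussed in the review.
    interface ReplyCounter {

        int getCount();

        boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    }

    // Hypothetical stand-in for the listener contract; the counter may be null.
    interface ReplyAware {

        ReplyCounter getPendingReplyCounter();
    }

    // Extracted helper: returns true when there was nothing to wait for or
    // all replies arrived in time, false on timeout or interruption.
    static boolean awaitPendingReplies(Object listener, long shutdownTimeoutMillis) {
        // Pattern matching binds the variable, so no explicit cast is needed.
        if (listener instanceof ReplyAware aware) {
            ReplyCounter counter = aware.getPendingReplyCounter();
            // Only wait (and, in real code, log) when something is pending.
            if (counter != null && counter.getCount() > 0) {
                try {
                    return counter.await(shutdownTimeoutMillis, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag for the caller.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return true;
    }
}
```

Keeping the null check and the count check inside the `instanceof` branch means a container whose listener is not reply-aware skips the whole block without touching any counter.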
Thanks for the thorough review!
- Move `@Nullable` to type declaration
- Use pattern matching for `ListenerContainerAware`
- Skip waiting when `replyCounter` is null or count is zero
- Extract shutdown wait logic into separate method
- Add author tag and update `whats-new.adoc`
Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
container.setShutdownTimeout(shutdownTimeout);

ActiveObjectCounter<Object> replyCounter =
        (ActiveObjectCounter<Object>) ReflectionTestUtils.getField(template, "pendingRepliesCounter");
I don't think we need to use reflection here, since `RabbitTemplate` already provides `getPendingReplyCounter()` for us.
template.setReplyAddress(container.getQueueNames()[0]);

long shutdownTimeout = 2000L;
I don't think we need to wait so long for a reply that never comes.
We are deliberately aware that the reply never comes back to us in this testing scenario.
So, what is the point of blocking this test for those 2 seconds?
Now imagine that every single one of the 3000 tests in the project blocks for a similar time.
That would be a disaster for the development feedback loop.
I think we should make it 500 millis at most.
long startTime = System.currentTimeMillis();
container.stop();
long stopDuration = System.currentTimeMillis() - startTime;
Timing is always problematic for testing.
Let's see if we can `spy` on the `logger` of the `SimpleMessageListenerContainer` and `await().untilAsserted()` for the new `warn` log message you have introduced in this change!
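The idea behind this suggestion, asserting on an observable log event rather than on elapsed time, can be sketched without any test libraries. The example below is a JDK-only analogue and not the project's test: a capturing `java.util.logging` handler plays the role of the Mockito spy, and the small `awaitCondition` helper plays the role of Awaitility's `await().untilAsserted()`. The logger name and the warning text are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BooleanSupplier;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class LogAssertionSketch {

    // Records every log call so a test can inspect it later (spy analogue).
    static final class CapturingHandler extends Handler {

        final List<LogRecord> records = new CopyOnWriteArrayList<>();

        @Override
        public void publish(LogRecord record) {
            this.records.add(record);
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }

        boolean sawWarning(String fragment) {
            return this.records.stream()
                    .anyMatch(r -> r.getLevel() == Level.WARNING
                            && r.getMessage() != null
                            && r.getMessage().contains(fragment));
        }
    }

    // Poll until the condition holds or the generous upper bound expires.
    static boolean awaitCondition(BooleanSupplier condition, long maxWaitMillis) throws InterruptedException {
        long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }

    // Simulates a shutdown thread that logs a (hypothetical) warning.
    static boolean demo() throws InterruptedException {
        Logger logger = Logger.getLogger("shutdown-sketch");
        logger.setUseParentHandlers(false);
        CapturingHandler handler = new CapturingHandler();
        logger.addHandler(handler);
        Thread stopper = new Thread(() -> logger.warning("Timed out waiting for pending replies"));
        stopper.start();
        try {
            return awaitCondition(() -> handler.sawWarning("pending replies"), 5_000);
        }
        finally {
            logger.removeHandler(handler);
        }
    }
}
```

The upper bound passed to `awaitCondition` is deliberately loose: the loop returns as soon as the warning is seen, so a large limit costs nothing on a healthy run and avoids flakiness on a busy build machine.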
[[x40-enhancements]]
=== Enhancements
Let's name it "MessageListenerContainer Changes"!
=== Enhancements

Defer `SimpleMessageListenerContainer` shutdown for pending `RabbitTemplate` replies.
This info is not enough.
Let's see if my sentence would be better!
The `SimpleMessageListenerContainer` now awaits at most `shutdownTimeout` for pending replies from the provided `RabbitTemplate` listener on its shutdown.
This commit applies the final polishing suggestions from the review.
- The test case is refactored to be more robust and efficient. It now verifies the warning log message via a spy instead of relying on unstable timing assertions. The shutdown timeout is also reduced to speed up the build.
- The `whats-new.adoc` document is updated with the title and description suggested in the review.
Signed-off-by: Jeongjun Min <m3k0813@gmail.com>

replyCounter.release(pending);
await().atMost(Duration.ofMillis(500)).untilAsserted(() ->
I think the default maximum of 30 seconds is fine for us in our tests.
It is really not going to block that long.
It might really take at most those 500 millis.
However, if we set it to exactly our given time, there is no guarantee that CPU resources will be available in time to satisfy this expectation.
assertThat(stopDuration).isGreaterThanOrEqualTo(shutdownTimeout - 500);
replyCounter.release(pending);
I don't think this call makes any difference or sense for the test logic.
Therefore, let's remove it altogether to avoid extra reading noise!
Improve test robustness by increasing the await timeout and remove unnecessary cleanup code per review feedback.
Signed-off-by: Jeongjun Min <m3k0813@gmail.com>
Thank you for your detailed and helpful feedback!
LGTM.
Will merge when build is green.
Thanks
This was a great contribution, @m3k0813; looking forward to more!
Nice work @m3k0813 👍👍👍
Thank you for the kind words and for merging!
Right. So, even that 1 second is not enough. I still believe that we have to leave it with the default 30 seconds.
Had to rework logic in the test: 521ff0a. For me, in debug mode, it looks like
The new test logic makes a lot of sense and looks great.
Closes #3031
Summary
This pull request introduces a mechanism to defer the shutdown of a `SimpleMessageListenerContainer` to allow in-flight request/reply messages to be received before the consumer is cancelled.
Problem
Currently, when using request/reply messaging with a fixed reply queue, a graceful shutdown can cause the listener container to stop immediately. If a reply message arrives shortly after, it is lost because the consumer is already gone. This forces the application to treat the operation as a timeout or error, as described in the issue.
Solution
The solution introduces new configuration on `SimpleMessageListenerContainer` to make it aware of pending replies from a provider, such as a `RabbitTemplate`.
- A new functional interface, `PendingReplyProvider`, is introduced to decouple the container from the template.
- `RabbitTemplate` now exposes a `getPendingReplyCount()` method, which can be used as a `PendingReplyProvider`.
- `SimpleMessageListenerContainer` has two new properties:
  - `setPendingReplyProvider(PendingReplyProvider)`: When set, this enables the shutdown delay logic.
  - `setPendingReplyCheckInterval(long)`: Configures the polling interval while waiting for replies during shutdown.
With this configuration, the container's shutdown logic (`shutdownAndWaitOrCallback`) now checks for pending replies and waits for up to the configured `shutdownTimeout` before proceeding with the normal shutdown process.
Testing
A new test, `testShutdownWithPendingReplies()`, has been added to `SimpleMessageListenerContainerTests` to verify this new behavior.
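For readers who want to see the polling approach from this description in isolation, here is a minimal JDK-only sketch. It reflects the initial proposal described in this summary, which the review thread later replaced with a counter-based wait. The `PendingReplyProvider` interface mirrors the one named above, while the method signature, parameter names, and return value are assumptions made for illustration and not the PR's actual code.

```java
final class PendingReplyPollingSketch {

    // Mirrors the functional interface named in the description.
    @FunctionalInterface
    interface PendingReplyProvider {

        int getPendingReplyCount();
    }

    // Poll the provider until no replies are pending or the shutdown timeout
    // expires. Returns true if the count reached zero, false otherwise.
    static boolean waitForPendingReplies(PendingReplyProvider provider,
            long shutdownTimeoutMillis, long checkIntervalMillis) {

        long deadline = System.nanoTime() + shutdownTimeoutMillis * 1_000_000L;
        while (provider.getPendingReplyCount() > 0) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return false;
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(checkIntervalMillis, remainingMillis));
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

A caller could pass a method reference such as `template::getPendingReplyCount` as the provider. The trade-off of polling is that shutdown may lag the last reply by up to one check interval, which is one reason a blocking counter is attractive.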