[FIXED] Ephemeral consumers always selects an online server #7165
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@@ -7640,12 +7641,6 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
	// We need to set the ephemeral here before replicating.
	if !isDurableConsumer(cfg) {
		// We chose to have ephemerals be R=1 unless stream is interest or workqueue.
		// Consumer can override.
		if sa.Config.Retention == LimitsPolicy && cfg.Replicas <= 1 {
Where is this logic now handled?
It's handled as part of: replicas := cfg.replicas(sa.Config)
// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
	if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
		if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 {
			// Matches old-school ephemerals only, where the replica count is 0.
			return 1
		}
		return strCfg.Replicas
	}
	return consCfg.Replicas
}
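To make the rule concrete, here is a minimal, self-contained sketch of how the replica count resolves for a few configurations. The type definitions and isDurableConsumer below are simplified stand-ins assumed for illustration, not the server's actual definitions; only the replicas method is taken from the snippet above.

```go
package main

import "fmt"

// Simplified stand-ins for the server's types (assumption: only the
// fields needed by replicas are modelled here).
type RetentionPolicy int

const (
	LimitsPolicy RetentionPolicy = iota
	InterestPolicy
	WorkQueuePolicy
)

type StreamConfig struct {
	Replicas  int
	Retention RetentionPolicy
}

type ConsumerConfig struct {
	Durable  string
	Replicas int
}

// Assumption: a consumer counts as durable when it has a durable name.
func isDurableConsumer(cfg *ConsumerConfig) bool {
	return cfg != nil && cfg.Durable != ""
}

// Copied from the snippet quoted in the review reply.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
	if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
		if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 {
			// Matches old-school ephemerals only, where the replica count is 0.
			return 1
		}
		return strCfg.Replicas
	}
	return consCfg.Replicas
}

func main() {
	r3Limits := &StreamConfig{Replicas: 3, Retention: LimitsPolicy}
	r3Interest := &StreamConfig{Replicas: 3, Retention: InterestPolicy}

	fmt.Println(ConsumerConfig{}.replicas(r3Limits))             // 1: ephemeral on a Limits stream
	fmt.Println(ConsumerConfig{}.replicas(r3Interest))           // 3: ephemeral on an Interest stream
	fmt.Println(ConsumerConfig{Durable: "d"}.replicas(r3Limits)) // 3: durable inherits the stream's count
	fmt.Println(ConsumerConfig{Replicas: 5}.replicas(r3Limits))  // 3: capped at the stream's count
	fmt.Println(ConsumerConfig{Replicas: 2}.replicas(r3Limits))  // 2: explicit override is kept
}
```

The first case is the one this PR cares about: an ephemeral with Replicas: 0 on a Limits-based R3 stream resolves to 1 before any peer selection happens.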
I meant more the selection of rg.Peers and the name selection for the consumer.
That's already handled by cc.createGroupForConsumer
	// First shuffle the active peers and then select to account for replica = 1.
	rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] })
	peers = active[:replicas]
}
...
return &raftGroup{Name: groupNameForConsumer(peers, storage), Storage: storage, Peers: peers}
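The shuffle-then-slice step can be tried in isolation. The following is a simplified sketch, not the server's implementation: the peer type, the selectPeers function, and its error message are hypothetical names introduced for illustration, and any further checks the real createGroupForConsumer performs are left out.

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

// peer is a hypothetical stand-in for a server hosting the stream.
type peer struct {
	ID     string
	Online bool
}

// selectPeers keeps only the online peers, shuffles them, and returns the
// first `replicas` entries, so an offline peer can never be chosen.
func selectPeers(streamPeers []peer, replicas int) ([]string, error) {
	var active []string
	for _, p := range streamPeers {
		if p.Online {
			active = append(active, p.ID)
		}
	}
	if replicas < 1 || len(active) < replicas {
		return nil, errors.New("not enough online peers")
	}
	// Shuffle first so that replicas = 1 does not always pick the same peer.
	rand.Shuffle(len(active), func(i, j int) { active[i], active[j] = active[j], active[i] })
	return active[:replicas], nil
}

func main() {
	streamPeers := []peer{{"S1", true}, {"S2", false}, {"S3", true}}
	peers, err := selectPeers(streamPeers, 1)
	fmt.Println(peers, err) // either [S1] or [S3], never [S2]
}
```

Because the group name is derived from the selected peers, choosing the final peer set here also means the name reflects the peers the consumer actually runs on.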
Will have a longer look at this tomorrow morning.
Confirmed to be working. We now always select the right peer set with the right amount of peers in
LGTM
A Limits-based R3 stream can have ephemeral consumers that are R1. Ephemeral consumers don't specify a replica count; instead they specify Replicas: 0, which is inferred automatically from the stream retention and the number of stream replicas.

Before this PR, createGroupForConsumer would check that there's at least one online server hosting the stream, but it would still select a peer set equal in size to the stream's (R3 in this example). After creating the group, it would be "fixed" by doing rg.Peers = []string{rg.Preferred} if it's an ephemeral consumer.

In 2.12, due to this fix: #7083, rg.setPreferred() will always select only online servers. But on 2.11 and prior, it could randomly select any node, even ones that are offline. Even though this will be fixed as well in 2.12, the real fix is to use the real replica count in createGroupForConsumer, immediately selecting the right peer set instead of trying to fix it up afterwards.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
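The difference between the two flows described above can be illustrated with a small simulation. This is a sketch under stated assumptions, not the server code: node, trimAfterwards, and selectUpFront are hypothetical names, and the old flow is modelled only as described here (a preferred peer chosen from the full stream peer set without regard to online state).

```go
package main

import (
	"fmt"
	"math/rand"
)

// node is a hypothetical stand-in for a server hosting the stream.
type node struct {
	ID      string
	Offline bool
}

// trimAfterwards models the old flow: the group starts with the full stream
// peer set, a preferred peer is picked from all of them, and the peer set is
// then cut down to that single peer. pick returns an index in [0, n).
func trimAfterwards(streamPeers []node, pick func(n int) int) node {
	return streamPeers[pick(len(streamPeers))]
}

// selectUpFront models the new flow: only online peers are candidates, so
// the single selected peer is online by construction.
func selectUpFront(streamPeers []node, pick func(n int) int) (node, bool) {
	var online []node
	for _, n := range streamPeers {
		if !n.Offline {
			online = append(online, n)
		}
	}
	if len(online) == 0 {
		return node{}, false
	}
	return online[pick(len(online))], true
}

func main() {
	peers := []node{{"S1", false}, {"S2", true}, {"S3", false}}
	oldOffline, newOffline := 0, 0
	for i := 0; i < 1000; i++ {
		if trimAfterwards(peers, rand.Intn).Offline {
			oldOffline++
		}
		if n, ok := selectUpFront(peers, rand.Intn); ok && n.Offline {
			newOffline++
		}
	}
	fmt.Printf("old flow picked an offline peer %d/1000 times\n", oldOffline)
	fmt.Printf("new flow picked an offline peer %d/1000 times\n", newOffline) // always 0
}
```

With one of three peers offline, the old flow lands on the offline peer in a sizeable share of runs, while the new flow cannot.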