Automatic passivation for typed sharding #25765
Test FAILed.
Test FAILed.
looks like the right approach
some nitpick comments that you didn't ask for at this point..
...ster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala
import context.dispatcher
val passivateIdleTask = if (settings.passivateIdleEntityAfter.toNanos > 0) {
  val idleInterval = 1.second // FIXME
  log.info("Idle entities will be passivated after {}", PrettyDuration.format(settings.passivateIdleEntityAfter))
[{}]
Also, I didn't quite think it through: it is now logged once for every started shard. It should rather be logged from the region.
var passivating = Set.empty[ActorRef]
val messageBuffers = new MessageBufferMap[EntityId]

var handOffStopper: Option[ActorRef] = None

import context.dispatcher
val passivateIdleTask = if (settings.passivateIdleEntityAfter.toNanos > 0) {
  val idleInterval = 1.second // FIXME
probably enough with half (or some other fraction) of the total duration
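A minimal sketch of that suggestion, assuming the check interval is simply half of the configured idle timeout. The `IdleCheckInterval` object is a hypothetical helper for illustration, not code from this PR:

```scala
import scala.concurrent.duration._

object IdleCheckInterval {
  // Hypothetical helper: tick at half the idle timeout. With a deadline of one
  // full timeout, an idle entity is then found between 1x and 1.5x the timeout
  // after its last message.
  def apply(passivateIdleEntityAfter: FiniteDuration): Option[FiniteDuration] =
    if (passivateIdleEntityAfter > Duration.Zero) Some(passivateIdleEntityAfter / 2)
    else None // feature disabled, nothing to schedule
}
```

A shorter fraction would tighten the upper bound at the cost of more frequent scans over the timestamp map.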
@@ -240,6 +256,9 @@ private[akka] class Shard(
val id = idByRef(ref)
idByRef -= ref
refById -= id
if (passivateIdleTask.isDefined && lastMessageTimestamp.contains(id)) {
contains not needed here
lastMessageTimestamp.foreach {
  case (entityId, lastMessageTimestamp) ⇒
    if (lastMessageTimestamp < deadline && refById.contains(entityId)) // test failed but, how could it not be in refs though?
      passivate(refById(entityId), handOffStopMessage) // FIXME double check that this is the right message
yes, in Typed we use that, and that should be ok for untyped also
messageBuffers.contains(id) match {
  case false ⇒ deliverTo(id, msg, payload, snd)
    if (passivateIdleTask.isDefined) {
      lastMessageTimestamp += (id -> System.nanoTime())
I think this is slightly more efficient: lastMessageTimestamp = lastMessageTimestamp.updated(id, System.nanoTime())
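For illustration, a small sketch showing that the two forms produce equal maps on an immutable `Map`. The `TimestampUpdate` object and its method names are hypothetical. The efficiency difference is presumably the intermediate `Tuple2` that `id -> now` has to build:

```scala
object TimestampUpdate {
  type EntityId = String

  // Builds a Tuple2 first, then adds it to the map.
  def viaPlus(m: Map[EntityId, Long], id: EntityId, now: Long): Map[EntityId, Long] =
    m + (id -> now)

  // Passes key and value directly, without the intermediate pair.
  def viaUpdated(m: Map[EntityId, Long], id: EntityId, now: Long): Map[EntityId, Long] =
    m.updated(id, now)
}
```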
Not sure if I actually need a multi-JVM test for the feature given that it doesn't do any specific multi-node interactions, wdyt @patriknw?
Test FAILed.
Test PASSed.
Test PASSed.
@johanandren no need for multi-jvm test of this feature.
Then it's ready for final review!
looking good, some final touch...
...ster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala
@@ -23,6 +23,10 @@ akka.cluster.sharding {
# due to rebalance or crash.
remember-entities = off

# Set this to a time duration to have sharding passivate entities when they have not
# gotten any message in this long time.
passivate-idle-entity-after = 0
would be nice to support "off" value also
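One way this could look, as a sketch only. `PassivationTimeout.parse` is a hypothetical helper that works on the raw string with the Scala standard library. A real implementation would read the value through the config library, which also accepts a bare `0`, whereas `Duration(String)` requires a unit:

```scala
import scala.concurrent.duration._

object PassivationTimeout {
  // Hypothetical parser: "off" (any case) disables the feature and is mapped
  // to Duration.Zero; anything else must be a finite duration with a unit.
  def parse(raw: String): FiniteDuration =
    raw.trim.toLowerCase(java.util.Locale.ROOT) match {
      case "off" => Duration.Zero
      case other =>
        Duration(other) match {
          case d: FiniteDuration => d
          case _ =>
            throw new IllegalArgumentException(s"Expected a finite duration or 'off', got: $raw")
        }
    }
}
```

Mapping `off` to `Duration.Zero` keeps the existing "greater than zero means enabled" check unchanged.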
var passivating = Set.empty[ActorRef]
val messageBuffers = new MessageBufferMap[EntityId]

var handOffStopper: Option[ActorRef] = None

import context.dispatcher
val passivateIdleTask = if (settings.passivateIdleEntityAfter.toNanos > 0) {
settings.passivateIdleEntityAfter > Duration.Zero
@@ -265,6 +282,17 @@ private[akka] class Shard(
}
}

def passivateIdleEntities(): Unit = {
  log.debug("Passivating idle entities")
I don't think that log statement adds much value. More useful would be to collect the entityIds to be passivated and log something like
log.debug("Passivating [{}] idle entities", idleEntities.size)
I did it like this to not have to create an intermediate collection, and since each passivation also logs at debug, but maybe having to correlate this log entry with those is a bad idea. I'll change it.
logging each passivation might be too much, that could be a storm of 1000s of log messages
if someone needs that they can add the logging in their actor
That logging was already in place prior to this PR, you mean I should remove that?
hm, ok let's leave one of them "Entity stopped after passivation", but remove "Passivating started on entity"
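The "collect first, then log the count" idea from this thread could be sketched as below. This is a standalone illustration, not the code merged in this PR: `IdlePassivation`, the `running` set (standing in for the `refById` lookup) and the explicit `nowNanos` parameter are assumptions made to keep the example self-contained:

```scala
object IdlePassivation {
  type EntityId = String

  // Returns the ids whose last message is older than the deadline and that
  // are still running, so the caller can log one line with the total count.
  def idleEntities(
      lastMessageTimestamp: Map[EntityId, Long],
      running: Set[EntityId],
      nowNanos: Long,
      timeoutNanos: Long): Iterable[EntityId] = {
    val deadline = nowNanos - timeoutNanos
    lastMessageTimestamp.collect {
      case (entityId, timestamp) if timestamp < deadline && running(entityId) => entityId
    }
  }
}
```

The shard would then log once, as suggested above with `log.debug("Passivating [{}] idle entities", idleEntities.size)`, and passivate each collected entity.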
akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -418,6 +418,8 @@ private[akka] class ShardRegion(
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {
  cluster.subscribe(self, classOf[MemberEvent])
  if (settings.passivateIdleEntityAfter.toMillis > 0)
> Duration.Zero
akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala
The entities can be configured to be automatically passivated if they haven't received
a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting,
or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable
time to keep the actor alive. By default automatic passivation is disabled.
do we need to mention that messages that are not sent via sharding are not counted, e.g. scheduled messages to self?
Good point, I'll add that.
Test PASSed.
LGTM, some optional nits
...ster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala
akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala
akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala
Force-pushed from 4548353 to a418e43
Test FAILed.
it was a formatting issue in InactiveEntityPassivationSpec.scala
Test PASSed.
Fixes #25512