Closed
Labels
1 - triaged (tickets that are safe to pick up for contributing in terms of likeliness of being accepted), help wanted (issues that the core team will likely not have time to work on), t:cluster-tools
Description
Symptom
Starting a proxy to a shard region (say "myType") before starting the shard region itself puts the actor system in a bad state. A later attempt to create the shard region "myType" appears to succeed, but the shard region coordinator is absent, and the shard region cannot receive messages from its proxies.
Cause
- The cluster sharding guardian receives the message StartProxy("myType", ...) when it has no child "myType" and proceeds to create the child "myType".
- The cluster sharding guardian receives the message Start("myType", ...). It checks whether the child "myType" exists. Seeing that the child exists (it created the child a moment ago), it concludes that the shard region was already created and replies with Started(...) without creating the coordinator singleton manager.
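The two steps above can be replayed without Akka. The following is a minimal sketch, not the guardian's actual code: `GuardianModel` and its methods are hypothetical names, a set of strings stands in for `context.child(...)`, and the coordinator child is assumed to be named `myTypeCoordinator`, as in the path used by the example further down.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Toy model of the guardian's bookkeeping; hypothetical names, no Akka involved.
final class GuardianModel {
    // Names of the guardian's children, standing in for context.child(name).
    private final Set<String> children = new LinkedHashSet<>();

    // Models StartProxy: only the region child is created, never a coordinator.
    void startProxy(String typeName) {
        children.add(typeName);
    }

    // Models Start as described above: an existing region child short-circuits
    // the whole setup, including creation of the coordinator.
    void start(String typeName) {
        if (children.contains(typeName)) {
            return; // the real guardian replies Started(...) at this point
        }
        children.add(typeName + "Coordinator");
        children.add(typeName);
    }

    boolean hasChild(String name) {
        return children.contains(name);
    }

    public static void main(String[] args) {
        GuardianModel guardian = new GuardianModel();
        guardian.startProxy("myType"); // step 1: proxy first
        guardian.start("myType");      // step 2: region second
        System.out.println("myType present: " + guardian.hasChild("myType"));
        System.out.println("myTypeCoordinator present: " + guardian.hasChild("myTypeCoordinator"));
    }
}
```

Run in this order, the model ends up with the child myType but no myTypeCoordinator. Calling start before startProxy creates both.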
Relevant line:
akka/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
Line 462 in b45a254
val shardRegion = context.child(encName).getOrElse {
Proposed fix
Replace line 462 of ClusterSharding.scala,
val shardRegion = context.child(encName).getOrElse { ... }
with
val shardRegion = context.child(cName).flatMap(_ => context.child(encName)).getOrElse { ... }
so that the existing child encName is reused only when the child cName exists as well.
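To make the effect of the flatMap chain concrete, here is a rough Java analogue using java.util.Optional. It is a sketch under assumptions: `ChildLookup` and `existingRegion` are hypothetical names, a map stands in for the guardian's children, and cName is assumed to be the name of the coordinator child (myTypeCoordinator in the example).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper mirroring the shape of the proposed Scala expression.
final class ChildLookup {
    // Yields the region child only if the coordinator child exists too;
    // otherwise the result is empty and the caller's fallback would run.
    static Optional<String> existingRegion(Map<String, String> children, String encName, String cName) {
        return Optional.ofNullable(children.get(cName))
                .flatMap(coordinator -> Optional.ofNullable(children.get(encName)));
    }

    public static void main(String[] args) {
        Map<String, String> children = new HashMap<>();
        children.put("myType", "region created by StartProxy");
        // Only the proxy's child exists, so the lookup comes back empty.
        System.out.println(existingRegion(children, "myType", "myTypeCoordinator").isPresent());
        children.put("myTypeCoordinator", "coordinator");
        // Both children exist, so the region child is reused.
        System.out.println(existingRegion(children, "myType", "myTypeCoordinator").isPresent());
    }
}
```

The sketch covers only the lookup. It does not model the fallback block, and in particular it ignores that actor names must be unique among siblings, so it says nothing about what happens if that block tries to create a second child named myType.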
Minimal throwing example
In Java:
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;
import akka.testkit.TestActors;
import scala.concurrent.duration.FiniteDuration;

public final class Main {

    public static void main(final String[] args) throws Exception {
        final String configString = "akka.actor.provider=\"akka.cluster.ClusterActorRefProvider\"";
        final Config config = ConfigFactory.parseString(configString).withFallback(ConfigFactory.defaultApplication());
        final ActorSystem system = ActorSystem.create("myActorSystem", config);
        final ClusterSharding clusterSharding = ClusterSharding.get(system);
        final ClusterShardingSettings shardingSettings = ClusterShardingSettings.create(system);
        final ShardRegion.MessageExtractor messageExtractor = new ShardRegion.HashCodeMessageExtractor(10) {
            @Override
            public String entityId(final Object message) {
                return "dummyId";
            }
        };

        // start a proxy to the shard "myType"
        final ActorRef shardProxy = clusterSharding.startProxy("myType", Optional.empty(), messageExtractor);

        // starting a proxy makes the cluster sharding guardian create the child "myType" without the coordinator.
        final ActorRef childOfGuardian =
                system.actorSelection("akka://myActorSystem/system/sharding/myType")
                        .resolveOneCS(FiniteDuration.create(5, TimeUnit.SECONDS))
                        .toCompletableFuture()
                        .get();

        final Props props = Props.create(TestActors.EchoActor.class, TestActors.EchoActor::new);

        // start the shard "myType" for real, but no coordinator is created
        final ActorRef shardRegion = clusterSharding.start("myType", props, shardingSettings, messageExtractor);

        // shard coordinator not found:
        // akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka://myActorSystem/), Path(/system/sharding/myTypeCoordinator)]
        final ActorRef shardCoordinator =
                system.actorSelection("akka://myActorSystem/system/sharding/myTypeCoordinator")
                        .resolveOneCS(FiniteDuration.create(5, TimeUnit.SECONDS))
                        .toCompletableFuture()
                        .get();
    }
}
The final actor tree from ActorSystem.printTree is as follows (generated with akka-cluster_2.11 v2.5.3).
-> / LocalActorRefProvider$$anon$1 class akka.actor.LocalActorRefProvider$Guardian status=0 2 children
⌊-> system LocalActorRef class akka.actor.LocalActorRefProvider$SystemGuardian status=0 11 children
| ⌊-> cluster RepointableActorRef class akka.cluster.ClusterDaemon status=0 2 children
| | ⌊-> core LocalActorRef class akka.cluster.ClusterCoreSupervisor status=0 2 children
| | | ⌊-> daemon LocalActorRef class akka.cluster.ClusterCoreDaemon status=0 no children
| | | ⌊-> publisher LocalActorRef class akka.cluster.ClusterDomainEventPublisher status=2 no children
| | ⌊-> heartbeatReceiver LocalActorRef class akka.cluster.ClusterHeartbeatReceiver status=0 no children
| ⌊-> clusterEventBusListener RepointableActorRef class akka.cluster.ClusterReadView$$anonfun$1$$anon$1 status=0 no children
| ⌊-> deadLetterListener RepointableActorRef class akka.event.DeadLetterListener status=0 no children
| ⌊-> endpointManager RepointableActorRef class akka.remote.EndpointManager status=0 no children
| ⌊-> eventStreamUnsubscriber-1 RepointableActorRef class akka.event.EventStreamUnsubscriber status=0 no children
| ⌊-> log1-Logging$DefaultLogger RepointableActorRef class akka.event.Logging$DefaultLogger status=0 no children
| ⌊-> remote-deployment-watcher RepointableActorRef class akka.remote.RemoteDeploymentWatcher status=0 no children
| ⌊-> remote-watcher RepointableActorRef class akka.cluster.ClusterRemoteWatcher status=0 no children
| ⌊-> remoting-terminator RepointableActorRef class akka.remote.RemoteActorRefProvider$RemotingTerminator status=0 no children
| ⌊-> sharding RepointableActorRef class akka.cluster.sharding.ClusterShardingGuardian status=0 2 children
| | ⌊-> myType LocalActorRef class akka.cluster.sharding.ShardRegion status=0 no children
| | ⌊-> replicator LocalActorRef class akka.cluster.ddata.Replicator status=2 1 children
| | ⌊-> durableStore LocalActorRef class akka.cluster.ddata.LmdbDurableStore status=2 no children
| ⌊-> transports RepointableActorRef class akka.remote.Remoting$TransportSupervisor status=0 1 children
| ⌊-> akkaprotocolmanager.tcp0 LocalActorRef class akka.remote.transport.AkkaProtocolManager status=0 no children
⌊-> user LocalActorRef class akka.actor.LocalActorRefProvider$Guardian status=0 no children