-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Closed
Labels
Description
I've just tried to implement the cluster pub/sub and failed :(
Getting an error like:
worker_1 | 2016-07-28 07:11:50 [Error] "An item with the same key has already been added."
worker_1 | System.ArgumentException: An item with the same key has already been added.
worker_1 | at System.ThrowHelper.ThrowArgumentException (ExceptionResource resource) <0x421d9c00 + 0x00027> in <filename unknown>:0
worker_1 | at System.Collections.Generic.Dictionary`2[TKey,TValue].Insert (System.Collections.Generic.TKey key, System.Collections.Generic.TValue value, Boolean add) <0x415b2e20 + 0x0017b> in <filename unknown>:0
worker_1 | at System.Collections.Generic.Dictionary`2[TKey,TValue].Add (System.Collections.Generic.TKey key, System.Collections.Generic.TValue value) <0x415b79a0 + 0x0001b> in <filename unknown>:0
worker_1 | at Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator.HandlePrune () <0x421d7240 + 0x00364> in <filename unknown>:0
worker_1 | at Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator.<.ctor>b__14_14 (Akka.Cluster.Tools.PublishSubscribe.Internal.Prune _) <0x421d7200 + 0x0000f> in <filename unknown>:0
worker_1 | at (wrapper dynamic-method) System.Object:lambda_method (System.Runtime.CompilerServices.Closure,object,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Send>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.SendToAll>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Publish>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Put>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Remove>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Subscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.RegisterTopic>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.GetTopics>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Subscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Unsubscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Unsubscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Status>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Delta>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.GossipTick>,object[])
worker_1 | at (wrapper delegate-invoke) <Module>:invoke_bound_bool_Closure_object_Action`1<Send>_Action`1<SendToAll>_Action`1<Publish>_Action`1<Put>_Action`1<Remove>_Action`1<Subscribe>_Action`1<RegisterTopic>_Action`1<GetTopics>_Action`1<Subscribed>_Action`1<Unsubscribe>_Action`1<Unsubscribed>_Action`1<Status>_Action`1<Delta>_Action`1<GossipTick>_object[] (object,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Send>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.SendToAll>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Publish>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Put>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Remove>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Subscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.RegisterTopic>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.GetTopics>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Subscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Unsubscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Unsubscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Status>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Delta>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.GossipTick>,object[])
worker_1 | at Akka.Tools.MatchHandler.PartialHandlerArgumentsCapture`16[T,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15].Handle (Akka.Tools.MatchHandler.T message) <0x4208dbd0 + 0x0011b> in <filename unknown>:0
worker_1 | at Akka.Actor.ReceiveActor.ExecutePartialMessageHandler (System.Object message, Akka.Tools.MatchHandler.PartialAction`1 partialAction) <0x4178ed70 + 0x00024> in <filename unknown>:0
worker_1 | at Akka.Actor.ReceiveActor.OnReceive (System.Object message) <0x4178ed30 + 0x00023> in <filename unknown>:0
worker_1 | at Akka.Actor.UntypedActor.Receive (System.Object message) <0x4178ed00 + 0x00018> in <filename unknown>:0
worker_1 | at Akka.Actor.ActorBase.AroundReceive (Akka.Actor.Receive receive, System.Object message) <0x4178eca0 + 0x00024> in <filename unknown>:0
worker_1 | at Akka.Actor.ActorCell.ReceiveMessage (System.Object message) <0x4178e9d0 + 0x0005b> in <filename unknown>:0
worker_1 | at Akka.Actor.ActorCell.Invoke (Envelope envelope) <0x4178e520 + 0x0032f> in <filename unknown>:0
Then I looked at the HandlePrune
method...
private void HandlePrune()
{
foreach (var entry in _registry)
{
var owner = entry.Key;
var bucket = entry.Value;
var oldRemoved = bucket.Content
.Where(kv => (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds)
.Select(kv => kv.Key);
if (oldRemoved.Any())
{
_registry.Add(owner, new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved)));
}
}
}
As I can see
if (oldRemoved.Any())
{
_registry.Add(owner, new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved)));
}
will always generate an exception as owner
is always this dictionary key.