DistributedPubSubMediator - an error in HandlePrune #2221

@kantora

I've just tried to set up cluster pub/sub and it failed :(
I'm getting an error like this:

worker_1            | 2016-07-28 07:11:50 [Error] "An item with the same key has already been added."
worker_1            | System.ArgumentException: An item with the same key has already been added.
worker_1            |   at System.ThrowHelper.ThrowArgumentException (ExceptionResource resource) <0x421d9c00 + 0x00027> in <filename unknown>:0
worker_1            |   at System.Collections.Generic.Dictionary`2[TKey,TValue].Insert (System.Collections.Generic.TKey key, System.Collections.Generic.TValue value, Boolean add) <0x415b2e20 + 0x0017b> in <filename unknown>:0
worker_1            |   at System.Collections.Generic.Dictionary`2[TKey,TValue].Add (System.Collections.Generic.TKey key, System.Collections.Generic.TValue value) <0x415b79a0 + 0x0001b> in <filename unknown>:0
worker_1            |   at Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator.HandlePrune () <0x421d7240 + 0x00364> in <filename unknown>:0
worker_1            |   at Akka.Cluster.Tools.PublishSubscribe.DistributedPubSubMediator.<.ctor>b__14_14 (Akka.Cluster.Tools.PublishSubscribe.Internal.Prune _) <0x421d7200 + 0x0000f> in <filename unknown>:0
worker_1            |   at (wrapper dynamic-method) System.Object:lambda_method (System.Runtime.CompilerServices.Closure,object,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Send>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.SendToAll>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Publish>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Put>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Remove>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Subscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.RegisterTopic>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.GetTopics>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Subscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Unsubscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Unsubscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Status>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Delta>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.GossipTick>,object[])
worker_1            |   at (wrapper delegate-invoke) <Module>:invoke_bound_bool_Closure_object_Action`1<Send>_Action`1<SendToAll>_Action`1<Publish>_Action`1<Put>_Action`1<Remove>_Action`1<Subscribe>_Action`1<RegisterTopic>_Action`1<GetTopics>_Action`1<Subscribed>_Action`1<Unsubscribe>_Action`1<Unsubscribed>_Action`1<Status>_Action`1<Delta>_Action`1<GossipTick>_object[] (object,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Send>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.SendToAll>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Publish>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Put>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Remove>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Subscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.RegisterTopic>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.GetTopics>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Subscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Unsubscribe>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Unsubscribed>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Status>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.Delta>,System.Action`1<Akka.Cluster.Tools.PublishSubscribe.Internal.GossipTick>,object[])
worker_1            |   at Akka.Tools.MatchHandler.PartialHandlerArgumentsCapture`16[T,T1,T2,T3,T4,T5,T6,T7,T8,T9,T10,T11,T12,T13,T14,T15].Handle (Akka.Tools.MatchHandler.T message) <0x4208dbd0 + 0x0011b> in <filename unknown>:0
worker_1            |   at Akka.Actor.ReceiveActor.ExecutePartialMessageHandler (System.Object message, Akka.Tools.MatchHandler.PartialAction`1 partialAction) <0x4178ed70 + 0x00024> in <filename unknown>:0
worker_1            |   at Akka.Actor.ReceiveActor.OnReceive (System.Object message) <0x4178ed30 + 0x00023> in <filename unknown>:0
worker_1            |   at Akka.Actor.UntypedActor.Receive (System.Object message) <0x4178ed00 + 0x00018> in <filename unknown>:0
worker_1            |   at Akka.Actor.ActorBase.AroundReceive (Akka.Actor.Receive receive, System.Object message) <0x4178eca0 + 0x00024> in <filename unknown>:0
worker_1            |   at Akka.Actor.ActorCell.ReceiveMessage (System.Object message) <0x4178e9d0 + 0x0005b> in <filename unknown>:0
worker_1            |   at Akka.Actor.ActorCell.Invoke (Envelope envelope) <0x4178e520 + 0x0032f> in <filename unknown>:0

Then I looked at the HandlePrune method...

        private void HandlePrune()
        {
            foreach (var entry in _registry)
            {
                var owner = entry.Key;
                var bucket = entry.Value;

                var oldRemoved = bucket.Content
                    .Where(kv => (bucket.Version - kv.Value.Version) > _settings.RemovedTimeToLive.TotalMilliseconds)
                    .Select(kv => kv.Key);

                if (oldRemoved.Any())
                {
                    _registry.Add(owner, new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved)));
                }
            }
        }

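As far as I can see, this block

        if (oldRemoved.Any())
        {
            _registry.Add(owner, new Bucket(bucket.Owner, bucket.Version, bucket.Content.RemoveRange(oldRemoved)));
        }

will throw every time it runs: owner is the key of the entry currently being enumerated from _registry, so it is already present in the dictionary, and Dictionary.Add rejects duplicate keys.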
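
To illustrate the point outside of Akka.NET, here is a minimal sketch of the same pattern on a plain Dictionary. The RegistryPruneSketch class, its method names, and the string/list types are hypothetical stand-ins for the mediator's registry and buckets, not the library's actual types. It shows that Add throws for a key that already exists, and one possible way to replace entries instead: collect the replacements first, then overwrite them through the indexer once enumeration has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RegistryPruneSketch
{
    // Hypothetical, simplified registry: owner name -> keys held in that owner's bucket.
    public static void Prune(Dictionary<string, List<string>> registry, Func<string, bool> isExpired)
    {
        // Snapshot the replacements first, so the dictionary is not
        // modified while it is still being enumerated.
        var replacements = registry
            .Where(entry => entry.Value.Any(isExpired))
            .Select(entry => new KeyValuePair<string, List<string>>(
                entry.Key,
                entry.Value.Where(key => !isExpired(key)).ToList()))
            .ToList();

        foreach (var replacement in replacements)
        {
            // The indexer overwrites the existing entry;
            // Add would throw ArgumentException for the same key.
            registry[replacement.Key] = replacement.Value;
        }
    }

    // Reproduces the reported exception on a plain dictionary.
    public static bool AddThrowsOnExistingKey()
    {
        var registry = new Dictionary<string, int> { ["node-a"] = 1 };
        try
        {
            registry.Add("node-a", 2);
            return false;
        }
        catch (ArgumentException)
        {
            return true;
        }
    }
}
```

If the same idea were applied to HandlePrune, it would presumably mean replacing the _registry.Add(owner, ...) call with an assignment performed over a snapshot of the entries, but that is only a suggestion and not necessarily how the maintainers would fix it.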
