Skip to content

Actor does not receive "Terminated" message if remoting is used and it is not monitored actor's parent #1646

@akoshelev

Description

@akoshelev

Hi,

I am trying to implement the Reaper of Souls pattern in Akka.NET. When remoting is enabled, my Reaper actor does not receive a Terminated message for a watchee that is already dead at the time the Reaper receives the WatchMe message. The direct parent of the watchees, however, receives these messages regardless of the watchee's state. Here is the code to reproduce this problem (you'll need three separate C# projects):

/* system1.csproj */
namespace System1
{
    /// <summary>
    /// Entry point of the "system1" process. It starts an actor system that
    /// uses the remote actor provider on localhost:8090 and whose deployment
    /// section maps /coordinator to akka.tcp://system2@localhost:8080.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Creates the "system1" actor system, creates the "coordinator" actor,
        /// sends it the "Start" message and then waits for a line on the console
        /// before the actor system is disposed.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            // Parse the HOCON up front so the using statement below stays short.
            var config = ConfigurationFactory.ParseString(@"
                akka {  
                    log-config-on-start = on
                    stdout-loglevel = INFO
                    loglevel = INFO
                    actor {
                        provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""

                        debug {  
                          receive = on 
                          autoreceive = on
                          lifecycle = on
                          event-stream = on
                          unhandled = on
                        }

                        deployment {
                            /coordinator {
                               remote = ""akka.tcp://system2@localhost:8080""
                            }
                        }
                    }
                    remote {
                       helios.tcp {
                           port = 8090
                           hostname = localhost
                       }
                    }
                }
                ");

            using (var system = ActorSystem.Create("system1", config))
            {
                var coordinator = system.ActorOf(Props.Create(() => new Coordinator()), "coordinator");
                coordinator.Tell("Start");

                // Keep the process alive until the user presses Enter.
                Console.ReadLine();
            }
        }
    }
}

/* system2.csproj */
namespace System2
{
    /// <summary>
    /// Entry point of the "system2" process. It starts an actor system that
    /// uses the remote actor provider and listens on localhost:8080. This
    /// program creates no actors of its own.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Creates the "system2" actor system and waits for a line on the
        /// console before the actor system is disposed.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            // Parse the HOCON up front so the using statement below stays short.
            var config = ConfigurationFactory.ParseString(@"
                               akka {  
                    log-config-on-start = on
                    stdout-loglevel = DEBUG
                    loglevel = INFO
                    actor {
                        provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""

                        debug {  
                          receive = on 
                          autoreceive = on
                          lifecycle = on
                          event-stream = on
                          unhandled = on
                        }
                    }
                    remote {
                        helios.tcp {
                            port = 8080
                            hostname = localhost
                        }
                    }
                }
                ");

            using (var system = ActorSystem.Create("system2", config))
            {
                // Keep the process alive until the user presses Enter.
                Console.ReadLine();
            }
        }
    }
}

/* ClassLibrary.csproj */
namespace ClassLibrary1
{
    /// <summary>
    /// Actor that stops itself as soon as it receives the string "Start".
    /// </summary>
    public class Watchee : ReceiveActor
    {
        public Watchee()
        {
            // Only the exact string "Start" is handled.
            Receive<string>(text => text == "Start", text =>
            {
                StopSelf();
            });
        }

        /// <summary>
        /// Prints a "stopping" line for this actor and asks the context to stop it.
        /// </summary>
        private void StopSelf()
        {
            Console.WriteLine("{0} stopping", Self);
            Context.Stop(Self);
        }
    }

    /// <summary>
    /// Actor that watches every actor announced via <see cref="WatchMe"/> and
    /// sends <see cref="AllSoulsReaped"/> to the actor passed to its constructor
    /// once a Terminated message leaves its set of watched actors empty.
    /// </summary>
    public class Reaper : ReceiveActor
    {
        // Actors that were registered and have not been reported as terminated yet.
        private readonly HashSet<IActorRef> _souls = new HashSet<IActorRef>();

        // Receiver of the AllSoulsReaped notification.
        private readonly IActorRef _parent;

        /// <param name="parent">Actor to notify when no watched actors remain.</param>
        public Reaper(IActorRef parent)
        {
            _parent = parent;

            Receive<WatchMe>(request =>
            {
                Register(request.Watchee);
            });

            Receive<Terminated>(notice =>
            {
                Release(notice.ActorRef);
            });
        }

        /// <summary>
        /// Logs the registration, remembers the actor and starts watching it.
        /// </summary>
        private void Register(IActorRef soul)
        {
            Console.WriteLine("{0} watching", soul);
            _souls.Add(soul);
            Context.Watch(soul);
        }

        /// <summary>
        /// Forgets the terminated actor, logs how many are still watched and
        /// notifies the parent when none remain.
        /// </summary>
        private void Release(IActorRef soul)
        {
            _souls.Remove(soul);
            Console.WriteLine("{0} terminated, {1} still watching", soul, _souls.Count);

            if (_souls.Count != 0)
            {
                return;
            }

            _parent.Tell(new AllSoulsReaped());
        }
    }

    /// <summary>
    /// Marker message without any payload; the Reaper sends it once its set of
    /// watched actors has become empty.
    /// </summary>
    public sealed class AllSoulsReaped { }

    /// <summary>
    /// Message asking the receiver to start watching the given actor.
    /// </summary>
    public sealed class WatchMe
    {
        /// <summary>The actor that should be watched.</summary>
        public IActorRef Watchee { get; private set; }

        /// <param name="watchee">The actor that should be watched.</param>
        public WatchMe(IActorRef watchee)
        {
            this.Watchee = watchee;
        }
    }

    /// <summary>
    /// Drives the reproduction: on "Start" it creates two <see cref="Watchee"/>
    /// children, tells each of them to stop, waits two seconds, and then watches
    /// them both directly and through a child <see cref="Reaper"/>.
    /// </summary>
    public class Coordinator : ReceiveActor
    {
        // Child reaper that is handed this actor's Self as the actor to notify.
        private readonly IActorRef _reaper;

        public Coordinator()
        {
            _reaper = Context.ActorOf(Props.Create(() => new Reaper(Self)));

            Receive<string>(text => text == "Start", text =>
            {
                RunScenario();
            });

            Receive<AllSoulsReaped>(reaped =>
            {
                Console.WriteLine("All souls reaped");
            });

            Receive<Terminated>(notice =>
            {
                Console.WriteLine("{0} Receive terminated from {1}", Self, notice.ActorRef);
            });
        }

        /// <summary>
        /// Creates and starts both watchees, pauses, then registers the watches.
        /// </summary>
        private void RunScenario()
        {
            var first = SpawnStartedWatchee();
            var second = SpawnStartedWatchee();

            // Deliberate blocking pause: by the time it ends both watchees are
            // supposed to be dead, which is the situation under test.
            Thread.Sleep(2000);

            // Watching from this actor (the watchees' parent) seems to be working.
            Context.Watch(first);
            Context.Watch(second);

            // When another actor tries to monitor the same watchees, it never
            // receives the "Terminated" message.
            _reaper.Tell(new WatchMe(first));
            _reaper.Tell(new WatchMe(second));
        }

        /// <summary>
        /// Creates a <see cref="Watchee"/> child and immediately sends it "Start".
        /// </summary>
        /// <returns>Reference to the newly created child.</returns>
        private IActorRef SpawnStartedWatchee()
        {
            var watchee = Context.ActorOf(Props.Create<Watchee>());
            watchee.Tell("Start");
            return watchee;
        }
    }
}

I'm receiving the following output

[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$c] stopping
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$d] stopping
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$c] watching
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$d] watching
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator] Receive terminated from [akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$d]
[akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator] Receive terminated from [akka://system2/remote/akka.tcp/system1@localhost:8090/user/coordinator/$c]

so Coordinator receives Terminated messages while Reaper does not. But if I disable remoting, I'm getting the following:

[akka://system1/user/coordinator/$c] stopping
[akka://system1/user/coordinator/$d] stopping
[akka://system1/user/coordinator/$c] watching
[akka://system1/user/coordinator/$d] watching
[akka://system1/user/coordinator/$c] terminated, 1 still watching
[akka://system1/user/coordinator/$d] terminated, 0 still watching
[akka://system1/user/coordinator] Receive terminated from [akka://system1/user/coordinator/$c]
All souls reaped
[akka://system1/user/coordinator] Receive terminated from [akka://system1/user/coordinator/$d]

So in that case, Reaper correctly receives Terminated messages.

According to the Akka docs, it should not matter whether the actor is already dead at the moment Context.Watch is called (link):

It should be noted that the Terminated message is generated independent of the order in which registration and termination occur. In particular, the watching actor will receive a Terminated message even if the watched actor has already been terminated at the time of registration.

Metadata

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions