Description
Root cause:
EndpointManager
|
EndpointWriter
|
EndpointReader
Inside the EndpointReader, a ShutDownAssociation exception is thrown when a Disassociated message is handled.
The EndpointWriter's SupervisorStrategy is supposed to escalate the exception, but instead the exception is re-thrown from there.
protected override SupervisorStrategy SupervisorStrategy()
{
    return new OneForOneStrategy(ex =>
    {
        //we're going to throw an exception anyway
        PublishAndThrow(ex, LogLevel.ErrorLevel);
        return Directive.Escalate;
    });
}
The outcome is the same: the exception is escalated from the EndpointWriter, and the EndpointManager's SupervisorStrategy logs it and stops the EndpointWriter and EndpointReader.
However, the re-throw hides where the exception actually originated, and it adds a small performance cost because the exception is thrown again and then caught at the higher level.
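The loss of the original frames can be reproduced without Akka.NET. The following is a minimal sketch, assuming that PublishAndThrow ends with a `throw reason;` statement (its body is not shown in this issue). The class `RethrowDemo` and its methods are hypothetical stand-ins, not the real Akka.Remote types. In C#, `throw ex;` on a caught exception object resets its stack trace to the new throw site, so the frame of the method that originally failed no longer appears.

```csharp
using System;
using System.Runtime.CompilerServices;

// Hypothetical stand-ins for the Akka.Remote methods named in this issue.
public static class RethrowDemo
{
    // Plays the role of EndpointReader.HandleDisassociated: the original throw site.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static void HandleDisassociated()
    {
        throw new InvalidOperationException("Shut down address");
    }

    // Plays the role of EndpointWriter.PublishAndThrow (assumed to re-throw the object).
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static void PublishAndThrow(Exception reason)
    {
        throw reason; // `throw <exception>` restarts the stack trace at this frame
    }

    // Returns the exception as thrown by the original site.
    public static Exception CaptureOriginal()
    {
        try { HandleDisassociated(); }
        catch (Exception original) { return original; }
        return null;
    }

    // Returns the exception after it has passed through PublishAndThrow.
    public static Exception CaptureRethrown()
    {
        try
        {
            try { HandleDisassociated(); }
            catch (Exception original) { PublishAndThrow(original); }
        }
        catch (Exception rethrown) { return rethrown; }
        return null;
    }

    public static void Main()
    {
        Console.WriteLine("Original trace:");
        Console.WriteLine(CaptureOriginal().StackTrace);
        Console.WriteLine("Trace after re-throw:");
        Console.WriteLine(CaptureRethrown().StackTrace);
    }
}
```

If the assumption holds, returning Directive.Escalate from the decider without throwing would leave the trace captured at the original throw site intact.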
Here is how the exception looks now:
2017-01-04 16:35:19.208 [4] [(null)] ERROR Akka.Actor.OneForOneStrategy - Shut down address: akka.tcp://riskengine@127.0.0.1:4054
Akka.Remote.ShutDownAssociation: Shut down address: akka.tcp://riskengine@127.0.0.1:4054 ---> Akka.Remote.Transport.InvalidAssociationException: The remote system terminated the association because it is shutting down.
--- End of inner exception stack trace ---
at Akka.Remote.EndpointWriter.PublishAndThrow(Exception reason, LogLevel level)
at Akka.Remote.EndpointWriter.b__20_0(Exception ex)
at Akka.Actor.SupervisorStrategy.HandleFailure(ActorCell actorCell, IActorRef child, Exception cause, ChildRestartStats stats, IReadOnlyCollection`1 children)
at Akka.Actor.ActorCell.HandleFailed(Failed f)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
--- End of stack trace from previous location where exception was thrown ---
at Akka.Actor.ActorCell.HandleFailed(Failed f)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
This is how it should look:
2017-01-04 17:08:32.147 [20] [(null)] ERROR Akka.Actor.OneForOneStrategy - Shut down address: akka.tcp://riskengine@127.0.0.1:4054
Akka.Remote.ShutDownAssociation: Shut down address: akka.tcp://riskengine@127.0.0.1:4054 ---> Akka.Remote.Transport.InvalidAssociationException: The remote system terminated the association because it is shutting down.
--- End of inner exception stack trace ---
at Akka.Remote.EndpointReader.HandleDisassociated(DisassociateInfo info)
at lambda_method(Closure , Object , Action`1 , Action`1 , Action`1 )
at Akka.Tools.MatchHandler.PartialHandlerArgumentsCapture`4.Handle(T value)
at Akka.Actor.ReceiveActor.ExecutePartialMessageHandler(Object message, PartialAction`1 partialAction)
at Akka.Actor.UntypedActor.Receive(Object message)
at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message)
at Akka.Actor.ActorCell.ReceiveMessage(Object message)
at Akka.Actor.ActorCell.Invoke(Envelope envelope)
--- End of stack trace from previous location where exception was thrown ---
at Akka.Actor.ActorCell.HandleFailed(Failed f)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
--- End of stack trace from previous location where exception was thrown ---
at Akka.Actor.ActorCell.HandleFailed(Failed f)
at Akka.Actor.ActorCell.SysMsgInvokeAll(EarliestFirstSystemMessageList messages, Int32 currentState)
And as I mentioned before:
The Akka.Remote.ShutDownAssociation exception is escalated and the EndpointManager's SupervisorStrategy logs the exception and stops EndpointWriter and EndpointReader.
In Akka.Actor.OneForOneStrategy, the Stop and Restart directives are logged by default:
protected virtual void LogFailure(IActorContext context, IActorRef child, Exception cause, Directive directive)
{
    if(LoggingEnabled)
    {
        var actorInitializationException = cause as ActorInitializationException;
        string message;
        if(actorInitializationException != null && actorInitializationException.InnerException != null)
            message = actorInitializationException.InnerException.Message;
        else
            message = cause.Message;
        switch (directive)
        {
            case Directive.Resume:
                Publish(context, new Warning(child.Path.ToString(), GetType(), message));
                break;
            case Directive.Escalate:
                //Don't log here
                break;
            default:
                //case Directive.Restart:
                //case Directive.Stop:
                Publish(context, new Error(cause, child.Path.ToString(), GetType(), message));
                break;
        }
    }
}