-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Make sure Serialization.currentTransportInformation is always set, #25067 #25068
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Test FAILed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, mostly had a few questions.
* INTERNAL API | ||
* INTERNAL API: This holds a reference to the current transport serialization information used for | ||
* serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in | ||
* Jackson need access to the current `ActorSystem`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 on improved explanation
add @InternalApi
as well?
} | ||
outboundEnvelope.sender match { | ||
case OptionVal.None ⇒ headerBuilder.setNoSender() | ||
case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
boyscout away the operator style call?
case ser: ByteBufferSerializer ⇒ ser.toBinary(message, envelope.byteBuffer) | ||
case _ ⇒ envelope.byteBuffer.put(serializer.toBinary(message)) | ||
} | ||
} finally Serialization.currentTransportInformation.value = oldInfo |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something I don't quite get here is why we need to do the stash/unstash of the old currentTransportInformation
value, isn't it always the same in the same actorsystem? Is it to avoid a thread local leak?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Leaving the value in the thread is not an option. Bad practice, and the same thread pool can actually be used for several actor systems.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright, thanks for confirming.
Serialization.currentTransportInformation.value = serialization.serializationInformation | ||
|
||
headerBuilder setSerializer serializer.identifier | ||
headerBuilder setManifest Serializers.manifestFor(serializer, message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some more operator style method calls
case OptionVal.Some(info) ⇒ info | ||
case OptionVal.None ⇒ | ||
if ((transport eq null) || (transport.defaultAddress eq null)) | ||
local.serializationInformation // address not know yet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When does this happen? Isn't the address always available from config if remoting is used? Also, won't this lead to the local serialization info potentially being put here and never replaced?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's the dreaded startup case, if accessed before completely initialized
The address is not known until bound, port 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Test FAILed. |
fixed the bug |
Test PASSed. |
I have been running this in multi-node repeat a few days, with and without Artery. All good. |
…5067 * The ThreadLocal Serialization.currentTransportInformation is used for serializing local actor refs, but it's also useful when a serializer library e.g. custom serializer/deserializer in Jackson need access to the current ActorSystem. * We set this in a rather ad-hoc way from remoting and in some persistence plugins, but it's only set for serialization and not deserialization, and it's easy for Persistence plugins or other libraries to forget this when using Akka serialization directly. * This change is automatically setting the info when using the ordinary serialize and deserialize methods. * It's also set when LocalActorRefProvider, which wasn't always the case previously. * Keep a cached instance of Serialization.Information in the provider to avoid creating new instances all the time. * Added optional Persistence TCK tests to verify that the plugin is setting this if it's using some custom calls to the serializer.
28d8414
to
827f118
Compare
Test PASSed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with a couple of nits and one possible suggestion
import akka.actor._ | ||
import akka.serialization.{ SerializationExtension, Serializers } | ||
import akka.util.{ Helpers, Unsafe } | ||
import java.util.Optional | ||
|
||
import akka.serialization.Serialization |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Group with other akka imports
|
||
/** | ||
* Gets the serialization information from a `ThreadLocal` that was assigned via | ||
* [[Serialization#withTransportInformation]]. The information is is needed for serializing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is is
/** | ||
* Serializes the given AnyRef/java.lang.Object according to the Serialization configuration | ||
* to either an Array of Bytes or an Exception if one was thrown. | ||
*/ | ||
def serialize(o: AnyRef): Try[Array[Byte]] = Try(findSerializerFor(o).toBinary(o)) | ||
def serialize(o: AnyRef): Try[Array[Byte]] = { | ||
val oldInfo = Serialization.currentTransportInformation.value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be worth pulling this out to a function e.g.
private def withTransportSet[T](f: () => T): T = {
val oldInfo = Serialization.currentTransportInformation.value
try {
if (oldInfo eq null)
Serialization.currentTransportInformation.value = serializationInformation
f()
} finally {
Serialization.currentTransportInformation.value = oldInfo
}
}
Then
def serialize(o: AnyRef): Try[Array[Byte]] = {
withTransportSet { () =>
Try(findSerializerFor(o).toBinary(o))
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the drawback is the extra allocation, perhaps I'm too concerned about allocations and doesn't matter for these methods.
Test PASSed. |
actor refs, but it's also useful when a serializer library e.g. custom serializer/deserializer
in Jackson need access to the current ActorSystem.
set for serialization and not deserialization, and it's easy for Persistence plugins or other
libraries to forget this when using Akka serialization directly.
methods.
creating new instances all the time.
if it's using some custom calls to the serializer.
Refs #25067