lazy init of SerializationExtension in Artery, #24620 (#24667)

to avoid loading serializers before ActorRefProvider has been initialized
This commit is contained in:
Patrik Nordwall 2018-03-09 08:59:33 +01:00 committed by Konrad `ktoso` Malawski
parent 5be3c7bf83
commit 7d67524bb5
2 changed files with 49 additions and 14 deletions

View file

@ -13,7 +13,6 @@ import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import akka.actor.typed.{ ActorRef, ActorRefResolver, Props, TypedAkkaSpecWithShutdown }
import akka.serialization.SerializerWithStringManifest
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
@ -62,6 +61,9 @@ object ClusterSingletonApiSpec {
}
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
// Reproducer of issue #24620, by eagerly creating the ActorRefResolver in serializer
val actorRefResolver = ActorRefResolver(system.toTyped)
def identifier: Int = 47
def manifest(o: AnyRef): String = o match {
case _: Ping "a"
@ -70,13 +72,13 @@ object ClusterSingletonApiSpec {
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: Ping ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
case p: Ping actorRefResolver.toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
case Pong Array.emptyByteArray
case Perish Array.emptyByteArray
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case "a" Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
case "a" Ping(actorRefResolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
case "b" Pong
case "c" Perish
}

View file

@ -62,9 +62,18 @@ private[remote] class Encoder(
headerBuilder setVersion version
headerBuilder setUid uniqueLocalAddress.uid
private val localAddress = uniqueLocalAddress.address
private val serialization = SerializationExtension(system)
private val serializationInfo = Serialization.Information(localAddress, system)
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None
private def serialization: Serialization = _serialization match {
case OptionVal.Some(s) s
case OptionVal.None
val s = SerializationExtension(system)
_serialization = OptionVal.Some(s)
s
}
private val instruments: RemoteInstruments = RemoteInstruments(system)
private val changeActorRefCompressionCb = getAsyncCallback[CompressionTable[ActorRef]] { table
@ -580,7 +589,16 @@ private[remote] class Deserializer(
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
private val instruments: RemoteInstruments = RemoteInstruments(system)
private val serialization = SerializationExtension(system)
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
private var _serialization: OptionVal[Serialization] = OptionVal.None
private def serialization: Serialization = _serialization match {
case OptionVal.Some(s) s
case OptionVal.None
val s = SerializationExtension(system)
_serialization = OptionVal.Some(s)
s
}
override protected def logSource = classOf[Deserializer]
@ -642,16 +660,31 @@ private[remote] class DuplicateHandshakeReq(
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val (serializerId, manifest) = {
val serialization = SerializationExtension(system)
val ser = serialization.serializerFor(classOf[HandshakeReq])
val m = ser match {
case s: SerializerWithStringManifest
s.manifest(HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address))
case _ ""
}
(ser.identifier, m)
// lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized
var _serializerId: Int = -1
var _manifest = ""
def serializerId: Int = {
lazyInitOfSerializer()
_serializerId
}
def manifest: String = {
lazyInitOfSerializer()
_manifest
}
def lazyInitOfSerializer(): Unit = {
if (_serializerId == -1) {
val serialization = SerializationExtension(system)
val ser = serialization.serializerFor(classOf[HandshakeReq])
_manifest = ser match {
case s: SerializerWithStringManifest
s.manifest(HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address))
case _ ""
}
_serializerId = ser.identifier
}
}
var currentIterator: Iterator[InboundEnvelope] = Iterator.empty
override def onPush(): Unit = {