diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala index cd579f33fa..a0b199c21d 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala @@ -13,7 +13,6 @@ import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } import akka.actor.typed.{ ActorRef, ActorRefResolver, Props, TypedAkkaSpecWithShutdown } import akka.serialization.SerializerWithStringManifest import com.typesafe.config.ConfigFactory - import scala.concurrent.Await import scala.concurrent.duration._ @@ -62,6 +61,9 @@ object ClusterSingletonApiSpec { } class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + // Reproducer of issue #24620, by eagerly creating the ActorRefResolver in serializer + val actorRefResolver = ActorRefResolver(system.toTyped) + def identifier: Int = 47 def manifest(o: AnyRef): String = o match { case _: Ping ⇒ "a" @@ -70,13 +72,13 @@ object ClusterSingletonApiSpec { } def toBinary(o: AnyRef): Array[Byte] = o match { - case p: Ping ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8) + case p: Ping ⇒ actorRefResolver.toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8) case Pong ⇒ Array.emptyByteArray case Perish ⇒ Array.emptyByteArray } def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { - case "a" ⇒ Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8))) + case "a" ⇒ Ping(actorRefResolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8))) case "b" ⇒ Pong case "c" ⇒ Perish } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index cb52065d81..61333817ac 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -62,9 +62,18 @@ private[remote] class Encoder( headerBuilder setVersion version headerBuilder setUid uniqueLocalAddress.uid private val localAddress = uniqueLocalAddress.address - private val serialization = SerializationExtension(system) private val serializationInfo = Serialization.Information(localAddress, system) + // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized + private var _serialization: OptionVal[Serialization] = OptionVal.None + private def serialization: Serialization = _serialization match { + case OptionVal.Some(s) ⇒ s + case OptionVal.None ⇒ + val s = SerializationExtension(system) + _serialization = OptionVal.Some(s) + s + } + private val instruments: RemoteInstruments = RemoteInstruments(system) private val changeActorRefCompressionCb = getAsyncCallback[CompressionTable[ActorRef]] { table ⇒ @@ -580,7 +589,16 @@ private[remote] class Deserializer( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { private val instruments: RemoteInstruments = RemoteInstruments(system) - private val serialization = SerializationExtension(system) + + // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized + private var _serialization: OptionVal[Serialization] = OptionVal.None + private def serialization: Serialization = _serialization match { + case OptionVal.Some(s) ⇒ s + case OptionVal.None ⇒ + val s = SerializationExtension(system) + _serialization = OptionVal.Some(s) + s + } override protected def logSource = classOf[Deserializer] @@ -642,16 +660,31 @@ private[remote] class DuplicateHandshakeReq( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - private val (serializerId, manifest) = { - val serialization = SerializationExtension(system) - val ser = serialization.serializerFor(classOf[HandshakeReq]) - val m = ser match { - case s: SerializerWithStringManifest ⇒ - s.manifest(HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address)) - case _ ⇒ "" - } - (ser.identifier, m) + + // lazy init of SerializationExtension to avoid loading serializers before ActorRefProvider has been initialized + var _serializerId: Int = -1 + var _manifest = "" + def serializerId: Int = { + lazyInitOfSerializer() + _serializerId } + def manifest: String = { + lazyInitOfSerializer() + _manifest + } + def lazyInitOfSerializer(): Unit = { + if (_serializerId == -1) { + val serialization = SerializationExtension(system) + val ser = serialization.serializerFor(classOf[HandshakeReq]) + _manifest = ser match { + case s: SerializerWithStringManifest ⇒ + s.manifest(HandshakeReq(inboundContext.localAddress, inboundContext.localAddress.address)) + case _ ⇒ "" + } + _serializerId = ser.identifier + } + } + var currentIterator: Iterator[InboundEnvelope] = Iterator.empty override def onPush(): Unit = {