diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index b0f6f275de..dbc1f21d1c 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -83,6 +83,7 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { val settings = new Settings(system.settings.config) val log = Logging(system, getClass.getName) + private val manifestCache = new ConcurrentHashMap[String, Option[Class[_]]] /** * Serializes the given AnyRef/java.lang.Object according to the Serialization configuration @@ -123,12 +124,18 @@ class Serialization(val system: ExtendedActorSystem) extends Extension { if (manifest == "") s1.fromBinary(bytes, None) else { - system.dynamicAccess.getClassFor[AnyRef](manifest) match { - case Success(classManifest) ⇒ - s1.fromBinary(bytes, Some(classManifest)) - case Failure(e) ⇒ - throw new NotSerializableException( - s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].") + val cachedClassManifest = manifestCache.get(manifest) + if (cachedClassManifest ne null) + s1.fromBinary(bytes, cachedClassManifest) + else { + system.dynamicAccess.getClassFor[AnyRef](manifest) match { + case Success(classManifest) ⇒ + manifestCache.put(manifest, Some(classManifest)) + s1.fromBinary(bytes, Some(classManifest)) + case Failure(e) ⇒ + throw new NotSerializableException( + s"Cannot find manifest class [$manifest] for serializer with id [$serializerId].") + } } } } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index e11627a078..7119b96d2c 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -66,12 +66,12 @@ private[akka] object MessageSerializer { envelope.byteBuffer.put(serializer.toBinary(message)) } - def deserializeForArtery(system: ExtendedActorSystem, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): AnyRef = { + def deserializeForArtery(system: ExtendedActorSystem, serialization: Serialization, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): AnyRef = { // FIXME: Use the buffer directly val size = envelope.byteBuffer.limit - envelope.byteBuffer.position val bytes = Array.ofDim[Byte](size) envelope.byteBuffer.get(bytes) - SerializationExtension(system).deserialize( + serialization.deserialize( bytes, Integer.parseInt(headerBuilder.serializer), // FIXME: Use FQCN headerBuilder.classManifest).get diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index db412d7862..aeeb541cde 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -27,17 +27,42 @@ class Encoder( private val localAddress = transport.localAddress.address private val serialization = SerializationExtension(transport.system) + private val noSender = transport.system.deadLetters.path.toSerializationFormatWithAddress(localAddress) + private val senderCache = new java.util.HashMap[ActorRef, String] + private var recipientCache = new java.util.HashMap[ActorRef, String] + override def onPush(): Unit = { val send = grab(in) val envelope = pool.acquire() - headerBuilder.recipientActorRef = send.recipient.path.toSerializationFormat + val recipientStr = recipientCache.get(send.recipient) match { + case null ⇒ + val s = send.recipient.path.toSerializationFormat + // FIXME this cache will be replaced by compression table + if (recipientCache.size() >= 1000) + recipientCache.clear() + recipientCache.put(send.recipient, s) + s + case s ⇒ s + } + headerBuilder.recipientActorRef = recipientStr + send.senderOption match { case Some(sender) ⇒ - headerBuilder.senderActorRef = sender.path.toSerializationFormatWithAddress(localAddress) + val senderStr = senderCache.get(sender) match { + case null ⇒ + val s = sender.path.toSerializationFormatWithAddress(localAddress) + // FIXME we might need an efficient LRU cache, or replaced by compression table + if (senderCache.size() >= 1000) + senderCache.clear() + senderCache.put(sender, s) + s + case s ⇒ s + } + headerBuilder.senderActorRef = senderStr case None ⇒ //headerBuilder.setNoSender() - headerBuilder.senderActorRef = transport.system.deadLetters.path.toSerializationFormatWithAddress(localAddress) + headerBuilder.senderActorRef = noSender } // FIXME: Thunk allocation @@ -70,6 +95,10 @@ class Decoder( private val localAddress = transport.localAddress.address private val provider = transport.provider private val headerBuilder = HeaderBuilder(compressionTable) + private val serialization = SerializationExtension(transport.system) + + private val recipientCache = new java.util.HashMap[String, InternalActorRef] + private val senderCache = new java.util.HashMap[String, Option[ActorRef]] override def onPush(): Unit = { val envelope = grab(in) @@ -80,17 +109,35 @@ class Decoder( // FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances // in case of compression is enabled // FIXME: Is localAddress really needed? - val recipient: InternalActorRef = - provider.resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef, localAddress) - val sender: ActorRef = - provider.resolveActorRefWithLocalAddress(headerBuilder.senderActorRef, localAddress) + val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match { + case null ⇒ + val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef, localAddress) + // FIXME we might need an efficient LRU cache, or replaced by compression table + if (recipientCache.size() >= 1000) + recipientCache.clear() + recipientCache.put(headerBuilder.recipientActorRef, ref) + ref + case ref ⇒ ref + } + + val senderOption: Option[ActorRef] = senderCache.get(headerBuilder.senderActorRef) match { + case null ⇒ + val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.senderActorRef, localAddress) + // FIXME this cache will be replaced by compression table + if (senderCache.size() >= 1000) + senderCache.clear() + val refOpt = Some(ref) + senderCache.put(headerBuilder.senderActorRef, refOpt) + refOpt + case refOpt ⇒ refOpt + } val decoded = InboundEnvelope( recipient, localAddress, // FIXME: Is this needed anymore? What should we do here? - MessageSerializer.deserializeForArtery(transport.system, headerBuilder, envelope), - Some(sender), // FIXME: No need for an option, decode simply to deadLetters instead - UniqueAddress(sender.path.address, headerBuilder.uid)) + MessageSerializer.deserializeForArtery(transport.system, serialization, headerBuilder, envelope), + senderOption, // FIXME: No need for an option, decode simply to deadLetters instead + UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568 pool.release(envelope) push(out, decoded)