improve codec performance
* caching of actor refs in Encoder, Decoder * dynamicAccess.getClassFor in Serialization is costly, so introduced a cache for the class manifests there
This commit is contained in:
parent
a4b996546e
commit
121840589b
3 changed files with 72 additions and 18 deletions
|
|
@ -27,17 +27,42 @@ class Encoder(
|
|||
private val localAddress = transport.localAddress.address
|
||||
private val serialization = SerializationExtension(transport.system)
|
||||
|
||||
private val noSender = transport.system.deadLetters.path.toSerializationFormatWithAddress(localAddress)
|
||||
private val senderCache = new java.util.HashMap[ActorRef, String]
|
||||
private var recipientCache = new java.util.HashMap[ActorRef, String]
|
||||
|
||||
override def onPush(): Unit = {
|
||||
val send = grab(in)
|
||||
val envelope = pool.acquire()
|
||||
|
||||
headerBuilder.recipientActorRef = send.recipient.path.toSerializationFormat
|
||||
val recipientStr = recipientCache.get(send.recipient) match {
|
||||
case null ⇒
|
||||
val s = send.recipient.path.toSerializationFormat
|
||||
// FIXME this cache will be replaced by compression table
|
||||
if (recipientCache.size() >= 1000)
|
||||
recipientCache.clear()
|
||||
recipientCache.put(send.recipient, s)
|
||||
s
|
||||
case s ⇒ s
|
||||
}
|
||||
headerBuilder.recipientActorRef = recipientStr
|
||||
|
||||
send.senderOption match {
|
||||
case Some(sender) ⇒
|
||||
headerBuilder.senderActorRef = sender.path.toSerializationFormatWithAddress(localAddress)
|
||||
val senderStr = senderCache.get(sender) match {
|
||||
case null ⇒
|
||||
val s = sender.path.toSerializationFormatWithAddress(localAddress)
|
||||
// FIXME we might need an efficient LRU cache, or replaced by compression table
|
||||
if (senderCache.size() >= 1000)
|
||||
senderCache.clear()
|
||||
senderCache.put(sender, s)
|
||||
s
|
||||
case s ⇒ s
|
||||
}
|
||||
headerBuilder.senderActorRef = senderStr
|
||||
case None ⇒
|
||||
//headerBuilder.setNoSender()
|
||||
headerBuilder.senderActorRef = transport.system.deadLetters.path.toSerializationFormatWithAddress(localAddress)
|
||||
headerBuilder.senderActorRef = noSender
|
||||
}
|
||||
|
||||
// FIXME: Thunk allocation
|
||||
|
|
@ -70,6 +95,10 @@ class Decoder(
|
|||
private val localAddress = transport.localAddress.address
|
||||
private val provider = transport.provider
|
||||
private val headerBuilder = HeaderBuilder(compressionTable)
|
||||
private val serialization = SerializationExtension(transport.system)
|
||||
|
||||
private val recipientCache = new java.util.HashMap[String, InternalActorRef]
|
||||
private val senderCache = new java.util.HashMap[String, Option[ActorRef]]
|
||||
|
||||
override def onPush(): Unit = {
|
||||
val envelope = grab(in)
|
||||
|
|
@ -80,17 +109,35 @@ class Decoder(
|
|||
// FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances
|
||||
// in case of compression is enabled
|
||||
// FIXME: Is localAddress really needed?
|
||||
val recipient: InternalActorRef =
|
||||
provider.resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef, localAddress)
|
||||
val sender: ActorRef =
|
||||
provider.resolveActorRefWithLocalAddress(headerBuilder.senderActorRef, localAddress)
|
||||
val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match {
|
||||
case null ⇒
|
||||
val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef, localAddress)
|
||||
// FIXME we might need an efficient LRU cache, or replaced by compression table
|
||||
if (recipientCache.size() >= 1000)
|
||||
recipientCache.clear()
|
||||
recipientCache.put(headerBuilder.recipientActorRef, ref)
|
||||
ref
|
||||
case ref ⇒ ref
|
||||
}
|
||||
|
||||
val senderOption: Option[ActorRef] = senderCache.get(headerBuilder.senderActorRef) match {
|
||||
case null ⇒
|
||||
val ref = provider.resolveActorRefWithLocalAddress(headerBuilder.senderActorRef, localAddress)
|
||||
// FIXME this cache will be replaced by compression table
|
||||
if (senderCache.size() >= 1000)
|
||||
senderCache.clear()
|
||||
val refOpt = Some(ref)
|
||||
senderCache.put(headerBuilder.senderActorRef, refOpt)
|
||||
refOpt
|
||||
case refOpt ⇒ refOpt
|
||||
}
|
||||
|
||||
val decoded = InboundEnvelope(
|
||||
recipient,
|
||||
localAddress, // FIXME: Is this needed anymore? What should we do here?
|
||||
MessageSerializer.deserializeForArtery(transport.system, headerBuilder, envelope),
|
||||
Some(sender), // FIXME: No need for an option, decode simply to deadLetters instead
|
||||
UniqueAddress(sender.path.address, headerBuilder.uid))
|
||||
MessageSerializer.deserializeForArtery(transport.system, serialization, headerBuilder, envelope),
|
||||
senderOption, // FIXME: No need for an option, decode simply to deadLetters instead
|
||||
UniqueAddress(senderOption.get.path.address, headerBuilder.uid)) // FIXME see issue #20568
|
||||
|
||||
pool.release(envelope)
|
||||
push(out, decoded)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue