pekko/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala

192 lines
7.5 KiB
Scala
Raw Normal View History

2016-05-05 14:38:48 +02:00
package akka.remote.artery
2016-05-27 11:24:08 +02:00
import scala.util.control.NonFatal
2016-05-05 14:38:48 +02:00
import akka.actor.{ ActorRef, InternalActorRef }
2016-05-27 11:24:08 +02:00
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
2016-05-05 14:38:48 +02:00
import akka.remote.{ MessageSerializer, UniqueAddress }
2016-05-27 11:24:08 +02:00
import akka.remote.EndpointManager.Send
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
2016-05-05 14:38:48 +02:00
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.OptionVal
2016-05-05 14:38:48 +02:00
// TODO: Long UID
/**
 * Graph stage that turns outbound [[Send]] instructions into [[EnvelopeBuffer]]s.
 *
 * For every element the stage writes the recipient and (optional) sender reference strings into
 * a shared header builder, serializes the message with `MessageSerializer.serializeForArtery`
 * into a buffer taken from `bufferPool`, flips that buffer and pushes it downstream.
 *
 * @param uniqueLocalAddress supplies the `uid` written into the header and the address used when
 *                           formatting sender paths
 * @param system             actor system used to obtain the serialization extension
 * @param compressionTable   handed to `HeaderBuilder` when the header builder is created
 * @param bufferPool         pool from which outgoing buffers are acquired; a buffer is released
 *                           back to it when serialization fails
 */
class Encoder(
  uniqueLocalAddress: UniqueAddress,
  system:             ActorSystem,
  compressionTable:   LiteralCompressionTable,
  bufferPool:         EnvelopeBufferPool)
  extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {

  val in: Inlet[Send] = Inlet("Artery.Encoder.in")
  val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out")
  val shape: FlowShape[Send, EnvelopeBuffer] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

      // One header builder is reused for all elements; version and uid never change.
      private val headerBuilder = HeaderBuilder(compressionTable)
      headerBuilder.version = ArteryTransport.Version
      headerBuilder.uid = uniqueLocalAddress.uid
      private val localAddress = uniqueLocalAddress.address
      private val serialization = SerializationExtension(system)
      private val serializationInfo = Serialization.Information(localAddress, system)

      // ActorRef -> serialized path string. Both caches are simply cleared once they hold
      // 1000 entries (see the FIXME notes below).
      private val senderCache = new java.util.HashMap[ActorRef, String]
      private val recipientCache = new java.util.HashMap[ActorRef, String]

      override protected def logSource = classOf[Encoder]

      /**
       * Encodes one `Send`. On a serialization failure the buffer is returned to the pool; the
       * failure is rethrown for a `SystemMessageEnvelope`, otherwise it is logged and the next
       * element is pulled.
       */
      override def onPush(): Unit = {
        val send = grab(in)

        val recipientStr = recipientCache.get(send.recipient) match {
          case null =>
            val s = send.recipient.path.toSerializationFormat
            // FIXME this cache will be replaced by compression table
            if (recipientCache.size() >= 1000)
              recipientCache.clear()
            recipientCache.put(send.recipient, s)
            s
          case s => s
        }
        headerBuilder.recipientActorRef = recipientStr

        send.senderOption match {
          case OptionVal.None => headerBuilder.setNoSender()
          case OptionVal.Some(sender) =>
            val senderStr = senderCache.get(sender) match {
              case null =>
                val s = sender.path.toSerializationFormatWithAddress(localAddress)
                // FIXME we might need an efficient LRU cache, or replaced by compression table
                if (senderCache.size() >= 1000)
                  senderCache.clear()
                senderCache.put(sender, s)
                s
              case s => s
            }
            headerBuilder.senderActorRef = senderStr
        }

        // Acquire the buffer only once the header fields are set, directly before the try
        // block that releases it on failure, so an exception above cannot strand a buffer.
        val envelope = bufferPool.acquire()
        try {
          // avoiding currentTransportInformation.withValue due to thunk allocation
          val oldValue = Serialization.currentTransportInformation.value
          try {
            Serialization.currentTransportInformation.value = serializationInfo
            MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope)
          } finally
            Serialization.currentTransportInformation.value = oldValue

          envelope.byteBuffer.flip()
          push(out, envelope)
        } catch {
          case NonFatal(e) =>
            bufferPool.release(envelope)
            send.message match {
              case _: SystemMessageEnvelope =>
                log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName)
                throw e
              case _ =>
                log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName)
                pull(in)
            }
        }
      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}
/**
 * Graph stage that turns inbound [[EnvelopeBuffer]]s into [[InboundEnvelope]]s.
 *
 * For every buffer the stage parses the header, resolves the recipient and (optional) sender
 * reference strings through `resolveActorRefWithLocalAddress`, deserializes the payload with
 * `MessageSerializer.deserializeForArtery` and pushes an envelope taken from `inEnvelopePool`.
 * The incoming buffer is always handed back to `bufferPool` when the element has been handled.
 *
 * @param uniqueLocalAddress              its `address` is passed into every decoded envelope
 * @param system                          actor system used for the serialization extension and
 *                                        for deserialization
 * @param resolveActorRefWithLocalAddress resolves a reference string from the header to an actor ref
 * @param compressionTable                handed to `HeaderBuilder` when the header builder is created
 * @param bufferPool                      pool the consumed buffers are released to
 * @param inEnvelopePool                  pool the outgoing envelopes are acquired from
 */
class Decoder(
  uniqueLocalAddress:              UniqueAddress,
  system:                          ExtendedActorSystem,
  resolveActorRefWithLocalAddress: String => InternalActorRef,
  compressionTable:                LiteralCompressionTable,
  bufferPool:                      EnvelopeBufferPool,
  inEnvelopePool:                  ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {

  val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
  val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
  val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
      private val localAddress = uniqueLocalAddress.address
      // One header builder is reused for all elements; parseHeader overwrites it each time.
      private val headerBuilder = HeaderBuilder(compressionTable)
      private val serialization = SerializationExtension(system)

      // Reference string -> resolved actor ref. Both caches are simply cleared once they hold
      // 1000 entries (see the FIXME notes below).
      private val recipientCache = new java.util.HashMap[String, InternalActorRef]
      private val senderCache = new java.util.HashMap[String, ActorRef]

      override protected def logSource = classOf[Decoder]

      /**
       * Decodes one buffer. A non-fatal deserialization failure is logged as a warning and the
       * next element is pulled; any other exception propagates to the caller. In every case the
       * buffer is released back to the pool.
       */
      override def onPush(): Unit = {
        val envelope = grab(in)
        // The release guards the whole handler (not only deserialization) so that a failure
        // while parsing the header or resolving a reference cannot leak the pooled buffer.
        try {
          envelope.parseHeader(headerBuilder)

          // FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances
          // in case of compression is enabled
          // FIXME: Is localAddress really needed?
          val recipient: InternalActorRef = recipientCache.get(headerBuilder.recipientActorRef) match {
            case null =>
              val ref = resolveActorRefWithLocalAddress(headerBuilder.recipientActorRef)
              // FIXME we might need an efficient LRU cache, or replaced by compression table
              if (recipientCache.size() >= 1000)
                recipientCache.clear()
              recipientCache.put(headerBuilder.recipientActorRef, ref)
              ref
            case ref => ref
          }

          val senderOption =
            if (headerBuilder.isNoSender)
              OptionVal.None
            else {
              senderCache.get(headerBuilder.senderActorRef) match {
                case null =>
                  val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef)
                  // FIXME this cache will be replaced by compression table
                  if (senderCache.size() >= 1000)
                    senderCache.clear()
                  senderCache.put(headerBuilder.senderActorRef, ref)
                  OptionVal(ref)
                case ref => OptionVal(ref)
              }
            }

          try {
            val deserializedMessage = MessageSerializer.deserializeForArtery(
              system, serialization, headerBuilder, envelope)

            val decoded = inEnvelopePool.acquire()
            decoded.asInstanceOf[ReusableInboundEnvelope].init(
              recipient,
              localAddress, // FIXME: Is this needed anymore? What should we do here?
              deserializedMessage,
              senderOption,
              headerBuilder.uid)

            push(out, decoded)
          } catch {
            case NonFatal(e) =>
              log.warning(
                "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
                headerBuilder.serializer, headerBuilder.manifest, e.getMessage)
              pull(in)
          }
        } finally {
          bufferPool.release(envelope)
        }
      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}