new OutboundEnvelope

* instead of the old Send
* optional recipient, remove of dummy
* pool of OutboundEnvelope
This commit is contained in:
Patrik Nordwall 2016-06-29 17:09:33 +02:00
parent a2a66b1fb3
commit b2089d06a7
20 changed files with 407 additions and 284 deletions

View file

@ -8,7 +8,6 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.actor._
import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress }
import akka.remote.EndpointManager.Send
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
@ -21,15 +20,16 @@ import akka.stream.stage.TimerGraphStageLogic
* INTERNAL API
*/
private[remote] class Encoder(
uniqueLocalAddress: UniqueAddress,
system: ActorSystem,
compression: OutboundCompressions,
bufferPool: EnvelopeBufferPool)
extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {
uniqueLocalAddress: UniqueAddress,
system: ActorSystem,
compression: OutboundCompressions,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
bufferPool: EnvelopeBufferPool)
extends GraphStage[FlowShape[OutboundEnvelope, EnvelopeBuffer]] {
val in: Inlet[Send] = Inlet("Artery.Encoder.in")
val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in")
val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out")
val shape: FlowShape[Send, EnvelopeBuffer] = FlowShape(in, out)
val shape: FlowShape[OutboundEnvelope, EnvelopeBuffer] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
@ -44,11 +44,14 @@ private[remote] class Encoder(
override protected def logSource = classOf[Encoder]
override def onPush(): Unit = {
val send = grab(in)
val outboundEnvelope = grab(in)
val envelope = bufferPool.acquire()
// internally compression is applied by the builder:
headerBuilder setRecipientActorRef send.recipient
outboundEnvelope.recipient match {
case OptionVal.Some(r) headerBuilder setRecipientActorRef r
case OptionVal.None headerBuilder.setNoRecipient()
}
try {
// avoiding currentTransportInformation.withValue due to thunk allocation
@ -56,12 +59,12 @@ private[remote] class Encoder(
try {
Serialization.currentTransportInformation.value = serializationInfo
send.senderOption match {
outboundEnvelope.sender match {
case OptionVal.None headerBuilder.setNoSender()
case OptionVal.Some(s) headerBuilder setSenderActorRef s
}
MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope)
MessageSerializer.serializeForArtery(serialization, outboundEnvelope.message, headerBuilder, envelope)
} finally Serialization.currentTransportInformation.value = oldValue
envelope.byteBuffer.flip()
@ -70,18 +73,24 @@ private[remote] class Encoder(
} catch {
case NonFatal(e)
bufferPool.release(envelope)
send.message match {
outboundEnvelope.message match {
case _: SystemMessageEnvelope
log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName)
log.error(e, "Failed to serialize system message [{}].", outboundEnvelope.message.getClass.getName)
throw e
case _ if e.isInstanceOf[java.nio.BufferOverflowException]
val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${send.recipient}: max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${send.message.getClass.getName}].")
log.error(reason, "Failed to serialize oversized message [{}].", send.message.getClass.getName)
val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${outboundEnvelope.recipient}: " +
s"max allowed size ${envelope.byteBuffer.limit()} bytes. Message type [${outboundEnvelope.message.getClass.getName}].")
log.error(reason, "Failed to serialize oversized message [{}].", outboundEnvelope.message.getClass.getName)
pull(in)
case _
log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName)
log.error(e, "Failed to serialize message [{}].", outboundEnvelope.message.getClass.getName)
pull(in)
}
} finally {
outboundEnvelope match {
case r: ReusableOutboundEnvelope outboundEnvelopePool.release(r)
case _
}
}
}
@ -111,7 +120,7 @@ private[remote] class Decoder(
resolveActorRefWithLocalAddress: String InternalActorRef,
compression: InboundCompressions, // TODO has to do demuxing on remote address It would seem, as decoder does not yet know
bufferPool: EnvelopeBufferPool,
inEnvelopePool: ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
@ -168,8 +177,7 @@ private[remote] class Decoder(
val deserializedMessage = MessageSerializer.deserializeForArtery(
system, originUid, serialization, headerBuilder, envelope)
val decoded = inEnvelopePool.acquire()
decoded.asInstanceOf[ReusableInboundEnvelope].init(
val decoded = inEnvelopePool.acquire().init(
recipient,
localAddress, // FIXME: Is this needed anymore? What should we do here?
deserializedMessage,