2016-06-23 11:58:54 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
|
|
|
|
*/
|
2016-05-05 14:38:48 +02:00
|
|
|
package akka.remote.artery
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
import scala.concurrent.duration._
|
2016-05-27 11:24:08 +02:00
|
|
|
import scala.util.control.NonFatal
|
2016-06-23 11:58:54 +02:00
|
|
|
import akka.actor._
|
2016-06-08 10:04:30 +02:00
|
|
|
import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress }
|
2016-05-27 11:24:08 +02:00
|
|
|
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
|
2016-05-05 14:38:48 +02:00
|
|
|
import akka.serialization.{ Serialization, SerializationExtension }
|
|
|
|
|
import akka.stream._
|
|
|
|
|
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
2016-06-23 11:58:54 +02:00
|
|
|
import akka.util.{ ByteString, OptionVal }
|
2016-06-09 09:16:44 +02:00
|
|
|
import akka.actor.EmptyLocalActorRef
|
|
|
|
|
import akka.stream.stage.TimerGraphStageLogic
|
2016-05-05 14:38:48 +02:00
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Flow stage that turns each [[OutboundEnvelope]] into an [[EnvelopeBuffer]] taken from
 * `bufferPool`. The header is written through a single `HeaderBuilder` that is reused for
 * every element, and the payload is written by `MessageSerializer.serializeForArtery`.
 *
 * Failure handling, as implemented in `onPush`:
 *  - a failure while serializing a `SystemMessageEnvelope` is logged and rethrown
 *  - a `java.nio.BufferOverflowException` is logged as an oversized payload, the element is
 *    dropped and the next one is pulled
 *  - any other non-fatal failure is logged, the element is dropped and the next one is pulled
 *
 * In every case the acquired buffer is released again on failure, and a
 * `ReusableOutboundEnvelope` is always handed back to `outboundEnvelopePool`.
 */
private[remote] class Encoder(
  uniqueLocalAddress:   UniqueAddress,
  system:               ActorSystem,
  compression:          OutboundCompressions,
  outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
  bufferPool:           EnvelopeBufferPool)
  extends GraphStage[FlowShape[OutboundEnvelope, EnvelopeBuffer]] {

  val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in")
  val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out")
  val shape: FlowShape[OutboundEnvelope, EnvelopeBuffer] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

      // one builder per stage instance; version and uid are set once, the rest per element
      private val headerBuilder = HeaderBuilder.out(compression)
      headerBuilder setVersion ArteryTransport.Version
      headerBuilder setUid uniqueLocalAddress.uid
      private val localAddress = uniqueLocalAddress.address
      private val serialization = SerializationExtension(system)
      private val serializationInfo = Serialization.Information(localAddress, system)

      override protected def logSource = classOf[Encoder]

      override def onPush(): Unit = {
        val outboundEnvelope = grab(in)
        val envelope = bufferPool.acquire()

        // internally compression is applied by the builder:
        outboundEnvelope.recipient match {
          case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r
          case OptionVal.None    ⇒ headerBuilder.setNoRecipient()
        }

        try {
          // avoiding currentTransportInformation.withValue due to thunk allocation
          val oldValue = Serialization.currentTransportInformation.value
          try {
            Serialization.currentTransportInformation.value = serializationInfo

            outboundEnvelope.sender match {
              case OptionVal.None    ⇒ headerBuilder.setNoSender()
              case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s
            }

            MessageSerializer.serializeForArtery(serialization, outboundEnvelope.message, headerBuilder, envelope)
          } finally Serialization.currentTransportInformation.value = oldValue

          envelope.byteBuffer.flip()
          push(out, envelope)

        } catch {
          case NonFatal(e) ⇒
            // Read the limit while this stage still holds the buffer. After `release` the buffer
            // belongs to the pool again and must not be touched (the pool implementation is not
            // visible here; presumably it may hand the buffer to another user right away).
            val maxPayloadSize = envelope.byteBuffer.limit()
            bufferPool.release(envelope)
            outboundEnvelope.message match {
              case _: SystemMessageEnvelope ⇒
                // system messages are not dropped: log and rethrow
                log.error(e, "Failed to serialize system message [{}].", outboundEnvelope.message.getClass.getName)
                throw e
              case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒
                val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${outboundEnvelope.recipient}: " +
                  s"max allowed size ${maxPayloadSize} bytes. Message type [${outboundEnvelope.message.getClass.getName}].")
                log.error(reason, "Failed to serialize oversized message [{}].", outboundEnvelope.message.getClass.getName)
                // drop the element and ask for the next one
                pull(in)
              case _ ⇒
                log.error(e, "Failed to serialize message [{}].", outboundEnvelope.message.getClass.getName)
                // drop the element and ask for the next one
                pull(in)
            }
        } finally {
          // runs after the catch block, so the envelope is still valid while logging above
          outboundEnvelope match {
            case r: ReusableOutboundEnvelope ⇒ outboundEnvelopePool.release(r)
            case _                           ⇒
          }
        }

      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
/**
 * INTERNAL API
 */
private[remote] object Decoder {

  /**
   * Timer key used by the [[Decoder]] stage when the recipient of an inbound message
   * could not be resolved yet.
   *
   * @param attemptsLeft    number of further resolve attempts allowed
   * @param recipientPath   path string of the recipient to resolve again
   * @param inboundEnvelope the already decoded envelope waiting for its recipient
   */
  private final case class RetryResolveRemoteDeployedRecipient(
    attemptsLeft:    Int,
    recipientPath:   String,
    inboundEnvelope: InboundEnvelope)

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Flow stage that turns each incoming [[EnvelopeBuffer]] into an [[InboundEnvelope]]:
 * the header is parsed into a reused `HeaderBuilder`, recipient and sender are resolved,
 * compression hits are recorded when an association for the origin uid is known, and the
 * payload is deserialized with `MessageSerializer.deserializeForArtery`.
 *
 * When the recipient cannot be resolved although the header names one, emission of the
 * decoded envelope is deferred and the resolution is retried on a timer.
 * The incoming buffer is released to `bufferPool` once deserialization was attempted.
 */
private[remote] class Decoder(
  inboundContext:                  InboundContext,
  system:                          ExtendedActorSystem,
  resolveActorRefWithLocalAddress: String ⇒ InternalActorRef,
  compression:                     InboundCompressions, // TODO has to do demuxing on remote address It would seem, as decoder does not yet know
  bufferPool:                      EnvelopeBufferPool,
  inEnvelopePool:                  ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {

  val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
  val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
  val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
      import Decoder.RetryResolveRemoteDeployedRecipient

      private val localAddress = inboundContext.localAddress.address
      private val headerBuilder = HeaderBuilder.in(compression)
      private val serialization = SerializationExtension(system)

      // delay between, and maximum number of, recipient re-resolution attempts
      private val retryResolveRemoteDeployedRecipientInterval = 50.millis
      private val retryResolveRemoteDeployedRecipientAttempts = 20

      override protected def logSource = classOf[Decoder]

      override def onPush(): Unit = {
        val buffer = grab(in)
        buffer.parseHeader(headerBuilder)

        val originUid = headerBuilder.uid
        val association = inboundContext.association(originUid)

        val recipient: OptionVal[InternalActorRef] = headerBuilder.recipientActorRef(originUid) match {
          case OptionVal.None ⇒
            // `get` on Path is safe because it surely is not a compressed value here
            resolveRecipient(headerBuilder.recipientActorRefPath.get)
          case OptionVal.Some(ref) ⇒
            OptionVal(ref.asInstanceOf[InternalActorRef])
        }

        val sender: InternalActorRef = headerBuilder.senderActorRef(originUid) match {
          case OptionVal.None ⇒
            // `get` on Path is safe because it surely is not a compressed value here
            resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get)
          case OptionVal.Some(ref) ⇒
            ref.asInstanceOf[InternalActorRef]
        }

        // --- hit refs and manifests for heavy-hitter counting
        if (association.isDefined) {
          val remoteAddress = association.get.remoteAddress
          compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, sender)
          if (recipient.isDefined)
            compression.hitActorRef(originUid, headerBuilder.actorRefCompressionTableVersion, remoteAddress, recipient.get)
          compression.hitClassManifest(originUid, headerBuilder.classManifestCompressionTableVersion, remoteAddress, headerBuilder.manifest(originUid))
        } else {
          // we don't want to record hits for compression while handshake is still in progress.
          log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
        }
        // --- end of hit refs and manifests for heavy-hitter counting

        try {
          val message = MessageSerializer.deserializeForArtery(
            system, originUid, serialization, headerBuilder, buffer)

          val decoded = inEnvelopePool.acquire().init(
            recipient,
            localAddress, // FIXME: Is this needed anymore? What should we do here?
            message,
            OptionVal.Some(sender), // FIXME: No need for an option, decode simply to deadLetters instead
            originUid,
            association)

          val recipientUnresolved = recipient.isEmpty && !headerBuilder.isNoRecipient
          if (!recipientUnresolved)
            push(out, decoded)
          else {
            // the remote deployed actor might not be created yet when resolving the
            // recipient for the first message that is sent to it, best effort retry
            scheduleOnce(
              RetryResolveRemoteDeployedRecipient(
                retryResolveRemoteDeployedRecipientAttempts,
                headerBuilder.recipientActorRefPath.get,
                decoded),
              retryResolveRemoteDeployedRecipientInterval) // FIXME IS THIS SAFE?
          }
        } catch {
          case NonFatal(e) ⇒
            log.warning(
              "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
              headerBuilder.serializer, headerBuilder.manifest(originUid), e.getMessage)
            // drop the element and ask for the next one
            pull(in)
        } finally {
          bufferPool.release(buffer)
        }
      }

      /**
       * Resolves `path` via `resolveActorRefWithLocalAddress`. Yields `OptionVal.None` only when
       * the result is an `EmptyLocalActorRef` whose first path element is "remote"; every other
       * result (including other empty refs) is returned wrapped.
       */
      private def resolveRecipient(path: String): OptionVal[InternalActorRef] =
        resolveActorRefWithLocalAddress(path) match {
          case empty: EmptyLocalActorRef ⇒
            // FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer)
            val underRemote = empty.path.elements.headOption.contains("remote")
            if (underRemote) OptionVal.None
            else OptionVal(empty)
          case other ⇒
            OptionVal(other)
        }

      override def onPull(): Unit = pull(in)

      /**
       * Retries resolving the recipient of a deferred envelope. Emits the envelope as soon as the
       * recipient resolves; otherwise reschedules while attempts remain, and on the last attempt
       * emits it with whatever `resolveActorRefWithLocalAddress` returns for the path.
       */
      override protected def onTimer(timerKey: Any): Unit = {
        timerKey match {
          case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒
            val resolved = resolveRecipient(recipientPath)
            if (resolved.isDefined)
              push(out, inboundEnvelope.withRecipient(resolved.get))
            else if (attemptsLeft > 0)
              scheduleOnce(
                RetryResolveRemoteDeployedRecipient(attemptsLeft - 1, recipientPath, inboundEnvelope),
                retryResolveRemoteDeployedRecipientInterval)
            else {
              val recipient = resolveActorRefWithLocalAddress(recipientPath)
              // FIXME only retry for the first message, need to keep them in a cache
              push(out, inboundEnvelope.withRecipient(recipient))
            }
        }
      }

      setHandlers(in, out, this)
    }
}
|
2016-06-23 11:58:54 +02:00
|
|
|
|