2016-06-23 11:58:54 +02:00
|
|
|
/**
|
2017-01-04 17:37:10 +01:00
|
|
|
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
|
2016-06-23 11:58:54 +02:00
|
|
|
*/
|
2016-05-05 14:38:48 +02:00
|
|
|
package akka.remote.artery
|
|
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
import java.util.concurrent.TimeUnit
|
|
|
|
|
|
|
|
|
|
import akka.Done
|
|
|
|
|
import akka.actor.{ EmptyLocalActorRef, _ }
|
|
|
|
|
import akka.event.Logging
|
|
|
|
|
import akka.remote.artery.Decoder.{ AdvertiseActorRefsCompressionTable, AdvertiseClassManifestsCompressionTable, InboundCompressionAccess, InboundCompressionAccessImpl }
|
|
|
|
|
import akka.remote.artery.FlightRecorderEvents.AeronSource_Started
|
2016-05-27 11:24:08 +02:00
|
|
|
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
|
2017-01-13 10:33:55 +01:00
|
|
|
import akka.remote.artery.compress.CompressionProtocol._
|
|
|
|
|
import akka.remote.artery.compress._
|
|
|
|
|
import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress }
|
2016-05-05 14:38:48 +02:00
|
|
|
import akka.serialization.{ Serialization, SerializationExtension }
|
|
|
|
|
import akka.stream._
|
2016-10-28 16:05:56 +02:00
|
|
|
import akka.stream.stage._
|
2017-01-13 10:33:55 +01:00
|
|
|
import akka.util.OptionVal
|
2016-09-02 18:09:43 +02:00
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
import scala.concurrent.{ Future, Promise }
|
|
|
|
|
import scala.util.control.NonFatal
|
2016-09-05 22:44:22 +02:00
|
|
|
|
2016-08-24 19:52:07 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[remote] object Encoder {
|
2017-01-13 10:33:55 +01:00
|
|
|
private[remote] trait OutboundCompressionAccess {
|
2016-08-24 19:52:07 +02:00
|
|
|
def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done]
|
|
|
|
|
def changeClassManifestCompression(table: CompressionTable[String]): Future[Done]
|
|
|
|
|
def clearCompression(): Future[Done]
|
|
|
|
|
}
|
|
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
private[remote] class AccessOutboundCompressionFailed
|
|
|
|
|
extends RuntimeException("Change of outbound compression table failed (will be retried), because materialization did not complete yet")
|
2016-08-30 14:37:11 +02:00
|
|
|
|
2016-08-24 19:52:07 +02:00
|
|
|
}
|
2016-05-05 14:38:48 +02:00
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[remote] class Encoder(
|
2016-06-29 17:09:33 +02:00
|
|
|
uniqueLocalAddress: UniqueAddress,
|
2016-09-05 22:44:22 +02:00
|
|
|
system: ExtendedActorSystem,
|
2016-06-29 17:09:33 +02:00
|
|
|
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
|
2016-09-23 12:30:54 +02:00
|
|
|
bufferPool: EnvelopeBufferPool,
|
|
|
|
|
debugLogSend: Boolean)
|
2017-01-13 10:33:55 +01:00
|
|
|
extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.OutboundCompressionAccess] {
|
2016-08-24 19:52:07 +02:00
|
|
|
import Encoder._
|
2016-06-29 17:09:33 +02:00
|
|
|
|
|
|
|
|
val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in")
|
2016-05-05 14:38:48 +02:00
|
|
|
val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out")
|
2016-06-29 17:09:33 +02:00
|
|
|
val shape: FlowShape[OutboundEnvelope, EnvelopeBuffer] = FlowShape(in, out)
|
2016-05-05 14:38:48 +02:00
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, OutboundCompressionAccess) = {
|
|
|
|
|
val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundCompressionAccess {
|
2016-05-05 14:38:48 +02:00
|
|
|
|
2016-08-24 19:52:07 +02:00
|
|
|
private val headerBuilder = HeaderBuilder.out()
|
2016-07-01 11:54:57 +02:00
|
|
|
headerBuilder setVersion ArteryTransport.Version
|
|
|
|
|
headerBuilder setUid uniqueLocalAddress.uid
|
2016-05-26 10:42:08 +02:00
|
|
|
private val localAddress = uniqueLocalAddress.address
|
|
|
|
|
private val serialization = SerializationExtension(system)
|
2016-05-27 08:50:41 +02:00
|
|
|
private val serializationInfo = Serialization.Information(localAddress, system)
|
2016-05-05 14:38:48 +02:00
|
|
|
|
2016-12-06 09:10:12 +01:00
|
|
|
private val instruments: RemoteInstruments = RemoteInstruments(system)
|
2016-09-05 22:44:22 +02:00
|
|
|
|
2016-08-24 19:52:07 +02:00
|
|
|
private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] {
|
|
|
|
|
case (table, done) ⇒
|
|
|
|
|
headerBuilder.setOutboundActorRefCompression(table)
|
|
|
|
|
done.success(Done)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private val changeClassManifsetCompressionCb = getAsyncCallback[(CompressionTable[String], Promise[Done])] {
|
|
|
|
|
case (table, done) ⇒
|
|
|
|
|
headerBuilder.setOutboundClassManifestCompression(table)
|
|
|
|
|
done.success(Done)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private val clearCompressionCb = getAsyncCallback[Promise[Done]] { done ⇒
|
|
|
|
|
headerBuilder.setOutboundActorRefCompression(CompressionTable.empty[ActorRef])
|
|
|
|
|
headerBuilder.setOutboundClassManifestCompression(CompressionTable.empty[String])
|
|
|
|
|
done.success(Done)
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-27 11:24:08 +02:00
|
|
|
override protected def logSource = classOf[Encoder]
|
|
|
|
|
|
2016-09-23 12:30:54 +02:00
|
|
|
private var debugLogSendEnabled = false
|
|
|
|
|
|
|
|
|
|
override def preStart(): Unit = {
|
|
|
|
|
debugLogSendEnabled = debugLogSend && log.isDebugEnabled
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-05 14:38:48 +02:00
|
|
|
override def onPush(): Unit = {
|
2016-06-29 17:09:33 +02:00
|
|
|
val outboundEnvelope = grab(in)
|
2016-06-06 08:26:15 +02:00
|
|
|
val envelope = bufferPool.acquire()
|
2016-05-05 14:38:48 +02:00
|
|
|
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
headerBuilder.resetMessageFields()
|
|
|
|
|
// don't use outbound compression for ArteryMessage, e.g. handshake messages must get through
|
|
|
|
|
// without depending on compression tables being in sync when systems are restarted
|
|
|
|
|
headerBuilder.useOutboundCompression(!outboundEnvelope.message.isInstanceOf[ArteryMessage])
|
|
|
|
|
|
2016-06-23 11:58:54 +02:00
|
|
|
// internally compression is applied by the builder:
|
2016-06-29 17:09:33 +02:00
|
|
|
outboundEnvelope.recipient match {
|
|
|
|
|
case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r
|
|
|
|
|
case OptionVal.None ⇒ headerBuilder.setNoRecipient()
|
|
|
|
|
}
|
2016-05-05 14:38:48 +02:00
|
|
|
|
2016-05-27 08:50:41 +02:00
|
|
|
try {
|
2016-05-27 11:24:08 +02:00
|
|
|
// avoiding currentTransportInformation.withValue due to thunk allocation
|
|
|
|
|
val oldValue = Serialization.currentTransportInformation.value
|
|
|
|
|
try {
|
|
|
|
|
Serialization.currentTransportInformation.value = serializationInfo
|
2016-06-23 11:58:54 +02:00
|
|
|
|
2016-06-29 17:09:33 +02:00
|
|
|
outboundEnvelope.sender match {
|
2016-06-23 11:58:54 +02:00
|
|
|
case OptionVal.None ⇒ headerBuilder.setNoSender()
|
2016-07-01 11:54:57 +02:00
|
|
|
case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s
|
2016-06-23 11:58:54 +02:00
|
|
|
}
|
|
|
|
|
|
2016-12-06 09:10:12 +01:00
|
|
|
val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
|
|
|
|
|
if (instruments.nonEmpty)
|
|
|
|
|
headerBuilder.setRemoteInstruments(instruments)
|
|
|
|
|
|
|
|
|
|
MessageSerializer.serializeForArtery(serialization, outboundEnvelope, headerBuilder, envelope)
|
|
|
|
|
|
|
|
|
|
if (instruments.nonEmpty) {
|
|
|
|
|
val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0
|
|
|
|
|
instruments.messageSent(outboundEnvelope, envelope.byteBuffer.position(), time)
|
|
|
|
|
}
|
2016-06-23 11:58:54 +02:00
|
|
|
} finally Serialization.currentTransportInformation.value = oldValue
|
2016-05-27 11:24:08 +02:00
|
|
|
|
|
|
|
|
envelope.byteBuffer.flip()
|
2016-09-23 12:30:54 +02:00
|
|
|
|
|
|
|
|
if (debugLogSendEnabled)
|
|
|
|
|
log.debug(
|
|
|
|
|
"sending remote message [{}] to [{}] from [{}]",
|
|
|
|
|
Logging.messageClassName(outboundEnvelope.message),
|
|
|
|
|
outboundEnvelope.recipient.getOrElse(""), outboundEnvelope.sender.getOrElse(""))
|
|
|
|
|
|
2016-05-27 11:24:08 +02:00
|
|
|
push(out, envelope)
|
|
|
|
|
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(e) ⇒
|
2016-06-06 08:26:15 +02:00
|
|
|
bufferPool.release(envelope)
|
2016-06-29 17:09:33 +02:00
|
|
|
outboundEnvelope.message match {
|
2016-05-27 11:24:08 +02:00
|
|
|
case _: SystemMessageEnvelope ⇒
|
2016-09-19 11:17:41 +02:00
|
|
|
log.error(e, "Failed to serialize system message [{}].",
|
|
|
|
|
Logging.messageClassName(outboundEnvelope.message))
|
2016-05-27 11:24:08 +02:00
|
|
|
throw e
|
2016-06-08 10:04:30 +02:00
|
|
|
case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒
|
2016-09-19 11:17:41 +02:00
|
|
|
val reason = new OversizedPayloadException("Discarding oversized payload sent to " +
|
|
|
|
|
s"${outboundEnvelope.recipient}: max allowed size ${envelope.byteBuffer.limit()} " +
|
|
|
|
|
s"bytes. Message type [${Logging.messageClassName(outboundEnvelope.message)}].")
|
|
|
|
|
log.error(reason, "Failed to serialize oversized message [{}].",
|
|
|
|
|
Logging.messageClassName(outboundEnvelope.message))
|
2016-06-08 10:04:30 +02:00
|
|
|
pull(in)
|
2016-05-27 11:24:08 +02:00
|
|
|
case _ ⇒
|
2016-09-19 11:17:41 +02:00
|
|
|
log.error(e, "Failed to serialize message [{}].", Logging.messageClassName(outboundEnvelope.message))
|
2016-05-27 11:24:08 +02:00
|
|
|
pull(in)
|
|
|
|
|
}
|
2016-06-29 17:09:33 +02:00
|
|
|
} finally {
|
|
|
|
|
outboundEnvelope match {
|
|
|
|
|
case r: ReusableOutboundEnvelope ⇒ outboundEnvelopePool.release(r)
|
2016-09-05 22:44:22 +02:00
|
|
|
case _ ⇒ // no need to release it
|
2016-06-29 17:09:33 +02:00
|
|
|
}
|
2016-05-27 11:24:08 +02:00
|
|
|
}
|
2016-05-05 14:38:48 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
2016-08-24 19:52:07 +02:00
|
|
|
/**
|
|
|
|
|
* External call from ChangeOutboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = {
|
|
|
|
|
val done = Promise[Done]()
|
|
|
|
|
try changeActorRefCompressionCb.invoke((table, done)) catch {
|
|
|
|
|
// This is a harmless failure, it will be retried on next advertisement or handshake attempt.
|
|
|
|
|
// It will only occur when callback is invoked before preStart. That is highly unlikely to
|
|
|
|
|
// happen since advertisement is not done immediately and handshake involves network roundtrip.
|
2017-01-13 10:33:55 +01:00
|
|
|
case NonFatal(_) ⇒ done.tryFailure(new AccessOutboundCompressionFailed)
|
2016-08-24 19:52:07 +02:00
|
|
|
}
|
|
|
|
|
done.future
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* External call from ChangeOutboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
|
|
|
|
|
val done = Promise[Done]()
|
|
|
|
|
try changeClassManifsetCompressionCb.invoke((table, done)) catch {
|
|
|
|
|
// in case materialization not completed yet
|
2017-01-13 10:33:55 +01:00
|
|
|
case NonFatal(_) ⇒ done.tryFailure(new AccessOutboundCompressionFailed)
|
2016-08-24 19:52:07 +02:00
|
|
|
}
|
|
|
|
|
done.future
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* External call from ChangeOutboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def clearCompression(): Future[Done] = {
|
|
|
|
|
val done = Promise[Done]()
|
|
|
|
|
try clearCompressionCb.invoke(done) catch {
|
|
|
|
|
// in case materialization not completed yet
|
2017-01-13 10:33:55 +01:00
|
|
|
case NonFatal(_) ⇒ done.tryFailure(new AccessOutboundCompressionFailed)
|
2016-08-24 19:52:07 +02:00
|
|
|
}
|
|
|
|
|
done.future
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-05 14:38:48 +02:00
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2016-08-24 19:52:07 +02:00
|
|
|
|
|
|
|
|
(logic, logic)
|
|
|
|
|
}
|
2016-05-05 14:38:48 +02:00
|
|
|
}
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[remote] object Decoder {
|
|
|
|
|
private final case class RetryResolveRemoteDeployedRecipient(
|
|
|
|
|
attemptsLeft: Int,
|
|
|
|
|
recipientPath: String,
|
|
|
|
|
inboundEnvelope: InboundEnvelope)
|
2016-07-04 15:59:44 +02:00
|
|
|
|
|
|
|
|
private object Tick
|
2017-01-13 10:33:55 +01:00
|
|
|
|
|
|
|
|
/** Materialized value of [[Encoder]] which allows safely calling into the stage to interfact with compression tables. */
|
|
|
|
|
private[remote] trait InboundCompressionAccess {
|
|
|
|
|
def confirmActorRefCompressionAdvertisementAck(ack: ActorRefCompressionAdvertisementAck): Future[Done]
|
|
|
|
|
def confirmClassManifestCompressionAdvertisementAck(ack: ClassManifestCompressionAdvertisementAck): Future[Done]
|
|
|
|
|
def closeCompressionFor(originUid: Long): Future[Done]
|
|
|
|
|
|
|
|
|
|
/** For testing purposes, usually triggered by timer from within Decoder stage. */
|
|
|
|
|
def runNextActorRefAdvertisement(): Unit
|
|
|
|
|
/** For testing purposes, usually triggered by timer from within Decoder stage. */
|
|
|
|
|
def runNextClassManifestAdvertisement(): Unit
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private[remote] trait InboundCompressionAccessImpl extends InboundCompressionAccess {
|
|
|
|
|
this: GraphStageLogic with StageLogging ⇒
|
|
|
|
|
|
|
|
|
|
def compressions: InboundCompressions
|
|
|
|
|
|
|
|
|
|
private val closeCompressionForCb = getAsyncCallback[(Long, Promise[Done])] {
|
|
|
|
|
case (uid, done) ⇒
|
|
|
|
|
compressions.close(uid)
|
|
|
|
|
done.success(Done)
|
|
|
|
|
}
|
|
|
|
|
private val confirmActorRefCompressionAdvertisementCb = getAsyncCallback[(ActorRefCompressionAdvertisementAck, Promise[Done])] {
|
|
|
|
|
case (ActorRefCompressionAdvertisementAck(from, tableVersion), done) ⇒
|
|
|
|
|
compressions.confirmActorRefCompressionAdvertisement(from.uid, tableVersion)
|
|
|
|
|
done.success(Done)
|
|
|
|
|
}
|
|
|
|
|
private val confirmClassManifestCompressionAdvertisementCb = getAsyncCallback[(ClassManifestCompressionAdvertisementAck, Promise[Done])] {
|
|
|
|
|
case (ClassManifestCompressionAdvertisementAck(from, tableVersion), done) ⇒
|
|
|
|
|
compressions.confirmClassManifestCompressionAdvertisement(from.uid, tableVersion)
|
|
|
|
|
done.success(Done)
|
|
|
|
|
}
|
|
|
|
|
private val runNextActorRefAdvertisementCb = getAsyncCallback[Unit] {
|
|
|
|
|
_ ⇒ compressions.runNextActorRefAdvertisement()
|
|
|
|
|
}
|
|
|
|
|
private val runNextClassManifestAdvertisementCb = getAsyncCallback[Unit] {
|
|
|
|
|
_ ⇒ compressions.runNextClassManifestAdvertisement()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO in practice though all those CB's will always succeed, no need for the futures etc IMO
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* External call from ChangeInboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def closeCompressionFor(originUid: Long): Future[Done] = {
|
|
|
|
|
val done = Promise[Done]()
|
|
|
|
|
try closeCompressionForCb.invoke((originUid, done)) catch {
|
|
|
|
|
// in case materialization not completed yet
|
|
|
|
|
case NonFatal(_) ⇒ done.tryFailure(new AccessInboundCompressionFailed)
|
|
|
|
|
}
|
|
|
|
|
done.future
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* External call from ChangeInboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def confirmActorRefCompressionAdvertisementAck(ack: ActorRefCompressionAdvertisementAck): Future[Done] = {
|
|
|
|
|
val done = Promise[Done]()
|
|
|
|
|
try confirmActorRefCompressionAdvertisementCb.invoke((ack, done)) catch {
|
|
|
|
|
// in case materialization not completed yet
|
|
|
|
|
case NonFatal(_) ⇒ done.tryFailure(new AccessInboundCompressionFailed)
|
|
|
|
|
}
|
|
|
|
|
done.future
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* External call from ChangeInboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def confirmClassManifestCompressionAdvertisementAck(ack: ClassManifestCompressionAdvertisementAck): Future[Done] = {
|
|
|
|
|
val done = Promise[Done]()
|
|
|
|
|
try confirmClassManifestCompressionAdvertisementCb.invoke((ack, done)) catch {
|
|
|
|
|
case NonFatal(_) ⇒ done.tryFailure(new AccessInboundCompressionFailed)
|
|
|
|
|
}
|
|
|
|
|
done.future
|
|
|
|
|
}
|
|
|
|
|
/**
|
|
|
|
|
* External call from ChangeInboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def runNextActorRefAdvertisement(): Unit =
|
|
|
|
|
runNextActorRefAdvertisementCb.invoke()
|
|
|
|
|
/**
|
|
|
|
|
* External call from ChangeInboundCompression materialized value
|
|
|
|
|
*/
|
|
|
|
|
override def runNextClassManifestAdvertisement(): Unit =
|
|
|
|
|
runNextClassManifestAdvertisementCb.invoke()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private[remote] class AccessInboundCompressionFailed
|
|
|
|
|
extends RuntimeException("Change of inbound compression table failed (will be retried), because materialization did not complete yet")
|
|
|
|
|
|
|
|
|
|
// timer keys
|
|
|
|
|
private case object AdvertiseActorRefsCompressionTable
|
|
|
|
|
private case object AdvertiseClassManifestsCompressionTable
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
}
|
|
|
|
|
|
2016-09-02 18:09:43 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-09-29 10:50:37 +02:00
|
|
|
private[remote] final class ActorRefResolveCacheWithAddress(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
|
2016-09-02 18:09:43 +02:00
|
|
|
extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {
|
|
|
|
|
|
|
|
|
|
override protected def compute(k: String): InternalActorRef =
|
|
|
|
|
provider.resolveActorRefWithLocalAddress(k, localAddress.address)
|
|
|
|
|
|
|
|
|
|
override protected def hash(k: String): Int = FastHash.ofString(k)
|
|
|
|
|
|
|
|
|
|
override protected def isCacheable(v: InternalActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
|
|
|
|
|
}
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[remote] class Decoder(
|
2017-01-13 10:33:55 +01:00
|
|
|
inboundContext: InboundContext,
|
|
|
|
|
system: ExtendedActorSystem,
|
|
|
|
|
uniqueLocalAddress: UniqueAddress,
|
|
|
|
|
settings: ArterySettings,
|
|
|
|
|
bufferPool: EnvelopeBufferPool,
|
|
|
|
|
inboundCompressions: InboundCompressions,
|
|
|
|
|
inEnvelopePool: ObjectPool[ReusableInboundEnvelope])
|
|
|
|
|
extends GraphStageWithMaterializedValue[FlowShape[EnvelopeBuffer, InboundEnvelope], InboundCompressionAccess] {
|
|
|
|
|
|
2016-07-04 15:59:44 +02:00
|
|
|
import Decoder.Tick
|
2016-05-05 14:38:48 +02:00
|
|
|
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
|
|
|
|
|
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
|
|
|
|
|
val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)
|
|
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, InboundCompressionAccess) = {
|
|
|
|
|
val logic = new TimerGraphStageLogic(shape) with InboundCompressionAccessImpl with InHandler with OutHandler with StageLogging {
|
2016-06-09 09:16:44 +02:00
|
|
|
import Decoder.RetryResolveRemoteDeployedRecipient
|
2017-01-13 10:33:55 +01:00
|
|
|
|
|
|
|
|
override val compressions = inboundCompressions
|
|
|
|
|
|
2016-06-10 13:04:23 +02:00
|
|
|
private val localAddress = inboundContext.localAddress.address
|
2017-01-13 10:33:55 +01:00
|
|
|
private val headerBuilder = HeaderBuilder.in(compressions)
|
2016-09-27 16:34:43 +02:00
|
|
|
private val actorRefResolver: ActorRefResolveCacheWithAddress =
|
|
|
|
|
new ActorRefResolveCacheWithAddress(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)
|
2016-09-09 10:15:12 +02:00
|
|
|
private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String]
|
2016-05-20 10:33:55 +02:00
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
private val retryResolveRemoteDeployedRecipientInterval = 50.millis
|
|
|
|
|
private val retryResolveRemoteDeployedRecipientAttempts = 20
|
|
|
|
|
|
2016-07-04 15:59:44 +02:00
|
|
|
// adaptive sampling when rate > 1000 msg/s
|
|
|
|
|
private var messageCount = 0L
|
2016-07-07 10:27:24 +02:00
|
|
|
private var heavyHitterMask = 0 // 0 => no sampling, otherwise power of two - 1
|
2016-07-04 15:59:44 +02:00
|
|
|
private val adaptiveSamplingRateThreshold = 1000
|
|
|
|
|
private var tickTimestamp = System.nanoTime()
|
|
|
|
|
private var tickMessageCount = 0L
|
|
|
|
|
|
2016-05-27 11:24:08 +02:00
|
|
|
override protected def logSource = classOf[Decoder]
|
|
|
|
|
|
2016-07-04 15:59:44 +02:00
|
|
|
override def preStart(): Unit = {
|
|
|
|
|
schedulePeriodically(Tick, 1.seconds)
|
|
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
if (settings.Advanced.Compression.Enabled) {
|
|
|
|
|
settings.Advanced.Compression.ActorRefs.AdvertisementInterval match {
|
|
|
|
|
case d: FiniteDuration ⇒ schedulePeriodicallyWithInitialDelay(AdvertiseActorRefsCompressionTable, d, d)
|
|
|
|
|
case _ ⇒ // not advertising actor ref compressions
|
|
|
|
|
}
|
|
|
|
|
settings.Advanced.Compression.Manifests.AdvertisementInterval match {
|
|
|
|
|
case d: FiniteDuration ⇒ schedulePeriodicallyWithInitialDelay(AdvertiseClassManifestsCompressionTable, d, d)
|
|
|
|
|
case _ ⇒ // not advertising class manifest compressions
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-05-05 14:38:48 +02:00
|
|
|
override def onPush(): Unit = {
|
2016-07-04 15:59:44 +02:00
|
|
|
messageCount += 1
|
2016-05-05 14:38:48 +02:00
|
|
|
val envelope = grab(in)
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
headerBuilder.resetMessageFields()
|
2016-05-05 14:38:48 +02:00
|
|
|
envelope.parseHeader(headerBuilder)
|
|
|
|
|
|
2016-06-10 13:04:23 +02:00
|
|
|
val originUid = headerBuilder.uid
|
|
|
|
|
val association = inboundContext.association(originUid)
|
|
|
|
|
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid) match {
|
2016-07-01 11:54:57 +02:00
|
|
|
case OptionVal.Some(ref) ⇒
|
|
|
|
|
OptionVal(ref.asInstanceOf[InternalActorRef])
|
2016-07-04 16:42:14 +02:00
|
|
|
case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined ⇒
|
2016-07-01 11:54:57 +02:00
|
|
|
resolveRecipient(headerBuilder.recipientActorRefPath.get)
|
2016-07-04 16:42:14 +02:00
|
|
|
case _ ⇒
|
|
|
|
|
OptionVal.None
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
} catch {
|
|
|
|
|
case NonFatal(e) ⇒
|
|
|
|
|
// probably version mismatch due to restarted system
|
|
|
|
|
log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage)
|
|
|
|
|
OptionVal.None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid) match {
|
|
|
|
|
case OptionVal.Some(ref) ⇒
|
|
|
|
|
OptionVal(ref.asInstanceOf[InternalActorRef])
|
|
|
|
|
case OptionVal.None if headerBuilder.senderActorRefPath.isDefined ⇒
|
|
|
|
|
OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get))
|
|
|
|
|
case _ ⇒
|
|
|
|
|
OptionVal.None
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(e) ⇒
|
|
|
|
|
// probably version mismatch due to restarted system
|
|
|
|
|
log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage)
|
|
|
|
|
OptionVal.None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
val classManifestOpt = try headerBuilder.manifest(originUid) catch {
|
|
|
|
|
case NonFatal(e) ⇒
|
|
|
|
|
// probably version mismatch due to restarted system
|
|
|
|
|
log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e.getMessage)
|
|
|
|
|
OptionVal.None
|
2016-06-23 11:58:54 +02:00
|
|
|
}
|
|
|
|
|
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
if ((recipient.isEmpty && headerBuilder.recipientActorRefPath.isEmpty && !headerBuilder.isNoRecipient) ||
|
|
|
|
|
(sender.isEmpty && headerBuilder.senderActorRefPath.isEmpty && !headerBuilder.isNoSender)) {
|
|
|
|
|
log.debug("Dropping message for unknown recipient/sender. It was probably sent from system [{}] with compression " +
|
|
|
|
|
"table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
|
|
|
|
|
"that has already been discarded in the destination system.", originUid,
|
|
|
|
|
headerBuilder.inboundActorRefCompressionTableVersion)
|
|
|
|
|
pull(in)
|
|
|
|
|
} else if (classManifestOpt.isEmpty) {
|
|
|
|
|
log.debug("Dropping message with unknown manifest. It was probably sent from system [{}] with compression " +
|
2016-09-01 16:26:11 +02:00
|
|
|
"table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
|
|
|
|
|
"that has already been discarded in the destination system.", originUid,
|
|
|
|
|
headerBuilder.inboundActorRefCompressionTableVersion)
|
|
|
|
|
pull(in)
|
|
|
|
|
} else {
|
fix many bugs in InboundCompressions, #21464
* comprehensive integration test that revealed many bugs
* confirmations of manifests were wrong, at two places
* using wrong tables when system is restarted, including
originUid in the tables with checks when receiving advertisments
* close (stop scheduling) of advertisments when new incarnation,
quarantine, or restart
* cleanup how deadLetters ref was treated, and made it more robust
* make Decoder tolerant to decompression failures, can happen in
case of system restart before handshake completed
* give up resending advertisment after a few attempts without confirmation,
to avoid keeping outbound association open to possible dead system
* don't advertise new table when no inbound messages,
to avoid keeping outbound association open to possible dead system
* HeaderBuilder could use manifest field from previous message, added
resetMessageFields
* No compression for ArteryMessage, e.g. handshake messages must go
through without depending on compression tables being in sync
* improve debug logging, including originUid
2016-09-15 11:27:00 +02:00
|
|
|
val classManifest = classManifestOpt.get
|
2016-07-04 15:59:44 +02:00
|
|
|
|
2016-09-01 16:26:11 +02:00
|
|
|
if ((messageCount & heavyHitterMask) == 0) {
|
|
|
|
|
// --- hit refs and manifests for heavy-hitter counting
|
|
|
|
|
association match {
|
|
|
|
|
case OptionVal.Some(assoc) ⇒
|
|
|
|
|
val remoteAddress = assoc.remoteAddress
|
|
|
|
|
sender match {
|
|
|
|
|
case OptionVal.Some(snd) ⇒
|
2017-01-13 10:33:55 +01:00
|
|
|
compressions.hitActorRef(originUid, remoteAddress, snd, 1)
|
2016-09-01 16:26:11 +02:00
|
|
|
case OptionVal.None ⇒
|
|
|
|
|
}
|
2016-07-04 15:59:44 +02:00
|
|
|
|
2016-09-01 16:26:11 +02:00
|
|
|
recipient match {
|
|
|
|
|
case OptionVal.Some(rcp) ⇒
|
2017-01-13 10:33:55 +01:00
|
|
|
compressions.hitActorRef(originUid, remoteAddress, rcp, 1)
|
2016-09-01 16:26:11 +02:00
|
|
|
case OptionVal.None ⇒
|
|
|
|
|
}
|
2016-07-04 15:59:44 +02:00
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
compressions.hitClassManifest(originUid, remoteAddress, classManifest, 1)
|
2016-07-04 15:59:44 +02:00
|
|
|
|
2016-09-01 16:26:11 +02:00
|
|
|
case _ ⇒
|
|
|
|
|
// we don't want to record hits for compression while handshake is still in progress.
|
|
|
|
|
log.debug("Decoded message but unable to record hits for compression as no remoteAddress known. No association yet?")
|
|
|
|
|
}
|
|
|
|
|
// --- end of hit refs and manifests for heavy-hitter counting
|
2016-07-04 15:59:44 +02:00
|
|
|
}
|
2016-06-23 11:58:54 +02:00
|
|
|
|
2016-09-01 16:26:11 +02:00
|
|
|
val decoded = inEnvelopePool.acquire().init(
|
|
|
|
|
recipient,
|
|
|
|
|
sender,
|
|
|
|
|
originUid,
|
|
|
|
|
headerBuilder.serializer,
|
|
|
|
|
classManifest,
|
2016-09-05 22:44:22 +02:00
|
|
|
headerBuilder.flags,
|
2016-09-01 16:26:11 +02:00
|
|
|
envelope,
|
|
|
|
|
association)
|
|
|
|
|
|
|
|
|
|
if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
|
2016-09-09 10:15:12 +02:00
|
|
|
|
|
|
|
|
// The remote deployed actor might not be created yet when resolving the
|
|
|
|
|
// recipient for the first message that is sent to it, best effort retry.
|
|
|
|
|
// However, if the retried resolve isn't successful the ref is banned and
|
|
|
|
|
// we will not do the delayed retry resolve again. The reason for that is
|
|
|
|
|
// if many messages are sent to such dead refs the resolve process will slow
|
|
|
|
|
// down other messages.
|
|
|
|
|
val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
|
|
|
|
|
if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
|
2016-09-26 12:32:54 +02:00
|
|
|
|
|
|
|
|
headerBuilder.recipientActorRefPath match {
|
|
|
|
|
case OptionVal.Some(path) ⇒
|
|
|
|
|
val ref = actorRefResolver.getOrCompute(path)
|
|
|
|
|
if (ref.isInstanceOf[EmptyLocalActorRef]) log.warning(
|
|
|
|
|
"Message for banned (terminated, unresolved) remote deployed recipient [{}].",
|
|
|
|
|
recipientActorRefPath)
|
|
|
|
|
push(out, decoded.withRecipient(ref))
|
|
|
|
|
case OptionVal.None ⇒
|
|
|
|
|
log.warning(
|
|
|
|
|
"Dropping message for banned (terminated, unresolved) remote deployed recipient [{}].",
|
|
|
|
|
recipientActorRefPath)
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
2016-09-09 10:15:12 +02:00
|
|
|
} else
|
|
|
|
|
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
|
|
|
|
retryResolveRemoteDeployedRecipientAttempts,
|
|
|
|
|
recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval)
|
2016-09-20 14:23:50 +03:00
|
|
|
} else {
|
2016-09-01 16:26:11 +02:00
|
|
|
push(out, decoded)
|
2016-09-20 14:23:50 +03:00
|
|
|
}
|
2016-09-01 16:26:11 +02:00
|
|
|
}
|
2016-05-05 14:38:48 +02:00
|
|
|
}
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
|
2016-09-02 18:09:43 +02:00
|
|
|
actorRefResolver.getOrCompute(path) match {
|
2016-06-23 11:58:54 +02:00
|
|
|
case empty: EmptyLocalActorRef ⇒
|
|
|
|
|
val pathElements = empty.path.elements
|
|
|
|
|
if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None
|
|
|
|
|
else OptionVal(empty)
|
2016-06-09 09:16:44 +02:00
|
|
|
case ref ⇒ OptionVal(ref)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-05 14:38:48 +02:00
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
override protected def onTimer(timerKey: Any): Unit = {
|
|
|
|
|
timerKey match {
|
2016-07-04 15:59:44 +02:00
|
|
|
case Tick ⇒
|
|
|
|
|
val now = System.nanoTime()
|
2016-07-07 10:27:24 +02:00
|
|
|
val d = math.max(1, now - tickTimestamp)
|
|
|
|
|
val rate = (messageCount - tickMessageCount) * TimeUnit.SECONDS.toNanos(1) / d
|
|
|
|
|
val oldHeavyHitterMask = heavyHitterMask
|
|
|
|
|
heavyHitterMask =
|
|
|
|
|
if (rate < adaptiveSamplingRateThreshold) 0 // no sampling
|
|
|
|
|
else if (rate < adaptiveSamplingRateThreshold * 10) (1 << 6) - 1 // sample every 64nth message
|
|
|
|
|
else if (rate < adaptiveSamplingRateThreshold * 100) (1 << 7) - 1 // sample every 128nth message
|
|
|
|
|
else (1 << 8) - 1 // sample every 256nth message
|
|
|
|
|
if (oldHeavyHitterMask > 0 && heavyHitterMask == 0)
|
|
|
|
|
log.debug("Turning off adaptive sampling of compression hit counting")
|
|
|
|
|
else if (oldHeavyHitterMask != heavyHitterMask)
|
|
|
|
|
log.debug("Turning on adaptive sampling ({}nth message) of compression hit counting", heavyHitterMask + 1)
|
2016-07-04 15:59:44 +02:00
|
|
|
tickMessageCount = messageCount
|
|
|
|
|
tickTimestamp = now
|
|
|
|
|
|
2017-01-13 10:33:55 +01:00
|
|
|
case AdvertiseActorRefsCompressionTable ⇒
|
|
|
|
|
compressions.runNextActorRefAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath
|
|
|
|
|
|
|
|
|
|
case AdvertiseClassManifestsCompressionTable ⇒
|
|
|
|
|
compressions.runNextClassManifestAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath
|
|
|
|
|
|
2016-06-09 09:16:44 +02:00
|
|
|
case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒
|
|
|
|
|
resolveRecipient(recipientPath) match {
|
|
|
|
|
case OptionVal.None ⇒
|
|
|
|
|
if (attemptsLeft > 0)
|
|
|
|
|
scheduleOnce(RetryResolveRemoteDeployedRecipient(
|
|
|
|
|
attemptsLeft - 1,
|
2016-06-23 11:58:54 +02:00
|
|
|
recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
|
2016-06-09 09:16:44 +02:00
|
|
|
else {
|
2016-09-09 10:15:12 +02:00
|
|
|
// No more attempts left. If the retried resolve isn't successful the ref is banned and
|
|
|
|
|
// we will not do the delayed retry resolve again. The reason for that is
|
|
|
|
|
// if many messages are sent to such dead refs the resolve process will slow
|
|
|
|
|
// down other messages.
|
|
|
|
|
if (bannedRemoteDeployedActorRefs.size >= 100) {
|
|
|
|
|
// keep it bounded
|
|
|
|
|
bannedRemoteDeployedActorRefs.clear()
|
|
|
|
|
}
|
|
|
|
|
bannedRemoteDeployedActorRefs.add(recipientPath)
|
|
|
|
|
|
2016-09-02 18:09:43 +02:00
|
|
|
val recipient = actorRefResolver.getOrCompute(recipientPath)
|
2016-06-09 09:16:44 +02:00
|
|
|
push(out, inboundEnvelope.withRecipient(recipient))
|
|
|
|
|
}
|
|
|
|
|
case OptionVal.Some(recipient) ⇒
|
|
|
|
|
push(out, inboundEnvelope.withRecipient(recipient))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-05 14:38:48 +02:00
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2017-01-13 10:33:55 +01:00
|
|
|
|
|
|
|
|
(logic, logic)
|
|
|
|
|
}
|
|
|
|
|
|
2016-05-05 14:38:48 +02:00
|
|
|
}
|
2016-06-23 11:58:54 +02:00
|
|
|
|
2016-08-30 14:37:11 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
private[remote] class Deserializer(
|
|
|
|
|
inboundContext: InboundContext,
|
|
|
|
|
system: ExtendedActorSystem,
|
|
|
|
|
bufferPool: EnvelopeBufferPool) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
|
|
|
|
|
|
|
|
|
val in: Inlet[InboundEnvelope] = Inlet("Artery.Deserializer.in")
|
|
|
|
|
val out: Outlet[InboundEnvelope] = Outlet("Artery.Deserializer.out")
|
|
|
|
|
val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
|
|
|
|
|
|
|
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
|
2016-12-06 09:10:12 +01:00
|
|
|
private val instruments: RemoteInstruments = RemoteInstruments(system)
|
2016-08-30 14:37:11 +02:00
|
|
|
private val serialization = SerializationExtension(system)
|
|
|
|
|
|
|
|
|
|
override protected def logSource = classOf[Deserializer]
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
val envelope = grab(in)
|
|
|
|
|
|
|
|
|
|
try {
|
2016-12-06 09:10:12 +01:00
|
|
|
val startTime: Long = if (instruments.timeSerialization) System.nanoTime else 0
|
|
|
|
|
|
2016-08-30 14:37:11 +02:00
|
|
|
val deserializedMessage = MessageSerializer.deserializeForArtery(
|
|
|
|
|
system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer)
|
|
|
|
|
|
2016-09-05 22:44:22 +02:00
|
|
|
val envelopeWithMessage = envelope.withMessage(deserializedMessage)
|
|
|
|
|
|
2016-12-06 09:10:12 +01:00
|
|
|
if (instruments.nonEmpty) {
|
|
|
|
|
instruments.deserialize(envelopeWithMessage)
|
|
|
|
|
val time = if (instruments.timeSerialization) System.nanoTime - startTime else 0
|
|
|
|
|
instruments.messageReceived(envelopeWithMessage, envelope.envelopeBuffer.byteBuffer.limit(), time)
|
|
|
|
|
}
|
2016-09-05 22:44:22 +02:00
|
|
|
push(out, envelopeWithMessage)
|
2016-08-30 14:37:11 +02:00
|
|
|
} catch {
|
|
|
|
|
case NonFatal(e) ⇒
|
2017-02-16 14:09:04 +01:00
|
|
|
val from = envelope.association match {
|
|
|
|
|
case OptionVal.Some(a) ⇒ a.remoteAddress
|
|
|
|
|
case OptionVal.None ⇒ "unknown"
|
|
|
|
|
}
|
2016-08-30 14:37:11 +02:00
|
|
|
log.warning(
|
2017-02-16 14:09:04 +01:00
|
|
|
"Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}",
|
|
|
|
|
from, envelope.serializer, envelope.classManifest, e.getMessage)
|
2016-08-30 14:37:11 +02:00
|
|
|
pull(in)
|
|
|
|
|
} finally {
|
|
|
|
|
val buf = envelope.envelopeBuffer
|
|
|
|
|
envelope.releaseEnvelopeBuffer()
|
|
|
|
|
bufferPool.release(buf)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
|
|
|
|
}
|