/**
 * Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
 */
package akka.remote.artery

import java.util.concurrent.TimeUnit

import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._
import scala.util.control.NonFatal

import akka.Done
import akka.actor._
import akka.actor.EmptyLocalActorRef
import akka.event.Logging
import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress }
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.remote.artery.compress.{ CompressionTable, InboundCompressions }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler, TimerGraphStageLogic }
import akka.util.{ ByteString, OptionVal }

/**
 * INTERNAL API
 */
private[remote] object Encoder {
  private[remote] trait ChangeOutboundCompression {
    def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done]
    def changeClassManifestCompression(table: CompressionTable[String]): Future[Done]
    def clearCompression(): Future[Done]
  }

  private[remote] class ChangeOutboundCompressionFailed extends RuntimeException(
    "Change of outbound compression table failed (will be retried), because materialization did not complete yet")

}

/**
 * INTERNAL API
 */
private[remote] class Encoder(
  uniqueLocalAddress:   UniqueAddress,
  system:               ExtendedActorSystem,
  outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
  bufferPool:           EnvelopeBufferPool,
  debugLogSend:         Boolean)
  extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.ChangeOutboundCompression] {
  import Encoder._

  val in: Inlet[OutboundEnvelope] = Inlet("Artery.Encoder.in")
  val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out")
  val shape: FlowShape[OutboundEnvelope, EnvelopeBuffer] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, ChangeOutboundCompression) = {
    val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with ChangeOutboundCompression {

      private val headerBuilder = HeaderBuilder.out()
      headerBuilder setVersion ArteryTransport.Version
      headerBuilder setUid uniqueLocalAddress.uid

      private val localAddress = uniqueLocalAddress.address
      private val serialization = SerializationExtension(system)
      private val serializationInfo = Serialization.Information(localAddress, system)

      private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system)
      // backed by an Array, which allows us to not allocate any wrapper type for the metadata (since we need its ID)
      private val serializedMetadatas: MetadataMap[ByteString] = MetadataMap() // TODO: possibly can be optimised more for the specific access pattern (during write)

      private val changeActorRefCompressionCb = getAsyncCallback[(CompressionTable[ActorRef], Promise[Done])] {
        case (table, done) ⇒
          headerBuilder.setOutboundActorRefCompression(table)
          done.success(Done)
      }

      private val changeClassManifestCompressionCb = getAsyncCallback[(CompressionTable[String], Promise[Done])] {
        case (table, done) ⇒
          headerBuilder.setOutboundClassManifestCompression(table)
          done.success(Done)
      }

      private val clearCompressionCb = getAsyncCallback[Promise[Done]] { done ⇒
        headerBuilder.setOutboundActorRefCompression(CompressionTable.empty[ActorRef])
        headerBuilder.setOutboundClassManifestCompression(CompressionTable.empty[String])
        done.success(Done)
      }

      override protected def logSource = classOf[Encoder]

      private var debugLogSendEnabled = false

      override def preStart(): Unit = {
        debugLogSendEnabled = debugLogSend && log.isDebugEnabled
      }

      override def onPush(): Unit = {
        val outboundEnvelope = grab(in)
        val envelope = bufferPool.acquire()

        headerBuilder.resetMessageFields()
        // don't use outbound compression for ArteryMessage, e.g. handshake messages must get through
        // without depending on compression tables being in sync when systems are restarted
        headerBuilder.useOutboundCompression(!outboundEnvelope.message.isInstanceOf[ArteryMessage])

        // internally compression is applied by the builder:
        outboundEnvelope.recipient match {
          case OptionVal.Some(r) ⇒ headerBuilder setRecipientActorRef r
          case OptionVal.None    ⇒ headerBuilder.setNoRecipient()
        }

        try {
          // avoiding currentTransportInformation.withValue due to thunk allocation
          val oldValue = Serialization.currentTransportInformation.value
          try {
            Serialization.currentTransportInformation.value = serializationInfo

            outboundEnvelope.sender match {
              case OptionVal.None    ⇒ headerBuilder.setNoSender()
              case OptionVal.Some(s) ⇒ headerBuilder setSenderActorRef s
            }

            applyAndRenderRemoteMessageSentMetadata(instruments, outboundEnvelope, headerBuilder)

            MessageSerializer.serializeForArtery(serialization, outboundEnvelope.message, headerBuilder, envelope)
          } finally Serialization.currentTransportInformation.value = oldValue

          envelope.byteBuffer.flip()

          if (debugLogSendEnabled)
            log.debug(
              "sending remote message [{}] to [{}] from [{}]",
              Logging.messageClassName(outboundEnvelope.message),
              outboundEnvelope.recipient.getOrElse(""), outboundEnvelope.sender.getOrElse(""))

          push(out, envelope)
        } catch {
          case NonFatal(e) ⇒
            bufferPool.release(envelope)
            outboundEnvelope.message match {
              case _: SystemMessageEnvelope ⇒
                log.error(e, "Failed to serialize system message [{}].",
                  Logging.messageClassName(outboundEnvelope.message))
                throw e
              case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒
                val reason = new OversizedPayloadException("Discarding oversized payload sent to " +
                  s"${outboundEnvelope.recipient}: max allowed size ${envelope.byteBuffer.limit()} " +
                  s"bytes. Message type [${Logging.messageClassName(outboundEnvelope.message)}].")
                log.error(reason, "Failed to serialize oversized message [{}].",
                  Logging.messageClassName(outboundEnvelope.message))
                pull(in)
              case _ ⇒
                log.error(e, "Failed to serialize message [{}].", Logging.messageClassName(outboundEnvelope.message))
                pull(in)
            }
        } finally {
          outboundEnvelope match {
            case r: ReusableOutboundEnvelope ⇒ outboundEnvelopePool.release(r)
            case _                           ⇒ // no need to release it
          }
        }
      }
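
      // Buffer lifecycle: the EnvelopeBuffer acquired in onPush is returned to the
      // pool here only on serialization failure; on success ownership passes
      // downstream together with the pushed buffer.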

      override def onPull(): Unit = pull(in)

      /**
       * Renders metadata into the `headerBuilder`.
       *
       * Replaces all AnyRefs that were passed along with the [[OutboundEnvelope]] with their [[ByteString]]
       * representations, by calling `remoteMessageSent` on each enabled instrument. If a `context` was attached
       * to the envelope it is passed to the instrument; otherwise the instrument receives `OptionVal.None` as
       * context and may still decide to attach rendered metadata by returning it.
       */
      private def applyAndRenderRemoteMessageSentMetadata(instruments: Vector[RemoteInstrument], envelope: OutboundEnvelope, headerBuilder: HeaderBuilder): Unit = {
        if (instruments.nonEmpty) {
          val n = instruments.length
          var i = 0
          while (i < n) {
            val instrument = instruments(i)
            val instrumentId = instrument.identifier
            val metadata = instrument.remoteMessageSent(envelope.recipient.orNull, envelope.message, envelope.sender.orNull)
            if (metadata ne null) serializedMetadatas.set(instrumentId, metadata)
            i += 1
          }
        }
        if (serializedMetadatas.nonEmpty) {
          MetadataEnvelopeSerializer.serialize(serializedMetadatas, headerBuilder)
          serializedMetadatas.clear()
        }
      }
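
      // Note: `serializedMetadatas` is cleared after each render above, so the same
      // instance is safely reused across messages.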

      /**
       * External call from ChangeOutboundCompression materialized value
       */
      override def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = {
        val done = Promise[Done]()
        try changeActorRefCompressionCb.invoke((table, done)) catch {
          // This is a harmless failure, it will be retried on the next advertisement or handshake attempt.
          // It can only occur when the callback is invoked before preStart. That is highly unlikely to
          // happen, since advertisement is not done immediately and the handshake involves a network roundtrip.
          case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed)
        }
        done.future
      }

      /**
       * External call from ChangeOutboundCompression materialized value
       */
      override def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
        val done = Promise[Done]()
        try changeClassManifestCompressionCb.invoke((table, done)) catch {
          // in case materialization is not completed yet
          case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed)
        }
        done.future
      }

      /**
       * External call from ChangeOutboundCompression materialized value
       */
      override def clearCompression(): Future[Done] = {
        val done = Promise[Done]()
        try clearCompressionCb.invoke(done) catch {
          // in case materialization is not completed yet
          case NonFatal(_) ⇒ done.tryFailure(new ChangeOutboundCompressionFailed)
        }
        done.future
      }

      setHandlers(in, out, this)
    }

    (logic, logic)
  }
}
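
// A minimal materialization sketch (the real wiring lives elsewhere in this
// package): the stage's materialized value is the handle used to swap in newly
// advertised compression tables. `source`, `sink`, `envelopePool`, `pool` and
// `table` are assumed values, for illustration only; an implicit Materializer
// is required.
//
//   val encoder = new Encoder(localAddress, system, envelopePool, pool, debugLogSend = false)
//   val (changeCompression, done) =
//     source                                    // Source[OutboundEnvelope, NotUsed]
//       .viaMat(Flow.fromGraph(encoder))(Keep.right)
//       .toMat(sink)(Keep.both)                 // Sink[EnvelopeBuffer, Future[Done]]
//       .run()
//   changeCompression.changeActorRefCompression(table) // completes once the async callback has run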

/**
 * INTERNAL API
 */
private[remote] object Decoder {
  private final case class RetryResolveRemoteDeployedRecipient(
    attemptsLeft:    Int,
    recipientPath:   String,
    inboundEnvelope: InboundEnvelope)

  private object Tick
}

/**
 * INTERNAL API
 */
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
  extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {

  override protected def compute(k: String): InternalActorRef =
    provider.resolveActorRefWithLocalAddress(k, localAddress.address)

  override protected def hash(k: String): Int = FastHash.ofString(k)

  override protected def isCacheable(v: InternalActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
}
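
// Unresolvable refs (EmptyLocalActorRef) are deliberately not cached above: a
// remote-deployed recipient, for example, may simply not have been created yet,
// and the Decoder below retries such resolves instead of caching the failure.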

/**
 * INTERNAL API
 */
private[remote] class Decoder(
  inboundContext:     InboundContext,
  system:             ExtendedActorSystem,
  uniqueLocalAddress: UniqueAddress,
  compression:        InboundCompressions,
  bufferPool:         EnvelopeBufferPool,
  inEnvelopePool:     ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
  import Decoder.Tick

  val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
  val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
  val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
      import Decoder.RetryResolveRemoteDeployedRecipient

      private val localAddress = inboundContext.localAddress.address
      private val headerBuilder = HeaderBuilder.in(compression)
      private val actorRefResolver: ActorRefResolveCache =
        new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)

      private val bannedRemoteDeployedActorRefs = new java.util.HashSet[String]

      private val retryResolveRemoteDeployedRecipientInterval = 50.millis
      private val retryResolveRemoteDeployedRecipientAttempts = 20

      // adaptive sampling when rate > 1000 msg/s
      private var messageCount = 0L
      private var heavyHitterMask = 0 // 0 => no sampling, otherwise power of two - 1
      private val adaptiveSamplingRateThreshold = 1000
      private var tickTimestamp = System.nanoTime()
      private var tickMessageCount = 0L
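
      // Worked example of the sampling levels chosen in onTimer below, with the
      // threshold at 1000 msg/s: an observed rate of 5000 msg/s selects mask 63
      // (every 64th message is counted), 50000 msg/s selects 127 and anything at
      // or above 100000 msg/s selects 255. The masks are of the form 2^n - 1, so
      // (messageCount & mask) == 0 holds exactly once per 2^n messages.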

      override protected def logSource = classOf[Decoder]

      override def preStart(): Unit = {
        schedulePeriodically(Tick, 1.second)
      }

      override def onPush(): Unit = {
        messageCount += 1
        val envelope = grab(in)

        headerBuilder.resetMessageFields()
        envelope.parseHeader(headerBuilder)

        val originUid = headerBuilder.uid
        val association = inboundContext.association(originUid)

        val recipient: OptionVal[InternalActorRef] = try headerBuilder.recipientActorRef(originUid) match {
          case OptionVal.Some(ref) ⇒
            OptionVal(ref.asInstanceOf[InternalActorRef])
          case OptionVal.None if headerBuilder.recipientActorRefPath.isDefined ⇒
            resolveRecipient(headerBuilder.recipientActorRefPath.get)
          case _ ⇒
            OptionVal.None
        } catch {
          case NonFatal(e) ⇒
            // probably version mismatch due to restarted system
            log.warning("Couldn't decompress recipient from originUid [{}]. {}", originUid, e.getMessage)
            OptionVal.None
        }

        val sender: OptionVal[InternalActorRef] = try headerBuilder.senderActorRef(originUid) match {
          case OptionVal.Some(ref) ⇒
            OptionVal(ref.asInstanceOf[InternalActorRef])
          case OptionVal.None if headerBuilder.senderActorRefPath.isDefined ⇒
            OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get))
          case _ ⇒
            OptionVal.None
        } catch {
          case NonFatal(e) ⇒
            // probably version mismatch due to restarted system
            log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage)
            OptionVal.None
        }

        val classManifestOpt = try headerBuilder.manifest(originUid) catch {
          case NonFatal(e) ⇒
            // probably version mismatch due to restarted system
            log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e.getMessage)
            OptionVal.None
        }

        if ((recipient.isEmpty && headerBuilder.recipientActorRefPath.isEmpty && !headerBuilder.isNoRecipient) ||
          (sender.isEmpty && headerBuilder.senderActorRefPath.isEmpty && !headerBuilder.isNoSender)) {
          log.debug("Dropping message for unknown recipient/sender. It was probably sent from system [{}] with compression " +
            "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
            "that has already been discarded in the destination system.", originUid,
            headerBuilder.inboundActorRefCompressionTableVersion)
          pull(in)
        } else if (classManifestOpt.isEmpty) {
          log.debug("Dropping message with unknown manifest. It was probably sent from system [{}] with compression " +
            "table [{}] built for previous incarnation of the destination system, or it was compressed with a table " +
            "that has already been discarded in the destination system.", originUid,
            headerBuilder.inboundActorRefCompressionTableVersion)
          pull(in)
        } else {
          val classManifest = classManifestOpt.get

          if ((messageCount & heavyHitterMask) == 0) {
            // --- hit refs and manifests for heavy-hitter counting
            association match {
              case OptionVal.Some(assoc) ⇒
                val remoteAddress = assoc.remoteAddress
                sender match {
                  case OptionVal.Some(snd) ⇒
                    compression.hitActorRef(originUid, remoteAddress, snd, 1)
                  case OptionVal.None ⇒
                }

                recipient match {
                  case OptionVal.Some(rcp) ⇒
                    compression.hitActorRef(originUid, remoteAddress, rcp, 1)
                  case OptionVal.None ⇒
                }

                compression.hitClassManifest(originUid, remoteAddress, classManifest, 1)

              case _ ⇒
                // we don't want to record hits for compression while handshake is still in progress
                log.debug("Decoded message but unable to record hits for compression as no remoteAddress is known. No association yet?")
            }
            // --- end of hit refs and manifests for heavy-hitter counting
          }

          val decoded = inEnvelopePool.acquire().init(
            recipient,
            sender,
            originUid,
            headerBuilder.serializer,
            classManifest,
            headerBuilder.flags,
            envelope,
            association)

          if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
            // The remote deployed actor might not be created yet when resolving the
            // recipient for the first message that is sent to it, best effort retry.
            // However, if the retried resolve isn't successful the ref is banned and
            // we will not do the delayed retry resolve again. The reason for that is
            // that if many messages are sent to such dead refs the resolve process
            // would slow down other messages.
            val recipientActorRefPath = headerBuilder.recipientActorRefPath.get
            if (bannedRemoteDeployedActorRefs.contains(recipientActorRefPath)) {
              log.debug(
                "Dropping message for banned (terminated) remote deployed recipient [{}].",
                recipientActorRefPath)
              pull(in)
            } else
              scheduleOnce(RetryResolveRemoteDeployedRecipient(
                retryResolveRemoteDeployedRecipientAttempts,
                recipientActorRefPath, decoded), retryResolveRemoteDeployedRecipientInterval)
          } else {
            push(out, decoded)
          }
        }
      }
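
      // Note: the envelope buffer is not released in this stage; it travels inside
      // the pushed InboundEnvelope and is returned to the pool by the Deserializer
      // stage below.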

      private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
        actorRefResolver.getOrCompute(path) match {
          case empty: EmptyLocalActorRef ⇒
            // remote-deployed actors live under the "remote" path element; an
            // unresolved ref there may just mean the actor has not been created
            // yet, so report None and let the caller retry
            val pathElements = empty.path.elements
            if (pathElements.nonEmpty && pathElements.head == "remote") OptionVal.None
            else OptionVal(empty)
          case ref ⇒ OptionVal(ref)
        }
      }

      override def onPull(): Unit = pull(in)

      override protected def onTimer(timerKey: Any): Unit = {
        timerKey match {
          case Tick ⇒
            val now = System.nanoTime()
            val d = math.max(1, now - tickTimestamp)
            val rate = (messageCount - tickMessageCount) * TimeUnit.SECONDS.toNanos(1) / d
            val oldHeavyHitterMask = heavyHitterMask
            heavyHitterMask =
              if (rate < adaptiveSamplingRateThreshold) 0 // no sampling
              else if (rate < adaptiveSamplingRateThreshold * 10) (1 << 6) - 1 // sample every 64th message
              else if (rate < adaptiveSamplingRateThreshold * 100) (1 << 7) - 1 // sample every 128th message
              else (1 << 8) - 1 // sample every 256th message
            if (oldHeavyHitterMask > 0 && heavyHitterMask == 0)
              log.debug("Turning off adaptive sampling of compression hit counting")
            else if (oldHeavyHitterMask != heavyHitterMask)
              log.debug("Turning on adaptive sampling (every {}th message) of compression hit counting", heavyHitterMask + 1)

            tickMessageCount = messageCount
            tickTimestamp = now

          case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒
            resolveRecipient(recipientPath) match {
              case OptionVal.None ⇒
                if (attemptsLeft > 0)
                  scheduleOnce(RetryResolveRemoteDeployedRecipient(
                    attemptsLeft - 1,
                    recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
                else {
                  // No more attempts left: the ref is banned so that we don't do the
                  // delayed retry resolve again, because if many messages are sent to
                  // such dead refs the resolve process would slow down other messages.
                  if (bannedRemoteDeployedActorRefs.size >= 100) {
                    // keep it bounded
                    bannedRemoteDeployedActorRefs.clear()
                  }
                  bannedRemoteDeployedActorRefs.add(recipientPath)

                  val recipient = actorRefResolver.getOrCompute(recipientPath)
                  push(out, inboundEnvelope.withRecipient(recipient))
                }
              case OptionVal.Some(recipient) ⇒
                push(out, inboundEnvelope.withRecipient(recipient))
            }
        }
      }

      setHandlers(in, out, this)
    }
}

/**
 * INTERNAL API
 */
private[remote] class Deserializer(
  inboundContext: InboundContext,
  system:         ExtendedActorSystem,
  bufferPool:     EnvelopeBufferPool) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {

  val in: Inlet[InboundEnvelope] = Inlet("Artery.Deserializer.in")
  val out: Outlet[InboundEnvelope] = Outlet("Artery.Deserializer.out")
  val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

      private val instruments: Vector[RemoteInstrument] = RemoteInstruments.create(system)
      private val serialization = SerializationExtension(system)

      override protected def logSource = classOf[Deserializer]

      override def onPush(): Unit = {
        val envelope = grab(in)

        try {
          val deserializedMessage = MessageSerializer.deserializeForArtery(
            system, envelope.originUid, serialization, envelope.serializer, envelope.classManifest, envelope.envelopeBuffer)

          val envelopeWithMessage = envelope.withMessage(deserializedMessage)
          applyIncomingInstruments(envelopeWithMessage)
          push(out, envelopeWithMessage)
        } catch {
          case NonFatal(e) ⇒
            log.warning(
              "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
              envelope.serializer, envelope.classManifest, e.getMessage)
            pull(in)
        } finally {
          val buf = envelope.envelopeBuffer
          envelope.releaseEnvelopeBuffer()
          bufferPool.release(buf)
        }
      }

      override def onPull(): Unit = pull(in)

      private def applyIncomingInstruments(envelope: InboundEnvelope): Unit = {
        if (envelope.flag(EnvelopeBuffer.MetadataPresentFlag)) {
          val length = instruments.length
          if (length == 0) {
            // TODO do we need to parse, or can we do a fast forward if debug logging is not enabled?
            val metadataEnvelope = MetadataMapParsing.parse(envelope)
            if (log.isDebugEnabled)
              log.debug("Incoming message envelope contains metadata for instruments: {}, " +
                "but no RemoteInstrument is registered in the local system!", metadataEnvelope.metadataMap.keysWithValues.mkString("[", ",", "]"))
          } else {
            // we avoid emitting a MetadataMap and instead directly apply the instruments to the received metadata
            MetadataMapParsing.applyAllRemoteMessageReceived(instruments, envelope)
          }
        }
      }

      setHandlers(in, out, this)
    }
}
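
// A minimal sketch of how the inbound stages above compose (the real pipeline adds
// further stages in between); `compressions`, `bufferPool` and `envelopePool` are
// assumed values, for illustration only:
//
//   val decode: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
//     Flow.fromGraph(new Decoder(inboundContext, system, localAddress, compressions, bufferPool, envelopePool))
//       .via(new Deserializer(inboundContext, system, bufferPool))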