2016-05-05 14:38:48 +02:00
package akka.remote.artery
2016-06-09 09:16:44 +02:00
import scala.concurrent.duration._
2016-05-27 11:24:08 +02:00
import scala.util.control.NonFatal
2016-05-05 14:38:48 +02:00
import akka.actor. { ActorRef , InternalActorRef }
2016-05-27 11:24:08 +02:00
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
2016-06-08 10:04:30 +02:00
import akka.remote. { MessageSerializer , OversizedPayloadException , UniqueAddress }
2016-05-27 11:24:08 +02:00
import akka.remote.EndpointManager.Send
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
2016-05-05 14:38:48 +02:00
import akka.serialization. { Serialization , SerializationExtension }
import akka.stream._
import akka.stream.stage. { GraphStage , GraphStageLogic , InHandler , OutHandler }
2016-06-05 15:40:06 +02:00
import akka.util.OptionVal
2016-06-09 09:16:44 +02:00
import akka.actor.EmptyLocalActorRef
import akka.stream.stage.TimerGraphStageLogic
2016-05-05 14:38:48 +02:00
2016-06-09 09:16:44 +02:00
/**
 * INTERNAL API
 *
 * Flow stage that turns every outgoing `Send` into an `EnvelopeBuffer`: a buffer is
 * acquired from `bufferPool`, the header (version, local uid, recipient, sender) is
 * filled in through a reused `HeaderBuilder`, and the message is serialized into the
 * buffer by `MessageSerializer.serializeForArtery`.
 *
 * Serialization failures are handled per message:
 *  - system messages (`SystemMessageEnvelope`): logged and re-thrown,
 *  - `java.nio.BufferOverflowException`: logged as an oversized payload, message dropped,
 *  - anything else non-fatal: logged, message dropped.
 * In every failure case the acquired buffer is released back to the pool.
 */
private[remote] class Encoder(
  uniqueLocalAddress: UniqueAddress,
  system:             ActorSystem,
  compressionTable:   LiteralCompressionTable,
  bufferPool:         EnvelopeBufferPool)
  extends GraphStage[FlowShape[Send, EnvelopeBuffer]] {

  val in: Inlet[Send] = Inlet("Artery.Encoder.in")
  val out: Outlet[EnvelopeBuffer] = Outlet("Artery.Encoder.out")
  val shape: FlowShape[Send, EnvelopeBuffer] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {

      // One builder per stage instance; version and uid never change, so they are set once.
      private val headerBuilder = HeaderBuilder(compressionTable)
      headerBuilder.version = ArteryTransport.Version
      headerBuilder.uid = uniqueLocalAddress.uid
      private val localAddress = uniqueLocalAddress.address
      private val serialization = SerializationExtension(system)
      private val serializationInfo = Serialization.Information(localAddress, system)

      // ActorRef -> serialized path caches; cleared wholesale once they reach 1000 entries.
      private val senderCache = new java.util.HashMap[ActorRef, String]
      private val recipientCache = new java.util.HashMap[ActorRef, String]

      override protected def logSource = classOf[Encoder]

      override def onPush(): Unit = {
        val send = grab(in)
        val envelope = bufferPool.acquire()

        val recipientStr = recipientCache.get(send.recipient) match {
          case null ⇒
            val s = send.recipient.path.toSerializationFormat
            // FIXME this cache will be replaced by compression table
            if (recipientCache.size() >= 1000)
              recipientCache.clear()
            recipientCache.put(send.recipient, s)
            s
          case s ⇒ s
        }
        headerBuilder.recipientActorRef = recipientStr

        send.senderOption match {
          case OptionVal.None ⇒ headerBuilder.setNoSender()
          case OptionVal.Some(sender) ⇒
            val senderStr = senderCache.get(sender) match {
              case null ⇒
                val s = sender.path.toSerializationFormatWithAddress(localAddress)
                // FIXME we might need an efficient LRU cache, or replaced by compression table
                if (senderCache.size() >= 1000)
                  senderCache.clear()
                senderCache.put(sender, s)
                s
              case s ⇒ s
            }
            headerBuilder.senderActorRef = senderStr
        }

        try {
          // avoiding currentTransportInformation.withValue due to thunk allocation
          val oldValue = Serialization.currentTransportInformation.value
          try {
            Serialization.currentTransportInformation.value = serializationInfo
            MessageSerializer.serializeForArtery(serialization, send.message.asInstanceOf[AnyRef], headerBuilder, envelope)
          } finally
            Serialization.currentTransportInformation.value = oldValue

          envelope.byteBuffer.flip()
          push(out, envelope)
        } catch {
          case NonFatal(e) ⇒
            // Read the limit while this stage still owns the buffer: after release the
            // buffer belongs to the pool and must not be inspected any more.
            val maxPayloadSize = envelope.byteBuffer.limit()
            bufferPool.release(envelope)
            send.message match {
              case _: SystemMessageEnvelope ⇒
                log.error(e, "Failed to serialize system message [{}].", send.message.getClass.getName)
                throw e
              case _ if e.isInstanceOf[java.nio.BufferOverflowException] ⇒
                val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${send.recipient}: max allowed size $maxPayloadSize bytes. Message type [${send.message.getClass.getName}].")
                log.error(reason, "Transient association error (association remains live)")
                // the message is dropped; ask upstream for the next one
                pull(in)
              case _ ⇒
                log.error(e, "Failed to serialize message [{}].", send.message.getClass.getName)
                // the message is dropped; ask upstream for the next one
                pull(in)
            }
        }
      }

      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}
2016-06-09 09:16:44 +02:00
/**
 * INTERNAL API
 */
private[remote] object Decoder {

  /**
   * Timer key used by the `Decoder` stage to retry resolving the recipient of an
   * already decoded envelope.
   *
   * @param attemptsLeft    number of further retries allowed after this one
   * @param recipientPath   serialized actor path of the recipient to resolve
   * @param inboundEnvelope the decoded envelope waiting for its recipient
   */
  private final case class RetryResolveRemoteDeployedRecipient(
    attemptsLeft:    Int,
    recipientPath:   String,
    inboundEnvelope: InboundEnvelope)
}
/**
 * INTERNAL API
 *
 * Flow stage that turns every incoming `EnvelopeBuffer` into an `InboundEnvelope`:
 * the header is parsed into a reused `HeaderBuilder`, sender and recipient paths are
 * resolved to actor refs (with small string-keyed caches), and the payload is
 * deserialized by `MessageSerializer.deserializeForArtery`.
 *
 * The incoming buffer is released to `bufferPool` once the try block around
 * deserialization finishes, whether it succeeded or failed. A deserialization failure
 * is logged as a warning and the message is dropped.
 *
 * If the recipient cannot be resolved yet (see `resolveRecipient`), the decoded envelope
 * is held back and resolution is retried on a timer before it is pushed downstream.
 */
private[remote] class Decoder(
  inboundContext:                  InboundContext,
  system:                          ExtendedActorSystem,
  resolveActorRefWithLocalAddress: String ⇒ InternalActorRef,
  compressionTable:                LiteralCompressionTable,
  bufferPool:                      EnvelopeBufferPool,
  inEnvelopePool:                  ObjectPool[InboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {

  val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
  val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
  val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
      import Decoder.RetryResolveRemoteDeployedRecipient

      private val localAddress = inboundContext.localAddress.address
      // Reused for every envelope: parseHeader overwrites its state on each push.
      private val headerBuilder = HeaderBuilder(compressionTable)
      private val serialization = SerializationExtension(system)

      // serialized path -> ref caches; cleared wholesale once they reach 1000 entries.
      private val recipientCache = new java.util.HashMap[String, InternalActorRef]
      private val senderCache = new java.util.HashMap[String, ActorRef]

      private val retryResolveRemoteDeployedRecipientInterval = 50.millis
      private val retryResolveRemoteDeployedRecipientAttempts = 20

      override protected def logSource = classOf[Decoder]

      override def onPush(): Unit = {
        val envelope = grab(in)
        envelope.parseHeader(headerBuilder)

        // FIXME: Instead of using Strings, the headerBuilder should automatically return cached ActorRef instances
        // in case of compression is enabled
        // FIXME: Is localAddress really needed?

        val sender =
          if (headerBuilder.isNoSender)
            OptionVal.None
          else {
            senderCache.get(headerBuilder.senderActorRef) match {
              case null ⇒
                val ref = resolveActorRefWithLocalAddress(headerBuilder.senderActorRef)
                // FIXME this cache will be replaced by compression table
                if (senderCache.size() >= 1000)
                  senderCache.clear()
                senderCache.put(headerBuilder.senderActorRef, ref)
                OptionVal(ref)
              case ref ⇒ OptionVal(ref)
            }
          }

        val recipient =
          if (headerBuilder.isNoRecipient)
            OptionVal.None
          else
            resolveRecipient(headerBuilder.recipientActorRef)

        val originUid = headerBuilder.uid
        val association = inboundContext.association(originUid)

        try {
          val deserializedMessage = MessageSerializer.deserializeForArtery(
            system, serialization, headerBuilder, envelope)

          val decoded = inEnvelopePool.acquire()
          decoded.asInstanceOf[ReusableInboundEnvelope].init(
            recipient,
            localAddress, // FIXME: Is this needed anymore? What should we do here?
            deserializedMessage,
            sender,
            originUid,
            association)

          if (recipient.isEmpty && !headerBuilder.isNoRecipient) {
            // the remote deployed actor might not be created yet when resolving the
            // recipient for the first message that is sent to it, best effort retry
            scheduleOnce(RetryResolveRemoteDeployedRecipient(
              retryResolveRemoteDeployedRecipientAttempts,
              headerBuilder.recipientActorRef, decoded), retryResolveRemoteDeployedRecipientInterval)
          } else
            push(out, decoded)
        } catch {
          case NonFatal(e) ⇒
            log.warning(
              "Failed to deserialize message with serializer id [{}] and manifest [{}]. {}",
              headerBuilder.serializer, headerBuilder.manifest, e.getMessage)
            // the message is dropped; ask upstream for the next one
            pull(in)
        } finally {
          bufferPool.release(envelope)
        }
      }

      /**
       * Resolves a serialized recipient path, consulting `recipientCache` first.
       *
       * Returns `OptionVal.None` only when resolution yields an `EmptyLocalActorRef`
       * whose first path element is "remote"; that result is not cached so a later
       * call can try again. Every other result (including other `EmptyLocalActorRef`s)
       * is cached and returned.
       */
      private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
        recipientCache.get(path) match {
          case null ⇒
            def addToCache(resolved: InternalActorRef): Unit = {
              // FIXME we might need an efficient LRU cache, or replaced by compression table
              if (recipientCache.size() >= 1000)
                recipientCache.clear()
              recipientCache.put(path, resolved)
            }

            resolveActorRefWithLocalAddress(path) match {
              case empty: EmptyLocalActorRef ⇒
                val pathElements = empty.path.elements
                if (pathElements.nonEmpty && pathElements.head == "remote")
                  OptionVal.None
                else {
                  addToCache(empty)
                  OptionVal(empty)
                }
              case ref ⇒
                addToCache(ref)
                OptionVal(ref)
            }
          case ref ⇒ OptionVal(ref)
        }
      }

      override def onPull(): Unit = pull(in)

      /**
       * Handles a pending recipient-resolution retry. While attempts remain and the
       * recipient is still unresolved, another retry is scheduled. When attempts are
       * exhausted, whatever the resolver returns is cached and used, so that only the
       * first message to a path goes through the retry cycle.
       */
      override protected def onTimer(timerKey: Any): Unit = {
        timerKey match {
          case RetryResolveRemoteDeployedRecipient(attemptsLeft, recipientPath, inboundEnvelope) ⇒
            resolveRecipient(recipientPath) match {
              case OptionVal.None ⇒
                if (attemptsLeft > 0)
                  // Re-schedule with the path carried by the timer key itself rather than
                  // reading it back from the shared, mutable headerBuilder.
                  scheduleOnce(RetryResolveRemoteDeployedRecipient(
                    attemptsLeft - 1,
                    recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
                else {
                  val recipient = resolveActorRefWithLocalAddress(recipientPath)
                  // only retry for the first message
                  recipientCache.put(recipientPath, recipient)
                  push(out, inboundEnvelope.withRecipient(recipient))
                }
              case OptionVal.Some(recipient) ⇒
                push(out, inboundEnvelope.withRecipient(recipient))
            }
        }
      }

      setHandlers(in, out, this)
    }
}