/**
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.remote
import akka.actor._
import akka.dispatch.sysmsg.SystemMessage
import akka.event.LoggingAdapter
import akka.pattern.pipe
import akka.remote.EndpointManager.Send
import akka.remote.WireFormats.SerializedMessage
import akka.remote.transport.AkkaPduCodec._
import akka.remote.transport.AssociationHandle._
import akka.remote.transport.Transport.InvalidAssociationException
import akka.remote.transport.{ AkkaPduProtobufCodec, AkkaPduCodec, Transport, AkkaProtocolHandle }
import akka.serialization.Serialization
import akka.util.ByteString
import akka.{ OnlyCauseStackTrace, AkkaException }
import java.io.NotSerializableException
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.duration.{ Duration, Deadline }
import scala.util.control.NonFatal
import scala.annotation.tailrec
import akka.remote.EndpointWriter.FlushAndStop
import akka.actor.SupervisorStrategy._
import akka.remote.EndpointManager.Link

/**
 * INTERNAL API
 */
private[remote] trait InboundMessageDispatcher {

  def dispatch(recipient: InternalActorRef,
               recipientAddress: Address,
               serializedMessage: SerializedMessage,
               senderOption: Option[ActorRef]): Unit

}

/**
 * INTERNAL API
 */
private[remote] class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
                                               private val provider: RemoteActorRefProvider,
                                               private val log: LoggingAdapter) extends InboundMessageDispatcher {

  private val remoteDaemon = provider.remoteDaemon

  override def dispatch(recipient: InternalActorRef,
                        recipientAddress: Address,
                        serializedMessage: SerializedMessage,
                        senderOption: Option[ActorRef]): Unit = {

    import provider.remoteSettings._

    lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage)
    def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
    val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
    val originalReceiver = recipient.path
    def msgLog = s"RemoteMessage: [$payload] to [$recipient]<+[$originalReceiver] from [$sender]"

    recipient match {

      case `remoteDaemon` ⇒
        if (UntrustedMode) log.debug("dropping daemon message in untrusted mode")
        else {
          if (LogReceive) log.debug("received daemon message {}", msgLog)
          remoteDaemon ! payload
        }

      case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒
        if (LogReceive) log.debug("received local message {}", msgLog)
        payload match {
          case msg: PossiblyHarmful if UntrustedMode ⇒
            log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type {}", msg.getClass)
          case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
          case msg ⇒ l.!(msg)(sender)
        }

      case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
        if (LogReceive) log.debug("received remote-destined message {}", msgLog)
        if (provider.transport.addresses(recipientAddress))
          // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
          r.!(payload)(sender)
        else
          log.error("dropping message {} for non-local recipient {} arriving at {} inbound addresses are {}",
            payloadClass, r, recipientAddress, provider.transport.addresses)

      case r ⇒ log.error("dropping message {} for unknown recipient {} arriving at {} inbound addresses are {}",
        payloadClass, r, recipientAddress, provider.transport.addresses)

    }
  }
}

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace {
  def this(msg: String) = this(msg, null)
}

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
  extends EndpointException("Invalid address: " + remoteAddress, cause)

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] case class HopelessAssociation(localAddress: Address, remoteAddress: Address, uid: Option[Int], cause: Throwable)
  extends EndpointException("Catastrophic association error.")

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class EndpointDisassociatedException(msg: String) extends EndpointException(msg)

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class EndpointAssociationException(msg: String, cause: Throwable) extends EndpointException(msg, cause)

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class QuarantinedUidException(uid: Int, remoteAddress: Address)
  extends EndpointException(s"Refused association to [$remoteAddress] because its UID [$uid] is quarantined.")

/**
 * INTERNAL API
 */
@SerialVersionUID(1L)
private[remote] class OversizedPayloadException(msg: String) extends EndpointException(msg)

/**
 * INTERNAL API
 */
private[remote] object ReliableDeliverySupervisor {
  case object Ungate
  case class GotUid(uid: Int)

  def apply(
    handleOrActive: Option[AkkaProtocolHandle],
    localAddress: Address,
    remoteAddress: Address,
    transport: Transport,
    settings: RemoteSettings,
    codec: AkkaPduCodec,
    refuseUid: Option[Int],
    receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props =
    Props(classOf[ReliableDeliverySupervisor], handleOrActive, localAddress, remoteAddress, transport, settings,
      codec, refuseUid, receiveBuffers)
}

/**
 * INTERNAL API
 */
private[remote] class ReliableDeliverySupervisor(
  handleOrActive: Option[AkkaProtocolHandle],
  val localAddress: Address,
  val remoteAddress: Address,
  val transport: Transport,
  val settings: RemoteSettings,
  val codec: AkkaPduCodec,
  val refuseUid: Option[Int],
  val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor {

  import ReliableDeliverySupervisor._

  def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero

  override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
    case e @ (_: InvalidAssociation | _: HopelessAssociation | _: QuarantinedUidException) ⇒ Escalate
    case NonFatal(e) ⇒
      if (retryGateEnabled) {
        import context.dispatcher
        context.become(gated)
        context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
        context.unwatch(writer)
        currentHandle = None
        Stop
      } else {
        Restart
      }
  }

  var currentHandle: Option[AkkaProtocolHandle] = handleOrActive
  var resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
  var resendDeadline = Deadline.now + settings.SysResendTimeout
  var lastCumulativeAck = SeqNo(-1)
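
  // Generator of strictly increasing sequence numbers assigned to system messages before they are buffered and sent.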
  val nextSeq = {
    var seqCounter: Long = 0L
    () ⇒ {
      val tmp = seqCounter
      seqCounter += 1
      SeqNo(tmp)
    }
  }

  var writer: ActorRef = createWriter()
  var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }

  override def postStop(): Unit = {
    // All remaining messages in the buffer have to be delivered to dead letters. It is important to clear the sequence
    // numbers, otherwise deadLetters would ignore them: sequenced messages are skipped to avoid reporting system
    // messages as dead letters while they are possibly still being retransmitted.
    // Such a situation may arise when the EndpointWriter is shut down, and all of its mailbox contents are delivered
    // to dead letters. These messages should be ignored, as they still live in resendBuffer and might be delivered to
    // the remote system later.
    (resendBuffer.nacked ++ resendBuffer.nonAcked) foreach { s ⇒ context.system.deadLetters ! s.copy(seqOpt = None) }
    receiveBuffers.remove(Link(localAddress, remoteAddress))
  }

  override def postRestart(reason: Throwable): Unit = {
    throw new IllegalStateException(
      "BUG: ReliableDeliverySupervisor has been attempted to be restarted. This must not happen.")
  }

  override def receive: Receive = {
    case FlushAndStop ⇒
      // Trying to serve until our last breath
      resendAll()
      writer ! FlushAndStop
      context.become(flushWait)
    case s: Send ⇒
      handleSend(s)
    case ack: Ack ⇒
      resendBuffer = resendBuffer.acknowledge(ack)
      if (lastCumulativeAck < ack.cumulativeAck) {
        resendDeadline = Deadline.now + settings.SysResendTimeout
        lastCumulativeAck = ack.cumulativeAck
      } else if (resendDeadline.isOverdue()) {
        resendAll()
        resendDeadline = Deadline.now + settings.SysResendTimeout
      }
      resendNacked()
    case Terminated(_) ⇒
      currentHandle = None
      context.become(idle)
    case GotUid(u) ⇒ uid = Some(u)
  }
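
  // While gated (the writer failed and the retry gate is closed): system messages are still sequenced and buffered
  // for later redelivery, ordinary messages go to dead letters, and Ungate either recreates the writer (if there is
  // anything to resend) or moves to idle.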
  def gated: Receive = {
    case Ungate ⇒
      if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
        writer = createWriter()
        resendAll()
        context.become(receive)
      } else context.become(idle)
    case s @ Send(msg: SystemMessage, _, _, _) ⇒ tryBuffer(s.copy(seqOpt = Some(nextSeq())))
    case s: Send ⇒ context.system.deadLetters ! s
    case FlushAndStop ⇒ context.stop(self)
    case _ ⇒ // Ignore
  }
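
  // No writer is running; the first outbound message recreates it and triggers a resend of the buffered system messages.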
  def idle: Receive = {
    case s: Send ⇒
      writer = createWriter()
      resendAll()
      handleSend(s)
      context.become(receive)
    case FlushAndStop ⇒ context.stop(self)
  }

  def flushWait: Receive = {
    case Terminated(_) ⇒
      // Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down
      // and don't really know if they were properly delivered or not.
      resendBuffer = new AckedSendBuffer[Send](0)
      context.stop(self)
    case _ ⇒ // Ignore
  }
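
  // System messages are assigned a sequence number and kept in the resend buffer until acknowledged; other messages
  // are passed straight to the writer.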
  private def handleSend(send: Send): Unit =
    if (send.message.isInstanceOf[SystemMessage]) {
      val sequencedSend = send.copy(seqOpt = Some(nextSeq()))
      tryBuffer(sequencedSend)
      writer ! sequencedSend
    } else writer ! send

  private def resendNacked(): Unit = resendBuffer.nacked foreach { writer ! _ }

  private def resendAll(): Unit = {
    resendNacked()
    resendBuffer.nonAcked foreach { writer ! _ }
  }
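
  // Buffering can fail (e.g. when the resend buffer capacity is exceeded); such a failure is treated as unrecoverable
  // for this association and escalated as HopelessAssociation.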
  private def tryBuffer(s: Send): Unit =
    try {
      resendBuffer = resendBuffer buffer s
    } catch {
      case NonFatal(e) ⇒ throw new HopelessAssociation(localAddress, remoteAddress, uid, e)
    }

  private def createWriter(): ActorRef = {
    context.watch(context.actorOf(EndpointWriter(
      handleOrActive = currentHandle,
      localAddress = localAddress,
      remoteAddress = remoteAddress,
      transport = transport,
      settings = settings,
      AkkaPduProtobufCodec,
      refuseUid,
      receiveBuffers = receiveBuffers,
      reliableDeliverySupervisor = Some(self))
      .withDispatcher("akka.remote.writer-dispatcher"),
      "endpointWriter"))
  }
}

/**
 * INTERNAL API
 */
private[remote] abstract class EndpointActor(
  val localAddress: Address,
  val remoteAddress: Address,
  val transport: Transport,
  val settings: RemoteSettings,
  val codec: AkkaPduCodec) extends Actor with ActorLogging {

  def inbound: Boolean

  val eventPublisher = new EventPublisher(context.system, log, settings.LogRemoteLifecycleEvents)

  def publishError(reason: Throwable): Unit = {
    try
      eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound))
    catch { case NonFatal(e) ⇒ log.error(e, "Unable to publish error event to EventStream.") }
  }
}

/**
 * INTERNAL API
 */
private[remote] object EndpointWriter {

  def apply(
    handleOrActive: Option[AkkaProtocolHandle],
    localAddress: Address,
    remoteAddress: Address,
    transport: Transport,
    settings: RemoteSettings,
    codec: AkkaPduCodec,
    refuseUid: Option[Int],
    receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
    reliableDeliverySupervisor: Option[ActorRef]): Props =
    Props(classOf[EndpointWriter], handleOrActive, localAddress, remoteAddress, transport, settings, codec,
      refuseUid, receiveBuffers, reliableDeliverySupervisor)

  /**
   * This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
   * to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the
   * same remote endpoint: when a parallel inbound association is detected, the old one is removed and the new one is
   * used instead.
   * @param handle Handle of the new inbound association.
   */
  case class TakeOver(handle: AkkaProtocolHandle)
  case object BackoffTimer
  case object FlushAndStop
  case object AckIdleCheckTimer

  case class OutboundAck(ack: Ack)

  sealed trait State
  case object Initializing extends State
  case object Buffering extends State
  case object Writing extends State
  case object Handoff extends State

  val AckIdleTimerName = "AckIdleTimer"
}

/**
 * INTERNAL API
 */
private[remote] class EndpointWriter(
  handleOrActive: Option[AkkaProtocolHandle],
  localAddress: Address,
  remoteAddress: Address,
  transport: Transport,
  settings: RemoteSettings,
  codec: AkkaPduCodec,
  val refuseUid: Option[Int],
  val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]],
  val reliableDeliverySupervisor: Option[ActorRef])
  extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with UnboundedStash
  with FSM[EndpointWriter.State, Unit] {

  import EndpointWriter._
  import context.dispatcher

  val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem]

  var reader: Option[ActorRef] = None
  var handle: Option[AkkaProtocolHandle] = handleOrActive // FIXME: refactor into state data
  val readerId = Iterator from 0
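
  // Deadline after which a standalone (pure) Ack is written, if no regular outbound message has carried one in the
  // meantime; checked periodically via AckIdleCheckTimer.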
  def newAckDeadline: Deadline = Deadline.now + settings.SysMsgAckTimeout
  var ackDeadline: Deadline = newAckDeadline
  var lastAck: Option[Ack] = None

  override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) }

  val msgDispatch = new DefaultMessageDispatcher(extendedSystem, RARP(extendedSystem).provider, log)

  var inbound = handle.isDefined

  private def publishAndThrow(reason: Throwable): Nothing = {
    publishError(reason)
    throw reason
  }

  private def publishAndStay(reason: Throwable): State = {
    publishError(reason)
    stay()
  }

  override def postRestart(reason: Throwable): Unit = {
    handle = None // Wipe out the possibly injected handle
    inbound = false
    preStart()
  }

  override def preStart(): Unit = {
    setTimer(AckIdleTimerName, AckIdleCheckTimer, settings.SysMsgAckTimeout / 2, repeat = true)
    startWith(
      handle match {
        case Some(h) ⇒
          reader = startReadEndpoint(h)
          Writing
        case None ⇒
          transport.associate(remoteAddress) pipeTo self
          Initializing
      },
      stateData = ())
  }

  when(Initializing) {
    case Event(Send(msg, senderOption, recipient, _), _) ⇒
      stash()
      stay()
    case Event(Status.Failure(e: InvalidAssociationException), _) ⇒
      publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
    case Event(Status.Failure(e), _) ⇒
      publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
    case Event(inboundHandle: AkkaProtocolHandle, _) ⇒
      refuseUid match {
        case Some(uid) if inboundHandle.handshakeInfo.uid == uid ⇒
          publishAndThrow(new QuarantinedUidException(inboundHandle.handshakeInfo.uid, inboundHandle.remoteAddress))
        case _ ⇒ // Everything is fine
      }
      // Assert handle == None?
      context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid)
      handle = Some(inboundHandle)
      reader = startReadEndpoint(inboundHandle)
      goto(Writing)
  }
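
  // The transport signalled backpressure (write returned false): buffer outbound sends until the backoff timer fires.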
  when(Buffering) {
    case Event(_: Send, _) ⇒
      stash()
      stay()
    case Event(BackoffTimer, _) ⇒ goto(Writing)
    case Event(FlushAndStop, _) ⇒
      stash() // Flushing is postponed after the pending writes
      stay()
  }

  when(Writing) {
    case Event(s @ Send(msg, senderOption, recipient, seqOption), _) ⇒
      try {
        handle match {
          case Some(h) ⇒
            ackDeadline = newAckDeadline
            val pdu = codec.constructMessage(
              recipient.localAddressToUse,
              recipient,
              serializeMessage(msg),
              senderOption,
              seqOption = seqOption,
              ackOption = lastAck)
            if (pdu.size > transport.maximumPayloadBytes) {
              publishAndStay(new OversizedPayloadException(s"Discarding oversized payload sent to ${recipient}: " +
                s"max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${msg.getClass} was ${pdu.size} bytes."))
            } else if (h.write(pdu)) {
              stay()
            } else {
              if (seqOption.isEmpty) stash()
              goto(Buffering)
            }
          case None ⇒
            throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
        }
      } catch {
        case e: NotSerializableException ⇒ publishAndStay(e)
        case e: EndpointException ⇒ publishAndThrow(e)
        case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e))
      }

    // We are in Writing state, so stash is empty, safe to stop here
    case Event(FlushAndStop, _) ⇒
      // Try to send a last Ack message
      trySendPureAck()
      stop()

    case Event(AckIdleCheckTimer, _) if ackDeadline.isOverdue() ⇒
      trySendPureAck()
      stay()
  }
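
  // A new inbound association is taking over: wait for the old reader to terminate before installing the new one
  // and resuming writes; outbound messages are stashed in the meantime.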
  when(Handoff) {
    case Event(Terminated(_), _) ⇒
      reader = startReadEndpoint(handle.get)
      unstashAll()
      goto(Writing)
    case Event(Send(msg, senderOption, recipient, _), _) ⇒
      stash()
      stay()
  }

  whenUnhandled {
    case Event(Terminated(r), _) if r == reader.orNull ⇒
      publishAndThrow(new EndpointDisassociatedException("Disassociated"))
    case Event(TakeOver(newHandle), _) ⇒
      // Shutdown old reader
      handle foreach { _.disassociate() }
      reader foreach context.stop
      handle = Some(newHandle)
      goto(Handoff)
    case Event(FlushAndStop, _) ⇒
      stop()
    case Event(OutboundAck(ack), _) ⇒
      lastAck = Some(ack)
      trySendPureAck()
      stay()
    case Event(AckIdleCheckTimer, _) ⇒ stay() // Ignore
  }

  onTransition {
    case Initializing -> Writing ⇒
      unstashAll()
      eventPublisher.notifyListeners(AssociatedEvent(localAddress, remoteAddress, inbound))
    case Writing -> Buffering ⇒
      setTimer("backoff-timer", BackoffTimer, settings.BackoffPeriod, repeat = false)
    case Buffering -> Writing ⇒
      unstashAll()
      cancelTimer("backoff-timer")
  }

  onTermination {
    case StopEvent(_, _, _) ⇒
      cancelTimer(AckIdleTimerName)
      // It is important to call unstashAll() for the stash to work properly and maintain messages during restart.
      // As the FSM trait does not call super.postStop(), this call is needed
      unstashAll()
      handle foreach { _.disassociate() }
      eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound))
  }
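
  // Writes a standalone Ack (not piggybacked on a regular message) if both a handle and a pending ack are available.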
  private def trySendPureAck(): Unit = for (h ← handle; ack ← lastAck)
    if (h.write(codec.constructPureAck(ack))) ackDeadline = newAckDeadline

  private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
    val newReader =
      context.watch(context.actorOf(
        EndpointReader(localAddress, remoteAddress, transport, settings, codec,
          msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers),
        "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
    handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
    Some(newReader)
  }

  private def serializeMessage(msg: Any): SerializedMessage = handle match {
    case Some(h) ⇒
      Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, context.system)) {
        (MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
      }
    case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of " +
      "outbound message.")
  }
}

/**
 * INTERNAL API
 */
private[remote] object EndpointReader {

  def apply(
    localAddress: Address,
    remoteAddress: Address,
    transport: Transport,
    settings: RemoteSettings,
    codec: AkkaPduCodec,
    msgDispatch: InboundMessageDispatcher,
    inbound: Boolean,
    reliableDeliverySupervisor: Option[ActorRef],
    receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]): Props =
    Props(classOf[EndpointReader], localAddress, remoteAddress, transport, settings, codec, msgDispatch, inbound,
      reliableDeliverySupervisor, receiveBuffers)
}

/**
 * INTERNAL API
 */
private[remote] class EndpointReader(
  localAddress: Address,
  remoteAddress: Address,
  transport: Transport,
  settings: RemoteSettings,
  codec: AkkaPduCodec,
  msgDispatch: InboundMessageDispatcher,
  val inbound: Boolean,
  val reliableDeliverySupervisor: Option[ActorRef],
  val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]])
  extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) {

  import EndpointWriter.OutboundAck

  val provider = RARP(context.system).provider

  var ackedReceiveBuffer = new AckedReceiveBuffer[Message]

  override def preStart(): Unit = {
    receiveBuffers.get(Link(localAddress, remoteAddress)) match {
      case null ⇒
      case buf ⇒
        ackedReceiveBuffer = buf
        deliverAndAck()
    }
  }

  override def postStop(): Unit = {
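    // Save the reader's receive buffer state so that a new reader for the same link can continue where this one
    // stopped; merging with a concurrently stored state is retried via compare-and-swap on the ConcurrentHashMap.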
    @tailrec
    def updateSavedState(key: Link, expectedState: AckedReceiveBuffer[Message]): Unit = {
      if (expectedState eq null) {
        if (receiveBuffers.putIfAbsent(key, ackedReceiveBuffer) ne null) updateSavedState(key, receiveBuffers.get(key))
      } else if (!receiveBuffers.replace(key, expectedState, expectedState.mergeFrom(ackedReceiveBuffer)))
        updateSavedState(key, receiveBuffers.get(key))
    }

    val key = Link(localAddress, remoteAddress)
    updateSavedState(key, receiveBuffers.get(key))
  }

  override def receive: Receive = {
    case Disassociated ⇒ context.stop(self)

    case InboundPayload(p) if p.size <= transport.maximumPayloadBytes ⇒
      val (ackOption, msgOption) = tryDecodeMessageAndAck(p)

      for (ack ← ackOption; reliableDelivery ← reliableDeliverySupervisor) reliableDelivery ! ack

      msgOption match {
        case Some(msg) ⇒
          if (msg.reliableDeliveryEnabled) {
            ackedReceiveBuffer = ackedReceiveBuffer.receive(msg)
            deliverAndAck()
          } else msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)
        case None ⇒
      }

    case InboundPayload(oversized) ⇒
      publishError(new OversizedPayloadException(s"Discarding oversized payload received: " +
        s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."))
  }
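
  // Pulls the messages that are now deliverable in order from the receive buffer, dispatches them, and forwards the
  // resulting Ack to the writer so it can be sent back to the remote system.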
  private def deliverAndAck(): Unit = {
    val (updatedBuffer, deliver, ack) = ackedReceiveBuffer.extractDeliverable
    ackedReceiveBuffer = updatedBuffer

    // Notify writer that some messages can be acked
    context.parent ! OutboundAck(ack)
    deliver foreach { m ⇒
      msgDispatch.dispatch(m.recipient, m.recipientAddress, m.serializedMessage, m.senderOption)
    }
  }

  private def tryDecodeMessageAndAck(pdu: ByteString): (Option[Ack], Option[Message]) = try {
    codec.decodeMessage(pdu, provider, localAddress)
  } catch {
    case NonFatal(e) ⇒ throw new EndpointException("Error while decoding incoming Akka PDU", e)
  }
}