2013-01-09 12:21:31 +01:00
/* *
* Copyright ( C ) 2009 - 2013 Typesafe Inc . < http : //www.typesafe.com>
*/
2012-09-12 11:18:42 +02:00
package akka.remote
2013-06-19 14:09:14 +02:00
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import akka.actor.Terminated
2012-09-12 11:18:42 +02:00
import akka.actor._
2013-03-05 16:19:54 +01:00
import akka.dispatch.sysmsg.SystemMessage
2013-08-26 15:41:05 +02:00
import akka.event. { Logging , LoggingAdapter }
2012-09-12 11:18:42 +02:00
import akka.pattern.pipe
2013-08-19 15:34:24 +02:00
import akka.remote.EndpointManager. { ResendState , Link , Send }
2013-06-19 14:09:14 +02:00
import akka.remote.EndpointWriter. { StoppedReading , FlushAndStop }
2013-03-27 17:47:56 +01:00
import akka.remote.WireFormats.SerializedMessage
2013-06-19 14:09:14 +02:00
import akka.remote.transport.AkkaPduCodec.Message
import akka.remote.transport.AssociationHandle. { DisassociateInfo , ActorHandleEventListener , Disassociated , InboundPayload }
2013-03-27 17:47:56 +01:00
import akka.remote.transport.Transport.InvalidAssociationException
2013-06-19 14:09:14 +02:00
import akka.remote.transport._
2013-03-26 11:25:09 +01:00
import akka.serialization.Serialization
2012-09-12 11:18:42 +02:00
import akka.util.ByteString
2013-03-27 17:47:56 +01:00
import akka. { OnlyCauseStackTrace , AkkaException }
2013-03-20 20:38:49 +13:00
import java.io.NotSerializableException
2013-03-27 17:47:56 +01:00
import java.util.concurrent.ConcurrentHashMap
2013-06-19 14:09:14 +02:00
import scala.annotation.tailrec
2013-03-27 17:47:56 +01:00
import scala.concurrent.duration. { Duration , Deadline }
import scala.util.control.NonFatal
2012-09-12 11:18:42 +02:00
2012-12-07 16:03:04 +01:00
/* *
2013-02-08 13:13:52 +01:00
* INTERNAL API
2012-12-07 16:03:04 +01:00
*/
private [ remote ] trait InboundMessageDispatcher {
2012-09-12 11:18:42 +02:00
def dispatch ( recipient : InternalActorRef ,
recipientAddress : Address ,
2013-03-27 17:47:56 +01:00
serializedMessage : SerializedMessage ,
2012-09-12 11:18:42 +02:00
senderOption : Option [ ActorRef ] ) : Unit
}
2013-02-08 13:13:52 +01:00
/* *
* INTERNAL API
*/
2012-12-07 16:03:04 +01:00
private [ remote ] class DefaultMessageDispatcher ( private val system : ExtendedActorSystem ,
private val provider : RemoteActorRefProvider ,
private val log : LoggingAdapter ) extends InboundMessageDispatcher {
2012-09-12 11:18:42 +02:00
private val remoteDaemon = provider . remoteDaemon
override def dispatch ( recipient : InternalActorRef ,
recipientAddress : Address ,
2013-03-27 17:47:56 +01:00
serializedMessage : SerializedMessage ,
2012-09-12 11:18:42 +02:00
senderOption : Option [ ActorRef ] ) : Unit = {
import provider.remoteSettings._
lazy val payload : AnyRef = MessageSerializer . deserialize ( system , serializedMessage )
2012-12-07 16:03:04 +01:00
def payloadClass : Class [ _ ] = if ( payload eq null ) null else payload . getClass
2012-09-12 11:18:42 +02:00
val sender : ActorRef = senderOption . getOrElse ( system . deadLetters )
val originalReceiver = recipient . path
2014-01-16 15:16:35 +01:00
def msgLog = s" RemoteMessage: [ $payload ] to [ $recipient ]<+[ $originalReceiver ] from [ $sender ()] "
2012-09-12 11:18:42 +02:00
recipient match {
case `remoteDaemon` ⇒
2012-12-12 15:04:44 +01:00
if ( UntrustedMode ) log . debug ( "dropping daemon message in untrusted mode" )
else {
2012-11-23 10:15:19 +01:00
if ( LogReceive ) log . debug ( "received daemon message {}" , msgLog )
2013-03-26 18:17:50 +01:00
remoteDaemon ! payload
2012-09-12 11:18:42 +02:00
}
case l @ ( _ : LocalRef | _ : RepointableRef ) if l . isLocal ⇒
if ( LogReceive ) log . debug ( "received local message {}" , msgLog )
payload match {
2013-11-06 14:34:07 +01:00
case sel : ActorSelectionMessage ⇒
if ( UntrustedMode && ( ! TrustedSelectionPaths . contains ( sel . elements . mkString ( "/" , "/" , "" ) ) ||
sel . msg . isInstanceOf [ PossiblyHarmful ] || l != provider . rootGuardian ) )
log . debug ( "operating in UntrustedMode, dropping inbound actor selection to [{}], " +
"allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration" ,
sel . elements . mkString ( "/" , "/" , "" ) )
else
// run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
ActorSelection . deliverSelection ( l , sender , sel )
2012-09-12 11:18:42 +02:00
case msg : PossiblyHarmful if UntrustedMode ⇒
2013-11-06 14:34:07 +01:00
log . debug ( "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]" , msg . getClass . getName )
2012-09-12 11:18:42 +02:00
case msg : SystemMessage ⇒ l . sendSystemMessage ( msg )
2013-11-06 14:34:07 +01:00
case msg ⇒ l . ! ( msg ) ( sender )
2012-09-12 11:18:42 +02:00
}
case r @ ( _ : RemoteRef | _ : RepointableRef ) if ! r . isLocal && ! UntrustedMode ⇒
if ( LogReceive ) log . debug ( "received remote-destined message {}" , msgLog )
if ( provider . transport . addresses ( recipientAddress ) )
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
r . ! ( payload ) ( sender )
else
2013-05-30 21:37:57 +02:00
log . error ( "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]" ,
payloadClass , r , recipientAddress , provider . transport . addresses . mkString ( ", " ) )
2012-09-12 11:18:42 +02:00
2013-05-30 21:37:57 +02:00
case r ⇒ log . error ( "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]" ,
payloadClass , r , recipientAddress , provider . transport . addresses . mkString ( ", " ) )
2012-09-12 11:18:42 +02:00
}
}
}
2013-02-08 13:13:52 +01:00
/* *
* INTERNAL API
*/
@SerialVersionUID ( 1L )
2012-12-20 12:59:48 +01:00
private [ remote ] class EndpointException ( msg : String , cause : Throwable ) extends AkkaException ( msg , cause ) with OnlyCauseStackTrace {
2012-12-18 13:46:10 +01:00
def this ( msg : String ) = this ( msg , null )
}
2013-10-09 11:29:02 +02:00
/* *
* INTERNAL API
*/
private [ remote ] trait AssociationProblem
2013-02-08 13:13:52 +01:00
/* *
* INTERNAL API
*/
2013-03-07 18:08:07 +01:00
@SerialVersionUID ( 1L )
2013-10-09 11:29:02 +02:00
private [ remote ] case class ShutDownAssociation ( localAddress : Address , remoteAddress : Address , cause : Throwable )
extends EndpointException ( "Shut down address: " + remoteAddress , cause ) with AssociationProblem
2012-09-12 11:18:42 +02:00
2013-03-27 17:47:56 +01:00
/* *
* INTERNAL API
*/
@SerialVersionUID ( 1L )
2013-10-09 11:29:02 +02:00
private [ remote ] case class InvalidAssociation ( localAddress : Address , remoteAddress : Address , cause : Throwable )
extends EndpointException ( "Invalid address: " + remoteAddress , cause ) with AssociationProblem
2013-03-27 17:47:56 +01:00
2013-03-07 18:08:07 +01:00
/* *
* INTERNAL API
*/
@SerialVersionUID ( 1L )
2013-10-09 11:29:02 +02:00
private [ remote ] case class HopelessAssociation ( localAddress : Address , remoteAddress : Address , uid : Option [ Int ] , cause : Throwable )
extends EndpointException ( "Catastrophic association error." ) with AssociationProblem
2013-03-07 18:08:07 +01:00
/* *
* INTERNAL API
*/
@SerialVersionUID ( 1L )
2013-10-09 11:29:02 +02:00
private [ remote ] class EndpointDisassociatedException ( msg : String ) extends EndpointException ( msg )
2013-03-07 18:08:07 +01:00
2013-04-18 17:35:43 +02:00
/* *
* INTERNAL API
*/
@SerialVersionUID ( 1L )
2013-10-09 11:29:02 +02:00
private [ remote ] class EndpointAssociationException ( msg : String , cause : Throwable ) extends EndpointException ( msg , cause )
2013-04-18 17:35:43 +02:00
2013-02-08 13:13:52 +01:00
/* *
* INTERNAL API
*/
2013-03-20 20:38:49 +13:00
@SerialVersionUID ( 1L )
private [ remote ] class OversizedPayloadException ( msg : String ) extends EndpointException ( msg )
2013-03-27 17:47:56 +01:00
/* *
* INTERNAL API
*/
private [ remote ] object ReliableDeliverySupervisor {
case object Ungate
2013-06-19 14:09:14 +02:00
case object AttemptSysMsgRedelivery
2013-03-27 17:47:56 +01:00
case class GotUid ( uid : Int )
2013-05-30 14:03:35 +02:00
def props (
2013-03-27 17:47:56 +01:00
handleOrActive : Option [ AkkaProtocolHandle ] ,
localAddress : Address ,
remoteAddress : Address ,
2013-11-26 15:25:05 +01:00
refuseUid : Option [ Int ] ,
transport : AkkaProtocolTransport ,
2013-03-27 17:47:56 +01:00
settings : RemoteSettings ,
codec : AkkaPduCodec ,
2013-08-19 15:34:24 +02:00
receiveBuffers : ConcurrentHashMap [ Link , ResendState ] ) : Props =
2013-11-26 15:25:05 +01:00
Props ( classOf [ ReliableDeliverySupervisor ] , handleOrActive , localAddress , remoteAddress , refuseUid , transport , settings ,
2013-05-24 14:01:29 +02:00
codec , receiveBuffers )
2013-03-27 17:47:56 +01:00
}
/* *
* INTERNAL API
*/
private [ remote ] class ReliableDeliverySupervisor (
handleOrActive : Option [ AkkaProtocolHandle ] ,
val localAddress : Address ,
val remoteAddress : Address ,
2013-11-26 15:25:05 +01:00
val refuseUid : Option [ Int ] ,
val transport : AkkaProtocolTransport ,
2013-03-27 17:47:56 +01:00
val settings : RemoteSettings ,
val codec : AkkaPduCodec ,
2013-12-12 13:05:59 +01:00
val receiveBuffers : ConcurrentHashMap [ Link , ResendState ] ) extends Actor with ActorLogging {
2013-03-27 17:47:56 +01:00
import ReliableDeliverySupervisor._
2013-06-19 14:09:14 +02:00
import context.dispatcher
2013-03-27 17:47:56 +01:00
2013-10-08 09:56:59 +02:00
var autoResendTimer : Option [ Cancellable ] = None
def scheduleAutoResend ( ) : Unit = if ( resendBuffer . nacked . nonEmpty || resendBuffer . nonAcked . nonEmpty ) {
if ( autoResendTimer . isEmpty )
autoResendTimer = Some ( context . system . scheduler . scheduleOnce ( settings . SysResendTimeout , self , AttemptSysMsgRedelivery ) )
}
def rescheduleAutoResend ( ) : Unit = {
autoResendTimer . foreach ( _ . cancel ( ) )
autoResendTimer = None
scheduleAutoResend ( )
}
2013-12-12 13:05:59 +01:00
override val supervisorStrategy = OneForOneStrategy ( loggingEnabled = false ) {
2013-10-09 11:29:02 +02:00
case e @ ( _ : AssociationProblem ) ⇒ Escalate
2013-03-27 17:47:56 +01:00
case NonFatal ( e ) ⇒
2013-12-12 13:05:59 +01:00
log . warning ( "Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason is: [{}]." ,
remoteAddress , settings . RetryGateClosedFor . toMillis , e . getMessage )
2013-09-17 14:17:31 +02:00
uidConfirmed = false // Need confirmation of UID again
2013-12-12 13:05:59 +01:00
context . become ( gated )
context . system . scheduler . scheduleOnce ( settings . RetryGateClosedFor , self , Ungate )
context . unwatch ( writer )
currentHandle = None
context . parent ! StoppedReading ( self )
Stop
2013-03-27 17:47:56 +01:00
}
var currentHandle : Option [ AkkaProtocolHandle ] = handleOrActive
2013-08-19 15:34:24 +02:00
var resendBuffer : AckedSendBuffer [ Send ] = _
var lastCumulativeAck : SeqNo = _
var seqCounter : Long = _
2013-09-17 14:17:31 +02:00
var pendingAcks = Vector . empty [ Ack ]
2013-08-19 15:34:24 +02:00
def reset ( ) {
resendBuffer = new AckedSendBuffer [ Send ] ( settings . SysMsgBufferSize )
2013-10-08 09:56:59 +02:00
scheduleAutoResend ( )
2013-08-19 15:34:24 +02:00
lastCumulativeAck = SeqNo ( - 1 )
seqCounter = 0L
2013-09-17 14:17:31 +02:00
pendingAcks = Vector . empty
2013-08-19 15:34:24 +02:00
}
reset ( )
def nextSeq ( ) : SeqNo = {
val tmp = seqCounter
seqCounter += 1
SeqNo ( tmp )
2013-03-27 17:47:56 +01:00
}
var writer : ActorRef = createWriter ( )
var uid : Option [ Int ] = handleOrActive map { _ . handshakeInfo . uid }
2013-09-17 14:17:31 +02:00
// Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the
// UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for
// any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore
// it serves a separator.
// If we already have an inbound handle then UID is initially confirmed.
// (This actor is never restarted)
var uidConfirmed : Boolean = uid . isDefined
def unstashAcks ( ) : Unit = {
pendingAcks foreach ( self ! _ )
pendingAcks = Vector . empty
}
2013-03-27 17:47:56 +01:00
override def postStop ( ) : Unit = {
// All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence
// number otherwise deadLetters will ignore it to avoid reporting system messages as dead letters while they are
// still possibly retransmitted.
// Such a situation may arise when the EndpointWriter is shut down, and all of its mailbox contents are delivered
// to dead letters. These messages should be ignored, as they still live in resendBuffer and might be delivered to
// the remote system later.
( resendBuffer . nacked ++ resendBuffer . nonAcked ) foreach { s ⇒ context . system . deadLetters ! s . copy ( seqOpt = None ) }
receiveBuffers . remove ( Link ( localAddress , remoteAddress ) )
}
override def postRestart ( reason : Throwable ) : Unit = {
throw new IllegalStateException (
"BUG: ReliableDeliverySupervisor has been attempted to be restarted. This must not happen." )
}
override def receive : Receive = {
case FlushAndStop ⇒
// Trying to serve until our last breath
resendAll ( )
writer ! FlushAndStop
context . become ( flushWait )
case s : Send ⇒
handleSend ( s )
case ack : Ack ⇒
2013-09-17 14:17:31 +02:00
if ( ! uidConfirmed ) pendingAcks = pendingAcks : + ack
else {
try resendBuffer = resendBuffer . acknowledge ( ack )
catch {
case NonFatal ( e ) ⇒
throw new InvalidAssociationException ( s" Error encountered while processing system message acknowledgement $resendBuffer $ack " , e )
}
2013-08-19 15:34:24 +02:00
2013-09-17 14:17:31 +02:00
if ( lastCumulativeAck < ack . cumulativeAck ) {
lastCumulativeAck = ack . cumulativeAck
2013-10-08 09:56:59 +02:00
// Cumulative ack is progressing, we might not need to resend non-acked messages yet.
// If this progression stops, the timer will eventually kick in, since scheduleAutoResend
// does not cancel existing timers (see the "else" case).
rescheduleAutoResend ( )
} else scheduleAutoResend ( )
2013-09-17 14:17:31 +02:00
resendNacked ( )
2013-03-27 17:47:56 +01:00
}
2013-10-08 09:56:59 +02:00
case AttemptSysMsgRedelivery ⇒
if ( uidConfirmed ) resendAll ( )
2013-03-27 17:47:56 +01:00
case Terminated ( _ ) ⇒
currentHandle = None
2013-06-19 14:09:14 +02:00
context . parent ! StoppedReading ( self )
if ( resendBuffer . nonAcked . nonEmpty || resendBuffer . nacked . nonEmpty )
context . system . scheduler . scheduleOnce ( settings . SysResendTimeout , self , AttemptSysMsgRedelivery )
2013-03-27 17:47:56 +01:00
context . become ( idle )
2013-09-17 14:17:31 +02:00
case GotUid ( receivedUid ) ⇒
2013-08-19 15:34:24 +02:00
// New system that has the same address as the old - need to start from fresh state
2013-09-17 14:17:31 +02:00
uidConfirmed = true
if ( uid . exists ( _ != receivedUid ) ) reset ( )
else unstashAcks ( )
uid = Some ( receivedUid )
resendAll ( )
2013-06-03 11:35:37 +02:00
case s : EndpointWriter . StopReading ⇒ writer forward s
2013-03-27 17:47:56 +01:00
}
def gated : Receive = {
case Ungate ⇒
if ( resendBuffer . nonAcked . nonEmpty || resendBuffer . nacked . nonEmpty ) {
writer = createWriter ( )
2013-09-17 14:17:31 +02:00
// Resending will be triggered by the incoming GotUid message after the connection finished
2013-03-27 17:47:56 +01:00
context . become ( receive )
} else context . become ( idle )
case s @ Send ( msg : SystemMessage , _ , _ , _ ) ⇒ tryBuffer ( s . copy ( seqOpt = Some ( nextSeq ( ) ) ) )
case s : Send ⇒ context . system . deadLetters ! s
2013-06-03 11:35:37 +02:00
case EndpointWriter . FlushAndStop ⇒ context . stop ( self )
2014-01-16 15:16:35 +01:00
case EndpointWriter . StopReading ( w ) ⇒ sender ( ) ! EndpointWriter . StoppedReading ( w )
2013-03-27 17:47:56 +01:00
case _ ⇒ // Ignore
}
def idle : Receive = {
case s : Send ⇒
writer = createWriter ( )
2013-09-17 14:17:31 +02:00
// Resending will be triggered by the incoming GotUid message after the connection finished
2013-03-27 17:47:56 +01:00
handleSend ( s )
context . become ( receive )
2013-06-19 14:09:14 +02:00
case AttemptSysMsgRedelivery ⇒
writer = createWriter ( )
2013-09-17 14:17:31 +02:00
// Resending will be triggered by the incoming GotUid message after the connection finished
2013-06-19 14:09:14 +02:00
context . become ( receive )
2013-06-03 11:35:37 +02:00
case EndpointWriter . FlushAndStop ⇒ context . stop ( self )
2014-01-16 15:16:35 +01:00
case EndpointWriter . StopReading ( w ) ⇒ sender ( ) ! EndpointWriter . StoppedReading ( w )
2013-03-27 17:47:56 +01:00
}
def flushWait : Receive = {
case Terminated ( _ ) ⇒
// Clear buffer to prevent sending system messages to dead letters -- at this point we are shutting down
// and don't really know if they were properly delivered or not.
resendBuffer = new AckedSendBuffer [ Send ] ( 0 )
context . stop ( self )
case _ ⇒ // Ignore
}
private def handleSend ( send : Send ) : Unit =
if ( send . message . isInstanceOf [ SystemMessage ] ) {
val sequencedSend = send . copy ( seqOpt = Some ( nextSeq ( ) ) )
tryBuffer ( sequencedSend )
2013-09-17 14:17:31 +02:00
// If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it.
// GotUid will kick resendAll() causing the messages to be properly written
if ( uidConfirmed ) writer ! sequencedSend
2013-03-27 17:47:56 +01:00
} else writer ! send
private def resendNacked ( ) : Unit = resendBuffer . nacked foreach { writer ! _ }
private def resendAll ( ) : Unit = {
resendNacked ( )
resendBuffer . nonAcked foreach { writer ! _ }
2013-10-08 09:56:59 +02:00
rescheduleAutoResend ( )
2013-03-27 17:47:56 +01:00
}
private def tryBuffer ( s : Send ) : Unit =
try {
resendBuffer = resendBuffer buffer s
} catch {
case NonFatal ( e ) ⇒ throw new HopelessAssociation ( localAddress , remoteAddress , uid , e )
}
private def createWriter ( ) : ActorRef = {
2013-08-19 12:06:07 +02:00
context . watch ( context . actorOf ( RARP ( context . system ) . configureDispatcher ( EndpointWriter . props (
2013-03-27 17:47:56 +01:00
handleOrActive = currentHandle ,
localAddress = localAddress ,
remoteAddress = remoteAddress ,
2013-11-26 15:25:05 +01:00
refuseUid ,
2013-03-27 17:47:56 +01:00
transport = transport ,
settings = settings ,
AkkaPduProtobufCodec ,
receiveBuffers = receiveBuffers ,
2013-08-19 12:06:07 +02:00
reliableDeliverySupervisor = Some ( self ) ) ) . withDeploy ( Deploy . local ) , "endpointWriter" ) )
2013-03-27 17:47:56 +01:00
}
}
/* *
* INTERNAL API
*/
2013-03-20 20:38:49 +13:00
private [ remote ] abstract class EndpointActor (
2012-09-12 11:18:42 +02:00
val localAddress : Address ,
val remoteAddress : Address ,
val transport : Transport ,
2013-01-17 16:19:31 +01:00
val settings : RemoteSettings ,
2013-03-20 20:38:49 +13:00
val codec : AkkaPduCodec ) extends Actor with ActorLogging {
def inbound : Boolean
2013-08-26 15:41:05 +02:00
val eventPublisher = new EventPublisher ( context . system , log , settings . RemoteLifecycleEventsLogLevel )
2013-03-20 20:38:49 +13:00
2013-08-26 15:41:05 +02:00
def publishError ( reason : Throwable , logLevel : Logging . LogLevel ) : Unit =
tryPublish ( AssociationErrorEvent ( reason , localAddress , remoteAddress , inbound , logLevel ) )
def publishDisassociated ( ) : Unit = tryPublish ( DisassociatedEvent ( localAddress , remoteAddress , inbound ) )
private def tryPublish ( ev : AssociationEvent ) : Unit = try
eventPublisher . notifyListeners ( ev )
catch { case NonFatal ( e ) ⇒ log . error ( e , "Unable to publish error event to EventStream." ) }
2013-03-20 20:38:49 +13:00
}
2013-03-27 17:47:56 +01:00
/* *
* INTERNAL API
*/
private [ remote ] object EndpointWriter {
2013-05-30 14:03:35 +02:00
def props (
2013-03-27 17:47:56 +01:00
handleOrActive : Option [ AkkaProtocolHandle ] ,
localAddress : Address ,
remoteAddress : Address ,
2013-11-26 15:25:05 +01:00
refuseUid : Option [ Int ] ,
transport : AkkaProtocolTransport ,
2013-03-27 17:47:56 +01:00
settings : RemoteSettings ,
codec : AkkaPduCodec ,
2013-08-19 15:34:24 +02:00
receiveBuffers : ConcurrentHashMap [ Link , ResendState ] ,
2013-03-27 17:47:56 +01:00
reliableDeliverySupervisor : Option [ ActorRef ] ) : Props =
2013-11-26 15:25:05 +01:00
Props ( classOf [ EndpointWriter ] , handleOrActive , localAddress , remoteAddress , refuseUid , transport , settings , codec ,
2013-05-24 14:01:29 +02:00
receiveBuffers , reliableDeliverySupervisor )
2013-03-27 17:47:56 +01:00
/* *
* This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
* to be overridden by a new inbound association . This is needed to avoid parallel inbound associations from the
* same remote endpoint : when a parallel inbound association is detected , the old one is removed and the new one is
* used instead .
* @param handle Handle of the new inbound association .
*/
2013-08-23 14:39:21 +02:00
case class TakeOver ( handle : AkkaProtocolHandle ) extends NoSerializationVerificationNeeded
2013-11-15 08:59:46 +01:00
case class TookOver ( writer : ActorRef , handle : AkkaProtocolHandle ) extends NoSerializationVerificationNeeded
2013-03-27 17:47:56 +01:00
case object BackoffTimer
case object FlushAndStop
case object AckIdleCheckTimer
2013-06-03 11:35:37 +02:00
case class StopReading ( writer : ActorRef )
case class StoppedReading ( writer : ActorRef )
2013-03-27 17:47:56 +01:00
2013-08-23 14:39:21 +02:00
case class Handle ( handle : AkkaProtocolHandle ) extends NoSerializationVerificationNeeded
2013-03-27 17:47:56 +01:00
case class OutboundAck ( ack : Ack )
sealed trait State
case object Initializing extends State
case object Buffering extends State
case object Writing extends State
case object Handoff extends State
val AckIdleTimerName = "AckIdleTimer"
}
2013-03-20 20:38:49 +13:00
/* *
* INTERNAL API
*/
private [ remote ] class EndpointWriter (
2013-03-27 17:47:56 +01:00
handleOrActive : Option [ AkkaProtocolHandle ] ,
2013-03-20 20:38:49 +13:00
localAddress : Address ,
remoteAddress : Address ,
2013-11-26 15:25:05 +01:00
refuseUid : Option [ Int ] ,
transport : AkkaProtocolTransport ,
2013-03-20 20:38:49 +13:00
settings : RemoteSettings ,
2013-03-27 17:47:56 +01:00
codec : AkkaPduCodec ,
2013-08-19 15:34:24 +02:00
val receiveBuffers : ConcurrentHashMap [ Link , ResendState ] ,
2013-04-26 12:18:01 +02:00
val reliableDeliverySupervisor : Option [ ActorRef ] )
extends EndpointActor ( localAddress , remoteAddress , transport , settings , codec ) with UnboundedStash
with FSM [ EndpointWriter . State , Unit ] {
2012-09-12 11:18:42 +02:00
import EndpointWriter._
import context.dispatcher
val extendedSystem : ExtendedActorSystem = context . system . asInstanceOf [ ExtendedActorSystem ]
2013-05-20 14:35:51 +02:00
val remoteMetrics = RemoteMetricsExtension ( extendedSystem )
2012-11-21 16:39:04 +01:00
2012-12-07 16:03:04 +01:00
var reader : Option [ ActorRef ] = None
2013-03-27 17:47:56 +01:00
var handle : Option [ AkkaProtocolHandle ] = handleOrActive // FIXME: refactor into state data
2012-12-07 16:03:04 +01:00
val readerId = Iterator from 0
2012-09-12 11:18:42 +02:00
2013-03-27 17:47:56 +01:00
def newAckDeadline : Deadline = Deadline . now + settings . SysMsgAckTimeout
var ackDeadline : Deadline = newAckDeadline
var lastAck : Option [ Ack ] = None
2013-08-26 15:41:05 +02:00
override val supervisorStrategy = OneForOneStrategy ( loggingEnabled = false ) {
case NonFatal ( e ) ⇒ publishAndThrow ( e , Logging . ErrorLevel )
}
2012-09-12 11:18:42 +02:00
2013-05-20 14:35:51 +02:00
val provider = RARP ( extendedSystem ) . provider
val msgDispatch = new DefaultMessageDispatcher ( extendedSystem , provider , log )
2012-09-12 11:18:42 +02:00
2013-01-14 10:04:49 +01:00
var inbound = handle . isDefined
2013-06-19 14:09:14 +02:00
var stopReason : DisassociateInfo = AssociationHandle . Unknown
2012-09-12 11:18:42 +02:00
2013-08-26 15:41:05 +02:00
private def publishAndThrow ( reason : Throwable , logLevel : Logging . LogLevel ) : Nothing = {
reason match {
case _ : EndpointDisassociatedException ⇒ publishDisassociated ( )
case _ ⇒ publishError ( reason , logLevel )
}
2012-12-14 13:45:55 +01:00
throw reason
}
2012-09-12 11:18:42 +02:00
2013-07-01 12:51:15 +02:00
private def logAndStay ( reason : Throwable ) : State = {
log . error ( reason , "Transient association error (association remains live)" )
2013-03-20 20:38:49 +13:00
stay ( )
}
2012-09-12 11:18:42 +02:00
override def postRestart ( reason : Throwable ) : Unit = {
2012-12-07 16:03:04 +01:00
handle = None // Wipe out the possibly injected handle
2013-01-14 10:04:49 +01:00
inbound = false
2012-09-12 11:18:42 +02:00
preStart ( )
}
2013-03-27 17:47:56 +01:00
override def preStart ( ) : Unit = {
setTimer ( AckIdleTimerName , AckIdleCheckTimer , settings . SysMsgAckTimeout / 2 , repeat = true )
2012-12-21 18:11:56 +01:00
startWith (
handle match {
case Some ( h ) ⇒
reader = startReadEndpoint ( h )
Writing
case None ⇒
2013-11-26 15:25:05 +01:00
transport . associate ( remoteAddress , refuseUid ) . map ( Handle ( _ ) ) pipeTo self
2012-12-21 18:11:56 +01:00
Initializing
} ,
2013-03-27 17:47:56 +01:00
stateData = ( ) )
}
2012-09-12 11:18:42 +02:00
when ( Initializing ) {
2013-03-27 17:47:56 +01:00
case Event ( Send ( msg , senderOption , recipient , _ ) , _ ) ⇒
2012-09-12 11:18:42 +02:00
stash ( )
2012-12-07 16:03:04 +01:00
stay ( )
2012-12-20 12:54:43 +01:00
case Event ( Status . Failure ( e : InvalidAssociationException ) , _ ) ⇒
2013-08-26 15:41:05 +02:00
publishAndThrow ( new InvalidAssociation ( localAddress , remoteAddress , e ) , Logging . WarningLevel )
2012-12-20 12:54:43 +01:00
case Event ( Status . Failure ( e ) , _ ) ⇒
2013-08-26 15:41:05 +02:00
publishAndThrow ( new EndpointAssociationException ( s" Association failed with [ $remoteAddress ] " , e ) , Logging . DebugLevel )
2013-08-23 14:39:21 +02:00
case Event ( Handle ( inboundHandle ) , _ ) ⇒
2012-12-21 18:11:56 +01:00
// Assert handle == None?
2013-03-27 17:47:56 +01:00
context . parent ! ReliableDeliverySupervisor . GotUid ( inboundHandle . handshakeInfo . uid )
2012-12-07 16:03:04 +01:00
handle = Some ( inboundHandle )
2012-12-21 18:11:56 +01:00
reader = startReadEndpoint ( inboundHandle )
2012-09-12 11:18:42 +02:00
goto ( Writing )
}
when ( Buffering ) {
2013-03-27 17:47:56 +01:00
case Event ( _ : Send , _ ) ⇒
2012-09-12 11:18:42 +02:00
stash ( )
2012-12-07 16:03:04 +01:00
stay ( )
2012-09-12 11:18:42 +02:00
case Event ( BackoffTimer , _ ) ⇒ goto ( Writing )
2013-02-25 12:11:39 +01:00
case Event ( FlushAndStop , _ ) ⇒
stash ( ) // Flushing is postponed after the pending writes
stay ( )
2012-09-12 11:18:42 +02:00
}
when ( Writing ) {
2013-03-27 17:47:56 +01:00
case Event ( s @ Send ( msg , senderOption , recipient , seqOption ) , _ ) ⇒
2012-12-21 18:11:56 +01:00
try {
2012-12-14 13:45:55 +01:00
handle match {
2012-12-21 18:11:56 +01:00
case Some ( h ) ⇒
2013-05-20 14:35:51 +02:00
if ( provider . remoteSettings . LogSend ) {
def msgLog = s" RemoteMessage: [ $msg ] to [ $recipient ]<+[ ${ recipient . path } ] from [ ${ senderOption . getOrElse ( extendedSystem . deadLetters ) } ] "
log . debug ( "sending message {}" , msgLog )
}
2013-03-27 17:47:56 +01:00
val pdu = codec . constructMessage (
recipient . localAddressToUse ,
recipient ,
serializeMessage ( msg ) ,
senderOption ,
seqOption = seqOption ,
ackOption = lastAck )
2013-10-08 09:56:59 +02:00
ackDeadline = newAckDeadline
lastAck = None
2013-04-28 08:32:58 +02:00
val pduSize = pdu . size
remoteMetrics . logPayloadBytes ( msg , pduSize )
if ( pduSize > transport . maximumPayloadBytes ) {
2013-07-01 12:51:15 +02:00
logAndStay ( new OversizedPayloadException ( s" Discarding oversized payload sent to ${ recipient } : max allowed size ${ transport . maximumPayloadBytes } bytes, actual size of encoded ${ msg . getClass } was ${ pdu . size } bytes. " ) )
2013-03-20 20:38:49 +13:00
} else if ( h . write ( pdu ) ) {
stay ( )
} else {
2013-03-27 17:47:56 +01:00
if ( seqOption . isEmpty ) stash ( )
2012-12-21 18:11:56 +01:00
goto ( Buffering )
}
case None ⇒
throw new EndpointException ( "Internal error: Endpoint is in state Writing, but no association handle is present." )
2012-12-14 13:45:55 +01:00
}
2012-12-07 16:03:04 +01:00
} catch {
2013-08-26 15:41:05 +02:00
case e : NotSerializableException ⇒
logAndStay ( e )
case e : EndpointException ⇒
publishAndThrow ( e , Logging . ErrorLevel )
case NonFatal ( e ) ⇒
publishAndThrow ( new EndpointException ( "Failed to write message to the transport" , e ) , Logging . ErrorLevel )
2012-09-12 11:18:42 +02:00
}
2013-02-25 12:11:39 +01:00
2013-03-20 20:38:49 +13:00
// We are in Writing state, so stash is empty, safe to stop here
2013-03-27 17:47:56 +01:00
case Event ( FlushAndStop , _ ) ⇒
// Try to send a last Ack message
trySendPureAck ( )
2013-06-19 14:09:14 +02:00
stopReason = AssociationHandle . Shutdown
2013-03-27 17:47:56 +01:00
stop ( )
case Event ( AckIdleCheckTimer , _ ) if ackDeadline . isOverdue ( ) ⇒
trySendPureAck ( )
stay ( )
2012-09-12 11:18:42 +02:00
}
2013-01-14 10:04:49 +01:00
when ( Handoff ) {
case Event ( Terminated ( _ ) , _ ) ⇒
reader = startReadEndpoint ( handle . get )
unstashAll ( )
goto ( Writing )
2013-08-19 17:50:15 +02:00
case _ ⇒
2013-01-14 10:04:49 +01:00
stash ( )
stay ( )
}
2012-09-12 11:18:42 +02:00
whenUnhandled {
2013-03-27 17:47:56 +01:00
case Event ( Terminated ( r ) , _ ) if r == reader . orNull ⇒
2013-08-26 15:41:05 +02:00
publishAndThrow ( new EndpointDisassociatedException ( "Disassociated" ) , Logging . DebugLevel )
2013-06-03 11:35:37 +02:00
case Event ( s : StopReading , _ ) ⇒
reader match {
case Some ( r ) ⇒ r forward s
case None ⇒ stash ( )
}
stay ( )
2012-11-21 16:39:04 +01:00
case Event ( TakeOver ( newHandle ) , _ ) ⇒
// Shutdown old reader
2012-12-07 16:03:04 +01:00
handle foreach { _ . disassociate ( ) }
handle = Some ( newHandle )
2014-01-16 15:16:35 +01:00
sender ( ) ! TookOver ( self , newHandle )
2013-01-14 10:04:49 +01:00
goto ( Handoff )
2013-02-25 12:11:39 +01:00
case Event ( FlushAndStop , _ ) ⇒
2013-06-19 14:09:14 +02:00
stopReason = AssociationHandle . Shutdown
2013-02-25 12:11:39 +01:00
stop ( )
2013-03-27 17:47:56 +01:00
case Event ( OutboundAck ( ack ) , _ ) ⇒
lastAck = Some ( ack )
stay ( )
case Event ( AckIdleCheckTimer , _ ) ⇒ stay ( ) // Ignore
2012-09-12 11:18:42 +02:00
}
onTransition {
case Initializing -> Writing ⇒
unstashAll ( )
eventPublisher . notifyListeners ( AssociatedEvent ( localAddress , remoteAddress , inbound ) )
2012-12-07 16:03:04 +01:00
case Writing -> Buffering ⇒
setTimer ( "backoff-timer" , BackoffTimer , settings . BackoffPeriod , repeat = false )
2012-09-12 11:18:42 +02:00
case Buffering -> Writing ⇒
unstashAll ( )
cancelTimer ( "backoff-timer" )
}
onTermination {
2012-12-18 13:46:10 +01:00
case StopEvent ( _ , _ , _ ) ⇒
2013-03-27 17:47:56 +01:00
cancelTimer ( AckIdleTimerName )
2012-12-07 16:03:04 +01:00
// It is important to call unstashAll() for the stash to work properly and maintain messages during restart.
// As the FSM trait does not call super.postStop(), this call is needed
2012-11-21 16:39:04 +01:00
unstashAll ( )
2013-06-19 14:09:14 +02:00
handle foreach { _ . disassociate ( stopReason ) }
2012-09-12 11:18:42 +02:00
eventPublisher . notifyListeners ( DisassociatedEvent ( localAddress , remoteAddress , inbound ) )
}
2013-03-27 17:47:56 +01:00
private def trySendPureAck ( ) : Unit = for ( h ← handle ; ack ← lastAck )
2013-10-08 09:56:59 +02:00
if ( h . write ( codec . constructPureAck ( ack ) ) ) {
ackDeadline = newAckDeadline
lastAck = None
}
2013-03-27 17:47:56 +01:00
private def startReadEndpoint ( handle : AkkaProtocolHandle ) : Some [ ActorRef ] = {
2012-12-21 18:11:56 +01:00
val newReader =
2013-03-20 20:38:49 +13:00
context . watch ( context . actorOf (
2013-08-19 12:06:07 +02:00
RARP ( context . system ) . configureDispatcher ( EndpointReader . props ( localAddress , remoteAddress , transport , settings , codec ,
2013-08-19 15:34:24 +02:00
msgDispatch , inbound , handle . handshakeInfo . uid , reliableDeliverySupervisor , receiveBuffers ) ) . withDeploy ( Deploy . local ) ,
2012-12-21 18:11:56 +01:00
"endpointReader-" + AddressUrlEncoder ( remoteAddress ) + "-" + readerId . next ( ) ) )
handle . readHandlerPromise . success ( ActorHandleEventListener ( newReader ) )
Some ( newReader )
2012-09-12 11:18:42 +02:00
}
2013-03-27 17:47:56 +01:00
private def serializeMessage ( msg : Any ) : SerializedMessage = handle match {
2012-12-07 16:03:04 +01:00
case Some ( h ) ⇒
2013-05-20 14:35:51 +02:00
Serialization . currentTransportInformation . withValue ( Serialization . Information ( h . localAddress , extendedSystem ) ) {
2012-12-07 16:03:04 +01:00
( MessageSerializer . serialize ( extendedSystem , msg . asInstanceOf [ AnyRef ] ) )
}
2013-05-30 21:37:57 +02:00
case None ⇒
throw new EndpointException ( "Internal error: No handle was present during serialization of outbound message." )
2012-09-12 11:18:42 +02:00
}
}
2013-03-27 17:47:56 +01:00
/* *
* INTERNAL API
*/
private [ remote ] object EndpointReader {
2013-05-30 14:03:35 +02:00
def props (
2013-03-27 17:47:56 +01:00
localAddress : Address ,
remoteAddress : Address ,
transport : Transport ,
settings : RemoteSettings ,
codec : AkkaPduCodec ,
msgDispatch : InboundMessageDispatcher ,
inbound : Boolean ,
2013-08-19 15:34:24 +02:00
uid : Int ,
2013-03-27 17:47:56 +01:00
reliableDeliverySupervisor : Option [ ActorRef ] ,
2013-08-19 15:34:24 +02:00
receiveBuffers : ConcurrentHashMap [ Link , ResendState ] ) : Props =
2013-03-27 17:47:56 +01:00
Props ( classOf [ EndpointReader ] , localAddress , remoteAddress , transport , settings , codec , msgDispatch , inbound ,
2013-08-19 15:34:24 +02:00
uid , reliableDeliverySupervisor , receiveBuffers )
2013-03-27 17:47:56 +01:00
}
2013-02-08 13:13:52 +01:00
/* *
* INTERNAL API
*/
2012-09-12 11:18:42 +02:00
private [ remote ] class EndpointReader (
2013-03-20 20:38:49 +13:00
localAddress : Address ,
remoteAddress : Address ,
transport : Transport ,
settings : RemoteSettings ,
codec : AkkaPduCodec ,
msgDispatch : InboundMessageDispatcher ,
2013-03-27 17:47:56 +01:00
val inbound : Boolean ,
2013-08-19 15:34:24 +02:00
val uid : Int ,
2013-03-27 17:47:56 +01:00
val reliableDeliverySupervisor : Option [ ActorRef ] ,
2013-08-19 15:34:24 +02:00
val receiveBuffers : ConcurrentHashMap [ Link , ResendState ] ) extends EndpointActor ( localAddress , remoteAddress , transport , settings , codec ) {
2013-03-27 17:47:56 +01:00
2013-06-03 11:35:37 +02:00
import EndpointWriter. { OutboundAck , StopReading , StoppedReading }
2012-09-12 11:18:42 +02:00
2012-12-11 13:08:36 +01:00
val provider = RARP ( context . system ) . provider
2013-03-27 17:47:56 +01:00
var ackedReceiveBuffer = new AckedReceiveBuffer [ Message ]
override def preStart ( ) : Unit = {
receiveBuffers . get ( Link ( localAddress , remoteAddress ) ) match {
case null ⇒
2013-08-19 15:34:24 +02:00
case ResendState ( `uid` , buffer ) ⇒
ackedReceiveBuffer = buffer
2013-03-27 17:47:56 +01:00
deliverAndAck ( )
2013-08-19 15:34:24 +02:00
case _ ⇒
2013-03-27 17:47:56 +01:00
}
}
2013-06-03 11:35:37 +02:00
override def postStop ( ) : Unit = saveState ( )
2013-03-27 17:47:56 +01:00
2013-06-03 11:35:37 +02:00
def saveState ( ) : Unit = {
2013-08-19 15:34:24 +02:00
def merge ( currentState : ResendState , oldState : ResendState ) : ResendState =
if ( currentState . uid == oldState . uid ) ResendState ( uid , oldState . buffer . mergeFrom ( currentState . buffer ) )
else currentState
2013-03-27 17:47:56 +01:00
@tailrec
2013-08-19 15:34:24 +02:00
def updateSavedState ( key : Link , expectedState : ResendState ) : Unit = {
2013-03-27 17:47:56 +01:00
if ( expectedState eq null ) {
2013-08-19 15:34:24 +02:00
if ( receiveBuffers . putIfAbsent ( key , ResendState ( uid , ackedReceiveBuffer ) ) ne null )
updateSavedState ( key , receiveBuffers . get ( key ) )
} else if ( ! receiveBuffers . replace ( key , expectedState , merge ( ResendState ( uid , ackedReceiveBuffer ) , expectedState ) ) )
2013-03-27 17:47:56 +01:00
updateSavedState ( key , receiveBuffers . get ( key ) )
}
val key = Link ( localAddress , remoteAddress )
updateSavedState ( key , receiveBuffers . get ( key ) )
}
2012-09-12 11:18:42 +02:00
override def receive : Receive = {
2013-06-19 14:09:14 +02:00
case Disassociated ( info ) ⇒ handleDisassociated ( info )
2012-09-12 11:18:42 +02:00
2013-03-27 17:47:56 +01:00
case InboundPayload ( p ) if p . size <= transport . maximumPayloadBytes ⇒
val ( ackOption , msgOption ) = tryDecodeMessageAndAck ( p )
for ( ack ← ackOption ; reliableDelivery ← reliableDeliverySupervisor ) reliableDelivery ! ack
msgOption match {
case Some ( msg ) ⇒
if ( msg . reliableDeliveryEnabled ) {
ackedReceiveBuffer = ackedReceiveBuffer . receive ( msg )
deliverAndAck ( )
} else msgDispatch . dispatch ( msg . recipient , msg . recipientAddress , msg . serializedMessage , msg . senderOption )
case None ⇒
2013-03-20 20:38:49 +13:00
}
2013-03-27 17:47:56 +01:00
case InboundPayload ( oversized ) ⇒
2013-07-01 12:51:15 +02:00
log . error ( new OversizedPayloadException ( s" Discarding oversized payload received: " +
s" max allowed size [ ${ transport . maximumPayloadBytes } ] bytes, actual size [ ${ oversized . size } ] bytes. " ) ,
"Transient error while reading from association (association remains live)" )
2013-03-27 17:47:56 +01:00
2013-06-03 11:35:37 +02:00
case StopReading ( writer ) ⇒
saveState ( )
context . become ( notReading )
2014-01-16 15:16:35 +01:00
sender ( ) ! StoppedReading ( writer )
2013-06-03 11:35:37 +02:00
}
def notReading : Receive = {
2013-06-19 14:09:14 +02:00
case Disassociated ( info ) ⇒ handleDisassociated ( info )
2013-06-03 11:35:37 +02:00
2013-06-19 14:09:14 +02:00
case StopReading ( writer ) ⇒
2014-01-16 15:16:35 +01:00
sender ( ) ! StoppedReading ( writer )
2013-06-03 11:35:37 +02:00
case InboundPayload ( p ) ⇒
val ( ackOption , _ ) = tryDecodeMessageAndAck ( p )
for ( ack ← ackOption ; reliableDelivery ← reliableDeliverySupervisor ) reliableDelivery ! ack
case _ ⇒
2013-03-27 17:47:56 +01:00
}
2013-06-19 14:09:14 +02:00
private def handleDisassociated ( info : DisassociateInfo ) : Unit = info match {
case AssociationHandle . Unknown ⇒
context . stop ( self )
case AssociationHandle . Shutdown ⇒
2013-10-09 11:29:02 +02:00
throw ShutDownAssociation (
2013-06-19 14:09:14 +02:00
localAddress ,
remoteAddress ,
InvalidAssociationException ( "The remote system terminated the association because it is shutting down." ) )
case AssociationHandle . Quarantined ⇒
throw InvalidAssociation (
localAddress ,
remoteAddress ,
InvalidAssociationException ( "The remote system has quarantined this system. No further associations " +
"to the remote system are possible until this system is restarted." ) )
}
2013-03-27 17:47:56 +01:00
private def deliverAndAck ( ) : Unit = {
val ( updatedBuffer , deliver , ack ) = ackedReceiveBuffer . extractDeliverable
ackedReceiveBuffer = updatedBuffer
2013-10-08 09:56:59 +02:00
2013-03-27 17:47:56 +01:00
// Notify writer that some messages can be acked
context . parent ! OutboundAck ( ack )
deliver foreach { m ⇒
msgDispatch . dispatch ( m . recipient , m . recipientAddress , m . serializedMessage , m . senderOption )
}
2012-09-12 11:18:42 +02:00
}
2013-03-27 17:47:56 +01:00
private def tryDecodeMessageAndAck ( pdu : ByteString ) : ( Option [ Ack ] , Option [ Message ] ) = try {
2012-11-21 15:58:01 +01:00
codec . decodeMessage ( pdu , provider , localAddress )
2012-09-12 11:18:42 +02:00
} catch {
case NonFatal ( e ) ⇒ throw new EndpointException ( "Error while decoding incoming Akka PDU" , e )
}
}