Made RemotingEvents sealed and final
This commit is contained in:
parent
0705d47a88
commit
9f006789fc
11 changed files with 152 additions and 158 deletions
|
|
@ -14,18 +14,20 @@ import akka.serialization.Serialization
|
|||
import akka.util.ByteString
|
||||
import java.net.URLEncoder
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.SupervisorStrategy.{ Restart, Stop }
|
||||
|
||||
trait InboundMessageDispatcher {
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
private[remote] trait InboundMessageDispatcher {
|
||||
def dispatch(recipient: InternalActorRef,
|
||||
recipientAddress: Address,
|
||||
serializedMessage: MessageProtocol,
|
||||
senderOption: Option[ActorRef]): Unit
|
||||
}
|
||||
|
||||
class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
|
||||
private val provider: RemoteActorRefProvider,
|
||||
private val log: LoggingAdapter) extends InboundMessageDispatcher {
|
||||
private[remote] class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
|
||||
private val provider: RemoteActorRefProvider,
|
||||
private val log: LoggingAdapter) extends InboundMessageDispatcher {
|
||||
|
||||
private val remoteDaemon = provider.remoteDaemon
|
||||
|
||||
|
|
@ -37,11 +39,11 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
|
|||
import provider.remoteSettings._
|
||||
|
||||
lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage)
|
||||
lazy val payloadClass: Class[_] = if (payload eq null) null else payload.getClass
|
||||
def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
|
||||
val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
|
||||
val originalReceiver = recipient.path
|
||||
|
||||
lazy val msgLog = s"RemoteMessage: [$payload] to [$recipient]<+[$originalReceiver] from [$sender]"
|
||||
def msgLog = s"RemoteMessage: [$payload] to [$recipient]<+[$originalReceiver] from [$sender]"
|
||||
|
||||
recipient match {
|
||||
|
||||
|
|
@ -83,8 +85,17 @@ class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
|
|||
|
||||
}
|
||||
|
||||
object EndpointWriter {
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
private[remote] object EndpointWriter {
|
||||
|
||||
/**
|
||||
* This message signals that the current association maintained by the local EndpointWriter and EndpointReader is
|
||||
* to be overridden by a new inbound association. This is needed to avoid parallel inbound associations from the
|
||||
* same remote endpoint.
|
||||
* @param handle
|
||||
*/
|
||||
case class TakeOver(handle: AssociationHandle)
|
||||
case object BackoffTimer
|
||||
|
||||
|
|
@ -94,8 +105,8 @@ object EndpointWriter {
|
|||
case object Writing extends State
|
||||
}
|
||||
|
||||
class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
|
||||
case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
|
||||
private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
|
||||
private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
|
||||
extends EndpointException("Invalid address: " + remoteAddress, cause)
|
||||
|
||||
private[remote] class EndpointWriter(
|
||||
|
|
@ -112,37 +123,34 @@ private[remote] class EndpointWriter(
|
|||
val extendedSystem: ExtendedActorSystem = context.system.asInstanceOf[ExtendedActorSystem]
|
||||
val eventPublisher = new EventPublisher(context.system, log, settings.LogLifecycleEvents)
|
||||
|
||||
var reader: ActorRef = null
|
||||
var handle: AssociationHandle = handleOrActive.getOrElse(null)
|
||||
var inbound = false
|
||||
var readerId = 0
|
||||
var reader: Option[ActorRef] = None
|
||||
var handle: Option[AssociationHandle] = handleOrActive // FIXME: refactor into state data
|
||||
val readerId = Iterator from 0
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) }
|
||||
|
||||
val msgDispatch =
|
||||
new DefaultMessageDispatcher(extendedSystem, extendedSystem.provider.asInstanceOf[RemoteActorRefProvider], log)
|
||||
|
||||
private def publishAndThrow(reason: Throwable): Nothing = {
|
||||
eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound))
|
||||
throw reason
|
||||
}
|
||||
def inbound = handle.isDefined
|
||||
|
||||
private def publishAndThrow(message: String, cause: Throwable): Nothing =
|
||||
publishAndThrow(new EndpointException(message, cause))
|
||||
private def publishAndThrow(reason: Throwable): Nothing =
|
||||
try
|
||||
// FIXME: Casting seems very evil here...
|
||||
eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound)).asInstanceOf[Nothing]
|
||||
finally throw reason
|
||||
|
||||
override def postRestart(reason: Throwable): Unit = {
|
||||
handle = null // Wipe out the possibly injected handle
|
||||
handle = None // Wipe out the possibly injected handle
|
||||
preStart()
|
||||
}
|
||||
|
||||
override def preStart(): Unit = {
|
||||
if (handle eq null) {
|
||||
if (!inbound) {
|
||||
transport.associate(remoteAddress) pipeTo self
|
||||
inbound = false
|
||||
startWith(Initializing, ())
|
||||
} else {
|
||||
startReadEndpoint()
|
||||
inbound = true
|
||||
startWith(Writing, ())
|
||||
}
|
||||
}
|
||||
|
|
@ -150,15 +158,16 @@ private[remote] class EndpointWriter(
|
|||
when(Initializing) {
|
||||
case Event(Send(msg, senderOption, recipient), _) ⇒
|
||||
stash()
|
||||
stay
|
||||
stay()
|
||||
case Event(Transport.Invalid(e), _) ⇒
|
||||
log.error(e, "Tried to associate with invalid remote address [{}]. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
||||
|
||||
case Event(Transport.Fail(e), _) ⇒ publishAndThrow(s"Association failed with [$remoteAddress]", e)
|
||||
case Event(Transport.Fail(e), _) ⇒
|
||||
publishAndThrow(new EndpointException(s"Association failed with [$remoteAddress]", e))
|
||||
case Event(Transport.Ready(inboundHandle), _) ⇒
|
||||
handle = inboundHandle
|
||||
handle = Some(inboundHandle)
|
||||
startReadEndpoint()
|
||||
goto(Writing)
|
||||
|
||||
|
|
@ -167,7 +176,7 @@ private[remote] class EndpointWriter(
|
|||
when(Buffering) {
|
||||
case Event(Send(msg, senderOption, recipient), _) ⇒
|
||||
stash()
|
||||
stay
|
||||
stay()
|
||||
|
||||
case Event(BackoffTimer, _) ⇒ goto(Writing)
|
||||
}
|
||||
|
|
@ -175,26 +184,31 @@ private[remote] class EndpointWriter(
|
|||
when(Writing) {
|
||||
case Event(Send(msg, senderOption, recipient), _) ⇒
|
||||
val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption)
|
||||
val success = try handle.write(pdu) catch {
|
||||
case NonFatal(e) ⇒ publishAndThrow("Failed to write message to the transport", e)
|
||||
val success = try handle match {
|
||||
case Some(h) ⇒ h.write(pdu)
|
||||
case None ⇒ throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" +
|
||||
"handle is present.", null)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e))
|
||||
}
|
||||
if (success) stay else {
|
||||
if (success) stay() else {
|
||||
stash()
|
||||
goto(Buffering)
|
||||
}
|
||||
}
|
||||
|
||||
whenUnhandled {
|
||||
case Event(Terminated(r), _) if r == reader ⇒ publishAndThrow("Disassociated", null)
|
||||
case Event(Terminated(r), _) if r == reader ⇒ publishAndThrow(new EndpointException("Disassociated", null))
|
||||
case Event(TakeOver(newHandle), _) ⇒
|
||||
// Shutdown old reader
|
||||
if (handle ne null) handle.disassociate()
|
||||
if (reader ne null) {
|
||||
context.unwatch(reader)
|
||||
context.stop(reader)
|
||||
handle foreach { _.disassociate() }
|
||||
reader match {
|
||||
case Some(r) ⇒
|
||||
context.unwatch(r)
|
||||
context.stop(r)
|
||||
case None ⇒
|
||||
}
|
||||
handle = newHandle
|
||||
inbound = true
|
||||
handle = Some(newHandle)
|
||||
startReadEndpoint()
|
||||
unstashAll()
|
||||
goto(Writing)
|
||||
|
|
@ -204,7 +218,8 @@ private[remote] class EndpointWriter(
|
|||
case Initializing -> Writing ⇒
|
||||
unstashAll()
|
||||
eventPublisher.notifyListeners(AssociatedEvent(localAddress, remoteAddress, inbound))
|
||||
case Writing -> Buffering ⇒ setTimer("backoff-timer", BackoffTimer, settings.BackoffPeriod, false)
|
||||
case Writing -> Buffering ⇒
|
||||
setTimer("backoff-timer", BackoffTimer, settings.BackoffPeriod, repeat = false)
|
||||
case Buffering -> Writing ⇒
|
||||
unstashAll()
|
||||
cancelTimer("backoff-timer")
|
||||
|
|
@ -212,24 +227,31 @@ private[remote] class EndpointWriter(
|
|||
|
||||
onTermination {
|
||||
case StopEvent(_, _, _) ⇒ if (handle ne null) {
|
||||
// FIXME: Add a test case for this
|
||||
// It is important to call unstashAll() for the stash to work properly and maintain messages during restart.
|
||||
// As the FSM trait does not call super.postStop(), this call is needed
|
||||
unstashAll()
|
||||
handle.disassociate()
|
||||
handle foreach { _.disassociate() }
|
||||
eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound))
|
||||
}
|
||||
}
|
||||
|
||||
private def startReadEndpoint(): Unit = {
|
||||
reader = context.actorOf(Props(new EndpointReader(codec, handle.localAddress, msgDispatch)),
|
||||
"endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8"))
|
||||
readerId += 1
|
||||
handle.readHandlerPromise.success(reader)
|
||||
context.watch(reader)
|
||||
private def startReadEndpoint(): Unit = handle match {
|
||||
case Some(h) ⇒
|
||||
reader = Some(context.watch(context.actorOf(Props(new EndpointReader(codec, h.localAddress, msgDispatch)),
|
||||
"endpointReader-" + URLEncoder.encode(remoteAddress.toString, "utf-8") + "-" + readerId.next())))
|
||||
h.readHandlerPromise.success(reader.get)
|
||||
case None ⇒ throw new EndpointException("Internal error: No handle was present during creation of the endpoint" +
|
||||
"reader.", null)
|
||||
}
|
||||
|
||||
private def serializeMessage(msg: Any): MessageProtocol = {
|
||||
Serialization.currentTransportAddress.withValue(handle.localAddress) {
|
||||
(MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
|
||||
}
|
||||
private def serializeMessage(msg: Any): MessageProtocol = handle match {
|
||||
case Some(h) ⇒
|
||||
Serialization.currentTransportAddress.withValue(h.localAddress) {
|
||||
(MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
|
||||
}
|
||||
case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of" +
|
||||
"outbound message.", null)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue