From b886fcd54c0f4fd9cb3fb1a995a07abcd7f5f49d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 14 Dec 2012 13:45:55 +0100 Subject: [PATCH] Added clarifications to driver-adapter stack building method --- .../remoting/RemoteDeploymentDocSpec.scala | 2 +- akka-remote/src/main/resources/reference.conf | 3 +++ .../src/main/scala/akka/remote/Endpoint.scala | 21 +++++++++++-------- .../scala/akka/remote/FailureDetector.scala | 1 + .../akka/remote/RemoteActorRefProvider.scala | 6 +++--- .../src/main/scala/akka/remote/Remoting.scala | 19 ++++++++++++++++- 6 files changed, 38 insertions(+), 14 deletions(-) diff --git a/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala b/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala index c9da680844..0816d262a6 100644 --- a/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/remoting/RemoteDeploymentDocSpec.scala @@ -26,7 +26,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec(""" import RemoteDeploymentDocSpec._ val other = ActorSystem("remote", system.settings.config) - val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", "host", 1)).get + val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("tcp.akka", "s", "host", 1)).get override def atTermination() { other.shutdown() } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 9c41c32f2f..65027d4e42 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -88,6 +88,9 @@ akka { # FIXME document require-cookie = off + # FIXME document + flush-wait-on-shutdown = 2 s + # FIXME document shutdown-timeout = 5 s diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index b82c1a4d88..3bd3514e01 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -135,11 +135,12 @@ private[remote] class EndpointWriter( def inbound = handle.isDefined - private def publishAndThrow(reason: Throwable): Nothing = + private def publishAndThrow(reason: Throwable): Nothing = { try - // FIXME: Casting seems very evil here... - eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound)).asInstanceOf[Nothing] - finally throw reason + eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound)) + catch { case NonFatal(e) ⇒ log.error(e, "Unable to publish error event to EventStream.") } + throw reason + } override def postRestart(reason: Throwable): Unit = { handle = None // Wipe out the possibly injected handle @@ -185,10 +186,12 @@ private[remote] class EndpointWriter( when(Writing) { case Event(Send(msg, senderOption, recipient), _) ⇒ val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption) - val success = try handle match { - case Some(h) ⇒ h.write(pdu) - case None ⇒ throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" + - "handle is present.", null) + val success = try { + handle match { + case Some(h) ⇒ h.write(pdu) + case None ⇒ throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" + + "handle is present.", null) + } } catch { case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e)) } @@ -199,7 +202,7 @@ private[remote] class EndpointWriter( } whenUnhandled { - case Event(Terminated(r), _) if r == reader ⇒ publishAndThrow(new EndpointException("Disassociated", null)) + case Event(Terminated(r), _) if Some(r) == reader ⇒ publishAndThrow(new EndpointException("Disassociated", null)) case Event(TakeOver(newHandle), _) ⇒ // Shutdown old reader handle foreach { _.disassociate() } diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala index b5deea4b53..d3562b2afd 100644 --- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala @@ -27,6 +27,7 @@ object FailureDetector { * Abstraction of a clock that returns time in milliseconds. Clock can only be used to measure elapsed * time and is not related to any other notion of system or wall-clock time. */ + // Abstract class to be able to extend it from Java abstract class Clock extends (() ⇒ Long) implicit val defaultClock = new Clock { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5b2177f290..944641403d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -227,9 +227,9 @@ class RemoteActorRefProvider( def getExternalAddressFor(addr: Address): Option[Address] = { addr match { - case _ if hasAddress(addr) ⇒ Some(local.rootPath.address) - case Address("akka", _, Some(_), Some(_)) ⇒ Some(transport.defaultAddress) - case _ ⇒ None + case _ if hasAddress(addr) ⇒ Some(local.rootPath.address) + case Address(_, _, Some(_), Some(_)) ⇒ try Some(transport.localAddressForRemote(addr)) catch { case NonFatal(_) ⇒ None } + case _ ⇒ None } } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 2ddc141a46..3e86b9b138 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -463,10 +463,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends } private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = { - val transports = for ((fqn, adapters, config) ← settings.Transports) yield { + /* + * Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks + * like the following: + * AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver + * + * The transports variable contains only the heads of each chains (the AkkaProtocolTransport instances). + */ + val transports: Seq[AkkaProtocolTransport] = for ((fqn, adapters, config) ← settings.Transports) yield { val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config) + // Loads the driver -- the bottom element of the chain. + // The chain at this point: + // Driver val driver = extendedSystem.dynamicAccess .createInstanceFor[Transport](fqn, args).recover({ @@ -477,12 +487,19 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends }).get + // Iteratively decorates the bottom level driver with a list of adapters. + // The chain at this point: + // Adapter <- ... <- Adapter <- Driver val wrappedTransport = adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) { (t: Transport, provider: TransportAdapterProvider) ⇒ + // The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one provider(t, context.system.asInstanceOf[ExtendedActorSystem]) } + // Apply AkkaProtocolTransport wrapper to the end of the chain + // The chain at this point: + // AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec) }