Added clarifications to driver-adapter stack building method

This commit is contained in:
Endre Sándor Varga 2012-12-14 13:45:55 +01:00
parent eb3a0de01b
commit b886fcd54c
6 changed files with 38 additions and 14 deletions

View file

@ -26,7 +26,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec("""
import RemoteDeploymentDocSpec._ import RemoteDeploymentDocSpec._
val other = ActorSystem("remote", system.settings.config) val other = ActorSystem("remote", system.settings.config)
val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", "host", 1)).get val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("tcp.akka", "s", "host", 1)).get
override def atTermination() { other.shutdown() } override def atTermination() { other.shutdown() }

View file

@ -88,6 +88,9 @@ akka {
# FIXME document # FIXME document
require-cookie = off require-cookie = off
# FIXME document
flush-wait-on-shutdown = 2 s
# FIXME document # FIXME document
shutdown-timeout = 5 s shutdown-timeout = 5 s

View file

@ -135,11 +135,12 @@ private[remote] class EndpointWriter(
def inbound = handle.isDefined def inbound = handle.isDefined
private def publishAndThrow(reason: Throwable): Nothing = private def publishAndThrow(reason: Throwable): Nothing = {
try try
// FIXME: Casting seems very evil here... eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound))
eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound)).asInstanceOf[Nothing] catch { case NonFatal(e) log.error(e, "Unable to publish error event to EventStream.") }
finally throw reason throw reason
}
override def postRestart(reason: Throwable): Unit = { override def postRestart(reason: Throwable): Unit = {
handle = None // Wipe out the possibly injected handle handle = None // Wipe out the possibly injected handle
@ -185,10 +186,12 @@ private[remote] class EndpointWriter(
when(Writing) { when(Writing) {
case Event(Send(msg, senderOption, recipient), _) case Event(Send(msg, senderOption, recipient), _)
val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption) val pdu = codec.constructMessage(recipient.localAddressToUse, recipient, serializeMessage(msg), senderOption)
val success = try handle match { val success = try {
case Some(h) h.write(pdu) handle match {
case None throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" + case Some(h) h.write(pdu)
"handle is present.", null) case None throw new EndpointException("Internal error: Endpoint is in state Writing, but no association" +
"handle is present.", null)
}
} catch { } catch {
case NonFatal(e) publishAndThrow(new EndpointException("Failed to write message to the transport", e)) case NonFatal(e) publishAndThrow(new EndpointException("Failed to write message to the transport", e))
} }
@ -199,7 +202,7 @@ private[remote] class EndpointWriter(
} }
whenUnhandled { whenUnhandled {
case Event(Terminated(r), _) if r == reader publishAndThrow(new EndpointException("Disassociated", null)) case Event(Terminated(r), _) if Some(r) == reader publishAndThrow(new EndpointException("Disassociated", null))
case Event(TakeOver(newHandle), _) case Event(TakeOver(newHandle), _)
// Shutdown old reader // Shutdown old reader
handle foreach { _.disassociate() } handle foreach { _.disassociate() }

View file

@ -27,6 +27,7 @@ object FailureDetector {
* Abstraction of a clock that returns time in milliseconds. Clock can only be used to measure elapsed * Abstraction of a clock that returns time in milliseconds. Clock can only be used to measure elapsed
* time and is not related to any other notion of system or wall-clock time. * time and is not related to any other notion of system or wall-clock time.
*/ */
// Abstract class to be able to extend it from Java
abstract class Clock extends (() Long) abstract class Clock extends (() Long)
implicit val defaultClock = new Clock { implicit val defaultClock = new Clock {

View file

@ -227,9 +227,9 @@ class RemoteActorRefProvider(
def getExternalAddressFor(addr: Address): Option[Address] = { def getExternalAddressFor(addr: Address): Option[Address] = {
addr match { addr match {
case _ if hasAddress(addr) Some(local.rootPath.address) case _ if hasAddress(addr) Some(local.rootPath.address)
case Address("akka", _, Some(_), Some(_)) Some(transport.defaultAddress) case Address(_, _, Some(_), Some(_)) try Some(transport.localAddressForRemote(addr)) catch { case NonFatal(_) None }
case _ None case _ None
} }
} }

View file

@ -463,10 +463,20 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = { private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = {
val transports = for ((fqn, adapters, config) settings.Transports) yield { /*
* Constructs chains of adapters on top of each driver as given in configuration. The resulting structure looks
* like the following:
* AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
*
* The transports variable contains only the heads of each chains (the AkkaProtocolTransport instances).
*/
val transports: Seq[AkkaProtocolTransport] = for ((fqn, adapters, config) settings.Transports) yield {
val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config) val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config)
// Loads the driver -- the bottom element of the chain.
// The chain at this point:
// Driver
val driver = extendedSystem.dynamicAccess val driver = extendedSystem.dynamicAccess
.createInstanceFor[Transport](fqn, args).recover({ .createInstanceFor[Transport](fqn, args).recover({
@ -477,12 +487,19 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
}).get }).get
// Iteratively decorates the bottom level driver with a list of adapters.
// The chain at this point:
// Adapter <- ... <- Adapter <- Driver
val wrappedTransport = val wrappedTransport =
adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) { adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) {
(t: Transport, provider: TransportAdapterProvider) (t: Transport, provider: TransportAdapterProvider)
// The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one
provider(t, context.system.asInstanceOf[ExtendedActorSystem]) provider(t, context.system.asInstanceOf[ExtendedActorSystem])
} }
// Apply AkkaProtocolTransport wrapper to the end of the chain
// The chain at this point:
// AkkaProtocolTransport <- Adapter <- ... <- Adapter <- Driver
new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec) new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec)
} }