diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index dbe31e94dd..b82c1a4d88 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -47,7 +47,8 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor recipient match { case `remoteDaemon` ⇒ - if (UntrustedMode) log.debug("dropping daemon message in untrusted mode") else { + if (UntrustedMode) log.debug("dropping daemon message in untrusted mode") + else { if (LogReceive) log.debug("received daemon message {}", msgLog) payload match { case m @ (_: DaemonMsg | _: Terminated) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index c5d79a9df7..8524d2f524 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -4,7 +4,7 @@ import scala.language.postfixOps import akka.actor.SupervisorStrategy._ import akka.actor._ import akka.event.{ Logging, LoggingAdapter } -import akka.pattern.gracefulStop +import akka.pattern.{ gracefulStop, pipe } import akka.remote.EndpointManager.{ StartupFinished, ManagementCommand, Listen, Send } import akka.remote.transport.Transport.{ AssociationEventListener, InboundAssociation } import akka.remote.transport._ @@ -125,8 +125,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc // This is effectively a write-once variable similar to a lazy val. The reason for not using a lazy val is exception // handling. @volatile var addresses: Set[Address] = _ - // FIXME: Temporary workaround until next Pull Request as the means of configuration changed - override def defaultAddress: Address = addresses.head + // This variable has the same semantics as the addresses variable, in the sense it is written once, and emulates + // a lazy val + @volatile var defaultAddress: Address = _ private val settings = new RemotingSettings(provider.remoteSettings.config) @@ -170,14 +171,17 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc implicit val timeout = new Timeout(settings.StartupTimeout) try { - val addressesPromise: Promise[Set[(Transport, Address)]] = Promise() + val addressesPromise: Promise[Seq[(Transport, Address)]] = Promise() manager ! Listen(addressesPromise) - val transports: Set[(Transport, Address)] = Await.result(addressesPromise.future, timeout.duration) + val transports: Seq[(Transport, Address)] = Await.result(addressesPromise.future, timeout.duration) + if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null) + transportMapping = transports.groupBy { case (transport, _) ⇒ transport.schemeIdentifier }.mapValues { _.toSet } + defaultAddress = transports.head._2 addresses = transports.map { _._2 }.toSet manager ! StartupFinished @@ -228,7 +232,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc private[remote] object EndpointManager { sealed trait RemotingCommand - case class Listen(addressesPromise: Promise[Set[(Transport, Address)]]) extends RemotingCommand + case class Listen(addressesPromise: Promise[Seq[(Transport, Address)]]) extends RemotingCommand case object StartupFinished extends RemotingCommand case class Send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef) extends RemotingCommand { @@ -237,6 +241,9 @@ private[remote] object EndpointManager { case class ManagementCommand(cmd: Any, statusPromise: Promise[Boolean]) extends RemotingCommand + case class ListensResult(addressesPromise: Promise[Seq[(Transport, Address)]], + results: Seq[(Transport, Address, Promise[AssociationEventListener])]) + sealed trait EndpointPolicy case class Pass(endpoint: ActorRef) extends EndpointPolicy case class Gated(timeOfFailure: Long) extends EndpointPolicy @@ -283,7 +290,6 @@ private[remote] object EndpointManager { endpoint } - // FIXME: Temporary hack to verify the bug def isPassive(endpoint: ActorRef): Boolean = addressToPassive.contains(endpointToAddress(endpoint)) def markFailed(endpoint: ActorRef, timeOfFailure: Long): Unit = { @@ -351,12 +357,23 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends def receive = { case Listen(addressesPromise) ⇒ + listens map { ListensResult(addressesPromise, _) } pipeTo self - try initializeTransports(addressesPromise) catch { - case NonFatal(e) ⇒ - addressesPromise.failure(e) - context.stop(self) + case ListensResult(addressesPromise, results) ⇒ + transportMapping = results.groupBy { + case (_, transportAddress, _) ⇒ transportAddress + } map { + case (a, t) if t.size > 1 ⇒ + throw new RemoteTransportException(s"There are more than one transports listening on local address [$a]", null) + case (a, t) ⇒ a -> t.head._1 } + // Register to each transport as listener and collect mapping to addresses + val transportsAndAddresses = results map { + case (transport, address, promise) ⇒ + promise.success(self) + transport -> address + } + addressesPromise.success(transportsAndAddresses) case ManagementCommand(_, statusPromise) ⇒ statusPromise.success(false) @@ -415,7 +432,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Prune ⇒ endpoints.prune(settings.RetryGateClosedFor) } - private def initializeTransports(addressesPromise: Promise[Set[(Transport, Address)]]): Unit = { + private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = { val transports = for ((fqn, adapters, config) ← settings.Transports) yield { val args = Seq(classOf[ExtendedActorSystem] -> context.system, classOf[Config] -> config) @@ -439,28 +456,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends new AkkaProtocolTransport(wrappedTransport, context.system, new AkkaProtocolSettings(conf), AkkaPduProtobufCodec) } - val listens: Future[Seq[(Transport, (Address, Promise[AssociationEventListener]))]] = Future.sequence( - transports.map { transport ⇒ transport.listen map (transport -> _) }) - - listens.onComplete { - case Success(results) ⇒ - transportMapping = results.groupBy { - case (_, (transportAddress, _)) ⇒ transportAddress - } map { - case (a, t) if t.size > 1 ⇒ - // FIXME: Throwing on the wrong thread - throw new RemoteTransportException(s"There are more than one transports listening on local address [$a]", null) - case (a, t) ⇒ a -> t.head._1 - } - - val transportsAndAddresses = (for ((transport, (address, promise)) ← results) yield { - promise.success(self) - transport -> address - }).toSet - addressesPromise.success(transportsAndAddresses) - - case Failure(reason) ⇒ addressesPromise.failure(reason) - } + // Collect all transports, listen addresses and listener promises in one future + Future.sequence(transports.map { transport ⇒ + transport.listen map { case (address, listenerPromise) ⇒ (transport, address, listenerPromise) } + }) } private def createEndpoint(remoteAddress: Address,