diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c6e581b033..cfe1f0ddab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -61,7 +61,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { import settings._ val selfAddress: Address = system.provider match { - case c: ClusterActorRefProvider ⇒ c.transport.addresses.head // FIXME: temporary workaround. See #2663 + case c: ClusterActorRefProvider ⇒ c.transport.defaultAddress case other ⇒ throw new ConfigurationException( "ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]". format(system, other.getClass.getName)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 0cb564a69b..d010c823e8 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -39,7 +39,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { import ClusterSpec._ // FIXME: temporary workaround. See #2663 - val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.addresses.head + val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.defaultAddress val cluster = Cluster(system) def clusterView = cluster.readView diff --git a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java index 48c0d2fc62..78c85f3e9e 100644 --- a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java @@ -140,7 +140,7 @@ public class SerializationDocTestBase { public Address getAddress() { final ActorRefProvider provider = system.provider(); if (provider instanceof RemoteActorRefProvider) { - return ((RemoteActorRefProvider) provider).transport().addresses().head(); + return ((RemoteActorRefProvider) provider).transport().defaultAddress(); } else { throw new UnsupportedOperationException("need RemoteActorRefProvider"); } diff --git a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala index af49e48999..e428497039 100644 --- a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala @@ -216,7 +216,7 @@ package docs.serialization { object ExternalAddress extends ExtensionKey[ExternalAddressExt] class ExternalAddressExt(system: ExtendedActorSystem) extends Extension { - def addressForAkka: Address = akka.transportOf(system).addresses.head + def addressForAkka: Address = akka.transportOf(system).defaultAddress } def serializeAkkaDefault(ref: ActorRef): String = diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala index e407ecca13..479bc38b1d 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala @@ -72,7 +72,7 @@ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with C /** * Transport address of this Netty-like remote transport. */ - val address = transport.addresses.head //FIXME: Workaround for old-remoting -- must be removed later + val address = transport.defaultAddress /** * INTERNAL API. diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 3754406f83..645a96be89 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -410,7 +410,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: // useful to see which jvm is running which role, used by LogRoleReplace utility log.info("Role [{}] started with address [{}]", myself.name, //FIXME: Workaround for old-remoting -- must be removed later - system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head) + system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 399f24b2e6..df100249c7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -84,10 +84,10 @@ class RemoteActorRefProvider( // this enables reception of remote requests _transport.start() - //FIXME defaultaddress maybe? _rootPath = RootActorPath(local.rootPath.address.copy( - host = transport.addresses.head.host, - port = transport.addresses.head.port)) + protocol = transport.defaultAddress.protocol, + host = transport.defaultAddress.host, + port = transport.defaultAddress.port)) val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { def receive = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index ecd56cfbb3..d18abf7126 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -179,6 +179,12 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re */ def addresses: immutable.Set[Address] + /** + * The default transport address of the actorsystem + * @return The listen address of the default transport + */ + def defaultAddress: Address + /** * Resolves the correct local address to be used for contacting the given remote address * @param remote the remote address @@ -245,7 +251,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = - ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(addresses.head)).build + ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(defaultAddress)).build /** * Returns a new RemoteMessageProtocol containing the serialized representation of the given parameters. @@ -254,7 +260,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient)) if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) - Serialization.currentTransportAddress.withValue(addresses.head) { + Serialization.currentTransportAddress.withValue(defaultAddress) { messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 22a6cdd0b0..6858a8188f 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -17,6 +17,8 @@ import scala.util.control.NonFatal import java.net.URLEncoder import java.util.concurrent.TimeoutException import scala.util.{ Failure, Success } +import scala.collection.immutable +import akka.japi.Util.immutableSeq class RemotingSettings(config: Config) { @@ -40,10 +42,10 @@ class RemotingSettings(config: Config) { val BackoffPeriod: FiniteDuration = Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS) - val Transports: List[(String, Config)] = - config.getConfigList("akka.remoting.transports").asScala.map { + val Transports: immutable.Seq[(String, Config)] = + immutableSeq(config.getConfigList("akka.remoting.transports")).map { conf ⇒ (conf.getString("transport-class"), conf.getConfig("settings")) - }.toList + } } private[remote] object Remoting { @@ -82,8 +84,11 @@ private[remote] object Remoting { private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { @volatile private var endpointManager: ActorRef = _ - @volatile var transportMapping: Map[String, Set[(Transport, Address)]] = _ + @volatile private var transportMapping: Map[String, Set[(Transport, Address)]] = _ @volatile var addresses: Set[Address] = _ + // FIXME: Temporary workaround until next Pull Request as the means of configuration changed + override def defaultAddress: Address = addresses.head + private val settings = new RemotingSettings(provider.remoteSettings.config) override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote) @@ -320,14 +325,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends listens.onComplete { case Success(results) ⇒ - val transportsAndAddresses = (for ((transport, (address, promise)) ← results) yield { - promise.success(self) - transport -> address - }).toSet - addressesPromise.success(transportsAndAddresses) - - context.become(accepting) - transportMapping = HashMap() ++ results.groupBy { case (_, (transportAddress, _)) ⇒ transportAddress }.map { case (a, t) ⇒ if (t.size > 1) @@ -336,6 +333,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends a -> t.head._1 } + val transportsAndAddresses = (for ((transport, (address, promise)) ← results) yield { + promise.success(self) + transport -> address + }).toSet + addressesPromise.success(transportsAndAddresses) + + context.become(accepting) + case Failure(reason) ⇒ addressesPromise.failure(reason) } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index fa28dd9f4c..5fdaa23ba2 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -40,7 +40,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) // Workaround to emulate the support of multiple local addresses - override def localAddressForRemote(remote: Address): Address = addresses.head + override def localAddressForRemote(remote: Address): Address = defaultAddress // TODO replace by system.scheduler val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory) @@ -164,7 +164,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider * the normal one, e.g. for inserting security hooks. Get this transport’s * address from `this.address`. */ - protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, addresses.head) + protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, defaultAddress) // the address is set in start() or from the RemoteServerHandler, whichever comes first private val _address = new AtomicReference[Address] @@ -177,8 +177,9 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider } // Workaround to emulate the support of multiple local addresses - def addresses = Set(address) + override def addresses = Set(address) def address = _address.get + override def defaultAddress: Address = _address.get lazy val log = Logging(system.eventStream, "NettyRemoteTransport(" + addresses + ")") diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 8b4fc8a444..ba9918d120 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -135,7 +135,7 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten ("-") must { if (cipherConfig.runTest) { val ignoreMe = other.actorOf(Props(new Actor { def receive = { case ("ping", x) ⇒ sender ! ((("pong", x), sender)) } }), "echo") - val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head + val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress "support tell" in { val here = system.actorFor(otherAddress.toString + "/user/echo") diff --git a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala index e2f9f1e6ee..bddfd6d39b 100644 --- a/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/UntrustedSpec.scala @@ -34,7 +34,7 @@ akka.loglevel = DEBUG akka.actor.provider = akka.remote.RemoteActorRefProvider akka.remote.netty.port = 0 """)) - val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head + val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress val target1 = other.actorFor(RootActorPath(addr) / "remote") val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements)