Added defaultAddress() to RemoteTransport API

This commit is contained in:
Endre Sándor Varga 2012-11-21 14:18:24 +01:00
parent b6bdb34e44
commit 682abccf7e
12 changed files with 40 additions and 28 deletions

View file

@ -61,7 +61,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
import settings._
val selfAddress: Address = system.provider match {
case c: ClusterActorRefProvider c.transport.addresses.head // FIXME: temporary workaround. See #2663
case c: ClusterActorRefProvider c.transport.defaultAddress
case other throw new ConfigurationException(
"ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]".
format(system, other.getClass.getName))

View file

@ -39,7 +39,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
import ClusterSpec._
// FIXME: temporary workaround. See #2663
val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.addresses.head
val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.defaultAddress
val cluster = Cluster(system)
def clusterView = cluster.readView

View file

@ -140,7 +140,7 @@ public class SerializationDocTestBase {
public Address getAddress() {
final ActorRefProvider provider = system.provider();
if (provider instanceof RemoteActorRefProvider) {
return ((RemoteActorRefProvider) provider).transport().addresses().head();
return ((RemoteActorRefProvider) provider).transport().defaultAddress();
} else {
throw new UnsupportedOperationException("need RemoteActorRefProvider");
}

View file

@ -216,7 +216,7 @@ package docs.serialization {
object ExternalAddress extends ExtensionKey[ExternalAddressExt]
class ExternalAddressExt(system: ExtendedActorSystem) extends Extension {
def addressForAkka: Address = akka.transportOf(system).addresses.head
def addressForAkka: Address = akka.transportOf(system).defaultAddress
}
def serializeAkkaDefault(ref: ActorRef): String =

View file

@ -72,7 +72,7 @@ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with C
/**
* Transport address of this Netty-like remote transport.
*/
val address = transport.addresses.head //FIXME: Workaround for old-remoting -- must be removed later
val address = transport.defaultAddress
/**
* INTERNAL API.

View file

@ -410,7 +410,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
// useful to see which jvm is running which role, used by LogRoleReplace utility
log.info("Role [{}] started with address [{}]", myself.name,
//FIXME: Workaround for old-remoting -- must be removed later
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head)
system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress)
}

View file

@ -84,10 +84,10 @@ class RemoteActorRefProvider(
// this enables reception of remote requests
_transport.start()
//FIXME defaultaddress maybe?
_rootPath = RootActorPath(local.rootPath.address.copy(
host = transport.addresses.head.host,
port = transport.addresses.head.port))
protocol = transport.defaultAddress.protocol,
host = transport.defaultAddress.host,
port = transport.defaultAddress.port))
val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor {
def receive = {

View file

@ -179,6 +179,12 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
*/
def addresses: immutable.Set[Address]
/**
* The default transport address of the actorsystem
* @return The listen address of the default transport
*/
def defaultAddress: Address
/**
* Resolves the correct local address to be used for contacting the given remote address
* @param remote the remote address
@ -245,7 +251,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol =
ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(addresses.head)).build
ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(defaultAddress)).build
/**
* Returns a new RemoteMessageProtocol containing the serialized representation of the given parameters.
@ -254,7 +260,7 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re
val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient))
if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
Serialization.currentTransportAddress.withValue(addresses.head) {
Serialization.currentTransportAddress.withValue(defaultAddress) {
messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef]))
}

View file

@ -17,6 +17,8 @@ import scala.util.control.NonFatal
import java.net.URLEncoder
import java.util.concurrent.TimeoutException
import scala.util.{ Failure, Success }
import scala.collection.immutable
import akka.japi.Util.immutableSeq
class RemotingSettings(config: Config) {
@ -40,10 +42,10 @@ class RemotingSettings(config: Config) {
val BackoffPeriod: FiniteDuration =
Duration(getMilliseconds("akka.remoting.backoff-interval"), MILLISECONDS)
val Transports: List[(String, Config)] =
config.getConfigList("akka.remoting.transports").asScala.map {
val Transports: immutable.Seq[(String, Config)] =
immutableSeq(config.getConfigList("akka.remoting.transports")).map {
conf (conf.getString("transport-class"), conf.getConfig("settings"))
}.toList
}
}
private[remote] object Remoting {
@ -82,8 +84,11 @@ private[remote] object Remoting {
private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
@volatile private var endpointManager: ActorRef = _
@volatile var transportMapping: Map[String, Set[(Transport, Address)]] = _
@volatile private var transportMapping: Map[String, Set[(Transport, Address)]] = _
@volatile var addresses: Set[Address] = _
// FIXME: Temporary workaround until next Pull Request as the means of configuration changed
override def defaultAddress: Address = addresses.head
private val settings = new RemotingSettings(provider.remoteSettings.config)
override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote)
@ -320,14 +325,6 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
listens.onComplete {
case Success(results)
val transportsAndAddresses = (for ((transport, (address, promise)) results) yield {
promise.success(self)
transport -> address
}).toSet
addressesPromise.success(transportsAndAddresses)
context.become(accepting)
transportMapping = HashMap() ++ results.groupBy { case (_, (transportAddress, _)) transportAddress }.map {
case (a, t)
if (t.size > 1)
@ -336,6 +333,14 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
a -> t.head._1
}
val transportsAndAddresses = (for ((transport, (address, promise)) results) yield {
promise.success(self)
transport -> address
}).toSet
addressesPromise.success(transportsAndAddresses)
context.become(accepting)
case Failure(reason) addressesPromise.failure(reason)
}
}

View file

@ -40,7 +40,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)
// Workaround to emulate the support of multiple local addresses
override def localAddressForRemote(remote: Address): Address = addresses.head
override def localAddressForRemote(remote: Address): Address = defaultAddress
// TODO replace by system.scheduler
val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory)
@ -164,7 +164,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
* the normal one, e.g. for inserting security hooks. Get this transports
* address from `this.address`.
*/
protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, addresses.head)
protected def createClient(recipient: Address): RemoteClient = new ActiveRemoteClient(this, recipient, defaultAddress)
// the address is set in start() or from the RemoteServerHandler, whichever comes first
private val _address = new AtomicReference[Address]
@ -177,8 +177,9 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
}
// Workaround to emulate the support of multiple local addresses
def addresses = Set(address)
override def addresses = Set(address)
def address = _address.get
override def defaultAddress: Address = _address.get
lazy val log = Logging(system.eventStream, "NettyRemoteTransport(" + addresses + ")")

View file

@ -135,7 +135,7 @@ abstract class Ticket1978CommunicationSpec(val cipherConfig: CipherConfig) exten
("-") must {
if (cipherConfig.runTest) {
val ignoreMe = other.actorOf(Props(new Actor { def receive = { case ("ping", x) sender ! ((("pong", x), sender)) } }), "echo")
val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head
val otherAddress = other.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress
"support tell" in {
val here = system.actorFor(otherAddress.toString + "/user/echo")

View file

@ -34,7 +34,7 @@ akka.loglevel = DEBUG
akka.actor.provider = akka.remote.RemoteActorRefProvider
akka.remote.netty.port = 0
"""))
val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.addresses.head
val addr = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.defaultAddress
val target1 = other.actorFor(RootActorPath(addr) / "remote")
val target2 = other.actorFor(RootActorPath(addr) / testActor.path.elements)