diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 94ec966468..2bc8e77885 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -253,7 +253,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe } private[akka] case object Nobody extends MinimalActorRef { - val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody") + val path = new RootActorPath(Address("akka", "all-systems"), "/Nobody") } /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 7bc9f502d1..a41f4572ff 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -280,7 +280,7 @@ class LocalActorRefProvider( eventStream, scheduler, deadLetters, - new RootActorPath(LocalAddress(_systemName)), + new RootActorPath(Address("akka", _systemName)), new Deployer(settings)) val log = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")") @@ -443,7 +443,7 @@ class LocalActorRefProvider( deadLetters } else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail) else actorFor(ref, elems) - case LocalActorPath(address, elems) if address == rootPath.address ⇒ actorFor(rootGuardian, elems) + case ActorPathExtractor(address, elems) if address == rootPath.address ⇒ actorFor(rootGuardian, elems) case _ ⇒ log.debug("look-up of unknown path '{}' failed", path) deadLetters diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index a84d080536..db807df939 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -341,20 +341,19 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } val provider: ActorRefProvider = { - val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { - case Left(e) ⇒ throw e - case Right(b) ⇒ b - } val arguments = Seq( classOf[String] -> name, classOf[Settings] -> settings, classOf[EventStream] -> eventStream, classOf[Scheduler] -> scheduler, classOf[InternalActorRef] -> deadLetters) - val types: Array[Class[_]] = arguments map (_._1) toArray - val values: Array[AnyRef] = arguments map (_._2) toArray - ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match { + val loader = Thread.currentThread.getContextClassLoader match { + case null ⇒ getClass.getClassLoader + case l ⇒ l + } + + ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, loader) match { case Left(e) ⇒ throw e case Right(p) ⇒ p } diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index e9097d72d6..e9dc24dad8 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -9,17 +9,41 @@ import java.net.URISyntaxException * The address specifies the physical location under which an Actor can be * reached. Examples are local addresses, identified by the ActorSystem’s * name, and remote addresses, identified by protocol, host and port. + * + * This class is final to allow use as a case class (copy method etc.); if + * for example a remote transport would want to associate additional + * information with an address, then this must be done externally. */ -abstract class Address { - def protocol: String - def hostPort: String +final case class Address(protocol: String, system: String, host: Option[String], port: Option[Int]) { + + def this(protocol: String, system: String) = this(protocol, system, None, None) + def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port)) + @transient - override lazy val toString = protocol + "://" + hostPort + override lazy val toString = { + val sb = new StringBuilder(protocol) + sb.append("://") + sb.append(hostPort) + sb.toString + } + + @transient + lazy val hostPort = { + val sb = new StringBuilder(system) + if (host.isDefined) { + sb.append('@') + sb.append(host.get) + } + if (port.isDefined) { + sb.append(':') + sb.append(port.get) + } + sb.toString + } } -case class LocalAddress(systemName: String) extends Address { - def protocol = "akka" - def hostPort = systemName +object Address { + def apply(protocol: String, system: String) = new Address(protocol, system) } object RelativeActorPath { @@ -32,12 +56,34 @@ object RelativeActorPath { } } -object LocalActorPath { - def unapply(addr: String): Option[(LocalAddress, Iterable[String])] = { +object AddressExtractor { + def unapply(addr: String): Option[Address] = { try { val uri = new URI(addr) - if (uri.getScheme != "akka" || uri.getUserInfo != null || uri.getHost == null || uri.getPath == null) None - else Some(LocalAddress(uri.getHost), ActorPath.split(uri.getPath).drop(1)) + if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None + else { + val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost, + if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost), + if (uri.getPort < 0) None else Some(uri.getPort)) + Some(addr) + } + } catch { + case _: URISyntaxException ⇒ None + } + } +} + +object ActorPathExtractor { + def unapply(addr: String): Option[(Address, Iterable[String])] = { + try { + val uri = new URI(addr) + if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null) || uri.getPath == null) None + else { + val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost, + if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost), + if (uri.getPort < 0) None else Some(uri.getPort)) + Some((addr, ActorPath.split(uri.getPath).drop(1))) + } } catch { case _: URISyntaxException ⇒ None } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 07a1da1da5..16396bd62e 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -647,7 +647,7 @@ object Logging { * akka.stdout-loglevel in akka.conf. */ class StandardOutLogger extends MinimalActorRef with StdOutLogger { - val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger") + val path: ActorPath = new RootActorPath(Address("akka", "all-systems"), "/StandardOutLogger") override val toString = "StandardOutLogger" override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message) } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 70b6fa5a03..0c8721db2c 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -39,6 +39,15 @@ object ReflectiveAccess { } } + def createInstance[T](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Exception, T] = + createInstance(clazz, args.map(_._1).toArray, args.map(_._2).toArray) + + def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)], classloader: ClassLoader): Either[Exception, T] = + createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, classloader) + + def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Exception, T] = + createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, loader) + //Obtains a reference to fqn.MODULE$ def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try { getClassFor(fqn, classloader) match { diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index a89508982c..e519aae027 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -38,29 +38,82 @@ akka { remote { - # Which implementation of akka.remote.RemoteSupport to use + # Which implementation of akka.remote.RemoteTransport to use # default is a TCP-based remote transport based on Netty - transport = "akka.remote.netty.NettyRemoteSupport" + transport = "akka.remote.netty.NettyRemoteTransport" - # In case of increased latency / overflow how long - # should we wait (blocking the sender) until we deem the send to be cancelled? - # 0 means "never backoff", any positive number will indicate time to block at most. - backoff-timeout = 0ms - - use-compression = off - - # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' - # or using 'akka.util.Crypt.generateSecureCookie' - secure-cookie = "" + # Enable untrusted mode for full security of server managed actors, allows + # untrusted clients to connect. + untrusted-mode = off # Timeout for ACK of cluster operations, lik checking actor out etc. remote-daemon-ack-timeout = 30s - # Reuse inbound connections for outbound messages - use-passive-connections = on + # Each property is annotated with (C) or (S) or (C&S), where C stands for “client” and S for “server” role. + # The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts + # active client connections whenever sending to a destination which is not yet connected; if configured + # it reuses inbound connections for replies, which is called a passive client connection (i.e. from server + # to client). + netty { + + # (C) In case of increased latency / overflow how long + # should we wait (blocking the sender) until we deem the send to be cancelled? + # 0 means "never backoff", any positive number will indicate time to block at most. + backoff-timeout = 0ms + + # (C&S) Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' + # or using 'akka.util.Crypt.generateSecureCookie' + secure-cookie = "" + + # (S) Should the remote server require that it peers share the same secure-cookie + # (defined in the 'remote' section)? + require-cookie = off - # Whether any Threds created by the remoting should be daemons or not - daemonic = on + # (S) Reuse inbound connections for outbound messages + use-passive-connections = on + + # (C&S) Whether any Threds created by the remoting should be daemons or not + daemonic = on + + # (S) The hostname or ip to bind the remoting to, + # InetAddress.getLocalHost.getHostAddress is used if empty + hostname = "" + + # (S) The default remote server port clients should connect to. + # Default is 2552 (AKKA), use 0 if you want a random available port + port = 2552 + + # (C&S) Increase this if you want to be able to send messages with large payloads + message-frame-size = 1 MiB + + # (C) Timeout duration + connection-timeout = 120s + + # (S) Sets the size of the connection backlog + backlog = 4096 + + # (S) Length in akka.time-unit how long core threads will be kept alive if idling + execution-pool-keepalive = 60s + + # (S) Size of the core pool of the remote execution unit + execution-pool-size = 4 + + # (S) Maximum channel size, 0 for off + max-channel-memory-size = 0b + + # (S) Maximum total size of all channels, 0 for off + max-total-memory-size = 0b + + # (C) Time between reconnect attempts for active clients + reconnect-delay = 5s + + # (C) Inactivity period after which active client connection is shutdown; will be + # re-established in case of new communication requests + read-timeout = 3600s + + # (C) Maximum time window that a client should try to reconnect for + reconnection-time-window = 600s + } # accrual failure detection config failure-detector { @@ -93,52 +146,6 @@ akka { daemonic = on } - server { - # The hostname or ip to bind the remoting to, - # InetAddress.getLocalHost.getHostAddress is used if empty - hostname = "" - - # The default remote server port clients should connect to. - # Default is 2552 (AKKA), use 0 if you want a random available port - port = 2552 - - # Increase this if you want to be able to send messages with large payloads - message-frame-size = 1 MiB - - # Timeout duration - connection-timeout = 120s - - # Should the remote server require that it peers share the same secure-cookie - # (defined in the 'remote' section)? - require-cookie = off - - # Enable untrusted mode for full security of server managed actors, allows - # untrusted clients to connect. - untrusted-mode = off - - # Sets the size of the connection backlog - backlog = 4096 - - # Length in akka.time-unit how long core threads will be kept alive if idling - execution-pool-keepalive = 60s - - # Size of the core pool of the remote execution unit - execution-pool-size = 4 - - # Maximum channel size, 0 for off - max-channel-memory-size = 0b - - # Maximum total size of all channels, 0 for off - max-total-memory-size = 0b - } - - client { - reconnect-delay = 5s - read-timeout = 3600s - message-frame-size = 1 MiB - # Maximum time window that a client should try to reconnect for - reconnection-time-window = 600s - } } cluster { diff --git a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala index 23043c5303..0384358557 100644 --- a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala @@ -5,13 +5,11 @@ package akka.remote import java.util.concurrent.atomic.AtomicReference - import scala.collection.immutable.Map import scala.annotation.tailrec - import System.{ currentTimeMillis ⇒ newTimestamp } - import akka.actor.ActorSystem +import akka.actor.Address /** * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper: @@ -35,9 +33,9 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 */ private case class State( version: Long = 0L, - failureStats: Map[ParsedTransportAddress, FailureStats] = Map.empty[ParsedTransportAddress, FailureStats], - intervalHistory: Map[ParsedTransportAddress, Vector[Long]] = Map.empty[ParsedTransportAddress, Vector[Long]], - timestamps: Map[ParsedTransportAddress, Long] = Map.empty[ParsedTransportAddress, Long]) + failureStats: Map[Address, FailureStats] = Map.empty[Address, FailureStats], + intervalHistory: Map[Address, Vector[Long]] = Map.empty[Address, Vector[Long]], + timestamps: Map[Address, Long] = Map.empty[Address, Long]) private val state = new AtomicReference[State](State()) @@ -45,13 +43,13 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Returns true if the connection is considered to be up and healthy * and returns false otherwise. */ - def isAvailable(connection: ParsedTransportAddress): Boolean = phi(connection) < threshold + def isAvailable(connection: Address): Boolean = phi(connection) < threshold /** * Records a heartbeat for a connection. */ @tailrec - final def heartbeat(connection: ParsedTransportAddress) { + final def heartbeat(connection: Address) { val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -132,7 +130,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Implementations of 'Cumulative Distribution Function' for Exponential Distribution. * For a discussion on the math read [https://issues.apache.org/jira/browse/CASSANDRA-2597]. */ - def phi(connection: ParsedTransportAddress): Double = { + def phi(connection: Address): Double = { val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) if (oldTimestamp.isEmpty) 0.0D // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections @@ -147,7 +145,7 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 * Removes the heartbeat management for a connection. */ @tailrec - final def remove(connection: ParsedTransportAddress) { + final def remove(connection: Address) { val oldState = state.get if (oldState.failureStats.contains(connection)) { diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index a12e5ecab1..9af6f584ae 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -27,8 +27,8 @@ import akka.dispatch.Await * Interface for node membership change listener. */ trait NodeMembershipChangeListener { - def nodeConnected(node: ParsedTransportAddress) - def nodeDisconnected(node: ParsedTransportAddress) + def nodeConnected(node: Address) + def nodeDisconnected(node: Address) } /** @@ -36,9 +36,9 @@ trait NodeMembershipChangeListener { */ case class Gossip( version: VectorClock, - node: ParsedTransportAddress, - availableNodes: Set[ParsedTransportAddress] = Set.empty[ParsedTransportAddress], - unavailableNodes: Set[ParsedTransportAddress] = Set.empty[ParsedTransportAddress]) + node: Address, + availableNodes: Set[Address] = Set.empty[Address], + unavailableNodes: Set[Address] = Set.empty[Address]) // ====== START - NEW GOSSIP IMPLEMENTATION ====== /* @@ -94,7 +94,7 @@ case class Gossip( * gossip to random seed with certain probability depending on number of unreachable, seed and live nodes. * */ -class Gossiper(remote: Remote, system: ActorSystemImpl) { +class Gossiper(remote: RemoteActorRefProvider, system: ActorSystemImpl) { /** * Represents the state for this Gossiper. Implemented using optimistic lockless concurrency, @@ -108,27 +108,19 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { private val serialization = remote.serialization private val log = Logging(system, "Gossiper") private val failureDetector = remote.failureDetector - private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[ParsedTransportAddress, ActorRef]) + private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[Address, ActorRef]) private val seeds = { - val seeds = remoteSettings.SeedNodes flatMap { - case x: UnparsedTransportAddress ⇒ - x.parse(remote.transports) match { - case y: ParsedTransportAddress ⇒ Some(y) - case _ ⇒ None - } - case _ ⇒ None - } - if (seeds.isEmpty) throw new ConfigurationException( + if (remoteSettings.SeedNodes.isEmpty) throw new ConfigurationException( "At least one seed node must be defined in the configuration [akka.cluster.seed-nodes]") - else seeds + else remoteSettings.SeedNodes } - private val address = remote.remoteAddress + private val address = remote.transport.address private val nodeFingerprint = address.## private val random = SecureRandom.getInstance("SHA1PRNG") - private val initalDelayForGossip = remoteSettings.InitalDelayForGossip + private val initalDelayForGossip = remoteSettings.InitialDelayForGossip private val gossipFrequency = remoteSettings.GossipFrequency private val state = new AtomicReference[State](State(currentGossip = newGossip())) @@ -166,7 +158,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { node ← oldAvailableNodes if connectionManager.connectionFor(node).isEmpty } { - val connectionFactory = () ⇒ system.actorFor(RootActorPath(RemoteSystemAddress(system.name, gossipingNode)) / "remote") + val connectionFactory = () ⇒ system.actorFor(RootActorPath(gossipingNode) / "remote") connectionManager.putIfAbsent(node, connectionFactory) // create a new remote connection to the new node oldState.nodeMembershipChangeListeners foreach (_ nodeConnected node) // notify listeners about the new nodes } @@ -240,7 +232,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { /** * Gossips set of nodes passed in as argument. Returns 'true' if it gossiped to a "seed" node. */ - private def gossipTo(nodes: Set[ParsedTransportAddress]): Boolean = { + private def gossipTo(nodes: Set[Address]): Boolean = { val peers = nodes filter (_ != address) // filter out myself val peer = selectRandomNode(peers) val oldState = state.get @@ -298,8 +290,8 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { private def newGossip(): Gossip = Gossip( version = VectorClock(), - node = address.transport, - availableNodes = Set(address.transport)) + node = address, + availableNodes = Set(address)) private def incrementVersionForGossip(from: Gossip): Gossip = { val newVersion = from.version.increment(nodeFingerprint, newTimestamp) @@ -314,7 +306,7 @@ class Gossiper(remote: Remote, system: ActorSystemImpl) { } } - private def selectRandomNode(nodes: Set[ParsedTransportAddress]): ParsedTransportAddress = { + private def selectRandomNode(nodes: Set[Address]): Address = { nodes.toList(random.nextInt(nodes.size)) } } diff --git a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala index 6d3f340cb5..763b22a6fa 100644 --- a/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala +++ b/akka-remote/src/main/scala/akka/remote/NetworkEventStream.scala @@ -8,6 +8,7 @@ import scala.collection.mutable import akka.actor.{ LocalActorRef, Actor, ActorRef, Props, newUuid } import akka.actor.Actor._ import akka.actor.ActorSystemImpl +import akka.actor.Address /** * Stream of all kinds of network events, remote failure and connection events, cluster failure and connection events etc. @@ -17,10 +18,10 @@ object NetworkEventStream { private sealed trait NetworkEventStreamEvent - private case class Register(listener: Listener, connectionAddress: ParsedTransportAddress) + private case class Register(listener: Listener, connectionAddress: Address) extends NetworkEventStreamEvent - private case class Unregister(listener: Listener, connectionAddress: ParsedTransportAddress) + private case class Unregister(listener: Listener, connectionAddress: Address) extends NetworkEventStreamEvent /** @@ -35,8 +36,8 @@ object NetworkEventStream { */ private class Channel extends Actor { - val listeners = new mutable.HashMap[ParsedTransportAddress, mutable.Set[Listener]]() { - override def default(k: ParsedTransportAddress) = mutable.Set.empty[Listener] + val listeners = new mutable.HashMap[Address, mutable.Set[Listener]]() { + override def default(k: Address) = mutable.Set.empty[Listener] } def receive = { @@ -67,12 +68,12 @@ class NetworkEventStream(system: ActorSystemImpl) { /** * Registers a network event stream listener (asyncronously). */ - def register(listener: Listener, connectionAddress: ParsedTransportAddress) = + def register(listener: Listener, connectionAddress: Address) = sender ! Register(listener, connectionAddress) /** * Unregisters a network event stream listener (asyncronously) . */ - def unregister(listener: Listener, connectionAddress: ParsedTransportAddress) = + def unregister(listener: Listener, connectionAddress: Address) = sender ! Unregister(listener, connectionAddress) } diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala deleted file mode 100644 index 02d6d682b0..0000000000 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ /dev/null @@ -1,275 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.remote - -import akka.actor._ -import akka.event._ -import akka.util._ -import akka.util.duration._ -import akka.util.Helpers._ -import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } -import akka.dispatch.MessageDispatcher -import akka.dispatch.SystemMessage -import scala.annotation.tailrec -import akka.remote.RemoteProtocol.{ ActorRefProtocol, AkkaRemoteProtocol, RemoteControlProtocol, RemoteMessageProtocol } - -/** - * Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc. - */ -class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSettings) { - - import settings._ - - // TODO make this really pluggable - val transports: TransportsMap = Map("akka" -> ((h, p) ⇒ Right(RemoteNettyAddress(h, p)))) - val remoteAddress: RemoteSystemAddress[ParsedTransportAddress] = { - val unparsedAddress = remoteSettings.serverSettings.URI match { - case RemoteAddressExtractor(a) ⇒ a - case x ⇒ throw new IllegalArgumentException("cannot parse URI " + x) - } - val parsed = unparsedAddress.parse(transports) match { - case Left(x) ⇒ throw new IllegalArgumentException(x.transport.error) - case Right(x) ⇒ x - } - parsed.copy(system = settings.name) - } - - val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize) - - @volatile - private var _serialization: Serialization = _ - def serialization = _serialization - - @volatile - private var _computeGridDispatcher: MessageDispatcher = _ - def computeGridDispatcher = _computeGridDispatcher - - @volatile - private var _remoteDaemon: InternalActorRef = _ - def remoteDaemon = _remoteDaemon - - @volatile - private var _eventStream: NetworkEventStream = _ - def eventStream = _eventStream - - @volatile - private var _transport: RemoteSupport[ParsedTransportAddress] = _ - def transport = _transport - - @volatile - private var _provider: RemoteActorRefProvider = _ - def provider = _provider - - def init(system: ActorSystemImpl, provider: RemoteActorRefProvider) = { - - val log = Logging(system, "Remote") - - _provider = provider - _serialization = SerializationExtension(system) - _computeGridDispatcher = system.dispatchers.lookup("akka.remote.compute-grid-dispatcher") - _remoteDaemon = new RemoteSystemDaemon(system, this, system.provider.rootPath / "remote", system.provider.rootGuardian, log) - _eventStream = new NetworkEventStream(system) - _transport = { - val arguments = Seq( - classOf[ActorSystemImpl] -> system, - classOf[Remote] -> this, - classOf[RemoteSystemAddress[_ <: ParsedTransportAddress]] -> remoteAddress) - val types: Array[Class[_]] = arguments map (_._1) toArray - val values: Array[AnyRef] = arguments map (_._2) toArray - - ReflectiveAccess.createInstance[RemoteSupport[ParsedTransportAddress]](remoteSettings.RemoteTransport, types, values) match { - case Left(problem) ⇒ - - log.error(problem, "Could not load remote transport layer") - throw problem - - case Right(remote) ⇒ - - remote.start(Option(Thread.currentThread().getContextClassLoader)) //TODO Any application loader here? - - val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { - def receive = { - case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address) - case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address) - case _ ⇒ //ignore other - } - }), "RemoteClientLifeCycleListener") - - system.eventStream.subscribe(eventStream.sender, classOf[RemoteLifeCycleEvent]) - system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - - remote - } - } - - log.info("Starting remote server on [{}@{}]", system.name, remoteAddress) - } -} - -sealed trait DaemonMsg -case class DaemonMsgCreate(factory: () ⇒ Actor, path: String, supervisor: ActorRef) extends DaemonMsg -case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg - -/** - * Internal system "daemon" actor for remote internal communication. - * - * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. - */ -class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) - extends VirtualPathContainer(_path, _parent, _log) { - - /** - * Find the longest matching path which we know about and return that ref - * (or ask that ref to continue searching if elements are left). - */ - override def getChild(names: Iterator[String]): InternalActorRef = { - - @tailrec - def rec(s: String, n: Int): (InternalActorRef, Int) = { - getChild(s) match { - case null ⇒ - val last = s.lastIndexOf('/') - if (last == -1) (Nobody, n) - else rec(s.substring(0, last), n + 1) - case ref ⇒ (ref, n) - } - } - - val full = Vector() ++ names - rec(full.mkString("/"), 0) match { - case (Nobody, _) ⇒ Nobody - case (ref, 0) ⇒ ref - case (ref, n) ⇒ ref.getChild(full.takeRight(n).iterator) - } - } - - override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { - case message: DaemonMsg ⇒ - log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address.hostPort) - message match { - case DaemonMsgCreate(factory, path, supervisor) ⇒ - import remote.remoteAddress - implicit val t = remote.transports - - path match { - case ParsedActorPath(`remoteAddress`, elems) if elems.nonEmpty && elems.head == "remote" ⇒ - // TODO RK canonicalize path so as not to duplicate it always #1446 - val subpath = elems.drop(1) - val path = remote.remoteDaemon.path / subpath - val actor = system.provider.actorOf(system, - Props(creator = factory), - supervisor.asInstanceOf[InternalActorRef], - path, true, None) - addChild(subpath.mkString("/"), actor) - system.deathWatch.subscribe(this, actor) - case _ ⇒ - log.error("remote path does not match path from message [{}]", message) - } - case DaemonMsgWatch(watcher, watched) ⇒ - val other = system.actorFor(watcher.path.root / "remote") - system.deathWatch.subscribe(other, watched) - } - - case Terminated(child: LocalActorRef) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) - - case t: Terminated ⇒ system.deathWatch.publish(t) - - case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) - } - -} - -class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader] = None) { - - def originalReceiver = input.getRecipient.getPath - - lazy val sender: ActorRef = - if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath) - else system.deadLetters - - lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) - - lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, classLoader) - - override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender -} - -trait RemoteMarshallingOps { - - def log: LoggingAdapter - - def system: ActorSystem - - def remote: Remote - - protected def useUntrustedMode: Boolean - - def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { - val arp = AkkaRemoteProtocol.newBuilder - arp.setMessage(rmp) - arp.build - } - - def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol = { - val arp = AkkaRemoteProtocol.newBuilder - arp.setInstruction(rcp) - arp.build - } - - /** - * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. - */ - def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = { - ActorRefProtocol.newBuilder.setPath(actor.path.toString).build - } - - def createRemoteMessageProtocolBuilder( - recipient: ActorRef, - message: Any, - senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { - - val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient)) - messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) - - if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) - - messageBuilder - } - - def receiveMessage(remoteMessage: RemoteMessage) { - log.debug("received message {}", remoteMessage) - - val remoteDaemon = remote.remoteDaemon - - remoteMessage.recipient match { - case `remoteDaemon` ⇒ - remoteMessage.payload match { - case m @ (_: DaemonMsg | _: Terminated) ⇒ - try remoteDaemon ! m catch { - case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender) - } - case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender) - } - case l: LocalRef ⇒ - remoteMessage.payload match { - case msg: SystemMessage ⇒ - if (useUntrustedMode) - throw new SecurityException("RemoteModule server is operating is untrusted mode, can not send system message") - else l.sendSystemMessage(msg) - case _: AutoReceivedMessage if (useUntrustedMode) ⇒ - throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") - case m ⇒ l.!(m)(remoteMessage.sender) - } - case r: RemoteActorRef ⇒ - implicit val t = remote.transports - remoteMessage.originalReceiver match { - case ParsedActorPath(address, _) if address == remote.remoteDaemon.path.address ⇒ - r.!(remoteMessage.payload)(remoteMessage.sender) - case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) - } - case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) - } - } -} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 397f110783..21b7089708 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -16,6 +16,10 @@ import akka.event.EventStream import akka.dispatch.Promise import akka.config.ConfigurationException import java.util.concurrent.{ TimeoutException } +import com.typesafe.config.Config +import akka.util.ReflectiveAccess +import akka.serialization.Serialization +import akka.serialization.SerializationExtension /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. @@ -37,22 +41,63 @@ class RemoteActorRefProvider( val deployer = new RemoteDeployer(settings) - val remote = new Remote(settings, remoteSettings) - implicit val transports = remote.transports + val transport: RemoteTransport = { + val fqn = remoteSettings.RemoteTransport + // TODO check if this classloader is the right one + ReflectiveAccess.createInstance[RemoteTransport]( + fqn, + Seq(classOf[RemoteSettings] -> remoteSettings), + getClass.getClassLoader) match { + case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) + case Right(remote) ⇒ remote + } + } - val log = Logging(eventStream, "RemoteActorRefProvider(" + remote.remoteAddress + ")") + val log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")") - val rootPath: ActorPath = RootActorPath(remote.remoteAddress) + val rootPath: ActorPath = RootActorPath(transport.address) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) + val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize) + + @volatile + private var _serialization: Serialization = _ + def serialization = _serialization + + @volatile + private var _remoteDaemon: InternalActorRef = _ + def remoteDaemon = _remoteDaemon + + @volatile + private var _networkEventStream: NetworkEventStream = _ + def networkEventStream = _networkEventStream + val deathWatch = new RemoteDeathWatch(local.deathWatch, this) def init(system: ActorSystemImpl) { local.init(system) - remote.init(system, this) - local.registerExtraNames(Map(("remote", remote.remoteDaemon))) - terminationFuture.onComplete(_ ⇒ remote.transport.shutdown()) + + _remoteDaemon = new RemoteSystemDaemon(system, transport.address, rootPath / "remote", rootGuardian, log) + _serialization = SerializationExtension(system) + + transport.start(system, this) + + val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { + def receive = { + case RemoteClientError(cause, remote, address) ⇒ remote.shutdownClientConnection(address) + case RemoteClientDisconnected(remote, address) ⇒ remote.shutdownClientConnection(address) + case _ ⇒ //ignore other + } + }), "RemoteClientLifeCycleListener") + + _networkEventStream = new NetworkEventStream(system) + + system.eventStream.subscribe(networkEventStream.sender, classOf[RemoteLifeCycleEvent]) + system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) + + local.registerExtraNames(Map(("remote", remoteDaemon))) + terminationFuture.onComplete(_ ⇒ transport.shutdown()) } def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = { @@ -102,18 +147,12 @@ class RemoteActorRefProvider( }) deployment match { - case Some(Deploy(_, _, _, RemoteScope(address))) ⇒ - // FIXME RK this should be done within the deployer, i.e. the whole parsing business - address.parse(remote.transports) match { - case Left(x) ⇒ - throw new ConfigurationException("cannot parse remote address: " + x) - case Right(addr) ⇒ - if (addr == rootPath.address) local.actorOf(system, props, supervisor, path, false, deployment) - else { - val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements - useActorOnNode(rpath, props.creator, supervisor) - new RemoteActorRef(this, remote.transport, rpath, supervisor, None) - } + case Some(Deploy(_, _, _, RemoteScope(addr))) ⇒ + if (addr == rootPath.address) local.actorOf(system, props, supervisor, path, false, deployment) + else { + val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements + useActorOnNode(rpath, props.creator, supervisor) + new RemoteActorRef(this, transport, rpath, supervisor, None) } case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment) @@ -123,14 +162,13 @@ class RemoteActorRefProvider( def actorFor(path: ActorPath): InternalActorRef = path.root match { case `rootPath` ⇒ actorFor(rootGuardian, path.elements) - case RootActorPath(_: RemoteSystemAddress[_], _) ⇒ new RemoteActorRef(this, remote.transport, path, Nobody, None) - case _ ⇒ local.actorFor(path) + case _ ⇒ new RemoteActorRef(this, transport, path, Nobody, None) } def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { - case ParsedActorPath(address, elems) ⇒ + case ActorPathExtractor(address, elems) ⇒ if (address == rootPath.address) actorFor(rootGuardian, elems) - else new RemoteActorRef(this, remote.transport, new RootActorPath(address) / elems, Nobody, None) + else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, None) case _ ⇒ local.actorFor(ref, path) } @@ -159,7 +197,7 @@ trait RemoteRef extends ActorRefScope { */ private[akka] class RemoteActorRef private[akka] ( provider: RemoteActorRefProvider, - remote: RemoteSupport[ParsedTransportAddress], + remote: RemoteTransport, val path: ActorPath, val getParent: InternalActorRef, loader: Option[ClassLoader]) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala b/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala new file mode 100644 index 0000000000..f7274c2356 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteAddress.scala @@ -0,0 +1,5 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.remote + diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala index 404bf98c4c..fc65734822 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConnectionManager.scala @@ -19,14 +19,14 @@ import java.util.concurrent.atomic.AtomicReference */ class RemoteConnectionManager( system: ActorSystemImpl, - remote: Remote, - initialConnections: Map[ParsedTransportAddress, ActorRef] = Map.empty[ParsedTransportAddress, ActorRef]) + remote: RemoteActorRefProvider, + initialConnections: Map[Address, ActorRef] = Map.empty[Address, ActorRef]) extends ConnectionManager { val log = Logging(system, "RemoteConnectionManager") // FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc. - case class State(version: Long, connections: Map[ParsedTransportAddress, ActorRef]) + case class State(version: Long, connections: Map[Address, ActorRef]) extends VersionedIterable[ActorRef] { def iterable: Iterable[ActorRef] = connections.values } @@ -52,7 +52,7 @@ class RemoteConnectionManager( def size: Int = connections.connections.size - def connectionFor(address: ParsedTransportAddress): Option[ActorRef] = connections.connections.get(address) + def connectionFor(address: Address): Option[ActorRef] = connections.connections.get(address) def isEmpty: Boolean = connections.connections.isEmpty @@ -61,7 +61,7 @@ class RemoteConnectionManager( } @tailrec - final def failOver(from: ParsedTransportAddress, to: ParsedTransportAddress) { + final def failOver(from: Address, to: Address) { log.debug("Failing over connection from [{}] to [{}]", from, to) val oldState = state.get @@ -92,8 +92,8 @@ class RemoteConnectionManager( val oldState = state.get() var changed = false - var faultyAddress: ParsedTransportAddress = null - var newConnections = Map.empty[ParsedTransportAddress, ActorRef] + var faultyAddress: Address = null + var newConnections = Map.empty[Address, ActorRef] oldState.connections.keys foreach { address ⇒ val actorRef: ActorRef = oldState.connections.get(address).get @@ -119,7 +119,7 @@ class RemoteConnectionManager( } @tailrec - final def putIfAbsent(address: ParsedTransportAddress, newConnectionFactory: () ⇒ ActorRef): ActorRef = { + final def putIfAbsent(address: Address, newConnectionFactory: () ⇒ ActorRef): ActorRef = { val oldState = state.get() val oldConnections = oldState.connections @@ -146,6 +146,6 @@ class RemoteConnectionManager( } } - private[remote] def newConnection(remoteAddress: ParsedTransportAddress, actorPath: ActorPath) = - new RemoteActorRef(remote.provider, remote.transport, actorPath, Nobody, None) + private[remote] def newConnection(remoteAddress: Address, actorPath: ActorPath) = + new RemoteActorRef(remote, remote.transport, actorPath, Nobody, None) } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala new file mode 100644 index 0000000000..81088b5000 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ + +package akka.remote + +import scala.annotation.tailrec + +import akka.actor.{ VirtualPathContainer, Terminated, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } +import akka.event.LoggingAdapter + +sealed trait DaemonMsg +case class DaemonMsgCreate(factory: () ⇒ Actor, path: String, supervisor: ActorRef) extends DaemonMsg +case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg + +/** + * Internal system "daemon" actor for remote internal communication. + * + * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. + */ +class RemoteSystemDaemon(system: ActorSystemImpl, address: Address, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) + extends VirtualPathContainer(_path, _parent, _log) { + + /** + * Find the longest matching path which we know about and return that ref + * (or ask that ref to continue searching if elements are left). + */ + override def getChild(names: Iterator[String]): InternalActorRef = { + + @tailrec + def rec(s: String, n: Int): (InternalActorRef, Int) = { + getChild(s) match { + case null ⇒ + val last = s.lastIndexOf('/') + if (last == -1) (Nobody, n) + else rec(s.substring(0, last), n + 1) + case ref ⇒ (ref, n) + } + } + + val full = Vector() ++ names + rec(full.mkString("/"), 0) match { + case (Nobody, _) ⇒ Nobody + case (ref, 0) ⇒ ref + case (ref, n) ⇒ ref.getChild(full.takeRight(n).iterator) + } + } + + override def !(msg: Any)(implicit sender: ActorRef = null): Unit = msg match { + case message: DaemonMsg ⇒ + log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) + message match { + case DaemonMsgCreate(factory, path, supervisor) ⇒ + path match { + case ActorPathExtractor(`address`, elems) if elems.nonEmpty && elems.head == "remote" ⇒ + // TODO RK canonicalize path so as not to duplicate it always #1446 + val subpath = elems.drop(1) + val path = this.path / subpath + val actor = system.provider.actorOf(system, + Props(creator = factory), + supervisor.asInstanceOf[InternalActorRef], + path, true, None) + addChild(subpath.mkString("/"), actor) + system.deathWatch.subscribe(this, actor) + case _ ⇒ + log.error("remote path does not match path from message [{}]", message) + } + case DaemonMsgWatch(watcher, watched) ⇒ + val other = system.actorFor(watcher.path.root / "remote") + system.deathWatch.subscribe(other, watched) + } + + case Terminated(child: LocalActorRef) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) + + case t: Terminated ⇒ system.deathWatch.publish(t) + + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index de3e0825ff..63eaae0632 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -8,7 +8,7 @@ import akka.routing._ import com.typesafe.config._ import akka.config.ConfigurationException -case class RemoteScope(node: UnparsedSystemAddress[UnparsedTransportAddress]) extends Scope +case class RemoteScope(node: Address) extends Scope class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings) { @@ -18,7 +18,7 @@ class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings super.parseConfig(path, config) match { case d @ Some(deploy) ⇒ deploy.config.getString("remote") match { - case RemoteAddressExtractor(r) ⇒ Some(deploy.copy(scope = RemoteScope(r))) + case AddressExtractor(r) ⇒ Some(deploy.copy(scope = RemoteScope(r))) case str ⇒ if (!str.isEmpty) throw new ConfigurationException("unparseable remote node name " + str) val nodes = deploy.config.getStringList("target.nodes").asScala diff --git a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala deleted file mode 100644 index a5474ce427..0000000000 --- a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala +++ /dev/null @@ -1,343 +0,0 @@ -/** - * Copyright (C) 2009-2010 Typesafe Inc. - */ - -package akka.remote - -import akka.actor._ -import akka.AkkaException -import scala.reflect.BeanProperty -import java.io.{ PrintWriter, PrintStream } -import java.net.InetSocketAddress -import java.net.URI -import java.net.URISyntaxException -import java.net.InetAddress -import java.net.UnknownHostException -import java.net.UnknownServiceException -import akka.event.Logging - -/** - * Interface for remote transports to encode their addresses. The three parts - * are named according to the URI spec (precisely java.net.URI) which is used - * for parsing. That means that the address’ parts must conform to what an - * URI expects, but otherwise each transport may assign a different meaning - * to these parts. - */ -trait RemoteTransportAddress { - def protocol: String - def host: String - def port: Int -} - -trait ParsedTransportAddress extends RemoteTransportAddress - -case class RemoteNettyAddress(host: String, ip: Option[InetAddress], port: Int) extends ParsedTransportAddress { - def protocol = "akka" - - override def toString(): String = "akka://" + host + ":" + port -} - -object RemoteNettyAddress { - def apply(host: String, port: Int): RemoteNettyAddress = { - // TODO ticket #1639 - val ip = try Some(InetAddress.getByName(host)) catch { case _: UnknownHostException ⇒ None } - new RemoteNettyAddress(host, ip, port) - } - def apply(s: String): RemoteNettyAddress = { - val RE = """([^:]+):(\d+)""".r - s match { - case RE(h, p) ⇒ apply(h, Integer.parseInt(p)) - case _ ⇒ throw new IllegalArgumentException("cannot parse " + s + " as ") - } - } -} - -case class UnparsedTransportAddress(protocol: String, host: String, port: Int) extends RemoteTransportAddress { - def parse(transports: TransportsMap): RemoteTransportAddress = - transports.get(protocol) - .map(_(host, port)) - .toRight("protocol " + protocol + " not known") - .joinRight.fold(UnparseableTransportAddress(protocol, host, port, _), identity) -} - -case class UnparseableTransportAddress(protocol: String, host: String, port: Int, error: String) extends RemoteTransportAddress - -case class RemoteSystemAddress[+T <: ParsedTransportAddress](system: String, transport: T) extends Address { - def protocol = transport.protocol - @transient - lazy val hostPort = system + "@" + transport.host + ":" + transport.port -} - -case class UnparsedSystemAddress[+T <: RemoteTransportAddress](system: Option[String], transport: T) { - def parse(transports: TransportsMap): Either[UnparsedSystemAddress[UnparseableTransportAddress], RemoteSystemAddress[ParsedTransportAddress]] = - system match { - case Some(sys) ⇒ - transport match { - case x: ParsedTransportAddress ⇒ Right(RemoteSystemAddress(sys, x)) - case y: UnparsedTransportAddress ⇒ - y.parse(transports) match { - case x: ParsedTransportAddress ⇒ Right(RemoteSystemAddress(sys, x)) - case y: UnparseableTransportAddress ⇒ Left(UnparsedSystemAddress(system, y)) - case z ⇒ Left(UnparsedSystemAddress(system, UnparseableTransportAddress(z.protocol, z.host, z.port, "cannot parse " + z))) - } - case z ⇒ Left(UnparsedSystemAddress(system, UnparseableTransportAddress(z.protocol, z.host, z.port, "cannot parse " + z))) - } - case None ⇒ Left(UnparsedSystemAddress(None, UnparseableTransportAddress(transport.protocol, transport.host, transport.port, "no system name specified"))) - } -} - -object RemoteAddressExtractor { - def unapply(s: String): Option[UnparsedSystemAddress[UnparsedTransportAddress]] = { - try { - val uri = new URI(s) - if (uri.getScheme == null || uri.getHost == null || uri.getPort == -1) None - else Some(UnparsedSystemAddress(Option(uri.getUserInfo), UnparsedTransportAddress(uri.getScheme, uri.getHost, uri.getPort))) - } catch { - case _: URISyntaxException ⇒ None - } - } -} - -object RemoteActorPath { - def unapply(addr: String): Option[(UnparsedSystemAddress[UnparsedTransportAddress], Iterable[String])] = { - try { - val uri = new URI(addr) - if (uri.getScheme == null || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1 || uri.getPath == null) None - else Some(UnparsedSystemAddress(Some(uri.getUserInfo), UnparsedTransportAddress(uri.getScheme, uri.getHost, uri.getPort)), - ActorPath.split(uri.getPath).drop(1)) - } catch { - case _: URISyntaxException ⇒ None - } - } -} - -object ParsedActorPath { - def unapply(addr: String)(implicit transports: TransportsMap): Option[(RemoteSystemAddress[ParsedTransportAddress], Iterable[String])] = { - try { - val uri = new URI(addr) - if (uri.getScheme == null || uri.getUserInfo == null || uri.getHost == null || uri.getPort == -1 || uri.getPath == null) None - else - UnparsedSystemAddress(Some(uri.getUserInfo), UnparsedTransportAddress(uri.getScheme, uri.getHost, uri.getPort)).parse(transports) match { - case Left(_) ⇒ None - case Right(x) ⇒ Some(x, ActorPath.split(uri.getPath).drop(1)) - } - } catch { - case _: URISyntaxException ⇒ None - } - } -} - -class RemoteException(message: String) extends AkkaException(message) - -trait RemoteModule { - protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit -} - -/** - * Remote life-cycle events. - */ -sealed trait RemoteLifeCycleEvent { - def logLevel: Logging.LogLevel -} - -/** - * Life-cycle events for RemoteClient. - */ -trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent { - def remoteAddress: ParsedTransportAddress -} - -case class RemoteClientError[T <: ParsedTransportAddress]( - @BeanProperty cause: Throwable, - @BeanProperty remote: RemoteSupport[T], - @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.ErrorLevel - override def toString = - "RemoteClientError@" + - remoteAddress + - ": Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" -} - -case class RemoteClientDisconnected[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T], - @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteClientDisconnected@" + remoteAddress -} - -case class RemoteClientConnected[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T], - @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteClientConnected@" + remoteAddress -} - -case class RemoteClientStarted[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T], - @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteClientStarted@" + remoteAddress -} - -case class RemoteClientShutdown[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T], - @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteClientShutdown@" + remoteAddress -} - -case class RemoteClientWriteFailed[T <: ParsedTransportAddress]( - @BeanProperty request: AnyRef, - @BeanProperty cause: Throwable, - @BeanProperty remote: RemoteSupport[T], - @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.WarningLevel - override def toString = - "RemoteClientWriteFailed@" + - remoteAddress + - ": MessageClass[" + - (if (request ne null) request.getClass.getName else "no message") + - "] Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" -} - -/** - * Life-cycle events for RemoteServer. - */ -trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent - -case class RemoteServerStarted[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteServerStarted@" + remote.name -} - -case class RemoteServerShutdown[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteServerShutdown@" + remote.name -} - -case class RemoteServerError[T <: ParsedTransportAddress]( - @BeanProperty val cause: Throwable, - @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.ErrorLevel - override def toString = - "RemoteServerError@" + - remote.name + - ": Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" -} - -case class RemoteServerClientConnected[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T], - @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteServerClientConnected@" + - remote.name + - ": Client[" + - (if (clientAddress.isDefined) clientAddress.get else "no address") + - "]" -} - -case class RemoteServerClientDisconnected[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T], - @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteServerClientDisconnected@" + - remote.name + - ": Client[" + - (if (clientAddress.isDefined) clientAddress.get else "no address") + - "]" -} - -case class RemoteServerClientClosed[T <: ParsedTransportAddress]( - @BeanProperty remote: RemoteSupport[T], - @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteServerClientClosed@" + - remote.name + - ": Client[" + - (if (clientAddress.isDefined) clientAddress.get else "no address") + - "]" -} - -case class RemoteServerWriteFailed[T <: ParsedTransportAddress]( - @BeanProperty request: AnyRef, - @BeanProperty cause: Throwable, - @BeanProperty remote: RemoteSupport[T], - @BeanProperty remoteAddress: Option[T]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.WarningLevel - override def toString = - "RemoteServerWriteFailed@" + - remote + - ": ClientAddress[" + - remoteAddress + - "] MessageClass[" + - (if (request ne null) request.getClass.getName else "no message") + - "] Error[" + - (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + - "]" -} - -/** - * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. - */ -class RemoteClientException[T <: ParsedTransportAddress] private[akka] ( - message: String, - @BeanProperty val client: RemoteSupport[T], - val remoteAddress: T, cause: Throwable = null) extends AkkaException(message, cause) - -abstract class RemoteSupport[-T <: ParsedTransportAddress](val system: ActorSystemImpl) { - /** - * Shuts down the remoting - */ - def shutdown(): Unit - - /** - * Gets the name of the server instance - */ - def name: String - - /** - * Starts up the remoting - */ - def start(loader: Option[ClassLoader]): Unit - - /** - * Shuts down a specific client connected to the supplied remote address returns true if successful - */ - def shutdownClientConnection(address: T): Boolean - - /** - * Restarts a specific client connected to the supplied remote address, but only if the client is not shut down - */ - def restartClientConnection(address: T): Boolean - - /** Methods that needs to be implemented by a transport **/ - - protected[akka] def send(message: Any, - senderOption: Option[ActorRef], - recipient: RemoteActorRef, - loader: Option[ClassLoader]): Unit - - protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = { - system.eventStream.publish(message) - system.log.log(message.logLevel, "REMOTE: {}", message) - } - - override def toString = name -} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index eae0741844..b2e09b5e13 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -10,85 +10,28 @@ import java.net.InetAddress import akka.config.ConfigurationException import com.eaio.uuid.UUID import scala.collection.JavaConverters._ +import akka.actor.Address +import akka.actor.AddressExtractor class RemoteSettings(val config: Config, val systemName: String) { import config._ val RemoteTransport = getString("akka.remote.transport") - val Daemonic = getBoolean("akka.remote.daemonic") + + // AccrualFailureDetector val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") - val ShouldCompressData = getBoolean("akka.remote.use-compression") + + // Gossiper val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) - val InitalDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) + val InitialDelayForGossip = Duration(getMilliseconds("akka.remote.gossip.initialDelay"), MILLISECONDS) val GossipFrequency = Duration(getMilliseconds("akka.remote.gossip.frequency"), MILLISECONDS) - val BackoffTimeout = Duration(getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS) - // TODO cluster config will go into akka-cluster/reference.conf when we enable that module - val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { - case RemoteAddressExtractor(addr) ⇒ addr.transport + val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect { + case AddressExtractor(addr) ⇒ addr } - val serverSettings = new RemoteServerSettings - val clientSettings = new RemoteClientSettings + val UntrustedMode = getBoolean("akka.remote.untrusted-mode") - class RemoteClientSettings { - val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match { - case "" ⇒ None - case cookie ⇒ Some(cookie) - } - - val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) - val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) - val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) - val MessageFrameSize = getBytes("akka.remote.client.message-frame-size").toInt - } - - class RemoteServerSettings { - import scala.collection.JavaConverters._ - val MessageFrameSize = getBytes("akka.remote.server.message-frame-size").toInt - val SecureCookie: Option[String] = getString("akka.remote.secure-cookie") match { - case "" ⇒ None - case cookie ⇒ Some(cookie) - } - val RequireCookie = { - val requireCookie = getBoolean("akka.remote.server.require-cookie") - if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( - "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") - requireCookie - } - - val UsePassiveConnections = getBoolean("akka.remote.use-passive-connections") - - val UntrustedMode = getBoolean("akka.remote.server.untrusted-mode") - val Hostname = getString("akka.remote.server.hostname") match { - case "" ⇒ InetAddress.getLocalHost.getHostAddress - case value ⇒ value - } - val Port = getInt("akka.remote.server.port") - val ConnectionTimeout = Duration(getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) - - val Backlog = getInt("akka.remote.server.backlog") - - val ExecutionPoolKeepAlive = Duration(getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS) - - val ExecutionPoolSize = getInt("akka.remote.server.execution-pool-size") match { - case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") - case sz ⇒ sz - } - - val MaxChannelMemorySize = getBytes("akka.remote.server.max-channel-memory-size") match { - case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0 bytes") - case sz ⇒ sz - } - - val MaxTotalMemorySize = getBytes("akka.remote.server.max-total-memory-size") match { - case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0 bytes") - case sz ⇒ sz - } - - // TODO handle the system name right and move this to config file syntax - val URI = "akka://sys@" + Hostname + ":" + Port - } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala new file mode 100644 index 0000000000..19fd6c1c43 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -0,0 +1,321 @@ +/** + * Copyright (C) 2009-2010 Typesafe Inc. + */ + +package akka.remote + +import scala.reflect.BeanProperty + +import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressExtractor, Address, ActorSystemImpl, ActorSystem, ActorRef } +import akka.dispatch.SystemMessage +import akka.event.{ LoggingAdapter, Logging } +import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, AkkaRemoteProtocol, ActorRefProtocol } +import akka.AkkaException + +/** + * Remote life-cycle events. + */ +sealed trait RemoteLifeCycleEvent { + def logLevel: Logging.LogLevel +} + +/** + * Life-cycle events for RemoteClient. + */ +trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent { + def remoteAddress: Address +} + +case class RemoteClientError( + @BeanProperty cause: Throwable, + @BeanProperty remote: RemoteTransport, + @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.ErrorLevel + override def toString = + "RemoteClientError@" + + remoteAddress + + ": Error[" + + (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + + "]" +} + +case class RemoteClientDisconnected( + @BeanProperty remote: RemoteTransport, + @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.DebugLevel + override def toString = + "RemoteClientDisconnected@" + remoteAddress +} + +case class RemoteClientConnected( + @BeanProperty remote: RemoteTransport, + @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.DebugLevel + override def toString = + "RemoteClientConnected@" + remoteAddress +} + +case class RemoteClientStarted( + @BeanProperty remote: RemoteTransport, + @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.InfoLevel + override def toString = + "RemoteClientStarted@" + remoteAddress +} + +case class RemoteClientShutdown( + @BeanProperty remote: RemoteTransport, + @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.InfoLevel + override def toString = + "RemoteClientShutdown@" + remoteAddress +} + +case class RemoteClientWriteFailed( + @BeanProperty request: AnyRef, + @BeanProperty cause: Throwable, + @BeanProperty remote: RemoteTransport, + @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.WarningLevel + override def toString = + "RemoteClientWriteFailed@" + + remoteAddress + + ": MessageClass[" + + (if (request ne null) request.getClass.getName else "no message") + + "] Error[" + + (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + + "]" +} + +/** + * Life-cycle events for RemoteServer. + */ +trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent + +case class RemoteServerStarted( + @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.InfoLevel + override def toString = + "RemoteServerStarted@" + remote +} + +case class RemoteServerShutdown( + @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.InfoLevel + override def toString = + "RemoteServerShutdown@" + remote +} + +case class RemoteServerError( + @BeanProperty val cause: Throwable, + @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.ErrorLevel + override def toString = + "RemoteServerError@" + + remote + + ": Error[" + + (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + + "]" +} + +case class RemoteServerClientConnected( + @BeanProperty remote: RemoteTransport, + @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel + override def toString = + "RemoteServerClientConnected@" + + remote + + ": Client[" + + (if (clientAddress.isDefined) clientAddress.get else "no address") + + "]" +} + +case class RemoteServerClientDisconnected( + @BeanProperty remote: RemoteTransport, + @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel + override def toString = + "RemoteServerClientDisconnected@" + + remote + + ": Client[" + + (if (clientAddress.isDefined) clientAddress.get else "no address") + + "]" +} + +case class RemoteServerClientClosed( + @BeanProperty remote: RemoteTransport, + @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel + override def toString = + "RemoteServerClientClosed@" + + remote + + ": Client[" + + (if (clientAddress.isDefined) clientAddress.get else "no address") + + "]" +} + +case class RemoteServerWriteFailed( + @BeanProperty request: AnyRef, + @BeanProperty cause: Throwable, + @BeanProperty remote: RemoteTransport, + @BeanProperty remoteAddress: Option[Address]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.WarningLevel + override def toString = + "RemoteServerWriteFailed@" + + remote + + ": ClientAddress[" + + remoteAddress + + "] MessageClass[" + + (if (request ne null) request.getClass.getName else "no message") + + "] Error[" + + (if (cause ne null) cause.getClass.getName + ": " + cause.getMessage else "unknown") + + "]" +} + +/** + * Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down. + */ +class RemoteClientException private[akka] ( + message: String, + @BeanProperty val client: RemoteTransport, + val remoteAddress: Address, cause: Throwable = null) extends AkkaException(message, cause) + +class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause) + +abstract class RemoteTransport { + /** + * Shuts down the remoting + */ + def shutdown(): Unit + + /** + * Address to be used in RootActorPath of refs generated for this transport. + */ + def address: Address + + /** + * The actor system, for which this transport is instantiated. Will publish to its eventStream. + */ + def system: ActorSystem + + /** + * Starts up the remoting + */ + def start(system: ActorSystemImpl, provider: RemoteActorRefProvider): Unit + + /** + * Shuts down a specific client connected to the supplied remote address returns true if successful + */ + def shutdownClientConnection(address: Address): Boolean + + /** + * Restarts a specific client connected to the supplied remote address, but only if the client is not shut down + */ + def restartClientConnection(address: Address): Boolean + + /** Methods that needs to be implemented by a transport **/ + + protected[akka] def send(message: Any, + senderOption: Option[ActorRef], + recipient: RemoteActorRef, + loader: Option[ClassLoader]): Unit + + protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = { + system.eventStream.publish(message) + system.log.log(message.logLevel, "REMOTE: {}", message) + } + + override def toString = address.toString +} + +class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl, classLoader: Option[ClassLoader]) { + + def originalReceiver = input.getRecipient.getPath + + lazy val sender: ActorRef = + if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath) + else system.deadLetters + + lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) + + lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage, classLoader) + + override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender +} + +trait RemoteMarshallingOps { + + def log: LoggingAdapter + + def system: ActorSystemImpl + + def provider: RemoteActorRefProvider + + protected def useUntrustedMode: Boolean + + def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { + val arp = AkkaRemoteProtocol.newBuilder + arp.setMessage(rmp) + arp.build + } + + def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol = { + val arp = AkkaRemoteProtocol.newBuilder + arp.setInstruction(rcp) + arp.build + } + + /** + * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. + */ + def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = { + ActorRefProtocol.newBuilder.setPath(actor.path.toString).build + } + + def createRemoteMessageProtocolBuilder( + recipient: ActorRef, + message: Any, + senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { + + val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient)) + messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) + + if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) + + messageBuilder + } + + def receiveMessage(remoteMessage: RemoteMessage) { + log.debug("received message {}", remoteMessage) + + val remoteDaemon = provider.remoteDaemon + + remoteMessage.recipient match { + case `remoteDaemon` ⇒ + remoteMessage.payload match { + case m @ (_: DaemonMsg | _: Terminated) ⇒ + try remoteDaemon ! m catch { + case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender) + } + case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender) + } + case l: LocalRef ⇒ + remoteMessage.payload match { + case msg: SystemMessage ⇒ + if (useUntrustedMode) + throw new SecurityException("RemoteModule server is operating is untrusted mode, can not send system message") + else l.sendSystemMessage(msg) + case _: AutoReceivedMessage if (useUntrustedMode) ⇒ + throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") + case m ⇒ l.!(m)(remoteMessage.sender) + } + case r: RemoteActorRef ⇒ + remoteMessage.originalReceiver match { + case AddressExtractor(address) if address == provider.transport.address ⇒ + r.!(remoteMessage.payload)(remoteMessage.sender) + case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) + } + case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) + } + } +} diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala new file mode 100644 index 0000000000..4a9d6607fd --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -0,0 +1,347 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.remote.netty + +import java.net.InetSocketAddress +import org.jboss.netty.util.HashedWheelTimer +import org.jboss.netty.bootstrap.ClientBootstrap +import org.jboss.netty.channel.group.DefaultChannelGroup +import org.jboss.netty.channel.{ ChannelHandler, StaticChannelPipeline, SimpleChannelUpstreamHandler, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel } +import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } +import org.jboss.netty.handler.execution.ExecutionHandler +import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } +import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } +import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected } +import akka.actor.{ simpleName, Address } +import akka.AkkaException +import akka.event.Logging +import akka.util.Switch +import akka.actor.ActorRef +import org.jboss.netty.channel.ChannelFutureListener +import akka.remote.RemoteClientWriteFailed +import java.net.InetAddress +import org.jboss.netty.util.TimerTask +import org.jboss.netty.util.Timeout +import java.util.concurrent.TimeUnit + +class RemoteClientMessageBufferException(message: String, cause: Throwable) extends AkkaException(message, cause) { + def this(msg: String) = this(msg, null) +} + +/** + * This is the abstract baseclass for netty remote clients, currently there's only an + * ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that + * reuses an already established connection. + */ +abstract class RemoteClient private[akka] ( + val netty: NettyRemoteTransport, + val remoteAddress: Address) { + + val log = Logging(netty.system, "RemoteClient") + + val name = simpleName(this) + "@" + remoteAddress + + private[remote] val runSwitch = new Switch() + + private[remote] def isRunning = runSwitch.isOn + + protected def currentChannel: Channel + + def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean + + def shutdown(): Boolean + + def isBoundTo(address: Address): Boolean = remoteAddress == address + + /** + * Converts the message to the wireprotocol and sends the message across the wire + */ + def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { + log.debug("Sending message {} from {} to {}", message, senderOption, recipient) + send((message, senderOption, recipient)) + } else { + val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress) + netty.notifyListeners(RemoteClientError(exception, netty, remoteAddress)) + throw exception + } + + /** + * Sends the message across the wire + */ + private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = { + try { + val channel = currentChannel + val f = channel.write(request) + f.addListener( + new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (future.isCancelled || !future.isSuccess) { + netty.notifyListeners(RemoteClientWriteFailed(request, future.getCause, netty, remoteAddress)) + } + } + }) + // Check if we should back off + if (!channel.isWritable) { + val backoff = netty.settings.BackoffTimeout + if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off + } + } catch { + case e: Exception ⇒ netty.notifyListeners(RemoteClientError(e, netty, remoteAddress)) + } + } + + override def toString = name +} + +/** + * RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node. + */ +class ActiveRemoteClient private[akka] ( + netty: NettyRemoteTransport, + remoteAddress: Address, + localAddress: Address, + val loader: Option[ClassLoader] = None) + extends RemoteClient(netty, remoteAddress) { + + import netty.settings + + //TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation) + @volatile + private var bootstrap: ClientBootstrap = _ + @volatile + private var connection: ChannelFuture = _ + @volatile + private[remote] var openChannels: DefaultChannelGroup = _ + @volatile + private var executionHandler: ExecutionHandler = _ + + @volatile + private var reconnectionTimeWindowStart = 0L + + def notifyListeners(msg: RemoteLifeCycleEvent): Unit = netty.notifyListeners(msg) + + def currentChannel = connection.getChannel + + /** + * Connect to remote server. + */ + def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { + + def sendSecureCookie(connection: ChannelFuture) { + val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) + if (settings.SecureCookie.nonEmpty) handshake.setCookie(settings.SecureCookie.get) + handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder + .setSystem(localAddress.system) + .setHostname(localAddress.host.get) + .setPort(localAddress.port.get) + .build) + connection.getChannel.write(netty.createControlEnvelope(handshake.build)) + } + + def attemptReconnect(): Boolean = { + val remoteIP = InetAddress.getByName(remoteAddress.host.get) + log.debug("Remote client reconnecting to [{}|{}]", remoteAddress, remoteIP) + connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get)) + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. + + if (!connection.isSuccess) { + notifyListeners(RemoteClientError(connection.getCause, netty, remoteAddress)) + false + } else { + sendSecureCookie(connection) + true + } + } + + runSwitch switchOn { + openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) + + executionHandler = new ExecutionHandler(netty.executor) + + bootstrap = new ClientBootstrap(netty.clientChannelFactory) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("keepAlive", true) + bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) + + val remoteIP = InetAddress.getByName(remoteAddress.host.get) + log.debug("Starting remote client connection to [{}|{}]", remoteAddress, remoteIP) + + connection = bootstrap.connect(new InetSocketAddress(remoteIP, remoteAddress.port.get)) + + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. + + if (!connection.isSuccess) { + notifyListeners(RemoteClientError(connection.getCause, netty, remoteAddress)) + false + } else { + sendSecureCookie(connection) + notifyListeners(RemoteClientStarted(netty, remoteAddress)) + true + } + } match { + case true ⇒ true + case false if reconnectIfAlreadyConnected ⇒ + connection.getChannel.close() + openChannels.remove(connection.getChannel) + + log.debug("Remote client reconnecting to [{}]", remoteAddress) + attemptReconnect() + + case false ⇒ false + } + } + + // Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients + def shutdown() = runSwitch switchOff { + log.debug("Shutting down remote client [{}]", name) + + notifyListeners(RemoteClientShutdown(netty, remoteAddress)) + try { + if ((connection ne null) && (connection.getChannel ne null)) + connection.getChannel.close() + } finally { + try { + if (openChannels ne null) openChannels.close.awaitUninterruptibly() + } finally { + connection = null + executionHandler = null + } + } + + log.debug("[{}] has been shut down", name) + } + + private[akka] def isWithinReconnectionTimeWindow: Boolean = { + if (reconnectionTimeWindowStart == 0L) { + reconnectionTimeWindowStart = System.currentTimeMillis + true + } else { + val timeLeft = (settings.ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 + if (timeLeft) + log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) + + timeLeft + } + } + + private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L +} + +@ChannelHandler.Sharable +class ActiveRemoteClientHandler( + val name: String, + val bootstrap: ClientBootstrap, + val remoteAddress: Address, + val timer: HashedWheelTimer, + val client: ActiveRemoteClient) + extends SimpleChannelUpstreamHandler { + + def runOnceNow(thunk: ⇒ Unit): Unit = timer.newTimeout(new TimerTask() { + def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() } + }, 0, TimeUnit.MILLISECONDS) + + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { + try { + event.getMessage match { + case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒ + val rcp = arp.getInstruction + rcp.getCommandType match { + case CommandType.SHUTDOWN ⇒ runOnceNow { client.netty.shutdownClientConnection(remoteAddress) } + case _ ⇒ //Ignore others + } + + case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ + client.netty.receiveMessage(new RemoteMessage(arp.getMessage, client.netty.system, client.loader)) + + case other ⇒ + throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.netty, client.remoteAddress) + } + } catch { + case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress)) + } + } + + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn { + if (client.isWithinReconnectionTimeWindow) { + timer.newTimeout(new TimerTask() { + def run(timeout: Timeout) = + if (client.isRunning) { + client.openChannels.remove(event.getChannel) + client.connect(reconnectIfAlreadyConnected = true) + } + }, client.netty.settings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS) + } else runOnceNow { + client.netty.shutdownClientConnection(remoteAddress) // spawn in another thread + } + } + + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + try { + client.notifyListeners(RemoteClientConnected(client.netty, client.remoteAddress)) + client.resetReconnectionTimeWindow + } catch { + case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.netty, client.remoteAddress)) + } + } + + override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + client.notifyListeners(RemoteClientDisconnected(client.netty, client.remoteAddress)) + } + + override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + val cause = event.getCause + if (cause ne null) { + client.notifyListeners(RemoteClientError(cause, client.netty, client.remoteAddress)) + cause match { + case e: ReadTimeoutException ⇒ + runOnceNow { + client.netty.shutdownClientConnection(remoteAddress) // spawn in another thread + } + case e: Exception ⇒ event.getChannel.close() + } + + } else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.netty, client.remoteAddress)) + } +} + +class ActiveRemoteClientPipelineFactory( + name: String, + bootstrap: ClientBootstrap, + executionHandler: ExecutionHandler, + remoteAddress: Address, + client: ActiveRemoteClient) extends ChannelPipelineFactory { + + import client.netty.settings + + def getPipeline: ChannelPipeline = { + val timeout = new ReadTimeoutHandler(client.netty.timer, settings.ReadTimeout.length, settings.ReadTimeout.unit) + val lenDec = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val messageDec = new RemoteMessageDecoder + val messageEnc = new RemoteMessageEncoder(client.netty) + val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.netty.timer, client) + + new StaticChannelPipeline(timeout, lenDec, messageDec, lenPrep, messageEnc, executionHandler, remoteClient) + } +} + +class PassiveRemoteClient(val currentChannel: Channel, + netty: NettyRemoteTransport, + remoteAddress: Address) + extends RemoteClient(netty, remoteAddress) { + + def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { + netty.notifyListeners(RemoteClientStarted(netty, remoteAddress)) + log.debug("Starting remote client connection to [{}]", remoteAddress) + } + + def shutdown() = runSwitch switchOff { + log.debug("Shutting down remote client [{}]", name) + + netty.notifyListeners(RemoteClientShutdown(netty, remoteAddress)) + log.debug("[{}] has been shut down", name) + } +} + diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index db58b6b23d..4226f9dffc 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,394 +4,114 @@ package akka.remote.netty -import akka.actor.{ ActorRef, IllegalActorStateException, simpleName } -import akka.remote._ -import RemoteProtocol._ -import akka.util._ -import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } -import org.jboss.netty.channel.socket.nio.{ NioServerSocketChannelFactory, NioClientSocketChannelFactory } -import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap } -import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } -import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } -import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } +import java.net.{ UnknownHostException, InetAddress } +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.locks.ReentrantReadWriteLock +import java.util.concurrent.Executors import scala.collection.mutable.HashMap -import java.net.InetSocketAddress -import java.util.concurrent.atomic._ -import akka.AkkaException -import akka.event.Logging -import org.jboss.netty.channel._ -import akka.actor.ActorSystemImpl -import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor } -import java.util.concurrent._ -import locks.ReentrantReadWriteLock +import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture } +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import org.jboss.netty.channel.{ ChannelHandlerContext, ChannelFutureListener, ChannelFuture, Channel } +import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } +import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor +import org.jboss.netty.util.HashedWheelTimer +import akka.actor.{ ActorSystemImpl, ActorRef, simpleName } import akka.dispatch.MonitorableThreadFactory - -class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null) -} - -/** - * This is the abstract baseclass for netty remote clients, currently there's only an - * ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that - * reuses an already established connection. - */ -abstract class RemoteClient private[akka] ( - val remoteSupport: NettyRemoteSupport, - val remoteAddress: RemoteNettyAddress) { - - val log = Logging(remoteSupport.system, "RemoteClient") - - val name = simpleName(this) + "@" + remoteAddress - - private[remote] val runSwitch = new Switch() - - private[remote] def isRunning = runSwitch.isOn - - protected def currentChannel: Channel - - def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean - - def shutdown(): Boolean - - def isBoundTo(address: RemoteNettyAddress): Boolean = remoteAddress == address - - /** - * Converts the message to the wireprotocol and sends the message across the wire - */ - def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - log.debug("Sending message {} from {} to {}", message, senderOption, recipient) - send((message, senderOption, recipient)) - } else { - val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) - remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress)) - throw exception - } - - /** - * Sends the message across the wire - */ - private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = { - try { - val channel = currentChannel - val f = channel.write(request) - f.addListener( - new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled || !future.isSuccess) { - remoteSupport.notifyListeners(RemoteClientWriteFailed(request, future.getCause, remoteSupport, remoteAddress)) - } - } - }) - // Check if we should back off - if (!channel.isWritable) { - val backoff = remoteSupport.remote.remoteSettings.BackoffTimeout - if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off - } - } catch { - case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) - } - } - - override def toString = name -} - -class PassiveRemoteClient(val currentChannel: Channel, - remoteSupport: NettyRemoteSupport, - remoteAddress: RemoteNettyAddress) - extends RemoteClient(remoteSupport, remoteAddress) { - - def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { - remoteSupport.notifyListeners(RemoteClientStarted(remoteSupport, remoteAddress)) - log.debug("Starting remote client connection to [{}]", remoteAddress) - } - - def shutdown() = runSwitch switchOff { - log.debug("Shutting down remote client [{}]", name) - - remoteSupport.notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress)) - log.debug("[{}] has been shut down", name) - } -} - -/** - * RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node. - */ -class ActiveRemoteClient private[akka] ( - remoteSupport: NettyRemoteSupport, - remoteAddress: RemoteNettyAddress, - localAddress: RemoteSystemAddress[ParsedTransportAddress], - val loader: Option[ClassLoader] = None) - extends RemoteClient(remoteSupport, remoteAddress) { - - if (remoteAddress.ip.isEmpty) throw new java.net.UnknownHostException(remoteAddress.host) - - import remoteSupport.clientSettings._ - - //TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation) - @volatile - private var bootstrap: ClientBootstrap = _ - @volatile - private var connection: ChannelFuture = _ - @volatile - private[remote] var openChannels: DefaultChannelGroup = _ - @volatile - private var executionHandler: ExecutionHandler = _ - - @volatile - private var reconnectionTimeWindowStart = 0L - - def notifyListeners(msg: RemoteLifeCycleEvent): Unit = remoteSupport.notifyListeners(msg) - - def currentChannel = connection.getChannel - - /** - * Connect to remote server. - */ - def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = { - - def sendSecureCookie(connection: ChannelFuture) { - val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) - if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get) - handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(localAddress.system) - .setHostname(localAddress.transport.host) - .setPort(localAddress.transport.port) - .build) - connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build)) - } - - def attemptReconnect(): Boolean = { - log.debug("Remote client reconnecting to [{}]", remoteAddress) - connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. - - if (!connection.isSuccess) { - notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) - false - } else { - sendSecureCookie(connection) - true - } - } - - runSwitch switchOn { - openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) - - executionHandler = new ExecutionHandler(remoteSupport.executor) - - bootstrap = new ClientBootstrap(remoteSupport.clientChannelFactory) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("keepAlive", true) - - log.debug("Starting remote client connection to [{}]", remoteAddress) - - connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - - openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. - - if (!connection.isSuccess) { - notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) - false - } else { - sendSecureCookie(connection) - notifyListeners(RemoteClientStarted(remoteSupport, remoteAddress)) - true - } - } match { - case true ⇒ true - case false if reconnectIfAlreadyConnected ⇒ - connection.getChannel.close() - openChannels.remove(connection.getChannel) - - log.debug("Remote client reconnecting to [{}]", remoteAddress) - attemptReconnect() - - case false ⇒ false - } - } - - // Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients - def shutdown() = runSwitch switchOff { - log.debug("Shutting down remote client [{}]", name) - - notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress)) - try { - if ((connection ne null) && (connection.getChannel ne null)) - connection.getChannel.close() - } finally { - try { - if (openChannels ne null) openChannels.close.awaitUninterruptibly() - } finally { - connection = null - executionHandler = null - } - } - - log.debug("[{}] has been shut down", name) - } - - private[akka] def isWithinReconnectionTimeWindow: Boolean = { - if (reconnectionTimeWindowStart == 0L) { - reconnectionTimeWindowStart = System.currentTimeMillis - true - } else { - val timeLeft = (ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0 - if (timeLeft) - log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft) - - timeLeft - } - } - - private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L -} - -class ActiveRemoteClientPipelineFactory( - name: String, - bootstrap: ClientBootstrap, - executionHandler: ExecutionHandler, - remoteAddress: RemoteNettyAddress, - client: ActiveRemoteClient) extends ChannelPipelineFactory { - - import client.remoteSupport.clientSettings._ - - def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(client.remoteSupport.timer, ReadTimeout.length, ReadTimeout.unit) - val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) - val messageDec = new RemoteMessageDecoder - val messageEnc = new RemoteMessageEncoder(client.remoteSupport) - val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client) - - new StaticChannelPipeline(timeout, lenDec, messageDec, lenPrep, messageEnc, executionHandler, remoteClient) - } -} - -class RemoteMessageEncoder(remoteSupport: NettyRemoteSupport) extends ProtobufEncoder { - override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { - msg match { - case (message: Any, sender: Option[_], recipient: ActorRef) ⇒ - super.encode(ctx, channel, - remoteSupport.createMessageSendEnvelope( - remoteSupport.createRemoteMessageProtocolBuilder( - recipient, - message, - sender.asInstanceOf[Option[ActorRef]]).build)) - case _ ⇒ super.encode(ctx, channel, msg) - } - } -} - -class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) - -@ChannelHandler.Sharable -class ActiveRemoteClientHandler( - val name: String, - val bootstrap: ClientBootstrap, - val remoteAddress: RemoteNettyAddress, - val timer: HashedWheelTimer, - val client: ActiveRemoteClient) - extends SimpleChannelUpstreamHandler { - - def runOnceNow(thunk: ⇒ Unit): Unit = timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() } - }, 0, TimeUnit.MILLISECONDS) - - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { - try { - event.getMessage match { - case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒ - val rcp = arp.getInstruction - rcp.getCommandType match { - case CommandType.SHUTDOWN ⇒ runOnceNow { client.remoteSupport.shutdownClientConnection(remoteAddress) } - case _ ⇒ //Ignore others - } - - case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ - client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport.system, client.loader)) - - case other ⇒ - throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.remoteSupport, client.remoteAddress) - } - } catch { - case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.remoteSupport, client.remoteAddress)) - } - } - - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn { - if (client.isWithinReconnectionTimeWindow) { - timer.newTimeout(new TimerTask() { - def run(timeout: Timeout) = - if (client.isRunning) { - client.openChannels.remove(event.getChannel) - client.connect(reconnectIfAlreadyConnected = true) - } - }, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS) - } else runOnceNow { - client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread - } - } - - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - try { - client.notifyListeners(RemoteClientConnected(client.remoteSupport, client.remoteAddress)) - client.resetReconnectionTimeWindow - } catch { - case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.remoteSupport, client.remoteAddress)) - } - } - - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - client.notifyListeners(RemoteClientDisconnected(client.remoteSupport, client.remoteAddress)) - } - - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - val cause = event.getCause - if (cause ne null) { - client.notifyListeners(RemoteClientError(cause, client.remoteSupport, client.remoteAddress)) - cause match { - case e: ReadTimeoutException ⇒ - runOnceNow { - client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread - } - case e: Exception ⇒ event.getChannel.close() - } - - } else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.remoteSupport, client.remoteAddress)) - } -} +import akka.event.Logging +import akka.remote.RemoteProtocol.AkkaRemoteProtocol +import akka.remote.{ RemoteTransport, RemoteMarshallingOps, RemoteClientWriteFailed, RemoteClientException, RemoteClientError, RemoteActorRef } +import akka.util.Switch +import akka.AkkaException +import com.typesafe.config.Config +import akka.remote.RemoteSettings +import akka.actor.Address +import java.net.InetSocketAddress +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteActorRefProvider +import akka.event.LoggingAdapter /** * Provides the implementation of the Netty remote support */ -class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val address: RemoteSystemAddress[RemoteNettyAddress]) - extends RemoteSupport[RemoteNettyAddress](_system) with RemoteMarshallingOps { - val log = Logging(system, "NettyRemoteSupport") +class NettyRemoteTransport(val remoteSettings: RemoteSettings) + extends RemoteTransport with RemoteMarshallingOps { - val serverSettings = remote.remoteSettings.serverSettings - val clientSettings = remote.remoteSettings.clientSettings + val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) - val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic) + val threadFactory = new MonitorableThreadFactory("NettyRemoteTransport", settings.Daemonic) val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory) val executor = new OrderedMemoryAwareThreadPoolExecutor( - serverSettings.ExecutionPoolSize, - serverSettings.MaxChannelMemorySize, - serverSettings.MaxTotalMemorySize, - serverSettings.ExecutionPoolKeepAlive.length, - serverSettings.ExecutionPoolKeepAlive.unit, + settings.ExecutionPoolSize, + settings.MaxChannelMemorySize, + settings.MaxTotalMemorySize, + settings.ExecutionPoolKeepAlive.length, + settings.ExecutionPoolKeepAlive.unit, threadFactory) val clientChannelFactory = new NioClientSocketChannelFactory( Executors.newCachedThreadPool(threadFactory), Executors.newCachedThreadPool(threadFactory)) - private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] + private val remoteClients = new HashMap[Address, RemoteClient] private val clientsLock = new ReentrantReadWriteLock - override protected def useUntrustedMode = serverSettings.UntrustedMode + override protected def useUntrustedMode = remoteSettings.UntrustedMode + + val server = try new NettyRemoteServer(this, Some(getClass.getClassLoader)) catch { + case ex ⇒ shutdown(); throw ex + } + + val address = { + server.channel.getLocalAddress match { + case ia: InetSocketAddress ⇒ Address("akka", remoteSettings.systemName, Some(ia.getHostName), Some(ia.getPort)) + case x ⇒ + shutdown() + throw new IllegalArgumentException("unknown address format " + x + ":" + x.getClass) + } + } + + @volatile + private var _system: ActorSystemImpl = _ + def system = _system + + @volatile + private var _provider: RemoteActorRefProvider = _ + def provider = _provider + + @volatile + private var _log: LoggingAdapter = _ + def log = _log + + def start(system: ActorSystemImpl, provider: RemoteActorRefProvider): Unit = { + _system = system + _provider = provider + _log = Logging(system, "NettyRemoteTransport") + server.start(system) + } + + def shutdown(): Unit = { + clientsLock.writeLock().lock() + try { + remoteClients foreach { case (_, client) ⇒ client.shutdown() } + remoteClients.clear() + } finally { + clientsLock.writeLock().unlock() + try { + if (server != null) server.shutdown() + } finally { + try { + timer.stop() + } finally { + try { + clientChannelFactory.releaseExternalResources() + } finally { + executor.shutdown() + } + } + } + } + } protected[akka] def send( message: Any, @@ -399,13 +119,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre recipient: RemoteActorRef, loader: Option[ClassLoader]): Unit = { - val recipientAddress = recipient.path.address match { - case RemoteSystemAddress(sys, transport) ⇒ - transport match { - case x: RemoteNettyAddress ⇒ x - case _ ⇒ throw new IllegalArgumentException("invoking NettyRemoteSupport.send with foreign target address " + transport) - } - } + val recipientAddress = recipient.path.address clientsLock.readLock.lock try { @@ -420,7 +134,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(this, recipientAddress, remote.remoteAddress, loader) + val client = new ActiveRemoteClient(this, recipientAddress, address, loader) client.connect() remoteClients += recipientAddress -> client client @@ -438,7 +152,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } } - def bindClient(remoteAddress: RemoteNettyAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = { + def bindClient(remoteAddress: Address, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = { clientsLock.writeLock().lock() try { if (putIfAbsent && remoteClients.contains(remoteAddress)) false @@ -452,7 +166,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } } - def unbindClient(remoteAddress: RemoteNettyAddress): Unit = { + def unbindClient(remoteAddress: Address): Unit = { clientsLock.writeLock().lock() try { remoteClients.foreach { case (k, v) ⇒ if (v.isBoundTo(remoteAddress)) { v.shutdown(); remoteClients.remove(k) } } @@ -461,7 +175,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } } - def shutdownClientConnection(remoteAddress: RemoteNettyAddress): Boolean = { + def shutdownClientConnection(remoteAddress: Address): Boolean = { clientsLock.writeLock().lock() try { remoteClients.remove(remoteAddress) match { @@ -473,7 +187,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } } - def restartClientConnection(remoteAddress: RemoteNettyAddress): Boolean = { + def restartClientConnection(remoteAddress: Address): Boolean = { clientsLock.readLock().lock() try { remoteClients.get(remoteAddress) match { @@ -485,229 +199,24 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } } - /** - * Server section - */ - @volatile - private var currentServer: NettyRemoteServer = _ +} - def name = currentServer match { - case null ⇒ remote.remoteAddress.toString - case server ⇒ server.name - } - - private val _isRunning = new Switch(false) - - def isRunning = _isRunning.isOn - - def start(loader: Option[ClassLoader] = None): Unit = - _isRunning switchOn { currentServer = new NettyRemoteServer(this, loader, address) } - - /** - * Common section - */ - - def shutdown(): Unit = _isRunning switchOff { - clientsLock.writeLock().lock() - try { - remoteClients foreach { case (_, client) ⇒ client.shutdown() } - remoteClients.clear() - } finally { - clientsLock.writeLock().unlock() - try { - val s = currentServer - currentServer = null - s.shutdown() - } finally { - try { - timer.stop() - } finally { - try { - clientChannelFactory.releaseExternalResources() - } finally { - executor.shutdown() - } - } - } +class RemoteMessageEncoder(remoteSupport: NettyRemoteTransport) extends ProtobufEncoder { + override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { + msg match { + case (message: Any, sender: Option[_], recipient: ActorRef) ⇒ + super.encode(ctx, channel, + remoteSupport.createMessageSendEnvelope( + remoteSupport.createRemoteMessageProtocolBuilder( + recipient, + message, + sender.asInstanceOf[Option[ActorRef]]).build)) + case _ ⇒ super.encode(ctx, channel, msg) } } } -class NettyRemoteServer( - val remoteSupport: NettyRemoteSupport, - val loader: Option[ClassLoader], - val address: RemoteSystemAddress[RemoteNettyAddress]) { - val log = Logging(remoteSupport.system, "NettyRemoteServer") - import remoteSupport.serverSettings._ - - if (address.transport.ip.isEmpty) throw new java.net.UnknownHostException(address.transport.host) - - val name = "NettyRemoteServer@" + address - - private val factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(remoteSupport.threadFactory), - Executors.newCachedThreadPool(remoteSupport.threadFactory)) - - private val bootstrap = new ServerBootstrap(factory) - - private val executionHandler = new ExecutionHandler(remoteSupport.executor) - - // group of open channels, used for clean-up - private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - - val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport) - bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", Backlog) - bootstrap.setOption("child.tcpNoDelay", true) - bootstrap.setOption("child.keepAlive", true) - bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis) - - openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port))) - remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport)) - - def shutdown() { - try { - val shutdownSignal = { - val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) - b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(address.system) - .setHostname(address.transport.host) - .setPort(address.transport.port) - .build) - if (SecureCookie.nonEmpty) - b.setCookie(SecureCookie.get) - b.build - } - openChannels.write(remoteSupport.createControlEnvelope(shutdownSignal)).awaitUninterruptibly - openChannels.disconnect - openChannels.close.awaitUninterruptibly - bootstrap.releaseExternalResources() - remoteSupport.notifyListeners(RemoteServerShutdown(remoteSupport)) - } catch { - case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport)) - } - } -} - -class RemoteServerPipelineFactory( - val name: String, - val openChannels: ChannelGroup, - val executionHandler: ExecutionHandler, - val loader: Option[ClassLoader], - val remoteSupport: NettyRemoteSupport) extends ChannelPipelineFactory { - - import remoteSupport.serverSettings._ - - def getPipeline: ChannelPipeline = { - val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) - val messageDec = new RemoteMessageDecoder - val messageEnc = new RemoteMessageEncoder(remoteSupport) - - val authenticator = if (RequireCookie) new RemoteServerAuthenticationHandler(SecureCookie) :: Nil else Nil - val remoteServer = new RemoteServerHandler(name, openChannels, loader, remoteSupport) - val stages: List[ChannelHandler] = lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: authenticator ::: remoteServer :: Nil - new StaticChannelPipeline(stages: _*) - } -} - -@ChannelHandler.Sharable -class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler { - val authenticated = new AnyRef - - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = secureCookie match { - case None ⇒ ctx.sendUpstream(event) - case Some(cookie) ⇒ - ctx.getAttachment match { - case `authenticated` ⇒ ctx.sendUpstream(event) - case null ⇒ event.getMessage match { - case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction ⇒ - val instruction = remoteProtocol.getInstruction - instruction.getCookie match { - case `cookie` ⇒ - ctx.setAttachment(authenticated) - ctx.sendUpstream(event) - case _ ⇒ - throw new SecurityException( - "The remote client [" + ctx.getChannel.getRemoteAddress + "] secure cookie is not the same as remote server secure cookie") - } - case _ ⇒ - throw new SecurityException("The remote client [" + ctx.getChannel.getRemoteAddress + "] is not authorized!") - } - } - } -} - -@ChannelHandler.Sharable -class RemoteServerHandler( - val name: String, - val openChannels: ChannelGroup, - val applicationLoader: Option[ClassLoader], - val remoteSupport: NettyRemoteSupport) extends SimpleChannelUpstreamHandler { - - val log = Logging(remoteSupport.system, "RemoteServerHandler") - - import remoteSupport.serverSettings._ - - /** - * ChannelOpen overridden to store open channels for a clean postStop of a node. - * If a channel is closed before, it is automatically removed from the open channels group. - */ - override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel) - - override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val clientAddress = getClientAddress(ctx.getChannel) - remoteSupport.notifyListeners(RemoteServerClientConnected(remoteSupport, clientAddress)) - } - - override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - val clientAddress = getClientAddress(ctx.getChannel) - remoteSupport.notifyListeners(RemoteServerClientDisconnected(remoteSupport, clientAddress)) - } - - override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = getClientAddress(ctx.getChannel) match { - case s @ Some(address) ⇒ - if (UsePassiveConnections) - remoteSupport.unbindClient(address) - remoteSupport.notifyListeners(RemoteServerClientClosed(remoteSupport, s)) - case None ⇒ - remoteSupport.notifyListeners(RemoteServerClientClosed[RemoteNettyAddress](remoteSupport, None)) - } - - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { - event.getMessage match { - case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ - remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport.system, applicationLoader)) - - case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ - val instruction = remote.getInstruction - instruction.getCommandType match { - case CommandType.CONNECT if UsePassiveConnections ⇒ - val origin = instruction.getOrigin - val inbound = RemoteNettyAddress(origin.getHostname, origin.getPort) - val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound) - remoteSupport.bindClient(inbound, client) - case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed - case _ ⇒ //Unknown command - } - case _ ⇒ //ignore - } - } catch { - case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport)) - } - - override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - remoteSupport.notifyListeners(RemoteServerError(event.getCause, remoteSupport)) - event.getChannel.close() - } - - private def getClientAddress(c: Channel): Option[RemoteNettyAddress] = - c.getRemoteAddress match { - case inet: InetSocketAddress ⇒ Some(RemoteNettyAddress(inet.getHostName, Some(inet.getAddress), inet.getPort)) - case _ ⇒ None - } -} +class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) { protected val guard = new ReentrantReadWriteLock diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala new file mode 100644 index 0000000000..a50d3653e8 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -0,0 +1,202 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.remote.netty + +import java.net.InetSocketAddress +import java.util.concurrent.Executors +import scala.Option.option2Iterable +import org.jboss.netty.bootstrap.ServerBootstrap +import org.jboss.netty.channel.ChannelHandler.Sharable +import org.jboss.netty.channel.group.ChannelGroup +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory +import org.jboss.netty.channel.{ StaticChannelPipeline, SimpleChannelUpstreamHandler, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelHandler, Channel } +import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } +import org.jboss.netty.handler.execution.ExecutionHandler +import akka.event.Logging +import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } +import akka.remote.{ RemoteServerStarted, RemoteServerShutdown, RemoteServerError, RemoteServerClientDisconnected, RemoteServerClientConnected, RemoteServerClientClosed, RemoteProtocol, RemoteMessage } +import akka.actor.Address +import java.net.InetAddress +import akka.actor.ActorSystemImpl +import org.jboss.netty.channel.ChannelLocal + +class NettyRemoteServer( + val netty: NettyRemoteTransport, + val loader: Option[ClassLoader]) { + + import netty.settings + + val ip = InetAddress.getByName(settings.Hostname) + + private val factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(netty.threadFactory), + Executors.newCachedThreadPool(netty.threadFactory)) + + private val bootstrap = new ServerBootstrap(factory) + + private val executionHandler = new ExecutionHandler(netty.executor) + + // group of open channels, used for clean-up + private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") + + val pipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, loader, netty) + bootstrap.setPipelineFactory(pipelineFactory) + bootstrap.setOption("backlog", settings.Backlog) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("keepAlive", true) + bootstrap.setOption("reuseAddress", true) + + val channel = bootstrap.bind(new InetSocketAddress(ip, settings.Port)) + + openChannels.add(channel) + + def start(system: ActorSystemImpl) { + netty.notifyListeners(RemoteServerStarted(netty)) + // TODO uncork the pipeline, which was ... + // TODO ... corked before in order not to allow anything through before init is complete + } + + def shutdown() { + try { + val shutdownSignal = { + val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN) + b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder + .setSystem(settings.systemName) + .setHostname(settings.Hostname) + .setPort(settings.Port) + .build) + if (settings.SecureCookie.nonEmpty) + b.setCookie(settings.SecureCookie.get) + b.build + } + openChannels.write(netty.createControlEnvelope(shutdownSignal)).awaitUninterruptibly + openChannels.disconnect + openChannels.close.awaitUninterruptibly + bootstrap.releaseExternalResources() + netty.notifyListeners(RemoteServerShutdown(netty)) + } catch { + case e: Exception ⇒ netty.notifyListeners(RemoteServerError(e, netty)) + } + } +} + +class RemoteServerPipelineFactory( + val openChannels: ChannelGroup, + val executionHandler: ExecutionHandler, + val loader: Option[ClassLoader], + val netty: NettyRemoteTransport) extends ChannelPipelineFactory { + + import netty.settings + + def getPipeline: ChannelPipeline = { + val lenDec = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) + val messageDec = new RemoteMessageDecoder + val messageEnc = new RemoteMessageEncoder(netty) + + val authenticator = if (settings.RequireCookie) new RemoteServerAuthenticationHandler(settings.SecureCookie) :: Nil else Nil + val remoteServer = new RemoteServerHandler(openChannels, loader, netty) + val stages: List[ChannelHandler] = lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: authenticator ::: remoteServer :: Nil + new StaticChannelPipeline(stages: _*) + } +} + +@ChannelHandler.Sharable +class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler { + val authenticated = new AnyRef + + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = secureCookie match { + case None ⇒ ctx.sendUpstream(event) + case Some(cookie) ⇒ + ctx.getAttachment match { + case `authenticated` ⇒ ctx.sendUpstream(event) + case null ⇒ event.getMessage match { + case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction ⇒ + val instruction = remoteProtocol.getInstruction + instruction.getCookie match { + case `cookie` ⇒ + ctx.setAttachment(authenticated) + ctx.sendUpstream(event) + case _ ⇒ + throw new SecurityException( + "The remote client [" + ctx.getChannel.getRemoteAddress + "] secure cookie is not the same as remote server secure cookie") + } + case _ ⇒ + throw new SecurityException("The remote client [" + ctx.getChannel.getRemoteAddress + "] is not authorized!") + } + } + } +} + +object ChannelLocalSystem extends ChannelLocal[ActorSystemImpl] { + override def initialValue(ch: Channel): ActorSystemImpl = null +} + +@ChannelHandler.Sharable +class RemoteServerHandler( + val openChannels: ChannelGroup, + val applicationLoader: Option[ClassLoader], + val netty: NettyRemoteTransport) extends SimpleChannelUpstreamHandler { + + import netty.settings + + /** + * ChannelOpen overridden to store open channels for a clean postStop of a node. + * If a channel is closed before, it is automatically removed from the open channels group. + */ + override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel) + + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + val clientAddress = getClientAddress(ctx.getChannel) + netty.notifyListeners(RemoteServerClientConnected(netty, clientAddress)) + } + + override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + val clientAddress = getClientAddress(ctx.getChannel) + netty.notifyListeners(RemoteServerClientDisconnected(netty, clientAddress)) + } + + override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = getClientAddress(ctx.getChannel) match { + case s @ Some(address) ⇒ + if (settings.UsePassiveConnections) + netty.unbindClient(address) + netty.notifyListeners(RemoteServerClientClosed(netty, s)) + case None ⇒ + netty.notifyListeners(RemoteServerClientClosed(netty, None)) + } + + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { + event.getMessage match { + case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ + netty.receiveMessage(new RemoteMessage(remote.getMessage, netty.system, applicationLoader)) + + case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ + val instruction = remote.getInstruction + instruction.getCommandType match { + case CommandType.CONNECT if settings.UsePassiveConnections ⇒ + val origin = instruction.getOrigin + val inbound = Address("akka", origin.getSystem, Some(origin.getHostname), Some(origin.getPort)) + val client = new PassiveRemoteClient(event.getChannel, netty, inbound) + netty.bindClient(inbound, client) + case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed + case _ ⇒ //Unknown command + } + case _ ⇒ //ignore + } + } catch { + case e: Exception ⇒ netty.notifyListeners(RemoteServerError(e, netty)) + } + + override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + netty.notifyListeners(RemoteServerError(event.getCause, netty)) + event.getChannel.close() + } + + private def getClientAddress(c: Channel): Option[Address] = + c.getRemoteAddress match { + case inet: InetSocketAddress ⇒ Some(Address("akka", "unknown(yet)", Some(inet.getAddress.toString), Some(inet.getPort))) + case _ ⇒ None + } +} + diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala new file mode 100644 index 0000000000..9596f1b083 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.remote.netty + +import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit._ +import java.net.InetAddress +import akka.config.ConfigurationException + +class NettySettings(config: Config, val systemName: String) { + + import config._ + + val Daemonic = getBoolean("daemonic") + val BackoffTimeout = Duration(getMilliseconds("backoff-timeout"), MILLISECONDS) + + val SecureCookie: Option[String] = getString("secure-cookie") match { + case "" ⇒ None + case cookie ⇒ Some(cookie) + } + val RequireCookie = { + val requireCookie = getBoolean("require-cookie") + if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.netty.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.netty.secure-cookie'.") + requireCookie + } + + val UsePassiveConnections = getBoolean("use-passive-connections") + + val ReconnectionTimeWindow = Duration(getMilliseconds("reconnection-time-window"), MILLISECONDS) + val ReadTimeout = Duration(getMilliseconds("read-timeout"), MILLISECONDS) + val ReconnectDelay = Duration(getMilliseconds("reconnect-delay"), MILLISECONDS) + val MessageFrameSize = getBytes("message-frame-size").toInt + + val Hostname = getString("hostname") match { + case "" ⇒ InetAddress.getLocalHost.getHostAddress + case value ⇒ value + } + val Port = getInt("port") + val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) + + val Backlog = getInt("backlog") + + val ExecutionPoolKeepAlive = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS) + + val ExecutionPoolSize = getInt("execution-pool-size") match { + case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 1") + case sz ⇒ sz + } + + val MaxChannelMemorySize = getBytes("max-channel-memory-size") match { + case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-channel-memory-size is less than 0 bytes") + case sz ⇒ sz + } + + val MaxTotalMemorySize = getBytes("max-total-memory-size") match { + case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-total-memory-size is less than 0 bytes") + case sz ⇒ sz + } + +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/package.scala b/akka-remote/src/main/scala/akka/remote/package.scala deleted file mode 100644 index e2514d6592..0000000000 --- a/akka-remote/src/main/scala/akka/remote/package.scala +++ /dev/null @@ -1,8 +0,0 @@ -/** - * Copyright (C) 2009-2010 Typesafe Inc. - */ -package akka - -package object remote { - type TransportsMap = Map[String, (String, Int) ⇒ Either[String, RemoteTransportAddress]] -} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index dffb874be6..43bd2d06df 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -12,7 +12,7 @@ import akka.actor.InternalActorRef import akka.actor.Props import akka.config.ConfigurationException import akka.remote.RemoteScope -import akka.remote.RemoteAddressExtractor +import akka.actor.AddressExtractor /** * [[akka.routing.RouterConfig]] implementation for remote deployment on defined @@ -46,8 +46,8 @@ class RemoteRouteeProvider(nodes: Iterable[String], _context: ActorContext, _res // need this iterator as instance variable since Resizer may call createRoutees several times private val nodeAddressIter = { val nodeAddresses = nodes map { - case RemoteAddressExtractor(a) ⇒ a - case x ⇒ throw new ConfigurationException("unparseable remote node " + x) + case AddressExtractor(a) ⇒ a + case x ⇒ throw new ConfigurationException("unparseable remote node " + x) } Stream.continually(nodeAddresses).flatten.iterator } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala index 597b552fe9..2d409811cc 100755 --- a/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/AbstractRemoteActorMultiJvmSpec.scala @@ -21,8 +21,8 @@ trait AbstractRemoteActorMultiJvmSpec { case (idx, host) => ConfigFactory.parseString(""" akka { - remote.server.hostname="%s" - remote.server.port = "%d" + remote.netty.hostname="%s" + remote.netty.port = "%d" }""".format(host, 9990+idx, idx)) withFallback commonConfig } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala index 2a38a33402..8b9a79a64e 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/AkkaRemoteSpec.scala @@ -32,14 +32,4 @@ abstract class AkkaRemoteSpec(config: Config) extends AkkaSpec(config.withFallback(AkkaRemoteSpec.testConf)) with MultiJvmSync { - /** - * Helper function for accessing the underlying remoting. - */ - def remote: Remote = { - system.asInstanceOf[ActorSystemImpl].provider match { - case r: RemoteActorRefProvider ⇒ r.remote - case _ ⇒ throw new Exception("Remoting is not enabled") - } - } - } diff --git a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala index 5ac2f281ab..17a848b8d3 100644 --- a/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala @@ -2,11 +2,12 @@ package akka.remote import java.net.InetSocketAddress import akka.testkit.AkkaSpec +import akka.actor.Address class AccrualFailureDetectorSpec extends AkkaSpec { "An AccrualFailureDetector" must { - val conn = RemoteNettyAddress("localhost", 2552) + val conn = Address("akka", "", Some("localhost"), Some(2552)) "mark node as available after a series of successful heartbeats" in { val fd = new AccrualFailureDetector() diff --git a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala index e02b072e2e..4b46fdddfc 100644 --- a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala @@ -6,7 +6,7 @@ package akka.remote import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import akka.remote.netty.NettyRemoteSupport +import akka.remote.netty.NettyRemoteTransport import akka.actor.Actor import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 58199b4683..ddf1f338a6 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -33,7 +33,7 @@ object RemoteCommunicationSpec { class RemoteCommunicationSpec extends AkkaSpec(""" akka { actor.provider = "akka.remote.RemoteActorRefProvider" - remote.server { + remote.netty { hostname = localhost port = 12345 } @@ -47,7 +47,7 @@ akka { import RemoteCommunicationSpec._ - val conf = ConfigFactory.parseString("akka.remote.server.port=12346").withFallback(system.settings.config) + val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config) val other = ActorSystem("remote_sys", conf) val remote = other.actorOf(Props(new Actor { diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 49502b99ea..72a980db93 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -1,6 +1,10 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.remote import akka.testkit.AkkaSpec +import akka.util.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteConfigSpec extends AkkaSpec("") { @@ -8,54 +12,20 @@ class RemoteConfigSpec extends AkkaSpec("") { "RemoteExtension" must { "be able to parse remote and cluster config elements" in { - val config = system.settings.config - import config._ + val settings = new RemoteSettings(system.settings.config, "") + import settings._ - //akka.remote - getString("akka.remote.transport") must equal("akka.remote.netty.NettyRemoteSupport") - getString("akka.remote.secure-cookie") must equal("") - getBoolean("akka.remote.use-passive-connections") must equal(true) - getMilliseconds("akka.remote.backoff-timeout") must equal(0) - getBoolean("akka.remote.daemonic") must equal(true) - // getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) + RemoteTransport must be("akka.remote.netty.NettyRemoteTransport") + UntrustedMode must be(false) + RemoteSystemDaemonAckTimeout must be(30 seconds) - //akka.remote.server - getInt("akka.remote.server.port") must equal(2552) - getBytes("akka.remote.server.message-frame-size") must equal(1048576L) - getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000) - getBoolean("akka.remote.server.require-cookie") must equal(false) - getBoolean("akka.remote.server.untrusted-mode") must equal(false) - getInt("akka.remote.server.backlog") must equal(4096) + FailureDetectorThreshold must be(8) + FailureDetectorMaxSampleSize must be(1000) - getMilliseconds("akka.remote.server.execution-pool-keepalive") must equal(60 * 1000) + InitialDelayForGossip must be(5 seconds) + GossipFrequency must be(1 second) + SeedNodes must be(Set()) - getInt("akka.remote.server.execution-pool-size") must equal(4) - - getBytes("akka.remote.server.max-channel-memory-size") must equal(0) - getBytes("akka.remote.server.max-total-memory-size") must equal(0) - - //akka.remote.client - getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000) - getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000) - getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000) - - // TODO cluster config will go into akka-cluster/reference.conf when we enable that module - //akka.cluster - getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String]) - - // getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000) - // getMilliseconds("akka.cluster.session-timeout") must equal(60 * 1000) - // getMilliseconds("akka.cluster.connection-timeout") must equal(60 * 1000) - // getBoolean("akka.cluster.include-ref-node-in-replica-set") must equal(true) - // getString("akka.cluster.log-directory") must equal("_akka_cluster") - - // //akka.cluster.replication - // getString("akka.cluster.replication.digest-type") must equal("MAC") - // getString("akka.cluster.replication.password") must equal("secret") - // getInt("akka.cluster.replication.ensemble-size") must equal(3) - // getInt("akka.cluster.replication.quorum-size") must equal(2) - // getInt("akka.cluster.replication.snapshot-frequency") must equal(1000) - // getMilliseconds("akka.cluster.replication.timeout") must equal(30 * 1000) } } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index 80a4f3cffe..942ca0cd8d 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -16,14 +16,14 @@ akka { /watchers.remote = "akka://other@127.0.0.1:2666" } } - remote.server { + remote.netty { hostname = "127.0.0.1" port = 2665 } } """)) with ImplicitSender with DefaultTimeout with DeathWatchSpec { - val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.server.port=2666").withFallback(system.settings.config)) + val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.netty.port=2666").withFallback(system.settings.config)) override def atTermination() { other.shutdown() diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 15016748d0..37917e9410 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -41,7 +41,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { service, deployment.get.config, RoundRobinRouter(3), - RemoteScope(UnparsedSystemAddress(Some("sys"), UnparsedTransportAddress("akka", "wallace", 2552)))))) + RemoteScope(Address("akka", "sys", Some("wallace"), Some(2552)))))) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 6418f93966..fa96afb82b 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -20,7 +20,7 @@ object RemoteRouterSpec { class RemoteRouterSpec extends AkkaSpec(""" akka { actor.provider = "akka.remote.RemoteActorRefProvider" - remote.server { + remote.netty { hostname = localhost port = 12345 } @@ -44,7 +44,7 @@ akka { import RemoteRouterSpec._ - val conf = ConfigFactory.parseString("akka.remote.server.port=12346").withFallback(system.settings.config) + val conf = ConfigFactory.parseString("akka.remote.netty.port=12346").withFallback(system.settings.config) val other = ActorSystem("remote_sys", conf) override def atTermination() {