diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 3d4f61caa1..3b18c742ce 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -46,9 +46,10 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val syst = sysImpl.systemGuardian val root = sysImpl.lookupRoot - def empty(path: String) = new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, path match { - case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems - }) + def empty(path: String) = + new EmptyLocalActorRef(sysImpl.provider, path match { + case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems + }, system.eventStream) "An ActorSystem" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index a4dbb4d1cb..a2c3c7da5a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -290,7 +290,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { val sysImpl = system.asInstanceOf[ActorSystemImpl] val addr = sysImpl.provider.rootPath.address - val serialized = SerializedActorRef(addr + "/non-existing") + val serialized = SerializedActorRef(RootActorPath(addr, "/non-existing")) out.writeObject(serialized) @@ -299,7 +299,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { Serialization.currentSystem.withValue(sysImpl) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - in.readObject must be === new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, system.actorFor("/").path / "non-existing") + in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index d0f0d8154b..99be8bae0e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -87,6 +87,12 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable { */ def root: RootActorPath + /** + * Generate String representation, replacing the Address in the RootActor + * Path with the given one unless this path’s address includes host and port + * information. + */ + def toStringWithAddress(address: Address): String } /** @@ -105,6 +111,10 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act override val toString = address + name + def toStringWithAddress(addr: Address): String = + if (address.host.isDefined) address + name + else addr + name + def compareTo(other: ActorPath) = other match { case r: RootActorPath ⇒ toString compareTo r.toString case c: ChildActorPath ⇒ 1 @@ -151,6 +161,15 @@ final class ChildActorPath(val parent: ActorPath, val name: String) extends Acto rec(parent, new StringBuilder(32).append(name)).toString } + override def toStringWithAddress(addr: Address) = { + @tailrec + def rec(p: ActorPath, s: StringBuilder): StringBuilder = p match { + case r: RootActorPath ⇒ s.insert(0, r.toStringWithAddress(addr)) + case _ ⇒ rec(p.parent, s.insert(0, '/').insert(0, p.name)) + } + rec(parent, new StringBuilder(32).append(name)).toString + } + override def equals(other: Any): Boolean = { @tailrec def rec(left: ActorPath, right: ActorPath): Boolean = diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 7a448c628a..753adaa9fa 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -329,13 +329,13 @@ private[akka] class LocalActorRef private[akka] ( def restart(cause: Throwable): Unit = actorCell.restart(cause) @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) + protected def writeReplace(): AnyRef = SerializedActorRef(path) } /** * Memento pattern for serializing ActorRefs transparently */ -case class SerializedActorRef(path: String) { +case class SerializedActorRef private (path: String) { import akka.serialization.Serialization.currentSystem @throws(classOf[java.io.ObjectStreamException]) @@ -349,6 +349,15 @@ case class SerializedActorRef(path: String) { } } +object SerializedActorRef { + def apply(path: ActorPath): SerializedActorRef = { + Serialization.currentTransportAddress.value match { + case null ⇒ new SerializedActorRef(path.toString) + case addr ⇒ new SerializedActorRef(path.toStringWithAddress(addr)) + } + } +} + /** * Trait for ActorRef implementations where all methods contain default stubs. */ @@ -375,7 +384,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { def restart(cause: Throwable): Unit = () @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) + protected def writeReplace(): AnyRef = SerializedActorRef(path) } private[akka] object MinimalActorRef { @@ -398,57 +407,39 @@ private[akka] object DeadLetterActorRef { val serialized = new SerializedDeadLetterActorRef } -private[akka] trait DeadLetterActorRefLike extends MinimalActorRef { - - def eventStream: EventStream - - @volatile - private var _path: ActorPath = _ - def path: ActorPath = { - assert(_path != null) - _path - } - - @volatile - private var _provider: ActorRefProvider = _ - def provider = _provider - - private[akka] def init(provider: ActorRefProvider, path: ActorPath) { - _path = path - _provider = provider - } - - override def isTerminated(): Boolean = true - - override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match { - case d: DeadLetter ⇒ eventStream.publish(d) - case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) - } -} - -private[akka] class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike { - @throws(classOf[java.io.ObjectStreamException]) - override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized -} - /** * This special dead letter reference has a name: it is that which is returned * by a local look-up which is unsuccessful. */ private[akka] class EmptyLocalActorRef( - val eventStream: EventStream, - _provider: ActorRefProvider, - _dispatcher: MessageDispatcher, - _path: ActorPath) extends DeadLetterActorRefLike { + val provider: ActorRefProvider, + val path: ActorPath, + val eventStream: EventStream) extends MinimalActorRef { - init(_provider, _path) + override def isTerminated(): Boolean = true override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { - case d: DeadLetter ⇒ // do NOT form endless loops + case d: DeadLetter ⇒ // do NOT form endless loops, since deadLetters will resend! case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) } } +/** + * Internal implementation of the dead letter destination: will publish any + * received message to the eventStream, wrapped as [[akka.actor.DeadLetter]]. + */ +private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, _path: ActorPath, _eventStream: EventStream) + extends EmptyLocalActorRef(_provider, _path, _eventStream) { + + override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match { + case d: DeadLetter ⇒ eventStream.publish(d) + case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) + } + + @throws(classOf[java.io.ObjectStreamException]) + override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized +} + /** * Internal implementation detail used for paths like “/temp” */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index c47b261b57..369e1429db 100755 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -33,11 +33,22 @@ trait ActorRefProvider { */ def systemGuardian: InternalActorRef + /** + * Dead letter destination for this provider. + */ + def deadLetters: ActorRef + /** * Reference to the death watch service. */ def deathWatch: DeathWatch + /** + * Care-taker of actor refs which await final termination but cannot be kept + * in their parent’s children list because the name shall be freed. + */ + def locker: Locker + /** * The root path for all actors within this actor system, including remote * address if enabled. @@ -281,25 +292,29 @@ class LocalActorRefProvider( val settings: ActorSystem.Settings, val eventStream: EventStream, val scheduler: Scheduler, - val deadLetters: InternalActorRef, - val rootPath: ActorPath, val deployer: Deployer) extends ActorRefProvider { + // this is the constructor needed for reflectively instantiating the provider def this(_systemName: String, settings: ActorSystem.Settings, eventStream: EventStream, - scheduler: Scheduler, - deadLetters: InternalActorRef) = + scheduler: Scheduler) = this(_systemName, settings, eventStream, scheduler, - deadLetters, - new RootActorPath(Address("akka", _systemName)), new Deployer(settings)) + val rootPath: ActorPath = RootActorPath(Address("akka", _systemName)) + val log = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")") + val deadLetters = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream) + + val deathWatch = new LocalDeathWatch(1024) //TODO make configrable + + val locker: Locker = new Locker(scheduler, settings.ReaperInterval, this, rootPath / "locker", deathWatch) + /* * generate name for temporary actor refs */ @@ -455,8 +470,6 @@ class LocalActorRefProvider( tempContainer.removeChild(path.name) } - val deathWatch = new LocalDeathWatch(1024) //TODO make configrable - def init(_system: ActorSystemImpl) { system = _system // chain death watchers so that killing guardian stops the application @@ -492,7 +505,7 @@ class LocalActorRefProvider( } else ref.getChild(path.iterator) match { case Nobody ⇒ log.debug("look-up of path sequence '{}' failed", path) - new EmptyLocalActorRef(eventStream, system.provider, dispatcher, ref.path / path) + new EmptyLocalActorRef(system.provider, ref.path / path, eventStream) case x ⇒ x } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 1347d4e0be..50b69b9ae8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -267,7 +267,7 @@ abstract class ActorSystem extends ActorRefFactory { * (below which the logging actors reside) and the execute all registered * termination handlers (see [[ActorSystem.registerOnTermination]]). */ - def shutdown() + def shutdown(): Unit /** * Registers the provided extension and creates its payload, if this extension isn't already registered @@ -331,8 +331,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten import ActorSystem._ - final val settings = new Settings(applicationConfig, name) - final val threadFactory = new MonitorableThreadFactory(name, settings.Daemonicity) + final val settings: Settings = new Settings(applicationConfig, name) + final val threadFactory: MonitorableThreadFactory = new MonitorableThreadFactory(name, settings.Daemonicity) def logConfiguration(): Unit = log.info(settings.toString) @@ -377,35 +377,19 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten import settings._ // this provides basic logging (to stdout) until .start() is called below - val eventStream = new EventStream(DebugEventStream) + val eventStream: EventStream = new EventStream(DebugEventStream) eventStream.startStdoutLogger(settings) - // unfortunately we need logging before we know the rootpath address, which wants to be inserted here - @volatile - private var _log = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass) - def log = _log + val log: LoggingAdapter = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass) - val scheduler = createScheduler() - - val deadLetters = new DeadLetterActorRef(eventStream) - val deadLetterMailbox = new Mailbox(null) { - becomeClosed() - override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } - override def dequeue() = null - override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } - override def systemDrain(): SystemMessage = null - override def hasMessages = false - override def hasSystemMessages = false - override def numberOfMessages = 0 - } + val scheduler: Scheduler = createScheduler() val provider: ActorRefProvider = { val arguments = Seq( classOf[String] -> name, classOf[Settings] -> settings, classOf[EventStream] -> eventStream, - classOf[Scheduler] -> scheduler, - classOf[InternalActorRef] -> deadLetters) + classOf[Scheduler] -> scheduler) val loader = Thread.currentThread.getContextClassLoader match { case null ⇒ getClass.getClassLoader @@ -418,8 +402,23 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten } } - val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(threadFactory, eventStream, deadLetterMailbox, scheduler)) - val dispatcher = dispatchers.defaultGlobalDispatcher + def deadLetters: ActorRef = provider.deadLetters + + val deadLetterMailbox: Mailbox = new Mailbox(null) { + becomeClosed() + override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } + override def dequeue() = null + override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } + override def systemDrain(): SystemMessage = null + override def hasMessages = false + override def hasSystemMessages = false + override def numberOfMessages = 0 + } + + def locker: Locker = provider.locker + + val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(threadFactory, eventStream, deadLetterMailbox, scheduler)) + val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher def terminationFuture: Future[Unit] = provider.terminationFuture def lookupRoot: InternalActorRef = provider.rootGuardian @@ -433,21 +432,13 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten private lazy val _start: this.type = { // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) - _log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass) - deadLetters.init(provider, lookupRoot.path / "deadLetters") registerOnTermination(stopScheduler()) - // this starts the reaper actor and the user-configured logging subscribers, which are also actors - _locker = new Locker(scheduler, ReaperInterval, provider, lookupRoot.path / "locker", deathWatch) loadExtensions() if (LogConfigOnStart) logConfiguration() this } - @volatile - private var _locker: Locker = _ // initialized in start() - def locker = _locker - - def start() = _start + def start(): this.type = _start private lazy val terminationCallbacks = { val callbacks = new TerminationCallbacks @@ -459,9 +450,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten def awaitTermination(timeout: Duration) { Await.ready(terminationCallbacks, timeout) } def awaitTermination() = awaitTermination(Duration.Inf) - def shutdown() { - stop(guardian) - } + def shutdown(): Unit = stop(guardian) /** * Create the scheduler service. This one needs one special behavior: if @@ -557,7 +546,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten } } - override def toString = lookupRoot.path.root.address.toString + override def toString: String = lookupRoot.path.root.address.toString final class TerminationCallbacks extends Runnable with Awaitable[Unit] { private val lock = new ReentrantGuard diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 78cb370b68..f56862b2fb 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -9,7 +9,7 @@ import akka.util.ReflectiveAccess import scala.util.DynamicVariable import com.typesafe.config.Config import akka.config.ConfigurationException -import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem } +import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address } case class NoSerializerFoundException(m: String) extends AkkaException(m) @@ -27,6 +27,12 @@ object Serialization { */ val currentSystem = new DynamicVariable[ActorSystem](null) + /** + * This holds a reference to the current transport address to be inserted + * into local actor refs during serialization. + */ + val currentTransportAddress = new DynamicVariable[Address](null) + class Settings(val config: Config) { import scala.collection.JavaConverters._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a41ca1984d..aefa770eaf 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -6,7 +6,7 @@ package akka.remote import akka.actor._ import akka.dispatch._ -import akka.event.{ DeathWatch, Logging } +import akka.event.{ DeathWatch, Logging, LoggingAdapter } import akka.event.EventStream import akka.config.ConfigurationException import java.util.concurrent.{ TimeoutException } @@ -22,43 +22,40 @@ class RemoteActorRefProvider( val systemName: String, val settings: ActorSystem.Settings, val eventStream: EventStream, - val scheduler: Scheduler, - _deadLetters: InternalActorRef) extends ActorRefProvider { + val scheduler: Scheduler) extends ActorRefProvider { val remoteSettings = new RemoteSettings(settings.config, systemName) + val deployer = new RemoteDeployer(settings) + + private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer) + + @volatile + private var _log = local.log + def log: LoggingAdapter = _log + + def rootPath = local.rootPath + def locker = local.locker + def deadLetters = local.deadLetters + + val deathWatch = new RemoteDeathWatch(local.deathWatch, this) + + val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize) + + // these are only available after init() def rootGuardian = local.rootGuardian def guardian = local.guardian def systemGuardian = local.systemGuardian def terminationFuture = local.terminationFuture def dispatcher = local.dispatcher - def registerTempActor(actorRef: InternalActorRef, path: ActorPath) = local.registerTempActor(actorRef, path) def unregisterTempActor(path: ActorPath) = local.unregisterTempActor(path) def tempPath() = local.tempPath() def tempContainer = local.tempContainer - val deployer = new RemoteDeployer(settings) - - val transport: RemoteTransport = { - val fqn = remoteSettings.RemoteTransport - // TODO check if this classloader is the right one - ReflectiveAccess.createInstance[RemoteTransport]( - fqn, - Seq(classOf[RemoteSettings] -> remoteSettings), - getClass.getClassLoader) match { - case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) - case Right(remote) ⇒ remote - } - } - - val log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")") - - val rootPath: ActorPath = RootActorPath(transport.address) - - private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) - - val failureDetector = new AccrualFailureDetector(remoteSettings.FailureDetectorThreshold, remoteSettings.FailureDetectorMaxSampleSize) + @volatile + private var _transport: RemoteTransport = _ + def transport: RemoteTransport = _transport @volatile private var _serialization: Serialization = _ @@ -72,15 +69,35 @@ class RemoteActorRefProvider( private var _networkEventStream: NetworkEventStream = _ def networkEventStream = _networkEventStream - val deathWatch = new RemoteDeathWatch(local.deathWatch, this) - def init(system: ActorSystemImpl) { local.init(system) - _remoteDaemon = new RemoteSystemDaemon(system, transport.address, rootPath / "remote", rootGuardian, log) + _remoteDaemon = new RemoteSystemDaemon(system, rootPath / "remote", rootGuardian, log) + local.registerExtraNames(Map(("remote", remoteDaemon))) + _serialization = SerializationExtension(system) - transport.start(system, this) + _networkEventStream = new NetworkEventStream(system) + system.eventStream.subscribe(networkEventStream.sender, classOf[RemoteLifeCycleEvent]) + + _transport = { + val fqn = remoteSettings.RemoteTransport + // TODO check if this classloader is the right one; hint: this class was loaded by contextClassLoader if that was not null + ReflectiveAccess.createInstance[RemoteTransport]( + fqn, + Seq(classOf[RemoteSettings] -> remoteSettings, + classOf[ActorSystemImpl] -> system, + classOf[RemoteActorRefProvider] -> this), + getClass.getClassLoader) match { + case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) + case Right(remote) ⇒ remote + } + } + + _log = Logging(eventStream, "RemoteActorRefProvider(" + transport.address + ")") + + // this enables reception of remote requests + _transport.start() val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { def receive = { @@ -90,12 +107,8 @@ class RemoteActorRefProvider( } }), "RemoteClientLifeCycleListener") - _networkEventStream = new NetworkEventStream(system) - - system.eventStream.subscribe(networkEventStream.sender, classOf[RemoteLifeCycleEvent]) system.eventStream.subscribe(remoteClientLifeCycleHandler, classOf[RemoteLifeCycleEvent]) - local.registerExtraNames(Map(("remote", remoteDaemon))) terminationFuture.onComplete(_ ⇒ transport.shutdown()) } @@ -149,7 +162,7 @@ class RemoteActorRefProvider( case Some(Deploy(_, _, _, RemoteScope(addr))) ⇒ if (addr == rootPath.address) local.actorOf(system, props, supervisor, path, false, deployment) else { - val rpath = RootActorPath(addr) / "remote" / rootPath.address.hostPort / path.elements + val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements useActorOnNode(rpath, props.creator, supervisor) new RemoteActorRef(this, transport, rpath, supervisor, None) } @@ -159,14 +172,13 @@ class RemoteActorRefProvider( } } - def actorFor(path: ActorPath): InternalActorRef = path.root match { - case `rootPath` ⇒ actorFor(rootGuardian, path.elements) - case _ ⇒ new RemoteActorRef(this, transport, path, Nobody, None) - } + def actorFor(path: ActorPath): InternalActorRef = + if (path.address == rootPath.address || path.address == transport.address) actorFor(rootGuardian, path.elements) + else new RemoteActorRef(this, transport, path, Nobody, None) def actorFor(ref: InternalActorRef, path: String): InternalActorRef = path match { case ActorPathExtractor(address, elems) ⇒ - if (address == rootPath.address) actorFor(rootGuardian, elems) + if (address == rootPath.address || address == transport.address) actorFor(rootGuardian, elems) else new RemoteActorRef(this, transport, new RootActorPath(address) / elems, Nobody, None) case _ ⇒ local.actorFor(ref, path) } @@ -227,7 +239,7 @@ private[akka] class RemoteActorRef private[akka] ( def restart(cause: Throwable): Unit = sendSystemMessage(Recreate(cause)) @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = SerializedActorRef(path.toString) + private def writeReplace(): AnyRef = SerializedActorRef(path) } class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index a002f6fe46..1274e99416 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -18,7 +18,7 @@ case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMs * * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. */ -class RemoteSystemDaemon(system: ActorSystemImpl, address: Address, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) +class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) extends VirtualPathContainer(system.provider, _path, _parent, _log) { /** @@ -52,7 +52,8 @@ class RemoteSystemDaemon(system: ActorSystemImpl, address: Address, _path: Actor message match { case DaemonMsgCreate(factory, path, supervisor) ⇒ path match { - case ActorPathExtractor(`address`, elems) if elems.nonEmpty && elems.head == "remote" ⇒ + case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒ + // TODO RK currently the extracted “address” is just ignored, is that okay? // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) val path = this.path / subpath diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 4c671b7e05..6509d19383 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -17,6 +17,8 @@ class RemoteSettings(val config: Config, val systemName: String) { import config._ val RemoteTransport = getString("akka.remote.transport") + val LogReceive = getBoolean("akka.remote.log-received-messages") + val LogSend = getBoolean("akka.remote.log-sent-messages") // AccrualFailureDetector val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 19fd6c1c43..8d29111672 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -5,12 +5,12 @@ package akka.remote import scala.reflect.BeanProperty - import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressExtractor, Address, ActorSystemImpl, ActorSystem, ActorRef } import akka.dispatch.SystemMessage import akka.event.{ LoggingAdapter, Logging } import akka.remote.RemoteProtocol.{ RemoteMessageProtocol, RemoteControlProtocol, AkkaRemoteProtocol, ActorRefProtocol } import akka.AkkaException +import akka.serialization.Serialization /** * Remote life-cycle events. @@ -199,9 +199,9 @@ abstract class RemoteTransport { def system: ActorSystem /** - * Starts up the remoting + * Start up the transport, i.e. enable incoming connections. */ - def start(system: ActorSystemImpl, provider: RemoteActorRefProvider): Unit + def start(): Unit /** * Shuts down a specific client connected to the supplied remote address returns true if successful @@ -251,6 +251,8 @@ trait RemoteMarshallingOps { def provider: RemoteActorRefProvider + def address: Address + protected def useUntrustedMode: Boolean def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { @@ -269,7 +271,7 @@ trait RemoteMarshallingOps { * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = { - ActorRefProtocol.newBuilder.setPath(actor.path.toString).build + ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(address)).build } def createRemoteMessageProtocolBuilder( @@ -278,20 +280,21 @@ trait RemoteMarshallingOps { senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient)) - messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) - if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) + Serialization.currentTransportAddress.withValue(address) { + messageBuilder.setMessage(MessageSerializer.serialize(system, message.asInstanceOf[AnyRef])) + } + messageBuilder } def receiveMessage(remoteMessage: RemoteMessage) { - log.debug("received message {}", remoteMessage) - val remoteDaemon = provider.remoteDaemon remoteMessage.recipient match { case `remoteDaemon` ⇒ + if (provider.remoteSettings.LogReceive) log.debug("received daemon message {}", remoteMessage) remoteMessage.payload match { case m @ (_: DaemonMsg | _: Terminated) ⇒ try remoteDaemon ! m catch { @@ -300,6 +303,7 @@ trait RemoteMarshallingOps { case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender) } case l: LocalRef ⇒ + if (provider.remoteSettings.LogReceive) log.debug("received local message {}", remoteMessage) remoteMessage.payload match { case msg: SystemMessage ⇒ if (useUntrustedMode) @@ -309,9 +313,11 @@ trait RemoteMarshallingOps { throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") case m ⇒ l.!(m)(remoteMessage.sender) } - case r: RemoteActorRef ⇒ + case r: RemoteRef ⇒ + if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage) remoteMessage.originalReceiver match { case AddressExtractor(address) if address == provider.transport.address ⇒ + // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) r.!(remoteMessage.payload)(remoteMessage.sender) case r ⇒ log.error("dropping message {} for non-local recipient {}", remoteMessage.payload, r) } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 4a9d6607fd..b72fd8b893 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -58,7 +58,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - log.debug("Sending message {} from {} to {}", message, senderOption, recipient) + if (netty.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) send((message, senderOption, recipient)) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 4226f9dffc..7ad10c92ff 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -33,7 +33,7 @@ import akka.event.LoggingAdapter /** * Provides the implementation of the Netty remote support */ -class NettyRemoteTransport(val remoteSettings: RemoteSettings) +class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: ActorSystemImpl, val provider: RemoteActorRefProvider) extends RemoteTransport with RemoteMarshallingOps { val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) @@ -62,33 +62,11 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings) case ex ⇒ shutdown(); throw ex } - val address = { - server.channel.getLocalAddress match { - case ia: InetSocketAddress ⇒ Address("akka", remoteSettings.systemName, Some(ia.getHostName), Some(ia.getPort)) - case x ⇒ - shutdown() - throw new IllegalArgumentException("unknown address format " + x + ":" + x.getClass) - } - } + val address = Address("akka", remoteSettings.systemName, Some(settings.Hostname), Some(settings.Port)) - @volatile - private var _system: ActorSystemImpl = _ - def system = _system + val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")") - @volatile - private var _provider: RemoteActorRefProvider = _ - def provider = _provider - - @volatile - private var _log: LoggingAdapter = _ - def log = _log - - def start(system: ActorSystemImpl, provider: RemoteActorRefProvider): Unit = { - _system = system - _provider = provider - _log = Logging(system, "NettyRemoteTransport") - server.start(system) - } + def start(): Unit = server.start() def shutdown(): Unit = { clientsLock.writeLock().lock() diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index a50d3653e8..f695042331 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -44,17 +44,12 @@ class NettyRemoteServer( bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("backlog", settings.Backlog) bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("keepAlive", true) + bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("reuseAddress", true) - val channel = bootstrap.bind(new InetSocketAddress(ip, settings.Port)) - - openChannels.add(channel) - - def start(system: ActorSystemImpl) { + def start(): Unit = { + openChannels.add(bootstrap.bind(new InetSocketAddress(ip, settings.Port))) netty.notifyListeners(RemoteServerStarted(netty)) - // TODO uncork the pipeline, which was ... - // TODO ... corked before in order not to allow anything through before init is complete } def shutdown() { diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index 9596f1b083..4e31720b2b 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -38,7 +38,15 @@ class NettySettings(config: Config, val systemName: String) { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value } - val Port = getInt("port") + val Port = getInt("port") match { + case 0 ⇒ + try { + val s = new java.net.ServerSocket(0) + try s.getLocalPort finally s.close() + } catch { case e ⇒ throw new ConfigurationException("Unable to obtain random port", e) } + case other ⇒ other + } + val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) val Backlog = getInt("backlog") diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala index ccfdea0189..1c7d4b2602 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/DirectRoutedRemoteActorMultiJvmSpec.scala @@ -1,6 +1,6 @@ package akka.remote -import akka.actor.{ Actor, Props } +import akka.actor.{ Actor, ActorRef, Props } import akka.testkit._ import akka.dispatch.Await import akka.pattern.ask @@ -10,7 +10,7 @@ object DirectRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSp class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ sender ! self.path.address.hostPort + case "identify" ⇒ sender ! self } } @@ -53,7 +53,7 @@ class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(nodeConfigs(1) val actor = system.actorOf(Props[SomeActor], "service-hello") actor.isInstanceOf[RemoteActorRef] must be(true) - Await.result(actor ? "identify", timeout.duration) must equal(akkaSpec(0)) + Await.result(actor ? "identify", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort must equal(akkaSpec(0)) barrier("done") } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala index 59e6d4e850..a91b203707 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/NewRemoteActorMultiJvmSpec.scala @@ -1,6 +1,6 @@ package akka.remote -import akka.actor.{ Actor, Props } +import akka.actor.{ Actor, ActorRef, Props } import akka.testkit._ import akka.dispatch.Await import akka.pattern.ask @@ -10,7 +10,7 @@ object NewRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ sender ! self.path.address.hostPort + case "identify" ⇒ sender ! self } } @@ -53,7 +53,7 @@ class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec(NewRemoteActorMultiJvmS barrier("start") val actor = system.actorOf(Props[SomeActor], "service-hello") - Await.result(actor ? "identify", timeout.duration) must equal(akkaSpec(0)) + Await.result(actor ? "identify", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort must equal(akkaSpec(0)) barrier("done") } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala index 89d5da3bd2..f895708294 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -1,6 +1,6 @@ package akka.remote -import akka.actor.{ Actor, Props } +import akka.actor.{ Actor, ActorRef, Props } import akka.routing._ import akka.testkit.DefaultTimeout import akka.dispatch.Await @@ -10,7 +10,7 @@ object RandomRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJvmSp override def NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ sender ! self.path.address.hostPort + case "hit" ⇒ sender ! self case "end" ⇒ context.stop(self) } } @@ -89,7 +89,7 @@ class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RandomRoutedRe for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { - val nodeName = Await.result(actor ? "hit", timeout.duration).toString + val nodeName = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort replies = replies + (nodeName -> (replies(nodeName) + 1)) } } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 0b3db1c8bd..21c7e4cf64 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -1,6 +1,6 @@ package akka.remote -import akka.actor.{ Actor, Props } +import akka.actor.{ Actor, ActorRef, Props } import akka.routing._ import akka.testkit.DefaultTimeout import akka.dispatch.Await @@ -11,7 +11,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMultiJ class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ sender ! self.path.address.hostPort + case "hit" ⇒ sender ! self case "end" ⇒ context.stop(self) } } @@ -90,7 +90,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(RoundRobin for (i ← 0 until iterationCount) { for (k ← 0 until connectionCount) { - val nodeName = Await.result(actor ? "hit", timeout.duration).toString + val nodeName = Await.result(actor ? "hit", timeout.duration).asInstanceOf[ActorRef].path.address.hostPort replies = replies + (nodeName -> (replies(nodeName) + 1)) } diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index d2cb625469..0eb3caa8e4 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -1,6 +1,6 @@ package akka.remote -import akka.actor.{ Actor, Props } +import akka.actor.{ Actor, ActorRef, Props } import akka.routing._ import akka.testkit._ import akka.util.duration._ @@ -9,7 +9,7 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec extends AbstractRemoteActorMul override def NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ sender ! self.path.address.hostPort + case "hit" ⇒ sender ! self case "end" ⇒ context.stop(self) } } @@ -89,7 +89,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(Scatter } val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) { - case name: String ⇒ (name, 1) + case ref: ActorRef ⇒ (ref.asInstanceOf[ActorRef].path.address.hostPort, 1) }).foldLeft(Map(akkaSpec(0) -> 0, akkaSpec(1) -> 0, akkaSpec(2) -> 0)) { case (m, (n, c)) ⇒ m + (n -> (m(n) + c)) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 367c6faf57..ac3833130c 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -11,7 +11,7 @@ import com.typesafe.config._ object RemoteRouterSpec { class Echo extends Actor { def receive = { - case _ ⇒ sender ! self.path + case _ ⇒ sender ! self } } } @@ -56,26 +56,26 @@ akka { "deploy its children on remote host driven by configuration" in { val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)), "blub") router ! "" - expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c1" + expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c1" router ! "" - expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c2" + expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub/c2" } "deploy its children on remote host driven by programatic definition" in { val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2), Seq("akka://remote_sys@localhost:12346"))), "blub2") router ! "" - expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c1" + expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c1" router ! "" - expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c2" + expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/blub2/c2" } "deploy dynamic resizable number of children on remote host driven by configuration" in { val router = system.actorOf(Props[Echo].withRouter(FromConfig), "elastic-blub") router ! "" - expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c1" + expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c1" router ! "" - expectMsgType[ActorPath].toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c2" + expectMsgType[ActorRef].path.toString must be === "akka://remote_sys@localhost:12346/remote/RemoteRouterSpec@localhost:12345/user/elastic-blub/c2" } }