diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala
index 3d4f61caa1..626e413ec8 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala
@@ -7,6 +7,7 @@ import akka.testkit._
import akka.util.duration._
import akka.dispatch.Await
import akka.pattern.ask
+import java.net.MalformedURLException
object ActorLookupSpec {
@@ -46,9 +47,10 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
val syst = sysImpl.systemGuardian
val root = sysImpl.lookupRoot
- def empty(path: String) = new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, path match {
- case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems
- })
+ def empty(path: String) =
+ new EmptyLocalActorRef(sysImpl.provider, path match {
+ case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems
+ }, system.eventStream)
"An ActorSystem" must {
@@ -286,4 +288,25 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
}
+ "An ActorPath" must {
+
+ "support parsing its String rep" in {
+ val path = system.actorFor("user").path
+ ActorPath.fromString(path.toString) must be(path)
+ }
+
+ "support parsing remote paths" in {
+ val remote = "akka://sys@host:1234/some/ref"
+ ActorPath.fromString(remote).toString must be(remote)
+ }
+
+ "throw exception upon malformed paths" in {
+ intercept[MalformedURLException] { ActorPath.fromString("") }
+ intercept[MalformedURLException] { ActorPath.fromString("://hallo") }
+ intercept[MalformedURLException] { ActorPath.fromString("s://dd@:12") }
+ intercept[MalformedURLException] { ActorPath.fromString("s://dd@h:hd") }
+ }
+
+ }
+
}
\ No newline at end of file
diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
index a4dbb4d1cb..a2c3c7da5a 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala
@@ -290,7 +290,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
val sysImpl = system.asInstanceOf[ActorSystemImpl]
val addr = sysImpl.provider.rootPath.address
- val serialized = SerializedActorRef(addr + "/non-existing")
+ val serialized = SerializedActorRef(RootActorPath(addr, "/non-existing"))
out.writeObject(serialized)
@@ -299,7 +299,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
Serialization.currentSystem.withValue(sysImpl) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
- in.readObject must be === new EmptyLocalActorRef(system.eventStream, sysImpl.provider, system.dispatcher, system.actorFor("/").path / "non-existing")
+ in.readObject must be === new EmptyLocalActorRef(sysImpl.provider, system.actorFor("/").path / "non-existing", system.eventStream)
}
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala
index d0f0d8154b..5ba0ae4600 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala
@@ -3,6 +3,7 @@
*/
package akka.actor
import scala.annotation.tailrec
+import java.net.MalformedURLException
object ActorPath {
def split(s: String): List[String] = {
@@ -16,6 +17,11 @@ object ActorPath {
rec(s.length, Nil)
}
+ def fromString(s: String): ActorPath = s match {
+ case ActorPathExtractor(addr, elems) ⇒ RootActorPath(addr) / elems
+ case _ ⇒ throw new MalformedURLException("cannot parse as ActorPath: " + s)
+ }
+
val ElementRegex = """[-\w:@&=+,.!~*'_;][-\w:@&=+,.!~*'$_;]*""".r
}
@@ -87,6 +93,12 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable {
*/
def root: RootActorPath
+ /**
+ * Generate String representation, replacing the Address in the RootActor
+ * Path with the given one unless this path’s address includes host and port
+ * information.
+ */
+ def toStringWithAddress(address: Address): String
}
/**
@@ -105,6 +117,10 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act
override val toString = address + name
+ def toStringWithAddress(addr: Address): String =
+ if (address.host.isDefined) address + name
+ else addr + name
+
def compareTo(other: ActorPath) = other match {
case r: RootActorPath ⇒ toString compareTo r.toString
case c: ChildActorPath ⇒ 1
@@ -151,6 +167,15 @@ final class ChildActorPath(val parent: ActorPath, val name: String) extends Acto
rec(parent, new StringBuilder(32).append(name)).toString
}
+ override def toStringWithAddress(addr: Address) = {
+ @tailrec
+ def rec(p: ActorPath, s: StringBuilder): StringBuilder = p match {
+ case r: RootActorPath ⇒ s.insert(0, r.toStringWithAddress(addr))
+ case _ ⇒ rec(p.parent, s.insert(0, '/').insert(0, p.name))
+ }
+ rec(parent, new StringBuilder(32).append(name)).toString
+ }
+
override def equals(other: Any): Boolean = {
@tailrec
def rec(left: ActorPath, right: ActorPath): Boolean =
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index fa6c9962e7..753adaa9fa 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -211,7 +211,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
* This is an internal look-up failure token, not useful for anything else.
*/
private[akka] case object Nobody extends MinimalActorRef {
- val path = new RootActorPath(new LocalAddress("all-systems"), "/Nobody")
+ val path = new RootActorPath(Address("akka", "all-systems"), "/Nobody")
def provider = throw new UnsupportedOperationException("Nobody does not provide")
}
@@ -329,13 +329,13 @@ private[akka] class LocalActorRef private[akka] (
def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException])
- protected def writeReplace(): AnyRef = SerializedActorRef(path.toString)
+ protected def writeReplace(): AnyRef = SerializedActorRef(path)
}
/**
* Memento pattern for serializing ActorRefs transparently
*/
-case class SerializedActorRef(path: String) {
+case class SerializedActorRef private (path: String) {
import akka.serialization.Serialization.currentSystem
@throws(classOf[java.io.ObjectStreamException])
@@ -349,6 +349,15 @@ case class SerializedActorRef(path: String) {
}
}
+object SerializedActorRef {
+ def apply(path: ActorPath): SerializedActorRef = {
+ Serialization.currentTransportAddress.value match {
+ case null ⇒ new SerializedActorRef(path.toString)
+ case addr ⇒ new SerializedActorRef(path.toStringWithAddress(addr))
+ }
+ }
+}
+
/**
* Trait for ActorRef implementations where all methods contain default stubs.
*/
@@ -375,7 +384,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
def restart(cause: Throwable): Unit = ()
@throws(classOf[java.io.ObjectStreamException])
- protected def writeReplace(): AnyRef = SerializedActorRef(path.toString)
+ protected def writeReplace(): AnyRef = SerializedActorRef(path)
}
private[akka] object MinimalActorRef {
@@ -398,57 +407,39 @@ private[akka] object DeadLetterActorRef {
val serialized = new SerializedDeadLetterActorRef
}
-private[akka] trait DeadLetterActorRefLike extends MinimalActorRef {
-
- def eventStream: EventStream
-
- @volatile
- private var _path: ActorPath = _
- def path: ActorPath = {
- assert(_path != null)
- _path
- }
-
- @volatile
- private var _provider: ActorRefProvider = _
- def provider = _provider
-
- private[akka] def init(provider: ActorRefProvider, path: ActorPath) {
- _path = path
- _provider = provider
- }
-
- override def isTerminated(): Boolean = true
-
- override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match {
- case d: DeadLetter ⇒ eventStream.publish(d)
- case _ ⇒ eventStream.publish(DeadLetter(message, sender, this))
- }
-}
-
-private[akka] class DeadLetterActorRef(val eventStream: EventStream) extends DeadLetterActorRefLike {
- @throws(classOf[java.io.ObjectStreamException])
- override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
-}
-
/**
* This special dead letter reference has a name: it is that which is returned
* by a local look-up which is unsuccessful.
*/
private[akka] class EmptyLocalActorRef(
- val eventStream: EventStream,
- _provider: ActorRefProvider,
- _dispatcher: MessageDispatcher,
- _path: ActorPath) extends DeadLetterActorRefLike {
+ val provider: ActorRefProvider,
+ val path: ActorPath,
+ val eventStream: EventStream) extends MinimalActorRef {
- init(_provider, _path)
+ override def isTerminated(): Boolean = true
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
- case d: DeadLetter ⇒ // do NOT form endless loops
+ case d: DeadLetter ⇒ // do NOT form endless loops, since deadLetters will resend!
case _ ⇒ eventStream.publish(DeadLetter(message, sender, this))
}
}
+/**
+ * Internal implementation of the dead letter destination: will publish any
+ * received message to the eventStream, wrapped as [[akka.actor.DeadLetter]].
+ */
+private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, _path: ActorPath, _eventStream: EventStream)
+ extends EmptyLocalActorRef(_provider, _path, _eventStream) {
+
+ override def !(message: Any)(implicit sender: ActorRef = this): Unit = message match {
+ case d: DeadLetter ⇒ eventStream.publish(d)
+ case _ ⇒ eventStream.publish(DeadLetter(message, sender, this))
+ }
+
+ @throws(classOf[java.io.ObjectStreamException])
+ override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
+}
+
/**
* Internal implementation detail used for paths like “/temp”
*/
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
index a7e9a2163e..56c3389072 100755
--- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
@@ -33,11 +33,22 @@ trait ActorRefProvider {
*/
def systemGuardian: InternalActorRef
+ /**
+ * Dead letter destination for this provider.
+ */
+ def deadLetters: ActorRef
+
/**
* Reference to the death watch service.
*/
def deathWatch: DeathWatch
+ /**
+ * Care-taker of actor refs which await final termination but cannot be kept
+ * in their parent’s children list because the name shall be freed.
+ */
+ def locker: Locker
+
/**
* The root path for all actors within this actor system, including remote
* address if enabled.
@@ -281,25 +292,30 @@ class LocalActorRefProvider(
val settings: ActorSystem.Settings,
val eventStream: EventStream,
val scheduler: Scheduler,
- val deadLetters: InternalActorRef,
- val rootPath: ActorPath,
val deployer: Deployer) extends ActorRefProvider {
+ // this is the constructor needed for reflectively instantiating the provider
def this(_systemName: String,
settings: ActorSystem.Settings,
eventStream: EventStream,
scheduler: Scheduler,
- deadLetters: InternalActorRef) =
+ classloader: ClassLoader) =
this(_systemName,
settings,
eventStream,
scheduler,
- deadLetters,
- new RootActorPath(LocalAddress(_systemName)),
- new Deployer(settings))
+ new Deployer(settings, classloader))
+
+ val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
val log = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")")
+ val deadLetters = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream)
+
+ val deathWatch = new LocalDeathWatch(1024) //TODO make configrable
+
+ val locker: Locker = new Locker(scheduler, settings.ReaperInterval, this, rootPath / "locker", deathWatch)
+
/*
* generate name for temporary actor refs
*/
@@ -455,8 +471,6 @@ class LocalActorRefProvider(
tempContainer.removeChild(path.name)
}
- val deathWatch = new LocalDeathWatch(1024) //TODO make configrable
-
def init(_system: ActorSystemImpl) {
system = _system
// chain death watchers so that killing guardian stops the application
@@ -472,7 +486,7 @@ class LocalActorRefProvider(
deadLetters
} else if (elems.head.isEmpty) actorFor(rootGuardian, elems.tail)
else actorFor(ref, elems)
- case LocalActorPath(address, elems) if address == rootPath.address ⇒ actorFor(rootGuardian, elems)
+ case ActorPathExtractor(address, elems) if address == rootPath.address ⇒ actorFor(rootGuardian, elems)
case _ ⇒
log.debug("look-up of unknown path '{}' failed", path)
deadLetters
@@ -492,7 +506,7 @@ class LocalActorRefProvider(
} else ref.getChild(path.iterator) match {
case Nobody ⇒
log.debug("look-up of path sequence '{}' failed", path)
- new EmptyLocalActorRef(eventStream, system.provider, dispatcher, ref.path / path)
+ new EmptyLocalActorRef(system.provider, ref.path / path, eventStream)
case x ⇒ x
}
diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
index fc4a6f1f19..e3235a5cec 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala
@@ -267,7 +267,7 @@ abstract class ActorSystem extends ActorRefFactory {
* (below which the logging actors reside) and the execute all registered
* termination handlers (see [[ActorSystem.registerOnTermination]]).
*/
- def shutdown()
+ def shutdown(): Unit
/**
* Registers the provided extension and creates its payload, if this extension isn't already registered
@@ -322,6 +322,14 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/
def deathWatch: DeathWatch
+ /**
+ * ClassLoader which is used for reflective accesses internally. This is set
+ * to the context class loader, if one is set, or the class loader which
+ * loaded the ActorSystem implementation. The context class loader is also
+ * set on all threads created by the ActorSystem, if one was set during
+ * creation.
+ */
+ def internalClassLoader: ClassLoader
}
class ActorSystemImpl(val name: String, applicationConfig: Config) extends ExtendedActorSystem {
@@ -331,8 +339,10 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
import ActorSystem._
- final val settings = new Settings(applicationConfig, name)
- final val threadFactory = new MonitorableThreadFactory(name, settings.Daemonicity)
+ final val settings: Settings = new Settings(applicationConfig, name)
+
+ final val threadFactory: MonitorableThreadFactory =
+ MonitorableThreadFactory(name, settings.Daemonicity, Option(Thread.currentThread.getContextClassLoader))
def logConfiguration(): Unit = log.info(settings.toString)
@@ -377,18 +387,32 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
import settings._
// this provides basic logging (to stdout) until .start() is called below
- val eventStream = new EventStream(DebugEventStream)
+ val eventStream: EventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(settings)
- // unfortunately we need logging before we know the rootpath address, which wants to be inserted here
- @volatile
- private var _log = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass)
- def log = _log
+ val log: LoggingAdapter = new BusLogging(eventStream, "ActorSystem(" + name + ")", this.getClass)
- val scheduler = createScheduler()
+ val scheduler: Scheduler = createScheduler()
- val deadLetters = new DeadLetterActorRef(eventStream)
- val deadLetterMailbox = new Mailbox(null) {
+ val internalClassLoader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader
+
+ val provider: ActorRefProvider = {
+ val arguments = Seq(
+ classOf[String] -> name,
+ classOf[Settings] -> settings,
+ classOf[EventStream] -> eventStream,
+ classOf[Scheduler] -> scheduler,
+ classOf[ClassLoader] -> internalClassLoader)
+
+ ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, internalClassLoader) match {
+ case Left(e) ⇒ throw e
+ case Right(p) ⇒ p
+ }
+ }
+
+ def deadLetters: ActorRef = provider.deadLetters
+
+ val deadLetterMailbox: Mailbox = new Mailbox(null) {
becomeClosed()
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
override def dequeue() = null
@@ -399,28 +423,12 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
override def numberOfMessages = 0
}
- val provider: ActorRefProvider = {
- val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
- case Left(e) ⇒ throw e
- case Right(b) ⇒ b
- }
- val arguments = Seq(
- classOf[String] -> name,
- classOf[Settings] -> settings,
- classOf[EventStream] -> eventStream,
- classOf[Scheduler] -> scheduler,
- classOf[InternalActorRef] -> deadLetters)
- val types: Array[Class[_]] = arguments map (_._1) toArray
- val values: Array[AnyRef] = arguments map (_._2) toArray
+ def locker: Locker = provider.locker
- ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match {
- case Left(e) ⇒ throw e
- case Right(p) ⇒ p
- }
- }
+ val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
+ threadFactory, eventStream, deadLetterMailbox, scheduler, internalClassLoader))
- val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(threadFactory, eventStream, deadLetterMailbox, scheduler))
- val dispatcher = dispatchers.defaultGlobalDispatcher
+ val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher
def terminationFuture: Future[Unit] = provider.terminationFuture
def lookupRoot: InternalActorRef = provider.rootGuardian
@@ -434,21 +442,13 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
private lazy val _start: this.type = {
// the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this)
- _log = new BusLogging(eventStream, "ActorSystem(" + lookupRoot.path.address + ")", this.getClass)
- deadLetters.init(provider, lookupRoot.path / "deadLetters")
registerOnTermination(stopScheduler())
- // this starts the reaper actor and the user-configured logging subscribers, which are also actors
- _locker = new Locker(scheduler, ReaperInterval, provider, lookupRoot.path / "locker", deathWatch)
loadExtensions()
if (LogConfigOnStart) logConfiguration()
this
}
- @volatile
- private var _locker: Locker = _ // initialized in start()
- def locker = _locker
-
- def start() = _start
+ def start(): this.type = _start
private lazy val terminationCallbacks = {
val callbacks = new TerminationCallbacks
@@ -460,9 +460,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
def awaitTermination(timeout: Duration) { Await.ready(terminationCallbacks, timeout) }
def awaitTermination() = awaitTermination(Duration.Inf)
- def shutdown() {
- stop(guardian)
- }
+ def shutdown(): Unit = stop(guardian)
/**
* Create the scheduler service. This one needs one special behavior: if
@@ -547,8 +545,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
private def loadExtensions() {
import scala.collection.JavaConversions._
settings.config.getStringList("akka.extensions") foreach { fqcn ⇒
- import ReflectiveAccess._
- getObjectFor[AnyRef](fqcn).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
+ import ReflectiveAccess.{ getObjectFor, createInstance, noParams, noArgs }
+ getObjectFor[AnyRef](fqcn, internalClassLoader).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup());
case Right(p: ExtensionId[_]) ⇒ registerExtension(p);
case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
@@ -558,7 +556,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten
}
}
- override def toString = lookupRoot.path.root.address.toString
+ override def toString: String = lookupRoot.path.root.address.toString
final class TerminationCallbacks extends Runnable with Awaitable[Unit] {
private val lock = new ReentrantGuard
diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala
index 6cd6c369dd..956657dc7c 100644
--- a/akka-actor/src/main/scala/akka/actor/Address.scala
+++ b/akka-actor/src/main/scala/akka/actor/Address.scala
@@ -9,17 +9,41 @@ import java.net.URISyntaxException
* The address specifies the physical location under which an Actor can be
* reached. Examples are local addresses, identified by the ActorSystem’s
* name, and remote addresses, identified by protocol, host and port.
+ *
+ * This class is final to allow use as a case class (copy method etc.); if
+ * for example a remote transport would want to associate additional
+ * information with an address, then this must be done externally.
*/
-abstract class Address {
- def protocol: String
- def hostPort: String
+final case class Address(protocol: String, system: String, host: Option[String], port: Option[Int]) {
+
+ def this(protocol: String, system: String) = this(protocol, system, None, None)
+ def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port))
+
@transient
- override lazy val toString = protocol + "://" + hostPort
+ override lazy val toString = {
+ val sb = new StringBuilder(protocol)
+ sb.append("://")
+ sb.append(hostPort)
+ sb.toString
+ }
+
+ @transient
+ lazy val hostPort = {
+ val sb = new StringBuilder(system)
+ if (host.isDefined) {
+ sb.append('@')
+ sb.append(host.get)
+ }
+ if (port.isDefined) {
+ sb.append(':')
+ sb.append(port.get)
+ }
+ sb.toString
+ }
}
-case class LocalAddress(systemName: String) extends Address {
- def protocol = "akka"
- def hostPort = systemName
+object Address {
+ def apply(protocol: String, system: String) = new Address(protocol, system)
}
object RelativeActorPath {
@@ -32,12 +56,34 @@ object RelativeActorPath {
}
}
-object LocalActorPath {
- def unapply(addr: String): Option[(LocalAddress, Iterable[String])] = {
+object AddressExtractor {
+ def unapply(addr: String): Option[Address] = {
try {
val uri = new URI(addr)
- if (uri.getScheme != "akka" || uri.getUserInfo != null || uri.getHost == null || uri.getPath == null) None
- else Some(LocalAddress(uri.getHost), ActorPath.split(uri.getPath).drop(1))
+ if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None
+ else {
+ val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost,
+ if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost),
+ if (uri.getPort < 0) None else Some(uri.getPort))
+ Some(addr)
+ }
+ } catch {
+ case _: URISyntaxException ⇒ None
+ }
+ }
+}
+
+object ActorPathExtractor {
+ def unapply(addr: String): Option[(Address, Iterable[String])] = {
+ try {
+ val uri = new URI(addr)
+ if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null) || uri.getPath == null) None
+ else {
+ val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost,
+ if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost),
+ if (uri.getPort < 0) None else Some(uri.getPort))
+ Some((addr, ActorPath.split(uri.getPath).drop(1)))
+ }
} catch {
case _: URISyntaxException ⇒ None
}
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index c8e780d5c2..36d82b2cec 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -23,7 +23,7 @@ case object LocalScope extends Scope
*
* @author Jonas Bonér
*/
-class Deployer(val settings: ActorSystem.Settings) {
+class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader) {
import scala.collection.JavaConverters._
@@ -41,7 +41,6 @@ class Deployer(val settings: ActorSystem.Settings) {
def deploy(d: Deploy): Unit = deployments.put(d.path, d)
protected def parseConfig(key: String, config: Config): Option[Deploy] = {
- import akka.util.ReflectiveAccess.getClassFor
val deployment = config.withFallback(default)
@@ -65,8 +64,8 @@ class Deployer(val settings: ActorSystem.Settings) {
case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer)
case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer)
case fqn ⇒
- val constructorSignature = Array[Class[_]](classOf[Config])
- ReflectiveAccess.createInstance[RouterConfig](fqn, constructorSignature, Array[AnyRef](deployment)) match {
+ val args = Seq(classOf[Config] -> deployment)
+ ReflectiveAccess.createInstance[RouterConfig](fqn, args, classloader) match {
case Right(router) ⇒ router
case Left(exception) ⇒
throw new IllegalArgumentException(
diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
index 5bc5b7dc94..66dd0385c9 100644
--- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala
@@ -324,8 +324,8 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
BoundedMailbox(capacity, duration)
}
case fqcn ⇒
- val constructorSignature = Array[Class[_]](classOf[Config])
- ReflectiveAccess.createInstance[MailboxType](fqcn, constructorSignature, Array[AnyRef](config)) match {
+ val args = Seq(classOf[Config] -> config)
+ ReflectiveAccess.createInstance[MailboxType](fqcn, args, prerequisites.classloader) match {
case Right(instance) ⇒ instance
case Left(exception) ⇒
throw new IllegalArgumentException(
diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
index fd58346955..d71604fd1a 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala
@@ -19,13 +19,15 @@ trait DispatcherPrerequisites {
def eventStream: EventStream
def deadLetterMailbox: Mailbox
def scheduler: Scheduler
+ def classloader: ClassLoader
}
case class DefaultDispatcherPrerequisites(
val threadFactory: ThreadFactory,
val eventStream: EventStream,
val deadLetterMailbox: Mailbox,
- val scheduler: Scheduler) extends DispatcherPrerequisites
+ val scheduler: Scheduler,
+ val classloader: ClassLoader) extends DispatcherPrerequisites
object Dispatchers {
/**
@@ -134,8 +136,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
case "BalancingDispatcher" ⇒ new BalancingDispatcherConfigurator(cfg, prerequisites)
case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn ⇒
- val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites])
- ReflectiveAccess.createInstance[MessageDispatcherConfigurator](fqn, constructorSignature, Array[AnyRef](cfg, prerequisites)) match {
+ val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
+ ReflectiveAccess.createInstance[MessageDispatcherConfigurator](fqn, args, prerequisites.classloader) match {
case Right(configurator) ⇒ configurator
case Left(exception) ⇒
throw new IllegalArgumentException(
diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
index e9430340fa..369f3cdaf2 100644
--- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala
@@ -155,6 +155,7 @@ object MonitorableThreadFactory {
case class MonitorableThreadFactory(name: String,
daemonic: Boolean,
+ contextClassLoader: Option[ClassLoader],
exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing)
extends ThreadFactory with ForkJoinPool.ForkJoinWorkerThreadFactory {
protected val counter = new AtomicLong
@@ -169,6 +170,7 @@ case class MonitorableThreadFactory(name: String,
val t = new Thread(runnable, name + counter.incrementAndGet())
t.setUncaughtExceptionHandler(exceptionHandler)
t.setDaemon(daemonic)
+ contextClassLoader foreach (t.setContextClassLoader(_))
t
}
}
diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala
index f75e47f9a4..c8bbe5f9eb 100644
--- a/akka-actor/src/main/scala/akka/event/Logging.scala
+++ b/akka-actor/src/main/scala/akka/event/Logging.scala
@@ -101,7 +101,7 @@ trait LoggingBus extends ActorEventBus {
if loggerName != StandardOutLoggerName
} yield {
try {
- ReflectiveAccess.getClassFor[Actor](loggerName) match {
+ ReflectiveAccess.getClassFor[Actor](loggerName, system.internalClassLoader) match {
case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName)
case Left(exception) ⇒ throw exception
}
@@ -648,7 +648,7 @@ object Logging {
* akka.stdout-loglevel in akka.conf.
*/
class StandardOutLogger extends MinimalActorRef with StdOutLogger {
- val path: ActorPath = new RootActorPath(LocalAddress("all-systems"), "/StandardOutLogger")
+ val path: ActorPath = new RootActorPath(Address("akka", "all-systems"), "/StandardOutLogger")
def provider: ActorRefProvider = throw new UnsupportedOperationException("StandardOutLogger does not provide")
override val toString = "StandardOutLogger"
override def !(message: Any)(implicit sender: ActorRef = null): Unit = print(message)
diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
index 78cb370b68..750b2e5c35 100644
--- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala
+++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala
@@ -9,7 +9,7 @@ import akka.util.ReflectiveAccess
import scala.util.DynamicVariable
import com.typesafe.config.Config
import akka.config.ConfigurationException
-import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem }
+import akka.actor.{ Extension, ActorSystem, ExtendedActorSystem, Address }
case class NoSerializerFoundException(m: String) extends AkkaException(m)
@@ -27,6 +27,12 @@ object Serialization {
*/
val currentSystem = new DynamicVariable[ActorSystem](null)
+ /**
+ * This holds a reference to the current transport address to be inserted
+ * into local actor refs during serialization.
+ */
+ val currentTransportAddress = new DynamicVariable[Address](null)
+
class Settings(val config: Config) {
import scala.collection.JavaConverters._
@@ -75,10 +81,10 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
def deserialize(bytes: Array[Byte],
serializerId: Int,
clazz: Option[Class[_]],
- classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
+ classLoader: ClassLoader): Either[Exception, AnyRef] =
try {
currentSystem.withValue(system) {
- Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, classLoader))
+ Right(serializerByIdentity(serializerId).fromBinary(bytes, clazz, Some(classLoader)))
}
} catch { case e: Exception ⇒ Left(e) }
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 7c00d69225..19ad3bc995 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -38,6 +38,15 @@ object ReflectiveAccess {
}
}
+ def createInstance[T](clazz: Class[_], args: Seq[(Class[_], AnyRef)]): Either[Exception, T] =
+ createInstance(clazz, args.map(_._1).toArray, args.map(_._2).toArray)
+
+ def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)], classloader: ClassLoader): Either[Exception, T] =
+ createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, classloader)
+
+ def createInstance[T](fqcn: String, args: Seq[(Class[_], AnyRef)]): Either[Exception, T] =
+ createInstance(fqcn, args.map(_._1).toArray, args.map(_._2).toArray, loader)
+
//Obtains a reference to fqn.MODULE$
def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try {
getClassFor(fqn, classloader) match {
diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst
index 8109e7a358..1a01ba24c9 100644
--- a/akka-docs/general/configuration.rst
+++ b/akka-docs/general/configuration.rst
@@ -217,6 +217,20 @@ and parsed by the actor system can be displayed like this:
println(system.settings());
// this is a shortcut for system.settings().config().root().render()
+A Word About ClassLoaders
+-------------------------
+
+In several places of the configuration file it is possible to specify the
+fully-qualified class name of something to be instantiated by Akka. This is
+done using Java reflection, which in turn uses a :class:`ClassLoader`. Getting
+the right one in challenging environments like application containers or OSGi
+bundles is not always trivial, the current approach of Akka is that each
+:class:`ActorSystem` implementation stores the current thread’s context class
+loader (if available, otherwise just its own loader as in
+``this.getClass.getClassLoader``) and uses that for all reflective accesses.
+This implies that putting Akka on the boot class path will yield
+:class:`NullPointerException` from strange places: this is simply not
+supported.
Application specific settings
-----------------------------
diff --git a/akka-docs/intro/what-is-akka.rst b/akka-docs/intro/what-is-akka.rst
index 6cc7c591c0..80a8b0e62c 100644
--- a/akka-docs/intro/what-is-akka.rst
+++ b/akka-docs/intro/what-is-akka.rst
@@ -82,6 +82,20 @@ Akka can be used in two different ways
See the :ref:`deployment-scenarios` for details.
+What happened to Cloudy Akka?
+=============================
+
+The commercial offering was earlier referred to as Cloudy Akka. This offering
+consisted of two things:
+
+- Cluster support for Akka
+- Monitoring & Management (formerly called Atmos)
+
+Cloudy Akka have been discontinued and the Cluster support is now being moved into the
+Open Source version of Akka (the upcoming Akka 2.1), while the Monitoring & Management
+(Atmos) is now rebranded into Typesafe Console and is part of the commercial subscription
+for the Typesafe Stack (see below for details).
+
Typesafe Stack
==============
@@ -100,6 +114,7 @@ Typesafe Console
On top of the Typesafe Stack we have also have commercial product called Typesafe
Console which provides the following features:
+#. Slick Web UI with real-time view into the system
#. Management through Dashboard, JMX and REST
#. Dapper-style tracing of messages across components and remote nodes
#. Real-time statistics
diff --git a/akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java b/akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java
index 981cac15b1..aeaca63f92 100644
--- a/akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java
+++ b/akka-docs/java/code/akka/docs/actor/FSMDocTestBase.java
@@ -18,45 +18,53 @@ import akka.actor.UntypedActor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
+import akka.testkit.AkkaSpec;
public class FSMDocTestBase {
//#data
public static final class SetTarget {
final ActorRef ref;
+
public SetTarget(ActorRef ref) {
this.ref = ref;
}
}
-
+
public static final class Queue {
final Object o;
+
public Queue(Object o) {
this.o = o;
}
}
-
+
public static final Object flush = new Object();
-
+
public static final class Batch {
final List