diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java new file mode 100644 index 0000000000..ec00ba823f --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor; + +import org.junit.Test; + +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigParseOptions; + +import static org.junit.Assert.*; + +public class JavaExtension { + + static class TestExtension implements Extension { + private ActorSystemImpl system; + public static ExtensionKey key = new ExtensionKey() {}; + + public ExtensionKey init(ActorSystemImpl system) { + this.system = system; + return key; + } + + public ActorSystemImpl getSystem() { + return system; + } + } + + private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtension\" ]", ConfigParseOptions.defaults()); + + private ActorSystem system = ActorSystem.create("JavaExtension", c); + + @Test + public void mustBeAccessible() { + final ActorSystemImpl s = system.extension(TestExtension.key).getSystem(); + assertSame(s, system); + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala new file mode 100644 index 0000000000..19e183b965 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor + +import akka.testkit._ +import org.scalatest.junit.JUnitSuite +import com.typesafe.config.ConfigFactory + +class JavaExtensionSpec extends JavaExtension with JUnitSuite + +object ActorSystemSpec { + + case class TestExtension extends Extension[TestExtension] { + var system: ActorSystemImpl = _ + + def init(system: ActorSystemImpl): ExtensionKey[TestExtension] = { + this.system = system + TestExtension + } + } + + object TestExtension extends ExtensionKey[TestExtension] + +} + +class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension"]""") { + import ActorSystemSpec._ + + "An ActorSystem" must { + + "support extensions" in { + system.extension(TestExtension).system must be === system + } + + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index b189b737e5..c5ac492950 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -20,6 +20,8 @@ akka { # as they have been started; before that, see "stdout-loglevel" stdout-loglevel = "WARNING" # Loglevel for the very basic logger activated during AkkaApplication startup + extensions = [] # list FQCN of extensions which shall be loaded at actor system startup + event-handler-dispatcher { type = "Dispatcher" # Must be one of the following # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type), diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 47ae9a9593..f7e3678909 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -22,6 +22,7 @@ import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigFactory import java.lang.reflect.InvocationTargetException +import java.util.concurrent.ConcurrentHashMap object ActorSystem { @@ -150,26 +151,68 @@ object ActorSystem { } +/** + * An actor system is a hierarchical group of actors which share common + * configuration, e.g. dispatchers, deployments, remote capabilities and + * addresses. It is also the entry point for creating or looking up actors. + */ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory { import ActorSystem._ + /** + * The name of this actor system, used to distinguish multiple ones within + * the same JVM & class loader. + */ def name: String + + /** + * The core settings extracted from the supplied configuration. + */ def settings: Settings + + /** + * The logical node name where this actor system resides. + */ def nodename: String /** - * Construct a path below the application guardian. + * Construct a path below the application guardian to be used with [[ActorSystem.actorFor]]. */ def /(name: String): ActorPath + + /** + * The root path for all actors within this actor system, including remote + * address if enabled. + */ def rootPath: ActorPath + /** + * Start-up time in milliseconds since the epoch. + */ val startTime = System.currentTimeMillis + + /** + * Up-time of this actor system in seconds. + */ def uptime = (System.currentTimeMillis - startTime) / 1000 + /** + * Main event bus of this actor system, used for example for logging. + */ def eventStream: EventStream + + /** + * Convenient logging adapter for logging to the [[ActorSystem.eventStream]]. + */ def log: LoggingAdapter + /** + * Actor reference where messages are re-routed to which were addressed to + * stopped or non-existing actors. Delivery to this actor is done on a best + * effort basis and hence not strictly guaranteed. + */ def deadLetters: ActorRef + // FIXME: do not publish this def deadLetterMailbox: Mailbox // FIXME: Serialization should be an extension @@ -177,20 +220,83 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory { // FIXME: TypedActor should be an extension def typedActor: TypedActor + /** + * Light-weight scheduler for running asynchronous tasks after some deadline + * in the future. Not terribly precise but cheap. + */ def scheduler: Scheduler + + /** + * Helper object for creating new dispatchers and passing in all required + * information. + */ def dispatcherFactory: Dispatchers + + /** + * Default dispatcher as configured. This dispatcher is used for all actors + * in the actor system which do not have a different dispatcher configured + * explicitly. + */ def dispatcher: MessageDispatcher + /** + * Register a block of code to run after all actors in this actor system have + * been stopped. + */ def registerOnTermination(code: ⇒ Unit) + + /** + * Register a block of code to run after all actors in this actor system have + * been stopped (Java API). + */ def registerOnTermination(code: Runnable) + + /** + * Stop this actor system. This will stop the guardian actor, which in turn + * will recursively stop all its child actors, then the system guardian + * (below which the logging actors reside) and the execute all registered + * termination handlers (see [[ActorSystem.registerOnTermination]]). + */ def stop() + + /** + * Register an [[akka.actor.Extension]] within this actor system. The supplied + * object is interrogated for the extension’s key with which the extension is + * accessible from anywhere you have a reference to this actor system in + * scope, e.g. within actors (see [[ActorSystem.extension]]). + * + * Extensions can be registered automatically by adding their fully-qualified + * class name to the `akka.extensions` configuration key. + */ + def registerExtension(ext: Extension[_ <: AnyRef]) + + /** + * Obtain a reference to a registered extension by passing in the key which + * the extension object returned from its init method (typically a static + * field or Scala `object`): + * + * {{{ + * class MyActor extends Actor { + * val ext: MyExtension = context.app.extension(MyExtension.key) + * } + * }}} + * + * Throws IllegalArgumentException if the extension key is not found. + */ + def extension[T <: AnyRef](key: ExtensionKey[T]): T + + /** + * Query presence of a specific extension. Beware that this key needs to be + * “the same” as the one used for registration (it is using a HashMap). + */ + def hasExtension(key: ExtensionKey[_]): Boolean } -class ActorSystemImpl(val name: String, config: Config) extends ActorSystem { +class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem { import ActorSystem._ - val settings = new Settings(config) + val settings = new Settings(_config) protected def systemImpl = this @@ -280,6 +386,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem { // this starts the reaper actor and the user-configured logging subscribers, which are also actors eventStream.start(this) eventStream.startDefaultLoggers(this) + loadExtensions() this } @@ -295,4 +402,31 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem { terminationFuture onComplete (_ ⇒ dispatcher.shutdown()) } + private val extensions = new ConcurrentHashMap[ExtensionKey[_], Extension[_]] + + def registerExtension(ext: Extension[_ <: AnyRef]) { + val key = ext.init(this) + extensions.put(key, ext) match { + case null ⇒ + case old ⇒ log.warning("replacing extension {}:{} with {}", key, old, ext) + } + } + + def extension[T <: AnyRef](key: ExtensionKey[T]): T = extensions.get(key) match { + case null ⇒ throw new IllegalArgumentException("trying to get non-registered extension " + key) + case x ⇒ x.asInstanceOf[T] + } + + def hasExtension(key: ExtensionKey[_]): Boolean = extensions.get(key) != null + + private def loadExtensions() { + import scala.collection.JavaConversions._ + settings.config.getStringList("akka.extensions") foreach { fqcn ⇒ + import ReflectiveAccess._ + createInstance[Extension[_ <: AnyRef]](fqcn, noParams, noArgs) match { + case Left(ex) ⇒ log.error(ex, "Exception trying to load extension " + fqcn) + case Right(ext) ⇒ if (ext.isInstanceOf[Extension[_]]) registerExtension(ext) else log.error("Class {} is not an Extension", fqcn) + } + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala new file mode 100644 index 0000000000..bc0ab0c366 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.actor + +/** + * The basic ActorSystem covers all that is needed for locally running actors, + * using futures and so on. In addition, more features can hook into it and + * thus become visible to actors et al by registering themselves as extensions. + * This is accomplished by providing an extension—which is an object + * implementing this trait—to `ActorSystem.registerExtension(...)` or by + * specifying the corresponding option in the configuration passed to + * ActorSystem, which will then instantiate (without arguments) each FQCN and + * register the result. + * + * The extension itself can be created in any way desired and has full access + * to the ActorSystem implementation. + * + * Scala example: + * + * {{{ + * class MyExtension extends Extension[MyExtension] { + * def init(system: ActorSystemImpl): ExtensionKey[MyExtension] = { + * ... // initialize here + * MyExtension + * } + * } + * object MyExtension extends ExtensionKey[MyExtension] + * }}} + * + * Java example: + * + * {{{ + * static class MyExtension implements Extension { + * public static ExtensionKey key = new ExtensionKey() {}; + * + * public ExtensionKey init(ActorSystemImpl system) { + * ... // initialize here + * return key; + * } + * } + * }}} + */ +trait Extension[T <: AnyRef] { + /** + * This method is called by the ActorSystem upon registering this extension. + * The key returned is used for looking up extensions, hence it must be a + * suitable hash key and available to all clients of the extension. This is + * best achieved by storing it in a static field (Java) or as/in an object + * (Scala). + */ + def init(system: ActorSystemImpl): ExtensionKey[T] +} + +/** + * Marker trait identifying a registered [[akka.actor.Extension]]. + */ +trait ExtensionKey[T <: AnyRef] diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index dffff71828..ac4a1a773a 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -46,7 +46,7 @@ class Serialization(val system: ActorSystemImpl) { * Tries to load the specified Serializer by the FQN */ def serializerOf(serializerFQN: String): Either[Exception, Serializer] = - ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.emptyParams, ReflectiveAccess.emptyArguments) + ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs) private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = { if (bindings.isEmpty) diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index d331190de6..e2f667527f 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -16,11 +16,8 @@ import akka.actor.ActorSystem object ReflectiveAccess { val loader = getClass.getClassLoader - val emptyParams: Array[Class[_]] = Array() - val emptyArguments: Array[AnyRef] = Array() - - val noParams = Array[Class[_]]() - val noArgs = Array[AnyRef]() + val noParams: Array[Class[_]] = Array() + val noArgs: Array[AnyRef] = Array() def createInstance[T](clazz: Class[_], params: Array[Class[_]], diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index def5ebfe96..4ad3ae2c15 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -63,6 +63,8 @@ abstract class AkkaSpec(_application: ActorSystem = ActorSystem(getClass.getSimp def this(config: Config) = this(ActorSystem(getClass.getSimpleName, config.withFallback(AkkaSpec.testConf))) + def this(s: String) = this(ConfigFactory.parseString(s, ConfigParseOptions.defaults)) + def this(configMap: Map[String, _]) = { this(AkkaSpec.mapToConfig(configMap).withFallback(AkkaSpec.testConf)) }