Merge branch 'extensions' into master

This commit is contained in:
Roland 2011-11-21 15:58:01 +01:00
commit 263e2d495d
8 changed files with 280 additions and 9 deletions

View file

@ -0,0 +1,40 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
import org.junit.Test;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigParseOptions;
import static org.junit.Assert.*;
public class JavaExtension {
static class TestExtension implements Extension<TestExtension> {
private ActorSystemImpl system;
public static ExtensionKey<TestExtension> key = new ExtensionKey<TestExtension>() {};
public ExtensionKey<TestExtension> init(ActorSystemImpl system) {
this.system = system;
return key;
}
public ActorSystemImpl getSystem() {
return system;
}
}
private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtension\" ]", ConfigParseOptions.defaults());
private ActorSystem system = ActorSystem.create("JavaExtension", c);
@Test
public void mustBeAccessible() {
final ActorSystemImpl s = system.extension(TestExtension.key).getSystem();
assertSame(s, system);
}
}

View file

@ -0,0 +1,38 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.testkit._
import org.scalatest.junit.JUnitSuite
import com.typesafe.config.ConfigFactory
class JavaExtensionSpec extends JavaExtension with JUnitSuite
object ActorSystemSpec {
case class TestExtension extends Extension[TestExtension] {
var system: ActorSystemImpl = _
def init(system: ActorSystemImpl): ExtensionKey[TestExtension] = {
this.system = system
TestExtension
}
}
object TestExtension extends ExtensionKey[TestExtension]
}
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension"]""") {
import ActorSystemSpec._
"An ActorSystem" must {
"support extensions" in {
system.extension(TestExtension).system must be === system
}
}
}

View file

@ -20,6 +20,8 @@ akka {
# as they have been started; before that, see "stdout-loglevel"
stdout-loglevel = "WARNING" # Loglevel for the very basic logger activated during AkkaApplication startup
extensions = [] # list FQCN of extensions which shall be loaded at actor system startup
event-handler-dispatcher {
type = "Dispatcher" # Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),

View file

@ -22,6 +22,7 @@ import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import com.typesafe.config.ConfigFactory
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ConcurrentHashMap
object ActorSystem {
@ -150,26 +151,68 @@ object ActorSystem {
}
/**
* An actor system is a hierarchical group of actors which share common
* configuration, e.g. dispatchers, deployments, remote capabilities and
* addresses. It is also the entry point for creating or looking up actors.
*/
abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
import ActorSystem._
/**
* The name of this actor system, used to distinguish multiple ones within
* the same JVM & class loader.
*/
def name: String
/**
* The core settings extracted from the supplied configuration.
*/
def settings: Settings
/**
* The logical node name where this actor system resides.
*/
def nodename: String
/**
* Construct a path below the application guardian.
* Construct a path below the application guardian to be used with [[ActorSystem.actorFor]].
*/
def /(name: String): ActorPath
/**
* The root path for all actors within this actor system, including remote
* address if enabled.
*/
def rootPath: ActorPath
/**
* Start-up time in milliseconds since the epoch.
*/
val startTime = System.currentTimeMillis
/**
* Up-time of this actor system in seconds.
*/
def uptime = (System.currentTimeMillis - startTime) / 1000
/**
* Main event bus of this actor system, used for example for logging.
*/
def eventStream: EventStream
/**
* Convenient logging adapter for logging to the [[ActorSystem.eventStream]].
*/
def log: LoggingAdapter
/**
* Actor reference where messages are re-routed to which were addressed to
* stopped or non-existing actors. Delivery to this actor is done on a best
* effort basis and hence not strictly guaranteed.
*/
def deadLetters: ActorRef
// FIXME: do not publish this
def deadLetterMailbox: Mailbox
// FIXME: Serialization should be an extension
@ -177,20 +220,83 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
// FIXME: TypedActor should be an extension
def typedActor: TypedActor
/**
* Light-weight scheduler for running asynchronous tasks after some deadline
* in the future. Not terribly precise but cheap.
*/
def scheduler: Scheduler
/**
* Helper object for creating new dispatchers and passing in all required
* information.
*/
def dispatcherFactory: Dispatchers
/**
* Default dispatcher as configured. This dispatcher is used for all actors
* in the actor system which do not have a different dispatcher configured
* explicitly.
*/
def dispatcher: MessageDispatcher
/**
* Register a block of code to run after all actors in this actor system have
* been stopped.
*/
def registerOnTermination(code: Unit)
/**
* Register a block of code to run after all actors in this actor system have
* been stopped (Java API).
*/
def registerOnTermination(code: Runnable)
/**
* Stop this actor system. This will stop the guardian actor, which in turn
* will recursively stop all its child actors, then the system guardian
* (below which the logging actors reside) and the execute all registered
* termination handlers (see [[ActorSystem.registerOnTermination]]).
*/
def stop()
/**
* Register an [[akka.actor.Extension]] within this actor system. The supplied
* object is interrogated for the extensions key with which the extension is
* accessible from anywhere you have a reference to this actor system in
* scope, e.g. within actors (see [[ActorSystem.extension]]).
*
* Extensions can be registered automatically by adding their fully-qualified
* class name to the `akka.extensions` configuration key.
*/
def registerExtension(ext: Extension[_ <: AnyRef])
/**
* Obtain a reference to a registered extension by passing in the key which
* the extension object returned from its init method (typically a static
* field or Scala `object`):
*
* {{{
* class MyActor extends Actor {
* val ext: MyExtension = context.app.extension(MyExtension.key)
* }
* }}}
*
* Throws IllegalArgumentException if the extension key is not found.
*/
def extension[T <: AnyRef](key: ExtensionKey[T]): T
/**
* Query presence of a specific extension. Beware that this key needs to be
* the same as the one used for registration (it is using a HashMap).
*/
def hasExtension(key: ExtensionKey[_]): Boolean
}
class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem {
import ActorSystem._
val settings = new Settings(config)
val settings = new Settings(_config)
protected def systemImpl = this
@ -280,6 +386,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
eventStream.start(this)
eventStream.startDefaultLoggers(this)
loadExtensions()
this
}
@ -295,4 +402,31 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
terminationFuture onComplete (_ dispatcher.shutdown())
}
private val extensions = new ConcurrentHashMap[ExtensionKey[_], Extension[_]]
def registerExtension(ext: Extension[_ <: AnyRef]) {
val key = ext.init(this)
extensions.put(key, ext) match {
case null
case old log.warning("replacing extension {}:{} with {}", key, old, ext)
}
}
def extension[T <: AnyRef](key: ExtensionKey[T]): T = extensions.get(key) match {
case null throw new IllegalArgumentException("trying to get non-registered extension " + key)
case x x.asInstanceOf[T]
}
def hasExtension(key: ExtensionKey[_]): Boolean = extensions.get(key) != null
private def loadExtensions() {
import scala.collection.JavaConversions._
settings.config.getStringList("akka.extensions") foreach { fqcn
import ReflectiveAccess._
createInstance[Extension[_ <: AnyRef]](fqcn, noParams, noArgs) match {
case Left(ex) log.error(ex, "Exception trying to load extension " + fqcn)
case Right(ext) if (ext.isInstanceOf[Extension[_]]) registerExtension(ext) else log.error("Class {} is not an Extension", fqcn)
}
}
}
}

View file

@ -0,0 +1,58 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
/**
* The basic ActorSystem covers all that is needed for locally running actors,
* using futures and so on. In addition, more features can hook into it and
* thus become visible to actors et al by registering themselves as extensions.
* This is accomplished by providing an extensionwhich is an object
* implementing this traitto `ActorSystem.registerExtension(...)` or by
* specifying the corresponding option in the configuration passed to
* ActorSystem, which will then instantiate (without arguments) each FQCN and
* register the result.
*
* The extension itself can be created in any way desired and has full access
* to the ActorSystem implementation.
*
* Scala example:
*
* {{{
* class MyExtension extends Extension[MyExtension] {
* def init(system: ActorSystemImpl): ExtensionKey[MyExtension] = {
* ... // initialize here
* MyExtension
* }
* }
* object MyExtension extends ExtensionKey[MyExtension]
* }}}
*
* Java example:
*
* {{{
* static class MyExtension implements Extension<MyExtension> {
* public static ExtensionKey<MyExtension> key = new ExtensionKey<MyExtension>() {};
*
* public ExtensionKey<MyExtension> init(ActorSystemImpl system) {
* ... // initialize here
* return key;
* }
* }
* }}}
*/
trait Extension[T <: AnyRef] {
/**
* This method is called by the ActorSystem upon registering this extension.
* The key returned is used for looking up extensions, hence it must be a
* suitable hash key and available to all clients of the extension. This is
* best achieved by storing it in a static field (Java) or as/in an object
* (Scala).
*/
def init(system: ActorSystemImpl): ExtensionKey[T]
}
/**
* Marker trait identifying a registered [[akka.actor.Extension]].
*/
trait ExtensionKey[T <: AnyRef]

View file

@ -46,7 +46,7 @@ class Serialization(val system: ActorSystemImpl) {
* Tries to load the specified Serializer by the FQN
*/
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.emptyParams, ReflectiveAccess.emptyArguments)
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.noParams, ReflectiveAccess.noArgs)
private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
if (bindings.isEmpty)

View file

@ -16,11 +16,8 @@ import akka.actor.ActorSystem
object ReflectiveAccess {
val loader = getClass.getClassLoader
val emptyParams: Array[Class[_]] = Array()
val emptyArguments: Array[AnyRef] = Array()
val noParams = Array[Class[_]]()
val noArgs = Array[AnyRef]()
val noParams: Array[Class[_]] = Array()
val noArgs: Array[AnyRef] = Array()
def createInstance[T](clazz: Class[_],
params: Array[Class[_]],

View file

@ -63,6 +63,8 @@ abstract class AkkaSpec(_application: ActorSystem = ActorSystem(getClass.getSimp
def this(config: Config) = this(ActorSystem(getClass.getSimpleName, config.withFallback(AkkaSpec.testConf)))
def this(s: String) = this(ConfigFactory.parseString(s, ConfigParseOptions.defaults))
def this(configMap: Map[String, _]) = {
this(AkkaSpec.mapToConfig(configMap).withFallback(AkkaSpec.testConf))
}