Creating ExtensionId, AbstractExtensionId, ExtensionIdProvider and Extension

This commit is contained in:
Viktor Klang 2011-11-25 10:45:22 +01:00
parent bf20f3fa44
commit 603a8ed034
13 changed files with 85 additions and 60 deletions

View file

@ -13,18 +13,25 @@ import static org.junit.Assert.*;
public class JavaExtension { public class JavaExtension {
static class Provider implements ExtensionProvider { static class Provider implements ExtensionIdProvider {
public Extension lookup() { return defaultInstance; } public ExtensionId lookup() { return defaultInstance; }
} }
public final static TestExtension defaultInstance = new TestExtension(); public final static TestExtensionId defaultInstance = new TestExtensionId();
static class TestExtension extends AbstractExtension<ActorSystemImpl> { static class TestExtensionId extends AbstractExtensionId<TestExtension> {
public ActorSystemImpl createExtension(ActorSystemImpl i) { public TestExtension createExtension(ActorSystemImpl i) {
return i; return new TestExtension(i);
} }
} }
static class TestExtension implements Extension {
public final ActorSystemImpl system;
public TestExtension(ActorSystemImpl i) {
system = i;
}
}
private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]", private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]",
ConfigParseOptions.defaults()); ConfigParseOptions.defaults());
@ -32,8 +39,8 @@ public class JavaExtension {
@Test @Test
public void mustBeAccessible() { public void mustBeAccessible() {
assertSame(system.extension(defaultInstance), system); assertSame(system.extension(defaultInstance).system, system);
assertSame(defaultInstance.apply(system), system); assertSame(defaultInstance.apply(system).system, system);
} }
} }

View file

@ -10,10 +10,12 @@ import com.typesafe.config.ConfigFactory
class JavaExtensionSpec extends JavaExtension with JUnitSuite class JavaExtensionSpec extends JavaExtension with JUnitSuite
object ActorSystemSpec { object ActorSystemSpec {
object TestExtension extends Extension[ActorSystem] with ExtensionProvider { object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider {
def lookup = this def lookup = this
def createExtension(s: ActorSystemImpl) = s def createExtension(s: ActorSystemImpl) = new TestExtension(s)
} }
class TestExtension(val system: ActorSystemImpl) extends Extension
} }
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension$"]""") { class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension$"]""") {
@ -22,8 +24,8 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSy
"An ActorSystem" must { "An ActorSystem" must {
"support extensions" in { "support extensions" in {
TestExtension(system) must be === system TestExtension(system).system must be === system
system.extension(TestExtension) must be === system system.extension(TestExtension).system must be === system
system.hasExtension(TestExtension) must be(true) system.hasExtension(TestExtension) must be(true)
} }

View file

@ -256,7 +256,7 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
* This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization * This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization
* of the payload, if is in the process of registration from another Thread of execution * of the payload, if is in the process of registration from another Thread of execution
*/ */
def registerExtension[T <: AnyRef](ext: Extension[T]): T def registerExtension[T <: Extension](ext: ExtensionId[T]): T
/** /**
* Returns the payload that is associated with the provided extension * Returns the payload that is associated with the provided extension
@ -264,13 +264,13 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
* This method can potentially block, waiting for the initialization * This method can potentially block, waiting for the initialization
* of the payload, if is in the process of registration from another Thread of execution * of the payload, if is in the process of registration from another Thread of execution
*/ */
def extension[T <: AnyRef](ext: Extension[T]): T def extension[T <: Extension](ext: ExtensionId[T]): T
/** /**
* Returns whether the specified extension is already registered, this method can potentially block, waiting for the initialization * Returns whether the specified extension is already registered, this method can potentially block, waiting for the initialization
* of the payload, if is in the process of registration from another Thread of execution * of the payload, if is in the process of registration from another Thread of execution
*/ */
def hasExtension(ext: Extension[_ <: AnyRef]): Boolean def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean
} }
class ActorSystemImpl(val name: String, val applicationConfig: Config) extends ActorSystem { class ActorSystemImpl(val name: String, val applicationConfig: Config) extends ActorSystem {
@ -367,28 +367,30 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
terminationFuture onComplete (_ dispatcher.shutdown()) terminationFuture onComplete (_ dispatcher.shutdown())
} }
private val extensions = new ConcurrentIdentityHashMap[Extension[_], AnyRef] private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef]
/** /**
* Returns any extension registered to the specified Extension or returns null if not registered * Returns any extension registered to the specified Extension or returns null if not registered
*/ */
@tailrec @tailrec
private def findExtension[T <: AnyRef](ext: Extension[T]): T = extensions.get(ext) match { private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match {
case c: CountDownLatch c.await(); findExtension(ext) //Registration in process, await completion and retry case c: CountDownLatch c.await(); findExtension(ext) //Registration in process, await completion and retry
case other other.asInstanceOf[T] //could be a T or null, in which case we return the null as T case other other.asInstanceOf[T] //could be a T or null, in which case we return the null as T
} }
@tailrec @tailrec
final def registerExtension[T <: AnyRef](ext: Extension[T]): T = { final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = {
findExtension(ext) match { findExtension(ext) match {
case null //Doesn't already exist, commence registration case null //Doesn't already exist, commence registration
val inProcessOfRegistration = new CountDownLatch(1) val inProcessOfRegistration = new CountDownLatch(1)
extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process
case null try { // Signal was successfully sent case null try { // Signal was successfully sent
val instance = ext.createExtension(this) // Create and initialize the extension ext.createExtension(this) match { // Create and initialize the extension
if (instance == null) throw new IllegalStateException("Extension instance created as null for Extension: " + ext) case null throw new IllegalStateException("Extension instance created as null for Extension: " + ext)
extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension case instance
instance //Profit! extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension
instance //Profit!
}
} catch { } catch {
case t case t
extensions.remove(ext, inProcessOfRegistration) //In case shit hits the fan, remove the inProcess signal extensions.remove(ext, inProcessOfRegistration) //In case shit hits the fan, remove the inProcess signal
@ -402,21 +404,22 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
} }
} }
def extension[T <: AnyRef](ext: Extension[T]): T = findExtension(ext) match { def extension[T <: Extension](ext: ExtensionId[T]): T = findExtension(ext) match {
case null throw new IllegalArgumentException("Trying to get non-registered extension " + ext) case null throw new IllegalArgumentException("Trying to get non-registered extension " + ext)
case some some.asInstanceOf[T] case some some.asInstanceOf[T]
} }
def hasExtension(ext: Extension[_ <: AnyRef]): Boolean = findExtension(ext) != null def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null
private def loadExtensions() { private def loadExtensions() {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
settings.config.getStringList("akka.extensions") foreach { fqcn settings.config.getStringList("akka.extensions") foreach { fqcn
import ReflectiveAccess._ import ReflectiveAccess._
getObjectFor[ExtensionProvider](fqcn).fold(_ createInstance[ExtensionProvider](fqcn, noParams, noArgs), Right(_)) match { getObjectFor[AnyRef](fqcn).fold(_ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
case Right(p: ExtensionProvider) registerExtension(p.lookup()) case Right(p: ExtensionIdProvider) registerExtension(p.lookup());
case Right(other) log.error("'{}' is not an ExtensionProvider, skipping...", fqcn) case Right(p: ExtensionId[_]) registerExtension(p);
case Left(problem) log.error(problem, "While trying to load extension '{}', skipping...", fqcn) case Right(other) log.error("'{}' is not an ExtensionIdProvider or ExtensionId, skipping...", fqcn)
case Left(problem) log.error(problem, "While trying to load extension '{}', skipping...", fqcn)
} }
} }

View file

@ -17,21 +17,35 @@ package akka.actor
* to the ActorSystem implementation. * to the ActorSystem implementation.
* *
*/ */
trait Extension[T <: AnyRef] {
/**
* Market interface to signify an Akka Extension
*/
trait Extension
/**
* Identifies an Extension
* Lookup of Extensions is done by object identity, so the Id must be the same wherever it's used,
* otherwise you'll get the same extension loaded multiple times.
*/
trait ExtensionId[T <: Extension] {
def apply(system: ActorSystem): T = system.registerExtension(this) def apply(system: ActorSystem): T = system.registerExtension(this)
def createExtension(system: ActorSystemImpl): T def createExtension(system: ActorSystemImpl): T
} }
/** /**
* Java API for Extension * Java API for ExtensionId
*/ */
abstract class AbstractExtension[T <: AnyRef] extends Extension[T] abstract class AbstractExtensionId[T <: Extension] extends ExtensionId[T]
/** /**
* To be able to load an Extension from the configuration, * To be able to load an ExtensionId from the configuration,
* a class that implements ExtensionProvider must be specified. * a class that implements ExtensionIdProvider must be specified.
* The lookup method should return the canonical reference to the extension. * The lookup method should return the canonical reference to the extension.
*/ */
trait ExtensionProvider { trait ExtensionIdProvider {
def lookup(): Extension[_ <: AnyRef] /**
* Returns the canonical ExtensionId for this Extension
*/
def lookup(): ExtensionId[_ <: Extension]
} }

View file

@ -6,11 +6,11 @@ package akka.serialization
import akka.AkkaException import akka.AkkaException
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import akka.actor.{ ActorSystem, ActorSystemImpl }
import scala.util.DynamicVariable import scala.util.DynamicVariable
import com.typesafe.config.{ ConfigRoot, ConfigParseOptions, ConfigFactory, Config } import com.typesafe.config.{ ConfigRoot, ConfigParseOptions, ConfigFactory, Config }
import com.typesafe.config.Config._ import com.typesafe.config.Config._
import akka.config.ConfigurationException import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ActorSystemImpl }
case class NoSerializerFoundException(m: String) extends AkkaException(m) case class NoSerializerFoundException(m: String) extends AkkaException(m)
@ -55,7 +55,7 @@ object Serialization {
* Serialization module. Contains methods for serialization and deserialization as well as * Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/ */
class Serialization(val system: ActorSystemImpl) { class Serialization(val system: ActorSystemImpl) extends Extension {
import Serialization._ import Serialization._
val settings = new Settings(system.applicationConfig) val settings = new Settings(system.applicationConfig)

View file

@ -2,9 +2,10 @@
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.serialization package akka.serialization
import akka.actor.{ ExtensionProvider, Extension, ActorSystemImpl }
object SerializationExtension extends Extension[Serialization] with ExtensionProvider { import akka.actor.{ ExtensionId, ExtensionIdProvider, ActorSystemImpl }
object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider {
override def lookup = SerializationExtension override def lookup = SerializationExtension
override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system) override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system)
} }

View file

@ -9,14 +9,14 @@ import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionProvider, ActorSystem, Extension, ActorSystemImpl } import akka.actor._
object BeanstalkBasedMailboxExtension extends Extension[BeanstalkMailboxSettings] with ExtensionProvider { object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider {
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.applicationConfig) def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.applicationConfig)
} }
class BeanstalkMailboxSettings(cfg: Config) { class BeanstalkMailboxSettings(cfg: Config) extends Extension {
private def referenceConfig: Config = private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-beanstalk-mailbox-reference.conf", ConfigFactory.parseResource(classOf[ActorSystem], "/akka-beanstalk-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false)) ConfigParseOptions.defaults.setAllowMissing(false))

View file

@ -9,14 +9,14 @@ import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionProvider, ActorSystem, Extension, ActorSystemImpl } import akka.actor._
object FileBasedMailboxExtension extends Extension[FileBasedMailboxSettings] with ExtensionProvider { object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.applicationConfig) def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.applicationConfig)
} }
class FileBasedMailboxSettings(cfg: Config) { class FileBasedMailboxSettings(cfg: Config) extends Extension {
private def referenceConfig: Config = private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-file-mailbox-reference.conf", ConfigFactory.parseResource(classOf[ActorSystem], "/akka-file-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false)) ConfigParseOptions.defaults.setAllowMissing(false))

View file

@ -11,12 +11,12 @@ import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._ import akka.actor._
object MongoBasedMailboxExtension extends Extension[MongoBasedMailboxSettings] with ExtensionProvider { object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.applicationConfig) def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.applicationConfig)
} }
class MongoBasedMailboxSettings(cfg: Config) { class MongoBasedMailboxSettings(cfg: Config) extends Extension {
private def referenceConfig: Config = private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-mongo-mailbox-reference.conf", ConfigFactory.parseResource(classOf[ActorSystem], "/akka-mongo-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false)) ConfigParseOptions.defaults.setAllowMissing(false))

View file

@ -7,14 +7,14 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.actor.{ ExtensionProvider, ActorSystem, Extension, ActorSystemImpl } import akka.actor._
object RedisBasedMailboxExtension extends Extension[RedisBasedMailboxSettings] with ExtensionProvider { object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.applicationConfig) def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.applicationConfig)
} }
class RedisBasedMailboxSettings(cfg: Config) { class RedisBasedMailboxSettings(cfg: Config) extends Extension {
private def referenceConfig: Config = private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-redis-mailbox-reference.conf", ConfigFactory.parseResource(classOf[ActorSystem], "/akka-redis-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false)) ConfigParseOptions.defaults.setAllowMissing(false))

View file

@ -9,13 +9,13 @@ import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionProvider, ActorSystem, Extension, ActorSystemImpl } import akka.actor._
object ZooKeeperBasedMailboxExtension extends Extension[ZooKeeperBasedMailboxSettings] with ExtensionProvider { object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.applicationConfig) def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.applicationConfig)
} }
class ZooKeeperBasedMailboxSettings(cfg: Config) { class ZooKeeperBasedMailboxSettings(cfg: Config) extends Extension {
private def referenceConfig: Config = private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-zookeeper-mailbox-reference.conf", ConfigFactory.parseResource(classOf[ActorSystem], "/akka-zookeeper-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false)) ConfigParseOptions.defaults.setAllowMissing(false))

View file

@ -14,12 +14,12 @@ import akka.config.ConfigurationException
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.actor._ import akka.actor._
object RemoteExtension extends Extension[RemoteExtensionSettings] with ExtensionProvider { object RemoteExtension extends ExtensionId[RemoteExtensionSettings] with ExtensionIdProvider {
def lookup() = this def lookup() = this
def createExtension(system: ActorSystemImpl) = new RemoteExtensionSettings(system.applicationConfig) def createExtension(system: ActorSystemImpl) = new RemoteExtensionSettings(system.applicationConfig)
} }
class RemoteExtensionSettings(cfg: Config) { class RemoteExtensionSettings(cfg: Config) extends Extension {
private def referenceConfig: Config = private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-remote-reference.conf", ConfigFactory.parseResource(classOf[ActorSystem], "/akka-remote-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false)) ConfigParseOptions.defaults.setAllowMissing(false))

View file

@ -3,21 +3,19 @@
*/ */
package akka.testkit package akka.testkit
import akka.actor.ActorSystem
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionId, ActorSystem, Extension, ActorSystemImpl }
object TestKitExtension extends Extension[TestKitSettings] { object TestKitExtension extends ExtensionId[TestKitSettings] {
def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.applicationConfig) def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.applicationConfig)
} }
class TestKitSettings(cfg: Config) { class TestKitSettings(cfg: Config) extends Extension {
private def referenceConfig: Config = private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-testkit-reference.conf", ConfigFactory.parseResource(classOf[ActorSystem], "/akka-testkit-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false)) ConfigParseOptions.defaults.setAllowMissing(false))