diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java index 533fba8164..2600188cac 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaExtension.java @@ -13,33 +13,34 @@ import static org.junit.Assert.*; public class JavaExtension { - static class TestExtension implements Extension { - private ActorSystemImpl system; - public static ExtensionKey key = new ExtensionKey() { - }; + static class Provider implements ExtensionIdProvider { + public ExtensionId lookup() { return defaultInstance; } + } - public ExtensionKey key() { - return key; - } + public final static TestExtensionId defaultInstance = new TestExtensionId(); - public void init(ActorSystemImpl system) { - this.system = system; - } - - public ActorSystemImpl getSystem() { - return system; + static class TestExtensionId extends AbstractExtensionId { + public TestExtension createExtension(ActorSystemImpl i) { + return new TestExtension(i); } } - private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtension\" ]", + static class TestExtension implements Extension { + public final ActorSystemImpl system; + public TestExtension(ActorSystemImpl i) { + system = i; + } + } + + private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]", ConfigParseOptions.defaults()); private ActorSystem system = ActorSystem.create("JavaExtension", c); @Test public void mustBeAccessible() { - final ActorSystemImpl s = system.extension(TestExtension.key).getSystem(); - assertSame(s, system); + assertSame(system.extension(defaultInstance).system, system); + assertSame(defaultInstance.apply(system).system, system); } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 1358e61c82..613b3c7b36 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -247,7 +247,7 @@ class ActorRefSpec extends AkkaSpec { out.flush out.close - Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { + Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) val readA = in.readObject @@ -275,7 +275,7 @@ class ActorRefSpec extends AkkaSpec { (intercept[java.lang.IllegalStateException] { in.readObject }).getMessage must be === "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + - " Use akka.serialization.Serialization.system.withValue(system) { ... }" + " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }" } "must throw exception on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { @@ -292,7 +292,7 @@ class ActorRefSpec extends AkkaSpec { out.flush out.close - Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { + Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) (intercept[java.lang.IllegalStateException] { in.readObject diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index d472387f13..3565cde2fb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -10,28 +10,23 @@ import com.typesafe.config.ConfigFactory class JavaExtensionSpec extends JavaExtension with JUnitSuite object ActorSystemSpec { - - class TestExtension extends Extension[TestExtension] { - var system: ActorSystemImpl = _ - - def key = TestExtension - - def init(system: ActorSystemImpl) { - this.system = system - } + object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider { + def lookup = this + def createExtension(s: ActorSystemImpl) = new TestExtension(s) } - object TestExtension extends ExtensionKey[TestExtension] - + class TestExtension(val system: ActorSystemImpl) extends Extension } -class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension"]""") { +class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension$"]""") { import ActorSystemSpec._ "An ActorSystem" must { "support extensions" in { + TestExtension(system).system must be === system system.extension(TestExtension).system must be === system + system.hasExtension(TestExtension) must be(true) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index d29627e443..ac796d407d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -333,7 +333,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to serialize and deserialize invocations" in { import java.io._ - val serialization = SerializationExtension(system).serialization + val serialization = SerializationExtension(system) val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]()) val baos = new ByteArrayOutputStream(8192 * 4) val out = new ObjectOutputStream(baos) @@ -343,7 +343,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { + Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall] mNew.method must be(m.method) @@ -353,7 +353,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to serialize and deserialize invocations' parameters" in { import java.io._ val someFoo: Foo = new Bar - val serialization = SerializationExtension(system).serialization + val serialization = SerializationExtension(system) val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef])) val baos = new ByteArrayOutputStream(8192 * 4) val out = new ObjectOutputStream(baos) @@ -363,7 +363,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { + Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall] mNew.method must be(m.method) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 8022edcc62..0e2f43a3d8 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -46,7 +46,7 @@ object SerializeSpec { class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { import SerializeSpec._ - val ser = SerializationExtension(system).serialization + val ser = SerializationExtension(system) import ser._ val addr = Address("120", "Monroe Street", "Santa Clara", "95050") @@ -104,7 +104,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { out.close() val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray)) - Serialization.system.withValue(a.asInstanceOf[ActorSystemImpl]) { + Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) { val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef] (deadLetters eq a.deadLetters) must be(true) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e3229fba30..afa7299669 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -305,17 +305,17 @@ trait ScalaActorRef { ref: ActorRef ⇒ */ case class SerializedActorRef(hostname: String, port: Int, path: String) { - import akka.serialization.Serialization.system + import akka.serialization.Serialization.currentSystem def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path) def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE @throws(classOf[java.io.ObjectStreamException]) - def readResolve(): AnyRef = { - if (system.value eq null) throw new IllegalStateException( + def readResolve(): AnyRef = currentSystem.value match { + case null ⇒ throw new IllegalStateException( "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + - " Use akka.serialization.Serialization.system.withValue(system) { ... }") - system.value.provider.deserialize(this) match { + " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }") + case someSystem ⇒ someSystem.provider.deserialize(this) match { case Some(actor) ⇒ actor case None ⇒ throw new IllegalStateException("Could not deserialize ActorRef") } @@ -354,7 +354,7 @@ case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) object DeadLetterActorRef { class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? @throws(classOf[java.io.ObjectStreamException]) - private def readResolve(): AnyRef = Serialization.system.value.deadLetters + private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters } val serialized = new SerializedDeadLetterActorRef diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 1bf1ea2bd1..83d7aa9d3c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.Executors import scala.annotation.tailrec import akka.serialization.SerializationExtension +import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap object ActorSystem { @@ -251,36 +252,25 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory { def stop() /** - * Register an [[akka.actor.Extension]] within this actor system. The supplied - * object is interrogated for the extension’s key with which the extension is - * accessible from anywhere you have a reference to this actor system in - * scope, e.g. within actors (see [[ActorSystem.extension]]). - * - * Extensions can be registered automatically by adding their fully-qualified - * class name to the `akka.extensions` configuration key. + * Registers the provided extension and creates its payload, if this extension isn't already registered + * This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization + * of the payload, if is in the process of registration from another Thread of execution */ - def registerExtension[T <: AnyRef](ext: Extension[T]): Extension[T] + def registerExtension[T <: Extension](ext: ExtensionId[T]): T /** - * Obtain a reference to a registered extension by passing in the key which - * the extension object returned from its init method (typically a static - * field or Scala `object`): - * - * {{{ - * class MyActor extends Actor { - * val ext: MyExtension = context.app.extension(MyExtension.key) - * } - * }}} - * - * Throws IllegalArgumentException if the extension key is not found. + * Returns the payload that is associated with the provided extension + * throws an IllegalStateException if it is not registered. + * This method can potentially block, waiting for the initialization + * of the payload, if is in the process of registration from another Thread of execution */ - def extension[T <: AnyRef](key: ExtensionKey[T]): T + def extension[T <: Extension](ext: ExtensionId[T]): T /** - * Query presence of a specific extension. Beware that this key needs to be - * “the same” as the one used for registration (it is using a HashMap). + * Returns whether the specified extension is already registered, this method can potentially block, waiting for the initialization + * of the payload, if is in the process of registration from another Thread of execution */ - def hasExtension(key: ExtensionKey[_]): Boolean + def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean } class ActorSystemImpl(val name: String, val applicationConfig: Config) extends ActorSystem { @@ -355,7 +345,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A private lazy val _start: this.type = { // TODO can we do something better than loading SerializationExtension from here? - _typedActor = new TypedActor(settings, SerializationExtension(this).serialization) + _typedActor = new TypedActor(settings, SerializationExtension(this)) provider.init(this) deadLetters.init(dispatcher, provider.rootPath) // this starts the reaper actor and the user-configured logging subscribers, which are also actors @@ -377,65 +367,61 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A terminationFuture onComplete (_ ⇒ dispatcher.shutdown()) } - private val extensions = new ConcurrentHashMap[ExtensionKey[_], AnyRef] + private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef] /** - * Attempts to initialize and register this extension if the key associated with it isn't already registered. - * The extension will only be initialized if it isn't already registered. - * Rethrows anything thrown when initializing the extension (doesn't register in that case) - * Returns the registered extension, might be another already registered instance. + * Returns any extension registered to the specified Extension or returns null if not registered */ @tailrec - final def registerExtension[T <: AnyRef](ext: Extension[T]): Extension[T] = { - /** - * Returns any extension registered to the specified key or returns null if not registered - */ - @tailrec - def findExtension[T <: AnyRef](key: ExtensionKey[T]): Option[T] = extensions.get(key) match { - case c: CountDownLatch ⇒ c.await(); findExtension(key) //Registration in process, await completion and retry - case e: Extension[_] ⇒ Some(e.asInstanceOf[T]) //Profit! - case null ⇒ None //Doesn't exist - } + private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match { + case c: CountDownLatch ⇒ c.await(); findExtension(ext) //Registration in process, await completion and retry + case other ⇒ other.asInstanceOf[T] //could be a T or null, in which case we return the null as T + } - findExtension(ext.key) match { - case Some(e: Extension[_]) ⇒ e.asInstanceOf[Extension[T]] //Profit! - case None ⇒ //Doesn't already exist, commence registration + @tailrec + final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = { + findExtension(ext) match { + case null ⇒ //Doesn't already exist, commence registration val inProcessOfRegistration = new CountDownLatch(1) - extensions.putIfAbsent(ext.key, inProcessOfRegistration) match { // Signal that registration is in process + extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process case null ⇒ try { // Signal was successfully sent - ext.init(this) //Initialize the new extension - extensions.replace(ext.key, inProcessOfRegistration, ext) //Replace our in process signal with the initialized extension - ext //Profit! + ext.createExtension(this) match { // Create and initialize the extension + case null ⇒ throw new IllegalStateException("Extension instance created as null for Extension: " + ext) + case instance ⇒ + extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension + instance //Profit! + } } catch { case t ⇒ - extensions.remove(ext.key, inProcessOfRegistration) //In case shit hits the fan, remove the inProcess signal + extensions.remove(ext, inProcessOfRegistration) //In case shit hits the fan, remove the inProcess signal throw t //Escalate to caller } finally { inProcessOfRegistration.countDown //Always notify listeners of the inProcess signal } - case other ⇒ registerExtension(ext) //Someone else is in process of registering an extension for this key, retry + case other ⇒ registerExtension(ext) //Someone else is in process of registering an extension for this Extension, retry } + case existing ⇒ existing.asInstanceOf[T] } } - def extension[T <: AnyRef](key: ExtensionKey[T]): T = extensions.get(key) match { - case x: Extension[_] ⇒ x.asInstanceOf[T] - case _ ⇒ throw new IllegalArgumentException("trying to get non-registered extension " + key) + def extension[T <: Extension](ext: ExtensionId[T]): T = findExtension(ext) match { + case null ⇒ throw new IllegalArgumentException("Trying to get non-registered extension " + ext) + case some ⇒ some.asInstanceOf[T] } - def hasExtension(key: ExtensionKey[_]): Boolean = extensions.get(key) match { - case x: Extension[_] ⇒ true - case _ ⇒ false - } + def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null private def loadExtensions() { import scala.collection.JavaConversions._ settings.config.getStringList("akka.extensions") foreach { fqcn ⇒ import ReflectiveAccess._ - createInstance[Extension[_ <: AnyRef]](fqcn, noParams, noArgs) match { - case Left(ex) ⇒ log.error(ex, "Exception trying to load extension " + fqcn) - case Right(ext) ⇒ if (ext.isInstanceOf[Extension[_]]) registerExtension(ext) else log.error("Class {} is not an Extension", fqcn) + getObjectFor[AnyRef](fqcn).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match { + case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()); + case Right(p: ExtensionId[_]) ⇒ registerExtension(p); + case Right(other) ⇒ log.error("'{}' is not an ExtensionIdProvider or ExtensionId, skipping...", fqcn) + case Left(problem) ⇒ log.error(problem, "While trying to load extension '{}', skipping...", fqcn) } + } } } diff --git a/akka-actor/src/main/scala/akka/actor/Extension.scala b/akka-actor/src/main/scala/akka/actor/Extension.scala index 7c582fa8c4..3850ef4462 100644 --- a/akka-actor/src/main/scala/akka/actor/Extension.scala +++ b/akka-actor/src/main/scala/akka/actor/Extension.scala @@ -16,53 +16,36 @@ package akka.actor * The extension itself can be created in any way desired and has full access * to the ActorSystem implementation. * - * Scala example: - * - * {{{ - * class MyExtension extends Extension[MyExtension] { - * def key = MyExtension - * def init(system: ActorSystemImpl) { - * ... // initialize here - * } - * } - * object MyExtension extends ExtensionKey[MyExtension] - * }}} - * - * Java example: - * - * {{{ - * static class MyExtension implements Extension { - * public static ExtensionKey key = new ExtensionKey() {}; - * - * public ExtensionKey key() { - * return key; - * } - * public void init(ActorSystemImpl system) { - * ... // initialize here - * } - * } - * }}} */ -trait Extension[T <: AnyRef] { - /** - * This method is called by the ActorSystem upon registering this extension. - * The key returned is used for looking up extensions, hence it must be a - * suitable hash key and available to all clients of the extension. This is - * best achieved by storing it in a static field (Java) or as/in an object - * (Scala). - */ - def key: ExtensionKey[T] +/** + * Market interface to signify an Akka Extension + */ +trait Extension - // FIXME ActorSystemImpl exposed to user API. We might well choose to introduce a new interface for this level of access, just so we can shuffle around the implementation - /** - * This method is called by the ActorSystem when the extension is registered - * to trigger initialization of the extension. - */ - def init(system: ActorSystemImpl): Unit +/** + * Identifies an Extension + * Lookup of Extensions is done by object identity, so the Id must be the same wherever it's used, + * otherwise you'll get the same extension loaded multiple times. + */ +trait ExtensionId[T <: Extension] { + def apply(system: ActorSystem): T = system.registerExtension(this) + def createExtension(system: ActorSystemImpl): T } /** - * Marker trait identifying a registered [[akka.actor.Extension]]. + * Java API for ExtensionId */ -trait ExtensionKey[T <: AnyRef] +abstract class AbstractExtensionId[T <: Extension] extends ExtensionId[T] + +/** + * To be able to load an ExtensionId from the configuration, + * a class that implements ExtensionIdProvider must be specified. + * The lookup method should return the canonical reference to the extension. + */ +trait ExtensionIdProvider { + /** + * Returns the canonical ExtensionId for this Extension + */ + def lookup(): ExtensionId[_ <: Extension] +} diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 3409aa05e4..b6103048ff 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -58,11 +58,11 @@ object TypedActor { //TODO implement writeObject and readObject to serialize //TODO Possible optimization is to special encode the parameter-types to conserve space private def readResolve(): AnyRef = { - val system = akka.serialization.Serialization.system.value + val system = akka.serialization.Serialization.currentSystem.value if (system eq null) throw new IllegalStateException( "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." + - " Use akka.serialization.Serialization.system.withValue(system) { ... }") - val serialization = SerializationExtension(system).serialization + " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }") + val serialization = SerializationExtension(system) MethodCall(serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match { case null ⇒ null case a if a.length == 0 ⇒ Array[AnyRef]() diff --git a/akka-actor/src/main/scala/akka/serialization/Serialization.scala b/akka-actor/src/main/scala/akka/serialization/Serialization.scala index 2e10bf22af..7232375fa8 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serialization.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serialization.scala @@ -6,16 +6,59 @@ package akka.serialization import akka.AkkaException import akka.util.ReflectiveAccess -import akka.actor.{ ActorSystem, ActorSystemImpl } import scala.util.DynamicVariable +import com.typesafe.config.{ ConfigRoot, ConfigParseOptions, ConfigFactory, Config } +import com.typesafe.config.Config._ +import akka.config.ConfigurationException +import akka.actor.{ Extension, ActorSystem, ActorSystemImpl } case class NoSerializerFoundException(m: String) extends AkkaException(m) +object Serialization { + + // TODO ensure that these are always set (i.e. withValue()) when doing deserialization + val currentSystem = new DynamicVariable[ActorSystemImpl](null) + + class Settings(cfg: Config) { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-serialization-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-serialization").withFallback(cfg).withFallback(referenceConfig).resolve() + + import scala.collection.JavaConverters._ + import config._ + + val Serializers: Map[String, String] = { + toStringMap(getConfig("akka.actor.serializers")) + } + + val SerializationBindings: Map[String, Seq[String]] = { + val configPath = "akka.actor.serialization-bindings" + hasPath(configPath) match { + case false ⇒ Map() + case true ⇒ + val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).toObject.unwrapped.asScala.toMap.map { + case (k: String, v: java.util.Collection[_]) ⇒ (k -> v.asScala.toSeq.asInstanceOf[Seq[String]]) + case invalid ⇒ throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid)) + } + serializationBindings + + } + } + + private def toStringMap(mapConfig: Config): Map[String, String] = + mapConfig.toObject.unwrapped.asScala.toMap.map { case (k, v) ⇒ (k, v.toString) } + } +} + /** * Serialization module. Contains methods for serialization and deserialization as well as * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. */ -class Serialization(val system: ActorSystemImpl) { +class Serialization(val system: ActorSystemImpl) extends Extension { + import Serialization._ + + val settings = new Settings(system.applicationConfig) //TODO document me def serialize(o: AnyRef): Either[Exception, Array[Byte]] = @@ -27,7 +70,7 @@ class Serialization(val system: ActorSystemImpl) { clazz: Class[_], classLoader: Option[ClassLoader]): Either[Exception, AnyRef] = try { - Serialization.system.withValue(system) { + currentSystem.withValue(system) { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) } } catch { case e: Exception ⇒ Left(e) } @@ -63,15 +106,13 @@ class Serialization(val system: ActorSystemImpl) { } } - // serializers and bindings needs to be lazy because Serialization is initialized from SerializationExtension, which is needed here - /** * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) * By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer * But "default" can be overridden in config */ lazy val serializers: Map[String, Serializer] = { - val serializersConf = SerializationExtension(system).settings.Serializers + val serializersConf = settings.Serializers for ((k: String, v: String) ← serializersConf) yield k -> serializerOf(v).fold(throw _, identity) } @@ -80,7 +121,7 @@ class Serialization(val system: ActorSystemImpl) { * bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used */ lazy val bindings: Map[String, String] = { - val configBindings = SerializationExtension(system).settings.SerializationBindings + val configBindings = settings.SerializationBindings configBindings.foldLeft(Map[String, String]()) { case (result, (k: String, vs: Seq[_])) ⇒ //All keys which are lists, take the Strings from them and Map them @@ -103,8 +144,3 @@ class Serialization(val system: ActorSystemImpl) { Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) ⇒ (v.identifier, v) } } -object Serialization { - // TODO ensure that these are always set (i.e. withValue()) when doing deserialization - val system = new DynamicVariable[ActorSystemImpl](null) -} - diff --git a/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala b/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala index 4fc0b1be3c..a53ba832c7 100644 --- a/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala +++ b/akka-actor/src/main/scala/akka/serialization/SerializationExtension.scala @@ -3,77 +3,9 @@ */ package akka.serialization -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import com.typesafe.config.ConfigParseOptions -import com.typesafe.config.ConfigRoot -import akka.config.ConfigurationException - -object SerializationExtensionKey extends ExtensionKey[SerializationExtension] - -object SerializationExtension { - def apply(system: ActorSystem): SerializationExtension = { - if (!system.hasExtension(SerializationExtensionKey)) { - system.registerExtension(new SerializationExtension) - } - system.extension(SerializationExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-serialization-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-serialization").withFallback(cfg).withFallback(referenceConfig).resolve() - - import scala.collection.JavaConverters._ - import config._ - - val Serializers: Map[String, String] = { - toStringMap(getConfig("akka.actor.serializers")) - } - - val SerializationBindings: Map[String, Seq[String]] = { - val configPath = "akka.actor.serialization-bindings" - hasPath(configPath) match { - case false ⇒ Map() - case true ⇒ - val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).toObject.unwrapped.asScala.toMap.map { - case (k: String, v: java.util.Collection[_]) ⇒ (k -> v.asScala.toSeq.asInstanceOf[Seq[String]]) - case invalid ⇒ throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid)) - } - serializationBindings - - } - } - - private def toStringMap(mapConfig: Config): Map[String, String] = { - mapConfig.toObject.unwrapped.asScala.toMap.map { entry ⇒ - (entry._1 -> entry._2.toString) - } - } - - } -} - -class SerializationExtension extends Extension[SerializationExtension] { - import SerializationExtension._ - @volatile - private var _settings: Settings = _ - @volatile - private var _serialization: Serialization = _ - def serialization = _serialization - - def key = SerializationExtensionKey - - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) - _serialization = new Serialization(system) - } - - def settings: Settings = _settings +import akka.actor.{ ExtensionId, ExtensionIdProvider, ActorSystemImpl } +object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider { + override def lookup = SerializationExtension + override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system) } \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index aa0ee6645d..c680511697 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -21,7 +21,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess */ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { - private val settings = BeanstalkBasedMailboxExtension(owner.system).settings + private val settings = BeanstalkBasedMailboxExtension(owner.system) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala index 539b5b45e1..5f6fd40708 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailboxExtension.scala @@ -3,56 +3,32 @@ */ package akka.actor.mailbox -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor._ -object BeanstalkBasedMailboxExtensionKey extends ExtensionKey[BeanstalkBasedMailboxExtension] - -object BeanstalkBasedMailboxExtension { - def apply(system: ActorSystem): BeanstalkBasedMailboxExtension = { - if (!system.hasExtension(BeanstalkBasedMailboxExtensionKey)) { - system.registerExtension(new BeanstalkBasedMailboxExtension) - } - system.extension(BeanstalkBasedMailboxExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-beanstalk-mailbox-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-beanstalk-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - - import config._ - - val Hostname = getString("akka.actor.mailbox.beanstalk.hostname") - val Port = getInt("akka.actor.mailbox.beanstalk.port") - val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS) - val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS) - val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS) - val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS) - - } +object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.applicationConfig) } -class BeanstalkBasedMailboxExtension extends Extension[BeanstalkBasedMailboxExtension] { - import BeanstalkBasedMailboxExtension._ - @volatile - private var _settings: Settings = _ +class BeanstalkMailboxSettings(cfg: Config) extends Extension { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-beanstalk-mailbox-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-beanstalk-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - def key = BeanstalkBasedMailboxExtensionKey + import config._ - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) - } - - def settings: Settings = _settings + val Hostname = getString("akka.actor.mailbox.beanstalk.hostname") + val Port = getInt("akka.actor.mailbox.beanstalk.port") + val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS) + val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS) + val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS) + val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS) } \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala index f6ca730a1c..1bdf9ae958 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FileBasedMailboxExtension.scala @@ -3,63 +3,39 @@ */ package akka.actor.mailbox -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor._ -object FileBasedMailboxExtensionKey extends ExtensionKey[FileBasedMailboxExtension] - -object FileBasedMailboxExtension { - def apply(system: ActorSystem): FileBasedMailboxExtension = { - if (!system.hasExtension(FileBasedMailboxExtensionKey)) { - system.registerExtension(new FileBasedMailboxExtension) - } - system.extension(FileBasedMailboxExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-file-mailbox-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-file-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - - import config._ - - val QueuePath = getString("akka.actor.mailbox.file-based.directory-path") - - val MaxItems = getInt("akka.actor.mailbox.file-based.max-items") - val MaxSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size") - val MaxItemSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size") - val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS) - val MaxJournalSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size") - val MaxMemorySize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size") - val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow") - val MaxJournalSizeAbsolute = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size-absolute") - val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full") - val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal") - val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal") - - } +object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.applicationConfig) } -class FileBasedMailboxExtension extends Extension[FileBasedMailboxExtension] { - import FileBasedMailboxExtension._ - @volatile - private var _settings: Settings = _ +class FileBasedMailboxSettings(cfg: Config) extends Extension { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-file-mailbox-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-file-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - def key = FileBasedMailboxExtensionKey + import config._ - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) - } + val QueuePath = getString("akka.actor.mailbox.file-based.directory-path") - def settings: Settings = _settings + val MaxItems = getInt("akka.actor.mailbox.file-based.max-items") + val MaxSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size") + val MaxItemSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size") + val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS) + val MaxJournalSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size") + val MaxMemorySize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size") + val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow") + val MaxJournalSizeAbsolute = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size-absolute") + val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full") + val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal") + val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal") } \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index e167a88f27..8a81b2f8e4 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -14,7 +14,7 @@ class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with val log = Logging(system, "FileBasedMailbox") - private val settings = FileBasedMailboxExtension(owner.system).settings + private val settings = FileBasedMailboxExtension(owner.system) val queuePath = settings.QueuePath private val queue = try { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala index 1ae3cd9e06..1a5ddf4a8c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala @@ -22,7 +22,7 @@ import scala.collection.mutable import akka.event.LoggingAdapter import akka.util.Duration import java.util.concurrent.TimeUnit -import akka.actor.mailbox.FileBasedMailboxExtension +import akka.actor.mailbox.FileBasedMailboxSettings // a config value that's backed by a global setting but may be locally overridden class OverlaySetting[T](base: ⇒ T) { @@ -34,7 +34,7 @@ class OverlaySetting[T](base: ⇒ T) { def apply() = local.getOrElse(base) } -class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxExtension.Settings, log: LoggingAdapter) { +class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxSettings, log: LoggingAdapter) { private case object ItemArrived @@ -127,7 +127,7 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F configure(settings) - def configure(settings: FileBasedMailboxExtension.Settings) = synchronized { + def configure(settings: FileBasedMailboxSettings) = synchronized { maxItems set Some(settings.MaxItems) maxSize set Some(settings.MaxSize) maxItemSize set Some(settings.MaxItemSize) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala index ff5e12c86e..568428dfc6 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/QueueCollection.scala @@ -21,11 +21,11 @@ import java.io.File import java.util.concurrent.CountDownLatch import scala.collection.mutable import akka.event.LoggingAdapter -import akka.actor.mailbox.FileBasedMailboxExtension +import akka.actor.mailbox.FileBasedMailboxSettings class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable") -class QueueCollection(queueFolder: String, settings: FileBasedMailboxExtension.Settings, log: LoggingAdapter) { +class QueueCollection(queueFolder: String, settings: FileBasedMailboxSettings, log: LoggingAdapter) { private val path = new File(queueFolder) if (!path.isDirectory) { diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index 8b17ec9662..d1a36d14eb 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -6,7 +6,7 @@ import org.apache.commons.io.FileUtils class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) { def clean { - val queuePath = FileBasedMailboxExtension(system).settings.QueuePath + val queuePath = FileBasedMailboxExtension(system).QueuePath FileUtils.deleteDirectory(new java.io.File(queuePath)) } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 65fda89354..d010a1ef6a 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -31,7 +31,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! - private val settings = MongoBasedMailboxExtension(owner.system).settings + private val settings = MongoBasedMailboxExtension(owner.system) val log = Logging(system, "MongoBasedMailbox") diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala index e6ca3443e0..88eb95438c 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailboxExtension.scala @@ -3,54 +3,30 @@ */ package akka.actor.mailbox -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor._ -object MongoBasedMailboxExtensionKey extends ExtensionKey[MongoBasedMailboxExtension] - -object MongoBasedMailboxExtension { - def apply(system: ActorSystem): MongoBasedMailboxExtension = { - if (!system.hasExtension(MongoBasedMailboxExtensionKey)) { - system.registerExtension(new MongoBasedMailboxExtension) - } - system.extension(MongoBasedMailboxExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-mongo-mailbox-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-mongo-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - - import config._ - - val UriConfigKey = "akka.actor.mailbox.mongodb.uri" - val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None - val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS) - val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS) - - } +object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.applicationConfig) } -class MongoBasedMailboxExtension extends Extension[MongoBasedMailboxExtension] { - import MongoBasedMailboxExtension._ - @volatile - private var _settings: Settings = _ +class MongoBasedMailboxSettings(cfg: Config) extends Extension { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-mongo-mailbox-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-mongo-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - def key = MongoBasedMailboxExtensionKey + import config._ - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) - } - - def settings: Settings = _settings + val UriConfigKey = "akka.actor.mailbox.mongodb.uri" + val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None + val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS) + val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS) } \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index d005d5675f..7bb1c5a5dc 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -18,7 +18,7 @@ class RedisBasedMailboxException(message: String) extends AkkaException(message) */ class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { - private val settings = RedisBasedMailboxExtension(owner.system).settings + private val settings = RedisBasedMailboxExtension(owner.system) @volatile private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala index 4b3d424e0f..beccf4051f 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailboxExtension.scala @@ -3,50 +3,25 @@ */ package akka.actor.mailbox -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot +import akka.actor._ -object RedisBasedMailboxExtensionKey extends ExtensionKey[RedisBasedMailboxExtension] - -object RedisBasedMailboxExtension { - def apply(system: ActorSystem): RedisBasedMailboxExtension = { - if (!system.hasExtension(RedisBasedMailboxExtensionKey)) { - system.registerExtension(new RedisBasedMailboxExtension) - } - system.extension(RedisBasedMailboxExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-redis-mailbox-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-redis-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - - import config._ - - val Hostname = getString("akka.actor.mailbox.redis.hostname") - val Port = getInt("akka.actor.mailbox.redis.port") - - } +object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.applicationConfig) } -class RedisBasedMailboxExtension extends Extension[RedisBasedMailboxExtension] { - import RedisBasedMailboxExtension._ - @volatile - private var _settings: Settings = _ +class RedisBasedMailboxSettings(cfg: Config) extends Extension { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-redis-mailbox-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-redis-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - def key = RedisBasedMailboxExtensionKey - - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) - } - - def settings: Settings = _settings + import config._ + val Hostname = getString("akka.actor.mailbox.redis.hostname") + val Port = getInt("akka.actor.mailbox.redis.port") } \ No newline at end of file diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 3979dfdf36..8350f743d5 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -22,7 +22,7 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess */ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { - private val settings = ZooKeeperBasedMailboxExtension(owner.system).settings + private val settings = ZooKeeperBasedMailboxExtension(owner.system) val queueNode = "/queues" val queuePathTemplate = queueNode + "/%s" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala index a08df43bf5..e2b0ad45f7 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailboxExtension.scala @@ -3,54 +3,29 @@ */ package akka.actor.mailbox -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor._ -object ZooKeeperBasedMailboxExtensionKey extends ExtensionKey[ZooKeeperBasedMailboxExtension] - -object ZooKeeperBasedMailboxExtension { - def apply(system: ActorSystem): ZooKeeperBasedMailboxExtension = { - if (!system.hasExtension(ZooKeeperBasedMailboxExtensionKey)) { - system.registerExtension(new ZooKeeperBasedMailboxExtension) - } - system.extension(ZooKeeperBasedMailboxExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-zookeeper-mailbox-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-zookeeper-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() - - import config._ - - val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses") - val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS) - val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS) - val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue") - - } +object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.applicationConfig) } +class ZooKeeperBasedMailboxSettings(cfg: Config) extends Extension { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-zookeeper-mailbox-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-zookeeper-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve() -class ZooKeeperBasedMailboxExtension extends Extension[ZooKeeperBasedMailboxExtension] { - import ZooKeeperBasedMailboxExtension._ - @volatile - private var _settings: Settings = _ + import config._ - def key = ZooKeeperBasedMailboxExtensionKey - - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) - } - - def settings: Settings = _settings + val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses") + val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS) + val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS) + val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue") } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala index 01244a5fad..87dda83b71 100644 --- a/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/AccrualFailureDetector.scala @@ -27,8 +27,8 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10 def this(system: ActorSystem) { this( - RemoteExtension(system).settings.FailureDetectorThreshold, - RemoteExtension(system).settings.FailureDetectorMaxSampleSize) + RemoteExtension(system).FailureDetectorThreshold, + RemoteExtension(system).FailureDetectorMaxSampleSize) } private final val PhiFactor = 1.0 / math.log(10.0) diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 243a5f9596..8050b24fe1 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -106,13 +106,13 @@ class Gossiper(remote: Remote) { private val system = remote.system private val remoteExtension = RemoteExtension(system) - private val serializationExtension = SerializationExtension(system) + private val serialization = SerializationExtension(system) private val log = Logging(system, "Gossiper") private val failureDetector = remote.failureDetector private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef]) private val seeds = { - val seeds = RemoteExtension(system).settings.SeedNodes + val seeds = remoteExtension.SeedNodes if (seeds.isEmpty) throw new ConfigurationException( "At least one seed node must be defined in the configuration [akka.cluster.seed-nodes]") else seeds @@ -248,7 +248,7 @@ class Gossiper(remote: Remote) { throw new IllegalStateException("Connection for [" + peer + "] is not set up")) try { - (connection ? (toRemoteMessage(newGossip), remoteExtension.settings.RemoteSystemDaemonAckTimeout)).as[Status] match { + (connection ? (toRemoteMessage(newGossip), remoteExtension.RemoteSystemDaemonAckTimeout)).as[Status] match { case Some(Success(receiver)) ⇒ log.debug("Gossip sent to [{}] was successfully received", receiver) @@ -310,7 +310,7 @@ class Gossiper(remote: Remote) { } private def toRemoteMessage(gossip: Gossip): RemoteProtocol.RemoteSystemDaemonMessageProtocol = { - val gossipAsBytes = serializationExtension.serialization.serialize(gossip) match { + val gossipAsBytes = serialization.serialize(gossip) match { case Left(error) ⇒ throw error case Right(bytes) ⇒ bytes } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 628264b207..03e10e770b 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -14,13 +14,13 @@ object MessageSerializer { def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { val clazz = loadManifest(classLoader, messageProtocol) - SerializationExtension(system).serialization.deserialize(messageProtocol.getMessage.toByteArray, + SerializationExtension(system).deserialize(messageProtocol.getMessage.toByteArray, clazz, classLoader).fold(x ⇒ throw x, identity) } def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = { val builder = MessageProtocol.newBuilder - val bytes = SerializationExtension(system).serialization.serialize(message).fold(x ⇒ throw x, identity) + val bytes = SerializationExtension(system).serialize(message).fold(x ⇒ throw x, identity) builder.setMessage(ByteString.copyFrom(bytes)) builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) builder.build diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 123304c314..3db093efd9 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -36,9 +36,9 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { import settings._ private[remote] val remoteExtension = RemoteExtension(system) - private[remote] val serializationExtension = SerializationExtension(system) + private[remote] val serialization = SerializationExtension(system) private[remote] val remoteAddress = { - RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) + RemoteAddress(remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port) } val failureDetector = new AccrualFailureDetector(system) @@ -134,10 +134,10 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { if (message.hasActorPath) { val actorFactoryBytes = - if (remoteExtension.settings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray + if (remoteExtension.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray val actorFactory = - serializationExtension.serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { + serialization.deserialize(actorFactoryBytes, classOf[() ⇒ Actor], None) match { case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[() ⇒ Actor] } @@ -234,7 +234,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor { } private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = { - serializationExtension.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { + serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { case Left(error) ⇒ throw error case Right(instance) ⇒ instance.asInstanceOf[T] } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index f388cd976a..358568e13c 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -55,13 +55,13 @@ class RemoteActorRefProvider( @volatile private var system: ActorSystemImpl = _ private lazy val remoteExtension = RemoteExtension(system) - private lazy val serializationExtension = SerializationExtension(system) + private lazy val serialization = SerializationExtension(system) lazy val rootPath: ActorPath = { - val remoteAddress = RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) + val remoteAddress = RemoteAddress(remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port) new RootActorPath(remoteAddress) } private lazy val local = new LocalActorRefProvider(settings, eventStream, scheduler, rootPath, - remoteExtension.settings.NodeName, remoteExtension.settings.ClusterName) + remoteExtension.NodeName, remoteExtension.ClusterName) private[akka] lazy val remote = new Remote(system, nodename) private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) @@ -220,9 +220,9 @@ class RemoteActorRefProvider( log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress) val actorFactoryBytes = - serializationExtension.serialization.serialize(actorFactory) match { + serialization.serialize(actorFactory) match { case Left(error) ⇒ throw error - case Right(bytes) ⇒ if (remoteExtension.settings.ShouldCompressData) LZF.compress(bytes) else bytes + case Right(bytes) ⇒ if (remoteExtension.ShouldCompressData) LZF.compress(bytes) else bytes } val command = RemoteSystemDaemonMessageProtocol.newBuilder @@ -242,7 +242,7 @@ class RemoteActorRefProvider( private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) { if (withACK) { try { - val f = connection ? (command, remoteExtension.settings.RemoteSystemDaemonAckTimeout) + val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout) (try f.await.value catch { case _: FutureTimeoutException ⇒ None }) match { case Some(Right(receiver)) ⇒ log.debug("Remote system command sent to [{}] successfully received", receiver) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala index b8e4f89f40..89b09e289d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteExtension.scala @@ -3,10 +3,6 @@ */ package akka.remote -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions @@ -16,97 +12,77 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import java.net.InetAddress import akka.config.ConfigurationException import com.eaio.uuid.UUID +import akka.actor._ import scala.collection.JavaConverters._ -object RemoteExtensionKey extends ExtensionKey[RemoteExtension] - -object RemoteExtension { - def apply(system: ActorSystem): RemoteExtension = { - if (!system.hasExtension(RemoteExtensionKey)) { - system.registerExtension(new RemoteExtension) - } - system.extension(RemoteExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-remote-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-remote").withFallback(cfg).withFallback(referenceConfig).resolve() - - import config._ - - val RemoteTransport = getString("akka.remote.layer") - val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") - val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") - val ShouldCompressData = config.getBoolean("akka.remote.use-compression") - val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) - - // TODO cluster config will go into akka-cluster-reference.conf when we enable that module - val ClusterName = getString("akka.cluster.name") - val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_)) - - val NodeName: String = config.getString("akka.cluster.nodename") match { - case "" ⇒ new UUID().toString - case value ⇒ value - } - - val serverSettings = new RemoteServerSettings - val clientSettings = new RemoteClientSettings - - class RemoteClientSettings { - val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { - case "" ⇒ None - case cookie ⇒ Some(cookie) - } - - val ReconnectionTimeWindow = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) - val ReadTimeout = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) - val ReconnectDelay = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) - val MessageFrameSize = config.getInt("akka.remote.client.message-frame-size") - } - - class RemoteServerSettings { - val MessageFrameSize = config.getInt("akka.remote.server.message-frame-size") - val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { - case "" ⇒ None - case cookie ⇒ Some(cookie) - } - val RequireCookie = { - val requireCookie = config.getBoolean("akka.remote.server.require-cookie") - if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( - "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") - requireCookie - } - - val UsePassiveConnections = config.getBoolean("akka.remote.use-passive-connections") - - val UntrustedMode = config.getBoolean("akka.remote.server.untrusted-mode") - val Hostname = config.getString("akka.remote.server.hostname") match { - case "" ⇒ InetAddress.getLocalHost.getHostAddress - case value ⇒ value - } - val Port = config.getInt("akka.remote.server.port") - val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) - - val Backlog = config.getInt("akka.remote.server.backlog") - } - - } +object RemoteExtension extends ExtensionId[RemoteExtensionSettings] with ExtensionIdProvider { + def lookup() = this + def createExtension(system: ActorSystemImpl) = new RemoteExtensionSettings(system.applicationConfig) } -class RemoteExtension extends Extension[RemoteExtension] { - import RemoteExtension._ - @volatile - private var _settings: Settings = _ +class RemoteExtensionSettings(cfg: Config) extends Extension { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-remote-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-remote").withFallback(cfg).withFallback(referenceConfig).resolve() - def key = RemoteExtensionKey + import config._ - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) + val RemoteTransport = getString("akka.remote.layer") + val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold") + val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size") + val ShouldCompressData = config.getBoolean("akka.remote.use-compression") + val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) + + // TODO cluster config will go into akka-cluster-reference.conf when we enable that module + val ClusterName = getString("akka.cluster.name") + val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_)) + + val NodeName: String = config.getString("akka.cluster.nodename") match { + case "" ⇒ new UUID().toString + case value ⇒ value } - def settings: Settings = _settings + val serverSettings = new RemoteServerSettings + val clientSettings = new RemoteClientSettings + class RemoteClientSettings { + val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { + case "" ⇒ None + case cookie ⇒ Some(cookie) + } + + val ReconnectionTimeWindow = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS) + val ReadTimeout = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) + val ReconnectDelay = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) + val MessageFrameSize = config.getInt("akka.remote.client.message-frame-size") + } + + class RemoteServerSettings { + import scala.collection.JavaConverters._ + val MessageFrameSize = config.getInt("akka.remote.server.message-frame-size") + val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match { + case "" ⇒ None + case cookie ⇒ Some(cookie) + } + val RequireCookie = { + val requireCookie = config.getBoolean("akka.remote.server.require-cookie") + if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( + "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") + requireCookie + } + + val UsePassiveConnections = config.getBoolean("akka.remote.use-passive-connections") + + val UntrustedMode = config.getBoolean("akka.remote.server.untrusted-mode") + val Hostname = config.getString("akka.remote.server.hostname") match { + case "" ⇒ InetAddress.getLocalHost.getHostAddress + case value ⇒ value + } + val Port = config.getInt("akka.remote.server.port") + val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) + + val Backlog = config.getInt("akka.remote.server.backlog") + } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 9424b93372..cf922feafe 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -358,8 +358,8 @@ class ActiveRemoteClientHandler( class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps { val log = Logging(system, "NettyRemoteSupport") - val serverSettings = RemoteExtension(system).settings.serverSettings - val clientSettings = RemoteExtension(system).settings.clientSettings + val serverSettings = RemoteExtension(system).serverSettings + val clientSettings = RemoteExtension(system).clientSettings private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index d4e4b6b3bc..67162f1bb6 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -8,7 +8,7 @@ class RemoteConfigSpec extends AkkaSpec { "ClusterSpec: A Deployer" must { "be able to parse 'akka.actor.cluster._' config elements" in { - val config = RemoteExtension(system).settings.config + val config = RemoteExtension(system).config import config._ //akka.remote.server diff --git a/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala index 22c10271b7..e027620b61 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestBarrier.scala @@ -33,7 +33,7 @@ class TestBarrier(count: Int) { } catch { case e: TimeoutException ⇒ throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" - format (timeout.toString, TestKitExtension(system).settings.TestTimeFactor)) + format (timeout.toString, TestKitExtension(system).TestTimeFactor)) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 675bdfe8c1..543d443da6 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -81,8 +81,7 @@ abstract class EventFilter(occurrences: Int) { */ def intercept[T](code: ⇒ T)(implicit system: ActorSystem): T = { system.eventStream publish TestEvent.Mute(this) - val testKitExtension = TestKitExtension(system) - val leeway = testKitExtension.settings.TestEventFilterLeeway + val leeway = TestKitExtension(system).TestEventFilterLeeway try { val result = code if (!awaitDone(leeway)) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 88548e9cb2..dba8437ef6 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -81,7 +81,7 @@ class TestKit(_system: ActorSystem) { import TestActor.{ Message, RealMessage, NullMessage } implicit val system = _system - val testKitExtension = TestKitExtension(system) + val testKitSettings = TestKitExtension(system) private val queue = new LinkedBlockingDeque[Message]() private[akka] var lastMessage: Message = NullMessage @@ -128,7 +128,7 @@ class TestKit(_system: ActorSystem) { * block or missing that it returns the properly dilated default for this * case from settings (key "akka.test.single-expect-default"). */ - def remaining: Duration = if (end == Duration.Undefined) testKitExtension.settings.SingleExpectDefaultTimeout.dilated else end - now + def remaining: Duration = if (end == Duration.Undefined) testKitSettings.SingleExpectDefaultTimeout.dilated else end - now /** * Query queue status. @@ -569,10 +569,8 @@ object TestKit { * Java API. Scale timeouts (durations) during tests with the configured * 'akka.test.timefactor'. */ - def dilated(duration: Duration, system: ActorSystem): Duration = { - duration * TestKitExtension(system).settings.TestTimeFactor - } - + def dilated(duration: Duration, system: ActorSystem): Duration = + duration * TestKitExtension(system).TestTimeFactor } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala index d1ef60065f..5af1bde50a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala @@ -3,53 +3,27 @@ */ package akka.testkit -import akka.actor.ActorSystem -import akka.actor.ExtensionKey -import akka.actor.Extension -import akka.actor.ActorSystemImpl import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.{ ExtensionId, ActorSystem, Extension, ActorSystemImpl } -object TestKitExtensionKey extends ExtensionKey[TestKitExtension] - -object TestKitExtension { - def apply(system: ActorSystem): TestKitExtension = { - if (!system.hasExtension(TestKitExtensionKey)) { - system.registerExtension(new TestKitExtension) - } - system.extension(TestKitExtensionKey) - } - - class Settings(cfg: Config) { - private def referenceConfig: Config = - ConfigFactory.parseResource(classOf[ActorSystem], "/akka-testkit-reference.conf", - ConfigParseOptions.defaults.setAllowMissing(false)) - val config: ConfigRoot = ConfigFactory.emptyRoot("akka-testkit").withFallback(cfg).withFallback(referenceConfig).resolve() - - import config._ - - val TestTimeFactor = getDouble("akka.test.timefactor") - val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS) - val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS) - - } +object TestKitExtension extends ExtensionId[TestKitSettings] { + def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.applicationConfig) } -class TestKitExtension extends Extension[TestKitExtension] { - import TestKitExtension._ - @volatile - private var _settings: Settings = _ +class TestKitSettings(cfg: Config) extends Extension { + private def referenceConfig: Config = + ConfigFactory.parseResource(classOf[ActorSystem], "/akka-testkit-reference.conf", + ConfigParseOptions.defaults.setAllowMissing(false)) + val config: ConfigRoot = ConfigFactory.emptyRoot("akka-testkit").withFallback(cfg).withFallback(referenceConfig).resolve() - def key = TestKitExtensionKey - - def init(system: ActorSystemImpl) { - _settings = new Settings(system.applicationConfig) - } - - def settings: Settings = _settings + import config._ + val TestTimeFactor = getDouble("akka.test.timefactor") + val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS) + val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS) } \ No newline at end of file diff --git a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala index 2cfb2edc85..1af4785525 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala @@ -34,10 +34,9 @@ class TestLatch(count: Int = 1)(implicit system: ActorSystem) { def await(): Boolean = await(TestLatch.DefaultTimeout) def await(timeout: Duration): Boolean = { - val testKitExtension = TestKitExtension(system) val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) if (!opened) throw new TestLatchTimeoutException( - "Timeout of %s with time factor of %s" format (timeout.toString, testKitExtension.settings.TestTimeFactor)) + "Timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor)) opened } @@ -45,10 +44,9 @@ class TestLatch(count: Int = 1)(implicit system: ActorSystem) { * Timeout is expected. Throws exception if latch is opened before timeout. */ def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = { - val testKitExtension = TestKitExtension(system) val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) if (opened) throw new TestLatchNoTimeoutException( - "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, testKitExtension.settings.TestTimeFactor)) + "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor)) opened } diff --git a/akka-testkit/src/main/scala/akka/testkit/package.scala b/akka-testkit/src/main/scala/akka/testkit/package.scala index 86b404c67d..6f7bf965ca 100644 --- a/akka-testkit/src/main/scala/akka/testkit/package.scala +++ b/akka-testkit/src/main/scala/akka/testkit/package.scala @@ -12,9 +12,9 @@ package object testkit { try { val result = block - val testKitExtension = TestKitExtension(system) - val stop = now + testKitExtension.settings.TestEventFilterLeeway.toMillis - val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + testKitExtension.settings.TestEventFilterLeeway + ") waiting for " + _) + val testKitSettings = TestKitExtension(system) + val stop = now + testKitSettings.TestEventFilterLeeway.toMillis + val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + testKitSettings.TestEventFilterLeeway + ") waiting for " + _) if (failed.nonEmpty) throw new AssertionError("Filter completion error:\n" + failed.mkString("\n")) @@ -45,7 +45,7 @@ package object testkit { */ class TestDuration(duration: Duration) { def dilated(implicit system: ActorSystem): Duration = { - duration * TestKitExtension(system).settings.TestTimeFactor + duration * TestKitExtension(system).TestTimeFactor } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala index ff02a6c82b..0d0bab20b6 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestTimeSpec.scala @@ -15,7 +15,7 @@ class TestTimeSpec extends AkkaSpec(Map("akka.test.timefactor" -> 2.0)) with Bef val now = System.nanoTime intercept[AssertionError] { probe.awaitCond(false, Duration("1 second")) } val diff = System.nanoTime - now - val target = (1000000000l * testKitExtension.settings.TestTimeFactor).toLong + val target = (1000000000l * testKitSettings.TestTimeFactor).toLong diff must be > (target - 300000000l) diff must be < (target + 300000000l) }