Merge branch 'master' of github.com:jboner/akka

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-11-25 12:10:23 +01:00
commit 8237271310
39 changed files with 363 additions and 607 deletions

View file

@ -13,33 +13,34 @@ import static org.junit.Assert.*;
public class JavaExtension { public class JavaExtension {
static class TestExtension implements Extension<TestExtension> { static class Provider implements ExtensionIdProvider {
private ActorSystemImpl system; public ExtensionId lookup() { return defaultInstance; }
public static ExtensionKey<TestExtension> key = new ExtensionKey<TestExtension>() { }
};
public ExtensionKey<TestExtension> key() { public final static TestExtensionId defaultInstance = new TestExtensionId();
return key;
}
public void init(ActorSystemImpl system) { static class TestExtensionId extends AbstractExtensionId<TestExtension> {
this.system = system; public TestExtension createExtension(ActorSystemImpl i) {
} return new TestExtension(i);
public ActorSystemImpl getSystem() {
return system;
} }
} }
private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtension\" ]", static class TestExtension implements Extension {
public final ActorSystemImpl system;
public TestExtension(ActorSystemImpl i) {
system = i;
}
}
private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$Provider\" ]",
ConfigParseOptions.defaults()); ConfigParseOptions.defaults());
private ActorSystem system = ActorSystem.create("JavaExtension", c); private ActorSystem system = ActorSystem.create("JavaExtension", c);
@Test @Test
public void mustBeAccessible() { public void mustBeAccessible() {
final ActorSystemImpl s = system.extension(TestExtension.key).getSystem(); assertSame(system.extension(defaultInstance).system, system);
assertSame(s, system); assertSame(defaultInstance.apply(system).system, system);
} }
} }

View file

@ -247,7 +247,7 @@ class ActorRefSpec extends AkkaSpec {
out.flush out.flush
out.close out.close
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
val readA = in.readObject val readA = in.readObject
@ -275,7 +275,7 @@ class ActorRefSpec extends AkkaSpec {
(intercept[java.lang.IllegalStateException] { (intercept[java.lang.IllegalStateException] {
in.readObject in.readObject
}).getMessage must be === "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + }).getMessage must be === "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use akka.serialization.Serialization.system.withValue(system) { ... }" " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }"
} }
"must throw exception on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { "must throw exception on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
@ -292,7 +292,7 @@ class ActorRefSpec extends AkkaSpec {
out.flush out.flush
out.close out.close
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
(intercept[java.lang.IllegalStateException] { (intercept[java.lang.IllegalStateException] {
in.readObject in.readObject

View file

@ -10,28 +10,23 @@ import com.typesafe.config.ConfigFactory
class JavaExtensionSpec extends JavaExtension with JUnitSuite class JavaExtensionSpec extends JavaExtension with JUnitSuite
object ActorSystemSpec { object ActorSystemSpec {
object TestExtension extends ExtensionId[TestExtension] with ExtensionIdProvider {
class TestExtension extends Extension[TestExtension] { def lookup = this
var system: ActorSystemImpl = _ def createExtension(s: ActorSystemImpl) = new TestExtension(s)
def key = TestExtension
def init(system: ActorSystemImpl) {
this.system = system
}
} }
object TestExtension extends ExtensionKey[TestExtension] class TestExtension(val system: ActorSystemImpl) extends Extension
} }
class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension"]""") { class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.ActorSystemSpec$TestExtension$"]""") {
import ActorSystemSpec._ import ActorSystemSpec._
"An ActorSystem" must { "An ActorSystem" must {
"support extensions" in { "support extensions" in {
TestExtension(system).system must be === system
system.extension(TestExtension).system must be === system system.extension(TestExtension).system must be === system
system.hasExtension(TestExtension) must be(true)
} }
} }

View file

@ -333,7 +333,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to serialize and deserialize invocations" in { "be able to serialize and deserialize invocations" in {
import java.io._ import java.io._
val serialization = SerializationExtension(system).serialization val serialization = SerializationExtension(system)
val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]()) val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]())
val baos = new ByteArrayOutputStream(8192 * 4) val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos) val out = new ObjectOutputStream(baos)
@ -343,7 +343,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall] val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall]
mNew.method must be(m.method) mNew.method must be(m.method)
@ -353,7 +353,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to serialize and deserialize invocations' parameters" in { "be able to serialize and deserialize invocations' parameters" in {
import java.io._ import java.io._
val someFoo: Foo = new Bar val someFoo: Foo = new Bar
val serialization = SerializationExtension(system).serialization val serialization = SerializationExtension(system)
val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef])) val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val baos = new ByteArrayOutputStream(8192 * 4) val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos) val out = new ObjectOutputStream(baos)
@ -363,7 +363,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
Serialization.system.withValue(system.asInstanceOf[ActorSystemImpl]) { Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall] val mNew = in.readObject().asInstanceOf[TypedActor.MethodCall]
mNew.method must be(m.method) mNew.method must be(m.method)

View file

@ -46,7 +46,7 @@ object SerializeSpec {
class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) { class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
import SerializeSpec._ import SerializeSpec._
val ser = SerializationExtension(system).serialization val ser = SerializationExtension(system)
import ser._ import ser._
val addr = Address("120", "Monroe Street", "Santa Clara", "95050") val addr = Address("120", "Monroe Street", "Santa Clara", "95050")
@ -104,7 +104,7 @@ class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
out.close() out.close()
val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray)) val in = new ObjectInputStream(new ByteArrayInputStream(outbuf.toByteArray))
Serialization.system.withValue(a.asInstanceOf[ActorSystemImpl]) { Serialization.currentSystem.withValue(a.asInstanceOf[ActorSystemImpl]) {
val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef] val deadLetters = in.readObject().asInstanceOf[DeadLetterActorRef]
(deadLetters eq a.deadLetters) must be(true) (deadLetters eq a.deadLetters) must be(true)
} }

View file

@ -305,17 +305,17 @@ trait ScalaActorRef { ref: ActorRef ⇒
*/ */
case class SerializedActorRef(hostname: String, port: Int, path: String) { case class SerializedActorRef(hostname: String, port: Int, path: String) {
import akka.serialization.Serialization.system import akka.serialization.Serialization.currentSystem
def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path) def this(remoteAddress: RemoteAddress, path: String) = this(remoteAddress.hostname, remoteAddress.port, path)
def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE def this(remoteAddress: InetSocketAddress, path: String) = this(remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, path) //TODO FIXME REMOVE
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = { def readResolve(): AnyRef = currentSystem.value match {
if (system.value eq null) throw new IllegalStateException( case null throw new IllegalStateException(
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." + "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use akka.serialization.Serialization.system.withValue(system) { ... }") " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
system.value.provider.deserialize(this) match { case someSystem someSystem.provider.deserialize(this) match {
case Some(actor) actor case Some(actor) actor
case None throw new IllegalStateException("Could not deserialize ActorRef") case None throw new IllegalStateException("Could not deserialize ActorRef")
} }
@ -354,7 +354,7 @@ case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef)
object DeadLetterActorRef { object DeadLetterActorRef {
class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance?
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = Serialization.system.value.deadLetters private def readResolve(): AnyRef = Serialization.currentSystem.value.deadLetters
} }
val serialized = new SerializedDeadLetterActorRef val serialized = new SerializedDeadLetterActorRef

View file

@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors import java.util.concurrent.Executors
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
object ActorSystem { object ActorSystem {
@ -251,36 +252,25 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
def stop() def stop()
/** /**
* Register an [[akka.actor.Extension]] within this actor system. The supplied * Registers the provided extension and creates its payload, if this extension isn't already registered
* object is interrogated for the extensions key with which the extension is * This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization
* accessible from anywhere you have a reference to this actor system in * of the payload, if is in the process of registration from another Thread of execution
* scope, e.g. within actors (see [[ActorSystem.extension]]).
*
* Extensions can be registered automatically by adding their fully-qualified
* class name to the `akka.extensions` configuration key.
*/ */
def registerExtension[T <: AnyRef](ext: Extension[T]): Extension[T] def registerExtension[T <: Extension](ext: ExtensionId[T]): T
/** /**
* Obtain a reference to a registered extension by passing in the key which * Returns the payload that is associated with the provided extension
* the extension object returned from its init method (typically a static * throws an IllegalStateException if it is not registered.
* field or Scala `object`): * This method can potentially block, waiting for the initialization
* * of the payload, if is in the process of registration from another Thread of execution
* {{{
* class MyActor extends Actor {
* val ext: MyExtension = context.app.extension(MyExtension.key)
* }
* }}}
*
* Throws IllegalArgumentException if the extension key is not found.
*/ */
def extension[T <: AnyRef](key: ExtensionKey[T]): T def extension[T <: Extension](ext: ExtensionId[T]): T
/** /**
* Query presence of a specific extension. Beware that this key needs to be * Returns whether the specified extension is already registered, this method can potentially block, waiting for the initialization
* the same as the one used for registration (it is using a HashMap). * of the payload, if is in the process of registration from another Thread of execution
*/ */
def hasExtension(key: ExtensionKey[_]): Boolean def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean
} }
class ActorSystemImpl(val name: String, val applicationConfig: Config) extends ActorSystem { class ActorSystemImpl(val name: String, val applicationConfig: Config) extends ActorSystem {
@ -355,7 +345,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
private lazy val _start: this.type = { private lazy val _start: this.type = {
// TODO can we do something better than loading SerializationExtension from here? // TODO can we do something better than loading SerializationExtension from here?
_typedActor = new TypedActor(settings, SerializationExtension(this).serialization) _typedActor = new TypedActor(settings, SerializationExtension(this))
provider.init(this) provider.init(this)
deadLetters.init(dispatcher, provider.rootPath) deadLetters.init(dispatcher, provider.rootPath)
// this starts the reaper actor and the user-configured logging subscribers, which are also actors // this starts the reaper actor and the user-configured logging subscribers, which are also actors
@ -377,65 +367,61 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
terminationFuture onComplete (_ dispatcher.shutdown()) terminationFuture onComplete (_ dispatcher.shutdown())
} }
private val extensions = new ConcurrentHashMap[ExtensionKey[_], AnyRef] private val extensions = new ConcurrentIdentityHashMap[ExtensionId[_], AnyRef]
/** /**
* Attempts to initialize and register this extension if the key associated with it isn't already registered. * Returns any extension registered to the specified Extension or returns null if not registered
* The extension will only be initialized if it isn't already registered.
* Rethrows anything thrown when initializing the extension (doesn't register in that case)
* Returns the registered extension, might be another already registered instance.
*/ */
@tailrec @tailrec
final def registerExtension[T <: AnyRef](ext: Extension[T]): Extension[T] = { private def findExtension[T <: Extension](ext: ExtensionId[T]): T = extensions.get(ext) match {
/** case c: CountDownLatch c.await(); findExtension(ext) //Registration in process, await completion and retry
* Returns any extension registered to the specified key or returns null if not registered case other other.asInstanceOf[T] //could be a T or null, in which case we return the null as T
*/ }
@tailrec
def findExtension[T <: AnyRef](key: ExtensionKey[T]): Option[T] = extensions.get(key) match {
case c: CountDownLatch c.await(); findExtension(key) //Registration in process, await completion and retry
case e: Extension[_] Some(e.asInstanceOf[T]) //Profit!
case null None //Doesn't exist
}
findExtension(ext.key) match { @tailrec
case Some(e: Extension[_]) e.asInstanceOf[Extension[T]] //Profit! final def registerExtension[T <: Extension](ext: ExtensionId[T]): T = {
case None //Doesn't already exist, commence registration findExtension(ext) match {
case null //Doesn't already exist, commence registration
val inProcessOfRegistration = new CountDownLatch(1) val inProcessOfRegistration = new CountDownLatch(1)
extensions.putIfAbsent(ext.key, inProcessOfRegistration) match { // Signal that registration is in process extensions.putIfAbsent(ext, inProcessOfRegistration) match { // Signal that registration is in process
case null try { // Signal was successfully sent case null try { // Signal was successfully sent
ext.init(this) //Initialize the new extension ext.createExtension(this) match { // Create and initialize the extension
extensions.replace(ext.key, inProcessOfRegistration, ext) //Replace our in process signal with the initialized extension case null throw new IllegalStateException("Extension instance created as null for Extension: " + ext)
ext //Profit! case instance
extensions.replace(ext, inProcessOfRegistration, instance) //Replace our in process signal with the initialized extension
instance //Profit!
}
} catch { } catch {
case t case t
extensions.remove(ext.key, inProcessOfRegistration) //In case shit hits the fan, remove the inProcess signal extensions.remove(ext, inProcessOfRegistration) //In case shit hits the fan, remove the inProcess signal
throw t //Escalate to caller throw t //Escalate to caller
} finally { } finally {
inProcessOfRegistration.countDown //Always notify listeners of the inProcess signal inProcessOfRegistration.countDown //Always notify listeners of the inProcess signal
} }
case other registerExtension(ext) //Someone else is in process of registering an extension for this key, retry case other registerExtension(ext) //Someone else is in process of registering an extension for this Extension, retry
} }
case existing existing.asInstanceOf[T]
} }
} }
def extension[T <: AnyRef](key: ExtensionKey[T]): T = extensions.get(key) match { def extension[T <: Extension](ext: ExtensionId[T]): T = findExtension(ext) match {
case x: Extension[_] x.asInstanceOf[T] case null throw new IllegalArgumentException("Trying to get non-registered extension " + ext)
case _ throw new IllegalArgumentException("trying to get non-registered extension " + key) case some some.asInstanceOf[T]
} }
def hasExtension(key: ExtensionKey[_]): Boolean = extensions.get(key) match { def hasExtension(ext: ExtensionId[_ <: Extension]): Boolean = findExtension(ext) != null
case x: Extension[_] true
case _ false
}
private def loadExtensions() { private def loadExtensions() {
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._
settings.config.getStringList("akka.extensions") foreach { fqcn settings.config.getStringList("akka.extensions") foreach { fqcn
import ReflectiveAccess._ import ReflectiveAccess._
createInstance[Extension[_ <: AnyRef]](fqcn, noParams, noArgs) match { getObjectFor[AnyRef](fqcn).fold(_ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match {
case Left(ex) log.error(ex, "Exception trying to load extension " + fqcn) case Right(p: ExtensionIdProvider) registerExtension(p.lookup());
case Right(ext) if (ext.isInstanceOf[Extension[_]]) registerExtension(ext) else log.error("Class {} is not an Extension", fqcn) case Right(p: ExtensionId[_]) registerExtension(p);
case Right(other) log.error("'{}' is not an ExtensionIdProvider or ExtensionId, skipping...", fqcn)
case Left(problem) log.error(problem, "While trying to load extension '{}', skipping...", fqcn)
} }
} }
} }
} }

View file

@ -16,53 +16,36 @@ package akka.actor
* The extension itself can be created in any way desired and has full access * The extension itself can be created in any way desired and has full access
* to the ActorSystem implementation. * to the ActorSystem implementation.
* *
* Scala example:
*
* {{{
* class MyExtension extends Extension[MyExtension] {
* def key = MyExtension
* def init(system: ActorSystemImpl) {
* ... // initialize here
* }
* }
* object MyExtension extends ExtensionKey[MyExtension]
* }}}
*
* Java example:
*
* {{{
* static class MyExtension implements Extension<MyExtension> {
* public static ExtensionKey<MyExtension> key = new ExtensionKey<MyExtension>() {};
*
* public ExtensionKey<TestExtension> key() {
* return key;
* }
* public void init(ActorSystemImpl system) {
* ... // initialize here
* }
* }
* }}}
*/ */
trait Extension[T <: AnyRef] {
/** /**
* This method is called by the ActorSystem upon registering this extension. * Market interface to signify an Akka Extension
* The key returned is used for looking up extensions, hence it must be a */
* suitable hash key and available to all clients of the extension. This is trait Extension
* best achieved by storing it in a static field (Java) or as/in an object
* (Scala).
*/
def key: ExtensionKey[T]
// FIXME ActorSystemImpl exposed to user API. We might well choose to introduce a new interface for this level of access, just so we can shuffle around the implementation /**
/** * Identifies an Extension
* This method is called by the ActorSystem when the extension is registered * Lookup of Extensions is done by object identity, so the Id must be the same wherever it's used,
* to trigger initialization of the extension. * otherwise you'll get the same extension loaded multiple times.
*/ */
def init(system: ActorSystemImpl): Unit trait ExtensionId[T <: Extension] {
def apply(system: ActorSystem): T = system.registerExtension(this)
def createExtension(system: ActorSystemImpl): T
} }
/** /**
* Marker trait identifying a registered [[akka.actor.Extension]]. * Java API for ExtensionId
*/ */
trait ExtensionKey[T <: AnyRef] abstract class AbstractExtensionId[T <: Extension] extends ExtensionId[T]
/**
* To be able to load an ExtensionId from the configuration,
* a class that implements ExtensionIdProvider must be specified.
* The lookup method should return the canonical reference to the extension.
*/
trait ExtensionIdProvider {
/**
* Returns the canonical ExtensionId for this Extension
*/
def lookup(): ExtensionId[_ <: Extension]
}

View file

@ -58,11 +58,11 @@ object TypedActor {
//TODO implement writeObject and readObject to serialize //TODO implement writeObject and readObject to serialize
//TODO Possible optimization is to special encode the parameter-types to conserve space //TODO Possible optimization is to special encode the parameter-types to conserve space
private def readResolve(): AnyRef = { private def readResolve(): AnyRef = {
val system = akka.serialization.Serialization.system.value val system = akka.serialization.Serialization.currentSystem.value
if (system eq null) throw new IllegalStateException( if (system eq null) throw new IllegalStateException(
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." + "Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
" Use akka.serialization.Serialization.system.withValue(system) { ... }") " Use akka.serialization.Serialization.currentSystem.withValue(system) { ... }")
val serialization = SerializationExtension(system).serialization val serialization = SerializationExtension(system)
MethodCall(serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match { MethodCall(serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null null case null null
case a if a.length == 0 Array[AnyRef]() case a if a.length == 0 Array[AnyRef]()

View file

@ -6,16 +6,59 @@ package akka.serialization
import akka.AkkaException import akka.AkkaException
import akka.util.ReflectiveAccess import akka.util.ReflectiveAccess
import akka.actor.{ ActorSystem, ActorSystemImpl }
import scala.util.DynamicVariable import scala.util.DynamicVariable
import com.typesafe.config.{ ConfigRoot, ConfigParseOptions, ConfigFactory, Config }
import com.typesafe.config.Config._
import akka.config.ConfigurationException
import akka.actor.{ Extension, ActorSystem, ActorSystemImpl }
case class NoSerializerFoundException(m: String) extends AkkaException(m) case class NoSerializerFoundException(m: String) extends AkkaException(m)
object Serialization {
// TODO ensure that these are always set (i.e. withValue()) when doing deserialization
val currentSystem = new DynamicVariable[ActorSystemImpl](null)
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-serialization-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-serialization").withFallback(cfg).withFallback(referenceConfig).resolve()
import scala.collection.JavaConverters._
import config._
val Serializers: Map[String, String] = {
toStringMap(getConfig("akka.actor.serializers"))
}
val SerializationBindings: Map[String, Seq[String]] = {
val configPath = "akka.actor.serialization-bindings"
hasPath(configPath) match {
case false Map()
case true
val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).toObject.unwrapped.asScala.toMap.map {
case (k: String, v: java.util.Collection[_]) (k -> v.asScala.toSeq.asInstanceOf[Seq[String]])
case invalid throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid))
}
serializationBindings
}
}
private def toStringMap(mapConfig: Config): Map[String, String] =
mapConfig.toObject.unwrapped.asScala.toMap.map { case (k, v) (k, v.toString) }
}
}
/** /**
* Serialization module. Contains methods for serialization and deserialization as well as * Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file. * locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/ */
class Serialization(val system: ActorSystemImpl) { class Serialization(val system: ActorSystemImpl) extends Extension {
import Serialization._
val settings = new Settings(system.applicationConfig)
//TODO document me //TODO document me
def serialize(o: AnyRef): Either[Exception, Array[Byte]] = def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
@ -27,7 +70,7 @@ class Serialization(val system: ActorSystemImpl) {
clazz: Class[_], clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] = classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
try { try {
Serialization.system.withValue(system) { currentSystem.withValue(system) {
Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader))
} }
} catch { case e: Exception Left(e) } } catch { case e: Exception Left(e) }
@ -63,15 +106,13 @@ class Serialization(val system: ActorSystemImpl) {
} }
} }
// serializers and bindings needs to be lazy because Serialization is initialized from SerializationExtension, which is needed here
/** /**
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer) * A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer * By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer
* But "default" can be overridden in config * But "default" can be overridden in config
*/ */
lazy val serializers: Map[String, Serializer] = { lazy val serializers: Map[String, Serializer] = {
val serializersConf = SerializationExtension(system).settings.Serializers val serializersConf = settings.Serializers
for ((k: String, v: String) serializersConf) for ((k: String, v: String) serializersConf)
yield k -> serializerOf(v).fold(throw _, identity) yield k -> serializerOf(v).fold(throw _, identity)
} }
@ -80,7 +121,7 @@ class Serialization(val system: ActorSystemImpl) {
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used * bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
*/ */
lazy val bindings: Map[String, String] = { lazy val bindings: Map[String, String] = {
val configBindings = SerializationExtension(system).settings.SerializationBindings val configBindings = settings.SerializationBindings
configBindings.foldLeft(Map[String, String]()) { configBindings.foldLeft(Map[String, String]()) {
case (result, (k: String, vs: Seq[_])) case (result, (k: String, vs: Seq[_]))
//All keys which are lists, take the Strings from them and Map them //All keys which are lists, take the Strings from them and Map them
@ -103,8 +144,3 @@ class Serialization(val system: ActorSystemImpl) {
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) } Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) }
} }
object Serialization {
// TODO ensure that these are always set (i.e. withValue()) when doing deserialization
val system = new DynamicVariable[ActorSystemImpl](null)
}

View file

@ -3,77 +3,9 @@
*/ */
package akka.serialization package akka.serialization
import akka.actor.ActorSystem import akka.actor.{ ExtensionId, ExtensionIdProvider, ActorSystemImpl }
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.config.ConfigurationException
object SerializationExtensionKey extends ExtensionKey[SerializationExtension]
object SerializationExtension {
def apply(system: ActorSystem): SerializationExtension = {
if (!system.hasExtension(SerializationExtensionKey)) {
system.registerExtension(new SerializationExtension)
}
system.extension(SerializationExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-serialization-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-serialization").withFallback(cfg).withFallback(referenceConfig).resolve()
import scala.collection.JavaConverters._
import config._
val Serializers: Map[String, String] = {
toStringMap(getConfig("akka.actor.serializers"))
}
val SerializationBindings: Map[String, Seq[String]] = {
val configPath = "akka.actor.serialization-bindings"
hasPath(configPath) match {
case false Map()
case true
val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).toObject.unwrapped.asScala.toMap.map {
case (k: String, v: java.util.Collection[_]) (k -> v.asScala.toSeq.asInstanceOf[Seq[String]])
case invalid throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid))
}
serializationBindings
}
}
private def toStringMap(mapConfig: Config): Map[String, String] = {
mapConfig.toObject.unwrapped.asScala.toMap.map { entry
(entry._1 -> entry._2.toString)
}
}
}
}
class SerializationExtension extends Extension[SerializationExtension] {
import SerializationExtension._
@volatile
private var _settings: Settings = _
@volatile
private var _serialization: Serialization = _
def serialization = _serialization
def key = SerializationExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
_serialization = new Serialization(system)
}
def settings: Settings = _settings
object SerializationExtension extends ExtensionId[Serialization] with ExtensionIdProvider {
override def lookup = SerializationExtension
override def createExtension(system: ActorSystemImpl): Serialization = new Serialization(system)
} }

View file

@ -21,7 +21,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess
*/ */
class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system).settings private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt

View file

@ -3,56 +3,32 @@
*/ */
package akka.actor.mailbox package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object BeanstalkBasedMailboxExtensionKey extends ExtensionKey[BeanstalkBasedMailboxExtension] object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider {
def lookup() = this
object BeanstalkBasedMailboxExtension { def createExtension(system: ActorSystemImpl) = new BeanstalkMailboxSettings(system.applicationConfig)
def apply(system: ActorSystem): BeanstalkBasedMailboxExtension = {
if (!system.hasExtension(BeanstalkBasedMailboxExtensionKey)) {
system.registerExtension(new BeanstalkBasedMailboxExtension)
}
system.extension(BeanstalkBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-beanstalk-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-beanstalk-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val Hostname = getString("akka.actor.mailbox.beanstalk.hostname")
val Port = getInt("akka.actor.mailbox.beanstalk.port")
val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS)
val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS)
val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS)
val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS)
}
} }
class BeanstalkBasedMailboxExtension extends Extension[BeanstalkBasedMailboxExtension] { class BeanstalkMailboxSettings(cfg: Config) extends Extension {
import BeanstalkBasedMailboxExtension._ private def referenceConfig: Config =
@volatile ConfigFactory.parseResource(classOf[ActorSystem], "/akka-beanstalk-mailbox-reference.conf",
private var _settings: Settings = _ ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-beanstalk-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
def key = BeanstalkBasedMailboxExtensionKey import config._
def init(system: ActorSystemImpl) { val Hostname = getString("akka.actor.mailbox.beanstalk.hostname")
_settings = new Settings(system.applicationConfig) val Port = getInt("akka.actor.mailbox.beanstalk.port")
} val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS)
val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS)
def settings: Settings = _settings val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS)
val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS)
} }

View file

@ -3,63 +3,39 @@
*/ */
package akka.actor.mailbox package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object FileBasedMailboxExtensionKey extends ExtensionKey[FileBasedMailboxExtension] object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this
object FileBasedMailboxExtension { def createExtension(system: ActorSystemImpl) = new FileBasedMailboxSettings(system.applicationConfig)
def apply(system: ActorSystem): FileBasedMailboxExtension = {
if (!system.hasExtension(FileBasedMailboxExtensionKey)) {
system.registerExtension(new FileBasedMailboxExtension)
}
system.extension(FileBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-file-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-file-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val QueuePath = getString("akka.actor.mailbox.file-based.directory-path")
val MaxItems = getInt("akka.actor.mailbox.file-based.max-items")
val MaxSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size")
val MaxItemSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size")
val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS)
val MaxJournalSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size")
val MaxMemorySize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size")
val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow")
val MaxJournalSizeAbsolute = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size-absolute")
val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full")
val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal")
val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal")
}
} }
class FileBasedMailboxExtension extends Extension[FileBasedMailboxExtension] { class FileBasedMailboxSettings(cfg: Config) extends Extension {
import FileBasedMailboxExtension._ private def referenceConfig: Config =
@volatile ConfigFactory.parseResource(classOf[ActorSystem], "/akka-file-mailbox-reference.conf",
private var _settings: Settings = _ ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-file-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
def key = FileBasedMailboxExtensionKey import config._
def init(system: ActorSystemImpl) { val QueuePath = getString("akka.actor.mailbox.file-based.directory-path")
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings val MaxItems = getInt("akka.actor.mailbox.file-based.max-items")
val MaxSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size")
val MaxItemSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size")
val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS)
val MaxJournalSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size")
val MaxMemorySize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size")
val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow")
val MaxJournalSizeAbsolute = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size-absolute")
val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full")
val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal")
val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal")
} }

View file

@ -14,7 +14,7 @@ class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with
val log = Logging(system, "FileBasedMailbox") val log = Logging(system, "FileBasedMailbox")
private val settings = FileBasedMailboxExtension(owner.system).settings private val settings = FileBasedMailboxExtension(owner.system)
val queuePath = settings.QueuePath val queuePath = settings.QueuePath
private val queue = try { private val queue = try {

View file

@ -22,7 +22,7 @@ import scala.collection.mutable
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.actor.mailbox.FileBasedMailboxExtension import akka.actor.mailbox.FileBasedMailboxSettings
// a config value that's backed by a global setting but may be locally overridden // a config value that's backed by a global setting but may be locally overridden
class OverlaySetting[T](base: T) { class OverlaySetting[T](base: T) {
@ -34,7 +34,7 @@ class OverlaySetting[T](base: ⇒ T) {
def apply() = local.getOrElse(base) def apply() = local.getOrElse(base)
} }
class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxExtension.Settings, log: LoggingAdapter) { class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxSettings, log: LoggingAdapter) {
private case object ItemArrived private case object ItemArrived
@ -127,7 +127,7 @@ class PersistentQueue(persistencePath: String, val name: String, val settings: F
configure(settings) configure(settings)
def configure(settings: FileBasedMailboxExtension.Settings) = synchronized { def configure(settings: FileBasedMailboxSettings) = synchronized {
maxItems set Some(settings.MaxItems) maxItems set Some(settings.MaxItems)
maxSize set Some(settings.MaxSize) maxSize set Some(settings.MaxSize)
maxItemSize set Some(settings.MaxItemSize) maxItemSize set Some(settings.MaxItemSize)

View file

@ -21,11 +21,11 @@ import java.io.File
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import scala.collection.mutable import scala.collection.mutable
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.actor.mailbox.FileBasedMailboxExtension import akka.actor.mailbox.FileBasedMailboxSettings
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable") class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
class QueueCollection(queueFolder: String, settings: FileBasedMailboxExtension.Settings, log: LoggingAdapter) { class QueueCollection(queueFolder: String, settings: FileBasedMailboxSettings, log: LoggingAdapter) {
private val path = new File(queueFolder) private val path = new File(queueFolder)
if (!path.isDirectory) { if (!path.isDirectory) {

View file

@ -6,7 +6,7 @@ import org.apache.commons.io.FileUtils
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) { class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) {
def clean { def clean {
val queuePath = FileBasedMailboxExtension(system).settings.QueuePath val queuePath = FileBasedMailboxExtension(system).QueuePath
FileUtils.deleteDirectory(new java.io.File(queuePath)) FileUtils.deleteDirectory(new java.io.File(queuePath))
} }

View file

@ -31,7 +31,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val mailboxBSONSer = new BSONSerializableMailbox(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
private val settings = MongoBasedMailboxExtension(owner.system).settings private val settings = MongoBasedMailboxExtension(owner.system)
val log = Logging(system, "MongoBasedMailbox") val log = Logging(system, "MongoBasedMailbox")

View file

@ -3,54 +3,30 @@
*/ */
package akka.actor.mailbox package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object MongoBasedMailboxExtensionKey extends ExtensionKey[MongoBasedMailboxExtension] object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this
object MongoBasedMailboxExtension { def createExtension(system: ActorSystemImpl) = new MongoBasedMailboxSettings(system.applicationConfig)
def apply(system: ActorSystem): MongoBasedMailboxExtension = {
if (!system.hasExtension(MongoBasedMailboxExtensionKey)) {
system.registerExtension(new MongoBasedMailboxExtension)
}
system.extension(MongoBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-mongo-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-mongo-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val UriConfigKey = "akka.actor.mailbox.mongodb.uri"
val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None
val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS)
}
} }
class MongoBasedMailboxExtension extends Extension[MongoBasedMailboxExtension] { class MongoBasedMailboxSettings(cfg: Config) extends Extension {
import MongoBasedMailboxExtension._ private def referenceConfig: Config =
@volatile ConfigFactory.parseResource(classOf[ActorSystem], "/akka-mongo-mailbox-reference.conf",
private var _settings: Settings = _ ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-mongo-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
def key = MongoBasedMailboxExtensionKey import config._
def init(system: ActorSystemImpl) { val UriConfigKey = "akka.actor.mailbox.mongodb.uri"
_settings = new Settings(system.applicationConfig) val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None
} val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS)
def settings: Settings = _settings
} }

View file

@ -18,7 +18,7 @@ class RedisBasedMailboxException(message: String) extends AkkaException(message)
*/ */
class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system).settings private val settings = RedisBasedMailboxExtension(owner.system)
@volatile @volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling

View file

@ -3,50 +3,25 @@
*/ */
package akka.actor.mailbox package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.actor._
object RedisBasedMailboxExtensionKey extends ExtensionKey[RedisBasedMailboxExtension] object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this
object RedisBasedMailboxExtension { def createExtension(system: ActorSystemImpl) = new RedisBasedMailboxSettings(system.applicationConfig)
def apply(system: ActorSystem): RedisBasedMailboxExtension = {
if (!system.hasExtension(RedisBasedMailboxExtensionKey)) {
system.registerExtension(new RedisBasedMailboxExtension)
}
system.extension(RedisBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-redis-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-redis-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val Hostname = getString("akka.actor.mailbox.redis.hostname")
val Port = getInt("akka.actor.mailbox.redis.port")
}
} }
class RedisBasedMailboxExtension extends Extension[RedisBasedMailboxExtension] { class RedisBasedMailboxSettings(cfg: Config) extends Extension {
import RedisBasedMailboxExtension._ private def referenceConfig: Config =
@volatile ConfigFactory.parseResource(classOf[ActorSystem], "/akka-redis-mailbox-reference.conf",
private var _settings: Settings = _ ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-redis-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
def key = RedisBasedMailboxExtensionKey import config._
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
val Hostname = getString("akka.actor.mailbox.redis.hostname")
val Port = getInt("akka.actor.mailbox.redis.port")
} }

View file

@ -22,7 +22,7 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess
*/ */
class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system).settings private val settings = ZooKeeperBasedMailboxExtension(owner.system)
val queueNode = "/queues" val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s" val queuePathTemplate = queueNode + "/%s"

View file

@ -3,54 +3,29 @@
*/ */
package akka.actor.mailbox package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object ZooKeeperBasedMailboxExtensionKey extends ExtensionKey[ZooKeeperBasedMailboxExtension] object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider {
def lookup() = this
object ZooKeeperBasedMailboxExtension { def createExtension(system: ActorSystemImpl) = new ZooKeeperBasedMailboxSettings(system.applicationConfig)
def apply(system: ActorSystem): ZooKeeperBasedMailboxExtension = {
if (!system.hasExtension(ZooKeeperBasedMailboxExtensionKey)) {
system.registerExtension(new ZooKeeperBasedMailboxExtension)
}
system.extension(ZooKeeperBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-zookeeper-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-zookeeper-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses")
val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS)
val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS)
val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
}
} }
class ZooKeeperBasedMailboxSettings(cfg: Config) extends Extension {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-zookeeper-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-zookeeper-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
class ZooKeeperBasedMailboxExtension extends Extension[ZooKeeperBasedMailboxExtension] { import config._
import ZooKeeperBasedMailboxExtension._
@volatile
private var _settings: Settings = _
def key = ZooKeeperBasedMailboxExtensionKey val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses")
val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS)
def init(system: ActorSystemImpl) { val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS)
_settings = new Settings(system.applicationConfig) val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
}
def settings: Settings = _settings
} }

View file

@ -27,8 +27,8 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10
def this(system: ActorSystem) { def this(system: ActorSystem) {
this( this(
RemoteExtension(system).settings.FailureDetectorThreshold, RemoteExtension(system).FailureDetectorThreshold,
RemoteExtension(system).settings.FailureDetectorMaxSampleSize) RemoteExtension(system).FailureDetectorMaxSampleSize)
} }
private final val PhiFactor = 1.0 / math.log(10.0) private final val PhiFactor = 1.0 / math.log(10.0)

View file

@ -106,13 +106,13 @@ class Gossiper(remote: Remote) {
private val system = remote.system private val system = remote.system
private val remoteExtension = RemoteExtension(system) private val remoteExtension = RemoteExtension(system)
private val serializationExtension = SerializationExtension(system) private val serialization = SerializationExtension(system)
private val log = Logging(system, "Gossiper") private val log = Logging(system, "Gossiper")
private val failureDetector = remote.failureDetector private val failureDetector = remote.failureDetector
private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef]) private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef])
private val seeds = { private val seeds = {
val seeds = RemoteExtension(system).settings.SeedNodes val seeds = remoteExtension.SeedNodes
if (seeds.isEmpty) throw new ConfigurationException( if (seeds.isEmpty) throw new ConfigurationException(
"At least one seed node must be defined in the configuration [akka.cluster.seed-nodes]") "At least one seed node must be defined in the configuration [akka.cluster.seed-nodes]")
else seeds else seeds
@ -248,7 +248,7 @@ class Gossiper(remote: Remote) {
throw new IllegalStateException("Connection for [" + peer + "] is not set up")) throw new IllegalStateException("Connection for [" + peer + "] is not set up"))
try { try {
(connection ? (toRemoteMessage(newGossip), remoteExtension.settings.RemoteSystemDaemonAckTimeout)).as[Status] match { (connection ? (toRemoteMessage(newGossip), remoteExtension.RemoteSystemDaemonAckTimeout)).as[Status] match {
case Some(Success(receiver)) case Some(Success(receiver))
log.debug("Gossip sent to [{}] was successfully received", receiver) log.debug("Gossip sent to [{}] was successfully received", receiver)
@ -310,7 +310,7 @@ class Gossiper(remote: Remote) {
} }
private def toRemoteMessage(gossip: Gossip): RemoteProtocol.RemoteSystemDaemonMessageProtocol = { private def toRemoteMessage(gossip: Gossip): RemoteProtocol.RemoteSystemDaemonMessageProtocol = {
val gossipAsBytes = serializationExtension.serialization.serialize(gossip) match { val gossipAsBytes = serialization.serialize(gossip) match {
case Left(error) throw error case Left(error) throw error
case Right(bytes) bytes case Right(bytes) bytes
} }

View file

@ -14,13 +14,13 @@ object MessageSerializer {
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
val clazz = loadManifest(classLoader, messageProtocol) val clazz = loadManifest(classLoader, messageProtocol)
SerializationExtension(system).serialization.deserialize(messageProtocol.getMessage.toByteArray, SerializationExtension(system).deserialize(messageProtocol.getMessage.toByteArray,
clazz, classLoader).fold(x throw x, identity) clazz, classLoader).fold(x throw x, identity)
} }
def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = { def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = {
val builder = MessageProtocol.newBuilder val builder = MessageProtocol.newBuilder
val bytes = SerializationExtension(system).serialization.serialize(message).fold(x throw x, identity) val bytes = SerializationExtension(system).serialize(message).fold(x throw x, identity)
builder.setMessage(ByteString.copyFrom(bytes)) builder.setMessage(ByteString.copyFrom(bytes))
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.build builder.build

View file

@ -36,9 +36,9 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
import settings._ import settings._
private[remote] val remoteExtension = RemoteExtension(system) private[remote] val remoteExtension = RemoteExtension(system)
private[remote] val serializationExtension = SerializationExtension(system) private[remote] val serialization = SerializationExtension(system)
private[remote] val remoteAddress = { private[remote] val remoteAddress = {
RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) RemoteAddress(remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port)
} }
val failureDetector = new AccrualFailureDetector(system) val failureDetector = new AccrualFailureDetector(system)
@ -134,10 +134,10 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
if (message.hasActorPath) { if (message.hasActorPath) {
val actorFactoryBytes = val actorFactoryBytes =
if (remoteExtension.settings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray if (remoteExtension.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray
val actorFactory = val actorFactory =
serializationExtension.serialization.deserialize(actorFactoryBytes, classOf[() Actor], None) match { serialization.deserialize(actorFactoryBytes, classOf[() Actor], None) match {
case Left(error) throw error case Left(error) throw error
case Right(instance) instance.asInstanceOf[() Actor] case Right(instance) instance.asInstanceOf[() Actor]
} }
@ -234,7 +234,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
} }
private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = { private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = {
serializationExtension.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match { serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) throw error case Left(error) throw error
case Right(instance) instance.asInstanceOf[T] case Right(instance) instance.asInstanceOf[T]
} }

View file

@ -55,13 +55,13 @@ class RemoteActorRefProvider(
@volatile @volatile
private var system: ActorSystemImpl = _ private var system: ActorSystemImpl = _
private lazy val remoteExtension = RemoteExtension(system) private lazy val remoteExtension = RemoteExtension(system)
private lazy val serializationExtension = SerializationExtension(system) private lazy val serialization = SerializationExtension(system)
lazy val rootPath: ActorPath = { lazy val rootPath: ActorPath = {
val remoteAddress = RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port) val remoteAddress = RemoteAddress(remoteExtension.serverSettings.Hostname, remoteExtension.serverSettings.Port)
new RootActorPath(remoteAddress) new RootActorPath(remoteAddress)
} }
private lazy val local = new LocalActorRefProvider(settings, eventStream, scheduler, rootPath, private lazy val local = new LocalActorRefProvider(settings, eventStream, scheduler, rootPath,
remoteExtension.settings.NodeName, remoteExtension.settings.ClusterName) remoteExtension.NodeName, remoteExtension.ClusterName)
private[akka] lazy val remote = new Remote(system, nodename) private[akka] lazy val remote = new Remote(system, nodename)
private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote) private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
@ -220,9 +220,9 @@ class RemoteActorRefProvider(
log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress) log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress)
val actorFactoryBytes = val actorFactoryBytes =
serializationExtension.serialization.serialize(actorFactory) match { serialization.serialize(actorFactory) match {
case Left(error) throw error case Left(error) throw error
case Right(bytes) if (remoteExtension.settings.ShouldCompressData) LZF.compress(bytes) else bytes case Right(bytes) if (remoteExtension.ShouldCompressData) LZF.compress(bytes) else bytes
} }
val command = RemoteSystemDaemonMessageProtocol.newBuilder val command = RemoteSystemDaemonMessageProtocol.newBuilder
@ -242,7 +242,7 @@ class RemoteActorRefProvider(
private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) { private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
if (withACK) { if (withACK) {
try { try {
val f = connection ? (command, remoteExtension.settings.RemoteSystemDaemonAckTimeout) val f = connection ? (command, remoteExtension.RemoteSystemDaemonAckTimeout)
(try f.await.value catch { case _: FutureTimeoutException None }) match { (try f.await.value catch { case _: FutureTimeoutException None }) match {
case Some(Right(receiver)) case Some(Right(receiver))
log.debug("Remote system command sent to [{}] successfully received", receiver) log.debug("Remote system command sent to [{}] successfully received", receiver)

View file

@ -3,10 +3,6 @@
*/ */
package akka.remote package akka.remote
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
@ -16,97 +12,77 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import java.net.InetAddress import java.net.InetAddress
import akka.config.ConfigurationException import akka.config.ConfigurationException
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.actor._
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
object RemoteExtensionKey extends ExtensionKey[RemoteExtension] object RemoteExtension extends ExtensionId[RemoteExtensionSettings] with ExtensionIdProvider {
def lookup() = this
object RemoteExtension { def createExtension(system: ActorSystemImpl) = new RemoteExtensionSettings(system.applicationConfig)
def apply(system: ActorSystem): RemoteExtension = {
if (!system.hasExtension(RemoteExtensionKey)) {
system.registerExtension(new RemoteExtension)
}
system.extension(RemoteExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-remote-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-remote").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val RemoteTransport = getString("akka.remote.layer")
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
val ShouldCompressData = config.getBoolean("akka.remote.use-compression")
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name")
val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_))
val NodeName: String = config.getString("akka.cluster.nodename") match {
case "" new UUID().toString
case value value
}
val serverSettings = new RemoteServerSettings
val clientSettings = new RemoteClientSettings
class RemoteClientSettings {
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val ReconnectionTimeWindow = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
val ReconnectDelay = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
val MessageFrameSize = config.getInt("akka.remote.client.message-frame-size")
}
class RemoteServerSettings {
val MessageFrameSize = config.getInt("akka.remote.server.message-frame-size")
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val RequireCookie = {
val requireCookie = config.getBoolean("akka.remote.server.require-cookie")
if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie
}
val UsePassiveConnections = config.getBoolean("akka.remote.use-passive-connections")
val UntrustedMode = config.getBoolean("akka.remote.server.untrusted-mode")
val Hostname = config.getString("akka.remote.server.hostname") match {
case "" InetAddress.getLocalHost.getHostAddress
case value value
}
val Port = config.getInt("akka.remote.server.port")
val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS)
val Backlog = config.getInt("akka.remote.server.backlog")
}
}
} }
class RemoteExtension extends Extension[RemoteExtension] { class RemoteExtensionSettings(cfg: Config) extends Extension {
import RemoteExtension._ private def referenceConfig: Config =
@volatile ConfigFactory.parseResource(classOf[ActorSystem], "/akka-remote-reference.conf",
private var _settings: Settings = _ ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-remote").withFallback(cfg).withFallback(referenceConfig).resolve()
def key = RemoteExtensionKey import config._
def init(system: ActorSystemImpl) { val RemoteTransport = getString("akka.remote.layer")
_settings = new Settings(system.applicationConfig) val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
val ShouldCompressData = config.getBoolean("akka.remote.use-compression")
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name")
val SeedNodes = Set.empty[RemoteAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.toSeq.map(RemoteAddress(_))
val NodeName: String = config.getString("akka.cluster.nodename") match {
case "" new UUID().toString
case value value
} }
def settings: Settings = _settings val serverSettings = new RemoteServerSettings
val clientSettings = new RemoteClientSettings
class RemoteClientSettings {
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val ReconnectionTimeWindow = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
val ReconnectDelay = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
val MessageFrameSize = config.getInt("akka.remote.client.message-frame-size")
}
class RemoteServerSettings {
import scala.collection.JavaConverters._
val MessageFrameSize = config.getInt("akka.remote.server.message-frame-size")
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val RequireCookie = {
val requireCookie = config.getBoolean("akka.remote.server.require-cookie")
if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie
}
val UsePassiveConnections = config.getBoolean("akka.remote.use-passive-connections")
val UntrustedMode = config.getBoolean("akka.remote.server.untrusted-mode")
val Hostname = config.getString("akka.remote.server.hostname") match {
case "" InetAddress.getLocalHost.getHostAddress
case value value
}
val Port = config.getInt("akka.remote.server.port")
val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS)
val Backlog = config.getInt("akka.remote.server.backlog")
}
} }

View file

@ -358,8 +358,8 @@ class ActiveRemoteClientHandler(
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps { class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
val log = Logging(system, "NettyRemoteSupport") val log = Logging(system, "NettyRemoteSupport")
val serverSettings = RemoteExtension(system).settings.serverSettings val serverSettings = RemoteExtension(system).serverSettings
val clientSettings = RemoteExtension(system).settings.clientSettings val clientSettings = RemoteExtension(system).clientSettings
private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock private val clientsLock = new ReentrantReadWriteLock

View file

@ -8,7 +8,7 @@ class RemoteConfigSpec extends AkkaSpec {
"ClusterSpec: A Deployer" must { "ClusterSpec: A Deployer" must {
"be able to parse 'akka.actor.cluster._' config elements" in { "be able to parse 'akka.actor.cluster._' config elements" in {
val config = RemoteExtension(system).settings.config val config = RemoteExtension(system).config
import config._ import config._
//akka.remote.server //akka.remote.server

View file

@ -33,7 +33,7 @@ class TestBarrier(count: Int) {
} catch { } catch {
case e: TimeoutException case e: TimeoutException
throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s" throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s"
format (timeout.toString, TestKitExtension(system).settings.TestTimeFactor)) format (timeout.toString, TestKitExtension(system).TestTimeFactor))
} }
} }

View file

@ -81,8 +81,7 @@ abstract class EventFilter(occurrences: Int) {
*/ */
def intercept[T](code: T)(implicit system: ActorSystem): T = { def intercept[T](code: T)(implicit system: ActorSystem): T = {
system.eventStream publish TestEvent.Mute(this) system.eventStream publish TestEvent.Mute(this)
val testKitExtension = TestKitExtension(system) val leeway = TestKitExtension(system).TestEventFilterLeeway
val leeway = testKitExtension.settings.TestEventFilterLeeway
try { try {
val result = code val result = code
if (!awaitDone(leeway)) if (!awaitDone(leeway))

View file

@ -81,7 +81,7 @@ class TestKit(_system: ActorSystem) {
import TestActor.{ Message, RealMessage, NullMessage } import TestActor.{ Message, RealMessage, NullMessage }
implicit val system = _system implicit val system = _system
val testKitExtension = TestKitExtension(system) val testKitSettings = TestKitExtension(system)
private val queue = new LinkedBlockingDeque[Message]() private val queue = new LinkedBlockingDeque[Message]()
private[akka] var lastMessage: Message = NullMessage private[akka] var lastMessage: Message = NullMessage
@ -128,7 +128,7 @@ class TestKit(_system: ActorSystem) {
* block or missing that it returns the properly dilated default for this * block or missing that it returns the properly dilated default for this
* case from settings (key "akka.test.single-expect-default"). * case from settings (key "akka.test.single-expect-default").
*/ */
def remaining: Duration = if (end == Duration.Undefined) testKitExtension.settings.SingleExpectDefaultTimeout.dilated else end - now def remaining: Duration = if (end == Duration.Undefined) testKitSettings.SingleExpectDefaultTimeout.dilated else end - now
/** /**
* Query queue status. * Query queue status.
@ -569,10 +569,8 @@ object TestKit {
* Java API. Scale timeouts (durations) during tests with the configured * Java API. Scale timeouts (durations) during tests with the configured
* 'akka.test.timefactor'. * 'akka.test.timefactor'.
*/ */
def dilated(duration: Duration, system: ActorSystem): Duration = { def dilated(duration: Duration, system: ActorSystem): Duration =
duration * TestKitExtension(system).settings.TestTimeFactor duration * TestKitExtension(system).TestTimeFactor
}
} }
/** /**

View file

@ -3,53 +3,27 @@
*/ */
package akka.testkit package akka.testkit
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import akka.util.Duration import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionId, ActorSystem, Extension, ActorSystemImpl }
object TestKitExtensionKey extends ExtensionKey[TestKitExtension] object TestKitExtension extends ExtensionId[TestKitSettings] {
def createExtension(system: ActorSystemImpl): TestKitSettings = new TestKitSettings(system.applicationConfig)
object TestKitExtension {
def apply(system: ActorSystem): TestKitExtension = {
if (!system.hasExtension(TestKitExtensionKey)) {
system.registerExtension(new TestKitExtension)
}
system.extension(TestKitExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-testkit-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-testkit").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val TestTimeFactor = getDouble("akka.test.timefactor")
val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
}
} }
class TestKitExtension extends Extension[TestKitExtension] { class TestKitSettings(cfg: Config) extends Extension {
import TestKitExtension._ private def referenceConfig: Config =
@volatile ConfigFactory.parseResource(classOf[ActorSystem], "/akka-testkit-reference.conf",
private var _settings: Settings = _ ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-testkit").withFallback(cfg).withFallback(referenceConfig).resolve()
def key = TestKitExtensionKey import config._
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
val TestTimeFactor = getDouble("akka.test.timefactor")
val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
} }

View file

@ -34,10 +34,9 @@ class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
def await(): Boolean = await(TestLatch.DefaultTimeout) def await(): Boolean = await(TestLatch.DefaultTimeout)
def await(timeout: Duration): Boolean = { def await(timeout: Duration): Boolean = {
val testKitExtension = TestKitExtension(system)
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TestLatchTimeoutException( if (!opened) throw new TestLatchTimeoutException(
"Timeout of %s with time factor of %s" format (timeout.toString, testKitExtension.settings.TestTimeFactor)) "Timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor))
opened opened
} }
@ -45,10 +44,9 @@ class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
* Timeout is expected. Throws exception if latch is opened before timeout. * Timeout is expected. Throws exception if latch is opened before timeout.
*/ */
def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = { def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
val testKitExtension = TestKitExtension(system)
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (opened) throw new TestLatchNoTimeoutException( if (opened) throw new TestLatchNoTimeoutException(
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, testKitExtension.settings.TestTimeFactor)) "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor))
opened opened
} }

View file

@ -12,9 +12,9 @@ package object testkit {
try { try {
val result = block val result = block
val testKitExtension = TestKitExtension(system) val testKitSettings = TestKitExtension(system)
val stop = now + testKitExtension.settings.TestEventFilterLeeway.toMillis val stop = now + testKitSettings.TestEventFilterLeeway.toMillis
val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + testKitExtension.settings.TestEventFilterLeeway + ") waiting for " + _) val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + testKitSettings.TestEventFilterLeeway + ") waiting for " + _)
if (failed.nonEmpty) if (failed.nonEmpty)
throw new AssertionError("Filter completion error:\n" + failed.mkString("\n")) throw new AssertionError("Filter completion error:\n" + failed.mkString("\n"))
@ -45,7 +45,7 @@ package object testkit {
*/ */
class TestDuration(duration: Duration) { class TestDuration(duration: Duration) {
def dilated(implicit system: ActorSystem): Duration = { def dilated(implicit system: ActorSystem): Duration = {
duration * TestKitExtension(system).settings.TestTimeFactor duration * TestKitExtension(system).TestTimeFactor
} }
} }
} }

View file

@ -15,7 +15,7 @@ class TestTimeSpec extends AkkaSpec(Map("akka.test.timefactor" -> 2.0)) with Bef
val now = System.nanoTime val now = System.nanoTime
intercept[AssertionError] { probe.awaitCond(false, Duration("1 second")) } intercept[AssertionError] { probe.awaitCond(false, Duration("1 second")) }
val diff = System.nanoTime - now val diff = System.nanoTime - now
val target = (1000000000l * testKitExtension.settings.TestTimeFactor).toLong val target = (1000000000l * testKitSettings.TestTimeFactor).toLong
diff must be > (target - 300000000l) diff must be > (target - 300000000l)
diff must be < (target + 300000000l) diff must be < (target + 300000000l)
} }