Modularize configuration. See #1361

* Split config reference to one for each module/extension.
* Adjusted signature of registerExtension to avoid race of extension init
* Moved Duration.dilated to testkit
* TestKitExtension
* RemoteExtension
* SerializationExtension
* Durable mailboxes extensions
* Fixed broken serialization bindings and added test
* Updated configuration documentation
* System properties akka.remote.hostname akka.remote.port replaced with akka.remote.server.hostname and akka.remote.server.port
* Adjustments of ActorSystem initialization. Still don't like the two-phase constructor/init flow. Very fragile for changes.

Review fixes. SerializationExtension
This commit is contained in:
Patrik Nordwall 2011-11-22 13:04:10 +01:00
parent c56341b3a6
commit 179399296e
85 changed files with 1233 additions and 643 deletions

View file

@ -15,19 +15,24 @@ public class JavaExtension {
static class TestExtension implements Extension<TestExtension> {
private ActorSystemImpl system;
public static ExtensionKey<TestExtension> key = new ExtensionKey<TestExtension>() {};
public static ExtensionKey<TestExtension> key = new ExtensionKey<TestExtension>() {
};
public ExtensionKey<TestExtension> init(ActorSystemImpl system) {
this.system = system;
public ExtensionKey<TestExtension> key() {
return key;
}
public void init(ActorSystemImpl system) {
this.system = system;
}
public ActorSystemImpl getSystem() {
return system;
}
}
private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtension\" ]", ConfigParseOptions.defaults());
private Config c = ConfigFactory.parseString("akka.extensions = [ \"akka.actor.JavaExtension$TestExtension\" ]",
ConfigParseOptions.defaults());
private ActorSystem system = ActorSystem.create("JavaExtension", c);

View file

@ -0,0 +1,30 @@
package akka.docs.config
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import akka.actor.ActorSystem
//#imports
class ConfigDocSpec extends WordSpec {
"programmatically configure ActorSystem" in {
//#custom-config
val customConf = ConfigFactory.parseString("""
akka.actor.deployment {
/app/my-service {
router = round-robin
nr-of-instances = 3
}
}
""", ConfigParseOptions.defaults)
val system = ActorSystem("MySystem", ConfigFactory.systemProperties.withFallback(customConf))
//#custom-config
system.stop()
}
}

View file

@ -275,7 +275,7 @@ class ActorRefSpec extends AkkaSpec {
(intercept[java.lang.IllegalStateException] {
in.readObject
}).getMessage must be === "Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use akka.serialization.Serialization.system.withValue(akkaApplication) { ... }"
" Use akka.serialization.Serialization.system.withValue(system) { ... }"
}
"must throw exception on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
@ -284,7 +284,7 @@ class ActorRefSpec extends AkkaSpec {
val baos = new ByteArrayOutputStream(8192 * 32)
val out = new ObjectOutputStream(baos)
val addr = system.rootPath.remoteAddress
val addr = system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
val serialized = SerializedActorRef(addr.hostname, addr.port, "/this/path/does/not/exist")
out.writeObject(serialized)

View file

@ -11,12 +11,13 @@ class JavaExtensionSpec extends JavaExtension with JUnitSuite
object ActorSystemSpec {
case class TestExtension extends Extension[TestExtension] {
class TestExtension extends Extension[TestExtension] {
var system: ActorSystemImpl = _
def init(system: ActorSystemImpl): ExtensionKey[TestExtension] = {
def key = TestExtension
def init(system: ActorSystemImpl) {
this.system = system
TestExtension
}
}

View file

@ -9,7 +9,7 @@ import akka.util.duration._
import akka.{ Die, Ping }
import akka.actor.Actor._
import akka.testkit.TestEvent._
import akka.testkit.{ EventFilter, ImplicitSender, AkkaSpec, filterEvents }
import akka.testkit._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.LinkedBlockingQueue

View file

@ -13,6 +13,7 @@ import akka.serialization.Serialization
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
import akka.serialization.SerializationExtension
object TypedActorSpec {
@ -332,7 +333,8 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to serialize and deserialize invocations" in {
import java.io._
val m = TypedActor.MethodCall(system.serialization, classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]())
val serialization = SerializationExtension(system).serialization
val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("pigdog"), Array[AnyRef]())
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)
@ -351,7 +353,8 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte
"be able to serialize and deserialize invocations' parameters" in {
import java.io._
val someFoo: Foo = new Bar
val m = TypedActor.MethodCall(system.serialization, classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val serialization = SerializationExtension(system).serialization
val m = TypedActor.MethodCall(serialization, classOf[Foo].getDeclaredMethod("testMethodCallSerialization", Array[Class[_]](classOf[Foo], classOf[String], classOf[Int]): _*), Array[AnyRef](someFoo, null, 1.asInstanceOf[AnyRef]))
val baos = new ByteArrayOutputStream(8192 * 4)
val out = new ObjectOutputStream(baos)

View file

@ -4,7 +4,7 @@
package akka.actor.dispatch
import org.scalatest.Assertions._
import akka.testkit.{ filterEvents, EventFilter, AkkaSpec }
import akka.testkit._
import akka.dispatch._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch, TimeUnit }

View file

@ -45,8 +45,6 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec
getBoolean("akka.actor.serialize-messages") must equal(false)
settings.SerializeAllMessages must equal(false)
getString("akka.remote.layer") must equal("akka.cluster.netty.NettyRemoteSupport")
getInt("akka.remote.server.port") must equal(2552)
}
}
}

View file

@ -2,7 +2,7 @@ package akka.routing
import akka.dispatch.{ KeptPromise, Future }
import akka.actor._
import akka.testkit.{ TestLatch, filterEvents, EventFilter, filterException }
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.testkit.AkkaSpec

View file

@ -10,8 +10,30 @@ import akka.testkit.AkkaSpec
import akka.actor.{ ActorSystem, ActorSystemImpl }
import java.io.{ ObjectInputStream, ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream }
import akka.actor.DeadLetterActorRef
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
object SerializeSpec {
val serializationConf = ConfigFactory.parseString("""
akka {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.testing.ProtobufSerializer"
sjson = "akka.testing.SJSONSerializer"
default = "akka.serialization.JavaSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
sjson = ["akka.serialization.SerializeSpec$Person"]
proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
}
}
}
""", ConfigParseOptions.defaults)
@BeanInfo
case class Address(no: String, street: String, city: String, zip: String) { def this() = this("", "", "", "") }
@BeanInfo
@ -21,16 +43,23 @@ object SerializeSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SerializeSpec extends AkkaSpec {
class SerializeSpec extends AkkaSpec(SerializeSpec.serializationConf) {
import SerializeSpec._
val ser = system.serialization
val ser = SerializationExtension(system).serialization
import ser._
val addr = Address("120", "Monroe Street", "Santa Clara", "95050")
val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050"))
"Serialization" must {
"have correct bindings" in {
ser.bindings(addr.getClass.getName) must be("java")
ser.bindings(person.getClass.getName) must be("sjson")
}
"serialize Address" in {
val addr = Address("120", "Monroe Street", "Santa Clara", "95050")
val b = serialize(addr) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
@ -42,7 +71,7 @@ class SerializeSpec extends AkkaSpec {
}
"serialize Person" in {
val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050"))
val b = serialize(person) match {
case Left(exception) fail(exception)
case Right(bytes) bytes
@ -54,7 +83,7 @@ class SerializeSpec extends AkkaSpec {
}
"serialize record with default serializer" in {
val person = Person("debasish ghosh", 25, Address("120", "Monroe Street", "Santa Clara", "95050"))
val r = Record(100, person)
val b = serialize(r) match {
case Left(exception) fail(exception)

View file

@ -1,16 +0,0 @@
akka {
actor {
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.testing.ProtobufSerializer"
sjson = "akka.testing.SJSONSerializer"
default = "akka.serialization.JavaSerializer"
}
serialization-bindings {
java = ["akka.serialization.SerializeSpec$Address", "akka.serialization.MyJavaSerializableActor", "akka.serialization.MyStatelessActorWithMessagesInMailbox", "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
sjson = ["akka.serialization.SerializeSpec$Person"]
proto = ["com.google.protobuf.Message", "akka.actor.ProtobufProtocol$MyMessage"]
}
}
}

View file

@ -78,7 +78,7 @@ public abstract class Parseable implements ConfigParseable {
// so that exceptions are thrown from the public parse() function and not
// from the creation of the Parseable. Essentially this is a lazy field.
// The parser should close the reader when it's done with it.
// ALSO, IMPORTANT: if the file or URL is not found, this must throw.
// ALSO, IMPortANT: if the file or URL is not found, this must throw.
// to support the "allow missing" feature.
protected abstract Reader reader() throws IOException;

View file

@ -20,27 +20,6 @@ akka {
extensions = [] # list FQCN of extensions which shall be loaded at actor system startup
event-handler-dispatcher {
type = "Dispatcher" # Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
name = "EventHandlerDispatcher" # Optional, will be a generated UUID if omitted
keep-alive-time = 60s # Keep alive time for threads
core-pool-size = 1 # No of core threads
max-pool-size = 8 # Max no of threads
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded)
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
allow-core-timeout = on # Allow core threads to time out
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
throughput-deadline-time = 0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
# The following are only used for Dispatcher and only if mailbox-capacity > 0
mailbox-push-timeout-time = 10s # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
}
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
# Can be used to bootstrap your application(s)
# Should be the FQN (Fully Qualified Name) of the boot class which needs to have a default constructor
@ -126,7 +105,7 @@ akka {
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
allow-core-timeout = on # Allow core threads to time out
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
throughput-deadline-time = -0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
throughput-deadline-time = 0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
@ -142,147 +121,7 @@ akka {
event-stream = off # enable DEBUG logging of subscription changes on the eventStream
}
mailbox {
file-based {
directory-path = "./_mb"
max-items = 2147483647
max-size = 2147483647 bytes
max-items = 2147483647
max-item-size = 2147483647 bytes
max-age = 0s
max-journal-size = 16 megabytes
max-memory-size = 128 megabytes
max-journal-overflow = 10
max-journal-size-absolute = 9223372036854775807 bytes
discard-old-when-full = on
keep-journal = on
sync-journal = off
}
redis {
hostname = "127.0.0.1"
port = 6379
}
mongodb {
# Any specified collection name will be used as a prefix for collections that use durable mongo mailboxes
uri = "mongodb://localhost/akka.mailbox" # Follow Mongo URI Spec - http://www.mongodb.org/display/DOCS/Connections
# Configurable timeouts for certain ops
timeout {
read = 3000ms # time to wait for a read to succeed before timing out the future
write = 3000ms # time to wait for a write to succeed before timing out the future
}
}
zookeeper {
server-addresses = "127.0.0.1:2181"
session-timeout = 60s
connection-timeout = 60s
blocking-queue = on
}
beanstalk {
hostname = "127.0.0.1"
port = 11300
reconnect-window = 5s
message-submit-delay = 0s
message-submit-timeout = 5s
message-time-to-live = 120s
}
}
# Entries for pluggable serializers and their bindings. If a binding for a specific class is not found,
# then the default serializer (Java serialization) is used.
#
serializers {
# java = "akka.serialization.JavaSerializer"
# proto = "akka.testing.ProtobufSerializer"
# sjson = "akka.testing.SJSONSerializer"
default = "akka.serialization.JavaSerializer"
}
# serialization-bindings {
# java = ["akka.serialization.SerializeSpec$Address",
# "akka.serialization.MyJavaSerializableActor",
# "akka.serialization.MyStatelessActorWithMessagesInMailbox",
# "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
# sjson = ["akka.serialization.SerializeSpec$Person"]
# proto = ["com.google.protobuf.Message",
# "akka.actor.ProtobufProtocol$MyMessage"]
# }
}
remote {
# FIXME rename to transport
layer = "akka.cluster.netty.NettyRemoteSupport"
use-compression = off
secure-cookie = "" # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
# or using 'akka.util.Crypt.generateSecureCookie'
remote-daemon-ack-timeout = 30s # Timeout for ACK of cluster operations, lik checking actor out etc.
use-passive-connections = on # Reuse inbound connections for outbound messages
failure-detector { # accrual failure detection config
threshold = 8 # defines the failure detector threshold
# A low threshold is prone to generate many wrong suspicions but ensures a
# quick detection in the event of a real crash. Conversely, a high threshold
# generates fewer mistakes but needs more time to detect actual crashes
max-sample-size = 1000
}
server {
port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA)
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads
connection-timeout = 120s # Timeout duration
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
backlog = 4096 # Sets the size of the connection backlog
}
client {
buffering {
retry-message-send-on-failure = off # Should message buffering on remote client error be used (buffer flushed on successful reconnect)
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
}
reconnect-delay = 5s
read-timeout = 3600s
message-frame-size = 1048576
reap-futures-delay = 5s # FIXME: This is not used anywhere (except in ClusterSpec), remove?
reconnection-time-window = 600s # Maximum time window that a client should try to reconnect for
}
}
cluster {
name = "test-cluster"
zookeeper-server-addresses = "localhost:2181" # comma-separated list of '<hostname>:<port>' elements
max-time-to-wait-until-connected = 30s
session-timeout = 60s
connection-timeout = 60s
include-ref-node-in-replica-set = on # Can a replica be instantiated on the same node as the cluster reference to the actor
# Default: on
log-directory = "_akka_cluster" # Where ZooKeeper should store the logs and data files
replication {
digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)
password = "secret" # FIXME: store open in file?
ensemble-size = 3
quorum-size = 2
snapshot-frequency = 1000 # The number of messages that should be logged between every actor snapshot
timeout = 30s # Timeout for asyncronous (write-behind) operations
}
}
# TODO move to testkit-reference
test {
timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load
filter-leeway = 3s # duration of EventFilter.intercept waits after the block is finished until all required messages are received
single-expect-default = 3s # duration to wait in expectMsg and friends outside of within() block by default
}
}

View file

@ -0,0 +1,32 @@
############################################
# Akka Serialization Reference Config File #
############################################
# This the reference config file has all the default settings.
# Make your edits/overrides in your akka.conf.
akka {
actor {
# Entries for pluggable serializers and their bindings. If a binding for a specific class is not found,
# then the default serializer (Java serialization) is used.
#
serializers {
# java = "akka.serialization.JavaSerializer"
# proto = "akka.testing.ProtobufSerializer"
# sjson = "akka.testing.SJSONSerializer"
default = "akka.serialization.JavaSerializer"
}
# serialization-bindings {
# java = ["akka.serialization.SerializeSpec$Address",
# "akka.serialization.MyJavaSerializableActor",
# "akka.serialization.MyStatelessActorWithMessagesInMailbox",
# "akka.serialization.MyActorWithProtobufMessagesInMailbox"]
# sjson = ["akka.serialization.SerializeSpec$Person"]
# proto = ["com.google.protobuf.Message",
# "akka.actor.ProtobufProtocol$MyMessage"]
# }
}
}

View file

@ -9,7 +9,6 @@ import akka.dispatch._
import akka.routing._
import akka.util.Duration
import akka.remote.RemoteSupport
import akka.cluster.ClusterNode
import akka.japi.{ Creator, Procedure }
import akka.serialization.{ Serializer, Serialization }
import akka.event.Logging.Debug

View file

@ -21,7 +21,7 @@ object ActorPath {
* Create an actor path from an iterable.
*/
def apply(system: ActorSystem, path: Iterable[String]): ActorPath =
path.foldLeft(system.rootPath)(_ / _)
path.foldLeft(system.asInstanceOf[ActorSystemImpl].provider.rootPath)(_ / _)
/**
* Split a string path into an iterable.

View file

@ -314,7 +314,7 @@ case class SerializedActorRef(hostname: String, port: Int, path: String) {
def readResolve(): AnyRef = {
if (system.value eq null) throw new IllegalStateException(
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use akka.serialization.Serialization.system.withValue(akkaApplication) { ... }")
" Use akka.serialization.Serialization.system.withValue(system) { ... }")
system.value.provider.deserialize(this) match {
case Some(actor) actor
case None throw new IllegalStateException("Could not deserialize ActorRef")
@ -360,11 +360,18 @@ object DeadLetterActorRef {
val serialized = new SerializedDeadLetterActorRef
}
class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath) extends MinimalActorRef {
class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
@volatile
var brokenPromise: Future[Any] = _
private var brokenPromise: Future[Any] = _
@volatile
private var _path: ActorPath = _
def path: ActorPath = {
assert(_path != null)
_path
}
private[akka] def init(dispatcher: MessageDispatcher) {
private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) {
_path = rootPath / "nul"
brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher)
}

View file

@ -16,6 +16,8 @@ import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Ro
import akka.AkkaException
import com.eaio.uuid.UUID
import akka.util.{ Switch, Helpers }
import akka.remote.RemoteAddress
import akka.remote.LocalOnly
/**
* Interface for all ActorRef providers to implement.
@ -32,8 +34,16 @@ trait ActorRefProvider {
def deathWatch: DeathWatch
// FIXME: remove/replace
// FIXME: remove/replace?
def nodename: String
// FIXME: remove/replace?
def clustername: String
/**
* The root path for all actors within this actor system, including remote
* address if enabled.
*/
def rootPath: ActorPath
def settings: ActorSystem.Settings
@ -134,23 +144,20 @@ class ActorRefProviderException(message: String) extends AkkaException(message)
*/
class LocalActorRefProvider(
val settings: ActorSystem.Settings,
val rootPath: ActorPath,
val eventStream: EventStream,
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
val scheduler: Scheduler,
val rootPath: ActorPath,
val nodename: String,
val clustername: String) extends ActorRefProvider {
def this(settings: ActorSystem.Settings, eventStream: EventStream, scheduler: Scheduler) {
this(settings, eventStream, scheduler, new RootActorPath(LocalOnly), "local", "local")
}
val log = Logging(eventStream, "LocalActorRefProvider")
// FIXME remove/replave (clustering shall not leak into akka-actor)
val nodename: String = System.getProperty("akka.cluster.nodename") match {
case null | "" new UUID().toString
case value value
}
private[akka] val deployer: Deployer = new Deployer(settings, eventStream, nodename)
val terminationFuture = new DefaultPromise[Unit](Timeout.never)(dispatcher)
/*
* generate name for temporary actor refs
*/
@ -227,26 +234,23 @@ class LocalActorRefProvider(
* provide their service. Hence they cannot be created while the
* constructors of ActorSystem and ActorRefProvider are still running.
* The solution is to split out that last part into an init() method,
* but it also requires these references to be @volatile.
* but it also requires these references to be @volatile and lazy.
*/
@volatile
private var rootGuardian: ActorRef = _
@volatile
private var _guardian: ActorRef = _
@volatile
private var _systemGuardian: ActorRef = _
def guardian = _guardian
def systemGuardian = _systemGuardian
private var system: ActorSystemImpl = _
def dispatcher: MessageDispatcher = system.dispatcher
lazy val terminationFuture: DefaultPromise[Unit] = new DefaultPromise[Unit](Timeout.never)(dispatcher)
lazy val rootGuardian: ActorRef = actorOf(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true)
lazy val guardian: ActorRef = actorOf(system, guardianProps, rootGuardian, "app", true)
lazy val systemGuardian: ActorRef = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true)
val deathWatch = createDeathWatch()
def init(system: ActorSystemImpl) {
rootGuardian = actorOf(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath, true)
_guardian = actorOf(system, guardianProps, rootGuardian, "app", true)
_systemGuardian = actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true)
def init(_system: ActorSystemImpl) {
system = _system
// chain death watchers so that killing guardian stops the application
deathWatch.subscribe(_systemGuardian, _guardian)
deathWatch.subscribe(rootGuardian, _systemGuardian)
deathWatch.subscribe(systemGuardian, guardian)
deathWatch.subscribe(rootGuardian, systemGuardian)
}
// FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "system" name for now)

View file

@ -14,6 +14,7 @@ import akka.serialization.Serialization
import akka.remote.RemoteAddress
import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.NANOSECONDS
import java.io.File
@ -25,6 +26,9 @@ import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ConcurrentHashMap
import akka.util.{ Helpers, Duration, ReflectiveAccess }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.CountDownLatch
import scala.annotation.tailrec
import akka.serialization.SerializationExtension
object ActorSystem {
@ -55,7 +59,7 @@ object ActorSystem {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-actor-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka").withFallback(cfg).withFallback(referenceConfig).resolve()
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-actor").withFallback(cfg).withFallback(referenceConfig).resolve()
import scala.collection.JavaConverters._
import config._
@ -64,12 +68,9 @@ object ActorSystem {
val ProviderClass = getString("akka.actor.provider")
val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
// TODO This isn't used anywhere. Remove?
val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
val TestTimeFactor = getDouble("akka.test.timefactor")
val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
val LogLevel = getString("akka.loglevel")
val StdoutLogLevel = getString("akka.stdout-loglevel")
val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala
@ -93,16 +94,6 @@ object ActorSystem {
val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala
// FIXME move to cluster extension
val ClusterEnabled = EnabledModules exists (_ == "cluster")
val ClusterName = getString("akka.cluster.name")
// FIXME move to remote extension
val RemoteTransport = getString("akka.remote.layer")
val RemoteServerPort = getInt("akka.remote.server.port")
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + Version +
"] does not match the provided config version [" + ConfigVersion + "]")
@ -175,17 +166,16 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
*/
def nodename: String
/**
* The logical name of the cluster this actor system belongs to.
*/
def clustername: String
/**
* Construct a path below the application guardian to be used with [[ActorSystem.actorFor]].
*/
def /(name: String): ActorPath
/**
* The root path for all actors within this actor system, including remote
* address if enabled.
*/
def rootPath: ActorPath
/**
* Start-up time in milliseconds since the epoch.
*/
@ -215,8 +205,6 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
// FIXME: do not publish this
def deadLetterMailbox: Mailbox
// FIXME: Serialization should be an extension
def serialization: Serialization
// FIXME: TypedActor should be an extension
def typedActor: TypedActor
@ -268,7 +256,7 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
* Extensions can be registered automatically by adding their fully-qualified
* class name to the `akka.extensions` configuration key.
*/
def registerExtension(ext: Extension[_ <: AnyRef])
def registerExtension[T <: AnyRef](ext: Extension[T]): Extension[T]
/**
* Obtain a reference to a registered extension by passing in the key which
@ -292,11 +280,11 @@ abstract class ActorSystem extends ActorRefFactory with TypedActorFactory {
def hasExtension(key: ExtensionKey[_]): Boolean
}
class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem {
class ActorSystemImpl(val name: String, val applicationConfig: Config) extends ActorSystem {
import ActorSystem._
val settings = new Settings(_config)
val settings = new Settings(applicationConfig)
protected def systemImpl = this
@ -304,25 +292,34 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem {
import settings._
val address = RemoteAddress(System.getProperty("akka.remote.hostname") match {
case null | "" InetAddress.getLocalHost.getHostAddress
case value value
}, System.getProperty("akka.remote.port") match {
case null | "" settings.RemoteServerPort
case value value.toInt
})
// this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(settings)
val log = new BusLogging(eventStream, "ActorSystem") // this used only for .getClass in tagging messages
/**
* The root actor path for this application.
*/
val rootPath: ActorPath = new RootActorPath(address)
// FIXME make this configurable
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, MILLISECONDS, 512))
val deadLetters = new DeadLetterActorRef(eventStream, rootPath / "nul")
val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
case Left(e) throw e
case Right(b) b
}
val arguments = Seq(
classOf[Settings] -> settings,
classOf[EventStream] -> eventStream,
classOf[Scheduler] -> scheduler)
val types: Array[Class[_]] = arguments map (_._1) toArray
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match {
case Left(e: InvocationTargetException) throw e.getTargetException
case Left(e) throw e
case Right(p) p
}
}
val deadLetters = new DeadLetterActorRef(eventStream)
val deadLetterMailbox = new Mailbox(null) {
becomeClosed()
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
@ -334,48 +331,20 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem {
override def numberOfMessages = 0
}
// FIXME make this configurable
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, MILLISECONDS, 512))
// TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
deadLetters.init(dispatcher)
val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
case Left(e) throw e
case Right(b) b
}
val arguments = Seq(
classOf[Settings] -> settings,
classOf[ActorPath] -> rootPath,
classOf[EventStream] -> eventStream,
classOf[MessageDispatcher] -> dispatcher,
classOf[Scheduler] -> scheduler)
val types: Array[Class[_]] = arguments map (_._1) toArray
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match {
case Left(e: InvocationTargetException) throw e.getTargetException
case Left(e) throw e
case Right(p) p
}
}
//FIXME Set this to a Failure when things bubble to the top
def terminationFuture: Future[Unit] = provider.terminationFuture
def guardian: ActorRef = provider.guardian
def systemGuardian: ActorRef = provider.systemGuardian
def deathWatch: DeathWatch = provider.deathWatch
def nodename: String = provider.nodename
def clustername: String = provider.clustername
private final val nextName = new AtomicLong
override protected def randomName(): String = Helpers.base64(nextName.incrementAndGet())
@volatile
private var _serialization: Serialization = _
def serialization = _serialization
@volatile
private var _typedActor: TypedActor = _
def typedActor = _typedActor
@ -383,9 +352,10 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem {
def /(actorName: String): ActorPath = guardian.path / actorName
private lazy val _start: this.type = {
_serialization = new Serialization(this)
_typedActor = new TypedActor(settings, _serialization)
// TODO can we do something better than loading SerializationExtension from here?
_typedActor = new TypedActor(settings, SerializationExtension(this).serialization)
provider.init(this)
deadLetters.init(dispatcher, provider.rootPath)
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
eventStream.start(this)
eventStream.startDefaultLoggers(this)
@ -405,22 +375,56 @@ class ActorSystemImpl(val name: String, _config: Config) extends ActorSystem {
terminationFuture onComplete (_ dispatcher.shutdown())
}
private val extensions = new ConcurrentHashMap[ExtensionKey[_], Extension[_]]
private val extensions = new ConcurrentHashMap[ExtensionKey[_], AnyRef]
def registerExtension(ext: Extension[_ <: AnyRef]) {
val key = ext.init(this)
extensions.put(key, ext) match {
case null
case old log.warning("replacing extension {}:{} with {}", key, old, ext)
/**
* Attempts to initialize and register this extension if the key associated with it isn't already registered.
* The extension will only be initialized if it isn't already registered.
* Rethrows anything thrown when initializing the extension (doesn't register in that case)
* Returns the registered extension, might be another already registered instance.
*/
@tailrec
final def registerExtension[T <: AnyRef](ext: Extension[T]): Extension[T] = {
/**
* Returns any extension registered to the specified key or returns null if not registered
*/
@tailrec
def lookupExtension[T <: AnyRef](key: ExtensionKey[T]): T = extensions.get(key) match {
case c: CountDownLatch c.await(); lookupExtension(key) //Registration in process, await completion and retry
case e: Extension[_] e.asInstanceOf[T] //Profit!
case null null.asInstanceOf[T] //Doesn't exist
}
lookupExtension(ext.key) match {
case e: Extension[_] e.asInstanceOf[Extension[T]] //Profit!
case null //Doesn't already exist, commence registration
val inProcessOfRegistration = new CountDownLatch(1)
extensions.putIfAbsent(ext.key, inProcessOfRegistration) match { // Signal that registration is in process
case null try { // Signal was successfully sent
ext.init(this) //Initialize the new extension
extensions.replace(ext.key, inProcessOfRegistration, ext) //Replace our in process signal with the initialized extension
ext //Profit!
} catch {
case t
extensions.remove(ext.key, inProcessOfRegistration) //In case shit hits the fan, remove the inProcess signal
throw t //Escalate to caller
} finally {
inProcessOfRegistration.countDown //Always notify listeners of the inProcess signal
}
case other registerExtension(ext) //Someone else is in process of registering an extension for this key, retry
}
}
}
def extension[T <: AnyRef](key: ExtensionKey[T]): T = extensions.get(key) match {
case null throw new IllegalArgumentException("trying to get non-registered extension " + key)
case x x.asInstanceOf[T]
case x: Extension[_] x.asInstanceOf[T]
case _ throw new IllegalArgumentException("trying to get non-registered extension " + key)
}
def hasExtension(key: ExtensionKey[_]): Boolean = extensions.get(key) != null
def hasExtension(key: ExtensionKey[_]): Boolean = extensions.get(key) match {
case x: Extension[_] true
case _ false
}
private def loadExtensions() {
import scala.collection.JavaConversions._

View file

@ -20,9 +20,9 @@ package akka.actor
*
* {{{
* class MyExtension extends Extension[MyExtension] {
* def init(system: ActorSystemImpl): ExtensionKey[MyExtension] = {
* def key = MyExtension
* def init(system: ActorSystemImpl) {
* ... // initialize here
* MyExtension
* }
* }
* object MyExtension extends ExtensionKey[MyExtension]
@ -34,14 +34,17 @@ package akka.actor
* static class MyExtension implements Extension<MyExtension> {
* public static ExtensionKey<MyExtension> key = new ExtensionKey<MyExtension>() {};
*
* public ExtensionKey<MyExtension> init(ActorSystemImpl system) {
* ... // initialize here
* public ExtensionKey<TestExtension> key() {
* return key;
* }
* public void init(ActorSystemImpl system) {
* ... // initialize here
* }
* }
* }}}
*/
trait Extension[T <: AnyRef] {
/**
* This method is called by the ActorSystem upon registering this extension.
* The key returned is used for looking up extensions, hence it must be a
@ -49,7 +52,13 @@ trait Extension[T <: AnyRef] {
* best achieved by storing it in a static field (Java) or as/in an object
* (Scala).
*/
def init(system: ActorSystemImpl): ExtensionKey[T]
def key: ExtensionKey[T]
/**
* This method is called by the ActorSystem when the extension is registered
* to trigger initialization of the extension.
*/
def init(system: ActorSystemImpl)
}
/**

View file

@ -10,6 +10,7 @@ import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.{ Serializer, Serialization }
import akka.dispatch._
import akka.serialization.SerializationExtension
object TypedActor {
/**
@ -60,14 +61,15 @@ object TypedActor {
val system = akka.serialization.Serialization.system.value
if (system eq null) throw new IllegalStateException(
"Trying to deserialize a SerializedMethodCall without an ActorSystem in scope." +
" Use akka.serialization.Serialization.system.withValue(akkaApplication) { ... }")
MethodCall(system.serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
" Use akka.serialization.Serialization.system.withValue(system) { ... }")
val serialization = SerializationExtension(system).serialization
MethodCall(serialization, ownerType.getDeclaredMethod(methodName, parameterTypes: _*), serializedParameters match {
case null null
case a if a.length == 0 Array[AnyRef]()
case a
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
for (i 0 until a.length) {
deserializedParameters(i) = system.serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
deserializedParameters(i) = serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
}
deserializedParameters
})

View file

@ -103,7 +103,7 @@ class NodeAddress(val clusterName: String, val nodeName: String) {
*/
object NodeAddress {
def apply(clusterName: String, nodeName: String): NodeAddress = new NodeAddress(clusterName, nodeName)
def apply(system: ActorSystem): NodeAddress = new NodeAddress(system.settings.ClusterName, system.nodename)
def apply(system: ActorSystem): NodeAddress = new NodeAddress(system.clustername, system.nodename)
def unapply(other: Any) = other match {
case address: NodeAddress Some((address.clusterName, address.nodeName))

View file

@ -36,7 +36,7 @@ import akka.util.Duration
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setKeepAliveTime(60 seconds)
* .buildThreadPool
* </pre>
* <p/>
@ -50,7 +50,7 @@ import akka.util.Duration
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setKeepAliveTime(60 seconds)
* .buildThreadPool();
* </pre>
* <p/>

View file

@ -36,7 +36,7 @@ case class DefaultDispatcherPrerequisites(
* .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setKeepAliveTime(60 seconds)
* .build
* </pre>
* <p/>
@ -49,7 +49,7 @@ case class DefaultDispatcherPrerequisites(
* .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setKeepAliveTime(60 seconds)
* .build();
* </pre>
* <p/>
@ -174,7 +174,9 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher
* or if not defined it uses the supplied dispatcher.
* Uses default values from default-dispatcher, i.e. all options doesn't need to be defined
* in config.
*/
def fromConfig(key: String, default: MessageDispatcher = defaultGlobalDispatcher, cfg: Config = settings.config): MessageDispatcher = {
import scala.collection.JavaConverters._
@ -192,7 +194,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
* Creates of obtains a dispatcher from a ConfigMap according to the format below.
* Uses default values from default-dispatcher.
*
* default-dispatcher {
* my-dispatcher {
* type = "Dispatcher" # Must be one of the following
* # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
* # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor

View file

@ -26,6 +26,8 @@ object RemoteAddress {
}
}
object LocalOnly extends RemoteAddress(0, "local")
case class RemoteAddress private[akka] (port: Int, hostname: String) {
@transient
override lazy val toString = "" + hostname + ":" + port

View file

@ -8,7 +8,6 @@ import akka.AkkaException
import akka.util.ReflectiveAccess
import akka.actor.{ ActorSystem, ActorSystemImpl }
import scala.util.DynamicVariable
import akka.remote.RemoteSupport
case class NoSerializerFoundException(m: String) extends AkkaException(m)
@ -64,14 +63,15 @@ class Serialization(val system: ActorSystemImpl) {
}
}
// serializers and bindings needs to be lazy because Serialization is initialized from SerializationExtension, which is needed here
/**
* A Map of serializer from alias to implementation (class implementing akka.serialization.Serializer)
* By default always contains the following mapping: "default" -> akka.serialization.JavaSerializer
* But "default" can be overridden in config
*/
val serializers: Map[String, Serializer] = {
import scala.collection.JavaConverters._
val serializersConf = system.settings.config.getConfig("akka.actor.serializers").toObject.unwrapped.asScala.toMap
lazy val serializers: Map[String, Serializer] = {
val serializersConf = SerializationExtension(system).settings.Serializers
for ((k: String, v: String) serializersConf)
yield k -> serializerOf(v).fold(throw _, identity)
}
@ -79,29 +79,27 @@ class Serialization(val system: ActorSystemImpl) {
/**
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
*/
val bindings: Map[String, String] = {
import scala.collection.JavaConverters._
val configPath = "akka.actor.serialization-bindings"
system.settings.config.hasPath(configPath) match {
case false Map()
case true
val serializationBindings = system.settings.config.getConfig(configPath).toObject.unwrapped.asScala
serializationBindings.foldLeft(Map[String, String]()) {
case (result, (k: String, vs: List[_])) result ++ (vs collect { case v: String (v, k) }) //All keys which are lists, take the Strings from them and Map them
case (result, _) result //For any other values, just skip them, TODO: print out warnings?
}
lazy val bindings: Map[String, String] = {
val configBindings = SerializationExtension(system).settings.SerializationBindings
configBindings.foldLeft(Map[String, String]()) {
case (result, (k: String, vs: Seq[_]))
//All keys which are lists, take the Strings from them and Map them
result ++ (vs collect { case v: String (v, k) })
case (result, x)
//For any other values, just skip them
result
}
}
/**
* serializerMap is a Map whose keys = FQN of class that is serializable and values = the FQN of the serializer to be used for that class
*/
val serializerMap: Map[String, Serializer] = bindings mapValues serializers
lazy val serializerMap: Map[String, Serializer] = bindings mapValues serializers
/**
* Maps from a Serializer.Identifier (Byte) to a Serializer instance (optimization)
*/
val serializerByIdentity: Map[Serializer.Identifier, Serializer] =
lazy val serializerByIdentity: Map[Serializer.Identifier, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) }
}

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.config.ConfigurationException
object SerializationExtensionKey extends ExtensionKey[SerializationExtension]
object SerializationExtension {
def apply(system: ActorSystem): SerializationExtension = {
if (!system.hasExtension(SerializationExtensionKey)) {
system.registerExtension(new SerializationExtension)
}
system.extension(SerializationExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-serialization-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-serialization").withFallback(cfg).withFallback(referenceConfig).resolve()
import scala.collection.JavaConverters._
import config._
val Serializers: Map[String, String] = {
toStringMap(getConfig("akka.actor.serializers"))
}
val SerializationBindings: Map[String, Seq[String]] = {
val configPath = "akka.actor.serialization-bindings"
hasPath(configPath) match {
case false Map()
case true
val serializationBindings: Map[String, Seq[String]] = getConfig(configPath).toObject.unwrapped.asScala.toMap.map {
case (k: String, v: java.util.Collection[_]) (k -> v.asScala.toSeq.asInstanceOf[Seq[String]])
case invalid throw new ConfigurationException("Invalid serialization-bindings [%s]".format(invalid))
}
serializationBindings
}
}
private def toStringMap(mapConfig: Config): Map[String, String] = {
mapConfig.toObject.unwrapped.asScala.toMap.map { entry
(entry._1 -> entry._2.toString)
}
}
}
}
class SerializationExtension extends Extension[SerializationExtension] {
import SerializationExtension._
@volatile
private var _settings: Settings = _
@volatile
private var _serialization: Serialization = _
def serialization = _serialization
def key = SerializationExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
_serialization = new Serialization(system)
}
def settings: Settings = _settings
}

View file

@ -278,7 +278,6 @@ abstract class Duration extends Serializable {
def /(other: Duration): Double
def unary_- : Duration
def finite_? : Boolean
def dilated(implicit system: ActorSystem): Duration = this * system.settings.TestTimeFactor
def min(other: Duration): Duration = if (this < other) this else other
def max(other: Duration): Duration = if (this > other) this else other
def sleep(): Unit = Thread.sleep(toMillis)
@ -483,3 +482,4 @@ class DurationDouble(d: Double) {
def days = Duration(d, DAYS)
def day = Duration(d, DAYS)
}

View file

@ -0,0 +1,32 @@
package akka.docs.config
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
//#imports
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
//#imports
class ConfigDocSpec extends WordSpec {
"programmatically configure ActorSystem" in {
//#custom-config
val customConf = ConfigFactory.parseString("""
akka.actor.deployment {
/app/my-service {
router = round-robin
nr-of-instances = 3
}
}
""", ConfigParseOptions.defaults)
val system = ActorSystem("MySystem", ConfigFactory.systemProperties.withFallback(customConf))
//#custom-config
system.stop()
}
}

View file

@ -11,9 +11,10 @@ Configuration
Specifying the configuration file
---------------------------------
If you don't specify a configuration file then Akka uses default values, corresponding to the ``akka-reference.conf``
that you see below. You can specify your own configuration file to override any property in the reference config.
You only have to define the properties that differ from the default configuration.
If you don't specify a configuration file then Akka uses default values, corresponding to the reference
configuration files that you see below. You can specify your own configuration file to override any
property in the reference config. You only have to define the properties that differ from the default
configuration.
The location of the config file to use can be specified in various ways:
@ -29,22 +30,74 @@ The location of the config file to use can be specified in various ways:
If several of these ways to specify the config file are used at the same time the precedence is the order as given above,
i.e. you can always redefine the location with the ``-Dakka.config=...`` system property.
You may also specify the configuration programmatically when instantiating the ``ActorSystem``.
.. includecode:: code/ConfigDocSpec.scala
:include: imports,custom-config
The ``ConfigFactory`` provides several methods to parse the configuration from various sources.
Defining the configuration file
-------------------------------
Here is the reference configuration file:
Each Akka module has a reference configuration file with the default values.
.. literalinclude:: ../../config/akka-reference.conf
*akka-actor:*
.. literalinclude:: ../../akka-actor/src/main/resources/akka-actor-reference.conf
:language: none
*akka-remote:*
.. literalinclude:: ../../akka-remote/src/main/resources/akka-remote-reference.conf
:language: none
*akka-serialization:*
.. literalinclude:: ../../akka-actor/src/main/resources/akka-serialization-reference.conf
:language: none
*akka-testkit:*
.. literalinclude:: ../../akka-testkit/src/main/resources/akka-testkit-reference.conf
:language: none
*akka-beanstalk-mailbox:*
.. literalinclude:: ../../akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/resources/akka-beanstalk-mailbox-reference.conf
:language: none
*akka-file-mailbox:*
.. literalinclude:: ../../akka-durable-mailboxes/akka-file-mailbox/src/main/resources/akka-file-mailbox-reference.conf
:language: none
*akka-mongo-mailbox:*
.. literalinclude:: ../../akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/akka-mongo-mailbox-reference.conf
:language: none
*akka-redis-mailbox:*
.. literalinclude:: ../../akka-durable-mailboxes/akka-redis-mailbox/src/main/resources/akka-redis-mailbox-reference.conf
:language: none
*akka-zookeeper-mailbox:*
.. literalinclude:: ../../akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/akka-zookeeper-mailbox-reference.conf
:language: none
A custom ``akka.conf`` might look like this::
# In this file you can override any option defined in the 'akka-reference.conf' file.
# Copy in all or parts of the 'akka-reference.conf' file and modify as you please.
# In this file you can override any option defined in the reference files.
# Copy in parts of the reference files and modify as you please.
akka {
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
loglevel = DEBUG # Options: ERROR, WARNING, INFO, DEBUG
# this level is used by the configured loggers (see "event-handlers") as soon
# as they have been started; before that, see "stdout-loglevel"
stdout-loglevel = DEBUG # Loglevel for the very basic logger activated during AkkaApplication startup
# Comma separated list of the enabled modules.
enabled-modules = ["camel", "remote"]
@ -56,7 +109,9 @@ A custom ``akka.conf`` might look like this::
"sample.myservice.Boot"]
actor {
throughput = 10 # Throughput for Dispatcher, set to 1 for complete fairness
default-dispatcher {
throughput = 10 # Throughput for default Dispatcher, set to 1 for complete fairness
}
}
remote {
@ -68,6 +123,12 @@ A custom ``akka.conf`` might look like this::
.. _-Dakka.mode:
Config file format
------------------
The configuration file syntax is described in the `HOCON <https://github.com/havocp/config/blob/master/HOCON.md>`_
specification. Note that it supports three formats; conf, json, and properties.
Specifying files for different modes
------------------------------------

View file

@ -6,6 +6,7 @@
# Make your edits/overrides in your akka.conf.
akka {
actor {
mailbox {
beanstalk {
hostname = "127.0.0.1"
@ -16,5 +17,6 @@ akka {
message-time-to-live = 120s
}
}
}
}

View file

@ -21,12 +21,9 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess
*/
class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
val hostname = system.settings.config.getString("akka.actor.mailbox.beanstalk.hostname")
val port = system.settings.config.getInt("akka.actor.mailbox.beanstalk.port")
val reconnectWindow = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS).toSeconds.toInt
val messageSubmitDelay = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS).toSeconds.toInt
val messageSubmitTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS).toSeconds.toInt
val messageTimeToLive = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS).toSeconds.toInt
private val settings = BeanstalkBasedMailboxExtension(owner.system).settings
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt
val log = Logging(system, "BeanstalkBasedMailbox")
@ -36,7 +33,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in beanstalk-based mailbox [%s]".format(envelope))
Some(queue.get.put(65536, messageSubmitDelay, messageTimeToLive, serialize(envelope)).toInt)
Some(queue.get.put(65536, messageSubmitDelaySeconds, messageTimeToLiveSeconds, serialize(envelope)).toInt)
}
def dequeue(): Envelope = try {
@ -87,15 +84,16 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
while (!connected) {
attempts += 1
try {
client = new ClientImpl(hostname, port)
client = new ClientImpl(settings.Hostname, settings.Port)
client.useTube(name)
client.watch(name)
connected = true
} catch {
case e: Exception
log.error(e, "Unable to connect to Beanstalk. Retrying in [%s] seconds: %s".format(reconnectWindow, e))
log.error(e, "Unable to connect to Beanstalk. Retrying in [%s] seconds: %s".
format(settings.ReconnectWindow.toSeconds, e))
try {
Thread.sleep(1000 * reconnectWindow)
Thread.sleep(settings.ReconnectWindow.toMillis)
} catch {
case e: InterruptedException {}
}

View file

@ -0,0 +1,58 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
object BeanstalkBasedMailboxExtensionKey extends ExtensionKey[BeanstalkBasedMailboxExtension]
object BeanstalkBasedMailboxExtension {
def apply(system: ActorSystem): BeanstalkBasedMailboxExtension = {
if (!system.hasExtension(BeanstalkBasedMailboxExtensionKey)) {
system.registerExtension(new BeanstalkBasedMailboxExtension)
}
system.extension(BeanstalkBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-beanstalk-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-beanstalk-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val Hostname = getString("akka.actor.mailbox.beanstalk.hostname")
val Port = getInt("akka.actor.mailbox.beanstalk.port")
val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS)
val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS)
val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS)
val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS)
}
}
class BeanstalkBasedMailboxExtension extends Extension[BeanstalkBasedMailboxExtension] {
import BeanstalkBasedMailboxExtension._
@volatile
private var _settings: Settings = _
def key = BeanstalkBasedMailboxExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
}

View file

@ -6,6 +6,7 @@
# Make your edits/overrides in your akka.conf.
akka {
actor {
mailbox {
file-based {
directory-path = "./_mb"
@ -13,7 +14,7 @@ akka {
max-size = 2147483647 bytes
max-items = 2147483647
max-item-size = 2147483647 bytes
max-age = 0
max-age = 0s
max-journal-size = 16 megabytes
max-memory-size = 128 megabytes
max-journal-overflow = 10
@ -24,3 +25,4 @@ akka {
}
}
}
}

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
object FileBasedMailboxExtensionKey extends ExtensionKey[FileBasedMailboxExtension]
object FileBasedMailboxExtension {
def apply(system: ActorSystem): FileBasedMailboxExtension = {
if (!system.hasExtension(FileBasedMailboxExtensionKey)) {
system.registerExtension(new FileBasedMailboxExtension)
}
system.extension(FileBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-file-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-file-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val QueuePath = getString("akka.actor.mailbox.file-based.directory-path")
val MaxItems = getInt("akka.actor.mailbox.file-based.max-items")
val MaxSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size")
val MaxItemSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size")
val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS)
val MaxJournalSize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size")
val MaxMemorySize = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size")
val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow")
val MaxJournalSizeAbsolute = getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size-absolute")
val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full")
val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal")
val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal")
}
}
class FileBasedMailboxExtension extends Extension[FileBasedMailboxExtension] {
import FileBasedMailboxExtension._
@volatile
private var _settings: Settings = _
def key = FileBasedMailboxExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
}

View file

@ -9,23 +9,17 @@ import akka.actor.ActorCell
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
import com.typesafe.config.Config
object FileBasedMailbox {
def queuePath(config: Config): String = {
config.getString("akka.actor.mailbox.file-based.directory-path") // /var/spool/akka
}
}
class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
val log = Logging(system, "FileBasedMailbox")
val queuePath = FileBasedMailbox.queuePath(owner.system.settings.config)
private val settings = FileBasedMailboxExtension(owner.system).settings
val queuePath = settings.QueuePath
private val queue = try {
try { FileUtils.forceMkdir(new java.io.File(queuePath)) } catch { case e {} }
val queue = new filequeue.PersistentQueue(queuePath, name, owner.system.settings.config, log)
val queue = new filequeue.PersistentQueue(queuePath, name, settings, log)
queue.setup // replays journal
queue.discardExpired
queue

View file

@ -20,9 +20,9 @@ package akka.actor.mailbox.filequeue
import java.io._
import scala.collection.mutable
import akka.event.LoggingAdapter
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit
import akka.actor.mailbox.FileBasedMailboxExtension
// a config value that's backed by a global setting but may be locally overridden
class OverlaySetting[T](base: T) {
@ -34,7 +34,7 @@ class OverlaySetting[T](base: ⇒ T) {
def apply() = local.getOrElse(base)
}
class PersistentQueue(persistencePath: String, val name: String, val config: Config, log: LoggingAdapter) {
class PersistentQueue(persistencePath: String, val name: String, val settings: FileBasedMailboxExtension.Settings, log: LoggingAdapter) {
private case object ItemArrived
@ -125,22 +125,20 @@ class PersistentQueue(persistencePath: String, val name: String, val config: Con
def memoryBytes: Long = synchronized { _memoryBytes }
def inReadBehind = synchronized { journal.inReadBehind }
//FIXME, segment commented out, might have damaged semantics, investigate.
//config.subscribe { c => configure(c.getOrElse(new Config)) }
configure(config)
configure(settings)
def configure(config: Config) = synchronized {
maxItems set Some(config.getInt("akka.actor.mailbox.file-based.max-items"))
maxSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size"))
maxItemSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size"))
maxAge set Some(Duration(config.getMilliseconds("akka.actor.mailbox.file-based.max-age"), TimeUnit.MILLISECONDS).toSeconds.toInt)
maxJournalSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size"))
maxMemorySize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size"))
maxJournalOverflow set Some(config.getInt("akka.actor.mailbox.file-based.max-journal-overflow"))
maxJournalSizeAbsolute set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size-absolute"))
discardOldWhenFull set Some(config.getBoolean("akka.actor.mailbox.file-based.discard-old-when-full"))
keepJournal set Some(config.getBoolean("akka.actor.mailbox.file-based.keep-journal"))
syncJournal set Some(config.getBoolean("akka.actor.mailbox.file-based.sync-journal"))
def configure(settings: FileBasedMailboxExtension.Settings) = synchronized {
maxItems set Some(settings.MaxItems)
maxSize set Some(settings.MaxSize)
maxItemSize set Some(settings.MaxItemSize)
maxAge set Some(settings.MaxAge.toSeconds.toInt)
maxJournalSize set Some(settings.MaxJournalSize)
maxMemorySize set Some(settings.MaxMemorySize)
maxJournalOverflow set Some(settings.MaxJournalOverflow)
maxJournalSizeAbsolute set Some(settings.MaxJournalSizeAbsolute)
discardOldWhenFull set Some(settings.DiscardOldWhenFull)
keepJournal set Some(settings.KeepJournal)
syncJournal set Some(settings.SyncJournal)
log.info("Configuring queue %s: journal=%s, max-items=%s, max-size=%s, max-age=%s, max-journal-size=%s, max-memory-size=%s, max-journal-overflow=%s, max-journal-size-absolute=%s, discard-old-when-full=%s, sync-journal=%s"
.format(
name, keepJournal(), maxItems(), maxSize(), maxAge(), maxJournalSize(), maxMemorySize(),

View file

@ -21,11 +21,11 @@ import java.io.File
import java.util.concurrent.CountDownLatch
import scala.collection.mutable
import akka.event.LoggingAdapter
import com.typesafe.config.Config
import akka.actor.mailbox.FileBasedMailboxExtension
class InaccessibleQueuePath extends Exception("Inaccessible queue path: Must be a directory and writable")
class QueueCollection(queueFolder: String, private var queueConfigs: Config, log: LoggingAdapter) {
class QueueCollection(queueFolder: String, settings: FileBasedMailboxExtension.Settings, log: LoggingAdapter) {
private val path = new File(queueFolder)
if (!path.isDirectory) {
@ -46,13 +46,6 @@ class QueueCollection(queueFolder: String, private var queueConfigs: Config, log
val queueHits = new Counter()
val queueMisses = new Counter()
/* FIXME, segment commented out, might have damaged semantics, investigate.
queueConfigs.subscribe { c =>
synchronized {
queueConfigs = c.getOrElse(new Config)
}
}*/
// preload any queues
def loadQueues() {
path.list() filter { name !(name contains "~~") } map { queue(_) }
@ -79,9 +72,9 @@ class QueueCollection(queueFolder: String, private var queueConfigs: Config, log
val master = name.split('+')(0)
fanout_queues.getOrElseUpdate(master, new mutable.HashSet[String]) += name
log.debug("Fanout queue {} added to {}", name, master)
new PersistentQueue(path.getPath, name, queueConfigs, log)
new PersistentQueue(path.getPath, name, settings, log)
} else {
new PersistentQueue(path.getPath, name, queueConfigs, log)
new PersistentQueue(path.getPath, name, settings, log)
}
q.setup
queues(name) = q

View file

@ -6,7 +6,7 @@ import org.apache.commons.io.FileUtils
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) {
def clean {
val queuePath = FileBasedMailbox.queuePath(system.settings.config)
val queuePath = FileBasedMailboxExtension(system).settings.QueuePath
FileUtils.deleteDirectory(new java.io.File(queuePath))
}

View file

@ -6,6 +6,7 @@
# Make your edits/overrides in your akka.conf.
akka {
actor {
mailbox {
mongodb {
# Any specified collection name will be used as a prefix for collections that use durable mongo mailboxes
@ -19,3 +20,4 @@ akka {
}
}
}
}

View file

@ -12,8 +12,6 @@ import akka.dispatch.Envelope
import akka.event.Logging
import akka.dispatch.DefaultPromise
import akka.actor.ActorRef
import akka.util.Duration
import java.util.concurrent.TimeUnit
class MongoBasedMailboxException(message: String) extends AkkaException(message)
@ -33,13 +31,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
implicit val mailboxBSONSer = new BSONSerializableMailbox(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
def config = system.settings.config
val URI_CONFIG_KEY = "akka.actor.mailbox.mongodb.uri"
val WRITE_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.write"
val READ_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.read"
val mongoURI = if (config.hasPath(URI_CONFIG_KEY)) Some(config.getString(URI_CONFIG_KEY)) else None
val writeTimeout = Duration(config.getMilliseconds(WRITE_TIMEOUT_KEY), TimeUnit.MILLISECONDS)
val readTimeout = Duration(config.getInt(READ_TIMEOUT_KEY), TimeUnit.MILLISECONDS)
private val settings = MongoBasedMailboxExtension(owner.system).settings
val log = Logging(system, "MongoBasedMailbox")
@ -51,7 +43,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
/* TODO - Test if a BSON serializer is registered for the message and only if not, use toByteString? */
val durableMessage = MongoDurableMessage(ownerPathString, envelope.message, envelope.sender)
// todo - do we need to filter the actor name at all for safe collection naming?
val result = new DefaultPromise[Boolean](writeTimeout)(dispatcher)
val result = new DefaultPromise[Boolean](settings.WriteTimeout)(dispatcher)
mongo.insert(durableMessage, false)(RequestFutures.write { wr: Either[Throwable, (Option[AnyRef], WriteResult)]
wr match {
case Right((oid, wr)) result.completeWithResult(true)
@ -70,7 +62,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
* TODO - Should we have a specific query in place? Which way do we sort?
* TODO - Error handling version!
*/
val envelopePromise = new DefaultPromise[Envelope](readTimeout)(dispatcher)
val envelopePromise = new DefaultPromise[Envelope](settings.ReadTimeout)(dispatcher)
mongo.findAndRemove(Document.empty) { doc: Option[MongoDurableMessage]
doc match {
case Some(msg) {
@ -90,7 +82,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
}
def numberOfMessages: Int = {
val count = new DefaultPromise[Int](readTimeout)(dispatcher)
val count = new DefaultPromise[Int](settings.ReadTimeout)(dispatcher)
mongo.count()(count.completeWithResult)
count.as[Int].getOrElse(-1)
}
@ -99,9 +91,9 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
def hasMessages: Boolean = numberOfMessages > 0
private[akka] def connect() = {
require(mongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(URI_CONFIG_KEY))
log.info("CONNECTING mongodb uri : [{}]", mongoURI)
val _dbh = MongoConnection.fromURI(mongoURI.get) match {
require(settings.MongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(settings.UriConfigKey))
log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI)
val _dbh = MongoConnection.fromURI(settings.MongoURI.get) match {
case (conn, None, None) {
throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'")
}

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
object MongoBasedMailboxExtensionKey extends ExtensionKey[MongoBasedMailboxExtension]
object MongoBasedMailboxExtension {
def apply(system: ActorSystem): MongoBasedMailboxExtension = {
if (!system.hasExtension(MongoBasedMailboxExtensionKey)) {
system.registerExtension(new MongoBasedMailboxExtension)
}
system.extension(MongoBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-mongo-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-mongo-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val UriConfigKey = "akka.actor.mailbox.mongodb.uri"
val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None
val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS)
}
}
class MongoBasedMailboxExtension extends Extension[MongoBasedMailboxExtension] {
import MongoBasedMailboxExtension._
@volatile
private var _settings: Settings = _
def key = MongoBasedMailboxExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
}

View file

@ -6,6 +6,7 @@
# Make your edits/overrides in your akka.conf.
akka {
actor {
mailbox {
redis {
hostname = "127.0.0.1"
@ -13,3 +14,4 @@ akka {
}
}
}
}

View file

@ -17,6 +17,9 @@ class RedisBasedMailboxException(message: String) extends AkkaException(message)
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system).settings
@volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
@ -57,9 +60,7 @@ class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with
def hasMessages: Boolean = numberOfMessages > 0 //TODO review find other solution, this will be very expensive
private[akka] def connect() = {
new RedisClientPool(
system.settings.config.getString("akka.actor.mailbox.redis.hostname"),
system.settings.config.getInt("akka.actor.mailbox.redis.port"))
new RedisClientPool(settings.Hostname, settings.Port)
}
private def withErrorHandling[T](body: T): T = {

View file

@ -0,0 +1,52 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
object RedisBasedMailboxExtensionKey extends ExtensionKey[RedisBasedMailboxExtension]
object RedisBasedMailboxExtension {
def apply(system: ActorSystem): RedisBasedMailboxExtension = {
if (!system.hasExtension(RedisBasedMailboxExtensionKey)) {
system.registerExtension(new RedisBasedMailboxExtension)
}
system.extension(RedisBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-redis-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-redis-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val Hostname = getString("akka.actor.mailbox.redis.hostname")
val Port = getInt("akka.actor.mailbox.redis.port")
}
}
class RedisBasedMailboxExtension extends Extension[RedisBasedMailboxExtension] {
import RedisBasedMailboxExtension._
@volatile
private var _settings: Settings = _
def key = RedisBasedMailboxExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
}

View file

@ -6,6 +6,7 @@
# Make your edits/overrides in your akka.conf.
akka {
actor {
mailbox {
zookeeper {
server-addresses = "127.0.0.1:2181"
@ -15,3 +16,4 @@ akka {
}
}
}
}

View file

@ -22,18 +22,17 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess
*/
class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
val zkServerAddresses = system.settings.config.getString("akka.actor.mailbox.zookeeper.server-addresses")
val sessionTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS)
val connectionTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS)
val blockingQueue = system.settings.config.getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
private val settings = ZooKeeperBasedMailboxExtension(owner.system).settings
val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s"
val log = Logging(system, "ZooKeeperBasedMailbox")
private val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout)
private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), blockingQueue)
private val zkClient = new AkkaZkClient(
settings.ZkServerAddresses,
settings.SessionTimeout,
settings.ConnectionTimeout)
private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), settings.BlockingQueue)
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in zookeeper-based mailbox [%s]".format(envelope))

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
object ZooKeeperBasedMailboxExtensionKey extends ExtensionKey[ZooKeeperBasedMailboxExtension]
object ZooKeeperBasedMailboxExtension {
def apply(system: ActorSystem): ZooKeeperBasedMailboxExtension = {
if (!system.hasExtension(ZooKeeperBasedMailboxExtensionKey)) {
system.registerExtension(new ZooKeeperBasedMailboxExtension)
}
system.extension(ZooKeeperBasedMailboxExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-zookeeper-mailbox-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-zookeeper-mailbox").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses")
val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS)
val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS)
val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
}
}
class ZooKeeperBasedMailboxExtension extends Extension[ZooKeeperBasedMailboxExtension] {
import ZooKeeperBasedMailboxExtension._
@volatile
private var _settings: Settings = _
def key = ZooKeeperBasedMailboxExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
}

View file

@ -2785,7 +2785,7 @@ public final class RemoteProtocol {
}
// required uint32 port = 2;
public static final int PORT_FIELD_NUMBER = 2;
public static final int Port_FIELD_NUMBER = 2;
private int port_;
public boolean hasPort() {
return ((bitField0_ & 0x00000002) == 0x00000002);
@ -4602,7 +4602,7 @@ public final class RemoteProtocol {
}
// required uint32 port = 2;
public static final int PORT_FIELD_NUMBER = 2;
public static final int Port_FIELD_NUMBER = 2;
private int port_;
public boolean hasPort() {
return ((bitField0_ & 0x00000002) == 0x00000002);

View file

@ -16,7 +16,7 @@ akka {
secure-cookie = "" # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
# or using 'akka.util.Crypt.generateSecureCookie'
remote-daemon-ack-timeout = 30 # Timeout for ACK of cluster operations, lik checking actor out etc.
remote-daemon-ack-timeout = 30s # Timeout for ACK of cluster operations, lik checking actor out etc.
use-passive-connections = on # Reuse inbound connections for outbound messages
@ -29,9 +29,10 @@ akka {
}
server {
hostname = "" # The hostname or ip to bind the remoting to, InetAddress.getLocalHost.getHostAddress is used if empty
port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA)
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads
connection-timeout = 120 # Length in time-unit
connection-timeout = 120s # Timeout duration
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
backlog = 4096 # Sets the size of the connection backlog
@ -43,11 +44,32 @@ akka {
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
}
reconnect-delay = 5
read-timeout = 3600
reconnect-delay = 5s
read-timeout = 3600s
message-frame-size = 1048576
reap-futures-delay = 5
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
reconnection-time-window = 600s # Maximum time window that a client should try to reconnect for
}
}
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
cluster {
name = "test-cluster"
nodename = ""
zookeeper-server-addresses = "localhost:2181" # comma-separated list of '<hostname>:<port>' elements
max-time-to-wait-until-connected = 30s
session-timeout = 60s
connection-timeout = 60s
include-ref-node-in-replica-set = on # Can a replica be instantiated on the same node as the cluster reference to the actor
# Default: on
log-directory = "_akka_cluster" # Where ZooKeeper should store the logs and data files
replication {
digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)
password = "secret" # FIXME: store open in file?
ensemble-size = 3
quorum-size = 2
snapshot-frequency = 1000 # The number of messages that should be logged between every actor snapshot
timeout = 30s # Timeout for asyncronous (write-behind) operations
}
}

View file

@ -27,8 +27,8 @@ class AccrualFailureDetector(val threshold: Int = 8, val maxSampleSize: Int = 10
def this(system: ActorSystem) {
this(
system.settings.config.getInt("akka.remote.failure-detector.threshold"),
system.settings.config.getInt("akka.remote.failure-detector.max-sample-size"))
RemoteExtension(system).settings.FailureDetectorThreshold,
RemoteExtension(system).settings.FailureDetectorMaxSampleSize)
}
private final val PhiFactor = 1.0 / math.log(10.0)

View file

@ -10,15 +10,13 @@ import akka.event.Logging
import akka.util.duration._
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.util.concurrent.atomic.AtomicReference
import java.security.SecureRandom
import System.{ currentTimeMillis newTimestamp }
import scala.collection.immutable.Map
import scala.annotation.tailrec
import com.google.protobuf.ByteString
import akka.serialization.SerializationExtension
/**
* Interface for node membership change listener.
@ -102,12 +100,14 @@ class Gossiper(remote: Remote) {
nodeMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])
private val system = remote.system
private val remoteExtension = RemoteExtension(system)
private val serializationExtension = SerializationExtension(system)
private val log = Logging(system, "Gossiper")
private val failureDetector = remote.failureDetector
private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef])
private val seeds = Set(address) // FIXME read in list of seeds from config
private val address = system.rootPath.remoteAddress
private val address = system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
private val nodeFingerprint = address.##
private val random = SecureRandom.getInstance("SHA1PRNG")
@ -237,7 +237,7 @@ class Gossiper(remote: Remote) {
throw new IllegalStateException("Connection for [" + peer + "] is not set up"))
try {
(connection ? (toRemoteMessage(newGossip), remote.remoteSystemDaemonAckTimeout)).as[Status] match {
(connection ? (toRemoteMessage(newGossip), remoteExtension.settings.RemoteSystemDaemonAckTimeout)).as[Status] match {
case Some(Success(receiver))
log.debug("Gossip sent to [{}] was successfully received", receiver)
@ -299,7 +299,7 @@ class Gossiper(remote: Remote) {
}
private def toRemoteMessage(gossip: Gossip): RemoteProtocol.RemoteSystemDaemonMessageProtocol = {
val gossipAsBytes = system.serialization.serialize(gossip) match {
val gossipAsBytes = serializationExtension.serialization.serialize(gossip) match {
case Left(error) throw error
case Right(bytes) bytes
}

View file

@ -8,18 +8,19 @@ import akka.remote.RemoteProtocol._
import akka.serialization.Serialization
import com.google.protobuf.ByteString
import akka.actor.ActorSystem
import akka.serialization.SerializationExtension
object MessageSerializer {
def deserialize(system: ActorSystem, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
val clazz = loadManifest(classLoader, messageProtocol)
system.serialization.deserialize(messageProtocol.getMessage.toByteArray,
SerializationExtension(system).serialization.deserialize(messageProtocol.getMessage.toByteArray,
clazz, classLoader).fold(x throw x, identity)
}
def serialize(system: ActorSystem, message: AnyRef): MessageProtocol = {
val builder = MessageProtocol.newBuilder
val bytes = system.serialization.serialize(message).fold(x throw x, identity)
val bytes = SerializationExtension(system).serialization.serialize(message).fold(x throw x, identity)
builder.setMessage(ByteString.copyFrom(bytes))
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.build

View file

@ -21,6 +21,7 @@ import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compressi
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.serialization.SerializationExtension
/**
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
@ -34,9 +35,11 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
import system._
import settings._
// TODO move to settings?
val shouldCompressData = config.getBoolean("akka.remote.use-compression")
val remoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
private[remote] val remoteExtension = RemoteExtension(system)
private[remote] val serializationExtension = SerializationExtension(system)
private[remote] val remoteAddress = {
RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
}
val failureDetector = new AccrualFailureDetector(system)
@ -81,7 +84,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
}
def start(): Unit = {
val serverAddress = server.system.rootPath.remoteAddress //Force init of server
val serverAddress = server.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress //Force init of server
val daemonAddress = remoteDaemon.address //Force init of daemon
log.info("Starting remote server on [{}] and starting remoteDaemon with address [{}]", serverAddress, daemonAddress)
}
@ -131,10 +134,10 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
if (message.hasActorPath) {
val actorFactoryBytes =
if (shouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray
if (remoteExtension.settings.ShouldCompressData) LZF.uncompress(message.getPayload.toByteArray) else message.getPayload.toByteArray
val actorFactory =
system.serialization.deserialize(actorFactoryBytes, classOf[() Actor], None) match {
serializationExtension.serialization.deserialize(actorFactoryBytes, classOf[() Actor], None) match {
case Left(error) throw error
case Right(instance) instance.asInstanceOf[() Actor]
}
@ -152,12 +155,13 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
log.error("Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [{}]", message)
}
sender ! Success(systemImpl.address)
sender ! Success(remoteAddress)
} catch {
case error: Throwable //FIXME doesn't seem sensible
sender ! Failure(error)
throw error
}
}
// FIXME implement handleRelease
@ -230,7 +234,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}
private def payloadFor[T](message: RemoteSystemDaemonMessageProtocol, clazz: Class[T]): T = {
system.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
serializationExtension.serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) throw error
case Right(instance) instance.asInstanceOf[T]
}

View file

@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.event.EventStream
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
import java.net.InetAddress
import akka.serialization.SerializationExtension
/**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
@ -29,33 +31,43 @@ import akka.dispatch.Promise
*/
class RemoteActorRefProvider(
val settings: ActorSystem.Settings,
val rootPath: ActorPath,
val eventStream: EventStream,
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
val log = Logging(eventStream, "RemoteActorRefProvider")
val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler)
def deathWatch = local.deathWatch
def guardian = local.guardian
def systemGuardian = local.systemGuardian
def nodename = local.nodename
def clustername = local.clustername
def tempName = local.tempName
@volatile
var remote: Remote = _
private val actors = new ConcurrentHashMap[String, AnyRef]
/*
* The problem is that ActorRefs need a reference to the ActorSystem to
* provide their service. Hence they cannot be created while the
* constructors of ActorSystem and ActorRefProvider are still running.
* The solution is to split out that last part into an init() method,
* but it also requires these references to be @volatile and lazy.
*/
@volatile
private var remoteDaemonConnectionManager: RemoteConnectionManager = _
private var system: ActorSystemImpl = _
private lazy val remoteExtension = RemoteExtension(system)
private lazy val serializationExtension = SerializationExtension(system)
lazy val rootPath: ActorPath = {
val remoteAddress = RemoteAddress(remoteExtension.settings.serverSettings.Hostname, remoteExtension.settings.serverSettings.Port)
new RootActorPath(remoteAddress)
}
private lazy val local = new LocalActorRefProvider(settings, eventStream, scheduler, rootPath,
remoteExtension.settings.NodeName, remoteExtension.settings.ClusterName)
private[akka] lazy val remote = new Remote(system, nodename)
private lazy val remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
def init(system: ActorSystemImpl) {
local.init(system)
remote = new Remote(system, nodename)
remoteDaemonConnectionManager = new RemoteConnectionManager(system, remote)
def init(_system: ActorSystemImpl) {
system = _system
local.init(_system)
terminationFuture.onComplete(_ remote.server.shutdown())
}
@ -64,7 +76,7 @@ class RemoteActorRefProvider(
private[akka] def deployer: Deployer = local.deployer
def defaultDispatcher = dispatcher
def dispatcher = local.dispatcher
def defaultTimeout = settings.ActorTimeout
private[akka] def actorOf(system: ActorSystemImpl, props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
@ -74,7 +86,7 @@ class RemoteActorRefProvider(
if (systemService) local.actorOf(system, props, supervisor, path, systemService)
else {
val name = path.name
val newFuture = Promise[ActorRef](5000)(defaultDispatcher) // FIXME is this proper timeout?
val newFuture = Promise[ActorRef](5000)(dispatcher) // FIXME is this proper timeout?
actors.putIfAbsent(path.toString, newFuture) match { // we won the race -- create the actor and resolve the future
case null
@ -90,7 +102,7 @@ class RemoteActorRefProvider(
// case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
// }
def isReplicaNode: Boolean = remoteAddresses exists { _ == system.address }
def isReplicaNode: Boolean = remoteAddresses exists { _ == rootPath.remoteAddress }
//system.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(system.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
@ -123,7 +135,7 @@ class RemoteActorRefProvider(
if (remoteAddresses.size < 1) throw new ConfigurationException(
"Actor [%s] configured with ScatterGather router must have at least 1 remote node configured. Found [%s]"
.format(name, remoteAddresses.mkString(", ")))
() new ScatterGatherFirstCompletedRouter()(defaultDispatcher, defaultTimeout)
() new ScatterGatherFirstCompletedRouter()(dispatcher, defaultTimeout)
case RouterType.LeastCPU sys.error("Router LeastCPU not supported yet")
case RouterType.LeastRAM sys.error("Router LeastRAM not supported yet")
@ -207,9 +219,9 @@ class RemoteActorRefProvider(
log.debug("[{}] Instantiating Actor [{}] on node [{}]", rootPath, actorPath, remoteAddress)
val actorFactoryBytes =
system.serialization.serialize(actorFactory) match {
serializationExtension.serialization.serialize(actorFactory) match {
case Left(error) throw error
case Right(bytes) if (remote.shouldCompressData) LZF.compress(bytes) else bytes
case Right(bytes) if (remoteExtension.settings.ShouldCompressData) LZF.compress(bytes) else bytes
}
val command = RemoteSystemDaemonMessageProtocol.newBuilder
@ -229,7 +241,7 @@ class RemoteActorRefProvider(
private def sendCommandToRemoteNode(connection: ActorRef, command: RemoteSystemDaemonMessageProtocol, withACK: Boolean) {
if (withACK) {
try {
val f = connection ? (command, remote.remoteSystemDaemonAckTimeout)
val f = connection ? (command, remoteExtension.settings.RemoteSystemDaemonAckTimeout)
(try f.await.value catch { case _: FutureTimeoutException None }) match {
case Some(Right(receiver))
log.debug("Remote system command sent to [{}] successfully received", receiver)

View file

@ -1,46 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.util.Duration
import akka.config.ConfigurationException
import java.util.concurrent.TimeUnit.MILLISECONDS
import com.typesafe.config.Config
class RemoteClientSettings(config: Config) {
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val RECONNECTION_TIME_WINDOW = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS).toMillis
val READ_TIMEOUT = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
val RECONNECT_DELAY = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size")
}
class RemoteServerSettings(config: Config) {
import scala.collection.JavaConverters._
val isRemotingEnabled = config.getStringList("akka.enabled-modules").asScala.exists(_ == "cluster") //TODO FIXME Shouldn't this be "remote"?
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size")
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val REQUIRE_COOKIE = {
val requireCookie = config.getBoolean("akka.remote.server.require-cookie")
if (isRemotingEnabled && requireCookie && SECURE_COOKIE.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie
}
val USE_PASSIVE_CONNECTIONS = config.getBoolean("akka.remote.use-passive-connections")
val UNTRUSTED_MODE = config.getBoolean("akka.remote.server.untrusted-mode")
val PORT = config.getInt("akka.remote.server.port")
val CONNECTION_TIMEOUT = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS)
val BACKLOG = config.getInt("akka.remote.server.backlog")
}

View file

@ -0,0 +1,110 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.net.InetAddress
import akka.config.ConfigurationException
import com.eaio.uuid.UUID
object RemoteExtensionKey extends ExtensionKey[RemoteExtension]
object RemoteExtension {
def apply(system: ActorSystem): RemoteExtension = {
if (!system.hasExtension(RemoteExtensionKey)) {
system.registerExtension(new RemoteExtension)
}
system.extension(RemoteExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-remote-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-remote").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val RemoteTransport = getString("akka.remote.layer")
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
val ShouldCompressData = config.getBoolean("akka.remote.use-compression")
val RemoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name")
val NodeName: String = config.getString("akka.cluster.nodename") match {
case "" new UUID().toString
case value value
}
val serverSettings = new RemoteServerSettings
val clientSettings = new RemoteClientSettings
class RemoteClientSettings {
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val ReconnectionTimeWindow = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
val ReconnectDelay = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
val MessageFrameSize = config.getInt("akka.remote.client.message-frame-size")
}
class RemoteServerSettings {
import scala.collection.JavaConverters._
val MessageFrameSize = config.getInt("akka.remote.server.message-frame-size")
val SecureCookie: Option[String] = config.getString("akka.remote.secure-cookie") match {
case "" None
case cookie Some(cookie)
}
val RequireCookie = {
val requireCookie = config.getBoolean("akka.remote.server.require-cookie")
if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie
}
val UsePassiveConnections = config.getBoolean("akka.remote.use-passive-connections")
val UntrustedMode = config.getBoolean("akka.remote.server.untrusted-mode")
val Hostname = config.getString("akka.remote.server.hostname") match {
case "" InetAddress.getLocalHost.getHostAddress
case value value
}
val Port = config.getInt("akka.remote.server.port")
val ConnectionTimeout = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS)
val Backlog = config.getInt("akka.remote.server.backlog")
}
}
}
class RemoteExtension extends Extension[RemoteExtension] {
import RemoteExtension._
@volatile
private var _settings: Settings = _
def key = RemoteExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
}

View file

@ -25,6 +25,7 @@ import akka.actor.ActorSystem
import akka.event.Logging
import locks.ReentrantReadWriteLock
import org.jboss.netty.channel._
import akka.actor.ActorSystemImpl
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
@ -140,6 +141,8 @@ class ActiveRemoteClient private[akka] (
def currentChannel = connection.getChannel
private val senderRemoteAddress = remoteSupport.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
/**
* Connect to remote server.
*/
@ -147,9 +150,11 @@ class ActiveRemoteClient private[akka] (
def sendSecureCookie(connection: ChannelFuture) {
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SECURE_COOKIE.nonEmpty) handshake.setCookie(SECURE_COOKIE.get)
val addr = remoteSupport.system.rootPath.remoteAddress
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder.setHostname(addr.hostname).setPort(addr.port).build)
if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get)
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setHostname(senderRemoteAddress.hostname)
.setPort(senderRemoteAddress.port)
.build)
connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build))
}
@ -230,7 +235,7 @@ class ActiveRemoteClient private[akka] (
reconnectionTimeWindowStart = System.currentTimeMillis
true
} else {
val timeLeft = (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
val timeLeft = (ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
if (timeLeft)
log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft)
@ -254,8 +259,8 @@ class ActiveRemoteClientPipelineFactory(
import client.remoteSupport.clientSettings._
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, READ_TIMEOUT.length, READ_TIMEOUT.unit)
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val timeout = new ReadTimeoutHandler(timer, ReadTimeout.length, ReadTimeout.unit)
val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
@ -311,7 +316,7 @@ class ActiveRemoteClientHandler(
client.connect(reconnectIfAlreadyConnected = true)
}
}
}, client.remoteSupport.clientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
}, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS)
} else runOnceNow {
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
}
@ -353,8 +358,8 @@ class ActiveRemoteClientHandler(
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
val log = Logging(system, "NettyRemoteSupport")
val serverSettings = new RemoteServerSettings(system.settings.config)
val clientSettings = new RemoteClientSettings(system.settings.config)
val serverSettings = RemoteExtension(system).settings.serverSettings
val clientSettings = RemoteExtension(system).settings.clientSettings
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock
@ -449,7 +454,7 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
def name = currentServer.get match {
case Some(server) server.name
case None "Non-running NettyRemoteServer@" + system.rootPath.remoteAddress
case None "Non-running NettyRemoteServer@" + system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
}
private val _isRunning = new Switch(false)
@ -484,7 +489,7 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
val log = Logging(remoteSupport.system, "NettyRemoteServer")
import remoteSupport.serverSettings._
val address = remoteSupport.system.rootPath.remoteAddress
val address = remoteSupport.system.asInstanceOf[ActorSystemImpl].provider.rootPath.remoteAddress
val name = "NettyRemoteServer@" + address
@ -497,11 +502,11 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteSupport)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("backlog", BACKLOG)
bootstrap.setOption("backlog", Backlog)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT.toMillis)
bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(address.hostname, address.port)))
remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport))
@ -510,8 +515,8 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
if (SECURE_COOKIE.nonEmpty)
b.setCookie(SECURE_COOKIE.get)
if (SecureCookie.nonEmpty)
b.setCookie(SecureCookie.get)
b.build
}
openChannels.write(remoteSupport.createControlEnvelope(shutdownSignal)).awaitUninterruptibly
@ -537,12 +542,12 @@ class RemoteServerPipelineFactory(
import remoteSupport.serverSettings._
def getPipeline: ChannelPipeline = {
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val authenticator = if (RequireCookie) new RemoteServerAuthenticationHandler(SecureCookie) :: Nil else Nil
val remoteServer = new RemoteServerHandler(name, openChannels, loader, remoteSupport)
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: authenticator ::: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
@ -619,7 +624,7 @@ class RemoteServerHandler(
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = getClientAddress(ctx.getChannel) match {
case s @ Some(address)
if (USE_PASSIVE_CONNECTIONS)
if (UsePassiveConnections)
remoteSupport.unbindClient(address)
remoteSupport.notifyListeners(RemoteServerClientClosed(remoteSupport, s))
case None
@ -629,12 +634,12 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try {
event.getMessage match {
case remote: AkkaRemoteProtocol if remote.hasMessage
remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader), UNTRUSTED_MODE)
remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader), UntrustedMode)
case remote: AkkaRemoteProtocol if remote.hasInstruction
val instruction = remote.getInstruction
instruction.getCommandType match {
case CommandType.CONNECT if USE_PASSIVE_CONNECTIONS
case CommandType.CONNECT if UsePassiveConnections
val origin = instruction.getOrigin
val inbound = RemoteAddress(origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.remote.hostname=localhost -Dakka.remote.port=9991
-Dakka.cluster.nodename=node1 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9991

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.remote.hostname=localhost -Dakka.remote.port=9992
-Dakka.cluster.nodename=node2 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9992

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.remote.hostname=localhost -Dakka.remote.port=9991
-Dakka.cluster.nodename=node1 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9991

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.remote.hostname=localhost -Dakka.remote.port=9992
-Dakka.cluster.nodename=node2 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9992

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.remote.hostname=localhost -Dakka.remote.port=9991
-Dakka.cluster.nodename=node1 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9991

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.remote.hostname=localhost -Dakka.remote.port=9992
-Dakka.cluster.nodename=node2 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9992

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.remote.hostname=localhost -Dakka.remote.port=9993
-Dakka.cluster.nodename=node3 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9993

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node4 -Dakka.remote.hostname=localhost -Dakka.remote.port=9994
-Dakka.cluster.nodename=node4 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9994

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.remote.hostname=localhost -Dakka.remote.port=9991
-Dakka.cluster.nodename=node1 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9991

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.remote.hostname=localhost -Dakka.remote.port=9992
-Dakka.cluster.nodename=node2 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9992

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.remote.hostname=localhost -Dakka.remote.port=9993
-Dakka.cluster.nodename=node3 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9993

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node4 -Dakka.remote.hostname=localhost -Dakka.remote.port=9994
-Dakka.cluster.nodename=node4 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9994

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node1 -Dakka.remote.hostname=localhost -Dakka.remote.port=9991
-Dakka.cluster.nodename=node1 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9991

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node2 -Dakka.remote.hostname=localhost -Dakka.remote.port=9992
-Dakka.cluster.nodename=node2 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9992

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node3 -Dakka.remote.hostname=localhost -Dakka.remote.port=9993
-Dakka.cluster.nodename=node3 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9993

View file

@ -1 +1 @@
-Dakka.cluster.nodename=node4 -Dakka.remote.hostname=localhost -Dakka.remote.port=9994
-Dakka.cluster.nodename=node4 -Dakka.remote.server.hostname=localhost -Dakka.remote.server.port=9994

View file

@ -1,17 +1,32 @@
package akka.actor
package akka.remote
import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ClusterSpec extends AkkaSpec {
class RemoteConfigSpec extends AkkaSpec {
"ClusterSpec: A Deployer" must {
"be able to parse 'akka.actor.cluster._' config elements" in {
// TODO: make it use its own special config?
val config = system.settings.config
val config = RemoteExtension(system).settings.config
import config._
//akka.remote.server
getInt("akka.remote.server.port") must equal(2552)
getInt("akka.remote.server.message-frame-size") must equal(1048576)
getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000)
getBoolean("akka.remote.server.require-cookie") must equal(false)
getBoolean("akka.remote.server.untrusted-mode") must equal(false)
getInt("akka.remote.server.backlog") must equal(4096)
//akka.remote.client
getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false)
getInt("akka.remote.client.buffering.capacity") must equal(-1)
getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000)
getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000)
getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000)
// TODO cluster config will go into akka-cluster-reference.conf when we enable that module
//akka.cluster
getString("akka.cluster.name") must equal("test-cluster")
getString("akka.cluster.zookeeper-server-addresses") must equal("localhost:2181")
@ -34,21 +49,6 @@ class ClusterSpec extends AkkaSpec {
getInt("akka.cluster.replication.snapshot-frequency") must equal(1000)
getMilliseconds("akka.cluster.replication.timeout") must equal(30 * 1000)
//akka.remote.server
getInt("akka.remote.server.port") must equal(2552)
getInt("akka.remote.server.message-frame-size") must equal(1048576)
getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000)
getBoolean("akka.remote.server.require-cookie") must equal(false)
getBoolean("akka.remote.server.untrusted-mode") must equal(false)
getInt("akka.remote.server.backlog") must equal(4096)
//akka.remote.client
getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false)
getInt("akka.remote.client.buffering.capacity") must equal(-1)
getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000)
getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000)
getMilliseconds("akka.remote.client.reap-futures-delay") must equal(5 * 1000)
getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000)
}
}
}

View file

@ -2,15 +2,15 @@ package akka.agent.test
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import akka.actor.Timeout
import akka.agent.Agent
import akka.stm._
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.CountDownLatch
import akka.testkit.AkkaSpec
import akka.testkit._
class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
val latch = new CountDownLatch(num)
@ -18,12 +18,11 @@ class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
def await(timeout: Duration) = latch.await(timeout.length, timeout.unit)
}
class AgentSpec extends WordSpec with MustMatchers {
class AgentSpec extends AkkaSpec {
implicit val system = ActorSystem("AgentSpec")
implicit val timeout = Timeout(5.seconds.dilated)
"Agent" should {
"Agent" must {
"update with send dispatches in order sent" in {
val countDown = new CountDownFunction[String]

View file

@ -7,8 +7,8 @@
akka {
test {
timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load
filter-leeway = 3 # time-units EventFilter.intercept waits after the block is finished until all required messages are received
single-expect-default = 3 # time-units to wait in expectMsg and friends outside of within() block by default
timefactor = 1.0 # factor by which to scale timeouts during tests, e.g. to account for shared build system load
filter-leeway = 3s # duration of EventFilter.intercept waits after the block is finished until all required messages are received
single-expect-default = 3s # duration to wait in expectMsg and friends outside of within() block by default
}
}

View file

@ -33,7 +33,7 @@ class TestBarrier(count: Int) {
} catch {
case e: TimeoutException
throw new TestBarrierTimeoutException("Timeout of %s and time factor of %s"
format (timeout.toString, system.settings.TestTimeFactor))
format (timeout.toString, TestKitExtension(system).settings.TestTimeFactor))
}
}

View file

@ -81,11 +81,12 @@ abstract class EventFilter(occurrences: Int) {
*/
def intercept[T](code: T)(implicit system: ActorSystem): T = {
system.eventStream publish TestEvent.Mute(this)
val testKitExtension = TestKitExtension(system)
try {
val result = code
if (!awaitDone(system.settings.TestEventFilterLeeway))
if (!awaitDone(testKitExtension.settings.TestEventFilterLeeway))
if (todo > 0)
throw new AssertionError("Timeout (" + system.settings.TestEventFilterLeeway + ") waiting for " + todo + " messages on " + this)
throw new AssertionError("Timeout (" + testKitExtension.settings.TestEventFilterLeeway + ") waiting for " + todo + " messages on " + this)
else
throw new AssertionError("Received " + (-todo) + " messages too many on " + this)
result

View file

@ -71,7 +71,7 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
*
* It should be noted that for CI servers and the like all maximum Durations
* are scaled using their Duration.dilated method, which uses the
* Duration.timeFactor settable via akka.conf entry "akka.test.timefactor".
* TestKitExtension.Settings.TestTimeFactor settable via akka.conf entry "akka.test.timefactor".
*
* @author Roland Kuhn
* @since 1.1
@ -81,6 +81,7 @@ class TestKit(_system: ActorSystem) {
import TestActor.{ Message, RealMessage, NullMessage }
implicit val system = _system
val testKitExtension = TestKitExtension(system)
private val queue = new LinkedBlockingDeque[Message]()
private[akka] var lastMessage: Message = NullMessage
@ -127,7 +128,7 @@ class TestKit(_system: ActorSystem) {
* block or missing that it returns the properly dilated default for this
* case from settings (key "akka.test.single-expect-default").
*/
def remaining: Duration = if (end == Duration.Undefined) system.settings.SingleExpectDefaultTimeout.dilated else end - now
def remaining: Duration = if (end == Duration.Undefined) testKitExtension.settings.SingleExpectDefaultTimeout.dilated else end - now
/**
* Query queue status.
@ -141,7 +142,8 @@ class TestKit(_system: ActorSystem) {
* If no timeout is given, take it from the innermost enclosing `within`
* block.
*
* Note that the timeout is scaled using Duration.timeFactor.
* Note that the timeout is scaled using Duration.dilated,
* which uses the configuration entry "akka.test.timefactor".
*/
def awaitCond(p: Boolean, max: Duration = Duration.Undefined, interval: Duration = 100.millis) {
val _max = if (max eq Duration.Undefined) remaining else max.dilated
@ -165,8 +167,8 @@ class TestKit(_system: ActorSystem) {
* take maximum wait times are available in a version which implicitly uses
* the remaining time governed by the innermost enclosing `within` block.
*
* Note that the max Duration is scaled by Duration.timeFactor while the min
* Duration is not.
* Note that the timeout is scaled using Duration.dilated, which uses the
* configuration entry "akka.test.timefactor", while the min Duration is not.
*
* <pre>
* val ret = within(50 millis) {
@ -535,7 +537,8 @@ object TestKit {
* If no timeout is given, take it from the innermost enclosing `within`
* block.
*
* Note that the timeout is scaled using Duration.timeFactor.
* Note that the timeout is scaled using Duration.dilated, which uses the
* configuration entry "akka.test.timefactor"
*/
def awaitCond(p: Boolean, max: Duration, interval: Duration = 100.millis, noThrow: Boolean = false): Boolean = {
val stop = now + max
@ -562,6 +565,14 @@ object TestKit {
*/
def now: Duration = System.nanoTime().nanos
/**
* Java API. Scale timeouts (durations) during tests with the configured
* 'akka.test.timefactor'.
*/
def dilated(duration: Duration, system: ActorSystem): Duration = {
duration * TestKitExtension(system).settings.TestTimeFactor
}
}
/**

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit
import akka.actor.ActorSystem
import akka.actor.ExtensionKey
import akka.actor.Extension
import akka.actor.ActorSystemImpl
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
object TestKitExtensionKey extends ExtensionKey[TestKitExtension]
object TestKitExtension {
def apply(system: ActorSystem): TestKitExtension = {
if (!system.hasExtension(TestKitExtensionKey)) {
system.registerExtension(new TestKitExtension)
}
system.extension(TestKitExtensionKey)
}
class Settings(cfg: Config) {
private def referenceConfig: Config =
ConfigFactory.parseResource(classOf[ActorSystem], "/akka-testkit-reference.conf",
ConfigParseOptions.defaults.setAllowMissing(false))
val config: ConfigRoot = ConfigFactory.emptyRoot("akka-testkit").withFallback(cfg).withFallback(referenceConfig).resolve()
import config._
val TestTimeFactor = getDouble("akka.test.timefactor")
val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
}
}
class TestKitExtension extends Extension[TestKitExtension] {
import TestKitExtension._
@volatile
private var _settings: Settings = _
def key = TestKitExtensionKey
def init(system: ActorSystemImpl) {
_settings = new Settings(system.applicationConfig)
}
def settings: Settings = _settings
}

View file

@ -34,9 +34,10 @@ class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
def await(): Boolean = await(TestLatch.DefaultTimeout)
def await(timeout: Duration): Boolean = {
val testKitExtension = TestKitExtension(system)
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TestLatchTimeoutException(
"Timeout of %s with time factor of %s" format (timeout.toString, system.settings.TestTimeFactor))
"Timeout of %s with time factor of %s" format (timeout.toString, testKitExtension.settings.TestTimeFactor))
opened
}
@ -44,9 +45,10 @@ class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
* Timeout is expected. Throws exception if latch is opened before timeout.
*/
def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
val testKitExtension = TestKitExtension(system)
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (opened) throw new TestLatchNoTimeoutException(
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, system.settings.TestTimeFactor))
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, testKitExtension.settings.TestTimeFactor))
opened
}

View file

@ -12,8 +12,9 @@ package object testkit {
try {
val result = block
val stop = now + system.settings.TestEventFilterLeeway.toMillis
val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + system.settings.TestEventFilterLeeway + ") waiting for " + _)
val testKitExtension = TestKitExtension(system)
val stop = now + testKitExtension.settings.TestEventFilterLeeway.toMillis
val failed = eventFilters filterNot (_.awaitDone(Duration(stop - now, MILLISECONDS))) map ("Timeout (" + testKitExtension.settings.TestEventFilterLeeway + ") waiting for " + _)
if (failed.nonEmpty)
throw new AssertionError("Filter completion error:\n" + failed.mkString("\n"))
@ -26,4 +27,25 @@ package object testkit {
def filterEvents[T](eventFilters: EventFilter*)(block: T)(implicit system: ActorSystem): T = filterEvents(eventFilters.toSeq)(block)
def filterException[T <: Throwable](block: Unit)(implicit system: ActorSystem, m: Manifest[T]): Unit = EventFilter[T]() intercept (block)
/**
* Scala API. Scale timeouts (durations) during tests with the configured
* 'akka.test.timefactor'.
* Implicit conversion to add dilated function to Duration.
* import akka.util.duration._
* import akka.testkit._
* 10.milliseconds.dilated
*
* Corresponding Java API is available in TestKit.dilated
*/
implicit def duration2TestDuration(duration: Duration) = new TestDuration(duration)
/**
* Wrapper for implicit conversion to add dilated function to Duration.
*/
class TestDuration(duration: Duration) {
def dilated(implicit system: ActorSystem): Duration = {
duration * TestKitExtension(system).settings.TestTimeFactor
}
}
}

View file

@ -40,8 +40,8 @@ object AkkaSpec {
}
abstract class AkkaSpec(_application: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf))
extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll {
abstract class AkkaSpec(_system: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf))
extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll {
val log: LoggingAdapter = Logging(system, this.getClass)
@ -66,7 +66,7 @@ abstract class AkkaSpec(_application: ActorSystem = ActorSystem(getClass.getSimp
def this(s: String) = this(ConfigFactory.parseString(s, ConfigParseOptions.defaults))
def this(configMap: Map[String, _]) = {
this(AkkaSpec.mapToConfig(configMap).withFallback(AkkaSpec.testConf))
this(AkkaSpec.mapToConfig(configMap))
}
def actorOf(props: Props): ActorRef = system.actorOf(props)

View file

@ -15,7 +15,7 @@ class TestTimeSpec extends AkkaSpec(Map("akka.test.timefactor" -> 2.0)) with Bef
val now = System.nanoTime
intercept[AssertionError] { probe.awaitCond(false, Duration("1 second")) }
val diff = System.nanoTime - now
val target = (1000000000l * system.settings.TestTimeFactor).toLong
val target = (1000000000l * testKitExtension.settings.TestTimeFactor).toLong
diff must be > (target - 300000000l)
diff must be < (target + 300000000l)
}