Merge remote-tracking branch 'origin/master' into testing-2.9.2

This commit is contained in:
Roland 2012-02-28 10:52:27 +01:00
commit 73170d1f4b
68 changed files with 1342 additions and 1125 deletions

3
.gitignore vendored
View file

@ -63,3 +63,6 @@ akka.sublime-workspace
_mb
schoir.props
worker*.log
mongoDB/
redis/
beanstalk/

View file

@ -0,0 +1,36 @@
package akka.actor;
import akka.actor.ActorSystem;
import akka.japi.Creator;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class StashJavaAPI {
private static ActorSystem system;
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("StashJavaAPI", ConfigFactory.parseString(ActorWithStashSpec.testConf()));
}
@AfterClass
public static void afterAll() {
system.shutdown();
system = null;
}
@Test
public void mustBeAbleToUseStash() {
ActorRef ref = system.actorOf(new Props(StashJavaAPITestActor.class).withDispatcher("my-dispatcher"));
ref.tell("Hello", ref);
ref.tell("Hello", ref);
ref.tell(new Object());
}
}

View file

@ -0,0 +1,23 @@
package akka.actor;
import static org.junit.Assert.*;
public class StashJavaAPITestActor extends UntypedActorWithStash {
int count = 0;
public void onReceive(Object msg) {
if (msg instanceof String) {
if (count < 0) {
getSender().tell(new Integer(((String) msg).length()));
} else if (count == 2) {
count = -1;
unstashAll();
} else {
count += 1;
stash();
}
} else if (msg instanceof Integer) {
int value = ((Integer) msg).intValue();
assertEquals(value, 5);
}
}
}

View file

@ -0,0 +1,102 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.{ Await, MessageQueueAppendFailedException, BoundedDequeBasedMailbox }
import akka.pattern.ask
import akka.util.duration._
import akka.actor.ActorSystem.Settings
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.BeforeAndAfterEach
object ActorWithBoundedStashSpec {
class StashingActor(implicit sys: ActorSystem) extends Actor with Stash {
def receive = {
case "hello"
stash()
sender ! "OK"
case "world"
try {
unstashAll()
} catch {
case e: MessageQueueAppendFailedException
expectedException.open()
}
}
}
class StashingActorWithOverflow(implicit sys: ActorSystem) extends Actor with Stash {
var numStashed = 0
def receive = {
case "hello"
numStashed += 1
try {
stash()
} catch {
case e: StashOverflowException
if (numStashed == 21) stashOverflow.open()
}
}
}
@volatile var expectedException: TestLatch = null
@volatile var stashOverflow: TestLatch = null
val testConf: Config = ConfigFactory.parseString("""
my-dispatcher {
mailbox-type = "akka.actor.ActorWithBoundedStashSpec$Bounded"
stash-capacity = 20
}
""")
// bounded deque-based mailbox with capacity 10
class Bounded(settings: Settings, config: Config) extends BoundedDequeBasedMailbox(10, 5 seconds)
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorWithBoundedStashSpec extends AkkaSpec(ActorWithBoundedStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach {
import ActorWithBoundedStashSpec._
implicit val sys = system
override def atStartup {
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
}
def myProps(creator: Actor): Props = Props(creator).withDispatcher("my-dispatcher")
"An Actor with Stash and BoundedDequeBasedMailbox" must {
"throw a MessageQueueAppendFailedException in case of a capacity violation" in {
ActorWithBoundedStashSpec.expectedException = new TestLatch
val stasher = system.actorOf(myProps(new StashingActor))
// fill up stash
val futures = for (_ 1 to 11) yield { stasher ? "hello" }
futures foreach { Await.ready(_, 10 seconds) }
// cause unstashAll with capacity violation
stasher ! "world"
Await.ready(ActorWithBoundedStashSpec.expectedException, 10 seconds)
}
}
"An Actor with bounded Stash" must {
"throw a StashOverflowException in case of a stash capacity violation" in {
ActorWithBoundedStashSpec.stashOverflow = new TestLatch
val stasher = system.actorOf(myProps(new StashingActorWithOverflow))
// fill up stash
for (_ 1 to 21) { stasher ! "hello" }
Await.ready(ActorWithBoundedStashSpec.stashOverflow, 10 seconds)
}
}
}

View file

@ -0,0 +1,162 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.testkit._
import akka.testkit.DefaultTimeout
import akka.testkit.TestEvent._
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.duration._
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitSuite
object ActorWithStashSpec {
class StashingActor(implicit sys: ActorSystem) extends Actor with Stash {
def greeted: Receive = {
case "bye"
state.s = "bye"
state.finished.await
case _ // do nothing
}
def receive = {
case "hello"
state.s = "hello"
unstashAll()
context.become(greeted)
case msg stash()
}
}
class StashingTwiceActor(implicit sys: ActorSystem) extends Actor with Stash {
def receive = {
case "hello"
try {
stash()
stash()
} catch {
case e: IllegalStateException
state.expectedException.open()
}
case msg // do nothing
}
}
class ActorWithProtocol(implicit sys: ActorSystem) extends Actor with Stash {
def receive = {
case "open"
unstashAll()
context.become {
case "write" // do writing...
case "close"
unstashAll()
context.unbecome()
case msg stash()
}
case "done" state.finished.await
case msg stash()
}
}
object state {
@volatile
var s: String = ""
val finished = TestBarrier(2)
var expectedException: TestLatch = null
}
val testConf = """
my-dispatcher {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
}
"""
}
class JavaActorWithStashSpec extends StashJavaAPI with JUnitSuite
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with DefaultTimeout with BeforeAndAfterEach {
import ActorWithStashSpec._
implicit val sys = system
override def atStartup {
system.eventStream.publish(Mute(EventFilter[Exception]("Crashing...")))
}
override def beforeEach() = state.finished.reset
def myProps(creator: Actor): Props = Props(creator).withDispatcher("my-dispatcher")
"An Actor with Stash" must {
"stash messages" in {
val stasher = system.actorOf(myProps(new StashingActor))
stasher ! "bye"
stasher ! "hello"
state.finished.await
state.s must be("bye")
}
"support protocols" in {
val protoActor = system.actorOf(myProps(new ActorWithProtocol))
protoActor ! "open"
protoActor ! "write"
protoActor ! "open"
protoActor ! "close"
protoActor ! "write"
protoActor ! "close"
protoActor ! "done"
state.finished.await
}
"throw an IllegalStateException if the same messages is stashed twice" in {
state.expectedException = new TestLatch
val stasher = system.actorOf(myProps(new StashingTwiceActor))
stasher ! "hello"
stasher ! "hello"
Await.ready(state.expectedException, 10 seconds)
}
"process stashed messages after restart" in {
val boss = system.actorOf(myProps(new Supervisor(
OneForOneStrategy(maxNrOfRetries = 2, withinTimeRange = 1 second)(List(classOf[Throwable])))))
val restartLatch = new TestLatch
val hasMsgLatch = new TestLatch
val slaveProps = myProps(new Actor with Stash {
protected def receive = {
case "crash"
throw new Exception("Crashing...")
// when restartLatch is not yet open, stash all messages != "crash"
case msg if !restartLatch.isOpen
stash()
// when restartLatch is open, must receive "hello"
case "hello"
hasMsgLatch.open()
}
override def preRestart(reason: Throwable, message: Option[Any]) = {
if (!restartLatch.isOpen)
restartLatch.open()
super.preRestart(reason, message)
}
})
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
slave ! "hello"
slave ! "crash"
Await.ready(restartLatch, 10 seconds)
Await.ready(hasMsgLatch, 10 seconds)
}
}
}

View file

@ -895,6 +895,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
}
Await.ready(complex, timeout.duration) must be('completed)
}
"should capture first exception with dataflow" in {
import Future.flow
val f1 = flow { 40 / 0 }
intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout))
}
}
}

View file

@ -8,6 +8,7 @@ import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.actor.{ ActorRef, ActorContext, Props, LocalActorRef }
import com.typesafe.config.Config
import akka.actor.ActorSystem
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -156,7 +157,7 @@ object CustomMailboxSpec {
}
"""
class MyMailboxType(config: Config) extends MailboxType {
class MyMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
override def create(owner: Option[ActorContext]) = owner match {
case Some(o) new MyMailbox(o)
case None throw new Exception("no mailbox owner given")

View file

@ -6,6 +6,7 @@ import akka.pattern.ask
import akka.util.duration._
import akka.testkit.DefaultTimeout
import com.typesafe.config.Config
import akka.actor.ActorSystem
object PriorityDispatcherSpec {
val config = """
@ -17,12 +18,12 @@ object PriorityDispatcherSpec {
}
"""
class Unbounded(config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({
class Unbounded(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox(PriorityGenerator({
case i: Int i //Reverse order
case 'Result Int.MaxValue
}: Any Int))
class Bounded(config: Config) extends BoundedPriorityMailbox(PriorityGenerator({
class Bounded(settings: ActorSystem.Settings, config: Config) extends BoundedPriorityMailbox(PriorityGenerator({
case i: Int i //Reverse order
case 'Result Int.MaxValue
}: Any Int), 1000, 10 seconds)

View file

@ -23,6 +23,8 @@ import com.typesafe.config.ConfigValueType;
final class ConfigSubstitution extends AbstractConfigValue implements
Unmergeable {
private static final long serialVersionUID = 1L;
// this is a list of String and SubstitutionExpression where the
// SubstitutionExpression has to be resolved to values, then if there's more
// than one piece everything is stringified and concatenated

View file

@ -242,19 +242,26 @@ akka {
mailbox-push-timeout-time = 10s
# FQCN of the MailboxType, if not specified the default bounded or unbounded
# mailbox is used. The Class of the FQCN must have a constructor with a
# com.typesafe.config.Config parameter.
# mailbox is used. The Class of the FQCN must have a constructor with
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
mailbox-type = ""
# For BalancingDispatcher: If the balancing dispatcher should attempt to
# schedule idle actors using the same dispatcher when a message comes in,
# and the dispatchers ExecutorService is not fully busy already.
attempt-teamwork = on
# For Actor with Stash: The default capacity of the stash.
# If negative (or zero) then an unbounded stash is used (default)
# If positive then a bounded stash is used and the capacity is set using the
# property
stash-capacity = -1
}
debug {
# enable function of Actor.loggable(), which is to log any received message at
# DEBUG level
# DEBUG level, see the “Testing Actor Systems” section of the Akka Documentation
# at http://akka.io/docs
receive = off
# enable DEBUG logging of all AutoReceiveMessages (Kill, PoisonPill and the like)

View file

@ -481,7 +481,7 @@ class ActorSystemImpl protected[akka] (val name: String, applicationConfig: Conf
def locker: Locker = provider.locker
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess))
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings))
val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher

View file

@ -61,7 +61,7 @@ object RelativeActorPath {
/**
* This object serves as extractor for Scala and as address parser for Java.
*/
object AddressExtractor {
object AddressFromURIString {
def unapply(addr: String): Option[Address] =
try {
val uri = new URI(addr)
@ -87,7 +87,7 @@ object AddressExtractor {
* Try to construct an Address from the given String or throw a java.net.MalformedURLException.
*/
def apply(addr: String): Address = addr match {
case AddressExtractor(address) address
case AddressFromURIString(address) address
case _ throw new MalformedURLException
}
@ -102,7 +102,7 @@ object ActorPathExtractor {
try {
val uri = new URI(addr)
if (uri.getPath == null) None
else AddressExtractor.unapply(uri) match {
else AddressFromURIString.unapply(uri) match {
case None None
case Some(addr) Some((addr, ActorPath.split(uri.getPath).drop(1)))
}

View file

@ -0,0 +1,128 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.dispatch.{ Envelope, DequeBasedMessageQueue }
import akka.AkkaException
/**
* The `Stash` trait enables an actor to temporarily stash away messages that can not or
* should not be handled using the actor's current behavior.
* <p/>
* Example:
* <pre>
* class ActorWithProtocol extends Actor with Stash {
* def receive = {
* case "open"
* unstashAll {
* case "write" // do writing...
* case "close"
* unstashAll()
* context.unbecome()
* case msg stash()
* }
* case "done" // done
* case msg stash()
* }
* }
* </pre>
*
* Note that the `Stash` trait can only be used together with actors that have a deque-based
* mailbox. Actors can be configured to use a deque-based mailbox using a configuration like
* the following (see the documentation on dispatchers on how to configure a custom
* dispatcher):
* <pre>
* akka {
* actor {
* my-custom-dispatcher {
* mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
* }
* }
* }
* </pre>
*
* Note that the `Stash` trait must be mixed into (a subclass of) the `Actor` trait before
* any trait/class that overrides the `preRestart` callback. This means it's not possible to write
* `Actor with MyActor with Stash` if `MyActor` overrides `preRestart`.
*/
trait Stash {
this: Actor
/* The private stash of the actor. It is only accessible using `stash()` and
* `unstashAll()`.
*/
private var theStash = Vector.empty[Envelope]
/* The capacity of the stash. Configured in the actor's dispatcher config.
*/
private val capacity = {
val dispatcher = context.system.settings.config.getConfig(context.props.dispatcher)
val config = dispatcher.withFallback(context.system.settings.config.getConfig("akka.actor.default-dispatcher"))
config.getInt("stash-capacity")
}
/* The actor's deque-based message queue.
* `mailbox.queue` is the underlying `Deque`.
*/
private val mailbox: DequeBasedMessageQueue = {
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
case queue: DequeBasedMessageQueue queue
case other throw new ActorInitializationException(self, "DequeBasedMailbox required, got: " + other.getClass() + """
An (unbounded) deque-based mailbox can be configured as follows:
my-custom-dispatcher {
mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
}
""")
}
}
/**
* Adds the current message (the message that the actor received last) to the
* actor's stash.
*
* @throws StashOverflowException in case of a stash capacity violation
* @throws IllegalStateException if the same message is stashed more than once
*/
def stash(): Unit = {
val currMsg = context.asInstanceOf[ActorCell].currentMessage
if (theStash.size > 0 && (currMsg eq theStash.last))
throw new IllegalStateException("Can't stash the same message " + currMsg + " more than once")
if (capacity <= 0 || theStash.size < capacity) theStash :+= currMsg
else throw new StashOverflowException("Couldn't enqueue message " + currMsg + " to stash of " + self)
}
/**
* Prepends all messages in the stash to the mailbox, and then clears the stash.
*
* Messages from the stash are enqueued to the mailbox until the capacity of the
* mailbox (if any) has been reached. In case a bounded mailbox overflows, a
* `MessageQueueAppendFailedException` is thrown.
*
* The stash is guaranteed to be empty after calling `unstashAll()`.
*
* @throws MessageQueueAppendFailedException in case of a capacity violation when
* prepending the stash to a bounded mailbox
*/
def unstashAll(): Unit = {
try {
for (msg theStash.reverseIterator) mailbox.enqueueFirst(self, msg)
} finally {
theStash = Vector.empty[Envelope]
}
}
/**
* Overridden callback. Prepends all messages in the stash to the mailbox,
* clears the stash, stops all children and invokes the postStop() callback of the superclass.
*/
override def preRestart(reason: Throwable, message: Option[Any]) {
try unstashAll() finally {
context.children foreach context.stop
postStop()
}
}
}
class StashOverflowException(message: String, cause: Throwable = null) extends AkkaException(message, cause)

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
/**
* Actor base class that should be extended to create an actor with a stash.
*
* The stash enables an actor to temporarily stash away messages that can not or
* should not be handled using the actor's current behavior.
* <p/>
* Example:
* <pre>
* public class MyActorWithStash extends UntypedActorWithStash {
* int count = 0;
* public void onReceive(Object msg) {
* if (msg instanceof String) {
* if (count < 0) {
* getSender().tell(new Integer(((String) msg).length()));
* } else if (count == 2) {
* count = -1;
* unstashAll();
* } else {
* count += 1;
* stash();
* }
* }
* }
* }
* </pre>
*/
abstract class UntypedActorWithStash extends UntypedActor with Stash

View file

@ -384,17 +384,17 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
config.getString("mailbox-type") match {
case ""
if (config.getInt("mailbox-capacity") < 1) UnboundedMailbox()
else new BoundedMailbox(config)
else new BoundedMailbox(prerequisites.settings, config)
case "unbounded" UnboundedMailbox()
case "bounded" new BoundedMailbox(config)
case "bounded" new BoundedMailbox(prerequisites.settings, config)
case fqcn
val args = Seq(classOf[Config] -> config)
val args = Seq(classOf[ActorSystem.Settings] -> prerequisites.settings, classOf[Config] -> config)
prerequisites.dynamicAccess.createInstanceFor[MailboxType](fqcn, args) match {
case Right(instance) instance
case Left(exception)
throw new IllegalArgumentException(
("Cannot instantiate MailboxType [%s], defined in [%s], " +
"make sure it has constructor with a [com.typesafe.config.Config] parameter")
"make sure it has constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters")
.format(fqcn, config.getString("id")), exception)
}
}

View file

@ -22,6 +22,7 @@ trait DispatcherPrerequisites {
def deadLetterMailbox: Mailbox
def scheduler: Scheduler
def dynamicAccess: DynamicAccess
def settings: ActorSystem.Settings
}
case class DefaultDispatcherPrerequisites(
@ -29,7 +30,8 @@ case class DefaultDispatcherPrerequisites(
val eventStream: EventStream,
val deadLetterMailbox: Mailbox,
val scheduler: Scheduler,
val dynamicAccess: DynamicAccess) extends DispatcherPrerequisites
val dynamicAccess: DynamicAccess,
val settings: ActorSystem.Settings) extends DispatcherPrerequisites
object Dispatchers {
/**

View file

@ -308,9 +308,13 @@ object Future {
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val p = Promise[A]
dispatchTask({ ()
try {
(reify(body) foreachFull (p success, p failure): Future[Any]) onFailure {
case NonFatal(e) p tryComplete Left(e)
}
} catch {
case NonFatal(e) p tryComplete Left(e)
}
}, true)
p.future
}

View file

@ -4,7 +4,7 @@
package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue, Queue }
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import akka.util._
import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent._
@ -12,6 +12,7 @@ import annotation.tailrec
import akka.event.Logging.Error
import akka.actor.ActorContext
import com.typesafe.config.Config
import akka.actor.ActorSystem
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
@ -310,6 +311,21 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
def hasSystemMessages: Boolean = systemQueueGet ne null
}
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
deadLetters.enqueue(owner.self, envelope)
envelope = dequeue
}
}
}
}
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def dequeue(): Envelope = queue.poll()
@ -330,19 +346,36 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def dequeue(): Envelope = queue.poll()
}
trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
deadLetters.enqueue(owner.self, envelope)
envelope = dequeue
}
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
def queue: Deque[Envelope]
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit
}
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = queue addFirst handle
def dequeue(): Envelope = queue.poll()
}
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def pushTimeOut: Duration
override def queue: BlockingDeque[Envelope]
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue put handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0)
queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit) || {
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver)
}
else queue putFirst handle
def dequeue(): Envelope = queue.poll()
}
/**
@ -357,25 +390,25 @@ trait MailboxType {
*/
case class UnboundedMailbox() extends MailboxType {
def this(config: Config) = this()
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()
new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
def this(config: Config) = this(config.getInt("mailbox-capacity"),
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new LinkedBlockingQueue[Envelope](capacity)
new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
@ -385,8 +418,8 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
final override def create(owner: Option[ActorContext]): MessageQueue =
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
new PriorityBlockingQueue[Envelope](11, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
}
@ -399,9 +432,33 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
new QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
}
case class UnboundedDequeBasedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
final val queue = this
}
}
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
final val queue = this
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut
}
}

View file

@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.config.ConfigurationException
import scala.collection.JavaConverters._
import akka.actor.Address
import akka.actor.AddressExtractor
import akka.actor.AddressFromURIString
class ClusterSettings(val config: Config, val systemName: String) {
import config._
@ -21,6 +21,6 @@ class ClusterSettings(val config: Config, val systemName: String) {
val InitialDelayForGossip = Duration(getMilliseconds("akka.cluster.gossip.initial-delay"), MILLISECONDS)
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip.frequency"), MILLISECONDS)
val SeedNodes = Set.empty[Address] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
case AddressExtractor(addr) addr
case AddressFromURIString(addr) addr
}
}

View file

@ -15,3 +15,9 @@ Scalatra has Akka integration.
Read more here: `<https://github.com/scalatra/scalatra/blob/develop/akka/src/main/scala/org/scalatra/akka/AkkaSupport.scala>`_
Gatling
-------
Gatling is an Open Source Stress Tool.
Read more here: `<http://gatling-tool.org/>`_

View file

@ -9,24 +9,27 @@
Name Role Email
=================== ========================== ====================================
Jonas Bonér Founder, Despot, Committer jonas AT jonasboner DOT com
Viktor Klang Bad cop, Committer viktor DOT klang AT gmail DOT com
Debasish Ghosh Committer dghosh AT acm DOT org
Viktor Klang Project Owner viktor DOT klang AT gmail DOT com
Roland Kuhn Committer
Patrik Nordwall Committer patrik DOT nordwall AT gmail DOT com
Derek Williams Committer derek AT nebvin DOT ca
Henrik Engström Committer
Peter Vlugter Committer
Martin Krasser Committer krasserm AT googlemail DOT com
Raymond Roestenburg Committer
Piotr Gabryanczyk Committer
Debasish Ghosh Alumni dghosh AT acm DOT org
Ross McDonald Alumni rossajmcd AT gmail DOT com
Eckhart Hertzler Alumni
Mikael Högqvist Alumni
Tim Perrett Alumni
Jeanfrancois Arcand Alumni jfarcand AT apache DOT org
Martin Krasser Committer krasserm AT googlemail DOT com
Jan Van Besien Alumni
Michael Kober Alumni
Peter Vlugter Committer
Peter Veentjer Committer
Irmo Manie Committer
Heiko Seeberger Committer
Hiram Chirino Committer
Scott Clasen Committer
Roland Kuhn Committer
Patrik Nordwall Committer patrik DOT nordwall AT gmail DOT com
Derek Williams Committer derek AT nebvin DOT ca
Henrik Engström Committer
Peter Veentjer Alumni
Irmo Manie Alumni
Heiko Seeberger Alumni
Hiram Chirino Alumni
Scott Clasen Alumni
=================== ========================== ====================================

View file

@ -60,19 +60,17 @@ public class DispatcherDocTestBase {
@Test
public void defineDispatcher() {
//#defining-dispatcher
ActorRef myActor1 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor1");
ActorRef myActor2 = system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor2");
ActorRef myActor =
system.actorOf(new Props(MyUntypedActor.class).withDispatcher("my-dispatcher"),
"myactor3");
//#defining-dispatcher
}
@Test
public void definePinnedDispatcher() {
//#defining-pinned-dispatcher
String name = "myactor";
ActorRef myActor = system.actorOf(new Props(MyUntypedActor.class)
.withDispatcher("myactor-dispatcher"), name);
.withDispatcher("my-pinned-dispatcher"));
//#defining-pinned-dispatcher
}
@ -80,11 +78,13 @@ public class DispatcherDocTestBase {
public void priorityDispatcher() throws Exception {
//#prio-dispatcher
ActorRef myActor = system.actorOf( // We create a new Actor that just prints out what it processes
// We create a new Actor that just prints out what it processes
ActorRef myActor = system.actorOf(
new Props().withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new UntypedActor() {
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
LoggingAdapter log =
Logging.getLogger(getContext().system(), this);
{
getSelf().tell("lowpriority");
getSelf().tell("lowpriority");
@ -101,7 +101,7 @@ public class DispatcherDocTestBase {
}
};
}
}).withDispatcher("prio-dispatcher-java"));
}).withDispatcher("prio-dispatcher"));
/*
Logs:
@ -123,19 +123,20 @@ public class DispatcherDocTestBase {
}
//#prio-mailbox
public static class PrioMailbox extends UnboundedPriorityMailbox {
public PrioMailbox(Config config) { // needed for reflective instantiation
super(new PriorityGenerator() { // Create a new PriorityGenerator, lower prio means more important
public static class MyPrioMailbox extends UnboundedPriorityMailbox {
public MyPrioMailbox(ActorSystem.Settings settings, Config config) { // needed for reflective instantiation
// Create a new PriorityGenerator, lower prio means more important
super(new PriorityGenerator() {
@Override
public int gen(Object message) {
if (message.equals("highpriority"))
return 0; // 'highpriority messages should be treated first if possible
else if (message.equals("lowpriority"))
return 100; // 'lowpriority messages should be treated last if possible
return 2; // 'lowpriority messages should be treated last if possible
else if (message.equals(Actors.poisonPill()))
return 1000; // PoisonPill when no other left
return 3; // PoisonPill when no other left
else
return 50; // We default to 50
return 1; // By default they go between high and low prio
}
});
}

View file

@ -11,7 +11,7 @@ import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressExtractor;
import akka.actor.AddressFromURIString;
import java.util.Arrays;
public class RouterViaProgramExample {
@ -73,7 +73,7 @@ public class RouterViaProgramExample {
//#remoteRoutees
Address addr1 = new Address("akka", "remotesys", "otherhost", 1234);
Address addr2 = AddressExtractor.parse("akka://othersys@anotherhost:1234");
Address addr2 = AddressFromURIString.parse("akka://othersys@anotherhost:1234");
Address[] addresses = new Address[] { addr1, addr2 };
ActorRef routerRemote = system.actorOf(new Props(ExampleActor.class)
.withRouter(new RemoteRouterConfig(new RoundRobinRouter(5), addresses)));

View file

@ -10,7 +10,7 @@ import org.junit.Test;
//#import
import akka.actor.ActorRef;
import akka.actor.Address;
import akka.actor.AddressExtractor;
import akka.actor.AddressFromURIString;
import akka.actor.Deploy;
import akka.actor.Props;
import akka.actor.ActorSystem;
@ -35,7 +35,7 @@ public class RemoteDeploymentDocTestBase {
public void demonstrateDeployment() {
//#make-address
Address addr = new Address("akka", "sys", "host", 1234);
addr = AddressExtractor.parse("akka://sys@host:1234"); // the same
addr = AddressFromURIString.parse("akka://sys@host:1234"); // the same
//#make-address
//#deploy
ActorRef ref = system.actorOf(new Props(RemoteDeploymentDocSpec.Echo.class).withDeploy(new Deploy(new RemoteScope(addr))));

View file

@ -1,195 +0,0 @@
Dataflow Concurrency (Java)
===========================
.. sidebar:: Contents
.. contents:: :local:
Introduction
------------
**IMPORTANT: As of Akka 1.1, Akka Future, Promise and DefaultPromise have all the functionality of DataFlowVariables, they also support non-blocking composition and advanced features like fold and reduce, Akka DataFlowVariable is therefor deprecated and will probably resurface in the following release as a DSL on top of Futures.**
Akka implements `Oz-style dataflow concurrency <http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency>`_ through dataflow (single assignment) variables and lightweight (event-based) processes/threads.
Dataflow concurrency is deterministic. This means that it will always behave the same. If you run it once and it yields output 5 then it will do that **every time**, run it 10 million times, same result. If it on the other hand deadlocks the first time you run it, then it will deadlock **every single time** you run it. Also, there is **no difference** between sequential code and concurrent code. These properties makes it very easy to reason about concurrency. The limitation is that the code needs to be side-effect free, e.g. deterministic. You can't use exceptions, time, random etc., but need to treat the part of your program that uses dataflow concurrency as a pure function with input and output.
The best way to learn how to program with dataflow variables is to read the fantastic book `Concepts, Techniques, and Models of Computer Programming <http://www.info.ucl.ac.be/%7Epvr/book.html>`_. By Peter Van Roy and Seif Haridi.
The documentation is not as complete as it should be, something we will improve shortly. For now, besides above listed resources on dataflow concurrency, I recommend you to read the documentation for the GPars implementation, which is heavily influenced by the Akka implementation:
* `<http://gpars.codehaus.org/Dataflow>`_
* `<http://www.gpars.org/guide/guide/7.%20Dataflow%20Concurrency.html>`_
Dataflow Variables
------------------
Dataflow Variable defines three different operations:
1. Define a Dataflow Variable
.. code-block:: java
import static akka.dataflow.DataFlow.*;
DataFlowVariable<int> x = new DataFlowVariable<int>();
2. Wait for Dataflow Variable to be bound
.. code-block:: java
x.get();
3. Bind Dataflow Variable
.. code-block:: java
x.set(3);
A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will throw an exception.
You can also shutdown a dataflow variable like this:
.. code-block:: java
x.shutdown();
Threads
-------
You can easily create millions lightweight (event-driven) threads on a regular workstation.
.. code-block:: java
import static akka.dataflow.DataFlow.*;
import akka.japi.Effect;
thread(new Effect() {
public void apply() { ... }
});
You can also set the thread to a reference to be able to control its life-cycle:
.. code-block:: java
import static akka.dataflow.DataFlow.*;
import akka.japi.Effect;
ActorRef t = thread(new Effect() {
public void apply() { ... }
});
... // time passes
t.tell(new Exit()); // shut down the thread
Examples
--------
Most of these examples are taken from the `Oz wikipedia page <http://en.wikipedia.org/wiki/Oz_%28programming_language%29>`_
Simple DataFlowVariable example
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
This example is from Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language).
Sort of the "Hello World" of dataflow concurrency.
Example in Oz:
.. code-block:: ruby
thread
Z = X+Y % will wait until both X and Y are bound to a value.
{Browse Z} % shows the value of Z.
end
thread X = 40 end
thread Y = 2 end
Example in Akka:
.. code-block:: java
import static akka.dataflow.DataFlow.*;
import akka.japi.Effect;
DataFlowVariable<int> x = new DataFlowVariable<int>();
DataFlowVariable<int> y = new DataFlowVariable<int>();
DataFlowVariable<int> z = new DataFlowVariable<int>();
thread(new Effect() {
public void apply() {
z.set(x.get() + y.get());
System.out.println("z = " + z.get());
}
});
thread(new Effect() {
public void apply() {
x.set(40);
}
});
thread(new Effect() {
public void apply() {
y.set(40);
}
});
Example on life-cycle management of DataFlowVariables
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Shows how to shutdown dataflow variables and bind threads to values to be able to interact with them (exit etc.).
Example in Akka:
.. code-block:: java
import static akka.dataflow.DataFlow.*;
import akka.japi.Effect;
// create four 'int' data flow variables
DataFlowVariable<int> x = new DataFlowVariable<int>();
DataFlowVariable<int> y = new DataFlowVariable<int>();
DataFlowVariable<int> z = new DataFlowVariable<int>();
DataFlowVariable<int> v = new DataFlowVariable<int>();
ActorRef main = thread(new Effect() {
public void apply() {
System.out.println("Thread 'main'")
if (x.get() > y.get()) {
z.set(x);
System.out.println("'z' set to 'x': " + z.get());
} else {
z.set(y);
System.out.println("'z' set to 'y': " + z.get());
}
// main completed, shut down the data flow variables
x.shutdown();
y.shutdown();
z.shutdown();
v.shutdown();
}
});
ActorRef setY = thread(new Effect() {
public void apply() {
System.out.println("Thread 'setY', sleeping...");
Thread.sleep(5000);
y.set(2);
System.out.println("'y' set to: " + y.get());
}
});
ActorRef setV = thread(new Effect() {
public void apply() {
System.out.println("Thread 'setV'");
y.set(2);
System.out.println("'v' set to y: " + v.get());
}
});
// shut down the threads
main.tell(new Exit());
setY.tell(new Exit());
setV.tell(new Exit());

View file

@ -7,204 +7,167 @@ Dispatchers (Java)
.. contents:: :local:
The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs.
Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread.
The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM.
An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used
to execute arbitrary code, for instance :ref:`futures-java`.
Default dispatcher
------------------
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created.
The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher``
section of the :ref:`configuration`.
Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases.
If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors
for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher.
See below for details on which ones are available and how they can be configured.
Setting the dispatcher for an Actor
-----------------------------------
.. warning::
Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher
as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used.
So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is:
Setting the dispatcher
----------------------
.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-dispatcher
You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key
of the dispatcher settings.
.. note::
The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration.
So in this example it's a top-level section, but you could for instance put it as a sub-section,
where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
:include: imports,defining-dispatcher
And then you just need to configure that dispatcher in your configuration:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
And here's another example that uses the "thread-pool-executor":
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
For more options, see the default-dispatcher section of the :ref:`configuration`.
Types of dispatchers
--------------------
There are 4 different types of message dispatchers:
* Thread-based (Pinned)
* Event-based
* Priority event-based
* Work-sharing (Balancing)
* Dispatcher
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
- Sharability: Unlimited
Example of a custom event-based dispatcher, which can be used with
``new Props().withCreator(MyUntypedActor.class).withDispatcher("my-dispatcher")``
as in the example above:
- Mailboxes: Any, creates one per Actor
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
- Use cases: Default dispatcher, Bulkheading
Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override
the values for the ``default-dispatcher`` in your configuration.
- Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
.. note::
* PinnedDispatcher
It should be noted that the ``dispatcher-id`` used in :class:`Props` is in
fact an absolute path into the configuration object, i.e. you can declare a
dispatcher configuration nested within other configuration objects and refer
to it like so: ``"my.config.object.myAwesomeDispatcher"``
- Sharability: None
There are two different executor services:
- Mailboxes: Any, creates one per Actor
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for
``default-dispatcher``.
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
- Use cases: Bulkheading
Note that the pool size is configured differently for the two executor services. The configuration above
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``:
- Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator``
by default a "thread-pool-executor"
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
* BalancingDispatcher
Let's now walk through the different dispatchers in more detail.
- Sharability: Actors of the same type only
Thread-based
^^^^^^^^^^^^
- Mailboxes: Any, creates one for all Actors
The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a
`LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher
has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other.
- Use cases: Work-sharing
The ``PinnedDispatcher`` is configured like this:
- Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
* CallingThreadDispatcher
- Sharability: Unlimited
- Mailboxes: Any, creates one per Actor per Thread (on demand)
- Use cases: Testing
- Driven by: The calling thread (duh)
More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Configuring a ``PinnedDispatcher``:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
Note that it must be used with ``executor = "thread-pool-executor"``.
And then using it:
Event-based
^^^^^^^^^^^
.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#defining-pinned-dispatcher
The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
`BlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html>`_. This dispatcher is highly configurable
and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool.
Mailboxes
---------
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g.
the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor
that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific
and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to
design and implement your system in the most efficient way in regards to performance, throughput and latency.
An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance.
It comes with many different predefined BlockingQueue configurations:
Builtin implementations
^^^^^^^^^^^^^^^^^^^^^^^
* Bounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Unbounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Bounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* Unbounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* `SynchronousQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html>`_
Akka comes shipped with a number of default mailbox implementations:
When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's
thread as a way to slow him down and balance producer/consumer.
* UnboundedMailbox
Here is an example of a bounded mailbox:
- Backed by a ``java.util.concurrent.ConcurrentLinkedQueue``
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
- Blocking: No
The standard :class:`Dispatcher` allows you to define the ``throughput`` it
should have, as shown above. This defines the number of messages for a specific
Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will batch process up to ``throughput`` messages together when
having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it explicitly
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.
- Bounded: No
Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options available.
* BoundedMailbox
Priority event-based
^^^^^^^^^^^^^^^^^^^^
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
``akka.dispatch.PriorityGenerator`` (recommended).
- Blocking: Yes
Creating a Dispatcher with a mailbox using PriorityGenerator:
- Bounded: Yes
Config:
* UnboundedPriorityMailbox
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala
:include: prio-dispatcher-config-java
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
Priority mailbox:
- Blocking: Yes
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
:include: imports-prio-mailbox,prio-mailbox
- Bounded: No
Usage:
* BoundedPriorityMailbox
.. includecode:: code/akka/docs/dispatcher/DispatcherDocTestBase.java
:include: imports-prio,prio-dispatcher
- Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Blocking: Yes
Work-sharing event-based
^^^^^^^^^^^^^^^^^^^^^^^^^
- Bounded: Yes
The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to
share this dispatcher and during execution time the different actors will steal messages from other actors if they
have less messages to process.
Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
best described as "work donating" because the actor of which work is being stolen takes the initiative.
This can be a great way to improve throughput at the cost of a little higher latency.
* Durable mailboxes, see :ref:`durable-mailboxes`.
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config
Mailbox configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_
Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_
How to create a PriorityMailbox:
Making the Actor mailbox bounded
--------------------------------
.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-mailbox
Global configuration
^^^^^^^^^^^^^^^^^^^^
And then add it to the configuration:
You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under
``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers,
if not specified otherwise.
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config
.. code-block:: ruby
And then an example on how you would use it:
akka {
actor {
default-dispatcher {
# If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set to the number specified
mailbox-capacity = 1000
}
}
}
.. includecode:: ../java/code/akka/docs/dispatcher/DispatcherDocTestBase.java#prio-dispatcher
Per-instance based configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.. note::
You can also do it on a specific dispatcher instance.
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor.
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds).
When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
if the message cannot be added to the mailbox within the time specified by the pushTimeout.
Make sure to include a constructor which takes
``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config``
arguments, as this constructor is invoked reflectively to construct your
mailbox type. The config passed in as second argument is that section from
the configuration which describes the dispatcher using this mailbox type; the
mailbox type will be instantiated once for each dispatcher using it.

View file

@ -12,7 +12,6 @@ Java API
event-bus
scheduler
futures
dataflow
fault-tolerance
dispatchers
routing

View file

@ -180,3 +180,9 @@ By having your Typed Actor implementation class implement any and all of the fol
* ``TypedActor.PostRestart``
You can hook into the lifecycle of your Typed Actor.
Proxying
--------
You can use the ``typedActorOf`` that takes a TypedProps and an ActorRef to proxy the given ActorRef as a TypedActor.
This is usable if you want to communicate remotely with TypedActors on other machines, just look them up with ``actorFor`` and pass the ``ActorRef`` to ``typedActorOf``.

View file

@ -1,6 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
//#global
object Global extends com.typesafe.play.mini.Setup(akka.docs.http.PlayMiniApplication)
//#global

View file

@ -1,128 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.http
//#imports
import com.typesafe.play.mini.{ POST, GET, Path, Application }
import play.api.mvc.{ Action, AsyncResult }
import play.api.mvc.Results._
import play.api.libs.concurrent._
import play.api.data._
import play.api.data.Forms._
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
import akka.actor.{ ActorSystem, Props, Actor }
import scala.collection.mutable.{ Map MutableMap }
//#imports
//#playMiniDefinition
object PlayMiniApplication extends Application {
//#playMiniDefinition
private val system = ActorSystem("sample")
//#regexURI
private final val StatementPattern = """/account/statement/(\w+)""".r
//#regexURI
private lazy val accountActor = system.actorOf(Props[AccountActor])
implicit val timeout = Timeout(1000 milliseconds)
//#route
def route = {
//#routeLogic
//#simpleGET
case GET(Path("/ping")) Action {
Ok("Pong @ " + System.currentTimeMillis)
}
//#simpleGET
//#regexGET
case GET(Path(StatementPattern(accountId))) Action {
AsyncResult {
//#innerRegexGET
(accountActor ask Status(accountId)).mapTo[Int].asPromise.map { r
if (r >= 0) Ok("Account total: " + r)
else BadRequest("Unknown account: " + accountId)
}
//#innerRegexGET
}
}
//#regexGET
//#asyncDepositPOST
case POST(Path("/account/deposit")) Action { implicit request
//#formAsyncDepositPOST
val (accountId, amount) = commonForm.bindFromRequest.get
//#formAsyncDepositPOST
AsyncResult {
(accountActor ask Deposit(accountId, amount)).mapTo[Int].asPromise.map { r Ok("Updated account total: " + r) }
}
}
//#asyncDepositPOST
//#asyncWithdrawPOST
case POST(Path("/account/withdraw")) Action { implicit request
val (accountId, amount) = commonForm.bindFromRequest.get
AsyncResult {
(accountActor ask Withdraw(accountId, amount)).mapTo[Int].asPromise.map { r
if (r >= 0) Ok("Updated account total: " + r)
else BadRequest("Unknown account or insufficient funds. Get your act together.")
}
}
}
//#asyncWithdrawPOST
//#routeLogic
}
//#route
//#form
val commonForm = Form(
tuple(
"accountId" -> nonEmptyText,
"amount" -> number(min = 1)))
//#form
}
//#cases
case class Status(accountId: String)
case class Deposit(accountId: String, amount: Int)
case class Withdraw(accountId: String, amount: Int)
//#cases
//#actor
class AccountActor extends Actor {
var accounts = MutableMap[String, Int]()
//#receive
def receive = {
//#senderBang
case Status(accountId) sender ! accounts.getOrElse(accountId, -1)
//#senderBang
case Deposit(accountId, amount) sender ! deposit(accountId, amount)
case Withdraw(accountId, amount) sender ! withdraw(accountId, amount)
}
//#receive
private def deposit(accountId: String, amount: Int): Int = {
accounts.get(accountId) match {
case Some(value)
val newValue = value + amount
accounts += accountId -> newValue
newValue
case None
accounts += accountId -> amount
amount
}
}
private def withdraw(accountId: String, amount: Int): Int = {
accounts.get(accountId) match {
case Some(value)
if (value < amount) -1
else {
val newValue = value - amount
accounts += accountId -> newValue
newValue
}
case None -1
}
}
//#actor
}

View file

@ -19,182 +19,4 @@ Getting started
First you must make your application aware of play-mini.
In SBT you just have to add the following to your _libraryDependencies_::
libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC3-SNAPSHOT"
Sample Application
------------------
To illustrate how easy it is to wire a RESTful service with Akka we will use a sample application.
The aim of the application is to show how to use play-mini and Akka in combination. Do not put too much
attention on the actual business logic itself, which is a extremely simple bank application, as building a bank
application is a little more complex than what's shown in the sample...
The application should support the following URL commands:
- GET /ping - returns a Pong message with the time of the server (used to see if the application is up and running)
- GET /account/statement/{accountId} - returns the account statement
- POST /account/deposit - deposits money to an account (and creates a new one if it's not already existing)
- POST /account/withdraw - withdraws money from an account
Error messages will be returned in case of any misuse of the application, e.g. withdrawing more money than an
account has etc.
Getting started
---------------
To build a play-mini application you first have to make your object extend com.typesafe.play.mini.Application:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: playMiniDefinition
The next step is to implement the mandatory method ``route``:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: route
:exclude: routeLogic
It is inside the ``route`` method that all the magic happens.
In the sections below we will show how to set up play-mini to handle both GET and POST HTTP calls.
Simple GET
----------
We start off by creating the simplest method we can - a "ping" method:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: simpleGET
As you can see in the section above play-mini uses Scala's wonderful pattern matching.
In the snippet we instruct play-mini to reply to all HTTP GET calls with the URI "/ping".
The ``Action`` returned comes from Play! and you can find more information about it `here <https://github.com/playframework/Play20/wiki/ScalaActions>`_.
.. _Advanced-GET:
Advanced GET
------------
Let's try something more advanced, retrieving parameters from the URI and also make an asynchronous call to an actor:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: regexGET
The regular expression looks like this:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: regexURI
In the snippets above we extract a URI parameter with the help of a simple regular expression and then we pass this
parameter on to the underlying actor system. As you can see ``AsyncResult`` is being used. This means that the call to
the actor will be performed asynchronously, i.e. no blocking.
The asynchronous call to the actor is being done with a ``ask``, e.g.::
(accountActor ask Status(accountId))
The actor that receives the message returns the result by using a standard *sender !*
as can be seen here:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: senderBang
When the result is returned to the calling code we use some mapping code in Play to convert a Akka future to a Play future.
This is shown in this code:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: innerRegexGET
In this snippet we check the result to decide what type of response we want to send to the calling client.
Using HTTP POST
---------------
Okay, in the sections above we have shown you how to use play-mini for HTTP GET calls. Let's move on to when the user
posts values to the application.
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: asyncDepositPOST
As you can see the structure is almost the same as for the :ref:`Advanced-GET`. The difference is that we make the
``request`` parameter ``implicit`` and also that the following line of code is used to extract parameters from the POST.
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: formAsyncDepositPOST
The code snippet used to map the call to parameters looks like this:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: form
Apart from the mapping of parameters the call to the actor looks is done the same as in :ref:`Advanced-GET`.
The Complete Code Sample
------------------------
Below is the complete application in all its beauty.
Global.scala (<yourApp>/src/main/scala/Global.scala):
.. includecode:: code/Global.scala
PlayMiniApplication.scala (<yourApp>/src/main/scala/akka/docs/http/PlayMiniApplication.scala):
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
Build.scala (<yourApp>/project/Build.scala):
.. code-block:: scala
import sbt._
import Keys._
object PlayMiniApplicationBuild extends Build {
lazy val root = Project(id = "play-mini-application", base = file("."), settings = Project.defaultSettings).settings(
libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC3-SNAPSHOT",
mainClass in (Compile, run) := Some("play.core.server.NettyServer"))
}
Running the Application
-----------------------
Firstly, start up the application by opening a command terminal and type::
> sbt
> run
Now you should see something similar to this in your terminal window::
[info] Running play.core.server.NettyServer
Play server process ID is 2523
[info] play - Application started (Prod)
[info] play - Listening for HTTP on port 9000...
In this example we will use the awesome `cURL <http://en.wikipedia.org/wiki/CURL>`_ command to interact with the application.
Fire up a command terminal and try the application out::
First we check the status of a couple of accounts:
> curl http://localhost:9000/account/statement/TheDudesAccount
Unknown account: TheDudesAccount
> curl http://localhost:9000/account/statement/MrLebowskisAccount
Unknown account: MrLebowskisAccount
Now deposit some money to the accounts:
> curl -d "accountId=TheDudesAccount&amount=1000" http://localhost:9000/account/deposit
Updated account total: 1000
> curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/deposit
Updated account total: 500
Next thing is to check the status of the account:
> curl http://localhost:9000/account/statement/TheDudesAccount
Account total: 1000
> curl http://localhost:9000/account/statement/MrLebowskisAccount
Account total: 500
Fair enough, let's try to withdraw some cash shall we:
> curl -d "accountId=TheDudesAccount&amount=999" http://localhost:9000/account/withdraw
Updated account total: 1
> curl -d "accountId=MrLebowskisAccount&amount=999" http://localhost:9000/account/withdraw
Unknown account or insufficient funds. Get your act together.
> curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/withdraw
Updated account total: 0
Yeah, it works!
Now we leave it to the astute reader of this document to take advantage of the power of play-mini and Akka.
libraryDependencies += "com.typesafe" %% "play-mini" % "<version-number>"

View file

@ -19,6 +19,12 @@ be found here:
Release Versions
================
1.3.1
---
- Akka 1.3.1 - http://akka.io/docs/akka/1.3.1/ (or in `PDF format <http://akka.io/docs/akka/1.3.1/Akka.pdf>`__)
- Akka Modules 1.3.1 - http://akka.io/docs/akka-modules/1.3.1/ (or in `PDF format <http://akka.io/docs/akka-modules/1.3.1/AkkaModules.pdf>`__)
1.2
---

View file

@ -6,15 +6,10 @@ package akka.docs.dispatcher
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.actor.Actor
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.util.duration._
import akka.actor.PoisonPill
import akka.dispatch.MessageDispatcherConfigurator
import akka.dispatch.MessageDispatcher
import akka.dispatch.DispatcherPrerequisites
import akka.actor.{ Props, Actor, PoisonPill, ActorSystem }
object DispatcherDocSpec {
val config = """
@ -33,8 +28,9 @@ object DispatcherDocSpec {
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 10
}
# Throughput defines the number of messages that are processed in a batch before the
# thread is returned to the pool. Set to 1 for as fair as possible.
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
//#my-dispatcher-config
@ -54,8 +50,9 @@ object DispatcherDocSpec {
# maximum number of threads to cap factor-based number to
core-pool-size-max = 10
}
# Throughput defines the number of messages that are processed in a batch before the
# thread is returned to the pool. Set to 1 for as fair as possible.
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 100
}
//#my-thread-pool-dispatcher-config
@ -94,13 +91,14 @@ object DispatcherDocSpec {
//#prio-dispatcher-config
prio-dispatcher {
mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$PrioMailbox"
mailbox-type = "akka.docs.dispatcher.DispatcherDocSpec$MyPrioMailbox"
}
//#prio-dispatcher-config
//#prio-dispatcher-config-java
prio-dispatcher-java {
mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$PrioMailbox"
mailbox-type = "akka.docs.dispatcher.DispatcherDocTestBase$MyPrioMailbox"
//Other dispatcher configuration goes here
}
//#prio-dispatcher-config-java
"""
@ -108,17 +106,24 @@ object DispatcherDocSpec {
//#prio-mailbox
import akka.dispatch.PriorityGenerator
import akka.dispatch.UnboundedPriorityMailbox
import akka.dispatch.MailboxType
import akka.actor.ActorContext
import com.typesafe.config.Config
// We create a new Priority dispatcher and seed it with the priority generator
class PrioMailbox(config: Config) extends UnboundedPriorityMailbox(
PriorityGenerator { // Create a new PriorityGenerator, lower prio means more important
case 'highpriority 0 // 'highpriority messages should be treated first if possible
case 'lowpriority 100 // 'lowpriority messages should be treated last if possible
case PoisonPill 1000 // PoisonPill when no other left
case otherwise 50 // We default to 50
// We inherit, in this case, from UnboundedPriorityMailbox
// and seed it with the priority generator
class MyPrioMailbox(settings: ActorSystem.Settings, config: Config) extends UnboundedPriorityMailbox(
// Create a new PriorityGenerator, lower prio means more important
PriorityGenerator {
// 'highpriority messages should be treated first if possible
case 'highpriority 0
// 'lowpriority messages should be treated last if possible
case 'lowpriority 2
// PoisonPill when no other left
case PoisonPill 3
// We default to 1, which is in between high and low
case otherwise 1
})
//#prio-mailbox
@ -127,6 +132,29 @@ object DispatcherDocSpec {
case x
}
}
//#mailbox-implementation-example
case class MyUnboundedMailbox() extends akka.dispatch.MailboxType {
import akka.actor.ActorContext
import com.typesafe.config.Config
import java.util.concurrent.ConcurrentLinkedQueue
import akka.dispatch.{
Envelope,
MessageQueue,
QueueBasedMessageQueue,
UnboundedMessageQueueSemantics
}
// This constructor signature must exist, it will be called by Akka
def this(settings: ActorSystem.Settings, config: Config) = this()
// The create method is called to create the MessageQueue
final override def create(owner: Option[ActorContext]): MessageQueue =
new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
//#mailbox-implementation-example
}
}
class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
@ -134,10 +162,11 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
import DispatcherDocSpec.MyActor
"defining dispatcher" in {
val context = system
//#defining-dispatcher
import akka.actor.Props
val myActor1 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor1")
val myActor2 = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor2")
val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), "myactor1")
//#defining-dispatcher
}
@ -146,15 +175,18 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
}
"defining pinned dispatcher" in {
val context = system
//#defining-pinned-dispatcher
val myActor = system.actorOf(Props[MyActor].withDispatcher("my-dispatcher"), name = "myactor")
val myActor =
context.actorOf(Props[MyActor].withDispatcher("my-pinned-dispatcher"), "myactor2")
//#defining-pinned-dispatcher
}
"defining priority dispatcher" in {
//#prio-dispatcher
val a = system.actorOf( // We create a new Actor that just prints out what it processes
// We create a new Actor that just prints out what it processes
val a = system.actorOf(
Props(new Actor {
val log: LoggingAdapter = Logging(context.system, this)

View file

@ -6,7 +6,7 @@ package akka.docs.remoting
import akka.actor.{ ExtendedActorSystem, ActorSystem, Actor, ActorRef }
import akka.testkit.{ AkkaSpec, ImplicitSender }
//#import
import akka.actor.{ Props, Deploy, Address, AddressExtractor }
import akka.actor.{ Props, Deploy, Address, AddressFromURIString }
import akka.remote.RemoteScope
//#import
@ -43,7 +43,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec("""
"demonstrate address extractor" in {
//#make-address
val one = AddressExtractor("akka://sys@host:1234")
val one = AddressFromURIString("akka://sys@host:1234")
val two = Address("akka", "sys", "host", 1234) // this gives the same
//#make-address
one must be === two

View file

@ -42,10 +42,10 @@ object RoutingProgrammaticallyExample extends App {
1 to 6 foreach { i router3 ! Message1(i) }
//#remoteRoutees
import akka.actor.{ Address, AddressExtractor }
import akka.actor.{ Address, AddressFromURIString }
val addresses = Seq(
Address("akka", "remotesys", "otherhost", 1234),
AddressExtractor("akka://othersys@anotherhost:1234"))
AddressFromURIString("akka://othersys@anotherhost:1234"))
val routerRemote = system.actorOf(Props[ExampleActor1].withRouter(
RemoteRouterConfig(RoundRobinRouter(5), addresses)))
//#remoteRoutees

View file

@ -26,12 +26,9 @@ Scala's Delimited Continuations plugin is required to use the Dataflow API. To e
.. code-block:: scala
import sbt._
class MyAkkaProject(info: ProjectInfo) extends DefaultProject(info) with AkkaProject with AutoCompilerPlugins {
val continuationsPlugin = compilerPlugin("org.scala-lang.plugins" % "continuations" % "2.9.1")
override def compileOptions = super.compileOptions ++ compileOptions("-P:continuations:enable")
}
autoCompilerPlugins := true,
libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % <scalaVersion>) },
scalacOptions += "-P:continuations:enable",
Dataflow Variables
------------------
@ -117,7 +114,7 @@ To run these examples:
::
Welcome to Scala version 2.9.0 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_25).
Welcome to Scala version 2.9.1 (Java HotSpot(TM) 64-Bit Server VM, Java 1.6.0_25).
Type in expressions to have them evaluated.
Type :help for more information.

View file

@ -7,202 +7,176 @@ Dispatchers (Scala)
.. contents:: :local:
The Dispatcher is an important piece that allows you to configure the right semantics and parameters for optimal performance, throughput and scalability. Different Actors have different needs.
Akka supports dispatchers for both event-driven lightweight threads, allowing creation of millions of threads on a single workstation, and thread-based Actors, where each dispatcher is bound to a dedicated OS thread.
The event-based Actors currently consume ~600 bytes per Actor which means that you can create more than 6.5 million Actors on 4 GB RAM.
An Akka ``MessageDispatcher`` is what makes Akka Actors "tick", it is the engine of the machine so to speak.
All ``MessageDispatcher`` implementations are also an ``ExecutionContext``, which means that they can be used
to execute arbitrary code, for instance :ref:`futures-scala`.
Default dispatcher
------------------
For most scenarios the default settings are the best. Here we have one single event-based dispatcher for all Actors created.
The default dispatcher is available from the ``ActorSystem.dispatcher`` and can be configured in the ``akka.actor.default-dispatcher``
section of the :ref:`configuration`.
Every ``ActorSystem`` will have a default dispatcher that will be used in case nothing else is configured for an ``Actor``.
The default dispatcher can be configured, and is by default a ``Dispatcher`` with a "fork-join-executor", which gives excellent performance in most cases.
If you are starting to get contention on the single dispatcher (the ``Executor`` and its queue) or want to group a specific set of Actors
for a dedicated dispatcher for better flexibility and configurability then you can override the defaults and define your own dispatcher.
See below for details on which ones are available and how they can be configured.
Setting the dispatcher for an Actor
-----------------------------------
.. warning::
Try to stick to a sensible default dispatcher, that means avoid using CallingThreadDispatcher, BalancingDispatcher or PinnedDispatcher
as the default-dispatcher. This is because they have very specific requirements from the environment in which they are used.
So in case you want to give your ``Actor`` a different dispatcher than the default, you need to do two things, of which the first is:
Setting the dispatcher
----------------------
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-dispatcher
You specify the id of the dispatcher to use when creating an actor. The id corresponds to the :ref:`configuration` key
of the dispatcher settings.
.. note::
The "dispatcherId" you specify in withDispatcher is in fact a path into your configuration.
So in this example it's a top-level section, but you could for instance put it as a sub-section,
where you'd use periods to denote sub-sections, like this: ``"foo.bar.my-dispatcher"``
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
:include: imports,defining-dispatcher
And then you just need to configure that dispatcher in your configuration:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
And here's another example that uses the "thread-pool-executor":
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
For more options, see the default-dispatcher section of the :ref:`configuration`.
Types of dispatchers
--------------------
There are 4 different types of message dispatchers:
* Thread-based (Pinned)
* Event-based
* Priority event-based
* Work-sharing (Balancing)
* Dispatcher
It is recommended to define the dispatcher in :ref:`configuration` to allow for tuning for different environments.
- Sharability: Unlimited
Example of a custom event-based dispatcher, which can be used with ``Props[MyActor].withDispatcher("my-dispatcher")``
as in the example above:
- Mailboxes: Any, creates one per Actor
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-dispatcher-config
- Use cases: Default dispatcher, Bulkheading
Default values are taken from ``default-dispatcher``, i.e. all options doesn't need to be defined. See
:ref:`configuration` for the default values of the ``default-dispatcher``. You can also override
the values for the ``default-dispatcher`` in your configuration.
- Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
* PinnedDispatcher
- Sharability: None
- Mailboxes: Any, creates one per Actor
- Use cases: Bulkheading
- Driven by: Any ``akka.dispatch.ThreadPoolExecutorConfigurator``
by default a "thread-pool-executor"
* BalancingDispatcher
- Sharability: Actors of the same type only
- Mailboxes: Any, creates one for all Actors
- Use cases: Work-sharing
- Driven by: ``java.util.concurrent.ExecutorService``
specify using "executor" using "fork-join-executor",
"thread-pool-executor" or the FQCN of
an ``akka.dispatcher.ExecutorServiceConfigurator``
* CallingThreadDispatcher
- Sharability: Unlimited
- Mailboxes: Any, creates one per Actor per Thread (on demand)
- Use cases: Testing
- Driven by: The calling thread (duh)
More dispatcher configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Configuring a ``PinnedDispatcher``:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
And then using it:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#defining-pinned-dispatcher
Mailboxes
---------
An Akka ``Mailbox`` holds the messages that are destined for an ``Actor``.
Normally each ``Actor`` has its own mailbox, but with example a ``BalancingDispatcher`` all actors with the same ``BalancingDispatcher`` will share a single instance.
Builtin implementations
^^^^^^^^^^^^^^^^^^^^^^^
Akka comes shipped with a number of default mailbox implementations:
* UnboundedMailbox
- Backed by a ``java.util.concurrent.ConcurrentLinkedQueue``
- Blocking: No
- Bounded: No
* BoundedMailbox
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
- Blocking: Yes
- Bounded: Yes
* UnboundedPriorityMailbox
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
- Blocking: Yes
- Bounded: No
* BoundedPriorityMailbox
- Backed by a ``java.util.PriorityBlockingQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Blocking: Yes
- Bounded: Yes
* Durable mailboxes, see :ref:`durable-mailboxes`.
Mailbox configuration examples
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
How to create a PriorityMailbox:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-mailbox
And then add it to the configuration:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher-config
And then an example on how you would use it:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#prio-dispatcher
Creating your own Mailbox type
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
An example is worth a thousand quacks:
.. includecode:: ../scala/code/akka/docs/dispatcher/DispatcherDocSpec.scala#mailbox-implementation-example
And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration.
.. note::
It should be noted that the ``dispatcher-id`` used in :class:`Props` is in
fact an absolute path into the configuration object, i.e. you can declare a
dispatcher configuration nested within other configuration objects and refer
to it like so: ``"my.config.object.myAwesomeDispatcher"``
There are two different executor services:
* executor = "fork-join-executor", ``ExecutorService`` based on ForkJoinPool (jsr166y). This is used by default for
``default-dispatcher``.
* executor = "thread-pool-executor", ``ExecutorService`` based on ``java.util.concurrent.ThreadPoolExecutor``.
Note that the pool size is configured differently for the two executor services. The configuration above
is an example for ``fork-join-executor``. Below is an example for ``thread-pool-executor``:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-thread-pool-dispatcher-config
Let's now walk through the different dispatchers in more detail.
Thread-based
^^^^^^^^^^^^
The ``PinnedDispatcher`` binds a dedicated OS thread to each specific Actor. The messages are posted to a
`LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
which feeds the messages to the dispatcher one by one. A ``PinnedDispatcher`` cannot be shared between actors. This dispatcher
has worse performance and scalability than the event-based dispatcher but works great for creating "daemon" Actors that consumes
a low frequency of messages and are allowed to go off and do their own thing for a longer period of time. Another advantage with
this dispatcher is that Actors do not block threads for each other.
The ``PinnedDispatcher`` is configured like this:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-pinned-dispatcher-config
Note that it must be used with ``executor = "thread-pool-executor"``.
Event-based
^^^^^^^^^^^
The event-based ``Dispatcher`` binds a set of Actors to a thread pool backed up by a
`BlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html>`_. This dispatcher is highly configurable
and supports a fluent configuration API to configure the ``BlockingQueue`` (type of queue, max items etc.) as well as the thread pool.
The event-driven dispatchers **must be shared** between multiple Actors. One best practice is to let each top-level Actor, e.g.
the Actors you create from ``system.actorOf`` to get their own dispatcher but reuse the dispatcher for each new Actor
that the top-level Actor creates. But you can also share dispatcher between multiple top-level Actors. This is very use-case specific
and needs to be tried out on a case by case basis. The important thing is that Akka tries to provide you with the freedom you need to
design and implement your system in the most efficient way in regards to performance, throughput and latency.
It comes with many different predefined BlockingQueue configurations:
* Bounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Unbounded `LinkedBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/LinkedBlockingQueue.html>`_
* Bounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* Unbounded `ArrayBlockingQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ArrayBlockingQueue.html>`_
* `SynchronousQueue <http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html>`_
When using a bounded queue and it has grown up to limit defined the message processing will run in the caller's
thread as a way to slow him down and balance producer/consumer.
Here is an example of a bounded mailbox:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
The standard :class:`Dispatcher` allows you to define the ``throughput`` it
should have, as shown above. This defines the number of messages for a specific
Actor the dispatcher should process in one single sweep; in other words, the
dispatcher will batch process up to ``throughput`` messages together when
having elected an actor to run. Setting this to a higher number will increase
throughput but lower fairness, and vice versa. If you don't specify it explicitly
then it uses the value (5) defined for ``default-dispatcher`` in the :ref:`configuration`.
Browse the `ScalaDoc <scaladoc>`_ or look at the code for all the options available.
Priority event-based
^^^^^^^^^^^^^^^^^^^^
Sometimes it's useful to be able to specify priority order of messages, that is done by using Dispatcher and supply
an UnboundedPriorityMailbox or BoundedPriorityMailbox with a ``java.util.Comparator[Envelope]`` or use a
``akka.dispatch.PriorityGenerator`` (recommended).
Creating a Dispatcher with a mailbox using PriorityGenerator:
Config:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
:include: prio-dispatcher-config
Priority mailbox:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
:include: prio-mailbox
Usage:
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala
:include: prio-dispatcher
Work-sharing event-based
^^^^^^^^^^^^^^^^^^^^^^^^^
The ``BalancingDispatcher`` is a variation of the ``Dispatcher`` in which Actors of the same type can be set up to
share this dispatcher and during execution time the different actors will steal messages from other actors if they
have less messages to process.
Although the technique used in this implementation is commonly known as "work stealing", the actual implementation is probably
best described as "work donating" because the actor of which work is being stolen takes the initiative.
This can be a great way to improve throughput at the cost of a little higher latency.
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-balancing-config
Here is an article with some more information: `Load Balancing Actors with Work Stealing Techniques <http://janvanbesien.blogspot.com/2010/03/load-balancing-actors-with-work.html>`_
Here is another article discussing this particular dispatcher: `Flexible load balancing with Akka in Scala <http://vasilrem.com/blog/software-development/flexible-load-balancing-with-akka-in-scala/>`_
Making the Actor mailbox bounded
--------------------------------
Global configuration
^^^^^^^^^^^^^^^^^^^^
You can make the Actor mailbox bounded by a capacity in two ways. Either you define it in the :ref:`configuration` file under
``default-dispatcher``. This will set it globally as default for the DefaultDispatcher and for other configured dispatchers,
if not specified otherwise.
.. code-block:: ruby
akka {
actor {
default-dispatcher {
# If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set to the number specified
mailbox-capacity = 1000
}
}
}
Per-instance based configuration
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
You can also do it on a specific dispatcher instance.
.. includecode:: code/akka/docs/dispatcher/DispatcherDocSpec.scala#my-bounded-config
For the ``PinnedDispatcher``, it is non-shareable between actors, and associates a dedicated Thread with the actor.
Making it bounded (by specifying a capacity) is optional, but if you do, you need to provide a pushTimeout (default is 10 seconds).
When trying to send a message to the Actor it will throw a MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
if the message cannot be added to the mailbox within the time specified by the pushTimeout.
Make sure to include a constructor which takes
``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config``
arguments, as this constructor is invoked reflectively to construct your
mailbox type. The config passed in as second argument is that section from
the configuration which describes the dispatcher using this mailbox type; the
mailbox type will be instantiated once for each dispatcher using it.

View file

@ -9,6 +9,17 @@
.. contents:: :local:
If you want to add features to Akka, there is a very elegant, but powerful mechanism for doing so.
It's called Akka Extensions and is comprised of 2 basic components: an ``Extension`` and an ``ExtensionId``.
Extensions will only be loaded once per ``ActorSystem``, which will be managed by Akka.
You can choose to have your Extension loaded on-demand or at ``ActorSystem`` creation time through the Akka configuration.
Details on how to make that happens are below, in the "Loading from Configuration" section.
.. warning::
Since an extension is a way to hook into Akka itself, the implementor of the extension needs to
ensure the thread safety of his/her extension.
Building an Extension
=====================

View file

@ -181,6 +181,16 @@ all messages that are not ``MethodCall``s will be passed into the ``onReceive``-
This allows you to react to DeathWatch ``Terminated``-messages and other types of messages,
e.g. when interfacing with untyped actors.
Proxying
--------
You can use the ``typedActorOf`` that takes a TypedProps and an ActorRef to proxy the given ActorRef as a TypedActor.
This is usable if you want to communicate remotely with TypedActors on other machines, just look them up with ``actorFor`` and pass the ``ActorRef`` to ``typedActorOf``.
.. note::
The ActorRef needs to accept ``MethodCall`` messages.
Supercharging
-------------

View file

@ -14,12 +14,14 @@ import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
import akka.actor.ActorSystem
class BeanstalkBasedMailboxException(message: String) extends AkkaException(message) {}
class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
class BeanstalkBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
private val settings = new BeanstalkMailboxSettings(systemSettings, config)
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new BeanstalkBasedMessageQueue(o)
case Some(o) new BeanstalkBasedMessageQueue(o, settings)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
@ -27,9 +29,8 @@ class BeanstalkBasedMailboxType(config: Config) extends MailboxType {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BeanstalkBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class BeanstalkBasedMessageQueue(_owner: ActorContext, val settings: BeanstalkMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt
private val messageTimeToLiveSeconds = settings.MessageTimeToLive.toSeconds.toInt

View file

@ -1,28 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object BeanstalkBasedMailboxExtension extends ExtensionId[BeanstalkMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): BeanstalkMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new BeanstalkMailboxSettings(system.settings.config)
}
class BeanstalkMailboxSettings(val config: Config) extends Extension {
import config._
val Hostname = getString("akka.actor.mailbox.beanstalk.hostname")
val Port = getInt("akka.actor.mailbox.beanstalk.port")
val ReconnectWindow = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS)
val MessageSubmitDelay = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS)
val MessageSubmitTimeout = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS)
val MessageTimeToLive = Duration(getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS)
}

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class BeanstalkMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
extends DurableMailboxSettings {
def name = "beanstalk"
val config = initialize
import config._
val Hostname = getString("hostname")
val Port = getInt("port")
val ReconnectWindow = Duration(getMilliseconds("reconnect-window"), MILLISECONDS)
val MessageSubmitDelay = Duration(getMilliseconds("message-submit-delay"), MILLISECONDS)
val MessageSubmitTimeout = Duration(getMilliseconds("message-submit-timeout"), MILLISECONDS)
val MessageTimeToLive = Duration(getMilliseconds("message-time-to-live"), MILLISECONDS)
}

View file

@ -5,9 +5,26 @@ object BeanstalkBasedMailboxSpec {
Beanstalkd-dispatcher {
mailbox-type = akka.actor.mailbox.BeanstalkBasedMailboxType
throughput = 1
beanstalk {
hostname = "127.0.0.1"
port = 11400
}
}
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config)
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkBasedMailboxSpec.config) {
lazy val beanstalkd = new ProcessBuilder("beanstalkd", "-b", "beanstalk", "-l", "127.0.0.1", "-p", "11400").start()
override def atStartup(): Unit = {
new java.io.File("beanstalk").mkdir()
beanstalkd
Thread.sleep(3000)
}
override def atTermination(): Unit = beanstalkd.destroy()
}

View file

@ -13,19 +13,20 @@ import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.util.NonFatal
import akka.config.ConfigurationException
import akka.actor.ActorSystem
class FileBasedMailboxType(config: Config) extends MailboxType {
class FileBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
private val settings = new FileBasedMailboxSettings(systemSettings, config)
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new FileBasedMessageQueue(o)
case Some(o) new FileBasedMessageQueue(o, settings)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class FileBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class FileBasedMessageQueue(_owner: ActorContext, val settings: FileBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
val log = Logging(system, "FileBasedMessageQueue")
private val settings = FileBasedMailboxExtension(owner.system)
val queuePath = settings.QueuePath
private val queue = try {

View file

@ -1,35 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object FileBasedMailboxExtension extends ExtensionId[FileBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): FileBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new FileBasedMailboxSettings(system.settings.config)
}
class FileBasedMailboxSettings(val config: Config) extends Extension {
import config._
val QueuePath = getString("akka.actor.mailbox.file-based.directory-path")
val MaxItems = getInt("akka.actor.mailbox.file-based.max-items")
val MaxSize = getBytes("akka.actor.mailbox.file-based.max-size")
val MaxItemSize = getBytes("akka.actor.mailbox.file-based.max-item-size")
val MaxAge = Duration(getMilliseconds("akka.actor.mailbox.file-based.max-age"), MILLISECONDS)
val MaxJournalSize = getBytes("akka.actor.mailbox.file-based.max-journal-size")
val MaxMemorySize = getBytes("akka.actor.mailbox.file-based.max-memory-size")
val MaxJournalOverflow = getInt("akka.actor.mailbox.file-based.max-journal-overflow")
val MaxJournalSizeAbsolute = getBytes("akka.actor.mailbox.file-based.max-journal-size-absolute")
val DiscardOldWhenFull = getBoolean("akka.actor.mailbox.file-based.discard-old-when-full")
val KeepJournal = getBoolean("akka.actor.mailbox.file-based.keep-journal")
val SyncJournal = getBoolean("akka.actor.mailbox.file-based.sync-journal")
}

View file

@ -0,0 +1,34 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class FileBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
extends DurableMailboxSettings {
def name = "file-based"
val config = initialize
import config._
val QueuePath = getString("directory-path")
val MaxItems = getInt("max-items")
val MaxSize = getBytes("max-size")
val MaxItemSize = getBytes("max-item-size")
val MaxAge = Duration(getMilliseconds("max-age"), MILLISECONDS)
val MaxJournalSize = getBytes("max-journal-size")
val MaxMemorySize = getBytes("max-memory-size")
val MaxJournalOverflow = getInt("max-journal-overflow")
val MaxJournalSizeAbsolute = getBytes("max-journal-size-absolute")
val DiscardOldWhenFull = getBoolean("discard-old-when-full")
val KeepJournal = getBoolean("keep-journal")
val SyncJournal = getBoolean("sync-journal")
}

View file

@ -1,12 +1,14 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import com.typesafe.config.ConfigFactory
object FileBasedMailboxSpec {
val config = """
File-dispatcher {
mailbox-type = akka.actor.mailbox.FileBasedMailboxType
throughput = 1
file-based.directory-path = "file-based"
}
"""
}
@ -14,8 +16,15 @@ object FileBasedMailboxSpec {
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileBasedMailboxSpec.config) {
val queuePath = new FileBasedMailboxSettings(system.settings, system.settings.config.getConfig("File-dispatcher")).QueuePath
"FileBasedMailboxSettings" must {
"read the file-based section" in {
queuePath must be("file-based")
}
}
def clean {
val queuePath = FileBasedMailboxExtension(system).QueuePath
FileUtils.deleteDirectory(new java.io.File(queuePath))
}

View file

@ -7,6 +7,8 @@ import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem }
import akka.dispatch.{ Envelope, MessageQueue }
import akka.remote.MessageSerializer
import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol }
import com.typesafe.config.Config
import akka.actor.ActorSystem
private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
@ -50,3 +52,55 @@ trait DurableMessageSerialization { this: DurableMessageQueue ⇒
}
/**
* Conventional organization of durable mailbox settings:
*
* {{{
* my-durable-dispatcher {
* mailbox-type = "my.durable.mailbox"
* my-durable-mailbox {
* setting1 = 1
* setting2 = 2
* }
* }
* }}}
*
* where name=my-durable-mailbox in this example.
*/
trait DurableMailboxSettings {
/**
* A reference to the enclosing actor system.
*/
def systemSettings: ActorSystem.Settings
/**
* A reference to the config section which the user specified for this mailboxs dispatcher.
*/
def userConfig: Config
/**
* The extracted config section for this mailbox, which is the name
* section (if that exists), falling back to system defaults. Typical
* implementation looks like:
*
* {{{
* val config = initialize
* }}}
*/
def config: Config
/**
* Name of this mailbox type for purposes of configuration scoping. Reference
* defaults go into akka.actor.mailbox.<name>.
*/
def name: String
/**
* Obtain default extracted mailbox config section from userConfig and system.
*/
def initialize: Config =
if (userConfig.hasPath(name))
userConfig.getConfig(name).withFallback(systemSettings.config.getConfig("akka.actor.mailbox." + name))
else systemSettings.config.getConfig("akka.actor.mailbox." + name)
}

View file

@ -11,6 +11,9 @@ import akka.dispatch.Await
import akka.testkit.AkkaSpec
import akka.testkit.TestLatch
import akka.util.duration._
import java.io.InputStream
import scala.annotation.tailrec
import com.typesafe.config.Config
object DurableMailboxSpecActorFactory {
@ -31,6 +34,26 @@ object DurableMailboxSpecActorFactory {
abstract class DurableMailboxSpec(val backendName: String, config: String) extends AkkaSpec(config) {
import DurableMailboxSpecActorFactory._
protected def streamMustContain(in: InputStream, words: String): Unit = {
val output = new Array[Byte](8192)
def now = System.currentTimeMillis
def string(len: Int) = new String(output, 0, len, "ISO-8859-1") // dont want parse errors
@tailrec def read(end: Int = 0, start: Long = now): Int =
in.read(output, end, output.length - end) match {
case -1 end
case x
val next = end + x
if (string(next).contains(words) || now - start > 10000 || next == output.length) next
else read(next, start)
}
val result = string(read())
if (!result.contains(words)) throw new Exception("stream did not contain '" + words + "':\n" + result)
}
def createMailboxTestActor(id: String): ActorRef =
system.actorOf(Props(new MailboxTestActor).withDispatcher(backendName + "-dispatcher"))

View file

@ -16,12 +16,14 @@ import akka.dispatch.MailboxType
import com.typesafe.config.Config
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
import akka.actor.ActorSystem
class MongoBasedMailboxException(message: String) extends AkkaException(message)
class MongoBasedMailboxType(config: Config) extends MailboxType {
class MongoBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
private val settings = new MongoBasedMailboxSettings(systemSettings, config)
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new MongoBasedMessageQueue(o)
case Some(o) new MongoBasedMessageQueue(o, settings)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
@ -37,15 +39,13 @@ class MongoBasedMailboxType(config: Config) extends MailboxType {
*
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
*/
class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) {
class MongoBasedMessageQueue(_owner: ActorContext, val settings: MongoBasedMailboxSettings) extends DurableMessageQueue(_owner) {
// this implicit object provides the context for reading/writing things as MongoDurableMessage
implicit val mailboxBSONSer = new BSONSerializableMessageQueue(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!
private val dispatcher = owner.dispatcher
private val settings = MongoBasedMailboxExtension(owner.system)
val log = Logging(system, "MongoBasedMessageQueue")
@volatile
@ -98,9 +98,8 @@ class MongoBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_
def hasMessages: Boolean = numberOfMessages > 0
private[akka] def connect() = {
require(settings.MongoURI.isDefined, "Mongo URI (%s) must be explicitly defined in akka.conf; will not assume defaults for safety sake.".format(settings.UriConfigKey))
log.info("CONNECTING mongodb uri : [{}]", settings.MongoURI)
val _dbh = MongoConnection.fromURI(settings.MongoURI.get) match {
val _dbh = MongoConnection.fromURI(settings.MongoURI) match {
case (conn, None, None) {
throw new UnsupportedOperationException("You must specify a database name to use with MongoDB; please see the MongoDB Connection URI Spec: 'http://www.mongodb.org/display/DOCS/Connections'")
}

View file

@ -1,26 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object MongoBasedMailboxExtension extends ExtensionId[MongoBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): MongoBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new MongoBasedMailboxSettings(system.settings.config)
}
class MongoBasedMailboxSettings(val config: Config) extends Extension {
import config._
val UriConfigKey = "akka.actor.mailbox.mongodb.uri"
val MongoURI = if (config.hasPath(UriConfigKey)) Some(config.getString(UriConfigKey)) else None
val WriteTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.write"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("akka.actor.mailbox.mongodb.timeout.read"), MILLISECONDS)
}

View file

@ -0,0 +1,24 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class MongoBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
extends DurableMailboxSettings {
def name = "mongodb"
val config = initialize
import config._
val MongoURI = getString("uri")
val WriteTimeout = Duration(config.getMilliseconds("timeout.write"), MILLISECONDS)
val ReadTimeout = Duration(config.getMilliseconds("timeout.read"), MILLISECONDS)
}

View file

@ -14,6 +14,7 @@ object MongoBasedMailboxSpec {
mongodb-dispatcher {
mailbox-type = akka.actor.mailbox.MongoBasedMailboxType
throughput = 1
mongodb.uri = "mongodb://localhost:27123/akka.mailbox"
}
"""
}
@ -23,9 +24,23 @@ class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoBasedMail
import com.mongodb.async._
val mongo = MongoConnection("localhost", 27017)("akka")
lazy val mongod = new ProcessBuilder("mongod", "--dbpath", "mongoDB", "--bind_ip", "127.0.0.1", "--port", "27123").start()
lazy val mongo = MongoConnection("localhost", 27123)("akka")
override def atStartup(): Unit = {
// start MongoDB daemon
new java.io.File("mongoDB").mkdir()
val in = mongod.getInputStream
try {
streamMustContain(in, "waiting for connections on port")
mongo.dropDatabase() { success }
} catch {
case e mongod.destroy(); throw e
}
}
override def atTermination(): Unit = mongod.destroy()
}

View file

@ -14,19 +14,19 @@ import com.typesafe.config.Config
import akka.util.NonFatal
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
import akka.actor.ActorSystem
class RedisBasedMailboxException(message: String) extends AkkaException(message)
class RedisBasedMailboxType(config: Config) extends MailboxType {
class RedisBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
private val settings = new RedisBasedMailboxSettings(systemSettings, config)
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new RedisBasedMessageQueue(o)
case Some(o) new RedisBasedMessageQueue(o, settings)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class RedisBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system)
class RedisBasedMessageQueue(_owner: ActorContext, val settings: RedisBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
@volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling

View file

@ -1,21 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.actor._
object RedisBasedMailboxExtension extends ExtensionId[RedisBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): RedisBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new RedisBasedMailboxSettings(system.settings.config)
}
class RedisBasedMailboxSettings(val config: Config) extends Extension {
import config._
val Hostname = getString("akka.actor.mailbox.redis.hostname")
val Port = getInt("akka.actor.mailbox.redis.port")
}

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.actor.ActorSystem
class RedisBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
extends DurableMailboxSettings {
def name = "redis"
val config = initialize
import config._
val Hostname = getString("hostname")
val Port = getInt("port")
}

View file

@ -5,9 +5,40 @@ object RedisBasedMailboxSpec {
Redis-dispatcher {
mailbox-type = akka.actor.mailbox.RedisBasedMailboxType
throughput = 1
redis {
hostname = "127.0.0.1"
port = 6479
}
}
"""
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config)
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisBasedMailboxSpec.config) {
lazy val redisServer = new ProcessBuilder("redis-server", "-").start()
override def atStartup(): Unit = {
new java.io.File("redis").mkdir()
val out = redisServer.getOutputStream
val config = """
port 6479
bind 127.0.0.1
dir redis
""".getBytes("UTF-8")
try {
out.write(config)
out.close()
streamMustContain(redisServer.getInputStream, "ready to accept connections on port")
} catch {
case e redisServer.destroy(); throw e
}
}
override def atTermination(): Unit = redisServer.destroy()
}

View file

@ -15,19 +15,20 @@ import com.typesafe.config.Config
import akka.util.NonFatal
import akka.config.ConfigurationException
import akka.dispatch.MessageQueue
import akka.actor.ActorSystem
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
class ZooKeeperBasedMailboxType(config: Config) extends MailboxType {
class ZooKeeperBasedMailboxType(systemSettings: ActorSystem.Settings, config: Config) extends MailboxType {
private val settings = new ZooKeeperBasedMailboxSettings(systemSettings, config)
override def create(owner: Option[ActorContext]): MessageQueue = owner match {
case Some(o) new ZooKeeperBasedMessageQueue(o)
case Some(o) new ZooKeeperBasedMessageQueue(o, settings)
case None throw new ConfigurationException("creating a durable mailbox requires an owner (i.e. does not work with BalancingDispatcher)")
}
}
class ZooKeeperBasedMessageQueue(_owner: ActorContext) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
class ZooKeeperBasedMessageQueue(_owner: ActorContext, val settings: ZooKeeperBasedMailboxSettings) extends DurableMessageQueue(_owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system)
val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s"

View file

@ -1,25 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor._
object ZooKeeperBasedMailboxExtension extends ExtensionId[ZooKeeperBasedMailboxSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): ZooKeeperBasedMailboxSettings = super.get(system)
def lookup() = this
def createExtension(system: ExtendedActorSystem) = new ZooKeeperBasedMailboxSettings(system.settings.config)
}
class ZooKeeperBasedMailboxSettings(val config: Config) extends Extension {
import config._
val ZkServerAddresses = getString("akka.actor.mailbox.zookeeper.server-addresses")
val SessionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS)
val ConnectionTimeout = Duration(getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS)
val BlockingQueue = getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
}

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.mailbox
import com.typesafe.config.Config
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.ActorSystem
class ZooKeeperBasedMailboxSettings(val systemSettings: ActorSystem.Settings, val userConfig: Config)
extends DurableMailboxSettings {
def name = "zookeeper"
val config = initialize
import config._
val ZkServerAddresses = getString("server-addresses")
val SessionTimeout = Duration(getMilliseconds("session-timeout"), MILLISECONDS)
val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)
val BlockingQueue = getBoolean("blocking-queue")
}

View file

@ -5,12 +5,15 @@ import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
import akka.dispatch.MessageDispatcher
import akka.actor.ActorRef
import com.typesafe.config.ConfigFactory
import akka.util.duration._
object ZooKeeperBasedMailboxSpec {
val config = """
ZooKeeper-dispatcher {
mailbox-type = akka.actor.mailbox.ZooKeeperBasedMailboxType
throughput = 1
zookeeper.session-timeout = 30s
}
"""
}
@ -21,6 +24,12 @@ class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeepe
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"
"ZookeeperBasedMailboxSettings" must {
"read the right settings" in {
new ZooKeeperBasedMailboxSettings(system.settings, system.settings.config.getConfig("ZooKeeper-dispatcher")).SessionTimeout must be(30 seconds)
}
}
var zkServer: ZkServer = _
override def atStartup() {

View file

@ -20,10 +20,10 @@ class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extend
super.parseConfig(path, config) match {
case d @ Some(deploy)
deploy.config.getString("remote") match {
case AddressExtractor(r) Some(deploy.copy(scope = RemoteScope(r)))
case AddressFromURIString(r) Some(deploy.copy(scope = RemoteScope(r)))
case str
if (!str.isEmpty) throw new ConfigurationException("unparseable remote node name " + str)
val nodes = deploy.config.getStringList("target.nodes").asScala map (AddressExtractor(_))
val nodes = deploy.config.getStringList("target.nodes").asScala map (AddressFromURIString(_))
if (nodes.isEmpty || deploy.routerConfig == NoRouter) d
else Some(deploy.copy(routerConfig = RemoteRouterConfig(deploy.routerConfig, nodes)))
}

View file

@ -10,7 +10,7 @@ import java.net.InetAddress
import akka.config.ConfigurationException
import scala.collection.JavaConverters._
import akka.actor.Address
import akka.actor.AddressExtractor
import akka.actor.AddressFromURIString
class RemoteSettings(val config: Config, val systemName: String) {
import config._

View file

@ -5,7 +5,7 @@
package akka.remote
import scala.reflect.BeanProperty
import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressExtractor, Address, ActorSystemImpl, ActorSystem, ActorRef }
import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef }
import akka.dispatch.SystemMessage
import akka.event.{ LoggingAdapter, Logging }
import akka.AkkaException
@ -284,7 +284,7 @@ trait RemoteMarshallingOps {
case r: RemoteRef
if (provider.remoteSettings.LogReceive) log.debug("received remote-destined message {}", remoteMessage)
remoteMessage.originalReceiver match {
case AddressExtractor(address) if address == provider.transport.address
case AddressFromURIString(address) if address == provider.transport.address
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
r.!(remoteMessage.payload)(remoteMessage.sender)
case r log.error("dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address)

View file

@ -12,7 +12,7 @@ import akka.actor.InternalActorRef
import akka.actor.Props
import akka.config.ConfigurationException
import akka.remote.RemoteScope
import akka.actor.AddressExtractor
import akka.actor.AddressFromURIString
import akka.actor.SupervisorStrategy
import akka.actor.Address

View file

@ -133,7 +133,7 @@ akka.actor.deployment {
"deploy remote routers based on explicit deployment" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "remote-blub2")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote_sys@localhost:12347")))), "remote-blub2")
router.path.address.toString must be("akka://remote_sys@localhost:12347")
val replies = for (i 1 to 5) yield {
router ! ""
@ -150,7 +150,7 @@ akka.actor.deployment {
"let remote deployment be overridden by local configuration" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "local-blub")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote_sys@localhost:12347")))), "local-blub")
router.path.address.toString must be("akka://RemoteRouterSpec")
val replies = for (i 1 to 5) yield {
router ! ""
@ -167,7 +167,7 @@ akka.actor.deployment {
"let remote deployment router be overridden by local configuration" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "local-blub2")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote_sys@localhost:12347")))), "local-blub2")
router.path.address.toString must be("akka://remote_sys@localhost:12347")
val replies = for (i 1 to 5) yield {
router ! ""
@ -184,7 +184,7 @@ akka.actor.deployment {
"let remote deployment be overridden by remote configuration" in {
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "remote-override")
.withDeploy(Deploy(scope = RemoteScope(AddressFromURIString("akka://remote_sys@localhost:12347")))), "remote-override")
router.path.address.toString must be("akka://remote_sys@localhost:12347")
val replies = for (i 1 to 5) yield {
router ! ""

View file

@ -5,3 +5,4 @@
(externalResolvers in LsKeys.lsync) := Seq("Akka Repository" at "http://akka.io/repository/")
(description in LsKeys.lsync) := "Akka is the platform for the next generation of event-driven, scalable and fault-tolerant architectures on the JVM."

View file

@ -26,6 +26,7 @@ object AkkaBuild extends Build {
id = "akka",
base = file("."),
settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq(
testMailbox in GlobalScope := System.getProperty("akka.testMailbox", "false").toBoolean,
parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", "false").toBoolean,
Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository",
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
@ -141,6 +142,8 @@ object AkkaBuild extends Build {
// )
// )
val testMailbox = SettingKey[Boolean]("test-mailbox")
lazy val mailboxes = Project(
id = "akka-durable-mailboxes",
base = file("akka-durable-mailboxes"),
@ -165,8 +168,7 @@ object AkkaBuild extends Build {
dependencies = Seq(mailboxesCommon % "compile;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.beanstalkMailbox,
testBeanstalkMailbox := false,
testOptions in Test <+= testBeanstalkMailbox map { test => Tests.Filter(s => test) }
testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) }
)
)
@ -179,16 +181,13 @@ object AkkaBuild extends Build {
)
)
val testRedisMailbox = SettingKey[Boolean]("test-redis-mailbox")
lazy val redisMailbox = Project(
id = "akka-redis-mailbox",
base = file("akka-durable-mailboxes/akka-redis-mailbox"),
dependencies = Seq(mailboxesCommon % "compile;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.redisMailbox,
testRedisMailbox := false,
testOptions in Test <+= testRedisMailbox map { test => Tests.Filter(s => test) }
testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) }
)
)
@ -201,8 +200,6 @@ object AkkaBuild extends Build {
)
)
val testMongoMailbox = SettingKey[Boolean]("test-mongo-mailbox")
lazy val mongoMailbox = Project(
id = "akka-mongo-mailbox",
base = file("akka-durable-mailboxes/akka-mongo-mailbox"),
@ -210,8 +207,7 @@ object AkkaBuild extends Build {
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.mongoMailbox,
ivyXML := Dependencies.mongoMailboxExcludes,
testMongoMailbox := false,
testOptions in Test <+= testMongoMailbox map { test => Tests.Filter(s => test) }
testOptions in Test <+= testMailbox map { test => Tests.Filter(s => test) }
)
)
@ -476,7 +472,7 @@ object Dependencies {
val tutorials = Seq(Test.scalatest, Test.junit)
val docs = Seq(Test.scalatest, Test.junit, playMini)
val docs = Seq(Test.scalatest, Test.junit)
val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ)
}
@ -488,8 +484,6 @@ object Dependency {
object V {
val Camel = "2.8.0"
val Jackson = "1.8.0"
val JavaxServlet = "3.0"
val Jersey = "1.3"
val Jetty = "7.4.0.v20110414"
val Logback = "0.9.28"
val Netty = "3.3.0.Final"
@ -500,7 +494,6 @@ object Dependency {
val Slf4j = "1.6.4"
val Spring = "3.0.5.RELEASE"
val Zookeeper = "3.4.0"
val PlayMini = "2.0-RC1-SNAPSHOT"
}
// Compile
@ -537,7 +530,6 @@ object Dependency {
val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2
val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2
val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.3" // ApacheV2
val playMini = "com.typesafe" % "play-mini_2.9.1" % V.PlayMini
// Provided