Merge branch 'master' into 1495-routees-message-he

Conflicts:
	akka-actor/src/main/scala/akka/routing/Routing.scala
This commit is contained in:
Henrik Engstrom 2011-12-20 11:18:37 +01:00
commit c92f3c5ca2
57 changed files with 1078 additions and 567 deletions

View file

@ -334,7 +334,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
clientRef ! "simple"
clientRef ! "simple"
latch.await
Await.ready(latch, timeout.duration)
latch.reset
@ -343,7 +343,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
clientRef ! "simple"
clientRef ! "simple"
latch.await
Await.ready(latch, timeout.duration)
system.stop(clientRef)
system.stop(serverRef)
@ -370,7 +370,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
"restart when Kill:ed" in {
filterException[ActorKilledException] {
val latch = new CountDownLatch(2)
val latch = TestLatch(2)
val boss = system.actorOf(Props(new Actor {
@ -385,7 +385,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
}).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000)))
boss ! "sendKill"
latch.await(5, TimeUnit.SECONDS) must be === true
Await.ready(latch, 5 seconds)
}
}
}

View file

@ -8,12 +8,14 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import akka.testkit._
import TestEvent.Mute
import FSM._
import akka.util.Duration
import akka.util.duration._
import akka.event._
import com.typesafe.config.ConfigFactory
import akka.dispatch.Await
import akka.util.{ Timeout, Duration }
object FSMActorSpec {
val timeout = Timeout(2 seconds)
class Latches(implicit system: ActorSystem) {
val unlockedLatch = TestLatch()
@ -122,7 +124,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
}))
lock ! SubscribeTransitionCallBack(transitionTester)
initialStateLatch.await
Await.ready(initialStateLatch, timeout.duration)
lock ! '3'
lock ! '3'
@ -130,14 +132,14 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
lock ! '2'
lock ! '1'
unlockedLatch.await
transitionLatch.await
transitionCallBackLatch.await
lockedLatch.await
Await.ready(unlockedLatch, timeout.duration)
Await.ready(transitionLatch, timeout.duration)
Await.ready(transitionCallBackLatch, timeout.duration)
Await.ready(lockedLatch, timeout.duration)
EventFilter.warning(start = "unhandled event", occurrences = 1) intercept {
lock ! "not_handled"
unhandledLatch.await
Await.ready(unhandledLatch, timeout.duration)
}
val answerLatch = TestLatch()
@ -151,10 +153,10 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
}
}))
tester ! Hello
answerLatch.await
Await.ready(answerLatch, timeout.duration)
tester ! Bye
terminatedLatch.await
Await.ready(terminatedLatch, timeout.duration)
}
"log termination" in {
@ -186,7 +188,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
}
}
val ref = system.actorOf(Props(fsm))
started.await
Await.ready(started, timeout.duration)
system.stop(ref)
expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null))
}

View file

@ -188,7 +188,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val started = TestLatch(1)
val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8064, ioManager, started)))
started.await
Await.ready(started, timeout.duration)
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8064, ioManager)))
val f1 = client ? ByteString("Hello World!1")
val f2 = client ? ByteString("Hello World!2")
@ -205,7 +205,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val started = TestLatch(1)
val ioManager = system.actorOf(Props(new IOManager()))
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8065, ioManager, started)))
started.await
Await.ready(started, timeout.duration)
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8065, ioManager)))
val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString))
@ -219,7 +219,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val started = TestLatch(1)
val ioManager = system.actorOf(Props(new IOManager(2)))
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8066, ioManager, started)))
started.await
Await.ready(started, timeout.duration)
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8066, ioManager)))
val list = List.range(0, 1000)
val f = Future.traverse(list)(i client ? ByteString(i.toString))
@ -233,7 +233,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
val started = TestLatch(1)
val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer
val server = system.actorOf(Props(new KVStore("localhost", 8067, ioManager, started)))
started.await
Await.ready(started, timeout.duration)
val client1 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager)))
val client2 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager)))
val f1 = client1 ? (('set, "hello", ByteString("World")))

View file

@ -8,6 +8,8 @@ import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Await
import java.util.concurrent.TimeoutException
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ReceiveTimeoutSpec extends AkkaSpec {
@ -25,7 +27,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
}
}))
timeoutLatch.await
Await.ready(timeoutLatch, TestLatch.DefaultTimeout)
system.stop(timeoutActor)
}
@ -44,7 +46,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
timeoutActor ! Tick
timeoutLatch.await
Await.ready(timeoutLatch, TestLatch.DefaultTimeout)
system.stop(timeoutActor)
}
@ -67,7 +69,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
timeoutActor ! Tick
timeoutLatch.await
Await.ready(timeoutLatch, TestLatch.DefaultTimeout)
count.get must be(1)
system.stop(timeoutActor)
}
@ -81,7 +83,7 @@ class ReceiveTimeoutSpec extends AkkaSpec {
}
}))
timeoutLatch.awaitTimeout(1 second) // timeout expected
intercept[TimeoutException] { Await.ready(timeoutLatch, 1 second) }
system.stop(timeoutActor)
}

View file

@ -32,7 +32,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch
val countDownLatch = new CountDownLatch(3)
val countDownLatch = new TestLatch(3)
val stopLatch = new TestLatch
val slaveProps = Props(new Actor {
@ -60,23 +60,23 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
slave ! Ping
// test restart and post restart ping
assert(restartLatch.await(10 seconds))
Await.ready(restartLatch, 10 seconds)
// now crash again... should not restart
slave ! Crash
slave ! Ping
assert(secondRestartLatch.await(10 seconds))
assert(countDownLatch.await(10, TimeUnit.SECONDS))
Await.ready(secondRestartLatch, 10 seconds)
Await.ready(countDownLatch, 10 seconds)
slave ! Crash
assert(stopLatch.await(10 seconds))
Await.ready(stopLatch, 10 seconds)
}
"ensure that slave is immortal without max restarts and time range" in {
val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None)))
val countDownLatch = new CountDownLatch(100)
val countDownLatch = new TestLatch(100)
val slaveProps = Props(new Actor {
@ -91,7 +91,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration)
(1 to 100) foreach { _ slave ! Crash }
assert(countDownLatch.await(120, TimeUnit.SECONDS))
Await.ready(countDownLatch, 2 minutes)
assert(!slave.isTerminated)
}
@ -131,14 +131,14 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
slave ! Ping
slave ! Crash
assert(restartLatch.await(10 seconds))
assert(pingLatch.await(10 seconds))
Await.ready(restartLatch, 10 seconds)
Await.ready(pingLatch, 10 seconds)
slave ! Ping
slave ! Crash
assert(secondRestartLatch.await(10 seconds))
assert(secondPingLatch.await(10 seconds))
Await.ready(secondRestartLatch, 10 seconds)
Await.ready(secondPingLatch, 10 seconds)
// sleep to go out of the restart strategy's time range
sleep(700L)
@ -147,7 +147,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
slave ! Crash
slave ! Ping
assert(thirdRestartLatch.await(1 second))
Await.ready(thirdRestartLatch, 1 second)
assert(!slave.isTerminated)
}
@ -157,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
val restartLatch = new TestLatch
val secondRestartLatch = new TestLatch
val countDownLatch = new CountDownLatch(3)
val countDownLatch = new TestLatch(3)
val stopLatch = new TestLatch
val slaveProps = Props(new Actor {
@ -184,7 +184,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
slave ! Ping
// test restart and post restart ping
assert(restartLatch.await(10 seconds))
Await.ready(restartLatch, 10 seconds)
assert(!slave.isTerminated)
@ -192,20 +192,20 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
slave ! Crash
slave ! Ping
assert(secondRestartLatch.await(10 seconds))
assert(countDownLatch.await(10, TimeUnit.SECONDS))
Await.ready(secondRestartLatch, 10 seconds)
Await.ready(countDownLatch, 10 seconds)
sleep(700L)
slave ! Crash
assert(stopLatch.await(10 seconds))
Await.ready(stopLatch, 10 seconds)
sleep(500L)
assert(slave.isTerminated)
}
"ensure that slave is not restarted within time range" in {
val restartLatch, stopLatch, maxNoOfRestartsLatch = new TestLatch
val countDownLatch = new CountDownLatch(2)
val countDownLatch = new TestLatch(2)
val boss = system.actorOf(Props(new Actor {
def receive = {
@ -236,7 +236,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
slave ! Ping
// test restart and post restart ping
assert(restartLatch.await(10 seconds))
Await.ready(restartLatch, 10 seconds)
assert(!slave.isTerminated)
@ -245,14 +245,14 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout {
// may not be running
slave ! Ping
assert(countDownLatch.await(10, TimeUnit.SECONDS))
Await.ready(countDownLatch, 10 seconds)
// may not be running
slave ! Crash
assert(stopLatch.await(10 seconds))
Await.ready(stopLatch, 10 seconds)
assert(maxNoOfRestartsLatch.await(10 seconds))
Await.ready(maxNoOfRestartsLatch, 10 seconds)
sleep(500L)
assert(slave.isTerminated)
}

View file

@ -103,7 +103,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
object Crash
val restartLatch = new TestLatch
val pingLatch = new CountDownLatch(6)
val pingLatch = new TestLatch(6)
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000)))
val props = Props(new Actor {
@ -122,13 +122,13 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash))
}
assert(restartLatch.await(2 seconds))
Await.ready(restartLatch, 2 seconds)
// should be enough time for the ping countdown to recover and reach 6 pings
assert(pingLatch.await(4, TimeUnit.SECONDS))
Await.ready(pingLatch, 5 seconds)
}
"never fire prematurely" in {
val ticks = new CountDownLatch(300)
val ticks = new TestLatch(300)
case class Msg(ts: Long)
@ -147,11 +147,11 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
Thread.sleep(5)
}
assert(ticks.await(3, TimeUnit.SECONDS) == true)
Await.ready(ticks, 3 seconds)
}
"schedule with different initial delay and frequency" in {
val ticks = new CountDownLatch(3)
val ticks = new TestLatch(3)
case object Msg
@ -162,12 +162,12 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
}))
val startTime = System.nanoTime()
val cancellable = system.scheduler.schedule(1 second, 100 milliseconds, actor, Msg)
ticks.await(3, TimeUnit.SECONDS)
val cancellable = system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg)
Await.ready(ticks, 3 seconds)
val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000
assert(elapsedTimeMs > 1200)
assert(elapsedTimeMs < 1500) // the precision is not ms exact
assert(elapsedTimeMs > 1600)
assert(elapsedTimeMs < 2000) // the precision is not ms exact
cancellable.cancel()
}
}

View file

@ -5,6 +5,7 @@ import akka.actor._
import akka.actor.Actor._
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ListenerSpec extends AkkaSpec {
@ -45,10 +46,10 @@ class ListenerSpec extends AkkaSpec {
broadcast ! WithListeners(_ ! "foo")
broadcast ! "foo"
barLatch.await
Await.ready(barLatch, TestLatch.DefaultTimeout)
barCount.get must be(2)
fooLatch.await
Await.ready(fooLatch, TestLatch.DefaultTimeout)
for (a List(broadcast, a1, a2, a3)) system.stop(a)
}

View file

@ -32,6 +32,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true)
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000)
getString("akka.actor.default-dispatcher.mailboxType") must be("")
getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000)
settings.DispatcherDefaultShutdown must equal(1 second)
getInt("akka.actor.default-dispatcher.throughput") must equal(5)

View file

@ -28,10 +28,10 @@ object FutureSpec {
class TestDelayActor(await: TestLatch) extends Actor {
def receive = {
case "Hello" await.await; sender ! "World"
case "NoReply" await.await
case "Hello" Await.ready(await, TestLatch.DefaultTimeout); sender ! "World"
case "NoReply" Await.ready(await, TestLatch.DefaultTimeout)
case "Failure"
await.await
Await.ready(await, TestLatch.DefaultTimeout)
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
}
}
@ -72,7 +72,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val latch = new TestLatch
val result = "test value"
val future = Future {
latch.await
Await.ready(latch, TestLatch.DefaultTimeout)
result
}
test(future)
@ -85,7 +85,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val latch = new TestLatch
val result = "test value"
val future = Future {
latch.await
Await.ready(latch, TestLatch.DefaultTimeout)
result
}
latch.open()
@ -395,7 +395,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val latch = new TestLatch
val actor = system.actorOf(Props[TestActor])
actor ? "Hello" onSuccess { case "World" latch.open() }
assert(latch.await(5 seconds))
Await.ready(latch, 5 seconds)
system.stop(actor)
}
@ -426,7 +426,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
intercept[ThrowableTest] { Await.result(f1, timeout.duration) }
val latch = new TestLatch
val f2 = Future { latch.await(5 seconds); "success" }
val f2 = Future { Await.ready(latch, 5 seconds); "success" }
f2 foreach (_ throw new ThrowableTest("dispatcher foreach"))
f2 onSuccess { case _ throw new ThrowableTest("dispatcher receive") }
val f3 = f2 map (s s.toUpperCase)
@ -441,7 +441,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"shouldBlockUntilResult" in {
val latch = new TestLatch
val f = Future { latch.await; 5 }
val f = Future { Await.ready(latch, 5 seconds); 5 }
val f2 = Future { Await.result(f, timeout.duration) + 5 }
intercept[TimeoutException](Await.ready(f2, 100 millis))
@ -525,8 +525,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
z() + y()
}
assert(ly.await(100 milliseconds))
lz.awaitTimeout(100 milliseconds)
Await.ready(ly, 100 milliseconds)
intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) }
flow { x << 5 }
@ -588,20 +588,20 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
lz.open()
x1() + x2()
}
assert(lx.await(2 seconds))
Await.ready(lx, 2 seconds)
assert(!ly.isOpen)
assert(!lz.isOpen)
assert(List(x1, x2, y1, y2).forall(_.isCompleted == false))
flow { y1 << 1 } // When this is set, it should cascade down the line
assert(ly.await(2 seconds))
Await.ready(ly, 2 seconds)
assert(Await.result(x1, 1 minute) === 1)
assert(!lz.isOpen)
flow { y2 << 9 } // When this is set, it should cascade down the line
assert(lz.await(2 seconds))
Await.ready(lz, 2 seconds)
assert(Await.result(x2, 1 minute) === 9)
assert(List(x1, x2, y1, y2).forall(_.isCompleted))
@ -614,16 +614,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val i1, i2, s1, s2 = new TestLatch
val callService1 = Future { i1.open(); s1.await; 1 }
val callService2 = Future { i2.open(); s2.await; 9 }
val callService1 = Future { i1.open(); Await.ready(s1, TestLatch.DefaultTimeout); 1 }
val callService2 = Future { i2.open(); Await.ready(s2, TestLatch.DefaultTimeout); 9 }
val result = flow { callService1() + callService2() }
assert(!s1.isOpen)
assert(!s2.isOpen)
assert(!result.isCompleted)
assert(i1.await(2 seconds))
assert(i2.await(2 seconds))
Await.ready(i1, 2 seconds)
Await.ready(i2, 2 seconds)
s1.open()
s2.open()
assert(Await.result(result, timeout.duration) === 10)
@ -644,10 +644,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
lz.open()
z() + y() + oops
}
ly.awaitTimeout(100 milliseconds)
lz.awaitTimeout(100 milliseconds)
intercept[TimeoutException] { Await.ready(ly, 100 milliseconds) }
intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) }
flow { x << 5 }
assert(Await.result(y, timeout.duration) === 5)
@ -662,7 +660,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
val latch = new TestLatch
val future = Future {
latch.await
Await.ready(latch, TestLatch.DefaultTimeout)
"Hello"
}
@ -745,36 +743,36 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
"run callbacks async" in {
val latch = Vector.fill(10)(new TestLatch)
val f1 = Future { latch(0).open(); latch(1).await; "Hello" }
val f2 = f1 map { s latch(2).open(); latch(3).await; s.length }
val f1 = Future { latch(0).open(); Await.ready(latch(1), TestLatch.DefaultTimeout); "Hello" }
val f2 = f1 map { s latch(2).open(); Await.ready(latch(3), TestLatch.DefaultTimeout); s.length }
f2 foreach (_ latch(4).open())
latch(0).await
Await.ready(latch(0), TestLatch.DefaultTimeout)
f1 must not be ('completed)
f2 must not be ('completed)
latch(1).open()
latch(2).await
Await.ready(latch(2), TestLatch.DefaultTimeout)
f1 must be('completed)
f2 must not be ('completed)
val f3 = f1 map { s latch(5).open(); latch(6).await; s.length * 2 }
val f3 = f1 map { s latch(5).open(); Await.ready(latch(6), TestLatch.DefaultTimeout); s.length * 2 }
f3 foreach (_ latch(3).open())
latch(5).await
Await.ready(latch(5), TestLatch.DefaultTimeout)
f3 must not be ('completed)
latch(6).open()
latch(4).await
Await.ready(latch(4), TestLatch.DefaultTimeout)
f2 must be('completed)
f3 must be('completed)
val p1 = Promise[String]()
val f4 = p1 map { s latch(7).open(); latch(8).await; s.length }
val f4 = p1 map { s latch(7).open(); Await.ready(latch(8), TestLatch.DefaultTimeout); s.length }
f4 foreach (_ latch(9).open())
p1 must not be ('completed)
@ -782,13 +780,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
p1 complete Right("Hello")
latch(7).await
Await.ready(latch(7), TestLatch.DefaultTimeout)
p1 must be('completed)
f4 must not be ('completed)
latch(8).open()
latch(9).await
Await.ready(latch(9), TestLatch.DefaultTimeout)
Await.ready(f4, timeout.duration) must be('completed)
}
@ -802,9 +800,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa
Future.blocking(system.dispatcher)
val nested = Future(())
nested foreach (_ l1.open())
l1.await // make sure nested is completed
Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed
nested foreach (_ l2.open())
l2.await
Await.ready(l2, TestLatch.DefaultTimeout)
}
Await.ready(complex, timeout.duration) must be('completed)
}

View file

@ -1,9 +1,13 @@
package akka.dispatch
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.concurrent.ConcurrentLinkedQueue
import akka.util._
import akka.util.duration._
import akka.testkit.AkkaSpec
import akka.actor.ActorRef
import akka.actor.ActorContext
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
@ -144,3 +148,26 @@ class PriorityMailboxSpec extends MailboxSpec {
case BoundedMailbox(capacity, pushTimeOut) BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null)
}
}
object CustomMailboxSpec {
val config = """
my-dispatcher {
mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox"
}
"""
class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) {
"Dispatcher configuration" must {
"support custom mailboxType" in {
val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher")
dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox])
}
}
}

View file

@ -92,8 +92,8 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
pool ! "a"
pool ! "b"
latch.await
successes.await
Await.ready(latch, TestLatch.DefaultTimeout)
Await.ready(successes, TestLatch.DefaultTimeout)
count.get must be(2)
@ -180,7 +180,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
loops = 2
loop(500)
latch.await
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
@ -189,7 +189,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
loops = 10
loop(500)
latch.await
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4)
@ -236,7 +236,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
// send a few messages and observe pool at its lower bound
loops = 3
loop(500)
latch.await
Await.ready(latch, TestLatch.DefaultTimeout)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2)
@ -245,7 +245,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
loops = 15
loop(500)
latch.await(10 seconds)
Await.ready(latch, 10 seconds)
count.get must be(loops)
Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3)
@ -278,7 +278,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
pool1 ! "a"
pool1 ! "b"
latch1.await
Await.ready(latch1, TestLatch.DefaultTimeout)
delegates.size must be(1)
system.stop(pool1)
@ -306,7 +306,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout {
pool2 ! "a"
pool2 ! "b"
latch2.await
Await.ready(latch2, TestLatch.DefaultTimeout)
delegates.size must be(2)
system.stop(pool2)

View file

@ -3,7 +3,6 @@ package akka.routing
import akka.actor._
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.testkit._
import akka.util.duration._
import akka.dispatch.Await
@ -30,8 +29,8 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
"round robin router" must {
"be able to shut down its instance" in {
val helloLatch = new CountDownLatch(5)
val stopLatch = new CountDownLatch(5)
val helloLatch = new TestLatch(5)
val stopLatch = new TestLatch(5)
val actor = system.actorOf(Props(new Actor {
def receive = {
@ -48,16 +47,16 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
actor ! "hello"
actor ! "hello"
actor ! "hello"
helloLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(stopLatch, 5 seconds)
}
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new CountDownLatch(connectionCount)
val doneLatch = new TestLatch(connectionCount)
val counter = new AtomicInteger
var replies = Map.empty[Int, Int]
@ -83,14 +82,14 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
counter.get must be(connectionCount)
actor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
replies.values foreach { _ must be(iterationCount) }
}
"deliver a broadcast message using the !" in {
val helloLatch = new CountDownLatch(5)
val stopLatch = new CountDownLatch(5)
val helloLatch = new TestLatch(5)
val stopLatch = new TestLatch(5)
val actor = system.actorOf(Props(new Actor {
def receive = {
@ -103,17 +102,17 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
}).withRouter(RoundRobinRouter(5)), "round-robin-broadcast")
actor ! Broadcast("hello")
helloLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(stopLatch, 5 seconds)
}
}
"random router" must {
"be able to shut down its instance" in {
val stopLatch = new CountDownLatch(7)
val stopLatch = new TestLatch(7)
val actor = system.actorOf(Props(new Actor {
def receive = {
@ -136,13 +135,13 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
}
system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(stopLatch, 5 seconds)
}
"deliver messages in a random fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new CountDownLatch(connectionCount)
val doneLatch = new TestLatch(connectionCount)
val counter = new AtomicInteger
var replies = Map.empty[Int, Int]
@ -168,15 +167,15 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
counter.get must be(connectionCount)
actor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
replies.values foreach { _ must be > (0) }
replies.values.sum must be === iterationCount * connectionCount
}
"deliver a broadcast message using the !" in {
val helloLatch = new CountDownLatch(6)
val stopLatch = new CountDownLatch(6)
val helloLatch = new TestLatch(6)
val stopLatch = new TestLatch(6)
val actor = system.actorOf(Props(new Actor {
def receive = {
@ -189,10 +188,10 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli
}).withRouter(RandomRouter(6)), "random-broadcast")
actor ! Broadcast("hello")
helloLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(helloLatch, 5 seconds)
system.stop(actor)
stopLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(stopLatch, 5 seconds)
}
}
}

View file

@ -105,7 +105,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
}
"send message to connection" in {
val doneLatch = new CountDownLatch(1)
val doneLatch = new TestLatch(1)
val counter = new AtomicInteger(0)
@ -120,7 +120,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
routedActor ! "hello"
routedActor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
counter.get must be(1)
}
@ -139,7 +139,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
"deliver messages in a round robin fashion" in {
val connectionCount = 10
val iterationCount = 10
val doneLatch = new CountDownLatch(connectionCount)
val doneLatch = new TestLatch(connectionCount)
//lets create some connections.
var actors = new LinkedList[ActorRef]
@ -167,7 +167,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
routedActor ! Broadcast("end")
//now wait some and do validations.
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
for (i 0 until connectionCount) {
val counter = counters.get(i).get
@ -176,7 +176,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
}
"deliver a broadcast message using the !" in {
val doneLatch = new CountDownLatch(2)
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
@ -199,7 +199,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
counter1.get must be(1)
counter2.get must be(1)
@ -214,7 +214,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
}
"deliver a broadcast message" in {
val doneLatch = new CountDownLatch(2)
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
@ -237,7 +237,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
counter1.get must be(1)
counter2.get must be(1)
@ -251,7 +251,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
}
"broadcast message using !" in {
val doneLatch = new CountDownLatch(2)
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
@ -273,14 +273,14 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
routedActor ! 1
routedActor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
counter1.get must be(1)
counter2.get must be(1)
}
"broadcast message using ?" in {
val doneLatch = new CountDownLatch(2)
val doneLatch = new TestLatch(2)
val counter1 = new AtomicInteger
val actor1 = system.actorOf(Props(new Actor {
@ -304,7 +304,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
routedActor ? 1
routedActor ! "end"
doneLatch.await(5, TimeUnit.SECONDS) must be(true)
Await.ready(doneLatch, 5 seconds)
counter1.get must be(1)
counter2.get must be(1)
@ -341,7 +341,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
routedActor ! Broadcast(1)
routedActor ! Broadcast("end")
doneLatch.await
Await.ready(doneLatch, TestLatch.DefaultTimeout)
counter1.get must be(1)
counter2.get must be(1)
@ -354,7 +354,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString("""
val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2))))
routedActor ! Broadcast(Stop(Some(1)))
shutdownLatch.await
Await.ready(shutdownLatch, TestLatch.DefaultTimeout)
Await.result(routedActor ? Broadcast(0), timeout.duration) must be(22)
}

View file

@ -160,6 +160,10 @@ akka {
# Specifies the timeout to add a new message to a mailbox that is full -
# negative number means infinite timeout
mailbox-push-timeout-time = 10s
# FQCN of the MailboxType, if not specified the default bounded or unbounded
# mailbox is used.
mailboxType = ""
}
debug {

View file

@ -16,7 +16,6 @@ import akka.AkkaException
import scala.reflect.BeanProperty
import scala.util.control.NoStackTrace
import com.eaio.uuid.UUID
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.TimeUnit
import java.util.{ Collection JCollection }
import java.util.regex.Pattern

View file

@ -19,7 +19,6 @@ import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigResolveOptions
import com.typesafe.config.ConfigException
import java.lang.reflect.InvocationTargetException
import akka.util.{ Helpers, Duration, ReflectiveAccess }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ CountDownLatch, Executors, ConcurrentHashMap }
@ -409,7 +408,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
val values: Array[AnyRef] = arguments map (_._2) toArray
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match {
case Left(e: InvocationTargetException) throw e.getTargetException
case Left(e) throw e
case Right(p) p
}

View file

@ -271,12 +271,16 @@ abstract class MessageDispatcherConfigurator() {
def configure(config: Config, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher
def mailboxType(config: Config, settings: Settings): MailboxType = {
config.getString("mailboxType") match {
case ""
val capacity = config.getInt("mailbox-capacity")
if (capacity < 1) UnboundedMailbox()
else {
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
BoundedMailbox(capacity, duration)
}
case fqn new CustomMailboxType(fqn)
}
}
def configureThreadPool(

View file

@ -41,7 +41,7 @@ object Await {
def result(atMost: Duration)(implicit permit: CanAwait): T
}
private implicit val permit = new CanAwait {}
private[this] implicit final val permit = new CanAwait {}
def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost)
def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)

View file

@ -10,6 +10,8 @@ import akka.actor.{ ActorCell, ActorRef }
import java.util.concurrent._
import annotation.tailrec
import akka.event.Logging.Error
import com.typesafe.config.Config
import akka.actor.ActorContext
class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
@ -33,7 +35,17 @@ object Mailbox {
final val debug = false
}
abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
/**
* Custom mailbox implementations are implemented by extending this class.
*/
abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell])
/**
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation,
* but can't be exposed to user defined mailbox subclasses.
*
*/
private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
import Mailbox._
@volatile
@ -317,15 +329,15 @@ trait QueueBasedMessageQueue extends MessageQueue {
* Mailbox configuration.
*/
trait MailboxType {
def create(receiver: ActorCell): Mailbox
def create(receiver: ActorContext): Mailbox
}
/**
* It's a case class for Java (new UnboundedMailbox)
*/
case class UnboundedMailbox() extends MailboxType {
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
}
}
@ -335,16 +347,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new LinkedBlockingQueue[Envelope](capacity)
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
}
}
case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType {
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
}
}
@ -354,10 +366,36 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
override def create(receiver: ActorCell) =
new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
override def create(receiver: ActorContext) =
new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue {
final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
}
}
/**
* Mailbox factory that creates instantiates the implementation from a
* fully qualified class name. The implementation class must have
* a constructor with a [[akka.actor.ActorContext]] parameter.
* E.g.
* <pre<code>
* class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
* with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
* val queue = new ConcurrentLinkedQueue[Envelope]()
* }
* </code></pre>
*/
class CustomMailboxType(mailboxFQN: String) extends MailboxType {
override def create(receiver: ActorContext): Mailbox = {
val constructorSignature = Array[Class[_]](classOf[ActorContext])
ReflectiveAccess.createInstance[Mailbox](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match {
case Right(instance) instance
case Left(exception)
throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s".
format(mailboxFQN, exception.toString))
}
}
}

View file

@ -12,7 +12,6 @@ import akka.util.ReentrantGuard
import akka.util.duration._
import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
import akka.dispatch.Await
@ -516,34 +515,54 @@ trait LoggingAdapter {
*/
def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) }
def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) error(cause, format(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3, arg4)) }
def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) }
def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) }
def error(message: String) { if (isErrorEnabled) notifyError(message) }
def error(template: String, arg1: Any) { if (isErrorEnabled) error(format(template, arg1)) }
def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(format(template, arg1, arg2)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3, arg4)) }
def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format(template, arg1)) }
def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) }
def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) }
def warning(message: String) { if (isWarningEnabled) notifyWarning(message) }
def warning(template: String, arg1: Any) { if (isWarningEnabled) warning(format(template, arg1)) }
def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3, arg4)) }
def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1)) }
def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) }
def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) }
def info(message: String) { if (isInfoEnabled) notifyInfo(message) }
def info(template: String, arg1: Any) { if (isInfoEnabled) info(format(template, arg1)) }
def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) info(format(template, arg1, arg2)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3, arg4)) }
def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1)) }
def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) }
def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) }
def debug(message: String) { if (isDebugEnabled) notifyDebug(message) }
def debug(template: String, arg1: Any) { if (isDebugEnabled) debug(format(template, arg1)) }
def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3, arg4)) }
def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1)) }
def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) }
def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) }
def log(level: Logging.LogLevel, message: String) { if (isEnabled(level)) notifyLog(level, message) }
def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) }
def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) }
final def isEnabled(level: Logging.LogLevel): Boolean = level match {
case Logging.ErrorLevel isErrorEnabled
case Logging.WarningLevel isWarningEnabled
case Logging.InfoLevel isInfoEnabled
case Logging.DebugLevel isDebugEnabled
}
final def notifyLog(level: Logging.LogLevel, message: String): Unit = level match {
case Logging.ErrorLevel if (isErrorEnabled) notifyError(message)
case Logging.WarningLevel if (isWarningEnabled) notifyWarning(message)
case Logging.InfoLevel if (isInfoEnabled) notifyInfo(message)
case Logging.DebugLevel if (isDebugEnabled) notifyDebug(message)
}
def format(t: String, arg: Any*) = {
val sb = new StringBuilder

View file

@ -5,6 +5,7 @@
package akka.util
import akka.actor._
import java.lang.reflect.InvocationTargetException
object ReflectiveAccess {
@ -14,22 +15,19 @@ object ReflectiveAccess {
def createInstance[T](clazz: Class[_],
params: Array[Class[_]],
args: Array[AnyRef]): Either[Exception, T] = try {
args: Array[AnyRef]): Either[Exception, T] = withErrorHandling {
assert(clazz ne null)
assert(params ne null)
assert(args ne null)
val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: Exception
Left(e)
}
def createInstance[T](fqn: String,
params: Array[Class[_]],
args: Array[AnyRef],
classloader: ClassLoader = loader): Either[Exception, T] = try {
classloader: ClassLoader = loader): Either[Exception, T] = withErrorHandling {
assert(params ne null)
assert(args ne null)
getClassFor(fqn, classloader) match {
@ -39,9 +37,6 @@ object ReflectiveAccess {
Right(ctor.newInstance(args: _*).asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
} catch {
case e: Exception
Left(e)
}
//Obtains a reference to fqn.MODULE$
@ -100,5 +95,24 @@ object ReflectiveAccess {
case e: Exception Left(e)
}
/**
* Caught exception is returned as Left(exception).
* Unwraps `InvocationTargetException` if its getTargetException is an `Exception`.
* Other `Throwable`, such as `Error` is thrown.
*/
@inline
private final def withErrorHandling[T](body: Either[Exception, T]): Either[Exception, T] = {
try {
body
} catch {
case e: InvocationTargetException e.getTargetException match {
case t: Exception Left(t)
case t throw t
}
case e: Exception
Left(e)
}
}
}

View file

@ -4,17 +4,17 @@
package akka.agent
import akka.actor.ActorSystem
import akka.actor._
import akka.stm._
import akka.japi.{ Function JFunc, Procedure JProc }
import akka.dispatch._
import akka.util.Timeout
import scala.concurrent.stm._
/**
* Used internally to send functions.
*/
private[akka] case class Update[T](function: T T)
private[akka] case class Alter[T](function: T T)
private[akka] case object Get
/**
@ -101,7 +101,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/**
* Read the internal state of the agent.
*/
def get() = ref.get
def get() = ref.single.get
/**
* Read the internal state of the agent.
@ -111,9 +111,10 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/**
* Dispatch a function to update the internal state.
*/
def send(f: T T) {
def send(f: T T): Unit = {
def dispatch = updater ! Update(f)
if (Stm.activeTransaction) { get; deferred(dispatch) }
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status dispatch)(txn.get)
else dispatch
}
@ -122,11 +123,11 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* that new state can be obtained within the given timeout.
*/
def alter(f: T T)(timeout: Timeout): Future[T] = {
def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]]
if (Stm.activeTransaction) {
def dispatch = updater.?(Alter(f), timeout).asInstanceOf[Future[T]]
val txn = Txn.findCurrent
if (txn.isDefined) {
val result = Promise[T]()(system.dispatcher)
get //Join xa
deferred { result completeWith dispatch } //Attach deferred-block to current transaction
Txn.afterCommit(status result completeWith dispatch)(txn.get)
result
} else dispatch
}
@ -172,7 +173,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
suspend()
val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration)
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]]
result completeWith threadBased.?(Alter(f), timeout).asInstanceOf[Future[T]]
value
})
result
@ -283,28 +284,35 @@ class Agent[T](initialValue: T, system: ActorSystem) {
* Agent updater actor. Used internally for `send` actions.
*/
class AgentUpdater[T](agent: Agent[T]) extends Actor {
val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false)
def receive = {
case update: Update[_] sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
case Get sender.tell(agent.get)
case u: Update[_] update(u.function.asInstanceOf[T T])
case a: Alter[_] sender ! update(a.function.asInstanceOf[T T])
case Get sender ! agent.get
case _
}
def update(function: T T): T = agent.ref.single.transformAndGet(function)
}
/**
* Thread-based agent updater actor. Used internally for `sendOff` actions.
*/
class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false)
def receive = {
case update: Update[_] try {
sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
case u: Update[_] try {
update(u.function.asInstanceOf[T T])
} finally {
agent.resume()
context.stop(self)
}
case a: Alter[_] try {
sender ! update(a.function.asInstanceOf[T T])
} finally {
agent.resume()
context.stop(self)
}
case _ context.stop(self)
}
def update(function: T T): T = agent.ref.single.transformAndGet(function)
}

View file

@ -1,17 +1,12 @@
package akka.agent.test
package akka.agent
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.actor.ActorSystem
import akka.util.Timeout
import akka.agent.Agent
import akka.stm._
import akka.dispatch.Await
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.CountDownLatch
import akka.testkit.AkkaSpec
import akka.util.Timeout
import akka.testkit._
import akka.dispatch.Await
import scala.concurrent.stm._
import java.util.concurrent.CountDownLatch
class CountDownFunction[A](num: Int = 1) extends Function1[A, A] {
val latch = new CountDownLatch(num)
@ -96,7 +91,7 @@ class AgentSpec extends AkkaSpec {
"be readable within a transaction" in {
val agent = Agent(5)
val value = atomic { agent() }
val value = atomic { t agent() }
value must be(5)
agent.close()
}
@ -105,7 +100,7 @@ class AgentSpec extends AkkaSpec {
val countDown = new CountDownFunction[Int]
val agent = Agent(5)
atomic {
atomic { t
agent send (_ * 2)
}
agent send countDown
@ -122,7 +117,7 @@ class AgentSpec extends AkkaSpec {
val agent = Agent(5)
try {
atomic(DefaultTransactionFactory) {
atomic { t
agent send (_ * 2)
throw new RuntimeException("Expected failure")
}

View file

@ -126,8 +126,9 @@ class IncludeCode(Directive):
retnode = nodes.literal_block(text, text, source=fn)
retnode.line = 1
retnode.attributes['line_number'] = self.lineno
if self.options.get('language', ''):
retnode['language'] = self.options['language']
language = self.options.get('language')
if language:
retnode['language'] = language
if 'linenos' in self.options:
retnode['linenos'] = True
document.settings.env.note_dependency(rel_fn)

View file

@ -1,147 +0,0 @@
Agents (Scala)
==============
.. sidebar:: Contents
.. contents:: :local:
Agents in Akka were inspired by `agents in Clojure <http://clojure.org/agents>`_.
Agents provide asynchronous change of individual locations. Agents are bound to a single storage location for their lifetime, and only allow mutation of that location (to a new state) to occur as a result of an action. Update actions are functions that are asynchronously applied to the Agent's state and whose return value becomes the Agent's new state. The state of an Agent should be immutable.
While updates to Agents are asynchronous, the state of an Agent is always immediately available for reading by any thread (using ``get`` or ``apply``) without any messages.
Agents are reactive. The update actions of all Agents get interleaved amongst threads in a thread pool. At any point in time, at most one ``send`` action for each Agent is being executed. Actions dispatched to an agent from another thread will occur in the order they were sent, potentially interleaved with actions dispatched to the same agent from other sources.
If an Agent is used within an enclosing transaction, then it will participate in that transaction. Agents are integrated with the STM - any dispatches made in a transaction are held until that transaction commits, and are discarded if it is retried or aborted.
Creating and stopping Agents
----------------------------
Agents are created by invoking ``Agent(value)`` passing in the Agent's initial value.
.. code-block:: scala
val agent = Agent(5)
An Agent will be running until you invoke ``close`` on it. Then it will be eligible for garbage collection (unless you hold on to it in some way).
.. code-block:: scala
agent.close()
Updating Agents
---------------
You update an Agent by sending a function that transforms the current value or by sending just a new value. The Agent will apply the new value or function atomically and asynchronously. The update is done in a fire-forget manner and you are only guaranteed that it will be applied. There is no guarantee of when the update will be applied but dispatches to an Agent from a single thread will occur in order. You apply a value or a function by invoking the ``send`` function.
.. code-block:: scala
// send a value
agent send 7
// send a function
agent send (_ + 1)
agent send (_ * 2)
You can also dispatch a function to update the internal state but on its own thread. This does not use the reactive thread pool and can be used for long-running or blocking operations. You do this with the ``sendOff`` method. Dispatches using either ``sendOff`` or ``send`` will still be executed in order.
.. code-block:: scala
// sendOff a function
agent sendOff (longRunningOrBlockingFunction)
Reading an Agent's value
------------------------
Agents can be dereferenced, e.g. you can get an Agent's value, by invoking the Agent with parenthesis like this:
.. code-block:: scala
val result = agent()
Or by using the get method.
.. code-block:: scala
val result = agent.get
Reading an Agent's current value does not involve any message passing and happens immediately. So while updates to an Agent are asynchronous, reading the state of an Agent is synchronous.
Awaiting an Agent's value
-------------------------
It is also possible to read the value after all currently queued ``send``\s have completed. You can do this with ``await``:
.. code-block:: scala
val result = agent.await
You can also get a ``Future`` to this value, that will be completed after the currently queued updates have completed:
.. code-block:: scala
val future = agent.future
// ...
val result = future.await.result.get
Transactional Agents
--------------------
If an Agent is used within an enclosing transaction, then it will participate in that transaction. If you send to an Agent within a transaction then the dispatch to the Agent will be held until that transaction commits, and discarded if the transaction is aborted.
.. code-block:: scala
import akka.agent.Agent
import akka.stm._
def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
atomic {
if (from.get < amount) false
else {
from send (_ - amount)
to send (_ + amount)
true
}
}
}
val from = Agent(100)
val to = Agent(20)
val ok = transfer(from, to, 50)
from() // -> 50
to() // -> 70
Monadic usage
-------------
Agents are also monadic, allowing you to compose operations using for-comprehensions. In a monadic usage, new Agents are created leaving the original Agents untouched. So the old values (Agents) are still available as-is. They are so-called 'persistent'.
Example of a monadic usage:
.. code-block:: scala
val agent1 = Agent(3)
val agent2 = Agent(5)
// uses foreach
var result = 0
for (value <- agent1) {
result = value + 1
}
// uses map
val agent3 =
for (value <- agent1) yield value + 1
// uses flatMap
val agent4 = for {
value1 <- agent1
value2 <- agent2
} yield value1 + value2
agent1.close()
agent2.close()
agent3.close()
agent4.close()

112
akka-docs/java/agents.rst Normal file
View file

@ -0,0 +1,112 @@
.. _agents-java:
##############
Agents (Java)
##############
.. sidebar:: Contents
.. contents:: :local:
Agents in Akka are inspired by `agents in Clojure`_.
.. _agents in Clojure: http://clojure.org/agents
Agents provide asynchronous change of individual locations. Agents are bound to
a single storage location for their lifetime, and only allow mutation of that
location (to a new state) to occur as a result of an action. Update actions are
functions that are asynchronously applied to the Agent's state and whose return
value becomes the Agent's new state. The state of an Agent should be immutable.
While updates to Agents are asynchronous, the state of an Agent is always
immediately available for reading by any thread (using ``get``) without any
messages.
Agents are reactive. The update actions of all Agents get interleaved amongst
threads in a thread pool. At any point in time, at most one ``send`` action for
each Agent is being executed. Actions dispatched to an agent from another thread
will occur in the order they were sent, potentially interleaved with actions
dispatched to the same agent from other sources.
If an Agent is used within an enclosing transaction, then it will participate in
that transaction. Agents are integrated with the STM - any dispatches made in
a transaction are held until that transaction commits, and are discarded if it
is retried or aborted.
Creating and stopping Agents
============================
Agents are created by invoking ``new Agent(value, system)`` passing in the
Agent's initial value and a reference to the ``ActorSystem`` for your
application. An ``ActorSystem`` is required to create the underlying Actors. See
:ref:`actor-systems` for more information about actor systems.
Here is an example of creating an Agent:
.. includecode:: code/akka/docs/agent/AgentDocTest.java
:include: import-system,import-agent
:language: java
.. includecode:: code/akka/docs/agent/AgentDocTest.java#create
:language: java
An Agent will be running until you invoke ``close`` on it. Then it will be
eligible for garbage collection (unless you hold on to it in some way).
.. includecode:: code/akka/docs/agent/AgentDocTest.java#close
:language: java
Updating Agents
===============
You update an Agent by sending a function that transforms the current value or
by sending just a new value. The Agent will apply the new value or function
atomically and asynchronously. The update is done in a fire-forget manner and
you are only guaranteed that it will be applied. There is no guarantee of when
the update will be applied but dispatches to an Agent from a single thread will
occur in order. You apply a value or a function by invoking the ``send``
function.
.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-function
:language: java
.. includecode:: code/akka/docs/agent/AgentDocTest.java#send
:language: java
You can also dispatch a function to update the internal state but on its own
thread. This does not use the reactive thread pool and can be used for
long-running or blocking operations. You do this with the ``sendOff``
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
in order.
.. includecode:: code/akka/docs/agent/AgentDocTest.java#send-off
:language: java
Reading an Agent's value
========================
Agents can be dereferenced (you can get an Agent's value) by calling the get
method:
.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-get
:language: java
Reading an Agent's current value does not involve any message passing and
happens immediately. So while updates to an Agent are asynchronous, reading the
state of an Agent is synchronous.
Awaiting an Agent's value
=========================
It is also possible to read the value after all currently queued sends have
completed. You can do this with ``await``:
.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-timeout
:language: java
.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-await
:language: java

View file

@ -0,0 +1,10 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.agent
import org.scalatest.junit.JUnitWrapperSuite
class AgentDocJavaSpec extends JUnitWrapperSuite(
"akka.docs.agent.AgentDocTest",
Thread.currentThread.getContextClassLoader)

View file

@ -0,0 +1,109 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.agent;
import static org.junit.Assert.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import akka.testkit.AkkaSpec;
//#import-system
import akka.actor.ActorSystem;
//#import-system
//#import-agent
import akka.agent.Agent;
//#import-agent
//#import-function
import akka.japi.Function;
//#import-function
//#import-timeout
import akka.util.Duration;
import akka.util.Timeout;
import static java.util.concurrent.TimeUnit.SECONDS;
//#import-timeout
public class AgentDocTest {
private static ActorSystem testSystem;
@BeforeClass
public static void beforeAll() {
testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf());
}
@AfterClass
public static void afterAll() {
testSystem.shutdown();
testSystem = null;
}
@Test
public void createAndClose() {
//#create
ActorSystem system = ActorSystem.create("app");
Agent<Integer> agent = new Agent<Integer>(5, system);
//#create
//#close
agent.close();
//#close
system.shutdown();
}
@Test
public void sendAndSendOffAndReadAwait() {
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
//#send
// send a value
agent.send(7);
// send a function
agent.send(new Function<Integer, Integer>() {
public Integer apply(Integer i) {
return i * 2;
}
});
//#send
Function<Integer, Integer> longRunningOrBlockingFunction = new Function<Integer, Integer>() {
public Integer apply(Integer i) {
return i * 1;
}
};
//#send-off
// sendOff a function
agent.sendOff(longRunningOrBlockingFunction);
//#send-off
//#read-await
Integer result = agent.await(new Timeout(Duration.create(5, SECONDS)));
//#read-await
assertEquals(result, new Integer(14));
agent.close();
}
@Test
public void readWithGet() {
Agent<Integer> agent = new Agent<Integer>(5, testSystem);
//#read-get
Integer result = agent.get();
//#read-get
assertEquals(result, new Integer(5));
agent.close();
}
}

View file

@ -17,5 +17,6 @@ Java API
routing
remoting
serialization
agents
extending-akka
transactors

View file

@ -4,15 +4,14 @@
package akka.docs.actor.mailbox
//#imports
import akka.actor.Actor
import akka.actor.Props
import akka.actor.mailbox.FileDurableMailboxType
//#imports
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import org.scalatest.matchers.MustMatchers
import akka.testkit.AkkaSpec
import akka.actor.Actor
class MyActor extends Actor {
def receive = {
@ -20,15 +19,23 @@ class MyActor extends Actor {
}
}
class DurableMailboxDocSpec extends AkkaSpec {
object DurableMailboxDocSpec {
val config = """
//#dispatcher-config
my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox
}
//#dispatcher-config
"""
}
"define dispatcher with durable mailbox" in {
//#define-dispatcher
val dispatcher = system.dispatcherFactory.newDispatcher(
"my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build
class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) {
"configuration of dispatcher with durable mailbox" in {
//#dispatcher-config-use
val dispatcher = system.dispatcherFactory.lookup("my-dispatcher")
val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor")
//#define-dispatcher
myActor ! "hello"
//#dispatcher-config-use
}
}

View file

@ -4,7 +4,6 @@
package akka.docs.actor.mailbox;
//#imports
import akka.actor.mailbox.DurableMailboxType;
import akka.dispatch.MessageDispatcher;
import akka.actor.UntypedActorFactory;
import akka.actor.UntypedActor;
@ -12,8 +11,12 @@ import akka.actor.Props;
//#imports
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import akka.testkit.AkkaSpec;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@ -21,24 +24,35 @@ import static org.junit.Assert.*;
public class DurableMailboxDocTestBase {
ActorSystem system;
@Before
public void setUp() {
system = ActorSystem.create("MySystem",
ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf()));
}
@After
public void tearDown() {
system.shutdown();
}
@Test
public void defineDispatcher() {
ActorSystem system = ActorSystem.create("MySystem");
//#define-dispatcher
MessageDispatcher dispatcher = system.dispatcherFactory()
.newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build();
public void configDefinedDispatcher() {
//#dispatcher-config-use
MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher");
ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() {
public UntypedActor create() {
return new MyUntypedActor();
}
}));
//#define-dispatcher
}), "myactor");
//#dispatcher-config-use
myActor.tell("test");
system.shutdown();
}
public static class MyUntypedActor extends UntypedActor {
public void onReceive(Object message) {
}
}
}

View file

@ -62,15 +62,22 @@ The durable mailboxes and their configuration options reside in the
You configure durable mailboxes through the dispatcher. The
actor is oblivious to which type of mailbox it is using.
Here is an example in Scala:
In the configuration of the dispatcher you specify the fully qualified class name
of the mailbox:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: imports,define-dispatcher
:include: dispatcher-config
Here is an example of how to create an actor with a durable dispatcher, in Scala:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala
:include: imports,dispatcher-config-use
Corresponding example in Java:
.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java
:include: imports,define-dispatcher
:include: imports,dispatcher-config-use
The actor is oblivious to which type of mailbox it is using.
@ -89,14 +96,11 @@ you need.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
mailbox = akka.actor.mailbox.FileDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.fileDurableMailboxType()
Config::
my-dispatcher {
mailboxType = akka.actor.mailbox.FileBasedMailbox
}
You can also configure and tune the file-based durable mailbox. This is done in
the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`.
@ -117,14 +121,11 @@ mailboxes. Read more in the Redis documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
mailbox = akka.actor.mailbox.RedisDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.redisDurableMailboxType()
Config::
my-dispatcher {
mailboxType = akka.actor.mailbox.RedisBasedMailbox
}
You also need to configure the IP and port for the Redis server. This is done in
the ``akka.actor.mailbox.redis`` section in the :ref:`configuration`.
@ -146,13 +147,11 @@ documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
Config::
mailbox = akka.actor.mailbox.ZooKeeperDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.zooKeeperDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox
}
You also need to configure ZooKeeper server addresses, timeouts, etc. This is
done in the ``akka.actor.mailbox.zookeeper`` section in the :ref:`configuration`.
@ -171,13 +170,11 @@ Beanstalk documentation on how to do that.
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
Config::
mailbox = akka.actor.mailbox.BeanstalkDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.beanstalkDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox
}
You also need to configure the IP, and port, and so on, for the Beanstalk
server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the
@ -202,13 +199,11 @@ lightweight versus building on other MongoDB implementations such as
You configure durable mailboxes through the dispatcher, as described in
:ref:`DurableMailbox.General` with the following mailbox type.
Scala::
Config::
mailbox = akka.actor.mailbox.MongoDurableMailboxType
Java::
akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType()
my-dispatcher {
mailboxType = akka.actor.mailbox.MongoBasedMailbox
}
You will need to configure the URI for the MongoDB server, using the URI Format specified in the
`MongoDB Documentation <http://www.mongodb.org/display/DOCS/Connections>`_. This is done in

View file

@ -328,16 +328,13 @@ message.
If the actor does not complete the future, it will expire after the timeout period,
which is taken from one of the following locations in order of precedence:
#. explicitly given timeout as in ``actor.?("hello")(timeout = 12 millis)``
#. implicit argument of type :class:`akka.actor.Timeout`, e.g.
1. explicitly given timeout as in:
::
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-explicit-timeout
import akka.actor.Timeout
import akka.util.duration._
2. implicit argument of type :class:`akka.util.Timeout`, e.g.
implicit val timeout = Timeout(12 millis)
val future = actor ? "hello"
.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-implicit-timeout
See :ref:`futures-scala` for more information on how to await or query a
future.

View file

@ -1,4 +1,135 @@
Agents (Scala)
==============
.. _agents-scala:
The Akka Agents module has not been migrated to Akka 2.0-SNAPSHOT yet.
################
Agents (Scala)
################
.. sidebar:: Contents
.. contents:: :local:
Agents in Akka are inspired by `agents in Clojure`_.
.. _agents in Clojure: http://clojure.org/agents
Agents provide asynchronous change of individual locations. Agents are bound to
a single storage location for their lifetime, and only allow mutation of that
location (to a new state) to occur as a result of an action. Update actions are
functions that are asynchronously applied to the Agent's state and whose return
value becomes the Agent's new state. The state of an Agent should be immutable.
While updates to Agents are asynchronous, the state of an Agent is always
immediately available for reading by any thread (using ``get`` or ``apply``)
without any messages.
Agents are reactive. The update actions of all Agents get interleaved amongst
threads in a thread pool. At any point in time, at most one ``send`` action for
each Agent is being executed. Actions dispatched to an agent from another thread
will occur in the order they were sent, potentially interleaved with actions
dispatched to the same agent from other sources.
If an Agent is used within an enclosing transaction, then it will participate in
that transaction. Agents are integrated with Scala STM - any dispatches made in
a transaction are held until that transaction commits, and are discarded if it
is retried or aborted.
Creating and stopping Agents
============================
Agents are created by invoking ``Agent(value)`` passing in the Agent's initial
value:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create
Note that creating an Agent requires an implicit ``ActorSystem`` (for creating
the underlying actors). See :ref:`actor-systems` for more information about
actor systems. An ActorSystem can be in implicit scope when creating an Agent:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create-implicit-system
Or the ActorSystem can be passed explicitly when creating an Agent:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create-explicit-system
An Agent will be running until you invoke ``close`` on it. Then it will be
eligible for garbage collection (unless you hold on to it in some way).
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#close
Updating Agents
===============
You update an Agent by sending a function that transforms the current value or
by sending just a new value. The Agent will apply the new value or function
atomically and asynchronously. The update is done in a fire-forget manner and
you are only guaranteed that it will be applied. There is no guarantee of when
the update will be applied but dispatches to an Agent from a single thread will
occur in order. You apply a value or a function by invoking the ``send``
function.
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#send
You can also dispatch a function to update the internal state but on its own
thread. This does not use the reactive thread pool and can be used for
long-running or blocking operations. You do this with the ``sendOff``
method. Dispatches using either ``sendOff`` or ``send`` will still be executed
in order.
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#send-off
Reading an Agent's value
========================
Agents can be dereferenced (you can get an Agent's value) by invoking the Agent
with parentheses like this:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-apply
Or by using the get method:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-get
Reading an Agent's current value does not involve any message passing and
happens immediately. So while updates to an Agent are asynchronous, reading the
state of an Agent is synchronous.
Awaiting an Agent's value
=========================
It is also possible to read the value after all currently queued sends have
completed. You can do this with ``await``:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-await
You can also get a ``Future`` to this value, that will be completed after the
currently queued updates have completed:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-future
Transactional Agents
====================
If an Agent is used within an enclosing transaction, then it will participate in
that transaction. If you send to an Agent within a transaction then the dispatch
to the Agent will be held until that transaction commits, and discarded if the
transaction is aborted. Here's an example:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#transfer-example
Monadic usage
=============
Agents are also monadic, allowing you to compose operations using
for-comprehensions. In monadic usage, new Agents are created leaving the
original Agents untouched. So the old values (Agents) are still available
as-is. They are so-called 'persistent'.
Example of monadic usage:
.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#monadic-example

View file

@ -20,6 +20,8 @@ import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.util._
import akka.util.duration._
import akka.actor.Actor.Receive
import akka.dispatch.Await
//#my-actor
class MyActor extends Actor {
@ -238,6 +240,27 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
system.stop(myActor)
}
"using implicit timeout" in {
val myActor = system.actorOf(Props(new FirstActor))
//#using-implicit-timeout
import akka.util.duration._
import akka.util.Timeout
implicit val timeout = Timeout(500 millis)
val future = myActor ? "hello"
//#using-implicit-timeout
Await.result(future, timeout.duration) must be("hello")
}
"using explicit timeout" in {
val myActor = system.actorOf(Props(new FirstActor))
//#using-explicit-timeout
import akka.util.duration._
val future = myActor ? ("hello", timeout = 500 millis)
//#using-explicit-timeout
Await.result(future, 500 millis) must be("hello")
}
"using receiveTimeout" in {
//#receive-timeout
import akka.actor.ReceiveTimeout

View file

@ -0,0 +1,190 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.agent
import akka.agent.Agent
import akka.util.duration._
import akka.util.Timeout
import akka.testkit._
class AgentDocSpec extends AkkaSpec {
"create and close" in {
//#create
import akka.agent.Agent
val agent = Agent(5)
//#create
//#close
agent.close()
//#close
}
"create with implicit system" in {
//#create-implicit-system
import akka.actor.ActorSystem
import akka.agent.Agent
implicit val system = ActorSystem("app")
val agent = Agent(5)
//#create-implicit-system
agent.close()
system.shutdown()
}
"create with explicit system" in {
//#create-explicit-system
import akka.actor.ActorSystem
import akka.agent.Agent
val system = ActorSystem("app")
val agent = Agent(5)(system)
//#create-explicit-system
agent.close()
system.shutdown()
}
"send and sendOff" in {
val agent = Agent(0)
//#send
// send a value
agent send 7
// send a function
agent send (_ + 1)
agent send (_ * 2)
//#send
def longRunningOrBlockingFunction = (i: Int) i * 1
//#send-off
// sendOff a function
agent sendOff (longRunningOrBlockingFunction)
//#send-off
val result = agent.await(Timeout(5 seconds))
result must be === 16
}
"read with apply" in {
val agent = Agent(0)
//#read-apply
val result = agent()
//#read-apply
result must be === 0
}
"read with get" in {
val agent = Agent(0)
//#read-get
val result = agent.get
//#read-get
result must be === 0
}
"read with await" in {
val agent = Agent(0)
//#read-await
import akka.util.duration._
import akka.util.Timeout
implicit val timeout = Timeout(5 seconds)
val result = agent.await
//#read-await
result must be === 0
}
"read with future" in {
val agent = Agent(0)
//#read-future
import akka.dispatch.Await
implicit val timeout = Timeout(5 seconds)
val future = agent.future
val result = Await.result(future, timeout.duration)
//#read-future
result must be === 0
}
"transfer example" in {
//#transfer-example
import akka.agent.Agent
import akka.util.duration._
import akka.util.Timeout
import scala.concurrent.stm._
def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = {
atomic { txn
if (from.get < amount) false
else {
from send (_ - amount)
to send (_ + amount)
true
}
}
}
val from = Agent(100)
val to = Agent(20)
val ok = transfer(from, to, 50)
implicit val timeout = Timeout(5 seconds)
val fromValue = from.await // -> 50
val toValue = to.await // -> 70
//#transfer-example
fromValue must be === 50
toValue must be === 70
}
"monadic example" in {
//#monadic-example
val agent1 = Agent(3)
val agent2 = Agent(5)
// uses foreach
var result = 0
for (value agent1) {
result = value + 1
}
// uses map
val agent3 = for (value agent1) yield value + 1
// or using map directly
val agent4 = agent1 map (_ + 1)
// uses flatMap
val agent5 = for {
value1 agent1
value2 agent2
} yield value1 + value2
//#monadic-example
result must be === 4
agent3() must be === 4
agent4() must be === 4
agent5() must be === 8
agent1.close()
agent2.close()
agent3.close()
agent4.close()
agent5.close()
}
}

View file

@ -18,7 +18,7 @@ Scala API
remoting
serialization
fsm
agents
testing
extending-akka
agents
transactors

View file

@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = BeanstalkBasedMailboxExtension(owner.system)
private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt

View file

@ -1,4 +1,7 @@
package akka.actor.mailbox
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkDurableMailboxType)
class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd",
new CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox"))

View file

@ -5,12 +5,12 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
val log = Logging(system, "FileBasedMailbox")

View file

@ -1,9 +1,11 @@
package akka.actor.mailbox
import org.apache.commons.io.FileUtils
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) {
class FileBasedMailboxSpec extends DurableMailboxSpec("File",
new CustomMailboxType("akka.actor.mailbox.FileBasedMailbox")) {
def clean {
val queuePath = FileBasedMailboxExtension(system).QueuePath

View file

@ -4,15 +4,14 @@
package akka.actor.mailbox
import akka.util.ReflectiveAccess
import java.lang.reflect.InvocationTargetException
import akka.AkkaException
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.SerializedActorRef
import akka.dispatch.Envelope
import akka.dispatch.DefaultSystemMessageQueue
import akka.dispatch.Dispatcher
import akka.dispatch.Mailbox
import akka.dispatch.CustomMailbox
import akka.dispatch.MailboxType
import akka.dispatch.MessageDispatcher
import akka.dispatch.MessageQueue
@ -24,6 +23,7 @@ import akka.remote.RemoteActorRefProvider
import akka.remote.netty.NettyRemoteServer
import akka.serialization.Serialization
import com.typesafe.config.Config
import akka.dispatch.CustomMailboxType
private[akka] object DurableExecutableMailboxConfig {
val Name = "[\\.\\/\\$\\s]".r
@ -33,7 +33,7 @@ class DurableMailboxException private[akka] (message: String, cause: Throwable)
def this(message: String) = this(message, null)
}
abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with DefaultSystemMessageQueue {
abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue {
import DurableExecutableMailboxConfig._
def system = owner.system
@ -45,7 +45,7 @@ abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with Defa
trait DurableMessageSerialization {
def owner: ActorCell
def owner: ActorContext
def serialize(durableMessage: Envelope): Array[Byte] = {
@ -73,73 +73,3 @@ trait DurableMessageSerialization {
}
abstract class DurableMailboxType(mailboxFQN: String) extends MailboxType {
val constructorSignature = Array[Class[_]](classOf[ActorCell])
val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match {
case Right(clazz) clazz
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString))
}
//TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc
def create(receiver: ActorCell): Mailbox = {
ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match {
case Right(instance) instance.asInstanceOf[Mailbox]
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString))
}
}
}
case object RedisDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.RedisBasedMailbox")
case object MongoDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.MongoBasedMailbox")
case object BeanstalkDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox")
case object FileDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.FileBasedMailbox")
case object ZooKeeperDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")
case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType(mailboxFQN)
/**
* Java API for the mailbox types. Usage:
* <pre><code>
* MessageDispatcher dispatcher = system.dispatcherFactory()
* .newDispatcher("my-dispatcher", 1, DurableMailboxType.redisDurableMailboxType()).build();
* </code></pre>
*/
object DurableMailboxType {
def redisDurableMailboxType(): DurableMailboxType = RedisDurableMailboxType
def mongoDurableMailboxType(): DurableMailboxType = MongoDurableMailboxType
def beanstalkDurableMailboxType(): DurableMailboxType = BeanstalkDurableMailboxType
def fileDurableMailboxType(): DurableMailboxType = FileDurableMailboxType
def zooKeeperDurableMailboxType(): DurableMailboxType = ZooKeeperDurableMailboxType
def fqnDurableMailboxType(mailboxFQN: String): DurableMailboxType = FqnDurableMailboxType(mailboxFQN)
}
/**
* Configurator for the DurableMailbox
* Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper", "mongodb", "file",
* or a full class name of the Mailbox implementation.
*/
class DurableMailboxConfigurator {
// TODO PN #896: when and how is this class supposed to be used? Can we remove it?
def mailboxType(config: Config): MailboxType = {
if (!config.hasPath("storage")) throw new DurableMailboxException("No 'storage' defined for durable mailbox")
config.getString("storage") match {
case "redis" RedisDurableMailboxType
case "mongodb" MongoDurableMailboxType
case "beanstalk" BeanstalkDurableMailboxType
case "zookeeper" ZooKeeperDurableMailboxType
case "file" FileDurableMailboxType
case fqn FqnDurableMailboxType(fqn)
}
}
}

View file

@ -1,15 +1,16 @@
package akka.actor.mailbox
import java.util.concurrent.TimeUnit
import java.util.concurrent.CountDownLatch
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import akka.actor._
import akka.actor.Actor._
import java.util.concurrent.CountDownLatch
import akka.dispatch.MessageDispatcher
import akka.testkit.AkkaSpec
import akka.dispatch.Dispatchers
import akka.dispatch.MailboxType
import akka.testkit.AkkaSpec
object DurableMailboxSpecActorFactory {
@ -23,7 +24,7 @@ object DurableMailboxSpecActorFactory {
}
abstract class DurableMailboxSpec(val backendName: String, val mailboxType: DurableMailboxType) extends AkkaSpec with BeforeAndAfterEach {
abstract class DurableMailboxSpec(val backendName: String, val mailboxType: MailboxType) extends AkkaSpec with BeforeAndAfterEach {
import DurableMailboxSpecActorFactory._
implicit val dispatcher = system.dispatcherFactory.newDispatcher(backendName, throughput = 1, mailboxType = mailboxType).build

View file

@ -7,7 +7,7 @@ import akka.AkkaException
import com.mongodb.async._
import com.mongodb.async.futures.RequestFutures
import org.bson.collection._
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.event.Logging
import akka.actor.ActorRef
import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise }
@ -26,7 +26,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message)
*
* @author <a href="http://evilmonkeylabs.com">Brendan W. McAdams</a>
*/
class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) {
// this implicit object provides the context for reading/writing things as MongoDurableMessage
implicit val mailboxBSONSer = new BSONSerializableMailbox(system)
implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate!

View file

@ -1,18 +1,19 @@
package akka.actor.mailbox
import java.util.concurrent.TimeUnit
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
import akka.actor._
import akka.actor.Actor._
import java.util.concurrent.CountDownLatch
import akka.dispatch.MessageDispatcher
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoDurableMailboxType) {
class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb",
new CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox")) {
import org.apache.log4j.{ Logger, Level }
import com.mongodb.async._

View file

@ -6,14 +6,14 @@ package akka.actor.mailbox
import com.redis._
import akka.actor.LocalActorRef
import akka.AkkaException
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.dispatch.Envelope
import akka.event.Logging
import akka.actor.ActorRef
class RedisBasedMailboxException(message: String) extends AkkaException(message)
class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = RedisBasedMailboxExtension(owner.system)

View file

@ -1,4 +1,6 @@
package akka.actor.mailbox
import akka.dispatch.CustomMailboxType
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisDurableMailboxType)
class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis",
new CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox"))

View file

@ -8,7 +8,7 @@ import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import org.I0Itec.zkclient.serialize._
import akka.actor.ActorCell
import akka.actor.ActorContext
import akka.cluster.zookeeper.AkkaZkClient
import akka.dispatch.Envelope
import akka.event.Logging
@ -17,7 +17,7 @@ import akka.actor.ActorRef
class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message)
class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization {
private val settings = ZooKeeperBasedMailboxExtension(owner.system)
val queueNode = "/queues"

View file

@ -4,10 +4,13 @@ import akka.actor.{ Actor, LocalActorRef }
import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
import akka.dispatch.MessageDispatcher
import akka.dispatch.CustomMailboxType
import akka.actor.ActorRef
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeeperDurableMailboxType) {
class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper",
new CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")) {
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"

View file

@ -94,7 +94,7 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti
case Right(remote)
remote.start(None) //TODO Any application loader here?
remote.start(Option(Thread.currentThread().getContextClassLoader)) //TODO Any application loader here?
val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor {
def receive = {

View file

@ -14,6 +14,7 @@ import java.net.URISyntaxException
import java.net.InetAddress
import java.net.UnknownHostException
import java.net.UnknownServiceException
import akka.event.Logging
/**
* Interface for remote transports to encode their addresses. The three parts
@ -135,7 +136,9 @@ trait RemoteModule {
/**
* Remote life-cycle events.
*/
sealed trait RemoteLifeCycleEvent
sealed trait RemoteLifeCycleEvent {
def logLevel: Logging.LogLevel
}
/**
* Life-cycle events for RemoteClient.
@ -148,6 +151,7 @@ case class RemoteClientError[T <: ParsedTransportAddress](
@BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
"RemoteClientError@" +
remoteAddress +
@ -159,6 +163,7 @@ case class RemoteClientError[T <: ParsedTransportAddress](
case class RemoteClientDisconnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteClientDisconnected@" + remoteAddress
}
@ -166,6 +171,7 @@ case class RemoteClientDisconnected[T <: ParsedTransportAddress](
case class RemoteClientConnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteClientConnected@" + remoteAddress
}
@ -173,6 +179,7 @@ case class RemoteClientConnected[T <: ParsedTransportAddress](
case class RemoteClientStarted[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteClientStarted@" + remoteAddress
}
@ -180,6 +187,7 @@ case class RemoteClientStarted[T <: ParsedTransportAddress](
case class RemoteClientShutdown[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteClientShutdown@" + remoteAddress
}
@ -189,6 +197,7 @@ case class RemoteClientWriteFailed[T <: ParsedTransportAddress](
@BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent {
override def logLevel = Logging.WarningLevel
override def toString =
"RemoteClientWriteFailed@" +
remoteAddress +
@ -206,12 +215,14 @@ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent
case class RemoteServerStarted[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteServerStarted@" + remote.name
}
case class RemoteServerShutdown[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.InfoLevel
override def toString =
"RemoteServerShutdown@" + remote.name
}
@ -219,6 +230,7 @@ case class RemoteServerShutdown[T <: ParsedTransportAddress](
case class RemoteServerError[T <: ParsedTransportAddress](
@BeanProperty val cause: Throwable,
@BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.ErrorLevel
override def toString =
"RemoteServerError@" +
remote.name +
@ -230,6 +242,7 @@ case class RemoteServerError[T <: ParsedTransportAddress](
case class RemoteServerClientConnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientConnected@" +
remote.name +
@ -241,6 +254,7 @@ case class RemoteServerClientConnected[T <: ParsedTransportAddress](
case class RemoteServerClientDisconnected[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientDisconnected@" +
remote.name +
@ -252,6 +266,7 @@ case class RemoteServerClientDisconnected[T <: ParsedTransportAddress](
case class RemoteServerClientClosed[T <: ParsedTransportAddress](
@BeanProperty remote: RemoteSupport[T],
@BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.DebugLevel
override def toString =
"RemoteServerClientClosed@" +
remote.name +
@ -265,6 +280,7 @@ case class RemoteServerWriteFailed[T <: ParsedTransportAddress](
@BeanProperty cause: Throwable,
@BeanProperty remote: RemoteSupport[T],
@BeanProperty remoteAddress: Option[T]) extends RemoteServerLifeCycleEvent {
override def logLevel = Logging.WarningLevel
override def toString =
"RemoteServerWriteFailed@" +
remote +
@ -320,7 +336,7 @@ abstract class RemoteSupport[-T <: ParsedTransportAddress](val system: ActorSyst
protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = {
system.eventStream.publish(message)
system.log.debug("REMOTE: {}", message)
system.log.log(message.logLevel, "REMOTE: {}", message)
}
override def toString = name

View file

@ -19,7 +19,7 @@ trait RemoteRouterConfig extends RouterConfig {
case x throw new ConfigurationException("unparseable remote node " + x)
}
val node = Stream.continually(nodes).flatten.iterator
val impl = context.system.asInstanceOf[ActorSystemImpl]
val impl = context.system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
Vector.empty[ActorRef] ++ (for (i 1 to nrOfInstances) yield {
val name = "c" + i
val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(node.next))

View file

@ -88,12 +88,12 @@ class TestFSMRef[S, D, T <: Actor](
object TestFSMRef {
def apply[S, D, T <: Actor](factory: T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
val impl = system.asInstanceOf[ActorSystemImpl]
val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
new TestFSMRef(impl, system.dispatcherFactory.prerequisites, Props(creator = () factory), impl.guardian.asInstanceOf[InternalActorRef], TestActorRef.randomName)
}
def apply[S, D, T <: Actor](factory: T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = {
val impl = system.asInstanceOf[ActorSystemImpl]
val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
new TestFSMRef(impl, system.dispatcherFactory.prerequisites, Props(creator = () factory), impl.guardian.asInstanceOf[InternalActorRef], name)
}
}

View file

@ -102,7 +102,7 @@ class TestKit(_system: ActorSystem) {
* registration as message target.
*/
lazy val testActor: ActorRef = {
val impl = system.asInstanceOf[ActorSystemImpl]
val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here?
impl.systemActorOf(Props(new TestActor(queue))
.copy(dispatcher = new CallingThreadDispatcher(system.dispatcherFactory.prerequisites)),
"testActor" + TestKit.testActorId.incrementAndGet)

View file

@ -5,11 +5,9 @@
package akka.testkit
import akka.util.Duration
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import akka.actor.ActorSystem
class TestLatchTimeoutException(message: String) extends RuntimeException(message)
class TestLatchNoTimeoutException(message: String) extends RuntimeException(message)
import akka.dispatch.Await.{ CanAwait, Awaitable }
import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit }
/**
* A count down latch wrapper for use in testing.
@ -24,34 +22,23 @@ object TestLatch {
def apply(count: Int = 1)(implicit system: ActorSystem) = new TestLatch(count)
}
class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends Awaitable[Unit] {
private var latch = new CountDownLatch(count)
def countDown() = latch.countDown()
def isOpen: Boolean = latch.getCount == 0
def open() = while (!isOpen) countDown()
def await(): Boolean = await(TestLatch.DefaultTimeout)
def await(timeout: Duration): Boolean = {
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TestLatchTimeoutException(
"Timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor))
opened
}
/**
* Timeout is expected. Throws exception if latch is opened before timeout.
*/
def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = {
val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS)
if (opened) throw new TestLatchNoTimeoutException(
"Latch opened before timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor))
opened
}
def reset() = latch = new CountDownLatch(count)
def ready(atMost: Duration)(implicit permit: CanAwait) = {
val opened = latch.await(atMost.dilated.toNanos, TimeUnit.NANOSECONDS)
if (!opened) throw new TimeoutException(
"Timeout of %s with time factor of %s" format (atMost.toString, TestKitExtension(system).TestTimeFactor))
this
}
def result(atMost: Duration)(implicit permit: CanAwait): Unit = {
ready(atMost)
}
}

View file

@ -107,7 +107,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
system.actorFor("/") ! PoisonPill
latch.await(2 seconds)
Await.ready(latch, 2 seconds)
}
"must enqueue unread messages from testActor to deadLetters" in {
@ -139,7 +139,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
val latch = new TestLatch(1)(system)
system.registerOnTermination(latch.countDown())
system.shutdown()
latch.await(2 seconds)
Await.ready(latch, 2 seconds)
Await.result(davyJones ? "Die!", timeout.duration) must be === "finally gone"
// this will typically also contain log messages which were sent after the logger shutdown

View file

@ -25,12 +25,12 @@ object AkkaBuild extends Build {
id = "akka",
base = file("."),
settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq(
parallelExecution in GlobalScope := false,
parallelExecution in GlobalScope := true,
Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository",
Unidoc.unidocExclude := Seq(samples.id, tutorials.id),
Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id)
),
aggregate = Seq(actor, testkit, actorTests, remote, slf4j, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs)
aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs)
)
lazy val actor = Project(
@ -72,6 +72,8 @@ object AkkaBuild extends Build {
dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"),
settings = defaultSettings ++ multiJvmSettings ++ Seq(
libraryDependencies ++= Dependencies.cluster,
// disable parallel tests
parallelExecution in Test := false,
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq
},
@ -92,6 +94,15 @@ object AkkaBuild extends Build {
)
)
lazy val agent = Project(
id = "akka-agent",
base = file("akka-agent"),
dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.agent
)
)
// lazy val amqp = Project(
// id = "akka-amqp",
// base = file("akka-amqp"),
@ -254,7 +265,7 @@ object AkkaBuild extends Build {
lazy val docs = Project(
id = "akka-docs",
base = file("akka-docs"),
dependencies = Seq(actor, testkit % "test->test", remote, slf4j, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox),
dependencies = Seq(actor, testkit % "test->test", remote, slf4j, agent, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox),
settings = defaultSettings ++ Seq(
unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get },
libraryDependencies ++= Dependencies.docs,
@ -291,8 +302,7 @@ object AkkaBuild extends Build {
unmanagedClasspath in Runtime <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") },
unmanagedClasspath in Test <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") },
// disable parallel tests
parallelExecution in Test := false,
parallelExecution in Test := true,
// for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec)
excludeTestNames := {
@ -368,6 +378,8 @@ object Dependencies {
val slf4j = Seq(slf4jApi)
val agent = Seq(scalaStm, Test.scalatest, Test.junit)
val amqp = Seq(rabbit, commonsIo, protobuf)
val mailboxes = Seq(Test.scalatest, Test.junit)
@ -408,11 +420,12 @@ object Dependency {
val Logback = "0.9.28"
val Netty = "3.2.5.Final"
val Protobuf = "2.4.1"
val Rabbit = "2.3.1"
val ScalaStm = "0.4"
val Scalatest = "1.6.1"
val Slf4j = "1.6.4"
val Spring = "3.0.5.RELEASE"
val Zookeeper = "3.4.0"
val Rabbit = "2.3.1"
}
// Compile
@ -437,6 +450,7 @@ object Dependency {
val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD
val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License
val redis = "net.debasishg" %% "redisclient" % "2.4.0" // ApacheV2
val scalaStm = "org.scala-tools" %% "scala-stm" % V.ScalaStm // Modified BSD (Scala)
val sjson = "net.debasishg" %% "sjson" % "0.15" // ApacheV2
val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT
val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2