diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 43af040d62..83c82e443b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -334,7 +334,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { clientRef ! "simple" clientRef ! "simple" - latch.await + Await.ready(latch, timeout.duration) latch.reset @@ -343,7 +343,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { clientRef ! "simple" clientRef ! "simple" - latch.await + Await.ready(latch, timeout.duration) system.stop(clientRef) system.stop(serverRef) @@ -370,7 +370,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { "restart when Kill:ed" in { filterException[ActorKilledException] { - val latch = new CountDownLatch(2) + val latch = TestLatch(2) val boss = system.actorOf(Props(new Actor { @@ -385,7 +385,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 2, 1000))) boss ! "sendKill" - latch.await(5, TimeUnit.SECONDS) must be === true + Await.ready(latch, 5 seconds) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index a856c045c1..a6d6a7df98 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -8,12 +8,14 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.testkit._ import TestEvent.Mute import FSM._ -import akka.util.Duration import akka.util.duration._ import akka.event._ import com.typesafe.config.ConfigFactory +import akka.dispatch.Await +import akka.util.{ Timeout, Duration } object FSMActorSpec { + val timeout = Timeout(2 seconds) class Latches(implicit system: ActorSystem) { val unlockedLatch = TestLatch() @@ -122,7 +124,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im })) lock ! SubscribeTransitionCallBack(transitionTester) - initialStateLatch.await + Await.ready(initialStateLatch, timeout.duration) lock ! '3' lock ! '3' @@ -130,14 +132,14 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im lock ! '2' lock ! '1' - unlockedLatch.await - transitionLatch.await - transitionCallBackLatch.await - lockedLatch.await + Await.ready(unlockedLatch, timeout.duration) + Await.ready(transitionLatch, timeout.duration) + Await.ready(transitionCallBackLatch, timeout.duration) + Await.ready(lockedLatch, timeout.duration) EventFilter.warning(start = "unhandled event", occurrences = 1) intercept { lock ! "not_handled" - unhandledLatch.await + Await.ready(unhandledLatch, timeout.duration) } val answerLatch = TestLatch() @@ -151,10 +153,10 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } })) tester ! Hello - answerLatch.await + Await.ready(answerLatch, timeout.duration) tester ! Bye - terminatedLatch.await + Await.ready(terminatedLatch, timeout.duration) } "log termination" in { @@ -186,7 +188,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im } } val ref = system.actorOf(Props(fsm)) - started.await + Await.ready(started, timeout.duration) system.stop(ref) expectMsg(1 second, fsm.StopEvent(Shutdown, 1, null)) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 9b1ba99459..757acb1fd0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -188,7 +188,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8064, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8064, ioManager))) val f1 = client ? ByteString("Hello World!1") val f2 = client ? ByteString("Hello World!2") @@ -205,7 +205,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager())) val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8065, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8065, ioManager))) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) @@ -219,7 +219,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager(2))) val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8066, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8066, ioManager))) val list = List.range(0, 1000) val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) @@ -233,7 +233,7 @@ class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { val started = TestLatch(1) val ioManager = system.actorOf(Props(new IOManager(2))) // teeny tiny buffer val server = system.actorOf(Props(new KVStore("localhost", 8067, ioManager, started))) - started.await + Await.ready(started, timeout.duration) val client1 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager))) val client2 = system.actorOf(Props(new KVClient("localhost", 8067, ioManager))) val f1 = client1 ? (('set, "hello", ByteString("World"))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 9706a77d9f..2671fa9b9a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -8,6 +8,8 @@ import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.Await +import java.util.concurrent.TimeoutException @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ReceiveTimeoutSpec extends AkkaSpec { @@ -25,7 +27,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { } })) - timeoutLatch.await + Await.ready(timeoutLatch, TestLatch.DefaultTimeout) system.stop(timeoutActor) } @@ -44,7 +46,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutActor ! Tick - timeoutLatch.await + Await.ready(timeoutLatch, TestLatch.DefaultTimeout) system.stop(timeoutActor) } @@ -67,7 +69,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { timeoutActor ! Tick - timeoutLatch.await + Await.ready(timeoutLatch, TestLatch.DefaultTimeout) count.get must be(1) system.stop(timeoutActor) } @@ -81,7 +83,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { } })) - timeoutLatch.awaitTimeout(1 second) // timeout expected + intercept[TimeoutException] { Await.ready(timeoutLatch, 1 second) } system.stop(timeoutActor) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 789bf12e68..b627046052 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -32,7 +32,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val restartLatch = new TestLatch val secondRestartLatch = new TestLatch - val countDownLatch = new CountDownLatch(3) + val countDownLatch = new TestLatch(3) val stopLatch = new TestLatch val slaveProps = Props(new Actor { @@ -60,23 +60,23 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping // test restart and post restart ping - assert(restartLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) // now crash again... should not restart slave ! Crash slave ! Ping - assert(secondRestartLatch.await(10 seconds)) - assert(countDownLatch.await(10, TimeUnit.SECONDS)) + Await.ready(secondRestartLatch, 10 seconds) + Await.ready(countDownLatch, 10 seconds) slave ! Crash - assert(stopLatch.await(10 seconds)) + Await.ready(stopLatch, 10 seconds) } "ensure that slave is immortal without max restarts and time range" in { val boss = system.actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) - val countDownLatch = new CountDownLatch(100) + val countDownLatch = new TestLatch(100) val slaveProps = Props(new Actor { @@ -91,7 +91,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val slave = Await.result((boss ? slaveProps).mapTo[ActorRef], timeout.duration) (1 to 100) foreach { _ ⇒ slave ! Crash } - assert(countDownLatch.await(120, TimeUnit.SECONDS)) + Await.ready(countDownLatch, 2 minutes) assert(!slave.isTerminated) } @@ -131,14 +131,14 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping slave ! Crash - assert(restartLatch.await(10 seconds)) - assert(pingLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) + Await.ready(pingLatch, 10 seconds) slave ! Ping slave ! Crash - assert(secondRestartLatch.await(10 seconds)) - assert(secondPingLatch.await(10 seconds)) + Await.ready(secondRestartLatch, 10 seconds) + Await.ready(secondPingLatch, 10 seconds) // sleep to go out of the restart strategy's time range sleep(700L) @@ -147,7 +147,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Crash slave ! Ping - assert(thirdRestartLatch.await(1 second)) + Await.ready(thirdRestartLatch, 1 second) assert(!slave.isTerminated) } @@ -157,7 +157,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { val restartLatch = new TestLatch val secondRestartLatch = new TestLatch - val countDownLatch = new CountDownLatch(3) + val countDownLatch = new TestLatch(3) val stopLatch = new TestLatch val slaveProps = Props(new Actor { @@ -184,7 +184,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping // test restart and post restart ping - assert(restartLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) assert(!slave.isTerminated) @@ -192,20 +192,20 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Crash slave ! Ping - assert(secondRestartLatch.await(10 seconds)) - assert(countDownLatch.await(10, TimeUnit.SECONDS)) + Await.ready(secondRestartLatch, 10 seconds) + Await.ready(countDownLatch, 10 seconds) sleep(700L) slave ! Crash - assert(stopLatch.await(10 seconds)) + Await.ready(stopLatch, 10 seconds) sleep(500L) assert(slave.isTerminated) } "ensure that slave is not restarted within time range" in { val restartLatch, stopLatch, maxNoOfRestartsLatch = new TestLatch - val countDownLatch = new CountDownLatch(2) + val countDownLatch = new TestLatch(2) val boss = system.actorOf(Props(new Actor { def receive = { @@ -236,7 +236,7 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { slave ! Ping // test restart and post restart ping - assert(restartLatch.await(10 seconds)) + Await.ready(restartLatch, 10 seconds) assert(!slave.isTerminated) @@ -245,14 +245,14 @@ class RestartStrategySpec extends AkkaSpec with DefaultTimeout { // may not be running slave ! Ping - assert(countDownLatch.await(10, TimeUnit.SECONDS)) + Await.ready(countDownLatch, 10 seconds) // may not be running slave ! Crash - assert(stopLatch.await(10 seconds)) + Await.ready(stopLatch, 10 seconds) - assert(maxNoOfRestartsLatch.await(10 seconds)) + Await.ready(maxNoOfRestartsLatch, 10 seconds) sleep(500L) assert(slave.isTerminated) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 7c0aefb2fe..ba06a90023 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -103,7 +103,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout object Crash val restartLatch = new TestLatch - val pingLatch = new CountDownLatch(6) + val pingLatch = new TestLatch(6) val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), 3, 1000))) val props = Props(new Actor { @@ -122,13 +122,13 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash)) } - assert(restartLatch.await(2 seconds)) + Await.ready(restartLatch, 2 seconds) // should be enough time for the ping countdown to recover and reach 6 pings - assert(pingLatch.await(4, TimeUnit.SECONDS)) + Await.ready(pingLatch, 5 seconds) } "never fire prematurely" in { - val ticks = new CountDownLatch(300) + val ticks = new TestLatch(300) case class Msg(ts: Long) @@ -147,11 +147,11 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout Thread.sleep(5) } - assert(ticks.await(3, TimeUnit.SECONDS) == true) + Await.ready(ticks, 3 seconds) } "schedule with different initial delay and frequency" in { - val ticks = new CountDownLatch(3) + val ticks = new TestLatch(3) case object Msg @@ -162,12 +162,12 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout })) val startTime = System.nanoTime() - val cancellable = system.scheduler.schedule(1 second, 100 milliseconds, actor, Msg) - ticks.await(3, TimeUnit.SECONDS) + val cancellable = system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg) + Await.ready(ticks, 3 seconds) val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 - assert(elapsedTimeMs > 1200) - assert(elapsedTimeMs < 1500) // the precision is not ms exact + assert(elapsedTimeMs > 1600) + assert(elapsedTimeMs < 2000) // the precision is not ms exact cancellable.cancel() } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala index 38a57fda10..b9fc484957 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/routing/ListenerSpec.scala @@ -5,6 +5,7 @@ import akka.actor._ import akka.actor.Actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ListenerSpec extends AkkaSpec { @@ -45,10 +46,10 @@ class ListenerSpec extends AkkaSpec { broadcast ! WithListeners(_ ! "foo") broadcast ! "foo" - barLatch.await + Await.ready(barLatch, TestLatch.DefaultTimeout) barCount.get must be(2) - fooLatch.await + Await.ready(fooLatch, TestLatch.DefaultTimeout) for (a ← List(broadcast, a1, a2, a3)) system.stop(a) } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index b0529e19cf..ff4aacc4b5 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -32,6 +32,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) + getString("akka.actor.default-dispatcher.mailboxType") must be("") getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) settings.DispatcherDefaultShutdown must equal(1 second) getInt("akka.actor.default-dispatcher.throughput") must equal(5) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 08b6a766ab..abf37fdff8 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -28,10 +28,10 @@ object FutureSpec { class TestDelayActor(await: TestLatch) extends Actor { def receive = { - case "Hello" ⇒ await.await; sender ! "World" - case "NoReply" ⇒ await.await + case "Hello" ⇒ Await.ready(await, TestLatch.DefaultTimeout); sender ! "World" + case "NoReply" ⇒ Await.ready(await, TestLatch.DefaultTimeout) case "Failure" ⇒ - await.await + Await.ready(await, TestLatch.DefaultTimeout) sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) } } @@ -72,7 +72,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val result = "test value" val future = Future { - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) result } test(future) @@ -85,7 +85,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val result = "test value" val future = Future { - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) result } latch.open() @@ -395,7 +395,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val actor = system.actorOf(Props[TestActor]) actor ? "Hello" onSuccess { case "World" ⇒ latch.open() } - assert(latch.await(5 seconds)) + Await.ready(latch, 5 seconds) system.stop(actor) } @@ -426,7 +426,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa intercept[ThrowableTest] { Await.result(f1, timeout.duration) } val latch = new TestLatch - val f2 = Future { latch.await(5 seconds); "success" } + val f2 = Future { Await.ready(latch, 5 seconds); "success" } f2 foreach (_ ⇒ throw new ThrowableTest("dispatcher foreach")) f2 onSuccess { case _ ⇒ throw new ThrowableTest("dispatcher receive") } val f3 = f2 map (s ⇒ s.toUpperCase) @@ -441,7 +441,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "shouldBlockUntilResult" in { val latch = new TestLatch - val f = Future { latch.await; 5 } + val f = Future { Await.ready(latch, 5 seconds); 5 } val f2 = Future { Await.result(f, timeout.duration) + 5 } intercept[TimeoutException](Await.ready(f2, 100 millis)) @@ -525,8 +525,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa z() + y() } - assert(ly.await(100 milliseconds)) - lz.awaitTimeout(100 milliseconds) + Await.ready(ly, 100 milliseconds) + intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } flow { x << 5 } @@ -588,20 +588,20 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa lz.open() x1() + x2() } - assert(lx.await(2 seconds)) + Await.ready(lx, 2 seconds) assert(!ly.isOpen) assert(!lz.isOpen) assert(List(x1, x2, y1, y2).forall(_.isCompleted == false)) flow { y1 << 1 } // When this is set, it should cascade down the line - assert(ly.await(2 seconds)) + Await.ready(ly, 2 seconds) assert(Await.result(x1, 1 minute) === 1) assert(!lz.isOpen) flow { y2 << 9 } // When this is set, it should cascade down the line - assert(lz.await(2 seconds)) + Await.ready(lz, 2 seconds) assert(Await.result(x2, 1 minute) === 9) assert(List(x1, x2, y1, y2).forall(_.isCompleted)) @@ -614,16 +614,16 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val i1, i2, s1, s2 = new TestLatch - val callService1 = Future { i1.open(); s1.await; 1 } - val callService2 = Future { i2.open(); s2.await; 9 } + val callService1 = Future { i1.open(); Await.ready(s1, TestLatch.DefaultTimeout); 1 } + val callService2 = Future { i2.open(); Await.ready(s2, TestLatch.DefaultTimeout); 9 } val result = flow { callService1() + callService2() } assert(!s1.isOpen) assert(!s2.isOpen) assert(!result.isCompleted) - assert(i1.await(2 seconds)) - assert(i2.await(2 seconds)) + Await.ready(i1, 2 seconds) + Await.ready(i2, 2 seconds) s1.open() s2.open() assert(Await.result(result, timeout.duration) === 10) @@ -644,10 +644,8 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa lz.open() z() + y() + oops } - - ly.awaitTimeout(100 milliseconds) - lz.awaitTimeout(100 milliseconds) - + intercept[TimeoutException] { Await.ready(ly, 100 milliseconds) } + intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } flow { x << 5 } assert(Await.result(y, timeout.duration) === 5) @@ -662,7 +660,7 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa val latch = new TestLatch val future = Future { - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) "Hello" } @@ -745,36 +743,36 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa "run callbacks async" in { val latch = Vector.fill(10)(new TestLatch) - val f1 = Future { latch(0).open(); latch(1).await; "Hello" } - val f2 = f1 map { s ⇒ latch(2).open(); latch(3).await; s.length } + val f1 = Future { latch(0).open(); Await.ready(latch(1), TestLatch.DefaultTimeout); "Hello" } + val f2 = f1 map { s ⇒ latch(2).open(); Await.ready(latch(3), TestLatch.DefaultTimeout); s.length } f2 foreach (_ ⇒ latch(4).open()) - latch(0).await + Await.ready(latch(0), TestLatch.DefaultTimeout) f1 must not be ('completed) f2 must not be ('completed) latch(1).open() - latch(2).await + Await.ready(latch(2), TestLatch.DefaultTimeout) f1 must be('completed) f2 must not be ('completed) - val f3 = f1 map { s ⇒ latch(5).open(); latch(6).await; s.length * 2 } + val f3 = f1 map { s ⇒ latch(5).open(); Await.ready(latch(6), TestLatch.DefaultTimeout); s.length * 2 } f3 foreach (_ ⇒ latch(3).open()) - latch(5).await + Await.ready(latch(5), TestLatch.DefaultTimeout) f3 must not be ('completed) latch(6).open() - latch(4).await + Await.ready(latch(4), TestLatch.DefaultTimeout) f2 must be('completed) f3 must be('completed) val p1 = Promise[String]() - val f4 = p1 map { s ⇒ latch(7).open(); latch(8).await; s.length } + val f4 = p1 map { s ⇒ latch(7).open(); Await.ready(latch(8), TestLatch.DefaultTimeout); s.length } f4 foreach (_ ⇒ latch(9).open()) p1 must not be ('completed) @@ -782,13 +780,13 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa p1 complete Right("Hello") - latch(7).await + Await.ready(latch(7), TestLatch.DefaultTimeout) p1 must be('completed) f4 must not be ('completed) latch(8).open() - latch(9).await + Await.ready(latch(9), TestLatch.DefaultTimeout) Await.ready(f4, timeout.duration) must be('completed) } @@ -802,9 +800,9 @@ class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with Defa Future.blocking(system.dispatcher) val nested = Future(()) nested foreach (_ ⇒ l1.open()) - l1.await // make sure nested is completed + Await.ready(l1, TestLatch.DefaultTimeout) // make sure nested is completed nested foreach (_ ⇒ l2.open()) - l2.await + Await.ready(l2, TestLatch.DefaultTimeout) } Await.ready(complex, timeout.duration) must be('completed) } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index d0c2053243..d3dd9e9209 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,9 +1,13 @@ package akka.dispatch + import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import java.util.concurrent.{ TimeUnit, BlockingQueue } +import java.util.concurrent.ConcurrentLinkedQueue import akka.util._ import akka.util.duration._ import akka.testkit.AkkaSpec +import akka.actor.ActorRef +import akka.actor.ActorContext @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach { @@ -144,3 +148,26 @@ class PriorityMailboxSpec extends MailboxSpec { case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) } } + +object CustomMailboxSpec { + val config = """ + my-dispatcher { + mailboxType = "akka.dispatch.CustomMailboxSpec$MyMailbox" + } + """ + + class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) + with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new ConcurrentLinkedQueue[Envelope]() + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class CustomMailboxSpec extends AkkaSpec(CustomMailboxSpec.config) { + "Dispatcher configuration" must { + "support custom mailboxType" in { + val dispatcher = system.dispatcherFactory.newFromConfig("my-dispatcher") + dispatcher.createMailbox(null).getClass must be(classOf[CustomMailboxSpec.MyMailbox]) + } + } +} diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index c87e4aeb89..f18fd2e5e1 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -92,8 +92,8 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { pool ! "a" pool ! "b" - latch.await - successes.await + Await.ready(latch, TestLatch.DefaultTimeout) + Await.ready(successes, TestLatch.DefaultTimeout) count.get must be(2) @@ -180,7 +180,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { loops = 2 loop(500) - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) @@ -189,7 +189,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { loops = 10 loop(500) - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(4) @@ -236,7 +236,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { // send a few messages and observe pool at its lower bound loops = 3 loop(500) - latch.await + Await.ready(latch, TestLatch.DefaultTimeout) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be(2) @@ -245,7 +245,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { loops = 15 loop(500) - latch.await(10 seconds) + Await.ready(latch, 10 seconds) count.get must be(loops) Await.result((pool ? ActorPool.Stat).mapTo[ActorPool.Stats], timeout.duration).size must be >= (3) @@ -278,7 +278,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { pool1 ! "a" pool1 ! "b" - latch1.await + Await.ready(latch1, TestLatch.DefaultTimeout) delegates.size must be(1) system.stop(pool1) @@ -306,7 +306,7 @@ class ActorPoolSpec extends AkkaSpec with DefaultTimeout { pool2 ! "a" pool2 ! "b" - latch2.await + Await.ready(latch2, TestLatch.DefaultTimeout) delegates.size must be(2) system.stop(pool2) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 45e86fc387..dd4e45f5cb 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -3,7 +3,6 @@ package akka.routing import akka.actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit._ import akka.util.duration._ import akka.dispatch.Await @@ -30,8 +29,8 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli "round robin router" must { "be able to shut down its instance" in { - val helloLatch = new CountDownLatch(5) - val stopLatch = new CountDownLatch(5) + val helloLatch = new TestLatch(5) + val stopLatch = new TestLatch(5) val actor = system.actorOf(Props(new Actor { def receive = { @@ -48,16 +47,16 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli actor ! "hello" actor ! "hello" actor ! "hello" - helloLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(helloLatch, 5 seconds) system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } "deliver messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 - val doneLatch = new CountDownLatch(connectionCount) + val doneLatch = new TestLatch(connectionCount) val counter = new AtomicInteger var replies = Map.empty[Int, Int] @@ -83,14 +82,14 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli counter.get must be(connectionCount) actor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) replies.values foreach { _ must be(iterationCount) } } "deliver a broadcast message using the !" in { - val helloLatch = new CountDownLatch(5) - val stopLatch = new CountDownLatch(5) + val helloLatch = new TestLatch(5) + val stopLatch = new TestLatch(5) val actor = system.actorOf(Props(new Actor { def receive = { @@ -103,17 +102,17 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli }).withRouter(RoundRobinRouter(5)), "round-robin-broadcast") actor ! Broadcast("hello") - helloLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(helloLatch, 5 seconds) system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } } "random router" must { "be able to shut down its instance" in { - val stopLatch = new CountDownLatch(7) + val stopLatch = new TestLatch(7) val actor = system.actorOf(Props(new Actor { def receive = { @@ -136,13 +135,13 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli } system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } "deliver messages in a random fashion" in { val connectionCount = 10 val iterationCount = 10 - val doneLatch = new CountDownLatch(connectionCount) + val doneLatch = new TestLatch(connectionCount) val counter = new AtomicInteger var replies = Map.empty[Int, Int] @@ -168,15 +167,15 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli counter.get must be(connectionCount) actor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) replies.values foreach { _ must be > (0) } replies.values.sum must be === iterationCount * connectionCount } "deliver a broadcast message using the !" in { - val helloLatch = new CountDownLatch(6) - val stopLatch = new CountDownLatch(6) + val helloLatch = new TestLatch(6) + val stopLatch = new TestLatch(6) val actor = system.actorOf(Props(new Actor { def receive = { @@ -189,10 +188,10 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout with Impli }).withRouter(RandomRouter(6)), "random-broadcast") actor ! Broadcast("hello") - helloLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(helloLatch, 5 seconds) system.stop(actor) - stopLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(stopLatch, 5 seconds) } } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 49bad5736c..617bfa5b5f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -105,7 +105,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" } "send message to connection" in { - val doneLatch = new CountDownLatch(1) + val doneLatch = new TestLatch(1) val counter = new AtomicInteger(0) @@ -120,7 +120,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" routedActor ! "hello" routedActor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter.get must be(1) } @@ -139,7 +139,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" "deliver messages in a round robin fashion" in { val connectionCount = 10 val iterationCount = 10 - val doneLatch = new CountDownLatch(connectionCount) + val doneLatch = new TestLatch(connectionCount) //lets create some connections. var actors = new LinkedList[ActorRef] @@ -167,7 +167,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" routedActor ! Broadcast("end") //now wait some and do validations. - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) for (i ← 0 until connectionCount) { val counter = counters.get(i).get @@ -176,7 +176,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" } "deliver a broadcast message using the !" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -199,7 +199,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" routedActor ! Broadcast(1) routedActor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) @@ -214,7 +214,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" } "deliver a broadcast message" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -237,7 +237,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" routedActor ! Broadcast(1) routedActor ! Broadcast("end") - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) @@ -251,7 +251,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" } "broadcast message using !" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -273,14 +273,14 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" routedActor ! 1 routedActor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) } "broadcast message using ?" in { - val doneLatch = new CountDownLatch(2) + val doneLatch = new TestLatch(2) val counter1 = new AtomicInteger val actor1 = system.actorOf(Props(new Actor { @@ -304,7 +304,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" routedActor ? 1 routedActor ! "end" - doneLatch.await(5, TimeUnit.SECONDS) must be(true) + Await.ready(doneLatch, 5 seconds) counter1.get must be(1) counter2.get must be(1) @@ -341,7 +341,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" routedActor ! Broadcast(1) routedActor ! Broadcast("end") - doneLatch.await + Await.ready(doneLatch, TestLatch.DefaultTimeout) counter1.get must be(1) counter2.get must be(1) @@ -354,7 +354,7 @@ class RoutingSpec extends AkkaSpec(ConfigFactory.parseString(""" val routedActor = system.actorOf(Props[TestActor].withRouter(ScatterGatherFirstCompletedRouter(routees = List(actor1, actor2)))) routedActor ! Broadcast(Stop(Some(1))) - shutdownLatch.await + Await.ready(shutdownLatch, TestLatch.DefaultTimeout) Await.result(routedActor ? Broadcast(0), timeout.duration) must be(22) } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index d9447fa0c7..4eeb3a29fe 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -29,7 +29,7 @@ akka { logConfigOnStart = off # List FQCN of extensions which shall be loaded at actor system startup. - # Should be on the format: 'extensions = ["foo", "bar"]' etc. + # Should be on the format: 'extensions = ["foo", "bar"]' etc. # FIXME: clarify "extensions" here, "Akka Extensions ()" extensions = [] @@ -59,7 +59,7 @@ akka { deployment { - # deployment id pattern - on the format: /parent/child etc. + # deployment id pattern - on the format: /parent/child etc. default { # routing (load-balance) scheme to use @@ -160,6 +160,10 @@ akka { # Specifies the timeout to add a new message to a mailbox that is full - # negative number means infinite timeout mailbox-push-timeout-time = 10s + + # FQCN of the MailboxType, if not specified the default bounded or unbounded + # mailbox is used. + mailboxType = "" } debug { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 1c94d4304d..ccf8b3c6c8 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -16,7 +16,6 @@ import akka.AkkaException import scala.reflect.BeanProperty import scala.util.control.NoStackTrace import com.eaio.uuid.UUID -import java.lang.reflect.InvocationTargetException import java.util.concurrent.TimeUnit import java.util.{ Collection ⇒ JCollection } import java.util.regex.Pattern diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 203c8e3510..5e6f2e72e4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -19,7 +19,6 @@ import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigResolveOptions import com.typesafe.config.ConfigException -import java.lang.reflect.InvocationTargetException import akka.util.{ Helpers, Duration, ReflectiveAccess } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.{ CountDownLatch, Executors, ConcurrentHashMap } @@ -409,9 +408,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor val values: Array[AnyRef] = arguments map (_._2) toArray ReflectiveAccess.createInstance[ActorRefProvider](providerClass, types, values) match { - case Left(e: InvocationTargetException) ⇒ throw e.getTargetException - case Left(e) ⇒ throw e - case Right(p) ⇒ p + case Left(e) ⇒ throw e + case Right(p) ⇒ p } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 35284879a4..b1ca951b36 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -271,11 +271,15 @@ abstract class MessageDispatcherConfigurator() { def configure(config: Config, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher def mailboxType(config: Config, settings: Settings): MailboxType = { - val capacity = config.getInt("mailbox-capacity") - if (capacity < 1) UnboundedMailbox() - else { - val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) - BoundedMailbox(capacity, duration) + config.getString("mailboxType") match { + case "" ⇒ + val capacity = config.getInt("mailbox-capacity") + if (capacity < 1) UnboundedMailbox() + else { + val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) + BoundedMailbox(capacity, duration) + } + case fqn ⇒ new CustomMailboxType(fqn) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index f3697ae904..479de8731b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -41,7 +41,7 @@ object Await { def result(atMost: Duration)(implicit permit: CanAwait): T } - private implicit val permit = new CanAwait {} + private[this] implicit final val permit = new CanAwait {} def ready[T <: Awaitable[_]](awaitable: T, atMost: Duration): T = awaitable.ready(atMost) def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost) diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index fdb46f0ec4..86b286e916 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -10,6 +10,8 @@ import akka.actor.{ ActorCell, ActorRef } import java.util.concurrent._ import annotation.tailrec import akka.event.Logging.Error +import com.typesafe.config.Config +import akka.actor.ActorContext class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -33,7 +35,17 @@ object Mailbox { final val debug = false } -abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { +/** + * Custom mailbox implementations are implemented by extending this class. + */ +abstract class CustomMailbox(val actorContext: ActorContext) extends Mailbox(actorContext.asInstanceOf[ActorCell]) + +/** + * Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation, + * but can't be exposed to user defined mailbox subclasses. + * + */ +private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable { import Mailbox._ @volatile @@ -317,15 +329,15 @@ trait QueueBasedMessageQueue extends MessageQueue { * Mailbox configuration. */ trait MailboxType { - def create(receiver: ActorCell): Mailbox + def create(receiver: ActorContext): Mailbox } /** * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() } } @@ -335,16 +347,16 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut } } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) } } @@ -354,10 +366,36 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(receiver: ActorCell) = - new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + override def create(receiver: ActorContext) = + new Mailbox(receiver.asInstanceOf[ActorCell]) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut } } +/** + * Mailbox factory that creates instantiates the implementation from a + * fully qualified class name. The implementation class must have + * a constructor with a [[akka.actor.ActorContext]] parameter. + * E.g. + * + * class MyMailbox(owner: ActorContext) extends CustomMailbox(owner) + * with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + * val queue = new ConcurrentLinkedQueue[Envelope]() + * } + * + */ +class CustomMailboxType(mailboxFQN: String) extends MailboxType { + + override def create(receiver: ActorContext): Mailbox = { + val constructorSignature = Array[Class[_]](classOf[ActorContext]) + ReflectiveAccess.createInstance[Mailbox](mailboxFQN, constructorSignature, Array[AnyRef](receiver)) match { + case Right(instance) ⇒ instance + case Left(exception) ⇒ + throw new IllegalArgumentException("Cannot instantiate mailbox [%s] due to: %s". + format(mailboxFQN, exception.toString)) + } + } + +} + diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index face9a20d9..1797eb9d18 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -12,7 +12,6 @@ import akka.util.ReentrantGuard import akka.util.duration._ import akka.util.Timeout import java.util.concurrent.atomic.AtomicInteger -import akka.actor.ActorRefProvider import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException import akka.dispatch.Await @@ -516,34 +515,54 @@ trait LoggingAdapter { */ def error(cause: Throwable, message: String) { if (isErrorEnabled) notifyError(cause, message) } - def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) error(cause, format(template, arg1)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3)) } - def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(cause, format(template, arg1, arg2, arg3, arg4)) } + def error(cause: Throwable, template: String, arg1: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3)) } + def error(cause: Throwable, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(cause, format(template, arg1, arg2, arg3, arg4)) } def error(message: String) { if (isErrorEnabled) notifyError(message) } - def error(template: String, arg1: Any) { if (isErrorEnabled) error(format(template, arg1)) } - def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) error(format(template, arg1, arg2)) } - def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3)) } - def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) error(format(template, arg1, arg2, arg3, arg4)) } + def error(template: String, arg1: Any) { if (isErrorEnabled) notifyError(format(template, arg1)) } + def error(template: String, arg1: Any, arg2: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3)) } + def error(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isErrorEnabled) notifyError(format(template, arg1, arg2, arg3, arg4)) } def warning(message: String) { if (isWarningEnabled) notifyWarning(message) } - def warning(template: String, arg1: Any) { if (isWarningEnabled) warning(format(template, arg1)) } - def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2)) } - def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3)) } - def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) warning(format(template, arg1, arg2, arg3, arg4)) } + def warning(template: String, arg1: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1)) } + def warning(template: String, arg1: Any, arg2: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3)) } + def warning(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isWarningEnabled) notifyWarning(format(template, arg1, arg2, arg3, arg4)) } def info(message: String) { if (isInfoEnabled) notifyInfo(message) } - def info(template: String, arg1: Any) { if (isInfoEnabled) info(format(template, arg1)) } - def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) info(format(template, arg1, arg2)) } - def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3)) } - def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) info(format(template, arg1, arg2, arg3, arg4)) } + def info(template: String, arg1: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1)) } + def info(template: String, arg1: Any, arg2: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3)) } + def info(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isInfoEnabled) notifyInfo(format(template, arg1, arg2, arg3, arg4)) } def debug(message: String) { if (isDebugEnabled) notifyDebug(message) } - def debug(template: String, arg1: Any) { if (isDebugEnabled) debug(format(template, arg1)) } - def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2)) } - def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3)) } - def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) debug(format(template, arg1, arg2, arg3, arg4)) } + def debug(template: String, arg1: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1)) } + def debug(template: String, arg1: Any, arg2: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3)) } + def debug(template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isDebugEnabled) notifyDebug(format(template, arg1, arg2, arg3, arg4)) } + + def log(level: Logging.LogLevel, message: String) { if (isEnabled(level)) notifyLog(level, message) } + def log(level: Logging.LogLevel, template: String, arg1: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3)) } + def log(level: Logging.LogLevel, template: String, arg1: Any, arg2: Any, arg3: Any, arg4: Any) { if (isEnabled(level)) notifyLog(level, format(template, arg1, arg2, arg3, arg4)) } + + final def isEnabled(level: Logging.LogLevel): Boolean = level match { + case Logging.ErrorLevel ⇒ isErrorEnabled + case Logging.WarningLevel ⇒ isWarningEnabled + case Logging.InfoLevel ⇒ isInfoEnabled + case Logging.DebugLevel ⇒ isDebugEnabled + } + + final def notifyLog(level: Logging.LogLevel, message: String): Unit = level match { + case Logging.ErrorLevel ⇒ if (isErrorEnabled) notifyError(message) + case Logging.WarningLevel ⇒ if (isWarningEnabled) notifyWarning(message) + case Logging.InfoLevel ⇒ if (isInfoEnabled) notifyInfo(message) + case Logging.DebugLevel ⇒ if (isDebugEnabled) notifyDebug(message) + } def format(t: String, arg: Any*) = { val sb = new StringBuilder diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 065e11ba78..70b6fa5a03 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -5,6 +5,7 @@ package akka.util import akka.actor._ +import java.lang.reflect.InvocationTargetException object ReflectiveAccess { @@ -14,22 +15,19 @@ object ReflectiveAccess { def createInstance[T](clazz: Class[_], params: Array[Class[_]], - args: Array[AnyRef]): Either[Exception, T] = try { + args: Array[AnyRef]): Either[Exception, T] = withErrorHandling { assert(clazz ne null) assert(params ne null) assert(args ne null) val ctor = clazz.getDeclaredConstructor(params: _*) ctor.setAccessible(true) Right(ctor.newInstance(args: _*).asInstanceOf[T]) - } catch { - case e: Exception ⇒ - Left(e) } def createInstance[T](fqn: String, params: Array[Class[_]], args: Array[AnyRef], - classloader: ClassLoader = loader): Either[Exception, T] = try { + classloader: ClassLoader = loader): Either[Exception, T] = withErrorHandling { assert(params ne null) assert(args ne null) getClassFor(fqn, classloader) match { @@ -39,9 +37,6 @@ object ReflectiveAccess { Right(ctor.newInstance(args: _*).asInstanceOf[T]) case Left(exception) ⇒ Left(exception) //We could just cast this to Either[Exception, T] but it's ugly } - } catch { - case e: Exception ⇒ - Left(e) } //Obtains a reference to fqn.MODULE$ @@ -100,5 +95,24 @@ object ReflectiveAccess { case e: Exception ⇒ Left(e) } + /** + * Caught exception is returned as Left(exception). + * Unwraps `InvocationTargetException` if its getTargetException is an `Exception`. + * Other `Throwable`, such as `Error` is thrown. + */ + @inline + private final def withErrorHandling[T](body: ⇒ Either[Exception, T]): Either[Exception, T] = { + try { + body + } catch { + case e: InvocationTargetException ⇒ e.getTargetException match { + case t: Exception ⇒ Left(t) + case t ⇒ throw t + } + case e: Exception ⇒ + Left(e) + } + } + } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala similarity index 88% rename from akka-stm/src/main/scala/akka/agent/Agent.scala rename to akka-agent/src/main/scala/akka/agent/Agent.scala index a5bece2b5c..278acadc74 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -4,17 +4,17 @@ package akka.agent -import akka.actor.ActorSystem import akka.actor._ -import akka.stm._ import akka.japi.{ Function ⇒ JFunc, Procedure ⇒ JProc } import akka.dispatch._ import akka.util.Timeout +import scala.concurrent.stm._ /** * Used internally to send functions. */ private[akka] case class Update[T](function: T ⇒ T) +private[akka] case class Alter[T](function: T ⇒ T) private[akka] case object Get /** @@ -101,7 +101,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Read the internal state of the agent. */ - def get() = ref.get + def get() = ref.single.get /** * Read the internal state of the agent. @@ -111,9 +111,10 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Dispatch a function to update the internal state. */ - def send(f: T ⇒ T) { + def send(f: T ⇒ T): Unit = { def dispatch = updater ! Update(f) - if (Stm.activeTransaction) { get; deferred(dispatch) } + val txn = Txn.findCurrent + if (txn.isDefined) Txn.afterCommit(status ⇒ dispatch)(txn.get) else dispatch } @@ -122,11 +123,11 @@ class Agent[T](initialValue: T, system: ActorSystem) { * that new state can be obtained within the given timeout. */ def alter(f: T ⇒ T)(timeout: Timeout): Future[T] = { - def dispatch = updater.?(Update(f), timeout).asInstanceOf[Future[T]] - if (Stm.activeTransaction) { + def dispatch = updater.?(Alter(f), timeout).asInstanceOf[Future[T]] + val txn = Txn.findCurrent + if (txn.isDefined) { val result = Promise[T]()(system.dispatcher) - get //Join xa - deferred { result completeWith dispatch } //Attach deferred-block to current transaction + Txn.afterCommit(status ⇒ result completeWith dispatch)(txn.get) result } else dispatch } @@ -172,7 +173,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { suspend() val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration) val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) - result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] + result completeWith threadBased.?(Alter(f), timeout).asInstanceOf[Future[T]] value }) result @@ -283,28 +284,35 @@ class Agent[T](initialValue: T, system: ActorSystem) { * Agent updater actor. Used internally for `send` actions. */ class AgentUpdater[T](agent: Agent[T]) extends Actor { - val txFactory = TransactionFactory(familyName = "AgentUpdater", readonly = false) - def receive = { - case update: Update[_] ⇒ sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) - case Get ⇒ sender.tell(agent.get) - case _ ⇒ + case u: Update[_] ⇒ update(u.function.asInstanceOf[T ⇒ T]) + case a: Alter[_] ⇒ sender ! update(a.function.asInstanceOf[T ⇒ T]) + case Get ⇒ sender ! agent.get + case _ ⇒ } + + def update(function: T ⇒ T): T = agent.ref.single.transformAndGet(function) } /** * Thread-based agent updater actor. Used internally for `sendOff` actions. */ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor { - val txFactory = TransactionFactory(familyName = "ThreadBasedAgentUpdater", readonly = false) - def receive = { - case update: Update[_] ⇒ try { - sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] }) + case u: Update[_] ⇒ try { + update(u.function.asInstanceOf[T ⇒ T]) + } finally { + agent.resume() + context.stop(self) + } + case a: Alter[_] ⇒ try { + sender ! update(a.function.asInstanceOf[T ⇒ T]) } finally { agent.resume() context.stop(self) } case _ ⇒ context.stop(self) } + + def update(function: T ⇒ T): T = agent.ref.single.transformAndGet(function) } diff --git a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala similarity index 93% rename from akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala rename to akka-agent/src/test/scala/akka/agent/AgentSpec.scala index b834489e6e..0e20d6c866 100644 --- a/akka-stm/src/test/scala/akka/agent/test/AgentSpec.scala +++ b/akka-agent/src/test/scala/akka/agent/AgentSpec.scala @@ -1,17 +1,12 @@ -package akka.agent.test +package akka.agent -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import akka.actor.ActorSystem -import akka.util.Timeout -import akka.agent.Agent -import akka.stm._ +import akka.dispatch.Await import akka.util.Duration import akka.util.duration._ -import java.util.concurrent.CountDownLatch -import akka.testkit.AkkaSpec +import akka.util.Timeout import akka.testkit._ -import akka.dispatch.Await +import scala.concurrent.stm._ +import java.util.concurrent.CountDownLatch class CountDownFunction[A](num: Int = 1) extends Function1[A, A] { val latch = new CountDownLatch(num) @@ -96,7 +91,7 @@ class AgentSpec extends AkkaSpec { "be readable within a transaction" in { val agent = Agent(5) - val value = atomic { agent() } + val value = atomic { t ⇒ agent() } value must be(5) agent.close() } @@ -105,7 +100,7 @@ class AgentSpec extends AkkaSpec { val countDown = new CountDownFunction[Int] val agent = Agent(5) - atomic { + atomic { t ⇒ agent send (_ * 2) } agent send countDown @@ -122,7 +117,7 @@ class AgentSpec extends AkkaSpec { val agent = Agent(5) try { - atomic(DefaultTransactionFactory) { + atomic { t ⇒ agent send (_ * 2) throw new RuntimeException("Expected failure") } diff --git a/akka-docs/_sphinx/exts/includecode.py b/akka-docs/_sphinx/exts/includecode.py index c12ddfa7f4..7a98848776 100644 --- a/akka-docs/_sphinx/exts/includecode.py +++ b/akka-docs/_sphinx/exts/includecode.py @@ -32,7 +32,7 @@ class IncludeCode(Directive): document = self.state.document arg0 = self.arguments[0] (filename, sep, section) = arg0.partition('#') - + if not document.settings.file_insertion_enabled: return [document.reporter.warning('File insertion disabled', line=self.lineno)] @@ -126,8 +126,9 @@ class IncludeCode(Directive): retnode = nodes.literal_block(text, text, source=fn) retnode.line = 1 retnode.attributes['line_number'] = self.lineno - if self.options.get('language', ''): - retnode['language'] = self.options['language'] + language = self.options.get('language') + if language: + retnode['language'] = language if 'linenos' in self.options: retnode['linenos'] = True document.settings.env.note_dependency(rel_fn) diff --git a/akka-docs/disabled/agents.rst b/akka-docs/disabled/agents.rst deleted file mode 100644 index b12fc1643c..0000000000 --- a/akka-docs/disabled/agents.rst +++ /dev/null @@ -1,147 +0,0 @@ -Agents (Scala) -============== - -.. sidebar:: Contents - - .. contents:: :local: - -Agents in Akka were inspired by `agents in Clojure `_. - -Agents provide asynchronous change of individual locations. Agents are bound to a single storage location for their lifetime, and only allow mutation of that location (to a new state) to occur as a result of an action. Update actions are functions that are asynchronously applied to the Agent's state and whose return value becomes the Agent's new state. The state of an Agent should be immutable. - -While updates to Agents are asynchronous, the state of an Agent is always immediately available for reading by any thread (using ``get`` or ``apply``) without any messages. - -Agents are reactive. The update actions of all Agents get interleaved amongst threads in a thread pool. At any point in time, at most one ``send`` action for each Agent is being executed. Actions dispatched to an agent from another thread will occur in the order they were sent, potentially interleaved with actions dispatched to the same agent from other sources. - -If an Agent is used within an enclosing transaction, then it will participate in that transaction. Agents are integrated with the STM - any dispatches made in a transaction are held until that transaction commits, and are discarded if it is retried or aborted. - -Creating and stopping Agents ----------------------------- - -Agents are created by invoking ``Agent(value)`` passing in the Agent's initial value. - -.. code-block:: scala - - val agent = Agent(5) - -An Agent will be running until you invoke ``close`` on it. Then it will be eligible for garbage collection (unless you hold on to it in some way). - -.. code-block:: scala - - agent.close() - -Updating Agents ---------------- - -You update an Agent by sending a function that transforms the current value or by sending just a new value. The Agent will apply the new value or function atomically and asynchronously. The update is done in a fire-forget manner and you are only guaranteed that it will be applied. There is no guarantee of when the update will be applied but dispatches to an Agent from a single thread will occur in order. You apply a value or a function by invoking the ``send`` function. - -.. code-block:: scala - - // send a value - agent send 7 - - // send a function - agent send (_ + 1) - agent send (_ * 2) - -You can also dispatch a function to update the internal state but on its own thread. This does not use the reactive thread pool and can be used for long-running or blocking operations. You do this with the ``sendOff`` method. Dispatches using either ``sendOff`` or ``send`` will still be executed in order. - -.. code-block:: scala - - // sendOff a function - agent sendOff (longRunningOrBlockingFunction) - -Reading an Agent's value ------------------------- - -Agents can be dereferenced, e.g. you can get an Agent's value, by invoking the Agent with parenthesis like this: - -.. code-block:: scala - - val result = agent() - -Or by using the get method. - -.. code-block:: scala - - val result = agent.get - -Reading an Agent's current value does not involve any message passing and happens immediately. So while updates to an Agent are asynchronous, reading the state of an Agent is synchronous. - -Awaiting an Agent's value -------------------------- - -It is also possible to read the value after all currently queued ``send``\s have completed. You can do this with ``await``: - -.. code-block:: scala - - val result = agent.await - -You can also get a ``Future`` to this value, that will be completed after the currently queued updates have completed: - -.. code-block:: scala - - val future = agent.future - // ... - val result = future.await.result.get - -Transactional Agents --------------------- - -If an Agent is used within an enclosing transaction, then it will participate in that transaction. If you send to an Agent within a transaction then the dispatch to the Agent will be held until that transaction commits, and discarded if the transaction is aborted. - -.. code-block:: scala - - import akka.agent.Agent - import akka.stm._ - - def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = { - atomic { - if (from.get < amount) false - else { - from send (_ - amount) - to send (_ + amount) - true - } - } - } - - val from = Agent(100) - val to = Agent(20) - val ok = transfer(from, to, 50) - - from() // -> 50 - to() // -> 70 - -Monadic usage -------------- - -Agents are also monadic, allowing you to compose operations using for-comprehensions. In a monadic usage, new Agents are created leaving the original Agents untouched. So the old values (Agents) are still available as-is. They are so-called 'persistent'. - -Example of a monadic usage: - -.. code-block:: scala - - val agent1 = Agent(3) - val agent2 = Agent(5) - - // uses foreach - var result = 0 - for (value <- agent1) { - result = value + 1 - } - - // uses map - val agent3 = - for (value <- agent1) yield value + 1 - - // uses flatMap - val agent4 = for { - value1 <- agent1 - value2 <- agent2 - } yield value1 + value2 - - agent1.close() - agent2.close() - agent3.close() - agent4.close() diff --git a/akka-docs/java/agents.rst b/akka-docs/java/agents.rst new file mode 100644 index 0000000000..c00d0a533c --- /dev/null +++ b/akka-docs/java/agents.rst @@ -0,0 +1,112 @@ +.. _agents-java: + +############## + Agents (Java) +############## + +.. sidebar:: Contents + + .. contents:: :local: + +Agents in Akka are inspired by `agents in Clojure`_. + +.. _agents in Clojure: http://clojure.org/agents + +Agents provide asynchronous change of individual locations. Agents are bound to +a single storage location for their lifetime, and only allow mutation of that +location (to a new state) to occur as a result of an action. Update actions are +functions that are asynchronously applied to the Agent's state and whose return +value becomes the Agent's new state. The state of an Agent should be immutable. + +While updates to Agents are asynchronous, the state of an Agent is always +immediately available for reading by any thread (using ``get``) without any +messages. + +Agents are reactive. The update actions of all Agents get interleaved amongst +threads in a thread pool. At any point in time, at most one ``send`` action for +each Agent is being executed. Actions dispatched to an agent from another thread +will occur in the order they were sent, potentially interleaved with actions +dispatched to the same agent from other sources. + +If an Agent is used within an enclosing transaction, then it will participate in +that transaction. Agents are integrated with the STM - any dispatches made in +a transaction are held until that transaction commits, and are discarded if it +is retried or aborted. + + +Creating and stopping Agents +============================ + +Agents are created by invoking ``new Agent(value, system)`` passing in the +Agent's initial value and a reference to the ``ActorSystem`` for your +application. An ``ActorSystem`` is required to create the underlying Actors. See +:ref:`actor-systems` for more information about actor systems. + +Here is an example of creating an Agent: + +.. includecode:: code/akka/docs/agent/AgentDocTest.java + :include: import-system,import-agent + :language: java + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#create + :language: java + +An Agent will be running until you invoke ``close`` on it. Then it will be +eligible for garbage collection (unless you hold on to it in some way). + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#close + :language: java + + +Updating Agents +=============== + +You update an Agent by sending a function that transforms the current value or +by sending just a new value. The Agent will apply the new value or function +atomically and asynchronously. The update is done in a fire-forget manner and +you are only guaranteed that it will be applied. There is no guarantee of when +the update will be applied but dispatches to an Agent from a single thread will +occur in order. You apply a value or a function by invoking the ``send`` +function. + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-function + :language: java + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#send + :language: java + +You can also dispatch a function to update the internal state but on its own +thread. This does not use the reactive thread pool and can be used for +long-running or blocking operations. You do this with the ``sendOff`` +method. Dispatches using either ``sendOff`` or ``send`` will still be executed +in order. + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#send-off + :language: java + + +Reading an Agent's value +======================== + +Agents can be dereferenced (you can get an Agent's value) by calling the get +method: + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-get + :language: java + +Reading an Agent's current value does not involve any message passing and +happens immediately. So while updates to an Agent are asynchronous, reading the +state of an Agent is synchronous. + + +Awaiting an Agent's value +========================= + +It is also possible to read the value after all currently queued sends have +completed. You can do this with ``await``: + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#import-timeout + :language: java + +.. includecode:: code/akka/docs/agent/AgentDocTest.java#read-await + :language: java diff --git a/akka-docs/java/code/akka/docs/agent/AgentDocJavaSpec.scala b/akka-docs/java/code/akka/docs/agent/AgentDocJavaSpec.scala new file mode 100644 index 0000000000..3418830595 --- /dev/null +++ b/akka-docs/java/code/akka/docs/agent/AgentDocJavaSpec.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.docs.agent + +import org.scalatest.junit.JUnitWrapperSuite + +class AgentDocJavaSpec extends JUnitWrapperSuite( + "akka.docs.agent.AgentDocTest", + Thread.currentThread.getContextClassLoader) \ No newline at end of file diff --git a/akka-docs/java/code/akka/docs/agent/AgentDocTest.java b/akka-docs/java/code/akka/docs/agent/AgentDocTest.java new file mode 100644 index 0000000000..f7021092c2 --- /dev/null +++ b/akka-docs/java/code/akka/docs/agent/AgentDocTest.java @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.docs.agent; + +import static org.junit.Assert.*; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import akka.testkit.AkkaSpec; + +//#import-system +import akka.actor.ActorSystem; +//#import-system + +//#import-agent +import akka.agent.Agent; +//#import-agent + +//#import-function +import akka.japi.Function; +//#import-function + +//#import-timeout +import akka.util.Duration; +import akka.util.Timeout; +import static java.util.concurrent.TimeUnit.SECONDS; +//#import-timeout + +public class AgentDocTest { + + private static ActorSystem testSystem; + + @BeforeClass + public static void beforeAll() { + testSystem = ActorSystem.create("AgentDocTest", AkkaSpec.testConf()); + } + + @AfterClass + public static void afterAll() { + testSystem.shutdown(); + testSystem = null; + } + + @Test + public void createAndClose() { + //#create + ActorSystem system = ActorSystem.create("app"); + + Agent agent = new Agent(5, system); + //#create + + //#close + agent.close(); + //#close + + system.shutdown(); + } + + @Test + public void sendAndSendOffAndReadAwait() { + Agent agent = new Agent(5, testSystem); + + //#send + // send a value + agent.send(7); + + // send a function + agent.send(new Function() { + public Integer apply(Integer i) { + return i * 2; + } + }); + //#send + + Function longRunningOrBlockingFunction = new Function() { + public Integer apply(Integer i) { + return i * 1; + } + }; + + //#send-off + // sendOff a function + agent.sendOff(longRunningOrBlockingFunction); + //#send-off + + //#read-await + Integer result = agent.await(new Timeout(Duration.create(5, SECONDS))); + //#read-await + + assertEquals(result, new Integer(14)); + + agent.close(); + } + + @Test + public void readWithGet() { + Agent agent = new Agent(5, testSystem); + + //#read-get + Integer result = agent.get(); + //#read-get + + assertEquals(result, new Integer(5)); + + agent.close(); + } +} \ No newline at end of file diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index 8ae79c5ba4..dc8ae98537 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -17,5 +17,6 @@ Java API routing remoting serialization + agents extending-akka transactors diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala index 7a18da6182..863c48a15b 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -4,15 +4,14 @@ package akka.docs.actor.mailbox //#imports -import akka.actor.Actor import akka.actor.Props -import akka.actor.mailbox.FileDurableMailboxType //#imports import org.scalatest.{ BeforeAndAfterAll, WordSpec } import org.scalatest.matchers.MustMatchers import akka.testkit.AkkaSpec +import akka.actor.Actor class MyActor extends Actor { def receive = { @@ -20,15 +19,23 @@ class MyActor extends Actor { } } -class DurableMailboxDocSpec extends AkkaSpec { +object DurableMailboxDocSpec { + val config = """ + //#dispatcher-config + my-dispatcher { + mailboxType = akka.actor.mailbox.FileBasedMailbox + } + //#dispatcher-config + """ +} - "define dispatcher with durable mailbox" in { - //#define-dispatcher - val dispatcher = system.dispatcherFactory.newDispatcher( - "my-dispatcher", throughput = 1, mailboxType = FileDurableMailboxType).build +class DurableMailboxDocSpec extends AkkaSpec(DurableMailboxDocSpec.config) { + + "configuration of dispatcher with durable mailbox" in { + //#dispatcher-config-use + val dispatcher = system.dispatcherFactory.lookup("my-dispatcher") val myActor = system.actorOf(Props[MyActor].withDispatcher(dispatcher), name = "myactor") - //#define-dispatcher - myActor ! "hello" + //#dispatcher-config-use } } diff --git a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java index ba411ffbb0..8b904f5ef6 100644 --- a/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java +++ b/akka-docs/modules/code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java @@ -4,7 +4,6 @@ package akka.docs.actor.mailbox; //#imports -import akka.actor.mailbox.DurableMailboxType; import akka.dispatch.MessageDispatcher; import akka.actor.UntypedActorFactory; import akka.actor.UntypedActor; @@ -12,8 +11,12 @@ import akka.actor.Props; //#imports +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import akka.testkit.AkkaSpec; +import com.typesafe.config.ConfigFactory; import akka.actor.ActorRef; import akka.actor.ActorSystem; @@ -21,24 +24,35 @@ import static org.junit.Assert.*; public class DurableMailboxDocTestBase { + ActorSystem system; + + @Before + public void setUp() { + system = ActorSystem.create("MySystem", + ConfigFactory.parseString(DurableMailboxDocSpec.config()).withFallback(AkkaSpec.testConf())); + } + + @After + public void tearDown() { + system.shutdown(); + } + @Test - public void defineDispatcher() { - ActorSystem system = ActorSystem.create("MySystem"); - //#define-dispatcher - MessageDispatcher dispatcher = system.dispatcherFactory() - .newDispatcher("my-dispatcher", 1, DurableMailboxType.fileDurableMailboxType()).build(); + public void configDefinedDispatcher() { + //#dispatcher-config-use + MessageDispatcher dispatcher = system.dispatcherFactory().lookup("my-dispatcher"); ActorRef myActor = system.actorOf(new Props().withDispatcher(dispatcher).withCreator(new UntypedActorFactory() { public UntypedActor create() { return new MyUntypedActor(); } - })); - //#define-dispatcher + }), "myactor"); + //#dispatcher-config-use myActor.tell("test"); - system.shutdown(); } public static class MyUntypedActor extends UntypedActor { public void onReceive(Object message) { } } + } diff --git a/akka-docs/modules/durable-mailbox.rst b/akka-docs/modules/durable-mailbox.rst index 74af6c3ca5..2f6ca9e261 100644 --- a/akka-docs/modules/durable-mailbox.rst +++ b/akka-docs/modules/durable-mailbox.rst @@ -62,15 +62,22 @@ The durable mailboxes and their configuration options reside in the You configure durable mailboxes through the dispatcher. The actor is oblivious to which type of mailbox it is using. -Here is an example in Scala: + +In the configuration of the dispatcher you specify the fully qualified class name +of the mailbox: .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala - :include: imports,define-dispatcher + :include: dispatcher-config + +Here is an example of how to create an actor with a durable dispatcher, in Scala: + +.. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocSpec.scala + :include: imports,dispatcher-config-use Corresponding example in Java: .. includecode:: code/akka/docs/actor/mailbox/DurableMailboxDocTestBase.java - :include: imports,define-dispatcher + :include: imports,dispatcher-config-use The actor is oblivious to which type of mailbox it is using. @@ -89,14 +96,11 @@ you need. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: - - mailbox = akka.actor.mailbox.FileDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.fileDurableMailboxType() +Config:: + my-dispatcher { + mailboxType = akka.actor.mailbox.FileBasedMailbox + } You can also configure and tune the file-based durable mailbox. This is done in the ``akka.actor.mailbox.file-based`` section in the :ref:`configuration`. @@ -117,14 +121,11 @@ mailboxes. Read more in the Redis documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: - - mailbox = akka.actor.mailbox.RedisDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.redisDurableMailboxType() +Config:: + my-dispatcher { + mailboxType = akka.actor.mailbox.RedisBasedMailbox + } You also need to configure the IP and port for the Redis server. This is done in the ``akka.actor.mailbox.redis`` section in the :ref:`configuration`. @@ -146,13 +147,11 @@ documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.ZooKeeperDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.zooKeeperDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.ZooKeeperBasedMailbox + } You also need to configure ZooKeeper server addresses, timeouts, etc. This is done in the ``akka.actor.mailbox.zookeeper`` section in the :ref:`configuration`. @@ -171,13 +170,11 @@ Beanstalk documentation on how to do that. You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.BeanstalkDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.beanstalkDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.BeanstalkBasedMailbox + } You also need to configure the IP, and port, and so on, for the Beanstalk server. This is done in the ``akka.actor.mailbox.beanstalk`` section in the @@ -202,13 +199,11 @@ lightweight versus building on other MongoDB implementations such as You configure durable mailboxes through the dispatcher, as described in :ref:`DurableMailbox.General` with the following mailbox type. -Scala:: +Config:: - mailbox = akka.actor.mailbox.MongoDurableMailboxType - -Java:: - - akka.actor.mailbox.DurableMailboxType.mongoDurableMailboxType() + my-dispatcher { + mailboxType = akka.actor.mailbox.MongoBasedMailbox + } You will need to configure the URI for the MongoDB server, using the URI Format specified in the `MongoDB Documentation `_. This is done in diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index dc781604ee..3a382ca005 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -328,16 +328,13 @@ message. If the actor does not complete the future, it will expire after the timeout period, which is taken from one of the following locations in order of precedence: -#. explicitly given timeout as in ``actor.?("hello")(timeout = 12 millis)`` -#. implicit argument of type :class:`akka.actor.Timeout`, e.g. +1. explicitly given timeout as in: - :: +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-explicit-timeout - import akka.actor.Timeout - import akka.util.duration._ +2. implicit argument of type :class:`akka.util.Timeout`, e.g. - implicit val timeout = Timeout(12 millis) - val future = actor ? "hello" +.. includecode:: code/akka/docs/actor/ActorDocSpec.scala#using-implicit-timeout See :ref:`futures-scala` for more information on how to await or query a future. diff --git a/akka-docs/scala/agents.rst b/akka-docs/scala/agents.rst index bb65c4fabf..856eb4cab8 100644 --- a/akka-docs/scala/agents.rst +++ b/akka-docs/scala/agents.rst @@ -1,4 +1,135 @@ -Agents (Scala) -============== +.. _agents-scala: -The Akka Agents module has not been migrated to Akka 2.0-SNAPSHOT yet. \ No newline at end of file +################ + Agents (Scala) +################ + +.. sidebar:: Contents + + .. contents:: :local: + +Agents in Akka are inspired by `agents in Clojure`_. + +.. _agents in Clojure: http://clojure.org/agents + +Agents provide asynchronous change of individual locations. Agents are bound to +a single storage location for their lifetime, and only allow mutation of that +location (to a new state) to occur as a result of an action. Update actions are +functions that are asynchronously applied to the Agent's state and whose return +value becomes the Agent's new state. The state of an Agent should be immutable. + +While updates to Agents are asynchronous, the state of an Agent is always +immediately available for reading by any thread (using ``get`` or ``apply``) +without any messages. + +Agents are reactive. The update actions of all Agents get interleaved amongst +threads in a thread pool. At any point in time, at most one ``send`` action for +each Agent is being executed. Actions dispatched to an agent from another thread +will occur in the order they were sent, potentially interleaved with actions +dispatched to the same agent from other sources. + +If an Agent is used within an enclosing transaction, then it will participate in +that transaction. Agents are integrated with Scala STM - any dispatches made in +a transaction are held until that transaction commits, and are discarded if it +is retried or aborted. + + +Creating and stopping Agents +============================ + +Agents are created by invoking ``Agent(value)`` passing in the Agent's initial +value: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create + +Note that creating an Agent requires an implicit ``ActorSystem`` (for creating +the underlying actors). See :ref:`actor-systems` for more information about +actor systems. An ActorSystem can be in implicit scope when creating an Agent: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create-implicit-system + +Or the ActorSystem can be passed explicitly when creating an Agent: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#create-explicit-system + +An Agent will be running until you invoke ``close`` on it. Then it will be +eligible for garbage collection (unless you hold on to it in some way). + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#close + + +Updating Agents +=============== + +You update an Agent by sending a function that transforms the current value or +by sending just a new value. The Agent will apply the new value or function +atomically and asynchronously. The update is done in a fire-forget manner and +you are only guaranteed that it will be applied. There is no guarantee of when +the update will be applied but dispatches to an Agent from a single thread will +occur in order. You apply a value or a function by invoking the ``send`` +function. + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#send + +You can also dispatch a function to update the internal state but on its own +thread. This does not use the reactive thread pool and can be used for +long-running or blocking operations. You do this with the ``sendOff`` +method. Dispatches using either ``sendOff`` or ``send`` will still be executed +in order. + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#send-off + + +Reading an Agent's value +======================== + +Agents can be dereferenced (you can get an Agent's value) by invoking the Agent +with parentheses like this: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-apply + +Or by using the get method: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-get + +Reading an Agent's current value does not involve any message passing and +happens immediately. So while updates to an Agent are asynchronous, reading the +state of an Agent is synchronous. + + +Awaiting an Agent's value +========================= + +It is also possible to read the value after all currently queued sends have +completed. You can do this with ``await``: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-await + +You can also get a ``Future`` to this value, that will be completed after the +currently queued updates have completed: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#read-future + + +Transactional Agents +==================== + +If an Agent is used within an enclosing transaction, then it will participate in +that transaction. If you send to an Agent within a transaction then the dispatch +to the Agent will be held until that transaction commits, and discarded if the +transaction is aborted. Here's an example: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#transfer-example + + +Monadic usage +============= + +Agents are also monadic, allowing you to compose operations using +for-comprehensions. In monadic usage, new Agents are created leaving the +original Agents untouched. So the old values (Agents) are still available +as-is. They are so-called 'persistent'. + +Example of monadic usage: + +.. includecode:: code/akka/docs/agent/AgentDocSpec.scala#monadic-example diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 54bdc875e3..3d9587b8dd 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -20,6 +20,8 @@ import org.scalatest.matchers.MustMatchers import akka.testkit._ import akka.util._ import akka.util.duration._ +import akka.actor.Actor.Receive +import akka.dispatch.Await //#my-actor class MyActor extends Actor { @@ -238,6 +240,27 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { system.stop(myActor) } + "using implicit timeout" in { + val myActor = system.actorOf(Props(new FirstActor)) + //#using-implicit-timeout + import akka.util.duration._ + import akka.util.Timeout + implicit val timeout = Timeout(500 millis) + val future = myActor ? "hello" + //#using-implicit-timeout + Await.result(future, timeout.duration) must be("hello") + + } + + "using explicit timeout" in { + val myActor = system.actorOf(Props(new FirstActor)) + //#using-explicit-timeout + import akka.util.duration._ + val future = myActor ? ("hello", timeout = 500 millis) + //#using-explicit-timeout + Await.result(future, 500 millis) must be("hello") + } + "using receiveTimeout" in { //#receive-timeout import akka.actor.ReceiveTimeout diff --git a/akka-docs/scala/code/akka/docs/agent/AgentDocSpec.scala b/akka-docs/scala/code/akka/docs/agent/AgentDocSpec.scala new file mode 100644 index 0000000000..95e4b04ea9 --- /dev/null +++ b/akka-docs/scala/code/akka/docs/agent/AgentDocSpec.scala @@ -0,0 +1,190 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.docs.agent + +import akka.agent.Agent +import akka.util.duration._ +import akka.util.Timeout +import akka.testkit._ + +class AgentDocSpec extends AkkaSpec { + + "create and close" in { + //#create + import akka.agent.Agent + + val agent = Agent(5) + //#create + + //#close + agent.close() + //#close + } + + "create with implicit system" in { + //#create-implicit-system + import akka.actor.ActorSystem + import akka.agent.Agent + + implicit val system = ActorSystem("app") + + val agent = Agent(5) + //#create-implicit-system + + agent.close() + system.shutdown() + } + + "create with explicit system" in { + //#create-explicit-system + import akka.actor.ActorSystem + import akka.agent.Agent + + val system = ActorSystem("app") + + val agent = Agent(5)(system) + //#create-explicit-system + + agent.close() + system.shutdown() + } + + "send and sendOff" in { + val agent = Agent(0) + + //#send + // send a value + agent send 7 + + // send a function + agent send (_ + 1) + agent send (_ * 2) + //#send + + def longRunningOrBlockingFunction = (i: Int) ⇒ i * 1 + + //#send-off + // sendOff a function + agent sendOff (longRunningOrBlockingFunction) + //#send-off + + val result = agent.await(Timeout(5 seconds)) + result must be === 16 + } + + "read with apply" in { + val agent = Agent(0) + + //#read-apply + val result = agent() + //#read-apply + + result must be === 0 + } + + "read with get" in { + val agent = Agent(0) + + //#read-get + val result = agent.get + //#read-get + + result must be === 0 + } + + "read with await" in { + val agent = Agent(0) + + //#read-await + import akka.util.duration._ + import akka.util.Timeout + + implicit val timeout = Timeout(5 seconds) + val result = agent.await + //#read-await + + result must be === 0 + } + + "read with future" in { + val agent = Agent(0) + + //#read-future + import akka.dispatch.Await + + implicit val timeout = Timeout(5 seconds) + val future = agent.future + val result = Await.result(future, timeout.duration) + //#read-future + + result must be === 0 + } + + "transfer example" in { + //#transfer-example + import akka.agent.Agent + import akka.util.duration._ + import akka.util.Timeout + import scala.concurrent.stm._ + + def transfer(from: Agent[Int], to: Agent[Int], amount: Int): Boolean = { + atomic { txn ⇒ + if (from.get < amount) false + else { + from send (_ - amount) + to send (_ + amount) + true + } + } + } + + val from = Agent(100) + val to = Agent(20) + val ok = transfer(from, to, 50) + + implicit val timeout = Timeout(5 seconds) + val fromValue = from.await // -> 50 + val toValue = to.await // -> 70 + //#transfer-example + + fromValue must be === 50 + toValue must be === 70 + } + + "monadic example" in { + //#monadic-example + val agent1 = Agent(3) + val agent2 = Agent(5) + + // uses foreach + var result = 0 + for (value ← agent1) { + result = value + 1 + } + + // uses map + val agent3 = for (value ← agent1) yield value + 1 + + // or using map directly + val agent4 = agent1 map (_ + 1) + + // uses flatMap + val agent5 = for { + value1 ← agent1 + value2 ← agent2 + } yield value1 + value2 + //#monadic-example + + result must be === 4 + agent3() must be === 4 + agent4() must be === 4 + agent5() must be === 8 + + agent1.close() + agent2.close() + agent3.close() + agent4.close() + agent5.close() + } +} diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index 54139db119..f571e2fec8 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -18,7 +18,7 @@ Scala API remoting serialization fsm + agents testing extending-akka - agents transactors diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index c680511697..57d7b3e098 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -9,7 +9,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef @@ -19,7 +19,7 @@ class BeanstalkBasedMailboxException(message: String) extends AkkaException(mess /** * @author Jonas Bonér */ -class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class BeanstalkBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = BeanstalkBasedMailboxExtension(owner.system) private val messageSubmitDelaySeconds = settings.MessageSubmitDelay.toSeconds.toInt @@ -78,7 +78,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) // TODO PN: Why volatile on local variable? @volatile var connected = false - // TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue + // TODO PN: attempts is not used. Should we have maxAttempts check? Note that this is called from ThreadLocal.initialValue var attempts = 0 var client: Client = null while (!connected) { diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala index 88b1bf85ae..f011352d2b 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/test/scala/akka/actor/mailbox/BeanstalkBasedMailboxSpec.scala @@ -1,4 +1,7 @@ package akka.actor.mailbox +import akka.dispatch.CustomMailboxType + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", BeanstalkDurableMailboxType) +class BeanstalkBasedMailboxSpec extends DurableMailboxSpec("Beanstalkd", + new CustomMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox")) diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala index 8fa7f81e25..55c96ea65c 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/FiledBasedMailbox.scala @@ -5,12 +5,12 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef -class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class FileBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { val log = Logging(system, "FileBasedMailbox") diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala index d1a36d14eb..43b8b2c048 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/test/scala/akka/actor/mailbox/FileBasedMailboxSpec.scala @@ -1,9 +1,11 @@ package akka.actor.mailbox import org.apache.commons.io.FileUtils +import akka.dispatch.CustomMailboxType @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FileBasedMailboxSpec extends DurableMailboxSpec("File", FileDurableMailboxType) { +class FileBasedMailboxSpec extends DurableMailboxSpec("File", + new CustomMailboxType("akka.actor.mailbox.FileBasedMailbox")) { def clean { val queuePath = FileBasedMailboxExtension(system).QueuePath diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 7eb30b2fdb..c8f9026cbf 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -4,15 +4,14 @@ package akka.actor.mailbox import akka.util.ReflectiveAccess -import java.lang.reflect.InvocationTargetException import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.actor.ActorRef import akka.actor.SerializedActorRef import akka.dispatch.Envelope import akka.dispatch.DefaultSystemMessageQueue import akka.dispatch.Dispatcher -import akka.dispatch.Mailbox +import akka.dispatch.CustomMailbox import akka.dispatch.MailboxType import akka.dispatch.MessageDispatcher import akka.dispatch.MessageQueue @@ -24,6 +23,7 @@ import akka.remote.RemoteActorRefProvider import akka.remote.netty.NettyRemoteServer import akka.serialization.Serialization import com.typesafe.config.Config +import akka.dispatch.CustomMailboxType private[akka] object DurableExecutableMailboxConfig { val Name = "[\\.\\/\\$\\s]".r @@ -33,7 +33,7 @@ class DurableMailboxException private[akka] (message: String, cause: Throwable) def this(message: String) = this(message, null) } -abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with DefaultSystemMessageQueue { +abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { import DurableExecutableMailboxConfig._ def system = owner.system @@ -45,7 +45,7 @@ abstract class DurableMailbox(owner: ActorCell) extends Mailbox(owner) with Defa trait DurableMessageSerialization { - def owner: ActorCell + def owner: ActorContext def serialize(durableMessage: Envelope): Array[Byte] = { @@ -73,73 +73,3 @@ trait DurableMessageSerialization { } -abstract class DurableMailboxType(mailboxFQN: String) extends MailboxType { - val constructorSignature = Array[Class[_]](classOf[ActorCell]) - - val mailboxClass: Class[_] = ReflectiveAccess.getClassFor(mailboxFQN, classOf[ActorCell].getClassLoader) match { - case Right(clazz) ⇒ clazz - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new DurableMailboxException("Cannot find class [%s] due to: %s".format(mailboxFQN, cause.toString)) - } - - //TODO take into consideration a mailboxConfig parameter so one can have bounded mboxes and capacity etc - def create(receiver: ActorCell): Mailbox = { - ReflectiveAccess.createInstance[AnyRef](mailboxClass, constructorSignature, Array[AnyRef](receiver)) match { - case Right(instance) ⇒ instance.asInstanceOf[Mailbox] - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - throw new DurableMailboxException("Cannot instantiate [%s] due to: %s".format(mailboxClass.getName, cause.toString)) - } - } -} - -case object RedisDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.RedisBasedMailbox") -case object MongoDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.MongoBasedMailbox") -case object BeanstalkDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.BeanstalkBasedMailbox") -case object FileDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.FileBasedMailbox") -case object ZooKeeperDurableMailboxType extends DurableMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox") -case class FqnDurableMailboxType(mailboxFQN: String) extends DurableMailboxType(mailboxFQN) - -/** - * Java API for the mailbox types. Usage: - *

- * MessageDispatcher dispatcher = system.dispatcherFactory()
- *   .newDispatcher("my-dispatcher", 1, DurableMailboxType.redisDurableMailboxType()).build();
- * 
- */ -object DurableMailboxType { - def redisDurableMailboxType(): DurableMailboxType = RedisDurableMailboxType - def mongoDurableMailboxType(): DurableMailboxType = MongoDurableMailboxType - def beanstalkDurableMailboxType(): DurableMailboxType = BeanstalkDurableMailboxType - def fileDurableMailboxType(): DurableMailboxType = FileDurableMailboxType - def zooKeeperDurableMailboxType(): DurableMailboxType = ZooKeeperDurableMailboxType - def fqnDurableMailboxType(mailboxFQN: String): DurableMailboxType = FqnDurableMailboxType(mailboxFQN) -} - -/** - * Configurator for the DurableMailbox - * Do not forget to specify the "storage", valid values are "redis", "beanstalkd", "zookeeper", "mongodb", "file", - * or a full class name of the Mailbox implementation. - */ -class DurableMailboxConfigurator { - // TODO PN #896: when and how is this class supposed to be used? Can we remove it? - - def mailboxType(config: Config): MailboxType = { - if (!config.hasPath("storage")) throw new DurableMailboxException("No 'storage' defined for durable mailbox") - config.getString("storage") match { - case "redis" ⇒ RedisDurableMailboxType - case "mongodb" ⇒ MongoDurableMailboxType - case "beanstalk" ⇒ BeanstalkDurableMailboxType - case "zookeeper" ⇒ ZooKeeperDurableMailboxType - case "file" ⇒ FileDurableMailboxType - case fqn ⇒ FqnDurableMailboxType(fqn) - } - } -} diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala index eec1b03192..5bce062203 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/test/scala/akka/actor/mailbox/DurableMailboxSpec.scala @@ -1,15 +1,16 @@ package akka.actor.mailbox import java.util.concurrent.TimeUnit +import java.util.concurrent.CountDownLatch import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } import akka.actor._ import akka.actor.Actor._ -import java.util.concurrent.CountDownLatch import akka.dispatch.MessageDispatcher -import akka.testkit.AkkaSpec import akka.dispatch.Dispatchers +import akka.dispatch.MailboxType +import akka.testkit.AkkaSpec object DurableMailboxSpecActorFactory { @@ -23,7 +24,7 @@ object DurableMailboxSpecActorFactory { } -abstract class DurableMailboxSpec(val backendName: String, val mailboxType: DurableMailboxType) extends AkkaSpec with BeforeAndAfterEach { +abstract class DurableMailboxSpec(val backendName: String, val mailboxType: MailboxType) extends AkkaSpec with BeforeAndAfterEach { import DurableMailboxSpecActorFactory._ implicit val dispatcher = system.dispatcherFactory.newDispatcher(backendName, throughput = 1, mailboxType = mailboxType).build diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index 6e1c28219d..b404e3c844 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -7,7 +7,7 @@ import akka.AkkaException import com.mongodb.async._ import com.mongodb.async.futures.RequestFutures import org.bson.collection._ -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.event.Logging import akka.actor.ActorRef import akka.dispatch.{ Await, Promise, Envelope, DefaultPromise } @@ -26,7 +26,7 @@ class MongoBasedMailboxException(message: String) extends AkkaException(message) * * @author Brendan W. McAdams */ -class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { +class MongoBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) { // this implicit object provides the context for reading/writing things as MongoDurableMessage implicit val mailboxBSONSer = new BSONSerializableMailbox(system) implicit val safeWrite = WriteConcern.Safe // TODO - Replica Safe when appropriate! diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala index ec13cff17f..4746a47242 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/test/scala/akka/actor/mailbox/MongoBasedMailboxSpec.scala @@ -1,18 +1,19 @@ package akka.actor.mailbox import java.util.concurrent.TimeUnit - import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll } - import akka.actor._ import akka.actor.Actor._ import java.util.concurrent.CountDownLatch import akka.dispatch.MessageDispatcher +import akka.dispatch.CustomMailboxType @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", MongoDurableMailboxType) { +class MongoBasedMailboxSpec extends DurableMailboxSpec("mongodb", + new CustomMailboxType("akka.actor.mailbox.MongoBasedMailbox")) { + import org.apache.log4j.{ Logger, Level } import com.mongodb.async._ diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index f937be09e0..21c5555590 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -6,14 +6,14 @@ package akka.actor.mailbox import com.redis._ import akka.actor.LocalActorRef import akka.AkkaException -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.dispatch.Envelope import akka.event.Logging import akka.actor.ActorRef class RedisBasedMailboxException(message: String) extends AkkaException(message) -class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class RedisBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = RedisBasedMailboxExtension(owner.system) diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala index 8b8ea6d611..ecb700d383 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/test/scala/akka/actor/mailbox/RedisBasedMailboxSpec.scala @@ -1,4 +1,6 @@ package akka.actor.mailbox +import akka.dispatch.CustomMailboxType @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", RedisDurableMailboxType) +class RedisBasedMailboxSpec extends DurableMailboxSpec("Redis", + new CustomMailboxType("akka.actor.mailbox.RedisBasedMailbox")) diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 3a50b93e93..1cdedba25d 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -8,7 +8,7 @@ import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException import org.I0Itec.zkclient.serialize._ -import akka.actor.ActorCell +import akka.actor.ActorContext import akka.cluster.zookeeper.AkkaZkClient import akka.dispatch.Envelope import akka.event.Logging @@ -17,7 +17,7 @@ import akka.actor.ActorRef class ZooKeeperBasedMailboxException(message: String) extends AkkaException(message) -class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { +class ZooKeeperBasedMailbox(val owner: ActorContext) extends DurableMailbox(owner) with DurableMessageSerialization { private val settings = ZooKeeperBasedMailboxExtension(owner.system) val queueNode = "/queues" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index 3cdc734830..888c46c1ea 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -4,10 +4,13 @@ import akka.actor.{ Actor, LocalActorRef } import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher +import akka.dispatch.CustomMailboxType import akka.actor.ActorRef @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", ZooKeeperDurableMailboxType) { +class ZooKeeperBasedMailboxSpec extends DurableMailboxSpec("ZooKeeper", + new CustomMailboxType("akka.actor.mailbox.ZooKeeperBasedMailbox")) { + val dataPath = "_akka_cluster/data" val logPath = "_akka_cluster/log" diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 35d417be1a..49b9e63db8 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -94,7 +94,7 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti case Right(remote) ⇒ - remote.start(None) //TODO Any application loader here? + remote.start(Option(Thread.currentThread().getContextClassLoader)) //TODO Any application loader here? val remoteClientLifeCycleHandler = system.systemActorOf(Props(new Actor { def receive = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala index 4e6730f65b..ff01e8eb26 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteInterface.scala @@ -14,6 +14,7 @@ import java.net.URISyntaxException import java.net.InetAddress import java.net.UnknownHostException import java.net.UnknownServiceException +import akka.event.Logging /** * Interface for remote transports to encode their addresses. The three parts @@ -135,7 +136,9 @@ trait RemoteModule { /** * Remote life-cycle events. */ -sealed trait RemoteLifeCycleEvent +sealed trait RemoteLifeCycleEvent { + def logLevel: Logging.LogLevel +} /** * Life-cycle events for RemoteClient. @@ -148,6 +151,7 @@ case class RemoteClientError[T <: ParsedTransportAddress]( @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.ErrorLevel override def toString = "RemoteClientError@" + remoteAddress + @@ -159,6 +163,7 @@ case class RemoteClientError[T <: ParsedTransportAddress]( case class RemoteClientDisconnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteClientDisconnected@" + remoteAddress } @@ -166,6 +171,7 @@ case class RemoteClientDisconnected[T <: ParsedTransportAddress]( case class RemoteClientConnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteClientConnected@" + remoteAddress } @@ -173,6 +179,7 @@ case class RemoteClientConnected[T <: ParsedTransportAddress]( case class RemoteClientStarted[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteClientStarted@" + remoteAddress } @@ -180,6 +187,7 @@ case class RemoteClientStarted[T <: ParsedTransportAddress]( case class RemoteClientShutdown[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteClientShutdown@" + remoteAddress } @@ -189,6 +197,7 @@ case class RemoteClientWriteFailed[T <: ParsedTransportAddress]( @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: T) extends RemoteClientLifeCycleEvent { + override def logLevel = Logging.WarningLevel override def toString = "RemoteClientWriteFailed@" + remoteAddress + @@ -206,12 +215,14 @@ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent case class RemoteServerStarted[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteServerStarted@" + remote.name } case class RemoteServerShutdown[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.InfoLevel override def toString = "RemoteServerShutdown@" + remote.name } @@ -219,6 +230,7 @@ case class RemoteServerShutdown[T <: ParsedTransportAddress]( case class RemoteServerError[T <: ParsedTransportAddress]( @BeanProperty val cause: Throwable, @BeanProperty remote: RemoteSupport[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.ErrorLevel override def toString = "RemoteServerError@" + remote.name + @@ -230,6 +242,7 @@ case class RemoteServerError[T <: ParsedTransportAddress]( case class RemoteServerClientConnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteServerClientConnected@" + remote.name + @@ -241,6 +254,7 @@ case class RemoteServerClientConnected[T <: ParsedTransportAddress]( case class RemoteServerClientDisconnected[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteServerClientDisconnected@" + remote.name + @@ -252,6 +266,7 @@ case class RemoteServerClientDisconnected[T <: ParsedTransportAddress]( case class RemoteServerClientClosed[T <: ParsedTransportAddress]( @BeanProperty remote: RemoteSupport[T], @BeanProperty val clientAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.DebugLevel override def toString = "RemoteServerClientClosed@" + remote.name + @@ -265,6 +280,7 @@ case class RemoteServerWriteFailed[T <: ParsedTransportAddress]( @BeanProperty cause: Throwable, @BeanProperty remote: RemoteSupport[T], @BeanProperty remoteAddress: Option[T]) extends RemoteServerLifeCycleEvent { + override def logLevel = Logging.WarningLevel override def toString = "RemoteServerWriteFailed@" + remote + @@ -320,7 +336,7 @@ abstract class RemoteSupport[-T <: ParsedTransportAddress](val system: ActorSyst protected[akka] def notifyListeners(message: RemoteLifeCycleEvent): Unit = { system.eventStream.publish(message) - system.log.debug("REMOTE: {}", message) + system.log.log(message.logLevel, "REMOTE: {}", message) } override def toString = name diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala index d7f62c4f61..fc08c222af 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouters.scala @@ -19,7 +19,7 @@ trait RemoteRouterConfig extends RouterConfig { case x ⇒ throw new ConfigurationException("unparseable remote node " + x) } val node = Stream.continually(nodes).flatten.iterator - val impl = context.system.asInstanceOf[ActorSystemImpl] + val impl = context.system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here? Vector.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + i val deploy = Deploy("", ConfigFactory.empty(), None, props.routerConfig, RemoteScope(node.next)) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 17c8a45ade..c37201cda5 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -88,12 +88,12 @@ class TestFSMRef[S, D, T <: Actor]( object TestFSMRef { def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = { - val impl = system.asInstanceOf[ActorSystemImpl] + val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here? new TestFSMRef(impl, system.dispatcherFactory.prerequisites, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], TestActorRef.randomName) } def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], system: ActorSystem): TestFSMRef[S, D, T] = { - val impl = system.asInstanceOf[ActorSystemImpl] + val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here? new TestFSMRef(impl, system.dispatcherFactory.prerequisites, Props(creator = () ⇒ factory), impl.guardian.asInstanceOf[InternalActorRef], name) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 96b0cb151e..b5577fa747 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -102,7 +102,7 @@ class TestKit(_system: ActorSystem) { * registration as message target. */ lazy val testActor: ActorRef = { - val impl = system.asInstanceOf[ActorSystemImpl] + val impl = system.asInstanceOf[ActorSystemImpl] //FIXME should we rely on this cast to work here? impl.systemActorOf(Props(new TestActor(queue)) .copy(dispatcher = new CallingThreadDispatcher(system.dispatcherFactory.prerequisites)), "testActor" + TestKit.testActorId.incrementAndGet) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala index 13bb9d84ab..37e5e607fb 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestLatch.scala @@ -5,11 +5,9 @@ package akka.testkit import akka.util.Duration -import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.actor.ActorSystem - -class TestLatchTimeoutException(message: String) extends RuntimeException(message) -class TestLatchNoTimeoutException(message: String) extends RuntimeException(message) +import akka.dispatch.Await.{ CanAwait, Awaitable } +import java.util.concurrent.{ TimeoutException, CountDownLatch, TimeUnit } /** * A count down latch wrapper for use in testing. @@ -24,34 +22,23 @@ object TestLatch { def apply(count: Int = 1)(implicit system: ActorSystem) = new TestLatch(count) } -class TestLatch(count: Int = 1)(implicit system: ActorSystem) { +class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends Awaitable[Unit] { private var latch = new CountDownLatch(count) def countDown() = latch.countDown() - def isOpen: Boolean = latch.getCount == 0 - def open() = while (!isOpen) countDown() - - def await(): Boolean = await(TestLatch.DefaultTimeout) - - def await(timeout: Duration): Boolean = { - val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) - if (!opened) throw new TestLatchTimeoutException( - "Timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor)) - opened - } - - /** - * Timeout is expected. Throws exception if latch is opened before timeout. - */ - def awaitTimeout(timeout: Duration = TestLatch.DefaultTimeout) = { - val opened = latch.await(timeout.dilated.toNanos, TimeUnit.NANOSECONDS) - if (opened) throw new TestLatchNoTimeoutException( - "Latch opened before timeout of %s with time factor of %s" format (timeout.toString, TestKitExtension(system).TestTimeFactor)) - opened - } - def reset() = latch = new CountDownLatch(count) + + def ready(atMost: Duration)(implicit permit: CanAwait) = { + val opened = latch.await(atMost.dilated.toNanos, TimeUnit.NANOSECONDS) + if (!opened) throw new TimeoutException( + "Timeout of %s with time factor of %s" format (atMost.toString, TestKitExtension(system).TestTimeFactor)) + this + } + + def result(atMost: Duration)(implicit permit: CanAwait): Unit = { + ready(atMost) + } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index baab14b9cb..e918a15b1c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -107,7 +107,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { system.actorFor("/") ! PoisonPill - latch.await(2 seconds) + Await.ready(latch, 2 seconds) } "must enqueue unread messages from testActor to deadLetters" in { @@ -139,7 +139,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { val latch = new TestLatch(1)(system) system.registerOnTermination(latch.countDown()) system.shutdown() - latch.await(2 seconds) + Await.ready(latch, 2 seconds) Await.result(davyJones ? "Die!", timeout.duration) must be === "finally gone" // this will typically also contain log messages which were sent after the logger shutdown diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 0776b584a7..a12964b8db 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -25,12 +25,12 @@ object AkkaBuild extends Build { id = "akka", base = file("."), settings = parentSettings ++ Release.settings ++ Unidoc.settings ++ Rstdoc.settings ++ Publish.versionSettings ++ Dist.settings ++ Seq( - parallelExecution in GlobalScope := false, + parallelExecution in GlobalScope := true, Publish.defaultPublishTo in ThisBuild <<= crossTarget / "repository", Unidoc.unidocExclude := Seq(samples.id, tutorials.id), Dist.distExclude := Seq(actorTests.id, akkaSbtPlugin.id, docs.id) ), - aggregate = Seq(actor, testkit, actorTests, remote, slf4j, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs) + aggregate = Seq(actor, testkit, actorTests, remote, slf4j, agent, mailboxes, kernel, akkaSbtPlugin, samples, tutorials, docs) ) lazy val actor = Project( @@ -72,6 +72,8 @@ object AkkaBuild extends Build { dependencies = Seq(actor, actorTests % "test->test", testkit % "test->test"), settings = defaultSettings ++ multiJvmSettings ++ Seq( libraryDependencies ++= Dependencies.cluster, + // disable parallel tests + parallelExecution in Test := false, extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dakka.config=" + _.absolutePath).toSeq }, @@ -92,6 +94,15 @@ object AkkaBuild extends Build { ) ) + lazy val agent = Project( + id = "akka-agent", + base = file("akka-agent"), + dependencies = Seq(actor, testkit % "test->test"), + settings = defaultSettings ++ Seq( + libraryDependencies ++= Dependencies.agent + ) + ) + // lazy val amqp = Project( // id = "akka-amqp", // base = file("akka-amqp"), @@ -254,7 +265,7 @@ object AkkaBuild extends Build { lazy val docs = Project( id = "akka-docs", base = file("akka-docs"), - dependencies = Seq(actor, testkit % "test->test", remote, slf4j, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), + dependencies = Seq(actor, testkit % "test->test", remote, slf4j, agent, fileMailbox, mongoMailbox, redisMailbox, beanstalkMailbox, zookeeperMailbox), settings = defaultSettings ++ Seq( unmanagedSourceDirectories in Test <<= baseDirectory { _ ** "code" get }, libraryDependencies ++= Dependencies.docs, @@ -291,8 +302,7 @@ object AkkaBuild extends Build { unmanagedClasspath in Runtime <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") }, unmanagedClasspath in Test <+= (baseDirectory in LocalProject("akka")) map { base => Attributed.blank(base / "config") }, - // disable parallel tests - parallelExecution in Test := false, + parallelExecution in Test := true, // for excluding tests by name (or use system property: -Dakka.test.names.exclude=TimingSpec) excludeTestNames := { @@ -368,6 +378,8 @@ object Dependencies { val slf4j = Seq(slf4jApi) + val agent = Seq(scalaStm, Test.scalatest, Test.junit) + val amqp = Seq(rabbit, commonsIo, protobuf) val mailboxes = Seq(Test.scalatest, Test.junit) @@ -408,11 +420,12 @@ object Dependency { val Logback = "0.9.28" val Netty = "3.2.5.Final" val Protobuf = "2.4.1" + val Rabbit = "2.3.1" + val ScalaStm = "0.4" val Scalatest = "1.6.1" val Slf4j = "1.6.4" val Spring = "3.0.5.RELEASE" val Zookeeper = "3.4.0" - val Rabbit = "2.3.1" } // Compile @@ -437,6 +450,7 @@ object Dependency { val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License val redis = "net.debasishg" %% "redisclient" % "2.4.0" // ApacheV2 + val scalaStm = "org.scala-tools" %% "scala-stm" % V.ScalaStm // Modified BSD (Scala) val sjson = "net.debasishg" %% "sjson" % "0.15" // ApacheV2 val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2