diff --git a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java index 7b4c5a48bb..f8e9d1c3ee 100644 --- a/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java +++ b/akka-actor-tests/src/test/java/akka/actor/JavaAPITestActor.java @@ -1,7 +1,8 @@ package akka.actor; public class JavaAPITestActor extends UntypedActor { - public void onReceive(Object msg) { - getSender().tell("got it!"); - } + public void onReceive(Object msg) { + getSender().tell("got it!"); + getContext().getChildren(); + } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala index 7daf812631..bcae4b276e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorFireForgetRequestReplySpec.scala @@ -21,6 +21,7 @@ object ActorFireForgetRequestReplySpec { } class CrashingActor extends Actor { + implicit val system = context.system def receive = { case "Die" ⇒ state.finished.await @@ -29,6 +30,7 @@ object ActorFireForgetRequestReplySpec { } class SenderActor(replyActor: ActorRef) extends Actor { + implicit val system = context.system def receive = { case "Init" ⇒ replyActor ! "Send" @@ -51,7 +53,7 @@ object ActorFireForgetRequestReplySpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach { +class ActorFireForgetRequestReplySpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { import ActorFireForgetRequestReplySpec._ override def beforeEach() = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 5d3358dc6f..0080efc7c4 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -26,7 +26,7 @@ object ActorLifeCycleSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { +class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { import ActorLifeCycleSpec._ "An Actor" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 613b3c7b36..42916a7433 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -37,6 +37,7 @@ object ActorRefSpec { } class WorkerActor() extends Actor { + implicit val system = context.system def receive = { case "work" ⇒ { work @@ -111,7 +112,7 @@ object ActorRefSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorRefSpec extends AkkaSpec { +class ActorRefSpec extends AkkaSpec with DefaultTimeout { import akka.actor.ActorRefSpec._ def promiseIntercept(f: ⇒ Actor)(to: Promise[Actor]): Actor = try { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index 0057cdda60..e2ac25bb2b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -7,9 +7,10 @@ import org.scalatest.BeforeAndAfterAll import akka.dispatch.FutureTimeoutException import akka.util.duration._ import akka.testkit.AkkaSpec +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll { +class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { def actorWithTimeout(t: Timeout): ActorRef = actorOf(Props(creator = () ⇒ new Actor { def receive = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 88b31f25d9..54e32a700b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -10,9 +10,9 @@ import akka.util.duration._ import java.util.concurrent.atomic._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { +class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { def startWatching(target: ActorRef) = actorOf(Props(new Actor { - watch(target) + context.startsWatching(target) def receive = { case x ⇒ testActor forward x } })) @@ -52,8 +52,8 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende val terminal = actorOf(Props(context ⇒ { case _ ⇒ })) val monitor1, monitor3 = startWatching(terminal) val monitor2 = actorOf(Props(new Actor { - watch(terminal) - unwatch(terminal) + context.startsWatching(terminal) + context.stopsWatching(terminal) def receive = { case "ping" ⇒ sender ! "pong" case t: Terminated ⇒ testActor ! t @@ -107,7 +107,7 @@ class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende val failed = (supervisor ? Props.empty).as[ActorRef].get val brother = (supervisor ? Props(new Actor { - watch(failed) + context.startsWatching(failed) def receive = Actor.emptyBehavior })).as[ActorRef].get diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala index fdeabd2a47..020b8d9421 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTransitionSpec.scala @@ -58,7 +58,7 @@ class FSMTransitionSpec extends AkkaSpec with ImplicitSender { val forward = actorOf(new Forwarder(testActor)) val fsm = actorOf(new MyFSM(testActor)) val sup = actorOf(Props(new Actor { - watch(fsm) + context.startsWatching(fsm) def receive = { case _ ⇒ } }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, None))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala index 66257f89f4..f57bfe3548 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/HotSwapSpec.scala @@ -37,7 +37,7 @@ class HotSwapSpec extends AkkaSpec { case "init" ⇒ _log += "init" barrier.await - case "swap" ⇒ become({ + case "swap" ⇒ context.become({ case _ ⇒ _log += "swapped" barrier.await @@ -113,12 +113,12 @@ class HotSwapSpec extends AkkaSpec { _log += "init" barrier.await case "swap" ⇒ - become({ + context.become({ case "swapped" ⇒ _log += "swapped" barrier.await case "revert" ⇒ - unbecome() + context.unbecome() }) barrier.await } diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 3765ad5b6c..f4a800719d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -17,6 +17,9 @@ object IOActorSpec { class SimpleEchoServer(host: String, port: Int, ioManager: ActorRef, started: TestLatch) extends Actor { + implicit val timeout = context.system.settings.ActorTimeout + implicit val dispatcher = context.dispatcher + override def preStart = { listen(ioManager, host, port) started.open() @@ -63,6 +66,9 @@ object IOActorSpec { // Basic Redis-style protocol class KVStore(host: String, port: Int, ioManager: ActorRef, started: TestLatch) extends Actor { + implicit val timeout = context.system.settings.ActorTimeout + implicit val dispatcher = context.dispatcher + var kvs: Map[String, ByteString] = Map.empty override def preStart = { @@ -117,6 +123,9 @@ object IOActorSpec { class KVClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO { + implicit val timeout = context.system.settings.ActorTimeout + implicit val dispatcher = context.dispatcher + var socket: SocketHandle = _ override def preStart { @@ -171,7 +180,7 @@ object IOActorSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class IOActorSpec extends AkkaSpec with BeforeAndAfterEach { +class IOActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { import IOActorSpec._ "an IO Actor" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala index 944bf11a49..f34ea05924 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ReceiveTimeoutSpec.scala @@ -18,7 +18,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { val timeoutLatch = TestLatch() val timeoutActor = actorOf(new Actor { - receiveTimeout = Some(500L) + context.receiveTimeout = Some(500 milliseconds) protected def receive = { case ReceiveTimeout ⇒ timeoutLatch.open @@ -33,7 +33,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { val timeoutLatch = TestLatch() val timeoutActor = actorOf(new Actor { - receiveTimeout = Some(500L) + context.receiveTimeout = Some(500 milliseconds) protected def receive = { case ReceiveTimeout ⇒ timeoutLatch.open @@ -57,7 +57,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { case object Tick val timeoutActor = actorOf(new Actor { - receiveTimeout = Some(500L) + context.receiveTimeout = Some(500 milliseconds) protected def receive = { case Tick ⇒ () @@ -77,14 +77,14 @@ class ReceiveTimeoutSpec extends AkkaSpec { case object Tick val timeoutActor = actorOf(new Actor { - receiveTimeout = Some(500L) + context.receiveTimeout = Some(500 milliseconds) protected def receive = { case Tick ⇒ () case ReceiveTimeout ⇒ count.incrementAndGet timeoutLatch.open - receiveTimeout = None + context.receiveTimeout = None } }) @@ -109,7 +109,7 @@ class ReceiveTimeoutSpec extends AkkaSpec { } "have ReceiveTimeout eq to Actors ReceiveTimeout" in { - akka.actor.Actors.receiveTimeout() must be theSameInstanceAs (ReceiveTimeout) + akka.actor.Actors.receiveTimeout must be theSameInstanceAs (ReceiveTimeout) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index dd9e9ac79f..6d74053875 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -11,9 +11,10 @@ import akka.testkit.EventFilter import java.util.concurrent.{ TimeUnit, CountDownLatch } import org.multiverse.api.latches.StandardLatch import akka.testkit.AkkaSpec +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RestartStrategySpec extends AkkaSpec { +class RestartStrategySpec extends AkkaSpec with DefaultTimeout { override def atStartup { system.eventStream.publish(Mute(EventFilter[Exception]("Crashing..."))) @@ -206,7 +207,7 @@ class RestartStrategySpec extends AkkaSpec { val boss = actorOf(Props(new Actor { def receive = { - case p: Props ⇒ sender ! watch(context.actorOf(p)) + case p: Props ⇒ sender ! context.startsWatching(context.actorOf(p)) case t: Terminated ⇒ maxNoOfRestartsLatch.open } }).withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), None, Some(1000)))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 4f70bc52f5..f1f8024f77 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -6,9 +6,10 @@ import akka.testkit.AkkaSpec import akka.testkit.EventFilter import akka.util.duration._ import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { +class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { private val cancellables = new ConcurrentLinkedQueue[Cancellable]() def collectCancellable(c: Cancellable): Cancellable = { @@ -96,6 +97,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { * ticket #307 */ "pick up schedule after actor restart" in { + object Ping object Crash diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 7b6299ab69..6c87fb686f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -22,7 +22,7 @@ object SupervisorHierarchySpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorHierarchySpec extends AkkaSpec { +class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { import SupervisorHierarchySpec._ "A Supervisor Hierarchy" must { @@ -52,7 +52,7 @@ class SupervisorHierarchySpec extends AkkaSpec { val countDownMessages = new CountDownLatch(1) val countDownMax = new CountDownLatch(1) val boss = actorOf(Props(new Actor { - val crasher = watch(context.actorOf(Props(new CountDownActor(countDownMessages)))) + val crasher = context.startsWatching(context.actorOf(Props(new CountDownActor(countDownMessages)))) protected def receive = { case "killCrasher" ⇒ crasher ! Kill diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index d8ae9d7444..57229de46f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -7,9 +7,10 @@ import akka.testkit.{ filterEvents, EventFilter } import akka.dispatch.{ PinnedDispatcher, Dispatchers } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.testkit.AkkaSpec +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorMiscSpec extends AkkaSpec { +class SupervisorMiscSpec extends AkkaSpec with DefaultTimeout { "A Supervisor" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 65faedeb51..b1f2c30c27 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -50,7 +50,7 @@ object SupervisorSpec { } class Master(sendTo: ActorRef) extends Actor { - val temp = watch(context.actorOf(Props(new PingPongActor(sendTo)))) + val temp = context.startsWatching(context.actorOf(Props(new PingPongActor(sendTo)))) var s: ActorRef = _ @@ -63,7 +63,7 @@ object SupervisorSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { +class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { import SupervisorSpec._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index a77f8c20c1..78eaeb4009 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -11,9 +11,10 @@ import akka.actor.Actor._ import akka.testkit.{ TestKit, EventFilter, filterEvents, filterException } import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorTreeSpec extends AkkaSpec with ImplicitSender { +class SupervisorTreeSpec extends AkkaSpec with ImplicitSender with DefaultTimeout { "In a 3 levels deep supervisor tree (linked in the constructor) we" must { diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index e502b91069..75d3a88814 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -9,9 +9,10 @@ import org.scalatest.BeforeAndAfterAll import akka.testkit.{ TestKit, filterEvents, EventFilter } import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender { +class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender with DefaultTimeout { import Ticket669Spec._ // TODO: does this really make sense? diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 00fb05561d..6b5557ff8a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -16,6 +16,7 @@ import akka.serialization.SerializationExtension import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.japi.{ Creator, Option ⇒ JOption } +import akka.testkit.DefaultTimeout object TypedActorSpec { @@ -80,7 +81,7 @@ object TypedActorSpec { class Bar extends Foo with Serializable { - import TypedActor.{ dispatcher, timeout } + import TypedActor.dispatcher def pigdog = "Pigdog" @@ -96,8 +97,10 @@ object TypedActorSpec { new KeptPromise(Right(pigdog + numbered)) } - def futureComposePigdogFrom(foo: Foo): Future[String] = + def futureComposePigdogFrom(foo: Foo): Future[String] = { + implicit val timeout = TypedActor.system.settings.ActorTimeout foo.futurePigdog(500).map(_.toUpperCase) + } def optionPigdog(): Option[String] = Some(pigdog) @@ -157,7 +160,7 @@ object TypedActorSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfterAll { +class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfterAll with DefaultTimeout { import TypedActorSpec._ diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index c601a42700..81e3290005 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -53,7 +53,7 @@ object ActorModelSpec { class DispatcherActor extends Actor { private val busy = new Switch(false) - def interceptor = dispatcher.asInstanceOf[MessageDispatcherInterceptor] + def interceptor = context.dispatcher.asInstanceOf[MessageDispatcherInterceptor] def ack { if (!busy.switchOn()) { @@ -223,7 +223,7 @@ object ActorModelSpec { } } -abstract class ActorModelSpec extends AkkaSpec { +abstract class ActorModelSpec extends AkkaSpec with DefaultTimeout { import ActorModelSpec._ @@ -343,7 +343,7 @@ abstract class ActorModelSpec extends AkkaSpec { val waitTime = (30 seconds).dilated.toMillis val boss = actorOf(Props(new Actor { def receive = { - case "run" ⇒ for (_ ← 1 to num) (watch(context.actorOf(props))) ! cachedMessage + case "run" ⇒ for (_ ← 1 to num) (context.startsWatching(context.actorOf(props))) ! cachedMessage case Terminated(child) ⇒ stopLatch.countDown() } }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss"))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index 0084cc0ae5..71e1c57642 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -7,6 +7,7 @@ import akka.dispatch.{ PinnedDispatcher, Dispatchers, Dispatcher } import akka.actor.{ Props, Actor } import akka.util.Duration import akka.util.duration._ +import akka.testkit.DefaultTimeout object DispatcherActorSpec { class TestActor extends Actor { @@ -27,7 +28,7 @@ object DispatcherActorSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DispatcherActorSpec extends AkkaSpec { +class DispatcherActorSpec extends AkkaSpec with DefaultTimeout { import DispatcherActorSpec._ private val unit = TimeUnit.MILLISECONDS diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala index d9feed3209..c692b065e5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/PinnedActorSpec.scala @@ -18,7 +18,7 @@ object PinnedActorSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach { +class PinnedActorSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { import PinnedActorSpec._ private val unit = TimeUnit.MILLISECONDS diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala index 35924a2b14..d684474b16 100644 --- a/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala +++ b/akka-actor-tests/src/test/scala/akka/dataflow/Future2Actor.scala @@ -8,8 +8,9 @@ import akka.dispatch.Future import akka.actor.future2actor import akka.util.duration._ import akka.testkit.AkkaSpec +import akka.testkit.DefaultTimeout -class Future2ActorSpec extends AkkaSpec { +class Future2ActorSpec extends AkkaSpec with DefaultTimeout { "The Future2Actor bridge" must { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index 0261b8fc7e..8177ab6c9d 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -14,6 +14,7 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.testkit.AkkaSpec import org.scalatest.junit.JUnitSuite import java.lang.ArithmeticException +import akka.testkit.DefaultTimeout object FutureSpec { class TestActor extends Actor { @@ -39,7 +40,7 @@ object FutureSpec { class JavaFutureSpec extends JavaFutureTests with JUnitSuite @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll { +class FutureSpec extends AkkaSpec with Checkers with BeforeAndAfterAll with DefaultTimeout { import FutureSpec._ "A Promise" when { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index 282835e6fc..eeb6766b4c 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -3,9 +3,10 @@ package akka.dispatch import akka.actor.{ Props, LocalActorRef, Actor } import akka.testkit.AkkaSpec import akka.util.Duration +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PriorityDispatcherSpec extends AkkaSpec { +class PriorityDispatcherSpec extends AkkaSpec with DefaultTimeout { "A PriorityDispatcher" must { "Order it's messages according to the specified comparator using an unbounded mailbox" in { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala index 4c4c0c9ee7..724beba6bb 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PromiseStreamSpec.scala @@ -5,9 +5,10 @@ import akka.util.cps._ import akka.actor.Timeout import akka.util.duration._ import akka.testkit.AkkaSpec +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class PromiseStreamSpec extends AkkaSpec { +class PromiseStreamSpec extends AkkaSpec with DefaultTimeout { "A PromiseStream" must { diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 031fb1ccb3..9969740bf2 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -25,7 +25,7 @@ object EventStreamSpec { case class SetTarget(ref: ActorRef) class MyLog extends Actor { - var dst: ActorRef = system.deadLetters + var dst: ActorRef = context.system.deadLetters def receive = { case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized case SetTarget(ref) ⇒ dst = ref; dst ! "OK" diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index d9503e31b7..93fffae89e 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -25,7 +25,7 @@ object ActorPoolSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TypedActorPoolSpec extends AkkaSpec { +class TypedActorPoolSpec extends AkkaSpec with DefaultTimeout { import ActorPoolSpec._ "Actor Pool (2)" must { "support typed actors" in { @@ -55,7 +55,7 @@ class TypedActorPoolSpec extends AkkaSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorPoolSpec extends AkkaSpec { +class ActorPoolSpec extends AkkaSpec with DefaultTimeout { import ActorPoolSpec._ "Actor Pool" must { diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index 575fcfb7fe..a62f6712f0 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -7,9 +7,10 @@ import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec import akka.actor.DeploymentConfig._ import akka.routing.Routing.Broadcast +import akka.testkit.DefaultTimeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ConfiguredLocalRoutingSpec extends AkkaSpec { +class ConfiguredLocalRoutingSpec extends AkkaSpec with DefaultTimeout { val deployer = system.asInstanceOf[ActorSystemImpl].provider.deployer diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 6dd5f56577..357f1467a6 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -20,7 +20,7 @@ object RoutingSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class RoutingSpec extends AkkaSpec { +class RoutingSpec extends AkkaSpec with DefaultTimeout { val impl = system.asInstanceOf[ActorSystemImpl] diff --git a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala index 699f7af56c..271f8a9ac0 100644 --- a/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/IndexSpec.scala @@ -7,8 +7,9 @@ import org.scalatest.matchers.MustMatchers import akka.dispatch.Future import akka.testkit.AkkaSpec import scala.util.Random +import akka.testkit.DefaultTimeout -class IndexSpec extends AkkaSpec with MustMatchers { +class IndexSpec extends AkkaSpec with MustMatchers with DefaultTimeout { private def emptyIndex = new Index[String, Int](100, _ compareTo _) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 0770448c3b..4543a5db27 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -149,11 +149,10 @@ object Timeout { implicit def durationToTimeout(duration: Duration) = new Timeout(duration) implicit def intToTimeout(timeout: Int) = new Timeout(timeout) implicit def longToTimeout(timeout: Long) = new Timeout(timeout) - implicit def defaultTimeout(implicit system: ActorSystem) = system.settings.ActorTimeout } trait ActorLogging { this: Actor ⇒ - val log = akka.event.Logging(system.eventStream, context.self) + val log = akka.event.Logging(context.system.eventStream, context.self) } object Actor { @@ -190,6 +189,7 @@ trait Actor { /** * Stores the context for this actor, including self, sender, and hotswap. + * It is implicit to support operations such as `forward`. */ @transient protected[akka] implicit val context: ActorContext = { @@ -211,13 +211,6 @@ trait Actor { c } - implicit def system = context.system - - /** - * The default timeout, based on the config setting 'akka.actor.timeout' - */ - implicit def defaultTimeout = system.settings.ActorTimeout - /** * The 'self' field holds the ActorRef for this actor. *

@@ -230,34 +223,11 @@ trait Actor { /** * The reference sender Actor of the last received message. - * Is defined if the message was sent from another Actor, else None. + * Is defined if the message was sent from another Actor, + * else `deadLetters` in [[akka.actor.ActorSystem]]. */ final def sender: ActorRef = context.sender - /** - * Gets the current receive timeout - * When specified, the receive method should be able to handle a 'ReceiveTimeout' message. - */ - def receiveTimeout: Option[Long] = context.receiveTimeout - - /** - * User overridable callback/setting. - *

- * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - */ - def receiveTimeout_=(timeout: Option[Long]) = context.receiveTimeout = timeout - - /** - * Same as ActorContext.children - */ - def children: Iterable[ActorRef] = context.children - - /** - * Returns the dispatcher (MessageDispatcher) that is used for this Actor - */ - def dispatcher: MessageDispatcher = context.dispatcher - /** * User overridable callback/setting. *

@@ -325,30 +295,6 @@ trait Actor { } } - /** - * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. - * Puts the behavior on top of the hotswap stack. - * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack - */ - final def become(behavior: Receive, discardOld: Boolean = true) { context.become(behavior, discardOld) } - - /** - * Reverts the Actor behavior to the previous one in the hotswap stack. - */ - final def unbecome() { context.unbecome() } - - /** - * Registers this actor as a Monitor for the provided ActorRef - * @return the provided ActorRef - */ - final def watch(subject: ActorRef): ActorRef = context startsWatching subject - - /** - * Unregisters this actor as Monitor for the provided ActorRef - * @return the provided ActorRef - */ - final def unwatch(subject: ActorRef): ActorRef = context stopsWatching subject - // ========================================= // ==== INTERNAL IMPLEMENTATION DETAILS ==== // ========================================= diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index c7a37de589..f66e5a95b5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -8,8 +8,10 @@ import akka.dispatch._ import scala.annotation.tailrec import scala.collection.immutable.{ Stack, TreeMap } import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit.MILLISECONDS import akka.event.Logging.{ Debug, Warning, Error } import akka.util.{ Duration, Helpers } +import akka.japi.Procedure /** * The actor context - the view of the actor cell from the actor. @@ -20,14 +22,30 @@ trait ActorContext extends ActorRefFactory { def self: ActorRef - def receiveTimeout: Option[Long] + /** + * Gets the current receive timeout + * When specified, the receive method should be able to handle a 'ReceiveTimeout' message. + */ + def receiveTimeout: Option[Duration] - def receiveTimeout_=(timeout: Option[Long]): Unit + /** + * Defines the default timeout for an initial receive invocation. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. + */ + def receiveTimeout_=(timeout: Option[Duration]): Unit - def become(behavior: Actor.Receive, discardOld: Boolean): Unit + /** + * Changes the Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. + * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack + */ + def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit def hotswap: Stack[PartialFunction[Any, Unit]] + /** + * Reverts the Actor behavior to the previous one in the hotswap stack. + */ def unbecome(): Unit def currentMessage: Envelope @@ -38,6 +56,9 @@ trait ActorContext extends ActorRefFactory { def children: Iterable[ActorRef] + /** + * Returns the dispatcher (MessageDispatcher) that is used for this Actor + */ def dispatcher: MessageDispatcher def handleFailure(child: ActorRef, cause: Throwable): Unit @@ -48,11 +69,53 @@ trait ActorContext extends ActorRefFactory { def parent: ActorRef + /** + * Registers this actor as a Monitor for the provided ActorRef + * @return the provided ActorRef + */ def startsWatching(subject: ActorRef): ActorRef + /** + * Unregisters this actor as Monitor for the provided ActorRef + * @return the provided ActorRef + */ def stopsWatching(subject: ActorRef): ActorRef } +trait JavaActorContext extends ActorContext { + /** + * Returns an unmodifiable Java Collection containing the linked actors, + * please note that the backing map is thread-safe but not immutable + */ + def getChildren(): java.lang.Iterable[ActorRef] + + /** + * Gets the current receive timeout + * When specified, the receive method should be able to handle a 'ReceiveTimeout' message. + */ + def getReceiveTimeout: Option[Duration] + + /** + * Defines the default timeout for an initial receive invocation. + * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. + */ + def setReceiveTimeout(timeout: Duration): Unit + + /** + * Changes the Actor's behavior to become the new 'Procedure' handler. + * Puts the behavior on top of the hotswap stack. + */ + def become(behavior: Procedure[Any]): Unit + + /** + * Changes the Actor's behavior to become the new 'Procedure' handler. + * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack + */ + def become(behavior: Procedure[Any], discardOld: Boolean): Unit + +} + private[akka] object ActorCell { val contextStack = new ThreadLocal[Stack[ActorContext]] { override def initialValue = Stack[ActorContext]() @@ -76,8 +139,8 @@ private[akka] class ActorCell( val self: ActorRef with ScalaActorRef, val props: Props, val parent: ActorRef, - /*no member*/ _receiveTimeout: Option[Long], - var hotswap: Stack[PartialFunction[Any, Unit]]) extends ActorContext { + /*no member*/ _receiveTimeout: Option[Duration], + var hotswap: Stack[PartialFunction[Any, Unit]]) extends JavaActorContext { import ActorCell._ @@ -87,15 +150,28 @@ private[akka] class ActorCell( final def provider = system.provider - override def receiveTimeout: Option[Long] = if (receiveTimeoutData._1 > 0) Some(receiveTimeoutData._1) else None + override def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None - override def receiveTimeout_=(timeout: Option[Long]): Unit = { - val timeoutMs = if (timeout.isDefined && timeout.get > 0) timeout.get else -1 + override def receiveTimeout_=(timeout: Option[Duration]): Unit = { + val timeoutMs = if (timeout.isDefined && timeout.get.toMillis > 0) timeout.get.toMillis else -1 receiveTimeoutData = (timeoutMs, receiveTimeoutData._2) } + /** + * In milliseconds + */ var receiveTimeoutData: (Long, Cancellable) = - if (_receiveTimeout.isDefined) (_receiveTimeout.get, emptyCancellable) else emptyReceiveTimeoutData + if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData + + /** + * JavaActorContext impl + */ + def getReceiveTimeout: Option[Duration] = receiveTimeout + + /** + * JavaActorContext impl + */ + def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout) var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs @@ -120,6 +196,11 @@ private[akka] class ActorCell( @inline final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) system.dispatcher else props.dispatcher + /** + * JavaActorContext impl + */ + def getDispatcher(): MessageDispatcher = dispatcher + final def isTerminated: Boolean = mailbox.isClosed final def start(): Unit = { @@ -154,6 +235,14 @@ private[akka] class ActorCell( final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child) + /** + * Impl JavaActorContext + */ + def getChildren(): java.lang.Iterable[ActorRef] = { + import scala.collection.JavaConverters.asJavaIterableConverter + asJavaIterableConverter(children).asJava + } + final def getChild(name: String): Option[ActorRef] = if (isTerminated) None else childrenRefs.get(name).map(_.child) @@ -341,6 +430,19 @@ private[akka] class ActorCell( hotswap = hotswap.push(behavior) } + /** + * JavaActorContext impl + */ + def become(behavior: Procedure[Any]): Unit = become(behavior, false) + + /* + * JavaActorContext impl + */ + def become(behavior: Procedure[Any], discardOld: Boolean): Unit = { + def newReceive: Actor.Receive = { case msg ⇒ behavior.apply(msg) } + become(newReceive, discardOld) + } + def unbecome() { val h = hotswap if (h.nonEmpty) hotswap = h.pop diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c3865c001b..a65d344c06 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -139,7 +139,7 @@ class LocalActorRef private[akka] ( _supervisor: ActorRef, val path: ActorPath, val systemService: Boolean = false, - _receiveTimeout: Option[Long] = None, + _receiveTimeout: Option[Duration] = None, _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) extends ActorRef with ScalaActorRef with RefInternals { @@ -216,7 +216,7 @@ class LocalActorRef private[akka] ( /** * This trait represents the Scala Actor API - * There are implicit conversions in ../actor/Implicits.scala + * There are implicit conversions in [[akka.actor]] package object * from ActorRef -> ScalaActorRef and back */ trait ScalaActorRef { ref: ActorRef ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index fef236d5fd..d5ab9b5892 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -336,8 +336,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor } val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) - // TODO why implicit val dispatcher? - implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher + val dispatcher = dispatcherFactory.defaultGlobalDispatcher def terminationFuture: Future[Unit] = provider.terminationFuture def guardian: ActorRef = provider.guardian diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 0dc961ab0b..e4ca89770a 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -189,7 +189,7 @@ trait FSM[S, D] extends ListenerManagement { type Timeout = Option[Duration] type TransitionHandler = PartialFunction[(S, S), Unit] - val log = Logging(system, context.self) + val log = Logging(context.system, context.self) /** * **************************************** @@ -279,7 +279,7 @@ trait FSM[S, D] extends ListenerManagement { if (timers contains name) { timers(name).cancel } - val timer = Timer(name, msg, repeat, timerGen.next) + val timer = Timer(name, msg, repeat, timerGen.next)(context.system) timer.schedule(self, timeout) timers(name) = timer stay @@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) + timeoutFuture = Some(context.system.scheduler.scheduleOnce(t, self, TimeoutMarker(generation))) } } } @@ -566,7 +566,7 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒ def logDepth: Int = 0 - private val debugEvent = system.settings.FsmDebugEvent + private val debugEvent = context.system.settings.FsmDebugEvent private val events = new Array[Event](logDepth) private val states = new Array[AnyRef](logDepth) diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index aeb4e53573..1a92679c4b 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -239,7 +239,7 @@ class IOManager(bufferSize: Int = 8192) extends Actor { var worker: IOWorker = _ override def preStart { - worker = new IOWorker(system, self, bufferSize) + worker = new IOWorker(context.system, self, bufferSize) worker.start() } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 3ae639e95f..711a4ac235 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -292,11 +292,6 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi */ implicit def dispatcher = system.dispatcher - /** - * Returns the default timeout (for a TypedActor) when inside a method call in a TypedActor. - */ - implicit def timeout = system.settings.ActorTimeout - /** * Implementation of TypedActor as an Actor */ @@ -326,7 +321,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi def receive = { case m: MethodCall ⇒ TypedActor.selfReference set proxyVar.get - TypedActor.currentSystem set system + TypedActor.currentSystem set context.system try { if (m.isOneWay) m(me) else { diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala index c0deb28c81..ff05de52b4 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActor.scala @@ -58,6 +58,8 @@ abstract class UntypedActor extends Actor { @throws(classOf[Exception]) def onReceive(message: Any): Unit + def getContext(): JavaActorContext = context.asInstanceOf[JavaActorContext] + /** * Returns the 'self' reference. */ @@ -69,43 +71,6 @@ abstract class UntypedActor extends Actor { */ def getSender(): ActorRef = sender - /** - * Gets the current receive timeout - * When specified, the receive method should be able to handle a 'ReceiveTimeout' message. - */ - def getReceiveTimeout: Option[Long] = receiveTimeout - - /** - * Defines the default timeout for an initial receive invocation. - * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. - */ - def setReceiveTimeout(timeout: Long): Unit = receiveTimeout = Some(timeout) - - /** - * Returns an unmodifiable Java Collection containing the linked actors, - * please note that the backing map is thread-safe but not immutable - */ - def getChildren(): java.lang.Iterable[ActorRef] = { - import scala.collection.JavaConverters.asJavaIterableConverter - asJavaIterableConverter(context.children).asJava - } - - /** - * Returns the dispatcher (MessageDispatcher) that is used for this Actor - */ - def getDispatcher(): MessageDispatcher = dispatcher - - /** - * Java API for become - */ - def become(behavior: Procedure[Any]): Unit = become(behavior, false) - - /* - * Java API for become with optional discardOld - */ - def become(behavior: Procedure[Any], discardOld: Boolean): Unit = - super.become({ case msg ⇒ behavior.apply(msg) }, discardOld) - /** * User overridable callback. *

diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index 3906d2cb04..ae51c7bc52 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -49,7 +49,7 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas def start(system: ActorSystemImpl) { reaper = system.systemActorOf(Props(new Actor { def receive = { - case ref: ActorRef ⇒ watch(ref) + case ref: ActorRef ⇒ context.startsWatching(ref) case Terminated(ref) ⇒ unsubscribe(ref) } }), "MainBusReaper") diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 11526fc900..cef7d00bc1 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -120,7 +120,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ val requestedCapacity = capacity(_delegates) val newDelegates = requestedCapacity match { case qty if qty > 0 ⇒ - _delegates ++ Vector.fill(requestedCapacity)(watch(instance(defaultProps))) + _delegates ++ Vector.fill(requestedCapacity)(context.startsWatching(instance(defaultProps))) case qty if qty < 0 ⇒ _delegates.splitAt(_delegates.length + requestedCapacity) match { diff --git a/akka-docs/general/code/ConfigDocSpec.scala b/akka-docs/general/code/ConfigDocSpec.scala index 3180fa3b8a..557c33ff53 100644 --- a/akka-docs/general/code/ConfigDocSpec.scala +++ b/akka-docs/general/code/ConfigDocSpec.scala @@ -15,7 +15,7 @@ class ConfigDocSpec extends WordSpec with MustMatchers { //#custom-config val customConf = ConfigFactory.parseString(""" akka.actor.deployment { - /app/my-service { + /user/my-service { router = round-robin nr-of-instances = 3 } diff --git a/akka-docs/scala/code/ActorDocSpec.scala b/akka-docs/scala/code/ActorDocSpec.scala index b5665aede1..0e207fb0d0 100644 --- a/akka-docs/scala/code/ActorDocSpec.scala +++ b/akka-docs/scala/code/ActorDocSpec.scala @@ -13,7 +13,7 @@ import akka.event.Logging //#my-actor class MyActor extends Actor { - val log = Logging(system, this) + val log = Logging(context.system, this) def receive = { case "test" ⇒ log.info("received test") case _ ⇒ log.info("received unknown message") diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala index f1a6745d91..843a044bd9 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/direct_routed/DirectRoutedRemoteActorMultiJvmSpec.scala @@ -10,7 +10,7 @@ object DirectRoutedRemoteActorMultiJvmSpec { class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ sender ! system.nodename + case "identify" ⇒ sender ! context.system.nodename } } } @@ -33,7 +33,7 @@ class DirectRoutedRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { } } -class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { +class DirectRoutedRemoteActorMultiJvmNode2 extends AkkaRemoteSpec with DefaultTimeout { import DirectRoutedRemoteActorMultiJvmSpec._ diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala index 3be4979964..e42594c949 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/new_remote_actor/NewRemoteActorMultiJvmSpec.scala @@ -2,13 +2,14 @@ package akka.remote.new_remote_actor import akka.actor.Actor import akka.remote._ +import akka.testkit.DefaultTimeout object NewRemoteActorMultiJvmSpec { val NrOfNodes = 2 class SomeActor extends Actor with Serializable { def receive = { - case "identify" ⇒ sender ! system.nodename + case "identify" ⇒ sender ! context.system.nodename } } } @@ -32,7 +33,7 @@ class NewRemoteActorMultiJvmNode1 extends AkkaRemoteSpec { } } -class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec { +class NewRemoteActorMultiJvmNode2 extends AkkaRemoteSpec with DefaultTimeout { import NewRemoteActorMultiJvmSpec._ diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala index a5701cccd4..d985c770c2 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/random_routed/RandomRoutedRemoteActorMultiJvmSpec.scala @@ -4,12 +4,13 @@ import akka.actor.Actor import akka.remote._ import akka.routing._ import akka.routing.Routing.Broadcast +import akka.testkit.DefaultTimeout object RandomRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ sender ! system.nodename + case "hit" ⇒ sender ! context.system.nodename case "end" ⇒ self.stop() } } @@ -60,7 +61,7 @@ class RandomRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { } } -class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec { +class RandomRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout { import RandomRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "A new remote actor configured with a Random router" must { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala index 413d7814a5..08c0009f4b 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/round_robin_routed/RoundRobinRoutedRemoteActorMultiJvmSpec.scala @@ -4,12 +4,13 @@ import akka.actor.Actor import akka.remote._ import akka.routing._ import akka.routing.Routing.Broadcast +import akka.testkit.DefaultTimeout object RoundRobinRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ sender ! system.nodename + case "hit" ⇒ sender ! context.system.nodename case "end" ⇒ self.stop() } } @@ -60,7 +61,7 @@ class RoundRobinRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { } } -class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec { +class RoundRobinRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout { import RoundRobinRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "A new remote actor configured with a RoundRobin router" must { diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 95c5037e8f..764cda813e 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/scatter_gather_routed/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -4,12 +4,13 @@ import akka.actor.Actor import akka.remote._ import akka.routing._ import akka.routing.Routing.Broadcast +import akka.testkit.DefaultTimeout object ScatterGatherRoutedRemoteActorMultiJvmSpec { val NrOfNodes = 4 class SomeActor extends Actor with Serializable { def receive = { - case "hit" ⇒ sender ! system.nodename + case "hit" ⇒ sender ! context.system.nodename case "end" ⇒ self.stop() } } @@ -60,7 +61,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode3 extends AkkaRemoteSpec { } } -class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec { +class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec with DefaultTimeout { import ScatterGatherRoutedRemoteActorMultiJvmSpec._ val nodes = NrOfNodes "A new remote actor configured with a ScatterGather router" must { diff --git a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala index 56a27079ea..e02b072e2e 100644 --- a/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/NetworkFailureSpec.scala @@ -9,12 +9,13 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.remote.netty.NettyRemoteSupport import akka.actor.Actor import akka.testkit.AkkaSpec +import akka.testkit.DefaultTimeout import akka.dispatch.Future import java.util.concurrent.{ TimeUnit, CountDownLatch } import java.util.concurrent.atomic.AtomicBoolean -trait NetworkFailureSpec { self: AkkaSpec ⇒ +trait NetworkFailureSpec extends DefaultTimeout { self: AkkaSpec ⇒ import Actor._ import akka.util.Duration diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index b574cd8888..d7f815f4f2 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -25,6 +25,8 @@ object Think extends DiningHakkerMessage */ class Chopstick extends Actor { + import context._ + //When a Chopstick is taken by a hakker //It will refuse to be taken by other hakkers //But the owning hakker can put it back @@ -51,6 +53,8 @@ class Chopstick extends Actor { */ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { + import context._ + //When a hakker is thinking it can become hungry //and try to pick up its chopsticks and eat def thinking: Receive = { diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala index b9cea1fe8f..611e99c6d6 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnFsm.scala @@ -33,6 +33,7 @@ case class TakenBy(hakker: ActorRef) * A chopstick is an actor, it can be taken, and put back */ class Chopstick extends Actor with FSM[ChopstickState, TakenBy] { + import context._ // A chopstick begins its existence as available and taken by no one startWith(Available, TakenBy(system.deadLetters)) diff --git a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala index 5df2661800..65f530c077 100644 --- a/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala +++ b/akka-samples/akka-sample-hello/src/main/scala/sample/hello/Main.scala @@ -15,12 +15,12 @@ object Main { } class HelloActor extends Actor { - val worldActor = system.actorOf[WorldActor] + val worldActor = context.actorOf[WorldActor] def receive = { case Start ⇒ worldActor ! "Hello" case s: String ⇒ println("Received message: %s".format(s)) - system.stop() + context.system.stop() } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index c0476a74cc..8e84050415 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -613,3 +613,7 @@ object TestProbe { trait ImplicitSender { this: TestKit ⇒ implicit def self = testActor } + +trait DefaultTimeout { this: TestKit ⇒ + implicit val timeout = system.settings.ActorTimeout +} diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 12096a61b1..519301191a 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -37,6 +37,7 @@ object TestActorRefSpec { } class ReplyActor extends TActor { + implicit val system = context.system var replyTo: ActorRef = null def receiveT = { @@ -87,7 +88,7 @@ object TestActorRefSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { +class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { import TestActorRefSpec._ @@ -156,7 +157,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { EventFilter[ActorKilledException]() intercept { val a = TestActorRef(Props[WorkerActor]) val forwarder = actorOf(Props(new Actor { - watch(a) + context.startsWatching(a) def receive = { case x ⇒ testActor forward x } })) a.!(PoisonPill)(testActor) @@ -216,7 +217,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { "set receiveTimeout to None" in { val a = TestActorRef[WorkerActor] - a.underlyingActor.receiveTimeout must be(None) + a.underlyingActor.context.receiveTimeout must be(None) } "set CallingThreadDispatcher" in { diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index e428769d74..5e2d775195 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -8,7 +8,7 @@ import akka.dispatch.Future import akka.util.duration._ @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class TestProbeSpec extends AkkaSpec { +class TestProbeSpec extends AkkaSpec with DefaultTimeout { "A TestProbe" must { diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index 5a80699ade..909d1e5435 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -106,16 +106,16 @@ public class Pi { this.latch = latch; Creator routerCreator = new Creator() { public Router create() { - return new RoundRobinRouter(dispatcher(), new akka.actor.Timeout(-1)); + return new RoundRobinRouter(getContext().dispatcher(), new akka.actor.Timeout(-1)); } }; LinkedList actors = new LinkedList() { { - for (int i = 0; i < nrOfWorkers; i++) add(context().actorOf(Worker.class)); + for (int i = 0; i < nrOfWorkers; i++) add(getContext().actorOf(Worker.class)); } }; RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true); - router = new RoutedActorRef(system(), props, getSelf(), "pi"); + router = new RoutedActorRef(getContext().system(), props, getSelf(), "pi"); } // message handler diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 3283a591f4..2288af6cf9 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -52,11 +52,14 @@ object Pi extends App { var start: Long = _ // create the workers - val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker]) + val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker]) // wrap them with a load-balancing router + // FIXME routers are intended to be used like this + implicit val timout = context.system.settings.ActorTimeout + implicit val dispatcher = context.dispatcher val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers)) - val router = new RoutedActorRef(system, props, self, "pi") + val router = new RoutedActorRef(context.system, props, self, "pi") // message handler def receive = {