From 95b1590cd9aef0d4ca3513c9879ac37ded376130 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 19 Feb 2012 18:25:22 +0100 Subject: [PATCH 1/5] Adding support for DeathWatcher for TypedActors --- .../src/main/scala/akka/actor/TypedActor.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index da375a5f21..07c494fec9 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -283,6 +283,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi TypedActor.selfReference set null TypedActor.currentContext set null } + + case t: Terminated if me.isInstanceOf[DeathWatcher] => + me.asInstanceOf[DeathWatcher].onTermination(t.actor) } } @@ -297,6 +300,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi def supervisorStrategy(): SupervisorStrategy } + /** + * Mix this into your TypedActor to be able to intercept Terminated messages + */ + trait DeathWatcher { + + /** + * User overridable callback to intercept Terminated messages. + * @param actor + */ + def onTermination(actor: ActorRef): Unit + } + /** * Mix this into your TypedActor to be able to hook into its lifecycle */ From ff294986691dfe69835b560d07b091797eb936a9 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 19 Feb 2012 22:28:54 +0100 Subject: [PATCH 2/5] Adding support for Receiver and yanking out DeathWatcher --- .../main/scala/akka/actor/TypedActor.scala | 71 +++++++++---------- 1 file changed, 35 insertions(+), 36 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 07c494fec9..3d1fd7bed3 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -256,36 +256,40 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case _ ⇒ super.postRestart(reason) } - def receive = { - case m: MethodCall ⇒ - TypedActor.selfReference set proxyVar.get - TypedActor.currentContext set context - try { - if (m.isOneWay) m(me) - else { - try { - if (m.returnsFuture_?) { - val s = sender - m(me).asInstanceOf[Future[Any]] onComplete { - case Left(f) ⇒ s ! Status.Failure(f) - case Right(r) ⇒ s ! r - } - } else { - sender ! m(me) - } - } catch { - case NonFatal(e) ⇒ - sender ! Status.Failure(e) - throw e - } - } - } finally { - TypedActor.selfReference set null - TypedActor.currentContext set null - } + protected def withContext[T](unitOfWork: ⇒ T): T = { + TypedActor.selfReference set proxyVar.get + TypedActor.currentContext set context + try unitOfWork finally { + TypedActor.selfReference set null + TypedActor.currentContext set null + } + } - case t: Terminated if me.isInstanceOf[DeathWatcher] => - me.asInstanceOf[DeathWatcher].onTermination(t.actor) + def receive = { + case m: MethodCall ⇒ withContext { + if (m.isOneWay) m(me) + else { + try { + if (m.returnsFuture_?) { + val s = sender + m(me).asInstanceOf[Future[Any]] onComplete { + case Left(f) ⇒ s ! Status.Failure(f) + case Right(r) ⇒ s ! r + } + } else { + sender ! m(me) + } + } catch { + case NonFatal(e) ⇒ + sender ! Status.Failure(e) + throw e + } + } + } + + case msg if me.isInstanceOf[Receiver] ⇒ withContext { + me.asInstanceOf[Receiver].onReceive(msg, sender) + } } } @@ -303,13 +307,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi /** * Mix this into your TypedActor to be able to intercept Terminated messages */ - trait DeathWatcher { - - /** - * User overridable callback to intercept Terminated messages. - * @param actor - */ - def onTermination(actor: ActorRef): Unit + trait Receiver { + def onReceive(message: Any, sender: ActorRef): Unit } /** From ccae535c798e64aa34ddbaee15aad98edea39fa4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 20 Feb 2012 00:44:56 +0100 Subject: [PATCH 3/5] Adding testcase for Receiver --- .../scala/akka/actor/TypedActorSpec.scala | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 5a9fab6c63..26f510d08a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -12,13 +12,13 @@ import java.util.concurrent.atomic.AtomicReference import annotation.tailrec import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } import akka.serialization.SerializationExtension -import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.japi.{ Creator, Option ⇒ JOption } import akka.testkit.DefaultTimeout import akka.dispatch.{ Await, Dispatchers, Future, Promise } import akka.pattern.ask import akka.serialization.JavaSerializer +import akka.actor.TypedActor._ object TypedActorSpec { @@ -160,7 +160,7 @@ object TypedActorSpec { def crash(): Unit } - class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles { + class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles with Receiver { override def crash(): Unit = throw new IllegalStateException("Crash!") @@ -171,6 +171,12 @@ object TypedActorSpec { override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown() override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown() + + override def onReceive(msg: Any, sender: ActorRef): Unit = { + msg match { + case "pigdog" ⇒ sender ! "dogpig" + } + } } } @@ -415,6 +421,16 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) EventFilter[IllegalStateException]("Crash!", occurrences = 1) intercept { t.crash() } + + //Sneak in a check for the Receiver override + val ref = ta getActorRefFor t + + ref.tell("pigdog", testActor) + + expectMsg(timeout.duration, "dogpig") + + //Done with that now + ta.poisonPill(t) latch.await(10, TimeUnit.SECONDS) must be === true } From 7c43d049a9a6c749a47757374037439a1340fdbf Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 20 Feb 2012 16:25:01 +0100 Subject: [PATCH 4/5] Adding Receiver-docs --- akka-docs/java/typed-actors.rst | 8 ++++++++ akka-docs/scala/typed-actors.rst | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index d6b016e3b5..a54e5767d9 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -160,6 +160,14 @@ By having your Typed Actor implementation class implement ``TypedActor.Superviso you can define the strategy to use for supervising child actors, as described in :ref:`supervision` and :ref:`fault-tolerance-java`. +Receive arbitrary messages +-------------------------- + +If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``, +all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method. + +This allows you to react to DeathWatch ``Terminated``-messages and other typed of messages. + Lifecycle callbacks ------------------- diff --git a/akka-docs/scala/typed-actors.rst b/akka-docs/scala/typed-actors.rst index ba326b8a27..4dee51a920 100644 --- a/akka-docs/scala/typed-actors.rst +++ b/akka-docs/scala/typed-actors.rst @@ -172,6 +172,14 @@ By having your Typed Actor implementation class implement any and all of the fol You can hook into the lifecycle of your Typed Actor. +Receive arbitrary messages +-------------------------- + +If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``, +all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method. + +This allows you to react to DeathWatch ``Terminated``-messages and other typed of messages. + Supercharging ------------- From a2a9996ee6db5bc7f9f2819d60f746c2edea37c7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 20 Feb 2012 22:02:15 +0100 Subject: [PATCH 5/5] Fixing typo --- akka-docs/java/typed-actors.rst | 3 ++- akka-docs/scala/typed-actors.rst | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index a54e5767d9..8d5a13b8b2 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -166,7 +166,8 @@ Receive arbitrary messages If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``, all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method. -This allows you to react to DeathWatch ``Terminated``-messages and other typed of messages. +This allows you to react to DeathWatch ``Terminated``-messages and other types of messages, +e.g. when interfacing with untyped actors. Lifecycle callbacks ------------------- diff --git a/akka-docs/scala/typed-actors.rst b/akka-docs/scala/typed-actors.rst index 4dee51a920..2d47063664 100644 --- a/akka-docs/scala/typed-actors.rst +++ b/akka-docs/scala/typed-actors.rst @@ -178,7 +178,8 @@ Receive arbitrary messages If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``, all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method. -This allows you to react to DeathWatch ``Terminated``-messages and other typed of messages. +This allows you to react to DeathWatch ``Terminated``-messages and other types of messages, +e.g. when interfacing with untyped actors. Supercharging -------------