diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 5a9fab6c63..26f510d08a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -12,13 +12,13 @@ import java.util.concurrent.atomic.AtomicReference import annotation.tailrec import akka.testkit.{ EventFilter, filterEvents, AkkaSpec } import akka.serialization.SerializationExtension -import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart } import java.util.concurrent.{ TimeUnit, CountDownLatch } import akka.japi.{ Creator, Option ⇒ JOption } import akka.testkit.DefaultTimeout import akka.dispatch.{ Await, Dispatchers, Future, Promise } import akka.pattern.ask import akka.serialization.JavaSerializer +import akka.actor.TypedActor._ object TypedActorSpec { @@ -160,7 +160,7 @@ object TypedActorSpec { def crash(): Unit } - class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles { + class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles with Receiver { override def crash(): Unit = throw new IllegalStateException("Crash!") @@ -171,6 +171,12 @@ object TypedActorSpec { override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown() override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown() + + override def onReceive(msg: Any, sender: ActorRef): Unit = { + msg match { + case "pigdog" ⇒ sender ! "dogpig" + } + } } } @@ -415,6 +421,16 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) EventFilter[IllegalStateException]("Crash!", occurrences = 1) intercept { t.crash() } + + //Sneak in a check for the Receiver override + val ref = ta getActorRefFor t + + ref.tell("pigdog", testActor) + + expectMsg(timeout.duration, "dogpig") + + //Done with that now + ta.poisonPill(t) latch.await(10, TimeUnit.SECONDS) must be === true } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 85306c7203..279876af7b 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -256,33 +256,40 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case _ ⇒ super.postRestart(reason) } + protected def withContext[T](unitOfWork: ⇒ T): T = { + TypedActor.selfReference set proxyVar.get + TypedActor.currentContext set context + try unitOfWork finally { + TypedActor.selfReference set null + TypedActor.currentContext set null + } + } + def receive = { - case m: MethodCall ⇒ - TypedActor.selfReference set proxyVar.get - TypedActor.currentContext set context - try { - if (m.isOneWay) m(me) - else { - try { - if (m.returnsFuture_?) { - val s = sender - m(me).asInstanceOf[Future[Any]] onComplete { - case Left(f) ⇒ s ! Status.Failure(f) - case Right(r) ⇒ s ! r - } - } else { - sender ! m(me) + case m: MethodCall ⇒ withContext { + if (m.isOneWay) m(me) + else { + try { + if (m.returnsFuture_?) { + val s = sender + m(me).asInstanceOf[Future[Any]] onComplete { + case Left(f) ⇒ s ! Status.Failure(f) + case Right(r) ⇒ s ! r } - } catch { - case NonFatal(e) ⇒ - sender ! Status.Failure(e) - throw e + } else { + sender ! m(me) } + } catch { + case NonFatal(e) ⇒ + sender ! Status.Failure(e) + throw e } - } finally { - TypedActor.selfReference set null - TypedActor.currentContext set null } + } + + case msg if me.isInstanceOf[Receiver] ⇒ withContext { + me.asInstanceOf[Receiver].onReceive(msg, sender) + } } } @@ -297,6 +304,13 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi def supervisorStrategy(): SupervisorStrategy } + /** + * Mix this into your TypedActor to be able to intercept Terminated messages + */ + trait Receiver { + def onReceive(message: Any, sender: ActorRef): Unit + } + /** * Mix this into your TypedActor to be able to hook into its lifecycle */ diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index d6b016e3b5..8d5a13b8b2 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -160,6 +160,15 @@ By having your Typed Actor implementation class implement ``TypedActor.Superviso you can define the strategy to use for supervising child actors, as described in :ref:`supervision` and :ref:`fault-tolerance-java`. +Receive arbitrary messages +-------------------------- + +If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``, +all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method. + +This allows you to react to DeathWatch ``Terminated``-messages and other types of messages, +e.g. when interfacing with untyped actors. + Lifecycle callbacks ------------------- diff --git a/akka-docs/scala/typed-actors.rst b/akka-docs/scala/typed-actors.rst index ba326b8a27..2d47063664 100644 --- a/akka-docs/scala/typed-actors.rst +++ b/akka-docs/scala/typed-actors.rst @@ -172,6 +172,15 @@ By having your Typed Actor implementation class implement any and all of the fol You can hook into the lifecycle of your Typed Actor. +Receive arbitrary messages +-------------------------- + +If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``, +all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method. + +This allows you to react to DeathWatch ``Terminated``-messages and other types of messages, +e.g. when interfacing with untyped actors. + Supercharging -------------