Merge pull request #338 from jboner/wip-1845-monitor-typed-actors-√
Wip 1845 monitor typed actors √
This commit is contained in:
commit
a0bb415393
4 changed files with 72 additions and 24 deletions
|
|
@ -12,13 +12,13 @@ import java.util.concurrent.atomic.AtomicReference
|
|||
import annotation.tailrec
|
||||
import akka.testkit.{ EventFilter, filterEvents, AkkaSpec }
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.actor.TypedActor.{ PostRestart, PreRestart, PostStop, PreStart }
|
||||
import java.util.concurrent.{ TimeUnit, CountDownLatch }
|
||||
import akka.japi.{ Creator, Option ⇒ JOption }
|
||||
import akka.testkit.DefaultTimeout
|
||||
import akka.dispatch.{ Await, Dispatchers, Future, Promise }
|
||||
import akka.pattern.ask
|
||||
import akka.serialization.JavaSerializer
|
||||
import akka.actor.TypedActor._
|
||||
|
||||
object TypedActorSpec {
|
||||
|
||||
|
|
@ -160,7 +160,7 @@ object TypedActorSpec {
|
|||
def crash(): Unit
|
||||
}
|
||||
|
||||
class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles {
|
||||
class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles with Receiver {
|
||||
|
||||
override def crash(): Unit = throw new IllegalStateException("Crash!")
|
||||
|
||||
|
|
@ -171,6 +171,12 @@ object TypedActorSpec {
|
|||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown()
|
||||
|
||||
override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown()
|
||||
|
||||
override def onReceive(msg: Any, sender: ActorRef): Unit = {
|
||||
msg match {
|
||||
case "pigdog" ⇒ sender ! "dogpig"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -415,6 +421,16 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
|
|||
EventFilter[IllegalStateException]("Crash!", occurrences = 1) intercept {
|
||||
t.crash()
|
||||
}
|
||||
|
||||
//Sneak in a check for the Receiver override
|
||||
val ref = ta getActorRefFor t
|
||||
|
||||
ref.tell("pigdog", testActor)
|
||||
|
||||
expectMsg(timeout.duration, "dogpig")
|
||||
|
||||
//Done with that now
|
||||
|
||||
ta.poisonPill(t)
|
||||
latch.await(10, TimeUnit.SECONDS) must be === true
|
||||
}
|
||||
|
|
|
|||
|
|
@ -256,33 +256,40 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
case _ ⇒ super.postRestart(reason)
|
||||
}
|
||||
|
||||
protected def withContext[T](unitOfWork: ⇒ T): T = {
|
||||
TypedActor.selfReference set proxyVar.get
|
||||
TypedActor.currentContext set context
|
||||
try unitOfWork finally {
|
||||
TypedActor.selfReference set null
|
||||
TypedActor.currentContext set null
|
||||
}
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case m: MethodCall ⇒
|
||||
TypedActor.selfReference set proxyVar.get
|
||||
TypedActor.currentContext set context
|
||||
try {
|
||||
if (m.isOneWay) m(me)
|
||||
else {
|
||||
try {
|
||||
if (m.returnsFuture_?) {
|
||||
val s = sender
|
||||
m(me).asInstanceOf[Future[Any]] onComplete {
|
||||
case Left(f) ⇒ s ! Status.Failure(f)
|
||||
case Right(r) ⇒ s ! r
|
||||
}
|
||||
} else {
|
||||
sender ! m(me)
|
||||
case m: MethodCall ⇒ withContext {
|
||||
if (m.isOneWay) m(me)
|
||||
else {
|
||||
try {
|
||||
if (m.returnsFuture_?) {
|
||||
val s = sender
|
||||
m(me).asInstanceOf[Future[Any]] onComplete {
|
||||
case Left(f) ⇒ s ! Status.Failure(f)
|
||||
case Right(r) ⇒ s ! r
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
sender ! Status.Failure(e)
|
||||
throw e
|
||||
} else {
|
||||
sender ! m(me)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
sender ! Status.Failure(e)
|
||||
throw e
|
||||
}
|
||||
} finally {
|
||||
TypedActor.selfReference set null
|
||||
TypedActor.currentContext set null
|
||||
}
|
||||
}
|
||||
|
||||
case msg if me.isInstanceOf[Receiver] ⇒ withContext {
|
||||
me.asInstanceOf[Receiver].onReceive(msg, sender)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -297,6 +304,13 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
def supervisorStrategy(): SupervisorStrategy
|
||||
}
|
||||
|
||||
/**
|
||||
* Mix this into your TypedActor to be able to intercept Terminated messages
|
||||
*/
|
||||
trait Receiver {
|
||||
def onReceive(message: Any, sender: ActorRef): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* Mix this into your TypedActor to be able to hook into its lifecycle
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -160,6 +160,15 @@ By having your Typed Actor implementation class implement ``TypedActor.Superviso
|
|||
you can define the strategy to use for supervising child actors, as described in
|
||||
:ref:`supervision` and :ref:`fault-tolerance-java`.
|
||||
|
||||
Receive arbitrary messages
|
||||
--------------------------
|
||||
|
||||
If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``,
|
||||
all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method.
|
||||
|
||||
This allows you to react to DeathWatch ``Terminated``-messages and other types of messages,
|
||||
e.g. when interfacing with untyped actors.
|
||||
|
||||
Lifecycle callbacks
|
||||
-------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -172,6 +172,15 @@ By having your Typed Actor implementation class implement any and all of the fol
|
|||
|
||||
You can hook into the lifecycle of your Typed Actor.
|
||||
|
||||
Receive arbitrary messages
|
||||
--------------------------
|
||||
|
||||
If your implementation class of your TypedActor extends ``akka.actor.TypedActor.Receiver``,
|
||||
all messages that are not ``MethodCall``s will be passed into the ``onReceive``-method.
|
||||
|
||||
This allows you to react to DeathWatch ``Terminated``-messages and other types of messages,
|
||||
e.g. when interfacing with untyped actors.
|
||||
|
||||
Supercharging
|
||||
-------------
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue