=con #17670 Fix potential ReceivePipeline MatchError

This commit is contained in:
Jeremy.Stone 2015-08-18 17:55:02 +01:00
parent b5c69ac854
commit b884dde6ee
3 changed files with 391 additions and 95 deletions

View file

@ -2,8 +2,8 @@
Receive Pipeline Pattern
========================
Receive Pipeline Pattern lets you define general interceptors for your messages
and plug any arbitrary amount of them to your Actors.
The Receive Pipeline Pattern lets you define general interceptors for your messages
and plug an arbitrary amount of them into your Actors.
It's useful for defining cross aspects to be applied to all or many of your Actors.
Some Possible Use Cases
@ -16,33 +16,44 @@ Some Possible Use Cases
Interceptors
------------
So how does an interceptor look? Well, Interceptors are defined by decorator functions
of type :class:`Receive => Receive`, where it gets the inner :class:`Receive` by parameter and
returns a new :class:`Receive` with the decoration applied.
Most of the times your decorators will be defined as a regular :class:`Receive` with cases
for the messages of your interest and at some point delegate on the inner :class:`Receive`
you get by parameter. We will talk about ignored and unhandled messages later.
Multiple interceptors can be added to actors that mixin the :class:`ReceivePipeline` trait.
These interceptors internally define layers of decorators around the actor's behavior. The first interceptor
defines an outer decorator which delegates to a decorator corresponding to the second interceptor and so on,
until the last interceptor which defines a decorator for the actor's :class:`Receive`.
The first or outermost interceptor receives messages sent to the actor.
For each message received by an interceptor, the interceptor will typically perform some
processing based on the message and decide whether
or not to pass the received message (or some other message) to the next interceptor.
An :class:`Interceptor` is a type alias for
:class:`PartialFunction[Any, Delegation]`. The :class:`Any` input is the message
it receives from the previous interceptor (or, in the case of the first interceptor,
the message that was sent to the actor).
The :class:`Delegation` return type is used to control what (if any)
message is passed on to the next interceptor.
A simple example
----------------
We have the simplest :class:`Receive` possible, it just prints any type of message, and we decorate
it to create an interceptor that increments :class:`Int` messages and delegates the result to the
inner (printer) :class:`Receive`.
To pass a transformed message to the actor
(or next inner interceptor) an interceptor can return :class:`Inner(newMsg)` where :class:`newMsg` is the transformed message.
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#interceptor
The following interceptor shows this. It intercepts :class:`Int` messages,
adds one to them and passes on the incremented value to the next interceptor.
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#interceptor-sample1
Building the Pipeline
---------------------
So we're done defining decorators which will create the interceptors. Now we will see
how to plug them into the Actors to construct the pipeline.
To give your Actor the ability to pipeline the receive, you'll need to mixin with the
To give your Actor the ability to pipeline received messages in this way, you'll need to mixin with the
:class:`ReceivePipeline` trait. It has two methods for controlling the pipeline, :class:`pipelineOuter`
and :class:`pipelineInner`, both receiving a decorator function. The first one adds the interceptor at the
and :class:`pipelineInner`, both receiving an :class:`Interceptor`.
The first one adds the interceptor at the
beginning of the pipeline and the second one adds it at the end, just before the current
Actor's behavior.
In this example we mixin our Actor with :class:`ReceivePipeline` trait and
In this example we mixin our Actor with the :class:`ReceivePipeline` trait and
we add :class:`Increment` and :class:`Double` interceptors with :class:`pipelineInner`.
They will be applied in this very order.
@ -62,21 +73,23 @@ Let's see it in an example. We have the following model:
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#mixin-model
And this two interceptors defined each one in its own trait. The first one intercepts any messages having
and these two interceptors defined, each one in its own trait:
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#mixin-interceptors
The first one intercepts any messages having
an internationalized text and replaces it with the resolved text before resuming with the chain. The second one
intercepts any message with an author defined and prints it before resuming the chain with the message unchanged.
But since :class:`I18n` adds the interceptor with :class:`pipelineInner` and :class:`Audit` adds it with
:class:`pipelineOuter`, the audit will happen before the internationalization.
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#mixin-interceptors
So if we mixin both interceptors in our Actor, we will see the following output for these example messages.
So if we mixin both interceptors in our Actor, we will see the following output for these example messages:
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#mixin-actor
Unhandled Messages
------------------
With all that behaviors chaining occurring on, what happens to unhandled messages? Let me explain it with
With all that behaviors chaining occurring, what happens to unhandled messages? Let me explain it with
a simple rule.
.. note::
@ -84,11 +97,28 @@ a simple rule.
of the interceptors handles a message, the current Actor's behavior will receive it, and if the
behavior doesn't handle it either, it will be treated as usual with the unhandled method.
But some times it is desired for interceptors to break the chain. You can do it by explicitly ignoring
the messages with empty cases or just not calling the inner interceptor received by parameter.
But sometimes it is desired for interceptors to break the chain. You can do it by explicitly indicating
that the message has been completely handled by the interceptor by returning
:class:`HandledCompletely`.
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#unhandled
Processing after delegation
---------------------------
But what if you want to perform some action after the actor has processed the message (for example to
measure the message processing time)?
In order to support such use cases, the :class:`Inner` return type has a method :class:`andAfter` which accepts
a code block that can perform some action after the message has been processed by subsequent inner interceptors.
The following is a simple interceptor that times message processing:
.. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#interceptor-after
.. note::
The :class:`andAfter` code blocks are run on return from handling the message with the next inner handler and
on the same thread. It is therefore safe for the :class:`andAfter` logic to close over the interceptor's state.
Using Receive Pipelines with Persistence
----------------------------------------

View file

@ -2,46 +2,113 @@ package akka.contrib.pattern
import akka.actor.Actor
object ReceivePipeline {
/**
* Result returned by an interceptor PF to determine what/whether to delegate to the next inner interceptor
*/
sealed trait Delegation
case class Inner(transformedMsg: Any) extends Delegation {
/**
* Add a block of code to be executed after the message (which may be further transformed and processed by
* inner interceptors) is handled by the actor's receive.
*
* The block of code will be executed before similar blocks in outer interceptors.
*/
def andAfter(after: Unit): Delegation = InnerAndAfter(transformedMsg, (_ after))
}
private[ReceivePipeline] case class InnerAndAfter(transformedMsg: Any, after: Unit Unit) extends Delegation
/**
* Interceptor return value that indicates that the message has been handled
* completely. The message will not be passed to inner interceptors
* (or to the decorated actor's receive).
*/
case object HandledCompletely extends Delegation
private def withDefault(interceptor: Interceptor): Interceptor = interceptor.orElse({ case msg Inner(msg) })
type Interceptor = PartialFunction[Any, Delegation]
private sealed trait HandlerResult
private case object Done extends HandlerResult
private case object Undefined extends HandlerResult
private type Handler = Any HandlerResult
}
/**
* Trait implementing Receive Pipeline Pattern. Mixin this trait
* for configuring a chain of interceptors to be applied around
* Actor's current behavior.
*/
trait ReceivePipeline extends Actor {
import ReceivePipeline._
private var pipeline: Vector[Receive Receive] = Vector.empty
private var aroundCache: Option[(Receive, Receive)] = None
private var pipeline: Vector[Interceptor] = Vector.empty
private var decoratorCache: Option[(Receive, Receive)] = None
/**
* Adds an inner interceptor, it will be applied lastly, near to Actor's original behavior
* @param around A Receive decorator. Gets by parameter the next Receive in the chain
* and has to return a new Receive with the decoration applied
* @param interceptor an interceptor
*/
def pipelineInner(around: Receive Receive): Unit = {
pipeline :+= around
aroundCache = None
def pipelineInner(interceptor: Interceptor): Unit = {
pipeline :+= withDefault(interceptor)
decoratorCache = None
}
/**
* Adds an outer interceptor, it will be applied firstly, far from Actor's original behavior
* @param around A Receive decorator. Gets by parameter the next Receive in the chain
* and has to return a new Receive with the decoration applied
* @param interceptor an interceptor
*/
def pipelineOuter(around: Receive Receive): Unit = {
pipeline +:= around
aroundCache = None
def pipelineOuter(interceptor: Interceptor): Unit = {
pipeline +:= withDefault(interceptor)
decoratorCache = None
}
private def combinedDecorator: Receive Receive = { receive
// So that reconstructed Receive PF is undefined only when the actor's
// receive is undefined for a transformed message that reaches it...
val innerReceiveHandler: Handler = {
case msg receive.lift(msg).map(_ Done).getOrElse(Undefined)
}
val zipped = pipeline.foldRight(innerReceiveHandler) { (outerInterceptor, innerHandler)
outerInterceptor.andThen {
case Inner(msg) innerHandler(msg)
case InnerAndAfter(msg, after) try innerHandler(msg) finally after()
case HandledCompletely Done
}
}
toReceive(zipped)
}
private def toReceive(handler: Handler) = new Receive {
def isDefinedAt(m: Any): Boolean = evaluate(m) != Undefined
def apply(m: Any): Unit = evaluate(m)
override def applyOrElse[A1 <: Any, B1 >: Unit](m: A1, default: A1 B1): B1 = {
val result = handler(m)
if (result == Undefined) default(m)
}
private def evaluate(m: Any) = handler(m)
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = {
val around = aroundCache match {
def withCachedDecoration(decorator: Receive Receive): Receive = decoratorCache match {
case Some((`receive`, cached)) cached
case _
val zipped = pipeline.foldRight[Receive](receive)((outer, inner) outer(inner).orElse[Any, Unit](inner))
aroundCache = Some((receive, zipped))
zipped
val decorated = decorator(receive)
decoratorCache = Some((receive, decorated))
decorated
}
super.aroundReceive(around, msg)
super.aroundReceive(withCachedDecoration(combinedDecorator), msg)
}
}

View file

@ -4,8 +4,12 @@ import akka.actor.{ Actor, Props }
import akka.persistence.{ PersistentActor }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import akka.testkit.TestProbe
import akka.actor.ActorLogging
object ReceivePipelineSpec {
import ReceivePipeline._
class ReplierActor extends Actor with ReceivePipeline {
def receive: Actor.Receive = becomeAndReply
@ -18,6 +22,20 @@ object ReceivePipelineSpec {
}
}
class IntReplierActor(max: Int) extends Actor with ReceivePipeline {
def receive: Actor.Receive = {
case m: Int if (m <= max) sender ! m
}
}
class TotallerActor extends Actor with ReceivePipeline {
var total = 0
def receive: Actor.Receive = {
case m: Int total += m
case "get" sender ! total
}
}
case class IntList(l: List[Int]) {
override def toString: String = s"IntList(${l.mkString(", ")})"
}
@ -25,34 +43,61 @@ object ReceivePipelineSpec {
trait ListBuilderInterceptor {
this: ReceivePipeline
pipelineOuter(inner
{
case n: Int inner(IntList((n until n + 3).toList))
})
pipelineOuter {
case n: Int Inner(IntList((n until n + 3).toList))
}
}
trait AdderInterceptor {
this: ReceivePipeline
pipelineInner(inner
{
case n: Int inner(n + 10)
case IntList(l) inner(IntList(l.map(_ + 10)))
case "explicitly ignored"
})
pipelineInner {
case n: Int Inner(n + 10)
case IntList(l) Inner(IntList(l.map(_ + 10)))
case "explicitly ignored" HandledCompletely
}
}
trait ToStringInterceptor {
this: ReceivePipeline
pipelineInner(inner
{
case i: Int inner(i.toString)
case IntList(l) inner(l.toString)
case other: Iterable[_] inner(other.toString)
})
pipelineInner {
case i: Int Inner(i.toString)
case IntList(l) Inner(l.toString)
case other: Iterable[_] Inner(other.toString)
}
}
trait OddDoublerInterceptor {
this: ReceivePipeline
pipelineInner {
case i: Int if (i % 2 != 0) Inner(i * 2)
}
}
trait EvenHalverInterceptor {
this: ReceivePipeline
pipelineInner {
case i: Int if (i % 2 == 0) Inner(i / 2)
}
}
trait Timer {
this: ReceivePipeline
def notifyDuration(duration: Long): Unit
pipelineInner {
case msg: Any
val start = 1L // = currentTimeMillis
Inner(msg).andAfter {
val end = 100L // = currentTimeMillis
notifyDuration(end - start)
}
}
}
}
class ReceivePipelineSpec extends AkkaSpec with ImplicitSender {
@ -160,29 +205,136 @@ class PersistentReceivePipelineSpec(config: Config) extends AkkaSpec(config) wit
replier ! 8
expectMsg("List(18, 19, 20)")
}
}
"allow messages explicitly passed on by interceptors to be handled by the actor" in {
val replier = system.actorOf(Props(
new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor))
// 6 -> 3 -> 6
replier ! 6
expectMsg(6)
}
"allow messages not handled by some interceptors to be handled by the actor" in {
val replier = system.actorOf(Props(
new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor))
// 8 -> 4 ( -> not handled by OddDoublerInterceptor)
replier ! 8
expectMsg(4)
}
"allow messages explicitly passed on by interceptors but not handled by the actor to be treated as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(
new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
// 22 -> 11 -> 22 but > 10 so not handled in main receive: falls back to unhandled implementation...
replier ! 22
probe.expectMsg(22)
}
"allow messages not handled by some interceptors or by the actor to be treated as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(
new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
// 11 ( -> not handled by EvenHalverInterceptor) -> 22 but > 10 so not handled in main receive:
// original message falls back to unhandled implementation...
replier ! 11
probe.expectMsg(11)
}
"allow messages not handled by any interceptors or by the actor to be treated as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(
new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
replier ! "hi there!"
probe.expectMsg("hi there!")
}
"not treat messages handled by the actor as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(
new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
replier ! 4
expectMsg(2)
probe.expectNoMsg(100.millis)
}
"continue to handle messages normally after unhandled messages" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(
new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
replier ! "hi there!"
replier ! 8
probe.expectMsg("hi there!")
expectMsg(4)
}
"call side-effecting receive code only once" in {
val totaller = system.actorOf(Props(
new TotallerActor with EvenHalverInterceptor with OddDoublerInterceptor))
totaller ! 8
totaller ! 6
totaller ! "get"
expectMsg(10)
}
"not cache the result of the same message" in {
val totaller = system.actorOf(Props(
new TotallerActor with EvenHalverInterceptor with OddDoublerInterceptor))
totaller ! 6
totaller ! 6
totaller ! "get"
expectMsg(12)
}
"run code in 'after' block" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val totaller = system.actorOf(Props(
new TotallerActor with Timer {
def notifyDuration(d: Long) = probeRef ! d
}))
totaller ! 6
totaller ! "get"
expectMsg(6)
probe.expectMsg(99)
}
}
}
// Just compiling code samples for documentation. Not intended to be tests.
object InterceptorSample extends App {
import Actor.Receive
//#interceptor
val printReceive: Receive = { case any println(any) }
val incrementDecorator: Receive Receive =
inner { case i: Int inner(i + 1) }
val incPrint = incrementDecorator(printReceive)
incPrint(10) // prints 11
//#interceptor
}
object InActorSample extends App {
import ReceivePipeline._
import akka.actor.ActorSystem
@ -194,9 +346,9 @@ object InActorSample extends App {
class PipelinedActor extends Actor with ReceivePipeline {
// Increment
pipelineInner(inner { case i: Int inner(i + 1) })
pipelineInner { case i: Int Inner(i + 1) }
// Double
pipelineInner(inner { case i: Int inner(i * 2) })
pipelineInner { case i: Int Inner(i * 2) }
def receive: Receive = { case any println(any) }
}
@ -210,9 +362,9 @@ object InActorSample extends App {
//#in-actor-outer
// Increment
pipelineInner(inner { case i: Int inner(i + 1) })
pipelineInner { case i: Int Inner(i + 1) }
// Double
pipelineOuter(inner { case i: Int inner(i * 2) })
pipelineOuter { case i: Int Inner(i * 2) }
// prints 11 = (5 * 2) + 1
//#in-actor-outer
@ -224,7 +376,32 @@ object InActorSample extends App {
}
object InterceptorSamples {
import ReceivePipeline._
//#interceptor-sample1
val incrementInterceptor: Interceptor = {
case i: Int Inner(i + 1)
}
//#interceptor-sample1
def logTimeTaken(time: Long) = ???
//#interceptor-sample2
val timerInterceptor: Interceptor = {
case e
val start = System.nanoTime
Inner(e).andAfter {
val end = System.nanoTime
logTimeTaken(end - start)
}
}
//#interceptor-sample2
}
object MixinSample extends App {
import ReceivePipeline._
import akka.actor.{ ActorSystem, Props }
@ -245,22 +422,20 @@ object MixinSample extends App {
trait I18nInterceptor {
this: ReceivePipeline
pipelineInner(
inner {
case m @ Message(_, I18nText(loc, key))
inner(m.copy(text = texts(s"${key}_$loc")))
})
pipelineInner {
case m @ Message(_, I18nText(loc, key))
Inner(m.copy(text = texts(s"${key}_$loc")))
}
}
trait AuditInterceptor {
this: ReceivePipeline
pipelineOuter(
inner {
case m @ Message(Some(author), text)
println(s"$author is about to say: $text")
inner(m)
})
pipelineOuter {
case m @ Message(Some(author), text)
println(s"$author is about to say: $text")
Inner(m)
}
}
//#mixin-interceptors
@ -289,6 +464,7 @@ object MixinSample extends App {
}
object UnhandledSample extends App {
import ReceivePipeline._
def isGranted(userId: Long) = true
@ -298,12 +474,35 @@ object UnhandledSample extends App {
trait PrivateInterceptor {
this: ReceivePipeline
pipelineInner(
inner {
case PrivateMessage(Some(userId), msg) if isGranted(userId) inner(msg)
case _
})
pipelineInner {
case PrivateMessage(Some(userId), msg)
if (isGranted(userId))
Inner(msg)
else
HandledCompletely
}
}
//#unhandled
}
object AfterSamples {
import ReceivePipeline._
//#interceptor-after
trait TimerInterceptor extends ActorLogging {
this: ReceivePipeline
def logTimeTaken(time: Long) = log.debug(s"Time taken: $time ns")
pipelineOuter {
case e
val start = System.nanoTime
Inner(e).andAfter {
val end = System.nanoTime
logTimeTaken(end - start)
}
}
}
//#interceptor-after
}