diff --git a/akka-contrib/docs/receive-pipeline.rst b/akka-contrib/docs/receive-pipeline.rst index 1caa54fe85..7a96e75566 100644 --- a/akka-contrib/docs/receive-pipeline.rst +++ b/akka-contrib/docs/receive-pipeline.rst @@ -89,3 +89,18 @@ the messages with empty cases or just not calling the inner interceptor received .. includecode:: @contribSrc@/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala#unhandled +Using Receive Pipelines with Persistence +---------------------------------------- + +When using ``ReceivePipeline`` together with :ref:`PersistentActor` make sure to +mix-in the traits in the following order for them to properly co-operate:: + + class ExampleActor extends PersistentActor with ReceivePipeline { + /* ... */ + } + +The order is important here because of how both traits use internal "around" methods to implement their features, +and if mixed-in the other way around it would not work as expected. If you want to learn more about how exactly this +works, you can read up on Scala's `type linearization mechanism`_; + +.. _type linearization mechanism: http://ktoso.github.io/scala-types-of-types/#type-linearization-vs-the-diamond-problem \ No newline at end of file diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala index 597e260004..8d9a03ac75 100644 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala @@ -1,8 +1,9 @@ package akka.contrib.pattern -import akka.testkit.{ ImplicitSender, AkkaSpec } import akka.actor.{ Actor, Props } -import scala.concurrent.duration._ +import akka.persistence.{ PersistentActor } +import akka.testkit.{ AkkaSpec, ImplicitSender } +import com.typesafe.config.{ Config, ConfigFactory } object ReceivePipelineSpec { @@ -117,7 +118,48 @@ class ReceivePipelineSpec extends AkkaSpec with ImplicitSender { innerOuterReplier ! 6 expectMsg(IntList(List(16, 17, 18))) } + } +} + +object PersistentReceivePipelineSpec { + class PersistentReplierActor extends PersistentActor with ReceivePipeline { + override def persistenceId: String = "p-1" + + def becomeAndReply: Actor.Receive = { + case "become" ⇒ context.become(justReply) + case m ⇒ sender ! m + } + def justReply: Actor.Receive = { + case m ⇒ sender ! m + } + + override def receiveCommand: Receive = becomeAndReply + override def receiveRecover: Receive = { + case _ ⇒ // ... + } + } + +} +class PersistentReceivePipelineSpec(config: Config) extends AkkaSpec(config) with ImplicitSender { + import ReceivePipelineSpec._ + import PersistentReceivePipelineSpec._ + + def this() { + this(ConfigFactory.parseString( + s""" + |akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + |akka.persistence.journal.leveldb.dir = "target/journal-${getClass.getSimpleName}" + """.stripMargin)) + } + + "A PersistentActor with ReceivePipeline" must { + "support any number of interceptors" in { + val replier = system.actorOf(Props( + new PersistentReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor)) + replier ! 8 + expectMsg("List(18, 19, 20)") + } } }