From 4f52158b0a789fd19f4cbc786b53d97ed1975cfc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20W=C4=99grzyn?= Date: Wed, 22 Oct 2014 22:12:22 +0200 Subject: [PATCH] =per #15937 Allow PersistentActor to be used as a stackable modification * PersistentActor correctly calls `super.around*` and allows to be mixed into e.g. ActorSubscriber * Tests have been added on PersistentActor and Processor to verify the stackable behavior of `around*` and `pre*` methods * Delegation in Processor has been simplified --- .../scala/akka/persistence/Processor.scala | 27 +--- .../scala/akka/persistence/Recovery.scala | 6 +- .../persistence/PersistentActorSpec.scala | 118 +++++++++++++++++ .../akka/persistence/ProcessorSpec.scala | 123 +++++++++++++++++- 4 files changed, 246 insertions(+), 28 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 53c1512713..c0e39a55d3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -264,18 +264,12 @@ private[akka] trait ProcessorImpl extends Actor with Recovery { processorBatch = Vector.empty } - /** - * INTERNAL API. - */ - override protected[akka] def aroundPreStart(): Unit = { - try preStart() finally super.preStart() - } - /** * INTERNAL API. */ override protected[akka] def aroundPostStop(): Unit = { - try unstashAll(unstashFilterPredicate) finally postStop() + // calls `super.aroundPostStop` to allow Processor to be used as a stackable modification + try unstashAll(unstashFilterPredicate) finally super.aroundPostStop() } /** @@ -288,10 +282,10 @@ private[akka] trait ProcessorImpl extends Actor with Recovery { unstashAll(unstashFilterPredicate) } finally { message match { - case Some(WriteMessageSuccess(m, _)) ⇒ preRestartDefault(reason, Some(m)) - case Some(LoopMessageSuccess(m, _)) ⇒ preRestartDefault(reason, Some(m)) - case Some(ReplayedMessage(m)) ⇒ preRestartDefault(reason, Some(m)) - case mo ⇒ preRestartDefault(reason, None) + case Some(WriteMessageSuccess(m, _)) ⇒ super.aroundPreRestart(reason, Some(m)) + case Some(LoopMessageSuccess(m, _)) ⇒ super.aroundPreRestart(reason, Some(m)) + case Some(ReplayedMessage(m)) ⇒ super.aroundPreRestart(reason, Some(m)) + case mo ⇒ super.aroundPreRestart(reason, None) } } } @@ -310,20 +304,13 @@ private[akka] trait ProcessorImpl extends Actor with Recovery { * a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`. */ override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + super.preRestart(reason, message) message match { case Some(_) ⇒ self ! Recover(toSequenceNr = lastSequenceNr) case None ⇒ self ! Recover() } } - /** - * Calls [[preRestart]] and then `super.preRestart()`. If processor implementation classes want to - * opt out from stopping child actors, they should override this method and call [[preRestart]] only. - */ - def preRestartDefault(reason: Throwable, message: Option[Any]): Unit = { - try preRestart(reason, message) finally super.preRestart(reason, message) - } - override def unhandled(message: Any): Unit = { message match { case RecoveryCompleted ⇒ // mute diff --git a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala index 1e988ae80c..93ea3053b0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala @@ -27,7 +27,8 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { def aroundReceive(receive: Receive, message: Any): Unit protected def process(receive: Receive, message: Any) = - receive.applyOrElse(message, unhandled) + // calls `Recovery.super.aroundReceive` to allow Processor to be used as a stackable modification + Recovery.super.aroundReceive(receive, message) protected def processPersistent(receive: Receive, persistent: Persistent) = withCurrentPersistent(persistent)(runReceive(receive)) @@ -45,7 +46,8 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { * through withCurrentPersistent(). */ private[persistence] def runReceive(receive: Receive)(msg: Persistent): Unit = - receive.applyOrElse(msg, unhandled) + // calls `Recovery.super.aroundReceive` to allow Processor to be used as a stackable modification + Recovery.super.aroundReceive(receive, msg) /** * INTERNAL API. diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala index d7a03227ee..1b0a7efc50 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorSpec.scala @@ -13,6 +13,7 @@ import akka.testkit.EventFilter import akka.testkit.TestProbe import java.util.concurrent.atomic.AtomicInteger import scala.util.Random +import scala.util.control.NoStackTrace object PersistentActorSpec { final case class Cmd(data: Any) @@ -403,6 +404,95 @@ object PersistentActorSpec { } } + class StackableTestPersistentActor(val probe: ActorRef) extends StackableTestPersistentActor.BaseActor with PersistentActor with StackableTestPersistentActor.MixinActor { + override def persistenceId: String = "StackableTestPersistentActor" + + def receiveCommand = { + case "restart" ⇒ throw new Exception("triggering restart") with NoStackTrace { override def toString = "Boom!" } + } + + def receiveRecover = { + case _ ⇒ () + } + + override def preStart(): Unit = { + probe ! "preStart" + super.preStart() + } + + override def postStop(): Unit = { + probe ! "postStop" + super.postStop() + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + probe ! "preRestart" + super.preRestart(reason, message) + } + + override def postRestart(reason: Throwable): Unit = { + probe ! "postRestart" + super.postRestart(reason) + } + + } + + object StackableTestPersistentActor { + trait BaseActor extends Actor { this: StackableTestPersistentActor ⇒ + override protected[akka] def aroundPreStart() = { + probe ! "base aroundPreStart" + super.aroundPreStart() + } + + override protected[akka] def aroundPostStop() = { + probe ! "base aroundPostStop" + super.aroundPostStop() + } + + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = { + probe ! "base aroundPreRestart" + super.aroundPreRestart(reason, message) + } + + override protected[akka] def aroundPostRestart(reason: Throwable) = { + probe ! "base aroundPostRestart" + super.aroundPostRestart(reason) + } + + override protected[akka] def aroundReceive(receive: Receive, message: Any) = { + probe ! "base aroundReceive" + super.aroundReceive(receive, message) + } + } + + trait MixinActor extends Actor { this: StackableTestPersistentActor ⇒ + override protected[akka] def aroundPreStart() = { + probe ! "mixin aroundPreStart" + super.aroundPreStart() + } + + override protected[akka] def aroundPostStop() = { + probe ! "mixin aroundPostStop" + super.aroundPostStop() + } + + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = { + probe ! "mixin aroundPreRestart" + super.aroundPreRestart(reason, message) + } + + override protected[akka] def aroundPostRestart(reason: Throwable) = { + probe ! "mixin aroundPostRestart" + super.aroundPostRestart(reason) + } + + override protected[akka] def aroundReceive(receive: Receive, message: Any) = { + if (message == "restart" && recoveryFinished) { probe ! "mixin aroundReceive" } + super.aroundReceive(receive, message) + } + } + } + } abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { @@ -770,7 +860,35 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with processor2 ! GetState expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42", RecoveryCompleted)) } + "be used as a stackable modification" in { + val persistentActor = system.actorOf(Props(classOf[StackableTestPersistentActor], testActor)) + expectMsg("mixin aroundPreStart") + expectMsg("base aroundPreStart") + expectMsg("preStart") + + persistentActor ! "restart" + expectMsg("mixin aroundReceive") + expectMsg("base aroundReceive") + + expectMsg("mixin aroundPreRestart") + expectMsg("base aroundPreRestart") + expectMsg("preRestart") + expectMsg("postStop") + + expectMsg("mixin aroundPostRestart") + expectMsg("base aroundPostRestart") + expectMsg("postRestart") + expectMsg("preStart") + + persistentActor ! PoisonPill + expectMsg("mixin aroundPostStop") + expectMsg("base aroundPostStop") + expectMsg("postStop") + + expectNoMsg(100.millis) + } } + } class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentActorSpec")) diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala index 4693781b24..fd5a48229f 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala @@ -4,13 +4,12 @@ package akka.persistence -import scala.concurrent.duration._ -import scala.collection.immutable.Seq - -import com.typesafe.config._ - import akka.actor._ import akka.testkit._ +import com.typesafe.config._ +import scala.concurrent.duration._ +import scala.collection.immutable.Seq +import scala.util.control.NoStackTrace object ProcessorSpec { class RecoverTestProcessor(name: String) extends NamedProcessor(name) { @@ -135,11 +134,95 @@ object ProcessorSpec { case DeleteN(toSnr) ⇒ deleteMessages(toSnr) } } + + class StackableTestProcessor(val probe: ActorRef) extends StackableTestProcessor.BaseActor with Processor with StackableTestProcessor.MixinActor { + override def persistenceId: String = "StackableTestPersistentActor" + + def receive = { + case "restart" ⇒ throw new Exception("triggering restart") with NoStackTrace { override def toString = "Boom!" } + } + + override def preStart(): Unit = { + probe ! "preStart" + super.preStart() + } + + override def postStop(): Unit = { + probe ! "postStop" + super.postStop() + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + probe ! "preRestart" + super.preRestart(reason, message) + } + + override def postRestart(reason: Throwable): Unit = { + probe ! "postRestart" + super.postRestart(reason) + } + } + + object StackableTestProcessor { + trait BaseActor extends Actor { this: StackableTestProcessor ⇒ + override protected[akka] def aroundPreStart() = { + probe ! "base aroundPreStart" + super.aroundPreStart() + } + + override protected[akka] def aroundPostStop() = { + probe ! "base aroundPostStop" + super.aroundPostStop() + } + + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = { + probe ! "base aroundPreRestart" + super.aroundPreRestart(reason, message) + } + + override protected[akka] def aroundPostRestart(reason: Throwable) = { + probe ! "base aroundPostRestart" + super.aroundPostRestart(reason) + } + + override protected[akka] def aroundReceive(receive: Receive, message: Any) = { + probe ! "base aroundReceive" + super.aroundReceive(receive, message) + } + } + + trait MixinActor extends Actor { this: StackableTestProcessor ⇒ + override protected[akka] def aroundPreStart() = { + probe ! "mixin aroundPreStart" + super.aroundPreStart() + } + + override protected[akka] def aroundPostStop() = { + probe ! "mixin aroundPostStop" + super.aroundPostStop() + } + + override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = { + probe ! "mixin aroundPreRestart" + super.aroundPreRestart(reason, message) + } + + override protected[akka] def aroundPostRestart(reason: Throwable) = { + probe ! "mixin aroundPostRestart" + super.aroundPostRestart(reason) + } + + override protected[akka] def aroundReceive(receive: Receive, message: Any) = { + if (message == "restart" && recoveryFinished) { probe ! "mixin aroundReceive" } + super.aroundReceive(receive, message) + } + } + } } abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { - import ProcessorSpec._ import JournalProtocol._ + import ProcessorSpec._ override protected def beforeEach() { super.beforeEach() @@ -355,6 +438,34 @@ abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with Persi processor ! Persistent("b") List(0, 1, 2, 3) foreach (expectMsg(_)) } + + "be used as a stackable modification" in { + val processor = system.actorOf(Props(classOf[StackableTestProcessor], testActor)) + expectMsg("mixin aroundPreStart") + expectMsg("base aroundPreStart") + expectMsg("preStart") + + processor ! "restart" + expectMsg("mixin aroundReceive") + expectMsg("base aroundReceive") + + expectMsg("mixin aroundPreRestart") + expectMsg("base aroundPreRestart") + expectMsg("preRestart") + expectMsg("postStop") + + expectMsg("mixin aroundPostRestart") + expectMsg("base aroundPostRestart") + expectMsg("postRestart") + expectMsg("preStart") + + processor ! PoisonPill + expectMsg("mixin aroundPostStop") + expectMsg("base aroundPostStop") + expectMsg("postStop") + + expectNoMsg(100.millis) + } } }