=per #15937 Allow PersistentActor to be used as a stackable modification

* PersistentActor correctly calls `super.around*` and allows to be mixed into e.g. ActorSubscriber
 * Tests have been added on PersistentActor and Processor to verify the stackable behavior of `around*` and `pre*` methods
 * Delegation in Processor has been simplified
This commit is contained in:
Michał Węgrzyn 2014-10-22 22:12:22 +02:00
parent ad35be7de1
commit 4f52158b0a
4 changed files with 246 additions and 28 deletions

View file

@ -264,18 +264,12 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
processorBatch = Vector.empty
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundPreStart(): Unit = {
try preStart() finally super.preStart()
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundPostStop(): Unit = {
try unstashAll(unstashFilterPredicate) finally postStop()
// calls `super.aroundPostStop` to allow Processor to be used as a stackable modification
try unstashAll(unstashFilterPredicate) finally super.aroundPostStop()
}
/**
@ -288,10 +282,10 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
unstashAll(unstashFilterPredicate)
} finally {
message match {
case Some(WriteMessageSuccess(m, _)) preRestartDefault(reason, Some(m))
case Some(LoopMessageSuccess(m, _)) preRestartDefault(reason, Some(m))
case Some(ReplayedMessage(m)) preRestartDefault(reason, Some(m))
case mo preRestartDefault(reason, None)
case Some(WriteMessageSuccess(m, _)) super.aroundPreRestart(reason, Some(m))
case Some(LoopMessageSuccess(m, _)) super.aroundPreRestart(reason, Some(m))
case Some(ReplayedMessage(m)) super.aroundPreRestart(reason, Some(m))
case mo super.aroundPreRestart(reason, None)
}
}
}
@ -310,20 +304,13 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
* a `Recover(lastSequenceNr)` message to `self` if `message` is defined, `Recover() otherwise`.
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
super.preRestart(reason, message)
message match {
case Some(_) self ! Recover(toSequenceNr = lastSequenceNr)
case None self ! Recover()
}
}
/**
* Calls [[preRestart]] and then `super.preRestart()`. If processor implementation classes want to
* opt out from stopping child actors, they should override this method and call [[preRestart]] only.
*/
def preRestartDefault(reason: Throwable, message: Option[Any]): Unit = {
try preRestart(reason, message) finally super.preRestart(reason, message)
}
override def unhandled(message: Any): Unit = {
message match {
case RecoveryCompleted // mute

View file

@ -27,7 +27,8 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
def aroundReceive(receive: Receive, message: Any): Unit
protected def process(receive: Receive, message: Any) =
receive.applyOrElse(message, unhandled)
// calls `Recovery.super.aroundReceive` to allow Processor to be used as a stackable modification
Recovery.super.aroundReceive(receive, message)
protected def processPersistent(receive: Receive, persistent: Persistent) =
withCurrentPersistent(persistent)(runReceive(receive))
@ -45,7 +46,8 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory {
* through withCurrentPersistent().
*/
private[persistence] def runReceive(receive: Receive)(msg: Persistent): Unit =
receive.applyOrElse(msg, unhandled)
// calls `Recovery.super.aroundReceive` to allow Processor to be used as a stackable modification
Recovery.super.aroundReceive(receive, msg)
/**
* INTERNAL API.

View file

@ -13,6 +13,7 @@ import akka.testkit.EventFilter
import akka.testkit.TestProbe
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Random
import scala.util.control.NoStackTrace
object PersistentActorSpec {
final case class Cmd(data: Any)
@ -403,6 +404,95 @@ object PersistentActorSpec {
}
}
class StackableTestPersistentActor(val probe: ActorRef) extends StackableTestPersistentActor.BaseActor with PersistentActor with StackableTestPersistentActor.MixinActor {
override def persistenceId: String = "StackableTestPersistentActor"
def receiveCommand = {
case "restart" throw new Exception("triggering restart") with NoStackTrace { override def toString = "Boom!" }
}
def receiveRecover = {
case _ ()
}
override def preStart(): Unit = {
probe ! "preStart"
super.preStart()
}
override def postStop(): Unit = {
probe ! "postStop"
super.postStop()
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
probe ! "preRestart"
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
probe ! "postRestart"
super.postRestart(reason)
}
}
object StackableTestPersistentActor {
trait BaseActor extends Actor { this: StackableTestPersistentActor
override protected[akka] def aroundPreStart() = {
probe ! "base aroundPreStart"
super.aroundPreStart()
}
override protected[akka] def aroundPostStop() = {
probe ! "base aroundPostStop"
super.aroundPostStop()
}
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = {
probe ! "base aroundPreRestart"
super.aroundPreRestart(reason, message)
}
override protected[akka] def aroundPostRestart(reason: Throwable) = {
probe ! "base aroundPostRestart"
super.aroundPostRestart(reason)
}
override protected[akka] def aroundReceive(receive: Receive, message: Any) = {
probe ! "base aroundReceive"
super.aroundReceive(receive, message)
}
}
trait MixinActor extends Actor { this: StackableTestPersistentActor
override protected[akka] def aroundPreStart() = {
probe ! "mixin aroundPreStart"
super.aroundPreStart()
}
override protected[akka] def aroundPostStop() = {
probe ! "mixin aroundPostStop"
super.aroundPostStop()
}
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = {
probe ! "mixin aroundPreRestart"
super.aroundPreRestart(reason, message)
}
override protected[akka] def aroundPostRestart(reason: Throwable) = {
probe ! "mixin aroundPostRestart"
super.aroundPostRestart(reason)
}
override protected[akka] def aroundReceive(receive: Receive, message: Any) = {
if (message == "restart" && recoveryFinished) { probe ! "mixin aroundReceive" }
super.aroundReceive(receive, message)
}
}
}
}
abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
@ -770,7 +860,35 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with
processor2 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42", RecoveryCompleted))
}
"be used as a stackable modification" in {
val persistentActor = system.actorOf(Props(classOf[StackableTestPersistentActor], testActor))
expectMsg("mixin aroundPreStart")
expectMsg("base aroundPreStart")
expectMsg("preStart")
persistentActor ! "restart"
expectMsg("mixin aroundReceive")
expectMsg("base aroundReceive")
expectMsg("mixin aroundPreRestart")
expectMsg("base aroundPreRestart")
expectMsg("preRestart")
expectMsg("postStop")
expectMsg("mixin aroundPostRestart")
expectMsg("base aroundPostRestart")
expectMsg("postRestart")
expectMsg("preStart")
persistentActor ! PoisonPill
expectMsg("mixin aroundPostStop")
expectMsg("base aroundPostStop")
expectMsg("postStop")
expectNoMsg(100.millis)
}
}
}
class LeveldbPersistentActorSpec extends PersistentActorSpec(PersistenceSpec.config("leveldb", "LeveldbPersistentActorSpec"))

View file

@ -4,13 +4,12 @@
package akka.persistence
import scala.concurrent.duration._
import scala.collection.immutable.Seq
import com.typesafe.config._
import akka.actor._
import akka.testkit._
import com.typesafe.config._
import scala.concurrent.duration._
import scala.collection.immutable.Seq
import scala.util.control.NoStackTrace
object ProcessorSpec {
class RecoverTestProcessor(name: String) extends NamedProcessor(name) {
@ -135,11 +134,95 @@ object ProcessorSpec {
case DeleteN(toSnr) deleteMessages(toSnr)
}
}
class StackableTestProcessor(val probe: ActorRef) extends StackableTestProcessor.BaseActor with Processor with StackableTestProcessor.MixinActor {
override def persistenceId: String = "StackableTestPersistentActor"
def receive = {
case "restart" throw new Exception("triggering restart") with NoStackTrace { override def toString = "Boom!" }
}
override def preStart(): Unit = {
probe ! "preStart"
super.preStart()
}
override def postStop(): Unit = {
probe ! "postStop"
super.postStop()
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
probe ! "preRestart"
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable): Unit = {
probe ! "postRestart"
super.postRestart(reason)
}
}
object StackableTestProcessor {
trait BaseActor extends Actor { this: StackableTestProcessor
override protected[akka] def aroundPreStart() = {
probe ! "base aroundPreStart"
super.aroundPreStart()
}
override protected[akka] def aroundPostStop() = {
probe ! "base aroundPostStop"
super.aroundPostStop()
}
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = {
probe ! "base aroundPreRestart"
super.aroundPreRestart(reason, message)
}
override protected[akka] def aroundPostRestart(reason: Throwable) = {
probe ! "base aroundPostRestart"
super.aroundPostRestart(reason)
}
override protected[akka] def aroundReceive(receive: Receive, message: Any) = {
probe ! "base aroundReceive"
super.aroundReceive(receive, message)
}
}
trait MixinActor extends Actor { this: StackableTestProcessor
override protected[akka] def aroundPreStart() = {
probe ! "mixin aroundPreStart"
super.aroundPreStart()
}
override protected[akka] def aroundPostStop() = {
probe ! "mixin aroundPostStop"
super.aroundPostStop()
}
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]) = {
probe ! "mixin aroundPreRestart"
super.aroundPreRestart(reason, message)
}
override protected[akka] def aroundPostRestart(reason: Throwable) = {
probe ! "mixin aroundPostRestart"
super.aroundPostRestart(reason)
}
override protected[akka] def aroundReceive(receive: Receive, message: Any) = {
if (message == "restart" && recoveryFinished) { probe ! "mixin aroundReceive" }
super.aroundReceive(receive, message)
}
}
}
}
abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import ProcessorSpec._
import JournalProtocol._
import ProcessorSpec._
override protected def beforeEach() {
super.beforeEach()
@ -355,6 +438,34 @@ abstract class ProcessorSpec(config: Config) extends AkkaSpec(config) with Persi
processor ! Persistent("b")
List(0, 1, 2, 3) foreach (expectMsg(_))
}
"be used as a stackable modification" in {
val processor = system.actorOf(Props(classOf[StackableTestProcessor], testActor))
expectMsg("mixin aroundPreStart")
expectMsg("base aroundPreStart")
expectMsg("preStart")
processor ! "restart"
expectMsg("mixin aroundReceive")
expectMsg("base aroundReceive")
expectMsg("mixin aroundPreRestart")
expectMsg("base aroundPreRestart")
expectMsg("preRestart")
expectMsg("postStop")
expectMsg("mixin aroundPostRestart")
expectMsg("base aroundPostRestart")
expectMsg("postRestart")
expectMsg("preStart")
processor ! PoisonPill
expectMsg("mixin aroundPostStop")
expectMsg("base aroundPostStop")
expectMsg("postStop")
expectNoMsg(100.millis)
}
}
}