From ec55ab2d22cd66109436f2a44e99eb00ad7478e0 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sat, 23 Aug 2014 18:50:25 +0200 Subject: [PATCH] =per #15730 fix context.become in PersistentView (cherry picked from commit 561bd00cf69b0070693db397fe0dbda824792335) --- .../scala/akka/persistence/PersistentView.scala | 6 ++---- .../main/scala/akka/persistence/Recovery.scala | 11 ++++++++++- .../akka/persistence/PersistentViewSpec.scala | 16 ++++++++++++++++ 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index a7afe009bc..92d83ee62c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -136,10 +136,8 @@ trait PersistentView extends Actor with Recovery { * INTERNAL API * WARNING: This implementation UNWRAPS PERSISTENT() before delivering to the receive block. */ - override private[persistence] def withCurrentPersistent(persistent: Persistent)(body: Persistent ⇒ Unit): Unit = - super.withCurrentPersistent(persistent) { p ⇒ - receive.applyOrElse(p.payload, unhandled) - } + override private[persistence] def runReceive(receive: Receive)(msg: Persistent): Unit = + receive.applyOrElse(msg.payload, unhandled) private val viewSettings = extension.settings.view diff --git a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala index c697b8594e..1e988ae80c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Recovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Recovery.scala @@ -30,7 +30,7 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { receive.applyOrElse(message, unhandled) protected def processPersistent(receive: Receive, persistent: Persistent) = - withCurrentPersistent(persistent)(receive.applyOrElse(_, unhandled)) + withCurrentPersistent(persistent)(runReceive(receive)) protected def recordFailure(cause: Throwable): Unit = { _recoveryFailureCause = cause @@ -38,6 +38,15 @@ trait Recovery extends Actor with Snapshotter with Stash with StashFactory { } } + /** + * INTERNAL API. + * + * This is used to deliver a persistent message to the actor’s behavior + * through withCurrentPersistent(). + */ + private[persistence] def runReceive(receive: Receive)(msg: Persistent): Unit = + receive.applyOrElse(msg, unhandled) + /** * INTERNAL API. * diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala index c1855bed78..61b8dfe878 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -100,6 +100,17 @@ object PersistentViewSpec { } } + private class BecomingPersistentView(name: String, probe: ActorRef) extends PersistentView { + override def persistenceId = name + override def viewId = name + "-view" + + def receive = Actor.emptyBehavior + + context.become { + case payload ⇒ probe ! s"replicated-${payload}-${lastSequenceNr}" + } + } + private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView { override val persistenceId: String = name override val viewId: String = name + "-view" @@ -281,6 +292,11 @@ abstract class PersistentViewSpec(config: Config) extends AkkaSpec(config) with replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) ⇒ } replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) ⇒ } } + "support context.become" in { + view = system.actorOf(Props(classOf[BecomingPersistentView], name, viewProbe.ref)) + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + } "check if an incoming message is persistent" in { persistentActor ! "c"