From 380f9efee13e75cf7eac387f78cc2a2b1b3d3c76 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Aug 2015 10:43:19 +0200 Subject: [PATCH] =per #17896 Support stashing of replayed in PersistentView --- .../scala/akka/persistence/Persistent.scala | 10 ++++---- .../akka/persistence/PersistentView.scala | 4 ++++ .../akka/persistence/PersistentViewSpec.scala | 23 +++++++++++++++++++ 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index d0906c395f..6e8b03c402 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -41,12 +41,12 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per // only check that all persistenceIds are equal when there's more than one in the Seq if (payload match { - case l: List[PersistentRepr] => l.tail.nonEmpty - case v: Vector[PersistentRepr] => v.size > 1 - case _ => true // some other collection type, let's just check + case l: List[PersistentRepr] ⇒ l.tail.nonEmpty + case v: Vector[PersistentRepr] ⇒ v.size > 1 + case _ ⇒ true // some other collection type, let's just check }) require(payload.forall(_.persistenceId == payload.head.persistenceId), - "AtomicWrite must contain messages for the same persistenceId, " + - s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}") + "AtomicWrite must contain messages for the same persistenceId, " + + s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}") def persistenceId = payload.head.persistenceId def lowestSequenceNr = payload.head.sequenceNr // this assumes they're gapless; they should be (it is only our code creating AWs) diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index d3585aff02..31dc835327 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -361,6 +361,10 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory override def recoveryRunning: Boolean = false override def stateReceive(receive: Receive, message: Any): Unit = message match { + case ReplayedMessage(p) ⇒ + // we can get ReplayedMessage here if it was stashed by user during replay + // unwrap the payload + PersistentView.super.aroundReceive(receive, p.payload) case ScheduledUpdate(replayMax) ⇒ changeStateToReplayStarted(await = false, replayMax) case Update(awaitUpdate, replayMax) ⇒ changeStateToReplayStarted(awaitUpdate, replayMax) case other ⇒ PersistentView.super.aroundReceive(receive, other) diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala index 02c0d6b677..a9bf5d90a0 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentViewSpec.scala @@ -111,6 +111,21 @@ object PersistentViewSpec { } } + private class StashingPersistentView(name: String, probe: ActorRef) extends PersistentView { + override def persistenceId = name + override def viewId = name + "-view" + + def receive = { + case "other" ⇒ stash() + case "unstash" ⇒ + unstashAll() + context.become { + case msg ⇒ probe ! s"$msg-${lastSequenceNr}" + } + case msg ⇒ stash() + } + } + private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView { override val persistenceId: String = name override val viewId: String = name + "-view" @@ -321,6 +336,14 @@ abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config viewProbe.expectMsg("replicated-b-2") viewProbe.expectMsg("replicated-c-3") } + "support stash" in { + view = system.actorOf(Props(classOf[StashingPersistentView], name, viewProbe.ref)) + view ! "other" + view ! "unstash" + viewProbe.expectMsg("a-2") // note that the lastSequenceNumber is 2, since we have replayed b-2 + viewProbe.expectMsg("b-2") + viewProbe.expectMsg("other-2") + } } }