=per #17896 Support stashing of replayed in PersistentView

This commit is contained in:
Patrik Nordwall 2015-08-11 10:43:19 +02:00
parent f4abf80f50
commit 380f9efee1
3 changed files with 32 additions and 5 deletions

View file

@ -41,12 +41,12 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per
// only check that all persistenceIds are equal when there's more than one in the Seq
if (payload match {
case l: List[PersistentRepr] => l.tail.nonEmpty
case v: Vector[PersistentRepr] => v.size > 1
case _ => true // some other collection type, let's just check
case l: List[PersistentRepr] l.tail.nonEmpty
case v: Vector[PersistentRepr] v.size > 1
case _ true // some other collection type, let's just check
}) require(payload.forall(_.persistenceId == payload.head.persistenceId),
"AtomicWrite must contain messages for the same persistenceId, " +
s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}")
"AtomicWrite must contain messages for the same persistenceId, " +
s"yet different persistenceIds found: ${payload.map(_.persistenceId).toSet}")
def persistenceId = payload.head.persistenceId
def lowestSequenceNr = payload.head.sequenceNr // this assumes they're gapless; they should be (it is only our code creating AWs)

View file

@ -361,6 +361,10 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory
override def recoveryRunning: Boolean = false
override def stateReceive(receive: Receive, message: Any): Unit = message match {
case ReplayedMessage(p)
// we can get ReplayedMessage here if it was stashed by user during replay
// unwrap the payload
PersistentView.super.aroundReceive(receive, p.payload)
case ScheduledUpdate(replayMax) changeStateToReplayStarted(await = false, replayMax)
case Update(awaitUpdate, replayMax) changeStateToReplayStarted(awaitUpdate, replayMax)
case other PersistentView.super.aroundReceive(receive, other)

View file

@ -111,6 +111,21 @@ object PersistentViewSpec {
}
}
private class StashingPersistentView(name: String, probe: ActorRef) extends PersistentView {
override def persistenceId = name
override def viewId = name + "-view"
def receive = {
case "other" stash()
case "unstash"
unstashAll()
context.become {
case msg probe ! s"$msg-${lastSequenceNr}"
}
case msg stash()
}
}
private class PersistentOrNotTestPersistentView(name: String, probe: ActorRef) extends PersistentView {
override val persistenceId: String = name
override val viewId: String = name + "-view"
@ -321,6 +336,14 @@ abstract class PersistentViewSpec(config: Config) extends PersistenceSpec(config
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
}
"support stash" in {
view = system.actorOf(Props(classOf[StashingPersistentView], name, viewProbe.ref))
view ! "other"
view ! "unstash"
viewProbe.expectMsg("a-2") // note that the lastSequenceNumber is 2, since we have replayed b-2
viewProbe.expectMsg("b-2")
viewProbe.expectMsg("other-2")
}
}
}