diff --git a/akka-persistence/src/main/scala/akka/persistence/View.scala b/akka-persistence/src/main/scala/akka/persistence/View.scala index 00d544d6e3..29b8f05469 100644 --- a/akka-persistence/src/main/scala/akka/persistence/View.scala +++ b/akka-persistence/src/main/scala/akka/persistence/View.scala @@ -129,7 +129,7 @@ trait View extends Actor with Recovery { */ private def onReplayComplete(await: Boolean): Unit = { _currentState = idle - if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false))) + if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax))) if (await) receiverStash.unstashAll() } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index f15632d689..7b1c19f4d2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -42,7 +42,7 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery { resequence(WriteMessageFailure(_, e)) } resequencerCounter += persistentBatch.length + 1 - case ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ // Send replayed messages and replay result to processor directly. No need // to resequence replayed messages relative to written and looped messages. asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p ⇒ @@ -51,7 +51,9 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery { case _ ⇒ ReplayMessagesSuccess } recover { case e ⇒ ReplayMessagesFailure(e) - } pipeTo (processor) + } pipeTo (processor) onSuccess { + case _ if publish ⇒ context.system.eventStream.publish(r) + } case ReadHighestSequenceNr(fromSequenceNr, processorId, processor) ⇒ // Send read highest sequence number to processor directly. No need // to resequence the result relative to written and looped messages. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 4f330314b5..fa6819e346 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -33,14 +33,16 @@ trait SyncWriteJournal extends Actor with AsyncRecovery { persistentBatch.foreach(p ⇒ processor tell (WriteMessageFailure(p, e), p.sender)) throw e } - case ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted) ⇒ asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p ⇒ if (!p.deleted || replayDeleted) processor.tell(ReplayedMessage(p), p.sender) } map { case _ ⇒ ReplayMessagesSuccess } recover { case e ⇒ ReplayMessagesFailure(e) - } pipeTo (processor) + } pipeTo (processor) onSuccess { + case _ if publish ⇒ context.system.eventStream.publish(r) + } case ReadHighestSequenceNr(fromSequenceNr, processorId, processor) ⇒ asyncReadHighestSequenceNr(processorId, fromSequenceNr).map { highest ⇒ ReadHighestSequenceNrSuccess(highest) diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index f62dacb268..fd0f22fa4f 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -15,7 +15,6 @@ object PerformanceSpec { """ akka.persistence.performance.cycles.warmup = 300 akka.persistence.performance.cycles.load = 1000 - akka.persistence.publish-confirmations = on """ case object StartMeasure diff --git a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala index 82b625db0c..9b24f12d34 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ViewSpec.scala @@ -9,6 +9,7 @@ import com.typesafe.config.Config import akka.actor._ import akka.testkit._ +import akka.persistence.JournalProtocol.ReplayMessages object ViewSpec { class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) { @@ -71,6 +72,17 @@ object ViewSpec { } } + class ActiveTestView(name: String, probe: ActorRef) extends View { + override val processorId: String = name + override def autoUpdateInterval: FiniteDuration = 50.millis + override def autoUpdateReplayMax: Long = 2 + + def receive = { + case Persistent(payload, sequenceNr) ⇒ + probe ! s"replicated-${payload}-${sequenceNr}" + } + } + class TestDestination(probe: ActorRef) extends Actor { def receive = { case cp @ ConfirmablePersistent(payload, sequenceNr, _) ⇒ @@ -154,6 +166,9 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc def awaitConfirmation(probe: TestProbe): Unit = probe.expectMsgType[Delivered] + def subscribeToReplay(probe: TestProbe): Unit = + system.eventStream.subscribe(probe.ref, classOf[ReplayMessages]) + "A view" must { "receive past updates from a processor" in { view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref)) @@ -238,6 +253,28 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc view ! "get" viewProbe.expectMsg("replicated-f-6") } + "run size-limited updates automatically" in { + val replayProbe = TestProbe() + + processor ! Persistent("c") + processor ! Persistent("d") + + processorProbe.expectMsg("c-3") + processorProbe.expectMsg("d-4") + + subscribeToReplay(replayProbe) + + view = system.actorOf(Props(classOf[ActiveTestView], name, viewProbe.ref)) + + viewProbe.expectMsg("replicated-a-1") + viewProbe.expectMsg("replicated-b-2") + viewProbe.expectMsg("replicated-c-3") + viewProbe.expectMsg("replicated-d-4") + + replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) ⇒ } + replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) ⇒ } + replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) ⇒ } + } } "A view" can {