Merge pull request #1959 from krasserm/wip-3844-enforce-replay-max-krasserm

=per #3844 Views now enforce autoUpdateReplayMax
This commit is contained in:
Patrik Nordwall 2014-01-24 02:45:19 -08:00
commit 787c7ca439
5 changed files with 46 additions and 6 deletions

View file

@ -129,7 +129,7 @@ trait View extends Actor with Recovery {
*/
private def onReplayComplete(await: Boolean): Unit = {
_currentState = idle
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false)))
if (autoUpdate) schedule = Some(context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Update(await = false, autoUpdateReplayMax)))
if (await) receiverStash.unstashAll()
}

View file

@ -42,7 +42,7 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery {
resequence(WriteMessageFailure(_, e))
}
resequencerCounter += persistentBatch.length + 1
case ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted)
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted)
// Send replayed messages and replay result to processor directly. No need
// to resequence replayed messages relative to written and looped messages.
asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p
@ -51,7 +51,9 @@ trait AsyncWriteJournal extends Actor with AsyncRecovery {
case _ ReplayMessagesSuccess
} recover {
case e ReplayMessagesFailure(e)
} pipeTo (processor)
} pipeTo (processor) onSuccess {
case _ if publish context.system.eventStream.publish(r)
}
case ReadHighestSequenceNr(fromSequenceNr, processorId, processor)
// Send read highest sequence number to processor directly. No need
// to resequence the result relative to written and looped messages.

View file

@ -33,14 +33,16 @@ trait SyncWriteJournal extends Actor with AsyncRecovery {
persistentBatch.foreach(p processor tell (WriteMessageFailure(p, e), p.sender))
throw e
}
case ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted)
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, processorId, processor, replayDeleted)
asyncReplayMessages(processorId, fromSequenceNr, toSequenceNr, max) { p
if (!p.deleted || replayDeleted) processor.tell(ReplayedMessage(p), p.sender)
} map {
case _ ReplayMessagesSuccess
} recover {
case e ReplayMessagesFailure(e)
} pipeTo (processor)
} pipeTo (processor) onSuccess {
case _ if publish context.system.eventStream.publish(r)
}
case ReadHighestSequenceNr(fromSequenceNr, processorId, processor)
asyncReadHighestSequenceNr(processorId, fromSequenceNr).map {
highest ReadHighestSequenceNrSuccess(highest)

View file

@ -15,7 +15,6 @@ object PerformanceSpec {
"""
akka.persistence.performance.cycles.warmup = 300
akka.persistence.performance.cycles.load = 1000
akka.persistence.publish-confirmations = on
"""
case object StartMeasure

View file

@ -9,6 +9,7 @@ import com.typesafe.config.Config
import akka.actor._
import akka.testkit._
import akka.persistence.JournalProtocol.ReplayMessages
object ViewSpec {
class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
@ -71,6 +72,17 @@ object ViewSpec {
}
}
class ActiveTestView(name: String, probe: ActorRef) extends View {
override val processorId: String = name
override def autoUpdateInterval: FiniteDuration = 50.millis
override def autoUpdateReplayMax: Long = 2
def receive = {
case Persistent(payload, sequenceNr)
probe ! s"replicated-${payload}-${sequenceNr}"
}
}
class TestDestination(probe: ActorRef) extends Actor {
def receive = {
case cp @ ConfirmablePersistent(payload, sequenceNr, _)
@ -154,6 +166,9 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
def awaitConfirmation(probe: TestProbe): Unit =
probe.expectMsgType[Delivered]
def subscribeToReplay(probe: TestProbe): Unit =
system.eventStream.subscribe(probe.ref, classOf[ReplayMessages])
"A view" must {
"receive past updates from a processor" in {
view = system.actorOf(Props(classOf[TestView], name, viewProbe.ref))
@ -238,6 +253,28 @@ abstract class ViewSpec(config: Config) extends AkkaSpec(config) with Persistenc
view ! "get"
viewProbe.expectMsg("replicated-f-6")
}
"run size-limited updates automatically" in {
val replayProbe = TestProbe()
processor ! Persistent("c")
processor ! Persistent("d")
processorProbe.expectMsg("c-3")
processorProbe.expectMsg("d-4")
subscribeToReplay(replayProbe)
view = system.actorOf(Props(classOf[ActiveTestView], name, viewProbe.ref))
viewProbe.expectMsg("replicated-a-1")
viewProbe.expectMsg("replicated-b-2")
viewProbe.expectMsg("replicated-c-3")
viewProbe.expectMsg("replicated-d-4")
replayProbe.expectMsgPF() { case ReplayMessages(1L, _, 2L, _, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(3L, _, 2L, _, _, _) }
replayProbe.expectMsgPF() { case ReplayMessages(5L, _, 2L, _, _, _) }
}
}
"A view" can {