prohibit concurrent write request from same persistenceId #19694
- also use scalactic.ConversionCheckedTripleEquals - also remove usage of the setting `max-message-batch-size`
This commit is contained in:
parent
a6aee310ba
commit
6f06e60480
9 changed files with 323 additions and 73 deletions
|
|
@ -330,15 +330,13 @@ command, i.e. ``onPersistRejected`` is called with an exception (typically ``Uns
|
||||||
Batch writes
|
Batch writes
|
||||||
------------
|
------------
|
||||||
|
|
||||||
In order to optimize throughput a persistent actor internally batches events to be stored under high load before
|
In order to optimize throughput when using ``persistAsync``, a persistent actor
|
||||||
writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads
|
internally batches events to be stored under high load before writing them to
|
||||||
to a configurable maximum size (default is ``200``) under high load. When using ``persistAsync`` this increases
|
the journal (as a single batch). The batch size is dynamically determined by
|
||||||
the maximum throughput dramatically.
|
how many events are emitted during the time of a journal round-trip: after
|
||||||
|
sending a batch to the journal no further batch can be sent before confirmation
|
||||||
.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size
|
has been received that the previous batch has been written. Batch writes are never
|
||||||
|
timer-based which keeps latencies at a minimum.
|
||||||
A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed
|
|
||||||
writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
|
|
||||||
|
|
||||||
Message deletion
|
Message deletion
|
||||||
----------------
|
----------------
|
||||||
|
|
|
||||||
|
|
@ -317,15 +317,13 @@ command, i.e. ``onPersistRejected`` is called with an exception (typically ``Uns
|
||||||
Batch writes
|
Batch writes
|
||||||
------------
|
------------
|
||||||
|
|
||||||
In order to optimize throughput, a persistent actor internally batches events to be stored under high load before
|
In order to optimize throughput when using ``persistAsync``, a persistent actor
|
||||||
writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads
|
internally batches events to be stored under high load before writing them to
|
||||||
to a configurable maximum size (default is ``200``) under high load. When using ``persistAsync`` this increases
|
the journal (as a single batch). The batch size is dynamically determined by
|
||||||
the maximum throughput dramatically.
|
how many events are emitted during the time of a journal round-trip: after
|
||||||
|
sending a batch to the journal no further batch can be sent before confirmation
|
||||||
.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size
|
has been received that the previous batch has been written. Batch writes are never
|
||||||
|
timer-based which keeps latencies at a minimum.
|
||||||
A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed
|
|
||||||
writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum.
|
|
||||||
|
|
||||||
Message deletion
|
Message deletion
|
||||||
----------------
|
----------------
|
||||||
|
|
|
||||||
|
|
@ -105,7 +105,9 @@ akka.persistence {
|
||||||
# Dispatcher for message replay.
|
# Dispatcher for message replay.
|
||||||
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
|
replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher"
|
||||||
|
|
||||||
# Maximum size of a persistent message batch written to the journal.
|
# Removed: used to be the Maximum size of a persistent message batch written to the journal.
|
||||||
|
# Now this setting is without function, PersistentActor will write as many messages
|
||||||
|
# as it has accumulated since the last write.
|
||||||
max-message-batch-size = 200
|
max-message-batch-size = 200
|
||||||
|
|
||||||
circuit-breaker {
|
circuit-breaker {
|
||||||
|
|
|
||||||
|
|
@ -51,6 +51,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
private val writerUuid = UUID.randomUUID.toString
|
private val writerUuid = UUID.randomUUID.toString
|
||||||
|
|
||||||
private var journalBatch = Vector.empty[PersistentEnvelope]
|
private var journalBatch = Vector.empty[PersistentEnvelope]
|
||||||
|
// no longer used, but kept for binary compatibility
|
||||||
private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size")
|
private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size")
|
||||||
private var writeInProgress = false
|
private var writeInProgress = false
|
||||||
private var sequenceNr: Long = 0L
|
private var sequenceNr: Long = 0L
|
||||||
|
|
@ -232,7 +233,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
sequenceNr
|
sequenceNr
|
||||||
}
|
}
|
||||||
|
|
||||||
private def flushJournalBatch(): Unit = {
|
private def flushJournalBatch(): Unit =
|
||||||
|
if (!writeInProgress) {
|
||||||
journal ! WriteMessages(journalBatch, self, instanceId)
|
journal ! WriteMessages(journalBatch, self, instanceId)
|
||||||
journalBatch = Vector.empty
|
journalBatch = Vector.empty
|
||||||
writeInProgress = true
|
writeInProgress = true
|
||||||
|
|
@ -291,7 +293,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
|
def persist[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||||
pendingStashingPersistInvocations += 1
|
pendingStashingPersistInvocations += 1
|
||||||
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||||
eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch
|
eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId,
|
||||||
|
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -308,7 +311,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
pendingStashingPersistInvocations += 1
|
pendingStashingPersistInvocations += 1
|
||||||
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||||
}
|
}
|
||||||
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
|
eventBatch ::= AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId,
|
||||||
|
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -341,7 +345,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
*/
|
*/
|
||||||
def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = {
|
||||||
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||||
eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch
|
eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId,
|
||||||
|
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -357,7 +362,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
events.foreach { event ⇒
|
events.foreach { event ⇒
|
||||||
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit])
|
||||||
}
|
}
|
||||||
eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch
|
eventBatch ::= AtomicWrite(events.map(PersistentRepr(_, persistenceId = persistenceId,
|
||||||
|
sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())))
|
||||||
}
|
}
|
||||||
|
|
||||||
@deprecated("use persistAllAsync instead", "2.4")
|
@deprecated("use persistAllAsync instead", "2.4")
|
||||||
|
|
@ -501,42 +507,17 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
}
|
}
|
||||||
|
|
||||||
private def flushBatch() {
|
private def flushBatch() {
|
||||||
def addToBatch(p: PersistentEnvelope): Unit = p match {
|
if (eventBatch.nonEmpty) {
|
||||||
case a: AtomicWrite ⇒
|
journalBatch ++= eventBatch.reverse
|
||||||
journalBatch :+= a.copy(payload =
|
|
||||||
a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid)))
|
|
||||||
case r: PersistentEnvelope ⇒
|
|
||||||
journalBatch :+= r
|
|
||||||
}
|
|
||||||
|
|
||||||
def maxBatchSizeReached: Boolean =
|
|
||||||
journalBatch.size >= maxMessageBatchSize
|
|
||||||
|
|
||||||
// When using only `persistAsync` and `defer` max throughput is increased by using
|
|
||||||
// batching, but when using `persist` we want to use one atomic WriteMessages
|
|
||||||
// for the emitted events.
|
|
||||||
// Flush previously collected events, if any, separately from the `persist` batch
|
|
||||||
if (pendingStashingPersistInvocations > 0 && journalBatch.nonEmpty)
|
|
||||||
flushJournalBatch()
|
|
||||||
|
|
||||||
eventBatch.reverse.foreach { p ⇒
|
|
||||||
addToBatch(p)
|
|
||||||
if (!writeInProgress || maxBatchSizeReached) flushJournalBatch()
|
|
||||||
}
|
|
||||||
|
|
||||||
eventBatch = Nil
|
eventBatch = Nil
|
||||||
}
|
}
|
||||||
|
|
||||||
private def peekApplyHandler(payload: Any): Unit = {
|
if (journalBatch.nonEmpty) flushJournalBatch()
|
||||||
val batchSizeBeforeApply = eventBatch.size
|
}
|
||||||
try pendingInvocations.peek().handler(payload)
|
|
||||||
finally {
|
|
||||||
val batchSizeAfterApply = eventBatch.size
|
|
||||||
|
|
||||||
if (batchSizeAfterApply > batchSizeBeforeApply)
|
private def peekApplyHandler(payload: Any): Unit =
|
||||||
flushBatch()
|
try pendingInvocations.peek().handler(payload)
|
||||||
}
|
finally flushBatch()
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Common receive handler for processingCommands and persistingEvents
|
* Common receive handler for processingCommands and persistingEvents
|
||||||
|
|
@ -573,14 +554,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas
|
||||||
// while message is in flight, in that case we ignore the call to the handler
|
// while message is in flight, in that case we ignore the call to the handler
|
||||||
if (id == instanceId) {
|
if (id == instanceId) {
|
||||||
try {
|
try {
|
||||||
pendingInvocations.peek().handler(l)
|
peekApplyHandler(l)
|
||||||
onWriteMessageComplete(err = false)
|
onWriteMessageComplete(err = false)
|
||||||
} catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e }
|
} catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e }
|
||||||
}
|
}
|
||||||
case WriteMessagesSuccessful ⇒
|
case WriteMessagesSuccessful ⇒
|
||||||
if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch()
|
writeInProgress = false
|
||||||
|
if (journalBatch.nonEmpty) flushJournalBatch()
|
||||||
|
|
||||||
case WriteMessagesFailed(_) ⇒
|
case WriteMessagesFailed(_) ⇒
|
||||||
|
writeInProgress = false
|
||||||
() // it will be stopped by the first WriteMessageFailure message
|
() // it will be stopped by the first WriteMessageFailure message
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -26,6 +26,8 @@ trait AsyncRecovery {
|
||||||
*
|
*
|
||||||
* The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
|
* The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]]
|
||||||
* and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
|
* and what the user specified as recovery [[akka.persistence.Recovery]] parameter.
|
||||||
|
* This does imply that this call is always preceded by reading the highest sequence
|
||||||
|
* number for the given `persistenceId`.
|
||||||
*
|
*
|
||||||
* This call is NOT protected with a circuit-breaker because it may take long time
|
* This call is NOT protected with a circuit-breaker because it may take long time
|
||||||
* to replay all events. The plugin implementation itself must protect against
|
* to replay all events. The plugin implementation itself must protect against
|
||||||
|
|
@ -55,6 +57,10 @@ trait AsyncRecovery {
|
||||||
*
|
*
|
||||||
* This call is protected with a circuit-breaker.
|
* This call is protected with a circuit-breaker.
|
||||||
*
|
*
|
||||||
|
* Please also note that requests for the highest sequence number may be made concurrently
|
||||||
|
* to writes executing for the same `persistenceId`, in particular it is possible that
|
||||||
|
* a restarting actor tries to recover before its outstanding writes have completed.
|
||||||
|
*
|
||||||
* @param persistenceId persistent actor id.
|
* @param persistenceId persistent actor id.
|
||||||
* @param fromSequenceNr hint where to start searching for the highest sequence
|
* @param fromSequenceNr hint where to start searching for the highest sequence
|
||||||
* number. When a persistent actor is recovering this
|
* number. When a persistent actor is recovering this
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
{
|
{
|
||||||
case WriteMessages(messages, persistentActor, actorInstanceId) ⇒
|
case WriteMessages(messages, persistentActor, actorInstanceId) ⇒
|
||||||
val cctr = resequencerCounter
|
val cctr = resequencerCounter
|
||||||
resequencerCounter += messages.foldLeft(0)((acc, m) ⇒ acc + m.size) + 1
|
resequencerCounter += messages.foldLeft(1)((acc, m) ⇒ acc + m.size)
|
||||||
|
|
||||||
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
|
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
|
||||||
val prepared = Try(preparePersistentBatch(messages))
|
val prepared = Try(preparePersistentBatch(messages))
|
||||||
|
|
@ -215,11 +215,20 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
* work in asyncronous tasks it is alright that they complete the futures in any order,
|
* work in asyncronous tasks it is alright that they complete the futures in any order,
|
||||||
* but the actual writes for a specific persistenceId should be serialized to avoid
|
* but the actual writes for a specific persistenceId should be serialized to avoid
|
||||||
* issues such as events of a later write are visible to consumers (query side, or replay)
|
* issues such as events of a later write are visible to consumers (query side, or replay)
|
||||||
* before the events of an earlier write are visible. This can also be done with
|
* before the events of an earlier write are visible.
|
||||||
* consistent hashing if it is too fine grained to do it on the persistenceId level.
|
* A PersistentActor will not send a new WriteMessages request before the previous one
|
||||||
* Normally a `PersistentActor` will only have one outstanding write request to the journal but
|
* has been completed.
|
||||||
* it may emit several write requests when `persistAsync` is used and the max batch size
|
*
|
||||||
* is reached.
|
* Please note that the `sender` field of the contained PersistentRepr objects has been
|
||||||
|
* nulled out (i.e. set to `ActorRef.noSender`) in order to not use space in the journal
|
||||||
|
* for a sender reference that will likely be obsolete during replay.
|
||||||
|
*
|
||||||
|
* Please also note that requests for the highest sequence number may be made concurrently
|
||||||
|
* to this call executing for the same `persistenceId`, in particular it is possible that
|
||||||
|
* a restarting actor tries to recover before its outstanding writes have completed. In
|
||||||
|
* the latter case it is highly desirable to defer reading the highest sequence number
|
||||||
|
* until all outstanding writes have completed, otherwise the PersistentActor may reuse
|
||||||
|
* sequence numbers.
|
||||||
*
|
*
|
||||||
* This call is protected with a circuit-breaker.
|
* This call is protected with a circuit-breaker.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ import java.io.File
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.persistence.EndToEndEventAdapterSpec.NewA
|
import akka.persistence.EndToEndEventAdapterSpec.NewA
|
||||||
import akka.persistence.journal.{ EventSeq, EventAdapter }
|
import akka.persistence.journal.{ EventSeq, EventAdapter }
|
||||||
import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe }
|
import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe, EventFilter }
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import org.apache.commons.io.FileUtils
|
import org.apache.commons.io.FileUtils
|
||||||
import org.scalatest.{ WordSpecLike, Matchers, BeforeAndAfterAll }
|
import org.scalatest.{ WordSpecLike, Matchers, BeforeAndAfterAll }
|
||||||
|
|
@ -127,6 +127,7 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf
|
||||||
| }
|
| }
|
||||||
| }
|
| }
|
||||||
|}
|
|}
|
||||||
|
|akka.loggers = ["akka.testkit.TestEventListener"]
|
||||||
""".stripMargin)
|
""".stripMargin)
|
||||||
|
|
||||||
val newAdaptersConfig = ConfigFactory.parseString(
|
val newAdaptersConfig = ConfigFactory.parseString(
|
||||||
|
|
@ -226,14 +227,16 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf
|
||||||
.withoutPath(s"$journalPath.event-adapters.a")
|
.withoutPath(s"$journalPath.event-adapters.a")
|
||||||
.withoutPath(s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""")
|
.withoutPath(s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""")
|
||||||
|
|
||||||
intercept[IllegalArgumentException] {
|
|
||||||
withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒
|
withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒
|
||||||
|
EventFilter[ActorInitializationException](occurrences = 1, pattern = ".*undefined event-adapter.*") intercept {
|
||||||
|
intercept[IllegalArgumentException] {
|
||||||
Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String])
|
Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String])
|
||||||
}
|
|
||||||
}.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)")
|
}.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// needs persistence between actor systems, thus not running with the inmem journal
|
// needs persistence between actor systems, thus not running with the inmem journal
|
||||||
class LeveldbEndToEndEventAdapterSpec extends EndToEndEventAdapterSpec("leveldb", PersistenceSpec.config("leveldb", "LeveldbEndToEndEventAdapterSpec"))
|
class LeveldbEndToEndEventAdapterSpec extends EndToEndEventAdapterSpec("leveldb", PersistenceSpec.config("leveldb", "LeveldbEndToEndEventAdapterSpec"))
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,251 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.persistence
|
||||||
|
|
||||||
|
import akka.actor._
|
||||||
|
import akka.testkit._
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import akka.persistence.JournalProtocol._
|
||||||
|
|
||||||
|
object PersistentActorJournalProtocolSpec {
|
||||||
|
|
||||||
|
val config = ConfigFactory.parseString("""
|
||||||
|
puppet {
|
||||||
|
class = "akka.persistence.JournalPuppet"
|
||||||
|
max-message-batch-size = 10
|
||||||
|
}
|
||||||
|
akka.persistence.journal.plugin = puppet
|
||||||
|
akka.persistence.snapshot-store.plugin = "akka.persistence.no-snapshot-store"
|
||||||
|
""")
|
||||||
|
|
||||||
|
sealed trait Command
|
||||||
|
case class Persist(id: Int, msgs: Any*) extends Command
|
||||||
|
case class PersistAsync(id: Int, msgs: Any*) extends Command
|
||||||
|
case class Multi(cmd: Command*) extends Command
|
||||||
|
case class Echo(id: Int) extends Command
|
||||||
|
case class Fail(ex: Throwable) extends Command
|
||||||
|
case class Done(id: Int, sub: Int)
|
||||||
|
|
||||||
|
case class PreStart(name: String)
|
||||||
|
case class PreRestart(name: String)
|
||||||
|
case class PostRestart(name: String)
|
||||||
|
case class PostStop(name: String)
|
||||||
|
|
||||||
|
class A(monitor: ActorRef) extends PersistentActor {
|
||||||
|
|
||||||
|
def persistenceId = self.path.name
|
||||||
|
|
||||||
|
override def preStart(): Unit = monitor ! PreStart(persistenceId)
|
||||||
|
override def preRestart(reason: Throwable, msg: Option[Any]): Unit = monitor ! PreRestart(persistenceId)
|
||||||
|
override def postRestart(reason: Throwable): Unit = monitor ! PostRestart(persistenceId)
|
||||||
|
override def postStop(): Unit = monitor ! PostStop(persistenceId)
|
||||||
|
|
||||||
|
def receiveRecover = {
|
||||||
|
case x ⇒ monitor ! x
|
||||||
|
}
|
||||||
|
def receiveCommand = behavior orElse {
|
||||||
|
case m: Multi ⇒ m.cmd.foreach(behavior)
|
||||||
|
}
|
||||||
|
|
||||||
|
val behavior: Receive = {
|
||||||
|
case p: Persist ⇒ P(p)
|
||||||
|
case p: PersistAsync ⇒ PA(p)
|
||||||
|
case Echo(id) ⇒ sender() ! Done(id, 0)
|
||||||
|
case Fail(ex) ⇒ throw ex
|
||||||
|
}
|
||||||
|
val doNothing = (_: Any) ⇒ ()
|
||||||
|
|
||||||
|
def P(p: Persist): Unit = {
|
||||||
|
var sub = 0
|
||||||
|
persistAll(p.msgs.toList) { e ⇒
|
||||||
|
sender() ! Done(p.id, { sub += 1; sub })
|
||||||
|
behavior.applyOrElse(e, doNothing)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
def PA(p: PersistAsync): Unit = {
|
||||||
|
var sub = 0
|
||||||
|
persistAllAsync(p.msgs.toList) { e ⇒
|
||||||
|
sender() ! Done(p.id, { sub += 1; sub })
|
||||||
|
behavior.applyOrElse(e, doNothing)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object JournalPuppet extends ExtensionKey[JournalProbe]
|
||||||
|
class JournalProbe(implicit private val system: ExtendedActorSystem) extends Extension {
|
||||||
|
val probe = TestProbe()
|
||||||
|
val ref = probe.ref
|
||||||
|
}
|
||||||
|
|
||||||
|
class JournalPuppet extends Actor {
|
||||||
|
val ref = JournalPuppet(context.system).ref
|
||||||
|
def receive = {
|
||||||
|
case x ⇒ ref forward x
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
import PersistentActorJournalProtocolSpec._
|
||||||
|
|
||||||
|
class PersistentActorJournalProtocolSpec extends AkkaSpec(config) with ImplicitSender {
|
||||||
|
|
||||||
|
val journal = JournalPuppet(system).probe
|
||||||
|
|
||||||
|
case class Msgs(msg: Any*)
|
||||||
|
|
||||||
|
def expectWrite(subject: ActorRef, msgs: Msgs*): WriteMessages = {
|
||||||
|
val w = journal.expectMsgType[WriteMessages]
|
||||||
|
withClue(s"$w: ") {
|
||||||
|
w.persistentActor should ===(subject)
|
||||||
|
w.messages.size should ===(msgs.size)
|
||||||
|
w.messages.zip(msgs).foreach {
|
||||||
|
case (AtomicWrite(writes), msg) ⇒
|
||||||
|
writes.size should ===(msg.msg.size)
|
||||||
|
writes.zip(msg.msg).foreach {
|
||||||
|
case (PersistentRepr(evt, _), m) ⇒
|
||||||
|
evt should ===(m)
|
||||||
|
}
|
||||||
|
case x ⇒ fail(s"unexpected $x")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w
|
||||||
|
}
|
||||||
|
|
||||||
|
def confirm(w: WriteMessages): Unit = {
|
||||||
|
journal.send(w.persistentActor, WriteMessagesSuccessful)
|
||||||
|
w.messages.foreach {
|
||||||
|
case AtomicWrite(msgs) ⇒
|
||||||
|
msgs.foreach(msg ⇒
|
||||||
|
w.persistentActor.tell(WriteMessageSuccess(msg, w.actorInstanceId), msg.sender))
|
||||||
|
case NonPersistentRepr(msg, sender) ⇒ w.persistentActor.tell(msg, sender)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def startActor(name: String): ActorRef = {
|
||||||
|
val subject = system.actorOf(Props(new A(testActor)), name)
|
||||||
|
subject ! Echo(0)
|
||||||
|
expectMsg(PreStart(name))
|
||||||
|
journal.expectMsgType[ReplayMessages]
|
||||||
|
journal.reply(RecoverySuccess(0L))
|
||||||
|
expectMsg(RecoveryCompleted)
|
||||||
|
expectMsg(Done(0, 0))
|
||||||
|
subject
|
||||||
|
}
|
||||||
|
|
||||||
|
"A PersistentActor’s journal protocol" must {
|
||||||
|
|
||||||
|
"not send WriteMessages while a write is still outstanding" when {
|
||||||
|
|
||||||
|
"using simple persist()" in {
|
||||||
|
val subject = startActor("test-1")
|
||||||
|
subject ! Persist(1, "a-1")
|
||||||
|
val w1 = expectWrite(subject, Msgs("a-1"))
|
||||||
|
subject ! Persist(2, "a-2")
|
||||||
|
expectNoMsg(300.millis)
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
confirm(w1)
|
||||||
|
expectMsg(Done(1, 1))
|
||||||
|
val w2 = expectWrite(subject, Msgs("a-2"))
|
||||||
|
confirm(w2)
|
||||||
|
expectMsg(Done(2, 1))
|
||||||
|
subject ! PoisonPill
|
||||||
|
expectMsg(PostStop("test-1"))
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
"using nested persist()" in {
|
||||||
|
val subject = startActor("test-2")
|
||||||
|
subject ! Persist(1, Persist(2, "a-1"))
|
||||||
|
val w1 = expectWrite(subject, Msgs(Persist(2, "a-1")))
|
||||||
|
subject ! Persist(3, "a-2")
|
||||||
|
expectNoMsg(300.millis)
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
confirm(w1)
|
||||||
|
expectMsg(Done(1, 1))
|
||||||
|
val w2 = expectWrite(subject, Msgs("a-1"))
|
||||||
|
confirm(w2)
|
||||||
|
expectMsg(Done(2, 1))
|
||||||
|
val w3 = expectWrite(subject, Msgs("a-2"))
|
||||||
|
confirm(w3)
|
||||||
|
expectMsg(Done(3, 1))
|
||||||
|
subject ! PoisonPill
|
||||||
|
expectMsg(PostStop("test-2"))
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
"using nested multiple persist()" in {
|
||||||
|
val subject = startActor("test-3")
|
||||||
|
subject ! Multi(Persist(1, Persist(2, "a-1")), Persist(3, "a-2"))
|
||||||
|
val w1 = expectWrite(subject, Msgs(Persist(2, "a-1")), Msgs("a-2"))
|
||||||
|
confirm(w1)
|
||||||
|
expectMsg(Done(1, 1))
|
||||||
|
expectMsg(Done(3, 1))
|
||||||
|
val w2 = expectWrite(subject, Msgs("a-1"))
|
||||||
|
confirm(w2)
|
||||||
|
expectMsg(Done(2, 1))
|
||||||
|
subject ! PoisonPill
|
||||||
|
expectMsg(PostStop("test-3"))
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
"using large number of persist() calls" in {
|
||||||
|
val subject = startActor("test-4")
|
||||||
|
subject ! Multi(Vector.tabulate(30)(i ⇒ Persist(i, s"a-$i")): _*)
|
||||||
|
val w1 = expectWrite(subject, Vector.tabulate(30)(i ⇒ Msgs(s"a-$i")): _*)
|
||||||
|
confirm(w1)
|
||||||
|
for (i ← 0 until 30) expectMsg(Done(i, 1))
|
||||||
|
subject ! PoisonPill
|
||||||
|
expectMsg(PostStop("test-4"))
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
"using large number of persistAsync() calls" in {
|
||||||
|
def msgs(start: Int, end: Int) = (start until end).map(i ⇒ Msgs(s"a-$i-1", s"a-$i-2"))
|
||||||
|
def commands(start: Int, end: Int) = (start until end).map(i ⇒ PersistAsync(i, s"a-$i-1", s"a-$i-2"))
|
||||||
|
def expectDone(start: Int, end: Int) = for (i ← start until end; j ← 1 to 2) expectMsg(Done(i, j))
|
||||||
|
|
||||||
|
val subject = startActor("test-5")
|
||||||
|
subject ! PersistAsync(-1, "a" +: commands(20, 30): _*)
|
||||||
|
subject ! Multi(commands(0, 10): _*)
|
||||||
|
subject ! Multi(commands(10, 20): _*)
|
||||||
|
val w0 = expectWrite(subject, Msgs("a" +: commands(20, 30): _*))
|
||||||
|
journal.expectNoMsg(300.millis)
|
||||||
|
confirm(w0)
|
||||||
|
(1 to 11) foreach (x ⇒ expectMsg(Done(-1, x)))
|
||||||
|
val w1 = expectWrite(subject, msgs(0, 20): _*)
|
||||||
|
journal.expectNoMsg(300.millis)
|
||||||
|
confirm(w1)
|
||||||
|
expectDone(0, 20)
|
||||||
|
val w2 = expectWrite(subject, msgs(20, 30): _*)
|
||||||
|
confirm(w2)
|
||||||
|
expectDone(20, 30)
|
||||||
|
subject ! PoisonPill
|
||||||
|
expectMsg(PostStop("test-5"))
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
"the actor fails with queued events" in {
|
||||||
|
val subject = startActor("test-6")
|
||||||
|
subject ! PersistAsync(1, "a-1")
|
||||||
|
val w1 = expectWrite(subject, Msgs("a-1"))
|
||||||
|
subject ! PersistAsync(2, "a-2")
|
||||||
|
EventFilter[Exception](message = "K-BOOM!", occurrences = 1) intercept {
|
||||||
|
subject ! Fail(new Exception("K-BOOM!"))
|
||||||
|
expectMsg(PreRestart("test-6"))
|
||||||
|
expectMsg(PostRestart("test-6"))
|
||||||
|
journal.expectMsgType[ReplayMessages]
|
||||||
|
}
|
||||||
|
journal.reply(RecoverySuccess(0L))
|
||||||
|
expectMsg(RecoveryCompleted)
|
||||||
|
confirm(w1)
|
||||||
|
subject ! PoisonPill
|
||||||
|
expectMsg(PostStop("test-6"))
|
||||||
|
journal.msgAvailable should ===(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -15,7 +15,7 @@ import scala.concurrent.Future
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import akka.dispatch.Dispatchers
|
import akka.dispatch.Dispatchers
|
||||||
import akka.testkit.TestEvent._
|
import akka.testkit.TestEvent._
|
||||||
import org.scalautils.ConversionCheckedTripleEquals
|
import org.scalactic.ConversionCheckedTripleEquals
|
||||||
|
|
||||||
object AkkaSpec {
|
object AkkaSpec {
|
||||||
val testConf: Config = ConfigFactory.parseString("""
|
val testConf: Config = ConfigFactory.parseString("""
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue