diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index eeda6b173e..5e7ca7a41a 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -330,15 +330,13 @@ command, i.e. ``onPersistRejected`` is called with an exception (typically ``Uns Batch writes ------------ -In order to optimize throughput a persistent actor internally batches events to be stored under high load before -writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads -to a configurable maximum size (default is ``200``) under high load. When using ``persistAsync`` this increases -the maximum throughput dramatically. - -.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size - -A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed -writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. +In order to optimize throughput when using ``persistAsync``, a persistent actor +internally batches events to be stored under high load before writing them to +the journal (as a single batch). The batch size is dynamically determined by +how many events are emitted during the time of a journal round-trip: after +sending a batch to the journal no further batch can be sent before confirmation +has been received that the previous batch has been written. Batch writes are never +timer-based which keeps latencies at a minimum. Message deletion ---------------- diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 3551a34bc8..790c1f6549 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -317,15 +317,13 @@ command, i.e. ``onPersistRejected`` is called with an exception (typically ``Uns Batch writes ------------ -In order to optimize throughput, a persistent actor internally batches events to be stored under high load before -writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads -to a configurable maximum size (default is ``200``) under high load. When using ``persistAsync`` this increases -the maximum throughput dramatically. - -.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#max-message-batch-size - -A new batch write is triggered by a persistent actor as soon as a batch reaches the maximum size or if the journal completed -writing the previous batch. Batch writes are never timer-based which keeps latencies at a minimum. +In order to optimize throughput when using ``persistAsync``, a persistent actor +internally batches events to be stored under high load before writing them to +the journal (as a single batch). The batch size is dynamically determined by +how many events are emitted during the time of a journal round-trip: after +sending a batch to the journal no further batch can be sent before confirmation +has been received that the previous batch has been written. Batch writes are never +timer-based which keeps latencies at a minimum. Message deletion ---------------- diff --git a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala index 6792d9d7ef..87b4eaf49d 100644 --- a/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala +++ b/akka-persistence-tck/src/main/scala/akka/persistence/journal/JournalSpec.scala @@ -46,9 +46,17 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) with MayVe super.beforeEach() senderProbe = TestProbe() receiverProbe = TestProbe() + preparePersistenceId(pid) writeMessages(1, 5, pid, senderProbe.ref, writerUuid) } + /** + * Overridable hook that is called before populating the journal for the next + * test case. `pid` is the `persistenceId` that will be used in the test. + * This method may be needed to clean pre-existing events from the log. + */ + def preparePersistenceId(pid: String): Unit = () + /** * Implementation may override and return false if it does not * support atomic writes of several events, as emitted by `persistAll`. diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 07b674e475..571c3239a2 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -105,7 +105,9 @@ akka.persistence { # Dispatcher for message replay. replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" - # Maximum size of a persistent message batch written to the journal. + # Removed: used to be the Maximum size of a persistent message batch written to the journal. + # Now this setting is without function, PersistentActor will write as many messages + # as it has accumulated since the last write. max-message-batch-size = 200 circuit-breaker { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index 754f7ef578..669e1729f9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -51,6 +51,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas private val writerUuid = UUID.randomUUID.toString private var journalBatch = Vector.empty[PersistentEnvelope] + // no longer used, but kept for binary compatibility private val maxMessageBatchSize = extension.journalConfigFor(journalPluginId).getInt("max-message-batch-size") private var writeInProgress = false private var sequenceNr: Long = 0L @@ -232,11 +233,12 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas sequenceNr } - private def flushJournalBatch(): Unit = { - journal ! WriteMessages(journalBatch, self, instanceId) - journalBatch = Vector.empty - writeInProgress = true - } + private def flushJournalBatch(): Unit = + if (!writeInProgress) { + journal ! WriteMessages(journalBatch, self, instanceId) + journalBatch = Vector.empty + writeInProgress = true + } private def log: LoggingAdapter = Logging(context.system, this) @@ -291,7 +293,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas def persist[A](event: A)(handler: A ⇒ Unit): Unit = { pendingStashingPersistInvocations += 1 pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch + eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId, + sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())) } /** @@ -308,7 +311,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas pendingStashingPersistInvocations += 1 pendingInvocations addLast StashingHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) } - eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch + eventBatch ::= AtomicWrite(events.map(PersistentRepr.apply(_, persistenceId = persistenceId, + sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))) } } @@ -341,7 +345,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas */ def persistAsync[A](event: A)(handler: A ⇒ Unit): Unit = { pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) - eventBatch = AtomicWrite(PersistentRepr(event, sender = sender())) :: eventBatch + eventBatch ::= AtomicWrite(PersistentRepr(event, persistenceId = persistenceId, + sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender())) } /** @@ -357,7 +362,8 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas events.foreach { event ⇒ pendingInvocations addLast AsyncHandlerInvocation(event, handler.asInstanceOf[Any ⇒ Unit]) } - eventBatch = AtomicWrite(events.map(PersistentRepr.apply(_, sender = sender()))) :: eventBatch + eventBatch ::= AtomicWrite(events.map(PersistentRepr(_, persistenceId = persistenceId, + sequenceNr = nextSequenceNr(), writerUuid = writerUuid, sender = sender()))) } @deprecated("use persistAllAsync instead", "2.4") @@ -501,42 +507,17 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } private def flushBatch() { - def addToBatch(p: PersistentEnvelope): Unit = p match { - case a: AtomicWrite ⇒ - journalBatch :+= a.copy(payload = - a.payload.map(_.update(persistenceId = persistenceId, sequenceNr = nextSequenceNr(), writerUuid = writerUuid))) - case r: PersistentEnvelope ⇒ - journalBatch :+= r + if (eventBatch.nonEmpty) { + journalBatch ++= eventBatch.reverse + eventBatch = Nil } - def maxBatchSizeReached: Boolean = - journalBatch.size >= maxMessageBatchSize - - // When using only `persistAsync` and `defer` max throughput is increased by using - // batching, but when using `persist` we want to use one atomic WriteMessages - // for the emitted events. - // Flush previously collected events, if any, separately from the `persist` batch - if (pendingStashingPersistInvocations > 0 && journalBatch.nonEmpty) - flushJournalBatch() - - eventBatch.reverse.foreach { p ⇒ - addToBatch(p) - if (!writeInProgress || maxBatchSizeReached) flushJournalBatch() - } - - eventBatch = Nil + if (journalBatch.nonEmpty) flushJournalBatch() } - private def peekApplyHandler(payload: Any): Unit = { - val batchSizeBeforeApply = eventBatch.size + private def peekApplyHandler(payload: Any): Unit = try pendingInvocations.peek().handler(payload) - finally { - val batchSizeAfterApply = eventBatch.size - - if (batchSizeAfterApply > batchSizeBeforeApply) - flushBatch() - } - } + finally flushBatch() /** * Common receive handler for processingCommands and persistingEvents @@ -573,14 +554,16 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas // while message is in flight, in that case we ignore the call to the handler if (id == instanceId) { try { - pendingInvocations.peek().handler(l) + peekApplyHandler(l) onWriteMessageComplete(err = false) } catch { case NonFatal(e) ⇒ onWriteMessageComplete(err = true); throw e } } case WriteMessagesSuccessful ⇒ - if (journalBatch.isEmpty) writeInProgress = false else flushJournalBatch() + writeInProgress = false + if (journalBatch.nonEmpty) flushJournalBatch() case WriteMessagesFailed(_) ⇒ + writeInProgress = false () // it will be stopped by the first WriteMessageFailure message } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala index 9648f7b6e5..3df29021c5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncRecovery.scala @@ -26,6 +26,8 @@ trait AsyncRecovery { * * The `toSequenceNr` is the lowest of what was returned by [[#asyncReadHighestSequenceNr]] * and what the user specified as recovery [[akka.persistence.Recovery]] parameter. + * This does imply that this call is always preceded by reading the highest sequence + * number for the given `persistenceId`. * * This call is NOT protected with a circuit-breaker because it may take long time * to replay all events. The plugin implementation itself must protect against @@ -55,6 +57,10 @@ trait AsyncRecovery { * * This call is protected with a circuit-breaker. * + * Please also note that requests for the highest sequence number may be made concurrently + * to writes executing for the same `persistenceId`, in particular it is possible that + * a restarting actor tries to recover before its outstanding writes have completed. + * * @param persistenceId persistent actor id. * @param fromSequenceNr hint where to start searching for the highest sequence * number. When a persistent actor is recovering this diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 745cae10cf..1591952f5e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -60,7 +60,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { { case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ val cctr = resequencerCounter - resequencerCounter += messages.foldLeft(0)((acc, m) ⇒ acc + m.size) + 1 + resequencerCounter += messages.foldLeft(1)((acc, m) ⇒ acc + m.size) val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite]) val prepared = Try(preparePersistentBatch(messages)) @@ -215,11 +215,20 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { * work in asyncronous tasks it is alright that they complete the futures in any order, * but the actual writes for a specific persistenceId should be serialized to avoid * issues such as events of a later write are visible to consumers (query side, or replay) - * before the events of an earlier write are visible. This can also be done with - * consistent hashing if it is too fine grained to do it on the persistenceId level. - * Normally a `PersistentActor` will only have one outstanding write request to the journal but - * it may emit several write requests when `persistAsync` is used and the max batch size - * is reached. + * before the events of an earlier write are visible. + * A PersistentActor will not send a new WriteMessages request before the previous one + * has been completed. + * + * Please note that the `sender` field of the contained PersistentRepr objects has been + * nulled out (i.e. set to `ActorRef.noSender`) in order to not use space in the journal + * for a sender reference that will likely be obsolete during replay. + * + * Please also note that requests for the highest sequence number may be made concurrently + * to this call executing for the same `persistenceId`, in particular it is possible that + * a restarting actor tries to recover before its outstanding writes have completed. In + * the latter case it is highly desirable to defer reading the highest sequence number + * until all outstanding writes have completed, otherwise the PersistentActor may reuse + * sequence numbers. * * This call is protected with a circuit-breaker. */ diff --git a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala index bd39db7a3b..1389097b82 100644 --- a/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/EndToEndEventAdapterSpec.scala @@ -8,7 +8,7 @@ import java.io.File import akka.actor._ import akka.persistence.EndToEndEventAdapterSpec.NewA import akka.persistence.journal.{ EventSeq, EventAdapter } -import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe } +import akka.testkit.{ ImplicitSender, WatchedByCoroner, AkkaSpec, TestProbe, EventFilter } import com.typesafe.config.{ Config, ConfigFactory } import org.apache.commons.io.FileUtils import org.scalatest.{ WordSpecLike, Matchers, BeforeAndAfterAll } @@ -127,6 +127,7 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf | } | } |} + |akka.loggers = ["akka.testkit.TestEventListener"] """.stripMargin) val newAdaptersConfig = ConfigFactory.parseString( @@ -226,11 +227,13 @@ abstract class EndToEndEventAdapterSpec(journalName: String, journalConfig: Conf .withoutPath(s"$journalPath.event-adapters.a") .withoutPath(s"""$journalPath.event-adapter-bindings."${classOf[EndToEndEventAdapterSpec].getCanonicalName}$$A"""") - intercept[IllegalArgumentException] { - withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒ - Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String]) + withActorSystem("MissingAdapterSystem", journalConfig.withFallback(missingAdapterConfig)) { implicit system2 ⇒ + EventFilter[ActorInitializationException](occurrences = 1, pattern = ".*undefined event-adapter.*") intercept { + intercept[IllegalArgumentException] { + Persistence(system2).adaptersFor(s"akka.persistence.journal.$journalName").get(classOf[String]) + }.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)") } - }.getMessage should include("was bound to undefined event-adapter: a (bindings: [a, b], known adapters: b)") + } } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentActorJournalProtocolSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentActorJournalProtocolSpec.scala new file mode 100644 index 0000000000..8ec6c24cd2 --- /dev/null +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentActorJournalProtocolSpec.scala @@ -0,0 +1,251 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.persistence + +import akka.actor._ +import akka.testkit._ +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.persistence.JournalProtocol._ + +object PersistentActorJournalProtocolSpec { + + val config = ConfigFactory.parseString(""" +puppet { + class = "akka.persistence.JournalPuppet" + max-message-batch-size = 10 +} +akka.persistence.journal.plugin = puppet +akka.persistence.snapshot-store.plugin = "akka.persistence.no-snapshot-store" +""") + + sealed trait Command + case class Persist(id: Int, msgs: Any*) extends Command + case class PersistAsync(id: Int, msgs: Any*) extends Command + case class Multi(cmd: Command*) extends Command + case class Echo(id: Int) extends Command + case class Fail(ex: Throwable) extends Command + case class Done(id: Int, sub: Int) + + case class PreStart(name: String) + case class PreRestart(name: String) + case class PostRestart(name: String) + case class PostStop(name: String) + + class A(monitor: ActorRef) extends PersistentActor { + + def persistenceId = self.path.name + + override def preStart(): Unit = monitor ! PreStart(persistenceId) + override def preRestart(reason: Throwable, msg: Option[Any]): Unit = monitor ! PreRestart(persistenceId) + override def postRestart(reason: Throwable): Unit = monitor ! PostRestart(persistenceId) + override def postStop(): Unit = monitor ! PostStop(persistenceId) + + def receiveRecover = { + case x ⇒ monitor ! x + } + def receiveCommand = behavior orElse { + case m: Multi ⇒ m.cmd.foreach(behavior) + } + + val behavior: Receive = { + case p: Persist ⇒ P(p) + case p: PersistAsync ⇒ PA(p) + case Echo(id) ⇒ sender() ! Done(id, 0) + case Fail(ex) ⇒ throw ex + } + val doNothing = (_: Any) ⇒ () + + def P(p: Persist): Unit = { + var sub = 0 + persistAll(p.msgs.toList) { e ⇒ + sender() ! Done(p.id, { sub += 1; sub }) + behavior.applyOrElse(e, doNothing) + } + } + def PA(p: PersistAsync): Unit = { + var sub = 0 + persistAllAsync(p.msgs.toList) { e ⇒ + sender() ! Done(p.id, { sub += 1; sub }) + behavior.applyOrElse(e, doNothing) + } + } + } +} + +object JournalPuppet extends ExtensionKey[JournalProbe] +class JournalProbe(implicit private val system: ExtendedActorSystem) extends Extension { + val probe = TestProbe() + val ref = probe.ref +} + +class JournalPuppet extends Actor { + val ref = JournalPuppet(context.system).ref + def receive = { + case x ⇒ ref forward x + } +} + +import PersistentActorJournalProtocolSpec._ + +class PersistentActorJournalProtocolSpec extends AkkaSpec(config) with ImplicitSender { + + val journal = JournalPuppet(system).probe + + case class Msgs(msg: Any*) + + def expectWrite(subject: ActorRef, msgs: Msgs*): WriteMessages = { + val w = journal.expectMsgType[WriteMessages] + withClue(s"$w: ") { + w.persistentActor should ===(subject) + w.messages.size should ===(msgs.size) + w.messages.zip(msgs).foreach { + case (AtomicWrite(writes), msg) ⇒ + writes.size should ===(msg.msg.size) + writes.zip(msg.msg).foreach { + case (PersistentRepr(evt, _), m) ⇒ + evt should ===(m) + } + case x ⇒ fail(s"unexpected $x") + } + } + w + } + + def confirm(w: WriteMessages): Unit = { + journal.send(w.persistentActor, WriteMessagesSuccessful) + w.messages.foreach { + case AtomicWrite(msgs) ⇒ + msgs.foreach(msg ⇒ + w.persistentActor.tell(WriteMessageSuccess(msg, w.actorInstanceId), msg.sender)) + case NonPersistentRepr(msg, sender) ⇒ w.persistentActor.tell(msg, sender) + } + } + + def startActor(name: String): ActorRef = { + val subject = system.actorOf(Props(new A(testActor)), name) + subject ! Echo(0) + expectMsg(PreStart(name)) + journal.expectMsgType[ReplayMessages] + journal.reply(RecoverySuccess(0L)) + expectMsg(RecoveryCompleted) + expectMsg(Done(0, 0)) + subject + } + + "A PersistentActor’s journal protocol" must { + + "not send WriteMessages while a write is still outstanding" when { + + "using simple persist()" in { + val subject = startActor("test-1") + subject ! Persist(1, "a-1") + val w1 = expectWrite(subject, Msgs("a-1")) + subject ! Persist(2, "a-2") + expectNoMsg(300.millis) + journal.msgAvailable should ===(false) + confirm(w1) + expectMsg(Done(1, 1)) + val w2 = expectWrite(subject, Msgs("a-2")) + confirm(w2) + expectMsg(Done(2, 1)) + subject ! PoisonPill + expectMsg(PostStop("test-1")) + journal.msgAvailable should ===(false) + } + + "using nested persist()" in { + val subject = startActor("test-2") + subject ! Persist(1, Persist(2, "a-1")) + val w1 = expectWrite(subject, Msgs(Persist(2, "a-1"))) + subject ! Persist(3, "a-2") + expectNoMsg(300.millis) + journal.msgAvailable should ===(false) + confirm(w1) + expectMsg(Done(1, 1)) + val w2 = expectWrite(subject, Msgs("a-1")) + confirm(w2) + expectMsg(Done(2, 1)) + val w3 = expectWrite(subject, Msgs("a-2")) + confirm(w3) + expectMsg(Done(3, 1)) + subject ! PoisonPill + expectMsg(PostStop("test-2")) + journal.msgAvailable should ===(false) + } + + "using nested multiple persist()" in { + val subject = startActor("test-3") + subject ! Multi(Persist(1, Persist(2, "a-1")), Persist(3, "a-2")) + val w1 = expectWrite(subject, Msgs(Persist(2, "a-1")), Msgs("a-2")) + confirm(w1) + expectMsg(Done(1, 1)) + expectMsg(Done(3, 1)) + val w2 = expectWrite(subject, Msgs("a-1")) + confirm(w2) + expectMsg(Done(2, 1)) + subject ! PoisonPill + expectMsg(PostStop("test-3")) + journal.msgAvailable should ===(false) + } + + "using large number of persist() calls" in { + val subject = startActor("test-4") + subject ! Multi(Vector.tabulate(30)(i ⇒ Persist(i, s"a-$i")): _*) + val w1 = expectWrite(subject, Vector.tabulate(30)(i ⇒ Msgs(s"a-$i")): _*) + confirm(w1) + for (i ← 0 until 30) expectMsg(Done(i, 1)) + subject ! PoisonPill + expectMsg(PostStop("test-4")) + journal.msgAvailable should ===(false) + } + + "using large number of persistAsync() calls" in { + def msgs(start: Int, end: Int) = (start until end).map(i ⇒ Msgs(s"a-$i-1", s"a-$i-2")) + def commands(start: Int, end: Int) = (start until end).map(i ⇒ PersistAsync(i, s"a-$i-1", s"a-$i-2")) + def expectDone(start: Int, end: Int) = for (i ← start until end; j ← 1 to 2) expectMsg(Done(i, j)) + + val subject = startActor("test-5") + subject ! PersistAsync(-1, "a" +: commands(20, 30): _*) + subject ! Multi(commands(0, 10): _*) + subject ! Multi(commands(10, 20): _*) + val w0 = expectWrite(subject, Msgs("a" +: commands(20, 30): _*)) + journal.expectNoMsg(300.millis) + confirm(w0) + (1 to 11) foreach (x ⇒ expectMsg(Done(-1, x))) + val w1 = expectWrite(subject, msgs(0, 20): _*) + journal.expectNoMsg(300.millis) + confirm(w1) + expectDone(0, 20) + val w2 = expectWrite(subject, msgs(20, 30): _*) + confirm(w2) + expectDone(20, 30) + subject ! PoisonPill + expectMsg(PostStop("test-5")) + journal.msgAvailable should ===(false) + } + + "the actor fails with queued events" in { + val subject = startActor("test-6") + subject ! PersistAsync(1, "a-1") + val w1 = expectWrite(subject, Msgs("a-1")) + subject ! PersistAsync(2, "a-2") + EventFilter[Exception](message = "K-BOOM!", occurrences = 1) intercept { + subject ! Fail(new Exception("K-BOOM!")) + expectMsg(PreRestart("test-6")) + expectMsg(PostRestart("test-6")) + journal.expectMsgType[ReplayMessages] + } + journal.reply(RecoverySuccess(0L)) + expectMsg(RecoveryCompleted) + confirm(w1) + subject ! PoisonPill + expectMsg(PostStop("test-6")) + journal.msgAvailable should ===(false) + } + + } + + } +} diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 238b7947a2..59d0d6fb08 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -15,7 +15,7 @@ import scala.concurrent.Future import com.typesafe.config.{ Config, ConfigFactory } import akka.dispatch.Dispatchers import akka.testkit.TestEvent._ -import org.scalautils.ConversionCheckedTripleEquals +import org.scalactic.ConversionCheckedTripleEquals object AkkaSpec { val testConf: Config = ConfigFactory.parseString("""