diff --git a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java index 9bd3428e24..97068da28b 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java @@ -45,12 +45,7 @@ public class PersistencePluginDocTest { } @Override - public Future doWriteAsync(PersistentRepr persistent) { - return null; - } - - @Override - public Future doWriteBatchAsync(Iterable persistentBatch) { + public Future doWriteAsync(Iterable persistentBatch) { return null; } diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 83179dd628..7ecb5436c8 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -381,17 +381,28 @@ another behavior, defined by ``otherCommandHandler``, and back using ``getContex Batch writes ============ -Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message. +To optimize throughput, an ``UntypedProcessor`` internally batches received ``Persistent`` messages under high load before +writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads +to a configurable maximum size (default is ``200``) under high load. + +.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#max-batch-size + +A new batch write is triggered by a processor as soon as a batch reaches the maximum size or if the journal has completed +writing the previous batch. Batch writes are never timer-based, which keeps latencies as low as possible. + +Applications that want to have more explicit control over batch writes and batch sizes can send processors +``PersistentBatch`` messages. .. includecode:: code/docs/persistence/PersistenceDocTest.java#batch-write -``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are -received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes -can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example, -in :ref:`event-sourcing-java`, all events that are generated and persisted by a single command are batch-written to -the journal (even if ``persist`` is called multiple times per command). The recovery of an -``UntypedEventsourcedProcessor`` will therefore never be done partially i.e. with only a subset of events persisted -by a single command. +``Persistent`` messages contained in a ``PersistentBatch`` message are always written atomically, even if the batch +size is greater than ``max-batch-size``. Also, a ``PersistentBatch`` is written in isolation from other batches. +``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor. + +``PersistentBatch`` messages, for example, are used internally by an ``UntypedEventsourcedProcessor`` to ensure atomic +writes of events. All events that are persisted in the context of a single command are written as a single batch to the +journal (even if ``persist`` is called multiple times per command). The recovery of an ``UntypedEventsourcedProcessor`` +will therefore never be done partially, i.e. with only a subset of events persisted by a single command.
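As a quick illustration of the explicit batch write documented above, here is a minimal, self-contained sketch; it mirrors the ``PersistentBatch`` usage from PersistenceDocSpec.scala further down in this diff, and the actor and system names are illustrative only:

import akka.actor.{ ActorSystem, Props }
import akka.persistence.{ Persistent, PersistentBatch, Processor }

class BatchingProcessor extends Processor {
  def receive = {
    case Persistent(payload, sequenceNr) ⇒
      // messages of a batch are written atomically but received individually
      println(s"persisted ${payload} with sequence number ${sequenceNr}")
  }
}

object BatchWriteExample extends App {
  val system = ActorSystem("example")
  val processor = system.actorOf(Props[BatchingProcessor])
  // both messages end up in one atomic, isolated journal batch
  processor ! PersistentBatch(List(Persistent("a"), Persistent("b")))
}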
Storage plugins =============== diff --git a/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala b/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala index 136f4c7f11..cb4b37101e 100644 --- a/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala @@ -60,14 +60,14 @@ object LoggingDocSpec { reqId += 1 val always = Map("requestId" -> reqId) val perMessage = currentMessage match { - case r: Req => Map("visitorId" -> r.visitorId) - case _ => Map() + case r: Req ⇒ Map("visitorId" -> r.visitorId) + case _ ⇒ Map() } always ++ perMessage } def receive: Receive = { - case r: Req => { + case r: Req ⇒ { log.info(s"Starting new request: ${r.work}") } } @@ -75,7 +75,6 @@ object LoggingDocSpec { //#mdc-actor - //#my-event-listener import akka.event.Logging.InitializeLogger import akka.event.Logging.LoggerInitialized @@ -117,7 +116,7 @@ object LoggingDocSpec { class LoggingDocSpec extends AkkaSpec { - import LoggingDocSpec.{MyActor, MdcActor, MdcActorMixin, Req} + import LoggingDocSpec.{ MyActor, MdcActor, MdcActorMixin, Req } "use a logging actor" in { val myActor = system.actorOf(Props[MyActor]) diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index abac9ec6d3..5c151c0e13 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -18,15 +18,12 @@ trait PersistenceDocSpec { class MyProcessor extends Processor { def receive = { - case Persistent(payload, sequenceNr) ⇒ { - // message successfully written to journal - } - case PersistenceFailure(payload, sequenceNr, cause) ⇒ { - // message failed to be written to journal - } - case other ⇒ { - // message not written to journal - } + case Persistent(payload, sequenceNr) ⇒ + // message successfully written to journal + case PersistenceFailure(payload, sequenceNr, cause) ⇒ + // message failed to be written to journal + case other ⇒ + // message not written to journal } } //#definition @@ -109,18 +106,16 @@ trait PersistenceDocSpec { val channel = context.actorOf(Channel.props(), name = "myChannel") def receive = { - case p @ Persistent(payload, _) ⇒ { + case p @ Persistent(payload, _) ⇒ channel ! Deliver(p.withPayload(s"processed ${payload}"), destination) - } } } class MyDestination extends Actor { def receive = { - case p @ ConfirmablePersistent(payload, _) ⇒ { + case p @ ConfirmablePersistent(payload, _) ⇒ println(s"received ${payload}") p.confirm() - } } } //#channel-example @@ -135,7 +130,7 @@ trait PersistenceDocSpec { //#channel-id-override def receive = { - case p @ Persistent(payload, _) ⇒ { + case p @ Persistent(payload, _) ⇒ //#channel-example-reply channel ! 
Deliver(p.withPayload(s"processed ${payload}"), sender) //#channel-example-reply @@ -144,8 +139,7 @@ trait PersistenceDocSpec { //#resolve-destination //#resolve-sender channel forward Deliver(p, destination, Resolve.Sender) - //#resolve-sender - } + //#resolve-sender } } @@ -175,15 +169,13 @@ trait PersistenceDocSpec { startWith("closed", 0) when("closed") { - case Event(Persistent("open", _), counter) ⇒ { + case Event(Persistent("open", _), counter) ⇒ goto("open") using (counter + 1) replying (counter) - } } when("open") { - case Event(Persistent("close", _), counter) ⇒ { + case Event(Persistent("close", _), counter) ⇒ goto("closed") using (counter + 1) replying (counter) - } } } //#fsm-example @@ -239,7 +231,7 @@ trait PersistenceDocSpec { val system = ActorSystem("example") val processor = system.actorOf(Props[MyProcessor]) - processor ! PersistentBatch(Vector(Persistent("a"), Persistent("b"))) + processor ! PersistentBatch(List(Persistent("a"), Persistent("b"))) //#batch-write system.shutdown() } diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala index 7380c59b1c..4c261c2902 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistencePluginDocSpec.scala @@ -23,6 +23,9 @@ import akka.persistence.snapshot._ object PersistencePluginDocSpec { val config = """ + //#max-batch-size + akka.persistence.journal.max-batch-size = 200 + //#max-batch-size //#journal-config akka.persistence.journal.leveldb.dir = "target/journal" //#journal-config @@ -69,8 +72,7 @@ class PersistencePluginDocSpec extends WordSpec { } class MyJournal extends AsyncWriteJournal { - def writeAsync(persistent: PersistentRepr): Future[Unit] = ??? - def writeBatchAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ??? + def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ??? def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ??? def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ??? def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Long] = ??? diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 738d77be31..1d54ceb462 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -392,14 +392,26 @@ See also the API docs of ``persist`` for further details. Batch writes ============ -Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message. +To optimize throughput, a ``Processor`` internally batches received ``Persistent`` messages under high load before +writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads +to a configurable maximum size (default is ``200``) under high load. + +.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#max-batch-size + +A new batch write is triggered by a processor as soon as a batch reaches the maximum size or if the journal has completed +writing the previous batch. Batch writes are never timer-based, which keeps latencies as low as possible. + +Applications that want to have more explicit control over batch writes and batch sizes can send processors +``PersistentBatch`` messages. ..
includecode:: code/docs/persistence/PersistenceDocSpec.scala#batch-write -``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are -received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes -can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example, -in :ref:`event-sourcing`, all events that are generated and persisted by a single command are batch-written to the +``Persistent`` messages contained in a ``PersistentBatch`` message are always written atomically, even if the batch +size is greater than ``max-batch-size``. Also, a ``PersistentBatch`` is written in isolation from other batches. +``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor. + +``PersistentBatch`` messages, for example, are used internally by an ``EventsourcedProcessor`` to ensure atomic +writes of events. All events that are persisted in the context of a single command are written as a single batch to the journal (even if ``persist`` is called multiple times per command). The recovery of an ``EventsourcedProcessor`` will therefore never be done partially i.e. with only a subset of events persisted by a single command. diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java index 713c09886d..350c9be3df 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java @@ -10,17 +10,12 @@ import akka.persistence.PersistentRepr; interface AsyncWritePlugin { //#async-write-plugin-api - /** - * Java API, Plugin API: asynchronously writes a `persistent` message to the journal. - */ - Future doWriteAsync(PersistentRepr persistent); - /** * Java API, Plugin API: asynchronously writes a batch of persistent messages to the * journal. The batch write must be atomic i.e. either all persistent messages in the * batch are written or none. */ - Future doWriteBatchAsync(Iterable persistentBatch); + Future doWriteAsync(Iterable persistentBatch); /** * Java API, Plugin API: asynchronously deletes all persistent messages within the diff --git a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java index 09665ff23a..f15da52f34 100644 --- a/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java +++ b/akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java @@ -8,17 +8,12 @@ import akka.persistence.PersistentRepr; interface SyncWritePlugin { //#sync-write-plugin-api - /** - * Java API, Plugin API: synchronously writes a `persistent` message to the journal. - */ - void doWrite(PersistentRepr persistent) throws Exception; - /** * Java API, Plugin API: synchronously writes a batch of persistent messages to the * journal. The batch write must be atomic i.e. either all persistent messages in the * batch are written or none.
*/ - void doWriteBatch(Iterable persistentBatch); + void doWrite(Iterable persistentBatch); /** * Java API, Plugin API: synchronously deletes all persistent messages within the diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 550ce21602..85ea2edc11 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -26,6 +26,12 @@ akka { journal { + # Maximum size of a persistent message batch written to the journal. Only applies to + # batches created internally by processors that receive persistent messages individually. + # Application-defined batches, even if larger than this setting, are always written as + # a single isolated batch. + max-batch-size = 200 + # Path to the journal plugin to be used plugin = "akka.persistence.journal.leveldb" diff --git a/akka-persistence/src/main/scala/akka/persistence/Channel.scala b/akka-persistence/src/main/scala/akka/persistence/Channel.scala index 1230fee7ff..4e9bad76d7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Channel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Channel.scala @@ -23,10 +23,9 @@ import akka.persistence.serialization.Message * val channel = context.actorOf(Channel.props(), "myChannel") * * def receive = { - * case m @ Persistent(payload, _) => { + * case m @ Persistent(payload, _) => * // forward modified message to destination * channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination) - * } * } * } * }}} @@ -39,10 +38,9 @@ import akka.persistence.serialization.Message * val channel = context.actorOf(Channel.props(), "myChannel") * * def receive = { - * case m @ Persistent(payload, _) => { + * case m @ Persistent(payload, _) => * // reply modified message to sender * channel ! Deliver(m.withPayload(s"re: ${payload}"), sender) - * } * } * } * }}} @@ -78,28 +76,27 @@ sealed class Channel private[akka] (_channelId: Option[String]) extends Actor wi import ResolvedDelivery._ private val delivering: Actor.Receive = { - case Deliver(persistent: PersistentRepr, destination, resolve) ⇒ { + case Deliver(persistent: PersistentRepr, destination, resolve) ⇒ if (!persistent.confirms.contains(id)) { val prepared = prepareDelivery(persistent) resolve match { - case Resolve.Sender if !prepared.resolved ⇒ { + case Resolve.Sender if !prepared.resolved ⇒ context.actorOf(Props(classOf[ResolvedSenderDelivery], prepared, destination, sender)) ! DeliverResolved context.become(buffering, false) - } - case Resolve.Destination if !prepared.resolved ⇒ { + case Resolve.Destination if !prepared.resolved ⇒ context.actorOf(Props(classOf[ResolvedDestinationDelivery], prepared, destination, sender)) !
DeliverResolved context.become(buffering, false) - } case _ ⇒ destination tell (prepared, sender) } } unstash() - } } private val buffering: Actor.Receive = { - case DeliveredResolved | DeliveredUnresolved ⇒ { context.unbecome(); unstash() } - case _: Deliver ⇒ stash() + case DeliveredResolved | DeliveredUnresolved ⇒ + context.unbecome() + unstash() + case _: Deliver ⇒ stash() } def receive = delivering @@ -164,7 +161,7 @@ final class PersistentChannel private[akka] (_channelId: Option[String], persist } def receiveCommand: Receive = { - case d @ Deliver(persistent: PersistentRepr, destination, resolve) ⇒ { + case d @ Deliver(persistent: PersistentRepr, destination, resolve) ⇒ if (!persistent.confirms.contains(processorId)) { persist(d) { _ ⇒ val prepared = prepareDelivery(persistent) @@ -179,7 +176,6 @@ final class PersistentChannel private[akka] (_channelId: Option[String], persist deliver(prepared, destination, resolve) } } - } case c: Confirm ⇒ deleteMessage(c.sequenceNr, true) case DisableDelivery ⇒ deliveryEnabled = false case EnableDelivery if (!deliveryEnabled) ⇒ throw new ChannelRestartRequiredException @@ -393,10 +389,17 @@ private trait ResolvedDelivery extends Actor { def onResolveFailure(): Unit def receive = { - case DeliverResolved ⇒ context.actorSelection(path) ! Identify(1) - case ActorIdentity(1, Some(ref)) ⇒ { onResolveSuccess(ref); shutdown(DeliveredResolved) } - case ActorIdentity(1, None) ⇒ { onResolveFailure(); shutdown(DeliveredUnresolved) } - case ReceiveTimeout ⇒ { onResolveFailure(); shutdown(DeliveredUnresolved) } + case DeliverResolved ⇒ + context.actorSelection(path) ! Identify(1) + case ActorIdentity(1, Some(ref)) ⇒ + onResolveSuccess(ref) + shutdown(DeliveredResolved) + case ActorIdentity(1, None) ⇒ + onResolveFailure() + shutdown(DeliveredUnresolved) + case ReceiveTimeout ⇒ + onResolveFailure() + shutdown(DeliveredUnresolved) } def shutdown(message: Any) { diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index dc4d5dcd20..d904e48400 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -26,6 +26,8 @@ private[persistence] trait Eventsourced extends Processor { * `processingCommands` */ private val recovering: State = new State { + override def toString: String = "recovering" + def aroundReceive(receive: Receive, message: Any) { Eventsourced.super.aroundReceive(receive, message) message match { @@ -43,6 +45,8 @@ private[persistence] trait Eventsourced extends Processor { * directly offered as `LoopSuccess` to the state machine implemented by `Processor`. */ private val processingCommands: State = new State { + override def toString: String = "processing commands" + def aroundReceive(receive: Receive, message: Any) { Eventsourced.super.aroundReceive(receive, LoopSuccess(message)) if (!persistInvocations.isEmpty) { @@ -62,24 +66,24 @@ private[persistence] trait Eventsourced extends Processor { * messages are stashed internally. 
*/ private val persistingEvents: State = new State { + override def toString: String = "persisting events" + def aroundReceive(receive: Receive, message: Any) = message match { - case PersistentBatch(b) ⇒ { + case PersistentBatch(b) ⇒ b.foreach(p ⇒ deleteMessage(p.sequenceNr, true)) throw new UnsupportedOperationException("Persistent command batches not supported") - } - case p: PersistentRepr ⇒ { + case p: PersistentRepr ⇒ deleteMessage(p.sequenceNr, true) throw new UnsupportedOperationException("Persistent commands not supported") - } - case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) ⇒ { + case WriteSuccess(p) if identical(p.payload, persistInvocations.head._1) ⇒ withCurrentPersistent(p)(p ⇒ persistInvocations.head._2(p.payload)) onWriteComplete() - } - case e @ WriteFailure(p, _) if identical(p.payload, persistInvocations.head._1) ⇒ { + case e @ WriteFailure(p, _) if identical(p.payload, persistInvocations.head._1) ⇒ Eventsourced.super.aroundReceive(receive, message) // stops actor by default onWriteComplete() - } - case other ⇒ processorStash.stash() + case s @ WriteBatchSuccess ⇒ Eventsourced.super.aroundReceive(receive, s) + case f: WriteBatchFailure ⇒ Eventsourced.super.aroundReceive(receive, f) + case other ⇒ processorStash.stash() } def onWriteComplete(): Unit = { @@ -177,16 +181,16 @@ private[persistence] trait Eventsourced extends Processor { * Calls `super.preRestart` then unstashes all messages from the internal stash. */ override def preRestart(reason: Throwable, message: Option[Any]) { - super.preRestart(reason, message) processorStash.unstashAll() + super.preRestart(reason, message) } /** * Calls `super.postStop` then unstashes all messages from the internal stash. */ override def postStop() { - super.postStop() processorStash.unstashAll() + super.postStop() } /** diff --git a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala index c3cee50399..8ce5b9c59b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala +++ b/akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala @@ -29,12 +29,18 @@ private[persistence] object JournalProtocol { case class WriteBatch(persistentBatch: immutable.Seq[PersistentRepr], processor: ActorRef) /** - * Instructs a journal to persist a message. - * - * @param persistent message to be persisted. - * @param processor requesting processor. + * Reply message to a processor if a batch write succeeded. This message is received before + * all subsequent [[WriteSuccess]] messages. */ - case class Write(persistent: PersistentRepr, processor: ActorRef) + case object WriteBatchSuccess + + /** + * Reply message to a processor if a batch write failed. This message is received before + * all subsequent [[WriteFailure]] messages. + * + * @param cause failure cause. + */ + case class WriteBatchFailure(cause: Throwable) /** * Reply message to a processor that `persistent` message has been successfully journaled. 
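The reply ordering documented above (the batch-level reply arrives before the corresponding per-message replies) can be illustrated with a minimal sketch; the types below are simplified stand-ins for the private[persistence] protocol messages, not the real API:

import scala.util.{ Failure, Success, Try }

// Simplified stand-ins for the private[persistence] protocol messages.
sealed trait JournalReply
case object WriteBatchSuccess extends JournalReply
final case class WriteBatchFailure(cause: Throwable) extends JournalReply
final case class WriteSuccess(payload: Any) extends JournalReply
final case class WriteFailure(payload: Any, cause: Throwable) extends JournalReply

object ReplyOrderingSketch extends App {
  // stand-in for the actual journal write, which may throw
  def write(batch: Seq[Any]): Unit = ()

  // batch-level reply first, then one reply per message, in original order
  def replies(batch: Seq[Any]): Seq[JournalReply] = Try(write(batch)) match {
    case Success(_) ⇒ WriteBatchSuccess +: batch.map(WriteSuccess(_))
    case Failure(e) ⇒ WriteBatchFailure(e) +: batch.map(WriteFailure(_, e))
  }

  println(replies(Seq("a", "b"))) // List(WriteBatchSuccess, WriteSuccess(a), WriteSuccess(b))
}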
diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 0483674888..f45ab66fa8 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -33,6 +33,9 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { private val journal = createPlugin("journal", clazz ⇒ if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId) + /** + * INTERNAL API. + */ private[persistence] val publishPluginCommands: Boolean = { val path = "publish-plugin-commands" // this config option is only used internally (for testing @@ -40,6 +43,12 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { config.hasPath(path) && config.getBoolean(path) } + /** + * INTERNAL API. + */ + private[persistence] val maxBatchSize: Int = + config.getInt("journal.max-batch-size") + /** * Returns a snapshot store for a processor identified by `processorId`. */ diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala index 474b2bd3d2..ac74e71b93 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistent.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistent.scala @@ -9,9 +9,9 @@ import java.util.{ List ⇒ JList } import scala.collection.immutable -import akka.actor.ActorRef +import akka.actor.{ ActorContext, ActorRef } import akka.japi.Util.immutableSeq - +import akka.pattern.PromiseActorRef import akka.persistence.serialization.Message /** @@ -177,8 +177,20 @@ trait PersistentRepr extends Persistent with Message { */ def sender: ActorRef + /** + * INTERNAL API. + */ private[persistence] def prepareWrite(sender: ActorRef): PersistentRepr + /** + * INTERNAL API. + */ + private[persistence] def prepareWrite()(implicit context: ActorContext): PersistentRepr = + prepareWrite(if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender) + + /** + * INTERNAL API. 
+ */ private[persistence] def update( sequenceNr: Long = sequenceNr, processorId: String = processorId, @@ -186,7 +198,8 @@ trait PersistentRepr extends Persistent with Message { resolved: Boolean = resolved, confirms: immutable.Seq[String] = confirms, confirmMessage: Confirm = confirmMessage, - confirmTarget: ActorRef = confirmTarget): PersistentRepr + confirmTarget: ActorRef = confirmTarget, + sender: ActorRef = sender): PersistentRepr } object PersistentRepr { @@ -250,8 +263,9 @@ private[persistence] case class PersistentImpl( resolved: Boolean, confirms: immutable.Seq[String], confirmMessage: Confirm, - confirmTarget: ActorRef) = - copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, confirms = confirms) + confirmTarget: ActorRef, + sender: ActorRef) = + copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, confirms = confirms, sender = sender) val resolved: Boolean = false val confirmable: Boolean = false @@ -284,8 +298,8 @@ private[persistence] case class ConfirmablePersistentImpl( def prepareWrite(sender: ActorRef) = copy(sender = sender, resolved = false, confirmMessage = null, confirmTarget = null) - def update(sequenceNr: Long, processorId: String, deleted: Boolean, resolved: Boolean, confirms: immutable.Seq[String], confirmMessage: Confirm, confirmTarget: ActorRef) = - copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget) + def update(sequenceNr: Long, processorId: String, deleted: Boolean, resolved: Boolean, confirms: immutable.Seq[String], confirmMessage: Confirm, confirmTarget: ActorRef, sender: ActorRef) = + copy(sequenceNr = sequenceNr, processorId = processorId, deleted = deleted, resolved = resolved, confirms = confirms, confirmMessage = confirmMessage, confirmTarget = confirmTarget, sender = sender) } private[persistence] object ConfirmablePersistentImpl { diff --git a/akka-persistence/src/main/scala/akka/persistence/Processor.scala b/akka-persistence/src/main/scala/akka/persistence/Processor.scala index 76b80f8a1c..48bfb0a53c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Processor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Processor.scala @@ -4,6 +4,8 @@ package akka.persistence +import scala.annotation.tailrec + import akka.actor._ import akka.dispatch._ @@ -58,6 +60,8 @@ trait Processor extends Actor with Stash with StashFactory { private val extension = Persistence(context.system) private val _processorId = extension.processorId(self) + import extension.maxBatchSize + /** * Processor state. */ @@ -81,10 +85,9 @@ trait Processor extends Actor with Stash with StashFactory { override def toString: String = "recovery pending" def aroundReceive(receive: Actor.Receive, message: Any): Unit = message match { - case Recover(fromSnap, toSnr) ⇒ { + case Recover(fromSnap, toSnr) ⇒ _currentState = recoveryStarted snapshotStore ! LoadSnapshot(processorId, fromSnap, toSnr) - } case _ ⇒ processorStash.stash() } } @@ -100,32 +103,28 @@ trait Processor extends Actor with Stash with StashFactory { def aroundReceive(receive: Actor.Receive, message: Any) = message match { case LoadSnapshotResult(sso, toSnr) ⇒ sso match { - case Some(SelectedSnapshot(metadata, snapshot)) ⇒ { + case Some(SelectedSnapshot(metadata, snapshot)) ⇒ process(receive, SnapshotOffer(metadata, snapshot)) journal ! Replay(metadata.sequenceNr + 1L, toSnr, processorId, self) - } case None ⇒ { + case None ⇒ journal ! 
Replay(1L, toSnr, processorId, self) - } } - case ReplaySuccess(maxSnr) ⇒ { + case ReplaySuccess(maxSnr) ⇒ _currentState = recoverySucceeded _sequenceNr = maxSnr processorStash.unstashAll() - } - case ReplayFailure(cause) ⇒ { + case ReplayFailure(cause) ⇒ val notification = RecoveryFailure(cause) if (receive.isDefinedAt(notification)) process(receive, notification) else { val errorMsg = s"Replay failure by journal (processor id = [${processorId}])" throw new RecoveryFailureException(errorMsg, cause) } - } case Replayed(p) ⇒ try { processPersistent(receive, p) } catch { - case t: Throwable ⇒ { + case t: Throwable ⇒ _currentState = recoveryFailed // delay throwing exception to prepareRestart _recoveryFailureCause = t _recoveryFailureMessage = currentEnvelope - } } case r: Recover ⇒ // ignore case _ ⇒ processorStash.stash() @@ -138,11 +137,13 @@ trait Processor extends Actor with Stash with StashFactory { private val recoverySucceeded = new State { override def toString: String = "recovery finished" + private var batching = false + def aroundReceive(receive: Actor.Receive, message: Any) = message match { case r: Recover ⇒ // ignore case Replayed(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash case WriteSuccess(p) ⇒ processPersistent(receive, p) - case WriteFailure(p, cause) ⇒ { + case WriteFailure(p, cause) ⇒ val notification = PersistenceFailure(p.payload, p.sequenceNr, cause) if (receive.isDefinedAt(notification)) process(receive, notification) else { @@ -151,11 +152,36 @@ trait Processor extends Actor with Stash with StashFactory { "To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages." throw new ActorKilledException(errorMsg) } - } - case LoopSuccess(m) ⇒ process(receive, m) - case p: PersistentRepr ⇒ journal forward Write(p.update(processorId = processorId, sequenceNr = nextSequenceNr()), self) - case pb: PersistentBatch ⇒ journal forward WriteBatch(pb.persistentReprList.map(_.update(processorId = processorId, sequenceNr = nextSequenceNr())), self) - case m ⇒ journal forward Loop(m, self) + case LoopSuccess(m) ⇒ process(receive, m) + case WriteBatchSuccess | WriteBatchFailure(_) ⇒ + if (processorBatch.isEmpty) batching = false else journalBatch() + case p: PersistentRepr ⇒ + addToBatch(p) + if (!batching || maxBatchSizeReached) journalBatch() + case pb: PersistentBatch ⇒ + // submit all batched messages before submitting this user batch (isolated) + if (!processorBatch.isEmpty) journalBatch() + addToBatch(pb) + journalBatch() + case m ⇒ + // submit all batched messages before looping this message + if (processorBatch.isEmpty) batching = false else journalBatch() + journal forward Loop(m, self) + } + + def addToBatch(p: PersistentRepr): Unit = + processorBatch = processorBatch :+ p.update(processorId = processorId, sequenceNr = nextSequenceNr(), sender = sender) + + def addToBatch(pb: PersistentBatch): Unit = + pb.persistentReprList.foreach(addToBatch) + + def maxBatchSizeReached: Boolean = + processorBatch.length >= maxBatchSize + + def journalBatch(): Unit = { + journal ! 
WriteBatch(processorBatch, self) + processorBatch = Vector.empty + batching = true } } @@ -168,13 +194,20 @@ trait Processor extends Actor with Stash with StashFactory { override def toString: String = "recovery failed" def aroundReceive(receive: Actor.Receive, message: Any) = message match { - case ReplaySuccess(_) | ReplayFailure(_) ⇒ { - _currentState = prepareRestart - mailbox.enqueueFirst(self, _recoveryFailureMessage) - } - case Replayed(p) ⇒ updateLastSequenceNr(p) - case r: Recover ⇒ // ignore - case _ ⇒ processorStash.stash() + case ReplayFailure(_) ⇒ + replayCompleted() + // journal couldn't tell the maximum stored sequence number, hence the next + // replay must be a full replay (up to the highest stored sequence number) + _lastSequenceNr = Long.MaxValue + case ReplaySuccess(_) ⇒ replayCompleted() + case Replayed(p) ⇒ updateLastSequenceNr(p) + case r: Recover ⇒ // ignore + case _ ⇒ processorStash.stash() + } + + def replayCompleted(): Unit = { + _currentState = prepareRestart + mailbox.enqueueFirst(self, _recoveryFailureMessage) } } @@ -191,6 +224,8 @@ trait Processor extends Actor with Stash with StashFactory { } } + private var processorBatch = Vector.empty[PersistentRepr] + private var _sequenceNr: Long = 0L private var _lastSequenceNr: Long = 0L @@ -352,8 +387,9 @@ trait Processor extends Actor with Stash with StashFactory { */ final override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { try { - unstashAll(unstashFilterPredicate) + processorStash.prepend(processorBatch.map(p ⇒ Envelope(p, p.sender, context.system))) processorStash.unstashAll() + unstashAll(unstashFilterPredicate) } finally { message match { case Some(WriteSuccess(m)) ⇒ preRestartDefault(reason, Some(m)) diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala index 25c5cd2278..1e27d6c2c3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshot.scala @@ -50,9 +50,15 @@ case class SnapshotOffer(metadata: SnapshotMetadata, snapshot: Any) */ @SerialVersionUID(1L) case class SnapshotSelectionCriteria(maxSequenceNr: Long = Long.MaxValue, maxTimestamp: Long = Long.MaxValue) { + /** + * INTERNAL API. + */ private[persistence] def limit(toSequenceNr: Long): SnapshotSelectionCriteria = if (toSequenceNr < maxSequenceNr) copy(maxSequenceNr = toSequenceNr) else this + /** + * INTERNAL API. 
+ */ private[persistence] def matches(metadata: SnapshotMetadata): Boolean = metadata.sequenceNr <= maxSequenceNr && metadata.timestamp <= maxTimestamp } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index c7a8488c49..bb0b1b7e85 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -27,31 +27,21 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { private var resequencerCounter = 1L final def receive = { - case Write(persistent, processor) ⇒ { - val csdr = sender + case WriteBatch(persistentBatch, processor) ⇒ val cctr = resequencerCounter - val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - writeAsync(persistent.prepareWrite(psdr)) map { - _ ⇒ Desequenced(WriteSuccess(persistent), cctr, processor, csdr) - } recover { - case e ⇒ Desequenced(WriteFailure(persistent, e), cctr, processor, csdr) - } pipeTo (resequencer) - resequencerCounter += 1 - } - case WriteBatch(persistentBatch, processor) ⇒ { - val csdr = sender - val cctr = resequencerCounter - val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender def resequence(f: PersistentRepr ⇒ Any) = persistentBatch.zipWithIndex.foreach { - case (p, i) ⇒ resequencer ! Desequenced(f(p), cctr + i, processor, csdr) + case (p, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender) } - writeBatchAsync(persistentBatch.map(_.prepareWrite(psdr))) onComplete { - case Success(_) ⇒ resequence(WriteSuccess(_)) - case Failure(e) ⇒ resequence(WriteFailure(_, e)) + writeAsync(persistentBatch.map(_.prepareWrite())) onComplete { + case Success(_) ⇒ + resequencer ! Desequenced(WriteBatchSuccess, cctr, processor, self) + resequence(WriteSuccess(_)) + case Failure(e) ⇒ + resequencer ! Desequenced(WriteBatchFailure(e), cctr, processor, self) + resequence(WriteFailure(_, e)) } - resequencerCounter += persistentBatch.length - } - case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ { + resequencerCounter += persistentBatch.length + 1 + case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ // Send replayed messages and replay result to processor directly. No need // to resequence replayed messages relative to written and looped messages. 
replayAsync(processorId, fromSequenceNr, toSequenceNr) { p ⇒ @@ -61,38 +51,29 @@ trait AsyncWriteJournal extends Actor with AsyncReplay { } recover { case e ⇒ ReplayFailure(e) } pipeTo (processor) - } - case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { + case c @ Confirm(processorId, sequenceNr, channelId) ⇒ confirmAsync(processorId, sequenceNr, channelId) onComplete { case Success(_) ⇒ if (extension.publishPluginCommands) context.system.eventStream.publish(c) case Failure(e) ⇒ // TODO: publish failure to event stream + context.system.eventStream.publish(c) } - context.system.eventStream.publish(c) - } - case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) ⇒ { + case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) ⇒ deleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent) onComplete { case Success(_) ⇒ if (extension.publishPluginCommands) context.system.eventStream.publish(d) case Failure(e) ⇒ // TODO: publish failure to event stream } - } - case Loop(message, processor) ⇒ { + case Loop(message, processor) ⇒ resequencer ! Desequenced(LoopSuccess(message), resequencerCounter, processor, sender) resequencerCounter += 1 - } } //#journal-plugin-api - /** - * Plugin API: asynchronously writes a `persistent` message to the journal. - */ - def writeAsync(persistent: PersistentRepr): Future[Unit] - /** * Plugin API: asynchronously writes a batch of persistent messages to the journal. * The batch write must be atomic i.e. either all persistent messages in the batch * are written or none. */ - def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] + def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] /** * Plugin API: asynchronously deletes all persistent messages within the range from diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala index 3b667523f3..f9329b2dc6 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/SyncWriteJournal.scala @@ -22,21 +22,17 @@ trait SyncWriteJournal extends Actor with AsyncReplay { private val extension = Persistence(context.system) final def receive = { - case Write(persistent, processor) ⇒ { - val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - Try(write(persistent.prepareWrite(sdr))) match { - case Success(_) ⇒ processor forward WriteSuccess(persistent) - case Failure(e) ⇒ processor forward WriteFailure(persistent, e); throw e + case WriteBatch(persistentBatch, processor) ⇒ + Try(write(persistentBatch.map(_.prepareWrite()))) match { + case Success(_) ⇒ + processor ! WriteBatchSuccess + persistentBatch.foreach(p ⇒ processor.tell(WriteSuccess(p), p.sender)) + case Failure(e) ⇒ + processor ! 
WriteBatchFailure(e) + persistentBatch.foreach(p ⇒ processor tell (WriteFailure(p, e), p.sender)) + throw e } - } - case WriteBatch(persistentBatch, processor) ⇒ { - val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender - Try(writeBatch(persistentBatch.map(_.prepareWrite(sdr)))) match { - case Success(_) ⇒ persistentBatch.foreach(processor forward WriteSuccess(_)) - case Failure(e) ⇒ persistentBatch.foreach(processor forward WriteFailure(_, e)); throw e - } - } - case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ { + case Replay(fromSequenceNr, toSequenceNr, processorId, processor) ⇒ replayAsync(processorId, fromSequenceNr, toSequenceNr) { p ⇒ if (!p.deleted) processor.tell(Replayed(p), p.sender) } map { @@ -44,32 +40,23 @@ trait SyncWriteJournal extends Actor with AsyncReplay { } recover { case e ⇒ ReplayFailure(e) } pipeTo (processor) - } - case c @ Confirm(processorId, sequenceNr, channelId) ⇒ { + case c @ Confirm(processorId, sequenceNr, channelId) ⇒ confirm(processorId, sequenceNr, channelId) if (extension.publishPluginCommands) context.system.eventStream.publish(c) - } - case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) ⇒ { + case d @ Delete(processorId, fromSequenceNr, toSequenceNr, permanent) ⇒ delete(processorId, fromSequenceNr, toSequenceNr, permanent) if (extension.publishPluginCommands) context.system.eventStream.publish(d) - } - case Loop(message, processor) ⇒ { + case Loop(message, processor) ⇒ processor forward LoopSuccess(message) - } } //#journal-plugin-api - /** - * Plugin API: synchronously writes a `persistent` message to the journal. - */ - def write(persistent: PersistentRepr): Unit - /** * Plugin API: synchronously writes a batch of persistent messages to the journal. * The batch write must be atomic i.e. either all persistent messages in the batch * are written or none. */ - def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]): Unit + def write(persistentBatch: immutable.Seq[PersistentRepr]): Unit /** * Plugin API: synchronously deletes all persistent messages within the range from diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala index 68b39f66aa..9b7a93b416 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala @@ -27,10 +27,7 @@ private[persistence] class InmemJournal extends AsyncWriteJournal { import InmemStore._ - def writeAsync(persistent: PersistentRepr): Future[Unit] = - (store ? Write(persistent)).mapTo[Unit] - - def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] = + def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]): Future[Unit] = (store ? WriteBatch(persistentBatch)).mapTo[Unit] def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = @@ -43,16 +40,49 @@ private[persistence] class InmemJournal extends AsyncWriteJournal { (store ? Replay(processorId, fromSequenceNr, toSequenceNr, replayCallback)).mapTo[Long] } -private[persistence] class InmemStore extends Actor { - import InmemStore._ - +/** + * INTERNAL API. 
+ */ +private[persistence] trait InmemMessages { // processor id => persistent message var messages = Map.empty[String, Vector[PersistentRepr]] + def add(p: PersistentRepr) = messages = messages + (messages.get(p.processorId) match { + case Some(ms) ⇒ p.processorId -> (ms :+ p) + case None ⇒ p.processorId -> Vector(p) + }) + + def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr) = messages = messages.get(pid) match { + case Some(ms) ⇒ messages + (pid -> ms.map(sp ⇒ if (sp.sequenceNr == snr) f(sp) else sp)) + case None ⇒ messages + } + + def delete(pid: String, snr: Long) = messages = messages.get(pid) match { + case Some(ms) ⇒ messages + (pid -> ms.filterNot(_.sequenceNr == snr)) + case None ⇒ messages + } + + def read(pid: String, fromSnr: Long, toSnr: Long): immutable.Seq[PersistentRepr] = messages.get(pid) match { + case Some(ms) ⇒ ms.filter(m ⇒ m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr) + case None ⇒ Nil + } + + def maxSequenceNr(pid: String): Long = { + val snro = for { + ms ← messages.get(pid) + m ← ms.lastOption + } yield m.sequenceNr + snro.getOrElse(0L) + } +} + +/** + * INTERNAL API. + */ +private[persistence] class InmemStore extends Actor with InmemMessages { + import InmemStore._ + def receive = { - case Write(p) ⇒ - add(p) - success() case WriteBatch(pb) ⇒ pb.foreach(add) success() @@ -65,46 +95,16 @@ private[persistence] class InmemStore extends Actor { case Confirm(pid, snr, cid) ⇒ update(pid, snr)(p ⇒ p.update(confirms = cid +: p.confirms)) success() - case Replay(pid, fromSnr, toSnr, callback) ⇒ { - for { - ms ← messages.get(pid) - m ← ms - if m.sequenceNr >= fromSnr && m.sequenceNr <= toSnr - } callback(m) - + case Replay(pid, fromSnr, toSnr, callback) ⇒ + read(pid, fromSnr, toSnr).foreach(callback) success(maxSequenceNr(pid)) - } } private def success(reply: Any = ()) = sender ! 
reply - - private def add(p: PersistentRepr) = messages = messages + (messages.get(p.processorId) match { - case Some(ms) ⇒ p.processorId -> (ms :+ p) - case None ⇒ p.processorId -> Vector(p) - }) - - private def update(pid: String, snr: Long)(f: PersistentRepr ⇒ PersistentRepr) = messages = messages.get(pid) match { - case Some(ms) ⇒ messages + (pid -> ms.map(sp ⇒ if (sp.sequenceNr == snr) f(sp) else sp)) - case None ⇒ messages - } - - private def delete(pid: String, snr: Long) = messages = messages.get(pid) match { - case Some(ms) ⇒ messages + (pid -> ms.filterNot(_.sequenceNr == snr)) - case None ⇒ messages - } - - private def maxSequenceNr(pid: String): Long = { - val snro = for { - ms ← messages.get(pid) - m ← ms.lastOption - } yield m.sequenceNr - snro.getOrElse(0L) - } } private[persistence] object InmemStore { - case class Write(p: PersistentRepr) case class WriteBatch(pb: Seq[PersistentRepr]) case class Delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) case class Confirm(processorId: String, sequenceNr: Long, channelId: String) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala index 23a9dc08de..58a87ba03c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/AsyncWriteJournal.scala @@ -16,11 +16,8 @@ import akka.persistence.PersistentRepr abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal with AsyncWritePlugin { import context.dispatcher - final def writeAsync(persistent: PersistentRepr) = - doWriteAsync(persistent).map(Unit.unbox) - - final def writeBatchAsync(persistentBatch: immutable.Seq[PersistentRepr]) = - doWriteBatchAsync(persistentBatch.asJava).map(Unit.unbox) + final def writeAsync(persistentBatch: immutable.Seq[PersistentRepr]) = + doWriteAsync(persistentBatch.asJava).map(Unit.unbox) final def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = doDeleteAsync(processorId, fromSequenceNr, toSequenceNr, permanent).map(Unit.unbox) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala index 55e0fdbe05..894cbc9cc2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/japi/SyncWriteJournal.scala @@ -14,11 +14,8 @@ import akka.persistence.PersistentRepr * Java API: abstract journal, optimized for synchronous writes. 
*/ abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with SyncWritePlugin { - final def write(persistent: PersistentRepr) = - doWrite(persistent) - - final def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) = - doWriteBatch(persistentBatch.asJava) + final def write(persistentBatch: immutable.Seq[PersistentRepr]) = + doWrite(persistentBatch.asJava) final def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = doDelete(processorId, fromSequenceNr, toSequenceNr, permanent) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala index b5f0db7bef..d52f38fe6a 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbIdMapping.scala @@ -9,6 +9,8 @@ import org.iq80.leveldb.DBIterator import akka.actor.Actor /** + * INTERNAL API. + * * LevelDB backed persistent mapping of `String`-based processor and channel ids to numeric ids. */ private[persistence] trait LeveldbIdMapping extends Actor { this: LeveldbJournal ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 3d2cb94735..2d65853290 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -20,7 +20,7 @@ import akka.serialization.SerializationExtension * * LevelDB backed journal. */ -private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMapping with LeveldbReplay { +private[persistence] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMapping with LeveldbReplay { val config = context.system.settings.config.getConfig("akka.persistence.journal.leveldb") val nativeLeveldb = config.getBoolean("native") @@ -42,10 +42,7 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap import Key._ - def write(persistent: PersistentRepr) = - withBatch(batch ⇒ addToBatch(persistent, batch)) - - def writeBatch(persistentBatch: immutable.Seq[PersistentRepr]) = + def write(persistentBatch: immutable.Seq[PersistentRepr]) = withBatch(batch ⇒ persistentBatch.foreach(persistent ⇒ addToBatch(persistent, batch))) def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean) = withBatch { batch ⇒ diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala index 849aed4905..afc0eee7d3 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbReplay.scala @@ -11,6 +11,8 @@ import akka.persistence._ import akka.persistence.journal.AsyncReplay /** + * INTERNAL API. + * * LevelDB backed message replay. 
 */
 private[persistence] trait LeveldbReplay extends AsyncReplay { this: LeveldbJournal ⇒
diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala
index 55248c2858..37b06db5bc 100644
--- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala
@@ -21,15 +21,14 @@ trait SnapshotStore extends Actor {
   private val extension = Persistence(context.system)
 
   final def receive = {
-    case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒ {
+    case LoadSnapshot(processorId, criteria, toSequenceNr) ⇒
       val p = sender
       loadAsync(processorId, criteria.limit(toSequenceNr)) map {
         sso ⇒ LoadSnapshotResult(sso, toSequenceNr)
       } recover {
         case e ⇒ LoadSnapshotResult(None, toSequenceNr)
       } pipeTo (p)
-    }
-    case SaveSnapshot(metadata, snapshot) ⇒ {
+    case SaveSnapshot(metadata, snapshot) ⇒
       val p = sender
       val md = metadata.copy(timestamp = System.currentTimeMillis)
       saveAsync(md, snapshot) map {
@@ -37,23 +36,18 @@ trait SnapshotStore extends Actor {
       } recover {
         case e ⇒ SaveSnapshotFailure(metadata, e)
       } to (self, p)
-    }
-    case evt @ SaveSnapshotSuccess(metadata) ⇒ {
+    case evt @ SaveSnapshotSuccess(metadata) ⇒
       saved(metadata)
       sender ! evt // sender is processor
-    }
-    case evt @ SaveSnapshotFailure(metadata, _) ⇒ {
+    case evt @ SaveSnapshotFailure(metadata, _) ⇒
       delete(metadata)
      sender ! evt // sender is processor
-    }
-    case d @ DeleteSnapshot(metadata) ⇒ {
+    case d @ DeleteSnapshot(metadata) ⇒
      delete(metadata)
      if (extension.publishPluginCommands) context.system.eventStream.publish(d)
-    }
-    case d @ DeleteSnapshots(processorId, criteria) ⇒ {
+    case d @ DeleteSnapshots(processorId, criteria) ⇒
      delete(processorId, criteria)
      if (extension.publishPluginCommands) context.system.eventStream.publish(d)
-    }
   }
 
   //#snapshot-store-plugin-api
diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
index ccdf1c98d9..51bcf70cf4 100644
--- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
+++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala
@@ -65,15 +65,13 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
     @scala.annotation.tailrec
     def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
       case None ⇒ None
-      case Some(md) ⇒ {
+      case Some(md) ⇒
         Try(withInputStream(md)(deserialize)) match {
           case Success(s) ⇒ Some(SelectedSnapshot(md, s.data))
-          case Failure(e) ⇒ {
+          case Failure(e) ⇒
             log.error(e, s"error loading snapshot ${md}")
             load(metadata.init) // try older snapshot
-          }
         }
-      }
     }
 
   // Heuristics:
diff --git a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala
index fb67bace37..9ede279ea7 100644
--- a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala
@@ -15,15 +15,13 @@ object ChannelSpec {
     val channel = context.actorOf(channelProps)
 
     def receive = {
-      case m @ Persistent(s: String, _) if s.startsWith("a") ⇒ {
+      case m @ Persistent(s: String, _) if s.startsWith("a") ⇒
         // forward to destination via channel,
         // destination replies to initial sender
         channel forward Deliver(m.withPayload(s"fw: ${s}"), destination)
-      }
-      case m @ Persistent(s: String, _) if s.startsWith("b") ⇒ {
+      case m @ Persistent(s: String, _) if s.startsWith("b") ⇒
         // reply to sender via channel
         channel ! Deliver(m.withPayload(s"re: ${s}"), sender)
-      }
     }
   }
diff --git a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala
index 4232e24a15..35a978cfe2 100644
--- a/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/EventsourcedSpec.scala
@@ -32,118 +32,105 @@ object EventsourcedSpec {
 
   class Behavior1Processor(name: String) extends ExampleProcessor(name) {
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState)
-      }
     }
   }
 
   class Behavior2Processor(name: String) extends ExampleProcessor(name) {
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Seq(Evt(s"${data}-1"), Evt(s"${data}-2")))(updateState)
         persist(Seq(Evt(s"${data}-3"), Evt(s"${data}-4")))(updateState)
-      }
     }
   }
 
   class Behavior3Processor(name: String) extends ExampleProcessor(name) {
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Seq(Evt(s"${data}-11"), Evt(s"${data}-12")))(updateState)
         updateState(Evt(s"${data}-10"))
-      }
     }
   }
 
   class ChangeBehaviorInLastEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
     val newBehavior: Receive = {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Evt(s"${data}-21"))(updateState)
         persist(Evt(s"${data}-22")) { event ⇒
           updateState(event)
           context.unbecome()
         }
-      }
     }
 
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Evt(s"${data}-0")) { event ⇒
           updateState(event)
           context.become(newBehavior)
         }
-      }
     }
   }
 
   class ChangeBehaviorInFirstEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
     val newBehavior: Receive = {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Evt(s"${data}-21")) { event ⇒
           updateState(event)
           context.unbecome()
         }
         persist(Evt(s"${data}-22"))(updateState)
-      }
     }
 
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Evt(s"${data}-0")) { event ⇒
           updateState(event)
           context.become(newBehavior)
         }
-      }
     }
   }
 
   class ChangeBehaviorInCommandHandlerFirstProcessor(name: String) extends ExampleProcessor(name) {
     val newBehavior: Receive = {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         context.unbecome()
         persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState)
         updateState(Evt(s"${data}-30"))
-      }
     }
 
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         context.become(newBehavior)
         persist(Evt(s"${data}-0"))(updateState)
-      }
     }
   }
 
   class ChangeBehaviorInCommandHandlerLastProcessor(name: String) extends ExampleProcessor(name) {
     val newBehavior: Receive = {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Seq(Evt(s"${data}-31"), Evt(s"${data}-32")))(updateState)
         updateState(Evt(s"${data}-30"))
         context.unbecome()
-      }
     }
 
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Evt(s"${data}-0"))(updateState)
         context.become(newBehavior)
-      }
     }
   }
 
   class SnapshottingEventsourcedProcessor(name: String, probe: ActorRef) extends ExampleProcessor(name) {
     override def receiveReplay = super.receiveReplay orElse {
-      case SnapshotOffer(_, events: List[_]) ⇒ {
+      case SnapshotOffer(_, events: List[_]) ⇒
        probe ! "offered"
        this.events = events
-      }
     }
 
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
         persist(Seq(Evt(s"${data}-41"), Evt(s"${data}-42")))(updateState)
-      }
       case SaveSnapshotSuccess(_) ⇒ probe ! "saved"
       case "snap" ⇒ saveSnapshot(events)
     }
@@ -175,36 +162,33 @@ object EventsourcedSpec {
     }
 
     val processC: Receive = {
-      case Cmd("c") ⇒ {
+      case Cmd("c") ⇒
         persist(Evt("c")) { evt ⇒
           updateState(evt)
           context.unbecome()
         }
         unstashAll()
-      }
       case other ⇒ stash()
     }
   }
 
   class UserStashFailureProcessor(name: String) extends ExampleProcessor(name) {
     val receiveCommand: Receive = commonBehavior orElse {
-      case Cmd(data) ⇒ {
+      case Cmd(data) ⇒
        if (data == "b-2") throw new TestException("boom")
        persist(Evt(data)) { event ⇒
          updateState(event)
          if (data == "a") context.become(otherCommandHandler)
        }
-      }
     }
 
     val otherCommandHandler: Receive = {
-      case Cmd("c") ⇒ {
+      case Cmd("c") ⇒
         persist(Evt("c")) { event ⇒
           updateState(event)
           context.unbecome()
         }
         unstashAll()
-      }
       case other ⇒ stash()
     }
   }
diff --git a/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala b/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala
new file mode 100644
index 0000000000..fd4c8bbcd2
--- /dev/null
+++ b/akka-persistence/src/test/scala/akka/persistence/FailureSpec.scala
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc.
+ */
+
+package akka.persistence
+
+import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import scala.language.postfixOps
+
+import com.typesafe.config.ConfigFactory
+
+import akka.actor._
+import akka.testkit._
+
+object FailureSpec {
+  val config = ConfigFactory.parseString(
+    s"""
+      akka.persistence.processor.chaos.live-processing-failure-rate = 0.3
+      akka.persistence.processor.chaos.replay-processing-failure-rate = 0.1
+      akka.persistence.journal.plugin = "akka.persistence.journal.chaos"
+      akka.persistence.journal.chaos.write-failure-rate = 0.3
+      akka.persistence.journal.chaos.delete-failure-rate = 0.3
+      akka.persistence.journal.chaos.replay-failure-rate = 0.3
+      akka.persistence.journal.chaos.class = akka.persistence.journal.chaos.ChaosJournal
+      akka.persistence.snapshot-store.local.dir = "target/snapshots-failure-spec/"
+    """)
+
+  val numMessages = 10
+
+  case object Start
+  case class Done(ints: Vector[Int])
+
+  case class ProcessingFailure(i: Int)
+  case class JournalingFailure(i: Int)
+
+  class ChaosProcessor extends Processor with ActorLogging {
+    val config = context.system.settings.config.getConfig("akka.persistence.processor.chaos")
+    val liveProcessingFailureRate = config.getDouble("live-processing-failure-rate")
+    val replayProcessingFailureRate = config.getDouble("replay-processing-failure-rate")
+
+    // processor state
+    var ints = Vector.empty[Int]
+
+    override def processorId = "chaos"
+
+    def random = ThreadLocalRandom.current
+
+    def receive = {
+      case Persistent(i: Int, _) ⇒
+        val failureRate = if (recoveryRunning) replayProcessingFailureRate else liveProcessingFailureRate
+        if (ints.contains(i)) {
+          log.debug(debugMessage(s"ignored duplicate ${i}"))
+        } else if (shouldFail(failureRate)) {
+          throw new TestException(debugMessage(s"rejected payload ${i}"))
+        } else {
+          ints :+= i
+          if (ints.length == numMessages) sender ! Done(ints)
+          log.debug(debugMessage(s"processed payload ${i}"))
+        }
+      case PersistenceFailure(i: Int, _, _) ⇒
+        // inform sender about journaling failure so that it can resend
+        sender ! JournalingFailure(i)
+      case RecoveryFailure(_) ⇒
+        // journal failed during recovery, throw exception to re-recover processor
+        throw new TestException(debugMessage("recovery failed"))
+    }
+
+    override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
+      message match {
+        case Some(p @ Persistent(i: Int, _)) if !recoveryRunning ⇒
+          deleteMessage(p.sequenceNr)
+          log.debug(debugMessage(s"requested deletion of payload ${i}"))
+          // inform sender about processing failure so that it can resend
+          sender ! ProcessingFailure(i)
+        case _ ⇒
+      }
+      super.preRestart(reason, message)
+    }
+
+    private def shouldFail(rate: Double) =
+      random.nextDouble() < rate
+
+    private def debugMessage(msg: String): String =
+      s"${msg} (mode = ${if (recoveryRunning) "replay" else "live"} snr = ${lastSequenceNr} state = ${ints.sorted})"
+  }
+
+  class ChaosProcessorApp(probe: ActorRef) extends Actor with ActorLogging {
+    val processor = context.actorOf(Props[ChaosProcessor])
+
+    def receive = {
+      case Start ⇒ 1 to numMessages foreach (processor ! Persistent(_))
+      case Done(ints) ⇒ probe ! Done(ints)
+      case ProcessingFailure(i) ⇒
+        processor ! Persistent(i)
+        log.debug(s"resent ${i} after processing failure")
+      case JournalingFailure(i) ⇒
+        processor ! Persistent(i)
+        log.debug(s"resent ${i} after journaling failure")
+    }
+  }
+}
+
+class FailureSpec extends AkkaSpec(FailureSpec.config) with Cleanup with ImplicitSender {
+  import FailureSpec._
+
+  "The journaling protocol (= conversation between a processor and a journal)" must {
+    "tolerate and recover from random failures" in {
+      system.actorOf(Props(classOf[ChaosProcessorApp], testActor)) ! Start
+      expectMsgPF(numMessages seconds) { case Done(ints) ⇒ ints.sorted must be(1 to numMessages toVector) }
+
+      system.actorOf(Props(classOf[ChaosProcessorApp], testActor)) // recovery of new instance must have same outcome
+      expectMsgPF(numMessages seconds) { case Done(ints) ⇒ ints.sorted must be(1 to numMessages toVector) }
+    }
+  }
+}
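A quick plausibility check on the rates configured above, as a minimal sketch in plain Scala (illustration only, not part of the patch): a Persistent(i) gets through end to end only if both the journal write and live processing succeed, so roughly every second send should succeed and each message should need about two sends on average.

// Rough expectation, assuming independent failures and ignoring
// replay failures and the duplicate check (illustration only):
object ChaosRates extends App {
  val writeFailureRate = 0.3          // journal.chaos.write-failure-rate
  val liveProcessingFailureRate = 0.3 // processor.chaos.live-processing-failure-rate

  // a send succeeds only if the write and the subsequent processing both succeed
  val pSuccess = (1 - writeFailureRate) * (1 - liveProcessingFailureRate)

  // sends until first success are geometrically distributed with mean 1 / pSuccess
  println(f"success per attempt: $pSuccess%.2f, expected sends per message: ${1 / pSuccess}%.2f")
}

With the rates above this prints a per-attempt success probability of 0.49 and roughly 2 expected sends per message, which is why the spec can complete within numMessages seconds.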
diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala
index c23ad9f920..c3b356f0e4 100644
--- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala
@@ -9,7 +9,7 @@ import akka.actor._
 import akka.testkit._
 
 object PerformanceSpec {
-  // multiply cycles with 100 for more
+  // multiply cycles with 200 for more
   // accurate throughput measurements
   val config =
     """
@@ -33,15 +33,13 @@ object PerformanceSpec {
     var failAt: Long = -1
 
     val controlBehavior: Receive = {
-      case StartMeasure ⇒ {
+      case StartMeasure ⇒
         startSequenceNr = lastSequenceNr
         startTime = System.nanoTime
-      }
-      case StopMeasure ⇒ {
+      case StopMeasure ⇒
        stopSequenceNr = lastSequenceNr
        stopTime = System.nanoTime
        sender ! (NanoToSecond * (stopSequenceNr - startSequenceNr) / (stopTime - startTime))
-      }
       case FailAt(sequenceNr) ⇒ failAt = sequenceNr
     }
 
@@ -53,10 +51,9 @@ object PerformanceSpec {
 
   class CommandsourcedTestProcessor(name: String) extends PerformanceTestProcessor(name) {
     def receive = controlBehavior orElse {
-      case p: Persistent ⇒ {
+      case p: Persistent ⇒
         if (lastSequenceNr % 1000 == 0) if (recoveryRunning) print("r") else print(".")
         if (lastSequenceNr == failAt) throw new TestException("boom")
-      }
     }
   }
 
@@ -88,10 +85,9 @@ object PerformanceSpec {
     })
 
     val processC: Receive = printProgress andThen {
-      case "c" ⇒ {
+      case "c" ⇒
         persist("c")(_ ⇒ context.unbecome())
         unstashAll()
-      }
       case other ⇒ stash()
     }
   }
@@ -100,13 +96,13 @@ object PerformanceSpec {
 
 class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "performance").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with PersistenceSpec with ImplicitSender {
   import PerformanceSpec._
 
-  val warmupClycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup")
+  val warmupCycles = system.settings.config.getInt("akka.persistence.performance.cycles.warmup")
   val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load")
 
   def stressCommandsourcedProcessor(failAt: Option[Long]): Unit = {
     val processor = namedProcessor[CommandsourcedTestProcessor]
     failAt foreach { processor ! FailAt(_) }
-    1 to warmupClycles foreach { i ⇒ processor ! Persistent(s"msg${i}") }
+    1 to warmupCycles foreach { i ⇒ processor ! Persistent(s"msg${i}") }
     processor ! StartMeasure
     1 to loadCycles foreach { i ⇒ processor ! Persistent(s"msg${i}") }
     processor ! StopMeasure
@@ -118,7 +114,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "perfor
   def stressEventsourcedProcessor(failAt: Option[Long]): Unit = {
     val processor = namedProcessor[EventsourcedTestProcessor]
     failAt foreach { processor ! FailAt(_) }
-    1 to warmupClycles foreach { i ⇒ processor ! s"msg${i}" }
+    1 to warmupCycles foreach { i ⇒ processor ! s"msg${i}" }
     processor ! StartMeasure
     1 to loadCycles foreach { i ⇒ processor ! s"msg${i}" }
     processor ! StopMeasure
@@ -129,7 +125,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "perfor
 
   def stressStashingEventsourcedProcessor(): Unit = {
     val processor = namedProcessor[StashingEventsourcedTestProcessor]
-    1 to warmupClycles foreach { i ⇒ processor ! "b" }
+    1 to warmupCycles foreach { i ⇒ processor ! "b" }
     processor ! StartMeasure
     val cmds = 1 to (loadCycles / 3) flatMap (_ ⇒ List("a", "b", "c"))
     processor ! StartMeasure
@@ -145,7 +141,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "perfor
       stressCommandsourcedProcessor(None)
     }
     "have some reasonable throughput under failure conditions" in {
-      stressCommandsourcedProcessor(Some(warmupClycles + loadCycles / 10))
+      stressCommandsourcedProcessor(Some(warmupCycles + loadCycles / 10))
     }
   }
 
@@ -154,7 +150,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "perfor
       stressEventsourcedProcessor(None)
     }
     "have some reasonable throughput under failure conditions" in {
-      stressEventsourcedProcessor(Some(warmupClycles + loadCycles / 10))
+      stressEventsourcedProcessor(Some(warmupCycles + loadCycles / 10))
     }
     "have some reasonable throughput with stashing and unstashing every 3rd command" in {
       stressStashingEventsourcedProcessor()
diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala
index 90138ce8cb..9e8671e7e9 100644
--- a/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/PersistenceSpec.scala
@@ -18,10 +18,10 @@ import org.scalatest.BeforeAndAfterEach
 import akka.actor.Props
 import akka.testkit.AkkaSpec
 
-trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒
+trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒
   private var _name: String = _
 
-  val extension = Persistence(system)
+  lazy val extension = Persistence(system)
   val counter = new AtomicInteger(0)
 
   /**
@@ -43,12 +43,6 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒
   override protected def beforeEach() {
     _name = namePrefix + counter.incrementAndGet()
   }
-
-  override protected def afterTermination() {
-    List("akka.persistence.journal.leveldb.dir", "akka.persistence.snapshot-store.local.dir") foreach { s ⇒
-      FileUtils.deleteDirectory(new File(system.settings.config.getString(s)))
-    }
-  }
 }
 
 object PersistenceSpec {
@@ -63,6 +57,20 @@ object PersistenceSpec {
   """)
 }
 
+trait Cleanup { this: AkkaSpec ⇒
+  val storageLocations = List(
+    "akka.persistence.journal.leveldb.dir",
+    "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
+
+  override protected def atStartup() {
+    storageLocations.foreach(FileUtils.deleteDirectory)
+  }
+
+  override protected def afterTermination() {
+    storageLocations.foreach(FileUtils.deleteDirectory)
+  }
+}
+
 abstract class NamedProcessor(name: String) extends Processor {
   override def processorId: String = name
 }
diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala
index 525476e078..1e1baa2d02 100644
--- a/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorSpec.scala
@@ -47,17 +47,15 @@ object ProcessorSpec {
 
   class BehaviorChangeTestProcessor(name: String) extends NamedProcessor(name) {
     val acceptA: Actor.Receive = {
-      case Persistent("a", _) ⇒ {
+      case Persistent("a", _) ⇒
         sender ! "a"
         context.become(acceptB)
-      }
     }
 
     val acceptB: Actor.Receive = {
-      case Persistent("b", _) ⇒ {
+      case Persistent("b", _) ⇒
         sender ! "b"
         context.become(acceptA)
-      }
     }
 
     def receive = acceptA
@@ -67,15 +65,13 @@ object ProcessorSpec {
     startWith("closed", 0)
 
     when("closed") {
-      case Event(Persistent("a", _), counter) ⇒ {
+      case Event(Persistent("a", _), counter) ⇒
         goto("open") using (counter + 1) replying (counter)
-      }
     }
 
     when("open") {
-      case Event(Persistent("b", _), counter) ⇒ {
+      case Event(Persistent("b", _), counter) ⇒
         goto("closed") using (counter + 1) replying (counter)
-      }
     }
   }
diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala
index 6f6976466e..def5001a7e 100644
--- a/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorStashSpec.scala
@@ -14,18 +14,27 @@ object ProcessorStashSpec {
     var state: List[String] = Nil
 
     val behaviorA: Actor.Receive = {
-      case Persistent("a", snr) ⇒ { update("a", snr); context.become(behaviorB) }
-      case Persistent("b", snr) ⇒ update("b", snr)
-      case Persistent("c", snr) ⇒ { update("c", snr); unstashAll() }
-      case "x" ⇒ update("x")
+      case Persistent("a", snr) ⇒
+        update("a", snr)
+        context.become(behaviorB)
+      case Persistent("b", snr) ⇒
+        update("b", snr)
+      case Persistent("c", snr) ⇒
+        update("c", snr)
+        unstashAll()
+      case "x" ⇒
+        update("x")
       case "boom" ⇒ throw new TestException("boom")
       case Persistent("boom", _) ⇒ throw new TestException("boom")
       case GetState ⇒ sender ! state.reverse
     }
 
     val behaviorB: Actor.Receive = {
-      case Persistent("b", _) ⇒ { stash(); context.become(behaviorA) }
-      case "x" ⇒ stash()
+      case Persistent("b", _) ⇒
+        stash()
+        context.become(behaviorA)
+      case "x" ⇒
+        stash()
     }
 
     def receive = behaviorA
diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala
index 918b9b0f79..a6d1c0d14c 100644
--- a/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala
+++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotSpec.scala
@@ -69,10 +69,9 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor ! Recover()
 
       expectMsgPF() {
-        case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ {
+        case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒
           state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
           timestamp must be > (0L)
-        }
       }
       expectMsg("e-5")
       expectMsg("f-6")
@@ -84,10 +83,9 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor ! Recover(toSequenceNr = 3)
 
       expectMsgPF() {
-        case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ {
+        case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒
           state must be(List("a-1", "b-2").reverse)
           timestamp must be > (0L)
-        }
       }
       expectMsg("c-3")
     }
@@ -99,10 +97,9 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor ! "done"
 
       expectMsgPF() {
-        case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒ {
+        case (SnapshotMetadata(`processorId`, 4, timestamp), state) ⇒
           state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
           timestamp must be > (0L)
-        }
       }
       expectMsg("done")
     }
@@ -113,10 +110,9 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2))
 
      expectMsgPF() {
-        case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ {
+        case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒
          state must be(List("a-1", "b-2").reverse)
          timestamp must be > (0L)
-        }
      }
      expectMsg("c-3")
      expectMsg("d-4")
@@ -130,10 +126,9 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor ! Recover(fromSnapshot = SnapshotSelectionCriteria(maxSequenceNr = 2), toSequenceNr = 3)
 
       expectMsgPF() {
-        case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒ {
+        case (SnapshotMetadata(`processorId`, 2, timestamp), state) ⇒
           state must be(List("a-1", "b-2").reverse)
           timestamp must be > (0L)
-        }
       }
       expectMsg("c-3")
     }
@@ -159,10 +154,9 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor1 ! "done"
 
       val metadata = expectMsgPF() {
-        case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ {
+        case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒
           state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
           md
-        }
       }
       expectMsg("done")
 
@@ -174,10 +168,9 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor2 ! Recover(toSequenceNr = 4)
 
       expectMsgPF() {
-        case (md @ SnapshotMetadata(`processorId`, 2, _), state) ⇒ {
+        case (md @ SnapshotMetadata(`processorId`, 2, _), state) ⇒
           state must be(List("a-1", "b-2").reverse)
           md
-        }
       }
       expectMsg("c-3")
       expectMsg("d-4")
@@ -194,9 +187,8 @@ class SnapshotSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "snapshot"
       processor1 ! Recover(toSequenceNr = 4)
       processor1 ! DeleteN(SnapshotSelectionCriteria(maxSequenceNr = 4))
       expectMsgPF() {
-        case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒ {
+        case (md @ SnapshotMetadata(`processorId`, 4, _), state) ⇒
           state must be(List("a-1", "b-2", "c-3", "d-4").reverse)
-        }
       }
       deleteProbe.expectMsgType[DeleteSnapshots]
diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala
new file mode 100644
index 0000000000..82c6e81572
--- /dev/null
+++ b/akka-persistence/src/test/scala/akka/persistence/journal/chaos/ChaosJournal.scala
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2009-2013 Typesafe Inc.
+ */
+
+package akka.persistence.journal.chaos
+
+import scala.collection.immutable.Seq
+import scala.concurrent.Future
+import scala.concurrent.forkjoin.ThreadLocalRandom
+
+import akka.persistence._
+import akka.persistence.journal.SyncWriteJournal
+import akka.persistence.journal.inmem.InmemMessages
+
+class WriteFailedException(ps: Seq[PersistentRepr])
+  extends TestException(s"write failed for payloads = [${ps.map(_.payload)}]")
+
+class ReplayFailedException(ps: Seq[PersistentRepr])
+  extends TestException(s"replay failed after payloads = [${ps.map(_.payload)}]")
+
+class DeleteFailedException(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)
+  extends TestException(s"delete failed for processor id = [${processorId}], from sequence number = [${fromSequenceNr}], to sequence number = [${toSequenceNr}]")
+
+/**
+ * Keep [[ChaosJournal]] state in an external singleton so that it survives journal restarts.
+ * The journal itself uses a dedicated dispatcher, so there won't be any visibility issues.
+ */
+private object ChaosJournalMessages extends InmemMessages
+
+class ChaosJournal extends SyncWriteJournal {
+  import ChaosJournalMessages.{ delete ⇒ del, _ }
+
+  val config = context.system.settings.config.getConfig("akka.persistence.journal.chaos")
+  val writeFailureRate = config.getDouble("write-failure-rate")
+  val deleteFailureRate = config.getDouble("delete-failure-rate")
+  val replayFailureRate = config.getDouble("replay-failure-rate")
+
+  def random = ThreadLocalRandom.current
+
+  def write(persistentBatch: Seq[PersistentRepr]): Unit =
+    if (shouldFail(writeFailureRate)) throw new WriteFailedException(persistentBatch)
+    else persistentBatch.foreach(add)
+
+  def delete(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Unit =
+    if (shouldFail(deleteFailureRate)) throw new DeleteFailedException(processorId, fromSequenceNr, toSequenceNr)
+    else fromSequenceNr to toSequenceNr foreach { snr ⇒ if (permanent) del(processorId, snr) else update(processorId, snr)(_.update(deleted = true)) }
+
+  def confirm(processorId: String, sequenceNr: Long, channelId: String): Unit =
+    update(processorId, sequenceNr)(p ⇒ p.update(confirms = channelId +: p.confirms))
+
+  def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Long] =
+    if (shouldFail(replayFailureRate)) {
+      val rm = read(processorId, fromSequenceNr, toSequenceNr)
+      val sm = rm.take(random.nextInt(rm.length + 1))
+      sm.foreach(replayCallback)
+      Future.failed(new ReplayFailedException(sm))
+    } else {
+      read(processorId, fromSequenceNr, toSequenceNr).foreach(replayCallback)
+      Future.successful(maxSequenceNr(processorId))
+    }
+
+  def shouldFail(rate: Double): Boolean =
+    random.nextDouble() < rate
+}
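The same shouldFail(rate) guard appears in both ChaosProcessor and ChaosJournal. If further chaos components were added, that pattern could be factored out; a minimal sketch under that assumption (hypothetical helper, not part of this patch):

import scala.concurrent.forkjoin.ThreadLocalRandom

// Hypothetical helper, not part of this patch: fail with the given
// probability before running the body, mirroring the shouldFail(rate)
// checks in ChaosProcessor and ChaosJournal.
object Chaos {
  def withFailureRate[A](rate: Double)(failure: ⇒ Throwable)(body: ⇒ A): A =
    if (ThreadLocalRandom.current.nextDouble() < rate) throw failure
    else body
}

// e.g. ChaosJournal.write could then be expressed as:
//   Chaos.withFailureRate(writeFailureRate)(new WriteFailedException(persistentBatch)) {
//     persistentBatch.foreach(add)
//   }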
diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala
index 89a869b276..04a830922e 100644
--- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala
+++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala
@@ -16,13 +16,12 @@ object ConversationRecoveryExample extends App {
     var counter = 0
 
     def receive = {
-      case m @ ConfirmablePersistent(Ping, _) ⇒ {
+      case m @ ConfirmablePersistent(Ping, _) ⇒
         counter += 1
         println(s"received ping ${counter} times ...")
         m.confirm()
         if (!recoveryRunning) Thread.sleep(1000)
         pongChannel ! Deliver(m.withPayload(Pong), sender, Resolve.Destination)
-      }
       case "init" ⇒ if (counter == 0) pongChannel ! Deliver(Persistent(Pong), sender)
     }
 
@@ -34,13 +33,12 @@ object ConversationRecoveryExample extends App {
     var counter = 0
 
     def receive = {
-      case m @ ConfirmablePersistent(Pong, _) ⇒ {
+      case m @ ConfirmablePersistent(Pong, _) ⇒
         counter += 1
         println(s"received pong ${counter} times ...")
         m.confirm()
         if (!recoveryRunning) Thread.sleep(1000)
         pingChannel ! Deliver(m.withPayload(Ping), sender, Resolve.Destination)
-      }
     }
 
     override def preStart() = ()
diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala
index d357a26a66..c4ba215241 100644
--- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala
+++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala
@@ -32,26 +32,24 @@ class ExampleProcessor extends EventsourcedProcessor {
   }
 
   val receiveCommand: Receive = {
-    case Cmd(data) ⇒ {
+    case Cmd(data) ⇒
       persist(Evt(s"${data}-${numEvents}"))(updateState)
       persist(Evt(s"${data}-${numEvents + 1}")) { event ⇒
         updateState(event)
         context.system.eventStream.publish(event)
         if (data == "foo") context.become(otherCommandHandler)
       }
-    }
     case "snap" ⇒ saveSnapshot(state)
     case "print" ⇒ println(state)
   }
 
   val otherCommandHandler: Receive = {
-    case Cmd("bar") ⇒ {
+    case Cmd("bar") ⇒
       persist(Evt(s"bar-${numEvents}")) { event ⇒
         updateState(event)
         context.unbecome()
       }
       unstashAll()
-    }
     case other ⇒ stash()
   }
 }
diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala
index cdac4e3716..0735334ed8 100644
--- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala
+++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala
@@ -16,20 +16,18 @@ object ProcessorChannelExample extends App {
     var received: List[Persistent] = Nil
 
     def receive = {
-      case p @ Persistent(payload, _) ⇒ {
+      case p @ Persistent(payload, _) ⇒
         println(s"processed ${payload}")
         channel forward Deliver(p.withPayload(s"processed ${payload}"), destination)
-      }
     }
   }
 
   class ExampleDestination extends Actor {
     def receive = {
-      case p @ ConfirmablePersistent(payload, snr) ⇒ {
+      case p @ ConfirmablePersistent(payload, snr) ⇒
         println(s"received ${payload}")
         sender ! s"re: ${payload} (${snr})"
         p.confirm()
-      }
     }
   }
diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala
index 17f02e1095..e38d05fd77 100644
--- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala
+++ b/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala
@@ -20,9 +20,11 @@ object SnapshotExample extends App {
       case Persistent(s, snr) ⇒ state = state.update(s"${s}-${snr}")
       case SaveSnapshotSuccess(metadata) ⇒ // ...
       case SaveSnapshotFailure(metadata, reason) ⇒ // ...
-      case SnapshotOffer(_, s: ExampleState) ⇒ { println("offered state = " + s); state = s }
-      case "print" ⇒ println("current state = " + state)
-      case "snap" ⇒ saveSnapshot(state)
+      case SnapshotOffer(_, s: ExampleState) ⇒
+        println("offered state = " + s)
+        state = s
+      case "print" ⇒ println("current state = " + state)
+      case "snap" ⇒ saveSnapshot(state)
     }
   }
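The SnapshotExample hunk above uses an ExampleState whose definition lies outside this diff. A minimal stand-in that fits the calls shown (state.update(...) returning a new value, state passed to saveSnapshot) might look as follows; this is a sketch, not the sample's actual code:

// Hypothetical stand-in for ExampleState (the real definition is not part of
// this diff): immutable, so the value handed to saveSnapshot cannot be mutated
// by messages processed while the snapshot is being written.
case class ExampleState(received: List[String] = Nil) {
  def update(s: String): ExampleState = copy(received = s :: received)
  override def toString = received.reverse.toString
}

Immutability is also what makes the SnapshotOffer branch safe: assigning the offered value to state replaces the whole state in one step, with no partial updates visible during recovery.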