!per #17832 Remove SyncWriteJournal
This commit is contained in:
parent
17760c020c
commit
4638f5630e
20 changed files with 218 additions and 357 deletions
|
|
@ -34,8 +34,8 @@ interface AsyncRecoveryPlugin {
|
|||
* @param replayCallback
|
||||
* called to replay a single message. Can be called from any thread.
|
||||
*/
|
||||
Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
|
||||
Consumer<PersistentRepr> replayCallback);
|
||||
Future<Void> doAsyncReplayMessages(String persistenceId, long fromSequenceNr,
|
||||
long toSequenceNr, long max, Consumer<PersistentRepr> replayCallback);
|
||||
|
||||
/**
|
||||
* Java API, Plugin API: asynchronously reads the highest stored sequence
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ interface AsyncWritePlugin {
|
|||
* returned `Future` must be completed with failure. The `Future` must only be
|
||||
* completed with success when all messages in the batch have been confirmed
|
||||
* to be stored successfully, i.e. they will be readable, and visible, in a
|
||||
* subsequent replay. If there are uncertainty about if the messages were
|
||||
* subsequent replay. If there is uncertainty about if the messages were
|
||||
* stored or not the `Future` must be completed with failure.
|
||||
*
|
||||
* Data store connection problems must be signaled by completing the `Future`
|
||||
|
|
@ -55,6 +55,9 @@ interface AsyncWritePlugin {
|
|||
* serialization error.
|
||||
*
|
||||
* Data store connection problems must not be signaled as rejections.
|
||||
*
|
||||
* Note that it is possible to reduce number of allocations by caching some
|
||||
* result `Iterable` for the happy path, i.e. when no messages are rejected.
|
||||
*/
|
||||
Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(Iterable<AtomicWrite> messages);
|
||||
|
||||
|
|
|
|||
|
|
@ -1,67 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.journal.japi;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import akka.persistence.*;
|
||||
import scala.concurrent.Future;
|
||||
|
||||
interface SyncWritePlugin {
|
||||
//#sync-write-plugin-api
|
||||
/**
|
||||
* Java API, Plugin API: asynchronously writes a batch (`Iterable`) of
|
||||
* persistent messages to the journal.
|
||||
*
|
||||
* The batch is only for performance reasons, i.e. all messages don't have to
|
||||
* be written atomically. Higher throughput can typically be achieved by using
|
||||
* batch inserts of many records compared inserting records one-by-one, but
|
||||
* this aspect depends on the underlying data store and a journal
|
||||
* implementation can implement it as efficient as possible with the
|
||||
* assumption that the messages of the batch are unrelated.
|
||||
*
|
||||
* Each `AtomicWrite` message contains the single `PersistentRepr` that
|
||||
* corresponds to the event that was passed to the `persist` method of the
|
||||
* `PersistentActor`, or it contains several `PersistentRepr` that corresponds
|
||||
* to the events that were passed to the `persistAll` method of the
|
||||
* `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be
|
||||
* written to the data store atomically, i.e. all or none must be stored. If
|
||||
* the journal (data store) cannot support atomic writes of multiple events it
|
||||
* should reject such writes with an `Optional` with an
|
||||
* `UnsupportedOperationException` describing the issue. This limitation
|
||||
* should also be documented by the journal plugin.
|
||||
*
|
||||
* If there are failures when storing any of the messages in the batch the
|
||||
* method must throw an exception. The method must only return normally when
|
||||
* all messages in the batch have been confirmed to be stored successfully,
|
||||
* i.e. they will be readable, and visible, in a subsequent replay. If there
|
||||
* are uncertainty about if the messages were stored or not the method must
|
||||
* throw an exception.
|
||||
*
|
||||
* Data store connection problems must be signaled by throwing an exception.
|
||||
*
|
||||
* The journal can also signal that it rejects individual messages
|
||||
* (`AtomicWrite`) by the returned
|
||||
* `Iterable<Optional<Exception>>`. The returned `Iterable` must
|
||||
* have as many elements as the input `messages` `Iterable`. Each `Optional`
|
||||
* element signals if the corresponding `AtomicWrite` is rejected or not, with
|
||||
* an exception describing the problem. Rejecting a message means it was not
|
||||
* stored, i.e. it must not be included in a later replay. Rejecting a message
|
||||
* is typically done before attempting to store it, e.g. because of
|
||||
* serialization error.
|
||||
*
|
||||
* Data store connection problems must not be signaled as rejections.
|
||||
*/
|
||||
Iterable<Optional<Exception>> doWriteMessages(Iterable<AtomicWrite> messages);
|
||||
|
||||
/**
|
||||
* Java API, Plugin API: synchronously deletes all persistent messages up to
|
||||
* `toSequenceNr`.
|
||||
*
|
||||
* @see AsyncRecoveryPlugin
|
||||
*/
|
||||
void doDeleteMessagesTo(String persistenceId, long toSequenceNr);
|
||||
//#sync-write-plugin-api
|
||||
}
|
||||
|
|
@ -42,7 +42,7 @@ private[persistence] object JournalProtocol {
|
|||
* @param persistentActor write requestor.
|
||||
*/
|
||||
final case class WriteMessages(messages: immutable.Seq[PersistentEnvelope], persistentActor: ActorRef, actorInstanceId: Int)
|
||||
extends Request
|
||||
extends Request with NoSerializationVerificationNeeded
|
||||
|
||||
/**
|
||||
* Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor
|
||||
|
|
@ -78,7 +78,7 @@ private[persistence] object JournalProtocol {
|
|||
* @param cause failure cause.
|
||||
*/
|
||||
final case class WriteMessageRejected(message: PersistentRepr, cause: Throwable, actorInstanceId: Int)
|
||||
extends Response
|
||||
extends Response with NoSerializationVerificationNeeded
|
||||
|
||||
/**
|
||||
* Reply message to a failed [[WriteMessages]] request. For each contained [[PersistentRepr]] message
|
||||
|
|
@ -88,7 +88,7 @@ private[persistence] object JournalProtocol {
|
|||
* @param cause failure cause.
|
||||
*/
|
||||
final case class WriteMessageFailure(message: PersistentRepr, cause: Throwable, actorInstanceId: Int)
|
||||
extends Response
|
||||
extends Response with NoSerializationVerificationNeeded
|
||||
|
||||
/**
|
||||
* Reply message to a [[WriteMessages]] with a non-persistent message.
|
||||
|
|
@ -96,7 +96,7 @@ private[persistence] object JournalProtocol {
|
|||
* @param message looped message.
|
||||
*/
|
||||
final case class LoopMessageSuccess(message: Any, actorInstanceId: Int)
|
||||
extends Response
|
||||
extends Response with NoSerializationVerificationNeeded
|
||||
|
||||
/**
|
||||
* Request to replay messages to `persistentActor`.
|
||||
|
|
@ -117,7 +117,7 @@ private[persistence] object JournalProtocol {
|
|||
* @param persistent replayed message.
|
||||
*/
|
||||
final case class ReplayedMessage(persistent: PersistentRepr)
|
||||
extends Response with DeadLetterSuppression
|
||||
extends Response with DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
|
||||
/**
|
||||
* Reply message to a successful [[ReplayMessages]] request. This reply is sent to the requestor
|
||||
|
|
|
|||
|
|
@ -7,10 +7,10 @@ package akka.persistence
|
|||
import scala.collection.immutable
|
||||
import java.lang.{ Iterable ⇒ JIterable }
|
||||
import java.util.{ List ⇒ JList }
|
||||
|
||||
import akka.actor.{ ActorContext, ActorRef }
|
||||
import akka.pattern.PromiseActorRef
|
||||
import akka.persistence.serialization.Message
|
||||
import akka.actor.NoSerializationVerificationNeeded
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -45,7 +45,6 @@ final case class AtomicWrite(payload: immutable.Seq[PersistentRepr]) extends Per
|
|||
/**
|
||||
* Plugin API: representation of a persistent message in the journal plugin API.
|
||||
*
|
||||
* @see [[akka.persistence.journal.SyncWriteJournal]]
|
||||
* @see [[akka.persistence.journal.AsyncWriteJournal]]
|
||||
* @see [[akka.persistence.journal.AsyncRecovery]]
|
||||
*/
|
||||
|
|
@ -152,7 +151,7 @@ private[persistence] final case class PersistentImpl(
|
|||
override val manifest: String,
|
||||
override val deleted: Boolean,
|
||||
override val sender: ActorRef,
|
||||
override val writerUuid: String) extends PersistentRepr {
|
||||
override val writerUuid: String) extends PersistentRepr with NoSerializationVerificationNeeded {
|
||||
|
||||
def withPayload(payload: Any): PersistentRepr =
|
||||
copy(payload = payload)
|
||||
|
|
|
|||
|
|
@ -32,9 +32,9 @@ trait AsyncRecovery {
|
|||
* thread.
|
||||
*
|
||||
* @see [[AsyncWriteJournal]]
|
||||
* @see [[SyncWriteJournal]]
|
||||
*/
|
||||
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit]
|
||||
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long,
|
||||
max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit]
|
||||
|
||||
/**
|
||||
* Plugin API: asynchronously reads the highest stored sequence number for the
|
||||
|
|
|
|||
|
|
@ -126,40 +126,47 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
|||
|
||||
//#journal-plugin-api
|
||||
/**
|
||||
* Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the journal.
|
||||
* Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the
|
||||
* journal.
|
||||
*
|
||||
* The batch is only for performance reasons, i.e. all messages don't have to be written
|
||||
* atomically. Higher throughput can typically be achieved by using batch inserts of many
|
||||
* records compared inserting records one-by-one, but this aspect depends on the underlying
|
||||
* data store and a journal implementation can implement it as efficient as possible with
|
||||
* the assumption that the messages of the batch are unrelated.
|
||||
* records compared inserting records one-by-one, but this aspect depends on the
|
||||
* underlying data store and a journal implementation can implement it as efficient as
|
||||
* possible with the assumption that the messages of the batch are unrelated.
|
||||
*
|
||||
* Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to the
|
||||
* event that was passed to the `persist` method of the `PersistentActor`, or it contains
|
||||
* several `PersistentRepr` that corresponds to the events that were passed to the `persistAll`
|
||||
* method of the `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be
|
||||
* written to the data store atomically, i.e. all or none must be stored.
|
||||
* If the journal (data store) cannot support atomic writes of multiple events it should
|
||||
* reject such writes with a `Try` `Failure` with an `UnsupportedOperationException`
|
||||
* describing the issue. This limitation should also be documented by the journal plugin.
|
||||
* Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to
|
||||
* the event that was passed to the `persist` method of the `PersistentActor`, or it
|
||||
* contains several `PersistentRepr` that corresponds to the events that were passed
|
||||
* to the `persistAll` method of the `PersistentActor`. All `PersistentRepr` of the
|
||||
* `AtomicWrite` must be written to the data store atomically, i.e. all or none must
|
||||
* be stored. If the journal (data store) cannot support atomic writes of multiple
|
||||
* events it should reject such writes with a `Try` `Failure` with an
|
||||
* `UnsupportedOperationException` describing the issue. This limitation should
|
||||
* also be documented by the journal plugin.
|
||||
*
|
||||
* If there are failures when storing any of the messages in the batch the returned
|
||||
* `Future` must be completed with failure. The `Future` must only be completed with
|
||||
* success when all messages in the batch have been confirmed to be stored successfully,
|
||||
* i.e. they will be readable, and visible, in a subsequent replay. If there are uncertainty
|
||||
* about if the messages were stored or not the `Future` must be completed with failure.
|
||||
* i.e. they will be readable, and visible, in a subsequent replay. If there is
|
||||
* uncertainty about if the messages were stored or not the `Future` must be completed
|
||||
* with failure.
|
||||
*
|
||||
* Data store connection problems must be signaled by completing the `Future` with
|
||||
* failure.
|
||||
*
|
||||
* The journal can also signal that it rejects individual messages (`AtomicWrite`) by
|
||||
* the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements
|
||||
* as the input `messages` `Seq`. Each `Try` element signals if the corresponding `AtomicWrite`
|
||||
* is rejected or not, with an exception describing the problem. Rejecting a message means it
|
||||
* was not stored, i.e. it must not be included in a later replay. Rejecting a message is
|
||||
* typically done before attempting to store it, e.g. because of serialization error.
|
||||
* as the input `messages` `Seq`. Each `Try` element signals if the corresponding
|
||||
* `AtomicWrite` is rejected or not, with an exception describing the problem. Rejecting
|
||||
* a message means it was not stored, i.e. it must not be included in a later replay.
|
||||
* Rejecting a message is typically done before attempting to store it, e.g. because of
|
||||
* serialization error.
|
||||
*
|
||||
* Data store connection problems must not be signaled as rejections.
|
||||
*
|
||||
* Note that it is possible to reduce number of allocations by
|
||||
* caching some result `Seq` for the happy path, i.e. when no messages are rejected.
|
||||
*/
|
||||
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]]
|
||||
|
||||
|
|
@ -187,6 +194,7 @@ private[persistence] object AsyncWriteJournal {
|
|||
val successUnit: Success[Unit] = Success(())
|
||||
|
||||
final case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef)
|
||||
extends NoSerializationVerificationNeeded
|
||||
|
||||
class Resequencer extends Actor {
|
||||
import scala.collection.mutable.Map
|
||||
|
|
|
|||
|
|
@ -1,144 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2012-2013 Eligotech BV.
|
||||
*/
|
||||
|
||||
package akka.persistence.journal
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.util._
|
||||
import akka.actor.{ ActorLogging, Actor }
|
||||
import akka.pattern.pipe
|
||||
import akka.persistence._
|
||||
|
||||
object SyncWriteJournal {
|
||||
val successUnit: Success[Unit] = Success(())
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract journal, optimized for synchronous writes.
|
||||
*/
|
||||
trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery with ActorLogging {
|
||||
import JournalProtocol._
|
||||
import context.dispatcher
|
||||
|
||||
private val extension = Persistence(context.system)
|
||||
private val publish = extension.settings.internal.publishPluginCommands
|
||||
|
||||
final def receive = {
|
||||
case WriteMessages(messages, persistentActor, actorInstanceId) ⇒
|
||||
val writeResult = Try {
|
||||
val prepared = preparePersistentBatch(messages)
|
||||
val results = writeMessages(prepared)
|
||||
if (results.size != prepared.size)
|
||||
throw new IllegalStateException("writeMessages returned invalid number of results. " +
|
||||
s"Expected [${prepared.size}], but got [${results.size}]")
|
||||
results
|
||||
}
|
||||
writeResult match {
|
||||
case Success(results) ⇒
|
||||
persistentActor ! WriteMessagesSuccessful
|
||||
val resultsIter = results.iterator
|
||||
messages.foreach {
|
||||
case a: AtomicWrite ⇒
|
||||
resultsIter.next() match {
|
||||
case Success(_) ⇒
|
||||
a.payload.foreach { p ⇒
|
||||
persistentActor.tell(WriteMessageSuccess(p, actorInstanceId), p.sender)
|
||||
}
|
||||
case Failure(e) ⇒
|
||||
a.payload.foreach { p ⇒
|
||||
persistentActor.tell(WriteMessageRejected(p, e, actorInstanceId), p.sender)
|
||||
}
|
||||
}
|
||||
|
||||
case r: NonPersistentRepr ⇒
|
||||
persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender)
|
||||
}
|
||||
|
||||
case Failure(e) ⇒
|
||||
persistentActor ! WriteMessagesFailed(e)
|
||||
messages.foreach {
|
||||
case a: AtomicWrite ⇒
|
||||
a.payload.foreach { p ⇒
|
||||
persistentActor.tell(WriteMessageFailure(p, e, actorInstanceId), p.sender)
|
||||
}
|
||||
case r: NonPersistentRepr ⇒
|
||||
persistentActor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender)
|
||||
}
|
||||
throw e
|
||||
}
|
||||
|
||||
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒
|
||||
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒
|
||||
if (!p.deleted) // old records from 2.3 may still have the deleted flag
|
||||
adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒
|
||||
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), adaptedPersistentRepr.sender)
|
||||
}
|
||||
} map {
|
||||
case _ ⇒ ReplayMessagesSuccess
|
||||
} recover {
|
||||
case e ⇒ ReplayMessagesFailure(e)
|
||||
} pipeTo persistentActor onSuccess {
|
||||
case _ if publish ⇒ context.system.eventStream.publish(r)
|
||||
}
|
||||
|
||||
case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor) ⇒
|
||||
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map {
|
||||
highest ⇒ ReadHighestSequenceNrSuccess(highest)
|
||||
} recover {
|
||||
case e ⇒ ReadHighestSequenceNrFailure(e)
|
||||
} pipeTo persistentActor
|
||||
|
||||
case d @ DeleteMessagesTo(persistenceId, toSequenceNr) ⇒
|
||||
Try(deleteMessagesTo(persistenceId, toSequenceNr)) match {
|
||||
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
|
||||
case Failure(e) ⇒
|
||||
}
|
||||
}
|
||||
|
||||
//#journal-plugin-api
|
||||
/**
|
||||
* * Plugin API: asynchronously writes a batch (`Seq`) of persistent messages to the journal.
|
||||
*
|
||||
* The batch is only for performance reasons, i.e. all messages don't have to be written
|
||||
* atomically. Higher throughput can typically be achieved by using batch inserts of many
|
||||
* records compared inserting records one-by-one, but this aspect depends on the underlying
|
||||
* data store and a journal implementation can implement it as efficient as possible with
|
||||
* the assumption that the messages of the batch are unrelated.
|
||||
*
|
||||
* Each `AtomicWrite` message contains the single `PersistentRepr` that corresponds to the
|
||||
* event that was passed to the `persist` method of the `PersistentActor`, or it contains
|
||||
* several `PersistentRepr` that corresponds to the events that were passed to the `persistAll`
|
||||
* method of the `PersistentActor`. All `PersistentRepr` of the `AtomicWrite` must be
|
||||
* written to the data store atomically, i.e. all or none must be stored.
|
||||
* If the journal (data store) cannot support atomic writes of multiple events it should
|
||||
* reject such writes with a `Try` `Failure` with an `UnsupportedOperationException`
|
||||
* describing the issue. This limitation should also be documented by the journal plugin.
|
||||
*
|
||||
* If there are failures when storing any of the messages in the batch the method must
|
||||
* throw an exception. The method must only return normally when all messages in the
|
||||
* batch have been confirmed to be stored successfully, i.e. they will be readable,
|
||||
* and visible, in a subsequent replay. If there are uncertainty about if the
|
||||
* messages were stored or not the method must throw an exception.
|
||||
*
|
||||
* Data store connection problems must be signaled by throwing an exception.
|
||||
*
|
||||
* The journal can also signal that it rejects individual messages (`AtomicWrite`) by
|
||||
* the returned `immutable.Seq[Try[Unit]]`. The returned `Seq` must have as many elements
|
||||
* as the input `messages` `Seq`. Each `Try` element signals if the corresponding `AtomicWrite`
|
||||
* is rejected or not, with an exception describing the problem. Rejecting a message means it
|
||||
* was not stored, i.e. it must not be included in a later replay. Rejecting a message is
|
||||
* typically done before attempting to store it, e.g. because of serialization error.
|
||||
*
|
||||
* Data store connection problems must not be signaled as rejections.
|
||||
*/
|
||||
def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]]
|
||||
|
||||
/**
|
||||
* Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`
|
||||
* (inclusive).
|
||||
*/
|
||||
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit
|
||||
//#journal-plugin-api
|
||||
}
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.persistence.journal.japi
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.persistence._
|
||||
import akka.persistence.journal.{ SyncWriteJournal ⇒ SSyncWriteJournal }
|
||||
import scala.util.Try
|
||||
import scala.util.Failure
|
||||
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* Java API: abstract journal, optimized for synchronous writes.
|
||||
*/
|
||||
abstract class SyncWriteJournal extends AsyncRecovery with SSyncWriteJournal with SyncWritePlugin {
|
||||
import SSyncWriteJournal.successUnit
|
||||
|
||||
final def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] =
|
||||
doWriteMessages(messages.asJava).asScala.map { o ⇒
|
||||
if (o.isPresent) Failure(o.get)
|
||||
else successUnit
|
||||
}(collection.breakOut)
|
||||
|
||||
final def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit =
|
||||
doDeleteMessagesTo(persistenceId, toSequenceNr)
|
||||
}
|
||||
|
|
@ -17,7 +17,7 @@ import akka.util.Helpers.ConfigOps
|
|||
*
|
||||
* Journal backed by a local LevelDB store. For production use.
|
||||
*/
|
||||
private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with SyncWriteJournal with LeveldbStore
|
||||
private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
|
|
|
|||
|
|
@ -6,15 +6,16 @@
|
|||
package akka.persistence.journal.leveldb
|
||||
|
||||
import java.io.File
|
||||
|
||||
import akka.actor._
|
||||
import akka.persistence._
|
||||
import akka.persistence.journal.{ WriteJournalBase, AsyncWriteTarget }
|
||||
import akka.serialization.SerializationExtension
|
||||
import org.iq80.leveldb._
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.util._
|
||||
import scala.concurrent.Future
|
||||
import scala.util.control.NonFatal
|
||||
import akka.persistence.journal.AsyncWriteJournal
|
||||
|
||||
/**
|
||||
* INTERNAL API.
|
||||
|
|
@ -39,25 +40,32 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
|
|||
|
||||
import Key._
|
||||
|
||||
def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] =
|
||||
withBatch(batch ⇒ messages.map { a ⇒
|
||||
Try(a.payload.foreach(message ⇒ addToMessageBatch(message, batch)))
|
||||
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
|
||||
Future.fromTry(Try {
|
||||
withBatch(batch ⇒ messages.map { a ⇒
|
||||
Try(a.payload.foreach(message ⇒ addToMessageBatch(message, batch)))
|
||||
})
|
||||
})
|
||||
|
||||
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long) = withBatch { batch ⇒
|
||||
val nid = numericId(persistenceId)
|
||||
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
|
||||
try Future.successful {
|
||||
withBatch { batch ⇒
|
||||
val nid = numericId(persistenceId)
|
||||
|
||||
// seek to first existing message
|
||||
val fromSequenceNr = withIterator { iter ⇒
|
||||
val startKey = Key(nid, 1L, 0)
|
||||
iter.seek(keyToBytes(startKey))
|
||||
if (iter.hasNext) keyFromBytes(iter.peekNext().getKey).sequenceNr else Long.MaxValue
|
||||
}
|
||||
// seek to first existing message
|
||||
val fromSequenceNr = withIterator { iter ⇒
|
||||
val startKey = Key(nid, 1L, 0)
|
||||
iter.seek(keyToBytes(startKey))
|
||||
if (iter.hasNext) keyFromBytes(iter.peekNext().getKey).sequenceNr else Long.MaxValue
|
||||
}
|
||||
|
||||
fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒
|
||||
batch.delete(keyToBytes(Key(nid, sequenceNr, 0)))
|
||||
fromSequenceNr to toSequenceNr foreach { sequenceNr ⇒
|
||||
batch.delete(keyToBytes(Key(nid, sequenceNr, 0)))
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ Future.failed(e)
|
||||
}
|
||||
}
|
||||
|
||||
def leveldbSnapshot(): ReadOptions = leveldbReadOptions.snapshot(leveldb.getSnapshot)
|
||||
|
||||
|
|
@ -103,22 +111,3 @@ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A LevelDB store that can be shared by multiple actor systems. The shared store must be
|
||||
* set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The
|
||||
* shared LevelDB store is for testing only.
|
||||
*/
|
||||
class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore {
|
||||
import AsyncWriteTarget._
|
||||
|
||||
def receive = {
|
||||
case WriteMessages(msgs) ⇒ sender() ! writeMessages(preparePersistentBatch(msgs))
|
||||
case DeleteMessagesTo(pid, tsnr) ⇒ sender() ! deleteMessagesTo(pid, tsnr)
|
||||
case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒ sender() ! readHighestSequenceNr(numericId(pid))
|
||||
case ReplayMessages(pid, fromSnr, toSnr, max) ⇒
|
||||
Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match {
|
||||
case Success(max) ⇒ sender() ! ReplaySuccess
|
||||
case Failure(cause) ⇒ sender() ! ReplayFailure(cause)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.persistence.journal.leveldb
|
||||
|
||||
import akka.persistence.journal.AsyncWriteTarget
|
||||
import akka.pattern.pipe
|
||||
import scala.util.Try
|
||||
import scala.util.Success
|
||||
import scala.util.Failure
|
||||
import scala.util.control.NonFatal
|
||||
import akka.persistence.AtomicWrite
|
||||
import scala.concurrent.Future
|
||||
|
||||
/**
|
||||
* A LevelDB store that can be shared by multiple actor systems. The shared store must be
|
||||
* set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The
|
||||
* shared LevelDB store is for testing only.
|
||||
*/
|
||||
class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore {
|
||||
import AsyncWriteTarget._
|
||||
import context.dispatcher
|
||||
|
||||
def receive = {
|
||||
case WriteMessages(messages) ⇒
|
||||
val prepared = Try(preparePersistentBatch(messages))
|
||||
val writeResult = (prepared match {
|
||||
case Success(prep) ⇒
|
||||
// in case the asyncWriteMessages throws
|
||||
try asyncWriteMessages(prep) catch { case NonFatal(e) ⇒ Future.failed(e) }
|
||||
case f @ Failure(_) ⇒
|
||||
// exception from preparePersistentBatch => rejected
|
||||
Future.successful(messages.collect { case a: AtomicWrite ⇒ f })
|
||||
}).map { results ⇒
|
||||
if (results.size != prepared.get.size)
|
||||
throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " +
|
||||
s"Expected [${prepared.get.size}], but got [${results.size}]")
|
||||
results
|
||||
}
|
||||
|
||||
writeResult.pipeTo(sender())
|
||||
|
||||
case DeleteMessagesTo(pid, tsnr) ⇒
|
||||
asyncDeleteMessagesTo(pid, tsnr).pipeTo(sender())
|
||||
|
||||
case ReadHighestSequenceNr(pid, fromSequenceNr) ⇒
|
||||
asyncReadHighestSequenceNr(pid, fromSequenceNr).pipeTo(sender())
|
||||
|
||||
case ReplayMessages(pid, fromSnr, toSnr, max) ⇒
|
||||
Try(replayMessages(numericId(pid), fromSnr, toSnr, max)(p ⇒ adaptFromJournal(p).foreach { sender() ! _ })) match {
|
||||
case Success(max) ⇒ sender() ! ReplaySuccess
|
||||
case Failure(cause) ⇒ sender() ! ReplayFailure(cause)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -8,9 +8,10 @@ import scala.collection.immutable
|
|||
import scala.concurrent.Future
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import akka.persistence._
|
||||
import akka.persistence.journal.SyncWriteJournal
|
||||
import akka.persistence.journal.AsyncWriteJournal
|
||||
import akka.persistence.journal.inmem.InmemMessages
|
||||
import scala.util.Try
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
class WriteFailedException(ps: Seq[PersistentRepr])
|
||||
extends TestException(s"write failed for payloads = [${ps.map(_.payload)}]")
|
||||
|
|
@ -27,7 +28,7 @@ class ReadHighestFailedException
|
|||
*/
|
||||
private object ChaosJournalMessages extends InmemMessages
|
||||
|
||||
class ChaosJournal extends SyncWriteJournal {
|
||||
class ChaosJournal extends AsyncWriteJournal {
|
||||
import ChaosJournalMessages.{ delete ⇒ del, _ }
|
||||
|
||||
val config = context.system.settings.config.getConfig("akka.persistence.journal.chaos")
|
||||
|
|
@ -38,17 +39,25 @@ class ChaosJournal extends SyncWriteJournal {
|
|||
|
||||
def random = ThreadLocalRandom.current
|
||||
|
||||
def writeMessages(messages: immutable.Seq[AtomicWrite]): immutable.Seq[Try[Unit]] =
|
||||
if (shouldFail(writeFailureRate)) throw new WriteFailedException(messages.flatMap(_.payload))
|
||||
else
|
||||
for (a ← messages) yield {
|
||||
a.payload.foreach(add)
|
||||
SyncWriteJournal.successUnit
|
||||
}
|
||||
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
|
||||
try Future.successful {
|
||||
if (shouldFail(writeFailureRate)) throw new WriteFailedException(messages.flatMap(_.payload))
|
||||
else
|
||||
for (a ← messages) yield {
|
||||
a.payload.foreach(add)
|
||||
AsyncWriteJournal.successUnit
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ Future.failed(e)
|
||||
}
|
||||
|
||||
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): Unit = {
|
||||
(1L to toSequenceNr).foreach { snr ⇒
|
||||
del(persistenceId, snr)
|
||||
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
|
||||
try Future.successful {
|
||||
(1L to toSequenceNr).foreach { snr ⇒
|
||||
del(persistenceId, snr)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ Future.failed(e)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue