pekko/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala

124 lines
4.6 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.journal
import akka.actor._
import akka.pattern.pipe
import akka.persistence._
import scala.collection.immutable
import scala.concurrent.Future
import scala.util._
/**
* Abstract journal, optimized for asynchronous, non-blocking writes.
*/
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
import AsyncWriteJournal._
import JournalProtocol._
import context.dispatcher
private val extension = Persistence(context.system)
private val publish = extension.settings.internal.publishPluginCommands
private val resequencer = context.actorOf(Props[Resequencer]())
private var resequencerCounter = 1L
def receive = {
case WriteMessages(messages, persistentActor, actorInstanceId)
val cctr = resequencerCounter
def resequence(f: PersistentRepr Any) = messages.zipWithIndex.foreach {
case (p: PersistentRepr, i) resequencer ! Desequenced(f(p), cctr + i + 1, persistentActor, p.sender)
case (r, i) resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), cctr + i + 1, persistentActor, r.sender)
}
asyncWriteMessages(preparePersistentBatch(messages)) onComplete {
case Success(_)
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)
resequence(WriteMessageSuccess(_, actorInstanceId))
case Failure(e)
resequencer ! Desequenced(WriteMessagesFailed(e), cctr, persistentActor, self)
resequence(WriteMessageFailure(_, e, actorInstanceId))
}
resequencerCounter += messages.length + 1
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor, replayDeleted)
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p
if (!p.deleted || replayDeleted)
adaptFromJournal(p).foreach { adaptedPersistentRepr
persistentActor.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
} map {
case _ ReplayMessagesSuccess
} recover {
case e ReplayMessagesFailure(e)
} pipeTo persistentActor onSuccess {
case _ if publish context.system.eventStream.publish(r)
}
case ReadHighestSequenceNr(fromSequenceNr, persistenceId, persistentActor)
// Send read highest sequence number to persistentActor directly. No need
// to resequence the result relative to written and looped messages.
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map {
highest ReadHighestSequenceNrSuccess(highest)
} recover {
case e ReadHighestSequenceNrFailure(e)
} pipeTo persistentActor
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent)
asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete {
case Success(_) if (publish) context.system.eventStream.publish(d)
case Failure(e)
}
}
//#journal-plugin-api
/**
* Plugin API: asynchronously writes a batch of persistent messages to the journal.
* The batch write must be atomic i.e. either all persistent messages in the batch
* are written or none.
*/
def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit]
/**
* Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
* (inclusive). If `permanent` is set to `false`, the persistent messages are marked
* as deleted, otherwise they are permanently deleted.
*/
!per #15230 rename processorId => persistentId * This is NOT binary compatible, we're in an *experimental* module. * disabled binary compat checks for package akka.persistence * Source compatibility is retained, but users should migrate do the new method name ASAP. * Plugin APIs were migrated in a way that allows the old plugins to compile agains 2.3.4 without having to change anything. Hopefuly this will help authors migrate to 2.3.4 sooner. This is only source level compatible, not binary compatible. * added deprecation warnings on all processorId methods and provided bridges where possible * for users, the migration should be painless, they can still override the old method, and it'll work. But we encourage them to move to persistenceId; All delegation code will have to be removed afterwards ofc. Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Persistent.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/Snapshot.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala akka-persistence/src/main/scala/akka/persistence/journal/inmem/InmemJournal.scala akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbKey.scala akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala project/AkkaBuild.scala
2014-06-23 14:33:35 +02:00
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit]
//#journal-plugin-api
}
/**
* INTERNAL API.
*/
private[persistence] object AsyncWriteJournal {
2014-03-07 13:20:01 +01:00
final case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef)
class Resequencer extends Actor {
import scala.collection.mutable.Map
private val delayed = Map.empty[Long, Desequenced]
private var delivered = 0L
def receive = {
case d: Desequenced resequence(d)
}
@scala.annotation.tailrec
private def resequence(d: Desequenced) {
if (d.snr == delivered + 1) {
delivered = d.snr
d.target.tell(d.msg, d.sender)
} else {
delayed += (d.snr -> d)
}
val ro = delayed.remove(delivered + 1)
if (ro.isDefined) resequence(ro.get)
}
}
}