2013-10-08 11:46:02 +02:00
|
|
|
/**
|
2014-02-02 19:05:45 -06:00
|
|
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
2013-10-08 11:46:02 +02:00
|
|
|
* Copyright (C) 2012-2013 Eligotech BV.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package akka.persistence.journal
|
|
|
|
|
|
2013-10-27 08:01:14 +01:00
|
|
|
import scala.collection.immutable
|
2013-10-08 11:46:02 +02:00
|
|
|
import scala.concurrent.Future
|
|
|
|
|
import scala.util._
|
|
|
|
|
|
|
|
|
|
import akka.actor._
|
2013-12-06 12:48:44 +01:00
|
|
|
import akka.pattern.pipe
|
2013-10-08 11:46:02 +02:00
|
|
|
import akka.persistence._
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Abstract journal, optimized for asynchronous, non-blocking writes.
|
|
|
|
|
*/
|
2014-06-03 16:40:44 +02:00
|
|
|
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
2014-01-17 06:58:25 +01:00
|
|
|
import JournalProtocol._
|
2013-10-08 11:46:02 +02:00
|
|
|
import AsyncWriteJournal._
|
|
|
|
|
import context.dispatcher
|
|
|
|
|
|
2013-11-12 09:02:02 +01:00
|
|
|
private val extension = Persistence(context.system)
|
2014-01-17 06:58:25 +01:00
|
|
|
private val publish = extension.settings.internal.publishPluginCommands
|
2013-11-12 09:02:02 +01:00
|
|
|
|
2014-06-03 16:40:44 +02:00
|
|
|
private val resequencer = context.actorOf(Props[Resequencer]())
|
2013-10-08 11:46:02 +02:00
|
|
|
private var resequencerCounter = 1L
|
|
|
|
|
|
2013-11-25 12:02:29 +01:00
|
|
|
def receive = {
|
2014-06-03 16:40:44 +02:00
|
|
|
case WriteMessages(resequenceables, processor) ⇒
|
2013-10-08 11:46:02 +02:00
|
|
|
val cctr = resequencerCounter
|
2014-06-03 16:40:44 +02:00
|
|
|
def resequence(f: PersistentRepr ⇒ Any) = resequenceables.zipWithIndex.foreach {
|
|
|
|
|
case (p: PersistentRepr, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender)
|
|
|
|
|
case (r, i) ⇒ resequencer ! Desequenced(LoopMessageSuccess(r.payload), cctr + i + 1, processor, r.sender)
|
2013-10-27 08:01:14 +01:00
|
|
|
}
|
2014-06-03 16:40:44 +02:00
|
|
|
asyncWriteMessages(preparePersistentBatch(resequenceables)) onComplete {
|
2013-11-20 13:47:42 +01:00
|
|
|
case Success(_) ⇒
|
2014-05-21 01:35:21 +02:00
|
|
|
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, processor, self)
|
2014-01-17 06:58:25 +01:00
|
|
|
resequence(WriteMessageSuccess(_))
|
2013-11-20 13:47:42 +01:00
|
|
|
case Failure(e) ⇒
|
2014-05-21 01:35:21 +02:00
|
|
|
resequencer ! Desequenced(WriteMessagesFailed(e), cctr, processor, self)
|
2014-01-17 06:58:25 +01:00
|
|
|
resequence(WriteMessageFailure(_, e))
|
2013-10-27 08:01:14 +01:00
|
|
|
}
|
2014-06-03 16:40:44 +02:00
|
|
|
resequencerCounter += resequenceables.length + 1
|
2014-06-23 14:33:35 +02:00
|
|
|
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, processor, replayDeleted) ⇒
|
2013-10-08 11:46:02 +02:00
|
|
|
// Send replayed messages and replay result to processor directly. No need
|
|
|
|
|
// to resequence replayed messages relative to written and looped messages.
|
2014-06-23 14:33:35 +02:00
|
|
|
asyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max) { p ⇒
|
2014-01-17 06:58:25 +01:00
|
|
|
if (!p.deleted || replayDeleted) processor.tell(ReplayedMessage(p), p.sender)
|
2013-10-08 11:46:02 +02:00
|
|
|
} map {
|
2014-01-17 06:58:25 +01:00
|
|
|
case _ ⇒ ReplayMessagesSuccess
|
2013-10-08 11:46:02 +02:00
|
|
|
} recover {
|
2014-01-17 06:58:25 +01:00
|
|
|
case e ⇒ ReplayMessagesFailure(e)
|
2014-01-23 10:43:02 +01:00
|
|
|
} pipeTo (processor) onSuccess {
|
|
|
|
|
case _ if publish ⇒ context.system.eventStream.publish(r)
|
|
|
|
|
}
|
2014-06-23 14:33:35 +02:00
|
|
|
case ReadHighestSequenceNr(fromSequenceNr, persistenceId, processor) ⇒
|
2014-01-17 06:58:25 +01:00
|
|
|
// Send read highest sequence number to processor directly. No need
|
|
|
|
|
// to resequence the result relative to written and looped messages.
|
2014-06-23 14:33:35 +02:00
|
|
|
asyncReadHighestSequenceNr(persistenceId, fromSequenceNr).map {
|
2014-01-17 06:58:25 +01:00
|
|
|
highest ⇒ ReadHighestSequenceNrSuccess(highest)
|
|
|
|
|
} recover {
|
|
|
|
|
case e ⇒ ReadHighestSequenceNrFailure(e)
|
|
|
|
|
} pipeTo (processor)
|
|
|
|
|
case c @ WriteConfirmations(confirmationsBatch, requestor) ⇒
|
|
|
|
|
asyncWriteConfirmations(confirmationsBatch) onComplete {
|
|
|
|
|
case Success(_) ⇒ requestor ! WriteConfirmationsSuccess(confirmationsBatch)
|
|
|
|
|
case Failure(e) ⇒ requestor ! WriteConfirmationsFailure(e)
|
2013-12-06 12:48:44 +01:00
|
|
|
}
|
2014-01-17 06:58:25 +01:00
|
|
|
case d @ DeleteMessages(messageIds, permanent, requestorOption) ⇒
|
|
|
|
|
asyncDeleteMessages(messageIds, permanent) onComplete {
|
2013-12-06 12:48:44 +01:00
|
|
|
case Success(_) ⇒
|
2014-01-17 06:58:25 +01:00
|
|
|
requestorOption.foreach(_ ! DeleteMessagesSuccess(messageIds))
|
|
|
|
|
if (publish) context.system.eventStream.publish(d)
|
|
|
|
|
case Failure(e) ⇒
|
2013-10-08 11:46:02 +02:00
|
|
|
}
|
2014-06-23 14:33:35 +02:00
|
|
|
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, permanent) ⇒
|
|
|
|
|
asyncDeleteMessagesTo(persistenceId, toSequenceNr, permanent) onComplete {
|
2014-01-17 06:58:25 +01:00
|
|
|
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
|
|
|
|
|
case Failure(e) ⇒
|
2013-10-08 11:46:02 +02:00
|
|
|
}
|
2014-01-17 06:58:25 +01:00
|
|
|
case LoopMessage(message, processor) ⇒
|
2014-06-20 23:05:51 +02:00
|
|
|
resequencer ! Desequenced(LoopMessageSuccess(message), resequencerCounter, processor, sender())
|
2013-10-08 11:46:02 +02:00
|
|
|
resequencerCounter += 1
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//#journal-plugin-api
|
2013-10-27 08:01:14 +01:00
|
|
|
/**
|
2013-11-07 10:45:02 +01:00
|
|
|
* Plugin API: asynchronously writes a batch of persistent messages to the journal.
|
|
|
|
|
* The batch write must be atomic i.e. either all persistent messages in the batch
|
|
|
|
|
* are written or none.
|
2013-10-27 08:01:14 +01:00
|
|
|
*/
|
2014-01-17 06:58:25 +01:00
|
|
|
def asyncWriteMessages(messages: immutable.Seq[PersistentRepr]): Future[Unit]
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Plugin API: asynchronously writes a batch of delivery confirmations to the journal.
|
|
|
|
|
*/
|
2014-06-27 08:46:07 +02:00
|
|
|
@deprecated("writeConfirmations will be removed, since Channels will be removed.", since = "2.3.4")
|
2014-01-17 06:58:25 +01:00
|
|
|
def asyncWriteConfirmations(confirmations: immutable.Seq[PersistentConfirmation]): Future[Unit]
|
2013-10-27 08:01:14 +01:00
|
|
|
|
2013-10-08 11:46:02 +02:00
|
|
|
/**
|
2014-01-17 06:58:25 +01:00
|
|
|
* Plugin API: asynchronously deletes messages identified by `messageIds` from the
|
|
|
|
|
* journal. If `permanent` is set to `false`, the persistent messages are marked as
|
|
|
|
|
* deleted, otherwise they are permanently deleted.
|
2013-10-08 11:46:02 +02:00
|
|
|
*/
|
2014-06-27 08:46:07 +02:00
|
|
|
@deprecated("asyncDeleteMessages will be removed.", since = "2.3.4")
|
2014-06-27 11:51:58 +02:00
|
|
|
def asyncDeleteMessages(messageIds: immutable.Seq[PersistentId], permanent: Boolean): Future[Unit]
|
2013-10-08 11:46:02 +02:00
|
|
|
|
|
|
|
|
/**
|
2014-01-17 06:58:25 +01:00
|
|
|
* Plugin API: asynchronously deletes all persistent messages up to `toSequenceNr`
|
|
|
|
|
* (inclusive). If `permanent` is set to `false`, the persistent messages are marked
|
|
|
|
|
* as deleted, otherwise they are permanently deleted.
|
2013-10-08 11:46:02 +02:00
|
|
|
*/
|
2014-06-23 14:33:35 +02:00
|
|
|
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long, permanent: Boolean): Future[Unit]
|
2013-10-08 11:46:02 +02:00
|
|
|
//#journal-plugin-api
|
|
|
|
|
}
|
|
|
|
|
|
2013-11-25 12:02:29 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API.
|
|
|
|
|
*/
|
2013-10-08 11:46:02 +02:00
|
|
|
private[persistence] object AsyncWriteJournal {
|
2014-03-07 13:20:01 +01:00
|
|
|
final case class Desequenced(msg: Any, snr: Long, target: ActorRef, sender: ActorRef)
|
2013-10-08 11:46:02 +02:00
|
|
|
|
|
|
|
|
class Resequencer extends Actor {
|
|
|
|
|
import scala.collection.mutable.Map
|
|
|
|
|
|
|
|
|
|
private val delayed = Map.empty[Long, Desequenced]
|
|
|
|
|
private var delivered = 0L
|
|
|
|
|
|
|
|
|
|
def receive = {
|
|
|
|
|
case d: Desequenced ⇒ resequence(d)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@scala.annotation.tailrec
|
|
|
|
|
private def resequence(d: Desequenced) {
|
|
|
|
|
if (d.snr == delivered + 1) {
|
|
|
|
|
delivered = d.snr
|
|
|
|
|
d.target tell (d.msg, d.sender)
|
|
|
|
|
} else {
|
|
|
|
|
delayed += (d.snr -> d)
|
|
|
|
|
}
|
|
|
|
|
val ro = delayed.remove(delivered + 1)
|
|
|
|
|
if (ro.isDefined) resequence(ro.get)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|