=per #15457 Correlate persistAsync handlers with journal messages
We have assumed that the handlers can be popped when replies come back from journal, but if messages to journal are in flight when the actor is restarted the handlers does not match up with journal replies. This solution ignores journal replies that were emitted by an old PersistentActor instance by passing an uid with the journal messages. This means that the handler will not be invoked for such messages. (cherry picked from commit 7ebaaab669c9e467a1ffb4d9ed8b6500e1801a7c) Conflicts: akka-persistence/src/main/scala/akka/persistence/JournalProtocol.scala akka-persistence/src/main/scala/akka/persistence/Processor.scala akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala
This commit is contained in:
parent
33c7f6bb4f
commit
8eec3f92d3
11 changed files with 126 additions and 56 deletions
|
|
@ -214,9 +214,13 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
|
||||||
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away,
|
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away,
|
||||||
and handle them in the callback.
|
and handle them in the callback.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
|
``persistAsync`` and the journal has confirmed the write.
|
||||||
|
|
||||||
.. _defer-java-lambda:
|
.. _defer-java-lambda:
|
||||||
|
|
||||||
Deferring actions until preceeding persist handlers have executed
|
Deferring actions until preceding persist handlers have executed
|
||||||
-----------------------------------------------------------------
|
-----------------------------------------------------------------
|
||||||
|
|
||||||
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
||||||
|
|
@ -234,6 +238,10 @@ of the command for which this ``defer`` handler was called.
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer-caller
|
.. includecode:: ../../../akka-samples/akka-sample-persistence-java-lambda/src/main/java/doc/LambdaPersistenceDocTest.java#defer-caller
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
|
``defer`` and the journal has processed and confirmed all preceding writes.
|
||||||
|
|
||||||
Batch writes
|
Batch writes
|
||||||
------------
|
------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -218,10 +218,14 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
|
||||||
.. note::
|
.. note::
|
||||||
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away,
|
In order to implement the pattern known as "*command sourcing*" simply ``persistAsync`` all incoming events right away,
|
||||||
and handle them in the callback.
|
and handle them in the callback.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
|
``persistAsync`` and the journal has confirmed the write.
|
||||||
|
|
||||||
.. _defer-java:
|
.. _defer-java:
|
||||||
|
|
||||||
Deferring actions until preceeding persist handlers have executed
|
Deferring actions until preceding persist handlers have executed
|
||||||
-----------------------------------------------------------------
|
-----------------------------------------------------------------
|
||||||
|
|
||||||
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
||||||
|
|
@ -239,6 +243,10 @@ of the command for which this ``defer`` handler was called.
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer-caller
|
.. includecode:: code/docs/persistence/PersistenceDocTest.java#defer-caller
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
|
``defer`` and the journal has processed and confirmed all preceding writes.
|
||||||
|
|
||||||
Batch writes
|
Batch writes
|
||||||
------------
|
------------
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -536,7 +536,7 @@ Managagement Messages
|
||||||
These management messages may be handled after other messages, so if you send ``AddRoutee`` immediately followed
|
These management messages may be handled after other messages, so if you send ``AddRoutee`` immediately followed
|
||||||
an ordinary message you are not guaranteed that the routees have been changed when the ordinary message
|
an ordinary message you are not guaranteed that the routees have been changed when the ordinary message
|
||||||
is routed. If you need to know when the change has been applied you can send ``AddRoutee`` followed by ``GetRoutees``
|
is routed. If you need to know when the change has been applied you can send ``AddRoutee`` followed by ``GetRoutees``
|
||||||
and when you receive the ``Routees`` reply you know that the preceeding change has been applied.
|
and when you receive the ``Routees`` reply you know that the preceding change has been applied.
|
||||||
|
|
||||||
.. _resizable-routers-java:
|
.. _resizable-routers-java:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -210,10 +210,14 @@ The ordering between events is still guaranteed ("evt-b-1" will be sent after "e
|
||||||
.. note::
|
.. note::
|
||||||
In order to implement the pattern known as "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incomming
|
In order to implement the pattern known as "*command sourcing*" simply call ``persistAsync(cmd)(...)`` right away on all incomming
|
||||||
messages right away, and handle them in the callback.
|
messages right away, and handle them in the callback.
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
|
``persistAsync`` and the journal has confirmed the write.
|
||||||
|
|
||||||
.. _defer-scala:
|
.. _defer-scala:
|
||||||
|
|
||||||
Deferring actions until preceeding persist handlers have executed
|
Deferring actions until preceding persist handlers have executed
|
||||||
-----------------------------------------------------------------
|
-----------------------------------------------------------------
|
||||||
|
|
||||||
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
Sometimes when working with ``persistAsync`` you may find that it would be nice to define some actions in terms of
|
||||||
|
|
@ -233,6 +237,9 @@ The calling side will get the responses in this (guaranteed) order:
|
||||||
|
|
||||||
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer-caller
|
.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#defer-caller
|
||||||
|
|
||||||
|
.. warning::
|
||||||
|
The callback will not be invoked if the actor is restarted (or stopped) in between the call to
|
||||||
|
``defer`` and the journal has processed and confirmed all preceding writes.
|
||||||
|
|
||||||
.. _batch-writes:
|
.. _batch-writes:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -535,7 +535,7 @@ Managagement Messages
|
||||||
These management messages may be handled after other messages, so if you send ``AddRoutee`` immediately followed
|
These management messages may be handled after other messages, so if you send ``AddRoutee`` immediately followed
|
||||||
an ordinary message you are not guaranteed that the routees have been changed when the ordinary message
|
an ordinary message you are not guaranteed that the routees have been changed when the ordinary message
|
||||||
is routed. If you need to know when the change has been applied you can send ``AddRoutee`` followed by ``GetRoutees``
|
is routed. If you need to know when the change has been applied you can send ``AddRoutee`` followed by ``GetRoutees``
|
||||||
and when you receive the ``Routees`` reply you know that the preceeding change has been applied.
|
and when you receive the ``Routees`` reply you know that the preceding change has been applied.
|
||||||
|
|
||||||
.. _resizable-routers-scala:
|
.. _resizable-routers-scala:
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -58,16 +58,20 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
|
||||||
throw new UnsupportedOperationException("Persistent command batches not supported")
|
throw new UnsupportedOperationException("Persistent command batches not supported")
|
||||||
case _: PersistentRepr ⇒
|
case _: PersistentRepr ⇒
|
||||||
throw new UnsupportedOperationException("Persistent commands not supported")
|
throw new UnsupportedOperationException("Persistent commands not supported")
|
||||||
case WriteMessageSuccess(r) ⇒
|
case WriteMessageSuccess(p, id) ⇒
|
||||||
r match {
|
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
|
||||||
case p: PersistentRepr ⇒
|
// while message is in flight, in that case we ignore the call to the handler
|
||||||
withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload))
|
if (id == instanceId) {
|
||||||
case _ ⇒ pendingInvocations.peek().handler(r.payload)
|
withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload))
|
||||||
|
onWriteComplete()
|
||||||
|
}
|
||||||
|
case LoopMessageSuccess(l, id) ⇒
|
||||||
|
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
|
||||||
|
// while message is in flight, in that case we ignore the call to the handler
|
||||||
|
if (id == instanceId) {
|
||||||
|
pendingInvocations.peek().handler(l)
|
||||||
|
onWriteComplete()
|
||||||
}
|
}
|
||||||
onWriteComplete()
|
|
||||||
case LoopMessageSuccess(l) ⇒
|
|
||||||
pendingInvocations.peek().handler(l)
|
|
||||||
onWriteComplete()
|
|
||||||
case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s)
|
case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s)
|
||||||
case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f)
|
case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f)
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
|
|
@ -75,7 +79,7 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
private def doAroundReceive(receive: Receive, message: Any): Unit = {
|
private def doAroundReceive(receive: Receive, message: Any): Unit = {
|
||||||
Eventsourced.super.aroundReceive(receive, LoopMessageSuccess(message))
|
Eventsourced.super.aroundReceive(receive, LoopMessageSuccess(message, instanceId))
|
||||||
|
|
||||||
if (pendingStashingPersistInvocations > 0) {
|
if (pendingStashingPersistInvocations > 0) {
|
||||||
currentState = persistingEvents
|
currentState = persistingEvents
|
||||||
|
|
@ -111,19 +115,25 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
|
||||||
deleteMessage(p.sequenceNr, permanent = true)
|
deleteMessage(p.sequenceNr, permanent = true)
|
||||||
throw new UnsupportedOperationException("Persistent commands not supported")
|
throw new UnsupportedOperationException("Persistent commands not supported")
|
||||||
|
|
||||||
case WriteMessageSuccess(m) ⇒
|
case WriteMessageSuccess(p, id) ⇒
|
||||||
m match {
|
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
|
||||||
case p: PersistentRepr ⇒ withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload))
|
// while message is in flight, in that case we ignore the call to the handler
|
||||||
case _ ⇒ pendingInvocations.peek().handler(m.payload)
|
if (id == instanceId) {
|
||||||
|
withCurrentPersistent(p)(p ⇒ pendingInvocations.peek().handler(p.payload))
|
||||||
|
onWriteComplete()
|
||||||
}
|
}
|
||||||
onWriteComplete()
|
|
||||||
|
|
||||||
case e @ WriteMessageFailure(p, _) ⇒
|
case e @ WriteMessageFailure(p, _, id) ⇒
|
||||||
Eventsourced.super.aroundReceive(receive, message) // stops actor by default
|
Eventsourced.super.aroundReceive(receive, message) // stops actor by default
|
||||||
onWriteComplete()
|
// instanceId mismatch can happen for persistAsync and defer in case of actor restart
|
||||||
case LoopMessageSuccess(l) ⇒
|
// while message is in flight, in that case the handler has already been discarded
|
||||||
pendingInvocations.peek().handler(l)
|
if (id == instanceId)
|
||||||
onWriteComplete()
|
onWriteComplete()
|
||||||
|
case LoopMessageSuccess(l, id) ⇒
|
||||||
|
if (id == instanceId) {
|
||||||
|
pendingInvocations.peek().handler(l)
|
||||||
|
onWriteComplete()
|
||||||
|
}
|
||||||
case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s)
|
case s @ WriteMessagesSuccessful ⇒ Eventsourced.super.aroundReceive(receive, s)
|
||||||
case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f)
|
case f: WriteMessagesFailed ⇒ Eventsourced.super.aroundReceive(receive, f)
|
||||||
case other ⇒ processorStash.stash()
|
case other ⇒ processorStash.stash()
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ private[persistence] object JournalProtocol {
|
||||||
* @param messages messages to be written.
|
* @param messages messages to be written.
|
||||||
* @param persistentActor write requestor.
|
* @param persistentActor write requestor.
|
||||||
*/
|
*/
|
||||||
final case class WriteMessages(messages: immutable.Seq[Resequenceable], persistentActor: ActorRef)
|
final case class WriteMessages(messages: immutable.Seq[Resequenceable], persistentActor: ActorRef, actorInstanceId: Int)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor
|
* Reply message to a successful [[WriteMessages]] request. This reply is sent to the requestor
|
||||||
|
|
@ -80,7 +80,7 @@ private[persistence] object JournalProtocol {
|
||||||
*
|
*
|
||||||
* @param persistent successfully written message.
|
* @param persistent successfully written message.
|
||||||
*/
|
*/
|
||||||
final case class WriteMessageSuccess(persistent: PersistentRepr)
|
final case class WriteMessageSuccess(persistent: PersistentRepr, actorInstanceId: Int)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reply message to a failed [[WriteMessages]] request. For each contained [[PersistentRepr]] message
|
* Reply message to a failed [[WriteMessages]] request. For each contained [[PersistentRepr]] message
|
||||||
|
|
@ -89,7 +89,7 @@ private[persistence] object JournalProtocol {
|
||||||
* @param message message failed to be written.
|
* @param message message failed to be written.
|
||||||
* @param cause failure cause.
|
* @param cause failure cause.
|
||||||
*/
|
*/
|
||||||
final case class WriteMessageFailure(message: PersistentRepr, cause: Throwable)
|
final case class WriteMessageFailure(message: PersistentRepr, cause: Throwable, actorInstanceId: Int)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Request to loop a `message` back to `persistent actor`, without persisting the message. Looping of messages
|
* Request to loop a `message` back to `persistent actor`, without persisting the message. Looping of messages
|
||||||
|
|
@ -98,14 +98,14 @@ private[persistence] object JournalProtocol {
|
||||||
* @param message message to be looped through the journal.
|
* @param message message to be looped through the journal.
|
||||||
* @param persistentActor loop requestor.
|
* @param persistentActor loop requestor.
|
||||||
*/
|
*/
|
||||||
final case class LoopMessage(message: Any, persistentActor: ActorRef)
|
final case class LoopMessage(message: Any, persistentActor: ActorRef, actorInstanceId: Int)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reply message to a [[LoopMessage]] request.
|
* Reply message to a [[LoopMessage]] request.
|
||||||
*
|
*
|
||||||
* @param message looped message.
|
* @param message looped message.
|
||||||
*/
|
*/
|
||||||
final case class LoopMessageSuccess(message: Any)
|
final case class LoopMessageSuccess(message: Any, actorInstanceId: Int)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Request to replay messages to `persistentActor`.
|
* Request to replay messages to `persistentActor`.
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ package akka.persistence
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
|
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
|
||||||
|
|
@ -59,6 +60,14 @@ trait Processor extends ProcessorImpl {
|
||||||
override def persistenceId: String = processorId
|
override def persistenceId: String = processorId
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[akka] object ProcessorImpl {
|
||||||
|
// ok to wrap around (2*Int.MaxValue restarts will not happen within a journal roundtrip)
|
||||||
|
private val instanceIdCounter = new AtomicInteger
|
||||||
|
}
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
@deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4")
|
@deprecated("Processor will be removed. Instead extend `akka.persistence.PersistentActor` and use it's `persistAsync(command)(callback)` method to get equivalent semantics.", since = "2.3.4")
|
||||||
private[akka] trait ProcessorImpl extends Actor with Recovery {
|
private[akka] trait ProcessorImpl extends Actor with Recovery {
|
||||||
|
|
@ -66,6 +75,8 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
|
||||||
|
|
||||||
import JournalProtocol._
|
import JournalProtocol._
|
||||||
|
|
||||||
|
private[persistence] val instanceId: Int = ProcessorImpl.instanceIdCounter.incrementAndGet()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Processes the highest stored sequence number response from the journal and then switches
|
* Processes the highest stored sequence number response from the journal and then switches
|
||||||
* to `processing` state.
|
* to `processing` state.
|
||||||
|
|
@ -95,13 +106,12 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
|
||||||
private var batching = false
|
private var batching = false
|
||||||
|
|
||||||
def aroundReceive(receive: Receive, message: Any) = message match {
|
def aroundReceive(receive: Receive, message: Any) = message match {
|
||||||
case r: Recover ⇒ // ignore
|
case r: Recover ⇒ // ignore
|
||||||
case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash
|
case ReplayedMessage(p) ⇒ processPersistent(receive, p) // can occur after unstash from user stash
|
||||||
case WriteMessageSuccess(p: PersistentRepr) ⇒ processPersistent(receive, p)
|
case WriteMessageSuccess(p: PersistentRepr, _) ⇒ processPersistent(receive, p)
|
||||||
case WriteMessageSuccess(r: Resequenceable) ⇒ process(receive, r)
|
case WriteMessageSuccess(r: Resequenceable, _) ⇒ process(receive, r)
|
||||||
case WriteMessageFailure(p, cause) ⇒
|
case WriteMessageFailure(p, cause, _) ⇒ process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause))
|
||||||
process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause))
|
case LoopMessageSuccess(m, _) ⇒ process(receive, m)
|
||||||
case LoopMessageSuccess(m) ⇒ process(receive, m)
|
|
||||||
case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒
|
case WriteMessagesSuccessful | WriteMessagesFailed(_) ⇒
|
||||||
if (processorBatch.isEmpty) batching = false else journalBatch()
|
if (processorBatch.isEmpty) batching = false else journalBatch()
|
||||||
case p: PersistentRepr ⇒
|
case p: PersistentRepr ⇒
|
||||||
|
|
@ -118,7 +128,7 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
|
||||||
case m ⇒
|
case m ⇒
|
||||||
// submit all batched messages before looping this message
|
// submit all batched messages before looping this message
|
||||||
if (processorBatch.isEmpty) batching = false else journalBatch()
|
if (processorBatch.isEmpty) batching = false else journalBatch()
|
||||||
journal forward LoopMessage(m, self)
|
journal forward LoopMessage(m, self, instanceId)
|
||||||
}
|
}
|
||||||
|
|
||||||
def addToBatch(p: Resequenceable): Unit = p match {
|
def addToBatch(p: Resequenceable): Unit = p match {
|
||||||
|
|
@ -135,7 +145,7 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
|
||||||
processorBatch.length >= extension.settings.journal.maxMessageBatchSize
|
processorBatch.length >= extension.settings.journal.maxMessageBatchSize
|
||||||
|
|
||||||
def journalBatch(): Unit = {
|
def journalBatch(): Unit = {
|
||||||
journal ! WriteMessages(processorBatch, self)
|
journal ! WriteMessages(processorBatch, self, instanceId)
|
||||||
processorBatch = Vector.empty
|
processorBatch = Vector.empty
|
||||||
batching = true
|
batching = true
|
||||||
}
|
}
|
||||||
|
|
@ -271,10 +281,10 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
|
||||||
unstashAll(unstashFilterPredicate)
|
unstashAll(unstashFilterPredicate)
|
||||||
} finally {
|
} finally {
|
||||||
message match {
|
message match {
|
||||||
case Some(WriteMessageSuccess(m)) ⇒ preRestartDefault(reason, Some(m))
|
case Some(WriteMessageSuccess(m, _)) ⇒ preRestartDefault(reason, Some(m))
|
||||||
case Some(LoopMessageSuccess(m)) ⇒ preRestartDefault(reason, Some(m))
|
case Some(LoopMessageSuccess(m, _)) ⇒ preRestartDefault(reason, Some(m))
|
||||||
case Some(ReplayedMessage(m)) ⇒ preRestartDefault(reason, Some(m))
|
case Some(ReplayedMessage(m)) ⇒ preRestartDefault(reason, Some(m))
|
||||||
case mo ⇒ preRestartDefault(reason, None)
|
case mo ⇒ preRestartDefault(reason, None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,19 +28,19 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
private var resequencerCounter = 1L
|
private var resequencerCounter = 1L
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case WriteMessages(resequenceables, processor) ⇒
|
case WriteMessages(resequenceables, processor, actorInstanceId) ⇒
|
||||||
val cctr = resequencerCounter
|
val cctr = resequencerCounter
|
||||||
def resequence(f: PersistentRepr ⇒ Any) = resequenceables.zipWithIndex.foreach {
|
def resequence(f: PersistentRepr ⇒ Any) = resequenceables.zipWithIndex.foreach {
|
||||||
case (p: PersistentRepr, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender)
|
case (p: PersistentRepr, i) ⇒ resequencer ! Desequenced(f(p), cctr + i + 1, processor, p.sender)
|
||||||
case (r, i) ⇒ resequencer ! Desequenced(LoopMessageSuccess(r.payload), cctr + i + 1, processor, r.sender)
|
case (r, i) ⇒ resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), cctr + i + 1, processor, r.sender)
|
||||||
}
|
}
|
||||||
asyncWriteMessages(preparePersistentBatch(resequenceables)) onComplete {
|
asyncWriteMessages(preparePersistentBatch(resequenceables)) onComplete {
|
||||||
case Success(_) ⇒
|
case Success(_) ⇒
|
||||||
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, processor, self)
|
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, processor, self)
|
||||||
resequence(WriteMessageSuccess(_))
|
resequence(WriteMessageSuccess(_, actorInstanceId))
|
||||||
case Failure(e) ⇒
|
case Failure(e) ⇒
|
||||||
resequencer ! Desequenced(WriteMessagesFailed(e), cctr, processor, self)
|
resequencer ! Desequenced(WriteMessagesFailed(e), cctr, processor, self)
|
||||||
resequence(WriteMessageFailure(_, e))
|
resequence(WriteMessageFailure(_, e, actorInstanceId))
|
||||||
}
|
}
|
||||||
resequencerCounter += resequenceables.length + 1
|
resequencerCounter += resequenceables.length + 1
|
||||||
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, processor, replayDeleted) ⇒
|
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, processor, replayDeleted) ⇒
|
||||||
|
|
@ -80,8 +80,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
|
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
|
||||||
case Failure(e) ⇒
|
case Failure(e) ⇒
|
||||||
}
|
}
|
||||||
case LoopMessage(message, processor) ⇒
|
case LoopMessage(message, processor, actorInstanceId) ⇒
|
||||||
resequencer ! Desequenced(LoopMessageSuccess(message), resequencerCounter, processor, sender())
|
resequencer ! Desequenced(LoopMessageSuccess(message, actorInstanceId), resequencerCounter, processor, sender)
|
||||||
resequencerCounter += 1
|
resequencerCounter += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,19 +23,19 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
private val publish = extension.settings.internal.publishPluginCommands
|
private val publish = extension.settings.internal.publishPluginCommands
|
||||||
|
|
||||||
final def receive = {
|
final def receive = {
|
||||||
case WriteMessages(resequenceables, processor) ⇒
|
case WriteMessages(resequenceables, processor, actorInstanceId) ⇒
|
||||||
Try(writeMessages(preparePersistentBatch(resequenceables))) match {
|
Try(writeMessages(preparePersistentBatch(resequenceables))) match {
|
||||||
case Success(_) ⇒
|
case Success(_) ⇒
|
||||||
processor ! WriteMessagesSuccessful
|
processor ! WriteMessagesSuccessful
|
||||||
resequenceables.foreach {
|
resequenceables.foreach {
|
||||||
case p: PersistentRepr ⇒ processor.tell(WriteMessageSuccess(p), p.sender)
|
case p: PersistentRepr ⇒ processor.tell(WriteMessageSuccess(p, actorInstanceId), p.sender)
|
||||||
case r ⇒ processor.tell(LoopMessageSuccess(r.payload), r.sender)
|
case r ⇒ processor.tell(LoopMessageSuccess(r.payload, actorInstanceId), r.sender)
|
||||||
}
|
}
|
||||||
case Failure(e) ⇒
|
case Failure(e) ⇒
|
||||||
processor ! WriteMessagesFailed(e)
|
processor ! WriteMessagesFailed(e)
|
||||||
resequenceables.foreach {
|
resequenceables.foreach {
|
||||||
case p: PersistentRepr ⇒ processor tell (WriteMessageFailure(p, e), p.sender)
|
case p: PersistentRepr ⇒ processor tell (WriteMessageFailure(p, e, actorInstanceId), p.sender)
|
||||||
case r ⇒ processor tell (LoopMessageSuccess(r.payload), r.sender)
|
case r ⇒ processor tell (LoopMessageSuccess(r.payload, actorInstanceId), r.sender)
|
||||||
}
|
}
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
@ -73,8 +73,8 @@ trait SyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
|
case Success(_) ⇒ if (publish) context.system.eventStream.publish(d)
|
||||||
case Failure(e) ⇒
|
case Failure(e) ⇒
|
||||||
}
|
}
|
||||||
case LoopMessage(message, processor) ⇒
|
case LoopMessage(message, processor, actorInstanceId) ⇒
|
||||||
processor forward LoopMessageSuccess(message)
|
processor forward LoopMessageSuccess(message, actorInstanceId)
|
||||||
}
|
}
|
||||||
|
|
||||||
//#journal-plugin-api
|
//#journal-plugin-api
|
||||||
|
|
|
||||||
|
|
@ -299,6 +299,25 @@ object PersistentActorSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class AsyncPersistHandlerCorrelationCheck(name: String) extends ExamplePersistentActor(name) {
|
||||||
|
var counter = 0
|
||||||
|
|
||||||
|
val receiveCommand: Receive = commonBehavior orElse {
|
||||||
|
case Cmd(data) ⇒
|
||||||
|
persistAsync(Evt(data)) { evt ⇒
|
||||||
|
if (data != evt.data)
|
||||||
|
sender() ! s"Expected [$data] bot got [${evt.data}]"
|
||||||
|
if (evt.data == "done")
|
||||||
|
sender() ! "done"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def incCounter(): Int = {
|
||||||
|
counter += 1
|
||||||
|
counter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class UserStashFailureProcessor(name: String) extends ExamplePersistentActor(name) {
|
class UserStashFailureProcessor(name: String) extends ExamplePersistentActor(name) {
|
||||||
val receiveCommand: Receive = commonBehavior orElse {
|
val receiveCommand: Receive = commonBehavior orElse {
|
||||||
case Cmd(data) ⇒
|
case Cmd(data) ⇒
|
||||||
|
|
@ -661,6 +680,14 @@ abstract class PersistentActorSpec(config: Config) extends AkkaSpec(config) with
|
||||||
|
|
||||||
expectNoMsg(100.millis)
|
expectNoMsg(100.millis)
|
||||||
}
|
}
|
||||||
|
"correlate persistAsync handlers after restart" in {
|
||||||
|
val processor = namedProcessor[AsyncPersistHandlerCorrelationCheck]
|
||||||
|
for (n ← 1 to 100) processor ! Cmd(n)
|
||||||
|
processor ! "boom"
|
||||||
|
for (n ← 1 to 20) processor ! Cmd(n)
|
||||||
|
processor ! Cmd("done")
|
||||||
|
expectMsg(5.seconds, "done")
|
||||||
|
}
|
||||||
"allow deferring handlers in order to provide ordered processing in respect to persist handlers" in {
|
"allow deferring handlers in order to provide ordered processing in respect to persist handlers" in {
|
||||||
val processor = namedProcessor[DeferringWithPersistActor]
|
val processor = namedProcessor[DeferringWithPersistActor]
|
||||||
processor ! Cmd("a")
|
processor ! Cmd("a")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue