Merge pull request #19644 from akka/wip-19637-fix-BC-AsyncWriteJournal-RK

This commit is contained in:
Roland Kuhn 2016-01-29 19:06:51 +01:00
commit a6aee310ba
3 changed files with 115 additions and 107 deletions

View file

@ -47,7 +47,6 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled
private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size")
private val replayFilterMaxOldWriters: Int = config.getInt("replay-filter.max-old-writers")
private val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
private val resequencer = context.actorOf(Props[Resequencer]())
private var resequencerCounter = 1L
@ -55,109 +54,114 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal)
final val receiveWriteJournal: Actor.Receive = {
case WriteMessages(messages, persistentActor, actorInstanceId)
val cctr = resequencerCounter
resequencerCounter += messages.foldLeft(0)((acc, m) acc + m.size) + 1
// cannot be a val in the trait due to binary compatibility
val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
// try in case the asyncWriteMessages throws
try breaker.withCircuitBreaker(asyncWriteMessages(prep))
catch { case NonFatal(e) Future.failed(e) }
case f @ Failure(_)
// exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite f })
}).map { results
if (results.nonEmpty && results.size != atomicWriteCount)
throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " +
s"Expected [${prepared.get.size}], but got [${results.size}]")
results
}
{
case WriteMessages(messages, persistentActor, actorInstanceId)
val cctr = resequencerCounter
resequencerCounter += messages.foldLeft(0)((acc, m) acc + m.size) + 1
writeResult.onComplete {
case Success(results)
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)
val resultsIter =
if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
else results.iterator
var n = cctr + 1
messages.foreach {
case a: AtomicWrite
resultsIter.next() match {
case Success(_)
a.payload.foreach { p
resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender)
n += 1
}
case Failure(e)
a.payload.foreach { p
resequencer ! Desequenced(WriteMessageRejected(p, e, actorInstanceId), n, persistentActor, p.sender)
n += 1
}
}
case r: NonPersistentRepr
resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
n += 1
}
case Failure(e)
resequencer ! Desequenced(WriteMessagesFailed(e), cctr, persistentActor, self)
var n = cctr + 1
messages.foreach {
case a: AtomicWrite
a.payload.foreach { p
resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender)
n += 1
}
case r: NonPersistentRepr
resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
n += 1
}
}
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)
val replyTo =
if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode,
replayFilterWindowSize, replayFilterMaxOldWriters, replayDebugEnabled))
else persistentActor
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom))
.flatMap { highSeqNr
val toSeqNr = math.min(toSequenceNr, highSeqNr)
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)
Future.successful(highSeqNr)
else {
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
// not possible to use circuit breaker here
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
}.map(_ highSeqNr)
}
}.map {
highSeqNr RecoverySuccess(highSeqNr)
}.recover {
case e ReplayMessagesFailure(e)
}.pipeTo(replyTo).onSuccess {
case _ if (publish) context.system.eventStream.publish(r)
val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite])
val prepared = Try(preparePersistentBatch(messages))
val writeResult = (prepared match {
case Success(prep)
// try in case the asyncWriteMessages throws
try breaker.withCircuitBreaker(asyncWriteMessages(prep))
catch { case NonFatal(e) Future.failed(e) }
case f @ Failure(_)
// exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite f })
}).map { results
if (results.nonEmpty && results.size != atomicWriteCount)
throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " +
s"Expected [${prepared.get.size}], but got [${results.size}]")
results
}
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor)
breaker.withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr)) map {
case _ DeleteMessagesSuccess(toSequenceNr)
} recover {
case e DeleteMessagesFailure(e, toSequenceNr)
} pipeTo persistentActor onComplete {
case _ if (publish) context.system.eventStream.publish(d)
}
writeResult.onComplete {
case Success(results)
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)
val resultsIter =
if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
else results.iterator
var n = cctr + 1
messages.foreach {
case a: AtomicWrite
resultsIter.next() match {
case Success(_)
a.payload.foreach { p
resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender)
n += 1
}
case Failure(e)
a.payload.foreach { p
resequencer ! Desequenced(WriteMessageRejected(p, e, actorInstanceId), n, persistentActor, p.sender)
n += 1
}
}
case r: NonPersistentRepr
resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
n += 1
}
case Failure(e)
resequencer ! Desequenced(WriteMessagesFailed(e), cctr, persistentActor, self)
var n = cctr + 1
messages.foreach {
case a: AtomicWrite
a.payload.foreach { p
resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender)
n += 1
}
case r: NonPersistentRepr
resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
n += 1
}
}
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)
val replyTo =
if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode,
replayFilterWindowSize, replayFilterMaxOldWriters, replayDebugEnabled))
else persistentActor
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom))
.flatMap { highSeqNr
val toSeqNr = math.min(toSequenceNr, highSeqNr)
if (highSeqNr == 0L || fromSequenceNr > toSeqNr)
Future.successful(highSeqNr)
else {
// Send replayed messages and replay result to persistentActor directly. No need
// to resequence replayed messages relative to written and looped messages.
// not possible to use circuit breaker here
asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p
if (!p.deleted) // old records from 2.3 may still have the deleted flag
adaptFromJournal(p).foreach { adaptedPersistentRepr
replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender)
}
}.map(_ highSeqNr)
}
}.map {
highSeqNr RecoverySuccess(highSeqNr)
}.recover {
case e ReplayMessagesFailure(e)
}.pipeTo(replyTo).onSuccess {
case _ if (publish) context.system.eventStream.publish(r)
}
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor)
breaker.withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr)) map {
case _ DeleteMessagesSuccess(toSequenceNr)
} recover {
case e DeleteMessagesFailure(e, toSequenceNr)
} pipeTo persistentActor onComplete {
case _ if (publish) context.system.eventStream.publish(d)
}
}
}
//#journal-plugin-api
@ -274,4 +278,3 @@ private[persistence] object AsyncWriteJournal {
}
}
}

View file

@ -30,6 +30,13 @@ private[akka] object ReplayFilter {
Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters, debugEnabled))
}
// for binary compatibility
def props(
persistentActor: ActorRef,
mode: Mode,
windowSize: Int,
maxOldWriters: Int): Props = props(persistentActor, mode, windowSize, maxOldWriters, debugEnabled = false)
sealed trait Mode
case object Fail extends Mode
case object Warn extends Mode
@ -46,6 +53,10 @@ private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.M
import JournalProtocol._
import ReplayFilter.{ Warn, Fail, RepairByDiscardOld, Disabled }
// for binary compatibility
def this(persistentActor: ActorRef, mode: ReplayFilter.Mode,
windowSize: Int, maxOldWriters: Int) = this(persistentActor, mode, windowSize, maxOldWriters, debugEnabled = false)
val buffer = new LinkedList[ReplayedMessage]()
val oldWriters = LinkedHashSet.empty[String]
var writerUuid = ""

View file

@ -590,12 +590,6 @@ object MiMa extends AutoPlugin {
// #19133 change in internal actor
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated"),
// #19200 debug logging in ReplayFilter, change of internal actor
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.this"),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled_="),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled"),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props"),
// #18758 report invalid association events
ProblemFilters.exclude[MissingTypesProblem]("akka.remote.InvalidAssociation$"),
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.apply"),