diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 11bb4ff8ad..745cae10cf 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -47,7 +47,6 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size") private val replayFilterMaxOldWriters: Int = config.getInt("replay-filter.max-old-writers") - private val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug") private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L @@ -55,109 +54,114 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { final def receive = receiveWriteJournal.orElse[Any, Unit](receivePluginInternal) final val receiveWriteJournal: Actor.Receive = { - case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ - val cctr = resequencerCounter - resequencerCounter += messages.foldLeft(0)((acc, m) ⇒ acc + m.size) + 1 + // cannot be a val in the trait due to binary compatibility + val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug") - val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite]) - val prepared = Try(preparePersistentBatch(messages)) - val writeResult = (prepared match { - case Success(prep) ⇒ - // try in case the asyncWriteMessages throws - try breaker.withCircuitBreaker(asyncWriteMessages(prep)) - catch { case NonFatal(e) ⇒ Future.failed(e) } - case f @ Failure(_) ⇒ - // exception from preparePersistentBatch => rejected - Future.successful(messages.collect { case a: AtomicWrite ⇒ f }) - }).map { results ⇒ - if (results.nonEmpty && results.size != atomicWriteCount) - throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " + - s"Expected [${prepared.get.size}], but got [${results.size}]") - results - } + { + case WriteMessages(messages, persistentActor, actorInstanceId) ⇒ + val cctr = resequencerCounter + resequencerCounter += messages.foldLeft(0)((acc, m) ⇒ acc + m.size) + 1 - writeResult.onComplete { - case Success(results) ⇒ - resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self) - - val resultsIter = - if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit) - else results.iterator - var n = cctr + 1 - messages.foreach { - case a: AtomicWrite ⇒ - resultsIter.next() match { - case Success(_) ⇒ - a.payload.foreach { p ⇒ - resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender) - n += 1 - } - case Failure(e) ⇒ - a.payload.foreach { p ⇒ - resequencer ! Desequenced(WriteMessageRejected(p, e, actorInstanceId), n, persistentActor, p.sender) - n += 1 - } - } - - case r: NonPersistentRepr ⇒ - resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) - n += 1 - } - - case Failure(e) ⇒ - resequencer ! Desequenced(WriteMessagesFailed(e), cctr, persistentActor, self) - var n = cctr + 1 - messages.foreach { - case a: AtomicWrite ⇒ - a.payload.foreach { p ⇒ - resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender) - n += 1 - } - case r: NonPersistentRepr ⇒ - resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) - n += 1 - } - } - - case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ - val replyTo = - if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode, - replayFilterWindowSize, replayFilterMaxOldWriters, replayDebugEnabled)) - else persistentActor - - val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) - breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom)) - .flatMap { highSeqNr ⇒ - val toSeqNr = math.min(toSequenceNr, highSeqNr) - if (highSeqNr == 0L || fromSequenceNr > toSeqNr) - Future.successful(highSeqNr) - else { - // Send replayed messages and replay result to persistentActor directly. No need - // to resequence replayed messages relative to written and looped messages. - // not possible to use circuit breaker here - asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒ - if (!p.deleted) // old records from 2.3 may still have the deleted flag - adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ - replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) - } - }.map(_ ⇒ highSeqNr) - } - }.map { - highSeqNr ⇒ RecoverySuccess(highSeqNr) - }.recover { - case e ⇒ ReplayMessagesFailure(e) - }.pipeTo(replyTo).onSuccess { - case _ ⇒ if (publish) context.system.eventStream.publish(r) + val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite]) + val prepared = Try(preparePersistentBatch(messages)) + val writeResult = (prepared match { + case Success(prep) ⇒ + // try in case the asyncWriteMessages throws + try breaker.withCircuitBreaker(asyncWriteMessages(prep)) + catch { case NonFatal(e) ⇒ Future.failed(e) } + case f @ Failure(_) ⇒ + // exception from preparePersistentBatch => rejected + Future.successful(messages.collect { case a: AtomicWrite ⇒ f }) + }).map { results ⇒ + if (results.nonEmpty && results.size != atomicWriteCount) + throw new IllegalStateException("asyncWriteMessages returned invalid number of results. " + + s"Expected [${prepared.get.size}], but got [${results.size}]") + results } - case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ - breaker.withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr)) map { - case _ ⇒ DeleteMessagesSuccess(toSequenceNr) - } recover { - case e ⇒ DeleteMessagesFailure(e, toSequenceNr) - } pipeTo persistentActor onComplete { - case _ ⇒ if (publish) context.system.eventStream.publish(d) - } + writeResult.onComplete { + case Success(results) ⇒ + resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self) + + val resultsIter = + if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit) + else results.iterator + var n = cctr + 1 + messages.foreach { + case a: AtomicWrite ⇒ + resultsIter.next() match { + case Success(_) ⇒ + a.payload.foreach { p ⇒ + resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender) + n += 1 + } + case Failure(e) ⇒ + a.payload.foreach { p ⇒ + resequencer ! Desequenced(WriteMessageRejected(p, e, actorInstanceId), n, persistentActor, p.sender) + n += 1 + } + } + + case r: NonPersistentRepr ⇒ + resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) + n += 1 + } + + case Failure(e) ⇒ + resequencer ! Desequenced(WriteMessagesFailed(e), cctr, persistentActor, self) + var n = cctr + 1 + messages.foreach { + case a: AtomicWrite ⇒ + a.payload.foreach { p ⇒ + resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender) + n += 1 + } + case r: NonPersistentRepr ⇒ + resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) + n += 1 + } + } + + case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ + val replyTo = + if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode, + replayFilterWindowSize, replayFilterMaxOldWriters, replayDebugEnabled)) + else persistentActor + + val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) + breaker.withCircuitBreaker(asyncReadHighestSequenceNr(persistenceId, readHighestSequenceNrFrom)) + .flatMap { highSeqNr ⇒ + val toSeqNr = math.min(toSequenceNr, highSeqNr) + if (highSeqNr == 0L || fromSequenceNr > toSeqNr) + Future.successful(highSeqNr) + else { + // Send replayed messages and replay result to persistentActor directly. No need + // to resequence replayed messages relative to written and looped messages. + // not possible to use circuit breaker here + asyncReplayMessages(persistenceId, fromSequenceNr, toSeqNr, max) { p ⇒ + if (!p.deleted) // old records from 2.3 may still have the deleted flag + adaptFromJournal(p).foreach { adaptedPersistentRepr ⇒ + replyTo.tell(ReplayedMessage(adaptedPersistentRepr), Actor.noSender) + } + }.map(_ ⇒ highSeqNr) + } + }.map { + highSeqNr ⇒ RecoverySuccess(highSeqNr) + }.recover { + case e ⇒ ReplayMessagesFailure(e) + }.pipeTo(replyTo).onSuccess { + case _ ⇒ if (publish) context.system.eventStream.publish(r) + } + + case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) ⇒ + breaker.withCircuitBreaker(asyncDeleteMessagesTo(persistenceId, toSequenceNr)) map { + case _ ⇒ DeleteMessagesSuccess(toSequenceNr) + } recover { + case e ⇒ DeleteMessagesFailure(e, toSequenceNr) + } pipeTo persistentActor onComplete { + case _ ⇒ if (publish) context.system.eventStream.publish(d) + } + } } //#journal-plugin-api @@ -274,4 +278,3 @@ private[persistence] object AsyncWriteJournal { } } } - diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala index 507a913077..f316a9e4d9 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala @@ -30,6 +30,13 @@ private[akka] object ReplayFilter { Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters, debugEnabled)) } + // for binary compatibility + def props( + persistentActor: ActorRef, + mode: Mode, + windowSize: Int, + maxOldWriters: Int): Props = props(persistentActor, mode, windowSize, maxOldWriters, debugEnabled = false) + sealed trait Mode case object Fail extends Mode case object Warn extends Mode @@ -46,6 +53,10 @@ private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.M import JournalProtocol._ import ReplayFilter.{ Warn, Fail, RepairByDiscardOld, Disabled } + // for binary compatibility + def this(persistentActor: ActorRef, mode: ReplayFilter.Mode, + windowSize: Int, maxOldWriters: Int) = this(persistentActor, mode, windowSize, maxOldWriters, debugEnabled = false) + val buffer = new LinkedList[ReplayedMessage]() val oldWriters = LinkedHashSet.empty[String] var writerUuid = "" diff --git a/project/MiMa.scala b/project/MiMa.scala index 8914ae94f8..f51dcf963c 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -590,12 +590,6 @@ object MiMa extends AutoPlugin { // #19133 change in internal actor ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated"), - // #19200 debug logging in ReplayFilter, change of internal actor - ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.this"), - ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled_="), - ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled"), - ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props"), - // #18758 report invalid association events ProblemFilters.exclude[MissingTypesProblem]("akka.remote.InvalidAssociation$"), ProblemFilters.exclude[MissingMethodProblem]("akka.remote.InvalidAssociation.apply"),