diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 8d10a36ba5..f8422c7a10 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -128,6 +128,10 @@ akka.persistence { # How many old writerUuid to remember max-old-writers = 10 + + # Set this to `on` to enable detailed debug logging of each + # replayed event. + debug = off } } diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 47c74667aa..4b76d8acb7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -47,6 +47,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size") private val replayFilterMaxOldWriters: Int = config.getInt("replay-filter.max-old-writers") + private val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug") private val resequencer = context.actorOf(Props[Resequencer]()) private var resequencerCounter = 1L @@ -121,7 +122,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) ⇒ val replyTo = if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode, - replayFilterWindowSize, replayFilterMaxOldWriters)) + replayFilterWindowSize, replayFilterMaxOldWriters, replayDebugEnabled)) else persistentActor val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala index a3b8608b0a..b32e990f31 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/ReplayFilter.scala @@ -22,11 +22,12 @@ private[akka] object ReplayFilter { persistentActor: ActorRef, mode: Mode, windowSize: Int, - maxOldWriters: Int): Props = { + maxOldWriters: Int, + debugEnabled: Boolean): Props = { require(windowSize > 0, "windowSize must be > 0") require(maxOldWriters > 0, "maxOldWriters must be > 0") require(mode != Disabled, "mode must not be Disabled") - Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters)) + Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters, debugEnabled)) } sealed trait Mode @@ -40,7 +41,7 @@ private[akka] object ReplayFilter { * INTERNAL API */ private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.Mode, - windowSize: Int, maxOldWriters: Int) + windowSize: Int, maxOldWriters: Int, debugEnabled: Boolean) extends Actor with ActorLogging { import JournalProtocol._ import ReplayFilter.{ Warn, Fail, RepairByDiscardOld, Disabled } @@ -52,6 +53,8 @@ private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.M def receive = { case r @ ReplayedMessage(persistent) ⇒ + if (debugEnabled) + log.debug("Replay: {}", persistent) try { if (buffer.size == windowSize) { val msg = buffer.removeFirst() @@ -124,6 +127,8 @@ private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.M } case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure) ⇒ + if (debugEnabled) + log.debug("Replay completed: {}", msg) sendBuffered() persistentActor.tell(msg, Actor.noSender) context.stop(self) diff --git a/akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala b/akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala index d66e440de5..00354a009c 100644 --- a/akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/journal/ReplayFilterSpec.scala @@ -26,7 +26,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "ReplayFilter in RepairByDiscardOld mode" must { "pass on all replayed messages and then stop" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = RepairByDiscardOld, windowSize = 2, maxOldWriters = 10)) + testActor, mode = RepairByDiscardOld, windowSize = 2, maxOldWriters = 10, debugEnabled = false)) filter ! m1 filter ! m2 filter ! m3 @@ -43,7 +43,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "pass on all replayed messages when switching writer" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) filter ! m1 filter ! m2 val m32 = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) @@ -58,7 +58,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "discard message with same seqNo from old overlapping writer" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept { filter ! m1 filter ! m2 @@ -76,7 +76,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "discard messages from old writer after switching writer" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept { filter ! m1 filter ! m2 @@ -96,7 +96,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "discard messages from several old writers" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) + testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept { filter ! m1 val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB)) @@ -124,7 +124,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "ReplayFilter in Fail mode" must { "fail when message with same seqNo from old overlapping writer" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = Fail, windowSize = 100, maxOldWriters = 10)) + testActor, mode = Fail, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept { filter ! m1 filter ! m2 @@ -139,7 +139,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "fail when messages from old writer after switching writer" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = Fail, windowSize = 100, maxOldWriters = 10)) + testActor, mode = Fail, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept { filter ! m1 filter ! m2 @@ -157,7 +157,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "ReplayFilter in Warn mode" must { "warn about message with same seqNo from old overlapping writer" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) + testActor, mode = Warn, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept { filter ! m1 filter ! m2 @@ -176,7 +176,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "warn about messages from old writer after switching writer" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) + testActor, mode = Warn, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept { filter ! m1 filter ! m2 @@ -197,7 +197,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender { "warn about messages from several old writers" in { val filter = system.actorOf(ReplayFilter.props( - testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) + testActor, mode = Warn, windowSize = 100, maxOldWriters = 10, debugEnabled = false)) EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept { filter ! m1 val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB)) diff --git a/project/MiMa.scala b/project/MiMa.scala index 1b0550b70b..0b7dbacf19 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -585,7 +585,13 @@ object MiMa extends AutoPlugin { FilterAnyProblem("akka.cluster.ddata.VersionVector"), // #19133 change in internal actor - ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated") + ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated"), + + // debug logging in ReplayFilter, change of internal actor + ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.this"), + ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled_="), + ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled"), + ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props") ) ) }