=per add detailed debug logging in ReplayFilter

This commit is contained in:
Patrik Nordwall 2015-12-16 14:41:22 +01:00
parent a99fee96df
commit 0af98e7bee
5 changed files with 31 additions and 15 deletions

View file

@ -128,6 +128,10 @@ akka.persistence {
# How many old writerUuid to remember # How many old writerUuid to remember
max-old-writers = 10 max-old-writers = 10
# Set this to `on` to enable detailed debug logging of each
# replayed event.
debug = off
} }
} }

View file

@ -47,6 +47,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled private def isReplayFilterEnabled: Boolean = replayFilterMode != ReplayFilter.Disabled
private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size") private val replayFilterWindowSize: Int = config.getInt("replay-filter.window-size")
private val replayFilterMaxOldWriters: Int = config.getInt("replay-filter.max-old-writers") private val replayFilterMaxOldWriters: Int = config.getInt("replay-filter.max-old-writers")
private val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
private val resequencer = context.actorOf(Props[Resequencer]()) private val resequencer = context.actorOf(Props[Resequencer]())
private var resequencerCounter = 1L private var resequencerCounter = 1L
@ -121,7 +122,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor) case r @ ReplayMessages(fromSequenceNr, toSequenceNr, max, persistenceId, persistentActor)
val replyTo = val replyTo =
if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode, if (isReplayFilterEnabled) context.actorOf(ReplayFilter.props(persistentActor, replayFilterMode,
replayFilterWindowSize, replayFilterMaxOldWriters)) replayFilterWindowSize, replayFilterMaxOldWriters, replayDebugEnabled))
else persistentActor else persistentActor
val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1) val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)

View file

@ -22,11 +22,12 @@ private[akka] object ReplayFilter {
persistentActor: ActorRef, persistentActor: ActorRef,
mode: Mode, mode: Mode,
windowSize: Int, windowSize: Int,
maxOldWriters: Int): Props = { maxOldWriters: Int,
debugEnabled: Boolean): Props = {
require(windowSize > 0, "windowSize must be > 0") require(windowSize > 0, "windowSize must be > 0")
require(maxOldWriters > 0, "maxOldWriters must be > 0") require(maxOldWriters > 0, "maxOldWriters must be > 0")
require(mode != Disabled, "mode must not be Disabled") require(mode != Disabled, "mode must not be Disabled")
Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters)) Props(new ReplayFilter(persistentActor, mode, windowSize, maxOldWriters, debugEnabled))
} }
sealed trait Mode sealed trait Mode
@ -40,7 +41,7 @@ private[akka] object ReplayFilter {
* INTERNAL API * INTERNAL API
*/ */
private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.Mode, private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.Mode,
windowSize: Int, maxOldWriters: Int) windowSize: Int, maxOldWriters: Int, debugEnabled: Boolean)
extends Actor with ActorLogging { extends Actor with ActorLogging {
import JournalProtocol._ import JournalProtocol._
import ReplayFilter.{ Warn, Fail, RepairByDiscardOld, Disabled } import ReplayFilter.{ Warn, Fail, RepairByDiscardOld, Disabled }
@ -52,6 +53,8 @@ private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.M
def receive = { def receive = {
case r @ ReplayedMessage(persistent) case r @ ReplayedMessage(persistent)
if (debugEnabled)
log.debug("Replay: {}", persistent)
try { try {
if (buffer.size == windowSize) { if (buffer.size == windowSize) {
val msg = buffer.removeFirst() val msg = buffer.removeFirst()
@ -124,6 +127,8 @@ private[akka] class ReplayFilter(persistentActor: ActorRef, mode: ReplayFilter.M
} }
case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure) case msg @ (_: RecoverySuccess | _: ReplayMessagesFailure)
if (debugEnabled)
log.debug("Replay completed: {}", msg)
sendBuffered() sendBuffered()
persistentActor.tell(msg, Actor.noSender) persistentActor.tell(msg, Actor.noSender)
context.stop(self) context.stop(self)

View file

@ -26,7 +26,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"ReplayFilter in RepairByDiscardOld mode" must { "ReplayFilter in RepairByDiscardOld mode" must {
"pass on all replayed messages and then stop" in { "pass on all replayed messages and then stop" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 2, maxOldWriters = 10)) testActor, mode = RepairByDiscardOld, windowSize = 2, maxOldWriters = 10, debugEnabled = false))
filter ! m1 filter ! m1
filter ! m2 filter ! m2
filter ! m3 filter ! m3
@ -43,7 +43,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"pass on all replayed messages when switching writer" in { "pass on all replayed messages when switching writer" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
filter ! m1 filter ! m1
filter ! m2 filter ! m2
val m32 = m3.copy(persistent = m3.persistent.update(writerUuid = writerB)) val m32 = m3.copy(persistent = m3.persistent.update(writerUuid = writerB))
@ -58,7 +58,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"discard message with same seqNo from old overlapping writer" in { "discard message with same seqNo from old overlapping writer" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept { EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1 filter ! m1
filter ! m2 filter ! m2
@ -76,7 +76,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"discard messages from old writer after switching writer" in { "discard messages from old writer after switching writer" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept { EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept {
filter ! m1 filter ! m1
filter ! m2 filter ! m2
@ -96,7 +96,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"discard messages from several old writers" in { "discard messages from several old writers" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10)) testActor, mode = RepairByDiscardOld, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept { EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept {
filter ! m1 filter ! m1
val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB)) val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB))
@ -124,7 +124,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"ReplayFilter in Fail mode" must { "ReplayFilter in Fail mode" must {
"fail when message with same seqNo from old overlapping writer" in { "fail when message with same seqNo from old overlapping writer" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Fail, windowSize = 100, maxOldWriters = 10)) testActor, mode = Fail, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept { EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1 filter ! m1
filter ! m2 filter ! m2
@ -139,7 +139,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"fail when messages from old writer after switching writer" in { "fail when messages from old writer after switching writer" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Fail, windowSize = 100, maxOldWriters = 10)) testActor, mode = Fail, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept { EventFilter.error(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1 filter ! m1
filter ! m2 filter ! m2
@ -157,7 +157,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"ReplayFilter in Warn mode" must { "ReplayFilter in Warn mode" must {
"warn about message with same seqNo from old overlapping writer" in { "warn about message with same seqNo from old overlapping writer" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) testActor, mode = Warn, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept { EventFilter.warning(start = "Invalid replayed event", occurrences = 1) intercept {
filter ! m1 filter ! m1
filter ! m2 filter ! m2
@ -176,7 +176,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"warn about messages from old writer after switching writer" in { "warn about messages from old writer after switching writer" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) testActor, mode = Warn, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept { EventFilter.warning(start = "Invalid replayed event", occurrences = 2) intercept {
filter ! m1 filter ! m1
filter ! m2 filter ! m2
@ -197,7 +197,7 @@ class ReplayFilterSpec extends AkkaSpec with ImplicitSender {
"warn about messages from several old writers" in { "warn about messages from several old writers" in {
val filter = system.actorOf(ReplayFilter.props( val filter = system.actorOf(ReplayFilter.props(
testActor, mode = Warn, windowSize = 100, maxOldWriters = 10)) testActor, mode = Warn, windowSize = 100, maxOldWriters = 10, debugEnabled = false))
EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept { EventFilter.warning(start = "Invalid replayed event", occurrences = 3) intercept {
filter ! m1 filter ! m1
val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB)) val m2b = m2.copy(persistent = m2.persistent.update(writerUuid = writerB))

View file

@ -585,7 +585,13 @@ object MiMa extends AutoPlugin {
FilterAnyProblem("akka.cluster.ddata.VersionVector"), FilterAnyProblem("akka.cluster.ddata.VersionVector"),
// #19133 change in internal actor // #19133 change in internal actor
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated") ProblemFilters.exclude[MissingMethodProblem]("akka.remote.ReliableDeliverySupervisor.gated"),
// debug logging in ReplayFilter, change of internal actor
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.this"),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$_setter_$akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled_="),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.AsyncWriteJournal.akka$persistence$journal$AsyncWriteJournal$$replayDebugEnabled"),
ProblemFilters.exclude[MissingMethodProblem]("akka.persistence.journal.ReplayFilter.props")
) )
) )
} }