better timestamp format for active-active (#29329)
This commit is contained in:
parent
c44302bd1e
commit
e79f5ac3c4
1 changed files with 15 additions and 3 deletions
|
|
@ -4,10 +4,15 @@
|
||||||
|
|
||||||
package akka.persistence.typed.internal
|
package akka.persistence.typed.internal
|
||||||
|
|
||||||
|
import java.time.Instant
|
||||||
|
import java.time.LocalDateTime
|
||||||
|
import java.time.ZoneId
|
||||||
|
import java.time.format.DateTimeFormatter
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
|
||||||
import akka.actor.UnhandledMessage
|
import akka.actor.UnhandledMessage
|
||||||
import akka.actor.typed.eventstream.EventStream
|
import akka.actor.typed.eventstream.EventStream
|
||||||
import akka.actor.typed.{ Behavior, Signal }
|
import akka.actor.typed.{ Behavior, Signal }
|
||||||
|
|
@ -52,7 +57,6 @@ import akka.stream.scaladsl.Keep
|
||||||
import akka.stream.{ SharedKillSwitch, SystemMaterializer }
|
import akka.stream.{ SharedKillSwitch, SystemMaterializer }
|
||||||
import akka.stream.scaladsl.{ RestartSource, Sink }
|
import akka.stream.scaladsl.{ RestartSource, Sink }
|
||||||
import akka.stream.typed.scaladsl.ActorFlow
|
import akka.stream.typed.scaladsl.ActorFlow
|
||||||
import akka.util.Helpers
|
|
||||||
import akka.util.OptionVal
|
import akka.util.OptionVal
|
||||||
import akka.util.unused
|
import akka.util.unused
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
@ -174,6 +178,13 @@ private[akka] object Running {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private val timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
|
||||||
|
private val UTC = ZoneId.of("UTC")
|
||||||
|
|
||||||
|
def formatTimestamp(time: Long): String = {
|
||||||
|
timestampFormatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(time), UTC))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===============================================
|
// ===============================================
|
||||||
|
|
@ -186,6 +197,7 @@ private[akka] object Running {
|
||||||
import BehaviorSetup._
|
import BehaviorSetup._
|
||||||
import InternalProtocol._
|
import InternalProtocol._
|
||||||
import Running.RunningState
|
import Running.RunningState
|
||||||
|
import Running.formatTimestamp
|
||||||
|
|
||||||
// Needed for WithSeqNrAccessible, when unstashing
|
// Needed for WithSeqNrAccessible, when unstashing
|
||||||
private var _currentSequenceNumber = 0L
|
private var _currentSequenceNumber = 0L
|
||||||
|
|
@ -315,9 +327,9 @@ private[akka] object Running {
|
||||||
} else {
|
} else {
|
||||||
if (log.isTraceEnabled)
|
if (log.isTraceEnabled)
|
||||||
log.traceN(
|
log.traceN(
|
||||||
"Received published replicated event [{}] with timestamp [{}] from replica [{}] seqNr [{}]",
|
"Received published replicated event [{}] with timestamp [{} (UTC)] from replica [{}] seqNr [{}]",
|
||||||
Logging.simpleName(event.event.getClass),
|
Logging.simpleName(event.event.getClass),
|
||||||
Helpers.timestamp(event.timestamp),
|
formatTimestamp(event.timestamp),
|
||||||
originReplicaId,
|
originReplicaId,
|
||||||
event.sequenceNumber)
|
event.sequenceNumber)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue