diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 900ba9cee1..c050d479cc 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -49,8 +49,7 @@ import akka.util.OptionVal * INTERNAL API */ @InternalApi private[typed] final class ActorAdapter[T](_initialBehavior: Behavior[T], rethrowTypedFailure: Boolean) - extends classic.Actor - with classic.ActorLogging { + extends classic.Actor { private var behavior: Behavior[T] = _initialBehavior def currentBehavior: Behavior[T] = behavior @@ -181,7 +180,7 @@ import akka.util.OptionVal case Success(a) => body(a) case Failure(ex) => - log.error(ex, s"Exception thrown out of adapter. Stopping myself. ${ex.getMessage}") + ctx.log.error(s"Exception thrown out of adapter. Stopping myself. ${ex.getMessage}", ex) context.stop(self) } } @@ -220,7 +219,7 @@ import akka.util.OptionVal case e => e.getMessage } // log at Error as that is what the supervision strategy would have done. - log.error(ex, logMessage) + ctx.log.error(logMessage, ex) if (isTypedActor) classic.SupervisorStrategy.Stop else diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 251d695bd0..c4af856961 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -270,6 +270,11 @@ trait LoggingBus extends ActorEventBus { def getClazz(t: T): Class[_] = t.getClass } +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class ActorWithLogClass(actor: Actor, logClass: Class[_]) + /** * This is a “marker” class which is inserted as originator class into * [[akka.event.Logging.LogEvent]] when the string representation was supplied @@ -319,6 +324,16 @@ object LogSource { } } + /** + * INTERNAL API + */ + @InternalApi private[akka] implicit val fromActorWithLoggerClass: LogSource[ActorWithLogClass] = + new LogSource[ActorWithLogClass] { + def genString(a: ActorWithLogClass) = fromActor.genString(a.actor) + override def genString(a: ActorWithLogClass, system: ActorSystem) = fromActor.genString(a.actor, system) + override def getClazz(a: ActorWithLogClass): Class[_] = a.logClass + } + // this one unfortunately does not work as implicit, because existential types have some weird behavior val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] { def genString(c: Class[_]): String = Logging.simpleName(c) diff --git a/akka-actor/src/main/scala/akka/io/Dns.scala b/akka-actor/src/main/scala/akka/io/Dns.scala index 5d009de77f..7119d0d9d4 100644 --- a/akka-actor/src/main/scala/akka/io/Dns.scala +++ b/akka-actor/src/main/scala/akka/io/Dns.scala @@ -14,8 +14,9 @@ import com.typesafe.config.Config import java.util.function.{ Function => JFunction } import akka.util.unused - import scala.collection.immutable + +import akka.event.Logging import akka.util.ccompat._ @ccompatUsedUntil213 @@ -117,7 +118,9 @@ class DnsExt private[akka] (val system: ExtendedActorSystem, resolverName: Strin val settings = new Settings(system.settings.config.getConfig("akka.io.dns"), "async-dns") val provider = system.dynamicAccess.createInstanceFor[DnsProvider](settings.ProviderObjectName, Nil).get - system.log.info("Creating async dns resolver {} with manager name {}", settings.Resolver, managerName) + Logging(system, 
classOf[DnsExt]) + .info("Creating async dns resolver {} with manager name {}", settings.Resolver, managerName) + system.systemActorOf( props = Props( provider.managerClass, diff --git a/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala b/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala index 9bf4486701..f7d0d04061 100644 --- a/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala +++ b/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala @@ -18,11 +18,12 @@ import akka.util.JavaDurationConverters._ import akka.util.ccompat.JavaConverters._ import akka.util.ccompat._ import com.typesafe.config.{ Config, ConfigValueType } - import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.util.{ Failure, Success, Try } +import akka.event.Logging + /** INTERNAL API */ @InternalApi @ccompatUsedUntil213 @@ -76,8 +77,9 @@ private[dns] final class DnsSettings(system: ExtendedActorSystem, c: Config) { parsed match { case Success(value) => Some(value) case Failure(exception) => - if (system.log.isWarningEnabled) { - system.log.error(exception, "Error parsing /etc/resolv.conf, ignoring.") + val log = Logging(system, getClass) + if (log.isWarningEnabled) { + log.error(exception, "Error parsing /etc/resolv.conf, ignoring.") } None } diff --git a/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala index 2ae2fd11f6..fecf5857cb 100644 --- a/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala @@ -10,6 +10,8 @@ import akka.actor.ExtendedActorSystem import scala.concurrent.duration.Duration import scala.concurrent.{ Await, Future } +import akka.event.Logging + /** * Serializer that supports async serialization. * @@ -38,15 +40,18 @@ trait AsyncSerializer { abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem) extends SerializerWithStringManifest with AsyncSerializer { + + private val log = Logging(system, getClass) + final override def toBinary(o: AnyRef): Array[Byte] = { - system.log.warning( + log.warning( "Async serializer called synchronously. This will block. Async serializers should only be used for akka persistence plugins that support them. Class: {}", o.getClass) Await.result(toBinaryAsync(o), Duration.Inf) } final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { - system.log.warning( + log.warning( "Async serializer called synchronously. This will block. Async serializers should only be used for akka persistence plugins that support them. 
Manifest: [{}]", manifest) Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf) diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsExtension.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsExtension.scala index 3e6c506efb..e0d1bde6da 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsExtension.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsExtension.scala @@ -49,7 +49,7 @@ class ClusterMetricsExtension(system: ExtendedActorSystem) extends Extension { SupervisorStrategyProvider, immutable.Seq(classOf[Config] -> SupervisorStrategyConfiguration)) .getOrElse { - val log: LoggingAdapter = Logging(system, getClass.getName) + val log: LoggingAdapter = Logging(system, getClass) log.error(s"Configured strategy provider ${SupervisorStrategyProvider} failed to load, using default ${classOf[ ClusterMetricsStrategy].getName}.") new ClusterMetricsStrategy(SupervisorStrategyConfiguration) diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala index 6e01a424b6..a803da86e4 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/MetricsCollector.scala @@ -47,7 +47,7 @@ private[metrics] object MetricsCollector { /** Try to create collector instance in the order of priority. */ def apply(system: ActorSystem): MetricsCollector = { - val log = Logging(system, getClass.getName) + val log = Logging(system, getClass) val settings = ClusterMetricsSettings(system.settings.config) import settings._ diff --git a/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes/issue-27922-logging.excludes b/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes/issue-27922-logging.excludes new file mode 100644 index 0000000000..7c46f333c3 --- /dev/null +++ b/akka-cluster/src/main/mima-filters/2.5.x.backwards.excludes/issue-27922-logging.excludes @@ -0,0 +1,8 @@ +# #27922 More structured logger names, remove usage of ActorLogging +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterHeartbeatSender") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterHeartbeatReceiver") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterHeartbeatReceiver.log") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.CrossDcHeartbeatSender") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.CrossDcHeartbeatSender.log") +ProblemFilters.exclude[MissingTypesProblem]("akka.cluster.ClusterDomainEventPublisher") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterDomainEventPublisher.log") diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index a9b27a3f2a..4317057bbc 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -95,11 +95,13 @@ akka { # until the cluster has reached a certain size. min-nr-of-members = 1 - # Enable/disable info level logging of cluster events + # Enable/disable info level logging of cluster events. + # These are logged with logger name `akka.cluster.Cluster`. log-info = on # Enable/disable verbose info-level logging of cluster events # for temporary troubleshooting. Defaults to 'off'. + # These are logged with logger name `akka.cluster.Cluster`. 
log-info-verbose = off # Enable or disable JMX MBeans for management of the cluster @@ -260,7 +262,8 @@ akka { } debug { - # log heartbeat events (very verbose, useful mostly when debugging heartbeating issues) + # Log heartbeat events (very verbose, useful mostly when debugging heartbeating issues). + # These are logged with logger name `akka.cluster.ClusterHeartbeat`. verbose-heartbeat-logging = off # log verbose details about gossip diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 89b5d73365..f6b87bf214 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -101,7 +101,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { scala.collection.JavaConverters.setAsJavaSetConverter(selfRoles).asJava private val _isTerminated = new AtomicBoolean(false) - private val log = Logging(system, getClass.getName) + private val log = Logging(system, ClusterLogClass.ClusterCore) // ClusterJmx is initialized as the last thing in the constructor private var clusterJmx: Option[ClusterJmx] = None @@ -463,7 +463,12 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { /** * INTERNAL API */ - private[cluster] object ClusterLogger { + private[cluster] object ClusterLogger extends ClusterLogger(log) + + /** + * INTERNAL API + */ + private[cluster] class ClusterLogger(log: LoggingAdapter) { def isDebugEnabled: Boolean = log.isDebugEnabled diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index c5f056a96e..7db2ba1adf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -22,6 +22,8 @@ import scala.concurrent.Future import scala.concurrent.Promise import scala.util.control.NonFatal +import akka.event.ActorWithLogClass +import akka.event.Logging import com.github.ghik.silencer.silent /** @@ -320,6 +322,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val selfDc = cluster.selfDataCenter + private val gossipLogger = + new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterGossip))) + protected def selfUniqueAddress = cluster.selfUniqueAddress val vclockNode = VectorClock.Node(Gossip.vclockName(selfUniqueAddress)) @@ -941,9 +946,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def receiveGossipStatus(status: GossipStatus): Unit = { val from = status.from if (!latestGossip.hasMember(from)) - logInfo("Ignoring received gossip status from unknown [{}]", from) + gossipLogger.logInfo("Ignoring received gossip status from unknown [{}]", from) else if (!latestGossip.isReachable(selfUniqueAddress, from)) - logInfo("Ignoring received gossip status from unreachable [{}] ", from) + gossipLogger.logInfo("Ignoring received gossip status from unreachable [{}] ", from) else { status.version.compareTo(latestGossip.version) match { case VectorClock.Same => // same version @@ -973,19 +978,22 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val localGossip = latestGossip if (remoteGossip eq Gossip.empty) { - logDebug("Ignoring received gossip from [{}] to protect against overload", from) + gossipLogger.logDebug("Ignoring received gossip from [{}] to protect against overload", from) Ignored } else if (envelope.to != 
selfUniqueAddress) { - logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to) + gossipLogger.logInfo( + "Ignoring received gossip intended for someone else, from [{}] to [{}]", + from.address, + envelope.to) Ignored } else if (!localGossip.hasMember(from)) { - logInfo("Ignoring received gossip from unknown [{}]", from) + gossipLogger.logInfo("Ignoring received gossip from unknown [{}]", from) Ignored } else if (!localGossip.isReachable(selfUniqueAddress, from)) { - logInfo("Ignoring received gossip from unreachable [{}] ", from) + gossipLogger.logInfo("Ignoring received gossip from unreachable [{}] ", from) Ignored } else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) { - logInfo("Ignoring received gossip that does not contain myself, from [{}]", from) + gossipLogger.logInfo("Ignoring received gossip that does not contain myself, from [{}]", from) Ignored } else { val comparison = remoteGossip.version.compareTo(localGossip.version) @@ -1010,14 +1018,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh // Removal of member itself is handled in merge (pickHighestPriority) val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) => if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) { - logDebug("Pruned conflicting local gossip: {}", m) + gossipLogger.logDebug("Pruned conflicting local gossip: {}", m) g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) } else g } val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) => if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) { - logDebug("Pruned conflicting remote gossip: {}", m) + gossipLogger.logDebug("Pruned conflicting remote gossip: {}", m) g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) } else g @@ -1043,10 +1051,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } } - logDebug("Receiving gossip from [{}]", from) + gossipLogger.logDebug("Receiving gossip from [{}]", from) if (comparison == VectorClock.Concurrent && cluster.settings.Debug.VerboseGossipLogging) { - logDebug( + gossipLogger.logDebug( """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", remoteGossip, localGossip, @@ -1127,7 +1135,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh gossipTo(peer) case None => // nothing to see here if (cluster.settings.Debug.VerboseGossipLogging) - logDebug("will not gossip this round") + gossipLogger.logDebug("will not gossip this round") } } @@ -1321,7 +1329,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh if (targets.nonEmpty) { if (isDebugEnabled) - logDebug( + gossipLogger.logDebug( "Gossip exiting members [{}] to the two oldest (per role) [{}] (singleton optimization).", exitingMembers.mkString(", "), targets.mkString(", ")) @@ -1479,7 +1487,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def publishMembershipState(): Unit = { if (cluster.settings.Debug.VerboseGossipLogging) - logDebug("New gossip published [{}]", membershipState.latestGossip) + gossipLogger.logDebug("New gossip published [{}]", membershipState.latestGossip) publisher ! 
PublishChanges(membershipState) if (PublishStatsInterval == Duration.Zero) publishInternalStats() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index d41897a3cd..225dcea320 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -7,7 +7,7 @@ package akka.cluster import language.postfixOps import scala.collection.immutable import scala.collection.immutable.{ SortedSet, VectorBuilder } -import akka.actor.{ Actor, ActorLogging, ActorRef, Address } +import akka.actor.{ Actor, ActorRef, Address } import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ @@ -572,7 +572,6 @@ object ClusterEvent { */ private[cluster] final class ClusterDomainEventPublisher extends Actor - with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 0cccbda055..0cebc860f3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -8,20 +8,20 @@ import java.util.concurrent.TimeUnit import scala.annotation.tailrec import scala.collection.immutable -import akka.actor.{ - Actor, - ActorLogging, - ActorPath, - ActorSelection, - Address, - DeadLetterSuppression, - Props, - RootActorPath -} + +import akka.actor.Actor +import akka.actor.ActorPath +import akka.actor.ActorSelection +import akka.actor.Address +import akka.actor.DeadLetterSuppression +import akka.actor.Props +import akka.actor.RootActorPath +import akka.annotation.InternalApi import akka.cluster.ClusterEvent._ +import akka.event.ActorWithLogClass +import akka.event.Logging import akka.remote.FailureDetectorRegistry import akka.remote.HeartbeatMessage -import akka.annotation.InternalApi import akka.util.ccompat._ /** @@ -31,7 +31,7 @@ import akka.util.ccompat._ */ @InternalApi @ccompatUsedUntil213 -private[cluster] final class ClusterHeartbeatReceiver(getCluster: () => Cluster) extends Actor with ActorLogging { +private[cluster] final class ClusterHeartbeatReceiver(getCluster: () => Cluster) extends Actor { import ClusterHeartbeatSender._ // Important - don't use Cluster(context.system) in constructor because that would @@ -40,10 +40,13 @@ private[cluster] final class ClusterHeartbeatReceiver(getCluster: () => Cluster) lazy val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging + private lazy val clusterLogger = + new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) + def receive: Receive = { case hb: Heartbeat => // TODO log the sequence nr once serializer is enabled - if (verboseHeartbeat) cluster.ClusterLogger.logDebug("Heartbeat from [{}]", hb.from) + if (verboseHeartbeat) clusterLogger.logDebug("Heartbeat from [{}]", hb.from) sender() ! HeartbeatRsp(cluster.selfUniqueAddress, hb.sequenceNr, hb.creationTimeNanos) } @@ -93,16 +96,21 @@ private[cluster] object ClusterHeartbeatSender { * a few other nodes, which will reply and then this actor updates the * failure detector. 
*/ -private[cluster] class ClusterHeartbeatSender extends Actor with ActorLogging { +private[cluster] class ClusterHeartbeatSender extends Actor { import ClusterHeartbeatSender._ val cluster = Cluster(context.system) - import cluster.ClusterLogger._ val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging - import cluster.{ scheduler, selfAddress, selfUniqueAddress } + import cluster.scheduler + import cluster.selfAddress + import cluster.selfUniqueAddress import cluster.settings._ import context.dispatcher + private val clusterLogger = + new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) + import clusterLogger._ + val filterInternalClusterMembers: Member => Boolean = _.dataCenter == cluster.selfDataCenter diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterLogClass.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterLogClass.scala new file mode 100644 index 0000000000..30092c94a4 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterLogClass.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster + +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi private[akka] object ClusterLogClass { + + val ClusterCore: Class[Cluster] = classOf[Cluster] + val ClusterHeartbeat: Class[ClusterHeartbeat] = classOf[ClusterHeartbeat] + val ClusterGossip: Class[ClusterGossip] = classOf[ClusterGossip] + +} + +/** + * INTERNAL API: Logger class for (verbose) heartbeat logging. + */ +@InternalApi private[akka] class ClusterHeartbeat + +/** + * INTERNAL API: Logger class for (verbose) gossip logging. + */ +@InternalApi private[akka] class ClusterGossip diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 41263e2018..4a1c9a5615 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -14,6 +14,8 @@ import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ClusterEvent.MemberWeaklyUp import akka.dispatch.Dispatchers +import akka.event.ActorWithLogClass +import akka.event.Logging import akka.remote.FailureDetectorRegistry import akka.remote.RemoteSettings import akka.remote.RemoteWatcher @@ -66,6 +68,8 @@ private[cluster] class ClusterRemoteWatcher( val cluster = Cluster(context.system) import cluster.selfAddress + override val log = Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterCore)) + private var pendingDelayedQuarantine: Set[UniqueAddress] = Set.empty var clusterNodes: Set[Address] = Set.empty diff --git a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala index 2b63123a64..adea65ffc4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/CrossDcClusterHeartbeat.scala @@ -4,17 +4,19 @@ package akka.cluster -import akka.actor.{ Actor, ActorLogging, ActorSelection, Address, NoSerializationVerificationNeeded } +import akka.actor.{ Actor, ActorSelection, Address, NoSerializationVerificationNeeded } import akka.annotation.InternalApi import akka.cluster.ClusterEvent._ import akka.cluster.ClusterSettings.DataCenter import akka.remote.FailureDetectorRegistry import akka.util.ConstantFun import akka.util.ccompat._ - 
import scala.collection.SortedSet import scala.collection.immutable +import akka.event.ActorWithLogClass +import akka.event.Logging + /** * INTERNAL API * @@ -34,17 +36,20 @@ import scala.collection.immutable */ @InternalApi @ccompatUsedUntil213 -private[cluster] class CrossDcHeartbeatSender extends Actor with ActorLogging { +private[cluster] class CrossDcHeartbeatSender extends Actor { import CrossDcHeartbeatSender._ val cluster = Cluster(context.system) - import cluster.ClusterLogger._ val verboseHeartbeat = cluster.settings.Debug.VerboseHeartbeatLogging import cluster.settings._ import cluster.{ scheduler, selfAddress, selfDataCenter, selfUniqueAddress } import context.dispatcher + private val clusterLogger = + new cluster.ClusterLogger(Logging(context.system, ActorWithLogClass(this, ClusterLogClass.ClusterHeartbeat))) + import clusterLogger._ + // For inspecting if in active state; allows avoiding "becoming active" when already active var activelyMonitoring = false @@ -78,7 +83,8 @@ private[cluster] class CrossDcHeartbeatSender extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) - if (verboseHeartbeat) log.debug("Initialized cross-dc heartbeat sender as DORMANT in DC: [{}]", selfDataCenter) + if (verboseHeartbeat) + clusterLogger.logDebug("Initialized cross-dc heartbeat sender as DORMANT in DC: [{}]", selfDataCenter) } override def postStop(): Unit = { @@ -143,7 +149,7 @@ private[cluster] class CrossDcHeartbeatSender extends Actor with ActorLogging { // since we only monitor nodes in Up or later states, due to the n-th oldest requirement dataCentersState = dataCentersState.addMember(m) if (verboseHeartbeat && m.dataCenter != selfDataCenter) - log.debug("Register member {} for cross DC heartbeat (will only heartbeat if oldest)", m) + clusterLogger.logDebug("Register member {} for cross DC heartbeat (will only heartbeat if oldest)", m) becomeActiveIfResponsibleForHeartbeat() } @@ -192,14 +198,14 @@ private[cluster] class CrossDcHeartbeatSender extends Actor with ActorLogging { /** Idempotent, become active if this node is n-th oldest and should monitor other nodes */ private def becomeActiveIfResponsibleForHeartbeat(): Unit = { if (!activelyMonitoring && selfIsResponsibleForCrossDcHeartbeat()) { - log.info( + logInfo( "Cross DC heartbeat becoming ACTIVE on this node (for DC: {}), monitoring other DCs oldest nodes", selfDataCenter) activelyMonitoring = true context.become(active.orElse(introspecting)) } else if (!activelyMonitoring) - if (verboseHeartbeat) log.info("Remaining DORMANT; others in {} handle heartbeating other DCs", selfDataCenter) + if (verboseHeartbeat) logInfo("Remaining DORMANT; others in {} handle heartbeating other DCs", selfDataCenter) } } diff --git a/akka-docs/src/main/paradox/typed/logging.md b/akka-docs/src/main/paradox/typed/logging.md index 6bdd7c1f15..c1c624b824 100644 --- a/akka-docs/src/main/paradox/typed/logging.md +++ b/akka-docs/src/main/paradox/typed/logging.md @@ -253,6 +253,9 @@ akka { The `stdout-loglevel` is only in effect during system startup and shutdown, and setting it to `OFF` as well, ensures that nothing gets logged during system startup or shutdown. +See @ref:[Logger names](#logger-names) for configuration of log level in SLF4J backend for certain +modules of Akka. + ### Logging to stdout during startup and shutdown When the actor system is starting up and shutting down the configured `loggers` are not used. 
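The documentation hunk above describes `akka.stdout-loglevel`, which only governs logging while the `ActorSystem` is starting up or shutting down. A minimal sketch of that configuration applied in code (the system name "example" and the inline settings are illustrative only, not part of the patch):

```
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object StdoutLogLevelExample extends App {
  // stdout-loglevel applies only during ActorSystem startup and shutdown,
  // when the configured loggers are not yet (or no longer) running;
  // setting it to OFF silences that window entirely.
  val config = ConfigFactory.parseString("""
    akka.loglevel = "DEBUG"
    akka.stdout-loglevel = "OFF"
  """).withFallback(ConfigFactory.load())

  val system = ActorSystem("example", config)
  system.terminate()
}
```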
@@ -394,6 +397,36 @@ With Logback the timestamp is available with `%X{akkaTimestamp}` specifier within ``` +### Logger names + +It can be useful to enable debug level or other SLF4J backend configuration for certain modules of Akka when +troubleshooting. Those logger names are typically prefixed with the package name of the classes in that module. +For example, in Logback the configuration may look like this to enable debug logging for Cluster Sharding: + +``` +<configuration> +  <logger name="akka.cluster.sharding" level="DEBUG" /> +</configuration> +``` + +Other examples of logger names or prefixes: + +``` +akka.cluster +akka.cluster.Cluster +akka.cluster.ClusterHeartbeat +akka.cluster.ClusterGossip +akka.cluster.ddata +akka.cluster.pubsub +akka.cluster.singleton +akka.cluster.sharding +akka.coordination.lease +akka.discovery +akka.persistence +akka.remote +``` ## Logging in tests diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index 258e5258ef..16c749a131 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -428,7 +428,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP Server, controllerPort, settings.ServerSocketWorkerPoolSize, - new ConductorHandler(settings.QueryTimeout, self, Logging(context.system, classOf[ConductorHandler].getName))) + new ConductorHandler(settings.QueryTimeout, self, Logging(context.system, classOf[ConductorHandler]))) /* * Supervision of the BarrierCoordinator means to catch all his bad emotions diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 6d435124e4..9a77709a6e 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -179,7 +179,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) settings.ReconnectBackoff, settings.ClientSocketWorkerPoolSize, self, - Logging(context.system, classOf[PlayerHandler].getName), + Logging(context.system, classOf[PlayerHandler]), context.system.scheduler)(context.dispatcher) startWith(Connecting, Data(None, None)) diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala index dcb7dd50d1..755160b7c3 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/AllPersistenceIdsStage.scala @@ -40,6 +40,8 @@ final private[akka] class AllPersistenceIdsStage(liveQuery: Boolean, writeJourna val journal: ActorRef = Persistence(eagerMaterializer.system).journalFor(writeJournalPluginId) var initialResponseReceived = false + override protected def logSource: Class[_] = classOf[AllPersistenceIdsStage] + override def preStart(): Unit = { journal.tell(LeveldbJournal.SubscribeAllPersistenceIds, getStageActor(journalInteraction).ref) } diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala 
b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala index 82cebafe81..8cc34c5383 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByPersistenceIdStage.scala @@ -68,6 +68,8 @@ final private[akka] class EventsByPersistenceIdStage( var nextSequenceNr = fromSequenceNr var toSequenceNr = initialToSequenceNr + override protected def logSource: Class[_] = classOf[EventsByPersistenceIdStage] + override def preStart(): Unit = { stageActorRef = getStageActor(journalInteraction).ref refreshInterval.foreach(fd => { @@ -117,6 +119,7 @@ final private[akka] class EventsByPersistenceIdStage( if (highestSeqNr < toSequenceNr && isCurrentQuery()) { toSequenceNr = highestSeqNr } + log.debug( "Replay complete. From sequenceNr {} currentSequenceNr {} toSequenceNr {} buffer size {}", fromSequenceNr, diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala index 6d410a3bfb..31abc71104 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/journal/leveldb/EventsByTagStage.scala @@ -67,6 +67,8 @@ final private[leveldb] class EventsByTagStage( var replayInProgress = false var outstandingReplay = false + override protected def logSource: Class[_] = classOf[EventsByTagStage] + override def preStart(): Unit = { stageActorRef = getStageActor(journalInteraction).ref refreshInterval.foreach(fd => { diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index 2e6246ffe9..59b70f19a4 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -206,7 +206,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension { import Persistence._ - private def log: LoggingAdapter = Logging(system, getClass.getName) + private def log: LoggingAdapter = Logging(system, getClass) private val NoSnapshotStorePluginId = "akka.persistence.no-snapshot-store" diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala index d42a6d2f7b..d8fb0ad45b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/EventAdapters.scala @@ -109,7 +109,7 @@ private[akka] object EventAdapters { case (map, (c, s)) => map.put(c, s); map } - new EventAdapters(backing, bindings, system.log) + new EventAdapters(backing, bindings, Logging(system, classOf[EventAdapters])) } def instantiateAdapter(adapterFQN: String, system: ExtendedActorSystem): Try[EventAdapter] = { diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index fd2d63ebd5..dbabd646d2 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -156,7 +156,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc override def 
localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote) - val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) + val log: LoggingAdapter = Logging(system.eventStream, getClass) val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel) private def notifyError(msg: String, cause: Throwable): Unit = diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 462b64a962..b1ad6b07fa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -308,7 +308,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ - override val log: LoggingAdapter = Logging(system, getClass.getName) + override val log: LoggingAdapter = Logging(system, getClass) /** * Compression tables must be created once, such that inbound lane restarts don't cause dropping of the tables. diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index b9b446b4ac..3ac17f04e0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -142,7 +142,7 @@ private[remote] class Association( require(remoteAddress.port.nonEmpty) - private val log = Logging(transport.system, getClass.getName) + private val log = Logging(transport.system, getClass) private def flightRecorder = transport.topLevelFlightRecorder override def settings = transport.settings diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 4900022450..66752b0368 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -68,7 +68,7 @@ private[remote] class FailureInjectorTransportAdapter( with AssociationEventListener { private def rng = ThreadLocalRandom.current() - private val log = Logging(extendedSystem, getClass.getName) + private val log = Logging(extendedSystem, getClass) private val shouldDebugLog: Boolean = extendedSystem.settings.config.getBoolean("akka.remote.classic.gremlin.debug") @volatile private var upstreamListener: Option[AssociationEventListener] = None diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkStage.scala index 7502f4b952..4c84d420d1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkStage.scala @@ -29,6 +29,8 @@ final private[akka] class ActorRefSinkStage[T]( override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with StageLogging { + override protected def logSource: Class[_] = classOf[ActorRefSinkStage[_]] + var completionSignalled = false override def preStart(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala 
b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 2fe67c188e..be7c9d35ef 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -158,7 +158,7 @@ import org.reactivestreams.Subscriber primaryInputs.cancel() context.stop(self) case WarnTermination => - context.system.log.warning("Subscription timeout for {}", this) + log.warning("Subscription timeout for {}", this) case NoopTermination => // won't happen } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 4e3cf951fb..ec628c9509 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -1405,6 +1405,7 @@ private[stream] object Collect { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + override protected def logSource: Class[_] = classOf[Watch[_]] private lazy val self = getStageActor { case (_, Terminated(`targetRef`)) => diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala index 2efad288a1..88d826a7d4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSource.scala @@ -46,6 +46,8 @@ private[akka] final class InputStreamSource(factory: () => InputStream, chunkSiz private var inputStream: InputStream = _ private def isClosed = mat.isCompleted + override protected def logSource: Class[_] = classOf[InputStreamSource] + override def preStart(): Unit = { try { inputStream = factory() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala index 1cd47d7945..bef5dbfb55 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala @@ -33,6 +33,9 @@ private[akka] final class OutputStreamGraphStage(factory: () => OutputStream, au val logic = new GraphStageLogicWithLogging(shape) with InHandler { var outputStream: OutputStream = _ var bytesWritten: Long = 0L + + override protected def logSource: Class[_] = classOf[OutputStreamGraphStage] + override def preStart(): Unit = { try { outputStream = factory() diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala index d9be4f9b81..42dbdb8962 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SinkRefImpl.scala @@ -59,6 +59,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (val initialPartn eagerMaterializer: Materializer): (GraphStageLogic, SourceRef[In]) = { val logic = new TimerGraphStageLogic(shape) with StageLogging with ActorRefStage with InHandler { + override protected def logSource: Class[_] = classOf[SinkRefStageImpl[_]] private[this] val streamRefsMaster = StreamRefsMaster(eagerMaterializer.system) diff --git a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala index 
3738fcd395..bb9140478b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/streamref/SourceRefImpl.scala @@ -95,6 +95,8 @@ private[stream] final class SourceRefStageImpl[Out](val initialPartnerRef: Optio eagerMaterializer: Materializer): (GraphStageLogic, SinkRef[Out]) = { val logic = new TimerGraphStageLogic(shape) with StageLogging with ActorRefStage with OutHandler { + override protected def logSource: Class[_] = classOf[SourceRefStageImpl[_]] + private[this] val streamRefsMaster = StreamRefsMaster(eagerMaterializer.system) // settings --- diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala index a55fc87935..fb88cc77d0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala @@ -231,6 +231,8 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( // don't want to restart the sub inlet when it finishes, we just finish normally. var finishing = false + override protected def logSource: Class[_] = classOf[RestartWithBackoffLogic[_]] + protected def startGraph(): Unit protected def backoff(): Unit @@ -385,6 +387,7 @@ object RestartWithBackoffFlow { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + override protected def logSource: Class[_] = classOf[DelayCancellationStage[_]] private var cause: OptionVal[Throwable] = OptionVal.None diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index dafa5d8737..d9ba48f1c7 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -153,7 +153,7 @@ object CallingThreadDispatcher { class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher(_configurator) { import CallingThreadDispatcher._ - val log = akka.event.Logging(eventStream, getClass.getName) + val log = akka.event.Logging(eventStream, getClass) override def id: String = Id
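Taken together, these changes swap string-based log sources (`ActorLogging`, `system.log`, `Logging(system, getClass.getName)`) for class-based ones, so the resulting SLF4J logger names can be filtered per module as described in the documentation change. A minimal sketch of the resulting pattern for a non-actor component; `MyComponent` and its package are hypothetical, not part of the patch:

```
import akka.actor.ActorSystem
import akka.event.{ Logging, LoggingAdapter }

class MyComponent(system: ActorSystem) {
  // Passing the Class (rather than its name as a String) makes the logger name
  // the fully qualified class name, e.g. "com.example.MyComponent", which an
  // SLF4J backend can use to raise or lower the log level per module.
  private val log: LoggingAdapter = Logging(system, getClass)

  def start(): Unit =
    log.info("Starting {}", getClass.getName)
}
```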