+rem #3504: Toned down error logging and made loglevel configurable
This commit is contained in:
parent
cdea2af973
commit
132c30d6cf
6 changed files with 54 additions and 32 deletions
|
|
@ -8,7 +8,7 @@ import akka.actor.SupervisorStrategy._
|
|||
import akka.actor.Terminated
|
||||
import akka.actor._
|
||||
import akka.dispatch.sysmsg.SystemMessage
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
import akka.pattern.pipe
|
||||
import akka.remote.EndpointManager.{ ResendState, Link, Send }
|
||||
import akka.remote.EndpointWriter.{ StoppedReading, FlushAndStop }
|
||||
|
|
@ -352,13 +352,16 @@ private[remote] abstract class EndpointActor(
|
|||
|
||||
def inbound: Boolean
|
||||
|
||||
val eventPublisher = new EventPublisher(context.system, log, settings.LogRemoteLifecycleEvents)
|
||||
val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)
|
||||
|
||||
def publishError(reason: Throwable): Unit = {
|
||||
try
|
||||
eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound))
|
||||
catch { case NonFatal(e) ⇒ log.error(e, "Unable to publish error event to EventStream.") }
|
||||
}
|
||||
def publishError(reason: Throwable, logLevel: Logging.LogLevel): Unit =
|
||||
tryPublish(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound, logLevel))
|
||||
|
||||
def publishDisassociated(): Unit = tryPublish(DisassociatedEvent(localAddress, remoteAddress, inbound))
|
||||
|
||||
private def tryPublish(ev: AssociationEvent): Unit = try
|
||||
eventPublisher.notifyListeners(ev)
|
||||
catch { case NonFatal(e) ⇒ log.error(e, "Unable to publish error event to EventStream.") }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -433,7 +436,9 @@ private[remote] class EndpointWriter(
|
|||
|
||||
var lastAck: Option[Ack] = None
|
||||
|
||||
override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) }
|
||||
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
|
||||
case NonFatal(e) ⇒ publishAndThrow(e, Logging.ErrorLevel)
|
||||
}
|
||||
|
||||
val provider = RARP(extendedSystem).provider
|
||||
val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, log)
|
||||
|
|
@ -441,8 +446,11 @@ private[remote] class EndpointWriter(
|
|||
var inbound = handle.isDefined
|
||||
var stopReason: DisassociateInfo = AssociationHandle.Unknown
|
||||
|
||||
private def publishAndThrow(reason: Throwable): Nothing = {
|
||||
publishError(reason)
|
||||
private def publishAndThrow(reason: Throwable, logLevel: Logging.LogLevel): Nothing = {
|
||||
reason match {
|
||||
case _: EndpointDisassociatedException ⇒ publishDisassociated()
|
||||
case _ ⇒ publishError(reason, logLevel)
|
||||
}
|
||||
throw reason
|
||||
}
|
||||
|
||||
|
|
@ -478,9 +486,9 @@ private[remote] class EndpointWriter(
|
|||
stash()
|
||||
stay()
|
||||
case Event(Status.Failure(e: InvalidAssociationException), _) ⇒
|
||||
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e))
|
||||
publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e), Logging.WarningLevel)
|
||||
case Event(Status.Failure(e), _) ⇒
|
||||
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e))
|
||||
publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e), Logging.DebugLevel)
|
||||
case Event(inboundHandle: AkkaProtocolHandle, _) ⇒
|
||||
// Assert handle == None?
|
||||
context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid)
|
||||
|
|
@ -537,9 +545,12 @@ private[remote] class EndpointWriter(
|
|||
throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
|
||||
}
|
||||
} catch {
|
||||
case e: NotSerializableException ⇒ logAndStay(e)
|
||||
case e: EndpointException ⇒ publishAndThrow(e)
|
||||
case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e))
|
||||
case e: NotSerializableException ⇒
|
||||
logAndStay(e)
|
||||
case e: EndpointException ⇒
|
||||
publishAndThrow(e, Logging.ErrorLevel)
|
||||
case NonFatal(e) ⇒
|
||||
publishAndThrow(new EndpointException("Failed to write message to the transport", e), Logging.ErrorLevel)
|
||||
}
|
||||
|
||||
// We are in Writing state, so stash is empty, safe to stop here
|
||||
|
|
@ -568,7 +579,7 @@ private[remote] class EndpointWriter(
|
|||
|
||||
whenUnhandled {
|
||||
case Event(Terminated(r), _) if r == reader.orNull ⇒
|
||||
publishAndThrow(new EndpointDisassociatedException("Disassociated"))
|
||||
publishAndThrow(new EndpointDisassociatedException("Disassociated"), Logging.DebugLevel)
|
||||
case Event(s: StopReading, _) ⇒
|
||||
reader match {
|
||||
case Some(r) ⇒ r forward s
|
||||
|
|
|
|||
|
|
@ -11,6 +11,9 @@ import scala.collection.immutable
|
|||
import akka.util.Helpers.Requiring
|
||||
import akka.japi.Util._
|
||||
import akka.actor.Props
|
||||
import akka.event.Logging
|
||||
import akka.event.Logging.LogLevel
|
||||
import akka.ConfigurationException
|
||||
|
||||
final class RemoteSettings(val config: Config) {
|
||||
import config._
|
||||
|
|
@ -22,7 +25,16 @@ final class RemoteSettings(val config: Config) {
|
|||
|
||||
val UntrustedMode: Boolean = getBoolean("akka.remote.untrusted-mode")
|
||||
|
||||
val LogRemoteLifecycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events")
|
||||
val RemoteLifecycleEventsLogLevel: LogLevel = getString("akka.remote.log-remote-lifecycle-events").toLowerCase() match {
|
||||
case "on" ⇒ Logging.DebugLevel
|
||||
case other ⇒ Logging.levelFor(other) match {
|
||||
case Some(level) ⇒ level
|
||||
case None ⇒ throw new ConfigurationException("Logging level must be one of (on, off, debug, info, warning, error)")
|
||||
}
|
||||
}
|
||||
|
||||
@deprecated("Use the RemoteLifecycleEventsLogLevel field instead.")
|
||||
def LogRemoteLifecycleEvents: Boolean = RemoteLifecycleEventsLogLevel >= Logging.ErrorLevel
|
||||
|
||||
val Dispatcher: String = getString("akka.remote.use-dispatcher")
|
||||
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.remote
|
|||
|
||||
import akka.AkkaException
|
||||
import akka.actor._
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.util.control.NoStackTrace
|
||||
|
|
|
|||
|
|
@ -123,7 +123,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
|||
override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote)
|
||||
|
||||
val log: LoggingAdapter = Logging(system.eventStream, "Remoting")
|
||||
val eventPublisher = new EventPublisher(system, log, LogRemoteLifecycleEvents)
|
||||
val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel)
|
||||
|
||||
private def notifyError(msg: String, cause: Throwable): Unit =
|
||||
eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause)))
|
||||
|
|
@ -370,7 +370,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem]
|
||||
val endpointId: Iterator[Int] = Iterator from 0
|
||||
|
||||
val eventPublisher = new EventPublisher(context.system, log, settings.LogRemoteLifecycleEvents)
|
||||
val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel)
|
||||
|
||||
// Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections
|
||||
// will be not part of this map!
|
||||
|
|
@ -389,7 +389,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
override val supervisorStrategy =
|
||||
OneForOneStrategy(loggingEnabled = false) {
|
||||
case InvalidAssociation(localAddress, remoteAddress, _) ⇒
|
||||
log.error("Tried to associate with unreachable remote address [{}]. " +
|
||||
log.warning("Tried to associate with unreachable remote address [{}]. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
|
||||
context.system.eventStream.publish(AddressTerminated(remoteAddress))
|
||||
|
|
@ -398,7 +398,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
log.error("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " +
|
||||
log.warning("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " +
|
||||
"messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " +
|
||||
"from this situation.", remoteAddress, uid)
|
||||
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
|
||||
|
|
@ -410,7 +410,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
|
|||
case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒
|
||||
settings.QuarantineDuration match {
|
||||
case d: FiniteDuration ⇒
|
||||
log.error("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
log.warning("Association to [{}] with unknown UID is irrecoverably failed. " +
|
||||
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
|
||||
endpoints.markAsFailed(sender, Deadline.now + d)
|
||||
case _ ⇒
|
||||
|
|
|
|||
|
|
@ -50,9 +50,9 @@ final case class AssociationErrorEvent(
|
|||
cause: Throwable,
|
||||
localAddress: Address,
|
||||
remoteAddress: Address,
|
||||
inbound: Boolean) extends AssociationEvent {
|
||||
inbound: Boolean,
|
||||
logLevel: Logging.LogLevel) extends AssociationEvent {
|
||||
protected override def eventName: String = "AssociationError"
|
||||
override def logLevel: Logging.LogLevel = Logging.ErrorLevel
|
||||
override def toString: String = s"${super.toString}: Error [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]"
|
||||
def getCause: Throwable = cause
|
||||
}
|
||||
|
|
@ -81,9 +81,9 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) {
|
||||
private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logLevel: Logging.LogLevel) {
|
||||
def notifyListeners(message: RemotingLifecycleEvent): Unit = {
|
||||
system.eventStream.publish(message)
|
||||
if (logEvents) log.log(message.logLevel, "{}", message)
|
||||
if (logLevel <= message.logLevel) log.log(message.logLevel, "{}", message)
|
||||
}
|
||||
}
|
||||
|
|
@ -204,11 +204,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
|||
expectMsg(("pong", testActor))
|
||||
}
|
||||
|
||||
"send error message for wrong address" in {
|
||||
filterEvents(EventFilter.error(start = "AssociationError", occurrences = 1),
|
||||
EventFilter.error(pattern = "Address is now quarantined", occurrences = 1)) {
|
||||
system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
||||
}
|
||||
"send warning message for wrong address" in {
|
||||
filterEvents(EventFilter.warning(pattern = "Address is now quarantined", occurrences = 1)) {
|
||||
system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping"
|
||||
}
|
||||
}
|
||||
|
||||
"support ask" in {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue