diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index bee68f4cd1..0a4b9291c6 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -8,7 +8,7 @@ import akka.actor.SupervisorStrategy._ import akka.actor.Terminated import akka.actor._ import akka.dispatch.sysmsg.SystemMessage -import akka.event.LoggingAdapter +import akka.event.{ Logging, LoggingAdapter } import akka.pattern.pipe import akka.remote.EndpointManager.{ ResendState, Link, Send } import akka.remote.EndpointWriter.{ StoppedReading, FlushAndStop } @@ -352,13 +352,16 @@ private[remote] abstract class EndpointActor( def inbound: Boolean - val eventPublisher = new EventPublisher(context.system, log, settings.LogRemoteLifecycleEvents) + val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel) - def publishError(reason: Throwable): Unit = { - try - eventPublisher.notifyListeners(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound)) - catch { case NonFatal(e) ⇒ log.error(e, "Unable to publish error event to EventStream.") } - } + def publishError(reason: Throwable, logLevel: Logging.LogLevel): Unit = + tryPublish(AssociationErrorEvent(reason, localAddress, remoteAddress, inbound, logLevel)) + + def publishDisassociated(): Unit = tryPublish(DisassociatedEvent(localAddress, remoteAddress, inbound)) + + private def tryPublish(ev: AssociationEvent): Unit = try + eventPublisher.notifyListeners(ev) + catch { case NonFatal(e) ⇒ log.error(e, "Unable to publish error event to EventStream.") } } /** @@ -433,7 +436,9 @@ private[remote] class EndpointWriter( var lastAck: Option[Ack] = None - override val supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ publishAndThrow(e) } + override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { + case NonFatal(e) ⇒ publishAndThrow(e, Logging.ErrorLevel) + } val provider = RARP(extendedSystem).provider val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, log) @@ -441,8 +446,11 @@ private[remote] class EndpointWriter( var inbound = handle.isDefined var stopReason: DisassociateInfo = AssociationHandle.Unknown - private def publishAndThrow(reason: Throwable): Nothing = { - publishError(reason) + private def publishAndThrow(reason: Throwable, logLevel: Logging.LogLevel): Nothing = { + reason match { + case _: EndpointDisassociatedException ⇒ publishDisassociated() + case _ ⇒ publishError(reason, logLevel) + } throw reason } @@ -478,9 +486,9 @@ private[remote] class EndpointWriter( stash() stay() case Event(Status.Failure(e: InvalidAssociationException), _) ⇒ - publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e)) + publishAndThrow(new InvalidAssociation(localAddress, remoteAddress, e), Logging.WarningLevel) case Event(Status.Failure(e), _) ⇒ - publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e)) + publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e), Logging.DebugLevel) case Event(inboundHandle: AkkaProtocolHandle, _) ⇒ // Assert handle == None? context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid) @@ -537,9 +545,12 @@ private[remote] class EndpointWriter( throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.") } } catch { - case e: NotSerializableException ⇒ logAndStay(e) - case e: EndpointException ⇒ publishAndThrow(e) - case NonFatal(e) ⇒ publishAndThrow(new EndpointException("Failed to write message to the transport", e)) + case e: NotSerializableException ⇒ + logAndStay(e) + case e: EndpointException ⇒ + publishAndThrow(e, Logging.ErrorLevel) + case NonFatal(e) ⇒ + publishAndThrow(new EndpointException("Failed to write message to the transport", e), Logging.ErrorLevel) } // We are in Writing state, so stash is empty, safe to stop here @@ -568,7 +579,7 @@ private[remote] class EndpointWriter( whenUnhandled { case Event(Terminated(r), _) if r == reader.orNull ⇒ - publishAndThrow(new EndpointDisassociatedException("Disassociated")) + publishAndThrow(new EndpointDisassociatedException("Disassociated"), Logging.DebugLevel) case Event(s: StopReading, _) ⇒ reader match { case Some(r) ⇒ r forward s diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 2a91568df1..0529c94842 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -11,6 +11,9 @@ import scala.collection.immutable import akka.util.Helpers.Requiring import akka.japi.Util._ import akka.actor.Props +import akka.event.Logging +import akka.event.Logging.LogLevel +import akka.ConfigurationException final class RemoteSettings(val config: Config) { import config._ @@ -22,7 +25,16 @@ final class RemoteSettings(val config: Config) { val UntrustedMode: Boolean = getBoolean("akka.remote.untrusted-mode") - val LogRemoteLifecycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events") + val RemoteLifecycleEventsLogLevel: LogLevel = getString("akka.remote.log-remote-lifecycle-events").toLowerCase() match { + case "on" ⇒ Logging.DebugLevel + case other ⇒ Logging.levelFor(other) match { + case Some(level) ⇒ level + case None ⇒ throw new ConfigurationException("Logging level must be one of (on, off, debug, info, warning, error)") + } + } + + @deprecated("Use the RemoteLifecycleEventsLogLevel field instead.") + def LogRemoteLifecycleEvents: Boolean = RemoteLifecycleEventsLogLevel >= Logging.ErrorLevel val Dispatcher: String = getString("akka.remote.use-dispatcher") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index b89c36996b..2b7c197eb9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -6,7 +6,7 @@ package akka.remote import akka.AkkaException import akka.actor._ -import akka.event.LoggingAdapter +import akka.event.{ Logging, LoggingAdapter } import scala.collection.immutable import scala.concurrent.Future import scala.util.control.NoStackTrace diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index b6836f278b..ab2ce17756 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -123,7 +123,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote) val log: LoggingAdapter = Logging(system.eventStream, "Remoting") - val eventPublisher = new EventPublisher(system, log, LogRemoteLifecycleEvents) + val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel) private def notifyError(msg: String, cause: Throwable): Unit = eventPublisher.notifyListeners(RemotingErrorEvent(new RemoteTransportException(msg, cause))) @@ -370,7 +370,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val extendedSystem = context.system.asInstanceOf[ExtendedActorSystem] val endpointId: Iterator[Int] = Iterator from 0 - val eventPublisher = new EventPublisher(context.system, log, settings.LogRemoteLifecycleEvents) + val eventPublisher = new EventPublisher(context.system, log, settings.RemoteLifecycleEventsLogLevel) // Mapping between addresses and endpoint actors. If passive connections are turned off, incoming connections // will be not part of this map! @@ -389,7 +389,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case InvalidAssociation(localAddress, remoteAddress, _) ⇒ - log.error("Tried to associate with unreachable remote address [{}]. " + + log.warning("Tried to associate with unreachable remote address [{}]. " + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) context.system.eventStream.publish(AddressTerminated(remoteAddress)) @@ -398,7 +398,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case HopelessAssociation(localAddress, remoteAddress, Some(uid), _) ⇒ settings.QuarantineDuration match { case d: FiniteDuration ⇒ - log.error("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " + + log.warning("Association to [{}] having UID [{}] is irrecoverably failed. UID is now quarantined and all " + "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " + "from this situation.", remoteAddress, uid) endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d) @@ -410,7 +410,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ settings.QuarantineDuration match { case d: FiniteDuration ⇒ - log.error("Association to [{}] with unknown UID is irrecoverably failed. " + + log.warning("Association to [{}] with unknown UID is irrecoverably failed. " + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) endpoints.markAsFailed(sender, Deadline.now + d) case _ ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index 42c788e5d7..6ed910434f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -50,9 +50,9 @@ final case class AssociationErrorEvent( cause: Throwable, localAddress: Address, remoteAddress: Address, - inbound: Boolean) extends AssociationEvent { + inbound: Boolean, + logLevel: Logging.LogLevel) extends AssociationEvent { protected override def eventName: String = "AssociationError" - override def logLevel: Logging.LogLevel = Logging.ErrorLevel override def toString: String = s"${super.toString}: Error [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]" def getCause: Throwable = cause } @@ -81,9 +81,9 @@ final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleE /** * INTERNAL API */ -private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) { +private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logLevel: Logging.LogLevel) { def notifyListeners(message: RemotingLifecycleEvent): Unit = { system.eventStream.publish(message) - if (logEvents) log.log(message.logLevel, "{}", message) + if (logLevel <= message.logLevel) log.log(message.logLevel, "{}", message) } } \ No newline at end of file diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 4042a0d166..ed10fb8c5c 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -204,11 +204,10 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D expectMsg(("pong", testActor)) } - "send error message for wrong address" in { - filterEvents(EventFilter.error(start = "AssociationError", occurrences = 1), - EventFilter.error(pattern = "Address is now quarantined", occurrences = 1)) { - system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping" - } + "send warning message for wrong address" in { + filterEvents(EventFilter.warning(pattern = "Address is now quarantined", occurrences = 1)) { + system.actorFor("akka.test://nonexistingsystem@localhost:12346/user/echo") ! "ping" + } } "support ask" in {