Improve logging in remoting, see #3177
* Supress TimeoutReason logging * Add logTermination in FSM * Improve some error messages, incl making them unique * Cookie only logged if debug enabled
This commit is contained in:
parent
7c1ab68263
commit
1dbe65f53e
8 changed files with 43 additions and 22 deletions
|
|
@ -646,11 +646,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
|
|||
private def terminate(nextState: State): Unit = {
|
||||
if (currentState.stopReason.isEmpty) {
|
||||
val reason = nextState.stopReason.get
|
||||
reason match {
|
||||
case Failure(ex: Throwable) ⇒ log.error(ex, "terminating due to Failure")
|
||||
case Failure(msg: AnyRef) ⇒ log.error(msg.toString)
|
||||
case _ ⇒
|
||||
}
|
||||
logTermination(reason)
|
||||
for (timer ← timers.values) timer.cancel()
|
||||
timers.clear()
|
||||
currentState = nextState
|
||||
|
|
@ -661,6 +657,16 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* By default [[Failure]] is logged at error level and other reason
|
||||
* types are not logged. It is possible to override this behavior.
|
||||
*/
|
||||
protected def logTermination(reason: Reason): Unit = reason match {
|
||||
case Failure(ex: Throwable) ⇒ log.error(ex, "terminating due to Failure")
|
||||
case Failure(msg: AnyRef) ⇒ log.error(msg.toString)
|
||||
case _ ⇒
|
||||
}
|
||||
|
||||
/**
|
||||
* All messages sent to the [[akka.actor.FSM]] will be wrapped inside an
|
||||
* `Event`, which allows pattern matching to extract both state and data.
|
||||
|
|
|
|||
|
|
@ -82,11 +82,11 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor
|
|||
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
|
||||
r.!(payload)(sender)
|
||||
else
|
||||
log.error("dropping message {} for non-local recipient {} arriving at {} inbound addresses are {}",
|
||||
payloadClass, r, recipientAddress, provider.transport.addresses)
|
||||
log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
|
||||
payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
|
||||
|
||||
case r ⇒ log.error("dropping message {} for unknown recipient {} arriving at {} inbound addresses are {}",
|
||||
payloadClass, r, recipientAddress, provider.transport.addresses)
|
||||
case r ⇒ log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
|
||||
payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -592,8 +592,8 @@ private[remote] class EndpointWriter(
|
|||
Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, extendedSystem)) {
|
||||
(MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]))
|
||||
}
|
||||
case None ⇒ throw new EndpointException("Internal error: No handle was present during serialization of" +
|
||||
"outbound message.")
|
||||
case None ⇒
|
||||
throw new EndpointException("Internal error: No handle was present during serialization of outbound message.")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ private[akka] class RemoteDeployer(_settings: ActorSystem.Settings, _pm: Dynamic
|
|||
case d @ Some(deploy) ⇒
|
||||
deploy.config.getString("remote") match {
|
||||
case AddressFromURIString(r) ⇒ Some(deploy.copy(scope = RemoteScope(r)))
|
||||
case str if !str.isEmpty ⇒ throw new ConfigurationException("unparseable remote node name " + str)
|
||||
case str if !str.isEmpty ⇒ throw new ConfigurationException(s"unparseable remote node name [${str}]")
|
||||
case _ ⇒
|
||||
val nodes = immutableSeq(deploy.config.getStringList("target.nodes")).map(AddressFromURIString(_))
|
||||
if (nodes.isEmpty || deploy.routerConfig == NoRouter) d
|
||||
|
|
|
|||
|
|
@ -208,7 +208,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
|||
override def quarantine(remoteAddress: Address, uid: Int): Unit = endpointManager match {
|
||||
case Some(manager) ⇒ manager ! Quarantine(remoteAddress, uid)
|
||||
case _ ⇒ throw new RemoteTransportExceptionNoStackTrace(
|
||||
s"Attempted to quarantine addres [$remoteAddress] with uid [$uid] but Remoting is not running", null)
|
||||
s"Attempted to quarantine address [$remoteAddress] with uid [$uid] but Remoting is not running", null)
|
||||
}
|
||||
|
||||
// Not used anywhere only to keep compatibility with RemoteTransport interface
|
||||
|
|
|
|||
|
|
@ -26,13 +26,13 @@ class TransportAdapters(system: ExtendedActorSystem) extends Extension {
|
|||
|
||||
private val adaptersTable: Map[String, TransportAdapterProvider] = for ((name, fqn) ← settings.Adapters) yield {
|
||||
name -> system.dynamicAccess.createInstanceFor[TransportAdapterProvider](fqn, immutable.Seq.empty).recover({
|
||||
case exception ⇒ throw new IllegalArgumentException("Cannot instantiate transport adapter" + fqn, exception)
|
||||
case e ⇒ throw new IllegalArgumentException(s"Cannot instantiate transport adapter [${fqn}]", e)
|
||||
}).get
|
||||
}
|
||||
|
||||
def getAdapterProvider(name: String): TransportAdapterProvider = adaptersTable.get(name) match {
|
||||
case Some(provider) ⇒ provider
|
||||
case None ⇒ throw new IllegalArgumentException("There is no registered transport adapter provider with name: " + name)
|
||||
case None ⇒ throw new IllegalArgumentException(s"There is no registered transport adapter provider with name: [${name}]")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -211,7 +211,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
|
|||
cookie))
|
||||
case CommandType.SHUTDOWN ⇒ Disassociate
|
||||
case CommandType.HEARTBEAT ⇒ Heartbeat
|
||||
case _ ⇒ throw new PduCodecException("Decoding of control PDU failed: format invalid", null)
|
||||
case x ⇒
|
||||
throw new PduCodecException(s"Decoding of control PDU failed, invalid format, unexpected: [${x}]", null)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -328,8 +328,11 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
|||
wrappedHandle,
|
||||
immutable.Queue.empty)
|
||||
} else {
|
||||
if (log.isDebugEnabled)
|
||||
log.warning(s"Association attempt with mismatching cookie from [{}]. Expected [{}] but received [{}].",
|
||||
info.origin, localHandshakeInfo.cookie.getOrElse(""), info.cookie.getOrElse(""))
|
||||
else
|
||||
log.warning(s"Association attempt with mismatching cookie from [{}].", info.origin)
|
||||
stop()
|
||||
}
|
||||
|
||||
|
|
@ -362,7 +365,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
|||
listener notify InboundPayload(payload)
|
||||
stay()
|
||||
case msg ⇒
|
||||
throw new AkkaProtocolException("unhandled message in Open state with type " + (if (msg ne null) msg.getClass else "null"))
|
||||
throw new AkkaProtocolException(s"unhandled message in state Open(InboundPayload) with type [${safeClassName(msg)}]")
|
||||
}
|
||||
|
||||
case _ ⇒ stay()
|
||||
|
|
@ -376,7 +379,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
|||
case ListenerReady(_, wrappedHandle) ⇒ wrappedHandle
|
||||
case AssociatedWaitHandler(_, wrappedHandle, _) ⇒ wrappedHandle
|
||||
case msg ⇒
|
||||
throw new AkkaProtocolException("unhandled message in Open state with type " + (if (msg ne null) msg.getClass else "null"))
|
||||
throw new AkkaProtocolException(s"unhandled message in state Open(DisassociateUnderlying) with type [${safeClassName(msg)}]")
|
||||
}
|
||||
sendDisassociate(handle)
|
||||
stop()
|
||||
|
|
@ -401,6 +404,11 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
|||
}
|
||||
}
|
||||
|
||||
private def safeClassName(obj: AnyRef): String = obj match {
|
||||
case null ⇒ "null"
|
||||
case _ ⇒ obj.getClass.getName
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
cancelTimer("heartbeat-timer")
|
||||
super.postStop() // Pass to onTermination
|
||||
|
|
@ -428,6 +436,12 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
|
|||
|
||||
case StopEvent(_, _, InboundUnassociated(_, wrappedHandle)) ⇒
|
||||
wrappedHandle.disassociate()
|
||||
|
||||
}
|
||||
|
||||
override protected def logTermination(reason: FSM.Reason): Unit = reason match {
|
||||
case FSM.Failure(TimeoutReason) ⇒ // no logging
|
||||
case other ⇒ super.logTermination(reason)
|
||||
}
|
||||
|
||||
private def listenForListenerRegistration(readHandlerPromise: Promise[HandleEventListener]): Unit =
|
||||
|
|
|
|||
|
|
@ -181,7 +181,7 @@ private[netty] abstract class ServerHandler(protected final val transport: Netty
|
|||
case listener: AssociationEventListener ⇒
|
||||
val remoteAddress = NettyTransport.addressFromSocketAddress(remoteSocketAddress, transport.schemeIdentifier,
|
||||
transport.system.name, hostName = None).getOrElse(
|
||||
throw new NettyTransportException(s"Unknown remote address type [${remoteSocketAddress.getClass.getName}]"))
|
||||
throw new NettyTransportException(s"Unknown inbound remote address type [${remoteSocketAddress.getClass.getName}]"))
|
||||
init(channel, remoteSocketAddress, remoteAddress, msg) { listener notify InboundAssociation(_) }
|
||||
}
|
||||
}
|
||||
|
|
@ -413,7 +413,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
|
|||
case listener ⇒ udpConnectionTable.put(addr, listener)
|
||||
}
|
||||
handle
|
||||
case unknown ⇒ throw new NettyTransportException(s"Unknown remote address type [${unknown.getClass.getName}]")
|
||||
case unknown ⇒ throw new NettyTransportException(s"Unknown outbound remote address type [${unknown.getClass.getName}]")
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue