=rem #23023 log reasons for disassociations with debug level

This commit is contained in:
Johannes Rudolph 2017-05-30 10:03:10 +02:00
parent f376d2c6c7
commit cf99cf13b6
No known key found for this signature in database
GPG key ID: 52AF1C9ABD77E6E5
8 changed files with 93 additions and 32 deletions

View file

@ -842,7 +842,7 @@ private[remote] class EndpointWriter(
}
case TakeOver(newHandle, replyTo)
// Shutdown old reader
handle foreach { _.disassociate() }
handle foreach { _.disassociate("the association was replaced by a new one", log) }
handle = Some(newHandle)
replyTo ! TookOver(self, newHandle)
context.become(handoff)

View file

@ -631,7 +631,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
val drop = matchesQuarantine(pendingHandle)
// Side-effecting here
if (drop) {
pendingHandle.disassociate()
pendingHandle.disassociate("the pending handle was quarantined", log)
context.stop(pendingActor)
}
!drop
@ -643,7 +643,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
writer associations.filter { assoc
val handle = assoc.association.asInstanceOf[AkkaProtocolHandle]
val drop = matchesQuarantine(handle)
if (drop) handle.disassociate()
if (drop) handle.disassociate("the stashed inbound handle was quarantined", log)
!drop
}
}
@ -745,7 +745,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match {
case ia @ InboundAssociation(handle: AkkaProtocolHandle) endpoints.readOnlyEndpointFor(handle.remoteAddress) match {
case Some((endpoint, _))
pendingReadHandoffs.get(endpoint) foreach (_.disassociate())
pendingReadHandoffs.get(endpoint) foreach (_.disassociate("the existing readOnly association was replaced by a new incoming one", log))
pendingReadHandoffs += endpoint handle
endpoint ! EndpointWriter.TakeOver(handle, self)
endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
@ -766,7 +766,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress))
case Some(Pass(ep, Some(uid), _))
if (handle.handshakeInfo.uid == uid) {
pendingReadHandoffs.get(ep) foreach (_.disassociate())
pendingReadHandoffs.get(ep) foreach (_.disassociate("the existing writable association was replaced by a new incoming one", log))
pendingReadHandoffs += ep handle
ep ! EndpointWriter.StopReading(ep, self)
ep ! ReliableDeliverySupervisor.Ungate

View file

@ -196,10 +196,9 @@ private[remote] class AkkaProtocolHandle(
override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))
override def disassociate(): Unit = stateActor ! DisassociateUnderlying(Unknown)
override def disassociate(): Unit = disassociate(Unknown)
def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info)
}
private[transport] object ProtocolStateActor {
@ -394,8 +393,12 @@ private[transport] class ProtocolStateActor(
// After receiving Disassociate we MUST NOT send back a Disassociate (loop)
stop(FSM.Failure(info))
case _
case msg
// Expected handshake to be finished, dropping connection
if (log.isDebugEnabled)
log.debug(
"Sending disassociate to [{}] because unexpected message of type [{}] was received during handshake",
wrappedHandle, msg.getClass.getName)
sendDisassociate(wrappedHandle, Unknown)
stop()
@ -431,18 +434,32 @@ private[transport] class ProtocolStateActor(
}
// Got a stray message -- explicitly reset the association (force remote endpoint to reassociate)
case _
case msg
if (log.isDebugEnabled)
log.debug(
"Sending disassociate to [{}] because unexpected message of type [{}] was received while unassociated",
wrappedHandle, msg.getClass.getName)
sendDisassociate(wrappedHandle, Unknown)
stop()
}
case Event(HandshakeTimer, OutboundUnderlyingAssociated(_, wrappedHandle))
if (log.isDebugEnabled)
log.debug(
"Sending disassociate to [{}] because handshake timed out for outbound association after [{}] ms.",
wrappedHandle, settings.HandshakeTimeout.toMillis)
sendDisassociate(wrappedHandle, Unknown)
stop(FSM.Failure(TimeoutReason("No response from remote for outbound association. Handshake timed out after " +
s"[${settings.HandshakeTimeout.toMillis} ms].")))
case Event(HandshakeTimer, InboundUnassociated(_, wrappedHandle))
if (log.isDebugEnabled)
log.debug(
"Sending disassociate to [{}] because handshake timed out for inbound association after [{}] ms.",
wrappedHandle, settings.HandshakeTimeout.toMillis)
sendDisassociate(wrappedHandle, Unknown)
stop(FSM.Failure(TimeoutReason("No response from remote for inbound association. Handshake timed out after " +
s"[${settings.HandshakeTimeout.toMillis} ms].")))
@ -489,6 +506,9 @@ private[transport] class ProtocolStateActor(
case msg
throw new AkkaProtocolException(s"unhandled message in state Open(DisassociateUnderlying) with type [${safeClassName(msg)}]")
}
// No debug logging here as sending DisassociateUnderlying(Unknown) should have been logged from where
// it was sent
sendDisassociate(handle, info)
stop()
@ -510,6 +530,11 @@ private[transport] class ProtocolStateActor(
sendHeartbeat(wrappedHandle)
stay()
} else {
if (log.isDebugEnabled)
log.debug(
"Sending disassociate to [{}] because failure detector triggered in state [{}]",
wrappedHandle, stateName)
// send disassociate just to be sure
sendDisassociate(wrappedHandle, Unknown)
stop(FSM.Failure(TimeoutReason(s"No response from remote. " +
@ -545,7 +570,7 @@ private[transport] class ProtocolStateActor(
case _
new AkkaProtocolException("Transport disassociated before handshake finished")
})
wrappedHandle.disassociate()
wrappedHandle.disassociate(disassociationReason(reason), log)
case StopEvent(reason, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue))
// Invalidate exposed but still unfinished promise. The underlying association disappeared, so after
@ -555,7 +580,7 @@ private[transport] class ProtocolStateActor(
case _ Disassociated(Unknown)
}
handlerFuture foreach { _ notify disassociateNotification }
wrappedHandle.disassociate()
wrappedHandle.disassociate(disassociationReason(reason), log)
case StopEvent(reason, _, ListenerReady(handler, wrappedHandle))
val disassociateNotification = reason match {
@ -563,10 +588,10 @@ private[transport] class ProtocolStateActor(
case _ Disassociated(Unknown)
}
handler notify disassociateNotification
wrappedHandle.disassociate()
wrappedHandle.disassociate(disassociationReason(reason), log)
case StopEvent(_, _, InboundUnassociated(_, wrappedHandle))
wrappedHandle.disassociate()
case StopEvent(reason, _, InboundUnassociated(_, wrappedHandle))
wrappedHandle.disassociate(disassociationReason(reason), log)
}
@ -650,4 +675,10 @@ private[transport] class ProtocolStateActor(
} catch {
case NonFatal(e) throw new AkkaProtocolException("Error writing ASSOCIATE to transport", e)
}
private def disassociationReason(reason: FSM.Reason): String = reason match {
case FSM.Normal "the ProtocolStateActor was stopped normally"
case FSM.Shutdown "the ProtocolStateActor was shutdown"
case FSM.Failure(ex) s"the ProtocolStateActor failed: $ex"
}
}

View file

@ -6,12 +6,13 @@ package akka.remote.transport
import FailureInjectorTransportAdapter._
import akka.AkkaException
import akka.actor.{ Address, ExtendedActorSystem }
import akka.event.Logging
import akka.event.{ Logging, LoggingAdapter }
import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener }
import akka.remote.transport.Transport._
import akka.util.ByteString
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.{ Future, Promise }
import scala.util.control.NoStackTrace
@ -161,7 +162,11 @@ private[remote] final case class FailureInjectorHandle(
if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress, payload, "handler.write")) wrappedHandle.write(payload)
else true
override def disassociate(): Unit = wrappedHandle.disassociate()
override def disassociate(reason: String, log: LoggingAdapter): Unit =
wrappedHandle.disassociate(reason, log)
override def disassociate(): Unit =
wrappedHandle.disassociate()
override def notify(ev: HandleEvent): Unit =
if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress, ev, "handler.notify"))

View file

@ -7,22 +7,24 @@ import akka.actor._
import akka.pattern.{ PromiseActorRef, ask, pipe }
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener }
import akka.remote.transport.ThrottlerManager.{ Listener, Handle, ListenerAndMode, Checkin }
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, DisassociateInfo, Disassociated, HandleEventListener, InboundPayload }
import akka.remote.transport.ThrottlerManager.{ Checkin, Handle, Listener, ListenerAndMode }
import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.transport.Transport._
import akka.util.{ Timeout, ByteString }
import akka.util.{ ByteString, Timeout }
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration._
import scala.math.min
import scala.util.{ Success, Failure }
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
import akka.dispatch.sysmsg.{ Unwatch, Watch }
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.event.LoggingAdapter
import akka.remote.RARP
class ThrottlerProvider extends TransportAdapterProvider {
@ -205,7 +207,8 @@ private[transport] object ThrottlerManager {
/**
* INTERNAL API
*/
private[transport] class ThrottlerManager(wrappedTransport: Transport) extends ActorTransportAdapterManager {
private[transport] class ThrottlerManager(wrappedTransport: Transport)
extends ActorTransportAdapterManager with ActorLogging {
import ThrottlerManager._
import context.dispatcher
@ -245,7 +248,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
case ForceDisassociate(address)
val naked = nakedAddress(address)
handleTable foreach {
case (`naked`, handle) handle.disassociate()
case (`naked`, handle) handle.disassociate(s"the disassociation was forced by ${sender()}", log)
case _
}
sender() ! ForceDisassociateAck
@ -373,7 +376,7 @@ private[transport] class ThrottledAssociation(
var throttledMessages = Queue.empty[ByteString]
var upstreamListener: HandleEventListener = _
override def postStop(): Unit = originalHandle.disassociate()
override def postStop(): Unit = originalHandle.disassociate("the owning ThrottledAssociation stopped", log)
if (inbound) startWith(WaitExposedHandle, Uninitialized) else {
originalHandle.readHandlerPromise.success(ActorHandleEventListener(self))
@ -406,7 +409,7 @@ private[transport] class ThrottledAssociation(
inboundThrottleMode = mode
try if (mode == Blackhole) {
throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate()
exposedHandle.disassociate("the association was blackholed", log)
stop()
} else {
associationHandler notify InboundAssociation(exposedHandle)
@ -549,9 +552,7 @@ private[transport] final case class ThrottlerHandle(_wrappedHandle: AssociationH
}
override def disassociate(): Unit = {
throttlerActor ! PoisonPill
}
override def disassociate(): Unit = throttlerActor ! PoisonPill
def disassociateWithFailure(reason: DisassociateInfo): Unit = {
throttlerActor ! ThrottledAssociation.FailWith(reason)

View file

@ -3,13 +3,15 @@
*/
package akka.remote.transport
import scala.concurrent.{ Promise, Future }
import akka.actor.{ NoSerializationVerificationNeeded, ActorRef, Address }
import scala.concurrent.{ Future, Promise }
import akka.actor.{ ActorRef, Address, NoSerializationVerificationNeeded }
import akka.util.ByteString
import akka.remote.transport.AssociationHandle.HandleEventListener
import akka.AkkaException
import scala.util.control.NoStackTrace
import akka.actor.DeadLetterSuppression
import akka.event.LoggingAdapter
object Transport {
@ -261,7 +263,22 @@ trait AssociationHandle {
* could be called arbitrarily many times.
*
*/
@deprecated(message = "Use method that states reasons to make sure disassociation reasons are logged.", since = "2.5.3")
def disassociate(): Unit
/**
* Closes the underlying transport link, if needed. Some transports might not need an explicit teardown (UDP) and
* some transports may not support it (hardware connections). Remote endpoint of the channel or connection MAY
* be notified, but this is not guaranteed. The Transport that provides the handle MUST guarantee that disassociate()
* could be called arbitrarily many times.
*/
def disassociate(reason: String, log: LoggingAdapter): Unit = {
if (log.isDebugEnabled)
log.debug(
"Association between local [{}] and remote [{}] was disassociated because {}",
localAddress, remoteAddress, reason)
disassociate()
}
}

View file

@ -41,8 +41,10 @@ private[remote] trait TcpHandlers extends CommonHandlers {
override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle =
new TcpAssociationHandle(localAddress, remoteAddress, transport, channel)
override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {
notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown))
log.debug("Remote connection to [{}] was disconnected because of {}", e.getChannel.getRemoteAddress, e)
}
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
@ -51,7 +53,7 @@ private[remote] trait TcpHandlers extends CommonHandlers {
override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown))
log.warning("Remote connection to {} failed with {}", e.getChannel.getRemoteAddress, e.getCause)
log.warning("Remote connection to [{}] failed with {}", e.getChannel.getRemoteAddress, e.getCause)
e.getChannel.close() // No graceful close here
}
}

View file

@ -1220,6 +1220,11 @@ object MiMa extends AutoPlugin {
// #23144 recoverWithRetries cleanup
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries")
),
"2.5.2" -> Seq(
// #23023 added a new overload with implementation to trait, so old transport implementations compiled against
// older versions will be missing the method. We accept that incompatibility for now.
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate")
)
)