diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 69799b7dcb..c44f6261a5 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -842,7 +842,7 @@ private[remote] class EndpointWriter( } case TakeOver(newHandle, replyTo) ⇒ // Shutdown old reader - handle foreach { _.disassociate() } + handle foreach { _.disassociate("the association was replaced by a new one", log) } handle = Some(newHandle) replyTo ! TookOver(self, newHandle) context.become(handoff) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index b00838403c..2455aea394 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -631,7 +631,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends val drop = matchesQuarantine(pendingHandle) // Side-effecting here if (drop) { - pendingHandle.disassociate() + pendingHandle.disassociate("the pending handle was quarantined", log) context.stop(pendingActor) } !drop @@ -643,7 +643,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends writer → associations.filter { assoc ⇒ val handle = assoc.association.asInstanceOf[AkkaProtocolHandle] val drop = matchesQuarantine(handle) - if (drop) handle.disassociate() + if (drop) handle.disassociate("the stashed inbound handle was quarantined", log) !drop } } @@ -745,7 +745,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends def handleInboundAssociation(ia: InboundAssociation, writerIsIdle: Boolean): Unit = ia match { case ia @ InboundAssociation(handle: AkkaProtocolHandle) ⇒ endpoints.readOnlyEndpointFor(handle.remoteAddress) match { case Some((endpoint, _)) ⇒ - pendingReadHandoffs.get(endpoint) foreach (_.disassociate()) + pendingReadHandoffs.get(endpoint) foreach (_.disassociate("the existing readOnly association was replaced by a new incoming one", log)) pendingReadHandoffs += endpoint → handle endpoint ! EndpointWriter.TakeOver(handle, self) endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { @@ -766,7 +766,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress)) case Some(Pass(ep, Some(uid), _)) ⇒ if (handle.handshakeInfo.uid == uid) { - pendingReadHandoffs.get(ep) foreach (_.disassociate()) + pendingReadHandoffs.get(ep) foreach (_.disassociate("the existing writable association was replaced by a new incoming one", log)) pendingReadHandoffs += ep → handle ep ! EndpointWriter.StopReading(ep, self) ep ! ReliableDeliverySupervisor.Ungate diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index ce90fa2e05..07aa13dd46 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -196,10 +196,9 @@ private[remote] class AkkaProtocolHandle( override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) - override def disassociate(): Unit = stateActor ! DisassociateUnderlying(Unknown) + override def disassociate(): Unit = disassociate(Unknown) def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info) - } private[transport] object ProtocolStateActor { @@ -394,8 +393,12 @@ private[transport] class ProtocolStateActor( // After receiving Disassociate we MUST NOT send back a Disassociate (loop) stop(FSM.Failure(info)) - case _ ⇒ + case msg ⇒ // Expected handshake to be finished, dropping connection + if (log.isDebugEnabled) + log.debug( + "Sending disassociate to [{}] because unexpected message of type [{}] was received during handshake", + wrappedHandle, msg.getClass.getName) sendDisassociate(wrappedHandle, Unknown) stop() @@ -431,18 +434,32 @@ private[transport] class ProtocolStateActor( } // Got a stray message -- explicitly reset the association (force remote endpoint to reassociate) - case _ ⇒ + case msg ⇒ + if (log.isDebugEnabled) + log.debug( + "Sending disassociate to [{}] because unexpected message of type [{}] was received while unassociated", + wrappedHandle, msg.getClass.getName) sendDisassociate(wrappedHandle, Unknown) stop() } case Event(HandshakeTimer, OutboundUnderlyingAssociated(_, wrappedHandle)) ⇒ + if (log.isDebugEnabled) + log.debug( + "Sending disassociate to [{}] because handshake timed out for outbound association after [{}] ms.", + wrappedHandle, settings.HandshakeTimeout.toMillis) + sendDisassociate(wrappedHandle, Unknown) stop(FSM.Failure(TimeoutReason("No response from remote for outbound association. Handshake timed out after " + s"[${settings.HandshakeTimeout.toMillis} ms]."))) case Event(HandshakeTimer, InboundUnassociated(_, wrappedHandle)) ⇒ + if (log.isDebugEnabled) + log.debug( + "Sending disassociate to [{}] because handshake timed out for inbound association after [{}] ms.", + wrappedHandle, settings.HandshakeTimeout.toMillis) + sendDisassociate(wrappedHandle, Unknown) stop(FSM.Failure(TimeoutReason("No response from remote for inbound association. Handshake timed out after " + s"[${settings.HandshakeTimeout.toMillis} ms]."))) @@ -489,6 +506,9 @@ private[transport] class ProtocolStateActor( case msg ⇒ throw new AkkaProtocolException(s"unhandled message in state Open(DisassociateUnderlying) with type [${safeClassName(msg)}]") } + // No debug logging here as sending DisassociateUnderlying(Unknown) should have been logged from where + // it was sent + sendDisassociate(handle, info) stop() @@ -510,6 +530,11 @@ private[transport] class ProtocolStateActor( sendHeartbeat(wrappedHandle) stay() } else { + if (log.isDebugEnabled) + log.debug( + "Sending disassociate to [{}] because failure detector triggered in state [{}]", + wrappedHandle, stateName) + // send disassociate just to be sure sendDisassociate(wrappedHandle, Unknown) stop(FSM.Failure(TimeoutReason(s"No response from remote. " + @@ -545,7 +570,7 @@ private[transport] class ProtocolStateActor( case _ ⇒ new AkkaProtocolException("Transport disassociated before handshake finished") }) - wrappedHandle.disassociate() + wrappedHandle.disassociate(disassociationReason(reason), log) case StopEvent(reason, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒ // Invalidate exposed but still unfinished promise. The underlying association disappeared, so after @@ -555,7 +580,7 @@ private[transport] class ProtocolStateActor( case _ ⇒ Disassociated(Unknown) } handlerFuture foreach { _ notify disassociateNotification } - wrappedHandle.disassociate() + wrappedHandle.disassociate(disassociationReason(reason), log) case StopEvent(reason, _, ListenerReady(handler, wrappedHandle)) ⇒ val disassociateNotification = reason match { @@ -563,10 +588,10 @@ private[transport] class ProtocolStateActor( case _ ⇒ Disassociated(Unknown) } handler notify disassociateNotification - wrappedHandle.disassociate() + wrappedHandle.disassociate(disassociationReason(reason), log) - case StopEvent(_, _, InboundUnassociated(_, wrappedHandle)) ⇒ - wrappedHandle.disassociate() + case StopEvent(reason, _, InboundUnassociated(_, wrappedHandle)) ⇒ + wrappedHandle.disassociate(disassociationReason(reason), log) } @@ -650,4 +675,10 @@ private[transport] class ProtocolStateActor( } catch { case NonFatal(e) ⇒ throw new AkkaProtocolException("Error writing ASSOCIATE to transport", e) } + + private def disassociationReason(reason: FSM.Reason): String = reason match { + case FSM.Normal ⇒ "the ProtocolStateActor was stopped normally" + case FSM.Shutdown ⇒ "the ProtocolStateActor was shutdown" + case FSM.Failure(ex) ⇒ s"the ProtocolStateActor failed: $ex" + } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 394197ae4e..24d45522f0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -6,12 +6,13 @@ package akka.remote.transport import FailureInjectorTransportAdapter._ import akka.AkkaException import akka.actor.{ Address, ExtendedActorSystem } -import akka.event.Logging +import akka.event.{ Logging, LoggingAdapter } import akka.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener } import akka.remote.transport.Transport._ import akka.util.ByteString import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ThreadLocalRandom + import scala.concurrent.{ Future, Promise } import scala.util.control.NoStackTrace @@ -161,7 +162,11 @@ private[remote] final case class FailureInjectorHandle( if (!gremlinAdapter.shouldDropOutbound(wrappedHandle.remoteAddress, payload, "handler.write")) wrappedHandle.write(payload) else true - override def disassociate(): Unit = wrappedHandle.disassociate() + override def disassociate(reason: String, log: LoggingAdapter): Unit = + wrappedHandle.disassociate(reason, log) + + override def disassociate(): Unit = + wrappedHandle.disassociate() override def notify(ev: HandleEvent): Unit = if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress, ev, "handler.notify")) diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index b27ac63a35..09f6cdd8e0 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -7,22 +7,24 @@ import akka.actor._ import akka.pattern.{ PromiseActorRef, ask, pipe } import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.AkkaPduCodec.Associate -import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } -import akka.remote.transport.ThrottlerManager.{ Listener, Handle, ListenerAndMode, Checkin } +import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, DisassociateInfo, Disassociated, HandleEventListener, InboundPayload } +import akka.remote.transport.ThrottlerManager.{ Checkin, Handle, Listener, ListenerAndMode } import akka.remote.transport.ThrottlerTransportAdapter._ import akka.remote.transport.Transport._ -import akka.util.{ Timeout, ByteString } +import akka.util.{ ByteString, Timeout } import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference + import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.{ Future, Promise } import scala.concurrent.duration._ import scala.math.min -import scala.util.{ Success, Failure } +import scala.util.{ Failure, Success } import scala.util.control.NonFatal import akka.dispatch.sysmsg.{ Unwatch, Watch } -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } +import akka.event.LoggingAdapter import akka.remote.RARP class ThrottlerProvider extends TransportAdapterProvider { @@ -205,7 +207,8 @@ private[transport] object ThrottlerManager { /** * INTERNAL API */ -private[transport] class ThrottlerManager(wrappedTransport: Transport) extends ActorTransportAdapterManager { +private[transport] class ThrottlerManager(wrappedTransport: Transport) + extends ActorTransportAdapterManager with ActorLogging { import ThrottlerManager._ import context.dispatcher @@ -245,7 +248,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A case ForceDisassociate(address) ⇒ val naked = nakedAddress(address) handleTable foreach { - case (`naked`, handle) ⇒ handle.disassociate() + case (`naked`, handle) ⇒ handle.disassociate(s"the disassociation was forced by ${sender()}", log) case _ ⇒ } sender() ! ForceDisassociateAck @@ -373,7 +376,7 @@ private[transport] class ThrottledAssociation( var throttledMessages = Queue.empty[ByteString] var upstreamListener: HandleEventListener = _ - override def postStop(): Unit = originalHandle.disassociate() + override def postStop(): Unit = originalHandle.disassociate("the owning ThrottledAssociation stopped", log) if (inbound) startWith(WaitExposedHandle, Uninitialized) else { originalHandle.readHandlerPromise.success(ActorHandleEventListener(self)) @@ -406,7 +409,7 @@ private[transport] class ThrottledAssociation( inboundThrottleMode = mode try if (mode == Blackhole) { throttledMessages = Queue.empty[ByteString] - exposedHandle.disassociate() + exposedHandle.disassociate("the association was blackholed", log) stop() } else { associationHandler notify InboundAssociation(exposedHandle) @@ -549,9 +552,7 @@ private[transport] final case class ThrottlerHandle(_wrappedHandle: AssociationH } - override def disassociate(): Unit = { - throttlerActor ! PoisonPill - } + override def disassociate(): Unit = throttlerActor ! PoisonPill def disassociateWithFailure(reason: DisassociateInfo): Unit = { throttlerActor ! ThrottledAssociation.FailWith(reason) diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 8598debff8..196263cabc 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -3,13 +3,15 @@ */ package akka.remote.transport -import scala.concurrent.{ Promise, Future } -import akka.actor.{ NoSerializationVerificationNeeded, ActorRef, Address } +import scala.concurrent.{ Future, Promise } +import akka.actor.{ ActorRef, Address, NoSerializationVerificationNeeded } import akka.util.ByteString import akka.remote.transport.AssociationHandle.HandleEventListener import akka.AkkaException + import scala.util.control.NoStackTrace import akka.actor.DeadLetterSuppression +import akka.event.LoggingAdapter object Transport { @@ -261,7 +263,22 @@ trait AssociationHandle { * could be called arbitrarily many times. * */ + @deprecated(message = "Use method that states reasons to make sure disassociation reasons are logged.", since = "2.5.3") def disassociate(): Unit + /** + * Closes the underlying transport link, if needed. Some transports might not need an explicit teardown (UDP) and + * some transports may not support it (hardware connections). Remote endpoint of the channel or connection MAY + * be notified, but this is not guaranteed. The Transport that provides the handle MUST guarantee that disassociate() + * could be called arbitrarily many times. + */ + def disassociate(reason: String, log: LoggingAdapter): Unit = { + if (log.isDebugEnabled) + log.debug( + "Association between local [{}] and remote [{}] was disassociated because {}", + localAddress, remoteAddress, reason) + + disassociate() + } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index fc841afe23..3e31a460bd 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -41,8 +41,10 @@ private[remote] trait TcpHandlers extends CommonHandlers { override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = new TcpAssociationHandle(localAddress, remoteAddress, transport, channel) - override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = + override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = { notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) + log.debug("Remote connection to [{}] was disconnected because of {}", e.getChannel.getRemoteAddress, e) + } override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array() @@ -51,7 +53,7 @@ private[remote] trait TcpHandlers extends CommonHandlers { override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) - log.warning("Remote connection to {} failed with {}", e.getChannel.getRemoteAddress, e.getCause) + log.warning("Remote connection to [{}] failed with {}", e.getChannel.getRemoteAddress, e.getCause) e.getChannel.close() // No graceful close here } } diff --git a/project/MiMa.scala b/project/MiMa.scala index 1c3c47f1d5..b70ce7c1e9 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -1220,6 +1220,11 @@ object MiMa extends AutoPlugin { // #23144 recoverWithRetries cleanup ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.RecoverWith.InfiniteRetries") + ), + "2.5.2" -> Seq( + // #23023 added a new overload with implementation to trait, so old transport implementations compiled against + // older versions will be missing the method. We accept that incompatibility for now. + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.transport.AssociationHandle.disassociate") ) )