diff --git a/akka-remote/src/main/java/akka/remote/WireFormats.java b/akka-remote/src/main/java/akka/remote/WireFormats.java index 7baaacac9e..91c6afd3f6 100644 --- a/akka-remote/src/main/java/akka/remote/WireFormats.java +++ b/akka-remote/src/main/java/akka/remote/WireFormats.java @@ -10,23 +10,29 @@ public final class WireFormats { } public enum CommandType implements com.google.protobuf.ProtocolMessageEnum { - CONNECT(0, 1), - SHUTDOWN(1, 2), + ASSOCIATE(0, 1), + DISASSOCIATE(1, 2), HEARTBEAT(2, 3), + DISASSOCIATE_SHUTTING_DOWN(3, 4), + DISASSOCIATE_QUARANTINED(4, 5), ; - public static final int CONNECT_VALUE = 1; - public static final int SHUTDOWN_VALUE = 2; + public static final int ASSOCIATE_VALUE = 1; + public static final int DISASSOCIATE_VALUE = 2; public static final int HEARTBEAT_VALUE = 3; + public static final int DISASSOCIATE_SHUTTING_DOWN_VALUE = 4; + public static final int DISASSOCIATE_QUARANTINED_VALUE = 5; public final int getNumber() { return value; } public static CommandType valueOf(int value) { switch (value) { - case 1: return CONNECT; - case 2: return SHUTDOWN; + case 1: return ASSOCIATE; + case 2: return DISASSOCIATE; case 3: return HEARTBEAT; + case 4: return DISASSOCIATE_SHUTTING_DOWN; + case 5: return DISASSOCIATE_QUARANTINED; default: return null; } } @@ -57,7 +63,7 @@ public final class WireFormats { } private static final CommandType[] VALUES = { - CONNECT, SHUTDOWN, HEARTBEAT, + ASSOCIATE, DISASSOCIATE, HEARTBEAT, DISASSOCIATE_SHUTTING_DOWN, DISASSOCIATE_QUARANTINED, }; public static CommandType valueOf( @@ -5626,7 +5632,7 @@ public final class WireFormats { } private void initFields() { - commandType_ = akka.remote.WireFormats.CommandType.CONNECT; + commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE; handshakeInfo_ = akka.remote.WireFormats.AkkaHandshakeInfo.getDefaultInstance(); } private byte memoizedIsInitialized = -1; @@ -5799,7 +5805,7 @@ public final class WireFormats { public Builder clear() { super.clear(); - commandType_ = akka.remote.WireFormats.CommandType.CONNECT; + commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE; bitField0_ = (bitField0_ & ~0x00000001); if (handshakeInfoBuilder_ == null) { handshakeInfo_ = akka.remote.WireFormats.AkkaHandshakeInfo.getDefaultInstance(); @@ -5947,7 +5953,7 @@ public final class WireFormats { private int bitField0_; // required .CommandType commandType = 1; - private akka.remote.WireFormats.CommandType commandType_ = akka.remote.WireFormats.CommandType.CONNECT; + private akka.remote.WireFormats.CommandType commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE; public boolean hasCommandType() { return ((bitField0_ & 0x00000001) == 0x00000001); } @@ -5965,7 +5971,7 @@ public final class WireFormats { } public Builder clearCommandType() { bitField0_ = (bitField0_ & ~0x00000001); - commandType_ = akka.remote.WireFormats.CommandType.CONNECT; + commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE; onChanged(); return this; } @@ -7410,8 +7416,10 @@ public final class WireFormats { "(\0132\014.AddressData\022\013\n\003uid\030\002 \002(\006\022\016\n\006cookie\030" + "\003 \001(\t\"O\n\013AddressData\022\016\n\006system\030\001 \002(\t\022\020\n\010" + "hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r\022\020\n\010protocol" + - "\030\004 \001(\t*7\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SH" + - "UTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013akka.remoteH\001" + "\030\004 \001(\t*{\n\013CommandType\022\r\n\tASSOCIATE\020\001\022\020\n\014" + + "DISASSOCIATE\020\002\022\r\n\tHEARTBEAT\020\003\022\036\n\032DISASSO" + + "CIATE_SHUTTING_DOWN\020\004\022\034\n\030DISASSOCIATE_QU", + "ARANTINED\020\005B\017\n\013akka.remoteH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { diff --git a/akka-remote/src/main/protocol/WireFormats.proto b/akka-remote/src/main/protocol/WireFormats.proto index 0a44dc3ce2..b9bfccaf37 100644 --- a/akka-remote/src/main/protocol/WireFormats.proto +++ b/akka-remote/src/main/protocol/WireFormats.proto @@ -119,12 +119,13 @@ message AkkaHandshakeInfo { * Defines the type of the AkkaControlMessage command type */ enum CommandType { - CONNECT = 1; - SHUTDOWN = 2; + ASSOCIATE = 1; + DISASSOCIATE = 2; HEARTBEAT = 3; + DISASSOCIATE_SHUTTING_DOWN = 4; // Remote system is going down and will not accepts new connections + DISASSOCIATE_QUARANTINED = 5; // Remote system refused the association since the current system is quarantined } - /** * Defines a remote address. */ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 17f1dd54fc..1091c2814c 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -239,13 +239,15 @@ akka { retry-window = 60 s maximum-retries-in-window = 3 - # The length of time to gate an address whose name lookup has failed. + # The length of time to gate an address whose name lookup has failed + # or has explicitly signalled that it will not accept connections + # (remote system is shutting down or the requesting system is quarantined). # No connection attempts will be made to an address while it remains # gated. Any messages sent to a gated address will be directed to dead # letters instead. Name lookups are costly, and the time to recovery # is typically large, therefore this setting should be a value in the # order of seconds or minutes. - gate-unknown-addresses-for = 60 s + gate-invalid-addresses-for = 60 s # This settings controls how long a system will be quarantined after # catastrophic communication failures that result in the loss of system diff --git a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala index 829f610bac..71db2ce11a 100644 --- a/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/AckedDelivery.scala @@ -181,10 +181,11 @@ case class AckedReceiveBuffer[T <: HasSequenceNumber]( * @return The merged receive buffer. */ def mergeFrom(that: AckedReceiveBuffer[T]): AckedReceiveBuffer[T] = { + val mergedLastDelivered = max(this.lastDelivered, that.lastDelivered) this.copy( - lastDelivered = max(this.lastDelivered, that.lastDelivered), + lastDelivered = mergedLastDelivered, cumulativeAck = max(this.cumulativeAck, that.cumulativeAck), - buf = (this.buf union that.buf).filter { _.seq > lastDelivered }) + buf = (this.buf union that.buf).filter { _.seq > mergedLastDelivered }) } override def toString = buf.map { _.seq }.mkString("[", ", ", "]") diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 62412692cb..8b366352e1 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -3,27 +3,29 @@ */ package akka.remote +import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy._ +import akka.actor.Terminated import akka.actor._ import akka.dispatch.sysmsg.SystemMessage import akka.event.LoggingAdapter import akka.pattern.pipe +import akka.remote.EndpointManager.Link import akka.remote.EndpointManager.Send +import akka.remote.EndpointWriter.{ StoppedReading, FlushAndStop } import akka.remote.WireFormats.SerializedMessage -import akka.remote.transport.AkkaPduCodec._ -import akka.remote.transport.AssociationHandle._ +import akka.remote.transport.AkkaPduCodec.Message +import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload } import akka.remote.transport.Transport.InvalidAssociationException -import akka.remote.transport.{ AkkaPduProtobufCodec, AkkaPduCodec, Transport, AkkaProtocolHandle } +import akka.remote.transport._ import akka.serialization.Serialization import akka.util.ByteString import akka.{ OnlyCauseStackTrace, AkkaException } import java.io.NotSerializableException import java.util.concurrent.ConcurrentHashMap +import scala.annotation.tailrec import scala.concurrent.duration.{ Duration, Deadline } import scala.util.control.NonFatal -import scala.annotation.tailrec -import akka.remote.EndpointWriter.FlushAndStop -import akka.actor.SupervisorStrategy._ -import akka.remote.EndpointManager.Link /** * INTERNAL API @@ -145,6 +147,7 @@ private[remote] class OversizedPayloadException(msg: String) extends EndpointExc */ private[remote] object ReliableDeliverySupervisor { case object Ungate + case object AttemptSysMsgRedelivery case class GotUid(uid: Int) def props( @@ -171,6 +174,7 @@ private[remote] class ReliableDeliverySupervisor( val codec: AkkaPduCodec, val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor { import ReliableDeliverySupervisor._ + import context.dispatcher def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero @@ -178,11 +182,11 @@ private[remote] class ReliableDeliverySupervisor( case e @ (_: InvalidAssociation | _: HopelessAssociation | _: QuarantinedUidException) ⇒ Escalate case NonFatal(e) ⇒ if (retryGateEnabled) { - import context.dispatcher context.become(gated) context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) context.unwatch(writer) currentHandle = None + context.parent ! StoppedReading(self) Stop } else { Restart @@ -242,6 +246,9 @@ private[remote] class ReliableDeliverySupervisor( resendNacked() case Terminated(_) ⇒ currentHandle = None + context.parent ! StoppedReading(self) + if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) + context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery) context.become(idle) case GotUid(u) ⇒ uid = Some(u) case s: EndpointWriter.StopReading ⇒ writer forward s @@ -267,6 +274,10 @@ private[remote] class ReliableDeliverySupervisor( resendAll() handleSend(s) context.become(receive) + case AttemptSysMsgRedelivery ⇒ + writer = createWriter() + resendAll() + context.become(receive) case EndpointWriter.FlushAndStop ⇒ context.stop(self) case EndpointWriter.StopReading(w) ⇒ sender ! EndpointWriter.StoppedReading(w) } @@ -415,6 +426,7 @@ private[remote] class EndpointWriter( val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, log) var inbound = handle.isDefined + var stopReason: DisassociateInfo = AssociationHandle.Unknown private def publishAndThrow(reason: Throwable): Nothing = { publishError(reason) @@ -521,6 +533,7 @@ private[remote] class EndpointWriter( case Event(FlushAndStop, _) ⇒ // Try to send a last Ack message trySendPureAck() + stopReason = AssociationHandle.Shutdown stop() case Event(AckIdleCheckTimer, _) if ackDeadline.isOverdue() ⇒ @@ -558,6 +571,7 @@ private[remote] class EndpointWriter( handle = Some(newHandle) goto(Handoff) case Event(FlushAndStop, _) ⇒ + stopReason = AssociationHandle.Shutdown stop() case Event(OutboundAck(ack), _) ⇒ lastAck = Some(ack) @@ -583,7 +597,7 @@ private[remote] class EndpointWriter( // It is important to call unstashAll() for the stash to work properly and maintain messages during restart. // As the FSM trait does not call super.postStop(), this call is needed unstashAll() - handle foreach { _.disassociate() } + handle foreach { _.disassociate(stopReason) } eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound)) } @@ -675,8 +689,7 @@ private[remote] class EndpointReader( } override def receive: Receive = { - case Disassociated ⇒ - context.stop(self) + case Disassociated(info) ⇒ handleDisassociated(info) case InboundPayload(p) if p.size <= transport.maximumPayloadBytes ⇒ val (ackOption, msgOption) = tryDecodeMessageAndAck(p) @@ -705,10 +718,10 @@ private[remote] class EndpointReader( } def notReading: Receive = { - case Disassociated ⇒ context.stop(self) + case Disassociated(info) ⇒ handleDisassociated(info) - case StopReading(newHandle) ⇒ - sender ! StoppedReading(newHandle) + case StopReading(writer) ⇒ + sender ! StoppedReading(writer) case InboundPayload(p) ⇒ val (ackOption, _) = tryDecodeMessageAndAck(p) @@ -717,6 +730,22 @@ private[remote] class EndpointReader( case _ ⇒ } + private def handleDisassociated(info: DisassociateInfo): Unit = info match { + case AssociationHandle.Unknown ⇒ + context.stop(self) + case AssociationHandle.Shutdown ⇒ + throw InvalidAssociation( + localAddress, + remoteAddress, + InvalidAssociationException("The remote system terminated the association because it is shutting down.")) + case AssociationHandle.Quarantined ⇒ + throw InvalidAssociation( + localAddress, + remoteAddress, + InvalidAssociationException("The remote system has quarantined this system. No further associations " + + "to the remote system are possible until this system is restarted.")) + } + private def deliverAndAck(): Unit = { val (updatedBuffer, deliver, ack) = ackedReceiveBuffer.extractDeliverable ackedReceiveBuffer = updatedBuffer diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 2c3a2b86b5..de0d8119a9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -40,8 +40,8 @@ final class RemoteSettings(val config: Config) { } requiring (_ >= Duration.Zero, "retry-gate-closed-for must be >= 0") val UnknownAddressGateClosedFor: FiniteDuration = { - Duration(getMilliseconds("akka.remote.gate-unknown-addresses-for"), MILLISECONDS) - } requiring (_ > Duration.Zero, "gate-unknown-addresses-for must be > 0") + Duration(getMilliseconds("akka.remote.gate-invalid-addresses-for"), MILLISECONDS) + } requiring (_ > Duration.Zero, "gate-invalid-addresses-for must be > 0") val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections") diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index d66750425c..c173e309c8 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -379,7 +379,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case InvalidAssociation(localAddress, remoteAddress, _) ⇒ - log.error("Tried to associate with invalid remote address [{}]. " + + log.error("Tried to associate with unreachable remote address [{}]. " + "Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress) endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor) context.system.eventStream.publish(AddressTerminated(remoteAddress)) @@ -445,7 +445,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends addressesPromise.success(transportsAndAddresses) case ListensFailure(addressesPromise, cause) ⇒ addressesPromise.failure(cause) - + case ia: InboundAssociation ⇒ + context.system.scheduler.scheduleOnce(10.milliseconds, self, ia) case ManagementCommand(_) ⇒ sender ! ManagementCommandAck(false) case StartupFinished ⇒ @@ -510,9 +511,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Some(endpoint) ⇒ endpoint ! EndpointWriter.TakeOver(handle) case None ⇒ - if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate() + if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) + handle.disassociate(AssociationHandle.Quarantined) else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { case Some(Pass(ep)) ⇒ + pendingReadHandoffs.get(ep) foreach (_.disassociate()) pendingReadHandoffs += ep -> handle ep ! EndpointWriter.StopReading(ep) case _ ⇒ @@ -562,14 +565,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends shutdownStatus ← shutdownAll(transportMapping.values)(_.shutdown()) } yield flushStatus && shutdownStatus) pipeTo sender + pendingReadHandoffs.valuesIterator foreach (_.disassociate(AssociationHandle.Shutdown)) + // Ignore all other writes context.become(flushing) } def flushing: Receive = { - case s: Send ⇒ extendedSystem.deadLetters ! s - case InboundAssociation(h) ⇒ h.disassociate() - case Terminated(_) ⇒ // why should we care now? + case s: Send ⇒ extendedSystem.deadLetters ! s + case InboundAssociation(h: AkkaProtocolHandle) ⇒ h.disassociate(AssociationHandle.Shutdown) + case Terminated(_) ⇒ // why should we care now? } private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = { diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 5b9e2fa893..4e70a18cad 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -13,6 +13,7 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, Promise, Future } import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.remote.transport.AssociationHandle.DisassociateInfo trait TransportAdapterProvider { /** @@ -125,7 +126,7 @@ object ActorTransportAdapter { case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation case class ListenUnderlying(listenAddress: Address, upstreamListener: Future[AssociationEventListener]) extends TransportOperation - case object DisassociateUnderlying extends TransportOperation + case class DisassociateUnderlying(info: DisassociateInfo = AssociationHandle.Unknown) extends TransportOperation implicit val AskTimeout = Timeout(5.seconds) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index 0599e4e7b9..d1eb0438cd 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -6,15 +6,10 @@ package akka.remote.transport import akka.AkkaException import akka.actor.{ AddressFromURIString, InternalActorRef, Address, ActorRef } import akka.remote.WireFormats._ -import akka.remote.transport.AkkaPduCodec._ import akka.remote._ import akka.util.ByteString import com.google.protobuf.InvalidProtocolBufferException import com.google.protobuf.{ ByteString ⇒ PByteString } -import akka.remote.Ack -import akka.remote.transport.AkkaPduCodec.Payload -import akka.remote.transport.AkkaPduCodec.Associate -import akka.remote.transport.AkkaPduCodec.Message /** * INTERNAL API @@ -35,7 +30,7 @@ private[remote] object AkkaPduCodec { */ sealed trait AkkaPdu case class Associate(info: HandshakeInfo) extends AkkaPdu - case object Disassociate extends AkkaPdu + case class Disassociate(reason: AssociationHandle.DisassociateInfo) extends AkkaPdu case object Heartbeat extends AkkaPdu case class Payload(bytes: ByteString) extends AkkaPdu @@ -57,7 +52,7 @@ private[remote] object AkkaPduCodec { * A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s. */ private[remote] trait AkkaPduCodec { - + import AkkaPduCodec._ /** * Returns an [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] instance that represents the PDU contained in the raw * ByteString. @@ -81,17 +76,17 @@ private[remote] trait AkkaPduCodec { * Encoded form as raw bytes */ def encodePdu(pdu: AkkaPdu): ByteString = pdu match { - case Associate(info) ⇒ constructAssociate(info) - case Payload(bytes) ⇒ constructPayload(bytes) - case Disassociate ⇒ constructDisassociate - case Heartbeat ⇒ constructHeartbeat + case Associate(info) ⇒ constructAssociate(info) + case Payload(bytes) ⇒ constructPayload(bytes) + case Disassociate(reason) ⇒ constructDisassociate(reason) + case Heartbeat ⇒ constructHeartbeat } def constructPayload(payload: ByteString): ByteString def constructAssociate(info: HandshakeInfo): ByteString - def constructDisassociate: ByteString + def constructDisassociate(reason: AssociationHandle.DisassociateInfo): ByteString def constructHeartbeat: ByteString @@ -112,6 +107,7 @@ private[remote] trait AkkaPduCodec { * INTERNAL API */ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { + import AkkaPduCodec._ private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = { val ackBuilder = AcknowledgementInfo.newBuilder() @@ -151,11 +147,18 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { override def constructAssociate(info: HandshakeInfo): ByteString = { val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid) info.cookie foreach handshakeInfo.setCookie - constructControlMessagePdu(WireFormats.CommandType.CONNECT, Some(handshakeInfo)) + constructControlMessagePdu(WireFormats.CommandType.ASSOCIATE, Some(handshakeInfo)) } - override val constructDisassociate: ByteString = - constructControlMessagePdu(WireFormats.CommandType.SHUTDOWN, None) + private val DISASSOCIATE = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE, None) + private val DISASSOCIATE_SHUTTING_DOWN = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_SHUTTING_DOWN, None) + private val DISASSOCIATE_QUARANTINED = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_QUARANTINED, None) + + override def constructDisassociate(info: AssociationHandle.DisassociateInfo): ByteString = info match { + case AssociationHandle.Unknown ⇒ DISASSOCIATE + case AssociationHandle.Shutdown ⇒ DISASSOCIATE_SHUTTING_DOWN + case AssociationHandle.Quarantined ⇒ DISASSOCIATE_QUARANTINED + } override val constructHeartbeat: ByteString = constructControlMessagePdu(WireFormats.CommandType.HEARTBEAT, None) @@ -201,7 +204,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { private def decodeControlPdu(controlPdu: AkkaControlMessage): AkkaPdu = { controlPdu.getCommandType match { - case CommandType.CONNECT if controlPdu.hasHandshakeInfo ⇒ + case CommandType.ASSOCIATE if controlPdu.hasHandshakeInfo ⇒ val handshakeInfo = controlPdu.getHandshakeInfo val cookie = if (handshakeInfo.hasCookie) Some(handshakeInfo.getCookie) else None Associate( @@ -209,8 +212,10 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { decodeAddress(handshakeInfo.getOrigin), handshakeInfo.getUid.toInt, // 64 bits are allocated in the wire formats, but we use only 32 for now cookie)) - case CommandType.SHUTDOWN ⇒ Disassociate - case CommandType.HEARTBEAT ⇒ Heartbeat + case CommandType.DISASSOCIATE ⇒ Disassociate(AssociationHandle.Unknown) + case CommandType.DISASSOCIATE_SHUTTING_DOWN ⇒ Disassociate(AssociationHandle.Shutdown) + case CommandType.DISASSOCIATE_QUARANTINED ⇒ Disassociate(AssociationHandle.Quarantined) + case CommandType.HEARTBEAT ⇒ Heartbeat case x ⇒ throw new PduCodecException(s"Decoding of control PDU failed, invalid format, unexpected: [${x}]", null) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 737b8522ca..ad91494682 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -155,7 +155,9 @@ private[remote] class AkkaProtocolHandle( override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload)) - override def disassociate(): Unit = stateActor ! DisassociateUnderlying + override def disassociate(): Unit = stateActor ! DisassociateUnderlying(Unknown) + + def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info) } @@ -277,7 +279,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat stay() } - case Event(DisassociateUnderlying, _) ⇒ + case Event(DisassociateUnderlying(_), _) ⇒ stop() case _ ⇒ stay() @@ -286,8 +288,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat // Timeout of this state is implicitly handled by the failure detector when(WaitHandshake) { - case Event(Disassociated, _) ⇒ - stop() + case Event(Disassociated(info), _) ⇒ + stop(FSM.Failure(info)) case Event(InboundPayload(p), OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) ⇒ decodePdu(p) match { @@ -298,13 +300,13 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat wrappedHandle, immutable.Queue.empty) - case Disassociate ⇒ + case Disassociate(info) ⇒ // After receiving Disassociate we MUST NOT send back a Disassociate (loop) - stop() + stop(FSM.Failure(info)) case _ ⇒ // Expected handshake to be finished, dropping connection - sendDisassociate(wrappedHandle) + sendDisassociate(wrappedHandle, Unknown) stop() } @@ -315,7 +317,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Event(InboundPayload(p), InboundUnassociated(associationHandler, wrappedHandle)) ⇒ decodePdu(p) match { // After receiving Disassociate we MUST NOT send back a Disassociate (loop) - case Disassociate ⇒ stop() + case Disassociate(info) ⇒ stop(FSM.Failure(info)) // Incoming association -- implicitly ACK by a heartbeat case Associate(info) ⇒ @@ -338,7 +340,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat // Got a stray message -- explicitly reset the association (force remote endpoint to reassociate) case _ ⇒ - sendDisassociate(wrappedHandle) + sendDisassociate(wrappedHandle, Unknown) stop() } @@ -346,13 +348,13 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } when(Open) { - case Event(Disassociated, _) ⇒ - stop() + case Event(Disassociated(info), _) ⇒ + stop(FSM.Failure(info)) case Event(InboundPayload(p), _) ⇒ decodePdu(p) match { - case Disassociate ⇒ - stop() + case Disassociate(info) ⇒ + stop(FSM.Failure(info)) case Heartbeat ⇒ failureDetector.heartbeat(); stay() @@ -374,14 +376,14 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case Event(HeartbeatTimer, AssociatedWaitHandler(_, wrappedHandle, _)) ⇒ handleTimers(wrappedHandle) case Event(HeartbeatTimer, ListenerReady(_, wrappedHandle)) ⇒ handleTimers(wrappedHandle) - case Event(DisassociateUnderlying, _) ⇒ + case Event(DisassociateUnderlying(info: DisassociateInfo), _) ⇒ val handle = stateData match { case ListenerReady(_, wrappedHandle) ⇒ wrappedHandle case AssociatedWaitHandler(_, wrappedHandle, _) ⇒ wrappedHandle case msg ⇒ throw new AkkaProtocolException(s"unhandled message in state Open(DisassociateUnderlying) with type [${safeClassName(msg)}]") } - sendDisassociate(handle) + sendDisassociate(handle, info) stop() case Event(HandleListenerRegistered(listener), AssociatedWaitHandler(_, wrappedHandle, queue)) ⇒ @@ -399,7 +401,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat stay() } else { // send disassociate just to be sure - sendDisassociate(wrappedHandle) + sendDisassociate(wrappedHandle, Unknown) stop(FSM.Failure(TimeoutReason)) } } @@ -415,23 +417,35 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } onTermination { - case StopEvent(_, _, OutboundUnassociated(remoteAddress, statusPromise, transport)) ⇒ - statusPromise.tryFailure(new AkkaProtocolException("Transport disassociated before handshake finished")) + case StopEvent(reason, _, OutboundUnassociated(remoteAddress, statusPromise, transport)) ⇒ + statusPromise.tryFailure(reason match { + case FSM.Failure(info: DisassociateInfo) ⇒ disassociateException(info) + case _ ⇒ new AkkaProtocolException("Transport disassociated before handshake finished") + }) case StopEvent(reason, _, OutboundUnderlyingAssociated(statusPromise, wrappedHandle)) ⇒ - statusPromise.tryFailure(new AkkaProtocolException(reason match { - case FSM.Failure(TimeoutReason) ⇒ "No response from remote. Handshake timed out" - case _ ⇒ "Remote endpoint disassociated before handshake finished" - })) + statusPromise.tryFailure(reason match { + case FSM.Failure(TimeoutReason) ⇒ new AkkaProtocolException("No response from remote. Handshake timed out") + case FSM.Failure(info: DisassociateInfo) ⇒ disassociateException(info) + case _ ⇒ new AkkaProtocolException("Transport disassociated before handshake finished") + }) wrappedHandle.disassociate() - case StopEvent(_, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒ + case StopEvent(reason, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)) ⇒ // Invalidate exposed but still unfinished promise. The underlying association disappeared, so after // registration immediately signal a disassociate - handlerFuture foreach { _ notify Disassociated } + val disassociateNotification = reason match { + case FSM.Failure(info: DisassociateInfo) ⇒ Disassociated(info) + case _ ⇒ Disassociated(Unknown) + } + handlerFuture foreach { _ notify disassociateNotification } - case StopEvent(_, _, ListenerReady(handler, wrappedHandle)) ⇒ - handler notify Disassociated + case StopEvent(reason, _, ListenerReady(handler, wrappedHandle)) ⇒ + val disassociateNotification = reason match { + case FSM.Failure(info: DisassociateInfo) ⇒ Disassociated(info) + case _ ⇒ Disassociated(Unknown) + } + handler notify disassociateNotification wrappedHandle.disassociate() case StopEvent(_, _, InboundUnassociated(_, wrappedHandle)) ⇒ @@ -439,9 +453,20 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat } + private def disassociateException(info: DisassociateInfo): Exception = info match { + case Unknown ⇒ + new AkkaProtocolException("The remote system explicitly disassociated (reason unknown).") + case Shutdown ⇒ + InvalidAssociationException("The remote system refused the association because it is shutting down.") + case Quarantined ⇒ + InvalidAssociationException("The remote system has quarantined this system. No further associations to the remote " + + "system are possible until this system is restarted.") + } + override protected def logTermination(reason: FSM.Reason): Unit = reason match { - case FSM.Failure(TimeoutReason) ⇒ // no logging - case other ⇒ super.logTermination(reason) + case FSM.Failure(TimeoutReason) ⇒ // no logging + case FSM.Failure(_: DisassociateInfo) ⇒ // no logging + case other ⇒ super.logTermination(reason) } private def listenForListenerRegistration(readHandlerPromise: Promise[HandleEventListener]): Unit = @@ -494,9 +519,10 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat case NonFatal(e) ⇒ throw new AkkaProtocolException("Error writing HEARTBEAT to transport", e) } - private def sendDisassociate(wrappedHandle: AssociationHandle): Unit = try wrappedHandle.write(codec.constructDisassociate) catch { - case NonFatal(e) ⇒ throw new AkkaProtocolException("Error writing DISASSOCIATE to transport", e) - } + private def sendDisassociate(wrappedHandle: AssociationHandle, info: DisassociateInfo): Unit = + try wrappedHandle.write(codec.constructDisassociate(info)) catch { + case NonFatal(e) ⇒ throw new AkkaProtocolException("Error writing DISASSOCIATE to transport", e) + } private def sendAssociate(wrappedHandle: AssociationHandle, info: HandshakeInfo): Boolean = try { wrappedHandle.write(codec.constructAssociate(info)) diff --git a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala index d591e55564..c9b7df6661 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/TestTransport.scala @@ -125,7 +125,9 @@ class TestTransport( } private def defaultDisassociate(handle: TestAssociationHandle): Future[Unit] = { - registry.deregisterAssociation(handle.key).foreach { registry.remoteListenerRelativeTo(handle, _) notify Disassociated } + registry.deregisterAssociation(handle.key).foreach { + registry.remoteListenerRelativeTo(handle, _) notify Disassociated(AssociationHandle.Unknown) + } Future.successful(()) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index be42d58f07..373361ef39 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -440,8 +440,8 @@ private[transport] class ThrottledAssociation( inboundThrottleMode = mode sender ! SetThrottleAck stay() - case Event(Disassociated, _) ⇒ - if (upstreamListener ne null) upstreamListener notify Disassociated + case Event(Disassociated(info), _) ⇒ + if (upstreamListener ne null) upstreamListener notify Disassociated(info) originalHandle.disassociate() stop() } diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 3673c2a0fc..42d88f9a3a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -8,6 +8,7 @@ import akka.actor.{ ActorRef, Address } import akka.util.ByteString import akka.remote.transport.AssociationHandle.HandleEventListener import akka.AkkaException +import scala.util.control.NoStackTrace object Transport { @@ -18,7 +19,7 @@ object Transport { * hostname, etc.). */ @SerialVersionUID(1L) - case class InvalidAssociationException(msg: String, cause: Throwable) extends AkkaException(msg, cause) + case class InvalidAssociationException(msg: String, cause: Throwable = null) extends AkkaException(msg, cause) with NoStackTrace /** * Message sent to a [[akka.remote.transport.Transport.AssociationEventListener]] registered to a transport @@ -159,8 +160,20 @@ object AssociationHandle { /** * Message sent to the listener registered to an association + * + * @param info + * information about the reason of disassociation */ - case object Disassociated extends HandleEvent + case class Disassociated(info: DisassociateInfo) extends HandleEvent + + /** + * Supertype of possible disassociation reasons + */ + sealed trait DisassociateInfo + + case object Unknown extends DisassociateInfo + case object Shutdown extends DisassociateInfo + case object Quarantined extends DisassociateInfo /** * An interface that needs to be implemented by the user of an [[akka.remote.transport.AssociationHandle]] diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 821878823e..8843620b16 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -436,8 +436,8 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ ⇒ true) recover { case _ ⇒ false } for { // Force flush by trying to write an empty buffer and wait for success - lastWriteStatus ← always(channelGroup.write(ChannelBuffers.buffer(0))) unbindStatus ← always(channelGroup.unbind()) + lastWriteStatus ← always(channelGroup.write(ChannelBuffers.buffer(0))) disconnectStatus ← always(channelGroup.disconnect()) closeStatus ← always(channelGroup.close()) } yield { diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 663568b2ba..60f22fb73c 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -38,7 +38,7 @@ private[remote] trait TcpHandlers extends CommonHandlers { new TcpAssociationHandle(localAddress, remoteAddress, transport, channel) override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = - notifyListener(e.getChannel, Disassociated) + notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array() @@ -46,7 +46,7 @@ private[remote] trait TcpHandlers extends CommonHandlers { } override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = { - notifyListener(e.getChannel, Disassociated) + notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown)) e.getChannel.close() // No graceful close here } } diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 486422d2ce..9f5b175db8 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -3,7 +3,7 @@ package akka.remote.transport import akka.actor.{ ExtendedActorSystem, Address, Props } import akka.remote.transport.AkkaPduCodec.{ Disassociate, Associate, Heartbeat } import akka.remote.transport.AkkaProtocolSpec.TestFailureDetector -import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload } +import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload } import akka.remote.transport.TestTransport._ import akka.remote.transport.Transport._ import akka.remote.{ SeqNo, WireFormats, RemoteActorRefProvider, FailureDetector } @@ -74,7 +74,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re def testHeartbeat = InboundPayload(codec.constructHeartbeat) def testPayload = InboundPayload(testMsgPdu) - def testDisassociate = InboundPayload(codec.constructDisassociate) + def testDisassociate(info: DisassociateInfo) = InboundPayload(codec.constructDisassociate(info)) def testAssociate(uid: Int, cookie: Option[String]) = InboundPayload(codec.constructAssociate(HandshakeInfo(remoteAkkaAddress, uid, cookie))) @@ -113,8 +113,8 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ codec.decodePdu(payload) match { - case Disassociate ⇒ true - case _ ⇒ false + case Disassociate(_) ⇒ true + case _ ⇒ false } case _ ⇒ false } @@ -326,9 +326,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) - reader ! testDisassociate + reader ! testDisassociate(AssociationHandle.Unknown) - expectMsg(Disassociated) + expectMsg(Disassociated(AssociationHandle.Unknown)) } "handle transport level disassociations" in { @@ -361,9 +361,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) - reader ! Disassociated + reader ! Disassociated(AssociationHandle.Unknown) - expectMsg(Disassociated) + expectMsg(Disassociated(AssociationHandle.Unknown)) } "disassociate when failure detector signals failure" in { @@ -401,7 +401,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re failureDetector.isAvailable = false - expectMsg(Disassociated) + expectMsg(Disassociated(AssociationHandle.Unknown)) } "handle correctly when the handler is registered only after the association is already closed" in { @@ -432,11 +432,11 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re case _ ⇒ fail() } - stateActor ! Disassociated + stateActor ! Disassociated(AssociationHandle.Unknown) wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor)) - expectMsg(Disassociated) + expectMsg(Disassociated(AssociationHandle.Unknown)) } diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala index 960d176057..8c26b62c30 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolStressTest.scala @@ -25,7 +25,7 @@ object AkkaProtocolStressTest { acceptable-heartbeat-pause = 0.01 s } remote.retry-window = 1 s - remote.maximum-retries-in-window = 1000 + remote.maximum-retries-in-window = 3 remote.netty.tcp { applied-adapters = ["gremlin"] @@ -33,10 +33,12 @@ object AkkaProtocolStressTest { } } - """) + """) class SequenceVerifier(remote: ActorRef, controller: ActorRef) extends Actor { - val limit = 10000 + import context.dispatcher + + val limit = 100000 var nextSeq = 0 var maxSeq = -1 var losses = 0 @@ -46,7 +48,8 @@ object AkkaProtocolStressTest { case "sendNext" ⇒ if (nextSeq < limit) { remote ! nextSeq nextSeq += 1 - self ! "sendNext" + if (nextSeq % 2000 == 0) context.system.scheduler.scheduleOnce(500.milliseconds, self, "sendNext") + else self ! "sendNext" } case seq: Int ⇒ if (seq > maxSeq) { @@ -82,11 +85,13 @@ class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with "AkkaProtocolTransport" must { "guarantee at-most-once delivery and message ordering despite packet loss" taggedAs TimingTest in { + system.eventStream.publish(TestEvent.Mute(DeadLettersFilter[Any])) + systemB.eventStream.publish(TestEvent.Mute(DeadLettersFilter[Any])) Await.result(RARP(system).provider.transport.managementCommand(One(addressB, Drop(0.3, 0.3))), 3.seconds.dilated) val tester = system.actorOf(Props(classOf[SequenceVerifier], here, self)) ! "start" - expectMsgPF(45.seconds) { + expectMsgPF(60.seconds) { case (received: Int, lost: Int) ⇒ log.debug(s" ######## Received ${received - lost} messages from ${received} ########") } diff --git a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala index 7248ef53d5..20813008d5 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala @@ -147,7 +147,7 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) handleA.disassociate() expectMsgPF(timeout.duration) { - case Disassociated ⇒ + case Disassociated(_) ⇒ } awaitCond(!registry.existsAssociation(addressATest, addressBTest)) diff --git a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala index 4206702230..225b91c8b8 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/SystemMessageDeliveryStressTest.scala @@ -46,7 +46,7 @@ object SystemMessageDeliveryStressTest { acceptable-heartbeat-pause = 0.01 s } remote.retry-window = 1 s - remote.maximum-retries-in-window = 1000 + remote.maximum-retries-in-window = 2 remote.use-passive-connections = on remote.netty.tcp { @@ -90,7 +90,9 @@ object SystemMessageDeliveryStressTest { } abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String) - extends AkkaSpec(ConfigFactory.parseString(cfg).withFallback(configA)) with ImplicitSender with DefaultTimeout { + extends AkkaSpec(ConfigFactory.parseString(cfg).withFallback(SystemMessageDeliveryStressTest.baseConfig)) + with ImplicitSender + with DefaultTimeout { import SystemMessageDeliveryStressTest._ val systemB = ActorSystem("systemB", system.settings.config) diff --git a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala index 2f1935018e..8312018738 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/TestTransportSpec.scala @@ -124,7 +124,7 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender handleA.disassociate() expectMsgPF(timeout.duration) { - case Disassociated ⇒ + case Disassociated(_) ⇒ } awaitCond(!registry.existsAssociation(addressA, addressB))