Fixes for leaked connection

- Also introduces reason in the Disassociate message
 - Reliable delivery now transitions from idle to active if there are pending system msgs
 - Minor fix in merging receive buffers (reduces resends)
 - Tweaked WireFormat
 - Removed busy-wait in startup
 - throwing the proper exception type in EndpointReader
 - InvalidAssociationException extends NoStackTrace
This commit is contained in:
Endre Sándor Varga 2013-06-19 14:09:14 +02:00
parent 6d8e13c760
commit e6e5be859b
20 changed files with 220 additions and 120 deletions

View file

@ -10,23 +10,29 @@ public final class WireFormats {
}
public enum CommandType
implements com.google.protobuf.ProtocolMessageEnum {
CONNECT(0, 1),
SHUTDOWN(1, 2),
ASSOCIATE(0, 1),
DISASSOCIATE(1, 2),
HEARTBEAT(2, 3),
DISASSOCIATE_SHUTTING_DOWN(3, 4),
DISASSOCIATE_QUARANTINED(4, 5),
;
public static final int CONNECT_VALUE = 1;
public static final int SHUTDOWN_VALUE = 2;
public static final int ASSOCIATE_VALUE = 1;
public static final int DISASSOCIATE_VALUE = 2;
public static final int HEARTBEAT_VALUE = 3;
public static final int DISASSOCIATE_SHUTTING_DOWN_VALUE = 4;
public static final int DISASSOCIATE_QUARANTINED_VALUE = 5;
public final int getNumber() { return value; }
public static CommandType valueOf(int value) {
switch (value) {
case 1: return CONNECT;
case 2: return SHUTDOWN;
case 1: return ASSOCIATE;
case 2: return DISASSOCIATE;
case 3: return HEARTBEAT;
case 4: return DISASSOCIATE_SHUTTING_DOWN;
case 5: return DISASSOCIATE_QUARANTINED;
default: return null;
}
}
@ -57,7 +63,7 @@ public final class WireFormats {
}
private static final CommandType[] VALUES = {
CONNECT, SHUTDOWN, HEARTBEAT,
ASSOCIATE, DISASSOCIATE, HEARTBEAT, DISASSOCIATE_SHUTTING_DOWN, DISASSOCIATE_QUARANTINED,
};
public static CommandType valueOf(
@ -5626,7 +5632,7 @@ public final class WireFormats {
}
private void initFields() {
commandType_ = akka.remote.WireFormats.CommandType.CONNECT;
commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE;
handshakeInfo_ = akka.remote.WireFormats.AkkaHandshakeInfo.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
@ -5799,7 +5805,7 @@ public final class WireFormats {
public Builder clear() {
super.clear();
commandType_ = akka.remote.WireFormats.CommandType.CONNECT;
commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE;
bitField0_ = (bitField0_ & ~0x00000001);
if (handshakeInfoBuilder_ == null) {
handshakeInfo_ = akka.remote.WireFormats.AkkaHandshakeInfo.getDefaultInstance();
@ -5947,7 +5953,7 @@ public final class WireFormats {
private int bitField0_;
// required .CommandType commandType = 1;
private akka.remote.WireFormats.CommandType commandType_ = akka.remote.WireFormats.CommandType.CONNECT;
private akka.remote.WireFormats.CommandType commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE;
public boolean hasCommandType() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
@ -5965,7 +5971,7 @@ public final class WireFormats {
}
public Builder clearCommandType() {
bitField0_ = (bitField0_ & ~0x00000001);
commandType_ = akka.remote.WireFormats.CommandType.CONNECT;
commandType_ = akka.remote.WireFormats.CommandType.ASSOCIATE;
onChanged();
return this;
}
@ -7410,8 +7416,10 @@ public final class WireFormats {
"(\0132\014.AddressData\022\013\n\003uid\030\002 \002(\006\022\016\n\006cookie\030" +
"\003 \001(\t\"O\n\013AddressData\022\016\n\006system\030\001 \002(\t\022\020\n\010" +
"hostname\030\002 \002(\t\022\014\n\004port\030\003 \002(\r\022\020\n\010protocol" +
"\030\004 \001(\t*7\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SH" +
"UTDOWN\020\002\022\r\n\tHEARTBEAT\020\003B\017\n\013akka.remoteH\001"
"\030\004 \001(\t*{\n\013CommandType\022\r\n\tASSOCIATE\020\001\022\020\n\014" +
"DISASSOCIATE\020\002\022\r\n\tHEARTBEAT\020\003\022\036\n\032DISASSO" +
"CIATE_SHUTTING_DOWN\020\004\022\034\n\030DISASSOCIATE_QU",
"ARANTINED\020\005B\017\n\013akka.remoteH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

View file

@ -119,12 +119,13 @@ message AkkaHandshakeInfo {
* Defines the type of the AkkaControlMessage command type
*/
enum CommandType {
CONNECT = 1;
SHUTDOWN = 2;
ASSOCIATE = 1;
DISASSOCIATE = 2;
HEARTBEAT = 3;
DISASSOCIATE_SHUTTING_DOWN = 4; // Remote system is going down and will not accepts new connections
DISASSOCIATE_QUARANTINED = 5; // Remote system refused the association since the current system is quarantined
}
/**
* Defines a remote address.
*/

View file

@ -239,13 +239,15 @@ akka {
retry-window = 60 s
maximum-retries-in-window = 3
# The length of time to gate an address whose name lookup has failed.
# The length of time to gate an address whose name lookup has failed
# or has explicitly signalled that it will not accept connections
# (remote system is shutting down or the requesting system is quarantined).
# No connection attempts will be made to an address while it remains
# gated. Any messages sent to a gated address will be directed to dead
# letters instead. Name lookups are costly, and the time to recovery
# is typically large, therefore this setting should be a value in the
# order of seconds or minutes.
gate-unknown-addresses-for = 60 s
gate-invalid-addresses-for = 60 s
# This settings controls how long a system will be quarantined after
# catastrophic communication failures that result in the loss of system

View file

@ -181,10 +181,11 @@ case class AckedReceiveBuffer[T <: HasSequenceNumber](
* @return The merged receive buffer.
*/
def mergeFrom(that: AckedReceiveBuffer[T]): AckedReceiveBuffer[T] = {
val mergedLastDelivered = max(this.lastDelivered, that.lastDelivered)
this.copy(
lastDelivered = max(this.lastDelivered, that.lastDelivered),
lastDelivered = mergedLastDelivered,
cumulativeAck = max(this.cumulativeAck, that.cumulativeAck),
buf = (this.buf union that.buf).filter { _.seq > lastDelivered })
buf = (this.buf union that.buf).filter { _.seq > mergedLastDelivered })
}
override def toString = buf.map { _.seq }.mkString("[", ", ", "]")

View file

@ -3,27 +3,29 @@
*/
package akka.remote
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import akka.actor.Terminated
import akka.actor._
import akka.dispatch.sysmsg.SystemMessage
import akka.event.LoggingAdapter
import akka.pattern.pipe
import akka.remote.EndpointManager.Link
import akka.remote.EndpointManager.Send
import akka.remote.EndpointWriter.{ StoppedReading, FlushAndStop }
import akka.remote.WireFormats.SerializedMessage
import akka.remote.transport.AkkaPduCodec._
import akka.remote.transport.AssociationHandle._
import akka.remote.transport.AkkaPduCodec.Message
import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.Transport.InvalidAssociationException
import akka.remote.transport.{ AkkaPduProtobufCodec, AkkaPduCodec, Transport, AkkaProtocolHandle }
import akka.remote.transport._
import akka.serialization.Serialization
import akka.util.ByteString
import akka.{ OnlyCauseStackTrace, AkkaException }
import java.io.NotSerializableException
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, Deadline }
import scala.util.control.NonFatal
import scala.annotation.tailrec
import akka.remote.EndpointWriter.FlushAndStop
import akka.actor.SupervisorStrategy._
import akka.remote.EndpointManager.Link
/**
* INTERNAL API
@ -145,6 +147,7 @@ private[remote] class OversizedPayloadException(msg: String) extends EndpointExc
*/
private[remote] object ReliableDeliverySupervisor {
case object Ungate
case object AttemptSysMsgRedelivery
case class GotUid(uid: Int)
def props(
@ -171,6 +174,7 @@ private[remote] class ReliableDeliverySupervisor(
val codec: AkkaPduCodec,
val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]]) extends Actor {
import ReliableDeliverySupervisor._
import context.dispatcher
def retryGateEnabled = settings.RetryGateClosedFor > Duration.Zero
@ -178,11 +182,11 @@ private[remote] class ReliableDeliverySupervisor(
case e @ (_: InvalidAssociation | _: HopelessAssociation | _: QuarantinedUidException) Escalate
case NonFatal(e)
if (retryGateEnabled) {
import context.dispatcher
context.become(gated)
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
context.unwatch(writer)
currentHandle = None
context.parent ! StoppedReading(self)
Stop
} else {
Restart
@ -242,6 +246,9 @@ private[remote] class ReliableDeliverySupervisor(
resendNacked()
case Terminated(_)
currentHandle = None
context.parent ! StoppedReading(self)
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty)
context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
context.become(idle)
case GotUid(u) uid = Some(u)
case s: EndpointWriter.StopReading writer forward s
@ -267,6 +274,10 @@ private[remote] class ReliableDeliverySupervisor(
resendAll()
handleSend(s)
context.become(receive)
case AttemptSysMsgRedelivery
writer = createWriter()
resendAll()
context.become(receive)
case EndpointWriter.FlushAndStop context.stop(self)
case EndpointWriter.StopReading(w) sender ! EndpointWriter.StoppedReading(w)
}
@ -415,6 +426,7 @@ private[remote] class EndpointWriter(
val msgDispatch = new DefaultMessageDispatcher(extendedSystem, provider, log)
var inbound = handle.isDefined
var stopReason: DisassociateInfo = AssociationHandle.Unknown
private def publishAndThrow(reason: Throwable): Nothing = {
publishError(reason)
@ -521,6 +533,7 @@ private[remote] class EndpointWriter(
case Event(FlushAndStop, _)
// Try to send a last Ack message
trySendPureAck()
stopReason = AssociationHandle.Shutdown
stop()
case Event(AckIdleCheckTimer, _) if ackDeadline.isOverdue()
@ -558,6 +571,7 @@ private[remote] class EndpointWriter(
handle = Some(newHandle)
goto(Handoff)
case Event(FlushAndStop, _)
stopReason = AssociationHandle.Shutdown
stop()
case Event(OutboundAck(ack), _)
lastAck = Some(ack)
@ -583,7 +597,7 @@ private[remote] class EndpointWriter(
// It is important to call unstashAll() for the stash to work properly and maintain messages during restart.
// As the FSM trait does not call super.postStop(), this call is needed
unstashAll()
handle foreach { _.disassociate() }
handle foreach { _.disassociate(stopReason) }
eventPublisher.notifyListeners(DisassociatedEvent(localAddress, remoteAddress, inbound))
}
@ -675,8 +689,7 @@ private[remote] class EndpointReader(
}
override def receive: Receive = {
case Disassociated
context.stop(self)
case Disassociated(info) handleDisassociated(info)
case InboundPayload(p) if p.size <= transport.maximumPayloadBytes
val (ackOption, msgOption) = tryDecodeMessageAndAck(p)
@ -705,10 +718,10 @@ private[remote] class EndpointReader(
}
def notReading: Receive = {
case Disassociated context.stop(self)
case Disassociated(info) handleDisassociated(info)
case StopReading(newHandle)
sender ! StoppedReading(newHandle)
case StopReading(writer)
sender ! StoppedReading(writer)
case InboundPayload(p)
val (ackOption, _) = tryDecodeMessageAndAck(p)
@ -717,6 +730,22 @@ private[remote] class EndpointReader(
case _
}
private def handleDisassociated(info: DisassociateInfo): Unit = info match {
case AssociationHandle.Unknown
context.stop(self)
case AssociationHandle.Shutdown
throw InvalidAssociation(
localAddress,
remoteAddress,
InvalidAssociationException("The remote system terminated the association because it is shutting down."))
case AssociationHandle.Quarantined
throw InvalidAssociation(
localAddress,
remoteAddress,
InvalidAssociationException("The remote system has quarantined this system. No further associations " +
"to the remote system are possible until this system is restarted."))
}
private def deliverAndAck(): Unit = {
val (updatedBuffer, deliver, ack) = ackedReceiveBuffer.extractDeliverable
ackedReceiveBuffer = updatedBuffer

View file

@ -40,8 +40,8 @@ final class RemoteSettings(val config: Config) {
} requiring (_ >= Duration.Zero, "retry-gate-closed-for must be >= 0")
val UnknownAddressGateClosedFor: FiniteDuration = {
Duration(getMilliseconds("akka.remote.gate-unknown-addresses-for"), MILLISECONDS)
} requiring (_ > Duration.Zero, "gate-unknown-addresses-for must be > 0")
Duration(getMilliseconds("akka.remote.gate-invalid-addresses-for"), MILLISECONDS)
} requiring (_ > Duration.Zero, "gate-invalid-addresses-for must be > 0")
val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections")

View file

@ -379,7 +379,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
override val supervisorStrategy =
OneForOneStrategy(loggingEnabled = false) {
case InvalidAssociation(localAddress, remoteAddress, _)
log.error("Tried to associate with invalid remote address [{}]. " +
log.error("Tried to associate with unreachable remote address [{}]. " +
"Address is now quarantined, all messages to this address will be delivered to dead letters.", remoteAddress)
endpoints.markAsFailed(sender, Deadline.now + settings.UnknownAddressGateClosedFor)
context.system.eventStream.publish(AddressTerminated(remoteAddress))
@ -445,7 +445,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
addressesPromise.success(transportsAndAddresses)
case ListensFailure(addressesPromise, cause)
addressesPromise.failure(cause)
case ia: InboundAssociation
context.system.scheduler.scheduleOnce(10.milliseconds, self, ia)
case ManagementCommand(_)
sender ! ManagementCommandAck(false)
case StartupFinished
@ -510,9 +511,11 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
case Some(endpoint)
endpoint ! EndpointWriter.TakeOver(handle)
case None
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate()
if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid))
handle.disassociate(AssociationHandle.Quarantined)
else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match {
case Some(Pass(ep))
pendingReadHandoffs.get(ep) foreach (_.disassociate())
pendingReadHandoffs += ep -> handle
ep ! EndpointWriter.StopReading(ep)
case _
@ -562,14 +565,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
shutdownStatus shutdownAll(transportMapping.values)(_.shutdown())
} yield flushStatus && shutdownStatus) pipeTo sender
pendingReadHandoffs.valuesIterator foreach (_.disassociate(AssociationHandle.Shutdown))
// Ignore all other writes
context.become(flushing)
}
def flushing: Receive = {
case s: Send extendedSystem.deadLetters ! s
case InboundAssociation(h) h.disassociate()
case Terminated(_) // why should we care now?
case s: Send extendedSystem.deadLetters ! s
case InboundAssociation(h: AkkaProtocolHandle) h.disassociate(AssociationHandle.Shutdown)
case Terminated(_) // why should we care now?
}
private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = {

View file

@ -13,6 +13,7 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Promise, Future }
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.remote.transport.AssociationHandle.DisassociateInfo
trait TransportAdapterProvider {
/**
@ -125,7 +126,7 @@ object ActorTransportAdapter {
case class AssociateUnderlying(remoteAddress: Address, statusPromise: Promise[AssociationHandle]) extends TransportOperation
case class ListenUnderlying(listenAddress: Address,
upstreamListener: Future[AssociationEventListener]) extends TransportOperation
case object DisassociateUnderlying extends TransportOperation
case class DisassociateUnderlying(info: DisassociateInfo = AssociationHandle.Unknown) extends TransportOperation
implicit val AskTimeout = Timeout(5.seconds)
}

View file

@ -6,15 +6,10 @@ package akka.remote.transport
import akka.AkkaException
import akka.actor.{ AddressFromURIString, InternalActorRef, Address, ActorRef }
import akka.remote.WireFormats._
import akka.remote.transport.AkkaPduCodec._
import akka.remote._
import akka.util.ByteString
import com.google.protobuf.InvalidProtocolBufferException
import com.google.protobuf.{ ByteString PByteString }
import akka.remote.Ack
import akka.remote.transport.AkkaPduCodec.Payload
import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AkkaPduCodec.Message
/**
* INTERNAL API
@ -35,7 +30,7 @@ private[remote] object AkkaPduCodec {
*/
sealed trait AkkaPdu
case class Associate(info: HandshakeInfo) extends AkkaPdu
case object Disassociate extends AkkaPdu
case class Disassociate(reason: AssociationHandle.DisassociateInfo) extends AkkaPdu
case object Heartbeat extends AkkaPdu
case class Payload(bytes: ByteString) extends AkkaPdu
@ -57,7 +52,7 @@ private[remote] object AkkaPduCodec {
* A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s.
*/
private[remote] trait AkkaPduCodec {
import AkkaPduCodec._
/**
* Returns an [[akka.remote.transport.AkkaPduCodec.AkkaPdu]] instance that represents the PDU contained in the raw
* ByteString.
@ -81,17 +76,17 @@ private[remote] trait AkkaPduCodec {
* Encoded form as raw bytes
*/
def encodePdu(pdu: AkkaPdu): ByteString = pdu match {
case Associate(info) constructAssociate(info)
case Payload(bytes) constructPayload(bytes)
case Disassociate constructDisassociate
case Heartbeat constructHeartbeat
case Associate(info) constructAssociate(info)
case Payload(bytes) constructPayload(bytes)
case Disassociate(reason) constructDisassociate(reason)
case Heartbeat constructHeartbeat
}
def constructPayload(payload: ByteString): ByteString
def constructAssociate(info: HandshakeInfo): ByteString
def constructDisassociate: ByteString
def constructDisassociate(reason: AssociationHandle.DisassociateInfo): ByteString
def constructHeartbeat: ByteString
@ -112,6 +107,7 @@ private[remote] trait AkkaPduCodec {
* INTERNAL API
*/
private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
import AkkaPduCodec._
private def ackBuilder(ack: Ack): AcknowledgementInfo.Builder = {
val ackBuilder = AcknowledgementInfo.newBuilder()
@ -151,11 +147,18 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
override def constructAssociate(info: HandshakeInfo): ByteString = {
val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid)
info.cookie foreach handshakeInfo.setCookie
constructControlMessagePdu(WireFormats.CommandType.CONNECT, Some(handshakeInfo))
constructControlMessagePdu(WireFormats.CommandType.ASSOCIATE, Some(handshakeInfo))
}
override val constructDisassociate: ByteString =
constructControlMessagePdu(WireFormats.CommandType.SHUTDOWN, None)
private val DISASSOCIATE = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE, None)
private val DISASSOCIATE_SHUTTING_DOWN = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_SHUTTING_DOWN, None)
private val DISASSOCIATE_QUARANTINED = constructControlMessagePdu(WireFormats.CommandType.DISASSOCIATE_QUARANTINED, None)
override def constructDisassociate(info: AssociationHandle.DisassociateInfo): ByteString = info match {
case AssociationHandle.Unknown DISASSOCIATE
case AssociationHandle.Shutdown DISASSOCIATE_SHUTTING_DOWN
case AssociationHandle.Quarantined DISASSOCIATE_QUARANTINED
}
override val constructHeartbeat: ByteString =
constructControlMessagePdu(WireFormats.CommandType.HEARTBEAT, None)
@ -201,7 +204,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
private def decodeControlPdu(controlPdu: AkkaControlMessage): AkkaPdu = {
controlPdu.getCommandType match {
case CommandType.CONNECT if controlPdu.hasHandshakeInfo
case CommandType.ASSOCIATE if controlPdu.hasHandshakeInfo
val handshakeInfo = controlPdu.getHandshakeInfo
val cookie = if (handshakeInfo.hasCookie) Some(handshakeInfo.getCookie) else None
Associate(
@ -209,8 +212,10 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
decodeAddress(handshakeInfo.getOrigin),
handshakeInfo.getUid.toInt, // 64 bits are allocated in the wire formats, but we use only 32 for now
cookie))
case CommandType.SHUTDOWN Disassociate
case CommandType.HEARTBEAT Heartbeat
case CommandType.DISASSOCIATE Disassociate(AssociationHandle.Unknown)
case CommandType.DISASSOCIATE_SHUTTING_DOWN Disassociate(AssociationHandle.Shutdown)
case CommandType.DISASSOCIATE_QUARANTINED Disassociate(AssociationHandle.Quarantined)
case CommandType.HEARTBEAT Heartbeat
case x
throw new PduCodecException(s"Decoding of control PDU failed, invalid format, unexpected: [${x}]", null)
}

View file

@ -155,7 +155,9 @@ private[remote] class AkkaProtocolHandle(
override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))
override def disassociate(): Unit = stateActor ! DisassociateUnderlying
override def disassociate(): Unit = stateActor ! DisassociateUnderlying(Unknown)
def disassociate(info: DisassociateInfo): Unit = stateActor ! DisassociateUnderlying(info)
}
@ -277,7 +279,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
stay()
}
case Event(DisassociateUnderlying, _)
case Event(DisassociateUnderlying(_), _)
stop()
case _ stay()
@ -286,8 +288,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
// Timeout of this state is implicitly handled by the failure detector
when(WaitHandshake) {
case Event(Disassociated, _)
stop()
case Event(Disassociated(info), _)
stop(FSM.Failure(info))
case Event(InboundPayload(p), OutboundUnderlyingAssociated(statusPromise, wrappedHandle))
decodePdu(p) match {
@ -298,13 +300,13 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
wrappedHandle,
immutable.Queue.empty)
case Disassociate
case Disassociate(info)
// After receiving Disassociate we MUST NOT send back a Disassociate (loop)
stop()
stop(FSM.Failure(info))
case _
// Expected handshake to be finished, dropping connection
sendDisassociate(wrappedHandle)
sendDisassociate(wrappedHandle, Unknown)
stop()
}
@ -315,7 +317,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Event(InboundPayload(p), InboundUnassociated(associationHandler, wrappedHandle))
decodePdu(p) match {
// After receiving Disassociate we MUST NOT send back a Disassociate (loop)
case Disassociate stop()
case Disassociate(info) stop(FSM.Failure(info))
// Incoming association -- implicitly ACK by a heartbeat
case Associate(info)
@ -338,7 +340,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
// Got a stray message -- explicitly reset the association (force remote endpoint to reassociate)
case _
sendDisassociate(wrappedHandle)
sendDisassociate(wrappedHandle, Unknown)
stop()
}
@ -346,13 +348,13 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
}
when(Open) {
case Event(Disassociated, _)
stop()
case Event(Disassociated(info), _)
stop(FSM.Failure(info))
case Event(InboundPayload(p), _)
decodePdu(p) match {
case Disassociate
stop()
case Disassociate(info)
stop(FSM.Failure(info))
case Heartbeat
failureDetector.heartbeat(); stay()
@ -374,14 +376,14 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Event(HeartbeatTimer, AssociatedWaitHandler(_, wrappedHandle, _)) handleTimers(wrappedHandle)
case Event(HeartbeatTimer, ListenerReady(_, wrappedHandle)) handleTimers(wrappedHandle)
case Event(DisassociateUnderlying, _)
case Event(DisassociateUnderlying(info: DisassociateInfo), _)
val handle = stateData match {
case ListenerReady(_, wrappedHandle) wrappedHandle
case AssociatedWaitHandler(_, wrappedHandle, _) wrappedHandle
case msg
throw new AkkaProtocolException(s"unhandled message in state Open(DisassociateUnderlying) with type [${safeClassName(msg)}]")
}
sendDisassociate(handle)
sendDisassociate(handle, info)
stop()
case Event(HandleListenerRegistered(listener), AssociatedWaitHandler(_, wrappedHandle, queue))
@ -399,7 +401,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
stay()
} else {
// send disassociate just to be sure
sendDisassociate(wrappedHandle)
sendDisassociate(wrappedHandle, Unknown)
stop(FSM.Failure(TimeoutReason))
}
}
@ -415,23 +417,35 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
}
onTermination {
case StopEvent(_, _, OutboundUnassociated(remoteAddress, statusPromise, transport))
statusPromise.tryFailure(new AkkaProtocolException("Transport disassociated before handshake finished"))
case StopEvent(reason, _, OutboundUnassociated(remoteAddress, statusPromise, transport))
statusPromise.tryFailure(reason match {
case FSM.Failure(info: DisassociateInfo) disassociateException(info)
case _ new AkkaProtocolException("Transport disassociated before handshake finished")
})
case StopEvent(reason, _, OutboundUnderlyingAssociated(statusPromise, wrappedHandle))
statusPromise.tryFailure(new AkkaProtocolException(reason match {
case FSM.Failure(TimeoutReason) "No response from remote. Handshake timed out"
case _ "Remote endpoint disassociated before handshake finished"
}))
statusPromise.tryFailure(reason match {
case FSM.Failure(TimeoutReason) new AkkaProtocolException("No response from remote. Handshake timed out")
case FSM.Failure(info: DisassociateInfo) disassociateException(info)
case _ new AkkaProtocolException("Transport disassociated before handshake finished")
})
wrappedHandle.disassociate()
case StopEvent(_, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue))
case StopEvent(reason, _, AssociatedWaitHandler(handlerFuture, wrappedHandle, queue))
// Invalidate exposed but still unfinished promise. The underlying association disappeared, so after
// registration immediately signal a disassociate
handlerFuture foreach { _ notify Disassociated }
val disassociateNotification = reason match {
case FSM.Failure(info: DisassociateInfo) Disassociated(info)
case _ Disassociated(Unknown)
}
handlerFuture foreach { _ notify disassociateNotification }
case StopEvent(_, _, ListenerReady(handler, wrappedHandle))
handler notify Disassociated
case StopEvent(reason, _, ListenerReady(handler, wrappedHandle))
val disassociateNotification = reason match {
case FSM.Failure(info: DisassociateInfo) Disassociated(info)
case _ Disassociated(Unknown)
}
handler notify disassociateNotification
wrappedHandle.disassociate()
case StopEvent(_, _, InboundUnassociated(_, wrappedHandle))
@ -439,9 +453,20 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
}
private def disassociateException(info: DisassociateInfo): Exception = info match {
case Unknown
new AkkaProtocolException("The remote system explicitly disassociated (reason unknown).")
case Shutdown
InvalidAssociationException("The remote system refused the association because it is shutting down.")
case Quarantined
InvalidAssociationException("The remote system has quarantined this system. No further associations to the remote " +
"system are possible until this system is restarted.")
}
override protected def logTermination(reason: FSM.Reason): Unit = reason match {
case FSM.Failure(TimeoutReason) // no logging
case other super.logTermination(reason)
case FSM.Failure(TimeoutReason) // no logging
case FSM.Failure(_: DisassociateInfo) // no logging
case other super.logTermination(reason)
}
private def listenForListenerRegistration(readHandlerPromise: Promise[HandleEventListener]): Unit =
@ -494,9 +519,10 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case NonFatal(e) throw new AkkaProtocolException("Error writing HEARTBEAT to transport", e)
}
private def sendDisassociate(wrappedHandle: AssociationHandle): Unit = try wrappedHandle.write(codec.constructDisassociate) catch {
case NonFatal(e) throw new AkkaProtocolException("Error writing DISASSOCIATE to transport", e)
}
private def sendDisassociate(wrappedHandle: AssociationHandle, info: DisassociateInfo): Unit =
try wrappedHandle.write(codec.constructDisassociate(info)) catch {
case NonFatal(e) throw new AkkaProtocolException("Error writing DISASSOCIATE to transport", e)
}
private def sendAssociate(wrappedHandle: AssociationHandle, info: HandshakeInfo): Boolean = try {
wrappedHandle.write(codec.constructAssociate(info))

View file

@ -125,7 +125,9 @@ class TestTransport(
}
private def defaultDisassociate(handle: TestAssociationHandle): Future[Unit] = {
registry.deregisterAssociation(handle.key).foreach { registry.remoteListenerRelativeTo(handle, _) notify Disassociated }
registry.deregisterAssociation(handle.key).foreach {
registry.remoteListenerRelativeTo(handle, _) notify Disassociated(AssociationHandle.Unknown)
}
Future.successful(())
}

View file

@ -440,8 +440,8 @@ private[transport] class ThrottledAssociation(
inboundThrottleMode = mode
sender ! SetThrottleAck
stay()
case Event(Disassociated, _)
if (upstreamListener ne null) upstreamListener notify Disassociated
case Event(Disassociated(info), _)
if (upstreamListener ne null) upstreamListener notify Disassociated(info)
originalHandle.disassociate()
stop()
}

View file

@ -8,6 +8,7 @@ import akka.actor.{ ActorRef, Address }
import akka.util.ByteString
import akka.remote.transport.AssociationHandle.HandleEventListener
import akka.AkkaException
import scala.util.control.NoStackTrace
object Transport {
@ -18,7 +19,7 @@ object Transport {
* hostname, etc.).
*/
@SerialVersionUID(1L)
case class InvalidAssociationException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
case class InvalidAssociationException(msg: String, cause: Throwable = null) extends AkkaException(msg, cause) with NoStackTrace
/**
* Message sent to a [[akka.remote.transport.Transport.AssociationEventListener]] registered to a transport
@ -159,8 +160,20 @@ object AssociationHandle {
/**
* Message sent to the listener registered to an association
*
* @param info
* information about the reason of disassociation
*/
case object Disassociated extends HandleEvent
case class Disassociated(info: DisassociateInfo) extends HandleEvent
/**
* Supertype of possible disassociation reasons
*/
sealed trait DisassociateInfo
case object Unknown extends DisassociateInfo
case object Shutdown extends DisassociateInfo
case object Quarantined extends DisassociateInfo
/**
* An interface that needs to be implemented by the user of an [[akka.remote.transport.AssociationHandle]]

View file

@ -436,8 +436,8 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
def always(c: ChannelGroupFuture) = NettyFutureBridge(c).map(_ true) recover { case _ false }
for {
// Force flush by trying to write an empty buffer and wait for success
lastWriteStatus always(channelGroup.write(ChannelBuffers.buffer(0)))
unbindStatus always(channelGroup.unbind())
lastWriteStatus always(channelGroup.write(ChannelBuffers.buffer(0)))
disconnectStatus always(channelGroup.disconnect())
closeStatus always(channelGroup.close())
} yield {

View file

@ -38,7 +38,7 @@ private[remote] trait TcpHandlers extends CommonHandlers {
new TcpAssociationHandle(localAddress, remoteAddress, transport, channel)
override def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit =
notifyListener(e.getChannel, Disassociated)
notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown))
override def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
val bytes: Array[Byte] = e.getMessage.asInstanceOf[ChannelBuffer].array()
@ -46,7 +46,7 @@ private[remote] trait TcpHandlers extends CommonHandlers {
}
override def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {
notifyListener(e.getChannel, Disassociated)
notifyListener(e.getChannel, Disassociated(AssociationHandle.Unknown))
e.getChannel.close() // No graceful close here
}
}

View file

@ -3,7 +3,7 @@ package akka.remote.transport
import akka.actor.{ ExtendedActorSystem, Address, Props }
import akka.remote.transport.AkkaPduCodec.{ Disassociate, Associate, Heartbeat }
import akka.remote.transport.AkkaProtocolSpec.TestFailureDetector
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload }
import akka.remote.transport.TestTransport._
import akka.remote.transport.Transport._
import akka.remote.{ SeqNo, WireFormats, RemoteActorRefProvider, FailureDetector }
@ -74,7 +74,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
def testHeartbeat = InboundPayload(codec.constructHeartbeat)
def testPayload = InboundPayload(testMsgPdu)
def testDisassociate = InboundPayload(codec.constructDisassociate)
def testDisassociate(info: DisassociateInfo) = InboundPayload(codec.constructDisassociate(info))
def testAssociate(uid: Int, cookie: Option[String]) =
InboundPayload(codec.constructAssociate(HandshakeInfo(remoteAkkaAddress, uid, cookie)))
@ -113,8 +113,8 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match {
case Disassociate true
case _ false
case Disassociate(_) true
case _ false
}
case _ false
}
@ -326,9 +326,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
reader ! testDisassociate
reader ! testDisassociate(AssociationHandle.Unknown)
expectMsg(Disassociated)
expectMsg(Disassociated(AssociationHandle.Unknown))
}
"handle transport level disassociations" in {
@ -361,9 +361,9 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
reader ! Disassociated
reader ! Disassociated(AssociationHandle.Unknown)
expectMsg(Disassociated)
expectMsg(Disassociated(AssociationHandle.Unknown))
}
"disassociate when failure detector signals failure" in {
@ -401,7 +401,7 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
failureDetector.isAvailable = false
expectMsg(Disassociated)
expectMsg(Disassociated(AssociationHandle.Unknown))
}
"handle correctly when the handler is registered only after the association is already closed" in {
@ -432,11 +432,11 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
case _ fail()
}
stateActor ! Disassociated
stateActor ! Disassociated(AssociationHandle.Unknown)
wrappedHandle.readHandlerPromise.success(ActorHandleEventListener(testActor))
expectMsg(Disassociated)
expectMsg(Disassociated(AssociationHandle.Unknown))
}

View file

@ -25,7 +25,7 @@ object AkkaProtocolStressTest {
acceptable-heartbeat-pause = 0.01 s
}
remote.retry-window = 1 s
remote.maximum-retries-in-window = 1000
remote.maximum-retries-in-window = 3
remote.netty.tcp {
applied-adapters = ["gremlin"]
@ -33,10 +33,12 @@ object AkkaProtocolStressTest {
}
}
""")
""")
class SequenceVerifier(remote: ActorRef, controller: ActorRef) extends Actor {
val limit = 10000
import context.dispatcher
val limit = 100000
var nextSeq = 0
var maxSeq = -1
var losses = 0
@ -46,7 +48,8 @@ object AkkaProtocolStressTest {
case "sendNext" if (nextSeq < limit) {
remote ! nextSeq
nextSeq += 1
self ! "sendNext"
if (nextSeq % 2000 == 0) context.system.scheduler.scheduleOnce(500.milliseconds, self, "sendNext")
else self ! "sendNext"
}
case seq: Int
if (seq > maxSeq) {
@ -82,11 +85,13 @@ class AkkaProtocolStressTest extends AkkaSpec(configA) with ImplicitSender with
"AkkaProtocolTransport" must {
"guarantee at-most-once delivery and message ordering despite packet loss" taggedAs TimingTest in {
system.eventStream.publish(TestEvent.Mute(DeadLettersFilter[Any]))
systemB.eventStream.publish(TestEvent.Mute(DeadLettersFilter[Any]))
Await.result(RARP(system).provider.transport.managementCommand(One(addressB, Drop(0.3, 0.3))), 3.seconds.dilated)
val tester = system.actorOf(Props(classOf[SequenceVerifier], here, self)) ! "start"
expectMsgPF(45.seconds) {
expectMsgPF(60.seconds) {
case (received: Int, lost: Int)
log.debug(s" ######## Received ${received - lost} messages from ${received} ########")
}

View file

@ -147,7 +147,7 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
handleA.disassociate()
expectMsgPF(timeout.duration) {
case Disassociated
case Disassociated(_)
}
awaitCond(!registry.existsAssociation(addressATest, addressBTest))

View file

@ -46,7 +46,7 @@ object SystemMessageDeliveryStressTest {
acceptable-heartbeat-pause = 0.01 s
}
remote.retry-window = 1 s
remote.maximum-retries-in-window = 1000
remote.maximum-retries-in-window = 2
remote.use-passive-connections = on
remote.netty.tcp {
@ -90,7 +90,9 @@ object SystemMessageDeliveryStressTest {
}
abstract class SystemMessageDeliveryStressTest(msg: String, cfg: String)
extends AkkaSpec(ConfigFactory.parseString(cfg).withFallback(configA)) with ImplicitSender with DefaultTimeout {
extends AkkaSpec(ConfigFactory.parseString(cfg).withFallback(SystemMessageDeliveryStressTest.baseConfig))
with ImplicitSender
with DefaultTimeout {
import SystemMessageDeliveryStressTest._
val systemB = ActorSystem("systemB", system.settings.config)

View file

@ -124,7 +124,7 @@ class TestTransportSpec extends AkkaSpec with DefaultTimeout with ImplicitSender
handleA.disassociate()
expectMsgPF(timeout.duration) {
case Disassociated
case Disassociated(_)
}
awaitCond(!registry.existsAssociation(addressA, addressB))