diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala index 806933cd44..7f75d73c75 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterRemoteWatcher.scala @@ -108,7 +108,8 @@ private[cluster] class ClusterRemoteWatcher( clusterNodes -= m.address if (previousStatus == MemberStatus.Down) { - quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]") + quarantine(m.address, Some(m.uniqueAddress.longUid), + s"Cluster member removed, previous status [$previousStatus]", harmless = false) } else if (arteryEnabled) { // Don't quarantine gracefully removed members (leaving) directly, // give Cluster Singleton some time to exchange TakeOver/HandOver messages. @@ -128,14 +129,15 @@ private[cluster] class ClusterRemoteWatcher( pendingDelayedQuarantine.find(_.address == newIncarnation.address).foreach { oldIncarnation ⇒ pendingDelayedQuarantine -= oldIncarnation quarantine(oldIncarnation.address, Some(oldIncarnation.longUid), - s"Cluster member removed, new incarnation joined") + s"Cluster member removed, new incarnation joined", harmless = true) } } def delayedQuarantine(m: Member, previousStatus: MemberStatus): Unit = { if (pendingDelayedQuarantine(m.uniqueAddress)) { pendingDelayedQuarantine -= m.uniqueAddress - quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]") + quarantine(m.address, Some(m.uniqueAddress.longUid), s"Cluster member removed, previous status [$previousStatus]", + harmless = true) } } diff --git a/akka-remote/src/main/mima-filters/2.5.12.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.12.backwards.excludes index 096ed407f2..4cf43d3853 100644 --- a/akka-remote/src/main/mima-filters/2.5.12.backwards.excludes +++ 
b/akka-remote/src/main/mima-filters/2.5.12.backwards.excludes @@ -1,3 +1,10 @@ +# #24972 Artery internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.compress.InboundCompression.confirmAdvertisement") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.OutboundHandshake.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.terminationHintReplier") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.Association.quarantine") + # Internal API changes ProblemFilters.exclude[MissingTypesProblem]("akka.remote.artery.ArteryTransport$InboundStreamMatValues$") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.ArteryTransport#InboundStreamMatValues.apply") diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 1973950687..c52a43acdd 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -967,16 +967,17 @@ akka { # Only used when transport is tcp or tls-tcp. connection-timeout = 5 seconds - # The timeout for outbound associations to perform the handshake. - # This timeout must be greater than the 'image-liveness-timeout'. + # The timeout for outbound associations to perform the initial handshake. + # This timeout must be greater than the 'image-liveness-timeout' when + # transport is aeron-udp. 
handshake-timeout = 20 seconds - # incomplete handshake attempt is retried with this interval + # incomplete initial handshake attempt is retried with this interval handshake-retry-interval = 1 second - # handshake requests are performed periodically with this interval, + # Handshake requests are performed periodically with this interval, # also after the handshake has been completed to be able to establish - # a new session with a restarted destination system + # a new session with a restarted destination system. inject-handshake-interval = 1 second # messages that are not accepted by Aeron are dropped after retrying for this period @@ -988,6 +989,27 @@ akka { # of a network partition that you need to survive. give-up-system-message-after = 6 hours + # Outbound streams are stopped when they haven't been used for this duration. + # They are started again when new messages are sent. + stop-idle-outbound-after = 5 minutes + + # Outbound streams are quarantined when they haven't been used for this duration + # to clean up resources used by the association, such as compression tables. + # This will clean up associations to crashed systems that didn't announce their + # termination. + # The value should be longer than the length of a network partition that you + # need to survive. + # The value must also be greater than stop-idle-outbound-after. + # Once every 1/10 of this duration an extra handshake message will be sent. + # Therefore it's also recommended to use a value that is greater than 10 times + # the stop-idle-outbound-after, since otherwise the idle streams will not be + # stopped. + quarantine-idle-outbound-after = 6 hours + + # Stop outbound stream of a quarantined association after this idle timeout, i.e. + # when not used any more. 
+ stop-quarantined-after-idle = 3 seconds + # After catastrophic communication failures that could result in the loss of system # messages or after the remote DeathWatch triggers the remote system gets # quarantined to prevent inconsistent behavior. @@ -999,10 +1021,6 @@ akka { # if it wakes up again. Therfore this shouldn't be set too low. remove-quarantined-association-after = 1 h - # Outbound streams are stopped when they haven't been used for this duration. - # They are started again when new messages are sent. - stop-idle-outbound-after = 5.minutes - # during ActorSystem termination the remoting will wait this long for # an acknowledgment by the destination system that flushing of outstanding # remote messages has been completed @@ -1026,10 +1044,6 @@ akka { # If more restarts occurs the ActorSystem will be terminated. outbound-max-restarts = 5 - # Stop outbound stream of a quarantined association after this idle timeout, i.e. - # when not used any more. - stop-quarantined-after-idle = 3 seconds - # Timeout after which aeron driver has not had keepalive messages # from a client before it considers the client dead. # Only used when transport is aeron-udp. 
diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 72d9bb3cf7..9412b8d906 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -9,10 +9,11 @@ import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch } import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.event.AddressTerminatedTopic import akka.remote.artery.ArteryMessage - import scala.collection.mutable import scala.concurrent.duration._ +import akka.remote.artery.ArteryTransport + /** * INTERNAL API */ @@ -163,7 +164,7 @@ private[akka] class RemoteWatcher( watchingNodes foreach { a ⇒ if (!unreachable(a) && !failureDetector.isAvailable(a)) { log.warning("Detected unreachable: [{}]", a) - quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector") + quarantine(a, addressUids.get(a), "Deemed unreachable by remote failure detector", harmless = false) publishAddressTerminated(a) unreachable += a } @@ -172,8 +173,12 @@ private[akka] class RemoteWatcher( def publishAddressTerminated(address: Address): Unit = AddressTerminatedTopic(context.system).publish(AddressTerminated(address)) - def quarantine(address: Address, uid: Option[Long], reason: String): Unit = - remoteProvider.quarantine(address, uid, reason) + def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = { + remoteProvider.transport match { + case t: ArteryTransport if harmless ⇒ t.quarantine(address, uid, reason, harmless) + case _ ⇒ remoteProvider.quarantine(address, uid, reason) + } + } def addWatch(watchee: InternalActorRef, watcher: InternalActorRef): Unit = { assert(watcher != self) diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index 63d4626a66..bf10a4cdf1 100644 --- 
a/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -95,7 +95,7 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot override def logLevel: Logging.LogLevel = Logging.WarningLevel override val toString: String = s"Association to [$address] having UID [$longUid] is irrecoverably failed. UID is now quarantined and all " + - "messages to this UID will be delivered to dead letters. Remote actorsystem must be restarted to recover " + + "messages to this UID will be delivered to dead letters. Remote ActorSystem must be restarted to recover " + "from this situation." // For binary compatibility @@ -110,6 +110,17 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot def copy(address: Address = address, uid: Int = uid) = new QuarantinedEvent(address, uid) } +/** + * The `uniqueAddress` was quarantined but it was due to normal shutdown or cluster leaving/exiting. + */ +@SerialVersionUID(1L) +final case class GracefulShutdownQuarantinedEvent(uniqueAddress: UniqueAddress, reason: String) extends RemotingLifecycleEvent { + override def logLevel: Logging.LogLevel = Logging.InfoLevel + override val toString: String = + s"Association to [${uniqueAddress.address}] having UID [${uniqueAddress.uid}] has been stopped. All " + + s"messages to this UID will be delivered to dead letters. 
Reason: $reason " +} + @SerialVersionUID(1L) final case class ThisActorSystemQuarantinedEvent(localAddress: Address, remoteAddress: Address) extends RemotingLifecycleEvent { override def logLevel: LogLevel = Logging.WarningLevel diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 142fc51ddc..9781e4bba5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -144,11 +144,18 @@ private[akka] final class ArterySettings private (config: Config) { val GiveUpSystemMessageAfter: FiniteDuration = config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒ interval > Duration.Zero, "give-up-system-message-after must be more than zero") + val StopIdleOutboundAfter: FiniteDuration = config.getMillisDuration("stop-idle-outbound-after") + .requiring(interval ⇒ interval > Duration.Zero, "stop-idle-outbound-after must be more than zero") + val QuarantineIdleOutboundAfter: FiniteDuration = config.getMillisDuration("quarantine-idle-outbound-after") + .requiring( + interval ⇒ interval > StopIdleOutboundAfter, + "quarantine-idle-outbound-after must be greater than stop-idle-outbound-after") + val StopQuarantinedAfterIdle: FiniteDuration = + config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒ + interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero") val RemoveQuarantinedAssociationAfter: FiniteDuration = config.getMillisDuration("remove-quarantined-association-after").requiring(interval ⇒ interval > Duration.Zero, "remove-quarantined-association-after must be more than zero") - val StopIdleOutboundAfter: FiniteDuration = config.getMillisDuration("stop-idle-outbound-after").requiring(interval ⇒ - interval > Duration.Zero, "stop-idle-outbound-after must be more than zero") val ShutdownFlushTimeout: FiniteDuration = 
config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒ interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") @@ -163,9 +170,6 @@ private[akka] final class ArterySettings private (config: Config) { config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒ interval > Duration.Zero, "outbound-restart-timeout must be more than zero") val OutboundMaxRestarts: Int = getInt("outbound-max-restarts") - val StopQuarantinedAfterIdle: FiniteDuration = - config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒ - interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero") val ClientLivenessTimeout: FiniteDuration = config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒ interval > Duration.Zero, "client-liveness-timeout must be more than zero") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 633346b76b..78af43b81b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -736,8 +736,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr } override def quarantine(remoteAddress: Address, uid: Option[Long], reason: String): Unit = { + quarantine(remoteAddress, uid, reason, harmless = false) + } + + def quarantine(remoteAddress: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = { try { - association(remoteAddress).quarantine(reason, uid) + association(remoteAddress).quarantine(reason, uid, harmless) } catch { case ShuttingDown ⇒ // silence it } @@ -772,15 +776,16 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout, - 
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) + settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval, Duration.Undefined)) .viaMat(createEncoder(bufferPool, streamId))(Keep.right) } def outboundControl(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { - + val livenessProbeInterval = (settings.Advanced.QuarantineIdleOutboundAfter / 10) + .max(settings.Advanced.HandshakeRetryInterval) Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout, - settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) + settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval, livenessProbeInterval)) .via(new SystemMessageDelivery(outboundContext, system.deadLetters, settings.Advanced.SystemMessageResendInterval, settings.Advanced.SysMsgBufferSize)) // note that System messages must not be dropped before the SystemMessageDelivery stage @@ -813,13 +818,20 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr // Checks for termination hint messages and sends an ACK for those (not processing them further) // Purpose of this stage is flushing, the sender can wait for the ACKs up to try flushing // pending messages. 
- def terminationHintReplier(): Flow[InboundEnvelope, InboundEnvelope, NotUsed] = { + def terminationHintReplier(inControlStream: Boolean): Flow[InboundEnvelope, InboundEnvelope, NotUsed] = { Flow[InboundEnvelope].filter { envelope ⇒ envelope.message match { - case _: ActorSystemTerminating ⇒ + case ActorSystemTerminating(from) ⇒ envelope.sender match { - case OptionVal.Some(snd) ⇒ snd.tell(ActorSystemTerminatingAck(localAddress), ActorRef.noSender) - case OptionVal.None ⇒ log.error("Expected sender for ActorSystemTerminating message") + case OptionVal.Some(snd) ⇒ + snd.tell(ActorSystemTerminatingAck(localAddress), ActorRef.noSender) + if (inControlStream) + system.scheduler.scheduleOnce(settings.Advanced.ShutdownFlushTimeout) { + if (!isShutdown) + quarantine(from.address, Some(from.uid), "ActorSystem terminated", harmless = true) + }(materializer.executionContext) + case OptionVal.None ⇒ + log.error("Expected sender for ActorSystemTerminating message from [{}]", from) } false case _ ⇒ true @@ -831,7 +843,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr Flow[InboundEnvelope] .via(createDeserializer(bufferPool)) .via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope]) - .via(terminationHintReplier()) + .via(terminationHintReplier(inControlStream = false)) .via(new InboundHandshake(this, inControlStream = false)) .via(new InboundQuarantineCheck(this)) .toMat(messageDispatcherSink)(Keep.right) @@ -850,7 +862,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr Flow[InboundEnvelope] .via(createDeserializer(envelopeBufferPool)) .via(if (settings.Advanced.TestMode) new InboundTestStage(this, testState) else Flow[InboundEnvelope]) - .via(terminationHintReplier()) + .via(terminationHintReplier(inControlStream = true)) .via(new InboundHandshake(this, inControlStream = true)) .via(new InboundQuarantineCheck(this)) .viaMat(new 
InboundControlJunction)(Keep.right) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index d9fd5a0786..b2cd7d7a6a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -4,6 +4,7 @@ package akka.remote.artery +import akka.util.PrettyDuration._ import java.util.Queue import java.util.concurrent.CountDownLatch import java.util.concurrent.LinkedBlockingQueue @@ -180,8 +181,8 @@ private[remote] class Association( // keyed by stream queue index private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues]) - private[this] val idleTask = new AtomicReference[Option[Cancellable]](None) - private[this] val quarantinedIdleTask = new AtomicReference[Option[Cancellable]](None) + private[this] val idleTimer = new AtomicReference[Option[Cancellable]](None) + private[this] val stopQuarantinedTimer = new AtomicReference[Option[Cancellable]](None) private[remote] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = updateOutboundCompression(c ⇒ c.changeActorRefCompression(table)) @@ -256,8 +257,6 @@ private[remote] class Association( def setControlIdleKillSwitch(killSwitch: OptionVal[SharedKillSwitch]): Unit = { val current = associationState swapState(current, current.withControlIdleKillSwitch(killSwitch)) - if (killSwitch.isDefined) - startIdleTimer() } def completeHandshake(peer: UniqueAddress): Future[Done] = { @@ -285,7 +284,7 @@ private[remote] class Association( if (swapState(current, newState)) { current.uniqueRemoteAddressValue() match { case Some(old) ⇒ - cancelQuarantinedIdleTimer() + cancelStopQuarantinedTimer() log.debug( "Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", newState.incarnation, peer.address, peer.uid, old.uid) @@ -308,7 +307,7 @@ private[remote] class Association( if 
(associationState.isQuarantined()) { log.debug("Send control message [{}] to quarantined [{}]", Logging.messageClassName(message), remoteAddress) - startQuarantinedIdleTimer() + setupStopQuarantinedTimer() } outboundControlIngress.sendControlMessage(message) } @@ -351,7 +350,7 @@ private[remote] class Association( if (message.isInstanceOf[ActorSelectionMessage] || !quarantined || messageIsClearSystemMessageDelivery) { if (quarantined && !messageIsClearSystemMessageDelivery) { log.debug("Quarantine piercing attempt with message [{}] to [{}]", Logging.messageClassName(message), recipient.getOrElse("")) - startQuarantinedIdleTimer() + setupStopQuarantinedTimer() } try { val outboundEnvelope = createOutboundEnvelope() @@ -454,10 +453,10 @@ private[remote] class Association( // OutboundContext override def quarantine(reason: String): Unit = { val uid = associationState.uniqueRemoteAddressValue().map(_.uid) - quarantine(reason, uid) + quarantine(reason, uid, harmless = false) } - @tailrec final def quarantine(reason: String, uid: Option[Long]): Unit = { + @tailrec final def quarantine(reason: String, uid: Option[Long], harmless: Boolean): Unit = { uid match { case Some(u) ⇒ val current = associationState @@ -467,22 +466,32 @@ private[remote] class Association( val newState = current.newQuarantined() if (swapState(current, newState)) { // quarantine state change was performed - log.warning( - "Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " + - "messages to this UID will be delivered to dead letters. " + - "Remote actorsystem must be restarted to recover from this situation. {}", - remoteAddress, u, reason) - transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u)) + if (harmless) { + log.info( + "Association to [{}] having UID [{}] has been stopped. All " + + "messages to this UID will be delivered to dead letters. 
Reason: {}", + remoteAddress, u, reason) + transport.system.eventStream.publish(GracefulShutdownQuarantinedEvent(UniqueAddress(remoteAddress, u), reason)) + } else { + log.warning( + "Association to [{}] with UID [{}] is irrecoverably failed. UID is now quarantined and all " + + "messages to this UID will be delivered to dead letters. " + + "Remote ActorSystem must be restarted to recover from this situation. Reason: {}", + remoteAddress, u, reason) + transport.system.eventStream.publish(QuarantinedEvent(remoteAddress, u)) + } flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u") clearOutboundCompression() clearInboundCompression(u) // end delivery of system messages to that incarnation after this point send(ClearSystemMessageDelivery(current.incarnation), OptionVal.None, OptionVal.None) - // try to tell the other system that we have quarantined it - sendControl(Quarantined(localAddress, peer)) - startQuarantinedIdleTimer() + if (!harmless) { + // try to tell the other system that we have quarantined it + sendControl(Quarantined(localAddress, peer)) + } + setupStopQuarantinedTimer() } else - quarantine(reason, uid) // recursive + quarantine(reason, uid, harmless) // recursive } case Some(peer) ⇒ log.info( @@ -519,8 +528,7 @@ private[remote] class Association( // cleanup _outboundControlIngress = OptionVal.None outboundCompressionAccess = Vector.empty - cancelIdleTimer() - cancelQuarantinedIdleTimer() + cancelAllTimers() abortQuarantined() log.info("Unused association to [{}] removed after quarantine", remoteAddress) @@ -530,21 +538,22 @@ private[remote] class Association( def isRemovedAfterQuarantined(): Boolean = queues(ControlQueueIndex) == RemovedQueueWrapper - private def cancelQuarantinedIdleTimer(): Unit = { - val current = quarantinedIdleTask.get + private def cancelStopQuarantinedTimer(): Unit = { + val current = stopQuarantinedTimer.get current.foreach(_.cancel()) - quarantinedIdleTask.compareAndSet(current, None) + 
stopQuarantinedTimer.compareAndSet(current, None) } - private def startQuarantinedIdleTimer(): Unit = { - cancelQuarantinedIdleTimer() - quarantinedIdleTask.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) { + private def setupStopQuarantinedTimer(): Unit = { + cancelStopQuarantinedTimer() + stopQuarantinedTimer.set(Some(transport.system.scheduler.scheduleOnce(advancedSettings.StopQuarantinedAfterIdle) { if (associationState.isQuarantined()) abortQuarantined() }(transport.system.dispatcher))) } private def abortQuarantined(): Unit = { + cancelIdleTimer() streamMatValues.get.foreach { case (queueIndex, OutboundStreamMatValues(killSwitch, _, _)) ⇒ killSwitch match { @@ -558,61 +567,66 @@ private[remote] class Association( } private def cancelIdleTimer(): Unit = { - val current = idleTask.get + val current = idleTimer.get current.foreach(_.cancel()) - idleTask.compareAndSet(current, None) + idleTimer.compareAndSet(current, None) } - private def startIdleTimer(): Unit = { - cancelIdleTimer() - val StopIdleOutboundAfter = settings.Advanced.StopIdleOutboundAfter - val interval = StopIdleOutboundAfter / 2 - val stopIdleOutboundAfterNanos = StopIdleOutboundAfter.toNanos - val initialDelay = settings.Advanced.ConnectionTimeout.max(StopIdleOutboundAfter) + 1.second - val task: Cancellable = transport.system.scheduler.schedule(initialDelay, interval) { - if (System.nanoTime() - associationState.lastUsedTimestamp.get >= stopIdleOutboundAfterNanos) { - streamMatValues.get.foreach { - case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping)) ⇒ - if (isStreamActive(queueIndex) && stopping.isEmpty) { - if (queueIndex != ControlQueueIndex) { - streamKillSwitch match { - case OptionVal.Some(k) ⇒ - // for non-control streams we can stop the entire stream - log.info("Stopping idle outbound stream [{}] to [{}]", queueIndex, remoteAddress) - flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") - 
setStopReason(queueIndex, OutboundStreamStopIdleSignal) - clearStreamKillSwitch(queueIndex, k) - k.abort(OutboundStreamStopIdleSignal) - case OptionVal.None ⇒ // already aborted - } + private def setupIdleTimer(): Unit = { + if (idleTimer.get.isEmpty) { + val StopIdleOutboundAfter = settings.Advanced.StopIdleOutboundAfter + val QuarantineIdleOutboundAfter = settings.Advanced.QuarantineIdleOutboundAfter + val interval = StopIdleOutboundAfter / 2 + val initialDelay = settings.Advanced.ConnectionTimeout.max(StopIdleOutboundAfter) + 1.second + val task = transport.system.scheduler.schedule(initialDelay, interval) { + val lastUsedDurationNanos = System.nanoTime() - associationState.lastUsedTimestamp.get + if (lastUsedDurationNanos >= QuarantineIdleOutboundAfter.toNanos && !associationState.isQuarantined()) { + // If idle longer than quarantine-idle-outbound-after and the low frequency HandshakeReq + // doesn't get through it will be quarantined to cleanup lingering associations to crashed systems. 
+ quarantine(s"Idle longer than quarantine-idle-outbound-after [${QuarantineIdleOutboundAfter.pretty}]") + } else if (lastUsedDurationNanos >= StopIdleOutboundAfter.toNanos) { + streamMatValues.get.foreach { + case (queueIndex, OutboundStreamMatValues(streamKillSwitch, _, stopping)) ⇒ + if (isStreamActive(queueIndex) && stopping.isEmpty) { + if (queueIndex != ControlQueueIndex) { + streamKillSwitch match { + case OptionVal.Some(k) ⇒ + // for non-control streams we can stop the entire stream + log.info("Stopping idle outbound stream [{}] to [{}]", queueIndex, remoteAddress) + flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") + setStopReason(queueIndex, OutboundStreamStopIdleSignal) + clearStreamKillSwitch(queueIndex, k) + k.abort(OutboundStreamStopIdleSignal) + case OptionVal.None ⇒ // already aborted + } - } else { - // only stop the transport parts of the stream because SystemMessageDelivery stage has - // state (seqno) and system messages might be sent at the same time - associationState.controlIdleKillSwitch match { - case OptionVal.Some(killSwitch) ⇒ - log.info("Stopping idle outbound control stream to [{}]", remoteAddress) - flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") - setControlIdleKillSwitch(OptionVal.None) - killSwitch.abort(OutboundStreamStopIdleSignal) - case OptionVal.None ⇒ - log.debug( - "Couldn't stop idle outbound control stream to [{}] due to missing KillSwitch.", - remoteAddress) + } else { + // only stop the transport parts of the stream because SystemMessageDelivery stage has + // state (seqno) and system messages might be sent at the same time + associationState.controlIdleKillSwitch match { + case OptionVal.Some(killSwitch) ⇒ + log.info("Stopping idle outbound control stream to [{}]", remoteAddress) + flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") + setControlIdleKillSwitch(OptionVal.None) + killSwitch.abort(OutboundStreamStopIdleSignal) 
+ case OptionVal.None ⇒ // already stopped + } } } - } + } } + }(transport.system.dispatcher) - cancelIdleTimer() + if (!idleTimer.compareAndSet(None, Some(task))) { + // another thread did same thing and won + task.cancel() } - }(transport.system.dispatcher) - - if (!idleTask.compareAndSet(None, Some(task))) { - // another thread did same thing and won - task.cancel() } + } + private def cancelAllTimers(): Unit = { + cancelIdleTimer() + cancelStopQuarantinedTimer() } private def sendToDeadLetters[T](pending: Vector[OutboundEnvelope]): Unit = { @@ -631,7 +645,6 @@ private[remote] class Association( if (!controlQueue.isInstanceOf[QueueWrapper]) throw new IllegalStateException("associate() must only be called once") runOutboundStreams() - startIdleTimer() } private def runOutboundStreams(): Unit = { @@ -676,6 +689,7 @@ private[remote] class Association( materializing.countDown() updateStreamMatValues(ControlQueueIndex, streamKillSwitch, completed) + setupIdleTimer() attachOutboundStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize, completed, () ⇒ runOutboundControlStream()) } @@ -810,13 +824,12 @@ private[remote] class Association( _outboundControlIngress = OptionVal.None } // LazyQueueWrapper will invoke the `restart` function when first message is offered - val restartAndStartIdleTimer: () ⇒ Unit = () ⇒ { + val wrappedRestartFun: () ⇒ Unit = () ⇒ { restart() - startIdleTimer() } if (!isRemovedAfterQuarantined()) - queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), restartAndStartIdleTimer) + queues(queueIndex) = LazyQueueWrapper(createQueue(queueCapacity, queueIndex), wrappedRestartFun) queuesVisibility = true // volatile write for visibility of the queues array } @@ -830,7 +843,7 @@ private[remote] class Association( streamCompleted.failed.foreach { case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected - cancelIdleTimer() + cancelAllTimers() // countDown the latch in case threads are waiting on the latch 
in outboundControlIngress method materializing.countDown() case cause if transport.isShutdown || isRemovedAfterQuarantined() ⇒ @@ -838,15 +851,15 @@ private[remote] class Association( // for the TCP transport the ShutdownSignal is "converted" to StreamTcpException if (!cause.isInstanceOf[StreamTcpException]) log.error(cause, s"{} to [{}] failed after shutdown. {}", streamName, remoteAddress, cause.getMessage) - cancelIdleTimer() + cancelAllTimers() // countDown the latch in case threads are waiting on the latch in outboundControlIngress method materializing.countDown() case _: AeronTerminated ⇒ // shutdown already in progress - cancelIdleTimer() + cancelAllTimers() case _: AbruptTerminationException ⇒ // ActorSystem shutdown - cancelIdleTimer() + cancelAllTimers() case cause ⇒ // it might have been stopped as expected due to idle or quarantine @@ -884,7 +897,7 @@ private[remote] class Association( } else { log.error(cause, s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. 
${cause.getMessage}", streamName, remoteAddress, advancedSettings.OutboundMaxRestarts, advancedSettings.OutboundRestartTimeout.toSeconds) - cancelIdleTimer() + cancelAllTimers() transport.system.terminate() } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 71aa593a2a..3e48244931 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -42,6 +42,7 @@ private[remote] object OutboundHandshake { private case object HandshakeTimeout private case object HandshakeRetryTick private case object InjectHandshakeTick + private case object LivenessProbeTick } @@ -49,11 +50,13 @@ private[remote] object OutboundHandshake { * INTERNAL API */ private[remote] class OutboundHandshake( - system: ActorSystem, - outboundContext: OutboundContext, - outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], - timeout: FiniteDuration, - retryInterval: FiniteDuration, injectHandshakeInterval: FiniteDuration) + system: ActorSystem, + outboundContext: OutboundContext, + outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], + timeout: FiniteDuration, + retryInterval: FiniteDuration, + injectHandshakeInterval: FiniteDuration, + livenessProbeInterval: Duration) extends GraphStage[FlowShape[OutboundEnvelope, OutboundEnvelope]] { val in: Inlet[OutboundEnvelope] = Inlet("OutboundHandshake.in") @@ -61,29 +64,42 @@ private[remote] class OutboundHandshake( override val shape: FlowShape[OutboundEnvelope, OutboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { import OutboundHandshake._ private var handshakeState: HandshakeState = Start - private var pendingMessage: OutboundEnvelope = null + private var 
pendingMessage: OptionVal[OutboundEnvelope] = OptionVal.None private var injectHandshakeTickScheduled = false + override protected def logSource: Class[_] = classOf[OutboundHandshake] + override def preStart(): Unit = { scheduleOnce(HandshakeTimeout, timeout) + livenessProbeInterval match { + case d: FiniteDuration ⇒ schedulePeriodically(LivenessProbeTick, d) + case _ ⇒ // only used in control stream + } } // InHandler override def onPush(): Unit = { if (handshakeState != Completed) - throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState]") + throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState].") // inject a HandshakeReq once in a while to trigger a new handshake when destination // system has been restarted if (injectHandshakeTickScheduled) { - push(out, grab(in)) + // out is always available here, except for if a liveness HandshakeReq was just pushed + if (isAvailable(out)) + push(out, grab(in)) + else { + if (pendingMessage.isDefined) + throw new IllegalStateException(s"pendingMessage expected to be empty") + pendingMessage = OptionVal.Some(grab(in)) + } } else { pushHandshakeReq() - pendingMessage = grab(in) + pendingMessage = OptionVal.Some(grab(in)) } } @@ -91,11 +107,13 @@ private[remote] class OutboundHandshake( override def onPull(): Unit = { handshakeState match { case Completed ⇒ - if (pendingMessage eq null) - pull(in) - else { - push(out, pendingMessage) - pendingMessage = null + pendingMessage match { + case OptionVal.None ⇒ + if (!hasBeenPulled(in)) + pull(in) + case OptionVal.Some(p) ⇒ + push(out, p) + pendingMessage = OptionVal.None } case Start ⇒ @@ -131,10 +149,29 @@ private[remote] class OutboundHandshake( private def pushHandshakeReq(): Unit = { injectHandshakeTickScheduled = true scheduleOnce(InjectHandshakeTick, injectHandshakeInterval) - val env: OutboundEnvelope = outboundEnvelopePool.acquire().init( - recipient = OptionVal.None, message = 
HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress), sender = OptionVal.None) outboundContext.associationState.lastUsedTimestamp.set(System.nanoTime()) - push(out, env) + if (isAvailable(out)) + push(out, createHandshakeReqEnvelope()) + } + + private def pushLivenessProbeReq(): Unit = { + // The associationState.lastUsedTimestamp will be updated when the HandshakeRsp is received + // and that is the confirmation that the other system is alive, and will not be quarantined + // by the quarantine-idle-outbound-after even though no real messages have been sent. + if (handshakeState == Completed && isAvailable(out) && pendingMessage.isEmpty) { + val lastUsedDuration = (System.nanoTime() - outboundContext.associationState.lastUsedTimestamp.get()).nanos + if (lastUsedDuration >= livenessProbeInterval) { + log.info( + "Association to [{}] has been idle for [{}] seconds, sending HandshakeReq to validate liveness", + outboundContext.remoteAddress, lastUsedDuration.toSeconds) + push(out, createHandshakeReqEnvelope()) + } + } + } + + private def createHandshakeReqEnvelope(): OutboundEnvelope = { + outboundEnvelopePool.acquire().init( + recipient = OptionVal.None, message = HandshakeReq(outboundContext.localAddress, outboundContext.remoteAddress), sender = OptionVal.None) } private def handshakeCompleted(): Unit = { @@ -148,6 +185,8 @@ private[remote] class OutboundHandshake( case InjectHandshakeTick ⇒ // next onPush message will trigger sending of HandshakeReq injectHandshakeTickScheduled = false + case LivenessProbeTick ⇒ + pushLivenessProbeReq() case HandshakeRetryTick ⇒ if (isAvailable(out)) pushHandshakeReq() @@ -181,6 +220,11 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl env.message match { case HandshakeReq(from, to) ⇒ onHandshakeReq(from, to) case HandshakeRsp(from) ⇒ + // Touch the lastUsedTimestamp here also because when sending the extra low frequency HandshakeReq + // the timestamp is not supposed to be
updated when sending but when receiving reply, which confirms + // that the other system is alive. + inboundContext.association(from.address).associationState.lastUsedTimestamp.set(System.nanoTime()) + after(inboundContext.completeHandshake(from)) { pull(in) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index 5aa2a3fc0b..98f9015fd5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -93,7 +93,7 @@ private[remote] final class InboundCompressionsImpl( override def confirmActorRefCompressionAdvertisement(originUid: Long, tableVersion: Byte): Unit = { _actorRefsIns.get(originUid) match { case null ⇒ // ignore - case a ⇒ a.confirmAdvertisement(tableVersion) + case a ⇒ a.confirmAdvertisement(tableVersion, gaveUp = false) } } /** Send compression table advertisement over control stream. Should be called from Decoder. */ @@ -124,7 +124,7 @@ private[remote] final class InboundCompressionsImpl( override def confirmClassManifestCompressionAdvertisement(originUid: Long, tableVersion: Byte): Unit = { _classManifestsIns.get(originUid) match { case null ⇒ // ignore - case a ⇒ a.confirmAdvertisement(tableVersion) + case a ⇒ a.confirmAdvertisement(tableVersion, gaveUp = false) } } /** Send compression table advertisement over control stream. Should be called from Decoder. 
*/ @@ -296,6 +296,7 @@ private[remote] abstract class InboundCompression[T >: Null]( // We should not continue sending advertisements to an association that might be dead (not quarantined yet) @volatile private[this] var alive = true private[this] var resendCount = 0 + private[this] val maxResendCount = 3 private[this] val cms = new CountMinSketch(16, 1024, System.currentTimeMillis().toInt) @@ -338,7 +339,7 @@ private[remote] abstract class InboundCompression[T >: Null]( "Received first value from originUid [{}] compressed using the advertised compression table, " + "flipping to it (version: {})", originUid, current.nextTable.version) - confirmAdvertisement(incomingTableVersion) + confirmAdvertisement(incomingTableVersion, gaveUp = false) decompressInternal(incomingTableVersion, idx, attemptCounter + 1) // recurse case _ ⇒ @@ -354,15 +355,17 @@ private[remote] abstract class InboundCompression[T >: Null]( } } - final def confirmAdvertisement(tableVersion: Byte): Unit = { + final def confirmAdvertisement(tableVersion: Byte, gaveUp: Boolean): Unit = { tables.advertisementInProgress match { case Some(inProgress) if tableVersion == inProgress.version ⇒ tables = tables.startUsingNextTable() - log.debug("Confirmed compression table version [{}] for originUid [{}]", tableVersion, originUid) + log.debug( + "{} compression table version [{}] for originUid [{}]", + if (gaveUp) "Gave up" else "Confirmed", tableVersion, originUid) case Some(inProgress) if tableVersion != inProgress.version ⇒ log.debug( - "Confirmed compression table version [{}] for originUid [{}] but other version in progress [{}]", - tableVersion, originUid, inProgress.version) + "{} compression table version [{}] for originUid [{}] but other version in progress [{}]", + if (gaveUp) "Gave up" else "Confirmed", tableVersion, originUid, inProgress.version) case None ⇒ // already confirmed } @@ -410,7 +413,7 @@ private[remote] abstract class InboundCompression[T >: Null]( alive = false // will be set to true on 
first incoming message resendCount = 0 advertiseCompressionTable(association, table) - } else { + } else if (association.isOrdinaryMessageStreamActive()) { log.debug("{} for originUid [{}] not changed, no need to advertise same.", Logging.simpleName(tables.activeTable), originUid) } @@ -422,23 +425,23 @@ private[remote] abstract class InboundCompression[T >: Null]( case Some(inProgress) ⇒ resendCount += 1 - if (resendCount <= 5) { + if (resendCount <= maxResendCount) { // The ActorRefCompressionAdvertisement message is resent because it can be lost inboundContext.association(originUid) match { case OptionVal.Some(association) ⇒ log.debug( - "Advertisement in progress for originUid [{}] version {}, resending", - originUid, inProgress.version) + "Advertisement in progress for originUid [{}] version [{}], resending [{}:{}]", + originUid, inProgress.version, resendCount, maxResendCount) advertiseCompressionTable(association, inProgress) // resend case OptionVal.None ⇒ } } else { // give up, it might be dead log.debug( - "Advertisement in progress for originUid [{}] version {} but no confirmation after retries.", + "Advertisement in progress for originUid [{}] version [{}] but no confirmation after retries.", originUid, inProgress.version) - confirmAdvertisement(inProgress.version) + confirmAdvertisement(inProgress.version, gaveUp = true) } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 67cbd4d6cd..09b3151f39 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -128,7 +128,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = { val flowFactory = () ⇒ { - val flow = + def flow(controlIdleKillSwitch: OptionVal[SharedKillSwitch]) 
= Flow[ByteString] .via(Flow.lazyInitAsync(() ⇒ { // only open the actual connection if any new messages are sent @@ -136,6 +136,8 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider TcpOutbound_Connected, s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + s"/ ${streamName(streamId)}") + if (controlIdleKillSwitch.isDefined) + outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(controlIdleKillSwitch) Future.successful( Flow[ByteString] .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) @@ -150,29 +152,19 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider val controlIdleKillSwitch = KillSwitches.shared("outboundControlStreamIdleKillSwitch") Flow[ByteString] .via(controlIdleKillSwitch.flow) - .via(flow) - .mapMaterializedValue { _ ⇒ - outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(OptionVal.Some(controlIdleKillSwitch)) - NotUsed - } + .via(flow(OptionVal.Some(controlIdleKillSwitch))) } else { - flow + flow(OptionVal.None) } } - if (streamId == ControlStreamId) { - // restart of inner connection part important in control flow, since system messages - // are buffered and resent from the outer SystemMessageDelivery stage. - RestartFlow.withBackoff[ByteString, ByteString]( - settings.Advanced.OutboundRestartBackoff, - settings.Advanced.GiveUpSystemMessageAfter, 0.1)(flowFactory) - } else { - // Best effort retry a few times - RestartFlow.withBackoff[ByteString, ByteString]( - settings.Advanced.OutboundRestartBackoff, - settings.Advanced.OutboundRestartBackoff * 5, 0.1, maxRestarts = 3)(flowFactory) - } - + val maxRestarts = if (streamId == ControlStreamId) Int.MaxValue else 3 + // Restart of inner connection part important in control stream, since system messages + // are buffered and resent from the outer SystemMessageDelivery stage. No maxRestarts limit for control + // stream. 
For message stream it's best effort retry a few times. + RestartFlow.withBackoff[ByteString, ByteString]( + settings.Advanced.OutboundRestartBackoff, + settings.Advanced.OutboundRestartBackoff * 5, 0.1, maxRestarts)(flowFactory) } Flow[EnvelopeBuffer] @@ -416,7 +408,7 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider } override protected def shutdownTransport(): Future[Done] = { - implicit val ec: ExecutionContext = materializer.executionContext + import system.dispatcher inboundKillSwitch.shutdown() unbind().map { _ ⇒ topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData) @@ -425,9 +417,9 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider } private def unbind(): Future[Done] = { - implicit val ec: ExecutionContext = materializer.executionContext serverBinding match { case Some(binding) ⇒ + import system.dispatcher for { b ← binding _ ← b.unbind() diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 96d5958aec..6fd836c9ff 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -54,7 +54,7 @@ object RemoteWatcherSpec { // that doesn't interfere with the real watch that is going on in the background context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) - override def quarantine(address: Address, uid: Option[Long], reason: String): Unit = { + override def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index 1be83699f3..4af81c098b 100644 --- 
a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -33,11 +33,13 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { private def setupStream( outboundContext: OutboundContext, timeout: FiniteDuration = 5.seconds, retryInterval: FiniteDuration = 10.seconds, - injectHandshakeInterval: FiniteDuration = 10.seconds): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = { + injectHandshakeInterval: FiniteDuration = 10.seconds, + livenessProbeInterval: Duration = Duration.Undefined): (TestPublisher.Probe[String], TestSubscriber.Probe[Any]) = { TestSource.probe[String] .map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, msg, OptionVal.None)) - .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, timeout, retryInterval, injectHandshakeInterval)) + .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, timeout, retryInterval, + injectHandshakeInterval, livenessProbeInterval)) .map(env ⇒ env.message) .toMat(TestSink.probe[Any])(Keep.both) .run() @@ -130,6 +132,21 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.cancel() } + "send HandshakeReq for liveness probing" in { + val inboundContext = new TestInboundContext(localAddress = addressA) + val outboundContext = inboundContext.association(addressB.address) + val (upstream, downstream) = setupStream(outboundContext, livenessProbeInterval = 200.millis) + + downstream.request(10) + // this is from the initial + downstream.expectNext(HandshakeReq(addressA, addressB.address)) + inboundContext.completeHandshake(addressB) + // these are from livenessProbeInterval + downstream.expectNext(HandshakeReq(addressA, addressB.address)) + downstream.expectNext(HandshakeReq(addressA, addressB.address)) + downstream.cancel() + } + } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala 
b/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala index 436dd67fd7..8ec8241a46 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundIdleShutdownSpec.scala @@ -105,9 +105,6 @@ class OutboundIdleShutdownSpec extends ArteryMultiNodeSpec(s""" assertStreamActive(association, Association.OrdinaryQueueIndex, expected = false) } - Thread.sleep(2000) - // localArtery.quarantine(remoteAddress, Some(remoteUid), "Test") - // the outbound streams are inactive and association quarantined, then it's completely removed eventually { localArtery.remoteAddresses should not contain remoteAddress diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala index 9d5e2b7e8b..ca4b9e3190 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteWatcherSpec.scala @@ -55,7 +55,7 @@ object RemoteWatcherSpec { // that doesn't interfere with the real watch that is going on in the background context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) - override def quarantine(address: Address, uid: Option[Long], reason: String): Unit = { + override def quarantine(address: Address, uid: Option[Long], reason: String, harmless: Boolean): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) }