From cd4a31e74db45f14b2837ac2defd0d70346b7412 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 8 Sep 2016 15:01:32 +0200 Subject: [PATCH 1/2] No ack delivery for prio messages, #21371 * and send prio messages enclosed in actor selection over the control stream --- .../akka/remote/artery/Association.scala | 8 +++- .../remote/artery/SystemMessageDelivery.scala | 39 ++++++++++-------- .../artery/SystemMessageDeliverySpec.scala | 40 ++++++++++--------- 3 files changed, 51 insertions(+), 36 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 57c3531175..cd42e7ec73 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -287,12 +287,18 @@ private[remote] class Association( // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message == ClearSystemMessageDelivery) { message match { - case _: SystemMessage | ClearSystemMessageDelivery | _: ControlMessage ⇒ + case _: SystemMessage ⇒ val outboundEnvelope = createOutboundEnvelope() if (!controlQueue.offer(createOutboundEnvelope())) { quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope) } + case ActorSelectionMessage(_: PriorityMessage, _, _) | _: ControlMessage | ClearSystemMessageDelivery ⇒ + // ActorSelectionMessage with PriorityMessage is used by cluster and remote failure detector heartbeating + val outboundEnvelope = createOutboundEnvelope() + if (!controlQueue.offer(createOutboundEnvelope())) { + dropped(ControlQueueIndex, controlQueueSize, outboundEnvelope) + } case _: DaemonMsgCreate ⇒ // DaemonMsgCreate is not a SystemMessage, but must be sent over the control stream because // remote deployment process depends on message ordering for DaemonMsgCreate and Watch messages. diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 3eaead00b7..957c432036 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -23,6 +23,9 @@ import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic import akka.remote.artery.OutboundHandshake.HandshakeReq import akka.actor.ActorRef +import akka.remote.PriorityMessage +import akka.actor.ActorSelectionMessage +import akka.dispatch.sysmsg.SystemMessage /** * INTERNAL API @@ -36,6 +39,10 @@ private[akka] object SystemMessageDelivery { final case object ClearSystemMessageDelivery private case object ResendTick + + // If other message types than SystemMesage need acked delivery they can extend this trait. + // Used in tests since real SystemMessage are somewhat cumbersome to create. + trait AckedDeliveryMessage } /** @@ -166,22 +173,7 @@ private[akka] class SystemMessageDelivery( override def onPush(): Unit = { val outboundEnvelope = grab(in) outboundEnvelope.message match { - case _: HandshakeReq ⇒ - // pass on HandshakeReq - if (isAvailable(out)) - pushCopy(outboundEnvelope) - case ClearSystemMessageDelivery ⇒ - clear() - pull(in) - case _: ControlMessage ⇒ - // e.g. 
ActorSystemTerminating, no need for acked delivery - if (resending.isEmpty && isAvailable(out)) - pushCopy(outboundEnvelope) - else { - resending.offer(outboundEnvelope) - tryResend() - } - case msg ⇒ + case msg @ (_: SystemMessage | _: AckedDeliveryMessage) ⇒ if (unacknowledged.size < maxBufferSize) { seqNo += 1 val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress)) @@ -199,6 +191,21 @@ private[akka] class SystemMessageDelivery( deadLetters ! outboundEnvelope pull(in) } + case _: HandshakeReq ⇒ + // pass on HandshakeReq + if (isAvailable(out)) + pushCopy(outboundEnvelope) + case ClearSystemMessageDelivery ⇒ + clear() + pull(in) + case _ ⇒ + // e.g. ActorSystemTerminating or ActorSelectionMessage with PriorityMessage, no need for acked delivery + if (resending.isEmpty && isAvailable(out)) + push(out, outboundEnvelope) + else { + resending.offer(outboundEnvelope) + tryResend() + } } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 3ad664f708..9264c7d980 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -45,6 +45,8 @@ object SystemMessageDeliverySpec { akka.actor.serialize-messages = off """) + case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage + } class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.config) with ImplicitSender { @@ -68,7 +70,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = { val deadLetters = TestProbe().ref Source(1 to sendCount) - .map(n ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, "msg-" + n, OptionVal.None)) + .map(n ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, TestSysMsg("msg-" + n), OptionVal.None)) .via(new SystemMessageDelivery(outboundContext, deadLetters, resendInterval, maxBufferSize = 1000)) } @@ -159,12 +161,12 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi val sink = send(sendCount = 5, resendInterval = 60.seconds, outboundContextA) .via(drop(dropSeqNumbers = Vector(3L, 4L))) .via(inbound(inboundContextB)) - .map(_.message.asInstanceOf[String]) + .map(_.message.asInstanceOf[TestSysMsg]) .runWith(TestSink.probe) sink.request(100) - sink.expectNext("msg-1") - sink.expectNext("msg-2") + sink.expectNext(TestSysMsg("msg-1")) + sink.expectNext(TestSysMsg("msg-2")) replyProbe.expectMsg(Ack(1L, addressB)) replyProbe.expectMsg(Ack(2L, addressB)) // 3 and 4 was dropped @@ -172,11 +174,11 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi sink.expectNoMsg(100.millis) // 3 was dropped inboundContextB.deliverLastReply() // resending 3, 4, 5 - sink.expectNext("msg-3") + sink.expectNext(TestSysMsg("msg-3")) replyProbe.expectMsg(Ack(3L, addressB)) - sink.expectNext("msg-4") + sink.expectNext(TestSysMsg("msg-4")) replyProbe.expectMsg(Ack(4L, addressB)) - sink.expectNext("msg-5") + sink.expectNext(TestSysMsg("msg-5")) replyProbe.expectMsg(Ack(5L, addressB)) replyProbe.expectNoMsg(100.millis) inboundContextB.deliverLastReply() @@ -193,7 +195,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi val sink = send(sendCount = 3, resendInterval = 
60.seconds, outboundContextA) .via(drop(dropSeqNumbers = Vector(1L))) .via(inbound(inboundContextB)) - .map(_.message.asInstanceOf[String]) + .map(_.message.asInstanceOf[TestSysMsg]) .runWith(TestSink.probe) sink.request(100) @@ -202,11 +204,11 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi sink.expectNoMsg(100.millis) // 1 was dropped inboundContextB.deliverLastReply() // it's ok to not delivery all nacks // resending 1, 2, 3 - sink.expectNext("msg-1") + sink.expectNext(TestSysMsg("msg-1")) replyProbe.expectMsg(Ack(1L, addressB)) - sink.expectNext("msg-2") + sink.expectNext(TestSysMsg("msg-2")) replyProbe.expectMsg(Ack(2L, addressB)) - sink.expectNext("msg-3") + sink.expectNext(TestSysMsg("msg-3")) replyProbe.expectMsg(Ack(3L, addressB)) inboundContextB.deliverLastReply() sink.expectComplete() @@ -222,19 +224,19 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi val sink = send(sendCount = 3, resendInterval = 2.seconds, outboundContextA) .via(drop(dropSeqNumbers = Vector(3L))) .via(inbound(inboundContextB)) - .map(_.message.asInstanceOf[String]) + .map(_.message.asInstanceOf[TestSysMsg]) .runWith(TestSink.probe) sink.request(100) - sink.expectNext("msg-1") + sink.expectNext(TestSysMsg("msg-1")) replyProbe.expectMsg(Ack(1L, addressB)) inboundContextB.deliverLastReply() - sink.expectNext("msg-2") + sink.expectNext(TestSysMsg("msg-2")) replyProbe.expectMsg(Ack(2L, addressB)) inboundContextB.deliverLastReply() sink.expectNoMsg(200.millis) // 3 was dropped // resending 3 due to timeout - sink.expectNext("msg-3") + sink.expectNext(TestSysMsg("msg-3")) replyProbe.expectMsg(4.seconds, Ack(3L, addressB)) // continue resending replyProbe.expectMsg(4.seconds, Ack(3L, addressB)) @@ -255,10 +257,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi send(N, 1.second, outboundContextA) .via(randomDrop(dropRate)) .via(inbound(inboundContextB)) - .map(_.message.asInstanceOf[String]) + .map(_.message.asInstanceOf[TestSysMsg]) .runWith(Sink.seq) - Await.result(output, 20.seconds) should ===((1 to N).map("msg-" + _).toVector) + Await.result(output, 20.seconds) should ===((1 to N).map(n ⇒ TestSysMsg("msg-" + n)).toVector) } "deliver all during throttling and random dropping" in { @@ -274,10 +276,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi .throttle(200, 1.second, 10, ThrottleMode.shaping) .via(randomDrop(dropRate)) .via(inbound(inboundContextB)) - .map(_.message.asInstanceOf[String]) + .map(_.message.asInstanceOf[TestSysMsg]) .runWith(Sink.seq) - Await.result(output, 20.seconds) should ===((1 to N).map("msg-" + _).toVector) + Await.result(output, 20.seconds) should ===((1 to N).map(n ⇒ TestSysMsg("msg-" + n)).toVector) } } From 1584c521902254452e8551a1323621c21e0df3bd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 9 Sep 2016 07:45:21 +0200 Subject: [PATCH 2/2] handle longer network partitions, #21399 * system messages in flight should not trigger premature quarantine in case of longer network partitions, therefore we keep the control stream alive * add give-up-system-message-after property that is used by both SystemMessageDelivery and AeronSink in the control stream * also unwrap SystemMessageEnvelope in RemoteDeadLetterActorRef * skip sending control messages after shutdown, can be triggered by scheduled compression advertisment --- .../artery/AeronStreamConcistencySpec.scala | 8 +- .../artery/AeronStreamLatencySpec.scala | 8 +- 
.../artery/AeronStreamMaxThroughputSpec.scala | 4 +- .../artery/SurviveNetworkPartitionSpec.scala | 113 ++++++++++++++++++ akka-remote/src/main/resources/reference.conf | 7 +- .../akka/remote/RemoteActorRefProvider.scala | 10 +- .../scala/akka/remote/artery/AeronSink.scala | 21 ++-- .../akka/remote/artery/ArterySettings.scala | 6 +- .../akka/remote/artery/ArteryTransport.scala | 5 +- .../akka/remote/artery/Association.scala | 4 +- .../remote/artery/SystemMessageDelivery.scala | 22 +++- .../akka/remote/artery/AeronSinkSpec.scala | 4 +- 12 files changed, 181 insertions(+), 31 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index 9103899eb4..240607651d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -82,7 +82,7 @@ abstract class AeronStreamConsistencySpec } val streamId = 1 - val giveUpSendAfter = 30.seconds + val giveUpMessageAfter = 30.seconds override def afterAll(): Unit = { taskRunner.stop() @@ -98,7 +98,7 @@ abstract class AeronStreamConsistencySpec runOn(second) { // just echo back Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) - .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") } @@ -139,7 +139,7 @@ abstract class AeronStreamConsistencySpec envelope } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) started.expectMsg(Done) } @@ -151,7 +151,7 @@ abstract class AeronStreamConsistencySpec envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) Await.ready(done, 20.seconds) killSwitch.shutdown() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 4b0fd67dd0..8b279d21d6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -115,7 +115,7 @@ abstract class AeronStreamLatencySpec } val streamId = 1 - val giveUpSendAfter = 30.seconds + val giveUpMessageAfter = 30.seconds lazy val reporterExecutor = Executors.newFixedThreadPool(1) def reporter(name: String): TestRateReporter = { @@ -245,7 +245,7 @@ abstract class AeronStreamLatencySpec envelope } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, 
IgnoreEventSink)) started.expectMsg(Done) } @@ -264,7 +264,7 @@ abstract class AeronStreamLatencySpec val queueValue = Source.fromGraph(new SendQueue[Unit]) .via(sendFlow) - .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) .run() val queue = new ManyToOneConcurrentArrayQueue[Unit](1024) @@ -314,7 +314,7 @@ abstract class AeronStreamLatencySpec runOn(second) { // just echo back Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) - .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index a1f9a7ee4d..cd75f21fef 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -115,7 +115,7 @@ abstract class AeronStreamMaxThroughputSpec } val streamId = 1 - val giveUpSendAfter = 30.seconds + val giveUpMessageAfter = 30.seconds lazy val reporterExecutor = Executors.newFixedThreadPool(1) def reporter(name: String): TestRateReporter = { @@ -213,7 +213,7 @@ abstract class AeronStreamMaxThroughputSpec envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) printStats("sender") enterBarrier(testName + "-done") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala new file mode 100644 index 0000000000..9eab4bab1b --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2016 Lightbend Inc. 
+ */ +package akka.remote.artery + +import scala.concurrent.duration._ +import akka.actor._ +import akka.actor.ActorIdentity +import akka.actor.Identify +import akka.remote.RARP +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import akka.remote.QuarantinedEvent + +object SurviveNetworkPartitionSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.artery.enabled = on + akka.remote.artery.advanced.give-up-system-message-after = 4s + """))) + + testTransport(on = true) +} + +class SurviveNetworkPartitionSpecMultiJvmNode1 extends SurviveNetworkPartitionSpec +class SurviveNetworkPartitionSpecMultiJvmNode2 extends SurviveNetworkPartitionSpec + +abstract class SurviveNetworkPartitionSpec + extends MultiNodeSpec(SurviveNetworkPartitionSpec) + with STMultiNodeSpec with ImplicitSender { + + import SurviveNetworkPartitionSpec._ + + override def initialParticipants = roles.size + + "Network partition" must { + + "not quarantine system when it heals within 'give-up-system-message-after'" taggedAs LongRunningTest in { + + runOn(second) { + system.actorOf(TestActors.echoActorProps, "echo1") + } + enterBarrier("echo-started") + + runOn(first) { + system.actorSelection(node(second) / "user" / "echo1") ! Identify(None) + val ref = expectMsgType[ActorIdentity].ref.get + ref ! "ping1" + expectMsg("ping1") + + // network partition + testConductor.blackhole(first, second, Direction.Both).await + + // send system message during network partition + watch(ref) + // keep the network partition for a while, but shorter than give-up-system-message-after + expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 2.second) + + // heal the network partition + testConductor.passThrough(first, second, Direction.Both).await + + // not quarantined + ref ! "ping2" + expectMsg("ping2") + + ref ! PoisonPill + expectTerminated(ref) + } + + enterBarrier("done") + } + + "quarantine system when it doesn't heal within 'give-up-system-message-after'" taggedAs LongRunningTest in { + + runOn(second) { + system.actorOf(TestActors.echoActorProps, "echo2") + } + enterBarrier("echo-started") + + runOn(first) { + val qProbe = TestProbe() + system.eventStream.subscribe(qProbe.ref, classOf[QuarantinedEvent]) + system.actorSelection(node(second) / "user" / "echo2") ! Identify(None) + val ref = expectMsgType[ActorIdentity].ref.get + ref ! 
"ping1" + expectMsg("ping1") + + // network partition + testConductor.blackhole(first, second, Direction.Both).await + + // send system message during network partition + watch(ref) + // keep the network partition for a while, longer than give-up-system-message-after + expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 1.second) + qProbe.expectMsgType[QuarantinedEvent](5.seconds).address should ===(node(second).address) + + expectTerminated(ref) + } + + enterBarrier("done") + } + + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index aca07a396a..2a210b6ee5 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -246,7 +246,12 @@ akka { inject-handshake-interval = 1 second # messages that are not accepted by Aeron are dropped after retrying for this period - give-up-send-after = 60 seconds + give-up-message-after = 60 seconds + + # System messages that are not acknowledged after re-sending for this period are + # dropped and will trigger quarantine. The value should be longer than the length + # of a network partition that you need to survive. + give-up-system-message-after = 6 hours # during ActorSystem termination the remoting will wait this long for # an acknowledgment by the destination system that flushing of outstanding diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 1aa85416f6..28ced7a91b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -20,6 +20,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.remote.artery.ArteryTransport import akka.util.OptionVal import akka.remote.artery.OutboundEnvelope +import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope /** * INTERNAL API @@ -98,14 +99,19 @@ private[akka] object RemoteActorRefProvider { // the dead letter status if (seqOpt.isEmpty) super.!(DeadLetter(m, senderOption.getOrElse(_provider.deadLetters), recipient)) case env: OutboundEnvelope ⇒ - super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters), + super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters), env.recipient.getOrElse(_provider.deadLetters))) case DeadLetter(env: OutboundEnvelope, _, _) ⇒ - super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters), + super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters), env.recipient.getOrElse(_provider.deadLetters))) case _ ⇒ super.!(message)(sender) } + private def unwrapSystemMessageEnvelope(msg: AnyRef): AnyRef = msg match { + case SystemMessageEnvelope(m, _, _) ⇒ m + case _ ⇒ msg + } + @throws(classOf[java.io.ObjectStreamException]) override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized } diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index b90d412403..9d3e5a2fd3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -3,6 +3,7 @@ */ package akka.remote.artery +import akka.util.PrettyDuration.PrettyPrintableDuration import java.nio.ByteBuffer import 
java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec @@ -28,7 +29,7 @@ import org.agrona.hints.ThreadHints object AeronSink { - final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace + final class GaveUpMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace final class PublicationClosedException(msg: String) extends RuntimeException(msg) with NoStackTrace @@ -75,13 +76,13 @@ object AeronSink { * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ class AeronSink( - channel: String, - streamId: Int, - aeron: Aeron, - taskRunner: TaskRunner, - pool: EnvelopeBufferPool, - giveUpSendAfter: Duration, - flightRecorder: EventSink) + channel: String, + streamId: Int, + aeron: Aeron, + taskRunner: TaskRunner, + pool: EnvelopeBufferPool, + giveUpAfter: Duration, + flightRecorder: EventSink) extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] { import AeronSink._ import TaskRunner._ @@ -104,7 +105,7 @@ class AeronSink( private var backoffCount = spinning private var lastMsgSize = 0 private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ ⇒ taskOnOfferSuccess()), - giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp()), getAsyncCallback(_ ⇒ onPublicationClosed())) + giveUpAfter, getAsyncCallback(_ ⇒ onGiveUp()), getAsyncCallback(_ ⇒ onPublicationClosed())) private val addOfferTask: Add = Add(offerTask) private var offerTaskInProgress = false @@ -191,7 +192,7 @@ class AeronSink( private def onGiveUp(): Unit = { offerTaskInProgress = false - val cause = new GaveUpSendingException(s"Gave up sending message to $channel after $giveUpSendAfter.") + val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.") flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.getMessage.getBytes("US-ASCII")) completedValue = Failure(cause) failStage(cause) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 6f008d4235..0d3c6cc1a8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -89,8 +89,10 @@ private[akka] final class ArterySettings private (config: Config) { interval > Duration.Zero, "handshake-retry-interval must be more than zero") val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ interval > Duration.Zero, "inject-handshake-interval must be more than zero") - val GiveUpSendAfter = config.getMillisDuration("give-up-send-after").requiring(interval ⇒ - interval > Duration.Zero, "give-up-send-after must be more than zero") + val GiveUpMessageAfter = config.getMillisDuration("give-up-message-after").requiring(interval ⇒ + interval > Duration.Zero, "give-up-message-after must be more than zero") + val GiveUpSystemMessageAfter = config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒ + interval > Duration.Zero, "give-up-system-message-after must be more than zero") val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒ interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala 
b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c7f04f6e81..35cc7ee5e9 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -876,8 +876,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R aeronSink(outboundContext, ordinaryStreamId) private def aeronSink(outboundContext: OutboundContext, streamId: Int): Sink[EnvelopeBuffer, Future[Done]] = { + val giveUpAfter = + if (streamId == controlStreamId) settings.Advanced.GiveUpSystemMessageAfter + else settings.Advanced.GiveUpMessageAfter Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, - envelopeBufferPool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink())) + envelopeBufferPool, giveUpAfter, createFlightRecorderEventSink())) } def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index cd42e7ec73..6894e02e49 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -24,7 +24,7 @@ import akka.event.Logging import akka.remote._ import akka.remote.DaemonMsgCreate import akka.remote.QuarantinedEvent -import akka.remote.artery.AeronSink.GaveUpSendingException +import akka.remote.artery.AeronSink.GaveUpMessageException import akka.remote.artery.Encoder.ChangeOutboundCompression import akka.remote.artery.Encoder.ChangeOutboundCompressionFailed import akka.remote.artery.InboundControlJunction.ControlMessageSubject @@ -589,7 +589,7 @@ private[remote] class Association( // don't restart after shutdown, but log some details so we notice log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage) case _: AbruptTerminationException ⇒ // ActorSystem shutdown - case cause: GaveUpSendingException ⇒ + case cause: GaveUpMessageException ⇒ log.debug("{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) // restart unconditionally, without counting restarts lazyRestart() diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 957c432036..3e093fa5f8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -3,6 +3,7 @@ */ package akka.remote.artery +import akka.util.PrettyDuration.PrettyPrintableDuration import java.util.ArrayDeque import scala.annotation.tailrec import scala.concurrent.duration._ @@ -26,6 +27,7 @@ import akka.actor.ActorRef import akka.remote.PriorityMessage import akka.actor.ActorSelectionMessage import akka.dispatch.sysmsg.SystemMessage +import scala.util.control.NoStackTrace /** * INTERNAL API @@ -38,11 +40,14 @@ private[akka] object SystemMessageDelivery { final case object ClearSystemMessageDelivery + final class GaveUpSystemMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace + private case object ResendTick // If other message types than SystemMesage need acked delivery they can extend this trait. // Used in tests since real SystemMessage are somewhat cumbersome to create. 
trait AckedDeliveryMessage + } /** @@ -71,6 +76,9 @@ private[akka] class SystemMessageDelivery( private var resendingFromSeqNo = -1L private var stopping = false + private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos + private var ackTimestamp = System.nanoTime() + private def localAddress = outboundContext.localAddress private def remoteAddress = outboundContext.remoteAddress @@ -109,13 +117,13 @@ private[akka] class SystemMessageDelivery( override protected def onTimer(timerKey: Any): Unit = timerKey match { case ResendTick ⇒ + checkGiveUp() if (resending.isEmpty && !unacknowledged.isEmpty) { resending = unacknowledged.clone() tryResend() } if (!unacknowledged.isEmpty) scheduleOnce(ResendTick, resendInterval) - // FIXME give up resending after a long while, i.e. config property quarantine-after-silence } // ControlMessageObserver, external call @@ -141,6 +149,7 @@ private[akka] class SystemMessageDelivery( } private def ack(n: Long): Unit = { + ackTimestamp = System.nanoTime() if (n <= seqNo) clearUnacknowledged(n) } @@ -176,6 +185,10 @@ private[akka] class SystemMessageDelivery( case msg @ (_: SystemMessage | _: AckedDeliveryMessage) ⇒ if (unacknowledged.size < maxBufferSize) { seqNo += 1 + if (unacknowledged.isEmpty) + ackTimestamp = System.nanoTime() + else + checkGiveUp() val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress)) unacknowledged.offer(sendEnvelope) scheduleOnce(ResendTick, resendInterval) @@ -209,6 +222,13 @@ private[akka] class SystemMessageDelivery( } } + private def checkGiveUp(): Unit = { + if (!unacknowledged.isEmpty && (System.nanoTime() - ackTimestamp > giveUpAfterNanos)) + throw new GaveUpSystemMessageException( + s"Gave up sending system message to [${outboundContext.remoteAddress}] after " + + s"${outboundContext.settings.Advanced.GiveUpSystemMessageAfter.pretty}.") + } + private def clear(): Unit = { sendUnacknowledgedToDeadLetters() seqNo = 0L // sequence number for the first message will be 1 diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala index ead929855d..feeb58b2c4 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.actor.ExtendedActorSystem -import akka.remote.artery.AeronSink.GaveUpSendingException +import akka.remote.artery.AeronSink.GaveUpMessageException import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl.Sink @@ -75,7 +75,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender { .runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, IgnoreEventSink)) // without the give up timeout the stream would not complete/fail - intercept[GaveUpSendingException] { + intercept[GaveUpMessageException] { Await.result(done, 5.seconds) } }
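
A minimal usage sketch (not taken from the patches themselves) of the AckedDeliveryMessage trait added in PATCH 1/2: any message type that extends it is matched by the new "case msg @ (_: SystemMessage | _: AckedDeliveryMessage)" branch in SystemMessageDelivery and so gets sequence numbers, acknowledgements and resending, which is how TestSysMsg in SystemMessageDeliverySpec is used. The trait sits in a private[akka] INTERNAL API object, so the sketch assumes code living under the akka.remote.artery package; SampleAckedMsg is a hypothetical name.

package akka.remote.artery

import akka.remote.artery.SystemMessageDelivery.AckedDeliveryMessage

// Hypothetical message type: by extending AckedDeliveryMessage it is wrapped in a
// SystemMessageEnvelope by the SystemMessageDelivery stage and re-sent until
// acknowledged, just like the TestSysMsg helper in SystemMessageDeliverySpec.
final case class SampleAckedMsg(payload: String) extends AckedDeliveryMessage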
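
Also for illustration only: the renamed give-up-message-after setting and the new give-up-system-message-after setting from PATCH 2/2 both live under akka.remote.artery.advanced. The former bounds how long the AeronSink retries a message that Aeron does not accept; the latter bounds how long unacknowledged system messages are re-sent, and therefore how long a network partition may last, before the association is quarantined. Below is a sketch of overriding them in the same style as SurviveNetworkPartitionSpec; the 30 minutes and 45 seconds values are made up for the example (the reference.conf defaults are 6 hours and 60 seconds).

import com.typesafe.config.ConfigFactory

object GiveUpTuningExample {
  // Hypothetical overrides of the Artery give-up timeouts introduced/renamed in PATCH 2/2.
  // give-up-system-message-after: how long unacked system messages are re-sent before quarantine.
  // give-up-message-after: how long AeronSink keeps retrying messages not accepted by Aeron.
  val giveUpTuning = ConfigFactory.parseString("""
    akka.remote.artery.enabled = on
    akka.remote.artery.advanced.give-up-system-message-after = 30 minutes
    akka.remote.artery.advanced.give-up-message-after = 45 seconds
  """)
}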