From 1584c521902254452e8551a1323621c21e0df3bd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 9 Sep 2016 07:45:21 +0200 Subject: [PATCH] handle longer network partitions, #21399 * system messages in flight should not trigger premature quarantine in case of longer network partitions, therefore we keep the control stream alive * add give-up-system-message-after property that is used by both SystemMessageDelivery and AeronSink in the control stream * also unwrap SystemMessageEnvelope in RemoteDeadLetterActorRef * skip sending control messages after shutdown, can be triggered by scheduled compression advertisment --- .../artery/AeronStreamConcistencySpec.scala | 8 +- .../artery/AeronStreamLatencySpec.scala | 8 +- .../artery/AeronStreamMaxThroughputSpec.scala | 4 +- .../artery/SurviveNetworkPartitionSpec.scala | 113 ++++++++++++++++++ akka-remote/src/main/resources/reference.conf | 7 +- .../akka/remote/RemoteActorRefProvider.scala | 10 +- .../scala/akka/remote/artery/AeronSink.scala | 21 ++-- .../akka/remote/artery/ArterySettings.scala | 6 +- .../akka/remote/artery/ArteryTransport.scala | 5 +- .../akka/remote/artery/Association.scala | 4 +- .../remote/artery/SystemMessageDelivery.scala | 22 +++- .../akka/remote/artery/AeronSinkSpec.scala | 4 +- 12 files changed, 181 insertions(+), 31 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index 9103899eb4..240607651d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -82,7 +82,7 @@ abstract class AeronStreamConsistencySpec } val streamId = 1 - val giveUpSendAfter = 30.seconds + val giveUpMessageAfter = 30.seconds override def afterAll(): Unit = { taskRunner.stop() @@ -98,7 +98,7 @@ abstract class AeronStreamConsistencySpec runOn(second) { // just echo back Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) - .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") } @@ -139,7 +139,7 @@ abstract class AeronStreamConsistencySpec envelope } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) started.expectMsg(Done) } @@ -151,7 +151,7 @@ abstract class AeronStreamConsistencySpec envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) Await.ready(done, 20.seconds) killSwitch.shutdown() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 4b0fd67dd0..8b279d21d6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -115,7 +115,7 @@ abstract class AeronStreamLatencySpec } val streamId = 1 - val giveUpSendAfter = 30.seconds + val giveUpMessageAfter = 30.seconds lazy val reporterExecutor = Executors.newFixedThreadPool(1) def reporter(name: String): TestRateReporter = { @@ -245,7 +245,7 @@ abstract class AeronStreamLatencySpec envelope } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) started.expectMsg(Done) } @@ -264,7 +264,7 @@ abstract class AeronStreamLatencySpec val queueValue = Source.fromGraph(new SendQueue[Unit]) .via(sendFlow) - .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) .run() val queue = new ManyToOneConcurrentArrayQueue[Unit](1024) @@ -314,7 +314,7 @@ abstract class AeronStreamLatencySpec runOn(second) { // just echo back Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) - .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index a1f9a7ee4d..cd75f21fef 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -115,7 +115,7 @@ abstract class AeronStreamMaxThroughputSpec } val streamId = 1 - val giveUpSendAfter = 30.seconds + val giveUpMessageAfter = 30.seconds lazy val reporterExecutor = Executors.newFixedThreadPool(1) def reporter(name: String): TestRateReporter = { @@ -213,7 +213,7 @@ abstract class AeronStreamMaxThroughputSpec envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter, IgnoreEventSink)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) printStats("sender") enterBarrier(testName + "-done") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala new file mode 100644 index 0000000000..9eab4bab1b --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ +import akka.actor._ +import akka.actor.ActorIdentity +import akka.actor.Identify +import akka.remote.RARP +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import akka.remote.QuarantinedEvent + +object SurviveNetworkPartitionSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.artery.enabled = on + akka.remote.artery.advanced.give-up-system-message-after = 4s + """))) + + testTransport(on = true) +} + +class SurviveNetworkPartitionSpecMultiJvmNode1 extends SurviveNetworkPartitionSpec +class SurviveNetworkPartitionSpecMultiJvmNode2 extends SurviveNetworkPartitionSpec + +abstract class SurviveNetworkPartitionSpec + extends MultiNodeSpec(SurviveNetworkPartitionSpec) + with STMultiNodeSpec with ImplicitSender { + + import SurviveNetworkPartitionSpec._ + + override def initialParticipants = roles.size + + "Network partition" must { + + "not quarantine system when it heals within 'give-up-system-message-after'" taggedAs LongRunningTest in { + + runOn(second) { + system.actorOf(TestActors.echoActorProps, "echo1") + } + enterBarrier("echo-started") + + runOn(first) { + system.actorSelection(node(second) / "user" / "echo1") ! Identify(None) + val ref = expectMsgType[ActorIdentity].ref.get + ref ! "ping1" + expectMsg("ping1") + + // network partition + testConductor.blackhole(first, second, Direction.Both).await + + // send system message during network partition + watch(ref) + // keep the network partition for a while, but shorter than give-up-system-message-after + expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 2.second) + + // heal the network partition + testConductor.passThrough(first, second, Direction.Both).await + + // not quarantined + ref ! "ping2" + expectMsg("ping2") + + ref ! PoisonPill + expectTerminated(ref) + } + + enterBarrier("done") + } + + "quarantine system when it doesn't heal within 'give-up-system-message-after'" taggedAs LongRunningTest in { + + runOn(second) { + system.actorOf(TestActors.echoActorProps, "echo2") + } + enterBarrier("echo-started") + + runOn(first) { + val qProbe = TestProbe() + system.eventStream.subscribe(qProbe.ref, classOf[QuarantinedEvent]) + system.actorSelection(node(second) / "user" / "echo2") ! Identify(None) + val ref = expectMsgType[ActorIdentity].ref.get + ref ! "ping1" + expectMsg("ping1") + + // network partition + testConductor.blackhole(first, second, Direction.Both).await + + // send system message during network partition + watch(ref) + // keep the network partition for a while, longer than give-up-system-message-after + expectNoMsg(RARP(system).provider.remoteSettings.Artery.Advanced.GiveUpSystemMessageAfter - 1.second) + qProbe.expectMsgType[QuarantinedEvent](5.seconds).address should ===(node(second).address) + + expectTerminated(ref) + } + + enterBarrier("done") + } + + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index aca07a396a..2a210b6ee5 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -246,7 +246,12 @@ akka { inject-handshake-interval = 1 second # messages that are not accepted by Aeron are dropped after retrying for this period - give-up-send-after = 60 seconds + give-up-message-after = 60 seconds + + # System messages that are not acknowledged after re-sending for this period are + # dropped and will trigger quarantine. The value should be longer than the length + # of a network partition that you need to survive. + give-up-system-message-after = 6 hours # during ActorSystem termination the remoting will wait this long for # an acknowledgment by the destination system that flushing of outstanding diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 1aa85416f6..28ced7a91b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -20,6 +20,7 @@ import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.remote.artery.ArteryTransport import akka.util.OptionVal import akka.remote.artery.OutboundEnvelope +import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope /** * INTERNAL API @@ -98,14 +99,19 @@ private[akka] object RemoteActorRefProvider { // the dead letter status if (seqOpt.isEmpty) super.!(DeadLetter(m, senderOption.getOrElse(_provider.deadLetters), recipient)) case env: OutboundEnvelope ⇒ - super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters), + super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters), env.recipient.getOrElse(_provider.deadLetters))) case DeadLetter(env: OutboundEnvelope, _, _) ⇒ - super.!(DeadLetter(env.message, env.sender.getOrElse(_provider.deadLetters), + super.!(DeadLetter(unwrapSystemMessageEnvelope(env.message), env.sender.getOrElse(_provider.deadLetters), env.recipient.getOrElse(_provider.deadLetters))) case _ ⇒ super.!(message)(sender) } + private def unwrapSystemMessageEnvelope(msg: AnyRef): AnyRef = msg match { + case SystemMessageEnvelope(m, _, _) ⇒ m + case _ ⇒ msg + } + @throws(classOf[java.io.ObjectStreamException]) override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized } diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index b90d412403..9d3e5a2fd3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -3,6 +3,7 @@ */ package akka.remote.artery +import akka.util.PrettyDuration.PrettyPrintableDuration import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec @@ -28,7 +29,7 @@ import org.agrona.hints.ThreadHints object AeronSink { - final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace + final class GaveUpMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace final class PublicationClosedException(msg: String) extends RuntimeException(msg) with NoStackTrace @@ -75,13 +76,13 @@ object AeronSink { * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ class AeronSink( - channel: String, - streamId: Int, - aeron: Aeron, - taskRunner: TaskRunner, - pool: EnvelopeBufferPool, - giveUpSendAfter: Duration, - flightRecorder: EventSink) + channel: String, + streamId: Int, + aeron: Aeron, + taskRunner: TaskRunner, + pool: EnvelopeBufferPool, + giveUpAfter: Duration, + flightRecorder: EventSink) extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] { import AeronSink._ import TaskRunner._ @@ -104,7 +105,7 @@ class AeronSink( private var backoffCount = spinning private var lastMsgSize = 0 private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ ⇒ taskOnOfferSuccess()), - giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp()), getAsyncCallback(_ ⇒ onPublicationClosed())) + giveUpAfter, getAsyncCallback(_ ⇒ onGiveUp()), getAsyncCallback(_ ⇒ onPublicationClosed())) private val addOfferTask: Add = Add(offerTask) private var offerTaskInProgress = false @@ -191,7 +192,7 @@ class AeronSink( private def onGiveUp(): Unit = { offerTaskInProgress = false - val cause = new GaveUpSendingException(s"Gave up sending message to $channel after $giveUpSendAfter.") + val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.") flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.getMessage.getBytes("US-ASCII")) completedValue = Failure(cause) failStage(cause) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 6f008d4235..0d3c6cc1a8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -89,8 +89,10 @@ private[akka] final class ArterySettings private (config: Config) { interval > Duration.Zero, "handshake-retry-interval must be more than zero") val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ interval > Duration.Zero, "inject-handshake-interval must be more than zero") - val GiveUpSendAfter = config.getMillisDuration("give-up-send-after").requiring(interval ⇒ - interval > Duration.Zero, "give-up-send-after must be more than zero") + val GiveUpMessageAfter = config.getMillisDuration("give-up-message-after").requiring(interval ⇒ + interval > Duration.Zero, "give-up-message-after must be more than zero") + val GiveUpSystemMessageAfter = config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒ + interval > Duration.Zero, "give-up-system-message-after must be more than zero") val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒ interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c7f04f6e81..35cc7ee5e9 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -876,8 +876,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R aeronSink(outboundContext, ordinaryStreamId) private def aeronSink(outboundContext: OutboundContext, streamId: Int): Sink[EnvelopeBuffer, Future[Done]] = { + val giveUpAfter = + if (streamId == controlStreamId) settings.Advanced.GiveUpSystemMessageAfter + else settings.Advanced.GiveUpMessageAfter Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, - envelopeBufferPool, settings.Advanced.GiveUpSendAfter, createFlightRecorderEventSink())) + envelopeBufferPool, giveUpAfter, createFlightRecorderEventSink())) } def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, ChangeOutboundCompression] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index cd42e7ec73..6894e02e49 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -24,7 +24,7 @@ import akka.event.Logging import akka.remote._ import akka.remote.DaemonMsgCreate import akka.remote.QuarantinedEvent -import akka.remote.artery.AeronSink.GaveUpSendingException +import akka.remote.artery.AeronSink.GaveUpMessageException import akka.remote.artery.Encoder.ChangeOutboundCompression import akka.remote.artery.Encoder.ChangeOutboundCompressionFailed import akka.remote.artery.InboundControlJunction.ControlMessageSubject @@ -589,7 +589,7 @@ private[remote] class Association( // don't restart after shutdown, but log some details so we notice log.error(cause, s"{} to {} failed after shutdown. {}", streamName, remoteAddress, cause.getMessage) case _: AbruptTerminationException ⇒ // ActorSystem shutdown - case cause: GaveUpSendingException ⇒ + case cause: GaveUpMessageException ⇒ log.debug("{} to {} failed. Restarting it. {}", streamName, remoteAddress, cause.getMessage) // restart unconditionally, without counting restarts lazyRestart() diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 957c432036..3e093fa5f8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -3,6 +3,7 @@ */ package akka.remote.artery +import akka.util.PrettyDuration.PrettyPrintableDuration import java.util.ArrayDeque import scala.annotation.tailrec import scala.concurrent.duration._ @@ -26,6 +27,7 @@ import akka.actor.ActorRef import akka.remote.PriorityMessage import akka.actor.ActorSelectionMessage import akka.dispatch.sysmsg.SystemMessage +import scala.util.control.NoStackTrace /** * INTERNAL API @@ -38,11 +40,14 @@ private[akka] object SystemMessageDelivery { final case object ClearSystemMessageDelivery + final class GaveUpSystemMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace + private case object ResendTick // If other message types than SystemMesage need acked delivery they can extend this trait. // Used in tests since real SystemMessage are somewhat cumbersome to create. trait AckedDeliveryMessage + } /** @@ -71,6 +76,9 @@ private[akka] class SystemMessageDelivery( private var resendingFromSeqNo = -1L private var stopping = false + private val giveUpAfterNanos = outboundContext.settings.Advanced.GiveUpSystemMessageAfter.toNanos + private var ackTimestamp = System.nanoTime() + private def localAddress = outboundContext.localAddress private def remoteAddress = outboundContext.remoteAddress @@ -109,13 +117,13 @@ private[akka] class SystemMessageDelivery( override protected def onTimer(timerKey: Any): Unit = timerKey match { case ResendTick ⇒ + checkGiveUp() if (resending.isEmpty && !unacknowledged.isEmpty) { resending = unacknowledged.clone() tryResend() } if (!unacknowledged.isEmpty) scheduleOnce(ResendTick, resendInterval) - // FIXME give up resending after a long while, i.e. config property quarantine-after-silence } // ControlMessageObserver, external call @@ -141,6 +149,7 @@ private[akka] class SystemMessageDelivery( } private def ack(n: Long): Unit = { + ackTimestamp = System.nanoTime() if (n <= seqNo) clearUnacknowledged(n) } @@ -176,6 +185,10 @@ private[akka] class SystemMessageDelivery( case msg @ (_: SystemMessage | _: AckedDeliveryMessage) ⇒ if (unacknowledged.size < maxBufferSize) { seqNo += 1 + if (unacknowledged.isEmpty) + ackTimestamp = System.nanoTime() + else + checkGiveUp() val sendEnvelope = outboundEnvelope.withMessage(SystemMessageEnvelope(msg, seqNo, localAddress)) unacknowledged.offer(sendEnvelope) scheduleOnce(ResendTick, resendInterval) @@ -209,6 +222,13 @@ private[akka] class SystemMessageDelivery( } } + private def checkGiveUp(): Unit = { + if (!unacknowledged.isEmpty && (System.nanoTime() - ackTimestamp > giveUpAfterNanos)) + throw new GaveUpSystemMessageException( + s"Gave up sending system message to [${outboundContext.remoteAddress}] after " + + s"${outboundContext.settings.Advanced.GiveUpSystemMessageAfter.pretty}.") + } + private def clear(): Unit = { sendUnacknowledgedToDeadLetters() seqNo = 0L // sequence number for the first message will be 1 diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala index ead929855d..feeb58b2c4 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.actor.ExtendedActorSystem -import akka.remote.artery.AeronSink.GaveUpSendingException +import akka.remote.artery.AeronSink.GaveUpMessageException import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl.Sink @@ -75,7 +75,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender { .runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, IgnoreEventSink)) // without the give up timeout the stream would not complete/fail - intercept[GaveUpSendingException] { + intercept[GaveUpMessageException] { Await.result(done, 5.seconds) } }