diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 92fa84e5da..cc114df912 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -765,18 +765,26 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right) } - def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { + /** + * The outbound stream is defined as two parts to be able to add test stage in-between. + * System messages must not be dropped before the SystemMessageDelivery stage. + */ + def outboundControlPart1(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, SharedKillSwitch] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout, handshakeRetryInterval, injectHandshakeInterval)) .via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval, remoteSettings.SysMsgBufferSize)) + + // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages + } + + def outboundControlPart2(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { + Flow[OutboundEnvelope] .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right) .via(encoder(compression)) .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) - - // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] = diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index fdbfca9747..da3936b065 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -310,14 +310,16 @@ private[remote] class Association( if (transport.remoteSettings.TestMode) { val ((queueValue, mgmt), (control, completed)) = Source.fromGraph(new SendQueue[OutboundEnvelope]) + .via(transport.outboundControlPart1(this)) .viaMat(transport.outboundTestFlow(this))(Keep.both) - .toMat(transport.outboundControl(this, compression))(Keep.both) + .toMat(transport.outboundControlPart2(this, compression))(Keep.both) .run()(materializer) _testStages.add(mgmt) (queueValue, (control, completed)) } else { Source.fromGraph(new SendQueue[OutboundEnvelope]) - .toMat(transport.outboundControl(this, compression))(Keep.both) + .via(transport.outboundControlPart1(this)) + .toMat(transport.outboundControlPart2(this, compression))(Keep.both) .run()(materializer) }