=art place OutboundTestStage after SystemMessageDelivery stage (#20899)
* failing test was akka.cluster.AttemptSysMsgRedelivery when running with Artery * we rely on that system messages are not dropped before the redelivery stage, i.e. blackhole must be after that
This commit is contained in:
parent
b4b89f1442
commit
5e90d4db40
2 changed files with 15 additions and 5 deletions
|
|
@ -765,18 +765,26 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
||||||
envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right)
|
envelopePool, giveUpSendAfter, createFlightRecorderEventSink()))(Keep.right)
|
||||||
}
|
}
|
||||||
|
|
||||||
def outboundControl(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = {
|
/**
|
||||||
|
* The outbound stream is defined as two parts to be able to add test stage in-between.
|
||||||
|
* System messages must not be dropped before the SystemMessageDelivery stage.
|
||||||
|
*/
|
||||||
|
def outboundControlPart1(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, SharedKillSwitch] = {
|
||||||
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
|
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
|
||||||
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout,
|
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, handshakeTimeout,
|
||||||
handshakeRetryInterval, injectHandshakeInterval))
|
handshakeRetryInterval, injectHandshakeInterval))
|
||||||
.via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval,
|
.via(new SystemMessageDelivery(outboundContext, system.deadLetters, systemMessageResendInterval,
|
||||||
remoteSettings.SysMsgBufferSize))
|
remoteSettings.SysMsgBufferSize))
|
||||||
|
|
||||||
|
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
|
||||||
|
}
|
||||||
|
|
||||||
|
def outboundControlPart2(outboundContext: OutboundContext, compression: OutboundCompressions): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = {
|
||||||
|
Flow[OutboundEnvelope]
|
||||||
.viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
|
.viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
|
||||||
.via(encoder(compression))
|
.via(encoder(compression))
|
||||||
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner,
|
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner,
|
||||||
envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both)
|
envelopePool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both)
|
||||||
|
|
||||||
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] =
|
def createEncoder(compression: OutboundCompressions, bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, NotUsed] =
|
||||||
|
|
|
||||||
|
|
@ -310,14 +310,16 @@ private[remote] class Association(
|
||||||
if (transport.remoteSettings.TestMode) {
|
if (transport.remoteSettings.TestMode) {
|
||||||
val ((queueValue, mgmt), (control, completed)) =
|
val ((queueValue, mgmt), (control, completed)) =
|
||||||
Source.fromGraph(new SendQueue[OutboundEnvelope])
|
Source.fromGraph(new SendQueue[OutboundEnvelope])
|
||||||
|
.via(transport.outboundControlPart1(this))
|
||||||
.viaMat(transport.outboundTestFlow(this))(Keep.both)
|
.viaMat(transport.outboundTestFlow(this))(Keep.both)
|
||||||
.toMat(transport.outboundControl(this, compression))(Keep.both)
|
.toMat(transport.outboundControlPart2(this, compression))(Keep.both)
|
||||||
.run()(materializer)
|
.run()(materializer)
|
||||||
_testStages.add(mgmt)
|
_testStages.add(mgmt)
|
||||||
(queueValue, (control, completed))
|
(queueValue, (control, completed))
|
||||||
} else {
|
} else {
|
||||||
Source.fromGraph(new SendQueue[OutboundEnvelope])
|
Source.fromGraph(new SendQueue[OutboundEnvelope])
|
||||||
.toMat(transport.outboundControl(this, compression))(Keep.both)
|
.via(transport.outboundControlPart1(this))
|
||||||
|
.toMat(transport.outboundControlPart2(this, compression))(Keep.both)
|
||||||
.run()(materializer)
|
.run()(materializer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue