use Flow.lazyInit in Artery TCP (#24590)

This commit is contained in:
Patrik Nordwall 2018-02-22 11:18:44 +01:00 committed by Konrad `ktoso` Malawski
parent a89268ee45
commit c15c22622e

View file

@ -124,17 +124,19 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider
def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = {
val flowFactory = () {
afr.loFreq(
TcpOutbound_Connected,
s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " +
s"/ ${streamName(streamId)}")
// FIXME use the Flow.lazyInit from https://github.com/akka/akka/pull/24527
val flow =
Flow[ByteString]
.prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId)))
.via(connectionFlow)
.via(Flow.lazyInit(_ {
// only open the actual connection if any new messages are sent
afr.loFreq(
TcpOutbound_Connected,
s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " +
s"/ ${streamName(streamId)}")
Future.successful(
Flow[ByteString]
.prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId)))
.via(connectionFlow))
}, () NotUsed))
.recoverWithRetries(1, { case ArteryTransport.ShutdownSignal Source.empty })
.log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream")
.addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel))