diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index f37e532571..060f0d90d7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -124,17 +124,19 @@ private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = { val flowFactory = () ⇒ { - afr.loFreq( - TcpOutbound_Connected, - s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + - s"/ ${streamName(streamId)}") - - // FIXME use the Flow.lazyInit from https://github.com/akka/akka/pull/24527 - val flow = Flow[ByteString] - .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) - .via(connectionFlow) + .via(Flow.lazyInit(_ ⇒ { + // only open the actual connection if any new messages are sent + afr.loFreq( + TcpOutbound_Connected, + s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + + s"/ ${streamName(streamId)}") + Future.successful( + Flow[ByteString] + .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) + .via(connectionFlow)) + }, () ⇒ NotUsed)) .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal ⇒ Source.empty }) .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel))