From ae05ccaf770908268db3edea35b469958deca21d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 11 Mar 2022 09:47:05 +0100 Subject: [PATCH] Restart of outbound stream if upstream completed (unexpectedly), #31081 (#31232) * In the logs attached to the issue we can see that an outbound connection is not re-established after "Upstream finished" (broken pipe). * Normally that is handled by the inner RestartFlow around the connection flow, but if that has reached it's maxRestarts (3) it will complete the entire stream and attachOutboundStreamRestart would not handle that as a restart case. --- .../main/scala/akka/remote/artery/Association.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 2ffa870104..71b95581ce 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -958,9 +958,14 @@ private[remote] class Association( implicit val ec = materializer.executionContext streamCompleted.foreach { _ => - // shutdown as expected - // countDown the latch in case threads are waiting on the latch in outboundControlIngress method - materializing.countDown() + if (transport.isShutdown || isRemovedAfterQuarantined()) { + // shutdown as expected + // countDown the latch in case threads are waiting on the latch in outboundControlIngress method + materializing.countDown() + } else { + log.debug("{} to [{}] was completed. It will be restarted if used again.", streamName, remoteAddress) + lazyRestart() + } } streamCompleted.failed.foreach { case ArteryTransport.ShutdownSignal => @@ -1034,7 +1039,7 @@ private[remote] class Association( } else { log.error( cause, - s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", + s"{} to [{}] failed and restarted {} times within {} seconds. Terminating system. ${cause.getMessage}", streamName, remoteAddress, advancedSettings.OutboundMaxRestarts,