diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 3b9b8465b1..07c80604ef 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -218,25 +218,6 @@ private[http] object StreamUtils { throw new IllegalStateException("Value can be only set once.") } - // TODO: remove after #16394 is cleared - def recover[A, B >: A](pf: PartialFunction[Throwable, B]): () ⇒ PushPullStage[A, B] = { - val stage = new PushPullStage[A, B] { - var recovery: Option[B] = None - def onPush(elem: A, ctx: Context[B]): SyncDirective = ctx.push(elem) - def onPull(ctx: Context[B]): SyncDirective = recovery match { - case None ⇒ ctx.pull() - case Some(x) ⇒ { recovery = null; ctx.push(x) } - case null ⇒ ctx.finish() - } - override def onUpstreamFailure(cause: Throwable, ctx: Context[B]): TerminationDirective = - if (pf isDefinedAt cause) { - recovery = Some(pf(cause)) - ctx.absorbTermination() - } else super.onUpstreamFailure(cause, ctx) - } - () ⇒ stage - } - /** * Returns a no-op flow that materializes to a future that will be completed when the flow gets a * completion or error signal. It doesn't necessarily mean, though, that all of a streaming pipeline diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 78423046de..315e87757f 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -302,7 +302,7 @@ class ConnectionPoolSpec extends AkkaSpec(""" val incomingConnectionsSub = { val rawBytesInjection = BidiFlow.fromFlows( Flow[SslTlsOutbound].collect[ByteString] { case SendBytes(x) ⇒ mapServerSideOutboundRawBytes(x) } - .transform(StreamUtils.recover { case NoErrorComplete ⇒ ByteString.empty }), + .recover({ case NoErrorComplete ⇒ ByteString.empty }), Flow[ByteString].map(SessionBytes(null, _))) val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink.fromSubscriber(incomingConnections) Tcp().bind(serverEndpoint.getHostString, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout)