=htc #16394 Remove StreamUtils.recover, Flow.recover is now available

This commit is contained in:
Bernard Leach 2016-04-05 22:25:07 +10:00 committed by Konrad Malawski
parent fc9c853ade
commit 09d5072f2c
2 changed files with 1 additions and 20 deletions

View file

@ -218,25 +218,6 @@ private[http] object StreamUtils {
throw new IllegalStateException("Value can be only set once.")
}
// TODO: remove after #16394 is cleared
def recover[A, B >: A](pf: PartialFunction[Throwable, B]): () PushPullStage[A, B] = {
val stage = new PushPullStage[A, B] {
var recovery: Option[B] = None
def onPush(elem: A, ctx: Context[B]): SyncDirective = ctx.push(elem)
def onPull(ctx: Context[B]): SyncDirective = recovery match {
case None ctx.pull()
case Some(x) { recovery = null; ctx.push(x) }
case null ctx.finish()
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[B]): TerminationDirective =
if (pf isDefinedAt cause) {
recovery = Some(pf(cause))
ctx.absorbTermination()
} else super.onUpstreamFailure(cause, ctx)
}
() stage
}
/**
* Returns a no-op flow that materializes to a future that will be completed when the flow gets a
* completion or error signal. It doesn't necessarily mean, though, that all of a streaming pipeline

View file

@ -302,7 +302,7 @@ class ConnectionPoolSpec extends AkkaSpec("""
val incomingConnectionsSub = {
val rawBytesInjection = BidiFlow.fromFlows(
Flow[SslTlsOutbound].collect[ByteString] { case SendBytes(x) mapServerSideOutboundRawBytes(x) }
.transform(StreamUtils.recover { case NoErrorComplete ByteString.empty }),
.recover({ case NoErrorComplete ByteString.empty }),
Flow[ByteString].map(SessionBytes(null, _)))
val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink.fromSubscriber(incomingConnections)
Tcp().bind(serverEndpoint.getHostString, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout)