+str #18045 add recoverWith(Source)

This commit is contained in:
Alexander Golubev 2016-01-29 22:06:36 -05:00
parent 4f237c8eff
commit 5a8f4135b7
13 changed files with 346 additions and 36 deletions

View file

@ -580,6 +580,16 @@ Allow sending of one last element downstream when a failure has happened upstrea
*completes* when upstream completes or upstream failed with exception pf can handle
recoverWith
^^^^^^^^^^^
Allow switching to alternative Source when a failure has happened upstream.
*emits* the element is available from the upstream or upstream is failed and pf returns alternative Source
*backpressures* downstream backpressures, after failure happened it backprssures to alternative Source
*completes* upstream completes or upstream failed with exception pf can handle
detach
^^^^^^
Detach upstream demand from downstream demand without detaching the stream rates.

View file

@ -569,6 +569,16 @@ Allow sending of one last element downstream when a failure has happened upstrea
*completes* when upstream completes or upstream failed with exception pf can handle
recoverWith
^^^^^^^^^^^
Allow switching to alternative Source when a failure has happened upstream.
*emits* the element is available from the upstream or upstream is failed and pf returns alternative Source
*backpressures* downstream backpressures, after failure happened it backprssures to alternative Source
*completes* upstream completes or upstream failed with exception pf can handle
detach
^^^^^^
Detach upstream demand from downstream demand without detaching the stream rates.

View file

@ -628,6 +628,42 @@ public class FlowTest extends StreamTest {
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
}
@Test
public void mustBeAbleToRecoverWithSource() throws Exception {
final TestPublisher.ManualProbe<Integer> publisherProbe = TestPublisher.manualProbe(true,system);
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<Integer> recover = Arrays.asList(55, 0);
final Source<Integer, NotUsed> source = Source.fromPublisher(publisherProbe);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).map(
new Function<Integer, Integer>() {
public Integer apply(Integer elem) {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
}
})
.recoverWith(new JavaPartialFunction<Throwable, Source<Integer, NotUsed>>() {
public Source<Integer, NotUsed> apply(Throwable elem, boolean isCheck) {
if (isCheck) return null;
return Source.from(recover);
}
});
final CompletionStage<Done> future =
source.via(flow).runWith(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), materializer);
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
s.sendNext(0);
probe.expectMsgEquals(0);
s.sendNext(1);
probe.expectMsgEquals(1);
s.sendNext(2);
probe.expectMsgEquals(55);
probe.expectMsgEquals(0);
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
}
@Test
public void mustBeAbleToMaterializeIdentityWithJavaFlow() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);

View file

@ -3,9 +3,10 @@
*/
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, TestSubscriber }
import akka.stream.testkit.AkkaSpec
import scala.util.control.NoStackTrace
@ -19,56 +20,40 @@ class FlowRecoverSpec extends AkkaSpec {
"A Recover" must {
"recover when there is a handler" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
Source(1 to 4).map { a if (a == 3) throw ex else a }
.recover { case t: Throwable 0 }
.runWith(Sink.fromSubscriber(subscriber))
subscriber.requestNext(1)
subscriber.requestNext(2)
subscriber.request(1)
subscriber.expectNext(0)
subscriber.request(1)
subscriber.expectComplete()
.runWith(TestSink.probe[Int])
.requestNext(1)
.requestNext(2)
.requestNext(0)
.request(1)
.expectComplete()
}
"failed stream if handler is not for such exception type" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
Source(1 to 3).map { a if (a == 2) throw ex else a }
.recover { case t: IndexOutOfBoundsException 0 }
.runWith(Sink.fromSubscriber(subscriber))
subscriber.requestNext(1)
subscriber.request(1)
subscriber.expectError(ex)
.runWith(TestSink.probe[Int])
.requestNext(1)
.request(1)
.expectError(ex)
}
"not influence stream when there is no exceptions" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
val k = Source(1 to 3).map(identity)
Source(1 to 3).map(identity)
.recover { case t: Throwable 0 }
.runWith(Sink.fromSubscriber(subscriber))
subscriber.requestNext(1)
subscriber.requestNext(2)
subscriber.requestNext(3)
subscriber.expectComplete()
.runWith(TestSink.probe[Int])
.request(3)
.expectNextN(1 to 3)
.expectComplete()
}
"finish stream if it's empty" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
Source.empty.map(identity)
.recover { case t: Throwable 0 }
.runWith(Sink.fromSubscriber(subscriber))
subscriber.request(1)
subscriber.expectComplete()
.runWith(TestSink.probe[Int])
.request(1)
.expectComplete()
}
}
}

View file

@ -0,0 +1,116 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.Utils._
import akka.stream.testkit.AkkaSpec
import scala.util.control.NoStackTrace
class FlowRecoverWithSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1)
implicit val materializer = ActorMaterializer(settings)
val ex = new RuntimeException("ex") with NoStackTrace
"A RecoverWith" must {
"recover when there is a handler" in assertAllStagesStopped {
Source(1 to 4).map { a if (a == 3) throw ex else a }
.recoverWith { case t: Throwable Source(List(0, -1)) }
.runWith(TestSink.probe[Int])
.request(2)
.expectNextN(1 to 2)
.request(1)
.expectNext(0)
.request(1)
.expectNext(-1)
.expectComplete()
}
"cancel substream if parent is terminated when there is a handler" in assertAllStagesStopped {
Source(1 to 4).map { a if (a == 3) throw ex else a }
.recoverWith { case t: Throwable Source(List(0, -1)) }
.runWith(TestSink.probe[Int])
.request(2)
.expectNextN(1 to 2)
.request(1)
.expectNext(0)
.cancel()
}
"failed stream if handler is not for such exception type" in assertAllStagesStopped {
Source(1 to 3).map { a if (a == 2) throw ex else a }
.recoverWith { case t: IndexOutOfBoundsException Source.single(0) }
.runWith(TestSink.probe[Int])
.request(1)
.expectNext(1)
.request(1)
.expectError(ex)
}
"be able to recover with th same unmaterialized source if configured" in assertAllStagesStopped {
val src = Source(1 to 3).map { a if (a == 3) throw ex else a }
src.recoverWith { case t: Throwable src }
.runWith(TestSink.probe[Int])
.request(2)
.expectNextN(1 to 2)
.request(2)
.expectNextN(1 to 2)
.request(2)
.expectNextN(1 to 2)
.cancel()
}
"not influence stream when there is no exceptions" in assertAllStagesStopped {
Source(1 to 3).map(identity)
.recoverWith { case t: Throwable Source.single(0) }
.runWith(TestSink.probe[Int])
.request(3)
.expectNextN(1 to 3)
.expectComplete()
}
"finish stream if it's empty" in assertAllStagesStopped {
Source.empty.map(identity)
.recoverWith { case t: Throwable Source.single(0) }
.runWith(TestSink.probe[Int])
.request(3)
.expectComplete()
}
"switch the second time if alternative source throws exception" in assertAllStagesStopped {
val k = Source(1 to 3).map { a if (a == 3) throw new IndexOutOfBoundsException() else a }
.recoverWith {
case t: IndexOutOfBoundsException
Source(List(11, 22)).map(m if (m == 22) throw new IllegalArgumentException() else m)
case t: IllegalArgumentException Source(List(33, 44))
}.runWith(TestSink.probe[Int])
.request(2)
.expectNextN(List(1, 2))
.request(2)
.expectNextN(List(11, 33))
.request(1)
.expectNext(44)
.expectComplete()
}
"terminate with exception if altrnative source failed" in assertAllStagesStopped {
Source(1 to 3).map { a if (a == 3) throw new IndexOutOfBoundsException() else a }
.recoverWith {
case t: IndexOutOfBoundsException
Source(List(11, 22)).map(m if (m == 22) throw ex else m)
}.runWith(TestSink.probe[Int])
.request(2)
.expectNextN(List(1, 2))
.request(1)
.expectNextN(List(11))
.request(1)
.expectError(ex)
}
}
}

View file

@ -65,6 +65,7 @@ private[stream] object Stages {
val merge = name("merge")
val mergePreferred = name("mergePreferred")
val flattenMerge = name("flattenMerge")
val recoverWith = name("recoverWith")
val broadcast = name("broadcast")
val balance = name("balance")
val zip = name("zip")

View file

@ -6,10 +6,10 @@ package akka.stream.impl.fusing
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ Buffer BufferImpl, ReactiveStreamsCompliance }
import akka.stream.scaladsl.Source
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
@ -1119,3 +1119,53 @@ private[stream] final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraph
}
override def toString = "Reduce"
}
/**
* INTERNAL API
*/
private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.recoverWith
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
def onFailure(ex: Throwable) = if (pf.isDefinedAt(ex)) switchTo(pf(ex)) else failStage(ex)
def switchTo(source: Graph[SourceShape[T], M]): Unit = {
val sinkIn = new SubSinkInlet[T]("RecoverWithSink")
sinkIn.setHandler(new InHandler {
override def onPush(): Unit =
if (isAvailable(out)) {
push(out, sinkIn.grab())
sinkIn.pull()
}
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) completeStage()
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
})
def pushOut(): Unit = {
push(out, sinkIn.grab())
if (!sinkIn.isClosed) sinkIn.pull()
else completeStage()
}
val outHandler = new OutHandler {
override def onPull(): Unit = if (sinkIn.isAvailable) pushOut()
override def onDownstreamFinish(): Unit = sinkIn.cancel()
}
Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
setHandler(out, outHandler)
sinkIn.pull()
}
}
override def toString: String = "RecoverWith"
}

View file

@ -8,6 +8,7 @@ import akka.NotUsed
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage._
import akka.stream.scaladsl._
import akka.stream.actor.ActorSubscriberMessage

View file

@ -3,6 +3,7 @@
*/
package akka.stream.javadsl
import akka.stream.impl.fusing.RecoverWith
import akka.{ NotUsed, Done }
import akka.event.LoggingAdapter
import akka.japi.{ function, Pair }
@ -760,6 +761,26 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.recover(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] =
new Flow(delegate.recoverWith(pf))
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been

View file

@ -775,6 +775,26 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] =
new Source(delegate.recover(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] =
new Source(delegate.recoverWith(pf))
/**
* Transform each input element into an `Iterable of output elements that is
* then flattened into the output stream.

View file

@ -606,6 +606,26 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def recover[T >: Out](pf: PartialFunction[Throwable, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.recover(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] =
new SubFlow(delegate.recoverWith(pf))
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been

View file

@ -602,6 +602,26 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def recover[T >: Out](pf: PartialFunction[Throwable, T]): SubSource[T, Mat] =
new SubSource(delegate.recover(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] =
new SubSource(delegate.recoverWith(pf))
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been

View file

@ -420,6 +420,26 @@ trait FlowOps[+Out, +Mat] {
*/
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = andThen(Recover(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
via(new RecoverWith(pf))
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.