From 630343e483a303a1c8f4f4ea025090ba6abcac92 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Sat, 13 Jun 2015 14:02:37 -0400 Subject: [PATCH] +str #16394 recover operation --- akka-docs-dev/rst/stages-overview.rst | 1 + .../java/akka/stream/javadsl/FlowTest.java | 33 +++++++++ .../java/akka/stream/javadsl/SourceTest.java | 30 ++++++++ .../stream/io/SynchronousFileSinkSpec.scala | 2 +- .../stream/io/SynchronousFileSourceSpec.scala | 2 +- .../akka/stream/scaladsl/FlowLogSpec.scala | 2 +- .../stream/scaladsl/FlowRecoverSpec.scala | 74 +++++++++++++++++++ .../stream/impl/ActorMaterializerImpl.scala | 18 +++-- .../main/scala/akka/stream/impl/Stages.scala | 6 ++ .../scala/akka/stream/impl/fusing/Ops.scala | 29 ++++++++ .../akka/stream/impl/io/IOSettings.scala | 2 +- .../main/scala/akka/stream/javadsl/Flow.scala | 18 +++++ .../scala/akka/stream/javadsl/Source.scala | 8 ++ .../scala/akka/stream/scaladsl/Flow.scala | 23 ++++++ 14 files changed, 236 insertions(+), 12 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index 312d66c906..1890f5f2fe 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -40,6 +40,7 @@ drop the specified number of elements has been dropped already take the specified number of elements to take has not yet been reached downstream backpressures the defined number of elements has been taken or upstream completes takeWhile the predicate is true and until the first false result downstream backpressures predicate returned false or upstream completes dropWhile the predicate returned false and for all following stream elements predicate returned false and downstream backpressures upstream completes +recover the element is available from the upstream or upstream is failed and pf returns an element downstream backpressures, not when failure happened upstream completes or upstream failed with exception pf can handle ===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== Asynchronous processing stages diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 154c958c7f..d7bc385357 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -6,6 +6,7 @@ package akka.stream.javadsl; import akka.actor.ActorRef; import akka.dispatch.Foreach; import akka.dispatch.Futures; +import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.stream.Outlet; import akka.stream.OverflowStrategy; @@ -18,6 +19,7 @@ import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; import org.reactivestreams.Publisher; +import scala.PartialFunction; import scala.runtime.BoxedUnit; import org.junit.ClassRule; import org.junit.Test; @@ -517,4 +519,35 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("B"); probe.expectMsgEquals("C"); } + + @Test + public void mustBeAbleToRecover() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList(0, 1, 2, 3)); + final Flow flow = Flow.of(Integer.class).map( + new Function() { + public Integer apply(Integer elem) { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; + } + }) + .recover(new JavaPartialFunction() { + public Integer apply(Throwable elem, boolean isCheck) { + if (isCheck) return null; + return 0; + } + }); + + final Future future = source.via(flow).runWith(Sink.foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals(0); + probe.expectMsgEquals(1); + probe.expectMsgEquals(0); + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); + } + } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 639bc07d44..e7cb7e6e56 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -8,6 +8,7 @@ import akka.actor.Cancellable; import akka.dispatch.Foreach; import akka.dispatch.Futures; import akka.dispatch.OnSuccess; +import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.stream.OverflowStrategy; import akka.stream.StreamTest; @@ -521,4 +522,33 @@ public class SourceTest extends StreamTest { Await.ready(future, duration); } + @Test + public void mustBeAbleToRecover() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList(0, 1, 2, 3)).map( + new Function() { + public Integer apply(Integer elem) { + if (elem == 2) throw new RuntimeException("ex"); + else return elem; + } + }) + .recover(new JavaPartialFunction() { + public Integer apply(Throwable elem, boolean isCheck) { + if (isCheck) return null; + return 0; + } + }); + + final Future future = source.runWith(Sink.foreach(new Procedure() { + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals(0); + probe.expectMsgEquals(1); + probe.expectMsgEquals(0); + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala index 701fae804f..c791914267 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSinkSpec.scala @@ -107,7 +107,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } } - "allow overriding the dispatcher using OperationAttributes" in assertAllStagesStopped { + "allow overriding the dispatcher using Attributes" in assertAllStagesStopped { targetFile { f ⇒ val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val mat = ActorMaterializer()(sys) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala index 6f24244f10..fe2d3e38a4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/SynchronousFileSourceSpec.scala @@ -180,7 +180,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { } finally shutdown(sys) } - "allow overriding the dispatcher using OperationAttributes" in { + "allow overriding the dispatcher using Attributes" in { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val mat = ActorMaterializer()(sys) implicit val timeout = Timeout(500.millis) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala index 1a1009ee6d..02dad695a0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowLogSpec.scala @@ -117,7 +117,7 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest { logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Upstream finished.")) } - "allow configuring log levels via OperationAttributes" in { + "allow configuring log levels via Attributes" in { val logAttrs = Attributes.logLevels( onElement = Logging.WarningLevel, onFinish = Logging.InfoLevel, diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala new file mode 100644 index 0000000000..cf725815a5 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverSpec.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.Utils._ +import akka.stream.testkit.{ AkkaSpec, TestSubscriber } + +import scala.util.control.NoStackTrace + +class FlowRecoverSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1) + + implicit val materializer = ActorMaterializer(settings) + + val ex = new RuntimeException("ex") with NoStackTrace + + "A Recover" must { + "recover when there is a handler" in assertAllStagesStopped { + val subscriber = TestSubscriber.probe[Int]() + + Source(1 to 4).map { a ⇒ if (a == 3) throw ex else a } + .recover { case t: Throwable ⇒ 0 } + .runWith(Sink(subscriber)) + + subscriber.requestNext(1) + subscriber.requestNext(2) + + subscriber.request(1) + subscriber.expectNext(0) + + subscriber.request(1) + subscriber.expectComplete() + } + + "failed stream if handler is not for such exception type" in assertAllStagesStopped { + val subscriber = TestSubscriber.probe[Int]() + + Source(1 to 3).map { a ⇒ if (a == 2) throw ex else a } + .recover { case t: IndexOutOfBoundsException ⇒ 0 } + .runWith(Sink(subscriber)) + + subscriber.requestNext(1) + subscriber.request(1) + subscriber.expectError(ex) + } + + "not influence stream when there is no exceptions" in assertAllStagesStopped { + val subscriber = TestSubscriber.probe[Int]() + + val k = Source(1 to 3).map(identity) + .recover { case t: Throwable ⇒ 0 } + .runWith(Sink(subscriber)) + + subscriber.requestNext(1) + subscriber.requestNext(2) + subscriber.requestNext(3) + subscriber.expectComplete() + } + + "finish stream if it's empty" in assertAllStagesStopped { + val subscriber = TestSubscriber.probe[Int]() + Source.empty.map(identity) + .recover { case t: Throwable ⇒ 0 } + .runWith(Sink(subscriber)) + + subscriber.request(1) + subscriber.expectComplete() + + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 1d652434c0..20a9e6ec20 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -308,14 +308,16 @@ private[akka] object ActorProcessorFactory { case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider)) case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider)) case Fold(z, f, _) ⇒ interp(fusing.Fold(z, f, settings.supervisionDecider)) - case Expand(s, f, _) ⇒ interp(fusing.Expand(s, f)) - case Conflate(s, f, _) ⇒ interp(fusing.Conflate(s, f, settings.supervisionDecider)) - case Buffer(n, s, _) ⇒ interp(fusing.Buffer(n, s)) - case MapConcat(f, _) ⇒ interp(fusing.MapConcat(f, settings.supervisionDecider)) - case MapAsync(p, f, _) ⇒ interp(fusing.MapAsync(p, f, settings.supervisionDecider)) - case MapAsyncUnordered(p, f, _) ⇒ interp(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)) - case Grouped(n, _) ⇒ interp(fusing.Grouped(n)) - case Log(n, e, l, _) ⇒ interp(fusing.Log(n, e, l)) + case Recover(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Recover(pf)), materializer, att), ()) + case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ()) + case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ()) + case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ()) + case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ()) + case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ()) + case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ()) + case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ()) + case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ()) + case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ()) case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 534241f7e0..849d23e429 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -26,6 +26,7 @@ private[stream] object Stages { val map = name("map") val filter = name("filter") val collect = name("collect") + val recover = name("recover") val mapAsync = name("mapAsync") val mapAsyncUnordered = name("mapAsyncUnordered") val grouped = name("grouped") @@ -140,6 +141,11 @@ private[stream] object Stages { override protected def newInstance: StageModule = this.copy() } + final case class Recover(pf: PartialFunction[Any, Any], attributes: Attributes = recover) extends StageModule { + def withAttributes(attributes: Attributes) = copy(attributes = attributes) + override protected def newInstance: StageModule = this.copy() + } + final case class MapAsync(parallelism: Int, f: Any ⇒ Future[Any], attributes: Attributes = mapAsync) extends StageModule { def withAttributes(attributes: Attributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index bd8b6be08e..b5c485c4de 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -6,6 +6,7 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.LogLevels +import akka.stream.Supervision.Resume import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ import akka.stream.{ Supervision, _ } @@ -87,6 +88,34 @@ private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out], de override def decide(t: Throwable): Supervision.Directive = decider(t) } +/** + * INTERNAL API + */ +private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends PushPullStage[T, T] { + import Collect.NotApplied + var recovered: Option[T] = None + + override def onPush(elem: T, ctx: Context[T]): SyncDirective = { + ctx.push(elem) + } + + override def onPull(ctx: Context[T]): SyncDirective = + recovered match { + case Some(value) ⇒ ctx.pushAndFinish(value) + case None ⇒ ctx.pull() + } + + override def onUpstreamFailure(t: Throwable, ctx: Context[T]): TerminationDirective = { + pf.applyOrElse(t, NotApplied) match { + case NotApplied ⇒ ctx.fail(t) + case result: T @unchecked ⇒ + recovered = Some(result) + ctx.absorbTermination() + } + } + +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala index 6fa792a073..f09c86b5d5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSettings.scala @@ -4,7 +4,7 @@ import akka.stream.ActorAttributes.Dispatcher import akka.stream.{ ActorMaterializer, MaterializationContext } private[stream] object IOSettings { - /** Picks default akka.stream.file-io-dispatcher or the OperationAttributes configured one */ + /** Picks default akka.stream.file-io-dispatcher or the Attributes configured one */ def fileIoDispatcher(context: MaterializationContext): String = { val mat = ActorMaterializer.downcast(context.materializer) context.effectiveAttributes.attributeList.collectFirst { case d: Dispatcher ⇒ d.dispatcher } getOrElse { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index af0e43c2e7..e157a93b59 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -7,6 +7,7 @@ import akka.event.LoggingAdapter import akka.stream._ import akka.japi.{ Util, Pair } import akka.japi.function +import akka.stream.impl.Stages.Recover import akka.stream.scaladsl import akka.stream.scaladsl.{ Keep, Sink, Source } import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } @@ -468,6 +469,23 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph */ def dropWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.dropWhile(p.test)) + /** + * Recover allows to send last element on failure and gracefully complete the stream + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.recover(pf)) + /** * Terminate processing (and cancel the upstream publisher) after the given * number of elements. Due to input buffering some elements may have been diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 45be60b496..40dfc9d177 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -326,6 +326,14 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] = new Source(delegate.map(f.apply)) + /** + * Recover allows to send last element on failure and gracefully complete the stream + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + */ + def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] = + new Source(delegate.recover(pf)) + /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 6f25244e16..721f37ad86 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -6,6 +6,8 @@ package akka.stream.scaladsl import scala.language.higherKinds import akka.event.LoggingAdapter +import akka.stream.impl.Stages.{ Recover, MaterializingStageFactory, StageModule } +import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream._ import akka.stream.Attributes._ import akka.stream.stage._ @@ -15,10 +17,15 @@ import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, Sta import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.util.Collections.EmptyImmutableSeq import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } +import org.reactivestreams.Processor +import scala.annotation.implicitNotFound import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.Future +import scala.language.higherKinds +import akka.stream.stage._ +import akka.stream.impl.{ Stages, StreamLayout, FlowModule } /** * A `Flow` is a set of stream processing steps that has one open input and one open output. @@ -374,6 +381,22 @@ trait FlowOps[+Out, +Mat] { private final val _identity = (x: Any) ⇒ x + /** + * Recover allows to send last element on failure and gracefully complete the stream + * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. + * This stage can recover the failure signal, but not the skipped elements, which will be dropped. + * + * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or upstream failed with exception pf can handle + * + * '''Cancels when''' downstream cancels + * + */ + def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T, Mat] = andThen(Recover(pf.asInstanceOf[PartialFunction[Any, Any]])) + /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step.