+str #16394 recover operation

This commit is contained in:
Alexander Golubev 2015-06-13 14:02:37 -04:00 committed by Konrad Malawski
parent 0bcc996cc7
commit 630343e483
14 changed files with 236 additions and 12 deletions

View file

@ -40,6 +40,7 @@ drop the specified number of elements has been dropped already
take the specified number of elements to take has not yet been reached downstream backpressures the defined number of elements has been taken or upstream completes
takeWhile the predicate is true and until the first false result downstream backpressures predicate returned false or upstream completes
dropWhile the predicate returned false and for all following stream elements predicate returned false and downstream backpressures upstream completes
recover the element is available from the upstream or upstream is failed and pf returns an element downstream backpressures, not when failure happened upstream completes or upstream failed with exception pf can handle
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
Asynchronous processing stages

View file

@ -6,6 +6,7 @@ package akka.stream.javadsl;
import akka.actor.ActorRef;
import akka.dispatch.Foreach;
import akka.dispatch.Futures;
import akka.japi.JavaPartialFunction;
import akka.japi.Pair;
import akka.stream.Outlet;
import akka.stream.OverflowStrategy;
@ -18,6 +19,7 @@ import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
import org.reactivestreams.Publisher;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import org.junit.ClassRule;
import org.junit.Test;
@ -517,4 +519,35 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals("B");
probe.expectMsgEquals("C");
}
@Test
public void mustBeAbleToRecover() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<Integer, ?> source = Source.from(Arrays.asList(0, 1, 2, 3));
final Flow<Integer, Integer, ?> flow = Flow.of(Integer.class).map(
new Function<Integer, Integer>() {
public Integer apply(Integer elem) {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
}
})
.recover(new JavaPartialFunction<Throwable, Integer>() {
public Integer apply(Throwable elem, boolean isCheck) {
if (isCheck) return null;
return 0;
}
});
final Future<BoxedUnit> future = source.via(flow).runWith(Sink.foreach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}), materializer);
probe.expectMsgEquals(0);
probe.expectMsgEquals(1);
probe.expectMsgEquals(0);
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
}
}

View file

@ -8,6 +8,7 @@ import akka.actor.Cancellable;
import akka.dispatch.Foreach;
import akka.dispatch.Futures;
import akka.dispatch.OnSuccess;
import akka.japi.JavaPartialFunction;
import akka.japi.Pair;
import akka.stream.OverflowStrategy;
import akka.stream.StreamTest;
@ -521,4 +522,33 @@ public class SourceTest extends StreamTest {
Await.ready(future, duration);
}
@Test
public void mustBeAbleToRecover() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<Integer, ?> source = Source.from(Arrays.asList(0, 1, 2, 3)).map(
new Function<Integer, Integer>() {
public Integer apply(Integer elem) {
if (elem == 2) throw new RuntimeException("ex");
else return elem;
}
})
.recover(new JavaPartialFunction<Throwable, Integer>() {
public Integer apply(Throwable elem, boolean isCheck) {
if (isCheck) return null;
return 0;
}
});
final Future<BoxedUnit> future = source.runWith(Sink.foreach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}), materializer);
probe.expectMsgEquals(0);
probe.expectMsgEquals(1);
probe.expectMsgEquals(0);
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
}
}

View file

@ -107,7 +107,7 @@ class SynchronousFileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
}
}
"allow overriding the dispatcher using OperationAttributes" in assertAllStagesStopped {
"allow overriding the dispatcher using Attributes" in assertAllStagesStopped {
targetFile { f
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val mat = ActorMaterializer()(sys)

View file

@ -180,7 +180,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
} finally shutdown(sys)
}
"allow overriding the dispatcher using OperationAttributes" in {
"allow overriding the dispatcher using Attributes" in {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val mat = ActorMaterializer()(sys)
implicit val timeout = Timeout(500.millis)

View file

@ -117,7 +117,7 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest {
logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Upstream finished."))
}
"allow configuring log levels via OperationAttributes" in {
"allow configuring log levels via Attributes" in {
val logAttrs = Attributes.logLevels(
onElement = Logging.WarningLevel,
onFinish = Logging.InfoLevel,

View file

@ -0,0 +1,74 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.Utils._
import akka.stream.testkit.{ AkkaSpec, TestSubscriber }
import scala.util.control.NoStackTrace
class FlowRecoverSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system).withInputBuffer(initialSize = 1, maxSize = 1)
implicit val materializer = ActorMaterializer(settings)
val ex = new RuntimeException("ex") with NoStackTrace
"A Recover" must {
"recover when there is a handler" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
Source(1 to 4).map { a if (a == 3) throw ex else a }
.recover { case t: Throwable 0 }
.runWith(Sink(subscriber))
subscriber.requestNext(1)
subscriber.requestNext(2)
subscriber.request(1)
subscriber.expectNext(0)
subscriber.request(1)
subscriber.expectComplete()
}
"failed stream if handler is not for such exception type" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
Source(1 to 3).map { a if (a == 2) throw ex else a }
.recover { case t: IndexOutOfBoundsException 0 }
.runWith(Sink(subscriber))
subscriber.requestNext(1)
subscriber.request(1)
subscriber.expectError(ex)
}
"not influence stream when there is no exceptions" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
val k = Source(1 to 3).map(identity)
.recover { case t: Throwable 0 }
.runWith(Sink(subscriber))
subscriber.requestNext(1)
subscriber.requestNext(2)
subscriber.requestNext(3)
subscriber.expectComplete()
}
"finish stream if it's empty" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
Source.empty.map(identity)
.recover { case t: Throwable 0 }
.runWith(Sink(subscriber))
subscriber.request(1)
subscriber.expectComplete()
}
}
}

View file

@ -308,14 +308,16 @@ private[akka] object ActorProcessorFactory {
case Collect(pf, _) interp(fusing.Collect(pf, settings.supervisionDecider))
case Scan(z, f, _) interp(fusing.Scan(z, f, settings.supervisionDecider))
case Fold(z, f, _) interp(fusing.Fold(z, f, settings.supervisionDecider))
case Expand(s, f, _) interp(fusing.Expand(s, f))
case Conflate(s, f, _) interp(fusing.Conflate(s, f, settings.supervisionDecider))
case Buffer(n, s, _) interp(fusing.Buffer(n, s))
case MapConcat(f, _) interp(fusing.MapConcat(f, settings.supervisionDecider))
case MapAsync(p, f, _) interp(fusing.MapAsync(p, f, settings.supervisionDecider))
case MapAsyncUnordered(p, f, _) interp(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider))
case Grouped(n, _) interp(fusing.Grouped(n))
case Log(n, e, l, _) interp(fusing.Log(n, e, l))
case Recover(pf, _) (ActorInterpreter.props(settings, List(fusing.Recover(pf)), materializer, att), ())
case Scan(z, f, _) (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ())
case Expand(s, f, _) (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ())
case Conflate(s, f, _) (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ())
case Buffer(n, s, _) (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ())
case MapConcat(f, _) (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ())
case MapAsync(p, f, _) (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ())
case MapAsyncUnordered(p, f, _) (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ())
case Grouped(n, _) (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ())
case Log(n, e, l, _) (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ())
case GroupBy(f, _) (GroupByProcessorImpl.props(settings, f), ())
case PrefixAndTail(n, _) (PrefixAndTailImpl.props(settings, n), ())
case Split(d, _) (SplitWhereProcessorImpl.props(settings, d), ())

View file

@ -26,6 +26,7 @@ private[stream] object Stages {
val map = name("map")
val filter = name("filter")
val collect = name("collect")
val recover = name("recover")
val mapAsync = name("mapAsync")
val mapAsyncUnordered = name("mapAsyncUnordered")
val grouped = name("grouped")
@ -140,6 +141,11 @@ private[stream] object Stages {
override protected def newInstance: StageModule = this.copy()
}
final case class Recover(pf: PartialFunction[Any, Any], attributes: Attributes = recover) extends StageModule {
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}
final case class MapAsync(parallelism: Int, f: Any Future[Any], attributes: Attributes = mapAsync) extends StageModule {
def withAttributes(attributes: Attributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()

View file

@ -6,6 +6,7 @@ package akka.stream.impl.fusing
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.LogLevels
import akka.stream.Supervision.Resume
import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
@ -87,6 +88,34 @@ private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out], de
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
/**
* INTERNAL API
*/
private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends PushPullStage[T, T] {
import Collect.NotApplied
var recovered: Option[T] = None
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
ctx.push(elem)
}
override def onPull(ctx: Context[T]): SyncDirective =
recovered match {
case Some(value) ctx.pushAndFinish(value)
case None ctx.pull()
}
override def onUpstreamFailure(t: Throwable, ctx: Context[T]): TerminationDirective = {
pf.applyOrElse(t, NotApplied) match {
case NotApplied ctx.fail(t)
case result: T @unchecked
recovered = Some(result)
ctx.absorbTermination()
}
}
}
/**
* INTERNAL API
*/

View file

@ -4,7 +4,7 @@ import akka.stream.ActorAttributes.Dispatcher
import akka.stream.{ ActorMaterializer, MaterializationContext }
private[stream] object IOSettings {
/** Picks default akka.stream.file-io-dispatcher or the OperationAttributes configured one */
/** Picks default akka.stream.file-io-dispatcher or the Attributes configured one */
def fileIoDispatcher(context: MaterializationContext): String = {
val mat = ActorMaterializer.downcast(context.materializer)
context.effectiveAttributes.attributeList.collectFirst { case d: Dispatcher d.dispatcher } getOrElse {

View file

@ -7,6 +7,7 @@ import akka.event.LoggingAdapter
import akka.stream._
import akka.japi.{ Util, Pair }
import akka.japi.function
import akka.stream.impl.Stages.Recover
import akka.stream.scaladsl
import akka.stream.scaladsl.{ Keep, Sink, Source }
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
@ -468,6 +469,23 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*/
def dropWhile(p: function.Predicate[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.dropWhile(p.test))
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.recover(pf))
/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been

View file

@ -326,6 +326,14 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
def map[T](f: function.Function[Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.map(f.apply))
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*/
def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] =
new Source(delegate.recover(pf))
/**
* Transform each input element into a sequence of output elements that is
* then flattened into the output stream.

View file

@ -6,6 +6,8 @@ package akka.stream.scaladsl
import scala.language.higherKinds
import akka.event.LoggingAdapter
import akka.stream.impl.Stages.{ Recover, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream._
import akka.stream.Attributes._
import akka.stream.stage._
@ -15,10 +17,15 @@ import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, Sta
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.util.Collections.EmptyImmutableSeq
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
import org.reactivestreams.Processor
import scala.annotation.implicitNotFound
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.Future
import scala.language.higherKinds
import akka.stream.stage._
import akka.stream.impl.{ Stages, StreamLayout, FlowModule }
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
@ -374,6 +381,22 @@ trait FlowOps[+Out, +Mat] {
private final val _identity = (x: Any) x
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T, Mat] = andThen(Recover(pf.asInstanceOf[PartialFunction[Any, Any]]))
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.