diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 37e66b2125..c4f80b049e 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -636,6 +636,16 @@ the second element is required from downstream. **completes** when upstream completes +scanAsync +^^^^ +Just like ``scan`` but receiving a function that results in a ``Future`` to the next value. + +**emits** when the ``Future`` resulting from the function scanning the element resolves to the next value + +**backpressures** when downstream backpressures + +**completes** when upstream completes and the last ``Future`` is resolved + fold ^^^^ Start with current value ``zero`` and then apply the current and next value to the given function, when upstream @@ -647,6 +657,16 @@ complete the current value is emitted downstream. **completes** when upstream completes +foldAsync +^^^^ +Just like ``fold`` but receiving a function that results in a ``Future`` to the next value. + +**emits** when upstream completes and the last ``Future`` is resolved + +**backpressures** when downstream backpressures + +**completes** when upstream completes and the last ``Future`` is resolved + reduce ^^^^^^ Start with first element and then apply the current and next value to the given function, when upstream diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala new file mode 100644 index 0000000000..59ff4ace61 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanAsyncSpec.scala @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2014-2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.pattern +import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision } +import akka.stream.impl.ReactiveStreamsCompliance +import akka.stream.testkit.TestSubscriber.Probe +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl._ + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration._ + +class FlowScanAsyncSpec extends StreamSpec { + + implicit val materializer: ActorMaterializer = ActorMaterializer() + implicit val executionContext = materializer.executionContext + + "A ScanAsync" must { + + val sumScanFlow = Flow[Int].scanAsync(0) { (accumulator, next) ⇒ + Future(accumulator + next) + } + + "work with a empty source" in { + Source.empty[Int] + .via(sumScanFlow) + .runWith(TestSink.probe[Int]) + .request(1) + .expectNext(0) + .expectComplete() + } + + "work with a single source" in { + Source.single(1) + .via(sumScanFlow) + .runWith(TestSink.probe[Int]) + .request(2) + .expectNext(0, 1) + .expectComplete() + } + + "work with a large source" in { + val elements = 1 to 100000 + val expectedSum = elements.sum + val eventualActual: Future[Int] = Source(elements) + .via(sumScanFlow) + .runWith(Sink.last) + whenReady(eventualActual) { actual ⇒ assert(actual === expectedSum) } + } + + "work with slow futures" in { + val delay = 500.milliseconds + val delayedFutureScanFlow = Flow[Int].scanAsync(0) { (accumulator, next) ⇒ + pattern.after(delay, system.scheduler)(Future.successful(accumulator + next)) + } + val elements = 1 :: 1 :: Nil + Source(elements) + .via(delayedFutureScanFlow) + .runWith(TestSink.probe[Int]) + .request(3) + .expectNext(100.milliseconds, 0) + .expectNext(1.second, 1) + .expectNext(1.second, 2) + .expectComplete() + } + + "throw error with a failed source" in { + val expected = Utils.TE("failed source") + Source.failed[Int](expected) + .via(sumScanFlow) + .runWith(TestSink.probe[Int]) + .expectSubscriptionAndError(expected) + } + + "with the restarting decider" should { + "skip error values with a failed scan" in { + val elements = 1 :: -1 :: 1 :: Nil + whenFailedScan(elements, 0, decider = Supervision.restartingDecider) + .expectNext(1, 1) + .expectComplete() + } + + "emit zero with a failed future" in { + val elements = 1 :: -1 :: 1 :: Nil + whenFailedFuture(elements, 0, decider = Supervision.restartingDecider) + .expectNext(1, 1) + .expectComplete() + } + } + + "with the resuming decider" should { + "skip values with a failed scan" in { + val elements = 1 :: -1 :: 1 :: Nil + whenFailedScan(elements, 0, decider = Supervision.resumingDecider) + .expectNext(1, 2) + .expectComplete() + } + + "skip values with a failed future" in { + val elements = 1 :: -1 :: 1 :: Nil + whenFailedFuture(elements, 0, decider = Supervision.resumingDecider) + .expectNext(1, 2) + .expectComplete() + } + } + + "with the stopping decider" should { + "throw error with a failed scan function" in { + val expected = Utils.TE("failed scan function") + val elements = -1 :: Nil + whenFailedScan(elements, 0, expected) + .expectError(expected) + } + + "throw error with a failed future" in { + val expected = Utils.TE("failed future generated from scan function") + val elements = -1 :: Nil + whenFailedFuture(elements, 0, expected) + .expectError(expected) + } + + "throw error with a null element" in { + val expectedMessage = ReactiveStreamsCompliance.ElementMustNotBeNullMsg + val elements = "null" :: Nil + val actual = whenNullElement(elements, "") + .expectError() + assert(actual.getClass === classOf[NullPointerException]) + assert(actual.getMessage === expectedMessage) + } + } + + def whenFailedScan( + elements: immutable.Seq[Int], + zero: Int, + throwable: Throwable = new Exception("non fatal exception"), + decider: Supervision.Decider = Supervision.stoppingDecider): Probe[Int] = { + val failedScanFlow = Flow[Int].scanAsync(zero) { (accumulator: Int, next: Int) ⇒ + if (next >= 0) Future(accumulator + next) + else throw throwable + } + Source(elements) + .via(failedScanFlow) + .withAttributes(ActorAttributes.supervisionStrategy(decider)) + .runWith(TestSink.probe[Int]) + .request(elements.size + 1) + .expectNext(zero) + } + + def whenFailedFuture( + elements: immutable.Seq[Int], + zero: Int, + throwable: Throwable = new Exception("non fatal exception"), + decider: Supervision.Decider = Supervision.stoppingDecider): Probe[Int] = { + val failedFutureScanFlow = Flow[Int].scanAsync(zero) { (accumulator: Int, next: Int) ⇒ + if (next >= 0) Future(accumulator + next) + else Future.failed(throwable) + } + Source(elements) + .via(failedFutureScanFlow) + .withAttributes(ActorAttributes.supervisionStrategy(decider)) + .runWith(TestSink.probe[Int]) + .request(elements.size + 1) + .expectNext(zero) + } + + def whenNullElement( + elements: immutable.Seq[String], + zero: String, + decider: Supervision.Decider = Supervision.stoppingDecider): Probe[String] = { + val nullFutureScanFlow: Flow[String, String, _] = Flow[String].scanAsync(zero) { (_: String, next: String) ⇒ + if (next != "null") Future(next) + else Future(null) + } + Source(elements) + .via(nullFutureScanFlow) + .withAttributes(ActorAttributes.supervisionStrategy(decider)) + .runWith(TestSink.probe[String]) + .request(elements.size + 1) + .expectNext(zero) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 5e3ee846aa..fc85edb97c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -39,6 +39,7 @@ object Stages { val takeWhile = name("takeWhile") val dropWhile = name("dropWhile") val scan = name("scan") + val scanAsync = name("scanAsync") val fold = name("fold") val foldAsync = name("foldAsync") val reduce = name("reduce") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 7c92df8113..e4cac2a4c4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -387,6 +387,114 @@ final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta } } +/** + * INTERNAL API + */ +final case class ScanAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { + + import akka.dispatch.ExecutionContexts + + val in = Inlet[In]("ScanAsync.in") + val out = Outlet[Out]("ScanAsync.out") + override val shape: FlowShape[In, Out] = FlowShape[In, Out](in, out) + + override val initialAttributes: Attributes = Attributes.name("scanAsync") + + override val toString: String = "ScanAsync" + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + self ⇒ + + private var current: Out = zero + private var eventualCurrent: Future[Out] = Future.successful(current) + + private def ec = ExecutionContexts.sameThreadExecutionContext + + private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + + private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler { + override def onPush(): Unit = () + + override def onPull(): Unit = { + push(out, current) + setHandlers(in, out, self) + } + + override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler { + override def onPull(): Unit = { + push(out, current) + completeStage() + } + }) + } + + private def onRestart(t: Throwable): Unit = { + current = zero + } + + private def safePull(): Unit = { + if (!hasBeenPulled(in)) { + tryPull(in) + } + } + + private def pushAndPullOrFinish(update: Out): Unit = { + push(out, update) + if (isClosed(in)) { + completeStage() + } else if (isAvailable(out)) { + safePull() + } + } + + private def doSupervision(t: Throwable): Unit = { + decider(t) match { + case Supervision.Stop ⇒ failStage(t) + case Supervision.Resume ⇒ safePull() + case Supervision.Restart ⇒ + onRestart(t) + safePull() + } + } + + private val futureCB = getAsyncCallback[Try[Out]] { + case Success(next) if next != null ⇒ + current = next + pushAndPullOrFinish(next) + case Success(null) ⇒ doSupervision(ReactiveStreamsCompliance.elementMustNotBeNullException) + case Failure(t) ⇒ doSupervision(t) + }.invoke _ + + setHandlers(in, out, ZeroHandler) + + def onPull(): Unit = safePull() + + def onPush(): Unit = { + try { + eventualCurrent = f(current, grab(in)) + + eventualCurrent.value match { + case Some(result) ⇒ futureCB(result) + case _ ⇒ eventualCurrent.onComplete(futureCB)(ec) + } + } catch { + case NonFatal(ex) ⇒ + decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Restart ⇒ onRestart(ex) + case Supervision.Resume ⇒ () + } + tryPull(in) + } + } + + override def onUpstreamFinish(): Unit = {} + + override val toString: String = s"ScanAsync.Logic(completed=${eventualCurrent.isCompleted})" + } +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 39d5e1ab90..967dde3e3d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -551,6 +551,33 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.scan(zero)(f.apply)) + /** + * Similar to `scan` but with a asynchronous function, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * emitting a `Future` that resolves to the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Resume]] current value starts at the previous + * current value, or zero when it doesn't have one, and the stream will continue. + * + * '''Emits when''' the future returned by f` completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the last future returned by `f` completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.scan]] + */ + def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.scanAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `scan` but only emits its result when the upstream completes, * after which it also completes. Applies the given function `f` towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 23ce09c080..e1687f96d6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1243,6 +1243,32 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.scan(zero)(f.apply)) + /** + * Similar to `scan` but with a asynchronous function, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * emitting a `Future` that resolves to the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Resume]] current value starts at the previous + * current value, or zero when it doesn't have one, and the stream will continue. + * + * '''Emits when''' the future returned by f` completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the last future returned by `f` completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.scan]] + */ + def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = + new Source(delegate.scanAsync(zero) { (out, in) ⇒ f(out, in).toScala }) /** * Similar to `scan` but only emits its result when the upstream completes, * after which it also completes. Applies the given function `f` towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index eea2e9c848..4bd335e4a8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -387,6 +387,33 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = new SubFlow(delegate.scan(zero)(f.apply)) + /** + * Similar to `scan` but with a asynchronous function, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * emitting a `Future` that resolves to the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Resume]] current value starts at the previous + * current value, or zero when it doesn't have one, and the stream will continue. + * + * '''Emits when''' the future returned by f` completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the last future returned by `f` completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.scan]] + */ + def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] = + new SubFlow(delegate.scanAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `scan` but only emits its result when the upstream completes, * after which it also completes. Applies the given function `f` towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index c3f08dffcf..93ee8a40fc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -385,6 +385,33 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = new SubSource(delegate.scan(zero)(f.apply)) + /** + * Similar to `scan` but with a asynchronous function, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * emitting a `Future` that resolves to the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Resume]] current value starts at the previous + * current value, or zero when it doesn't have one, and the stream will continue. + * + * '''Emits when''' the future returned by f` completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the last future returned by `f` completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.scan]] + */ + def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] = + new SubSource(delegate.scanAsync(zero) { (out, in) ⇒ f(out, in).toScala }) + /** * Similar to `scan` but only emits its result when the upstream completes, * after which it also completes. Applies the given function `f` towards its current and next value, diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 93c2d40088..8bca92a0b2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -761,9 +761,37 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.scanAsync]] */ def scan[T](zero: T)(f: (T, Out) ⇒ T): Repr[T] = via(Scan(zero, f)) + /** + * Similar to `scan` but with a asynchronous function, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * emitting a `Future` that resolves to the next current value. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Restart]] current value starts at `zero` again + * the stream will continue. + * + * If the function `f` throws an exception and the supervision decision is + * [[akka.stream.Supervision.Resume]] current value starts at the previous + * current value, or zero when it doesn't have one, and the stream will continue. + * + * '''Emits when''' the future returned by f` completes + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and the last future returned by `f` completes + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.scan]] + */ + def scanAsync[T](zero: T)(f: (T, Out) ⇒ Future[T]): Repr[T] = via(ScanAsync(zero, f)) + /** * Similar to `scan` but only emits its result when the upstream completes, * after which it also completes. Applies the given function towards its current and next value, diff --git a/project/MiMa.scala b/project/MiMa.scala index 31e85dd9a8..cf19166d07 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -971,20 +971,23 @@ object MiMa extends AutoPlugin { // #21290 new zipWithIndex flow op ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipWithIndex"), + // #21541 new ScanAsync flow op + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.scanAsync"), + // Remove useUntrustedMode which is an internal API and not used anywhere anymore ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"), - + // Use OptionVal in remote Send envelope FilterAnyProblemStartingWith("akka.remote.EndpointManager"), FilterAnyProblemStartingWith("akka.remote.Remoting"), FilterAnyProblemStartingWith("akka.remote.RemoteTransport"), FilterAnyProblemStartingWith("akka.remote.InboundMessageDispatcher"), FilterAnyProblemStartingWith("akka.remote.DefaultMessageDispatcher"), - FilterAnyProblemStartingWith("akka.remote.transport"), + FilterAnyProblemStartingWith("akka.remote.transport"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine"), - + // #20644 long uids ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.hasUid2"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.getUid2"), @@ -992,7 +995,6 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages#UniqueAddressOrBuilder.getUid2"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteWatcher.receiveHeartbeatRsp"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg") - ) ) }