Add scanAsync to Akka Streams, similar to scan but taking a Future (#21553)

* Add comprehensive tests
* Add documentation

* Damn comma after rebase

* Add documentation for foldAsync and scanAsync

* Rename aggreator and aggreating to current

* Remove out availability check

* Revert removing out and some refactors

* Formatting documentation

* Use after instead of Promise in test

* Use package reference to after and some refactoring
This commit is contained in:
Mateus Dubiela Oliveira 2016-10-17 12:43:11 -02:00 committed by Patrik Nordwall
parent 60bea26171
commit c3abde60d5
10 changed files with 458 additions and 4 deletions

View file

@ -636,6 +636,16 @@ the second element is required from downstream.
**completes** when upstream completes **completes** when upstream completes
scanAsync
^^^^
Just like ``scan`` but receiving a function that results in a ``Future`` to the next value.
**emits** when the ``Future`` resulting from the function scanning the element resolves to the next value
**backpressures** when downstream backpressures
**completes** when upstream completes and the last ``Future`` is resolved
fold fold
^^^^ ^^^^
Start with current value ``zero`` and then apply the current and next value to the given function, when upstream Start with current value ``zero`` and then apply the current and next value to the given function, when upstream
@ -647,6 +657,16 @@ complete the current value is emitted downstream.
**completes** when upstream completes **completes** when upstream completes
foldAsync
^^^^
Just like ``fold`` but receiving a function that results in a ``Future`` to the next value.
**emits** when upstream completes and the last ``Future`` is resolved
**backpressures** when downstream backpressures
**completes** when upstream completes and the last ``Future`` is resolved
reduce reduce
^^^^^^ ^^^^^^
Start with first element and then apply the current and next value to the given function, when upstream Start with first element and then apply the current and next value to the given function, when upstream

View file

@ -0,0 +1,188 @@
/**
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.pattern
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
class FlowScanAsyncSpec extends StreamSpec {
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext = materializer.executionContext
"A ScanAsync" must {
val sumScanFlow = Flow[Int].scanAsync(0) { (accumulator, next)
Future(accumulator + next)
}
"work with a empty source" in {
Source.empty[Int]
.via(sumScanFlow)
.runWith(TestSink.probe[Int])
.request(1)
.expectNext(0)
.expectComplete()
}
"work with a single source" in {
Source.single(1)
.via(sumScanFlow)
.runWith(TestSink.probe[Int])
.request(2)
.expectNext(0, 1)
.expectComplete()
}
"work with a large source" in {
val elements = 1 to 100000
val expectedSum = elements.sum
val eventualActual: Future[Int] = Source(elements)
.via(sumScanFlow)
.runWith(Sink.last)
whenReady(eventualActual) { actual assert(actual === expectedSum) }
}
"work with slow futures" in {
val delay = 500.milliseconds
val delayedFutureScanFlow = Flow[Int].scanAsync(0) { (accumulator, next)
pattern.after(delay, system.scheduler)(Future.successful(accumulator + next))
}
val elements = 1 :: 1 :: Nil
Source(elements)
.via(delayedFutureScanFlow)
.runWith(TestSink.probe[Int])
.request(3)
.expectNext(100.milliseconds, 0)
.expectNext(1.second, 1)
.expectNext(1.second, 2)
.expectComplete()
}
"throw error with a failed source" in {
val expected = Utils.TE("failed source")
Source.failed[Int](expected)
.via(sumScanFlow)
.runWith(TestSink.probe[Int])
.expectSubscriptionAndError(expected)
}
"with the restarting decider" should {
"skip error values with a failed scan" in {
val elements = 1 :: -1 :: 1 :: Nil
whenFailedScan(elements, 0, decider = Supervision.restartingDecider)
.expectNext(1, 1)
.expectComplete()
}
"emit zero with a failed future" in {
val elements = 1 :: -1 :: 1 :: Nil
whenFailedFuture(elements, 0, decider = Supervision.restartingDecider)
.expectNext(1, 1)
.expectComplete()
}
}
"with the resuming decider" should {
"skip values with a failed scan" in {
val elements = 1 :: -1 :: 1 :: Nil
whenFailedScan(elements, 0, decider = Supervision.resumingDecider)
.expectNext(1, 2)
.expectComplete()
}
"skip values with a failed future" in {
val elements = 1 :: -1 :: 1 :: Nil
whenFailedFuture(elements, 0, decider = Supervision.resumingDecider)
.expectNext(1, 2)
.expectComplete()
}
}
"with the stopping decider" should {
"throw error with a failed scan function" in {
val expected = Utils.TE("failed scan function")
val elements = -1 :: Nil
whenFailedScan(elements, 0, expected)
.expectError(expected)
}
"throw error with a failed future" in {
val expected = Utils.TE("failed future generated from scan function")
val elements = -1 :: Nil
whenFailedFuture(elements, 0, expected)
.expectError(expected)
}
"throw error with a null element" in {
val expectedMessage = ReactiveStreamsCompliance.ElementMustNotBeNullMsg
val elements = "null" :: Nil
val actual = whenNullElement(elements, "")
.expectError()
assert(actual.getClass === classOf[NullPointerException])
assert(actual.getMessage === expectedMessage)
}
}
def whenFailedScan(
elements: immutable.Seq[Int],
zero: Int,
throwable: Throwable = new Exception("non fatal exception"),
decider: Supervision.Decider = Supervision.stoppingDecider): Probe[Int] = {
val failedScanFlow = Flow[Int].scanAsync(zero) { (accumulator: Int, next: Int)
if (next >= 0) Future(accumulator + next)
else throw throwable
}
Source(elements)
.via(failedScanFlow)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(TestSink.probe[Int])
.request(elements.size + 1)
.expectNext(zero)
}
def whenFailedFuture(
elements: immutable.Seq[Int],
zero: Int,
throwable: Throwable = new Exception("non fatal exception"),
decider: Supervision.Decider = Supervision.stoppingDecider): Probe[Int] = {
val failedFutureScanFlow = Flow[Int].scanAsync(zero) { (accumulator: Int, next: Int)
if (next >= 0) Future(accumulator + next)
else Future.failed(throwable)
}
Source(elements)
.via(failedFutureScanFlow)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(TestSink.probe[Int])
.request(elements.size + 1)
.expectNext(zero)
}
def whenNullElement(
elements: immutable.Seq[String],
zero: String,
decider: Supervision.Decider = Supervision.stoppingDecider): Probe[String] = {
val nullFutureScanFlow: Flow[String, String, _] = Flow[String].scanAsync(zero) { (_: String, next: String)
if (next != "null") Future(next)
else Future(null)
}
Source(elements)
.via(nullFutureScanFlow)
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(TestSink.probe[String])
.request(elements.size + 1)
.expectNext(zero)
}
}
}

View file

@ -39,6 +39,7 @@ object Stages {
val takeWhile = name("takeWhile") val takeWhile = name("takeWhile")
val dropWhile = name("dropWhile") val dropWhile = name("dropWhile")
val scan = name("scan") val scan = name("scan")
val scanAsync = name("scanAsync")
val fold = name("fold") val fold = name("fold")
val foldAsync = name("foldAsync") val foldAsync = name("foldAsync")
val reduce = name("reduce") val reduce = name("reduce")

View file

@ -387,6 +387,114 @@ final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta
} }
} }
/**
* INTERNAL API
*/
final case class ScanAsync[In, Out](zero: Out, f: (Out, In) Future[Out]) extends GraphStage[FlowShape[In, Out]] {
import akka.dispatch.ExecutionContexts
val in = Inlet[In]("ScanAsync.in")
val out = Outlet[Out]("ScanAsync.out")
override val shape: FlowShape[In, Out] = FlowShape[In, Out](in, out)
override val initialAttributes: Attributes = Attributes.name("scanAsync")
override val toString: String = "ScanAsync"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
self
private var current: Out = zero
private var eventualCurrent: Future[Out] = Future.successful(current)
private def ec = ExecutionContexts.sameThreadExecutionContext
private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler {
override def onPush(): Unit = ()
override def onPull(): Unit = {
push(out, current)
setHandlers(in, out, self)
}
override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, current)
completeStage()
}
})
}
private def onRestart(t: Throwable): Unit = {
current = zero
}
private def safePull(): Unit = {
if (!hasBeenPulled(in)) {
tryPull(in)
}
}
private def pushAndPullOrFinish(update: Out): Unit = {
push(out, update)
if (isClosed(in)) {
completeStage()
} else if (isAvailable(out)) {
safePull()
}
}
private def doSupervision(t: Throwable): Unit = {
decider(t) match {
case Supervision.Stop failStage(t)
case Supervision.Resume safePull()
case Supervision.Restart
onRestart(t)
safePull()
}
}
private val futureCB = getAsyncCallback[Try[Out]] {
case Success(next) if next != null
current = next
pushAndPullOrFinish(next)
case Success(null) doSupervision(ReactiveStreamsCompliance.elementMustNotBeNullException)
case Failure(t) doSupervision(t)
}.invoke _
setHandlers(in, out, ZeroHandler)
def onPull(): Unit = safePull()
def onPush(): Unit = {
try {
eventualCurrent = f(current, grab(in))
eventualCurrent.value match {
case Some(result) futureCB(result)
case _ eventualCurrent.onComplete(futureCB)(ec)
}
} catch {
case NonFatal(ex)
decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart onRestart(ex)
case Supervision.Resume ()
}
tryPull(in)
}
}
override def onUpstreamFinish(): Unit = {}
override val toString: String = s"ScanAsync.Logic(completed=${eventualCurrent.isCompleted})"
}
}
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -551,6 +551,33 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.scan(zero)(f.apply)) new Flow(delegate.scan(zero)(f.apply))
/**
* Similar to `scan` but with a asynchronous function,
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting a `Future` that resolves to the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Resume]] current value starts at the previous
* current value, or zero when it doesn't have one, and the stream will continue.
*
* '''Emits when''' the future returned by f` completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the last future returned by `f` completes
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.scanAsync(zero) { (out, in) f(out, in).toScala })
/** /**
* Similar to `scan` but only emits its result when the upstream completes, * Similar to `scan` but only emits its result when the upstream completes,
* after which it also completes. Applies the given function `f` towards its current and next value, * after which it also completes. Applies the given function `f` towards its current and next value,

View file

@ -1243,6 +1243,32 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = def scan[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.scan(zero)(f.apply)) new Source(delegate.scan(zero)(f.apply))
/**
* Similar to `scan` but with a asynchronous function,
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting a `Future` that resolves to the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Resume]] current value starts at the previous
* current value, or zero when it doesn't have one, and the stream will continue.
*
* '''Emits when''' the future returned by f` completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the last future returned by `f` completes
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
new Source(delegate.scanAsync(zero) { (out, in) f(out, in).toScala })
/** /**
* Similar to `scan` but only emits its result when the upstream completes, * Similar to `scan` but only emits its result when the upstream completes,
* after which it also completes. Applies the given function `f` towards its current and next value, * after which it also completes. Applies the given function `f` towards its current and next value,

View file

@ -387,6 +387,33 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] = def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubFlow[In, T, Mat] =
new SubFlow(delegate.scan(zero)(f.apply)) new SubFlow(delegate.scan(zero)(f.apply))
/**
* Similar to `scan` but with a asynchronous function,
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting a `Future` that resolves to the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Resume]] current value starts at the previous
* current value, or zero when it doesn't have one, and the stream will continue.
*
* '''Emits when''' the future returned by f` completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the last future returned by `f` completes
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.scanAsync(zero) { (out, in) f(out, in).toScala })
/** /**
* Similar to `scan` but only emits its result when the upstream completes, * Similar to `scan` but only emits its result when the upstream completes,
* after which it also completes. Applies the given function `f` towards its current and next value, * after which it also completes. Applies the given function `f` towards its current and next value,

View file

@ -385,6 +385,33 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] = def scan[T](zero: T)(f: function.Function2[T, Out, T]): SubSource[T, Mat] =
new SubSource(delegate.scan(zero)(f.apply)) new SubSource(delegate.scan(zero)(f.apply))
/**
* Similar to `scan` but with a asynchronous function,
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting a `Future` that resolves to the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Resume]] current value starts at the previous
* current value, or zero when it doesn't have one, and the stream will continue.
*
* '''Emits when''' the future returned by f` completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the last future returned by `f` completes
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scan]]
*/
def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.scanAsync(zero) { (out, in) f(out, in).toScala })
/** /**
* Similar to `scan` but only emits its result when the upstream completes, * Similar to `scan` but only emits its result when the upstream completes,
* after which it also completes. Applies the given function `f` towards its current and next value, * after which it also completes. Applies the given function `f` towards its current and next value,

View file

@ -761,9 +761,37 @@ trait FlowOps[+Out, +Mat] {
* '''Completes when''' upstream completes * '''Completes when''' upstream completes
* *
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scanAsync]]
*/ */
def scan[T](zero: T)(f: (T, Out) T): Repr[T] = via(Scan(zero, f)) def scan[T](zero: T)(f: (T, Out) T): Repr[T] = via(Scan(zero, f))
/**
* Similar to `scan` but with a asynchronous function,
* emits its current value which starts at `zero` and then
* applies the current and next value to the given function `f`,
* emitting a `Future` that resolves to the next current value.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Restart]] current value starts at `zero` again
* the stream will continue.
*
* If the function `f` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Resume]] current value starts at the previous
* current value, or zero when it doesn't have one, and the stream will continue.
*
* '''Emits when''' the future returned by f` completes
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the last future returned by `f` completes
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.scan]]
*/
def scanAsync[T](zero: T)(f: (T, Out) Future[T]): Repr[T] = via(ScanAsync(zero, f))
/** /**
* Similar to `scan` but only emits its result when the upstream completes, * Similar to `scan` but only emits its result when the upstream completes,
* after which it also completes. Applies the given function towards its current and next value, * after which it also completes. Applies the given function towards its current and next value,

View file

@ -971,20 +971,23 @@ object MiMa extends AutoPlugin {
// #21290 new zipWithIndex flow op // #21290 new zipWithIndex flow op
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipWithIndex"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.zipWithIndex"),
// #21541 new ScanAsync flow op
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.scanAsync"),
// Remove useUntrustedMode which is an internal API and not used anywhere anymore // Remove useUntrustedMode which is an internal API and not used anywhere anymore
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.Remoting.useUntrustedMode"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteTransport.useUntrustedMode"),
// Use OptionVal in remote Send envelope // Use OptionVal in remote Send envelope
FilterAnyProblemStartingWith("akka.remote.EndpointManager"), FilterAnyProblemStartingWith("akka.remote.EndpointManager"),
FilterAnyProblemStartingWith("akka.remote.Remoting"), FilterAnyProblemStartingWith("akka.remote.Remoting"),
FilterAnyProblemStartingWith("akka.remote.RemoteTransport"), FilterAnyProblemStartingWith("akka.remote.RemoteTransport"),
FilterAnyProblemStartingWith("akka.remote.InboundMessageDispatcher"), FilterAnyProblemStartingWith("akka.remote.InboundMessageDispatcher"),
FilterAnyProblemStartingWith("akka.remote.DefaultMessageDispatcher"), FilterAnyProblemStartingWith("akka.remote.DefaultMessageDispatcher"),
FilterAnyProblemStartingWith("akka.remote.transport"), FilterAnyProblemStartingWith("akka.remote.transport"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine"),
// #20644 long uids // #20644 long uids
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.hasUid2"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.hasUid2"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.getUid2"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.protobuf.msg.ClusterMessages#UniqueAddressOrBuilder.getUid2"),
@ -992,7 +995,6 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages#UniqueAddressOrBuilder.getUid2"), ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ddata.protobuf.msg.ReplicatorMessages#UniqueAddressOrBuilder.getUid2"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteWatcher.receiveHeartbeatRsp"), ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.RemoteWatcher.receiveHeartbeatRsp"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg") ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.RemoteWatcher.selfHeartbeatRspMsg")
) )
) )
} }