=str #19423 add satefulMapConcat

This commit is contained in:
Alexander Golubev 2016-01-27 00:00:39 -05:00
parent 3d9ea4415f
commit 2a36859578
11 changed files with 347 additions and 80 deletions

View file

@ -99,6 +99,33 @@ public class FlowTest extends StreamTest {
future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
}
@Test
public void mustBeAbleToUseStatefullMaponcat() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5);
final Source<Integer, NotUsed> ints = Source.from(input);
final Flow<Integer, Integer, NotUsed> flow = Flow.of(Integer.class).statefulMapConcat(
new Creator<Function<Integer, Iterable<Integer>>>() {
public Function<Integer, Iterable<Integer>> create() {
int[] state = new int[1];
state[0] = 0;
return new Function<Integer, Iterable<Integer>>() {
public List<Integer> apply(Integer elem) {
List<Integer> list = new ArrayList<>(Collections.nCopies(state[0], elem));
state[0] = elem;
return list;
}
};
}
});
ints.via(flow)
.runFold("", (acc, elem) -> acc + elem, materializer)
.thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender()));
probe.expectMsgEquals("2334445555");
}
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);

View file

@ -303,26 +303,6 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit {
lastEvents() should be(Set(OnNext(3)))
}
"resume when MapConcat throws" in new OneBoundedSetup[Int](Seq(
MapConcat((x: Int) if (x == 0) throw TE else List(x, -x), resumingDecider))) {
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(1)
lastEvents() should be(Set(OnNext(1)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(-1)))
downstream.requestOne()
lastEvents() should be(Set(RequestOne))
upstream.onNext(0) // boom
lastEvents() should be(Set(RequestOne))
upstream.onNext(2)
lastEvents() should be(Set(OnNext(2)))
downstream.requestOne()
lastEvents() should be(Set(OnNext(-2)))
}
"restart when Collect throws" in {
// TODO can't get type inference to work with `pf` inlined
val pf: PartialFunction[Int, Int] =

View file

@ -3,16 +3,18 @@
*/
package akka.stream.scaladsl
import scala.concurrent.duration._
import akka.stream.ActorMaterializerSettings
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ Supervision, ActorAttributes, ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.Utils._
import akka.stream.ActorMaterializer
import akka.stream.testkit._
import scala.util.control.NoStackTrace
class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
"A MapConcat" must {
@ -27,11 +29,7 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.mapConcat(x (1 to x) map (_ x))))
}
"map and concat grouping with slow downstream" in {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
implicit val materializer = ActorMaterializer(settings)
assertAllStagesStopped {
"map and concat grouping with slow downstream" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]
val input = (1 to 20).grouped(5).toList
Source(input).mapConcat(identity).map(x { Thread.sleep(10); x }).runWith(Sink.fromSubscriber(s))
@ -40,6 +38,15 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
for (i 1 to 20) s.expectNext(i)
s.expectComplete()
}
"be able to resume" in assertAllStagesStopped {
val ex = new Exception("TEST") with NoStackTrace
Source(1 to 5).mapConcat(x if (x == 3) throw ex else List(x))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink.probe[Int])
.request(4).expectNext(1, 2, 4, 5)
.expectComplete()
}
}

View file

@ -0,0 +1,85 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorAttributes, Supervision, ActorMaterializerSettings }
import akka.stream.testkit._
import scala.util.control.NoStackTrace
class FlowStatefulMapConcatSpec extends AkkaSpec with ScriptedTest {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
val ex = new Exception("TEST") with NoStackTrace
"A StatefulMapConcat" must {
"work in happy case" in {
val script = Script(
Seq(2) -> Seq(),
Seq(1) -> Seq(1, 1),
Seq(3) -> Seq(3),
Seq(6) -> Seq(6, 6, 6))
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.statefulMapConcat(() {
var prev: Option[Int] = None
x prev match {
case Some(e)
prev = Some(x)
(1 to e) map (_ x)
case None
prev = Some(x)
List.empty[Int]
}
})))
}
"be able to restart" in {
Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() {
var prev: Option[Int] = None
x {
if (x % 3 == 0) throw ex
prev match {
case Some(e)
prev = Some(x)
(1 to e) map (_ x)
case None
prev = Some(x)
List.empty[Int]
}
}
}).withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(TestSink.probe[Int])
.request(2).expectNext(1, 1)
.request(4).expectNext(1, 1, 1, 1)
.expectComplete()
}
"be able to resume" in {
Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() {
var prev: Option[Int] = None
x {
if (x % 3 == 0) throw ex
prev match {
case Some(e)
prev = Some(x)
(1 to e) map (_ x)
case None
prev = Some(x)
List.empty[Int]
}
}
}).withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(TestSink.probe[Int])
.request(2).expectNext(1, 1)
.requestNext(4)
.request(4).expectNext(1, 1, 1, 1)
.expectComplete()
}
}
}

View file

@ -52,7 +52,7 @@ private[stream] object Stages {
val batch = name("batch")
val batchWeighted = name("batchWeighted")
val expand = name("expand")
val mapConcat = name("mapConcat")
val statefulMapConcat = name("statefulMapConcat")
val detacher = name("detacher")
val groupBy = name("groupBy")
val prefixAndTail = name("prefixAndTail")
@ -206,10 +206,6 @@ private[stream] object Stages {
override def create(attr: Attributes): Stage[T, T] = fusing.Buffer(size, overflowStrategy)
}
final case class MapConcat[In, Out](f: In immutable.Iterable[Out], attributes: Attributes = mapConcat) extends SymbolicStage[In, Out] {
override def create(attr: Attributes): Stage[In, Out] = fusing.MapConcat(f, supervision(attr))
}
// FIXME: These are not yet proper stages, therefore they use the deprecated StageModule infrastructure
final case class GroupBy(maxSubstreams: Int, f: Any Any, attributes: Attributes = groupBy) extends StageModule {

View file

@ -21,6 +21,7 @@ import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy
import scala.concurrent.duration.{ FiniteDuration, _ }
import akka.stream.impl.Stages.DefaultAttributes
import akka.NotUsed
/**
* INTERNAL API
@ -121,39 +122,6 @@ private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) ext
}
/**
* INTERNAL API
*/
private[akka] final case class MapConcat[In, Out](f: In immutable.Iterable[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var currentIterator: Iterator[Out] = Iterator.empty
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
currentIterator = f(elem).iterator
if (!currentIterator.hasNext) ctx.pull()
else ctx.push(currentIterator.next())
}
override def onPull(ctx: Context[Out]): SyncDirective =
if (ctx.isFinishing) {
if (currentIterator.hasNext) {
val elem = currentIterator.next()
if (currentIterator.hasNext) ctx.push(elem)
else ctx.pushAndFinish(elem)
} else ctx.finish()
} else {
if (currentIterator.hasNext) ctx.push(currentIterator.next())
else ctx.pull()
}
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
if (currentIterator.hasNext) ctx.absorbTermination()
else ctx.finish()
override def decide(t: Throwable): Supervision.Directive = decider(t)
override def restart(): MapConcat[In, Out] = copy()
}
/**
* INTERNAL API
*/
@ -1169,3 +1137,55 @@ private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Gra
override def toString: String = "RecoverWith"
}
/**
* INTERNAL API
*/
private[stream] final class StatefulMapConcat[In, Out](f: () In immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("StatefulMapConcat.in")
val out = Outlet[Out]("StatefulMapConcat.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
var currentIterator: Iterator[Out] = _
var plainFun = f()
def hasNext = if (currentIterator != null) currentIterator.hasNext else false
setHandlers(in, out, this)
def pushPull(): Unit =
if (hasNext) {
push(out, currentIterator.next())
if (!hasNext && isClosed(in)) completeStage()
} else if (!isClosed(in))
pull(in)
else completeStage()
def onFinish(): Unit = if (!hasNext) completeStage()
override def onPush(): Unit =
try {
currentIterator = plainFun(grab(in)).iterator
pushPull()
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Resume if (!hasBeenPulled(in)) pull(in)
case Supervision.Restart
restartState()
if (!hasBeenPulled(in)) pull(in)
}
}
override def onUpstreamFinish(): Unit = onFinish()
override def onPull(): Unit = pushPull()
private def restartState(): Unit = {
plainFun = f()
currentIterator = null
}
}
override def toString = "StatefulMapConcat"
}

View file

@ -329,6 +329,36 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapConcat { elem Util.immutableSeq(f(elem)) })
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[#mapConcat]].
*
* Make sure that the `Iterable` is immutable or at least not modified after
* being used as an output sequence. Otherwise the stream may fail with
* `ConcurrentModificationException` or other more subtle errors may occur.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):javadsl.Flow[In, T, Mat] =
new Flow(delegate.statefulMapConcat{ () {
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `CompletionStage` and the
@ -772,6 +802,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*

View file

@ -779,6 +779,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
@ -796,7 +797,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
new Source(delegate.recoverWith(pf))
/**
* Transform each input element into an `Iterable of output elements that is
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream.
*
* Make sure that the `Iterable` is immutable or at least not modified after
@ -817,7 +818,38 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapConcat(elem Util.immutableSeq(f.apply(elem))))
new Source(delegate.statefulMapConcat(() elem Util.immutableSeq(f.apply(elem))))
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[#mapConcat]].
*
* Make sure that the `Iterable` is immutable or at least not modified after
* being used as an output sequence. Otherwise the stream may fail with
* `ConcurrentModificationException` or other more subtle errors may occur.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] =
new Source(delegate.statefulMapConcat{ () {
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
/**
* Transform this stream by applying the given function to each of the elements

View file

@ -163,7 +163,38 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.mapConcat { elem Util.immutableSeq(f(elem)) })
new SubFlow(delegate.statefulMapConcat { () elem Util.immutableSeq(f(elem)) })
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[#mapConcat]].
*
* Make sure that the `Iterable` is immutable or at least not modified after
* being used as an output sequence. Otherwise the stream may fail with
* `ConcurrentModificationException` or other more subtle errors may occur.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.statefulMapConcat{ () {
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
/**
* Transform this stream by applying the given function to each of the elements
@ -610,6 +641,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*

View file

@ -159,7 +159,37 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Cancels when''' downstream cancels
*/
def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] =
new SubSource(delegate.mapConcat { elem Util.immutableSeq(f(elem)) })
new SubSource(delegate.statefulMapConcat { () elem Util.immutableSeq(f(elem)) })
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[#mapConcat]].
*
* Make sure that the `Iterable` is immutable or at least not modified after
* being used as an output sequence. Otherwise the stream may fail with
* `ConcurrentModificationException` or other more subtle errors may occur.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*/
def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):SubSource[T, Mat] =
new SubSource(delegate.statefulMapConcat{ () {
val fun = f.create()
elem Util.immutableSeq(fun(elem))
}})
/**
* Transform this stream by applying the given function to each of the elements
@ -606,6 +636,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*

View file

@ -430,6 +430,7 @@ trait FlowOps[+Out, +Mat] {
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
@ -479,7 +480,32 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels
*
*/
def mapConcat[T](f: Out immutable.Iterable[T]): Repr[T] = andThen(MapConcat(f))
def mapConcat[T](f: Out immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f)
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[FlowOps.mapConcat]].
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.mapConcat]]
*/
def statefulMapConcat[T](f: () Out immutable.Iterable[T]): Repr[T] =
via(new StatefulMapConcat(f))
/**
* Transform this stream by applying the given function to each of the elements