diff --git a/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java b/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java index 695c9f871a..0805cfe6e5 100644 --- a/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java @@ -109,7 +109,7 @@ public class RateTransformationDocTest { public void expandShouldRepeatLast() throws Exception { //#expand-last final Flow lastFlow = Flow.of(Double.class) - .expand(d -> d, s -> new Pair<>(s, s)); + .expand(in -> Stream.iterate(in, i -> i).iterator()); //#expand-last final Pair, Future>> probeFut = TestSource. probe(system) @@ -132,15 +132,11 @@ public class RateTransformationDocTest { @SuppressWarnings("unused") //#expand-drift final Flow, NotUsed> driftFlow = Flow.of(Double.class) - .expand(d -> new Pair(d, 0), t -> { - return new Pair<>(t, new Pair<>(t.first(), t.second() + 1)); - }); + .expand(d -> Stream.iterate(0, i -> i + 1).map(i -> new Pair<>(d, i)).iterator()); //#expand-drift final TestLatch latch = new TestLatch(2, system); final Flow, NotUsed> realDriftFlow = Flow.of(Double.class) - .expand(d -> { latch.countDown(); return new Pair(d, 0); }, t -> { - return new Pair<>(t, new Pair<>(t.first(), t.second() + 1)); - }); + .expand(d -> { latch.countDown(); return Stream.iterate(0, i -> i + 1).map(i -> new Pair<>(d, i)).iterator(); }); final Pair, TestSubscriber.Probe>> pubSub = TestSource. probe(system) .via(realDriftFlow) diff --git a/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala index 6b428a262a..7bf3de9ff2 100644 --- a/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/RateTransformationDocSpec.scala @@ -63,7 +63,7 @@ class RateTransformationDocSpec extends AkkaSpec { "expand should repeat last" in { //#expand-last val lastFlow = Flow[Double] - .expand(identity)(s => (s, s)) + .expand(Iterator.continually(_)) //#expand-last val (probe, fut) = TestSource.probe[Double] @@ -81,15 +81,11 @@ class RateTransformationDocSpec extends AkkaSpec { "expand should track drift" in { //#expand-drift val driftFlow = Flow[Double] - .expand((_, 0)) { - case (lastElement, drift) => ((lastElement, drift), (lastElement, drift + 1)) - } + .expand(i => Iterator.from(0).map(i -> _)) //#expand-drift val latch = TestLatch(2) val realDriftFlow = Flow[Double] - .expand(d => { latch.countDown(); (d, 0) }) { - case (lastElement, drift) => ((lastElement, drift), (lastElement, drift + 1)) - } + .expand(d => { latch.countDown(); Iterator.from(0).map(d -> _) }) val (pub, sub) = TestSource.probe[Double] .via(realDriftFlow) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 18a013a1be..56b08a685b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -30,6 +30,7 @@ import scala.concurrent.duration.FiniteDuration; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static org.junit.Assert.*; @@ -537,17 +538,7 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseExpand() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); - final Flow flow = Flow.of(String.class).expand(new Function() { - @Override - public String apply(String in) throws Exception { - return in; - } - }, new Function>() { - @Override - public Pair apply(String in) throws Exception { - return new Pair(in, in); - } - }); + final Flow flow = Flow.of(String.class).expand(in -> Stream.iterate(in, i -> i).iterator()); final Sink> sink = Sink.head(); Future future = Source.from(input).via(flow).runWith(sink, materializer); String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index a46e32f65f..85f038dfb8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -31,6 +31,7 @@ import scala.util.Try; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Stream; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.TestPublisher.ManualProbe; @@ -439,17 +440,7 @@ public class SourceTest extends StreamTest { public void mustBeAbleToUseExpand() throws Exception { final JavaTestKit probe = new JavaTestKit(system); final List input = Arrays.asList("A", "B", "C"); - Future future = Source.from(input).expand(new Function() { - @Override - public String apply(String in) throws Exception { - return in; - } - }, new Function>() { - @Override - public Pair apply(String in) throws Exception { - return new Pair(in, in); - } - }).runWith(Sink.head(), materializer); + Future future = Source.from(input).expand(in -> Stream.iterate(in, i -> i).iterator()).runWith(Sink.head(), materializer); String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals("A", result); } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index d408e5dc96..cc2f396a21 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -298,14 +298,18 @@ trait GraphInterpreterSpecKit extends AkkaSpec { .init() } - abstract class OneBoundedSetup[T](ops: Array[GraphStageWithMaterializedValue[Shape, Any]]) extends Builder { + implicit class ToGraphStage[I, O](stage: Stage[I, O]) { + def toGS = + new PushPullGraphStage[Any, Any, Any]( + (_) ⇒ stage.asInstanceOf[Stage[Any, Any]], + Attributes.none) + } - def this(ops: Iterable[Stage[_, _]]) = { - this(ops.map { op ⇒ - new PushPullGraphStage[Any, Any, Any]( - (_) ⇒ op.asInstanceOf[Stage[Any, Any]], - Attributes.none) - }.toArray.asInstanceOf[Array[GraphStageWithMaterializedValue[Shape, Any]]]) + abstract class OneBoundedSetup[T](_ops: GraphStageWithMaterializedValue[Shape, Any]*) extends Builder { + val ops = _ops.toArray + + def this(op: Seq[Stage[_, _]], dummy: Int = 42) = { + this(op.map(_.toGS): _*) } val upstream = new UpstreamOneBoundedProbe[T] @@ -339,7 +343,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { outOwners(0) = Boundary while (i < ops.length) { - val stage = ops(i).asInstanceOf[PushPullGraphStage[_, _, _]] + val stage = ops(i).asInstanceOf[GraphStageWithMaterializedValue[FlowShape[_, _], _]] ins(i) = stage.shape.in inOwners(i) = i outs(i + 1) = stage.shape.out diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index fe1c3cd34b..3d89afe1a2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -273,9 +273,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(Cancel)) } - "implement expand" in new OneBoundedSetup[Int](Seq(Expand( - (in: Int) ⇒ in, - (agg: Int) ⇒ (agg, agg)))) { + "implement expand" in new OneBoundedSetup[Int](new Expand(Iterator.continually(_: Int))) { lastEvents() should be(Set(RequestOne)) @@ -339,13 +337,9 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { } - "work with expand-expand" in new OneBoundedSetup[Int](Seq( - Expand( - (in: Int) ⇒ in, - (agg: Int) ⇒ (agg, agg + 1)), - Expand( - (in: Int) ⇒ in, - (agg: Int) ⇒ (agg, agg + 1)))) { + "work with expand-expand" in new OneBoundedSetup[Int]( + new Expand(Iterator.from), + new Expand(Iterator.from)) { lastEvents() should be(Set(RequestOne)) @@ -376,14 +370,12 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnComplete, OnNext(12))) } - "implement conflate-expand" in new OneBoundedSetup[Int](Seq( + "implement conflate-expand" in new OneBoundedSetup[Int]( Conflate( (in: Int) ⇒ in, (agg: Int, x: Int) ⇒ agg + x, - stoppingDecider), - Expand( - (in: Int) ⇒ in, - (agg: Int) ⇒ (agg, agg)))) { + stoppingDecider).toGS, + new Expand(Iterator.continually(_: Int))) { lastEvents() should be(Set(RequestOne)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala index 62012eb0ee..c273912a3c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterStressSpec.scala @@ -18,13 +18,9 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { val f = (x: Int) ⇒ x + 1 - val map: GraphStageWithMaterializedValue[Shape, Any] = - new PushPullGraphStage[Int, Int, NotUsed]((_) ⇒ Map(f, stoppingDecider), Attributes.none) - .asInstanceOf[GraphStageWithMaterializedValue[Shape, Any]] - "Interpreter" must { - "work with a massive chain of maps" in new OneBoundedSetup[Int](Array.fill(chainLength)(map).asInstanceOf[Array[GraphStageWithMaterializedValue[Shape, Any]]]) { + "work with a massive chain of maps" in new OneBoundedSetup[Int](Array.fill(chainLength)(Map(f, stoppingDecider))) { lastEvents() should be(Set.empty) val tstamp = System.nanoTime() @@ -46,7 +42,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") } - "work with a massive chain of maps with early complete" in new OneBoundedSetup[Int](Iterable.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider)) ++ + "work with a massive chain of maps with early complete" in new OneBoundedSetup[Int](Vector.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider)) ++ Seq(Take(repetition / 2)) ++ Seq.fill(halfLength)(Map((x: Int) ⇒ x + 1, stoppingDecider))) { @@ -74,7 +70,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { info(s"Chain finished in $time seconds ${(chainLength * repetition) / (time * 1000 * 1000)} million maps/s") } - "work with a massive chain of takes" in new OneBoundedSetup[Int](Iterable.fill(chainLength)(Take(1))) { + "work with a massive chain of takes" in new OneBoundedSetup[Int](Vector.fill(chainLength)(Take(1))) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -85,7 +81,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { } - "work with a massive chain of drops" in new OneBoundedSetup[Int](Iterable.fill(chainLength / 1000)(Drop(1))) { + "work with a massive chain of drops" in new OneBoundedSetup[Int](Vector.fill(chainLength / 1000)(Drop(1))) { lastEvents() should be(Set.empty) downstream.requestOne() @@ -103,7 +99,7 @@ class InterpreterStressSpec extends AkkaSpec with GraphInterpreterSpecKit { } - "work with a massive chain of conflates by overflowing to the heap" in new OneBoundedSetup[Int](Iterable.fill(100000)(Conflate( + "work with a massive chain of conflates by overflowing to the heap" in new OneBoundedSetup[Int](Vector.fill(100000)(Conflate( (in: Int) ⇒ in, (agg: Int, in: Int) ⇒ agg + in, Supervision.stoppingDecider))) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 9a5ffeb5c7..35501d3eac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -451,9 +451,8 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(Cancel)) } - "fail when Expand `seed` throws" in new OneBoundedSetup[Int](Seq(Expand( - (in: Int) ⇒ if (in == 2) throw TE else in, - (agg: Int) ⇒ (agg, -math.abs(agg))))) { + "fail when Expand `seed` throws" in new OneBoundedSetup[Int]( + new Expand((in: Int) ⇒ if (in == 2) throw TE else Iterator(in) ++ Iterator.continually(-math.abs(in)))) { lastEvents() should be(Set(RequestOne)) @@ -473,9 +472,8 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnError(TE), Cancel)) } - "fail when Expand `extrapolate` throws" in new OneBoundedSetup[Int](Seq(Expand( - (in: Int) ⇒ in, - (agg: Int) ⇒ if (agg == 2) throw TE else (agg, -math.abs(agg))))) { + "fail when Expand `extrapolate` throws" in new OneBoundedSetup[Int]( + new Expand((in: Int) ⇒ if (in == 2) Iterator.continually(throw TE) else Iterator(in) ++ Iterator.continually(-math.abs(in)))) { lastEvents() should be(Set(RequestOne)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index 2dbab7fb8b..111ade99b0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -6,10 +6,10 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom - import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } - import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSource +import akka.stream.testkit.scaladsl.TestSink class FlowExpandSpec extends AkkaSpec { @@ -28,7 +28,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = TestSubscriber.probe[Int]() // Simply repeat the last element as an extrapolation step - Source.fromPublisher(publisher).expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).expand(Iterator.continually(_)).to(Sink.fromSubscriber(subscriber)).run() for (i ← 1 to 100) { // Order is important here: If the request comes first it will be extrapolated! @@ -44,7 +44,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = TestSubscriber.probe[Int]() // Simply repeat the last element as an extrapolation step - Source.fromPublisher(publisher).expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).expand(Iterator.continually(_)).to(Sink.fromSubscriber(subscriber)).run() publisher.sendNext(42) @@ -66,7 +66,7 @@ class FlowExpandSpec extends AkkaSpec { val subscriber = TestSubscriber.probe[Int]() // Simply repeat the last element as an extrapolation step - Source.fromPublisher(publisher).expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).expand(Iterator.continually(_)).to(Sink.fromSubscriber(subscriber)).run() publisher.sendNext(1) subscriber.requestNext(1) @@ -84,7 +84,7 @@ class FlowExpandSpec extends AkkaSpec { "work on a variable rate chain" in { val future = Source(1 to 100) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } - .expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)) + .expand(Iterator.continually(_)) .runFold(Set.empty[Int])(_ + _) Await.result(future, 10.seconds) should contain theSameElementsAs (1 to 100).toSet @@ -94,7 +94,7 @@ class FlowExpandSpec extends AkkaSpec { val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.probe[Int]() - Source.fromPublisher(publisher).expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)).to(Sink.fromSubscriber(subscriber)).run() + Source.fromPublisher(publisher).expand(Iterator.continually(_)).to(Sink.fromSubscriber(subscriber)).run() publisher.sendNext(1) subscriber.requestNext(1) @@ -125,6 +125,26 @@ class FlowExpandSpec extends AkkaSpec { publisher.expectRequest() } + + "work properly with finite extrapolations" in { + val (source, sink) = + TestSource.probe[Int] + .expand(i ⇒ Iterator.from(0).map(i -> _).take(3)) + .toMat(TestSink.probe)(Keep.both) + .run() + source + .sendNext(1) + sink + .request(5) + .expectNext(1 -> 0, 1 -> 1, 1 -> 2) + .expectNoMsg(300.millis) + source + .sendNext(2) + .sendComplete() + sink + .expectNext(2 -> 0) + .expectComplete() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 7a2b59fc2b..94592b6f67 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -204,10 +204,6 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[In, Out] = fusing.Conflate(seed, aggregate, supervision(attr)) } - final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed), attributes: Attributes = expand) extends SymbolicStage[In, Out] { - override def create(attr: Attributes): Stage[In, Out] = fusing.Expand(seed, extrapolate) - } - final case class MapConcat[In, Out](f: In ⇒ immutable.Iterable[Out], attributes: Attributes = mapConcat) extends SymbolicStage[In, Out] { override def create(attr: Attributes): Stage[In, Out] = fusing.MapConcat(f, supervision(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 02424ff379..7dd91da4ce 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -20,6 +20,7 @@ import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy import scala.concurrent.duration.{ FiniteDuration, _ } +import akka.stream.impl.Stages.DefaultAttributes /** * INTERNAL API @@ -480,47 +481,54 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O /** * INTERNAL API */ -private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedStage[In, Out] { - private var s: Seed = _ - private var started: Boolean = false - private var expanded: Boolean = false +private[akka] final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] { + private val in = Inlet[In]("expand.in") + private val out = Outlet[Out]("expand.out") - override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = { - s = seed(elem) - started = true - expanded = false - if (ctx.isHoldingDownstream) { - val (emit, newS) = extrapolate(s) - s = newS - expanded = true - ctx.pushAndPull(emit) - } else ctx.holdUpstream() + override def initialAttributes = DefaultAttributes.expand + override val shape = FlowShape(in, out) + + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + private var iterator: Iterator[Out] = Iterator.empty + private var expanded = false + + override def preStart(): Unit = pull(in) + + setHandler(in, new InHandler { + override def onPush(): Unit = { + iterator = extrapolate(grab(in)) + if (iterator.hasNext) { + if (isAvailable(out)) { + expanded = true + pull(in) + push(out, iterator.next()) + } else expanded = false + } else pull(in) + } + override def onUpstreamFinish(): Unit = { + if (iterator.hasNext && !expanded) () // need to wait + else completeStage() + } + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (iterator.hasNext) { + if (expanded == false) { + expanded = true + if (isClosed(in)) { + push(out, iterator.next()) + completeStage() + } else { + // expand needs to pull first to be “fair” when upstream is not actually slow + pull(in) + push(out, iterator.next()) + } + } else push(out, iterator.next()) + } + } + }) } - - override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = { - if (ctx.isFinishing) { - if (!started) ctx.finish() - else ctx.pushAndFinish(extrapolate(s)._1) - } else if (!started) ctx.holdDownstream() - else { - val (emit, newS) = extrapolate(s) - s = newS - expanded = true - if (ctx.isHoldingUpstream) ctx.pushAndPull(emit) - else ctx.push(emit) - } - - } - - override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective = { - if (expanded) ctx.finish() - else ctx.absorbTermination() - } - - override def decide(t: Throwable): Supervision.Directive = Supervision.Stop - - override def restart(): Expand[In, Out, Seed] = - throw new UnsupportedOperationException("Expand doesn't support restart") } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 3434f41a8d..ca1e61324b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -824,7 +824,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Emits when''' downstream stops backpressuring * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures or iterator runs emtpy * * '''Completes when''' upstream completes * @@ -834,11 +834,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): javadsl.Flow[In, U, Mat] = - new Flow(delegate.expand(seed(_))(s ⇒ { - val p = extrapolate(s) - (p.first, p.second) - })) + def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): javadsl.Flow[In, U, Mat] = + new Flow(delegate.expand(in ⇒ extrapolate(in).asScala)) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. @@ -1315,7 +1312,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def zipMat[T, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = this.viaMat(Flow.fromGraph(GraphDSL.create(that, - new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @ uncheckedVariance Pair T]] { + new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] { def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = { val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T]) b.from(s).toInlet(zip.in1) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 0ae180ce95..c8b9ea5e97 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1253,7 +1253,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Emits when''' downstream stops backpressuring * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures or iterator runs emtpy * * '''Completes when''' upstream completes * @@ -1263,11 +1263,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): javadsl.Source[U, Mat] = - new Source(delegate.expand(seed(_))(s ⇒ { - val p = extrapolate(s) - (p.first, p.second) - })) + def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): javadsl.Source[U, Mat] = + new Source(delegate.expand(in ⇒ extrapolate(in).asScala)) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 2d452b2238..ab45f4ba09 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -666,7 +666,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Emits when''' downstream stops backpressuring * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures or iterator runs emtpy * * '''Completes when''' upstream completes * @@ -676,11 +676,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): SubFlow[In, U, Mat] = - new SubFlow(delegate.expand(seed(_))(s ⇒ { - val p = extrapolate(s) - (p.first, p.second) - })) + def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): SubFlow[In, U, Mat] = + new SubFlow(delegate.expand(in ⇒ extrapolate(in).asScala)) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index fb206e33da..88eec41d19 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -662,7 +662,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Emits when''' downstream stops backpressuring * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures or iterator runs emtpy * * '''Completes when''' upstream completes * @@ -672,11 +672,8 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: function.Function[Out, S], extrapolate: function.Function[S, akka.japi.Pair[U, S]]): SubSource[U, Mat] = - new SubSource(delegate.expand(seed(_))(s ⇒ { - val p = extrapolate(s) - (p.first, p.second) - })) + def expand[U](extrapolate: function.Function[Out, java.util.Iterator[U]]): SubSource[U, Mat] = + new SubSource(delegate.expand(in ⇒ extrapolate(in).asScala)) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1cf316ba30..1958652769 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -922,7 +922,7 @@ trait FlowOps[+Out, +Mat] { * * '''Emits when''' downstream stops backpressuring * - * '''Backpressures when''' downstream backpressures + * '''Backpressures when''' downstream backpressures or iterator runs emtpy * * '''Completes when''' upstream completes * @@ -932,7 +932,7 @@ trait FlowOps[+Out, +Mat] { * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ - def expand[S, U](seed: Out ⇒ S)(extrapolate: S ⇒ (U, S)): Repr[U] = andThen(Expand(seed, extrapolate)) + def expand[U](extrapolate: Out ⇒ Iterator[U]): Repr[U] = via(new Expand(extrapolate)) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.