From 30e4a57ee0ef40906ca41aee6e1df653bbb14ba1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Tue, 12 May 2015 15:54:26 +0200 Subject: [PATCH] +str #17464: mapConcat accepts immutable.Iterable --- .../java/akka/stream/javadsl/FlowTest.java | 4 +-- .../main/scala/akka/stream/impl/Stages.scala | 2 +- .../scala/akka/stream/impl/fusing/Ops.scala | 20 ++++++++---- .../main/scala/akka/stream/javadsl/Flow.scala | 19 +++++++++--- .../scala/akka/stream/scaladsl/Flow.scala | 6 ++-- .../scala/akka/stream/scaladsl/Source.scala | 31 +------------------ 6 files changed, 36 insertions(+), 46 deletions(-) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 4a166b01f8..66ae9a0c41 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -57,12 +57,12 @@ public class FlowTest extends StreamTest { } }); final Flow flow2 = Flow.of(String.class).grouped(2 - ).mapConcat(new Function, java.util.List>() { + ).mapConcat(new Function, java.lang.Iterable>() { public java.util.List apply(java.util.List elem) { return elem; } }).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS) - ).mapConcat(new Function, java.util.List>() { + ).mapConcat(new Function, java.lang.Iterable>() { public java.util.List apply(java.util.List elem) { return elem; } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 138cfa3522..9cb7a39742 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -193,7 +193,7 @@ private[stream] object Stages { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() } - final case class MapConcat(f: Any ⇒ immutable.Seq[Any], attributes: OperationAttributes = mapConcat) extends StageModule { + final case class MapConcat(f: Any ⇒ immutable.Iterable[Any], attributes: OperationAttributes = mapConcat) extends StageModule { def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) override protected def newInstance: StageModule = this.copy() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index ed4eb52f2d..21772f5dc2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -64,22 +64,30 @@ private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf /** * INTERNAL API */ -private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] { +private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Iterable[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] { private var currentIterator: Iterator[Out] = Iterator.empty override def onPush(elem: In, ctx: Context[Out]): SyncDirective = { currentIterator = f(elem).iterator - if (currentIterator.isEmpty) ctx.pull() + if (!currentIterator.hasNext) ctx.pull() else ctx.push(currentIterator.next()) } override def onPull(ctx: Context[Out]): SyncDirective = - if (currentIterator.hasNext) ctx.push(currentIterator.next()) - else if (ctx.isFinishing) ctx.finish() - else ctx.pull() + if (ctx.isFinishing) { + if (currentIterator.hasNext) { + val elem = currentIterator.next() + if (currentIterator.hasNext) ctx.push(elem) + else ctx.pushAndFinish(elem) + } else ctx.finish() + } else { + if (currentIterator.hasNext) ctx.push(currentIterator.next()) + else ctx.pull() + } override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = - ctx.absorbTermination() + if (currentIterator.hasNext) ctx.absorbTermination() + else ctx.finish() override def decide(t: Throwable): Supervision.Directive = decider(t) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b0d96c17cb..8af5eb1db7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -9,6 +9,7 @@ import akka.japi.{ Util, Pair } import akka.japi.function import akka.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import akka.stream.stage.Stage @@ -170,10 +171,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph new Flow(delegate.map(f.apply)) /** - * Transform each input element into a sequence of output elements that is + * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. * - * The returned list MUST NOT contain `null` values, + * Make sure that the `Iterable` is immutable or at least not modified after + * being used as an output sequence. Otherwise the stream may fail with + * `oncurrentModificationException` or other more subtle errors may occur. + * + * The returned `Iterable` MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. * * '''Emits when''' the mapping function returns an element or there are still remaining elements @@ -186,8 +191,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * * '''Cancels when''' downstream cancels */ - def mapConcat[T](f: function.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): javadsl.Flow[In, T, Mat] = + new Flow(delegate.mapConcat { elem ⇒ + val scalaIterable = new immutable.Iterable[T] { + import collection.JavaConverters._ + override def iterator: Iterator[T] = f(elem).iterator().asScala + } + scalaIterable + }) /** * Transform this stream by applying the given function to each of the elements diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index cc08efca2c..1944be3772 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -361,10 +361,10 @@ trait FlowOps[+Out, +Mat] { def map[T](f: Out ⇒ T): Repr[T, Mat] = andThen(Map(f.asInstanceOf[Any ⇒ Any])) /** - * Transform each input element into a sequence of output elements that is + * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. * - * The returned sequence MUST NOT contain `null` values, + * The returned `Iterable` MUST NOT contain `null` values, * as they are illegal as stream elements - according to the Reactive Streams specification. * * '''Emits when''' the mapping function returns an element or there are still remaining elements @@ -378,7 +378,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ - def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T, Mat] = andThen(MapConcat(f.asInstanceOf[Any ⇒ immutable.Seq[Any]])) + def mapConcat[T](f: Out ⇒ immutable.Iterable[T]): Repr[T, Mat] = andThen(MapConcat(f.asInstanceOf[Any ⇒ immutable.Iterable[Any]])) /** * Transform this stream by applying the given function to each of the elements diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index acb1cd6e10..99551c371a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -215,36 +215,7 @@ object Source extends SourceApply { * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = { - Source.empty.transform(() ⇒ { - new PushPullStage[Nothing, T] { - var iterator: Iterator[T] = null - - // Delayed init so we onError instead of failing during construction of the Source - def initIterator(): Unit = if (iterator eq null) iterator = iterable.iterator - - // Upstream is guaranteed to be empty - override def onPush(elem: Nothing, ctx: Context[T]): SyncDirective = - throw new UnsupportedOperationException("The IterableSource stage cannot be pushed") - - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - initIterator() - if (iterator.hasNext) ctx.absorbTermination() - else ctx.finish() - } - - override def onPull(ctx: Context[T]): SyncDirective = { - if (!ctx.isFinishing) { - initIterator() - ctx.pull() - } else { - val elem = iterator.next() - if (iterator.hasNext) ctx.push(elem) - else ctx.pushAndFinish(elem) - } - } - } - - }).withAttributes(DefaultAttributes.iterableSource) + Source.single(()).mapConcat((_: Unit) ⇒ iterable).withAttributes(DefaultAttributes.iterableSource) } /**