+str #17464: mapConcat accepts immutable.Iterable

This commit is contained in:
Endre Sándor Varga 2015-05-12 15:54:26 +02:00
parent 050a930f4f
commit 30e4a57ee0
6 changed files with 36 additions and 46 deletions

View file

@ -57,12 +57,12 @@ public class FlowTest extends StreamTest {
} }
}); });
final Flow<String, String, ?> flow2 = Flow.of(String.class).grouped(2 final Flow<String, String, ?> flow2 = Flow.of(String.class).grouped(2
).mapConcat(new Function<java.util.List<String>, java.util.List<String>>() { ).mapConcat(new Function<java.util.List<String>, java.lang.Iterable<String>>() {
public java.util.List<String> apply(java.util.List<String> elem) { public java.util.List<String> apply(java.util.List<String> elem) {
return elem; return elem;
} }
}).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS) }).groupedWithin(100, FiniteDuration.create(50, TimeUnit.MILLISECONDS)
).mapConcat(new Function<java.util.List<String>, java.util.List<String>>() { ).mapConcat(new Function<java.util.List<String>, java.lang.Iterable<String>>() {
public java.util.List<String> apply(java.util.List<String> elem) { public java.util.List<String> apply(java.util.List<String> elem) {
return elem; return elem;
} }

View file

@ -193,7 +193,7 @@ private[stream] object Stages {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy() override protected def newInstance: StageModule = this.copy()
} }
final case class MapConcat(f: Any immutable.Seq[Any], attributes: OperationAttributes = mapConcat) extends StageModule { final case class MapConcat(f: Any immutable.Iterable[Any], attributes: OperationAttributes = mapConcat) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes) def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy() override protected def newInstance: StageModule = this.copy()
} }

View file

@ -64,22 +64,30 @@ private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case class MapConcat[In, Out](f: In immutable.Seq[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] { private[akka] final case class MapConcat[In, Out](f: In immutable.Iterable[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var currentIterator: Iterator[Out] = Iterator.empty private var currentIterator: Iterator[Out] = Iterator.empty
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = { override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
currentIterator = f(elem).iterator currentIterator = f(elem).iterator
if (currentIterator.isEmpty) ctx.pull() if (!currentIterator.hasNext) ctx.pull()
else ctx.push(currentIterator.next()) else ctx.push(currentIterator.next())
} }
override def onPull(ctx: Context[Out]): SyncDirective = override def onPull(ctx: Context[Out]): SyncDirective =
if (currentIterator.hasNext) ctx.push(currentIterator.next()) if (ctx.isFinishing) {
else if (ctx.isFinishing) ctx.finish() if (currentIterator.hasNext) {
else ctx.pull() val elem = currentIterator.next()
if (currentIterator.hasNext) ctx.push(elem)
else ctx.pushAndFinish(elem)
} else ctx.finish()
} else {
if (currentIterator.hasNext) ctx.push(currentIterator.next())
else ctx.pull()
}
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
ctx.absorbTermination() if (currentIterator.hasNext) ctx.absorbTermination()
else ctx.finish()
override def decide(t: Throwable): Supervision.Directive = decider(t) override def decide(t: Throwable): Supervision.Directive = decider(t)

View file

@ -9,6 +9,7 @@ import akka.japi.{ Util, Pair }
import akka.japi.function import akka.japi.function
import akka.stream.scaladsl import akka.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.stream.stage.Stage import akka.stream.stage.Stage
@ -170,10 +171,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
new Flow(delegate.map(f.apply)) new Flow(delegate.map(f.apply))
/** /**
* Transform each input element into a sequence of output elements that is * Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. * then flattened into the output stream.
* *
* The returned list MUST NOT contain `null` values, * Make sure that the `Iterable` is immutable or at least not modified after
* being used as an output sequence. Otherwise the stream may fail with
* `oncurrentModificationException` or other more subtle errors may occur.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification. * as they are illegal as stream elements - according to the Reactive Streams specification.
* *
* '''Emits when''' the mapping function returns an element or there are still remaining elements * '''Emits when''' the mapping function returns an element or there are still remaining elements
@ -186,8 +191,14 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* *
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
*/ */
def mapConcat[T](f: function.Function[Out, java.util.List[T]]): javadsl.Flow[In, T, Mat] = def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapConcat(elem Util.immutableSeq(f.apply(elem)))) new Flow(delegate.mapConcat { elem
val scalaIterable = new immutable.Iterable[T] {
import collection.JavaConverters._
override def iterator: Iterator[T] = f(elem).iterator().asScala
}
scalaIterable
})
/** /**
* Transform this stream by applying the given function to each of the elements * Transform this stream by applying the given function to each of the elements

View file

@ -361,10 +361,10 @@ trait FlowOps[+Out, +Mat] {
def map[T](f: Out T): Repr[T, Mat] = andThen(Map(f.asInstanceOf[Any Any])) def map[T](f: Out T): Repr[T, Mat] = andThen(Map(f.asInstanceOf[Any Any]))
/** /**
* Transform each input element into a sequence of output elements that is * Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. * then flattened into the output stream.
* *
* The returned sequence MUST NOT contain `null` values, * The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification. * as they are illegal as stream elements - according to the Reactive Streams specification.
* *
* '''Emits when''' the mapping function returns an element or there are still remaining elements * '''Emits when''' the mapping function returns an element or there are still remaining elements
@ -378,7 +378,7 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
*/ */
def mapConcat[T](f: Out immutable.Seq[T]): Repr[T, Mat] = andThen(MapConcat(f.asInstanceOf[Any immutable.Seq[Any]])) def mapConcat[T](f: Out immutable.Iterable[T]): Repr[T, Mat] = andThen(MapConcat(f.asInstanceOf[Any immutable.Iterable[Any]]))
/** /**
* Transform this stream by applying the given function to each of the elements * Transform this stream by applying the given function to each of the elements

View file

@ -215,36 +215,7 @@ object Source extends SourceApply {
* beginning) regardless of when they subscribed. * beginning) regardless of when they subscribed.
*/ */
def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = { def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = {
Source.empty.transform(() { Source.single(()).mapConcat((_: Unit) iterable).withAttributes(DefaultAttributes.iterableSource)
new PushPullStage[Nothing, T] {
var iterator: Iterator[T] = null
// Delayed init so we onError instead of failing during construction of the Source
def initIterator(): Unit = if (iterator eq null) iterator = iterable.iterator
// Upstream is guaranteed to be empty
override def onPush(elem: Nothing, ctx: Context[T]): SyncDirective =
throw new UnsupportedOperationException("The IterableSource stage cannot be pushed")
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
initIterator()
if (iterator.hasNext) ctx.absorbTermination()
else ctx.finish()
}
override def onPull(ctx: Context[T]): SyncDirective = {
if (!ctx.isFinishing) {
initIterator()
ctx.pull()
} else {
val elem = iterator.next()
if (iterator.hasNext) ctx.push(elem)
else ctx.pushAndFinish(elem)
}
}
}
}).withAttributes(DefaultAttributes.iterableSource)
} }
/** /**