!str - replaces flattenConcat with flatMapConcat

This commit is contained in:
Viktor Klang 2015-11-03 14:46:17 +01:00
parent 1378fedad0
commit 50c6f2267c
22 changed files with 112 additions and 96 deletions

View file

@ -1,6 +1,7 @@
package docs; package docs;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.function.Function;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.*; import akka.stream.javadsl.*;
import scala.Option; import scala.Option;
@ -64,7 +65,7 @@ public class MigrationsJava {
FlowGraph.create(builder -> { FlowGraph.create(builder -> {
//... //...
return new FlowShape(inlet, outlet); return new FlowShape<>(inlet, outlet);
}); });
//#graph-create //#graph-create
} }
@ -117,9 +118,14 @@ public class MigrationsJava {
Flow<Integer, Integer, BoxedUnit> emptyFlow2 = Flow.of(Integer.class); Flow<Integer, Integer, BoxedUnit> emptyFlow2 = Flow.of(Integer.class);
//#empty-flow //#empty-flow
//#flattenConcat //#flatMapConcat
Flow.<Source<Integer, BoxedUnit>>create().flattenConcat(); Flow.<Source<Integer, BoxedUnit>>create().
//#flattenConcat <Integer>flatMapConcat(new Function<Source<Integer, BoxedUnit>, Source<Integer, ?>>(){
@Override public Source<Integer, ?> apply(Source<Integer, BoxedUnit> param) throws Exception {
return param;
}
});
//#flatMapConcat
} }
} }

View file

@ -263,12 +263,13 @@ should be replaced by
==================================================================== ====================================================================
To simplify type inference in Java 8 and to make the method more discoverable, ``flatten(FlattenStrategy.concat)`` To simplify type inference in Java 8 and to make the method more discoverable, ``flatten(FlattenStrategy.concat)``
has been removed and replaced with the alternative method ``flatten(FlattenStrategy.concat)``. has been removed and replaced with the alternative method ``flatMapConcat(f)``.
Update procedure Update procedure
---------------- ----------------
1. Replace all occurences of ``flatten(FlattenStrategy.concat)`` with ``flattenConcat()`` 1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)``
2. Consider replacing ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)``
Example Example
^^^^^^^ ^^^^^^^
@ -279,7 +280,7 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/MigrationsJava.java#flattenConcat .. includecode:: code/docs/MigrationsJava.java#flatMapConcat
FlexiMerge an FlexiRoute has been replaced by GraphStage FlexiMerge an FlexiRoute has been replaced by GraphStage
======================================================== ========================================================

View file

@ -9,7 +9,7 @@ import scala.concurrent.{ Future, ExecutionContext, Promise }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
class Migrations extends AkkaSpec { class MigrationsScala extends AkkaSpec {
"Examples in migration guide" must { "Examples in migration guide" must {
"compile" in { "compile" in {
@ -110,10 +110,9 @@ class Migrations extends AkkaSpec {
val ticks = Source(1.second, 3.seconds, "tick") val ticks = Source(1.second, 3.seconds, "tick")
//#source-creators //#source-creators
//#flatten //#flatMapConcat
// Please note that the parenthesis is mandatory due to implicit parameters Flow[Source[Int, Any]].flatMapConcat(identity)
Flow[Source[Int, Any]].flattenConcat() //#flatMapConcat
//#flatten
//#port-async //#port-async
class MapAsyncOne[In, Out](f: In Future[Out])(implicit ec: ExecutionContext) class MapAsyncOne[In, Out](f: In Future[Out])(implicit ec: ExecutionContext)

View file

@ -71,7 +71,7 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/Migrations.scala#flow-wrap .. includecode:: code/docs/MigrationsScala.scala#flow-wrap
and and
@ -90,7 +90,7 @@ and
Should be replaced by Should be replaced by
.. includecode:: code/docs/Migrations.scala#bidiflow-wrap .. includecode:: code/docs/MigrationsScala.scala#bidiflow-wrap
FlowGraph builder methods have been renamed FlowGraph builder methods have been renamed
=========================================== ===========================================
@ -123,7 +123,7 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/Migrations.scala#graph-create .. includecode:: code/docs/MigrationsScala.scala#graph-create
Methods that create Source, Sink, Flow from Graphs have been removed Methods that create Source, Sink, Flow from Graphs have been removed
==================================================================== ====================================================================
@ -180,7 +180,7 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/Migrations.scala#graph-create-2 .. includecode:: code/docs/MigrationsScala.scala#graph-create-2
Several Graph builder methods have been removed Several Graph builder methods have been removed
=============================================== ===============================================
@ -213,7 +213,7 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/Migrations.scala#graph-edges .. includecode:: code/docs/MigrationsScala.scala#graph-edges
Source constructor name changes Source constructor name changes
=============================== ===============================
@ -249,7 +249,7 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/Migrations.scala#source-creators .. includecode:: code/docs/MigrationsScala.scala#source-creators
``flatten(FlattenStrategy)`` has been replaced by named counterparts ``flatten(FlattenStrategy)`` has been replaced by named counterparts
==================================================================== ====================================================================
@ -260,7 +260,8 @@ has been removed and replaced with the alternative method ``flatten(FlattenStrat
Update procedure Update procedure
---------------- ----------------
1. Replace all occurences of ``flatten(FlattenStrategy.concat)`` with ``flattenConcat()`` 1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)``
2. Consider replacing all occurrences of ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)``
Example Example
^^^^^^^ ^^^^^^^
@ -272,7 +273,7 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/Migrations.scala#flatten .. includecode:: code/docs/MigrationsScala.scala#flatMapConcat
FlexiMerge an FlexiRoute has been replaced by GraphStage FlexiMerge an FlexiRoute has been replaced by GraphStage
======================================================== ========================================================
@ -408,4 +409,4 @@ Example
should be replaced by should be replaced by
.. includecode:: code/docs/Migrations.scala#port-async .. includecode:: code/docs/MigrationsScala.scala#port-async

View file

@ -60,8 +60,7 @@ private[http] object OutgoingConnectionBlueprint {
val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest] val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest]
.map(RequestRenderingContext(_, hostHeader)) .map(RequestRenderingContext(_, hostHeader))
.via(Flow[RequestRenderingContext].map(requestRendererFactory.renderToSource).named("renderer")) .via(Flow[RequestRenderingContext].flatMapConcat(requestRendererFactory.renderToSource).named("renderer"))
.flattenConcat()
val methodBypass = Flow[HttpRequest].map(_.method) val methodBypass = Flow[HttpRequest].map(_.method)

View file

@ -20,6 +20,7 @@ import akka.http.impl.util._
import akka.http.scaladsl.Http import akka.http.scaladsl.Http
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
import akka.stream._ import akka.stream._
import akka.stream.impl.ConstantFun
import akka.stream.io._ import akka.stream.io._
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage._ import akka.stream.stage._
@ -108,7 +109,7 @@ private[http] object HttpServerBluePrint {
Flow[ResponseRenderingContext] Flow[ResponseRenderingContext]
.via(Flow[ResponseRenderingContext].transform(() new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed .via(Flow[ResponseRenderingContext].transform(() new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed
.via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer")) .via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer"))
.flattenConcat() .flatMapConcat(ConstantFun.scalaIdentityFunction)
.via(Flow[ResponseRenderingOutput].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger")) .via(Flow[ResponseRenderingOutput].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger"))
BidiFlow.fromGraph(FlowGraph.create(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ()) { implicit b BidiFlow.fromGraph(FlowGraph.create(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ()) { implicit b

View file

@ -27,11 +27,11 @@ private[http] object MessageToFrameRenderer {
Source.single(FrameEvent.emptyLastContinuationFrame) Source.single(FrameEvent.emptyLastContinuationFrame)
Flow[Message] Flow[Message]
.map { .flatMapConcat {
case BinaryMessage.Strict(data) strictFrames(Opcode.Binary, data) case BinaryMessage.Strict(data) strictFrames(Opcode.Binary, data)
case bm: BinaryMessage streamedFrames(Opcode.Binary, bm.dataStream) case bm: BinaryMessage streamedFrames(Opcode.Binary, bm.dataStream)
case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8")) case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8"))
case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.transform(() new Utf8Encoder)) case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.transform(() new Utf8Encoder))
}.flattenConcat() }
} }
} }

View file

@ -49,12 +49,11 @@ package object util {
private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] = private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] =
Flow[Source[T, Any]] Flow[Source[T, Any]]
.map { .flatMapConcat {
_.prefixAndTail(1) _.prefixAndTail(1)
.filter(_._1.nonEmpty) .filter(_._1.nonEmpty)
.map { case (prefix, tail) (prefix.head, tail) } .map { case (prefix, tail) (prefix.head, tail) }
} }
.flattenConcat()
private[http] def printEvent[T](marker: String): Flow[T, T, Unit] = private[http] def printEvent[T](marker: String): Flow[T, T, Unit] =
Flow[T].transform(() new PushPullStage[T, T] { Flow[T].transform(() new PushPullStage[T, T] {

View file

@ -7,6 +7,7 @@ package akka.http.scaladsl.model
import java.io.File import java.io.File
import akka.event.{ NoLogging, LoggingAdapter } import akka.event.{ NoLogging, LoggingAdapter }
import akka.stream.impl.ConstantFun
import scala.collection.immutable.VectorBuilder import scala.collection.immutable.VectorBuilder
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
@ -40,7 +41,7 @@ sealed trait Multipart {
val chunks = val chunks =
parts parts
.transform(() BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)) .transform(() BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.flattenConcat() .flatMapConcat(ConstantFun.scalaIdentityFunction)
HttpEntity.Chunked(mediaType withBoundary boundary, chunks) HttpEntity.Chunked(mediaType withBoundary boundary, chunks)
} }
} }

View file

@ -531,7 +531,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol)) Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol))
case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) Left(x) case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) Left(x)
} }
.map { x .flatMapConcat { x
Source { Source {
x match { x match {
case Right(request) compactEntity(request.entity).fast.map(x Right(request.withEntity(x))) case Right(request) compactEntity(request.entity).fast.map(x Right(request.withEntity(x)))
@ -539,7 +539,6 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
} }
} }
} }
.flattenConcat()
.map(strictEqualify) .map(strictEqualify)
.grouped(100000).runWith(Sink.head) .grouped(100000).runWith(Sink.head)
.awaitResult(awaitAtMost) .awaitResult(awaitAtMost)

View file

@ -3,6 +3,7 @@
*/ */
package akka.stream.tck package akka.stream.tck
import akka.stream.impl.ConstantFun
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
@ -12,7 +13,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = { def createPublisher(elements: Long): Publisher[Int] = {
val s1 = Source(iterable(elements / 2)) val s1 = Source(iterable(elements / 2))
val s2 = Source(iterable((elements + 1) / 2)) val s2 = Source(iterable((elements + 1) / 2))
Source(List(s1, s2)).flattenConcat().runWith(Sink.publisher) Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher)
} }
} }

View file

@ -10,6 +10,7 @@ import akka.japi.JavaPartialFunction;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.function.*; import akka.japi.function.*;
import akka.stream.*; import akka.stream.*;
import akka.stream.impl.ConstantFun;
import akka.stream.javadsl.FlowGraph.Builder; import akka.stream.javadsl.FlowGraph.Builder;
import akka.stream.stage.*; import akka.stream.stage.*;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
@ -481,12 +482,12 @@ public class FlowTest extends StreamTest {
final Iterable<Integer> input1 = Arrays.asList(1, 2, 3); final Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
final Iterable<Integer> input2 = Arrays.asList(4, 5); final Iterable<Integer> input2 = Arrays.asList(4, 5);
final List<Source<Integer, BoxedUnit>> mainInputs = new ArrayList<Source<Integer,BoxedUnit>>(); final List<Source<Integer, ?>> mainInputs = new ArrayList<Source<Integer,?>>();
mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input1));
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
final Flow<Source<Integer, BoxedUnit>, List<Integer>, BoxedUnit> flow = Flow.<Source<Integer, BoxedUnit>>create(). final Flow<Source<Integer, ?>, List<Integer>, ?> flow = Flow.<Source<Integer, ?>>create().
<Integer>flattenConcat().grouped(6); flatMapConcat(ConstantFun.<Source<Integer, ?>>javaIdentityFunction()).grouped(6);
Future<List<Integer>> future = Source.from(mainInputs).via(flow) Future<List<Integer>> future = Source.from(mainInputs).via(flow)
.runWith(Sink.<List<Integer>>head(), materializer); .runWith(Sink.<List<Integer>>head(), materializer);

View file

@ -15,6 +15,7 @@ import akka.stream.Graph;
import akka.stream.OverflowStrategy; import akka.stream.OverflowStrategy;
import akka.stream.StreamTest; import akka.stream.StreamTest;
import akka.stream.UniformFanInShape; import akka.stream.UniformFanInShape;
import akka.stream.impl.ConstantFun;
import akka.stream.stage.*; import akka.stream.stage.*;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.stream.testkit.TestPublisher; import akka.stream.testkit.TestPublisher;
@ -348,12 +349,13 @@ public class SourceTest extends StreamTest {
final Iterable<Integer> input1 = Arrays.asList(1, 2, 3); final Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
final Iterable<Integer> input2 = Arrays.asList(4, 5); final Iterable<Integer> input2 = Arrays.asList(4, 5);
final List<Source<Integer, BoxedUnit>> mainInputs = new ArrayList<Source<Integer,BoxedUnit>>(); final List<Source<Integer, ?>> mainInputs = new ArrayList<Source<Integer,?>>();
mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input1));
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
Future<List<Integer>> future = Source.from(mainInputs) Future<List<Integer>> future = Source.from(mainInputs)
.<Integer>flattenConcat().grouped(6) .<Integer>flatMapConcat(ConstantFun.<Source<Integer,?>>javaIdentityFunction())
.grouped(6)
.runWith(Sink.<List<Integer>>head(), materializer); .runWith(Sink.<List<Integer>>head(), materializer);
List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));

View file

@ -3,6 +3,8 @@
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.impl.ConstantFun
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
@ -31,7 +33,7 @@ class FlowConcatAllSpec extends AkkaSpec {
val main = Source(List(s1, s2, s3, s4, s5)) val main = Source(List(s1, s2, s3, s4, s5))
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
main.flattenConcat().to(Sink(subscriber)).run() main.flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
for (i 1 to 10) for (i 1 to 10)
@ -42,7 +44,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"work together with SplitWhen" in { "work together with SplitWhen" in {
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(1 to 10).splitWhen(_ % 2 == 0).flattenConcat().runWith(Sink(subscriber)) Source(1 to 10).splitWhen(_ % 2 == 0).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink(subscriber))
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
for (i (1 to 10)) for (i (1 to 10))
@ -54,7 +56,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped { "on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flattenConcat().to(Sink(subscriber)).run() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -74,7 +76,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped { "on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flattenConcat().to(Sink(subscriber)).run() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -94,10 +96,27 @@ class FlowConcatAllSpec extends AkkaSpec {
subUpstream.expectCancellation() subUpstream.expectCancellation()
} }
"on onError on opening substream, cancel the master stream and signal error " in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flatMapConcat(_ throw testException).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription()
downstream.request(1000)
val substreamPublisher = TestPublisher.manualProbe[Int]()
val substreamFlow = Source(substreamPublisher)
upstream.expectRequest()
upstream.sendNext(substreamFlow)
subscriber.expectError(testException)
upstream.expectCancellation()
}
"on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped { "on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flattenConcat().to(Sink(subscriber)).run() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -117,7 +136,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped { "on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flattenConcat().to(Sink(subscriber)).run() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -138,7 +157,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped { "on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flattenConcat().to(Sink(subscriber)).run() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -162,7 +181,11 @@ class FlowConcatAllSpec extends AkkaSpec {
val up = TestPublisher.manualProbe[Source[Int, _]]() val up = TestPublisher.manualProbe[Source[Int, _]]()
val down = TestSubscriber.manualProbe[Int]() val down = TestSubscriber.manualProbe[Int]()
val flowSubscriber = Source.subscriber[Source[Int, _]].flattenConcat().to(Sink(down)).run() val flowSubscriber = Source
.subscriber[Source[Int, _]]
.flatMapConcat(ConstantFun.scalaIdentityFunction)
.to(Sink(down))
.run()
val downstream = down.expectSubscription() val downstream = down.expectSubscription()
downstream.cancel() downstream.cancel()

View file

@ -255,7 +255,7 @@ private[akka] object ActorProcessorFactory {
case GroupBy(f, _) (GroupByProcessorImpl.props(settings, f), ()) case GroupBy(f, _) (GroupByProcessorImpl.props(settings, f), ())
case PrefixAndTail(n, _) (PrefixAndTailImpl.props(settings, n), ()) case PrefixAndTail(n, _) (PrefixAndTailImpl.props(settings, n), ())
case Split(d, _) (SplitWhereProcessorImpl.props(settings, d), ()) case Split(d, _) (SplitWhereProcessorImpl.props(settings, d), ())
case ConcatAll(_) (ConcatAllImpl.props(materializer), ()) case ConcatAll(f, _) (ConcatAllImpl.props(f, materializer), ())
case DirectProcessor(p, m) throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") case DirectProcessor(p, m) throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
} }
} }

View file

@ -4,28 +4,27 @@
package akka.stream.impl package akka.stream.impl
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.{ Source, Sink }
import akka.actor.{ Deploy, Props } import akka.actor.{ Deploy, Props }
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] object ConcatAllImpl { private[akka] object ConcatAllImpl {
def props(materializer: ActorMaterializer): Props = def props(f: Any Source[Any, _], materializer: ActorMaterializer): Props =
Props(new ConcatAllImpl(materializer)).withDeploy(Deploy.local) Props(new ConcatAllImpl(f, materializer)).withDeploy(Deploy.local)
} }
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] class ConcatAllImpl(materializer: ActorMaterializer) private[akka] class ConcatAllImpl(f: Any Source[Any, _], materializer: ActorMaterializer)
extends MultiStreamInputProcessor(materializer.settings) { extends MultiStreamInputProcessor(materializer.settings) {
import akka.stream.impl.MultiStreamInputProcessor._ import akka.stream.impl.MultiStreamInputProcessor._
val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val Extract.Source(source) = primaryInputs.dequeueInputElement() val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher)(materializer)
val publisher = source.runWith(Sink.publisher)(materializer)
// FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now)
val inputs = createAndSubscribeSubstreamInput(publisher) val inputs = createAndSubscribeSubstreamInput(publisher)
nextPhase(streamSubstream(inputs)) nextPhase(streamSubstream(inputs))

View file

@ -6,7 +6,7 @@ package akka.stream.impl
import akka.japi.function.{ Function JFun, Function2 JFun2 } import akka.japi.function.{ Function JFun, Function2 JFun2 }
import akka.japi.{ Pair JPair } import akka.japi.{ Pair JPair }
private[stream] object ConstantFun { private[akka] object ConstantFun {
private[this] val JavaIdentityFunction = new JFun[Any, Any] { private[this] val JavaIdentityFunction = new JFun[Any, Any] {
@throws(classOf[Exception]) override def apply(param: Any): Any = param @throws(classOf[Exception]) override def apply(param: Any): Any = param
} }

View file

@ -1,32 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.{ scaladsl, javadsl }
/**
* INTERNAL API
*
* Unapply methods aware of both DSLs.
* Use these instead of manually casting to [[scaladsl.Source]].
*/
private[akka] object Extract {
object Source {
def unapply(a: Any): Option[scaladsl.Source[Any, _]] = a match {
case s: scaladsl.Source[_, _] Some(s)
case s: javadsl.Source[_, _] Some(s.asScala)
case _ None
}
}
object Sink {
def unapply(a: Any): Option[scaladsl.Sink[Nothing, _]] = a match {
case s: scaladsl.Sink[_, _] Some(s)
case s: javadsl.Sink[_, _] Some(s.asScala)
case _ None
}
}
}

View file

@ -10,6 +10,7 @@ import akka.stream.Supervision.Decider
import akka.stream._ import akka.stream._
import akka.stream.impl.SplitDecision.{ Continue, SplitAfter, SplitBefore, SplitDecision } import akka.stream.impl.SplitDecision.{ Continue, SplitAfter, SplitBefore, SplitDecision }
import akka.stream.impl.StreamLayout._ import akka.stream.impl.StreamLayout._
import akka.stream.scaladsl.Source
import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.stage.AbstractStage.PushPullGraphStage
import akka.stream.stage.Stage import akka.stream.stage.Stage
import org.reactivestreams.Processor import org.reactivestreams.Processor
@ -232,7 +233,7 @@ private[stream] object Stages {
def after(f: Any Boolean) = Split(el if (f(el)) SplitAfter else Continue, name("splitAfter")) def after(f: Any Boolean) = Split(el if (f(el)) SplitAfter else Continue, name("splitAfter"))
} }
final case class ConcatAll(attributes: Attributes = concatAll) extends StageModule { final case class ConcatAll(f: Any Source[Any, _], attributes: Attributes = concatAll) extends StageModule {
override def withAttributes(attributes: Attributes) = copy(attributes = attributes) override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
} }

View file

@ -839,8 +839,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
new Flow(delegate.splitAfter(p.test).map(_.asJava)) new Flow(delegate.splitAfter(p.test).map(_.asJava))
/** /**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * Transform each input element into a `Source` of output elements that is
* This operation can be used on a stream of element type `Source[U]`. * then flattened into the output stream by concatenation,
* fully consuming one Source after the other.
*
* '''Emits when''' a currently consumed substream has an element available * '''Emits when''' a currently consumed substream has an element available
* *
* '''Backpressures when''' downstream backpressures * '''Backpressures when''' downstream backpressures
@ -850,8 +852,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
*/ */
def flattenConcat[U](): javadsl.Flow[In, U, Mat] = def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Flow[In, T, Mat] =
new Flow(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]])) new Flow(delegate.flatMapConcat[T](x f(x).asScala))
/** /**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Concatenate the given [[Source]] to this [[Flow]], meaning that once this

View file

@ -840,11 +840,21 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
new Source(delegate.splitAfter(p.test).map(_.asJava)) new Source(delegate.splitAfter(p.test).map(_.asJava))
/** /**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * Transform each input element into a `Source` of output elements that is
* This operation can be used on a stream of element type `Source[U]`. * then flattened into the output stream by concatenation,
* fully consuming one Source after the other.
*
* '''Emits when''' a currently consumed substream has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*
*/ */
def flattenConcat[U](): javadsl.Source[U, Mat] = def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Source[T, Mat] =
new Source(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]])) new Source(delegate.flatMapConcat[T](x f(x).asScala))
/** /**
* If the first element has not passed through this stage before the provided timeout, the stream is failed * If the first element has not passed through this stage before the provided timeout, the stream is failed

View file

@ -995,7 +995,9 @@ trait FlowOps[+Out, +Mat] {
deprecatedAndThen(Split.after(p.asInstanceOf[Any Boolean])) deprecatedAndThen(Split.after(p.asInstanceOf[Any Boolean]))
/** /**
* Flattens a stream of [[Source]]s into a contiguous stream by fully consuming one stream after the other. * Transform each input element into a `Source` of output elements that is
* then flattened into the output stream by concatenation,
* fully consuming one Source after the other.
* *
* '''Emits when''' a currently consumed substream has an element available * '''Emits when''' a currently consumed substream has an element available
* *
@ -1006,7 +1008,8 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
*/ */
def flattenConcat[U]()(implicit ev: Out <:< Source[U, _]): Repr[U, Mat] = deprecatedAndThen(ConcatAll()) def flatMapConcat[T](f: Out Source[T, _]): Repr[T, Mat] =
deprecatedAndThen(ConcatAll(f.asInstanceOf[Any Source[Any, _]]))
/** /**
* If the first element has not passed through this stage before the provided timeout, the stream is failed * If the first element has not passed through this stage before the provided timeout, the stream is failed