diff --git a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java index 1a61b92a3b..a2c3b0eefe 100644 --- a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java +++ b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java @@ -1,6 +1,7 @@ package docs; import akka.japi.Pair; +import akka.japi.function.Function; import akka.stream.*; import akka.stream.javadsl.*; import scala.Option; @@ -64,7 +65,7 @@ public class MigrationsJava { FlowGraph.create(builder -> { //... - return new FlowShape(inlet, outlet); + return new FlowShape<>(inlet, outlet); }); //#graph-create } @@ -117,9 +118,14 @@ public class MigrationsJava { Flow emptyFlow2 = Flow.of(Integer.class); //#empty-flow - //#flattenConcat - Flow.>create().flattenConcat(); - //#flattenConcat + //#flatMapConcat + Flow.>create(). + flatMapConcat(new Function, Source>(){ + @Override public Source apply(Source param) throws Exception { + return param; + } + }); + //#flatMapConcat } } diff --git a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst index c83b7e565c..e081d33f65 100644 --- a/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst +++ b/akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst @@ -263,12 +263,13 @@ should be replaced by ==================================================================== To simplify type inference in Java 8 and to make the method more discoverable, ``flatten(FlattenStrategy.concat)`` -has been removed and replaced with the alternative method ``flatten(FlattenStrategy.concat)``. +has been removed and replaced with the alternative method ``flatMapConcat(f)``. Update procedure ---------------- -1. Replace all occurences of ``flatten(FlattenStrategy.concat)`` with ``flattenConcat()`` +1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)`` +2. Consider replacing ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)`` Example ^^^^^^^ @@ -279,7 +280,7 @@ Example should be replaced by -.. includecode:: code/docs/MigrationsJava.java#flattenConcat +.. 
includecode:: code/docs/MigrationsJava.java#flatMapConcat FlexiMerge an FlexiRoute has been replaced by GraphStage ======================================================== diff --git a/akka-docs-dev/rst/scala/code/docs/Migrations.scala b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala similarity index 97% rename from akka-docs-dev/rst/scala/code/docs/Migrations.scala rename to akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala index 5c17569633..12f770abe6 100644 --- a/akka-docs-dev/rst/scala/code/docs/Migrations.scala +++ b/akka-docs-dev/rst/scala/code/docs/MigrationsScala.scala @@ -9,7 +9,7 @@ import scala.concurrent.{ Future, ExecutionContext, Promise } import scala.concurrent.duration._ import scala.util.{ Failure, Success, Try } -class Migrations extends AkkaSpec { +class MigrationsScala extends AkkaSpec { "Examples in migration guide" must { "compile" in { @@ -110,10 +110,9 @@ class Migrations extends AkkaSpec { val ticks = Source(1.second, 3.seconds, "tick") //#source-creators - //#flatten - // Please note that the parenthesis is mandatory due to implicit parameters - Flow[Source[Int, Any]].flattenConcat() - //#flatten + //#flatMapConcat + Flow[Source[Int, Any]].flatMapConcat(identity) + //#flatMapConcat //#port-async class MapAsyncOne[In, Out](f: In ⇒ Future[Out])(implicit ec: ExecutionContext) diff --git a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst index 4758055cbe..7b516ed45d 100644 --- a/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst +++ b/akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst @@ -71,7 +71,7 @@ Example should be replaced by -.. includecode:: code/docs/Migrations.scala#flow-wrap +.. includecode:: code/docs/MigrationsScala.scala#flow-wrap and @@ -90,7 +90,7 @@ and Should be replaced by -.. includecode:: code/docs/Migrations.scala#bidiflow-wrap +.. includecode:: code/docs/MigrationsScala.scala#bidiflow-wrap FlowGraph builder methods have been renamed =========================================== @@ -123,7 +123,7 @@ Example should be replaced by -.. includecode:: code/docs/Migrations.scala#graph-create +.. includecode:: code/docs/MigrationsScala.scala#graph-create Methods that create Source, Sink, Flow from Graphs have been removed ==================================================================== @@ -180,7 +180,7 @@ Example should be replaced by -.. includecode:: code/docs/Migrations.scala#graph-create-2 +.. includecode:: code/docs/MigrationsScala.scala#graph-create-2 Several Graph builder methods have been removed =============================================== @@ -213,7 +213,7 @@ Example should be replaced by -.. includecode:: code/docs/Migrations.scala#graph-edges +.. includecode:: code/docs/MigrationsScala.scala#graph-edges Source constructor name changes =============================== @@ -249,7 +249,7 @@ Example should be replaced by -.. includecode:: code/docs/Migrations.scala#source-creators +.. includecode:: code/docs/MigrationsScala.scala#source-creators ``flatten(FlattenStrategy)`` has been replaced by named counterparts ==================================================================== @@ -260,7 +260,8 @@ has been removed and replaced with the alternative method ``flatten(FlattenStrat Update procedure ---------------- -1. Replace all occurences of ``flatten(FlattenStrategy.concat)`` with ``flattenConcat()`` +1. Replace all occurrences of ``flatten(FlattenStrategy.concat)`` with ``flatMapConcat(identity)`` +2. 
Consider replacing all occurrences of ``map(f).flatMapConcat(identity)`` with ``flatMapConcat(f)`` Example ^^^^^^^ @@ -272,7 +273,7 @@ Example should be replaced by -.. includecode:: code/docs/Migrations.scala#flatten +.. includecode:: code/docs/MigrationsScala.scala#flatMapConcat FlexiMerge an FlexiRoute has been replaced by GraphStage ======================================================== @@ -408,4 +409,4 @@ Example should be replaced by -.. includecode:: code/docs/Migrations.scala#port-async \ No newline at end of file +.. includecode:: code/docs/MigrationsScala.scala#port-async \ No newline at end of file diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 950ef41e14..ccd0af28a3 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -60,8 +60,7 @@ private[http] object OutgoingConnectionBlueprint { val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest] .map(RequestRenderingContext(_, hostHeader)) - .via(Flow[RequestRenderingContext].map(requestRendererFactory.renderToSource).named("renderer")) - .flattenConcat() + .via(Flow[RequestRenderingContext].flatMapConcat(requestRendererFactory.renderToSource).named("renderer")) val methodBypass = Flow[HttpRequest].map(_.method) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index ea12116193..71346f4b4c 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -20,6 +20,7 @@ import akka.http.impl.util._ import akka.http.scaladsl.Http import akka.http.scaladsl.model._ import akka.stream._ +import akka.stream.impl.ConstantFun import akka.stream.io._ import akka.stream.scaladsl._ import akka.stream.stage._ @@ -108,7 +109,7 @@ private[http] object HttpServerBluePrint { Flow[ResponseRenderingContext] .via(Flow[ResponseRenderingContext].transform(() ⇒ new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed .via(Flow[ResponseRenderingContext].transform(() ⇒ responseRendererFactory.newRenderer).named("renderer")) - .flattenConcat() + .flatMapConcat(ConstantFun.scalaIdentityFunction) .via(Flow[ResponseRenderingOutput].transform(() ⇒ errorLogger(log, "Outgoing response stream error")).named("errorLogger")) BidiFlow.fromGraph(FlowGraph.create(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala index 707db8a92b..4bc7f6191a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala @@ -27,11 +27,11 @@ private[http] object MessageToFrameRenderer { Source.single(FrameEvent.emptyLastContinuationFrame) Flow[Message] - .map { + .flatMapConcat { case BinaryMessage.Strict(data) ⇒ strictFrames(Opcode.Binary, data) case bm: BinaryMessage ⇒ streamedFrames(Opcode.Binary, bm.dataStream) 
case TextMessage.Strict(text) ⇒ strictFrames(Opcode.Text, ByteString(text, "UTF-8")) case tm: TextMessage ⇒ streamedFrames(Opcode.Text, tm.textStream.transform(() ⇒ new Utf8Encoder)) - }.flattenConcat() + } } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index c2acb0aae7..fdff173f60 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -49,12 +49,11 @@ package object util { private[http] def headAndTailFlow[T]: Flow[Source[T, Any], (T, Source[T, Unit]), Unit] = Flow[Source[T, Any]] - .map { + .flatMapConcat { _.prefixAndTail(1) .filter(_._1.nonEmpty) .map { case (prefix, tail) ⇒ (prefix.head, tail) } } - .flattenConcat() private[http] def printEvent[T](marker: String): Flow[T, T, Unit] = Flow[T].transform(() ⇒ new PushPullStage[T, T] { diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala index a58d8e13df..cff907f653 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala @@ -7,6 +7,7 @@ package akka.http.scaladsl.model import java.io.File import akka.event.{ NoLogging, LoggingAdapter } +import akka.stream.impl.ConstantFun import scala.collection.immutable.VectorBuilder import scala.concurrent.duration.FiniteDuration @@ -40,7 +41,7 @@ sealed trait Multipart { val chunks = parts .transform(() ⇒ BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)) - .flattenConcat() + .flatMapConcat(ConstantFun.scalaIdentityFunction) HttpEntity.Chunked(mediaType withBoundary boundary, chunks) } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala index ad2ed0f687..4656912798 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala @@ -531,7 +531,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { Right(HttpRequest(method, uri, headers, createEntity(entityParts), protocol)) case (x @ (MessageStartError(_, _) | EntityStreamError(_)), _) ⇒ Left(x) } - .map { x ⇒ + .flatMapConcat { x ⇒ Source { x match { case Right(request) ⇒ compactEntity(request.entity).fast.map(x ⇒ Right(request.withEntity(x))) @@ -539,7 +539,6 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } } } - .flattenConcat() .map(strictEqualify) .grouped(100000).runWith(Sink.head) .awaitResult(awaitAtMost) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala index 1fa849f442..94fe7192be 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -3,6 +3,7 @@ */ package akka.stream.tck +import akka.stream.impl.ConstantFun import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import org.reactivestreams.Publisher @@ -12,7 +13,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val s1 = Source(iterable(elements / 2)) val s2 = 
Source(iterable((elements + 1) / 2)) - Source(List(s1, s2)).flattenConcat().runWith(Sink.publisher) + Source(List(s1, s2)).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink.publisher) } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index b309b5c56e..8c47d6ac1c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -10,6 +10,7 @@ import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.japi.function.*; import akka.stream.*; +import akka.stream.impl.ConstantFun; import akka.stream.javadsl.FlowGraph.Builder; import akka.stream.stage.*; import akka.stream.testkit.AkkaSpec; @@ -481,12 +482,12 @@ public class FlowTest extends StreamTest { final Iterable input1 = Arrays.asList(1, 2, 3); final Iterable input2 = Arrays.asList(4, 5); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); - final Flow, List, BoxedUnit> flow = Flow.>create(). - flattenConcat().grouped(6); + final Flow, List, ?> flow = Flow.>create(). + flatMapConcat(ConstantFun.>javaIdentityFunction()).grouped(6); Future> future = Source.from(mainInputs).via(flow) .runWith(Sink.>head(), materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 2f19c93dd5..d41715bc9c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -15,6 +15,7 @@ import akka.stream.Graph; import akka.stream.OverflowStrategy; import akka.stream.StreamTest; import akka.stream.UniformFanInShape; +import akka.stream.impl.ConstantFun; import akka.stream.stage.*; import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.TestPublisher; @@ -348,12 +349,13 @@ public class SourceTest extends StreamTest { final Iterable input1 = Arrays.asList(1, 2, 3); final Iterable input2 = Arrays.asList(4, 5); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); Future> future = Source.from(mainInputs) - .flattenConcat().grouped(6) + .flatMapConcat(ConstantFun.>javaIdentityFunction()) + .grouped(6) .runWith(Sink.>head(), materializer); List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index 9789071563..4fcdd95652 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -3,6 +3,8 @@ */ package akka.stream.scaladsl +import akka.stream.impl.ConstantFun + import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.stream.ActorMaterializer @@ -31,7 +33,7 @@ class FlowConcatAllSpec extends AkkaSpec { val main = Source(List(s1, s2, s3, s4, s5)) val subscriber = TestSubscriber.manualProbe[Int]() - main.flattenConcat().to(Sink(subscriber)).run() + main.flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() val subscription = subscriber.expectSubscription() 
subscription.request(10) for (i ← 1 to 10) @@ -42,7 +44,7 @@ class FlowConcatAllSpec extends AkkaSpec { "work together with SplitWhen" in { val subscriber = TestSubscriber.manualProbe[Int]() - Source(1 to 10).splitWhen(_ % 2 == 0).flattenConcat().runWith(Sink(subscriber)) + Source(1 to 10).splitWhen(_ % 2 == 0).flatMapConcat(ConstantFun.scalaIdentityFunction).runWith(Sink(subscriber)) val subscription = subscriber.expectSubscription() subscription.request(10) for (i ← (1 to 10)) @@ -54,7 +56,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flattenConcat().to(Sink(subscriber)).run() + Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -74,7 +76,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flattenConcat().to(Sink(subscriber)).run() + Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -94,10 +96,27 @@ class FlowConcatAllSpec extends AkkaSpec { subUpstream.expectCancellation() } + "on onError on opening substream, cancel the master stream and signal error " in assertAllStagesStopped { + val publisher = TestPublisher.manualProbe[Source[Int, _]]() + val subscriber = TestSubscriber.manualProbe[Int]() + Source(publisher).flatMapConcat(_ ⇒ throw testException).to(Sink(subscriber)).run() + + val upstream = publisher.expectSubscription() + val downstream = subscriber.expectSubscription() + downstream.request(1000) + + val substreamPublisher = TestPublisher.manualProbe[Int]() + val substreamFlow = Source(substreamPublisher) + upstream.expectRequest() + upstream.sendNext(substreamFlow) + subscriber.expectError(testException) + upstream.expectCancellation() + } + "on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flattenConcat().to(Sink(subscriber)).run() + Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -117,7 +136,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flattenConcat().to(Sink(subscriber)).run() + Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -138,7 +157,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped { val publisher = 
TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flattenConcat().to(Sink(subscriber)).run() + Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -162,7 +181,11 @@ class FlowConcatAllSpec extends AkkaSpec { val up = TestPublisher.manualProbe[Source[Int, _]]() val down = TestSubscriber.manualProbe[Int]() - val flowSubscriber = Source.subscriber[Source[Int, _]].flattenConcat().to(Sink(down)).run() + val flowSubscriber = Source + .subscriber[Source[Int, _]] + .flatMapConcat(ConstantFun.scalaIdentityFunction) + .to(Sink(down)) + .run() val downstream = down.expectSubscription() downstream.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 539d0ab787..84a977cfbd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -255,7 +255,7 @@ private[akka] object ActorProcessorFactory { case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ()) - case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) + case ConcatAll(f, _) ⇒ (ConcatAllImpl.props(f, materializer), ()) case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala index b910184483..abbccb03a3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -4,28 +4,27 @@ package akka.stream.impl import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.{ Source, Sink } import akka.actor.{ Deploy, Props } /** * INTERNAL API */ private[akka] object ConcatAllImpl { - def props(materializer: ActorMaterializer): Props = - Props(new ConcatAllImpl(materializer)).withDeploy(Deploy.local) + def props(f: Any ⇒ Source[Any, _], materializer: ActorMaterializer): Props = + Props(new ConcatAllImpl(f, materializer)).withDeploy(Deploy.local) } /** * INTERNAL API */ -private[akka] class ConcatAllImpl(materializer: ActorMaterializer) +private[akka] class ConcatAllImpl(f: Any ⇒ Source[Any, _], materializer: ActorMaterializer) extends MultiStreamInputProcessor(materializer.settings) { import akka.stream.impl.MultiStreamInputProcessor._ val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val Extract.Source(source) = primaryInputs.dequeueInputElement() - val publisher = source.runWith(Sink.publisher)(materializer) + val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher)(materializer) // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) val inputs = createAndSubscribeSubstreamInput(publisher) nextPhase(streamSubstream(inputs)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala index f65bec90b6..ee4d0dcd47 100644 --- 
a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala @@ -6,7 +6,7 @@ package akka.stream.impl import akka.japi.function.{ Function ⇒ JFun, Function2 ⇒ JFun2 } import akka.japi.{ Pair ⇒ JPair } -private[stream] object ConstantFun { +private[akka] object ConstantFun { private[this] val JavaIdentityFunction = new JFun[Any, Any] { @throws(classOf[Exception]) override def apply(param: Any): Any = param } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Extract.scala b/akka-stream/src/main/scala/akka/stream/impl/Extract.scala deleted file mode 100644 index 1cce40f847..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/Extract.scala +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.stream.{ scaladsl, javadsl } - -/** - * INTERNAL API - * - * Unapply methods aware of both DSLs. - * Use these instead of manually casting to [[scaladsl.Source]]. - */ -private[akka] object Extract { - - object Source { - def unapply(a: Any): Option[scaladsl.Source[Any, _]] = a match { - case s: scaladsl.Source[_, _] ⇒ Some(s) - case s: javadsl.Source[_, _] ⇒ Some(s.asScala) - case _ ⇒ None - } - } - - object Sink { - def unapply(a: Any): Option[scaladsl.Sink[Nothing, _]] = a match { - case s: scaladsl.Sink[_, _] ⇒ Some(s) - case s: javadsl.Sink[_, _] ⇒ Some(s.asScala) - case _ ⇒ None - } - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index bc1196ba6d..6a0e9d9fcc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -10,6 +10,7 @@ import akka.stream.Supervision.Decider import akka.stream._ import akka.stream.impl.SplitDecision.{ Continue, SplitAfter, SplitBefore, SplitDecision } import akka.stream.impl.StreamLayout._ +import akka.stream.scaladsl.Source import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.stage.Stage import org.reactivestreams.Processor @@ -232,7 +233,7 @@ private[stream] object Stages { def after(f: Any ⇒ Boolean) = Split(el ⇒ if (f(el)) SplitAfter else Continue, name("splitAfter")) } - final case class ConcatAll(attributes: Attributes = concatAll) extends StageModule { + final case class ConcatAll(f: Any ⇒ Source[Any, _], attributes: Attributes = concatAll) extends StageModule { override def withAttributes(attributes: Attributes) = copy(attributes = attributes) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 513ceb285c..3d7bce587a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -839,8 +839,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.splitAfter(p.test).map(_.asJava)) /** - * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. - * This operation can be used on a stream of element type `Source[U]`. + * Transform each input element into a `Source` of output elements that is + * then flattened into the output stream by concatenation, + * fully consuming one Source after the other. 
+ * * '''Emits when''' a currently consumed substream has an element available * * '''Backpressures when''' downstream backpressures @@ -850,8 +852,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Cancels when''' downstream cancels * */ - def flattenConcat[U](): javadsl.Flow[In, U, Mat] = - new Flow(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]])) + def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Flow[In, T, Mat] = + new Flow(delegate.flatMapConcat[T](x ⇒ f(x).asScala)) /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 68d96219fa..4a54467dd2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -840,11 +840,21 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.splitAfter(p.test).map(_.asJava)) /** - * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. - * This operation can be used on a stream of element type `Source[U]`. + * Transform each input element into a `Source` of output elements that is + * then flattened into the output stream by concatenation, + * fully consuming one Source after the other. + * + * '''Emits when''' a currently consumed substream has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and all consumed substreams complete + * + * '''Cancels when''' downstream cancels + * */ - def flattenConcat[U](): javadsl.Source[U, Mat] = - new Source(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]])) + def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Source[T, Mat] = + new Source(delegate.flatMapConcat[T](x ⇒ f(x).asScala)) /** * If the first element has not passed through this stage before the provided timeout, the stream is failed diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 3626101073..652744f6c7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -995,7 +995,9 @@ trait FlowOps[+Out, +Mat] { deprecatedAndThen(Split.after(p.asInstanceOf[Any ⇒ Boolean])) /** - * Flattens a stream of [[Source]]s into a contiguous stream by fully consuming one stream after the other. + * Transform each input element into a `Source` of output elements that is + * then flattened into the output stream by concatenation, + * fully consuming one Source after the other. * * '''Emits when''' a currently consumed substream has an element available * @@ -1006,7 +1008,8 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ - def flattenConcat[U]()(implicit ev: Out <:< Source[U, _]): Repr[U, Mat] = deprecatedAndThen(ConcatAll()) + def flatMapConcat[T](f: Out ⇒ Source[T, _]): Repr[T, Mat] = + deprecatedAndThen(ConcatAll(f.asInstanceOf[Any ⇒ Source[Any, _]])) /** * If the first element has not passed through this stage before the provided timeout, the stream is failed
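
For reference, a minimal caller-side sketch of the migration that the guides above describe, assuming Akka Streams 2.x with an implicit ActorMaterializer in scope (the element values and the println sink are illustrative only, not part of the patch)::

  import akka.stream.scaladsl.{ Flow, Sink, Source }

  // 1.0 style (removed): Source(...).flattenConcat()
  // 2.x style: pass the flattening function explicitly
  Source(List(Source(1 to 3), Source(4 to 5)))
    .flatMapConcat(identity)
    .runWith(Sink.foreach(println))

  // map(f).flatMapConcat(identity) can usually be collapsed into flatMapConcat(f)
  Flow[Int].flatMapConcat(i ⇒ Source(1 to i))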