diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index 9a93592426..950ef41e14 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -61,7 +61,7 @@ private[http] object OutgoingConnectionBlueprint { val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest] .map(RequestRenderingContext(_, hostHeader)) .via(Flow[RequestRenderingContext].map(requestRendererFactory.renderToSource).named("renderer")) - .flatten(FlattenStrategy.concat) + .flattenConcat() val methodBypass = Flow[HttpRequest].map(_.method) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 7b8806b52b..ea12116193 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -108,7 +108,7 @@ private[http] object HttpServerBluePrint { Flow[ResponseRenderingContext] .via(Flow[ResponseRenderingContext].transform(() ⇒ new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed .via(Flow[ResponseRenderingContext].transform(() ⇒ responseRendererFactory.newRenderer).named("renderer")) - .flatten(FlattenStrategy.concat) + .flattenConcat() .via(Flow[ResponseRenderingOutput].transform(() ⇒ errorLogger(log, "Outgoing response stream error")).named("errorLogger")) BidiFlow.fromGraph(FlowGraph.create(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ⇒ ()) { implicit b ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala index 97887d5a7f..707db8a92b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/MessageToFrameRenderer.scala @@ -5,7 +5,7 @@ package akka.http.impl.engine.ws import akka.util.ByteString -import akka.stream.scaladsl.{ FlattenStrategy, Source, Flow } +import akka.stream.scaladsl.{ Source, Flow } import Protocol.Opcode import akka.http.scaladsl.model.ws._ @@ -32,6 +32,6 @@ private[http] object MessageToFrameRenderer { case bm: BinaryMessage ⇒ streamedFrames(Opcode.Binary, bm.dataStream) case TextMessage.Strict(text) ⇒ strictFrames(Opcode.Text, ByteString(text, "UTF-8")) case tm: TextMessage ⇒ streamedFrames(Opcode.Text, tm.textStream.transform(() ⇒ new Utf8Encoder)) - }.flatten(FlattenStrategy.concat) + }.flattenConcat() } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index dc10cfea7f..c2acb0aae7 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -11,7 +11,7 @@ import language.higherKinds import java.nio.charset.Charset import java.util.concurrent.atomic.AtomicInteger import com.typesafe.config.Config -import akka.stream.scaladsl.{ FlattenStrategy, Flow, Source } +import akka.stream.scaladsl.{ Flow, Source } import akka.stream.stage._ import scala.concurrent.duration.Duration import scala.concurrent.{ Await, Future } @@ -54,7 +54,7 @@ package object util { .filter(_._1.nonEmpty) .map { case (prefix, tail) ⇒ (prefix.head, tail) } } - .flatten(FlattenStrategy.concat) + .flattenConcat() private[http] def printEvent[T](marker: String): Flow[T, T, Unit] = Flow[T].transform(() ⇒ new PushPullStage[T, T] { diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala index 530546a8e6..a58d8e13df 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/Multipart.scala @@ -15,7 +15,7 @@ import scala.collection.immutable import scala.util.{ Failure, Success, Try } import akka.stream.Materializer import akka.stream.io.SynchronousFileSource -import akka.stream.scaladsl.{ FlattenStrategy, Source } +import akka.stream.scaladsl.{ Source } import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.model.headers._ import akka.http.impl.engine.rendering.BodyPartRenderer @@ -40,7 +40,7 @@ sealed trait Multipart { val chunks = parts .transform(() ⇒ BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)) - .flatten(FlattenStrategy.concat) + .flattenConcat() HttpEntity.Chunked(mediaType withBoundary boundary, chunks) } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala index 2640c995d0..ad2ed0f687 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/parsing/RequestParserSpec.scala @@ -19,7 +19,7 @@ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.util.FastFuture._ import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ FlattenStrategy, _ } +import akka.stream.scaladsl._ import akka.util.ByteString import com.typesafe.config.{ Config, ConfigFactory } import org.scalatest.matchers.Matcher @@ -539,7 +539,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { } } } - .flatten(FlattenStrategy.concat) + .flattenConcat() .map(strictEqualify) .grouped(100000).runWith(Sink.head) .awaitResult(awaitAtMost) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala index 15510414fd..1fa849f442 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FlattenTest.scala @@ -3,7 +3,6 @@ */ package akka.stream.tck -import akka.stream.scaladsl.FlattenStrategy import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import org.reactivestreams.Publisher @@ -13,7 +12,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] { def createPublisher(elements: Long): Publisher[Int] = { val s1 = Source(iterable(elements / 2)) val s2 = Source(iterable((elements + 1) / 2)) - Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher) + Source(List(s1, s2)).flattenConcat().runWith(Sink.publisher) } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index db6c3d33e4..4ad1b7cffc 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -484,7 +484,7 @@ public class FlowTest extends StreamTest { mainInputs.add(Source.from(input2)); final Flow, List, BoxedUnit> flow = Flow.>create(). - flatten(akka.stream.javadsl.FlattenStrategy. concat()).grouped(6); + flattenConcat().grouped(6); Future> future = Source.from(mainInputs).via(flow) .runWith(Sink.>head(), materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 1f70e19d3a..80b3c301a3 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -351,7 +351,7 @@ public class SourceTest extends StreamTest { mainInputs.add(Source.from(input2)); Future> future = Source.from(mainInputs) - .flatten(akka.stream.javadsl.FlattenStrategy.concat()).grouped(6) + .flattenConcat().grouped(6) .runWith(Sink.>head(), materializer); List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index 9aa23f3a8b..9789071563 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -31,7 +31,7 @@ class FlowConcatAllSpec extends AkkaSpec { val main = Source(List(s1, s2, s3, s4, s5)) val subscriber = TestSubscriber.manualProbe[Int]() - main.flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() + main.flattenConcat().to(Sink(subscriber)).run() val subscription = subscriber.expectSubscription() subscription.request(10) for (i ← 1 to 10) @@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec { "work together with SplitWhen" in { val subscriber = TestSubscriber.manualProbe[Int]() - Source(1 to 10).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber)) + Source(1 to 10).splitWhen(_ % 2 == 0).flattenConcat().runWith(Sink(subscriber)) val subscription = subscriber.expectSubscription() subscription.request(10) for (i ← (1 to 10)) @@ -54,7 +54,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() + Source(publisher).flattenConcat().to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -74,7 +74,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() + Source(publisher).flattenConcat().to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -97,7 +97,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() + Source(publisher).flattenConcat().to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -117,7 +117,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() + Source(publisher).flattenConcat().to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -138,7 +138,7 @@ class FlowConcatAllSpec extends AkkaSpec { "on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped { val publisher = TestPublisher.manualProbe[Source[Int, _]]() val subscriber = TestSubscriber.manualProbe[Int]() - Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() + Source(publisher).flattenConcat().to(Sink(subscriber)).run() val upstream = publisher.expectSubscription() val downstream = subscriber.expectSubscription() @@ -162,7 +162,7 @@ class FlowConcatAllSpec extends AkkaSpec { val up = TestPublisher.manualProbe[Source[Int, _]]() val down = TestSubscriber.manualProbe[Int]() - val flowSubscriber = Source.subscriber[Source[Int, _]].flatten(FlattenStrategy.concat).to(Sink(down)).run() + val flowSubscriber = Source.subscriber[Source[Int, _]].flattenConcat().to(Sink(down)).run() val downstream = down.expectSubscription() downstream.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala deleted file mode 100644 index 5bcd43588c..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlattenStrategy.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl - -import akka.stream.scaladsl - -/** - * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. - */ -abstract class FlattenStrategy[-S, T] extends scaladsl.FlattenStrategy[S, T] - -object FlattenStrategy { - - /** - * Strategy that flattens a stream of streams by concatenating them. This means taking an incoming stream - * emitting its elements directly to the output until it completes and then taking the next stream. This has the - * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. - */ - def concat[T, U]: FlattenStrategy[Source[T, U], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, U], T]] - /** - * INTERNAL API - */ - private[akka] final case object Concat extends FlattenStrategy[Any, Nothing] -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 1481f1a291..701d05e91b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -839,9 +839,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. - * This operation can be used on a stream of element type [[Source]]. - * - * '''Emits when''' (Concat) the current consumed substream has an element available + * This operation can be used on a stream of element type `Source[U]`. + * '''Emits when''' a currently consumed substream has an element available * * '''Backpressures when''' downstream backpressures * @@ -850,8 +849,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Cancels when''' downstream cancels * */ - def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] = - new Flow(delegate.flatten(strategy)) + def flattenConcat[U](): javadsl.Flow[In, U, Mat] = + new Flow(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]])) /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index dba1b1c144..924bcbd799 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -841,10 +841,10 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap /** * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. - * This operation can be used on a stream of element type [[Source]]. + * This operation can be used on a stream of element type `Source[U]`. */ - def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = - new Source(delegate.flatten(strategy)) + def flattenConcat[U](): javadsl.Source[U, Mat] = + new Source(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]])) /** * If the first element has not passed through this stage before the provided timeout, the stream is failed diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala deleted file mode 100644 index 8ff8a1ca67..0000000000 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlattenStrategy.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.stream.scaladsl - -/** - * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. - */ -abstract class FlattenStrategy[-S, +T] - -object FlattenStrategy { - - /** - * Strategy that flattens a stream of streams by concatenating them. This means taking an incoming stream - * emitting its elements directly to the output until it completes and then taking the next stream. This has the - * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. - */ - def concat[T]: FlattenStrategy[Source[T, Any], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, Any], T]] - - private[akka] final case object Concat extends FlattenStrategy[Any, Nothing] -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 64babe0375..9fe04c863e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1000,10 +1000,9 @@ trait FlowOps[+Out, +Mat] { } /** - * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. - * This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]]. + * Flattens a stream of [[Source]]s into a contiguous stream by fully consuming one stream after the other. * - * '''Emits when''' (Concat) the current consumed substream has an element available + * '''Emits when''' a currently consumed substream has an element available * * '''Backpressures when''' downstream backpressures * @@ -1012,11 +1011,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ - def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { - case scaladsl.FlattenStrategy.Concat | javadsl.FlattenStrategy.Concat ⇒ deprecatedAndThen(ConcatAll()) - case _ ⇒ - throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") - } + def flattenConcat[U]()(implicit ev: Out <:< Source[U, _]): Repr[U, Mat] = deprecatedAndThen(ConcatAll()) /** * If the first element has not passed through this stage before the provided timeout, the stream is failed