-str - #18806 - Replacing flatten(FlattenStrategy) with flattenConcat

This commit is contained in:
Viktor Klang 2015-10-29 16:55:03 +01:00
parent 06ce968b16
commit ce10456804
15 changed files with 31 additions and 84 deletions

View file

@ -61,7 +61,7 @@ private[http] object OutgoingConnectionBlueprint {
val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest] val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest]
.map(RequestRenderingContext(_, hostHeader)) .map(RequestRenderingContext(_, hostHeader))
.via(Flow[RequestRenderingContext].map(requestRendererFactory.renderToSource).named("renderer")) .via(Flow[RequestRenderingContext].map(requestRendererFactory.renderToSource).named("renderer"))
.flatten(FlattenStrategy.concat) .flattenConcat()
val methodBypass = Flow[HttpRequest].map(_.method) val methodBypass = Flow[HttpRequest].map(_.method)

View file

@ -108,7 +108,7 @@ private[http] object HttpServerBluePrint {
Flow[ResponseRenderingContext] Flow[ResponseRenderingContext]
.via(Flow[ResponseRenderingContext].transform(() new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed .via(Flow[ResponseRenderingContext].transform(() new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed
.via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer")) .via(Flow[ResponseRenderingContext].transform(() responseRendererFactory.newRenderer).named("renderer"))
.flatten(FlattenStrategy.concat) .flattenConcat()
.via(Flow[ResponseRenderingOutput].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger")) .via(Flow[ResponseRenderingOutput].transform(() errorLogger(log, "Outgoing response stream error")).named("errorLogger"))
BidiFlow.fromGraph(FlowGraph.create(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ()) { implicit b BidiFlow.fromGraph(FlowGraph.create(requestParsingFlow, rendererPipeline, oneHundredContinueSource)((_, _, _) ()) { implicit b

View file

@ -5,7 +5,7 @@
package akka.http.impl.engine.ws package akka.http.impl.engine.ws
import akka.util.ByteString import akka.util.ByteString
import akka.stream.scaladsl.{ FlattenStrategy, Source, Flow } import akka.stream.scaladsl.{ Source, Flow }
import Protocol.Opcode import Protocol.Opcode
import akka.http.scaladsl.model.ws._ import akka.http.scaladsl.model.ws._
@ -32,6 +32,6 @@ private[http] object MessageToFrameRenderer {
case bm: BinaryMessage streamedFrames(Opcode.Binary, bm.dataStream) case bm: BinaryMessage streamedFrames(Opcode.Binary, bm.dataStream)
case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8")) case TextMessage.Strict(text) strictFrames(Opcode.Text, ByteString(text, "UTF-8"))
case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.transform(() new Utf8Encoder)) case tm: TextMessage streamedFrames(Opcode.Text, tm.textStream.transform(() new Utf8Encoder))
}.flatten(FlattenStrategy.concat) }.flattenConcat()
} }
} }

View file

@ -11,7 +11,7 @@ import language.higherKinds
import java.nio.charset.Charset import java.nio.charset.Charset
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.stream.scaladsl.{ FlattenStrategy, Flow, Source } import akka.stream.scaladsl.{ Flow, Source }
import akka.stream.stage._ import akka.stream.stage._
import scala.concurrent.duration.Duration import scala.concurrent.duration.Duration
import scala.concurrent.{ Await, Future } import scala.concurrent.{ Await, Future }
@ -54,7 +54,7 @@ package object util {
.filter(_._1.nonEmpty) .filter(_._1.nonEmpty)
.map { case (prefix, tail) (prefix.head, tail) } .map { case (prefix, tail) (prefix.head, tail) }
} }
.flatten(FlattenStrategy.concat) .flattenConcat()
private[http] def printEvent[T](marker: String): Flow[T, T, Unit] = private[http] def printEvent[T](marker: String): Flow[T, T, Unit] =
Flow[T].transform(() new PushPullStage[T, T] { Flow[T].transform(() new PushPullStage[T, T] {

View file

@ -15,7 +15,7 @@ import scala.collection.immutable
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import akka.stream.Materializer import akka.stream.Materializer
import akka.stream.io.SynchronousFileSource import akka.stream.io.SynchronousFileSource
import akka.stream.scaladsl.{ FlattenStrategy, Source } import akka.stream.scaladsl.{ Source }
import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.util.FastFuture
import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.model.headers._
import akka.http.impl.engine.rendering.BodyPartRenderer import akka.http.impl.engine.rendering.BodyPartRenderer
@ -40,7 +40,7 @@ sealed trait Multipart {
val chunks = val chunks =
parts parts
.transform(() BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)) .transform(() BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log))
.flatten(FlattenStrategy.concat) .flattenConcat()
HttpEntity.Chunked(mediaType withBoundary boundary, chunks) HttpEntity.Chunked(mediaType withBoundary boundary, chunks)
} }
} }

View file

@ -19,7 +19,7 @@ import akka.http.scaladsl.model.headers._
import akka.http.scaladsl.util.FastFuture import akka.http.scaladsl.util.FastFuture
import akka.http.scaladsl.util.FastFuture._ import akka.http.scaladsl.util.FastFuture._
import akka.stream.ActorMaterializer import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FlattenStrategy, _ } import akka.stream.scaladsl._
import akka.util.ByteString import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.matchers.Matcher import org.scalatest.matchers.Matcher
@ -539,7 +539,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll {
} }
} }
} }
.flatten(FlattenStrategy.concat) .flattenConcat()
.map(strictEqualify) .map(strictEqualify)
.grouped(100000).runWith(Sink.head) .grouped(100000).runWith(Sink.head)
.awaitResult(awaitAtMost) .awaitResult(awaitAtMost)

View file

@ -3,7 +3,6 @@
*/ */
package akka.stream.tck package akka.stream.tck
import akka.stream.scaladsl.FlattenStrategy
import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
@ -13,7 +12,7 @@ class FlattenTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = { def createPublisher(elements: Long): Publisher[Int] = {
val s1 = Source(iterable(elements / 2)) val s1 = Source(iterable(elements / 2))
val s2 = Source(iterable((elements + 1) / 2)) val s2 = Source(iterable((elements + 1) / 2))
Source(List(s1, s2)).flatten(FlattenStrategy.concat).runWith(Sink.publisher) Source(List(s1, s2)).flattenConcat().runWith(Sink.publisher)
} }
} }

View file

@ -484,7 +484,7 @@ public class FlowTest extends StreamTest {
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
final Flow<Source<Integer, BoxedUnit>, List<Integer>, BoxedUnit> flow = Flow.<Source<Integer, BoxedUnit>>create(). final Flow<Source<Integer, BoxedUnit>, List<Integer>, BoxedUnit> flow = Flow.<Source<Integer, BoxedUnit>>create().
flatten(akka.stream.javadsl.FlattenStrategy.<Integer, BoxedUnit> concat()).grouped(6); <Integer>flattenConcat().grouped(6);
Future<List<Integer>> future = Source.from(mainInputs).via(flow) Future<List<Integer>> future = Source.from(mainInputs).via(flow)
.runWith(Sink.<List<Integer>>head(), materializer); .runWith(Sink.<List<Integer>>head(), materializer);

View file

@ -351,7 +351,7 @@ public class SourceTest extends StreamTest {
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
Future<List<Integer>> future = Source.from(mainInputs) Future<List<Integer>> future = Source.from(mainInputs)
.flatten(akka.stream.javadsl.FlattenStrategy.<Integer, BoxedUnit>concat()).grouped(6) .<Integer>flattenConcat().grouped(6)
.runWith(Sink.<List<Integer>>head(), materializer); .runWith(Sink.<List<Integer>>head(), materializer);
List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));

View file

@ -31,7 +31,7 @@ class FlowConcatAllSpec extends AkkaSpec {
val main = Source(List(s1, s2, s3, s4, s5)) val main = Source(List(s1, s2, s3, s4, s5))
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
main.flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() main.flattenConcat().to(Sink(subscriber)).run()
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
for (i 1 to 10) for (i 1 to 10)
@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"work together with SplitWhen" in { "work together with SplitWhen" in {
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(1 to 10).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber)) Source(1 to 10).splitWhen(_ % 2 == 0).flattenConcat().runWith(Sink(subscriber))
val subscription = subscriber.expectSubscription() val subscription = subscriber.expectSubscription()
subscription.request(10) subscription.request(10)
for (i (1 to 10)) for (i (1 to 10))
@ -54,7 +54,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped { "on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() Source(publisher).flattenConcat().to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -74,7 +74,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped { "on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() Source(publisher).flattenConcat().to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -97,7 +97,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped { "on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() Source(publisher).flattenConcat().to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -117,7 +117,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped { "on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() Source(publisher).flattenConcat().to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -138,7 +138,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped { "on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped {
val publisher = TestPublisher.manualProbe[Source[Int, _]]() val publisher = TestPublisher.manualProbe[Source[Int, _]]()
val subscriber = TestSubscriber.manualProbe[Int]() val subscriber = TestSubscriber.manualProbe[Int]()
Source(publisher).flatten(FlattenStrategy.concat).to(Sink(subscriber)).run() Source(publisher).flattenConcat().to(Sink(subscriber)).run()
val upstream = publisher.expectSubscription() val upstream = publisher.expectSubscription()
val downstream = subscriber.expectSubscription() val downstream = subscriber.expectSubscription()
@ -162,7 +162,7 @@ class FlowConcatAllSpec extends AkkaSpec {
val up = TestPublisher.manualProbe[Source[Int, _]]() val up = TestPublisher.manualProbe[Source[Int, _]]()
val down = TestSubscriber.manualProbe[Int]() val down = TestSubscriber.manualProbe[Int]()
val flowSubscriber = Source.subscriber[Source[Int, _]].flatten(FlattenStrategy.concat).to(Sink(down)).run() val flowSubscriber = Source.subscriber[Source[Int, _]].flattenConcat().to(Sink(down)).run()
val downstream = down.expectSubscription() val downstream = down.expectSubscription()
downstream.cancel() downstream.cancel()

View file

@ -1,25 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl
import akka.stream.scaladsl
/**
* Strategy that defines how a stream of streams should be flattened into a stream of simple elements.
*/
abstract class FlattenStrategy[-S, T] extends scaladsl.FlattenStrategy[S, T]
object FlattenStrategy {
/**
* Strategy that flattens a stream of streams by concatenating them. This means taking an incoming stream
* emitting its elements directly to the output until it completes and then taking the next stream. This has the
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
*/
def concat[T, U]: FlattenStrategy[Source[T, U], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, U], T]]
/**
* INTERNAL API
*/
private[akka] final case object Concat extends FlattenStrategy[Any, Nothing]
}

View file

@ -839,9 +839,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/** /**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Source]]. * This operation can be used on a stream of element type `Source[U]`.
* * '''Emits when''' a currently consumed substream has an element available
* '''Emits when''' (Concat) the current consumed substream has an element available
* *
* '''Backpressures when''' downstream backpressures * '''Backpressures when''' downstream backpressures
* *
@ -850,8 +849,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
*/ */
def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Flow[In, U, Mat] = def flattenConcat[U](): javadsl.Flow[In, U, Mat] =
new Flow(delegate.flatten(strategy)) new Flow(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]]))
/** /**
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this * Concatenate the given [[Source]] to this [[Flow]], meaning that once this

View file

@ -841,10 +841,10 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/** /**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Source]]. * This operation can be used on a stream of element type `Source[U]`.
*/ */
def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = def flattenConcat[U](): javadsl.Source[U, Mat] =
new Source(delegate.flatten(strategy)) new Source(delegate.flattenConcat[U]()(conforms[U].asInstanceOf[Out <:< scaladsl.Source[U, _]]))
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr)) new Source(delegate.withAttributes(attr))

View file

@ -1,21 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
/**
* Strategy that defines how a stream of streams should be flattened into a stream of simple elements.
*/
abstract class FlattenStrategy[-S, +T]
object FlattenStrategy {
/**
* Strategy that flattens a stream of streams by concatenating them. This means taking an incoming stream
* emitting its elements directly to the output until it completes and then taking the next stream. This has the
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
*/
def concat[T]: FlattenStrategy[Source[T, Any], T] = Concat.asInstanceOf[FlattenStrategy[Source[T, Any], T]]
private[akka] final case object Concat extends FlattenStrategy[Any, Nothing]
}

View file

@ -1001,10 +1001,9 @@ trait FlowOps[+Out, +Mat] {
} }
/** /**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. * Flattens a stream of [[Source]]s into a contiguous stream by fully consuming one stream after the other.
* This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]].
* *
* '''Emits when''' (Concat) the current consumed substream has an element available * '''Emits when''' a currently consumed substream has an element available
* *
* '''Backpressures when''' downstream backpressures * '''Backpressures when''' downstream backpressures
* *
@ -1013,11 +1012,7 @@ trait FlowOps[+Out, +Mat] {
* '''Cancels when''' downstream cancels * '''Cancels when''' downstream cancels
* *
*/ */
def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { def flattenConcat[U]()(implicit ev: Out <:< Source[U, _]): Repr[U, Mat] = deprecatedAndThen(ConcatAll())
case scaladsl.FlattenStrategy.Concat | javadsl.FlattenStrategy.Concat deprecatedAndThen(ConcatAll())
case _
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
}
/** /**
* Logs elements flowing through the stream as well as completion and erroring. * Logs elements flowing through the stream as well as completion and erroring.