diff --git a/akka-docs-dev/rst/java/stream-error.rst b/akka-docs-dev/rst/java/stream-error.rst index d93599a2a1..851210148d 100644 --- a/akka-docs-dev/rst/java/stream-error.rst +++ b/akka-docs-dev/rst/java/stream-error.rst @@ -37,7 +37,7 @@ elements that cause the division by zero are effectively dropped. Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-java`. -The supervision strategy can also be defined for a section of flow operators. +The supervision strategy can also be defined for all operators of a flow. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/FlowErrorDocTest.java#resume-section diff --git a/akka-docs-dev/rst/java/stream-quickstart.rst b/akka-docs-dev/rst/java/stream-quickstart.rst index 324810499d..f7dc5b36f6 100644 --- a/akka-docs-dev/rst/java/stream-quickstart.rst +++ b/akka-docs-dev/rst/java/stream-quickstart.rst @@ -26,7 +26,7 @@ which will be responsible for materializing and running the streams we are about The :class:`ActorFlowMaterializer` can optionally take :class:`ActorFlowMaterializerSettings` which can be used to define materialization properties, such as default buffer sizes (see also :ref:`stream-buffers-java`), the dispatcher to -be used by the pipeline etc. These can be overridden on an element-by-element basis or for an entire section, but this +be used by the pipeline etc. These can be overridden on a flow, source and sink, but this will be discussed in depth in :ref:`stream-section-configuration`. Let's assume we have a stream of tweets readily available, in Akka this is expressed as a :class:`Source`: diff --git a/akka-docs-dev/rst/java/stream-rate.rst b/akka-docs-dev/rst/java/stream-rate.rst index 914c2d4c7a..86e71cb54c 100644 --- a/akka-docs-dev/rst/java/stream-rate.rst +++ b/akka-docs-dev/rst/java/stream-rate.rst @@ -66,7 +66,8 @@ Alternatively they can be set by passing a :class:`ActorFlowMaterializerSettings .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#materializer-buffer -If buffer size needs to be set for segments of a Flow only, it is possible by defining a ``section()``: +If buffer size needs to be set for segments of a :class:`Flow` only, it is possible by defining a separate +:class:`Flow` with these attributes: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamBuffersRateDocTest.java#section-buffer diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala index c4ce15e0c9..0b5dbfd985 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala @@ -50,9 +50,11 @@ class FlowErrorDocSpec extends AkkaSpec { case _: ArithmeticException => Supervision.Resume case _ => Supervision.Stop } - val source = Source(0 to 5).section(OperationAttributes.supervisionStrategy(decider)) { - _.filter(100 / _ < 50).map(elem => 100 / (5 - elem)) - } + val flow = Flow[Int] + .filter(100 / _ < 50).map(elem => 100 / (5 - elem)) + .withAttributes(OperationAttributes.supervisionStrategy(decider)) + val source = Source(0 to 5).via(flow) + val result = source.runWith(Sink.fold(0)(_ + _)) // the elements causing division by zero will be dropped // result here will be a Future completed with Success(150) @@ -68,13 +70,13 @@ class FlowErrorDocSpec extends AkkaSpec { case _: IllegalArgumentException => Supervision.Restart case _ => Supervision.Stop } - val source = Source(List(1, 3, -1, 5, 7)).section( - OperationAttributes.supervisionStrategy(decider)) { - _.scan(0) { (acc, elem) => - if (elem < 0) throw new IllegalArgumentException("negative not allowed") - else acc + elem - } + val flow = Flow[Int] + .scan(0) { (acc, elem) => + if (elem < 0) throw new IllegalArgumentException("negative not allowed") + else acc + elem } + .withAttributes(OperationAttributes.supervisionStrategy(decider)) + val source = Source(List(1, 3, -1, 5, 7)).via(flow) val result = source.grouped(1000).runWith(Sink.head) // the negative element cause the scan stage to be restarted, // i.e. start from 0 again diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala index d0657cee1c..9d6e0f307f 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -19,6 +19,7 @@ import scala.concurrent.ExecutionContext import akka.stream.ActorFlowMaterializerSettings import java.util.concurrent.atomic.AtomicInteger import akka.stream.Supervision +import akka.stream.scaladsl.Flow object IntegrationDocSpec { import TwitterStreamQuickstartDocSpec._ @@ -173,9 +174,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { import Supervision.resumingDecider val emailAddresses: Source[String, Unit] = - authors.section(supervisionStrategy(resumingDecider)) { - _.mapAsync(4, author => addressSystem.lookupEmail(author.handle)) - } + authors.via( + Flow[Author].mapAsync(4, author => addressSystem.lookupEmail(author.handle)) + .withAttributes(supervisionStrategy(resumingDecider))) //#email-addresses-mapAsync-supervision } @@ -263,15 +264,13 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { .collect { case Some(phoneNo) => phoneNo } //#blocking-map + val send = Flow[String] + .map { phoneNo => + smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet")) + } + .withAttributes(OperationAttributes.dispatcher("blocking-dispatcher")) val sendTextMessages: RunnableFlow[Unit] = - phoneNumbers - .section(OperationAttributes.dispatcher("blocking-dispatcher")) { - _.map { phoneNo => - smsServer.send( - TextMessage(to = phoneNo, body = "I like your tweet")) - } - } - .to(Sink.ignore) + phoneNumbers.via(send).to(Sink.ignore) sendTextMessages.run() //#blocking-map diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala index 8221c2e3db..60dfdc9c7b 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala @@ -28,13 +28,9 @@ class StreamBuffersRateSpec extends AkkaSpec { //#materializer-buffer //#section-buffer - val flow = - Flow[Int] - .section(OperationAttributes.inputBuffer(initial = 1, max = 1)) { sectionFlow => - // the buffer size of this map is 1 - sectionFlow.map(_ * 2) - } - .map(_ / 2) // the buffer size of this map is the default + val section = Flow[Int].map(_ * 2) + .withAttributes(OperationAttributes.inputBuffer(initial = 1, max = 1)) + val flow = section.via(Flow[Int].map(_ / 2)) // the buffer size of this map is the default //#section-buffer } diff --git a/akka-docs-dev/rst/scala/stream-error.rst b/akka-docs-dev/rst/scala/stream-error.rst index 38cbcc6424..4b4edee819 100644 --- a/akka-docs-dev/rst/scala/stream-error.rst +++ b/akka-docs-dev/rst/scala/stream-error.rst @@ -37,7 +37,7 @@ elements that cause the division by zero are effectively dropped. Be aware that dropping elements may result in deadlocks in graphs with cycles, as explained in :ref:`graph-cycles-scala`. -The supervision strategy can also be defined for a section of flow operators. +The supervision strategy can also be defined for all operators of a flow. .. includecode:: code/docs/stream/FlowErrorDocSpec.scala#resume-section diff --git a/akka-docs-dev/rst/scala/stream-rate.rst b/akka-docs-dev/rst/scala/stream-rate.rst index 02a9a4197e..ebf54583eb 100644 --- a/akka-docs-dev/rst/scala/stream-rate.rst +++ b/akka-docs-dev/rst/scala/stream-rate.rst @@ -66,7 +66,8 @@ Alternatively they can be set by passing a :class:`ActorFlowMaterializerSettings .. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#materializer-buffer -If buffer size needs to be set for segments of a Flow only, it is possible by defining a ``section()``: +If buffer size needs to be set for segments of a :class:`Flow` only, it is possible by defining a separate +:class:`Flow` with these attributes: .. includecode:: code/docs/stream/StreamBuffersRateSpec.scala#section-buffer diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala index 2783fc44cf..138f2c41b4 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala @@ -60,7 +60,7 @@ private[http] object OutgoingConnectionBlueprint { val requestRendering: Flow[HttpRequest, ByteString, Unit] = Flow[HttpRequest] .map(RequestRenderingContext(_, remoteAddress)) - .section(name("renderer"))(_.transform(() ⇒ requestRendererFactory.newRenderer)) + .via(Flow[RequestRenderingContext].transform(() ⇒ requestRendererFactory.newRenderer).named("renderer")) .flatten(FlattenStrategy.concat) val methodBypass = Flow[HttpRequest].map(_.method) @@ -85,8 +85,8 @@ private[http] object OutgoingConnectionBlueprint { val terminationFanout = b.add(Broadcast[HttpResponse](2)) val terminationMerge = b.add(new TerminationMerge) - val bytesOut = (terminationMerge.out ~> - requestRendering.section(name("errorLogger"))(_.transform(() ⇒ errorLogger(log, "Outgoing request stream error")))).outlet + val logger = Flow[ByteString].transform(() ⇒ errorLogger(log, "Outgoing request stream error")).named("errorLogger") + val bytesOut = (terminationMerge.out ~> requestRendering.via(logger)).outlet val bytesIn = responseParsingMerge.in0 diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala index 858e65e2cc..dc11882011 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala @@ -9,6 +9,7 @@ import scala.annotation.tailrec import akka.actor.ActorRef import akka.stream.scaladsl.OperationAttributes._ import akka.stream.stage.{ Context, PushPullStage } +import akka.stream.scaladsl.Flow import akka.stream.scaladsl.{ Keep, Source } import akka.util.ByteString import akka.http.model.parser.CharacterClasses @@ -128,7 +129,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, def expect100continueHandling[T, Mat]: Source[T, Mat] ⇒ Source[T, Mat] = if (expect100continue) { - _.section(name("expect100continueTrigger"))(_.transform(() ⇒ new PushPullStage[T, T] { + _.via(Flow[T].transform(() ⇒ new PushPullStage[T, T] { private var oneHundredContinueSent = false def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) def onPull(ctx: Context[T]) = { @@ -139,7 +140,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, } ctx.pull() } - })) + }).named("expect100continueTrigger")) } else identityFunc teh match { @@ -176,4 +177,4 @@ private[http] class HttpRequestParser(_settings: ParserSettings, expect100continue, hostHeaderPresent, closeAfterResponseCompletion) } } else failMessageStart("Request is missing required `Host` header") -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala index b3ef27450d..5da3fb7d2d 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala @@ -114,12 +114,11 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.` case HttpEntity.Default(_, contentLength, data) ⇒ renderContentLength(contentLength) ~~ CrLf - renderByteStrings(r, - data.section(name("checkContentLength"))(_.transform(() ⇒ new CheckContentLengthTransformer(contentLength)))) + renderByteStrings(r, data.via(CheckContentLengthTransformer.flow(contentLength))) case HttpEntity.Chunked(_, chunks) ⇒ r ~~ CrLf - renderByteStrings(r, chunks.section(name("chunkTransform"))(_.transform(() ⇒ new ChunkTransformer))) + renderByteStrings(r, chunks.via(ChunkTransformer.flow)) } renderRequestLine() diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala index fd1edc4738..fe73f42f93 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala @@ -172,7 +172,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser renderHeaders(headers.toList) renderEntityContentType(r, entity) renderContentLengthHeader(contentLength) ~~ CrLf - byteStrings(data.section(name("checkContentLength"))(_.transform(() ⇒ new CheckContentLengthTransformer(contentLength)))) + byteStrings(data.via(CheckContentLengthTransformer.flow(contentLength))) case HttpEntity.CloseDelimited(_, data) ⇒ renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD) @@ -185,7 +185,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser else { renderHeaders(headers.toList) renderEntityContentType(r, entity) ~~ CrLf - byteStrings(chunks.section(name("renderChunks"))(_.transform(() ⇒ new ChunkTransformer))) + byteStrings(chunks.via(ChunkTransformer.flow)) } } diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala index 0a878b64fb..f8fe5003c4 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/RenderSupport.scala @@ -12,6 +12,7 @@ import akka.stream.stage._ import akka.http.model._ import akka.http.util._ import org.reactivestreams.Subscriber +import akka.http.model.HttpEntity.ChunkStreamPart /** * INTERNAL API @@ -51,6 +52,10 @@ private object RenderSupport { messageBytes } + object ChunkTransformer { + val flow = Flow[ChunkStreamPart].transform(() ⇒ new ChunkTransformer).named("renderChunks") + } + class ChunkTransformer extends StatefulStage[HttpEntity.ChunkStreamPart, ByteString] { var lastChunkSeen = false @@ -67,6 +72,11 @@ private object RenderSupport { else terminationEmit(Iterator.single(defaultLastChunkBytes), ctx) } + object CheckContentLengthTransformer { + def flow(contentLength: Long) = Flow[ByteString].transform(() ⇒ + new CheckContentLengthTransformer(contentLength)).named("checkContentLength") + } + class CheckContentLengthTransformer(length: Long) extends PushStage[ByteString, ByteString] { var sent = 0L diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala index 93a777a105..0cd5a614eb 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala @@ -48,10 +48,10 @@ private[http] object HttpServerBluePrint { } } - val requestParsingFlow = Flow[ByteString].section(name("rootParser"))(_.transform(() ⇒ + val requestParsingFlow = Flow[ByteString].transform(() ⇒ // each connection uses a single (private) request parser instance for all its requests // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(() ⇒ oneHundredContinueRef).stage)) + rootParser.createShallowCopy(() ⇒ oneHundredContinueRef).stage).named("rootParser") val requestPreparation = Flow[RequestOutput] @@ -79,10 +79,10 @@ private[http] object HttpServerBluePrint { val rendererPipeline = Flow[ResponseRenderingContext] - .section(name("recover"))(_.transform(() ⇒ new ErrorsTo500ResponseRecovery(log))) // FIXME: simplify after #16394 is closed - .section(name("renderer"))(_.transform(() ⇒ responseRendererFactory.newRenderer)) + .via(Flow[ResponseRenderingContext].transform(() ⇒ new ErrorsTo500ResponseRecovery(log)).named("recover")) // FIXME: simplify after #16394 is closed + .via(Flow[ResponseRenderingContext].transform(() ⇒ responseRendererFactory.newRenderer).named("renderer")) .flatten(FlattenStrategy.concat) - .section(name("errorLogger"))(_.transform(() ⇒ errorLogger(log, "Outgoing response stream error"))) + .via(Flow[ByteString].transform(() ⇒ errorLogger(log, "Outgoing response stream error")).named("errorLogger")) FlowGraph.partial(requestParsingFlow, rendererPipeline)(Keep.right) { implicit b ⇒ (requestParsing, renderer) ⇒ diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index fb0a45dd9c..dbc4072b37 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -61,7 +61,7 @@ sealed trait HttpEntity extends japi.HttpEntity { } // TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393 - dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head) + dataBytes.via(Flow[ByteString].timerTransform(transformer).named("toStrict")).runWith(Sink.head) } /** diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 9c37338445..369b49e52f 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -50,7 +50,7 @@ private[http] object StreamUtils { ctx.fail(f(cause)) } - Flow[ByteString].section(name("transformError"))(_.transform(() ⇒ transformer)) + Flow[ByteString].transform(() ⇒ transformer).named("transformError") } def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString, Unit] = { @@ -84,7 +84,7 @@ private[http] object StreamUtils { override def initial: State = if (start > 0) skipping else taking(length) } - Flow[ByteString].section(name("sliceBytes"))(_.transform(() ⇒ transformer)) + Flow[ByteString].transform(() ⇒ transformer).named("sliceBytes") } def limitByteChunksStage(maxBytesPerChunk: Int): PushPullStage[ByteString, ByteString] = diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 00638f5ce7..7b4e9cb506 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -443,7 +443,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] = Source(input.toList) .map(ByteString.apply) - .section(name("parser"))(_.transform(() ⇒ parser.stage)) + .transform(() ⇒ parser.stage).named("parser") .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .headAndTail .collect { diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 40f96f7948..859ed8ea58 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -261,7 +261,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val future = Source(input.toList) .map(ByteString.apply) - .section(name("parser"))(_.transform(() ⇒ newParserStage(requestMethod))) + .transform(() ⇒ newParserStage(requestMethod)).named("parser") .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .headAndTail .collect { diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala index d2966e2da8..ad8b029c46 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala @@ -252,9 +252,9 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll def renderTo(expected: String): Matcher[HttpRequest] = equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒ val renderer = newRenderer - val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, serverAddress)). - section(name("renderer"))(_.transform(() ⇒ renderer)). - runWith(Sink.head), 1.second) + val byteStringSource = Await.result(Source.single(RequestRenderingContext(request, serverAddress)) + .transform(() ⇒ renderer).named("renderer") + .runWith(Sink.head), 1.second) val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) } diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala index 36c0a2db07..6acf05d03e 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala @@ -548,9 +548,9 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll def renderTo(expected: String, close: Boolean): Matcher[ResponseRenderingContext] = equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx ⇒ val renderer = newRenderer - val byteStringSource = Await.result(Source.single(ctx). - section(name("renderer"))(_.transform(() ⇒ renderer)). - runWith(Sink.head), 1.second) + val byteStringSource = Await.result(Source.single(ctx) + .transform(() ⇒ renderer).named("renderer") + .runWith(Sink.head), 1.second) val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) -> renderer.isComplete } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 7326625b1d..94de608d4b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -15,21 +15,17 @@ import akka.stream.stage.*; import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; - import akka.testkit.TestProbe; + import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; - import java.util.*; import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; public class FlowGraphTest extends StreamTest { @@ -41,6 +37,7 @@ public class FlowGraphTest extends StreamTest { public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowGraphTest", AkkaSpec.testConf()); + @SuppressWarnings("serial") public Creator> op() { return new akka.stream.javadsl.japi.Creator>() { @Override @@ -62,39 +59,13 @@ public class FlowGraphTest extends StreamTest { @Test public void mustBeAbleToUseMerge() throws Exception { - final Flow f1 = Flow - .of(String.class) - .section( - OperationAttributes.name("f1"), - new Function, Flow>() { - @Override - public Flow apply( - Flow flow) { - return flow.transform(FlowGraphTest.this. op()); - } - }); - final Flow f2 = Flow - .of(String.class) - .section( - OperationAttributes.name("f2"), - new Function, Flow>() { - @Override - public Flow apply( - Flow flow) { - return flow.transform(FlowGraphTest.this. op()); - } - }); - final Flow f3 = Flow - .of(String.class) - .section( - OperationAttributes.name("f3"), - new Function, Flow>() { - @Override - public Flow apply( - Flow flow) { - return flow.transform(FlowGraphTest.this. op()); - } - }); + final Flow f1 = + Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f1"); + final Flow f2 = + Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f2"); + @SuppressWarnings("unused") + final Flow f3 = + Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f3"); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 7661eea392..06e67bccd8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -18,23 +18,19 @@ import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; import org.reactivestreams.Publisher; - import scala.runtime.Boxed; import scala.runtime.BoxedUnit; - import org.junit.ClassRule; import org.junit.Test; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.Duration; - import java.util.*; import java.util.concurrent.TimeUnit; - import static org.junit.Assert.assertEquals; +@SuppressWarnings("serial") public class FlowTest extends StreamTest { public FlowTest() { super(actorSystemResource); @@ -242,39 +238,13 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToUseMerge() throws Exception { - final Flow f1 = Flow - .of(String.class) - .section( - OperationAttributes.name("f1"), - new Function, Flow>() { - @Override - public Flow apply( - Flow flow) { - return flow.transform(FlowTest.this. op()); - } - }); - final Flow f2 = Flow - .of(String.class) - .section( - OperationAttributes.name("f2"), - new Function, Flow>() { - @Override - public Flow apply( - Flow flow) { - return flow.transform(FlowTest.this. op()); - } - }); - final Flow f3 = Flow - .of(String.class) - .section( - OperationAttributes.name("f3"), - new Function, Flow>() { - @Override - public Flow apply( - Flow flow) { - return flow.transform(FlowTest.this. op()); - } - }); + final Flow f1 = + Flow.of(String.class).transform(FlowTest.this. op()).named("f1"); + final Flow f2 = + Flow.of(String.class).transform(FlowTest.this. op()).named("f2"); + @SuppressWarnings("unused") + final Flow f3 = + Flow.of(String.class).transform(FlowTest.this. op()).named("f3"); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 081bd43bdf..17743cc706 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -14,7 +14,6 @@ import org.reactivestreams.Publisher; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Duration; - import akka.stream.StreamTest; import akka.stream.javadsl.japi.Function2; import akka.stream.testkit.AkkaSpec; diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index dd2f32c36f..2926bb3120 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +@SuppressWarnings("serial") public class SourceTest extends StreamTest { public SourceTest() { super(actorSystemResource); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 62eff0a7fc..35e0452d71 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -29,12 +29,12 @@ class FlowGraphCompileSpec extends AkkaSpec { val apples = () ⇒ Iterator.continually(new Apple) - val f1 = Flow[String].section(name("f1"))(_.transform(op[String, String])) - val f2 = Flow[String].section(name("f2"))(_.transform(op[String, String])) - val f3 = Flow[String].section(name("f3"))(_.transform(op[String, String])) - val f4 = Flow[String].section(name("f4"))(_.transform(op[String, String])) - val f5 = Flow[String].section(name("f5"))(_.transform(op[String, String])) - val f6 = Flow[String].section(name("f6"))(_.transform(op[String, String])) + val f1 = Flow[String].transform(op[String, String]).named("f1") + val f2 = Flow[String].transform(op[String, String]).named("f2") + val f3 = Flow[String].transform(op[String, String]).named("f3") + val f4 = Flow[String].transform(op[String, String]).named("f4") + val f5 = Flow[String].transform(op[String, String]).named("f5") + val f6 = Flow[String].transform(op[String, String]).named("f6") val in1 = Source(List("a", "b", "c")) val in2 = Source(List("d", "e", "f")) @@ -94,7 +94,7 @@ class FlowGraphCompileSpec extends AkkaSpec { }.run() } - /** + /* * in ---> f1 -+-> f2 -+-> f3 ---> b.add(out1) * ^ | * | V @@ -161,7 +161,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val out2 = Sink.publisher[String] val out9 = Sink.publisher[String] val out10 = Sink.publisher[String] - def f(s: String) = Flow[String].section(name(s))(_.transform(op[String, String])) + def f(s: String) = Flow[String].transform(op[String, String]).named(s) import FlowGraph.Implicits._ in7 ~> f("a") ~> b7 ~> f("b") ~> m11 ~> f("c") ~> b11 ~> f("d") ~> out2 diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 289b0745e0..31f4c5fc31 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -221,8 +221,9 @@ class FlowGroupBySpec extends AkkaSpec { "resume stream when groupBy function throws" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val exc = TE("test") - val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))( - _.groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2)) + val publisher = Source(publisherProbeProbe) + .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) + .withAttributes(OperationAttributes.supervisionStrategy(resumingDecider)) .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 51f6112851..96cd1d3808 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -158,10 +158,13 @@ class FlowMapAsyncSpec extends AkkaSpec { "resume after future failure" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(4, n ⇒ Future { - if (n == 3) throw new RuntimeException("err3") with NoStackTrace - else n - })).to(Sink(c)).run() + val p = Source(1 to 5) + .mapAsync(4, n ⇒ Future { + if (n == 3) throw new RuntimeException("err3") with NoStackTrace + else n + }) + .withAttributes(supervisionStrategy(resumingDecider)) + .to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) for (n ← List(1, 2, 4, 5)) c.expectNext(n) @@ -171,10 +174,12 @@ class FlowMapAsyncSpec extends AkkaSpec { "resume when mapAsync throws" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(4, n ⇒ - if (n == 3) throw new RuntimeException("err4") with NoStackTrace - else Future(n))). - to(Sink(c)).run() + val p = Source(1 to 5) + .mapAsync(4, n ⇒ + if (n == 3) throw new RuntimeException("err4") with NoStackTrace + else Future(n)) + .withAttributes(supervisionStrategy(resumingDecider)) + .to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) for (n ← List(1, 2, 4, 5)) c.expectNext(n) @@ -191,8 +196,9 @@ class FlowMapAsyncSpec extends AkkaSpec { "resume when future is completed with null" in { val c = StreamTestKit.SubscriberProbe[String]() - val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))( - _.mapAsync(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem))) + val p = Source(List("a", "b", "c")) + .mapAsync(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)) + .withAttributes(supervisionStrategy(resumingDecider)) .to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index 168611bdfa..3cbcdabcd6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -112,10 +112,13 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "resume after future failure" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(4, n ⇒ Future { - if (n == 3) throw new RuntimeException("err3") with NoStackTrace - else n - })).to(Sink(c)).run() + val p = Source(1 to 5) + .mapAsyncUnordered(4, n ⇒ Future { + if (n == 3) throw new RuntimeException("err3") with NoStackTrace + else n + }) + .withAttributes(supervisionStrategy(resumingDecider)) + .to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet @@ -125,10 +128,12 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "resume when mapAsyncUnordered throws" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(4, n ⇒ - if (n == 3) throw new RuntimeException("err4") with NoStackTrace - else Future(n))). - to(Sink(c)).run() + val p = Source(1 to 5) + .mapAsyncUnordered(4, n ⇒ + if (n == 3) throw new RuntimeException("err4") with NoStackTrace + else Future(n)) + .withAttributes(supervisionStrategy(resumingDecider)) + .to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet @@ -145,8 +150,9 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "resume when future is completed with null" in { val c = StreamTestKit.SubscriberProbe[String]() - val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))( - _.mapAsyncUnordered(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem))) + val p = Source(List("a", "b", "c")) + .mapAsyncUnordered(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)) + .withAttributes(supervisionStrategy(resumingDecider)) .to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala index 25f45131aa..9589467635 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala @@ -11,9 +11,9 @@ import akka.testkit.TestProbe object FlowSectionSpec { val config = - """ - my-dispatcher1 = ${akka.test.stream-dispatcher} - my-dispatcher2 = ${akka.test.stream-dispatcher} + s""" + my-dispatcher1 = $${akka.test.stream-dispatcher} + my-dispatcher2 = $${akka.test.stream-dispatcher} """ } @@ -24,7 +24,7 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { "A flow" can { "have an op with a different dispatcher" in { - val flow = Flow[Int].section(dispatcher("my-dispatcher1"))(_.map(sendThreadNameTo(testActor))) + val flow = Flow[Int].map(sendThreadNameTo(testActor)).withAttributes(dispatcher("my-dispatcher1")) Source.single(1).via(flow).to(Sink.ignore).run() @@ -32,7 +32,13 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { } "have a nested flow with a different dispatcher" in { - val flow = Flow[Int].section(dispatcher("my-dispatcher1"))(_.via(Flow[Int].map(sendThreadNameTo(testActor)))) + val flow = Flow() { implicit b ⇒ + import FlowGraph.Implicits._ + val bcast1 = b.add(Broadcast[Int](1)) + val bcast2 = b.add(Broadcast[Int](1)) + bcast1 ~> Flow[Int].map(sendThreadNameTo(testActor)) ~> bcast2.in + (bcast1.in, bcast2.out(0)) + }.withAttributes(dispatcher("my-dispatcher1")) Source.single(1).via(flow).to(Sink.ignore).run() @@ -40,50 +46,58 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { } "have multiple levels of nesting" in { - val flow = Flow[Int].section(dispatcher("my-dispatcher1"))( - _.via(Flow[Int].map(sendThreadNameTo(testActor)).section(dispatcher("my-dispatcher2"))( - _.via(Flow[Int].map(sendThreadNameTo(testActor)))))) - Source.single(1).via(flow).to(Sink.ignore).run() + val probe1 = TestProbe() + val probe2 = TestProbe() - expectMsgType[String] should include("my-dispatcher1") - expectMsgType[String] should include("my-dispatcher2") + val flow1 = Flow() { implicit b ⇒ + import FlowGraph.Implicits._ + val bcast1 = b.add(Broadcast[Int](1)) + val bcast2 = b.add(Broadcast[Int](1)) + bcast1 ~> Flow[Int].map(sendThreadNameTo(probe1.ref)) ~> bcast2.in + (bcast1.in, bcast2.out(0)) + }.withAttributes(dispatcher("my-dispatcher1")) + + val flow2 = Flow() { implicit b ⇒ + import FlowGraph.Implicits._ + val bcast1 = b.add(Broadcast[Int](1)) + val bcast2 = b.add(Broadcast[Int](1)) + bcast1 ~> flow1.via(Flow[Int].map(sendThreadNameTo(probe2.ref))) ~> bcast2.in + (bcast1.in, bcast2.out(0)) + }.withAttributes(dispatcher("my-dispatcher2")) + + Source.single(1).via(flow2).to(Sink.ignore).run() + + probe1.expectMsgType[String] should include("my-dispatcher1") + probe2.expectMsgType[String] should include("my-dispatcher2") } - "have an op section with a name" in { + "include name in toString" in { //FIXME: Flow has no simple toString anymore pending val n = "Uppercase reverser" - val f = Flow[String]. - map(_.toLowerCase()). - section(name(n)) { - _.map(_.toUpperCase). - map(_.reverse) - }. - map(_.toLowerCase()) - f.toString should include(n) + val f1 = Flow[String].map(_.toLowerCase()) + val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase()) + + f1.via(f2).toString should include(n) } "have an op section with a different dispatcher and name" in { val defaultDispatcher = TestProbe() val customDispatcher = TestProbe() - val f = Flow[Int]. - map(sendThreadNameTo(defaultDispatcher.ref)). - section(dispatcher("my-dispatcher1") and name("separate-disptacher")) { - _.map(sendThreadNameTo(customDispatcher.ref)). - map(sendThreadNameTo(customDispatcher.ref)) - }. - map(sendThreadNameTo(defaultDispatcher.ref)) + val f1 = Flow[Int].map(sendThreadNameTo(defaultDispatcher.ref)) + val f2 = Flow[Int].map(sendThreadNameTo(customDispatcher.ref)) + .withAttributes(dispatcher("my-dispatcher1") and name("separate-disptacher")) - Source(0 to 2).via(f).runWith(Sink.ignore) + Source(0 to 2).via(f1).via(f2).runWith(Sink.ignore) - defaultDispatcher.receiveN(6).foreach { + defaultDispatcher.receiveN(3).foreach { case s: String ⇒ s should include("akka.test.stream-dispatcher") } - customDispatcher.receiveN(6).foreach { + customDispatcher.receiveN(3).foreach { case s: String ⇒ s should include("my-dispatcher1") } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 71fc49f99d..1c0d5b24ac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -143,8 +143,9 @@ class FlowSplitWhenSpec extends AkkaSpec { "resume stream when splitWhen function throws" in { val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]() val exc = TE("test") - val publisher = Source(publisherProbeProbe).section(OperationAttributes.supervisionStrategy(resumingDecider))( - _.splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0)) + val publisher = Source(publisherProbeProbe) + .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) + .withAttributes(OperationAttributes.supervisionStrategy(resumingDecider)) .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 15f78e31c5..a71697d4d6 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -222,7 +222,7 @@ final case class ActorFlowMaterializerSettings( /** * Scala API: Decides how exceptions from application code are to be handled, unless - * overridden for specific sections of the stream operations with + * overridden for specific flows of the stream operations with * [[akka.stream.scaladsl.OperationAttributes#supervisionStrategy]]. */ def withSupervisionStrategy(decider: Supervision.Decider): ActorFlowMaterializerSettings = @@ -230,7 +230,7 @@ final case class ActorFlowMaterializerSettings( /** * Java API: Decides how exceptions from application code are to be handled, unless - * overridden for specific sections of the stream operations with + * overridden for specific flows of the stream operations with * [[akka.stream.javadsl.OperationAttributes#supervisionStrategy]]. */ def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = { diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala index fdfc833ccc..a0eacb7890 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala @@ -26,11 +26,10 @@ private[akka] trait TimedOps { def timed[I, O, Mat, Mat2](source: Source[I, Mat], measuredOps: Source[I, Mat] ⇒ Source[O, Mat2], onComplete: FiniteDuration ⇒ Unit): Source[O, Mat2] = { val ctx = new TimedFlowContext - val startTimed = (f: Flow[I, I, Any]) ⇒ f.transform(() ⇒ new StartTimedFlow(ctx)) - val stopTimed = (f: Flow[O, O, Any]) ⇒ f.transform(() ⇒ new StopTimed(ctx, onComplete)) + val startTimed = Flow[I].transform(() ⇒ new StartTimed(ctx)).named("startTimed") + val stopTimed = Flow[O].transform(() ⇒ new StopTimed(ctx, onComplete)).named("stopTimed") - val begin = source.section(name("startTimed"))(startTimed) - measuredOps(begin).section(name("stopTimed"))(stopTimed) + measuredOps(source.via(startTimed)).via(stopTimed) } /** @@ -43,11 +42,10 @@ private[akka] trait TimedOps { // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type val ctx = new TimedFlowContext - val startTimed = (f: Flow[O, O, Any]) ⇒ f.transform(() ⇒ new StartTimedFlow(ctx)) - val stopTimed = (f: Flow[Out, Out, Any]) ⇒ f.transform(() ⇒ new StopTimed(ctx, onComplete)) + val startTimed = Flow[O].transform(() ⇒ new StartTimed(ctx)).named("startTimed") + val stopTimed = Flow[Out].transform(() ⇒ new StopTimed(ctx, onComplete)).named("stopTimed") - val begin: Flow[I, O, Mat] = flow.section(name("startTimed"))(startTimed) - measuredOps(begin).section(name("stopTimed"))(stopTimed) + measuredOps(flow.via(startTimed)).via(stopTimed) } } @@ -65,20 +63,16 @@ private[akka] trait TimedIntervalBetweenOps { * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ def timedIntervalBetween[O, Mat](source: Source[O, Mat], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Source[O, Mat] = { - source.section(name("timedInterval")) { - _.transform(() ⇒ new TimedIntervalTransformer[O](matching, onInterval)) - } + val timedInterval = Flow[O].transform(() ⇒ new TimedInterval[O](matching, onInterval)).named("timedInterval") + source.via(timedInterval) } /** * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ def timedIntervalBetween[I, O, Mat](flow: Flow[I, O, Mat], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Flow[I, O, Mat] = { - // todo is there any other way to provide this for Flow / Duct, without duplicating impl? - // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type - flow.section(name("timedInterval")) { - _.transform(() ⇒ new TimedIntervalTransformer[O](matching, onInterval)) - } + val timedInterval = Flow[O].transform(() ⇒ new TimedInterval[O](matching, onInterval)).named("timedInterval") + flow.via(timedInterval) } } @@ -108,7 +102,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { } } - final class StartTimedFlow[T](timedContext: TimedFlowContext) extends PushStage[T, T] { + final class StartTimed[T](timedContext: TimedFlowContext) extends PushStage[T, T] { private var started = false override def onPush(elem: T, ctx: Context[T]): SyncDirective = { @@ -139,7 +133,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps { } - final class TimedIntervalTransformer[T](matching: T ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit) extends PushStage[T, T] { + final class TimedInterval[T](matching: T ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit) extends PushStage[T, T] { private var prevNanos = 0L private var matched = 0L diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index bcc8727100..ed417609cf 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -408,16 +408,6 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def concat[M](second: javadsl.Source[Out @uncheckedVariance, M]): javadsl.Flow[In, Out, Mat @uncheckedVariance Pair M] = new Flow(delegate.concat(second.asScala).mapMaterialized(p ⇒ Pair(p._1, p._2))) - /** - * Applies given [[OperationAttributes]] to a given section. - */ - def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Any], javadsl.Flow[Out, O, Any]] @uncheckedVariance): javadsl.Flow[In, O, Mat] = - new Flow(delegate.section(attributes.asScala) { - val scalaToJava = (flow: scaladsl.Flow[Out, Out, Any]) ⇒ new javadsl.Flow(flow) - val javaToScala = (flow: javadsl.Flow[Out, O, Any]) ⇒ flow.asScala - scalaToJava andThen section.apply andThen javaToScala - }) - def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr.asScala)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 55c0eff5fc..21e7d114c5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -498,16 +498,6 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def flatten[U](strategy: FlattenStrategy[Out, U]): javadsl.Source[U, Mat] = new Source(delegate.flatten(strategy)) - /** - * Applies given [[OperationAttributes]] to a given section. - */ - def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Any], javadsl.Flow[Out, O, Any]] @uncheckedVariance): javadsl.Source[O, Mat] = - new Source(delegate.section(attributes.asScala) { - val scalaToJava = (source: scaladsl.Flow[Out, Out, Any]) ⇒ new javadsl.Flow(source) - val javaToScala = (source: javadsl.Flow[Out, O, Any]) ⇒ source.asScala - scalaToJava andThen section.apply andThen javaToScala - }) - def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] = new Source(delegate.withAttributes(attr.asScala)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 1f59f6a48f..d3a77bd231 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -277,25 +277,6 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) source.via(this).toMat(sink)(Keep.both).run() } - // FIXME remove (in favor of .via) - def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) ⇒ Mat3)( - section: Flow[O2, O2, Unit] ⇒ Flow[O2, O, Mat2]): Flow[In, O, Mat3] = { - val subFlow = section(Flow[O2]).module.carbonCopy.withAttributes(attributes).wrap() - if (this.isIdentity) new Flow(subFlow).asInstanceOf[Flow[In, O, Mat3]] - else new Flow( - module - .growConnect(subFlow, shape.outlet, subFlow.shape.inlets.head, combine) - .replaceShape(FlowShape(shape.inlet, subFlow.shape.outlets.head))) - } - - /** - * Applies given [[OperationAttributes]] to a given section. - */ - // FIXME remove (in favor of .via) - def section[O, O2 >: Out](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] ⇒ Flow[O2, O, Any]): Flow[In, O, Mat] = { - this.section[O, O2, Any, Mat](attributes, Keep.left)(section) - } - /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 917bca52b5..5487763456 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -144,20 +144,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) */ def ++[Out2 >: Out, M](second: Source[Out2, M]): Source[Out2, (Mat, M)] = concat(second) - /** - * Applies given [[OperationAttributes]] to a given section. - */ - def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) ⇒ Mat3)(section: Flow[O2, O2, Unit] ⇒ Flow[O2, O, Mat2]): Source[O, Mat3] = { - val subFlow = section(Flow[O2]).module.carbonCopy.withAttributes(attributes).wrap() - new Source( - module - .growConnect(subFlow, shape.outlet, subFlow.shape.inlets.head, combine) - .replaceShape(SourceShape(subFlow.shape.outlets.head))) - } - - def section[O, O2 >: Out](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] ⇒ Flow[O2, O, Any]): Source[O, Mat] = - this.section[O, O2, Any, Mat](attributes, Keep.left)(section) - override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = new Source(module.withAttributes(attr).wrap())