diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala index 715ce81fbd..9020237811 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/client/HttpClient.scala @@ -10,6 +10,7 @@ import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream.FlattenStrategy import akka.stream.scaladsl._ +import akka.stream.scaladsl.OperationAttributes._ import akka.http.model.{ HttpMethod, HttpRequest, HttpResponse } import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } import akka.http.engine.parsing.{ HttpHeaderParser, HttpResponseParser } @@ -40,14 +41,14 @@ private[http] object HttpClient { Flow[HttpRequest] .map(requestMethodByPass) - .transform("renderer", () ⇒ requestRendererFactory.newRenderer) + .section(name("renderer"))(_.transform(() ⇒ requestRendererFactory.newRenderer)) .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing request stream error")) + .section(name("errorLogger"))(_.transform(() ⇒ errorLogger(log, "Outgoing request stream error"))) .via(transport) - .transform("rootParser", () ⇒ + .section(name("rootParser"))(_.transform(() ⇒ // each connection uses a single (private) response parser instance for all its responses // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(requestMethodByPass)) + rootParser.createShallowCopy(requestMethodByPass))) .splitWhen(_.isInstanceOf[MessageStart]) .headAndTail .collect { diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala index 5ffc57d77d..b5059c5355 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala @@ -7,6 +7,7 @@ package akka.http.engine.parsing import java.lang.{ StringBuilder ⇒ JStringBuilder } import scala.annotation.tailrec import akka.actor.ActorRef +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.stage.{ Context, PushPullStage } import akka.stream.scaladsl.Source import akka.util.ByteString @@ -126,7 +127,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, def expect100continueHandling[T]: Source[T] ⇒ Source[T] = if (expect100continue) { - _.transform("expect100continueTrigger", () ⇒ new PushPullStage[T, T] { + _.section(name("expect100continueTrigger"))(_.transform(() ⇒ new PushPullStage[T, T] { private var oneHundredContinueSent = false def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) def onPull(ctx: Context[T]) = { @@ -137,7 +138,7 @@ private[http] class HttpRequestParser(_settings: ParserSettings, } ctx.pull() } - }) + })) } else identityFunc teh match { diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala index df3a23b9f3..480a624e7f 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala @@ -8,6 +8,7 @@ import java.net.InetSocketAddress import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.util.ByteString +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.scaladsl.Source import akka.stream.stage._ import akka.http.model._ @@ -114,11 +115,11 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.` case HttpEntity.Default(_, contentLength, data) ⇒ renderContentLength(contentLength) ~~ CrLf renderByteStrings(r, - data.transform("checkContentLength", () ⇒ new CheckContentLengthTransformer(contentLength))) + data.section(name("checkContentLength"))(_.transform(() ⇒ new CheckContentLengthTransformer(contentLength)))) case HttpEntity.Chunked(_, chunks) ⇒ r ~~ CrLf - renderByteStrings(r, chunks.transform("chunkTransform", () ⇒ new ChunkTransformer)) + renderByteStrings(r, chunks.section(name("chunkTransform"))(_.transform(() ⇒ new ChunkTransformer))) } renderRequestLine() diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala index bbeb1b07c5..04e1f5309e 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala @@ -7,6 +7,7 @@ package akka.http.engine.rendering import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.util.ByteString +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.scaladsl.Source import akka.stream.stage._ import akka.http.model._ @@ -155,7 +156,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser renderHeaders(headers.toList) renderEntityContentType(r, entity) renderContentLengthHeader(contentLength) ~~ CrLf - byteStrings(data.transform("checkContentLength", () ⇒ new CheckContentLengthTransformer(contentLength))) + byteStrings(data.section(name("checkContentLength"))(_.transform(() ⇒ new CheckContentLengthTransformer(contentLength)))) case HttpEntity.CloseDelimited(_, data) ⇒ renderHeaders(headers.toList, alwaysClose = ctx.requestMethod != HttpMethods.HEAD) @@ -168,7 +169,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser else { renderHeaders(headers.toList) renderEntityContentType(r, entity) ~~ CrLf - byteStrings(chunks.transform("renderChunks", () ⇒ new ChunkTransformer)) + byteStrings(chunks.section(name("renderChunks"))(_.transform(() ⇒ new ChunkTransformer))) } } diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index 7974aaf586..cd820d7263 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -8,8 +8,10 @@ import akka.actor.{ ActorRef, Props } import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream.stage.PushPullStage +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlattenStrategy import akka.stream.scaladsl._ +import akka.stream.stage.PushPullStage import akka.http.engine.parsing.{ HttpHeaderParser, HttpRequestParser } import akka.http.engine.rendering.{ ResponseRenderingContext, HttpResponseRendererFactory } import akka.http.engine.parsing.ParserOutput._ @@ -51,10 +53,10 @@ private[http] object HttpServer { val bypassFanout = Broadcast[RequestOutput]("bypassFanout") val bypassMerge = new BypassMerge(settings, log) - val requestParsing = Flow[ByteString].transform("rootParser", () ⇒ + val requestParsing = Flow[ByteString].section(name("rootParser"))(_.transform(() ⇒ // each connection uses a single (private) request parser instance for all its requests // which builds a cache of all header instances seen on that connection - rootParser.createShallowCopy(() ⇒ oneHundredContinueRef)) + rootParser.createShallowCopy(() ⇒ oneHundredContinueRef))) val requestPreparation = Flow[RequestOutput] @@ -69,10 +71,10 @@ private[http] object HttpServer { val rendererPipeline = Flow[ResponseRenderingContext] - .transform("recover", () ⇒ new ErrorsTo500ResponseRecovery(log)) // FIXME: simplify after #16394 is closed - .transform("renderer", () ⇒ responseRendererFactory.newRenderer) + .section(name("recover"))(_.transform(() ⇒ new ErrorsTo500ResponseRecovery(log))) // FIXME: simplify after #16394 is closed + .section(name("renderer"))(_.transform(() ⇒ responseRendererFactory.newRenderer)) .flatten(FlattenStrategy.concat) - .transform("errorLogger", () ⇒ errorLogger(log, "Outgoing response stream error")) + .section(name("errorLogger"))(_.transform(() ⇒ errorLogger(log, "Outgoing response stream error"))) val transportIn = UndefinedSource[ByteString] val transportOut = UndefinedSink[ByteString] diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index 12e02ec374..4c50df13bc 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -12,6 +12,7 @@ import scala.concurrent.duration.FiniteDuration import scala.collection.immutable import scala.util.control.NonFatal import akka.util.ByteString +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.stream.TimerTransformer @@ -63,7 +64,7 @@ sealed trait HttpEntity extends japi.HttpEntity { } // TODO timerTransform is meant to be replaced / rewritten, it's currently private[akka]; See https://github.com/akka/akka/issues/16393 - dataBytes.timerTransform("toStrict", transformer).runWith(Sink.head) + dataBytes.section(name("toStrict"))(_.timerTransform(transformer)).runWith(Sink.head) } /** diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index d2f2c24288..dd096e945a 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -19,6 +19,7 @@ import akka.stream.impl.Ast.AstNode import akka.stream.impl.Ast.StageFactory import akka.stream.impl.fusing.IteratorInterpreter import akka.stream.scaladsl._ +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.stage._ import akka.stream.impl import akka.util.ByteString @@ -44,7 +45,7 @@ private[http] object StreamUtils { override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = ctx.absorbTermination() } - Flow[ByteString].transform("transformBytes", () ⇒ transformer) + Flow[ByteString].section(name("transformBytes"))(_.transform(() ⇒ transformer)) } def failedPublisher[T](ex: Throwable): Publisher[T] = @@ -59,7 +60,7 @@ private[http] object StreamUtils { ctx.fail(f(cause)) } - Flow[ByteString].transform("transformError", () ⇒ transformer) + Flow[ByteString].section(name("transformError"))(_.transform(() ⇒ transformer)) } def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString] = { @@ -90,7 +91,7 @@ private[http] object StreamUtils { override def initial: State = if (start > 0) skipping else taking(length) } - Flow[ByteString].transform("sliceBytes", () ⇒ transformer) + Flow[ByteString].section(name("sliceBytes"))(_.transform(() ⇒ transformer)) } /** @@ -182,7 +183,7 @@ private[http] object StreamUtils { Try { transformer match { // FIXME #16382 right now the flow can't use keys, should that be allowed? - case Pipe(ops, keys) if keys.isEmpty ⇒ + case Pipe(ops, keys, _) if keys.isEmpty ⇒ if (ops.isEmpty) Some(sourceData) else { diff --git a/akka-http-core/src/main/scala/akka/http/util/package.scala b/akka-http-core/src/main/scala/akka/http/util/package.scala index f0be761f04..d7808d8ac3 100644 --- a/akka-http-core/src/main/scala/akka/http/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/util/package.scala @@ -57,21 +57,20 @@ package object util { private[http] implicit class EnhancedSource[T](val underlying: Source[T]) { def printEvent(marker: String): Source[T] = - underlying.transform("transform", - () ⇒ new PushStage[T, T] { - override def onPush(element: T, ctx: Context[T]): Directive = { - println(s"$marker: $element") - ctx.push(element) - } - override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { - println(s"$marker: Failure $cause") - super.onUpstreamFailure(cause, ctx) - } - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - println(s"$marker: Terminated") - super.onUpstreamFinish(ctx) - } - }) + underlying.transform(() ⇒ new PushStage[T, T] { + override def onPush(element: T, ctx: Context[T]): Directive = { + println(s"$marker: $element") + ctx.push(element) + } + override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { + println(s"$marker: Failure $cause") + super.onUpstreamFailure(cause, ctx) + } + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + println(s"$marker: Terminated") + super.onUpstreamFinish(ctx) + } + }) /** * Drain this stream into a Vector and provide it as a future value. diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 177f637e2b..b88ab93b67 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlattenStrategy import akka.stream.FlowMaterializer import akka.util.ByteString @@ -441,7 +442,7 @@ class RequestParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { def multiParse(parser: HttpRequestParser)(input: Seq[String]): Seq[Either[RequestOutput, StrictEqualHttpRequest]] = Source(input.toList) .map(ByteString.apply) - .transform("parser", () ⇒ parser) + .section(name("parser"))(_.transform(() ⇒ parser)) .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .headAndTail .collect { diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 58569af108..199f218267 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlattenStrategy import akka.stream.FlowMaterializer import akka.util.ByteString @@ -260,7 +261,7 @@ class ResponseParserSpec extends FreeSpec with Matchers with BeforeAndAfterAll { val future = Source(input.toList) .map(ByteString.apply) - .transform("parser", () ⇒ newParser(requestMethod)) + .section(name("parser"))(_.transform(() ⇒ newParser(requestMethod))) .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x.isInstanceOf[EntityStreamError]) .headAndTail .collect { diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala index 43f2926d0c..805c91b8d2 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala @@ -16,6 +16,7 @@ import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ import akka.stream.scaladsl._ +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlowMaterializer import akka.stream.impl.SynchronousIterablePublisher import HttpEntity._ @@ -253,7 +254,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll equal(expected.stripMarginWithNewline("\r\n")).matcher[String] compose { request ⇒ val renderer = newRenderer val byteStringSource = Await.result(Source.singleton(RequestRenderingContext(request, serverAddress)). - transform("renderer", () ⇒ renderer). + section(name("renderer"))(_.transform(() ⇒ renderer)). runWith(Sink.head), 1.second) val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala index 4269310194..63492ba916 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala @@ -16,6 +16,7 @@ import akka.http.model.headers._ import akka.http.util._ import akka.util.ByteString import akka.stream.scaladsl._ +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlowMaterializer import HttpEntity._ @@ -400,7 +401,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll equal(expected.stripMarginWithNewline("\r\n") -> close).matcher[(String, Boolean)] compose { ctx ⇒ val renderer = newRenderer val byteStringSource = Await.result(Source.singleton(ctx). - transform("renderer", () ⇒ renderer). + section(name("renderer"))(_.transform(() ⇒ renderer)). runWith(Sink.head), 1.second) val future = byteStringSource.grouped(1000).runWith(Sink.head).map(_.reduceLeft(_ ++ _).utf8String) Await.result(future, 250.millis) -> renderer.isComplete diff --git a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala index 698b34f9ca..64c92165d2 100644 --- a/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala +++ b/akka-http/src/main/scala/akka/http/marshalling/MultipartMarshallers.scala @@ -37,7 +37,7 @@ trait MultipartMarshallers { HttpEntity(contentType, data) case _ ⇒ val chunks = value.parts - .transform("bodyPartRenderer", () ⇒ BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)) + .transform(() ⇒ BodyPartRenderer.streamed(boundary, charset.nioCharset, partHeadersSizeHint = 128, log)) .flatten(FlattenStrategy.concat) HttpEntity.Chunked(contentType, chunks) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala index e343dd71bf..d77407ba52 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/MultipartUnmarshallers.scala @@ -85,7 +85,7 @@ trait MultipartUnmarshallers { createStrict(mediaType, builder.result()) case _ ⇒ val bodyParts = entity.dataBytes - .transform("bodyPart", () ⇒ parser) + .transform(() ⇒ parser) .splitWhen(_.isInstanceOf[BodyPartStart]) .headAndTail .collect { diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index 906ad5efb0..54b5534658 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -6,6 +6,7 @@ package akka.stream.tck import java.util.concurrent.atomic.AtomicInteger import akka.stream.impl.{ Ast, ActorBasedFlowMaterializer } import akka.stream.scaladsl.MaterializedMap +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.{ FlowMaterializer, MaterializerSettings } import org.reactivestreams.{ Publisher, Processor } import akka.stream.impl.fusing.Map @@ -25,7 +26,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() val (processor, _ns) = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( - Ast.Fused(List(Map[Int, Int](identity)), "identity"), flowName, 1) + Ast.Fused(List(Map[Int, Int](identity)), name("identity")), flowName, 1) processor.asInstanceOf[Processor[Int, Int]] } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala index c84a26b85e..46233f5d16 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -3,6 +3,7 @@ */ package akka.stream.tck +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.MaterializerSettings import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.impl.Ast @@ -34,7 +35,7 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] { } val (processor, _) = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( - Ast.StageFactory(mkStage, "transform"), flowName, 1) + Ast.StageFactory(mkStage, name("transform")), flowName, 1) processor.asInstanceOf[Processor[Int, Int]] } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 99a29fdf6d..185410a07b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -102,7 +102,7 @@ public class FlowTest extends StreamTest { final JavaTestKit probe = new JavaTestKit(system); final Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // duplicate each element, stop after 4 elements, and emit sum to the end - Source.from(input).transform("publish", new Creator>() { + Source.from(input).transform(new Creator>() { @Override public PushPullStage create() throws Exception { return new StatefulStage() { @@ -248,9 +248,24 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToUseMerge() throws Exception { - final Flow f1 = Flow.of(String.class).transform("f1", this.op()); // javadsl - final Flow f2 = Flow.of(String.class).transform("f2", this.op()); // javadsl - final Flow f3 = Flow.of(String.class).transform("f2", this.op()); // javadsl + final Flow f1 = Flow.of(String.class).section(OperationAttributes.name("f1"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowTest.this.op()); + } + }); + final Flow f2 = Flow.of(String.class).section(OperationAttributes.name("f2"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowTest.this.op()); + } + }); + final Flow f3 = Flow.of(String.class).section(OperationAttributes.name("f3"), new Function, Flow>() { + @Override + public Flow apply(Flow flow) { + return flow.transform(FlowTest.this.op()); + } + }); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); final Source in2 = Source.from(Arrays.asList("d", "e", "f")); diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index bc4848a8a2..d7bb4bc4bc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -33,7 +33,7 @@ class DslConsistencySpec extends WordSpec with Matchers { val ignore = Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ - Set("create", "apply", "ops", "appendJava", "andThen") ++ + Set("create", "apply", "ops", "appendJava", "andThen", "withAttributes") ++ Set("asScala", "asJava") val allowMissing: Map[Class[_], Set[String]] = Map( diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala index 44c9b9ffad..9aa211b3fd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala @@ -6,20 +6,28 @@ package akka.stream.scaladsl import akka.testkit.TestProbe import akka.stream.testkit.AkkaSpec import akka.stream.FlowMaterializer +import akka.stream.MaterializerSettings -class FlowDispatcherSpec extends AkkaSpec { +class FlowDispatcherSpec extends AkkaSpec("my-dispatcher = ${akka.test.stream-dispatcher}") { - implicit val materializer = FlowMaterializer() + val defaultSettings = MaterializerSettings(system) - "Flow with dispatcher setting" must { - "use the specified dispatcher" in { - val probe = TestProbe() - val p = Source(List(1, 2, 3)).map(i ⇒ - { probe.ref ! Thread.currentThread().getName(); i }). - to(Sink.ignore).run() - probe.receiveN(3) foreach { - case s: String ⇒ s should startWith(system.name + "-akka.test.stream-dispatcher") - } + def testDispatcher(settings: MaterializerSettings = defaultSettings, dispatcher: String = "akka.test.stream-dispatcher") = { + + implicit val materializer = FlowMaterializer(settings) + + val probe = TestProbe() + val p = Source(List(1, 2, 3)).map(i ⇒ + { probe.ref ! Thread.currentThread().getName(); i }). + to(Sink.ignore).run() + probe.receiveN(3) foreach { + case s: String ⇒ s should startWith(system.name + "-" + dispatcher) } } + + "Flow with dispatcher setting" must { + "use the default dispatcher" in testDispatcher() + + "use custom dispatcher" in testDispatcher(defaultSettings.withDispatcher("my-dispatcher"), "my-dispatcher") + } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 10204d9449..9d4d9c7ed5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -3,8 +3,9 @@ */ package akka.stream.scaladsl -import akka.stream.OverflowStrategy +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.FlowMaterializer +import akka.stream.OverflowStrategy import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } import akka.stream.stage._ @@ -28,12 +29,12 @@ class FlowGraphCompileSpec extends AkkaSpec { val apples = () ⇒ Iterator.continually(new Apple) - val f1 = Flow[String].transform("f1", op[String, String]) - val f2 = Flow[String].transform("f2", op[String, String]) - val f3 = Flow[String].transform("f3", op[String, String]) - val f4 = Flow[String].transform("f4", op[String, String]) - val f5 = Flow[String].transform("f5", op[String, String]) - val f6 = Flow[String].transform("f6", op[String, String]) + val f1 = Flow[String].section(name("f1"))(_.transform(op[String, String])) + val f2 = Flow[String].section(name("f2"))(_.transform(op[String, String])) + val f3 = Flow[String].section(name("f3"))(_.transform(op[String, String])) + val f4 = Flow[String].section(name("f4"))(_.transform(op[String, String])) + val f5 = Flow[String].section(name("f5"))(_.transform(op[String, String])) + val f6 = Flow[String].section(name("f6"))(_.transform(op[String, String])) val in1 = Source(List("a", "b", "c")) val in2 = Source(List("d", "e", "f")) @@ -164,7 +165,7 @@ class FlowGraphCompileSpec extends AkkaSpec { val out2 = Sink.publisher[String] val out9 = Sink.publisher[String] val out10 = Sink.publisher[String] - def f(s: String) = Flow[String].transform(s, op[String, String]) + def f(s: String) = Flow[String].section(name(s))(_.transform(op[String, String])) import FlowGraphImplicits._ in7 ~> f("a") ~> b7 ~> f("b") ~> m11 ~> f("c") ~> b11 ~> f("d") ~> out2 diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala new file mode 100644 index 0000000000..4ca75d53bb --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.FlowMaterializer +import akka.stream.testkit.AkkaSpec +import akka.actor.ActorRef +import akka.testkit.TestProbe + +object FlowSectionSpec { + val config = "my-dispatcher = ${akka.test.stream-dispatcher}" +} + +class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { + + implicit val mat = FlowMaterializer() + + "A flow" can { + + "have an op with a name" in { + val n = "Converter to Int" + val f = Flow[Int].section(name(n))(_.map(_.toInt)) + f.toString should include(n) + } + + "have an op with a different dispatcher" in { + val flow = Flow[Int].section(dispatcher("my-dispatcher"))(_.map(sendThreadNameTo(testActor))) + + Source.singleton(1).via(flow).to(Sink.ignore).run() + + receiveN(1).foreach { + case s: String ⇒ s should include("my-dispatcher") + } + } + + "have an op section with a name" in { + val n = "Uppercase reverser" + val f = Flow[String]. + map(_.toLowerCase()). + section(name(n)) { + _.map(_.toUpperCase). + map(_.reverse) + }. + map(_.toLowerCase()) + f.toString should include(n) + } + + "have an op section with a different dispatcher and name" in { + val defaultDispatcher = TestProbe() + val customDispatcher = TestProbe() + + val f = Flow[Int]. + map(sendThreadNameTo(defaultDispatcher.ref)). + section(dispatcher("my-dispatcher") and name("separate-disptacher")) { + _.map(sendThreadNameTo(customDispatcher.ref)). + map(sendThreadNameTo(customDispatcher.ref)) + }. + map(sendThreadNameTo(defaultDispatcher.ref)) + + Source(0 to 2).via(f).runWith(Sink.ignore) + + defaultDispatcher.receiveN(6).foreach { + case s: String ⇒ s should include("akka.test.stream-dispatcher") + } + + customDispatcher.receiveN(6).foreach { + case s: String ⇒ s should include("my-dispatcher") + } + } + + def sendThreadNameTo[T](probe: ActorRef)(element: T) = { + probe ! Thread.currentThread.getName + element + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 09a9ab60e4..473379a84b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -59,20 +59,20 @@ object FlowSpec { override def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): (Processor[In, Out], MaterializedMap) = { val props = op match { - case f: Fused ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher) - case Map(f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage)) - case Filter(p) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage)) - case Drop(n) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage)) - case Take(n) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage)) - case Collect(pf) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage)) - case Scan(z, f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage)) - case Expand(s, f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage)) - case Conflate(s, f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage)) - case Buffer(n, s) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage)) - case MapConcat(f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage)) - case o ⇒ ActorProcessorFactory.props(this, o) + case f: Fused ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)) + case Map(f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage)) + case Filter(p, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage)) + case Drop(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage)) + case Take(n, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage)) + case Collect(pf, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage)) + case Scan(z, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage)) + case Expand(s, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage)) + case Conflate(s, f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage)) + case Buffer(n, s, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage)) + case MapConcat(f, _) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage)) + case o ⇒ ActorProcessorFactory.props(this, o) } - val impl = actorOf(props, s"$flowName-$n-${op.name}") + val impl = actorOf(props.withDispatcher(settings.dispatcher), s"$flowName-$n-${op.attributes.name}") (ActorProcessorFactory(impl), MaterializedMap.empty) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala index eba691bcf2..8068784b8c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStageSpec.scala @@ -25,7 +25,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "produce one-to-one transformation as expected" in { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new PushStage[Int, Int] { + transform(() ⇒ new PushStage[Int, Int] { var tot = 0 override def onPush(elem: Int, ctx: Context[Int]) = { tot += elem @@ -48,7 +48,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "produce one-to-several transformation as expected" in { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new StatefulStage[Int, Int] { + transform(() ⇒ new StatefulStage[Int, Int] { var tot = 0 lazy val waitForNext = new State { @@ -85,7 +85,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "produce dropping transformation as expected" in { val p = Source(List(1, 2, 3, 4)).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new PushStage[Int, Int] { + transform(() ⇒ new PushStage[Int, Int] { var tot = 0 override def onPush(elem: Int, ctx: Context[Int]) = { tot += elem @@ -111,14 +111,14 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "produce multi-step transformation as expected" in { val p = Source(List("a", "bc", "def")).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new PushStage[String, Int] { + transform(() ⇒ new PushStage[String, Int] { var concat = "" override def onPush(elem: String, ctx: Context[Int]) = { concat += elem ctx.push(concat.length) } }). - transform("transform", () ⇒ new PushStage[Int, Int] { + transform(() ⇒ new PushStage[Int, Int] { var tot = 0 override def onPush(length: Int, ctx: Context[Int]) = { tot += length @@ -150,7 +150,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "support emit onUpstreamFinish" in { val p = Source(List("a")).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new StatefulStage[String, String] { + transform(() ⇒ new StatefulStage[String, String] { var s = "" override def initial = new State { override def onPush(element: String, ctx: Context[String]) = { @@ -173,7 +173,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "allow early finish" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Source(p). - transform("transform", () ⇒ new PushStage[Int, Int] { + transform(() ⇒ new PushStage[Int, Int] { var s = "" override def onPush(element: Int, ctx: Context[Int]) = { s += element @@ -199,7 +199,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "report error when exception is thrown" in { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new StatefulStage[Int, Int] { + transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { override def onPush(elem: Int, ctx: Context[Int]) = { if (elem == 2) { @@ -227,7 +227,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). map(elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else elem). - transform("transform", () ⇒ new StatefulStage[Int, Int] { + transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { override def onPush(elem: Int, ctx: Context[Int]) = ctx.push(elem) } @@ -253,7 +253,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "support cancel as expected" in { val p = Source(List(1, 2, 3)).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new StatefulStage[Int, Int] { + transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { override def onPush(elem: Int, ctx: Context[Int]) = emit(Iterator(elem, elem), ctx) @@ -275,7 +275,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "support producing elements from empty inputs" in { val p = Source(List.empty[Int]).runWith(Sink.publisher) val p2 = Source(p). - transform("transform", () ⇒ new StatefulStage[Int, Int] { + transform(() ⇒ new StatefulStage[Int, Int] { override def initial = new State { override def onPush(elem: Int, ctx: Context[Int]) = ctx.pull() } @@ -296,7 +296,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug "support converting onComplete into onError" in { val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source(List(5, 1, 2, 3)).transform("transform", () ⇒ new PushStage[Int, Int] { + Source(List(5, 1, 2, 3)).transform(() ⇒ new PushStage[Int, Int] { var expectedNumberOfElements: Option[Int] = None var count = 0 override def onPush(elem: Int, ctx: Context[Int]) = @@ -326,7 +326,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug } "be safe to reuse" in { - val flow = Source(1 to 3).transform("transform", () ⇒ + val flow = Source(1 to 3).transform(() ⇒ new PushStage[Int, Int] { var count = 0 diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala index 54c75fa56a..e0f15adff2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTimerTransformerSpec.scala @@ -19,7 +19,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { "produce scheduled ticks as expected" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Source(p). - timerTransform("timer", () ⇒ new TimerTransformer[Int, Int] { + timerTransform(() ⇒ new TimerTransformer[Int, Int] { schedulePeriodically("tick", 100.millis) var tickCount = 0 override def onNext(elem: Int) = List(elem) @@ -44,7 +44,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { "schedule ticks when last transformation step (consume)" in { val p = StreamTestKit.PublisherProbe[Int]() val p2 = Source(p). - timerTransform("timer", () ⇒ new TimerTransformer[Int, Int] { + timerTransform(() ⇒ new TimerTransformer[Int, Int] { schedulePeriodically("tick", 100.millis) var tickCount = 0 override def onNext(elem: Int) = List(elem) @@ -68,7 +68,7 @@ class FlowTimerTransformerSpec extends AkkaSpec { val exception = new Exception("Expected exception to the rule") with NoStackTrace val p = StreamTestKit.PublisherProbe[Int]() val p2 = Source(p). - timerTransform("timer", () ⇒ new TimerTransformer[Int, Int] { + timerTransform(() ⇒ new TimerTransformer[Int, Int] { scheduleOnce("tick", 100.millis) def onNext(element: Int) = Nil diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala new file mode 100644 index 0000000000..ee0114e08d --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.FlowMaterializer +import akka.stream.MaterializerSettings +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import scala.concurrent.duration._ +import scala.concurrent.Await + +class GraphJunctionAttributesSpec extends AkkaSpec { + + implicit val set = MaterializerSettings(system).withInputBuffer(4, 4) + implicit val mat = FlowMaterializer(set) + + "A zip" should { + + "take custom inputBuffer settings" in { + + sealed abstract class SlowTick + case object SlowTick extends SlowTick + + sealed abstract class FastTick + case object FastTick extends FastTick + + val source = Source[(SlowTick, List[FastTick])]() { implicit b ⇒ + import FlowGraphImplicits._ + + val slow = Source(0.seconds, 100.millis, () ⇒ SlowTick) + val fast = Source(0.seconds, 10.millis, () ⇒ FastTick) + val sink = UndefinedSink[(SlowTick, List[FastTick])] + + val zip = Zip[SlowTick, List[FastTick]](inputBuffer(1, 1)) + + slow ~> zip.left + fast.conflate(tick ⇒ List(tick)) { case (list, tick) ⇒ tick :: list } ~> zip.right + + zip.out ~> sink + + sink + } + + val future = source.grouped(10).runWith(Sink.head) + + // FIXME #16435 drop(2) needed because first two SlowTicks get only one FastTick + Await.result(future, 2.seconds).map(_._2.size).filter(_ == 1).drop(2) should be(Nil) + } + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala index c17e986f57..12660a6343 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala @@ -3,6 +3,7 @@ */ package akka.stream.scaladsl +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.{ FlowMaterializer, MaterializerSettings } import akka.stream.impl.{ Optimizations, ActorBasedFlowMaterializer } import akka.stream.testkit.AkkaSpec @@ -20,7 +21,7 @@ class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSen val f = Source(1 to 100). drop(4). drop(5). - transform("identity", () ⇒ FlowOps.identityStage). + section(name("identity"))(_.transform(() ⇒ FlowOps.identityStage)). filter(_ % 2 == 0). map(_ * 2). map(identity). diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala index 89daa87334..7101b3f0f8 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ import scala.language.implicitConversions import scala.language.existentials +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.scaladsl.Source import akka.stream.scaladsl.Flow import akka.stream.stage._ @@ -22,28 +23,41 @@ private[akka] trait TimedOps { /** * INTERNAL API * - * Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`. + * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. */ def timed[I, O](flow: Source[I], measuredOps: Source[I] ⇒ Source[O], onComplete: FiniteDuration ⇒ Unit): Source[O] = { val ctx = new TimedFlowContext - val startWithTime = flow.transform("startTimed", () ⇒ new StartTimedFlow(ctx)) - val userFlow = measuredOps(startWithTime) - userFlow.transform("stopTimed", () ⇒ new StopTimed(ctx, onComplete)) + val startTimed = (f: Source[I]) ⇒ f.transform(() ⇒ new StartTimedFlow(ctx)) + val stopTimed = (f: Source[O]) ⇒ f.transform(() ⇒ new StopTimed(ctx, onComplete)) + + val measured = ((s: Source[I]) ⇒ s) andThen + (_.section(name("startTimed"))(startTimed)) andThen + measuredOps andThen + (_.section(name("stopTimed"))(stopTimed)) + + measured(flow) } /** * INTERNAL API * - * Measures time from receieving the first element and completion events - one for each subscriber of this `Flow`. + * Measures time from receiving the first element and completion events - one for each subscriber of this `Flow`. */ def timed[I, O, Out](flow: Flow[I, O], measuredOps: Flow[I, O] ⇒ Flow[O, Out], onComplete: FiniteDuration ⇒ Unit): Flow[O, Out] = { - // todo is there any other way to provide this for Flow, without duplicating impl? (they don't share any super-type) + // todo is there any other way to provide this for Flow, without duplicating impl? + // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type val ctx = new TimedFlowContext - val startWithTime: Flow[I, O] = flow.transform("startTimed", () ⇒ new StartTimedFlow(ctx)) - val userFlow: Flow[O, Out] = measuredOps(startWithTime) - userFlow.transform("stopTimed", () ⇒ new StopTimed(ctx, onComplete)) + val startTimed = (f: Flow[I, O]) ⇒ f.transform(() ⇒ new StartTimedFlow(ctx)) + val stopTimed = (f: Flow[O, Out]) ⇒ f.transform(() ⇒ new StopTimed(ctx, onComplete)) + + val measured = ((f: Flow[I, O]) ⇒ f) andThen + (_.section(name("startTimed"))(startTimed)) andThen + measuredOps andThen + (_.section(name("stopTimed"))(stopTimed)) + + measured(flow) } } @@ -58,18 +72,23 @@ private[akka] trait TimedIntervalBetweenOps { import Timed._ /** - * Measures rolling interval between immediatly subsequent `matching(o: O)` elements. + * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ def timedIntervalBetween[O](flow: Source[O], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Source[O] = { - flow.transform("timedInterval", () ⇒ new TimedIntervalTransformer[O](matching, onInterval)) + flow.section(name("timedInterval")) { + _.transform(() ⇒ new TimedIntervalTransformer[O](matching, onInterval)) + } } /** - * Measures rolling interval between immediatly subsequent `matching(o: O)` elements. + * Measures rolling interval between immediately subsequent `matching(o: O)` elements. */ def timedIntervalBetween[I, O](flow: Flow[I, O], matching: O ⇒ Boolean, onInterval: FiniteDuration ⇒ Unit): Flow[I, O] = { - // todo is there any other way to provide this for Flow / Duct, without duplicating impl? (they don't share any super-type) - flow.transform("timedInterval", () ⇒ new TimedIntervalTransformer[O](matching, onInterval)) + // todo is there any other way to provide this for Flow / Duct, without duplicating impl? + // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type + flow.section(name("timedInterval")) { + _.transform(() ⇒ new TimedIntervalTransformer[O](matching, onInterval)) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 7c33fe8e1b..47b4858ce4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -8,6 +8,7 @@ import java.util.concurrent.atomic.AtomicLong import akka.dispatch.Dispatchers import akka.event.Logging import akka.stream.impl.fusing.ActorInterpreter +import akka.stream.scaladsl.OperationAttributes._ import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.{ Promise, ExecutionContext, Await, Future } @@ -25,82 +26,171 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber } * INTERNAL API */ private[akka] object Ast { + sealed abstract class AstNode { - def name: String + def attributes: OperationAttributes + def withAttributes(attributes: OperationAttributes): AstNode } - final case class TimerTransform(mkStage: () ⇒ TimerTransformer[Any, Any], override val name: String) extends AstNode + object Defaults { + val timerTransform = name("timerTransform") + val stageFactory = name("stageFactory") + val fused = name("fused") + val map = name("map") + val filter = name("filter") + val collect = name("collect") + val mapAsync = name("mapAsync") + val mapAsyncUnordered = name("mapAsyncUnordered") + val grouped = name("grouped") + val take = name("take") + val drop = name("drop") + val scan = name("scan") + val buffer = name("buffer") + val conflate = name("conflate") + val expand = name("expand") + val mapConcat = name("mapConcat") + val groupBy = name("groupBy") + val prefixAndTail = name("prefixAndTail") + val splitWhen = name("splitWhen") + val concatAll = name("concatAll") + val processor = name("processor") + val processorWithKey = name("processorWithKey") + val identityOp = name("identityOp") - final case class StageFactory(mkStage: () ⇒ Stage[_, _], override val name: String) extends AstNode + val merge = name("merge") + val mergePreferred = name("mergePreferred") + val broadcast = name("broadcast") + val balance = name("balance") + val zip = name("zip") + val unzip = name("unzip") + val concat = name("concat") + val flexiMerge = name("flexiMerge") + val flexiRoute = name("flexiRoute") + val identityJunction = name("identityJunction") + } + + import Defaults._ + + final case class TimerTransform(mkStage: () ⇒ TimerTransformer[Any, Any], attributes: OperationAttributes = timerTransform) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } + + final case class StageFactory(mkStage: () ⇒ Stage[_, _], attributes: OperationAttributes = stageFactory) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } object Fused { def apply(ops: immutable.Seq[Stage[_, _]]): Fused = - Fused(ops, ops.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+")) //FIXME change to something more performant for name + Fused(ops, name(ops.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+"))) //FIXME change to something more performant for name + } + final case class Fused(ops: immutable.Seq[Stage[_, _]], attributes: OperationAttributes = fused) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - final case class Fused(ops: immutable.Seq[Stage[_, _]], override val name: String) extends AstNode - final case class Map(f: Any ⇒ Any) extends AstNode { override def name = "map" } + final case class Map(f: Any ⇒ Any, attributes: OperationAttributes = map) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } - final case class Filter(p: Any ⇒ Boolean) extends AstNode { override def name = "filter" } + final case class Filter(p: Any ⇒ Boolean, attributes: OperationAttributes = filter) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } - final case class Collect(pf: PartialFunction[Any, Any]) extends AstNode { override def name = "collect" } + final case class Collect(pf: PartialFunction[Any, Any], attributes: OperationAttributes = collect) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } // FIXME Replace with OperateAsync - final case class MapAsync(f: Any ⇒ Future[Any]) extends AstNode { override def name = "mapAsync" } + final case class MapAsync(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsync) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } //FIXME Should be OperateUnorderedAsync - final case class MapAsyncUnordered(f: Any ⇒ Future[Any]) extends AstNode { override def name = "mapAsyncUnordered" } + final case class MapAsyncUnordered(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsyncUnordered) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } - final case class Grouped(n: Int) extends AstNode { + final case class Grouped(n: Int, attributes: OperationAttributes = grouped) extends AstNode { require(n > 0, "n must be greater than 0") - override def name = "grouped" + + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } //FIXME should be `n: Long` - final case class Take(n: Int) extends AstNode { - override def name = "take" + final case class Take(n: Int, attributes: OperationAttributes = take) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } //FIXME should be `n: Long` - final case class Drop(n: Int) extends AstNode { - override def name = "drop" + final case class Drop(n: Int, attributes: OperationAttributes = drop) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - final case class Scan(zero: Any, f: (Any, Any) ⇒ Any) extends AstNode { override def name = "scan" } + final case class Scan(zero: Any, f: (Any, Any) ⇒ Any, attributes: OperationAttributes = scan) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } - final case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode { + final case class Buffer(size: Int, overflowStrategy: OverflowStrategy, attributes: OperationAttributes = buffer) extends AstNode { require(size > 0, s"Buffer size must be larger than zero but was [$size]") - override def name = "buffer" + + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - final case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode { - override def name = "conflate" + final case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any, attributes: OperationAttributes = conflate) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - final case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode { - override def name = "expand" + final case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any), attributes: OperationAttributes = expand) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - final case class MapConcat(f: Any ⇒ immutable.Seq[Any]) extends AstNode { - override def name = "mapConcat" + final case class MapConcat(f: Any ⇒ immutable.Seq[Any], attributes: OperationAttributes = mapConcat) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - final case class GroupBy(f: Any ⇒ Any) extends AstNode { override def name = "groupBy" } - - final case class PrefixAndTail(n: Int) extends AstNode { override def name = "prefixAndTail" } - - final case class SplitWhen(p: Any ⇒ Boolean) extends AstNode { override def name = "splitWhen" } - - final case object ConcatAll extends AstNode { - override def name = "concatFlatten" + final case class GroupBy(f: Any ⇒ Any, attributes: OperationAttributes = groupBy) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - case class DirectProcessor(p: () ⇒ Processor[Any, Any]) extends AstNode { - override def name = "processor" + final case class PrefixAndTail(n: Int, attributes: OperationAttributes = prefixAndTail) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } - case class DirectProcessorWithKey(p: () ⇒ (Processor[Any, Any], Any), key: Key) extends AstNode { - override def name = "processorWithKey" + final case class SplitWhen(p: Any ⇒ Boolean, attributes: OperationAttributes = splitWhen) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) } + + final case class ConcatAll(attributes: OperationAttributes = concatAll) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } + + case class DirectProcessor(p: () ⇒ Processor[Any, Any], attributes: OperationAttributes = processor) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } + case class DirectProcessorWithKey(p: () ⇒ (Processor[Any, Any], Any), key: Key, attributes: OperationAttributes = processorWithKey) extends AstNode { + def withAttributes(attributes: OperationAttributes) = + copy(attributes = attributes) + } + sealed trait JunctionAstNode { - def name: String + def attributes: OperationAttributes } // FIXME: Try to eliminate these @@ -108,46 +198,21 @@ private[akka] object Ast { sealed trait FanOutAstNode extends JunctionAstNode // FIXME Why do we need this? - case class IdentityAstNode(id: Int) extends JunctionAstNode { - override val name = s"identity$id" - } + case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode - case object Merge extends FanInAstNode { - override def name = "merge" - } + final case class Merge(attributes: OperationAttributes) extends FanInAstNode + final case class MergePreferred(attributes: OperationAttributes) extends FanInAstNode - case object MergePreferred extends FanInAstNode { - override def name = "mergePreferred" - } + final case class Broadcast(attributes: OperationAttributes) extends FanOutAstNode + final case class Balance(waitForAllDownstreams: Boolean, attributes: OperationAttributes) extends FanOutAstNode - case object Broadcast extends FanOutAstNode { - override def name = "broadcast" - } + final case class Zip(as: ZipAs, attributes: OperationAttributes) extends FanInAstNode + final case class Unzip(attributes: OperationAttributes) extends FanOutAstNode - case class Balance(waitForAllDownstreams: Boolean) extends FanOutAstNode { - override def name = "balance" - } - - final case class Zip(as: ZipAs) extends FanInAstNode { - override def name = "zip" - } - - case object Unzip extends FanOutAstNode { - override def name = "unzip" - } - - case object Concat extends FanInAstNode { - override def name = "concat" - } - - case class FlexiMergeNode(factory: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode { - override def name = factory.name.getOrElse("flexiMerge") - } - - case class FlexiRouteNode(factory: FlexiRouteImpl.RouteLogicFactory[Any]) extends FanOutAstNode { - override def name = factory.name.getOrElse("flexiRoute") - } + final case class Concat(attributes: OperationAttributes) extends FanInAstNode + final case class FlexiMergeNode(factory: FlexiMergeImpl.MergeLogicFactory[Any], attributes: OperationAttributes) extends FanInAstNode + final case class FlexiRouteNode(factory: FlexiRouteImpl.RouteLogicFactory[Any], attributes: OperationAttributes) extends FanOutAstNode } /** @@ -216,12 +281,12 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting rest match { case noMatch if !optimizations.elision || (noMatch ne orig) ⇒ orig //Collapses consecutive Take's into one - case (t1 @ Ast.Take(t1n)) :: (t2 @ Ast.Take(t2n)) :: rest ⇒ (if (t1n < t2n) t1 else t2) :: rest + case (t1: Ast.Take) :: (t2: Ast.Take) :: rest ⇒ (if (t1.n < t2.n) t1 else t2) :: rest //Collapses consecutive Drop's into one - case (d1 @ Ast.Drop(d1n)) :: (d2 @ Ast.Drop(d2n)) :: rest ⇒ new Ast.Drop(d1n + d2n) :: rest + case (d1: Ast.Drop) :: (d2: Ast.Drop) :: rest ⇒ new Ast.Drop(d1.n + d2.n, d1.attributes and d2.attributes) :: rest - case Ast.Drop(n) :: rest if n < 1 ⇒ rest // a 0 or negative drop is a NoOp + case Ast.Drop(n, _) :: rest if n < 1 ⇒ rest // a 0 or negative drop is a NoOp case noMatch ⇒ noMatch } @@ -231,7 +296,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting case noMatch if !optimizations.simplification || (noMatch ne orig) ⇒ orig // Two consecutive maps is equivalent to one pipelined map - case Ast.Map(second) :: Ast.Map(first) :: rest ⇒ Ast.Map(first andThen second) :: rest + case (second: Ast.Map) :: (first: Ast.Map) :: rest ⇒ Ast.Map(first.f andThen second.f, first.attributes and second.attributes) :: rest case noMatch ⇒ noMatch } @@ -242,7 +307,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting case noMatch if !optimizations.collapsing || (noMatch ne orig) ⇒ orig // Collapses a filter and a map into a collect - case Ast.Map(f) :: Ast.Filter(p) :: rest ⇒ Ast.Collect({ case i if p(i) ⇒ f(i) }) :: rest + case (map: Ast.Map) :: (fil: Ast.Filter) :: rest ⇒ Ast.Collect({ case i if fil.p(i) ⇒ map.f(i) }) :: rest case noMatch ⇒ noMatch } @@ -256,17 +321,17 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting // Optimizations below case noMatch if !optimizations.fusion ⇒ prev - case Ast.Map(f) ⇒ fusing.Map(f) :: prev - case Ast.Filter(p) ⇒ fusing.Filter(p) :: prev - case Ast.Drop(n) ⇒ fusing.Drop(n) :: prev - case Ast.Take(n) ⇒ fusing.Take(n) :: prev - case Ast.Collect(pf) ⇒ fusing.Collect(pf) :: prev - case Ast.Scan(z, f) ⇒ fusing.Scan(z, f) :: prev - case Ast.Expand(s, f) ⇒ fusing.Expand(s, f) :: prev - case Ast.Conflate(s, f) ⇒ fusing.Conflate(s, f) :: prev - case Ast.Buffer(n, s) ⇒ fusing.Buffer(n, s) :: prev - case Ast.MapConcat(f) ⇒ fusing.MapConcat(f) :: prev - case Ast.Grouped(n) ⇒ fusing.Grouped(n) :: prev + case Ast.Map(f, _) ⇒ fusing.Map(f) :: prev + case Ast.Filter(p, _) ⇒ fusing.Filter(p) :: prev + case Ast.Drop(n, _) ⇒ fusing.Drop(n) :: prev + case Ast.Take(n, _) ⇒ fusing.Take(n) :: prev + case Ast.Collect(pf, _) ⇒ fusing.Collect(pf) :: prev + case Ast.Scan(z, f, _) ⇒ fusing.Scan(z, f) :: prev + case Ast.Expand(s, f, _) ⇒ fusing.Expand(s, f) :: prev + case Ast.Conflate(s, f, _) ⇒ fusing.Conflate(s, f) :: prev + case Ast.Buffer(n, s, _) ⇒ fusing.Buffer(n, s) :: prev + case Ast.MapConcat(f, _) ⇒ fusing.MapConcat(f) :: prev + case Ast.Grouped(n, _) ⇒ fusing.Grouped(n) :: prev //FIXME Add more fusion goodies here case _ ⇒ prev } @@ -354,7 +419,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting } } //FIXME Should this be a dedicated AstNode? - private[this] val identityStageNode = Ast.StageFactory(() ⇒ FlowOps.identityStage[Any], "identity") + private[this] val identityStageNode = Ast.StageFactory(() ⇒ FlowOps.identityStage[Any], Ast.Defaults.identityOp) def executionContext: ExecutionContext = dispatchers.lookup(settings.dispatcher match { case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId @@ -366,23 +431,32 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting */ private[akka] def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): (Processor[In, Out], MaterializedMap) = op match { // FIXME #16376 should probably be replaced with an ActorFlowProcessor similar to ActorFlowSource/Sink - case Ast.DirectProcessor(p) ⇒ (p().asInstanceOf[Processor[In, Out]], MaterializedMap.empty) - case Ast.DirectProcessorWithKey(p, key) ⇒ + case Ast.DirectProcessor(p, _) ⇒ (p().asInstanceOf[Processor[In, Out]], MaterializedMap.empty) + case Ast.DirectProcessorWithKey(p, key, _) ⇒ val (processor, value) = p() (processor.asInstanceOf[Processor[In, Out]], MaterializedMap.empty.updated(key, value)) case _ ⇒ - (ActorProcessorFactory[In, Out](actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}")), MaterializedMap.empty) + (ActorProcessorFactory[In, Out](actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.attributes.name}", op)), MaterializedMap.empty) } - def actorOf(props: Props, name: String): ActorRef = supervisor match { + private[akka] def actorOf(props: Props, name: String): ActorRef = + actorOf(props, name, settings.dispatcher) + + private[akka] def actorOf(props: Props, name: String, ast: Ast.JunctionAstNode): ActorRef = + actorOf(props, name, ast.attributes.settings(settings).dispatcher) + + private[akka] def actorOf(props: Props, name: String, ast: AstNode): ActorRef = + actorOf(props, name, ast.attributes.settings(settings).dispatcher) + + private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = supervisor match { case ref: LocalActorRef ⇒ - ref.underlying.attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false) + ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false) case ref: RepointableActorRef ⇒ if (ref.isStarted) - ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false) + ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), name, systemService = false) else { implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(settings.dispatcher), name)).mapTo[ActorRef] + val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), name)).mapTo[ActorRef] Await.result(f, timeout.duration) } case unknown ⇒ @@ -390,17 +464,20 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting } // FIXME Investigate possibility of using `enableOperationsFusion` in `materializeJunction` override def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = { - val actorName = s"${createFlowName()}-${op.name}" + val actorName = s"${createFlowName()}-${op.attributes.name}" + + val transformedSettings = op.attributes.settings(settings) op match { case fanin: Ast.FanInAstNode ⇒ - val impl = fanin match { - case Ast.Merge ⇒ actorOf(FairMerge.props(settings, inputCount), actorName) - case Ast.MergePreferred ⇒ actorOf(UnfairMerge.props(settings, inputCount), actorName) - case zip: Ast.Zip ⇒ actorOf(Zip.props(settings, zip.as), actorName) - case Ast.Concat ⇒ actorOf(Concat.props(settings), actorName) - case Ast.FlexiMergeNode(merger) ⇒ actorOf(FlexiMergeImpl.props(settings, inputCount, merger.createMergeLogic()), actorName) + val props = fanin match { + case Ast.Merge(_) ⇒ FairMerge.props(transformedSettings, inputCount) + case Ast.MergePreferred(_) ⇒ UnfairMerge.props(transformedSettings, inputCount) + case Ast.Zip(as, _) ⇒ Zip.props(transformedSettings, as) + case Ast.Concat(_) ⇒ Concat.props(transformedSettings) + case Ast.FlexiMergeNode(merger, _) ⇒ FlexiMergeImpl.props(transformedSettings, inputCount, merger.createMergeLogic()) } + val impl = actorOf(props, actorName, fanin) val publisher = new ActorPublisher[Out](impl) impl ! ExposedPublisher(publisher.asInstanceOf[ActorPublisher[Any]]) @@ -408,12 +485,13 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting (subscribers, List(publisher)) case fanout: Ast.FanOutAstNode ⇒ - val impl = fanout match { - case Ast.Broadcast ⇒ actorOf(Broadcast.props(settings, outputCount), actorName) - case Ast.Balance(waitForAllDownstreams) ⇒ actorOf(Balance.props(settings, outputCount, waitForAllDownstreams), actorName) - case Ast.Unzip ⇒ actorOf(Unzip.props(settings), actorName) - case Ast.FlexiRouteNode(route) ⇒ actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()), actorName) + val props = fanout match { + case Ast.Broadcast(_) ⇒ Broadcast.props(transformedSettings, outputCount) + case Ast.Balance(waitForAllDownstreams, _) ⇒ Balance.props(transformedSettings, outputCount, waitForAllDownstreams) + case Ast.Unzip(_) ⇒ Unzip.props(transformedSettings) + case Ast.FlexiRouteNode(route, _) ⇒ FlexiRouteImpl.props(transformedSettings, outputCount, route.createRouteLogic()) } + val impl = actorOf(props, actorName, fanout) val publishers = Vector.tabulate(outputCount)(id ⇒ new ActorPublisher[Out](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) @@ -422,9 +500,9 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting val subscriber = ActorSubscriber[In](impl) (List(subscriber), publishers) - case identity @ Ast.IdentityAstNode(_) ⇒ // FIXME Why is IdentityAstNode a JunctionAStNode? + case identity @ Ast.IdentityAstNode(attr) ⇒ // FIXME Why is IdentityAstNode a JunctionAStNode? // We can safely ignore the materialized map that gets created here since it will be empty - val id = List(processorForNode[In, Out](identityStageNode, identity.name, 1)._1) // FIXME is `identity.name` appropriate/unique here? + val id = List(processorForNode[In, Out](identityStageNode, attr.name, 1)._1) // FIXME is `identity.name` appropriate/unique here? (id, id) } @@ -477,28 +555,28 @@ private[akka] object ActorProcessorFactory { import Ast._ def props(materializer: FlowMaterializer, op: AstNode): Props = { val settings = materializer.settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW - (op match { + op match { case Fused(ops, _) ⇒ ActorInterpreter.props(settings, ops) - case Map(f) ⇒ ActorInterpreter.props(settings, List(fusing.Map(f))) - case Filter(p) ⇒ ActorInterpreter.props(settings, List(fusing.Filter(p))) - case Drop(n) ⇒ ActorInterpreter.props(settings, List(fusing.Drop(n))) - case Take(n) ⇒ ActorInterpreter.props(settings, List(fusing.Take(n))) - case Collect(pf) ⇒ ActorInterpreter.props(settings, List(fusing.Collect(pf))) - case Scan(z, f) ⇒ ActorInterpreter.props(settings, List(fusing.Scan(z, f))) - case Expand(s, f) ⇒ ActorInterpreter.props(settings, List(fusing.Expand(s, f))) - case Conflate(s, f) ⇒ ActorInterpreter.props(settings, List(fusing.Conflate(s, f))) - case Buffer(n, s) ⇒ ActorInterpreter.props(settings, List(fusing.Buffer(n, s))) - case MapConcat(f) ⇒ ActorInterpreter.props(settings, List(fusing.MapConcat(f))) - case MapAsync(f) ⇒ MapAsyncProcessorImpl.props(settings, f) - case MapAsyncUnordered(f) ⇒ MapAsyncUnorderedProcessorImpl.props(settings, f) - case Grouped(n) ⇒ ActorInterpreter.props(settings, List(fusing.Grouped(n))) - case GroupBy(f) ⇒ GroupByProcessorImpl.props(settings, f) - case PrefixAndTail(n) ⇒ PrefixAndTailImpl.props(settings, n) - case SplitWhen(p) ⇒ SplitWhenProcessorImpl.props(settings, p) - case ConcatAll ⇒ ConcatAllImpl.props(materializer) //FIXME closes over the materializer, is this good? + case Map(f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Map(f))) + case Filter(p, _) ⇒ ActorInterpreter.props(settings, List(fusing.Filter(p))) + case Drop(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Drop(n))) + case Take(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Take(n))) + case Collect(pf, _) ⇒ ActorInterpreter.props(settings, List(fusing.Collect(pf))) + case Scan(z, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Scan(z, f))) + case Expand(s, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Expand(s, f))) + case Conflate(s, f, _) ⇒ ActorInterpreter.props(settings, List(fusing.Conflate(s, f))) + case Buffer(n, s, _) ⇒ ActorInterpreter.props(settings, List(fusing.Buffer(n, s))) + case MapConcat(f, _) ⇒ ActorInterpreter.props(settings, List(fusing.MapConcat(f))) + case MapAsync(f, _) ⇒ MapAsyncProcessorImpl.props(settings, f) + case MapAsyncUnordered(f, _) ⇒ MapAsyncUnorderedProcessorImpl.props(settings, f) + case Grouped(n, _) ⇒ ActorInterpreter.props(settings, List(fusing.Grouped(n))) + case GroupBy(f, _) ⇒ GroupByProcessorImpl.props(settings, f) + case PrefixAndTail(n, _) ⇒ PrefixAndTailImpl.props(settings, n) + case SplitWhen(p, _) ⇒ SplitWhenProcessorImpl.props(settings, p) + case ConcatAll(_) ⇒ ConcatAllImpl.props(materializer) //FIXME closes over the materializer, is this good? case StageFactory(mkStage, _) ⇒ ActorInterpreter.props(settings, List(mkStage())) case TimerTransform(mkStage, _) ⇒ TimerTransformerProcessorsImpl.props(settings, mkStage()) - }).withDispatcher(settings.dispatcher) + } } def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index 7a27c09597..5b224344f2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -5,6 +5,7 @@ package akka.stream.impl import akka.actor.Props import akka.stream.MaterializerSettings +import akka.stream.scaladsl.OperationAttributes import akka.stream.scaladsl.FlexiMerge import scala.collection.breakOut @@ -17,7 +18,7 @@ private[akka] object FlexiMergeImpl { Props(new FlexiMergeImpl(settings, inputCount, mergeLogic)) trait MergeLogicFactory[Out] { - def name: Option[String] + def attributes: OperationAttributes def createMergeLogic(): FlexiMerge.MergeLogic[Out] } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala index 2693b090be..6567c94573 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl +import akka.stream.scaladsl.OperationAttributes + import scala.collection.breakOut import akka.actor.Props import akka.stream.scaladsl.FlexiRoute @@ -17,7 +19,7 @@ private[akka] object FlexiRouteImpl { Props(new FlexiRouteImpl(settings, outputCount, routeLogic)) trait RouteLogicFactory[In] { - def name: Option[String] + def attributes: OperationAttributes def createRouteLogic(): FlexiRoute.RouteLogic[In] } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala index b88efc9994..246eb6828b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala @@ -9,6 +9,7 @@ import akka.stream.scaladsl.FlexiMerge.ReadAllInputsBase import scala.collection.immutable import java.util.{ List ⇒ JList } import akka.japi.Util.immutableIndexedSeq +import akka.stream.impl.Ast.Defaults._ import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory object FlexiMerge { @@ -337,20 +338,20 @@ object FlexiMerge { * * @param name optional name of the junction in the [[FlowGraph]], */ -abstract class FlexiMerge[In, Out](val name: Option[String]) { +abstract class FlexiMerge[In, Out](val attributes: OperationAttributes) { import FlexiMerge._ import scaladsl.FlowGraphInternal import akka.stream.impl.Ast - def this() = this(None) - def this(name: String) = this(Option(name)) + def this(name: String) = this(OperationAttributes.name(name)) + def this() = this(OperationAttributes.none) private var inputCount = 0 def createMergeLogic(): MergeLogic[In, Out] // hide the internal vertex things from subclass, and make it possible to create new instance - private class FlexiMergeVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex { + private class FlexiMergeVertex(override val attributes: scaladsl.OperationAttributes) extends FlowGraphInternal.InternalVertex { override def minimumInputCount = 2 override def maximumInputCount = inputCount override def minimumOutputCount = 1 @@ -358,22 +359,20 @@ abstract class FlexiMerge[In, Out](val name: Option[String]) { override private[akka] val astNode = { val factory = new MergeLogicFactory[Any] { - override def name: Option[String] = vertexName + override def attributes: scaladsl.OperationAttributes = FlexiMergeVertex.this.attributes override def createMergeLogic(): scaladsl.FlexiMerge.MergeLogic[Any] = new Internal.MergeLogicWrapper(FlexiMerge.this.createMergeLogic().asInstanceOf[MergeLogic[Any, Any]]) } - Ast.FlexiMergeNode(factory) + Ast.FlexiMergeNode(factory, flexiMerge and attributes) } - override def name = vertexName - - final override def newInstance() = new FlexiMergeVertex(None) + final override def newInstance() = new FlexiMergeVertex(attributes.withoutName) } /** * INTERNAL API */ - private[akka] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(name) + private[akka] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(attributes.asScala) /** * Output port of the `FlexiMerge` junction. A [[Sink]] can be connected to this output @@ -399,4 +398,8 @@ abstract class FlexiMerge[In, Out](val name: Option[String]) { new InputPort(port, parent = this) } + override def toString = attributes.asScala.nameLifted match { + case Some(n) ⇒ n + case None ⇒ getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode()) + } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala index 6c76210021..7ff0e8a5fd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala @@ -8,6 +8,7 @@ import akka.stream.scaladsl import scala.collection.immutable import java.util.{ List ⇒ JList } import akka.japi.Util.immutableIndexedSeq +import akka.stream.impl.Ast.Defaults._ import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory object FlexiRoute { @@ -304,18 +305,18 @@ object FlexiRoute { * * @param name optional name of the junction in the [[FlowGraph]], */ -abstract class FlexiRoute[In, Out](val name: Option[String]) { +abstract class FlexiRoute[In, Out](val attributes: OperationAttributes) { import FlexiRoute._ import scaladsl.FlowGraphInternal import akka.stream.impl.Ast - def this() = this(None) - def this(name: String) = this(Option(name)) + def this(name: String) = this(OperationAttributes.name(name)) + def this() = this(OperationAttributes.none) private var outputCount = 0 // hide the internal vertex things from subclass, and make it possible to create new instance - private class RouteVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex { + private class RouteVertex(override val attributes: scaladsl.OperationAttributes) extends FlowGraphInternal.InternalVertex { override def minimumInputCount = 1 override def maximumInputCount = 1 override def minimumOutputCount = 2 @@ -323,22 +324,20 @@ abstract class FlexiRoute[In, Out](val name: Option[String]) { override private[akka] val astNode = { val factory = new RouteLogicFactory[Any] { - override def name: Option[String] = vertexName + override def attributes: scaladsl.OperationAttributes = RouteVertex.this.attributes override def createRouteLogic(): scaladsl.FlexiRoute.RouteLogic[Any] = new Internal.RouteLogicWrapper(FlexiRoute.this.createRouteLogic().asInstanceOf[RouteLogic[Any, Any]]) } - Ast.FlexiRouteNode(factory) + Ast.FlexiRouteNode(factory, flexiRoute and attributes) } - override def name = vertexName - - final override def newInstance() = new RouteVertex(None) + final override def newInstance() = new RouteVertex(attributes.withoutName) } /** * INTERNAL API */ - private[akka] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(name) + private[akka] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(attributes.asScala) /** * Input port of the `FlexiRoute` junction. A [[Source]] can be connected to this output @@ -371,7 +370,7 @@ abstract class FlexiRoute[In, Out](val name: Option[String]) { */ def createRouteLogic(): RouteLogic[In, Out] - override def toString = name match { + override def toString = attributes.asScala.nameLifted match { case Some(n) ⇒ n case None ⇒ getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode()) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 033c701944..6c2e05bdf5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -301,8 +301,8 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { * This operator makes it possible to extend the `Flow` API when there is no specialized * operator that performs the transformation. */ - def transform[U](name: String, mkStage: japi.Creator[Stage[Out, U]]): javadsl.Flow[In, U] = - new Flow(delegate.transform(name, () ⇒ mkStage.create())) + def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Flow[In, U] = + new Flow(delegate.transform(() ⇒ mkStage.create())) /** * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element @@ -363,6 +363,16 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { */ def withKey[T](key: javadsl.Key[T]): Flow[In, Out] = new Flow(delegate.withKey(key.asScala)) + + /** + * Applies given [[OperationAttributes]] to a given section. + */ + def section[I <: In, O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[In, Out], javadsl.Flow[I, O]]): javadsl.Flow[I, O] = + new Flow(delegate.section(attributes.asScala) { + val scalaToJava = (flow: scaladsl.Flow[In, Out]) ⇒ new javadsl.Flow[In, Out](flow) + val javaToScala = (flow: javadsl.Flow[I, O]) ⇒ flow.asScala + scalaToJava andThen section.apply andThen javaToScala + }) } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala index c520f411ae..1aa5351110 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala @@ -56,7 +56,7 @@ object Merge { * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def create[T](name: String): Merge[T] = new Merge(new scaladsl.Merge[T](Option(name))) + def create[T](name: String): Merge[T] = new Merge(new scaladsl.Merge[T](OperationAttributes.name(name).asScala)) /** * Create a named `Merge` vertex with the specified output type. @@ -101,7 +101,7 @@ object MergePreferred { * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def create[T](name: String): MergePreferred[T] = new MergePreferred(new scaladsl.MergePreferred[T](Option(name))) + def create[T](name: String): MergePreferred[T] = new MergePreferred(new scaladsl.MergePreferred[T](OperationAttributes.name(name).asScala)) /** * Create a named `MergePreferred` vertex with the specified output type. @@ -146,7 +146,7 @@ object Broadcast { * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def create[T](name: String): Broadcast[T] = new Broadcast(new scaladsl.Broadcast(Option(name))) + def create[T](name: String): Broadcast[T] = new Broadcast(new scaladsl.Broadcast(OperationAttributes.name(name).asScala)) /** * Create a named `Broadcast` vertex with the specified input type. @@ -190,7 +190,7 @@ object Balance { * returns instances that are `equal`. */ def create[T](name: String): Balance[T] = - new Balance(new scaladsl.Balance(Option(name), waitForAllDownstreams = false)) + new Balance(new scaladsl.Balance(waitForAllDownstreams = false, OperationAttributes.name(name).asScala)) /** * Create a named `Balance` vertex with the specified input type. @@ -214,7 +214,7 @@ class Balance[T](delegate: scaladsl.Balance[T]) extends javadsl.Junction[T] { * elements to downstream outputs until all of them have requested at least one element. */ def withWaitForAllDowstreams(enabled: Boolean): Balance[T] = - new Balance(new scaladsl.Balance(delegate.name, delegate.waitForAllDownstreams)) + new Balance(new scaladsl.Balance(delegate.waitForAllDownstreams, delegate.attributes)) } object Zip { @@ -242,8 +242,8 @@ object Zip { * is called and those instances are not `equal`.* */ def create[A, B](name: String): Zip[A, B] = - new Zip(new scaladsl.Zip[A, B](Option(name)) { - override private[akka] def astNode: Ast.FanInAstNode = Ast.Zip(impl.Zip.AsJavaPair) + new Zip(new scaladsl.Zip[A, B](OperationAttributes.name(name).asScala) { + override private[akka] def astNode: Ast.FanInAstNode = Ast.Zip(impl.Zip.AsJavaPair, attributes) }) /** @@ -287,7 +287,7 @@ object Unzip { def create[A, B](): Unzip[A, B] = create(name = null) def create[A, B](name: String): Unzip[A, B] = - new Unzip[A, B](new scaladsl.Unzip[A, B](Option(name))) + new Unzip[A, B](new scaladsl.Unzip[A, B](OperationAttributes.name(name).asScala)) def create[A, B](left: Class[A], right: Class[B]): Unzip[A, B] = create[A, B]() @@ -389,7 +389,7 @@ object UndefinedSource { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def create[T](): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](None)) + def create[T](): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](scaladsl.OperationAttributes.none)) /** * Create a new anonymous `Undefinedsource` vertex with the specified input type. @@ -405,7 +405,7 @@ object UndefinedSource { * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def create[T](name: String): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](Option(name))) + def create[T](name: String): UndefinedSource[T] = new UndefinedSource[T](new scaladsl.UndefinedSource[T](OperationAttributes.name(name).asScala)) /** * Create a named `Undefinedsource` vertex with the specified input type. @@ -448,7 +448,7 @@ object UndefinedSink { * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def create[T](name: String): UndefinedSink[T] = new UndefinedSink[T](new scaladsl.UndefinedSink[T](Option(name))) + def create[T](name: String): UndefinedSink[T] = new UndefinedSink[T](new scaladsl.UndefinedSink[T](OperationAttributes.name(name).asScala)) /** * Create a named `Undefinedsink` vertex with the specified input type. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala new file mode 100644 index 0000000000..03617bdc28 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl + +import akka.stream.scaladsl + +/** + * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] + * materialization. + */ +abstract class OperationAttributes private () { + private[akka] def asScala: scaladsl.OperationAttributes + + /** + * Adds given attributes to the end of these attributes. + */ + def and(other: OperationAttributes) = new OperationAttributes { + private[akka] def asScala = this.asScala and other.asScala + } +} + +/** + * Various attributes that can be applied to [[Flow]] or [[FlowGraph]] + * materialization. + */ +object OperationAttributes { + + /** + * Specifies the name of the operation. + */ + def name(name: String): OperationAttributes = new OperationAttributes { + private[akka] def asScala = scaladsl.OperationAttributes.name(name) + } + + /** + * Specifies the initial and maximum size of the input buffer. + */ + def inputBuffer(initial: Int, max: Int): OperationAttributes = new OperationAttributes { + private[akka] def asScala = scaladsl.OperationAttributes.inputBuffer(initial, max) + } + + /** + * Specifies the initial and maximum size of the fan out buffer. + */ + def fanOutBuffer(initial: Int, max: Int): OperationAttributes = new OperationAttributes { + private[akka] def asScala = scaladsl.OperationAttributes.fanOutBuffer(initial, max) + } + + /** + * Specifies the name of the dispatcher. + */ + def dispatcher(dispatcher: String): OperationAttributes = new OperationAttributes { + private[akka] def asScala = scaladsl.OperationAttributes.dispatcher(dispatcher) + } + + private[akka] val none: OperationAttributes = new OperationAttributes { + private[akka] def asScala = scaladsl.OperationAttributes.none + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c40d793135..33760f9a39 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -390,8 +390,8 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * This operator makes it possible to extend the `Flow` API when there is no specialized * operator that performs the transformation. */ - def transform[U](name: String, mkStage: japi.Creator[Stage[Out, U]]): javadsl.Source[U] = - new Source(delegate.transform(name, () ⇒ mkStage.create())) + def transform[U](mkStage: japi.Creator[Stage[Out, U]]): javadsl.Source[U] = + new Source(delegate.transform(() ⇒ mkStage.create())) /** * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element @@ -445,6 +445,16 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { */ def withKey[T](key: javadsl.Key[T]): javadsl.Source[Out] = new Source(delegate.withKey(key.asScala)) + + /** + * Applies given [[OperationAttributes]] to a given section. + */ + def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Source[Out], javadsl.Source[O]]): javadsl.Source[O] = + new Source(delegate.section(attributes.asScala) { + val scalaToJava = (source: scaladsl.Source[Out]) ⇒ new javadsl.Source[Out](source) + val javaToScala = (source: javadsl.Source[O]) ⇒ source.asScala + scalaToJava andThen section.apply andThen javaToScala + }) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala index 2c36723d9f..7d9aaa0b13 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSink.scala @@ -10,9 +10,10 @@ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success, Try } import org.reactivestreams.{ Publisher, Subscriber, Subscription } +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.impl.{ ActorBasedFlowMaterializer, ActorProcessorFactory, FanoutProcessorImpl, BlackholeSubscriber } -import java.util.concurrent.atomic.AtomicReference import akka.stream.stage._ +import java.util.concurrent.atomic.AtomicReference sealed trait ActorFlowSink[-In] extends Sink[In] { @@ -170,8 +171,8 @@ object OnCompleteSink { */ final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends SimpleActorFlowSink[In] { - override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = - Source(flowPublisher).transform("onCompleteSink", () ⇒ new PushStage[In, Unit] { + override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { + val section = (s: Source[In]) ⇒ s.transform(() ⇒ new PushStage[In, Unit] { override def onPush(elem: In, ctx: Context[Unit]): Directive = ctx.pull() override def onUpstreamFailure(cause: Throwable, ctx: Context[Unit]): TerminationDirective = { callback(Failure(cause)) @@ -181,7 +182,13 @@ final case class OnCompleteSink[In](callback: Try[Unit] ⇒ Unit) extends Simple callback(OnCompleteSink.SuccessUnit) ctx.finish() } - }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName)) + }) + + Source(flowPublisher). + section(name("onCompleteSink"))(section). + to(BlackholeSink). + run()(materializer.withNamePrefix(flowName)) + } } /** @@ -195,7 +202,7 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In] override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { val promise = Promise[Unit]() - Source(flowPublisher).transform("foreach", () ⇒ new PushStage[In, Unit] { + val section = (s: Source[In]) ⇒ s.transform(() ⇒ new PushStage[In, Unit] { override def onPush(elem: In, ctx: Context[Unit]): Directive = { f(elem) ctx.pull() @@ -208,7 +215,12 @@ final case class ForeachSink[In](f: In ⇒ Unit) extends KeyedActorFlowSink[In] promise.success(()) ctx.finish() } - }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName)) + }) + + Source(flowPublisher). + section(name("foreach"))(section). + to(BlackholeSink). + run()(materializer.withNamePrefix(flowName)) promise.future } } @@ -226,8 +238,7 @@ final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFl override def attach(flowPublisher: Publisher[In], materializer: ActorBasedFlowMaterializer, flowName: String) = { val promise = Promise[U]() - - Source(flowPublisher).transform("fold", () ⇒ new PushStage[In, U] { + val section = (s: Source[In]) ⇒ s.transform(() ⇒ new PushStage[In, U] { private var aggregator = zero override def onPush(elem: In, ctx: Context[U]): Directive = { @@ -244,8 +255,12 @@ final case class FoldSink[U, In](zero: U)(f: (U, In) ⇒ U) extends KeyedActorFl promise.success(aggregator) ctx.finish() } - }).to(BlackholeSink).run()(materializer.withNamePrefix(flowName)) + }) + Source(flowPublisher). + section(name("fold"))(section). + to(BlackholeSink). + run()(materializer.withNamePrefix(flowName)) promise.future } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index 8a5a33151d..892df12f78 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -62,6 +62,8 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] { /** INTERNAL API */ override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op), Nil) //FIXME raw addition of AstNodes + + def withAttributes(attr: OperationAttributes) = SourcePipe(this, Nil, Nil, attr) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index 02193acef5..2d0bffdd0c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -5,7 +5,9 @@ package akka.stream.scaladsl import scala.annotation.varargs import scala.collection.immutable +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.impl.Ast +import akka.stream.impl.Ast.Defaults._ import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory object FlexiMerge { @@ -235,28 +237,27 @@ object FlexiMerge { * * @param name optional name of the junction in the [[FlowGraph]], */ -abstract class FlexiMerge[Out](val name: Option[String]) extends MergeLogicFactory[Out] { +abstract class FlexiMerge[Out](override val attributes: OperationAttributes) extends MergeLogicFactory[Out] { import FlexiMerge._ - def this(name: String) = this(Some(name)) - def this() = this(None) + def this(name: String) = this(OperationAttributes.name(name)) + def this() = this(OperationAttributes.none) private var inputCount = 0 // hide the internal vertex things from subclass, and make it possible to create new instance - private class FlexiMergeVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex { + private class FlexiMergeVertex(override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { override def minimumInputCount = 2 override def maximumInputCount = inputCount override def minimumOutputCount = 1 override def maximumOutputCount = 1 - override private[akka] val astNode = Ast.FlexiMergeNode(FlexiMerge.this.asInstanceOf[FlexiMerge[Any]]) - override def name = vertexName + override private[akka] val astNode = Ast.FlexiMergeNode(FlexiMerge.this.asInstanceOf[FlexiMerge[Any]], flexiMerge and attributes) - final override private[scaladsl] def newInstance() = new FlexiMergeVertex(None) + final override private[scaladsl] def newInstance() = new FlexiMergeVertex(attributes.withoutName) } - private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(name) + private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(attributes) /** * Output port of the `FlexiMerge` junction. A [[Sink]] can be connected to this output @@ -286,7 +287,7 @@ abstract class FlexiMerge[Out](val name: Option[String]) extends MergeLogicFacto */ override def createMergeLogic(): MergeLogic[Out] - override def toString = name match { + override def toString = attributes.nameLifted match { case Some(n) ⇒ n case None ⇒ getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode()) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index 241ad7712e..3610bca8b7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -4,7 +4,9 @@ package akka.stream.scaladsl import scala.collection.immutable +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.impl.Ast +import akka.stream.impl.Ast.Defaults._ import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory object FlexiRoute { @@ -211,28 +213,27 @@ object FlexiRoute { * * @param name optional name of the junction in the [[FlowGraph]], */ -abstract class FlexiRoute[In](val name: Option[String]) extends RouteLogicFactory[In] { +abstract class FlexiRoute[In](override val attributes: OperationAttributes) extends RouteLogicFactory[In] { import FlexiRoute._ - def this(name: String) = this(Some(name)) - def this() = this(None) + def this(name: String) = this(OperationAttributes.name(name)) + def this() = this(OperationAttributes.none) private var outputCount = 0 // hide the internal vertex things from subclass, and make it possible to create new instance - private class RouteVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex { + private class RouteVertex(override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { override def minimumInputCount = 1 override def maximumInputCount = 1 override def minimumOutputCount = 2 override def maximumOutputCount = outputCount - override private[akka] val astNode = Ast.FlexiRouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]]) - override def name = vertexName + override private[akka] val astNode = Ast.FlexiRouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]], flexiRoute and attributes) - final override private[scaladsl] def newInstance() = new RouteVertex(None) + final override private[scaladsl] def newInstance() = new RouteVertex(OperationAttributes.none) } - private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(name) + private[scaladsl] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(attributes) /** * Input port of the `FlexiRoute` junction. A [[Source]] can be connected to this output @@ -263,7 +264,7 @@ abstract class FlexiRoute[In](val name: Option[String]) extends RouteLogicFactor */ override def createRouteLogic(): RouteLogic[In] - override def toString = name match { + override def toString = attributes.nameLifted match { case Some(n) ⇒ n case None ⇒ getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode()) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f4b008641c..6e017601f1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl import akka.stream.impl.Ast._ +import akka.stream.scaladsl.OperationAttributes._ import akka.stream.{ TimerTransformer, TransformerLike, OverflowStrategy } import akka.util.Collections.EmptyImmutableSeq import scala.collection.immutable @@ -70,6 +71,13 @@ trait Flow[-In, +Out] extends FlowOps[Out] { * before this key. */ def withKey(key: Key): Flow[In, Out] + + /** + * Applies given [[OperationAttributes]] to a given section. + */ + def section[I <: In, O](attributes: OperationAttributes)(section: Flow[In, Out] ⇒ Flow[I, O]): Flow[I, O] = + section(this.withAttributes(attributes)).withAttributes(OperationAttributes.none) + } object Flow { @@ -122,7 +130,7 @@ trait RunnableFlow { */ trait FlowOps[+Out] { import FlowOps._ - type Repr[+O] + type Repr[+O] <: FlowOps[O] /** * Transform this stream by applying the given function to each of the elements @@ -199,10 +207,10 @@ trait FlowOps[+Out] { * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = { + def groupedWithin(n: Int, d: FiniteDuration): Repr[Out]#Repr[immutable.Seq[Out]] = { require(n > 0, "n must be greater than 0") require(d > Duration.Zero) - timerTransform("groupedWithin", () ⇒ new TimerTransformer[Out, immutable.Seq[Out]] { + withAttributes(name("groupedWithin")).timerTransform(() ⇒ new TimerTransformer[Out, immutable.Seq[Out]] { schedulePeriodically(GroupedWithinTimerKey, d) var buf: Vector[Out] = Vector.empty @@ -235,8 +243,8 @@ trait FlowOps[+Out] { /** * Discard the elements received within the given duration at beginning of the stream. */ - def dropWithin(d: FiniteDuration): Repr[Out] = - timerTransform("dropWithin", () ⇒ new TimerTransformer[Out, Out] { + def dropWithin(d: FiniteDuration): Repr[Out]#Repr[Out] = + withAttributes(name("dropWithin")).timerTransform(() ⇒ new TimerTransformer[Out, Out] { scheduleOnce(DropWithinTimerKey, d) var delegate: TransformerLike[Out, Out] = @@ -271,8 +279,8 @@ trait FlowOps[+Out] { * Note that this can be combined with [[#take]] to limit the number of elements * within the duration. */ - def takeWithin(d: FiniteDuration): Repr[Out] = - timerTransform("takeWithin", () ⇒ new TimerTransformer[Out, Out] { + def takeWithin(d: FiniteDuration): Repr[Out]#Repr[Out] = + withAttributes(name("takeWithin")).timerTransform(() ⇒ new TimerTransformer[Out, Out] { scheduleOnce(TakeWithinTimerKey, d) var delegate: TransformerLike[Out, Out] = FlowOps.identityTransformer[Out] @@ -331,8 +339,8 @@ trait FlowOps[+Out] { * This operator makes it possible to extend the `Flow` API when there is no specialized * operator that performs the transformation. */ - def transform[T](name: String, mkStage: () ⇒ Stage[Out, T]): Repr[T] = - andThen(StageFactory(mkStage, name)) + def transform[T](mkStage: () ⇒ Stage[Out, T]): Repr[T] = + andThen(StageFactory(mkStage)) /** * Takes up to `n` elements from the stream and returns a pair containing a strict sequence of the taken element @@ -377,7 +385,7 @@ trait FlowOps[+Out] { * This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]]. */ def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U] = strategy match { - case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) + case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll()) case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") } @@ -408,8 +416,11 @@ trait FlowOps[+Out] { * * Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation. */ - private[akka] def timerTransform[U](name: String, mkStage: () ⇒ TimerTransformer[Out, U]): Repr[U] = - andThen(TimerTransform(mkStage.asInstanceOf[() ⇒ TimerTransformer[Any, Any]], name)) + private[akka] def timerTransform[U](mkStage: () ⇒ TimerTransformer[Out, U]): Repr[U] = + andThen(TimerTransform(mkStage.asInstanceOf[() ⇒ TimerTransformer[Any, Any]])) + + /** INTERNAL API */ + private[scaladsl] def withAttributes(attr: OperationAttributes): Repr[Out] /** INTERNAL API */ // Storing ops in reverse order @@ -440,4 +451,3 @@ private[stream] object FlowOps { override def onPush(elem: T, ctx: Context[T]): Directive = ctx.push(elem) } } - diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index dc0ecd3295..a6118f350a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -9,6 +9,8 @@ import akka.stream.FlowMaterializer import akka.stream.impl.Ast import akka.stream.impl.Ast.FanInAstNode import akka.stream.impl.{ DirectedGraphBuilder, Edge } +import akka.stream.impl.Ast.Defaults._ +import akka.stream.scaladsl.OperationAttributes._ import org.reactivestreams._ import scala.language.existentials @@ -59,11 +61,11 @@ private[akka] object Identity { def getId: Int = id.getAndIncrement } -private[akka] final class Identity[T]() extends FlowGraphInternal.InternalVertex with Junction[T] { +private[akka] final class Identity[T](override val attributes: OperationAttributes = OperationAttributes.none) extends FlowGraphInternal.InternalVertex with Junction[T] { import Identity._ // This vertex can not have a name or else there can only be one instance in the whole graph - def name: Option[String] = None + override def name: Option[String] = None override private[akka] val vertex = this override val minimumInputCount: Int = 1 @@ -71,9 +73,9 @@ private[akka] final class Identity[T]() extends FlowGraphInternal.InternalVertex override val minimumOutputCount: Int = 1 override val maximumOutputCount: Int = 1 - override private[akka] val astNode = Ast.IdentityAstNode(getId) + override private[akka] val astNode = Ast.IdentityAstNode(identityJunction and OperationAttributes.name(s"id$getId")) - final override private[scaladsl] def newInstance() = new Identity[T]() + final override private[scaladsl] def newInstance() = new Identity[T](attributes.withoutName) } object Merge { @@ -83,14 +85,16 @@ object Merge { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: Merge[T] = new Merge[T](None) + def apply[T]: Merge[T] = new Merge[T](OperationAttributes.none) /** * Create a named `Merge` vertex with the specified output type. * Note that a `Merge` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def apply[T](name: String): Merge[T] = new Merge[T](Some(name)) + def apply[T](name: String): Merge[T] = new Merge[T](OperationAttributes.name(name)) + + def apply[T](attributes: OperationAttributes): Merge[T] = new Merge[T](attributes) } /** @@ -100,16 +104,16 @@ object Merge { * When building the [[FlowGraph]] you must connect one or more input sources * and one output sink to the `Merge` vertex. */ -final class Merge[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { +final class Merge[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] { override private[akka] val vertex = this override val minimumInputCount: Int = 2 override val maximumInputCount: Int = Int.MaxValue override val minimumOutputCount: Int = 1 override val maximumOutputCount: Int = 1 - override private[akka] def astNode = Ast.Merge + override private[akka] def astNode = Ast.Merge(merge and attributes) - final override private[scaladsl] def newInstance() = new Merge[T](None) + final override private[scaladsl] def newInstance() = new Merge[T](attributes.withoutName) } object MergePreferred { @@ -124,14 +128,16 @@ object MergePreferred { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: MergePreferred[T] = new MergePreferred[T](None) + def apply[T]: MergePreferred[T] = new MergePreferred[T](OperationAttributes.none) /** * Create a named `MergePreferred` vertex with the specified output type. * Note that a `MergePreferred` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def apply[T](name: String): MergePreferred[T] = new MergePreferred[T](Some(name)) + def apply[T](name: String): MergePreferred[T] = new MergePreferred[T](OperationAttributes.name(name)) + + def apply[T](attributes: OperationAttributes): MergePreferred[T] = new MergePreferred[T](attributes) class Preferred[A] private[akka] (private[akka] val vertex: MergePreferred[A]) extends JunctionInPort[A] { override private[akka] def port = PreferredPort @@ -146,7 +152,7 @@ object MergePreferred { * When building the [[FlowGraph]] you must connect one or more input sources * and one output sink to the `Merge` vertex. */ -final class MergePreferred[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { +final class MergePreferred[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] { val preferred = new MergePreferred.Preferred(this) @@ -156,9 +162,9 @@ final class MergePreferred[T](override val name: Option[String]) extends FlowGra override val minimumOutputCount: Int = 1 override val maximumOutputCount: Int = 1 - override private[akka] def astNode = Ast.MergePreferred + override private[akka] def astNode = Ast.MergePreferred(mergePreferred and attributes) - final override private[scaladsl] def newInstance() = new MergePreferred[T](None) + final override private[scaladsl] def newInstance() = new MergePreferred[T](attributes.withoutName) } object Broadcast { @@ -168,14 +174,16 @@ object Broadcast { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: Broadcast[T] = new Broadcast[T](None) + def apply[T]: Broadcast[T] = new Broadcast[T](OperationAttributes.none) /** * Create a named `Broadcast` vertex with the specified input type. * Note that a `Broadcast` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def apply[T](name: String): Broadcast[T] = new Broadcast[T](Some(name)) + def apply[T](name: String): Broadcast[T] = new Broadcast[T](OperationAttributes.name(name)) + + def apply[T](attributes: OperationAttributes): Broadcast[T] = new Broadcast[T](attributes) } /** @@ -183,16 +191,16 @@ object Broadcast { * the other streams. It will not shutdown until the subscriptions for at least * two downstream subscribers have been established. */ -final class Broadcast[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { +final class Broadcast[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] { override private[akka] def vertex = this override def minimumInputCount: Int = 1 override def maximumInputCount: Int = 1 override def minimumOutputCount: Int = 2 override def maximumOutputCount: Int = Int.MaxValue - override private[akka] def astNode = Ast.Broadcast + override private[akka] def astNode = Ast.Broadcast(broadcast and attributes) - final override private[scaladsl] def newInstance() = new Broadcast[T](None) + final override private[scaladsl] def newInstance() = new Broadcast[T](attributes.withoutName) } object Balance { @@ -202,7 +210,7 @@ object Balance { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: Balance[T] = new Balance[T](None, waitForAllDownstreams = false) + def apply[T]: Balance[T] = new Balance[T](waitForAllDownstreams = false, OperationAttributes.none) /** * Create a named `Balance` vertex with the specified input type. * Note that a `Balance` with a specific name can only be used at one place (one vertex) @@ -212,7 +220,7 @@ object Balance { * If you use `waitForAllDownstreams = true` it will not start emitting * elements to downstream outputs until all of them have requested at least one element. */ - def apply[T](name: String, waitForAllDownstreams: Boolean = false): Balance[T] = new Balance[T](Some(name), waitForAllDownstreams) + def apply[T](name: String, waitForAllDownstreams: Boolean = false): Balance[T] = new Balance[T](waitForAllDownstreams, OperationAttributes.name(name)) /** * Create a new anonymous `Balance` vertex with the specified input type. @@ -223,7 +231,9 @@ object Balance { * If you use `waitForAllDownstreams = true` it will not start emitting * elements to downstream outputs until all of them have requested at least one element. */ - def apply[T](waitForAllDownstreams: Boolean): Balance[T] = new Balance[T](None, waitForAllDownstreams) + def apply[T](waitForAllDownstreams: Boolean): Balance[T] = new Balance[T](waitForAllDownstreams, OperationAttributes.none) + + def apply[T](waitForAllDownstreams: Boolean, attributes: OperationAttributes): Balance[T] = new Balance[T](waitForAllDownstreams, attributes) } /** @@ -231,16 +241,16 @@ object Balance { * one of the other streams. It will not shutdown until the subscriptions for at least * two downstream subscribers have been established. */ -final class Balance[T](override val name: Option[String], val waitForAllDownstreams: Boolean) extends FlowGraphInternal.InternalVertex with Junction[T] { +final class Balance[T](val waitForAllDownstreams: Boolean, override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex with Junction[T] { override private[akka] def vertex = this override def minimumInputCount: Int = 1 override def maximumInputCount: Int = 1 override def minimumOutputCount: Int = 2 override def maximumOutputCount: Int = Int.MaxValue - override private[akka] val astNode = Ast.Balance(waitForAllDownstreams) + override private[akka] val astNode = Ast.Balance(waitForAllDownstreams, balance and attributes) - final override private[scaladsl] def newInstance() = new Balance[T](None, waitForAllDownstreams) + final override private[scaladsl] def newInstance() = new Balance[T](waitForAllDownstreams, attributes.withoutName) } object Zip { @@ -250,7 +260,7 @@ object Zip { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`.* */ - def apply[A, B]: Zip[A, B] = new Zip[A, B](None) + def apply[A, B]: Zip[A, B] = new Zip[A, B](OperationAttributes.none) /** * Create a named `Zip` vertex with the specified input types. @@ -258,7 +268,9 @@ object Zip { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`.* */ - def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](Some(name)) + def apply[A, B](name: String): Zip[A, B] = new Zip[A, B](OperationAttributes.name(name)) + + def apply[A, B](attr: OperationAttributes): Zip[A, B] = new Zip[A, B](attr) class Left[A, B] private[akka] (private[akka] val vertex: Zip[A, B]) extends JunctionInPort[A] { override private[akka] def port = 0 @@ -278,7 +290,7 @@ object Zip { * by combining corresponding elements in pairs. If one of the two streams is * longer than the other, its remaining elements are ignored. */ -private[akka] class Zip[A, B](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +private[akka] class Zip[A, B](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { import akka.stream.impl.Zip.AsScalaTuple2 val left = new Zip.Left(this) @@ -290,9 +302,9 @@ private[akka] class Zip[A, B](override val name: Option[String]) extends FlowGra override def minimumOutputCount: Int = 1 override def maximumOutputCount: Int = 1 - override private[akka] def astNode: FanInAstNode = Ast.Zip(AsScalaTuple2) + override private[akka] def astNode: FanInAstNode = Ast.Zip(AsScalaTuple2, zip and attributes) - final override private[scaladsl] def newInstance() = new Zip[A, B](name = None) + final override private[scaladsl] def newInstance() = new Zip[A, B](attributes.withoutName) } object Unzip { @@ -302,7 +314,7 @@ object Unzip { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`.* */ - def apply[A, B]: Unzip[A, B] = new Unzip[A, B](None) + def apply[A, B]: Unzip[A, B] = new Unzip[A, B](OperationAttributes.none) /** * Create a named `Unzip` vertex with the specified output types. @@ -310,7 +322,9 @@ object Unzip { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`.* */ - def apply[A, B](name: String): Unzip[A, B] = new Unzip[A, B](Some(name)) + def apply[A, B](name: String): Unzip[A, B] = new Unzip[A, B](OperationAttributes.name(name)) + + def apply[A, B](attributes: OperationAttributes): Unzip[A, B] = new Unzip[A, B](attributes) class In[A, B] private[akka] (private[akka] val vertex: Unzip[A, B]) extends JunctionInPort[(A, B)] { override type NextT = Nothing @@ -328,7 +342,7 @@ object Unzip { /** * Takes a stream of pair elements and splits each pair to two output streams. */ -final class Unzip[A, B](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class Unzip[A, B](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { val in = new Unzip.In(this) val left = new Unzip.Left(this) val right = new Unzip.Right(this) @@ -338,9 +352,9 @@ final class Unzip[A, B](override val name: Option[String]) extends FlowGraphInte override def minimumOutputCount: Int = 2 override def maximumOutputCount: Int = 2 - override private[akka] def astNode = Ast.Unzip + override private[akka] def astNode = Ast.Unzip(unzip and attributes) - final override private[scaladsl] def newInstance() = new Unzip[A, B](name = None) + final override private[scaladsl] def newInstance() = new Unzip[A, B](attributes.withoutName) } object Concat { @@ -350,7 +364,7 @@ object Concat { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`.* */ - def apply[T]: Concat[T] = new Concat[T](None) + def apply[T]: Concat[T] = new Concat[T](OperationAttributes.none) /** * Create a named `Concat` vertex with the specified input types. @@ -358,7 +372,9 @@ object Concat { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`.* */ - def apply[T](name: String): Concat[T] = new Concat[T](Some(name)) + def apply[T](name: String): Concat[T] = new Concat[T](OperationAttributes.name(name)) + + def apply[T](attributes: OperationAttributes): Concat[T] = new Concat[T](attributes) class First[T] private[akka] (val vertex: Concat[T]) extends JunctionInPort[T] { override val port = 0 @@ -378,7 +394,7 @@ object Concat { * by consuming one stream first emitting all of its elements, then consuming the * second stream emitting all of its elements. */ -final class Concat[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class Concat[T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { val first = new Concat.First(this) val second = new Concat.Second(this) val out = new Concat.Out(this) @@ -388,9 +404,9 @@ final class Concat[T](override val name: Option[String]) extends FlowGraphIntern override def minimumOutputCount: Int = 1 override def maximumOutputCount: Int = 1 - override private[akka] def astNode = Ast.Concat + override private[akka] def astNode = Ast.Concat(concat and attributes) - final override private[scaladsl] def newInstance() = new Concat[T](name = None) + final override private[scaladsl] def newInstance() = new Concat[T](attributes.withoutName) } object UndefinedSink { @@ -400,21 +416,21 @@ object UndefinedSink { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: UndefinedSink[T] = new UndefinedSink[T](None) + def apply[T]: UndefinedSink[T] = new UndefinedSink[T](OperationAttributes.none) /** * Create a named `UndefinedSink` vertex with the specified input type. * Note that a `UndefinedSink` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](Some(name)) + def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](OperationAttributes.name(name)) } /** * It is possible to define a [[PartialFlowGraph]] with output pipes that are not connected * yet by using this placeholder instead of the real [[Sink]]. Later the placeholder can * be replaced with [[FlowGraphBuilder#attachSink]]. */ -final class UndefinedSink[-T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class UndefinedSink[-T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { override def minimumInputCount: Int = 1 override def maximumInputCount: Int = 1 @@ -423,7 +439,7 @@ final class UndefinedSink[-T](override val name: Option[String]) extends FlowGra override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sinks cannot be materialized") - final override private[scaladsl] def newInstance() = new UndefinedSink[T](name = None) + final override private[scaladsl] def newInstance() = new UndefinedSink[T](attributes.withoutName) } object UndefinedSource { @@ -433,21 +449,21 @@ object UndefinedSource { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def apply[T]: UndefinedSource[T] = new UndefinedSource[T](None) + def apply[T]: UndefinedSource[T] = new UndefinedSource[T](OperationAttributes.none) /** * Create a named `UndefinedSource` vertex with the specified output type. * Note that a `UndefinedSource` with a specific name can only be used at one place (one vertex) * in the `FlowGraph`. Calling this method several times with the same name * returns instances that are `equal`. */ - def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](Some(name)) + def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](OperationAttributes.name(name)) } /** * It is possible to define a [[PartialFlowGraph]] with input pipes that are not connected * yet by using this placeholder instead of the real [[Source]]. Later the placeholder can * be replaced with [[FlowGraphBuilder#attachSource]]. */ -final class UndefinedSource[+T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +final class UndefinedSource[+T](override val attributes: OperationAttributes) extends FlowGraphInternal.InternalVertex { override def minimumInputCount: Int = 0 override def maximumInputCount: Int = 0 override def minimumOutputCount: Int = 1 @@ -455,7 +471,7 @@ final class UndefinedSource[+T](override val name: Option[String]) extends FlowG override private[akka] def astNode = throw new UnsupportedOperationException("Undefined sources cannot be materialized") - final override private[scaladsl] def newInstance() = new UndefinedSource[T](name = None) + final override private[scaladsl] def newInstance() = new UndefinedSource[T](attributes.withoutName) } /** @@ -518,7 +534,8 @@ private[akka] object FlowGraphInternal { } trait InternalVertex extends Vertex { - def name: Option[String] + def attributes: OperationAttributes + def name: Option[String] = attributes.nameLifted def minimumInputCount: Int def maximumInputCount: Int @@ -1138,7 +1155,7 @@ class FlowGraphBuilder private[akka] ( node.outDegree <= v.maximumOutputCount, s"$v must have at most ${v.maximumOutputCount} outgoing edges") v.astNode match { - case Ast.MergePreferred ⇒ + case Ast.MergePreferred(_) ⇒ require( node.incoming.count(_.label.inputPort == MergePreferred.PreferredPort) <= 1, s"$v must have at most one preferred edge") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala index 79c6282986..1062d4c42d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/GraphFlow.scala @@ -119,6 +119,8 @@ private[scaladsl] case class GraphFlow[-In, CIn, COut, +Out]( override def withKey(key: Key): Flow[In, Out] = this.copy(outPipe = outPipe.withKey(key)) override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) + + def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) } private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, out: UndefinedSink[COut], outPipe: Pipe[COut, Out]) extends Source[Out] { @@ -164,6 +166,8 @@ private[scaladsl] case class GraphSource[COut, +Out](graph: PartialFlowGraph, ou override def withKey(key: Key): Source[Out] = this.copy(outPipe = outPipe.withKey(key)) override private[scaladsl] def andThen[T](op: AstNode): Repr[T] = copy(outPipe = outPipe.andThen(op)) + + def withAttributes(attr: OperationAttributes): Repr[Out] = copy(outPipe = outPipe.withAttributes(attr)) } private[scaladsl] case class GraphSink[-In, CIn](inPipe: Pipe[In, CIn], in: UndefinedSource[CIn], graph: PartialFlowGraph) extends Sink[In] { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala new file mode 100644 index 0000000000..6c59a14e70 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.MaterializerSettings +import akka.stream.impl.Ast.AstNode + +/** + * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] + * materialization. + */ +case class OperationAttributes private (private val attributes: List[OperationAttributes.Attribute] = Nil) { + + import OperationAttributes._ + + /** + * Adds given attributes to the end of these attributes. + */ + def and(other: OperationAttributes): OperationAttributes = { + OperationAttributes(attributes ::: other.attributes) + } + + private[akka] def nameLifted: Option[String] = + attributes.collect { + case Name(name) ⇒ name + }.reduceOption(_ + "-" + _) + + private[akka] def name: String = nameLifted match { + case Some(name) ⇒ name + case _ ⇒ "unknown-operation" + } + + private[akka] def settings: MaterializerSettings ⇒ MaterializerSettings = + attributes.collect { + case InputBuffer(initial, max) ⇒ (s: MaterializerSettings) ⇒ s.withInputBuffer(initial, max) + case FanOutBuffer(initial, max) ⇒ (s: MaterializerSettings) ⇒ s.withFanOutBuffer(initial, max) + case Dispatcher(dispatcher) ⇒ (s: MaterializerSettings) ⇒ s.withDispatcher(dispatcher) + }.reduceOption(_ andThen _).getOrElse(identity) + + private[akka] def transform(node: AstNode): AstNode = + if ((this eq OperationAttributes.none) || (this eq node.attributes)) node + else node.withAttributes(attributes = this and node.attributes) + + /** + * Filtering out name attributes is needed for Vertex.newInstance(). + * However there is an ongoing discussion for removing this feature, + * after which this will not be needed anymore. + * + * https://github.com/akka/akka/issues/16392 + */ + private[akka] def withoutName = this.copy( + attributes = attributes.filterNot { + case attr: Name ⇒ true + }) +} + +object OperationAttributes { + + private[OperationAttributes] trait Attribute + private[OperationAttributes] case class Name(n: String) extends Attribute + private[OperationAttributes] case class InputBuffer(initial: Int, max: Int) extends Attribute + private[OperationAttributes] case class FanOutBuffer(initial: Int, max: Int) extends Attribute + private[OperationAttributes] case class Dispatcher(dispatcher: String) extends Attribute + + private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes = + apply(List(attribute)) + + private[akka] val none: OperationAttributes = OperationAttributes() + + /** + * Specifies the name of the operation. + */ + def name(name: String): OperationAttributes = OperationAttributes(Name(name)) + + /** + * Specifies the initial and maximum size of the input buffer. + */ + def inputBuffer(initial: Int, max: Int): OperationAttributes = OperationAttributes(InputBuffer(initial, max)) + + /** + * Specifies the initial and maximum size of the fan out buffer. + */ + def fanOutBuffer(initial: Int, max: Int): OperationAttributes = OperationAttributes(FanOutBuffer(initial, max)) + + /** + * Specifies the name of the dispatcher. + */ + def dispatcher(dispatcher: String): OperationAttributes = OperationAttributes(Dispatcher(dispatcher)) +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index 65f4e192c2..652a27f714 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -33,10 +33,12 @@ private[akka] object Pipe { /** * Flow with one open input and one open output. */ -private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key]) extends Flow[In, Out] { +private[akka] final case class Pipe[-In, +Out](ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Flow[In, Out] { override type Repr[+O] = Pipe[In @uncheckedVariance, O] - override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = Pipe(ops = op :: ops, keys) // FIXME raw addition of AstNodes + override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = Pipe(ops = attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes + + def withAttributes(attr: OperationAttributes): Repr[Out] = this.copy(attributes = attr) private[stream] def withSink(out: Sink[Out]): SinkPipe[In] = SinkPipe(out, ops, keys) @@ -79,10 +81,12 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod /** * Pipe with open output and attached input. Can be used as a `Publisher`. */ -private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key]) extends Source[Out] { +private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode], keys: List[Key], attributes: OperationAttributes = OperationAttributes.none) extends Source[Out] { override type Repr[+O] = SourcePipe[O] - override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops, keys) // FIXME raw addition of AstNodes + override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, attributes.transform(op) :: ops, keys, attributes) // FIXME raw addition of AstNodes + + def withAttributes(attr: OperationAttributes): Repr[Out] = this.copy(attributes = attr) private[stream] def withSink(out: Sink[Out]): RunnablePipe = RunnablePipe(input, out, ops, keys) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 6318497b38..263a3f5d67 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -79,6 +79,13 @@ trait Source[+Out] extends FlowOps[Out] { * before this key. This also includes the keyed source if applicable. */ def withKey(key: Key): Source[Out] + + /** + * Applies given [[OperationAttributes]] to a given section. + */ + def section[T](attributes: OperationAttributes)(section: Source[Out] ⇒ Source[T]): Source[T] = + section(this.withAttributes(attributes)).withAttributes(OperationAttributes.none) + } object Source {