diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala index 23e43cf895..38c2d042b9 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/AttributesSpec.scala @@ -3,39 +3,110 @@ */ package akka.stream.scaladsl -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.Attributes +import java.util.Optional +import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeUnit } + +import akka.{ Done, NotUsed } import akka.stream.Attributes._ -import akka.stream.MaterializationContext -import akka.stream.SinkShape +import akka.stream._ +import akka.stream.javadsl +import akka.stream.stage._ import akka.stream.testkit._ -import scala.concurrent.Future -import scala.concurrent.Promise -import akka.stream.impl.SinkModule -import akka.stream.impl.SinkholeSubscriber +import com.typesafe.config.ConfigFactory object AttributesSpec { - object AttributesSink { - def apply(): Sink[Nothing, Future[Attributes]] = - Sink.fromGraph[Nothing, Future[Attributes]](new AttributesSink(Attributes.name("attributesSink"), Sink.shape("attributesSink"))) + class AttributesSource(_initialAttributes: Attributes = Attributes.none) extends GraphStageWithMaterializedValue[SourceShape[Any], Attributes] { + val out = Outlet[Any]("out") + override protected def initialAttributes: Attributes = _initialAttributes + override val shape = SourceShape.of(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Attributes) = { + val logic = new GraphStageLogic(shape) { + setHandler(out, new OutHandler { + def onPull(): Unit = { + } + }) + } + (logic, inheritedAttributes) + } + } - final class AttributesSink(val attributes: Attributes, shape: SinkShape[Nothing]) extends SinkModule[Nothing, Future[Attributes]](shape) { - override def create(context: MaterializationContext) = - (new SinkholeSubscriber(Promise()), Future.successful(context.effectiveAttributes)) + class AttributesFlow(_initialAttributes: Attributes = Attributes.none) extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Attributes] { - override protected def newInstance(shape: SinkShape[Nothing]): SinkModule[Nothing, Future[Attributes]] = - new AttributesSink(attributes, shape) + val in = Inlet[Any]("in") + val out = Outlet[Any]("out") - override def withAttributes(attr: Attributes): SinkModule[Nothing, Future[Attributes]] = - new AttributesSink(attr, amendShape(attr)) + override protected def initialAttributes: Attributes = _initialAttributes + override val shape = FlowShape(in, out) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Attributes) = { + val logic = new GraphStageLogic(shape) { + + setHandlers(in, out, new InHandler with OutHandler { + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = pull(in) + }) + } + + (logic, inheritedAttributes) + } } + class AttributesSink(_initialAttributes: Attributes = Attributes.none) extends GraphStageWithMaterializedValue[SinkShape[Any], Attributes] { + + val in = Inlet[Any]("in") + + override protected def initialAttributes: Attributes = _initialAttributes + override val shape = SinkShape(in) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Attributes) = { + val logic = new GraphStageLogic(shape) { + override def preStart(): Unit = { + pull(in) + } + setHandler(in, new InHandler { + override def onPush(): Unit = { + grab(in) + pull(in) + } + }) + } + + (logic, inheritedAttributes) + } + } + + class ThreadNameSnitchingStage(initialDispatcher: String) extends GraphStage[SourceShape[String]] { + val out = Outlet[String]("out") + override val shape = SourceShape.of(out) + override protected def initialAttributes: Attributes = ActorAttributes.dispatcher(initialDispatcher) + def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + setHandler(out, new OutHandler { + def onPull(): Unit = { + push(out, Thread.currentThread.getName) + completeStage() + } + }) + } + + } + + def whateverAttribute(label: String): Attributes = Attributes(WhateverAttribute(label)) + case class WhateverAttribute(label: String) extends Attribute } -class AttributesSpec extends StreamSpec { +class AttributesSpec extends StreamSpec(ConfigFactory.parseString( + """ + my-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 1 + } + throughput = 1 + } + """).withFallback(Utils.UnboundedMailboxConfig)) { + import AttributesSpec._ val settings = ActorMaterializerSettings(system) @@ -43,32 +114,376 @@ class AttributesSpec extends StreamSpec { implicit val materializer = ActorMaterializer(settings) - "attributes" must { - - "be overridable on a module basis" in { - val runnable = Source.empty.toMat(AttributesSink().withAttributes(Attributes.name("new-name")))(Keep.right) - whenReady(runnable.run()) { attributes ⇒ - attributes.get[Name] should contain(Name("new-name")) - } - } - - "keep the outermost attribute as the least specific" in { - val runnable = Source.empty.toMat(AttributesSink())(Keep.right).withAttributes(Attributes.name("new-name")) - whenReady(runnable.run()) { attributes ⇒ - attributes.get[Name] should contain(Name("attributesSink")) - } - } + "an attributes instance" must { val attributes = Attributes.name("a") and Attributes.name("b") and Attributes.inputBuffer(1, 2) - "give access to first attribute" in { + "give access to the least specific attribute" in { attributes.getFirst[Name] should ===(Some(Attributes.Name("a"))) } - "give access to attribute byt type" in { + "give access to the most specific attribute value" in { attributes.get[Name] should ===(Some(Attributes.Name("b"))) } } + "attributes on a graph stage" must { + + "be appended with addAttributes" in { + val attributes = + Source.fromGraph(new AttributesSource() + .addAttributes(Attributes.name("new-name")) + .addAttributes(Attributes.name("re-added")) // adding twice at same level replaces + .addAttributes(whateverAttribute("other-thing")) + ) + .toMat(Sink.head)(Keep.left) + .run() + + attributes.get[Name] should contain(Name("re-added")) + attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing")) + } + + "be replaced withAttributes directly on a stage" in { + val attributes = + Source.fromGraph(new AttributesSource() + .withAttributes(Attributes.name("new-name") and whateverAttribute("other-thing")) + .withAttributes(Attributes.name("re-added")) // we loose all previous attributes for same level + ) + .toMat(Sink.head)(Keep.left) + .run() + + attributes.get[Name] should contain(Name("re-added")) + attributes.get[WhateverAttribute] shouldBe empty + } + + "be overridable on a module basis" in { + val attributes = + Source.fromGraph(new AttributesSource().withAttributes(Attributes.name("new-name"))) + .toMat(Sink.head)(Keep.left) + .run() + + attributes.get[Name] should contain(Name("new-name")) + } + + "keep the outermost attribute as the least specific" in { + val attributes = Source.fromGraph(new AttributesSource(Attributes.name("original-name"))) + .map(identity) + .addAttributes(Attributes.name("whole-graph")) + .toMat(Sink.head)(Keep.left) + .run() + + // most specific + attributes.get[Name] should contain(Name("original-name")) + + // least specific + attributes.getFirst[Name] should contain(Name("whole-graph")) + } + } + + "attributes on a source" must { + + "make the attributes on fromGraph(single-source-stage) Source behave the same as the stage itself" in { + val attributes = + Source.fromGraph( + new AttributesSource(Attributes.name("original-name") and whateverAttribute("whatever")) + .withAttributes(Attributes.name("new-name"))) + .toMat(Sink.head)(Keep.left) + .run() + + // most specific + attributes.get[Name] should contain(Name("new-name")) + + // least specific + attributes.getFirst[Name] should contain(Name("new-name")) + } + + "make the attributes on Source.fromGraph source behave the same as the stage itself" in { + val attributes = + Source.fromGraph(new AttributesSource(Attributes.name("original-name"))) + .withAttributes(Attributes.name("replaced")) // this actually replaces now + .toMat(Sink.head)(Keep.left).withAttributes(Attributes.name("whole-graph")) + .run() + + // most specific + attributes.get[Name] should contain(Name("replaced")) + attributes.get[WhateverAttribute] shouldBe empty + + // least specific + attributes.getFirst[Name] should contain(Name("whole-graph")) + attributes.getFirst[WhateverAttribute] shouldBe empty + } + + "not replace stage specific attributes with attributes on surrounding composite source" in { + val attributes = Source.fromGraph(new AttributesSource(Attributes.name("original-name"))) + .map(identity) + .addAttributes(Attributes.name("composite-graph")) + .toMat(Sink.head)(Keep.left) + .run() + + // most specific still the original as the attribute was added on the composite source + attributes.get[Name] should contain(Name("original-name")) + + // least specific + attributes.getFirst[Name] should contain(Name("composite-graph")) + } + + "make the attributes on Sink.fromGraph source behave the same as the stage itself" in { + val attributes = + Source.maybe.toMat( + Sink.fromGraph(new AttributesSink(Attributes.name("original-name"))) + .withAttributes(Attributes.name("replaced")) // this actually replaces now + )(Keep.right) + .withAttributes(Attributes.name("whole-graph")) + .run() + + // most specific + attributes.get[Name] should contain(Name("replaced")) + + // least specific + attributes.getFirst[Name] should contain(Name("whole-graph")) + } + + "use the initial attributes for dispatcher" in { + val dispatcher = + Source.fromGraph(new ThreadNameSnitchingStage("my-dispatcher")) + .runWith(Sink.head) + .futureValue + + dispatcher should startWith("AttributesSpec-my-dispatcher") + } + + "use an explicit attribute on the stage to select dispatcher" in { + val dispatcher = + Source.fromGraph( + // directly on stage + new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher") + .addAttributes(ActorAttributes.dispatcher("my-dispatcher"))) + .runWith(Sink.head) + .futureValue + + dispatcher should startWith("AttributesSpec-my-dispatcher") + } + + "use the most specific dispatcher when another one is defined on a surrounding composed graph" in { + val dispatcher = + Source.fromGraph( + new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) + .map(identity) + // this is now for the composed source -> flow graph + .addAttributes(ActorAttributes.dispatcher("my-dispatcher")) + .runWith(Sink.head) + .futureValue + + dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") + } + + "not change dispatcher from one defined on a surrounding graph" in { + val dispatcher = + Source.fromGraph( + new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) + // this already introduces an async boundary here + .map(identity) + // this is now just for map since there already is one inbetween stage and map + .async // potential sugar .async("my-dispatcher") + .addAttributes(ActorAttributes.dispatcher("my-dispatcher")) + .runWith(Sink.head) + .futureValue + + dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") + } + + "change dispatcher when defined directly on top of the async boundary" in { + val dispatcher = + Source.fromGraph( + new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) + .async + .withAttributes(ActorAttributes.dispatcher("my-dispatcher")) + .runWith(Sink.head) + .futureValue + + dispatcher should startWith("AttributesSpec-my-dispatcher") + } + + "change dispatcher when defined on the async call" in { + val dispatcher = + Source.fromGraph( + new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) + .async("my-dispatcher") + .runWith(Sink.head) + .futureValue + + dispatcher should startWith("AttributesSpec-my-dispatcher") + } + } + + "attributes on a Flow" must { + + "make the attributes on fromGraph(flow-stage) Flow behave the same as the stage itself" in { + val attributes = + Source.empty + .viaMat( + Flow.fromGraph(new AttributesFlow(Attributes.name("original-name"))) + .withAttributes(Attributes.name("replaced")) // this actually replaces now + )(Keep.right) + .withAttributes(Attributes.name("source-flow")) + .toMat(Sink.ignore)(Keep.left) + .withAttributes(Attributes.name("whole-graph")) + .run() + + attributes.get[Name] should contain(Name("replaced")) + attributes.getFirst[Name] should contain(Name("whole-graph")) + } + + "handle attributes on a composed flow" in { + val attributes = + Source.empty + .viaMat( + Flow.fromGraph(new AttributesFlow(Attributes.name("original-name"))) + .map(identity) + .withAttributes(Attributes.name("replaced")) + .addAttributes(whateverAttribute("whatever")) + .withAttributes(Attributes.name("replaced-again")) + .addAttributes(whateverAttribute("replaced")) + )(Keep.right) + .toMat(Sink.ignore)(Keep.left) + .run() + + // this verifies that the old docs on flow.withAttribues was in fact incorrect + // there is no sealing going on here + attributes.get[Name] should contain(Name("original-name")) + attributes.get[WhateverAttribute] should contain(WhateverAttribute("replaced")) + + attributes.getFirst[Name] should contain(Name("replaced-again")) + attributes.getFirst[WhateverAttribute] should contain(WhateverAttribute("replaced")) + } + + } + + "attributes on a Sink" must { + "make the attributes on fromGraph(sink-stage) Sink behave the same as the stage itself" in { + val attributes = + Source.empty.toMat( + Sink.fromGraph(new AttributesSink(Attributes.name("original-name"))) + .withAttributes(Attributes.name("replaced")) // this actually replaces now + )(Keep.right) + .withAttributes(Attributes.name("whole-graph")) + .run() + + // most specific + attributes.get[Name] should contain(Name("replaced")) + + // least specific + attributes.getFirst[Name] should contain(Name("whole-graph")) + } + + } + + "attributes in the javadsl source" must { + + "not change dispatcher from one defined on a surrounding graph" in { + val dispatcherF = + javadsl.Source.fromGraph( + new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) + // this already introduces an async boundary here + .detach + // this is now just for map since there already is one inbetween stage and map + .async + .addAttributes(ActorAttributes.dispatcher("my-dispatcher")) + .runWith(javadsl.Sink.head(), materializer) + + val dispatcher = dispatcherF.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS) + + dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") + } + + "change dispatcher when defined directly on top of the async boundary" in { + val dispatcherF = + javadsl.Source.fromGraph( + new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) + .async + .withAttributes(ActorAttributes.dispatcher("my-dispatcher")) + .runWith(javadsl.Sink.head(), materializer) + + val dispatcher = dispatcherF.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS) + + dispatcher should startWith("AttributesSpec-my-dispatcher") + } + + "make the attributes on Source.fromGraph source behave the same as the stage itself" in { + val attributes: Attributes = + javadsl.Source.fromGraph(new AttributesSource(Attributes.name("original-name"))) + .withAttributes(Attributes.name("replaced")) // this actually replaces now + .toMat(javadsl.Sink.ignore(), javadsl.Keep.left[Attributes, CompletionStage[Done]]) + .withAttributes(Attributes.name("whole-graph")) + .run(materializer) + + // most specific + attributes.get[Name] should contain(Name("replaced")) + + // least specific + attributes.getFirst[Name] should contain(Name("whole-graph")) + } + + "make the attributes on Flow.fromGraph source behave the same as the stage itself" in { + val attributes: Attributes = + javadsl.Source.empty[Any] + .viaMat( + javadsl.Flow.fromGraph(new AttributesFlow(Attributes.name("original-name"))) + .withAttributes(Attributes.name("replaced")) // this actually replaces now + , javadsl.Keep.right[NotUsed, Attributes]) + .withAttributes(Attributes.name("source-flow")) + .toMat(javadsl.Sink.ignore(), javadsl.Keep.left[Attributes, CompletionStage[Done]]) + .withAttributes(Attributes.name("whole-graph")) + .run(materializer) + + // most specific + attributes.get[Name] should contain(Name("replaced")) + + // least specific + attributes.getFirst[Name] should contain(Name("whole-graph")) + } + + "make the attributes on Sink.fromGraph source behave the same as the stage itself" in { + val attributes: Attributes = + javadsl.Source.empty[Any].toMat( + javadsl.Sink.fromGraph(new AttributesSink(Attributes.name("original-name"))) + .withAttributes(Attributes.name("replaced")) // this actually replaces now + , javadsl.Keep.right[NotUsed, Attributes]) + .withAttributes(Attributes.name("whole-graph")) + .run(materializer) + + // most specific + attributes.get[Name] should contain(Name("replaced")) + + // least specific + attributes.getFirst[Name] should contain(Name("whole-graph")) + } + + } + + "attributes on the materializer" should { + + "be defaults and not used when more specific attributes are found" in { + + // dispatcher set on the materializer + val myDispatcherMaterializer = ActorMaterializer(settings.withDispatcher("my-dispatcher")) + + try { + val dispatcher = + Source.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")) + .runWith(Sink.head)(myDispatcherMaterializer) + .futureValue + + // should not override stage specific dispatcher + dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher") + + } finally { + myDispatcherMaterializer.shutdown() + } + + } + + } + } diff --git a/akka-stream/src/main/mima-filters/2.5.7.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.7.backwards.excludes new file mode 100644 index 0000000000..d9a746cdfd --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.7.backwards.excludes @@ -0,0 +1,2 @@ +# Attributes overhaul +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Graph.async") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index 22f6e386fd..9e709097f4 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -7,6 +7,11 @@ import akka.stream.impl.{ GraphStageTag, IslandTag, TraversalBuilder } import scala.annotation.unchecked.uncheckedVariance +/** + * Not intended to be directly extended by user classes + * + * @see [[akka.stream.stage.GraphStage]] + */ trait Graph[+S <: Shape, +M] { /** * Type-level accessor for the shape parameter of this graph. @@ -32,5 +37,33 @@ trait Graph[+S <: Shape, +M] { */ def async: Graph[S, M] = addAttributes(Attributes.asyncBoundary) + /** + * Put an asynchronous boundary around this `Graph` + * + * @param dispatcher Run the graph on this dispatcher + */ + def async(dispatcher: String) = + addAttributes( + Attributes.asyncBoundary and ActorAttributes.dispatcher(dispatcher) + ) + + /** + * Put an asynchronous boundary around this `Graph` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + def async(dispatcher: String, inputBufferSize: Int) = + addAttributes( + Attributes.asyncBoundary and ActorAttributes.dispatcher(dispatcher) + and Attributes.inputBuffer(inputBufferSize, inputBufferSize) + ) + + /** + * Add the given attributes to this [[Graph]]. If the specific attribute was already present + * on this graph this means the added attribute will be more specific than the existing one. + * If this Source is a composite of multiple graphs, new attributes on the composite will be + * less specific than attributes set directly on the individual graphs of the composite. + */ def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index a63ec46ae4..94f8161371 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -13,9 +13,11 @@ import akka.stream._ @InternalApi private[akka] object Stages { object DefaultAttributes { + // reusable common attributes val IODispatcher = ActorAttributes.IODispatcher val inputBufferOne = inputBuffer(initial = 1, max = 1) + // stage specific default attributes val fused = name("fused") val materializedValueSource = name("matValueSource") val map = name("map") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index e1788357fa..913f1e5b62 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -223,4 +223,27 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O */ override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = new BidiFlow(delegate.named(name)) + + /** + * Put an asynchronous boundary around this `Flow` + */ + override def async: BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.async) + + /** + * Put an asynchronous boundary around this `Flow` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.async(dispatcher)) + + /** + * Put an asynchronous boundary around this `Flow` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] = + new BidiFlow(delegate.async(dispatcher, inputBufferSize)) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f80d5afd95..51e88b7d9b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -7,7 +7,6 @@ import akka.util.ConstantFun import akka.{ Done, NotUsed } import akka.event.LoggingAdapter import akka.japi.{ Pair, function } -import akka.stream.impl.StreamLayout import akka.stream._ import org.reactivestreams.Processor @@ -2199,20 +2198,21 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.initialDelay(delay)) /** - * Change the attributes of this [[Source]] to the given ones and seal the list - * of attributes. This means that further calls will not be able to remove these - * attributes, but instead add new ones. Note that this - * operation has no effect on an empty Flow (because the attributes apply + * Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite + * of multiple graphs, new attributes on the composite will be less specific than attributes + * set directly on the individual graphs of the composite. + * + * Note that this operation has no effect on an empty Flow (because the attributes apply * only to the contained processing stages). */ override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) /** - * Add the given attributes to this Source. Further calls to `withAttributes` - * will not remove these attributes. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Add the given attributes to this [[Flow]]. If the specific attribute was already present + * on this graph this means the added attribute will be more specific than the existing one. + * If this Flow is a composite of multiple graphs, new attributes on the composite will be + * less specific than attributes set directly on the individual graphs of the composite. */ override def addAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.addAttributes(attr)) @@ -2229,6 +2229,23 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends override def async: javadsl.Flow[In, Out, Mat] = new Flow(delegate.async) + /** + * Put an asynchronous boundary around this `Flow` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.async(dispatcher)) + + /** + * Put an asynchronous boundary around this `Flow` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.async(dispatcher, inputBufferSize)) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 81be1a2e31..91978cb9bc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -9,7 +9,7 @@ import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Props } import akka.dispatch.ExecutionContexts import akka.japi.function -import akka.stream.impl.{ LinearTraversalBuilder, SinkQueueAdapter, StreamLayout } +import akka.stream.impl.{ LinearTraversalBuilder, SinkQueueAdapter } import akka.stream.{ javadsl, scaladsl, _ } import org.reactivestreams.{ Publisher, Subscriber } @@ -322,20 +322,18 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink new Sink(delegate.mapMaterializedValue(f.apply _)) /** - * Change the attributes of this [[Sink]] to the given ones and seal the list - * of attributes. This means that further calls will not be able to remove these - * attributes, but instead add new ones. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite + * of multiple graphs, new attributes on the composite will be less specific than attributes + * set directly on the individual graphs of the composite. */ override def withAttributes(attr: Attributes): javadsl.Sink[In, Mat] = new Sink(delegate.withAttributes(attr)) /** - * Add the given attributes to this Sink. Further calls to `withAttributes` - * will not remove these attributes. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Add the given attributes to this [[Sink]]. If the specific attribute was already present + * on this graph this means the added attribute will be more specific than the existing one. + * If this Sink is a composite of multiple graphs, new attributes on the composite will be + * less specific than attributes set directly on the individual graphs of the composite. */ override def addAttributes(attr: Attributes): javadsl.Sink[In, Mat] = new Sink(delegate.addAttributes(attr)) @@ -352,4 +350,21 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink override def async: javadsl.Sink[In, Mat] = new Sink(delegate.async) + /** + * Put an asynchronous boundary around this `Sink` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): javadsl.Sink[In, Mat] = + new Sink(delegate.async(dispatcher)) + + /** + * Put an asynchronous boundary around this `Sink` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): javadsl.Sink[In, Mat] = + new Sink(delegate.async(dispatcher, inputBufferSize)) + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 4208e7e133..f191c02065 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -2260,20 +2260,18 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.initialDelay(delay)) /** - * Change the attributes of this [[Source]] to the given ones and seal the list - * of attributes. This means that further calls will not be able to remove these - * attributes, but instead add new ones. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Replace the attributes of this [[Source]] with the given ones. If this Source is a composite + * of multiple graphs, new attributes on the composite will be less specific than attributes + * set directly on the individual graphs of the composite. */ override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = new Source(delegate.withAttributes(attr)) /** - * Add the given attributes to this Source. Further calls to `withAttributes` - * will not remove these attributes. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Add the given attributes to this [[Source]]. If the specific attribute was already present + * on this graph this means the added attribute will be more specific than the existing one. + * If this Source is a composite of multiple graphs, new attributes on the composite will be + * less specific than attributes set directly on the individual graphs of the composite. */ override def addAttributes(attr: Attributes): javadsl.Source[Out, Mat] = new Source(delegate.addAttributes(attr)) @@ -2290,6 +2288,23 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap override def async: javadsl.Source[Out, Mat] = new Source(delegate.async) + /** + * Put an asynchronous boundary around this `Source` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): javadsl.Source[Out, Mat] = + new Source(delegate.async(dispatcher)) + + /** + * Put an asynchronous boundary around this `Source` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): javadsl.Source[Out, Mat] = + new Source(delegate.async(dispatcher, inputBufferSize)) + /** * Logs elements flowing through the stream as well as completion and erroring. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 97e72f7bea..3d6a7a3cd8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -179,8 +179,29 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat]( override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] = addAttributes(Attributes.name(name)) + /** + * Put an asynchronous boundary around this `BidiFlow` + */ override def async: BidiFlow[I1, O1, I2, O2, Mat] = - addAttributes(Attributes.asyncBoundary) + super.async.asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]] + + /** + * Put an asynchronous boundary around this `BidiFlow` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): BidiFlow[I1, O1, I2, O2, Mat] = + super.async(dispatcher).asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]] + + /** + * Put an asynchronous boundary around this `BidiFlow` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] = + super.async(dispatcher, inputBufferSize).asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]] + } object BidiFlow { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7c549b598d..a145633c9c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -209,10 +209,11 @@ final class Flow[-In, +Out, +Mat]( } /** - * Change the attributes of this [[Flow]] to the given ones and seal the list - * of attributes. This means that further calls will not be able to remove these - * attributes, but instead add new ones. Note that this - * operation has no effect on an empty Flow (because the attributes apply + * Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite + * of multiple graphs, new attributes on the composite will be less specific than attributes + * set directly on the individual graphs of the composite. + * + * Note that this operation has no effect on an empty Flow (because the attributes apply * only to the contained processing stages). */ override def withAttributes(attr: Attributes): Repr[Out] = @@ -221,10 +222,10 @@ final class Flow[-In, +Out, +Mat]( shape) /** - * Add the given attributes to this Flow. Further calls to `withAttributes` - * will not remove these attributes. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Add the given attributes to this [[Flow]]. If the specific attribute was already present + * on this graph this means the added attribute will be more specific than the existing one. + * If this Flow is a composite of multiple graphs, new attributes on the composite will be + * less specific than attributes set directly on the individual graphs of the composite. */ override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(traversalBuilder.attributes and attr) @@ -236,7 +237,24 @@ final class Flow[-In, +Out, +Mat]( /** * Put an asynchronous boundary around this `Flow` */ - override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) + override def async: Repr[Out] = super.async.asInstanceOf[Repr[Out]] + + /** + * Put an asynchronous boundary around this `Flow` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): Repr[Out] = + super.async(dispatcher).asInstanceOf[Repr[Out]] + + /** + * Put an asynchronous boundary around this `Flow` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): Repr[Out] = + super.async(dispatcher, inputBufferSize).asInstanceOf[Repr[Out]] /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains @@ -309,6 +327,16 @@ object Flow { g match { case f: Flow[I, O, M] ⇒ f case f: javadsl.Flow[I, O, M] ⇒ f.asScala + case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] ⇒ + // move these from the stage itself to make the returned source + // behave as it is the stage with regards to attributes + val attrs = g.traversalBuilder.attributes + val noAttrStage = g.withAttributes(Attributes.none) + new Flow( + LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right), + noAttrStage.shape + ).withAttributes(attrs) + case other ⇒ new Flow( LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), g.shape) @@ -504,7 +532,23 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui override def named(name: String): RunnableGraph[Mat] = addAttributes(Attributes.name(name)) - override def async: RunnableGraph[Mat] = addAttributes(Attributes.asyncBoundary) + /** + * Note that an async boundary around a runnable graph does not make sense + */ + override def async: RunnableGraph[Mat] = + super.async.asInstanceOf[RunnableGraph[Mat]] + + /** + * Note that an async boundary around a runnable graph does not make sense + */ + override def async(dispatcher: String): RunnableGraph[Mat] = + super.async(dispatcher).asInstanceOf[RunnableGraph[Mat]] + + /** + * Note that an async boundary around a runnable graph does not make sense + */ + override def async(dispatcher: String, inputBufferSize: Int): RunnableGraph[Mat] = + super.async(dispatcher, inputBufferSize).asInstanceOf[RunnableGraph[Mat]] } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 1fa929a64d..477e0e9d7f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -10,7 +10,7 @@ import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import akka.stream.stage._ import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } @@ -57,11 +57,9 @@ final class Sink[-In, +Mat]( shape) /** - * Change the attributes of this [[Sink]] to the given ones and seal the list - * of attributes. This means that further calls will not be able to remove these - * attributes, but instead add new ones. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite + * of multiple graphs, new attributes on the composite will be less specific than attributes + * set directly on the individual graphs of the composite. */ override def withAttributes(attr: Attributes): Sink[In, Mat] = new Sink( @@ -69,10 +67,10 @@ final class Sink[-In, +Mat]( shape) /** - * Add the given attributes to this Sink. Further calls to `withAttributes` - * will not remove these attributes. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Add the given attributes to this [[Sink]]. If the specific attribute was already present + * on this graph this means the added attribute will be more specific than the existing one. + * If this Sink is a composite of multiple graphs, new attributes on the composite will be + * less specific than attributes set directly on the individual graphs of the composite. */ override def addAttributes(attr: Attributes): Sink[In, Mat] = withAttributes(traversalBuilder.attributes and attr) @@ -83,9 +81,26 @@ final class Sink[-In, +Mat]( override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name)) /** - * Put an asynchronous boundary around this `Sink` + * Put an asynchronous boundary around this `Source` */ - override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary) + override def async: Sink[In, Mat] = super.async.asInstanceOf[Sink[In, Mat]] + + /** + * Put an asynchronous boundary around this `Graph` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): Sink[In, Mat] = + super.async(dispatcher).asInstanceOf[Sink[In, Mat]] + + /** + * Put an asynchronous boundary around this `Graph` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): Sink[In, Mat] = + super.async(dispatcher, inputBufferSize).asInstanceOf[Sink[In, Mat]] /** * Converts this Scala DSL element to it's Java DSL counterpart. @@ -106,6 +121,16 @@ object Sink { g match { case s: Sink[T, M] ⇒ s case s: javadsl.Sink[T, M] ⇒ s.asScala + case g: GraphStageWithMaterializedValue[SinkShape[T], M] ⇒ + // move these from the stage itself to make the returned source + // behave as it is the stage with regards to attributes + val attrs = g.traversalBuilder.attributes + val noAttrStage = g.withAttributes(Attributes.none) + new Sink( + LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right), + noAttrStage.shape + ).withAttributes(attrs) + case other ⇒ new Sink( LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), other.shape) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index c2a3521fb6..b7b9e8d6f8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -21,6 +21,8 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import java.util.concurrent.CompletionStage +import akka.stream.stage.{ GraphStage, GraphStageWithMaterializedValue } + import scala.compat.java8.FutureConverters._ /** @@ -137,20 +139,18 @@ final class Source[+Out, +Mat]( def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Done] = runWith(Sink.foreach(f)) /** - * Change the attributes of this [[Source]] to the given ones and seal the list - * of attributes. This means that further calls will not be able to remove these - * attributes, but instead add new ones. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Replace the attributes of this [[Source]] with the given ones. If this Source is a composite + * of multiple graphs, new attributes on the composite will be less specific than attributes + * set directly on the individual graphs of the composite. */ override def withAttributes(attr: Attributes): Repr[Out] = new Source(traversalBuilder.setAttributes(attr), shape) /** - * Add the given attributes to this Source. Further calls to `withAttributes` - * will not remove these attributes. Note that this - * operation has no effect on an empty Flow (because the attributes apply - * only to the contained processing stages). + * Add the given attributes to this Source. If the specific attribute was already on this source + * it will replace the previous value. If this Source is a composite + * of multiple graphs, the added attributes will be on the composite and therefore less specific than attributes + * set directly on the individual graphs of the composite. */ override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(traversalBuilder.attributes and attr) @@ -162,7 +162,24 @@ final class Source[+Out, +Mat]( /** * Put an asynchronous boundary around this `Source` */ - override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) + override def async: Repr[Out] = super.async.asInstanceOf[Repr[Out]] + + /** + * Put an asynchronous boundary around this `Graph` + * + * @param dispatcher Run the graph on this dispatcher + */ + override def async(dispatcher: String): Repr[Out] = + super.async(dispatcher).asInstanceOf[Repr[Out]] + + /** + * Put an asynchronous boundary around this `Graph` + * + * @param dispatcher Run the graph on this dispatcher + * @param inputBufferSize Set the input buffer to this size for the graph + */ + override def async(dispatcher: String, inputBufferSize: Int): Repr[Out] = + super.async(dispatcher, inputBufferSize).asInstanceOf[Repr[Out]] /** * Converts this Scala DSL element to it's Java DSL counterpart. @@ -239,9 +256,20 @@ object Source { def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match { case s: Source[T, M] ⇒ s case s: javadsl.Source[T, M] ⇒ s.asScala - case other ⇒ new Source( - LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), - other.shape) + case g: GraphStageWithMaterializedValue[SourceShape[T], M] ⇒ + // move these from the stage itself to make the returned source + // behave as it is the stage with regards to attributes + val attrs = g.traversalBuilder.attributes + val noAttrStage = g.withAttributes(Attributes.none) + new Source( + LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right), + noAttrStage.shape + ).withAttributes(attrs) + case other ⇒ + // composite source shaped graph + new Source( + LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), + other.shape) } /**