Attributes on fromGraph(stage) should be treated as on stage #22911, #22523

This commit is contained in:
Johan Andrén 2017-11-23 10:26:00 +01:00 committed by GitHub
parent cb6a660cf4
commit 1751292580
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 741 additions and 101 deletions

View file

@ -3,39 +3,110 @@
*/
package akka.stream.scaladsl
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.Attributes
import java.util.Optional
import java.util.concurrent.{ CompletableFuture, CompletionStage, TimeUnit }
import akka.{ Done, NotUsed }
import akka.stream.Attributes._
import akka.stream.MaterializationContext
import akka.stream.SinkShape
import akka.stream._
import akka.stream.javadsl
import akka.stream.stage._
import akka.stream.testkit._
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.stream.impl.SinkModule
import akka.stream.impl.SinkholeSubscriber
import com.typesafe.config.ConfigFactory
object AttributesSpec {
object AttributesSink {
def apply(): Sink[Nothing, Future[Attributes]] =
Sink.fromGraph[Nothing, Future[Attributes]](new AttributesSink(Attributes.name("attributesSink"), Sink.shape("attributesSink")))
class AttributesSource(_initialAttributes: Attributes = Attributes.none) extends GraphStageWithMaterializedValue[SourceShape[Any], Attributes] {
val out = Outlet[Any]("out")
override protected def initialAttributes: Attributes = _initialAttributes
override val shape = SourceShape.of(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Attributes) = {
val logic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
def onPull(): Unit = {
}
})
}
(logic, inheritedAttributes)
}
}
final class AttributesSink(val attributes: Attributes, shape: SinkShape[Nothing]) extends SinkModule[Nothing, Future[Attributes]](shape) {
override def create(context: MaterializationContext) =
(new SinkholeSubscriber(Promise()), Future.successful(context.effectiveAttributes))
class AttributesFlow(_initialAttributes: Attributes = Attributes.none) extends GraphStageWithMaterializedValue[FlowShape[Any, Any], Attributes] {
override protected def newInstance(shape: SinkShape[Nothing]): SinkModule[Nothing, Future[Attributes]] =
new AttributesSink(attributes, shape)
val in = Inlet[Any]("in")
val out = Outlet[Any]("out")
override def withAttributes(attr: Attributes): SinkModule[Nothing, Future[Attributes]] =
new AttributesSink(attr, amendShape(attr))
override protected def initialAttributes: Attributes = _initialAttributes
override val shape = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Attributes) = {
val logic = new GraphStageLogic(shape) {
setHandlers(in, out, new InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
})
}
(logic, inheritedAttributes)
}
}
class AttributesSink(_initialAttributes: Attributes = Attributes.none) extends GraphStageWithMaterializedValue[SinkShape[Any], Attributes] {
val in = Inlet[Any]("in")
override protected def initialAttributes: Attributes = _initialAttributes
override val shape = SinkShape(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Attributes) = {
val logic = new GraphStageLogic(shape) {
override def preStart(): Unit = {
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
grab(in)
pull(in)
}
})
}
(logic, inheritedAttributes)
}
}
class ThreadNameSnitchingStage(initialDispatcher: String) extends GraphStage[SourceShape[String]] {
val out = Outlet[String]("out")
override val shape = SourceShape.of(out)
override protected def initialAttributes: Attributes = ActorAttributes.dispatcher(initialDispatcher)
def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
def onPull(): Unit = {
push(out, Thread.currentThread.getName)
completeStage()
}
})
}
}
def whateverAttribute(label: String): Attributes = Attributes(WhateverAttribute(label))
case class WhateverAttribute(label: String) extends Attribute
}
class AttributesSpec extends StreamSpec {
class AttributesSpec extends StreamSpec(ConfigFactory.parseString(
"""
my-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
throughput = 1
}
""").withFallback(Utils.UnboundedMailboxConfig)) {
import AttributesSpec._
val settings = ActorMaterializerSettings(system)
@ -43,32 +114,376 @@ class AttributesSpec extends StreamSpec {
implicit val materializer = ActorMaterializer(settings)
"attributes" must {
"be overridable on a module basis" in {
val runnable = Source.empty.toMat(AttributesSink().withAttributes(Attributes.name("new-name")))(Keep.right)
whenReady(runnable.run()) { attributes
attributes.get[Name] should contain(Name("new-name"))
}
}
"keep the outermost attribute as the least specific" in {
val runnable = Source.empty.toMat(AttributesSink())(Keep.right).withAttributes(Attributes.name("new-name"))
whenReady(runnable.run()) { attributes
attributes.get[Name] should contain(Name("attributesSink"))
}
}
"an attributes instance" must {
val attributes = Attributes.name("a") and Attributes.name("b") and Attributes.inputBuffer(1, 2)
"give access to first attribute" in {
"give access to the least specific attribute" in {
attributes.getFirst[Name] should ===(Some(Attributes.Name("a")))
}
"give access to attribute byt type" in {
"give access to the most specific attribute value" in {
attributes.get[Name] should ===(Some(Attributes.Name("b")))
}
}
"attributes on a graph stage" must {
"be appended with addAttributes" in {
val attributes =
Source.fromGraph(new AttributesSource()
.addAttributes(Attributes.name("new-name"))
.addAttributes(Attributes.name("re-added")) // adding twice at same level replaces
.addAttributes(whateverAttribute("other-thing"))
)
.toMat(Sink.head)(Keep.left)
.run()
attributes.get[Name] should contain(Name("re-added"))
attributes.get[WhateverAttribute] should contain(WhateverAttribute("other-thing"))
}
"be replaced withAttributes directly on a stage" in {
val attributes =
Source.fromGraph(new AttributesSource()
.withAttributes(Attributes.name("new-name") and whateverAttribute("other-thing"))
.withAttributes(Attributes.name("re-added")) // we loose all previous attributes for same level
)
.toMat(Sink.head)(Keep.left)
.run()
attributes.get[Name] should contain(Name("re-added"))
attributes.get[WhateverAttribute] shouldBe empty
}
"be overridable on a module basis" in {
val attributes =
Source.fromGraph(new AttributesSource().withAttributes(Attributes.name("new-name")))
.toMat(Sink.head)(Keep.left)
.run()
attributes.get[Name] should contain(Name("new-name"))
}
"keep the outermost attribute as the least specific" in {
val attributes = Source.fromGraph(new AttributesSource(Attributes.name("original-name")))
.map(identity)
.addAttributes(Attributes.name("whole-graph"))
.toMat(Sink.head)(Keep.left)
.run()
// most specific
attributes.get[Name] should contain(Name("original-name"))
// least specific
attributes.getFirst[Name] should contain(Name("whole-graph"))
}
}
"attributes on a source" must {
"make the attributes on fromGraph(single-source-stage) Source behave the same as the stage itself" in {
val attributes =
Source.fromGraph(
new AttributesSource(Attributes.name("original-name") and whateverAttribute("whatever"))
.withAttributes(Attributes.name("new-name")))
.toMat(Sink.head)(Keep.left)
.run()
// most specific
attributes.get[Name] should contain(Name("new-name"))
// least specific
attributes.getFirst[Name] should contain(Name("new-name"))
}
"make the attributes on Source.fromGraph source behave the same as the stage itself" in {
val attributes =
Source.fromGraph(new AttributesSource(Attributes.name("original-name")))
.withAttributes(Attributes.name("replaced")) // this actually replaces now
.toMat(Sink.head)(Keep.left).withAttributes(Attributes.name("whole-graph"))
.run()
// most specific
attributes.get[Name] should contain(Name("replaced"))
attributes.get[WhateverAttribute] shouldBe empty
// least specific
attributes.getFirst[Name] should contain(Name("whole-graph"))
attributes.getFirst[WhateverAttribute] shouldBe empty
}
"not replace stage specific attributes with attributes on surrounding composite source" in {
val attributes = Source.fromGraph(new AttributesSource(Attributes.name("original-name")))
.map(identity)
.addAttributes(Attributes.name("composite-graph"))
.toMat(Sink.head)(Keep.left)
.run()
// most specific still the original as the attribute was added on the composite source
attributes.get[Name] should contain(Name("original-name"))
// least specific
attributes.getFirst[Name] should contain(Name("composite-graph"))
}
"make the attributes on Sink.fromGraph source behave the same as the stage itself" in {
val attributes =
Source.maybe.toMat(
Sink.fromGraph(new AttributesSink(Attributes.name("original-name")))
.withAttributes(Attributes.name("replaced")) // this actually replaces now
)(Keep.right)
.withAttributes(Attributes.name("whole-graph"))
.run()
// most specific
attributes.get[Name] should contain(Name("replaced"))
// least specific
attributes.getFirst[Name] should contain(Name("whole-graph"))
}
"use the initial attributes for dispatcher" in {
val dispatcher =
Source.fromGraph(new ThreadNameSnitchingStage("my-dispatcher"))
.runWith(Sink.head)
.futureValue
dispatcher should startWith("AttributesSpec-my-dispatcher")
}
"use an explicit attribute on the stage to select dispatcher" in {
val dispatcher =
Source.fromGraph(
// directly on stage
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher")
.addAttributes(ActorAttributes.dispatcher("my-dispatcher")))
.runWith(Sink.head)
.futureValue
dispatcher should startWith("AttributesSpec-my-dispatcher")
}
"use the most specific dispatcher when another one is defined on a surrounding composed graph" in {
val dispatcher =
Source.fromGraph(
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher"))
.map(identity)
// this is now for the composed source -> flow graph
.addAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(Sink.head)
.futureValue
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher")
}
"not change dispatcher from one defined on a surrounding graph" in {
val dispatcher =
Source.fromGraph(
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher"))
// this already introduces an async boundary here
.map(identity)
// this is now just for map since there already is one inbetween stage and map
.async // potential sugar .async("my-dispatcher")
.addAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(Sink.head)
.futureValue
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher")
}
"change dispatcher when defined directly on top of the async boundary" in {
val dispatcher =
Source.fromGraph(
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher"))
.async
.withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(Sink.head)
.futureValue
dispatcher should startWith("AttributesSpec-my-dispatcher")
}
"change dispatcher when defined on the async call" in {
val dispatcher =
Source.fromGraph(
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher"))
.async("my-dispatcher")
.runWith(Sink.head)
.futureValue
dispatcher should startWith("AttributesSpec-my-dispatcher")
}
}
"attributes on a Flow" must {
"make the attributes on fromGraph(flow-stage) Flow behave the same as the stage itself" in {
val attributes =
Source.empty
.viaMat(
Flow.fromGraph(new AttributesFlow(Attributes.name("original-name")))
.withAttributes(Attributes.name("replaced")) // this actually replaces now
)(Keep.right)
.withAttributes(Attributes.name("source-flow"))
.toMat(Sink.ignore)(Keep.left)
.withAttributes(Attributes.name("whole-graph"))
.run()
attributes.get[Name] should contain(Name("replaced"))
attributes.getFirst[Name] should contain(Name("whole-graph"))
}
"handle attributes on a composed flow" in {
val attributes =
Source.empty
.viaMat(
Flow.fromGraph(new AttributesFlow(Attributes.name("original-name")))
.map(identity)
.withAttributes(Attributes.name("replaced"))
.addAttributes(whateverAttribute("whatever"))
.withAttributes(Attributes.name("replaced-again"))
.addAttributes(whateverAttribute("replaced"))
)(Keep.right)
.toMat(Sink.ignore)(Keep.left)
.run()
// this verifies that the old docs on flow.withAttribues was in fact incorrect
// there is no sealing going on here
attributes.get[Name] should contain(Name("original-name"))
attributes.get[WhateverAttribute] should contain(WhateverAttribute("replaced"))
attributes.getFirst[Name] should contain(Name("replaced-again"))
attributes.getFirst[WhateverAttribute] should contain(WhateverAttribute("replaced"))
}
}
"attributes on a Sink" must {
"make the attributes on fromGraph(sink-stage) Sink behave the same as the stage itself" in {
val attributes =
Source.empty.toMat(
Sink.fromGraph(new AttributesSink(Attributes.name("original-name")))
.withAttributes(Attributes.name("replaced")) // this actually replaces now
)(Keep.right)
.withAttributes(Attributes.name("whole-graph"))
.run()
// most specific
attributes.get[Name] should contain(Name("replaced"))
// least specific
attributes.getFirst[Name] should contain(Name("whole-graph"))
}
}
"attributes in the javadsl source" must {
"not change dispatcher from one defined on a surrounding graph" in {
val dispatcherF =
javadsl.Source.fromGraph(
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher"))
// this already introduces an async boundary here
.detach
// this is now just for map since there already is one inbetween stage and map
.async
.addAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(javadsl.Sink.head(), materializer)
val dispatcher = dispatcherF.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS)
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher")
}
"change dispatcher when defined directly on top of the async boundary" in {
val dispatcherF =
javadsl.Source.fromGraph(
new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher"))
.async
.withAttributes(ActorAttributes.dispatcher("my-dispatcher"))
.runWith(javadsl.Sink.head(), materializer)
val dispatcher = dispatcherF.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS)
dispatcher should startWith("AttributesSpec-my-dispatcher")
}
"make the attributes on Source.fromGraph source behave the same as the stage itself" in {
val attributes: Attributes =
javadsl.Source.fromGraph(new AttributesSource(Attributes.name("original-name")))
.withAttributes(Attributes.name("replaced")) // this actually replaces now
.toMat(javadsl.Sink.ignore(), javadsl.Keep.left[Attributes, CompletionStage[Done]])
.withAttributes(Attributes.name("whole-graph"))
.run(materializer)
// most specific
attributes.get[Name] should contain(Name("replaced"))
// least specific
attributes.getFirst[Name] should contain(Name("whole-graph"))
}
"make the attributes on Flow.fromGraph source behave the same as the stage itself" in {
val attributes: Attributes =
javadsl.Source.empty[Any]
.viaMat(
javadsl.Flow.fromGraph(new AttributesFlow(Attributes.name("original-name")))
.withAttributes(Attributes.name("replaced")) // this actually replaces now
, javadsl.Keep.right[NotUsed, Attributes])
.withAttributes(Attributes.name("source-flow"))
.toMat(javadsl.Sink.ignore(), javadsl.Keep.left[Attributes, CompletionStage[Done]])
.withAttributes(Attributes.name("whole-graph"))
.run(materializer)
// most specific
attributes.get[Name] should contain(Name("replaced"))
// least specific
attributes.getFirst[Name] should contain(Name("whole-graph"))
}
"make the attributes on Sink.fromGraph source behave the same as the stage itself" in {
val attributes: Attributes =
javadsl.Source.empty[Any].toMat(
javadsl.Sink.fromGraph(new AttributesSink(Attributes.name("original-name")))
.withAttributes(Attributes.name("replaced")) // this actually replaces now
, javadsl.Keep.right[NotUsed, Attributes])
.withAttributes(Attributes.name("whole-graph"))
.run(materializer)
// most specific
attributes.get[Name] should contain(Name("replaced"))
// least specific
attributes.getFirst[Name] should contain(Name("whole-graph"))
}
}
"attributes on the materializer" should {
"be defaults and not used when more specific attributes are found" in {
// dispatcher set on the materializer
val myDispatcherMaterializer = ActorMaterializer(settings.withDispatcher("my-dispatcher"))
try {
val dispatcher =
Source.fromGraph(new ThreadNameSnitchingStage("akka.stream.default-blocking-io-dispatcher"))
.runWith(Sink.head)(myDispatcherMaterializer)
.futureValue
// should not override stage specific dispatcher
dispatcher should startWith("AttributesSpec-akka.stream.default-blocking-io-dispatcher")
} finally {
myDispatcherMaterializer.shutdown()
}
}
}
}

View file

@ -0,0 +1,2 @@
# Attributes overhaul
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.Graph.async")

View file

@ -7,6 +7,11 @@ import akka.stream.impl.{ GraphStageTag, IslandTag, TraversalBuilder }
import scala.annotation.unchecked.uncheckedVariance
/**
* Not intended to be directly extended by user classes
*
* @see [[akka.stream.stage.GraphStage]]
*/
trait Graph[+S <: Shape, +M] {
/**
* Type-level accessor for the shape parameter of this graph.
@ -32,5 +37,33 @@ trait Graph[+S <: Shape, +M] {
*/
def async: Graph[S, M] = addAttributes(Attributes.asyncBoundary)
/**
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
*/
def async(dispatcher: String) =
addAttributes(
Attributes.asyncBoundary and ActorAttributes.dispatcher(dispatcher)
)
/**
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
def async(dispatcher: String, inputBufferSize: Int) =
addAttributes(
Attributes.asyncBoundary and ActorAttributes.dispatcher(dispatcher)
and Attributes.inputBuffer(inputBufferSize, inputBufferSize)
)
/**
* Add the given attributes to this [[Graph]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Source is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr)
}

View file

@ -13,9 +13,11 @@ import akka.stream._
@InternalApi private[akka] object Stages {
object DefaultAttributes {
// reusable common attributes
val IODispatcher = ActorAttributes.IODispatcher
val inputBufferOne = inputBuffer(initial = 1, max = 1)
// stage specific default attributes
val fused = name("fused")
val materializedValueSource = name("matValueSource")
val map = name("map")

View file

@ -223,4 +223,27 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O
*/
override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.named(name))
/**
* Put an asynchronous boundary around this `Flow`
*/
override def async: BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.async)
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.async(dispatcher))
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.async(dispatcher, inputBufferSize))
}

View file

@ -7,7 +7,6 @@ import akka.util.ConstantFun
import akka.{ Done, NotUsed }
import akka.event.LoggingAdapter
import akka.japi.{ Pair, function }
import akka.stream.impl.StreamLayout
import akka.stream._
import org.reactivestreams.Processor
@ -2199,20 +2198,21 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
new Flow(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*
* Note that this operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Add the given attributes to this [[Flow]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Flow is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.addAttributes(attr))
@ -2229,6 +2229,23 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
override def async: javadsl.Flow[In, Out, Mat] =
new Flow(delegate.async)
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.async(dispatcher))
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.async(dispatcher, inputBufferSize))
/**
* Logs elements flowing through the stream as well as completion and erroring.
*

View file

@ -9,7 +9,7 @@ import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.japi.function
import akka.stream.impl.{ LinearTraversalBuilder, SinkQueueAdapter, StreamLayout }
import akka.stream.impl.{ LinearTraversalBuilder, SinkQueueAdapter }
import akka.stream.{ javadsl, scaladsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
@ -322,20 +322,18 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
new Sink(delegate.mapMaterializedValue(f.apply _))
/**
* Change the attributes of this [[Sink]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*/
override def withAttributes(attr: Attributes): javadsl.Sink[In, Mat] =
new Sink(delegate.withAttributes(attr))
/**
* Add the given attributes to this Sink. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Add the given attributes to this [[Sink]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Sink is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): javadsl.Sink[In, Mat] =
new Sink(delegate.addAttributes(attr))
@ -352,4 +350,21 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
override def async: javadsl.Sink[In, Mat] =
new Sink(delegate.async)
/**
* Put an asynchronous boundary around this `Sink`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): javadsl.Sink[In, Mat] =
new Sink(delegate.async(dispatcher))
/**
* Put an asynchronous boundary around this `Sink`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): javadsl.Sink[In, Mat] =
new Sink(delegate.async(dispatcher, inputBufferSize))
}

View file

@ -2260,20 +2260,18 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
new Source(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Replace the attributes of this [[Source]] with the given ones. If this Source is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*/
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Add the given attributes to this [[Source]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Source is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.addAttributes(attr))
@ -2290,6 +2288,23 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
override def async: javadsl.Source[Out, Mat] =
new Source(delegate.async)
/**
* Put an asynchronous boundary around this `Source`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): javadsl.Source[Out, Mat] =
new Source(delegate.async(dispatcher))
/**
* Put an asynchronous boundary around this `Source`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): javadsl.Source[Out, Mat] =
new Source(delegate.async(dispatcher, inputBufferSize))
/**
* Logs elements flowing through the stream as well as completion and erroring.
*

View file

@ -179,8 +179,29 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](
override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] =
addAttributes(Attributes.name(name))
/**
* Put an asynchronous boundary around this `BidiFlow`
*/
override def async: BidiFlow[I1, O1, I2, O2, Mat] =
addAttributes(Attributes.asyncBoundary)
super.async.asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]]
/**
* Put an asynchronous boundary around this `BidiFlow`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): BidiFlow[I1, O1, I2, O2, Mat] =
super.async(dispatcher).asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]]
/**
* Put an asynchronous boundary around this `BidiFlow`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): BidiFlow[I1, O1, I2, O2, Mat] =
super.async(dispatcher, inputBufferSize).asInstanceOf[BidiFlow[I1, O1, I2, O2, Mat]]
}
object BidiFlow {

View file

@ -209,10 +209,11 @@ final class Flow[-In, +Out, +Mat](
}
/**
* Change the attributes of this [[Flow]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*
* Note that this operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out] =
@ -221,10 +222,10 @@ final class Flow[-In, +Out, +Mat](
shape)
/**
* Add the given attributes to this Flow. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Add the given attributes to this [[Flow]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Flow is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(traversalBuilder.attributes and attr)
@ -236,7 +237,24 @@ final class Flow[-In, +Out, +Mat](
/**
* Put an asynchronous boundary around this `Flow`
*/
override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary)
override def async: Repr[Out] = super.async.asInstanceOf[Repr[Out]]
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): Repr[Out] =
super.async(dispatcher).asInstanceOf[Repr[Out]]
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): Repr[Out] =
super.async(dispatcher, inputBufferSize).asInstanceOf[Repr[Out]]
/**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains
@ -309,6 +327,16 @@ object Flow {
g match {
case f: Flow[I, O, M] f
case f: javadsl.Flow[I, O, M] f.asScala
case g: GraphStageWithMaterializedValue[FlowShape[I, O], M]
// move these from the stage itself to make the returned source
// behave as it is the stage with regards to attributes
val attrs = g.traversalBuilder.attributes
val noAttrStage = g.withAttributes(Attributes.none)
new Flow(
LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
noAttrStage.shape
).withAttributes(attrs)
case other new Flow(
LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right),
g.shape)
@ -504,7 +532,23 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui
override def named(name: String): RunnableGraph[Mat] =
addAttributes(Attributes.name(name))
override def async: RunnableGraph[Mat] = addAttributes(Attributes.asyncBoundary)
/**
* Note that an async boundary around a runnable graph does not make sense
*/
override def async: RunnableGraph[Mat] =
super.async.asInstanceOf[RunnableGraph[Mat]]
/**
* Note that an async boundary around a runnable graph does not make sense
*/
override def async(dispatcher: String): RunnableGraph[Mat] =
super.async(dispatcher).asInstanceOf[RunnableGraph[Mat]]
/**
* Note that an async boundary around a runnable graph does not make sense
*/
override def async(dispatcher: String, inputBufferSize: Int): RunnableGraph[Mat] =
super.async(dispatcher, inputBufferSize).asInstanceOf[RunnableGraph[Mat]]
}
/**

View file

@ -10,7 +10,7 @@ import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream.stage._
import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
@ -57,11 +57,9 @@ final class Sink[-In, +Mat](
shape)
/**
* Change the attributes of this [[Sink]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*/
override def withAttributes(attr: Attributes): Sink[In, Mat] =
new Sink(
@ -69,10 +67,10 @@ final class Sink[-In, +Mat](
shape)
/**
* Add the given attributes to this Sink. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Add the given attributes to this [[Sink]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Sink is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): Sink[In, Mat] =
withAttributes(traversalBuilder.attributes and attr)
@ -83,9 +81,26 @@ final class Sink[-In, +Mat](
override def named(name: String): Sink[In, Mat] = addAttributes(Attributes.name(name))
/**
* Put an asynchronous boundary around this `Sink`
* Put an asynchronous boundary around this `Source`
*/
override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary)
override def async: Sink[In, Mat] = super.async.asInstanceOf[Sink[In, Mat]]
/**
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): Sink[In, Mat] =
super.async(dispatcher).asInstanceOf[Sink[In, Mat]]
/**
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): Sink[In, Mat] =
super.async(dispatcher, inputBufferSize).asInstanceOf[Sink[In, Mat]]
/**
* Converts this Scala DSL element to it's Java DSL counterpart.
@ -106,6 +121,16 @@ object Sink {
g match {
case s: Sink[T, M] s
case s: javadsl.Sink[T, M] s.asScala
case g: GraphStageWithMaterializedValue[SinkShape[T], M]
// move these from the stage itself to make the returned source
// behave as it is the stage with regards to attributes
val attrs = g.traversalBuilder.attributes
val noAttrStage = g.withAttributes(Attributes.none)
new Sink(
LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
noAttrStage.shape
).withAttributes(attrs)
case other new Sink(
LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
other.shape)

View file

@ -21,6 +21,8 @@ import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import java.util.concurrent.CompletionStage
import akka.stream.stage.{ GraphStage, GraphStageWithMaterializedValue }
import scala.compat.java8.FutureConverters._
/**
@ -137,20 +139,18 @@ final class Source[+Out, +Mat](
def runForeach(f: Out Unit)(implicit materializer: Materializer): Future[Done] = runWith(Sink.foreach(f))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Replace the attributes of this [[Source]] with the given ones. If this Source is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*/
override def withAttributes(attr: Attributes): Repr[Out] =
new Source(traversalBuilder.setAttributes(attr), shape)
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
* Add the given attributes to this Source. If the specific attribute was already on this source
* it will replace the previous value. If this Source is a composite
* of multiple graphs, the added attributes will be on the composite and therefore less specific than attributes
* set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(traversalBuilder.attributes and attr)
@ -162,7 +162,24 @@ final class Source[+Out, +Mat](
/**
* Put an asynchronous boundary around this `Source`
*/
override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary)
override def async: Repr[Out] = super.async.asInstanceOf[Repr[Out]]
/**
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): Repr[Out] =
super.async(dispatcher).asInstanceOf[Repr[Out]]
/**
* Put an asynchronous boundary around this `Graph`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): Repr[Out] =
super.async(dispatcher, inputBufferSize).asInstanceOf[Repr[Out]]
/**
* Converts this Scala DSL element to it's Java DSL counterpart.
@ -239,9 +256,20 @@ object Source {
def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
case s: Source[T, M] s
case s: javadsl.Source[T, M] s.asScala
case other new Source(
LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
other.shape)
case g: GraphStageWithMaterializedValue[SourceShape[T], M]
// move these from the stage itself to make the returned source
// behave as it is the stage with regards to attributes
val attrs = g.traversalBuilder.attributes
val noAttrStage = g.withAttributes(Attributes.none)
new Source(
LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
noAttrStage.shape
).withAttributes(attrs)
case other
// composite source shaped graph
new Source(
LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
other.shape)
}
/**