fix SinkSpec, and async boundary attribute, #22463

* let's keep the AsyncBoundary attribute for now,
  makeIsland is done in setAttributes
* fixing SinkSpec and RunnableGraphSpec
This commit is contained in:
Patrik Nordwall 2017-03-08 14:22:19 +01:00 committed by Konrad `ktoso` Malawski
parent 2990edb66c
commit 6c8e24cefa
7 changed files with 32 additions and 72 deletions

View file

@ -128,7 +128,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name")
s.traversalBuilder.attributes.getFirst[Name] shouldEqual Some(Name("name"))
s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("headSink"), Name("name"))
s.traversalBuilder.attributes.getFirst[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary))
}
@ -136,7 +136,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name")
s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("name")
s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("headSink"), Name("name"))
}
"given one attribute of a class should correctly get it as last attribute with default value" in {
@ -148,21 +148,21 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"given no attributes of a class when getting first attribute with default value should get default value" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none)
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async
s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("default")
}
"given no attributes of a class when getting last attribute with default value should get default value" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none)
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async
s.traversalBuilder.attributes.get[Name](Name("default")) shouldEqual Name("default")
}
"given multiple attributes of a class when getting first attribute with default value should get first attribute" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name").named("another_name")
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async.named("name").named("another_name")
s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("name")
}

View file

@ -31,7 +31,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) {
* INTERNAL API
*/
private[stream] def isAsync: Boolean = {
attributeList.exists {
attributeList.nonEmpty && attributeList.exists {
case AsyncBoundary true
case ActorAttributes.Dispatcher(_) true
case _ false

View file

@ -30,7 +30,6 @@ trait Graph[+S <: Shape, +M] {
/**
* Put an asynchronous boundary around this `Graph`
*/
// TODO: no longer encoded as attributes!!!!
def async: Graph[S, M] = addAttributes(Attributes.asyncBoundary)
def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr)

View file

@ -267,11 +267,8 @@ sealed trait TraversalBuilder {
protected def internalSetAttributes(attributes: Attributes): TraversalBuilder
def setAttributes(attributes: Attributes): TraversalBuilder = {
if (attributes ne Attributes.none) {
if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes)
else internalSetAttributes(attributes)
} else
this
if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes)
else internalSetAttributes(attributes)
}
def attributes: Attributes
@ -595,11 +592,8 @@ final case class LinearTraversalBuilder(
copy(attributes = attributes)
override def setAttributes(attributes: Attributes): LinearTraversalBuilder = {
if (attributes ne Attributes.none) {
if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes)
else internalSetAttributes(attributes)
} else
this
if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes)
else internalSetAttributes(attributes)
}
private def applyIslandAndAttributes(t: Traversal): Traversal = {
@ -640,10 +634,7 @@ final case class LinearTraversalBuilder(
beforeBuilder.concat(
composite
.assign(out, inOffset - composite.offsetOfModule(out))
.traversal
).concat(traversalSoFar)
),
.traversal).concat(traversalSoFar)),
pendingBuilder = None, beforeBuilder = EmptyTraversal)
case None
copy(inPort = None, outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset))
@ -683,12 +674,9 @@ final case class LinearTraversalBuilder(
composite
.assign(out, relativeSlot)
.traversal
.concat(traversalSoFar)
)
),
.concat(traversalSoFar))),
pendingBuilder = None,
beforeBuilder = EmptyTraversal
)
beforeBuilder = EmptyTraversal)
case None
copy(outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot))
}

View file

@ -25,8 +25,7 @@ import akka.annotation.DoNotInherit
*/
final class Flow[-In, +Out, +Mat](
override val traversalBuilder: LinearTraversalBuilder,
override val shape: FlowShape[In, Out]
)
override val shape: FlowShape[In, Out])
extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] {
// TODO: debug string
@ -46,13 +45,11 @@ final class Flow[-In, +Out, +Mat](
if (this.isIdentity) {
new Flow(
LinearTraversalBuilder.fromBuilder(flow.traversalBuilder, flow.shape, combine),
flow.shape
).asInstanceOf[Flow[In, T, Mat3]]
flow.shape).asInstanceOf[Flow[In, T, Mat3]]
} else {
new Flow(
traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
FlowShape[In, T](shape.in, flow.shape.out)
)
FlowShape[In, T](shape.in, flow.shape.out))
}
}
@ -98,13 +95,11 @@ final class Flow[-In, +Out, +Mat](
if (isIdentity) {
new Sink(
LinearTraversalBuilder.fromBuilder(sink.traversalBuilder, sink.shape, combine),
SinkShape(sink.shape.in)
).asInstanceOf[Sink[In, Mat3]]
SinkShape(sink.shape.in)).asInstanceOf[Sink[In, Mat3]]
} else {
new Sink(
traversalBuilder.append(sink.traversalBuilder, sink.shape, combine),
SinkShape(shape.in)
)
SinkShape(shape.in))
}
}
@ -114,8 +109,7 @@ final class Flow[-In, +Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat Mat2): ReprMat[Out, Mat2] =
new Flow(
traversalBuilder.transformMat(f),
shape
)
shape)
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
@ -208,8 +202,7 @@ final class Flow[-In, +Out, +Mat](
new Flow(
LinearTraversalBuilder.fromBuilder(resultBuilder, newShape, Keep.right),
newShape
)
newShape)
}
/**
@ -222,8 +215,7 @@ final class Flow[-In, +Out, +Mat](
override def withAttributes(attr: Attributes): Repr[Out] =
new Flow(
traversalBuilder.setAttributes(attr),
shape
)
shape)
/**
* Add the given attributes to this Flow. Further calls to `withAttributes`
@ -241,12 +233,7 @@ final class Flow[-In, +Out, +Mat](
/**
* Put an asynchronous boundary around this `Flow`
*/
override def async: Repr[Out] = {
new Flow(
traversalBuilder.makeIsland(GraphStageTag),
shape
)
}
override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary)
/**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains
@ -285,8 +272,7 @@ object Flow {
private[this] val identity: Flow[Any, Any, NotUsed] = new Flow[Any, Any, NotUsed](
identityTraversalBuilder,
GraphStages.Identity.shape
)
GraphStages.Identity.shape)
/**
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]]
@ -322,8 +308,7 @@ object Flow {
case f: javadsl.Flow[I, O, M] f.asScala
case other new Flow(
LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right),
g.shape
)
g.shape)
}
/**
@ -381,8 +366,7 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui
override def named(name: String): RunnableGraph[Mat] =
addAttributes(Attributes.name(name))
override def async: RunnableGraph[Mat] =
new RunnableGraph(traversalBuilder.makeIsland(GraphStageTag))
override def async: RunnableGraph[Mat] = addAttributes(Attributes.asyncBoundary)
}
/**

View file

@ -25,8 +25,7 @@ import scala.util.{ Failure, Success, Try }
*/
final class Sink[-In, +Mat](
override val traversalBuilder: LinearTraversalBuilder,
override val shape: SinkShape[In]
)
override val shape: SinkShape[In])
extends Graph[SinkShape[In], Mat] {
// TODO: Debug string
@ -55,8 +54,7 @@ final class Sink[-In, +Mat](
def mapMaterializedValue[Mat2](f: Mat Mat2): Sink[In, Mat2] =
new Sink(
traversalBuilder.transformMat(f.asInstanceOf[Any Any]),
shape
)
shape)
/**
* Change the attributes of this [[Sink]] to the given ones and seal the list
@ -68,8 +66,7 @@ final class Sink[-In, +Mat](
override def withAttributes(attr: Attributes): Sink[In, Mat] =
new Sink(
traversalBuilder.setAttributes(attr),
shape
)
shape)
/**
* Add the given attributes to this Sink. Further calls to `withAttributes`
@ -88,11 +85,7 @@ final class Sink[-In, +Mat](
/**
* Put an asynchronous boundary around this `Sink`
*/
override def async: Sink[In, Mat] =
new Sink(
traversalBuilder.makeIsland(GraphStageTag),
shape
)
override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary)
/**
* Converts this Scala DSL element to it's Java DSL counterpart.
@ -115,8 +108,7 @@ object Sink {
case s: javadsl.Sink[T, M] s.asScala
case other new Sink(
LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
other.shape
)
other.shape)
}
/**

View file

@ -153,9 +153,7 @@ final class Source[+Out, +Mat](
/**
* Put an asynchronous boundary around this `Source`
*/
override def async: Repr[Out] = new Source(
traversalBuilder.makeIsland(GraphStageTag),
shape)
override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary)
/**
* Converts this Scala DSL element to it's Java DSL counterpart.
@ -233,8 +231,7 @@ object Source {
case s: javadsl.Source[T, M] s.asScala
case other new Source(
LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
other.shape
)
other.shape)
}
/**