diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 3a66eb6207..c59b0aa996 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -128,7 +128,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import Attributes._ val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name") - s.traversalBuilder.attributes.getFirst[Name] shouldEqual Some(Name("name")) + s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("headSink"), Name("name")) s.traversalBuilder.attributes.getFirst[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary)) } @@ -136,7 +136,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { import Attributes._ val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name") - s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("name") + s.traversalBuilder.attributes.filtered[Name] shouldEqual List(Name("headSink"), Name("name")) } "given one attribute of a class should correctly get it as last attribute with default value" in { @@ -148,21 +148,21 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "given no attributes of a class when getting first attribute with default value should get default value" in { import Attributes._ - val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none) + val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("default") } "given no attributes of a class when getting last attribute with default value should get default value" in { import Attributes._ - val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none) + val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async s.traversalBuilder.attributes.get[Name](Name("default")) shouldEqual Name("default") } "given multiple attributes of a class when getting first attribute with default value should get first attribute" in { import Attributes._ - val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name").named("another_name") + val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(none).async.named("name").named("another_name") s.traversalBuilder.attributes.getFirst[Name](Name("default")) shouldEqual Name("name") } diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index c3fb1a42cb..11a3ea968d 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -31,7 +31,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { * INTERNAL API */ private[stream] def isAsync: Boolean = { - attributeList.exists { + attributeList.nonEmpty && attributeList.exists { case AsyncBoundary ⇒ true case ActorAttributes.Dispatcher(_) ⇒ true case _ ⇒ false diff --git a/akka-stream/src/main/scala/akka/stream/Graph.scala b/akka-stream/src/main/scala/akka/stream/Graph.scala index 4a2422e7af..22f6e386fd 100644 --- a/akka-stream/src/main/scala/akka/stream/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/Graph.scala @@ -30,7 +30,6 @@ trait Graph[+S <: Shape, +M] { /** * Put an asynchronous boundary around this `Graph` */ - // TODO: no longer encoded as attributes!!!! def async: Graph[S, M] = addAttributes(Attributes.asyncBoundary) def addAttributes(attr: Attributes): Graph[S, M] = withAttributes(traversalBuilder.attributes and attr) diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index da1f835f1c..74da492efb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -267,11 +267,8 @@ sealed trait TraversalBuilder { protected def internalSetAttributes(attributes: Attributes): TraversalBuilder def setAttributes(attributes: Attributes): TraversalBuilder = { - if (attributes ne Attributes.none) { - if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) - else internalSetAttributes(attributes) - } else - this + if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) + else internalSetAttributes(attributes) } def attributes: Attributes @@ -595,11 +592,8 @@ final case class LinearTraversalBuilder( copy(attributes = attributes) override def setAttributes(attributes: Attributes): LinearTraversalBuilder = { - if (attributes ne Attributes.none) { - if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) - else internalSetAttributes(attributes) - } else - this + if (attributes.isAsync) this.makeIsland(GraphStageTag).internalSetAttributes(attributes) + else internalSetAttributes(attributes) } private def applyIslandAndAttributes(t: Traversal): Traversal = { @@ -640,10 +634,7 @@ final case class LinearTraversalBuilder( beforeBuilder.concat( composite .assign(out, inOffset - composite.offsetOfModule(out)) - .traversal - - ).concat(traversalSoFar) - ), + .traversal).concat(traversalSoFar)), pendingBuilder = None, beforeBuilder = EmptyTraversal) case None ⇒ copy(inPort = None, outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, inOffset)) @@ -683,12 +674,9 @@ final case class LinearTraversalBuilder( composite .assign(out, relativeSlot) .traversal - .concat(traversalSoFar) - ) - ), + .concat(traversalSoFar))), pendingBuilder = None, - beforeBuilder = EmptyTraversal - ) + beforeBuilder = EmptyTraversal) case None ⇒ copy(outPort = None, traversalSoFar = rewireLastOutTo(traversalSoFar, relativeSlot)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 0bd4b83c8e..c7040766ab 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -25,8 +25,7 @@ import akka.annotation.DoNotInherit */ final class Flow[-In, +Out, +Mat]( override val traversalBuilder: LinearTraversalBuilder, - override val shape: FlowShape[In, Out] -) + override val shape: FlowShape[In, Out]) extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] { // TODO: debug string @@ -46,13 +45,11 @@ final class Flow[-In, +Out, +Mat]( if (this.isIdentity) { new Flow( LinearTraversalBuilder.fromBuilder(flow.traversalBuilder, flow.shape, combine), - flow.shape - ).asInstanceOf[Flow[In, T, Mat3]] + flow.shape).asInstanceOf[Flow[In, T, Mat3]] } else { new Flow( traversalBuilder.append(flow.traversalBuilder, flow.shape, combine), - FlowShape[In, T](shape.in, flow.shape.out) - ) + FlowShape[In, T](shape.in, flow.shape.out)) } } @@ -98,13 +95,11 @@ final class Flow[-In, +Out, +Mat]( if (isIdentity) { new Sink( LinearTraversalBuilder.fromBuilder(sink.traversalBuilder, sink.shape, combine), - SinkShape(sink.shape.in) - ).asInstanceOf[Sink[In, Mat3]] + SinkShape(sink.shape.in)).asInstanceOf[Sink[In, Mat3]] } else { new Sink( traversalBuilder.append(sink.traversalBuilder, sink.shape, combine), - SinkShape(shape.in) - ) + SinkShape(shape.in)) } } @@ -114,8 +109,7 @@ final class Flow[-In, +Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] = new Flow( traversalBuilder.transformMat(f), - shape - ) + shape) /** * Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]. @@ -208,8 +202,7 @@ final class Flow[-In, +Out, +Mat]( new Flow( LinearTraversalBuilder.fromBuilder(resultBuilder, newShape, Keep.right), - newShape - ) + newShape) } /** @@ -222,8 +215,7 @@ final class Flow[-In, +Out, +Mat]( override def withAttributes(attr: Attributes): Repr[Out] = new Flow( traversalBuilder.setAttributes(attr), - shape - ) + shape) /** * Add the given attributes to this Flow. Further calls to `withAttributes` @@ -241,12 +233,7 @@ final class Flow[-In, +Out, +Mat]( /** * Put an asynchronous boundary around this `Flow` */ - override def async: Repr[Out] = { - new Flow( - traversalBuilder.makeIsland(GraphStageTag), - shape - ) - } + override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) /** * Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains @@ -285,8 +272,7 @@ object Flow { private[this] val identity: Flow[Any, Any, NotUsed] = new Flow[Any, Any, NotUsed]( identityTraversalBuilder, - GraphStages.Identity.shape - ) + GraphStages.Identity.shape) /** * Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]] @@ -322,8 +308,7 @@ object Flow { case f: javadsl.Flow[I, O, M] ⇒ f.asScala case other ⇒ new Flow( LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right), - g.shape - ) + g.shape) } /** @@ -381,8 +366,7 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui override def named(name: String): RunnableGraph[Mat] = addAttributes(Attributes.name(name)) - override def async: RunnableGraph[Mat] = - new RunnableGraph(traversalBuilder.makeIsland(GraphStageTag)) + override def async: RunnableGraph[Mat] = addAttributes(Attributes.asyncBoundary) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 240672b3fd..4911b77ee9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -25,8 +25,7 @@ import scala.util.{ Failure, Success, Try } */ final class Sink[-In, +Mat]( override val traversalBuilder: LinearTraversalBuilder, - override val shape: SinkShape[In] -) + override val shape: SinkShape[In]) extends Graph[SinkShape[In], Mat] { // TODO: Debug string @@ -55,8 +54,7 @@ final class Sink[-In, +Mat]( def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = new Sink( traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), - shape - ) + shape) /** * Change the attributes of this [[Sink]] to the given ones and seal the list @@ -68,8 +66,7 @@ final class Sink[-In, +Mat]( override def withAttributes(attr: Attributes): Sink[In, Mat] = new Sink( traversalBuilder.setAttributes(attr), - shape - ) + shape) /** * Add the given attributes to this Sink. Further calls to `withAttributes` @@ -88,11 +85,7 @@ final class Sink[-In, +Mat]( /** * Put an asynchronous boundary around this `Sink` */ - override def async: Sink[In, Mat] = - new Sink( - traversalBuilder.makeIsland(GraphStageTag), - shape - ) + override def async: Sink[In, Mat] = addAttributes(Attributes.asyncBoundary) /** * Converts this Scala DSL element to it's Java DSL counterpart. @@ -115,8 +108,7 @@ object Sink { case s: javadsl.Sink[T, M] ⇒ s.asScala case other ⇒ new Sink( LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), - other.shape - ) + other.shape) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 8912ebbc8c..ce2018d953 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -153,9 +153,7 @@ final class Source[+Out, +Mat]( /** * Put an asynchronous boundary around this `Source` */ - override def async: Repr[Out] = new Source( - traversalBuilder.makeIsland(GraphStageTag), - shape) + override def async: Repr[Out] = addAttributes(Attributes.asyncBoundary) /** * Converts this Scala DSL element to it's Java DSL counterpart. @@ -233,8 +231,7 @@ object Source { case s: javadsl.Source[T, M] ⇒ s.asScala case other ⇒ new Source( LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right), - other.shape - ) + other.shape) } /**