diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index db6091ca57..7161e26b98 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -143,6 +143,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { if (i.hasNext) i.next() match { case Name(n) ⇒ + // FIXME this URLEncode is a bug IMO, if that format is important then that is how it should be store in Name val nn = URLEncoder.encode(n, "UTF-8") if (buf ne null) concatNames(i, null, buf.append('-').append(nn)) else if (first ne null) { @@ -171,11 +172,12 @@ object Attributes { final case class Name(n: String) extends Attribute final case class InputBuffer(initial: Int, max: Int) extends Attribute final case class LogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) extends Attribute + final case object AsyncBoundary extends Attribute + object LogLevels { /** Use to disable logging on certain operations when configuring [[Attributes.LogLevels]] */ final val Off: Logging.LogLevel = Logging.levelFor("off").get } - final case object AsyncBoundary extends Attribute /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 0fa12df2ea..7a2b59fc2b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -25,14 +25,17 @@ private[stream] object Stages { val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") val fused = name("fused") + val materializedValueSource = name("matValueSource") val map = name("map") val log = name("log") val filter = name("filter") + val filterNot = name("filterNot") val collect = name("collect") val recover = name("recover") val mapAsync = name("mapAsync") val mapAsyncUnordered = name("mapAsyncUnordered") val grouped = name("grouped") + val groupedWithin = name("groupedWithin") val limit = name("limit") val limitWeighted = name("limitWeighted") val sliding = name("sliding") @@ -47,6 +50,7 @@ private[stream] object Stages { val conflate = name("conflate") val expand = name("expand") val mapConcat = name("mapConcat") + val detacher = name("detacher") val groupBy = name("groupBy") val prefixAndTail = name("prefixAndTail") val split = name("split") @@ -57,6 +61,7 @@ private[stream] object Stages { val merge = name("merge") val mergePreferred = name("mergePreferred") + val flattenMerge = name("flattenMerge") val broadcast = name("broadcast") val balance = name("balance") val zip = name("zip") @@ -65,6 +70,7 @@ private[stream] object Stages { val repeat = name("repeat") val unfold = name("unfold") val unfoldAsync = name("unfoldAsync") + val delay = name("delay") and inputBuffer(16, 16) val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index 797fff8650..2321d96e69 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -20,6 +20,9 @@ private[stream] class Throttle[T](cost: Int, costCalculation: (T) ⇒ Int, mode: ThrottleMode) extends SimpleLinearGraphStage[T] { + require(cost > 0, "cost must be > 0") + require(per.toMillis > 0, "per time must be > 0") + require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { var willStop = false diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 566003eaed..5faf5153ab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -49,7 +49,7 @@ object GraphStages { } object Identity extends SimpleLinearGraphStage[Any] { - override def initialAttributes = Attributes.name("identityOp") + override def initialAttributes = DefaultAttributes.identityOp override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { setHandler(in, new InHandler { @@ -70,9 +70,9 @@ object GraphStages { * INERNAL API */ private[stream] final class Detacher[T] extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("in") - val out = Outlet[T]("out") - override def initialAttributes = Attributes.name("Detacher") + val in = Inlet[T]("Detacher.in") + val out = Outlet[T]("Detacher.out") + override def initialAttributes = DefaultAttributes.detacher override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -224,13 +224,11 @@ object GraphStages { } } - class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) + final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { - - val out = Outlet[T]("TimerSource.out") - override def initialAttributes = Attributes.name("TickSource") - override val shape = SourceShape(out) - + override val shape = SourceShape(Outlet[T]("TickSource.out")) + val out = shape.out + override def initialAttributes: Attributes = DefaultAttributes.tickSource override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Cancellable) = { import TickSource._ @@ -248,9 +246,7 @@ object GraphStages { cancellable.cancelFuture.onComplete(_ ⇒ callback.invoke(()))(interpreter.materializer.executionContext) } - setHandler(out, new OutHandler { - override def onPull() = () // Do nothing - }) + setHandler(out, eagerTerminateOutput) override protected def onTimer(timerKey: Any) = if (isAvailable(out)) push(out, tick) @@ -269,9 +265,9 @@ object GraphStages { * * This source is not reusable, it is only created internally. */ - private[stream] class MaterializedValueSource[T](val computation: MaterializedValueNode, val out: Outlet[T]) extends GraphStage[SourceShape[T]] { + private[stream] final class MaterializedValueSource[T](val computation: MaterializedValueNode, val out: Outlet[T]) extends GraphStage[SourceShape[T]] { def this(computation: MaterializedValueNode) = this(computation, Outlet[T]("matValue")) - override def initialAttributes: Attributes = Attributes.name("matValueSource") + override def initialAttributes: Attributes = DefaultAttributes.materializedValueSource override val shape = SourceShape(out) private val promise = Promise[T] @@ -287,10 +283,11 @@ object GraphStages { } } - override def toString: String = s"MatValSrc($computation)" + override def toString: String = s"MaterializedValueSource($computation)" } - private[stream] class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] { + private[stream] final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] { + override def initialAttributes: Attributes = DefaultAttributes.singleSource ReactiveStreamsCompliance.requireNonNullElement(elem) val out = Outlet[T]("single.out") val shape = SourceShape(out) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index e9d79b8def..ab19696c11 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -7,6 +7,7 @@ import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.{ InputBuffer, LogLevels } import akka.stream.DelayOverflowStrategy.EmitEarly +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ @@ -241,6 +242,9 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de * INTERNAL API */ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] { + ReactiveStreamsCompliance.requireNonNullElement(inject) + if(start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get) + if(end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get) private val in = Inlet[T]("in") private val out = Outlet[T]("out") @@ -538,15 +542,14 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut private val in = Inlet[In]("in") private val out = Outlet[Out]("out") - override def initialAttributes = Attributes.name("MapAsync") + override def initialAttributes = DefaultAttributes.mapAsync override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { override def toString = s"MapAsync.Logic(buffer=$buffer)" - val decider = - inheritedAttributes.getAttribute(classOf[SupervisionStrategy]) - .map(_.decider).getOrElse(Supervision.stoppingDecider) + //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync? + val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider) val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism) def todo = buffer.used @@ -617,7 +620,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I private val in = Inlet[In]("in") private val out = Outlet[Out]("out") - override def initialAttributes = Attributes.name("MapAsyncUnordered") + override def initialAttributes = DefaultAttributes.mapAsyncUnordered override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { @@ -782,10 +785,13 @@ private[stream] object TimerKeys { case object GroupedWithinTimerKey } -private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { +private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { + require(n > 0, "n must be greater than 0") + require(d > Duration.Zero) + val in = Inlet[T]("in") val out = Outlet[immutable.Seq[T]]("out") - override def initialAttributes = Attributes.name("GroupedWithin") + override def initialAttributes = DefaultAttributes.groupedWithin val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { @@ -854,15 +860,21 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS } } -private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { - +private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { + private[this] def timerName = "DelayedTimer" + override def initialAttributes: Attributes = DefaultAttributes.delay override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + val size = + inheritedAttributes.getAttribute(classOf[InputBuffer]) match { + case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this") + case Some(InputBuffer(min, max)) ⇒ max + } + val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element - val timerName = "DelayedTimer" var willStop = false setHandler(in, handler = new InHandler { + //FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full override def onPush(): Unit = { if (buffer.isFull) (strategy: @unchecked) match { case EmitEarly ⇒ @@ -932,7 +944,7 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg override def toString = "Delay" } -private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +private[stream] final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { setHandler(in, new InHandler { @@ -952,7 +964,7 @@ private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinea override def toString = "TakeWithin" } -private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +private[stream] final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { private var allow = false @@ -973,4 +985,4 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea } override def toString = "DropWithin" -} +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 923f3b1a31..72a25e426e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -5,6 +5,7 @@ package akka.stream.impl.fusing import java.util.concurrent.atomic.AtomicReference import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.SubscriptionTimeoutException import akka.stream.stage._ import akka.stream.scaladsl._ @@ -30,7 +31,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[ private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in") private val out = Outlet[T]("flatten.out") - override def initialAttributes = Attributes.name("FlattenMerge") + override def initialAttributes = DefaultAttributes.flattenMerge override val shape = FlowShape(in, out) override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { @@ -111,7 +112,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. val out: Outlet[(immutable.Seq[T], Source[T, Unit])] = Outlet("PrefixAndTail.out") override val shape: FlowShape[T, (immutable.Seq[T], Source[T, Unit])] = FlowShape(in, out) - override def initialAttributes = Attributes.name("PrefixAndTail") + override def initialAttributes = DefaultAttributes.prefixAndTail private final class PrefixAndTailLogic(_shape: Shape) extends TimerGraphStageLogic(_shape) with OutHandler with InHandler { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index ef51d07c6d..455de072cb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -534,7 +534,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels */ def filterNot(p: Out ⇒ Boolean): Repr[Out] = - via(Flow[Out].filter(!p(_)).withAttributes(name("filterNot"))) + via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot)) /** * Terminate processing (and cancel the upstream publisher) after predicate @@ -738,12 +738,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T] = { - ReactiveStreamsCompliance.requireNonNullElement(start) - ReactiveStreamsCompliance.requireNonNullElement(inject) - ReactiveStreamsCompliance.requireNonNullElement(end) + def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T] = via(Intersperse(Some(start), inject, Some(end))) - } /** * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] @@ -767,10 +763,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](inject: T): Repr[T] = { - ReactiveStreamsCompliance.requireNonNullElement(inject) + def intersperse[T >: Out](inject: T): Repr[T] = via(Intersperse(None, inject, None)) - } /** * Chunk up this stream into groups of elements received within a time window, @@ -790,11 +784,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream completes */ - def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = { - require(n > 0, "n must be greater than 0") - require(d > Duration.Zero) - via(new GroupedWithin[Out](n, d).withAttributes(name("groupedWithin"))) - } + def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]] = + via(new GroupedWithin[Out](n, d)) /** * Shifts elements emission in time by a specified amount. It allows to store elements @@ -822,7 +813,7 @@ trait FlowOps[+Out, +Mat] { * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] = - via(new Delay[Out](of, strategy).withAttributes(name("delay"))) + via(new Delay[Out](of, strategy)) /** * Discard the given number of elements at the beginning of the stream. @@ -850,7 +841,7 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels */ def dropWithin(d: FiniteDuration): Repr[Out] = - via(new DropWithin[Out](d).withAttributes(name("dropWithin"))) + via(new DropWithin[Out](d)) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -890,7 +881,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels or timer fires */ - def takeWithin(d: FiniteDuration): Repr[Out] = via(new TakeWithin[Out](d).withAttributes(name("takeWithin"))) + def takeWithin(d: FiniteDuration): Repr[Out] = via(new TakeWithin[Out](d)) /** * Allows a faster upstream to progress independently of a slower subscriber by conflating elements into a summary @@ -1285,13 +1276,8 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, - mode: ThrottleMode): Repr[Out] = { - require(elements > 0, "elements must be > 0") - require(per.toMillis > 0, "per time must be > 0") - require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") - via(new Throttle(elements, per, maximumBurst, _ ⇒ 1, mode)) - } + def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out] = + throttle(elements, per, maximumBurst, _ ⇒ 1, mode) /** * Sends elements downstream with speed limited to `cost/per`. Cost is @@ -1320,11 +1306,8 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels */ def throttle(cost: Int, per: FiniteDuration, maximumBurst: Int, - costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] = { - require(per.toMillis > 0, "per time must be > 0") - require(!(mode == ThrottleMode.Enforcing && maximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode") + costCalculation: (Out) ⇒ Int, mode: ThrottleMode): Repr[Out] = via(new Throttle(cost, per, maximumBurst, costCalculation, mode)) - } /** * Detaches upstream demand from downstream demand without detaching the diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index d5ec09c67f..983ecc5c13 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -7,7 +7,7 @@ import akka.stream._ import akka.stream.impl._ import akka.stream.impl.fusing.GraphStages import akka.stream.impl.fusing.GraphStages.MaterializedValueSource -import akka.stream.impl.Stages.{ StageModule, SymbolicStage } +import akka.stream.impl.Stages.{DefaultAttributes, StageModule, SymbolicStage} import akka.stream.impl.StreamLayout._ import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage } import scala.annotation.unchecked.uncheckedVariance @@ -42,7 +42,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) e val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i)) val out: Outlet[T] = Outlet[T]("Merge.out") - override def initialAttributes = Attributes.name("Merge") + override def initialAttributes = DefaultAttributes.merge override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -145,7 +145,7 @@ object MergePreferred { final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports") - override def initialAttributes = Attributes.name("MergePreferred") + override def initialAttributes = DefaultAttributes.mergePreferred override val shape: MergePreferred.MergePreferredShape[T] = new MergePreferred.MergePreferredShape(secondaryPorts, "MergePreferred") @@ -397,7 +397,7 @@ final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) ext require(outputPorts > 1, "A Broadcast must have more than 1 output ports") val in: Inlet[T] = Inlet[T]("Broadast.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Broadcast.out" + i)) - override def initialAttributes = Attributes.name("Broadcast") + override def initialAttributes = DefaultAttributes.broadcast override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -496,7 +496,7 @@ final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) ext require(outputPorts > 1, "A Balance must have more than 1 output ports") val in: Inlet[T] = Inlet[T]("Balance.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i ⇒ Outlet[T]("Balance.out" + i)) - override def initialAttributes = Attributes.name("Balance") + override def initialAttributes = DefaultAttributes.balance override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { @@ -664,7 +664,7 @@ final class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T require(inputPorts > 1, "A Concat must have more than 1 input ports") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Concat.in" + i)) val out: Outlet[T] = Outlet[T]("Concat.out") - override def initialAttributes = Attributes.name("Concat") + override def initialAttributes = DefaultAttributes.concat override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 067bcd9461..f43eb07630 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -225,14 +225,14 @@ object Source { * receive new tick elements as soon as it has requested more elements. */ def tick[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = - fromGraph(new TickSource[T](initialDelay, interval, tick).withAttributes(DefaultAttributes.tickSource)) + fromGraph(new TickSource[T](initialDelay, interval, tick)) /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, Unit] = - fromGraph(new GraphStages.SingleSource(element).withAttributes(DefaultAttributes.singleSource)) + fromGraph(new GraphStages.SingleSource(element)) /** * Create a `Source` that will continually emit the given element.