From 373f1acf3ad32419df943d21ee09189ee6cd43fb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 20 Apr 2015 21:04:03 +0200 Subject: [PATCH] =str 15707 Define default names for sources and sinks * Not intended to close the ticket, but improve the situation somewhat by defining default names of all sources and sinks. * The stage names (actor names) are still rather weird For example the following Source.single(1).named("aa") .map(identity).named("bb") .map(identity).named("cc") .runWith(Sink.publisher) is materilaized with names: flow-1-0-cc-bb-aa-singleSource : akka.stream.impl.PublisherSource@1787f2a0 flow-1-1-cc-bb-map : Map(,OperationAttributes(List(Name(map)))) flow-1-2-cc-map : Map(,OperationAttributes(List(Name(map)))) flow-1-3-publisherSink : PublisherSink but that is out of scope for this commit --- .../SubstreamSubscriptionTimeoutSpec.scala | 2 +- .../akka/stream/OperationAttributes.scala | 19 ++++++++++-- .../main/scala/akka/stream/impl/Stages.scala | 24 +++++++++++++++ .../scala/akka/stream/scaladsl/Sink.scala | 26 +++++++++------- .../scala/akka/stream/scaladsl/Source.scala | 30 ++++++++++--------- 5 files changed, 72 insertions(+), 29 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index 74181f942b..dc2984f86e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -145,7 +145,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { private def watchGroupByActor(flowNr: Int): ActorRef = { implicit val t = Timeout(300.millis) import akka.pattern.ask - val path = s"/user/$$a/flow-${flowNr}-1-groupBy" + val path = s"/user/$$a/flow-${flowNr}-1-publisherSource-groupBy" val gropByPath = system.actorSelection(path) val groupByActor = try { Await.result((gropByPath ? Identify("")).mapTo[ActorIdentity], 300.millis).ref.get diff --git a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala index 4007489864..d24d4b24cf 100644 --- a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala @@ -62,9 +62,22 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio * INTERNAL API */ private[akka] def nameLifted: Option[String] = - attributes.collect { - case Name(name) ⇒ name - }.reduceOption(_ + "-" + _) // FIXME don't do a double-traversal, use a fold instead + if (attributes.isEmpty) + None + else { + val sb = new java.lang.StringBuilder + val iter = attributes.iterator + while (iter.hasNext) { + iter.next() match { + case Name(name) ⇒ + if (sb.length == 0) sb.append(name) + else sb.append("-").append(name) + case _ ⇒ + } + } + if (sb.length == 0) None + else Some(sb.toString) + } /** * INTERNAL API diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index b7446bf8fd..73dd93339d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -54,6 +54,30 @@ private[stream] object Stages { val flexiMerge = name("flexiMerge") val flexiRoute = name("flexiRoute") val identityJunction = name("identityJunction") + + val publisherSource = name("publisherSource") + val iterableSource = name("iterableSource") + val futureSource = name("futureSource") + val tickSource = name("tickSource") + val singleSource = name("singleSource") + val repeat = name("repeat") + val emptySource = name("emptySource") + val lazyEmptySource = name("lazyEmptySource") + val failedSource = name("failedSource") + val concatSource = name("concatSource") + val concatMatSource = name("concatMatSource") + val subscriberSource = name("subscriberSource") + val actorPublisherSource = name("actorPublisherSource") + val actorRefSource = name("actorRefSource") + + val subscriberSink = name("subscriberSink") + val cancelledSink = name("cancelledSink") + val headSink = name("headSink") + val publisherSink = name("publisherSink") + val fanoutPublisherSink = name("fanoutPublisherSink") + val ignoreSink = name("ignoreSink") + val actorRefSink = name("actorRefSink") + val actorSubscriberSink = name("actorSubscriberSink") } import DefaultAttributes._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 37d36a2089..b9fe7c1b4c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -6,6 +6,7 @@ package akka.stream.scaladsl import akka.stream.javadsl import akka.actor.{ ActorRef, Props } import akka.stream.impl._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.{ SinkShape, Inlet, Outlet, Graph, OperationAttributes } import akka.stream.OperationAttributes._ import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage } @@ -63,36 +64,39 @@ object Sink extends SinkApply { * Helper to create [[Sink]] from `Subscriber`. */ def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] = - new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink"))) + new Sink(new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape("SubscriberSink"))) /** * A `Sink` that immediately cancels its upstream after materialization. */ - def cancelled[T]: Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(none, shape("CancelledSink"))) + def cancelled[T]: Sink[T, Unit] = + new Sink[Any, Unit](new CancelSink(DefaultAttributes.cancelledSink, shape("CancelledSink"))) /** * A `Sink` that materializes into a `Future` of the first value received. */ - def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](none, shape("HeadSink"))) + def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](DefaultAttributes.headSink, shape("HeadSink"))) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. * that can handle one [[org.reactivestreams.Subscriber]]. */ - def publisher[T]: Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](none, shape("PublisherSink"))) + def publisher[T]: Sink[T, Publisher[T]] = + new Sink(new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]] * that can handle more than one [[org.reactivestreams.Subscriber]]. */ def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): Sink[T, Publisher[T]] = - new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, none, shape("FanoutPublisherSink"))) + new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, DefaultAttributes.fanoutPublisherSink, + shape("FanoutPublisherSink"))) /** * A `Sink` that will consume the stream and discard the elements. */ def ignore: Sink[Any, Unit] = - new Sink(new BlackholeSink(none, shape("BlackholeSink"))) + new Sink(new BlackholeSink(DefaultAttributes.ignoreSink, shape("BlackholeSink"))) /** * A `Sink` that will invoke the given procedure for each received element. The sink is materialized @@ -129,7 +133,7 @@ object Sink extends SinkApply { (stage, promise.future) } - Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).named("ForeachSink") + Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).named("foreachSink") } /** @@ -172,7 +176,7 @@ object Sink extends SinkApply { (stage, promise.future) } - Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).named("FoldSink") + Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).named("foldSink") } /** @@ -196,7 +200,7 @@ object Sink extends SinkApply { } } - Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("OnCompleteSink") + Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("onCompleteSink") } /** @@ -215,7 +219,7 @@ object Sink extends SinkApply { * limiting stage in front of this `Sink`. */ def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] = - new Sink(new ActorRefSink(ref, onCompleteMessage, none, shape("ActorRefSink"))) + new Sink(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink"))) /** * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor @@ -223,6 +227,6 @@ object Sink extends SinkApply { * be [[akka.stream.actor.ActorSubscriber]]. */ def actorSubscriber[T](props: Props): Sink[T, ActorRef] = - new Sink(new ActorSubscriberSink(props, none, shape("ActorSubscriberSink"))) + new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink"))) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 68d55b3e6d..675b126dfd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import akka.stream.javadsl import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.{ SourceShape, Inlet, Outlet } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullStage } @@ -173,7 +174,7 @@ object Source extends SourceApply { * back-pressure upstream. */ def apply[T](publisher: Publisher[T]): Source[T, Unit] = - new Source(new PublisherSource(publisher, none, shape("PublisherSource"))) + new Source(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource"))) /** * Helper to create [[Source]] from `Iterator`. @@ -237,7 +238,7 @@ object Source extends SourceApply { } } - }).named("IterableSource") + }).withAttributes(DefaultAttributes.iterableSource) } /** @@ -247,7 +248,7 @@ object Source extends SourceApply { * The stream terminates with a failure if the `Future` is completed with a failure. */ def apply[T](future: Future[T]): Source[T, Unit] = - new Source(new FutureSource(future, none, shape("FutureSource"))) + new Source(new FutureSource(future, DefaultAttributes.futureSource, shape("FutureSource"))) /** * Elements are emitted periodically with the specified interval. @@ -257,26 +258,26 @@ object Source extends SourceApply { * receive new tick elements as soon as it has requested more elements. */ def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = - new Source(new TickSource(initialDelay, interval, tick, none, shape("TickSource"))) + new Source(new TickSource(initialDelay, interval, tick, DefaultAttributes.tickSource, shape("TickSource"))) /** * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, Unit] = - apply(SynchronousIterablePublisher(List(element), "SingleSource")) // FIXME optimize + apply(SynchronousIterablePublisher(List(element), "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize /** * Create a `Source` that will continually emit the given element. */ def repeat[T](element: T): Source[T, Unit] = - apply(() ⇒ Iterator.continually(element)) // FIXME optimize + apply(() ⇒ Iterator.continually(element)).withAttributes(DefaultAttributes.repeat) // FIXME optimize /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. */ def empty[T]: Source[T, Unit] = _empty - private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher) + private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher).withAttributes(DefaultAttributes.emptySource) /** * Create a `Source` with no elements, which does not complete its downstream, @@ -288,12 +289,13 @@ object Source extends SourceApply { * to its downstream. */ def lazyEmpty[T]: Source[T, Promise[Unit]] = - new Source(new LazyEmptySource[T](none, shape("LazyEmptySource"))) + new Source(new LazyEmptySource[T](DefaultAttributes.lazyEmptySource, shape("LazyEmptySource"))) /** * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. */ - def failed[T](cause: Throwable): Source[T, Unit] = apply(ErrorPublisher(cause, "FailedSource")) + def failed[T](cause: Throwable): Source[T, Unit] = + apply(ErrorPublisher(cause, "FailedSource")).withAttributes(DefaultAttributes.failedSource) /** * Concatenates two sources so that the first element @@ -301,7 +303,7 @@ object Source extends SourceApply { * source. */ def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] = - concatMat(source1, source2)(Keep.both) + concatMat(source1, source2)(Keep.both).withAttributes(DefaultAttributes.concatSource) /** * Concatenates two sources so that the first element @@ -317,13 +319,13 @@ object Source extends SourceApply { s1.outlet ~> c.in(0) s2.outlet ~> c.in(1) SourceShape(c.out) - }) + }).withAttributes(DefaultAttributes.concatMatSource) /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] */ def subscriber[T]: Source[T, Subscriber[T]] = - new Source(new SubscriberSource[T](none, shape("SubscriberSource"))) + new Source(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource"))) /** * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor @@ -331,7 +333,7 @@ object Source extends SourceApply { * be [[akka.stream.actor.ActorPublisher]]. */ def actorPublisher[T](props: Props): Source[T, ActorRef] = - new Source(new ActorPublisherSource(props, none, shape("ActorPublisherSource"))) + new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource"))) /** * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. @@ -360,7 +362,7 @@ object Source extends SourceApply { def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported") - new Source(new ActorRefSource(bufferSize, overflowStrategy, none, shape("ActorRefSource"))) + new Source(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) } }