=str 15707 Define default names for sources and sinks

* Not intended to close the ticket, but improve the situation somewhat
  by defining default names of all sources and sinks.
* The stage names (actor names) are still rather weird

For example the following

    Source.single(1).named("aa")
      .map(identity).named("bb")
      .map(identity).named("cc")
      .runWith(Sink.publisher)

is materilaized with names:
flow-1-0-cc-bb-aa-singleSource : akka.stream.impl.PublisherSource@1787f2a0
flow-1-1-cc-bb-map : Map(<function1>,OperationAttributes(List(Name(map))))
flow-1-2-cc-map : Map(<function1>,OperationAttributes(List(Name(map))))
flow-1-3-publisherSink : PublisherSink

but that is out of scope for this commit
This commit is contained in:
Patrik Nordwall 2015-04-20 21:04:03 +02:00
parent aad8704085
commit 373f1acf3a
5 changed files with 72 additions and 29 deletions

View file

@ -145,7 +145,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
private def watchGroupByActor(flowNr: Int): ActorRef = { private def watchGroupByActor(flowNr: Int): ActorRef = {
implicit val t = Timeout(300.millis) implicit val t = Timeout(300.millis)
import akka.pattern.ask import akka.pattern.ask
val path = s"/user/$$a/flow-${flowNr}-1-groupBy" val path = s"/user/$$a/flow-${flowNr}-1-publisherSource-groupBy"
val gropByPath = system.actorSelection(path) val gropByPath = system.actorSelection(path)
val groupByActor = try { val groupByActor = try {
Await.result((gropByPath ? Identify("")).mapTo[ActorIdentity], 300.millis).ref.get Await.result((gropByPath ? Identify("")).mapTo[ActorIdentity], 300.millis).ref.get

View file

@ -62,9 +62,22 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio
* INTERNAL API * INTERNAL API
*/ */
private[akka] def nameLifted: Option[String] = private[akka] def nameLifted: Option[String] =
attributes.collect { if (attributes.isEmpty)
case Name(name) name None
}.reduceOption(_ + "-" + _) // FIXME don't do a double-traversal, use a fold instead else {
val sb = new java.lang.StringBuilder
val iter = attributes.iterator
while (iter.hasNext) {
iter.next() match {
case Name(name)
if (sb.length == 0) sb.append(name)
else sb.append("-").append(name)
case _
}
}
if (sb.length == 0) None
else Some(sb.toString)
}
/** /**
* INTERNAL API * INTERNAL API

View file

@ -54,6 +54,30 @@ private[stream] object Stages {
val flexiMerge = name("flexiMerge") val flexiMerge = name("flexiMerge")
val flexiRoute = name("flexiRoute") val flexiRoute = name("flexiRoute")
val identityJunction = name("identityJunction") val identityJunction = name("identityJunction")
val publisherSource = name("publisherSource")
val iterableSource = name("iterableSource")
val futureSource = name("futureSource")
val tickSource = name("tickSource")
val singleSource = name("singleSource")
val repeat = name("repeat")
val emptySource = name("emptySource")
val lazyEmptySource = name("lazyEmptySource")
val failedSource = name("failedSource")
val concatSource = name("concatSource")
val concatMatSource = name("concatMatSource")
val subscriberSource = name("subscriberSource")
val actorPublisherSource = name("actorPublisherSource")
val actorRefSource = name("actorRefSource")
val subscriberSink = name("subscriberSink")
val cancelledSink = name("cancelledSink")
val headSink = name("headSink")
val publisherSink = name("publisherSink")
val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink")
val actorRefSink = name("actorRefSink")
val actorSubscriberSink = name("actorSubscriberSink")
} }
import DefaultAttributes._ import DefaultAttributes._

View file

@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.stream.javadsl import akka.stream.javadsl
import akka.actor.{ ActorRef, Props } import akka.actor.{ ActorRef, Props }
import akka.stream.impl._ import akka.stream.impl._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.{ SinkShape, Inlet, Outlet, Graph, OperationAttributes } import akka.stream.{ SinkShape, Inlet, Outlet, Graph, OperationAttributes }
import akka.stream.OperationAttributes._ import akka.stream.OperationAttributes._
import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage } import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage }
@ -63,36 +64,39 @@ object Sink extends SinkApply {
* Helper to create [[Sink]] from `Subscriber`. * Helper to create [[Sink]] from `Subscriber`.
*/ */
def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] = def apply[T](subscriber: Subscriber[T]): Sink[T, Unit] =
new Sink(new SubscriberSink(subscriber, none, shape("SubscriberSink"))) new Sink(new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape("SubscriberSink")))
/** /**
* A `Sink` that immediately cancels its upstream after materialization. * A `Sink` that immediately cancels its upstream after materialization.
*/ */
def cancelled[T]: Sink[T, Unit] = new Sink[Any, Unit](new CancelSink(none, shape("CancelledSink"))) def cancelled[T]: Sink[T, Unit] =
new Sink[Any, Unit](new CancelSink(DefaultAttributes.cancelledSink, shape("CancelledSink")))
/** /**
* A `Sink` that materializes into a `Future` of the first value received. * A `Sink` that materializes into a `Future` of the first value received.
*/ */
def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](none, shape("HeadSink"))) def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](DefaultAttributes.headSink, shape("HeadSink")))
/** /**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
* that can handle one [[org.reactivestreams.Subscriber]]. * that can handle one [[org.reactivestreams.Subscriber]].
*/ */
def publisher[T]: Sink[T, Publisher[T]] = new Sink(new PublisherSink[T](none, shape("PublisherSink"))) def publisher[T]: Sink[T, Publisher[T]] =
new Sink(new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink")))
/** /**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]] * A `Sink` that materializes into a [[org.reactivestreams.Publisher]]
* that can handle more than one [[org.reactivestreams.Subscriber]]. * that can handle more than one [[org.reactivestreams.Subscriber]].
*/ */
def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): Sink[T, Publisher[T]] = def fanoutPublisher[T](initialBufferSize: Int, maximumBufferSize: Int): Sink[T, Publisher[T]] =
new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, none, shape("FanoutPublisherSink"))) new Sink(new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize, DefaultAttributes.fanoutPublisherSink,
shape("FanoutPublisherSink")))
/** /**
* A `Sink` that will consume the stream and discard the elements. * A `Sink` that will consume the stream and discard the elements.
*/ */
def ignore: Sink[Any, Unit] = def ignore: Sink[Any, Unit] =
new Sink(new BlackholeSink(none, shape("BlackholeSink"))) new Sink(new BlackholeSink(DefaultAttributes.ignoreSink, shape("BlackholeSink")))
/** /**
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized * A `Sink` that will invoke the given procedure for each received element. The sink is materialized
@ -129,7 +133,7 @@ object Sink extends SinkApply {
(stage, promise.future) (stage, promise.future)
} }
Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).named("ForeachSink") Flow[T].transformMaterializing(newForeachStage).to(Sink.ignore).named("foreachSink")
} }
/** /**
@ -172,7 +176,7 @@ object Sink extends SinkApply {
(stage, promise.future) (stage, promise.future)
} }
Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).named("FoldSink") Flow[T].transformMaterializing(newFoldStage).to(Sink.ignore).named("foldSink")
} }
/** /**
@ -196,7 +200,7 @@ object Sink extends SinkApply {
} }
} }
Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("OnCompleteSink") Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("onCompleteSink")
} }
/** /**
@ -215,7 +219,7 @@ object Sink extends SinkApply {
* limiting stage in front of this `Sink`. * limiting stage in front of this `Sink`.
*/ */
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] = def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, Unit] =
new Sink(new ActorRefSink(ref, onCompleteMessage, none, shape("ActorRefSink"))) new Sink(new ActorRefSink(ref, onCompleteMessage, DefaultAttributes.actorRefSink, shape("ActorRefSink")))
/** /**
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
@ -223,6 +227,6 @@ object Sink extends SinkApply {
* be [[akka.stream.actor.ActorSubscriber]]. * be [[akka.stream.actor.ActorSubscriber]].
*/ */
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
new Sink(new ActorSubscriberSink(props, none, shape("ActorSubscriberSink"))) new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
} }

View file

@ -5,6 +5,7 @@ package akka.stream.scaladsl
import akka.stream.javadsl import akka.stream.javadsl
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.{ SourceShape, Inlet, Outlet } import akka.stream.{ SourceShape, Inlet, Outlet }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullStage } import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullStage }
@ -173,7 +174,7 @@ object Source extends SourceApply {
* back-pressure upstream. * back-pressure upstream.
*/ */
def apply[T](publisher: Publisher[T]): Source[T, Unit] = def apply[T](publisher: Publisher[T]): Source[T, Unit] =
new Source(new PublisherSource(publisher, none, shape("PublisherSource"))) new Source(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))
/** /**
* Helper to create [[Source]] from `Iterator`. * Helper to create [[Source]] from `Iterator`.
@ -237,7 +238,7 @@ object Source extends SourceApply {
} }
} }
}).named("IterableSource") }).withAttributes(DefaultAttributes.iterableSource)
} }
/** /**
@ -247,7 +248,7 @@ object Source extends SourceApply {
* The stream terminates with a failure if the `Future` is completed with a failure. * The stream terminates with a failure if the `Future` is completed with a failure.
*/ */
def apply[T](future: Future[T]): Source[T, Unit] = def apply[T](future: Future[T]): Source[T, Unit] =
new Source(new FutureSource(future, none, shape("FutureSource"))) new Source(new FutureSource(future, DefaultAttributes.futureSource, shape("FutureSource")))
/** /**
* Elements are emitted periodically with the specified interval. * Elements are emitted periodically with the specified interval.
@ -257,26 +258,26 @@ object Source extends SourceApply {
* receive new tick elements as soon as it has requested more elements. * receive new tick elements as soon as it has requested more elements.
*/ */
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] = def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T): Source[T, Cancellable] =
new Source(new TickSource(initialDelay, interval, tick, none, shape("TickSource"))) new Source(new TickSource(initialDelay, interval, tick, DefaultAttributes.tickSource, shape("TickSource")))
/** /**
* Create a `Source` with one element. * Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/ */
def single[T](element: T): Source[T, Unit] = def single[T](element: T): Source[T, Unit] =
apply(SynchronousIterablePublisher(List(element), "SingleSource")) // FIXME optimize apply(SynchronousIterablePublisher(List(element), "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize
/** /**
* Create a `Source` that will continually emit the given element. * Create a `Source` that will continually emit the given element.
*/ */
def repeat[T](element: T): Source[T, Unit] = def repeat[T](element: T): Source[T, Unit] =
apply(() Iterator.continually(element)) // FIXME optimize apply(() Iterator.continually(element)).withAttributes(DefaultAttributes.repeat) // FIXME optimize
/** /**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
*/ */
def empty[T]: Source[T, Unit] = _empty def empty[T]: Source[T, Unit] = _empty
private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher) private[this] val _empty: Source[Nothing, Unit] = apply(EmptyPublisher).withAttributes(DefaultAttributes.emptySource)
/** /**
* Create a `Source` with no elements, which does not complete its downstream, * Create a `Source` with no elements, which does not complete its downstream,
@ -288,12 +289,13 @@ object Source extends SourceApply {
* to its downstream. * to its downstream.
*/ */
def lazyEmpty[T]: Source[T, Promise[Unit]] = def lazyEmpty[T]: Source[T, Promise[Unit]] =
new Source(new LazyEmptySource[T](none, shape("LazyEmptySource"))) new Source(new LazyEmptySource[T](DefaultAttributes.lazyEmptySource, shape("LazyEmptySource")))
/** /**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/ */
def failed[T](cause: Throwable): Source[T, Unit] = apply(ErrorPublisher(cause, "FailedSource")) def failed[T](cause: Throwable): Source[T, Unit] =
apply(ErrorPublisher(cause, "FailedSource")).withAttributes(DefaultAttributes.failedSource)
/** /**
* Concatenates two sources so that the first element * Concatenates two sources so that the first element
@ -301,7 +303,7 @@ object Source extends SourceApply {
* source. * source.
*/ */
def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] = def concat[T, Mat1, Mat2](source1: Source[T, Mat1], source2: Source[T, Mat2]): Source[T, (Mat1, Mat2)] =
concatMat(source1, source2)(Keep.both) concatMat(source1, source2)(Keep.both).withAttributes(DefaultAttributes.concatSource)
/** /**
* Concatenates two sources so that the first element * Concatenates two sources so that the first element
@ -317,13 +319,13 @@ object Source extends SourceApply {
s1.outlet ~> c.in(0) s1.outlet ~> c.in(0)
s2.outlet ~> c.in(1) s2.outlet ~> c.in(1)
SourceShape(c.out) SourceShape(c.out)
}) }).withAttributes(DefaultAttributes.concatMatSource)
/** /**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]
*/ */
def subscriber[T]: Source[T, Subscriber[T]] = def subscriber[T]: Source[T, Subscriber[T]] =
new Source(new SubscriberSource[T](none, shape("SubscriberSource"))) new Source(new SubscriberSource[T](DefaultAttributes.subscriberSource, shape("SubscriberSource")))
/** /**
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor * Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
@ -331,7 +333,7 @@ object Source extends SourceApply {
* be [[akka.stream.actor.ActorPublisher]]. * be [[akka.stream.actor.ActorPublisher]].
*/ */
def actorPublisher[T](props: Props): Source[T, ActorRef] = def actorPublisher[T](props: Props): Source[T, ActorRef] =
new Source(new ActorPublisherSource(props, none, shape("ActorPublisherSource"))) new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
/** /**
* Creates a `Source` that is materialized as an [[akka.actor.ActorRef]]. * Creates a `Source` that is materialized as an [[akka.actor.ActorRef]].
@ -360,7 +362,7 @@ object Source extends SourceApply {
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = { def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = {
require(bufferSize >= 0, "bufferSize must be greater than or equal to 0") require(bufferSize >= 0, "bufferSize must be greater than or equal to 0")
require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported") require(overflowStrategy != OverflowStrategy.Backpressure, "Backpressure overflowStrategy not supported")
new Source(new ActorRefSource(bufferSize, overflowStrategy, none, shape("ActorRefSource"))) new Source(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
} }
} }