From 3bb947107296f7c6a4cb4c0023a60b70edd0c626 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 8 May 2014 19:34:58 +0200 Subject: [PATCH] +str #15084 Use sensible flow processor actor names --- .../stream/PersistentPublisher.scala | 5 +- .../scala/akka/stream/FlowMaterializer.scala | 17 +- .../impl/ActorBasedFlowMaterializer.scala | 190 +++++++++++++----- .../scala/akka/stream/impl/FlowImpl.scala | 8 + .../main/scala/akka/stream/io/StreamIO.scala | 11 +- .../scala/akka/stream/scaladsl/Flow.scala | 12 ++ .../scala/akka/stream/ActorProducerTest.scala | 1 - .../scala/akka/stream/FlowConcatSpec.scala | 1 - .../scala/akka/stream/FlowFilterSpec.scala | 4 +- .../scala/akka/stream/FlowGroupBySpec.scala | 5 +- .../test/scala/akka/stream/FlowMapSpec.scala | 2 +- .../scala/akka/stream/FlowMergeSpec.scala | 1 - .../scala/akka/stream/FlowSplitWhenSpec.scala | 5 +- .../test/scala/akka/stream/FlowZipSpec.scala | 1 - .../akka/stream/IdentityProcessorTest.scala | 6 +- .../akka/stream/ProcessorHierarchySpec.scala | 2 +- .../akka/stream/ProcessorNamingSpec.scala | 71 +++++++ .../scala/akka/stream/TwoStreamsSetup.scala | 5 +- .../scala/akka/stream/io/TcpFlowSpec.scala | 5 +- 19 files changed, 277 insertions(+), 75 deletions(-) create mode 100644 akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala index 438363b633..a6c6aa664b 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala @@ -74,8 +74,9 @@ private object PersistentPublisher { } private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends ProducerNode[Persistent] { - def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[Persistent] = - 
new ActorProducer(context.actorOf(PersistentPublisher.props(processorId, publisherSettings, settings))) + def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[Persistent] = + new ActorProducer(materializer.context.actorOf(PersistentPublisher.props(processorId, publisherSettings, materializer.settings), + name = s"$flowName-0-persistentPublisher")) } private class PersistentPublisherImpl(processorId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings) diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index c1e2b94af0..e5efbf1b55 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -12,15 +12,21 @@ import scala.concurrent.duration._ import org.reactivestreams.api.Consumer object FlowMaterializer { + /** * Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object * to another actor if the factory is an ActorContext. + * + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of + * `namePrefix-flowNumber-flowStepNumber-stepName`. 
*/ - def apply(settings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer = - new ActorBasedFlowMaterializer(settings, context) + def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = + new ActorBasedFlowMaterializer(settings, context, namePrefix.getOrElse("flow")) + } /** @@ -31,6 +37,13 @@ object FlowMaterializer { * dependent. */ trait FlowMaterializer { + + /** + * The `namePrefix` is used as the first part of the names of the actors running + * the processing steps. + */ + def withNamePrefix(name: String): FlowMaterializer + /** * INTERNAL API * ops are stored in reverse order diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 69e98f353a..c94d09d2c3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -15,53 +15,83 @@ import scala.util.Try import scala.concurrent.Future import scala.util.Success import scala.util.Failure +import java.util.concurrent.atomic.AtomicLong +import akka.actor.ActorContext +import akka.actor.ExtensionIdProvider +import akka.actor.ExtensionId +import akka.actor.ExtendedActorSystem +import akka.actor.ActorSystem +import akka.actor.Extension /** * INTERNAL API */ private[akka] object Ast { - trait AstNode + trait AstNode { + def name: String + } - case class Transform(transformer: Transformer[Any, Any]) extends AstNode - case class Recover(recoveryTransformer: RecoveryTransformer[Any, Any]) extends AstNode - case class GroupBy(f: Any ⇒ Any) extends AstNode - case class SplitWhen(p: Any ⇒ Boolean) extends AstNode - case class Merge(other: Producer[Any]) extends AstNode - case class Zip(other: Producer[Any]) extends AstNode - case class Concat(next: Producer[Any]) extends AstNode - case class 
Tee(other: Consumer[Any]) extends AstNode + case class Transform(transformer: Transformer[Any, Any]) extends AstNode { + override def name = transformer.name + } + case class Recover(recoveryTransformer: RecoveryTransformer[Any, Any]) extends AstNode { + override def name = recoveryTransformer.name + } + case class GroupBy(f: Any ⇒ Any) extends AstNode { + override def name = "groupBy" + } + case class SplitWhen(p: Any ⇒ Boolean) extends AstNode { + override def name = "splitWhen" + } + case class Merge(other: Producer[Any]) extends AstNode { + override def name = "merge" + } + case class Zip(other: Producer[Any]) extends AstNode { + override def name = "zip" + } + case class Concat(next: Producer[Any]) extends AstNode { + override def name = "concat" + } + case class Tee(other: Consumer[Any]) extends AstNode { + override def name = "tee" + } trait ProducerNode[I] { - def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] + private[akka] def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] } - case class ExistingProducer[I](producer: Producer[I]) extends ProducerNode[I] { - def createProducer(settings: MaterializerSettings, context: ActorRefFactory) = producer + final case class ExistingProducer[I](producer: Producer[I]) extends ProducerNode[I] { + def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String) = producer } - case class IteratorProducerNode[I](iterator: Iterator[I]) extends ProducerNode[I] { - def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = + final case class IteratorProducerNode[I](iterator: Iterator[I]) extends ProducerNode[I] { + final def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] = if (iterator.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] - else new ActorProducer[I](context.actorOf(IteratorProducer.props(iterator, settings))) + else new 
ActorProducer[I](materializer.context.actorOf(IteratorProducer.props(iterator, materializer.settings), + name = s"$flowName-0-iterator")) } - case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] { - def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = + final case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] { + def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] = if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] - else new ActorProducer[I](context.actorOf(IterableProducer.props(iterable, settings))) + else new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(iterable, materializer.settings), + name = s"$flowName-0-iterable")) } - case class ThunkProducerNode[I](f: () ⇒ I) extends ProducerNode[I] { - def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = - new ActorProducer(context.actorOf(ActorProducer.props(settings, f))) + final case class ThunkProducerNode[I](f: () ⇒ I) extends ProducerNode[I] { + def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] = + new ActorProducer(materializer.context.actorOf(ActorProducer.props(materializer.settings, f), + name = s"$flowName-0-thunk")) } - case class FutureProducerNode[I](future: Future[I]) extends ProducerNode[I] { - def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = + final case class FutureProducerNode[I](future: Future[I]) extends ProducerNode[I] { + def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] = future.value match { case Some(Success(element)) ⇒ - new ActorProducer[I](context.actorOf(IterableProducer.props(List(element), settings))) + new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(List(element), materializer.settings), + name = 
s"$flowName-0-future")) case Some(Failure(t)) ⇒ new ErrorProducer(t).asInstanceOf[Producer[I]] case None ⇒ - new ActorProducer[I](context.actorOf(FutureProducer.props(future, settings))) + new ActorProducer[I](materializer.context.actorOf(FutureProducer.props(future, materializer.settings), + name = s"$flowName-0-future")) } } } @@ -80,37 +110,72 @@ private[akka] object ActorBasedFlowMaterializer { finally ctx.set(old) } + def currentActorContext(): ActorContext = + ActorBasedFlowMaterializer.ctx.get() match { + case c: ActorContext ⇒ c + case _ ⇒ + throw new IllegalStateException(s"Transformer [${getClass.getName}] is running without ActorContext") + } + } /** * INTERNAL API */ -private[akka] class ActorBasedFlowMaterializer(settings: MaterializerSettings, _context: ActorRefFactory) extends FlowMaterializer { +private[akka] class ActorBasedFlowMaterializer( + val settings: MaterializerSettings, + _context: ActorRefFactory, + namePrefix: String) + extends FlowMaterializer { import Ast._ import ActorBasedFlowMaterializer._ - private def context = ctx.get() match { + _context match { + case _: ActorSystem | _: ActorContext ⇒ // ok + case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined") + case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be an ActorSystem or ActorContext, " + + s"got [${_context.getClass.getName}]") + } + + def context = ctx.get() match { case null ⇒ _context case x ⇒ x } - @tailrec private def processorChain(topConsumer: Consumer[_], ops: immutable.Seq[AstNode]): Consumer[_] = { + private def system: ActorSystem = _context match { + case s: ExtendedActorSystem ⇒ s + case c: ActorContext ⇒ c.system + case _ ⇒ + throw new IllegalArgumentException(s"Unknown ActorRefFactory [${_context.getClass.getName}]") + } + + private def nextFlowNameCount(): Long = FlowNameCounter(system).counter.incrementAndGet() + + def withNamePrefix(name: String): FlowMaterializer = + new 
ActorBasedFlowMaterializer(settings, _context, name) + + private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" + + @tailrec private def processorChain(topConsumer: Consumer[_], ops: immutable.Seq[AstNode], + flowName: String, n: Int): Consumer[_] = { ops match { case op :: tail ⇒ - val opProcessor: Processor[Any, Any] = processorForNode(op) + val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n) opProcessor.produceTo(topConsumer.asInstanceOf[Consumer[Any]]) - processorChain(opProcessor, tail) + processorChain(opProcessor, tail, flowName, n - 1) case _ ⇒ topConsumer } } // Ops come in reverse order override def toProducer[I, O](producerNode: ProducerNode[I], ops: List[AstNode]): Producer[O] = { - if (ops.isEmpty) producerNode.createProducer(settings, context).asInstanceOf[Producer[O]] + val flowName = createFlowName() + if (ops.isEmpty) producerNode.createProducer(this, flowName).asInstanceOf[Producer[O]] else { - val opProcessor = processorForNode(ops.head) - val topConsumer = processorChain(opProcessor, ops.tail) - producerNode.createProducer(settings, context).produceTo(topConsumer.asInstanceOf[Consumer[I]]) + val opsSize = ops.size + val opProcessor = processorForNode(ops.head, flowName, opsSize) + val topConsumer = processorChain(opProcessor, ops.tail, flowName, opsSize - 1) + producerNode.createProducer(this, flowName).produceTo(topConsumer.asInstanceOf[Consumer[I]]) opProcessor.asInstanceOf[Producer[O]] } } @@ -126,33 +191,62 @@ private[akka] class ActorBasedFlowMaterializer(settings: MaterializerSettings, _ }) override def consume[I](producerNode: ProducerNode[I], ops: List[AstNode]): Unit = { - val consumer = ops match { - case Nil ⇒ - new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, blackholeTransform))) - case head :: tail ⇒ - val c = new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, head))) - processorChain(c, tail) - } - producerNode.createProducer(settings, 
context).produceTo(consumer.asInstanceOf[Consumer[I]]) + val flowName = createFlowName() + val consumer = consume(ops, flowName) + producerNode.createProducer(this, flowName).produceTo(consumer.asInstanceOf[Consumer[I]]) } - def processorForNode(op: AstNode): Processor[Any, Any] = new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op))) + private def consume[In, Out](ops: List[Ast.AstNode], flowName: String): Consumer[In] = { + val c = ops match { + case Nil ⇒ + new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, blackholeTransform), + name = s"$flowName-1-consume")) + case head :: tail ⇒ + val opsSize = ops.size + val c = new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, head), + name = s"$flowName-$opsSize-${head.name}")) + processorChain(c, tail, flowName, ops.size - 1) + } + c.asInstanceOf[Consumer[In]] + } + + def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = + new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op), + name = s"$flowName-$n-${op.name}")) override def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] = - processorChain(consumer, ops).asInstanceOf[Consumer[In]] + processorChain(consumer, ops, createFlowName(), ops.size).asInstanceOf[Consumer[In]] override def ductConsume[In](ops: List[Ast.AstNode]): Consumer[In] = - ductProduceTo(new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, blackholeTransform))), ops) + consume(ops, createFlowName) override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Consumer[In], Producer[Out]) = { + val flowName = createFlowName() if (ops.isEmpty) { - val identityProcessor: Processor[In, Out] = processorForNode(identityTransform).asInstanceOf[Processor[In, Out]] + val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] (identityProcessor, identityProcessor) } else { - val outProcessor = 
processorForNode(ops.head).asInstanceOf[Processor[In, Out]] - val topConsumer = processorChain(outProcessor, ops.tail).asInstanceOf[Processor[In, Out]] + val opsSize = ops.size + val outProcessor = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[In, Out]] + val topConsumer = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]] (topConsumer, outProcessor) } } } + +/** + * INTERNAL API + */ +private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider { + override def get(system: ActorSystem): FlowNameCounter = super.get(system) + override def lookup = FlowNameCounter + override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter +} + +/** + * INTERNAL API + */ +private[akka] class FlowNameCounter extends Extension { + val counter = new AtomicLong(0) +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 4eec112bc3..0b915b9176 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -128,11 +128,13 @@ private[akka] trait Builder[Out] { def map[U](f: Out ⇒ U): Thing[U] = transform(new Transformer[Out, U] { override def onNext(in: Out) = List(f(in)) + override def name = "map" }) def filter(p: Out ⇒ Boolean): Thing[Out] = transform(new Transformer[Out, Out] { override def onNext(in: Out) = if (p(in)) List(in) else Nil + override def name = "filter" }) def collect[U](pf: PartialFunction[Out, U]): Thing[U] = @@ -144,6 +146,7 @@ private[akka] trait Builder[Out] { transform(new Transformer[Out, Unit] { override def onNext(in: Out) = { c(in); Nil } override def onComplete() = ListOfUnit + override def name = "foreach" }) def fold[U](zero: U)(f: (U, Out) ⇒ U): Thing[U] = @@ -154,6 +157,7 @@ private[akka] trait Builder[Out] { class 
FoldTransformer[S](var state: S, f: (S, Out) ⇒ S) extends Transformer[Out, S] { override def onNext(in: Out): immutable.Seq[S] = { state = f(state, in); Nil } override def onComplete(): immutable.Seq[S] = List(state) + override def name = "fold" } def drop(n: Int): Thing[Out] = @@ -171,6 +175,7 @@ private[akka] trait Builder[Out] { } override def onNext(in: Out) = delegate.onNext(in) + override def name = "drop" }) def take(n: Int): Thing[Out] = @@ -189,6 +194,7 @@ private[akka] trait Builder[Out] { override def onNext(in: Out) = delegate.onNext(in) override def isComplete = delegate.isComplete + override def name = "take" }) def grouped(n: Int): Thing[immutable.Seq[Out]] = @@ -204,11 +210,13 @@ private[akka] trait Builder[Out] { Nil } override def onComplete() = if (buf.isEmpty) Nil else List(buf) + override def name = "grouped" }) def mapConcat[U](f: Out ⇒ immutable.Seq[U]): Thing[U] = transform(new Transformer[Out, U] { override def onNext(in: Out) = f(in) + override def name = "mapConcat" }) def transform[U](transformer: Transformer[Out, U]): Thing[U] = diff --git a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala index fed901e1da..089b306fd6 100644 --- a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala +++ b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala @@ -14,6 +14,7 @@ import akka.io.Tcp import akka.stream.impl.{ ActorPublisher, ExposedPublisher, ActorProcessor } import akka.stream.MaterializerSettings import akka.io.IO +import java.net.URLEncoder object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { @@ -73,19 +74,25 @@ private[akka] object StreamTcpManager { private[akka] class StreamTcpManager extends Actor { import StreamTcpManager._ + var nameCounter = 0 + def encName(prefix: String, address: InetSocketAddress) = { + nameCounter += 1 + s"$prefix-$nameCounter-${URLEncoder.encode(address.toString, "utf-8")}" + } + def receive: Receive = { case 
StreamTcp.Connect(remoteAddress, localAddress, options, timeout, settings) ⇒ val processorActor = context.actorOf(TcpStreamActor.outboundProps( Tcp.Connect(remoteAddress, localAddress, options, timeout, pullMode = true), requester = sender(), - settings)) + settings), name = encName("client", remoteAddress)) processorActor ! ExposedProcessor(new ActorProcessor[ByteString, ByteString](processorActor)) case StreamTcp.Bind(localAddress, backlog, options, settings) ⇒ val publisherActor = context.actorOf(TcpListenStreamActor.props( Tcp.Bind(context.system.deadLetters, localAddress, backlog, options, pullMode = true), requester = sender(), - settings)) + settings), name = encName("server", localAddress)) publisherActor ! ExposedPublisher(new ActorPublisher(publisherActor)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b0462c98cd..815167dda6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -327,6 +327,12 @@ trait Transformer[-T, +U] { * Invoked after normal completion or error. */ def cleanup(): Unit = () + + /** + * Name of this transformation step. Used as part of the actor name. + * Facilitates debugging and logging. + */ + def name: String = "transform" } /** @@ -340,5 +346,11 @@ trait RecoveryTransformer[-T, +U] extends Transformer[T, U] { * sequence of elements before the stream ends. */ def onError(cause: Throwable): immutable.Seq[U] + + /** + * Name of this transformation step. Used as part of the actor name. + * Facilitates debugging and logging. 
+ */ + override def name: String = "transformRecover" } diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala index 2473811481..69add49701 100644 --- a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -6,7 +6,6 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification } -import akka.stream.impl.ActorBasedFlowMaterializer import org.reactivestreams.api.Producer import akka.stream.scaladsl.Flow import akka.actor.ActorSystem diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index c3905f92e5..4a62c28973 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -3,7 +3,6 @@ */ package akka.stream -import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.testkit.StreamTestKit import akka.stream.scaladsl.Flow import org.reactivestreams.api.Producer diff --git a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala index 1ecd85fa01..ab6853e1db 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala @@ -25,11 +25,11 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { } "not blow up with high request counts" in { - val gen = new ActorBasedFlowMaterializer(MaterializerSettings( + val gen = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 1, maximumInputBufferSize = 1, initialFanOutBufferSize = 1, - maxFanOutBufferSize = 1), system) + maxFanOutBufferSize = 1)) val probe = StreamTestKit.consumerProbe[Int] 
Flow(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index a0e9e664bd..0bf58b6bcc 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -6,18 +6,17 @@ package akka.stream import scala.concurrent.duration._ import akka.stream.testkit._ import org.reactivestreams.api.Producer -import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowGroupBySpec extends AkkaSpec { - val materializer = new ActorBasedFlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2), system) + maxFanOutBufferSize = 2)) case class StreamPuppet(p: Producer[Int]) { val probe = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala index c4b8079a00..ae9f38c34f 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala @@ -17,7 +17,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest { initialFanOutBufferSize = 1, maxFanOutBufferSize = 16) - val gen = new ActorBasedFlowMaterializer(settings, system) + val gen = FlowMaterializer(settings) "A Map" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala index ad6980737e..9e3041b2e7 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala @@ -7,7 +7,6 @@ import 
scala.concurrent.duration._ import akka.stream.testkit.StreamTestKit import akka.stream.testkit.AkkaSpec import org.reactivestreams.api.Producer -import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow class FlowMergeSpec extends TwoStreamsSetup { diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index 021e53973d..6b81190436 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -7,17 +7,16 @@ import scala.concurrent.duration._ import akka.stream.testkit.StreamTestKit import akka.stream.testkit.AkkaSpec import org.reactivestreams.api.Producer -import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowSplitWhenSpec extends AkkaSpec { - val materializer = new ActorBasedFlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2), system) + maxFanOutBufferSize = 2)) case class StreamPuppet(p: Producer[Int]) { val probe = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala index ec73d131fc..6003113202 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala @@ -3,7 +3,6 @@ */ package akka.stream -import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.testkit.StreamTestKit import akka.stream.testkit.AkkaSpec import akka.stream.scaladsl.Flow diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala 
b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index eaedb46406..e97e972375 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -16,6 +16,7 @@ import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Transformer import akka.stream.testkit.AkkaSpec +import java.util.concurrent.atomic.AtomicInteger class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) extends IdentityProcessorVerification[Int](env, publisherShutdownTimeout) @@ -33,6 +34,7 @@ class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publishe } system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception"))) + val processorCounter = new AtomicInteger def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { val fanoutSize = maxBufferSize / 2 @@ -44,12 +46,12 @@ class IdentityProcessorTest(_system: ActorSystem, env: TestEnvironment, publishe maximumInputBufferSize = inputSize, initialFanOutBufferSize = fanoutSize, maxFanOutBufferSize = fanoutSize), - system) + system, system.name) val processor = materializer.processorForNode(Ast.Transform( new Transformer[Any, Any] { override def onNext(in: Any) = List(in) - })) + }), "IdentityProcessorTest-" + processorCounter.incrementAndGet(), 1) processor.asInstanceOf[Processor[Int, Int]] } diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala index 4d3a284446..b89b0a5ff6 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala @@ -5,13 +5,13 @@ package akka.stream import akka.stream.testkit.AkkaSpec import akka.stream.scaladsl.Flow -import akka.stream.impl.ActorBasedFlowMaterializer import 
akka.actor.ActorContext import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.ActorRef import scala.collection.immutable.TreeSet import scala.util.control.NonFatal +import akka.stream.impl.ActorBasedFlowMaterializer class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") { diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala new file mode 100644 index 0000000000..95fa0c7da3 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.AkkaSpec +import akka.stream.scaladsl.Flow +import akka.actor.ActorContext +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.actor.ActorRef +import scala.collection.immutable.TreeSet +import scala.util.control.NonFatal +import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.impl.FlowNameCounter +import akka.stream.scaladsl.Transformer + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") { + + def self = ActorBasedFlowMaterializer.currentActorContext().self + def flowCount = FlowNameCounter(system).counter.get + + "Processors of a flow" must { + + "have sensible default names for flow with one step" in { + val materializer = FlowMaterializer(MaterializerSettings()) + Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer) + expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map") + } + + "have sensible default names for flow with several steps" in { + val materializer = FlowMaterializer(MaterializerSettings()) + Flow(List(1)). + map(in ⇒ { testActor ! self; in }). + transform(new Transformer[Int, Int] { + override def onNext(in: Int) = { testActor ! self; List(in) } + }). 
+ filter(_ ⇒ { testActor ! self; true }). + consume(materializer) + + expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map") + expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-2-transform") + expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-3-filter") + } + + "use specified flow namePrefix in materializer" in { + val materializer = FlowMaterializer(MaterializerSettings(), namePrefix = Some("myflow")) + Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer) + expectMsgType[ActorRef].path.name should be(s"myflow-$flowCount-1-map") + } + + "use specified withNamePrefix in materializer" in { + val materializer = FlowMaterializer(MaterializerSettings()) + Flow(List(2)).map(in ⇒ { testActor ! self; in }).consume(materializer.withNamePrefix("myotherflow")) + expectMsgType[ActorRef].path.name should be(s"myotherflow-$flowCount-1-map") + } + + "create unique name for each materialization" in { + val materializer = FlowMaterializer(MaterializerSettings(), namePrefix = Some("myflow")) + val flow = Flow(List(1)).map(in ⇒ { testActor ! 
self; in }) + flow.consume(materializer) + val name1 = expectMsgType[ActorRef].path.name + flow.consume(materializer) + val name2 = expectMsgType[ActorRef].path.name + name1 should not be (name2) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala index 62dd171515..92504a7031 100644 --- a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala +++ b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala @@ -6,7 +6,6 @@ package akka.stream import scala.util.control.NoStackTrace import org.reactivestreams.api.{ Consumer, Producer } import org.reactivestreams.spi.{ Subscriber, Publisher, Subscription } -import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import akka.stream.scaladsl.Flow import akka.stream.testkit.OnSubscribe @@ -14,11 +13,11 @@ import akka.stream.testkit.OnError abstract class TwoStreamsSetup extends AkkaSpec { - val materializer = new ActorBasedFlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2), system) + maxFanOutBufferSize = 2)) case class TE(message: String) extends RuntimeException(message) with NoStackTrace diff --git a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala index b27e80b624..76b5981256 100644 --- a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -8,7 +8,7 @@ import akka.io.{ Tcp, IO } import java.nio.channels.ServerSocketChannel import java.net.InetSocketAddress import akka.stream.MaterializerSettings -import akka.stream.impl.{ ActorProcessor, ActorBasedFlowMaterializer } +import akka.stream.impl.ActorProcessor import 
akka.stream.scaladsl.Flow import akka.util.ByteString import akka.stream.testkit.{ StreamTestKit, AkkaSpec } @@ -17,6 +17,7 @@ import akka.actor.{ Props, ActorRef, Actor } import scala.collection.immutable.Queue import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ +import akka.stream.FlowMaterializer object TcpFlowSpec { @@ -112,7 +113,7 @@ class TcpFlowSpec extends AkkaSpec { initialFanOutBufferSize = 2, maxFanOutBufferSize = 2) - val materializer = new ActorBasedFlowMaterializer(settings, system) + val materializer = FlowMaterializer(settings) // FIXME: get it from TestUtil def temporaryServerAddress: InetSocketAddress = {