diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala
index 27e0e541ba..94d3ed88de 100644
--- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala
+++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala
@@ -72,7 +72,7 @@ private object PersistentPublisher {
 private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings)
   extends PublisherNode[Persistent] {
   def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Persistent] =
-    ActorPublisher[Persistent](materializer.context.actorOf(PersistentPublisher.props(processorId, publisherSettings, materializer.settings),
+    ActorPublisher[Persistent](materializer.actorOf(PersistentPublisher.props(processorId, publisherSettings, materializer.settings),
       name = s"$flowName-0-persistentPublisher"))
 }
diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala
index 07169abba6..ff57528570 100644
--- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala
+++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala
@@ -10,6 +10,10 @@ import akka.stream.impl.Ast
 import org.reactivestreams.{ Publisher, Subscriber }
 import scala.concurrent.duration._
 import akka.actor.Deploy
+import akka.actor.ExtendedActorSystem
+import akka.actor.ActorContext
+import akka.stream.impl.StreamSupervisor
+import akka.stream.impl.FlowNameCounter
 
 object FlowMaterializer {
 
@@ -24,8 +28,21 @@ object FlowMaterializer {
    * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
    * `namePrefix-flowNumber-flowStepNumber-stepName`.
    */
-  def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer =
-    new ActorBasedFlowMaterializer(settings, context, namePrefix.getOrElse("flow"))
+  def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
+    val system = context match {
+      case s: ExtendedActorSystem ⇒ s
+      case c: ActorContext ⇒ c.system
+      case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined")
+      case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be an ActorSystem or ActorContext, " +
+        s"got [${context.getClass.getName}]")
+    }
+
+    new ActorBasedFlowMaterializer(
+      settings,
+      context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)),
+      FlowNameCounter(system).counter,
+      namePrefix.getOrElse("flow"))
+  }
 
   /**
    * Java API: Creates a FlowMaterializer which will execute every step of a transformation
diff --git a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala
index 232c5c5b7d..aa6a5b07c1 100644
--- a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala
+++ b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala
@@ -13,24 +13,6 @@ import scala.concurrent.duration.FiniteDuration
  */
 object Implicits {
 
-  /**
-   * Implicit enrichment for stream logging.
-   *
-   * @see [[Log]]
-   */
-  implicit class LogFlowDsl[T](val flow: Flow[T]) extends AnyVal {
-    def log(): Flow[T] = flow.transform(Log())
-  }
-
-  /**
-   * Implicit enrichment for stream logging.
-   *
-   * @see [[Log]]
-   */
-  implicit class LogDuctDsl[In, Out](val duct: Duct[In, Out]) extends AnyVal {
-    def log(): Duct[In, Out] = duct.transform(Log())
-  }
-
   /**
    * Provides time measurement utilities on Stream elements.
    *
diff --git a/akka-stream/src/main/scala/akka/stream/extra/Log.scala b/akka-stream/src/main/scala/akka/stream/extra/Log.scala
deleted file mode 100644
index d03a96895d..0000000000
--- a/akka-stream/src/main/scala/akka/stream/extra/Log.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Copyright (C) 2014 Typesafe Inc.
- */
-package akka.stream.extra
-
-import scala.collection.immutable
-import akka.stream.Transformer
-import akka.stream.impl.ActorBasedFlowMaterializer
-import akka.actor.ActorContext
-import akka.event.LoggingAdapter
-import akka.event.Logging
-
-/**
- * Scala API: Mix in TransformerLogging into your [[akka.stream.Transformer]]
- * to obtain a reference to a logger, which is available under the name [[#log]].
- */
-trait TransformerLogging { this: Transformer[_, _] ⇒
-
-  private def context = ActorBasedFlowMaterializer.currentActorContext()
-
-  private var _log: LoggingAdapter = _
-
-  def log: LoggingAdapter = {
-    // only used in Actor, i.e. thread safe
-    if (_log eq null)
-      _log = Logging(context.system, context.self)
-    _log
-  }
-}
-
-object Log {
-  def apply[T](): Log[T] = new Log[T]
-  def apply[T](name: String): Log[T] = new Log[T](name)
-}
-
-/**
- * Logs the elements, error and completion of a a flow.
- *
- * By default it logs `onNext` and `onComplete` at info log
- * level, and `onError` at error log level. Subclass may customize
- * the logging by overriding [[#logOnNext]], [[#logOnComplete]] and
- * [[#logOnError]].
- *
- * The `logSource` of the [[akka.event.Logging.LogEvent]] is the path of
- * the actor processing this step in the flow. It contains the
- * flow name and the [[#name]] of this `Transformer`. The
- * name can be customized with the [[#name]] constructor parameter.
- *
- * The [[akka.event.LoggingAdapter]] is accessible
- * under the name `log`.
- *
- * Usage:
- * {{{
- * Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") {
- *   override def logOnNext(i: Int): Unit =
- *     log.debug("Got element {}", i)
- * }).
- *   consume(materializer)
- * }}}
- *
- * Or with the implicit enrichment:
- * {{{
- * import akka.stream.extra.Implicits._
- * Flow(List(1, 2, 3)).log().consume(materializer)
- * }}}
- *
- */
-class Log[T](override val name: String = "log") extends Transformer[T, T] with TransformerLogging {
-
-  final def onNext(element: T): immutable.Seq[T] = {
-    logOnNext(element)
-    List(element)
-  }
-
-  def logOnNext(element: T): Unit = {
-    log.info("OnNext: [{}]", element)
-  }
-
-  final override def onTermination(e: Option[Throwable]): immutable.Seq[T] = {
-    logOnComplete()
-    Nil
-  }
-
-  def logOnComplete(): Unit = {
-    log.info("OnComplete")
-  }
-
-  final override def onError(cause: Throwable): Unit = {
-    logOnError(cause)
-    throw cause
-  }
-
-  def logOnError(cause: Throwable): Unit = {
-    log.error(cause, "OnError")
-  }
-
-  final override def isComplete: Boolean = false
-
-}
\ No newline at end of file
diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala
index 91b3f5145e..589ad13720 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala
@@ -21,6 +21,16 @@ import akka.actor.ActorSystem
 import akka.actor.Extension
 import scala.concurrent.duration.FiniteDuration
 import akka.stream.TimerTransformer
+import akka.actor.Props
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.pattern.ask
+import akka.util.Timeout
+import scala.concurrent.duration._
+import scala.concurrent.Await
+import akka.actor.LocalActorRef
+import akka.actor.RepointableActorRef
+import akka.actor.ActorCell
 
 /**
  * INTERNAL API
@@ -84,36 +94,36 @@ private[akka] object Ast {
   final case class IteratorPublisherNode[I](iterator: Iterator[I]) extends PublisherNode[I] {
     final def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
       if (iterator.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
-      else ActorPublisher[I](materializer.context.actorOf(IteratorPublisher.props(iterator, materializer.settings),
+      else ActorPublisher[I](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
         name = s"$flowName-0-iterator"))
   }
 
   final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] {
     def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
       if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
-      else ActorPublisher[I](materializer.context.actorOf(IterablePublisher.props(iterable, materializer.settings),
+      else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
         name = s"$flowName-0-iterable"), Some(iterable))
   }
 
   final case class ThunkPublisherNode[I](f: () ⇒ I) extends PublisherNode[I] {
     def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
-      ActorPublisher[I](materializer.context.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
+      ActorPublisher[I](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
        name = s"$flowName-0-thunk"))
   }
 
   final case class FuturePublisherNode[I](future: Future[I]) extends PublisherNode[I] {
     def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
       future.value match {
         case Some(Success(element)) ⇒
-          ActorPublisher[I](materializer.context.actorOf(IterablePublisher.props(List(element), materializer.settings),
+          ActorPublisher[I](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings),
            name = s"$flowName-0-future"), Some(future))
         case Some(Failure(t)) ⇒
           ErrorPublisher(t).asInstanceOf[Publisher[I]]
         case None ⇒
-          ActorPublisher[I](materializer.context.actorOf(FuturePublisher.props(future, materializer.settings),
+          ActorPublisher[I](materializer.actorOf(FuturePublisher.props(future, materializer.settings),
            name = s"$flowName-0-future"), Some(future))
       }
   }
 
   final case class TickPublisherNode[I](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ I) extends PublisherNode[I] {
     def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
-      ActorPublisher[I](materializer.context.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
+      ActorPublisher[I](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
        name = s"$flowName-0-tick"))
   }
 }
@@ -121,60 +131,18 @@ private[akka] object Ast {
 /**
  * INTERNAL API
  */
-private[akka] object ActorBasedFlowMaterializer {
-
-  val ctx = new ThreadLocal[ActorRefFactory]
-
-  def withCtx[T](arf: ActorRefFactory)(block: ⇒ T): T = {
-    val old = ctx.get()
-    ctx.set(arf)
-    try block
-    finally ctx.set(old)
-  }
-
-  def currentActorContext(): ActorContext =
-    ActorBasedFlowMaterializer.ctx.get() match {
-      case c: ActorContext ⇒ c
-      case _ ⇒
-        throw new IllegalStateException(s"Transformer [${getClass.getName}] is running without ActorContext")
-    }
-
-}
-
-/**
- * INTERNAL API
- */
-private[akka] class ActorBasedFlowMaterializer(
-  settings: MaterializerSettings,
-  _context: ActorRefFactory,
+private[akka] case class ActorBasedFlowMaterializer(
+  override val settings: MaterializerSettings,
+  supervisor: ActorRef,
+  flowNameCounter: AtomicLong,
   namePrefix: String)
   extends FlowMaterializer(settings) {
 
   import Ast._
   import ActorBasedFlowMaterializer._
 
-  _context match {
-    case _: ActorSystem | _: ActorContext ⇒ // ok
-    case null ⇒ throw new IllegalArgumentException("ActorRefFactory context must be defined")
-    case _ ⇒ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " +
-      "got [${_contex.getClass.getName}]")
-  }
+  def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
 
-  def context = ctx.get() match {
-    case null ⇒ _context
-    case x ⇒ x
-  }
-
-  def withNamePrefix(name: String): FlowMaterializer =
-    new ActorBasedFlowMaterializer(settings, _context, name)
-
-  private def system: ActorSystem = _context match {
-    case s: ExtendedActorSystem ⇒ s
-    case c: ActorContext ⇒ c.system
-    case _ ⇒
-      throw new IllegalArgumentException(s"Unknown ActorRefFactory [${_context.getClass.getName}")
-  }
-
-  private def nextFlowNameCount(): Long = FlowNameCounter(system).counter.incrementAndGet()
+  private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
 
   private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
 
@@ -207,9 +175,26 @@ private[akka] class ActorBasedFlowMaterializer(
         override def onNext(element: Any) = List(element)
       })
 
-  def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] =
-    ActorProcessor(context.actorOf(ActorProcessor.props(settings, op),
-      name = s"$flowName-$n-${op.name}"))
+  def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
+    val impl = actorOf(ActorProcessor.props(settings, op), s"$flowName-$n-${op.name}")
+    ActorProcessor(impl)
+
+  }
+
+  def actorOf(props: Props, name: String): ActorRef = supervisor match {
+    case ref: LocalActorRef ⇒
+      ref.underlying.attachChild(props, name, systemService = false)
+    case ref: RepointableActorRef ⇒
+      if (ref.isStarted)
+        ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false)
+      else {
+        implicit val timeout = ref.system.settings.CreationTimeout
+        val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef]
+        Await.result(f, timeout.duration)
+      }
+    case _ ⇒
+      throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
+  }
 
   override def ductProduceTo[In, Out](subscriber: Subscriber[Out], ops: List[Ast.AstNode]): Subscriber[In] =
     processorChain(subscriber, ops, createFlowName(), ops.size).asInstanceOf[Subscriber[In]]
@@ -243,4 +228,23 @@ private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with E
  */
 private[akka] class FlowNameCounter extends Extension {
   val counter = new AtomicLong(0)
+}
+
+/**
+ * INTERNAL API
+ */
+private[akka] object StreamSupervisor {
+  def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings))
+
+  case class Materialize(props: Props, name: String)
+}
+
+private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor {
+  import StreamSupervisor._
+
+  def receive = {
+    case Materialize(props, name) ⇒
+      val impl = context.actorOf(props, name)
+      sender() ! impl
+  }
 }
\ No newline at end of file
diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala
index 9f7ef4414a..52f9f5a4f1 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala
@@ -194,7 +194,7 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
     if (demand > 0) {
       try {
         demand -= 1
-        pushToDownstream(withCtx(context)(f()))
+        pushToDownstream(f())
         if (demand > 0) self ! Generate
       } catch {
         case Stop ⇒ { completeDownstream(); shutdownReason = None }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala
index 3d6e9b6078..664334738d 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala
@@ -95,7 +95,7 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting
     if (subscribers(subscriber))
       subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice"))
     else {
-      val iterator = withCtx(context)(iterable.iterator)
+      val iterator = iterable.iterator
       val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber,
         settings.maximumInputBufferSize).withDispatcher(context.props.dispatcher)))
       val subscription = new BasicActorSubscription(worker)
@@ -155,7 +155,7 @@ private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber:
   @tailrec def doPush(n: Int): Unit =
     if (demand > 0) {
       demand -= 1
-      val hasNext = withCtx(context) {
+      val hasNext = {
         subscriber.onNext(iterator.next())
         iterator.hasNext
       }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala
index 79a15831b7..5ca66bd893 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala
@@ -72,21 +72,19 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
   def active: Receive = {
     case Tick ⇒
-      ActorBasedFlowMaterializer.withCtx(context) {
-        try {
-          val tickElement = tick()
-          demand foreach {
-            case (subscriber, d) ⇒
-              if (d > 0) {
-                demand(subscriber) = d - 1
-                subscriber.onNext(tickElement)
-              }
-          }
-        } catch {
-          case NonFatal(e) ⇒
-            // tick closure throwed => onError downstream
-            demand foreach { case (subscriber, _) ⇒ subscriber.onError(e) }
+      try {
+        val tickElement = tick()
+        demand foreach {
+          case (subscriber, d) ⇒
+            if (d > 0) {
+              demand(subscriber) = d - 1
+              subscriber.onNext(tickElement)
+            }
         }
+      } catch {
+        case NonFatal(e) ⇒
+          // tick closure threw => onError downstream
+          demand foreach { case (subscriber, _) ⇒ subscriber.onError(e) }
       }
 
     case RequestMore(elements, subscriber) ⇒
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala
index a7119b0f48..81c24b131f 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala
@@ -158,7 +158,7 @@ private[akka] trait Pump {
   // Generate upstream requestMore for every Nth consumed input element
   final def pump(): Unit = {
     try while (transferState.isExecutable) {
-      ActorBasedFlowMaterializer.withCtx(pumpContext)(currentAction())
+      currentAction()
     } catch { case NonFatal(e) ⇒ pumpFailed(e) }
     if (isPumpFinished) pumpFinished()
diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala
deleted file mode 100644
index c8d233f388..0000000000
--- a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Copyright (C) 2014 Typesafe Inc.
- */
-package akka.stream
-
-import akka.stream.testkit.AkkaSpec
-import akka.stream.scaladsl.Flow
-import akka.actor.ActorContext
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import akka.actor.ActorRef
-import scala.collection.immutable.TreeSet
-import scala.util.control.NonFatal
-import akka.stream.impl.ActorBasedFlowMaterializer
-
-class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") {
-
-  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
-
-  def self = ActorBasedFlowMaterializer.currentActorContext().self
-
-  "An ActorBasedFlowMaterializer" must {
-
-    "generate the right level of descendants" in {
-      val f = Flow(() ⇒ {
-        testActor ! self
-        Flow(List(1)).map(x ⇒ { testActor ! self; x }).toPublisher(materializer)
-      }).take(3).foreach(x ⇒ {
-        testActor ! self
-        Flow(x).foreach(_ ⇒ testActor ! self, materializer)
-      }, materializer)
-      Await.result(f, 3.seconds)
-      val refs = receiveWhile(idle = 250.millis) {
-        case r: ActorRef ⇒ r
-      }
-      try {
-        refs.toSet.size should be(8)
-        refs.distinct.map(_.path.elements.size).groupBy(x ⇒ x).mapValues(x ⇒ x.size) should be(Map(2 -> 2, 3 -> 6))
-      } catch {
-        case NonFatal(e) ⇒
-          println(refs.map(_.toString).to[TreeSet].mkString("\n"))
-          throw e
-      }
-    }
-
-  }
-
-}
\ No newline at end of file
diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala
deleted file mode 100644
index b7add3a934..0000000000
--- a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Copyright (C) 2014 Typesafe Inc.
- */
-package akka.stream
-
-import akka.stream.testkit.AkkaSpec
-import akka.stream.scaladsl.Flow
-import akka.actor.ActorContext
-import scala.concurrent.Await
-import scala.concurrent.duration._
-import akka.actor.ActorRef
-import scala.collection.immutable.TreeSet
-import scala.util.control.NonFatal
-import akka.stream.impl.ActorBasedFlowMaterializer
-import akka.stream.impl.FlowNameCounter
-
-@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") {
-
-  def self = ActorBasedFlowMaterializer.currentActorContext().self
-  def flowCount = FlowNameCounter(system).counter.get
-
-  "Processors of a flow" must {
-
-    "have sensible default names for flow with one step" in {
-      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
-      Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer)
-      expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map")
-    }
-
-    "have sensible default names for flow with several steps" in {
-      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
-      Flow(List(1)).
-        map(in ⇒ { testActor ! self; in }).
-        transform(new Transformer[Int, Int] {
-          override def onNext(in: Int) = { testActor ! self; List(in) }
-        }).
-        filter(_ ⇒ { testActor ! self; true }).
-        consume(materializer)
-
-      expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map")
-      expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-2-transform")
-      expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-3-filter")
-    }
-
-    "use specified flow namePrefix in materializer" in {
-      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"),
-        namePrefix = Some("myflow"))
-      Flow(List(1)).map(in ⇒ { testActor ! self; in }).consume(materializer)
-      expectMsgType[ActorRef].path.name should be(s"myflow-$flowCount-1-map")
-    }
-
-    "use specified withNamePrefix in materializer" in {
-      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
-      Flow(List(2)).map(in ⇒ { testActor ! self; in }).consume(materializer.withNamePrefix("myotherflow"))
-      expectMsgType[ActorRef].path.name should be(s"myotherflow-$flowCount-1-map")
-    }
-
-    "create unique name for each materialization" in {
-      val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"),
-        namePrefix = Some("myflow"))
-      val flow = Flow(List(1)).map(in ⇒ { testActor ! self; in })
-      flow.consume(materializer)
-      val name1 = expectMsgType[ActorRef].path.name
-      flow.consume(materializer)
-      val name2 = expectMsgType[ActorRef].path.name
-      name1 should not be (name2)
-    }
-
-  }
-
-}
\ No newline at end of file
diff --git a/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala b/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala
deleted file mode 100644
index a071af5a41..0000000000
--- a/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Copyright (C) 2014 Typesafe Inc.
- */
-package akka.stream.extra
-
-import scala.concurrent.duration._
-import scala.util.control.NoStackTrace
-import akka.stream.FlowMaterializer
-import akka.stream.MaterializerSettings
-import akka.stream.impl.ActorBasedFlowMaterializer
-import akka.stream.scaladsl.Flow
-import akka.stream.testkit.AkkaSpec
-import akka.testkit.EventFilter
-import akka.stream.impl.FlowNameCounter
-
-object LogSpec {
-  class TestException extends IllegalArgumentException("simulated err") with NoStackTrace
-}
-
-@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
-class LogSpec extends AkkaSpec("akka.loglevel=INFO") {
-  import LogSpec._
-
-  val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
-
-  def flowCount = FlowNameCounter(system).counter.get
-  def nextFlowCount = flowCount + 1
-
-  "Log Transformer" must {
-
-    "log onNext elements" in {
-      EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", pattern = """OnNext: \[[1|2|3]\]""", occurrences = 3) intercept {
-        Flow(List(1, 2, 3)).
-          transform(Log()).
-          consume(materializer)
-      }
-    }
-
-    "log onComplete" in {
-      EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", message = "OnComplete", occurrences = 1) intercept {
-        Flow(Nil).
-          transform(Log()).
-          consume(materializer)
-      }
-    }
-
-    "log onError exception" in {
-      // FIXME the "failure during processing" occurrence comes from ActorProcessImpl#fail, and will probably be removed
-      EventFilter[TestException](source = s"akka://LogSpec/user/flow-$nextFlowCount-2-mylog",
-        pattern = "[OnError: simulated err|failure during processing]", occurrences = 2) intercept {
-        Flow(List(1, 2, 3)).
-          map(i ⇒ if (i == 2) throw new TestException else i).
-          transform(Log(name = "mylog")).
-          consume(materializer)
-      }
-    }
-
-    "have type inference" in {
-      val f1: Flow[Int] = Flow(List(1, 2, 3)).transform(Log[Int])
-      val f2: Flow[Int] = Flow(List(1, 2, 3)).transform(Log())
-      val f3: Flow[String] = Flow(List(1, 2, 3)).transform(Log[Int]).map((i: Int) ⇒ i.toString).transform(Log[String])
-      val f4: Flow[String] = Flow(List(1, 2, 3)).transform(Log()).map((i: Int) ⇒ i.toString).transform(Log())
-      val f5: Flow[String] =
-        Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") {
-          override def logOnNext(i: Int): Unit =
-            log.debug("Got element {}", i)
-        }).map((i: Int) ⇒ i.toString)
-    }
-
-    "have nice DSL" in {
-      import akka.stream.extra.Implicits._
-      val f: Flow[String] = Flow(List(1, 2, 3)).log().map((i: Int) ⇒ i.toString).log()
-    }
-
-  }
-
-}
\ No newline at end of file
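
A minimal usage sketch of the materialization path introduced by this patch (not taken from the patch itself): FlowMaterializer.apply resolves the ActorSystem behind the implicit ActorRefFactory, starts a StreamSupervisor, and ActorBasedFlowMaterializer.actorOf attaches each processor actor as a child of that supervisor, named namePrefix-flowCount-step-opName. The system name, the "demo-flow" prefix, and relying on default MaterializerSettings() are illustrative assumptions.

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

object SupervisedMaterializationSketch extends App {
  // The implicit ActorSystem acts as the ActorRefFactory; apply() creates the
  // StreamSupervisor under it and reads the per-system FlowNameCounter extension.
  implicit val system = ActorSystem("demo") // assumed system name
  val materializer = FlowMaterializer(MaterializerSettings(), namePrefix = Some("demo-flow"))

  // The map stage runs in an actor created via the supervisor, e.g. demo-flow-1-1-map.
  Flow(List(1, 2, 3)).map(_ * 2).foreach(i ⇒ println(i), materializer)
}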