diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala index a6c6aa664b..4dc47c4b37 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala @@ -24,9 +24,9 @@ import akka.stream.scaladsl.Flow object PersistentFlow { /** - * Starts a new [[Persistent]] message flow from the given processor, + * Starts a new [[akka.persistence.Persistent]] message flow from the given processor, * identified by `processorId`. Elements are pulled from the processor's - * journal (using a [[View]]) in accordance with the demand coming from + * journal (using a [[akka.persistence.View]]) in accordance with the demand coming from * the downstream transformation steps. * * Elements pulled from the processor's journal are buffered in memory so that @@ -36,9 +36,9 @@ object PersistentFlow { fromProcessor(processorId, PersistentPublisherSettings()) /** - * Starts a new [[Persistent]] message flow from the given processor, + * Starts a new [[akka.persistence.Persistent]] message flow from the given processor, * identified by `processorId`. Elements are pulled from the processor's - * journal (using a [[View]]) in accordance with the demand coming from + * journal (using a [[akka.persistence.View]]) in accordance with the demand coming from * the downstream transformation steps. * * Elements pulled from the processor's journal are buffered in memory so that @@ -46,14 +46,14 @@ object PersistentFlow { * Reads from the journal are done in (coarse-grained) batches of configurable * size (which correspond to the configurable maximum buffer size). * - * @see [[PersistentPublisherSettings]] + * @see [[akka.persistence.PersistentPublisherSettings]] */ def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] = FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil) } /** - * Configuration object for a [[Persistent]] stream publisher. + * Configuration object for a [[akka.persistence.Persistent]] stream publisher. * * @param fromSequenceNr Sequence number where the published stream shall start (inclusive). * Default is `1L`. diff --git a/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala new file mode 100644 index 0000000000..555dd96a28 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/extra/Implicits.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.extra + +import akka.stream.scaladsl.Duct +import akka.stream.scaladsl.Flow + +/** + * Additional [[Flow]] and [[Duct]] operators. + */ +object Implicits { + + /** + * Implicit enrichment for stream logging. + * + * @see [[Log]] + */ + implicit class LogFlowDsl[T](val flow: Flow[T]) extends AnyVal { + def log(): Flow[T] = flow.transform(Log()) + } + + /** + * Implicit enrichment for stream logging. + * + * @see [[Log]] + */ + implicit class LogDuctDsl[In, Out](val duct: Duct[In, Out]) extends AnyVal { + def log(): Duct[In, Out] = duct.transform(Log()) + } + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/extra/Log.scala b/akka-stream/src/main/scala/akka/stream/extra/Log.scala new file mode 100644 index 0000000000..ec660fb664 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/extra/Log.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.extra + +import scala.collection.immutable +import akka.stream.scaladsl.Transformer +import akka.stream.impl.ActorBasedFlowMaterializer +import akka.actor.ActorContext +import akka.event.LoggingAdapter +import akka.event.Logging + +/** + * Scala API: Mix in TransformerLogging into your [[akka.stream.scaladsl.Transformer]] + * to obtain a reference to a logger, which is available under the name [[#log]]. + */ +trait TransformerLogging { this: Transformer[_, _] ⇒ + + private def context = ActorBasedFlowMaterializer.currentActorContext() + + private var _log: LoggingAdapter = _ + + def log: LoggingAdapter = { + // only used in Actor, i.e. thread safe + if (_log eq null) + _log = Logging(context.system, context.self) + _log + } +} + +object Log { + def apply[T](): Log[T] = new Log[T] + def apply[T](name: String): Log[T] = new Log[T](name) +} + +/** + * Logs the elements, error and completion of a a flow. + * + * By default it logs `onNext` and `onComplete` at info log + * level, and `onError` at error log level. Subclass may customize + * the logging by overriding [[#logOnNext]], [[#logOnComplete]] and + * [[#logOnError]]. + * + * The `logSource` of the [[akka.event.Logging.LogEvent]] is the path of + * the actor processing this step in the flow. It contains the + * flow name and the [[#name]] of this `Transformer`. The + * name can be customized with the [[#name]] constructor parameter. + * + * The [[akka.event.LoggingAdapter]] is accessible + * under the name `log`. + * + * Usage: + * {{{ + * Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") { + * override def logOnNext(i: Int): Unit = + * log.debug("Got element {}", i) + * }). + * consume(materializer) + * }}} + * + * Or with the implicit enrichment: + * {{{ + * import akka.stream.extra.Implicits._ + * Flow(List(1, 2, 3)).log().consume(materializer) + * }}} + * + */ +class Log[T](override val name: String = "log") extends Transformer[T, T] with TransformerLogging { + + final def onNext(element: T): immutable.Seq[T] = { + logOnNext(element) + List(element) + } + + def logOnNext(element: T): Unit = { + log.info("OnNext: [{}]", element) + } + + final override def onComplete(): immutable.Seq[T] = { + logOnComplete() + Nil + } + + def logOnComplete(): Unit = { + log.info("OnComplete") + } + + final override def onError(cause: Throwable): Unit = logOnError(cause) + + def logOnError(cause: Throwable): Unit = { + log.error(cause, "OnError") + } + + final override def isComplete: Boolean = false + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index c94d09d2c3..c49014a328 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -142,6 +142,9 @@ private[akka] class ActorBasedFlowMaterializer( case x ⇒ x } + def withNamePrefix(name: String): FlowMaterializer = + new ActorBasedFlowMaterializer(settings, _context, name) + private def system: ActorSystem = _context match { case s: ExtendedActorSystem ⇒ s case c: ActorContext ⇒ c.system @@ -151,9 +154,6 @@ private[akka] class ActorBasedFlowMaterializer( private def nextFlowNameCount(): Long = FlowNameCounter(system).counter.incrementAndGet() - def withNamePrefix(name: String): FlowMaterializer = - new ActorBasedFlowMaterializer(settings, _context, name) - private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" @tailrec private def processorChain(topConsumer: Consumer[_], ops: immutable.Seq[AstNode], diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala index 466568a8ca..cbbb12c418 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala @@ -85,9 +85,9 @@ private[akka] abstract class AbstractActorConsumer(val settings: MaterializerSet requestMore() context.become(active) case OnError(cause) ⇒ - onError(cause) + withCtx(context)(onError(cause)) case OnComplete ⇒ - onComplete() + withCtx(context)(onComplete()) } private var subscription: Option[Subscription] = None @@ -142,6 +142,7 @@ private[akka] class TransformActorConsumer(_settings: MaterializerSettings, tran override def onError(cause: Throwable): Unit = { log.error(cause, "terminating due to onError") + transformer.onError(cause) shutdown() } @@ -167,7 +168,7 @@ private[akka] class RecoverActorConsumer(_settings: MaterializerSettings, recove extends TransformActorConsumer(_settings, recoveryTransformer) { override def onError(cause: Throwable): Unit = { - recoveryTransformer.onError(cause) + recoveryTransformer.onErrorRecover(cause) onComplete() } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 0b915b9176..81ab08f56f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -35,7 +35,7 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: transformRecover(new RecoveryTransformer[O, Unit] { var done = false override def onNext(in: O) = { p success in; done = true; Nil } - override def onError(e: Throwable) = { p failure e; Nil } + override def onErrorRecover(e: Throwable) = { p failure e; Nil } override def isComplete = done override def onComplete() = { p.tryFailure(new NoSuchElementException("empty stream")); Nil } }).consume(materializer) @@ -48,7 +48,7 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: transformRecover(new RecoveryTransformer[O, Unit] { var ok = true override def onNext(in: O) = Nil - override def onError(e: Throwable) = { + override def onErrorRecover(e: Throwable) = { callback(Failure(e)) ok = false Nil @@ -82,7 +82,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ transformRecover(new RecoveryTransformer[Out, Unit] { var ok = true override def onNext(in: Out) = Nil - override def onError(e: Throwable) = { + override def onErrorRecover(e: Throwable) = { callback(Failure(e)) ok = false Nil diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 59f8b270b7..86526c8120 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -53,6 +53,7 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran s"transformer=$transformer)" override def softShutdown(): Unit = { + shutdownReason foreach transformer.onError transformer.cleanup() hasCleanupRun = true // for postStop super.softShutdown() @@ -77,7 +78,7 @@ private[akka] class RecoverProcessorImpl(_settings: MaterializerSettings, recove if (emits.isEmpty && error.isDefined && inputDrained) { val e = error.get error = None - emits = recoveryTransformer.onError(e) + emits = recoveryTransformer.onErrorRecover(e) } else if (emits.isEmpty) { isComplete = recoveryTransformer.isComplete if (depleted || isComplete) { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 815167dda6..37fe2f98f9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -160,6 +160,8 @@ trait Flow[+T] { * the [[Transformer#onComplete]] function is invoked to produce a (possibly empty) * sequence of elements in response to the end-of-stream event. * + * [[Transformer#onError]] is called when failure is signaled from upstream. + * * After normal completion or error the [[Transformer#cleanup]] function is called. * * It is possible to keep state in the concrete [[Transformer]] instance with @@ -172,11 +174,10 @@ trait Flow[+T] { /** * This transformation stage works exactly like [[#transform]] with the * change that failure signaled from upstream will invoke - * [[RecoveryTransformer#onError]], which can emit an additional sequence of + * [[RecoveryTransformer#onErrorRecover]], which can emit an additional sequence of * elements before the stream ends. * - * After normal completion or error the [[RecoveryTransformer#cleanup]] function - * is called. + * [[Transformer#onError]] is not called when failure is signaled from upstream. */ def transformRecover[U](recoveryTransformer: RecoveryTransformer[T, U]): Flow[U] @@ -323,6 +324,11 @@ trait Transformer[-T, +U] { */ def onComplete(): immutable.Seq[U] = Nil + /** + * Invoked when failure is signaled from upstream. + */ + def onError(cause: Throwable): Unit = () + /** * Invoked after normal completion or error. */ @@ -345,7 +351,7 @@ trait RecoveryTransformer[-T, +U] extends Transformer[T, U] { * Invoked when failure is signaled from upstream to emit an additional * sequence of elements before the stream ends. */ - def onError(cause: Throwable): immutable.Seq[U] + def onErrorRecover(cause: Throwable): immutable.Seq[U] /** * Name of this transformation step. Used as part of the actor name. diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index c3c29f339f..5be4a1d556 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -19,8 +19,8 @@ object FlowTransformRecoverSpec { abstract class TryRecoveryTransformer[T, U] extends RecoveryTransformer[T, U] { def onNext(element: Try[T]): immutable.Seq[U] - def onNext(element: T): immutable.Seq[U] = onNext(Success(element)) - def onError(cause: Throwable): immutable.Seq[U] = onNext(Failure(cause)) + override def onNext(element: T): immutable.Seq[U] = onNext(Success(element)) + override def onErrorRecover(cause: Throwable): immutable.Seq[U] = onNext(Failure(cause)) } } @@ -44,7 +44,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { tot += elem List(tot) } - override def onError(e: Throwable) = List(-1) + override def onErrorRecover(e: Throwable) = List(-1) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -68,7 +68,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { tot += elem Vector.fill(elem)(tot) } - override def onError(e: Throwable) = List(-1) + override def onErrorRecover(e: Throwable) = List(-1) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -95,7 +95,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { tot += elem if (elem % 2 == 0) Nil else List(tot) } - override def onError(e: Throwable) = List(-1) + override def onErrorRecover(e: Throwable) = List(-1) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -126,7 +126,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { tot += length List(tot) } - override def onError(e: Throwable) = List(-1) + override def onErrorRecover(e: Throwable) = List(-1) }). toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] @@ -228,7 +228,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { if (elem == 2) throw new IllegalArgumentException("two not allowed") else List(elem, elem) } - override def onError(e: Throwable) = List(-1) + override def onErrorRecover(e: Throwable) = List(-1) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -254,7 +254,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { }. transformRecover(new RecoveryTransformer[Int, Int] { override def onNext(elem: Int) = List(elem) - override def onError(e: Throwable) = List(-1, -2, -3) + override def onErrorRecover(e: Throwable) = List(-1, -2, -3) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -296,7 +296,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { transformRecover(new RecoveryTransformer[Int, Throwable] { var s = "" override def onNext(element: Int) = List(new IllegalStateException) - override def onError(ex: Throwable) = { + override def onErrorRecover(ex: Throwable) = { s += ex.getMessage List(ex) } @@ -319,7 +319,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { val p2 = Flow(p). transformRecover(new RecoveryTransformer[Int, Int] { override def onNext(in: Int) = List(in) - override def onError(e: Throwable) = throw e + override def onErrorRecover(e: Throwable) = throw e }). toProducer(materializer) val proc = p.expectSubscription() @@ -338,7 +338,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { val p2 = Flow(p). transformRecover(new RecoveryTransformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) - override def onError(e: Throwable) = List(-1) + override def onErrorRecover(e: Throwable) = List(-1) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 96e5c3a80f..317fec372d 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -277,6 +277,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "report error when exception is thrown" in { + val errProbe = TestProbe() val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transform(new Transformer[Int, Int] { @@ -284,6 +285,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d if (elem == 2) throw new IllegalArgumentException("two not allowed") else List(elem, elem) } + override def onError(cause: Throwable): Unit = errProbe.ref ! cause }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -296,6 +298,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d consumer.expectError().getMessage should be("two not allowed") consumer.expectNoMsg(200.millis) } + errProbe.expectMsgType[IllegalArgumentException].getMessage should be("two not allowed") } "support cancel as expected" in { diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala index b89b0a5ff6..254923b466 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala @@ -17,7 +17,7 @@ class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\na val materializer = FlowMaterializer(MaterializerSettings()) - def self = ActorBasedFlowMaterializer.ctx.get().asInstanceOf[ActorContext].self + def self = ActorBasedFlowMaterializer.currentActorContext().self "An ActorBasedFlowMaterializer" must { diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala index 95fa0c7da3..8ce8a4ff23 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorNamingSpec.scala @@ -12,8 +12,8 @@ import akka.actor.ActorRef import scala.collection.immutable.TreeSet import scala.util.control.NonFatal import akka.stream.impl.ActorBasedFlowMaterializer -import akka.stream.impl.FlowNameCounter import akka.stream.scaladsl.Transformer +import akka.stream.impl.FlowNameCounter @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") { diff --git a/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala b/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala new file mode 100644 index 0000000000..476dae640f --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.extra + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import akka.stream.FlowMaterializer +import akka.stream.MaterializerSettings +import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.testkit.EventFilter +import akka.stream.impl.FlowNameCounter + +object LogSpec { + class TestException extends IllegalArgumentException("simulated err") with NoStackTrace +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class LogSpec extends AkkaSpec("akka.loglevel=INFO") { + import LogSpec._ + + val materializer = FlowMaterializer(MaterializerSettings()) + + def flowCount = FlowNameCounter(system).counter.get + def nextFlowCount = flowCount + 1 + + "Log Transformer" must { + + "log onNext elements" in { + EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", pattern = """OnNext: \[[1|2|3]\]""", occurrences = 3) intercept { + Flow(List(1, 2, 3)). + transform(Log()). + consume(materializer) + } + } + + "log onComplete" in { + EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", message = "OnComplete", occurrences = 1) intercept { + Flow(Nil). + transform(Log()). + consume(materializer) + } + } + + "log onError exception" in { + // FIXME the "failure during processing" occurrence comes from ActorProcessImpl#fail, and will probably be removed + EventFilter[TestException](source = s"akka://LogSpec/user/flow-$nextFlowCount-2-mylog", + pattern = "[OnError: simulated err|failure during processing]", occurrences = 2) intercept { + Flow(List(1, 2, 3)). + map(i ⇒ if (i == 2) throw new TestException else i). + transform(Log(name = "mylog")). + consume(materializer) + } + } + + "have type inference" in { + val f1: Flow[Int] = Flow(List(1, 2, 3)).transform(Log[Int]) + val f2: Flow[Int] = Flow(List(1, 2, 3)).transform(Log()) + val f3: Flow[String] = Flow(List(1, 2, 3)).transform(Log[Int]).map((i: Int) ⇒ i.toString).transform(Log[String]) + val f4: Flow[String] = Flow(List(1, 2, 3)).transform(Log()).map((i: Int) ⇒ i.toString).transform(Log()) + val f5: Flow[String] = + Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") { + override def logOnNext(i: Int): Unit = + log.debug("Got element {}", i) + }).map((i: Int) ⇒ i.toString) + } + + "have nice DSL" in { + import akka.stream.extra.Implicits._ + val f: Flow[String] = Flow(List(1, 2, 3)).log().map((i: Int) ⇒ i.toString).log() + } + + } + +} \ No newline at end of file