+str #15093 Add flow logging facility
* Fixes #15093 * Add Transformer.onError
This commit is contained in:
parent
3bb9471072
commit
b548395e2f
13 changed files with 249 additions and 33 deletions
|
|
@ -24,9 +24,9 @@ import akka.stream.scaladsl.Flow
|
||||||
|
|
||||||
object PersistentFlow {
|
object PersistentFlow {
|
||||||
/**
|
/**
|
||||||
* Starts a new [[Persistent]] message flow from the given processor,
|
* Starts a new [[akka.persistence.Persistent]] message flow from the given processor,
|
||||||
* identified by `processorId`. Elements are pulled from the processor's
|
* identified by `processorId`. Elements are pulled from the processor's
|
||||||
* journal (using a [[View]]) in accordance with the demand coming from
|
* journal (using a [[akka.persistence.View]]) in accordance with the demand coming from
|
||||||
* the downstream transformation steps.
|
* the downstream transformation steps.
|
||||||
*
|
*
|
||||||
* Elements pulled from the processor's journal are buffered in memory so that
|
* Elements pulled from the processor's journal are buffered in memory so that
|
||||||
|
|
@ -36,9 +36,9 @@ object PersistentFlow {
|
||||||
fromProcessor(processorId, PersistentPublisherSettings())
|
fromProcessor(processorId, PersistentPublisherSettings())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts a new [[Persistent]] message flow from the given processor,
|
* Starts a new [[akka.persistence.Persistent]] message flow from the given processor,
|
||||||
* identified by `processorId`. Elements are pulled from the processor's
|
* identified by `processorId`. Elements are pulled from the processor's
|
||||||
* journal (using a [[View]]) in accordance with the demand coming from
|
* journal (using a [[akka.persistence.View]]) in accordance with the demand coming from
|
||||||
* the downstream transformation steps.
|
* the downstream transformation steps.
|
||||||
*
|
*
|
||||||
* Elements pulled from the processor's journal are buffered in memory so that
|
* Elements pulled from the processor's journal are buffered in memory so that
|
||||||
|
|
@ -46,14 +46,14 @@ object PersistentFlow {
|
||||||
* Reads from the journal are done in (coarse-grained) batches of configurable
|
* Reads from the journal are done in (coarse-grained) batches of configurable
|
||||||
* size (which correspond to the configurable maximum buffer size).
|
* size (which correspond to the configurable maximum buffer size).
|
||||||
*
|
*
|
||||||
* @see [[PersistentPublisherSettings]]
|
* @see [[akka.persistence.PersistentPublisherSettings]]
|
||||||
*/
|
*/
|
||||||
def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] =
|
def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] =
|
||||||
FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil)
|
FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Configuration object for a [[Persistent]] stream publisher.
|
* Configuration object for a [[akka.persistence.Persistent]] stream publisher.
|
||||||
*
|
*
|
||||||
* @param fromSequenceNr Sequence number where the published stream shall start (inclusive).
|
* @param fromSequenceNr Sequence number where the published stream shall start (inclusive).
|
||||||
* Default is `1L`.
|
* Default is `1L`.
|
||||||
|
|
|
||||||
32
akka-stream/src/main/scala/akka/stream/extra/Implicits.scala
Normal file
32
akka-stream/src/main/scala/akka/stream/extra/Implicits.scala
Normal file
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.stream.extra
|
||||||
|
|
||||||
|
import akka.stream.scaladsl.Duct
|
||||||
|
import akka.stream.scaladsl.Flow
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Additional [[Flow]] and [[Duct]] operators.
|
||||||
|
*/
|
||||||
|
object Implicits {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implicit enrichment for stream logging.
|
||||||
|
*
|
||||||
|
* @see [[Log]]
|
||||||
|
*/
|
||||||
|
implicit class LogFlowDsl[T](val flow: Flow[T]) extends AnyVal {
|
||||||
|
def log(): Flow[T] = flow.transform(Log())
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implicit enrichment for stream logging.
|
||||||
|
*
|
||||||
|
* @see [[Log]]
|
||||||
|
*/
|
||||||
|
implicit class LogDuctDsl[In, Out](val duct: Duct[In, Out]) extends AnyVal {
|
||||||
|
def log(): Duct[In, Out] = duct.transform(Log())
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
96
akka-stream/src/main/scala/akka/stream/extra/Log.scala
Normal file
96
akka-stream/src/main/scala/akka/stream/extra/Log.scala
Normal file
|
|
@ -0,0 +1,96 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.stream.extra
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
import akka.stream.scaladsl.Transformer
|
||||||
|
import akka.stream.impl.ActorBasedFlowMaterializer
|
||||||
|
import akka.actor.ActorContext
|
||||||
|
import akka.event.LoggingAdapter
|
||||||
|
import akka.event.Logging
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API: Mix in TransformerLogging into your [[akka.stream.scaladsl.Transformer]]
|
||||||
|
* to obtain a reference to a logger, which is available under the name [[#log]].
|
||||||
|
*/
|
||||||
|
trait TransformerLogging { this: Transformer[_, _] ⇒
|
||||||
|
|
||||||
|
private def context = ActorBasedFlowMaterializer.currentActorContext()
|
||||||
|
|
||||||
|
private var _log: LoggingAdapter = _
|
||||||
|
|
||||||
|
def log: LoggingAdapter = {
|
||||||
|
// only used in Actor, i.e. thread safe
|
||||||
|
if (_log eq null)
|
||||||
|
_log = Logging(context.system, context.self)
|
||||||
|
_log
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
object Log {
|
||||||
|
def apply[T](): Log[T] = new Log[T]
|
||||||
|
def apply[T](name: String): Log[T] = new Log[T](name)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Logs the elements, error and completion of a a flow.
|
||||||
|
*
|
||||||
|
* By default it logs `onNext` and `onComplete` at info log
|
||||||
|
* level, and `onError` at error log level. Subclass may customize
|
||||||
|
* the logging by overriding [[#logOnNext]], [[#logOnComplete]] and
|
||||||
|
* [[#logOnError]].
|
||||||
|
*
|
||||||
|
* The `logSource` of the [[akka.event.Logging.LogEvent]] is the path of
|
||||||
|
* the actor processing this step in the flow. It contains the
|
||||||
|
* flow name and the [[#name]] of this `Transformer`. The
|
||||||
|
* name can be customized with the [[#name]] constructor parameter.
|
||||||
|
*
|
||||||
|
* The [[akka.event.LoggingAdapter]] is accessible
|
||||||
|
* under the name `log`.
|
||||||
|
*
|
||||||
|
* Usage:
|
||||||
|
* {{{
|
||||||
|
* Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") {
|
||||||
|
* override def logOnNext(i: Int): Unit =
|
||||||
|
* log.debug("Got element {}", i)
|
||||||
|
* }).
|
||||||
|
* consume(materializer)
|
||||||
|
* }}}
|
||||||
|
*
|
||||||
|
* Or with the implicit enrichment:
|
||||||
|
* {{{
|
||||||
|
* import akka.stream.extra.Implicits._
|
||||||
|
* Flow(List(1, 2, 3)).log().consume(materializer)
|
||||||
|
* }}}
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class Log[T](override val name: String = "log") extends Transformer[T, T] with TransformerLogging {
|
||||||
|
|
||||||
|
final def onNext(element: T): immutable.Seq[T] = {
|
||||||
|
logOnNext(element)
|
||||||
|
List(element)
|
||||||
|
}
|
||||||
|
|
||||||
|
def logOnNext(element: T): Unit = {
|
||||||
|
log.info("OnNext: [{}]", element)
|
||||||
|
}
|
||||||
|
|
||||||
|
final override def onComplete(): immutable.Seq[T] = {
|
||||||
|
logOnComplete()
|
||||||
|
Nil
|
||||||
|
}
|
||||||
|
|
||||||
|
def logOnComplete(): Unit = {
|
||||||
|
log.info("OnComplete")
|
||||||
|
}
|
||||||
|
|
||||||
|
final override def onError(cause: Throwable): Unit = logOnError(cause)
|
||||||
|
|
||||||
|
def logOnError(cause: Throwable): Unit = {
|
||||||
|
log.error(cause, "OnError")
|
||||||
|
}
|
||||||
|
|
||||||
|
final override def isComplete: Boolean = false
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -142,6 +142,9 @@ private[akka] class ActorBasedFlowMaterializer(
|
||||||
case x ⇒ x
|
case x ⇒ x
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def withNamePrefix(name: String): FlowMaterializer =
|
||||||
|
new ActorBasedFlowMaterializer(settings, _context, name)
|
||||||
|
|
||||||
private def system: ActorSystem = _context match {
|
private def system: ActorSystem = _context match {
|
||||||
case s: ExtendedActorSystem ⇒ s
|
case s: ExtendedActorSystem ⇒ s
|
||||||
case c: ActorContext ⇒ c.system
|
case c: ActorContext ⇒ c.system
|
||||||
|
|
@ -151,9 +154,6 @@ private[akka] class ActorBasedFlowMaterializer(
|
||||||
|
|
||||||
private def nextFlowNameCount(): Long = FlowNameCounter(system).counter.incrementAndGet()
|
private def nextFlowNameCount(): Long = FlowNameCounter(system).counter.incrementAndGet()
|
||||||
|
|
||||||
def withNamePrefix(name: String): FlowMaterializer =
|
|
||||||
new ActorBasedFlowMaterializer(settings, _context, name)
|
|
||||||
|
|
||||||
private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
|
private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
|
||||||
|
|
||||||
@tailrec private def processorChain(topConsumer: Consumer[_], ops: immutable.Seq[AstNode],
|
@tailrec private def processorChain(topConsumer: Consumer[_], ops: immutable.Seq[AstNode],
|
||||||
|
|
|
||||||
|
|
@ -85,9 +85,9 @@ private[akka] abstract class AbstractActorConsumer(val settings: MaterializerSet
|
||||||
requestMore()
|
requestMore()
|
||||||
context.become(active)
|
context.become(active)
|
||||||
case OnError(cause) ⇒
|
case OnError(cause) ⇒
|
||||||
onError(cause)
|
withCtx(context)(onError(cause))
|
||||||
case OnComplete ⇒
|
case OnComplete ⇒
|
||||||
onComplete()
|
withCtx(context)(onComplete())
|
||||||
}
|
}
|
||||||
|
|
||||||
private var subscription: Option[Subscription] = None
|
private var subscription: Option[Subscription] = None
|
||||||
|
|
@ -142,6 +142,7 @@ private[akka] class TransformActorConsumer(_settings: MaterializerSettings, tran
|
||||||
|
|
||||||
override def onError(cause: Throwable): Unit = {
|
override def onError(cause: Throwable): Unit = {
|
||||||
log.error(cause, "terminating due to onError")
|
log.error(cause, "terminating due to onError")
|
||||||
|
transformer.onError(cause)
|
||||||
shutdown()
|
shutdown()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -167,7 +168,7 @@ private[akka] class RecoverActorConsumer(_settings: MaterializerSettings, recove
|
||||||
extends TransformActorConsumer(_settings, recoveryTransformer) {
|
extends TransformActorConsumer(_settings, recoveryTransformer) {
|
||||||
|
|
||||||
override def onError(cause: Throwable): Unit = {
|
override def onError(cause: Throwable): Unit = {
|
||||||
recoveryTransformer.onError(cause)
|
recoveryTransformer.onErrorRecover(cause)
|
||||||
onComplete()
|
onComplete()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
|
||||||
transformRecover(new RecoveryTransformer[O, Unit] {
|
transformRecover(new RecoveryTransformer[O, Unit] {
|
||||||
var done = false
|
var done = false
|
||||||
override def onNext(in: O) = { p success in; done = true; Nil }
|
override def onNext(in: O) = { p success in; done = true; Nil }
|
||||||
override def onError(e: Throwable) = { p failure e; Nil }
|
override def onErrorRecover(e: Throwable) = { p failure e; Nil }
|
||||||
override def isComplete = done
|
override def isComplete = done
|
||||||
override def onComplete() = { p.tryFailure(new NoSuchElementException("empty stream")); Nil }
|
override def onComplete() = { p.tryFailure(new NoSuchElementException("empty stream")); Nil }
|
||||||
}).consume(materializer)
|
}).consume(materializer)
|
||||||
|
|
@ -48,7 +48,7 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
|
||||||
transformRecover(new RecoveryTransformer[O, Unit] {
|
transformRecover(new RecoveryTransformer[O, Unit] {
|
||||||
var ok = true
|
var ok = true
|
||||||
override def onNext(in: O) = Nil
|
override def onNext(in: O) = Nil
|
||||||
override def onError(e: Throwable) = {
|
override def onErrorRecover(e: Throwable) = {
|
||||||
callback(Failure(e))
|
callback(Failure(e))
|
||||||
ok = false
|
ok = false
|
||||||
Nil
|
Nil
|
||||||
|
|
@ -82,7 +82,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[
|
||||||
transformRecover(new RecoveryTransformer[Out, Unit] {
|
transformRecover(new RecoveryTransformer[Out, Unit] {
|
||||||
var ok = true
|
var ok = true
|
||||||
override def onNext(in: Out) = Nil
|
override def onNext(in: Out) = Nil
|
||||||
override def onError(e: Throwable) = {
|
override def onErrorRecover(e: Throwable) = {
|
||||||
callback(Failure(e))
|
callback(Failure(e))
|
||||||
ok = false
|
ok = false
|
||||||
Nil
|
Nil
|
||||||
|
|
|
||||||
|
|
@ -53,6 +53,7 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran
|
||||||
s"transformer=$transformer)"
|
s"transformer=$transformer)"
|
||||||
|
|
||||||
override def softShutdown(): Unit = {
|
override def softShutdown(): Unit = {
|
||||||
|
shutdownReason foreach transformer.onError
|
||||||
transformer.cleanup()
|
transformer.cleanup()
|
||||||
hasCleanupRun = true // for postStop
|
hasCleanupRun = true // for postStop
|
||||||
super.softShutdown()
|
super.softShutdown()
|
||||||
|
|
@ -77,7 +78,7 @@ private[akka] class RecoverProcessorImpl(_settings: MaterializerSettings, recove
|
||||||
if (emits.isEmpty && error.isDefined && inputDrained) {
|
if (emits.isEmpty && error.isDefined && inputDrained) {
|
||||||
val e = error.get
|
val e = error.get
|
||||||
error = None
|
error = None
|
||||||
emits = recoveryTransformer.onError(e)
|
emits = recoveryTransformer.onErrorRecover(e)
|
||||||
} else if (emits.isEmpty) {
|
} else if (emits.isEmpty) {
|
||||||
isComplete = recoveryTransformer.isComplete
|
isComplete = recoveryTransformer.isComplete
|
||||||
if (depleted || isComplete) {
|
if (depleted || isComplete) {
|
||||||
|
|
|
||||||
|
|
@ -160,6 +160,8 @@ trait Flow[+T] {
|
||||||
* the [[Transformer#onComplete]] function is invoked to produce a (possibly empty)
|
* the [[Transformer#onComplete]] function is invoked to produce a (possibly empty)
|
||||||
* sequence of elements in response to the end-of-stream event.
|
* sequence of elements in response to the end-of-stream event.
|
||||||
*
|
*
|
||||||
|
* [[Transformer#onError]] is called when failure is signaled from upstream.
|
||||||
|
*
|
||||||
* After normal completion or error the [[Transformer#cleanup]] function is called.
|
* After normal completion or error the [[Transformer#cleanup]] function is called.
|
||||||
*
|
*
|
||||||
* It is possible to keep state in the concrete [[Transformer]] instance with
|
* It is possible to keep state in the concrete [[Transformer]] instance with
|
||||||
|
|
@ -172,11 +174,10 @@ trait Flow[+T] {
|
||||||
/**
|
/**
|
||||||
* This transformation stage works exactly like [[#transform]] with the
|
* This transformation stage works exactly like [[#transform]] with the
|
||||||
* change that failure signaled from upstream will invoke
|
* change that failure signaled from upstream will invoke
|
||||||
* [[RecoveryTransformer#onError]], which can emit an additional sequence of
|
* [[RecoveryTransformer#onErrorRecover]], which can emit an additional sequence of
|
||||||
* elements before the stream ends.
|
* elements before the stream ends.
|
||||||
*
|
*
|
||||||
* After normal completion or error the [[RecoveryTransformer#cleanup]] function
|
* [[Transformer#onError]] is not called when failure is signaled from upstream.
|
||||||
* is called.
|
|
||||||
*/
|
*/
|
||||||
def transformRecover[U](recoveryTransformer: RecoveryTransformer[T, U]): Flow[U]
|
def transformRecover[U](recoveryTransformer: RecoveryTransformer[T, U]): Flow[U]
|
||||||
|
|
||||||
|
|
@ -323,6 +324,11 @@ trait Transformer[-T, +U] {
|
||||||
*/
|
*/
|
||||||
def onComplete(): immutable.Seq[U] = Nil
|
def onComplete(): immutable.Seq[U] = Nil
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked when failure is signaled from upstream.
|
||||||
|
*/
|
||||||
|
def onError(cause: Throwable): Unit = ()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked after normal completion or error.
|
* Invoked after normal completion or error.
|
||||||
*/
|
*/
|
||||||
|
|
@ -345,7 +351,7 @@ trait RecoveryTransformer[-T, +U] extends Transformer[T, U] {
|
||||||
* Invoked when failure is signaled from upstream to emit an additional
|
* Invoked when failure is signaled from upstream to emit an additional
|
||||||
* sequence of elements before the stream ends.
|
* sequence of elements before the stream ends.
|
||||||
*/
|
*/
|
||||||
def onError(cause: Throwable): immutable.Seq[U]
|
def onErrorRecover(cause: Throwable): immutable.Seq[U]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Name of this transformation step. Used as part of the actor name.
|
* Name of this transformation step. Used as part of the actor name.
|
||||||
|
|
|
||||||
|
|
@ -19,8 +19,8 @@ object FlowTransformRecoverSpec {
|
||||||
abstract class TryRecoveryTransformer[T, U] extends RecoveryTransformer[T, U] {
|
abstract class TryRecoveryTransformer[T, U] extends RecoveryTransformer[T, U] {
|
||||||
def onNext(element: Try[T]): immutable.Seq[U]
|
def onNext(element: Try[T]): immutable.Seq[U]
|
||||||
|
|
||||||
def onNext(element: T): immutable.Seq[U] = onNext(Success(element))
|
override def onNext(element: T): immutable.Seq[U] = onNext(Success(element))
|
||||||
def onError(cause: Throwable): immutable.Seq[U] = onNext(Failure(cause))
|
override def onErrorRecover(cause: Throwable): immutable.Seq[U] = onNext(Failure(cause))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -44,7 +44,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
tot += elem
|
tot += elem
|
||||||
List(tot)
|
List(tot)
|
||||||
}
|
}
|
||||||
override def onError(e: Throwable) = List(-1)
|
override def onErrorRecover(e: Throwable) = List(-1)
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val consumer = StreamTestKit.consumerProbe[Int]
|
val consumer = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
@ -68,7 +68,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
tot += elem
|
tot += elem
|
||||||
Vector.fill(elem)(tot)
|
Vector.fill(elem)(tot)
|
||||||
}
|
}
|
||||||
override def onError(e: Throwable) = List(-1)
|
override def onErrorRecover(e: Throwable) = List(-1)
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val consumer = StreamTestKit.consumerProbe[Int]
|
val consumer = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
@ -95,7 +95,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
tot += elem
|
tot += elem
|
||||||
if (elem % 2 == 0) Nil else List(tot)
|
if (elem % 2 == 0) Nil else List(tot)
|
||||||
}
|
}
|
||||||
override def onError(e: Throwable) = List(-1)
|
override def onErrorRecover(e: Throwable) = List(-1)
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val consumer = StreamTestKit.consumerProbe[Int]
|
val consumer = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
@ -126,7 +126,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
tot += length
|
tot += length
|
||||||
List(tot)
|
List(tot)
|
||||||
}
|
}
|
||||||
override def onError(e: Throwable) = List(-1)
|
override def onErrorRecover(e: Throwable) = List(-1)
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val c1 = StreamTestKit.consumerProbe[Int]
|
val c1 = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
@ -228,7 +228,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
if (elem == 2) throw new IllegalArgumentException("two not allowed")
|
if (elem == 2) throw new IllegalArgumentException("two not allowed")
|
||||||
else List(elem, elem)
|
else List(elem, elem)
|
||||||
}
|
}
|
||||||
override def onError(e: Throwable) = List(-1)
|
override def onErrorRecover(e: Throwable) = List(-1)
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val consumer = StreamTestKit.consumerProbe[Int]
|
val consumer = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
@ -254,7 +254,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
}.
|
}.
|
||||||
transformRecover(new RecoveryTransformer[Int, Int] {
|
transformRecover(new RecoveryTransformer[Int, Int] {
|
||||||
override def onNext(elem: Int) = List(elem)
|
override def onNext(elem: Int) = List(elem)
|
||||||
override def onError(e: Throwable) = List(-1, -2, -3)
|
override def onErrorRecover(e: Throwable) = List(-1, -2, -3)
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val consumer = StreamTestKit.consumerProbe[Int]
|
val consumer = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
@ -296,7 +296,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
transformRecover(new RecoveryTransformer[Int, Throwable] {
|
transformRecover(new RecoveryTransformer[Int, Throwable] {
|
||||||
var s = ""
|
var s = ""
|
||||||
override def onNext(element: Int) = List(new IllegalStateException)
|
override def onNext(element: Int) = List(new IllegalStateException)
|
||||||
override def onError(ex: Throwable) = {
|
override def onErrorRecover(ex: Throwable) = {
|
||||||
s += ex.getMessage
|
s += ex.getMessage
|
||||||
List(ex)
|
List(ex)
|
||||||
}
|
}
|
||||||
|
|
@ -319,7 +319,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
val p2 = Flow(p).
|
val p2 = Flow(p).
|
||||||
transformRecover(new RecoveryTransformer[Int, Int] {
|
transformRecover(new RecoveryTransformer[Int, Int] {
|
||||||
override def onNext(in: Int) = List(in)
|
override def onNext(in: Int) = List(in)
|
||||||
override def onError(e: Throwable) = throw e
|
override def onErrorRecover(e: Throwable) = throw e
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val proc = p.expectSubscription()
|
val proc = p.expectSubscription()
|
||||||
|
|
@ -338,7 +338,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
|
||||||
val p2 = Flow(p).
|
val p2 = Flow(p).
|
||||||
transformRecover(new RecoveryTransformer[Int, Int] {
|
transformRecover(new RecoveryTransformer[Int, Int] {
|
||||||
override def onNext(elem: Int) = List(elem, elem)
|
override def onNext(elem: Int) = List(elem, elem)
|
||||||
override def onError(e: Throwable) = List(-1)
|
override def onErrorRecover(e: Throwable) = List(-1)
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val consumer = StreamTestKit.consumerProbe[Int]
|
val consumer = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
|
||||||
|
|
@ -277,6 +277,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
|
||||||
}
|
}
|
||||||
|
|
||||||
"report error when exception is thrown" in {
|
"report error when exception is thrown" in {
|
||||||
|
val errProbe = TestProbe()
|
||||||
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
|
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
|
||||||
val p2 = Flow(p).
|
val p2 = Flow(p).
|
||||||
transform(new Transformer[Int, Int] {
|
transform(new Transformer[Int, Int] {
|
||||||
|
|
@ -284,6 +285,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
|
||||||
if (elem == 2) throw new IllegalArgumentException("two not allowed")
|
if (elem == 2) throw new IllegalArgumentException("two not allowed")
|
||||||
else List(elem, elem)
|
else List(elem, elem)
|
||||||
}
|
}
|
||||||
|
override def onError(cause: Throwable): Unit = errProbe.ref ! cause
|
||||||
}).
|
}).
|
||||||
toProducer(materializer)
|
toProducer(materializer)
|
||||||
val consumer = StreamTestKit.consumerProbe[Int]
|
val consumer = StreamTestKit.consumerProbe[Int]
|
||||||
|
|
@ -296,6 +298,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
|
||||||
consumer.expectError().getMessage should be("two not allowed")
|
consumer.expectError().getMessage should be("two not allowed")
|
||||||
consumer.expectNoMsg(200.millis)
|
consumer.expectNoMsg(200.millis)
|
||||||
}
|
}
|
||||||
|
errProbe.expectMsgType[IllegalArgumentException].getMessage should be("two not allowed")
|
||||||
}
|
}
|
||||||
|
|
||||||
"support cancel as expected" in {
|
"support cancel as expected" in {
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\na
|
||||||
|
|
||||||
val materializer = FlowMaterializer(MaterializerSettings())
|
val materializer = FlowMaterializer(MaterializerSettings())
|
||||||
|
|
||||||
def self = ActorBasedFlowMaterializer.ctx.get().asInstanceOf[ActorContext].self
|
def self = ActorBasedFlowMaterializer.currentActorContext().self
|
||||||
|
|
||||||
"An ActorBasedFlowMaterializer" must {
|
"An ActorBasedFlowMaterializer" must {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,8 @@ import akka.actor.ActorRef
|
||||||
import scala.collection.immutable.TreeSet
|
import scala.collection.immutable.TreeSet
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import akka.stream.impl.ActorBasedFlowMaterializer
|
import akka.stream.impl.ActorBasedFlowMaterializer
|
||||||
import akka.stream.impl.FlowNameCounter
|
|
||||||
import akka.stream.scaladsl.Transformer
|
import akka.stream.scaladsl.Transformer
|
||||||
|
import akka.stream.impl.FlowNameCounter
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") {
|
class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") {
|
||||||
|
|
|
||||||
77
akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala
Normal file
77
akka-stream/src/test/scala/akka/stream/extra/LogSpec.scala
Normal file
|
|
@ -0,0 +1,77 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.stream.extra
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import scala.util.control.NoStackTrace
|
||||||
|
import akka.stream.FlowMaterializer
|
||||||
|
import akka.stream.MaterializerSettings
|
||||||
|
import akka.stream.impl.ActorBasedFlowMaterializer
|
||||||
|
import akka.stream.scaladsl.Flow
|
||||||
|
import akka.stream.testkit.AkkaSpec
|
||||||
|
import akka.testkit.EventFilter
|
||||||
|
import akka.stream.impl.FlowNameCounter
|
||||||
|
|
||||||
|
object LogSpec {
|
||||||
|
class TestException extends IllegalArgumentException("simulated err") with NoStackTrace
|
||||||
|
}
|
||||||
|
|
||||||
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
class LogSpec extends AkkaSpec("akka.loglevel=INFO") {
|
||||||
|
import LogSpec._
|
||||||
|
|
||||||
|
val materializer = FlowMaterializer(MaterializerSettings())
|
||||||
|
|
||||||
|
def flowCount = FlowNameCounter(system).counter.get
|
||||||
|
def nextFlowCount = flowCount + 1
|
||||||
|
|
||||||
|
"Log Transformer" must {
|
||||||
|
|
||||||
|
"log onNext elements" in {
|
||||||
|
EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", pattern = """OnNext: \[[1|2|3]\]""", occurrences = 3) intercept {
|
||||||
|
Flow(List(1, 2, 3)).
|
||||||
|
transform(Log()).
|
||||||
|
consume(materializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"log onComplete" in {
|
||||||
|
EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", message = "OnComplete", occurrences = 1) intercept {
|
||||||
|
Flow(Nil).
|
||||||
|
transform(Log()).
|
||||||
|
consume(materializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"log onError exception" in {
|
||||||
|
// FIXME the "failure during processing" occurrence comes from ActorProcessImpl#fail, and will probably be removed
|
||||||
|
EventFilter[TestException](source = s"akka://LogSpec/user/flow-$nextFlowCount-2-mylog",
|
||||||
|
pattern = "[OnError: simulated err|failure during processing]", occurrences = 2) intercept {
|
||||||
|
Flow(List(1, 2, 3)).
|
||||||
|
map(i ⇒ if (i == 2) throw new TestException else i).
|
||||||
|
transform(Log(name = "mylog")).
|
||||||
|
consume(materializer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"have type inference" in {
|
||||||
|
val f1: Flow[Int] = Flow(List(1, 2, 3)).transform(Log[Int])
|
||||||
|
val f2: Flow[Int] = Flow(List(1, 2, 3)).transform(Log())
|
||||||
|
val f3: Flow[String] = Flow(List(1, 2, 3)).transform(Log[Int]).map((i: Int) ⇒ i.toString).transform(Log[String])
|
||||||
|
val f4: Flow[String] = Flow(List(1, 2, 3)).transform(Log()).map((i: Int) ⇒ i.toString).transform(Log())
|
||||||
|
val f5: Flow[String] =
|
||||||
|
Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") {
|
||||||
|
override def logOnNext(i: Int): Unit =
|
||||||
|
log.debug("Got element {}", i)
|
||||||
|
}).map((i: Int) ⇒ i.toString)
|
||||||
|
}
|
||||||
|
|
||||||
|
"have nice DSL" in {
|
||||||
|
import akka.stream.extra.Implicits._
|
||||||
|
val f: Flow[String] = Flow(List(1, 2, 3)).log().map((i: Int) ⇒ i.toString).log()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue