+str #17162 add log() stage for simple logging in Flows

+ with javadsl
+ allows configuring log levels
+ allows turning off logging of certain actions completely
+ cookbook adjusted to show this instead of manual PushStage
- PENDING: preStart based impl will be faster, coming soon
Konrad Malawski 2015-04-09 12:21:12 +02:00
parent 1a5d114290
commit f2b757df51
20 changed files with 622 additions and 95 deletions
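For orientation before the diff: a minimal sketch of the API this commit introduces, distilled from the changes below. It assumes an ActorSystem and an implicit ActorFlowMaterializer are in scope.

import akka.event.Logging
import akka.stream.OperationAttributes
import akka.stream.OperationAttributes.LogLevels
import akka.stream.scaladsl.{ Sink, Source }

// log element, completion and failure signals at the default levels (Debug/Debug/Error)
Source(1 to 3).log("nums").runWith(Sink.ignore)

// tune the level per signal, or silence a signal entirely with LogLevels.Off
Source(1 to 3)
  .log("nums")
  .withAttributes(OperationAttributes.logLevels(
    onElement = LogLevels.Off,
    onFinish = Logging.InfoLevel))
  .runWith(Sink.ignore)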

View file

@@ -1,7 +1,8 @@
 package docs.stream.cookbook

 import akka.event.Logging
-import akka.stream.scaladsl.{ Sink, Source, Flow }
+import akka.stream.OperationAttributes
+import akka.stream.scaladsl.{ Sink, Source }
 import akka.testkit.{ EventFilter, TestProbe }

 class RecipeLoggingElements extends RecipeSpec {
@@ -22,35 +23,23 @@ class RecipeLoggingElements extends RecipeSpec {
       printProbe.expectMsgAllOf("1", "2", "3")
     }

-    "work with PushStage" in {
+    "use log()" in {
       val mySource = Source(List("1", "2", "3"))
+      def analyse(s: String) = s

-      //#loggingadapter
-      import akka.stream.stage._
-      class LoggingStage[T] extends PushStage[T, T] {
-        private val log = Logging(system, "loggingName")
-
-        override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
-          log.debug("Element flowing through: {}", elem)
-          ctx.push(elem)
-        }
-        override def onUpstreamFailure(cause: Throwable,
-                                       ctx: Context[T]): TerminationDirective = {
-          log.error(cause, "Upstream failed.")
-          super.onUpstreamFailure(cause, ctx)
-        }
-        override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
-          log.debug("Upstream finished")
-          super.onUpstreamFinish(ctx)
-        }
-      }
-      val loggedSource = mySource.transform(() => new LoggingStage)
-      //#loggingadapter
-      EventFilter.debug(start = "Element flowing").intercept {
+      //#log-custom
+      // customise log levels
+      mySource.log("before-map")
+        .withAttributes(OperationAttributes.logLevels(onElement = Logging.WarningLevel))
+        .map(analyse)
+
+      // or provide custom logging adapter
+      implicit val adapter = Logging(system, "customLogger")
+      mySource.log("custom")
+      //#log-custom
+
+      val loggedSource = mySource.log("custom")
+      EventFilter.debug(start = "[custom] Element: ").intercept {
         loggedSource.runWith(Sink.ignore)
       }

View file

@@ -39,7 +39,7 @@ handlers, emitting log information through an Akka :class:`LoggingAdapter`. This
 the elements flowing in the stream, it just emits them unmodified by calling ``ctx.push(elem)`` in its ``onPush``
 event handler logic.

-.. includecode:: code/docs/stream/cookbook/RecipeLoggingElements.scala#loggingadapter
+.. includecode:: code/docs/stream/cookbook/RecipeLoggingElements.scala#log-custom

 Flattening a stream of sequences
 --------------------------------

View file

@@ -5,7 +5,9 @@
 package akka.stream;

 import akka.actor.ActorSystem;
+import akka.event.Logging;
 import akka.stream.javadsl.AkkaJUnitActorSystemResource;
+import akka.stream.OperationAttributes;

 public abstract class StreamTest {
   final protected ActorSystem system;

View file

@@ -3,6 +3,7 @@
  */
 package akka.stream.impl.fusing

+import akka.stream.OperationAttributes
 import akka.stream.testkit.AkkaSpec
 import akka.stream.stage._
 import akka.testkit.TestProbe
@@ -61,6 +62,7 @@ trait InterpreterSpecKit extends AkkaSpec {
     val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream,
       (op, ctx, event) ⇒ sidechannel.ref ! ActorInterpreter.AsyncInput(op, ctx, event),
       ActorFlowMaterializer(),
+      OperationAttributes.none,
       forkLimit, overflowToHeap)

     interpreter.init()

View file

@@ -0,0 +1,173 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.event.{ DummyClassForStringSources, Logging }
import akka.stream._
import akka.stream.OperationAttributes
import akka.stream.OperationAttributes.LogLevels
import akka.stream.testkit.{ AkkaSpec, ScriptedTest }
import akka.testkit.TestProbe
import scala.util.control.NoStackTrace
class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest {
implicit val mat: FlowMaterializer = ActorFlowMaterializer()
val logProbe = {
val p = TestProbe()
system.eventStream.subscribe(p.ref, classOf[Logging.LogEvent])
p
}
"A Log" must {
val LogSrc = s"akka.stream.Log(akka://${Logging.simpleName(classOf[FlowLogSpec])})"
val LogClazz = classOf[DummyClassForStringSources]
"on Flow" must {
"debug each element" in {
val debugging = Flow[Int].log("my-debug")
Source(1 to 2).via(debugging).runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 1"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Element: 2"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished."))
}
"allow disabling element logging" in {
val disableElementLogging = OperationAttributes.logLevels(
onElement = LogLevels.Off,
onFinish = Logging.DebugLevel,
onFailure = Logging.DebugLevel)
val debugging = Flow[Int].log("my-debug")
Source(1 to 2)
.via(debugging)
.withAttributes(disableElementLogging)
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[my-debug] Upstream finished."))
}
}
"on javadsl.Flow" must {
"debug each element" in {
val log = Logging(system, "com.example.ImportantLogger")
val debugging: javadsl.Flow[Integer, Integer, Unit] = javadsl.Flow.of(classOf[Integer])
.log("log-1")
.log("log-2", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i })
.log("log-3", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i }, log)
.log("log-4", log)
javadsl.Source.single[Integer](1).via(debugging).runWith(javadsl.Sink.ignore(), mat)
var counter = 0
var finishCounter = 0
import scala.concurrent.duration._
logProbe.fishForMessage(3.seconds) {
  case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" ⇒
    counter += 1
    counter == 4 && finishCounter == 4
  case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" ⇒
    finishCounter += 1
    counter == 4 && finishCounter == 4
}
}
}
"on Source" must {
"debug each element" in {
Source(1 to 2).log("flow-s2").runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 1"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Element: 2"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s2] Upstream finished."))
}
"allow extracting value to be logged" in {
case class Complex(a: Int, b: String)
Source.single(Complex(1, "42")).log("flow-s3", _.b).runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Element: 42"))
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-s3] Upstream finished."))
}
"log upstream failure" in {
val cause = new TestException
Source.failed(cause).log("flow-4").runWith(Sink.ignore)
logProbe.expectMsg(Logging.Error(cause, LogSrc, LogClazz, "[flow-4] Upstream failed."))
}
"allow passing in custom LoggingAdapter" in {
val log = Logging(system, "com.example.ImportantLogger")
Source.single(42).log("flow-5")(log).runWith(Sink.ignore)
val src = "com.example.ImportantLogger(akka://FlowLogSpec)"
val clazz = classOf[DummyClassForStringSources]
logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Element: 42"))
logProbe.expectMsg(Logging.Debug(src, clazz, "[flow-5] Upstream finished."))
}
"allow configuring log levels via OperationAttributes" in {
val logAttrs = OperationAttributes.logLevels(
onElement = Logging.WarningLevel,
onFinish = Logging.InfoLevel,
onFailure = Logging.DebugLevel)
Source.single(42)
  .log("flow-6")
  .withAttributes(logAttrs)
  .runWith(Sink.ignore)
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Warning(LogSrc, LogClazz, "[flow-6] Element: 42"))
logProbe.expectMsg(Logging.Info(LogSrc, LogClazz, "[flow-6] Upstream finished."))
val cause = new TestException
Source.failed(cause)
.log("flow-6e")
.withAttributes(logAttrs)
.runWith(Sink.ignore)
logProbe.expectMsg(Logging.Debug(LogSrc, LogClazz, "[flow-6e] Upstream failed, cause: FlowLogSpec$TestException: Boom!"))
}
}
"on javadsl.Source" must {
"debug each element" in {
val log = Logging(system, "com.example.ImportantLogger")
javadsl.Source.single[Integer](1)
.log("log-1")
.log("log-2", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i })
.log("log-3", new akka.japi.function.Function[Integer, Integer] { def apply(i: Integer) = i }, log)
.log("log-4", log)
.runWith(javadsl.Sink.ignore(), mat)
var counter = 1
import scala.concurrent.duration._
logProbe.fishForMessage(3.seconds) {
  case Logging.Debug(_, _, msg: String) if msg contains "Element: 1" ⇒
    counter += 1
    counter == 4
  case Logging.Debug(_, _, msg: String) if msg contains "Upstream finished" ⇒
    false
}
}
}
}
final class TestException extends RuntimeException("Boom!") with NoStackTrace
}

View file

@@ -11,8 +11,7 @@ import akka.stream.stage.Stage
 import scala.collection.immutable
 import scala.concurrent.duration._
 import akka.actor._
-import akka.stream.ActorFlowMaterializerSettings
-import akka.stream.ActorFlowMaterializer
+import akka.stream.{ OperationAttributes, ActorFlowMaterializerSettings, ActorFlowMaterializer }
 import akka.stream.impl._
 import akka.stream.testkit._
 import akka.stream.testkit.Utils._
@@ -45,7 +44,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
     _settings: ActorFlowMaterializerSettings,
     _ops: Seq[Stage[_, _]],
     brokenMessage: Any)
-    extends ActorInterpreter(_settings, _ops, mat) {
+    extends ActorInterpreter(_settings, _ops, mat, OperationAttributes.none) {

     import akka.stream.actor.ActorSubscriberMessage._

View file

@@ -3,6 +3,9 @@
  */
 package akka.stream.scaladsl

+import akka.actor.PoisonPill
+import akka.stream.{ OperationAttributes, OverflowStrategy, ActorFlowMaterializer, ActorFlowMaterializerSettings }
+import akka.stream.stage._
 import scala.collection.immutable.Seq
 import scala.concurrent.duration._
 import scala.util.control.NoStackTrace
@@ -12,7 +15,9 @@ import akka.stream.testkit._
 import akka.stream.testkit.Utils._
 import akka.testkit.{ EventFilter, TestProbe }
 import com.typesafe.config.ConfigFactory
-import akka.stream.stage._
-import scala.concurrent.duration._
-import scala.util.control.NoStackTrace

 class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
@@ -418,6 +423,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
       val upsub = up.expectSubscription()
       upsub.expectCancellation()
     }
+    }
   }
 }

View file

@@ -37,7 +37,7 @@ akka {
       # Enable additional troubleshooting logging at DEBUG log level
       debug-logging = off

       # Maximum number of elements emitted in batch if downstream signals large demand
       output-burst-limit = 1000
     }

View file

@@ -11,7 +11,7 @@ import akka.stream.impl._
 import com.typesafe.config.Config
 import scala.concurrent.duration._
-import akka.japi.{ function ⇒ japi }
+import akka.japi.function

 object ActorFlowMaterializer {
@@ -145,14 +145,16 @@ abstract class ActorFlowMaterializer extends FlowMaterializer {
   def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings

-  /** INTERNAL API */
-  private[akka] def system: ActorSystem
-
   /**
    * INTERNAL API: this might become public later
    */
   private[akka] def actorOf(context: MaterializationContext, props: Props): ActorRef

+  /**
+   * INTERNAL API
+   */
+  private[akka] def system: ActorSystem
 }

 /**
@@ -247,7 +249,7 @@ final case class ActorFlowMaterializerSettings(
    * overridden for specific flows of the stream operations with
    * [[akka.stream.OperationAttributes#supervisionStrategy]].
    */
-  def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = {
+  def withSupervisionStrategy(decider: function.Function[Throwable, Supervision.Directive]): ActorFlowMaterializerSettings = {
     import Supervision._
     copy(supervisionDecider = decider match {
       case `resumingDecider` ⇒ resumingDecider

View file

@@ -3,13 +3,15 @@
  */
 package akka.stream

+import akka.event.Logging
 import scala.collection.immutable
-import akka.japi.{ function ⇒ japi }
 import akka.stream.impl.Stages.StageModule
+import akka.japi.function

 /**
- * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]]
- * materialization.
+ * Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]]
+ * or [[akka.stream.scaladsl.FlowGraph]] / [[akka.stream.javadsl.FlowGraph]] materialization.
  *
  * Note that more attributes for the [[ActorFlowMaterializer]] are defined in [[ActorOperationAttributes]].
  */
@@ -96,6 +98,9 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio
   /**
    * INTERNAL API
    */
+  private[akka] def logLevels: Option[LogLevels] =
+    attributes.collectFirst { case l: LogLevels ⇒ l }
+
   private[akka] def transform(node: StageModule): StageModule =
     if ((this eq OperationAttributes.none) || (this eq node.attributes)) node
     else node.withAttributes(attributes = this and node.attributes)
@@ -110,6 +115,11 @@ object OperationAttributes {
   trait Attribute
   final case class Name(n: String) extends Attribute
   final case class InputBuffer(initial: Int, max: Int) extends Attribute
+  final case class LogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) extends Attribute
+  object LogLevels {
+    /** Use to disable logging on certain operations when configuring [[OperationAttributes.LogLevels]] */
+    final val Off: Logging.LogLevel = Logging.levelFor("off").get
+  }

   /**
    * INTERNAL API
@@ -132,6 +142,30 @@ object OperationAttributes {
    */
   def inputBuffer(initial: Int, max: Int): OperationAttributes = OperationAttributes(InputBuffer(initial, max))

+  /**
+   * Java API
+   *
+   * Configures `log()` stage log-levels to be used when logging.
+   * Logging a certain operation can be completely disabled by using [[LogLevels.Off]].
+   *
+   * Passing in null as any of the arguments sets the level to its default value, which is:
+   * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`.
+   */
+  def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) =
+    logLevels(
+      onElement = Option(onElement).getOrElse(Logging.DebugLevel),
+      onFinish = Option(onFinish).getOrElse(Logging.DebugLevel),
+      onFailure = Option(onFailure).getOrElse(Logging.ErrorLevel))
+
+  /**
+   * Configures `log()` stage log-levels to be used when logging.
+   * Logging a certain operation can be completely disabled by using [[LogLevels.Off]].
+   *
+   * See [[OperationAttributes.createLogLevels]] for Java API.
+   */
+  def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel) =
+    OperationAttributes(LogLevels(onElement, onFinish, onFailure))
 }

 /**
@@ -157,6 +191,31 @@ object ActorOperationAttributes {
   /**
    * Java API: Decides how exceptions from application code are to be handled.
    */
-  def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): OperationAttributes =
+  def withSupervisionStrategy(decider: function.Function[Throwable, Supervision.Directive]): OperationAttributes =
     ActorOperationAttributes.supervisionStrategy(decider.apply _)
+
+  /**
+   * Java API
+   *
+   * Configures `log()` stage log-levels to be used when logging.
+   * Logging a certain operation can be completely disabled by using [[LogLevels.Off]].
+   *
+   * Passing in null as any of the arguments sets the level to its default value, which is:
+   * `Debug` for `onElement` and `onFinish`, and `Error` for `onFailure`.
+   */
+  def createLogLevels(onElement: Logging.LogLevel, onFinish: Logging.LogLevel, onFailure: Logging.LogLevel) =
+    logLevels(
+      onElement = Option(onElement).getOrElse(Logging.DebugLevel),
+      onFinish = Option(onFinish).getOrElse(Logging.DebugLevel),
+      onFailure = Option(onFailure).getOrElse(Logging.ErrorLevel))
+
+  /**
+   * Configures `log()` stage log-levels to be used when logging.
+   * Logging a certain operation can be completely disabled by using [[LogLevels.Off]].
+   *
+   * See [[OperationAttributes.createLogLevels]] for Java API.
+   */
+  def logLevels(onElement: Logging.LogLevel = Logging.DebugLevel, onFinish: Logging.LogLevel = Logging.DebugLevel, onFailure: Logging.LogLevel = Logging.ErrorLevel) =
+    OperationAttributes(LogLevels(onElement, onFinish, onFailure))
 }
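A sketch of attaching these attributes from the Scala API, based on the definitions above (the quietFlow name is illustrative); Java callers would go through createLogLevels instead, where a null argument falls back to that signal's default level.

import akka.event.Logging
import akka.stream.OperationAttributes
import akka.stream.OperationAttributes.LogLevels
import akka.stream.scaladsl.Flow

// override only what differs from the Debug/Debug/Error defaults
val quietFlow = Flow[Int]
  .log("my-stage")
  .withAttributes(OperationAttributes.logLevels(
    onElement = Logging.InfoLevel, // elements promoted to Info
    onFinish = LogLevels.Off))     // completion not logged at all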

View file

@@ -53,6 +53,7 @@ private[akka] case class ActorFlowMaterializerImpl(
       case InputBuffer(initial, max) ⇒ s.withInputBuffer(initial, max)
       case Dispatcher(dispatcher) ⇒ s.withDispatcher(dispatcher)
       case SupervisionStrategy(decider) ⇒ s.withSupervisionStrategy(decider)
+      case l: LogLevels ⇒ s
       case Name(_) ⇒ s
     }
   }
@@ -286,30 +287,31 @@ private[akka] object ActorProcessorFactory {
     // Also, otherwise the attributes will not affect the settings properly!
     val settings = materializer.effectiveSettings(att)
     op match {
-      case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(_identity, settings.supervisionDecider)), materializer), ())
-      case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer), ())
-      case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer), ())
-      case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer), ())
-      case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer), ())
-      case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer), ())
-      case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer), ())
-      case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer), ())
-      case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer), ())
-      case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer), ())
-      case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer), ())
-      case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer), ())
-      case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer), ())
-      case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer), ())
-      case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer), ())
+      case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(_identity, settings.supervisionDecider)), materializer, att), ())
+      case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer, att), ())
+      case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer, att), ())
+      case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer, att), ())
+      case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer, att), ())
+      case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer, att), ())
+      case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer, att), ())
+      case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ())
+      case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ())
+      case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ())
+      case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ())
+      case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ())
+      case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ())
+      case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ())
+      case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ())
+      case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ())
       case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ())
       case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ())
       case SplitWhen(p, _) ⇒ (SplitWhenProcessorImpl.props(settings, p), ())
       case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ())
-      case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer), ())
+      case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer, att), ())
       case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ())
       case MaterializingStageFactory(mkStageAndMat, _) ⇒
         val sm = mkStageAndMat()
-        (ActorInterpreter.props(settings, List(sm._1), materializer), sm._2)
+        (ActorInterpreter.props(settings, List(sm._1), materializer, att), sm._2)
       case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
     }
   }

View file

@@ -3,7 +3,7 @@
  */
 package akka.stream.impl

-import akka.event.Logging
+import akka.event.{ LoggingAdapter, Logging }
 import akka.stream.{ OverflowStrategy, TimerTransformer }
 import akka.stream.OperationAttributes
 import akka.stream.OperationAttributes._
@@ -132,6 +132,11 @@ private[stream] object Stages {
     override protected def newInstance: StageModule = this.copy()
   }

+  final case class Log(name: String, extract: Any ⇒ Any, loggingAdapter: Option[LoggingAdapter], attributes: OperationAttributes = map) extends StageModule {
+    def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
+    override protected def newInstance: StageModule = this.copy()
+  }
+
   final case class Filter(p: Any ⇒ Boolean, attributes: OperationAttributes = filter) extends StageModule {
     def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
     override protected def newInstance: StageModule = this.copy()

View file

@@ -5,17 +5,16 @@ package akka.stream.impl.fusing
 import java.util.Arrays

 import akka.actor.{ Actor, ActorRef }
-import akka.event.Logging
 import akka.stream.ActorFlowMaterializerSettings
 import akka.stream.actor.ActorSubscriber.OnSubscribe
 import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
 import akka.stream.impl._
+import akka.stream.OperationAttributes
 import akka.stream.stage._
 import org.reactivestreams.{ Subscriber, Subscription }
-import scala.util.control.NonFatal
 import akka.actor.Props
 import akka.actor.ActorLogging
-import akka.event.LoggingAdapter
+import akka.event.{ Logging, LoggingAdapter }
 import akka.actor.DeadLetterSuppression
 import akka.stream.ActorFlowMaterializer
@@ -302,8 +301,8 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
 /**
  * INTERNAL API
  */
 private[akka] object ActorInterpreter {
-  def props(settings: ActorFlowMaterializerSettings, ops: Seq[Stage[_, _]], materializer: ActorFlowMaterializer): Props =
-    Props(new ActorInterpreter(settings, ops, materializer))
+  def props(settings: ActorFlowMaterializerSettings, ops: Seq[Stage[_, _]], materializer: ActorFlowMaterializer, attributes: OperationAttributes = OperationAttributes.none): Props =
+    Props(new ActorInterpreter(settings, ops, materializer, attributes))

   case class AsyncInput(op: AsyncStage[Any, Any, Any], ctx: AsyncContext[Any, Any], event: Any) extends DeadLetterSuppression
 }
@@ -311,7 +310,7 @@ private[akka] object ActorInterpreter {
 /**
  * INTERNAL API
  */
-private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings, val ops: Seq[Stage[_, _]], val materializer: ActorFlowMaterializer)
+private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings, val ops: Seq[Stage[_, _]], val materializer: ActorFlowMaterializer, val attributes: OperationAttributes)
   extends Actor with ActorLogging {
   import ActorInterpreter._
@@ -321,15 +320,20 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
     new OneBoundedInterpreter(upstream +: ops :+ downstream,
       (op, ctx, event) ⇒ self ! AsyncInput(op, ctx, event),
       materializer,
+      attributes,
       name = context.self.path.toString)
   interpreter.init()

-  def receive: Receive = upstream.subreceive.orElse[Any, Unit](downstream.subreceive).orElse[Any, Unit] {
-    case AsyncInput(op, ctx, event) ⇒
-      ctx.enter()
-      op.onAsyncInput(event, ctx)
-      ctx.execute()
-  }
+  def receive: Receive =
+    upstream.subreceive
+      .orElse[Any, Unit](downstream.subreceive)
+      .orElse[Any, Unit] {
+        case AsyncInput(op, ctx, event) ⇒
+          ctx.enter()
+          op.onAsyncInput(event, ctx)
+          ctx.execute()
+      }

   override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
     super.aroundReceive(receive, msg)

View file

@@ -3,13 +3,14 @@
  */
 package akka.stream.impl.fusing

-import scala.annotation.{ tailrec, switch }
+import akka.stream.{ FlowMaterializer, Supervision }
+import akka.stream.impl.ReactiveStreamsCompliance
+import akka.stream.OperationAttributes
+import akka.stream.stage._
+import scala.annotation.{ switch, tailrec }
 import scala.collection.breakOut
 import scala.util.control.NonFatal
-import akka.stream.stage._
-import akka.stream.Supervision
-import akka.stream.impl.ReactiveStreamsCompliance
-import akka.stream.FlowMaterializer

 // TODO:
 // fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions)
@@ -124,11 +125,12 @@ private[akka] object OneBoundedInterpreter {
 private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
                                           onAsyncInput: (AsyncStage[Any, Any, Any], AsyncContext[Any, Any], Any) ⇒ Unit,
                                           materializer: FlowMaterializer,
+                                          attributes: OperationAttributes = OperationAttributes.none,
                                           val forkLimit: Int = 100,
                                           val overflowToHeap: Boolean = true,
                                           val name: String = "") {
-  import OneBoundedInterpreter._
   import AbstractStage._
+  import OneBoundedInterpreter._

   type UntypedOp = AbstractStage[Any, Any, Directive, Directive, Context[Any]]
   require(ops.nonEmpty, "OneBoundedInterpreter cannot be created without at least one Op")
@@ -301,7 +303,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
         mustHave(DownstreamBall)
       }
       removeBits(DownstreamBall | PrecedingWasPull)
-      pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
+      finishCurrentOp()
      // This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
      // path. Other forks are not order dependent because they execute on isolated execution domains which cannot
      // "cross paths". This unsafeFork is relatively safe here because PushAndFinish simply absorbs all later downstream
@@ -380,6 +382,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
     }

     override def materializer: FlowMaterializer = OneBoundedInterpreter.this.materializer
+    override def attributes: OperationAttributes = OneBoundedInterpreter.this.attributes
   }

   private final val Pushing: State = new State {
@@ -430,7 +433,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
   private final val Completing: State = new State {
     override def advance(): Unit = {
       elementInFlight = null
-      pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
+      finishCurrentOp()
       activeOpIndex += 1
     }
@@ -463,7 +466,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
   private final val Cancelling: State = new State {
     override def advance(): Unit = {
       elementInFlight = null
-      pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
+      finishCurrentOp()
       activeOpIndex -= 1
     }
@@ -485,7 +488,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
   private final case class Failing(cause: Throwable) extends State {
     override def advance(): Unit = {
       elementInFlight = null
-      pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
+      finishCurrentOp()
       activeOpIndex += 1
     }
@@ -630,16 +633,22 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
       (pipeline(op): Any) match {
         case b: BoundaryStage ⇒
           b.context = new EntryState("boundary", op)
+
         case a: AsyncStage[Any, Any, Any] @unchecked ⇒
           a.context = new EntryState("async", op)
           activeOpIndex = op
-          a.initAsyncInput(a.context)
+          a.initAsyncInput(a.context) // TODO remove asyncInput? it's like preStart
+
         case _ ⇒
       }
       op += 1
     }
   }

+  private def finishCurrentOp(): Unit = {
+    pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
+  }
+
   /**
    * Starts execution of detached regions.
    *

View file

@@ -3,16 +3,23 @@
  */
 package akka.stream.impl.fusing

-import scala.collection.immutable
+import akka.event.Logging.LogLevel
+import akka.event.Logging
+import akka.event.LoggingAdapter
+import akka.stream.OperationAttributes.LogLevels
 import akka.stream.impl.FixedSizeBuffer
-import akka.stream.stage._
-import akka.stream._
-import akka.stream.Supervision
-import scala.concurrent.{ ExecutionContext, Future }
-import scala.util.{ Try, Success, Failure }
-import scala.annotation.tailrec
-import scala.util.control.NonFatal
 import akka.stream.impl.ReactiveStreamsCompliance
+import akka.stream.stage._
+import akka.stream.Supervision
+import akka.stream._
+import scala.annotation.tailrec
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try

 /**
  * INTERNAL API
@@ -42,7 +49,9 @@ private[akka] final object Collect {
 }

 private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf: PartialFunction[In, Out]) extends PushStage[In, Out] {
+
   import Collect.NotApplied
+
   override def onPush(elem: In, ctx: Context[Out]): SyncDirective =
     pf.applyOrElse(elem, NotApplied) match {
       case NotApplied ⇒ ctx.pull()
@@ -96,6 +105,7 @@ private[akka] final case class Take[T](count: Long) extends PushStage[T, T] {
  */
 private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] {
   private var left: Long = count
+
   override def onPush(elem: T, ctx: Context[T]): SyncDirective =
     if (left > 0) {
       left -= 1
@@ -196,6 +206,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
  * INTERNAL API
  */
 private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
+
   import OverflowStrategy._
   private val buffer = FixedSizeBuffer[T](size)
@@ -256,6 +267,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
  */
 private[akka] final case class Completed[T]() extends PushPullStage[T, T] {
+
   override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.finish()
   override def onPull(ctx: Context[T]): SyncDirective = ctx.finish()
 }
@@ -361,6 +373,7 @@ private[akka] object MapAsync {
  */
 private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out], decider: Supervision.Decider)
   extends AsyncStage[In, Out, (Int, Try[Out])] {
+
   import MapAsync._
   type Notification = (Int, Try[Out])
@@ -501,3 +514,71 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
     if (todo > 0) ctx.absorbTermination()
     else ctx.finish()
 }
/**
* INTERNAL API
*/
private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapter: Option[LoggingAdapter]) extends PushStage[T, T] {
import Log._
private var logLevels: LogLevels = _
private var log: LoggingAdapter = _
// TODO implement as real preStart once https://github.com/akka/akka/pull/17295 is done
def preStart(ctx: Context[T]): Unit = {
logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels)
log = logAdapter getOrElse {
val sys = ctx.materializer.asInstanceOf[ActorFlowMaterializer].system
Logging(sys, DefaultLoggerName)
}
}
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
if (log == null) preStart(ctx)
if (isEnabled(logLevels.onElement))
log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem))
ctx.push(elem)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
if (log == null) preStart(ctx)
if (isEnabled(logLevels.onFailure))
logLevels.onFailure match {
case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name)
case level ⇒ log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage)
}
super.onUpstreamFailure(cause, ctx)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
if (log == null) preStart(ctx)
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Upstream finished.", name)
super.onUpstreamFinish(ctx)
}
override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = {
if (log == null) preStart(ctx)
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Downstream finished.", name)
super.onDownstreamFinish(ctx)
}
private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt
}
/**
* INTERNAL API
*/
private[akka] object Log {
private final val DefaultLoggerName = "akka.stream.Log"
private final val OffInt = LogLevels.Off.asInt
private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
}
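Until the real preStart hook referenced in the TODO above lands, every callback guards with `if (log == null) preStart(ctx)`, so initialization happens on whichever signal arrives first. A sketch of the two adapter paths from the user's side (system and logger names are illustrative):

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{ Sink, Source }

implicit val system = ActorSystem("log-demo")
implicit val mat = ActorFlowMaterializer()

// default path: the stage builds an adapter from the materializer's system,
// logging under the "akka.stream.Log" source
Source.single(1).log("defaults").runWith(Sink.ignore)

// explicit path: a given adapter short-circuits the materializer lookup
val adapter = Logging(system, "com.example.MyStage")
Source.single(1).log("custom")(adapter).runWith(Sink.ignore)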

View file

@@ -3,6 +3,7 @@
  */
 package akka.stream.javadsl

+import akka.event.LoggingAdapter
 import akka.stream._
 import akka.japi.{ Util, Pair }
 import akka.japi.function
@@ -11,7 +12,7 @@ import scala.annotation.unchecked.uncheckedVariance
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import akka.stream.stage.Stage
-import akka.stream.impl.StreamLayout
+import akka.stream.impl.{ Stages, StreamLayout }

 object Flow {
@@ -156,8 +157,6 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
-   *
-   *
    */
   def map[T](f: function.Function[Out, T]): javadsl.Flow[In, T, Mat] =
     new Flow(delegate.map(f.apply))
@@ -593,6 +592,81 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
   override def named(name: String): javadsl.Flow[In, Out, Mat] =
     new Flow(delegate.named(name))

   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * The `extract` function will be applied to each element before logging, so it is possible to log only those fields
    * of a complex object flowing through this element.
    *
    * Uses the given [[LoggingAdapter]] for logging.
    *
    * '''Emits when''' the mapping function returns an element
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
    */
   def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
     new Flow(delegate.log(name, e ⇒ extract.apply(e))(log))

   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * The `extract` function will be applied to each element before logging, so it is possible to log only those fields
    * of a complex object flowing through this element.
    *
    * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as its source (use this class to configure slf4j loggers).
    *
    * '''Emits when''' the mapping function returns an element
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
    */
   def log(name: String, extract: function.Function[Out, Any]): javadsl.Flow[In, Out, Mat] =
     this.log(name, extract, null)

   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * Uses the given [[LoggingAdapter]] for logging.
    *
    * '''Emits when''' the mapping function returns an element
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
    */
   def log(name: String, log: LoggingAdapter): javadsl.Flow[In, Out, Mat] =
     this.log(name, javaIdentityFunction[Out], log)

   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as its source (use this class to configure slf4j loggers).
    */
   def log(name: String): javadsl.Flow[In, Out, Mat] =
     this.log(name, javaIdentityFunction[Out], null)
 }

 /**
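All four Java DSL overloads converge on the adapter-taking variant, with javaIdentityFunction and a system-created adapter filling the gaps. A sketch exercising them, mirroring how FlowLogSpec drives the Java DSL from Scala (assumes an ActorSystem named system is in scope):

import akka.event.Logging
import akka.japi.function
import akka.stream.javadsl

val adapter = Logging(system, "com.example.ImportantLogger")
val extractId = new function.Function[Integer, Integer] {
  def apply(i: Integer) = i // project out the loggable part of the element
}

javadsl.Flow.of(classOf[Integer])
  .log("defaults")                 // name only: identity extract, system logger
  .log("adapter", adapter)         // custom adapter
  .log("extract", extractId)       // custom extract function
  .log("both", extractId, adapter) // both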

View file

@@ -8,6 +8,7 @@ import akka.japi.function
 import scala.collection.immutable
 import java.util.concurrent.Callable
 import akka.actor.{ Cancellable, ActorRef, Props }
+import akka.event.LoggingAdapter
 import akka.japi.Util
 import akka.stream.OperationAttributes._
 import akka.stream._
@@ -524,4 +525,86 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
   override def named(name: String): javadsl.Source[Out, Mat] =
     new Source(delegate.named(name))
   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * The `extract` function will be applied to each element before logging, so it is possible to log only those fields
    * of a complex object flowing through this element.
    *
    * Uses the given [[LoggingAdapter]] for logging.
    *
    * '''Emits when''' the mapping function returns an element
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
    */
   def log(name: String, extract: function.Function[Out, Any], log: LoggingAdapter): javadsl.Source[Out, Mat] =
     new Source(delegate.log(name, e ⇒ extract.apply(e))(log))

   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * The `extract` function will be applied to each element before logging, so it is possible to log only those fields
    * of a complex object flowing through this element.
    *
    * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as its source (use this class to configure slf4j loggers).
    *
    * '''Emits when''' the mapping function returns an element
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
    */
   def log(name: String, extract: function.Function[Out, Any]): javadsl.Source[Out, Mat] =
     this.log(name, extract, null)

   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * Uses the given [[LoggingAdapter]] for logging.
    *
    * '''Emits when''' the mapping function returns an element
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
    */
   def log(name: String, log: LoggingAdapter): javadsl.Source[Out, Mat] =
     this.log(name, javaIdentityFunction[Out], log)

   /**
    * Logs elements flowing through the stream as well as completion and failure.
    *
    * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
    * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
    *
    * Uses an internally created [[LoggingAdapter]] which uses `akka.stream.Log` as its source (use this class to configure slf4j loggers).
    *
    * '''Emits when''' the mapping function returns an element
    *
    * '''Backpressures when''' downstream backpressures
    *
    * '''Completes when''' upstream completes
    *
    * '''Cancels when''' downstream cancels
    */
   def log(name: String): javadsl.Source[Out, Mat] =
     this.log(name, javaIdentityFunction[Out], null)
 }

View file

@@ -3,8 +3,17 @@
  */
 package akka.stream

+import akka.japi.function.Function
+
 package object javadsl {
+
+  val JavaIdentityFunction = new Function[Any, Any] {
+    @throws(classOf[Exception])
+    override def apply(param: Any): Any = param
+  }
+
+  def javaIdentityFunction[T] = JavaIdentityFunction.asInstanceOf[Function[T, T]]
+
   def combinerToScala[M1, M2, M](f: akka.japi.function.Function2[M1, M2, M]): (M1, M2) ⇒ M =
     f match {
       case s: Function2[_, _, _] ⇒ s.asInstanceOf[(M1, M2) ⇒ M]

View file

@@ -3,12 +3,15 @@
  */
 package akka.stream.scaladsl

+import akka.actor.ActorSystem
+import akka.event.LoggingAdapter
 import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
 import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
 import akka.stream._
 import akka.stream.OperationAttributes._
 import akka.util.Collections.EmptyImmutableSeq
 import org.reactivestreams.Processor
+import scala.annotation.implicitNotFound
 import scala.annotation.unchecked.uncheckedVariance
 import scala.collection.immutable
 import scala.concurrent.duration.{ Duration, FiniteDuration }
@@ -340,6 +343,8 @@ trait FlowOps[+Out, +Mat] {
   import FlowOps._
   type Repr[+O, +M] <: FlowOps[O, M]

+  private final val _identity = (x: Any) ⇒ x
+
   /**
    * Transform this stream by applying the given function to each of the elements
    * as they pass through this processing step.
@@ -849,6 +854,26 @@ trait FlowOps[+Out, +Mat] {
   private[akka] def timerTransform[U](mkStage: () ⇒ TimerTransformer[Out, U]): Repr[U, Mat] =
     andThen(TimerTransform(mkStage.asInstanceOf[() ⇒ TimerTransformer[Any, Any]]))

+  /**
+   * Logs elements flowing through the stream as well as completion and failure.
+   *
+   * By default element and completion signals are logged at Debug level, and errors are logged at Error level.
+   * This can be adjusted according to your needs by providing a custom [[OperationAttributes.LogLevels]] attribute on the given Flow.
+   *
+   * Uses implicit [[LoggingAdapter]] if available, otherwise uses an internally created one,
+   * which uses `akka.stream.Log` as its source (use this class to configure slf4j loggers).
+   *
+   * '''Emits when''' the mapping function returns an element
+   *
+   * '''Backpressures when''' downstream backpressures
+   *
+   * '''Completes when''' upstream completes
+   *
+   * '''Cancels when''' downstream cancels
+   */
+  def log(name: String, extract: Out ⇒ Any = _identity)(implicit log: LoggingAdapter = null): Repr[Out, Mat] =
+    andThen(Stages.Log(name, extract.asInstanceOf[Any ⇒ Any], Option(log)))

   def withAttributes(attr: OperationAttributes): Repr[Out, Mat]

   /** INTERNAL API */
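Both parameters of the new log default away: extract to identity and the adapter to null, to be resolved later by the stage. A sketch of the extract hook on a structured element (the Order type is illustrative; an implicit materializer is assumed):

import akka.stream.scaladsl.{ Sink, Source }

case class Order(id: Long, customer: String, total: BigDecimal)

// log only the id, not the whole case class
Source.single(Order(42L, "acme", BigDecimal(100)))
  .log("orders", _.id)
  .runWith(Sink.ignore)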

View file

@@ -3,8 +3,7 @@
  */
 package akka.stream.stage

-import akka.stream.Supervision
-import akka.stream.FlowMaterializer
+import akka.stream.{ FlowMaterializer, OperationAttributes, Supervision }

 /**
  * General interface for stream transformation.
@@ -571,6 +570,9 @@ sealed trait Context[Out] {
    * It can be used to materialize sub-flows.
    */
   def materializer: FlowMaterializer
+
+  /** Returns operation attributes associated with this Stage */
+  def attributes: OperationAttributes
 }

 /**
@@ -643,3 +645,4 @@ trait AsyncContext[Out, Ext] extends DetachedContext[Out] {
 private[akka] trait BoundaryContext extends Context[Any] {
   def exit(): FreeDirective
 }
+
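With attributes now exposed on Context, any custom stage can observe the attributes it was materialized with, not just the new Log stage. A hypothetical sketch (TapStage and its behaviour are illustrative, not part of this commit):

import akka.stream.OperationAttributes.LogLevels
import akka.stream.stage._

// identity stage that merely checks whether a LogLevels attribute was attached
final class TapStage[T] extends PushStage[T, T] {
  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    val levels = ctx.attributes.attributes.collectFirst { case l: LogLevels ⇒ l }
    // levels is None unless .withAttributes(OperationAttributes.logLevels(...)) was applied
    ctx.push(elem)
  }
}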