Merge pull request #15701 from akka/wip-15564-stream-supervisor-patriknw

!str #15564 Add stream supervisor
This commit is contained in:
Patrik Nordwall 2014-08-21 13:31:25 +02:00
commit fa243e656b
12 changed files with 97 additions and 392 deletions

View file

@ -72,7 +72,7 @@ private object PersistentPublisher {
private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends PublisherNode[Persistent] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Persistent] =
ActorPublisher[Persistent](materializer.context.actorOf(PersistentPublisher.props(processorId, publisherSettings, materializer.settings),
ActorPublisher[Persistent](materializer.actorOf(PersistentPublisher.props(processorId, publisherSettings, materializer.settings),
name = s"$flowName-0-persistentPublisher"))
}

View file

@ -10,6 +10,10 @@ import akka.stream.impl.Ast
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.duration._
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem
import akka.actor.ActorContext
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.FlowNameCounter
object FlowMaterializer {
@ -24,8 +28,21 @@ object FlowMaterializer {
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer =
new ActorBasedFlowMaterializer(settings, context, namePrefix.getOrElse("flow"))
def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
val system = context match {
case s: ExtendedActorSystem s
case c: ActorContext c.system
case null throw new IllegalArgumentException("ActorRefFactory context must be defined")
case _ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " +
"got [${_contex.getClass.getName}]")
}
new ActorBasedFlowMaterializer(
settings,
context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)),
FlowNameCounter(system).counter,
namePrefix.getOrElse("flow"))
}
/**
* Java API: Creates a FlowMaterializer which will execute every step of a transformation

View file

@ -13,24 +13,6 @@ import scala.concurrent.duration.FiniteDuration
*/
object Implicits {
/**
* Implicit enrichment for stream logging.
*
* @see [[Log]]
*/
implicit class LogFlowDsl[T](val flow: Flow[T]) extends AnyVal {
def log(): Flow[T] = flow.transform(Log())
}
/**
* Implicit enrichment for stream logging.
*
* @see [[Log]]
*/
implicit class LogDuctDsl[In, Out](val duct: Duct[In, Out]) extends AnyVal {
def log(): Duct[In, Out] = duct.transform(Log())
}
/**
* Provides time measurement utilities on Stream elements.
*

View file

@ -1,99 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.extra
import scala.collection.immutable
import akka.stream.Transformer
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.actor.ActorContext
import akka.event.LoggingAdapter
import akka.event.Logging
/**
* Scala API: Mix in TransformerLogging into your [[akka.stream.Transformer]]
* to obtain a reference to a logger, which is available under the name [[#log]].
*/
trait TransformerLogging { this: Transformer[_, _]
private def context = ActorBasedFlowMaterializer.currentActorContext()
private var _log: LoggingAdapter = _
def log: LoggingAdapter = {
// only used in Actor, i.e. thread safe
if (_log eq null)
_log = Logging(context.system, context.self)
_log
}
}
object Log {
def apply[T](): Log[T] = new Log[T]
def apply[T](name: String): Log[T] = new Log[T](name)
}
/**
* Logs the elements, error and completion of a a flow.
*
* By default it logs `onNext` and `onComplete` at info log
* level, and `onError` at error log level. Subclass may customize
* the logging by overriding [[#logOnNext]], [[#logOnComplete]] and
* [[#logOnError]].
*
* The `logSource` of the [[akka.event.Logging.LogEvent]] is the path of
* the actor processing this step in the flow. It contains the
* flow name and the [[#name]] of this `Transformer`. The
* name can be customized with the [[#name]] constructor parameter.
*
* The [[akka.event.LoggingAdapter]] is accessible
* under the name `log`.
*
* Usage:
* {{{
* Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") {
* override def logOnNext(i: Int): Unit =
* log.debug("Got element {}", i)
* }).
* consume(materializer)
* }}}
*
* Or with the implicit enrichment:
* {{{
* import akka.stream.extra.Implicits._
* Flow(List(1, 2, 3)).log().consume(materializer)
* }}}
*
*/
class Log[T](override val name: String = "log") extends Transformer[T, T] with TransformerLogging {
final def onNext(element: T): immutable.Seq[T] = {
logOnNext(element)
List(element)
}
def logOnNext(element: T): Unit = {
log.info("OnNext: [{}]", element)
}
final override def onTermination(e: Option[Throwable]): immutable.Seq[T] = {
logOnComplete()
Nil
}
def logOnComplete(): Unit = {
log.info("OnComplete")
}
final override def onError(cause: Throwable): Unit = {
logOnError(cause)
throw cause
}
def logOnError(cause: Throwable): Unit = {
log.error(cause, "OnError")
}
final override def isComplete: Boolean = false
}

View file

@ -21,6 +21,16 @@ import akka.actor.ActorSystem
import akka.actor.Extension
import scala.concurrent.duration.FiniteDuration
import akka.stream.TimerTransformer
import akka.actor.Props
import akka.actor.Actor
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor.LocalActorRef
import akka.actor.RepointableActorRef
import akka.actor.ActorCell
/**
* INTERNAL API
@ -84,36 +94,36 @@ private[akka] object Ast {
final case class IteratorPublisherNode[I](iterator: Iterator[I]) extends PublisherNode[I] {
final def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterator.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
else ActorPublisher[I](materializer.context.actorOf(IteratorPublisher.props(iterator, materializer.settings),
else ActorPublisher[I](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
name = s"$flowName-0-iterator"))
}
final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
else ActorPublisher[I](materializer.context.actorOf(IterablePublisher.props(iterable, materializer.settings),
else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable"), Some(iterable))
}
final case class ThunkPublisherNode[I](f: () I) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
ActorPublisher[I](materializer.context.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
ActorPublisher[I](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
name = s"$flowName-0-thunk"))
}
final case class FuturePublisherNode[I](future: Future[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
future.value match {
case Some(Success(element))
ActorPublisher[I](materializer.context.actorOf(IterablePublisher.props(List(element), materializer.settings),
ActorPublisher[I](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings),
name = s"$flowName-0-future"), Some(future))
case Some(Failure(t))
ErrorPublisher(t).asInstanceOf[Publisher[I]]
case None
ActorPublisher[I](materializer.context.actorOf(FuturePublisher.props(future, materializer.settings),
ActorPublisher[I](materializer.actorOf(FuturePublisher.props(future, materializer.settings),
name = s"$flowName-0-future"), Some(future))
}
}
final case class TickPublisherNode[I](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () I) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
ActorPublisher[I](materializer.context.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
ActorPublisher[I](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
name = s"$flowName-0-tick"))
}
}
@ -121,60 +131,18 @@ private[akka] object Ast {
/**
* INTERNAL API
*/
private[akka] object ActorBasedFlowMaterializer {
val ctx = new ThreadLocal[ActorRefFactory]
def withCtx[T](arf: ActorRefFactory)(block: T): T = {
val old = ctx.get()
ctx.set(arf)
try block
finally ctx.set(old)
}
def currentActorContext(): ActorContext =
ActorBasedFlowMaterializer.ctx.get() match {
case c: ActorContext c
case _
throw new IllegalStateException(s"Transformer [${getClass.getName}] is running without ActorContext")
}
}
/**
* INTERNAL API
*/
private[akka] class ActorBasedFlowMaterializer(
settings: MaterializerSettings,
_context: ActorRefFactory,
private[akka] case class ActorBasedFlowMaterializer(
override val settings: MaterializerSettings,
supervisor: ActorRef,
flowNameCounter: AtomicLong,
namePrefix: String)
extends FlowMaterializer(settings) {
import Ast._
import ActorBasedFlowMaterializer._
_context match {
case _: ActorSystem | _: ActorContext // ok
case null throw new IllegalArgumentException("ActorRefFactory context must be defined")
case _ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " +
"got [${_contex.getClass.getName}]")
}
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
def context = ctx.get() match {
case null _context
case x x
}
def withNamePrefix(name: String): FlowMaterializer =
new ActorBasedFlowMaterializer(settings, _context, name)
private def system: ActorSystem = _context match {
case s: ExtendedActorSystem s
case c: ActorContext c.system
case _
throw new IllegalArgumentException(s"Unknown ActorRefFactory [${_context.getClass.getName}")
}
private def nextFlowNameCount(): Long = FlowNameCounter(system).counter.incrementAndGet()
private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
@ -207,9 +175,26 @@ private[akka] class ActorBasedFlowMaterializer(
override def onNext(element: Any) = List(element)
})
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] =
ActorProcessor(context.actorOf(ActorProcessor.props(settings, op),
name = s"$flowName-$n-${op.name}"))
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessor.props(settings, op), s"$flowName-$n-${op.name}")
ActorProcessor(impl)
}
def actorOf(props: Props, name: String): ActorRef = supervisor match {
case ref: LocalActorRef
ref.underlying.attachChild(props, name, systemService = false)
case ref: RepointableActorRef
if (ref.isStarted)
ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false)
else {
implicit val timeout = ref.system.settings.CreationTimeout
val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef]
Await.result(f, timeout.duration)
}
case _
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
}
override def ductProduceTo[In, Out](subscriber: Subscriber[Out], ops: List[Ast.AstNode]): Subscriber[In] =
processorChain(subscriber, ops, createFlowName(), ops.size).asInstanceOf[Subscriber[In]]
@ -243,4 +228,23 @@ private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with E
*/
private[akka] class FlowNameCounter extends Extension {
val counter = new AtomicLong(0)
}
/**
* INTERNAL API
*/
private[akka] object StreamSupervisor {
def props(settings: MaterializerSettings): Props = Props(new StreamSupervisor(settings))
case class Materialize(props: Props, name: String)
}
private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor {
import StreamSupervisor._
def receive = {
case Materialize(props, name)
val impl = context.actorOf(props, name)
sender() ! impl
}
}

View file

@ -194,7 +194,7 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
if (demand > 0) {
try {
demand -= 1
pushToDownstream(withCtx(context)(f()))
pushToDownstream(f())
if (demand > 0) self ! Generate
} catch {
case Stop { completeDownstream(); shutdownReason = None }

View file

@ -95,7 +95,7 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting
if (subscribers(subscriber))
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice"))
else {
val iterator = withCtx(context)(iterable.iterator)
val iterator = iterable.iterator
val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber,
settings.maximumInputBufferSize).withDispatcher(context.props.dispatcher)))
val subscription = new BasicActorSubscription(worker)
@ -155,7 +155,7 @@ private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber:
@tailrec def doPush(n: Int): Unit =
if (demand > 0) {
demand -= 1
val hasNext = withCtx(context) {
val hasNext = {
subscriber.onNext(iterator.next())
iterator.hasNext
}

View file

@ -72,21 +72,19 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
def active: Receive = {
case Tick
ActorBasedFlowMaterializer.withCtx(context) {
try {
val tickElement = tick()
demand foreach {
case (subscriber, d)
if (d > 0) {
demand(subscriber) = d - 1
subscriber.onNext(tickElement)
}
}
} catch {
case NonFatal(e)
// tick closure throwed => onError downstream
demand foreach { case (subscriber, _) subscriber.onError(e) }
try {
val tickElement = tick()
demand foreach {
case (subscriber, d)
if (d > 0) {
demand(subscriber) = d - 1
subscriber.onNext(tickElement)
}
}
} catch {
case NonFatal(e)
// tick closure throwed => onError downstream
demand foreach { case (subscriber, _) subscriber.onError(e) }
}
case RequestMore(elements, subscriber)

View file

@ -158,7 +158,7 @@ private[akka] trait Pump {
// Generate upstream requestMore for every Nth consumed input element
final def pump(): Unit = {
try while (transferState.isExecutable) {
ActorBasedFlowMaterializer.withCtx(pumpContext)(currentAction())
currentAction()
} catch { case NonFatal(e) pumpFailed(e) }
if (isPumpFinished) pumpFinished()

View file

@ -1,48 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
import akka.actor.ActorContext
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import scala.collection.immutable.TreeSet
import scala.util.control.NonFatal
import akka.stream.impl.ActorBasedFlowMaterializer
class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") {
val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
def self = ActorBasedFlowMaterializer.currentActorContext().self
"An ActorBasedFlowMaterializer" must {
"generate the right level of descendants" in {
val f = Flow(() {
testActor ! self
Flow(List(1)).map(x { testActor ! self; x }).toPublisher(materializer)
}).take(3).foreach(x {
testActor ! self
Flow(x).foreach(_ testActor ! self, materializer)
}, materializer)
Await.result(f, 3.seconds)
val refs = receiveWhile(idle = 250.millis) {
case r: ActorRef r
}
try {
refs.toSet.size should be(8)
refs.distinct.map(_.path.elements.size).groupBy(x x).mapValues(x x.size) should be(Map(2 -> 2, 3 -> 6))
} catch {
case NonFatal(e)
println(refs.map(_.toString).to[TreeSet].mkString("\n"))
throw e
}
}
}
}

View file

@ -1,72 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
import akka.actor.ActorContext
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import scala.collection.immutable.TreeSet
import scala.util.control.NonFatal
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.FlowNameCounter
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ProcessorNamingSpec extends AkkaSpec("akka.loglevel=INFO") {
def self = ActorBasedFlowMaterializer.currentActorContext().self
def flowCount = FlowNameCounter(system).counter.get
"Processors of a flow" must {
"have sensible default names for flow with one step" in {
val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
Flow(List(1)).map(in { testActor ! self; in }).consume(materializer)
expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map")
}
"have sensible default names for flow with several steps" in {
val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
Flow(List(1)).
map(in { testActor ! self; in }).
transform(new Transformer[Int, Int] {
override def onNext(in: Int) = { testActor ! self; List(in) }
}).
filter(_ { testActor ! self; true }).
consume(materializer)
expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-1-map")
expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-2-transform")
expectMsgType[ActorRef].path.name should be(s"flow-$flowCount-3-filter")
}
"use specified flow namePrefix in materializer" in {
val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"),
namePrefix = Some("myflow"))
Flow(List(1)).map(in { testActor ! self; in }).consume(materializer)
expectMsgType[ActorRef].path.name should be(s"myflow-$flowCount-1-map")
}
"use specified withNamePrefix in materializer" in {
val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
Flow(List(2)).map(in { testActor ! self; in }).consume(materializer.withNamePrefix("myotherflow"))
expectMsgType[ActorRef].path.name should be(s"myotherflow-$flowCount-1-map")
}
"create unique name for each materialization" in {
val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"),
namePrefix = Some("myflow"))
val flow = Flow(List(1)).map(in { testActor ! self; in })
flow.consume(materializer)
val name1 = expectMsgType[ActorRef].path.name
flow.consume(materializer)
val name2 = expectMsgType[ActorRef].path.name
name1 should not be (name2)
}
}
}

View file

@ -1,77 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.extra
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.testkit.EventFilter
import akka.stream.impl.FlowNameCounter
object LogSpec {
class TestException extends IllegalArgumentException("simulated err") with NoStackTrace
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LogSpec extends AkkaSpec("akka.loglevel=INFO") {
import LogSpec._
val materializer = FlowMaterializer(MaterializerSettings(dispatcher = "akka.test.stream-dispatcher"))
def flowCount = FlowNameCounter(system).counter.get
def nextFlowCount = flowCount + 1
"Log Transformer" must {
"log onNext elements" in {
EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", pattern = """OnNext: \[[1|2|3]\]""", occurrences = 3) intercept {
Flow(List(1, 2, 3)).
transform(Log()).
consume(materializer)
}
}
"log onComplete" in {
EventFilter.info(source = s"akka://LogSpec/user/flow-$nextFlowCount-1-log", message = "OnComplete", occurrences = 1) intercept {
Flow(Nil).
transform(Log()).
consume(materializer)
}
}
"log onError exception" in {
// FIXME the "failure during processing" occurrence comes from ActorProcessImpl#fail, and will probably be removed
EventFilter[TestException](source = s"akka://LogSpec/user/flow-$nextFlowCount-2-mylog",
pattern = "[OnError: simulated err|failure during processing]", occurrences = 2) intercept {
Flow(List(1, 2, 3)).
map(i if (i == 2) throw new TestException else i).
transform(Log(name = "mylog")).
consume(materializer)
}
}
"have type inference" in {
val f1: Flow[Int] = Flow(List(1, 2, 3)).transform(Log[Int])
val f2: Flow[Int] = Flow(List(1, 2, 3)).transform(Log())
val f3: Flow[String] = Flow(List(1, 2, 3)).transform(Log[Int]).map((i: Int) i.toString).transform(Log[String])
val f4: Flow[String] = Flow(List(1, 2, 3)).transform(Log()).map((i: Int) i.toString).transform(Log())
val f5: Flow[String] =
Flow(List(1, 2, 3)).transform(new Log[Int](name = "mylog") {
override def logOnNext(i: Int): Unit =
log.debug("Got element {}", i)
}).map((i: Int) i.toString)
}
"have nice DSL" in {
import akka.stream.extra.Implicits._
val f: Flow[String] = Flow(List(1, 2, 3)).log().map((i: Int) i.toString).log()
}
}
}