2014-08-27 18:21:44 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.stream.impl2
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicLong
|
|
|
|
|
import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef }
|
|
|
|
|
import akka.pattern.ask
|
|
|
|
|
import org.reactivestreams.{ Processor, Publisher, Subscriber }
|
|
|
|
|
import scala.annotation.tailrec
|
|
|
|
|
import scala.collection.immutable
|
|
|
|
|
import scala.concurrent.{ Await, Future }
|
|
|
|
|
import scala.concurrent.duration._
|
|
|
|
|
import scala.util.{ Failure, Success }
|
|
|
|
|
import akka.stream.Transformer
|
|
|
|
|
import akka.stream.scaladsl2.FlowMaterializer
|
|
|
|
|
import akka.stream.MaterializerSettings
|
|
|
|
|
import akka.stream.impl.ActorPublisher
|
|
|
|
|
import akka.stream.impl.IterablePublisher
|
|
|
|
|
import akka.stream.impl.TransformProcessorImpl
|
|
|
|
|
import akka.stream.impl.ActorProcessor
|
|
|
|
|
import akka.stream.impl.ExposedPublisher
|
2014-09-01 13:12:18 +02:00
|
|
|
import akka.stream.scaladsl2.Source
|
|
|
|
|
import akka.stream.scaladsl2.Sink
|
|
|
|
|
import akka.stream.scaladsl2.MaterializedFlow
|
2014-09-02 11:01:10 +02:00
|
|
|
import akka.stream.scaladsl2.IterableSource
|
|
|
|
|
import akka.stream.impl.EmptyPublisher
|
2014-08-27 18:21:44 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Node types describing the operations of a flow before it is materialized.
 */
private[akka] object Ast {

  /**
   * A single operation of a flow. The `name` becomes part of the name of the
   * actor created for the operation during materialization.
   */
  sealed trait AstNode {
    def name: String
  }

  /**
   * A transformation step.
   *
   * @param name          label of the operation
   * @param mkTransformer factory invoked to obtain the [[akka.stream.Transformer]] that performs the step
   */
  case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * [[akka.stream.scaladsl2.FlowMaterializer]] that creates one processor actor per
 * [[Ast.AstNode]] of a flow, each of them as a child of `supervisor`.
 *
 * @param settings        settings passed on to every processor and publisher actor
 * @param supervisor      actor under which all stream actors are created
 * @param flowNameCounter counter supplying the numeric suffix of generated flow names
 * @param namePrefix      prefix of generated flow names
 */
private[akka] case class ActorBasedFlowMaterializer(
  override val settings: MaterializerSettings,
  supervisor: ActorRef,
  flowNameCounter: AtomicLong,
  namePrefix: String)
  extends FlowMaterializer(settings) {

  import akka.stream.impl2.Ast._

  /** Returns a copy of this materializer that uses `name` as prefix for generated flow names. */
  def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)

  /** Increments the shared counter and returns the new value. */
  private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()

  /** Builds the next flow name as `<namePrefix>-<count>`. */
  private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"

  /**
   * Creates a processor for every node in `ops` and wires them together.
   *
   * For each node the freshly created processor gets `topSubscriber` subscribed to it and
   * then takes over the role of `topSubscriber` for the remaining nodes.
   *
   * @param topSubscriber subscriber that the processor of the first node in `ops` will feed
   * @param ops           remaining nodes, in reverse order
   * @param flowName      name of the flow, used as prefix of the actor names
   * @param n             number used in the actor name of the first node in `ops`; decremented per node
   * @return the processor created last, or `topSubscriber` itself when `ops` is empty
   */
  @tailrec private def processorChain(topSubscriber: Subscriber[_], ops: immutable.Seq[AstNode],
                                      flowName: String, n: Int): Subscriber[_] = {
    ops match {
      // `+:` matches any non-empty Seq; `::` would only match a List and let
      // every other Seq fall through to the default case without building anything.
      case op +: tail ⇒
        val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n)
        opProcessor.subscribe(topSubscriber.asInstanceOf[Subscriber[Any]])
        processorChain(opProcessor, tail, flowName, n - 1)
      case _ ⇒ topSubscriber
    }
  }

  /**
   * Materializes the flow described by `ops` between `source` and `sink`.
   *
   * Ops come in reverse order: the head of `ops` is the node whose processor is attached
   * to the sink, and the processor returned by [[processorChain]] is attached to the source.
   * When `ops` is empty a single identity processor is attached to both ends.
   */
  override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow = {
    val flowName = createFlowName()

    // s: the subscriber handed to the source, p: the publisher handed to the sink
    val (s, p) = ops match {
      case Nil ⇒
        val passThrough = identityProcessor[Any](flowName).asInstanceOf[Processor[In, Out]]
        (passThrough, passThrough)
      case lastOp :: upstreamOps ⇒
        val opsSize = ops.size
        val outProcessor = processorForNode(lastOp, flowName, opsSize).asInstanceOf[Processor[In, Out]]
        val topSubscriber =
          processorChain(outProcessor, upstreamOps, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]]
        (topSubscriber, outProcessor)
    }

    // The source is attached before the sink.
    val sourceValue = source.attach(s, this, flowName)
    val sinkValue = sink.attach(p, this)
    new MaterializedFlow(source, sourceValue, sink, sinkValue)
  }

  /** Creates a processor running [[identityTransform]], named with step number 1. */
  private def identityProcessor[I](flowName: String): Processor[I, I] =
    processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[I, I]]

  /** Transform node that emits every incoming element unchanged. */
  private val identityTransform = Transform("identity", () ⇒
    new Transformer[Any, Any] {
      override def onNext(element: Any) = List(element)
    })

  /**
   * Creates the publisher for an iterable-backed source.
   *
   * An empty iterable yields `EmptyPublisher`; otherwise an `IterablePublisher` actor
   * named `<flowName>-0-iterable` is created and wrapped in an `ActorPublisher`.
   */
  override def materializeSource[In](source: IterableSource[In], flowName: String): Publisher[In] = {
    if (source.iterable.isEmpty) EmptyPublisher[In]
    else ActorPublisher(actorOf(IterablePublisher.props(source.iterable, settings),
      name = s"$flowName-0-iterable"), Some(source.iterable))
  }

  /** Creates the actor for `op`, named `<flowName>-<n>-<op.name>`, and wraps it in a processor. */
  private def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
    val impl = actorOf(ActorProcessorFactory.props(settings, op), s"$flowName-$n-${op.name}")
    ActorProcessorFactory(impl)
  }

  /**
   * Creates a child actor of `supervisor` with the given `props` and `name`.
   *
   * @throws IllegalStateException if `supervisor` is neither a `LocalActorRef` nor a `RepointableActorRef`
   */
  private def actorOf(props: Props, name: String): ActorRef = supervisor match {
    case ref: LocalActorRef ⇒
      ref.underlying.attachChild(props, name, systemService = false)
    case ref: RepointableActorRef ⇒
      if (ref.isStarted)
        ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false)
      else {
        // Supervisor not started yet: ask it to create the child and block for the
        // reply, waiting at most the system's configured creation timeout.
        implicit val timeout = ref.system.settings.CreationTimeout
        val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef]
        Await.result(f, timeout.duration)
      }
    case _ ⇒
      throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
  }

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Extension id through which the [[FlowNameCounter]] extension of an actor system is obtained.
 */
private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider {

  /** Returns the [[FlowNameCounter]] of `system`; delegates to `ExtensionId.get`. */
  override def get(system: ActorSystem): FlowNameCounter = super.get(system)

  /** This object is its own extension id. */
  override def lookup: FlowNameCounter.type = this

  /** Creates a fresh counter extension. */
  override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter()

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Extension holding an atomic counter that starts at zero.
 */
private[akka] class FlowNameCounter extends Extension {

  /** The counter; initial value is 0. */
  val counter: AtomicLong = new AtomicLong(0L)

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Factory and message protocol of the [[StreamSupervisor]] actor.
 */
private[akka] object StreamSupervisor {

  /** `Props` for creating a [[StreamSupervisor]] with the given settings. */
  def props(settings: MaterializerSettings): Props =
    Props(new StreamSupervisor(settings))

  /**
   * Request to create a child actor.
   *
   * @param props props of the actor to create
   * @param name  name of the actor to create
   */
  case class Materialize(props: Props, name: String)

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Actor that creates child actors on request and replies with their `ActorRef`.
 */
private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor {

  import StreamSupervisor._

  /** Handles [[StreamSupervisor.Materialize]]: creates the child, then sends its ref to the sender. */
  def receive: Receive = {
    case Materialize(childProps, childName) ⇒
      val child = context.actorOf(childProps, childName)
      sender() ! child
  }

}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Creates the `Props` and the processor facade for the actors that run [[Ast.AstNode]]s.
 */
private[akka] object ActorProcessorFactory {

  import Ast._

  /**
   * `Props` of the actor implementing `op`, configured with the dispatcher named in `settings`.
   *
   * The transformer factory is called inside the `Props(...)` creator expression.
   * NOTE(review): that creator is presumably evaluated once per actor (re)creation, so the
   * call must stay inside it — confirm against the Akka `Props` documentation.
   */
  def props(settings: MaterializerSettings, op: AstNode): Props = {
    val nodeProps = op match {
      case Transform(_, mkTransformer) ⇒
        Props(new TransformProcessorImpl(settings, mkTransformer()))
    }
    nodeProps.withDispatcher(settings.dispatcher)
  }

  /**
   * Wraps `impl` in an `ActorProcessor` and announces that processor to the actor
   * with an `ExposedPublisher` message before returning it.
   */
  def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
    val processor = new ActorProcessor[I, O](impl)
    impl ! ExposedPublisher(processor.asInstanceOf[ActorPublisher[Any]])
    processor
  }

}
|