!str #16039 Remove old scaladsl, rename scaladsl2

* and impl2, testkit2
* keeping io2 for now
This commit is contained in:
Patrik Nordwall 2014-10-27 14:35:41 +01:00
parent 5562ceb94b
commit 7c0c618791
221 changed files with 1540 additions and 7985 deletions

View file

@ -5,16 +5,29 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import akka.actor.{ Actor, ActorCell, ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider, LocalActorRef, Props, RepointableActorRef }
import akka.pattern.ask
import akka.stream._
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import akka.actor.Actor
import akka.actor.ActorCell
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.LocalActorRef
import akka.actor.Props
import akka.actor.RepointableActorRef
import akka.actor.SupervisorStrategy
import akka.stream.{ FlowMaterializer, MaterializerSettings, OverflowStrategy, TimerTransformer, Transformer }
import akka.stream.MaterializationException
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Zip.ZipAs
import akka.stream.scaladsl._
import akka.pattern.ask
import org.reactivestreams.{ Processor, Publisher, Subscriber }
/**
* INTERNAL API
@ -24,35 +37,30 @@ private[akka] object Ast {
def name: String
}
case class FanoutBox(initialBufferSize: Int, maximumBufferSize: Int) extends AstNode {
override def name = "fanoutBox"
}
case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode
case class TimerTransform(name: String, mkTransformer: () TimerTransformer[Any, Any]) extends AstNode
case class MapFuture(f: Any Future[Any]) extends AstNode {
override def name = "mapFuture"
case class MapAsync(f: Any Future[Any]) extends AstNode {
override def name = "mapAsync"
}
case class MapAsyncUnordered(f: Any Future[Any]) extends AstNode {
override def name = "mapAsyncUnordered"
}
case class GroupBy(f: Any Any) extends AstNode {
override def name = "groupBy"
}
case class SplitWhen(p: Any Boolean) extends AstNode {
override def name = "splitWhen"
}
case class Merge(other: Publisher[Any]) extends AstNode {
override def name = "merge"
}
case class Zip(other: Publisher[Any]) extends AstNode {
override def name = "zip"
}
case class Concat(next: Publisher[Any]) extends AstNode {
override def name = "concat"
}
case class Broadcast(other: Subscriber[Any]) extends AstNode {
override def name = "broadcast"
}
case class PrefixAndTail(n: Int) extends AstNode {
override def name = "prefixAndTail"
}
case class SplitWhen(p: Any Boolean) extends AstNode {
override def name = "splitWhen"
}
case object ConcatAll extends AstNode {
override def name = "concatFlatten"
}
@ -69,61 +77,58 @@ private[akka] object Ast {
override def name = "buffer"
}
trait PublisherNode[I] {
private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I]
sealed trait JunctionAstNode {
def name: String
}
final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher
// FIXME: Try to eliminate these
sealed trait FanInAstNode extends JunctionAstNode
sealed trait FanOutAstNode extends JunctionAstNode
case object Merge extends FanInAstNode {
override def name = "merge"
}
final case class IteratorPublisherNode[I](iterator: Iterator[I]) extends PublisherNode[I] {
final def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterator.isEmpty) EmptyPublisher[I]
else ActorPublisher[I](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
name = s"$flowName-0-iterator"))
case object MergePreferred extends FanInAstNode {
override def name = "mergePreferred"
}
final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterable.isEmpty) EmptyPublisher[I]
else ActorPublisher[I](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable"), Some(iterable))
case object Broadcast extends FanOutAstNode {
override def name = "broadcast"
}
final case class ThunkPublisherNode[I](f: () I) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
ActorPublisher[I](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
name = s"$flowName-0-thunk"))
case object Balance extends FanOutAstNode {
override def name = "balance"
}
final case class FuturePublisherNode[I](future: Future[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
future.value match {
case Some(Success(element))
ActorPublisher[I](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings),
name = s"$flowName-0-future"), Some(future))
case Some(Failure(t))
ErrorPublisher(t).asInstanceOf[Publisher[I]]
case None
ActorPublisher[I](materializer.actorOf(FuturePublisher.props(future, materializer.settings),
name = s"$flowName-0-future"), Some(future))
}
final case class Zip(as: ZipAs) extends FanInAstNode {
override def name = "zip"
}
final case class TickPublisherNode[I](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () I) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
ActorPublisher[I](materializer.actorOf(TickPublisher.props(initialDelay, interval, tick, materializer.settings),
name = s"$flowName-0-tick"))
case object Unzip extends FanOutAstNode {
override def name = "unzip"
}
case object Concat extends FanInAstNode {
override def name = "concat"
}
case class FlexiMergeNode(merger: FlexiMerge[Any]) extends FanInAstNode {
override def name = merger.name.getOrElse("")
}
}
/**
* INTERNAL API
*/
private[akka] case class ActorBasedFlowMaterializer(
override val settings: MaterializerSettings,
supervisor: ActorRef,
flowNameCounter: AtomicLong,
namePrefix: String)
case class ActorBasedFlowMaterializer(override val settings: MaterializerSettings,
supervisor: ActorRef,
flowNameCounter: AtomicLong,
namePrefix: String)
extends FlowMaterializer(settings) {
import akka.stream.impl.Ast._
import Ast.AstNode
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
@ -143,26 +148,67 @@ private[akka] case class ActorBasedFlowMaterializer(
}
// Ops come in reverse order
override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = {
override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedMap = {
val flowName = createFlowName()
if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]]
else {
val opsSize = ops.size
val opProcessor = processorForNode(ops.head, flowName, opsSize)
val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1)
publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]])
opProcessor.asInstanceOf[Publisher[O]]
def throwUnknownType(typeName: String, s: Any): Nothing =
throw new MaterializationException(s"unknown $typeName type " + s.getClass)
def attachSink(pub: Publisher[Out]) = sink match {
case s: ActorFlowSink[Out] s.attach(pub, this, flowName)
case s throwUnknownType("Sink", s)
}
def attachSource(sub: Subscriber[In]) = source match {
case s: ActorFlowSource[In] s.attach(sub, this, flowName)
case s throwUnknownType("Source", s)
}
def createSink() = sink match {
case s: ActorFlowSink[In] s.create(this, flowName)
case s throwUnknownType("Sink", s)
}
def createSource() = source match {
case s: ActorFlowSource[Out] s.create(this, flowName)
case s throwUnknownType("Source", s)
}
def isActive(s: AnyRef) = s match {
case s: ActorFlowSource[_] s.isActive
case s: ActorFlowSink[_] s.isActive
case s: Source[_] throwUnknownType("Source", s)
case s: Sink[_] throwUnknownType("Sink", s)
}
val (sourceValue, sinkValue) =
if (ops.isEmpty) {
if (isActive(sink)) {
val (sub, value) = createSink()
(attachSource(sub), value)
} else if (isActive(source)) {
val (pub, value) = createSource()
(value, attachSink(pub))
} else {
val id: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]]
(attachSource(id), attachSink(id))
}
} else {
val opsSize = ops.size
val last = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[Any, Out]]
val first = processorChain(last, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Any]]
(attachSource(first), attachSink(last))
}
new MaterializedPipe(source, sourceValue, sink, sinkValue)
}
private val identityTransform = Transform("identity", ()
private val identityTransform = Ast.Transform("identity", ()
new Transformer[Any, Any] {
override def onNext(element: Any) = List(element)
})
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessor.props(settings, op), s"$flowName-$n-${op.name}")
ActorProcessor(impl)
/**
* INTERNAL API
*/
private[akka] def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}")
ActorProcessorFactory(impl)
}
def actorOf(props: Props, name: String): ActorRef = supervisor match {
@ -180,20 +226,49 @@ private[akka] case class ActorBasedFlowMaterializer(
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
}
override def ductProduceTo[In, Out](subscriber: Subscriber[Out], ops: List[Ast.AstNode]): Subscriber[In] =
processorChain(subscriber, ops, createFlowName(), ops.size).asInstanceOf[Subscriber[In]]
override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Subscriber[In], Publisher[Out]) = {
override def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = {
val flowName = createFlowName()
if (ops.isEmpty) {
val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]]
(identityProcessor, identityProcessor)
} else {
val opsSize = ops.size
val outProcessor = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[In, Out]]
val topSubscriber = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]]
(topSubscriber, outProcessor)
val actorName = s"$flowName-${op.name}"
op match {
case fanin: Ast.FanInAstNode
val impl = op match {
case Ast.Merge
actorOf(FairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.MergePreferred
actorOf(UnfairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName)
case zip: Ast.Zip
actorOf(Zip.props(settings, zip.as).withDispatcher(settings.dispatcher), actorName)
case Ast.Concat
actorOf(Concat.props(settings).withDispatcher(settings.dispatcher), actorName)
case Ast.FlexiMergeNode(merger)
actorOf(FlexiMergeImpl.props(settings, inputCount, merger.createMergeLogic()).
withDispatcher(settings.dispatcher), actorName)
}
val publisher = new ActorPublisher[Out](impl, equalityValue = None)
impl ! ExposedPublisher(publisher.asInstanceOf[ActorPublisher[Any]])
val subscribers = Vector.tabulate(inputCount)(FanIn.SubInput[In](impl, _))
(subscribers, List(publisher))
case fanout: Ast.FanOutAstNode
val impl = op match {
case Ast.Broadcast
actorOf(Broadcast.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.Balance
actorOf(Balance.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.Unzip
actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName)
}
val publishers = Vector.tabulate(outputCount)(id new ActorPublisher[Out](impl, equalityValue = None) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
})
impl ! FanOut.ExposedPublishers(publishers.asInstanceOf[immutable.Seq[ActorPublisher[Any]]])
val subscriber = ActorSubscriber[In](impl)
(List(subscriber), publishers)
}
}
}
@ -226,9 +301,41 @@ private[akka] object StreamSupervisor {
private[akka] class StreamSupervisor(settings: MaterializerSettings) extends Actor {
import StreamSupervisor._
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = {
case Materialize(props, name)
val impl = context.actorOf(props, name)
sender() ! impl
}
}
}
/**
* INTERNAL API
*/
private[akka] object ActorProcessorFactory {
import Ast._
def props(materializer: FlowMaterializer, op: AstNode): Props = {
val settings = materializer.settings
(op match {
case t: Transform Props(new TransformProcessorImpl(settings, t.mkTransformer()))
case t: TimerTransform Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer()))
case m: MapAsync Props(new MapAsyncProcessorImpl(settings, m.f))
case m: MapAsyncUnordered Props(new MapAsyncUnorderedProcessorImpl(settings, m.f))
case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f))
case tt: PrefixAndTail Props(new PrefixAndTailImpl(settings, tt.n))
case s: SplitWhen Props(new SplitWhenProcessorImpl(settings, s.p))
case ConcatAll Props(new ConcatAllImpl(materializer))
case cf: Conflate Props(new ConflateImpl(settings, cf.seed, cf.aggregate))
case ex: Expand Props(new ExpandImpl(settings, ex.seed, ex.extrapolate))
case bf: Buffer Props(new BufferImpl(settings, bf.size, bf.overflowStrategy))
}).withDispatcher(settings.dispatcher)
}
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
val p = new ActorProcessor[I, O](impl)
impl ! ExposedPublisher(p.asInstanceOf[ActorPublisher[Any]])
p
}
}