!str: #15474: Migrate to reactive-streams 0.4.0.M1

This commit is contained in:
Endre Sándor Varga 2014-07-22 12:21:53 +02:00
parent 5b13266431
commit d6fbadc61e
120 changed files with 2330 additions and 2674 deletions

View file

@ -5,8 +5,7 @@ package akka.stream.impl
import scala.annotation.tailrec
import scala.collection.immutable
import org.reactivestreams.api.{ Consumer, Processor, Producer }
import org.reactivestreams.spi.Subscriber
import org.reactivestreams.{ Publisher, Subscriber, Processor }
import akka.actor.ActorRefFactory
import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer, Transformer }
import scala.util.Try
@ -20,7 +19,6 @@ import akka.actor.ExtensionId
import akka.actor.ExtendedActorSystem
import akka.actor.ActorSystem
import akka.actor.Extension
import akka.stream.actor.ActorConsumer
import scala.concurrent.duration.FiniteDuration
import akka.stream.TimerTransformer
@ -44,16 +42,16 @@ private[akka] object Ast {
case class SplitWhen(p: Any Boolean) extends AstNode {
override def name = "splitWhen"
}
case class Merge(other: Producer[Any]) extends AstNode {
case class Merge(other: Publisher[Any]) extends AstNode {
override def name = "merge"
}
case class Zip(other: Producer[Any]) extends AstNode {
case class Zip(other: Publisher[Any]) extends AstNode {
override def name = "zip"
}
case class Concat(next: Producer[Any]) extends AstNode {
case class Concat(next: Publisher[Any]) extends AstNode {
override def name = "concat"
}
case class Tee(other: Consumer[Any]) extends AstNode {
case class Tee(other: Subscriber[Any]) extends AstNode {
override def name = "tee"
}
case class PrefixAndTail(n: Int) extends AstNode {
@ -75,47 +73,47 @@ private[akka] object Ast {
override def name = "buffer"
}
trait ProducerNode[I] {
private[akka] def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I]
trait PublisherNode[I] {
private[akka] def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I]
}
final case class ExistingProducer[I](producer: Producer[I]) extends ProducerNode[I] {
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String) = producer
final case class ExistingPublisher[I](publisher: Publisher[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String) = publisher
}
final case class IteratorProducerNode[I](iterator: Iterator[I]) extends ProducerNode[I] {
final def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
if (iterator.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
else new ActorProducer[I](materializer.context.actorOf(IteratorProducer.props(iterator, materializer.settings),
final case class IteratorPublisherNode[I](iterator: Iterator[I]) extends PublisherNode[I] {
final def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterator.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
else ActorPublisher[I](materializer.context.actorOf(IteratorPublisher.props(iterator, materializer.settings),
name = s"$flowName-0-iterator"))
}
final case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] {
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
else new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(iterable, materializer.settings),
final case class IterablePublisherNode[I](iterable: immutable.Iterable[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
if (iterable.isEmpty) EmptyPublisher.asInstanceOf[Publisher[I]]
else ActorPublisher[I](materializer.context.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable"), Some(iterable))
}
final case class ThunkProducerNode[I](f: () I) extends ProducerNode[I] {
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
new ActorProducer(materializer.context.actorOf(ActorProducer.props(materializer.settings, f),
final case class ThunkPublisherNode[I](f: () I) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
ActorPublisher[I](materializer.context.actorOf(SimpleCallbackPublisher.props(materializer.settings, f),
name = s"$flowName-0-thunk"))
}
final case class FutureProducerNode[I](future: Future[I]) extends ProducerNode[I] {
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
final case class FuturePublisherNode[I](future: Future[I]) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
future.value match {
case Some(Success(element))
new ActorProducer[I](materializer.context.actorOf(IterableProducer.props(List(element), materializer.settings),
ActorPublisher[I](materializer.context.actorOf(IterablePublisher.props(List(element), materializer.settings),
name = s"$flowName-0-future"), Some(future))
case Some(Failure(t))
ErrorProducer(t).asInstanceOf[Producer[I]]
ErrorPublisher(t).asInstanceOf[Publisher[I]]
case None
new ActorProducer[I](materializer.context.actorOf(FutureProducer.props(future, materializer.settings),
ActorPublisher[I](materializer.context.actorOf(FuturePublisher.props(future, materializer.settings),
name = s"$flowName-0-future"), Some(future))
}
}
final case class TickProducerNode[I](interval: FiniteDuration, tick: () I) extends ProducerNode[I] {
def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] =
new ActorProducer(materializer.context.actorOf(TickProducer.props(interval, tick, materializer.settings),
final case class TickPublisherNode[I](interval: FiniteDuration, tick: () I) extends PublisherNode[I] {
def createPublisher(materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[I] =
ActorPublisher[I](materializer.context.actorOf(TickPublisher.props(interval, tick, materializer.settings),
name = s"$flowName-0-tick"))
}
}
@ -180,48 +178,43 @@ private[akka] class ActorBasedFlowMaterializer(
private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
@tailrec private def processorChain(topConsumer: Consumer[_], ops: immutable.Seq[AstNode],
flowName: String, n: Int): Consumer[_] = {
@tailrec private def processorChain(topSubscriber: Subscriber[_], ops: immutable.Seq[AstNode],
flowName: String, n: Int): Subscriber[_] = {
ops match {
case op :: tail
val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n)
opProcessor.produceTo(topConsumer.asInstanceOf[Consumer[Any]])
opProcessor.subscribe(topSubscriber.asInstanceOf[Subscriber[Any]])
processorChain(opProcessor, tail, flowName, n - 1)
case _ topConsumer
case _ topSubscriber
}
}
// Ops come in reverse order
override def toProducer[I, O](producerNode: ProducerNode[I], ops: List[AstNode]): Producer[O] = {
override def toPublisher[I, O](publisherNode: PublisherNode[I], ops: List[AstNode]): Publisher[O] = {
val flowName = createFlowName()
if (ops.isEmpty) producerNode.createProducer(this, flowName).asInstanceOf[Producer[O]]
if (ops.isEmpty) publisherNode.createPublisher(this, flowName).asInstanceOf[Publisher[O]]
else {
val opsSize = ops.size
val opProcessor = processorForNode(ops.head, flowName, opsSize)
val topConsumer = processorChain(opProcessor, ops.tail, flowName, opsSize - 1)
producerNode.createProducer(this, flowName).produceTo(topConsumer.asInstanceOf[Consumer[I]])
opProcessor.asInstanceOf[Producer[O]]
val topSubscriber = processorChain(opProcessor, ops.tail, flowName, opsSize - 1)
publisherNode.createPublisher(this, flowName).subscribe(topSubscriber.asInstanceOf[Subscriber[I]])
opProcessor.asInstanceOf[Publisher[O]]
}
}
private val blackholeTransform = Transform(
new Transformer[Any, Any] {
override def onNext(element: Any) = Nil
})
private val identityTransform = Transform(
new Transformer[Any, Any] {
override def onNext(element: Any) = List(element)
})
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] =
new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op),
ActorProcessor(context.actorOf(ActorProcessor.props(settings, op),
name = s"$flowName-$n-${op.name}"))
override def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] =
processorChain(consumer, ops, createFlowName(), ops.size).asInstanceOf[Consumer[In]]
override def ductProduceTo[In, Out](subscriber: Subscriber[Out], ops: List[Ast.AstNode]): Subscriber[In] =
processorChain(subscriber, ops, createFlowName(), ops.size).asInstanceOf[Subscriber[In]]
override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Consumer[In], Producer[Out]) = {
override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Subscriber[In], Publisher[Out]) = {
val flowName = createFlowName()
if (ops.isEmpty) {
val identityProcessor: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]]
@ -229,8 +222,8 @@ private[akka] class ActorBasedFlowMaterializer(
} else {
val opsSize = ops.size
val outProcessor = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[In, Out]]
val topConsumer = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]]
(topConsumer, outProcessor)
val topSubscriber = processorChain(outProcessor, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Out]]
(topSubscriber, outProcessor)
}
}