Merge pull request #17953 from drewhk/wip-17891-processor-support-drewhk

+str #17891: Add direct support for RS Processors
This commit is contained in:
Martynas Mickevičius 2015-07-13 17:21:10 +03:00
commit efc659b70a
13 changed files with 129 additions and 30 deletions

View file

@ -6,12 +6,12 @@ package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.impl.SplitDecision._
import akka.event.LoggingAdapter
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream._
import akka.stream.Attributes._
import akka.util.Collections.EmptyImmutableSeq
import org.reactivestreams.Processor
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
import scala.annotation.implicitNotFound
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
@ -282,6 +282,26 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
Source.wrap(source).via(this).toMat(sink)(Keep.both).run()
}
/**
* Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]]
* which implements the operations encapsulated by this Flow. Every materialization results in a new Processor
* instance, i.e. the returned [[RunnableGraph]] is reusable.
*
* @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it.
*/
def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = {
Source.subscriber[In].via(this).toMat(Sink.publisher[Out])(Keep.both[Subscriber[In], Publisher[Out]])
.mapMaterializedValue {
case (sub, pub) new Processor[In, Out] {
override def onError(t: Throwable): Unit = sub.onError(t)
override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s)
override def onComplete(): Unit = sub.onComplete()
override def onNext(t: In): Unit = sub.onNext(t)
override def subscribe(s: Subscriber[_ >: Out]): Unit = pub.subscribe(s)
}
}
}
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
@ -291,6 +311,14 @@ object Flow extends FlowApply {
private def shape[I, O](name: String): FlowShape[I, O] = FlowShape(Inlet(name + ".in"), Outlet(name + ".out"))
/**
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]]
*/
def apply[I, O](processorFactory: () Processor[I, O]): Flow[I, O, Unit] = {
val untypedFactory = processorFactory.asInstanceOf[() Processor[Any, Any]]
Flow[I].andThen(DirectProcessor(() (untypedFactory(), ())))
}
/**
* Helper to create `Flow` without a [[Source]] or a [[Sink]].
* Example usage: `Flow[Int]`