created factory method

This commit is contained in:
Endre Sándor Varga 2015-07-09 13:36:54 +02:00
parent 662dd035a0
commit 47ea3fde38
13 changed files with 129 additions and 30 deletions

View file

@ -399,7 +399,9 @@ Using an Akka Streams :class:`Flow` we can transform the stream and connect thos
The :class:`Publisher` is used as an input :class:`Source` to the flow and the
:class:`Subscriber` is used as an output :class:`Sink`.
A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair:
A :class:`Flow` can also be also converted to a :class:`RunnableGraph[Processor[In, Out]]` which
materializes to a :class:`Processor` when ``run()`` is called. ``run()`` itself can be called multiple
times, resulting in a new :class:`Processor` instance each time.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#flow-publisher-subscriber
@ -429,4 +431,9 @@ by using the Subscriber-:class:`Source`:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#sink-subscriber
It is also possible to use re-wrap :class:`Processor` instances as a :class:`Flow` by
passing a factory function that will create the :class:`Processor` instances:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#use-processor
Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`.

View file

@ -4,10 +4,9 @@
package docs.stream
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{ RunnableGraph, Flow, Sink, Source }
import akka.stream.testkit._
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Processor
class ReactiveStreamsDocSpec extends AkkaSpec {
import TwitterStreamQuickstartDocSpec._
@ -78,11 +77,10 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
val storage = impl.storage
//#flow-publisher-subscriber
val (in: Subscriber[Tweet], out: Publisher[Author]) =
authors.runWith(Source.subscriber[Tweet], Sink.publisher[Author])
val processor: Processor[Tweet, Author] = authors.toProcessor.run()
tweets.subscribe(in)
out.subscribe(storage)
tweets.subscribe(processor)
processor.subscribe(storage)
//#flow-publisher-subscriber
assertResult(storage)
@ -135,4 +133,15 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
assertResult(storage)
}
"use a processor" in {
//#use-processor
// An example Processor factory
def createProcessor: Processor[Int, Int] = Flow[Int].toProcessor.run()
val flow: Flow[Int, Int, Unit] = Flow(() => createProcessor)
//#use-processor
}
}

View file

@ -394,7 +394,9 @@ Using an Akka Streams :class:`Flow` we can transform the stream and connect thos
The :class:`Publisher` is used as an input :class:`Source` to the flow and the
:class:`Subscriber` is used as an output :class:`Sink`.
A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair:
A :class:`Flow` can also be also converted to a :class:`RunnableGraph[Processor[In, Out]]` which
materializes to a :class:`Processor` when ``run()`` is called. ``run()`` itself can be called multiple
times, resulting in a new :class:`Processor` instance each time.
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#flow-publisher-subscriber
@ -424,4 +426,10 @@ by using the Subscriber-:class:`Source`:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#sink-subscriber
It is also possible to use re-wrap :class:`Processor` instances as a :class:`Flow` by
passing a factory function that will create the :class:`Processor` instances:
.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#use-processor
Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`.

View file

@ -28,11 +28,6 @@ abstract class AkkaIdentityProcessorVerification[T](env: TestEnvironment, publis
override def createFailedPublisher(): Publisher[T] =
TestPublisher.error(new Exception("Unable to serve subscribers right now!"))
def processorFromFlow(flow: Flow[T, T, _])(implicit mat: ActorMaterializer): Processor[T, T] = {
val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T], Sink.publisher[T])
processorFromSubscriberAndPublisher(sub, pub)
}
def processorFromSubscriberAndPublisher(sub: Subscriber[T], pub: Publisher[T]): Processor[T, T] = {
new Processor[T, T] {
override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s)

View file

@ -17,9 +17,8 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
implicit val materializer = ActorMaterializer(settings)(system)
processorFromFlow(
// withAttributes "wraps" the underlying identity and protects it from automatic removal
Flow[Int].andThen(Identity()).named("identity"))
Flow[Int].andThen[Int](Identity()).named("identity").toProcessor.run()
}
override def createElement(element: Int): Int = element

View file

@ -13,8 +13,7 @@ class MapTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
implicit val materializer = ActorMaterializer()(system)
processorFromFlow(
Flow[Int].map(elem elem).named("identity"))
Flow[Int].map(elem elem).named("identity").toProcessor.run()
}
override def createElement(element: Int): Int = element

View file

@ -24,7 +24,7 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] {
override def onPush(in: Int, ctx: Context[Int]) = ctx.push(in)
}
processorFromFlow(Flow[Int].transform(mkStage))
Flow[Int].transform(mkStage).toProcessor.run()
}
override def createElement(element: Int): Int = element

View file

@ -13,7 +13,7 @@ class VirtualProcessorTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
implicit val materializer = ActorMaterializer()(system)
val identity = processorFromFlow(Flow[Int].map(elem elem).named("identity"))
val identity = Flow[Int].map(elem elem).named("identity").toProcessor.run()
val left, right = new VirtualProcessor[Int]
left.subscribe(identity)
identity.subscribe(right)

View file

@ -9,6 +9,7 @@ import akka.stream.Supervision._
import akka.stream.impl.Stages.StageModule
import akka.stream.stage.Stage
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor._
import akka.stream.{ AbruptTerminationException, Attributes, ActorMaterializerSettings, ActorMaterializer }
@ -311,6 +312,19 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
subs.expectNext("6-s")
subs.expectComplete()
}
"be possible to convert to a processor, and should be able to take a Processor" in {
val identity1 = Flow[Int].toProcessor
val identity2 = Flow(() identity1.run())
Await.result(
Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head),
3.seconds) should ===(1 to 10)
// Reusable:
Await.result(
Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head),
3.seconds) should ===(1 to 10)
}
}
"A Flow with multiple subscribers (FanOutBox)" must {

View file

@ -8,6 +8,8 @@ import akka.stream._
import akka.japi.{ Util, Pair }
import akka.japi.function
import akka.stream.scaladsl
import akka.stream.scaladsl.{ Keep, Sink, Source }
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
@ -31,6 +33,9 @@ object Flow {
def create[T](): javadsl.Flow[T, T, Unit] =
adapt(scaladsl.Flow[T])
def create[I, O](processorFactory: function.Creator[Processor[I, O]]): javadsl.Flow[I, O, Unit] =
adapt(scaladsl.Flow(() processorFactory.create()))
/** Create a `Flow` which can process elements of type `T`. */
def of[T](clazz: Class[T]): javadsl.Flow[T, T, Unit] =
create[T]()
@ -72,12 +77,41 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/**
* Transform this [[Flow]] by appending the given processing steps.
*
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
*
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flows value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*/
def via[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.via(flow))
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
* +----------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~Out~> | flow | ~~> T
* | | | | | |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*/
def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
new Flow(delegate.viaMat(flow)(combinerToScala(combine)))
@ -802,6 +836,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
def log(name: String): javadsl.Flow[In, Out, Mat] =
this.log(name, javaIdentityFunction[Out], null)
/**
* Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]]
* which implements the operations encapsulated by this Flow. Every materialization results in a new Processor
* instance, i.e. the returned [[RunnableGraph]] is reusable.
*
* @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it.
*/
def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = {
new RunnableGraphAdapter(delegate.toProcessor)
}
}
/**

View file

@ -14,8 +14,7 @@ import akka.stream.Attributes._
import akka.stream._
import akka.stream.impl.{ ActorPublisherSource, StreamLayout }
import akka.util.ByteString
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.JavaConverters._
import scala.concurrent.{ Promise, Future }

View file

@ -6,12 +6,12 @@ package akka.stream.scaladsl
import akka.actor.ActorSystem
import akka.stream.impl.SplitDecision._
import akka.event.LoggingAdapter
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream._
import akka.stream.Attributes._
import akka.util.Collections.EmptyImmutableSeq
import org.reactivestreams.Processor
import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor }
import scala.annotation.implicitNotFound
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
@ -282,6 +282,26 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
Source.wrap(source).via(this).toMat(sink)(Keep.both).run()
}
/**
* Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]]
* which implements the operations encapsulated by this Flow. Every materialization results in a new Processor
* instance, i.e. the returned [[RunnableGraph]] is reusable.
*
* @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it.
*/
def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = {
Source.subscriber[In].via(this).toMat(Sink.publisher[Out])(Keep.both[Subscriber[In], Publisher[Out]])
.mapMaterializedValue {
case (sub, pub) new Processor[In, Out] {
override def onError(t: Throwable): Unit = sub.onError(t)
override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s)
override def onComplete(): Unit = sub.onComplete()
override def onNext(t: In): Unit = sub.onNext(t)
override def subscribe(s: Subscriber[_ >: Out]): Unit = pub.subscribe(s)
}
}
}
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
@ -291,6 +311,14 @@ object Flow extends FlowApply {
private def shape[I, O](name: String): FlowShape[I, O] = FlowShape(Inlet(name + ".in"), Outlet(name + ".out"))
/**
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]]
*/
def apply[I, O](processorFactory: () Processor[I, O]): Flow[I, O, Unit] = {
val untypedFactory = processorFactory.asInstanceOf[() Processor[Any, Any]]
Flow[I].andThen(DirectProcessor(() (untypedFactory(), ())))
}
/**
* Helper to create `Flow` without a [[Source]] or a [[Sink]].
* Example usage: `Flow[Int]`

View file

@ -5,11 +5,10 @@ package akka.stream.scaladsl
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.stream._
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule, DefaultAttributes }
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ }
import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective }
import org.reactivestreams.{ Publisher, Subscriber }
import org.reactivestreams._
import akka.stream.{ SourceShape, Inlet, Outlet }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
@ -18,7 +17,6 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.language.higherKinds
import akka.actor.Props
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher }
import org.reactivestreams.Publisher
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
@ -29,7 +27,6 @@ import akka.stream.impl._
import akka.actor.Cancellable
import akka.actor.ActorRef
import scala.concurrent.Promise
import org.reactivestreams.Subscriber
import akka.stream.stage.SyncDirective
import akka.stream.OverflowStrategy
import akka.stream.Attributes