diff --git a/akka-docs-dev/rst/java/stream-integrations.rst b/akka-docs-dev/rst/java/stream-integrations.rst index afc1a3e5c7..5bee154450 100644 --- a/akka-docs-dev/rst/java/stream-integrations.rst +++ b/akka-docs-dev/rst/java/stream-integrations.rst @@ -401,7 +401,9 @@ Using an Akka Streams :class:`Flow` we can transform the stream and connect thos The :class:`Publisher` is used as an input :class:`Source` to the flow and the :class:`Subscriber` is used as an output :class:`Sink`. -A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair: +A :class:`Flow` can also be also converted to a :class:`RunnableGraph[Processor[In, Out]]` which +materializes to a :class:`Processor` when ``run()`` is called. ``run()`` itself can be called multiple +times, resulting in a new :class:`Processor` instance each time. .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#flow-publisher-subscriber @@ -431,4 +433,9 @@ by using the Subscriber-:class:`Source`: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#sink-subscriber +It is also possible to use re-wrap :class:`Processor` instances as a :class:`Flow` by +passing a factory function that will create the :class:`Processor` instances: +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/ReactiveStreamsDocTest.java#use-processor + +Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`. diff --git a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala index d69667043d..079f5c3f30 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/ReactiveStreamsDocSpec.scala @@ -4,10 +4,9 @@ package docs.stream import akka.stream.ActorMaterializer -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.{ RunnableGraph, Flow, Sink, Source } import akka.stream.testkit._ -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source +import org.reactivestreams.Processor class ReactiveStreamsDocSpec extends AkkaSpec { import TwitterStreamQuickstartDocSpec._ @@ -78,11 +77,10 @@ class ReactiveStreamsDocSpec extends AkkaSpec { val storage = impl.storage //#flow-publisher-subscriber - val (in: Subscriber[Tweet], out: Publisher[Author]) = - authors.runWith(Source.subscriber[Tweet], Sink.publisher[Author]) + val processor: Processor[Tweet, Author] = authors.toProcessor.run() - tweets.subscribe(in) - out.subscribe(storage) + tweets.subscribe(processor) + processor.subscribe(storage) //#flow-publisher-subscriber assertResult(storage) @@ -135,4 +133,15 @@ class ReactiveStreamsDocSpec extends AkkaSpec { assertResult(storage) } + "use a processor" in { + + //#use-processor + // An example Processor factory + def createProcessor: Processor[Int, Int] = Flow[Int].toProcessor.run() + + val flow: Flow[Int, Int, Unit] = Flow(() => createProcessor) + //#use-processor + + } + } diff --git a/akka-docs-dev/rst/scala/stream-integrations.rst b/akka-docs-dev/rst/scala/stream-integrations.rst index 87f99e07f0..4c044c22d2 100644 --- a/akka-docs-dev/rst/scala/stream-integrations.rst +++ b/akka-docs-dev/rst/scala/stream-integrations.rst @@ -396,7 +396,9 @@ Using an Akka Streams :class:`Flow` we can transform the stream and connect thos The :class:`Publisher` is used as an input :class:`Source` to the flow and the :class:`Subscriber` is used as an output :class:`Sink`. -A :class:`Flow` can also be materialized to a :class:`Subscriber`, :class:`Publisher` pair: +A :class:`Flow` can also be also converted to a :class:`RunnableGraph[Processor[In, Out]]` which +materializes to a :class:`Processor` when ``run()`` is called. ``run()`` itself can be called multiple +times, resulting in a new :class:`Processor` instance each time. .. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#flow-publisher-subscriber @@ -426,4 +428,10 @@ by using the Subscriber-:class:`Source`: .. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#sink-subscriber +It is also possible to use re-wrap :class:`Processor` instances as a :class:`Flow` by +passing a factory function that will create the :class:`Processor` instances: + +.. includecode:: code/docs/stream/ReactiveStreamsDocSpec.scala#use-processor + +Please note that a factory is necessary to achieve reusability of the resulting :class:`Flow`. diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala index 133d6bd46b..43b4ea4756 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala @@ -28,11 +28,6 @@ abstract class AkkaIdentityProcessorVerification[T](env: TestEnvironment, publis override def createFailedPublisher(): Publisher[T] = TestPublisher.error(new Exception("Unable to serve subscribers right now!")) - def processorFromFlow(flow: Flow[T, T, _])(implicit mat: ActorMaterializer): Processor[T, T] = { - val (sub: Subscriber[T], pub: Publisher[T]) = flow.runWith(Source.subscriber[T], Sink.publisher[T]) - processorFromSubscriberAndPublisher(sub, pub) - } - def processorFromSubscriberAndPublisher(sub: Subscriber[T], pub: Publisher[T]): Processor[T, T] = { new Processor[T, T] { override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index ae20ba358a..fe4fd1895f 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -17,9 +17,8 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { implicit val materializer = ActorMaterializer(settings)(system) - processorFromFlow( - // withAttributes "wraps" the underlying identity and protects it from automatic removal - Flow[Int].andThen(Identity()).named("identity")) + // withAttributes "wraps" the underlying identity and protects it from automatic removal + Flow[Int].andThen[Int](Identity()).named("identity").toProcessor.run() } override def createElement(element: Int): Int = element diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala index 2a0e3da7b8..fdf4cfd394 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala @@ -13,8 +13,7 @@ class MapTest extends AkkaIdentityProcessorVerification[Int] { override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { implicit val materializer = ActorMaterializer()(system) - processorFromFlow( - Flow[Int].map(elem ⇒ elem).named("identity")) + Flow[Int].map(elem ⇒ elem).named("identity").toProcessor.run() } override def createElement(element: Int): Int = element diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala index bc921a7017..2f6b418481 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -24,7 +24,7 @@ class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] { override def onPush(in: Int, ctx: Context[Int]) = ctx.push(in) } - processorFromFlow(Flow[Int].transform(mkStage)) + Flow[Int].transform(mkStage).toProcessor.run() } override def createElement(element: Int): Int = element diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/VirtualPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/VirtualPublisherTest.scala index 2d539ab219..c0b005c7de 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/VirtualPublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/VirtualPublisherTest.scala @@ -13,7 +13,7 @@ class VirtualProcessorTest extends AkkaIdentityProcessorVerification[Int] { override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { implicit val materializer = ActorMaterializer()(system) - val identity = processorFromFlow(Flow[Int].map(elem ⇒ elem).named("identity")) + val identity = Flow[Int].map(elem ⇒ elem).named("identity").toProcessor.run() val left, right = new VirtualProcessor[Int] left.subscribe(identity) identity.subscribe(right) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index e9bf465e2d..bdcebab1df 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -9,6 +9,7 @@ import akka.stream.Supervision._ import akka.stream.impl.Stages.StageModule import akka.stream.stage.Stage import scala.collection.immutable +import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor._ import akka.stream.{ AbruptTerminationException, Attributes, ActorMaterializerSettings, ActorMaterializer } @@ -311,6 +312,19 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece subs.expectNext("6-s") subs.expectComplete() } + + "be possible to convert to a processor, and should be able to take a Processor" in { + val identity1 = Flow[Int].toProcessor + val identity2 = Flow(() ⇒ identity1.run()) + Await.result( + Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head), + 3.seconds) should ===(1 to 10) + + // Reusable: + Await.result( + Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head), + 3.seconds) should ===(1 to 10) + } } "A Flow with multiple subscribers (FanOutBox)" must { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 3ad2a86a24..af0e43c2e7 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -8,6 +8,8 @@ import akka.stream._ import akka.japi.{ Util, Pair } import akka.japi.function import akka.stream.scaladsl +import akka.stream.scaladsl.{ Keep, Sink, Source } +import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future @@ -31,6 +33,9 @@ object Flow { def create[T](): javadsl.Flow[T, T, Unit] = adapt(scaladsl.Flow[T]) + def create[I, O](processorFactory: function.Creator[Processor[I, O]]): javadsl.Flow[I, O, Unit] = + adapt(scaladsl.Flow(() ⇒ processorFactory.create())) + /** Create a `Flow` which can process elements of type `T`. */ def of[T](clazz: Class[T]): javadsl.Flow[T, T, Unit] = create[T]() @@ -72,12 +77,41 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph /** * Transform this [[Flow]] by appending the given processing steps. + * + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * + * The materialized value of the combined [[Flow]] will be the materialized + * value of the current flow (ignoring the other Flow’s value), use + * [[Flow#viaMat viaMat]] if a different strategy is needed. */ def via[T, M](flow: Graph[FlowShape[Out, T], M]): javadsl.Flow[In, T, Mat] = new Flow(delegate.via(flow)) /** * Transform this [[Flow]] by appending the given processing steps. + * {{{ + * +----------------------------+ + * | Resulting Flow | + * | | + * | +------+ +------+ | + * | | | | | | + * In ~~> | this | ~Out~> | flow | ~~> T + * | | | | | | + * | +------+ +------+ | + * +----------------------------+ + * }}} + * The `combine` function is used to compose the materialized values of this flow and that + * flow into the materialized value of the resulting Flow. */ def viaMat[T, M, M2](flow: Graph[FlowShape[Out, T], M], combine: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = new Flow(delegate.viaMat(flow)(combinerToScala(combine))) @@ -809,6 +843,16 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def log(name: String): javadsl.Flow[In, Out, Mat] = this.log(name, javaIdentityFunction[Out], null) + /** + * Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]] + * which implements the operations encapsulated by this Flow. Every materialization results in a new Processor + * instance, i.e. the returned [[RunnableGraph]] is reusable. + * + * @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it. + */ + def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = { + new RunnableGraphAdapter(delegate.toProcessor) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 96f49c7e69..45be60b496 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -14,8 +14,7 @@ import akka.stream.Attributes._ import akka.stream._ import akka.stream.impl.{ ActorPublisherSource, StreamLayout } import akka.util.ByteString -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber +import org.reactivestreams.{ Processor, Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance import scala.collection.JavaConverters._ import scala.concurrent.{ Promise, Future } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d476f89652..d76daa8ed7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -6,12 +6,12 @@ package akka.stream.scaladsl import akka.actor.ActorSystem import akka.stream.impl.SplitDecision._ import akka.event.LoggingAdapter -import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } +import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream._ import akka.stream.Attributes._ import akka.util.Collections.EmptyImmutableSeq -import org.reactivestreams.Processor +import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } import scala.annotation.implicitNotFound import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable @@ -282,6 +282,26 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) Source.wrap(source).via(this).toMat(sink)(Keep.both).run() } + /** + * Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]] + * which implements the operations encapsulated by this Flow. Every materialization results in a new Processor + * instance, i.e. the returned [[RunnableGraph]] is reusable. + * + * @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it. + */ + def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = { + Source.subscriber[In].via(this).toMat(Sink.publisher[Out])(Keep.both[Subscriber[In], Publisher[Out]]) + .mapMaterializedValue { + case (sub, pub) ⇒ new Processor[In, Out] { + override def onError(t: Throwable): Unit = sub.onError(t) + override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s) + override def onComplete(): Unit = sub.onComplete() + override def onNext(t: In): Unit = sub.onNext(t) + override def subscribe(s: Subscriber[_ >: Out]): Unit = pub.subscribe(s) + } + } + } + /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) @@ -291,6 +311,14 @@ object Flow extends FlowApply { private def shape[I, O](name: String): FlowShape[I, O] = FlowShape(Inlet(name + ".in"), Outlet(name + ".out")) + /** + * Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]] + */ + def apply[I, O](processorFactory: () ⇒ Processor[I, O]): Flow[I, O, Unit] = { + val untypedFactory = processorFactory.asInstanceOf[() ⇒ Processor[Any, Any]] + Flow[I].andThen(DirectProcessor(() ⇒ (untypedFactory(), ()))) + } + /** * Helper to create `Flow` without a [[Source]] or a [[Sink]]. * Example usage: `Flow[Int]` diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5782e738c0..6691d7831c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -5,11 +5,10 @@ package akka.stream.scaladsl import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream._ -import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } -import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule, DefaultAttributes } import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective } -import org.reactivestreams.{ Publisher, Subscriber } +import org.reactivestreams._ import akka.stream.{ SourceShape, Inlet, Outlet } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } @@ -18,7 +17,6 @@ import scala.annotation.unchecked.uncheckedVariance import scala.language.higherKinds import akka.actor.Props import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } -import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } @@ -29,7 +27,6 @@ import akka.stream.impl._ import akka.actor.Cancellable import akka.actor.ActorRef import scala.concurrent.Promise -import org.reactivestreams.Subscriber import akka.stream.stage.SyncDirective import akka.stream.OverflowStrategy import akka.stream.Attributes