diff --git a/akka-stream-tests-tck/src/test/scala-jdk9-only/akka/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala b/akka-stream-tests-tck/src/test/scala-jdk9-only/akka/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala new file mode 100644 index 0000000000..f2a290e45c --- /dev/null +++ b/akka-stream-tests-tck/src/test/scala-jdk9-only/akka/stream/tck/IterablePublisherViaJavaFlowPublisherTest.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.stream.tck + +import java.util.concurrent.{ Flow => JavaFlow } + +import akka.NotUsed +import akka.stream.scaladsl.{ Flow, JavaFlowSupport, Sink, Source } +import org.reactivestreams._ + +class IterablePublisherViaJavaFlowPublisherTest extends AkkaPublisherVerification[Int] { + + override def createPublisher(elements: Long): Publisher[Int] = { + val sourceViaJavaFlowPublisher: JavaFlow.Publisher[Int] = Source(iterable(elements)) + .runWith(JavaFlowSupport.Sink.asPublisher(fanout = false)) + + + val javaFlowPublisherIntoAkkaSource: Source[Int, NotUsed] = + JavaFlowSupport.Source.fromPublisher(sourceViaJavaFlowPublisher) + + javaFlowPublisherIntoAkkaSource + .runWith(Sink.asPublisher(false)) // back as RS Publisher + } + +} diff --git a/akka-stream-tests/src/test/java-jdk9-only/akka/stream/javadsl/JavaFlowSupportCompileTest.java b/akka-stream-tests/src/test/java-jdk9-only/akka/stream/javadsl/JavaFlowSupportCompileTest.java new file mode 100644 index 0000000000..7626d60b99 --- /dev/null +++ b/akka-stream-tests/src/test/java-jdk9-only/akka/stream/javadsl/JavaFlowSupportCompileTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import org.junit.Test; + +import java.util.concurrent.Flow; + +public class JavaFlowSupportCompileTest { + @Test + public void shouldCompile() throws Exception { + final Flow.Processor processor = new Flow.Processor() { + @Override + public void subscribe(Flow.Subscriber subscriber) {} + @Override + public void onSubscribe(Flow.Subscription subscription) {} + @Override + public void onNext(String item) {} + @Override + public void onError(Throwable throwable) {} + @Override + public void onComplete() {} + }; + + + final Source> stringSubscriberSource = + JavaFlowSupport.Source.asSubscriber(); + final Source stringNotUsedSource = + JavaFlowSupport.Source.fromPublisher(processor); + + final akka.stream.javadsl.Flow stringStringNotUsedFlow = + JavaFlowSupport.Flow.fromProcessor(() -> processor); + final akka.stream.javadsl.Flow stringStringNotUsedFlow1 = + JavaFlowSupport.Flow.fromProcessorMat(() -> Pair.apply(processor, NotUsed.getInstance())); + + final Sink> stringPublisherSink = + JavaFlowSupport.Sink.asPublisher(AsPublisher.WITH_FANOUT); + final Sink stringNotUsedSink = + JavaFlowSupport.Sink.fromSubscriber(processor); + } +} diff --git a/akka-stream-tests/src/test/scala-jdk9-only/akka/stream/scaladsl/FlowPublisherSinkSpec.scala b/akka-stream-tests/src/test/scala-jdk9-only/akka/stream/scaladsl/FlowPublisherSinkSpec.scala new file mode 100644 index 0000000000..1b4285349a --- /dev/null +++ b/akka-stream-tests/src/test/scala-jdk9-only/akka/stream/scaladsl/FlowPublisherSinkSpec.scala @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.StreamSpec +import akka.stream.{ ClosedShape, ActorMaterializer } + +import akka.stream.testkit.Utils._ +import scala.concurrent.duration._ + +import scala.concurrent.Await + +class FlowPublisherSinkSpec extends StreamSpec { + + implicit val materializer = ActorMaterializer() + + "A FlowPublisherSink" must { + + "work with SubscriberSource" in { + val (sub, pub) = JavaFlowSupport.Source.asSubscriber[Int].toMat(JavaFlowSupport.Sink.asPublisher(false))(Keep.both).run() + Source(1 to 100).to(JavaFlowSupport.Sink.fromSubscriber(sub)).run() + Await.result(JavaFlowSupport.Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===(1 to 100) + } + + "be able to use Publisher in materialized value transformation" in { + val f = Source(1 to 3).runWith( + JavaFlowSupport.Sink.asPublisher[Int](false).mapMaterializedValue { p ⇒ + JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _) + }) + + Await.result(f, 3.seconds) should be(6) + } + } + +} diff --git a/akka-stream/src/main/java-jdk9-only/akka/stream/javadsl/JavaFlowSupport.java b/akka-stream/src/main/java-jdk9-only/akka/stream/javadsl/JavaFlowSupport.java new file mode 100644 index 0000000000..9b43c74444 --- /dev/null +++ b/akka-stream/src/main/java-jdk9-only/akka/stream/javadsl/JavaFlowSupport.java @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.japi.Creator; +import akka.stream.impl.JavaFlowAndRsConverters; + +/** + * For use only with `JDK 9+`. + *

+ * Provides support for `java.util.concurrent.Flow.*` interfaces which mirror the Reactive Streams + * interfaces from `org.reactivestreams`. See reactive-streams.org. + */ +public final class JavaFlowSupport { + + private static final NotUsed NotUsed = akka.NotUsed.getInstance(); + + private JavaFlowSupport() { + throw new RuntimeException("No instances allowed!"); + } + + /** + * {@link akka.stream.javadsl.Flow]] factories operating with {@code java.util.concurrent.Flow.*} interfaces. + */ + public static final class Source { + private Source() { + throw new RuntimeException("No instances allowed!"); + } + + /** + * Helper to create {@code Source} from {@link java.util.concurrent.Flow.Publisher}. + *

+ * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of {@link java.util.concurrent.Flow.Processor} instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + *

+ * See also {@code Source.fromPublisher} if wanting to integrate with {@link org.reactivestreams.Publisher} instead + * (which carries the same semantics, however existed before RS's inclusion in Java 9). + */ + public static akka.stream.javadsl.Source fromPublisher(java.util.concurrent.Flow.Publisher publisher) { + return akka.stream.javadsl.Source.fromPublisher(JavaFlowAndRsConverters.asRs(publisher)); + } + + /** + * Creates a {@code Source} that is materialized as a {@link java.util.concurrent.Flow.Subscriber}. + *

+ * See also {@code Source.asSubscriber} if wanting to integrate with {@link org.reactivestreams.Subscriber} instead + * (which carries the same semantics, however existed before RS's inclusion in Java 9). + */ + public static akka.stream.javadsl.Source> asSubscriber() { + return akka.stream.javadsl.Source.asSubscriber().mapMaterializedValue(JavaFlowAndRsConverters::asJava); + } + } + + /** + * {@link akka.stream.javadsl.Flow]] factories operating with {@code java.util.concurrent.Flow.*} interfaces. + */ + public static final class Flow { + private Flow() { + throw new RuntimeException("No instances allowed!"); + } + + /** + * Creates a Flow from a {@link java.util.concurrent.Flow.Processor} + */ + public static akka.stream.javadsl.Flow fromProcessor(Creator> processorFactory) throws Exception { + return fromProcessorMat(() -> Pair.apply(processorFactory.create(), NotUsed)); + } + + + /** + * Creates a Flow from a {@link java.util.concurrent.Flow.Processor>> and returns a materialized value. + */ + public static akka.stream.javadsl.Flow fromProcessorMat( + akka.japi.Creator, M>> processorFactory) throws Exception { + final Pair, M> value = processorFactory.create(); + final java.util.concurrent.Flow.Processor processor = value.first(); + final M mat = value.second(); + + return akka.stream.javadsl.Flow.fromProcessorMat(() -> + akka.japi.Pair.apply(JavaFlowAndRsConverters.asRs(processor), mat) + ); + + } + + /** + * Converts this Flow to a {@code RunnableGraph} that materializes to a Reactive Streams {@link java.util.concurrent.Flow.Processor} + * which implements the operations encapsulated by this Flow. Every materialization results in a new Processor + * instance, i.e. the returned {@code RunnableGraph} is reusable. + * + * @return A {@code RunnableGraph} that materializes to a {@code Processor} when {@code run()} is called on it. + */ + public static akka.stream.javadsl.RunnableGraph> toProcessor(akka.stream.javadsl.Flow flow) { + final akka.stream.javadsl.Source> source = JavaFlowSupport.Source.asSubscriber(); + final akka.stream.javadsl.Sink> sink = JavaFlowSupport.Sink.asPublisher(AsPublisher.WITHOUT_FANOUT); + + // have to jump though scaladsl for the toMat because type inference of the Keep.both + return + source.via(flow).toMat(sink, Keep.both()) + .mapMaterializedValue(pair -> { + final java.util.concurrent.Flow.Subscriber sub = pair.first(); + final java.util.concurrent.Flow.Publisher pub = pair.second(); + + return new java.util.concurrent.Flow.Processor() { + @Override public void onError(Throwable t) { sub.onError(t); } + @Override public void onSubscribe(java.util.concurrent.Flow.Subscription s) { sub.onSubscribe(s); } + @Override public void onComplete() { sub.onComplete(); } + @Override public void onNext(In t) { sub.onNext(t); } + @Override public void subscribe(java.util.concurrent.Flow.Subscriber s) { pub.subscribe(s); } + }; + }); + } + } + + /** + * {@link akka.stream.javadsl.Sink} factories operating with {@code java.util.concurrent.Flow.*} interfaces. + */ + public static final class Sink { + private Sink() { + throw new RuntimeException("No instances allowed!"); + } + + /** + * A `Sink` that materializes into a {@link java.util.concurrent.Flow.Publisher}. + *

+ * If {@code fanout} is {@code WITH_FANOUT}, the materialized {@code Publisher} will support multiple {@code Subscriber}s and + * the size of the {@code inputBuffer} configured for this stage becomes the maximum number of elements that + * the fastest {@link java.util.concurrent.Flow.Subscriber} can be ahead of the slowest one before slowing + * the processing down due to back pressure. + *

+ * If {@code fanout} is {@code WITHOUT_FANOUT} then the materialized {@code Publisher} will only support a single {@code Subscriber} and + * reject any additional {@code Subscriber}s. + */ + public static akka.stream.javadsl.Sink> asPublisher(AsPublisher fanout) { + return akka.stream.javadsl.Sink.asPublisher(fanout).mapMaterializedValue(JavaFlowAndRsConverters::asJava); + } + + /** + * Helper to create <> from <>. + */ + public static akka.stream.javadsl.Sink fromSubscriber(java.util.concurrent.Flow.Subscriber s) { + return akka.stream.javadsl.Sink.fromSubscriber(JavaFlowAndRsConverters.asRs(s)); + } + + } + +} diff --git a/akka-stream/src/main/scala-jdk9-only/akka/stream/impl/JavaFlowAndRsConverters.scala b/akka-stream/src/main/scala-jdk9-only/akka/stream/impl/JavaFlowAndRsConverters.scala new file mode 100644 index 0000000000..032dee7187 --- /dev/null +++ b/akka-stream/src/main/scala-jdk9-only/akka/stream/impl/JavaFlowAndRsConverters.scala @@ -0,0 +1,197 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.impl + +import java.util.concurrent.Flow + +import akka.annotation.InternalApi +import org.{ reactivestreams => rs } +import JavaFlowAndRsConverters.Implicits._ + +/** + * INTERNAL API: Provides converters between Reactive Streams (reactive-streams.org) and their Java 9+ counter-parts, + * defined in `java.util.concurrent.Flow.*`. This API is internal because Reactive Streams will ship with such + * adapters itself at some point, and we'd not want to duplicate that effort for users to be confused about which ones + * to use. These adapters are used internally by Akka Streams to convert between the standards but you should not touch + * them directly - use thr `JavaFlowSupport` classes instead. + * + * Please note that either of these types are designed for *inter-op* and usually should not be used directly + * in applications. The intended use case is for shared libraries, like database drivers or similar to provide + * the inter-operable types, such that other librarie can co-operate with them directly, if that is your use case + * and you're using the j.u.c.Flow types, use the [[akka.stream.scaladsl.JavaFlowSupport]] sources/sinks/flows instead. + * + * The interfaces as well as semantic contract of these sets of interfaces. + * Please see https://github.com/reactive-streams/reactive-streams-jvm to learn more about the specification, + * and TCK that is available to verify an implementation is a valid implementation. + */ +@InternalApi +private[akka] object JavaFlowAndRsConverters { + + /** Adds `asJava` and `asRs` enrichment methods to Reactive Streams and j.u.c.Flow types. */ + object Implicits { + final implicit class FlowPublisherConverter[T](val p: rs.Publisher[T]) extends AnyVal { + def asJava: Flow.Publisher[T] = + if (p eq null) null else JavaFlowAndRsConverters.asJava(p) + } + final implicit class RsPublisherConverter[T](val p: Flow.Publisher[T]) extends AnyVal { + def asRs: rs.Publisher[T] = JavaFlowAndRsConverters.asRs(p) + } + + final implicit class FlowSubscriberConverter[T](val s: rs.Subscriber[T]) extends AnyVal { + def asJava: Flow.Subscriber[T] = JavaFlowAndRsConverters.asJava(s) + } + final implicit class RsSubscriberConverter[T](val s: Flow.Subscriber[T]) extends AnyVal { + def asRs: rs.Subscriber[T] = JavaFlowAndRsConverters.asRs(s) + } + + final implicit class FlowProcessorConverter[T, R](val s: rs.Processor[T, R]) extends AnyVal { + def asJava: Flow.Processor[T, R] = JavaFlowAndRsConverters.asJava(s) + } + final implicit class RsProcessorConverter[T, R](val s: Flow.Processor[T, R]) extends AnyVal { + def asRs: rs.Processor[T, R] = JavaFlowAndRsConverters.asRs(s) + } + + final implicit class FlowSubscriptionConverter[T](val s: rs.Subscription) extends AnyVal { + def asJava: Flow.Subscription = JavaFlowAndRsConverters.asJava(s) + } + final implicit class RsSubscriptionConverter[T](val s: Flow.Subscription) extends AnyVal { + def asRs: rs.Subscription = JavaFlowAndRsConverters.asRs(s) + } + } + + final def asJava[T](p: rs.Publisher[T]): Flow.Publisher[T] = p match { + case null ⇒ null // null remains null + case adapter: JavaFlowPublisherToRsAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case delegate ⇒ new RsPublisherToJavaFlowAdapter(delegate) // adapt, it is a real Publisher + } + final def asRs[T](p: Flow.Publisher[T]): rs.Publisher[T] = p match { + case null ⇒ null // null remains null + case adapter: RsPublisherToJavaFlowAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case _ => new JavaFlowPublisherToRsAdapter[T](p) + } + + final def asJava[T](s: rs.Subscription): Flow.Subscription = s match { + case null ⇒ null // null remains null + case adapter: JavaFlowSubscriptionToRsAdapter ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case _ => new RsSubscriptionToJavaFlowAdapter(s) + } + final def asRs[T](s: Flow.Subscription): rs.Subscription = s match { + case null ⇒ null // null remains null + case adapter: RsSubscriptionToJavaFlowAdapter ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case _ => new JavaFlowSubscriptionToRsAdapter(s) + } + + final def asJava[T](s: rs.Subscriber[T]): Flow.Subscriber[T] = + s match { + case null ⇒ null // null remains null + case adapter: JavaFlowSubscriberToRsAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case _ => new RsSubscriberToJavaFlowAdapter[T](s) + } + final def asRs[T](s: Flow.Subscriber[T]): rs.Subscriber[T] = s match { + case null ⇒ null // null remains null + case adapter: RsSubscriberToJavaFlowAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case _ => new JavaFlowSubscriberToRsAdapter[T](s) + } + + final def asJava[T, R](p: rs.Processor[T, R]): Flow.Processor[T, R] = p match { + case null ⇒ null // null remains null + case adapter: JavaFlowProcessorToRsAdapter[T, R] ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case _ => new RsProcessorToJavaFlowAdapter[T, R](p) + } + final def asRs[T, R](p: Flow.Processor[T, R]): rs.Processor[T, R] = p match { + case null ⇒ null // null remains null + case adapter: RsProcessorToJavaFlowAdapter[T, R] ⇒ adapter.delegate // unwrap adapter instead of wrapping again + case _ => new JavaFlowProcessorToRsAdapter[T, R](p) + } + +} + +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class JavaFlowPublisherToRsAdapter[T](val delegate: Flow.Publisher[T]) extends rs.Publisher[T] { + override def subscribe(rsSubscriber: rs.Subscriber[_ >: T]): Unit = + delegate.subscribe(rsSubscriber.asJava) +} +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class RsPublisherToJavaFlowAdapter[T](val delegate: rs.Publisher[T]) extends Flow.Publisher[T] { + override def subscribe(javaSubscriber: Flow.Subscriber[_ >: T]): Unit = + delegate.subscribe(javaSubscriber.asRs) +} + +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class RsSubscriberToJavaFlowAdapter[T](val delegate: rs.Subscriber[T]) extends Flow.Subscriber[T] { + override def onError(t: Throwable): Unit = + delegate.onError(t) + + override def onComplete(): Unit = + delegate.onComplete() + + override def onNext(elem: T): Unit = + delegate.onNext(elem) + + override def onSubscribe(s: Flow.Subscription): Unit = + delegate.onSubscribe(s.asRs) +} +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class JavaFlowSubscriberToRsAdapter[T](val delegate: Flow.Subscriber[T]) extends rs.Subscriber[T] { + override def onError(t: Throwable): Unit = + delegate.onError(t) + + override def onComplete(): Unit = + delegate.onComplete() + + override def onNext(elem: T): Unit = + delegate.onNext(elem) + + override def onSubscribe(s: rs.Subscription): Unit = + delegate.onSubscribe(s.asJava) +} + +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class RsSubscriptionToJavaFlowAdapter(val delegate: rs.Subscription) extends Flow.Subscription { + override def cancel(): Unit = delegate.cancel() + + override def request(n: Long): Unit = delegate.request(n) +} +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class JavaFlowSubscriptionToRsAdapter(val delegate: Flow.Subscription) extends rs.Subscription { + override def cancel(): Unit = delegate.cancel() + + override def request(n: Long): Unit = delegate.request(n) +} + +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class RsProcessorToJavaFlowAdapter[T, R](val delegate: rs.Processor[T, R]) extends Flow.Processor[T, R] { + override def onError(t: Throwable): Unit = + delegate.onError(t) + + override def onComplete(): Unit = + delegate.onComplete() + + override def onNext(elem: T): Unit = + delegate.onNext(elem) + + override def onSubscribe(s: Flow.Subscription): Unit = + delegate.onSubscribe(s.asRs) + + override def subscribe(javaSubscriber: Flow.Subscriber[_ >: R]): Unit = + delegate.subscribe(javaSubscriber.asRs) +} +/** INTERNAL API: Adapters are not meant to be touched directly */ +@InternalApi private[akka] final class JavaFlowProcessorToRsAdapter[T, R](val delegate: Flow.Processor[T, R]) extends rs.Processor[T, R] { + override def onError(t: Throwable): Unit = + delegate.onError(t) + + override def onComplete(): Unit = + delegate.onComplete() + + override def onNext(elem: T): Unit = + delegate.onNext(elem) + + override def onSubscribe(s: rs.Subscription): Unit = + delegate.onSubscribe(s.asJava) + + override def subscribe(rsSubscriber: rs.Subscriber[_ >: R]): Unit = + delegate.subscribe(rsSubscriber.asJava) +} diff --git a/akka-stream/src/main/scala-jdk9-only/akka/stream/scaladsl/JavaFlowSupport.scala b/akka-stream/src/main/scala-jdk9-only/akka/stream/scaladsl/JavaFlowSupport.scala new file mode 100644 index 0000000000..0c58b5e192 --- /dev/null +++ b/akka-stream/src/main/scala-jdk9-only/akka/stream/scaladsl/JavaFlowSupport.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import java.util.{ concurrent ⇒ juc } + +import akka.NotUsed +import akka.stream.impl.JavaFlowAndRsConverters +import akka.stream.scaladsl + +import scala.annotation.unchecked.uncheckedVariance + +/** + * For use only with `JDK 9+`. + * + * Provides support for `java.util.concurrent.Flow.*` interfaces which mirror the Reactive Streams + * interfaces from `org.reactivestreams`. See: [http://www.reactive-streams.org/](reactive-streams.org). + */ +object JavaFlowSupport { + import JavaFlowAndRsConverters.Implicits._ + + /** + * [[akka.stream.scaladsl.Source]] factories operating with `java.util.concurrent.Flow.*` interfaces. + */ + object Source { + + /** + * Helper to create [[Source]] from [[java.util.concurrent.Flow.Publisher]]. + * + * Construct a transformation starting with given publisher. The transformation steps + * are executed by a series of [[java.util.concurrent.Flow.Processor]] instances + * that mediate the flow of elements downstream and the propagation of + * back-pressure upstream. + * + * @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead + * (which carries the same semantics, however existed before RS's inclusion in Java 9). + */ + final def fromPublisher[T](publisher: juc.Flow.Publisher[T]): Source[T, NotUsed] = + scaladsl.Source.fromPublisher(publisher.asRs) + + /** + * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]] + * + * @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead + * (which carries the same semantics, however existed before RS's inclusion in Java 9). + */ + final def asSubscriber[T]: Source[T, juc.Flow.Subscriber[T]] = + scaladsl.Source.asSubscriber[T].mapMaterializedValue(_.asJava) + } + + /** + * [[akka.stream.scaladsl.Flow]] factories operating with `java.util.concurrent.Flow.*` interfaces. + */ + object Flow { + + /** + * Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]] + */ + def fromProcessor[I, O](processorFactory: () ⇒ juc.Flow.Processor[I, O]): Flow[I, O, NotUsed] = { + fromProcessorMat(() ⇒ (processorFactory(), NotUsed)) + } + + /** + * Creates a Flow from a Reactive Streams [[java.util.concurrent.Flow.Processor]] and returns a materialized value. + */ + def fromProcessorMat[I, O, M](processorFactory: () ⇒ (juc.Flow.Processor[I, O], M)): Flow[I, O, M] = + scaladsl.Flow.fromProcessorMat { () ⇒ + val (processor, mat) = processorFactory() + (processor.asRs, mat) + } + + /** + * Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[java.util.concurrent.Flow.Processor]] + * which implements the operations encapsulated by this Flow. Every materialization results in a new Processor + * instance, i.e. the returned [[RunnableGraph]] is reusable. + * + * @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it. + */ + def toProcessor[In, Out, Mat](self: Flow[In, Out, Mat]): RunnableGraph[juc.Flow.Processor[In @uncheckedVariance, Out @uncheckedVariance]] = + Source.asSubscriber[In].via(self) + .toMat(Sink.asPublisher[Out](fanout = false))(Keep.both) + .mapMaterializedValue { + case (sub, pub) ⇒ new juc.Flow.Processor[In, Out] { + override def onError(t: Throwable): Unit = sub.onError(t) + override def onSubscribe(s: juc.Flow.Subscription): Unit = sub.onSubscribe(s) + override def onComplete(): Unit = sub.onComplete() + override def onNext(t: In): Unit = sub.onNext(t) + override def subscribe(s: juc.Flow.Subscriber[_ >: Out]): Unit = pub.subscribe(s) + } + } + } + + /** + * [[akka.stream.scaladsl.Sink]] factories operating with `java.util.concurrent.Flow.*` interfaces. + */ + object Sink { + /** + * A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]]. + * + * If `fanout` is `WITH_FANOUT`, the materialized `Publisher` will support multiple `Subscriber`s and + * the size of the `inputBuffer` configured for this stage becomes the maximum number of elements that + * the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing + * the processing down due to back pressure. + * + * If `fanout` is `WITHOUT_FANOUT` then the materialized `Publisher` will only support a single `Subscriber` and + * reject any additional `Subscriber`s. + */ + final def asPublisher[T](fanout: Boolean): Sink[T, juc.Flow.Publisher[T]] = + scaladsl.Sink.asPublisher[T](fanout).mapMaterializedValue(_.asJava) + + /** + * Helper to create [[Sink]] from [[java.util.concurrent.Flow.Subscriber]]. + */ + final def fromSubscriber[T](s: juc.Flow.Subscriber[T]): Sink[T, NotUsed] = + scaladsl.Sink.fromSubscriber(s.asRs) + } + +} diff --git a/project/Jdk9CompileDirectoriesPlugin.scala b/project/Jdk9CompileDirectoriesPlugin.scala new file mode 100644 index 0000000000..2b9abbfc63 --- /dev/null +++ b/project/Jdk9CompileDirectoriesPlugin.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +import java.io.File + +import sbt._ +import sbt.Keys._ + +object Jdk9CompileDirectoriesPlugin extends AutoPlugin { + + val jdkVersion: String = System.getProperty("java.version") + + override def trigger = allRequirements + + override lazy val projectSettings = Seq( + + javacOptions in Compile ++= { + // making sure we're really targeting 1.8 + if (isJDK9) Seq("-target", "1.8", "-source", "1.8", "-Xdoclint:none") + else Seq("-Xdoclint:none") + }, + + unmanagedSourceDirectories in Compile ++= { + if (isJDK9) { + println(s"[JDK9] Enabled [...-jdk9-only] directories to be compiled.") + Seq( + (sourceDirectory in Compile).value / "java-jdk9-only", + (sourceDirectory in Compile).value / "scala-jdk9-only" + ) + } else Seq.empty + }, + + unmanagedSourceDirectories in Test ++= { + if (isJDK9) { + Seq( + (sourceDirectory in Test).value / "java-jdk9-only", + (sourceDirectory in Test).value / "scala-jdk9-only" + ) + } else Seq.empty + } + + ) + + private def isJDK9 = { + jdkVersion startsWith "9" + } +}