=jdk9,str JDK9 ready classes, in special directories (#23650)
* =jdk9,str JDK9 ready classes, in special directories * =str,jdk9 prepare for releasing using JDK9 (needs sbt 1.0+ though) * =str,jdk9 passing TCK spec touching some JDK9 types internally * make sure to include linting options
This commit is contained in:
parent
9bd6ca4c8b
commit
349a5f749a
7 changed files with 623 additions and 0 deletions
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
package akka.stream.tck
|
||||
|
||||
import java.util.concurrent.{ Flow => JavaFlow }
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.scaladsl.{ Flow, JavaFlowSupport, Sink, Source }
|
||||
import org.reactivestreams._
|
||||
|
||||
class IterablePublisherViaJavaFlowPublisherTest extends AkkaPublisherVerification[Int] {
|
||||
|
||||
override def createPublisher(elements: Long): Publisher[Int] = {
|
||||
val sourceViaJavaFlowPublisher: JavaFlow.Publisher[Int] = Source(iterable(elements))
|
||||
.runWith(JavaFlowSupport.Sink.asPublisher(fanout = false))
|
||||
|
||||
|
||||
val javaFlowPublisherIntoAkkaSource: Source[Int, NotUsed] =
|
||||
JavaFlowSupport.Source.fromPublisher(sourceViaJavaFlowPublisher)
|
||||
|
||||
javaFlowPublisherIntoAkkaSource
|
||||
.runWith(Sink.asPublisher(false)) // back as RS Publisher
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package akka.stream.javadsl;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.japi.Pair;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.concurrent.Flow;
|
||||
|
||||
public class JavaFlowSupportCompileTest {
|
||||
@Test
|
||||
public void shouldCompile() throws Exception {
|
||||
final Flow.Processor<String,String> processor = new Flow.Processor<String, String>() {
|
||||
@Override
|
||||
public void subscribe(Flow.Subscriber<? super String> subscriber) {}
|
||||
@Override
|
||||
public void onSubscribe(Flow.Subscription subscription) {}
|
||||
@Override
|
||||
public void onNext(String item) {}
|
||||
@Override
|
||||
public void onError(Throwable throwable) {}
|
||||
@Override
|
||||
public void onComplete() {}
|
||||
};
|
||||
|
||||
|
||||
final Source<String, Flow.Subscriber<String>> stringSubscriberSource =
|
||||
JavaFlowSupport.Source.asSubscriber();
|
||||
final Source<String, NotUsed> stringNotUsedSource =
|
||||
JavaFlowSupport.Source.fromPublisher(processor);
|
||||
|
||||
final akka.stream.javadsl.Flow<String, String, NotUsed> stringStringNotUsedFlow =
|
||||
JavaFlowSupport.Flow.fromProcessor(() -> processor);
|
||||
final akka.stream.javadsl.Flow<String, String, NotUsed> stringStringNotUsedFlow1 =
|
||||
JavaFlowSupport.Flow.fromProcessorMat(() -> Pair.apply(processor, NotUsed.getInstance()));
|
||||
|
||||
final Sink<String, Flow.Publisher<String>> stringPublisherSink =
|
||||
JavaFlowSupport.Sink.asPublisher(AsPublisher.WITH_FANOUT);
|
||||
final Sink<String, NotUsed> stringNotUsedSink =
|
||||
JavaFlowSupport.Sink.fromSubscriber(processor);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.testkit.StreamSpec
|
||||
import akka.stream.{ ClosedShape, ActorMaterializer }
|
||||
|
||||
import akka.stream.testkit.Utils._
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
||||
class FlowPublisherSinkSpec extends StreamSpec {
|
||||
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A FlowPublisherSink" must {
|
||||
|
||||
"work with SubscriberSource" in {
|
||||
val (sub, pub) = JavaFlowSupport.Source.asSubscriber[Int].toMat(JavaFlowSupport.Sink.asPublisher(false))(Keep.both).run()
|
||||
Source(1 to 100).to(JavaFlowSupport.Sink.fromSubscriber(sub)).run()
|
||||
Await.result(JavaFlowSupport.Source.fromPublisher(pub).limit(1000).runWith(Sink.seq), 3.seconds) should ===(1 to 100)
|
||||
}
|
||||
|
||||
"be able to use Publisher in materialized value transformation" in {
|
||||
val f = Source(1 to 3).runWith(
|
||||
JavaFlowSupport.Sink.asPublisher[Int](false).mapMaterializedValue { p ⇒
|
||||
JavaFlowSupport.Source.fromPublisher(p).runFold(0)(_ + _)
|
||||
})
|
||||
|
||||
Await.result(f, 3.seconds) should be(6)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,152 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Lightbend Inc. <http//www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package akka.stream.javadsl;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.Creator;
|
||||
import akka.stream.impl.JavaFlowAndRsConverters;
|
||||
|
||||
/**
|
||||
* For use only with `JDK 9+`.
|
||||
* <p>
|
||||
* Provides support for `java.util.concurrent.Flow.*` interfaces which mirror the Reactive Streams
|
||||
* interfaces from `org.reactivestreams`. See <a href="http//www.reactive-streams.org/">reactive-streams.org</a>.
|
||||
*/
|
||||
public final class JavaFlowSupport {
|
||||
|
||||
private static final NotUsed NotUsed = akka.NotUsed.getInstance();
|
||||
|
||||
private JavaFlowSupport() {
|
||||
throw new RuntimeException("No instances allowed!");
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link akka.stream.javadsl.Flow]] factories operating with {@code java.util.concurrent.Flow.*} interfaces.
|
||||
*/
|
||||
public static final class Source {
|
||||
private Source() {
|
||||
throw new RuntimeException("No instances allowed!");
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to create {@code Source} from {@link java.util.concurrent.Flow.Publisher}.
|
||||
* <p>
|
||||
* Construct a transformation starting with given publisher. The transformation steps
|
||||
* are executed by a series of {@link java.util.concurrent.Flow.Processor} instances
|
||||
* that mediate the flow of elements downstream and the propagation of
|
||||
* back-pressure upstream.
|
||||
* <p>
|
||||
* See also {@code Source.fromPublisher} if wanting to integrate with {@link org.reactivestreams.Publisher} instead
|
||||
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
||||
*/
|
||||
public static <T> akka.stream.javadsl.Source<T, NotUsed> fromPublisher(java.util.concurrent.Flow.Publisher<T> publisher) {
|
||||
return akka.stream.javadsl.Source.<T>fromPublisher(JavaFlowAndRsConverters.asRs(publisher));
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a {@code Source} that is materialized as a {@link java.util.concurrent.Flow.Subscriber}.
|
||||
* <p>
|
||||
* See also {@code Source.asSubscriber} if wanting to integrate with {@link org.reactivestreams.Subscriber} instead
|
||||
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
||||
*/
|
||||
public static <T> akka.stream.javadsl.Source<T, java.util.concurrent.Flow.Subscriber<T>> asSubscriber() {
|
||||
return akka.stream.javadsl.Source.<T>asSubscriber().mapMaterializedValue(JavaFlowAndRsConverters::asJava);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link akka.stream.javadsl.Flow]] factories operating with {@code java.util.concurrent.Flow.*} interfaces.
|
||||
*/
|
||||
public static final class Flow {
|
||||
private Flow() {
|
||||
throw new RuntimeException("No instances allowed!");
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Flow from a {@link java.util.concurrent.Flow.Processor}
|
||||
*/
|
||||
public static <I, O> akka.stream.javadsl.Flow<I, O, NotUsed> fromProcessor(Creator<java.util.concurrent.Flow.Processor<I, O>> processorFactory) throws Exception {
|
||||
return fromProcessorMat(() -> Pair.apply(processorFactory.create(), NotUsed));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Creates a Flow from a {@link java.util.concurrent.Flow.Processor>> and returns a materialized value.
|
||||
*/
|
||||
public static <I, O, M> akka.stream.javadsl.Flow<I, O, M> fromProcessorMat(
|
||||
akka.japi.Creator<akka.japi.Pair<java.util.concurrent.Flow.Processor<I, O>, M>> processorFactory) throws Exception {
|
||||
final Pair<java.util.concurrent.Flow.Processor<I, O>, M> value = processorFactory.create();
|
||||
final java.util.concurrent.Flow.Processor<I, O> processor = value.first();
|
||||
final M mat = value.second();
|
||||
|
||||
return akka.stream.javadsl.Flow.fromProcessorMat(() ->
|
||||
akka.japi.Pair.apply(JavaFlowAndRsConverters.asRs(processor), mat)
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts this Flow to a {@code RunnableGraph} that materializes to a Reactive Streams {@link java.util.concurrent.Flow.Processor}
|
||||
* which implements the operations encapsulated by this Flow. Every materialization results in a new Processor
|
||||
* instance, i.e. the returned {@code RunnableGraph} is reusable.
|
||||
*
|
||||
* @return A {@code RunnableGraph} that materializes to a {@code Processor} when {@code run()} is called on it.
|
||||
*/
|
||||
public static <In, Out, Mat> akka.stream.javadsl.RunnableGraph<java.util.concurrent.Flow.Processor<In, Out>> toProcessor(akka.stream.javadsl.Flow<In, Out, Mat> flow) {
|
||||
final akka.stream.javadsl.Source<In, java.util.concurrent.Flow.Subscriber<In>> source = JavaFlowSupport.Source.<In>asSubscriber();
|
||||
final akka.stream.javadsl.Sink<Out, java.util.concurrent.Flow.Publisher<Out>> sink = JavaFlowSupport.Sink.<Out>asPublisher(AsPublisher.WITHOUT_FANOUT);
|
||||
|
||||
// have to jump though scaladsl for the toMat because type inference of the Keep.both
|
||||
return
|
||||
source.via(flow).toMat(sink, Keep.both())
|
||||
.mapMaterializedValue(pair -> {
|
||||
final java.util.concurrent.Flow.Subscriber<In> sub = pair.first();
|
||||
final java.util.concurrent.Flow.Publisher<Out> pub = pair.second();
|
||||
|
||||
return new java.util.concurrent.Flow.Processor<In, Out>() {
|
||||
@Override public void onError(Throwable t) { sub.onError(t); }
|
||||
@Override public void onSubscribe(java.util.concurrent.Flow.Subscription s) { sub.onSubscribe(s); }
|
||||
@Override public void onComplete() { sub.onComplete(); }
|
||||
@Override public void onNext(In t) { sub.onNext(t); }
|
||||
@Override public void subscribe(java.util.concurrent.Flow.Subscriber<? super Out> s) { pub.subscribe(s); }
|
||||
};
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link akka.stream.javadsl.Sink} factories operating with {@code java.util.concurrent.Flow.*} interfaces.
|
||||
*/
|
||||
public static final class Sink {
|
||||
private Sink() {
|
||||
throw new RuntimeException("No instances allowed!");
|
||||
}
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a {@link java.util.concurrent.Flow.Publisher}.
|
||||
* <p>
|
||||
* If {@code fanout} is {@code WITH_FANOUT}, the materialized {@code Publisher} will support multiple {@code Subscriber}s and
|
||||
* the size of the {@code inputBuffer} configured for this stage becomes the maximum number of elements that
|
||||
* the fastest {@link java.util.concurrent.Flow.Subscriber} can be ahead of the slowest one before slowing
|
||||
* the processing down due to back pressure.
|
||||
* <p>
|
||||
* If {@code fanout} is {@code WITHOUT_FANOUT} then the materialized {@code Publisher} will only support a single {@code Subscriber} and
|
||||
* reject any additional {@code Subscriber}s.
|
||||
*/
|
||||
public static <T> akka.stream.javadsl.Sink<T, java.util.concurrent.Flow.Publisher<T>> asPublisher(AsPublisher fanout) {
|
||||
return akka.stream.javadsl.Sink.<T>asPublisher(fanout).mapMaterializedValue(JavaFlowAndRsConverters::asJava);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper to create <<Sink>> from <<java.util.concurrent.Flow.Subscriber>>.
|
||||
*/
|
||||
public static <T> akka.stream.javadsl.Sink<T, NotUsed> fromSubscriber(java.util.concurrent.Flow.Subscriber<T> s) {
|
||||
return akka.stream.javadsl.Sink.fromSubscriber(JavaFlowAndRsConverters.asRs(s));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,197 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package akka.stream.impl
|
||||
|
||||
import java.util.concurrent.Flow
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
import org.{ reactivestreams => rs }
|
||||
import JavaFlowAndRsConverters.Implicits._
|
||||
|
||||
/**
|
||||
* INTERNAL API: Provides converters between Reactive Streams (reactive-streams.org) and their Java 9+ counter-parts,
|
||||
* defined in `java.util.concurrent.Flow.*`. This API is internal because Reactive Streams will ship with such
|
||||
* adapters itself at some point, and we'd not want to duplicate that effort for users to be confused about which ones
|
||||
* to use. These adapters are used internally by Akka Streams to convert between the standards but you should not touch
|
||||
* them directly - use thr `JavaFlowSupport` classes instead.
|
||||
*
|
||||
* Please note that either of these types are designed for *inter-op* and usually should not be used directly
|
||||
* in applications. The intended use case is for shared libraries, like database drivers or similar to provide
|
||||
* the inter-operable types, such that other librarie can co-operate with them directly, if that is your use case
|
||||
* and you're using the j.u.c.Flow types, use the [[akka.stream.scaladsl.JavaFlowSupport]] sources/sinks/flows instead.
|
||||
*
|
||||
* The interfaces as well as semantic contract of these sets of interfaces.
|
||||
* Please see https://github.com/reactive-streams/reactive-streams-jvm to learn more about the specification,
|
||||
* and TCK that is available to verify an implementation is a valid implementation.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object JavaFlowAndRsConverters {
|
||||
|
||||
/** Adds `asJava` and `asRs` enrichment methods to Reactive Streams and j.u.c.Flow types. */
|
||||
object Implicits {
|
||||
final implicit class FlowPublisherConverter[T](val p: rs.Publisher[T]) extends AnyVal {
|
||||
def asJava: Flow.Publisher[T] =
|
||||
if (p eq null) null else JavaFlowAndRsConverters.asJava(p)
|
||||
}
|
||||
final implicit class RsPublisherConverter[T](val p: Flow.Publisher[T]) extends AnyVal {
|
||||
def asRs: rs.Publisher[T] = JavaFlowAndRsConverters.asRs(p)
|
||||
}
|
||||
|
||||
final implicit class FlowSubscriberConverter[T](val s: rs.Subscriber[T]) extends AnyVal {
|
||||
def asJava: Flow.Subscriber[T] = JavaFlowAndRsConverters.asJava(s)
|
||||
}
|
||||
final implicit class RsSubscriberConverter[T](val s: Flow.Subscriber[T]) extends AnyVal {
|
||||
def asRs: rs.Subscriber[T] = JavaFlowAndRsConverters.asRs(s)
|
||||
}
|
||||
|
||||
final implicit class FlowProcessorConverter[T, R](val s: rs.Processor[T, R]) extends AnyVal {
|
||||
def asJava: Flow.Processor[T, R] = JavaFlowAndRsConverters.asJava(s)
|
||||
}
|
||||
final implicit class RsProcessorConverter[T, R](val s: Flow.Processor[T, R]) extends AnyVal {
|
||||
def asRs: rs.Processor[T, R] = JavaFlowAndRsConverters.asRs(s)
|
||||
}
|
||||
|
||||
final implicit class FlowSubscriptionConverter[T](val s: rs.Subscription) extends AnyVal {
|
||||
def asJava: Flow.Subscription = JavaFlowAndRsConverters.asJava(s)
|
||||
}
|
||||
final implicit class RsSubscriptionConverter[T](val s: Flow.Subscription) extends AnyVal {
|
||||
def asRs: rs.Subscription = JavaFlowAndRsConverters.asRs(s)
|
||||
}
|
||||
}
|
||||
|
||||
final def asJava[T](p: rs.Publisher[T]): Flow.Publisher[T] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowPublisherToRsAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case delegate ⇒ new RsPublisherToJavaFlowAdapter(delegate) // adapt, it is a real Publisher
|
||||
}
|
||||
final def asRs[T](p: Flow.Publisher[T]): rs.Publisher[T] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsPublisherToJavaFlowAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ => new JavaFlowPublisherToRsAdapter[T](p)
|
||||
}
|
||||
|
||||
final def asJava[T](s: rs.Subscription): Flow.Subscription = s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowSubscriptionToRsAdapter ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ => new RsSubscriptionToJavaFlowAdapter(s)
|
||||
}
|
||||
final def asRs[T](s: Flow.Subscription): rs.Subscription = s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsSubscriptionToJavaFlowAdapter ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ => new JavaFlowSubscriptionToRsAdapter(s)
|
||||
}
|
||||
|
||||
final def asJava[T](s: rs.Subscriber[T]): Flow.Subscriber[T] =
|
||||
s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowSubscriberToRsAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ => new RsSubscriberToJavaFlowAdapter[T](s)
|
||||
}
|
||||
final def asRs[T](s: Flow.Subscriber[T]): rs.Subscriber[T] = s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsSubscriberToJavaFlowAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ => new JavaFlowSubscriberToRsAdapter[T](s)
|
||||
}
|
||||
|
||||
final def asJava[T, R](p: rs.Processor[T, R]): Flow.Processor[T, R] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowProcessorToRsAdapter[T, R] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ => new RsProcessorToJavaFlowAdapter[T, R](p)
|
||||
}
|
||||
final def asRs[T, R](p: Flow.Processor[T, R]): rs.Processor[T, R] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsProcessorToJavaFlowAdapter[T, R] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ => new JavaFlowProcessorToRsAdapter[T, R](p)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowPublisherToRsAdapter[T](val delegate: Flow.Publisher[T]) extends rs.Publisher[T] {
|
||||
override def subscribe(rsSubscriber: rs.Subscriber[_ >: T]): Unit =
|
||||
delegate.subscribe(rsSubscriber.asJava)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsPublisherToJavaFlowAdapter[T](val delegate: rs.Publisher[T]) extends Flow.Publisher[T] {
|
||||
override def subscribe(javaSubscriber: Flow.Subscriber[_ >: T]): Unit =
|
||||
delegate.subscribe(javaSubscriber.asRs)
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsSubscriberToJavaFlowAdapter[T](val delegate: rs.Subscriber[T]) extends Flow.Subscriber[T] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: Flow.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asRs)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowSubscriberToRsAdapter[T](val delegate: Flow.Subscriber[T]) extends rs.Subscriber[T] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: rs.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asJava)
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsSubscriptionToJavaFlowAdapter(val delegate: rs.Subscription) extends Flow.Subscription {
|
||||
override def cancel(): Unit = delegate.cancel()
|
||||
|
||||
override def request(n: Long): Unit = delegate.request(n)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowSubscriptionToRsAdapter(val delegate: Flow.Subscription) extends rs.Subscription {
|
||||
override def cancel(): Unit = delegate.cancel()
|
||||
|
||||
override def request(n: Long): Unit = delegate.request(n)
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsProcessorToJavaFlowAdapter[T, R](val delegate: rs.Processor[T, R]) extends Flow.Processor[T, R] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: Flow.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asRs)
|
||||
|
||||
override def subscribe(javaSubscriber: Flow.Subscriber[_ >: R]): Unit =
|
||||
delegate.subscribe(javaSubscriber.asRs)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowProcessorToRsAdapter[T, R](val delegate: Flow.Processor[T, R]) extends rs.Processor[T, R] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: rs.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asJava)
|
||||
|
||||
override def subscribe(rsSubscriber: rs.Subscriber[_ >: R]): Unit =
|
||||
delegate.subscribe(rsSubscriber.asJava)
|
||||
}
|
||||
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import java.util.{ concurrent ⇒ juc }
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.impl.JavaFlowAndRsConverters
|
||||
import akka.stream.scaladsl
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
|
||||
/**
|
||||
* For use only with `JDK 9+`.
|
||||
*
|
||||
* Provides support for `java.util.concurrent.Flow.*` interfaces which mirror the Reactive Streams
|
||||
* interfaces from `org.reactivestreams`. See: [http://www.reactive-streams.org/](reactive-streams.org).
|
||||
*/
|
||||
object JavaFlowSupport {
|
||||
import JavaFlowAndRsConverters.Implicits._
|
||||
|
||||
/**
|
||||
* [[akka.stream.scaladsl.Source]] factories operating with `java.util.concurrent.Flow.*` interfaces.
|
||||
*/
|
||||
object Source {
|
||||
|
||||
/**
|
||||
* Helper to create [[Source]] from [[java.util.concurrent.Flow.Publisher]].
|
||||
*
|
||||
* Construct a transformation starting with given publisher. The transformation steps
|
||||
* are executed by a series of [[java.util.concurrent.Flow.Processor]] instances
|
||||
* that mediate the flow of elements downstream and the propagation of
|
||||
* back-pressure upstream.
|
||||
*
|
||||
* @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead
|
||||
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
||||
*/
|
||||
final def fromPublisher[T](publisher: juc.Flow.Publisher[T]): Source[T, NotUsed] =
|
||||
scaladsl.Source.fromPublisher(publisher.asRs)
|
||||
|
||||
/**
|
||||
* Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]]
|
||||
*
|
||||
* @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead
|
||||
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
||||
*/
|
||||
final def asSubscriber[T]: Source[T, juc.Flow.Subscriber[T]] =
|
||||
scaladsl.Source.asSubscriber[T].mapMaterializedValue(_.asJava)
|
||||
}
|
||||
|
||||
/**
|
||||
* [[akka.stream.scaladsl.Flow]] factories operating with `java.util.concurrent.Flow.*` interfaces.
|
||||
*/
|
||||
object Flow {
|
||||
|
||||
/**
|
||||
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]]
|
||||
*/
|
||||
def fromProcessor[I, O](processorFactory: () ⇒ juc.Flow.Processor[I, O]): Flow[I, O, NotUsed] = {
|
||||
fromProcessorMat(() ⇒ (processorFactory(), NotUsed))
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Flow from a Reactive Streams [[java.util.concurrent.Flow.Processor]] and returns a materialized value.
|
||||
*/
|
||||
def fromProcessorMat[I, O, M](processorFactory: () ⇒ (juc.Flow.Processor[I, O], M)): Flow[I, O, M] =
|
||||
scaladsl.Flow.fromProcessorMat { () ⇒
|
||||
val (processor, mat) = processorFactory()
|
||||
(processor.asRs, mat)
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[java.util.concurrent.Flow.Processor]]
|
||||
* which implements the operations encapsulated by this Flow. Every materialization results in a new Processor
|
||||
* instance, i.e. the returned [[RunnableGraph]] is reusable.
|
||||
*
|
||||
* @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it.
|
||||
*/
|
||||
def toProcessor[In, Out, Mat](self: Flow[In, Out, Mat]): RunnableGraph[juc.Flow.Processor[In @uncheckedVariance, Out @uncheckedVariance]] =
|
||||
Source.asSubscriber[In].via(self)
|
||||
.toMat(Sink.asPublisher[Out](fanout = false))(Keep.both)
|
||||
.mapMaterializedValue {
|
||||
case (sub, pub) ⇒ new juc.Flow.Processor[In, Out] {
|
||||
override def onError(t: Throwable): Unit = sub.onError(t)
|
||||
override def onSubscribe(s: juc.Flow.Subscription): Unit = sub.onSubscribe(s)
|
||||
override def onComplete(): Unit = sub.onComplete()
|
||||
override def onNext(t: In): Unit = sub.onNext(t)
|
||||
override def subscribe(s: juc.Flow.Subscriber[_ >: Out]): Unit = pub.subscribe(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* [[akka.stream.scaladsl.Sink]] factories operating with `java.util.concurrent.Flow.*` interfaces.
|
||||
*/
|
||||
object Sink {
|
||||
/**
|
||||
* A `Sink` that materializes into a [[java.util.concurrent.Flow.Publisher]].
|
||||
*
|
||||
* If `fanout` is `WITH_FANOUT`, the materialized `Publisher` will support multiple `Subscriber`s and
|
||||
* the size of the `inputBuffer` configured for this stage becomes the maximum number of elements that
|
||||
* the fastest [[java.util.concurrent.Flow.Subscriber]] can be ahead of the slowest one before slowing
|
||||
* the processing down due to back pressure.
|
||||
*
|
||||
* If `fanout` is `WITHOUT_FANOUT` then the materialized `Publisher` will only support a single `Subscriber` and
|
||||
* reject any additional `Subscriber`s.
|
||||
*/
|
||||
final def asPublisher[T](fanout: Boolean): Sink[T, juc.Flow.Publisher[T]] =
|
||||
scaladsl.Sink.asPublisher[T](fanout).mapMaterializedValue(_.asJava)
|
||||
|
||||
/**
|
||||
* Helper to create [[Sink]] from [[java.util.concurrent.Flow.Subscriber]].
|
||||
*/
|
||||
final def fromSubscriber[T](s: juc.Flow.Subscriber[T]): Sink[T, NotUsed] =
|
||||
scaladsl.Sink.fromSubscriber(s.asRs)
|
||||
}
|
||||
|
||||
}
|
||||
47
project/Jdk9CompileDirectoriesPlugin.scala
Normal file
47
project/Jdk9CompileDirectoriesPlugin.scala
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
|
||||
*/
|
||||
import java.io.File
|
||||
|
||||
import sbt._
|
||||
import sbt.Keys._
|
||||
|
||||
object Jdk9CompileDirectoriesPlugin extends AutoPlugin {
|
||||
|
||||
val jdkVersion: String = System.getProperty("java.version")
|
||||
|
||||
override def trigger = allRequirements
|
||||
|
||||
override lazy val projectSettings = Seq(
|
||||
|
||||
javacOptions in Compile ++= {
|
||||
// making sure we're really targeting 1.8
|
||||
if (isJDK9) Seq("-target", "1.8", "-source", "1.8", "-Xdoclint:none")
|
||||
else Seq("-Xdoclint:none")
|
||||
},
|
||||
|
||||
unmanagedSourceDirectories in Compile ++= {
|
||||
if (isJDK9) {
|
||||
println(s"[JDK9] Enabled [...-jdk9-only] directories to be compiled.")
|
||||
Seq(
|
||||
(sourceDirectory in Compile).value / "java-jdk9-only",
|
||||
(sourceDirectory in Compile).value / "scala-jdk9-only"
|
||||
)
|
||||
} else Seq.empty
|
||||
},
|
||||
|
||||
unmanagedSourceDirectories in Test ++= {
|
||||
if (isJDK9) {
|
||||
Seq(
|
||||
(sourceDirectory in Test).value / "java-jdk9-only",
|
||||
(sourceDirectory in Test).value / "scala-jdk9-only"
|
||||
)
|
||||
} else Seq.empty
|
||||
}
|
||||
|
||||
)
|
||||
|
||||
private def isJDK9 = {
|
||||
jdkVersion startsWith "9"
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue