Compile with jdk9, targeting jdk8 (#24711)
This commit is contained in:
parent
e94d011cbd
commit
8ba48c0adc
12 changed files with 290 additions and 50 deletions
|
|
@ -0,0 +1,197 @@
|
|||
/*
|
||||
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.stream.impl
|
||||
|
||||
import java.util.concurrent.Flow
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
import org.{ reactivestreams ⇒ rs }
|
||||
import JavaFlowAndRsConverters.Implicits._
|
||||
|
||||
/**
|
||||
* INTERNAL API: Provides converters between Reactive Streams (reactive-streams.org) and their Java 9+ counter-parts,
|
||||
* defined in `java.util.concurrent.Flow.*`. This API is internal because Reactive Streams will ship with such
|
||||
* adapters itself at some point, and we'd not want to duplicate that effort for users to be confused about which ones
|
||||
* to use. These adapters are used internally by Akka Streams to convert between the standards but you should not touch
|
||||
* them directly - use thr `JavaFlowSupport` classes instead.
|
||||
*
|
||||
* Please note that either of these types are designed for *inter-op* and usually should not be used directly
|
||||
* in applications. The intended use case is for shared libraries, like database drivers or similar to provide
|
||||
* the inter-operable types, such that other librarie can co-operate with them directly, if that is your use case
|
||||
* and you're using the j.u.c.Flow types, use the [[akka.stream.scaladsl.JavaFlowSupport]] sources/sinks/flows instead.
|
||||
*
|
||||
* The interfaces as well as semantic contract of these sets of interfaces.
|
||||
* Please see https://github.com/reactive-streams/reactive-streams-jvm to learn more about the specification,
|
||||
* and TCK that is available to verify an implementation is a valid implementation.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object JavaFlowAndRsConverters {
|
||||
|
||||
/** Adds `asJava` and `asRs` enrichment methods to Reactive Streams and j.u.c.Flow types. */
|
||||
object Implicits {
|
||||
final implicit class FlowPublisherConverter[T](val p: rs.Publisher[T]) extends AnyVal {
|
||||
def asJava: Flow.Publisher[T] =
|
||||
if (p eq null) null else JavaFlowAndRsConverters.asJava(p)
|
||||
}
|
||||
final implicit class RsPublisherConverter[T](val p: Flow.Publisher[T]) extends AnyVal {
|
||||
def asRs: rs.Publisher[T] = JavaFlowAndRsConverters.asRs(p)
|
||||
}
|
||||
|
||||
final implicit class FlowSubscriberConverter[T](val s: rs.Subscriber[T]) extends AnyVal {
|
||||
def asJava: Flow.Subscriber[T] = JavaFlowAndRsConverters.asJava(s)
|
||||
}
|
||||
final implicit class RsSubscriberConverter[T](val s: Flow.Subscriber[T]) extends AnyVal {
|
||||
def asRs: rs.Subscriber[T] = JavaFlowAndRsConverters.asRs(s)
|
||||
}
|
||||
|
||||
final implicit class FlowProcessorConverter[T, R](val s: rs.Processor[T, R]) extends AnyVal {
|
||||
def asJava: Flow.Processor[T, R] = JavaFlowAndRsConverters.asJava(s)
|
||||
}
|
||||
final implicit class RsProcessorConverter[T, R](val s: Flow.Processor[T, R]) extends AnyVal {
|
||||
def asRs: rs.Processor[T, R] = JavaFlowAndRsConverters.asRs(s)
|
||||
}
|
||||
|
||||
final implicit class FlowSubscriptionConverter[T](val s: rs.Subscription) extends AnyVal {
|
||||
def asJava: Flow.Subscription = JavaFlowAndRsConverters.asJava(s)
|
||||
}
|
||||
final implicit class RsSubscriptionConverter[T](val s: Flow.Subscription) extends AnyVal {
|
||||
def asRs: rs.Subscription = JavaFlowAndRsConverters.asRs(s)
|
||||
}
|
||||
}
|
||||
|
||||
final def asJava[T](p: rs.Publisher[T]): Flow.Publisher[T] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowPublisherToRsAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case delegate ⇒ new RsPublisherToJavaFlowAdapter(delegate) // adapt, it is a real Publisher
|
||||
}
|
||||
final def asRs[T](p: Flow.Publisher[T]): rs.Publisher[T] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsPublisherToJavaFlowAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ ⇒ new JavaFlowPublisherToRsAdapter[T](p)
|
||||
}
|
||||
|
||||
final def asJava[T](s: rs.Subscription): Flow.Subscription = s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowSubscriptionToRsAdapter ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ ⇒ new RsSubscriptionToJavaFlowAdapter(s)
|
||||
}
|
||||
final def asRs[T](s: Flow.Subscription): rs.Subscription = s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsSubscriptionToJavaFlowAdapter ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ ⇒ new JavaFlowSubscriptionToRsAdapter(s)
|
||||
}
|
||||
|
||||
final def asJava[T](s: rs.Subscriber[T]): Flow.Subscriber[T] =
|
||||
s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowSubscriberToRsAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ ⇒ new RsSubscriberToJavaFlowAdapter[T](s)
|
||||
}
|
||||
final def asRs[T](s: Flow.Subscriber[T]): rs.Subscriber[T] = s match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsSubscriberToJavaFlowAdapter[T] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ ⇒ new JavaFlowSubscriberToRsAdapter[T](s)
|
||||
}
|
||||
|
||||
final def asJava[T, R](p: rs.Processor[T, R]): Flow.Processor[T, R] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: JavaFlowProcessorToRsAdapter[T, R] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ ⇒ new RsProcessorToJavaFlowAdapter[T, R](p)
|
||||
}
|
||||
final def asRs[T, R](p: Flow.Processor[T, R]): rs.Processor[T, R] = p match {
|
||||
case null ⇒ null // null remains null
|
||||
case adapter: RsProcessorToJavaFlowAdapter[T, R] ⇒ adapter.delegate // unwrap adapter instead of wrapping again
|
||||
case _ ⇒ new JavaFlowProcessorToRsAdapter[T, R](p)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowPublisherToRsAdapter[T](val delegate: Flow.Publisher[T]) extends rs.Publisher[T] {
|
||||
override def subscribe(rsSubscriber: rs.Subscriber[_ >: T]): Unit =
|
||||
delegate.subscribe(rsSubscriber.asJava)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsPublisherToJavaFlowAdapter[T](val delegate: rs.Publisher[T]) extends Flow.Publisher[T] {
|
||||
override def subscribe(javaSubscriber: Flow.Subscriber[_ >: T]): Unit =
|
||||
delegate.subscribe(javaSubscriber.asRs)
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsSubscriberToJavaFlowAdapter[T](val delegate: rs.Subscriber[T]) extends Flow.Subscriber[T] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: Flow.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asRs)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowSubscriberToRsAdapter[T](val delegate: Flow.Subscriber[T]) extends rs.Subscriber[T] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: rs.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asJava)
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsSubscriptionToJavaFlowAdapter(val delegate: rs.Subscription) extends Flow.Subscription {
|
||||
override def cancel(): Unit = delegate.cancel()
|
||||
|
||||
override def request(n: Long): Unit = delegate.request(n)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowSubscriptionToRsAdapter(val delegate: Flow.Subscription) extends rs.Subscription {
|
||||
override def cancel(): Unit = delegate.cancel()
|
||||
|
||||
override def request(n: Long): Unit = delegate.request(n)
|
||||
}
|
||||
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class RsProcessorToJavaFlowAdapter[T, R](val delegate: rs.Processor[T, R]) extends Flow.Processor[T, R] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: Flow.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asRs)
|
||||
|
||||
override def subscribe(javaSubscriber: Flow.Subscriber[_ >: R]): Unit =
|
||||
delegate.subscribe(javaSubscriber.asRs)
|
||||
}
|
||||
/** INTERNAL API: Adapters are not meant to be touched directly */
|
||||
@InternalApi private[akka] final class JavaFlowProcessorToRsAdapter[T, R](val delegate: Flow.Processor[T, R]) extends rs.Processor[T, R] {
|
||||
override def onError(t: Throwable): Unit =
|
||||
delegate.onError(t)
|
||||
|
||||
override def onComplete(): Unit =
|
||||
delegate.onComplete()
|
||||
|
||||
override def onNext(elem: T): Unit =
|
||||
delegate.onNext(elem)
|
||||
|
||||
override def onSubscribe(s: rs.Subscription): Unit =
|
||||
delegate.onSubscribe(s.asJava)
|
||||
|
||||
override def subscribe(rsSubscriber: rs.Subscriber[_ >: R]): Unit =
|
||||
delegate.subscribe(rsSubscriber.asJava)
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue