Java Flow dsl lower bounds incorrect #24368

This commit is contained in:
Johan Andrén 2018-03-15 14:53:50 +01:00 committed by GitHub
parent a53a09e6ba
commit d8b9bb1b3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 199 additions and 126 deletions

View file

@ -22,12 +22,20 @@ import akka.stream.impl.fusing.MapError
import scala.compat.java8.FutureConverters._
import scala.reflect.ClassTag
/**
* * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with
* fan-in combinators where you do not want to pay the cost of casting each element in a `map`.
*/
object SubSource {
def upcast[U, T <: U, Mat](source: SubSource[T, Mat]): SubSource[U, Mat] = source.asInstanceOf[SubSource[U, Mat]]
}
/**
* A stream of streams sub-flow of data elements, e.g. produced by `groupBy`.
* SubFlows cannot contribute to the super-flows materialized value since they
* are materialized later, during the runtime of the flow graph processing.
*/
class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]]) {
class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]]) {
/** Converts this Flow to its Scala DSL counterpart */
def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]] @uncheckedVariance = delegate
@ -545,7 +553,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](start: T, inject: T, end: T): SubSource[T, Mat] =
def intersperse(start: Out, inject: Out, end: Out): SubSource[Out, Mat] =
new SubSource(delegate.intersperse(start, inject, end))
/**
@ -570,7 +578,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](inject: T): SubSource[T, Mat] =
def intersperse(inject: Out): SubSource[Out, Mat] =
new SubSource(delegate.intersperse(inject))
/**
@ -744,7 +752,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Cancels when''' downstream cancels
*
*/
def recover[T >: Out](pf: PartialFunction[Throwable, T]): SubSource[T, Mat] =
def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat] =
new SubSource(delegate.recover(pf))
/**
@ -766,7 +774,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
*/
@deprecated("Use recoverWithRetries instead.", "2.4.4")
def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] =
def recoverWith(pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat] =
new SubSource(delegate.recoverWith(pf))
/**
@ -790,7 +798,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Cancels when''' downstream cancels
*
*/
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] =
def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat] =
new SubSource(delegate.recoverWithRetries(attempts, pf))
/**
@ -911,7 +919,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*
*/
def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): SubSource[O2, Mat] =
def conflate(aggregate: function.Function2[Out, Out, Out]): SubSource[Out, Mat] =
new SubSource(delegate.conflate(aggregate.apply))
/**
@ -992,7 +1000,6 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
*/
@ -1098,7 +1105,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] =
def concat[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.concat(that))
/**
@ -1119,7 +1126,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] =
def prepend[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] =
new SubSource(delegate.prepend(that))
/**
@ -1144,7 +1151,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
* '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes
* by from this stream.
*/
def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.SubSource[T, Mat] =
def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.orElse(secondary))
/**
@ -1206,7 +1213,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] =
def merge(that: Graph[SourceShape[Out], _]): SubSource[Out, Mat] =
new SubSource(delegate.merge(that))
/**
@ -1233,7 +1240,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubSource[T, Mat] =
def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat] =
new SubSource(delegate.interleave(that, segmentSize))
/**
@ -1251,7 +1258,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M], comp: Comparator[U]): javadsl.SubSource[U, Mat] =
def mergeSorted[M](that: Graph[SourceShape[Out], M], comp: Comparator[Out]): javadsl.SubSource[Out, Mat] =
new SubSource(delegate.mergeSorted(that)(Ordering.comparatorToOrdering(comp)))
/**
@ -1379,7 +1386,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
*
* '''Cancels when''' downstream cancels
*/
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): SubSource[U, Mat] =
def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] =
new SubSource(delegate.keepAlive(maxIdle, () injectedElem.create()))
/**