diff --git a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java index 6a0e4df308..d63d2f768c 100644 --- a/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java +++ b/akka-docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -145,8 +145,8 @@ public class RecipeAdhocSourceTest extends RecipeTest { Promise shutdown = Futures.promise(); AtomicInteger startedCount = new AtomicInteger(0); - Source source = Source - .empty().mapMaterializedValue(x -> startedCount.incrementAndGet()) + Source source = Source.empty() + .mapMaterializedValue(x -> startedCount.incrementAndGet()) .concat(Source.repeat("a")); TestSubscriber.Probe probe = @@ -172,8 +172,8 @@ public class RecipeAdhocSourceTest extends RecipeTest { Promise shutdown = Futures.promise(); AtomicInteger startedCount = new AtomicInteger(0); - Source source = Source - .empty().mapMaterializedValue(x -> startedCount.incrementAndGet()) + Source source = Source.empty() + .mapMaterializedValue(x -> startedCount.incrementAndGet()) .concat(Source.repeat("a")); TestSubscriber.Probe probe = diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 01224f045e..e5d99a438b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -43,10 +43,21 @@ public class FlowTest extends StreamTest { super(actorSystemResource); } - @ClassRule + @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); + interface Fruit {} + static class Apple implements Fruit {}; + static class Orange implements Fruit {}; + + public void compileOnlyUpcast() { + Flow appleFlow = null; + Flow appleFruitFlow = Flow.upcast(appleFlow); + + Flow fruitFlow = appleFruitFlow.intersperse(new Orange()); + } + @Test public void mustBeAbleToUseSimpleOperators() { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index ae69020f43..5f757c717c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -47,6 +47,20 @@ public class SourceTest extends StreamTest { public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("SourceTest", AkkaSpec.testConf()); + + interface Fruit {} + static class Apple implements Fruit {}; + static class Orange implements Fruit {}; + + public void compileOnlyUpcast() { + Source apples = null; + Source oranges = null; + Source appleFruits = Source.upcast(apples); + Source orangeFruits = Source.upcast(oranges); + + Source fruits = appleFruits.merge(orangeFruits); + } + @Test public void mustBeAbleToUseSimpleOperators() { final TestKit probe = new TestKit(system); @@ -849,14 +863,10 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToUseIdleInject() throws Exception { Integer result = - Source.maybe() - .keepAlive(Duration.create(1, "second"), new Creator() { - public Integer create() { - return 0; - } - }) + Source.maybe() + .keepAlive(Duration.create(1, "second"), () -> 0) .takeWithin(Duration.create(1500, "milliseconds")) - .runWith(Sink.head(), materializer) + .runWith(Sink.head(), materializer) .toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals((Object) 0, result); diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index b6c325ab7c..c442f125f5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -96,7 +96,7 @@ object BidiFlow { new BidiFlow(scaladsl.BidiFlow.bidirectionalIdleTimeout(timeout)) } -final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { +final class BidiFlow[I1, O1, I2, O2, Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { override def traversalBuilder = delegate.traversalBuilder override def shape = delegate.shape diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f8bcf0e857..f13df70c97 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -11,7 +11,6 @@ import akka.japi.{ Pair, function } import akka.stream._ import org.reactivestreams.Processor -import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration import akka.japi.Util import java.util.Comparator @@ -235,10 +234,21 @@ object Flow { Flow.fromGraph(new LazyFlow[I, O, M]( t ⇒ flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext), () ⇒ fallback.create())) + + /** + * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with + * fan-in combinators where you do not want to pay the cost of casting each element in a `map`. + * + * @tparam SuperOut a supertype to the type of element flowing out of the flow + * @return A flow that accepts `In` and outputs elements of the super type + */ + def upcast[In, SuperOut, Out <: SuperOut, M](flow: Flow[In, Out, M]): Flow[In, SuperOut, M] = + flow.asInstanceOf[Flow[In, SuperOut, M]] + } /** Create a `Flow` which can process elements of type `T`. */ -final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph[FlowShape[In, Out], Mat] { +final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph[FlowShape[In, Out], Mat] { import scala.collection.JavaConverters._ override def shape: FlowShape[In, Out] = delegate.shape @@ -735,7 +745,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + def grouped(n: Int): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step /** @@ -810,7 +820,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def sliding(n: Int, step: Int = 1): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + def sliding(n: Int, step: Int = 1): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.sliding(n, step).map(_.asJava)) // TODO optimize to one step /** @@ -928,7 +938,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def reduce(f: function.Function2[Out, Out, Out @uncheckedVariance]): javadsl.Flow[In, Out, Mat] = + def reduce(f: function.Function2[Out, Out, Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.reduce(f.apply)) /** @@ -962,7 +972,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](start: T, inject: T, end: T): javadsl.Flow[In, T, Mat] = + def intersperse(start: Out, inject: Out, end: Out): javadsl.Flow[In, Out, Mat] = new Flow(delegate.intersperse(start, inject, end)) /** @@ -987,7 +997,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](inject: T): javadsl.Flow[In, T, Mat] = + def intersperse(inject: Out): javadsl.Flow[In, Out, Mat] = new Flow(delegate.intersperse(inject)) /** @@ -1008,7 +1018,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * `n` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** @@ -1029,7 +1039,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * `maxWeight` must be positive, and `d` must be greater than 0 seconds, otherwise * IllegalArgumentException is thrown. */ - def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = + def groupedWeightedWithin(maxWeight: Long, costFn: function.Function[Out, java.lang.Long], d: FiniteDuration): javadsl.Flow[In, java.util.List[Out], Mat] = new Flow(delegate.groupedWeightedWithin(maxWeight, d)(costFn.apply).map(_.asJava)) /** @@ -1164,7 +1174,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Flow[In, T, Mat] = + def recover(pf: PartialFunction[Throwable, Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.recover(pf)) /** @@ -1210,7 +1220,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * */ @deprecated("Use recoverWithRetries instead.", "2.4.4") - def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] = + def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.recoverWith(pf)) /** @@ -1238,7 +1248,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * @param attempts Maximum number of retries or -1 to retry indefinitely * @param pf Receives the failure cause and returns the new Source to be materialized if any */ - def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): javadsl.Flow[In, T, Mat @uncheckedVariance] = + def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.recoverWithRetries(attempts, pf)) /** @@ -1341,7 +1351,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * */ - def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): javadsl.Flow[In, O2, Mat] = + def conflate(aggregate: function.Function2[Out, Out, Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.conflate(aggregate.apply)) /** @@ -1474,7 +1484,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels or substream cancels */ - def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out @uncheckedVariance], javadsl.Source[Out @uncheckedVariance, NotUsed]], Mat] = + def prefixAndTail(n: Int): javadsl.Flow[In, akka.japi.Pair[java.util.List[Out], javadsl.Source[Out, NotUsed]], Mat] = new Flow(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ akka.japi.Pair(taken.asJava, tail.asJava) }) /** @@ -1518,7 +1528,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * @param maxSubstreams configures the maximum number of substreams (keys) * that are supported; if more distinct keys are encountered then the stream fails */ - def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out @uncheckedVariance, Mat] = + def groupBy[K](maxSubstreams: Int, f: function.Function[Out, K]): SubFlow[In, Out, Mat] = new SubFlow(delegate.groupBy(maxSubstreams, f.apply)) /** @@ -1633,7 +1643,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * See also [[Flow.splitWhen]]. */ - def splitAfter[U >: Out](p: function.Predicate[Out]): SubFlow[In, Out, Mat] = + def splitAfter(p: function.Predicate[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.splitAfter(p.test)) /** @@ -1696,7 +1706,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def concat[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = + def concat[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.concat(that)) /** @@ -1714,7 +1724,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#concat]] */ - def concatMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + def concatMat[M, M2](that: Graph[SourceShape[Out], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = new Flow(delegate.concatMat(that)(combinerToScala(matF))) /** @@ -1735,7 +1745,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = + def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.prepend(that)) /** @@ -1753,7 +1763,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#prepend]] */ - def prependMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + def prependMat[M, M2](that: Graph[SourceShape[Out], M], matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = new Flow(delegate.prependMat(that)(combinerToScala(matF))) /** @@ -1778,7 +1788,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes * by from this stream. */ - def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.Flow[In, T, Mat] = + def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.orElse(secondary)) /** @@ -1791,9 +1801,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#orElse]] */ - def orElseMat[T >: Out, M2, M3]( - secondary: Graph[SourceShape[T], M2], - matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, T, M3] = + def orElseMat[M2, M3]( + secondary: Graph[SourceShape[Out], M2], + matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = new Flow(delegate.orElseMat(secondary)(combinerToScala(matF))) /** @@ -1910,7 +1920,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Flow[In, T, Mat] = + def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): javadsl.Flow[In, Out, Mat] = interleave(that, segmentSize, eagerClose = false) /** @@ -1933,7 +1943,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int, eagerClose: Boolean): javadsl.Flow[In, T, Mat] = + def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int, eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = new Flow(delegate.interleave(that, segmentSize, eagerClose)) /** @@ -1950,8 +1960,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#interleave]] */ - def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, - matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + def interleaveMat[M, M2](that: Graph[SourceShape[Out], M], segmentSize: Int, + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = interleaveMat(that, segmentSize, eagerClose = false, matF) /** @@ -1970,8 +1980,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#interleave]] */ - def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, eagerClose: Boolean, - matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + def interleaveMat[M, M2](that: Graph[SourceShape[Out], M], segmentSize: Int, eagerClose: Boolean, + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = new Flow(delegate.interleaveMat(that, segmentSize, eagerClose)(combinerToScala(matF))) /** @@ -1986,7 +1996,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def merge[T >: Out](that: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] = + def merge(that: Graph[SourceShape[Out], _]): javadsl.Flow[In, Out, Mat] = merge(that, eagerComplete = false) /** @@ -2001,7 +2011,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def merge[T >: Out](that: Graph[SourceShape[T], _], eagerComplete: Boolean): javadsl.Flow[In, T, Mat] = + def merge(that: Graph[SourceShape[Out], _], eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = new Flow(delegate.merge(that, eagerComplete)) /** @@ -2013,9 +2023,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#merge]] */ - def mergeMat[T >: Out, M, M2]( - that: Graph[SourceShape[T], M], - matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] = + def mergeMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out, M2] = mergeMat(that, matF, eagerComplete = false) /** @@ -2027,10 +2037,10 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#merge]] */ - def mergeMat[T >: Out, M, M2]( - that: Graph[SourceShape[T], M], + def mergeMat[M, M2]( + that: Graph[SourceShape[Out], M], matF: function.Function2[Mat, M, M2], - eagerComplete: Boolean): javadsl.Flow[In, T, M2] = + eagerComplete: Boolean): javadsl.Flow[In, Out, M2] = new Flow(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF))) /** @@ -2048,7 +2058,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M], comp: Comparator[U]): javadsl.Flow[In, U, Mat] = + def mergeSorted[M](that: Graph[SourceShape[Out], M], comp: Comparator[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.mergeSorted(that)(Ordering.comparatorToOrdering(comp))) /** @@ -2063,8 +2073,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @see [[#mergeSorted]]. */ - def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: Comparator[U], - matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, U, Mat3] = + def mergeSortedMat[Mat2, Mat3](that: Graph[SourceShape[Out], Mat2], comp: Comparator[Out], + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Flow[In, Out, Mat3] = new Flow(delegate.mergeSortedMat(that)(combinerToScala(matF))(Ordering.comparatorToOrdering(comp))) /** @@ -2078,7 +2088,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def zip[T](source: Graph[SourceShape[T], _]): javadsl.Flow[In, Out @uncheckedVariance Pair T, Mat] = + def zip[T](source: Graph[SourceShape[T], _]): javadsl.Flow[In, Out Pair T, Mat] = zipMat(source, Keep.left) /** @@ -2091,11 +2101,11 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends */ def zipMat[T, M, M2]( that: Graph[SourceShape[T], M], - matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out @uncheckedVariance Pair T, M2] = + matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, Out Pair T, M2] = this.viaMat(Flow.fromGraph(GraphDSL.create( that, - new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out @uncheckedVariance Pair T]] { - def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out @uncheckedVariance Pair T] = { + new function.Function2[GraphDSL.Builder[M], SourceShape[T], FlowShape[Out, Out Pair T]] { + def apply(b: GraphDSL.Builder[M], s: SourceShape[T]): FlowShape[Out, Out Pair T] = { val zip: FanInShape2[Out, T, Out Pair T] = b.add(Zip.create[Out, T]) b.from(s).toInlet(zip.in1) FlowShape(zip.in0, zip.out) @@ -2146,7 +2156,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def zipWithIndex: Flow[In, Pair[Out @uncheckedVariance, Long], Mat] = + def zipWithIndex: Flow[In, Pair[Out, Long], Mat] = new Flow(delegate.zipWithIndex.map { case (elem, index) ⇒ Pair(elem, index) }) /** @@ -2228,7 +2238,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * '''Cancels when''' downstream cancels */ - def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Flow[In, U, Mat] = + def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) /** @@ -2527,7 +2537,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * * @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it. */ - def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] = { + def toProcessor: RunnableGraph[Processor[In, Out]] = { RunnableGraph.fromGraph(delegate.toProcessor) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala index 2fc343dd6d..ac62b334c5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala @@ -36,7 +36,7 @@ object MergeHub { */ def of[T](clazz: Class[T], perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = { akka.stream.scaladsl.MergeHub.source[T](perProducerBufferSize) - .mapMaterializedValue(_.asJava) + .mapMaterializedValue(_.asJava[T, NotUsed]) .asJava } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 3009fc8e02..86c04d4eab 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -288,7 +288,7 @@ object Sink { * A `Sink` is a set of stream processing steps that has one open input. * Can be used as a `Subscriber` */ -final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[In], Mat] { +final class Sink[In, Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[In], Mat] { override def shape: SinkShape[In] = delegate.shape override def traversalBuilder: LinearTraversalBuilder = delegate.traversalBuilder diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 101a61c2ef..d6fe9f23c2 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -464,6 +464,27 @@ object Source { (s: S) ⇒ read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext), (s: S) ⇒ close.apply(s).toScala)) + /** + * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with + * fan-in combinators where you do not want to pay the cost of casting each element in a `map`. + * + * Example: + * + * {{{ + * Source apples = Source.single(new Apple()); + * Source oranges = Source.single(new Orange()); + * Source appleFruits = Source.upcast(apples); + * Source orangeFruits = Source.upcast(oranges); + * + * Source fruits = appleFruits.merge(orangeFruits); + * }}} + * + * @tparam SuperOut a supertype to the type of elements in stream + * @return A source with the supertype as elements + */ + def upcast[SuperOut, Out <: SuperOut, Mat](source: Source[Out, Mat]): Source[SuperOut, Mat] = + source.asInstanceOf[Source[SuperOut, Mat]] + } /** @@ -472,7 +493,7 @@ object Source { * A `Source` is a set of stream processing steps that has one open output and an attached input. * Can be used as a `Publisher` */ -final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[SourceShape[Out], Mat] { +final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[SourceShape[Out], Mat] { import scala.collection.JavaConverters._ @@ -627,7 +648,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * which is semantically in-line with that Scala's standard library collections * do in such situations. */ - def runReduce[U >: Out](f: function.Function2[U, U, U], materializer: Materializer): CompletionStage[U] = + def runReduce(f: function.Function2[Out, Out, Out], materializer: Materializer): CompletionStage[Out] = runWith(Sink.reduce(f), materializer) /** @@ -648,7 +669,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def concat[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = + def concat[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = new Source(delegate.concat(that)) /** @@ -666,9 +687,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#concat]]. */ - def concatMat[T >: Out, M, M2]( - that: Graph[SourceShape[T], M], - matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + def concatMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = new Source(delegate.concatMat(that)(combinerToScala(matF))) /** @@ -689,7 +710,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = + def prepend[M](that: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = new Source(delegate.prepend(that)) /** @@ -707,9 +728,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#prepend]]. */ - def prependMat[T >: Out, M, M2]( - that: Graph[SourceShape[T], M], - matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + def prependMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = new Source(delegate.prependMat(that)(combinerToScala(matF))) /** @@ -734,7 +755,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes * by from this stream. */ - def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.Source[T, Mat] = + def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.Source[Out, Mat] = new Source(delegate.orElse(secondary)) /** @@ -747,7 +768,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#orElse]] */ - def orElseMat[T >: Out, M, M2](secondary: Graph[SourceShape[T], M], matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + def orElseMat[M, M2](secondary: Graph[SourceShape[Out], M], matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = new Source(delegate.orElseMat(secondary)(combinerToScala(matF))) /** @@ -862,7 +883,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): javadsl.Source[T, Mat] = + def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): javadsl.Source[Out, Mat] = new Source(delegate.interleave(that, segmentSize)) /** @@ -879,8 +900,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#interleave]]. */ - def interleaveMat[T >: Out, M, M2](that: Graph[SourceShape[T], M], segmentSize: Int, - matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + def interleaveMat[M, M2](that: Graph[SourceShape[Out], M], segmentSize: Int, + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = new Source(delegate.interleaveMat(that, segmentSize)(combinerToScala(matF))) /** @@ -895,7 +916,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def merge[T >: Out](that: Graph[SourceShape[T], _]): javadsl.Source[T, Mat] = + def merge(that: Graph[SourceShape[Out], _]): javadsl.Source[Out, Mat] = new Source(delegate.merge(that)) /** @@ -907,9 +928,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#merge]]. */ - def mergeMat[T >: Out, M, M2]( - that: Graph[SourceShape[T], M], - matF: function.Function2[Mat, M, M2]): javadsl.Source[T, M2] = + def mergeMat[M, M2]( + that: Graph[SourceShape[Out], M], + matF: function.Function2[Mat, M, M2]): javadsl.Source[Out, M2] = new Source(delegate.mergeMat(that)(combinerToScala(matF))) /** @@ -927,7 +948,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M], comp: util.Comparator[U]): javadsl.Source[U, Mat] = + def mergeSorted[M](that: Graph[SourceShape[Out], M], comp: util.Comparator[Out]): javadsl.Source[Out, Mat] = new Source(delegate.mergeSorted(that)(Ordering.comparatorToOrdering(comp))) /** @@ -942,8 +963,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @see [[#mergeSorted]]. */ - def mergeSortedMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], comp: util.Comparator[U], - matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[U, Mat3] = + def mergeSortedMat[Mat2, Mat3](that: Graph[SourceShape[Out], Mat2], comp: util.Comparator[Out], + matF: function.Function2[Mat, Mat2, Mat3]): javadsl.Source[Out, Mat3] = new Source(delegate.mergeSortedMat(that)(combinerToScala(matF))(Ordering.comparatorToOrdering(comp))) /** @@ -1063,7 +1084,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Cancels when''' downstream cancels */ @deprecated("Use recoverWithRetries instead.", "2.4.4") - def recover[T >: Out](pf: PartialFunction[Throwable, T]): javadsl.Source[T, Mat] = + def recover(pf: PartialFunction[Throwable, Out]): javadsl.Source[Out, Mat] = new Source(delegate.recover(pf)) /** @@ -1108,7 +1129,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Cancels when''' downstream cancels * */ - def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] = + def recoverWith(pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = new Source(delegate.recoverWith(pf)) /** @@ -1134,7 +1155,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Cancels when''' downstream cancels * */ - def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): Source[T, Mat @uncheckedVariance] = + def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] = new Source(delegate.recoverWithRetries(attempts, pf)) /** @@ -1636,7 +1657,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](start: T, inject: T, end: T): javadsl.Source[T, Mat] = + def intersperse(start: Out, inject: Out, end: Out): javadsl.Source[Out, Mat] = new Source(delegate.intersperse(start, inject, end)) /** @@ -1661,7 +1682,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](inject: T): javadsl.Source[T, Mat] = + def intersperse(inject: Out): javadsl.Source[Out, Mat] = new Source(delegate.intersperse(inject)) /** @@ -1893,7 +1914,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ - def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): javadsl.Source[O2, Mat] = + def conflate(aggregate: function.Function2[Out, Out, Out]): javadsl.Source[Out, Mat] = new Source(delegate.conflate(aggregate.apply)) /** @@ -2170,7 +2191,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * See also [[Source.splitWhen]]. */ - def splitAfter[U >: Out](p: function.Predicate[Out]): SubSource[Out, Mat] = + def splitAfter(p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.splitAfter(p.test)) /** @@ -2284,7 +2305,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * * '''Cancels when''' downstream cancels */ - def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Source[U, Mat] = + def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): javadsl.Source[Out, Mat] = new Source(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 9f507eb956..039da712c9 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -21,12 +21,24 @@ import java.util.concurrent.CompletionStage import scala.reflect.ClassTag +object SubFlow { + /** + * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with + * fan-in combinators where you do not want to pay the cost of casting each element in a `map`. + * + * @tparam SuperOut a supertype to the type of element flowing out of the flow + * @return A flow that accepts `In` and outputs elements of the super type + */ + def upcast[In, SuperOut, Out <: SuperOut, M](flow: SubFlow[In, Out, M]): SubFlow[In, SuperOut, M] = + flow.asInstanceOf[SubFlow[In, SuperOut, M]] +} + /** * A “stream of streams” sub-flow of data elements, e.g. produced by `groupBy`. * SubFlows cannot contribute to the super-flow’s materialized value since they * are materialized later, during the runtime of the flow graph processing. */ -class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[In, Out, Mat]#Repr, scaladsl.Sink[In, Mat]]) { +class SubFlow[In, Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[In, Out, Mat]#Repr, scaladsl.Sink[In, Mat]]) { /** Converts this Flow to its Scala DSL counterpart */ def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Flow[In, Out, Mat]#Repr, scaladsl.Sink[In, Mat]] @uncheckedVariance = delegate @@ -548,7 +560,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](start: T, inject: T, end: T): SubFlow[In, T, Mat] = + def intersperse(start: Out, inject: Out, end: Out): SubFlow[In, Out, Mat] = new SubFlow(delegate.intersperse(start, inject, end)) /** @@ -573,7 +585,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](inject: T): SubFlow[In, T, Mat] = + def intersperse(inject: Out): SubFlow[In, Out, Mat] = new SubFlow(delegate.intersperse(inject)) /** @@ -749,7 +761,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * '''Cancels when''' downstream cancels * */ - def recover[T >: Out](pf: PartialFunction[Throwable, T]): SubFlow[In, T, Mat] = + def recover(pf: PartialFunction[Throwable, Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.recover(pf)) /** @@ -773,7 +785,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * */ @deprecated("Use recoverWithRetries instead.", "2.4.4") - def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] = + def recoverWith(pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubFlow[In, Out, Mat @uncheckedVariance] = new SubFlow(delegate.recoverWith(pf)) /** @@ -799,7 +811,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * '''Cancels when''' downstream cancels * */ - def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubFlow[In, T, Mat @uncheckedVariance] = + def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubFlow[In, Out, Mat] = new SubFlow(delegate.recoverWithRetries(attempts, pf)) /** @@ -920,7 +932,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * */ - def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): SubFlow[In, O2, Mat] = + def conflate(aggregate: function.Function2[Out, Out, Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.conflate(aggregate.apply)) /** @@ -1107,7 +1119,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] = + def concat[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = new SubFlow(delegate.concat(that)) /** @@ -1128,7 +1140,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubFlow[In, T, Mat] = + def prepend[M](that: Graph[SourceShape[Out], M]): SubFlow[In, Out, Mat] = new SubFlow(delegate.prepend(that)) /** @@ -1153,7 +1165,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes * by from this stream. */ - def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.SubFlow[In, T, Mat] = + def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.orElse(secondary)) /** @@ -1215,7 +1227,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def merge[T >: Out](that: Graph[SourceShape[T], _]): SubFlow[In, T, Mat] = + def merge(that: Graph[SourceShape[Out], _]): SubFlow[In, Out, Mat] = new SubFlow(delegate.merge(that)) /** @@ -1241,7 +1253,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubFlow[In, T, Mat] = + def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubFlow[In, Out, Mat] = new SubFlow(delegate.interleave(that, segmentSize)) /** @@ -1259,7 +1271,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M], comp: Comparator[U]): javadsl.SubFlow[In, U, Mat] = + def mergeSorted[M](that: Graph[SourceShape[Out], M], comp: Comparator[Out]): javadsl.SubFlow[In, Out, Mat] = new SubFlow(delegate.mergeSorted(that)(Ordering.comparatorToOrdering(comp))) /** @@ -1387,7 +1399,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): SubFlow[In, U, Mat] = + def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): SubFlow[In, Out, Mat] = new SubFlow(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index b2d3c3e81d..df30edc348 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -22,12 +22,20 @@ import akka.stream.impl.fusing.MapError import scala.compat.java8.FutureConverters._ import scala.reflect.ClassTag +/** + * * Upcast a stream of elements to a stream of supertypes of that element. Useful in combination with + * fan-in combinators where you do not want to pay the cost of casting each element in a `map`. + */ +object SubSource { + def upcast[U, T <: U, Mat](source: SubSource[T, Mat]): SubSource[U, Mat] = source.asInstanceOf[SubSource[U, Mat]] +} + /** * A “stream of streams” sub-flow of data elements, e.g. produced by `groupBy`. * SubFlows cannot contribute to the super-flow’s materialized value since they * are materialized later, during the runtime of the flow graph processing. */ -class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]]) { +class SubSource[Out, Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]]) { /** Converts this Flow to its Scala DSL counterpart */ def asScala: scaladsl.SubFlow[Out, Mat, scaladsl.Source[Out, Mat]#Repr, scaladsl.RunnableGraph[Mat]] @uncheckedVariance = delegate @@ -545,7 +553,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](start: T, inject: T, end: T): SubSource[T, Mat] = + def intersperse(start: Out, inject: Out, end: Out): SubSource[Out, Mat] = new SubSource(delegate.intersperse(start, inject, end)) /** @@ -570,7 +578,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def intersperse[T >: Out](inject: T): SubSource[T, Mat] = + def intersperse(inject: Out): SubSource[Out, Mat] = new SubSource(delegate.intersperse(inject)) /** @@ -744,7 +752,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * '''Cancels when''' downstream cancels * */ - def recover[T >: Out](pf: PartialFunction[Throwable, T]): SubSource[T, Mat] = + def recover(pf: PartialFunction[Throwable, Out]): SubSource[Out, Mat] = new SubSource(delegate.recover(pf)) /** @@ -766,7 +774,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * */ @deprecated("Use recoverWithRetries instead.", "2.4.4") - def recoverWith[T >: Out](pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] = + def recoverWith(pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat] = new SubSource(delegate.recoverWith(pf)) /** @@ -790,7 +798,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * '''Cancels when''' downstream cancels * */ - def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, _ <: Graph[SourceShape[T], NotUsed]]): SubSource[T, Mat @uncheckedVariance] = + def recoverWithRetries(attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]]): SubSource[Out, Mat] = new SubSource(delegate.recoverWithRetries(attempts, pf)) /** @@ -911,7 +919,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate * */ - def conflate[O2 >: Out](aggregate: function.Function2[O2, O2, O2]): SubSource[O2, Mat] = + def conflate(aggregate: function.Function2[Out, Out, Out]): SubSource[Out, Mat] = new SubSource(delegate.conflate(aggregate.apply)) /** @@ -992,7 +1000,6 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels * - * @param seed Provides the first state for extrapolation using the first unconsumed element * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation * state. */ @@ -1098,7 +1105,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def concat[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] = + def concat[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = new SubSource(delegate.concat(that)) /** @@ -1119,7 +1126,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def prepend[T >: Out, M](that: Graph[SourceShape[T], M]): SubSource[T, Mat] = + def prepend[M](that: Graph[SourceShape[Out], M]): SubSource[Out, Mat] = new SubSource(delegate.prepend(that)) /** @@ -1144,7 +1151,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * '''Cancels when''' downstream cancels and additionally the alternative is cancelled as soon as an element passes * by from this stream. */ - def orElse[T >: Out, M](secondary: Graph[SourceShape[T], M]): javadsl.SubSource[T, Mat] = + def orElse[M](secondary: Graph[SourceShape[Out], M]): javadsl.SubSource[Out, Mat] = new SubSource(delegate.orElse(secondary)) /** @@ -1206,7 +1213,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def merge[T >: Out](that: Graph[SourceShape[T], _]): SubSource[T, Mat] = + def merge(that: Graph[SourceShape[Out], _]): SubSource[Out, Mat] = new SubSource(delegate.merge(that)) /** @@ -1233,7 +1240,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def interleave[T >: Out](that: Graph[SourceShape[T], _], segmentSize: Int): SubSource[T, Mat] = + def interleave(that: Graph[SourceShape[Out], _], segmentSize: Int): SubSource[Out, Mat] = new SubSource(delegate.interleave(that, segmentSize)) /** @@ -1251,7 +1258,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def mergeSorted[U >: Out, M](that: Graph[SourceShape[U], M], comp: Comparator[U]): javadsl.SubSource[U, Mat] = + def mergeSorted[M](that: Graph[SourceShape[Out], M], comp: Comparator[Out]): javadsl.SubSource[Out, Mat] = new SubSource(delegate.mergeSorted(that)(Ordering.comparatorToOrdering(comp))) /** @@ -1379,7 +1386,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): SubSource[U, Mat] = + def keepAlive(maxIdle: FiniteDuration, injectedElem: function.Creator[Out]): SubSource[Out, Mat] = new SubSource(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 8be744705b..0f869107d1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -15,7 +15,8 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat]( override val shape: BidiShape[I1, O1, I2, O2] ) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { - def asJava: javadsl.BidiFlow[I1, O1, I2, O2, Mat] = new javadsl.BidiFlow(this) + def asJava[JI1 <: I1, JO1 >: O1, JI2 <: I2, JO2 >: O2, JMat >: Mat]: javadsl.BidiFlow[JI1, JO1, JI2, JO2, JMat] = + new javadsl.BidiFlow(this) /** * Add the given BidiFlow as the next step in a bidirectional transformation diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index a711cd7644..e7689ea94a 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -309,7 +309,8 @@ final class Flow[-In, +Out, +Mat]( } /** Converts this Scala DSL element to it's Java DSL counterpart. */ - def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this) + def asJava[JIn <: In, JOut >: Out, JMat >: Mat]: javadsl.Flow[JIn, JOut, JMat] = + new javadsl.Flow(this) } object Flow { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index bffc96beb2..bb910e54a2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -119,7 +119,7 @@ final class Sink[-In, +Mat]( /** * Converts this Scala DSL element to it's Java DSL counterpart. */ - def asJava: javadsl.Sink[In, Mat] = new javadsl.Sink(this) + def asJava[JIn <: In, JMat >: Mat]: javadsl.Sink[JIn, JMat] = new javadsl.Sink(this) } object Sink { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 86eb6daa5c..9f672ff62b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -195,7 +195,7 @@ final class Source[+Out, +Mat]( /** * Converts this Scala DSL element to it's Java DSL counterpart. */ - def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this) + def asJava[JOut >: Out, JMat >: Mat]: javadsl.Source[JOut, JMat] = new javadsl.Source(this) /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.