diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 3399c8282d..25cf629058 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -270,6 +270,16 @@ would now be:: as the ``GraphStage`` itself is a factory of logic instances. +SubFlow.zip and SubSource.zip now emit akka.japi.Pair instead of Scala's Pair +----------------------------------------------------------------------------- + +The the Java API's ``zip`` operator on ``SubFlow`` and ``SubSource`` has been emiting +Scala's ``Pair`` (``Tuple2``) instead of ``akka.japi.Pair``. This is fixed in Akka 2.5 where it emits the proper +Java DSl type. + +Please note that the ``zip`` operator on ``Source`` and ``Flow`` has had the correct type, +this change only affects the ``Sub...`` versions of those classes. + Deprecation of ActorSubscriber and ActorPublisher ------------------------------------------------- diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala index 10f11603e8..c8b3576596 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala @@ -19,12 +19,14 @@ private[akka] object ConstantFun { def javaIdentityFunction[T]: JFun[T, T] = JavaIdentityFunction.asInstanceOf[JFun[T, T]] - def scalaIdentityFunction[T]: T ⇒ T = conforms + def scalaIdentityFunction[T]: T ⇒ T = conforms.asInstanceOf[Function[T, T]] def scalaAnyToNone[A, B]: A ⇒ Option[B] = none def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none def javaAnyToNone[A, B]: A ⇒ Option[B] = none + val conforms = (a: Any) ⇒ a + val zeroLong = (_: Any) ⇒ 0L val oneLong = (_: Any) ⇒ 1L diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 52d78c07ae..253c9d2ee5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -18,8 +18,6 @@ import java.util.Comparator import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage -import akka.stream.impl.fusing.MapError - /** * A “stream of streams” sub-flow of data elements, e.g. produced by `groupBy`. * SubFlows cannot contribute to the super-flow’s materialized value since they @@ -1161,8 +1159,8 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * * '''Cancels when''' downstream cancels */ - def zip[T](source: Graph[SourceShape[T], _]): SubFlow[In, Out @uncheckedVariance Pair T, Mat] = - new SubFlow(delegate.zip(source)) + def zip[T](source: Graph[SourceShape[T], _]): SubFlow[In, akka.japi.Pair[Out @uncheckedVariance, T], Mat] = + new SubFlow(delegate.zip(source).map { case (o, t) ⇒ akka.japi.Pair.create(o, t) }) /** * Put together the elements of current [[Flow]] and the given [[Source]] diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index adf936b818..8bb8f13292 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1154,8 +1154,8 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * * '''Cancels when''' downstream cancels */ - def zip[T](source: Graph[SourceShape[T], _]): SubSource[Out @uncheckedVariance Pair T, Mat] = - new SubSource(delegate.zip(source)) + def zip[T](source: Graph[SourceShape[T], _]): SubSource[akka.japi.Pair[Out @uncheckedVariance, T], Mat] = + new SubSource(delegate.zip(source).map { case (o, t) ⇒ akka.japi.Pair.create(o, t) }) /** * Put together the elements of current [[Flow]] and the given [[Source]] diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ca74ba1c35..227f6e732f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -24,14 +24,16 @@ object Dependencies { scalaVersion := System.getProperty("akka.build.scalaVersion", crossScalaVersions.value.head), scalaStmVersion := sys.props.get("akka.build.scalaStmVersion").getOrElse("0.8"), scalaCheckVersion := sys.props.get("akka.build.scalaCheckVersion").getOrElse( - if (scalaVersion.value.startsWith("2.12")) "1.13.4" // does not work for 2.11 - else "1.13.2" + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, n)) if n >= 12 => "1.13.4" // does not work for 2.11 + case _ => "1.13.2" + } ), scalaTestVersion := "3.0.0", java8CompatVersion := { - scalaVersion.value match { - case x if x.startsWith("2.12") => "0.8.0" - case _ => "0.7.0" + CrossVersion.partialVersion(scalaVersion.value) match { + case Some((2, n)) if n >= 12 => "0.8.0" + case _ => "0.7.0" } } )