diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java index 738acf9046..5787bb28a1 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -28,7 +28,7 @@ public class ActorSubscriberTest extends StreamTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); + new AkkaJUnitActorSystemResource("ActorSubscriberTest", AkkaSpec.testConf()); public static class TestSubscriber extends UntypedActorSubscriber { diff --git a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java index 94aabc2786..df6e0bce74 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java @@ -30,7 +30,7 @@ public class OutputStreamSourceTest extends StreamTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("OutputStreamSourceTest2", Utils.UnboundedMailboxConfig()); + new AkkaJUnitActorSystemResource("OutputStreamSourceTest", Utils.UnboundedMailboxConfig()); @Test public void mustSendEventsViaOutputStream() throws Exception { diff --git a/akka-stream-tests/src/test/java/akka/stream/io/SinkAsJavaSourceTest.java b/akka-stream-tests/src/test/java/akka/stream/io/SinkAsJavaSourceTest.java index 8cd3c0641c..d812404279 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/SinkAsJavaSourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/SinkAsJavaSourceTest.java @@ -27,7 +27,7 @@ public class SinkAsJavaSourceTest extends StreamTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("OutputStreamSource", Utils.UnboundedMailboxConfig()); + new AkkaJUnitActorSystemResource("SinkAsJavaSourceTest", Utils.UnboundedMailboxConfig()); @Test public void mustBeAbleToUseAsJavaStream() throws Exception { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index e2cd937305..78bda44fe5 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -1301,4 +1301,11 @@ public class FlowTest extends StreamTest { assertEquals((Object) 1, result); } + + @Test + public void mustBeAbleToConvertToJavaInJava() { + final akka.stream.scaladsl.Flow scalaFlow = + akka.stream.scaladsl.Flow.apply(); + Flow javaFlow = scalaFlow.asJava(); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/KillSwitchTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/KillSwitchTest.java index f4b2b76c40..ae2c72fe1f 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/KillSwitchTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/KillSwitchTest.java @@ -27,7 +27,7 @@ public class KillSwitchTest extends StreamTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); + new AkkaJUnitActorSystemResource("KillSwitchTest", AkkaSpec.testConf()); @Test public void beAbleToUseKillSwitch() throws Exception { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/RunnableGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/RunnableGraphTest.java new file mode 100644 index 0000000000..eccc6057eb --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/RunnableGraphTest.java @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.stream.StreamTest; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class RunnableGraphTest extends StreamTest { + public RunnableGraphTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("RunnableGraphTest", AkkaSpec.testConf()); + + @Test + public void beAbleToConvertFromJavaToScala() { + final RunnableGraph javaRunnable = Source.empty().to(Sink.ignore()); + final akka.stream.scaladsl.RunnableGraph scalaRunnable = javaRunnable.asScala(); + assertEquals(NotUsed.getInstance(), scalaRunnable.run(materializer)); + } + + @Test + public void beAbleToConvertFromScalaToJava() { + final akka.stream.scaladsl.RunnableGraph scalaRunnable = + akka.stream.scaladsl.Source.empty().to(akka.stream.scaladsl.Sink.ignore()); + final RunnableGraph javaRunnable = scalaRunnable.asJava(); + assertEquals(NotUsed.getInstance(), javaRunnable.run(materializer)); + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 3502420adf..145bef5180 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -34,7 +34,7 @@ public class SinkTest extends StreamTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); + new AkkaJUnitActorSystemResource("SinkTest", AkkaSpec.testConf()); @Test public void mustBeAbleToUseFanoutPublisher() throws Exception { @@ -146,4 +146,11 @@ public class SinkTest extends StreamTest { .addAttributes(Attributes.asyncBoundary()) .named(""); } + + @Test + public void mustBeAbleToConvertToJavaInJava() { + final akka.stream.scaladsl.Sink scalaSink = + akka.stream.scaladsl.Sink.cancelled(); + Sink javaSink = scalaSink.asJava(); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 94d43dd8ec..485be034bb 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -1112,4 +1112,11 @@ public class SourceTest extends StreamTest { final Pair> p = Source.empty().preMaterialize(materializer); } + + @Test + public void mustBeAbleToConvertToJavaInJava() { + final akka.stream.scaladsl.Source scalaSource = + akka.stream.scaladsl.Source.empty(); + Source javaSource = scalaSource.asJava(); + } } diff --git a/akka-stream-tests/src/test/java/akka/stream/stage/StageTest.java b/akka-stream-tests/src/test/java/akka/stream/stage/StageTest.java index 8a35fbf812..3dd4f44cae 100644 --- a/akka-stream-tests/src/test/java/akka/stream/stage/StageTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/stage/StageTest.java @@ -27,7 +27,7 @@ public class StageTest extends StreamTest { @ClassRule public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("FlowTest", AkkaSpec.testConf()); + new AkkaJUnitActorSystemResource("StageTest", AkkaSpec.testConf()); @Test public void javaStageUsage() throws Exception { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RunnableGraphSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RunnableGraphSpec.scala index cea94c8237..4b11a17d35 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RunnableGraphSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RunnableGraphSpec.scala @@ -7,6 +7,7 @@ package akka.stream.scaladsl import akka.NotUsed import akka.stream.{ ActorMaterializer, Attributes } import akka.stream.testkit.StreamSpec +import akka.stream.javadsl class RunnableGraphSpec extends StreamSpec { @@ -18,8 +19,18 @@ class RunnableGraphSpec extends StreamSpec { import Attributes._ val r: RunnableGraph[NotUsed] = RunnableGraph.fromGraph(Source.empty.to(Sink.ignore)).async.addAttributes(none).named("useless") - r.traversalBuilder.attributes.getFirst[Name] shouldEqual Some(Name("useless")) - r.traversalBuilder.attributes.getFirst[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary)) + r.traversalBuilder.attributes.get[Name] shouldEqual Some(Name("useless")) + r.traversalBuilder.attributes.get[AsyncBoundary.type] shouldEqual (Some(AsyncBoundary)) + } + + "allow conversion from scala to java" in { + val runnable: javadsl.RunnableGraph[NotUsed] = Source.empty.to(Sink.ignore).asJava + runnable.run(materializer) shouldBe NotUsed + } + + "allow conversion from java to scala" in { + val runnable: RunnableGraph[NotUsed] = javadsl.Source.empty.to(javadsl.Sink.ignore).asScala + runnable.run() shouldBe NotUsed } } diff --git a/akka-stream/src/main/mima-filters/2.5.20.backward.excludes b/akka-stream/src/main/mima-filters/2.5.20.backward.excludes new file mode 100644 index 0000000000..7312f7b3af --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.5.20.backward.excludes @@ -0,0 +1,2 @@ +# #26246 Add asJava/asScala to RunnableGraph, not to be extended by users +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.javadsl.RunnableGraph.asScala") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b0aca138fc..3e4e7d6091 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -3313,6 +3313,8 @@ object RunnableGraph { if (newRunnable eq runnable) this else new RunnableGraphAdapter(newRunnable) } + + override def asScala: scaladsl.RunnableGraph[Mat] = runnable } } /** @@ -3338,4 +3340,9 @@ abstract class RunnableGraph[+Mat] extends Graph[ClosedShape, Mat] { override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name)) + + /** + * Converts this Java DSL element to its Scala DSL counterpart. + */ + def asScala: scaladsl.RunnableGraph[Mat] } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala index 12816d0d8f..ec71ccc6c4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Hub.scala @@ -35,7 +35,7 @@ object MergeHub { */ def of[T](clazz: Class[T], perProducerBufferSize: Int): Source[T, Sink[T, NotUsed]] = { akka.stream.scaladsl.MergeHub.source[T](perProducerBufferSize) - .mapMaterializedValue(_.asJava[T, NotUsed]) + .mapMaterializedValue(_.asJava[T]) .asJava } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 2c5a13088e..a1d76ccfc3 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -309,7 +309,7 @@ final class Flow[-In, +Out, +Mat]( } /** Converts this Scala DSL element to it's Java DSL counterpart. */ - def asJava[JIn <: In, JOut >: Out, JMat >: Mat]: javadsl.Flow[JIn, JOut, JMat] = + def asJava[JIn <: In]: javadsl.Flow[JIn, Out @uncheckedVariance, Mat @uncheckedVariance] = new javadsl.Flow(this) } @@ -613,6 +613,9 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui */ override def async(dispatcher: String, inputBufferSize: Int): RunnableGraph[Mat] = super.async(dispatcher, inputBufferSize).asInstanceOf[RunnableGraph[Mat]] + + /** Converts this Scala DSL element to it's Java DSL counterpart. */ + def asJava: javadsl.RunnableGraph[Mat] = javadsl.RunnableGraph.fromGraph(this) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 9d0a5778d6..2a91306354 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -23,6 +23,8 @@ import scala.util.{ Failure, Success, Try } import scala.collection.immutable import akka.util.ccompat._ +import scala.annotation.unchecked.uncheckedVariance + /** * A `Sink` is a set of stream processing steps that has one open input. * Can be used as a `Subscriber` @@ -120,7 +122,7 @@ final class Sink[-In, +Mat]( /** * Converts this Scala DSL element to it's Java DSL counterpart. */ - def asJava[JIn <: In, JMat >: Mat]: javadsl.Sink[JIn, JMat] = new javadsl.Sink(this) + def asJava[JIn <: In]: javadsl.Sink[JIn, Mat @uncheckedVariance] = new javadsl.Sink(this) } object Sink { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0861849ef1..0676cd1c00 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -194,7 +194,7 @@ final class Source[+Out, +Mat]( /** * Converts this Scala DSL element to it's Java DSL counterpart. */ - def asJava[JOut >: Out, JMat >: Mat]: javadsl.Source[JOut, JMat] = new javadsl.Source(this) + def asJava: javadsl.Source[Out @uncheckedVariance, Mat @uncheckedVariance] = new javadsl.Source(this) /** * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.