From 18a3569a7e6dfd5ef8f884e85ed9dab7bd544660 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 15 May 2019 10:28:12 +0200 Subject: [PATCH] Java interop via for FlowWithContext #26896 --- .../stream/javadsl/FlowWithContextTest.java | 67 +++++++++++++++++++ .../mima-filters/2.5.22.backwards.excludes | 3 + .../akka/stream/javadsl/FlowWithContext.scala | 37 +++++----- .../stream/scaladsl/FlowWithContext.scala | 12 +++- 4 files changed, 97 insertions(+), 22 deletions(-) create mode 100644 akka-stream-tests/src/test/java/akka/stream/javadsl/FlowWithContextTest.java diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowWithContextTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowWithContextTest.java new file mode 100644 index 0000000000..a442e2895e --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowWithContextTest.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.stream.javadsl; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.stream.StreamTest; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static akka.NotUsed.notUsed; +import static org.junit.Assert.assertEquals; + +public class FlowWithContextTest extends StreamTest { + + public FlowWithContextTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("FlowWithContextTest", AkkaSpec.testConf()); + + @Test + public void simpleCaseHappyPath() throws Exception { + final FlowWithContext flow = + FlowWithContext.create(); + + final CompletionStage>> result = + Source.single(new Pair<>(1, "context")) + .via(flow.map(n -> n + 1).mapContext(ctx -> ctx + "-mapped")) + .runWith(Sink.seq(), materializer); + final List> pairs = result.toCompletableFuture().get(3, TimeUnit.SECONDS); + assertEquals(1, pairs.size()); + assertEquals(Integer.valueOf(2), pairs.get(0).first()); + assertEquals("context-mapped", pairs.get(0).second()); + } + + @Test + public void mustAllowComposingFlows() throws Exception { + final FlowWithContext flow1 = + FlowWithContext.create(); + final FlowWithContext flow2 = + FlowWithContext.create().map(Object::toString); + + final FlowWithContext flow3 = flow1.via(flow2); + + final CompletionStage>> result = + Source.single(new Pair<>(1, notUsed())) + .via(flow3.asFlow()) + .runWith(Sink.seq(), materializer); + + List> pairs = result.toCompletableFuture().get(3, TimeUnit.SECONDS); + + assertEquals(1, pairs.size()); + assertEquals("1", pairs.get(0).first()); + assertEquals(notUsed(), pairs.get(0).second()); + } +} diff --git a/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes index edade4e1ec..b25b984150 100644 --- a/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.22.backwards.excludes @@ -15,3 +15,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHu # #25045 adding Java/Scala interop to SourceQueue and SinkQueue ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SinkQueueAdapter") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.SourceQueueAdapter") + +# 26896 JavaDSL FlowWithContext +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.javadsl.FlowWithContext.this") \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index ab9ea93fe0..90fa46f016 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -22,22 +22,16 @@ import scala.compat.java8.FutureConverters._ @ApiMayChange object FlowWithContext { - def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, akka.NotUsed] = { - new FlowWithContext(scaladsl.FlowWithContext[In, Ctx]) - } + def create[In, Ctx](): FlowWithContext[In, Ctx, In, Ctx, akka.NotUsed] = + new FlowWithContext(Flow.create[Pair[In, Ctx]]()) /** * Creates a FlowWithContext from a regular flow that operates on `Pair` elements. */ def fromPairs[In, CtxIn, Out, CtxOut, Mat]( - under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = { - new FlowWithContext( - scaladsl.FlowWithContext.fromTuples( - scaladsl - .Flow[(In, CtxIn)] - .map { case (i, c) => Pair(i, c) } - .viaMat(under.asScala.map(_.toScala))(scaladsl.Keep.right))) - } + under: Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + new FlowWithContext(under) + } /** @@ -51,8 +45,8 @@ object FlowWithContext { * API MAY CHANGE */ @ApiMayChange -final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat]( - delegate: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat]) +final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( + delegate: javadsl.Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat]) extends GraphDelegate(delegate) { /** @@ -83,12 +77,7 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat]( * Creates a regular flow of pairs (data, context). */ def asFlow(): Flow[Pair[In, CtxIn], Pair[Out, CtxOut], Mat] @uncheckedVariance = - scaladsl - .Flow[Pair[In, CtxIn]] - .map(_.toScala) - .viaMat(delegate.asFlow)(scaladsl.Keep.right) - .map { case (o, c) => Pair(o, c) } - .asJava + delegate // remaining operations in alphabetic order @@ -240,7 +229,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat]( def log(name: String): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) - def asScala: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = delegate + def asScala: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + scaladsl.FlowWithContext.fromTuples( + scaladsl + .Flow[(In, CtxIn)] + .map { case (i, c) => Pair(i, c) } + .viaMat(delegate.asScala.map(_.toScala))(scaladsl.Keep.right)) private[this] def viaScala[In2, CtxIn2, Out2, CtxOut2, Mat2]( f: scaladsl.FlowWithContext[In, CtxIn, Out, CtxOut, Mat] => scaladsl.FlowWithContext[ @@ -249,5 +243,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat]( Out2, CtxOut2, Mat2]): FlowWithContext[In2, CtxIn2, Out2, CtxOut2, Mat2] = - new FlowWithContext(f(delegate)) + f(this.asScala).asJava + } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index 075bb4a526..0f79b2e75f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -4,8 +4,11 @@ package akka.stream.scaladsl +import akka.NotUsed + import scala.annotation.unchecked.uncheckedVariance import akka.annotation.ApiMayChange +import akka.japi.Pair import akka.stream._ /** @@ -66,5 +69,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat] : javadsl.FlowWithContext[JIn, JCtxIn, JOut, JCtxOut, JMat] = - new javadsl.FlowWithContext(this) + new javadsl.FlowWithContext( + javadsl.Flow + .create[Pair[JIn, JCtxIn]]() + .map(_.toScala) + .viaMat(delegate.map { + case (first, second) => + Pair[JOut, JCtxOut](first, second) + }.asJava, javadsl.Keep.right[NotUsed, JMat])) }