diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index c2c894cc30..aedf44edd8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -490,11 +490,11 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToUseWatchTermination() throws Exception { final List input = Arrays.asList("A", "B", "C"); - Future future = Source.from(input) - .watchTermination(Keep.>right()) + CompletionStage future = Source.from(input) + .watchTermination(Keep.right()) .to(Sink.ignore()).run(materializer); - assertEquals(Done.getInstance(), Await.result(future, FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(Done.getInstance(), future.toCompletableFuture().get(3, TimeUnit.SECONDS)); } @Test diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index ada3754bf4..7181f6bb4b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -1563,8 +1563,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * from downstream. It fails with the same error when received error message from * downstream. */ - def watchTermination[M]()(matF: function.Function2[Mat, Future[Done], M]): javadsl.Flow[In, Out, M] = - new Flow(delegate.watchTermination()(combinerToScala(matF))) + def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Flow[In, Out, M] = + new Flow(delegate.watchTermination()((left, right) => matF(left, right.toJava))) /** * Delays the initial element by the specified duration. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index aba870e376..0f6695792f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1738,8 +1738,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * from downstream. It fails with the same error when received error message from * downstream. */ - def watchTermination[M]()(matF: function.Function2[Mat, Future[Done], M]): javadsl.Source[Out, M] = - new Source(delegate.watchTermination()(combinerToScala(matF))) + def watchTermination[M]()(matF: function.Function2[Mat, CompletionStage[Done], M]): javadsl.Source[Out, M] = + new Source(delegate.watchTermination()((left, right) => matF(left, right.toJava))) /** * Delays the initial element by the specified duration.