From db0a473cd5c0d471978623625c3212a5678a6dd4 Mon Sep 17 00:00:00 2001 From: ortigali Date: Tue, 14 Mar 2017 15:51:44 +0500 Subject: [PATCH] use CompletionStage instead of Future in java doc classes (#22472) * use CompletionStage instead of Future in FactorialBackend.java #22393 * use CompletionStage instead of Future in FactorialBackend.java #22393 2 --- .../circuitbreaker/DangerousJavaActor.java | 29 +++------ .../code/docs/actorlambda/ActorDocTest.java | 62 +++++++------------ .../code/docs/camel/ProducerTestBase.java | 6 +- .../code/docs/cluster/FactorialBackend.java | 18 ++---- .../java/code/docs/pattern/SupervisedAsk.java | 24 +++---- .../code/docs/pattern/SupervisedAskSpec.java | 10 +-- .../persistence/PersistenceQueryDocTest.java | 10 ++- .../code/docs/stream/BidiFlowDocTest.java | 4 -- .../java/code/docs/stream/FlowDocTest.java | 4 -- .../code/docs/stream/FlowErrorDocTest.java | 3 - .../code/docs/stream/GraphDSLDocTest.java | 3 - .../code/docs/stream/GraphStageDocTest.java | 2 - .../stream/RateTransformationDocTest.java | 2 - .../TwitterStreamQuickstartDocTest.java | 4 -- .../docs/stream/io/StreamFileDocTest.java | 1 - .../code/docs/stream/io/StreamTcpDocTest.java | 2 - .../cookbook/RecipeDroppyBroadcast.java | 1 - .../cookbook/RecipeMultiGroupByTest.java | 3 - .../cookbook/RecipeReduceByKeyTest.java | 2 - .../stream/javadsl/cookbook/RecipeSeq.java | 3 - .../javadsl/cookbook/RecipeWorkerPool.java | 2 - .../code/docs/testkit/TestKitDocTest.java | 11 ++-- .../java/akka/stream/javadsl/SourceTest.java | 7 --- 23 files changed, 67 insertions(+), 146 deletions(-) diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java index ea84a943e2..af9a7de764 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java +++ b/akka-docs/rst/common/code/docs/circuitbreaker/DangerousJavaActor.java @@ -6,16 +6,14 @@ package docs.circuitbreaker; //#imports1 import akka.actor.AbstractActor; -import scala.concurrent.Future; import akka.event.LoggingAdapter; import scala.concurrent.duration.Duration; import akka.pattern.CircuitBreaker; import akka.event.Logging; -import static akka.pattern.Patterns.pipe; -import static akka.dispatch.Futures.future; +import static akka.pattern.PatternsCS.pipe; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; //#imports1 @@ -29,11 +27,7 @@ public class DangerousJavaActor extends AbstractActor { this.breaker = new CircuitBreaker( getContext().dispatcher(), getContext().system().scheduler(), 5, Duration.create(10, "s"), Duration.create(1, "m")) - .onOpen(new Runnable() { - public void run() { - notifyMeOnOpen(); - } - }); + .onOpen(this::notifyMeOnOpen); } public void notifyMeOnOpen() { @@ -49,17 +43,14 @@ public class DangerousJavaActor extends AbstractActor { @Override public Receive createReceive() { return receiveBuilder(). - match(String.class, m -> "is my middle name".equals(m), m -> { - pipe( - breaker.callWithCircuitBreaker(() -> - future(() -> dangerousCall(), getContext().dispatcher()) - ), getContext().dispatcher() - ).to(sender()); - }) - .match(String.class, m -> "block for me".equals(m), m -> { + match(String.class, "is my middle name"::equals, m -> pipe( + breaker.callWithCircuitBreakerCS(() -> + CompletableFuture.supplyAsync(this::dangerousCall) + ), getContext().dispatcher() + ).to(sender())) + .match(String.class, "block for me"::equals, m -> { sender().tell(breaker - .callWithSyncCircuitBreaker( - () -> dangerousCall()), self()); + .callWithSyncCircuitBreaker(this::dangerousCall), self()); }) .build(); } diff --git a/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java b/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java index 83f4066815..5f2b14993d 100644 --- a/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java +++ b/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java @@ -5,30 +5,20 @@ package docs.actorlambda; import akka.actor.*; -import akka.japi.pf.ReceiveBuilder; -import akka.testkit.ErrorFilter; -import akka.testkit.EventFilter; -import akka.testkit.TestEvent; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import docs.AbstractJavaTest; import static docs.actorlambda.Messages.Swap.Swap; import static docs.actorlambda.Messages.*; -import static akka.japi.Util.immutableSeq; import akka.actor.CoordinatedShutdown; import akka.util.Timeout; import akka.Done; -import java.util.concurrent.CompletionStage; -import java.util.ArrayList; -import java.util.Iterator; import java.util.Optional; import java.util.concurrent.TimeUnit; import akka.testkit.TestActors; -import akka.dispatch.Mapper; -import akka.dispatch.Futures; -import akka.util.Timeout; +import scala.concurrent.Await; import akka.testkit.JavaTestKit; import org.junit.AfterClass; @@ -49,20 +39,16 @@ import akka.actor.ActorSelection; import akka.actor.Identify; //#import-identify //#import-ask -import static akka.pattern.Patterns.ask; -import static akka.pattern.Patterns.pipe; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; +import static akka.pattern.PatternsCS.ask; +import static akka.pattern.PatternsCS.pipe; import akka.util.Timeout; +import java.util.concurrent.CompletableFuture; //#import-ask //#import-gracefulStop +import static akka.pattern.PatternsCS.gracefulStop; import akka.pattern.AskTimeoutException; -import scala.concurrent.Await; import scala.concurrent.duration.Duration; -//#import-ask -import scala.concurrent.Future; -import static akka.pattern.Patterns.gracefulStop; -//#import-ask +import java.util.concurrent.CompletionStage; //#import-gracefulStop //#import-terminated import akka.actor.Terminated; @@ -160,7 +146,7 @@ public class ActorDocTest extends AbstractJavaTest { public void onReceive(Object msg) throws Exception { if (msg instanceof Msg1) receiveMsg1((Msg1) msg); - else if (msg instanceof Msg1) + else if (msg instanceof Msg2) receiveMsg2((Msg2) msg); else if (msg instanceof Msg3) receiveMsg3((Msg3) msg); @@ -424,9 +410,9 @@ public class ActorDocTest extends AbstractJavaTest { ActorRef actorRef = system.actorOf(Props.create(Manager.class)); //#gracefulStop try { - Future stopped = + CompletionStage stopped = gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN); - Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); + stopped.toCompletableFuture().get(6, TimeUnit.SECONDS); // the actor has been stopped } catch (AskTimeoutException e) { // the actor wasn't stopped within 5 seconds @@ -768,25 +754,23 @@ public class ActorDocTest extends AbstractJavaTest { ActorRef actorC = getRef(); //#ask-pipe - final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS)); - final ArrayList> futures = new ArrayList>(); - futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout - futures.add(ask(actorB, "another request", t)); // using timeout from - // above + // using 1000ms timeout + CompletableFuture future1 = + ask(actorA, "request", 1000).toCompletableFuture(); - final Future> aggregate = Futures.sequence(futures, - system.dispatcher()); + // using timeout from above + CompletableFuture future2 = + ask(actorB, "another request", t).toCompletableFuture(); - final Future transformed = aggregate.map( - new Mapper, Result>() { - public Result apply(Iterable coll) { - final Iterator it = coll.iterator(); - final String x = (String) it.next(); - final String s = (String) it.next(); - return new Result(x, s); - } - }, system.dispatcher()); + CompletableFuture transformed = + CompletableFuture.allOf(future1, future2) + .thenApply(v -> { + String x = (String) future1.join(); + String s = (String) future2.join(); + return new Result(x, s); + }); pipe(transformed, system.dispatcher()).to(actorC); //#ask-pipe diff --git a/akka-docs/rst/java/code/docs/camel/ProducerTestBase.java b/akka-docs/rst/java/code/docs/camel/ProducerTestBase.java index e8d95526cb..f13712ba63 100644 --- a/akka-docs/rst/java/code/docs/camel/ProducerTestBase.java +++ b/akka-docs/rst/java/code/docs/camel/ProducerTestBase.java @@ -2,14 +2,14 @@ package docs.camel; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletionStage; import akka.testkit.JavaTestKit; -import scala.concurrent.Future; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; import akka.camel.CamelMessage; -import akka.pattern.Patterns; +import akka.pattern.PatternsCS; public class ProducerTestBase { public void tellJmsProducer() { @@ -29,7 +29,7 @@ public class ProducerTestBase { ActorSystem system = ActorSystem.create("some-system"); Props props = Props.create(FirstProducer.class); ActorRef producer = system.actorOf(props,"myproducer"); - Future future = Patterns.ask(producer, "some request", 1000); + CompletionStage future = PatternsCS.ask(producer, "some request", 1000); //#AskProducer system.stop(producer); JavaTestKit.shutdownActorSystem(system); diff --git a/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java b/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java index 2411565048..3fe82524bb 100644 --- a/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java +++ b/akka-docs/rst/java/code/docs/cluster/FactorialBackend.java @@ -1,11 +1,10 @@ package docs.cluster; import java.math.BigInteger; -import scala.concurrent.Future; +import java.util.concurrent.CompletableFuture; + import akka.actor.AbstractActor; -import akka.dispatch.Mapper; -import static akka.dispatch.Futures.future; -import static akka.pattern.Patterns.pipe; +import static akka.pattern.PatternsCS.pipe; //#backend public class FactorialBackend extends AbstractActor { @@ -14,15 +13,10 @@ public class FactorialBackend extends AbstractActor { public Receive createReceive() { return receiveBuilder() .match(Integer.class, n -> { - Future f = future(() -> factorial(n), - getContext().dispatcher()); - Future result = f.map( - new Mapper() { - public FactorialResult apply(BigInteger factorial) { - return new FactorialResult(n, factorial); - } - }, getContext().dispatcher()); + CompletableFuture result = + CompletableFuture.supplyAsync(() -> factorial(n)) + .thenApply((factorial) -> new FactorialResult(n, factorial)); pipe(result, getContext().dispatcher()).to(sender()); diff --git a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java index efe36ea61f..43db7c9533 100644 --- a/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java +++ b/akka-docs/rst/java/code/docs/pattern/SupervisedAsk.java @@ -1,11 +1,10 @@ package docs.pattern; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeoutException; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; -import akka.actor.Actor; import akka.actor.ActorKilledException; import akka.actor.ActorRef; import akka.actor.ActorRefFactory; @@ -15,11 +14,9 @@ import akka.actor.Props; import akka.actor.Scheduler; import akka.actor.Status; import akka.actor.SupervisorStrategy; -import akka.actor.SupervisorStrategy.Directive; import akka.actor.Terminated; import akka.actor.AbstractActor; -import akka.japi.Function; -import akka.pattern.Patterns; +import akka.pattern.PatternsCS; import akka.util.Timeout; public class SupervisedAsk { @@ -61,13 +58,10 @@ public class SupervisedAsk { @Override public SupervisorStrategy supervisorStrategy() { - return new OneForOneStrategy(0, Duration.Zero(), - new Function() { - public Directive apply(Throwable cause) { - caller.tell(new Status.Failure(cause), self()); - return SupervisorStrategy.stop(); - } - }); + return new OneForOneStrategy(0, Duration.Zero(), cause -> { + caller.tell(new Status.Failure(cause), self()); + return SupervisorStrategy.stop(); + }); } @Override @@ -99,10 +93,10 @@ public class SupervisedAsk { } } - public static Future askOf(ActorRef supervisorCreator, Props props, - Object message, Timeout timeout) { + public static CompletionStage askOf(ActorRef supervisorCreator, Props props, + Object message, Timeout timeout) { AskParam param = new AskParam(props, message, timeout); - return Patterns.ask(supervisorCreator, param, timeout); + return PatternsCS.ask(supervisorCreator, param, timeout); } synchronized public static ActorRef createSupervisorCreator( diff --git a/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java b/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java index f8682bbf24..90483962d0 100644 --- a/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java +++ b/akka-docs/rst/java/code/docs/pattern/SupervisedAskSpec.java @@ -1,12 +1,13 @@ package docs.pattern; -import scala.concurrent.Await; -import scala.concurrent.Future; import akka.actor.ActorRef; import akka.actor.ActorRefFactory; import akka.actor.Props; import akka.actor.AbstractActor; import akka.util.Timeout; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.CompletionStage; public class SupervisedAskSpec { @@ -17,9 +18,10 @@ public class SupervisedAskSpec { try { ActorRef supervisorCreator = SupervisedAsk .createSupervisorCreator(actorSystem); - Future finished = SupervisedAsk.askOf(supervisorCreator, + CompletionStage finished = SupervisedAsk.askOf(supervisorCreator, Props.create(someActor), message, timeout); - return Await.result(finished, timeout.duration()); + FiniteDuration d = timeout.duration(); + return finished.toCompletableFuture().get(d.length(), d.unit()); } catch (Exception e) { // exception propagated by supervision throw e; diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java index 0c4bdbbad5..c30050fb5c 100644 --- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java @@ -14,7 +14,6 @@ import akka.persistence.query.Offset; import com.typesafe.config.Config; import akka.actor.*; -import akka.japi.pf.ReceiveBuilder; import akka.persistence.query.*; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; @@ -23,7 +22,6 @@ import akka.util.Timeout; import docs.persistence.query.MyEventsByTagPublisher; import org.reactivestreams.Subscriber; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; @@ -272,11 +270,11 @@ public class PersistenceQueryDocTest { readJournal.eventsByTag("blue", new Sequence(0L)); // find top 10 blue things: - final Future> top10BlueThings = - (Future>) blueThings - .map(t -> t.event()) + final CompletionStage> top10BlueThings = + blueThings + .map(EventEnvelope::event) .take(10) // cancels the query stream after pulling 10 elements - .>runFold(new ArrayList<>(10), (acc, e) -> { + .runFold(new ArrayList<>(10), (acc, e) -> { acc.add(e); return acc; }, mat); diff --git a/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java b/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java index f26949a998..9338078530 100644 --- a/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java @@ -25,10 +25,6 @@ import akka.testkit.JavaTestKit; import akka.util.ByteIterator; import akka.util.ByteString; import akka.util.ByteStringBuilder; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertArrayEquals; public class BidiFlowDocTest extends AbstractJavaTest { diff --git a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java index d93a7cb4c6..e815cc84a7 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java @@ -18,12 +18,8 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.Promise; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; -import scala.Option; import akka.actor.ActorSystem; import akka.actor.Cancellable; import akka.dispatch.Futures; diff --git a/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java index c228683158..1f762f6da9 100644 --- a/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java @@ -15,9 +15,6 @@ import docs.AbstractJavaTest; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import akka.actor.ActorSystem; import akka.stream.ActorMaterializer; diff --git a/akka-docs/rst/java/code/docs/stream/GraphDSLDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphDSLDocTest.java index 6b4362c551..2d02ebb156 100644 --- a/akka-docs/rst/java/code/docs/stream/GraphDSLDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/GraphDSLDocTest.java @@ -18,9 +18,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import akka.actor.ActorSystem; import akka.japi.Pair; import akka.stream.*; diff --git a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java index feb30c8d9d..64773f7f64 100644 --- a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java @@ -24,9 +24,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.reactivestreams.Subscription; import scala.Tuple2; -import scala.concurrent.Await; import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import scala.concurrent.Promise; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; diff --git a/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java b/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java index bc94d79fd6..05b1521d05 100644 --- a/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/RateTransformationDocTest.java @@ -31,9 +31,7 @@ import akka.stream.testkit.javadsl.TestSink; import akka.stream.testkit.javadsl.TestSource; import akka.testkit.JavaTestKit; import akka.testkit.TestLatch; -import scala.collection.Iterator; import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.util.Random; diff --git a/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java b/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java index afbf1509e1..d3765ed22b 100644 --- a/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/TwitterStreamQuickstartDocTest.java @@ -22,10 +22,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; - import java.util.ArrayList; import java.util.Arrays; import java.util.Set; diff --git a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java index fc7f5b46d9..302dea4db1 100644 --- a/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/io/StreamFileDocTest.java @@ -19,7 +19,6 @@ import docs.stream.SilenceSystemOut; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Future; import akka.stream.*; import akka.testkit.JavaTestKit; diff --git a/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java b/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java index 88ef4e58e0..fa7635e0fc 100644 --- a/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/io/StreamTcpDocTest.java @@ -15,13 +15,11 @@ import java.net.InetSocketAddress; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Future; import akka.actor.ActorSystem; import akka.stream.*; import akka.stream.javadsl.*; import akka.stream.javadsl.Tcp.*; -import akka.stream.stage.*; import akka.testkit.JavaTestKit; import akka.testkit.SocketUtil; import akka.testkit.TestProbe; diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeDroppyBroadcast.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeDroppyBroadcast.java index 38100ccddf..e2c3fee9f2 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeDroppyBroadcast.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeDroppyBroadcast.java @@ -12,7 +12,6 @@ import akka.testkit.JavaTestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Future; import java.util.ArrayList; import java.util.List; diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMultiGroupByTest.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMultiGroupByTest.java index c05625b549..4b420eaeab 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMultiGroupByTest.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeMultiGroupByTest.java @@ -16,9 +16,6 @@ import akka.testkit.JavaTestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.ArrayList; import java.util.Arrays; diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java index a0a2f9d3e3..b2e9028d41 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeReduceByKeyTest.java @@ -18,8 +18,6 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; import java.util.Arrays; import java.util.HashSet; diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java index 75c74d4ea5..648e7b5ddf 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeSeq.java @@ -13,9 +13,6 @@ import akka.testkit.JavaTestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; import java.util.Arrays; import java.util.List; diff --git a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java index 222295a50a..c1683afb05 100644 --- a/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java +++ b/akka-docs/rst/java/code/docs/stream/javadsl/cookbook/RecipeWorkerPool.java @@ -11,8 +11,6 @@ import akka.testkit.JavaTestKit; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import java.util.Arrays; diff --git a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java index 3e9107daf6..38f6d33ea7 100644 --- a/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java +++ b/akka-docs/rst/java/code/docs/testkit/TestKitDocTest.java @@ -5,6 +5,7 @@ package docs.testkit; import static org.junit.Assert.*; +import akka.pattern.PatternsCS; import akka.testkit.*; import docs.AbstractJavaTest; import org.junit.Assert; @@ -21,11 +22,11 @@ import akka.actor.PoisonPill; import akka.actor.Props; import akka.actor.Terminated; import akka.actor.AbstractActor; -import scala.concurrent.Await; -import scala.concurrent.Future; import akka.testkit.TestActor.AutoPilot; import scala.concurrent.duration.Duration; +import java.util.concurrent.CompletableFuture; + public class TestKitDocTest extends AbstractJavaTest { @ClassRule @@ -65,9 +66,9 @@ public class TestKitDocTest extends AbstractJavaTest { //#test-behavior final Props props = Props.create(MyActor.class); final TestActorRef ref = TestActorRef.create(system, props, "testB"); - final Future future = akka.pattern.Patterns.ask(ref, "say42", 3000); - assertTrue(future.isCompleted()); - assertEquals(42, Await.result(future, Duration.Zero())); + final CompletableFuture future = PatternsCS.ask(ref, "say42", 3000).toCompletableFuture(); + assertTrue(future.isDone()); + assertEquals(42, future.get()); //#test-behavior } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index aac2e49f9e..31156e87c9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -7,24 +7,17 @@ import akka.Done; import akka.NotUsed; import akka.actor.ActorRef; import akka.actor.Cancellable; -import akka.dispatch.Foreach; -import akka.dispatch.Futures; -import akka.dispatch.OnSuccess; -import akka.japi.JavaPartialFunction; import akka.japi.Pair; import akka.japi.function.*; import akka.japi.pf.PFBuilder; import akka.stream.*; import akka.stream.impl.ConstantFun; -import akka.stream.stage.*; import akka.testkit.AkkaSpec; import akka.stream.testkit.TestPublisher; import akka.testkit.JavaTestKit; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.util.Try;