use CompletionStage instead of Future in java doc classes (#22472)

* use CompletionStage instead of Future in FactorialBackend.java #22393

* use CompletionStage instead of Future in FactorialBackend.java #22393 2
This commit is contained in:
ortigali 2017-03-14 15:51:44 +05:00 committed by Patrik Nordwall
parent eaf50405ab
commit db0a473cd5
23 changed files with 67 additions and 146 deletions

View file

@ -5,30 +5,20 @@
package docs.actorlambda;
import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import akka.testkit.ErrorFilter;
import akka.testkit.EventFilter;
import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import static docs.actorlambda.Messages.Swap.Swap;
import static docs.actorlambda.Messages.*;
import static akka.japi.Util.immutableSeq;
import akka.actor.CoordinatedShutdown;
import akka.util.Timeout;
import akka.Done;
import java.util.concurrent.CompletionStage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import akka.testkit.TestActors;
import akka.dispatch.Mapper;
import akka.dispatch.Futures;
import akka.util.Timeout;
import scala.concurrent.Await;
import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
@ -49,20 +39,16 @@ import akka.actor.ActorSelection;
import akka.actor.Identify;
//#import-identify
//#import-ask
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import static akka.pattern.PatternsCS.ask;
import static akka.pattern.PatternsCS.pipe;
import akka.util.Timeout;
import java.util.concurrent.CompletableFuture;
//#import-ask
//#import-gracefulStop
import static akka.pattern.PatternsCS.gracefulStop;
import akka.pattern.AskTimeoutException;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
//#import-ask
import scala.concurrent.Future;
import static akka.pattern.Patterns.gracefulStop;
//#import-ask
import java.util.concurrent.CompletionStage;
//#import-gracefulStop
//#import-terminated
import akka.actor.Terminated;
@ -160,7 +146,7 @@ public class ActorDocTest extends AbstractJavaTest {
public void onReceive(Object msg) throws Exception {
if (msg instanceof Msg1)
receiveMsg1((Msg1) msg);
else if (msg instanceof Msg1)
else if (msg instanceof Msg2)
receiveMsg2((Msg2) msg);
else if (msg instanceof Msg3)
receiveMsg3((Msg3) msg);
@ -424,9 +410,9 @@ public class ActorDocTest extends AbstractJavaTest {
ActorRef actorRef = system.actorOf(Props.create(Manager.class));
//#gracefulStop
try {
Future<Boolean> stopped =
CompletionStage<Boolean> stopped =
gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN);
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
stopped.toCompletableFuture().get(6, TimeUnit.SECONDS);
// the actor has been stopped
} catch (AskTimeoutException e) {
// the actor wasn't stopped within 5 seconds
@ -768,25 +754,23 @@ public class ActorDocTest extends AbstractJavaTest {
ActorRef actorC = getRef();
//#ask-pipe
final Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
Timeout t = new Timeout(Duration.create(5, TimeUnit.SECONDS));
final ArrayList<Future<Object>> futures = new ArrayList<Future<Object>>();
futures.add(ask(actorA, "request", 1000)); // using 1000ms timeout
futures.add(ask(actorB, "another request", t)); // using timeout from
// above
// using 1000ms timeout
CompletableFuture<Object> future1 =
ask(actorA, "request", 1000).toCompletableFuture();
final Future<Iterable<Object>> aggregate = Futures.sequence(futures,
system.dispatcher());
// using timeout from above
CompletableFuture<Object> future2 =
ask(actorB, "another request", t).toCompletableFuture();
final Future<Result> transformed = aggregate.map(
new Mapper<Iterable<Object>, Result>() {
public Result apply(Iterable<Object> coll) {
final Iterator<Object> it = coll.iterator();
final String x = (String) it.next();
final String s = (String) it.next();
return new Result(x, s);
}
}, system.dispatcher());
CompletableFuture<Result> transformed =
CompletableFuture.allOf(future1, future2)
.thenApply(v -> {
String x = (String) future1.join();
String s = (String) future2.join();
return new Result(x, s);
});
pipe(transformed, system.dispatcher()).to(actorC);
//#ask-pipe

View file

@ -2,14 +2,14 @@ package docs.camel;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import akka.testkit.JavaTestKit;
import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.camel.CamelMessage;
import akka.pattern.Patterns;
import akka.pattern.PatternsCS;
public class ProducerTestBase {
public void tellJmsProducer() {
@ -29,7 +29,7 @@ public class ProducerTestBase {
ActorSystem system = ActorSystem.create("some-system");
Props props = Props.create(FirstProducer.class);
ActorRef producer = system.actorOf(props,"myproducer");
Future<Object> future = Patterns.ask(producer, "some request", 1000);
CompletionStage<Object> future = PatternsCS.ask(producer, "some request", 1000);
//#AskProducer
system.stop(producer);
JavaTestKit.shutdownActorSystem(system);

View file

@ -1,11 +1,10 @@
package docs.cluster;
import java.math.BigInteger;
import scala.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import akka.actor.AbstractActor;
import akka.dispatch.Mapper;
import static akka.dispatch.Futures.future;
import static akka.pattern.Patterns.pipe;
import static akka.pattern.PatternsCS.pipe;
//#backend
public class FactorialBackend extends AbstractActor {
@ -14,15 +13,10 @@ public class FactorialBackend extends AbstractActor {
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, n -> {
Future<BigInteger> f = future(() -> factorial(n),
getContext().dispatcher());
Future<FactorialResult> result = f.map(
new Mapper<BigInteger, FactorialResult>() {
public FactorialResult apply(BigInteger factorial) {
return new FactorialResult(n, factorial);
}
}, getContext().dispatcher());
CompletableFuture<FactorialResult> result =
CompletableFuture.supplyAsync(() -> factorial(n))
.thenApply((factorial) -> new FactorialResult(n, factorial));
pipe(result, getContext().dispatcher()).to(sender());

View file

@ -1,11 +1,10 @@
package docs.pattern;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeoutException;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.Actor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
@ -15,11 +14,9 @@ import akka.actor.Props;
import akka.actor.Scheduler;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.Terminated;
import akka.actor.AbstractActor;
import akka.japi.Function;
import akka.pattern.Patterns;
import akka.pattern.PatternsCS;
import akka.util.Timeout;
public class SupervisedAsk {
@ -61,13 +58,10 @@ public class SupervisedAsk {
@Override
public SupervisorStrategy supervisorStrategy() {
return new OneForOneStrategy(0, Duration.Zero(),
new Function<Throwable, Directive>() {
public Directive apply(Throwable cause) {
caller.tell(new Status.Failure(cause), self());
return SupervisorStrategy.stop();
}
});
return new OneForOneStrategy(0, Duration.Zero(), cause -> {
caller.tell(new Status.Failure(cause), self());
return SupervisorStrategy.stop();
});
}
@Override
@ -99,10 +93,10 @@ public class SupervisedAsk {
}
}
public static Future<Object> askOf(ActorRef supervisorCreator, Props props,
Object message, Timeout timeout) {
public static CompletionStage<Object> askOf(ActorRef supervisorCreator, Props props,
Object message, Timeout timeout) {
AskParam param = new AskParam(props, message, timeout);
return Patterns.ask(supervisorCreator, param, timeout);
return PatternsCS.ask(supervisorCreator, param, timeout);
}
synchronized public static ActorRef createSupervisorCreator(

View file

@ -1,12 +1,13 @@
package docs.pattern;
import scala.concurrent.Await;
import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.Props;
import akka.actor.AbstractActor;
import akka.util.Timeout;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.CompletionStage;
public class SupervisedAskSpec {
@ -17,9 +18,10 @@ public class SupervisedAskSpec {
try {
ActorRef supervisorCreator = SupervisedAsk
.createSupervisorCreator(actorSystem);
Future<Object> finished = SupervisedAsk.askOf(supervisorCreator,
CompletionStage<Object> finished = SupervisedAsk.askOf(supervisorCreator,
Props.create(someActor), message, timeout);
return Await.result(finished, timeout.duration());
FiniteDuration d = timeout.duration();
return finished.toCompletableFuture().get(d.length(), d.unit());
} catch (Exception e) {
// exception propagated by supervision
throw e;

View file

@ -14,7 +14,6 @@ import akka.persistence.query.Offset;
import com.typesafe.config.Config;
import akka.actor.*;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.query.*;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
@ -23,7 +22,6 @@ import akka.util.Timeout;
import docs.persistence.query.MyEventsByTagPublisher;
import org.reactivestreams.Subscriber;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
@ -272,11 +270,11 @@ public class PersistenceQueryDocTest {
readJournal.eventsByTag("blue", new Sequence(0L));
// find top 10 blue things:
final Future<List<Object>> top10BlueThings =
(Future<List<Object>>) blueThings
.map(t -> t.event())
final CompletionStage<List<Object>> top10BlueThings =
blueThings
.map(EventEnvelope::event)
.take(10) // cancels the query stream after pulling 10 elements
.<List<Object>>runFold(new ArrayList<>(10), (acc, e) -> {
.runFold(new ArrayList<>(10), (acc, e) -> {
acc.add(e);
return acc;
}, mat);

View file

@ -25,10 +25,6 @@ import akka.testkit.JavaTestKit;
import akka.util.ByteIterator;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertArrayEquals;
public class BidiFlowDocTest extends AbstractJavaTest {

View file

@ -18,12 +18,8 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.Option;
import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import akka.dispatch.Futures;

View file

@ -15,9 +15,6 @@ import docs.AbstractJavaTest;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;

View file

@ -18,9 +18,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.*;

View file

@ -24,9 +24,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.reactivestreams.Subscription;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

View file

@ -31,9 +31,7 @@ import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.TestSource;
import akka.testkit.JavaTestKit;
import akka.testkit.TestLatch;
import scala.collection.Iterator;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.util.Random;

View file

@ -22,10 +22,6 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;

View file

@ -19,7 +19,6 @@ import docs.stream.SilenceSystemOut;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Future;
import akka.stream.*;
import akka.testkit.JavaTestKit;

View file

@ -15,13 +15,11 @@ import java.net.InetSocketAddress;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Future;
import akka.actor.ActorSystem;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.stream.javadsl.Tcp.*;
import akka.stream.stage.*;
import akka.testkit.JavaTestKit;
import akka.testkit.SocketUtil;
import akka.testkit.TestProbe;

View file

@ -12,7 +12,6 @@ import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Future;
import java.util.ArrayList;
import java.util.List;

View file

@ -16,9 +16,6 @@ import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.Arrays;

View file

@ -18,8 +18,6 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import java.util.Arrays;
import java.util.HashSet;

View file

@ -13,9 +13,6 @@ import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.List;

View file

@ -11,8 +11,6 @@ import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;

View file

@ -5,6 +5,7 @@ package docs.testkit;
import static org.junit.Assert.*;
import akka.pattern.PatternsCS;
import akka.testkit.*;
import docs.AbstractJavaTest;
import org.junit.Assert;
@ -21,11 +22,11 @@ import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.AbstractActor;
import scala.concurrent.Await;
import scala.concurrent.Future;
import akka.testkit.TestActor.AutoPilot;
import scala.concurrent.duration.Duration;
import java.util.concurrent.CompletableFuture;
public class TestKitDocTest extends AbstractJavaTest {
@ClassRule
@ -65,9 +66,9 @@ public class TestKitDocTest extends AbstractJavaTest {
//#test-behavior
final Props props = Props.create(MyActor.class);
final TestActorRef<MyActor> ref = TestActorRef.create(system, props, "testB");
final Future<Object> future = akka.pattern.Patterns.ask(ref, "say42", 3000);
assertTrue(future.isCompleted());
assertEquals(42, Await.result(future, Duration.Zero()));
final CompletableFuture<Object> future = PatternsCS.ask(ref, "say42", 3000).toCompletableFuture();
assertTrue(future.isDone());
assertEquals(42, future.get());
//#test-behavior
}