Add unit tests and polish Javadocs with lambdas for PatternsCS. #24881 (#24998)

* Add unit tests and polish Javadocs with lambdas for PatternsCS. #24881

* remove blocking from StopActor
This commit is contained in:
Song Kun 2018-05-10 20:32:28 +08:00 committed by Konrad `ktoso` Malawski
parent 7fa28b3488
commit f976f8d793
2 changed files with 297 additions and 30 deletions

View file

@ -8,17 +8,20 @@ import akka.actor.*;
import akka.dispatch.Futures;
import akka.testkit.AkkaJUnitActorSystemResource;
import akka.testkit.AkkaSpec;
import akka.testkit.TestLatch;
import akka.testkit.TestProbe;
import akka.util.Timeout;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.CompletionStage;
import java.util.Arrays;
import java.util.concurrent.*;
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;
@ -50,12 +53,24 @@ public class PatternsTest extends JUnitSuite {
}
}
public static final class StopActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, message -> sender().tell("Pong", getSelf()))
.build();
}
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("JavaAPI",
AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
private final ExecutionContext ec = system.dispatcher();
@Test
public void useAsk() throws Exception {
@ -72,6 +87,28 @@ public class PatternsTest extends JUnitSuite {
assertEquals("Ask (Identify) should return the proper ActorIdentity", testActor, id.getActorRef().get());
}
@Test
public void testCSAsk() throws Exception {
ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class));
CompletionStage<String> result = PatternsCS.ask(target, "hello", 3000).thenApply(o -> (String)o);
String actual = result.toCompletableFuture().get(3, SECONDS);
assertEquals(JavaAPITestActor.ANSWER, actual);
}
@Test
public void testCSAskWithActorSelection() throws Exception {
ActorRef target = system.actorOf(Props.create(JavaAPITestActor.class), "test3");
ActorSelection selection = system.actorSelection("/user/test3");
ActorIdentity id = PatternsCS.ask(selection, new Identify("hello"), 3000)
.toCompletableFuture()
.thenApply(o -> (ActorIdentity)o)
.get(3, SECONDS);
assertEquals(target, id.getActorRef().get());
}
@Test
public void testCSAskWithReplyToTimeout() throws Exception {
final String expected = "hello";
@ -186,4 +223,250 @@ public class PatternsTest extends JUnitSuite {
pipe(Futures.successful("hi!"), system.dispatcher()).to(selection);
probe.expectMsg("hi!");
}
@Test
public void testCSPipeToActorRef() throws Exception {
TestProbe probe = new TestProbe(system);
CompletableFuture<String> f = new CompletableFuture<>();
f.complete("ho!");
PatternsCS.pipe(f, ec).to(probe.ref());
probe.expectMsg("ho!");
}
@Test
public void testCSPipeToActorSelection() throws Exception {
TestProbe probe = new TestProbe(system);
ActorSelection selection = system.actorSelection(probe.ref().path());
CompletableFuture<String> f = new CompletableFuture<>();
f.complete("hi!");
PatternsCS.pipe(f, ec).to(selection);
probe.expectMsg("hi!");
}
@Test
public void testRetry() throws Exception {
final String expected = "hello";
Future<String> retriedFuture =
Patterns.retry(
() -> Futures.successful(expected),
3,
Duration.apply(200, "millis"),
system.scheduler(), ec);
String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS));
assertEquals(expected, actual);
}
@Test
public void testCSRetry() throws Exception {
final String expected = "hello";
Callable<CompletionStage<String>> attempt = () -> CompletableFuture.completedFuture(expected);
CompletionStage<String> retriedStage =
PatternsCS.retry(
attempt,
3,
java.time.Duration.ofMillis(200),
system.scheduler(), ec);
final String actual = retriedStage.toCompletableFuture().get(3, SECONDS);
assertEquals(expected, actual);
}
@Test(expected = IllegalStateException.class)
public void testAfterFailedCallable() throws Exception {
Callable<Future<String>> failedCallable = () -> Futures.failed(new IllegalStateException("Illegal!"));
Future<String> delayedFuture = Patterns
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
failedCallable);
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
}
@Test(expected = IllegalStateException.class)
public void testAfterFailedFuture() throws Exception {
Future<String> failedFuture = Futures.failed(new IllegalStateException("Illegal!"));
Future<String> delayedFuture = Patterns
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
failedFuture);
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
}
@Test
public void testAfterSuccessfulCallable() throws Exception {
final String expected = "Hello";
Future<String> delayedFuture = Patterns
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
() -> Futures.successful(expected));
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
assertEquals(expected, actual);
}
@Test
public void testAfterSuccessfulFuture() throws Exception {
final String expected = "Hello";
Future<String> delayedFuture = Patterns
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
Futures.successful(expected));
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
assertEquals(expected, actual);
}
@Test
public void testAfterFiniteDuration() throws Exception {
final String expected = "Hello";
Future<String> delayedFuture = Patterns
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
Futures.successful("world"));
Future<String> immediateFuture = Futures.future(() -> expected, ec);
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec);
final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
assertEquals(expected, actual);
}
@Test(expected = ExecutionException.class)
public void testCSAfterFailedCallable() throws Exception {
Callable<CompletionStage<String>> failedCallable = () -> {
CompletableFuture<String> f = new CompletableFuture<>();
f.completeExceptionally(new IllegalStateException("Illegal!"));
return f;
};
CompletionStage<String> delayedStage = PatternsCS
.after(
java.time.Duration.ofMillis(200),
system.scheduler(),
ec,
failedCallable);
delayedStage.toCompletableFuture().get(3, SECONDS);
}
@Test(expected = ExecutionException.class)
public void testCSAfterFailedFuture() throws Exception {
Callable<CompletionStage<String>> failedFuture = () -> {
CompletableFuture<String> f = new CompletableFuture<>();
f.completeExceptionally(new IllegalStateException("Illegal!"));
return f;
};
CompletionStage<String> delayedStage = PatternsCS
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
failedFuture);
String result = delayedStage.toCompletableFuture().get(3, SECONDS);
}
@Test
public void testCSAfterSuccessfulCallable() throws Exception {
final String expected = "Hello";
final Callable<CompletionStage<String>> cf = () -> {
CompletableFuture<String> f = CompletableFuture.completedFuture(expected);
return f;
};
CompletionStage<String> delayedStage = PatternsCS
.after(
java.time.Duration.ofMillis(200),
system.scheduler(),
ec,
cf);
final String actual = delayedStage.toCompletableFuture().get(3, SECONDS);
assertEquals(expected, actual);
}
@Test
public void testCSAfterSuccessfulFuture() throws Exception {
final String expected = "Hello";
final CompletionStage<String> f = CompletableFuture.completedFuture(expected);
CompletionStage<String> delayedStage = PatternsCS
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
f);
final String actual = delayedStage.toCompletableFuture().get(3, SECONDS);
assertEquals(expected, actual);
}
@Test
public void testCSAfterDuration() throws Exception {
final String expected = "Hello";
final CompletionStage<String> f = CompletableFuture.completedFuture("world!");
CompletionStage<String> delayedStage = PatternsCS
.after(
Duration.create(200, "millis"),
system.scheduler(),
ec,
f);
CompletableFuture<String> immediateStage = CompletableFuture.completedFuture(expected);
CompletableFuture<Object> resultStage = CompletableFuture.anyOf(delayedStage.toCompletableFuture(), immediateStage);
final String actual = (String) resultStage.get(3, SECONDS);
assertEquals(expected, actual);
}
@Test
public void testGracefulStop() throws Exception {
ActorRef target = system.actorOf(Props.create(StopActor.class));
Future<Boolean> result = Patterns.gracefulStop(target, FiniteDuration.apply(200, TimeUnit.MILLISECONDS));
Boolean actual = Await.result(result, FiniteDuration.apply(3, SECONDS));
assertEquals(true, actual);
}
@Test
public void testCSGracefulStop() throws Exception {
ActorRef target = system.actorOf(Props.create(StopActor.class));
CompletionStage<Boolean> result = PatternsCS.gracefulStop(target, java.time.Duration.ofMillis(200));
Boolean actual = result.toCompletableFuture().get(3, SECONDS);
assertEquals(true, actual);
}
}

View file

@ -219,7 +219,7 @@ object Patterns {
* // apply some transformation (i.e. enrich with request info)
* final Future<Object> transformed = f.map(new akka.japi.Function<Object, Object>() { ... });
* // send it on to the next stage
* Patterns.pipe(transformed).to(nextActor);
* Patterns.pipe(transformed, context).to(nextActor);
* }}}
*/
def pipe[T](future: Future[T], context: ExecutionContext): PipeableFuture[T] = scalaPipe(future)(context)
@ -314,12 +314,8 @@ object PatternsCS {
* <b>Recommended usage:</b>
*
* {{{
* final CompletionStage<Object> f = Patterns.ask(worker, request, timeout);
* f.onSuccess(new Procedure<Object>() {
* public void apply(Object o) {
* nextActor.tell(new EnrichedResult(request, o));
* }
* });
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, timeout);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
@ -365,11 +361,7 @@ object PatternsCS {
*
* {{{
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, timeout);
* f.onSuccess(new Procedure<Object>() {
* public void apply(Object o) {
* nextActor.tell(new EnrichedResult(request, o));
* }
* });
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
@ -414,12 +406,8 @@ object PatternsCS {
* <b>Recommended usage:</b>
*
* {{{
* final CompletionStage<Object> f = Patterns.ask(selection, request, timeout);
* f.onSuccess(new Procedure<Object>() {
* public void apply(Object o) {
* nextActor.tell(new EnrichedResult(request, o));
* }
* });
* final CompletionStage<Object> f = PatternsCS.ask(selection, request, timeout);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
@ -446,12 +434,8 @@ object PatternsCS {
* <b>Recommended usage:</b>
*
* {{{
* final CompletionStage<Object> f = Patterns.ask(selection, request, timeout);
* f.onSuccess(new Procedure<Object>() {
* public void apply(Object o) {
* nextActor.tell(new EnrichedResult(request, o));
* }
* });
* final CompletionStage<Object> f = PatternsCS.ask(selection, request, timeout);
* f.thenRun(result -> nextActor.tell(new EnrichedResult(request, result)));
* }}}
*/
def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
@ -472,8 +456,8 @@ object PatternsCS {
extended.ask(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).toJava.asInstanceOf[CompletionStage[AnyRef]]
/**
* Register an onComplete callback on this [[java.util.concurrent.CompletionStage]] to send
* the result to the given [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]].
* When this [[java.util.concurrent.CompletionStage]] finishes, send its result to the given
* [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]].
* Returns the original CompletionStage to allow method chaining.
* If the future was completed with failure it is sent as a [[akka.actor.Status.Failure]]
* to the recipient.
@ -481,11 +465,11 @@ object PatternsCS {
* <b>Recommended usage example:</b>
*
* {{{
* final CompletionStage<Object> f = Patterns.ask(worker, request, timeout);
* final CompletionStage<Object> f = PatternsCS.ask(worker, request, timeout);
* // apply some transformation (i.e. enrich with request info)
* final CompletionStage<Object> transformed = f.map(new akka.japi.Function<Object, Object>() { ... });
* final CompletionStage<Object> transformed = f.thenApply(result -> { ... });
* // send it on to the next stage
* Patterns.pipe(transformed).to(nextActor);
* PatternsCS.pipe(transformed, context).to(nextActor);
* }}}
*/
def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context)