diff --git a/akka-actor/src/main/scala/akka/Done.scala b/akka-actor/src/main/scala/akka/Done.scala
index 99cc1aeaf0..f372eeb2c1 100644
--- a/akka-actor/src/main/scala/akka/Done.scala
+++ b/akka-actor/src/main/scala/akka/Done.scala
@@ -14,6 +14,5 @@ case object Done extends Done {
/**
* Java API: the singleton instance
*/
- def getInstance() = this
+ def getInstance(): Done = this
}
-
diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala
index f753967487..823369f308 100644
--- a/akka-actor/src/main/scala/akka/dispatch/Future.scala
+++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala
@@ -11,6 +11,8 @@ import java.lang.{ Iterable ⇒ JIterable }
import java.util.{ LinkedList ⇒ JLinkedList }
import java.util.concurrent.{ Executor, ExecutorService, ExecutionException, Callable, TimeoutException }
import scala.util.{ Try, Success, Failure }
+import java.util.concurrent.CompletionStage
+import java.util.concurrent.CompletableFuture
/**
* ExecutionContexts is the Java API for ExecutionContexts
@@ -111,6 +113,15 @@ object Futures {
*/
def successful[T](result: T): Future[T] = Future.successful(result)
+ /**
+ * Creates an already completed CompletionStage with the specified exception
+ */
+ def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = {
+ val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T])
+ f.obtrudeException(ex)
+ f
+ }
+
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
diff --git a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala
index fbec79d4b2..86161184be 100644
--- a/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala
+++ b/akka-actor/src/main/scala/akka/pattern/FutureTimeoutSupport.scala
@@ -8,6 +8,10 @@ import scala.concurrent.{ ExecutionContext, Promise, Future }
import akka.actor._
import scala.util.control.NonFatal
import scala.concurrent.duration.FiniteDuration
+import java.util.concurrent.CompletionStage
+import java.util.concurrent.CompletableFuture
+import akka.dispatch.Futures
+import java.util.function.BiConsumer
trait FutureTimeoutSupport {
/**
@@ -22,4 +26,29 @@ trait FutureTimeoutSupport {
using.scheduleOnce(duration) { p completeWith { try value catch { case NonFatal(t) ⇒ Future.failed(t) } } }
p.future
}
+
+ /**
+ * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value
+ * after the specified duration.
+ */
+ def afterCompletionStage[T](duration: FiniteDuration, using: Scheduler)(value: ⇒ CompletionStage[T])(implicit ec: ExecutionContext): CompletionStage[T] =
+ if (duration.isFinite() && duration.length < 1) {
+ try value catch { case NonFatal(t) ⇒ Futures.failedCompletionStage(t) }
+ } else {
+ val p = new CompletableFuture[T]
+ using.scheduleOnce(duration) {
+ try {
+ val future = value
+ future.whenComplete(new BiConsumer[T, Throwable] {
+ override def accept(t: T, ex: Throwable): Unit = {
+ if (t != null) p.complete(t)
+ if (ex != null) p.completeExceptionally(ex)
+ }
+ })
+ } catch {
+ case NonFatal(ex) ⇒ p.completeExceptionally(ex)
+ }
+ }
+ p
+ }
}
diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala
index 17d2814134..bb5d302cfb 100644
--- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala
+++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala
@@ -7,6 +7,8 @@ import akka.actor.{ ActorSelection, Scheduler }
import java.util.concurrent.{ Callable, TimeUnit }
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
+import java.util.concurrent.CompletionStage
+import scala.compat.java8.FutureConverters._
object Patterns {
import akka.japi
@@ -250,9 +252,262 @@ object Patterns {
scalaAfter(duration, scheduler)(value.call())(context)
/**
- * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided value
+ * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
* after the specified duration.
*/
def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Future[T]): Future[T] =
scalaAfter(duration, scheduler)(value)(context)
}
+
+object PatternsCS {
+ import akka.japi
+ import akka.actor.{ ActorRef, ActorSystem }
+ import akka.pattern.{ ask ⇒ scalaAsk, pipe ⇒ scalaPipe, gracefulStop ⇒ scalaGracefulStop, after ⇒ scalaAfter }
+ import akka.util.Timeout
+ import scala.concurrent.Future
+ import scala.concurrent.duration._
+
+ /**
+ * Java API for `akka.pattern.ask`:
+ * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
+ * holding the eventual reply message; this means that the target actor
+ * needs to send the result to the `sender` reference provided. The CompletionStage
+ * will be completed with an [[akka.pattern.AskTimeoutException]] after the
+ * given timeout has expired; this is independent from any timeout applied
+ * while awaiting a result for this future (i.e. in
+ * `Await.result(..., timeout)`).
+ *
+ * Warning:
+ * When using future callbacks, inside actors you need to carefully avoid closing over
+ * the containing actor’s object, i.e. do not call methods or access mutable state
+ * on the enclosing actor from within the callback. This would break the actor
+ * encapsulation and may introduce synchronization bugs and race conditions because
+ * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
+ * there is not yet a way to detect these illegal accesses at compile time.
+ *
+ * Recommended usage:
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(worker, request, timeout);
+ * f.onSuccess(new Procedure() {
+ * public void apply(Object o) {
+ * nextActor.tell(new EnrichedResult(request, o));
+ * }
+ * });
+ * }}}
+ */
+ def ask(actor: ActorRef, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
+ scalaAsk(actor, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
+
+ /**
+ * A variation of ask which allows to implement "replyTo" pattern by including
+ * sender reference in message.
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(
+ * worker,
+ * new akka.japi.Function {
+ * Object apply(ActorRef askSender) {
+ * return new Request(askSender);
+ * }
+ * },
+ * timeout);
+ * }}}
+ */
+ def ask(actor: ActorRef, messageFactory: japi.Function[ActorRef, Any], timeout: Timeout): CompletionStage[AnyRef] =
+ scalaAsk(actor, messageFactory.apply _)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
+
+ /**
+ * Java API for `akka.pattern.ask`:
+ * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
+ * holding the eventual reply message; this means that the target actor
+ * needs to send the result to the `sender` reference provided. The CompletionStage
+ * will be completed with an [[akka.pattern.AskTimeoutException]] after the
+ * given timeout has expired; this is independent from any timeout applied
+ * while awaiting a result for this future (i.e. in
+ * `Await.result(..., timeout)`).
+ *
+ * Warning:
+ * When using future callbacks, inside actors you need to carefully avoid closing over
+ * the containing actor’s object, i.e. do not call methods or access mutable state
+ * on the enclosing actor from within the callback. This would break the actor
+ * encapsulation and may introduce synchronization bugs and race conditions because
+ * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
+ * there is not yet a way to detect these illegal accesses at compile time.
+ *
+ * Recommended usage:
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(worker, request, timeout);
+ * f.onSuccess(new Procedure() {
+ * public void apply(Object o) {
+ * nextActor.tell(new EnrichedResult(request, o));
+ * }
+ * });
+ * }}}
+ */
+ def ask(actor: ActorRef, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
+ scalaAsk(actor, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]]
+
+ /**
+ * A variation of ask which allows to implement "replyTo" pattern by including
+ * sender reference in message.
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(
+ * worker,
+ * new akka.japi.Function {
+ * Object apply(ActorRef askSender) {
+ * return new Request(askSender);
+ * }
+ * },
+ * timeout);
+ * }}}
+ */
+ def ask(actor: ActorRef, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] =
+ scalaAsk(actor, messageFactory.apply _)(Timeout(timeoutMillis.millis)).toJava.asInstanceOf[CompletionStage[AnyRef]]
+
+ /**
+ * Java API for `akka.pattern.ask`:
+ * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
+ * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]]
+ * needs to send the result to the `sender` reference provided. The CompletionStage
+ * will be completed with an [[akka.pattern.AskTimeoutException]] after the
+ * given timeout has expired; this is independent from any timeout applied
+ * while awaiting a result for this future (i.e. in
+ * `Await.result(..., timeout)`).
+ *
+ * Warning:
+ * When using future callbacks, inside actors you need to carefully avoid closing over
+ * the containing actor’s object, i.e. do not call methods or access mutable state
+ * on the enclosing actor from within the callback. This would break the actor
+ * encapsulation and may introduce synchronization bugs and race conditions because
+ * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
+ * there is not yet a way to detect these illegal accesses at compile time.
+ *
+ * Recommended usage:
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(selection, request, timeout);
+ * f.onSuccess(new Procedure() {
+ * public void apply(Object o) {
+ * nextActor.tell(new EnrichedResult(request, o));
+ * }
+ * });
+ * }}}
+ */
+ def ask(selection: ActorSelection, message: Any, timeout: Timeout): CompletionStage[AnyRef] =
+ scalaAsk(selection, message)(timeout).toJava.asInstanceOf[CompletionStage[AnyRef]]
+
+ /**
+ * Java API for `akka.pattern.ask`:
+ * Sends a message asynchronously and returns a [[java.util.concurrent.CompletionStage]]
+ * holding the eventual reply message; this means that the target [[akka.actor.ActorSelection]]
+ * needs to send the result to the `sender` reference provided. The CompletionStage
+ * will be completed with an [[akka.pattern.AskTimeoutException]] after the
+ * given timeout has expired; this is independent from any timeout applied
+ * while awaiting a result for this future (i.e. in
+ * `Await.result(..., timeout)`).
+ *
+ * Warning:
+ * When using future callbacks, inside actors you need to carefully avoid closing over
+ * the containing actor’s object, i.e. do not call methods or access mutable state
+ * on the enclosing actor from within the callback. This would break the actor
+ * encapsulation and may introduce synchronization bugs and race conditions because
+ * the callback will be scheduled concurrently to the enclosing actor. Unfortunately
+ * there is not yet a way to detect these illegal accesses at compile time.
+ *
+ * Recommended usage:
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(selection, request, timeout);
+ * f.onSuccess(new Procedure() {
+ * public void apply(Object o) {
+ * nextActor.tell(new EnrichedResult(request, o));
+ * }
+ * });
+ * }}}
+ */
+ def ask(selection: ActorSelection, message: Any, timeoutMillis: Long): CompletionStage[AnyRef] =
+ scalaAsk(selection, message)(new Timeout(timeoutMillis, TimeUnit.MILLISECONDS)).toJava.asInstanceOf[CompletionStage[AnyRef]]
+
+ /**
+ * A variation of ask which allows to implement "replyTo" pattern by including
+ * sender reference in message.
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(
+ * selection,
+ * new akka.japi.Function {
+ * Object apply(ActorRef askSender) {
+ * return new Request(askSender);
+ * }
+ * },
+ * timeout);
+ * }}}
+ */
+ def ask(selection: ActorSelection, messageFactory: japi.Function[ActorRef, Any], timeoutMillis: Long): CompletionStage[AnyRef] =
+ scalaAsk(selection, messageFactory.apply _)(Timeout(timeoutMillis.millis)).toJava.asInstanceOf[CompletionStage[AnyRef]]
+
+ /**
+ * Register an onComplete callback on this [[java.util.concurrent.CompletionStage]] to send
+ * the result to the given [[akka.actor.ActorRef]] or [[akka.actor.ActorSelection]].
+ * Returns the original CompletionStage to allow method chaining.
+ * If the future was completed with failure it is sent as a [[akka.actor.Status.Failure]]
+ * to the recipient.
+ *
+ * Recommended usage example:
+ *
+ * {{{
+ * final CompletionStage f = Patterns.ask(worker, request, timeout);
+ * // apply some transformation (i.e. enrich with request info)
+ * final CompletionStage transformed = f.map(new akka.japi.Function() { ... });
+ * // send it on to the next stage
+ * Patterns.pipe(transformed).to(nextActor);
+ * }}}
+ */
+ def pipe[T](future: CompletionStage[T], context: ExecutionContext): PipeableCompletionStage[T] = pipeCompletionStage(future)(context)
+
+ /**
+ * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
+ * existing messages of the target actor has been processed and the actor has been
+ * terminated.
+ *
+ * Useful when you need to wait for termination or compose ordered termination of several actors.
+ *
+ * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
+ * is completed with failure [[akka.pattern.AskTimeoutException]].
+ */
+ def gracefulStop(target: ActorRef, timeout: FiniteDuration): CompletionStage[java.lang.Boolean] =
+ scalaGracefulStop(target, timeout).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
+
+ /**
+ * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with success (value `true`) when
+ * existing messages of the target actor has been processed and the actor has been
+ * terminated.
+ *
+ * Useful when you need to wait for termination or compose ordered termination of several actors.
+ *
+ * If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your
+ * stop command as `stopMessage` parameter
+ *
+ * If the target actor isn't terminated within the timeout the [[java.util.concurrent.CompletionStage]]
+ * is completed with failure [[akka.pattern.AskTimeoutException]].
+ */
+ def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): CompletionStage[java.lang.Boolean] =
+ scalaGracefulStop(target, timeout, stopMessage).toJava.asInstanceOf[CompletionStage[java.lang.Boolean]]
+
+ /**
+ * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided Callable
+ * after the specified duration.
+ */
+ def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: Callable[CompletionStage[T]]): CompletionStage[T] =
+ afterCompletionStage(duration, scheduler)(value.call())(context)
+
+ /**
+ * Returns a [[java.util.concurrent.CompletionStage]] that will be completed with the success or failure of the provided value
+ * after the specified duration.
+ */
+ def after[T](duration: FiniteDuration, scheduler: Scheduler, context: ExecutionContext, value: CompletionStage[T]): CompletionStage[T] =
+ afterCompletionStage(duration, scheduler)(value)(context)
+}
diff --git a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala
index de34e22b77..d84b8caec7 100644
--- a/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala
+++ b/akka-actor/src/main/scala/akka/pattern/PipeToSupport.scala
@@ -8,23 +8,23 @@ import scala.concurrent.{ Future, ExecutionContext }
import scala.util.{ Failure, Success }
import akka.actor.{ Status, ActorRef, Actor }
import akka.actor.ActorSelection
+import java.util.concurrent.CompletionStage
+import java.util.function.BiConsumer
trait PipeToSupport {
final class PipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) {
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
- future onComplete {
+ future andThen {
case Success(r) ⇒ recipient ! r
case Failure(f) ⇒ recipient ! Status.Failure(f)
}
- future
}
def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): Future[T] = {
- future onComplete {
+ future andThen {
case Success(r) ⇒ recipient ! r
case Failure(f) ⇒ recipient ! Status.Failure(f)
}
- future
}
def to(recipient: ActorRef): PipeableFuture[T] = to(recipient, Actor.noSender)
def to(recipient: ActorRef, sender: ActorRef): PipeableFuture[T] = {
@@ -38,6 +38,35 @@ trait PipeToSupport {
}
}
+ final class PipeableCompletionStage[T](val future: CompletionStage[T])(implicit executionContext: ExecutionContext) {
+ def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): CompletionStage[T] = {
+ future whenComplete new BiConsumer[T, Throwable] {
+ override def accept(t: T, ex: Throwable) {
+ if (t != null) recipient ! t
+ if (ex != null) recipient ! Status.Failure(ex)
+ }
+ }
+ }
+ def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): CompletionStage[T] = {
+ future whenComplete new BiConsumer[T, Throwable] {
+ override def accept(t: T, ex: Throwable) {
+ if (t != null) recipient ! t
+ if (ex != null) recipient ! Status.Failure(ex)
+ }
+ }
+ }
+ def to(recipient: ActorRef): PipeableCompletionStage[T] = to(recipient, Actor.noSender)
+ def to(recipient: ActorRef, sender: ActorRef): PipeableCompletionStage[T] = {
+ pipeTo(recipient)(sender)
+ this
+ }
+ def to(recipient: ActorSelection): PipeableCompletionStage[T] = to(recipient, Actor.noSender)
+ def to(recipient: ActorSelection, sender: ActorRef): PipeableCompletionStage[T] = {
+ pipeToSelection(recipient)(sender)
+ this
+ }
+ }
+
/**
* Import this implicit conversion to gain the `pipeTo` method on [[scala.concurrent.Future]]:
*
@@ -56,4 +85,23 @@ trait PipeToSupport {
* the failure is sent in a [[akka.actor.Status.Failure]] to the recipient.
*/
implicit def pipe[T](future: Future[T])(implicit executionContext: ExecutionContext): PipeableFuture[T] = new PipeableFuture(future)
+
+ /**
+ * Import this implicit conversion to gain the `pipeTo` method on [[scala.concurrent.Future]]:
+ *
+ * {{{
+ * import akka.pattern.pipe
+ *
+ * Future { doExpensiveCalc() } pipeTo nextActor
+ *
+ * or
+ *
+ * pipe(someFuture) to nextActor
+ *
+ * }}}
+ *
+ * The successful result of the future is sent as a message to the recipient, or
+ * the failure is sent in a [[akka.actor.Status.Failure]] to the recipient.
+ */
+ implicit def pipeCompletionStage[T](future: CompletionStage[T])(implicit executionContext: ExecutionContext): PipeableCompletionStage[T] = new PipeableCompletionStage(future)
}
diff --git a/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java b/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java
index aaf2ebbe94..da59cc79e1 100644
--- a/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java
+++ b/akka-docs/rst/java/code/docs/http/javadsl/HttpClientExampleDocTest.java
@@ -22,7 +22,9 @@ import akka.http.javadsl.Http;
import scala.util.Try;
import static akka.http.javadsl.ConnectHttp.toHost;
-import static akka.pattern.Patterns.*;
+import static akka.pattern.PatternsCS.*;
+
+import java.util.concurrent.CompletionStage;
@SuppressWarnings("unused")
public class HttpClientExampleDocTest {
@@ -34,9 +36,9 @@ public class HttpClientExampleDocTest {
final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);
- final Flow> connectionFlow =
+ final Flow> connectionFlow =
Http.get(system).outgoingConnection(toHost("akka.io", 80));
- final Future responseFuture =
+ final CompletionStage responseFuture =
Source.single(HttpRequest.create("/"))
.via(connectionFlow)
.runWith(Sink.head(), materializer);
@@ -58,7 +60,7 @@ public class HttpClientExampleDocTest {
// construct a pool client flow with context type `Integer`
- final Future, Integer>> responseFuture =
+ final CompletionStage, Integer>> responseFuture =
Source
.single(Pair.create(HttpRequest.create("/"), 42))
.via(poolClientFlow)
@@ -72,7 +74,7 @@ public class HttpClientExampleDocTest {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
- final Future responseFuture =
+ final CompletionStage responseFuture =
Http.get(system)
.singleRequest(HttpRequest.create("http://akka.io"), materializer);
//#single-request-example
@@ -92,7 +94,7 @@ public class HttpClientExampleDocTest {
}).build());
}
- Future fetch(String url) {
+ CompletionStage fetch(String url) {
return http.singleRequest(HttpRequest.create(url), materializer);
}
}
diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java b/akka-docs/rst/java/code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java
index 7b0a750a74..cb509dcac9 100644
--- a/akka-docs/rst/java/code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java
+++ b/akka-docs/rst/java/code/docs/http/javadsl/server/HighLevelServerBindFailureExample.java
@@ -6,32 +6,25 @@ package docs.http.javadsl.server;
//#binding-failure-high-level-example
import akka.actor.ActorSystem;
-import akka.dispatch.OnFailure;
-import akka.http.javadsl.model.ContentTypes;
-import akka.http.javadsl.server.*;
-import akka.http.javadsl.server.values.Parameters;
import akka.http.scaladsl.Http;
-import scala.concurrent.Future;
import java.io.IOException;
+import java.util.concurrent.CompletionStage;
-@SuppressWarnings("unchecked")
public class HighLevelServerBindFailureExample {
public static void main(String[] args) throws IOException {
// boot up server using the route as defined below
final ActorSystem system = ActorSystem.create();
// HttpApp.bindRoute expects a route being provided by HttpApp.createRoute
- Future bindingFuture =
+ CompletionStage bindingFuture =
new HighLevelServerExample().bindRoute("localhost", 8080, system);
- bindingFuture.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) throws Throwable {
+ bindingFuture.exceptionally(failure -> {
System.err.println("Something very bad happened! " + failure.getMessage());
system.terminate();
- }
- }, system.dispatcher());
+ return null;
+ });
system.terminate();
}
diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpBasicAuthenticatorExample.java b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpBasicAuthenticatorExample.java
index 624b34e441..8c6b3beb27 100644
--- a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpBasicAuthenticatorExample.java
+++ b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpBasicAuthenticatorExample.java
@@ -13,6 +13,10 @@ import akka.http.javadsl.server.values.BasicCredentials;
import akka.http.javadsl.server.values.HttpBasicAuthenticator;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.scaladsl.model.headers.Authorization;
+
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
import org.junit.Test;
import scala.Option;
import scala.concurrent.Future;
@@ -27,7 +31,7 @@ public class HttpBasicAuthenticatorExample extends JUnitRouteTest {
private final String hardcodedPassword = "correcthorsebatterystaple";
- public Future> authenticate(BasicCredentials credentials) {
+ public CompletionStage> authenticate(BasicCredentials credentials) {
// this is where your actual authentication logic would go
if (credentials.available() && // no anonymous access
credentials.verify(hardcodedPassword)) {
diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java
index c737b21095..83e7608ff2 100644
--- a/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java
+++ b/akka-docs/rst/java/code/docs/http/javadsl/server/HttpServerExampleDocTest.java
@@ -30,13 +30,10 @@ import akka.stream.stage.PushStage;
import akka.stream.stage.SyncDirective;
import akka.stream.stage.TerminationDirective;
import akka.util.ByteString;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.runtime.BoxedUnit;
import java.io.BufferedReader;
import java.io.InputStreamReader;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("unused")
@@ -47,20 +44,17 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
- Source> serverSource =
+ Source> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
- Future serverBindingFuture =
- serverSource.to(Sink.foreach(
- new Procedure() {
- @Override
- public void apply(IncomingConnection connection) throws Exception {
- System.out.println("Accepted new connection from " + connection.remoteAddress());
- // ... and then actually handle the connection
- }
- })).run(materializer);
+ CompletionStage serverBindingFuture =
+ serverSource.to(Sink.foreach(connection -> {
+ System.out.println("Accepted new connection from " + connection.remoteAddress());
+ // ... and then actually handle the connection
+ }
+ )).run(materializer);
//#binding-example
- Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
+ serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void bindingFailureExample() throws Exception {
@@ -68,27 +62,21 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
- Source> serverSource =
+ Source> serverSource =
Http.get(system).bind("localhost", 80, materializer);
- Future serverBindingFuture =
- serverSource.to(Sink.foreach(
- new Procedure() {
- @Override
- public void apply(IncomingConnection connection) throws Exception {
- System.out.println("Accepted new connection from " + connection.remoteAddress());
- // ... and then actually handle the connection
- }
- })).run(materializer);
+ CompletionStage serverBindingFuture =
+ serverSource.to(Sink.foreach(connection -> {
+ System.out.println("Accepted new connection from " + connection.remoteAddress());
+ // ... and then actually handle the connection
+ }
+ )).run(materializer);
- serverBindingFuture.onFailure(new OnFailure() {
- @Override
- public void onFailure(Throwable failure) throws Throwable {
+ serverBindingFuture.whenCompleteAsync((binding, failure) -> {
// possibly report the failure somewhere...
- }
}, system.dispatcher());
//#binding-failure-handling
- Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
+ serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void connectionSourceFailureExample() throws Exception {
@@ -96,7 +84,7 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
- Source> serverSource =
+ Source> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Flow failureDetection =
@@ -114,19 +102,16 @@ public class HttpServerExampleDocTest {
}
});
- Future serverBindingFuture =
+ CompletionStage serverBindingFuture =
serverSource
.via(failureDetection) // feed signals through our custom stage
- .to(Sink.foreach(
- new Procedure() {
- @Override
- public void apply(IncomingConnection connection) throws Exception {
- System.out.println("Accepted new connection from " + connection.remoteAddress());
- // ... and then actually handle the connection
- }
- })).run(materializer);
+ .to(Sink.foreach(connection -> {
+ System.out.println("Accepted new connection from " + connection.remoteAddress());
+ // ... and then actually handle the connection
+ }))
+ .run(materializer);
//#incoming-connections-source-failure-handling
- Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
+ serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void connectionStreamFailureExample() throws Exception {
@@ -134,7 +119,7 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
- Source> serverSource =
+ Source> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Flow failureDetection =
@@ -163,14 +148,14 @@ public class HttpServerExampleDocTest {
.withEntity(entity);
});
- Future serverBindingFuture =
- serverSource.to(Sink.foreach(con -> {
- System.out.println("Accepted new connection from " + con.remoteAddress());
- con.handleWith(httpEcho, materializer);
+ CompletionStage serverBindingFuture =
+ serverSource.to(Sink.foreach(conn -> {
+ System.out.println("Accepted new connection from " + conn.remoteAddress());
+ conn.handleWith(httpEcho, materializer);
}
)).run(materializer);
//#connection-stream-failure-handling
- Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
+ serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void fullServerExample() throws Exception {
@@ -181,7 +166,7 @@ public class HttpServerExampleDocTest {
//#full-server-example
final Materializer materializer = ActorMaterializer.create(system);
- Source> serverSource =
+ Source> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
//#request-handler
@@ -219,21 +204,17 @@ public class HttpServerExampleDocTest {
};
//#request-handler
- Future serverBindingFuture =
- serverSource.to(Sink.foreach(
- new Procedure() {
- @Override
- public void apply(IncomingConnection connection) throws Exception {
- System.out.println("Accepted new connection from " + connection.remoteAddress());
+ CompletionStage serverBindingFuture =
+ serverSource.to(Sink.foreach(connection -> {
+ System.out.println("Accepted new connection from " + connection.remoteAddress());
- connection.handleWithSyncHandler(requestHandler, materializer);
- // this is equivalent to
- //connection.handleWith(Flow.of(HttpRequest.class).map(requestHandler), materializer);
- }
+ connection.handleWithSyncHandler(requestHandler, materializer);
+ // this is equivalent to
+ //connection.handleWith(Flow.of(HttpRequest.class).map(requestHandler), materializer);
})).run(materializer);
//#full-server-example
- Await.result(serverBindingFuture, new FiniteDuration(1, TimeUnit.SECONDS)); // will throw if binding fails
+ serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); // will throw if binding fails
System.out.println("Press ENTER to stop.");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/OAuth2AuthenticatorExample.java b/akka-docs/rst/java/code/docs/http/javadsl/server/OAuth2AuthenticatorExample.java
index 3152179791..17f4973daa 100644
--- a/akka-docs/rst/java/code/docs/http/javadsl/server/OAuth2AuthenticatorExample.java
+++ b/akka-docs/rst/java/code/docs/http/javadsl/server/OAuth2AuthenticatorExample.java
@@ -16,6 +16,10 @@ import akka.http.javadsl.server.values.OAuth2Authenticator;
import akka.http.javadsl.server.values.OAuth2Credentials;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.scaladsl.model.headers.Authorization;
+
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+
import org.junit.Test;
import scala.Option;
import scala.concurrent.Future;
@@ -31,7 +35,7 @@ public class OAuth2AuthenticatorExample extends JUnitRouteTest {
private final String hardcodedToken = "token";
@Override
- public Future> authenticate(OAuth2Credentials credentials) {
+ public CompletionStage> authenticate(OAuth2Credentials credentials) {
// this is where your actual authentication logic would go, looking up the user
// based on the token or something in that direction
if (credentials.available() && // no anonymous access
diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java b/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java
index 2b162a70d9..fad2fad273 100644
--- a/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java
+++ b/akka-docs/rst/java/code/docs/http/javadsl/server/WebSocketCoreExample.java
@@ -7,6 +7,7 @@ package docs.http.javadsl.server;
//#websocket-example-using-core
import java.io.BufferedReader;
import java.io.InputStreamReader;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@@ -50,7 +51,7 @@ public class WebSocketCoreExample {
try {
final Materializer materializer = ActorMaterializer.create(system);
- Future serverBindingFuture =
+ CompletionStage serverBindingFuture =
Http.get(system).bindAndHandleSync(
new Function() {
public HttpResponse apply(HttpRequest request) throws Exception {
@@ -59,7 +60,7 @@ public class WebSocketCoreExample {
}, "localhost", 8080, materializer);
// will throw if binding fails
- Await.result(serverBindingFuture, new FiniteDuration(1, TimeUnit.SECONDS));
+ serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
System.out.println("Press ENTER to stop.");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {
diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java
index 220b8ec1fc..906ffec871 100644
--- a/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java
+++ b/akka-docs/rst/java/code/docs/persistence/PersistenceQueryDocTest.java
@@ -4,7 +4,7 @@
package docs.persistence;
-import static akka.pattern.Patterns.ask;
+import static akka.pattern.PatternsCS.ask;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
@@ -42,6 +42,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class PersistenceQueryDocTest {
@@ -343,7 +344,7 @@ public class PersistenceQueryDocTest {
//#projection-into-different-store-simple-classes
class ExampleStore {
- Future save(Object any) {
+ CompletionStage save(Object any) {
// ...
//#projection-into-different-store-simple-classes
return null;
@@ -379,13 +380,13 @@ public class PersistenceQueryDocTest {
this.name = name;
}
- public Future saveProgress(long offset) {
+ public CompletionStage saveProgress(long offset) {
// ...
//#projection-into-different-store
return null;
//#projection-into-different-store
}
- public Future latestOffset() {
+ public CompletionStage latestOffset() {
// ...
//#projection-into-different-store
return null;
@@ -412,17 +413,13 @@ public class PersistenceQueryDocTest {
final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid");
final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer");
- long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration());
+ long startFromOffset = bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS);
readJournal
.eventsByTag("bid", startFromOffset)
- .mapAsync(8, envelope -> {
- final Future f = ask(writer, envelope.event(), timeout);
- return f.map(new Mapper() {
- @Override public Long apply(Object in) {
- return envelope.offset();
- }
- }, system.dispatcher());
+ .mapAsync(8, envelope -> {
+ final CompletionStage f = ask(writer, envelope.event(), timeout);
+ return f.thenApplyAsync(in -> envelope.offset(), system.dispatcher());
})
.mapAsync(1, offset -> bidProjection.saveProgress(offset))
.runWith(Sink.ignore(), mat);
diff --git a/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java b/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java
index f8896a9322..884d5f5fb3 100644
--- a/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java
+++ b/akka-docs/rst/java/code/docs/stream/BidiFlowDocTest.java
@@ -6,6 +6,7 @@ package docs.stream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@@ -221,16 +222,15 @@ public class BidiFlowDocTest {
);
final Flow flow =
stack.atop(stack.reversed()).join(pingpong);
- final Future> result = Source
+ final CompletionStage> result = Source
.from(Arrays.asList(0, 1, 2))
. map(id -> new Ping(id))
.via(flow)
.grouped(10)
.runWith(Sink.> head(), mat);
- final FiniteDuration oneSec = Duration.create(1, TimeUnit.SECONDS);
assertArrayEquals(
new Message[] { new Pong(0), new Pong(1), new Pong(2) },
- Await.result(result, oneSec).toArray(new Message[0]));
+ result.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray(new Message[0]));
//#compose
}
}
diff --git a/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java b/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java
index cf24b43786..6f4e5234c9 100644
--- a/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java
+++ b/akka-docs/rst/java/code/docs/stream/CompositionDocTest.java
@@ -5,6 +5,8 @@ package docs.stream;
import java.util.Arrays;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import akka.NotUsed;
import akka.stream.ClosedShape;
@@ -215,27 +217,23 @@ public class CompositionDocTest {
//#mat-combine-4a
static class MyClass {
- private Promise> p;
+ private CompletableFuture> p;
private OutgoingConnection conn;
- public MyClass(Promise> p, OutgoingConnection conn) {
+ public MyClass(CompletableFuture> p, OutgoingConnection conn) {
this.p = p;
this.conn = conn;
}
public void close() {
- p.success(Optional.empty());
+ p.complete(Optional.empty());
}
}
static class Combiner {
- static Future f(Promise> p,
- Pair, Future> rest) {
- return rest.first().map(new Mapper() {
- public MyClass apply(OutgoingConnection c) {
- return new MyClass(p, c);
- }
- }, system.dispatcher());
+ static CompletionStage f(CompletableFuture> p,
+ Pair, CompletionStage> rest) {
+ return rest.first().thenApply(c -> new MyClass(p, c));
}
}
//#mat-combine-4a
@@ -244,13 +242,13 @@ public class CompositionDocTest {
public void materializedValues() throws Exception {
//#mat-combine-1
// Materializes to Promise (red)
- final Source>> source = Source.maybe();
+ final Source>> source = Source.maybe();
// Materializes to BoxedUnit (black)
final Flow flow1 = Flow.of(Integer.class).take(100);
// Materializes to Promise> (red)
- final Source>> nestedSource =
+ final Source>> nestedSource =
source.viaMat(flow1, Keep.left()).named("nestedSource");
//#mat-combine-1
@@ -260,27 +258,27 @@ public class CompositionDocTest {
.map(i -> ByteString.fromString(i.toString()));
// Materializes to Future (yellow)
- final Flow> flow3 =
+ final Flow> flow3 =
Tcp.get(system).outgoingConnection("localhost", 8080);
// Materializes to Future (yellow)
- final Flow> nestedFlow =
+ final Flow> nestedFlow =
flow2.viaMat(flow3, Keep.right()).named("nestedFlow");
//#mat-combine-2
//#mat-combine-3
// Materializes to Future (green)
- final Sink> sink = Sink
- .fold("", (acc, i) -> acc + i.utf8String());
+ final Sink> sink =
+ Sink. fold("", (acc, i) -> acc + i.utf8String());
// Materializes to Pair, Future> (blue)
- final Sink, Future>> nestedSink =
+ final Sink, CompletionStage>> nestedSink =
nestedFlow.toMat(sink, Keep.both());
//#mat-combine-3
//#mat-combine-4b
// Materializes to Future (purple)
- final RunnableGraph> runnableGraph =
+ final RunnableGraph> runnableGraph =
nestedSource.toMat(nestedSink, Combiner::f);
//#mat-combine-4b
}
diff --git a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java
index 2397b4fe8f..10b7ce4831 100644
--- a/akka-docs/rst/java/code/docs/stream/FlowDocTest.java
+++ b/akka-docs/rst/java/code/docs/stream/FlowDocTest.java
@@ -6,6 +6,8 @@ package docs.stream;
import static org.junit.Assert.assertEquals;
import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -55,15 +57,12 @@ public class FlowDocTest {
// returns new Source, with `map()` appended
final Source zeroes = source.map(x -> 0);
- final Sink> fold =
- Sink.fold(0, (agg, next) -> agg + next);
+ final Sink> fold =
+ Sink. fold(0, (agg, next) -> agg + next);
zeroes.runWith(fold, mat); // 0
//#source-immutable
- int result = Await.result(
- zeroes.runWith(fold, mat),
- Duration.create(3, TimeUnit.SECONDS)
- );
+ int result = zeroes.runWith(fold, mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(0, result);
}
@@ -73,18 +72,18 @@ public class FlowDocTest {
final Source source =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
// note that the Future is scala.concurrent.Future
- final Sink> sink =
- Sink.fold(0, (aggr, next) -> aggr + next);
+ final Sink> sink =
+ Sink. fold(0, (aggr, next) -> aggr + next);
// connect the Source to the Sink, obtaining a RunnableFlow
- final RunnableGraph> runnable =
+ final RunnableGraph> runnable =
source.toMat(sink, Keep.right());
// materialize the flow
- final Future sum = runnable.run(mat);
+ final CompletionStage sum = runnable.run(mat);
//#materialization-in-steps
- int result = Await.result(sum, Duration.create(3, TimeUnit.SECONDS));
+ int result = sum.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result);
}
@@ -93,14 +92,14 @@ public class FlowDocTest {
//#materialization-runWith
final Source source =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
- final Sink> sink =
- Sink.fold(0, (aggr, next) -> aggr + next);
+ final Sink> sink =
+ Sink. fold(0, (aggr, next) -> aggr + next);
// materialize the flow, getting the Sinks materialized value
- final Future sum = source.runWith(sink, mat);
+ final CompletionStage sum = source.runWith(sink, mat);
//#materialization-runWith
- int result = Await.result(sum, Duration.create(3, TimeUnit.SECONDS));
+ int result = sum.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result);
}
@@ -108,21 +107,21 @@ public class FlowDocTest {
public void materializedMapUnique() throws Exception {
//#stream-reuse
// connect the Source to the Sink, obtaining a RunnableGraph
- final Sink> sink =
- Sink.fold(0, (aggr, next) -> aggr + next);
- final RunnableGraph> runnable =
+ final Sink> sink =
+ Sink. fold(0, (aggr, next) -> aggr + next);
+ final RunnableGraph> runnable =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).toMat(sink, Keep.right());
// get the materialized value of the FoldSink
- final Future sum1 = runnable.run(mat);
- final Future sum2 = runnable.run(mat);
+ final CompletionStage sum1 = runnable.run(mat);
+ final CompletionStage sum2 = runnable.run(mat);
// sum1 and sum2 are different Futures!
//#stream-reuse
- int result1 = Await.result(sum1, Duration.create(3, TimeUnit.SECONDS));
+ int result1 = sum1.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result1);
- int result2 = Await.result(sum2, Duration.create(3, TimeUnit.SECONDS));
+ int result2 = sum2.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result2);
}
@@ -222,40 +221,40 @@ public class FlowDocTest {
//#flow-mat-combine
// An empty source that can be shut down explicitly from the outside
- Source>> source = Source.maybe();
+ Source>> source = Source.maybe();
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
Flow flow = throttler;
// A sink that returns the first element of a stream in the returned Future
- Sink> sink = Sink.head();
+ Sink> sink = Sink.head();
// By default, the materialized value of the leftmost stage is preserved
- RunnableGraph>> r1 = source.via(flow).to(sink);
+ RunnableGraph>> r1 = source.via(flow).to(sink);
// Simple selection of materialized values by using Keep.right
RunnableGraph r2 = source.viaMat(flow, Keep.right()).to(sink);
- RunnableGraph> r3 = source.via(flow).toMat(sink, Keep.right());
+ RunnableGraph> r3 = source.via(flow).toMat(sink, Keep.right());
// Using runWith will always give the materialized values of the stages added
// by runWith() itself
- Future r4 = source.via(flow).runWith(sink, mat);
- Promise> r5 = flow.to(sink).runWith(source, mat);
- Pair>, Future> r6 = flow.runWith(source, sink, mat);
+ CompletionStage r4 = source.via(flow).runWith(sink, mat);
+ CompletableFuture> r5 = flow.to(sink).runWith(source, mat);
+ Pair>, CompletionStage> r6 = flow.runWith(source, sink, mat);
// Using more complext combinations
- RunnableGraph>, Cancellable>> r7 =
+ RunnableGraph>, Cancellable>> r7 =
source.viaMat(flow, Keep.both()).to(sink);
- RunnableGraph>, Future>> r8 =
+ RunnableGraph>, CompletionStage>> r8 =
source.via(flow).toMat(sink, Keep.both());
- RunnableGraph>, Cancellable>, Future>> r9 =
+ RunnableGraph>, Cancellable>, CompletionStage>> r9 =
source.viaMat(flow, Keep.both()).toMat(sink, Keep.both());
- RunnableGraph>> r10 =
+ RunnableGraph>> r10 =
source.viaMat(flow, Keep.right()).toMat(sink, Keep.both());
// It is also possible to map over the materialized values. In r9 we had a
@@ -264,9 +263,9 @@ public class FlowDocTest {
RunnableGraph r11 =
r9.mapMaterializedValue( (nestedTuple) -> {
- Promise> p = nestedTuple.first().first();
+ CompletableFuture> p = nestedTuple.first().first();
Cancellable c = nestedTuple.first().second();
- Future f = nestedTuple.second();
+ CompletionStage f = nestedTuple.second();
// Picking the Cancellable, but we could also construct a domain class here
return c;
diff --git a/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java
index d575b69ab0..105580bff3 100644
--- a/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java
+++ b/akka-docs/rst/java/code/docs/stream/FlowErrorDocTest.java
@@ -6,6 +6,7 @@ package docs.stream;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@@ -49,14 +50,14 @@ public class FlowErrorDocTest {
final Materializer mat = ActorMaterializer.create(system);
final Source source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
.map(elem -> 100 / elem);
- final Sink> fold =
- Sink.fold(0, (acc, elem) -> acc + elem);
- final Future result = source.runWith(fold, mat);
+ final Sink> fold =
+ Sink. fold(0, (acc, elem) -> acc + elem);
+ final CompletionStage result = source.runWith(fold, mat);
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)
//#stop
- Await.result(result, Duration.create(3, TimeUnit.SECONDS));
+ result.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@@ -73,14 +74,14 @@ public class FlowErrorDocTest {
system);
final Source source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
.map(elem -> 100 / elem);
- final Sink> fold =
+ final Sink> fold =
Sink.fold(0, (acc, elem) -> acc + elem);
- final Future result = source.runWith(fold, mat);
+ final CompletionStage result = source.runWith(fold, mat);
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
//#resume
- assertEquals(Integer.valueOf(228), Await.result(result, Duration.create(3, TimeUnit.SECONDS)));
+ assertEquals(Integer.valueOf(228), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
@@ -98,14 +99,14 @@ public class FlowErrorDocTest {
.withAttributes(ActorAttributes.withSupervisionStrategy(decider));
final Source source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
.via(flow);
- final Sink> fold =
- Sink.fold(0, (acc, elem) -> acc + elem);
- final Future result = source.runWith(fold, mat);
+ final Sink> fold =
+ Sink. fold(0, (acc, elem) -> acc + elem);
+ final CompletionStage result = source.runWith(fold, mat);
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)
//#resume-section
- assertEquals(Integer.valueOf(150), Await.result(result, Duration.create(3, TimeUnit.SECONDS)));
+ assertEquals(Integer.valueOf(150), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
@@ -126,7 +127,7 @@ public class FlowErrorDocTest {
.withAttributes(ActorAttributes.withSupervisionStrategy(decider));
final Source source = Source.from(Arrays.asList(1, 3, -1, 5, 7))
.via(flow);
- final Future> result = source.grouped(1000)
+ final CompletionStage> result = source.grouped(1000)
.runWith(Sink.>head(), mat);
// the negative element cause the scan stage to be restarted,
// i.e. start from 0 again
@@ -135,7 +136,7 @@ public class FlowErrorDocTest {
assertEquals(
Arrays.asList(0, 1, 4, 0, 5, 12),
- Await.result(result, Duration.create(3, TimeUnit.SECONDS)));
+ result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
}
diff --git a/akka-docs/rst/java/code/docs/stream/FlowGraphDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowGraphDocTest.java
index 0cef3e1bf5..99531726fc 100644
--- a/akka-docs/rst/java/code/docs/stream/FlowGraphDocTest.java
+++ b/akka-docs/rst/java/code/docs/stream/FlowGraphDocTest.java
@@ -7,6 +7,7 @@ import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@@ -46,15 +47,15 @@ public class FlowGraphDocTest {
public void demonstrateBuildSimpleGraph() throws Exception {
//#simple-flow-graph
final Source in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
- final Sink, Future>> sink = Sink.head();
- final Sink, Future>> sink2 = Sink.head();
+ final Sink, CompletionStage>> sink = Sink.head();
+ final Sink, CompletionStage>> sink2 = Sink.head();
final Flow f1 = Flow.of(Integer.class).map(elem -> elem + 10);
final Flow f2 = Flow.of(Integer.class).map(elem -> elem + 20);
final Flow f3 = Flow.of(Integer.class).map(elem -> elem.toString());
final Flow f4 = Flow.of(Integer.class).map(elem -> elem + 30);
- final RunnableGraph>> result =
- RunnableGraph.>>fromGraph(
+ final RunnableGraph>> result =
+ RunnableGraph.>>fromGraph(
GraphDSL
.create(
sink,
@@ -70,7 +71,7 @@ public class FlowGraphDocTest {
return ClosedShape.getInstance();
}));
//#simple-flow-graph
- final List list = Await.result(result.run(mat), Duration.create(3, TimeUnit.SECONDS));
+ final List list = result.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
final String[] res = list.toArray(new String[] {});
Arrays.sort(res, null);
assertArrayEquals(new String[] { "31", "32", "33", "34", "35", "41", "42", "43", "44", "45" }, res);
@@ -105,12 +106,12 @@ public class FlowGraphDocTest {
@Test
public void demonstrateReusingFlowInGraph() throws Exception {
//#flow-graph-reusing-a-flow
- final Sink> topHeadSink = Sink.head();
- final Sink> bottomHeadSink = Sink.head();
+ final Sink> topHeadSink = Sink.head();
+ final Sink> bottomHeadSink = Sink.head();
final Flow sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2);
- final RunnableGraph, Future>> g =
- RunnableGraph., Future>>fromGraph(
+ final RunnableGraph, CompletionStage>> g =
+ RunnableGraph., CompletionStage>>fromGraph(
GraphDSL.create(
topHeadSink, // import this sink into the graph
bottomHeadSink, // and this as well
@@ -127,24 +128,22 @@ public class FlowGraphDocTest {
)
);
//#flow-graph-reusing-a-flow
- final Pair, Future> pair = g.run(mat);
- assertEquals(Integer.valueOf(2), Await.result(pair.first(), Duration.create(3, TimeUnit.SECONDS)));
- assertEquals(Integer.valueOf(2), Await.result(pair.second(), Duration.create(3, TimeUnit.SECONDS)));
+ final Pair, CompletionStage> pair = g.run(mat);
+ assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS));
+ assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
public void demonstrateMatValue() throws Exception {
//#flow-graph-matvalue
- final Sink> foldSink = Sink. fold(0, (a, b) -> {
+ final Sink> foldSink = Sink. fold(0, (a, b) -> {
return a + b;
});
- final Flow, Integer, NotUsed> flatten = Flow.>create()
- .mapAsync(4, x -> {
- return x;
- });
+ final Flow, Integer, NotUsed> flatten =
+ Flow.>create().mapAsync(4, x -> x);
- final Flow> foldingFlow = Flow.fromGraph(
+ final Flow> foldingFlow = Flow.fromGraph(
GraphDSL.create(foldSink,
(b, fold) -> {
return FlowShape.of(
@@ -155,7 +154,7 @@ public class FlowGraphDocTest {
//#flow-graph-matvalue-cycle
// This cannot produce any value:
- final Source> cyclicSource = Source.fromGraph(
+ final Source> cyclicSource = Source.fromGraph(
GraphDSL.create(foldSink,
(b, fold) -> {
// - Fold cannot complete until its upstream mapAsync completes
diff --git a/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java b/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java
index 41fac66ccb..91c8433f59 100644
--- a/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java
+++ b/akka-docs/rst/java/code/docs/stream/FlowStagesDocTest.java
@@ -9,6 +9,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -154,11 +155,11 @@ public class FlowStagesDocTest {
@Test
public void demonstrateVariousPushPullStages() throws Exception {
- final Sink