#19440 replace Scala Future usage with CompletionStage in javadsl

This entails:

  * adding akka.pattern.PatternCS.* to enable ask etc. with
    CompletionStage
  * changing RequestContext to offer an ExecutionContextExecutor for the
    CompletionStage.*Async combinators
  * splitting up akka.stream.Queue for JavaDSL consistency
This commit is contained in:
Roland Kuhn 2016-01-21 16:37:26 +01:00
parent 396f4370e9
commit 4c72495581
118 changed files with 1646 additions and 1379 deletions

View file

@ -22,7 +22,9 @@ import akka.http.javadsl.Http;
import scala.util.Try;
import static akka.http.javadsl.ConnectHttp.toHost;
import static akka.pattern.Patterns.*;
import static akka.pattern.PatternsCS.*;
import java.util.concurrent.CompletionStage;
@SuppressWarnings("unused")
public class HttpClientExampleDocTest {
@ -34,9 +36,9 @@ public class HttpClientExampleDocTest {
final ActorSystem system = ActorSystem.create();
final ActorMaterializer materializer = ActorMaterializer.create(system);
final Flow<HttpRequest, HttpResponse, Future<OutgoingConnection>> connectionFlow =
final Flow<HttpRequest, HttpResponse, CompletionStage<OutgoingConnection>> connectionFlow =
Http.get(system).outgoingConnection(toHost("akka.io", 80));
final Future<HttpResponse> responseFuture =
final CompletionStage<HttpResponse> responseFuture =
Source.single(HttpRequest.create("/"))
.via(connectionFlow)
.runWith(Sink.<HttpResponse>head(), materializer);
@ -58,7 +60,7 @@ public class HttpClientExampleDocTest {
// construct a pool client flow with context type `Integer`
final Future<Pair<Try<HttpResponse>, Integer>> responseFuture =
final CompletionStage<Pair<Try<HttpResponse>, Integer>> responseFuture =
Source
.single(Pair.create(HttpRequest.create("/"), 42))
.via(poolClientFlow)
@ -72,7 +74,7 @@ public class HttpClientExampleDocTest {
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final Future<HttpResponse> responseFuture =
final CompletionStage<HttpResponse> responseFuture =
Http.get(system)
.singleRequest(HttpRequest.create("http://akka.io"), materializer);
//#single-request-example
@ -92,7 +94,7 @@ public class HttpClientExampleDocTest {
}).build());
}
Future<HttpResponse> fetch(String url) {
CompletionStage<HttpResponse> fetch(String url) {
return http.singleRequest(HttpRequest.create(url), materializer);
}
}

View file

@ -6,32 +6,25 @@ package docs.http.javadsl.server;
//#binding-failure-high-level-example
import akka.actor.ActorSystem;
import akka.dispatch.OnFailure;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.server.*;
import akka.http.javadsl.server.values.Parameters;
import akka.http.scaladsl.Http;
import scala.concurrent.Future;
import java.io.IOException;
import java.util.concurrent.CompletionStage;
@SuppressWarnings("unchecked")
public class HighLevelServerBindFailureExample {
public static void main(String[] args) throws IOException {
// boot up server using the route as defined below
final ActorSystem system = ActorSystem.create();
// HttpApp.bindRoute expects a route being provided by HttpApp.createRoute
Future<Http.ServerBinding> bindingFuture =
CompletionStage<Http.ServerBinding> bindingFuture =
new HighLevelServerExample().bindRoute("localhost", 8080, system);
bindingFuture.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
bindingFuture.exceptionally(failure -> {
System.err.println("Something very bad happened! " + failure.getMessage());
system.terminate();
}
}, system.dispatcher());
return null;
});
system.terminate();
}

View file

@ -13,6 +13,10 @@ import akka.http.javadsl.server.values.BasicCredentials;
import akka.http.javadsl.server.values.HttpBasicAuthenticator;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.scaladsl.model.headers.Authorization;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.junit.Test;
import scala.Option;
import scala.concurrent.Future;
@ -27,7 +31,7 @@ public class HttpBasicAuthenticatorExample extends JUnitRouteTest {
private final String hardcodedPassword = "correcthorsebatterystaple";
public Future<Option<String>> authenticate(BasicCredentials credentials) {
public CompletionStage<Optional<String>> authenticate(BasicCredentials credentials) {
// this is where your actual authentication logic would go
if (credentials.available() && // no anonymous access
credentials.verify(hardcodedPassword)) {

View file

@ -30,13 +30,10 @@ import akka.stream.stage.PushStage;
import akka.stream.stage.SyncDirective;
import akka.stream.stage.TerminationDirective;
import akka.util.ByteString;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("unused")
@ -47,20 +44,17 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, Future<ServerBinding>> serverSource =
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Future<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(
new Procedure<IncomingConnection>() {
@Override
public void apply(IncomingConnection connection) throws Exception {
System.out.println("Accepted new connection from " + connection.remoteAddress());
// ... and then actually handle the connection
}
})).run(materializer);
CompletionStage<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(connection -> {
System.out.println("Accepted new connection from " + connection.remoteAddress());
// ... and then actually handle the connection
}
)).run(materializer);
//#binding-example
Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void bindingFailureExample() throws Exception {
@ -68,27 +62,21 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, Future<ServerBinding>> serverSource =
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 80, materializer);
Future<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(
new Procedure<IncomingConnection>() {
@Override
public void apply(IncomingConnection connection) throws Exception {
System.out.println("Accepted new connection from " + connection.remoteAddress());
// ... and then actually handle the connection
}
})).run(materializer);
CompletionStage<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(connection -> {
System.out.println("Accepted new connection from " + connection.remoteAddress());
// ... and then actually handle the connection
}
)).run(materializer);
serverBindingFuture.onFailure(new OnFailure() {
@Override
public void onFailure(Throwable failure) throws Throwable {
serverBindingFuture.whenCompleteAsync((binding, failure) -> {
// possibly report the failure somewhere...
}
}, system.dispatcher());
//#binding-failure-handling
Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void connectionSourceFailureExample() throws Exception {
@ -96,7 +84,7 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, Future<ServerBinding>> serverSource =
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Flow<IncomingConnection, IncomingConnection, NotUsed> failureDetection =
@ -114,19 +102,16 @@ public class HttpServerExampleDocTest {
}
});
Future<ServerBinding> serverBindingFuture =
CompletionStage<ServerBinding> serverBindingFuture =
serverSource
.via(failureDetection) // feed signals through our custom stage
.to(Sink.foreach(
new Procedure<IncomingConnection>() {
@Override
public void apply(IncomingConnection connection) throws Exception {
System.out.println("Accepted new connection from " + connection.remoteAddress());
// ... and then actually handle the connection
}
})).run(materializer);
.to(Sink.foreach(connection -> {
System.out.println("Accepted new connection from " + connection.remoteAddress());
// ... and then actually handle the connection
}))
.run(materializer);
//#incoming-connections-source-failure-handling
Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void connectionStreamFailureExample() throws Exception {
@ -134,7 +119,7 @@ public class HttpServerExampleDocTest {
ActorSystem system = ActorSystem.create();
Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, Future<ServerBinding>> serverSource =
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
Flow<HttpRequest, HttpRequest, NotUsed> failureDetection =
@ -163,14 +148,14 @@ public class HttpServerExampleDocTest {
.withEntity(entity);
});
Future<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(con -> {
System.out.println("Accepted new connection from " + con.remoteAddress());
con.handleWith(httpEcho, materializer);
CompletionStage<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(conn -> {
System.out.println("Accepted new connection from " + conn.remoteAddress());
conn.handleWith(httpEcho, materializer);
}
)).run(materializer);
//#connection-stream-failure-handling
Await.result(serverBindingFuture, new FiniteDuration(3, TimeUnit.SECONDS));
serverBindingFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
public static void fullServerExample() throws Exception {
@ -181,7 +166,7 @@ public class HttpServerExampleDocTest {
//#full-server-example
final Materializer materializer = ActorMaterializer.create(system);
Source<IncomingConnection, Future<ServerBinding>> serverSource =
Source<IncomingConnection, CompletionStage<ServerBinding>> serverSource =
Http.get(system).bind("localhost", 8080, materializer);
//#request-handler
@ -219,21 +204,17 @@ public class HttpServerExampleDocTest {
};
//#request-handler
Future<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(
new Procedure<IncomingConnection>() {
@Override
public void apply(IncomingConnection connection) throws Exception {
System.out.println("Accepted new connection from " + connection.remoteAddress());
CompletionStage<ServerBinding> serverBindingFuture =
serverSource.to(Sink.foreach(connection -> {
System.out.println("Accepted new connection from " + connection.remoteAddress());
connection.handleWithSyncHandler(requestHandler, materializer);
// this is equivalent to
//connection.handleWith(Flow.of(HttpRequest.class).map(requestHandler), materializer);
}
connection.handleWithSyncHandler(requestHandler, materializer);
// this is equivalent to
//connection.handleWith(Flow.of(HttpRequest.class).map(requestHandler), materializer);
})).run(materializer);
//#full-server-example
Await.result(serverBindingFuture, new FiniteDuration(1, TimeUnit.SECONDS)); // will throw if binding fails
serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); // will throw if binding fails
System.out.println("Press ENTER to stop.");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {

View file

@ -16,6 +16,10 @@ import akka.http.javadsl.server.values.OAuth2Authenticator;
import akka.http.javadsl.server.values.OAuth2Credentials;
import akka.http.javadsl.testkit.JUnitRouteTest;
import akka.http.scaladsl.model.headers.Authorization;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import org.junit.Test;
import scala.Option;
import scala.concurrent.Future;
@ -31,7 +35,7 @@ public class OAuth2AuthenticatorExample extends JUnitRouteTest {
private final String hardcodedToken = "token";
@Override
public Future<Option<String>> authenticate(OAuth2Credentials credentials) {
public CompletionStage<Optional<String>> authenticate(OAuth2Credentials credentials) {
// this is where your actual authentication logic would go, looking up the user
// based on the token or something in that direction
if (credentials.available() && // no anonymous access

View file

@ -7,6 +7,7 @@ package docs.http.javadsl.server;
//#websocket-example-using-core
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@ -50,7 +51,7 @@ public class WebSocketCoreExample {
try {
final Materializer materializer = ActorMaterializer.create(system);
Future<ServerBinding> serverBindingFuture =
CompletionStage<ServerBinding> serverBindingFuture =
Http.get(system).bindAndHandleSync(
new Function<HttpRequest, HttpResponse>() {
public HttpResponse apply(HttpRequest request) throws Exception {
@ -59,7 +60,7 @@ public class WebSocketCoreExample {
}, "localhost", 8080, materializer);
// will throw if binding fails
Await.result(serverBindingFuture, new FiniteDuration(1, TimeUnit.SECONDS));
serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS);
System.out.println("Press ENTER to stop.");
new BufferedReader(new InputStreamReader(System.in)).readLine();
} finally {

View file

@ -4,7 +4,7 @@
package docs.persistence;
import static akka.pattern.Patterns.ask;
import static akka.pattern.PatternsCS.ask;
import java.util.HashSet;
import java.util.Set;
import java.util.Iterator;
@ -42,6 +42,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class PersistenceQueryDocTest {
@ -343,7 +344,7 @@ public class PersistenceQueryDocTest {
//#projection-into-different-store-simple-classes
class ExampleStore {
Future<Void> save(Object any) {
CompletionStage<Void> save(Object any) {
// ...
//#projection-into-different-store-simple-classes
return null;
@ -379,13 +380,13 @@ public class PersistenceQueryDocTest {
this.name = name;
}
public Future<Long> saveProgress(long offset) {
public CompletionStage<Long> saveProgress(long offset) {
// ...
//#projection-into-different-store
return null;
//#projection-into-different-store
}
public Future<Long> latestOffset() {
public CompletionStage<Long> latestOffset() {
// ...
//#projection-into-different-store
return null;
@ -412,17 +413,13 @@ public class PersistenceQueryDocTest {
final Props writerProps = Props.create(TheOneWhoWritesToQueryJournal.class, "bid");
final ActorRef writer = system.actorOf(writerProps, "bid-projection-writer");
long startFromOffset = Await.result(bidProjection.latestOffset(), timeout.duration());
long startFromOffset = bidProjection.latestOffset().toCompletableFuture().get(3, TimeUnit.SECONDS);
readJournal
.eventsByTag("bid", startFromOffset)
.<Long>mapAsync(8, envelope -> {
final Future<Object> f = ask(writer, envelope.event(), timeout);
return f.<Long>map(new Mapper<Object, Long>() {
@Override public Long apply(Object in) {
return envelope.offset();
}
}, system.dispatcher());
.mapAsync(8, envelope -> {
final CompletionStage<Object> f = ask(writer, envelope.event(), timeout);
return f.thenApplyAsync(in -> envelope.offset(), system.dispatcher());
})
.mapAsync(1, offset -> bidProjection.saveProgress(offset))
.runWith(Sink.ignore(), mat);

View file

@ -6,6 +6,7 @@ package docs.stream;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@ -221,16 +222,15 @@ public class BidiFlowDocTest {
);
final Flow<Message, Message, NotUsed> flow =
stack.atop(stack.reversed()).join(pingpong);
final Future<List<Message>> result = Source
final CompletionStage<List<Message>> result = Source
.from(Arrays.asList(0, 1, 2))
.<Message> map(id -> new Ping(id))
.via(flow)
.grouped(10)
.runWith(Sink.<List<Message>> head(), mat);
final FiniteDuration oneSec = Duration.create(1, TimeUnit.SECONDS);
assertArrayEquals(
new Message[] { new Pong(0), new Pong(1), new Pong(2) },
Await.result(result, oneSec).toArray(new Message[0]));
result.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray(new Message[0]));
//#compose
}
}

View file

@ -5,6 +5,8 @@ package docs.stream;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import akka.NotUsed;
import akka.stream.ClosedShape;
@ -215,27 +217,23 @@ public class CompositionDocTest {
//#mat-combine-4a
static class MyClass {
private Promise<Optional<Integer>> p;
private CompletableFuture<Optional<Integer>> p;
private OutgoingConnection conn;
public MyClass(Promise<Optional<Integer>> p, OutgoingConnection conn) {
public MyClass(CompletableFuture<Optional<Integer>> p, OutgoingConnection conn) {
this.p = p;
this.conn = conn;
}
public void close() {
p.success(Optional.empty());
p.complete(Optional.empty());
}
}
static class Combiner {
static Future<MyClass> f(Promise<Optional<Integer>> p,
Pair<Future<OutgoingConnection>, Future<String>> rest) {
return rest.first().map(new Mapper<OutgoingConnection, MyClass>() {
public MyClass apply(OutgoingConnection c) {
return new MyClass(p, c);
}
}, system.dispatcher());
static CompletionStage<MyClass> f(CompletableFuture<Optional<Integer>> p,
Pair<CompletionStage<OutgoingConnection>, CompletionStage<String>> rest) {
return rest.first().thenApply(c -> new MyClass(p, c));
}
}
//#mat-combine-4a
@ -244,13 +242,13 @@ public class CompositionDocTest {
public void materializedValues() throws Exception {
//#mat-combine-1
// Materializes to Promise<BoxedUnit> (red)
final Source<Integer, Promise<Optional<Integer>>> source = Source.<Integer>maybe();
final Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe();
// Materializes to BoxedUnit (black)
final Flow<Integer, Integer, NotUsed> flow1 = Flow.of(Integer.class).take(100);
// Materializes to Promise<Option<>> (red)
final Source<Integer, Promise<Optional<Integer>>> nestedSource =
final Source<Integer, CompletableFuture<Optional<Integer>>> nestedSource =
source.viaMat(flow1, Keep.left()).named("nestedSource");
//#mat-combine-1
@ -260,27 +258,27 @@ public class CompositionDocTest {
.map(i -> ByteString.fromString(i.toString()));
// Materializes to Future<OutgoingConnection> (yellow)
final Flow<ByteString, ByteString, Future<OutgoingConnection>> flow3 =
final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> flow3 =
Tcp.get(system).outgoingConnection("localhost", 8080);
// Materializes to Future<OutgoingConnection> (yellow)
final Flow<Integer, ByteString, Future<OutgoingConnection>> nestedFlow =
final Flow<Integer, ByteString, CompletionStage<OutgoingConnection>> nestedFlow =
flow2.viaMat(flow3, Keep.right()).named("nestedFlow");
//#mat-combine-2
//#mat-combine-3
// Materializes to Future<String> (green)
final Sink<ByteString, Future<String>> sink = Sink
.fold("", (acc, i) -> acc + i.utf8String());
final Sink<ByteString, CompletionStage<String>> sink =
Sink.<String, ByteString> fold("", (acc, i) -> acc + i.utf8String());
// Materializes to Pair<Future<OutgoingConnection>, Future<String>> (blue)
final Sink<Integer, Pair<Future<OutgoingConnection>, Future<String>>> nestedSink =
final Sink<Integer, Pair<CompletionStage<OutgoingConnection>, CompletionStage<String>>> nestedSink =
nestedFlow.toMat(sink, Keep.both());
//#mat-combine-3
//#mat-combine-4b
// Materializes to Future<MyClass> (purple)
final RunnableGraph<Future<MyClass>> runnableGraph =
final RunnableGraph<CompletionStage<MyClass>> runnableGraph =
nestedSource.toMat(nestedSink, Combiner::f);
//#mat-combine-4b
}

View file

@ -6,6 +6,8 @@ package docs.stream;
import static org.junit.Assert.assertEquals;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@ -55,15 +57,12 @@ public class FlowDocTest {
// returns new Source<Integer>, with `map()` appended
final Source<Integer, NotUsed> zeroes = source.map(x -> 0);
final Sink<Integer, Future<Integer>> fold =
Sink.fold(0, (agg, next) -> agg + next);
final Sink<Integer, CompletionStage<Integer>> fold =
Sink.<Integer, Integer> fold(0, (agg, next) -> agg + next);
zeroes.runWith(fold, mat); // 0
//#source-immutable
int result = Await.result(
zeroes.runWith(fold, mat),
Duration.create(3, TimeUnit.SECONDS)
);
int result = zeroes.runWith(fold, mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(0, result);
}
@ -73,18 +72,18 @@ public class FlowDocTest {
final Source<Integer, NotUsed> source =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
// note that the Future is scala.concurrent.Future
final Sink<Integer, Future<Integer>> sink =
Sink.fold(0, (aggr, next) -> aggr + next);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next);
// connect the Source to the Sink, obtaining a RunnableFlow
final RunnableGraph<Future<Integer>> runnable =
final RunnableGraph<CompletionStage<Integer>> runnable =
source.toMat(sink, Keep.right());
// materialize the flow
final Future<Integer> sum = runnable.run(mat);
final CompletionStage<Integer> sum = runnable.run(mat);
//#materialization-in-steps
int result = Await.result(sum, Duration.create(3, TimeUnit.SECONDS));
int result = sum.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result);
}
@ -93,14 +92,14 @@ public class FlowDocTest {
//#materialization-runWith
final Source<Integer, NotUsed> source =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
final Sink<Integer, Future<Integer>> sink =
Sink.fold(0, (aggr, next) -> aggr + next);
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next);
// materialize the flow, getting the Sinks materialized value
final Future<Integer> sum = source.runWith(sink, mat);
final CompletionStage<Integer> sum = source.runWith(sink, mat);
//#materialization-runWith
int result = Await.result(sum, Duration.create(3, TimeUnit.SECONDS));
int result = sum.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result);
}
@ -108,21 +107,21 @@ public class FlowDocTest {
public void materializedMapUnique() throws Exception {
//#stream-reuse
// connect the Source to the Sink, obtaining a RunnableGraph
final Sink<Integer, Future<Integer>> sink =
Sink.fold(0, (aggr, next) -> aggr + next);
final RunnableGraph<Future<Integer>> runnable =
final Sink<Integer, CompletionStage<Integer>> sink =
Sink.<Integer, Integer> fold(0, (aggr, next) -> aggr + next);
final RunnableGraph<CompletionStage<Integer>> runnable =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).toMat(sink, Keep.right());
// get the materialized value of the FoldSink
final Future<Integer> sum1 = runnable.run(mat);
final Future<Integer> sum2 = runnable.run(mat);
final CompletionStage<Integer> sum1 = runnable.run(mat);
final CompletionStage<Integer> sum2 = runnable.run(mat);
// sum1 and sum2 are different Futures!
//#stream-reuse
int result1 = Await.result(sum1, Duration.create(3, TimeUnit.SECONDS));
int result1 = sum1.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result1);
int result2 = Await.result(sum2, Duration.create(3, TimeUnit.SECONDS));
int result2 = sum2.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(55, result2);
}
@ -222,40 +221,40 @@ public class FlowDocTest {
//#flow-mat-combine
// An empty source that can be shut down explicitly from the outside
Source<Integer, Promise<Optional<Integer>>> source = Source.<Integer>maybe();
Source<Integer, CompletableFuture<Optional<Integer>>> source = Source.<Integer>maybe();
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
Flow<Integer, Integer, Cancellable> flow = throttler;
// A sink that returns the first element of a stream in the returned Future
Sink<Integer, Future<Integer>> sink = Sink.head();
Sink<Integer, CompletionStage<Integer>> sink = Sink.head();
// By default, the materialized value of the leftmost stage is preserved
RunnableGraph<Promise<Optional<Integer>>> r1 = source.via(flow).to(sink);
RunnableGraph<CompletableFuture<Optional<Integer>>> r1 = source.via(flow).to(sink);
// Simple selection of materialized values by using Keep.right
RunnableGraph<Cancellable> r2 = source.viaMat(flow, Keep.right()).to(sink);
RunnableGraph<Future<Integer>> r3 = source.via(flow).toMat(sink, Keep.right());
RunnableGraph<CompletionStage<Integer>> r3 = source.via(flow).toMat(sink, Keep.right());
// Using runWith will always give the materialized values of the stages added
// by runWith() itself
Future<Integer> r4 = source.via(flow).runWith(sink, mat);
Promise<Optional<Integer>> r5 = flow.to(sink).runWith(source, mat);
Pair<Promise<Optional<Integer>>, Future<Integer>> r6 = flow.runWith(source, sink, mat);
CompletionStage<Integer> r4 = source.via(flow).runWith(sink, mat);
CompletableFuture<Optional<Integer>> r5 = flow.to(sink).runWith(source, mat);
Pair<CompletableFuture<Optional<Integer>>, CompletionStage<Integer>> r6 = flow.runWith(source, sink, mat);
// Using more complext combinations
RunnableGraph<Pair<Promise<Optional<Integer>>, Cancellable>> r7 =
RunnableGraph<Pair<CompletableFuture<Optional<Integer>>, Cancellable>> r7 =
source.viaMat(flow, Keep.both()).to(sink);
RunnableGraph<Pair<Promise<Optional<Integer>>, Future<Integer>>> r8 =
RunnableGraph<Pair<CompletableFuture<Optional<Integer>>, CompletionStage<Integer>>> r8 =
source.via(flow).toMat(sink, Keep.both());
RunnableGraph<Pair<Pair<Promise<Optional<Integer>>, Cancellable>, Future<Integer>>> r9 =
RunnableGraph<Pair<Pair<CompletableFuture<Optional<Integer>>, Cancellable>, CompletionStage<Integer>>> r9 =
source.viaMat(flow, Keep.both()).toMat(sink, Keep.both());
RunnableGraph<Pair<Cancellable, Future<Integer>>> r10 =
RunnableGraph<Pair<Cancellable, CompletionStage<Integer>>> r10 =
source.viaMat(flow, Keep.right()).toMat(sink, Keep.both());
// It is also possible to map over the materialized values. In r9 we had a
@ -264,9 +263,9 @@ public class FlowDocTest {
RunnableGraph<Cancellable> r11 =
r9.mapMaterializedValue( (nestedTuple) -> {
Promise<Optional<Integer>> p = nestedTuple.first().first();
CompletableFuture<Optional<Integer>> p = nestedTuple.first().first();
Cancellable c = nestedTuple.first().second();
Future<Integer> f = nestedTuple.second();
CompletionStage<Integer> f = nestedTuple.second();
// Picking the Cancellable, but we could also construct a domain class here
return c;

View file

@ -6,6 +6,7 @@ package docs.stream;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@ -49,14 +50,14 @@ public class FlowErrorDocTest {
final Materializer mat = ActorMaterializer.create(system);
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
.map(elem -> 100 / elem);
final Sink<Integer, Future<Integer>> fold =
Sink.fold(0, (acc, elem) -> acc + elem);
final Future<Integer> result = source.runWith(fold, mat);
final Sink<Integer, CompletionStage<Integer>> fold =
Sink.<Integer, Integer> fold(0, (acc, elem) -> acc + elem);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)
//#stop
Await.result(result, Duration.create(3, TimeUnit.SECONDS));
result.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@ -73,14 +74,14 @@ public class FlowErrorDocTest {
system);
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
.map(elem -> 100 / elem);
final Sink<Integer, Future<Integer>> fold =
final Sink<Integer, CompletionStage<Integer>> fold =
Sink.fold(0, (acc, elem) -> acc + elem);
final Future<Integer> result = source.runWith(fold, mat);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
//#resume
assertEquals(Integer.valueOf(228), Await.result(result, Duration.create(3, TimeUnit.SECONDS)));
assertEquals(Integer.valueOf(228), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
@ -98,14 +99,14 @@ public class FlowErrorDocTest {
.withAttributes(ActorAttributes.withSupervisionStrategy(decider));
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(0, 1, 2, 3, 4, 5))
.via(flow);
final Sink<Integer, Future<Integer>> fold =
Sink.fold(0, (acc, elem) -> acc + elem);
final Future<Integer> result = source.runWith(fold, mat);
final Sink<Integer, CompletionStage<Integer>> fold =
Sink.<Integer, Integer> fold(0, (acc, elem) -> acc + elem);
final CompletionStage<Integer> result = source.runWith(fold, mat);
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)
//#resume-section
assertEquals(Integer.valueOf(150), Await.result(result, Duration.create(3, TimeUnit.SECONDS)));
assertEquals(Integer.valueOf(150), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
@ -126,7 +127,7 @@ public class FlowErrorDocTest {
.withAttributes(ActorAttributes.withSupervisionStrategy(decider));
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 3, -1, 5, 7))
.via(flow);
final Future<List<Integer>> result = source.grouped(1000)
final CompletionStage<List<Integer>> result = source.grouped(1000)
.runWith(Sink.<List<Integer>>head(), mat);
// the negative element cause the scan stage to be restarted,
// i.e. start from 0 again
@ -135,7 +136,7 @@ public class FlowErrorDocTest {
assertEquals(
Arrays.asList(0, 1, 4, 0, 5, 12),
Await.result(result, Duration.create(3, TimeUnit.SECONDS)));
result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
}

View file

@ -7,6 +7,7 @@ import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@ -46,15 +47,15 @@ public class FlowGraphDocTest {
public void demonstrateBuildSimpleGraph() throws Exception {
//#simple-flow-graph
final Source<Integer, NotUsed> in = Source.from(Arrays.asList(1, 2, 3, 4, 5));
final Sink<List<String>, Future<List<String>>> sink = Sink.head();
final Sink<List<Integer>, Future<List<Integer>>> sink2 = Sink.head();
final Sink<List<String>, CompletionStage<List<String>>> sink = Sink.head();
final Sink<List<Integer>, CompletionStage<List<Integer>>> sink2 = Sink.head();
final Flow<Integer, Integer, NotUsed> f1 = Flow.of(Integer.class).map(elem -> elem + 10);
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).map(elem -> elem + 20);
final Flow<Integer, String, NotUsed> f3 = Flow.of(Integer.class).map(elem -> elem.toString());
final Flow<Integer, Integer, NotUsed> f4 = Flow.of(Integer.class).map(elem -> elem + 30);
final RunnableGraph<Future<List<String>>> result =
RunnableGraph.<Future<List<String>>>fromGraph(
final RunnableGraph<CompletionStage<List<String>>> result =
RunnableGraph.<CompletionStage<List<String>>>fromGraph(
GraphDSL
.create(
sink,
@ -70,7 +71,7 @@ public class FlowGraphDocTest {
return ClosedShape.getInstance();
}));
//#simple-flow-graph
final List<String> list = Await.result(result.run(mat), Duration.create(3, TimeUnit.SECONDS));
final List<String> list = result.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
final String[] res = list.toArray(new String[] {});
Arrays.sort(res, null);
assertArrayEquals(new String[] { "31", "32", "33", "34", "35", "41", "42", "43", "44", "45" }, res);
@ -105,12 +106,12 @@ public class FlowGraphDocTest {
@Test
public void demonstrateReusingFlowInGraph() throws Exception {
//#flow-graph-reusing-a-flow
final Sink<Integer, Future<Integer>> topHeadSink = Sink.head();
final Sink<Integer, Future<Integer>> bottomHeadSink = Sink.head();
final Sink<Integer, CompletionStage<Integer>> topHeadSink = Sink.head();
final Sink<Integer, CompletionStage<Integer>> bottomHeadSink = Sink.head();
final Flow<Integer, Integer, NotUsed> sharedDoubler = Flow.of(Integer.class).map(elem -> elem * 2);
final RunnableGraph<Pair<Future<Integer>, Future<Integer>>> g =
RunnableGraph.<Pair<Future<Integer>, Future<Integer>>>fromGraph(
final RunnableGraph<Pair<CompletionStage<Integer>, CompletionStage<Integer>>> g =
RunnableGraph.<Pair<CompletionStage<Integer>, CompletionStage<Integer>>>fromGraph(
GraphDSL.create(
topHeadSink, // import this sink into the graph
bottomHeadSink, // and this as well
@ -127,24 +128,22 @@ public class FlowGraphDocTest {
)
);
//#flow-graph-reusing-a-flow
final Pair<Future<Integer>, Future<Integer>> pair = g.run(mat);
assertEquals(Integer.valueOf(2), Await.result(pair.first(), Duration.create(3, TimeUnit.SECONDS)));
assertEquals(Integer.valueOf(2), Await.result(pair.second(), Duration.create(3, TimeUnit.SECONDS)));
final Pair<CompletionStage<Integer>, CompletionStage<Integer>> pair = g.run(mat);
assertEquals(Integer.valueOf(2), pair.first().toCompletableFuture().get(3, TimeUnit.SECONDS));
assertEquals(Integer.valueOf(2), pair.second().toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
public void demonstrateMatValue() throws Exception {
//#flow-graph-matvalue
final Sink<Integer, Future<Integer>> foldSink = Sink.<Integer, Integer> fold(0, (a, b) -> {
final Sink<Integer, CompletionStage<Integer>> foldSink = Sink.<Integer, Integer> fold(0, (a, b) -> {
return a + b;
});
final Flow<Future<Integer>, Integer, NotUsed> flatten = Flow.<Future<Integer>>create()
.mapAsync(4, x -> {
return x;
});
final Flow<CompletionStage<Integer>, Integer, NotUsed> flatten =
Flow.<CompletionStage<Integer>>create().mapAsync(4, x -> x);
final Flow<Integer, Integer, Future<Integer>> foldingFlow = Flow.fromGraph(
final Flow<Integer, Integer, CompletionStage<Integer>> foldingFlow = Flow.fromGraph(
GraphDSL.create(foldSink,
(b, fold) -> {
return FlowShape.of(
@ -155,7 +154,7 @@ public class FlowGraphDocTest {
//#flow-graph-matvalue-cycle
// This cannot produce any value:
final Source<Integer, Future<Integer>> cyclicSource = Source.fromGraph(
final Source<Integer, CompletionStage<Integer>> cyclicSource = Source.fromGraph(
GraphDSL.create(foldSink,
(b, fold) -> {
// - Fold cannot complete until its upstream mapAsync completes

View file

@ -9,6 +9,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
@ -154,11 +155,11 @@ public class FlowStagesDocTest {
@Test
public void demonstrateVariousPushPullStages() throws Exception {
final Sink<Integer, Future<List<Integer>>> sink =
final Sink<Integer, CompletionStage<List<Integer>>> sink =
Flow.of(Integer.class).grouped(10).toMat(Sink.head(), Keep.right());
//#stage-chain
final RunnableGraph<Future<List<Integer>>> runnable =
final RunnableGraph<CompletionStage<List<Integer>>> runnable =
Source
.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.transform(() -> new Filter<Integer>(elem -> elem % 2 == 0))
@ -168,7 +169,7 @@ public class FlowStagesDocTest {
//#stage-chain
assertEquals(Arrays.asList(1, 1, 2, 2, 3, 3, 4, 4, 5, 5),
Await.result(runnable.run(mat), FiniteDuration.create(3, TimeUnit.SECONDS)));
runnable.run(mat).toCompletableFuture().get(3, TimeUnit.SECONDS));
}
//#detached

View file

@ -31,6 +31,9 @@ import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -101,14 +104,14 @@ public class GraphStageDocTest {
Source<Integer, NotUsed> mySource = Source.fromGraph(sourceGraph);
// Returns 55
Future<Integer> result1 = mySource.take(10).runFold(0, (sum, next) -> sum + next, mat);
CompletionStage<Integer> result1 = mySource.take(10).runFold(0, (sum, next) -> sum + next, mat);
// The source is reusable. This returns 5050
Future<Integer> result2 = mySource.take(100).runFold(0, (sum, next) -> sum + next, mat);
CompletionStage<Integer> result2 = mySource.take(100).runFold(0, (sum, next) -> sum + next, mat);
//#simple-source-usage
assertEquals(Await.result(result1, Duration.create(3, "seconds")), (Integer) 55);
assertEquals(Await.result(result2, Duration.create(3, "seconds")), (Integer) 5050);
assertEquals(result1.toCompletableFuture().get(3, TimeUnit.SECONDS), (Integer) 55);
assertEquals(result2.toCompletableFuture().get(3, TimeUnit.SECONDS), (Integer) 5050);
}
@ -169,12 +172,12 @@ public class GraphStageDocTest {
}
}));
Future<Integer> result =
CompletionStage<Integer> result =
Source.from(Arrays.asList("one", "two", "three"))
.via(stringLength)
.runFold(0, (sum, n) -> sum + n, mat);
assertEquals(new Integer(11), Await.result(result, Duration.create(3, "seconds")));
assertEquals(new Integer(11), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
//#many-to-one
@ -231,12 +234,12 @@ public class GraphStageDocTest {
Graph<FlowShape<Integer, Integer>, NotUsed> evenFilter =
Flow.fromGraph(new Filter<Integer>(n -> n % 2 == 0));
Future<Integer> result =
CompletionStage<Integer> result =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
.via(evenFilter)
.runFold(0, (elem, sum) -> sum + elem, mat);
assertEquals(new Integer(12), Await.result(result, Duration.create(3, "seconds")));
assertEquals(new Integer(12), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
//#one-to-many
@ -300,12 +303,12 @@ public class GraphStageDocTest {
Graph<FlowShape<Integer, Integer>, NotUsed> duplicator =
Flow.fromGraph(new Duplicator<Integer>());
Future<Integer> result =
CompletionStage<Integer> result =
Source.from(Arrays.asList(1, 2, 3))
.via(duplicator)
.runFold(0, (n, sum) -> n + sum, mat);
assertEquals(new Integer(12), Await.result(result, Duration.create(3, "seconds")));
assertEquals(new Integer(12), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@ -357,20 +360,20 @@ public class GraphStageDocTest {
Graph<FlowShape<Integer, Integer>, NotUsed> duplicator =
Flow.fromGraph(new Duplicator2<Integer>());
Future<Integer> result =
CompletionStage<Integer> result =
Source.from(Arrays.asList(1, 2, 3))
.via(duplicator)
.runFold(0, (n, sum) -> n + sum, mat);
assertEquals(new Integer(12), Await.result(result, Duration.create(3, "seconds")));
assertEquals(new Integer(12), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
public void demonstrateChainingOfGraphStages() throws Exception {
Graph<SinkShape<Integer>, Future<String>> sink = Sink.fold("", (acc, n) -> acc + n.toString());
Graph<SinkShape<Integer>, CompletionStage<String>> sink = Sink.fold("", (acc, n) -> acc + n.toString());
//#graph-stage-chain
Future<String> resultFuture = Source.from(Arrays.asList(1,2,3,4,5))
CompletionStage<String> resultFuture = Source.from(Arrays.asList(1,2,3,4,5))
.via(new Filter<Integer>((n) -> n % 2 == 0))
.via(new Duplicator<Integer>())
.via(new Map<Integer, Integer>((n) -> n / 2))
@ -378,7 +381,7 @@ public class GraphStageDocTest {
//#graph-stage-chain
assertEquals("1122", Await.result(resultFuture, Duration.create(3, "seconds")));
assertEquals("1122", resultFuture.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@ -386,9 +389,9 @@ public class GraphStageDocTest {
// will close upstream when the future completes
public class KillSwitch<A> extends GraphStage<FlowShape<A, A>> {
private final Future<Done> switchF;
private final CompletionStage<Done> switchF;
public KillSwitch(Future<Done> switchF) {
public KillSwitch(CompletionStage<Done> switchF) {
this.switchF = switchF;
}
@ -430,12 +433,7 @@ public class GraphStageDocTest {
});
ExecutionContext ec = system.dispatcher();
switchF.onSuccess(new OnSuccess<Done>() {
@Override
public void onSuccess(Done result) throws Throwable {
callback.invoke(Done.getInstance());
}
}, ec);
switchF.thenAccept(callback::invoke);
}
};
}
@ -446,29 +444,23 @@ public class GraphStageDocTest {
public void demonstrateAnAsynchronousSideChannel() throws Exception{
// tests:
Promise<Done> switchF = Futures.promise();
CompletableFuture<Done> switchF = new CompletableFuture<>();
Graph<FlowShape<Integer, Integer>, NotUsed> killSwitch =
Flow.fromGraph(new KillSwitch<>(switchF.future()));
Flow.fromGraph(new KillSwitch<>(switchF));
ExecutionContext ec = system.dispatcher();
// TODO this is probably racey, is there a way to make sure it happens after?
Future<Integer> valueAfterKill = switchF.future().flatMap(new Mapper<Done, Future<Integer>>() {
@Override
public Future<Integer> apply(Done parameter) {
return Futures.successful(4);
}
}, ec);
CompletionStage<Integer> valueAfterKill = switchF.thenApply(in -> 4);
Future<Integer> result =
Source.from(Arrays.asList(1, 2, 3)).concat(Source.fromFuture(valueAfterKill))
CompletionStage<Integer> result =
Source.from(Arrays.asList(1, 2, 3)).concat(Source.fromCompletionStage(valueAfterKill))
.via(killSwitch)
.runFold(0, (n, sum) -> n + sum, mat);
switchF.success(Done.getInstance());
switchF.complete(Done.getInstance());
assertEquals(new Integer(6), Await.result(result, Duration.create(3, "seconds")));
assertEquals(new Integer(6), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@ -531,18 +523,18 @@ public class GraphStageDocTest {
public void demonstrateAGraphStageWithATimer() throws Exception {
// tests:
Future<Integer> result =
CompletionStage<Integer> result =
Source.from(Arrays.asList(1, 2, 3))
.via(new TimedGate<>(Duration.create(2, "seconds")))
.takeWithin(Duration.create(250, "millis"))
.runFold(0, (n, sum) -> n + sum, mat);
assertEquals(new Integer(1), Await.result(result, Duration.create(3, "seconds")));
assertEquals(new Integer(1), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
//#materialized
public class FirstValue<A> extends GraphStageWithMaterializedValue<FlowShape<A, A>, Future<A>> {
public class FirstValue<A> extends GraphStageWithMaterializedValue<FlowShape<A, A>, CompletionStage<A>> {
public final Inlet<A> in = Inlet.create("FirstValue.in");
public final Outlet<A> out = Outlet.create("FirstValue.out");
@ -554,7 +546,7 @@ public class GraphStageDocTest {
}
@Override
public Tuple2<GraphStageLogic, Future<A>> createLogicAndMaterializedValue(Attributes inheritedAttributes) {
public Tuple2<GraphStageLogic, CompletionStage<A>> createLogicAndMaterializedValue(Attributes inheritedAttributes) {
Promise<A> promise = Futures.promise();
GraphStageLogic logic = new GraphStageLogic(shape) {
@ -592,13 +584,13 @@ public class GraphStageDocTest {
public void demonstrateACustomMaterializedValue() throws Exception {
// tests:
RunnableGraph<Future<Integer>> flow = Source.from(Arrays.asList(1, 2, 3))
RunnableGraph<CompletionStage<Integer>> flow = Source.from(Arrays.asList(1, 2, 3))
.viaMat(new FirstValue(), Keep.right())
.to(Sink.ignore());
Future<Integer> result = flow.run(mat);
CompletionStage<Integer> result = flow.run(mat);
assertEquals(new Integer(1), Await.result(result, Duration.create(3, "seconds")));
assertEquals(new Integer(1), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@ -685,11 +677,11 @@ public class GraphStageDocTest {
public void demonstrateADetachedGraphStage() throws Exception {
// tests:
Future<Integer> result1 = Source.from(Arrays.asList(1, 2, 3))
CompletionStage<Integer> result1 = Source.from(Arrays.asList(1, 2, 3))
.via(new TwoBuffer<>())
.runFold(0, (acc, n) -> acc + n, mat);
assertEquals(new Integer(6), Await.result(result1, Duration.create(3, "seconds")));
assertEquals(new Integer(6), result1.toCompletableFuture().get(3, TimeUnit.SECONDS));
TestSubscriber.ManualProbe<Integer> subscriber = TestSubscriber.manualProbe(system);
TestPublisher.Probe<Integer> publisher = TestPublisher.probe(0, system);

View file

@ -6,8 +6,6 @@ package docs.stream;
import akka.NotUsed;
import akka.actor.*;
import akka.dispatch.Futures;
import akka.dispatch.MessageDispatcher;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.*;
import akka.stream.javadsl.*;
@ -21,14 +19,15 @@ import docs.stream.TwitterStreamQuickstartDocTest.Model.Tweet;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import static akka.pattern.Patterns.ask;
import static akka.pattern.PatternsCS.ask;
import static docs.stream.TwitterStreamQuickstartDocTest.Model.AKKA;
import static docs.stream.TwitterStreamQuickstartDocTest.Model.tweets;
import static junit.framework.TestCase.assertTrue;
@ -64,26 +63,26 @@ public class IntegrationDocTest {
class AddressSystem {
//#email-address-lookup
public Future<Optional<String>> lookupEmail(String handle)
public CompletionStage<Optional<String>> lookupEmail(String handle)
//#email-address-lookup
{
return Futures.successful(Optional.of(handle + "@somewhere.com"));
return CompletableFuture.completedFuture(Optional.of(handle + "@somewhere.com"));
}
//#phone-lookup
public Future<Optional<String>> lookupPhoneNumber(String handle)
public CompletionStage<Optional<String>> lookupPhoneNumber(String handle)
//#phone-lookup
{
return Futures.successful(Optional.of("" + handle.hashCode()));
return CompletableFuture.completedFuture(Optional.of("" + handle.hashCode()));
}
}
class AddressSystem2 {
//#email-address-lookup2
public Future<String> lookupEmail(String handle)
public CompletionStage<String> lookupEmail(String handle)
//#email-address-lookup2
{
return Futures.successful(handle + "@somewhere.com");
return CompletableFuture.completedFuture(handle + "@somewhere.com");
}
}
@ -177,11 +176,11 @@ public class IntegrationDocTest {
}
//#email-server-send
public Future<Email> send(Email email) {
public CompletionStage<Email> send(Email email) {
// ...
//#email-server-send
probe.tell(email.to, ActorRef.noSender());
return Futures.successful(email);
return CompletableFuture.completedFuture(email);
//#email-server-send
}
//#email-server-send
@ -258,21 +257,21 @@ public class IntegrationDocTest {
//#sometimes-slow-service
static class SometimesSlowService {
private final ExecutionContext ec;
public SometimesSlowService(ExecutionContext ec) {
private final Executor ec;
public SometimesSlowService(Executor ec) {
this.ec = ec;
}
private final AtomicInteger runningCount = new AtomicInteger();
public Future<String> convert(String s) {
public CompletionStage<String> convert(String s) {
System.out.println("running: " + s + "(" + runningCount.incrementAndGet() + ")");
return Futures.future(() -> {
return CompletableFuture.supplyAsync(() -> {
if (!s.isEmpty() && Character.isLowerCase(s.charAt(0)))
Thread.sleep(500);
try { Thread.sleep(500); } catch (InterruptedException e) {}
else
Thread.sleep(20);
try { Thread.sleep(20); } catch (InterruptedException e) {}
System.out.println("completed: " + s + "(" + runningCount.decrementAndGet() + ")");
return s.toUpperCase();
}, ec);
@ -399,15 +398,12 @@ public class IntegrationDocTest {
.map(o -> o.get());
//#blocking-mapAsync
final MessageDispatcher blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final Executor blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final RunnableGraph<NotUsed> sendTextMessages =
phoneNumbers
.mapAsync(4, phoneNo ->
Futures.future(() ->
smsServer.send(new TextMessage(phoneNo, "I like your tweet")),
blockingEc)
)
.mapAsync(4, phoneNo -> CompletableFuture.supplyAsync(() ->
smsServer.send(new TextMessage(phoneNo, "I like your tweet")), blockingEc))
.to(Sink.ignore());
sendTextMessages.run(mat);
@ -518,7 +514,7 @@ public class IntegrationDocTest {
{
//#sometimes-slow-mapAsync
final MessageDispatcher blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final Executor blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final SometimesSlowService service = new SometimesSlowService(blockingEc);
final ActorMaterializer mat = ActorMaterializer.create(
@ -563,7 +559,7 @@ public class IntegrationDocTest {
{
//#sometimes-slow-mapAsyncUnordered
final MessageDispatcher blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final Executor blockingEc = system.dispatchers().lookup("blocking-dispatcher");
final SometimesSlowService service = new SometimesSlowService(blockingEc);
final ActorMaterializer mat = ActorMaterializer.create(

View file

@ -7,6 +7,7 @@ package docs.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
@ -72,13 +73,12 @@ public class RateTransformationDocTest {
});
//#conflate-summarize
final Future<List<Tuple3<Double, Double, Integer>>> fut = Source.repeat(0).map(i -> r.nextGaussian())
final CompletionStage<List<Tuple3<Double, Double, Integer>>> fut = Source.repeat(0).map(i -> r.nextGaussian())
.via(statsFlow)
.grouped(10)
.runWith(Sink.head(), mat);
final Duration timeout = Duration.create(100, TimeUnit.MILLISECONDS);
Await.result(fut, timeout);
fut.toCompletableFuture().get(1, TimeUnit.SECONDS);
}
@Test
@ -97,12 +97,11 @@ public class RateTransformationDocTest {
.mapConcat(d -> d);
//#conflate-sample
final Future<Double> fut = Source.from(new ArrayList<Double>(Collections.nCopies(1000, 1.0)))
final CompletionStage<Double> fut = Source.from(new ArrayList<Double>(Collections.nCopies(1000, 1.0)))
.via(sampleFlow)
.runWith(Sink.fold(0.0, (agg, next) -> agg + next), mat);
final Duration timeout = Duration.create(1, TimeUnit.SECONDS);
final Double count = Await.result(fut, timeout);
final Double count = fut.toCompletableFuture().get(1, TimeUnit.SECONDS);
}
@Test
@ -112,17 +111,16 @@ public class RateTransformationDocTest {
.expand(in -> Stream.iterate(in, i -> i).iterator());
//#expand-last
final Pair<TestPublisher.Probe<Double>, Future<List<Double>>> probeFut = TestSource.<Double> probe(system)
final Pair<TestPublisher.Probe<Double>, CompletionStage<List<Double>>> probeFut = TestSource.<Double> probe(system)
.via(lastFlow)
.grouped(10)
.toMat(Sink.head(), Keep.both())
.run(mat);
final TestPublisher.Probe<Double> probe = probeFut.first();
final Future<List<Double>> fut = probeFut.second();
final CompletionStage<List<Double>> fut = probeFut.second();
probe.sendNext(1.0);
final Duration timeout = Duration.create(1, TimeUnit.SECONDS);
final List<Double> expanded = Await.result(fut, timeout);
final List<Double> expanded = fut.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(expanded.size(), 10);
assertEquals(expanded.stream().mapToDouble(d -> d).sum(), 10, 0.1);
}

View file

@ -6,6 +6,7 @@ package docs.stream;
import static org.junit.Assert.assertEquals;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.Done;
@ -58,10 +59,10 @@ public class StreamPartialFlowGraphDocTest {
new Inlet[] {zip1.in0(), zip1.in1(), zip2.in1()});
});
final Sink<Integer, Future<Integer>> resultSink = Sink.<Integer>head();
final Sink<Integer, CompletionStage<Integer>> resultSink = Sink.<Integer>head();
final RunnableGraph<Future<Integer>> g =
RunnableGraph.<Future<Integer>>fromGraph(
final RunnableGraph<CompletionStage<Integer>> g =
RunnableGraph.<CompletionStage<Integer>>fromGraph(
GraphDSL.create(resultSink, (builder, sink) -> {
// import the partial flow graph explicitly
final UniformFanInShape<Integer, Integer> pm = builder.add(pickMaxOfThree);
@ -73,9 +74,9 @@ public class StreamPartialFlowGraphDocTest {
return ClosedShape.getInstance();
}));
final Future<Integer> max = g.run(mat);
final CompletionStage<Integer> max = g.run(mat);
//#simple-partial-flow-graph
assertEquals(Integer.valueOf(3), Await.result(max, Duration.create(3, TimeUnit.SECONDS)));
assertEquals(Integer.valueOf(3), max.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
//#source-from-partial-flow-graph
@ -110,10 +111,10 @@ public class StreamPartialFlowGraphDocTest {
return SourceShape.of(zip.out());
}));
final Future<Pair<Integer, Integer>> firstPair =
final CompletionStage<Pair<Integer, Integer>> firstPair =
pairs.runWith(Sink.<Pair<Integer, Integer>>head(), mat);
//#source-from-partial-flow-graph
assertEquals(new Pair<>(0, 1), Await.result(firstPair, Duration.create(3, TimeUnit.SECONDS)));
assertEquals(new Pair<>(0, 1), firstPair.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
@ -132,12 +133,12 @@ public class StreamPartialFlowGraphDocTest {
}));
//#flow-from-partial-flow-graph
final Future<Pair<Integer, String>> matSink =
final CompletionStage<Pair<Integer, String>> matSink =
//#flow-from-partial-flow-graph
Source.single(1).via(pairs).runWith(Sink.<Pair<Integer, String>>head(), mat);
//#flow-from-partial-flow-graph
assertEquals(new Pair<>(1, "1"), Await.result(matSink, Duration.create(3, TimeUnit.SECONDS)));
assertEquals(new Pair<>(1, "1"), matSink.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@ -150,12 +151,12 @@ public class StreamPartialFlowGraphDocTest {
final Source<Integer, NotUsed> sources = Source.combine(source1, source2, new ArrayList<>(),
i -> Merge.<Integer>create(i));
//#source-combine
final Future<Integer> result=
final CompletionStage<Integer> result=
//#source-combine
sources.runWith(Sink.<Integer, Integer>fold(0, (a,b) -> a + b), mat);
//#source-combine
assertEquals(Integer.valueOf(3), Await.result(result, Duration.create(3, TimeUnit.SECONDS)));
assertEquals(Integer.valueOf(3), result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
@ -165,7 +166,7 @@ public class StreamPartialFlowGraphDocTest {
//#sink-combine
Sink<Integer, NotUsed> sendRemotely = Sink.actorRef(actorRef, "Done");
Sink<Integer, Future<Done>> localProcessing = Sink.<Integer>foreach(a -> { /*do something useful*/ } );
Sink<Integer, CompletionStage<Done>> localProcessing = Sink.<Integer>foreach(a -> { /*do something useful*/ } );
Sink<Integer, NotUsed> sinks = Sink.combine(sendRemotely,localProcessing, new ArrayList<>(), a -> Broadcast.create(a));
Source.<Integer>from(Arrays.asList(new Integer[]{0, 1, 2})).runWith(sinks, mat);

View file

@ -6,6 +6,9 @@ package docs.stream;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
@ -13,7 +16,6 @@ import org.junit.*;
import static org.junit.Assert.assertEquals;
import akka.actor.*;
import akka.dispatch.Futures;
import akka.testkit.*;
import akka.japi.Pair;
import akka.stream.*;
@ -23,7 +25,6 @@ import akka.stream.testkit.javadsl.*;
import akka.testkit.TestProbe;
import scala.util.*;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
@ -48,13 +49,13 @@ public class StreamTestKitDocTest {
@Test
public void strictCollection() throws Exception {
//#strict-collection
final Sink<Integer, Future<Integer>> sinkUnderTest = Flow.of(Integer.class)
final Sink<Integer, CompletionStage<Integer>> sinkUnderTest = Flow.of(Integer.class)
.map(i -> i * 2)
.toMat(Sink.fold(0, (agg, next) -> agg + next), Keep.right());
final Future<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4))
final CompletionStage<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4))
.runWith(sinkUnderTest, mat);
final Integer result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assert(result == 20);
//#strict-collection
}
@ -65,11 +66,10 @@ public class StreamTestKitDocTest {
final Source<Integer, NotUsed> sourceUnderTest = Source.repeat(1)
.map(i -> i * 2);
final Future<List<Integer>> future = sourceUnderTest
final CompletionStage<List<Integer>> future = sourceUnderTest
.grouped(10)
.runWith(Sink.head(), mat);
final List<Integer> result =
Await.result(future, Duration.create(1, TimeUnit.SECONDS));
final List<Integer> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(result, Collections.nCopies(10, 2));
//#grouped-infinite
}
@ -80,9 +80,9 @@ public class StreamTestKitDocTest {
final Flow<Integer, Integer, NotUsed> flowUnderTest = Flow.of(Integer.class)
.takeWhile(i -> i < 5);
final Future<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
final CompletionStage<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
.via(flowUnderTest).runWith(Sink.fold(0, (agg, next) -> agg + next), mat);
final Integer result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assert(result == 10);
//#folded-stream
}
@ -95,10 +95,10 @@ public class StreamTestKitDocTest {
.grouped(2);
final TestProbe probe = new TestProbe(system);
final Future<List<List<Integer>>> future = sourceUnderTest
final CompletionStage<List<List<Integer>>> future = sourceUnderTest
.grouped(2)
.runWith(Sink.head(), mat);
akka.pattern.Patterns.pipe(future, system.dispatcher()).to(probe.ref());
akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref());
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))
);
@ -129,23 +129,23 @@ public class StreamTestKitDocTest {
@Test
public void sourceActorRef() throws Exception {
//#source-actorref
final Sink<Integer, Future<String>> sinkUnderTest = Flow.of(Integer.class)
final Sink<Integer, CompletionStage<String>> sinkUnderTest = Flow.of(Integer.class)
.map(i -> i.toString())
.toMat(Sink.fold("", (agg, next) -> agg + next), Keep.right());
final Pair<ActorRef, Future<String>> refAndFuture =
final Pair<ActorRef, CompletionStage<String>> refAndCompletionStage =
Source.<Integer>actorRef(8, OverflowStrategy.fail())
.toMat(sinkUnderTest, Keep.both())
.run(mat);
final ActorRef ref = refAndFuture.first();
final Future<String> future = refAndFuture.second();
final ActorRef ref = refAndCompletionStage.first();
final CompletionStage<String> future = refAndCompletionStage.second();
ref.tell(1, ActorRef.noSender());
ref.tell(2, ActorRef.noSender());
ref.tell(3, ActorRef.noSender());
ref.tell(new akka.actor.Status.Success("done"), ActorRef.noSender());
final String result = Await.result(future, Duration.create(1, TimeUnit.SECONDS));
final String result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(result, "123");
//#source-actorref
}
@ -180,19 +180,23 @@ public class StreamTestKitDocTest {
@Test
public void injectingFailure() throws Exception {
//#injecting-failure
final Sink<Integer, Future<Integer>> sinkUnderTest = Sink.head();
final Sink<Integer, CompletionStage<Integer>> sinkUnderTest = Sink.head();
final Pair<TestPublisher.Probe<Integer>, Future<Integer>> probeAndFuture =
final Pair<TestPublisher.Probe<Integer>, CompletionStage<Integer>> probeAndCompletionStage =
TestSource.<Integer>probe(system)
.toMat(sinkUnderTest, Keep.both())
.run(mat);
final TestPublisher.Probe<Integer> probe = probeAndFuture.first();
final Future<Integer> future = probeAndFuture.second();
final TestPublisher.Probe<Integer> probe = probeAndCompletionStage.first();
final CompletionStage<Integer> future = probeAndCompletionStage.second();
probe.sendError(new Exception("boom"));
Await.ready(future, Duration.create(1, TimeUnit.SECONDS));
final Throwable exception = ((Failure)future.value().get()).exception();
assertEquals(exception.getMessage(), "boom");
try {
future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assert false;
} catch (ExecutionException ee) {
final Throwable exception = ee.getCause();
assertEquals(exception.getMessage(), "boom");
}
//#injecting-failure
}
@ -200,11 +204,11 @@ public class StreamTestKitDocTest {
public void testSourceAndTestSink() throws Exception {
//#test-source-and-sink
final Flow<Integer, Integer, NotUsed> flowUnderTest = Flow.of(Integer.class)
.mapAsyncUnordered(2, sleep -> akka.pattern.Patterns.after(
.mapAsyncUnordered(2, sleep -> akka.pattern.PatternsCS.after(
Duration.create(10, TimeUnit.MILLISECONDS),
system.scheduler(),
system.dispatcher(),
Futures.successful(sleep)
CompletableFuture.completedFuture(sleep)
));
final Pair<TestPublisher.Probe<Integer>, TestSubscriber.Probe<Integer>> pubAndSub =

View file

@ -26,6 +26,8 @@ import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@ -199,14 +201,14 @@ public class TwitterStreamQuickstartDocTest {
}
static class Example2 {
public void run(final Materializer mat) throws TimeoutException, InterruptedException {
public void run(final Materializer mat) throws TimeoutException, InterruptedException, ExecutionException {
//#backpressure-by-readline
final Future<?> completion =
final CompletionStage<Done> completion =
Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.map(i -> { System.out.println("map => " + i); return i; })
.runForeach(i -> System.console().readLine("Element = %s continue reading? [press enter]\n", i), mat);
Await.ready(completion, FiniteDuration.create(1, TimeUnit.MINUTES));
completion.toCompletableFuture().get(1, TimeUnit.SECONDS);
//#backpressure-by-readline
}
}
@ -276,8 +278,8 @@ public class TwitterStreamQuickstartDocTest {
@Test
public void demonstrateBroadcast() {
final Sink<Author, Future<Done>> writeAuthors = Sink.ignore();
final Sink<Hashtag, Future<Done>> writeHashtags = Sink.ignore();
final Sink<Author, CompletionStage<Done>> writeAuthors = Sink.ignore();
final Sink<Hashtag, CompletionStage<Done>> writeHashtags = Sink.ignore();
//#flow-graph-broadcast
RunnableGraph.fromGraph(GraphDSL.create(b -> {
@ -317,24 +319,21 @@ public class TwitterStreamQuickstartDocTest {
@Test
public void demonstrateCountOnFiniteStream() {
//#tweets-fold-count
final Sink<Integer, Future<Integer>> sumSink =
final Sink<Integer, CompletionStage<Integer>> sumSink =
Sink.<Integer, Integer>fold(0, (acc, elem) -> acc + elem);
final RunnableGraph<Future<Integer>> counter =
final RunnableGraph<CompletionStage<Integer>> counter =
tweets.map(t -> 1).toMat(sumSink, Keep.right());
final Future<Integer> sum = counter.run(mat);
final CompletionStage<Integer> sum = counter.run(mat);
sum.foreach(new Foreach<Integer>() {
public void each(Integer c) {
System.out.println("Total tweets processed: " + c);
}
}, system.dispatcher());
sum.thenAcceptAsync(c -> System.out.println("Total tweets processed: " + c),
system.dispatcher());
//#tweets-fold-count
new Object() {
//#tweets-fold-count-oneline
final Future<Integer> sum = tweets.map(t -> 1).runWith(sumSink, mat);
final CompletionStage<Integer> sum = tweets.map(t -> 1).runWith(sumSink, mat);
//#tweets-fold-count-oneline
};
}
@ -344,18 +343,18 @@ public class TwitterStreamQuickstartDocTest {
final Source<Tweet, NotUsed> tweetsInMinuteFromNow = tweets; // not really in second, just acting as if
//#tweets-runnable-flow-materialized-twice
final Sink<Integer, Future<Integer>> sumSink =
final Sink<Integer, CompletionStage<Integer>> sumSink =
Sink.<Integer, Integer>fold(0, (acc, elem) -> acc + elem);
final RunnableGraph<Future<Integer>> counterRunnableGraph =
final RunnableGraph<CompletionStage<Integer>> counterRunnableGraph =
tweetsInMinuteFromNow
.filter(t -> t.hashtags().contains(AKKA))
.map(t -> 1)
.toMat(sumSink, Keep.right());
// materialize the stream once in the morning
final Future<Integer> morningTweetsCount = counterRunnableGraph.run(mat);
final CompletionStage<Integer> morningTweetsCount = counterRunnableGraph.run(mat);
// and once in the evening, reusing the blueprint
final Future<Integer> eveningTweetsCount = counterRunnableGraph.run(mat);
final CompletionStage<Integer> eveningTweetsCount = counterRunnableGraph.run(mat);
//#tweets-runnable-flow-materialized-twice
}

View file

@ -3,10 +3,9 @@
*/
package docs.stream.io;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletionStage;
import akka.Done;
import akka.actor.ActorSystem;
@ -15,7 +14,6 @@ import akka.stream.io.IOResult;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.FileIO;
import docs.stream.SilenceSystemOut;
import docs.stream.cookbook.RecipeParseLines;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -56,10 +54,10 @@ public class StreamFileDocTest {
try {
//#file-source
Sink<ByteString, Future<Done>> printlnSink =
Sink.foreach(chunk -> System.out.println(chunk.utf8String()));
Sink<ByteString, CompletionStage<Done>> printlnSink =
Sink.<ByteString> foreach(chunk -> System.out.println(chunk.utf8String()));
Future<IOResult> ioResult =
CompletionStage<IOResult> ioResult =
FileIO.fromFile(file)
.to(printlnSink)
.run(mat);
@ -74,7 +72,7 @@ public class StreamFileDocTest {
final File file = File.createTempFile(getClass().getName(), ".tmp");
try {
Sink<ByteString, Future<IOResult>> fileSink =
Sink<ByteString, CompletionStage<IOResult>> fileSink =
//#custom-dispatcher-code
FileIO.toFile(file)
.withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"));

View file

@ -3,6 +3,7 @@
*/
package docs.stream.io;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import akka.NotUsed;
@ -60,14 +61,14 @@ public class StreamTcpDocTest {
{
//#echo-server-simple-bind
// IncomingConnection and ServerBinding imported from Tcp
final Source<IncomingConnection, Future<ServerBinding>> connections =
final Source<IncomingConnection, CompletionStage<ServerBinding>> connections =
Tcp.get(system).bind("127.0.0.1", 8889);
//#echo-server-simple-bind
}
{
final InetSocketAddress localhost = SocketUtils.temporaryServerAddress();
final Source<IncomingConnection, Future<ServerBinding>> connections =
final Source<IncomingConnection, CompletionStage<ServerBinding>> connections =
Tcp.get(system).bind(localhost.getHostName(), localhost.getPort()); // TODO getHostString in Java7
//#echo-server-simple-handle
@ -93,7 +94,7 @@ public class StreamTcpDocTest {
final TestProbe serverProbe = new TestProbe(system);
final Source<IncomingConnection,Future<ServerBinding>> connections =
final Source<IncomingConnection,CompletionStage<ServerBinding>> connections =
Tcp.get(system).bind(localhost.getHostName(), localhost.getPort()); // TODO getHostString in Java7
//#welcome-banner-chat-server
connections.runForeach(connection -> {
@ -146,14 +147,14 @@ public class StreamTcpDocTest {
{
//#repl-client
final Flow<ByteString, ByteString, Future<OutgoingConnection>> connection =
final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection =
Tcp.get(system).outgoingConnection("127.0.0.1", 8889);
//#repl-client
}
{
final Flow<ByteString, ByteString, Future<OutgoingConnection>> connection =
Tcp.get(system).outgoingConnection(localhost.getHostName(), localhost.getPort()); // TODO getHostString in Java7
final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection =
Tcp.get(system).outgoingConnection(localhost.getHostString(), localhost.getPort());
//#repl-client
final PushStage<String, ByteString> replParser = new PushStage<String, ByteString>() {

View file

@ -20,12 +20,10 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
@ -97,9 +95,9 @@ public class RecipeByteStrings extends RecipeTest {
rawBytes.transform(() -> new Chunker(CHUNK_LIMIT));
//#bytestring-chunker2
Future<List<ByteString>> chunksFuture = chunksStream.grouped(10).runWith(Sink.head(), mat);
CompletionStage<List<ByteString>> chunksFuture = chunksStream.grouped(10).runWith(Sink.head(), mat);
List<ByteString> chunks = Await.result(chunksFuture, FiniteDuration.create(3, TimeUnit.SECONDS));
List<ByteString> chunks = chunksFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
for (ByteString chunk : chunks) {
assertTrue(chunk.size() <= 2);
@ -159,9 +157,7 @@ public class RecipeByteStrings extends RecipeTest {
ByteString.fromArray(new byte[] { 4, 5, 6 }),
ByteString.fromArray(new byte[] { 7, 8, 9, 10 })));
FiniteDuration threeSeconds = FiniteDuration.create(3, TimeUnit.SECONDS);
List<ByteString> got = Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head(), mat), threeSeconds);
List<ByteString> got = bytes1.via(limiter).grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
ByteString acc = ByteString.empty();
for (ByteString b : got) {
acc = acc.concat(b);
@ -170,7 +166,7 @@ public class RecipeByteStrings extends RecipeTest {
boolean thrown = false;
try {
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head(), mat), threeSeconds);
bytes2.via(limiter).grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
} catch (IllegalStateException ex) {
thrown = true;
}
@ -194,8 +190,7 @@ public class RecipeByteStrings extends RecipeTest {
Source<ByteString, NotUsed> compacted = rawBytes.map(bs -> bs.compact());
//#compacting-bytestrings
FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS);
List<ByteString> got = Await.result(compacted.grouped(10).runWith(Sink.head(), mat), timeout);
List<ByteString> got = compacted.grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
for (ByteString byteString : got) {
assertTrue(byteString.isCompact());

View file

@ -18,8 +18,6 @@ import akka.util.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@ -89,7 +87,7 @@ public class RecipeDigest extends RecipeTest {
.transform(() -> digestCalculator("SHA-256"));
//#calculating-digest2
ByteString got = Await.result(digest.runWith(Sink.head(), mat), Duration.create(3, TimeUnit.SECONDS));
ByteString got = digest.runWith(Sink.head(), mat).toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(ByteString.fromInts(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,

View file

@ -16,6 +16,7 @@ import scala.concurrent.Future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionStage;
public class RecipeDroppyBroadcast extends RecipeTest {
static ActorSystem system;
@ -38,7 +39,7 @@ public class RecipeDroppyBroadcast extends RecipeTest {
new JavaTestKit(system) {
//#droppy-bcast
// Makes a sink drop elements if too slow
public <T> Sink<T, Future<Done>> droppySink(Sink<T, Future<Done>> sink, int size) {
public <T> Sink<T, CompletionStage<Done>> droppySink(Sink<T, CompletionStage<Done>> sink, int size) {
return Flow.<T> create()
.buffer(size, OverflowStrategy.dropHead())
.toMat(sink, Keep.right());
@ -51,9 +52,9 @@ public class RecipeDroppyBroadcast extends RecipeTest {
nums.add(i + 1);
}
final Sink<Integer, Future<Done>> mySink1 = Sink.ignore();
final Sink<Integer, Future<Done>> mySink2 = Sink.ignore();
final Sink<Integer, Future<Done>> mySink3 = Sink.ignore();
final Sink<Integer, CompletionStage<Done>> mySink1 = Sink.ignore();
final Sink<Integer, CompletionStage<Done>> mySink2 = Sink.ignore();
final Sink<Integer, CompletionStage<Done>> mySink3 = Sink.ignore();
final Source<Integer, NotUsed> myData = Source.from(nums);

View file

@ -13,8 +13,6 @@ import akka.testkit.JavaTestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.List;
@ -50,8 +48,7 @@ public class RecipeFlattenList extends RecipeTest {
Source<Message, NotUsed> flattened = myData.mapConcat(i -> i);
//#flattening-lists
List<Message> got = Await.result(flattened.grouped(10).runWith(Sink.head(), mat),
new FiniteDuration(1, TimeUnit.SECONDS));
List<Message> got = flattened.grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(got.get(0), new Message("1"));
assertEquals(got.get(1), new Message("2"));
assertEquals(got.get(2), new Message("3"));

View file

@ -7,7 +7,7 @@ import akka.NotUsed;
import akka.actor.*;
import akka.dispatch.Mapper;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.pattern.PatternsCS;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.stream.testkit.TestSubscriber;
@ -25,6 +25,7 @@ import scala.runtime.BoxedUnit;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static junit.framework.TestCase.assertTrue;
@ -150,14 +151,9 @@ public class RecipeGlobalRateLimit extends RecipeTest {
return f.mapAsync(parallelism, element -> {
final Timeout triggerTimeout = new Timeout(maxAllowedWait);
final Future<Object> limiterTriggerFuture =
Patterns.ask(limiter, Limiter.WANT_TO_PASS, triggerTimeout);
return limiterTriggerFuture.map(new Mapper<Object, T>() {
@Override
public T apply(Object parameter) {
return element;
}
}, system.dispatcher());
final CompletionStage<Object> limiterTriggerFuture =
PatternsCS.ask(limiter, Limiter.WANT_TO_PASS, triggerTimeout);
return limiterTriggerFuture.thenApplyAsync(response -> element, system.dispatcher());
});
}
//#global-limiter-flow

View file

@ -23,6 +23,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static java.util.stream.Collectors.toList;
@ -125,7 +126,7 @@ public class RecipeMultiGroupByTest extends RecipeTest {
});
//#multi-groupby
Future<List<String>> result = multiGroups
CompletionStage<List<String>> result = multiGroups
.grouped(10)
.mergeSubstreams()
.map(pair -> {
@ -135,7 +136,7 @@ public class RecipeMultiGroupByTest extends RecipeTest {
.grouped(10)
.runWith(Sink.head(), mat);
List<String> got = Await.result(result, FiniteDuration.create(3, TimeUnit.SECONDS));
List<String> got = result.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertTrue(got.contains("1[1: a, 1: b, all: c, all: d, 1: e]"));
assertTrue(got.contains("2[all: c, all: d]"));
}

View file

@ -15,8 +15,6 @@ import akka.util.ByteString;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@ -53,7 +51,7 @@ public class RecipeParseLines extends RecipeTest {
.map(b -> b.utf8String());
//#parse-lines
Await.result(lines.grouped(10).runWith(Sink.head(), mat), new FiniteDuration(1, TimeUnit.SECONDS));
lines.grouped(10).runWith(Sink.head(), mat).toCompletableFuture().get(1, TimeUnit.SECONDS);
}
}

View file

@ -25,6 +25,8 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class RecipeReduceByKeyTest extends RecipeTest {
@ -63,8 +65,8 @@ public class RecipeReduceByKeyTest extends RecipeTest {
.mergeSubstreams();
//#word-count
final Future<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);
final Set<Pair<String, Integer>> result = Await.result(f, getRemainingTime()).stream().collect(Collectors.toSet());
final CompletionStage<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);
final Set<Pair<String, Integer>> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS).stream().collect(Collectors.toSet());
final Set<Pair<String, Integer>> expected = new HashSet<>();
expected.add(new Pair<>("hello", 2));
expected.add(new Pair<>("world", 1));
@ -106,8 +108,8 @@ public class RecipeReduceByKeyTest extends RecipeTest {
(left, right) -> left + right));
//#reduce-by-key-general2
final Future<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);
final Set<Pair<String, Integer>> result = Await.result(f, getRemainingTime()).stream().collect(Collectors.toSet());
final CompletionStage<List<Pair<String, Integer>>> f = counts.grouped(10).runWith(Sink.head(), mat);
final Set<Pair<String, Integer>> result = f.toCompletableFuture().get(3, TimeUnit.SECONDS).stream().collect(Collectors.toSet());
final Set<Pair<String, Integer>> expected = new HashSet<>();
expected.add(new Pair<>("hello", 2));
expected.add(new Pair<>("world", 1));

View file

@ -19,6 +19,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class RecipeToStrict extends RecipeTest {
@ -45,11 +46,11 @@ public class RecipeToStrict extends RecipeTest {
final int MAX_ALLOWED_SIZE = 100;
//#draining-to-list
final Future<List<String>> strings = myData
final CompletionStage<List<String>> strings = myData
.grouped(MAX_ALLOWED_SIZE).runWith(Sink.head(), mat);
//#draining-to-list
Await.result(strings, new FiniteDuration(1, TimeUnit.SECONDS));
strings.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
};
}

View file

@ -17,6 +17,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertTrue;
@ -72,9 +73,8 @@ public class RecipeWorkerPool extends RecipeTest {
Source<Message, NotUsed> processedJobs = data.via(balancer);
//#worker-pool2
FiniteDuration timeout = FiniteDuration.create(200, TimeUnit.MILLISECONDS);
Future<List<String>> future = processedJobs.map(m -> m.msg).grouped(10).runWith(Sink.head(), mat);
List<String> got = Await.result(future, timeout);
CompletionStage<List<String>> future = processedJobs.map(m -> m.msg).grouped(10).runWith(Sink.head(), mat);
List<String> got = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertTrue(got.contains("1 done"));
assertTrue(got.contains("2 done"));
assertTrue(got.contains("3 done"));