Merge pull request #16047 from ktoso/wip-java-streams-ktoso

+str #15736 EARLY WIP akka-streams JavaAPIs
This commit is contained in:
Martynas Mickevičius 2014-10-10 19:28:04 +03:00
commit 31642396a1
39 changed files with 2294 additions and 1569 deletions

View file

@ -11,9 +11,10 @@ import akka.actor.ActorSystem;
import akka.dispatch.Foreach;
import akka.japi.Function;
import akka.japi.Procedure;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SubscriberDrain;
import akka.stream.scaladsl2.FlowMaterializer;
import scala.concurrent.Future;
import java.io.BufferedReader;
@ -34,18 +35,18 @@ public abstract class JavaTestServer {
ServerBinding binding = (ServerBinding) result;
System.out.println("Bound to " + binding.localAddress());
Flow.create(binding.getConnectionStream()).foreach(new Procedure<IncomingConnection>() {
Source.from(binding.getConnectionStream()).foreach(new akka.stream.javadsl.japi.Procedure<IncomingConnection>() {
@Override
public void apply(IncomingConnection conn) throws Exception {
System.out.println("New incoming connection from " + conn.remoteAddress());
Flow.create(conn.getRequestPublisher()).map(new Function<HttpRequest, HttpResponse>() {
Source.from(conn.getRequestPublisher()).map(new akka.stream.javadsl.japi.Function<HttpRequest, HttpResponse>() {
@Override
public HttpResponse apply(HttpRequest request) throws Exception {
System.out.println("Handling request to " + request.getUri());
return JavaApiTestCases.handleRequest(request);
}
}).produceTo(conn.getResponseSubscriber(), materializer);
}).runWith(SubscriberDrain.create(conn.getResponseSubscriber()), materializer);
}
}, materializer);
}