diff --git a/akka-docs/src/test/java/jdocs/stream/io/StreamTcpDocTest.java b/akka-docs/src/test/java/jdocs/stream/io/StreamTcpDocTest.java index 7f409851d9..4ab0f0d669 100644 --- a/akka-docs/src/test/java/jdocs/stream/io/StreamTcpDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/io/StreamTcpDocTest.java @@ -12,6 +12,7 @@ import jdocs.AbstractJavaTest; import jdocs.stream.SilenceSystemOut; import akka.testkit.javadsl.TestKit; import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -47,7 +48,7 @@ public class StreamTcpDocTest extends AbstractJavaTest { final SilenceSystemOut.System System = SilenceSystemOut.get(); - private final ConcurrentLinkedQueue input = new ConcurrentLinkedQueue(); + private final ConcurrentLinkedQueue input = new ConcurrentLinkedQueue<>(); { input.add("Hello world"); input.add("What a lovely day"); @@ -90,27 +91,27 @@ public class StreamTcpDocTest extends AbstractJavaTest { } @Test - public void actuallyWorkingClientServerApp() { + public void actuallyWorkingClientServerApp() throws Exception { final InetSocketAddress localhost = SocketUtil.temporaryServerAddress("127.0.0.1", false); - final TestProbe serverProbe = new TestProbe(system); final Source> connections = - Tcp.get(system).bind(localhost.getHostString(), localhost.getPort()); + Tcp.get(system).bind(localhost.getHostString(), localhost.getPort()); + final CompletionStage bindingCS = //#welcome-banner-chat-server - connections.runForeach(connection -> { - // server logic, parses incoming commands - final Flow commandParser = + connections.to(Sink.foreach((IncomingConnection connection) -> { + // server logic, parses incoming commands + final Flow commandParser = Flow.create() .takeWhile(elem -> !elem.equals("BYE")) .map(elem -> elem + "!"); - final String welcomeMsg = "Welcome to: " + connection.localAddress() + + final String welcomeMsg = "Welcome to: " + connection.localAddress() + " you are: " + connection.remoteAddress() + "!"; - final Source welcome = Source.single(welcomeMsg); - final Flow serverLogic = + final Source welcome = Source.single(welcomeMsg); + final Flow serverLogic = Flow.of(ByteString.class) .via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW)) .map(ByteString::utf8String) @@ -122,16 +123,20 @@ public class StreamTcpDocTest extends AbstractJavaTest { //#welcome-banner-chat-server .via(commandParser) .merge(welcome) - .map(s -> s + "\n") + .map(s -> s + "\n") .map(ByteString::fromString); - connection.handleWith(serverLogic, mat); - }, mat); - + connection.handleWith(serverLogic, mat); + })).run(mat); //#welcome-banner-chat-server + // make sure server is bound before we do anything else + bindingCS.toCompletableFuture().get(3, TimeUnit.SECONDS); + + { - //#repl-client + // just for docs, never actually used + //#repl-client final Flow> connection = Tcp.get(system).outgoingConnection("127.0.0.1", 8888); //#repl-client @@ -139,13 +144,13 @@ public class StreamTcpDocTest extends AbstractJavaTest { { final Flow> connection = - Tcp.get(system).outgoingConnection(localhost.getHostString(), localhost.getPort()); + Tcp.get(system).outgoingConnection(localhost.getHostString(), localhost.getPort()); //#repl-client final Flow replParser = - Flow.create() - .takeWhile(elem -> !elem.equals("q")) - .concat(Source.single("BYE")) // will run after the original flow completes - .map(elem -> ByteString.fromString(elem + "\n")); + Flow.create() + .takeWhile(elem -> !elem.equals("q")) + .concat(Source.single("BYE")) // will run after the original flow completes + .map(elem -> ByteString.fromString(elem + "\n")); final Flow repl = Flow.of(ByteString.class) .via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW)) @@ -154,10 +159,12 @@ public class StreamTcpDocTest extends AbstractJavaTest { .map(elem -> readLine("> ")) .via(replParser); - connection.join(repl).run(mat); + CompletionStage connectionCS = connection.join(repl).run(mat); //#repl-client - } + // make sure it got connected (or fails the test) + connectionCS.toCompletableFuture().get(5L, TimeUnit.SECONDS); + } serverProbe.expectMsg("Hello world"); diff --git a/akka-docs/src/test/scala/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs/src/test/scala/docs/stream/io/StreamTcpDocSpec.scala index cf48493848..e1b5304449 100644 --- a/akka-docs/src/test/scala/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/io/StreamTcpDocSpec.scala @@ -63,39 +63,43 @@ class StreamTcpDocSpec extends AkkaSpec { "initial server banner echo server" in { val localhost = SocketUtil.temporaryServerAddress() + val connections = Tcp().bind(localhost.getHostString, localhost.getPort) val serverProbe = TestProbe() import akka.stream.scaladsl.Framing + val binding = + //#welcome-banner-chat-server + connections.to(Sink.foreach { connection ⇒ + + // server logic, parses incoming commands + val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!") + + import connection._ + val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!" + val welcome = Source.single(welcomeMsg) + + val serverLogic = Flow[ByteString] + .via(Framing.delimiter( + ByteString("\n"), + maximumFrameLength = 256, + allowTruncation = true)) + .map(_.utf8String) + //#welcome-banner-chat-server + .map { command ⇒ serverProbe.ref ! command; command } + //#welcome-banner-chat-server + .via(commandParser) + // merge in the initial banner after parser + .merge(welcome) + .map(_ + "\n") + .map(ByteString(_)) + + connection.handleWith(serverLogic) + }).run() //#welcome-banner-chat-server - connections.runForeach { connection ⇒ - - // server logic, parses incoming commands - val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!") - - import connection._ - val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!" - val welcome = Source.single(welcomeMsg) - - val serverLogic = Flow[ByteString] - .via(Framing.delimiter( - ByteString("\n"), - maximumFrameLength = 256, - allowTruncation = true)) - .map(_.utf8String) - //#welcome-banner-chat-server - .map { command ⇒ serverProbe.ref ! command; command } - //#welcome-banner-chat-server - .via(commandParser) - // merge in the initial banner after parser - .merge(welcome) - .map(_ + "\n") - .map(ByteString(_)) - - connection.handleWith(serverLogic) - } - //#welcome-banner-chat-server + // make sure server is started before we connect + binding.futureValue import akka.stream.scaladsl.Framing @@ -108,6 +112,7 @@ class StreamTcpDocSpec extends AkkaSpec { } { + // just for docs, never actually used //#repl-client val connection = Tcp().outgoingConnection("127.0.0.1", 8888) //#repl-client @@ -132,8 +137,11 @@ class StreamTcpDocSpec extends AkkaSpec { .map(_ ⇒ readLine("> ")) .via(replParser) - connection.join(repl).run() + val connected = connection.join(repl).run() //#repl-client + + // make sure we have a connection or fail already here + connected.futureValue } serverProbe.expectMsg("Hello world")