StreamTcpDocSpec: Less race, better error if client fails #24500
This commit is contained in:
parent
4cde6e7feb
commit
cb048457c8
2 changed files with 65 additions and 50 deletions
|
|
@ -12,6 +12,7 @@ import jdocs.AbstractJavaTest;
|
|||
import jdocs.stream.SilenceSystemOut;
|
||||
import akka.testkit.javadsl.TestKit;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
|
@ -47,7 +48,7 @@ public class StreamTcpDocTest extends AbstractJavaTest {
|
|||
|
||||
final SilenceSystemOut.System System = SilenceSystemOut.get();
|
||||
|
||||
private final ConcurrentLinkedQueue<String> input = new ConcurrentLinkedQueue<String>();
|
||||
private final ConcurrentLinkedQueue<String> input = new ConcurrentLinkedQueue<>();
|
||||
{
|
||||
input.add("Hello world");
|
||||
input.add("What a lovely day");
|
||||
|
|
@ -90,27 +91,27 @@ public class StreamTcpDocTest extends AbstractJavaTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void actuallyWorkingClientServerApp() {
|
||||
public void actuallyWorkingClientServerApp() throws Exception {
|
||||
|
||||
final InetSocketAddress localhost = SocketUtil.temporaryServerAddress("127.0.0.1", false);
|
||||
|
||||
final TestProbe serverProbe = new TestProbe(system);
|
||||
|
||||
final Source<IncomingConnection,CompletionStage<ServerBinding>> connections =
|
||||
Tcp.get(system).bind(localhost.getHostString(), localhost.getPort());
|
||||
Tcp.get(system).bind(localhost.getHostString(), localhost.getPort());
|
||||
final CompletionStage<ServerBinding> bindingCS =
|
||||
//#welcome-banner-chat-server
|
||||
connections.runForeach(connection -> {
|
||||
// server logic, parses incoming commands
|
||||
final Flow<String, String, NotUsed> commandParser =
|
||||
connections.to(Sink.foreach((IncomingConnection connection) -> {
|
||||
// server logic, parses incoming commands
|
||||
final Flow<String, String, NotUsed> commandParser =
|
||||
Flow.<String>create()
|
||||
.takeWhile(elem -> !elem.equals("BYE"))
|
||||
.map(elem -> elem + "!");
|
||||
|
||||
final String welcomeMsg = "Welcome to: " + connection.localAddress() +
|
||||
final String welcomeMsg = "Welcome to: " + connection.localAddress() +
|
||||
" you are: " + connection.remoteAddress() + "!";
|
||||
|
||||
final Source<String, NotUsed> welcome = Source.single(welcomeMsg);
|
||||
final Flow<ByteString, ByteString, NotUsed> serverLogic =
|
||||
final Source<String, NotUsed> welcome = Source.single(welcomeMsg);
|
||||
final Flow<ByteString, ByteString, NotUsed> serverLogic =
|
||||
Flow.of(ByteString.class)
|
||||
.via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW))
|
||||
.map(ByteString::utf8String)
|
||||
|
|
@ -122,16 +123,20 @@ public class StreamTcpDocTest extends AbstractJavaTest {
|
|||
//#welcome-banner-chat-server
|
||||
.via(commandParser)
|
||||
.merge(welcome)
|
||||
.map(s -> s + "\n")
|
||||
.map(s -> s + "\n")
|
||||
.map(ByteString::fromString);
|
||||
|
||||
connection.handleWith(serverLogic, mat);
|
||||
}, mat);
|
||||
|
||||
connection.handleWith(serverLogic, mat);
|
||||
})).run(mat);
|
||||
//#welcome-banner-chat-server
|
||||
|
||||
// make sure server is bound before we do anything else
|
||||
bindingCS.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
|
||||
|
||||
{
|
||||
//#repl-client
|
||||
// just for docs, never actually used
|
||||
//#repl-client
|
||||
final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection =
|
||||
Tcp.get(system).outgoingConnection("127.0.0.1", 8888);
|
||||
//#repl-client
|
||||
|
|
@ -139,13 +144,13 @@ public class StreamTcpDocTest extends AbstractJavaTest {
|
|||
|
||||
{
|
||||
final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection =
|
||||
Tcp.get(system).outgoingConnection(localhost.getHostString(), localhost.getPort());
|
||||
Tcp.get(system).outgoingConnection(localhost.getHostString(), localhost.getPort());
|
||||
//#repl-client
|
||||
final Flow<String, ByteString, NotUsed> replParser =
|
||||
Flow.<String>create()
|
||||
.takeWhile(elem -> !elem.equals("q"))
|
||||
.concat(Source.single("BYE")) // will run after the original flow completes
|
||||
.map(elem -> ByteString.fromString(elem + "\n"));
|
||||
Flow.<String>create()
|
||||
.takeWhile(elem -> !elem.equals("q"))
|
||||
.concat(Source.single("BYE")) // will run after the original flow completes
|
||||
.map(elem -> ByteString.fromString(elem + "\n"));
|
||||
|
||||
final Flow<ByteString, ByteString, NotUsed> repl = Flow.of(ByteString.class)
|
||||
.via(Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW))
|
||||
|
|
@ -154,10 +159,12 @@ public class StreamTcpDocTest extends AbstractJavaTest {
|
|||
.map(elem -> readLine("> "))
|
||||
.via(replParser);
|
||||
|
||||
connection.join(repl).run(mat);
|
||||
CompletionStage<OutgoingConnection> connectionCS = connection.join(repl).run(mat);
|
||||
//#repl-client
|
||||
}
|
||||
|
||||
// make sure it got connected (or fails the test)
|
||||
connectionCS.toCompletableFuture().get(5L, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
serverProbe.expectMsg("Hello world");
|
||||
|
|
|
|||
|
|
@ -63,39 +63,43 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
|
||||
"initial server banner echo server" in {
|
||||
val localhost = SocketUtil.temporaryServerAddress()
|
||||
|
||||
val connections = Tcp().bind(localhost.getHostString, localhost.getPort)
|
||||
val serverProbe = TestProbe()
|
||||
|
||||
import akka.stream.scaladsl.Framing
|
||||
val binding =
|
||||
//#welcome-banner-chat-server
|
||||
connections.to(Sink.foreach { connection ⇒
|
||||
|
||||
// server logic, parses incoming commands
|
||||
val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")
|
||||
|
||||
import connection._
|
||||
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
|
||||
val welcome = Source.single(welcomeMsg)
|
||||
|
||||
val serverLogic = Flow[ByteString]
|
||||
.via(Framing.delimiter(
|
||||
ByteString("\n"),
|
||||
maximumFrameLength = 256,
|
||||
allowTruncation = true))
|
||||
.map(_.utf8String)
|
||||
//#welcome-banner-chat-server
|
||||
.map { command ⇒ serverProbe.ref ! command; command }
|
||||
//#welcome-banner-chat-server
|
||||
.via(commandParser)
|
||||
// merge in the initial banner after parser
|
||||
.merge(welcome)
|
||||
.map(_ + "\n")
|
||||
.map(ByteString(_))
|
||||
|
||||
connection.handleWith(serverLogic)
|
||||
}).run()
|
||||
//#welcome-banner-chat-server
|
||||
|
||||
connections.runForeach { connection ⇒
|
||||
|
||||
// server logic, parses incoming commands
|
||||
val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")
|
||||
|
||||
import connection._
|
||||
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
|
||||
val welcome = Source.single(welcomeMsg)
|
||||
|
||||
val serverLogic = Flow[ByteString]
|
||||
.via(Framing.delimiter(
|
||||
ByteString("\n"),
|
||||
maximumFrameLength = 256,
|
||||
allowTruncation = true))
|
||||
.map(_.utf8String)
|
||||
//#welcome-banner-chat-server
|
||||
.map { command ⇒ serverProbe.ref ! command; command }
|
||||
//#welcome-banner-chat-server
|
||||
.via(commandParser)
|
||||
// merge in the initial banner after parser
|
||||
.merge(welcome)
|
||||
.map(_ + "\n")
|
||||
.map(ByteString(_))
|
||||
|
||||
connection.handleWith(serverLogic)
|
||||
}
|
||||
//#welcome-banner-chat-server
|
||||
// make sure server is started before we connect
|
||||
binding.futureValue
|
||||
|
||||
import akka.stream.scaladsl.Framing
|
||||
|
||||
|
|
@ -108,6 +112,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
{
|
||||
// just for docs, never actually used
|
||||
//#repl-client
|
||||
val connection = Tcp().outgoingConnection("127.0.0.1", 8888)
|
||||
//#repl-client
|
||||
|
|
@ -132,8 +137,11 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
.map(_ ⇒ readLine("> "))
|
||||
.via(replParser)
|
||||
|
||||
connection.join(repl).run()
|
||||
val connected = connection.join(repl).run()
|
||||
//#repl-client
|
||||
|
||||
// make sure we have a connection or fail already here
|
||||
connected.futureValue
|
||||
}
|
||||
|
||||
serverProbe.expectMsg("Hello world")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue