#19451 use random port in WebsocketIntegrationSpec
This commit is contained in:
parent
e0361ece66
commit
8d6bacab2f
1 changed files with 8 additions and 5 deletions
|
|
@ -32,16 +32,18 @@ class WebsocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
|
|||
"A Websocket server" must {
|
||||
|
||||
"echo 100 elements and then shut down without error" in Utils.assertAllStagesStopped {
|
||||
|
||||
val bindingFuture = Http().bindAndHandleSync({
|
||||
case HttpRequest(_, _, headers, _, _) ⇒
|
||||
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get
|
||||
upgrade.handleMessages(Flow.apply, None)
|
||||
}, interface = "localhost", port = 8080)
|
||||
}, interface = "localhost", port = 0)
|
||||
val binding = Await.result(bindingFuture, 3.seconds)
|
||||
val myPort = binding.localAddress.getPort
|
||||
|
||||
val N = 100
|
||||
val (response, count) = Http().singleWebsocketRequest(
|
||||
WebsocketRequest("ws://127.0.0.1:8080"),
|
||||
WebsocketRequest("ws://127.0.0.1:" + myPort),
|
||||
Flow.fromSinkAndSourceMat(
|
||||
Sink.fold(0)((n, _: Message) ⇒ n + 1),
|
||||
Source.repeat(TextMessage("hello")).take(N))(Keep.left))
|
||||
|
|
@ -73,18 +75,19 @@ class WebsocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.
|
|||
case HttpRequest(_, _, headers, _, _) ⇒
|
||||
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get
|
||||
upgrade.handleMessages(handler, None)
|
||||
}, interface = "localhost", port = 8080)
|
||||
}, interface = "localhost", port = 0)
|
||||
val binding = Await.result(bindingFuture, 3.seconds)
|
||||
val myPort = binding.localAddress.getPort
|
||||
|
||||
@volatile var messages = 0
|
||||
val (breaker, completion) =
|
||||
Source.maybe
|
||||
.viaMat {
|
||||
Http().websocketClientLayer(WebsocketRequest("ws://localhost:8080"))
|
||||
Http().websocketClientLayer(WebsocketRequest("ws://localhost:" + myPort))
|
||||
.atop(SslTlsPlacebo.forScala)
|
||||
// the resource leak of #19398 existed only for severed websocket connections
|
||||
.atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right)
|
||||
.join(Tcp().outgoingConnection(new InetSocketAddress("localhost", 8080), halfClose = true))
|
||||
.join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true))
|
||||
}(Keep.right)
|
||||
.toMat(Sink.foreach(_ ⇒ messages += 1))(Keep.both)
|
||||
.run()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue