=pro fix junit version to be the same in both scopes
This commit is contained in:
parent
100f82be84
commit
0dd889aa17
4 changed files with 4 additions and 239 deletions
|
|
@ -542,7 +542,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
|
||||
* object of type `T` from the application which is emitted together with the corresponding response.
|
||||
*/
|
||||
def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] =
|
||||
def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
|
||||
adaptTupleFlow(delegate.superPool[T]()(materializer))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1,174 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.http.impl.engine.ws
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration.DurationInt
|
||||
import org.scalactic.ConversionCheckedTripleEquals
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalatest.time.Span.convertDurationToSpan
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.model.HttpRequest
|
||||
import akka.http.scaladsl.model.Uri.apply
|
||||
import akka.http.scaladsl.model.ws._
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.scaladsl.GraphDSL.Implicits._
|
||||
import org.scalatest.concurrent.Eventually
|
||||
import akka.stream.io.SslTlsPlacebo
|
||||
import java.net.InetSocketAddress
|
||||
import akka.stream.impl.fusing.GraphStages
|
||||
import akka.util.ByteString
|
||||
import akka.http.scaladsl.model.StatusCodes
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import scala.concurrent.Future
|
||||
|
||||
class WebsocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.fuzzing-mode=off")
|
||||
with ScalaFutures with ConversionCheckedTripleEquals with Eventually {
|
||||
|
||||
implicit val patience = PatienceConfig(3.seconds)
|
||||
import system.dispatcher
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"A Websocket server" must {
|
||||
|
||||
"not reset the connection when no data are flowing" in Utils.assertAllStagesStopped {
|
||||
val source = TestPublisher.probe[Message]()
|
||||
val bindingFuture = Http().bindAndHandleSync({
|
||||
case HttpRequest(_, _, headers, _, _) ⇒
|
||||
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get
|
||||
upgrade.handleMessages(Flow.fromSinkAndSource(Sink.ignore, Source.fromPublisher(source)), None)
|
||||
}, interface = "localhost", port = 0)
|
||||
val binding = Await.result(bindingFuture, 3.seconds)
|
||||
val myPort = binding.localAddress.getPort
|
||||
|
||||
val (response, sink) = Http().singleWebsocketRequest(
|
||||
WebsocketRequest("ws://127.0.0.1:" + myPort),
|
||||
Flow.fromSinkAndSourceMat(TestSink.probe[Message], Source.empty)(Keep.left))
|
||||
|
||||
response.futureValue.response.status.isSuccess should ===(true)
|
||||
sink
|
||||
.request(10)
|
||||
.expectNoMsg(500.millis)
|
||||
|
||||
source
|
||||
.sendNext(TextMessage("hello"))
|
||||
.sendComplete()
|
||||
sink
|
||||
.expectNext(TextMessage("hello"))
|
||||
.expectComplete()
|
||||
|
||||
binding.unbind()
|
||||
}
|
||||
|
||||
"not reset the connection when no data are flowing and the connection is closed from the client" in Utils.assertAllStagesStopped {
|
||||
val source = TestPublisher.probe[Message]()
|
||||
val bindingFuture = Http().bindAndHandleSync({
|
||||
case HttpRequest(_, _, headers, _, _) ⇒
|
||||
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get
|
||||
upgrade.handleMessages(Flow.fromSinkAndSource(Sink.ignore, Source.fromPublisher(source)), None)
|
||||
}, interface = "localhost", port = 0)
|
||||
val binding = Await.result(bindingFuture, 3.seconds)
|
||||
val myPort = binding.localAddress.getPort
|
||||
|
||||
val ((response, breaker), sink) =
|
||||
Source.empty
|
||||
.viaMat {
|
||||
Http().websocketClientLayer(WebsocketRequest("ws://localhost:" + myPort))
|
||||
.atop(SslTlsPlacebo.forScala)
|
||||
.joinMat(Flow.fromGraph(GraphStages.breaker[ByteString]).via(
|
||||
Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both)
|
||||
}(Keep.right)
|
||||
.toMat(TestSink.probe[Message])(Keep.both)
|
||||
.run()
|
||||
|
||||
response.futureValue.response.status.isSuccess should ===(true)
|
||||
sink
|
||||
.request(10)
|
||||
.expectNoMsg(1500.millis)
|
||||
|
||||
breaker.value.get.get.complete()
|
||||
|
||||
source
|
||||
.sendNext(TextMessage("hello"))
|
||||
.sendComplete()
|
||||
sink
|
||||
.expectNext(TextMessage("hello"))
|
||||
.expectComplete()
|
||||
|
||||
binding.unbind()
|
||||
}
|
||||
|
||||
"echo 100 elements and then shut down without error" in Utils.assertAllStagesStopped {
|
||||
|
||||
val bindingFuture = Http().bindAndHandleSync({
|
||||
case HttpRequest(_, _, headers, _, _) ⇒
|
||||
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get
|
||||
upgrade.handleMessages(Flow.apply, None)
|
||||
}, interface = "localhost", port = 0)
|
||||
val binding = Await.result(bindingFuture, 3.seconds)
|
||||
val myPort = binding.localAddress.getPort
|
||||
|
||||
val N = 100
|
||||
val (response, count) = Http().singleWebsocketRequest(
|
||||
WebsocketRequest("ws://127.0.0.1:" + myPort),
|
||||
Flow.fromSinkAndSourceMat(
|
||||
Sink.fold(0)((n, _: Message) ⇒ n + 1),
|
||||
Source.repeat(TextMessage("hello")).take(N))(Keep.left))
|
||||
|
||||
count.futureValue should ===(N)
|
||||
binding.unbind()
|
||||
}
|
||||
|
||||
"send back 100 elements and then terminate without error even when not ordinarily closed" in Utils.assertAllStagesStopped {
|
||||
val N = 100
|
||||
|
||||
val handler = Flow.fromGraph(GraphDSL.create() { implicit b ⇒
|
||||
val merge = b.add(Merge[Int](2))
|
||||
|
||||
// convert to int so we can connect to merge
|
||||
val mapMsgToInt = b.add(Flow[Message].map(_ ⇒ -1))
|
||||
val mapIntToMsg = b.add(Flow[Int].map(x ⇒ TextMessage.Strict(s"Sending: $x")))
|
||||
|
||||
// source we want to use to send message to the connected websocket sink
|
||||
val rangeSource = b.add(Source(1 to N))
|
||||
|
||||
mapMsgToInt ~> merge // this part of the merge will never provide msgs
|
||||
rangeSource ~> merge ~> mapIntToMsg
|
||||
|
||||
FlowShape(mapMsgToInt.in, mapIntToMsg.out)
|
||||
})
|
||||
|
||||
val bindingFuture = Http().bindAndHandleSync({
|
||||
case HttpRequest(_, _, headers, _, _) ⇒
|
||||
val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get
|
||||
upgrade.handleMessages(handler, None)
|
||||
}, interface = "localhost", port = 0)
|
||||
val binding = Await.result(bindingFuture, 3.seconds)
|
||||
val myPort = binding.localAddress.getPort
|
||||
|
||||
@volatile var messages = 0
|
||||
val (breaker, completion) =
|
||||
Source.maybe
|
||||
.viaMat {
|
||||
Http().websocketClientLayer(WebsocketRequest("ws://localhost:" + myPort))
|
||||
.atop(SslTlsPlacebo.forScala)
|
||||
// the resource leak of #19398 existed only for severed websocket connections
|
||||
.atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right)
|
||||
.join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true))
|
||||
}(Keep.right)
|
||||
.toMat(Sink.foreach(_ ⇒ messages += 1))(Keep.both)
|
||||
.run()
|
||||
eventually(messages should ===(N))
|
||||
// breaker should have been fulfilled long ago
|
||||
breaker.value.get.get.completeAndCancel()
|
||||
completion.futureValue
|
||||
|
||||
binding.unbind()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
62
notes.md
62
notes.md
|
|
@ -1,62 +0,0 @@
|
|||
Notes on changes
|
||||
|
||||
- hidden "Setup" using methods on Http
|
||||
- super pool to be "dead simple"
|
||||
- we want to move away from Option[HttpsContext] as it's a lie, None => defaultContext anyway
|
||||
- config performed in ssl-config, applying these settings done in Akka
|
||||
- e.g. NegotiateNewSession
|
||||
- was: singleRequest(req, settings, context: Option[HttpsContext]) == None meant default
|
||||
- default port in context is useful for starting the https server
|
||||
|
||||
- in WS, we'll always want to be TLS in practice. APIs use HttpsContext, but provide default one
|
||||
- if request is to "ws://" then the https is not used of course
|
||||
|
||||
### Server
|
||||
|
||||
Needs to know upfront.
|
||||
|
||||
**bind / bindAndHandle**
|
||||
- has context
|
||||
- default HTTP
|
||||
- if no port given, based on Context 80/443
|
||||
|
||||
=> Type: ConnectionContext - based on type HTTP / HTTPS
|
||||
Note: context should be obtainable Http().defaultServerHttpsContext
|
||||
|
||||
### Client
|
||||
|
||||
## connections
|
||||
Needs to know upfront.
|
||||
|
||||
**outgoingConnection**
|
||||
- no context
|
||||
|
||||
**outgoingConnectionTls**
|
||||
- needs https context
|
||||
- provides default HTTPS
|
||||
|
||||
**outgoingConnection**
|
||||
- no context
|
||||
|
||||
**newHostConnectionPoolTls**
|
||||
- needs https context
|
||||
|
||||
=> Tls methods provide default HTTPS config
|
||||
Type: HttpsConnectionConfig on Tls methods
|
||||
|
||||
## request sensitive (adds TLS when needed):
|
||||
Needs context "just in case", enables when request needs it.
|
||||
|
||||
**singleRequest**
|
||||
- has context, default HTTPS, may drop it
|
||||
|
||||
**singleWebSocketRequest**
|
||||
- has context, default HTTPS, may drop it
|
||||
|
||||
**singleWebSocketRequest**
|
||||
- needs context, "just in case"
|
||||
- provides default HTTPS
|
||||
|
||||
=> normal methods, Tls methods
|
||||
=> Tls methods provide default HTTPS config
|
||||
Type: HttpsConnectionConfig on Tls methods
|
||||
|
|
@ -9,6 +9,7 @@ object Dependencies {
|
|||
lazy val scalaTestVersion = settingKey[String]("The version of ScalaTest to use.")
|
||||
lazy val scalaStmVersion = settingKey[String]("The version of ScalaSTM to use.")
|
||||
lazy val scalaCheckVersion = settingKey[String]("The version of ScalaCheck to use.")
|
||||
val junitVersion = "4.12"
|
||||
|
||||
val Versions = Seq(
|
||||
crossScalaVersions := Seq("2.11.7"), //"2.12.0-M2"
|
||||
|
|
@ -54,7 +55,7 @@ object Dependencies {
|
|||
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.3" // ApacheV2
|
||||
|
||||
// For akka-http-testkit-java
|
||||
val junit = "junit" % "junit" % "4.11" // Common Public License 1.0
|
||||
val junit = "junit" % "junit" % junitVersion // Common Public License 1.0
|
||||
|
||||
// For Java 8 Conversions
|
||||
val java8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.7.0" // Scala License
|
||||
|
|
@ -68,7 +69,7 @@ object Dependencies {
|
|||
val commonsMath = "org.apache.commons" % "commons-math" % "2.2" % "test" // ApacheV2
|
||||
val commonsIo = "commons-io" % "commons-io" % "2.4" % "test" // ApacheV2
|
||||
val commonsCodec = "commons-codec" % "commons-codec" % "1.10" % "test" // ApacheV2
|
||||
val junit = "junit" % "junit" % "4.12" % "test" // Common Public License 1.0
|
||||
val junit = "junit" % "junit" % junitVersion % "test" // Common Public License 1.0
|
||||
val logback = "ch.qos.logback" % "logback-classic" % "1.1.3" % "test" // EPL 1.0 / LGPL 2.1
|
||||
val mockito = "org.mockito" % "mockito-all" % "1.10.19" % "test" // MIT
|
||||
// changing the scalatest dependency must be reflected in akka-docs/rst/dev/multi-jvm-testing.rst
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue