diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala index a6bce65f66..4cadd43cd5 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/Http.scala @@ -542,7 +542,7 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension { * In order to allow for easy response-to-request association the flow takes in a custom, opaque context * object of type `T` from the application which is emitted together with the corresponding response. */ - def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], Unit] = + def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] = adaptTupleFlow(delegate.superPool[T]()(materializer)) /** diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketIntegrationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketIntegrationSpec.scala deleted file mode 100644 index 184ed34cf4..0000000000 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WebsocketIntegrationSpec.scala +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Copyright (C) 2015 Typesafe Inc. - */ -package akka.http.impl.engine.ws - -import scala.concurrent.Await -import scala.concurrent.duration.DurationInt -import org.scalactic.ConversionCheckedTripleEquals -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.Span.convertDurationToSpan -import akka.http.scaladsl.Http -import akka.http.scaladsl.model.HttpRequest -import akka.http.scaladsl.model.Uri.apply -import akka.http.scaladsl.model.ws._ -import akka.stream._ -import akka.stream.scaladsl._ -import akka.stream.testkit._ -import akka.stream.scaladsl.GraphDSL.Implicits._ -import org.scalatest.concurrent.Eventually -import akka.stream.io.SslTlsPlacebo -import java.net.InetSocketAddress -import akka.stream.impl.fusing.GraphStages -import akka.util.ByteString -import akka.http.scaladsl.model.StatusCodes -import akka.stream.testkit.scaladsl.TestSink -import scala.concurrent.Future - -class WebsocketIntegrationSpec extends AkkaSpec("akka.stream.materializer.debug.fuzzing-mode=off") - with ScalaFutures with ConversionCheckedTripleEquals with Eventually { - - implicit val patience = PatienceConfig(3.seconds) - import system.dispatcher - implicit val materializer = ActorMaterializer() - - "A Websocket server" must { - - "not reset the connection when no data are flowing" in Utils.assertAllStagesStopped { - val source = TestPublisher.probe[Message]() - val bindingFuture = Http().bindAndHandleSync({ - case HttpRequest(_, _, headers, _, _) ⇒ - val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get - upgrade.handleMessages(Flow.fromSinkAndSource(Sink.ignore, Source.fromPublisher(source)), None) - }, interface = "localhost", port = 0) - val binding = Await.result(bindingFuture, 3.seconds) - val myPort = binding.localAddress.getPort - - val (response, sink) = Http().singleWebsocketRequest( - WebsocketRequest("ws://127.0.0.1:" + myPort), - Flow.fromSinkAndSourceMat(TestSink.probe[Message], Source.empty)(Keep.left)) - - response.futureValue.response.status.isSuccess should ===(true) - sink - .request(10) - .expectNoMsg(500.millis) - - source - .sendNext(TextMessage("hello")) - .sendComplete() - sink - .expectNext(TextMessage("hello")) - .expectComplete() - - binding.unbind() - } - - "not reset the connection when no data are flowing and the connection is closed from the client" in Utils.assertAllStagesStopped { - val source = TestPublisher.probe[Message]() - val bindingFuture = Http().bindAndHandleSync({ - case HttpRequest(_, _, headers, _, _) ⇒ - val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get - upgrade.handleMessages(Flow.fromSinkAndSource(Sink.ignore, Source.fromPublisher(source)), None) - }, interface = "localhost", port = 0) - val binding = Await.result(bindingFuture, 3.seconds) - val myPort = binding.localAddress.getPort - - val ((response, breaker), sink) = - Source.empty - .viaMat { - Http().websocketClientLayer(WebsocketRequest("ws://localhost:" + myPort)) - .atop(SslTlsPlacebo.forScala) - .joinMat(Flow.fromGraph(GraphStages.breaker[ByteString]).via( - Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)))(Keep.both) - }(Keep.right) - .toMat(TestSink.probe[Message])(Keep.both) - .run() - - response.futureValue.response.status.isSuccess should ===(true) - sink - .request(10) - .expectNoMsg(1500.millis) - - breaker.value.get.get.complete() - - source - .sendNext(TextMessage("hello")) - .sendComplete() - sink - .expectNext(TextMessage("hello")) - .expectComplete() - - binding.unbind() - } - - "echo 100 elements and then shut down without error" in Utils.assertAllStagesStopped { - - val bindingFuture = Http().bindAndHandleSync({ - case HttpRequest(_, _, headers, _, _) ⇒ - val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get - upgrade.handleMessages(Flow.apply, None) - }, interface = "localhost", port = 0) - val binding = Await.result(bindingFuture, 3.seconds) - val myPort = binding.localAddress.getPort - - val N = 100 - val (response, count) = Http().singleWebsocketRequest( - WebsocketRequest("ws://127.0.0.1:" + myPort), - Flow.fromSinkAndSourceMat( - Sink.fold(0)((n, _: Message) ⇒ n + 1), - Source.repeat(TextMessage("hello")).take(N))(Keep.left)) - - count.futureValue should ===(N) - binding.unbind() - } - - "send back 100 elements and then terminate without error even when not ordinarily closed" in Utils.assertAllStagesStopped { - val N = 100 - - val handler = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ - val merge = b.add(Merge[Int](2)) - - // convert to int so we can connect to merge - val mapMsgToInt = b.add(Flow[Message].map(_ ⇒ -1)) - val mapIntToMsg = b.add(Flow[Int].map(x ⇒ TextMessage.Strict(s"Sending: $x"))) - - // source we want to use to send message to the connected websocket sink - val rangeSource = b.add(Source(1 to N)) - - mapMsgToInt ~> merge // this part of the merge will never provide msgs - rangeSource ~> merge ~> mapIntToMsg - - FlowShape(mapMsgToInt.in, mapIntToMsg.out) - }) - - val bindingFuture = Http().bindAndHandleSync({ - case HttpRequest(_, _, headers, _, _) ⇒ - val upgrade = headers.collectFirst { case u: UpgradeToWebsocket ⇒ u }.get - upgrade.handleMessages(handler, None) - }, interface = "localhost", port = 0) - val binding = Await.result(bindingFuture, 3.seconds) - val myPort = binding.localAddress.getPort - - @volatile var messages = 0 - val (breaker, completion) = - Source.maybe - .viaMat { - Http().websocketClientLayer(WebsocketRequest("ws://localhost:" + myPort)) - .atop(SslTlsPlacebo.forScala) - // the resource leak of #19398 existed only for severed websocket connections - .atopMat(GraphStages.bidiBreaker[ByteString, ByteString])(Keep.right) - .join(Tcp().outgoingConnection(new InetSocketAddress("localhost", myPort), halfClose = true)) - }(Keep.right) - .toMat(Sink.foreach(_ ⇒ messages += 1))(Keep.both) - .run() - eventually(messages should ===(N)) - // breaker should have been fulfilled long ago - breaker.value.get.get.completeAndCancel() - completion.futureValue - - binding.unbind() - } - - } - -} diff --git a/notes.md b/notes.md deleted file mode 100644 index a16d40ab7e..0000000000 --- a/notes.md +++ /dev/null @@ -1,62 +0,0 @@ -Notes on changes - -- hidden "Setup" using methods on Http -- super pool to be "dead simple" -- we want to move away from Option[HttpsContext] as it's a lie, None => defaultContext anyway -- config performed in ssl-config, applying these settings done in Akka - - e.g. NegotiateNewSession -- was: singleRequest(req, settings, context: Option[HttpsContext]) == None meant default -- default port in context is useful for starting the https server - -- in WS, we'll always want to be TLS in practice. APIs use HttpsContext, but provide default one - - if request is to "ws://" then the https is not used of course - -### Server - -Needs to know upfront. - -**bind / bindAndHandle** - - has context - - default HTTP - - if no port given, based on Context 80/443 - -=> Type: ConnectionContext - based on type HTTP / HTTPS -Note: context should be obtainable Http().defaultServerHttpsContext - -### Client - -## connections -Needs to know upfront. - -**outgoingConnection** - - no context - -**outgoingConnectionTls** - - needs https context - - provides default HTTPS - -**outgoingConnection** - - no context - -**newHostConnectionPoolTls** - - needs https context - -=> Tls methods provide default HTTPS config -Type: HttpsConnectionConfig on Tls methods - -## request sensitive (adds TLS when needed): -Needs context "just in case", enables when request needs it. - -**singleRequest** - - has context, default HTTPS, may drop it - -**singleWebSocketRequest** - - has context, default HTTPS, may drop it - -**singleWebSocketRequest** - - needs context, "just in case" - - provides default HTTPS - -=> normal methods, Tls methods -=> Tls methods provide default HTTPS config -Type: HttpsConnectionConfig on Tls methods diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 23354b42c2..ea27be8442 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,6 +9,7 @@ object Dependencies { lazy val scalaTestVersion = settingKey[String]("The version of ScalaTest to use.") lazy val scalaStmVersion = settingKey[String]("The version of ScalaSTM to use.") lazy val scalaCheckVersion = settingKey[String]("The version of ScalaCheck to use.") + val junitVersion = "4.12" val Versions = Seq( crossScalaVersions := Seq("2.11.7"), //"2.12.0-M2" @@ -54,7 +55,7 @@ object Dependencies { val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.3" // ApacheV2 // For akka-http-testkit-java - val junit = "junit" % "junit" % "4.11" // Common Public License 1.0 + val junit = "junit" % "junit" % junitVersion // Common Public License 1.0 // For Java 8 Conversions val java8Compat = "org.scala-lang.modules" %% "scala-java8-compat" % "0.7.0" // Scala License @@ -68,7 +69,7 @@ object Dependencies { val commonsMath = "org.apache.commons" % "commons-math" % "2.2" % "test" // ApacheV2 val commonsIo = "commons-io" % "commons-io" % "2.4" % "test" // ApacheV2 val commonsCodec = "commons-codec" % "commons-codec" % "1.10" % "test" // ApacheV2 - val junit = "junit" % "junit" % "4.12" % "test" // Common Public License 1.0 + val junit = "junit" % "junit" % junitVersion % "test" // Common Public License 1.0 val logback = "ch.qos.logback" % "logback-classic" % "1.1.3" % "test" // EPL 1.0 / LGPL 2.1 val mockito = "org.mockito" % "mockito-all" % "1.10.19" % "test" // MIT // changing the scalatest dependency must be reflected in akka-docs/rst/dev/multi-jvm-testing.rst