diff --git a/akka-http-core/RunWebsocketAutobahnTestSuite.md b/akka-http-core/RunWebsocketAutobahnTestSuite.md new file mode 100644 index 0000000000..9591870e75 --- /dev/null +++ b/akka-http-core/RunWebsocketAutobahnTestSuite.md @@ -0,0 +1,51 @@ +# Test the client side + +Start up the testsuite with docker: + +``` +docker run -ti --rm=true -p 8080:8080 -p 9001:9001 jrudolph/autobahn-testsuite +``` + +Then in sbt, to run all tests, use + +``` +akka-http-core-experimental/test:run-main akka.http.impl.engine.ws.WSClientAutobahnTest +``` + +or, to run a single test, use + +``` +akka-http-core-experimental/test:run-main akka.http.impl.engine.ws.WSClientAutobahnTest 1.1.1 +``` + +After a run, you can access the results of the run at http://localhost:8080/cwd/reports/clients/index.html. + +You can supply a configuration file for autobahn by mounting a version of `fuzzingserver.json` to `/tmp/fuzzingserver.json` +of the container, e.g. using this docker option: + +``` +-v /fullpath-on-host/my-fuzzingserver-config.json:/tmp/fuzzingserver.json +``` + +# Test the server side + +Start up the test server in sbt: + +``` +akka-http-core-experimental/test:run-main akka.http.impl.engine.ws.WSServerAutobahnTest +``` + +Then, run the test suite with docker: + +``` +docker run -ti --rm=true -v `pwd`/reports:/tmp/server-report jrudolph/autobahn-testsuite-client +``` + +This will put the result report into a `reports` directory in the current working directory on the host. + +You can supply a configuration file for autobahn by mounting a version of `fuzzingclient.json` to `/tmp/fuzzingclient.json` +of the container, e.g. using this docker option: + +``` +-v /fullpath-on-host/my-fuzzingclient-config.json:/tmp/fuzzingclient.json +``` diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/EchoTestClientApp.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/EchoTestClientApp.scala new file mode 100644 index 0000000000..900b25b0bd --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/EchoTestClientApp.scala @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.impl.engine.ws + +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.ws.{ TextMessage, BinaryMessage, Message } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl._ +import akka.util.ByteString + +import scala.concurrent.Future +import scala.util.{ Failure, Success } + +/** + * An example App that runs a quick test against the websocket server at wss://echo.websocket.org + */ +object EchoTestClientApp extends App { + implicit val system = ActorSystem() + import system.dispatcher + implicit val mat = ActorMaterializer() + + def delayedCompletion(delay: FiniteDuration): Source[Nothing, Unit] = + Source.single(1) + .mapAsync(1)(_ ⇒ akka.pattern.after(delay, system.scheduler)(Future(1))) + .drop(1).asInstanceOf[Source[Nothing, Unit]] + + def messages: List[Message] = + List( + TextMessage("Test 1"), + BinaryMessage(ByteString("abc")), + TextMessage("Test 2"), + BinaryMessage(ByteString("def"))) + + def source: Source[Message, Unit] = + Source(messages) ++ delayedCompletion(1.second) // otherwise, we may start closing too soon + + def sink: Sink[Message, Future[Seq[String]]] = + Flow[Message] + .mapAsync(1) { + case tm: TextMessage ⇒ + tm.textStream.runWith(Sink.fold("")(_ + _)).map(str ⇒ s"TextMessage: '$str'") + case bm: BinaryMessage ⇒ + bm.dataStream.runWith(Sink.fold(ByteString.empty)(_ ++ _)).map(bs ⇒ s"BinaryMessage: '${bs.utf8String}'") + } + .grouped(10000) + .toMat(Sink.head)(Keep.right) + + def echoClient = Flow.wrap(sink, source)(Keep.left) + + val (upgrade, res) = Http().singleWebsocketRequest("wss://echo.websocket.org", echoClient) + res onComplete { + case Success(res) ⇒ + println("Run successful. Got these elements:") + res.foreach(println) + system.shutdown() + case Failure(e) ⇒ + println("Run failed.") + e.printStackTrace() + system.shutdown() + } + + system.scheduler.scheduleOnce(10.seconds)(system.shutdown()) +} diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala new file mode 100644 index 0000000000..4c05b8f46b --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala @@ -0,0 +1,227 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.impl.engine.ws + +import scala.concurrent.{ Promise, Future } +import scala.util.{ Try, Failure, Success } + +import spray.json._ + +import akka.actor.ActorSystem + +import akka.stream.ActorMaterializer +import akka.stream.io.SslTlsPlacebo +import akka.stream.stage.{ TerminationDirective, Context, SyncDirective, PushStage } +import akka.stream.scaladsl._ + +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.ws._ + +object WSClientAutobahnTest extends App { + implicit val system = ActorSystem() + import system.dispatcher + implicit val mat = ActorMaterializer() + + val Agent = "akka-http" + val Parallelism = 4 + + val getCaseCountUri: Uri = + s"ws://localhost:9001/getCaseCount" + + def runCaseUri(caseIndex: Int, agent: String): Uri = + s"ws://localhost:9001/runCase?case=$caseIndex&agent=$agent" + + def getCaseStatusUri(caseIndex: Int, agent: String): Uri = + s"ws://localhost:9001/getCaseStatus?case=$caseIndex&agent=$agent" + + def getCaseInfoUri(caseIndex: Int): Uri = + s"ws://localhost:9001/getCaseInfo?case=$caseIndex" + + def updateReportsUri(agent: String): Uri = + s"ws://localhost:9001/updateReports?agent=$agent" + + def runCase(caseIndex: Int, agent: String = Agent): Future[CaseStatus] = + runWs(runCaseUri(caseIndex, agent), echo).recover { case _ ⇒ () }.flatMap { _ ⇒ + getCaseStatus(caseIndex, agent) + } + + def richRunCase(caseIndex: Int, agent: String = Agent): Future[CaseResult] = { + val info = getCaseInfo(caseIndex) + val startMillis = System.currentTimeMillis() + val status = runCase(caseIndex, agent).map { res ⇒ + val lastedMillis = System.currentTimeMillis() - startMillis + (res, lastedMillis) + } + import Console._ + info.flatMap { i ⇒ + val prefix = f"$YELLOW${i.caseInfo.id}%-7s$RESET - $WHITE${i.caseInfo.description}$RESET ... " + //println(prefix) + + status.onComplete { + case Success((CaseStatus(status), millis)) ⇒ + val color = if (status == "OK") GREEN else RED + println(f"${color}$status%-15s$RESET$millis%5d ms $prefix") + case Failure(e) ⇒ + println(s"$prefix${RED}failed with '${e.getMessage}'$RESET") + } + + status.map(s ⇒ CaseResult(i.caseInfo, s._1)) + } + } + + def getCaseCount(): Future[Int] = + runToSingleText(getCaseCountUri).map(_.toInt) + + def getCaseInfo(caseId: Int): Future[IndexedCaseInfo] = + runToSingleJsonValue[CaseInfo](getCaseInfoUri(caseId)).map(IndexedCaseInfo(caseId, _)) + + def getCaseStatus(caseId: Int, agent: String = Agent): Future[CaseStatus] = + runToSingleJsonValue[CaseStatus](getCaseStatusUri(caseId, agent)) + + def updateReports(agent: String = Agent): Future[Unit] = + runToSingleText(updateReportsUri(agent)).map(_ ⇒ ()) + + /** + * Map from textual case ID (like 1.1.1) to IndexedCaseInfo + * @return + */ + def getCaseMap(): Future[Map[String, IndexedCaseInfo]] = { + val res = + getCaseCount().flatMap { count ⇒ + println(s"Retrieving case info for $count cases...") + Future.traverse(1 to count)(getCaseInfo).map(_.map(e ⇒ e.caseInfo.id -> e).toMap) + } + res.foreach { res ⇒ + println(s"Received info for ${res.size} cases") + } + res + } + + def echo = Flow[Message].viaMat(completionSignal)(Keep.right) + + if (args.size >= 1) { + // run one + val testId = args(0) + println(s"Trying to run test $testId") + getCaseMap().flatMap { map ⇒ + val info = map(testId) + richRunCase(info.index) + }.onComplete { + case Success(res) ⇒ + println(s"Run successfully finished!") + updateReportsAndShutdown() + case Failure(e) ⇒ + println("Run failed with this exception") + e.printStackTrace() + updateReportsAndShutdown() + } + } else { + println("Running complete test suite") + getCaseCount().flatMap { count ⇒ + println(s"Found $count tests.") + Source(1 to count).mapAsyncUnordered(Parallelism)(richRunCase(_)).grouped(count).runWith(Sink.head) + }.map { results ⇒ + val grouped = + results.groupBy(_.status.behavior) + + import Console._ + println(s"${results.size} tests run.") + println() + println(s"${GREEN}OK$RESET: ${grouped.getOrElse("OK", Nil).size}") + val notOk = grouped.filterNot(_._1 == "OK") + notOk.toSeq.sortBy(_._2.size).foreach { + case (status, cases) ⇒ println(s"$RED$status$RESET: ${cases.size}") + } + println() + println("Not OK tests") + println() + results.filterNot(_.status.behavior == "OK").foreach { r ⇒ + println(f"$RED${r.status.behavior}%-20s$RESET $YELLOW${r.info.id}%-7s$RESET - $WHITE${r.info.description}$RESET") + } + + () + } + .onComplete(completion) + } + + def completion[T]: Try[T] ⇒ Unit = { + case Success(res) ⇒ + println(s"Run successfully finished!") + updateReportsAndShutdown() + case Failure(e) ⇒ + println("Run failed with this exception") + e.printStackTrace() + updateReportsAndShutdown() + } + def updateReportsAndShutdown(): Unit = + updateReports().onComplete { res ⇒ + println("Reports should now be accessible at http://localhost:8080/cwd/reports/clients/index.html") + system.shutdown() + } + + import scala.concurrent.duration._ + import system.dispatcher + system.scheduler.scheduleOnce(60.seconds)(system.shutdown()) + + def runWs[T](uri: Uri, clientFlow: Flow[Message, Message, T]): T = + Http().singleWebsocketRequest(uri, clientFlow)._2 + + def completionSignal[T]: Flow[T, T, Future[Unit]] = + Flow[T].transformMaterializing { () ⇒ + val p = Promise[Unit]() + val stage = + new PushStage[T, T] { + def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem) + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + p.success(()) + super.onUpstreamFinish(ctx) + } + override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { + p.success(()) // should this be failure as well? + super.onDownstreamFinish(ctx) + } + override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { + p.failure(cause) + super.onUpstreamFailure(cause, ctx) + } + } + + (stage, p.future) + } + + /** + * The autobahn tests define a weird API where every request must be a Websocket request and + * they will send a single websocket message with the result. Websocket everywhere? Strange, + * but somewhat consistent. + */ + def runToSingleText(uri: Uri): Future[String] = { + val sink = Sink.head[Message] + runWs(uri, Flow.wrap(sink, Source.lazyEmpty[Message])(Keep.left)).flatMap { + case tm: TextMessage ⇒ tm.textStream.runWith(Sink.fold("")(_ + _)) + } + } + def runToSingleJsonValue[T: JsonReader](uri: Uri): Future[T] = + runToSingleText(uri).map(_.parseJson.convertTo[T]) + + case class IndexedCaseInfo(index: Int, caseInfo: CaseInfo) + case class CaseResult(info: CaseInfo, status: CaseStatus) + + // {"behavior": "OK"} + case class CaseStatus(behavior: String) { + def isSuccessful: Boolean = behavior == "OK" + } + object CaseStatus { + import DefaultJsonProtocol._ + implicit def caseStatusFormat: JsonFormat[CaseStatus] = jsonFormat1(CaseStatus.apply) + } + + // {"id": "1.1.1", "description": "Send text message with payload 0."} + case class CaseInfo(id: String, description: String) + object CaseInfo { + import DefaultJsonProtocol._ + implicit def caseInfoFormat: JsonFormat[CaseInfo] = jsonFormat2(CaseInfo.apply) + } +} \ No newline at end of file diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSServerAutobahnTest.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSServerAutobahnTest.scala new file mode 100644 index 0000000000..85a3c26dee --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSServerAutobahnTest.scala @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.impl.engine.ws + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.HttpMethods._ +import akka.http.scaladsl.model.ws.{ Message, UpgradeToWebsocket } +import akka.http.scaladsl.model._ +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.Flow + +object WSServerAutobahnTest extends App { + implicit val system = ActorSystem("WSServerTest") + implicit val fm = ActorMaterializer() + + try { + val binding = Http().bindAndHandleSync({ + case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) if req.header[UpgradeToWebsocket].isDefined ⇒ + req.header[UpgradeToWebsocket] match { + case Some(upgrade) ⇒ upgrade.handleMessages(echoWebsocketService) // needed for running the autobahn test suite + case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!") + } + case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!") + }, + interface = "172.17.42.1", // adapt to your docker host IP address if necessary + port = 9001) + + Await.result(binding, 1.second) // throws if binding fails + println("Server online at http://172.17.42.1:9001") + println("Press RETURN to stop...") + Console.readLine() + } finally { + system.shutdown() + } + + def echoWebsocketService: Flow[Message, Message, Unit] = + Flow[Message] // just let message flow directly to the output +}