=htc Websocket Autobahn Suite test runners and documentation
This commit is contained in:
parent
870ff2bbdc
commit
08aa903408
4 changed files with 390 additions and 0 deletions
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.impl.engine.ws
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.model.ws.{ TextMessage, BinaryMessage, Message }
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{ Failure, Success }
|
||||
|
||||
/**
|
||||
* An example App that runs a quick test against the websocket server at wss://echo.websocket.org
|
||||
*/
|
||||
object EchoTestClientApp extends App {
|
||||
implicit val system = ActorSystem()
|
||||
import system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
||||
def delayedCompletion(delay: FiniteDuration): Source[Nothing, Unit] =
|
||||
Source.single(1)
|
||||
.mapAsync(1)(_ ⇒ akka.pattern.after(delay, system.scheduler)(Future(1)))
|
||||
.drop(1).asInstanceOf[Source[Nothing, Unit]]
|
||||
|
||||
def messages: List[Message] =
|
||||
List(
|
||||
TextMessage("Test 1"),
|
||||
BinaryMessage(ByteString("abc")),
|
||||
TextMessage("Test 2"),
|
||||
BinaryMessage(ByteString("def")))
|
||||
|
||||
def source: Source[Message, Unit] =
|
||||
Source(messages) ++ delayedCompletion(1.second) // otherwise, we may start closing too soon
|
||||
|
||||
def sink: Sink[Message, Future[Seq[String]]] =
|
||||
Flow[Message]
|
||||
.mapAsync(1) {
|
||||
case tm: TextMessage ⇒
|
||||
tm.textStream.runWith(Sink.fold("")(_ + _)).map(str ⇒ s"TextMessage: '$str'")
|
||||
case bm: BinaryMessage ⇒
|
||||
bm.dataStream.runWith(Sink.fold(ByteString.empty)(_ ++ _)).map(bs ⇒ s"BinaryMessage: '${bs.utf8String}'")
|
||||
}
|
||||
.grouped(10000)
|
||||
.toMat(Sink.head)(Keep.right)
|
||||
|
||||
def echoClient = Flow.wrap(sink, source)(Keep.left)
|
||||
|
||||
val (upgrade, res) = Http().singleWebsocketRequest("wss://echo.websocket.org", echoClient)
|
||||
res onComplete {
|
||||
case Success(res) ⇒
|
||||
println("Run successful. Got these elements:")
|
||||
res.foreach(println)
|
||||
system.shutdown()
|
||||
case Failure(e) ⇒
|
||||
println("Run failed.")
|
||||
e.printStackTrace()
|
||||
system.shutdown()
|
||||
}
|
||||
|
||||
system.scheduler.scheduleOnce(10.seconds)(system.shutdown())
|
||||
}
|
||||
|
|
@ -0,0 +1,227 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.impl.engine.ws
|
||||
|
||||
import scala.concurrent.{ Promise, Future }
|
||||
import scala.util.{ Try, Failure, Success }
|
||||
|
||||
import spray.json._
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.io.SslTlsPlacebo
|
||||
import akka.stream.stage.{ TerminationDirective, Context, SyncDirective, PushStage }
|
||||
import akka.stream.scaladsl._
|
||||
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.model.Uri
|
||||
import akka.http.scaladsl.model.ws._
|
||||
|
||||
object WSClientAutobahnTest extends App {
|
||||
implicit val system = ActorSystem()
|
||||
import system.dispatcher
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
||||
val Agent = "akka-http"
|
||||
val Parallelism = 4
|
||||
|
||||
val getCaseCountUri: Uri =
|
||||
s"ws://localhost:9001/getCaseCount"
|
||||
|
||||
def runCaseUri(caseIndex: Int, agent: String): Uri =
|
||||
s"ws://localhost:9001/runCase?case=$caseIndex&agent=$agent"
|
||||
|
||||
def getCaseStatusUri(caseIndex: Int, agent: String): Uri =
|
||||
s"ws://localhost:9001/getCaseStatus?case=$caseIndex&agent=$agent"
|
||||
|
||||
def getCaseInfoUri(caseIndex: Int): Uri =
|
||||
s"ws://localhost:9001/getCaseInfo?case=$caseIndex"
|
||||
|
||||
def updateReportsUri(agent: String): Uri =
|
||||
s"ws://localhost:9001/updateReports?agent=$agent"
|
||||
|
||||
def runCase(caseIndex: Int, agent: String = Agent): Future[CaseStatus] =
|
||||
runWs(runCaseUri(caseIndex, agent), echo).recover { case _ ⇒ () }.flatMap { _ ⇒
|
||||
getCaseStatus(caseIndex, agent)
|
||||
}
|
||||
|
||||
def richRunCase(caseIndex: Int, agent: String = Agent): Future[CaseResult] = {
|
||||
val info = getCaseInfo(caseIndex)
|
||||
val startMillis = System.currentTimeMillis()
|
||||
val status = runCase(caseIndex, agent).map { res ⇒
|
||||
val lastedMillis = System.currentTimeMillis() - startMillis
|
||||
(res, lastedMillis)
|
||||
}
|
||||
import Console._
|
||||
info.flatMap { i ⇒
|
||||
val prefix = f"$YELLOW${i.caseInfo.id}%-7s$RESET - $WHITE${i.caseInfo.description}$RESET ... "
|
||||
//println(prefix)
|
||||
|
||||
status.onComplete {
|
||||
case Success((CaseStatus(status), millis)) ⇒
|
||||
val color = if (status == "OK") GREEN else RED
|
||||
println(f"${color}$status%-15s$RESET$millis%5d ms $prefix")
|
||||
case Failure(e) ⇒
|
||||
println(s"$prefix${RED}failed with '${e.getMessage}'$RESET")
|
||||
}
|
||||
|
||||
status.map(s ⇒ CaseResult(i.caseInfo, s._1))
|
||||
}
|
||||
}
|
||||
|
||||
def getCaseCount(): Future[Int] =
|
||||
runToSingleText(getCaseCountUri).map(_.toInt)
|
||||
|
||||
def getCaseInfo(caseId: Int): Future[IndexedCaseInfo] =
|
||||
runToSingleJsonValue[CaseInfo](getCaseInfoUri(caseId)).map(IndexedCaseInfo(caseId, _))
|
||||
|
||||
def getCaseStatus(caseId: Int, agent: String = Agent): Future[CaseStatus] =
|
||||
runToSingleJsonValue[CaseStatus](getCaseStatusUri(caseId, agent))
|
||||
|
||||
def updateReports(agent: String = Agent): Future[Unit] =
|
||||
runToSingleText(updateReportsUri(agent)).map(_ ⇒ ())
|
||||
|
||||
/**
|
||||
* Map from textual case ID (like 1.1.1) to IndexedCaseInfo
|
||||
* @return
|
||||
*/
|
||||
def getCaseMap(): Future[Map[String, IndexedCaseInfo]] = {
|
||||
val res =
|
||||
getCaseCount().flatMap { count ⇒
|
||||
println(s"Retrieving case info for $count cases...")
|
||||
Future.traverse(1 to count)(getCaseInfo).map(_.map(e ⇒ e.caseInfo.id -> e).toMap)
|
||||
}
|
||||
res.foreach { res ⇒
|
||||
println(s"Received info for ${res.size} cases")
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
def echo = Flow[Message].viaMat(completionSignal)(Keep.right)
|
||||
|
||||
if (args.size >= 1) {
|
||||
// run one
|
||||
val testId = args(0)
|
||||
println(s"Trying to run test $testId")
|
||||
getCaseMap().flatMap { map ⇒
|
||||
val info = map(testId)
|
||||
richRunCase(info.index)
|
||||
}.onComplete {
|
||||
case Success(res) ⇒
|
||||
println(s"Run successfully finished!")
|
||||
updateReportsAndShutdown()
|
||||
case Failure(e) ⇒
|
||||
println("Run failed with this exception")
|
||||
e.printStackTrace()
|
||||
updateReportsAndShutdown()
|
||||
}
|
||||
} else {
|
||||
println("Running complete test suite")
|
||||
getCaseCount().flatMap { count ⇒
|
||||
println(s"Found $count tests.")
|
||||
Source(1 to count).mapAsyncUnordered(Parallelism)(richRunCase(_)).grouped(count).runWith(Sink.head)
|
||||
}.map { results ⇒
|
||||
val grouped =
|
||||
results.groupBy(_.status.behavior)
|
||||
|
||||
import Console._
|
||||
println(s"${results.size} tests run.")
|
||||
println()
|
||||
println(s"${GREEN}OK$RESET: ${grouped.getOrElse("OK", Nil).size}")
|
||||
val notOk = grouped.filterNot(_._1 == "OK")
|
||||
notOk.toSeq.sortBy(_._2.size).foreach {
|
||||
case (status, cases) ⇒ println(s"$RED$status$RESET: ${cases.size}")
|
||||
}
|
||||
println()
|
||||
println("Not OK tests")
|
||||
println()
|
||||
results.filterNot(_.status.behavior == "OK").foreach { r ⇒
|
||||
println(f"$RED${r.status.behavior}%-20s$RESET $YELLOW${r.info.id}%-7s$RESET - $WHITE${r.info.description}$RESET")
|
||||
}
|
||||
|
||||
()
|
||||
}
|
||||
.onComplete(completion)
|
||||
}
|
||||
|
||||
def completion[T]: Try[T] ⇒ Unit = {
|
||||
case Success(res) ⇒
|
||||
println(s"Run successfully finished!")
|
||||
updateReportsAndShutdown()
|
||||
case Failure(e) ⇒
|
||||
println("Run failed with this exception")
|
||||
e.printStackTrace()
|
||||
updateReportsAndShutdown()
|
||||
}
|
||||
def updateReportsAndShutdown(): Unit =
|
||||
updateReports().onComplete { res ⇒
|
||||
println("Reports should now be accessible at http://localhost:8080/cwd/reports/clients/index.html")
|
||||
system.shutdown()
|
||||
}
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import system.dispatcher
|
||||
system.scheduler.scheduleOnce(60.seconds)(system.shutdown())
|
||||
|
||||
def runWs[T](uri: Uri, clientFlow: Flow[Message, Message, T]): T =
|
||||
Http().singleWebsocketRequest(uri, clientFlow)._2
|
||||
|
||||
def completionSignal[T]: Flow[T, T, Future[Unit]] =
|
||||
Flow[T].transformMaterializing { () ⇒
|
||||
val p = Promise[Unit]()
|
||||
val stage =
|
||||
new PushStage[T, T] {
|
||||
def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem)
|
||||
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
|
||||
p.success(())
|
||||
super.onUpstreamFinish(ctx)
|
||||
}
|
||||
override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = {
|
||||
p.success(()) // should this be failure as well?
|
||||
super.onDownstreamFinish(ctx)
|
||||
}
|
||||
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
|
||||
p.failure(cause)
|
||||
super.onUpstreamFailure(cause, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
(stage, p.future)
|
||||
}
|
||||
|
||||
/**
|
||||
* The autobahn tests define a weird API where every request must be a Websocket request and
|
||||
* they will send a single websocket message with the result. Websocket everywhere? Strange,
|
||||
* but somewhat consistent.
|
||||
*/
|
||||
def runToSingleText(uri: Uri): Future[String] = {
|
||||
val sink = Sink.head[Message]
|
||||
runWs(uri, Flow.wrap(sink, Source.lazyEmpty[Message])(Keep.left)).flatMap {
|
||||
case tm: TextMessage ⇒ tm.textStream.runWith(Sink.fold("")(_ + _))
|
||||
}
|
||||
}
|
||||
def runToSingleJsonValue[T: JsonReader](uri: Uri): Future[T] =
|
||||
runToSingleText(uri).map(_.parseJson.convertTo[T])
|
||||
|
||||
case class IndexedCaseInfo(index: Int, caseInfo: CaseInfo)
|
||||
case class CaseResult(info: CaseInfo, status: CaseStatus)
|
||||
|
||||
// {"behavior": "OK"}
|
||||
case class CaseStatus(behavior: String) {
|
||||
def isSuccessful: Boolean = behavior == "OK"
|
||||
}
|
||||
object CaseStatus {
|
||||
import DefaultJsonProtocol._
|
||||
implicit def caseStatusFormat: JsonFormat[CaseStatus] = jsonFormat1(CaseStatus.apply)
|
||||
}
|
||||
|
||||
// {"id": "1.1.1", "description": "Send text message with payload 0."}
|
||||
case class CaseInfo(id: String, description: String)
|
||||
object CaseInfo {
|
||||
import DefaultJsonProtocol._
|
||||
implicit def caseInfoFormat: JsonFormat[CaseInfo] = jsonFormat2(CaseInfo.apply)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.impl.engine.ws
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.http.scaladsl.Http
|
||||
import akka.http.scaladsl.model.HttpMethods._
|
||||
import akka.http.scaladsl.model.ws.{ Message, UpgradeToWebsocket }
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.Flow
|
||||
|
||||
object WSServerAutobahnTest extends App {
|
||||
implicit val system = ActorSystem("WSServerTest")
|
||||
implicit val fm = ActorMaterializer()
|
||||
|
||||
try {
|
||||
val binding = Http().bindAndHandleSync({
|
||||
case req @ HttpRequest(GET, Uri.Path("/"), _, _, _) if req.header[UpgradeToWebsocket].isDefined ⇒
|
||||
req.header[UpgradeToWebsocket] match {
|
||||
case Some(upgrade) ⇒ upgrade.handleMessages(echoWebsocketService) // needed for running the autobahn test suite
|
||||
case None ⇒ HttpResponse(400, entity = "Not a valid websocket request!")
|
||||
}
|
||||
case _: HttpRequest ⇒ HttpResponse(404, entity = "Unknown resource!")
|
||||
},
|
||||
interface = "172.17.42.1", // adapt to your docker host IP address if necessary
|
||||
port = 9001)
|
||||
|
||||
Await.result(binding, 1.second) // throws if binding fails
|
||||
println("Server online at http://172.17.42.1:9001")
|
||||
println("Press RETURN to stop...")
|
||||
Console.readLine()
|
||||
} finally {
|
||||
system.shutdown()
|
||||
}
|
||||
|
||||
def echoWebsocketService: Flow[Message, Message, Unit] =
|
||||
Flow[Message] // just let message flow directly to the output
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue