+pro add Akka Streams HTTP to master build

> merged akka-bench-jmh-dev with akka-bench-jmh
> renamed akka-stream-tck to the more accurately named akka-stream-tests-tck
> merged java8 tests with normal tests in http
Konrad Malawski 2016-01-12 13:45:18 +01:00
parent 0763a150e2
commit f6147972d7
53 changed files with 503 additions and 21 deletions

View file

@@ -0,0 +1,31 @@
package akka
import org.openjdk.jmh.results.RunResult
import org.openjdk.jmh.runner.Runner
import org.openjdk.jmh.runner.options.CommandLineOptions
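// Entry point for the JMH benchmarks: expands the shorthand arguments "quick", "full" and
// "jitwatch" into the corresponding JMH options, passes all other arguments (e.g. benchmark
// name patterns) straight on to JMH, runs the selected benchmarks and prints a sorted,
// tab-separated summary of benchmark name, parameters, score and unit.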
object BenchRunner {
def main(args: Array[String]) = {
import scala.collection.JavaConversions._
val args2 = args.toList.flatMap {
case "quick" => "-i 1 -wi 1 -f1 -t1".split(" ").toList
case "full" => "-i 10 -wi 4 -f3 -t1".split(" ").toList
case "jitwatch" => "-jvmArgs=-XX:+UnlockDiagnosticVMOptions -XX:+TraceClassLoading -XX:+LogCompilation" :: Nil
case other => other :: Nil
}
val opts = new CommandLineOptions(args2: _*)
val results = new Runner(opts).run()
val report = results.map { result: RunResult =>
val bench = result.getParams.getBenchmark
val params = result.getParams.getParamsKeys.map(key => s"$key=${result.getParams.getParam(key)}").mkString("_")
val score = result.getAggregatedResult.getPrimaryResult.getScore.round
val unit = result.getAggregatedResult.getPrimaryResult.getScoreUnit
s"\t${bench}_${params}\t$score\t$unit"
}
report.toList.sorted.foreach(println)
}
}

View file

@@ -0,0 +1,76 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.http
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.unmarshalling._
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try
import com.typesafe.config.ConfigFactory
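// Benchmarks full HTTP round trips against a locally bound server answering GET /test with "ok",
// both via Http().singleRequest and via a cached host connection pool.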
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class HttpBenchmark {
val config = ConfigFactory.parseString(
"""
akka {
loglevel = "ERROR"
}""".stripMargin).withFallback(ConfigFactory.load())
implicit val system = ActorSystem("HttpBenchmark", config)
implicit val materializer = ActorMaterializer()
var binding: ServerBinding = _
var request: HttpRequest = _
var pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), _] = _
@Setup
def setup() = {
val route = {
path("test") {
get {
complete("ok")
}
}
}
binding = Await.result(Http().bindAndHandle(route, "127.0.0.1", 0), 1.second)
request = HttpRequest(uri = s"http://${binding.localAddress.getHostString}:${binding.localAddress.getPort}/test")
pool = Http().cachedHostConnectionPool[Int](binding.localAddress.getHostString, binding.localAddress.getPort)
}
@TearDown
def shutdown() = {
Await.ready(Http().shutdownAllConnectionPools(), 1.second)
binding.unbind()
system.shutdown()
system.awaitTermination()
}
@Benchmark
def single_request() = {
import system.dispatcher
val response = Await.result(Http().singleRequest(request), 1.second)
Await.result(Unmarshal(response.entity).to[String], 1.second)
}
@Benchmark
def single_request_pool() = {
import system.dispatcher
val (response, id) = Await.result(Source.single(HttpRequest(uri = "/test") -> 42).via(pool).runWith(Sink.head), 1.second)
Await.result(Unmarshal(response.get.entity).to[String], 1.second)
}
}

View file

@@ -0,0 +1,55 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import scala.concurrent._
import scala.concurrent.duration.Duration.Inf
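// Compares processing 100k elements from a single source (NumberOfStreams = 0) with merging the
// same total number of elements from several substreams via flatMapMerge.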
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class FlatMapMergeBenchmark {
implicit val system = ActorSystem("FlatMapMergeBenchmark")
val materializerSettings = ActorMaterializerSettings(system).withDispatcher("akka.test.stream-dispatcher")
implicit val materializer = ActorMaterializer(materializerSettings)
val NumberOfElements = 100000
@Param(Array("0", "1", "10"))
val NumberOfStreams = 0
var graph: RunnableGraph[Future[Unit]] = _
def createSource(count: Int): Graph[SourceShape[Int], Unit] = akka.stream.Fusing.aggressive(Source.repeat(1).take(count))
@Setup
def setup() {
val source = NumberOfStreams match {
// Base line: process NumberOfElements-many elements from a single source without using flatMapMerge
case 0 => createSource(NumberOfElements)
// Stream merging: process NumberOfElements-many elements from n sources, each producing (NumberOfElements/n)-many elements
case n =>
val subSource = createSource(NumberOfElements / n)
Source.repeat(()).take(n).flatMapMerge(n, _ => subSource)
}
graph = Source.fromGraph(source).toMat(Sink.ignore)(Keep.right)
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
}
@Benchmark
@OperationsPerInvocation(100000) // Note: needs to match NumberOfElements.
def flat_map_merge_100k_elements() {
Await.result(graph.run(), Inf)
}
}

View file

@@ -0,0 +1,136 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import com.typesafe.config.ConfigFactory
import org.openjdk.jmh.annotations._
import scala.concurrent.Lock
import scala.util.Success
import akka.stream.impl.fusing.GraphStages
import org.reactivestreams._
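// Pushes 100k elements from a synchronous, zero-overhead Publisher through a chain of identity
// stages, varying the number of map operators, the input buffer size, and whether the fused
// GraphStages.identity or a plain Flow[Int].map(identity) is used.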
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class FlowMapBenchmark {
val config = ConfigFactory.parseString(
"""
akka {
log-config-on-start = off
log-dead-letters-during-shutdown = off
loglevel = "WARNING"
actor.default-dispatcher {
#executor = "thread-pool-executor"
throughput = 1024
}
actor.default-mailbox {
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
}
test {
timefactor = 1.0
filter-leeway = 3s
single-expect-default = 3s
default-timeout = 5s
calling-thread-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
}""".stripMargin).withFallback(ConfigFactory.load())
implicit val system = ActorSystem("test", config)
var materializer: ActorMaterializer = _
@Param(Array("true", "false"))
val UseGraphStageIdentity = false
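// true: the chained stage is the fused GraphStages.identity; false: a plain Flow[Int].map(identity).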
final val successMarker = Success(1)
final val successFailure = Success(new Exception)
// safe to be benchmark scoped because the flows we construct in this bench are stateless
var flow: Source[Int, Unit] = _
@Param(Array("8", "32", "128"))
val initialInputBufferSize = 0
@Param(Array("1", "5", "10"))
val numberOfMapOps = 0
@Setup
def setup() {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialInputBufferSize, initialInputBufferSize)
materializer = ActorMaterializer(settings)
// Important to use a synchronous, zero overhead source, otherwise the slowness of the source
// might bias the benchmark, since the stream always adjusts the rate to the slowest stage.
val syncTestPublisher = new Publisher[Int] {
override def subscribe(s: Subscriber[_ >: Int]): Unit = {
val sub = new Subscription {
var counter = 0 // Piggyback on caller thread, no need for volatile
override def request(n: Long): Unit = {
var i = n
while (i > 0) {
s.onNext(counter)
counter += 1
if (counter == 100000) {
s.onComplete()
return
}
i -= 1
}
}
override def cancel(): Unit = ()
}
s.onSubscribe(sub)
}
}
flow = mkMaps(Source.fromPublisher(syncTestPublisher), numberOfMapOps) {
if (UseGraphStageIdentity)
GraphStages.identity[Int]
else
Flow[Int].map(identity)
}
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
}
@Benchmark
@OperationsPerInvocation(100000)
def flow_map_100k_elements() {
val lock = new Lock() // TODO rethink the most lightweight way to await a stream's completion
lock.acquire()
flow.runWith(Sink.onComplete(_ => lock.release()))(materializer)
lock.acquire()
}
// source setup
private def mkMaps[O, Mat](source: Source[O, Mat], count: Int)(flow: => Graph[FlowShape[O, O], _]): Source[O, Mat] = {
var f = source
for (_ <- 1 to count)
f = f.via(flow)
f
}
}

View file

@@ -0,0 +1,37 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
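// Measures only blueprint construction: the builders from MaterializationBenchmark are invoked,
// but nothing is materialized or run.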
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class GraphBuilderBenchmark {
@Param(Array("1", "10", "100", "1000"))
val complexity = 0
@Benchmark
def flow_with_map() {
MaterializationBenchmark.flowWithMapBuilder(complexity)
}
@Benchmark
def graph_with_junctions() {
MaterializationBenchmark.graphWithJunctionsBuilder(complexity)
}
@Benchmark
def graph_with_nested_imports() {
MaterializationBenchmark.graphWithNestedImportsBuilder(complexity)
}
@Benchmark
def graph_with_imported_flow() {
MaterializationBenchmark.graphWithImportedFlowBuilder(complexity)
}
}

View file

@@ -0,0 +1,93 @@
package akka.stream
import akka.event._
import akka.stream.impl.fusing.{ GraphInterpreterSpecKit, GraphStages, Map => MapStage }
import akka.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
import akka.stream.stage._
import org.openjdk.jmh.annotations._
import scala.concurrent.Lock
import java.util.concurrent.TimeUnit
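// Runs 100k elements through a chain of identity stages connected directly to the
// GraphInterpreter via upstream/downstream boundary stages, so per-element interpreter overhead
// is measured while bypassing normal stream materialization.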
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class InterpreterBenchmark {
import InterpreterBenchmark._
// manual, and not via @Param, because we want @OperationsPerInvocation on our tests
final val data100k: Vector[Int] = (1 to 100000).toVector
@Param(Array("1", "5", "10"))
val numberOfIds: Int = 0
@Benchmark
@OperationsPerInvocation(100000)
def graph_interpreter_100k_elements() {
new GraphInterpreterSpecKit {
new TestSetup {
val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int])
val source = new GraphDataSource("source", data100k)
val sink = new GraphDataSink[Int]("sink", data100k.size)
val b = builder(identities: _*)
.connect(source, identities.head.in)
.connect(identities.last.out, sink)
// FIXME: This should not be here, this is pure setup overhead
for (i <- (0 until identities.size - 1)) {
b.connect(identities(i).out, identities(i + 1).in)
}
b.init()
sink.requestOne()
interpreter.execute(Int.MaxValue)
}
}
}
}
object InterpreterBenchmark {
case class GraphDataSource[T](override val toString: String, data: Vector[T]) extends UpstreamBoundaryStageLogic[T] {
var idx: Int = 0
override val out: akka.stream.Outlet[T] = Outlet[T]("out")
out.id = 0
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (idx < data.size) {
push(out, data(idx))
idx += 1
} else {
completeStage()
}
}
override def onDownstreamFinish(): Unit = completeStage()
})
}
case class GraphDataSink[T](override val toString: String, var expected: Int) extends DownstreamBoundaryStageLogic[T] {
override val in: akka.stream.Inlet[T] = Inlet[T]("in")
in.id = 0
setHandler(in, new InHandler {
override def onPush(): Unit = {
expected -= 1
if (expected > 0) pull(in)
// Otherwise do nothing, it will exit the interpreter
}
})
def requestOne(): Unit = pull(in)
}
val NoopBus = new LoggingBus {
override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = true
override def publish(event: Event): Unit = ()
override def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = true
override def unsubscribe(subscriber: Subscriber): Unit = ()
}
}

View file

@@ -0,0 +1,122 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import org.openjdk.jmh.annotations._
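// Blueprint builders shared by MaterializationBenchmark (which materializes them) and
// GraphBuilderBenchmark (which only constructs them).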
object MaterializationBenchmark {
val flowWithMapBuilder = (numOfCombinators: Int) => {
var source = Source.single(())
for (_ <- 1 to numOfCombinators) {
source = source.map(identity)
}
source.to(Sink.ignore)
}
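// Broadcasts a single element to numOfJunctions outlets and folds them back together through a
// chain of two-input Merge junctions.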
val graphWithJunctionsBuilder = (numOfJunctions: Int) =>
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Unit](numOfJunctions))
var outlet = broadcast.out(0)
for (i <- 1 until numOfJunctions) {
val merge = b.add(Merge[Unit](2))
outlet ~> merge
broadcast.out(i) ~> merge
outlet = merge.out
}
Source.single(()) ~> broadcast
outlet ~> Sink.ignore
ClosedShape
})
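// Wraps the flow in another imported GraphDSL module numOfNestedGraphs times, yielding a deeply
// nested blueprint.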
val graphWithNestedImportsBuilder = (numOfNestedGraphs: Int) => {
var flow: Graph[FlowShape[Unit, Unit], Unit] = Flow[Unit].map(identity)
for (_ <- 1 to numOfNestedGraphs) {
flow = GraphDSL.create(flow) { b => flow =>
FlowShape(flow.in, flow.out)
}
}
RunnableGraph.fromGraph(GraphDSL.create(flow) { implicit b => flow =>
import GraphDSL.Implicits._
Source.single(()) ~> flow ~> Sink.ignore
ClosedShape
})
}
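// Imports the same Flow blueprint numOfFlows times into a single graph and chains the imported
// copies linearly between source and sink.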
val graphWithImportedFlowBuilder = (numOfFlows: Int) =>
RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b => source =>
import GraphDSL.Implicits._
val flow = Flow[Unit].map(identity)
var out: Outlet[Unit] = source.out
for (i <- 0 until numOfFlows) {
val flowShape = b.add(flow)
out ~> flowShape
out = flowShape.outlet
}
out ~> Sink.ignore
ClosedShape
})
}
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class MaterializationBenchmark {
import MaterializationBenchmark._
implicit val system = ActorSystem("MaterializationBenchmark")
implicit val materializer = ActorMaterializer()
var flowWithMap: RunnableGraph[Unit] = _
var graphWithJunctions: RunnableGraph[Unit] = _
var graphWithNestedImports: RunnableGraph[Unit] = _
var graphWithImportedFlow: RunnableGraph[Unit] = _
@Param(Array("1", "10", "100", "1000"))
val complexity = 0
@Setup
def setup() {
flowWithMap = flowWithMapBuilder(complexity)
graphWithJunctions = graphWithJunctionsBuilder(complexity)
graphWithNestedImports = graphWithNestedImportsBuilder(complexity)
graphWithImportedFlow = graphWithImportedFlowBuilder(complexity)
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
}
@Benchmark
def flow_with_map() {
flowWithMap.run()
}
@Benchmark
def graph_with_junctions() {
graphWithJunctions.run()
}
@Benchmark
def graph_with_nested_imports() {
graphWithNestedImports.run()
}
@Benchmark
def graph_with_imported_flow() {
graphWithImportedFlow.run()
}
}

View file

@@ -0,0 +1,105 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
import java.io.{FileInputStream, File}
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.stream.{Attributes, ActorMaterializer}
import akka.stream.scaladsl._
import akka.util.ByteString
import org.openjdk.jmh.annotations._
import scala.concurrent.duration._
import scala.concurrent.{Promise, Await, Future}
/**
* Benchmark (bufSize) Mode Cnt Score Error Units
* FileSourcesBenchmark.fileChannel 2048 avgt 100 1140.192 ± 55.184 ms/op
*/
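// Reads the same generated temp file (~390k lines of 2 KiB each) via FileIO.fromFile,
// StreamConverters.fromInputStream, and a naive scala.io.Source line iterator.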
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
class FileSourcesBenchmark {
implicit val system = ActorSystem("file-sources-benchmark")
implicit val materializer = ActorMaterializer()
val file: File = {
val line = ByteString("x" * 2048 + "\n")
val f = File.createTempFile(getClass.getName, ".bench.tmp")
f.deleteOnExit()
val ft = Source.fromIterator(() => Iterator.continually(line))
.take(10 * 39062) // adjust as needed
.runWith(FileIO.toFile(f))
Await.result(ft, 30.seconds)
f
}
@Param(Array("2048"))
val bufSize = 0
var fileChannelSource: Source[ByteString, Future[Long]] = _
var fileInputStreamSource: Source[ByteString, Future[Long]] = _
var ioSourceLinesIterator: Source[ByteString, Unit] = _
@Setup
def setup() {
fileChannelSource = FileIO.fromFile(file, bufSize)
fileInputStreamSource = StreamConverters.fromInputStream(() => new FileInputStream(file), bufSize)
ioSourceLinesIterator = Source.fromIterator(() => scala.io.Source.fromFile(file).getLines()).map(ByteString(_))
}
@TearDown
def teardown(): Unit = {
file.delete()
}
@TearDown
def shutdown() {
system.shutdown()
system.awaitTermination()
}
@Benchmark
def fileChannel() = {
val h = fileChannelSource.to(Sink.ignore).run()
Await.result(h, 30.seconds)
}
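// Same FileIO source, but with a one-element input buffer so the file stage cannot read ahead
// of downstream demand.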
@Benchmark
def fileChannel_noReadAhead() = {
val h = fileChannelSource.withAttributes(Attributes.inputBuffer(1, 1)).to(Sink.ignore).run()
Await.result(h, 30.seconds)
}
@Benchmark
def inputStream() = {
val h = fileInputStreamSource.to(Sink.ignore).run()
Await.result(h, 30.seconds)
}
/*
* The previous status quo was very slow:
* Benchmark Mode Cnt Score Error Units
* FileSourcesBenchmark.naive_ioSourceLinesIterator avgt 20 7067.944 ± 1341.847 ms/op
*/
@Benchmark
def naive_ioSourceLinesIterator() = {
val p = Promise[Unit]()
ioSourceLinesIterator.to(Sink.onComplete(p.complete(_))).run()
Await.result(p.future, 30.seconds)
}
}