Merge pull request #19954 from agolubev/19481-Fix_akka-bench-agolubev

=str #19481 Fix akka-bench
This commit is contained in:
Konrad Malawski 2016-03-06 22:47:30 +01:00
commit c294a22d22
19 changed files with 80 additions and 84 deletions

View file

@ -34,7 +34,7 @@ class ActorCreationBenchmark {
}
@TearDown(Level.Trial)
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}

View file

@ -28,7 +28,7 @@ class ForkJoinActorBenchmark {
implicit var system: ActorSystem = _
@Setup(Level.Trial)
def setup() {
def setup():Unit = {
system = ActorSystem("ForkJoinActorBenchmark", ConfigFactory.parseString(
s"""| akka {
| log-dead-letters = off
@ -48,7 +48,7 @@ class ForkJoinActorBenchmark {
}
@TearDown(Level.Trial)
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
@ -56,7 +56,7 @@ class ForkJoinActorBenchmark {
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def pingPong = {
def pingPong():Unit = {
val ping = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
val pong = system.actorOf(Props[ForkJoinActorBenchmark.PingPong])
@ -72,7 +72,7 @@ class ForkJoinActorBenchmark {
@Benchmark
@Measurement(timeUnit = TimeUnit.MILLISECONDS)
@OperationsPerInvocation(messages)
def floodPipe = {
def floodPipe():Unit = {
val end = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], None))
val middle = system.actorOf(Props(classOf[ForkJoinActorBenchmark.Pipe], Some(end)))

View file

@ -26,7 +26,7 @@ class RouterPoolCreationBenchmark {
var size = 0
@TearDown(Level.Trial)
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}

View file

@ -56,13 +56,13 @@ class ScheduleBenchmark {
var promise: Promise[Any] = _
@Setup(Level.Iteration)
def setup() {
def setup():Unit = {
winner = (to * ratio + 1).toInt
promise = Promise[Any]()
}
@TearDown
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
@ -70,7 +70,7 @@ class ScheduleBenchmark {
def op(idx: Int) = if (idx == winner) promise.trySuccess(idx) else idx
@Benchmark
def oneSchedule = {
def oneSchedule():Unit = {
val aIdx = new AtomicInteger(1)
val tryWithNext = scheduler.schedule(0.millis, interval) {
val idx = aIdx.getAndIncrement
@ -84,7 +84,7 @@ class ScheduleBenchmark {
}
@Benchmark
def multipleScheduleOnce = {
def multipleScheduleOnce():Unit = {
val tryWithNext = (1 to to).foldLeft(0.millis -> List[Cancellable]()) {
case ((interv, c), idx)
(interv + interval, scheduler.scheduleOnce(interv) {

View file

@ -35,7 +35,7 @@ class StashCreationBenchmark {
val probe = TestProbe()
@TearDown(Level.Trial)
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}

View file

@ -25,7 +25,7 @@ class TellOnlyBenchmark {
implicit var system: ActorSystem = _
@Setup(Level.Trial)
def setup() {
def setup():Unit = {
system = ActorSystem("TellOnlyBenchmark", ConfigFactory.parseString(
s"""| akka {
| log-dead-letters = off
@ -50,7 +50,7 @@ class TellOnlyBenchmark {
}
@TearDown(Level.Trial)
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
}
@ -59,7 +59,7 @@ class TellOnlyBenchmark {
var probe: TestProbe = _
@Setup(Level.Iteration)
def setupIteration() {
def setupIteration():Unit = {
actor = system.actorOf(Props[TellOnlyBenchmark.Echo].withDispatcher("dropping-dispatcher"))
probe = TestProbe()
probe.watch(actor)
@ -71,7 +71,7 @@ class TellOnlyBenchmark {
}
@TearDown(Level.Iteration)
def shutdownIteration() {
def shutdownIteration():Unit = {
probe.send(actor, flipDrop)
probe.expectNoMsg(200.millis)
actor ! stop
@ -82,7 +82,7 @@ class TellOnlyBenchmark {
@Benchmark
@OutputTimeUnit(TimeUnit.MICROSECONDS)
def tell() = {
def tell():Unit = {
probe.send(actor, message)
}
}
@ -105,7 +105,7 @@ object TellOnlyBenchmark {
class DroppingMessageQueue extends UnboundedMailbox.MessageQueue {
@volatile var dropping = false
override def enqueue(receiver: ActorRef, handle: Envelope) = {
override def enqueue(receiver: ActorRef, handle: Envelope):Unit = {
if (handle.message == flipDrop) dropping = !dropping
else if (!dropping) super.enqueue(receiver, handle)
}

View file

@ -48,7 +48,7 @@ class ORSetMergeBenchmark {
var elem2: String = _
@Setup(Level.Trial)
def setup() {
def setup():Unit = {
set1 = (1 to set1Size).foldLeft(ORSet.empty[String])((s, n) => s.add(nextNode(), "elem" + n))
addFromSameNode = set1.add(nodeA, "elem" + set1Size + 1).merge(set1)
addFromOtherNode = set1.add(nodeB, "elem" + set1Size + 1).merge(set1)

View file

@ -45,7 +45,7 @@ class VersionVectorBenchmark {
var dot1: VersionVector = _
@Setup(Level.Trial)
def setup() {
def setup():Unit = {
vv1 = (1 to size).foldLeft(VersionVector.empty)((vv, n) => vv + nextNode())
vv2 = vv1 + nextNode()
vv3 = vv1 + nextNode()

View file

@ -38,7 +38,7 @@ class HttpBenchmark {
var pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), _] = _
@Setup
def setup() = {
def setup():Unit = {
val route = {
path("test") {
get {
@ -53,21 +53,21 @@ class HttpBenchmark {
}
@TearDown
def shutdown() = {
def shutdown():Unit ={
Await.ready(Http().shutdownAllConnectionPools(), 1.second)
binding.unbind()
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
def single_request() = {
def single_request():Unit = {
import system.dispatcher
val response = Await.result(Http().singleRequest(request), 1.second)
Await.result(Unmarshal(response.entity).to[String], 1.second)
}
@Benchmark
def single_request_pool() = {
def single_request_pool():Unit = {
import system.dispatcher
val (response, id) = Await.result(Source.single(HttpRequest(uri = "/test") -> 42).via(pool).runWith(Sink.head), 1.second)
Await.result(Unmarshal(response.get.entity).to[String], 1.second)

View file

@ -45,7 +45,7 @@ class LevelDbBatchingBenchmark {
val batch_200 = List.fill(200) { AtomicWrite(PersistentRepr("data", 12, "pa")) }
@Setup(Level.Trial)
def setup() {
def setup():Unit = {
sys = ActorSystem("sys")
deleteStorage(sys)
SharedLeveldbJournal.setStore(store, sys)
@ -55,7 +55,7 @@ class LevelDbBatchingBenchmark {
}
@TearDown(Level.Trial)
def tearDown() {
def tearDown():Unit = {
store ! PoisonPill
Thread.sleep(500)
@ -66,7 +66,7 @@ class LevelDbBatchingBenchmark {
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(1)
def write_1() = {
def write_1():Unit = {
probe.send(store, WriteMessages(batch_1))
probe.expectMsgType[Any]
}
@ -74,7 +74,7 @@ class LevelDbBatchingBenchmark {
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(10)
def writeBatch_10() = {
def writeBatch_10():Unit = {
probe.send(store, WriteMessages(batch_10))
probe.expectMsgType[Any]
}
@ -82,7 +82,7 @@ class LevelDbBatchingBenchmark {
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(100)
def writeBatch_100() = {
def writeBatch_100():Unit = {
probe.send(store, WriteMessages(batch_100))
probe.expectMsgType[Any]
}
@ -90,7 +90,7 @@ class LevelDbBatchingBenchmark {
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(200)
def writeBatch_200() = {
def writeBatch_200():Unit = {
probe.send(store, WriteMessages(batch_200))
probe.expectMsgType[Any]
}

View file

@ -43,7 +43,7 @@ class PersistentActorDeferBenchmark {
val data10k = (1 to 10000).toArray
@Setup
def setup() {
def setup():Unit = {
system = ActorSystem("test", config)
probe = TestProbe()(system)
@ -54,7 +54,7 @@ class PersistentActorDeferBenchmark {
}
@TearDown
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
@ -63,7 +63,7 @@ class PersistentActorDeferBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def tell_persistAsync_defer_persistAsync_reply() {
def tell_persistAsync_defer_persistAsync_reply():Unit = {
for (i <- data10k) persistAsync_defer.tell(i, probe.ref)
probe.expectMsg(data10k.last)
@ -71,7 +71,7 @@ class PersistentActorDeferBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def tell_persistAsync_defer_persistAsync_replyASAP() {
def tell_persistAsync_defer_persistAsync_replyASAP():Unit = {
for (i <- data10k) persistAsync_defer_replyASAP.tell(i, probe.ref)
probe.expectMsg(data10k.last)

View file

@ -35,7 +35,7 @@ class PersistentActorThroughputBenchmark {
val data10k = (1 to 10000).toArray
@Setup
def setup() {
def setup():Unit = {
system = ActorSystem("test", config)
probe = TestProbe()(system)
@ -52,7 +52,7 @@ class PersistentActorThroughputBenchmark {
}
@TearDown
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
@ -61,7 +61,7 @@ class PersistentActorThroughputBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def actor_normalActor_reply_baseline() {
def actor_normalActor_reply_baseline():Unit = {
for (i <- data10k) actor.tell(i, probe.ref)
probe.expectMsg(data10k.last)
@ -69,7 +69,7 @@ class PersistentActorThroughputBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persist_reply() {
def persistentActor_persist_reply():Unit = {
for (i <- data10k) persistPersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
@ -77,7 +77,7 @@ class PersistentActorThroughputBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persistAsync_reply() {
def persistentActor_persistAsync_reply():Unit = {
for (i <- data10k) persistAsync1PersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
@ -85,7 +85,7 @@ class PersistentActorThroughputBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_noPersist_reply() {
def persistentActor_noPersist_reply():Unit = {
for (i <- data10k) noPersistPersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))
@ -93,7 +93,7 @@ class PersistentActorThroughputBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persistAsync_replyRightOnCommandReceive() {
def persistentActor_persistAsync_replyRightOnCommandReceive():Unit = {
for (i <- data10k) persistAsyncQuickReplyPersistentActor.tell(i, probe.ref)
probe.expectMsg(Evt(data10k.last))

View file

@ -36,7 +36,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark {
val dataCount = 10000
@Setup
def setup() {
def setup():Unit = {
system = ActorSystem("PersistentActorWithAtLeastOnceDeliveryBenchmark", config)
probe = TestProbe()(system)
@ -51,7 +51,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark {
}
@TearDown
def shutdown() {
def shutdown():Unit = {
system.terminate()
Await.ready(system.whenTerminated, 15.seconds)
@ -60,7 +60,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persistAsync_with_AtLeastOnceDelivery() {
def persistentActor_persistAsync_with_AtLeastOnceDelivery():Unit = {
for (i <- 1 to dataCount)
persistAsyncPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref)
probe.expectMsg(20.seconds, Evt(dataCount))
@ -68,7 +68,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_persist_with_AtLeastOnceDelivery() {
def persistentActor_persist_with_AtLeastOnceDelivery():Unit = {
for (i <- 1 to dataCount)
persistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref)
probe.expectMsg(2.minutes, Evt(dataCount))
@ -76,7 +76,7 @@ class PersistentActorWithAtLeastOnceDeliveryBenchmark {
@Benchmark
@OperationsPerInvocation(10000)
def persistentActor_noPersist_with_AtLeastOnceDelivery() {
def persistentActor_noPersist_with_AtLeastOnceDelivery():Unit = {
for (i <- 1 to dataCount)
noPersistPersistentActorWithAtLeastOnceDelivery.tell(i, probe.ref)
probe.expectMsg(20.seconds, Evt(dataCount))

View file

@ -23,14 +23,14 @@ class FlatMapMergeBenchmark {
val NumberOfElements = 100000
@Param(Array("0", "1", "10"))
val NumberOfStreams = 0
var NumberOfStreams = 0
var graph: RunnableGraph[Future[Done]] = _
def createSource(count: Int): Graph[SourceShape[Int], NotUsed] = akka.stream.Fusing.aggressive(Source.repeat(1).take(count))
@Setup
def setup() {
def setup():Unit = {
val source = NumberOfStreams match {
// Base line: process NumberOfElements-many elements from a single source without using flatMapMerge
case 0 => createSource(NumberOfElements)
@ -43,13 +43,13 @@ class FlatMapMergeBenchmark {
}
@TearDown
def shutdown() {
def shutdown():Unit = {
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000) // Note: needs to match NumberOfElements.
def flat_map_merge_100k_elements() {
def flat_map_merge_100k_elements():Unit = {
Await.result(graph.run(), Duration.Inf)
}
}

View file

@ -54,7 +54,7 @@ class FlowMapBenchmark {
var materializer: ActorMaterializer = _
@Param(Array("true", "false"))
val UseGraphStageIdentity = false
var UseGraphStageIdentity = false
final val successMarker = Success(1)
final val successFailure = Success(new Exception)
@ -63,13 +63,13 @@ class FlowMapBenchmark {
var flow: Source[Int, NotUsed] = _
@Param(Array("8", "32", "128"))
val initialInputBufferSize = 0
var initialInputBufferSize = 0
@Param(Array("1", "5", "10"))
val numberOfMapOps = 0
var numberOfMapOps = 0
@Setup
def setup() {
def setup():Unit = {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialInputBufferSize, initialInputBufferSize)
@ -111,13 +111,13 @@ class FlowMapBenchmark {
}
@TearDown
def shutdown() {
def shutdown():Unit = {
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
@OperationsPerInvocation(100000)
def flow_map_100k_elements() {
def flow_map_100k_elements():Unit = {
val lock = new Lock() // todo rethink what is the most lightweight way to await for a streams completion
lock.acquire()

View file

@ -13,25 +13,25 @@ import org.openjdk.jmh.annotations._
class GraphBuilderBenchmark {
@Param(Array("1", "10", "100", "1000"))
val complexity = 0
var complexity = 0
@Benchmark
def flow_with_map() {
def flow_with_map():Unit = {
MaterializationBenchmark.flowWithMapBuilder(complexity)
}
@Benchmark
def graph_with_junctions() {
def graph_with_junctions():Unit ={
MaterializationBenchmark.graphWithJunctionsBuilder(complexity)
}
@Benchmark
def graph_with_nested_imports() {
def graph_with_nested_imports():Unit = {
MaterializationBenchmark.graphWithNestedImportsBuilder(complexity)
}
@Benchmark
def graph_with_imported_flow() {
def graph_with_imported_flow():Unit = {
MaterializationBenchmark.graphWithImportedFlowBuilder(complexity)
}
}

View file

@ -20,11 +20,11 @@ class InterpreterBenchmark {
final val data100k: Vector[Int] = (1 to 100000).toVector
@Param(Array("1", "5", "10"))
val numberOfIds: Int = 0
var numberOfIds: Int = 0
@Benchmark
@OperationsPerInvocation(100000)
def graph_interpreter_100k_elements() {
def graph_interpreter_100k_elements():Unit = {
new GraphInterpreterSpecKit {
new TestSetup {
val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int])

View file

@ -88,10 +88,10 @@ class MaterializationBenchmark {
var graphWithImportedFlow: RunnableGraph[NotUsed] = _
@Param(Array("1", "10", "100", "1000"))
val complexity = 0
var complexity = 0
@Setup
def setup() {
def setup():Unit = {
flowWithMap = flowWithMapBuilder(complexity)
graphWithJunctions = graphWithJunctionsBuilder(complexity)
graphWithNestedImports = graphWithNestedImportsBuilder(complexity)
@ -99,27 +99,23 @@ class MaterializationBenchmark {
}
@TearDown
def shutdown() {
def shutdown():Unit = {
Await.result(system.terminate(), 5.seconds)
}
@Benchmark
def flow_with_map() {
flowWithMap.run()
}
def flow_with_map():Unit = flowWithMap.run()
@Benchmark
def graph_with_junctions() {
graphWithJunctions.run()
}
def graph_with_junctions():Unit = graphWithJunctions.run()
@Benchmark
def graph_with_nested_imports() {
graphWithNestedImports.run()
}
def graph_with_nested_imports():Unit = graphWithNestedImports.run()
@Benchmark
def graph_with_imported_flow() {
graphWithImportedFlow.run()
}
def graph_with_imported_flow():Unit = graphWithImportedFlow.run()
}

View file

@ -43,14 +43,14 @@ class FileSourcesBenchmark {
}
@Param(Array("2048"))
val bufSize = 0
var bufSize = 0
var fileChannelSource: Source[ByteString, Future[IOResult]] = _
var fileInputStreamSource: Source[ByteString, Future[IOResult]] = _
var ioSourceLinesIterator: Source[ByteString, NotUsed] = _
@Setup
def setup() {
def setup():Unit = {
fileChannelSource = FileIO.fromFile(file, bufSize)
fileInputStreamSource = StreamConverters.fromInputStream(() new FileInputStream(file), bufSize)
ioSourceLinesIterator = Source.fromIterator(() scala.io.Source.fromFile(file).getLines()).map(ByteString(_))
@ -62,26 +62,26 @@ class FileSourcesBenchmark {
}
@TearDown
def shutdown() {
def shutdown():Unit = {
Await.result(system.terminate(), Duration.Inf)
}
@Benchmark
def fileChannel() = {
def fileChannel():Unit = {
val h = fileChannelSource.to(Sink.ignore).run()
Await.result(h, 30.seconds)
}
@Benchmark
def fileChannel_noReadAhead() = {
def fileChannel_noReadAhead():Unit = {
val h = fileChannelSource.withAttributes(Attributes.inputBuffer(1, 1)).to(Sink.ignore).run()
Await.result(h, 30.seconds)
}
@Benchmark
def inputStream() = {
def inputStream():Unit = {
val h = fileInputStreamSource.to(Sink.ignore).run()
Await.result(h, 30.seconds)
@ -93,7 +93,7 @@ class FileSourcesBenchmark {
* FileSourcesBenchmark.naive_ioSourceLinesIterator avgt 20 7067.944 ± 1341.847 ms/op
*/
@Benchmark
def naive_ioSourceLinesIterator() = {
def naive_ioSourceLinesIterator():Unit = {
val p = Promise[Done]()
ioSourceLinesIterator.to(Sink.onComplete(p.complete(_))).run()