Merge pull request #15586 from ktoso/forward-port-test-hardedning-ktoso

For Validation, 3 test hardening commits from release-2.3
This commit is contained in:
Konrad Malawski 2014-07-23 17:06:53 +02:00
commit 69aa415f43
6 changed files with 164 additions and 46 deletions

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.io.File
import java.util.concurrent.TimeUnit
import akka.actor._
import akka.persistence.journal.AsyncWriteTarget._
import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore}
import akka.testkit.TestProbe
import org.apache.commons.io.FileUtils
import org.openjdk.jmh.annotations._
/*
# OS: OSX 10.9.3
# CPU: Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz
# Date: Mon Jul 23 11:07:42 CEST 2014
This bench emulates what we provide with "Processor batching".
As expected, batching writes is better than writing 1 by 1.
The important thing though is that there didn't appear to be any "write latency spikes" throughout this bench.
[info] Benchmark Mode Samples Score Score error Units
[info] a.p.LevelDbBatchingBenchmark.write_1 avgt 20 0.799 0.011 ms/op
[info] a.p.LevelDbBatchingBenchmark.writeBatch_10 avgt 20 0.117 0.001 ms/op
[info] a.p.LevelDbBatchingBenchmark.writeBatch_100 avgt 20 0.050 0.000 ms/op
[info] a.p.LevelDbBatchingBenchmark.writeBatch_200 avgt 20 0.041 0.001 ms/op
*/
@Fork(1)
@Threads(10)
@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.AverageTime))
class LevelDbBatchingBenchmark {
var sys: ActorSystem = _
var probe: TestProbe = _
var store: ActorRef = _
val batch_1 = List.fill(1) { PersistentRepr("data", 12, "pa") }
val batch_10 = List.fill(10) { PersistentRepr("data", 12, "pa") }
val batch_100 = List.fill(100) { PersistentRepr("data", 12, "pa") }
val batch_200 = List.fill(200) { PersistentRepr("data", 12, "pa") }
@Setup(Level.Trial)
def setup() {
sys = ActorSystem("sys")
deleteStorage(sys)
SharedLeveldbJournal.setStore(store, sys)
probe = TestProbe()(sys)
store = sys.actorOf(Props[SharedLeveldbStore], "store")
}
@TearDown(Level.Trial)
def tearDown() {
store ! PoisonPill
Thread.sleep(500)
sys.shutdown()
sys.awaitTermination()
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(1)
def write_1() = {
probe.send(store, WriteMessages(batch_1))
probe.expectMsgType[Any]
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(10)
def writeBatch_10() = {
probe.send(store, WriteMessages(batch_10))
probe.expectMsgType[Any]
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(100)
def writeBatch_100() = {
probe.send(store, WriteMessages(batch_100))
probe.expectMsgType[Any]
}
@Benchmark
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
@OperationsPerInvocation(200)
def writeBatch_200() = {
probe.send(store, WriteMessages(batch_200))
probe.expectMsgType[Any]
}
// TOOLS
private def deleteStorage(sys: ActorSystem) {
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(sys.settings.config.getString(s)))
storageLocations.foreach(FileUtils.deleteDirectory)
}
}

View file

@ -87,7 +87,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp
expectNoMsg(1 second)
throttler ! SetTarget(Some(echo))
4 to 7 foreach { throttler ! _ }
within(0.5 seconds, 1.5 seconds) {
within(1.5 seconds) {
4 to 7 foreach { expectMsg(_) }
}
}
@ -104,7 +104,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp
}
expectNoMsg(1 second)
throttler ! SetTarget(Some(echo))
within(0.5 seconds, 1.5 seconds) {
within(1.5 seconds) {
4 to 7 foreach { expectMsg(_) }
}
}

View file

@ -1,16 +1,12 @@
package akka.persistence.journal.leveldb
import com.typesafe.config.ConfigFactory
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
import akka.persistence.PluginCleanup
import akka.persistence.journal.JournalSpec
import akka.persistence.{PersistenceSpec, PluginCleanup}
class LeveldbJournalJavaSpec extends JournalSpec with PluginCleanup {
lazy val config = ConfigFactory.parseString(
"""
|akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|akka.persistence.journal.leveldb.native = off
|akka.persistence.journal.leveldb.dir = "target/journal-java"
|akka.persistence.snapshot-store.local.dir = "target/snapshots-java/"
""".stripMargin)
lazy val config = PersistenceSpec.config(
"leveldb",
"LeveldbJournalJavaSpec",
extraConfig = Some("akka.persistence.journal.leveldb.native = off")
)
}

View file

@ -3,19 +3,15 @@
*/
package akka.persistence.journal.leveldb
import com.typesafe.config.ConfigFactory
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
import akka.persistence.PluginCleanup
import akka.persistence.journal.{JournalPerfSpec, JournalSpec}
import akka.persistence.{PersistenceSpec, PluginCleanup}
import org.scalatest.DoNotDiscover
@DoNotDiscover // because only checking that compilation is OK with JournalPerfSpec
class LeveldbJournalNativePerfSpec extends JournalSpec with JournalPerfSpec with PluginCleanup {
lazy val config = ConfigFactory.parseString(
"""
|akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|akka.persistence.journal.leveldb.native = on
|akka.persistence.journal.leveldb.dir = "target/journal-native"
|akka.persistence.snapshot-store.local.dir = "target/snapshots-native/"
""".stripMargin)
lazy val config = PersistenceSpec.config(
"leveldb",
"LeveldbJournalNativePerfSpec",
extraConfig = Some("akka.persistence.journal.leveldb.native = on")
)
}

View file

@ -1,16 +1,13 @@
package akka.persistence.journal.leveldb
import com.typesafe.config.ConfigFactory
import akka.persistence.journal.{ JournalPerfSpec, JournalSpec }
import akka.persistence.PluginCleanup
import akka.persistence.journal.JournalSpec
import akka.persistence.{PersistenceSpec, PluginCleanup}
class LeveldbJournalNativeSpec extends JournalSpec with PluginCleanup {
lazy val config = ConfigFactory.parseString(
"""
|akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
|akka.persistence.journal.leveldb.native = on
|akka.persistence.journal.leveldb.dir = "target/journal-native"
|akka.persistence.snapshot-store.local.dir = "target/snapshots-native/"
""".stripMargin)
lazy val config = PersistenceSpec.config(
"leveldb",
"LeveldbJournalNativeSpec",
extraConfig = Some("akka.persistence.journal.leveldb.native = on")
)
}

View file

@ -3,14 +3,13 @@
*/
package akka.persistence
import akka.actor._
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot, UnconfirmedWarning }
import akka.testkit._
import com.typesafe.config._
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import com.typesafe.config._
import akka.actor._
import akka.testkit._
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning
object AtLeastOnceDeliverySpec {
@ -43,10 +42,16 @@ object AtLeastOnceDeliverySpec {
override def persistenceId: String = name
// simplistic confirmation mechanism, to tell the requester that a snapshot succeeded
var lastSnapshotAskedForBy: Option[ActorRef] = None
def updateState(evt: Evt): Unit = evt match {
case AcceptedReq(payload, destination)
log.debug(s"deliver(destination, deliveryId ⇒ Action(deliveryId, $payload)), recovery: " + recoveryRunning)
deliver(destination, deliveryId Action(deliveryId, payload))
case ReqDone(id)
log.debug(s"confirmDelivery($id), recovery: " + recoveryRunning)
confirmDelivery(id)
}
@ -77,21 +82,30 @@ object AtLeastOnceDeliverySpec {
persist(ReqDone(id)) { evt updateState(evt) }
case Boom
log.debug("Boom!")
throw new RuntimeException("boom") with NoStackTrace
case SaveSnap
log.debug("Save snapshot!")
lastSnapshotAskedForBy = Some(sender())
saveSnapshot(Snap(getDeliverySnapshot))
case success: SaveSnapshotSuccess
log.debug("Snapshot success!")
lastSnapshotAskedForBy.map(_ ! success)
case w: UnconfirmedWarning
log.debug("Sender got unconfirmed warning {}", w)
testActor ! w
}
def receiveRecover: Receive = {
case evt: Evt updateState(evt)
case evt: Evt
updateState(evt)
case SnapshotOffer(_, Snap(deliverySnapshot))
setDeliverySnapshot(deliverySnapshot)
}
}
@ -134,7 +148,7 @@ object AtLeastOnceDeliverySpec {
}
abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender {
import AtLeastOnceDeliverySpec._
import akka.persistence.AtLeastOnceDeliverySpec._
"AtLeastOnceDelivery" must {
"deliver messages in order when nothing is lost" in {
@ -208,7 +222,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
val probeA = TestProbe()
val dst = system.actorOf(destinationProps(probeA.ref))
val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path)
val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name)
val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = false, destinations), name)
snd ! Req("a-1")
expectMsg(ReqAck)
probeA.expectMsg(Action(1, "a-1"))
@ -225,6 +239,9 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
// a-3 was lost
probeA.expectMsg(Action(4, "a-4"))
// after snapshot succeeded
expectMsgType[SaveSnapshotSuccess]
// trigger restart
snd ! Boom
@ -288,6 +305,10 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config)
}
}
class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec"))
class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(
// TODO disable debug logging once happy with stability of this test
ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec")
)
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))