From ba3543b0462b014ac6998bfb1589bd45a851322f Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Tue, 22 Jul 2014 12:18:06 +0200 Subject: [PATCH 1/3] =per #15575 lessen timing issue in AtLeastOnce spec * lower redelivery interval does not change semantics here (no one answers anyway), and lessens the changes to be "unlucky" with gc pauses * was unable to reproduce error with snapshotting in 80 builds on jenkins. Brainstormed possible errors but not found yet.. --- .../LevelDbBatchingBenchmark.scala | 108 ++++++++++++++++++ .../persistence/AtLeastOnceDeliverySpec.scala | 43 +++++-- 2 files changed, 140 insertions(+), 11 deletions(-) create mode 100644 akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala new file mode 100644 index 0000000000..ced0104868 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import java.io.File +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.persistence.journal.AsyncWriteTarget._ +import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore} +import akka.testkit.TestProbe +import org.apache.commons.io.FileUtils +import org.openjdk.jmh.annotations._ + +/* + # OS: OSX 10.9.3 + # CPU: Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz + # Date: Mon Jul 23 11:07:42 CEST 2014 + + This bench emulates what we provide with "Processor batching". + As expected, batching writes is better than writing 1 by 1. + The important thing though is that there didn't appear to be any "write latency spikes" throughout this bench. + +[info] Benchmark Mode Samples Score Score error Units +[info] a.p.LevelDbBatchingBenchmark.write_1 avgt 20 0.799 0.011 ms/op +[info] a.p.LevelDbBatchingBenchmark.writeBatch_10 avgt 20 0.117 0.001 ms/op +[info] a.p.LevelDbBatchingBenchmark.writeBatch_100 avgt 20 0.050 0.000 ms/op +[info] a.p.LevelDbBatchingBenchmark.writeBatch_200 avgt 20 0.041 0.001 ms/op + */ +@Fork(1) +@Threads(10) +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.AverageTime)) +class LevelDbBatchingBenchmark { + + var sys: ActorSystem = _ + var probe: TestProbe = _ + var store: ActorRef = _ + + val batch_1 = List.fill(1) { PersistentRepr("data", 12, "pa") } + val batch_10 = List.fill(10) { PersistentRepr("data", 12, "pa") } + val batch_100 = List.fill(100) { PersistentRepr("data", 12, "pa") } + val batch_200 = List.fill(200) { PersistentRepr("data", 12, "pa") } + + @Setup(Level.Trial) + def setup() { + sys = ActorSystem("sys") + deleteStorage(sys) + SharedLeveldbJournal.setStore(store, sys) + + probe = TestProbe()(sys) + store = sys.actorOf(Props[SharedLeveldbStore], "store") + } + + @TearDown(Level.Trial) + def tearDown() { + store ! PoisonPill + Thread.sleep(500) + + sys.shutdown() + sys.awaitTermination() + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(1) + def write_1() = { + probe.send(store, WriteMessages(batch_1)) + probe.expectMsgType[Any] + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(10) + def writeBatch_10() = { + probe.send(store, WriteMessages(batch_10)) + probe.expectMsgType[Any] + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(100) + def writeBatch_100() = { + probe.send(store, WriteMessages(batch_100)) + probe.expectMsgType[Any] + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(200) + def writeBatch_200() = { + probe.send(store, WriteMessages(batch_200)) + probe.expectMsgType[Any] + } + + // TOOLS + + private def deleteStorage(sys: ActorSystem) { + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(sys.settings.config.getString(s))) + + storageLocations.foreach(FileUtils.deleteDirectory) + } + +} diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index 5885e25aa0..b2aeaee4e3 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -3,14 +3,13 @@ */ package akka.persistence +import akka.actor._ +import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot, UnconfirmedWarning } +import akka.testkit._ +import com.typesafe.config._ + import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import com.typesafe.config._ -import akka.actor._ -import akka.testkit._ -import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot -import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning -import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning object AtLeastOnceDeliverySpec { @@ -43,10 +42,16 @@ object AtLeastOnceDeliverySpec { override def persistenceId: String = name + // simplistic confirmation mechanism, to tell the requester that a snapshot succeeded + var lastSnapshotAskedForBy: Option[ActorRef] = None + def updateState(evt: Evt): Unit = evt match { case AcceptedReq(payload, destination) ⇒ + log.debug(s"deliver(destination, deliveryId ⇒ Action(deliveryId, $payload)), recovery: " + recoveryRunning) deliver(destination, deliveryId ⇒ Action(deliveryId, payload)) + case ReqDone(id) ⇒ + log.debug(s"confirmDelivery($id), recovery: " + recoveryRunning) confirmDelivery(id) } @@ -77,21 +82,30 @@ object AtLeastOnceDeliverySpec { persist(ReqDone(id)) { evt ⇒ updateState(evt) } case Boom ⇒ + log.debug("Boom!") throw new RuntimeException("boom") with NoStackTrace case SaveSnap ⇒ + log.debug("Save snapshot!") + lastSnapshotAskedForBy = Some(sender()) saveSnapshot(Snap(getDeliverySnapshot)) + case success: SaveSnapshotSuccess ⇒ + log.debug("Snapshot success!") + lastSnapshotAskedForBy.map(_ ! success) + case w: UnconfirmedWarning ⇒ + log.debug("Sender got unconfirmed warning {}", w) testActor ! w } def receiveRecover: Receive = { - case evt: Evt ⇒ updateState(evt) + case evt: Evt ⇒ + updateState(evt) + case SnapshotOffer(_, Snap(deliverySnapshot)) ⇒ setDeliverySnapshot(deliverySnapshot) - } } @@ -134,7 +148,7 @@ object AtLeastOnceDeliverySpec { } abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { - import AtLeastOnceDeliverySpec._ + import akka.persistence.AtLeastOnceDeliverySpec._ "AtLeastOnceDelivery" must { "deliver messages in order when nothing is lost" in { @@ -208,7 +222,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = false, destinations), name) snd ! Req("a-1") expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -225,6 +239,9 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) // a-3 was lost probeA.expectMsg(Action(4, "a-4")) + // after snapshot succeeded + expectMsgType[SaveSnapshotSuccess] + // trigger restart snd ! Boom @@ -288,6 +305,10 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) } } -class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec")) +class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec( + // TODO disable debug logging once happy with stability of this test + ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec") +) + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec")) From ad2a80029817c0e7fbd6600d78ae356ab0bdafcb Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 23 Jul 2014 12:15:07 +0200 Subject: [PATCH 2/3] =per #15559 use the same config as all other peristence tests * this includes the default expect timeout of 10s (instead of 3s) --- .../leveldb/LeveldbJournalJavaSpec.scala | 18 +++++++----------- .../LeveldbJournalNativePerfSpec.scala | 18 +++++++----------- .../leveldb/LeveldbJournalNativeSpec.scala | 19 ++++++++----------- 3 files changed, 22 insertions(+), 33 deletions(-) diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala index bb03a9955b..71125d1a75 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala @@ -1,16 +1,12 @@ package akka.persistence.journal.leveldb -import com.typesafe.config.ConfigFactory - -import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } -import akka.persistence.PluginCleanup +import akka.persistence.journal.JournalSpec +import akka.persistence.{PersistenceSpec, PluginCleanup} class LeveldbJournalJavaSpec extends JournalSpec with PluginCleanup { - lazy val config = ConfigFactory.parseString( - """ - |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" - |akka.persistence.journal.leveldb.native = off - |akka.persistence.journal.leveldb.dir = "target/journal-java" - |akka.persistence.snapshot-store.local.dir = "target/snapshots-java/" - """.stripMargin) + lazy val config = PersistenceSpec.config( + "leveldb", + "LeveldbJournalJavaSpec", + extraConfig = Some("akka.persistence.journal.leveldb.native = off") + ) } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala index bdcc391e5c..59d3823bc2 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala @@ -3,19 +3,15 @@ */ package akka.persistence.journal.leveldb -import com.typesafe.config.ConfigFactory - -import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } -import akka.persistence.PluginCleanup +import akka.persistence.journal.{JournalPerfSpec, JournalSpec} +import akka.persistence.{PersistenceSpec, PluginCleanup} import org.scalatest.DoNotDiscover @DoNotDiscover // because only checking that compilation is OK with JournalPerfSpec class LeveldbJournalNativePerfSpec extends JournalSpec with JournalPerfSpec with PluginCleanup { - lazy val config = ConfigFactory.parseString( - """ - |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" - |akka.persistence.journal.leveldb.native = on - |akka.persistence.journal.leveldb.dir = "target/journal-native" - |akka.persistence.snapshot-store.local.dir = "target/snapshots-native/" - """.stripMargin) + lazy val config = PersistenceSpec.config( + "leveldb", + "LeveldbJournalNativePerfSpec", + extraConfig = Some("akka.persistence.journal.leveldb.native = on") + ) } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala index c83f6a9e68..6e41d20c69 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala @@ -1,16 +1,13 @@ package akka.persistence.journal.leveldb -import com.typesafe.config.ConfigFactory - -import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } -import akka.persistence.PluginCleanup +import akka.persistence.journal.JournalSpec +import akka.persistence.{PersistenceSpec, PluginCleanup} class LeveldbJournalNativeSpec extends JournalSpec with PluginCleanup { - lazy val config = ConfigFactory.parseString( - """ - |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" - |akka.persistence.journal.leveldb.native = on - |akka.persistence.journal.leveldb.dir = "target/journal-native" - |akka.persistence.snapshot-store.local.dir = "target/snapshots-native/" - """.stripMargin) + lazy val config = PersistenceSpec.config( + "leveldb", + "LeveldbJournalNativeSpec", + extraConfig = Some("akka.persistence.journal.leveldb.native = on") + ) + } From c76b8a0338d7d2017ee3b2c7b4368b8d205d5d38 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Wed, 23 Jul 2014 14:03:19 +0200 Subject: [PATCH 3/3] =con #15574 impr TimerBasedThrottlerSpec stability * the lower bound was rather racy, depends on "where in it's Tick" time the throtteler currently was. In general the upper bound is also not exact, but "good enough" because the `.5` is an estimation of "the throtteler must finish it's previous tick, and then it sends the data" --- .../scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala index f549dc97f1..a739ecc48e 100644 --- a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala @@ -87,7 +87,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp expectNoMsg(1 second) throttler ! SetTarget(Some(echo)) 4 to 7 foreach { throttler ! _ } - within(0.5 seconds, 1.5 seconds) { + within(1.5 seconds) { 4 to 7 foreach { expectMsg(_) } } } @@ -104,7 +104,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp } expectNoMsg(1 second) throttler ! SetTarget(Some(echo)) - within(0.5 seconds, 1.5 seconds) { + within(1.5 seconds) { 4 to 7 foreach { expectMsg(_) } } }