diff --git a/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala new file mode 100644 index 0000000000..ced0104868 --- /dev/null +++ b/akka-bench-jmh/src/main/scala/akka/persistence/LevelDbBatchingBenchmark.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.persistence + +import java.io.File +import java.util.concurrent.TimeUnit + +import akka.actor._ +import akka.persistence.journal.AsyncWriteTarget._ +import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore} +import akka.testkit.TestProbe +import org.apache.commons.io.FileUtils +import org.openjdk.jmh.annotations._ + +/* + # OS: OSX 10.9.3 + # CPU: Intel(R) Core(TM) i7-4850HQ CPU @ 2.30GHz + # Date: Mon Jul 23 11:07:42 CEST 2014 + + This bench emulates what we provide with "Processor batching". + As expected, batching writes is better than writing 1 by 1. + The important thing though is that there didn't appear to be any "write latency spikes" throughout this bench. + +[info] Benchmark Mode Samples Score Score error Units +[info] a.p.LevelDbBatchingBenchmark.write_1 avgt 20 0.799 0.011 ms/op +[info] a.p.LevelDbBatchingBenchmark.writeBatch_10 avgt 20 0.117 0.001 ms/op +[info] a.p.LevelDbBatchingBenchmark.writeBatch_100 avgt 20 0.050 0.000 ms/op +[info] a.p.LevelDbBatchingBenchmark.writeBatch_200 avgt 20 0.041 0.001 ms/op + */ +@Fork(1) +@Threads(10) +@State(Scope.Benchmark) +@BenchmarkMode(Array(Mode.AverageTime)) +class LevelDbBatchingBenchmark { + + var sys: ActorSystem = _ + var probe: TestProbe = _ + var store: ActorRef = _ + + val batch_1 = List.fill(1) { PersistentRepr("data", 12, "pa") } + val batch_10 = List.fill(10) { PersistentRepr("data", 12, "pa") } + val batch_100 = List.fill(100) { PersistentRepr("data", 12, "pa") } + val batch_200 = List.fill(200) { PersistentRepr("data", 12, "pa") } + + @Setup(Level.Trial) + def setup() { + sys = ActorSystem("sys") + deleteStorage(sys) + SharedLeveldbJournal.setStore(store, sys) + + probe = TestProbe()(sys) + store = sys.actorOf(Props[SharedLeveldbStore], "store") + } + + @TearDown(Level.Trial) + def tearDown() { + store ! PoisonPill + Thread.sleep(500) + + sys.shutdown() + sys.awaitTermination() + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(1) + def write_1() = { + probe.send(store, WriteMessages(batch_1)) + probe.expectMsgType[Any] + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(10) + def writeBatch_10() = { + probe.send(store, WriteMessages(batch_10)) + probe.expectMsgType[Any] + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(100) + def writeBatch_100() = { + probe.send(store, WriteMessages(batch_100)) + probe.expectMsgType[Any] + } + + @Benchmark + @Measurement(timeUnit = TimeUnit.MICROSECONDS) + @OperationsPerInvocation(200) + def writeBatch_200() = { + probe.send(store, WriteMessages(batch_200)) + probe.expectMsgType[Any] + } + + // TOOLS + + private def deleteStorage(sys: ActorSystem) { + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(sys.settings.config.getString(s))) + + storageLocations.foreach(FileUtils.deleteDirectory) + } + +} diff --git a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala index f549dc97f1..a739ecc48e 100644 --- a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala @@ -87,7 +87,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp expectNoMsg(1 second) throttler ! SetTarget(Some(echo)) 4 to 7 foreach { throttler ! _ } - within(0.5 seconds, 1.5 seconds) { + within(1.5 seconds) { 4 to 7 foreach { expectMsg(_) } } } @@ -104,7 +104,7 @@ class TimerBasedThrottlerSpec extends TestKit(ActorSystem("TimerBasedThrottlerSp } expectNoMsg(1 second) throttler ! SetTarget(Some(echo)) - within(0.5 seconds, 1.5 seconds) { + within(1.5 seconds) { 4 to 7 foreach { expectMsg(_) } } } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala index bb03a9955b..71125d1a75 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalJavaSpec.scala @@ -1,16 +1,12 @@ package akka.persistence.journal.leveldb -import com.typesafe.config.ConfigFactory - -import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } -import akka.persistence.PluginCleanup +import akka.persistence.journal.JournalSpec +import akka.persistence.{PersistenceSpec, PluginCleanup} class LeveldbJournalJavaSpec extends JournalSpec with PluginCleanup { - lazy val config = ConfigFactory.parseString( - """ - |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" - |akka.persistence.journal.leveldb.native = off - |akka.persistence.journal.leveldb.dir = "target/journal-java" - |akka.persistence.snapshot-store.local.dir = "target/snapshots-java/" - """.stripMargin) + lazy val config = PersistenceSpec.config( + "leveldb", + "LeveldbJournalJavaSpec", + extraConfig = Some("akka.persistence.journal.leveldb.native = off") + ) } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala index bdcc391e5c..59d3823bc2 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativePerfSpec.scala @@ -3,19 +3,15 @@ */ package akka.persistence.journal.leveldb -import com.typesafe.config.ConfigFactory - -import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } -import akka.persistence.PluginCleanup +import akka.persistence.journal.{JournalPerfSpec, JournalSpec} +import akka.persistence.{PersistenceSpec, PluginCleanup} import org.scalatest.DoNotDiscover @DoNotDiscover // because only checking that compilation is OK with JournalPerfSpec class LeveldbJournalNativePerfSpec extends JournalSpec with JournalPerfSpec with PluginCleanup { - lazy val config = ConfigFactory.parseString( - """ - |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" - |akka.persistence.journal.leveldb.native = on - |akka.persistence.journal.leveldb.dir = "target/journal-native" - |akka.persistence.snapshot-store.local.dir = "target/snapshots-native/" - """.stripMargin) + lazy val config = PersistenceSpec.config( + "leveldb", + "LeveldbJournalNativePerfSpec", + extraConfig = Some("akka.persistence.journal.leveldb.native = on") + ) } diff --git a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala index c83f6a9e68..6e41d20c69 100644 --- a/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala +++ b/akka-persistence-tck/src/test/scala/akka/persistence/journal/leveldb/LeveldbJournalNativeSpec.scala @@ -1,16 +1,13 @@ package akka.persistence.journal.leveldb -import com.typesafe.config.ConfigFactory - -import akka.persistence.journal.{ JournalPerfSpec, JournalSpec } -import akka.persistence.PluginCleanup +import akka.persistence.journal.JournalSpec +import akka.persistence.{PersistenceSpec, PluginCleanup} class LeveldbJournalNativeSpec extends JournalSpec with PluginCleanup { - lazy val config = ConfigFactory.parseString( - """ - |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" - |akka.persistence.journal.leveldb.native = on - |akka.persistence.journal.leveldb.dir = "target/journal-native" - |akka.persistence.snapshot-store.local.dir = "target/snapshots-native/" - """.stripMargin) + lazy val config = PersistenceSpec.config( + "leveldb", + "LeveldbJournalNativeSpec", + extraConfig = Some("akka.persistence.journal.leveldb.native = on") + ) + } diff --git a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala index 5885e25aa0..b2aeaee4e3 100644 --- a/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/AtLeastOnceDeliverySpec.scala @@ -3,14 +3,13 @@ */ package akka.persistence +import akka.actor._ +import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot, UnconfirmedWarning } +import akka.testkit._ +import com.typesafe.config._ + import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import com.typesafe.config._ -import akka.actor._ -import akka.testkit._ -import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot -import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning -import akka.persistence.AtLeastOnceDelivery.UnconfirmedWarning object AtLeastOnceDeliverySpec { @@ -43,10 +42,16 @@ object AtLeastOnceDeliverySpec { override def persistenceId: String = name + // simplistic confirmation mechanism, to tell the requester that a snapshot succeeded + var lastSnapshotAskedForBy: Option[ActorRef] = None + def updateState(evt: Evt): Unit = evt match { case AcceptedReq(payload, destination) ⇒ + log.debug(s"deliver(destination, deliveryId ⇒ Action(deliveryId, $payload)), recovery: " + recoveryRunning) deliver(destination, deliveryId ⇒ Action(deliveryId, payload)) + case ReqDone(id) ⇒ + log.debug(s"confirmDelivery($id), recovery: " + recoveryRunning) confirmDelivery(id) } @@ -77,21 +82,30 @@ object AtLeastOnceDeliverySpec { persist(ReqDone(id)) { evt ⇒ updateState(evt) } case Boom ⇒ + log.debug("Boom!") throw new RuntimeException("boom") with NoStackTrace case SaveSnap ⇒ + log.debug("Save snapshot!") + lastSnapshotAskedForBy = Some(sender()) saveSnapshot(Snap(getDeliverySnapshot)) + case success: SaveSnapshotSuccess ⇒ + log.debug("Snapshot success!") + lastSnapshotAskedForBy.map(_ ! success) + case w: UnconfirmedWarning ⇒ + log.debug("Sender got unconfirmed warning {}", w) testActor ! w } def receiveRecover: Receive = { - case evt: Evt ⇒ updateState(evt) + case evt: Evt ⇒ + updateState(evt) + case SnapshotOffer(_, Snap(deliverySnapshot)) ⇒ setDeliverySnapshot(deliverySnapshot) - } } @@ -134,7 +148,7 @@ object AtLeastOnceDeliverySpec { } abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) with PersistenceSpec with ImplicitSender { - import AtLeastOnceDeliverySpec._ + import akka.persistence.AtLeastOnceDeliverySpec._ "AtLeastOnceDelivery" must { "deliver messages in order when nothing is lost" in { @@ -208,7 +222,7 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) val probeA = TestProbe() val dst = system.actorOf(destinationProps(probeA.ref)) val destinations = Map("A" -> system.actorOf(unreliableProps(3, dst)).path) - val snd = system.actorOf(senderProps(testActor, name, 500.millis, 5, async = false, destinations), name) + val snd = system.actorOf(senderProps(testActor, name, 1000.millis, 5, async = false, destinations), name) snd ! Req("a-1") expectMsg(ReqAck) probeA.expectMsg(Action(1, "a-1")) @@ -225,6 +239,9 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) // a-3 was lost probeA.expectMsg(Action(4, "a-4")) + // after snapshot succeeded + expectMsgType[SaveSnapshotSuccess] + // trigger restart snd ! Boom @@ -288,6 +305,10 @@ abstract class AtLeastOnceDeliverySpec(config: Config) extends AkkaSpec(config) } } -class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec")) +class LeveldbAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec( + // TODO disable debug logging once happy with stability of this test + ConfigFactory.parseString("""akka.logLevel = DEBUG""") withFallback PersistenceSpec.config("leveldb", "AtLeastOnceDeliverySpec") +) + @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class InmemAtLeastOnceDeliverySpec extends AtLeastOnceDeliverySpec(PersistenceSpec.config("inmem", "AtLeastOnceDeliverySpec"))