From 8af12295ebb1468dc501ad0f71574709abc53a4c Mon Sep 17 00:00:00 2001 From: Oleksii Tkachuk Date: Thu, 3 May 2018 06:30:00 -0500 Subject: [PATCH] Issue 24687: Transfer of PerformanceSpec --- ...EventsourcedStashReferenceManagement.scala | 39 +++++ .../internal/PersistentBehaviorImpl.scala | 47 +++--- ...tsourcedStashReferenceManagementTest.scala | 82 +++++++++++ .../typed/scaladsl/PerformanceSpec.scala | 138 ++++++++++++++++++ build.sbt | 1 + 5 files changed, 286 insertions(+), 21 deletions(-) create mode 100644 akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala new file mode 100644 index 0000000000..9087c7be9f --- /dev/null +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagement.scala @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer } +import akka.actor.typed.{ PostStop, Signal } +import akka.annotation.InternalApi +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol +import akka.util.OptionVal + +/** + * Main reason for introduction of this trait is stash buffer reference management + * in order to survive restart of internal behavior + */ +@InternalApi +trait EventsourcedStashReferenceManagement { + + private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None + + def stashBuffer(settings: EventsourcedSettings): StashBuffer[InternalProtocol] = { + val buffer: StashBuffer[InternalProtocol] = stashBuffer match { + case OptionVal.Some(value) ⇒ value + case _ ⇒ StashBuffer(settings.stashCapacity) + } + this.stashBuffer = OptionVal.Some(buffer) + stashBuffer.get + } + + def onSignalCleanup: (ActorContext[InternalProtocol], Signal) ⇒ Unit = { + case (ctx, PostStop) ⇒ + stashBuffer match { + case OptionVal.Some(buffer) ⇒ buffer.unstashAll(ctx, Behaviors.ignore) + case _ ⇒ Unit + } + case _ ⇒ Unit + } +} diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index c6b8058364..f529832f7f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -6,7 +6,7 @@ package akka.persistence.typed.internal import akka.actor.typed import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer } +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity } @@ -25,32 +25,37 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, recovery: Recovery = Recovery() -) extends PersistentBehavior[Command, Event, State] { +) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx ⇒ Behaviors.withTimers { timers ⇒ val settings = EventsourcedSettings(ctx.system) - .withJournalPluginId(journalPluginId.getOrElse("")) - .withSnapshotPluginId(snapshotPluginId.getOrElse("")) - val setup = new EventsourcedSetup( - ctx, - timers, - persistenceId, - initialState, - commandHandler, - eventHandler, - WriterIdentity.newIdentity(), - recoveryCompleted, - tagger, - snapshotWhen, - recovery, - holdingRecoveryPermit = false, - settings = settings, - internalStash = StashBuffer(settings.stashCapacity) - ) + val internalStash = stashBuffer(settings) + Behaviors.tap( + onMessage = (_, _) ⇒ Unit, + onSignal = onSignalCleanup, + behavior = { + val setup = new EventsourcedSetup( + ctx, + timers, + persistenceId, + initialState, + commandHandler, + eventHandler, + WriterIdentity.newIdentity(), + recoveryCompleted, + tagger, + snapshotWhen, + recovery, + holdingRecoveryPermit = false, + settings = settings, + internalStash = internalStash + ) - EventsourcedRequestingRecoveryPermit(setup) + EventsourcedRequestingRecoveryPermit(setup) + } + ) } }.widen[Any] { case res: JournalProtocol.Response ⇒ InternalProtocol.JournalResponse(res) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala new file mode 100644 index 0000000000..036ba7fc18 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/EventsourcedStashReferenceManagementTest.scala @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.internal + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ Behavior, Signal, TypedAkkaSpecWithShutdown } +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol +import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, RecoveryPermitGranted } +import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } + +import scala.concurrent.duration.{ FiniteDuration, _ } + +class EventsourcedStashReferenceManagementTest extends ActorTestKit with TypedAkkaSpecWithShutdown { + + case class Impl() extends EventsourcedStashReferenceManagement + + "EventsourcedStashReferenceManagement instance" should { + "initialize stash only once" in { + val ref = Impl() + assert(ref.stashBuffer(dummySettings()).eq(ref.stashBuffer(dummySettings()))) + } + // or should we? + "not reinitialize when capacity changes" in { + val ref = Impl() + assert(ref.stashBuffer(dummySettings()).eq(ref.stashBuffer(dummySettings(21)))) + } + "clear buffer on PostStop" in { + val probe = TestProbe[Int]() + val behavior = TestBehavior(probe) + val ref = spawn(behavior) + ref ! RecoveryPermitGranted + ref ! RecoveryPermitGranted + probe.expectMessage(1) + probe.expectMessage(2) + ref ! IncomingCommand("bye") + probe.expectTerminated(ref, 100.millis) + + spawn(behavior) ! RecoveryPermitGranted + probe.expectMessage(1) + } + } + + object TestBehavior extends EventsourcedStashReferenceManagement { + + def apply(probe: TestProbe[Int]): Behavior[InternalProtocol] = { + val settings = dummySettings() + Behaviors.setup[InternalProtocol](ctx ⇒ + Behaviors.receiveMessagePartial[InternalProtocol] { + case RecoveryPermitGranted ⇒ + stashBuffer(settings).stash(RecoveryPermitGranted) + probe.ref ! stashBuffer(settings).size + Behaviors.same[InternalProtocol] + case _: IncomingCommand[_] ⇒ Behaviors.stopped + }.receiveSignal { + case (_, signal: Signal) ⇒ + onSignalCleanup.apply(ctx, signal); Behaviors.stopped[InternalProtocol] + } + ) + } + } + + private def dummySettings(capacity: Int = 42) = new EventsourcedSettings { + + override def stashCapacity: Int = capacity + + override def logOnStashing: Boolean = ??? + + override def stashOverflowStrategyConfigurator: String = ??? + + override def recoveryEventTimeout: FiniteDuration = ??? + + override def journalPluginId: String = ??? + + override def withJournalPluginId(id: String): EventsourcedSettings = ??? + + override def snapshotPluginId: String = ??? + + override def withSnapshotPluginId(id: String): EventsourcedSettings = ??? + } +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala new file mode 100644 index 0000000000..b454674804 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -0,0 +1,138 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.UUID + +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorRef, SupervisorStrategy, TypedAkkaSpecWithShutdown } +import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler +import akka.testkit.typed.TE +import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.concurrent.Eventually + +import scala.concurrent.duration._ + +object PerformanceSpec { + + val config = + """ + akka.persistence.performance.cycles.load = 100 + # more accurate throughput measurements + #akka.persistence.performance.cycles.load = 200000 + """ + + sealed trait Command + + case object StopMeasure extends Command + + case class FailAt(sequence: Long) extends Command + + case class CommandWithEvent(evt: String) extends Command + + class Measure(numberOfMessages: Int) { + private val NanoToSecond = 1000.0 * 1000 * 1000 + + private var startTime: Long = 0L + private var stopTime: Long = 0L + + def startMeasure(): Unit = { + startTime = System.nanoTime + } + + def stopMeasure(): Double = { + stopTime = System.nanoTime + NanoToSecond * numberOfMessages / (stopTime - startTime) + } + } + + case class Parameters(var persistCalls: Long = 0L, var failAt: Long = -1) { + def every(num: Long): Boolean = persistCalls % num == 0 + + def shouldFail: Boolean = persistCalls == failAt + + def failureWasDefined: Boolean = failAt != -1L + } + + def behavior(name: String, probe: TestProbe[Command])(other: (Command, Parameters) ⇒ Effect[String, String]) = { + Behaviors.supervise({ + val parameters = Parameters() + PersistentBehaviors.receive[Command, String, String]( + persistenceId = name, + "", + commandHandler = CommandHandler.command { + case StopMeasure ⇒ Effect.none.andThen(probe.ref ! StopMeasure) + case FailAt(sequence) ⇒ Effect.none.andThen(_ ⇒ parameters.failAt = sequence) + case command ⇒ other(command, parameters) + }, + eventHandler = { + case (state, _) ⇒ state + } + ).onRecoveryCompleted { + case (_, _) ⇒ if (parameters.every(1000)) print("r") + } + }).onFailure(SupervisorStrategy.restart) + } + + def eventSourcedTestPersistenceBehavior(name: String, probe: TestProbe[Command]) = + behavior(name, probe) { + case (CommandWithEvent(evt), parameters) ⇒ + Effect.persist(evt).andThen({ + parameters.persistCalls += 1 + if (parameters.every(1000)) print(".") + if (parameters.shouldFail) throw TE("boom") + }) + case _ ⇒ Effect.none + } +} + +class PerformanceSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually { + + import PerformanceSpec._ + + val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load") + + override def config: Config = + ConfigFactory.parseString( + s""" + akka.actor.serialize-creators = off + akka.actor.serialize-messages = off + akka.actor.warn-about-java-serializer-usage = off + akka.persistence.publish-plugin-commands = on + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.journal.leveldb.dir = "target/journal-PerformanceSpec" + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-PerformanceSpec/" + akka.test.single-expect-default = 10s + """).withFallback(ConfigFactory.parseString(PerformanceSpec.config)) + + def stressPersistentActor(persistentActor: ActorRef[Command], probe: TestProbe[Command], + failAt: Option[Long], description: String): Unit = { + failAt foreach { persistentActor ! FailAt(_) } + val m = new Measure(loadCycles) + m.startMeasure() + 1 to loadCycles foreach { i ⇒ persistentActor ! CommandWithEvent(s"msg$i") } + persistentActor ! StopMeasure + probe.expectMessage(100.seconds, StopMeasure) + println(f"\nthroughput = ${m.stopMeasure()}%.2f $description per second") + } + + def stressEventSourcedPersistentActor(failAt: Option[Long]): Unit = { + val probe = TestProbe[Command] + val name = s"${this.getClass.getSimpleName}-${UUID.randomUUID().toString}" + val persistentActor = spawn(eventSourcedTestPersistenceBehavior(name, probe), name) + stressPersistentActor(persistentActor, probe, failAt, "persistent events") + } + + "An event sourced persistent actor" should { + "have some reasonable throughput" in { + stressEventSourcedPersistentActor(None) + } + "have some reasonable throughput under failure conditions" in { + stressEventSourcedPersistentActor(Some(loadCycles / 10)) + } + } +} diff --git a/build.sbt b/build.sbt index 12033db6cc..6fba526e37 100644 --- a/build.sbt +++ b/build.sbt @@ -400,6 +400,7 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed") typedTestkit % "test->test", actorTypedTests % "test->test" ) + .settings(Dependencies.persistenceShared) .settings(AkkaBuild.mayChangeSettings) .settings(AutomaticModuleName.settings("akka.persistence.typed")) .disablePlugins(MimaPlugin)