Issue 24687: Transfer of PerformanceSpec

This commit is contained in:
Oleksii Tkachuk 2018-05-03 06:30:00 -05:00 committed by Johan Andrén
parent dfd8d8aa81
commit 8af12295eb
5 changed files with 286 additions and 21 deletions

View file

@ -0,0 +1,39 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer }
import akka.actor.typed.{ PostStop, Signal }
import akka.annotation.InternalApi
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.util.OptionVal
/**
* Main reason for introduction of this trait is stash buffer reference management
* in order to survive restart of internal behavior
*/
@InternalApi
trait EventsourcedStashReferenceManagement {
private var stashBuffer: OptionVal[StashBuffer[InternalProtocol]] = OptionVal.None
def stashBuffer(settings: EventsourcedSettings): StashBuffer[InternalProtocol] = {
val buffer: StashBuffer[InternalProtocol] = stashBuffer match {
case OptionVal.Some(value) value
case _ StashBuffer(settings.stashCapacity)
}
this.stashBuffer = OptionVal.Some(buffer)
stashBuffer.get
}
def onSignalCleanup: (ActorContext[InternalProtocol], Signal) Unit = {
case (ctx, PostStop)
stashBuffer match {
case OptionVal.Some(buffer) buffer.unstashAll(ctx, Behaviors.ignore)
case _ Unit
}
case _ Unit
}
}

View file

@ -6,7 +6,7 @@ package akka.persistence.typed.internal
import akka.actor.typed
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.annotation.InternalApi
import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.{ InternalProtocol, WriterIdentity }
@ -25,14 +25,17 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
tagger: Event Set[String] = (_: Event) Set.empty[String],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery()
) extends PersistentBehavior[Command, Event, State] {
) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
Behaviors.setup[EventsourcedBehavior.InternalProtocol] { ctx
Behaviors.withTimers { timers
val settings = EventsourcedSettings(ctx.system)
.withJournalPluginId(journalPluginId.getOrElse(""))
.withSnapshotPluginId(snapshotPluginId.getOrElse(""))
val internalStash = stashBuffer(settings)
Behaviors.tap(
onMessage = (_, _) Unit,
onSignal = onSignalCleanup,
behavior = {
val setup = new EventsourcedSetup(
ctx,
timers,
@ -47,11 +50,13 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
recovery,
holdingRecoveryPermit = false,
settings = settings,
internalStash = StashBuffer(settings.stashCapacity)
internalStash = internalStash
)
EventsourcedRequestingRecoveryPermit(setup)
}
)
}
}.widen[Any] {
case res: JournalProtocol.Response InternalProtocol.JournalResponse(res)
case res: SnapshotProtocol.Response InternalProtocol.SnapshotterResponse(res)

View file

@ -0,0 +1,82 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.internal
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ Behavior, Signal, TypedAkkaSpecWithShutdown }
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol.{ IncomingCommand, RecoveryPermitGranted }
import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import scala.concurrent.duration.{ FiniteDuration, _ }
class EventsourcedStashReferenceManagementTest extends ActorTestKit with TypedAkkaSpecWithShutdown {
case class Impl() extends EventsourcedStashReferenceManagement
"EventsourcedStashReferenceManagement instance" should {
"initialize stash only once" in {
val ref = Impl()
assert(ref.stashBuffer(dummySettings()).eq(ref.stashBuffer(dummySettings())))
}
// or should we?
"not reinitialize when capacity changes" in {
val ref = Impl()
assert(ref.stashBuffer(dummySettings()).eq(ref.stashBuffer(dummySettings(21))))
}
"clear buffer on PostStop" in {
val probe = TestProbe[Int]()
val behavior = TestBehavior(probe)
val ref = spawn(behavior)
ref ! RecoveryPermitGranted
ref ! RecoveryPermitGranted
probe.expectMessage(1)
probe.expectMessage(2)
ref ! IncomingCommand("bye")
probe.expectTerminated(ref, 100.millis)
spawn(behavior) ! RecoveryPermitGranted
probe.expectMessage(1)
}
}
object TestBehavior extends EventsourcedStashReferenceManagement {
def apply(probe: TestProbe[Int]): Behavior[InternalProtocol] = {
val settings = dummySettings()
Behaviors.setup[InternalProtocol](ctx
Behaviors.receiveMessagePartial[InternalProtocol] {
case RecoveryPermitGranted
stashBuffer(settings).stash(RecoveryPermitGranted)
probe.ref ! stashBuffer(settings).size
Behaviors.same[InternalProtocol]
case _: IncomingCommand[_] Behaviors.stopped
}.receiveSignal {
case (_, signal: Signal)
onSignalCleanup.apply(ctx, signal); Behaviors.stopped[InternalProtocol]
}
)
}
}
private def dummySettings(capacity: Int = 42) = new EventsourcedSettings {
override def stashCapacity: Int = capacity
override def logOnStashing: Boolean = ???
override def stashOverflowStrategyConfigurator: String = ???
override def recoveryEventTimeout: FiniteDuration = ???
override def journalPluginId: String = ???
override def withJournalPluginId(id: String): EventsourcedSettings = ???
override def snapshotPluginId: String = ???
override def withSnapshotPluginId(id: String): EventsourcedSettings = ???
}
}

View file

@ -0,0 +1,138 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.UUID
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, SupervisorStrategy, TypedAkkaSpecWithShutdown }
import akka.persistence.typed.scaladsl.PersistentBehaviors.CommandHandler
import akka.testkit.typed.TE
import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.concurrent.Eventually
import scala.concurrent.duration._
object PerformanceSpec {
val config =
"""
akka.persistence.performance.cycles.load = 100
# more accurate throughput measurements
#akka.persistence.performance.cycles.load = 200000
"""
sealed trait Command
case object StopMeasure extends Command
case class FailAt(sequence: Long) extends Command
case class CommandWithEvent(evt: String) extends Command
class Measure(numberOfMessages: Int) {
private val NanoToSecond = 1000.0 * 1000 * 1000
private var startTime: Long = 0L
private var stopTime: Long = 0L
def startMeasure(): Unit = {
startTime = System.nanoTime
}
def stopMeasure(): Double = {
stopTime = System.nanoTime
NanoToSecond * numberOfMessages / (stopTime - startTime)
}
}
case class Parameters(var persistCalls: Long = 0L, var failAt: Long = -1) {
def every(num: Long): Boolean = persistCalls % num == 0
def shouldFail: Boolean = persistCalls == failAt
def failureWasDefined: Boolean = failAt != -1L
}
def behavior(name: String, probe: TestProbe[Command])(other: (Command, Parameters) Effect[String, String]) = {
Behaviors.supervise({
val parameters = Parameters()
PersistentBehaviors.receive[Command, String, String](
persistenceId = name,
"",
commandHandler = CommandHandler.command {
case StopMeasure Effect.none.andThen(probe.ref ! StopMeasure)
case FailAt(sequence) Effect.none.andThen(_ parameters.failAt = sequence)
case command other(command, parameters)
},
eventHandler = {
case (state, _) state
}
).onRecoveryCompleted {
case (_, _) if (parameters.every(1000)) print("r")
}
}).onFailure(SupervisorStrategy.restart)
}
def eventSourcedTestPersistenceBehavior(name: String, probe: TestProbe[Command]) =
behavior(name, probe) {
case (CommandWithEvent(evt), parameters)
Effect.persist(evt).andThen({
parameters.persistCalls += 1
if (parameters.every(1000)) print(".")
if (parameters.shouldFail) throw TE("boom")
})
case _ Effect.none
}
}
class PerformanceSpec extends ActorTestKit with TypedAkkaSpecWithShutdown with Eventually {
import PerformanceSpec._
val loadCycles = system.settings.config.getInt("akka.persistence.performance.cycles.load")
override def config: Config =
ConfigFactory.parseString(
s"""
akka.actor.serialize-creators = off
akka.actor.serialize-messages = off
akka.actor.warn-about-java-serializer-usage = off
akka.persistence.publish-plugin-commands = on
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal-PerformanceSpec"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-PerformanceSpec/"
akka.test.single-expect-default = 10s
""").withFallback(ConfigFactory.parseString(PerformanceSpec.config))
def stressPersistentActor(persistentActor: ActorRef[Command], probe: TestProbe[Command],
failAt: Option[Long], description: String): Unit = {
failAt foreach { persistentActor ! FailAt(_) }
val m = new Measure(loadCycles)
m.startMeasure()
1 to loadCycles foreach { i persistentActor ! CommandWithEvent(s"msg$i") }
persistentActor ! StopMeasure
probe.expectMessage(100.seconds, StopMeasure)
println(f"\nthroughput = ${m.stopMeasure()}%.2f $description per second")
}
def stressEventSourcedPersistentActor(failAt: Option[Long]): Unit = {
val probe = TestProbe[Command]
val name = s"${this.getClass.getSimpleName}-${UUID.randomUUID().toString}"
val persistentActor = spawn(eventSourcedTestPersistenceBehavior(name, probe), name)
stressPersistentActor(persistentActor, probe, failAt, "persistent events")
}
"An event sourced persistent actor" should {
"have some reasonable throughput" in {
stressEventSourcedPersistentActor(None)
}
"have some reasonable throughput under failure conditions" in {
stressEventSourcedPersistentActor(Some(loadCycles / 10))
}
}
}

View file

@ -400,6 +400,7 @@ lazy val persistenceTyped = akkaModule("akka-persistence-typed")
typedTestkit % "test->test",
actorTypedTests % "test->test"
)
.settings(Dependencies.persistenceShared)
.settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.persistence.typed"))
.disablePlugins(MimaPlugin)