Move typed persistence tests away from LevelDB (#30224)

This commit is contained in:
Johan Andrén 2021-06-02 17:15:18 +02:00 committed by GitHub
parent ab98618a25
commit b5cbf383d6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 221 additions and 270 deletions

View file

@ -19,6 +19,7 @@ private[testkit] object SerializedEventStorageImpl {
sequenceNr: Long,
payloadSerId: Int,
payloadSerManifest: String,
eventAdapterManifest: String,
writerUuid: String,
payload: Array[Byte],
tags: Set[String],
@ -48,14 +49,15 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E
val s = serialization.findSerializerFor(payload)
val manifest = Serializers.manifestFor(s, payload)
Serialized(
pr.persistenceId,
pr.sequenceNr,
s.identifier,
manifest,
pr.writerUuid,
s.toBinary(payload),
tags,
pr.metadata)
persistenceId = pr.persistenceId,
sequenceNr = pr.sequenceNr,
payloadSerId = s.identifier,
payloadSerManifest = manifest,
eventAdapterManifest = pr.manifest,
writerUuid = pr.writerUuid,
payload = s.toBinary(payload),
tags = tags,
metadata = pr.metadata)
}
/**
@ -66,7 +68,12 @@ private[testkit] class SerializedEventStorageImpl(system: ActorSystem) extends E
val eventForRepr =
if (internal.tags.isEmpty) event
else Tagged(event, internal.tags)
val pr = PersistentRepr(eventForRepr, internal.sequenceNr, internal.persistenceId, writerUuid = internal.writerUuid)
val pr = PersistentRepr(
payload = eventForRepr,
sequenceNr = internal.sequenceNr,
persistenceId = internal.persistenceId,
writerUuid = internal.writerUuid,
manifest = internal.eventAdapterManifest)
internal.metadata.fold(pr)(meta => pr.withMetadata(meta))
}

View file

@ -45,15 +45,15 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c
override def eventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
fromSequenceNr: Long = 0,
toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] = {
Source.fromGraph(new EventsByPersistenceIdStage(persistenceId, fromSequenceNr, toSequenceNr, storage))
}
override def currentEventsByPersistenceId(
persistenceId: String,
fromSequenceNr: Long,
toSequenceNr: Long): Source[EventEnvelope, NotUsed] = {
fromSequenceNr: Long = 0,
toSequenceNr: Long = Long.MaxValue): Source[EventEnvelope, NotUsed] = {
Source(storage.tryRead(persistenceId, fromSequenceNr, toSequenceNr, Long.MaxValue)).map { pr =>
EventEnvelope(
Sequence(pr.sequenceNr),
@ -65,7 +65,7 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c
}
}
override def currentEventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] = {
override def currentEventsByTag(tag: String, offset: Offset = NoOffset): Source[EventEnvelope, NotUsed] = {
offset match {
case NoOffset =>
case _ =>
@ -73,7 +73,7 @@ final class PersistenceTestKitReadJournal(system: ExtendedActorSystem, @unused c
}
Source(storage.tryReadByTag(tag)).map { pr =>
EventEnvelope(
Sequence(pr.timestamp),
Sequence(pr.sequenceNr),
pr.persistenceId,
pr.sequenceNr,
unwrapTaggedPayload(pr.payload),

View file

@ -7,6 +7,8 @@ package akka.persistence.typed.javadsl;
import akka.Done;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.LoggingTestKit;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.*;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Adapter;
@ -16,18 +18,16 @@ import akka.persistence.query.EventEnvelope;
import akka.persistence.query.NoOffset;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.testkit.PersistenceTestKitPlugin;
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.*;
import akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec;
import akka.serialization.jackson.CborSerializable;
import akka.stream.javadsl.Sink;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Rule;
@ -42,18 +42,22 @@ import static akka.Done.done;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
public class PersistentActorJavaDslTest extends JUnitSuite {
public class EventSourcedBehaviorJavaDslTest extends JUnitSuite {
public static final Config config =
EventSourcedBehaviorSpec.conf().withFallback(ConfigFactory.load());
@ClassRule public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(
ConfigFactory.parseString(
"akka.loglevel = INFO\n" + "akka.loggers = [\"akka.testkit.TestEventListener\"]")
.withFallback(PersistenceTestKitPlugin.getInstance().config())
.withFallback(PersistenceTestKitSnapshotPlugin.config()));
@Rule public final LogCapturing logCapturing = new LogCapturing();
private LeveldbReadJournal queries =
private PersistenceTestKitReadJournal queries =
PersistenceQuery.get(Adapter.toClassic(testKit.system()))
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
.getReadJournalFor(
PersistenceTestKitReadJournal.class, PersistenceTestKitReadJournal.Identifier());
interface Command extends CborSerializable {}

View file

@ -4,15 +4,6 @@
package akka.persistence.typed.scaladsl
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
@ -30,6 +21,14 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.internal.JournalFailureException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Try
class ChaosJournal extends InmemJournal {
var counts = Map.empty[String, Int]

View file

@ -4,19 +4,19 @@
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.TypedActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.PersistenceId
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.atomic.AtomicInteger
object EventSourcedBehaviorInterceptorSpec {
@ -44,7 +44,7 @@ object EventSourcedBehaviorInterceptorSpec {
}
class EventSourcedBehaviorInterceptorSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTimersSpec.config)
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {

View file

@ -4,14 +4,6 @@
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
@ -21,6 +13,12 @@ import akka.persistence.journal.SteppingInmemJournal
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryFailed
import akka.persistence.typed.internal.JournalFailureException
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
object EventSourcedBehaviorRecoveryTimeoutSpec {

View file

@ -4,31 +4,20 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.PersistenceId
import akka.serialization.jackson.CborSerializable
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.atomic.AtomicInteger
object EventSourcedBehaviorReplySpec {
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = on
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
""")
sealed trait Command[ReplyMessage] extends CborSerializable
final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done]
@ -75,7 +64,7 @@ object EventSourcedBehaviorReplySpec {
}
class EventSourcedBehaviorReplySpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorReplySpec.conf)
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {

View file

@ -4,23 +4,13 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
import akka.persistence.typed.DeleteEventsCompleted
import akka.persistence.typed.DeleteSnapshotsCompleted
import akka.persistence.typed.DeleteSnapshotsFailed
@ -33,19 +23,16 @@ import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.serialization.jackson.CborSerializable
import akka.util.unused
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
object EventSourcedBehaviorRetentionSpec extends Matchers {
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = on
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.actor.testkit.typed.single-expect-default = 10s # increased for slow disk on Jenkins servers
""")
sealed trait Command extends CborSerializable
case object Increment extends Command
final case class IncrementWithPersistAll(nr: Int) extends Command
@ -128,7 +115,8 @@ object EventSourcedBehaviorRetentionSpec extends Matchers {
}
class EventSourcedBehaviorRetentionSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorRetentionSpec.conf)
extends ScalaTestWithActorTestKit(
PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
with AnyWordSpecLike
with LogCapturing {

View file

@ -4,17 +4,6 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.ActorInitializationException
import akka.actor.testkit.typed.TestException
@ -26,25 +15,37 @@ import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata }
import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria }
import akka.persistence.SelectedSnapshot
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.query.EventEnvelope
import akka.persistence.query.Offset
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.persistence.typed.SnapshotMetadata
import akka.persistence.typed.SnapshotSelectionCriteria
import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata }
import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria }
import akka.serialization.jackson.CborSerializable
import akka.stream.scaladsl.Sink
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.annotation.nowarn
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
object EventSourcedBehaviorSpec {
@ -76,11 +77,9 @@ object EventSourcedBehaviorSpec {
}
// also used from PersistentActorTest, EventSourcedBehaviorWatchSpec
def conf: Config = ConfigFactory.parseString(s"""
def conf: Config = PersistenceTestKitPlugin.config.withFallback(ConfigFactory.parseString(s"""
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = on
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
@ -89,7 +88,7 @@ object EventSourcedBehaviorSpec {
class = "${classOf[InmemJournal].getName}"
recovery-event-timeout = 10 millis
}
""")
"""))
sealed trait Command extends CborSerializable
case object Increment extends Command
@ -285,9 +284,8 @@ class EventSourcedBehaviorSpec
import EventSourcedBehaviorSpec._
@nowarn("msg=deprecated")
val queries: LeveldbReadJournal =
PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val queries: PersistenceTestKitReadJournal =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})")
@ -529,7 +527,7 @@ class EventSourcedBehaviorSpec
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
val events = queries.currentEventsByTag("tag1").runWith(Sink.seq).futureValue
val events = queries.currentEventsByTag("tag1", Offset.noOffset).runWith(Sink.seq).futureValue
events shouldEqual List(EventEnvelope(Sequence(1), pid.id, 1, Incremented(1), 0L))
}
@ -662,8 +660,7 @@ class EventSourcedBehaviorSpec
val testkit2 = ActorTestKit(
ActorTestKitBase.testNameFromCallStack(),
ConfigFactory.parseString(s"""
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
"""))
try {
LoggingTestKit
@ -685,8 +682,7 @@ class EventSourcedBehaviorSpec
val testkit2 = ActorTestKit(
ActorTestKitBase.testNameFromCallStack(),
ConfigFactory.parseString(s"""
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
"""))
try {
LoggingTestKit

View file

@ -4,16 +4,6 @@
package akka.persistence.typed.scaladsl
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.NotUsed
import akka.actor.Dropped
import akka.actor.UnhandledMessage
@ -30,6 +20,14 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
object EventSourcedBehaviorStashSpec {
def conf: Config = ConfigFactory.parseString(s"""

View file

@ -4,31 +4,21 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.PersistenceId
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
object EventSourcedBehaviorTimersSpec {
val journalId = "event-sourced-behavior-timers-spec"
def config: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
""")
def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup { _ =>
Behaviors.withTimers { timers =>
@ -74,7 +64,7 @@ object EventSourcedBehaviorTimersSpec {
}
class EventSourcedBehaviorTimersSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTimersSpec.config)
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {

View file

@ -4,27 +4,31 @@
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, LoggingTestKit, ScalaTestWithActorTestKit, TestProbe }
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed._
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.persistence.{ Recovery => ClassicRecovery }
import akka.persistence.typed.{ NoOpEventAdapter, PersistenceId, RecoveryCompleted }
import akka.persistence.typed.internal.{
BehaviorSetup,
EventSourcedSettings,
InternalProtocol,
NoOpSnapshotAdapter,
StashState
}
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.internal.EventSourcedBehaviorImpl.WriterIdentity
import akka.persistence.typed.internal.BehaviorSetup
import akka.persistence.typed.internal.EventSourcedSettings
import akka.persistence.typed.internal.InternalProtocol
import akka.persistence.typed.internal.NoOpSnapshotAdapter
import akka.persistence.typed.internal.StashState
import akka.persistence.typed.NoOpEventAdapter
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.{ Recovery => ClassicRecovery }
import akka.serialization.jackson.CborSerializable
import akka.util.ConstantFun
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.LoggerFactory
import java.util.concurrent.atomic.AtomicInteger
object EventSourcedBehaviorWatchSpec {
sealed trait Command extends CborSerializable
case object Fail extends Command

View file

@ -4,10 +4,6 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
@ -16,23 +12,21 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.query.EventEnvelope
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.EventAdapter
import akka.persistence.typed.EventSeq
import akka.persistence.typed.PersistenceId
import akka.serialization.jackson.CborSerializable
import akka.stream.scaladsl.Sink
import akka.testkit.JavaSerializable
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import scala.annotation.nowarn
import java.util.concurrent.atomic.AtomicInteger
object EventSourcedEventAdapterSpec {
private val conf = ConfigFactory.parseString(s"""
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
""")
case class Wrapper(event: String) extends CborSerializable
class WrapperEventAdapter extends EventAdapter[String, Wrapper] {
override def toJournal(e: String): Wrapper = Wrapper("<" + e)
@ -82,27 +76,18 @@ object EventSourcedEventAdapterSpec {
}
class EventSourcedEventAdapterSpec
extends ScalaTestWithActorTestKit(EventSourcedEventAdapterSpec.conf)
extends ScalaTestWithActorTestKit(ConfigFactory.parseString("""
akka.persistence.testkit.events.serialize = true""").withFallback(PersistenceTestKitPlugin.config))
with AnyWordSpecLike
with LogCapturing {
import EventSourcedBehaviorSpec.{
counter,
Command,
Event,
GetValue,
Increment,
IncrementWithPersistAll,
Incremented,
State
}
import EventSourcedBehaviorSpec._
import EventSourcedEventAdapterSpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})")
@nowarn("msg=deprecated")
val queries: LeveldbReadJournal =
PersistenceQuery(system).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val queries: PersistenceTestKitReadJournal =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
private def behavior(pid: PersistenceId, probe: ActorRef[String]): EventSourcedBehavior[String, String, String] =
EventSourcedBehavior(pid, "", commandHandler = { (_, command) =>

View file

@ -4,15 +4,16 @@
package akka.persistence.typed.scaladsl
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.{ ScalaTestWithActorTestKit, TestProbe }
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object EventSourcedSequenceNumberSpec {

View file

@ -4,30 +4,23 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SnapshotAdapter
import akka.serialization.jackson.CborSerializable
import org.scalatest.wordspec.AnyWordSpecLike
import scala.annotation.nowarn
import java.util.concurrent.atomic.AtomicInteger
object EventSourcedSnapshotAdapterSpec {
private val conf: Config = ConfigFactory.parseString(s"""
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
""")
case class State(s: String) extends CborSerializable
case class Command(c: String) extends CborSerializable
case class Event(e: String) extends CborSerializable
@ -35,19 +28,19 @@ object EventSourcedSnapshotAdapterSpec {
}
class EventSourcedSnapshotAdapterSpec
extends ScalaTestWithActorTestKit(EventSourcedSnapshotAdapterSpec.conf)
extends ScalaTestWithActorTestKit(
PersistenceTestKitPlugin.config.withFallback(PersistenceTestKitSnapshotPlugin.config))
with AnyWordSpecLike
with LogCapturing {
import EventSourcedSnapshotAdapterSpec._
import akka.actor.typed.scaladsl.adapter._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})")
@nowarn("msg=deprecated")
val queries: LeveldbReadJournal =
PersistenceQuery(system.toClassic).readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
val queries: PersistenceTestKitReadJournal =
PersistenceQuery(system.toClassic)
.readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
private def behavior(pid: PersistenceId, probe: ActorRef[State]): EventSourcedBehavior[Command, Event, State] =
EventSourcedBehavior[Command, Event, State](

View file

@ -3,11 +3,6 @@
*/
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.event.Level
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@ -17,6 +12,10 @@ import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.event.Level
import java.util.concurrent.atomic.AtomicInteger
// Note that the spec name here is important since there are heuristics in place to avoid names
// starting with EventSourcedBehavior

View file

@ -4,15 +4,14 @@
package akka.persistence.typed.scaladsl
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object NullEmptyStateSpec {

View file

@ -4,10 +4,6 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@ -15,6 +11,9 @@ import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import akka.serialization.jackson.CborSerializable
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.UUID
object OptionalSnapshotStoreSpec {

View file

@ -4,13 +4,6 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
@ -18,9 +11,16 @@ import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.scaladsl.EventSourcedBehavior.CommandHandler
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.UUID
import scala.concurrent.duration._
object PerformanceSpec {
@ -111,14 +111,17 @@ object PerformanceSpec {
}
}
class PerformanceSpec extends ScalaTestWithActorTestKit(ConfigFactory.parseString(s"""
class PerformanceSpec
extends ScalaTestWithActorTestKit(
PersistenceTestKitPlugin.config
.withFallback(PersistenceTestKitSnapshotPlugin.config)
.withFallback(ConfigFactory.parseString(s"""
akka.persistence.publish-plugin-commands = on
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb.dir = "target/journal-PerformanceSpec"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-PerformanceSpec/"
akka.actor.testkit.typed.single-expect-default = 10s
""").withFallback(ConfigFactory.parseString(PerformanceSpec.config))) with AnyWordSpecLike with LogCapturing {
"""))
.withFallback(ConfigFactory.parseString(PerformanceSpec.config)))
with AnyWordSpecLike
with LogCapturing {
import PerformanceSpec._
@ -157,7 +160,7 @@ class PerformanceSpec extends ScalaTestWithActorTestKit(ConfigFactory.parseStrin
stressEventSourcedPersistentActor(None)
}
"have some reasonable throughput under failure conditions" in {
stressEventSourcedPersistentActor(Some(loadCycles / 10))
stressEventSourcedPersistentActor(Some((loadCycles / 10).toLong))
}
}
}

View file

@ -4,14 +4,13 @@
package akka.persistence.typed.scaladsl
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object PrimitiveStateSpec {

View file

@ -4,13 +4,13 @@
package akka.persistence.typed.scaladsl
import scala.concurrent.Future
import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata }
import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria }
import akka.persistence.SelectedSnapshot
import akka.persistence.snapshot.SnapshotStore
import akka.persistence.typed.scaladsl.SnapshotMutableStateSpec.MutableState
import akka.persistence.{ SnapshotMetadata => ClassicSnapshotMetadata }
import akka.persistence.{ SnapshotSelectionCriteria => ClassicSnapshotSelectionCriteria }
import scala.concurrent.Future
class SlowInMemorySnapshotStore extends SnapshotStore {

View file

@ -4,31 +4,28 @@
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.SnapshotCompleted
import akka.persistence.typed.SnapshotFailed
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.atomic.AtomicInteger
object SnapshotMutableStateSpec {
def conf: Config = ConfigFactory.parseString(s"""
def conf: Config = PersistenceTestKitPlugin.config.withFallback(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "slow-snapshot-store"
slow-snapshot-store.class = "${classOf[SlowInMemorySnapshotStore].getName}"
""")
"""))
sealed trait Command extends CborSerializable
case object Increment extends Command

View file

@ -4,36 +4,34 @@
package akka.persistence.typed.scaladsl
import java.io.File
import java.util.UUID
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.serialization.Snapshot
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.typed.PersistenceId
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
import org.scalatest.wordspec.AnyWordSpecLike
import java.io.File
import java.util.UUID
object SnapshotRecoveryWithEmptyJournalSpec {
val survivingSnapshotPath = s"target/survivingSnapshotPath-${UUID.randomUUID().toString}"
def conf: Config = ConfigFactory.parseString(s"""
def conf: Config = PersistenceTestKitPlugin.config.withFallback(ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "${SnapshotRecoveryWithEmptyJournalSpec.survivingSnapshotPath}"
akka.actor.allow-java-serialization = on
akka.actor.warn-about-java-serializer-usage = off
""")
"""))
object TestActor {
def apply(name: String, probe: ActorRef[Any]): Behavior[String] = {

View file

@ -123,7 +123,6 @@ private[akka] final class ReplayingEvents[C, E, S](
var eventForErrorReporting: OptionVal[Any] = OptionVal.None
try {
val eventSeq = setup.eventAdapter.fromJournal(repr.payload, repr.manifest)
def handleEvent(event: E): Unit = {
eventForErrorReporting = OptionVal.Some(event)
state = state.copy(seqNr = repr.sequenceNr, eventsReplayed = state.eventsReplayed + 1)

View file

@ -4,19 +4,19 @@
package akka.persistence.typed.scaladsl
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.annotation.nowarn
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.TimerScheduler
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.RecoveryCompleted
import scala.annotation.nowarn
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
// unused names in pattern match can be useful in the docs
@nowarn
object PersistentActorCompileOnlyTest {

View file

@ -36,6 +36,7 @@ import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.LogCapturing
object PersistentFsmToTypedMigrationSpec {
// cannot be moved to testkit journals as it requires sharing journal content across actor system instances
val config = ConfigFactory.parseString(s"""
akka.actor.allow-java-serialization = on
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"

View file

@ -328,7 +328,12 @@ lazy val persistenceTestkit = akkaModule("akka-persistence-testkit")
.disablePlugins(MimaPlugin)
lazy val persistenceTypedTests = akkaModule("akka-persistence-typed-tests")
.dependsOn(persistenceTyped, persistenceTestkit % "test", actorTestkitTyped % "test", jackson % "test->test")
.dependsOn(
persistenceTyped,
persistenceTestkit % "test",
actorTestkitTyped % "test",
persistence % "test->test", // for SteppingInMemJournal
jackson % "test->test")
.settings(AkkaBuild.mayChangeSettings)
.settings(Dependencies.persistenceTypedTests)
.settings(javacOptions += "-parameters") // for Jackson