Tests for EventSourced remember entities shard store (#29177)

* Tests for EventSourced remember entities shard store
* Move the migration event adapter to a better place and mention in docs.
This commit is contained in:
Johan Andrén 2020-06-04 09:54:28 +02:00 committed by GitHub
parent eb923bbbe1
commit fb39ac1a9c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 145 additions and 80 deletions

View file

@ -0,0 +1,34 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.Internal.ShardHomeAllocated
import akka.persistence.journal.EventAdapter
import akka.persistence.journal.EventSeq
/**
* Used for migrating from persistent state store mode to the new event sourced remember entities. No user API,
* used through configuration. See reference docs for details.
*
* INTERNAL API
*/
@InternalApi
final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter {
override def manifest(event: Any): String =
""
override def toJournal(event: Any): Any =
event
override def fromJournal(event: Any, manifest: String): EventSeq = {
event match {
case ShardHomeAllocated(shardId, _) =>
EventSeq.single(shardId)
case _ => EventSeq.empty
}
}
}

View file

@ -20,9 +20,9 @@ import akka.cluster.ddata.LWWRegisterKey
import akka.cluster.ddata.Replicator._
import akka.cluster.ddata.SelfUniqueAddress
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.internal.EventSourcedRememberShards.MigrationMarker
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker
import akka.cluster.sharding.internal.{
EventSourcedRememberShards,
EventSourcedRememberEntitiesCoordinatorStore,
RememberEntitiesCoordinatorStore,
RememberEntitiesProvider
}
@ -1012,7 +1012,7 @@ class PersistentShardCoordinator(
override def snapshotPluginId: String = settings.snapshotPluginId
override def receiveRecover: Receive = {
case MigrationMarker | SnapshotOffer(_, _: EventSourcedRememberShards.State) =>
case MigrationMarker | SnapshotOffer(_, _: EventSourcedRememberEntitiesCoordinatorStore.State) =>
throw new IllegalStateException(
"state-store is set to persistence but a migration has taken place to remember-entities-store=eventsourced. You can not downgrade.")
case evt: DomainEvent =>

View file

@ -4,14 +4,14 @@
package akka.cluster.sharding.internal
import akka.actor.{ ActorLogging, Props }
import akka.actor.ActorLogging
import akka.actor.Props
import akka.annotation.InternalApi
import akka.cluster.sharding.{ ClusterShardingSerializable, ClusterShardingSettings }
import akka.cluster.sharding.ShardCoordinator.Internal
import akka.cluster.sharding.ShardCoordinator.Internal.ShardHomeAllocated
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.ClusterShardingSerializable
import akka.cluster.sharding.ClusterShardingSettings
import akka.persistence._
import akka.persistence.journal.{ EventAdapter, EventSeq }
import scala.collection.mutable
@ -19,26 +19,9 @@ import scala.collection.mutable
* INTERNAL API
*/
@InternalApi
private[akka] object EventSourcedRememberShards {
private[akka] object EventSourcedRememberEntitiesCoordinatorStore {
def props(typeName: String, settings: ClusterShardingSettings): Props =
Props(new EventSourcedRememberShards(typeName, settings))
class FromOldCoordinatorState() extends EventAdapter {
override def manifest(event: Any): String =
""
override def toJournal(event: Any): Any =
event
override def fromJournal(event: Any, manifest: String): EventSeq = {
event match {
case ShardHomeAllocated(shardId, _) =>
EventSeq.single(shardId)
case _ => EventSeq.empty
}
}
}
Props(new EventSourcedRememberEntitiesCoordinatorStore(typeName, settings))
case class State(shards: Set[ShardId], writtenMigrationMarker: Boolean = false) extends ClusterShardingSerializable
@ -49,11 +32,13 @@ private[akka] object EventSourcedRememberShards {
* INTERNAL API
*/
@InternalApi
private[akka] final class EventSourcedRememberShards(typeName: String, settings: ClusterShardingSettings)
private[akka] final class EventSourcedRememberEntitiesCoordinatorStore(
typeName: String,
settings: ClusterShardingSettings)
extends PersistentActor
with ActorLogging {
import EventSourcedRememberShards._
import EventSourcedRememberEntitiesCoordinatorStore._
// Uses the same persistence id as the old persistent coordinator so that the old data can be migrated
// without any user action

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.internal
import akka.actor.Props
import akka.annotation.InternalApi
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion.ShardId
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings)
extends RememberEntitiesProvider {
// this is backed by an actor using the same events, at the serialization level, as the now removed PersistentShard when state-store-mode=persistence
// new events can be added but the old events should continue to be handled
override def shardStoreProps(shardId: ShardId): Props =
EventSourcedRememberEntitiesShardStore.props(typeName, shardId, settings)
// Note that this one is never used for the deprecated persistent state store mode, only when state store is ddata
// combined with eventsourced remember entities storage
override def coordinatorStoreProps(): Props =
EventSourcedRememberEntitiesCoordinatorStore.props(typeName, settings)
}

View file

@ -11,7 +11,6 @@ import akka.cluster.sharding.ClusterShardingSerializable
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.EntityId
import akka.cluster.sharding.ShardRegion.ShardId
import akka.persistence.DeleteMessagesFailure
import akka.persistence.DeleteMessagesSuccess
import akka.persistence.DeleteSnapshotsFailure
@ -27,25 +26,7 @@ import akka.persistence.SnapshotSelectionCriteria
* INTERNAL API
*/
@InternalApi
private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings)
extends RememberEntitiesProvider {
// this is backed by an actor using the same events, at the serialization level, as the now removed PersistentShard when state-store-mode=persistence
// new events can be added but the old events should continue to be handled
override def shardStoreProps(shardId: ShardId): Props =
EventSourcedRememberEntitiesStore.props(typeName, shardId, settings)
// Note that this one is never used for the deprecated persistent state store mode, only when state store is ddata
// combined with eventsourced remember entities storage
override def coordinatorStoreProps(): Props =
EventSourcedRememberShards.props(typeName, settings)
}
/**
* INTERNAL API
*
*/
private[akka] object EventSourcedRememberEntitiesStore {
private[akka] object EventSourcedRememberEntitiesShardStore {
/**
* A case class which represents a state change for the Shard
@ -70,7 +51,7 @@ private[akka] object EventSourcedRememberEntitiesStore {
final case class EntitiesStopped(entities: Set[EntityId]) extends StateChange
def props(typeName: String, shardId: ShardRegion.ShardId, settings: ClusterShardingSettings): Props =
Props(new EventSourcedRememberEntitiesStore(typeName, shardId, settings))
Props(new EventSourcedRememberEntitiesShardStore(typeName, shardId, settings))
}
/**
@ -80,14 +61,15 @@ private[akka] object EventSourcedRememberEntitiesStore {
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
private[akka] final class EventSourcedRememberEntitiesStore(
@InternalApi
private[akka] final class EventSourcedRememberEntitiesShardStore(
typeName: String,
shardId: ShardRegion.ShardId,
settings: ClusterShardingSettings)
extends PersistentActor
with ActorLogging {
import EventSourcedRememberEntitiesStore._
import EventSourcedRememberEntitiesShardStore._
import settings.tuningParameters._
log.debug("Starting up EventSourcedRememberEntitiesStore")

View file

@ -23,9 +23,12 @@ import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion._
import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm }
import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages
import akka.cluster.sharding.internal.EventSourcedRememberShards.{ MigrationMarker, State => RememberShardsState }
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ State => EntityState }
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ EntitiesStarted, EntitiesStopped }
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.{
MigrationMarker,
State => RememberShardsState
}
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{ State => EntityState }
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{ EntitiesStarted, EntitiesStopped }
import akka.protobufv3.internal.MessageLite
import akka.serialization.BaseSerializer
import akka.serialization.Serialization

View file

@ -50,8 +50,7 @@ object PersistentShardingMigrationSpec {
""")
val configForNewMode = ConfigFactory
.parseString(
"""
.parseString("""
akka.cluster.sharding {
remember-entities = on
remember-entities-store = "eventsourced"
@ -60,7 +59,7 @@ object PersistentShardingMigrationSpec {
akka.persistence.journal.leveldb {
event-adapters {
coordinator-migration = "akka.cluster.sharding.internal.EventSourcedRememberShards$FromOldCoordinatorState"
coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter"
}
event-adapter-bindings {

View file

@ -4,14 +4,19 @@
package akka.cluster.sharding.internal
import akka.actor.Props
import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.{ Cluster, MemberStatus }
import akka.testkit.{ AkkaSpec, ImplicitSender, WithLogCapturing }
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object DDataRememberEntitiesShardStoreSpec {
/**
* Covers the interaction between the shard and the remember entities store
*/
object RememberEntitiesShardStoreSpec {
def config = ConfigFactory.parseString("""
akka.loglevel=DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
@ -22,16 +27,20 @@ object DDataRememberEntitiesShardStoreSpec {
akka.cluster.sharding.remember-entities = on
# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""".stripMargin)
}
// FIXME generalize to general test and cover both ddata and eventsourced
class DDataRememberEntitiesShardStoreSpec
extends AkkaSpec(DDataRememberEntitiesShardStoreSpec.config)
// shared base class for both persistence and ddata specs
abstract class RememberEntitiesShardStoreSpec
extends AkkaSpec(RememberEntitiesShardStoreSpec.config)
with AnyWordSpecLike
with ImplicitSender
with WithLogCapturing {
def storeName: String
def storeProps(shardId: ShardId, typeName: String, settings: ClusterShardingSettings): Props
override def atStartup(): Unit = {
// Form a one node cluster
val cluster = Cluster(system)
@ -39,17 +48,13 @@ class DDataRememberEntitiesShardStoreSpec
awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1))
}
"The DDataRememberEntitiesShardStore" must {
val replicatorSettings = ReplicatorSettings(system)
val replicator = system.actorOf(Replicator.props(replicatorSettings))
s"The $storeName" must {
val shardingSettings = ClusterShardingSettings(system)
"store starts and stops and list remembered entity ids" in {
val store = system.actorOf(
DDataRememberEntitiesShardStore
.props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1))
val store = system.actorOf(storeProps("FakeShardId", "FakeTypeName", shardingSettings))
store ! RememberEntitiesShardStore.GetEntities
expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should be(empty)
@ -67,9 +72,7 @@ class DDataRememberEntitiesShardStoreSpec
expectMsg(RememberEntitiesShardStore.UpdateDone(Set("2"), Set.empty))
// the store does not support get after update
val storeIncarnation2 = system.actorOf(
DDataRememberEntitiesShardStore
.props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1))
val storeIncarnation2 = system.actorOf(storeProps("FakeShardId", "FakeTypeName", shardingSettings))
storeIncarnation2 ! RememberEntitiesShardStore.GetEntities
expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5"))
@ -77,9 +80,7 @@ class DDataRememberEntitiesShardStoreSpec
"handle a late request" in {
// the store does not support get after update
val storeIncarnation3 = system.actorOf(
DDataRememberEntitiesShardStore
.props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1))
val storeIncarnation3 = system.actorOf(storeProps("FakeShardId", "FakeTypeName", shardingSettings))
Thread.sleep(500)
storeIncarnation3 ! RememberEntitiesShardStore.GetEntities
@ -89,3 +90,21 @@ class DDataRememberEntitiesShardStoreSpec
}
}
class DDataRememberEntitiesShardStoreSpec extends RememberEntitiesShardStoreSpec {
val replicatorSettings = ReplicatorSettings(system)
val replicator = system.actorOf(Replicator.props(replicatorSettings))
override def storeName: String = "DDataRememberEntitiesShardStore"
override def storeProps(shardId: ShardId, typeName: String, settings: ClusterShardingSettings): Props =
DDataRememberEntitiesShardStore.props(shardId, typeName, settings, replicator, majorityMinCap = 1)
}
class EventSourcedRememberEntitiesShardStoreSpec extends RememberEntitiesShardStoreSpec {
override def storeName: String = "EventSourcedRememberEntitiesShardStore"
override def storeProps(shardId: ShardId, typeName: String, settings: ClusterShardingSettings): Props =
EventSourcedRememberEntitiesShardStore.props(typeName, shardId, settings)
}

View file

@ -12,8 +12,8 @@ import akka.cluster.sharding.Shard
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.EntitiesStarted
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore
import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.EntitiesStarted
import akka.serialization.SerializationExtension
import akka.testkit.AkkaSpec
@ -71,12 +71,12 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
}
"be able to serialize PersistentShard snapshot state" in {
checkSerialization(EventSourcedRememberEntitiesStore.State(Set("e1", "e2", "e3")))
checkSerialization(EventSourcedRememberEntitiesShardStore.State(Set("e1", "e2", "e3")))
}
"be able to serialize PersistentShard domain events" in {
checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStarted(Set("e1", "e2")))
checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStopped(Set("e1", "e2")))
checkSerialization(EventSourcedRememberEntitiesShardStore.EntitiesStarted(Set("e1", "e2")))
checkSerialization(EventSourcedRememberEntitiesShardStore.EntitiesStopped(Set("e1", "e2")))
}
"be able to deserialize old entity started event into entities started" in {

View file

@ -394,9 +394,24 @@ If using remembered entities there are two migration options:
* `ddata` for the state store and `ddata` for remembering entities. All remembered entities will be lost after a full cluster restart.
* `ddata` for the state store and `eventsourced` for remembering entities. The new `eventsourced` remembering entities store
reads the data written by the old `persistence` mode. Your remembered entities will be remembered.
reads the data written by the old `persistence` mode. Your remembered entities will be remembered after a full cluster restart.
Once you have migrated you cannot go back to the old persistence store.
For migrating existing remembered entities an event adapter needs to be configured in the config for the journal you use in your `application.conf`.
In this example `leveldb` is the used journal:
```
akka.persistence.journal.leveldb {
event-adapters {
coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter"
}
event-adapter-bindings {
"akka.cluster.sharding.ShardCoordinator$Internal$DomainEvent" = coordinator-migration
}
}
```
Once you have migrated you cannot go back to the old persistence store, a rolling upgrade is therefore not possible.
When @ref:[Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are
stored in @ref:[Durable Storage](distributed-data.md#durable-storage) of Distributed Data. You may want to change the