diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala new file mode 100644 index 0000000000..dd8e31d4bf --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardCoordinator.Internal.ShardHomeAllocated +import akka.persistence.journal.EventAdapter +import akka.persistence.journal.EventSeq + +/** + * Used for migrating from persistent state store mode to the new event sourced remember entities. No user API, + * used through configuration. See reference docs for details. + * + * INTERNAL API + */ +@InternalApi +final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter { + override def manifest(event: Any): String = + "" + + override def toJournal(event: Any): Any = + event + + override def fromJournal(event: Any, manifest: String): EventSeq = { + event match { + case ShardHomeAllocated(shardId, _) => + EventSeq.single(shardId) + case _ => EventSeq.empty + } + + } +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 9d96a52f75..c724cb2a93 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -20,9 +20,9 @@ import akka.cluster.ddata.LWWRegisterKey import akka.cluster.ddata.Replicator._ import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.sharding.ShardRegion.ShardId -import akka.cluster.sharding.internal.EventSourcedRememberShards.MigrationMarker +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.MigrationMarker import akka.cluster.sharding.internal.{ - EventSourcedRememberShards, + EventSourcedRememberEntitiesCoordinatorStore, RememberEntitiesCoordinatorStore, RememberEntitiesProvider } @@ -1012,7 +1012,7 @@ class PersistentShardCoordinator( override def snapshotPluginId: String = settings.snapshotPluginId override def receiveRecover: Receive = { - case MigrationMarker | SnapshotOffer(_, _: EventSourcedRememberShards.State) => + case MigrationMarker | SnapshotOffer(_, _: EventSourcedRememberEntitiesCoordinatorStore.State) => throw new IllegalStateException( "state-store is set to persistence but a migration has taken place to remember-entities-store=eventsourced. You can not downgrade.") case evt: DomainEvent => diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesCoordinatorStore.scala similarity index 81% rename from akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala rename to akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesCoordinatorStore.scala index 0e649a72e7..7a67709144 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberShards.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesCoordinatorStore.scala @@ -4,14 +4,14 @@ package akka.cluster.sharding.internal -import akka.actor.{ ActorLogging, Props } +import akka.actor.ActorLogging +import akka.actor.Props import akka.annotation.InternalApi -import akka.cluster.sharding.{ ClusterShardingSerializable, ClusterShardingSettings } import akka.cluster.sharding.ShardCoordinator.Internal -import akka.cluster.sharding.ShardCoordinator.Internal.ShardHomeAllocated import akka.cluster.sharding.ShardRegion.ShardId +import akka.cluster.sharding.ClusterShardingSerializable +import akka.cluster.sharding.ClusterShardingSettings import akka.persistence._ -import akka.persistence.journal.{ EventAdapter, EventSeq } import scala.collection.mutable @@ -19,26 +19,9 @@ import scala.collection.mutable * INTERNAL API */ @InternalApi -private[akka] object EventSourcedRememberShards { +private[akka] object EventSourcedRememberEntitiesCoordinatorStore { def props(typeName: String, settings: ClusterShardingSettings): Props = - Props(new EventSourcedRememberShards(typeName, settings)) - - class FromOldCoordinatorState() extends EventAdapter { - override def manifest(event: Any): String = - "" - - override def toJournal(event: Any): Any = - event - - override def fromJournal(event: Any, manifest: String): EventSeq = { - event match { - case ShardHomeAllocated(shardId, _) => - EventSeq.single(shardId) - case _ => EventSeq.empty - } - - } - } + Props(new EventSourcedRememberEntitiesCoordinatorStore(typeName, settings)) case class State(shards: Set[ShardId], writtenMigrationMarker: Boolean = false) extends ClusterShardingSerializable @@ -49,11 +32,13 @@ private[akka] object EventSourcedRememberShards { * INTERNAL API */ @InternalApi -private[akka] final class EventSourcedRememberShards(typeName: String, settings: ClusterShardingSettings) +private[akka] final class EventSourcedRememberEntitiesCoordinatorStore( + typeName: String, + settings: ClusterShardingSettings) extends PersistentActor with ActorLogging { - import EventSourcedRememberShards._ + import EventSourcedRememberEntitiesCoordinatorStore._ // Uses the same persistence id as the old persistent coordinator so that the old data can be migrated // without any user action diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesProvider.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesProvider.scala new file mode 100644 index 0000000000..a237f09559 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesProvider.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding.internal + +import akka.actor.Props +import akka.annotation.InternalApi +import akka.cluster.sharding.ClusterShardingSettings +import akka.cluster.sharding.ShardRegion.ShardId + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings) + extends RememberEntitiesProvider { + + // this is backed by an actor using the same events, at the serialization level, as the now removed PersistentShard when state-store-mode=persistence + // new events can be added but the old events should continue to be handled + override def shardStoreProps(shardId: ShardId): Props = + EventSourcedRememberEntitiesShardStore.props(typeName, shardId, settings) + + // Note that this one is never used for the deprecated persistent state store mode, only when state store is ddata + // combined with eventsourced remember entities storage + override def coordinatorStoreProps(): Props = + EventSourcedRememberEntitiesCoordinatorStore.props(typeName, settings) +} diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala similarity index 81% rename from akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala rename to akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala index 682ce661d1..9eb108b5e6 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntities.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala @@ -11,7 +11,6 @@ import akka.cluster.sharding.ClusterShardingSerializable import akka.cluster.sharding.ClusterShardingSettings import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.EntityId -import akka.cluster.sharding.ShardRegion.ShardId import akka.persistence.DeleteMessagesFailure import akka.persistence.DeleteMessagesSuccess import akka.persistence.DeleteSnapshotsFailure @@ -27,25 +26,7 @@ import akka.persistence.SnapshotSelectionCriteria * INTERNAL API */ @InternalApi -private[akka] final class EventSourcedRememberEntitiesProvider(typeName: String, settings: ClusterShardingSettings) - extends RememberEntitiesProvider { - - // this is backed by an actor using the same events, at the serialization level, as the now removed PersistentShard when state-store-mode=persistence - // new events can be added but the old events should continue to be handled - override def shardStoreProps(shardId: ShardId): Props = - EventSourcedRememberEntitiesStore.props(typeName, shardId, settings) - - // Note that this one is never used for the deprecated persistent state store mode, only when state store is ddata - // combined with eventsourced remember entities storage - override def coordinatorStoreProps(): Props = - EventSourcedRememberShards.props(typeName, settings) -} - -/** - * INTERNAL API - * - */ -private[akka] object EventSourcedRememberEntitiesStore { +private[akka] object EventSourcedRememberEntitiesShardStore { /** * A case class which represents a state change for the Shard @@ -70,7 +51,7 @@ private[akka] object EventSourcedRememberEntitiesStore { final case class EntitiesStopped(entities: Set[EntityId]) extends StateChange def props(typeName: String, shardId: ShardRegion.ShardId, settings: ClusterShardingSettings): Props = - Props(new EventSourcedRememberEntitiesStore(typeName, shardId, settings)) + Props(new EventSourcedRememberEntitiesShardStore(typeName, shardId, settings)) } /** @@ -80,14 +61,15 @@ private[akka] object EventSourcedRememberEntitiesStore { * * @see [[ClusterSharding$ ClusterSharding extension]] */ -private[akka] final class EventSourcedRememberEntitiesStore( +@InternalApi +private[akka] final class EventSourcedRememberEntitiesShardStore( typeName: String, shardId: ShardRegion.ShardId, settings: ClusterShardingSettings) extends PersistentActor with ActorLogging { - import EventSourcedRememberEntitiesStore._ + import EventSourcedRememberEntitiesShardStore._ import settings.tuningParameters._ log.debug("Starting up EventSourcedRememberEntitiesStore") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index bcd285558c..aa8b361daf 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -23,9 +23,12 @@ import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.ShardRegion._ import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm } import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages -import akka.cluster.sharding.internal.EventSourcedRememberShards.{ MigrationMarker, State => RememberShardsState } -import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ State => EntityState } -import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.{ EntitiesStarted, EntitiesStopped } +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesCoordinatorStore.{ + MigrationMarker, + State => RememberShardsState +} +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{ State => EntityState } +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.{ EntitiesStarted, EntitiesStopped } import akka.protobufv3.internal.MessageLite import akka.serialization.BaseSerializer import akka.serialization.Serialization diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala index 62c5cc0c30..2421efa37b 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala @@ -50,8 +50,7 @@ object PersistentShardingMigrationSpec { """) val configForNewMode = ConfigFactory - .parseString( - """ + .parseString(""" akka.cluster.sharding { remember-entities = on remember-entities-store = "eventsourced" @@ -60,7 +59,7 @@ object PersistentShardingMigrationSpec { akka.persistence.journal.leveldb { event-adapters { - coordinator-migration = "akka.cluster.sharding.internal.EventSourcedRememberShards$FromOldCoordinatorState" + coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter" } event-adapter-bindings { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/RememberEntitiesShardStoreSpec.scala similarity index 60% rename from akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala rename to akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/RememberEntitiesShardStoreSpec.scala index 1888fd6014..a33fe6a474 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStoreSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/RememberEntitiesShardStoreSpec.scala @@ -4,14 +4,19 @@ package akka.cluster.sharding.internal +import akka.actor.Props import akka.cluster.ddata.{ Replicator, ReplicatorSettings } import akka.cluster.sharding.ClusterShardingSettings +import akka.cluster.sharding.ShardRegion.ShardId import akka.cluster.{ Cluster, MemberStatus } import akka.testkit.{ AkkaSpec, ImplicitSender, WithLogCapturing } import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike -object DDataRememberEntitiesShardStoreSpec { +/** + * Covers the interaction between the shard and the remember entities store + */ +object RememberEntitiesShardStoreSpec { def config = ConfigFactory.parseString(""" akka.loglevel=DEBUG akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] @@ -22,16 +27,20 @@ object DDataRememberEntitiesShardStoreSpec { akka.cluster.sharding.remember-entities = on # no leaks between test runs thank you akka.cluster.sharding.distributed-data.durable.keys = [] + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" """.stripMargin) } -// FIXME generalize to general test and cover both ddata and eventsourced -class DDataRememberEntitiesShardStoreSpec - extends AkkaSpec(DDataRememberEntitiesShardStoreSpec.config) +// shared base class for both persistence and ddata specs +abstract class RememberEntitiesShardStoreSpec + extends AkkaSpec(RememberEntitiesShardStoreSpec.config) with AnyWordSpecLike with ImplicitSender with WithLogCapturing { + def storeName: String + def storeProps(shardId: ShardId, typeName: String, settings: ClusterShardingSettings): Props + override def atStartup(): Unit = { // Form a one node cluster val cluster = Cluster(system) @@ -39,17 +48,13 @@ class DDataRememberEntitiesShardStoreSpec awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1)) } - "The DDataRememberEntitiesShardStore" must { - val replicatorSettings = ReplicatorSettings(system) - val replicator = system.actorOf(Replicator.props(replicatorSettings)) + s"The $storeName" must { val shardingSettings = ClusterShardingSettings(system) "store starts and stops and list remembered entity ids" in { - val store = system.actorOf( - DDataRememberEntitiesShardStore - .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1)) + val store = system.actorOf(storeProps("FakeShardId", "FakeTypeName", shardingSettings)) store ! RememberEntitiesShardStore.GetEntities expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should be(empty) @@ -67,9 +72,7 @@ class DDataRememberEntitiesShardStoreSpec expectMsg(RememberEntitiesShardStore.UpdateDone(Set("2"), Set.empty)) // the store does not support get after update - val storeIncarnation2 = system.actorOf( - DDataRememberEntitiesShardStore - .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1)) + val storeIncarnation2 = system.actorOf(storeProps("FakeShardId", "FakeTypeName", shardingSettings)) storeIncarnation2 ! RememberEntitiesShardStore.GetEntities expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) @@ -77,9 +80,7 @@ class DDataRememberEntitiesShardStoreSpec "handle a late request" in { // the store does not support get after update - val storeIncarnation3 = system.actorOf( - DDataRememberEntitiesShardStore - .props("FakeShardId", "FakeTypeName", shardingSettings, replicator, majorityMinCap = 1)) + val storeIncarnation3 = system.actorOf(storeProps("FakeShardId", "FakeTypeName", shardingSettings)) Thread.sleep(500) storeIncarnation3 ! RememberEntitiesShardStore.GetEntities @@ -89,3 +90,21 @@ class DDataRememberEntitiesShardStoreSpec } } + +class DDataRememberEntitiesShardStoreSpec extends RememberEntitiesShardStoreSpec { + + val replicatorSettings = ReplicatorSettings(system) + val replicator = system.actorOf(Replicator.props(replicatorSettings)) + + override def storeName: String = "DDataRememberEntitiesShardStore" + override def storeProps(shardId: ShardId, typeName: String, settings: ClusterShardingSettings): Props = + DDataRememberEntitiesShardStore.props(shardId, typeName, settings, replicator, majorityMinCap = 1) +} + +class EventSourcedRememberEntitiesShardStoreSpec extends RememberEntitiesShardStoreSpec { + + override def storeName: String = "EventSourcedRememberEntitiesShardStore" + override def storeProps(shardId: ShardId, typeName: String, settings: ClusterShardingSettings): Props = + EventSourcedRememberEntitiesShardStore.props(typeName, shardId, settings) + +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala index ad61a5fe82..58187feb8b 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializerSpec.scala @@ -12,8 +12,8 @@ import akka.cluster.sharding.Shard import akka.cluster.sharding.ShardCoordinator import akka.cluster.sharding.ShardRegion import akka.cluster.sharding.ShardRegion.ShardId -import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore -import akka.cluster.sharding.internal.EventSourcedRememberEntitiesStore.EntitiesStarted +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore +import akka.cluster.sharding.internal.EventSourcedRememberEntitiesShardStore.EntitiesStarted import akka.serialization.SerializationExtension import akka.testkit.AkkaSpec @@ -71,12 +71,12 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec { } "be able to serialize PersistentShard snapshot state" in { - checkSerialization(EventSourcedRememberEntitiesStore.State(Set("e1", "e2", "e3"))) + checkSerialization(EventSourcedRememberEntitiesShardStore.State(Set("e1", "e2", "e3"))) } "be able to serialize PersistentShard domain events" in { - checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStarted(Set("e1", "e2"))) - checkSerialization(EventSourcedRememberEntitiesStore.EntitiesStopped(Set("e1", "e2"))) + checkSerialization(EventSourcedRememberEntitiesShardStore.EntitiesStarted(Set("e1", "e2"))) + checkSerialization(EventSourcedRememberEntitiesShardStore.EntitiesStopped(Set("e1", "e2"))) } "be able to deserialize old entity started event into entities started" in { diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 80b78a8551..f9c8cce3d7 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -394,9 +394,24 @@ If using remembered entities there are two migration options: * `ddata` for the state store and `ddata` for remembering entities. All remembered entities will be lost after a full cluster restart. * `ddata` for the state store and `eventsourced` for remembering entities. The new `eventsourced` remembering entities store - reads the data written by the old `persistence` mode. Your remembered entities will be remembered. + reads the data written by the old `persistence` mode. Your remembered entities will be remembered after a full cluster restart. -Once you have migrated you cannot go back to the old persistence store. +For migrating existing remembered entities an event adapter needs to be configured in the config for the journal you use in your `application.conf`. +In this example `leveldb` is the used journal: + +``` +akka.persistence.journal.leveldb { + event-adapters { + coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter" + } + + event-adapter-bindings { + "akka.cluster.sharding.ShardCoordinator$Internal$DomainEvent" = coordinator-migration + } +} +``` + +Once you have migrated you cannot go back to the old persistence store, a rolling upgrade is therefore not possible. When @ref:[Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are stored in @ref:[Durable Storage](distributed-data.md#durable-storage) of Distributed Data. You may want to change the