diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf
index bc4c763ef2..78cde1a77c 100644
--- a/akka-cluster-sharding/src/main/resources/reference.conf
+++ b/akka-cluster-sharding/src/main/resources/reference.conf
@@ -148,6 +148,13 @@ akka.cluster.sharding {
     number-of-entities = 5
   }
 
+  event-sourced-remember-entities-store {
+    # When using remember entities with the event sourced remember entities store, the batches
+    # written to the store are limited by this number to avoid creating an event that is too
+    # large for the journal to handle. If you use long persistence ids you may have to increase this.
+    max-updates-per-write = 100
+  }
+
   # Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
   # The "role" of the singleton configuration is not used. The singleton role will
   # be the same as "akka.cluster.sharding.role".
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala
index c8bc240a80..5257c1c590 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala
@@ -72,6 +72,9 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
   import EventSourcedRememberEntitiesShardStore._
   import settings.tuningParameters._
 
+  private val maxUpdatesPerWrite = context.system.settings.config
+    .getInt("akka.cluster.sharding.event-sourced-remember-entities-store.max-updates-per-write")
+
   log.debug("Starting up EventSourcedRememberEntitiesStore")
   private var state = State()
   override def persistenceId = s"/sharding/${typeName}Shard/$shardId"
@@ -93,14 +96,23 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
         (if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) :::
         (if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil)
       var left = events.size
-      persistAll(events) { _ =>
-        left -= 1
-        if (left == 0) {
-          sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
-          state.copy(state.entities.union(started).diff(stopped))
-          saveSnapshotWhenNeeded()
+      def persistEventsAndHandleComplete(evts: List[StateChange]): Unit = {
+        persistAll(evts) { _ =>
+          left -= 1
+          if (left == 0) {
+            sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
+            state.copy(state.entities.union(started).diff(stopped))
+            saveSnapshotWhenNeeded()
+          }
         }
       }
+      if (left <= maxUpdatesPerWrite) {
+        // common case: the whole batch fits in a single write
+        persistEventsAndHandleComplete(events)
+      } else {
+        // split into several writes so we don't hit the journal's limit on event batch size
+        events.grouped(maxUpdatesPerWrite).foreach(persistEventsAndHandleComplete)
+      }
 
     case RememberEntitiesShardStore.GetEntities =>
       sender() ! RememberEntitiesShardStore.RememberedEntities(state.entities)
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/RememberEntitiesShardStoreSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/RememberEntitiesShardStoreSpec.scala
index a33fe6a474..8792137f3a 100644
--- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/RememberEntitiesShardStoreSpec.scala
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/internal/RememberEntitiesShardStoreSpec.scala
@@ -87,6 +87,25 @@ abstract class RememberEntitiesShardStoreSpec
       expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) // from previous test
     }
 
+    "handle a large batch" in {
+      var store = system.actorOf(storeProps("FakeShardIdLarge", "FakeTypeNameLarge", shardingSettings))
+      store ! RememberEntitiesShardStore.GetEntities
+      expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should be(empty)
+
+      store ! RememberEntitiesShardStore.Update((1 to 1000).map(_.toString).toSet, (1001 to 2000).map(_.toString).toSet)
+      val response = expectMsgType[RememberEntitiesShardStore.UpdateDone]
+      response.started should have size (1000)
+      response.stopped should have size (1000)
+
+      watch(store)
+      system.stop(store)
+      expectTerminated(store)
+
+      store = system.actorOf(storeProps("FakeShardIdLarge", "FakeTypeNameLarge", shardingSettings))
+      store ! RememberEntitiesShardStore.GetEntities
+      expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should have size (1000)
+    }
+
   }
 
 }
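Note for reviewers: the core of the change is splitting one potentially huge `persistAll` call into several writes capped by `max-updates-per-write` (default 100, overridable via `akka.cluster.sharding.event-sourced-remember-entities-store.max-updates-per-write` when long persistence ids make individual events bigger). The standalone sketch below mirrors that control flow; it is plain Scala with no Akka dependency, and the object name, `Event` type and stand-in `persistAll` are illustrative only, not part of the patch.

```scala
// Minimal sketch of the batching logic in the patch, under the assumptions above:
// a large list of updates is split into chunks of at most maxUpdatesPerWrite,
// each chunk is "persisted" separately, and a single countdown signals completion
// once the handler has run for every event across all chunks.
object BatchedWriteSketch {

  final case class Event(entityId: String)

  def main(args: Array[String]): Unit = {
    val maxUpdatesPerWrite = 100                      // mirrors the new reference.conf default
    val events = (1 to 2000).map(i => Event(i.toString)).toList

    var left = events.size                            // shared across all chunks, like `left` in the store
    var done = false

    // Stand-in for Akka's persistAll: invokes the handler once per event in the chunk.
    def persistAll(chunk: List[Event])(handler: Event => Unit): Unit =
      chunk.foreach(handler)

    def persistEventsAndHandleComplete(chunk: List[Event]): Unit =
      persistAll(chunk) { _ =>
        left -= 1
        if (left == 0) done = true                    // "UpdateDone" is signalled only once, after the last event
      }

    if (events.size <= maxUpdatesPerWrite)
      persistEventsAndHandleComplete(events)          // small batch: single write
    else
      events.grouped(maxUpdatesPerWrite).foreach(persistEventsAndHandleComplete)

    println(s"chunks written: ${math.ceil(events.size.toDouble / maxUpdatesPerWrite).toInt}, completed: $done")
  }
}
```

Because `left` is computed from the full event list and shared across all chunks, the store still replies with `UpdateDone` exactly once, after the persist handler has run for the last event of the last chunk.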