Split up large writes in the EventSourcedRememberEntitiesShardStore (#29233)
This commit is contained in:
parent
7f48be9ef8
commit
b2b594293d
3 changed files with 44 additions and 6 deletions
|
|
@ -148,6 +148,13 @@ akka.cluster.sharding {
|
|||
number-of-entities = 5
|
||||
}
|
||||
|
||||
event-sourced-remember-entities-store {
|
||||
# When using remember entities and the event sourced remember entities store the batches
|
||||
# written to the store are limited by this number to avoid getting a too large event for
|
||||
# the journal to handle. If using long persistence ids you may have to increase this.
|
||||
max-updates-per-write = 100
|
||||
}
|
||||
|
||||
# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
|
||||
# The "role" of the singleton configuration is not used. The singleton role will
|
||||
# be the same as "akka.cluster.sharding.role".
|
||||
|
|
|
|||
|
|
@ -72,6 +72,9 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
|
|||
import EventSourcedRememberEntitiesShardStore._
|
||||
import settings.tuningParameters._
|
||||
|
||||
private val maxUpdatesPerWrite = context.system.settings.config
|
||||
.getInt("akka.cluster.sharding.event-sourced-remember-entities-store.max-updates-per-write")
|
||||
|
||||
log.debug("Starting up EventSourcedRememberEntitiesStore")
|
||||
private var state = State()
|
||||
override def persistenceId = s"/sharding/${typeName}Shard/$shardId"
|
||||
|
|
@ -93,14 +96,23 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
|
|||
(if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) :::
|
||||
(if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil)
|
||||
var left = events.size
|
||||
persistAll(events) { _ =>
|
||||
left -= 1
|
||||
if (left == 0) {
|
||||
sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
|
||||
state.copy(state.entities.union(started).diff(stopped))
|
||||
saveSnapshotWhenNeeded()
|
||||
def persistEventsAndHandleComplete(evts: List[StateChange]): Unit = {
|
||||
persistAll(evts) { _ =>
|
||||
left -= 1
|
||||
if (left == 0) {
|
||||
sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
|
||||
state.copy(state.entities.union(started).diff(stopped))
|
||||
saveSnapshotWhenNeeded()
|
||||
}
|
||||
}
|
||||
}
|
||||
if (left <= maxUpdatesPerWrite) {
|
||||
// optimized when batches are small
|
||||
persistEventsAndHandleComplete(events)
|
||||
} else {
|
||||
// split up in several writes so we don't hit journal limit
|
||||
events.grouped(maxUpdatesPerWrite).foreach(persistEventsAndHandleComplete)
|
||||
}
|
||||
|
||||
case RememberEntitiesShardStore.GetEntities =>
|
||||
sender() ! RememberEntitiesShardStore.RememberedEntities(state.entities)
|
||||
|
|
|
|||
|
|
@ -87,6 +87,25 @@ abstract class RememberEntitiesShardStoreSpec
|
|||
expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should ===(Set("1", "2", "4", "5")) // from previous test
|
||||
}
|
||||
|
||||
"handle a large batch" in {
|
||||
var store = system.actorOf(storeProps("FakeShardIdLarge", "FakeTypeNameLarge", shardingSettings))
|
||||
store ! RememberEntitiesShardStore.GetEntities
|
||||
expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should be(empty)
|
||||
|
||||
store ! RememberEntitiesShardStore.Update((1 to 1000).map(_.toString).toSet, (1001 to 2000).map(_.toString).toSet)
|
||||
val response = expectMsgType[RememberEntitiesShardStore.UpdateDone]
|
||||
response.started should have size (1000)
|
||||
response.stopped should have size (1000)
|
||||
|
||||
watch(store)
|
||||
system.stop(store)
|
||||
expectTerminated(store)
|
||||
|
||||
store = system.actorOf(storeProps("FakeShardIdLarge", "FakeTypeNameLarge", shardingSettings))
|
||||
store ! RememberEntitiesShardStore.GetEntities
|
||||
expectMsgType[RememberEntitiesShardStore.RememberedEntities].entities should have size (1000)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue