From 33b34da36c8c3a9dacc48bc90f9f143f733729c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 11 Jun 2020 15:52:46 +0200 Subject: [PATCH] Union and diff for sets are more performant --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 2 +- .../sharding/internal/DDataRememberEntitiesShardStore.scala | 2 +- .../internal/EventSourcedRememberEntitiesShardStore.scala | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index c7a6066a44..5cf7d0e6ed 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -198,7 +198,7 @@ private[akka] object Shard { else newState } else { if (r.ackTo.isEmpty) this - else RememberingStart(ackTo ++ r.ackTo) + else RememberingStart(ackTo.union(r.ackTo)) } case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala index e04bda991e..6766f8d5db 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala @@ -178,7 +178,7 @@ private[akka] final class DDataRememberEntitiesShardStore( } private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = { - val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped)) + val allEvts: Set[Evt] = (update.started.map(Started(_): Evt).union(update.stopped.map(Stopped))) // map from set of evts (for same ddata key) to one update that applies each of them val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] = allEvts.groupBy(evt => key(evt.id)).map { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala index 9eb108b5e6..c8bc240a80 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala @@ -80,7 +80,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore( override def receiveRecover: Receive = { case EntitiesStarted(ids) => state = state.copy(state.entities.union(ids)) - case EntitiesStopped(ids) => state = state.copy(state.entities -- ids) + case EntitiesStopped(ids) => state = state.copy(state.entities.diff(ids)) case SnapshotOffer(_, snapshot: State) => state = snapshot case RecoveryCompleted => log.debug("Recovery completed for shard [{}] with [{}] entities", shardId, state.entities.size) @@ -97,7 +97,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore( left -= 1 if (left == 0) { sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped) - state.copy(state.entities.union(started) -- stopped) + state.copy(state.entities.union(started).diff(stopped)) saveSnapshotWhenNeeded() } }