diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 41af79ca98..61eb8eb2a1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -33,6 +33,7 @@ import akka.persistence.DeleteMessagesFailure import akka.persistence.DeleteSnapshotsSuccess import akka.persistence.SnapshotSelectionCriteria import akka.persistence.RecoveryCompleted +import akka.actor.NoSerializationVerificationNeeded /** * INTERNAL API @@ -199,7 +200,7 @@ private[akka] class Shard( } def restartEntities(ids: Set[EntityId]): Unit = { - context.actorOf(RememberEntityStarter.props(typeName, shardId, ids, settings, sender())) + context.actorOf(RememberEntityStarter.props(context.parent, typeName, shardId, ids, settings, sender())) } def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { @@ -342,31 +343,32 @@ private[akka] class Shard( private[akka] object RememberEntityStarter { def props( + region: ActorRef, typeName: String, shardId: ShardRegion.ShardId, ids: Set[ShardRegion.EntityId], settings: ClusterShardingSettings, requestor: ActorRef) = - Props(new RememberEntityStarter(typeName, shardId, ids, settings, requestor)) + Props(new RememberEntityStarter(region, typeName, shardId, ids, settings, requestor)) + + private case object Tick extends NoSerializationVerificationNeeded } /** * INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled */ private[akka] class RememberEntityStarter( + region: ActorRef, typeName: String, shardId: ShardRegion.ShardId, ids: Set[ShardRegion.EntityId], settings: ClusterShardingSettings, - requestor: ActorRef -) extends Actor { + requestor: ActorRef) extends Actor with ActorLogging { import context.dispatcher import scala.concurrent.duration._ + import RememberEntityStarter.Tick - case object Tick - - val region = ClusterSharding(context.system).shardRegion(typeName) var waitingForAck = ids sendStart(ids) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 27b807b8f8..3afe6746f7 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -126,8 +126,13 @@ object ShardRegion { */ override def entityMessage(message: Any): Any = message - override def shardId(message: Any): String = - (math.abs(entityId(message).hashCode) % maxNumberOfShards).toString + override def shardId(message: Any): String = { + val id = message match { + case ShardRegion.StartEntity(id) ⇒ id + case _ ⇒ entityId(message) + } + (math.abs(id.hashCode) % maxNumberOfShards).toString + } } sealed trait ShardRegionCommand diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 0516958a9f..c46f0e5150 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -41,7 +41,8 @@ object ClusterShardingRememberEntitiesSpec { } val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match { - case id: Int ⇒ id.toString + case id: Int ⇒ id.toString + case ShardRegion.StartEntity(id) ⇒ id } } @@ -73,10 +74,10 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) exten } """)) - nodeConfig(first, second)(ConfigFactory.parseString(s""" + nodeConfig(third)(ConfigFactory.parseString(s""" akka.cluster.sharding.distributed-data.durable.lmdb { - # use same directory for first and second node (not used at same time) - dir = target/ShardingRememberEntitiesSpec/sharding-first-second + # use same directory when starting new node on third (not used at same time) + dir = target/ShardingRememberEntitiesSpec/sharding-third } """)) } @@ -127,10 +128,10 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb val cluster = Cluster(system) - def startSharding(): Unit = { - ClusterSharding(system).start( + def startSharding(sys: ActorSystem = system, probe: ActorRef = testActor): Unit = { + ClusterSharding(sys).start( typeName = "Entity", - entityProps = ClusterShardingRememberEntitiesSpec.props(testActor), + entityProps = ClusterShardingRememberEntitiesSpec.props(probe), settings = ClusterShardingSettings(system).withRememberEntities(true), extractEntityId = extractEntityId, extractShardId = extractShardId) @@ -203,16 +204,27 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb } "start remembered entities in new cluster" in within(30.seconds) { - runOn(first) { - testConductor.exit(third, 0).await - } - enterBarrier("crash-third") + runOn(third) { + watch(region) + Cluster(system).leave(Cluster(system).selfAddress) + expectTerminated(region) + awaitAssert { + Cluster(system).isTerminated should ===(true) + } + // no nodes left of the original cluster, start a new cluster - // no nodes left of the original cluster, start a new cluster - join(first, first) - runOn(first) { - startSharding() - expectMsgType[Started] + val sys2 = ActorSystem(system.name, system.settings.config) + val probe2 = TestProbe()(sys2) + + if (!isDdataMode) { + sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref) + val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, sys2) + } + + Cluster(sys2).join(Cluster(sys2).selfAddress) + startSharding(sys2, probe2.ref) + probe2.expectMsgType[Started](20.seconds) } enterBarrier("after-3") } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index ce2410c6e3..72fc03f350 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -82,8 +82,9 @@ object ClusterShardingSpec { val numberOfShards = 12 val extractShardId: ShardRegion.ExtractShardId = { - case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString - case Get(id) ⇒ (id % numberOfShards).toString + case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString + case Get(id) ⇒ (id % numberOfShards).toString + case ShardRegion.StartEntity(id) ⇒ (id.toLong % numberOfShards).toString } def qualifiedCounterProps(typeName: String): Props = @@ -184,9 +185,24 @@ object ClusterShardingDocCode { val extractShardId: ShardRegion.ExtractShardId = { case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString case Get(id) ⇒ (id % numberOfShards).toString + case ShardRegion.StartEntity(id) ⇒ + // StartEntity is used by remembering entities feature + (id.toLong % numberOfShards).toString } //#counter-extractor + { + //#extractShardId-StartEntity + val extractShardId: ShardRegion.ExtractShardId = { + case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString + case Get(id) ⇒ (id % numberOfShards).toString + case ShardRegion.StartEntity(id) ⇒ + // StartEntity is used by remembering entities feature + (id.toLong % numberOfShards).toString + } + //#extractShardId-StartEntity + } + } object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence") diff --git a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java index 55bde97274..2206a8fa65 100644 --- a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java +++ b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java @@ -93,6 +93,49 @@ public class ClusterShardingTest { //#counter-supervisor-start } + public void demonstrateUsage2() { + ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() { + + @Override + public String entityId(Object message) { + if (message instanceof Counter.EntityEnvelope) + return String.valueOf(((Counter.EntityEnvelope) message).id); + else if (message instanceof Counter.Get) + return String.valueOf(((Counter.Get) message).counterId); + else + return null; + } + + @Override + public Object entityMessage(Object message) { + if (message instanceof Counter.EntityEnvelope) + return ((Counter.EntityEnvelope) message).payload; + else + return message; + } + + //#extractShardId-StartEntity + @Override + public String shardId(Object message) { + int numberOfShards = 100; + if (message instanceof Counter.EntityEnvelope) { + long id = ((Counter.EntityEnvelope) message).id; + return String.valueOf(id % numberOfShards); + } else if (message instanceof Counter.Get) { + long id = ((Counter.Get) message).counterId; + return String.valueOf(id % numberOfShards); + } else if (message instanceof ShardRegion.StartEntity) { + long id = Long.valueOf(((ShardRegion.StartEntity) message).entityId()); + return String.valueOf(id % numberOfShards); + } else { + return null; + } + } + //#extractShardId-StartEntity + + }; + } + static//#counter-actor public class Counter extends AbstractPersistentActor { diff --git a/akka-docs/src/main/paradox/java/cluster-sharding.md b/akka-docs/src/main/paradox/java/cluster-sharding.md index 5bfde4acd8..548c94bf9c 100644 --- a/akka-docs/src/main/paradox/java/cluster-sharding.md +++ b/akka-docs/src/main/paradox/java/cluster-sharding.md @@ -271,6 +271,8 @@ the `rememberEntities` flag to true in `ClusterShardingSettings` when calling `Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to extract from the `EntityId`. +@@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity } + When configured to remember entities, whenever a `Shard` is rebalanced onto another node or recovers after a crash it will recreate all the entities which were previously running in that `Shard`. To permanently stop entities, a `Passivate` message must be diff --git a/akka-docs/src/main/paradox/scala/cluster-sharding.md b/akka-docs/src/main/paradox/scala/cluster-sharding.md index 361d66530c..033c6a4602 100644 --- a/akka-docs/src/main/paradox/scala/cluster-sharding.md +++ b/akka-docs/src/main/paradox/scala/cluster-sharding.md @@ -274,6 +274,8 @@ the `rememberEntities` flag to true in `ClusterShardingSettings` when calling `Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to extract from the `EntityId`. +@@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #extractShardId-StartEntity } + When configured to remember entities, whenever a `Shard` is rebalanced onto another node or recovers after a crash it will recreate all the entities which were previously running in that `Shard`. To permanently stop entities, a `Passivate` message must be