Merge pull request #22996 from akka/wip-22994-remember-patriknw
fix remember entities tests, #22994
This commit is contained in:
commit
f528e67422
7 changed files with 109 additions and 27 deletions
|
|
@ -33,6 +33,7 @@ import akka.persistence.DeleteMessagesFailure
|
||||||
import akka.persistence.DeleteSnapshotsSuccess
|
import akka.persistence.DeleteSnapshotsSuccess
|
||||||
import akka.persistence.SnapshotSelectionCriteria
|
import akka.persistence.SnapshotSelectionCriteria
|
||||||
import akka.persistence.RecoveryCompleted
|
import akka.persistence.RecoveryCompleted
|
||||||
|
import akka.actor.NoSerializationVerificationNeeded
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -199,7 +200,7 @@ private[akka] class Shard(
|
||||||
}
|
}
|
||||||
|
|
||||||
def restartEntities(ids: Set[EntityId]): Unit = {
|
def restartEntities(ids: Set[EntityId]): Unit = {
|
||||||
context.actorOf(RememberEntityStarter.props(typeName, shardId, ids, settings, sender()))
|
context.actorOf(RememberEntityStarter.props(context.parent, typeName, shardId, ids, settings, sender()))
|
||||||
}
|
}
|
||||||
|
|
||||||
def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match {
|
def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match {
|
||||||
|
|
@ -342,31 +343,32 @@ private[akka] class Shard(
|
||||||
|
|
||||||
private[akka] object RememberEntityStarter {
|
private[akka] object RememberEntityStarter {
|
||||||
def props(
|
def props(
|
||||||
|
region: ActorRef,
|
||||||
typeName: String,
|
typeName: String,
|
||||||
shardId: ShardRegion.ShardId,
|
shardId: ShardRegion.ShardId,
|
||||||
ids: Set[ShardRegion.EntityId],
|
ids: Set[ShardRegion.EntityId],
|
||||||
settings: ClusterShardingSettings,
|
settings: ClusterShardingSettings,
|
||||||
requestor: ActorRef) =
|
requestor: ActorRef) =
|
||||||
Props(new RememberEntityStarter(typeName, shardId, ids, settings, requestor))
|
Props(new RememberEntityStarter(region, typeName, shardId, ids, settings, requestor))
|
||||||
|
|
||||||
|
private case object Tick extends NoSerializationVerificationNeeded
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled
|
* INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled
|
||||||
*/
|
*/
|
||||||
private[akka] class RememberEntityStarter(
|
private[akka] class RememberEntityStarter(
|
||||||
|
region: ActorRef,
|
||||||
typeName: String,
|
typeName: String,
|
||||||
shardId: ShardRegion.ShardId,
|
shardId: ShardRegion.ShardId,
|
||||||
ids: Set[ShardRegion.EntityId],
|
ids: Set[ShardRegion.EntityId],
|
||||||
settings: ClusterShardingSettings,
|
settings: ClusterShardingSettings,
|
||||||
requestor: ActorRef
|
requestor: ActorRef) extends Actor with ActorLogging {
|
||||||
) extends Actor {
|
|
||||||
|
|
||||||
import context.dispatcher
|
import context.dispatcher
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
import RememberEntityStarter.Tick
|
||||||
|
|
||||||
case object Tick
|
|
||||||
|
|
||||||
val region = ClusterSharding(context.system).shardRegion(typeName)
|
|
||||||
var waitingForAck = ids
|
var waitingForAck = ids
|
||||||
|
|
||||||
sendStart(ids)
|
sendStart(ids)
|
||||||
|
|
|
||||||
|
|
@ -126,8 +126,13 @@ object ShardRegion {
|
||||||
*/
|
*/
|
||||||
override def entityMessage(message: Any): Any = message
|
override def entityMessage(message: Any): Any = message
|
||||||
|
|
||||||
override def shardId(message: Any): String =
|
override def shardId(message: Any): String = {
|
||||||
(math.abs(entityId(message).hashCode) % maxNumberOfShards).toString
|
val id = message match {
|
||||||
|
case ShardRegion.StartEntity(id) ⇒ id
|
||||||
|
case _ ⇒ entityId(message)
|
||||||
|
}
|
||||||
|
(math.abs(id.hashCode) % maxNumberOfShards).toString
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
sealed trait ShardRegionCommand
|
sealed trait ShardRegionCommand
|
||||||
|
|
|
||||||
|
|
@ -42,6 +42,7 @@ object ClusterShardingRememberEntitiesSpec {
|
||||||
|
|
||||||
val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match {
|
val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match {
|
||||||
case id: Int ⇒ id.toString
|
case id: Int ⇒ id.toString
|
||||||
|
case ShardRegion.StartEntity(id) ⇒ id
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -73,10 +74,10 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) exten
|
||||||
}
|
}
|
||||||
"""))
|
"""))
|
||||||
|
|
||||||
nodeConfig(first, second)(ConfigFactory.parseString(s"""
|
nodeConfig(third)(ConfigFactory.parseString(s"""
|
||||||
akka.cluster.sharding.distributed-data.durable.lmdb {
|
akka.cluster.sharding.distributed-data.durable.lmdb {
|
||||||
# use same directory for first and second node (not used at same time)
|
# use same directory when starting new node on third (not used at same time)
|
||||||
dir = target/ShardingRememberEntitiesSpec/sharding-first-second
|
dir = target/ShardingRememberEntitiesSpec/sharding-third
|
||||||
}
|
}
|
||||||
"""))
|
"""))
|
||||||
}
|
}
|
||||||
|
|
@ -127,10 +128,10 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
|
||||||
|
|
||||||
val cluster = Cluster(system)
|
val cluster = Cluster(system)
|
||||||
|
|
||||||
def startSharding(): Unit = {
|
def startSharding(sys: ActorSystem = system, probe: ActorRef = testActor): Unit = {
|
||||||
ClusterSharding(system).start(
|
ClusterSharding(sys).start(
|
||||||
typeName = "Entity",
|
typeName = "Entity",
|
||||||
entityProps = ClusterShardingRememberEntitiesSpec.props(testActor),
|
entityProps = ClusterShardingRememberEntitiesSpec.props(probe),
|
||||||
settings = ClusterShardingSettings(system).withRememberEntities(true),
|
settings = ClusterShardingSettings(system).withRememberEntities(true),
|
||||||
extractEntityId = extractEntityId,
|
extractEntityId = extractEntityId,
|
||||||
extractShardId = extractShardId)
|
extractShardId = extractShardId)
|
||||||
|
|
@ -203,16 +204,27 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
|
||||||
}
|
}
|
||||||
|
|
||||||
"start remembered entities in new cluster" in within(30.seconds) {
|
"start remembered entities in new cluster" in within(30.seconds) {
|
||||||
runOn(first) {
|
runOn(third) {
|
||||||
testConductor.exit(third, 0).await
|
watch(region)
|
||||||
|
Cluster(system).leave(Cluster(system).selfAddress)
|
||||||
|
expectTerminated(region)
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(system).isTerminated should ===(true)
|
||||||
}
|
}
|
||||||
enterBarrier("crash-third")
|
|
||||||
|
|
||||||
// no nodes left of the original cluster, start a new cluster
|
// no nodes left of the original cluster, start a new cluster
|
||||||
join(first, first)
|
|
||||||
runOn(first) {
|
val sys2 = ActorSystem(system.name, system.settings.config)
|
||||||
startSharding()
|
val probe2 = TestProbe()(sys2)
|
||||||
expectMsgType[Started]
|
|
||||||
|
if (!isDdataMode) {
|
||||||
|
sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref)
|
||||||
|
val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get
|
||||||
|
SharedLeveldbJournal.setStore(sharedStore, sys2)
|
||||||
|
}
|
||||||
|
|
||||||
|
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||||
|
startSharding(sys2, probe2.ref)
|
||||||
|
probe2.expectMsgType[Started](20.seconds)
|
||||||
}
|
}
|
||||||
enterBarrier("after-3")
|
enterBarrier("after-3")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -84,6 +84,7 @@ object ClusterShardingSpec {
|
||||||
val extractShardId: ShardRegion.ExtractShardId = {
|
val extractShardId: ShardRegion.ExtractShardId = {
|
||||||
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
|
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
|
||||||
case Get(id) ⇒ (id % numberOfShards).toString
|
case Get(id) ⇒ (id % numberOfShards).toString
|
||||||
|
case ShardRegion.StartEntity(id) ⇒ (id.toLong % numberOfShards).toString
|
||||||
}
|
}
|
||||||
|
|
||||||
def qualifiedCounterProps(typeName: String): Props =
|
def qualifiedCounterProps(typeName: String): Props =
|
||||||
|
|
@ -184,9 +185,24 @@ object ClusterShardingDocCode {
|
||||||
val extractShardId: ShardRegion.ExtractShardId = {
|
val extractShardId: ShardRegion.ExtractShardId = {
|
||||||
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
|
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
|
||||||
case Get(id) ⇒ (id % numberOfShards).toString
|
case Get(id) ⇒ (id % numberOfShards).toString
|
||||||
|
case ShardRegion.StartEntity(id) ⇒
|
||||||
|
// StartEntity is used by remembering entities feature
|
||||||
|
(id.toLong % numberOfShards).toString
|
||||||
}
|
}
|
||||||
//#counter-extractor
|
//#counter-extractor
|
||||||
|
|
||||||
|
{
|
||||||
|
//#extractShardId-StartEntity
|
||||||
|
val extractShardId: ShardRegion.ExtractShardId = {
|
||||||
|
case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString
|
||||||
|
case Get(id) ⇒ (id % numberOfShards).toString
|
||||||
|
case ShardRegion.StartEntity(id) ⇒
|
||||||
|
// StartEntity is used by remembering entities feature
|
||||||
|
(id.toLong % numberOfShards).toString
|
||||||
|
}
|
||||||
|
//#extractShardId-StartEntity
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence")
|
object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence")
|
||||||
|
|
|
||||||
|
|
@ -93,6 +93,49 @@ public class ClusterShardingTest {
|
||||||
//#counter-supervisor-start
|
//#counter-supervisor-start
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void demonstrateUsage2() {
|
||||||
|
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String entityId(Object message) {
|
||||||
|
if (message instanceof Counter.EntityEnvelope)
|
||||||
|
return String.valueOf(((Counter.EntityEnvelope) message).id);
|
||||||
|
else if (message instanceof Counter.Get)
|
||||||
|
return String.valueOf(((Counter.Get) message).counterId);
|
||||||
|
else
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object entityMessage(Object message) {
|
||||||
|
if (message instanceof Counter.EntityEnvelope)
|
||||||
|
return ((Counter.EntityEnvelope) message).payload;
|
||||||
|
else
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
//#extractShardId-StartEntity
|
||||||
|
@Override
|
||||||
|
public String shardId(Object message) {
|
||||||
|
int numberOfShards = 100;
|
||||||
|
if (message instanceof Counter.EntityEnvelope) {
|
||||||
|
long id = ((Counter.EntityEnvelope) message).id;
|
||||||
|
return String.valueOf(id % numberOfShards);
|
||||||
|
} else if (message instanceof Counter.Get) {
|
||||||
|
long id = ((Counter.Get) message).counterId;
|
||||||
|
return String.valueOf(id % numberOfShards);
|
||||||
|
} else if (message instanceof ShardRegion.StartEntity) {
|
||||||
|
long id = Long.valueOf(((ShardRegion.StartEntity) message).entityId());
|
||||||
|
return String.valueOf(id % numberOfShards);
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#extractShardId-StartEntity
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
static//#counter-actor
|
static//#counter-actor
|
||||||
public class Counter extends AbstractPersistentActor {
|
public class Counter extends AbstractPersistentActor {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -271,6 +271,8 @@ the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
|
||||||
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
|
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
|
||||||
extract from the `EntityId`.
|
extract from the `EntityId`.
|
||||||
|
|
||||||
|
@@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity }
|
||||||
|
|
||||||
When configured to remember entities, whenever a `Shard` is rebalanced onto another
|
When configured to remember entities, whenever a `Shard` is rebalanced onto another
|
||||||
node or recovers after a crash it will recreate all the entities which were previously
|
node or recovers after a crash it will recreate all the entities which were previously
|
||||||
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
||||||
|
|
|
||||||
|
|
@ -274,6 +274,8 @@ the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
|
||||||
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
|
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
|
||||||
extract from the `EntityId`.
|
extract from the `EntityId`.
|
||||||
|
|
||||||
|
@@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #extractShardId-StartEntity }
|
||||||
|
|
||||||
When configured to remember entities, whenever a `Shard` is rebalanced onto another
|
When configured to remember entities, whenever a `Shard` is rebalanced onto another
|
||||||
node or recovers after a crash it will recreate all the entities which were previously
|
node or recovers after a crash it will recreate all the entities which were previously
|
||||||
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue