fix remember entities tests, #22994

This commit is contained in:
Patrik Nordwall 2017-05-22 13:14:43 +02:00
parent 86aa42cf6c
commit 99a044b472
7 changed files with 109 additions and 27 deletions

View file

@ -33,6 +33,7 @@ import akka.persistence.DeleteMessagesFailure
import akka.persistence.DeleteSnapshotsSuccess import akka.persistence.DeleteSnapshotsSuccess
import akka.persistence.SnapshotSelectionCriteria import akka.persistence.SnapshotSelectionCriteria
import akka.persistence.RecoveryCompleted import akka.persistence.RecoveryCompleted
import akka.actor.NoSerializationVerificationNeeded
/** /**
* INTERNAL API * INTERNAL API
@ -199,7 +200,7 @@ private[akka] class Shard(
} }
def restartEntities(ids: Set[EntityId]): Unit = { def restartEntities(ids: Set[EntityId]): Unit = {
context.actorOf(RememberEntityStarter.props(typeName, shardId, ids, settings, sender())) context.actorOf(RememberEntityStarter.props(context.parent, typeName, shardId, ids, settings, sender()))
} }
def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match {
@ -342,31 +343,32 @@ private[akka] class Shard(
private[akka] object RememberEntityStarter { private[akka] object RememberEntityStarter {
def props( def props(
region: ActorRef,
typeName: String, typeName: String,
shardId: ShardRegion.ShardId, shardId: ShardRegion.ShardId,
ids: Set[ShardRegion.EntityId], ids: Set[ShardRegion.EntityId],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
requestor: ActorRef) = requestor: ActorRef) =
Props(new RememberEntityStarter(typeName, shardId, ids, settings, requestor)) Props(new RememberEntityStarter(region, typeName, shardId, ids, settings, requestor))
private case object Tick extends NoSerializationVerificationNeeded
} }
/** /**
* INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled * INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled
*/ */
private[akka] class RememberEntityStarter( private[akka] class RememberEntityStarter(
region: ActorRef,
typeName: String, typeName: String,
shardId: ShardRegion.ShardId, shardId: ShardRegion.ShardId,
ids: Set[ShardRegion.EntityId], ids: Set[ShardRegion.EntityId],
settings: ClusterShardingSettings, settings: ClusterShardingSettings,
requestor: ActorRef requestor: ActorRef) extends Actor with ActorLogging {
) extends Actor {
import context.dispatcher import context.dispatcher
import scala.concurrent.duration._ import scala.concurrent.duration._
import RememberEntityStarter.Tick
case object Tick
val region = ClusterSharding(context.system).shardRegion(typeName)
var waitingForAck = ids var waitingForAck = ids
sendStart(ids) sendStart(ids)

View file

@ -126,8 +126,13 @@ object ShardRegion {
*/ */
override def entityMessage(message: Any): Any = message override def entityMessage(message: Any): Any = message
override def shardId(message: Any): String = override def shardId(message: Any): String = {
(math.abs(entityId(message).hashCode) % maxNumberOfShards).toString val id = message match {
case ShardRegion.StartEntity(id) id
case _ entityId(message)
}
(math.abs(id.hashCode) % maxNumberOfShards).toString
}
} }
sealed trait ShardRegionCommand sealed trait ShardRegionCommand

View file

@ -41,7 +41,8 @@ object ClusterShardingRememberEntitiesSpec {
} }
val extractShardId: ShardRegion.ExtractShardId = msg msg match { val extractShardId: ShardRegion.ExtractShardId = msg msg match {
case id: Int id.toString case id: Int id.toString
case ShardRegion.StartEntity(id) id
} }
} }
@ -73,10 +74,10 @@ abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) exten
} }
""")) """))
nodeConfig(first, second)(ConfigFactory.parseString(s""" nodeConfig(third)(ConfigFactory.parseString(s"""
akka.cluster.sharding.distributed-data.durable.lmdb { akka.cluster.sharding.distributed-data.durable.lmdb {
# use same directory for first and second node (not used at same time) # use same directory when starting new node on third (not used at same time)
dir = target/ShardingRememberEntitiesSpec/sharding-first-second dir = target/ShardingRememberEntitiesSpec/sharding-third
} }
""")) """))
} }
@ -127,10 +128,10 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
val cluster = Cluster(system) val cluster = Cluster(system)
def startSharding(): Unit = { def startSharding(sys: ActorSystem = system, probe: ActorRef = testActor): Unit = {
ClusterSharding(system).start( ClusterSharding(sys).start(
typeName = "Entity", typeName = "Entity",
entityProps = ClusterShardingRememberEntitiesSpec.props(testActor), entityProps = ClusterShardingRememberEntitiesSpec.props(probe),
settings = ClusterShardingSettings(system).withRememberEntities(true), settings = ClusterShardingSettings(system).withRememberEntities(true),
extractEntityId = extractEntityId, extractEntityId = extractEntityId,
extractShardId = extractShardId) extractShardId = extractShardId)
@ -203,16 +204,27 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
} }
"start remembered entities in new cluster" in within(30.seconds) { "start remembered entities in new cluster" in within(30.seconds) {
runOn(first) { runOn(third) {
testConductor.exit(third, 0).await watch(region)
} Cluster(system).leave(Cluster(system).selfAddress)
enterBarrier("crash-third") expectTerminated(region)
awaitAssert {
Cluster(system).isTerminated should ===(true)
}
// no nodes left of the original cluster, start a new cluster
// no nodes left of the original cluster, start a new cluster val sys2 = ActorSystem(system.name, system.settings.config)
join(first, first) val probe2 = TestProbe()(sys2)
runOn(first) {
startSharding() if (!isDdataMode) {
expectMsgType[Started] sys2.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe2.ref)
val sharedStore = probe2.expectMsgType[ActorIdentity](10.seconds).ref.get
SharedLeveldbJournal.setStore(sharedStore, sys2)
}
Cluster(sys2).join(Cluster(sys2).selfAddress)
startSharding(sys2, probe2.ref)
probe2.expectMsgType[Started](20.seconds)
} }
enterBarrier("after-3") enterBarrier("after-3")
} }

View file

@ -82,8 +82,9 @@ object ClusterShardingSpec {
val numberOfShards = 12 val numberOfShards = 12
val extractShardId: ShardRegion.ExtractShardId = { val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) (id % numberOfShards).toString case EntityEnvelope(id, _) (id % numberOfShards).toString
case Get(id) (id % numberOfShards).toString case Get(id) (id % numberOfShards).toString
case ShardRegion.StartEntity(id) (id.toLong % numberOfShards).toString
} }
def qualifiedCounterProps(typeName: String): Props = def qualifiedCounterProps(typeName: String): Props =
@ -184,9 +185,24 @@ object ClusterShardingDocCode {
val extractShardId: ShardRegion.ExtractShardId = { val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) (id % numberOfShards).toString case EntityEnvelope(id, _) (id % numberOfShards).toString
case Get(id) (id % numberOfShards).toString case Get(id) (id % numberOfShards).toString
case ShardRegion.StartEntity(id)
// StartEntity is used by remembering entities feature
(id.toLong % numberOfShards).toString
} }
//#counter-extractor //#counter-extractor
{
//#extractShardId-StartEntity
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) (id % numberOfShards).toString
case Get(id) (id % numberOfShards).toString
case ShardRegion.StartEntity(id)
// StartEntity is used by remembering entities feature
(id.toLong % numberOfShards).toString
}
//#extractShardId-StartEntity
}
} }
object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence") object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence")

View file

@ -93,6 +93,49 @@ public class ClusterShardingTest {
//#counter-supervisor-start //#counter-supervisor-start
} }
public void demonstrateUsage2() {
ShardRegion.MessageExtractor messageExtractor = new ShardRegion.MessageExtractor() {
@Override
public String entityId(Object message) {
if (message instanceof Counter.EntityEnvelope)
return String.valueOf(((Counter.EntityEnvelope) message).id);
else if (message instanceof Counter.Get)
return String.valueOf(((Counter.Get) message).counterId);
else
return null;
}
@Override
public Object entityMessage(Object message) {
if (message instanceof Counter.EntityEnvelope)
return ((Counter.EntityEnvelope) message).payload;
else
return message;
}
//#extractShardId-StartEntity
@Override
public String shardId(Object message) {
int numberOfShards = 100;
if (message instanceof Counter.EntityEnvelope) {
long id = ((Counter.EntityEnvelope) message).id;
return String.valueOf(id % numberOfShards);
} else if (message instanceof Counter.Get) {
long id = ((Counter.Get) message).counterId;
return String.valueOf(id % numberOfShards);
} else if (message instanceof ShardRegion.StartEntity) {
long id = Long.valueOf(((ShardRegion.StartEntity) message).entityId());
return String.valueOf(id % numberOfShards);
} else {
return null;
}
}
//#extractShardId-StartEntity
};
}
static//#counter-actor static//#counter-actor
public class Counter extends AbstractPersistentActor { public class Counter extends AbstractPersistentActor {

View file

@ -271,6 +271,8 @@ the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to `Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
extract from the `EntityId`. extract from the `EntityId`.
@@snip [ClusterShardingTest.java]($akka$/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java) { #extractShardId-StartEntity }
When configured to remember entities, whenever a `Shard` is rebalanced onto another When configured to remember entities, whenever a `Shard` is rebalanced onto another
node or recovers after a crash it will recreate all the entities which were previously node or recovers after a crash it will recreate all the entities which were previously
running in that `Shard`. To permanently stop entities, a `Passivate` message must be running in that `Shard`. To permanently stop entities, a `Passivate` message must be

View file

@ -274,6 +274,8 @@ the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to `Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
extract from the `EntityId`. extract from the `EntityId`.
@@snip [ClusterShardingSpec.scala]($akka$/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala) { #extractShardId-StartEntity }
When configured to remember entities, whenever a `Shard` is rebalanced onto another When configured to remember entities, whenever a `Shard` is rebalanced onto another
node or recovers after a crash it will recreate all the entities which were previously node or recovers after a crash it will recreate all the entities which were previously
running in that `Shard`. To permanently stop entities, a `Passivate` message must be running in that `Shard`. To permanently stop entities, a `Passivate` message must be