remember entities and changing shardIdExtractor (#22894)
* Test case covering changing shard id extractor with remember-entities * This should do the trick * Feedback addressed * Docs and migration guide mention * Correct logic to persist that entity has moved off off shard
This commit is contained in:
parent
601024dfe0
commit
86aa42cf6c
11 changed files with 1648 additions and 35 deletions
File diff suppressed because it is too large
Load diff
|
|
@ -52,3 +52,12 @@ message ShardStats {
|
|||
required string shard = 1;
|
||||
required int32 entityCount = 2;
|
||||
}
|
||||
|
||||
message StartEntity {
|
||||
required string entityId = 1;
|
||||
}
|
||||
|
||||
message StartEntityAck {
|
||||
required string entityId = 1;
|
||||
required string shardId = 2;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ private[akka] object Shard {
|
|||
sealed trait ShardCommand
|
||||
|
||||
/**
|
||||
* When an remembering entities and the entity stops without issuing a `Passivate`, we
|
||||
* When remembering entities and the entity stops without issuing a `Passivate`, we
|
||||
* restart it after a back off using this message.
|
||||
*/
|
||||
final case class RestartEntity(entity: EntityId) extends ShardCommand
|
||||
|
|
@ -170,6 +170,8 @@ private[akka] class Shard(
|
|||
case Terminated(ref) ⇒ receiveTerminated(ref)
|
||||
case msg: CoordinatorMessage ⇒ receiveCoordinatorMessage(msg)
|
||||
case msg: ShardCommand ⇒ receiveShardCommand(msg)
|
||||
case msg: ShardRegion.StartEntity ⇒ receiveStartEntity(msg)
|
||||
case msg: ShardRegion.StartEntityAck ⇒ receiveStartEntityAck(msg)
|
||||
case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg)
|
||||
case msg: ShardQuery ⇒ receiveShardQuery(msg)
|
||||
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
|
||||
|
|
@ -177,7 +179,27 @@ private[akka] class Shard(
|
|||
|
||||
def receiveShardCommand(msg: ShardCommand): Unit = msg match {
|
||||
case RestartEntity(id) ⇒ getEntity(id)
|
||||
case RestartEntities(ids) ⇒ ids foreach getEntity
|
||||
case RestartEntities(ids) ⇒ restartEntities(ids)
|
||||
}
|
||||
|
||||
def receiveStartEntity(start: ShardRegion.StartEntity): Unit = {
|
||||
log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", sender(), start.entityId, shardId)
|
||||
getEntity(start.entityId)
|
||||
sender() ! ShardRegion.StartEntityAck(start.entityId, shardId)
|
||||
}
|
||||
|
||||
def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = {
|
||||
if (ack.shardId != shardId && state.entities.contains(ack.entityId)) {
|
||||
log.debug("Entity [{}] previously owned by shard [{}] started in shard [{}]", ack.entityId, shardId, ack.shardId)
|
||||
processChange(EntityStopped(ack.entityId)) { _ ⇒
|
||||
state = state.copy(state.entities - ack.entityId)
|
||||
messageBuffers.remove(ack.entityId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def restartEntities(ids: Set[EntityId]): Unit = {
|
||||
context.actorOf(RememberEntityStarter.props(typeName, shardId, ids, settings, sender()))
|
||||
}
|
||||
|
||||
def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match {
|
||||
|
|
@ -318,6 +340,63 @@ private[akka] class Shard(
|
|||
}
|
||||
}
|
||||
|
||||
private[akka] object RememberEntityStarter {
|
||||
def props(
|
||||
typeName: String,
|
||||
shardId: ShardRegion.ShardId,
|
||||
ids: Set[ShardRegion.EntityId],
|
||||
settings: ClusterShardingSettings,
|
||||
requestor: ActorRef) =
|
||||
Props(new RememberEntityStarter(typeName, shardId, ids, settings, requestor))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API: Actor responsible for starting entities when rememberEntities is enabled
|
||||
*/
|
||||
private[akka] class RememberEntityStarter(
|
||||
typeName: String,
|
||||
shardId: ShardRegion.ShardId,
|
||||
ids: Set[ShardRegion.EntityId],
|
||||
settings: ClusterShardingSettings,
|
||||
requestor: ActorRef
|
||||
) extends Actor {
|
||||
|
||||
import context.dispatcher
|
||||
import scala.concurrent.duration._
|
||||
|
||||
case object Tick
|
||||
|
||||
val region = ClusterSharding(context.system).shardRegion(typeName)
|
||||
var waitingForAck = ids
|
||||
|
||||
sendStart(ids)
|
||||
|
||||
val tickTask = {
|
||||
val resendInterval = settings.tuningParameters.retryInterval
|
||||
context.system.scheduler.schedule(resendInterval, resendInterval, self, Tick)
|
||||
}
|
||||
|
||||
def sendStart(ids: Set[ShardRegion.EntityId]): Unit = {
|
||||
ids.foreach(id ⇒ region ! ShardRegion.StartEntity(id))
|
||||
}
|
||||
|
||||
override def receive = {
|
||||
case ack: ShardRegion.StartEntityAck ⇒
|
||||
waitingForAck -= ack.entityId
|
||||
// inform whoever requested the start that it happened
|
||||
requestor ! ack
|
||||
if (waitingForAck.isEmpty) context.stop(self)
|
||||
|
||||
case Tick ⇒
|
||||
sendStart(waitingForAck)
|
||||
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
tickTask.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API: Common things for PersistentShard and DDataShard
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.cluster.sharding
|
||||
|
||||
import java.net.URLEncoder
|
||||
|
||||
import akka.pattern.AskTimeoutException
|
||||
import akka.util.{ MessageBufferMap, Timeout }
|
||||
import akka.pattern.{ ask, pipe }
|
||||
|
|
@ -12,6 +13,7 @@ import akka.cluster.Cluster
|
|||
import akka.cluster.ClusterEvent._
|
||||
import akka.cluster.Member
|
||||
import akka.cluster.MemberStatus
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
|
|
@ -300,6 +302,19 @@ object ShardRegion {
|
|||
*/
|
||||
private final case class RestartShard(shardId: ShardId)
|
||||
|
||||
/**
|
||||
* When remembering entities and a shard is started, each entity id that needs to
|
||||
* be running will trigger this message being sent through sharding. For this to work
|
||||
* the message *must* be handled by the shard id extractor.
|
||||
*/
|
||||
final case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable
|
||||
|
||||
/**
|
||||
* Sent back when a `ShardRegion.StartEntity` message was received and triggered the entity
|
||||
* to start (it does not guarantee the entity successfully started)
|
||||
*/
|
||||
final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId) extends ClusterShardingSerializable
|
||||
|
||||
private def roleOption(role: String): Option[String] =
|
||||
if (role == "") None else Option(role)
|
||||
|
||||
|
|
@ -438,6 +453,7 @@ private[akka] class ShardRegion(
|
|||
case cmd: ShardRegionCommand ⇒ receiveCommand(cmd)
|
||||
case query: ShardRegionQuery ⇒ receiveQuery(query)
|
||||
case msg: RestartShard ⇒ deliverMessage(msg, sender())
|
||||
case msg: StartEntity ⇒ deliverStartEntity(msg, sender())
|
||||
case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender())
|
||||
case unknownMsg ⇒ log.warning("Message does not have an extractor defined in shard [{}] so it was ignored: {}", typeName, unknownMsg)
|
||||
}
|
||||
|
|
@ -699,6 +715,15 @@ private[akka] class ShardRegion(
|
|||
retryCount = 0
|
||||
}
|
||||
|
||||
def deliverStartEntity(msg: StartEntity, snd: ActorRef): Unit = {
|
||||
try {
|
||||
deliverMessage(msg, snd)
|
||||
} catch {
|
||||
case ex: MatchError ⇒
|
||||
log.error(ex, "When using remember-entities the shard id extractor must handle ShardRegion.StartEntity(id).")
|
||||
}
|
||||
}
|
||||
|
||||
def deliverMessage(msg: Any, snd: ActorRef): Unit =
|
||||
msg match {
|
||||
case RestartShard(shardId) ⇒
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import java.util.zip.GZIPOutputStream
|
|||
import scala.annotation.tailrec
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.breakOut
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.cluster.sharding.Shard
|
||||
|
|
@ -18,11 +17,12 @@ import akka.cluster.sharding.ShardCoordinator
|
|||
import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages ⇒ sm }
|
||||
import akka.serialization.BaseSerializer
|
||||
import akka.serialization.Serialization
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import akka.protobuf.MessageLite
|
||||
import java.io.NotSerializableException
|
||||
|
||||
import akka.cluster.sharding.ShardRegion.{ StartEntity, StartEntityAck }
|
||||
|
||||
/**
|
||||
* INTERNAL API: Protobuf serializer of ClusterSharding messages.
|
||||
*/
|
||||
|
|
@ -59,6 +59,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
private val EntityStartedManifest = "CB"
|
||||
private val EntityStoppedManifest = "CD"
|
||||
|
||||
private val StartEntityManifest = "EA"
|
||||
private val StartEntityAckManifest = "EB"
|
||||
|
||||
private val GetShardStatsManifest = "DA"
|
||||
private val ShardStatsManifest = "DB"
|
||||
|
||||
|
|
@ -89,7 +92,11 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
GracefulShutdownReqManifest → { bytes ⇒ GracefulShutdownReq(actorRefMessageFromBinary(bytes)) },
|
||||
|
||||
GetShardStatsManifest → { bytes ⇒ GetShardStats },
|
||||
ShardStatsManifest → { bytes ⇒ shardStatsFromBinary(bytes) })
|
||||
ShardStatsManifest → { bytes ⇒ shardStatsFromBinary(bytes) },
|
||||
|
||||
StartEntityManifest → { startEntityFromBinary(_) },
|
||||
StartEntityAckManifest → { startEntityAckFromBinary(_) }
|
||||
)
|
||||
|
||||
override def manifest(obj: AnyRef): String = obj match {
|
||||
case _: EntityState ⇒ EntityStateManifest
|
||||
|
|
@ -117,6 +124,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
case _: ShardStopped ⇒ ShardStoppedManifest
|
||||
case _: GracefulShutdownReq ⇒ GracefulShutdownReqManifest
|
||||
|
||||
case _: StartEntity ⇒ StartEntityManifest
|
||||
case _: StartEntityAck ⇒ StartEntityAckManifest
|
||||
|
||||
case GetShardStats ⇒ GetShardStatsManifest
|
||||
case _: ShardStats ⇒ ShardStatsManifest
|
||||
case _ ⇒
|
||||
|
|
@ -146,12 +156,15 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
case GracefulShutdownReq(ref) ⇒
|
||||
actorRefMessageToProto(ref).toByteArray
|
||||
|
||||
case m: EntityState ⇒ entityStateToProto(m).toByteArray
|
||||
case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray
|
||||
case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray
|
||||
case m: EntityState ⇒ entityStateToProto(m).toByteArray
|
||||
case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray
|
||||
case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray
|
||||
|
||||
case GetShardStats ⇒ Array.emptyByteArray
|
||||
case m: ShardStats ⇒ shardStatsToProto(m).toByteArray
|
||||
case s: StartEntity ⇒ startEntityToByteArray(s)
|
||||
case s: StartEntityAck ⇒ startEntityAckToByteArray(s)
|
||||
|
||||
case GetShardStats ⇒ Array.emptyByteArray
|
||||
case m: ShardStats ⇒ shardStatsToProto(m).toByteArray
|
||||
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
|
||||
|
|
@ -266,6 +279,29 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
ShardStats(parsed.getShard, parsed.getEntityCount)
|
||||
}
|
||||
|
||||
private def startEntityToByteArray(s: StartEntity): Array[Byte] = {
|
||||
val builder = sm.StartEntity.newBuilder()
|
||||
builder.setEntityId(s.entityId)
|
||||
builder.build().toByteArray
|
||||
}
|
||||
|
||||
private def startEntityFromBinary(bytes: Array[Byte]): StartEntity = {
|
||||
val se = sm.StartEntity.parseFrom(bytes)
|
||||
StartEntity(se.getEntityId)
|
||||
}
|
||||
|
||||
private def startEntityAckToByteArray(s: StartEntityAck): Array[Byte] = {
|
||||
val builder = sm.StartEntityAck.newBuilder()
|
||||
builder.setEntityId(s.entityId)
|
||||
builder.setShardId(s.shardId)
|
||||
builder.build().toByteArray
|
||||
}
|
||||
|
||||
private def startEntityAckFromBinary(bytes: Array[Byte]): StartEntityAck = {
|
||||
val sea = sm.StartEntityAck.parseFrom(bytes)
|
||||
StartEntityAck(sea.getEntityId, sea.getShardId)
|
||||
}
|
||||
|
||||
private def resolveActorRef(path: String): ActorRef = {
|
||||
system.provider.resolveActorRef(path)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,296 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.cluster.sharding
|
||||
|
||||
import java.io.File
|
||||
|
||||
import akka.actor._
|
||||
import akka.cluster.{ Cluster, MemberStatus }
|
||||
import akka.persistence.Persistence
|
||||
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
|
||||
import akka.testkit._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.apache.commons.io.FileUtils
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object ClusterShardingRememberEntitiesNewExtractorSpec {
|
||||
|
||||
final case class Started(ref: ActorRef)
|
||||
|
||||
def props(probe: Option[ActorRef]): Props = Props(new TestEntity(probe))
|
||||
|
||||
class TestEntity(probe: Option[ActorRef]) extends Actor with ActorLogging {
|
||||
log.info("Entity started: " + self.path)
|
||||
probe.foreach(_ ! Started(self))
|
||||
|
||||
def receive = {
|
||||
case m ⇒ sender() ! m
|
||||
}
|
||||
}
|
||||
|
||||
val shardCount = 5
|
||||
|
||||
val extractEntityId: ShardRegion.ExtractEntityId = {
|
||||
case id: Int ⇒ (id.toString, id)
|
||||
}
|
||||
|
||||
val extractShardId1: ShardRegion.ExtractShardId = {
|
||||
case id: Int ⇒ (id % shardCount).toString
|
||||
case ShardRegion.StartEntity(id) ⇒ extractShardId1(id.toInt)
|
||||
}
|
||||
|
||||
val extractShardId2: ShardRegion.ExtractShardId = {
|
||||
// always bump it one shard id
|
||||
case id: Int ⇒ ((id + 1) % shardCount).toString
|
||||
case ShardRegion.StartEntity(id) ⇒ extractShardId2(id.toInt)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: String) extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
val fifth = role("fifth")
|
||||
|
||||
commonConfig(ConfigFactory.parseString(s"""
|
||||
akka.loglevel = DEBUG
|
||||
akka.actor.provider = "cluster"
|
||||
akka.cluster.auto-down-unreachable-after = 0s
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
|
||||
akka.persistence.journal.leveldb-shared {
|
||||
timeout = 5s
|
||||
store {
|
||||
native = off
|
||||
dir = "target/ShardingRememberEntitiesNewExtractorSpec/journal"
|
||||
}
|
||||
}
|
||||
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
|
||||
akka.persistence.snapshot-store.local.dir = "target/ShardingRememberEntitiesNewExtractorSpec/snapshots"
|
||||
akka.cluster.sharding.state-store-mode = "$mode"
|
||||
akka.cluster.sharding.distributed-data.durable.lmdb {
|
||||
dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-ddata
|
||||
map-size = 10 MiB
|
||||
}
|
||||
"""))
|
||||
|
||||
val roleConfig = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.cluster.roles = [sharding]
|
||||
""")
|
||||
|
||||
// we pretend node 4 and 5 are new incarnations of node 2 and 3 as they never run in parallel
|
||||
// so we can use the same lmdb store for them and have node 4 pick up the persisted data of node 2
|
||||
val ddataNodeAConfig = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.cluster.sharding.distributed-data.durable.lmdb {
|
||||
dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-node-a
|
||||
}
|
||||
""")
|
||||
val ddataNodeBConfig = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.cluster.sharding.distributed-data.durable.lmdb {
|
||||
dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-node-b
|
||||
}
|
||||
""")
|
||||
|
||||
nodeConfig(second)(roleConfig.withFallback(ddataNodeAConfig))
|
||||
nodeConfig(third)(roleConfig.withFallback(ddataNodeBConfig))
|
||||
nodeConfig(fourth)(roleConfig.withFallback(ddataNodeAConfig))
|
||||
nodeConfig(fifth)(roleConfig.withFallback(ddataNodeBConfig))
|
||||
|
||||
}
|
||||
|
||||
object PersistentClusterShardingRememberEntitiesSpecNewExtractorConfig extends ClusterShardingRememberEntitiesNewExtractorSpecConfig(
|
||||
ClusterShardingSettings.StateStoreModePersistence)
|
||||
object DDataClusterShardingRememberEntitiesNewExtractorSpecConfig extends ClusterShardingRememberEntitiesNewExtractorSpecConfig(
|
||||
ClusterShardingSettings.StateStoreModeDData)
|
||||
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec(
|
||||
PersistentClusterShardingRememberEntitiesSpecNewExtractorConfig)
|
||||
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class PersistentClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends PersistentClusterShardingRememberEntitiesNewExtractorSpec
|
||||
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorSpec extends ClusterShardingRememberEntitiesNewExtractorSpec(
|
||||
DDataClusterShardingRememberEntitiesNewExtractorSpecConfig)
|
||||
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode1 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode2 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode3 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode4 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode5 extends DDataClusterShardingRememberEntitiesNewExtractorSpec
|
||||
|
||||
abstract class ClusterShardingRememberEntitiesNewExtractorSpec(config: ClusterShardingRememberEntitiesNewExtractorSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender {
|
||||
import ClusterShardingRememberEntitiesNewExtractorSpec._
|
||||
import config._
|
||||
|
||||
val typeName = "Entity"
|
||||
|
||||
override def initialParticipants = roles.size
|
||||
|
||||
val storageLocations = List(new File(system.settings.config.getString(
|
||||
"akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
|
||||
|
||||
override protected def atStartup() {
|
||||
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
|
||||
enterBarrier("startup")
|
||||
}
|
||||
|
||||
override protected def afterTermination() {
|
||||
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteQuietly(dir))
|
||||
}
|
||||
|
||||
def join(from: RoleName, to: RoleName): Unit = {
|
||||
runOn(from) {
|
||||
Cluster(system) join node(to).address
|
||||
}
|
||||
enterBarrier(from.name + "-joined")
|
||||
}
|
||||
|
||||
val cluster = Cluster(system)
|
||||
|
||||
def startShardingWithExtractor1(): Unit = {
|
||||
ClusterSharding(system).start(
|
||||
typeName = typeName,
|
||||
entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(None),
|
||||
settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"),
|
||||
extractEntityId = extractEntityId,
|
||||
extractShardId = extractShardId1)
|
||||
}
|
||||
|
||||
def startShardingWithExtractor2(): Unit = {
|
||||
ClusterSharding(system).start(
|
||||
typeName = typeName,
|
||||
entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(testActor)),
|
||||
settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"),
|
||||
extractEntityId = extractEntityId,
|
||||
extractShardId = extractShardId2)
|
||||
}
|
||||
|
||||
lazy val region = ClusterSharding(system).shardRegion(typeName)
|
||||
|
||||
def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
|
||||
|
||||
s"Cluster with min-nr-of-members using sharding ($mode)" must {
|
||||
|
||||
if (!isDdataMode) {
|
||||
"setup shared journal" in {
|
||||
// start the Persistence extension
|
||||
Persistence(system)
|
||||
runOn(first) {
|
||||
system.actorOf(Props[SharedLeveldbStore], "store")
|
||||
}
|
||||
enterBarrier("persistence-started")
|
||||
|
||||
runOn(second, third, fourth, fifth) {
|
||||
system.actorSelection(node(first) / "user" / "store") ! Identify(None)
|
||||
val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
|
||||
SharedLeveldbJournal.setStore(sharedStore, system)
|
||||
}
|
||||
|
||||
enterBarrier("after-1")
|
||||
}
|
||||
}
|
||||
|
||||
"start up first cluster and sharding" in within(15.seconds) {
|
||||
join(first, first)
|
||||
join(second, first)
|
||||
join(third, first)
|
||||
|
||||
runOn(first, second, third) {
|
||||
within(remaining) {
|
||||
awaitAssert {
|
||||
cluster.state.members.count(_.status == MemberStatus.Up) should ===(3)
|
||||
}
|
||||
}
|
||||
}
|
||||
runOn(second, third) {
|
||||
startShardingWithExtractor1()
|
||||
}
|
||||
enterBarrier("first-cluster-up")
|
||||
|
||||
runOn(second, third) {
|
||||
// one entity for each shard id
|
||||
(1 to 10).foreach { n ⇒
|
||||
region ! n
|
||||
expectMsg(n)
|
||||
}
|
||||
}
|
||||
enterBarrier("first-cluster-entities-up")
|
||||
}
|
||||
|
||||
"shutdown sharding nodes" in within(30.seconds) {
|
||||
runOn(first) {
|
||||
testConductor.exit(second, 0).await
|
||||
testConductor.exit(third, 0).await
|
||||
}
|
||||
runOn(first) {
|
||||
within(remaining) {
|
||||
awaitAssert {
|
||||
cluster.state.members.count(_.status == MemberStatus.Up) should ===(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("first-sharding-cluster-stopped")
|
||||
}
|
||||
|
||||
"start new nodes with different extractor" in within(15.seconds) {
|
||||
|
||||
// start it with a new shard id extractor, which will put the entities
|
||||
// on different shards
|
||||
|
||||
join(fourth, first)
|
||||
join(fifth, first)
|
||||
runOn(first) {
|
||||
within(remaining) {
|
||||
awaitAssert {
|
||||
cluster.state.members.count(_.status == MemberStatus.Up) should ===(3)
|
||||
}
|
||||
}
|
||||
}
|
||||
runOn(fourth, fifth) {
|
||||
startShardingWithExtractor2()
|
||||
}
|
||||
|
||||
// TODO how do we know that the shards has started??
|
||||
Thread.sleep(7000)
|
||||
enterBarrier("new-nodes-started")
|
||||
}
|
||||
|
||||
"have the remembered entities running on the right shards" in within(15.seconds) {
|
||||
runOn(fourth, fifth) {
|
||||
var stats: ShardRegion.CurrentShardRegionState = null
|
||||
within(remaining) {
|
||||
awaitAssert {
|
||||
region ! ShardRegion.GetShardRegionState
|
||||
val reply = expectMsgType[ShardRegion.CurrentShardRegionState]
|
||||
reply.shards should not be empty
|
||||
stats = reply
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
shardState ← stats.shards
|
||||
entityId ← shardState.entityIds
|
||||
} {
|
||||
val calculatedShardId = extractShardId2(entityId.toInt)
|
||||
calculatedShardId should ===(shardState.shardId)
|
||||
}
|
||||
}
|
||||
|
||||
enterBarrier("done")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,6 +1,6 @@
|
|||
akka {
|
||||
actor {
|
||||
serialize-creators = on
|
||||
serialize-creators = off
|
||||
serialize-messages = on
|
||||
warn-about-java-serializer-usage = off
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,11 +3,10 @@
|
|||
*/
|
||||
package akka.cluster.sharding.protobuf
|
||||
|
||||
import akka.actor.{ ExtendedActorSystem }
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Props
|
||||
import akka.cluster.sharding.ShardCoordinator
|
||||
import akka.cluster.sharding.Shard
|
||||
import akka.cluster.sharding.{ Shard, ShardCoordinator, ShardRegion }
|
||||
|
||||
class ClusterShardingMessageSerializerSpec extends AkkaSpec {
|
||||
import ShardCoordinator.Internal._
|
||||
|
|
@ -28,7 +27,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
|
|||
|
||||
"ClusterShardingMessageSerializer" must {
|
||||
|
||||
"be able to serializable ShardCoordinator snapshot State" in {
|
||||
"be able to serialize ShardCoordinator snapshot State" in {
|
||||
val state = State(
|
||||
shards = Map("a" → region1, "b" → region2, "c" → region2),
|
||||
regions = Map(region1 → Vector("a"), region2 → Vector("b", "c"), region3 → Vector.empty[String]),
|
||||
|
|
@ -37,7 +36,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
|
|||
checkSerialization(state)
|
||||
}
|
||||
|
||||
"be able to serializable ShardCoordinator domain events" in {
|
||||
"be able to serialize ShardCoordinator domain events" in {
|
||||
checkSerialization(ShardRegionRegistered(region1))
|
||||
checkSerialization(ShardRegionProxyRegistered(regionProxy1))
|
||||
checkSerialization(ShardRegionTerminated(region1))
|
||||
|
|
@ -46,7 +45,7 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
|
|||
checkSerialization(ShardHomeDeallocated("a"))
|
||||
}
|
||||
|
||||
"be able to serializable ShardCoordinator remote messages" in {
|
||||
"be able to serialize ShardCoordinator remote messages" in {
|
||||
checkSerialization(Register(region1))
|
||||
checkSerialization(RegisterProxy(regionProxy1))
|
||||
checkSerialization(RegisterAck(region1))
|
||||
|
|
@ -61,21 +60,26 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
|
|||
checkSerialization(GracefulShutdownReq(region1))
|
||||
}
|
||||
|
||||
"be able to serializable PersistentShard snapshot state" in {
|
||||
"be able to serialize PersistentShard snapshot state" in {
|
||||
checkSerialization(Shard.State(Set("e1", "e2", "e3")))
|
||||
}
|
||||
|
||||
"be able to serializable PersistentShard domain events" in {
|
||||
"be able to serialize PersistentShard domain events" in {
|
||||
checkSerialization(Shard.EntityStarted("e1"))
|
||||
checkSerialization(Shard.EntityStopped("e1"))
|
||||
}
|
||||
|
||||
"be able to serializable GetShardStats" in {
|
||||
"be able to serialize GetShardStats" in {
|
||||
checkSerialization(Shard.GetShardStats)
|
||||
}
|
||||
|
||||
"be able to serializable ShardStats" in {
|
||||
"be able to serialize ShardStats" in {
|
||||
checkSerialization(Shard.ShardStats("a", 23))
|
||||
}
|
||||
|
||||
"be able to serialize StartEntity" in {
|
||||
checkSerialization(ShardRegion.StartEntity("42"))
|
||||
checkSerialization(ShardRegion.StartEntityAck("13", "37"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -267,12 +267,15 @@ are thereafter delivered to a new incarnation of the entity.
|
|||
|
||||
The list of entities in each `Shard` can be made persistent (durable) by setting
|
||||
the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
|
||||
`ClusterSharding.start`. When configured to remember entities, whenever a `Shard`
|
||||
is rebalanced onto another node or recovers after a crash it will recreate all the
|
||||
entities which were previously running in that `Shard`. To permanently stop entities,
|
||||
a `Passivate` message must be sent to the parent of the entity actor, otherwise the
|
||||
entity will be automatically restarted after the entity restart backoff specified in
|
||||
the configuration.
|
||||
`ClusterSharding.start` and making sure the `shardIdExtractor` handles
|
||||
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
|
||||
extract from the `EntityId`.
|
||||
|
||||
When configured to remember entities, whenever a `Shard` is rebalanced onto another
|
||||
node or recovers after a crash it will recreate all the entities which were previously
|
||||
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
||||
sent to the parent of the entity actor, otherwise the entity will be automatically
|
||||
restarted after the entity restart backoff specified in the configuration.
|
||||
|
||||
When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
|
||||
stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
|
||||
|
|
|
|||
|
|
@ -270,12 +270,15 @@ are thereafter delivered to a new incarnation of the entity.
|
|||
|
||||
The list of entities in each `Shard` can be made persistent (durable) by setting
|
||||
the `rememberEntities` flag to true in `ClusterShardingSettings` when calling
|
||||
`ClusterSharding.start`. When configured to remember entities, whenever a `Shard`
|
||||
is rebalanced onto another node or recovers after a crash it will recreate all the
|
||||
entities which were previously running in that `Shard`. To permanently stop entities,
|
||||
a `Passivate` message must be sent to the parent of the entity actor, otherwise the
|
||||
entity will be automatically restarted after the entity restart backoff specified in
|
||||
the configuration.
|
||||
`ClusterSharding.start` and making sure the `shardIdExtractor` handles
|
||||
`Shard.StartEntity(EntityId)` which implies that a `ShardId` must be possible to
|
||||
extract from the `EntityId`.
|
||||
|
||||
When configured to remember entities, whenever a `Shard` is rebalanced onto another
|
||||
node or recovers after a crash it will recreate all the entities which were previously
|
||||
running in that `Shard`. To permanently stop entities, a `Passivate` message must be
|
||||
sent to the parent of the entity actor, otherwise the entity will be automatically
|
||||
restarted after the entity restart backoff specified in the configuration.
|
||||
|
||||
When [Distributed Data mode](#cluster-sharding-mode) is used the identifiers of the entities are
|
||||
stored in @ref:[Durable Storage](distributed-data.md#ddata-durable) of Distributed Data. You may want to change the
|
||||
|
|
|
|||
|
|
@ -479,6 +479,18 @@ Note that the stored @ref:[Remembering Entities](../cluster-sharding.md#cluster-
|
|||
be migrated to the `data` mode. Such entities must be started again in some other way when using
|
||||
`ddata` mode.
|
||||
|
||||
### Cluster Sharding remember entities
|
||||
|
||||
To use *remember entities* with cluster sharding there are now an additional requirement added: the
|
||||
`extractShardId` must be able to extract the shard id from the message `Shard.StartEntity(EntityId)`.
|
||||
This is implies that it must be possible to calculate a shard id from an entity id when using remember
|
||||
entities.
|
||||
|
||||
This was added to be able to gracefully handle when persisted locations of entities does not match
|
||||
where the entities should live when a shard region starts up. Such states could be cause by changing
|
||||
the `extractShardId` logic and restart a system using *remember entities*.
|
||||
|
||||
|
||||
### Cluster Management Command Line Tool
|
||||
|
||||
There is a new cluster management tool with HTTP API that has the same functionality as the command line tool.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue