rememberingEntities with ddata mode, #22154
* one Replicator per configured role
* log LMDB directory at startup
* clarify the importance of the LMDB directory
* use more than one key to support many entities
parent 8fd5b7e53e
commit 37679d307e
23 changed files with 713 additions and 337 deletions
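The diff below threads a `replicator: ActorRef` through `Shard.props` so that the new `DDataShard` can reach the per-role `Replicator` mentioned above. From the user's side the mode is selected purely through configuration; a minimal sketch (not part of this commit), assuming the standard akka-cluster-sharding dependency and the existing `remember-entities` and `state-store-mode` settings keys:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object DDataRememberEntitiesExample extends App {
      // Sketch only: selects the Distributed Data based remember-entities store
      // added by this change instead of the persistence based one.
      val config = ConfigFactory.parseString(
        """
        akka.actor.provider = cluster
        akka.cluster.sharding {
          remember-entities = on
          state-store-mode = ddata
        }
        """).withFallback(ConfigFactory.load())

      val system = ActorSystem("ShardingDDataExample", config)
      // ClusterSharding.start(...) is then used as usual; the shard regions
      // pick up the ddata state store from the settings above.
    }

With state-store-mode = ddata the set of started entities per shard is kept in replicated ORSets (split over several keys, see DDataShard below) instead of being persisted through Akka Persistence.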
@@ -19,6 +19,12 @@ import akka.persistence.SaveSnapshotFailure
import akka.persistence.SaveSnapshotSuccess
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import akka.cluster.Cluster
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.ORSetKey
import akka.cluster.ddata.Replicator._
import akka.actor.Stash
import akka.cluster.ddata.DistributedData

/**
 * INTERNAL API
@@ -96,8 +102,12 @@ private[akka] object Shard {
    settings: ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId,
    handOffStopMessage: Any): Props = {
    if (settings.rememberEntities)
    handOffStopMessage: Any,
    replicator: ActorRef): Props = {
    if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) {
      Props(new DDataShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId,
        handOffStopMessage, replicator)).withDeploy(Deploy.local)
    } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
      Props(new PersistentShard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage))
        .withDeploy(Deploy.local)
    else
@@ -125,8 +135,7 @@ private[akka] class Shard(

  import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized }
  import ShardCoordinator.Internal.{ HandOff, ShardStopped }
  import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted }
  import Shard.{ ShardQuery, GetCurrentShardState, CurrentShardState, GetShardStats, ShardStats }
  import Shard._
  import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage
  import akka.cluster.sharding.ShardRegion.ShardRegionCommand
  import settings.tuningParameters._
@@ -145,7 +154,7 @@ private[akka] class Shard(

  def totalBufferSize = messageBuffers.foldLeft(0) { (sum, entity) ⇒ sum + entity._2.size }

  def processChange[A](event: A)(handler: A ⇒ Unit): Unit =
  def processChange[E <: StateChange](event: E)(handler: E ⇒ Unit): Unit =
    handler(event)

  def receive = receiveCommand
@@ -304,80 +313,35 @@ private[akka] class Shard(
}

/**
 * INTERNAL API
 *
 * This actor creates children entity actors on demand that it is told to be
 * responsible for. It is used when `rememberEntities` is enabled.
 *
 * @see [[ClusterSharding$ ClusterSharding extension]]
 * INTERNAL API: Common things for PersistentShard and DDataShard
 */
private[akka] class PersistentShard(
  typeName: String,
  shardId: ShardRegion.ShardId,
  entityProps: Props,
  settings: ClusterShardingSettings,
  extractEntityId: ShardRegion.ExtractEntityId,
  extractShardId: ShardRegion.ExtractShardId,
  handOffStopMessage: Any) extends Shard(
  typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)
  with PersistentActor with ActorLogging {

private[akka] trait RememberingShard { selfType: Shard ⇒
  import ShardRegion.{ EntityId, Msg }
  import Shard.{ State, RestartEntity, RestartEntities, EntityStopped, EntityStarted }
  import settings.tuningParameters._
  import Shard._
  import akka.pattern.pipe

  val rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy =
  protected val settings: ClusterShardingSettings

  protected val rememberedEntitiesRecoveryStrategy: EntityRecoveryStrategy = {
    import settings.tuningParameters._
    entityRecoveryStrategy match {
      case "all" ⇒ EntityRecoveryStrategy.allStrategy()
      case "constant" ⇒ EntityRecoveryStrategy.constantStrategy(
        context.system,
        entityRecoveryConstantRateStrategyFrequency,
        entityRecoveryConstantRateStrategyNumberOfEntities
      )
    }

  override def persistenceId = s"/sharding/${typeName}Shard/${shardId}"

  override def journalPluginId: String = settings.journalPluginId

  override def snapshotPluginId: String = settings.snapshotPluginId

  // would be initialized after recovery completed
  override def initialized(): Unit = {}

  override def receive = receiveCommand

  override def processChange[A](event: A)(handler: A ⇒ Unit): Unit = {
    saveSnapshotWhenNeeded()
    persist(event)(handler)
  }

  def saveSnapshotWhenNeeded(): Unit = {
    if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) {
      log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
      saveSnapshot(state)
      entityRecoveryConstantRateStrategyNumberOfEntities)
    }
  }

  override def receiveRecover: Receive = {
    case EntityStarted(id) ⇒ state = state.copy(state.entities + id)
    case EntityStopped(id) ⇒ state = state.copy(state.entities - id)
    case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot
    case RecoveryCompleted ⇒
      restartRememberedEntities()
      super.initialized()
      log.debug("Shard recovery completed {}", shardId)
  protected def restartRememberedEntities(): Unit = {
    rememberedEntitiesRecoveryStrategy.recoverEntities(state.entities).foreach { scheduledRecovery ⇒
      import context.dispatcher
      scheduledRecovery.filter(_.nonEmpty).map(RestartEntities).pipeTo(self)
    }
  }

  override def receiveCommand: Receive = ({
    case _: SaveSnapshotSuccess ⇒
      log.debug("PersistentShard snapshot saved successfully")
    case SaveSnapshotFailure(_, reason) ⇒
      log.warning("PersistentShard snapshot failure: {}", reason.getMessage)
  }: Receive).orElse(super.receiveCommand)

  override def entityTerminated(ref: ActorRef): Unit = {
    import settings.tuningParameters._
    val id = idByRef(ref)
    if (messageBuffers.getOrElse(id, Vector.empty).nonEmpty) {
      //Note; because we're not persisting the EntityStopped, we don't need
@@ -404,17 +368,225 @@ private[akka] class PersistentShard(
      case None ⇒
        //Note; we only do this if remembering, otherwise the buffer is an overhead
        messageBuffers = messageBuffers.updated(id, Vector((msg, snd)))
        saveSnapshotWhenNeeded()
        persist(EntityStarted(id))(sendMsgBuffer)
        processChange(EntityStarted(id))(sendMsgBuffer)
    }
  }

  private def restartRememberedEntities(): Unit = {
    rememberedEntitiesRecoveryStrategy.recoverEntities(state.entities).foreach { scheduledRecovery ⇒
      import context.dispatcher
      scheduledRecovery.filter(_.nonEmpty).map(RestartEntities).pipeTo(self)
    }

/**
 * INTERNAL API
 *
 * This actor creates children entity actors on demand that it is told to be
 * responsible for. It is used when `rememberEntities` is enabled and
 * `state-store-mode=persistence`.
 *
 * @see [[ClusterSharding$ ClusterSharding extension]]
 */
private[akka] class PersistentShard(
  typeName: String,
  shardId: ShardRegion.ShardId,
  entityProps: Props,
  override val settings: ClusterShardingSettings,
  extractEntityId: ShardRegion.ExtractEntityId,
  extractShardId: ShardRegion.ExtractShardId,
  handOffStopMessage: Any) extends Shard(
  typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)
  with RememberingShard with PersistentActor with ActorLogging {

  import ShardRegion.{ EntityId, Msg }
  import Shard._
  import settings.tuningParameters._

  override def persistenceId = s"/sharding/${typeName}Shard/${shardId}"

  override def journalPluginId: String = settings.journalPluginId

  override def snapshotPluginId: String = settings.snapshotPluginId

  // would be initialized after recovery completed
  override def initialized(): Unit = {}

  override def receive = receiveCommand

  override def processChange[E <: StateChange](event: E)(handler: E ⇒ Unit): Unit = {
    saveSnapshotWhenNeeded()
    persist(event)(handler)
  }

  def saveSnapshotWhenNeeded(): Unit = {
    if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) {
      log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
      saveSnapshot(state)
    }
  }

  override def receiveRecover: Receive = {
    case EntityStarted(id) ⇒ state = state.copy(state.entities + id)
    case EntityStopped(id) ⇒ state = state.copy(state.entities - id)
    case SnapshotOffer(_, snapshot: State) ⇒ state = snapshot
    case RecoveryCompleted ⇒
      restartRememberedEntities()
      super.initialized()
      log.debug("PersistentShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size)
  }

  override def receiveCommand: Receive = ({
    case _: SaveSnapshotSuccess ⇒
      log.debug("PersistentShard snapshot saved successfully")
    case SaveSnapshotFailure(_, reason) ⇒
      log.warning("PersistentShard snapshot failure: {}", reason.getMessage)
  }: Receive).orElse(super.receiveCommand)

}

/**
 * INTERNAL API
 *
 * This actor creates children entity actors on demand that it is told to be
 * responsible for. It is used when `rememberEntities` is enabled and
 * `state-store-mode=ddata`.
 *
 * @see [[ClusterSharding$ ClusterSharding extension]]
 */
private[akka] class DDataShard(
  typeName: String,
  shardId: ShardRegion.ShardId,
  entityProps: Props,
  override val settings: ClusterShardingSettings,
  extractEntityId: ShardRegion.ExtractEntityId,
  extractShardId: ShardRegion.ExtractShardId,
  handOffStopMessage: Any,
  replicator: ActorRef) extends Shard(
  typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)
  with RememberingShard with Stash with ActorLogging {

  import ShardRegion.{ EntityId, Msg }
  import Shard._
  import settings.tuningParameters._

  private val waitingForStateTimeout = settings.tuningParameters.waitingForStateTimeout
  private val updatingStateTimeout = settings.tuningParameters.updatingStateTimeout
  private val maxUpdateAttempts = 3

  implicit private val node = Cluster(context.system)

  // The default maximum-frame-size is 256 KiB with Artery.
  // ORSet with 40000 elements has a size of ~ 200000 bytes.
  // By splitting the elements over 5 keys we can safely support 200000 entities per shard.
  // This is by intention not configurable because it's important to have the same
  // configuration on each node.
  private val numberOfKeys = 5
  private val stateKeys: Array[ORSetKey[EntityId]] =
    Array.tabulate(numberOfKeys)(i ⇒ ORSetKey[EntityId](s"shard-${typeName}-${shardId}-$i"))

  private def key(entityId: EntityId): ORSetKey[EntityId] = {
    val i = (math.abs(entityId.hashCode) % numberOfKeys)
    stateKeys(i)
  }

  // get initial state from ddata replicator
  getState()

  private def getState(): Unit = {
    (0 until numberOfKeys).map { i ⇒
      replicator ! Get(stateKeys(i), ReadMajority(waitingForStateTimeout), Some(i))
    }
  }

  // would be initialized after recovery completed
  override def initialized(): Unit = {}

  override def receive = waitingForState(Set.empty)

  // This state will stash all commands
  private def waitingForState(gotKeys: Set[Int]): Receive = {
    def receiveOne(i: Int): Unit = {
      val newGotKeys = gotKeys + i
      if (newGotKeys.size == numberOfKeys)
        recoveryCompleted()
      else
        context.become(waitingForState(newGotKeys))
    }

    {
      case g @ GetSuccess(_, Some(i: Int)) ⇒
        val key = stateKeys(i)
        state = state.copy(entities = state.entities union (g.get(key).elements))
        receiveOne(i)

      case GetFailure(_, _) ⇒
        log.error(
          "The DDataShard was unable to get an initial state within 'waiting-for-state-timeout': {} millis",
          waitingForStateTimeout.toMillis)
        // parent ShardRegion supervisor will notice that it terminated and will start it again, after backoff
        context.stop(self)

      case NotFound(_, Some(i: Int)) ⇒
        receiveOne(i)

      case _ ⇒
        stash()
    }
  }

  private def recoveryCompleted(): Unit = {
    restartRememberedEntities()
    super.initialized()
    log.debug("DDataShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size)
    unstashAll()
    context.become(receiveCommand)
  }

  override def processChange[E <: StateChange](event: E)(handler: E ⇒ Unit): Unit = {
    context.become(waitingForUpdate(event, handler), discardOld = false)
    sendUpdate(event, retryCount = 1)
  }

  private def sendUpdate(evt: StateChange, retryCount: Int) = {
    replicator ! Update(key(evt.entityId), ORSet.empty[EntityId], WriteMajority(updatingStateTimeout),
      Some((evt, retryCount))) { existing ⇒
        evt match {
          case EntityStarted(id) ⇒ existing + id
          case EntityStopped(id) ⇒ existing - id
        }
      }
  }

  // this state will stash all messages until it receives UpdateSuccess
  private def waitingForUpdate[E <: StateChange](evt: E, afterUpdateCallback: E ⇒ Unit): Receive = {
    case UpdateSuccess(_, Some((`evt`, _))) ⇒
      log.debug("The DDataShard state was successfully updated with {}", evt)
      context.unbecome()
      afterUpdateCallback(evt)
      unstashAll()

    case UpdateTimeout(_, Some((`evt`, retryCount: Int))) ⇒
      if (retryCount == maxUpdateAttempts) {
        // parent ShardRegion supervisor will notice that it terminated and will start it again, after backoff
        log.error(
          "The DDataShard was unable to update state after {} attempts, within 'updating-state-timeout'={} millis, event={}. " +
            "Shard will be restarted after backoff.",
          maxUpdateAttempts, updatingStateTimeout.toMillis, evt)
        context.stop(self)
      } else {
        log.warning(
          "The DDataShard was unable to update state, attempt {} of {}, within 'updating-state-timeout'={} millis, event={}",
          retryCount, maxUpdateAttempts, updatingStateTimeout.toMillis, evt)
        sendUpdate(evt, retryCount + 1)
      }

    case ModifyFailure(_, error, cause, Some((`evt`, _))) ⇒
      log.error(
        cause,
        "The DDataShard was unable to update state with error {} and event {}. Shard will be restarted",
        error,
        evt)
      throw cause

    case _ ⇒ stash()
  }

}

object EntityRecoveryStrategy {