Merge pull request #29227 from johanandren/wip-smaller-end-review-feedback-fixes

All the smaller pieces of feedback from Patrik addressed
commit b70d851ea0
Johan Andrén 2020-06-12 11:27:22 +02:00 committed by GitHub
11 changed files with 46 additions and 33 deletions

akka-cluster-sharding/src/test/resources/logback-test.xml

@@ -8,10 +8,8 @@
      <pattern>%date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n</pattern>
    </encoder>
  </appender>
-  <logger name="akka.cluster.sharding" level="INFO"/>
-  <root level="WARN">
+  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala

@@ -23,6 +23,7 @@ object ClusterShardingSettings {
  val StateStoreModeDData = "ddata"

  /**
   * Only for testing
+  * INTERNAL API
   */
  @InternalApi
@@ -308,9 +309,8 @@ final class ClusterShardingSettings(
    extends NoSerializationVerificationNeeded {

  @deprecated(
-    "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStoreinstead",
-    "2.6.6" // TODO update once merged
-  )
+    "Use the ClusterShardingSettings factory methods or the constructor including rememberedEntitiesStore instead",
+    "2.6.7")
  def this(
      role: Option[String],
      rememberEntities: Boolean,

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/OldCoordinatorStateMigrationEventAdapter.scala

@@ -16,7 +16,7 @@ import akka.persistence.journal.EventSeq
 * INTERNAL API
 */
@InternalApi
-final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter {
+private[akka] final class OldCoordinatorStateMigrationEventAdapter extends EventAdapter {

  override def manifest(event: Any): String =
    ""

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala

@@ -177,9 +177,11 @@ private[akka] object Shard {
    }

    object RememberingStart {
+      val empty = new RememberingStart(Set.empty)

      def apply(ackTo: Option[ActorRef]): RememberingStart =
        ackTo match {
-          case None        => RememberingStartNoAck
+          case None        => empty
          case Some(ackTo) => RememberingStart(Set(ackTo))
        }
    }
@@ -194,16 +196,15 @@ private[akka] object Shard {
        case active: Active => active
        case r: RememberingStart =>
          if (ackTo.isEmpty) {
-            if (r.ackTo.isEmpty) RememberingStartNoAck
+            if (r.ackTo.isEmpty) RememberingStart.empty
            else newState
          } else {
            if (r.ackTo.isEmpty) this
-            else RememberingStart(ackTo ++ r.ackTo)
+            else RememberingStart(ackTo.union(r.ackTo))
          }
        case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
      }
    }

-  private val RememberingStartNoAck = new RememberingStart(Set.empty)

  /**
   * When remember entities is enabled an entity is in this state while
@@ -245,8 +246,9 @@ private[akka] object Shard {
  final class Entities(log: LoggingAdapter, rememberingEntities: Boolean, verboseDebug: Boolean) {
    private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]()
-    // needed to look up entity by reg when a Passivating is received
+    // needed to look up entity by ref when a Passivating is received
    private val byRef = new util.HashMap[ActorRef, EntityId]()
+    // optimization to not have to go through all entities to find batched writes
    private val remembering = new util.HashSet[EntityId]()

    def alreadyRemembered(set: Set[EntityId]): Unit = {
@@ -300,9 +302,11 @@ private[akka] object Shard {
        case _ => OptionVal.None
      }

-    def entityState(id: EntityId): EntityState = {
-      OptionVal(entities.get(id)).getOrElse(NoState)
-    }
+    def entityState(id: EntityId): EntityState =
+      entities.get(id) match {
+        case null  => NoState
+        case state => state
+      }

    def entityId(ref: ActorRef): OptionVal[EntityId] = OptionVal(byRef.get(ref))
@@ -400,7 +404,7 @@ private[akka] class Shard(
  import akka.cluster.sharding.ShardCoordinator.Internal.CoordinatorMessage

-  final val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")
+  private val verboseDebug = context.system.settings.config.getBoolean("akka.cluster.sharding.verbose-debug-logging")

  private val rememberEntitiesStore: Option[ActorRef] =
    rememberEntitiesProvider.map { provider =>
@@ -485,7 +489,9 @@ private[akka] class Shard(
        tryGetLease(lease.get)
      case ll: LeaseLost =>
        receiveLeaseLost(ll)
-      case _ =>
+      case msg =>
+        if (verboseDebug)
+          log.debug("Got msg of type [{}] from [{}] while waiting for lease, stashing", msg.getClass, sender())
        stash()
    }
@@ -520,7 +526,10 @@ private[akka] class Shard(
        loadingEntityIdsFailed()
      case msg =>
        if (verboseDebug)
-          log.debug("Got msg of type [{}] from [{}] while waiting for remember entitites", msg.getClass, sender())
+          log.debug(
+            "Got msg of type [{}] from [{}] while waiting for remember entities, stashing",
+            msg.getClass,
+            sender())
        stash()
    }
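
Both stashing hunks above gate their extra debug logging behind the `akka.cluster.sharding.verbose-debug-logging` flag that `Shard` reads from config at startup. As a minimal sketch of how one might switch it on via the standard `application.conf` mechanism (the setting name is taken verbatim from the diff; everything else here is illustrative):

```
# enable the per-message sharding debug logging referenced in the hunks above
akka.cluster.sharding.verbose-debug-logging = on
```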
@@ -578,11 +587,13 @@ private[akka] class Shard(
      storingStarts.mkString(", "),
      storingStops.mkString(", "))

-    storingStarts.foreach { entityId =>
-      flightRecorder.rememberEntityAdd(entityId)
-    }
-    storingStops.foreach { id =>
-      flightRecorder.rememberEntityRemove(id)
-    }
+    if (flightRecorder != NoOpShardingFlightRecorder) {
+      storingStarts.foreach { entityId =>
+        flightRecorder.rememberEntityAdd(entityId)
+      }
+      storingStops.foreach { id =>
+        flightRecorder.rememberEntityRemove(id)
+      }
+    }
    val startTimeNanos = System.nanoTime()
    val update = RememberEntitiesShardStore.Update(started = storingStarts, stopped = storingStops)
@@ -612,7 +623,7 @@ private[akka] class Shard(
      onUpdateDone(storedStarts, storedStops)

    case RememberEntityTimeout(`update`) =>
-      log.error("Remember entity store did not respond, crashing shard")
+      log.error("Remember entity store did not respond, restarting shard")
      throw new RuntimeException(
        s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
    case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender()))

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala

@@ -1422,7 +1422,7 @@ private[akka] class DDataShardCoordinator(
      if (shardIds.nonEmpty) {
        val newUnallocatedShards = state.unallocatedShards.union(shardIds.diff(state.shards.keySet))
        state = state.copy(unallocatedShards = newUnallocatedShards)
-        newUnallocatedShards.foreach { self ! GetShardHome(_) }
+        allocateShardHomesForRememberEntities()
      }
      timers.cancel(RememberEntitiesTimeoutKey)

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/CustomStateStoreModeProvider.scala

@@ -6,6 +6,7 @@ package akka.cluster.sharding.internal
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Props
+import akka.annotation.InternalApi
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion.ShardId
import akka.event.Logging
@@ -15,6 +16,7 @@ import akka.event.Logging
 *
 * Only intended for testing, not an extension point.
 */
+@InternalApi
private[akka] final class CustomStateStoreModeProvider(
    typeName: String,
    system: ActorSystem,

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala

@@ -178,7 +178,7 @@ private[akka] final class DDataRememberEntitiesShardStore(
  }

  private def onUpdate(update: RememberEntitiesShardStore.Update): Unit = {
-    val allEvts: Set[Evt] = (update.started.map(Started) ++ update.stopped.map(Stopped))
+    val allEvts: Set[Evt] = (update.started.map(Started(_): Evt).union(update.stopped.map(Stopped)))
    // map from set of evts (for same ddata key) to one update that applies each of them
    val ddataUpdates: Map[Set[Evt], (Update[ORSet[EntityId]], Int)] =
      allEvts.groupBy(evt => key(evt.id)).map {
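
A note on why the added line needs the `Started(_): Evt` ascription: Scala's immutable `Set` is invariant and `union` takes a `Set` of exactly the receiver's element type, so the receiver must be widened to `Set[Evt]` before the `Set[Stopped]` argument can be inferred against it. A self-contained sketch of the same pattern (the `Evt`/`Started`/`Stopped` shapes mirror the diff; the values are illustrative):

```
sealed trait Evt { def id: String }
final case class Started(id: String) extends Evt
final case class Stopped(id: String) extends Evt

val started = Set("a", "b")
val stopped = Set("c")

// started.map(Started).union(stopped.map(Stopped)) would not compile:
// the receiver would be Set[Started], and Set[Stopped] is not a Set[Started].
// Ascribing the mapped result widens the receiver to Set[Evt] up front.
val allEvts: Set[Evt] = started.map(Started(_): Evt).union(stopped.map(Stopped))
```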

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EventSourcedRememberEntitiesShardStore.scala

@@ -80,7 +80,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
  override def receiveRecover: Receive = {
    case EntitiesStarted(ids) => state = state.copy(state.entities.union(ids))
-    case EntitiesStopped(ids) => state = state.copy(state.entities -- ids)
+    case EntitiesStopped(ids) => state = state.copy(state.entities.diff(ids))
    case SnapshotOffer(_, snapshot: State) => state = snapshot
    case RecoveryCompleted =>
      log.debug("Recovery completed for shard [{}] with [{}] entities", shardId, state.entities.size)
@@ -97,7 +97,7 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
        left -= 1
        if (left == 0) {
          sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
-          state.copy(state.entities.union(started) -- stopped)
+          state.copy(state.entities.union(started).diff(stopped))
          saveSnapshotWhenNeeded()
        }
    }

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RememberEntityStarter.scala

@@ -54,6 +54,8 @@ private[akka] final class RememberEntityStarter(
  implicit val ec: ExecutionContext = context.dispatcher
  import RememberEntityStarter._

+  require(ids.nonEmpty)

  private var idsLeftToStart = Set.empty[EntityId]
  private var waitingForAck = Set.empty[EntityId]
  private var entitiesMoved = Set.empty[EntityId]

akka-docs/src/main/paradox/typed/cluster-sharding.md

@@ -284,7 +284,7 @@ akka.cluster.sharding.state-store-mode = ddata
```

The state of the `ShardCoordinator` is replicated across the cluster but is not stored to disk.
-@ref:[Distributed Data](distributed-data.md) handles the `ShardCoordinator`'s state with `WriteMajority`/`ReadMajority` consistency.
+@ref:[Distributed Data](distributed-data.md) handles the `ShardCoordinator`'s state with `WriteMajorityPlus`/`ReadMajorityPlus` consistency.
When all nodes in the cluster have been stopped, the state is no longer needed and dropped.

Cluster Sharding uses its own Distributed Data `Replicator` per node.
@@ -337,7 +337,7 @@ When `rememberEntities` is enabled, whenever a `Shard` is rebalanced onto another
node or recovers after a crash, it will recreate all the entities which were previously
running in that `Shard`.

-To permanently stop entities sent a `ClusterSharding.Passivate` to the
+To permanently stop entities send a `ClusterSharding.Passivate` to the
@scala[`ActorRef[ShardCommand]`]@java[`ActorRef<ShardCommand>`] that was passed in to
the factory method when creating the entity.
Otherwise, the entity will be automatically restarted after the entity restart backoff specified in the configuration.
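
To make the passivation contract described in that doc hunk concrete, here is a minimal sketch of a sharded entity that permanently stops itself; it assumes the Akka Typed `ClusterSharding` API, and the `Counter`/`GoodByeCounter` names are illustrative rather than taken from this PR:

```
import akka.actor.typed.{ ActorRef, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.ClusterSharding.ShardCommand

object Counter {
  sealed trait Command
  case object Increment extends Command
  case object GoodByeCounter extends Command

  // `shard` is the ActorRef[ShardCommand] handed to the entity factory method
  def apply(shard: ActorRef[ShardCommand], entityId: String): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Increment =>
          // handle normal domain messages here
          Behaviors.same
        case GoodByeCounter =>
          // ask the shard to passivate this entity; with rememberEntities
          // enabled this is the way to stop it permanently
          shard ! ClusterSharding.Passivate(context.self)
          Behaviors.same
      }
    }
}
```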
@@ -397,10 +397,10 @@ If using remembered entities there are two migration options:
reads the data written by the old `persistence` mode. Your remembered entities will be remembered after a full cluster restart.

For migrating existing remembered entities an event adapter needs to be configured in the config for the journal you use in your `application.conf`.
-In this example `leveldb` is the used journal:
+In this example `cassandra` is the used journal:

```
-akka.persistence.journal.leveldb {
+akka.persistence.cassandra.journal {
  event-adapters {
    coordinator-migration = "akka.cluster.sharding.OldCoordinatorStateMigrationEventAdapter"
  }