akka-cluster-sharding compiler warnings as fatal errors (#26648)

Johan Andrén 2019-04-05 14:56:33 +02:00 committed by GitHub
parent 38e63a0e41
commit 4bb60bbcc8
21 changed files with 101 additions and 114 deletions

View file

@@ -2,3 +2,6 @@
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShard.initialized")
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.PersistentShard.initialized")
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.initialized")
+# Code discipline #26648
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RemoveInternalClusterShardingData.remove")

View file

@@ -0,0 +1,9 @@
+# Cleaning up compiler warnings
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RememberEntityStarter.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.props")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShard.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.PersistentShard.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RememberEntityStarter.props")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RemoveInternalClusterShardingData.remove")
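Both filter files feed MiMa (the Migration Manager), which checks binary compatibility against the previous release; each exclusion waives one reported incompatibility, here all on internal cluster-sharding classes whose constructors or factory methods changed. A minimal sketch of how such filters are wired up with sbt-mima-plugin — the artifact coordinates are illustrative, not taken from this build:

// build.sbt — hypothetical project using sbt-mima-plugin
import com.typesafe.tools.mima.core._

// Compare the compiled classes against a previously released artifact.
mimaPreviousArtifacts := Set("com.typesafe.akka" %% "akka-cluster-sharding" % "2.5.21")

// Equivalent to one line in a .backwards.excludes file: accept that this
// internal method no longer exists in the new binary.
mimaBinaryIssueFilters += ProblemFilters.exclude[DirectMissingMethodProblem](
  "akka.cluster.sharding.Shard.props")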

View file

@@ -12,7 +12,6 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.concurrent.Await
 import scala.util.control.NonFatal
 import akka.actor.Actor
 import akka.actor.ActorRef
 import akka.actor.ActorSystem
@@ -34,7 +33,7 @@ import akka.cluster.ddata.ReplicatorSettings
 import akka.cluster.singleton.ClusterSingletonManager
 import akka.dispatch.Dispatchers
 import akka.event.Logging
-import akka.pattern.BackoffSupervisor
+import akka.pattern.BackoffOpts
 import akka.pattern.ask
 import akka.util.ByteString
@@ -749,15 +748,16 @@ private[akka] class ClusterShardingGuardian extends Actor {
           ShardCoordinator.props(typeName, settings, allocationStrategy)
         else
           ShardCoordinator.props(typeName, settings, allocationStrategy, rep, majorityMinCap)
-      val singletonProps = BackoffSupervisor
-        .props(
-          childProps = coordinatorProps,
-          childName = "coordinator",
-          minBackoff = coordinatorFailureBackoff,
-          maxBackoff = coordinatorFailureBackoff * 5,
-          randomFactor = 0.2,
-          maxNrOfRetries = -1)
-        .withDeploy(Deploy.local)
+      val singletonProps =
+        BackoffOpts
+          .onFailure(
+            childProps = coordinatorProps,
+            childName = "coordinator",
+            minBackoff = coordinatorFailureBackoff,
+            maxBackoff = coordinatorFailureBackoff * 5,
+            randomFactor = 0.2)
+          .props
+          .withDeploy(Deploy.local)
       val singletonSettings = settings.coordinatorSingletonSettings.withSingletonName("singleton").withRole(role)
       context.actorOf(
         ClusterSingletonManager
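The deprecated BackoffSupervisor.props(childProps, childName, ...) overload and its maxNrOfRetries parameter are what triggered warnings here; the replacement builds the options with BackoffOpts first. A self-contained sketch of the non-deprecated API, with a hypothetical WorkerActor (outside Akka's own source the options go through BackoffSupervisor.props rather than the internal .props seen in the diff):

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.{ BackoffOpts, BackoffSupervisor }

// Hypothetical child actor, for illustration only.
class WorkerActor extends Actor {
  def receive = { case msg => sender() ! msg }
}

object BackoffOnFailureExample extends App {
  val system = ActorSystem("example")

  // onFailure restarts the child with exponential backoff after a crash.
  val supervisorProps = BackoffSupervisor.props(
    BackoffOpts.onFailure(
      childProps = Props(new WorkerActor),
      childName = "worker",
      minBackoff = 3.seconds,
      maxBackoff = 30.seconds,
      randomFactor = 0.2)) // jitter, so restarts do not synchronize

  system.actorOf(supervisorProps, "worker-supervisor")
}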

View file

@@ -70,7 +70,7 @@ object RemoveInternalClusterShardingData {
     else {
       val journalPluginId = system.settings.config.getString("akka.cluster.sharding.journal-plugin-id")
       import system.dispatcher
-      remove(system, journalPluginId, typeNames, terminateSystem = true, remove2dot3Data).onComplete { _ =>
+      remove(system, journalPluginId, typeNames, remove2dot3Data).onComplete { _ =>
         system.terminate()
       }
     }
@@ -85,7 +85,6 @@ object RemoveInternalClusterShardingData {
       system: ActorSystem,
       journalPluginId: String,
       typeNames: Set[String],
-      terminateSystem: Boolean,
       remove2dot3Data: Boolean): Future[Unit] = {
     val resolvedJournalPluginId =
@@ -141,7 +140,7 @@ object RemoveInternalClusterShardingData {
     var hasSnapshots = false

     override def receiveRecover: Receive = {
-      case event: ShardCoordinator.Internal.DomainEvent =>
+      case _: ShardCoordinator.Internal.DomainEvent =>
       case SnapshotOffer(_, _) =>
         hasSnapshots = true

View file

@@ -20,19 +20,21 @@ import akka.actor.{
   Timers
 }
 import akka.util.{ ConstantFun, MessageBufferMap }
-import scala.concurrent.Future
+
+import scala.concurrent.Future
 import akka.cluster.Cluster
 import akka.cluster.ddata.ORSet
 import akka.cluster.ddata.ORSetKey
 import akka.cluster.ddata.Replicator._
+import akka.cluster.ddata.SelfUniqueAddress
 import akka.persistence._
 import akka.util.PrettyDuration._
 import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
 import akka.pattern.pipe
-import scala.concurrent.duration._
+
+import scala.concurrent.duration._
 import akka.cluster.sharding.ShardRegion.ShardInitialized
+import akka.util.unused

 /**
  * INTERNAL API
@@ -117,7 +119,6 @@ private[akka] object Shard {
       entityProps: String => Props,
       settings: ClusterShardingSettings,
       extractEntityId: ShardRegion.ExtractEntityId,
-      extractShardId: ShardRegion.ExtractShardId,
       handOffStopMessage: Any,
       replicator: ActorRef,
       majorityMinCap: Int): Props = {
@@ -129,22 +130,14 @@
           entityProps,
           settings,
           extractEntityId,
-          extractShardId,
           handOffStopMessage,
           replicator,
           majorityMinCap)).withDeploy(Deploy.local)
     } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
-      Props(
-        new PersistentShard(
-          typeName,
-          shardId,
-          entityProps,
-          settings,
-          extractEntityId,
-          extractShardId,
-          handOffStopMessage)).withDeploy(Deploy.local)
+      Props(new PersistentShard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage))
+        .withDeploy(Deploy.local)
     else
-      Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage))
+      Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage))
         .withDeploy(Deploy.local)
   }
@@ -166,7 +159,6 @@ private[akka] class Shard(
     entityProps: String => Props,
     settings: ClusterShardingSettings,
     extractEntityId: ShardRegion.ExtractEntityId,
-    extractShardId: ShardRegion.ExtractShardId,
     handOffStopMessage: Any)
     extends Actor
     with ActorLogging
@@ -321,7 +313,7 @@ private[akka] class Shard(
   }

   def restartEntities(ids: Set[EntityId]): Unit = {
-    context.actorOf(RememberEntityStarter.props(context.parent, typeName, shardId, ids, settings, sender()))
+    context.actorOf(RememberEntityStarter.props(context.parent, ids, settings, sender()))
   }

   def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match {
@@ -460,7 +452,7 @@ private[akka] class Shard(
     }
   }

-  def deliverTo(id: EntityId, msg: Any, payload: Msg, snd: ActorRef): Unit = {
+  def deliverTo(id: EntityId, @unused msg: Any, payload: Msg, snd: ActorRef): Unit = {
     if (passivateIdleTask.isDefined) {
       lastMessageTimestamp = lastMessageTimestamp.updated(id, System.nanoTime())
     }
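The @unused annotation (imported above from akka.util) marks a parameter that is deliberately kept for the method's shape but never referenced, so unused-warning checks do not fail the now-strict build. A small sketch of the same idea using the standard-library annotation available since Scala 2.13 (Akka's own akka.util.unused is internal):

import scala.annotation.unused

object UnusedParamExample {
  // The parameter stays part of the signature but is intentionally ignored;
  // @unused keeps -Wunused from reporting it once warnings are fatal.
  def deliverTo(id: String, @unused msg: Any, payload: Any): Unit =
    println(s"delivering $payload to $id")

  def main(args: Array[String]): Unit =
    deliverTo("entity-1", msg = "ignored", payload = "hello")
}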
@@ -491,14 +483,8 @@ private[akka] class Shard(
 }

 private[akka] object RememberEntityStarter {
-  def props(
-      region: ActorRef,
-      typeName: String,
-      shardId: ShardRegion.ShardId,
-      ids: Set[ShardRegion.EntityId],
-      settings: ClusterShardingSettings,
-      requestor: ActorRef) =
-    Props(new RememberEntityStarter(region, typeName, shardId, ids, settings, requestor))
+  def props(region: ActorRef, ids: Set[ShardRegion.EntityId], settings: ClusterShardingSettings, requestor: ActorRef) =
+    Props(new RememberEntityStarter(region, ids, settings, requestor))

   private case object Tick extends NoSerializationVerificationNeeded
 }
@@ -508,8 +494,6 @@ private[akka] object RememberEntityStarter {
  */
 private[akka] class RememberEntityStarter(
     region: ActorRef,
-    typeName: String,
-    shardId: ShardRegion.ShardId,
     ids: Set[ShardRegion.EntityId],
     settings: ClusterShardingSettings,
     requestor: ActorRef)
@@ -639,9 +623,8 @@ private[akka] class PersistentShard(
     entityProps: String => Props,
     override val settings: ClusterShardingSettings,
     extractEntityId: ShardRegion.ExtractEntityId,
-    extractShardId: ShardRegion.ExtractShardId,
     handOffStopMessage: Any)
-    extends Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)
+    extends Shard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage)
     with RememberingShard
     with PersistentActor
     with ActorLogging {
@@ -737,11 +720,10 @@ private[akka] class DDataShard(
     entityProps: String => Props,
     override val settings: ClusterShardingSettings,
     extractEntityId: ShardRegion.ExtractEntityId,
-    extractShardId: ShardRegion.ExtractShardId,
     handOffStopMessage: Any,
     replicator: ActorRef,
     majorityMinCap: Int)
-    extends Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)
+    extends Shard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage)
     with RememberingShard
     with Stash
     with ActorLogging {
@@ -755,6 +737,7 @@ private[akka] class DDataShard(
   private val maxUpdateAttempts = 3

   implicit private val node = Cluster(context.system)
+  implicit private val selfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)

   // The default maximum-frame-size is 256 KiB with Artery.
   // When using entity identifiers with 36 character strings (e.g. UUID.randomUUID).
@@ -832,8 +815,8 @@ private[akka] class DDataShard(
   private def sendUpdate(evt: StateChange, retryCount: Int) = {
     replicator ! Update(key(evt.entityId), ORSet.empty[EntityId], writeMajority, Some((evt, retryCount))) { existing =>
       evt match {
-        case EntityStarted(id) => existing + id
-        case EntityStopped(id) => existing - id
+        case EntityStarted(id) => existing :+ id
+        case EntityStopped(id) => existing.remove(id)
       }
     }
   }
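The deprecated ORSet operators + and - picked up an implicit Cluster; their replacements :+ and remove require an implicit SelfUniqueAddress, which is why the new implicit val appears above. A standalone sketch of the new Distributed Data API:

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.ddata.{ ORSet, SelfUniqueAddress }
import com.typesafe.config.ConfigFactory

object ORSetExample extends App {
  // Distributed Data needs a cluster-aware ActorSystem.
  val system = ActorSystem("example", ConfigFactory.parseString("akka.actor.provider = cluster"))

  // The non-deprecated ORSet operations take the node identity as an
  // implicit SelfUniqueAddress rather than an implicit Cluster.
  implicit val node: SelfUniqueAddress = SelfUniqueAddress(Cluster(system).selfUniqueAddress)

  val s1 = ORSet.empty[String] :+ "entity-1" // `:+` replaces deprecated `+`
  val s2 = s1.remove("entity-1")             // `remove` replaces deprecated `-`
  println(s2.elements) // Set()
  system.terminate()
}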

View file

@@ -26,6 +26,7 @@ import akka.cluster.ddata.GSet
 import akka.cluster.ddata.GSetKey
 import akka.cluster.ddata.Key
 import akka.cluster.ddata.ReplicatedData
+import akka.cluster.ddata.SelfUniqueAddress

 /**
  * @see [[ClusterSharding$ ClusterSharding extension]]
@@ -191,7 +192,7 @@ object ShardCoordinator {
         currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
         rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
       if (rebalanceInProgress.size < maxSimultaneousRebalance) {
-        val (regionWithLeastShards, leastShards) = currentShardAllocations.minBy { case (_, v) => v.size }
+        val (_, leastShards) = currentShardAllocations.minBy { case (_, v) => v.size }
         val mostShards = currentShardAllocations
           .collect {
             case (_, v) => v.filterNot(s => rebalanceInProgress(s))
@@ -467,7 +468,6 @@ object ShardCoordinator {
  * @see [[ClusterSharding$ ClusterSharding extension]]
  */
 abstract class ShardCoordinator(
-    typeName: String,
     settings: ClusterShardingSettings,
     allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
     extends Actor
@@ -573,7 +573,7 @@ abstract class ShardCoordinator(
       }
     }

-    case AllocateShardResult(shard, None, getShardHomeSender) =>
+    case AllocateShardResult(shard, None, _) =>
       log.debug("Shard [{}] allocation failed. It will be retried.", shard)

     case AllocateShardResult(shard, Some(region), getShardHomeSender) =>
@@ -667,7 +667,7 @@ abstract class ShardCoordinator(
         }.toMap)
       }
       .recover {
-        case x: AskTimeoutException => ShardRegion.ClusterShardingStats(Map.empty)
+        case _: AskTimeoutException => ShardRegion.ClusterShardingStats(Map.empty)
       }
       .pipeTo(sender())
@@ -893,7 +893,7 @@ class PersistentShardCoordinator(
     typeName: String,
     settings: ClusterShardingSettings,
     allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
-    extends ShardCoordinator(typeName, settings, allocationStrategy)
+    extends ShardCoordinator(settings, allocationStrategy)
     with PersistentActor {
   import ShardCoordinator.Internal._
   import settings.tuningParameters._
@@ -908,9 +908,9 @@ class PersistentShardCoordinator(
     case evt: DomainEvent =>
       log.debug("receiveRecover {}", evt)
       evt match {
-        case ShardRegionRegistered(region) =>
+        case _: ShardRegionRegistered =>
           state = state.updated(evt)
-        case ShardRegionProxyRegistered(proxy) =>
+        case _: ShardRegionProxyRegistered =>
           state = state.updated(evt)
         case ShardRegionTerminated(region) =>
           if (state.regions.contains(region))
@@ -925,7 +925,7 @@ class PersistentShardCoordinator(
         case ShardRegionProxyTerminated(proxy) =>
           if (state.regionProxies.contains(proxy))
             state = state.updated(evt)
-        case ShardHomeAllocated(shard, region) =>
+        case _: ShardHomeAllocated =>
           state = state.updated(evt)
         case _: ShardHomeDeallocated =>
           state = state.updated(evt)
@@ -1001,7 +1001,7 @@ class DDataShardCoordinator(
     replicator: ActorRef,
     majorityMinCap: Int,
     rememberEntities: Boolean)
-    extends ShardCoordinator(typeName, settings, allocationStrategy)
+    extends ShardCoordinator(settings, allocationStrategy)
     with Stash {
   import ShardCoordinator.Internal._
   import akka.cluster.ddata.Replicator.Update
@@ -1010,6 +1010,7 @@ class DDataShardCoordinator(
   private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap)

   implicit val node = Cluster(context.system)
+  private implicit val selfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress)

   val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState")
   val initEmptyState = State.empty.withRememberEntities(settings.rememberEntities)
@@ -1200,8 +1201,9 @@ class DDataShardCoordinator(
   def sendCoordinatorStateUpdate(evt: DomainEvent) = {
     val s = state.updated(evt)
-    replicator ! Update(CoordinatorStateKey, LWWRegister(initEmptyState), writeMajority, Some(evt)) { reg =>
-      reg.withValue(s)
+    replicator ! Update(CoordinatorStateKey, LWWRegister(selfUniqueAddress, initEmptyState), writeMajority, Some(evt)) {
+      reg =>
+        reg.withValueOf(s)
     }
   }
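LWWRegister gets the same treatment: construction now names the owning node explicitly, and withValueOf (which resolves the node implicitly) replaces the deprecated withValue. A sketch, assuming an implicit SelfUniqueAddress obtained as in the ORSet example above:

import akka.cluster.ddata.{ LWWRegister, SelfUniqueAddress }

object LWWRegisterExample {
  def updatedRegister(implicit node: SelfUniqueAddress): LWWRegister[String] = {
    // Explicit node parameter replaces the deprecated
    // LWWRegister(initialValue) constructor.
    val r0 = LWWRegister(node, "initial")
    // withValueOf picks up the implicit SelfUniqueAddress.
    r0.withValueOf("updated")
  }
}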

View file

@@ -508,7 +508,7 @@ private[akka] class ShardRegion(
     member.hasRole(targetDcRole) && role.forall(member.hasRole)

   def coordinatorSelection: Option[ActorSelection] =
-    membersByAge.headOption.map(m => context.actorSelection(RootActorPath(m.address) + coordinatorPath))
+    membersByAge.headOption.map(m => context.actorSelection(RootActorPath(m.address).toString + coordinatorPath))

   /**
    * When leaving the coordinator singleton is started rather quickly on next
@@ -516,7 +516,7 @@ private[akka] class ShardRegion(
    * the likely locations of the coordinator.
    */
   def gracefulShutdownCoordinatorSelections: List[ActorSelection] =
-    membersByAge.take(2).toList.map(m => context.actorSelection(RootActorPath(m.address) + coordinatorPath))
+    membersByAge.take(2).toList.map(m => context.actorSelection(RootActorPath(m.address).toString + coordinatorPath))

   var coordinator: Option[ActorRef] = None
@@ -741,7 +741,7 @@ private[akka] class ShardRegion(
         }.toMap)
       }
       .recover {
-        case x: AskTimeoutException => ShardRegionStats(Map.empty)
+        case _: AskTimeoutException => ShardRegionStats(Map.empty)
       }
       .pipeTo(ref)
   }
@@ -915,23 +915,14 @@ private[akka] class ShardRegion(
         val shard = context.watch(
           context.actorOf(
             Shard
-              .props(
-                typeName,
-                id,
-                props,
-                settings,
-                extractEntityId,
-                extractShardId,
-                handOffStopMessage,
-                replicator,
-                majorityMinCap)
+              .props(typeName, id, props, settings, extractEntityId, handOffStopMessage, replicator, majorityMinCap)
               .withDispatcher(context.props.dispatcher),
             name))
         shardsByRef = shardsByRef.updated(shard, id)
         shards = shards.updated(id, shard)
         startingShards += id
         None
-      case Some(props) =>
+      case Some(_) =>
         None
       case None =>
         throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")

View file

@@ -28,6 +28,7 @@ import akka.cluster.sharding.ShardRegion._
 /**
  * INTERNAL API: Protobuf serializer of ClusterSharding messages.
  */
+@ccompatUsedUntil213
 private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSystem)
     extends SerializerWithStringManifest
     with BaseSerializer {
@@ -125,13 +126,13 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSystem)
     GracefulShutdownReqManifest -> { bytes =>
       GracefulShutdownReq(actorRefMessageFromBinary(bytes))
     },
-    GetShardStatsManifest -> { bytes =>
+    GetShardStatsManifest -> { _ =>
       GetShardStats
     },
     ShardStatsManifest -> { bytes =>
       shardStatsFromBinary(bytes)
     },
-    GetShardRegionStatsManifest -> { bytes =>
+    GetShardRegionStatsManifest -> { _ =>
       GetShardRegionStats
     },
     ShardRegionStatsManifest -> { bytes =>
@@ -225,13 +226,6 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSystem)
   }

   private def coordinatorStateToProto(state: State): sm.CoordinatorState = {
-    val regions = state.regions
-      .map {
-        case (regionRef, _) => Serialization.serializedActorPath(regionRef)
-      }
-      .toVector
-      .asJava
-
     val builder = sm.CoordinatorState.newBuilder()

     state.shards.foreach {

View file

@@ -24,6 +24,7 @@ import akka.remote.transport.ThrottlerTransportAdapter.Direction
 import akka.testkit._
 import akka.util.ccompat._

+@ccompatUsedUntil213
 object ClusterShardingFailureSpec {
   case class Get(id: String)
   case class Add(id: String, i: Int)

View file

@@ -21,6 +21,7 @@ import akka.cluster.sharding.ShardRegion.GetClusterShardingStats
 import akka.cluster.sharding.ShardRegion.ClusterShardingStats
 import akka.util.ccompat._

+@ccompatUsedUntil213
 object ClusterShardingMinMembersSpec {
   case object StopEntity
@@ -169,7 +170,7 @@ abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSpecConfig)
     runOn(first) {
       region ! 1
       // not allocated because third has not registered yet
-      expectNoMsg(2.second)
+      expectNoMessage(2.second)
     }
     enterBarrier("verified")
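The only change in this hunk is swapping the deprecated expectNoMsg for expectNoMessage, which fails the test if anything arrives within the window. A tiny sketch of the replacement call on a TestProbe:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.testkit.TestProbe

object ExpectNoMessageExample extends App {
  implicit val system: ActorSystem = ActorSystem("test")
  val probe = TestProbe()

  // Asserts that nothing lands in the probe's queue for the given
  // duration; replaces the deprecated expectNoMsg.
  probe.expectNoMessage(2.seconds)
  system.terminate()
}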

View file

@@ -19,6 +19,7 @@ import akka.util.ccompat._

 import scala.concurrent.duration._

+@ccompatUsedUntil213
 object ClusterShardingRememberEntitiesSpec {
   final case class Started(ref: ActorRef)

View file

@@ -30,7 +30,7 @@ import java.io.File
 import org.apache.commons.io.FileUtils
 import akka.cluster.singleton.ClusterSingletonManager
 import akka.cluster.singleton.ClusterSingletonManagerSettings
-import akka.pattern.BackoffSupervisor
+import akka.pattern.BackoffOpts

 object ClusterShardingSpec {
   //#counter-actor
@@ -202,6 +202,7 @@ object ClusterShardingDocCode {
       (id.toLong % numberOfShards).toString
   }
   //#extractShardId-StartEntity
+  extractShardId.toString() // keep the compiler happy

 }
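The added .toString calls here and below are a workaround for fatal unused-value warnings: the doc snippets between the //#... markers declare values the surrounding test never uses, so an artificial use after the closing marker keeps the build green without polluting the published snippet. A sketch of the pattern with illustrative names:

object KeepCompilerHappyExample {
  def main(args: Array[String]): Unit = {
    //#doc-snippet
    val counterRegion: String = "counter-region" // shown verbatim in the docs
    //#doc-snippet
    // Under -Xfatal-warnings plus -Ywarn-unused, an unreferenced local val
    // fails compilation, so a throwaway use follows the snippet marker:
    counterRegion.toString // keep the compiler happy
  }
}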
@@ -310,15 +311,16 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig)
       "AutoMigrateRememberRegionTest").foreach { typeName =>
       val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing")
       val rememberEnabled = typeName.toLowerCase.contains("remember")
-      val singletonProps = BackoffSupervisor
-        .props(
-          childProps = coordinatorProps(typeName, rebalanceEnabled, rememberEnabled),
-          childName = "coordinator",
-          minBackoff = 5.seconds,
-          maxBackoff = 5.seconds,
-          randomFactor = 0.1,
-          maxNrOfRetries = -1)
-        .withDeploy(Deploy.local)
+      val singletonProps =
+        BackoffOpts
+          .onFailure(
+            childProps = coordinatorProps(typeName, rebalanceEnabled, rememberEnabled),
+            childName = "coordinator",
+            minBackoff = 5.seconds,
+            maxBackoff = 5.seconds,
+            randomFactor = 0.1)
+          .props
+          .withDeploy(Deploy.local)
       system.actorOf(
         ClusterSingletonManager
           .props(singletonProps, terminationMessage = PoisonPill, settings = ClusterSingletonManagerSettings(system)),
@@ -642,6 +644,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig)
       extractEntityId = extractEntityId,
       extractShardId = extractShardId)
     //#counter-start
+    counterRegion.toString // keep the compiler happy
+
     ClusterSharding(system).start(
       typeName = "AnotherCounter",
       entityProps = Props[AnotherCounter],
@@ -717,6 +721,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig)
         extractEntityId = extractEntityId,
         extractShardId = extractShardId)
       // #proxy-dc
+      counterProxyDcB.toString // keep the compiler happy
     }

     enterBarrier("after-dc-proxy")
@@ -954,7 +959,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig)
         entity ! Identify(n)
         receiveOne(3 seconds) match {
           case ActorIdentity(id, Some(_)) if id == n => count = count + 1
-          case ActorIdentity(id, None)               => //Not on the fifth shard
+          case ActorIdentity(_, None)                => //Not on the fifth shard
         }
       }
       count should be >= (2)

View file

@@ -20,6 +20,7 @@ import akka.testkit._
 import com.typesafe.config.ConfigFactory
 import akka.util.ccompat._

+@ccompatUsedUntil213
 object MultiDcClusterShardingSpec {
   sealed trait EntityMsg {
     def id: String

View file

@@ -13,7 +13,6 @@ class AllAtOnceEntityRecoveryStrategySpec extends AkkaSpec {
   "AllAtOnceEntityRecoveryStrategy" must {
     "recover entities" in {
       val entities = Set[EntityId]("1", "2", "3", "4", "5")
-      val startTime = System.nanoTime()
       val result = strategy.recoverEntities(entities)
       result.size should ===(1)
       // the Future is completed immediately for allStrategy

View file

@@ -11,7 +11,7 @@ import akka.cluster.sharding.ShardRegion.HandOffStopper
 import akka.testkit.{ AkkaSpec, TestProbe }
 import org.mockito.ArgumentMatchers
 import org.mockito.Mockito._
-import org.scalatest.mockito.MockitoSugar
+import org.scalatestplus.mockito.MockitoSugar

 import scala.concurrent.duration._

View file

@@ -36,6 +36,7 @@ object CoordinatedShutdownShardingSpec {
   }
 }

+@ccompatUsedUntil213
 class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardingSpec.config) with WithLogCapturing {
   import CoordinatedShutdownShardingSpec._

View file

@@ -13,7 +13,7 @@ import com.typesafe.config.ConfigFactory
 import org.scalatest.WordSpecLike

 object PersistentShardSpec {
-  class EntityActor(id: String) extends Actor {
+  class EntityActor extends Actor {
     override def receive: Receive = {
       case _ =>
     }
@@ -29,10 +29,10 @@ class PersistentShardSpec extends AkkaSpec(PersistentShardSpec.config) with WordSpecLike {
   "Persistent Shard" must {

     "remember entities started with StartEntity" in {
-      val props = Props(
-        new PersistentShard("cats", "shard-1", id => Props(new EntityActor(id)), ClusterShardingSettings(system), {
-          case _ => ("entity-1", "msg")
-        }, _ => "shard-1", PoisonPill))
+      val props =
+        Props(new PersistentShard("cats", "shard-1", _ => Props(new EntityActor), ClusterShardingSettings(system), {
+          case _ => ("entity-1", "msg")
+        }, PoisonPill))
       val persistentShard = system.actorOf(props)

       watch(persistentShard)

View file

@@ -77,7 +77,7 @@ object RemoveInternalClusterShardingDataSpec {
     override def recovery: Recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None)

     override def receiveRecover: Receive = {
-      case event: ShardCoordinator.Internal.DomainEvent =>
+      case _: ShardCoordinator.Internal.DomainEvent =>
         hasEvents = true
       case RecoveryCompleted =>
         replyTo ! hasEvents
@@ -201,12 +201,8 @@ class RemoveInternalClusterShardingDataSpec
         hasEvents(typeName) should ===(true)
       }

-      val result = RemoveInternalClusterShardingData.remove(
-        system,
-        journalPluginId = "",
-        typeNames.toSet,
-        terminateSystem = false,
-        remove2dot3Data = true)
+      val result =
+        RemoveInternalClusterShardingData.remove(system, journalPluginId = "", typeNames.toSet, remove2dot3Data = true)
       Await.ready(result, remaining)

       typeNames.foreach { typeName =>

View file

@@ -114,7 +114,6 @@ class ShardSpec extends AkkaSpec(ShardSpec.config) with ImplicitSender {
             _ => Props(new EntityActor()),
             settings,
             extractEntityId,
-            extractShardId,
             PoisonPill,
             system.deadLetters,
             1))

View file

@@ -7,7 +7,7 @@ package akka.cluster.sharding
 import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props }
 import akka.cluster.Cluster
 import akka.cluster.sharding.ShardRegion.Passivate
-import akka.pattern.{ Backoff, BackoffOpts, BackoffSupervisor }
+import akka.pattern.{ BackoffOpts, BackoffSupervisor }
 import akka.testkit.{ AkkaSpec, ImplicitSender }
 import com.typesafe.config.ConfigFactory
@@ -29,7 +29,7 @@ object SupervisionSpec {
   }

   val shardResolver: ShardRegion.ExtractShardId = {
-    case Msg(id, msg) => (id % 2).toString
+    case Msg(id, _) => (id % 2).toString
   }

   class PassivatingActor extends Actor with ActorLogging {
@@ -67,16 +67,16 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSender {

     "allow passivation" in {

-      val supervisedProps = BackoffSupervisor.props(
-        Backoff
-          .onStop(
-            Props(new PassivatingActor()),
-            childName = "child",
-            minBackoff = 1.seconds,
-            maxBackoff = 30.seconds,
-            randomFactor = 0.2,
-            maxNrOfRetries = -1)
-          .withFinalStopMessage(_ == StopMessage))
+      val supervisedProps =
+        BackoffOpts
+          .onStop(
+            Props(new PassivatingActor()),
+            childName = "child",
+            minBackoff = 1.seconds,
+            maxBackoff = 30.seconds,
+            randomFactor = 0.2)
+          .withFinalStopMessage(_ == StopMessage)
+          .props

       Cluster(system).join(Cluster(system).selfAddress)
       val region = ClusterSharding(system).start(
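For passivation the supervisor has to stop itself once the child terminates on its final stop message, which is what withFinalStopMessage marks. A sketch of the onStop variant under the same assumptions as the earlier onFailure example, with hypothetical ChildActor and StopMessage names:

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorSystem, Props }
import akka.pattern.{ BackoffOpts, BackoffSupervisor }

// Hypothetical names for illustration.
case object StopMessage
class ChildActor extends Actor {
  def receive = { case StopMessage => context.stop(self) }
}

object BackoffOnStopExample extends App {
  val system = ActorSystem("example")

  // onStop restarts the child when it stops abnormally; the predicate in
  // withFinalStopMessage tells the supervisor that a stop following this
  // message is deliberate, so it shuts itself down instead of restarting.
  val props = BackoffSupervisor.props(
    BackoffOpts
      .onStop(
        Props(new ChildActor),
        childName = "child",
        minBackoff = 1.second,
        maxBackoff = 30.seconds,
        randomFactor = 0.2)
      .withFinalStopMessage(_ == StopMessage))

  system.actorOf(props, "supervised-child")
}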

View file

@@ -27,6 +27,7 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
       "akka-cluster-typed",
       "akka-persistence",
       "akka-cluster-tools",
+      "akka-cluster-sharding",
       "akka-stream")

   val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination")
@@ -113,4 +114,5 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
       "-Ypartial-unification",
       "-Ywarn-extra-implicit")
 }
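Adding akka-cluster-sharding to this list is the step that turns every warning fixed above into a future build failure. A minimal sketch of the underlying sbt mechanics, independent of the actual internals of AkkaDisciplinePlugin (the object name and the Set contents are illustrative):

// project/DisciplineExample.scala — hypothetical sbt AutoPlugin sketch
import sbt._
import sbt.Keys._

object DisciplineExample extends AutoPlugin {
  // Projects opted in to strict compilation (illustrative subset).
  val nowFatalWarnings = Set("akka-cluster-sharding")

  override lazy val projectSettings = Seq(
    scalacOptions ++= {
      // -Xfatal-warnings escalates every remaining warning (deprecations,
      // unused parameters, etc.) to a compile error for opted-in projects.
      if (nowFatalWarnings(name.value)) Seq("-Xfatal-warnings", "-Ywarn-unused")
      else Seq.empty
    })
}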