diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.21.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.21.backwards.excludes index 772258f753..c658b37546 100644 --- a/akka-cluster-sharding/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.21.backwards.excludes @@ -2,3 +2,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShard.initialized") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.PersistentShard.initialized") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.initialized") + +# Code discipline #26648 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RemoveInternalClusterShardingData.remove") diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.22.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.22.backwards.excludes new file mode 100644 index 0000000000..fbcc1dcc80 --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.22.backwards.excludes @@ -0,0 +1,9 @@ +# Cleaning up compiler warnings +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RememberEntityStarter.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.props") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShard.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.PersistentShard.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RememberEntityStarter.props") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RemoveInternalClusterShardingData.remove") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 68016d0029..d71bdde2a3 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -12,7 +12,6 @@ import scala.collection.JavaConverters._ import scala.collection.immutable import scala.concurrent.Await import scala.util.control.NonFatal - import akka.actor.Actor import akka.actor.ActorRef import akka.actor.ActorSystem @@ -34,7 +33,7 @@ import akka.cluster.ddata.ReplicatorSettings import akka.cluster.singleton.ClusterSingletonManager import akka.dispatch.Dispatchers import akka.event.Logging -import akka.pattern.BackoffSupervisor +import akka.pattern.BackoffOpts import akka.pattern.ask import akka.util.ByteString @@ -749,15 +748,16 @@ private[akka] class ClusterShardingGuardian extends Actor { ShardCoordinator.props(typeName, settings, allocationStrategy) else ShardCoordinator.props(typeName, settings, allocationStrategy, rep, majorityMinCap) - val singletonProps = BackoffSupervisor - .props( - childProps = coordinatorProps, - childName = "coordinator", - minBackoff = coordinatorFailureBackoff, - maxBackoff = coordinatorFailureBackoff * 5, - randomFactor = 0.2, - maxNrOfRetries = -1) - .withDeploy(Deploy.local) + val singletonProps = + BackoffOpts + .onFailure( + childProps = coordinatorProps, + childName = "coordinator", + minBackoff = coordinatorFailureBackoff, + maxBackoff = coordinatorFailureBackoff * 5, + randomFactor = 0.2) + .props + .withDeploy(Deploy.local) val singletonSettings = settings.coordinatorSingletonSettings.withSingletonName("singleton").withRole(role) context.actorOf( ClusterSingletonManager diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RemoveInternalClusterShardingData.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RemoveInternalClusterShardingData.scala index a9e297c3e9..0de02eca0d 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RemoveInternalClusterShardingData.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RemoveInternalClusterShardingData.scala @@ -70,7 +70,7 @@ object RemoveInternalClusterShardingData { else { val journalPluginId = system.settings.config.getString("akka.cluster.sharding.journal-plugin-id") import system.dispatcher - remove(system, journalPluginId, typeNames, terminateSystem = true, remove2dot3Data).onComplete { _ => + remove(system, journalPluginId, typeNames, remove2dot3Data).onComplete { _ => system.terminate() } } @@ -85,7 +85,6 @@ object RemoveInternalClusterShardingData { system: ActorSystem, journalPluginId: String, typeNames: Set[String], - terminateSystem: Boolean, remove2dot3Data: Boolean): Future[Unit] = { val resolvedJournalPluginId = @@ -141,7 +140,7 @@ object RemoveInternalClusterShardingData { var hasSnapshots = false override def receiveRecover: Receive = { - case event: ShardCoordinator.Internal.DomainEvent => + case _: ShardCoordinator.Internal.DomainEvent => case SnapshotOffer(_, _) => hasSnapshots = true diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 209e2877b4..0d07077829 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -20,19 +20,21 @@ import akka.actor.{ Timers } import akka.util.{ ConstantFun, MessageBufferMap } -import scala.concurrent.Future +import scala.concurrent.Future import akka.cluster.Cluster import akka.cluster.ddata.ORSet import akka.cluster.ddata.ORSetKey import akka.cluster.ddata.Replicator._ +import akka.cluster.ddata.SelfUniqueAddress import akka.persistence._ import akka.util.PrettyDuration._ import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider } import akka.pattern.pipe -import scala.concurrent.duration._ +import scala.concurrent.duration._ import akka.cluster.sharding.ShardRegion.ShardInitialized +import akka.util.unused /** * INTERNAL API @@ -117,7 +119,6 @@ private[akka] object Shard { entityProps: String => Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int): Props = { @@ -129,22 +130,14 @@ private[akka] object Shard { entityProps, settings, extractEntityId, - extractShardId, handOffStopMessage, replicator, majorityMinCap)).withDeploy(Deploy.local) } else if (settings.rememberEntities && settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence) - Props( - new PersistentShard( - typeName, - shardId, - entityProps, - settings, - extractEntityId, - extractShardId, - handOffStopMessage)).withDeploy(Deploy.local) + Props(new PersistentShard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage)) + .withDeploy(Deploy.local) else - Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) + Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage)) .withDeploy(Deploy.local) } @@ -166,7 +159,6 @@ private[akka] class Shard( entityProps: String => Props, settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) extends Actor with ActorLogging @@ -321,7 +313,7 @@ private[akka] class Shard( } def restartEntities(ids: Set[EntityId]): Unit = { - context.actorOf(RememberEntityStarter.props(context.parent, typeName, shardId, ids, settings, sender())) + context.actorOf(RememberEntityStarter.props(context.parent, ids, settings, sender())) } def receiveShardRegionCommand(msg: ShardRegionCommand): Unit = msg match { @@ -460,7 +452,7 @@ private[akka] class Shard( } } - def deliverTo(id: EntityId, msg: Any, payload: Msg, snd: ActorRef): Unit = { + def deliverTo(id: EntityId, @unused msg: Any, payload: Msg, snd: ActorRef): Unit = { if (passivateIdleTask.isDefined) { lastMessageTimestamp = lastMessageTimestamp.updated(id, System.nanoTime()) } @@ -491,14 +483,8 @@ private[akka] class Shard( } private[akka] object RememberEntityStarter { - def props( - region: ActorRef, - typeName: String, - shardId: ShardRegion.ShardId, - ids: Set[ShardRegion.EntityId], - settings: ClusterShardingSettings, - requestor: ActorRef) = - Props(new RememberEntityStarter(region, typeName, shardId, ids, settings, requestor)) + def props(region: ActorRef, ids: Set[ShardRegion.EntityId], settings: ClusterShardingSettings, requestor: ActorRef) = + Props(new RememberEntityStarter(region, ids, settings, requestor)) private case object Tick extends NoSerializationVerificationNeeded } @@ -508,8 +494,6 @@ private[akka] object RememberEntityStarter { */ private[akka] class RememberEntityStarter( region: ActorRef, - typeName: String, - shardId: ShardRegion.ShardId, ids: Set[ShardRegion.EntityId], settings: ClusterShardingSettings, requestor: ActorRef) @@ -639,9 +623,8 @@ private[akka] class PersistentShard( entityProps: String => Props, override val settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) - extends Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) + extends Shard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage) with RememberingShard with PersistentActor with ActorLogging { @@ -737,11 +720,10 @@ private[akka] class DDataShard( entityProps: String => Props, override val settings: ClusterShardingSettings, extractEntityId: ShardRegion.ExtractEntityId, - extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any, replicator: ActorRef, majorityMinCap: Int) - extends Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage) + extends Shard(typeName, shardId, entityProps, settings, extractEntityId, handOffStopMessage) with RememberingShard with Stash with ActorLogging { @@ -755,6 +737,7 @@ private[akka] class DDataShard( private val maxUpdateAttempts = 3 implicit private val node = Cluster(context.system) + implicit private val selfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress) // The default maximum-frame-size is 256 KiB with Artery. // When using entity identifiers with 36 character strings (e.g. UUID.randomUUID). @@ -832,8 +815,8 @@ private[akka] class DDataShard( private def sendUpdate(evt: StateChange, retryCount: Int) = { replicator ! Update(key(evt.entityId), ORSet.empty[EntityId], writeMajority, Some((evt, retryCount))) { existing => evt match { - case EntityStarted(id) => existing + id - case EntityStopped(id) => existing - id + case EntityStarted(id) => existing :+ id + case EntityStopped(id) => existing.remove(id) } } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index a739b7cbb1..4dfab36a78 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -26,6 +26,7 @@ import akka.cluster.ddata.GSet import akka.cluster.ddata.GSetKey import akka.cluster.ddata.Key import akka.cluster.ddata.ReplicatedData +import akka.cluster.ddata.SelfUniqueAddress /** * @see [[ClusterSharding$ ClusterSharding extension]] @@ -191,7 +192,7 @@ object ShardCoordinator { currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = { if (rebalanceInProgress.size < maxSimultaneousRebalance) { - val (regionWithLeastShards, leastShards) = currentShardAllocations.minBy { case (_, v) => v.size } + val (_, leastShards) = currentShardAllocations.minBy { case (_, v) => v.size } val mostShards = currentShardAllocations .collect { case (_, v) => v.filterNot(s => rebalanceInProgress(s)) @@ -467,7 +468,6 @@ object ShardCoordinator { * @see [[ClusterSharding$ ClusterSharding extension]] */ abstract class ShardCoordinator( - typeName: String, settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) extends Actor @@ -573,7 +573,7 @@ abstract class ShardCoordinator( } } - case AllocateShardResult(shard, None, getShardHomeSender) => + case AllocateShardResult(shard, None, _) => log.debug("Shard [{}] allocation failed. It will be retried.", shard) case AllocateShardResult(shard, Some(region), getShardHomeSender) => @@ -667,7 +667,7 @@ abstract class ShardCoordinator( }.toMap) } .recover { - case x: AskTimeoutException => ShardRegion.ClusterShardingStats(Map.empty) + case _: AskTimeoutException => ShardRegion.ClusterShardingStats(Map.empty) } .pipeTo(sender()) @@ -893,7 +893,7 @@ class PersistentShardCoordinator( typeName: String, settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) - extends ShardCoordinator(typeName, settings, allocationStrategy) + extends ShardCoordinator(settings, allocationStrategy) with PersistentActor { import ShardCoordinator.Internal._ import settings.tuningParameters._ @@ -908,9 +908,9 @@ class PersistentShardCoordinator( case evt: DomainEvent => log.debug("receiveRecover {}", evt) evt match { - case ShardRegionRegistered(region) => + case _: ShardRegionRegistered => state = state.updated(evt) - case ShardRegionProxyRegistered(proxy) => + case _: ShardRegionProxyRegistered => state = state.updated(evt) case ShardRegionTerminated(region) => if (state.regions.contains(region)) @@ -925,7 +925,7 @@ class PersistentShardCoordinator( case ShardRegionProxyTerminated(proxy) => if (state.regionProxies.contains(proxy)) state = state.updated(evt) - case ShardHomeAllocated(shard, region) => + case _: ShardHomeAllocated => state = state.updated(evt) case _: ShardHomeDeallocated => state = state.updated(evt) @@ -1001,7 +1001,7 @@ class DDataShardCoordinator( replicator: ActorRef, majorityMinCap: Int, rememberEntities: Boolean) - extends ShardCoordinator(typeName, settings, allocationStrategy) + extends ShardCoordinator(settings, allocationStrategy) with Stash { import ShardCoordinator.Internal._ import akka.cluster.ddata.Replicator.Update @@ -1010,6 +1010,7 @@ class DDataShardCoordinator( private val writeMajority = WriteMajority(settings.tuningParameters.updatingStateTimeout, majorityMinCap) implicit val node = Cluster(context.system) + private implicit val selfUniqueAddress = SelfUniqueAddress(node.selfUniqueAddress) val CoordinatorStateKey = LWWRegisterKey[State](s"${typeName}CoordinatorState") val initEmptyState = State.empty.withRememberEntities(settings.rememberEntities) @@ -1200,8 +1201,9 @@ class DDataShardCoordinator( def sendCoordinatorStateUpdate(evt: DomainEvent) = { val s = state.updated(evt) - replicator ! Update(CoordinatorStateKey, LWWRegister(initEmptyState), writeMajority, Some(evt)) { reg => - reg.withValue(s) + replicator ! Update(CoordinatorStateKey, LWWRegister(selfUniqueAddress, initEmptyState), writeMajority, Some(evt)) { + reg => + reg.withValueOf(s) } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 887e502ec2..b30da0dbfd 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -508,7 +508,7 @@ private[akka] class ShardRegion( member.hasRole(targetDcRole) && role.forall(member.hasRole) def coordinatorSelection: Option[ActorSelection] = - membersByAge.headOption.map(m => context.actorSelection(RootActorPath(m.address) + coordinatorPath)) + membersByAge.headOption.map(m => context.actorSelection(RootActorPath(m.address).toString + coordinatorPath)) /** * When leaving the coordinator singleton is started rather quickly on next @@ -516,7 +516,7 @@ private[akka] class ShardRegion( * the likely locations of the coordinator. */ def gracefulShutdownCoordinatorSelections: List[ActorSelection] = - membersByAge.take(2).toList.map(m => context.actorSelection(RootActorPath(m.address) + coordinatorPath)) + membersByAge.take(2).toList.map(m => context.actorSelection(RootActorPath(m.address).toString + coordinatorPath)) var coordinator: Option[ActorRef] = None @@ -741,7 +741,7 @@ private[akka] class ShardRegion( }.toMap) } .recover { - case x: AskTimeoutException => ShardRegionStats(Map.empty) + case _: AskTimeoutException => ShardRegionStats(Map.empty) } .pipeTo(ref) } @@ -915,23 +915,14 @@ private[akka] class ShardRegion( val shard = context.watch( context.actorOf( Shard - .props( - typeName, - id, - props, - settings, - extractEntityId, - extractShardId, - handOffStopMessage, - replicator, - majorityMinCap) + .props(typeName, id, props, settings, extractEntityId, handOffStopMessage, replicator, majorityMinCap) .withDispatcher(context.props.dispatcher), name)) shardsByRef = shardsByRef.updated(shard, id) shards = shards.updated(id, shard) startingShards += id None - case Some(props) => + case Some(_) => None case None => throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index 14492d2237..5f3de33350 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -28,6 +28,7 @@ import akka.cluster.sharding.ShardRegion._ /** * INTERNAL API: Protobuf serializer of ClusterSharding messages. */ +@ccompatUsedUntil213 private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { @@ -125,13 +126,13 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy GracefulShutdownReqManifest -> { bytes => GracefulShutdownReq(actorRefMessageFromBinary(bytes)) }, - GetShardStatsManifest -> { bytes => + GetShardStatsManifest -> { _ => GetShardStats }, ShardStatsManifest -> { bytes => shardStatsFromBinary(bytes) }, - GetShardRegionStatsManifest -> { bytes => + GetShardRegionStatsManifest -> { _ => GetShardRegionStats }, ShardRegionStatsManifest -> { bytes => @@ -225,13 +226,6 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy } private def coordinatorStateToProto(state: State): sm.CoordinatorState = { - val regions = state.regions - .map { - case (regionRef, _) => Serialization.serializedActorPath(regionRef) - } - .toVector - .asJava - val builder = sm.CoordinatorState.newBuilder() state.shards.foreach { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index 14882ede44..89c41206de 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -24,6 +24,7 @@ import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.testkit._ import akka.util.ccompat._ +@ccompatUsedUntil213 object ClusterShardingFailureSpec { case class Get(id: String) case class Add(id: String, i: Int) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala index 6aea8b1466..c8f5ba9022 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingMinMembersSpec.scala @@ -21,6 +21,7 @@ import akka.cluster.sharding.ShardRegion.GetClusterShardingStats import akka.cluster.sharding.ShardRegion.ClusterShardingStats import akka.util.ccompat._ +@ccompatUsedUntil213 object ClusterShardingMinMembersSpec { case object StopEntity @@ -169,7 +170,7 @@ abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSp runOn(first) { region ! 1 // not allocated because third has not registered yet - expectNoMsg(2.second) + expectNoMessage(2.second) } enterBarrier("verified") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala index 6dee4ef5b3..aaace0a90e 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -19,6 +19,7 @@ import akka.util.ccompat._ import scala.concurrent.duration._ +@ccompatUsedUntil213 object ClusterShardingRememberEntitiesSpec { final case class Started(ref: ActorRef) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 2f38bf0cdd..f7c243580d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -30,7 +30,7 @@ import java.io.File import org.apache.commons.io.FileUtils import akka.cluster.singleton.ClusterSingletonManager import akka.cluster.singleton.ClusterSingletonManagerSettings -import akka.pattern.BackoffSupervisor +import akka.pattern.BackoffOpts object ClusterShardingSpec { //#counter-actor @@ -202,6 +202,7 @@ object ClusterShardingDocCode { (id.toLong % numberOfShards).toString } //#extractShardId-StartEntity + extractShardId.toString() // keep the compiler happy } } @@ -310,15 +311,16 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) "AutoMigrateRememberRegionTest").foreach { typeName => val rebalanceEnabled = typeName.toLowerCase.startsWith("rebalancing") val rememberEnabled = typeName.toLowerCase.contains("remember") - val singletonProps = BackoffSupervisor - .props( - childProps = coordinatorProps(typeName, rebalanceEnabled, rememberEnabled), - childName = "coordinator", - minBackoff = 5.seconds, - maxBackoff = 5.seconds, - randomFactor = 0.1, - maxNrOfRetries = -1) - .withDeploy(Deploy.local) + val singletonProps = + BackoffOpts + .onFailure( + childProps = coordinatorProps(typeName, rebalanceEnabled, rememberEnabled), + childName = "coordinator", + minBackoff = 5.seconds, + maxBackoff = 5.seconds, + randomFactor = 0.1) + .props + .withDeploy(Deploy.local) system.actorOf( ClusterSingletonManager .props(singletonProps, terminationMessage = PoisonPill, settings = ClusterSingletonManagerSettings(system)), @@ -642,6 +644,8 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extractEntityId = extractEntityId, extractShardId = extractShardId) //#counter-start + counterRegion.toString // keep the compiler happy + ClusterSharding(system).start( typeName = "AnotherCounter", entityProps = Props[AnotherCounter], @@ -717,6 +721,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) extractEntityId = extractEntityId, extractShardId = extractShardId) // #proxy-dc + counterProxyDcB.toString // keep the compiler happy } enterBarrier("after-dc-proxy") @@ -954,7 +959,7 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig) entity ! Identify(n) receiveOne(3 seconds) match { case ActorIdentity(id, Some(_)) if id == n => count = count + 1 - case ActorIdentity(id, None) => //Not on the fifth shard + case ActorIdentity(_, None) => //Not on the fifth shard } } count should be >= (2) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala index 995a0647ac..4b29f2b301 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiDcClusterShardingSpec.scala @@ -20,6 +20,7 @@ import akka.testkit._ import com.typesafe.config.ConfigFactory import akka.util.ccompat._ +@ccompatUsedUntil213 object MultiDcClusterShardingSpec { sealed trait EntityMsg { def id: String diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala index 5646242425..7ce9ce803c 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/AllAtOnceEntityRecoveryStrategySpec.scala @@ -13,7 +13,6 @@ class AllAtOnceEntityRecoveryStrategySpec extends AkkaSpec { "AllAtOnceEntityRecoveryStrategy" must { "recover entities" in { val entities = Set[EntityId]("1", "2", "3", "4", "5") - val startTime = System.nanoTime() val result = strategy.recoverEntities(entities) result.size should ===(1) // the Future is completed immediately for allStrategy diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala index b907db6d51..1687128342 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala @@ -11,7 +11,7 @@ import akka.cluster.sharding.ShardRegion.HandOffStopper import akka.testkit.{ AkkaSpec, TestProbe } import org.mockito.ArgumentMatchers import org.mockito.Mockito._ -import org.scalatest.mockito.MockitoSugar +import org.scalatestplus.mockito.MockitoSugar import scala.concurrent.duration._ diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala index 9440c15bac..8836255aa0 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/CoordinatedShutdownShardingSpec.scala @@ -36,6 +36,7 @@ object CoordinatedShutdownShardingSpec { } } +@ccompatUsedUntil213 class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardingSpec.config) with WithLogCapturing { import CoordinatedShutdownShardingSpec._ diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardSpec.scala index 1bca0ad1f3..d1e9431a82 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardSpec.scala @@ -13,7 +13,7 @@ import com.typesafe.config.ConfigFactory import org.scalatest.WordSpecLike object PersistentShardSpec { - class EntityActor(id: String) extends Actor { + class EntityActor extends Actor { override def receive: Receive = { case _ => } @@ -29,10 +29,10 @@ class PersistentShardSpec extends AkkaSpec(PersistentShardSpec.config) with Word "Persistent Shard" must { "remember entities started with StartEntity" in { - val props = Props( - new PersistentShard("cats", "shard-1", id => Props(new EntityActor(id)), ClusterShardingSettings(system), { + val props = + Props(new PersistentShard("cats", "shard-1", _ => Props(new EntityActor), ClusterShardingSettings(system), { case _ => ("entity-1", "msg") - }, _ => "shard-1", PoisonPill)) + }, PoisonPill)) val persistentShard = system.actorOf(props) watch(persistentShard) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala index e39853d2a1..5d980500b6 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala @@ -77,7 +77,7 @@ object RemoveInternalClusterShardingDataSpec { override def recovery: Recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None) override def receiveRecover: Receive = { - case event: ShardCoordinator.Internal.DomainEvent => + case _: ShardCoordinator.Internal.DomainEvent => hasEvents = true case RecoveryCompleted => replyTo ! hasEvents @@ -201,12 +201,8 @@ class RemoveInternalClusterShardingDataSpec hasEvents(typeName) should ===(true) } - val result = RemoveInternalClusterShardingData.remove( - system, - journalPluginId = "", - typeNames.toSet, - terminateSystem = false, - remove2dot3Data = true) + val result = + RemoveInternalClusterShardingData.remove(system, journalPluginId = "", typeNames.toSet, remove2dot3Data = true) Await.ready(result, remaining) typeNames.foreach { typeName => diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala index 6fe412090f..ec4c8bca6d 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala @@ -114,7 +114,6 @@ class ShardSpec extends AkkaSpec(ShardSpec.config) with ImplicitSender { _ ⇒ Props(new EntityActor()), settings, extractEntityId, - extractShardId, PoisonPill, system.deadLetters, 1)) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala index 437f48ec52..89999f9008 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala @@ -7,7 +7,7 @@ package akka.cluster.sharding import akka.actor.{ Actor, ActorLogging, ActorRef, PoisonPill, Props } import akka.cluster.Cluster import akka.cluster.sharding.ShardRegion.Passivate -import akka.pattern.{ Backoff, BackoffOpts, BackoffSupervisor } +import akka.pattern.{ BackoffOpts, BackoffSupervisor } import akka.testkit.{ AkkaSpec, ImplicitSender } import com.typesafe.config.ConfigFactory @@ -29,7 +29,7 @@ object SupervisionSpec { } val shardResolver: ShardRegion.ExtractShardId = { - case Msg(id, msg) => (id % 2).toString + case Msg(id, _) => (id % 2).toString } class PassivatingActor extends Actor with ActorLogging { @@ -67,16 +67,16 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend "allow passivation" in { - val supervisedProps = BackoffSupervisor.props( - Backoff + val supervisedProps = + BackoffOpts .onStop( Props(new PassivatingActor()), childName = "child", minBackoff = 1.seconds, maxBackoff = 30.seconds, - randomFactor = 0.2, - maxNrOfRetries = -1) - .withFinalStopMessage(_ == StopMessage)) + randomFactor = 0.2) + .withFinalStopMessage(_ == StopMessage) + .props Cluster(system).join(Cluster(system).selfAddress) val region = ClusterSharding(system).start( diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 8912a6013b..d900457c7f 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -27,6 +27,7 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "akka-cluster-typed", "akka-persistence", "akka-cluster-tools", + "akka-cluster-sharding", "akka-stream") val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination") @@ -113,4 +114,5 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { "-Ypartial-unification", "-Ywarn-extra-implicit") + }