akka-cluster-tools compiler warnings as fatal errors (#26647)
This commit is contained in:
parent
cc1138523e
commit
d699332b53
21 changed files with 99 additions and 112 deletions
|
|
@ -41,6 +41,7 @@ import akka.util.MessageBuffer
|
||||||
import akka.util.ccompat._
|
import akka.util.ccompat._
|
||||||
import scala.collection.immutable.{ HashMap, HashSet }
|
import scala.collection.immutable.{ HashMap, HashSet }
|
||||||
|
|
||||||
|
@ccompatUsedUntil213
|
||||||
object ClusterClientSettings {
|
object ClusterClientSettings {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ package akka.cluster.client.protobuf
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.serialization.BaseSerializer
|
import akka.serialization.BaseSerializer
|
||||||
import akka.serialization.SerializationExtension
|
|
||||||
import akka.serialization.SerializerWithStringManifest
|
import akka.serialization.SerializerWithStringManifest
|
||||||
import akka.cluster.client.ClusterReceptionist
|
import akka.cluster.client.ClusterReceptionist
|
||||||
import akka.cluster.client.protobuf.msg.{ ClusterClientMessages => cm }
|
import akka.cluster.client.protobuf.msg.{ ClusterClientMessages => cm }
|
||||||
|
|
@ -21,8 +20,6 @@ private[akka] class ClusterClientMessageSerializer(val system: ExtendedActorSyst
|
||||||
with BaseSerializer {
|
with BaseSerializer {
|
||||||
import ClusterReceptionist.Internal._
|
import ClusterReceptionist.Internal._
|
||||||
|
|
||||||
private lazy val serialization = SerializationExtension(system)
|
|
||||||
|
|
||||||
private val ContactsManifest = "A"
|
private val ContactsManifest = "A"
|
||||||
private val GetContactsManifest = "B"
|
private val GetContactsManifest = "B"
|
||||||
private val HeartbeatManifest = "C"
|
private val HeartbeatManifest = "C"
|
||||||
|
|
|
||||||
|
|
@ -354,9 +354,11 @@ object DistributedPubSubMediator {
|
||||||
case Terminated(ref) =>
|
case Terminated(ref) =>
|
||||||
remove(ref)
|
remove(ref)
|
||||||
case Prune =>
|
case Prune =>
|
||||||
for (d <- pruneDeadline if d.isOverdue) {
|
pruneDeadline match {
|
||||||
pruneDeadline = None
|
case Some(deadline) if deadline.isOverdue() =>
|
||||||
context.parent ! NoMoreSubscribers
|
pruneDeadline = None
|
||||||
|
context.parent ! NoMoreSubscribers
|
||||||
|
case _ =>
|
||||||
}
|
}
|
||||||
case TerminateRequest =>
|
case TerminateRequest =>
|
||||||
if (subscribers.isEmpty && context.children.isEmpty)
|
if (subscribers.isEmpty && context.children.isEmpty)
|
||||||
|
|
@ -640,7 +642,7 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case msg @ RegisterTopic(t) =>
|
case RegisterTopic(t) =>
|
||||||
registerTopic(t)
|
registerTopic(t)
|
||||||
|
|
||||||
case NoMoreSubscribers =>
|
case NoMoreSubscribers =>
|
||||||
|
|
@ -655,7 +657,7 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings)
|
||||||
case GetTopics =>
|
case GetTopics =>
|
||||||
sender ! CurrentTopics(getCurrentTopics())
|
sender ! CurrentTopics(getCurrentTopics())
|
||||||
|
|
||||||
case msg @ Subscribed(ack, ref) =>
|
case Subscribed(ack, ref) =>
|
||||||
ref ! ack
|
ref ! ack
|
||||||
|
|
||||||
case msg @ Unsubscribe(topic, _, _) =>
|
case msg @ Unsubscribe(topic, _, _) =>
|
||||||
|
|
@ -667,7 +669,7 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case msg @ Unsubscribed(ack, ref) =>
|
case Unsubscribed(ack, ref) =>
|
||||||
ref ! ack
|
ref ! ack
|
||||||
|
|
||||||
case Status(otherVersions, isReplyToStatus) =>
|
case Status(otherVersions, isReplyToStatus) =>
|
||||||
|
|
@ -751,7 +753,7 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings)
|
||||||
|
|
||||||
case Count =>
|
case Count =>
|
||||||
val count = registry.map {
|
val count = registry.map {
|
||||||
case (owner, bucket) =>
|
case (_, bucket) =>
|
||||||
bucket.content.count {
|
bucket.content.count {
|
||||||
case (_, valueHolder) => valueHolder.ref.isDefined
|
case (_, valueHolder) => valueHolder.ref.isDefined
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import java.io.NotSerializableException
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Protobuf serializer of DistributedPubSubMediator messages.
|
* INTERNAL API: Protobuf serializer of DistributedPubSubMediator messages.
|
||||||
*/
|
*/
|
||||||
|
@ccompatUsedUntil213
|
||||||
private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
|
private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
|
||||||
extends SerializerWithStringManifest
|
extends SerializerWithStringManifest
|
||||||
with BaseSerializer {
|
with BaseSerializer {
|
||||||
|
|
|
||||||
|
|
@ -29,11 +29,12 @@ import akka.util.JavaDurationConverters._
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
import akka.Done
|
import akka.Done
|
||||||
import akka.actor.CoordinatedShutdown
|
import akka.actor.CoordinatedShutdown
|
||||||
import akka.annotation.{ ApiMayChange, DoNotInherit }
|
import akka.annotation.DoNotInherit
|
||||||
import akka.coordination.lease.LeaseUsageSettings
|
import akka.coordination.lease.LeaseUsageSettings
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
|
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
|
||||||
|
import com.github.ghik.silencer.silent
|
||||||
|
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
|
|
@ -43,6 +44,7 @@ object ClusterSingletonManagerSettings {
|
||||||
* Create settings from the default configuration
|
* Create settings from the default configuration
|
||||||
* `akka.cluster.singleton`.
|
* `akka.cluster.singleton`.
|
||||||
*/
|
*/
|
||||||
|
@silent // deprecated setting
|
||||||
def apply(system: ActorSystem): ClusterSingletonManagerSettings =
|
def apply(system: ActorSystem): ClusterSingletonManagerSettings =
|
||||||
apply(system.settings.config.getConfig("akka.cluster.singleton"))
|
apply(system.settings.config.getConfig("akka.cluster.singleton"))
|
||||||
// note that this setting has some additional logic inside the ClusterSingletonManager
|
// note that this setting has some additional logic inside the ClusterSingletonManager
|
||||||
|
|
@ -476,7 +478,6 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
|
||||||
import ClusterSingletonManager.Internal._
|
import ClusterSingletonManager.Internal._
|
||||||
import ClusterSingletonManager.Internal.OldestChangedBuffer._
|
import ClusterSingletonManager.Internal.OldestChangedBuffer._
|
||||||
import settings._
|
import settings._
|
||||||
import FSM.`->`
|
|
||||||
|
|
||||||
val cluster = Cluster(context.system)
|
val cluster = Cluster(context.system)
|
||||||
val selfUniqueAddressOption = Some(cluster.selfUniqueAddress)
|
val selfUniqueAddressOption = Some(cluster.selfUniqueAddress)
|
||||||
|
|
|
||||||
|
|
@ -264,7 +264,7 @@ final class ClusterSingletonProxy(singletonManagerPath: String, settings: Cluste
|
||||||
case _: MemberEvent => // do nothing
|
case _: MemberEvent => // do nothing
|
||||||
|
|
||||||
// singleton identification logic
|
// singleton identification logic
|
||||||
case ActorIdentity(identifyId, Some(s)) =>
|
case ActorIdentity(_, Some(s)) =>
|
||||||
// if the new singleton is defined, deliver all buffered messages
|
// if the new singleton is defined, deliver all buffered messages
|
||||||
log.info("Singleton identified at [{}]", s.path)
|
log.info("Singleton identified at [{}]", s.path)
|
||||||
singleton = Some(s)
|
singleton = Some(s)
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverInProgres
|
||||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe
|
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe
|
||||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe
|
import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe
|
||||||
import akka.serialization.BaseSerializer
|
import akka.serialization.BaseSerializer
|
||||||
import akka.serialization.SerializationExtension
|
|
||||||
import akka.serialization.SerializerWithStringManifest
|
import akka.serialization.SerializerWithStringManifest
|
||||||
import java.io.NotSerializableException
|
import java.io.NotSerializableException
|
||||||
|
|
||||||
|
|
@ -23,8 +22,6 @@ private[akka] class ClusterSingletonMessageSerializer(val system: ExtendedActorS
|
||||||
extends SerializerWithStringManifest
|
extends SerializerWithStringManifest
|
||||||
with BaseSerializer {
|
with BaseSerializer {
|
||||||
|
|
||||||
private lazy val serialization = SerializationExtension(system)
|
|
||||||
|
|
||||||
private val HandOverToMeManifest = "A"
|
private val HandOverToMeManifest = "A"
|
||||||
private val HandOverInProgressManifest = "B"
|
private val HandOverInProgressManifest = "B"
|
||||||
private val HandOverDoneManifest = "C"
|
private val HandOverDoneManifest = "C"
|
||||||
|
|
|
||||||
|
|
@ -25,8 +25,8 @@ import akka.pattern.ask
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
||||||
object TestLeaseActor {
|
object TestLeaseActor {
|
||||||
def props(probe: ActorRef): Props =
|
def props(): Props =
|
||||||
Props(new TestLeaseActor(probe))
|
Props(new TestLeaseActor)
|
||||||
|
|
||||||
sealed trait LeaseRequest
|
sealed trait LeaseRequest
|
||||||
final case class Acquire(owner: String) extends LeaseRequest
|
final case class Acquire(owner: String) extends LeaseRequest
|
||||||
|
|
@ -38,7 +38,7 @@ object TestLeaseActor {
|
||||||
final case class ActionRequest(request: LeaseRequest, result: Any) // boolean of Failure
|
final case class ActionRequest(request: LeaseRequest, result: Any) // boolean of Failure
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestLeaseActor(probe: ActorRef) extends Actor with ActorLogging {
|
class TestLeaseActor extends Actor with ActorLogging {
|
||||||
import TestLeaseActor._
|
import TestLeaseActor._
|
||||||
|
|
||||||
var requests: List[(ActorRef, LeaseRequest)] = Nil
|
var requests: List[(ActorRef, LeaseRequest)] = Nil
|
||||||
|
|
|
||||||
|
|
@ -28,6 +28,7 @@ import akka.testkit._
|
||||||
import akka.cluster.pubsub._
|
import akka.cluster.pubsub._
|
||||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
import akka.util.unused
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
|
|
||||||
|
|
@ -64,7 +65,7 @@ object ClusterClientSpec extends MultiNodeConfig {
|
||||||
context.system.terminate()
|
context.system.terminate()
|
||||||
case msg =>
|
case msg =>
|
||||||
testActor.forward(msg)
|
testActor.forward(msg)
|
||||||
sender() ! Reply(msg + "-ack", Cluster(context.system).selfAddress)
|
sender() ! Reply(s"$msg-ack", Cluster(context.system).selfAddress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -189,6 +190,19 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
||||||
node(r) / "system" / "receptionist"
|
node(r) / "system" / "receptionist"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@unused
|
||||||
|
def docOnly = { //not used, only demo
|
||||||
|
//#initialContacts
|
||||||
|
val initialContacts = Set(
|
||||||
|
ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
|
||||||
|
ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))
|
||||||
|
val settings = ClusterClientSettings(system).withInitialContacts(initialContacts)
|
||||||
|
//#initialContacts
|
||||||
|
|
||||||
|
// make the compiler happy and thinking we use it
|
||||||
|
settings.acceptableHeartbeatPause
|
||||||
|
}
|
||||||
|
|
||||||
"A ClusterClient" must {
|
"A ClusterClient" must {
|
||||||
|
|
||||||
"startup cluster" in within(30 seconds) {
|
"startup cluster" in within(30 seconds) {
|
||||||
|
|
@ -278,15 +292,6 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
||||||
receiveN(3).toSet should ===(Set("hello", "hi"))
|
receiveN(3).toSet should ===(Set("hello", "hi"))
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy val docOnly = { //not used, only demo
|
|
||||||
//#initialContacts
|
|
||||||
val initialContacts = Set(
|
|
||||||
ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
|
|
||||||
ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))
|
|
||||||
val settings = ClusterClientSettings(system).withInitialContacts(initialContacts)
|
|
||||||
//#initialContacts
|
|
||||||
}
|
|
||||||
|
|
||||||
// strange, barriers fail without this sleep
|
// strange, barriers fail without this sleep
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
enterBarrier("after-4")
|
enterBarrier("after-4")
|
||||||
|
|
@ -436,7 +441,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
|
||||||
c ! ClusterClient.Send("/user/service2", "ping", localAffinity = true)
|
c ! ClusterClient.Send("/user/service2", "ping", localAffinity = true)
|
||||||
// if we would use remote watch the failure detector would trigger and
|
// if we would use remote watch the failure detector would trigger and
|
||||||
// connection quarantined
|
// connection quarantined
|
||||||
expectNoMsg(5 seconds)
|
expectNoMessage(5 seconds)
|
||||||
|
|
||||||
testConductor.passThrough(client, receptionistRoleName, Direction.Both).await
|
testConductor.passThrough(client, receptionistRoleName, Direction.Both).await
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -149,7 +149,7 @@ class DistributedPubSubMediatorSpec
|
||||||
var chatUsers: Map[String, ActorRef] = Map.empty
|
var chatUsers: Map[String, ActorRef] = Map.empty
|
||||||
|
|
||||||
def createChatUser(name: String): ActorRef = {
|
def createChatUser(name: String): ActorRef = {
|
||||||
var a = system.actorOf(Props(classOf[TestChatUser], mediator, testActor), name)
|
val a = system.actorOf(Props(classOf[TestChatUser], mediator, testActor), name)
|
||||||
chatUsers += (name -> a)
|
chatUsers += (name -> a)
|
||||||
a
|
a
|
||||||
}
|
}
|
||||||
|
|
@ -292,7 +292,7 @@ class DistributedPubSubMediatorSpec
|
||||||
lastSender.path.name should ===("u7")
|
lastSender.path.name should ===("u7")
|
||||||
}
|
}
|
||||||
runOn(third) {
|
runOn(third) {
|
||||||
expectNoMsg(2.seconds)
|
expectNoMessage(2.seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-6")
|
enterBarrier("after-6")
|
||||||
|
|
@ -331,7 +331,7 @@ class DistributedPubSubMediatorSpec
|
||||||
lastSender.path.name should ===("u10")
|
lastSender.path.name should ===("u10")
|
||||||
}
|
}
|
||||||
runOn(third) {
|
runOn(third) {
|
||||||
expectNoMsg(2.seconds)
|
expectNoMessage(2.seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-7")
|
enterBarrier("after-7")
|
||||||
|
|
@ -407,7 +407,7 @@ class DistributedPubSubMediatorSpec
|
||||||
lastSender.path.name should ===("u11")
|
lastSender.path.name should ===("u11")
|
||||||
}
|
}
|
||||||
runOn(third) {
|
runOn(third) {
|
||||||
expectNoMsg(2.seconds) // sender() node should not receive a message
|
expectNoMessage(2.seconds) // sender() node should not receive a message
|
||||||
}
|
}
|
||||||
|
|
||||||
enterBarrier("after-11")
|
enterBarrier("after-11")
|
||||||
|
|
@ -437,7 +437,7 @@ class DistributedPubSubMediatorSpec
|
||||||
|
|
||||||
runOn(first, second) {
|
runOn(first, second) {
|
||||||
expectMsg("hi")
|
expectMsg("hi")
|
||||||
expectNoMsg(2.seconds) // each group receive only one message
|
expectNoMessage(2.seconds) // each group receive only one message
|
||||||
}
|
}
|
||||||
enterBarrier("12-published")
|
enterBarrier("12-published")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -156,7 +156,7 @@ class DistributedPubSubRestartSpec
|
||||||
probe.expectMsgType[SubscribeAck]
|
probe.expectMsgType[SubscribeAck]
|
||||||
|
|
||||||
// let them gossip, but Delta should not be exchanged
|
// let them gossip, but Delta should not be exchanged
|
||||||
probe.expectNoMsg(5.seconds)
|
probe.expectNoMessage(5.seconds)
|
||||||
newMediator.tell(Internal.DeltaCount, probe.ref)
|
newMediator.tell(Internal.DeltaCount, probe.ref)
|
||||||
probe.expectMsg(0L)
|
probe.expectMsg(0L)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import akka.testkit._
|
||||||
import akka.util.ccompat._
|
import akka.util.ccompat._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
|
@ccompatUsedUntil213
|
||||||
object ClusterSingletonManagerDownedSpec extends MultiNodeConfig {
|
object ClusterSingletonManagerDownedSpec extends MultiNodeConfig {
|
||||||
val first = role("first")
|
val first = role("first")
|
||||||
val second = role("second")
|
val second = role("second")
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,9 @@ import akka.cluster.TestLeaseActor._
|
||||||
import akka.cluster.singleton.ClusterSingletonManagerLeaseSpec.ImportantSingleton.Response
|
import akka.cluster.singleton.ClusterSingletonManagerLeaseSpec.ImportantSingleton.Response
|
||||||
import akka.cluster._
|
import akka.cluster._
|
||||||
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
|
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
|
||||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
import scala.language.postfixOps
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
object ClusterSingletonManagerLeaseSpec extends MultiNodeConfig {
|
object ClusterSingletonManagerLeaseSpec extends MultiNodeConfig {
|
||||||
|
|
@ -115,7 +113,7 @@ class ClusterSingletonManagerLeaseSpec
|
||||||
|
|
||||||
"start test lease" in {
|
"start test lease" in {
|
||||||
runOn(controller) {
|
runOn(controller) {
|
||||||
system.actorOf(TestLeaseActor.props(leaseProbe.ref), s"lease-${system.name}")
|
system.actorOf(TestLeaseActor.props(), s"lease-${system.name}")
|
||||||
}
|
}
|
||||||
enterBarrier("lease-actor-started")
|
enterBarrier("lease-actor-started")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -80,7 +80,7 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
|
||||||
sender() ! UnexpectedUnregistration
|
sender() ! UnexpectedUnregistration
|
||||||
context.stop(self)
|
context.stop(self)
|
||||||
case Reset => sender() ! ResetOk
|
case Reset => sender() ! ResetOk
|
||||||
case msg => // no consumer, drop
|
case _ => // no consumer, drop
|
||||||
}
|
}
|
||||||
|
|
||||||
def active(consumer: ActorRef): Receive = {
|
def active(consumer: ActorRef): Receive = {
|
||||||
|
|
@ -298,13 +298,13 @@ class ClusterSingletonManagerSpec
|
||||||
runOn(controller) {
|
runOn(controller) {
|
||||||
queue ! msg
|
queue ! msg
|
||||||
// make sure it's not terminated, which would be wrong
|
// make sure it's not terminated, which would be wrong
|
||||||
expectNoMsg(1 second)
|
expectNoMessage(1 second)
|
||||||
}
|
}
|
||||||
runOn(oldest) {
|
runOn(oldest) {
|
||||||
expectMsg(5.seconds, msg)
|
expectMsg(5.seconds, msg)
|
||||||
}
|
}
|
||||||
runOn(roles.filterNot(r => r == oldest || r == controller || r == observer): _*) {
|
runOn(roles.filterNot(r => r == oldest || r == controller || r == observer): _*) {
|
||||||
expectNoMsg(1 second)
|
expectNoMessage(1 second)
|
||||||
}
|
}
|
||||||
enterBarrier("after-" + msg + "-verified")
|
enterBarrier("after-" + msg + "-verified")
|
||||||
}
|
}
|
||||||
|
|
@ -383,7 +383,6 @@ class ClusterSingletonManagerSpec
|
||||||
|
|
||||||
"hand over when oldest leaves in 6 nodes cluster " in within(30 seconds) {
|
"hand over when oldest leaves in 6 nodes cluster " in within(30 seconds) {
|
||||||
val leaveRole = first
|
val leaveRole = first
|
||||||
val newOldestRole = second
|
|
||||||
|
|
||||||
runOn(leaveRole) {
|
runOn(leaveRole) {
|
||||||
Cluster(system).leave(node(leaveRole).address)
|
Cluster(system).leave(node(leaveRole).address)
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,7 @@ object ClusterSingletonManagerStartupSpec extends MultiNodeConfig {
|
||||||
/**
|
/**
|
||||||
* The singleton actor
|
* The singleton actor
|
||||||
*/
|
*/
|
||||||
class Echo(testActor: ActorRef) extends Actor {
|
class Echo extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case _ =>
|
case _ =>
|
||||||
sender() ! self
|
sender() ! self
|
||||||
|
|
@ -65,7 +65,7 @@ class ClusterSingletonManagerStartupSpec
|
||||||
def createSingleton(): ActorRef = {
|
def createSingleton(): ActorRef = {
|
||||||
system.actorOf(
|
system.actorOf(
|
||||||
ClusterSingletonManager.props(
|
ClusterSingletonManager.props(
|
||||||
singletonProps = Props(classOf[Echo], testActor),
|
singletonProps = Props(classOf[Echo]),
|
||||||
terminationMessage = PoisonPill,
|
terminationMessage = PoisonPill,
|
||||||
settings = ClusterSingletonManagerSettings(system)),
|
settings = ClusterSingletonManagerSettings(system)),
|
||||||
name = "echo")
|
name = "echo")
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ trait DeadLettersProbe { this: TestKitBase =>
|
||||||
val deadLettersProbe = TestProbe()
|
val deadLettersProbe = TestProbe()
|
||||||
system.eventStream.subscribe(deadLettersProbe.ref, classOf[DeadLetter])
|
system.eventStream.subscribe(deadLettersProbe.ref, classOf[DeadLetter])
|
||||||
|
|
||||||
def expectNoDeadLetters(): Unit = deadLettersProbe.expectNoMsg(100.milliseconds)
|
def expectNoDeadLetters(): Unit = deadLettersProbe.expectNoMessage(100.milliseconds)
|
||||||
def expectDeadLetter(): Unit = deadLettersProbe.expectMsgClass(classOf[DeadLetter])
|
def expectDeadLetter(): Unit = deadLettersProbe.expectMsgClass(classOf[DeadLetter])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -71,6 +71,8 @@ class ClusterSingletonLeavingSpeedSpec
|
||||||
|
|
||||||
override def expectedTestDuration: FiniteDuration = 10.minutes
|
override def expectedTestDuration: FiniteDuration = 10.minutes
|
||||||
|
|
||||||
|
import akka.util.ccompat._
|
||||||
|
@ccompatUsedUntil213
|
||||||
def join(from: ActorSystem, to: ActorSystem, probe: ActorRef): Unit = {
|
def join(from: ActorSystem, to: ActorSystem, probe: ActorRef): Unit = {
|
||||||
|
|
||||||
from.actorOf(
|
from.actorOf(
|
||||||
|
|
@ -82,7 +84,7 @@ class ClusterSingletonLeavingSpeedSpec
|
||||||
|
|
||||||
Cluster(from).join(Cluster(to).selfAddress)
|
Cluster(from).join(Cluster(to).selfAddress)
|
||||||
within(15.seconds) {
|
within(15.seconds) {
|
||||||
import akka.util.ccompat._
|
|
||||||
awaitAssert {
|
awaitAssert {
|
||||||
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
|
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
|
||||||
Cluster(from).state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up))
|
Cluster(from).state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up))
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ class ClusterSingletonProxySpec extends WordSpecLike with Matchers with BeforeAn
|
||||||
val seed = new ActorSys()
|
val seed = new ActorSys()
|
||||||
|
|
||||||
val testSystems = {
|
val testSystems = {
|
||||||
val joiners = (0 until 4).map(n => new ActorSys(joinTo = Some(seed.cluster.selfAddress)))
|
val joiners = (0 until 4).map(_ => new ActorSys(joinTo = Some(seed.cluster.selfAddress)))
|
||||||
joiners :+ seed
|
joiners :+ seed
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@
|
||||||
package akka.cluster.singleton
|
package akka.cluster.singleton
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.actor.PoisonPill
|
import akka.actor.PoisonPill
|
||||||
|
|
@ -52,6 +51,8 @@ class ClusterSingletonRestart2Spec extends AkkaSpec("""
|
||||||
ConfigFactory.parseString("akka.cluster.roles = [other]").withFallback(system.settings.config))
|
ConfigFactory.parseString("akka.cluster.roles = [other]").withFallback(system.settings.config))
|
||||||
var sys4: ActorSystem = null
|
var sys4: ActorSystem = null
|
||||||
|
|
||||||
|
import akka.util.ccompat._
|
||||||
|
@ccompatUsedUntil213
|
||||||
def join(from: ActorSystem, to: ActorSystem): Unit = {
|
def join(from: ActorSystem, to: ActorSystem): Unit = {
|
||||||
if (Cluster(from).selfRoles.contains("singleton"))
|
if (Cluster(from).selfRoles.contains("singleton"))
|
||||||
from.actorOf(
|
from.actorOf(
|
||||||
|
|
@ -62,7 +63,6 @@ class ClusterSingletonRestart2Spec extends AkkaSpec("""
|
||||||
name = "echo")
|
name = "echo")
|
||||||
|
|
||||||
within(45.seconds) {
|
within(45.seconds) {
|
||||||
import akka.util.ccompat._
|
|
||||||
awaitAssert {
|
awaitAssert {
|
||||||
Cluster(from).join(Cluster(to).selfAddress)
|
Cluster(from).join(Cluster(to).selfAddress)
|
||||||
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
|
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
|
||||||
|
|
|
||||||
|
|
@ -34,6 +34,8 @@ class ClusterSingletonRestartSpec extends AkkaSpec("""
|
||||||
val sys2 = ActorSystem(system.name, system.settings.config)
|
val sys2 = ActorSystem(system.name, system.settings.config)
|
||||||
var sys3: ActorSystem = null
|
var sys3: ActorSystem = null
|
||||||
|
|
||||||
|
import akka.util.ccompat._
|
||||||
|
@ccompatUsedUntil213
|
||||||
def join(from: ActorSystem, to: ActorSystem): Unit = {
|
def join(from: ActorSystem, to: ActorSystem): Unit = {
|
||||||
from.actorOf(
|
from.actorOf(
|
||||||
ClusterSingletonManager.props(
|
ClusterSingletonManager.props(
|
||||||
|
|
@ -43,7 +45,6 @@ class ClusterSingletonRestartSpec extends AkkaSpec("""
|
||||||
name = "echo")
|
name = "echo")
|
||||||
|
|
||||||
within(10.seconds) {
|
within(10.seconds) {
|
||||||
import akka.util.ccompat._
|
|
||||||
awaitAssert {
|
awaitAssert {
|
||||||
Cluster(from).join(Cluster(to).selfAddress)
|
Cluster(from).join(Cluster(to).selfAddress)
|
||||||
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
|
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
package akka
|
package akka
|
||||||
|
|
||||||
import sbt._
|
import sbt._
|
||||||
import Keys.{scalacOptions, _}
|
import Keys.{ scalacOptions, _ }
|
||||||
import sbt.plugins.JvmPlugin
|
import sbt.plugins.JvmPlugin
|
||||||
|
|
||||||
object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
|
object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
|
||||||
|
|
@ -24,23 +24,15 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
|
||||||
"akka-coordination",
|
"akka-coordination",
|
||||||
"akka-protobuf",
|
"akka-protobuf",
|
||||||
"akka-stream-typed",
|
"akka-stream-typed",
|
||||||
"akka-cluster-typed"
|
"akka-cluster-typed",
|
||||||
)
|
"akka - cluster - tools")
|
||||||
|
|
||||||
val strictProjects = Set(
|
val strictProjects = Set("akka-discovery", "akka-protobuf", "akka-coordination")
|
||||||
"akka-discovery",
|
|
||||||
"akka-protobuf",
|
|
||||||
"akka-coordination"
|
|
||||||
)
|
|
||||||
|
|
||||||
lazy val scalaFixSettings = Seq(
|
lazy val scalaFixSettings = Seq(Compile / scalacOptions += "-Yrangepos")
|
||||||
Compile / scalacOptions += "-Yrangepos")
|
|
||||||
|
|
||||||
lazy val scoverageSettings = Seq(
|
lazy val scoverageSettings =
|
||||||
coverageMinimum := 70,
|
Seq(coverageMinimum := 70, coverageFailOnMinimum := false, coverageOutputHTML := true, coverageHighlighting := {
|
||||||
coverageFailOnMinimum := false,
|
|
||||||
coverageOutputHTML := true,
|
|
||||||
coverageHighlighting := {
|
|
||||||
import sbt.librarymanagement.{ SemanticSelector, VersionNumber }
|
import sbt.librarymanagement.{ SemanticSelector, VersionNumber }
|
||||||
!VersionNumber(scalaVersion.value).matchesSemVer(SemanticSelector("<=2.11.1"))
|
!VersionNumber(scalaVersion.value).matchesSemVer(SemanticSelector("<=2.11.1"))
|
||||||
})
|
})
|
||||||
|
|
@ -48,71 +40,62 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
|
||||||
val silencerVersion = "1.3.1"
|
val silencerVersion = "1.3.1"
|
||||||
lazy val silencerSettings = Seq(
|
lazy val silencerSettings = Seq(
|
||||||
libraryDependencies ++= Seq(
|
libraryDependencies ++= Seq(
|
||||||
compilerPlugin("com.github.ghik" %% "silencer-plugin" % silencerVersion),
|
compilerPlugin("com.github.ghik" %% "silencer-plugin" % silencerVersion),
|
||||||
"com.github.ghik" %% "silencer-lib" % silencerVersion % Provided,
|
"com.github.ghik" %% "silencer-lib" % silencerVersion % Provided))
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
lazy val disciplineSettings =
|
lazy val disciplineSettings =
|
||||||
scalaFixSettings ++
|
scalaFixSettings ++
|
||||||
silencerSettings ++
|
silencerSettings ++
|
||||||
scoverageSettings ++ Seq(
|
scoverageSettings ++ Seq(
|
||||||
Compile / scalacOptions ++= (
|
Compile / scalacOptions ++= (
|
||||||
if (fatalWarningsFor(name.value)) Seq("-Xfatal-warnings")
|
if (fatalWarningsFor(name.value)) Seq("-Xfatal-warnings")
|
||||||
else Seq.empty
|
else Seq.empty
|
||||||
),
|
),
|
||||||
Test / scalacOptions --= testUndicipline,
|
Test / scalacOptions --= testUndicipline,
|
||||||
Compile / console / scalacOptions --= Seq("-deprecation", "-Xfatal-warnings", "-Xlint", "-Ywarn-unused:imports"),
|
Compile / console / scalacOptions --= Seq("-deprecation", "-Xfatal-warnings", "-Xlint", "-Ywarn-unused:imports"),
|
||||||
Compile / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
|
Compile / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
|
||||||
case Some((2, 13)) =>
|
case Some((2, 13)) =>
|
||||||
disciplineScalacOptions -- Set(
|
disciplineScalacOptions -- Set(
|
||||||
"-Ywarn-inaccessible",
|
"-Ywarn-inaccessible",
|
||||||
"-Ywarn-infer-any",
|
"-Ywarn-infer-any",
|
||||||
"-Ywarn-nullary-override",
|
"-Ywarn-nullary-override",
|
||||||
"-Ywarn-nullary-unit",
|
"-Ywarn-nullary-unit",
|
||||||
"-Ypartial-unification",
|
"-Ypartial-unification",
|
||||||
"-Yno-adapted-args",
|
"-Yno-adapted-args")
|
||||||
)
|
case Some((2, 12)) =>
|
||||||
case Some((2, 12)) =>
|
disciplineScalacOptions
|
||||||
disciplineScalacOptions
|
case Some((2, 11)) =>
|
||||||
case Some((2, 11)) =>
|
disciplineScalacOptions ++ Set("-language:existentials") -- Set(
|
||||||
disciplineScalacOptions ++ Set("-language:existentials") -- Set(
|
"-Ywarn-extra-implicit",
|
||||||
"-Ywarn-extra-implicit",
|
"-Ywarn-unused:_",
|
||||||
"-Ywarn-unused:_",
|
"-Ypartial-unification")
|
||||||
"-Ypartial-unification",
|
case _ =>
|
||||||
)
|
Nil
|
||||||
case _ =>
|
}).toSeq,
|
||||||
Nil
|
|
||||||
}).toSeq,
|
|
||||||
Compile / doc / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
|
Compile / doc / scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
|
||||||
case Some((2, 11)) =>
|
case Some((2, 11)) =>
|
||||||
Seq("-no-link-warnings")
|
Seq("-no-link-warnings")
|
||||||
case _ =>
|
case _ =>
|
||||||
Seq.empty
|
Seq.empty
|
||||||
}),
|
}),
|
||||||
Compile / scalacOptions --=
|
Compile / scalacOptions --=
|
||||||
(if (strictProjects.contains(name.value)) Seq.empty
|
(if (strictProjects.contains(name.value)) Seq.empty
|
||||||
else undisciplineScalacOptions.toSeq),
|
else undisciplineScalacOptions.toSeq),
|
||||||
// Discipline is not needed for the docs compilation run (which uses
|
// Discipline is not needed for the docs compilation run (which uses
|
||||||
// different compiler phases from the regular run), and in particular
|
// different compiler phases from the regular run), and in particular
|
||||||
// '-Ywarn-unused:explicits' breaks 'sbt ++2.13.0-M5 akka-actor/doc'
|
// '-Ywarn-unused:explicits' breaks 'sbt ++2.13.0-M5 akka-actor/doc'
|
||||||
// https://github.com/akka/akka/issues/26119
|
// https://github.com/akka/akka/issues/26119
|
||||||
Compile / doc / scalacOptions --= disciplineScalacOptions.toSeq :+ "-Xfatal-warnings",
|
Compile / doc / scalacOptions --= disciplineScalacOptions.toSeq :+ "-Xfatal-warnings")
|
||||||
)
|
|
||||||
|
|
||||||
val testUndicipline = Seq(
|
val testUndicipline = Seq(
|
||||||
"-Ywarn-dead-code", // ??? used in compile only specs
|
"-Ywarn-dead-code", // ??? used in compile only specs
|
||||||
"-Ywarn-value-discard" // Ignoring returned assertions
|
"-Ywarn-value-discard" // Ignoring returned assertions
|
||||||
)
|
)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remain visibly filtered for future code quality work and removing.
|
* Remain visibly filtered for future code quality work and removing.
|
||||||
*/
|
*/
|
||||||
val undisciplineScalacOptions = Set(
|
val undisciplineScalacOptions = Set("-Ywarn-value-discard", "-Ywarn-numeric-widen", "-Yno-adapted-args")
|
||||||
"-Ywarn-value-discard",
|
|
||||||
"-Ywarn-numeric-widen",
|
|
||||||
"-Yno-adapted-args",
|
|
||||||
)
|
|
||||||
|
|
||||||
/** These options are desired, but some are excluded for the time being*/
|
/** These options are desired, but some are excluded for the time being*/
|
||||||
val disciplineScalacOptions = Set(
|
val disciplineScalacOptions = Set(
|
||||||
|
|
@ -131,7 +114,6 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
|
||||||
"-Ywarn-nullary-unit",
|
"-Ywarn-nullary-unit",
|
||||||
"-Ywarn-unused:_",
|
"-Ywarn-unused:_",
|
||||||
"-Ypartial-unification",
|
"-Ypartial-unification",
|
||||||
"-Ywarn-extra-implicit",
|
"-Ywarn-extra-implicit")
|
||||||
)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue