build akka-cluster-tools on scala3 (#30683)

This commit is contained in:
Arnout Engelen 2021-09-16 09:06:34 +02:00 committed by GitHub
parent 8187afebe9
commit a085b21fc1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 22 additions and 16 deletions

View file

@ -21,7 +21,7 @@ jobs:
command: command:
- akka-testkit/test akka-actor-tests/test - akka-testkit/test akka-actor-tests/test
- akka-actor-testkit-typed/test akka-actor-typed-tests/test - akka-actor-testkit-typed/test akka-actor-typed-tests/test
- akka-cluster/Test/compile akka-distributed-data/test - akka-cluster/Test/compile akka-cluster-tools/test akka-distributed-data/test
- akka-coordination/test - akka-coordination/test
- akka-discovery/test - akka-discovery/test
- akka-persistence/test akka-persistence-shared/test akka-persistence-query/test - akka-persistence/test akka-persistence-shared/test akka-persistence-query/test

View file

@ -278,7 +278,7 @@ object DistributedPubSubMediator {
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class ValueHolder(version: Long, ref: Option[ActorRef]) { final case class ValueHolder(version: Long, ref: Option[ActorRef]) {
@transient lazy val routee: Option[Routee] = ref.map(ActorRefRoutee) @transient lazy val routee: Option[Routee] = ref.map(ActorRefRoutee(_))
} }
@SerialVersionUID(1L) @SerialVersionUID(1L)
@ -454,7 +454,7 @@ object DistributedPubSubMediator {
def business = { def business = {
case SendToOneSubscriber(msg) => case SendToOneSubscriber(msg) =>
if (subscribers.nonEmpty) if (subscribers.nonEmpty)
Router(routingLogic, subscribers.map(ActorRefRoutee).toVector).route(wrapIfNeeded(msg), sender()) Router(routingLogic, subscribers.map(ActorRefRoutee(_)).toVector).route(wrapIfNeeded(msg), sender())
} }
} }

View file

@ -34,6 +34,7 @@ import akka.coordination.lease.scaladsl.LeaseProvider
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.event.LogMarker import akka.event.LogMarker
import akka.event.Logging import akka.event.Logging
import akka.event.MarkerLoggingAdapter
import akka.pattern.ask import akka.pattern.ask
import akka.pattern.pipe import akka.pattern.pipe
import akka.util.JavaDurationConverters._ import akka.util.JavaDurationConverters._
@ -495,7 +496,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
private val singletonLeaseName = s"${context.system.name}-singleton-${self.path}" private val singletonLeaseName = s"${context.system.name}-singleton-${self.path}"
override val log = Logging.withMarker(context.system, this) override val log: MarkerLoggingAdapter = Logging.withMarker(context.system, this)
val lease: Option[Lease] = settings.leaseSettings.map( val lease: Option[Lease] = settings.leaseSettings.map(
settings => settings =>
@ -821,7 +822,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
import context.dispatcher import context.dispatcher
if (!preparingForFullShutdown) { if (!preparingForFullShutdown) {
pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover { pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult(_)).recover {
case NonFatal(t) => AcquireLeaseFailure(t) case NonFatal(t) => AcquireLeaseFailure(t)
}).to(self) }).to(self)
} }
@ -1201,7 +1202,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
logInfo("Releasing lease as leaving AcquiringLease going to [{}]", to) logInfo("Releasing lease as leaving AcquiringLease going to [{}]", to)
import context.dispatcher import context.dispatcher
lease.foreach(l => lease.foreach(l =>
pipe(l.release().map[Any](ReleaseLeaseResult).recover { pipe(l.release().map[Any](ReleaseLeaseResult(_)).recover {
case t => ReleaseLeaseFailure(t) case t => ReleaseLeaseFailure(t)
}).to(self)) }).to(self))
case _ => case _ =>
@ -1213,7 +1214,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
lease.foreach { l => lease.foreach { l =>
logInfo("Releasing lease as leaving Oldest") logInfo("Releasing lease as leaving Oldest")
import context.dispatcher import context.dispatcher
pipe(l.release().map(ReleaseLeaseResult)).to(self) pipe(l.release().map(ReleaseLeaseResult(_))).to(self)
} }
} }

View file

@ -182,7 +182,8 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
def awaitCount(expected: Int): Unit = { def awaitCount(expected: Int): Unit = {
awaitAssert { awaitAssert {
DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count
expectMsgType[Int] should ===(expected) val actual = expectMsgType[Int]
actual should ===(expected)
} }
} }
@ -247,7 +248,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
val c = system.actorOf( val c = system.actorOf(
ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)), ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)),
"ask-client") "ask-client")
implicit val timeout = Timeout(remaining) implicit val timeout: Timeout = Timeout(remaining)
val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true) val reply = c ? ClusterClient.Send("/user/testService", "hello-request", localAffinity = true)
Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack") Await.result(reply.mapTo[Reply], remaining).msg should be("hello-request-ack")
system.stop(c) system.stop(c)
@ -303,7 +304,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
"report events" in within(15 seconds) { "report events" in within(15 seconds) {
runOn(client) { runOn(client) {
implicit val timeout = Timeout(1.second.dilated) implicit val timeout: Timeout = Timeout(1.second.dilated)
val client = Await.result(system.actorSelection("/user/client").resolveOne(), timeout.duration) val client = Await.result(system.actorSelection("/user/client").resolveOne(), timeout.duration)
val listener = system.actorOf(Props(classOf[TestClientListener], client), "reporter-client-listener") val listener = system.actorOf(Props(classOf[TestClientListener], client), "reporter-client-listener")
@ -319,7 +320,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
runOn(first, second, third) { runOn(first, second, third) {
// Only run this test on a node that knows about our client. It could be that no node knows // Only run this test on a node that knows about our client. It could be that no node knows
// but there isn't a means of expressing that at least one of the nodes needs to pass the test. // but there isn't a means of expressing that at least one of the nodes needs to pass the test.
implicit val timeout = Timeout(2.seconds.dilated) implicit val timeout: Timeout = Timeout(2.seconds.dilated)
val r = ClusterClientReceptionist(system).underlying val r = ClusterClientReceptionist(system).underlying
r ! GetClusterClients r ! GetClusterClients
val cps = expectMsgType[ClusterClients] val cps = expectMsgType[ClusterClients]

View file

@ -64,7 +64,8 @@ class ClusterClientStopSpec extends MultiNodeSpec(ClusterClientStopSpec) with ST
def awaitCount(expected: Int): Unit = { def awaitCount(expected: Int): Unit = {
awaitAssert { awaitAssert {
DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count DistributedPubSub(system).mediator ! DistributedPubSubMediator.Count
expectMsgType[Int] should ===(expected) val actual = expectMsgType[Int]
actual should ===(expected)
} }
} }

View file

@ -161,14 +161,16 @@ class DistributedPubSubMediatorSpec
def awaitCount(expected: Int): Unit = { def awaitCount(expected: Int): Unit = {
awaitAssert { awaitAssert {
mediator ! Count mediator ! Count
expectMsgType[Int] should ===(expected) val actual = expectMsgType[Int]
actual should ===(expected)
} }
} }
def awaitCountSubscribers(expected: Int, topic: String): Unit = { def awaitCountSubscribers(expected: Int, topic: String): Unit = {
awaitAssert { awaitAssert {
mediator ! CountSubscribers(topic) mediator ! CountSubscribers(topic)
expectMsgType[Int] should ===(expected) val actual = expectMsgType[Int]
actual should ===(expected)
} }
} }

View file

@ -76,7 +76,8 @@ class DistributedPubSubRestartSpec
val probe = TestProbe() val probe = TestProbe()
awaitAssert { awaitAssert {
mediator.tell(Count, probe.ref) mediator.tell(Count, probe.ref)
probe.expectMsgType[Int] should ===(expected) val actual = probe.expectMsgType[Int]
actual should ===(expected)
} }
} }

View file

@ -91,7 +91,7 @@ object AkkaBuild {
private def allWarnings: Boolean = System.getProperty("akka.allwarnings", "false").toBoolean private def allWarnings: Boolean = System.getProperty("akka.allwarnings", "false").toBoolean
final val DefaultScalacOptions = { final val DefaultScalacOptions = {
if (Dependencies.getScalaVersion().startsWith("3.0")) { if (Dependencies.getScalaVersion().startsWith("3.")) {
Seq( Seq(
"-encoding", "-encoding",
"UTF-8", "UTF-8",