Akka Remote compiling on Scala 3.0 (#30361)

Co-authored-by: Arnout Engelen <arnout@bzzt.net>
This commit is contained in:
Johan Andrén 2021-07-21 20:24:33 +02:00 committed by GitHub
parent 083303266e
commit 70ba0a1af0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 62 additions and 56 deletions

View file

@ -38,7 +38,7 @@ jobs:
- stage: scala3
name: scala3
# separate job since only a few modules compile with Scala 3 yet
script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile" akka-actor-testkit-typed/compile akka-actor-typed/compile akka-discovery/test akka-pki/test:compile akka-protobuf/test:compile akka-protobuf-v3/test:compile akka-slf4j/test:compile akka-stream/compile akka-stream-tests-tck/test akka-coordination/test akka-serialization-jackson/test:compile akka-testkit/test akka-stream-testkit/test
script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile" akka-actor-testkit-typed/compile akka-actor-typed/compile akka-discovery/test akka-pki/test:compile akka-protobuf/test:compile akka-protobuf-v3/test:compile akka-slf4j/test:compile akka-stream/compile akka-stream-tests-tck/test akka-coordination/test akka-serialization-jackson/test:compile akka-testkit/test akka-stream-testkit/test akka-remote/compile
stages:
- name: whitesource

View file

@ -24,7 +24,7 @@ import akka.annotation.InternalApi
* information with an address, then this must be done externally.
*/
@SerialVersionUID(1L)
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
final case class Address private[akka] (protocol: String, system: String, host: Option[String], port: Option[Int]) {
// Please note that local/non-local distinction must be preserved:
// host.isDefined == hasGlobalScope
// host.isEmpty == hasLocalScope

View file

@ -0,0 +1,3 @@
# deprecated methods since 2.4 removed (clashing with generated on Scala 3)
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.QuarantinedEvent.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.QuarantinedEvent.copy$default$2")

View file

@ -289,7 +289,7 @@ private[akka] class RemoteActorRefProvider(
remoting: String,
libraryMissing: String,
link: String): Unit = {
system.dynamicAccess.getClassFor(className) match {
system.dynamicAccess.getClassFor[Any](className) match {
case Failure(_: ClassNotFoundException | _: NoClassDefFoundError) =>
throw new IllegalStateException(
s"$remoting remoting is enabled but $libraryMissing is not on the classpath, it must be added explicitly. See $link")

View file

@ -67,7 +67,7 @@ private[akka] class RemoteMetricsOn(system: ExtendedActorSystem) extends RemoteM
private val logFrameSizeExceeding: Int =
RARP(system).provider.remoteSettings.LogFrameSizeExceeding.getOrElse(Int.MaxValue)
private val log = Logging(system, this.getClass)
private val log = Logging(system, classOf[RemoteMetrics])
private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap
override def logPayloadBytes(msg: Any, payloadBytes: Int): Unit =

View file

@ -157,7 +157,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
override def localAddressForRemote(remote: Address): Address =
Remoting.localAddressForRemote(transportMapping, remote)
val log: LoggingAdapter = Logging(system.eventStream, getClass)
val log: LoggingAdapter = Logging(system.eventStream, classOf[Remoting])
val eventPublisher = new EventPublisher(system, log, RemoteLifecycleEventsLogLevel)
private def notifyError(msg: String, cause: Throwable): Unit =
@ -540,7 +540,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
case d: FiniteDuration =>
endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + d)
eventPublisher.notifyListeners(QuarantinedEvent(remoteAddress, uid.toLong))
case _ => // disabled
case null => // disabled
}
Stop
@ -651,7 +651,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
case ManagementCommand(cmd) =>
val allStatuses: immutable.Seq[Future[Boolean]] =
transportMapping.values.iterator.map(transport => transport.managementCommand(cmd)).to(immutable.IndexedSeq)
akka.compat.Future.fold(allStatuses)(true)(_ && _).map(ManagementCommandAck).pipeTo(sender())
akka.compat.Future.fold(allStatuses)(true)(_ && _).map(ManagementCommandAck.apply).pipeTo(sender())
case Quarantine(address, uidToQuarantineOption) =>
// Stop writers

View file

@ -115,9 +115,6 @@ final case class QuarantinedEvent(address: Address, longUid: Long) extends Remot
@deprecated("Use long uid", "2.4.x")
def uid: Int = longUid.toInt
@nowarn("msg=deprecated")
@deprecated("Use long uid copy method", "2.4.x")
def copy(address: Address = address, uid: Int = uid) = new QuarantinedEvent(address, uid.toLong)
}
/**

View file

@ -117,7 +117,7 @@ private[remote] object AssociationState {
/**
* INTERNAL API
*/
private[remote] final class AssociationState(
private[remote] final class AssociationState private (
val incarnation: Int,
val lastUsedTimestamp: AtomicLong, // System.nanoTime timestamp
val controlIdleKillSwitch: OptionVal[SharedKillSwitch],
@ -277,7 +277,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
@volatile private[this] var controlSubject: ControlMessageSubject = _
@volatile private[this] var messageDispatcher: MessageDispatcher = _
override val log: MarkerLoggingAdapter = Logging.withMarker(system, getClass)
override val log: MarkerLoggingAdapter = Logging.withMarker(system, classOf[ArteryTransport])
val flightRecorder: RemotingFlightRecorder = RemotingFlightRecorder(system)
log.debug("Using flight recorder {}", flightRecorder)

View file

@ -146,7 +146,7 @@ private[remote] class Association(
require(remoteAddress.port.nonEmpty)
private val log = Logging.withMarker(transport.system, getClass)
private val log = Logging.withMarker(transport.system, classOf[Association])
private def flightRecorder = transport.flightRecorder
override def settings = transport.settings

View file

@ -118,7 +118,7 @@ private[akka] class ImmutableLongMap[A >: Null] private (private val keys: Array
}
override def equals(obj: Any): Boolean = obj match {
case other: ImmutableLongMap[A] =>
case other: ImmutableLongMap[_] =>
if (other eq this) true
else if (size != other.size) false
else if (size == 0 && other.size == 0) true

View file

@ -99,7 +99,7 @@ abstract class RemoteInstrument {
.settings
private val logFrameSizeExceeding = settings.LogFrameSizeExceeding.get
private val log = Logging(system, this.getClass)
private val log = Logging(system, classOf[LoggingRemoteInstrument])
private val maxPayloadBytes: ConcurrentHashMap[Class[_], Integer] = new ConcurrentHashMap

View file

@ -116,7 +116,7 @@ private[akka] object TaskRunner {
private[akka] class TaskRunner(system: ExtendedActorSystem, val idleCpuLevel: Int) extends Runnable {
import TaskRunner._
private val log = Logging(system, getClass)
private val log = Logging(system, classOf[TaskRunner])
private[this] var running = false
private[this] val cmdQueue = new CommandQueue
private[this] val tasks = new ArrayBag[Task]

View file

@ -35,7 +35,7 @@ private[remote] object DecompressionTable {
val DisabledVersion: Byte = -1
private[this] val _empty = DecompressionTable(0, 0, Array.empty)
private[this] val _empty = DecompressionTable(0, 0, Array.empty[Any])
def empty[T] = _empty.asInstanceOf[DecompressionTable[T]]
def disabled[T] = empty[T].copy(version = DisabledVersion)
}

View file

@ -116,7 +116,6 @@ private[remote] class ArteryTcpTransport(
outboundContext: OutboundContext,
streamId: Int,
bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = {
implicit val sys: ActorSystem = system
val host = outboundContext.remoteAddress.host.get
val port = outboundContext.remoteAddress.port.get
@ -129,7 +128,7 @@ private[remote] class ArteryTcpTransport(
}
if (tlsEnabled) {
val sslProvider = sslEngineProvider.get
Tcp().outgoingConnectionWithTls(
Tcp(system).outgoingConnectionWithTls(
remoteAddress,
createSSLEngine = () => sslProvider.createClientSSLEngine(host, port),
localAddress,
@ -139,7 +138,7 @@ private[remote] class ArteryTcpTransport(
verifySession = session => optionToTry(sslProvider.verifyClientSession(host, session)),
closing = IgnoreComplete)
} else {
Tcp().outgoingConnection(
Tcp(system).outgoingConnection(
remoteAddress,
localAddress,
halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false
@ -244,7 +243,7 @@ private[remote] class ArteryTcpTransport(
val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] =
if (tlsEnabled) {
val sslProvider = sslEngineProvider.get
Tcp().bindWithTls(
Tcp(system).bindWithTls(
interface = bindHost,
port = bindPort,
createSSLEngine = () => sslProvider.createServerSSLEngine(bindHost, bindPort),
@ -254,7 +253,7 @@ private[remote] class ArteryTcpTransport(
verifySession = session => optionToTry(sslProvider.verifyServerSession(bindHost, session)),
closing = IgnoreComplete)
} else {
Tcp().bind(interface = bindHost, port = bindPort, halfClose = false)
Tcp(system).bind(interface = bindHost, port = bindPort, halfClose = false)
}
val binding = serverBinding match {

View file

@ -26,7 +26,7 @@ private[ssl] trait SessionVerifier {
* INTERNAL API
*/
@InternalApi
private[ssl] final object NoopSessionVerifier extends SessionVerifier {
private[ssl] object NoopSessionVerifier extends SessionVerifier {
override def verifyClientSession(hostname: String, session: SSLSession): Option[Throwable] = None
override def verifyServerSession(hostname: String, session: SSLSession): Option[Throwable] = None
}

View file

@ -97,25 +97,27 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
manifest match { // most frequent ones first (could be made a HashMap in the future)
case SystemMessageEnvelopeManifest => deserializeSystemMessageEnvelope(bytes)
case SystemMessageDeliveryAckManifest => deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Ack)
case HandshakeReqManifest => deserializeHandshakeReq(bytes, HandshakeReq)
case HandshakeRspManifest => deserializeWithFromAddress(bytes, HandshakeRsp)
case SystemMessageDeliveryNackManifest => deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Nack)
case SystemMessageEnvelopeManifest => deserializeSystemMessageEnvelope(bytes)
case SystemMessageDeliveryAckManifest =>
deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Ack.apply)
case HandshakeReqManifest => deserializeHandshakeReq(bytes, HandshakeReq.apply)
case HandshakeRspManifest => deserializeWithFromAddress(bytes, HandshakeRsp.apply)
case SystemMessageDeliveryNackManifest =>
deserializeSystemMessageDeliveryAck(bytes, SystemMessageDelivery.Nack.apply)
case QuarantinedManifest => deserializeQuarantined(ArteryControlFormats.Quarantined.parseFrom(bytes))
case FlushManifest => Flush
case FlushAckManifest => FlushAck
case ActorSystemTerminatingManifest => deserializeWithFromAddress(bytes, ActorSystemTerminating)
case ActorSystemTerminatingAckManifest => deserializeWithFromAddress(bytes, ActorSystemTerminatingAck)
case ActorSystemTerminatingManifest => deserializeWithFromAddress(bytes, ActorSystemTerminating.apply)
case ActorSystemTerminatingAckManifest => deserializeWithFromAddress(bytes, ActorSystemTerminatingAck.apply)
case ActorRefCompressionAdvertisementManifest => deserializeActorRefCompressionAdvertisement(bytes)
case ActorRefCompressionAdvertisementAckManifest =>
deserializeCompressionTableAdvertisementAck(bytes, ActorRefCompressionAdvertisementAck)
deserializeCompressionTableAdvertisementAck(bytes, ActorRefCompressionAdvertisementAck.apply)
case ClassManifestCompressionAdvertisementManifest =>
deserializeCompressionAdvertisement(bytes, identity, ClassManifestCompressionAdvertisement)
deserializeCompressionAdvertisement(bytes, identity, ClassManifestCompressionAdvertisement.apply)
case ClassManifestCompressionAdvertisementAckManifest =>
deserializeCompressionTableAdvertisementAck(bytes, ClassManifestCompressionAdvertisementAck)
deserializeCompressionTableAdvertisementAck(bytes, ClassManifestCompressionAdvertisementAck.apply)
case ArteryHeartbeatManifest => RemoteWatcher.ArteryHeartbeat
case ArteryHeartbeatRspManifest => deserializeArteryHeartbeatRsp(bytes, ArteryHeartbeatRsp)
case ArteryHeartbeatRspManifest => deserializeArteryHeartbeatRsp(bytes, ArteryHeartbeatRsp.apply)
case _ =>
throw new NotSerializableException(
s"Manifest '$manifest' not defined for ArteryControlMessageSerializer (serializer id $identifier)")
@ -146,7 +148,7 @@ private[akka] final class ArteryMessageSerializer(val system: ExtendedActorSyste
serializeCompressionAdvertisement(adv)(serializeActorRef)
def deserializeActorRefCompressionAdvertisement(bytes: Array[Byte]): ActorRefCompressionAdvertisement =
deserializeCompressionAdvertisement(bytes, deserializeActorRef, ActorRefCompressionAdvertisement)
deserializeCompressionAdvertisement(bytes, deserializeActorRef, ActorRefCompressionAdvertisement.apply)
def serializeCompressionAdvertisement[T](adv: CompressionAdvertisement[T])(
keySerializer: T => String): ArteryControlFormats.CompressionTableAdvertisement = {

View file

@ -5,7 +5,6 @@
package akka.remote.serialization
import scala.collection.immutable
import scala.reflect.ClassTag
import com.typesafe.config.{ Config, ConfigFactory }
import util.{ Failure, Success }
@ -234,14 +233,15 @@ private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSys
private def oldDeserialize(data: ByteString, className: String): AnyRef =
if (data.isEmpty && className == "null") null
else oldDeserialize(data, system.dynamicAccess.getClassFor[AnyRef](className).get)
else
oldDeserialize[AnyRef](data, system.dynamicAccess.getClassFor[AnyRef](className).get.asInstanceOf[Class[AnyRef]])
private def oldDeserialize[T: ClassTag](data: ByteString, clazz: Class[T]): T = {
private def oldDeserialize[T](data: ByteString, clazz: Class[T]): T = {
val bytes = data.toByteArray
serialization.deserialize(bytes, clazz) match {
case Success(x: T) => x
case Success(other) =>
throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, other))
case Success(x) =>
if (clazz.isInstance(x)) x
else throw new IllegalArgumentException("Can't deserialize to [%s], got [%s]".format(clazz.getName, x))
case Failure(e) =>
// Fallback to the java serializer, because some interfaces don't implement java.io.Serializable,
// but the impl instance does. This could be optimized by adding java serializers in reference.conf:
@ -249,8 +249,10 @@ private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSys
// akka.routing.RouterConfig
// akka.actor.Scope
serialization.deserialize(bytes, classOf[java.io.Serializable]) match {
case Success(x: T) => x
case _ => throw e // the first exception
case Success(x) =>
if (clazz.isInstance(x)) x.asInstanceOf[T]
else throw e
case _ => throw e // the first exception
}
}
}

View file

@ -55,7 +55,7 @@ class ProtobufSerializer(val system: ExtendedActorSystem) extends BaseSerializer
// This must lazy otherwise it will deadlock the ActorSystem creation
private lazy val serialization = SerializationExtension(system)
private val log = Logging.withMarker(system, getClass)
private val log = Logging.withMarker(system, classOf[ProtobufSerializer])
override def includeManifest: Boolean = true

View file

@ -21,7 +21,7 @@ private[akka] class ThrowableSupport(system: ExtendedActorSystem) {
private lazy val serialization = SerializationExtension(system)
private val payloadSupport = new WrappedPayloadSupport(system)
private val log = Logging(system, getClass)
private val log = Logging(system, classOf[ThrowableSupport])
def serializeThrowable(t: Throwable): Array[Byte] = {
toProtobufThrowable(t).build().toByteArray

View file

@ -18,7 +18,7 @@ import akka.serialization.DisabledJavaSerializer
private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) {
private lazy val serialization = SerializationExtension(system)
private val log = Logging(system, getClass)
private val log = Logging(system, classOf[WrappedPayloadSupport])
/**
* Serialize the `input` along with its `manifest` and `serializerId`.

View file

@ -69,7 +69,7 @@ private[remote] class FailureInjectorTransportAdapter(
with AssociationEventListener {
private def rng = ThreadLocalRandom.current()
private val log = Logging(extendedSystem, getClass)
private val log = Logging(extendedSystem, classOf[FailureInjectorTransportAdapter])
private val shouldDebugLog: Boolean = extendedSystem.settings.config.getBoolean("akka.remote.classic.gremlin.debug")
@volatile private var upstreamListener: Option[AssociationEventListener] = None

View file

@ -10,7 +10,6 @@ import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import TestTransport._
import com.typesafe.config.Config
import akka.actor._
@ -29,11 +28,13 @@ import akka.util.ByteString
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
class TestTransport(
val localAddress: Address,
final val registry: AssociationRegistry,
final val registry: TestTransport.AssociationRegistry,
val maximumPayloadBytes: Int = 32000,
val schemeIdentifier: String = "test")
extends Transport {
import TestTransport._
def this(system: ExtendedActorSystem, conf: Config) = {
this(
AddressFromURIString(conf.getString("local-address")),
@ -445,6 +446,7 @@ object TestTransport {
*/
@deprecated("Classic remoting is deprecated, use Artery", "2.6.0")
object AssociationRegistry {
import TestTransport._
private final val registries = scala.collection.mutable.Map[String, AssociationRegistry]()
def get(key: String): AssociationRegistry = this.synchronized {

View file

@ -474,7 +474,7 @@ private[transport] class ThrottledAssociation(
when(WaitOrigin) {
case Event(InboundPayload(p), ExposedHandle(exposedHandle)) =>
throttledMessages = throttledMessages.enqueue(p)
throttledMessages = throttledMessages :+ p
peekOrigin(p) match {
case Some(origin) =>
manager ! Checkin(origin, exposedHandle)
@ -485,7 +485,7 @@ private[transport] class ThrottledAssociation(
when(WaitMode) {
case Event(InboundPayload(p), _) =>
throttledMessages = throttledMessages.enqueue(p)
throttledMessages = throttledMessages :+ p
stay()
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle)) =>
inboundThrottleMode = mode
@ -502,7 +502,7 @@ private[transport] class ThrottledAssociation(
when(WaitUpstreamListener) {
case Event(InboundPayload(p), _) =>
throttledMessages = throttledMessages.enqueue(p)
throttledMessages = throttledMessages :+ p
stay()
case Event(Listener(listener), _) =>
upstreamListener = listener
@ -517,7 +517,7 @@ private[transport] class ThrottledAssociation(
self ! Dequeue
goto(Throttling)
case Event(InboundPayload(p), _) =>
throttledMessages = throttledMessages.enqueue(p)
throttledMessages = throttledMessages :+ p
stay()
}
@ -585,11 +585,11 @@ private[transport] class ThrottledAssociation(
inboundThrottleMode = newbucket
upstreamListener.notify(InboundPayload(payload))
} else {
throttledMessages = throttledMessages.enqueue(payload)
throttledMessages = throttledMessages :+ payload
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens))
}
} else {
throttledMessages = throttledMessages.enqueue(payload)
throttledMessages = throttledMessages :+ payload
}
}
}

View file

@ -360,7 +360,7 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
@volatile private var boundTo: Address = _
@volatile private var serverChannel: Channel = _
private val log = Logging.withMarker(system, this.getClass)
private val log = Logging.withMarker(system, classOf[NettyTransport])
/**
* INTERNAL API

View file

@ -399,6 +399,7 @@ lazy val remote =
.settings(OSGi.remote)
.settings(Protobuf.settings)
.settings(Test / parallelExecution := false)
.settings(serialversionRemoverPluginSettings)
.enablePlugins(Jdk9)
lazy val remoteTests = akkaModule("akka-remote-tests")