Compile typed actors with Scala 3 (#30258)

This commit is contained in:
Lukas Rytz 2021-06-02 16:10:10 +02:00 committed by GitHub
parent 3cafeb2639
commit 41d5f1571b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 104 additions and 54 deletions

View file

@ -38,7 +38,7 @@ jobs:
- stage: scala3
name: scala3
# separate job since only a few modules compile with Scala 3 yet
script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile"
script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile" akka-actor-typed/compile
stages:
- name: whitesource

View file

@ -0,0 +1,5 @@
ProblemFilters.exclude[MissingClassProblem]("akka.actor.typed.internal.BehaviorImpl$StoppedBehavior$")
ProblemFilters.exclude[FinalClassProblem]("akka.actor.typed.internal.BehaviorImpl$StoppedBehavior")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#State.serviceInstanceAdded")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.internal.receptionist.LocalReceptionist#State.serviceInstanceRemoved")

View file

@ -0,0 +1,16 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.receptionist
import akka.actor.typed.ActorRef
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[receptionist] object Platform {
type Service[K <: AbstractServiceKey] = ActorRef[K#Protocol]
type Subscriber[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
}

View file

@ -0,0 +1,16 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.receptionist
import akka.actor.typed.ActorRef
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[receptionist] object Platform {
type Service[K <: AbstractServiceKey] = ActorRef[K#Protocol]
type Subscriber[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
}

View file

@ -0,0 +1,23 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.receptionist
import akka.actor.typed.ActorRef
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[receptionist] object Platform {
type Aux[P] = AbstractServiceKey { type Protocol = P }
type Service[K <: Aux[_]] = K match {
case Aux[t] => ActorRef[t]
}
type Subscriber[K <: Aux[_]] = K match {
case Aux[t] => ActorRef[ReceptionistMessages.Listing[t]]
}
}

View file

@ -55,13 +55,13 @@ object SpawnProtocol {
def apply(): Behavior[Command] =
Behaviors.receive { (ctx, msg) =>
msg match {
case Spawn(bhvr, name, props, replyTo) =>
case Spawn(bhvr: Behavior[t], name, props, replyTo) =>
val ref =
if (name == null || name.equals(""))
ctx.spawnAnonymous(bhvr, props)
else {
@tailrec def spawnWithUniqueName(c: Int): ActorRef[Any] = {
@tailrec def spawnWithUniqueName(c: Int): ActorRef[t] = {
val nameSuggestion = if (c == 0) name else s"$name-$c"
ctx.child(nameSuggestion) match {
case Some(_) => spawnWithUniqueName(c + 1) // already taken, try next

View file

@ -257,7 +257,7 @@ import akka.util.ConstantFun.scalaIdentityFunction
}
}
private class ConsumerControllerImpl[A](
private class ConsumerControllerImpl[A] private (
context: ActorContext[ConsumerControllerImpl.InternalCommand],
retryTimer: ConsumerControllerImpl.RetryTimer,
stashBuffer: StashBuffer[ConsumerControllerImpl.InternalCommand],
@ -289,7 +289,7 @@ private class ConsumerControllerImpl[A](
private def active(s: State[A]): Behavior[InternalCommand] = {
Behaviors
.receiveMessage[InternalCommand] {
case seqMsg: SequencedMessage[A] =>
case seqMsg: SequencedMessage[A @unchecked] =>
val pid = seqMsg.producerId
val seqNr = seqMsg.seqNr
val expectedSeqNr = s.receivedSeqNr + 1
@ -340,13 +340,13 @@ private class ConsumerControllerImpl[A](
case Confirmed =>
receiveUnexpectedConfirmed()
case start: Start[A] =>
case start: Start[A @unchecked] =>
receiveStart(s, start, newState => active(newState))
case ConsumerTerminated(c) =>
receiveConsumerTerminated(c)
case reg: RegisterToProducerController[A] =>
case reg: RegisterToProducerController[A @unchecked] =>
receiveRegisterToProducerController(s, reg, newState => active(newState))
case _: DeliverThenStop[_] =>
@ -426,7 +426,7 @@ private class ConsumerControllerImpl[A](
throw new IllegalStateException("StashBuffer should be cleared before resending.")
Behaviors
.receiveMessage[InternalCommand] {
case seqMsg: SequencedMessage[A] =>
case seqMsg: SequencedMessage[A @unchecked] =>
val seqNr = seqMsg.seqNr
if (s.isProducerChanged(seqMsg)) {
@ -465,13 +465,13 @@ private class ConsumerControllerImpl[A](
case Confirmed =>
receiveUnexpectedConfirmed()
case start: Start[A] =>
case start: Start[A @unchecked] =>
receiveStart(s, start, newState => resending(newState))
case ConsumerTerminated(c) =>
receiveConsumerTerminated(c)
case reg: RegisterToProducerController[A] =>
case reg: RegisterToProducerController[A @unchecked] =>
receiveRegisterToProducerController(s, reg, newState => active(newState))
case _: DeliverThenStop[_] =>
@ -594,7 +594,7 @@ private class ConsumerControllerImpl[A](
scalaIdentityFunction)
}
case msg: SequencedMessage[A] =>
case msg: SequencedMessage[_] =>
flightRecorder.consumerReceivedPreviousInProgress(msg.producerId, msg.seqNr, stashBuffer.size + 1)
val expectedSeqNr = seqMsg.seqNr + stashBuffer.size + 1
if (msg.seqNr < expectedSeqNr && msg.producerController == seqMsg.producerController) {
@ -622,14 +622,14 @@ private class ConsumerControllerImpl[A](
// no retries when waitingForConfirmation, will be performed from (idle) active
Behaviors.same
case start: Start[A] =>
case start: Start[A @unchecked] =>
start.deliverTo ! Delivery(seqMsg.message.asInstanceOf[A], context.self, seqMsg.producerId, seqMsg.seqNr)
receiveStart(s, start, newState => waitingForConfirmation(newState, seqMsg))
case ConsumerTerminated(c) =>
receiveConsumerTerminated(c)
case reg: RegisterToProducerController[A] =>
case reg: RegisterToProducerController[A @unchecked] =>
receiveRegisterToProducerController(s, reg, newState => waitingForConfirmation(newState, seqMsg))
case _: DeliverThenStop[_] =>

View file

@ -696,7 +696,7 @@ private class ProducerControllerImpl[A: ClassTag](
context.self ! ResendFirst
}
// update the send function
val newSend = consumerController ! _
val newSend = consumerController.tell(_)
active(s.copy(firstSeqNr = newFirstSeqNr, send = newSend))
}
@ -796,7 +796,7 @@ private class ProducerControllerImpl[A: ClassTag](
case StoreMessageSentCompleted(sent: MessageSent[_]) =>
receiveStoreMessageSentCompleted(sent.seqNr)
case f: StoreMessageSentFailed[A] =>
case f: StoreMessageSentFailed[A @unchecked] =>
receiveStoreMessageSentFailed(f)
case Request(newConfirmedSeqNr, newRequestedSeqNr, supportResend, viaTimeout) =>
@ -817,7 +817,7 @@ private class ProducerControllerImpl[A: ClassTag](
case ResendFirstUnconfirmed =>
receiveResendFirstUnconfirmed()
case start: Start[A] =>
case start: Start[A @unchecked] =>
receiveStart(start)
case RegisterConsumer(consumerController: ActorRef[ConsumerController.Command[A]] @unchecked) =>

View file

@ -621,22 +621,22 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
else
onMessageBeforeDurableQueue(msg, Some(replyTo))
case m: ResendDurableMsg[A] =>
case m: ResendDurableMsg[A @unchecked] =>
onResendDurableMsg(m)
case StoreMessageSentCompleted(MessageSent(seqNr, m: A, _, _, _)) =>
receiveStoreMessageSentCompleted(seqNr, m)
case f: StoreMessageSentFailed[A] =>
case f: StoreMessageSentFailed[A @unchecked] =>
receiveStoreMessageSentFailed(f)
case ack: Ack =>
receiveAck(ack)
case w: WorkerRequestNext[A] =>
case w: WorkerRequestNext[A @unchecked] =>
receiveWorkerRequestNext(w)
case curr: CurrentWorkers[A] =>
case curr: CurrentWorkers[A @unchecked] =>
receiveCurrentWorkers(curr)
case GetWorkerStats(replyTo) =>
@ -646,7 +646,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
case RegisterConsumerDone =>
Behaviors.same
case start: Start[A] =>
case start: Start[A @unchecked] =>
receiveStart(start)
case AskTimeout(outKey, outSeqNr) =>

View file

@ -227,7 +227,9 @@ import scala.util.Success
createRequest: akka.japi.function.Function[ActorRef[Res], Req],
applyToResponse: akka.japi.function.Function2[Res, Throwable, T]): Unit = {
import akka.actor.typed.javadsl.AskPattern
pipeToSelf(AskPattern.ask(target, (ref) => createRequest(ref), responseTimeout, system.scheduler), applyToResponse)
pipeToSelf[Res](
AskPattern.ask(target, ref => createRequest(ref), responseTimeout, system.scheduler),
applyToResponse)
}
override def askWithStatus[Req, Res](

View file

@ -85,13 +85,14 @@ private[akka] object BehaviorTags {
override def toString: String = s"Failed($cause)"
}
object StoppedBehavior extends StoppedBehavior[Nothing](OptionVal.None)
// used to be `object StoppedBehavior extends ...` https://github.com/lampepfl/dotty/issues/12602
val StoppedBehavior = new StoppedBehavior[Nothing](OptionVal.None)
/**
* When the cell is stopping this behavior is used, so
* that PostStop can be sent to previous behavior from `finishTerminate`.
*/
private[akka] sealed class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit])
private[akka] final class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit])
extends Behavior[T](BehaviorTags.StoppedBehavior) {
def onPostStop(ctx: TypedActorContext[T]): Unit = {

View file

@ -100,7 +100,7 @@ private[akka] final class InterceptorImpl[O, I](
} else {
// returned behavior could be nested in setups, so we need to start before we deduplicate
val duplicateInterceptExists = Behavior.existsInStack(started) {
case i: InterceptorImpl[O, I]
case i: InterceptorImpl[_, _]
if interceptor.isSame(i.interceptor.asInstanceOf[BehaviorInterceptor[Any, Any]]) =>
true
case _ => false
@ -181,7 +181,6 @@ private[akka] final class LogMessagesInterceptor(val opts: LogOptions) extends B
case Level.INFO => logger.info(template, selfPath, messageOrSignal)
case Level.DEBUG => logger.debug(template, selfPath, messageOrSignal)
case Level.TRACE => logger.trace(template, selfPath, messageOrSignal)
case other => throw new IllegalArgumentException(s"Unknown log level [$other].")
}
}
}

View file

@ -25,7 +25,7 @@ import java.util.function.Predicate
* INTERNAL API
*/
@InternalApi private[akka] object StashBufferImpl {
private final class Node[T](var next: Node[T], val message: T) {
private[akka] final class Node[T](var next: Node[T], val message: T) {
def apply(f: T => Unit): Unit = f(message)
}

View file

@ -67,8 +67,8 @@ private abstract class AbstractSupervisor[I, Thr <: Throwable](strategy: Supervi
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = {
other match {
case as: AbstractSupervisor[_, Thr] if throwableClass == as.throwableClass => true
case _ => false
case as: AbstractSupervisor[_, _] if throwableClass == as.throwableClass => true
case _ => false
}
}
@ -95,7 +95,6 @@ private abstract class AbstractSupervisor[I, Thr <: Throwable](strategy: Supervi
case Level.INFO => logger.info(logMessage, unwrapped)
case Level.DEBUG => logger.debug(logMessage, unwrapped)
case Level.TRACE => logger.trace(logMessage, unwrapped)
case other => throw new IllegalArgumentException(s"Unknown log level [$other].")
}
}
}

View file

@ -49,7 +49,8 @@ import akka.annotation.InternalApi
// so we need to look through the stack and eliminate any MCD already existing
def loop(next: Behavior[T]): Behavior[T] = {
next match {
case i: InterceptorImpl[T, T] if i.interceptor.isSame(this.asInstanceOf[BehaviorInterceptor[Any, Any]]) =>
case i: InterceptorImpl[_, T @unchecked]
if i.interceptor.isSame(this.asInstanceOf[BehaviorInterceptor[Any, Any]]) =>
// eliminate that interceptor
loop(i.nestedBehavior)

View file

@ -25,22 +25,10 @@ private[akka] object ActorContextAdapter {
}
def toClassic[U](context: scaladsl.ActorContext[_]): classic.ActorContext =
context match {
case c: TypedActorContext[_] => toClassicImp(c)
case _ =>
throw new UnsupportedOperationException(
"unknown ActorContext type " +
s"($context of class ${context.getClass.getName})")
}
toClassicImp(context)
def toClassic[U](context: javadsl.ActorContext[_]): classic.ActorContext =
context match {
case c: TypedActorContext[_] => toClassicImp(c)
case _ =>
throw new UnsupportedOperationException(
"unknown ActorContext type " +
s"($context of class ${context.getClass.getName})")
}
toClassicImp(context)
}
/**

View file

@ -36,8 +36,8 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
override val name = "localReceptionist"
private type Service[K <: AbstractServiceKey] = ActorRef[K#Protocol]
private type Subscriber[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
private type Service[K <: AbstractServiceKey] = Platform.Service[K]
private type Subscriber[K <: AbstractServiceKey] = Platform.Subscriber[K]
private sealed trait InternalCommand
private final case class RegisteredActorTerminated[T](ref: ActorRef[T]) extends InternalCommand
@ -65,7 +65,7 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
subscriptions: TypedMultiMap[AbstractServiceKey, Subscriber],
subscriptionsPerActor: Map[ActorRef[_], Set[AbstractServiceKey]]) {
def serviceInstanceAdded[Key <: AbstractServiceKey, SI <: Service[Key]](key: Key)(serviceInstance: SI): State = {
def serviceInstanceAdded[Key <: AbstractServiceKey](key: Key)(serviceInstance: ActorRef[key.Protocol]): State = {
val newServices = services.inserted(key)(serviceInstance)
val newServicePerActor =
servicesPerActor.updated(
@ -74,7 +74,7 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
copy(services = newServices, servicesPerActor = newServicePerActor)
}
def serviceInstanceRemoved[Key <: AbstractServiceKey, SI <: Service[Key]](key: Key)(serviceInstance: SI): State = {
def serviceInstanceRemoved[Key <: AbstractServiceKey](key: Key)(serviceInstance: ActorRef[key.Protocol]): State = {
val newServices = services.removed(key)(serviceInstance)
val newServicePerActor =
servicesPerActor.get(serviceInstance) match {
@ -97,7 +97,7 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
if (keys.isEmpty) services
else
keys.foldLeft(services)((acc, key) =>
acc.removed(key.asServiceKey)(serviceInstance.asInstanceOf[Service[AbstractServiceKey]]))
acc.removed(key.asServiceKey)(serviceInstance.asInstanceOf[ActorRef[key.Protocol]]))
val newServicesPerActor = servicesPerActor - serviceInstance
copy(services = newServices, servicesPerActor = newServicesPerActor)
}

View file

@ -108,7 +108,7 @@ object AskPattern {
// We do not currently use the implicit sched, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219
ref match {
case a: InternalRecipientRef[_] => askClassic(a, timeout, replyTo)
case a: InternalRecipientRef[Req] => askClassic[Req, Res](a, timeout, replyTo)
case a =>
throw new IllegalStateException(
"Only expect references to be RecipientRef, ActorRefAdapter or ActorSystemAdapter until " +

View file

@ -5,7 +5,7 @@
package akka.actor.typed
package scaladsl
import scala.reflect.{ classTag, ClassTag }
import scala.reflect.ClassTag
import akka.actor.typed.internal._
import akka.annotation.{ DoNotInherit, InternalApi }
@ -222,8 +222,8 @@ object Behaviors {
final class Supervise[T] private[akka] (val wrapped: Behavior[T]) extends AnyVal {
/** Specify the [[SupervisorStrategy]] to be invoked when the wrapped behavior throws. */
def onFailure[Thr <: Throwable: ClassTag](strategy: SupervisorStrategy): Behavior[T] = {
val tag = classTag[Thr]
def onFailure[Thr <: Throwable](strategy: SupervisorStrategy)(
implicit tag: ClassTag[Thr] = ThrowableClassTag): Behavior[T] = {
val effectiveTag = if (tag == ClassTag.Nothing) ThrowableClassTag else tag
Supervisor(Behavior.validateAsInitial(wrapped), strategy)(effectiveTag)
}