From f12d72effc995f31c6f82ea72cf0b50de0e11fd0 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 4 Oct 2021 12:23:35 +0200 Subject: [PATCH] Compile akka-cluster-sharding-typed with scala3 (#30738) * Compile akka-cluster-sharding-typed with scala3 Refs #30243 * Remove unused variable --- .github/workflows/scala3-build.yml | 2 +- .github/workflows/scala3-compile.yml | 2 +- .../typed/ClusterShardingSettings.scala | 4 +-- .../typed/ShardingMessageExtractor.scala | 8 ++--- .../ShardingConsumerControllerImpl.scala | 4 +-- .../ShardingProducerControllerImpl.scala | 8 ++--- .../akka/persistence/typed/crdt/ORSet.scala | 32 +++++++++---------- .../typed/internal/ReplayingEvents.scala | 18 +++++------ .../typed/internal/ReplayingSnapshot.scala | 12 +++---- .../persistence/typed/internal/Running.scala | 5 +-- .../typed/state/internal/Recovering.scala | 8 ++--- .../typed/state/internal/Running.scala | 5 +-- .../PersistentFsmToTypedMigrationSpec.scala | 5 +-- 13 files changed, 52 insertions(+), 61 deletions(-) diff --git a/.github/workflows/scala3-build.yml b/.github/workflows/scala3-build.yml index c689dcc2eb..94c77bec7e 100644 --- a/.github/workflows/scala3-build.yml +++ b/.github/workflows/scala3-build.yml @@ -21,7 +21,7 @@ jobs: - akka-testkit/test akka-actor-tests/test - akka-actor-testkit-typed/test akka-actor-typed-tests/test - akka-bench-jmh/test - - akka-cluster/Test/compile akka-cluster-tools/test akka-cluster-typed/test akka-distributed-data/test akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile + - akka-cluster/Test/compile akka-cluster-tools/test akka-cluster-typed/test akka-distributed-data/test akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile akka-cluster-sharding-typed/compile - akka-discovery/test akka-coordination/test - akka-persistence/test akka-persistence-shared/test akka-persistence-query/test - akka-pki/test akka-slf4j/test diff --git a/.github/workflows/scala3-compile.yml b/.github/workflows/scala3-compile.yml index b082601b61..01557712db 100644 --- a/.github/workflows/scala3-compile.yml +++ b/.github/workflows/scala3-compile.yml @@ -19,7 +19,7 @@ jobs: - akka-testkit/Test/compile akka-actor-tests/Test/compile - akka-actor-testkit-typed/Test/compile akka-actor-typed-tests/Test/compile - akka-bench-jmh/Test/compile - - akka-cluster/Test/compile akka-cluster-tools/Test/compile akka-cluster-typed/Test/compile akka-distributed-data/Test/compile akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile + - akka-cluster/Test/compile akka-cluster-tools/Test/compile akka-cluster-typed/Test/compile akka-distributed-data/Test/compile akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile akka-cluster-sharding-typed/compile - akka-discovery/Test/compile akka-coordination/Test/compile - akka-persistence/Test/compile akka-persistence-shared/Test/compile akka-persistence-query/Test/compile - akka-pki/Test/compile akka-slf4j/Test/compile diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index 51a7bcf905..ab025af028 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -156,10 +156,10 @@ object ClusterShardingSettings { throw new IllegalArgumentException( s"Not recognized RememberEntitiesStore, only '${RememberEntitiesStoreModeDData.name}' and '${RememberEntitiesStoreModeEventSourced.name}' are supported.") } - final case object RememberEntitiesStoreModeEventSourced extends RememberEntitiesStoreMode { + case object RememberEntitiesStoreModeEventSourced extends RememberEntitiesStoreMode { override def name = "eventsourced" } - final case object RememberEntitiesStoreModeDData extends RememberEntitiesStoreMode { override def name = "ddata" } + case object RememberEntitiesStoreModeDData extends RememberEntitiesStoreMode { override def name = "ddata" } // generated using kaze-class final class TuningParameters private ( diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala index ec59c3a35d..6adda657a0 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardingMessageExtractor.scala @@ -73,9 +73,9 @@ abstract class ShardingMessageExtractor[E, M] { final class HashCodeMessageExtractor[M](val numberOfShards: Int) extends ShardingMessageExtractor[ShardingEnvelope[M], M] { - import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId - override def shardId(entityId: String): String = HashCodeMessageExtractor.shardId(entityId, numberOfShards) + override def shardId(entityId: String): String = + akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor.shardId(entityId, numberOfShards) override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message } @@ -89,8 +89,8 @@ final class HashCodeMessageExtractor[M](val numberOfShards: Int) */ abstract class HashCodeNoEnvelopeMessageExtractor[M](val numberOfShards: Int) extends ShardingMessageExtractor[M, M] { - import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor - override def shardId(entityId: String): String = HashCodeMessageExtractor.shardId(entityId, numberOfShards) + override def shardId(entityId: String): String = + akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor.shardId(entityId, numberOfShards) override final def unwrapMessage(message: M): M = message override def toString = s"HashCodeNoEnvelopeMessageExtractor($numberOfShards)" diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala index e35f308d1e..c9c75f663b 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingConsumerControllerImpl.scala @@ -40,7 +40,7 @@ import akka.cluster.sharding.typed.delivery.ShardingConsumerController Behaviors.withStash(settings.bufferSize) { stashBuffer => Behaviors .receiveMessage[ConsumerController.Command[A]] { - case start: ConsumerController.Start[A] => + case start: ConsumerController.Start[A @unchecked] => ConsumerControllerImpl.enforceLocalConsumer(start.deliverTo) context.unwatch(consumer) context.watch(start.deliverTo) @@ -74,7 +74,7 @@ private class ShardingConsumerControllerImpl[A]( Behaviors .receiveMessagePartial[ConsumerController.Command[A]] { - case seqMsg: ConsumerController.SequencedMessage[A] => + case seqMsg: ConsumerController.SequencedMessage[A @unchecked] => def updatedProducerControllers(): Map[ActorRef[ProducerControllerImpl.InternalCommand], String] = { producerControllers.get(seqMsg.producerController) match { case Some(_) => diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala index 0773346684..e682ccea7e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala @@ -509,7 +509,7 @@ private class ShardingProducerControllerImpl[A: ClassTag]( Behaviors.receiveMessage { - case msg: Msg[A] => + case msg: Msg[A @unchecked] => if (durableQueue.isEmpty) { // currentSeqNr is only updated when durableQueue is enabled onMessage(msg.envelope.entityId, msg.envelope.message, None, s.currentSeqNr, s.replyAfterStore) @@ -537,13 +537,13 @@ private class ShardingProducerControllerImpl[A: ClassTag]( case StoreMessageSentCompleted(MessageSent(seqNr, msg: A, _, entityId, _)) => receiveStoreMessageSentCompleted(seqNr, msg, entityId) - case f: StoreMessageSentFailed[A] => + case f: StoreMessageSentFailed[A @unchecked] => receiveStoreMessageSentFailed(f) case ack: Ack => receiveAck(ack) - case w: WrappedRequestNext[A] => + case w: WrappedRequestNext[A @unchecked] => receiveWrappedRequestNext(w) case ResendFirstUnconfirmed => @@ -552,7 +552,7 @@ private class ShardingProducerControllerImpl[A: ClassTag]( case CleanupUnused => receiveCleanupUnused() - case start: Start[A] => + case start: Start[A @unchecked] => receiveStart(start) case AskTimeout(outKey, outSeqNr) => diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala index 16acace57f..7c368fa2d2 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/crdt/ORSet.scala @@ -53,8 +53,8 @@ object ORSet { underlying.originReplica, concatElementsMap(u.elementsMap.asInstanceOf[Map[A, Dot]]), underlying.vvector.merge(u.vvector))) - case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that)) - case DeltaGroup(ops) => DeltaGroup(this +: ops) + case _: AtomicDeltaOp[A @unchecked] => DeltaGroup(Vector(this, that)) + case DeltaGroup(ops) => DeltaGroup(this +: ops) } private def concatElementsMap(thatMap: Map[A, Dot]): Map[A, Dot] = { @@ -72,16 +72,16 @@ object ORSet { throw new IllegalArgumentException(s"RemoveDeltaOp should contain one removed element, but was $underlying") override def merge(that: DeltaOp): DeltaOp = that match { - case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that)) // keep it simple for removals - case DeltaGroup(ops) => DeltaGroup(this +: ops) + case _: AtomicDeltaOp[A @unchecked] => DeltaGroup(Vector(this, that)) // keep it simple for removals + case DeltaGroup(ops) => DeltaGroup(this +: ops) } } /** INTERNAL API: Used for `clear` but could be used for other cases also */ @InternalApi private[akka] final case class FullStateDeltaOp[A](underlying: ORSet[A]) extends AtomicDeltaOp[A] { override def merge(that: DeltaOp): DeltaOp = that match { - case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that)) - case DeltaGroup(ops) => DeltaGroup(this +: ops) + case _: AtomicDeltaOp[A @unchecked] => DeltaGroup(Vector(this, that)) + case DeltaGroup(ops) => DeltaGroup(this +: ops) } } @@ -90,11 +90,11 @@ object ORSet { */ @InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp { override def merge(that: DeltaOp): DeltaOp = that match { - case thatAdd: AddDeltaOp[A] => + case thatAdd: AddDeltaOp[A @unchecked] => // merge AddDeltaOp into last AddDeltaOp in the group, if possible ops.last match { - case thisAdd: AddDeltaOp[A] => DeltaGroup(ops.dropRight(1) :+ thisAdd.merge(thatAdd)) - case _ => DeltaGroup(ops :+ thatAdd) + case thisAdd: AddDeltaOp[A @unchecked] => DeltaGroup(ops.dropRight(1) :+ thisAdd.merge(thatAdd)) + case _ => DeltaGroup(ops :+ thatAdd) } case DeltaGroup(thatOps) => DeltaGroup(ops ++ thatOps) case _ => DeltaGroup(ops :+ that) @@ -434,15 +434,15 @@ final class ORSet[A] private[akka] ( override def applyOperation(thatDelta: ORSet.DeltaOp): ORSet[A] = { thatDelta match { - case d: ORSet.AddDeltaOp[A] => merge(d.underlying, addDeltaOp = true) - case d: ORSet.RemoveDeltaOp[A] => mergeRemoveDelta(d) - case d: ORSet.FullStateDeltaOp[A] => merge(d.underlying, addDeltaOp = false) + case d: ORSet.AddDeltaOp[A @unchecked] => merge(d.underlying, addDeltaOp = true) + case d: ORSet.RemoveDeltaOp[A @unchecked] => mergeRemoveDelta(d) + case d: ORSet.FullStateDeltaOp[A @unchecked] => merge(d.underlying, addDeltaOp = false) case ORSet.DeltaGroup(ops) => ops.foldLeft(this) { - case (acc, op: ORSet.AddDeltaOp[A]) => acc.merge(op.underlying, addDeltaOp = true) - case (acc, op: ORSet.RemoveDeltaOp[A]) => acc.mergeRemoveDelta(op) - case (acc, op: ORSet.FullStateDeltaOp[A]) => acc.merge(op.underlying, addDeltaOp = false) - case (_, _: ORSet.DeltaGroup[A]) => + case (acc, op: ORSet.AddDeltaOp[A @unchecked]) => acc.merge(op.underlying, addDeltaOp = true) + case (acc, op: ORSet.RemoveDeltaOp[A @unchecked]) => acc.mergeRemoveDelta(op) + case (acc, op: ORSet.FullStateDeltaOp[A @unchecked]) => acc.merge(op.underlying, addDeltaOp = false) + case (_, _: ORSet.DeltaGroup[A @unchecked]) => throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested") } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index f3f7da819f..4b24632bde 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -95,15 +95,15 @@ private[akka] final class ReplayingEvents[C, E, S]( override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = { msg match { - case JournalResponse(r) => onJournalResponse(r) - case SnapshotterResponse(r) => onSnapshotterResponse(r) - case RecoveryTickEvent(snap) => onRecoveryTick(snap) - case evt: ReplicatedEventEnvelope[E] => onInternalCommand(evt) - case pe: PublishedEventImpl => onInternalCommand(pe) - case cmd: IncomingCommand[C] => onInternalCommand(cmd) - case get: GetState[S @unchecked] => stashInternal(get) - case get: GetSeenSequenceNr => stashInternal(get) - case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit + case JournalResponse(r) => onJournalResponse(r) + case SnapshotterResponse(r) => onSnapshotterResponse(r) + case RecoveryTickEvent(snap) => onRecoveryTick(snap) + case evt: ReplicatedEventEnvelope[E @unchecked] => onInternalCommand(evt) + case pe: PublishedEventImpl => onInternalCommand(pe) + case cmd: IncomingCommand[C @unchecked] => onInternalCommand(cmd) + case get: GetState[S @unchecked] => stashInternal(get) + case get: GetSeenSequenceNr => stashInternal(get) + case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala index 9e8c240a2f..990aab253a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingSnapshot.scala @@ -58,12 +58,12 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { Behaviors .receiveMessage[InternalProtocol] { - case SnapshotterResponse(r) => onSnapshotterResponse(r, receivedPoisonPill) - case JournalResponse(r) => onJournalResponse(r) - case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot) - case evt: ReplicatedEventEnvelope[E] => onReplicatedEvent(evt) - case pe: PublishedEventImpl => onPublishedEvent(pe) - case cmd: IncomingCommand[C] => + case SnapshotterResponse(r) => onSnapshotterResponse(r, receivedPoisonPill) + case JournalResponse(r) => onJournalResponse(r) + case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot) + case evt: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(evt) + case pe: PublishedEventImpl => onPublishedEvent(pe) + case cmd: IncomingCommand[C @unchecked] => if (receivedPoisonPill) { if (setup.settings.logOnStashing) setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala index 685ce2e309..cc35a03725 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/Running.scala @@ -126,7 +126,7 @@ private[akka] object Running { val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId) val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId) - implicit val timeout = Timeout(30.seconds) + implicit val timeout: Timeout = 30.seconds implicit val scheduler = setup.context.system.scheduler implicit val ec = setup.context.system.executionContext @@ -907,9 +907,6 @@ private[akka] object Running { case callback: Callback[_] => callback.sideEffect(state.state) behavior - - case _ => - throw new IllegalArgumentException(s"Unsupported side effect detected [${effect.getClass.getName}]") } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala index 5b9d645c12..e2f5e42298 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Recovering.scala @@ -72,10 +72,10 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S]) def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = { Behaviors .receiveMessage[InternalProtocol] { - case success: GetSuccess[S] => onGetSuccess(success.result, receivedPoisonPill) - case GetFailure(exc) => onGetFailure(exc) - case RecoveryTimeout => onRecoveryTimeout() - case cmd: IncomingCommand[C] => + case success: GetSuccess[S @unchecked] => onGetSuccess(success.result, receivedPoisonPill) + case GetFailure(exc) => onGetFailure(exc) + case RecoveryTimeout => onRecoveryTimeout() + case cmd: IncomingCommand[C @unchecked] => if (receivedPoisonPill) { if (setup.settings.logOnStashing) setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala index 60f314caef..f4f77a3225 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/state/internal/Running.scala @@ -131,7 +131,7 @@ private[akka] object Running { sideEffects.size) effect match { - case CompositeEffect(eff, currentSideEffects) => + case CompositeEffect(eff, currentSideEffects: Seq[SideEffect[S @unchecked]]) => // unwrap and accumulate effects applyEffects(msg, state, eff, currentSideEffects ++ sideEffects) @@ -274,9 +274,6 @@ private[akka] object Running { case callback: Callback[_] => callback.sideEffect(state.state) behavior - - case _ => - throw new IllegalArgumentException(s"Unsupported side effect detected [${effect.getClass.getName}]") } } diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/PersistentFsmToTypedMigrationSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/PersistentFsmToTypedMigrationSpec.scala index bd9c042397..c85bf5af33 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/PersistentFsmToTypedMigrationSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/PersistentFsmToTypedMigrationSpec.scala @@ -131,8 +131,6 @@ object ShoppingCartBehavior { Effect.none case Timeout => Effect.persist(CustomerInactive) - case _ => - Effect.none } case Inactive(_) => command match { @@ -164,13 +162,12 @@ object ShoppingCartBehavior { case ItemAdded(item) => Shopping(cart.addItem(item)) case _ => la } - case s @ Shopping(cart) => + case Shopping(cart) => event match { case ItemAdded(item) => Shopping(cart.addItem(item)) case OrderExecuted => Paid(cart) case OrderDiscarded => state // will be stopped case CustomerInactive => Inactive(cart) - case _ => s } case i @ Inactive(cart) => event match {