Compile akka-cluster-sharding-typed with scala3 (#30738)
* Compile akka-cluster-sharding-typed with scala3 Refs #30243 * Remove unused variable
This commit is contained in:
parent
8db6362d71
commit
f12d72effc
13 changed files with 52 additions and 61 deletions
2
.github/workflows/scala3-build.yml
vendored
2
.github/workflows/scala3-build.yml
vendored
|
|
@ -21,7 +21,7 @@ jobs:
|
||||||
- akka-testkit/test akka-actor-tests/test
|
- akka-testkit/test akka-actor-tests/test
|
||||||
- akka-actor-testkit-typed/test akka-actor-typed-tests/test
|
- akka-actor-testkit-typed/test akka-actor-typed-tests/test
|
||||||
- akka-bench-jmh/test
|
- akka-bench-jmh/test
|
||||||
- akka-cluster/Test/compile akka-cluster-tools/test akka-cluster-typed/test akka-distributed-data/test akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile
|
- akka-cluster/Test/compile akka-cluster-tools/test akka-cluster-typed/test akka-distributed-data/test akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile akka-cluster-sharding-typed/compile
|
||||||
- akka-discovery/test akka-coordination/test
|
- akka-discovery/test akka-coordination/test
|
||||||
- akka-persistence/test akka-persistence-shared/test akka-persistence-query/test
|
- akka-persistence/test akka-persistence-shared/test akka-persistence-query/test
|
||||||
- akka-pki/test akka-slf4j/test
|
- akka-pki/test akka-slf4j/test
|
||||||
|
|
|
||||||
2
.github/workflows/scala3-compile.yml
vendored
2
.github/workflows/scala3-compile.yml
vendored
|
|
@ -19,7 +19,7 @@ jobs:
|
||||||
- akka-testkit/Test/compile akka-actor-tests/Test/compile
|
- akka-testkit/Test/compile akka-actor-tests/Test/compile
|
||||||
- akka-actor-testkit-typed/Test/compile akka-actor-typed-tests/Test/compile
|
- akka-actor-testkit-typed/Test/compile akka-actor-typed-tests/Test/compile
|
||||||
- akka-bench-jmh/Test/compile
|
- akka-bench-jmh/Test/compile
|
||||||
- akka-cluster/Test/compile akka-cluster-tools/Test/compile akka-cluster-typed/Test/compile akka-distributed-data/Test/compile akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile
|
- akka-cluster/Test/compile akka-cluster-tools/Test/compile akka-cluster-typed/Test/compile akka-distributed-data/Test/compile akka-cluster-metrics/Test/compile akka-cluster-sharding/Test/compile akka-cluster-sharding-typed/compile
|
||||||
- akka-discovery/Test/compile akka-coordination/Test/compile
|
- akka-discovery/Test/compile akka-coordination/Test/compile
|
||||||
- akka-persistence/Test/compile akka-persistence-shared/Test/compile akka-persistence-query/Test/compile
|
- akka-persistence/Test/compile akka-persistence-shared/Test/compile akka-persistence-query/Test/compile
|
||||||
- akka-pki/Test/compile akka-slf4j/Test/compile
|
- akka-pki/Test/compile akka-slf4j/Test/compile
|
||||||
|
|
|
||||||
|
|
@ -156,10 +156,10 @@ object ClusterShardingSettings {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
s"Not recognized RememberEntitiesStore, only '${RememberEntitiesStoreModeDData.name}' and '${RememberEntitiesStoreModeEventSourced.name}' are supported.")
|
s"Not recognized RememberEntitiesStore, only '${RememberEntitiesStoreModeDData.name}' and '${RememberEntitiesStoreModeEventSourced.name}' are supported.")
|
||||||
}
|
}
|
||||||
final case object RememberEntitiesStoreModeEventSourced extends RememberEntitiesStoreMode {
|
case object RememberEntitiesStoreModeEventSourced extends RememberEntitiesStoreMode {
|
||||||
override def name = "eventsourced"
|
override def name = "eventsourced"
|
||||||
}
|
}
|
||||||
final case object RememberEntitiesStoreModeDData extends RememberEntitiesStoreMode { override def name = "ddata" }
|
case object RememberEntitiesStoreModeDData extends RememberEntitiesStoreMode { override def name = "ddata" }
|
||||||
|
|
||||||
// generated using kaze-class
|
// generated using kaze-class
|
||||||
final class TuningParameters private (
|
final class TuningParameters private (
|
||||||
|
|
|
||||||
|
|
@ -73,9 +73,9 @@ abstract class ShardingMessageExtractor[E, M] {
|
||||||
final class HashCodeMessageExtractor[M](val numberOfShards: Int)
|
final class HashCodeMessageExtractor[M](val numberOfShards: Int)
|
||||||
extends ShardingMessageExtractor[ShardingEnvelope[M], M] {
|
extends ShardingMessageExtractor[ShardingEnvelope[M], M] {
|
||||||
|
|
||||||
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
|
|
||||||
override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
|
override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId
|
||||||
override def shardId(entityId: String): String = HashCodeMessageExtractor.shardId(entityId, numberOfShards)
|
override def shardId(entityId: String): String =
|
||||||
|
akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor.shardId(entityId, numberOfShards)
|
||||||
override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
|
override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -89,8 +89,8 @@ final class HashCodeMessageExtractor[M](val numberOfShards: Int)
|
||||||
*/
|
*/
|
||||||
abstract class HashCodeNoEnvelopeMessageExtractor[M](val numberOfShards: Int) extends ShardingMessageExtractor[M, M] {
|
abstract class HashCodeNoEnvelopeMessageExtractor[M](val numberOfShards: Int) extends ShardingMessageExtractor[M, M] {
|
||||||
|
|
||||||
import akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor
|
override def shardId(entityId: String): String =
|
||||||
override def shardId(entityId: String): String = HashCodeMessageExtractor.shardId(entityId, numberOfShards)
|
akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor.shardId(entityId, numberOfShards)
|
||||||
override final def unwrapMessage(message: M): M = message
|
override final def unwrapMessage(message: M): M = message
|
||||||
|
|
||||||
override def toString = s"HashCodeNoEnvelopeMessageExtractor($numberOfShards)"
|
override def toString = s"HashCodeNoEnvelopeMessageExtractor($numberOfShards)"
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ import akka.cluster.sharding.typed.delivery.ShardingConsumerController
|
||||||
Behaviors.withStash(settings.bufferSize) { stashBuffer =>
|
Behaviors.withStash(settings.bufferSize) { stashBuffer =>
|
||||||
Behaviors
|
Behaviors
|
||||||
.receiveMessage[ConsumerController.Command[A]] {
|
.receiveMessage[ConsumerController.Command[A]] {
|
||||||
case start: ConsumerController.Start[A] =>
|
case start: ConsumerController.Start[A @unchecked] =>
|
||||||
ConsumerControllerImpl.enforceLocalConsumer(start.deliverTo)
|
ConsumerControllerImpl.enforceLocalConsumer(start.deliverTo)
|
||||||
context.unwatch(consumer)
|
context.unwatch(consumer)
|
||||||
context.watch(start.deliverTo)
|
context.watch(start.deliverTo)
|
||||||
|
|
@ -74,7 +74,7 @@ private class ShardingConsumerControllerImpl[A](
|
||||||
|
|
||||||
Behaviors
|
Behaviors
|
||||||
.receiveMessagePartial[ConsumerController.Command[A]] {
|
.receiveMessagePartial[ConsumerController.Command[A]] {
|
||||||
case seqMsg: ConsumerController.SequencedMessage[A] =>
|
case seqMsg: ConsumerController.SequencedMessage[A @unchecked] =>
|
||||||
def updatedProducerControllers(): Map[ActorRef[ProducerControllerImpl.InternalCommand], String] = {
|
def updatedProducerControllers(): Map[ActorRef[ProducerControllerImpl.InternalCommand], String] = {
|
||||||
producerControllers.get(seqMsg.producerController) match {
|
producerControllers.get(seqMsg.producerController) match {
|
||||||
case Some(_) =>
|
case Some(_) =>
|
||||||
|
|
|
||||||
|
|
@ -509,7 +509,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
||||||
|
|
||||||
Behaviors.receiveMessage {
|
Behaviors.receiveMessage {
|
||||||
|
|
||||||
case msg: Msg[A] =>
|
case msg: Msg[A @unchecked] =>
|
||||||
if (durableQueue.isEmpty) {
|
if (durableQueue.isEmpty) {
|
||||||
// currentSeqNr is only updated when durableQueue is enabled
|
// currentSeqNr is only updated when durableQueue is enabled
|
||||||
onMessage(msg.envelope.entityId, msg.envelope.message, None, s.currentSeqNr, s.replyAfterStore)
|
onMessage(msg.envelope.entityId, msg.envelope.message, None, s.currentSeqNr, s.replyAfterStore)
|
||||||
|
|
@ -537,13 +537,13 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
||||||
case StoreMessageSentCompleted(MessageSent(seqNr, msg: A, _, entityId, _)) =>
|
case StoreMessageSentCompleted(MessageSent(seqNr, msg: A, _, entityId, _)) =>
|
||||||
receiveStoreMessageSentCompleted(seqNr, msg, entityId)
|
receiveStoreMessageSentCompleted(seqNr, msg, entityId)
|
||||||
|
|
||||||
case f: StoreMessageSentFailed[A] =>
|
case f: StoreMessageSentFailed[A @unchecked] =>
|
||||||
receiveStoreMessageSentFailed(f)
|
receiveStoreMessageSentFailed(f)
|
||||||
|
|
||||||
case ack: Ack =>
|
case ack: Ack =>
|
||||||
receiveAck(ack)
|
receiveAck(ack)
|
||||||
|
|
||||||
case w: WrappedRequestNext[A] =>
|
case w: WrappedRequestNext[A @unchecked] =>
|
||||||
receiveWrappedRequestNext(w)
|
receiveWrappedRequestNext(w)
|
||||||
|
|
||||||
case ResendFirstUnconfirmed =>
|
case ResendFirstUnconfirmed =>
|
||||||
|
|
@ -552,7 +552,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
||||||
case CleanupUnused =>
|
case CleanupUnused =>
|
||||||
receiveCleanupUnused()
|
receiveCleanupUnused()
|
||||||
|
|
||||||
case start: Start[A] =>
|
case start: Start[A @unchecked] =>
|
||||||
receiveStart(start)
|
receiveStart(start)
|
||||||
|
|
||||||
case AskTimeout(outKey, outSeqNr) =>
|
case AskTimeout(outKey, outSeqNr) =>
|
||||||
|
|
|
||||||
|
|
@ -53,8 +53,8 @@ object ORSet {
|
||||||
underlying.originReplica,
|
underlying.originReplica,
|
||||||
concatElementsMap(u.elementsMap.asInstanceOf[Map[A, Dot]]),
|
concatElementsMap(u.elementsMap.asInstanceOf[Map[A, Dot]]),
|
||||||
underlying.vvector.merge(u.vvector)))
|
underlying.vvector.merge(u.vvector)))
|
||||||
case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that))
|
case _: AtomicDeltaOp[A @unchecked] => DeltaGroup(Vector(this, that))
|
||||||
case DeltaGroup(ops) => DeltaGroup(this +: ops)
|
case DeltaGroup(ops) => DeltaGroup(this +: ops)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def concatElementsMap(thatMap: Map[A, Dot]): Map[A, Dot] = {
|
private def concatElementsMap(thatMap: Map[A, Dot]): Map[A, Dot] = {
|
||||||
|
|
@ -72,16 +72,16 @@ object ORSet {
|
||||||
throw new IllegalArgumentException(s"RemoveDeltaOp should contain one removed element, but was $underlying")
|
throw new IllegalArgumentException(s"RemoveDeltaOp should contain one removed element, but was $underlying")
|
||||||
|
|
||||||
override def merge(that: DeltaOp): DeltaOp = that match {
|
override def merge(that: DeltaOp): DeltaOp = that match {
|
||||||
case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that)) // keep it simple for removals
|
case _: AtomicDeltaOp[A @unchecked] => DeltaGroup(Vector(this, that)) // keep it simple for removals
|
||||||
case DeltaGroup(ops) => DeltaGroup(this +: ops)
|
case DeltaGroup(ops) => DeltaGroup(this +: ops)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** INTERNAL API: Used for `clear` but could be used for other cases also */
|
/** INTERNAL API: Used for `clear` but could be used for other cases also */
|
||||||
@InternalApi private[akka] final case class FullStateDeltaOp[A](underlying: ORSet[A]) extends AtomicDeltaOp[A] {
|
@InternalApi private[akka] final case class FullStateDeltaOp[A](underlying: ORSet[A]) extends AtomicDeltaOp[A] {
|
||||||
override def merge(that: DeltaOp): DeltaOp = that match {
|
override def merge(that: DeltaOp): DeltaOp = that match {
|
||||||
case _: AtomicDeltaOp[A] => DeltaGroup(Vector(this, that))
|
case _: AtomicDeltaOp[A @unchecked] => DeltaGroup(Vector(this, that))
|
||||||
case DeltaGroup(ops) => DeltaGroup(this +: ops)
|
case DeltaGroup(ops) => DeltaGroup(this +: ops)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -90,11 +90,11 @@ object ORSet {
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
|
@InternalApi private[akka] final case class DeltaGroup[A](ops: immutable.IndexedSeq[DeltaOp]) extends DeltaOp {
|
||||||
override def merge(that: DeltaOp): DeltaOp = that match {
|
override def merge(that: DeltaOp): DeltaOp = that match {
|
||||||
case thatAdd: AddDeltaOp[A] =>
|
case thatAdd: AddDeltaOp[A @unchecked] =>
|
||||||
// merge AddDeltaOp into last AddDeltaOp in the group, if possible
|
// merge AddDeltaOp into last AddDeltaOp in the group, if possible
|
||||||
ops.last match {
|
ops.last match {
|
||||||
case thisAdd: AddDeltaOp[A] => DeltaGroup(ops.dropRight(1) :+ thisAdd.merge(thatAdd))
|
case thisAdd: AddDeltaOp[A @unchecked] => DeltaGroup(ops.dropRight(1) :+ thisAdd.merge(thatAdd))
|
||||||
case _ => DeltaGroup(ops :+ thatAdd)
|
case _ => DeltaGroup(ops :+ thatAdd)
|
||||||
}
|
}
|
||||||
case DeltaGroup(thatOps) => DeltaGroup(ops ++ thatOps)
|
case DeltaGroup(thatOps) => DeltaGroup(ops ++ thatOps)
|
||||||
case _ => DeltaGroup(ops :+ that)
|
case _ => DeltaGroup(ops :+ that)
|
||||||
|
|
@ -434,15 +434,15 @@ final class ORSet[A] private[akka] (
|
||||||
|
|
||||||
override def applyOperation(thatDelta: ORSet.DeltaOp): ORSet[A] = {
|
override def applyOperation(thatDelta: ORSet.DeltaOp): ORSet[A] = {
|
||||||
thatDelta match {
|
thatDelta match {
|
||||||
case d: ORSet.AddDeltaOp[A] => merge(d.underlying, addDeltaOp = true)
|
case d: ORSet.AddDeltaOp[A @unchecked] => merge(d.underlying, addDeltaOp = true)
|
||||||
case d: ORSet.RemoveDeltaOp[A] => mergeRemoveDelta(d)
|
case d: ORSet.RemoveDeltaOp[A @unchecked] => mergeRemoveDelta(d)
|
||||||
case d: ORSet.FullStateDeltaOp[A] => merge(d.underlying, addDeltaOp = false)
|
case d: ORSet.FullStateDeltaOp[A @unchecked] => merge(d.underlying, addDeltaOp = false)
|
||||||
case ORSet.DeltaGroup(ops) =>
|
case ORSet.DeltaGroup(ops) =>
|
||||||
ops.foldLeft(this) {
|
ops.foldLeft(this) {
|
||||||
case (acc, op: ORSet.AddDeltaOp[A]) => acc.merge(op.underlying, addDeltaOp = true)
|
case (acc, op: ORSet.AddDeltaOp[A @unchecked]) => acc.merge(op.underlying, addDeltaOp = true)
|
||||||
case (acc, op: ORSet.RemoveDeltaOp[A]) => acc.mergeRemoveDelta(op)
|
case (acc, op: ORSet.RemoveDeltaOp[A @unchecked]) => acc.mergeRemoveDelta(op)
|
||||||
case (acc, op: ORSet.FullStateDeltaOp[A]) => acc.merge(op.underlying, addDeltaOp = false)
|
case (acc, op: ORSet.FullStateDeltaOp[A @unchecked]) => acc.merge(op.underlying, addDeltaOp = false)
|
||||||
case (_, _: ORSet.DeltaGroup[A]) =>
|
case (_, _: ORSet.DeltaGroup[A @unchecked]) =>
|
||||||
throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested")
|
throw new IllegalArgumentException("ORSet.DeltaGroup should not be nested")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -95,15 +95,15 @@ private[akka] final class ReplayingEvents[C, E, S](
|
||||||
|
|
||||||
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
|
override def onMessage(msg: InternalProtocol): Behavior[InternalProtocol] = {
|
||||||
msg match {
|
msg match {
|
||||||
case JournalResponse(r) => onJournalResponse(r)
|
case JournalResponse(r) => onJournalResponse(r)
|
||||||
case SnapshotterResponse(r) => onSnapshotterResponse(r)
|
case SnapshotterResponse(r) => onSnapshotterResponse(r)
|
||||||
case RecoveryTickEvent(snap) => onRecoveryTick(snap)
|
case RecoveryTickEvent(snap) => onRecoveryTick(snap)
|
||||||
case evt: ReplicatedEventEnvelope[E] => onInternalCommand(evt)
|
case evt: ReplicatedEventEnvelope[E @unchecked] => onInternalCommand(evt)
|
||||||
case pe: PublishedEventImpl => onInternalCommand(pe)
|
case pe: PublishedEventImpl => onInternalCommand(pe)
|
||||||
case cmd: IncomingCommand[C] => onInternalCommand(cmd)
|
case cmd: IncomingCommand[C @unchecked] => onInternalCommand(cmd)
|
||||||
case get: GetState[S @unchecked] => stashInternal(get)
|
case get: GetState[S @unchecked] => stashInternal(get)
|
||||||
case get: GetSeenSequenceNr => stashInternal(get)
|
case get: GetSeenSequenceNr => stashInternal(get)
|
||||||
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
|
case RecoveryPermitGranted => Behaviors.unhandled // should not happen, we already have the permit
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -58,12 +58,12 @@ private[akka] class ReplayingSnapshot[C, E, S](override val setup: BehaviorSetup
|
||||||
def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
|
def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
|
||||||
Behaviors
|
Behaviors
|
||||||
.receiveMessage[InternalProtocol] {
|
.receiveMessage[InternalProtocol] {
|
||||||
case SnapshotterResponse(r) => onSnapshotterResponse(r, receivedPoisonPill)
|
case SnapshotterResponse(r) => onSnapshotterResponse(r, receivedPoisonPill)
|
||||||
case JournalResponse(r) => onJournalResponse(r)
|
case JournalResponse(r) => onJournalResponse(r)
|
||||||
case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot)
|
case RecoveryTickEvent(snapshot) => onRecoveryTick(snapshot)
|
||||||
case evt: ReplicatedEventEnvelope[E] => onReplicatedEvent(evt)
|
case evt: ReplicatedEventEnvelope[E @unchecked] => onReplicatedEvent(evt)
|
||||||
case pe: PublishedEventImpl => onPublishedEvent(pe)
|
case pe: PublishedEventImpl => onPublishedEvent(pe)
|
||||||
case cmd: IncomingCommand[C] =>
|
case cmd: IncomingCommand[C @unchecked] =>
|
||||||
if (receivedPoisonPill) {
|
if (receivedPoisonPill) {
|
||||||
if (setup.settings.logOnStashing)
|
if (setup.settings.logOnStashing)
|
||||||
setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd)
|
setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd)
|
||||||
|
|
|
||||||
|
|
@ -126,7 +126,7 @@ private[akka] object Running {
|
||||||
val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId)
|
val queryPluginId = replicationSetup.allReplicasAndQueryPlugins(replicaId)
|
||||||
val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId)
|
val replication = query.readJournalFor[EventsByPersistenceIdQuery](queryPluginId)
|
||||||
|
|
||||||
implicit val timeout = Timeout(30.seconds)
|
implicit val timeout: Timeout = 30.seconds
|
||||||
implicit val scheduler = setup.context.system.scheduler
|
implicit val scheduler = setup.context.system.scheduler
|
||||||
implicit val ec = setup.context.system.executionContext
|
implicit val ec = setup.context.system.executionContext
|
||||||
|
|
||||||
|
|
@ -907,9 +907,6 @@ private[akka] object Running {
|
||||||
case callback: Callback[_] =>
|
case callback: Callback[_] =>
|
||||||
callback.sideEffect(state.state)
|
callback.sideEffect(state.state)
|
||||||
behavior
|
behavior
|
||||||
|
|
||||||
case _ =>
|
|
||||||
throw new IllegalArgumentException(s"Unsupported side effect detected [${effect.getClass.getName}]")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -72,10 +72,10 @@ private[akka] class Recovering[C, S](override val setup: BehaviorSetup[C, S])
|
||||||
def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
|
def stay(receivedPoisonPill: Boolean): Behavior[InternalProtocol] = {
|
||||||
Behaviors
|
Behaviors
|
||||||
.receiveMessage[InternalProtocol] {
|
.receiveMessage[InternalProtocol] {
|
||||||
case success: GetSuccess[S] => onGetSuccess(success.result, receivedPoisonPill)
|
case success: GetSuccess[S @unchecked] => onGetSuccess(success.result, receivedPoisonPill)
|
||||||
case GetFailure(exc) => onGetFailure(exc)
|
case GetFailure(exc) => onGetFailure(exc)
|
||||||
case RecoveryTimeout => onRecoveryTimeout()
|
case RecoveryTimeout => onRecoveryTimeout()
|
||||||
case cmd: IncomingCommand[C] =>
|
case cmd: IncomingCommand[C @unchecked] =>
|
||||||
if (receivedPoisonPill) {
|
if (receivedPoisonPill) {
|
||||||
if (setup.settings.logOnStashing)
|
if (setup.settings.logOnStashing)
|
||||||
setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd)
|
setup.internalLogger.debug("Discarding message [{}], because actor is to be stopped.", cmd)
|
||||||
|
|
|
||||||
|
|
@ -131,7 +131,7 @@ private[akka] object Running {
|
||||||
sideEffects.size)
|
sideEffects.size)
|
||||||
|
|
||||||
effect match {
|
effect match {
|
||||||
case CompositeEffect(eff, currentSideEffects) =>
|
case CompositeEffect(eff, currentSideEffects: Seq[SideEffect[S @unchecked]]) =>
|
||||||
// unwrap and accumulate effects
|
// unwrap and accumulate effects
|
||||||
applyEffects(msg, state, eff, currentSideEffects ++ sideEffects)
|
applyEffects(msg, state, eff, currentSideEffects ++ sideEffects)
|
||||||
|
|
||||||
|
|
@ -274,9 +274,6 @@ private[akka] object Running {
|
||||||
case callback: Callback[_] =>
|
case callback: Callback[_] =>
|
||||||
callback.sideEffect(state.state)
|
callback.sideEffect(state.state)
|
||||||
behavior
|
behavior
|
||||||
|
|
||||||
case _ =>
|
|
||||||
throw new IllegalArgumentException(s"Unsupported side effect detected [${effect.getClass.getName}]")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -131,8 +131,6 @@ object ShoppingCartBehavior {
|
||||||
Effect.none
|
Effect.none
|
||||||
case Timeout =>
|
case Timeout =>
|
||||||
Effect.persist(CustomerInactive)
|
Effect.persist(CustomerInactive)
|
||||||
case _ =>
|
|
||||||
Effect.none
|
|
||||||
}
|
}
|
||||||
case Inactive(_) =>
|
case Inactive(_) =>
|
||||||
command match {
|
command match {
|
||||||
|
|
@ -164,13 +162,12 @@ object ShoppingCartBehavior {
|
||||||
case ItemAdded(item) => Shopping(cart.addItem(item))
|
case ItemAdded(item) => Shopping(cart.addItem(item))
|
||||||
case _ => la
|
case _ => la
|
||||||
}
|
}
|
||||||
case s @ Shopping(cart) =>
|
case Shopping(cart) =>
|
||||||
event match {
|
event match {
|
||||||
case ItemAdded(item) => Shopping(cart.addItem(item))
|
case ItemAdded(item) => Shopping(cart.addItem(item))
|
||||||
case OrderExecuted => Paid(cart)
|
case OrderExecuted => Paid(cart)
|
||||||
case OrderDiscarded => state // will be stopped
|
case OrderDiscarded => state // will be stopped
|
||||||
case CustomerInactive => Inactive(cart)
|
case CustomerInactive => Inactive(cart)
|
||||||
case _ => s
|
|
||||||
}
|
}
|
||||||
case i @ Inactive(cart) =>
|
case i @ Inactive(cart) =>
|
||||||
event match {
|
event match {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue