From df697175b6e22ceed9e8faa9c5db63380aff3c16 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 20 Sep 2018 14:59:41 +0200 Subject: [PATCH] ActorContext removed from more signatures #25620 --- .../main/scala/akka/util/ConstantFun.scala | 4 +++ .../EventsourcedReplayingEvents.scala | 5 ++-- .../typed/internal/EventsourcedRunning.scala | 6 ++-- .../typed/internal/EventsourcedSetup.scala | 5 ++-- .../internal/PersistentBehaviorImpl.scala | 30 +++++++++++-------- .../typed/javadsl/PersistentBehavior.scala | 11 +++---- .../typed/scaladsl/PersistentBehaviors.scala | 7 ++--- .../internal/RecoveryPermitterSpec.scala | 7 ++--- .../typed/scaladsl/PerformanceSpec.scala | 4 +-- .../PersistentActorCompileOnlyTest.scala | 15 +++++----- .../PersistentBehaviorFailureSpec.scala | 2 +- .../scaladsl/PersistentBehaviorSpec.scala | 6 ++-- .../BasicPersistentBehaviorsCompileOnly.scala | 4 +-- 13 files changed, 53 insertions(+), 53 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/ConstantFun.scala b/akka-actor/src/main/scala/akka/util/ConstantFun.scala index df650d5d6e..c73acad637 100644 --- a/akka-actor/src/main/scala/akka/util/ConstantFun.scala +++ b/akka-actor/src/main/scala/akka/util/ConstantFun.scala @@ -12,6 +12,7 @@ import akka.japi.{ Pair ⇒ JPair } * INTERNAL API */ @InternalApi private[akka] object ConstantFun { + private[this] val JavaIdentityFunction = new JFun[Any, Any] { @throws(classOf[Exception]) override def apply(param: Any): Any = param } @@ -27,6 +28,7 @@ import akka.japi.{ Pair ⇒ JPair } def scalaIdentityFunction[T]: T ⇒ T = conforms.asInstanceOf[Function[T, T]] def scalaAnyToNone[A, B]: A ⇒ Option[B] = none + def scalaAnyToUnit[A]: A ⇒ Unit = unit def scalaAnyTwoToNone[A, B, C]: (A, B) ⇒ Option[C] = two2none def scalaAnyTwoToUnit[A, B]: (A, B) ⇒ Unit = two2unit def scalaAnyThreeToUnit[A, B, C]: (A, B, C) ⇒ Unit = three2unit @@ -46,6 +48,8 @@ import akka.japi.{ Pair ⇒ JPair } private val conforms = (a: Any) ⇒ a + private val unit = (_: Any) ⇒ () + private val none = (_: Any) ⇒ None private val two2none = (_: Any, _: Any) ⇒ None diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala index ab187e6143..9aff9f8a7a 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedReplayingEvents.scala @@ -5,7 +5,7 @@ package akka.persistence.typed.internal import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } +import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ @@ -13,7 +13,6 @@ import akka.persistence._ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.internal.EventsourcedBehavior._ -import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal /*** @@ -155,7 +154,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try { tryReturnRecoveryPermit("replay completed successfully") - setup.recoveryCompleted(setup.commandContext, state.state) + setup.recoveryCompleted(state.state) val running = EventsourcedRunning[C, E, S]( setup, diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala index ebba52771c..bf0b31e123 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedRunning.scala @@ -70,8 +70,6 @@ private[akka] object EventsourcedRunning { extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { import EventsourcedRunning.EventsourcedState - private def commandContext = setup.commandContext - private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds) private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents) @@ -255,10 +253,10 @@ private[akka] object EventsourcedRunning { outer: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { response match { case SaveSnapshotSuccess(meta) ⇒ - setup.onSnapshot(commandContext, meta, Success(Done)) + setup.onSnapshot(meta, Success(Done)) outer case SaveSnapshotFailure(meta, ex) ⇒ - setup.onSnapshot(commandContext, meta, Failure(ex)) + setup.onSnapshot(meta, Failure(ex)) outer // FIXME not implemented diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala index ff6d6accdd..8db1c54c44 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedSetup.scala @@ -34,8 +34,8 @@ private[persistence] final class EventsourcedSetup[C, E, S]( val commandHandler: PersistentBehaviors.CommandHandler[C, E, S], val eventHandler: PersistentBehaviors.EventHandler[S, E], val writerIdentity: WriterIdentity, - val recoveryCompleted: (ActorContext[C], S) ⇒ Unit, - val onSnapshot: (ActorContext[C], SnapshotMetadata, Try[Done]) ⇒ Unit, + val recoveryCompleted: S ⇒ Unit, + val onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit, val tagger: E ⇒ Set[String], val eventAdapter: EventAdapter[E, _], val snapshotWhen: (S, E, Long) ⇒ Boolean, @@ -46,6 +46,7 @@ private[persistence] final class EventsourcedSetup[C, E, S]( ) { import akka.actor.typed.scaladsl.adapter._ + // FIXME: do we really need it anymore? def commandContext: ActorContext[C] = context.asInstanceOf[ActorContext[C]] val persistence: Persistence = Persistence(context.system.toUntyped) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala index a5c3c4ba94..d93abc100d 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentBehaviorImpl.scala @@ -36,15 +36,15 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( emptyState: State, commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], eventHandler: PersistentBehaviors.EventHandler[State, Event], - journalPluginId: Option[String] = None, - snapshotPluginId: Option[String] = None, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit, - tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], - eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], - snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, - recovery: Recovery = Recovery(), - supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, - onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _ + journalPluginId: Option[String] = None, + snapshotPluginId: Option[String] = None, + recoveryCompleted: State ⇒ Unit = ConstantFun.scalaAnyToUnit, + tagger: Event ⇒ Set[String] = (_: Event) ⇒ Set.empty[String], + eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], + snapshotWhen: (State, Event, Long) ⇒ Boolean = ConstantFun.scalaAnyThreeToFalse, + recovery: Recovery = Recovery(), + supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, + onSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit = ConstantFun.scalaAnyTwoToUnit ) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { @@ -54,6 +54,12 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( val internalStash = stashBuffer(settings) + // the default impl needs context which isn't available until here, so we + // use the anyTwoToUnit as a marker to use the default + val actualOnSnapshot: (SnapshotMetadata, Try[Done]) ⇒ Unit = + if (onSnapshot == ConstantFun.scalaAnyTwoToUnit) PersistentBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _) + else onSnapshot + val eventsourcedSetup = new EventsourcedSetup( ctx.asInstanceOf[ActorContext[InternalProtocol]], persistenceId, @@ -62,7 +68,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( eventHandler, WriterIdentity.newIdentity(), recoveryCompleted, - onSnapshot, + actualOnSnapshot, tagger, eventAdapter, snapshotWhen, @@ -104,7 +110,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( * The `callback` function is called to notify the actor that the recovery process * is finished. */ - def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] = + def onRecoveryCompleted(callback: State ⇒ Unit): PersistentBehavior[Command, Event, State] = copy(recoveryCompleted = callback) /** @@ -171,7 +177,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State]( /** * The `callback` function is called to notify the actor that a snapshot has finished */ - def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] = + def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] = copy(onSnapshot = callback) /** diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala index 9744705b09..e8e78ccbcf 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/PersistentBehavior.scala @@ -10,15 +10,12 @@ import java.util.{ Collections, Optional } import akka.actor.typed import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior } import akka.actor.typed.Behavior.DeferredBehavior -import akka.actor.typed.javadsl.ActorContext import akka.annotation.{ ApiMayChange, InternalApi } import akka.persistence.SnapshotMetadata import akka.persistence.typed.{ EventAdapter, _ } import akka.persistence.typed.internal._ import scala.util.{ Failure, Success } -import akka.japi.pf.FI - /** Java API */ @ApiMayChange abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: String, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { @@ -155,15 +152,15 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe emptyState, (state, cmd) ⇒ commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], eventHandler()(_, _)) - .onRecoveryCompleted((ctx, state) ⇒ onRecoveryCompleted(state)) + .onRecoveryCompleted(onRecoveryCompleted) .snapshotWhen(snapshotWhen) .withTagger(tagger) - .onSnapshot((ctx, meta, result) ⇒ { + .onSnapshot((meta, result) ⇒ { result match { case Success(_) ⇒ - ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) + context.asScala.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) case Failure(e) ⇒ - ctx.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta) + context.asScala.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta) } onSnapshot(meta, result match { diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala index dcefb25bbc..cca2a0905f 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentBehaviors.scala @@ -7,7 +7,6 @@ package akka.persistence.typed.scaladsl import akka.Done import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.Behavior.DeferredBehavior -import akka.actor.typed.scaladsl.ActorContext import akka.annotation.InternalApi import akka.persistence._ import akka.persistence.typed.EventAdapter @@ -50,7 +49,7 @@ object PersistentBehaviors { * a function: * * {{{ - * (ActorContext[Command], State, Command) ⇒ Effect[Event, State] + * (State, Command) ⇒ Effect[Event, State] * }}} * * Note that you can have different command handlers based on current state by using @@ -97,12 +96,12 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command * The `callback` function is called to notify the actor that the recovery process * is finished. */ - def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] + def onRecoveryCompleted(callback: State ⇒ Unit): PersistentBehavior[Command, Event, State] /** * The `callback` function is called to notify when a snapshot is complete. */ - def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] + def onSnapshot(callback: (SnapshotMetadata, Try[Done]) ⇒ Unit): PersistentBehavior[Command, Event, State] /** * Initiates a snapshot if the given function returns true. diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala index bfb28f4c10..572545916e 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/internal/RecoveryPermitterSpec.scala @@ -51,10 +51,9 @@ object RecoveryPermitterSpec { case command ⇒ commandProbe.ref ! command; Effect.none }, eventHandler = { (state, event) ⇒ eventProbe.ref ! event; state } - ).onRecoveryCompleted { - case (_, _) ⇒ - eventProbe.ref ! Recovered - if (throwOnRecovery) throw new TE + ).onRecoveryCompleted { _ ⇒ + eventProbe.ref ! Recovered + if (throwOnRecovery) throw new TE } def forwardingBehavior(target: TestProbe[Any]): Behavior[Any] = diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala index 2c8d1eee1f..1a17e8473d 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PerformanceSpec.scala @@ -72,8 +72,8 @@ object PerformanceSpec { eventHandler = { case (state, _) ⇒ state } - ).onRecoveryCompleted { - case (_, _) ⇒ if (parameters.every(1000)) print("r") + ).onRecoveryCompleted { _ ⇒ + if (parameters.every(1000)) print("r") } }).onFailure(SupervisorStrategy.restart) } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala index cbfe1a46ab..f35e901bba 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentActorCompileOnlyTest.scala @@ -129,12 +129,11 @@ object PersistentActorCompileOnlyTest { dataByCorrelationId = state.dataByCorrelationId + (correlationId → data)) case SideEffectAcknowledged(correlationId) ⇒ state.copy(dataByCorrelationId = state.dataByCorrelationId - correlationId) - }).onRecoveryCompleted { - case (ctx, state) ⇒ - state.dataByCorrelationId.foreach { - case (correlationId, data) ⇒ performSideEffect(ctx.self, correlationId, data) - } - } + }).onRecoveryCompleted(state ⇒ + state.dataByCorrelationId.foreach { + case (correlationId, data) ⇒ performSideEffect(ctx.self, correlationId, data) + } + ) ) } @@ -318,9 +317,9 @@ object PersistentActorCompileOnlyTest { eventHandler = (state, evt) ⇒ evt match { case ItemAdded(id) ⇒ id +: state case ItemRemoved(id) ⇒ state.filter(_ != id) - }).onRecoveryCompleted((ctx, state) ⇒ { + }).onRecoveryCompleted(state ⇒ state.foreach(id ⇒ metadataRegistry ! GetMetaData(id, adapt)) - }) + ) } } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala index 4de39c337b..f57e96debd 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorFailureSpec.scala @@ -76,7 +76,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent probe.tell(event) state + event } - ).onRecoveryCompleted { (ctx, state) ⇒ + ).onRecoveryCompleted { state ⇒ probe.tell("starting") }.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index 34df8e9fa8..7601050553 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -209,11 +209,9 @@ object PersistentBehaviorSpec { case Incremented(delta) ⇒ probe ! ((state, evt)) State(state.value + delta, state.history :+ state.value) - }).onRecoveryCompleted { - case (_, _) ⇒ - } + }).onRecoveryCompleted(_ ⇒ ()) .onSnapshot { - case (_, _, result) ⇒ + case (_, result) ⇒ snapshotProbe ! result } } diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala index a565059417..ee3abba935 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentBehaviorsCompileOnly.scala @@ -45,7 +45,7 @@ object BasicPersistentBehaviorsCompileOnly { eventHandler = (state, evt) ⇒ throw new RuntimeException("TODO: process the event return the next state") - ).onRecoveryCompleted { (ctx, state) ⇒ + ).onRecoveryCompleted { state ⇒ throw new RuntimeException("TODO: add some end-of-recovery side-effect here") } //#recovery @@ -74,7 +74,7 @@ object BasicPersistentBehaviorsCompileOnly { eventHandler = (state, evt) ⇒ throw new RuntimeException("TODO: process the event return the next state") - ).onRecoveryCompleted { (ctx, state) ⇒ + ).onRecoveryCompleted { state ⇒ throw new RuntimeException("TODO: add some end-of-recovery side-effect here") }