ActorContext removed from more signatures #25620

This commit is contained in:
Johan Andrén 2018-09-20 14:59:41 +02:00
parent ad1eca9876
commit df697175b6
13 changed files with 53 additions and 53 deletions

View file

@ -12,6 +12,7 @@ import akka.japi.{ Pair ⇒ JPair }
* INTERNAL API * INTERNAL API
*/ */
@InternalApi private[akka] object ConstantFun { @InternalApi private[akka] object ConstantFun {
private[this] val JavaIdentityFunction = new JFun[Any, Any] { private[this] val JavaIdentityFunction = new JFun[Any, Any] {
@throws(classOf[Exception]) override def apply(param: Any): Any = param @throws(classOf[Exception]) override def apply(param: Any): Any = param
} }
@ -27,6 +28,7 @@ import akka.japi.{ Pair ⇒ JPair }
def scalaIdentityFunction[T]: T T = conforms.asInstanceOf[Function[T, T]] def scalaIdentityFunction[T]: T T = conforms.asInstanceOf[Function[T, T]]
def scalaAnyToNone[A, B]: A Option[B] = none def scalaAnyToNone[A, B]: A Option[B] = none
def scalaAnyToUnit[A]: A Unit = unit
def scalaAnyTwoToNone[A, B, C]: (A, B) Option[C] = two2none def scalaAnyTwoToNone[A, B, C]: (A, B) Option[C] = two2none
def scalaAnyTwoToUnit[A, B]: (A, B) Unit = two2unit def scalaAnyTwoToUnit[A, B]: (A, B) Unit = two2unit
def scalaAnyThreeToUnit[A, B, C]: (A, B, C) Unit = three2unit def scalaAnyThreeToUnit[A, B, C]: (A, B, C) Unit = three2unit
@ -46,6 +48,8 @@ import akka.japi.{ Pair ⇒ JPair }
private val conforms = (a: Any) a private val conforms = (a: Any) a
private val unit = (_: Any) ()
private val none = (_: Any) None private val none = (_: Any) None
private val two2none = (_: Any, _: Any) None private val two2none = (_: Any, _: Any) None

View file

@ -5,7 +5,7 @@
package akka.persistence.typed.internal package akka.persistence.typed.internal
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler } import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.event.Logging import akka.event.Logging
import akka.persistence.JournalProtocol._ import akka.persistence.JournalProtocol._
@ -13,7 +13,6 @@ import akka.persistence._
import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._ import akka.persistence.typed.internal.EventsourcedBehavior.InternalProtocol._
import akka.persistence.typed.internal.EventsourcedBehavior._ import akka.persistence.typed.internal.EventsourcedBehavior._
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal import scala.util.control.NonFatal
/*** /***
@ -155,7 +154,7 @@ private[persistence] class EventsourcedReplayingEvents[C, E, S](override val set
protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try { protected def onRecoveryCompleted(state: ReplayingState[S]): Behavior[InternalProtocol] = try {
tryReturnRecoveryPermit("replay completed successfully") tryReturnRecoveryPermit("replay completed successfully")
setup.recoveryCompleted(setup.commandContext, state.state) setup.recoveryCompleted(state.state)
val running = EventsourcedRunning[C, E, S]( val running = EventsourcedRunning[C, E, S](
setup, setup,

View file

@ -70,8 +70,6 @@ private[akka] object EventsourcedRunning {
extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] { extends EventsourcedJournalInteractions[C, E, S] with EventsourcedStashManagement[C, E, S] {
import EventsourcedRunning.EventsourcedState import EventsourcedRunning.EventsourcedState
private def commandContext = setup.commandContext
private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds) private val runningCmdsMdc = MDC.create(setup.persistenceId, MDC.RunningCmds)
private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents) private val persistingEventsMdc = MDC.create(setup.persistenceId, MDC.PersistingEvents)
@ -255,10 +253,10 @@ private[akka] object EventsourcedRunning {
outer: Behavior[InternalProtocol]): Behavior[InternalProtocol] = { outer: Behavior[InternalProtocol]): Behavior[InternalProtocol] = {
response match { response match {
case SaveSnapshotSuccess(meta) case SaveSnapshotSuccess(meta)
setup.onSnapshot(commandContext, meta, Success(Done)) setup.onSnapshot(meta, Success(Done))
outer outer
case SaveSnapshotFailure(meta, ex) case SaveSnapshotFailure(meta, ex)
setup.onSnapshot(commandContext, meta, Failure(ex)) setup.onSnapshot(meta, Failure(ex))
outer outer
// FIXME not implemented // FIXME not implemented

View file

@ -34,8 +34,8 @@ private[persistence] final class EventsourcedSetup[C, E, S](
val commandHandler: PersistentBehaviors.CommandHandler[C, E, S], val commandHandler: PersistentBehaviors.CommandHandler[C, E, S],
val eventHandler: PersistentBehaviors.EventHandler[S, E], val eventHandler: PersistentBehaviors.EventHandler[S, E],
val writerIdentity: WriterIdentity, val writerIdentity: WriterIdentity,
val recoveryCompleted: (ActorContext[C], S) Unit, val recoveryCompleted: S Unit,
val onSnapshot: (ActorContext[C], SnapshotMetadata, Try[Done]) Unit, val onSnapshot: (SnapshotMetadata, Try[Done]) Unit,
val tagger: E Set[String], val tagger: E Set[String],
val eventAdapter: EventAdapter[E, _], val eventAdapter: EventAdapter[E, _],
val snapshotWhen: (S, E, Long) Boolean, val snapshotWhen: (S, E, Long) Boolean,
@ -46,6 +46,7 @@ private[persistence] final class EventsourcedSetup[C, E, S](
) { ) {
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
// FIXME: do we really need it anymore?
def commandContext: ActorContext[C] = context.asInstanceOf[ActorContext[C]] def commandContext: ActorContext[C] = context.asInstanceOf[ActorContext[C]]
val persistence: Persistence = Persistence(context.system.toUntyped) val persistence: Persistence = Persistence(context.system.toUntyped)

View file

@ -36,15 +36,15 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
emptyState: State, emptyState: State,
commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State], commandHandler: PersistentBehaviors.CommandHandler[Command, Event, State],
eventHandler: PersistentBehaviors.EventHandler[State, Event], eventHandler: PersistentBehaviors.EventHandler[State, Event],
journalPluginId: Option[String] = None, journalPluginId: Option[String] = None,
snapshotPluginId: Option[String] = None, snapshotPluginId: Option[String] = None,
recoveryCompleted: (ActorContext[Command], State) Unit = ConstantFun.scalaAnyTwoToUnit, recoveryCompleted: State Unit = ConstantFun.scalaAnyToUnit,
tagger: Event Set[String] = (_: Event) Set.empty[String], tagger: Event Set[String] = (_: Event) Set.empty[String],
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event], eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse, snapshotWhen: (State, Event, Long) Boolean = ConstantFun.scalaAnyThreeToFalse,
recovery: Recovery = Recovery(), recovery: Recovery = Recovery(),
supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop, supervisionStrategy: SupervisorStrategy = SupervisorStrategy.stop,
onSnapshot: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit = PersistentBehaviorImpl.defaultOnSnapshot[Command] _ onSnapshot: (SnapshotMetadata, Try[Done]) Unit = ConstantFun.scalaAnyTwoToUnit
) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement { ) extends PersistentBehavior[Command, Event, State] with EventsourcedStashReferenceManagement {
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = { override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
@ -54,6 +54,12 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
val internalStash = stashBuffer(settings) val internalStash = stashBuffer(settings)
// the default impl needs context which isn't available until here, so we
// use the anyTwoToUnit as a marker to use the default
val actualOnSnapshot: (SnapshotMetadata, Try[Done]) Unit =
if (onSnapshot == ConstantFun.scalaAnyTwoToUnit) PersistentBehaviorImpl.defaultOnSnapshot[Command](ctx, _, _)
else onSnapshot
val eventsourcedSetup = new EventsourcedSetup( val eventsourcedSetup = new EventsourcedSetup(
ctx.asInstanceOf[ActorContext[InternalProtocol]], ctx.asInstanceOf[ActorContext[InternalProtocol]],
persistenceId, persistenceId,
@ -62,7 +68,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
eventHandler, eventHandler,
WriterIdentity.newIdentity(), WriterIdentity.newIdentity(),
recoveryCompleted, recoveryCompleted,
onSnapshot, actualOnSnapshot,
tagger, tagger,
eventAdapter, eventAdapter,
snapshotWhen, snapshotWhen,
@ -104,7 +110,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
* The `callback` function is called to notify the actor that the recovery process * The `callback` function is called to notify the actor that the recovery process
* is finished. * is finished.
*/ */
def onRecoveryCompleted(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] = def onRecoveryCompleted(callback: State Unit): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback) copy(recoveryCompleted = callback)
/** /**
@ -171,7 +177,7 @@ private[akka] final case class PersistentBehaviorImpl[Command, Event, State](
/** /**
* The `callback` function is called to notify the actor that a snapshot has finished * The `callback` function is called to notify the actor that a snapshot has finished
*/ */
def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State] = def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State] =
copy(onSnapshot = callback) copy(onSnapshot = callback)
/** /**

View file

@ -10,15 +10,12 @@ import java.util.{ Collections, Optional }
import akka.actor.typed import akka.actor.typed
import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior } import akka.actor.typed.{ BackoffSupervisorStrategy, Behavior }
import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.javadsl.ActorContext
import akka.annotation.{ ApiMayChange, InternalApi } import akka.annotation.{ ApiMayChange, InternalApi }
import akka.persistence.SnapshotMetadata import akka.persistence.SnapshotMetadata
import akka.persistence.typed.{ EventAdapter, _ } import akka.persistence.typed.{ EventAdapter, _ }
import akka.persistence.typed.internal._ import akka.persistence.typed.internal._
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success }
import akka.japi.pf.FI
/** Java API */ /** Java API */
@ApiMayChange @ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: String, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] { abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: String, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
@ -155,15 +152,15 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe
emptyState, emptyState,
(state, cmd) commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]], (state, cmd) commandHandler()(state, cmd).asInstanceOf[EffectImpl[Event, State]],
eventHandler()(_, _)) eventHandler()(_, _))
.onRecoveryCompleted((ctx, state) onRecoveryCompleted(state)) .onRecoveryCompleted(onRecoveryCompleted)
.snapshotWhen(snapshotWhen) .snapshotWhen(snapshotWhen)
.withTagger(tagger) .withTagger(tagger)
.onSnapshot((ctx, meta, result) { .onSnapshot((meta, result) {
result match { result match {
case Success(_) case Success(_)
ctx.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta) context.asScala.log.debug("Save snapshot successful, snapshot metadata: [{}]", meta)
case Failure(e) case Failure(e)
ctx.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta) context.asScala.log.error(e, "Save snapshot failed, snapshot metadata: [{}]", meta)
} }
onSnapshot(meta, result match { onSnapshot(meta, result match {

View file

@ -7,7 +7,6 @@ package akka.persistence.typed.scaladsl
import akka.Done import akka.Done
import akka.actor.typed.BackoffSupervisorStrategy import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior.DeferredBehavior import akka.actor.typed.Behavior.DeferredBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.persistence._ import akka.persistence._
import akka.persistence.typed.EventAdapter import akka.persistence.typed.EventAdapter
@ -50,7 +49,7 @@ object PersistentBehaviors {
* a function: * a function:
* *
* {{{ * {{{
* (ActorContext[Command], State, Command) Effect[Event, State] * (State, Command) Effect[Event, State]
* }}} * }}}
* *
* Note that you can have different command handlers based on current state by using * Note that you can have different command handlers based on current state by using
@ -97,12 +96,12 @@ trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command
* The `callback` function is called to notify the actor that the recovery process * The `callback` function is called to notify the actor that the recovery process
* is finished. * is finished.
*/ */
def onRecoveryCompleted(callback: (ActorContext[Command], State) Unit): PersistentBehavior[Command, Event, State] def onRecoveryCompleted(callback: State Unit): PersistentBehavior[Command, Event, State]
/** /**
* The `callback` function is called to notify when a snapshot is complete. * The `callback` function is called to notify when a snapshot is complete.
*/ */
def onSnapshot(callback: (ActorContext[Command], SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State] def onSnapshot(callback: (SnapshotMetadata, Try[Done]) Unit): PersistentBehavior[Command, Event, State]
/** /**
* Initiates a snapshot if the given function returns true. * Initiates a snapshot if the given function returns true.

View file

@ -51,10 +51,9 @@ object RecoveryPermitterSpec {
case command commandProbe.ref ! command; Effect.none case command commandProbe.ref ! command; Effect.none
}, },
eventHandler = { (state, event) eventProbe.ref ! event; state } eventHandler = { (state, event) eventProbe.ref ! event; state }
).onRecoveryCompleted { ).onRecoveryCompleted { _
case (_, _) eventProbe.ref ! Recovered
eventProbe.ref ! Recovered if (throwOnRecovery) throw new TE
if (throwOnRecovery) throw new TE
} }
def forwardingBehavior(target: TestProbe[Any]): Behavior[Any] = def forwardingBehavior(target: TestProbe[Any]): Behavior[Any] =

View file

@ -72,8 +72,8 @@ object PerformanceSpec {
eventHandler = { eventHandler = {
case (state, _) state case (state, _) state
} }
).onRecoveryCompleted { ).onRecoveryCompleted { _
case (_, _) if (parameters.every(1000)) print("r") if (parameters.every(1000)) print("r")
} }
}).onFailure(SupervisorStrategy.restart) }).onFailure(SupervisorStrategy.restart)
} }

View file

@ -129,12 +129,11 @@ object PersistentActorCompileOnlyTest {
dataByCorrelationId = state.dataByCorrelationId + (correlationId data)) dataByCorrelationId = state.dataByCorrelationId + (correlationId data))
case SideEffectAcknowledged(correlationId) case SideEffectAcknowledged(correlationId)
state.copy(dataByCorrelationId = state.dataByCorrelationId - correlationId) state.copy(dataByCorrelationId = state.dataByCorrelationId - correlationId)
}).onRecoveryCompleted { }).onRecoveryCompleted(state
case (ctx, state) state.dataByCorrelationId.foreach {
state.dataByCorrelationId.foreach { case (correlationId, data) performSideEffect(ctx.self, correlationId, data)
case (correlationId, data) performSideEffect(ctx.self, correlationId, data) }
} )
}
) )
} }
@ -318,9 +317,9 @@ object PersistentActorCompileOnlyTest {
eventHandler = (state, evt) evt match { eventHandler = (state, evt) evt match {
case ItemAdded(id) id +: state case ItemAdded(id) id +: state
case ItemRemoved(id) state.filter(_ != id) case ItemRemoved(id) state.filter(_ != id)
}).onRecoveryCompleted((ctx, state) { }).onRecoveryCompleted(state
state.foreach(id metadataRegistry ! GetMetaData(id, adapt)) state.foreach(id metadataRegistry ! GetMetaData(id, adapt))
}) )
} }
} }

View file

@ -76,7 +76,7 @@ class PersistentBehaviorFailureSpec extends ScalaTestWithActorTestKit(Persistent
probe.tell(event) probe.tell(event)
state + event state + event
} }
).onRecoveryCompleted { (ctx, state) ).onRecoveryCompleted { state
probe.tell("starting") probe.tell("starting")
}.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1)) }.onPersistFailure(SupervisorStrategy.restartWithBackoff(1.milli, 5.millis, 0.1))

View file

@ -209,11 +209,9 @@ object PersistentBehaviorSpec {
case Incremented(delta) case Incremented(delta)
probe ! ((state, evt)) probe ! ((state, evt))
State(state.value + delta, state.history :+ state.value) State(state.value + delta, state.history :+ state.value)
}).onRecoveryCompleted { }).onRecoveryCompleted(_ ())
case (_, _)
}
.onSnapshot { .onSnapshot {
case (_, _, result) case (_, result)
snapshotProbe ! result snapshotProbe ! result
} }
} }

View file

@ -45,7 +45,7 @@ object BasicPersistentBehaviorsCompileOnly {
eventHandler = eventHandler =
(state, evt) (state, evt)
throw new RuntimeException("TODO: process the event return the next state") throw new RuntimeException("TODO: process the event return the next state")
).onRecoveryCompleted { (ctx, state) ).onRecoveryCompleted { state
throw new RuntimeException("TODO: add some end-of-recovery side-effect here") throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
} }
//#recovery //#recovery
@ -74,7 +74,7 @@ object BasicPersistentBehaviorsCompileOnly {
eventHandler = eventHandler =
(state, evt) (state, evt)
throw new RuntimeException("TODO: process the event return the next state") throw new RuntimeException("TODO: process the event return the next state")
).onRecoveryCompleted { (ctx, state) ).onRecoveryCompleted { state
throw new RuntimeException("TODO: add some end-of-recovery side-effect here") throw new RuntimeException("TODO: add some end-of-recovery side-effect here")
} }