Merge pull request #25692 from akka/wip-25482-then-reply-patriknw

thenReply Effect, #25482
This commit is contained in:
Patrik Nordwall 2018-10-17 16:49:23 +02:00 committed by GitHub
commit 58ec80d4f8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 452 additions and 33 deletions

View file

@ -4,6 +4,9 @@
package akka.cluster.sharding.typed.scaladsl
import scala.concurrent.Future
import akka.Done
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
@ -13,6 +16,7 @@ import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.persistence.typed.scaladsl.{ Effect, PersistentBehavior }
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.{ WordSpec, WordSpecLike }
@ -41,6 +45,7 @@ object ClusterShardingPersistenceSpec {
sealed trait Command
final case class Add(s: String) extends Command
final case class AddWithConfirmation(s: String)(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done]
final case class Get(replyTo: ActorRef[String]) extends Command
final case object StopPlz extends Command
@ -51,7 +56,13 @@ object ClusterShardingPersistenceSpec {
persistenceId = typeKey.persistenceIdFrom(entityId),
emptyState = "",
commandHandler = (state, cmd) cmd match {
case Add(s) Effect.persist(s)
case Add(s)
Effect.persist(s)
case cmd @ AddWithConfirmation(s)
Effect.persist(s)
.thenReply(cmd)(newState Done)
case Get(replyTo)
replyTo ! s"$entityId:$state"
Effect.none
@ -64,19 +75,17 @@ object ClusterShardingPersistenceSpec {
class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterShardingPersistenceSpec.config) with WordSpecLike {
import ClusterShardingPersistenceSpec._
val sharding = ClusterSharding(system)
"Typed cluster sharding with persistent actor" must {
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
"start persistent actor" in {
ClusterSharding(system).start(ShardedEntity(
entityId persistentActor(entityId),
typeKey,
StopPlz
))
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
"start persistent actor" in {
val p = TestProbe[String]()
val ref = ClusterSharding(system).entityRefFor(typeKey, "123")
@ -86,5 +95,19 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
ref ! Get(p.ref)
p.expectMessage("123:a|b|c")
}
"support ask with thenReply" in {
val p = TestProbe[String]()
val ref = ClusterSharding(system).entityRefFor(typeKey, "456")
val done1 = ref ? AddWithConfirmation("a")
done1.futureValue should ===(Done)
val done2: Future[Done] = ref ? AddWithConfirmation("b")
done2.futureValue should ===(Done)
ref ! Get(p.ref)
p.expectMessage("456:a|b")
}
}
}

View file

@ -248,6 +248,36 @@ Java
Any `SideEffect`s are executed on an at-once basis and will not be executed if the persist fails.
The `SideEffect`s are executed sequentially, it is not possible to execute `SideEffect`s in parallel.
## Replies
The @ref:[Request-Response interaction pattern](interaction-patterns.md#request-response) is very common for
persistent actors, because you typically want to know if the command was rejected due to validation errors and
when accepted you want a confirmation when the events have been successfully stored.
Therefore you typically include a @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`] in the
commands. After validation errors or after persisting events, using a `thenRun` side effect, the reply message can
be sent to the `ActorRef`.
TODO example of thenRun reply
Since this is such a common pattern there is a reply effect for this purpose. It has the nice property that
it can be used to enforce that replies are not forgotten when implementing the `PersistentBehavior`.
If it's defined with @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`]
there will be compilation errors if the returned effect isn't a `ReplyEffect`, which can be
created with @scala[`Effect.reply`]@java[`Effects().reply`], @scala[`Effect.noReply`]@java[`Effects().noReply`],
@scala[`Effect.thenReply`]@java[`Effects().thenReply`], or @scala[`Effect.thenNoReply`]@java[`Effects().thenNoReply`].
These effects will send the reply message even when @scala[`PersistentBehavior.withEnforcedReplies`]@java[`PersistentBehaviorWithEnforcedReplies`]
is not used, but then there will be no compilation errors if the reply decision is left out.
Note that the `noReply` is a way of making conscious decision that a reply shouldn't be sent for a specific
command or the reply will be sent later, perhaps after some asynchronous interaction with other actors or services.
TODO example of thenReply
When using the reply effect the commands must implement `ExpectingReply` to include the @scala[`ActorRef[ReplyMessageType]`]@java[`ActorRef<ReplyMessageType>`]
in a standardized way.
## Serialization
The same @ref:[serialization](../serialization.md) mechanism as for untyped

View file

@ -0,0 +1,16 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.actor.typed.ActorRef
/**
* Commands may implement this trait to facilitate sending reply messages via `Effect.thenReply`.
*
* @tparam ReplyMessage The type of the reply message
*/
trait ExpectingReply[ReplyMessage] {
def replyTo: ActorRef[ReplyMessage]
}

View file

@ -4,6 +4,7 @@
package akka.persistence.typed
import akka.actor.typed.ActorRef
import akka.japi.function
import akka.annotation.{ DoNotInherit, InternalApi }
@ -19,7 +20,23 @@ sealed abstract class SideEffect[State]
/** INTERNAL API */
@InternalApi
final private[akka] case class Callback[State](effect: State Unit) extends SideEffect[State]
private[akka] class Callback[State](val sideEffect: State Unit) extends SideEffect[State] {
override def toString: String = "Callback"
}
/** INTERNAL API */
@InternalApi
final private[akka] class ReplyEffectImpl[ReplyMessage, State](replyTo: ActorRef[ReplyMessage], replyWithMessage: State ReplyMessage)
extends Callback[State](state replyTo ! replyWithMessage(state)) {
override def toString: String = "Reply"
}
/** INTERNAL API */
@InternalApi
final private[akka] class NoReplyEffectImpl[State]
extends Callback[State](_ ()) {
override def toString: String = "NoReply"
}
/** INTERNAL API */
@InternalApi
@ -30,7 +47,7 @@ object SideEffect {
* Create a ChainedEffect that can be run after Effects
*/
def apply[State](callback: State Unit): SideEffect[State] =
Callback(callback)
new Callback(callback)
/**
* Java API
@ -38,7 +55,7 @@ object SideEffect {
* Create a ChainedEffect that can be run after Effects
*/
def create[State](callback: function.Procedure[State]): SideEffect[State] =
Callback(s callback.apply(s))
new Callback(s callback.apply(s))
def stop[State](): SideEffect[State] = Stop.asInstanceOf[SideEffect[State]]
}

View file

@ -4,37 +4,41 @@
package akka.persistence.typed.internal
import scala.collection.immutable
import akka.persistence.typed.{ SideEffect, javadsl, scaladsl }
import scala.collection.{ immutable im }
import akka.annotation.InternalApi
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.NoReplyEffectImpl
/** INTERNAL API */
@InternalApi
private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Event, State] with scaladsl.Effect[Event, State] {
private[akka] abstract class EffectImpl[+Event, State] extends javadsl.ReplyEffect[Event, State] with scaladsl.ReplyEffect[Event, State] {
/* All events that will be persisted in this effect */
override def events: im.Seq[Event] = Nil
override def events: immutable.Seq[Event] = Nil
override def andThen(chainedEffect: SideEffect[State]): EffectImpl[Event, State] =
CompositeEffect(this, chainedEffect)
override def thenNoReply(): EffectImpl[Event, State] =
CompositeEffect(this, new NoReplyEffectImpl[State])
}
/** INTERNAL API */
@InternalApi
private[akka] object CompositeEffect {
def apply[Event, State](effect: Effect[Event, State], sideEffects: SideEffect[State]): EffectImpl[Event, State] =
def apply[Event, State](effect: scaladsl.Effect[Event, State], sideEffects: SideEffect[State]): CompositeEffect[Event, State] =
CompositeEffect[Event, State](effect, sideEffects :: Nil)
}
/** INTERNAL API */
@InternalApi
private[akka] final case class CompositeEffect[Event, State](
persistingEffect: Effect[Event, State],
_sideEffects: im.Seq[SideEffect[State]]) extends EffectImpl[Event, State] {
persistingEffect: scaladsl.Effect[Event, State],
_sideEffects: immutable.Seq[SideEffect[State]]) extends EffectImpl[Event, State] {
override val events = persistingEffect.events
override val events: immutable.Seq[Event] = persistingEffect.events
override def toString: String =
s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})"
@ -52,7 +56,7 @@ private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[
/** INTERNAL API */
@InternalApi
private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State]
private[akka] case class PersistAll[Event, State](override val events: immutable.Seq[Event]) extends EffectImpl[Event, State]
/** INTERNAL API */
@InternalApi

View file

@ -292,8 +292,8 @@ private[akka] object EventsourcedRunning {
case _: Stop.type @unchecked
Behaviors.stopped
case Callback(sideEffects)
sideEffects(state.state)
case callback: Callback[_]
callback.sideEffect(state.state)
Behaviors.same
case _

View file

@ -8,9 +8,10 @@ import akka.annotation.DoNotInherit
import akka.japi.function
import akka.persistence.typed.internal._
import akka.persistence.typed.{ SideEffect, Stop }
import scala.collection.JavaConverters._
import akka.persistence.typed.ExpectingReply
object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
@DoNotInherit sealed class EffectFactories[Command, Event, State] {
@ -40,6 +41,31 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
* This command is not handled, but it is not an error that it isn't.
*/
def unhandled: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def reply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
none.thenReply[ReplyMessage](cmd, new function.Function[State, ReplyMessage] {
override def apply(param: State): ReplyMessage = replyWithMessage
})
/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def noReply(): ReplyEffect[Event, State] =
none.thenNoReply()
}
/**
@ -66,4 +92,35 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
final def thenStop(): Effect[Event, State] =
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo().tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: function.Function[State, ReplyMessage]): ReplyEffect[Event, State] =
CompositeEffect(this, SideEffect[State](newState cmd.replyTo ! replyWithMessage(newState)))
/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def thenNoReply(): ReplyEffect[Event, State]
}
/**
* [[PersistentBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
@DoNotInherit abstract class ReplyEffect[+Event, State] extends Effect[Event, State] {
self: EffectImpl[Event, State]
}

View file

@ -16,16 +16,15 @@ import akka.persistence.typed.{ EventAdapter, _ }
import akka.persistence.typed.internal._
import scala.util.{ Failure, Success }
/** Java API */
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: PersistenceId, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
def this(persistenceId: PersistenceId) = {
this(persistenceId, None)
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])
}
def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Some(backoffSupervisorStrategy))
this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy))
}
/**
@ -169,7 +168,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe
})
}).eventAdapter(eventAdapter())
if (supervisorStrategy.isDefined)
if (supervisorStrategy.isPresent)
behavior.onPersistFailure(supervisorStrategy.get)
else
behavior
@ -177,3 +176,25 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe
}
/**
* FIXME This is not completed for javadsl yet. The compiler is not enforcing the replies yet.
*
* A [[PersistentBehavior]] that is enforcing that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
@ApiMayChange
abstract class PersistentBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) {
def this(persistenceId: PersistenceId) = {
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])
}
def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy))
}
// FIXME override commandHandler and commandHandlerBuilder to require the ReplyEffect return type,
// which is unfortunately intrusive to the CommandHandlerBuilder
}

View file

@ -4,13 +4,14 @@
package akka.persistence.typed.scaladsl
import akka.japi.function
import akka.annotation.DoNotInherit
import akka.persistence.typed.{ SideEffect, Stop }
import akka.persistence.typed.internal._
import scala.collection.{ immutable im }
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.ReplyEffectImpl
/**
* Factories for effects - how a persistent actor reacts on a command
*/
@ -58,6 +59,30 @@ object Effect {
* Side effects can be chained with `andThen`
*/
def stop[Event, State]: Effect[Event, State] = none.andThenStop()
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def reply[ReplyMessage, Event, State](cmd: ExpectingReply[ReplyMessage])(replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
none[Event, State].thenReply[ReplyMessage](cmd)(_ replyWithMessage)
/**
* When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def noReply[Event, State]: ReplyEffect[Event, State] =
none.thenNoReply()
}
/**
@ -91,5 +116,37 @@ trait Effect[+Event, State] {
def andThenStop(): Effect[Event, State] = {
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
}
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])(replyWithMessage: State ReplyMessage): ReplyEffect[Event, State] =
CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage))
/**
* When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def thenNoReply(): ReplyEffect[Event, State]
}
/**
* [[PersistentBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
*
* Not intended for user extension.
*/
@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State]

View file

@ -13,6 +13,8 @@ import akka.persistence.typed.EventAdapter
import akka.persistence.typed.internal._
import scala.util.Try
import akka.annotation.DoNotInherit
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
object PersistentBehavior {
@ -45,6 +47,18 @@ object PersistentBehavior {
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
/**
* Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
def withEnforcedReplies[Command <: ExpectingReply[_], Event, State](
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) ReplyEffect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
/**
* The `CommandHandler` defines how to act on commands. A `CommandHandler` is
* a function:
@ -70,7 +84,10 @@ object PersistentBehavior {
}
trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] {
/**
* Not intended for user extension.
*/
@DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] {
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.

View file

@ -4,6 +4,7 @@
package akka.persistence.typed.javadsl;
import akka.Done;
import akka.actor.typed.*;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors;
@ -17,6 +18,7 @@ import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.typed.EventAdapter;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.NoOpEventAdapter;
import akka.persistence.typed.PersistenceId;
import akka.stream.ActorMaterializer;
@ -85,6 +87,20 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
static class IncrementTwiceAndLog implements Command {
}
public static class IncrementWithConfirmation implements Command, ExpectingReply<Done> {
private final ActorRef<Done> replyTo;
public IncrementWithConfirmation(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
@Override
public ActorRef<Done> replyTo() {
return replyTo;
}
}
static class StopThenLog implements Command {
}
@ -251,7 +267,11 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder(State.class)
.matchCommand(Increment.class, (state, command) -> Effect().persist(new Incremented(1)))
.matchCommand(Increment.class, (state, command) ->
Effect().persist(new Incremented(1)))
.matchCommand(IncrementWithConfirmation.class, (state, command) ->
Effect().persist(new Incremented(1))
.thenReply(command, newState -> Done.getInstance()))
.matchCommand(GetValue.class, (state, command) -> {
command.replyTo.tell(state);
return Effect().none();
@ -361,6 +381,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
probe.expectMessage(new State(4, Arrays.asList(0, 1, 2, 3)));
}
@Test
public void thenReplyEffect() {
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c1b")));
TestProbe<Done> probe = testKit.createTestProbe();
c.tell(new IncrementWithConfirmation(probe.ref()));
probe.expectMessage(Done.getInstance());
}
@Test
public void handleTerminatedSignal() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();

View file

@ -0,0 +1,131 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
import akka.Done
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.ActorContext
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.EventEnvelope
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object PersistentBehaviorReplySpec {
def conf: Config = ConfigFactory.parseString(
s"""
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = INFO
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
""")
sealed trait Command[ReplyMessage] extends ExpectingReply[ReplyMessage]
final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command[Done]
final case class IncrementReplyLater(override val replyTo: ActorRef[Done]) extends Command[Done]
final case class ReplyNow(override val replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
sealed trait Event
final case class Incremented(delta: Int) extends Event
final case class State(value: Int, history: Vector[Int])
def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command[_]] =
Behaviors.setup(ctx counter(ctx, persistenceId))
def counter(
ctx: ActorContext[Command[_]],
persistenceId: PersistenceId): PersistentBehavior[Command[_], Event, State] = {
PersistentBehavior.withEnforcedReplies[Command[_], Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
commandHandler = (state, cmd) cmd match {
case cmd: IncrementWithConfirmation
Effect.persist(Incremented(1))
.thenReply(cmd)(_ Done)
case cmd: IncrementReplyLater
Effect.persist(Incremented(1))
.thenRun((_: State) ctx.self ! ReplyNow(cmd.replyTo))
.thenNoReply()
case cmd: ReplyNow
Effect.reply(cmd)(Done)
case query: GetValue
Effect.reply(query)(state)
},
eventHandler = (state, evt) evt match {
case Incremented(delta)
State(state.value + delta, state.history :+ state.value)
})
}
}
class PersistentBehaviorReplySpec extends ScalaTestWithActorTestKit(PersistentBehaviorSpec.conf) with WordSpecLike {
import PersistentBehaviorReplySpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
"A typed persistent actor with commands that are expecting replies" must {
"persist an event thenReply" in {
val c = spawn(counter(nextPid()))
val probe = TestProbe[Done]
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
c ! IncrementWithConfirmation(probe.ref)
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
probe.expectMessage(Done)
}
"persist an event thenReply later" in {
val c = spawn(counter(nextPid()))
val probe = TestProbe[Done]
c ! IncrementReplyLater(probe.ref)
probe.expectMessage(Done)
}
"reply to query command" in {
val c = spawn(counter(nextPid()))
val updateProbe = TestProbe[Done]
c ! IncrementWithConfirmation(updateProbe.ref)
val queryProbe = TestProbe[State]
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(1, Vector(0)))
}
}
}

View file

@ -26,6 +26,7 @@ import scala.concurrent.duration._
import scala.util.{ Success, Try }
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
@ -81,6 +82,7 @@ object PersistentBehaviorSpec {
final case object IncrementLater extends Command
final case object IncrementAfterReceiveTimeout extends Command
final case object IncrementTwiceAndThenLog extends Command
final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done]
final case object DoNothingAndThenLog extends Command
final case object EmptyEventsListAndThenLog extends Command
final case class GetValue(replyTo: ActorRef[State]) extends Command
@ -149,6 +151,10 @@ object PersistentBehaviorSpec {
case IncrementWithPersistAll(n)
Effect.persist((0 until n).map(_ Incremented(1)))
case cmd: IncrementWithConfirmation
Effect.persist(Incremented(1))
.thenReply(cmd)(newState Done)
case GetValue(replyTo)
replyTo ! state
Effect.none
@ -327,6 +333,18 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
}
"persist an event thenReply" in {
val c = spawn(counter(nextPid))
val probe = TestProbe[Done]
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
c ! IncrementWithConfirmation(probe.ref)
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
probe.expectMessage(Done)
}
/** Proves that side-effects are called when emitting an empty list of events */
"chainable side effects without events" in {
val loggingProbe = TestProbe[String]