thenReply Effect, #25482

* enforce ReplyEffect
This commit is contained in:
Patrik Nordwall 2018-09-27 13:38:46 +02:00
parent 4131036a12
commit 81c7adf4a1
13 changed files with 452 additions and 33 deletions

View file

@ -0,0 +1,16 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed
import akka.actor.typed.ActorRef
/**
* Commands may implement this trait to facilitate sending reply messages via `Effect.thenReply`.
*
* @tparam ReplyMessage The type of the reply message
*/
trait ExpectingReply[ReplyMessage] {
def replyTo: ActorRef[ReplyMessage]
}

View file

@ -4,6 +4,7 @@
package akka.persistence.typed
import akka.actor.typed.ActorRef
import akka.japi.function
import akka.annotation.{ DoNotInherit, InternalApi }
@ -19,7 +20,23 @@ sealed abstract class SideEffect[State]
/** INTERNAL API */
@InternalApi
final private[akka] case class Callback[State](effect: State Unit) extends SideEffect[State]
private[akka] class Callback[State](val sideEffect: State Unit) extends SideEffect[State] {
override def toString: String = "Callback"
}
/** INTERNAL API */
@InternalApi
final private[akka] class ReplyEffectImpl[ReplyMessage, State](replyTo: ActorRef[ReplyMessage], replyWithMessage: State ReplyMessage)
extends Callback[State](state replyTo ! replyWithMessage(state)) {
override def toString: String = "Reply"
}
/** INTERNAL API */
@InternalApi
final private[akka] class NoReplyEffectImpl[State]
extends Callback[State](_ ()) {
override def toString: String = "NoReply"
}
/** INTERNAL API */
@InternalApi
@ -30,7 +47,7 @@ object SideEffect {
* Create a ChainedEffect that can be run after Effects
*/
def apply[State](callback: State Unit): SideEffect[State] =
Callback(callback)
new Callback(callback)
/**
* Java API
@ -38,7 +55,7 @@ object SideEffect {
* Create a ChainedEffect that can be run after Effects
*/
def create[State](callback: function.Procedure[State]): SideEffect[State] =
Callback(s callback.apply(s))
new Callback(s callback.apply(s))
def stop[State](): SideEffect[State] = Stop.asInstanceOf[SideEffect[State]]
}

View file

@ -4,37 +4,41 @@
package akka.persistence.typed.internal
import scala.collection.immutable
import akka.persistence.typed.{ SideEffect, javadsl, scaladsl }
import scala.collection.{ immutable im }
import akka.annotation.InternalApi
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.NoReplyEffectImpl
/** INTERNAL API */
@InternalApi
private[akka] abstract class EffectImpl[+Event, State] extends javadsl.Effect[Event, State] with scaladsl.Effect[Event, State] {
private[akka] abstract class EffectImpl[+Event, State] extends javadsl.ReplyEffect[Event, State] with scaladsl.ReplyEffect[Event, State] {
/* All events that will be persisted in this effect */
override def events: im.Seq[Event] = Nil
override def events: immutable.Seq[Event] = Nil
override def andThen(chainedEffect: SideEffect[State]): EffectImpl[Event, State] =
CompositeEffect(this, chainedEffect)
override def thenNoReply(): EffectImpl[Event, State] =
CompositeEffect(this, new NoReplyEffectImpl[State])
}
/** INTERNAL API */
@InternalApi
private[akka] object CompositeEffect {
def apply[Event, State](effect: Effect[Event, State], sideEffects: SideEffect[State]): EffectImpl[Event, State] =
def apply[Event, State](effect: scaladsl.Effect[Event, State], sideEffects: SideEffect[State]): CompositeEffect[Event, State] =
CompositeEffect[Event, State](effect, sideEffects :: Nil)
}
/** INTERNAL API */
@InternalApi
private[akka] final case class CompositeEffect[Event, State](
persistingEffect: Effect[Event, State],
_sideEffects: im.Seq[SideEffect[State]]) extends EffectImpl[Event, State] {
persistingEffect: scaladsl.Effect[Event, State],
_sideEffects: immutable.Seq[SideEffect[State]]) extends EffectImpl[Event, State] {
override val events = persistingEffect.events
override val events: immutable.Seq[Event] = persistingEffect.events
override def toString: String =
s"CompositeEffect($persistingEffect, sideEffects: ${_sideEffects.size})"
@ -52,7 +56,7 @@ private[akka] case class Persist[Event, State](event: Event) extends EffectImpl[
/** INTERNAL API */
@InternalApi
private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends EffectImpl[Event, State]
private[akka] case class PersistAll[Event, State](override val events: immutable.Seq[Event]) extends EffectImpl[Event, State]
/** INTERNAL API */
@InternalApi

View file

@ -292,8 +292,8 @@ private[akka] object EventsourcedRunning {
case _: Stop.type @unchecked
Behaviors.stopped
case Callback(sideEffects)
sideEffects(state.state)
case callback: Callback[_]
callback.sideEffect(state.state)
Behaviors.same
case _

View file

@ -8,9 +8,10 @@ import akka.annotation.DoNotInherit
import akka.japi.function
import akka.persistence.typed.internal._
import akka.persistence.typed.{ SideEffect, Stop }
import scala.collection.JavaConverters._
import akka.persistence.typed.ExpectingReply
object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
@DoNotInherit sealed class EffectFactories[Command, Event, State] {
@ -40,6 +41,31 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
* This command is not handled, but it is not an error that it isn't.
*/
def unhandled: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def reply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
none.thenReply[ReplyMessage](cmd, new function.Function[State, ReplyMessage] {
override def apply(param: State): ReplyMessage = replyWithMessage
})
/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def noReply(): ReplyEffect[Event, State] =
none.thenNoReply()
}
/**
@ -66,4 +92,35 @@ object EffectFactory extends EffectFactories[Nothing, Nothing, Nothing]
final def thenStop(): Effect[Event, State] =
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo().tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehaviorWithEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage], replyWithMessage: function.Function[State, ReplyMessage]): ReplyEffect[Event, State] =
CompositeEffect(this, SideEffect[State](newState cmd.replyTo ! replyWithMessage(newState)))
/**
* When [[PersistentBehaviorWithEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def thenNoReply(): ReplyEffect[Event, State]
}
/**
* [[PersistentBehaviorWithEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
@DoNotInherit abstract class ReplyEffect[+Event, State] extends Effect[Event, State] {
self: EffectImpl[Event, State]
}

View file

@ -16,16 +16,15 @@ import akka.persistence.typed.{ EventAdapter, _ }
import akka.persistence.typed.internal._
import scala.util.{ Failure, Success }
/** Java API */
@ApiMayChange
abstract class PersistentBehavior[Command, Event, State >: Null] private (val persistenceId: PersistenceId, supervisorStrategy: Option[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
abstract class PersistentBehavior[Command, Event, State >: Null] private[akka] (val persistenceId: PersistenceId, supervisorStrategy: Optional[BackoffSupervisorStrategy]) extends DeferredBehavior[Command] {
def this(persistenceId: PersistenceId) = {
this(persistenceId, None)
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])
}
def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Some(backoffSupervisorStrategy))
this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy))
}
/**
@ -169,7 +168,7 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe
})
}).eventAdapter(eventAdapter())
if (supervisorStrategy.isDefined)
if (supervisorStrategy.isPresent)
behavior.onPersistFailure(supervisorStrategy.get)
else
behavior
@ -177,3 +176,25 @@ abstract class PersistentBehavior[Command, Event, State >: Null] private (val pe
}
/**
* FIXME This is not completed for javadsl yet. The compiler is not enforcing the replies yet.
*
* A [[PersistentBehavior]] that is enforcing that replies to commands are not forgotten.
* There will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with `Effects().reply`, `Effects().noReply`, [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
@ApiMayChange
abstract class PersistentBehaviorWithEnforcedReplies[Command, Event, State >: Null](persistenceId: PersistenceId, backoffSupervisorStrategy: Optional[BackoffSupervisorStrategy])
extends PersistentBehavior[Command, Event, State](persistenceId, backoffSupervisorStrategy) {
def this(persistenceId: PersistenceId) = {
this(persistenceId, Optional.empty[BackoffSupervisorStrategy])
}
def this(persistenceId: PersistenceId, backoffSupervisorStrategy: BackoffSupervisorStrategy) = {
this(persistenceId, Optional.ofNullable(backoffSupervisorStrategy))
}
// FIXME override commandHandler and commandHandlerBuilder to require the ReplyEffect return type,
// which is unfortunately intrusive to the CommandHandlerBuilder
}

View file

@ -4,13 +4,14 @@
package akka.persistence.typed.scaladsl
import akka.japi.function
import akka.annotation.DoNotInherit
import akka.persistence.typed.{ SideEffect, Stop }
import akka.persistence.typed.internal._
import scala.collection.{ immutable im }
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.ReplyEffectImpl
/**
* Factories for effects - how a persistent actor reacts on a command
*/
@ -58,6 +59,30 @@ object Effect {
* Side effects can be chained with `andThen`
*/
def stop[Event, State]: Effect[Event, State] = none.andThenStop()
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def reply[ReplyMessage, Event, State](cmd: ExpectingReply[ReplyMessage])(replyWithMessage: ReplyMessage): ReplyEffect[Event, State] =
none[Event, State].thenReply[ReplyMessage](cmd)(_ replyWithMessage)
/**
* When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `noReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def noReply[Event, State]: ReplyEffect[Event, State] =
none.thenNoReply()
}
/**
@ -91,5 +116,37 @@ trait Effect[+Event, State] {
def andThenStop(): Effect[Event, State] = {
CompositeEffect(this, Stop.asInstanceOf[SideEffect[State]])
}
/**
* Send a reply message to the command, which implements [[ExpectingReply]]. The type of the
* reply message must conform to the type specified in [[ExpectingReply.replyTo]] `ActorRef`.
*
* This has the same semantics as `cmd.replyTo.tell`.
*
* It is provided as a convenience (reducing boilerplate) and a way to enforce that replies are not forgotten
* when the `PersistentBehavior` is created with [[PersistentBehavior.withEnforcedReplies]]. When
* `withEnforcedReplies` is used there will be compilation errors if the returned effect isn't a [[ReplyEffect]].
* The reply message will be sent also if `withEnforcedReplies` isn't used, but then the compiler will not help
* finding mistakes.
*/
def thenReply[ReplyMessage](cmd: ExpectingReply[ReplyMessage])(replyWithMessage: State ReplyMessage): ReplyEffect[Event, State] =
CompositeEffect(this, new ReplyEffectImpl[ReplyMessage, State](cmd.replyTo, replyWithMessage))
/**
* When [[PersistentBehavior.withEnforcedReplies]] is used there will be compilation errors if the returned effect
* isn't a [[ReplyEffect]]. This `thenNoReply` can be used as a conscious decision that a reply shouldn't be
* sent for a specific command or the reply will be sent later.
*/
def thenNoReply(): ReplyEffect[Event, State]
}
/**
* [[PersistentBehavior.withEnforcedReplies]] can be used to enforce that replies are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
*
* Not intended for user extension.
*/
@DoNotInherit trait ReplyEffect[+Event, State] extends Effect[Event, State]

View file

@ -13,6 +13,8 @@ import akka.persistence.typed.EventAdapter
import akka.persistence.typed.internal._
import scala.util.Try
import akka.annotation.DoNotInherit
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
object PersistentBehavior {
@ -45,6 +47,18 @@ object PersistentBehavior {
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
/**
* Create a `Behavior` for a persistent actor that is enforcing that replies to commands are not forgotten.
* Then there will be compilation errors if the returned effect isn't a [[ReplyEffect]], which can be
* created with [[Effect.reply]], [[Effect.noReply]], [[Effect.thenReply]], or [[Effect.thenNoReply]].
*/
def withEnforcedReplies[Command <: ExpectingReply[_], Event, State](
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) ReplyEffect[Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
PersistentBehaviorImpl(persistenceId, emptyState, commandHandler, eventHandler)
/**
* The `CommandHandler` defines how to act on commands. A `CommandHandler` is
* a function:
@ -70,7 +84,10 @@ object PersistentBehavior {
}
trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] {
/**
* Not intended for user extension.
*/
@DoNotInherit trait PersistentBehavior[Command, Event, State] extends DeferredBehavior[Command] {
/**
* The `callback` function is called to notify the actor that the recovery process
* is finished.

View file

@ -4,6 +4,7 @@
package akka.persistence.typed.javadsl;
import akka.Done;
import akka.actor.typed.*;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors;
@ -17,6 +18,7 @@ import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.typed.EventAdapter;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.NoOpEventAdapter;
import akka.persistence.typed.PersistenceId;
import akka.stream.ActorMaterializer;
@ -85,6 +87,20 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
static class IncrementTwiceAndLog implements Command {
}
public static class IncrementWithConfirmation implements Command, ExpectingReply<Done> {
private final ActorRef<Done> replyTo;
public IncrementWithConfirmation(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
@Override
public ActorRef<Done> replyTo() {
return replyTo;
}
}
static class StopThenLog implements Command {
}
@ -251,7 +267,11 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder(State.class)
.matchCommand(Increment.class, (state, command) -> Effect().persist(new Incremented(1)))
.matchCommand(Increment.class, (state, command) ->
Effect().persist(new Incremented(1)))
.matchCommand(IncrementWithConfirmation.class, (state, command) ->
Effect().persist(new Incremented(1))
.thenReply(command, newState -> Done.getInstance()))
.matchCommand(GetValue.class, (state, command) -> {
command.replyTo.tell(state);
return Effect().none();
@ -361,6 +381,14 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
probe.expectMessage(new State(4, Arrays.asList(0, 1, 2, 3)));
}
@Test
public void thenReplyEffect() {
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c1b")));
TestProbe<Done> probe = testKit.createTestProbe();
c.tell(new IncrementWithConfirmation(probe.ref()));
probe.expectMessage(Done.getInstance());
}
@Test
public void handleTerminatedSignal() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();

View file

@ -0,0 +1,131 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.util.Success
import scala.util.Try
import akka.Done
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.Terminated
import akka.actor.typed.scaladsl.ActorContext
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
import akka.persistence.query.EventEnvelope
import akka.persistence.query.PersistenceQuery
import akka.persistence.query.Sequence
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object PersistentBehaviorReplySpec {
def conf: Config = ConfigFactory.parseString(
s"""
akka.loglevel = INFO
# akka.persistence.typed.log-stashing = INFO
akka.persistence.journal.leveldb.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
""")
sealed trait Command[ReplyMessage] extends ExpectingReply[ReplyMessage]
final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command[Done]
final case class IncrementReplyLater(override val replyTo: ActorRef[Done]) extends Command[Done]
final case class ReplyNow(override val replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
sealed trait Event
final case class Incremented(delta: Int) extends Event
final case class State(value: Int, history: Vector[Int])
def counter(persistenceId: PersistenceId)(implicit system: ActorSystem[_]): Behavior[Command[_]] =
Behaviors.setup(ctx counter(ctx, persistenceId))
def counter(
ctx: ActorContext[Command[_]],
persistenceId: PersistenceId): PersistentBehavior[Command[_], Event, State] = {
PersistentBehavior.withEnforcedReplies[Command[_], Event, State](
persistenceId,
emptyState = State(0, Vector.empty),
commandHandler = (state, cmd) cmd match {
case cmd: IncrementWithConfirmation
Effect.persist(Incremented(1))
.thenReply(cmd)(_ Done)
case cmd: IncrementReplyLater
Effect.persist(Incremented(1))
.thenRun((_: State) ctx.self ! ReplyNow(cmd.replyTo))
.thenNoReply()
case cmd: ReplyNow
Effect.reply(cmd)(Done)
case query: GetValue
Effect.reply(query)(state)
},
eventHandler = (state, evt) evt match {
case Incremented(delta)
State(state.value + delta, state.history :+ state.value)
})
}
}
class PersistentBehaviorReplySpec extends ScalaTestWithActorTestKit(PersistentBehaviorSpec.conf) with WordSpecLike {
import PersistentBehaviorReplySpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
"A typed persistent actor with commands that are expecting replies" must {
"persist an event thenReply" in {
val c = spawn(counter(nextPid()))
val probe = TestProbe[Done]
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
c ! IncrementWithConfirmation(probe.ref)
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
probe.expectMessage(Done)
}
"persist an event thenReply later" in {
val c = spawn(counter(nextPid()))
val probe = TestProbe[Done]
c ! IncrementReplyLater(probe.ref)
probe.expectMessage(Done)
}
"reply to query command" in {
val c = spawn(counter(nextPid()))
val updateProbe = TestProbe[Done]
c ! IncrementWithConfirmation(updateProbe.ref)
val queryProbe = TestProbe[State]
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(1, Vector(0)))
}
}
}

View file

@ -26,6 +26,7 @@ import scala.concurrent.duration._
import scala.util.{ Success, Try }
import akka.persistence.journal.inmem.InmemJournal
import akka.persistence.typed.ExpectingReply
import akka.persistence.typed.PersistenceId
import org.scalatest.WordSpecLike
@ -81,6 +82,7 @@ object PersistentBehaviorSpec {
final case object IncrementLater extends Command
final case object IncrementAfterReceiveTimeout extends Command
final case object IncrementTwiceAndThenLog extends Command
final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command with ExpectingReply[Done]
final case object DoNothingAndThenLog extends Command
final case object EmptyEventsListAndThenLog extends Command
final case class GetValue(replyTo: ActorRef[State]) extends Command
@ -149,6 +151,10 @@ object PersistentBehaviorSpec {
case IncrementWithPersistAll(n)
Effect.persist((0 until n).map(_ Incremented(1)))
case cmd: IncrementWithConfirmation
Effect.persist(Incremented(1))
.thenReply(cmd)(newState Done)
case GetValue(replyTo)
replyTo ! state
Effect.none
@ -327,6 +333,18 @@ class PersistentBehaviorSpec extends ScalaTestWithActorTestKit(PersistentBehavio
}
"persist an event thenReply" in {
val c = spawn(counter(nextPid))
val probe = TestProbe[Done]
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
c ! IncrementWithConfirmation(probe.ref)
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
probe.expectMessage(Done)
}
/** Proves that side-effects are called when emitting an empty list of events */
"chainable side effects without events" in {
val loggingProbe = TestProbe[String]