Refactor PersistentActorJavaDslTest (#25728)

* Move empty state and increment logic into the State class

* Improve types in tapPersistentActor

* Extract CounterBehavior class

Tests override functionality as needed to make assertions.

* Use singleton enum pattern from Effective Java

This ensures that serialization/deserialization is handled correctly.

* Extract command handler methods from lambdas

* Inline special event values

This makes it easier to understand what it's doing.
This commit is contained in:
Tim Moore 2018-12-19 07:12:28 +10:30 committed by Patrik Nordwall
parent bf3bc4d6e1
commit c2dd5ef759

View file

@ -6,11 +6,10 @@ package akka.persistence.typed.javadsl;
import akka.Done;
import akka.actor.typed.*;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
import akka.japi.function.Function3;
import akka.japi.function.Function;
import akka.persistence.SnapshotMetadata;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.NoOffset;
@ -19,7 +18,6 @@ import akka.persistence.query.Sequence;
import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal;
import akka.persistence.typed.EventAdapter;
import akka.persistence.typed.ExpectingReply;
import akka.persistence.typed.NoOpEventAdapter;
import akka.persistence.typed.PersistenceId;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
@ -37,6 +35,7 @@ import java.io.Serializable;
import java.time.Duration;
import java.util.*;
import static akka.Done.done;
import static akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec.*;
import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
@ -49,10 +48,6 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
static final Incremented timeoutEvent = new Incremented(100);
static final State emptyState = new State(0, Collections.emptyList());
static final Incremented terminatedEvent = new Incremented(10);
private LeveldbReadJournal queries = PersistenceQuery.get(Adapter.toUntyped(testKit.system()))
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
@ -61,34 +56,31 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
interface Command extends Serializable {
}
public static class Increment implements Command {
private Increment() {
}
public static Increment instance = new Increment();
public enum Increment implements Command {
INSTANCE
}
public static class Increment100OnTimeout implements Command {
private Increment100OnTimeout() {
}
public static Increment100OnTimeout instance = new Increment100OnTimeout();
public enum Increment100OnTimeout implements Command {
INSTANCE
}
static class IncrementLater implements Command {
public enum IncrementLater implements Command {
INSTANCE
}
static class DelayFinished implements Command {
public enum DelayFinished implements Command {
INSTANCE
}
static class EmptyEventsListAndThenLog implements Command {
public enum EmptyEventsListAndThenLog implements Command {
INSTANCE
}
static class IncrementTwiceAndLog implements Command {
public enum IncrementTwiceAndLog implements Command {
INSTANCE
}
public static class IncrementWithConfirmation implements Command, ExpectingReply<Done> {
private final ActorRef<Done> replyTo;
public IncrementWithConfirmation(ActorRef<Done> replyTo) {
@ -101,18 +93,25 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
}
}
static class StopThenLog implements Command {
public enum StopThenLog implements Command {
INSTANCE
}
public static class Timeout implements Command {
public enum Timeout implements Command {
INSTANCE
}
public static class GetValue implements Command {
public static class GetValue implements Command, ExpectingReply<State> {
private final ActorRef<State> replyTo;
public GetValue(ActorRef<State> replyTo) {
this.replyTo = replyTo;
}
@Override
public ActorRef<State> replyTo() {
return replyTo;
}
}
public static class Incremented implements Serializable {
@ -148,11 +147,19 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
final int value;
final List<Integer> history;
static final State EMPTY = new State(0, Collections.emptyList());
public State(int value, List<Integer> history) {
this.value = value;
this.history = history;
}
State incrementedBy(int delta) {
List<Integer> newHistory = new ArrayList<>(history);
newHistory.add(value);
return new State(value + delta, newHistory);
}
@Override
public String toString() {
return "State{" +
@ -177,188 +184,121 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
}
}
public static class Tick {
private Tick() {
}
public static Tick instance = new Tick();
}
private static String loggingOne = "one";
private Behavior<Command> counter(PersistenceId persistenceId, ActorRef<Pair<State, Incremented>> probe) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>());
}
private Behavior<Command> counter(PersistenceId persistenceId,
ActorRef<Pair<State, Incremented>> probe,
Function<Incremented, Set<String>> tagger) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>());
}
private Behavior<Command> counter(PersistenceId persistenceId,
ActorRef<Pair<State, Incremented>> probe,
EventAdapter<Incremented, ?> transformer) {
ActorRef<String> loggingProbe = TestProbe.create(String.class, testKit.system()).ref();
ActorRef<Optional<Throwable>> snapshotProbe = TestProbe.<Optional<Throwable>>create(testKit.system()).ref();
return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer);
public enum Tick {
INSTANCE
}
private Behavior<Command> counter(PersistenceId persistenceId) {
return counter(persistenceId,
TestProbe.<Pair<State, Incremented>>create(testKit.system()).ref(),
TestProbe.<String>create(testKit.system()).ref(),
(s, i, l) -> false,
(i) -> Collections.emptySet(),
TestProbe.<Optional<Throwable>>create(testKit.system()).ref(),
new NoOpEventAdapter<>()
);
return Behaviors.setup(ctx -> new CounterBehavior(persistenceId, ctx));
}
private Behavior<Command> counter(
PersistenceId persistenceId,
Function3<State, Incremented, Long, Boolean> snapshot,
ActorRef<Optional<Throwable>> snapshotProbe
) {
return counter(persistenceId,
testKit.<Pair<State, Incremented>>createTestProbe().ref(),
testKit.<String>createTestProbe().ref(),
snapshot,
e -> Collections.emptySet(),
snapshotProbe,
new NoOpEventAdapter<>());
}
@SuppressWarnings("unused")
private static class CounterBehavior extends EventSourcedBehavior<Command, Incremented, State> {
private final ActorContext<Command> ctx;
private Behavior<Command> counter(
PersistenceId persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe) {
return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(),
TestProbe.<Optional<Throwable>>create(testKit.system()).ref(),
new NoOpEventAdapter<>()
);
}
CounterBehavior(PersistenceId persistentId, ActorContext<Command> ctx) {
super(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1));
this.ctx = ctx;
}
private Behavior<Command> counter(
PersistenceId persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
Function3<State, Incremented, Long, Boolean> snapshot) {
return counter(persistentId, eventProbe, testKit.<String>createTestProbe().ref(), snapshot, (e) -> Collections.emptySet(),
TestProbe.<Optional<Throwable>>create(testKit.system()).ref(), new NoOpEventAdapter<>()
);
}
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder(State.class)
.matchCommand(Increment.class, this::increment)
.matchCommand(IncrementWithConfirmation.class, this::incrementWithConfirmation)
.matchCommand(GetValue.class, this::getValue)
.matchCommand(IncrementLater.class, this::incrementLater)
.matchCommand(DelayFinished.class, this::delayFinished)
.matchCommand(Increment100OnTimeout.class, this::increment100OnTimeout)
.matchCommand(Timeout.class, this::timeout)
.matchCommand(EmptyEventsListAndThenLog.class, this::emptyEventsListAndThenLog)
.matchCommand(StopThenLog.class, this::stopThenLog)
.matchCommand(IncrementTwiceAndLog.class, this::incrementTwiceAndLog)
.build();
private static <A> Behavior<Command> counter(
PersistenceId persistentId,
ActorRef<Pair<State, Incremented>> eventProbe,
ActorRef<String> loggingProbe,
Function3<State, Incremented, Long, Boolean> snapshot,
Function<Incremented, Set<String>> tagsFunction,
ActorRef<Optional<Throwable>> snapshotProbe,
EventAdapter<Incremented, A> transformer) {
}
return Behaviors.setup(ctx -> {
return new EventSourcedBehavior<Command, Incremented, State>(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) {
@Override
public CommandHandler<Command, Incremented, State> commandHandler() {
return commandHandlerBuilder(State.class)
.matchCommand(Increment.class, (state, command) ->
Effect().persist(new Incremented(1)))
.matchCommand(IncrementWithConfirmation.class, (state, command) ->
Effect().persist(new Incremented(1))
.thenReply(command, newState -> Done.getInstance()))
.matchCommand(GetValue.class, (state, command) -> {
command.replyTo.tell(state);
return Effect().none();
})
.matchCommand(IncrementLater.class, (state, command) -> {
ActorRef<Object> delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> {
timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10));
return Behaviors.receive((context, o) -> Behaviors.stopped());
}));
ctx.watchWith(delay, new DelayFinished());
return Effect().none();
})
.matchCommand(DelayFinished.class, (state, finished) -> Effect().persist(new Incremented(10)))
.matchCommand(Increment100OnTimeout.class, (state, msg) -> {
ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout());
return Effect().none();
})
.matchCommand(Timeout.class,
(state, msg) -> Effect().persist(timeoutEvent))
.matchCommand(EmptyEventsListAndThenLog.class, (state, msg) -> Effect().persist(Collections.emptyList())
.thenRun(s -> loggingProbe.tell(loggingOne)))
.matchCommand(StopThenLog.class,
(state, msg) -> Effect().stop()
.thenRun(s -> loggingProbe.tell(loggingOne)))
.matchCommand(IncrementTwiceAndLog.class,
(state, msg) -> Effect().persist(
Arrays.asList(new Incremented(1), new Incremented(1)))
.thenRun(s -> loggingProbe.tell(loggingOne)))
.build();
private Effect<Incremented, State> increment(State state, Increment command) {
return Effect().persist(new Incremented(1));
}
}
private ReplyEffect<Incremented, State> incrementWithConfirmation(State state, IncrementWithConfirmation command) {
return Effect()
.persist(new Incremented(1))
.thenReply(command, newState -> done());
}
@Override
public EventHandler<State, Incremented> eventHandler() {
return eventHandlerBuilder()
.matchEvent(Incremented.class, (state, event) -> {
List<Integer> newHistory = new ArrayList<>(state.history);
newHistory.add(state.value);
eventProbe.tell(Pair.create(state, event));
return new State(state.value + event.delta, newHistory);
})
.build();
private ReplyEffect<Incremented, State> getValue(State state, GetValue command) {
return Effect().reply(command, state);
}
}
private Effect<Incremented, State> incrementLater(State state, IncrementLater command) {
ActorRef<Object> delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> {
timers.startSingleTimer(Tick.INSTANCE, Tick.INSTANCE, Duration.ofMillis(10));
return Behaviors.receive((context, o) -> Behaviors.stopped());
}));
ctx.watchWith(delay, DelayFinished.INSTANCE);
return Effect().none();
}
@Override
public State emptyState() {
return emptyState;
}
private Effect<Incremented, State> delayFinished(State state, DelayFinished command) {
return Effect().persist(new Incremented(10));
}
@Override
public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) {
try {
return snapshot.apply(state, event, sequenceNr);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Effect<Incremented, State> increment100OnTimeout(State state, Increment100OnTimeout command) {
ctx.setReceiveTimeout(Duration.ofMillis(10), Timeout.INSTANCE);
return Effect().none();
}
@Override
public Set<String> tagsFor(Incremented incremented) {
try {
return tagsFunction.apply(incremented);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private Effect<Incremented, State> timeout(State state, Timeout command) {
return Effect().persist(new Incremented(100));
}
@Override
public void onSnapshot(SnapshotMetadata meta, Optional<Throwable> result) {
snapshotProbe.tell(result);
}
private Effect<Incremented, State> emptyEventsListAndThenLog(State state, EmptyEventsListAndThenLog command) {
return Effect()
.persist(Collections.emptyList())
.thenRun(s -> log());
}
private Effect<Incremented, State> stopThenLog(State state, StopThenLog command) {
return Effect()
.stop()
.thenRun(s -> log());
}
@Override
public EventAdapter<Incremented, A> eventAdapter() {
return transformer;
}
};
});
private Effect<Incremented, State> incrementTwiceAndLog(State state, IncrementTwiceAndLog command) {
return Effect()
.persist(Arrays.asList(new Incremented(1), new Incremented(1)))
.thenRun(s -> log());
}
@Override
public EventHandler<State, Incremented> eventHandler() {
return eventHandlerBuilder()
.matchEvent(Incremented.class, this::applyIncremented)
.build();
}
@Override
public State emptyState() {
return State.EMPTY;
}
protected State applyIncremented(State state, Incremented event) {
// override to probe for events
return state.incrementedBy(event.delta);
}
protected void log() {
// override to probe for logs
}
}
@Test
public void persistEvents() {
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c1")));
TestProbe<State> probe = testKit.createTestProbe();
c.tell(Increment.instance);
c.tell(Increment.INSTANCE);
c.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(1, singletonList(0)));
}
@ -367,16 +307,16 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public void replyStoredEvents() {
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c2")));
TestProbe<State> probe = testKit.createTestProbe();
c.tell(Increment.instance);
c.tell(Increment.instance);
c.tell(Increment.instance);
c.tell(Increment.INSTANCE);
c.tell(Increment.INSTANCE);
c.tell(Increment.INSTANCE);
c.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
ActorRef<Command> c2 = testKit.spawn(counter(new PersistenceId("c2")));
c2.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
c2.tell(Increment.instance);
c2.tell(Increment.INSTANCE);
c2.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(4, Arrays.asList(0, 1, 2, 3)));
}
@ -392,28 +332,53 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void handleTerminatedSignal() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c3"), eventHandlerProbe.ref()));
c.tell(Increment.instance);
c.tell(new IncrementLater());
eventHandlerProbe.expectMessage(Pair.create(emptyState, new Incremented(1)));
eventHandlerProbe.expectMessage(Pair.create(new State(1, Collections.singletonList(0)), terminatedEvent));
Behavior<Command> counter = Behaviors.setup(ctx ->
new CounterBehavior(new PersistenceId("c3"), ctx) {
@Override
protected State applyIncremented(State state, Incremented event) {
eventHandlerProbe.ref().tell(Pair.create(state, event));
return super.applyIncremented(state, event);
}
}
);
ActorRef<Command> c = testKit.spawn(counter);
c.tell(Increment.INSTANCE);
c.tell(IncrementLater.INSTANCE);
eventHandlerProbe.expectMessage(Pair.create(State.EMPTY, new Incremented(1)));
eventHandlerProbe.expectMessage(Pair.create(new State(1, Collections.singletonList(0)), new Incremented(10)));
}
@Test
public void handleReceiveTimeout() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c4"), eventHandlerProbe.ref()));
c.tell(new Increment100OnTimeout());
eventHandlerProbe.expectMessage(Pair.create(emptyState, timeoutEvent));
Behavior<Command> counter = Behaviors.setup(ctx ->
new CounterBehavior(new PersistenceId("c4"), ctx) {
@Override
protected State applyIncremented(State state, Incremented event) {
eventHandlerProbe.ref().tell(Pair.create(state, event));
return super.applyIncremented(state, event);
}
}
);
ActorRef<Command> c = testKit.spawn(counter);
c.tell(Increment100OnTimeout.INSTANCE);
eventHandlerProbe.expectMessage(Pair.create(State.EMPTY, new Incremented(100)));
}
@Test
public void chainableSideEffectsWithEvents() {
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();
TestProbe<String> loggingProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c5"), eventHandlerProbe.ref(), loggingProbe.ref()));
c.tell(new EmptyEventsListAndThenLog());
loggingProbe.expectMessage(loggingOne);
Behavior<Command> counter = Behaviors.setup(ctx ->
new CounterBehavior(new PersistenceId("c5"), ctx) {
@Override
protected void log() {
loggingProbe.ref().tell("logged");
}
}
);
ActorRef<Command> c = testKit.spawn(counter);
c.tell(EmptyEventsListAndThenLog.INSTANCE);
loggingProbe.expectMessage("logged");
}
@Test
@ -425,7 +390,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
ActorRef<Command> c = testKit.spawn(behavior);
TestProbe<State> probe = testKit.createTestProbe();
c.tell(Increment.instance);
c.tell(Increment.INSTANCE);
c.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(1, singletonList(0)));
}
@ -433,31 +398,51 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void snapshot() {
TestProbe<Optional<Throwable>> snapshotProbe = testKit.createTestProbe();
Behavior<Command> snapshoter = counter(new PersistenceId("c11"), (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref());
Behavior<Command> snapshoter = Behaviors.setup(ctx ->
new CounterBehavior(new PersistenceId("snapshot"), ctx) {
@Override
public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) {
return state.value % 2 == 0;
}
@Override
public void onSnapshot(SnapshotMetadata meta, Optional<Throwable> result) {
snapshotProbe.ref().tell(result);
}
}
);
ActorRef<Command> c = testKit.spawn(snapshoter);
c.tell(Increment.instance);
c.tell(Increment.instance);
c.tell(Increment.INSTANCE);
c.tell(Increment.INSTANCE);
snapshotProbe.expectMessage(Optional.empty());
c.tell(Increment.instance);
TestProbe<State> probe = testKit.createTestProbe();
c.tell(Increment.INSTANCE);
c.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
TestProbe<State> stateProbe = testKit.createTestProbe();
c.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
snapshoter = counter(new PersistenceId("c11"), eventProbe.ref(), (s, e, l) -> s.value % 2 == 0);
ActorRef<Command> c2 = testKit.spawn(snapshoter);
TestProbe<Pair<State, Incremented>> eventHandlerProbe = testKit.createTestProbe();
Behavior<Command> recovered = Behaviors.setup(ctx ->
new CounterBehavior(new PersistenceId("snapshot"), ctx) {
@Override
protected State applyIncremented(State state, Incremented event) {
eventHandlerProbe.ref().tell(Pair.create(state, event));
return super.applyIncremented(state, event);
}
}
);
ActorRef<Command> c2 = testKit.spawn(recovered);
// First 2 are snapshot
eventProbe.expectMessage(Pair.create(new State(2, Arrays.asList(0, 1)), new Incremented(1)));
c2.tell(new GetValue(probe.ref()));
probe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
eventHandlerProbe.expectMessage(Pair.create(new State(2, Arrays.asList(0, 1)), new Incremented(1)));
c2.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(3, Arrays.asList(0, 1, 2)));
}
@Test
public void stopThenLog() {
TestProbe<State> probe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("c12")));
c.tell(new StopThenLog());
c.tell(StopThenLog.INSTANCE);
probe.expectTerminated(c, Duration.ofSeconds(1));
}
@ -465,31 +450,40 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
public void tapPersistentActor() {
TestProbe<Object> interceptProbe = testKit.createTestProbe();
TestProbe<Signal> signalProbe = testKit.createTestProbe();
BehaviorInterceptor<Object, Object> tap = new BehaviorInterceptor<Object, Object>() {
BehaviorInterceptor<Command, Command> tap = new BehaviorInterceptor<Command, Command>() {
@Override
public Behavior<Object> aroundReceive(ActorContext<Object> ctx, Object msg, ReceiveTarget<Object> target) {
public Behavior<Command> aroundReceive(akka.actor.typed.ActorContext<Command> ctx, Command msg, ReceiveTarget<Command> target) {
interceptProbe.ref().tell(msg);
return target.apply(ctx, msg);
}
@Override
public Behavior<Object> aroundSignal(ActorContext<Object> ctx, Signal signal, SignalTarget<Object> target) {
public Behavior<Command> aroundSignal(akka.actor.typed.ActorContext<Command> ctx, Signal signal, SignalTarget<Command> target) {
signalProbe.ref().tell(signal);
return target.apply(ctx, signal);
}
};
ActorRef<Command> c = testKit.spawn(Behaviors.intercept(tap, ((Behavior)counter(new PersistenceId("tap1")))));
c.tell(Increment.instance);
interceptProbe.expectMessage(Increment.instance);
ActorRef<Command> c = testKit.spawn(Behaviors.intercept(tap, counter(new PersistenceId("tap1"))));
c.tell(Increment.INSTANCE);
interceptProbe.expectMessage(Increment.INSTANCE);
signalProbe.expectNoMessage();
}
@Test
public void tagEvent() throws Exception {
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
Behavior<Command> tagger = Behaviors.setup(ctx ->
new CounterBehavior(new PersistenceId("tagging"), ctx) {
@Override
public Set<String> tagsFor(Incremented incremented) {
return Sets.newHashSet("tag1", "tag2");
}
}
);
ActorRef<Command> c = testKit.spawn(tagger);
c.tell(Increment.INSTANCE);
TestProbe<State> stateProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("tagging"), eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2")));
c.tell(new Increment());
c.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
@ -502,11 +496,19 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
@Test
public void transformEvent() throws Exception {
TestProbe<Pair<State, Incremented>> eventProbe = testKit.createTestProbe();
TestProbe<State> stateProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter()));
Behavior<Command> transformer = Behaviors.setup(ctx ->
new CounterBehavior(new PersistenceId("transform"), ctx) {
private final EventAdapter<Incremented, ?> adapter = new WrapperEventAdapter();
public EventAdapter<Incremented, ?> eventAdapter() {
return adapter;
}
}
);
ActorRef<Command> c = testKit.spawn(transformer);
c.tell(new Increment());
c.tell(Increment.INSTANCE);
TestProbe<State> stateProbe = testKit.createTestProbe();
c.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
@ -516,7 +518,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1)))
), events);
ActorRef<Command> c2 = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter()));
ActorRef<Command> c2 = testKit.spawn(transformer);
c2.tell(new GetValue(stateProbe.ref()));
stateProbe.expectMessage(new State(1, Collections.singletonList(0)));
}