diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 82973a578e..bba31d1b9b 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -6,11 +6,10 @@ package akka.persistence.typed.javadsl; import akka.Done; import akka.actor.typed.*; +import akka.actor.typed.javadsl.ActorContext; import akka.actor.typed.javadsl.Adapter; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; -import akka.japi.function.Function3; -import akka.japi.function.Function; import akka.persistence.SnapshotMetadata; import akka.persistence.query.EventEnvelope; import akka.persistence.query.NoOffset; @@ -19,7 +18,6 @@ import akka.persistence.query.Sequence; import akka.persistence.query.journal.leveldb.javadsl.LeveldbReadJournal; import akka.persistence.typed.EventAdapter; import akka.persistence.typed.ExpectingReply; -import akka.persistence.typed.NoOpEventAdapter; import akka.persistence.typed.PersistenceId; import akka.stream.ActorMaterializer; import akka.stream.javadsl.Sink; @@ -37,6 +35,7 @@ import java.io.Serializable; import java.time.Duration; import java.util.*; +import static akka.Done.done; import static akka.persistence.typed.scaladsl.EventSourcedBehaviorSpec.*; import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; @@ -49,10 +48,6 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public static final TestKitJunitResource testKit = new TestKitJunitResource(config); - static final Incremented timeoutEvent = new Incremented(100); - static final State emptyState = new State(0, Collections.emptyList()); - static final Incremented terminatedEvent = new Incremented(10); - private LeveldbReadJournal queries = PersistenceQuery.get(Adapter.toUntyped(testKit.system())) .getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier()); @@ -61,34 +56,31 @@ public class PersistentActorJavaDslTest extends JUnitSuite { interface Command extends Serializable { } - public static class Increment implements Command { - private Increment() { - } - - public static Increment instance = new Increment(); + public enum Increment implements Command { + INSTANCE } - public static class Increment100OnTimeout implements Command { - private Increment100OnTimeout() { - } - - public static Increment100OnTimeout instance = new Increment100OnTimeout(); + public enum Increment100OnTimeout implements Command { + INSTANCE } - static class IncrementLater implements Command { + public enum IncrementLater implements Command { + INSTANCE } - static class DelayFinished implements Command { + public enum DelayFinished implements Command { + INSTANCE } - static class EmptyEventsListAndThenLog implements Command { + public enum EmptyEventsListAndThenLog implements Command { + INSTANCE } - static class IncrementTwiceAndLog implements Command { + public enum IncrementTwiceAndLog implements Command { + INSTANCE } public static class IncrementWithConfirmation implements Command, ExpectingReply { - private final ActorRef replyTo; public IncrementWithConfirmation(ActorRef replyTo) { @@ -101,18 +93,25 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } } - static class StopThenLog implements Command { + public enum StopThenLog implements Command { + INSTANCE } - public static class Timeout implements Command { + public enum Timeout implements Command { + INSTANCE } - public static class GetValue implements Command { + public static class GetValue implements Command, ExpectingReply { private final ActorRef replyTo; public GetValue(ActorRef replyTo) { this.replyTo = replyTo; } + + @Override + public ActorRef replyTo() { + return replyTo; + } } public static class Incremented implements Serializable { @@ -148,11 +147,19 @@ public class PersistentActorJavaDslTest extends JUnitSuite { final int value; final List history; + static final State EMPTY = new State(0, Collections.emptyList()); + public State(int value, List history) { this.value = value; this.history = history; } + State incrementedBy(int delta) { + List newHistory = new ArrayList<>(history); + newHistory.add(value); + return new State(value + delta, newHistory); + } + @Override public String toString() { return "State{" + @@ -177,188 +184,121 @@ public class PersistentActorJavaDslTest extends JUnitSuite { } } - public static class Tick { - private Tick() { - } - - public static Tick instance = new Tick(); - } - - private static String loggingOne = "one"; - - - private Behavior counter(PersistenceId persistenceId, ActorRef> probe) { - ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); - ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); - return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, (e) -> Collections.emptySet(), snapshotProbe, new NoOpEventAdapter<>()); - } - - private Behavior counter(PersistenceId persistenceId, - ActorRef> probe, - Function> tagger) { - ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); - ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); - return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, tagger, snapshotProbe, new NoOpEventAdapter<>()); - } - - private Behavior counter(PersistenceId persistenceId, - ActorRef> probe, - EventAdapter transformer) { - ActorRef loggingProbe = TestProbe.create(String.class, testKit.system()).ref(); - ActorRef> snapshotProbe = TestProbe.>create(testKit.system()).ref(); - return counter(persistenceId, probe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), snapshotProbe, transformer); + public enum Tick { + INSTANCE } private Behavior counter(PersistenceId persistenceId) { - return counter(persistenceId, - TestProbe.>create(testKit.system()).ref(), - TestProbe.create(testKit.system()).ref(), - (s, i, l) -> false, - (i) -> Collections.emptySet(), - TestProbe.>create(testKit.system()).ref(), - new NoOpEventAdapter<>() - ); + return Behaviors.setup(ctx -> new CounterBehavior(persistenceId, ctx)); } - private Behavior counter( - PersistenceId persistenceId, - Function3 snapshot, - ActorRef> snapshotProbe - ) { - return counter(persistenceId, - testKit.>createTestProbe().ref(), - testKit.createTestProbe().ref(), - snapshot, - e -> Collections.emptySet(), - snapshotProbe, - new NoOpEventAdapter<>()); - } + @SuppressWarnings("unused") + private static class CounterBehavior extends EventSourcedBehavior { + private final ActorContext ctx; - private Behavior counter( - PersistenceId persistentId, - ActorRef> eventProbe, - ActorRef loggingProbe) { - return counter(persistentId, eventProbe, loggingProbe, (s, i, l) -> false, e -> Collections.emptySet(), - TestProbe.>create(testKit.system()).ref(), - new NoOpEventAdapter<>() - ); - } + CounterBehavior(PersistenceId persistentId, ActorContext ctx) { + super(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)); + this.ctx = ctx; + } - private Behavior counter( - PersistenceId persistentId, - ActorRef> eventProbe, - Function3 snapshot) { - return counter(persistentId, eventProbe, testKit.createTestProbe().ref(), snapshot, (e) -> Collections.emptySet(), - TestProbe.>create(testKit.system()).ref(), new NoOpEventAdapter<>() - ); - } + @Override + public CommandHandler commandHandler() { + return commandHandlerBuilder(State.class) + .matchCommand(Increment.class, this::increment) + .matchCommand(IncrementWithConfirmation.class, this::incrementWithConfirmation) + .matchCommand(GetValue.class, this::getValue) + .matchCommand(IncrementLater.class, this::incrementLater) + .matchCommand(DelayFinished.class, this::delayFinished) + .matchCommand(Increment100OnTimeout.class, this::increment100OnTimeout) + .matchCommand(Timeout.class, this::timeout) + .matchCommand(EmptyEventsListAndThenLog.class, this::emptyEventsListAndThenLog) + .matchCommand(StopThenLog.class, this::stopThenLog) + .matchCommand(IncrementTwiceAndLog.class, this::incrementTwiceAndLog) + .build(); - private static Behavior counter( - PersistenceId persistentId, - ActorRef> eventProbe, - ActorRef loggingProbe, - Function3 snapshot, - Function> tagsFunction, - ActorRef> snapshotProbe, - EventAdapter transformer) { + } - return Behaviors.setup(ctx -> { - return new EventSourcedBehavior(persistentId, SupervisorStrategy.restartWithBackoff(Duration.ofMillis(1), Duration.ofMillis(5), 0.1)) { - @Override - public CommandHandler commandHandler() { - return commandHandlerBuilder(State.class) - .matchCommand(Increment.class, (state, command) -> - Effect().persist(new Incremented(1))) - .matchCommand(IncrementWithConfirmation.class, (state, command) -> - Effect().persist(new Incremented(1)) - .thenReply(command, newState -> Done.getInstance())) - .matchCommand(GetValue.class, (state, command) -> { - command.replyTo.tell(state); - return Effect().none(); - }) - .matchCommand(IncrementLater.class, (state, command) -> { - ActorRef delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> { - timers.startSingleTimer(Tick.instance, Tick.instance, Duration.ofMillis(10)); - return Behaviors.receive((context, o) -> Behaviors.stopped()); - })); - ctx.watchWith(delay, new DelayFinished()); - return Effect().none(); - }) - .matchCommand(DelayFinished.class, (state, finished) -> Effect().persist(new Incremented(10))) - .matchCommand(Increment100OnTimeout.class, (state, msg) -> { - ctx.setReceiveTimeout(Duration.ofMillis(10), new Timeout()); - return Effect().none(); - }) - .matchCommand(Timeout.class, - (state, msg) -> Effect().persist(timeoutEvent)) - .matchCommand(EmptyEventsListAndThenLog.class, (state, msg) -> Effect().persist(Collections.emptyList()) - .thenRun(s -> loggingProbe.tell(loggingOne))) - .matchCommand(StopThenLog.class, - (state, msg) -> Effect().stop() - .thenRun(s -> loggingProbe.tell(loggingOne))) - .matchCommand(IncrementTwiceAndLog.class, - (state, msg) -> Effect().persist( - Arrays.asList(new Incremented(1), new Incremented(1))) - .thenRun(s -> loggingProbe.tell(loggingOne))) - .build(); + private Effect increment(State state, Increment command) { + return Effect().persist(new Incremented(1)); + } - } + private ReplyEffect incrementWithConfirmation(State state, IncrementWithConfirmation command) { + return Effect() + .persist(new Incremented(1)) + .thenReply(command, newState -> done()); + } - @Override - public EventHandler eventHandler() { - return eventHandlerBuilder() - .matchEvent(Incremented.class, (state, event) -> { - List newHistory = new ArrayList<>(state.history); - newHistory.add(state.value); - eventProbe.tell(Pair.create(state, event)); - return new State(state.value + event.delta, newHistory); - }) - .build(); + private ReplyEffect getValue(State state, GetValue command) { + return Effect().reply(command, state); + } - } + private Effect incrementLater(State state, IncrementLater command) { + ActorRef delay = ctx.spawnAnonymous(Behaviors.withTimers(timers -> { + timers.startSingleTimer(Tick.INSTANCE, Tick.INSTANCE, Duration.ofMillis(10)); + return Behaviors.receive((context, o) -> Behaviors.stopped()); + })); + ctx.watchWith(delay, DelayFinished.INSTANCE); + return Effect().none(); + } - @Override - public State emptyState() { - return emptyState; - } + private Effect delayFinished(State state, DelayFinished command) { + return Effect().persist(new Incremented(10)); + } - @Override - public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) { - try { - return snapshot.apply(state, event, sequenceNr); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + private Effect increment100OnTimeout(State state, Increment100OnTimeout command) { + ctx.setReceiveTimeout(Duration.ofMillis(10), Timeout.INSTANCE); + return Effect().none(); + } - @Override - public Set tagsFor(Incremented incremented) { - try { - return tagsFunction.apply(incremented); - } catch (Exception e) { - throw new RuntimeException(e); - } - } + private Effect timeout(State state, Timeout command) { + return Effect().persist(new Incremented(100)); + } - @Override - public void onSnapshot(SnapshotMetadata meta, Optional result) { - snapshotProbe.tell(result); - } + private Effect emptyEventsListAndThenLog(State state, EmptyEventsListAndThenLog command) { + return Effect() + .persist(Collections.emptyList()) + .thenRun(s -> log()); + } + private Effect stopThenLog(State state, StopThenLog command) { + return Effect() + .stop() + .thenRun(s -> log()); + } - @Override - public EventAdapter eventAdapter() { - return transformer; - } - }; - }); + private Effect incrementTwiceAndLog(State state, IncrementTwiceAndLog command) { + return Effect() + .persist(Arrays.asList(new Incremented(1), new Incremented(1))) + .thenRun(s -> log()); + } + + @Override + public EventHandler eventHandler() { + return eventHandlerBuilder() + .matchEvent(Incremented.class, this::applyIncremented) + .build(); + } + + @Override + public State emptyState() { + return State.EMPTY; + } + + protected State applyIncremented(State state, Incremented event) { + // override to probe for events + return state.incrementedBy(event.delta); + } + + protected void log() { + // override to probe for logs + } } @Test public void persistEvents() { ActorRef c = testKit.spawn(counter(new PersistenceId("c1"))); TestProbe probe = testKit.createTestProbe(); - c.tell(Increment.instance); + c.tell(Increment.INSTANCE); c.tell(new GetValue(probe.ref())); probe.expectMessage(new State(1, singletonList(0))); } @@ -367,16 +307,16 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public void replyStoredEvents() { ActorRef c = testKit.spawn(counter(new PersistenceId("c2"))); TestProbe probe = testKit.createTestProbe(); - c.tell(Increment.instance); - c.tell(Increment.instance); - c.tell(Increment.instance); + c.tell(Increment.INSTANCE); + c.tell(Increment.INSTANCE); + c.tell(Increment.INSTANCE); c.tell(new GetValue(probe.ref())); probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); ActorRef c2 = testKit.spawn(counter(new PersistenceId("c2"))); c2.tell(new GetValue(probe.ref())); probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); - c2.tell(Increment.instance); + c2.tell(Increment.INSTANCE); c2.tell(new GetValue(probe.ref())); probe.expectMessage(new State(4, Arrays.asList(0, 1, 2, 3))); } @@ -392,28 +332,53 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void handleTerminatedSignal() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter(new PersistenceId("c3"), eventHandlerProbe.ref())); - c.tell(Increment.instance); - c.tell(new IncrementLater()); - eventHandlerProbe.expectMessage(Pair.create(emptyState, new Incremented(1))); - eventHandlerProbe.expectMessage(Pair.create(new State(1, Collections.singletonList(0)), terminatedEvent)); + Behavior counter = Behaviors.setup(ctx -> + new CounterBehavior(new PersistenceId("c3"), ctx) { + @Override + protected State applyIncremented(State state, Incremented event) { + eventHandlerProbe.ref().tell(Pair.create(state, event)); + return super.applyIncremented(state, event); + } + } + ); + ActorRef c = testKit.spawn(counter); + c.tell(Increment.INSTANCE); + c.tell(IncrementLater.INSTANCE); + eventHandlerProbe.expectMessage(Pair.create(State.EMPTY, new Incremented(1))); + eventHandlerProbe.expectMessage(Pair.create(new State(1, Collections.singletonList(0)), new Incremented(10))); } @Test public void handleReceiveTimeout() { TestProbe> eventHandlerProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter(new PersistenceId("c4"), eventHandlerProbe.ref())); - c.tell(new Increment100OnTimeout()); - eventHandlerProbe.expectMessage(Pair.create(emptyState, timeoutEvent)); + Behavior counter = Behaviors.setup(ctx -> + new CounterBehavior(new PersistenceId("c4"), ctx) { + @Override + protected State applyIncremented(State state, Incremented event) { + eventHandlerProbe.ref().tell(Pair.create(state, event)); + return super.applyIncremented(state, event); + } + } + ); + ActorRef c = testKit.spawn(counter); + c.tell(Increment100OnTimeout.INSTANCE); + eventHandlerProbe.expectMessage(Pair.create(State.EMPTY, new Incremented(100))); } @Test public void chainableSideEffectsWithEvents() { - TestProbe> eventHandlerProbe = testKit.createTestProbe(); TestProbe loggingProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter(new PersistenceId("c5"), eventHandlerProbe.ref(), loggingProbe.ref())); - c.tell(new EmptyEventsListAndThenLog()); - loggingProbe.expectMessage(loggingOne); + Behavior counter = Behaviors.setup(ctx -> + new CounterBehavior(new PersistenceId("c5"), ctx) { + @Override + protected void log() { + loggingProbe.ref().tell("logged"); + } + } + ); + ActorRef c = testKit.spawn(counter); + c.tell(EmptyEventsListAndThenLog.INSTANCE); + loggingProbe.expectMessage("logged"); } @Test @@ -425,7 +390,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { ActorRef c = testKit.spawn(behavior); TestProbe probe = testKit.createTestProbe(); - c.tell(Increment.instance); + c.tell(Increment.INSTANCE); c.tell(new GetValue(probe.ref())); probe.expectMessage(new State(1, singletonList(0))); } @@ -433,31 +398,51 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void snapshot() { TestProbe> snapshotProbe = testKit.createTestProbe(); - Behavior snapshoter = counter(new PersistenceId("c11"), (s, e, l) -> s.value % 2 == 0, snapshotProbe.ref()); + + Behavior snapshoter = Behaviors.setup(ctx -> + new CounterBehavior(new PersistenceId("snapshot"), ctx) { + @Override + public boolean shouldSnapshot(State state, Incremented event, long sequenceNr) { + return state.value % 2 == 0; + } + @Override + public void onSnapshot(SnapshotMetadata meta, Optional result) { + snapshotProbe.ref().tell(result); + } + } + ); ActorRef c = testKit.spawn(snapshoter); - c.tell(Increment.instance); - c.tell(Increment.instance); + c.tell(Increment.INSTANCE); + c.tell(Increment.INSTANCE); snapshotProbe.expectMessage(Optional.empty()); - c.tell(Increment.instance); - TestProbe probe = testKit.createTestProbe(); + c.tell(Increment.INSTANCE); - c.tell(new GetValue(probe.ref())); - probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); + TestProbe stateProbe = testKit.createTestProbe(); + c.tell(new GetValue(stateProbe.ref())); + stateProbe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); - TestProbe> eventProbe = testKit.createTestProbe(); - snapshoter = counter(new PersistenceId("c11"), eventProbe.ref(), (s, e, l) -> s.value % 2 == 0); - ActorRef c2 = testKit.spawn(snapshoter); + TestProbe> eventHandlerProbe = testKit.createTestProbe(); + Behavior recovered = Behaviors.setup(ctx -> + new CounterBehavior(new PersistenceId("snapshot"), ctx) { + @Override + protected State applyIncremented(State state, Incremented event) { + eventHandlerProbe.ref().tell(Pair.create(state, event)); + return super.applyIncremented(state, event); + } + } + ); + ActorRef c2 = testKit.spawn(recovered); // First 2 are snapshot - eventProbe.expectMessage(Pair.create(new State(2, Arrays.asList(0, 1)), new Incremented(1))); - c2.tell(new GetValue(probe.ref())); - probe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); + eventHandlerProbe.expectMessage(Pair.create(new State(2, Arrays.asList(0, 1)), new Incremented(1))); + c2.tell(new GetValue(stateProbe.ref())); + stateProbe.expectMessage(new State(3, Arrays.asList(0, 1, 2))); } @Test public void stopThenLog() { TestProbe probe = testKit.createTestProbe(); ActorRef c = testKit.spawn(counter(new PersistenceId("c12"))); - c.tell(new StopThenLog()); + c.tell(StopThenLog.INSTANCE); probe.expectTerminated(c, Duration.ofSeconds(1)); } @@ -465,31 +450,40 @@ public class PersistentActorJavaDslTest extends JUnitSuite { public void tapPersistentActor() { TestProbe interceptProbe = testKit.createTestProbe(); TestProbe signalProbe = testKit.createTestProbe(); - BehaviorInterceptor tap = new BehaviorInterceptor() { + BehaviorInterceptor tap = new BehaviorInterceptor() { @Override - public Behavior aroundReceive(ActorContext ctx, Object msg, ReceiveTarget target) { + public Behavior aroundReceive(akka.actor.typed.ActorContext ctx, Command msg, ReceiveTarget target) { interceptProbe.ref().tell(msg); return target.apply(ctx, msg); } @Override - public Behavior aroundSignal(ActorContext ctx, Signal signal, SignalTarget target) { + public Behavior aroundSignal(akka.actor.typed.ActorContext ctx, Signal signal, SignalTarget target) { signalProbe.ref().tell(signal); return target.apply(ctx, signal); } }; - ActorRef c = testKit.spawn(Behaviors.intercept(tap, ((Behavior)counter(new PersistenceId("tap1"))))); - c.tell(Increment.instance); - interceptProbe.expectMessage(Increment.instance); + ActorRef c = testKit.spawn(Behaviors.intercept(tap, counter(new PersistenceId("tap1")))); + c.tell(Increment.INSTANCE); + interceptProbe.expectMessage(Increment.INSTANCE); signalProbe.expectNoMessage(); } @Test public void tagEvent() throws Exception { - TestProbe> eventProbe = testKit.createTestProbe(); + Behavior tagger = Behaviors.setup(ctx -> + new CounterBehavior(new PersistenceId("tagging"), ctx) { + @Override + public Set tagsFor(Incremented incremented) { + return Sets.newHashSet("tag1", "tag2"); + } + } + ); + ActorRef c = testKit.spawn(tagger); + + c.tell(Increment.INSTANCE); + TestProbe stateProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter(new PersistenceId("tagging"), eventProbe.ref(), e -> Sets.newHashSet("tag1", "tag2"))); - c.tell(new Increment()); c.tell(new GetValue(stateProbe.ref())); stateProbe.expectMessage(new State(1, Collections.singletonList(0))); @@ -502,11 +496,19 @@ public class PersistentActorJavaDslTest extends JUnitSuite { @Test public void transformEvent() throws Exception { - TestProbe> eventProbe = testKit.createTestProbe(); - TestProbe stateProbe = testKit.createTestProbe(); - ActorRef c = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter())); + Behavior transformer = Behaviors.setup(ctx -> + new CounterBehavior(new PersistenceId("transform"), ctx) { + private final EventAdapter adapter = new WrapperEventAdapter(); + public EventAdapter eventAdapter() { + return adapter; + } + } + ); + ActorRef c = testKit.spawn(transformer); - c.tell(new Increment()); + c.tell(Increment.INSTANCE); + + TestProbe stateProbe = testKit.createTestProbe(); c.tell(new GetValue(stateProbe.ref())); stateProbe.expectMessage(new State(1, Collections.singletonList(0))); @@ -516,7 +518,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { new EventEnvelope(new Sequence(1), "transform", 1, new Wrapper<>(new Incremented(1))) ), events); - ActorRef c2 = testKit.spawn(counter(new PersistenceId("transform"), eventProbe.ref(), new WrapperEventAdapter())); + ActorRef c2 = testKit.spawn(transformer); c2.tell(new GetValue(stateProbe.ref())); stateProbe.expectMessage(new State(1, Collections.singletonList(0))); }