null as empty state, in javadsl, #25768

This commit is contained in:
Patrik Nordwall 2018-10-18 11:38:27 +02:00 committed by GitHub
parent 1691961a10
commit bed17cc172
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 568 additions and 16 deletions

View file

@ -108,7 +108,11 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] {
}
protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = {
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId.id, state.seqNr), state.state), setup.selfUntyped)
// don't store null state
if (state.state != null)
setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(
SnapshotMetadata(setup.persistenceId.id, state.seqNr),
state.state), setup.selfUntyped)
}
}

View file

@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl
import java.util.function.BiFunction
import java.util.function.Predicate
import java.util.function.{ Function JFunction }
import akka.annotation.InternalApi
import akka.persistence.typed.internal._
@ -23,18 +24,14 @@ trait CommandHandler[Command, Event, State] {
object CommandHandlerBuilder {
private val _trueStatePredicate: Predicate[Any] = new Predicate[Any] {
override def test(t: Any): Boolean = true
}
private def trueStatePredicate[S]: Predicate[S] = _trueStatePredicate.asInstanceOf[Predicate[S]]
/**
* @param stateClass The handlers defined by this builder are used when the state is an instance of the `stateClass`
* @return A new, mutable, command handler builder
*/
def builder[Command, Event, S <: State, State](stateClass: Class[S]): CommandHandlerBuilder[Command, Event, S, State] =
new CommandHandlerBuilder(stateClass, statePredicate = trueStatePredicate)
new CommandHandlerBuilder(statePredicate = new Predicate[S] {
override def test(state: S): Boolean = state != null && stateClass.isAssignableFrom(state.getClass)
})
/**
* @param statePredicate The handlers defined by this builder are used when the `statePredicate` is `true`,
@ -42,7 +39,7 @@ object CommandHandlerBuilder {
* @return A new, mutable, command handler builder
*/
def builder[Command, Event, State](statePredicate: Predicate[State]): CommandHandlerBuilder[Command, Event, State, State] =
new CommandHandlerBuilder(classOf[Any].asInstanceOf[Class[State]], statePredicate)
new CommandHandlerBuilder(statePredicate)
/**
* INTERNAL API
@ -54,7 +51,7 @@ object CommandHandlerBuilder {
}
final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalApi private[persistence] (
val stateClass: Class[S], val statePredicate: Predicate[S]) {
val statePredicate: Predicate[S]) {
import CommandHandlerBuilder.CommandHandlerCase
private var cases: List[CommandHandlerCase[Command, Event, State]] = Nil
@ -62,7 +59,7 @@ final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalAp
private def addCase(predicate: Command Boolean, handler: BiFunction[S, Command, Effect[Event, State]]): Unit = {
cases = CommandHandlerCase[Command, Event, State](
commandPredicate = predicate,
statePredicate = state stateClass.isAssignableFrom(state.getClass) && statePredicate.test(state.asInstanceOf[S]),
statePredicate = state statePredicate.test(state.asInstanceOf[S]),
handler.asInstanceOf[BiFunction[State, Command, Effect[Event, State]]]) :: cases
}
@ -74,17 +71,45 @@ final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalAp
this
}
/**
* Match any command which the given `predicate` returns true for.
*
* Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass
* the state in a `BiFunction`.
*/
def matchCommand(predicate: Predicate[Command], handler: JFunction[Command, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
addCase(cmd predicate.test(cmd), new BiFunction[S, Command, Effect[Event, State]] {
override def apply(state: S, cmd: Command): Effect[Event, State] = handler(cmd)
})
this
}
/**
* Match commands that are of the given `commandClass` or subclass thereof
*/
def matchCommand[C <: Command](commandClass: Class[C], handler: BiFunction[S, C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
addCase(cmd commandClass.isAssignableFrom(cmd.getClass), handler.asInstanceOf[BiFunction[S, Command, Effect[Event, State]]])
this
}
/**
* Match commands that are of the given `commandClass` or subclass thereof.
*
* Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass
* the state in a `BiFunction`.
*/
def matchCommand[C <: Command](commandClass: Class[C], handler: JFunction[C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = {
matchCommand[C](commandClass, new BiFunction[S, C, Effect[Event, State]] {
override def apply(state: S, cmd: C): Effect[Event, State] = handler(cmd)
})
}
/**
* Compose this builder with another builder. The handlers in this builder will be tried first followed
* by the handlers in `other`.
*/
def orElse[S2 <: State](other: CommandHandlerBuilder[Command, Event, S2, State]): CommandHandlerBuilder[Command, Event, S2, State] = {
val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.stateClass, other.statePredicate)
val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.statePredicate)
// problem with overloaded constructor with `cases` as parameter
newBuilder.cases = other.cases ::: cases
newBuilder

View file

@ -5,6 +5,7 @@
package akka.persistence.typed.javadsl
import java.util.function.BiFunction
import java.util.function.{ Function JFunction }
import akka.annotation.InternalApi
import akka.util.OptionVal
@ -49,11 +50,23 @@ final class EventHandlerBuilder[State >: Null, Event]() {
this
}
/**
* Match any event which is an instance of `E` or a subtype of `E`.
*
* Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass
* the state in a `BiFunction`.
*/
def matchEvent[E <: Event](eventClass: Class[E], f: JFunction[E, State]): EventHandlerBuilder[State, Event] = {
matchEvent[E](eventClass, new BiFunction[State, E, State] {
override def apply(state: State, event: E): State = f(event)
})
}
def matchEvent[E <: Event, S <: State](eventClass: Class[E], stateClass: Class[S],
biFunction: BiFunction[S, E, State]): EventHandlerBuilder[State, Event] = {
cases = EventHandlerCase[State, Event](
statePredicate = s stateClass.isAssignableFrom(s.getClass),
statePredicate = s s != null && stateClass.isAssignableFrom(s.getClass),
eventPredicate = e eventClass.isAssignableFrom(e.getClass),
biFunction.asInstanceOf[BiFunction[State, Event, State]]) :: cases
this
@ -102,7 +115,9 @@ final class EventHandlerBuilder[State >: Null, Event]() {
}
result match {
case OptionVal.None throw new MatchError(s"No match found for event [${event.getClass}] and state [${state.getClass}]. Has this event been stored using an EventAdapter?")
case OptionVal.None
val stateClass = if (state == null) "null" else state.getClass.getName
throw new MatchError(s"No match found for event [${event.getClass}] and state [$stateClass]. Has this event been stored using an EventAdapter?")
case OptionVal.Some(s) s
}
}

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.Objects;
public class NullEmptyStateTest extends JUnitSuite {
private static final Config config = ConfigFactory.parseString(
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n");
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
static class NullEmptyState extends PersistentBehavior<String, String, String> {
private final ActorRef<String> probe;
NullEmptyState(PersistenceId persistenceId, ActorRef<String> probe) {
super(persistenceId);
this.probe = probe;
}
@Override
public String emptyState() {
return null;
}
@Override
public void onRecoveryCompleted(String s) {
probe.tell("onRecoveryCompleted:" + s);
}
@Override
public CommandHandler<String, String, String> commandHandler() {
CommandHandlerBuilder<String, String, String, String> b1 =
commandHandlerBuilder(Objects::isNull)
.matchCommand("stop"::equals, command -> Effect().stop())
.matchCommand(String.class, this::persistCommand);
CommandHandlerBuilder<String, String, String, String> b2 =
commandHandlerBuilder(String.class)
.matchCommand("stop"::equals, command -> Effect().stop())
.matchCommand(String.class, this::persistCommand);
return b1.orElse(b2).build();
}
private Effect<String, String> persistCommand(String command) {
return Effect().persist(command);
}
@Override
public EventHandler<String, String> eventHandler() {
return eventHandlerBuilder()
.matchEvent(String.class, this::applyEvent)
.build();
}
private String applyEvent(String state, String event) {
probe.tell("eventHandler:" + state + ":" + event);
if (state == null)
return event;
else
return state + event;
}
}
@Test
public void handleNullState() throws Exception {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<String> b = Behaviors.setup(ctx -> new NullEmptyState(new PersistenceId("a"), probe.ref()));
ActorRef<String> ref1 = testKit.spawn(b);
probe.expectMessage("onRecoveryCompleted:null");
ref1.tell("stop");
ActorRef<String> ref2 = testKit.spawn(b);
probe.expectMessage("onRecoveryCompleted:null");
ref2.tell("one");
probe.expectMessage("eventHandler:null:one");
ref2.tell("two");
probe.expectMessage("eventHandler:one:two");
ref2.tell("stop");
ActorRef<String> ref3 = testKit.spawn(b);
// eventHandler from reply
probe.expectMessage("eventHandler:null:one");
probe.expectMessage("eventHandler:one:two");
probe.expectMessage("onRecoveryCompleted:onetwo");
ref3.tell("three");
probe.expectMessage("eventHandler:onetwo:three");
}
}

View file

@ -0,0 +1,85 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.javadsl;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.typed.PersistenceId;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
public class PrimitiveStateTest extends JUnitSuite {
private static final Config config = ConfigFactory.parseString(
"akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n");
@ClassRule
public static final TestKitJunitResource testKit = new TestKitJunitResource(config);
static class PrimitiveState extends PersistentBehavior<Integer, Integer, Integer> {
private final ActorRef<String> probe;
PrimitiveState(PersistenceId persistenceId, ActorRef<String> probe) {
super(persistenceId);
this.probe = probe;
}
@Override
public Integer emptyState() {
return 0;
}
@Override
public void onRecoveryCompleted(Integer n) {
probe.tell("onRecoveryCompleted:" + n);
}
@Override
public CommandHandler<Integer, Integer, Integer> commandHandler() {
return (state, command) -> {
if (command < 0)
return Effect().stop();
else
return Effect().persist(command);
};
}
@Override
public EventHandler<Integer, Integer> eventHandler() {
return (state, event) -> {
probe.tell("eventHandler:" + state + ":" + event);
return state + event;
};
}
}
@Test
public void handleIntegerState() throws Exception {
TestProbe<String> probe = testKit.createTestProbe();
Behavior<Integer> b = Behaviors.setup(ctx -> new PrimitiveState(new PersistenceId("a"), probe.ref()));
ActorRef<Integer> ref1 = testKit.spawn(b);
probe.expectMessage("onRecoveryCompleted:0");
ref1.tell(1);
probe.expectMessage("eventHandler:0:1");
ref1.tell(2);
probe.expectMessage("eventHandler:1:2");
ref1.tell(-1);
ActorRef<Integer> ref2 = testKit.spawn(b);
// eventHandler from reply
probe.expectMessage("eventHandler:0:1");
probe.expectMessage("eventHandler:1:2");
probe.expectMessage("onRecoveryCompleted:3");
ref2.tell(3);
probe.expectMessage("eventHandler:3:3");
}
}

View file

@ -0,0 +1,181 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.persistence.typed.PersistenceId;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.PersistentBehavior;
import java.util.Objects;
public class NullBlogState {
interface BlogEvent {
}
public static class PostAdded implements BlogEvent {
private final String postId;
private final PostContent content;
public PostAdded(String postId, PostContent content) {
this.postId = postId;
this.content = content;
}
}
public static class BodyChanged implements BlogEvent {
private final String postId;
private final String newBody;
public BodyChanged(String postId, String newBody) {
this.postId = postId;
this.newBody = newBody;
}
}
public static class Published implements BlogEvent {
private final String postId;
public Published(String postId) {
this.postId = postId;
}
}
public static class BlogState {
final PostContent postContent;
final boolean published;
BlogState(PostContent postContent, boolean published) {
this.postContent = postContent;
this.published = published;
}
public BlogState withContent(PostContent newContent) {
return new BlogState(newContent, this.published);
}
public String postId() {
return postContent.postId;
}
}
public interface BlogCommand {
}
public static class AddPost implements BlogCommand {
final PostContent content;
final ActorRef<AddPostDone> replyTo;
public AddPost(PostContent content, ActorRef<AddPostDone> replyTo) {
this.content = content;
this.replyTo = replyTo;
}
}
public static class AddPostDone implements BlogCommand {
final String postId;
public AddPostDone(String postId) {
this.postId = postId;
}
}
public static class GetPost implements BlogCommand {
final ActorRef<PostContent> replyTo;
public GetPost(ActorRef<PostContent> replyTo) {
this.replyTo = replyTo;
}
}
public static class ChangeBody implements BlogCommand {
final String newBody;
final ActorRef<Done> replyTo;
public ChangeBody(String newBody, ActorRef<Done> replyTo) {
this.newBody = newBody;
this.replyTo = replyTo;
}
}
public static class Publish implements BlogCommand {
final ActorRef<Done> replyTo;
public Publish(ActorRef<Done> replyTo) {
this.replyTo = replyTo;
}
}
public static class PassivatePost implements BlogCommand {
}
public static class PostContent implements BlogCommand {
final String postId;
final String title;
final String body;
public PostContent(String postId, String title, String body) {
this.postId = postId;
this.title = title;
this.body = body;
}
}
public static class BlogBehavior extends PersistentBehavior<BlogCommand, BlogEvent, BlogState> {
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState, BlogState> initialCommandHandler() {
return commandHandlerBuilder(Objects::isNull)
.matchCommand(AddPost.class, cmd -> {
PostAdded event = new PostAdded(cmd.content.postId, cmd.content);
return Effect().persist(event)
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
})
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
}
private CommandHandlerBuilder<BlogCommand, BlogEvent, BlogState, BlogState> postCommandHandler() {
return commandHandlerBuilder(Objects::nonNull)
.matchCommand(ChangeBody.class, (state, cmd) -> {
BodyChanged event = new BodyChanged(state.postId(), cmd.newBody);
return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance()));
})
.matchCommand(Publish.class, (state, cmd) -> Effect()
.persist(new Published(state.postId())).andThen(() -> {
System.out.println("Blog post published: " + state.postId());
cmd.replyTo.tell(Done.getInstance());
}))
.matchCommand(GetPost.class, (state, cmd) -> {
cmd.replyTo.tell(state.postContent);
return Effect().none();
})
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
}
public BlogBehavior(PersistenceId persistenceId) {
super(persistenceId);
}
@Override
public BlogState emptyState() {
return null;
}
@Override
public CommandHandler<BlogCommand, BlogEvent, BlogState> commandHandler() {
return initialCommandHandler().orElse(postCommandHandler()).build();
}
@Override
public EventHandler<BlogState, BlogEvent> eventHandler() {
return eventHandlerBuilder()
.matchEvent(PostAdded.class, event ->
new BlogState(event.content, false))
.matchEvent(BodyChanged.class, (state, chg) ->
state.withContent(
new PostContent(state.postId(), state.postContent.title, chg.newBody)))
.matchEvent(Published.class, (state, event) ->
new BlogState(state.postContent, true))
.build();
}
}
}

View file

@ -129,7 +129,7 @@ public class OptionalBlogState {
return Effect().persist(event)
.andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId)));
})
.matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop());
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
}
private CommandHandlerBuilder<BlogCommand, BlogEvent, Optional<BlogState>, Optional<BlogState>> postCommandHandler() {
@ -148,7 +148,7 @@ public class OptionalBlogState {
return Effect().none();
})
.matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled())
.matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop());
.matchCommand(PassivatePost.class, cmd -> Effect().stop());
}
public BlogBehavior(PersistenceId persistenceId) {

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object NullEmptyStateSpec {
private val conf = ConfigFactory.parseString(
s"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""")
}
class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.conf) with WordSpecLike {
implicit val testSettings = TestKitSettings(system)
def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
PersistentBehavior[String, String, String](
persistenceId,
emptyState = null,
commandHandler = (_, command) {
if (command == "stop")
Effect.stop
else
Effect.persist(command)
},
eventHandler = (state, event) {
probe.tell("eventHandler:" + state + ":" + event)
if (state == null) event else state + event
}
).onRecoveryCompleted { s
probe.tell("onRecoveryCompleted:" + s)
}
"A typed persistent actor with primitive state" must {
"persist events and update state" in {
val probe = TestProbe[String]()
val b = primitiveState(PersistenceId("a"), probe.ref)
val ref1 = spawn(b)
probe.expectMessage("onRecoveryCompleted:null")
ref1 ! "one"
probe.expectMessage("eventHandler:null:one")
ref1 ! "two"
probe.expectMessage("eventHandler:one:two")
ref1 ! "stop"
val ref2 = testKit.spawn(b)
// eventHandler from reply
probe.expectMessage("eventHandler:null:one")
probe.expectMessage("eventHandler:one:two")
probe.expectMessage("onRecoveryCompleted:onetwo")
ref2 ! "three"
probe.expectMessage("eventHandler:onetwo:three")
}
}
}

View file

@ -0,0 +1,67 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object PrimitiveStateSpec {
private val conf = ConfigFactory.parseString(
s"""
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""")
}
class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.conf) with WordSpecLike {
implicit val testSettings = TestKitSettings(system)
def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[Int] =
PersistentBehavior[Int, Int, Int](
persistenceId,
emptyState = 0,
commandHandler = (_, command) {
if (command < 0)
Effect.stop
else
Effect.persist(command)
},
eventHandler = (state, event) {
probe.tell("eventHandler:" + state + ":" + event)
state + event
}
).onRecoveryCompleted { n
probe.tell("onRecoveryCompleted:" + n)
}
"A typed persistent actor with primitive state" must {
"persist events and update state" in {
val probe = TestProbe[String]()
val b = primitiveState(PersistenceId("a"), probe.ref)
val ref1 = spawn(b)
probe.expectMessage("onRecoveryCompleted:0")
ref1 ! 1
probe.expectMessage("eventHandler:0:1")
ref1 ! 2
probe.expectMessage("eventHandler:1:2")
ref1 ! -1
val ref2 = testKit.spawn(b)
// eventHandler from reply
probe.expectMessage("eventHandler:0:1")
probe.expectMessage("eventHandler:1:2")
probe.expectMessage("onRecoveryCompleted:3")
ref2 ! 3
probe.expectMessage("eventHandler:3:3")
}
}
}