From 78ff85d0fb90c73033fa0fe4ce9271decbf5bfb4 Mon Sep 17 00:00:00 2001 From: Debasish Ghosh Date: Thu, 3 Jun 2021 12:02:43 +0530 Subject: [PATCH] Initial implementation of DurableStateStore and one test --- .../DurableStateBehaviorReplySpec.scala | 113 ++++++++++++++++++ .../InmemDurableStateStoreProvider.scala | 19 +++ .../javadsl/InmemDurableStateStore.scala | 32 +++++ .../scaladsl/InmemDurableStateStore.scala | 28 +++++ 4 files changed, 192 insertions(+) create mode 100644 akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala create mode 100644 akka-persistence/src/main/scala/akka/persistence/state/inmem/InmemDurableStateStoreProvider.scala create mode 100644 akka-persistence/src/main/scala/akka/persistence/state/inmem/javadsl/InmemDurableStateStore.scala create mode 100644 akka-persistence/src/main/scala/akka/persistence/state/inmem/scaladsl/InmemDurableStateStore.scala diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala new file mode 100644 index 0000000000..231b480077 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorReplySpec.scala @@ -0,0 +1,113 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.typed.state.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.Done +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import akka.serialization.jackson.CborSerializable + +object DurableStateBehaviorReplySpec { + def conf: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.persistence.state.plugin = "akka.persistence.state.inmem" + akka.persistence.state.inmem { + class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider" + # FIXME we should change this to use a fallback in reference.conf + recovery-timeout = 30s + } + """) + + sealed trait Command[ReplyMessage] extends CborSerializable + final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done] + final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done] + final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done] + final case class GetValue(replyTo: ActorRef[State]) extends Command[State] + + final case class State(value: Int) extends CborSerializable + + def counter(persistenceId: PersistenceId): Behavior[Command[_]] = + Behaviors.setup(ctx => counter(ctx, persistenceId)) + + def counter(ctx: ActorContext[Command[_]], persistenceId: PersistenceId): DurableStateBehavior[Command[_], State] = { + DurableStateBehavior + .withEnforcedReplies[Command[_], State]( + persistenceId, + emptyState = State(0), + commandHandler = (state, command) => + command match { + + case IncrementWithConfirmation(replyTo) => + Effect.persist(state.copy(value = state.value + 1)).thenReply(replyTo)(_ => Done) + + case IncrementReplyLater(replyTo) => + Effect + .persist(state.copy(value = state.value + 1)) + .thenRun((_: State) => ctx.self ! ReplyNow(replyTo)) + .thenNoReply() + + case ReplyNow(replyTo) => + Effect.reply(replyTo)(Done) + + case GetValue(replyTo) => + Effect.reply(replyTo)(state) + + }) + .withDurableStateStorePluginId("akka.persistence.state.inmem") + } +} + +class DurableStateBehaviorReplySpec + extends ScalaTestWithActorTestKit(DurableStateBehaviorReplySpec.conf) + with AnyWordSpecLike + with LogCapturing { + + import DurableStateBehaviorReplySpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})") + + "A typed persistent actor with commands that are expecting replies" must { + + "persist an event thenReply" in { + val c = spawn(counter(nextPid())) + val probe = TestProbe[Done]() + c ! IncrementWithConfirmation(probe.ref) + probe.expectMessage(Done) + + c ! IncrementWithConfirmation(probe.ref) + c ! IncrementWithConfirmation(probe.ref) + probe.expectMessage(Done) + probe.expectMessage(Done) + } + + "persist an event thenReply later" in { + val c = spawn(counter(nextPid())) + val probe = TestProbe[Done]() + c ! IncrementReplyLater(probe.ref) + probe.expectMessage(Done) + } + + "reply to query command" in { + val c = spawn(counter(nextPid())) + val updateProbe = TestProbe[Done]() + c ! IncrementWithConfirmation(updateProbe.ref) + + val queryProbe = TestProbe[State]() + c ! GetValue(queryProbe.ref) + queryProbe.expectMessage(State(1)) + } + } +} diff --git a/akka-persistence/src/main/scala/akka/persistence/state/inmem/InmemDurableStateStoreProvider.scala b/akka-persistence/src/main/scala/akka/persistence/state/inmem/InmemDurableStateStoreProvider.scala new file mode 100644 index 0000000000..a38774e597 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/state/inmem/InmemDurableStateStoreProvider.scala @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.state.inmem + +import akka.persistence.state.DurableStateStoreProvider +import akka.persistence.state.scaladsl.DurableStateStore +import akka.persistence.state.inmem.scaladsl.InmemDurableStateStore +import akka.persistence.state.javadsl.{ DurableStateStore => JDurableStateStore } +import akka.persistence.state.inmem.javadsl.{ InmemDurableStateStore => JInmemDurableStateStore } + +class InmemDurableStateStoreProvider extends DurableStateStoreProvider { + override def scaladslDurableStateStore(): DurableStateStore[Any] = + new InmemDurableStateStore[Any] + + override def javadslDurableStateStore(): JDurableStateStore[AnyRef] = + new JInmemDurableStateStore[AnyRef] +} diff --git a/akka-persistence/src/main/scala/akka/persistence/state/inmem/javadsl/InmemDurableStateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/inmem/javadsl/InmemDurableStateStore.scala new file mode 100644 index 0000000000..3365e7ecbb --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/state/inmem/javadsl/InmemDurableStateStore.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.state.inmem.javadsl + +import java.util.Optional +import java.util.concurrent.{ CompletionStage, ConcurrentHashMap } + +import scala.concurrent.Future +import scala.compat.java8.FutureConverters._ + +import akka.Done + +import akka.persistence.state.javadsl.{ DurableStateUpdateStore, GetObjectResult } + +class InmemDurableStateStore[A] extends DurableStateUpdateStore[A] { + val store = new ConcurrentHashMap[String, A]() + + def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] = + toJava(Future.successful(GetObjectResult(Optional.ofNullable(store.get(persistenceId)), 0))) + + def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] = + toJava(Future.successful(store.put(persistenceId, value) match { + case _ => Done + })) + + def deleteObject(persistenceId: String): CompletionStage[Done] = + toJava(Future.successful(store.remove(persistenceId) match { + case _ => Done + })) +} diff --git a/akka-persistence/src/main/scala/akka/persistence/state/inmem/scaladsl/InmemDurableStateStore.scala b/akka-persistence/src/main/scala/akka/persistence/state/inmem/scaladsl/InmemDurableStateStore.scala new file mode 100644 index 0000000000..dfbf1168e0 --- /dev/null +++ b/akka-persistence/src/main/scala/akka/persistence/state/inmem/scaladsl/InmemDurableStateStore.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.state.inmem.scaladsl + +import scala.collection.concurrent.TrieMap +import scala.concurrent.Future + +import akka.Done +import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult } + +class InmemDurableStateStore[A] extends DurableStateUpdateStore[A] { + val store = new TrieMap[String, A]() + + def getObject(persistenceId: String): Future[GetObjectResult[A]] = + Future.successful(GetObjectResult(store.get(persistenceId), 0)) + + def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): Future[Done] = + Future.successful(store.put(persistenceId, value) match { + case _ => Done + }) + + def deleteObject(persistenceId: String): Future[Done] = + Future.successful(store.remove(persistenceId) match { + case _ => Done + }) +}