Initial implementation of DurableStateStore and one test

This commit is contained in:
Debasish Ghosh 2021-06-03 12:02:43 +05:30 committed by Patrik Nordwall
parent 2168cec497
commit 78ff85d0fb
4 changed files with 192 additions and 0 deletions

View file

@ -0,0 +1,113 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.state.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.serialization.jackson.CborSerializable
object DurableStateBehaviorReplySpec {
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.state.plugin = "akka.persistence.state.inmem"
akka.persistence.state.inmem {
class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider"
# FIXME we should change this to use a fallback in reference.conf
recovery-timeout = 30s
}
""")
sealed trait Command[ReplyMessage] extends CborSerializable
final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done]
final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done]
final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
final case class State(value: Int) extends CborSerializable
def counter(persistenceId: PersistenceId): Behavior[Command[_]] =
Behaviors.setup(ctx => counter(ctx, persistenceId))
def counter(ctx: ActorContext[Command[_]], persistenceId: PersistenceId): DurableStateBehavior[Command[_], State] = {
DurableStateBehavior
.withEnforcedReplies[Command[_], State](
persistenceId,
emptyState = State(0),
commandHandler = (state, command) =>
command match {
case IncrementWithConfirmation(replyTo) =>
Effect.persist(state.copy(value = state.value + 1)).thenReply(replyTo)(_ => Done)
case IncrementReplyLater(replyTo) =>
Effect
.persist(state.copy(value = state.value + 1))
.thenRun((_: State) => ctx.self ! ReplyNow(replyTo))
.thenNoReply()
case ReplyNow(replyTo) =>
Effect.reply(replyTo)(Done)
case GetValue(replyTo) =>
Effect.reply(replyTo)(state)
})
.withDurableStateStorePluginId("akka.persistence.state.inmem")
}
}
class DurableStateBehaviorReplySpec
extends ScalaTestWithActorTestKit(DurableStateBehaviorReplySpec.conf)
with AnyWordSpecLike
with LogCapturing {
import DurableStateBehaviorReplySpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})")
"A typed persistent actor with commands that are expecting replies" must {
"persist an event thenReply" in {
val c = spawn(counter(nextPid()))
val probe = TestProbe[Done]()
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
c ! IncrementWithConfirmation(probe.ref)
c ! IncrementWithConfirmation(probe.ref)
probe.expectMessage(Done)
probe.expectMessage(Done)
}
"persist an event thenReply later" in {
val c = spawn(counter(nextPid()))
val probe = TestProbe[Done]()
c ! IncrementReplyLater(probe.ref)
probe.expectMessage(Done)
}
"reply to query command" in {
val c = spawn(counter(nextPid()))
val updateProbe = TestProbe[Done]()
c ! IncrementWithConfirmation(updateProbe.ref)
val queryProbe = TestProbe[State]()
c ! GetValue(queryProbe.ref)
queryProbe.expectMessage(State(1))
}
}
}

View file

@ -0,0 +1,19 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.state.inmem
import akka.persistence.state.DurableStateStoreProvider
import akka.persistence.state.scaladsl.DurableStateStore
import akka.persistence.state.inmem.scaladsl.InmemDurableStateStore
import akka.persistence.state.javadsl.{ DurableStateStore => JDurableStateStore }
import akka.persistence.state.inmem.javadsl.{ InmemDurableStateStore => JInmemDurableStateStore }
class InmemDurableStateStoreProvider extends DurableStateStoreProvider {
override def scaladslDurableStateStore(): DurableStateStore[Any] =
new InmemDurableStateStore[Any]
override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
new JInmemDurableStateStore[AnyRef]
}

View file

@ -0,0 +1,32 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.state.inmem.javadsl
import java.util.Optional
import java.util.concurrent.{ CompletionStage, ConcurrentHashMap }
import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
import akka.Done
import akka.persistence.state.javadsl.{ DurableStateUpdateStore, GetObjectResult }
class InmemDurableStateStore[A] extends DurableStateUpdateStore[A] {
val store = new ConcurrentHashMap[String, A]()
def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] =
toJava(Future.successful(GetObjectResult(Optional.ofNullable(store.get(persistenceId)), 0)))
def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): CompletionStage[Done] =
toJava(Future.successful(store.put(persistenceId, value) match {
case _ => Done
}))
def deleteObject(persistenceId: String): CompletionStage[Done] =
toJava(Future.successful(store.remove(persistenceId) match {
case _ => Done
}))
}

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.state.inmem.scaladsl
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future
import akka.Done
import akka.persistence.state.scaladsl.{ DurableStateUpdateStore, GetObjectResult }
class InmemDurableStateStore[A] extends DurableStateUpdateStore[A] {
val store = new TrieMap[String, A]()
def getObject(persistenceId: String): Future[GetObjectResult[A]] =
Future.successful(GetObjectResult(store.get(persistenceId), 0))
def upsertObject(persistenceId: String, seqNr: Long, value: A, tag: String): Future[Done] =
Future.successful(store.put(persistenceId, value) match {
case _ => Done
})
def deleteObject(persistenceId: String): Future[Done] =
Future.successful(store.remove(persistenceId) match {
case _ => Done
})
}