Allow tagging in persistence typed (#24217)

* Allow tagging in persistence typed (#23817)

* Use Set[String] for tags

* Documentation for persistence typed tagging

* Rename tagging parameter to tagger
This commit is contained in:
Richard Imaoka 2018-01-17 22:20:49 +09:00 committed by Patrik Nordwall
parent f890bc6abf
commit 38b10683a9
4 changed files with 45 additions and 10 deletions

View file

@ -178,6 +178,14 @@ Scala
The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`.
## Tagging
Persistence typed allows you to use event tags with the following `withTagging` method,
without using @ref[`EventAdapter`](persistence.md#event-adapters).
Scala
: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala) { #tagging }
## Current limitations
* The `PersistentBehavior` can't be wrapped in other behaviors, such as `Actor.deferred`. See [#23694](https://github.com/akka/akka/issues/23694)

View file

@ -16,6 +16,7 @@ import akka.persistence.typed.scaladsl.PersistentBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.Terminated
import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.persistence.journal.Tagged
/**
* INTERNAL API
@ -107,7 +108,9 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter
// the invalid event, in case such validation is implemented in the event handler.
// also, ensure that there is an event handler for each single event
state = applyEvent(state, event)
persist(event) { _
val tags = behavior.tagger(event)
val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags)
persist(eventToPersist) { _
sideEffects.foreach(applySideEffect)
}
case PersistAll(events)
@ -117,7 +120,11 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter
// also, ensure that there is an event handler for each single event
var count = events.size
state = events.foldLeft(state)(applyEvent)
persistAll(events) { _
val eventsToPersist = events.map { event
val tags = behavior.tagger(event)
if (tags.isEmpty) event else Tagged(event, tags)
}
persistAll(eventsToPersist) { _
count -= 1
if (count == 0) sideEffects.foreach(applySideEffect)
}

View file

@ -3,11 +3,12 @@
*/
package akka.persistence.typed.scaladsl
import scala.collection.{ immutable im }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.actor.typed.Behavior.UntypedBehavior
import akka.persistence.typed.internal.PersistentActorImpl
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.persistence.typed.internal.PersistentActorImpl
import scala.collection.{ immutable im }
object PersistentActor {
@ -34,10 +35,11 @@ object PersistentActor {
commandHandler: CommandHandler[Command, Event, State],
eventHandler: (State, Event) State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler,
recoveryCompleted = (_, _) ())
recoveryCompleted = (_, _) (),
tagger = _ Set.empty)
/**
* Factories for effects - how a persitent actor reacts on a command
* Factories for effects - how a persistent actor reacts on a command
*/
object Effect {
@ -187,7 +189,8 @@ class PersistentBehavior[Command, Event, State](
val initialState: State,
val commandHandler: PersistentActor.CommandHandler[Command, Event, State],
val eventHandler: (State, Event) State,
val recoveryCompleted: (ActorContext[Command], State) Unit) extends UntypedBehavior[Command] {
val recoveryCompleted: (ActorContext[Command], State) Unit,
val tagger: Event Set[String]) extends UntypedBehavior[Command] {
import PersistentActor._
/** INTERNAL API */
@ -210,11 +213,18 @@ class PersistentBehavior[Command, Event, State](
*/
def snapshotOn(predicate: (State, Event) Boolean): PersistentBehavior[Command, Event, State] = ???
/**
* The `tagger` function should give event tags, which will be used in persistence query
*/
def withTagger(tagger: Event Set[String]): PersistentBehavior[Command, Event, State] =
copy(tagger = tagger)
private def copy(
persistenceIdFromActorName: String String = persistenceIdFromActorName,
initialState: State = initialState,
commandHandler: CommandHandler[Command, Event, State] = commandHandler,
eventHandler: (State, Event) State = eventHandler,
recoveryCompleted: (ActorContext[Command], State) Unit = recoveryCompleted): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted)
recoveryCompleted: (ActorContext[Command], State) Unit = recoveryCompleted,
tagger: Event Set[String] = tagger): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted, tagger)
}

View file

@ -33,4 +33,14 @@ object BasicPersistentActorSpec {
}
//#recovery
//#tagging
val taggingBehavior: Behavior[Command] =
PersistentActor.immutable[Command, Event, State](
persistenceId = "abc",
initialState = State(),
commandHandler = (ctx, state, cmd) ???,
eventHandler = (state, evt) ???
).withTagger(_ Set("tag1", "tag2"))
//#tagging
}