diff --git a/akka-docs/src/main/paradox/persistence-typed.md b/akka-docs/src/main/paradox/persistence-typed.md index dba8eb997f..e31a42f7a3 100644 --- a/akka-docs/src/main/paradox/persistence-typed.md +++ b/akka-docs/src/main/paradox/persistence-typed.md @@ -178,6 +178,14 @@ Scala The `onRecoveryCompleted` takes on an `ActorContext` and the current `State`. +## Tagging + +Persistence typed allows you to use event tags with the following `withTagging` method, +without using @ref[`EventAdapter`](persistence.md#event-adapters). + +Scala +: @@snip [BasicPersistentActorSpec.scala]($akka$/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala) { #tagging } + ## Current limitations * The `PersistentBehavior` can't be wrapped in other behaviors, such as `Actor.deferred`. See [#23694](https://github.com/akka/akka/issues/23694) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala index 365344b4e3..f1bc9a9f54 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/PersistentActorImpl.scala @@ -16,6 +16,7 @@ import akka.persistence.typed.scaladsl.PersistentBehavior import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.Terminated import akka.actor.typed.internal.adapter.ActorRefAdapter +import akka.persistence.journal.Tagged /** * INTERNAL API @@ -107,7 +108,9 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter // the invalid event, in case such validation is implemented in the event handler. // also, ensure that there is an event handler for each single event state = applyEvent(state, event) - persist(event) { _ ⇒ + val tags = behavior.tagger(event) + val eventToPersist = if (tags.isEmpty) event else Tagged(event, tags) + persist(eventToPersist) { _ ⇒ sideEffects.foreach(applySideEffect) } case PersistAll(events) ⇒ @@ -117,7 +120,11 @@ import akka.actor.typed.internal.adapter.ActorRefAdapter // also, ensure that there is an event handler for each single event var count = events.size state = events.foldLeft(state)(applyEvent) - persistAll(events) { _ ⇒ + val eventsToPersist = events.map { event ⇒ + val tags = behavior.tagger(event) + if (tags.isEmpty) event else Tagged(event, tags) + } + persistAll(eventsToPersist) { _ ⇒ count -= 1 if (count == 0) sideEffects.foreach(applySideEffect) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala index 1ea64736eb..29554c6e00 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/scaladsl/PersistentActor.scala @@ -3,11 +3,12 @@ */ package akka.persistence.typed.scaladsl -import scala.collection.{ immutable ⇒ im } -import akka.annotation.{ DoNotInherit, InternalApi } import akka.actor.typed.Behavior.UntypedBehavior -import akka.persistence.typed.internal.PersistentActorImpl import akka.actor.typed.scaladsl.ActorContext +import akka.annotation.{ DoNotInherit, InternalApi } +import akka.persistence.typed.internal.PersistentActorImpl + +import scala.collection.{ immutable ⇒ im } object PersistentActor { @@ -34,10 +35,11 @@ object PersistentActor { commandHandler: CommandHandler[Command, Event, State], eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] = new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, - recoveryCompleted = (_, _) ⇒ ()) + recoveryCompleted = (_, _) ⇒ (), + tagger = _ ⇒ Set.empty) /** - * Factories for effects - how a persitent actor reacts on a command + * Factories for effects - how a persistent actor reacts on a command */ object Effect { @@ -187,7 +189,8 @@ class PersistentBehavior[Command, Event, State]( val initialState: State, val commandHandler: PersistentActor.CommandHandler[Command, Event, State], val eventHandler: (State, Event) ⇒ State, - val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit) extends UntypedBehavior[Command] { + val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit, + val tagger: Event ⇒ Set[String]) extends UntypedBehavior[Command] { import PersistentActor._ /** INTERNAL API */ @@ -210,11 +213,18 @@ class PersistentBehavior[Command, Event, State]( */ def snapshotOn(predicate: (State, Event) ⇒ Boolean): PersistentBehavior[Command, Event, State] = ??? + /** + * The `tagger` function should give event tags, which will be used in persistence query + */ + def withTagger(tagger: Event ⇒ Set[String]): PersistentBehavior[Command, Event, State] = + copy(tagger = tagger) + private def copy( persistenceIdFromActorName: String ⇒ String = persistenceIdFromActorName, initialState: State = initialState, commandHandler: CommandHandler[Command, Event, State] = commandHandler, eventHandler: (State, Event) ⇒ State = eventHandler, - recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted): PersistentBehavior[Command, Event, State] = - new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted) + recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted, + tagger: Event ⇒ Set[String] = tagger): PersistentBehavior[Command, Event, State] = + new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted, tagger) } diff --git a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala index 9678307656..04b0eae45d 100644 --- a/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala +++ b/akka-persistence-typed/src/test/scala/docs/akka/persistence/typed/BasicPersistentActorSpec.scala @@ -33,4 +33,14 @@ object BasicPersistentActorSpec { } //#recovery + //#tagging + val taggingBehavior: Behavior[Command] = + PersistentActor.immutable[Command, Event, State]( + persistenceId = "abc", + initialState = State(), + commandHandler = (ctx, state, cmd) ⇒ ???, + eventHandler = (state, evt) ⇒ ??? + ).withTagger(_ ⇒ Set("tag1", "tag2")) + + //#tagging }