pekko/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala

69 lines
1.7 KiB
Scala
Raw Normal View History

2013-10-15 09:01:07 +02:00
package sample.persistence
//#eventsourced-example
import akka.actor._
import akka.persistence._
2014-03-07 13:20:01 +01:00
final case class Cmd(data: String)
final case class Evt(data: String)
2013-10-15 09:01:07 +02:00
2014-03-07 13:20:01 +01:00
final case class ExampleState(events: List[String] = Nil) {
2013-10-15 09:01:07 +02:00
def update(evt: Evt) = copy(evt.data :: events)
def size = events.length
override def toString: String = events.reverse.toString
}
class ExampleProcessor extends EventsourcedProcessor {
var state = ExampleState()
def updateState(event: Evt): Unit =
state = state.update(event)
def numEvents =
state.size
val receiveRecover: Receive = {
case evt: Evt => updateState(evt)
case SnapshotOffer(_, snapshot: ExampleState) => state = snapshot
2013-10-15 09:01:07 +02:00
}
val receiveCommand: Receive = {
case Cmd(data) =>
2013-10-15 09:01:07 +02:00
persist(Evt(s"${data}-${numEvents}"))(updateState)
persist(Evt(s"${data}-${numEvents + 1}")) { event =>
2013-10-15 09:01:07 +02:00
updateState(event)
context.system.eventStream.publish(event)
if (data == "foo") context.become(otherCommandHandler)
}
case "snap" => saveSnapshot(state)
case "print" => println(state)
2013-10-15 09:01:07 +02:00
}
val otherCommandHandler: Receive = {
case Cmd("bar") =>
persist(Evt(s"bar-${numEvents}")) { event =>
2013-10-15 09:01:07 +02:00
updateState(event)
context.unbecome()
}
unstashAll()
case other => stash()
2013-10-15 09:01:07 +02:00
}
}
//#eventsourced-example
object EventsourcedExample extends App {
val system = ActorSystem("example")
val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala")
processor ! Cmd("foo")
processor ! Cmd("baz") // will be stashed
processor ! Cmd("bar")
processor ! "snap"
processor ! Cmd("buzz")
processor ! "print"
Thread.sleep(1000)
system.shutdown()
}