=per #3774 Persistence activator templates

This commit is contained in:
Martin Krasser 2014-02-03 16:08:19 +01:00 committed by Patrik Nordwall
parent 1e86e6436b
commit cf6e61e45a
31 changed files with 518 additions and 76 deletions

View file

@ -439,7 +439,7 @@ event sourcing as a pattern on top of command sourcing). A processor that extend
``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveRecover`` and ``onReceiveCommand``. This is ``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveRecover`` and ``onReceiveCommand``. This is
demonstrated in the following example. demonstrated in the following example.
.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java#eventsourced-example .. includecode:: ../../../akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``.
@ -463,6 +463,10 @@ the ``persist`` call and the execution(s) of the associated event handler. This
calls in context of a single command. The example also shows how to switch between command different command handlers calls in context of a single command. The example also shows how to switch between command different command handlers
with ``getContext().become()`` and ``getContext().unbecome()``. with ``getContext().become()`` and ``getContext().unbecome()``.
The easiest way to run this example yourself is to download `Typesafe Activator <http://typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Java <http://typesafe.com/activator/template/akka-sample-persistence-java>`_.
It contains instructions on how to run the ``EventsourcedExample``.
Reliable event delivery Reliable event delivery
----------------------- -----------------------

View file

@ -449,7 +449,7 @@ as a pattern on top of command sourcing). A processor that extends this trait do
directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor`` directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor``
is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example. is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example.
.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example .. includecode:: ../../../akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example
The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The
``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``.
@ -473,6 +473,10 @@ the ``persist`` call and the execution(s) of the associated event handler. This
calls in context of a single command. The example also shows how to switch between command different command handlers calls in context of a single command. The example also shows how to switch between command different command handlers
with ``context.become()`` and ``context.unbecome()``. with ``context.become()`` and ``context.unbecome()``.
The easiest way to run this example yourself is to download `Typesafe Activator <http://typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Scala <http://typesafe.com/activator/template/akka-sample-persistence-scala>`_.
It contains instructions on how to run the ``EventsourcedExample``.
Reliable event delivery Reliable event delivery
----------------------- -----------------------

View file

@ -0,0 +1,17 @@
*#
*.iml
*.ipr
*.iws
*.pyc
*.tm.epoch
*.vim
*-shim.sbt
.idea/
/project/plugins/project
project/boot
target/
/logs
.cache
.classpath
.project
.settings

View file

@ -0,0 +1,13 @@
Copyright 2014 Typesafe, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View file

@ -0,0 +1,7 @@
name=akka-sample-persistence-java
title=Akka Persistence Samples with Java
description=Akka Persistence Samples with Java
tags=akka,persistence,java,sample
authorName=Akka Team
authorLink=http://akka.io/
sourceLink=https://github.com/akka/akka

View file

@ -0,0 +1,10 @@
name := "akka-sample-persistence-java"
version := "1.0"
scalaVersion := "2.10.3"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence-experimental" % "2.3-SNAPSHOT"
)

View file

@ -0,0 +1 @@
sbt.version=0.13.1

View file

@ -0,0 +1,62 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
import akka.actor.*;
import akka.persistence.*;
public class ConversationRecoveryExample {
public static String PING = "PING";
public static String PONG = "PONG";
public static class Ping extends UntypedProcessor {
private final ActorRef pongChannel = getContext().actorOf(Channel.props(), "pongChannel");
private int counter = 0;
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ConfirmablePersistent) {
ConfirmablePersistent msg = (ConfirmablePersistent)message;
if (msg.payload().equals(PING)) {
counter += 1;
System.out.println(String.format("received ping %d times", counter));
msg.confirm();
if (!recoveryRunning()) Thread.sleep(1000);
pongChannel.tell(Deliver.create(msg.withPayload(PONG), getSender().path()), getSelf());
}
} else if (message.equals("init") && counter == 0) {
pongChannel.tell(Deliver.create(Persistent.create(PONG), getSender().path()), getSelf());
}
}
}
public static class Pong extends UntypedProcessor {
private final ActorRef pingChannel = getContext().actorOf(Channel.props(), "pingChannel");
private int counter = 0;
@Override
public void onReceive(Object message) throws Exception {
if (message instanceof ConfirmablePersistent) {
ConfirmablePersistent msg = (ConfirmablePersistent)message;
if (msg.payload().equals(PONG)) {
counter += 1;
System.out.println(String.format("received pong %d times", counter));
msg.confirm();
if (!recoveryRunning()) Thread.sleep(1000);
pingChannel.tell(Deliver.create(msg.withPayload(PING), getSender().path()), getSelf());
}
}
}
}
public static void main(String... args) throws Exception {
final ActorSystem system = ActorSystem.create("example");
final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping");
final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong");
ping.tell("init", pong);
}
}

View file

@ -1,4 +1,8 @@
package sample.persistence.japi; /**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package sample.persistence;
//#eventsourced-example //#eventsourced-example
import java.io.Serializable; import java.io.Serializable;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package sample.persistence.japi; package sample.persistence;
import akka.actor.*; import akka.actor.*;
import akka.persistence.*; import akka.persistence.*;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package sample.persistence.japi; package sample.persistence;
import java.util.ArrayList; import java.util.ArrayList;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package sample.persistence.japi; package sample.persistence;
import java.io.Serializable; import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;

View file

@ -1,6 +1,12 @@
package sample.persistence.japi; /**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
import java.util.Scanner; package sample.persistence;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.*; import akka.actor.*;
import akka.persistence.*; import akka.persistence.*;
@ -70,21 +76,7 @@ public class ViewExample {
final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class)); final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class));
final ActorRef view = system.actorOf(Props.create(ExampleView.class)); final ActorRef view = system.actorOf(Props.create(ExampleView.class));
Scanner scanner = new Scanner(System.in); system.scheduler().schedule(Duration.Zero(), Duration.create(2, TimeUnit.SECONDS), processor, Persistent.create("scheduled"), system.dispatcher(), null);
system.scheduler().schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null);
while (scanner.hasNextLine()) {
String line = scanner.nextLine();
if (line.equals("exit")) {
break;
} else if (line.equals("sync")) {
view.tell(Update.create(false), null);
} else if (line.equals("snap")) {
view.tell("snap", null);
} else {
processor.tell(Persistent.create(line), null);
}
}
system.shutdown();
} }
} }

View file

@ -0,0 +1,6 @@
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false

View file

@ -0,0 +1,136 @@
<!-- <html> -->
<head>
<title>Akka Persistence Samples with Scala</title>
</head>
<body>
<div>
<p>
This tutorial contains examples that illustrate a subset of
<a href="http://doc.akka.io/docs/akka/2.3-SNAPSHOT/java/persistence.html" target="_blank">Akka Persistence</a> features.
</p>
<ul>
<li>Processors and channels</li>
<li>Processsor snapshots</li>
<li>Eventsourced processors</li>
<li>Processor failure handling</li>
<li>Processor views</li>
<li>Processor conversation recovery</li>
</ul>
<p>
Custom storage locations for the journal and snapshots can be defined in
<a href="#code/src/main/resources/application.conf" class="shortcut">application.conf</a>.
</p>
</div>
<div>
<h2>Processors and channels</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ProcessorChannelExample.java" class="shortcut">ProcessorChannelExample.java</a>
defines an <code>ExampleProcessor</code> and an <code>ExampleDestination</code>. The processor sends messages to a
destination via a channel. The destination confirms the delivery of these messages so that replayed messages aren't
redundantly delivered to the destination. Repeated runs of the application demonstrates that the processor receives
both replayed and new messages whereas the channel only receives new messages, sent by the application. The processor
also receives replies from the destination, demonstrating that a channel preserves sender references.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ProcessorChannelExample</code></b> several times.
</p>
</div>
<div>
<h2>Processor snapshots</h2>
<p>
<a href="#code/src/main/java/sample/persistence/SnapshotExample.java" class="shortcut">SnapshotExample.java</a>
demonstrates how processors can take snapshots of application state and recover from previously stored snapshots.
Snapshots are offered to processors at the beginning of recovery, before any messages (younger than the snapshot)
are replayed.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.SnapshotExample</code></b> several times. With every run, the state offered by the
most recent snapshot is printed to <code>stdout</code>, followed by the updated state after sending new persistent
messages to the processor.
</p>
</div>
<div>
<h2>Eventsourced processors</h2>
<p>
<a href="#code/src/main/java/sample/persistence/EventsourcedExample.java" class="shortcut">EventsourcedExample.java</a>
is described in detail in the <a href="http://doc.akka.io/docs/akka/2.3-SNAPSHOT/java/persistence.html#event-sourcing" target="_blank">Event sourcing</a>
section of the user documentation. With every application run, the <code>ExampleProcessor</code> is recovered from
events stored in previous application runs, processes new commands, stores new events and snapshots and prints the
current processor state to <code>stdout</code>.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.EventsourcedExample</code></b> several times.
</p>
</div>
<div>
<h2>Processor failure handling</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ProcessorFailureExample.java" class="shortcut">ProcessorFailureExample.java</a>
shows how a processor can delete persistent messages from the journal if they threw an exception. Throwing an exception
restarts the processor and replays messages. In order to prevent that the message that caused the exception is replayed,
it is marked as deleted in the journal (during invocation of <code>preRestart</code>). This is a common pattern in
command-sourcing to compensate write-ahead logging of messages.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ProcessorFailureExample</code></b> several times.
</p>
<p>
<a href="http://doc.akka.io/docs/akka/2.3-SNAPSHOT/java/persistence.html#event-sourcing" target="_blank">Event sourcing</a>
on the other hand, does not persist commands directly but rather events that have been derived from received commands
(not shown here). These events are known to be successfully applicable to current processor state i.e. there's
no need for deleting them from the journal. Event sourced processors usually have a lower throughput than command
sourced processors, as the maximum size of a write batch is limited by the number of persisted events per received
command.
</p>
</div>
<div>
<h2>Processor views</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ViewExample.java" class="shortcut">ViewExample.java</a> demonstrates
how a view (<code>ExampleView</code>) is updated with the persistent message stream of a processor
(<code>ExampleProcessor</code>). Messages sent to the processor are read from <code>stdin</code>. Views also support
snapshotting and can be used in combination with channels in the same way as processors.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ViewExample</code></b>.
</p>
<p>
Views can also receive events that have been persisted by event sourced processors (not shown).
</p>
</div>
<div>
<h2>Processor conversation recovery</h2>
<p>
<a href="#code/src/main/java/sample/persistence/ConversationRecoveryExample.java" class="shortcut">ConversationRecoveryExample.java</a>
defines two processors that send messages to each other via channels. The reliable delivery properties of channels,
in combination with processors, allow these processors to automatically resume their conversation after a JVM crash.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ConversationRecoveryExample</code></b> several times.
</p>
</div>
</body>
</html>

View file

@ -0,0 +1,17 @@
*#
*.iml
*.ipr
*.iws
*.pyc
*.tm.epoch
*.vim
*-shim.sbt
.idea/
/project/plugins/project
project/boot
target/
/logs
.cache
.classpath
.project
.settings

View file

@ -0,0 +1,13 @@
Copyright 2014 Typesafe, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View file

@ -0,0 +1,7 @@
name=akka-sample-persistence-scala
title=Akka Persistence Samples with Scala
description=Akka Persistence Samples with Scala
tags=akka,persistence,scala,sample
authorName=Akka Team
authorLink=http://akka.io/
sourceLink=https://github.com/akka/akka

View file

@ -0,0 +1,10 @@
name := "akka-sample-persistence-scala"
version := "1.0"
scalaVersion := "2.10.3"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence-experimental" % "2.3-SNAPSHOT"
)

View file

@ -0,0 +1 @@
sbt.version=0.13.1

View file

@ -0,0 +1,6 @@
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false

View file

@ -22,10 +22,9 @@ object ConversationRecoveryExample extends App {
m.confirm() m.confirm()
if (!recoveryRunning) Thread.sleep(1000) if (!recoveryRunning) Thread.sleep(1000)
pongChannel ! Deliver(m.withPayload(Pong), sender.path) pongChannel ! Deliver(m.withPayload(Pong), sender.path)
case "init" => if (counter == 0) pongChannel ! Deliver(Persistent(Pong), sender.path) case "init" if (counter == 0) =>
pongChannel ! Deliver(Persistent(Pong), sender.path)
} }
override def preStart() = ()
} }
class Pong extends Processor { class Pong extends Processor {
@ -40,8 +39,6 @@ object ConversationRecoveryExample extends App {
if (!recoveryRunning) Thread.sleep(1000) if (!recoveryRunning) Thread.sleep(1000)
pingChannel ! Deliver(m.withPayload(Ping), sender.path) pingChannel ! Deliver(m.withPayload(Ping), sender.path)
} }
override def preStart() = ()
} }
val system = ActorSystem("example") val system = ActorSystem("example")
@ -49,8 +46,5 @@ object ConversationRecoveryExample extends App {
val ping = system.actorOf(Props(classOf[Ping]), "ping") val ping = system.actorOf(Props(classOf[Ping]), "ping")
val pong = system.actorOf(Props(classOf[Pong]), "pong") val pong = system.actorOf(Props(classOf[Pong]), "pong")
ping ! Recover()
pong ! Recover()
ping tell ("init", pong) ping tell ("init", pong)
} }

View file

@ -34,19 +34,19 @@ object ProcessorChannelRemoteExample {
""") """)
} }
object SenderApp extends App { object SenderApp /*extends App*/ { // no app until https://github.com/typesafehub/activator/issues/287 is fixed
import ProcessorChannelRemoteExample._ import ProcessorChannelRemoteExample._
class ExampleProcessor(destination: ActorPath) extends Processor { class ExampleProcessor(destination: ActorPath) extends Processor {
val listener = context.actorOf(Props[ExampleListener]) val listener = context.actorOf(Props[ExampleListener])
val channel = context.actorOf(Channel.props(ChannelSettings( val channel = context.actorOf(Channel.props(ChannelSettings(
redeliverMax = 5, redeliverMax = 15,
redeliverInterval = 1.second, redeliverInterval = 3.seconds,
redeliverFailureListener = Some(listener))), "channel") redeliverFailureListener = Some(listener))), "channel")
def receive = { def receive = {
case p @ Persistent(payload, _) => case p @ Persistent(payload, snr) =>
println(s"[processor] received payload: ${payload} (replayed = ${recoveryRunning})") println(s"[processor] received payload: ${payload} (snr = ${snr}, replayed = ${recoveryRunning})")
channel ! Deliver(p.withPayload(s"processed ${payload}"), destination) channel ! Deliver(p.withPayload(s"processed ${payload}"), destination)
case "restart" => case "restart" =>
throw new Exception("restart requested") throw new Exception("restart requested")
@ -64,35 +64,23 @@ object SenderApp extends App {
} }
val receiverPath = ActorPath.fromString("akka.tcp://receiver@127.0.0.1:44317/user/receiver") val receiverPath = ActorPath.fromString("akka.tcp://receiver@127.0.0.1:44317/user/receiver")
val senderConfig = ConfigFactory.parseString(""" val senderConfig = ConfigFactory.parseString("akka.remote.netty.tcp.port = 44316")
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
akka.remote.netty.tcp.port = 44316
""")
val system = ActorSystem("sender", config.withFallback(senderConfig)) val system = ActorSystem("sender", config.withFallback(senderConfig))
val sender = system.actorOf(Props(classOf[ExampleProcessor], receiverPath)) val sender = system.actorOf(Props(classOf[ExampleProcessor], receiverPath))
@annotation.tailrec import system.dispatcher
def read(line: String): Unit = line match {
case "exit" | null => system.scheduler.schedule(Duration.Zero, 3.seconds, sender, Persistent("scheduled"))
case msg =>
sender ! Persistent(msg)
read(Console.readLine())
} }
read(Console.readLine()) object ReceiverApp /*extends App*/ { // no app until https://github.com/typesafehub/activator/issues/287 is fixed
system.shutdown()
}
object ReceiverApp extends App {
import ProcessorChannelRemoteExample._ import ProcessorChannelRemoteExample._
class ExampleDestination extends Actor { class ExampleDestination extends Actor {
def receive = { def receive = {
case p @ ConfirmablePersistent(payload, snr, _) => case p @ ConfirmablePersistent(payload, snr, redel) =>
println(s"[destination] received payload: ${payload}") println(s"[destination] received payload: ${payload} (snr = ${snr}, redel = ${redel})")
sender ! s"re: ${payload} (snr = ${snr})" sender ! s"re: ${payload} (snr = ${snr})"
p.confirm() p.confirm()
} }

View file

@ -4,6 +4,8 @@
package sample.persistence package sample.persistence
import scala.concurrent.duration._
import akka.actor._ import akka.actor._
import akka.persistence._ import akka.persistence._
@ -52,20 +54,8 @@ object ViewExample extends App {
val processor = system.actorOf(Props(classOf[ExampleProcessor])) val processor = system.actorOf(Props(classOf[ExampleProcessor]))
val view = system.actorOf(Props(classOf[ExampleView])) val view = system.actorOf(Props(classOf[ExampleView]))
@annotation.tailrec import system.dispatcher
def read(line: String): Unit = line match {
case "exit" | null =>
case "sync" =>
view ! Update(await = false)
read(Console.readLine())
case "snap" =>
view ! "snap"
read(Console.readLine())
case msg =>
processor ! Persistent(msg)
read(Console.readLine())
}
read(Console.readLine()) system.scheduler.schedule(Duration.Zero, 2.seconds, processor, Persistent("scheduled"))
system.shutdown() system.scheduler.schedule(Duration.Zero, 5.seconds, view, "snap")
} }

View file

@ -0,0 +1,155 @@
<!-- <html> -->
<head>
<title>Akka Persistence Samples with Scala</title>
</head>
<body>
<div>
<p>
This tutorial contains examples that illustrate a subset of
<a href="http://doc.akka.io/docs/akka/2.3-SNAPSHOT/scala/persistence.html" target="_blank">Akka Persistence</a> features.
</p>
<ul>
<li>Processors and channels</li>
<li>Processsor snapshots</li>
<li>Eventsourced processors</li>
<li>Processor failure handling</li>
<li>Processor views</li>
<li>Processor conversation recovery</li>
</ul>
<p>
Custom storage locations for the journal and snapshots can be defined in
<a href="#code/src/main/resources/application.conf" class="shortcut">application.conf</a>.
</p>
</div>
<div>
<h2>Processors and channels</h2>
<p>
<a href="#code/src/main/scala/sample/persistence/ProcessorChannelExample.scala" class="shortcut">ProcessorChannelExample.scala</a>
defines an <code>ExampleProcessor</code> and an <code>ExampleDestination</code>. The processor sends messages to a
destination via a channel. The destination confirms the delivery of these messages so that replayed messages aren't
redundantly delivered to the destination. Repeated runs of the application demonstrates that the processor receives
both replayed and new messages whereas the channel only receives new messages, sent by the application. The processor
also receives replies from the destination, demonstrating that a channel preserves sender references.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ProcessorChannelExample</code></b> several times.
</p>
<!-- do not show until https://github.com/typesafehub/activator/issues/287 is fixed
<p>
<a href="#code/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala" class="shortcut">ProcessorChannelRemoteExample.scala</a>
is similar to the previous example. Here, processor and destination run in different JVMs. The processor JVM prompts the user for new
messages. By shutting down and restarting processor and/or destination JVMs, the reliable delivery properties of channels can
be examined. Furthermore, the application also registers an <code>ExampleListener</code> at the channel which
receives <code>RedeliverFailure</code> messages should the maximum number of redelivery attempts be reached.
In this case the sending processor is restarted and a new round of redelivery is started (triggered by un-confirmed,
replayed messages).
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and start the processor JVM by running the
main class <b><code>sample.persistence.SenderApp</code></b>. The destination JVM can be starting by running the
main class <b><code>sample.persistence.ReceiverApp</code></b>
</p>
-->
</div>
<div>
<h2>Processor snapshots</h2>
<p>
<a href="#code/src/main/scala/sample/persistence/SnapshotExample.scala" class="shortcut">SnapshotExample.scala</a>
demonstrates how processors can take snapshots of application state and recover from previously stored snapshots.
Snapshots are offered to processors at the beginning of recovery, before any messages (younger than the snapshot)
are replayed.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.SnapshotExample</code></b> several times. With every run, the state offered by the
most recent snapshot is printed to <code>stdout</code>, followed by the updated state after sending new persistent
messages to the processor.
</p>
</div>
<div>
<h2>Eventsourced processors</h2>
<p>
<a href="#code/src/main/scala/sample/persistence/EventsourcedExample.scala" class="shortcut">EventsourcedExample.scala</a>
is described in detail in the <a href="http://doc.akka.io/docs/akka/2.3-SNAPSHOT/scala/persistence.html#event-sourcing" target="_blank">Event sourcing</a>
section of the user documentation. With every application run, the <code>ExampleProcessor</code> is recovered from
events stored in previous application runs, processes new commands, stores new events and snapshots and prints the
current processor state to <code>stdout</code>.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.EventsourcedExample</code></b> several times.
</p>
</div>
<div>
<h2>Processor failure handling</h2>
<p>
<a href="#code/src/main/scala/sample/persistence/ProcessorFailureExample.scala" class="shortcut">ProcessorFailureExample.scala</a>
shows how a processor can delete persistent messages from the journal if they threw an exception. Throwing an exception
restarts the processor and replays messages. In order to prevent that the message that caused the exception is replayed,
it is marked as deleted in the journal (during invocation of <code>preRestart</code>). This is a common pattern in
command-sourcing to compensate write-ahead logging of messages.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ProcessorFailureExample</code></b> several times.
</p>
<p>
<a href="http://doc.akka.io/docs/akka/2.3-SNAPSHOT/scala/persistence.html#event-sourcing" target="_blank">Event sourcing</a>
on the other hand, does not persist commands directly but rather events that have been derived from received commands
(not shown here). These events are known to be successfully applicable to current processor state i.e. there's
no need for deleting them from the journal. Event sourced processors usually have a lower throughput than command
sourced processors, as the maximum size of a write batch is limited by the number of persisted events per received
command.
</p>
</div>
<div>
<h2>Processor views</h2>
<p>
<a href="#code/src/main/scala/sample/persistence/ViewExample.scala" class="shortcut">ViewExample.scala</a> demonstrates
how a view (<code>ExampleView</code>) is updated with the persistent message stream of a processor
(<code>ExampleProcessor</code>). Messages sent to the processor are read from <code>stdin</code>. Views also support
snapshotting and can be used in combination with channels in the same way as processors.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ViewExample</code></b>.
</p>
<p>
Views can also receive events that have been persisted by event sourced processors (not shown).
</p>
</div>
<div>
<h2>Processor conversation recovery</h2>
<p>
<a href="#code/src/main/scala/sample/persistence/ConversationRecoveryExample.scala" class="shortcut">ConversationRecoveryExample.scala</a>
defines two processors that send messages to each other via channels. The reliable delivery properties of channels,
in combination with processors, allow these processors to automatically resume their conversation after a JVM crash.
</p>
<p>
To run this example, go to the <a href="#run" class="shortcut">Run</a> tab, and run the application main class
<b><code>sample.persistence.ConversationRecoveryExample</code></b> several times.
</p>
</div>
</body>
</html>

View file

@ -1,2 +0,0 @@
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

View file

@ -369,7 +369,7 @@ object AkkaBuild extends Build {
settings = parentSettings ++ ActivatorDist.settings, settings = parentSettings ++ ActivatorDist.settings,
aggregate = Seq(camelSampleJava, camelSampleScala, mainSampleJava, mainSampleScala, aggregate = Seq(camelSampleJava, camelSampleScala, mainSampleJava, mainSampleScala,
remoteSampleJava, remoteSampleScala, clusterSampleJava, clusterSampleScala, remoteSampleJava, remoteSampleScala, clusterSampleJava, clusterSampleScala,
fsmSampleScala, persistenceSample, fsmSampleScala, persistenceSampleJava, persistenceSampleScala,
multiNodeSampleScala, helloKernelSample, osgiDiningHakkersSample) multiNodeSampleScala, helloKernelSample, osgiDiningHakkersSample)
) )
@ -429,9 +429,16 @@ object AkkaBuild extends Build {
settings = sampleSettings settings = sampleSettings
) )
lazy val persistenceSample = Project( lazy val persistenceSampleJava = Project(
id = "akka-sample-persistence", id = "akka-sample-persistence-java",
base = file("akka-samples/akka-sample-persistence"), base = file("akka-samples/akka-sample-persistence-java"),
dependencies = Seq(actor, persistence),
settings = sampleSettings
)
lazy val persistenceSampleScala = Project(
id = "akka-sample-persistence-scala",
base = file("akka-samples/akka-sample-persistence-scala"),
dependencies = Seq(actor, persistence), dependencies = Seq(actor, persistence),
settings = sampleSettings settings = sampleSettings
) )