diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index bcd99a4068..68ca57f324 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -439,7 +439,7 @@ event sourcing as a pattern on top of command sourcing). A processor that extend ``UntypedEventsourcedProcessor`` is defined by implementing ``onReceiveRecover`` and ``onReceiveCommand``. This is demonstrated in the following example. -.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java#eventsourced-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java#eventsourced-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. @@ -463,6 +463,10 @@ the ``persist`` call and the execution(s) of the associated event handler. This calls in context of a single command. The example also shows how to switch between command different command handlers with ``getContext().become()`` and ``getContext().unbecome()``. +The easiest way to run this example yourself is to download `Typesafe Activator `_ +and open the tutorial named `Akka Persistence Samples with Java `_. +It contains instructions on how to run the ``EventsourcedExample``. + Reliable event delivery ----------------------- diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 0860788505..a032e39582 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -449,7 +449,7 @@ as a pattern on top of command sourcing). A processor that extends this trait do directly but uses the ``persist`` method to persist and handle events. The behavior of an ``EventsourcedProcessor`` is defined by implementing ``receiveRecover`` and ``receiveCommand``. This is demonstrated in the following example. -.. includecode:: ../../../akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example +.. includecode:: ../../../akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala#eventsourced-example The example defines two data types, ``Cmd`` and ``Evt`` to represent commands and events, respectively. The ``state`` of the ``ExampleProcessor`` is a list of persisted event data contained in ``ExampleState``. @@ -473,6 +473,10 @@ the ``persist`` call and the execution(s) of the associated event handler. This calls in context of a single command. The example also shows how to switch between command different command handlers with ``context.become()`` and ``context.unbecome()``. +The easiest way to run this example yourself is to download `Typesafe Activator `_ +and open the tutorial named `Akka Persistence Samples with Scala `_. +It contains instructions on how to run the ``EventsourcedExample``. + Reliable event delivery ----------------------- diff --git a/akka-samples/akka-sample-persistence-java/.gitignore b/akka-samples/akka-sample-persistence-java/.gitignore new file mode 100644 index 0000000000..660c959e44 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/.gitignore @@ -0,0 +1,17 @@ +*# +*.iml +*.ipr +*.iws +*.pyc +*.tm.epoch +*.vim +*-shim.sbt +.idea/ +/project/plugins/project +project/boot +target/ +/logs +.cache +.classpath +.project +.settings \ No newline at end of file diff --git a/akka-samples/akka-sample-persistence-java/LICENSE b/akka-samples/akka-sample-persistence-java/LICENSE new file mode 100644 index 0000000000..6c42406c7b --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/LICENSE @@ -0,0 +1,13 @@ +Copyright 2014 Typesafe, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/akka-samples/akka-sample-persistence-java/activator.properties b/akka-samples/akka-sample-persistence-java/activator.properties new file mode 100644 index 0000000000..eca7098e0f --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/activator.properties @@ -0,0 +1,7 @@ +name=akka-sample-persistence-java +title=Akka Persistence Samples with Java +description=Akka Persistence Samples with Java +tags=akka,persistence,java,sample +authorName=Akka Team +authorLink=http://akka.io/ +sourceLink=https://github.com/akka/akka diff --git a/akka-samples/akka-sample-persistence-java/build.sbt b/akka-samples/akka-sample-persistence-java/build.sbt new file mode 100644 index 0000000000..efe55eeb35 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/build.sbt @@ -0,0 +1,10 @@ +name := "akka-sample-persistence-java" + +version := "1.0" + +scalaVersion := "2.10.3" + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-persistence-experimental" % "2.3-SNAPSHOT" +) + diff --git a/akka-samples/akka-sample-persistence-java/project/build.properties b/akka-samples/akka-sample-persistence-java/project/build.properties new file mode 100644 index 0000000000..37b489cb6e --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.1 diff --git a/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ConversationRecoveryExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ConversationRecoveryExample.java new file mode 100644 index 0000000000..83b7ecbaaa --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ConversationRecoveryExample.java @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; + +import akka.actor.*; +import akka.persistence.*; + +public class ConversationRecoveryExample { + public static String PING = "PING"; + public static String PONG = "PONG"; + + public static class Ping extends UntypedProcessor { + private final ActorRef pongChannel = getContext().actorOf(Channel.props(), "pongChannel"); + private int counter = 0; + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof ConfirmablePersistent) { + ConfirmablePersistent msg = (ConfirmablePersistent)message; + if (msg.payload().equals(PING)) { + counter += 1; + System.out.println(String.format("received ping %d times", counter)); + msg.confirm(); + if (!recoveryRunning()) Thread.sleep(1000); + pongChannel.tell(Deliver.create(msg.withPayload(PONG), getSender().path()), getSelf()); + } + } else if (message.equals("init") && counter == 0) { + pongChannel.tell(Deliver.create(Persistent.create(PONG), getSender().path()), getSelf()); + } + } + } + + public static class Pong extends UntypedProcessor { + private final ActorRef pingChannel = getContext().actorOf(Channel.props(), "pingChannel"); + private int counter = 0; + + @Override + public void onReceive(Object message) throws Exception { + if (message instanceof ConfirmablePersistent) { + ConfirmablePersistent msg = (ConfirmablePersistent)message; + if (msg.payload().equals(PONG)) { + counter += 1; + System.out.println(String.format("received pong %d times", counter)); + msg.confirm(); + if (!recoveryRunning()) Thread.sleep(1000); + pingChannel.tell(Deliver.create(msg.withPayload(PING), getSender().path()), getSelf()); + } + } + } + } + + public static void main(String... args) throws Exception { + final ActorSystem system = ActorSystem.create("example"); + + final ActorRef ping = system.actorOf(Props.create(Ping.class), "ping"); + final ActorRef pong = system.actorOf(Props.create(Pong.class), "pong"); + + ping.tell("init", pong); + } +} diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java similarity index 97% rename from akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java rename to akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java index 9ab21b9324..f8a3419a75 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/EventsourcedExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/EventsourcedExample.java @@ -1,4 +1,8 @@ -package sample.persistence.japi; +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package sample.persistence; //#eventsourced-example import java.io.Serializable; diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ProcessorChannelExample.java similarity index 98% rename from akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java rename to akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ProcessorChannelExample.java index 8c7a22b589..28da133d98 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorChannelExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ProcessorChannelExample.java @@ -2,7 +2,7 @@ * Copyright (C) 2009-2014 Typesafe Inc. */ -package sample.persistence.japi; +package sample.persistence; import akka.actor.*; import akka.persistence.*; diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ProcessorFailureExample.java similarity index 98% rename from akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java rename to akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ProcessorFailureExample.java index 93ea3ce59b..0e1f32db66 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ProcessorFailureExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ProcessorFailureExample.java @@ -2,7 +2,7 @@ * Copyright (C) 2009-2014 Typesafe Inc. */ -package sample.persistence.japi; +package sample.persistence; import java.util.ArrayList; diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java similarity index 98% rename from akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java rename to akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java index 3ac2dc0f6d..4eec1983b6 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/SnapshotExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/SnapshotExample.java @@ -2,7 +2,7 @@ * Copyright (C) 2009-2014 Typesafe Inc. */ -package sample.persistence.japi; +package sample.persistence; import java.io.Serializable; import java.util.ArrayList; diff --git a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ViewExample.java b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java similarity index 82% rename from akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ViewExample.java rename to akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java index b1f886845a..b30808dc59 100644 --- a/akka-samples/akka-sample-persistence/src/main/java/sample/persistence/japi/ViewExample.java +++ b/akka-samples/akka-sample-persistence-java/src/main/java/sample/persistence/ViewExample.java @@ -1,6 +1,12 @@ -package sample.persistence.japi; +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ -import java.util.Scanner; +package sample.persistence; + +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.Duration; import akka.actor.*; import akka.persistence.*; @@ -70,21 +76,7 @@ public class ViewExample { final ActorRef processor = system.actorOf(Props.create(ExampleProcessor.class)); final ActorRef view = system.actorOf(Props.create(ExampleView.class)); - Scanner scanner = new Scanner(System.in); - - while (scanner.hasNextLine()) { - String line = scanner.nextLine(); - if (line.equals("exit")) { - break; - } else if (line.equals("sync")) { - view.tell(Update.create(false), null); - } else if (line.equals("snap")) { - view.tell("snap", null); - } else { - processor.tell(Persistent.create(line), null); - } - } - - system.shutdown(); + system.scheduler().schedule(Duration.Zero(), Duration.create(2, TimeUnit.SECONDS), processor, Persistent.create("scheduled"), system.dispatcher(), null); + system.scheduler().schedule(Duration.Zero(), Duration.create(5, TimeUnit.SECONDS), view, "snap", system.dispatcher(), null); } } diff --git a/akka-samples/akka-sample-persistence-java/src/main/resources/application.conf b/akka-samples/akka-sample-persistence-java/src/main/resources/application.conf new file mode 100644 index 0000000000..da5be9b92c --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/src/main/resources/application.conf @@ -0,0 +1,6 @@ +akka.persistence.journal.leveldb.dir = "target/example/journal" +akka.persistence.snapshot-store.local.dir = "target/example/snapshots" + +# DO NOT USE THIS IN PRODUCTION !!! +# See also https://github.com/typesafehub/activator/issues/287 +akka.persistence.journal.leveldb.native = false diff --git a/akka-samples/akka-sample-persistence-java/tutorial/index.html b/akka-samples/akka-sample-persistence-java/tutorial/index.html new file mode 100644 index 0000000000..9b0c6aae61 --- /dev/null +++ b/akka-samples/akka-sample-persistence-java/tutorial/index.html @@ -0,0 +1,136 @@ + + +Akka Persistence Samples with Scala + + + + +
+

+This tutorial contains examples that illustrate a subset of +Akka Persistence features. +

+
    +
  • Processors and channels
  • +
  • Processsor snapshots
  • +
  • Eventsourced processors
  • +
  • Processor failure handling
  • +
  • Processor views
  • +
  • Processor conversation recovery
  • +
+ +

+Custom storage locations for the journal and snapshots can be defined in +application.conf. +

+
+ +
+

Processors and channels

+

+ProcessorChannelExample.java +defines an ExampleProcessor and an ExampleDestination. The processor sends messages to a +destination via a channel. The destination confirms the delivery of these messages so that replayed messages aren't +redundantly delivered to the destination. Repeated runs of the application demonstrates that the processor receives +both replayed and new messages whereas the channel only receives new messages, sent by the application. The processor +also receives replies from the destination, demonstrating that a channel preserves sender references. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ProcessorChannelExample several times. +

+
+ +
+

Processor snapshots

+

+SnapshotExample.java +demonstrates how processors can take snapshots of application state and recover from previously stored snapshots. +Snapshots are offered to processors at the beginning of recovery, before any messages (younger than the snapshot) +are replayed. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.SnapshotExample several times. With every run, the state offered by the +most recent snapshot is printed to stdout, followed by the updated state after sending new persistent +messages to the processor. +

+
+ +
+

Eventsourced processors

+

+EventsourcedExample.java +is described in detail in the Event sourcing +section of the user documentation. With every application run, the ExampleProcessor is recovered from +events stored in previous application runs, processes new commands, stores new events and snapshots and prints the +current processor state to stdout. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.EventsourcedExample several times. +

+
+ +
+

Processor failure handling

+

+ProcessorFailureExample.java +shows how a processor can delete persistent messages from the journal if they threw an exception. Throwing an exception +restarts the processor and replays messages. In order to prevent that the message that caused the exception is replayed, +it is marked as deleted in the journal (during invocation of preRestart). This is a common pattern in +command-sourcing to compensate write-ahead logging of messages. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ProcessorFailureExample several times. +

+ +

+Event sourcing +on the other hand, does not persist commands directly but rather events that have been derived from received commands +(not shown here). These events are known to be successfully applicable to current processor state i.e. there's +no need for deleting them from the journal. Event sourced processors usually have a lower throughput than command +sourced processors, as the maximum size of a write batch is limited by the number of persisted events per received +command. +

+
+ +
+

Processor views

+

+ViewExample.java demonstrates +how a view (ExampleView) is updated with the persistent message stream of a processor +(ExampleProcessor). Messages sent to the processor are read from stdin. Views also support +snapshotting and can be used in combination with channels in the same way as processors. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ViewExample. +

+ +

+Views can also receive events that have been persisted by event sourced processors (not shown). +

+
+ +
+

Processor conversation recovery

+

+ConversationRecoveryExample.java +defines two processors that send messages to each other via channels. The reliable delivery properties of channels, +in combination with processors, allow these processors to automatically resume their conversation after a JVM crash. +

+

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ConversationRecoveryExample several times. +

+
+ + + diff --git a/akka-samples/akka-sample-persistence-scala/.gitignore b/akka-samples/akka-sample-persistence-scala/.gitignore new file mode 100644 index 0000000000..660c959e44 --- /dev/null +++ b/akka-samples/akka-sample-persistence-scala/.gitignore @@ -0,0 +1,17 @@ +*# +*.iml +*.ipr +*.iws +*.pyc +*.tm.epoch +*.vim +*-shim.sbt +.idea/ +/project/plugins/project +project/boot +target/ +/logs +.cache +.classpath +.project +.settings \ No newline at end of file diff --git a/akka-samples/akka-sample-persistence-scala/LICENSE b/akka-samples/akka-sample-persistence-scala/LICENSE new file mode 100644 index 0000000000..6c42406c7b --- /dev/null +++ b/akka-samples/akka-sample-persistence-scala/LICENSE @@ -0,0 +1,13 @@ +Copyright 2014 Typesafe, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/akka-samples/akka-sample-persistence-scala/activator.properties b/akka-samples/akka-sample-persistence-scala/activator.properties new file mode 100644 index 0000000000..21d1a65a6d --- /dev/null +++ b/akka-samples/akka-sample-persistence-scala/activator.properties @@ -0,0 +1,7 @@ +name=akka-sample-persistence-scala +title=Akka Persistence Samples with Scala +description=Akka Persistence Samples with Scala +tags=akka,persistence,scala,sample +authorName=Akka Team +authorLink=http://akka.io/ +sourceLink=https://github.com/akka/akka diff --git a/akka-samples/akka-sample-persistence-scala/build.sbt b/akka-samples/akka-sample-persistence-scala/build.sbt new file mode 100644 index 0000000000..873562e06e --- /dev/null +++ b/akka-samples/akka-sample-persistence-scala/build.sbt @@ -0,0 +1,10 @@ +name := "akka-sample-persistence-scala" + +version := "1.0" + +scalaVersion := "2.10.3" + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-persistence-experimental" % "2.3-SNAPSHOT" +) + diff --git a/akka-samples/akka-sample-persistence-scala/project/build.properties b/akka-samples/akka-sample-persistence-scala/project/build.properties new file mode 100644 index 0000000000..37b489cb6e --- /dev/null +++ b/akka-samples/akka-sample-persistence-scala/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.1 diff --git a/akka-samples/akka-sample-persistence-scala/src/main/resources/application.conf b/akka-samples/akka-sample-persistence-scala/src/main/resources/application.conf new file mode 100644 index 0000000000..da5be9b92c --- /dev/null +++ b/akka-samples/akka-sample-persistence-scala/src/main/resources/application.conf @@ -0,0 +1,6 @@ +akka.persistence.journal.leveldb.dir = "target/example/journal" +akka.persistence.snapshot-store.local.dir = "target/example/snapshots" + +# DO NOT USE THIS IN PRODUCTION !!! +# See also https://github.com/typesafehub/activator/issues/287 +akka.persistence.journal.leveldb.native = false diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ConversationRecoveryExample.scala similarity index 86% rename from akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ConversationRecoveryExample.scala index 6a111bd3c3..bd2b186a2f 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ConversationRecoveryExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ConversationRecoveryExample.scala @@ -22,10 +22,9 @@ object ConversationRecoveryExample extends App { m.confirm() if (!recoveryRunning) Thread.sleep(1000) pongChannel ! Deliver(m.withPayload(Pong), sender.path) - case "init" => if (counter == 0) pongChannel ! Deliver(Persistent(Pong), sender.path) + case "init" if (counter == 0) => + pongChannel ! Deliver(Persistent(Pong), sender.path) } - - override def preStart() = () } class Pong extends Processor { @@ -40,8 +39,6 @@ object ConversationRecoveryExample extends App { if (!recoveryRunning) Thread.sleep(1000) pingChannel ! Deliver(m.withPayload(Ping), sender.path) } - - override def preStart() = () } val system = ActorSystem("example") @@ -49,8 +46,5 @@ object ConversationRecoveryExample extends App { val ping = system.actorOf(Props(classOf[Ping]), "ping") val pong = system.actorOf(Props(classOf[Pong]), "pong") - ping ! Recover() - pong ! Recover() - ping tell ("init", pong) } diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala similarity index 100% rename from akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/EventsourcedExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/EventsourcedExample.scala diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorChannelExample.scala similarity index 100% rename from akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorChannelExample.scala diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala similarity index 72% rename from akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala index c4a4750027..4f0374c23e 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorChannelRemoteExample.scala @@ -34,19 +34,19 @@ object ProcessorChannelRemoteExample { """) } -object SenderApp extends App { +object SenderApp /*extends App*/ { // no app until https://github.com/typesafehub/activator/issues/287 is fixed import ProcessorChannelRemoteExample._ class ExampleProcessor(destination: ActorPath) extends Processor { val listener = context.actorOf(Props[ExampleListener]) val channel = context.actorOf(Channel.props(ChannelSettings( - redeliverMax = 5, - redeliverInterval = 1.second, + redeliverMax = 15, + redeliverInterval = 3.seconds, redeliverFailureListener = Some(listener))), "channel") def receive = { - case p @ Persistent(payload, _) => - println(s"[processor] received payload: ${payload} (replayed = ${recoveryRunning})") + case p @ Persistent(payload, snr) => + println(s"[processor] received payload: ${payload} (snr = ${snr}, replayed = ${recoveryRunning})") channel ! Deliver(p.withPayload(s"processed ${payload}"), destination) case "restart" => throw new Exception("restart requested") @@ -64,35 +64,23 @@ object SenderApp extends App { } val receiverPath = ActorPath.fromString("akka.tcp://receiver@127.0.0.1:44317/user/receiver") - val senderConfig = ConfigFactory.parseString(""" - akka.persistence.journal.leveldb.dir = "target/example/journal" - akka.persistence.snapshot-store.local.dir = "target/example/snapshots" - akka.remote.netty.tcp.port = 44316 - """) + val senderConfig = ConfigFactory.parseString("akka.remote.netty.tcp.port = 44316") val system = ActorSystem("sender", config.withFallback(senderConfig)) val sender = system.actorOf(Props(classOf[ExampleProcessor], receiverPath)) - @annotation.tailrec - def read(line: String): Unit = line match { - case "exit" | null => - case msg => - sender ! Persistent(msg) - read(Console.readLine()) - } - - read(Console.readLine()) - system.shutdown() + import system.dispatcher + system.scheduler.schedule(Duration.Zero, 3.seconds, sender, Persistent("scheduled")) } -object ReceiverApp extends App { +object ReceiverApp /*extends App*/ { // no app until https://github.com/typesafehub/activator/issues/287 is fixed import ProcessorChannelRemoteExample._ class ExampleDestination extends Actor { def receive = { - case p @ ConfirmablePersistent(payload, snr, _) => - println(s"[destination] received payload: ${payload}") + case p @ ConfirmablePersistent(payload, snr, redel) => + println(s"[destination] received payload: ${payload} (snr = ${snr}, redel = ${redel})") sender ! s"re: ${payload} (snr = ${snr})" p.confirm() } diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorFailureExample.scala similarity index 100% rename from akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ProcessorFailureExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ProcessorFailureExample.scala diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala similarity index 100% rename from akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/SnapshotExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/SnapshotExample.scala diff --git a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ViewExample.scala b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala similarity index 81% rename from akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ViewExample.scala rename to akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala index 6596c45c56..a489b8e65a 100644 --- a/akka-samples/akka-sample-persistence/src/main/scala/sample/persistence/ViewExample.scala +++ b/akka-samples/akka-sample-persistence-scala/src/main/scala/sample/persistence/ViewExample.scala @@ -4,6 +4,8 @@ package sample.persistence +import scala.concurrent.duration._ + import akka.actor._ import akka.persistence._ @@ -52,20 +54,8 @@ object ViewExample extends App { val processor = system.actorOf(Props(classOf[ExampleProcessor])) val view = system.actorOf(Props(classOf[ExampleView])) - @annotation.tailrec - def read(line: String): Unit = line match { - case "exit" | null => - case "sync" => - view ! Update(await = false) - read(Console.readLine()) - case "snap" => - view ! "snap" - read(Console.readLine()) - case msg => - processor ! Persistent(msg) - read(Console.readLine()) - } + import system.dispatcher - read(Console.readLine()) - system.shutdown() + system.scheduler.schedule(Duration.Zero, 2.seconds, processor, Persistent("scheduled")) + system.scheduler.schedule(Duration.Zero, 5.seconds, view, "snap") } diff --git a/akka-samples/akka-sample-persistence-scala/tutorial/index.html b/akka-samples/akka-sample-persistence-scala/tutorial/index.html new file mode 100644 index 0000000000..9769a2fca5 --- /dev/null +++ b/akka-samples/akka-sample-persistence-scala/tutorial/index.html @@ -0,0 +1,155 @@ + + +Akka Persistence Samples with Scala + + + + +
+

+This tutorial contains examples that illustrate a subset of +Akka Persistence features. +

+
    +
  • Processors and channels
  • +
  • Processsor snapshots
  • +
  • Eventsourced processors
  • +
  • Processor failure handling
  • +
  • Processor views
  • +
  • Processor conversation recovery
  • +
+ +

+Custom storage locations for the journal and snapshots can be defined in +application.conf. +

+
+ +
+

Processors and channels

+

+ProcessorChannelExample.scala +defines an ExampleProcessor and an ExampleDestination. The processor sends messages to a +destination via a channel. The destination confirms the delivery of these messages so that replayed messages aren't +redundantly delivered to the destination. Repeated runs of the application demonstrates that the processor receives +both replayed and new messages whereas the channel only receives new messages, sent by the application. The processor +also receives replies from the destination, demonstrating that a channel preserves sender references. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ProcessorChannelExample several times. +

+ + + +
+ +
+

Processor snapshots

+

+SnapshotExample.scala +demonstrates how processors can take snapshots of application state and recover from previously stored snapshots. +Snapshots are offered to processors at the beginning of recovery, before any messages (younger than the snapshot) +are replayed. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.SnapshotExample several times. With every run, the state offered by the +most recent snapshot is printed to stdout, followed by the updated state after sending new persistent +messages to the processor. +

+
+ +
+

Eventsourced processors

+

+EventsourcedExample.scala +is described in detail in the Event sourcing +section of the user documentation. With every application run, the ExampleProcessor is recovered from +events stored in previous application runs, processes new commands, stores new events and snapshots and prints the +current processor state to stdout. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.EventsourcedExample several times. +

+
+ +
+

Processor failure handling

+

+ProcessorFailureExample.scala +shows how a processor can delete persistent messages from the journal if they threw an exception. Throwing an exception +restarts the processor and replays messages. In order to prevent that the message that caused the exception is replayed, +it is marked as deleted in the journal (during invocation of preRestart). This is a common pattern in +command-sourcing to compensate write-ahead logging of messages. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ProcessorFailureExample several times. +

+ +

+Event sourcing +on the other hand, does not persist commands directly but rather events that have been derived from received commands +(not shown here). These events are known to be successfully applicable to current processor state i.e. there's +no need for deleting them from the journal. Event sourced processors usually have a lower throughput than command +sourced processors, as the maximum size of a write batch is limited by the number of persisted events per received +command. +

+
+ +
+

Processor views

+

+ViewExample.scala demonstrates +how a view (ExampleView) is updated with the persistent message stream of a processor +(ExampleProcessor). Messages sent to the processor are read from stdin. Views also support +snapshotting and can be used in combination with channels in the same way as processors. +

+ +

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ViewExample. +

+ +

+Views can also receive events that have been persisted by event sourced processors (not shown). +

+
+ +
+

Processor conversation recovery

+

+ConversationRecoveryExample.scala +defines two processors that send messages to each other via channels. The reliable delivery properties of channels, +in combination with processors, allow these processors to automatically resume their conversation after a JVM crash. +

+

+To run this example, go to the Run tab, and run the application main class +sample.persistence.ConversationRecoveryExample several times. +

+
+ + + diff --git a/akka-samples/akka-sample-persistence/src/main/resources/application.conf b/akka-samples/akka-sample-persistence/src/main/resources/application.conf deleted file mode 100644 index 432132898d..0000000000 --- a/akka-samples/akka-sample-persistence/src/main/resources/application.conf +++ /dev/null @@ -1,2 +0,0 @@ -akka.persistence.journal.leveldb.dir = "target/example/journal" -akka.persistence.snapshot-store.local.dir = "target/example/snapshots" diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 3f8247ff77..8393833f38 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -369,7 +369,7 @@ object AkkaBuild extends Build { settings = parentSettings ++ ActivatorDist.settings, aggregate = Seq(camelSampleJava, camelSampleScala, mainSampleJava, mainSampleScala, remoteSampleJava, remoteSampleScala, clusterSampleJava, clusterSampleScala, - fsmSampleScala, persistenceSample, + fsmSampleScala, persistenceSampleJava, persistenceSampleScala, multiNodeSampleScala, helloKernelSample, osgiDiningHakkersSample) ) @@ -429,9 +429,16 @@ object AkkaBuild extends Build { settings = sampleSettings ) - lazy val persistenceSample = Project( - id = "akka-sample-persistence", - base = file("akka-samples/akka-sample-persistence"), + lazy val persistenceSampleJava = Project( + id = "akka-sample-persistence-java", + base = file("akka-samples/akka-sample-persistence-java"), + dependencies = Seq(actor, persistence), + settings = sampleSettings + ) + + lazy val persistenceSampleScala = Project( + id = "akka-sample-persistence-scala", + base = file("akka-samples/akka-sample-persistence-scala"), dependencies = Seq(actor, persistence), settings = sampleSettings )