!per #3652 Custom snapshot and persistent message serialization

- Protobuf serializer for Persistent message
- Configurable serializer for Persistent message's payload
- Configurable serializer for snapshots
This commit is contained in:
Martin Krasser 2013-10-09 13:11:53 +02:00
parent 6f89d346ec
commit 2a30399a29
35 changed files with 4049 additions and 390 deletions

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence;
import scala.Option;

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence;
//#plugin-imports
@ -10,132 +14,45 @@ import akka.persistence.snapshot.japi.*;
//#plugin-imports
public class PersistencePluginDocTest {
static Object o1 = new Object() {
abstract class MySnapshotStore extends SnapshotStore {
//#snapshot-store-plugin-api
/**
* Plugin Java API.
*
* Asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param criteria selection criteria for loading.
*/
public abstract Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria);
/**
* Plugin Java API.
*
* Asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
public abstract Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
/**
* Plugin Java API.
*
* Called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
public abstract void onSaved(SnapshotMetadata metadata) throws Exception;
/**
* Plugin Java API.
*
* Deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
public abstract void doDelete(SnapshotMetadata metadata) throws Exception;
//#snapshot-store-plugin-api
class MySnapshotStore extends SnapshotStore {
@Override
public Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) {
return null;
}
abstract class MySyncWriteJournal extends SyncWriteJournal {
//#sync-write-plugin-api
/**
* Plugin Java API.
*
* Synchronously writes a `persistent` message to the journal.
*/
@Override
public abstract void doWrite(PersistentImpl persistent) throws Exception;
/**
* Plugin Java API.
*
* Synchronously marks a `persistent` message as deleted.
*/
@Override
public abstract void doDelete(PersistentImpl persistent) throws Exception;
/**
* Plugin Java API.
*
* Synchronously writes a delivery confirmation to the journal.
*/
@Override
public abstract void doConfirm(String processorId, long sequenceNr, String channelId) throws Exception;
//#sync-write-plugin-api
@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return null;
}
abstract class MyAsyncWriteJournal extends AsyncWriteJournal {
//#async-write-plugin-api
/**
* Plugin Java API.
*
* Asynchronously writes a `persistent` message to the journal.
*/
@Override
public abstract Future<Void> doWriteAsync(PersistentImpl persistent);
/**
* Plugin Java API.
*
* Asynchronously marks a `persistent` message as deleted.
*/
@Override
public abstract Future<Void> doDeleteAsync(PersistentImpl persistent);
/**
* Plugin Java API.
*
* Asynchronously writes a delivery confirmation to the journal.
*/
@Override
public abstract Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId);
//#async-write-plugin-api
@Override
public void onSaved(SnapshotMetadata metadata) throws Exception {
}
abstract class MyAsyncReplay extends AsyncReplay {
//#async-replay-plugin-api
/**
* Plugin Java API.
*
* Asynchronously replays persistent messages. Implementations replay a message
* by calling `replayCallback`. The returned future must be completed when all
* messages (matching the sequence number bounds) have been replayed. The future
* `Long` value must be the highest stored sequence number in the journal for the
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message.
*/
@Override
public abstract Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback);
//#async-replay-plugin-api
@Override
public void doDelete(SnapshotMetadata metadata) throws Exception {
}
};
}
class MyAsyncJournal extends AsyncWriteJournal {
@Override
public Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback) {
return null;
}
@Override
public Future<Void> doWriteAsync(PersistentImpl persistent) {
return null;
}
@Override
public Future<Void> doDeleteAsync(PersistentImpl persistent) {
return null;
}
@Override
public Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId) {
return null;
}
}
}

View file

@ -4,9 +4,21 @@
Persistence
###########
This section describes an early access version of the Akka persistence module. Akka persistence is heavily inspired
by the `eventsourced`_ library. It follows the same concepts and architecture of `eventsourced`_ but significantly
differs on API and implementation level.
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor
is started, restarted by a supervisor or migrated in a cluster. It also allows stateful actors to recover from JVM
crashes, for example. The key concept behind Akka persistence is that only changes to an actor's internal state are
persisted but never its current state directly (except for optional snapshots). These changes are only ever appended
to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful
actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can
be either the full history of changes or starting from a snapshot of internal actor state which can dramatically
reduce recovery times.
Storage backends for state changes and snapshots are pluggable in Akka persistence. Currently, these are written to
the local filesystem. Distributed and replicated storage, with the possibility of scaling writes, will be available
soon.
Akka persistence is inspired by the `eventsourced`_ library. It follows the same concepts and architecture of
`eventsourced`_ but significantly differs on API and implementation level.
.. warning::
@ -31,13 +43,20 @@ Akka persistence is a separate jar file. Make sure that you have the following d
Architecture
============
* *Processor*: A processor is a persistent actor. Messages sent to a processor are written to a journal before
its ``onReceive`` method is called. When a processor is started or restarted, journaled messages are replayed
* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal
before its ``onReceive`` method is called. When a processor is started or restarted, journaled messages are replayed
to that processor, so that it can recover internal state from these messages.
* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages
are redundantly delivered to these actors.
* *Journal*: A journal stores the sequence of messages sent to a processor. An application can control which messages
are stored and which are received by the processor without being journaled. The storage backend of a journal is
pluggable.
* *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for
optimizing recovery times. The storage backend of a snapshot store is pluggable.
Use cases
=========
@ -69,10 +88,11 @@ A processor can be implemented by extending the abstract ``UntypedProcessor`` cl
Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
When a processor's ``onReceive`` method is called with a ``Persistent`` message it can safely assume that this message
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
receives a ``PersistenceFailure`` message instead of a ``Persistent`` message. In this case, a processor may want to
inform the sender about the failure, so that the sender can re-send the message, if needed, under the assumption that
the journal recovered from a temporary failure.
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
is stopped, by default. If an application wants that a processors continues to run on persistence failures it must
handle ``PersistenceFailure`` messages. In this case, a processor may want to inform the sender about the failure,
so that the sender can re-send the message, if needed, under the assumption that the journal recovered from a
temporary failure.
An ``UntypedProcessor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``.
@ -268,16 +288,16 @@ A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``.
actor that should be extended when the storage backend API only supports synchronous, blocking writes. The
methods to be implemented in this case are:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#sync-write-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java#sync-write-plugin-api
``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous,
non-blocking writes. The methods to be implemented in that case are:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-write-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java#async-write-plugin-api
Message replays are always asynchronous, therefore, any journal plugin must implement:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-replay-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncReplayPlugin.java#async-replay-plugin-api
A journal plugin can be activated with the following minimal configuration:
@ -292,7 +312,7 @@ Snapshot store plugin API
A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods:
.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#snapshot-store-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java#snapshot-store-plugin-api
A snapshot store plugin can be activated with the following minimal configuration:
@ -301,10 +321,25 @@ A snapshot store plugin can be activated with the following minimal configuratio
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
Custom serialization
====================
Serialization of snapshots and payloads of ``Persistent`` messages is configurable with Akka's
:ref:`serialization-java` infrastructure. For example, if an application wants to serialize
* payloads of type ``MyPayload`` with a custom ``MyPayloadSerializer`` and
* snapshots of type ``MySnapshot`` with a custom ``MySnapshotSerializer``
it must add
.. includecode:: ../scala/code/docs/persistence/PersistenceSerializerDocSpec.scala#custom-serializer-config
to the application configuration. If not specified, a default serializer is used, which is the ``JavaSerializer``
in this example.
Upcoming features
=================
* Reliable channels
* Custom serialization of messages and snapshots
* Extended deletion of messages and snapshots
* ...

View file

@ -166,19 +166,7 @@ There is also a default remote address which is the one used by cluster support
Deep serialization of Actors
----------------------------
The current recommended approach to do deep serialization of internal actor state is to use Event Sourcing,
for more reading on the topic, see these examples:
`Martin Krasser on EventSourcing Part1 <http://krasserm.blogspot.com/2011/11/building-event-sourced-web-application.html>`_
`Martin Krasser on EventSourcing Part2 <http://krasserm.blogspot.com/2012/01/building-event-sourced-web-application.html>`_
.. note::
Built-in API support for persisting Actors will come in a later release, see the roadmap for more info:
`Akka 2.0 roadmap <https://docs.google.com/a/typesafe.com/document/d/18W9-fKs55wiFNjXL9q50PYOnR7-nnsImzJqHOPPbM4E>`_
The recommended approach to do deep serialization of internal actor state is to use Akka :ref:`persistence-java`.
A Word About Java Serialization
===============================

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence
import akka.actor.ActorSystem

View file

@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence
//#plugin-imports

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.persistence
import com.typesafe.config._
import org.scalatest.WordSpec
import akka.actor.ActorSystem
import akka.serialization.{ Serializer, SerializationExtension }
class PersistenceSerializerDocSpec extends WordSpec {
val customSerializerConfig =
"""
//#custom-serializer-config
akka.actor {
serializers {
my-payload = "docs.persistence.MyPayloadSerializer"
my-snapshot = "docs.persistence.MySnapshotSerializer"
}
serialization-bindings {
"docs.persistence.MyPayload" = my-payload
"docs.persistence.MySnapshot" = my-snapshot
}
}
//#custom-serializer-config
""".stripMargin
SerializationExtension(ActorSystem("doc", ConfigFactory.parseString(customSerializerConfig)))
}
class MyPayload
class MySnapshot
class MyPayloadSerializer extends Serializer {
def identifier: Int = 77124
def includeManifest: Boolean = false
def toBinary(o: AnyRef): Array[Byte] = ???
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
}
class MySnapshotSerializer extends Serializer {
def identifier: Int = 77125
def includeManifest: Boolean = false
def toBinary(o: AnyRef): Array[Byte] = ???
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
}

View file

@ -4,9 +4,21 @@
Persistence
###########
This section describes an early access version of the Akka persistence module. Akka persistence is heavily inspired
by the `eventsourced`_ library. It follows the same concepts and architecture of `eventsourced`_ but significantly
differs on API and implementation level.
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor
is started, restarted by a supervisor or migrated in a cluster. It also allows stateful actors to recover from JVM
crashes, for example. The key concept behind Akka persistence is that only changes to an actor's internal state are
persisted but never its current state directly (except for optional snapshots). These changes are only ever appended
to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful
actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can
be either the full history of changes or starting from a snapshot of internal actor state which can dramatically
reduce recovery times.
Storage backends for state changes and snapshots are pluggable in Akka persistence. Currently, these are written to
the local filesystem. Distributed and replicated storage, with the possibility of scaling writes, will be available
soon.
Akka persistence is inspired by the `eventsourced`_ library. It follows the same concepts and architecture of
`eventsourced`_ but significantly differs on API and implementation level.
.. warning::
@ -27,13 +39,20 @@ Akka persistence is a separate jar file. Make sure that you have the following d
Architecture
============
* *Processor*: A processor is a persistent actor. Messages sent to a processor are written to a journal before
its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed
* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal
before its ``receive`` method is called. When a processor is started or restarted, journaled messages are replayed
to that processor, so that it can recover internal state from these messages.
* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages
are redundantly delivered to these actors.
* *Journal*: A journal stores the sequence of messages sent to a processor. An application can control which messages
are stored and which are received by the processor without being journaled. The storage backend of a journal is
pluggable.
* *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for
optimizing recovery times. The storage backend of a snapshot store is pluggable.
Use cases
=========
@ -65,9 +84,10 @@ A processor can be implemented by extending the ``Processor`` trait and implemen
Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
When a processor's ``receive`` method is called with a ``Persistent`` message it can safely assume that this message
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
receives a ``PersistenceFailure`` message instead of a ``Persistent`` message. In this case, a processor may want to
inform the sender about the failure, so that the sender can re-send the message, if needed, under the assumption that
the journal recovered from a temporary failure.
is stopped, by default. If an application wants that a processors continues to run on persistence failures it must
handle ``PersistenceFailure`` messages. In this case, a processor may want to inform the sender about the failure,
so that the sender can re-send the message, if needed, under the assumption that the journal recovered from a
temporary failure.
A ``Processor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``.
@ -312,6 +332,22 @@ A snapshot store plugin can be activated with the following minimal configuratio
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.
Custom serialization
====================
Serialization of snapshots and payloads of ``Persistent`` messages is configurable with Akka's
:ref:`serialization-scala` infrastructure. For example, if an application wants to serialize
* payloads of type ``MyPayload`` with a custom ``MyPayloadSerializer`` and
* snapshots of type ``MySnapshot`` with a custom ``MySnapshotSerializer``
it must add
.. includecode:: code/docs/persistence/PersistenceSerializerDocSpec.scala#custom-serializer-config
to the application configuration. If not specified, a default serializer is used, which is the ``JavaSerializer``
in this example.
Miscellaneous
=============
@ -326,6 +362,5 @@ Upcoming features
=================
* Reliable channels
* Custom serialization of messages and snapshots
* Extended deletion of messages and snapshots
* ...

View file

@ -155,19 +155,7 @@ There is also a default remote address which is the one used by cluster support
Deep serialization of Actors
----------------------------
The current recommended approach to do deep serialization of internal actor state is to use Event Sourcing,
for more reading on the topic, see these examples:
`Martin Krasser on EventSourcing Part1 <http://krasserm.blogspot.com/2011/11/building-event-sourced-web-application.html>`_
`Martin Krasser on EventSourcing Part2 <http://krasserm.blogspot.com/2012/01/building-event-sourced-web-application.html>`_
.. note::
Built-in API support for persisting Actors will come in a later release, see the roadmap for more info:
`Akka 2.0 roadmap <https://docs.google.com/a/typesafe.com/document/d/18W9-fKs55wiFNjXL9q50PYOnR7-nnsImzJqHOPPbM4E>`_
The recommended approach to do deep serialization of internal actor state is to use Akka :ref:`persistence`.
A Word About Java Serialization
===============================

View file

@ -0,0 +1,39 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.japi;
import scala.concurrent.Future;
import akka.japi.Procedure;
import akka.persistence.PersistentImpl;
interface AsyncReplayPlugin {
//#async-replay-plugin-api
/**
* Plugin Java API.
*
* Asynchronously replays persistent messages. Implementations replay a message
* by calling `replayCallback`. The returned future must be completed when all
* messages (matching the sequence number bounds) have been replayed. The future
* `Long` value must be the highest stored sequence number in the journal for the
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message. Can be called from any
* thread.
*/
Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback);
//#async-replay-plugin-api
}

View file

@ -0,0 +1,34 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.japi;
import scala.concurrent.Future;
import akka.persistence.PersistentImpl;
interface AsyncWritePlugin {
//#async-write-plugin-api
/**
* Plugin Java API.
*
* Asynchronously writes a `persistent` message to the journal.
*/
Future<Void> doWriteAsync(PersistentImpl persistent);
/**
* Plugin Java API.
*
* Asynchronously marks a `persistent` message as deleted.
*/
Future<Void> doDeleteAsync(PersistentImpl persistent);
/**
* Plugin Java API.
*
* Asynchronously writes a delivery confirmation to the journal.
*/
Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId);
//#async-write-plugin-api
}

View file

@ -0,0 +1,32 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.japi;
import akka.persistence.PersistentImpl;
interface SyncWritePlugin {
//#sync-write-plugin-api
/**
* Plugin Java API.
*
* Synchronously writes a `persistent` message to the journal.
*/
void doWrite(PersistentImpl persistent) throws Exception;
/**
* Plugin Java API.
*
* Synchronously marks a `persistent` message as deleted.
*/
void doDelete(PersistentImpl persistent) throws Exception;
/**
* Plugin Java API.
*
* Synchronously writes a delivery confirmation to the journal.
*/
void doConfirm(String processorId, long sequenceNr, String channelId) throws Exception;
//#sync-write-plugin-api
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,52 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.snapshot.japi;
import scala.concurrent.Future;
import akka.japi.Option;
import akka.persistence.*;
interface SnapshotStorePlugin {
//#snapshot-store-plugin-api
/**
* Plugin Java API.
*
* Asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param criteria selection criteria for loading.
*/
Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria);
/**
* Plugin Java API.
*
* Asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);
/**
* Plugin Java API.
*
* Called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
void onSaved(SnapshotMetadata metadata) throws Exception;
/**
* Plugin Java API.
*
* Deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
void doDelete(SnapshotMetadata metadata) throws Exception;
//#snapshot-store-plugin-api
}

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
option java_package = "akka.persistence.serialization";
option optimize_for = SPEED;
message PersistentMessage {
optional PersistentPayload payload = 1;
optional int64 sequenceNr = 2;
optional string processorId = 3;
optional string channelId = 4;
optional bool deleted = 5;
optional bool resolved = 6;
repeated string confirms = 8;
optional ConfirmMessage confirmMessage = 10;
optional string confirmTarget = 9;
optional string sender = 7;
}
message PersistentPayload {
required int32 serializerId = 1;
required bytes payload = 2;
optional bytes payloadManifest = 3;
}
message ConfirmMessage {
optional string processorId = 1;
optional int64 sequenceNr = 2;
optional string channelId = 3;
}

View file

@ -2,8 +2,27 @@
# Akka Persistence Reference Config File #
##########################################
akka {
# Protobuf serialization for persistent messages
actor {
serializers {
akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer"
akka-persistence-message = "akka.persistence.serialization.MessageSerializer"
}
serialization-bindings {
"akka.persistence.serialization.Snapshot" = akka-persistence-snapshot
"akka.persistence.PersistentImpl" = akka-persistence-message
"akka.persistence.Confirm" = akka-persistence-message
}
}
persistence {
journal {

View file

@ -14,9 +14,6 @@ import akka.persistence.journal.AsyncWriteJournal
* Persistence extension.
*/
object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
class Settings(config: Config) {
}
/**
* Java API.
*/

View file

@ -83,13 +83,13 @@ object Persistent {
*
* Internal [[Persistent]] message representation.
*
* @param resolved `true` by default, `false` for replayed messages. Set to `true` by a channel if this
* message is replayed and its sender reference was resolved. Channels use this field to
* avoid redundant sender reference resolutions.
* @param processorId Id of processor that journaled the message.
* @param channelId Id of last channel that delivered the message to a destination.
* @param sender Serialized sender reference.
* @param deleted `true` if this message is marked as deleted.
* @param resolved `true` by default, `false` for replayed messages. Set to `true` by a channel if this
* message is replayed and its sender reference was resolved. Channels use this field to
* avoid redundant sender reference resolutions.
* @param confirms Channel ids of delivery confirmations that are available for this message. Only non-empty
* for replayed messages.
* @param confirmTarget Delivery confirmation target.
@ -102,14 +102,14 @@ object Persistent {
case class PersistentImpl(
payload: Any,
sequenceNr: Long = 0L,
resolved: Boolean = true,
processorId: String = "",
channelId: String = "",
sender: String = "",
processorId: String = PersistentImpl.Undefined,
channelId: String = PersistentImpl.Undefined,
deleted: Boolean = false,
resolved: Boolean = true,
confirms: Seq[String] = Nil,
confirmMessage: Confirm = null,
confirmTarget: ActorRef = null,
confirmMessage: Confirm = null) extends Persistent {
sender: ActorRef = null) extends Persistent {
def withPayload(payload: Any): Persistent =
copy(payload = payload)
@ -126,21 +126,17 @@ case class PersistentImpl(
}
object PersistentImpl {
/**
* Java Plugin API.
*/
def create(payload: Any, sequenceNr: Long, resolved: Boolean, processorId: String, channelId: String, sender: String, deleted: Boolean, confirms: Seq[String]): PersistentImpl =
PersistentImpl(payload, sequenceNr, resolved, processorId, channelId, sender, deleted, confirms)
val Undefined = ""
/**
* Java Plugin API.
*/
def create(payload: Any, sequenceNr: Long, resolved: Boolean, processorId: String, channelId: String, sender: String, deleted: Boolean, confirms: Seq[String], confirmTarget: ActorRef, confirmMessage: Confirm): PersistentImpl =
PersistentImpl(payload, sequenceNr, resolved, processorId, channelId, sender, deleted, confirms, confirmTarget, confirmMessage)
def create(payload: Any, sequenceNr: Long, processorId: String, channelId: String, deleted: Boolean, resolved: Boolean, confirms: Seq[String], confirmMessage: Confirm, confirmTarget: ActorRef, sender: ActorRef): PersistentImpl =
PersistentImpl(payload, sequenceNr, processorId, channelId, deleted, resolved, confirms, confirmMessage, confirmTarget, sender)
}
/**
* Receive by a processor when a journal failed to write a [[Persistent]] message.
* Received by a processor when a journal failed to write a [[Persistent]] message.
*
* @param payload payload of the persistent message.
* @param sequenceNr sequence number of the persistent message.
@ -149,6 +145,8 @@ object PersistentImpl {
case class PersistenceFailure(payload: Any, sequenceNr: Long, cause: Throwable)
/**
* Internal API.
*
* Message to confirm the receipt of a persistent message (sent via a [[Channel]]).
*/
@SerialVersionUID(1L)

View file

@ -4,9 +4,16 @@
package akka.persistence
import akka.AkkaException
import akka.actor._
import akka.dispatch._
/**
* Thrown by a [[Processor]] if a journal failed to replay all requested messages.
*/
@SerialVersionUID(1L)
case class ReplayFailureException(message: String, cause: Throwable) extends AkkaException(message, cause)
/**
* An actor that persists (journals) messages of type [[Persistent]]. Messages of other types are not persisted.
*
@ -119,7 +126,8 @@ trait Processor extends Actor with Stash {
unstashAllInternal()
}
case ReplayFailure(cause) {
throw cause
val errorMsg = s"Replay failure by journal (processor id = [${processorId}])"
throw new ReplayFailureException(errorMsg, cause)
}
case Replayed(p) try { processPersistent(receive, p) } catch {
case t: Throwable {
@ -140,13 +148,22 @@ trait Processor extends Actor with Stash {
override def toString: String = "recovery finished"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case r: Recover // ignore
case Replayed(p) processPersistent(receive, p) // can occur after unstash from user stash
case WriteSuccess(p) processPersistent(receive, p)
case WriteFailure(p, cause) process(receive, PersistenceFailure(p.payload, p.sequenceNr, cause))
case LoopSuccess(m) process(receive, m)
case p: PersistentImpl journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self)
case m journal forward Loop(m, self)
case r: Recover // ignore
case Replayed(p) processPersistent(receive, p) // can occur after unstash from user stash
case WriteSuccess(p) processPersistent(receive, p)
case WriteFailure(p, cause) {
val notification = PersistenceFailure(p.payload, p.sequenceNr, cause)
if (receive.isDefinedAt(notification)) process(receive, notification)
else {
val errorMsg = "Processor killed after persistence failure " +
s"(processor id = [${processorId}], sequence nr = [${p.sequenceNr}], payload class = [${p.payload.getClass.getName}]). " +
"To avoid killing processors on persistence failure, a processor must handle PersistenceFailure messages."
throw new ActorKilledException(errorMsg)
}
}
case LoopSuccess(m) process(receive, m)
case p: PersistentImpl journal forward Write(p.copy(processorId = processorId, sequenceNr = nextSequenceNr()), self)
case m journal forward Loop(m, self)
}
}
@ -159,7 +176,7 @@ trait Processor extends Actor with Stash {
override def toString: String = "recovery failed"
def aroundReceive(receive: Actor.Receive, message: Any) = message match {
case ReplaySuccess(maxSnr) {
case ReplaySuccess(_) | ReplayFailure(_) {
_currentState = prepareRestart
mailbox.enqueueFirst(self, _recoveryFailureMessage)
}

View file

@ -33,7 +33,8 @@ trait AsyncReplay {
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message.
* @param replayCallback called to replay a single message. Can be called from any
* thread.
*
* @see [[AsyncWriteJournal]]
* @see [[SyncWriteJournal]]

View file

@ -20,7 +20,6 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
import AsyncWriteJournal._
import context.dispatcher
private val extension = Persistence(context.system)
private val resequencer = context.actorOf(Props[Resequencer])
private var resequencerCounter = 1L
@ -29,7 +28,7 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
val csdr = sender
val cctr = resequencerCounter
val psdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
writeAsync(persistent.copy(sender = Serialization.serializedActorPath(psdr), resolved = false, confirmTarget = null, confirmMessage = null)) map {
writeAsync(persistent.copy(sender = psdr, resolved = false, confirmTarget = null, confirmMessage = null)) map {
_ Desequenced(WriteSuccess(persistent), cctr, processor, csdr)
} recover {
case e Desequenced(WriteFailure(persistent, e), cctr, processor, csdr)
@ -40,7 +39,7 @@ trait AsyncWriteJournal extends Actor with AsyncReplay {
// Send replayed messages and replay result to processor directly. No need
// to resequence replayed messages relative to written and looped messages.
replayAsync(processorId, fromSequenceNr, toSequenceNr) { p
if (!p.deleted) processor.tell(Replayed(p), extension.system.provider.resolveActorRef(p.sender))
if (!p.deleted) processor.tell(Replayed(p), p.sender)
} map {
maxSnr ReplaySuccess(maxSnr)
} recover {

View file

@ -23,14 +23,14 @@ trait SyncWriteJournal extends Actor with AsyncReplay {
final def receive = {
case Write(persistent, processor) {
val sdr = if (sender.isInstanceOf[PromiseActorRef]) context.system.deadLetters else sender
Try(write(persistent.copy(sender = Serialization.serializedActorPath(sdr), resolved = false, confirmTarget = null, confirmMessage = null))) match {
Try(write(persistent.copy(sender = sdr, resolved = false, confirmTarget = null, confirmMessage = null))) match {
case Success(_) processor forward WriteSuccess(persistent)
case Failure(e) processor forward WriteFailure(persistent, e); throw e
}
}
case Replay(fromSequenceNr, toSequenceNr, processorId, processor) {
replayAsync(processorId, fromSequenceNr, toSequenceNr) { p
if (!p.deleted) processor.tell(Replayed(p), extension.system.provider.resolveActorRef(p.sender))
if (!p.deleted) processor.tell(Replayed(p), p.sender)
} map {
maxSnr ReplaySuccess(maxSnr)
} recover {

View file

@ -13,35 +13,16 @@ import akka.japi.Procedure
import akka.persistence.journal.{ AsyncReplay SAsyncReplay }
import akka.persistence.PersistentImpl
abstract class AsyncReplay extends SAsyncReplay { this: Actor
/**
* Java API.
*
* Asynchronous message replay interface.
*/
abstract class AsyncReplay extends SAsyncReplay with AsyncReplayPlugin { this: Actor
import context.dispatcher
final def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentImpl) Unit) =
doReplayAsync(processorId, fromSequenceNr, toSequenceNr, new Procedure[PersistentImpl] {
def apply(p: PersistentImpl) = replayCallback(p)
}).map(_.longValue)
/**
* Plugin Java API.
*
* Asynchronously replays persistent messages. Implementations replay a message
* by calling `replayCallback`. The returned future must be completed when all
* messages (matching the sequence number bounds) have been replayed. The future
* `Long` value must be the highest stored sequence number in the journal for the
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message.
*/
def doReplayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, replayCallback: Procedure[PersistentImpl]): Future[JLong]
}

View file

@ -14,7 +14,7 @@ import akka.persistence.PersistentImpl
*
* Abstract journal, optimized for asynchronous, non-blocking writes.
*/
abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal {
abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal with AsyncWritePlugin {
import context.dispatcher
final def writeAsync(persistent: PersistentImpl) =
@ -25,25 +25,4 @@ abstract class AsyncWriteJournal extends AsyncReplay with SAsyncWriteJournal {
final def confirmAsync(processorId: String, sequenceNr: Long, channelId: String) =
doConfirmAsync(processorId, sequenceNr, channelId).map(Unit.unbox)
/**
* Plugin Java API.
*
* Asynchronously writes a `persistent` message to the journal.
*/
def doWriteAsync(persistent: PersistentImpl): Future[Void]
/**
* Plugin Java API.
*
* Asynchronously marks a `persistent` message as deleted.
*/
def doDeleteAsync(persistent: PersistentImpl): Future[Void]
/**
* Plugin Java API.
*
* Asynchronously writes a delivery confirmation to the journal.
*/
def doConfirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Void]
}

View file

@ -12,7 +12,7 @@ import akka.persistence.PersistentImpl
*
* Abstract journal, optimized for synchronous writes.
*/
abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal {
abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal with SyncWritePlugin {
final def write(persistent: PersistentImpl) =
doWrite(persistent)
@ -21,28 +21,4 @@ abstract class SyncWriteJournal extends AsyncReplay with SSyncWriteJournal {
final def confirm(processorId: String, sequenceNr: Long, channelId: String) =
doConfirm(processorId, sequenceNr, channelId)
/**
* Plugin Java API.
*
* Synchronously writes a `persistent` message to the journal.
*/
@throws(classOf[Exception])
def doWrite(persistent: PersistentImpl): Unit
/**
* Plugin Java API.
*
* Synchronously marks a `persistent` message as deleted.
*/
@throws(classOf[Exception])
def doDelete(persistent: PersistentImpl): Unit
/**
* Plugin Java API.
*
* Synchronously writes a delivery confirmation to the journal.
*/
@throws(classOf[Exception])
def doConfirm(processorId: String, sequenceNr: Long, channelId: String): Unit
}

View file

@ -32,9 +32,7 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
// needed if default processor and channel ids are used
// (actor paths, which contain deployment information).
// TODO: use protobuf serializer for PersistentImpl
// TODO: use user-defined serializer for payload
val serializer = SerializationExtension(context.system).findSerializerFor("")
val serialization = SerializationExtension(context.system)
import Key._
@ -55,8 +53,8 @@ private[leveldb] class LeveldbJournal extends SyncWriteJournal with LeveldbIdMap
def leveldbSnapshot = leveldbReadOptions.snapshot(leveldb.getSnapshot)
def leveldbIterator = leveldb.iterator(leveldbSnapshot)
def persistentToBytes(p: PersistentImpl): Array[Byte] = serializer.toBinary(p)
def persistentFromBytes(a: Array[Byte]): PersistentImpl = serializer.fromBinary(a).asInstanceOf[PersistentImpl]
def persistentToBytes(p: PersistentImpl): Array[Byte] = serialization.serialize(p).get
def persistentFromBytes(a: Array[Byte]): PersistentImpl = serialization.deserialize(a, classOf[PersistentImpl]).get
private def withBatch[R](body: WriteBatch R): R = {
val batch = leveldb.createWriteBatch()

View file

@ -0,0 +1,126 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.serialization
import scala.language.existentials
import com.google.protobuf._
import akka.actor.ExtendedActorSystem
import akka.persistence._
import akka.persistence.serialization.MessageFormats._
import akka.serialization._
/**
* Protobuf serializer for [[Persistent]] and `Confirm` messages.
*/
class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
import PersistentImpl.Undefined
val PersistentClass = classOf[PersistentImpl]
val ConfirmClass = classOf[Confirm]
def identifier: Int = 7
def includeManifest: Boolean = true
/**
* Serializes a [[Persistent]] message. Delegates serialization of the persistent message's
* payload to a matching `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: PersistentImpl persistentMessageBuilder(p).build().toByteArray
case c: Confirm confirmMessageBuilder(c).build().toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
* Deserializes a [[Persistent]] message. Delegates deserialization of the persistent message's
* payload to a matching `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case None persistent(PersistentMessage.parseFrom(bytes))
case Some(c) c match {
case PersistentClass persistent(PersistentMessage.parseFrom(bytes))
case ConfirmClass confirm(ConfirmMessage.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
//
// toBinary helpers
//
private def persistentMessageBuilder(persistent: PersistentImpl) = {
val builder = PersistentMessage.newBuilder
if (persistent.processorId != Undefined) builder.setProcessorId(persistent.processorId)
if (persistent.channelId != Undefined) builder.setChannelId(persistent.channelId)
if (persistent.confirmMessage != null) builder.setConfirmMessage(confirmMessageBuilder(persistent.confirmMessage))
if (persistent.confirmTarget != null) builder.setConfirmTarget(Serialization.serializedActorPath(persistent.confirmTarget))
if (persistent.sender != null) builder.setSender(Serialization.serializedActorPath(persistent.sender))
persistent.confirms.foreach(builder.addConfirms)
builder.setPayload(persistentPayloadBuilder(persistent.payload.asInstanceOf[AnyRef]))
builder.setSequenceNr(persistent.sequenceNr)
builder.setDeleted(persistent.deleted)
builder.setResolved(persistent.resolved)
builder
}
private def persistentPayloadBuilder(payload: AnyRef) = {
val serializer = SerializationExtension(system).findSerializerFor(payload)
val builder = PersistentPayload.newBuilder()
if (serializer.includeManifest) builder.setPayloadManifest((ByteString.copyFromUtf8(payload.getClass.getName)))
builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload)))
builder.setSerializerId(serializer.identifier)
builder
}
private def confirmMessageBuilder(confirm: Confirm) = {
ConfirmMessage.newBuilder
.setProcessorId(confirm.processorId)
.setSequenceNr(confirm.sequenceNr)
.setChannelId(confirm.channelId)
}
//
// fromBinary helpers
//
private def persistent(persistentMessage: PersistentMessage): PersistentImpl = {
import scala.collection.JavaConverters._
PersistentImpl(
payload(persistentMessage.getPayload),
persistentMessage.getSequenceNr,
if (persistentMessage.hasProcessorId) persistentMessage.getProcessorId else Undefined,
if (persistentMessage.hasChannelId) persistentMessage.getChannelId else Undefined,
persistentMessage.getDeleted,
persistentMessage.getResolved,
persistentMessage.getConfirmsList.asScala.toList,
if (persistentMessage.hasConfirmMessage) confirm(persistentMessage.getConfirmMessage) else null,
if (persistentMessage.hasConfirmTarget) system.provider.resolveActorRef(persistentMessage.getConfirmTarget) else null,
if (persistentMessage.hasSender) system.provider.resolveActorRef(persistentMessage.getSender) else null)
}
private def payload(persistentPayload: PersistentPayload): Any = {
val payloadClass = if (persistentPayload.hasPayloadManifest)
Some(system.dynamicAccess.getClassFor[AnyRef](persistentPayload.getPayloadManifest.toStringUtf8).get) else None
SerializationExtension(system).deserialize(
persistentPayload.getPayload.toByteArray,
persistentPayload.getSerializerId,
payloadClass).get
}
private def confirm(confirmMessage: ConfirmMessage): Confirm = {
Confirm(
confirmMessage.getProcessorId,
confirmMessage.getSequenceNr,
confirmMessage.getChannelId)
}
}

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.serialization
import java.io._
import akka.actor._
import akka.serialization.{ Serializer, SerializationExtension }
/**
* Wrapper for snapshot `data`. Snapshot `data` are the actual snapshot objects captured by
* a [[Processor]].
*
* @see [[SnapshotSerializer]]
*/
@SerialVersionUID(1L)
case class Snapshot(data: Any)
/**
* INTERNAL API.
*/
@SerialVersionUID(1L)
private[serialization] case class SnapshotHeader(serializerId: Int, manifest: Option[String])
/**
* [[Snapshot]] serializer.
*/
class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer {
def identifier: Int = 8
def includeManifest: Boolean = false
/**
* Serializes a [[Snapshot]]. Delegates serialization of snapshot `data` to a matching
* `akka.serialization.Serializer`.
*/
def toBinary(o: AnyRef): Array[Byte] = o match {
case Snapshot(data) snapshotToBinary(data.asInstanceOf[AnyRef])
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
/**
* Deserializes a [[Snapshot]]. Delegates deserialization of snapshot `data` to a matching
* `akka.serialization.Serializer`.
*/
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
Snapshot(snapshotFromBinary(bytes))
private def snapshotToBinary(snapshot: AnyRef): Array[Byte] = {
val extension = SerializationExtension(system)
val snapshotSerializer = extension.findSerializerFor(snapshot)
val snapshotManifest = if (snapshotSerializer.includeManifest) Some(snapshot.getClass.getName) else None
val header = SnapshotHeader(snapshotSerializer.identifier, snapshotManifest)
val headerSerializer = extension.findSerializerFor(header)
val headerBytes = headerSerializer.toBinary(header)
val out = new ByteArrayOutputStream
writeInt(out, headerBytes.length)
out.write(headerBytes)
out.write(snapshotSerializer.toBinary(snapshot))
out.toByteArray
}
private def snapshotFromBinary(bytes: Array[Byte]): AnyRef = {
val extension = SerializationExtension(system)
val headerLength = readInt(new ByteArrayInputStream(bytes))
val headerBytes = bytes.slice(4, headerLength + 4)
val snapshotBytes = bytes.drop(headerLength + 4)
val header = extension.deserialize(headerBytes, classOf[SnapshotHeader]).get
val manifest = header.manifest.map(system.dynamicAccess.getClassFor[AnyRef](_).get)
extension.deserialize[AnyRef](snapshotBytes, header.serializerId, manifest).get
}
private def writeInt(outputStream: OutputStream, i: Int) =
0 to 24 by 8 foreach { shift outputStream.write(i >> shift) }
private def readInt(inputStream: InputStream) =
(0 to 24 by 8).foldLeft(0) { (id, shift) (id | (inputStream.read() >> shift)) }
}

View file

@ -0,0 +1,26 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence
import java.io.{ ByteArrayOutputStream, InputStream }
package object serialization {
/**
* Converts an input stream to a byte array.
*/
def streamToBytes(inputStream: InputStream): Array[Byte] = {
val len = 16384
val buf = Array.ofDim[Byte](len)
val out = new ByteArrayOutputStream
@scala.annotation.tailrec
def copy(): Array[Byte] = {
val n = inputStream.read(buf, 0, len)
if (n != -1) { out.write(buf, 0, n); copy() } else out.toByteArray
}
copy()
}
}

View file

@ -1,57 +0,0 @@
/**
* Copyright (C) 2012-2013 Eligotech BV.
*/
package akka.persistence.snapshot
import java.io._
import akka.actor._
import akka.persistence.SnapshotMetadata
import akka.util.ClassLoaderObjectInputStream
/**
* Snapshot serialization extension.
*/
private[persistence] object SnapshotSerialization extends ExtensionId[SnapshotSerialization] with ExtensionIdProvider {
def createExtension(system: ExtendedActorSystem): SnapshotSerialization = new SnapshotSerialization(system)
def lookup() = SnapshotSerialization
}
/**
* Snapshot serialization extension.
*/
private[persistence] class SnapshotSerialization(val system: ExtendedActorSystem) extends Extension {
import akka.serialization.JavaSerializer
/**
* Java serialization based snapshot serializer.
*/
val java = new SnapshotSerializer {
def serialize(stream: OutputStream, metadata: SnapshotMetadata, state: Any) = {
val out = new ObjectOutputStream(stream)
JavaSerializer.currentSystem.withValue(system) { out.writeObject(state) }
}
def deserialize(stream: InputStream, metadata: SnapshotMetadata) = {
val in = new ClassLoaderObjectInputStream(system.dynamicAccess.classLoader, stream)
JavaSerializer.currentSystem.withValue(system) { in.readObject }
}
}
}
/**
* Stream-based snapshot serializer.
*/
private[persistence] trait SnapshotSerializer {
/**
* Serializes a `snapshot` to an output stream.
*/
def serialize(stream: OutputStream, metadata: SnapshotMetadata, snapshot: Any): Unit
/**
* Deserializes a snapshot from an input stream.
*/
def deserialize(stream: InputStream, metadata: SnapshotMetadata): Any
}

View file

@ -5,7 +5,6 @@
package akka.persistence.snapshot
import scala.concurrent.Future
import scala.util._
import akka.actor._
import akka.pattern.pipe

View file

@ -10,7 +10,12 @@ import akka.japi.{ Option ⇒ JOption }
import akka.persistence._
import akka.persistence.snapshot.{ SnapshotStore SSnapshotStore }
abstract class SnapshotStore extends SSnapshotStore {
/**
* Java API.
*
* Abstract snapshot store.
*/
abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
import context.dispatcher
final def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria) =
@ -24,44 +29,4 @@ abstract class SnapshotStore extends SSnapshotStore {
final def delete(metadata: SnapshotMetadata) =
doDelete(metadata)
/**
* Plugin Java API.
*
* Asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param criteria selection criteria for loading.
*/
def doLoadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[JOption[SelectedSnapshot]]
/**
* Plugin Java API.
*
* Asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
def doSaveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Void]
/**
* Plugin Java API.
*
* Called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
@throws(classOf[Exception])
def onSaved(metadata: SnapshotMetadata): Unit
/**
* Plugin Java API.
*
* Deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
@throws(classOf[Exception])
def doDelete(metadata: SnapshotMetadata): Unit
}

View file

@ -14,6 +14,8 @@ import scala.util._
import akka.actor.ActorLogging
import akka.persistence._
import akka.persistence.snapshot._
import akka.persistence.serialization._
import akka.serialization.SerializationExtension
/**
* INTERNAL API.
@ -27,8 +29,7 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
private val streamDispatcher = context.system.dispatchers.lookup(config.getString("stream-dispatcher"))
private val snapshotDir = new File(config.getString("dir"))
// TODO: make snapshot serializer configurable
private val snapshotSerializer = SnapshotSerialization(context.system).java
private val serializationExtension = SerializationExtension(context.system)
private var snapshotMetadata = Map.empty[String, SortedSet[SnapshotMetadata]]
def loadAsync(processorId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
@ -57,8 +58,8 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
def load(metadata: SortedSet[SnapshotMetadata]): Option[SelectedSnapshot] = metadata.lastOption match {
case None None
case Some(md) {
Try(withInputStream(md)(snapshotSerializer.deserialize(_, md))) match {
case Success(s) Some(SelectedSnapshot(md, s))
Try(withInputStream(md)(deserialize)) match {
case Success(s) Some(SelectedSnapshot(md, s.data))
case Failure(e) {
log.error(e, s"error loading snapshot ${md}")
load(metadata.init) // try older snapshot
@ -84,12 +85,18 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
}
private def save(metadata: SnapshotMetadata, snapshot: Any): Unit =
withOutputStream(metadata)(snapshotSerializer.serialize(_, metadata, snapshot))
withOutputStream(metadata)(serialize(_, Snapshot(snapshot)))
private def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit) =
protected def deserialize(inputStream: InputStream): Snapshot =
serializationExtension.deserialize(streamToBytes(inputStream), classOf[Snapshot]).get
protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit =
outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot))
private def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit): Unit =
withStream(new BufferedOutputStream(new FileOutputStream(snapshotFile(metadata))), p)
private def withInputStream(metadata: SnapshotMetadata)(p: (InputStream) Any) =
private def withInputStream[T](metadata: SnapshotMetadata)(p: (InputStream) T): T =
withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p)
private def withStream[A <: Closeable, B](stream: A, p: A B): B =

View file

@ -0,0 +1,196 @@
package akka.persistence.serialization
import com.typesafe.config._
import akka.actor._
import akka.persistence._
import akka.serialization._
import akka.testkit._
object SerializerSpecConfigs {
val common =
"""
serialize-creators = on
serialize-messages = on
"""
val customSerializers =
"""
akka.actor {
serializers {
my-payload = "akka.persistence.serialization.MyPayloadSerializer"
my-snapshot = "akka.persistence.serialization.MySnapshotSerializer"
}
serialization-bindings {
"akka.persistence.serialization.MyPayload" = my-payload
"akka.persistence.serialization.MySnapshot" = my-snapshot
}
}
"""
val remoteCommon =
"""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp.hostname = "127.0.0.1"
}
loglevel = ERROR
log-dead-letters = 0
log-dead-letters-during-shutdown = off
}
"""
val systemA = "akka.remote.netty.tcp.port = 0"
val systemB = "akka.remote.netty.tcp.port = 0"
def config(configs: String*): Config =
configs.foldLeft(ConfigFactory.parseString(common))((r, c) r.withFallback(ConfigFactory.parseString(c)))
}
import SerializerSpecConfigs._
class SnapshotSerializerPersistenceSpec extends AkkaSpec(config(customSerializers)) {
val serialization = SerializationExtension(system)
"A snapshot serializer" must {
"handle custom snapshot Serialization" in {
val wrapped = Snapshot(MySnapshot("a"))
val serializer = serialization.findSerializerFor(wrapped)
val bytes = serializer.toBinary(wrapped)
val deserialized = serializer.fromBinary(bytes, None)
deserialized must be(Snapshot(MySnapshot(".a.")))
}
}
}
class MessageSerializerPersistenceSpec extends AkkaSpec(config(customSerializers)) {
val serialization = SerializationExtension(system)
"A message serializer" when {
"not given a manifest" must {
"handle custom persistent message serialization" in {
val persistent = PersistentImpl(MyPayload("a"), 13, "p1", "c1", true, true, Seq("c1", "c2"), Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, None)
deserialized must be(persistent.withPayload(MyPayload(".a.")))
}
}
"given a persistent message manifest" must {
"handle custom persistent message serialization" in {
val persistent = PersistentImpl(MyPayload("b"), 13, "p1", "c1", true, true, Seq("c1", "c2"), Confirm("p2", 14, "c2"), testActor, testActor)
val serializer = serialization.findSerializerFor(persistent)
val bytes = serializer.toBinary(persistent)
val deserialized = serializer.fromBinary(bytes, Some(classOf[PersistentImpl]))
deserialized must be(persistent.withPayload(MyPayload(".b.")))
}
}
"given a confirmation message manifest" must {
"handle confirmation message serialization" in {
val confirmation = Confirm("x", 2, "y")
val serializer = serialization.findSerializerFor(confirmation)
val bytes = serializer.toBinary(confirmation)
val deserialized = serializer.fromBinary(bytes, Some(classOf[Confirm]))
deserialized must be(confirmation)
}
}
}
}
object MessageSerializerRemotingSpec {
class LocalActor(port: Int) extends Actor {
def receive = {
case m context.actorSelection(s"akka.tcp://remote@127.0.0.1:${port}/user/remote") tell (m, sender)
}
}
class RemoteActor extends Actor {
def receive = {
case Persistent(MyPayload(data), _) sender ! data
case Confirm(pid, snr, cid) sender ! s"${pid},${snr},${cid}"
}
}
def port(system: ActorSystem, protocol: String) =
addr(system, protocol).port.get
def addr(system: ActorSystem, protocol: String) =
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
}
class MessageSerializerRemotingSpec extends AkkaSpec(config(systemA).withFallback(config(customSerializers, remoteCommon))) with ImplicitSender {
import MessageSerializerRemotingSpec._
val remoteSystem = ActorSystem("remote", config(systemB).withFallback(config(customSerializers, remoteCommon)))
val localActor = system.actorOf(Props(classOf[LocalActor], port(remoteSystem, "tcp")))
override protected def atStartup() {
remoteSystem.actorOf(Props[RemoteActor], "remote")
}
override def afterTermination() {
remoteSystem.shutdown()
remoteSystem.awaitTermination()
}
"A message serializer" must {
"custom-serialize persistent messages during remoting" in {
localActor ! Persistent(MyPayload("a"))
expectMsg(".a.")
}
"serialize confirmation messages during remoting" in {
localActor ! Confirm("a", 2, "b")
expectMsg("a,2,b")
}
}
}
case class MyPayload(data: String)
case class MySnapshot(data: String)
class MyPayloadSerializer extends Serializer {
val MyPayloadClass = classOf[MyPayload]
def identifier: Int = 77123
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
case MyPayload(data) s".${data}".getBytes("UTF-8")
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case Some(MyPayloadClass) MyPayload(s"${new String(bytes, "UTF-8")}.")
case Some(c) throw new Exception(s"unexpected manifest ${c}")
case None throw new Exception("no manifest")
}
}
class MySnapshotSerializer extends Serializer {
val MySnapshotClass = classOf[MySnapshot]
def identifier: Int = 77124
def includeManifest: Boolean = true
def toBinary(o: AnyRef): Array[Byte] = o match {
case MySnapshot(data) s".${data}".getBytes("UTF-8")
}
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
case Some(MySnapshotClass) MySnapshot(s"${new String(bytes, "UTF-8")}.")
case Some(c) throw new Exception(s"unexpected manifest ${c}")
case None throw new Exception("no manifest")
}
}

View file

@ -275,7 +275,7 @@ object AkkaBuild extends Build {
lazy val persistence = Project(
id = "akka-persistence-experimental",
base = file("akka-persistence"),
dependencies = Seq(actor, testkit % "test->test"),
dependencies = Seq(actor, remote % "test->test", testkit % "test->test"),
settings = defaultSettings ++ scaladocSettings ++ experimentalSettings ++ javadocSettings ++ OSGi.persistence ++ Seq(
libraryDependencies ++= Dependencies.persistence,
previousArtifact := akkaPreviousArtifact("akka-persistence")
@ -994,7 +994,7 @@ object AkkaBuild extends Build {
val transactor = exports(Seq("akka.transactor.*"))
val persistence = exports(Seq("akka.persistence.*"))
val persistence = exports(Seq("akka.persistence.*"), imports = Seq(protobufImport()))
val testkit = exports(Seq("akka.testkit.*"))
@ -1104,7 +1104,7 @@ object Dependencies {
val transactor = Seq(scalaStm, Test.scalatest, Test.junit)
val persistence = Seq(levelDB, Test.scalatest, Test.junit, Test.commonsIo)
val persistence = Seq(levelDB, protobuf, Test.scalatest, Test.junit, Test.commonsIo)
val mailboxes = Seq(Test.scalatest, Test.junit)