+per #20257 Snapshots with PersistentFSM

This commit is contained in:
leonidb 2016-04-14 02:31:33 +03:00 committed by leonid
parent 86f54e8719
commit 68e479a050
9 changed files with 1043 additions and 29 deletions

View file

@ -7,7 +7,7 @@ package akka.persistence.serialization
import akka.actor.{ ActorPath, ExtendedActorSystem }
import akka.persistence.AtLeastOnceDelivery._
import akka.persistence._
import akka.persistence.fsm.PersistentFSM.StateChangeEvent
import akka.persistence.fsm.PersistentFSM.{ PersistentFSMSnapshot, StateChangeEvent }
import akka.persistence.serialization.{ MessageFormats mf }
import akka.serialization._
import akka.protobuf._
@ -33,6 +33,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnapshot]
val PersistentStateChangeEventClass = classOf[StateChangeEvent]
val PersistentFSMSnapshotClass = classOf[PersistentFSMSnapshot[Any]]
private lazy val serialization = SerializationExtension(system)
@ -53,6 +54,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
case a: AtomicWrite atomicWriteBuilder(a).build().toByteArray
case a: AtLeastOnceDeliverySnapshot atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case s: StateChangeEvent stateChangeBuilder(s).build.toByteArray
case p: PersistentFSMSnapshot[Any] persistentFSMSnapshotBuilder(p).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
@ -68,6 +70,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
case AtomicWriteClass atomicWrite(mf.AtomicWrite.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(mf.AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case PersistentStateChangeEventClass stateChange(mf.PersistentStateChangeEvent.parseFrom(bytes))
case PersistentFSMSnapshotClass persistentFSMSnapshot(mf.PersistentFSMSnapshot.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
@ -94,7 +97,17 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val builder = mf.PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier)
stateChange.timeout match {
case None builder
case Some(timeout) builder.setTimeout(timeout.toString())
case Some(timeout) builder.setTimeoutNanos(timeout.toNanos)
}
}
private[persistence] def persistentFSMSnapshotBuilder(persistentFSMSnapshot: PersistentFSMSnapshot[Any]): mf.PersistentFSMSnapshot.Builder = {
val builder = mf.PersistentFSMSnapshot.newBuilder
.setStateIdentifier(persistentFSMSnapshot.stateIdentifier)
.setData(persistentPayloadBuilder(persistentFSMSnapshot.data.asInstanceOf[AnyRef]))
persistentFSMSnapshot.timeout match {
case None builder
case Some(timeout) builder.setTimeoutNanos(timeout.toNanos)
}
}
@ -114,7 +127,17 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
def stateChange(persistentStateChange: mf.PersistentStateChangeEvent): StateChangeEvent = {
StateChangeEvent(
persistentStateChange.getStateIdentifier,
if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None)
// timeout field is deprecated, left for backward compatibility. timeoutNanos is used instead.
if (persistentStateChange.hasTimeoutNanos) Some(Duration.fromNanos(persistentStateChange.getTimeoutNanos))
else if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration])
else None)
}
def persistentFSMSnapshot(persistentFSMSnapshot: mf.PersistentFSMSnapshot): PersistentFSMSnapshot[Any] = {
PersistentFSMSnapshot(
persistentFSMSnapshot.getStateIdentifier,
payload(persistentFSMSnapshot.getData),
if (persistentFSMSnapshot.hasTimeoutNanos) Some(Duration.fromNanos(persistentFSMSnapshot.getTimeoutNanos)) else None)
}
private def atomicWriteBuilder(a: AtomicWrite) = {