+per #15279 FSM for PersistentActor

This commit is contained in:
leonidb 2014-11-09 14:12:36 +02:00
parent 122fdedd08
commit 09b6abd614
22 changed files with 3595 additions and 16 deletions

View file

@ -4,6 +4,8 @@
package akka.persistence.serialization
import scala.concurrent.duration
import scala.concurrent.duration.Duration
import scala.language.existentials
import com.google.protobuf._
import akka.actor.{ ActorPath, ExtendedActorSystem }
@ -14,6 +16,7 @@ import akka.serialization._
import akka.persistence.AtLeastOnceDelivery.{ AtLeastOnceDeliverySnapshot AtLeastOnceDeliverySnap }
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
import scala.collection.immutable.VectorBuilder
import akka.persistence.fsm.PersistentFsmActor.StateChangeEvent
/**
* Marker trait for all protobuf-serializable messages in `akka.persistence`.
@ -21,7 +24,7 @@ import scala.collection.immutable.VectorBuilder
trait Message extends Serializable
/**
* Protobuf serializer for [[akka.persistence.PersistentRepr]] and [[akka.persistence.AtLeastOnceDelivery]] messages.
* Protobuf serializer for [[akka.persistence.PersistentRepr]], [[akka.persistence.AtLeastOnceDelivery]] and [[akka.persistence.fsm.PersistentFsmActor.StateChangeEvent]] messages.
*/
class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer {
import PersistentRepr.Undefined
@ -29,6 +32,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
val PersistentReprClass = classOf[PersistentRepr]
val PersistentImplClass = classOf[PersistentImpl]
val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap]
val PersistentStateChangeEventClass = classOf[StateChangeEvent]
override val includeManifest: Boolean = true
@ -45,6 +49,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: PersistentRepr persistentMessageBuilder(p).build().toByteArray
case a: AtLeastOnceDeliverySnap atLeastOnceDeliverySnapshotBuilder(a).build.toByteArray
case s: StateChangeEvent stateChangeBuilder(s).build.toByteArray
case _ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
}
@ -58,6 +63,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
case PersistentImplClass persistent(PersistentMessage.parseFrom(bytes))
case PersistentReprClass persistent(PersistentMessage.parseFrom(bytes))
case AtLeastOnceDeliverySnapshotClass atLeastOnceDeliverySnapshot(AtLeastOnceDeliverySnapshot.parseFrom(bytes))
case PersistentStateChangeEventClass stateChange(PersistentStateChangeEvent.parseFrom(bytes))
case _ throw new IllegalArgumentException(s"Can't deserialize object of type ${c}")
}
}
@ -80,6 +86,14 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
builder
}
def stateChangeBuilder(stateChange: StateChangeEvent): PersistentStateChangeEvent.Builder = {
val builder = PersistentStateChangeEvent.newBuilder.setStateIdentifier(stateChange.stateIdentifier)
stateChange.timeout match {
case None builder
case Some(timeout) builder.setTimeout(timeout.toString())
}
}
def atLeastOnceDeliverySnapshot(atLeastOnceDeliverySnapshot: AtLeastOnceDeliverySnapshot): AtLeastOnceDeliverySnap = {
import scala.collection.JavaConverters._
val unconfirmedDeliveries = new VectorBuilder[UnconfirmedDelivery]()
@ -93,6 +107,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer
unconfirmedDeliveries.result())
}
def stateChange(persistentStateChange: PersistentStateChangeEvent): StateChangeEvent = {
StateChangeEvent(
persistentStateChange.getStateIdentifier,
if (persistentStateChange.hasTimeout) Some(Duration(persistentStateChange.getTimeout).asInstanceOf[duration.FiniteDuration]) else None)
}
private def persistentMessageBuilder(persistent: PersistentRepr) = {
val builder = PersistentMessage.newBuilder