=per #15230 deprecate Channel and PersistentChannel
Conflicts: akka-persistence/src/main/scala/akka/persistence/Channel.scala akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala
This commit is contained in:
parent
50bf83cc17
commit
242473bae8
6 changed files with 64 additions and 28 deletions
|
|
@ -16,6 +16,8 @@ import akka.actor._
|
||||||
import akka.persistence.serialization.Message
|
import akka.persistence.serialization.Message
|
||||||
import akka.persistence.JournalProtocol._
|
import akka.persistence.JournalProtocol._
|
||||||
|
|
||||||
|
// TODO: remove Channel
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A [[Channel]] configuration object.
|
* A [[Channel]] configuration object.
|
||||||
*
|
*
|
||||||
|
|
@ -27,6 +29,7 @@ import akka.persistence.JournalProtocol._
|
||||||
* Alternatively, it can also confirm these messages, preventing further redeliveries.
|
* Alternatively, it can also confirm these messages, preventing further redeliveries.
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final case class ChannelSettings(
|
final case class ChannelSettings(
|
||||||
val redeliverMax: Int = 5,
|
val redeliverMax: Int = 5,
|
||||||
val redeliverInterval: FiniteDuration = 5.seconds,
|
val redeliverInterval: FiniteDuration = 5.seconds,
|
||||||
|
|
@ -51,6 +54,7 @@ final case class ChannelSettings(
|
||||||
copy(redeliverFailureListener = Option(redeliverFailureListener))
|
copy(redeliverFailureListener = Option(redeliverFailureListener))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
object ChannelSettings {
|
object ChannelSettings {
|
||||||
/**
|
/**
|
||||||
* Java API.
|
* Java API.
|
||||||
|
|
@ -118,6 +122,7 @@ object ChannelSettings {
|
||||||
*
|
*
|
||||||
* @see [[Deliver]]
|
* @see [[Deliver]]
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final class Channel private[akka] (_channelId: Option[String], channelSettings: ChannelSettings) extends Actor {
|
final class Channel private[akka] (_channelId: Option[String], channelSettings: ChannelSettings) extends Actor {
|
||||||
import channelSettings._
|
import channelSettings._
|
||||||
|
|
||||||
|
|
@ -142,6 +147,7 @@ final class Channel private[akka] (_channelId: Option[String], channelSettings:
|
||||||
confirmMessage = DeliveredByChannel(persistent.persistenceId, id, persistent.sequenceNr, channel = self))
|
confirmMessage = DeliveredByChannel(persistent.persistenceId, id, persistent.sequenceNr, channel = self))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
object Channel {
|
object Channel {
|
||||||
/**
|
/**
|
||||||
* Returns a channel actor configuration object for creating a [[Channel]] with a
|
* Returns a channel actor configuration object for creating a [[Channel]] with a
|
||||||
|
|
@ -187,8 +193,10 @@ object Channel {
|
||||||
* @param destination persistent message destination.
|
* @param destination persistent message destination.
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final case class Deliver(persistent: Persistent, destination: ActorPath) extends Message
|
final case class Deliver(persistent: Persistent, destination: ActorPath) extends Message
|
||||||
|
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
object Deliver {
|
object Deliver {
|
||||||
/**
|
/**
|
||||||
* Java API.
|
* Java API.
|
||||||
|
|
@ -200,6 +208,7 @@ object Deliver {
|
||||||
* Plugin API: confirmation message generated by receivers of [[ConfirmablePersistent]] messages
|
* Plugin API: confirmation message generated by receivers of [[ConfirmablePersistent]] messages
|
||||||
* by calling `ConfirmablePersistent.confirm()`.
|
* by calling `ConfirmablePersistent.confirm()`.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
trait Delivered extends Message {
|
trait Delivered extends Message {
|
||||||
def channelId: String
|
def channelId: String
|
||||||
def persistentSequenceNr: Long
|
def persistentSequenceNr: Long
|
||||||
|
|
@ -215,6 +224,7 @@ trait Delivered extends Message {
|
||||||
/**
|
/**
|
||||||
* Plugin API.
|
* Plugin API.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final case class DeliveredByChannel(
|
final case class DeliveredByChannel(
|
||||||
@deprecatedName('processorId) persistenceId: String,
|
@deprecatedName('processorId) persistenceId: String,
|
||||||
channelId: String,
|
channelId: String,
|
||||||
|
|
@ -230,6 +240,7 @@ final case class DeliveredByChannel(
|
||||||
/**
|
/**
|
||||||
* INTERNAL API.
|
* INTERNAL API.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private[persistence] class DeliveredByChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor {
|
private[persistence] class DeliveredByChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor {
|
||||||
private val publish = settings.internal.publishConfirmations
|
private val publish = settings.internal.publishConfirmations
|
||||||
private val batchMax = settings.journal.maxConfirmationBatchSize
|
private val batchMax = settings.journal.maxConfirmationBatchSize
|
||||||
|
|
@ -270,6 +281,7 @@ private[persistence] class DeliveredByChannelBatching(journal: ActorRef, setting
|
||||||
* Notification message to inform channel listeners about messages that have reached the maximum
|
* Notification message to inform channel listeners about messages that have reached the maximum
|
||||||
* number of redeliveries.
|
* number of redeliveries.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final case class RedeliverFailure(messages: immutable.Seq[ConfirmablePersistent]) {
|
final case class RedeliverFailure(messages: immutable.Seq[ConfirmablePersistent]) {
|
||||||
/**
|
/**
|
||||||
* Java API.
|
* Java API.
|
||||||
|
|
@ -281,6 +293,7 @@ final case class RedeliverFailure(messages: immutable.Seq[ConfirmablePersistent]
|
||||||
* Reliably deliver messages contained in [[Deliver]] requests to their destinations. Unconfirmed
|
* Reliably deliver messages contained in [[Deliver]] requests to their destinations. Unconfirmed
|
||||||
* messages are redelivered according to the parameters in [[ChannelSettings]].
|
* messages are redelivered according to the parameters in [[ChannelSettings]].
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private class ReliableDelivery(redeliverSettings: ChannelSettings) extends Actor {
|
private class ReliableDelivery(redeliverSettings: ChannelSettings) extends Actor {
|
||||||
import redeliverSettings._
|
import redeliverSettings._
|
||||||
import ReliableDelivery._
|
import ReliableDelivery._
|
||||||
|
|
@ -313,6 +326,7 @@ private class ReliableDelivery(redeliverSettings: ChannelSettings) extends Actor
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private object ReliableDelivery {
|
private object ReliableDelivery {
|
||||||
type DeliveryAttempts = immutable.SortedMap[Long, DeliveryAttempt]
|
type DeliveryAttempts = immutable.SortedMap[Long, DeliveryAttempt]
|
||||||
type FailedAttempts = Vector[ConfirmablePersistentImpl]
|
type FailedAttempts = Vector[ConfirmablePersistentImpl]
|
||||||
|
|
@ -328,6 +342,7 @@ private object ReliableDelivery {
|
||||||
/**
|
/**
|
||||||
* Redelivery process used by [[ReliableDelivery]].
|
* Redelivery process used by [[ReliableDelivery]].
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private class Redelivery(redeliverSettings: ChannelSettings) extends Actor {
|
private class Redelivery(redeliverSettings: ChannelSettings) extends Actor {
|
||||||
import context.dispatcher
|
import context.dispatcher
|
||||||
import redeliverSettings._
|
import redeliverSettings._
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ import akka.persistence.JournalProtocol._
|
||||||
* made after the configured timeout.
|
* made after the configured timeout.
|
||||||
*/
|
*/
|
||||||
@SerialVersionUID(1L)
|
@SerialVersionUID(1L)
|
||||||
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final case class PersistentChannelSettings(
|
final case class PersistentChannelSettings(
|
||||||
val redeliverMax: Int = 5,
|
val redeliverMax: Int = 5,
|
||||||
val redeliverInterval: FiniteDuration = 5.seconds,
|
val redeliverInterval: FiniteDuration = 5.seconds,
|
||||||
|
|
@ -81,6 +82,12 @@ final case class PersistentChannelSettings(
|
||||||
def withPendingConfirmationsMin(pendingConfirmationsMin: Long): PersistentChannelSettings =
|
def withPendingConfirmationsMin(pendingConfirmationsMin: Long): PersistentChannelSettings =
|
||||||
copy(pendingConfirmationsMin = pendingConfirmationsMin)
|
copy(pendingConfirmationsMin = pendingConfirmationsMin)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API.
|
||||||
|
*/
|
||||||
|
def withIdleTimeout(idleTimeout: FiniteDuration): PersistentChannelSettings =
|
||||||
|
copy(idleTimeout = idleTimeout)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Converts this configuration object to [[ChannelSettings]].
|
* Converts this configuration object to [[ChannelSettings]].
|
||||||
*/
|
*/
|
||||||
|
|
@ -88,6 +95,7 @@ final case class PersistentChannelSettings(
|
||||||
ChannelSettings(redeliverMax, redeliverInterval, redeliverFailureListener)
|
ChannelSettings(redeliverMax, redeliverInterval, redeliverFailureListener)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
object PersistentChannelSettings {
|
object PersistentChannelSettings {
|
||||||
/**
|
/**
|
||||||
* Java API.
|
* Java API.
|
||||||
|
|
@ -99,11 +107,18 @@ object PersistentChannelSettings {
|
||||||
* Resets a [[PersistentChannel]], forcing it to redeliver all unconfirmed persistent
|
* Resets a [[PersistentChannel]], forcing it to redeliver all unconfirmed persistent
|
||||||
* messages. This does not affect writing [[Deliver]] requests.
|
* messages. This does not affect writing [[Deliver]] requests.
|
||||||
*/
|
*/
|
||||||
case object Reset
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
|
case object Reset {
|
||||||
|
/**
|
||||||
|
* Java API.
|
||||||
|
*/
|
||||||
|
def getInstance() = this
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Exception thrown by a [[PersistentChannel]] child actor to re-initiate delivery.
|
* Exception thrown by a [[PersistentChannel]] child actor to re-initiate delivery.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
class ResetException extends AkkaException("Channel reset on application request")
|
class ResetException extends AkkaException("Channel reset on application request")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -124,6 +139,7 @@ class ResetException extends AkkaException("Channel reset on application request
|
||||||
* or not (see `replyPersistent` parameter in [[PersistentChannelSettings]]). In case of success, the channel
|
* or not (see `replyPersistent` parameter in [[PersistentChannelSettings]]). In case of success, the channel
|
||||||
* replies with the contained [[Persistent]] message, otherwise with a [[PersistenceFailure]] message.
|
* replies with the contained [[Persistent]] message, otherwise with a [[PersistenceFailure]] message.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final class PersistentChannel private[akka] (_channelId: Option[String], channelSettings: PersistentChannelSettings) extends Actor {
|
final class PersistentChannel private[akka] (_channelId: Option[String], channelSettings: PersistentChannelSettings) extends Actor {
|
||||||
private val id = _channelId match {
|
private val id = _channelId match {
|
||||||
case Some(cid) ⇒ cid
|
case Some(cid) ⇒ cid
|
||||||
|
|
@ -144,6 +160,7 @@ final class PersistentChannel private[akka] (_channelId: Option[String], channel
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
object PersistentChannel {
|
object PersistentChannel {
|
||||||
/**
|
/**
|
||||||
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with a
|
* Returns a channel actor configuration object for creating a [[PersistentChannel]] with a
|
||||||
|
|
@ -183,8 +200,8 @@ object PersistentChannel {
|
||||||
/**
|
/**
|
||||||
* Plugin API.
|
* Plugin API.
|
||||||
*/
|
*/
|
||||||
@deprecated("PersistentChannel will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4")
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
final case class DeliveredByPersistenceChannel(
|
final case class DeliveredByPersistentChannel(
|
||||||
channelId: String,
|
channelId: String,
|
||||||
persistentSequenceNr: Long,
|
persistentSequenceNr: Long,
|
||||||
deliverySequenceNr: Long = 0L,
|
deliverySequenceNr: Long = 0L,
|
||||||
|
|
@ -192,37 +209,38 @@ final case class DeliveredByPersistenceChannel(
|
||||||
|
|
||||||
def persistenceId: String = channelId
|
def persistenceId: String = channelId
|
||||||
def sequenceNr: Long = persistentSequenceNr
|
def sequenceNr: Long = persistentSequenceNr
|
||||||
def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistenceChannel =
|
def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistentChannel =
|
||||||
copy(deliverySequenceNr = deliverySequenceNr, channel = channel)
|
copy(deliverySequenceNr = deliverySequenceNr, channel = channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API.
|
* INTERNAL API.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor {
|
private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor {
|
||||||
private val publish = settings.internal.publishConfirmations
|
private val publish = settings.internal.publishConfirmations
|
||||||
private val batchMax = settings.journal.maxConfirmationBatchSize
|
private val batchMax = settings.journal.maxConfirmationBatchSize
|
||||||
|
|
||||||
private var batching = false
|
private var batching = false
|
||||||
private var batch = Vector.empty[DeliveredByPersistenceChannel]
|
private var batch = Vector.empty[DeliveredByPersistentChannel]
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case DeleteMessagesSuccess(messageIds) ⇒
|
case DeleteMessagesSuccess(messageIds) ⇒
|
||||||
if (batch.isEmpty) batching = false else journalBatch()
|
if (batch.isEmpty) batching = false else journalBatch()
|
||||||
messageIds.foreach {
|
messageIds.foreach {
|
||||||
case c: DeliveredByPersistenceChannel ⇒
|
case c: DeliveredByPersistentChannel ⇒
|
||||||
c.channel ! c
|
c.channel ! c
|
||||||
if (publish) context.system.eventStream.publish(c)
|
if (publish) context.system.eventStream.publish(c)
|
||||||
}
|
}
|
||||||
case DeleteMessagesFailure(_) ⇒
|
case DeleteMessagesFailure(_) ⇒
|
||||||
if (batch.isEmpty) batching = false else journalBatch()
|
if (batch.isEmpty) batching = false else journalBatch()
|
||||||
case d: DeliveredByPersistenceChannel ⇒
|
case d: DeliveredByPersistentChannel ⇒
|
||||||
addToBatch(d)
|
addToBatch(d)
|
||||||
if (!batching || maxBatchSizeReached) journalBatch()
|
if (!batching || maxBatchSizeReached) journalBatch()
|
||||||
case m ⇒ journal forward m
|
case m ⇒ journal forward m
|
||||||
}
|
}
|
||||||
|
|
||||||
def addToBatch(pc: DeliveredByPersistenceChannel): Unit =
|
def addToBatch(pc: DeliveredByPersistentChannel): Unit =
|
||||||
batch = batch :+ pc
|
batch = batch :+ pc
|
||||||
|
|
||||||
def maxBatchSizeReached: Boolean =
|
def maxBatchSizeReached: Boolean =
|
||||||
|
|
@ -238,6 +256,7 @@ private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRe
|
||||||
/**
|
/**
|
||||||
* Writes [[Deliver]] requests to the journal.
|
* Writes [[Deliver]] requests to the journal.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private class RequestWriter(channelId: String, channelSettings: PersistentChannelSettings, reader: ActorRef) extends Processor {
|
private class RequestWriter(channelId: String, channelSettings: PersistentChannelSettings, reader: ActorRef) extends Processor {
|
||||||
import RequestWriter._
|
import RequestWriter._
|
||||||
import channelSettings._
|
import channelSettings._
|
||||||
|
|
@ -282,6 +301,7 @@ private class RequestWriter(channelId: String, channelSettings: PersistentChanne
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private object RequestWriter {
|
private object RequestWriter {
|
||||||
case object RequestsWritten
|
case object RequestsWritten
|
||||||
}
|
}
|
||||||
|
|
@ -296,6 +316,7 @@ private object RequestWriter {
|
||||||
*
|
*
|
||||||
* @see [[PersistentChannel]]
|
* @see [[PersistentChannel]]
|
||||||
*/
|
*/
|
||||||
|
@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4")
|
||||||
private class RequestReader(channelId: String, channelSettings: PersistentChannelSettings) extends Actor with Recovery {
|
private class RequestReader(channelId: String, channelSettings: PersistentChannelSettings) extends Actor with Recovery {
|
||||||
import RequestWriter._
|
import RequestWriter._
|
||||||
import channelSettings._
|
import channelSettings._
|
||||||
|
|
@ -378,7 +399,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne
|
||||||
private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = {
|
private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = {
|
||||||
ConfirmablePersistentImpl(wrapped,
|
ConfirmablePersistentImpl(wrapped,
|
||||||
confirmTarget = dbJournal,
|
confirmTarget = dbJournal,
|
||||||
confirmMessage = DeliveredByPersistenceChannel(channelId, wrapper.sequenceNr, channel = self))
|
confirmMessage = DeliveredByPersistentChannel(channelId, wrapper.sequenceNr, channel = self))
|
||||||
}
|
}
|
||||||
|
|
||||||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
|
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||||
val PersistentImplClass = classOf[PersistentImpl]
|
val PersistentImplClass = classOf[PersistentImpl]
|
||||||
val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl]
|
val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl]
|
||||||
val DeliveredByTransientChannelClass = classOf[DeliveredByChannel]
|
val DeliveredByTransientChannelClass = classOf[DeliveredByChannel]
|
||||||
val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistenceChannel]
|
val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel]
|
||||||
val DeliverClass = classOf[Deliver]
|
val DeliverClass = classOf[Deliver]
|
||||||
|
|
||||||
def identifier: Int = 7
|
def identifier: Int = 7
|
||||||
|
|
@ -47,12 +47,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||||
* serialization of a persistent message's payload to a matching `akka.serialization.Serializer`.
|
* serialization of a persistent message's payload to a matching `akka.serialization.Serializer`.
|
||||||
*/
|
*/
|
||||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||||
case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray
|
case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray
|
||||||
case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray
|
case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray
|
||||||
case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray
|
case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray
|
||||||
case c: DeliveredByPersistenceChannel ⇒ deliveredMessageBuilder(c).build().toByteArray
|
case c: DeliveredByPersistentChannel ⇒ deliveredMessageBuilder(c).build().toByteArray
|
||||||
case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray
|
case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray
|
||||||
case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
|
case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}")
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -192,7 +192,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer {
|
||||||
deliveredMessage.getDeliverySequenceNr,
|
deliveredMessage.getDeliverySequenceNr,
|
||||||
channel)
|
channel)
|
||||||
} else {
|
} else {
|
||||||
DeliveredByPersistenceChannel(
|
DeliveredByPersistentChannel(
|
||||||
deliveredMessage.getChannelId,
|
deliveredMessage.getChannelId,
|
||||||
deliveredMessage.getPersistentSequenceNr,
|
deliveredMessage.getPersistentSequenceNr,
|
||||||
deliveredMessage.getDeliverySequenceNr,
|
deliveredMessage.getDeliverySequenceNr,
|
||||||
|
|
|
||||||
|
|
@ -51,7 +51,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu
|
||||||
|
|
||||||
"resurrect with the correct state, not replaying confirmed messages to clients" in {
|
"resurrect with the correct state, not replaying confirmed messages to clients" in {
|
||||||
val deliveredProbe = TestProbe()
|
val deliveredProbe = TestProbe()
|
||||||
system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistenceChannel])
|
system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistentChannel])
|
||||||
|
|
||||||
val probe = TestProbe()
|
val probe = TestProbe()
|
||||||
|
|
||||||
|
|
@ -62,7 +62,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu
|
||||||
zero.confirm()
|
zero.confirm()
|
||||||
zero.payload should equal(0)
|
zero.payload should equal(0)
|
||||||
|
|
||||||
deliveredProbe.expectMsgType[DeliveredByPersistenceChannel]
|
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
|
||||||
|
|
||||||
processor.tell(Persistent(DecrementAndGet), probe.testActor)
|
processor.tell(Persistent(DecrementAndGet), probe.testActor)
|
||||||
|
|
||||||
|
|
@ -70,7 +70,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu
|
||||||
decrementFrom0.confirm()
|
decrementFrom0.confirm()
|
||||||
decrementFrom0.payload should equal(-1)
|
decrementFrom0.payload should equal(-1)
|
||||||
|
|
||||||
deliveredProbe.expectMsgType[DeliveredByPersistenceChannel]
|
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
|
||||||
|
|
||||||
watch(processor)
|
watch(processor)
|
||||||
system.stop(processor)
|
system.stop(processor)
|
||||||
|
|
@ -83,7 +83,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu
|
||||||
decrementFromMinus1.confirm()
|
decrementFromMinus1.confirm()
|
||||||
decrementFromMinus1.payload should equal(-2)
|
decrementFromMinus1.payload should equal(-2)
|
||||||
|
|
||||||
deliveredProbe.expectMsgType[DeliveredByPersistenceChannel]
|
deliveredProbe.expectMsgType[DeliveredByPersistentChannel]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -179,10 +179,10 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor
|
||||||
}
|
}
|
||||||
|
|
||||||
def subscribeToConfirmation(probe: TestProbe): Unit =
|
def subscribeToConfirmation(probe: TestProbe): Unit =
|
||||||
system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistenceChannel])
|
system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistentChannel])
|
||||||
|
|
||||||
def awaitConfirmation(probe: TestProbe): Unit =
|
def awaitConfirmation(probe: TestProbe): Unit =
|
||||||
probe.expectMsgType[DeliveredByPersistenceChannel]
|
probe.expectMsgType[DeliveredByPersistentChannel]
|
||||||
|
|
||||||
"A command sourced processor" should {
|
"A command sourced processor" should {
|
||||||
"have some reasonable throughput" in {
|
"have some reasonable throughput" in {
|
||||||
|
|
@ -213,7 +213,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor
|
||||||
stressPersistentChannel()
|
stressPersistentChannel()
|
||||||
|
|
||||||
probe.fishForMessage(100.seconds) {
|
probe.fishForMessage(100.seconds) {
|
||||||
case DeliveredByPersistenceChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2
|
case DeliveredByPersistentChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -124,11 +124,11 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) {
|
||||||
deserialized should be(confirmation)
|
deserialized should be(confirmation)
|
||||||
}
|
}
|
||||||
"handle DeliveredByPersistentChannel message serialization" in {
|
"handle DeliveredByPersistentChannel message serialization" in {
|
||||||
val confirmation = DeliveredByPersistenceChannel("c2", 14)
|
val confirmation = DeliveredByPersistentChannel("c2", 14)
|
||||||
val serializer = serialization.findSerializerFor(confirmation)
|
val serializer = serialization.findSerializerFor(confirmation)
|
||||||
|
|
||||||
val bytes = serializer.toBinary(confirmation)
|
val bytes = serializer.toBinary(confirmation)
|
||||||
val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistenceChannel]))
|
val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistentChannel]))
|
||||||
|
|
||||||
deserialized should be(confirmation)
|
deserialized should be(confirmation)
|
||||||
}
|
}
|
||||||
|
|
@ -149,7 +149,7 @@ object MessageSerializerRemotingSpec {
|
||||||
case ConfirmablePersistent(MyPayload(data), _, _) ⇒ sender() ! s"c${data}"
|
case ConfirmablePersistent(MyPayload(data), _, _) ⇒ sender() ! s"c${data}"
|
||||||
case Persistent(MyPayload(data), _) ⇒ sender() ! s"p${data}"
|
case Persistent(MyPayload(data), _) ⇒ sender() ! s"p${data}"
|
||||||
case DeliveredByChannel(pid, cid, msnr, dsnr, ep) ⇒ sender() ! s"${pid},${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
|
case DeliveredByChannel(pid, cid, msnr, dsnr, ep) ⇒ sender() ! s"${pid},${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
|
||||||
case DeliveredByPersistenceChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
|
case DeliveredByPersistentChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}"
|
||||||
case Deliver(Persistent(payload, _), dp) ⇒ context.actorSelection(dp) ! payload
|
case Deliver(Persistent(payload, _), dp) ⇒ context.actorSelection(dp) ! payload
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -194,7 +194,7 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS
|
||||||
expectMsg("a,b,2,3,true")
|
expectMsg("a,b,2,3,true")
|
||||||
}
|
}
|
||||||
"serialize DeliveredByPersistentChannel messages during remoting" in {
|
"serialize DeliveredByPersistentChannel messages during remoting" in {
|
||||||
localActor ! DeliveredByPersistenceChannel("c", 2, 3, testActor)
|
localActor ! DeliveredByPersistentChannel("c", 2, 3, testActor)
|
||||||
expectMsg("c,2,3,true")
|
expectMsg("c,2,3,true")
|
||||||
}
|
}
|
||||||
"serialize Deliver messages during remoting" in {
|
"serialize Deliver messages during remoting" in {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue