diff --git a/akka-persistence/src/main/scala/akka/persistence/Channel.scala b/akka-persistence/src/main/scala/akka/persistence/Channel.scala index 6c85941e85..33580a920d 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Channel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Channel.scala @@ -16,6 +16,8 @@ import akka.actor._ import akka.persistence.serialization.Message import akka.persistence.JournalProtocol._ +// TODO: remove Channel + /** * A [[Channel]] configuration object. * @@ -27,6 +29,7 @@ import akka.persistence.JournalProtocol._ * Alternatively, it can also confirm these messages, preventing further redeliveries. */ @SerialVersionUID(1L) +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") final case class ChannelSettings( val redeliverMax: Int = 5, val redeliverInterval: FiniteDuration = 5.seconds, @@ -51,6 +54,7 @@ final case class ChannelSettings( copy(redeliverFailureListener = Option(redeliverFailureListener)) } +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") object ChannelSettings { /** * Java API. @@ -118,6 +122,7 @@ object ChannelSettings { * * @see [[Deliver]] */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") final class Channel private[akka] (_channelId: Option[String], channelSettings: ChannelSettings) extends Actor { import channelSettings._ @@ -142,6 +147,7 @@ final class Channel private[akka] (_channelId: Option[String], channelSettings: confirmMessage = DeliveredByChannel(persistent.persistenceId, id, persistent.sequenceNr, channel = self)) } +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") object Channel { /** * Returns a channel actor configuration object for creating a [[Channel]] with a @@ -187,8 +193,10 @@ object Channel { * @param destination persistent message destination. */ @SerialVersionUID(1L) +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") final case class Deliver(persistent: Persistent, destination: ActorPath) extends Message +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") object Deliver { /** * Java API. @@ -200,6 +208,7 @@ object Deliver { * Plugin API: confirmation message generated by receivers of [[ConfirmablePersistent]] messages * by calling `ConfirmablePersistent.confirm()`. */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") trait Delivered extends Message { def channelId: String def persistentSequenceNr: Long @@ -215,6 +224,7 @@ trait Delivered extends Message { /** * Plugin API. */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") final case class DeliveredByChannel( @deprecatedName('processorId) persistenceId: String, channelId: String, @@ -230,6 +240,7 @@ final case class DeliveredByChannel( /** * INTERNAL API. */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private[persistence] class DeliveredByChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor { private val publish = settings.internal.publishConfirmations private val batchMax = settings.journal.maxConfirmationBatchSize @@ -270,6 +281,7 @@ private[persistence] class DeliveredByChannelBatching(journal: ActorRef, setting * Notification message to inform channel listeners about messages that have reached the maximum * number of redeliveries. */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") final case class RedeliverFailure(messages: immutable.Seq[ConfirmablePersistent]) { /** * Java API. @@ -281,6 +293,7 @@ final case class RedeliverFailure(messages: immutable.Seq[ConfirmablePersistent] * Reliably deliver messages contained in [[Deliver]] requests to their destinations. Unconfirmed * messages are redelivered according to the parameters in [[ChannelSettings]]. */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private class ReliableDelivery(redeliverSettings: ChannelSettings) extends Actor { import redeliverSettings._ import ReliableDelivery._ @@ -313,6 +326,7 @@ private class ReliableDelivery(redeliverSettings: ChannelSettings) extends Actor } } +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private object ReliableDelivery { type DeliveryAttempts = immutable.SortedMap[Long, DeliveryAttempt] type FailedAttempts = Vector[ConfirmablePersistentImpl] @@ -328,6 +342,7 @@ private object ReliableDelivery { /** * Redelivery process used by [[ReliableDelivery]]. */ +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private class Redelivery(redeliverSettings: ChannelSettings) extends Actor { import context.dispatcher import redeliverSettings._ diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala index 5132253f58..f3077e75b0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentChannel.scala @@ -36,6 +36,7 @@ import akka.persistence.JournalProtocol._ * made after the configured timeout. */ @SerialVersionUID(1L) +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") final case class PersistentChannelSettings( val redeliverMax: Int = 5, val redeliverInterval: FiniteDuration = 5.seconds, @@ -81,6 +82,12 @@ final case class PersistentChannelSettings( def withPendingConfirmationsMin(pendingConfirmationsMin: Long): PersistentChannelSettings = copy(pendingConfirmationsMin = pendingConfirmationsMin) + /** + * Java API. + */ + def withIdleTimeout(idleTimeout: FiniteDuration): PersistentChannelSettings = + copy(idleTimeout = idleTimeout) + /** * Converts this configuration object to [[ChannelSettings]]. */ @@ -88,6 +95,7 @@ final case class PersistentChannelSettings( ChannelSettings(redeliverMax, redeliverInterval, redeliverFailureListener) } +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") object PersistentChannelSettings { /** * Java API. @@ -99,11 +107,18 @@ object PersistentChannelSettings { * Resets a [[PersistentChannel]], forcing it to redeliver all unconfirmed persistent * messages. This does not affect writing [[Deliver]] requests. */ -case object Reset +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") +case object Reset { + /** + * Java API. + */ + def getInstance() = this +} /** * Exception thrown by a [[PersistentChannel]] child actor to re-initiate delivery. */ +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") class ResetException extends AkkaException("Channel reset on application request") /** @@ -124,6 +139,7 @@ class ResetException extends AkkaException("Channel reset on application request * or not (see `replyPersistent` parameter in [[PersistentChannelSettings]]). In case of success, the channel * replies with the contained [[Persistent]] message, otherwise with a [[PersistenceFailure]] message. */ +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") final class PersistentChannel private[akka] (_channelId: Option[String], channelSettings: PersistentChannelSettings) extends Actor { private val id = _channelId match { case Some(cid) ⇒ cid @@ -144,6 +160,7 @@ final class PersistentChannel private[akka] (_channelId: Option[String], channel } } +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") object PersistentChannel { /** * Returns a channel actor configuration object for creating a [[PersistentChannel]] with a @@ -183,8 +200,8 @@ object PersistentChannel { /** * Plugin API. */ -@deprecated("PersistentChannel will be removed, see `AtLeastOnceDelivery` instead.", since = "2.3.4") -final case class DeliveredByPersistenceChannel( +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") +final case class DeliveredByPersistentChannel( channelId: String, persistentSequenceNr: Long, deliverySequenceNr: Long = 0L, @@ -192,37 +209,38 @@ final case class DeliveredByPersistenceChannel( def persistenceId: String = channelId def sequenceNr: Long = persistentSequenceNr - def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistenceChannel = + def update(deliverySequenceNr: Long, channel: ActorRef): DeliveredByPersistentChannel = copy(deliverySequenceNr = deliverySequenceNr, channel = channel) } /** * INTERNAL API. */ +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRef, settings: PersistenceSettings) extends Actor { private val publish = settings.internal.publishConfirmations private val batchMax = settings.journal.maxConfirmationBatchSize private var batching = false - private var batch = Vector.empty[DeliveredByPersistenceChannel] + private var batch = Vector.empty[DeliveredByPersistentChannel] def receive = { case DeleteMessagesSuccess(messageIds) ⇒ if (batch.isEmpty) batching = false else journalBatch() messageIds.foreach { - case c: DeliveredByPersistenceChannel ⇒ + case c: DeliveredByPersistentChannel ⇒ c.channel ! c if (publish) context.system.eventStream.publish(c) } case DeleteMessagesFailure(_) ⇒ if (batch.isEmpty) batching = false else journalBatch() - case d: DeliveredByPersistenceChannel ⇒ + case d: DeliveredByPersistentChannel ⇒ addToBatch(d) if (!batching || maxBatchSizeReached) journalBatch() case m ⇒ journal forward m } - def addToBatch(pc: DeliveredByPersistenceChannel): Unit = + def addToBatch(pc: DeliveredByPersistentChannel): Unit = batch = batch :+ pc def maxBatchSizeReached: Boolean = @@ -238,6 +256,7 @@ private[persistence] class DeliveredByPersistentChannelBatching(journal: ActorRe /** * Writes [[Deliver]] requests to the journal. */ +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private class RequestWriter(channelId: String, channelSettings: PersistentChannelSettings, reader: ActorRef) extends Processor { import RequestWriter._ import channelSettings._ @@ -282,6 +301,7 @@ private class RequestWriter(channelId: String, channelSettings: PersistentChanne } } +@deprecated("Channel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private object RequestWriter { case object RequestsWritten } @@ -296,6 +316,7 @@ private object RequestWriter { * * @see [[PersistentChannel]] */ +@deprecated("PersistentChannel will be removed, see `akka.persistence.AtLeastOnceDelivery` instead.", since = "2.3.4") private class RequestReader(channelId: String, channelSettings: PersistentChannelSettings) extends Actor with Recovery { import RequestWriter._ import channelSettings._ @@ -378,7 +399,7 @@ private class RequestReader(channelId: String, channelSettings: PersistentChanne private def prepareDelivery(wrapped: PersistentRepr, wrapper: PersistentRepr): PersistentRepr = { ConfirmablePersistentImpl(wrapped, confirmTarget = dbJournal, - confirmMessage = DeliveredByPersistenceChannel(channelId, wrapper.sequenceNr, channel = self)) + confirmMessage = DeliveredByPersistentChannel(channelId, wrapper.sequenceNr, channel = self)) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index e8f978090e..36a8d30863 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -30,7 +30,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { val PersistentImplClass = classOf[PersistentImpl] val ConfirmablePersistentImplClass = classOf[ConfirmablePersistentImpl] val DeliveredByTransientChannelClass = classOf[DeliveredByChannel] - val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistenceChannel] + val DeliveredByPersistentChannelClass = classOf[DeliveredByPersistentChannel] val DeliverClass = classOf[Deliver] def identifier: Int = 7 @@ -47,12 +47,12 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { * serialization of a persistent message's payload to a matching `akka.serialization.Serializer`. */ def toBinary(o: AnyRef): Array[Byte] = o match { - case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray - case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray - case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray - case c: DeliveredByPersistenceChannel ⇒ deliveredMessageBuilder(c).build().toByteArray - case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray - case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") + case b: PersistentBatch ⇒ persistentMessageBatchBuilder(b).build().toByteArray + case p: PersistentRepr ⇒ persistentMessageBuilder(p).build().toByteArray + case c: DeliveredByChannel ⇒ deliveredMessageBuilder(c).build().toByteArray + case c: DeliveredByPersistentChannel ⇒ deliveredMessageBuilder(c).build().toByteArray + case d: Deliver ⇒ deliverMessageBuilder(d).build.toByteArray + case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass}") } /** @@ -192,7 +192,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { deliveredMessage.getDeliverySequenceNr, channel) } else { - DeliveredByPersistenceChannel( + DeliveredByPersistentChannel( deliveredMessage.getChannelId, deliveredMessage.getPersistentSequenceNr, deliveredMessage.getDeliverySequenceNr, diff --git a/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala index 06ad3a9ab2..f02c593dd5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/NumberProcessorSpec.scala @@ -51,7 +51,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu "resurrect with the correct state, not replaying confirmed messages to clients" in { val deliveredProbe = TestProbe() - system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistenceChannel]) + system.eventStream.subscribe(deliveredProbe.testActor, classOf[DeliveredByPersistentChannel]) val probe = TestProbe() @@ -62,7 +62,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu zero.confirm() zero.payload should equal(0) - deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] + deliveredProbe.expectMsgType[DeliveredByPersistentChannel] processor.tell(Persistent(DecrementAndGet), probe.testActor) @@ -70,7 +70,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu decrementFrom0.confirm() decrementFrom0.payload should equal(-1) - deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] + deliveredProbe.expectMsgType[DeliveredByPersistentChannel] watch(processor) system.stop(processor) @@ -83,7 +83,7 @@ class NumberProcessorSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Nu decrementFromMinus1.confirm() decrementFromMinus1.payload should equal(-2) - deliveredProbe.expectMsgType[DeliveredByPersistenceChannel] + deliveredProbe.expectMsgType[DeliveredByPersistentChannel] } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala index 86484691e7..f30dbf2c32 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PerformanceSpec.scala @@ -179,10 +179,10 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor } def subscribeToConfirmation(probe: TestProbe): Unit = - system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistenceChannel]) + system.eventStream.subscribe(probe.ref, classOf[DeliveredByPersistentChannel]) def awaitConfirmation(probe: TestProbe): Unit = - probe.expectMsgType[DeliveredByPersistenceChannel] + probe.expectMsgType[DeliveredByPersistentChannel] "A command sourced processor" should { "have some reasonable throughput" in { @@ -213,7 +213,7 @@ class PerformanceSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "Perfor stressPersistentChannel() probe.fishForMessage(100.seconds) { - case DeliveredByPersistenceChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2 + case DeliveredByPersistentChannel(_, snr, _, _) ⇒ snr == warmupCycles + loadCycles + 2 } } } diff --git a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala index 61ce089fcf..91f5797f45 100644 --- a/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/serialization/SerializerSpec.scala @@ -124,11 +124,11 @@ class MessageSerializerPersistenceSpec extends AkkaSpec(customSerializers) { deserialized should be(confirmation) } "handle DeliveredByPersistentChannel message serialization" in { - val confirmation = DeliveredByPersistenceChannel("c2", 14) + val confirmation = DeliveredByPersistentChannel("c2", 14) val serializer = serialization.findSerializerFor(confirmation) val bytes = serializer.toBinary(confirmation) - val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistenceChannel])) + val deserialized = serializer.fromBinary(bytes, Some(classOf[DeliveredByPersistentChannel])) deserialized should be(confirmation) } @@ -149,7 +149,7 @@ object MessageSerializerRemotingSpec { case ConfirmablePersistent(MyPayload(data), _, _) ⇒ sender() ! s"c${data}" case Persistent(MyPayload(data), _) ⇒ sender() ! s"p${data}" case DeliveredByChannel(pid, cid, msnr, dsnr, ep) ⇒ sender() ! s"${pid},${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" - case DeliveredByPersistenceChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" + case DeliveredByPersistentChannel(cid, msnr, dsnr, ep) ⇒ sender() ! s"${cid},${msnr},${dsnr},${ep.path.name.startsWith("testActor")}" case Deliver(Persistent(payload, _), dp) ⇒ context.actorSelection(dp) ! payload } } @@ -194,7 +194,7 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS expectMsg("a,b,2,3,true") } "serialize DeliveredByPersistentChannel messages during remoting" in { - localActor ! DeliveredByPersistenceChannel("c", 2, 3, testActor) + localActor ! DeliveredByPersistentChannel("c", 2, 3, testActor) expectMsg("c,2,3,true") } "serialize Deliver messages during remoting" in {