Chunked messages in reliable delivery, #24276 (#28915)

* To avoid head of line blocking from serialization and transfer of large messages
  this can be enabled.
* ProducerController setting to chunk messages
* split up large messages in chunks in ProducerController
  and assemble again in ConsumerController
* serialization moved to these actors instead of in the Artery stream
* other messages (for other actors) can interleave with the chunks
* serializer for ChunkedMessage in SequencedMessage and MessageSent
* cleanup partially stored chunked messages
* reference docs
* mima filters
* additional test for sending the Request after half window size
* enforce that chunk-large-messages=off for sharding and work-pulling
This commit is contained in:
Patrik Nordwall 2020-09-07 14:02:52 +02:00 committed by GitHub
parent d5bb125ae0
commit a548949143
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 1916 additions and 243 deletions

View file

@ -6,6 +6,7 @@ package akka.actor.typed.delivery
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
@ -13,13 +14,17 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.delivery.ConsumerController.DeliverThenStop
import akka.actor.typed.delivery.internal.ConsumerControllerImpl
import akka.actor.typed.delivery.internal.ProducerControllerImpl
import akka.serialization.SerializationExtension
class ConsumerControllerSpec extends ScalaTestWithActorTestKit("""
class ConsumerControllerSpec
extends ScalaTestWithActorTestKit(ConfigFactory.parseString("""
akka.reliable-delivery.consumer-controller {
flow-control-window = 20
resend-interval-min = 1s
}
""") with AnyWordSpecLike with LogCapturing {
""").withFallback(TestSerializer.config))
with AnyWordSpecLike
with LogCapturing {
import TestConsumer.sequencedMessage
private var idCount = 0
@ -33,6 +38,8 @@ class ConsumerControllerSpec extends ScalaTestWithActorTestKit("""
private val settings = ConsumerController.Settings(system)
import settings.flowControlWindow
private val serialization = SerializationExtension(system)
"ConsumerController" must {
"resend RegisterConsumer" in {
nextId()
@ -537,6 +544,115 @@ class ConsumerControllerSpec extends ScalaTestWithActorTestKit("""
}
}
"ConsumerController with chunked messages" must {
"collect and assemble chunks" in {
nextId()
val consumerController =
spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}")
.unsafeUpcast[ConsumerControllerImpl.InternalCommand]
val producerControllerProbe = createTestProbe[ProducerControllerImpl.InternalCommand]()
val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]()
consumerController ! ConsumerController.Start(consumerProbe.ref)
// one chunk for each letter, "123" => 3 chunks
val chunks1 = ProducerControllerImpl.createChunks(TestConsumer.Job(s"123"), chunkSize = 1, serialization)
val seqMessages1 = chunks1.zipWithIndex.map {
case (chunk, i) =>
ConsumerController.SequencedMessage.fromChunked(
producerId,
1 + i,
chunk,
first = i == 0,
ack = false,
producerControllerProbe.ref)
}
consumerController ! seqMessages1.head
consumerProbe.expectNoMessage() // need all chunks before delivery
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false))
consumerController ! seqMessages1(1)
consumerController ! seqMessages1(2)
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].message.payload should ===("123")
consumerController ! ConsumerController.Confirmed
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 22, true, false))
val chunks2 = ProducerControllerImpl.createChunks(TestConsumer.Job(s"45"), chunkSize = 1, serialization)
val seqMessages2 = chunks2.zipWithIndex.map {
case (chunk, i) =>
ConsumerController.SequencedMessage.fromChunked(
producerId,
4 + i,
chunk,
first = false,
ack = true,
producerControllerProbe.ref)
}
consumerController ! seqMessages2.head
consumerController ! seqMessages2(1)
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].message.payload should ===("45")
consumerController ! ConsumerController.Confirmed
producerControllerProbe.expectMessage(ProducerControllerImpl.Ack(5))
testKit.stop(consumerController)
}
"send Request after half window size when many chunks" in {
nextId()
val consumerController =
spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}")
.unsafeUpcast[ConsumerControllerImpl.InternalCommand]
val producerControllerProbe = createTestProbe[ProducerControllerImpl.InternalCommand]()
val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]()
consumerController ! ConsumerController.Start(consumerProbe.ref)
// one chunk for each letter, => 25 chunks
val chunks1 =
ProducerControllerImpl.createChunks(
TestConsumer.Job(s"1234567890123456789012345"),
chunkSize = 1,
serialization)
val seqMessages1 = chunks1.zipWithIndex.map {
case (chunk, i) =>
ConsumerController.SequencedMessage.fromChunked(
producerId,
1 + i,
chunk,
first = i == 0,
ack = false,
producerControllerProbe.ref)
}
consumerController ! seqMessages1.head
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false))
producerControllerProbe.expectNoMessage() // no more Request yet
(1 to 8).foreach(i => consumerController ! seqMessages1(i))
producerControllerProbe.expectNoMessage() // sent 9, no more Request yet
consumerController ! seqMessages1(9)
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 30, true, false))
(10 to 18).foreach(i => consumerController ! seqMessages1(i))
producerControllerProbe.expectNoMessage() // sent 19, no more Request yet
consumerController ! seqMessages1(19)
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 40, true, false))
// not sending more for a while, timeout will trigger new Request
producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 40, true, true))
(20 to 24).foreach(i => consumerController ! seqMessages1(i))
consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].message.payload should ===(
"1234567890123456789012345")
consumerController ! ConsumerController.Confirmed
testKit.stop(consumerController)
}
}
"ConsumerController without resending" must {
"accept lost message" in {
nextId()

View file

@ -8,19 +8,23 @@ import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._
import DurableProducerQueue.MessageSent
import ProducerController.MessageWithConfirmation
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.delivery.DurableProducerQueue.MessageSent
import akka.actor.typed.delivery.ProducerController.MessageWithConfirmation
import akka.actor.typed.delivery.internal.ChunkedMessage
import akka.actor.typed.delivery.internal.ProducerControllerImpl
import akka.util.ByteString
class DurableProducerControllerSpec
extends ScalaTestWithActorTestKit("""
extends ScalaTestWithActorTestKit(
ConfigFactory.parseString("""
akka.reliable-delivery.consumer-controller.flow-control-window = 20
akka.reliable-delivery.consumer-controller.resend-interval-min = 1s
""")
""").withFallback(TestSerializer.config))
with AnyWordSpecLike
with LogCapturing {
import DurableProducerQueue.NoQualifier
@ -157,6 +161,124 @@ class DurableProducerControllerSpec
testKit.stop(producerController)
}
"store chunked messages" in {
nextId()
val consumerControllerProbe = createTestProbe[ConsumerController.Command[TestConsumer.Job]]()
val stateHolder =
new AtomicReference[DurableProducerQueue.State[TestConsumer.Job]](DurableProducerQueue.State.empty)
val durable =
TestDurableProducerQueue[TestConsumer.Job](
Duration.Zero,
stateHolder,
(_: DurableProducerQueue.Command[_]) => false)
val producerController =
spawn(
ProducerController[TestConsumer.Job](
producerId,
Some(durable),
ProducerController.Settings(system).withChunkLargeMessagesBytes(1)),
s"producerController-${idCount}").unsafeUpcast[ProducerControllerImpl.InternalCommand]
val producerProbe = createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]()
producerController ! ProducerController.Start(producerProbe.ref)
producerController ! ProducerController.RegisterConsumer(consumerControllerProbe.ref)
producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("abc")
consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
producerProbe.awaitAssert {
val durableState = stateHolder.get()
durableState.currentSeqNr should ===(2)
durableState.unconfirmed.size should ===(1)
durableState.unconfirmed.head.message.getClass should ===(classOf[ChunkedMessage])
}
producerController ! ProducerControllerImpl.Request(0L, 10L, true, false)
consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
val seqMsg3 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
seqMsg3.isFirstChunk should ===(false)
seqMsg3.isLastChunk should ===(true)
seqMsg3.seqNr should ===(3L)
producerProbe.awaitAssert {
val durableState = stateHolder.get()
durableState.currentSeqNr should ===(4)
durableState.unconfirmed.size should ===(3)
durableState.unconfirmed.head.message.getClass should ===(classOf[ChunkedMessage])
}
testKit.stop(producerController)
}
"load initial state but don't resend partially stored chunked messages" in {
// may happen if crashed before all chunked messages have been stored,
// should be treated as if none of them were stored (they were not confirmed)
nextId()
val consumerControllerProbe = createTestProbe[ConsumerController.Command[TestConsumer.Job]]()
val durable = TestDurableProducerQueue[TestConsumer.Job](
Duration.Zero,
DurableProducerQueue.State(
currentSeqNr = 5,
highestConfirmedSeqNr = 2,
confirmedSeqNr = Map(NoQualifier -> (2L -> TestTimestamp)),
unconfirmed = Vector(
DurableProducerQueue.MessageSent.fromChunked[TestConsumer.Job](
3,
ChunkedMessage(ByteString.fromString("abc"), true, true, 20, ""),
false,
NoQualifier,
TestTimestamp),
DurableProducerQueue.MessageSent.fromChunked[TestConsumer.Job](
4,
ChunkedMessage(ByteString.fromString("d"), true, false, 20, ""),
false,
NoQualifier,
TestTimestamp),
DurableProducerQueue.MessageSent.fromChunked[TestConsumer.Job](
5,
ChunkedMessage(ByteString.fromString("e"), false, false, 20, ""),
false,
NoQualifier,
TestTimestamp)
// missing last chunk
)))
val producerController =
spawn(
ProducerController[TestConsumer.Job](
producerId,
Some(durable),
ProducerController.Settings(system).withChunkLargeMessagesBytes(1)),
s"producerController-${idCount}").unsafeUpcast[ProducerControllerImpl.InternalCommand]
val producerProbe = createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]()
producerController ! ProducerController.Start(producerProbe.ref)
producerController ! ProducerController.RegisterConsumer(consumerControllerProbe.ref)
val seqMsg3 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
seqMsg3.seqNr should ===(3)
seqMsg3.isFirstChunk should ===(true)
seqMsg3.isLastChunk should ===(true)
producerController ! ProducerControllerImpl.Request(0L, 10L, true, false)
// 4 and 5 discarded because missing last chunk
consumerControllerProbe.expectNoMessage()
producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("g")
val seqMsg4 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
seqMsg4.seqNr should ===(4)
seqMsg4.isFirstChunk should ===(true)
seqMsg4.isLastChunk should ===(true)
testKit.stop(producerController)
}
}
}

View file

@ -0,0 +1,93 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.delivery
import org.scalatest.TestSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import akka.actor.typed.delivery.DurableProducerQueue.MessageSent
import akka.actor.typed.delivery.DurableProducerQueue.State
import akka.actor.typed.delivery.internal.ChunkedMessage
import akka.util.ByteString
class DurableProducerQueueSpec extends AnyWordSpec with TestSuite with Matchers {
"DurableProducerQueue.State" must {
"addMessageSent" in {
val state1 = State.empty.addMessageSent(MessageSent(1, "a", false, "", 0L))
state1.unconfirmed.size should ===(1)
state1.unconfirmed.head.message should ===("a")
state1.currentSeqNr should ===(2L)
val state2 = state1.addMessageSent(MessageSent(2, "b", false, "", 0L))
state2.unconfirmed.size should ===(2)
state2.unconfirmed.last.message should ===("b")
state2.currentSeqNr should ===(3L)
}
"confirm" in {
val state1 = State.empty
.addMessageSent(MessageSent(1, "a", false, "", 0L))
.addMessageSent(MessageSent(2, "b", false, "", 0L))
val state2 = state1.confirmed(1L, "", 0L)
state2.unconfirmed.size should ===(1)
state2.unconfirmed.head.message should ===("b")
state2.currentSeqNr should ===(3L)
}
"filter partially stored chunked messages" in {
val state1 = State
.empty[String]
.addMessageSent(
MessageSent.fromChunked(1, ChunkedMessage(ByteString.fromString("a"), true, true, 20, ""), false, "", 0L))
.addMessageSent(
MessageSent.fromChunked(2, ChunkedMessage(ByteString.fromString("b"), true, false, 20, ""), false, "", 0L))
.addMessageSent(
MessageSent.fromChunked(3, ChunkedMessage(ByteString.fromString("c"), false, false, 20, ""), false, "", 0L))
// last chunk was never stored
val state2 = state1.cleanupPartialChunkedMessages()
state2.unconfirmed.size should ===(1)
state2.unconfirmed.head.message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("a"))
state2.currentSeqNr should ===(2L)
val state3 = state1
.addMessageSent(
MessageSent.fromChunked(2, ChunkedMessage(ByteString.fromString("d"), true, false, 20, ""), false, "", 0L))
.addMessageSent(
MessageSent.fromChunked(3, ChunkedMessage(ByteString.fromString("e"), false, true, 20, ""), false, "", 0L))
val state4 = state3.cleanupPartialChunkedMessages()
state4.unconfirmed.size should ===(3)
state4.unconfirmed.head.message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("a"))
state4.unconfirmed(1).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("d"))
state4.unconfirmed(2).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("e"))
state4.currentSeqNr should ===(4L)
val state5 = state3
.addMessageSent(
MessageSent.fromChunked(4, ChunkedMessage(ByteString.fromString("f"), true, false, 20, ""), false, "", 0L))
.addMessageSent(
MessageSent.fromChunked(5, ChunkedMessage(ByteString.fromString("g"), false, false, 20, ""), false, "", 0L))
.addMessageSent(
MessageSent.fromChunked(4, ChunkedMessage(ByteString.fromString("h"), true, true, 20, ""), false, "", 0L))
.addMessageSent(
MessageSent.fromChunked(5, ChunkedMessage(ByteString.fromString("i"), true, false, 20, ""), false, "", 0L))
.addMessageSent(
MessageSent.fromChunked(6, ChunkedMessage(ByteString.fromString("j"), false, false, 20, ""), false, "", 0L))
val state6 = state5.cleanupPartialChunkedMessages()
state6.unconfirmed.size should ===(4)
state6.unconfirmed.head.message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("a"))
state6.unconfirmed(1).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("d"))
state6.unconfirmed(2).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("e"))
state6.unconfirmed(3).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("h"))
state6.currentSeqNr should ===(5L)
}
}
}

View file

@ -53,10 +53,11 @@ class DurableWorkPullingSpec
s: DurableProducerQueue.State[TestConsumer.Job],
expected: DurableProducerQueue.State[TestConsumer.Job]): Unit = {
def cleanup(a: DurableProducerQueue.State[TestConsumer.Job]) =
def cleanup(a: DurableProducerQueue.State[TestConsumer.Job]): DurableProducerQueue.State[TestConsumer.Job] = {
a.copy(
confirmedSeqNr = Map.empty,
unconfirmed = s.unconfirmed.map(m => m.copy(confirmationQualifier = DurableProducerQueue.NoQualifier)))
unconfirmed = s.unconfirmed.map(m => m.withConfirmationQualifier(DurableProducerQueue.NoQualifier)))
}
cleanup(s) should ===(cleanup(expected))
}

View file

@ -6,17 +6,20 @@ package akka.actor.typed.delivery
import scala.concurrent.duration._
import ProducerController.MessageWithConfirmation
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.delivery.ProducerController.MessageWithConfirmation
import akka.actor.typed.delivery.internal.ChunkedMessage
import akka.actor.typed.delivery.internal.ProducerControllerImpl
class ProducerControllerSpec
extends ScalaTestWithActorTestKit("""
extends ScalaTestWithActorTestKit(
ConfigFactory.parseString("""
akka.reliable-delivery.consumer-controller.flow-control-window = 20
""")
""").withFallback(TestSerializer.config))
with AnyWordSpecLike
with LogCapturing {
import TestConsumer.sequencedMessage
@ -247,6 +250,50 @@ class ProducerControllerSpec
testKit.stop(producerController)
}
"chunk large messages" in {
nextId()
val consumerControllerProbe = createTestProbe[ConsumerController.Command[TestConsumer.Job]]()
val producerController =
spawn(
ProducerController[TestConsumer.Job](
producerId,
None,
ProducerController.Settings(system).withChunkLargeMessagesBytes(1)),
s"producerController-${idCount}").unsafeUpcast[ProducerControllerImpl.InternalCommand]
val producerProbe = createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]()
producerController ! ProducerController.Start(producerProbe.ref)
producerController ! ProducerController.RegisterConsumer(consumerControllerProbe.ref)
producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("abc")
val seqMsg1 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
seqMsg1.message.getClass should ===(classOf[ChunkedMessage])
seqMsg1.isFirstChunk should ===(true)
seqMsg1.isLastChunk should ===(false)
seqMsg1.seqNr should ===(1L)
producerController ! ProducerControllerImpl.Request(0L, 10L, true, false)
val seqMsg2 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
seqMsg2.isFirstChunk should ===(false)
seqMsg2.isLastChunk should ===(false)
seqMsg2.seqNr should ===(2L)
val seqMsg3 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
seqMsg3.isFirstChunk should ===(false)
seqMsg3.isLastChunk should ===(true)
seqMsg3.seqNr should ===(3L)
producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("d")
val seqMsg4 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]]
seqMsg4.isFirstChunk should ===(true)
seqMsg4.isLastChunk should ===(true)
seqMsg4.seqNr should ===(4L)
testKit.stop(producerController)
}
}
"ProducerController without resends" must {

View file

@ -9,6 +9,8 @@ import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration._
import scala.util.Random
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
@ -21,6 +23,14 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
object ReliableDeliveryRandomSpec {
val config: Config = ConfigFactory.parseString("""
akka.reliable-delivery.consumer-controller {
flow-control-window = 20
resend-interval-min = 500 ms
resend-interval-max = 2 s
}
""")
object RandomFlakyNetwork {
def apply[T](rnd: Random, dropProbability: Any => Double): BehaviorInterceptor[T, T] =
new RandomFlakyNetwork(rnd, dropProbability).asInstanceOf[BehaviorInterceptor[T, T]]
@ -42,15 +52,14 @@ object ReliableDeliveryRandomSpec {
}
}
class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
akka.reliable-delivery.consumer-controller {
flow-control-window = 20
resend-interval-min = 500 ms
resend-interval-max = 2 s
}
""") with AnyWordSpecLike with LogCapturing {
class ReliableDeliveryRandomSpec(config: Config)
extends ScalaTestWithActorTestKit(config)
with AnyWordSpecLike
with LogCapturing {
import ReliableDeliveryRandomSpec._
def this() = this(ReliableDeliveryRandomSpec.config)
private var idCount = 0
private def nextId(): Int = {
idCount += 1
@ -90,7 +99,7 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
case _ => 0.0
}
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val consumerController =
spawn(
Behaviors.intercept(() => RandomFlakyNetwork[ConsumerController.Command[TestConsumer.Job]](rnd, consumerDrop))(
@ -137,8 +146,8 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
nextId()
val rndSeed = System.currentTimeMillis()
val rnd = new Random(rndSeed)
val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.4
val producerDropProbability = 0.1 + rnd.nextDouble() * 0.3
val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.2
val producerDropProbability = 0.1 + rnd.nextDouble() * 0.2
test(
rndSeed,
rnd,
@ -153,7 +162,7 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
nextId()
val rndSeed = System.currentTimeMillis()
val rnd = new Random(rndSeed)
val durableFailProbability = 0.1 + rnd.nextDouble() * 0.2
val durableFailProbability = 0.1 + rnd.nextDouble() * 0.1
test(
rndSeed,
rnd,
@ -168,9 +177,9 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
nextId()
val rndSeed = System.currentTimeMillis()
val rnd = new Random(rndSeed)
val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.4
val producerDropProbability = 0.1 + rnd.nextDouble() * 0.3
val durableFailProbability = 0.1 + rnd.nextDouble() * 0.2
val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.1
val producerDropProbability = 0.1 + rnd.nextDouble() * 0.1
val durableFailProbability = 0.1 + rnd.nextDouble() * 0.1
test(
rndSeed,
rnd,
@ -200,3 +209,10 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit("""
}
}
// same tests but with chunked messages
class ReliableDeliveryRandomChunkedSpec
extends ReliableDeliveryRandomSpec(
ConfigFactory.parseString("""
akka.reliable-delivery.producer-controller.chunk-large-messages = 1b
""").withFallback(TestSerializer.config).withFallback(ReliableDeliveryRandomSpec.config))

View file

@ -6,20 +6,30 @@ package akka.actor.typed.delivery
import scala.concurrent.duration._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
class ReliableDeliverySpec
extends ScalaTestWithActorTestKit("""
akka.reliable-delivery.consumer-controller.flow-control-window = 20
""")
object ReliableDeliverySpec {
val config: Config = ConfigFactory.parseString("""
akka.reliable-delivery.consumer-controller.flow-control-window = 20
""")
}
class ReliableDeliverySpec(config: Config)
extends ScalaTestWithActorTestKit(config)
with AnyWordSpecLike
with LogCapturing {
import TestConsumer.defaultConsumerDelay
import TestProducer.defaultProducerDelay
def this() = this(ReliableDeliverySpec.config)
private val chunked = ProducerController.Settings(system).chunkLargeMessagesBytes > 0
private var idCount = 0
private def nextId(): Int = {
idCount += 1
@ -30,7 +40,7 @@ class ReliableDeliverySpec
"illustrate point-to-point usage" in {
nextId()
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val consumerController =
spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}")
spawn(
@ -52,7 +62,7 @@ class ReliableDeliverySpec
"illustrate point-to-point usage with ask" in {
nextId()
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val consumerController =
spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}")
spawn(
@ -70,9 +80,11 @@ class ReliableDeliverySpec
consumerController ! ConsumerController.RegisterToProducerController(producerController)
consumerEndProbe.receiveMessage(5.seconds)
replyProbe.receiveMessages(42, 5.seconds).toSet should ===((1L to 42L).toSet)
val messageCount = consumerEndProbe.receiveMessage(5.seconds).messageCount
if (chunked)
replyProbe.receiveMessages(messageCount, 5.seconds)
else
replyProbe.receiveMessages(messageCount, 5.seconds).toSet should ===((1L to 42).toSet)
testKit.stop(producer)
testKit.stop(producerController)
@ -81,7 +93,7 @@ class ReliableDeliverySpec
def testWithDelays(producerDelay: FiniteDuration, consumerDelay: FiniteDuration): Unit = {
nextId()
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val consumerController =
spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}")
spawn(TestConsumer(consumerDelay, 42, consumerEndProbe.ref, consumerController), name = s"destination-${idCount}")
@ -113,7 +125,7 @@ class ReliableDeliverySpec
"allow replacement of destination" in {
nextId()
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val consumerController =
spawn(ConsumerController[TestConsumer.Job](), s"consumerController1-${idCount}")
spawn(TestConsumer(defaultConsumerDelay, 42, consumerEndProbe.ref, consumerController), s"consumer1-${idCount}")
@ -126,7 +138,7 @@ class ReliableDeliverySpec
consumerEndProbe.receiveMessage(5.seconds)
val consumerEndProbe2 = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe2 = createTestProbe[TestConsumer.Collected]()
val consumerController2 =
spawn(ConsumerController[TestConsumer.Job](), s"consumerController2-${idCount}")
spawn(TestConsumer(defaultConsumerDelay, 42, consumerEndProbe2.ref, consumerController2), s"consumer2-${idCount}")
@ -189,3 +201,10 @@ class ReliableDeliverySpec
}
}
// Same tests but with chunked messages
class ReliableDeliveryChunkedSpec
extends ReliableDeliverySpec(
ConfigFactory.parseString("""
akka.reliable-delivery.producer-controller.chunk-large-messages = 1b
""").withFallback(TestSerializer.config).withFallback(ReliableDeliverySpec.config))

View file

@ -4,17 +4,22 @@
package akka.actor.typed.delivery
import scala.concurrent.duration._
import java.nio.charset.StandardCharsets
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import ConsumerController.SequencedMessage
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.delivery.ConsumerController.SequencedMessage
import akka.actor.typed.delivery.internal.ProducerControllerImpl
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.serialization.SerializerWithStringManifest
object TestConsumer {
@ -33,7 +38,7 @@ object TestConsumer {
seqNr: Long)
extends Command
final case class CollectedProducerIds(producerIds: Set[String])
final case class Collected(producerIds: Set[String], messageCount: Int)
val defaultConsumerDelay: FiniteDuration = 10.millis
@ -53,17 +58,17 @@ object TestConsumer {
def apply(
delay: FiniteDuration,
endSeqNr: Long,
endReplyTo: ActorRef[CollectedProducerIds],
endReplyTo: ActorRef[Collected],
controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]): Behavior[Command] =
apply(delay, consumerEndCondition(endSeqNr), endReplyTo, controller)
def apply(
delay: FiniteDuration,
endCondition: SomeAsyncJob => Boolean,
endReplyTo: ActorRef[CollectedProducerIds],
endReplyTo: ActorRef[Collected],
controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]): Behavior[Command] =
Behaviors.setup[Command] { ctx =>
new TestConsumer(ctx, delay, endCondition, endReplyTo, controller).active(Set.empty)
new TestConsumer(ctx, delay, endCondition, endReplyTo, controller).active(Set.empty, 0)
}
}
@ -72,7 +77,7 @@ class TestConsumer(
ctx: ActorContext[TestConsumer.Command],
delay: FiniteDuration,
endCondition: TestConsumer.SomeAsyncJob => Boolean,
endReplyTo: ActorRef[TestConsumer.CollectedProducerIds],
endReplyTo: ActorRef[TestConsumer.Collected],
controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]) {
import TestConsumer._
@ -83,10 +88,11 @@ class TestConsumer(
controller ! ConsumerController.Start(deliverTo)
private def active(processed: Set[(String, Long)]): Behavior[Command] = {
private def active(processed: Set[(String, Long)], messageCount: Int): Behavior[Command] = {
Behaviors.receive { (ctx, m) =>
m match {
case JobDelivery(msg, confirmTo, producerId, seqNr) =>
ctx.log.trace("SeqNr [{}] was delivered to consumer.", seqNr)
// confirmation can be later, asynchronously
if (delay == Duration.Zero)
ctx.self ! SomeAsyncJob(msg, confirmTo, producerId, seqNr)
@ -106,12 +112,36 @@ class TestConsumer(
confirmTo ! ConsumerController.Confirmed
if (endCondition(job)) {
endReplyTo ! CollectedProducerIds(processed.map(_._1))
ctx.log.debug("End at [{}]", seqNr)
endReplyTo ! Collected(processed.map(_._1), messageCount + 1)
Behaviors.stopped
} else
active(cleanProcessed + (producerId -> seqNr))
active(cleanProcessed + (producerId -> seqNr), messageCount + 1)
}
}
}
}
object TestSerializer {
val config: Config = ConfigFactory.parseString(s"""
akka.actor.serializers.delivery-test = ${classOf[TestSerializer].getName}
akka.actor.serialization-bindings {
"${classOf[TestConsumer.Job].getName}" = delivery-test
}
""")
}
class TestSerializer extends SerializerWithStringManifest {
override def identifier: Int = 787878
override def manifest(o: AnyRef): String = ""
override def toBinary(o: AnyRef): Array[Byte] =
o match {
case TestConsumer.Job(payload) => payload.getBytes(StandardCharsets.UTF_8)
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
TestConsumer.Job(new String(bytes, StandardCharsets.UTF_8))
}

View file

@ -28,7 +28,7 @@ object TestDurableProducerQueue {
.supervise {
Behaviors.setup[Command[A]] { context =>
context.setLoggerName("TestDurableProducerQueue")
val state = stateHolder.get()
val state = stateHolder.get().cleanupPartialChunkedMessages()
context.log.info("Starting with seqNr [{}], confirmedSeqNr [{}]", state.currentSeqNr, state.confirmedSeqNr)
new TestDurableProducerQueue[A](context, delay, stateHolder, failWhen).active(state)
}
@ -70,12 +70,9 @@ class TestDurableProducerQueue[A](
maybeFail(cmd)
val reply = StoreMessageSentAck(cmd.sent.seqNr)
if (delay == Duration.Zero) cmd.replyTo ! reply else context.scheduleOnce(delay, cmd.replyTo, reply)
active(
state.copy(
currentSeqNr = cmd.sent.seqNr + 1,
unconfirmed = state.unconfirmed :+ cmd.sent.copy(timestampMillis = TestTimestamp)))
active(state.addMessageSent(cmd.sent.withTimestampMillis(TestTimestamp)))
} else if (cmd.sent.seqNr == state.currentSeqNr - 1) {
// already stored, could be a retry after timout
// already stored, could be a retry after timeout
context.log.info("Duplicate seqNr [{}], currentSeqNr [{}]", cmd.sent.seqNr, state.currentSeqNr)
val reply = StoreMessageSentAck(cmd.sent.seqNr)
if (delay == Duration.Zero) cmd.replyTo ! reply else context.scheduleOnce(delay, cmd.replyTo, reply)
@ -92,15 +89,7 @@ class TestDurableProducerQueue[A](
cmd.seqNr,
cmd.confirmationQualifier)
maybeFail(cmd)
val newUnconfirmed = state.unconfirmed.filterNot { u =>
u.confirmationQualifier == cmd.confirmationQualifier && u.seqNr <= cmd.seqNr
}
val newHighestConfirmed = math.max(state.highestConfirmedSeqNr, cmd.seqNr)
active(
state.copy(
highestConfirmedSeqNr = newHighestConfirmed,
confirmedSeqNr = state.confirmedSeqNr.updated(cmd.confirmationQualifier, (cmd.seqNr, TestTimestamp)),
unconfirmed = newUnconfirmed))
active(state.confirmed(cmd.seqNr, cmd.confirmationQualifier, TestTimestamp))
}
}

View file

@ -4,8 +4,8 @@
package akka.actor.typed.delivery
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
@ -43,12 +43,17 @@ object TestProducerWithAsk {
}
private def idle(n: Int, replyProbe: ActorRef[Long]): Behavior[Command] = {
Behaviors.receiveMessage {
case Tick => Behaviors.same
case RequestNext(sendTo) => active(n + 1, replyProbe, sendTo)
case Confirmed(seqNr) =>
replyProbe ! seqNr
Behaviors.same
Behaviors.receive { (ctx, msg) =>
msg match {
case Tick => Behaviors.same
case RequestNext(sendTo) => active(n + 1, replyProbe, sendTo)
case Confirmed(seqNr) =>
replyProbe ! seqNr
Behaviors.same
case AskTimeout =>
ctx.log.warn("Timeout")
Behaviors.same
}
}
}

View file

@ -56,14 +56,14 @@ class WorkPullingSpec
val jobProducer =
spawn(TestProducerWorkPulling(defaultProducerDelay, workPullingController), name = s"jobProducer-${idCount}")
val consumerEndProbe1 = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe1 = createTestProbe[TestConsumer.Collected]()
val workerController1 =
spawn(ConsumerController[TestConsumer.Job](workerServiceKey), s"workerController1-${idCount}")
spawn(
TestConsumer(defaultConsumerDelay, 42, consumerEndProbe1.ref, workerController1),
name = s"worker1-${idCount}")
val consumerEndProbe2 = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe2 = createTestProbe[TestConsumer.Collected]()
val workerController2 =
spawn(ConsumerController[TestConsumer.Job](workerServiceKey), s"workerController2-${idCount}")
spawn(

View file

@ -0,0 +1,14 @@
# #24276 Chunked messages in Reliable Delivery
ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.internal.*")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ProducerController#Settings.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.DurableProducerQueue#MessageSent.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.DurableProducerQueue#MessageSent.unapply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.unapply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.message")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.copy")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.copy$default$3")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.this")
ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.DurableProducerQueue$MessageSent*")
ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.DurableProducerQueue#MessageSent*")

View file

@ -67,6 +67,15 @@ akka.use-slf4j = on
akka.reliable-delivery {
producer-controller {
# To avoid head of line blocking from serialization and transfer
# of large messages this can be enabled.
# Large messages are chunked into pieces of the given size in bytes. The
# chunked messages are sent separatetely and assembled on the consumer side.
# Serialization and deserialization is performed by the ProducerController and
# ConsumerController respectively instead of in the remote transport layer.
chunk-large-messages = off
durable-queue {
# The ProducerController uses this timeout for the requests to
# the durable queue. If there is no reply within the timeout it
@ -111,6 +120,10 @@ akka.reliable-delivery {
# Ask timeout for sending message to worker until receiving Ack from worker
internal-ask-timeout = 60s
# Chunked messages not implemented for work-pulling yet. Override to not
# propagate property from akka.reliable-delivery.producer-controller.
chunk-large-messages = off
}
}
}

View file

@ -14,6 +14,7 @@ import akka.actor.DeadLetterSuppression
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.delivery.internal.ChunkedMessage
import akka.actor.typed.delivery.internal.ConsumerControllerImpl
import akka.actor.typed.delivery.internal.DeliverySerializable
import akka.actor.typed.delivery.internal.ProducerControllerImpl
@ -124,6 +125,26 @@ object ConsumerController {
final case class DeliverThenStop[A]() extends Command[A]
object SequencedMessage {
/**
* SequencedMessage.message can be `A` or `ChunkedMessage`.
*/
type MessageOrChunk = Any
/**
* INTERNAL API
*/
@InternalApi private[akka] def fromChunked[A](
producerId: String,
seqNr: SeqNr,
chunk: ChunkedMessage,
first: Boolean,
ack: Boolean,
producerController: ActorRef[ProducerControllerImpl.InternalCommand]): SequencedMessage[A] =
new SequencedMessage(producerId, seqNr, chunk, first, ack)(producerController)
}
/**
* This is used between the `ProducerController` and `ConsumerController`. Should rarely be used in
* application code but is public because it's in the signature for the `EntityTypeKey` when using
@ -135,8 +156,12 @@ object ConsumerController {
*
* @param producerController INTERNAL API: construction of SequencedMessage is internal
*/
final case class SequencedMessage[A](producerId: String, seqNr: SeqNr, message: A, first: Boolean, ack: Boolean)(
@InternalApi private[akka] val producerController: ActorRef[ProducerControllerImpl.InternalCommand])
final case class SequencedMessage[A](
producerId: String,
seqNr: SeqNr,
message: SequencedMessage.MessageOrChunk,
first: Boolean,
ack: Boolean)(@InternalApi private[akka] val producerController: ActorRef[ProducerControllerImpl.InternalCommand])
extends Command[A]
with DeliverySerializable
with DeadLetterSuppression {
@ -144,6 +169,22 @@ object ConsumerController {
/** INTERNAL API */
@InternalApi private[akka] def asFirst: SequencedMessage[A] =
copy(first = true)(producerController)
/** INTERNAL API */
@InternalApi private[akka] def isFirstChunk: Boolean = {
message match {
case c: ChunkedMessage => c.firstChunk
case _ => true
}
}
/** INTERNAL API */
@InternalApi private[akka] def isLastChunk: Boolean = {
message match {
case c: ChunkedMessage => c.lastChunk
case _ => true
}
}
}
object Settings {

View file

@ -7,6 +7,7 @@ package akka.actor.typed.delivery
import scala.collection.immutable
import akka.actor.typed.ActorRef
import akka.actor.typed.delivery.internal.ChunkedMessage
import akka.actor.typed.delivery.internal.DeliverySerializable
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
@ -67,7 +68,60 @@ object DurableProducerQueue {
highestConfirmedSeqNr: SeqNr,
confirmedSeqNr: Map[ConfirmationQualifier, (SeqNr, TimestampMillis)],
unconfirmed: immutable.IndexedSeq[MessageSent[A]])
extends DeliverySerializable
extends DeliverySerializable {
def addMessageSent(sent: MessageSent[A]): State[A] = {
copy(currentSeqNr = sent.seqNr + 1, unconfirmed = unconfirmed :+ sent)
}
def confirmed(
seqNr: SeqNr,
confirmationQualifier: ConfirmationQualifier,
timestampMillis: TimestampMillis): State[A] = {
val newUnconfirmed = unconfirmed.filterNot { u =>
u.confirmationQualifier == confirmationQualifier && u.seqNr <= seqNr
}
copy(
highestConfirmedSeqNr = math.max(highestConfirmedSeqNr, seqNr),
confirmedSeqNr = confirmedSeqNr.updated(confirmationQualifier, (seqNr, timestampMillis)),
unconfirmed = newUnconfirmed)
}
def cleanup(confirmationQualifiers: Set[String]): State[A] = {
copy(confirmedSeqNr = confirmedSeqNr -- confirmationQualifiers)
}
/**
* If not all chunked messages were stored before crash those partial chunked messages should not be resent.
*/
def cleanupPartialChunkedMessages(): State[A] = {
if (unconfirmed.isEmpty || unconfirmed.forall(u => u.isFirstChunk && u.isLastChunk)) {
this
} else {
val tmp = Vector.newBuilder[MessageSent[A]]
val newUnconfirmed = Vector.newBuilder[MessageSent[A]]
var newCurrentSeqNr = highestConfirmedSeqNr + 1
unconfirmed.foreach { u =>
if (u.isFirstChunk && u.isLastChunk) {
tmp.clear()
newUnconfirmed += u
newCurrentSeqNr = u.seqNr + 1
} else if (u.isFirstChunk && !u.isLastChunk) {
tmp.clear()
tmp += u
} else if (!u.isLastChunk) {
tmp += u
} else if (u.isLastChunk) {
newUnconfirmed ++= tmp.result()
newUnconfirmed += u
newCurrentSeqNr = u.seqNr + 1
tmp.clear()
}
}
copy(currentSeqNr = newCurrentSeqNr, unconfirmed = newUnconfirmed.result())
}
}
}
/**
* INTERNAL API
@ -77,13 +131,92 @@ object DurableProducerQueue {
/**
* The fact (event) that a message has been sent.
*/
final case class MessageSent[A](
seqNr: SeqNr,
message: A,
ack: Boolean,
confirmationQualifier: ConfirmationQualifier,
timestampMillis: TimestampMillis)
extends Event
final class MessageSent[A](
val seqNr: SeqNr,
val message: MessageSent.MessageOrChunk,
val ack: Boolean,
val confirmationQualifier: ConfirmationQualifier,
val timestampMillis: TimestampMillis)
extends Event {
/** INTERNAL API */
@InternalApi private[akka] def isFirstChunk: Boolean = {
message match {
case c: ChunkedMessage => c.firstChunk
case _ => true
}
}
/** INTERNAL API */
@InternalApi private[akka] def isLastChunk: Boolean = {
message match {
case c: ChunkedMessage => c.lastChunk
case _ => true
}
}
def withConfirmationQualifier(qualifier: ConfirmationQualifier): MessageSent[A] =
new MessageSent(seqNr, message, ack, qualifier, timestampMillis)
def withTimestampMillis(timestamp: TimestampMillis): MessageSent[A] =
new MessageSent(seqNr, message, ack, confirmationQualifier, timestamp)
override def equals(obj: Any): Boolean = {
obj match {
case other: MessageSent[_] =>
seqNr == other.seqNr && message == other.message && ack == other.ack && confirmationQualifier == other.confirmationQualifier && timestampMillis == other.timestampMillis
case _ => false
}
}
override def hashCode(): Int = seqNr.hashCode()
override def toString: ConfirmationQualifier =
s"MessageSent($seqNr,$message,$ack,$confirmationQualifier,$timestampMillis)"
}
object MessageSent {
/**
* SequencedMessage.message can be `A` or `ChunkedMessage`.
*/
type MessageOrChunk = Any
def apply[A](
seqNr: SeqNr,
message: A,
ack: Boolean,
confirmationQualifier: ConfirmationQualifier,
timestampMillis: TimestampMillis): MessageSent[A] =
new MessageSent(seqNr, message, ack, confirmationQualifier, timestampMillis)
/**
* INTERNAL API
*/
@InternalApi private[akka] def fromChunked[A](
seqNr: SeqNr,
chunkedMessage: ChunkedMessage,
ack: Boolean,
confirmationQualifier: ConfirmationQualifier,
timestampMillis: TimestampMillis): MessageSent[A] =
new MessageSent(seqNr, chunkedMessage, ack, confirmationQualifier, timestampMillis)
/**
* INTERNAL API
*/
@InternalApi private[akka] def fromMessageOrChunked[A](
seqNr: SeqNr,
message: MessageOrChunk,
ack: Boolean,
confirmationQualifier: ConfirmationQualifier,
timestampMillis: TimestampMillis): MessageSent[A] =
new MessageSent(seqNr, message, ack, confirmationQualifier, timestampMillis)
def unapply(
sent: MessageSent[_]): Option[(SeqNr, MessageOrChunk, Boolean, ConfirmationQualifier, TimestampMillis)] =
Some((sent.seqNr, sent.message, sent.ack, sent.confirmationQualifier, sent.timestampMillis))
}
/**
* INTERNAL API: The fact (event) that a message has been confirmed to be delivered and processed.

View file

@ -21,6 +21,8 @@ import akka.actor.typed.delivery.internal.ProducerControllerImpl
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.util.Helpers.toRootLowerCase
import akka.util.Helpers.Requiring
import akka.util.JavaDurationConverters._
/**
@ -153,10 +155,16 @@ object ProducerController {
* `akka.reliable-delivery.producer-controller`.
*/
def apply(config: Config): Settings = {
val chunkLargeMessagesBytes = toRootLowerCase(config.getString("chunk-large-messages")) match {
case "off" => 0
case _ =>
config.getBytes("chunk-large-messages").requiring(_ <= Int.MaxValue, "Too large chunk-large-messages.").toInt
}
new Settings(
durableQueueRequestTimeout = config.getDuration("durable-queue.request-timeout").asScala,
durableQueueRetryAttempts = config.getInt("durable-queue.retry-attempts"),
durableQueueResendFirstInterval = config.getDuration("durable-queue.resend-first-interval").asScala)
durableQueueResendFirstInterval = config.getDuration("durable-queue.resend-first-interval").asScala,
chunkLargeMessagesBytes)
}
/**
@ -177,7 +185,8 @@ object ProducerController {
final class Settings private (
val durableQueueRequestTimeout: FiniteDuration,
val durableQueueRetryAttempts: Int,
val durableQueueResendFirstInterval: FiniteDuration) {
val durableQueueResendFirstInterval: FiniteDuration,
val chunkLargeMessagesBytes: Int) {
def withDurableQueueRetryAttempts(newDurableQueueRetryAttempts: Int): Settings =
copy(durableQueueRetryAttempts = newDurableQueueRetryAttempts)
@ -212,17 +221,25 @@ object ProducerController {
def getDurableQueueRequestTimeout(): JavaDuration =
durableQueueRequestTimeout.asJava
def withChunkLargeMessagesBytes(newChunkLargeMessagesBytes: Int): Settings =
copy(chunkLargeMessagesBytes = newChunkLargeMessagesBytes)
/**
* Private copy method for internal use only.
*/
private def copy(
durableQueueRequestTimeout: FiniteDuration = durableQueueRequestTimeout,
durableQueueRetryAttempts: Int = durableQueueRetryAttempts,
durableQueueResendFirstInterval: FiniteDuration = durableQueueResendFirstInterval) =
new Settings(durableQueueRequestTimeout, durableQueueRetryAttempts, durableQueueResendFirstInterval)
durableQueueResendFirstInterval: FiniteDuration = durableQueueResendFirstInterval,
chunkLargeMessagesBytes: Int = chunkLargeMessagesBytes) =
new Settings(
durableQueueRequestTimeout,
durableQueueRetryAttempts,
durableQueueResendFirstInterval,
chunkLargeMessagesBytes)
override def toString: String =
s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts, $durableQueueResendFirstInterval)"
s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts, $durableQueueResendFirstInterval, $chunkLargeMessagesBytes)"
}
def apply[A: ClassTag](

View file

@ -178,6 +178,9 @@ object WorkPullingProducerController {
val internalAskTimeout: FiniteDuration,
val producerControllerSettings: ProducerController.Settings) {
if (producerControllerSettings.chunkLargeMessagesBytes > 0)
throw new IllegalArgumentException("Chunked messages not implemented for work-pulling yet.")
def withBufferSize(newBufferSize: Int): Settings =
copy(bufferSize = newBufferSize)

View file

@ -0,0 +1,22 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.delivery.internal
import akka.annotation.InternalApi
import akka.util.ByteString
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class ChunkedMessage(
serialized: ByteString,
firstChunk: Boolean,
lastChunk: Boolean,
serializerId: Int,
manifest: String) {
override def toString: String =
s"ChunkedMessage(${serialized.size},$firstChunk,$lastChunk,$serializerId,$manifest)"
}

View file

@ -21,6 +21,8 @@ import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.StashBuffer
import akka.actor.typed.scaladsl.TimerScheduler
import akka.annotation.InternalApi
import akka.serialization.SerializationExtension
import akka.util.ByteString
import akka.util.ConstantFun.scalaIdentityFunction
/**
@ -86,6 +88,7 @@ import akka.util.ConstantFun.scalaIdentityFunction
receivedSeqNr: SeqNr,
confirmedSeqNr: SeqNr,
requestedSeqNr: SeqNr,
collectedChunks: List[SequencedMessage[A]],
registering: Option[ActorRef[ProducerController.Command[A]]],
stopping: Boolean) {
@ -101,6 +104,11 @@ import akka.util.ConstantFun.scalaIdentityFunction
case s @ Some(reg) => if (seqMsg.producerController == reg) None else s
}
}
def clearCollectedChunks(): State[A] = {
if (collectedChunks == Nil) this
else copy(collectedChunks = Nil)
}
}
def apply[A](
@ -202,6 +210,7 @@ import akka.util.ConstantFun.scalaIdentityFunction
receivedSeqNr = 0,
confirmedSeqNr = 0,
requestedSeqNr = 0,
collectedChunks = Nil,
registering,
stopping)
}
@ -269,6 +278,8 @@ private class ConsumerControllerImpl[A](
private val traceEnabled = context.log.isTraceEnabled
private lazy val serialization = SerializationExtension(context.system)
retryTimer.start()
private def resendLost = !settings.onlyFlowControl
@ -475,8 +486,56 @@ private class ConsumerControllerImpl[A](
}
private def deliver(s: State[A], seqMsg: SequencedMessage[A]): Behavior[InternalCommand] = {
s.consumer ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr)
waitingForConfirmation(s, seqMsg)
def previouslyCollectedChunks = if (seqMsg.isFirstChunk) Nil else s.collectedChunks
if (seqMsg.isLastChunk) {
val assembledSeqMsg =
if (seqMsg.message.isInstanceOf[ChunkedMessage]) assembleChunks(seqMsg :: previouslyCollectedChunks)
else seqMsg
s.consumer ! Delivery(assembledSeqMsg.message.asInstanceOf[A], context.self, seqMsg.producerId, seqMsg.seqNr)
waitingForConfirmation(s.clearCollectedChunks(), assembledSeqMsg)
} else {
// collecting chunks
val newRequestedSeqNr =
if ((s.requestedSeqNr - seqMsg.seqNr) == flowControlWindow / 2) {
val newRequestedSeqNr = s.requestedSeqNr + flowControlWindow / 2
flightRecorder.consumerSentRequest(seqMsg.producerId, newRequestedSeqNr)
context.log.debugN(
"Sending Request when collecting chunks seqNr [{}], confirmedSeqNr [{}], requestUpToSeqNr [{}].",
seqMsg.seqNr,
s.confirmedSeqNr,
newRequestedSeqNr)
s.producerController ! Request(
confirmedSeqNr = s.confirmedSeqNr,
newRequestedSeqNr,
resendLost,
viaTimeout = false)
retryTimer.start() // reset interval since Request was just sent
newRequestedSeqNr
} else {
s.requestedSeqNr
}
stashBuffer.unstash(
active(s.copy(collectedChunks = seqMsg :: previouslyCollectedChunks, requestedSeqNr = newRequestedSeqNr)),
1,
scalaIdentityFunction)
}
}
private def assembleChunks(collectedChunks: List[SequencedMessage[A]]): SequencedMessage[A] = {
val reverseCollectedChunks = collectedChunks.reverse
val builder = ByteString.createBuilder
reverseCollectedChunks.foreach { seqMsg =>
builder ++= seqMsg.message.asInstanceOf[ChunkedMessage].serialized
}
val bytes = builder.result().toArray
val head = collectedChunks.head // this is the last chunk
val headMessage = head.message.asInstanceOf[ChunkedMessage]
// serialization exceptions are thrown, because it will anyway be stuck with same error if retried and
// we can't just ignore the message
val message = serialization.deserialize(bytes, headMessage.serializerId, headMessage.manifest).get
SequencedMessage(head.producerId, head.seqNr, message, reverseCollectedChunks.head.first, head.ack)(
head.producerController)
}
// The message has been delivered to the consumer and it is now waiting for Confirmed from
@ -564,7 +623,7 @@ private class ConsumerControllerImpl[A](
Behaviors.same
case start: Start[A] =>
start.deliverTo ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr)
start.deliverTo ! Delivery(seqMsg.message.asInstanceOf[A], context.self, seqMsg.producerId, seqMsg.seqNr)
receiveStart(s, start, newState => waitingForConfirmation(newState, seqMsg))
case ConsumerTerminated(c) =>

View file

@ -6,6 +6,7 @@ package akka.actor.typed.delivery.internal
import java.util.concurrent.TimeoutException
import scala.collection.immutable
import scala.reflect.ClassTag
import scala.util.Failure
import scala.util.Success
@ -23,6 +24,10 @@ import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.TimerScheduler
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization.Serializers
import akka.util.ByteString
import akka.util.Timeout
/**
@ -100,16 +105,16 @@ object ProducerControllerImpl {
private case class Msg[A](msg: A) extends InternalCommand
private case object ResendFirst extends InternalCommand
case object ResendFirstUnconfirmed extends InternalCommand
private case object SendChunk extends InternalCommand
private case class LoadStateReply[A](state: DurableProducerQueue.State[A]) extends InternalCommand
private case class LoadStateFailed(attempt: Int) extends InternalCommand
private case class StoreMessageSentReply(ack: DurableProducerQueue.StoreMessageSentAck)
private case class StoreMessageSentFailed[A](messageSent: DurableProducerQueue.MessageSent[A], attempt: Int)
extends InternalCommand
private case object DurableQueueTerminated extends InternalCommand
private case class StoreMessageSentCompleted[A](messageSent: DurableProducerQueue.MessageSent[A])
extends InternalCommand
private case object DurableQueueTerminated extends InternalCommand
private final case class State[A](
requested: Boolean,
@ -119,6 +124,8 @@ object ProducerControllerImpl {
replyAfterStore: Map[SeqNr, ActorRef[SeqNr]],
supportResend: Boolean,
unconfirmed: Vector[ConsumerController.SequencedMessage[A]],
remainingChunks: immutable.Seq[SequencedMessage[A]],
storeMessageSentInProgress: SeqNr,
firstSeqNr: SeqNr,
producer: ActorRef[ProducerController.RequestNext[A]],
send: ConsumerController.SequencedMessage[A] => Unit)
@ -236,6 +243,8 @@ object ProducerControllerImpl {
replyAfterStore = Map.empty,
supportResend = true,
unconfirmed = unconfirmed,
remainingChunks = Nil,
storeMessageSentInProgress = 0,
firstSeqNr = loadedState.highestConfirmedSeqNr + 1,
producer,
send)
@ -329,6 +338,30 @@ object ProducerControllerImpl {
throw new IllegalArgumentException(s"Consumer [$ref] should be local.")
}
def createChunks[A](m: A, chunkSize: Int, serialization: Serialization): immutable.Seq[ChunkedMessage] = {
val mAnyRef = m.asInstanceOf[AnyRef]
// serialization exceptions are thrown
val bytes = serialization.serialize(mAnyRef).get
val ser = serialization.findSerializerFor(mAnyRef)
val manifest = Serializers.manifestFor(ser, mAnyRef)
val serializerId = ser.identifier
if (bytes.length <= chunkSize) {
ChunkedMessage(ByteString(bytes), firstChunk = true, lastChunk = true, serializerId, manifest) :: Nil
} else {
val builder = Vector.newBuilder[ChunkedMessage]
val chunksIter = ByteString(bytes).grouped(chunkSize)
var first = true
while (chunksIter.hasNext) {
val chunk = chunksIter.next()
val firstChunk = first
first = false
val lastChunk = !chunksIter.hasNext
builder += ChunkedMessage(chunk, firstChunk, lastChunk, serializerId, manifest)
}
builder.result()
}
}
}
private class ProducerControllerImpl[A: ClassTag](
@ -356,13 +389,20 @@ private class ProducerControllerImpl[A: ClassTag](
// for the durableQueue StoreMessageSent ask
private implicit val askTimeout: Timeout = settings.durableQueueRequestTimeout
private lazy val serialization = SerializationExtension(context.system)
private def active(s: State[A]): Behavior[InternalCommand] = {
def onMsg(m: A, newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]], ack: Boolean): Behavior[InternalCommand] = {
def onMsg(
seqMsg: SequencedMessage[A],
newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]],
newRemainingChunks: immutable.Seq[SequencedMessage[A]]): Behavior[InternalCommand] = {
checkOnMsgRequestedState()
if (seqMsg.isLastChunk != newRemainingChunks.isEmpty)
throw new IllegalStateException(
s"seqMsg [${seqMsg.seqNr}] was lastChunk but remaining [${newRemainingChunks.size}] chunks.")
if (traceEnabled)
context.log.trace("Sending [{}] with seqNr [{}].", m.getClass.getName, s.currentSeqNr)
val seqMsg = SequencedMessage(producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self)
context.log.trace("Sending [{}] with seqNr [{}].", seqMsg.message.getClass.getName, s.currentSeqNr)
val newUnconfirmed =
if (s.supportResend) s.unconfirmed :+ seqMsg
else Vector.empty // no resending, no need to keep unconfirmed
@ -375,18 +415,24 @@ private class ProducerControllerImpl[A: ClassTag](
val newRequested =
if (s.currentSeqNr == s.requestedSeqNr) {
flightRecorder.producerWaitingForRequest(producerId, s.currentSeqNr)
false
} else {
newRemainingChunks.nonEmpty // keep it true until lastChunk
} else if (seqMsg.isLastChunk) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr)
s.producer ! RequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr, msgAdapter, context.self)
true
} else {
context.self ! SendChunk
true // keep it true until lastChunk
}
active(
s.copy(
requested = newRequested,
currentSeqNr = s.currentSeqNr + 1,
replyAfterStore = newReplyAfterStore,
unconfirmed = newUnconfirmed))
unconfirmed = newUnconfirmed,
remainingChunks = newRemainingChunks,
storeMessageSentInProgress = 0))
}
def checkOnMsgRequestedState(): Unit = {
@ -397,6 +443,12 @@ private class ProducerControllerImpl[A: ClassTag](
}
}
def checkReceiveMessageRemainingChunksState(): Unit = {
if (s.remainingChunks.nonEmpty)
throw new IllegalStateException(
s"Received unexpected message before sending remaining [${s.remainingChunks.size}] chunks.")
}
def receiveRequest(
newConfirmedSeqNr: SeqNr,
newRequestedSeqNr: SeqNr,
@ -434,13 +486,23 @@ private class ProducerControllerImpl[A: ClassTag](
stateAfterAck.currentSeqNr)
if (newRequestedSeqNr2 > s.requestedSeqNr) {
if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr)
s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self)
}
val newRequested =
if (s.storeMessageSentInProgress != 0) {
s.requested
} else if (s.remainingChunks.nonEmpty) {
context.self ! SendChunk
s.requested
} else if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr)
s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self)
true
} else {
s.requested
}
active(
stateAfterAck.copy(
requested = true,
requested = newRequested,
requestedSeqNr = newRequestedSeqNr2,
supportResend = supportResend,
unconfirmed = newUnconfirmed))
@ -486,29 +548,97 @@ private class ProducerControllerImpl[A: ClassTag](
s.copy(confirmedSeqNr = newMaxConfirmedSeqNr, replyAfterStore = newReplyAfterStore, unconfirmed = newUnconfirmed)
}
def receiveStoreMessageSentCompleted(seqNr: SeqNr, m: A, ack: Boolean) = {
if (seqNr != s.currentSeqNr)
throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]")
def receiveStoreMessageSentCompleted(seqNr: SeqNr): Behavior[InternalCommand] = {
if (seqNr == s.storeMessageSentInProgress) {
if (seqNr != s.currentSeqNr)
throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]")
val seqMsg = s.remainingChunks.head
if (seqNr != seqMsg.seqNr)
throw new IllegalStateException(s"seqNr [${seqMsg.seqNr}] not matching stored seqNr [$seqNr]")
s.replyAfterStore.get(seqNr).foreach { replyTo =>
if (traceEnabled)
context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr)
replyTo ! seqNr
s.replyAfterStore.get(seqNr).foreach { replyTo =>
if (traceEnabled)
context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr)
replyTo ! seqNr
}
val newReplyAfterStore = s.replyAfterStore - seqNr
onMsg(seqMsg, newReplyAfterStore, s.remainingChunks.tail)
} else {
context.log.debug(
"Received StoreMessageSentCompleted for seqNr [{}] but waiting for [{}]. " +
"Probably due to retry.",
seqNr,
s.storeMessageSentInProgress)
Behaviors.same
}
val newReplyAfterStore = s.replyAfterStore - seqNr
}
onMsg(m, newReplyAfterStore, ack)
def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
if (f.messageSent.seqNr == s.storeMessageSentInProgress) {
if (f.attempt >= settings.durableQueueRetryAttempts) {
val errorMessage =
s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
context.log.error(errorMessage)
throw new TimeoutException(errorMessage)
} else {
context.log.warnN(
"StoreMessageSent seqNr [{}] failed, attempt [{}] of [{}], retrying.",
f.messageSent.seqNr,
f.attempt,
settings.durableQueueRetryAttempts)
// retry
if (f.messageSent.isFirstChunk) {
storeMessageSent(f.messageSent, attempt = f.attempt + 1)
Behaviors.same
} else {
// store all chunks again, because partially stored chunks are discarded by the DurableQueue
// when it's restarted
val unconfirmedReverse = s.unconfirmed.reverse
val xs = unconfirmedReverse.takeWhile(!_.isFirstChunk)
if (unconfirmedReverse.size == xs.size)
throw new IllegalStateException(s"First chunk not found in unconfirmed: ${s.unconfirmed}")
val firstChunk = unconfirmedReverse.drop(xs.size).head
val newRemainingChunks = (firstChunk +: xs.reverse) ++ s.remainingChunks
val newUnconfirmed = s.unconfirmed.dropRight(xs.size + 1)
context.log.debug(
"Store all [{}] chunks again, starting at seqNr [{}].",
newRemainingChunks.size,
firstChunk.seqNr)
if (!newRemainingChunks.head.isFirstChunk || !newRemainingChunks.last.isLastChunk)
throw new IllegalStateException(s"Wrong remainingChunks: $newRemainingChunks")
storeMessageSent(
MessageSent.fromMessageOrChunked(
firstChunk.seqNr,
firstChunk.message,
firstChunk.ack,
NoQualifier,
System.currentTimeMillis()),
attempt = f.attempt + 1)
active(
s.copy(
storeMessageSentInProgress = firstChunk.seqNr,
remainingChunks = newRemainingChunks,
unconfirmed = newUnconfirmed,
currentSeqNr = firstChunk.seqNr))
}
}
} else {
Behaviors.same
}
}
def receiveResend(fromSeqNr: SeqNr): Behavior[InternalCommand] = {
flightRecorder.producerReceivedResend(producerId, fromSeqNr)
val newUnconfirmed =
if (fromSeqNr == 0 && s.unconfirmed.nonEmpty)
s.unconfirmed.head.asFirst +: s.unconfirmed.tail
else
s.unconfirmed.dropWhile(_.seqNr < fromSeqNr)
resendUnconfirmed(newUnconfirmed)
active(s.copy(unconfirmed = newUnconfirmed))
resendUnconfirmed(s.unconfirmed.dropWhile(_.seqNr < fromSeqNr))
if (fromSeqNr == 0 && s.unconfirmed.nonEmpty) {
val newUnconfirmed = s.unconfirmed.head.asFirst +: s.unconfirmed.tail
active(s.copy(unconfirmed = newUnconfirmed))
} else
Behaviors.same
}
def resendUnconfirmed(newUnconfirmed: Vector[SequencedMessage[A]]): Unit = {
@ -545,7 +675,7 @@ private class ProducerControllerImpl[A: ClassTag](
def receiveStart(start: Start[A]): Behavior[InternalCommand] = {
ProducerControllerImpl.enforceLocalProducer(start.producer)
context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr)
if (s.requested) {
if (s.requested && s.remainingChunks.isEmpty) {
flightRecorder.producerRequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr)
start.producer ! RequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr, msgAdapter, context.self)
}
@ -570,32 +700,101 @@ private class ProducerControllerImpl[A: ClassTag](
active(s.copy(firstSeqNr = newFirstSeqNr, send = newSend))
}
def receiveSendChunk(): Behavior[InternalCommand] = {
if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <= s.requestedSeqNr && s.storeMessageSentInProgress == 0) {
if (traceEnabled)
context.log.trace("Send next chunk seqNr [{}].", s.remainingChunks.head.seqNr)
if (durableQueue.isEmpty) {
onMsg(s.remainingChunks.head, s.replyAfterStore, s.remainingChunks.tail)
} else {
val seqMsg = s.remainingChunks.head
storeMessageSent(
MessageSent
.fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
attempt = 1)
active(s.copy(storeMessageSentInProgress = seqMsg.seqNr)) // still same s.remainingChunks
}
} else {
Behaviors.same
}
}
def chunk(m: A, ack: Boolean): immutable.Seq[SequencedMessage[A]] = {
val chunkSize = settings.chunkLargeMessagesBytes
if (chunkSize > 0) {
val chunkedMessages = createChunks(m, chunkSize, serialization)
if (traceEnabled) {
if (chunkedMessages.size == 1)
context.log.trace(
"No chunking of seqNr [{}], size [{} bytes].",
s.currentSeqNr,
chunkedMessages.head.serialized.size)
else
context.log.traceN(
"Chunked seqNr [{}] into [{}] pieces, total size [{} bytes].",
s.currentSeqNr,
chunkedMessages.size,
chunkedMessages.map(_.serialized.size).sum)
}
var i = 0
chunkedMessages.map { chunkedMessage =>
val seqNr = s.currentSeqNr + i
i += 1
SequencedMessage.fromChunked[A](
producerId,
seqNr,
chunkedMessage,
seqNr == s.firstSeqNr,
ack && chunkedMessage.lastChunk, // only the last need ack = true
context.self)
}
} else {
val seqMsg =
SequencedMessage[A](producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self)
seqMsg :: Nil
}
}
Behaviors.receiveMessage {
case MessageWithConfirmation(m: A, replyTo) =>
checkReceiveMessageRemainingChunksState()
flightRecorder.producerReceived(producerId, s.currentSeqNr)
val newReplyAfterStore = s.replyAfterStore.updated(s.currentSeqNr, replyTo)
val chunks = chunk(m, ack = true)
val newReplyAfterStore = s.replyAfterStore.updated(chunks.last.seqNr, replyTo)
if (durableQueue.isEmpty) {
onMsg(m, newReplyAfterStore, ack = true)
onMsg(chunks.head, newReplyAfterStore, chunks.tail)
} else {
val seqMsg = chunks.head
storeMessageSent(
MessageSent(s.currentSeqNr, m, ack = true, NoQualifier, System.currentTimeMillis()),
MessageSent
.fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
attempt = 1)
active(s.copy(replyAfterStore = newReplyAfterStore))
active(
s.copy(
replyAfterStore = newReplyAfterStore,
remainingChunks = chunks,
storeMessageSentInProgress = seqMsg.seqNr))
}
case Msg(m: A) =>
checkReceiveMessageRemainingChunksState()
flightRecorder.producerReceived(producerId, s.currentSeqNr)
val chunks = chunk(m, ack = false)
if (durableQueue.isEmpty) {
onMsg(m, s.replyAfterStore, ack = false)
onMsg(chunks.head, s.replyAfterStore, chunks.tail)
} else {
val seqMsg = chunks.head
storeMessageSent(
MessageSent(s.currentSeqNr, m, ack = false, NoQualifier, System.currentTimeMillis()),
MessageSent
.fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()),
attempt = 1)
Behaviors.same
active(s.copy(remainingChunks = chunks, storeMessageSentInProgress = seqMsg.seqNr))
}
case StoreMessageSentCompleted(MessageSent(seqNr, m: A, ack, NoQualifier, _)) =>
receiveStoreMessageSentCompleted(seqNr, m, ack)
case StoreMessageSentCompleted(sent: MessageSent[_]) =>
receiveStoreMessageSentCompleted(sent.seqNr)
case f: StoreMessageSentFailed[A] =>
receiveStoreMessageSentFailed(f)
@ -606,6 +805,9 @@ private class ProducerControllerImpl[A: ClassTag](
case Ack(newConfirmedSeqNr) =>
receiveAck(newConfirmedSeqNr)
case SendChunk =>
receiveSendChunk()
case Resend(fromSeqNr) =>
receiveResend(fromSeqNr)
@ -626,24 +828,6 @@ private class ProducerControllerImpl[A: ClassTag](
}
}
private def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
if (f.attempt >= settings.durableQueueRetryAttempts) {
val errorMessage =
s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
context.log.error(errorMessage)
throw new TimeoutException(errorMessage)
} else {
context.log.warnN(
"StoreMessageSent seqNr [{}] failed, attempt [{}] of [{}], retrying.",
f.messageSent.seqNr,
f.attempt,
settings.durableQueueRetryAttempts)
// retry
storeMessageSent(f.messageSent, attempt = f.attempt + 1)
Behaviors.same
}
}
private def storeMessageSent(messageSent: MessageSent[A], attempt: Int): Unit = {
context.ask[StoreMessageSent[A], StoreMessageSentAck](
durableQueue.get,

View file

@ -278,8 +278,9 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
import WorkPullingProducerController.WorkerStats
import WorkPullingProducerControllerImpl._
private val producerControllerSettings = settings.producerControllerSettings
private val traceEnabled = context.log.isTraceEnabled
private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout
private val durableQueueAskTimeout: Timeout = producerControllerSettings.durableQueueRequestTimeout
private val workerAskTimeout: Timeout = settings.internalAskTimeout
private val workerRequestNextAdapter: ActorRef[ProducerController.RequestNext[A]] =
@ -556,7 +557,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
val outKey = s"$producerId-$uuid"
context.log.debug2("Registered worker [{}], with producerId [{}].", c, outKey)
val p = context.spawn(
ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings),
ProducerController[A](outKey, durableQueueBehavior = None, producerControllerSettings),
uuid,
DispatcherSelector.sameAsParent())
p ! ProducerController.Start(workerRequestNextAdapter)
@ -657,7 +658,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
}
private def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
if (f.attempt >= settings.producerControllerSettings.durableQueueRetryAttempts) {
if (f.attempt >= producerControllerSettings.durableQueueRetryAttempts) {
val errorMessage =
s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
context.log.error(errorMessage)

View file

@ -63,6 +63,10 @@ akka.reliable-delivery {
# unconfirmed messages the ShardingConsumerController has to "wake up"
# the consumer again by resending the first unconfirmed message.
resend-first-unconfirmed-idle-timeout = 10s
# Chunked messages not implemented for sharding yet. Override to not
# propagate property from akka.reliable-delivery.producer-controller.
chunk-large-messages = off
}
consumer-controller = ${akka.reliable-delivery.consumer-controller}

View file

@ -203,6 +203,9 @@ object ShardingProducerController {
val resendFirsUnconfirmedIdleTimeout: FiniteDuration,
val producerControllerSettings: ProducerController.Settings) {
if (producerControllerSettings.chunkLargeMessagesBytes > 0)
throw new IllegalArgumentException("Chunked messages not implemented for sharding yet.")
def withBufferSize(newBufferSize: Int): Settings =
copy(bufferSize = newBufferSize)

View file

@ -279,7 +279,8 @@ private class ShardingProducerControllerImpl[A: ClassTag](
import ShardingProducerController.Start
import ShardingProducerControllerImpl._
private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout
private val producerControllerSettings = settings.producerControllerSettings
private val durableQueueAskTimeout: Timeout = producerControllerSettings.durableQueueRequestTimeout
private val entityAskTimeout: Timeout = settings.internalAskTimeout
private val traceEnabled = context.log.isTraceEnabled
@ -333,7 +334,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](
region ! ShardingEnvelope(entityId, seqMsg)
}
val p = context.spawn(
ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings, send),
ProducerController[A](outKey, durableQueueBehavior = None, producerControllerSettings, send),
entityId,
DispatcherSelector.sameAsParent())
p ! ProducerController.Start(requestNextAdapter)
@ -389,7 +390,7 @@ private class ShardingProducerControllerImpl[A: ClassTag](
}
def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = {
if (f.attempt >= settings.producerControllerSettings.durableQueueRetryAttempts) {
if (f.attempt >= producerControllerSettings.durableQueueRetryAttempts) {
val errorMessage =
s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up."
context.log.error(errorMessage)

View file

@ -110,7 +110,7 @@ class ReliableDeliveryShardingSpec
"illustrate sharding usage" in {
nextId()
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val typeKey = EntityTypeKey[SequencedMessage[TestConsumer.Job]](s"TestConsumer-$idCount")
val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] =
ClusterSharding(system).init(Entity(typeKey)(_ =>
@ -130,7 +130,7 @@ class ReliableDeliveryShardingSpec
"illustrate sharding usage with several producers" in {
nextId()
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val typeKey = EntityTypeKey[SequencedMessage[TestConsumer.Job]](s"TestConsumer-$idCount")
val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] =
ClusterSharding(system).init(Entity(typeKey)(_ =>
@ -175,7 +175,7 @@ class ReliableDeliveryShardingSpec
"reply to MessageWithConfirmation" in {
nextId()
val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]()
val consumerEndProbe = createTestProbe[TestConsumer.Collected]()
val typeKey = EntityTypeKey[SequencedMessage[TestConsumer.Job]](s"TestConsumer-$idCount")
val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] =
ClusterSharding(system).init(Entity(typeKey)(_ =>

View file

@ -103,6 +103,28 @@ public final class ReliableDelivery {
* <code>required .Payload message = 6;</code>
*/
akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder();
/**
* <code>optional bool firstChunk = 7;</code>
* @return Whether the firstChunk field is set.
*/
boolean hasFirstChunk();
/**
* <code>optional bool firstChunk = 7;</code>
* @return The firstChunk.
*/
boolean getFirstChunk();
/**
* <code>optional bool lastChunk = 8;</code>
* @return Whether the lastChunk field is set.
*/
boolean hasLastChunk();
/**
* <code>optional bool lastChunk = 8;</code>
* @return The lastChunk.
*/
boolean getLastChunk();
}
/**
* <pre>
@ -196,6 +218,16 @@ public final class ReliableDelivery {
bitField0_ |= 0x00000020;
break;
}
case 56: {
bitField0_ |= 0x00000040;
firstChunk_ = input.readBool();
break;
}
case 64: {
bitField0_ |= 0x00000080;
lastChunk_ = input.readBool();
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
@ -393,6 +425,40 @@ public final class ReliableDelivery {
return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
}
public static final int FIRSTCHUNK_FIELD_NUMBER = 7;
private boolean firstChunk_;
/**
* <code>optional bool firstChunk = 7;</code>
* @return Whether the firstChunk field is set.
*/
public boolean hasFirstChunk() {
return ((bitField0_ & 0x00000040) != 0);
}
/**
* <code>optional bool firstChunk = 7;</code>
* @return The firstChunk.
*/
public boolean getFirstChunk() {
return firstChunk_;
}
public static final int LASTCHUNK_FIELD_NUMBER = 8;
private boolean lastChunk_;
/**
* <code>optional bool lastChunk = 8;</code>
* @return Whether the lastChunk field is set.
*/
public boolean hasLastChunk() {
return ((bitField0_ & 0x00000080) != 0);
}
/**
* <code>optional bool lastChunk = 8;</code>
* @return The lastChunk.
*/
public boolean getLastChunk() {
return lastChunk_;
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@ -453,6 +519,12 @@ public final class ReliableDelivery {
if (((bitField0_ & 0x00000020) != 0)) {
output.writeMessage(6, getMessage());
}
if (((bitField0_ & 0x00000040) != 0)) {
output.writeBool(7, firstChunk_);
}
if (((bitField0_ & 0x00000080) != 0)) {
output.writeBool(8, lastChunk_);
}
unknownFields.writeTo(output);
}
@ -484,6 +556,14 @@ public final class ReliableDelivery {
size += akka.protobufv3.internal.CodedOutputStream
.computeMessageSize(6, getMessage());
}
if (((bitField0_ & 0x00000040) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeBoolSize(7, firstChunk_);
}
if (((bitField0_ & 0x00000080) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeBoolSize(8, lastChunk_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@ -529,6 +609,16 @@ public final class ReliableDelivery {
if (!getMessage()
.equals(other.getMessage())) return false;
}
if (hasFirstChunk() != other.hasFirstChunk()) return false;
if (hasFirstChunk()) {
if (getFirstChunk()
!= other.getFirstChunk()) return false;
}
if (hasLastChunk() != other.hasLastChunk()) return false;
if (hasLastChunk()) {
if (getLastChunk()
!= other.getLastChunk()) return false;
}
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@ -567,6 +657,16 @@ public final class ReliableDelivery {
hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
hash = (53 * hash) + getMessage().hashCode();
}
if (hasFirstChunk()) {
hash = (37 * hash) + FIRSTCHUNK_FIELD_NUMBER;
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
getFirstChunk());
}
if (hasLastChunk()) {
hash = (37 * hash) + LASTCHUNK_FIELD_NUMBER;
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
getLastChunk());
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@ -721,6 +821,10 @@ public final class ReliableDelivery {
messageBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000020);
firstChunk_ = false;
bitField0_ = (bitField0_ & ~0x00000040);
lastChunk_ = false;
bitField0_ = (bitField0_ & ~0x00000080);
return this;
}
@ -777,6 +881,14 @@ public final class ReliableDelivery {
}
to_bitField0_ |= 0x00000020;
}
if (((from_bitField0_ & 0x00000040) != 0)) {
result.firstChunk_ = firstChunk_;
to_bitField0_ |= 0x00000040;
}
if (((from_bitField0_ & 0x00000080) != 0)) {
result.lastChunk_ = lastChunk_;
to_bitField0_ |= 0x00000080;
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -848,6 +960,12 @@ public final class ReliableDelivery {
if (other.hasMessage()) {
mergeMessage(other.getMessage());
}
if (other.hasFirstChunk()) {
setFirstChunk(other.getFirstChunk());
}
if (other.hasLastChunk()) {
setLastChunk(other.getLastChunk());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@ -1297,6 +1415,80 @@ public final class ReliableDelivery {
}
return messageBuilder_;
}
private boolean firstChunk_ ;
/**
* <code>optional bool firstChunk = 7;</code>
* @return Whether the firstChunk field is set.
*/
public boolean hasFirstChunk() {
return ((bitField0_ & 0x00000040) != 0);
}
/**
* <code>optional bool firstChunk = 7;</code>
* @return The firstChunk.
*/
public boolean getFirstChunk() {
return firstChunk_;
}
/**
* <code>optional bool firstChunk = 7;</code>
* @param value The firstChunk to set.
* @return This builder for chaining.
*/
public Builder setFirstChunk(boolean value) {
bitField0_ |= 0x00000040;
firstChunk_ = value;
onChanged();
return this;
}
/**
* <code>optional bool firstChunk = 7;</code>
* @return This builder for chaining.
*/
public Builder clearFirstChunk() {
bitField0_ = (bitField0_ & ~0x00000040);
firstChunk_ = false;
onChanged();
return this;
}
private boolean lastChunk_ ;
/**
* <code>optional bool lastChunk = 8;</code>
* @return Whether the lastChunk field is set.
*/
public boolean hasLastChunk() {
return ((bitField0_ & 0x00000080) != 0);
}
/**
* <code>optional bool lastChunk = 8;</code>
* @return The lastChunk.
*/
public boolean getLastChunk() {
return lastChunk_;
}
/**
* <code>optional bool lastChunk = 8;</code>
* @param value The lastChunk to set.
* @return This builder for chaining.
*/
public Builder setLastChunk(boolean value) {
bitField0_ |= 0x00000080;
lastChunk_ = value;
onChanged();
return this;
}
/**
* <code>optional bool lastChunk = 8;</code>
* @return This builder for chaining.
*/
public Builder clearLastChunk() {
bitField0_ = (bitField0_ & ~0x00000080);
lastChunk_ = false;
onChanged();
return this;
}
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@ -6168,6 +6360,28 @@ public final class ReliableDelivery {
* <code>required .Payload message = 5;</code>
*/
akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder();
/**
* <code>optional bool firstChunk = 6;</code>
* @return Whether the firstChunk field is set.
*/
boolean hasFirstChunk();
/**
* <code>optional bool firstChunk = 6;</code>
* @return The firstChunk.
*/
boolean getFirstChunk();
/**
* <code>optional bool lastChunk = 7;</code>
* @return Whether the lastChunk field is set.
*/
boolean hasLastChunk();
/**
* <code>optional bool lastChunk = 7;</code>
* @return The lastChunk.
*/
boolean getLastChunk();
}
/**
* <pre>
@ -6254,6 +6468,16 @@ public final class ReliableDelivery {
bitField0_ |= 0x00000010;
break;
}
case 48: {
bitField0_ |= 0x00000020;
firstChunk_ = input.readBool();
break;
}
case 56: {
bitField0_ |= 0x00000040;
lastChunk_ = input.readBool();
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
@ -6406,6 +6630,40 @@ public final class ReliableDelivery {
return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
}
public static final int FIRSTCHUNK_FIELD_NUMBER = 6;
private boolean firstChunk_;
/**
* <code>optional bool firstChunk = 6;</code>
* @return Whether the firstChunk field is set.
*/
public boolean hasFirstChunk() {
return ((bitField0_ & 0x00000020) != 0);
}
/**
* <code>optional bool firstChunk = 6;</code>
* @return The firstChunk.
*/
public boolean getFirstChunk() {
return firstChunk_;
}
public static final int LASTCHUNK_FIELD_NUMBER = 7;
private boolean lastChunk_;
/**
* <code>optional bool lastChunk = 7;</code>
* @return Whether the lastChunk field is set.
*/
public boolean hasLastChunk() {
return ((bitField0_ & 0x00000040) != 0);
}
/**
* <code>optional bool lastChunk = 7;</code>
* @return The lastChunk.
*/
public boolean getLastChunk() {
return lastChunk_;
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@ -6459,6 +6717,12 @@ public final class ReliableDelivery {
if (((bitField0_ & 0x00000010) != 0)) {
output.writeMessage(5, getMessage());
}
if (((bitField0_ & 0x00000020) != 0)) {
output.writeBool(6, firstChunk_);
}
if (((bitField0_ & 0x00000040) != 0)) {
output.writeBool(7, lastChunk_);
}
unknownFields.writeTo(output);
}
@ -6487,6 +6751,14 @@ public final class ReliableDelivery {
size += akka.protobufv3.internal.CodedOutputStream
.computeMessageSize(5, getMessage());
}
if (((bitField0_ & 0x00000020) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeBoolSize(6, firstChunk_);
}
if (((bitField0_ & 0x00000040) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeBoolSize(7, lastChunk_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@ -6527,6 +6799,16 @@ public final class ReliableDelivery {
if (!getMessage()
.equals(other.getMessage())) return false;
}
if (hasFirstChunk() != other.hasFirstChunk()) return false;
if (hasFirstChunk()) {
if (getFirstChunk()
!= other.getFirstChunk()) return false;
}
if (hasLastChunk() != other.hasLastChunk()) return false;
if (hasLastChunk()) {
if (getLastChunk()
!= other.getLastChunk()) return false;
}
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@ -6561,6 +6843,16 @@ public final class ReliableDelivery {
hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
hash = (53 * hash) + getMessage().hashCode();
}
if (hasFirstChunk()) {
hash = (37 * hash) + FIRSTCHUNK_FIELD_NUMBER;
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
getFirstChunk());
}
if (hasLastChunk()) {
hash = (37 * hash) + LASTCHUNK_FIELD_NUMBER;
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
getLastChunk());
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@ -6713,6 +7005,10 @@ public final class ReliableDelivery {
messageBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000010);
firstChunk_ = false;
bitField0_ = (bitField0_ & ~0x00000020);
lastChunk_ = false;
bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@ -6765,6 +7061,14 @@ public final class ReliableDelivery {
}
to_bitField0_ |= 0x00000010;
}
if (((from_bitField0_ & 0x00000020) != 0)) {
result.firstChunk_ = firstChunk_;
to_bitField0_ |= 0x00000020;
}
if (((from_bitField0_ & 0x00000040) != 0)) {
result.lastChunk_ = lastChunk_;
to_bitField0_ |= 0x00000040;
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@ -6831,6 +7135,12 @@ public final class ReliableDelivery {
if (other.hasMessage()) {
mergeMessage(other.getMessage());
}
if (other.hasFirstChunk()) {
setFirstChunk(other.getFirstChunk());
}
if (other.hasLastChunk()) {
setLastChunk(other.getLastChunk());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@ -7193,6 +7503,80 @@ public final class ReliableDelivery {
}
return messageBuilder_;
}
private boolean firstChunk_ ;
/**
* <code>optional bool firstChunk = 6;</code>
* @return Whether the firstChunk field is set.
*/
public boolean hasFirstChunk() {
return ((bitField0_ & 0x00000020) != 0);
}
/**
* <code>optional bool firstChunk = 6;</code>
* @return The firstChunk.
*/
public boolean getFirstChunk() {
return firstChunk_;
}
/**
* <code>optional bool firstChunk = 6;</code>
* @param value The firstChunk to set.
* @return This builder for chaining.
*/
public Builder setFirstChunk(boolean value) {
bitField0_ |= 0x00000020;
firstChunk_ = value;
onChanged();
return this;
}
/**
* <code>optional bool firstChunk = 6;</code>
* @return This builder for chaining.
*/
public Builder clearFirstChunk() {
bitField0_ = (bitField0_ & ~0x00000020);
firstChunk_ = false;
onChanged();
return this;
}
private boolean lastChunk_ ;
/**
* <code>optional bool lastChunk = 7;</code>
* @return Whether the lastChunk field is set.
*/
public boolean hasLastChunk() {
return ((bitField0_ & 0x00000040) != 0);
}
/**
* <code>optional bool lastChunk = 7;</code>
* @return The lastChunk.
*/
public boolean getLastChunk() {
return lastChunk_;
}
/**
* <code>optional bool lastChunk = 7;</code>
* @param value The lastChunk to set.
* @return This builder for chaining.
*/
public Builder setLastChunk(boolean value) {
bitField0_ |= 0x00000040;
lastChunk_ = value;
onChanged();
return this;
}
/**
* <code>optional bool lastChunk = 7;</code>
* @return This builder for chaining.
*/
public Builder clearLastChunk() {
bitField0_ = (bitField0_ & ~0x00000040);
lastChunk_ = false;
onChanged();
return this;
}
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@ -7946,27 +8330,29 @@ public final class ReliableDelivery {
static {
java.lang.String[] descriptorData = {
"\n\026ReliableDelivery.proto\022\033akka.cluster.t" +
"yped.delivery\032\026ContainerFormats.proto\"\213\001" +
"yped.delivery\032\026ContainerFormats.proto\"\262\001" +
"\n\020SequencedMessage\022\022\n\nproducerId\030\001 \002(\t\022\r" +
"\n\005seqNr\030\002 \002(\003\022\r\n\005first\030\003 \002(\010\022\013\n\003ack\030\004 \002(" +
"\010\022\035\n\025producerControllerRef\030\005 \002(\t\022\031\n\007mess" +
"age\030\006 \002(\0132\010.Payload\"1\n\020RegisterConsumer\022" +
"\035\n\025consumerControllerRef\030\001 \002(\t\"f\n\007Reques" +
"t\022\026\n\016confirmedSeqNr\030\001 \002(\003\022\030\n\020requestUpTo" +
"SeqNr\030\002 \002(\003\022\025\n\rsupportResend\030\003 \002(\010\022\022\n\nvi" +
"aTimeout\030\004 \002(\010\"\033\n\006Resend\022\021\n\tfromSeqNr\030\001 " +
"\002(\003\"\035\n\003Ack\022\026\n\016confirmedSeqNr\030\001 \002(\003\"\266\001\n\005S" +
"tate\022\024\n\014currentSeqNr\030\001 \002(\003\022\035\n\025highestCon" +
"firmedSeqNr\030\002 \002(\003\0229\n\tconfirmed\030\003 \003(\0132&.a" +
"kka.cluster.typed.delivery.Confirmed\022=\n\013" +
"unconfirmed\030\004 \003(\0132(.akka.cluster.typed.d" +
"elivery.MessageSent\"@\n\tConfirmed\022\r\n\005seqN" +
"r\030\001 \002(\003\022\021\n\tqualifier\030\002 \002(\t\022\021\n\ttimestamp\030" +
"\003 \002(\003\"j\n\013MessageSent\022\r\n\005seqNr\030\001 \002(\003\022\021\n\tq" +
"age\030\006 \002(\0132\010.Payload\022\022\n\nfirstChunk\030\007 \001(\010\022" +
"\021\n\tlastChunk\030\010 \001(\010\"1\n\020RegisterConsumer\022\035" +
"\n\025consumerControllerRef\030\001 \002(\t\"f\n\007Request" +
"\022\026\n\016confirmedSeqNr\030\001 \002(\003\022\030\n\020requestUpToS" +
"eqNr\030\002 \002(\003\022\025\n\rsupportResend\030\003 \002(\010\022\022\n\nvia" +
"Timeout\030\004 \002(\010\"\033\n\006Resend\022\021\n\tfromSeqNr\030\001 \002" +
"(\003\"\035\n\003Ack\022\026\n\016confirmedSeqNr\030\001 \002(\003\"\266\001\n\005St" +
"ate\022\024\n\014currentSeqNr\030\001 \002(\003\022\035\n\025highestConf" +
"irmedSeqNr\030\002 \002(\003\0229\n\tconfirmed\030\003 \003(\0132&.ak" +
"ka.cluster.typed.delivery.Confirmed\022=\n\013u" +
"nconfirmed\030\004 \003(\0132(.akka.cluster.typed.de" +
"livery.MessageSent\"@\n\tConfirmed\022\r\n\005seqNr" +
"\030\001 \002(\003\022\021\n\tqualifier\030\002 \002(\t\022\021\n\ttimestamp\030\003" +
" \002(\003\"\221\001\n\013MessageSent\022\r\n\005seqNr\030\001 \002(\003\022\021\n\tq" +
"ualifier\030\002 \002(\t\022\013\n\003ack\030\003 \002(\010\022\021\n\ttimestamp" +
"\030\004 \002(\003\022\031\n\007message\030\005 \002(\0132\010.Payload\"\035\n\007Cle" +
"anup\022\022\n\nqualifiers\030\001 \003(\tB(\n$akka.cluster" +
".typed.internal.protobufH\001"
"\030\004 \002(\003\022\031\n\007message\030\005 \002(\0132\010.Payload\022\022\n\nfir" +
"stChunk\030\006 \001(\010\022\021\n\tlastChunk\030\007 \001(\010\"\035\n\007Clea" +
"nup\022\022\n\nqualifiers\030\001 \003(\tB(\n$akka.cluster." +
"typed.internal.protobufH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@ -7978,7 +8364,7 @@ public final class ReliableDelivery {
internal_static_akka_cluster_typed_delivery_SequencedMessage_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_akka_cluster_typed_delivery_SequencedMessage_descriptor,
new java.lang.String[] { "ProducerId", "SeqNr", "First", "Ack", "ProducerControllerRef", "Message", });
new java.lang.String[] { "ProducerId", "SeqNr", "First", "Ack", "ProducerControllerRef", "Message", "FirstChunk", "LastChunk", });
internal_static_akka_cluster_typed_delivery_RegisterConsumer_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_akka_cluster_typed_delivery_RegisterConsumer_fieldAccessorTable = new
@ -8020,7 +8406,7 @@ public final class ReliableDelivery {
internal_static_akka_cluster_typed_delivery_MessageSent_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_akka_cluster_typed_delivery_MessageSent_descriptor,
new java.lang.String[] { "SeqNr", "Qualifier", "Ack", "Timestamp", "Message", });
new java.lang.String[] { "SeqNr", "Qualifier", "Ack", "Timestamp", "Message", "FirstChunk", "LastChunk", });
internal_static_akka_cluster_typed_delivery_Cleanup_descriptor =
getDescriptor().getMessageTypes().get(8);
internal_static_akka_cluster_typed_delivery_Cleanup_fieldAccessorTable = new

View file

@ -0,0 +1,2 @@
# #24276 Chunked messages in Reliable Delivery
ProblemFilters.exclude[Problem]("akka.cluster.typed.internal.protobuf.ReliableDelivery*")

View file

@ -18,6 +18,8 @@ message SequencedMessage {
required bool ack = 4;
required string producerControllerRef = 5;
required Payload message = 6;
optional bool firstChunk = 7;
optional bool lastChunk = 8;
}
// ProducerController
@ -65,6 +67,8 @@ message MessageSent {
required bool ack = 3;
required int64 timestamp = 4;
required Payload message = 5;
optional bool firstChunk = 6;
optional bool lastChunk = 7;
}
// DurableProducerQueue

View file

@ -10,15 +10,19 @@ import akka.actor.typed.ActorRefResolver
import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.DurableProducerQueue
import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.delivery.internal.ChunkedMessage
import akka.actor.typed.delivery.internal.ProducerControllerImpl
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.cluster.typed.internal.protobuf.ReliableDelivery
import akka.cluster.typed.internal.protobuf.ReliableDelivery.Confirmed
import akka.remote.ContainerFormats
import akka.remote.ContainerFormats.Payload
import akka.remote.serialization.WrappedPayloadSupport
import akka.serialization.BaseSerializer
import akka.serialization.SerializerWithStringManifest
import akka.util.ccompat.JavaConverters._
import akka.protobufv3.internal.ByteString
/**
* INTERNAL API
@ -78,10 +82,27 @@ import akka.util.ccompat.JavaConverters._
b.setFirst(m.first)
b.setAck(m.ack)
b.setProducerControllerRef(resolver.toSerializationFormat(m.producerController))
b.setMessage(payloadSupport.payloadBuilder(m.message))
m.message match {
case chunk: ChunkedMessage =>
b.setMessage(chunkedMessageToProto(chunk))
b.setFirstChunk(chunk.firstChunk)
b.setLastChunk(chunk.lastChunk)
case _ =>
b.setMessage(payloadSupport.payloadBuilder(m.message))
}
b.build().toByteArray()
}
private def chunkedMessageToProto(chunk: ChunkedMessage): Payload.Builder = {
val payloadBuilder = ContainerFormats.Payload.newBuilder()
payloadBuilder.setEnclosedMessage(ByteString.copyFrom(chunk.serialized.toArray))
payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(chunk.manifest))
payloadBuilder.setSerializerId(chunk.serializerId)
payloadBuilder
}
private def ackToBinary(m: ProducerControllerImpl.Ack): Array[Byte] = {
val b = ReliableDelivery.Ack.newBuilder()
b.setConfirmedSeqNr(m.confirmedSeqNr)
@ -119,7 +140,16 @@ import akka.util.ccompat.JavaConverters._
b.setQualifier(m.confirmationQualifier)
b.setAck(m.ack)
b.setTimestamp(m.timestampMillis)
b.setMessage(payloadSupport.payloadBuilder(m.message))
m.message match {
case chunk: ChunkedMessage =>
b.setMessage(chunkedMessageToProto(chunk))
b.setFirstChunk(chunk.firstChunk)
b.setLastChunk(chunk.lastChunk)
case _ =>
b.setMessage(payloadSupport.payloadBuilder(m.message))
}
b.build()
}
@ -172,7 +202,19 @@ import akka.util.ccompat.JavaConverters._
private def sequencedMessageFromBinary(bytes: Array[Byte]): AnyRef = {
val seqMsg = ReliableDelivery.SequencedMessage.parseFrom(bytes)
val wrappedMsg = payloadSupport.deserializePayload(seqMsg.getMessage)
val wrappedMsg =
if (seqMsg.hasFirstChunk) {
val manifest =
if (seqMsg.getMessage.hasMessageManifest) seqMsg.getMessage.getMessageManifest.toStringUtf8 else ""
ChunkedMessage(
akka.util.ByteString(seqMsg.getMessage.getEnclosedMessage.toByteArray),
seqMsg.getFirstChunk,
seqMsg.getLastChunk,
seqMsg.getMessage.getSerializerId,
manifest)
} else {
payloadSupport.deserializePayload(seqMsg.getMessage)
}
ConsumerController.SequencedMessage(
seqMsg.getProducerId,
seqMsg.getSeqNr,
@ -213,7 +255,19 @@ import akka.util.ccompat.JavaConverters._
private def durableQueueMessageSentFromProto(
sent: ReliableDelivery.MessageSent): DurableProducerQueue.MessageSent[Any] = {
val wrappedMsg = payloadSupport.deserializePayload(sent.getMessage)
val wrappedMsg =
if (sent.hasFirstChunk) {
val manifest =
if (sent.getMessage.hasMessageManifest) sent.getMessage.getMessageManifest.toStringUtf8 else ""
ChunkedMessage(
akka.util.ByteString(sent.getMessage.getEnclosedMessage.toByteArray),
sent.getFirstChunk,
sent.getLastChunk,
sent.getMessage.getSerializerId,
manifest)
} else {
payloadSupport.deserializePayload(sent.getMessage)
}
DurableProducerQueue.MessageSent(sent.getSeqNr, wrappedMsg, sent.getAck, sent.getQualifier, sent.getTimestamp)
}

View file

@ -0,0 +1,217 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed
import scala.concurrent.duration._
import scala.util.Random
import com.typesafe.config.ConfigFactory
import org.HdrHistogram.Histogram
import akka.actor.ActorIdentity
import akka.actor.Identify
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.PostStop
import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
object ChunkLargeMessageSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
#akka.serialization.jackson.verbose-debug-logging = on
akka.remote.artery {
advanced.inbound-lanes = 1
advanced.maximum-frame-size = 2 MB
}
""").withFallback(MultiNodeClusterSpec.clusterConfig))
object Producer {
sealed trait Command
case object Stop extends Command
final case class Reply(timestamp: Long) extends Command with CborSerializable
private case class WrappedRequestNext(r: ProducerController.RequestNext[Consumer.TheMessage]) extends Command
private case class SendNext(to: ActorRef[Consumer.TheMessage]) extends Command
def apply(
numberOfMessages: Int,
large: Boolean,
delay: FiniteDuration,
producerController: ActorRef[ProducerController.Command[Consumer.TheMessage]]): Behavior[Command] = {
Behaviors.setup { context =>
val requestNextAdapter =
context.messageAdapter[ProducerController.RequestNext[Consumer.TheMessage]](WrappedRequestNext(_))
producerController ! ProducerController.Start(requestNextAdapter)
val histogram = new Histogram(SECONDS.toNanos(10), 3)
def percentile(p: Double): Double = histogram.getValueAtPercentile(p) / 1000.0
val rnd = new Random
val elements = if (large) Vector.fill(500)(rnd.nextString(1000)) else Vector("a")
if (numberOfMessages == 0)
Behaviors.stopped
else
Behaviors
.receive[Command] { (context, msg) =>
msg match {
case WrappedRequestNext(next) =>
if (delay > Duration.Zero)
context.scheduleOnce(delay, context.self, SendNext(next.sendNextTo))
else
next.sendNextTo ! Consumer.TheMessage(System.nanoTime(), context.self, Vector("a"))
Behaviors.same
case SendNext(to) =>
to ! Consumer.TheMessage(System.nanoTime(), context.self, elements)
Behaviors.same
case Reply(timestamp) =>
histogram.recordValue(System.nanoTime() - timestamp)
if (histogram.getTotalCount == numberOfMessages)
Behaviors.stopped
else
Behaviors.same
case Stop =>
Behaviors.stopped
}
}
.receiveSignal {
case (context, PostStop) =>
if (histogram.getTotalCount > 0) {
context.log.info(
s"=== Latency for [${context.self.path.name}] " +
f"50%%ile: ${percentile(50.0)}%.0f µs, " +
f"90%%ile: ${percentile(90.0)}%.0f µs, " +
f"99%%ile: ${percentile(99.0)}%.0f µs")
println(s"Histogram for [${context.self.path.name}] of RTT latencies in microseconds.")
histogram.outputPercentileDistribution(System.out, 1000.0)
}
Behaviors.same
}
}
}
}
object Consumer {
sealed trait Command
final case class TheMessage(sendTimstamp: Long, replyTo: ActorRef[Producer.Reply], elements: Vector[String])
extends CborSerializable {
override def toString: String = s"TheMessage($sendTimstamp,$replyTo,${elements.size})"
}
private final case class WrappedDelivery(d: ConsumerController.Delivery[TheMessage]) extends Command
case object Stop extends Command
def apply(consumerController: ActorRef[ConsumerController.Start[TheMessage]]): Behavior[Command] = {
Behaviors.setup { context =>
val deliveryAdapter =
context.messageAdapter[ConsumerController.Delivery[TheMessage]](WrappedDelivery(_))
consumerController ! ConsumerController.Start(deliveryAdapter)
Behaviors.receiveMessage {
case WrappedDelivery(d) =>
d.message.replyTo ! Producer.Reply(d.message.sendTimstamp)
d.confirmTo ! ConsumerController.Confirmed
Behaviors.same
case Stop =>
Behaviors.stopped
}
}
}
}
}
class ChunkLargeMessageMultiJvmNode1 extends ChunkLargeMessageSpec
class ChunkLargeMessageMultiJvmNode2 extends ChunkLargeMessageSpec
abstract class ChunkLargeMessageSpec extends MultiNodeSpec(ChunkLargeMessageSpec) with MultiNodeTypedClusterSpec {
import ChunkLargeMessageSpec._
// TODO move this to MultiNodeTypedClusterSpec
def identify[A](name: String, r: RoleName): ActorRef[A] = {
import akka.actor.typed.scaladsl.adapter._
val sel = system.actorSelection(node(r) / "user" / "testSpawn" / name)
sel.tell(Identify(None), testActor)
expectMsgType[ActorIdentity](10.seconds).ref.get.toTyped
}
private def test(n: Int, numberOfMessages: Int, includeLarge: Boolean): Unit = {
runOn(first) {
val producerController = spawn(ProducerController[Consumer.TheMessage](s"p$n", None), s"producerController$n")
val producer =
spawn(Producer(numberOfMessages, large = false, delay = 10.millis, producerController), s"producer$n")
val largeProducerController =
spawn(
ProducerController[Consumer.TheMessage](
s"p$n",
None,
ProducerController.Settings(typedSystem).withChunkLargeMessagesBytes(50000)),
s"largeProducerController$n")
val largeProducer =
spawn(
Producer(if (includeLarge) Int.MaxValue else 0, large = true, delay = 10.millis, largeProducerController),
s"largeProducer$n")
enterBarrier(s"producer$n-started")
val probe = TestProbe[Any]()
probe.expectTerminated(producer, 25.seconds)
largeProducer ! Producer.Stop
enterBarrier(s"producer$n-stopped")
}
runOn(second) {
enterBarrier(s"producer$n-started")
val consumerController = spawn(ConsumerController[Consumer.TheMessage](), s"consumerController$n")
val consumer = spawn(Consumer(consumerController), s"consumer$n")
val largeConsumerController = spawn(ConsumerController[Consumer.TheMessage](), s"largeConsumerController$n")
val largeConsumer = spawn(Consumer(largeConsumerController), s"largeConsumer$n")
val producerController: ActorRef[ProducerController.Command[Consumer.TheMessage]] =
identify(s"producerController$n", first)
consumerController ! ConsumerController.RegisterToProducerController(producerController)
val largeProducerController: ActorRef[ProducerController.Command[Consumer.TheMessage]] =
identify(s"largeProducerController$n", first)
largeConsumerController ! ConsumerController.RegisterToProducerController(largeProducerController)
enterBarrier(s"producer$n-stopped")
consumer ! Consumer.Stop
largeConsumer ! Consumer.Stop
}
enterBarrier(s"after-$n")
}
"Reliable delivery with chunked messages" must {
"form a cluster" in {
formCluster(first, second)
enterBarrier("cluster started")
}
"warmup" in {
test(1, 100, includeLarge = true)
}
"measure latency without large messages" in {
test(2, 250, includeLarge = false)
}
"measure latency with large messages" in {
test(3, 250, includeLarge = true)
}
}
}

View file

@ -14,7 +14,6 @@ import scala.language.implicitConversions
import org.scalatest.Suite
import org.scalatest.matchers.should.Matchers
import akka.actor.{ Address, Scheduler }
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
@ -22,9 +21,13 @@ import akka.actor.typed.Props
import akka.actor.typed.SpawnProtocol
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.{ ClusterEvent, MemberStatus }
import akka.actor.Address
import akka.actor.Scheduler
import akka.cluster.ClusterEvent
import akka.cluster.MemberStatus
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeSpec, STMultiNodeSpec }
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit.WatchedByCoroner
import akka.util.Timeout
@ -84,7 +87,8 @@ trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedB
enterBarrier("all-joined")
}
private lazy val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command]
private lazy val spawnActor =
system.actorOf(PropsAdapter(SpawnProtocol()), "testSpawn").toTyped[SpawnProtocol.Command]
def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = {
implicit val timeout: Timeout = testKitSettings.DefaultTimeout
val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _))

View file

@ -12,10 +12,12 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.delivery.DurableProducerQueue
import akka.actor.typed.delivery.ProducerController
import akka.actor.typed.delivery.internal.ChunkedMessage
import akka.actor.typed.delivery.internal.ProducerControllerImpl
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.serialization.SerializationExtension
import akka.util.ByteString
class ReliableDeliverySerializerSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
@ -53,7 +55,17 @@ class ReliableDeliverySerializerSpec extends ScalaTestWithActorTestKit with AnyW
Vector(
DurableProducerQueue.MessageSent(15L, "msg15", true, "q4", timestamp),
DurableProducerQueue.MessageSent(16L, "msg16", true, "q4", timestamp))),
"DurableProducerQueue.Cleanup" -> DurableProducerQueue.Cleanup(Set("q1", "q2", "q3"))).foreach {
"DurableProducerQueue.Cleanup" -> DurableProducerQueue.Cleanup(Set("q1", "q2", "q3")),
"SequencedMessage-chunked-1" -> ConsumerController.SequencedMessage
.fromChunked("prod-1", 1L, ChunkedMessage(ByteString.fromString("abc"), true, true, 20, ""), true, true, ref),
"SequencedMessage-chunked-2" -> ConsumerController.SequencedMessage
.fromChunked("prod-1", 1L, ChunkedMessage(ByteString(1, 2, 3), true, false, 123456, "A"), false, false, ref),
"DurableProducerQueue.MessageSent-chunked" -> DurableProducerQueue.MessageSent.fromChunked(
3L,
ChunkedMessage(ByteString.fromString("abc"), true, true, 20, ""),
false,
"",
timestamp)).foreach {
case (scenario, item) =>
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(classicSystem)

View file

@ -664,7 +664,7 @@ See `inbound-lanes` and `outbound-lanes` in the @ref:[reference configuration](g
All the communication between user defined remote actors are isolated from the channel of Akka internal messages so
a large user message cannot block an urgent system message. While this provides good isolation for Akka services, all
user communications by default happen through a shared network connection (an Aeron stream). When some actors
user communications by default happen through a shared network connection. When some actors
send large messages this can cause other messages to suffer higher latency as they need to wait until the full
message has been transported on the shared channel (and hence, shared bottleneck). In these cases it is usually
helpful to separate actors that have different QoS requirements: large messages vs. low latency.
@ -695,6 +695,11 @@ This means that all messages sent to the following actors will pass through the
Messages destined for actors not matching any of these patterns are sent using the default channel as before.
The large messages channel can still not be used for extremely large messages, a few MB per message at most.
An alternative is to use the @ref:[Reliable delivery](typed/reliable-delivery.md) that has support for
automatically @ref[splitting up large messages](typed/reliable-delivery.md#chunk-large-messages) and assemble
them again on the receiving side.
### External, shared Aeron media driver
The Aeron transport is running in a so called [media driver](https://github.com/real-logic/Aeron/wiki/Media-Driver-Operation).

View file

@ -50,6 +50,10 @@ There are 3 supported patterns, which are described in the following sections:
* @ref:[Work pulling](#work-pulling)
* @ref:[Sharding](#sharding)
The Point-to-Point pattern has support for automatically @ref:[splitting up large messages](#chunk-large-messages)
and assemble them again on the consumer side. This feature is useful for avoiding head of line blocking from
serialization and transfer of large messages.
## Point-to-point
This pattern implements point-to-point reliable delivery between a single producer actor sending messages and a single consumer actor
@ -410,6 +414,25 @@ This can be more efficient since messages don't have to be kept in memory in the
they have been confirmed, but the drawback is that lost messages will not be delivered. See configuration
`only-flow-control` of the `ConsumerController`.
## Chunk large messages
To avoid head of line blocking from serialization and transfer of large messages the @ref:[Point-to-Point](#point-to-point)
pattern has support for automatically @ref:[splitting up large messages](#chunk-large-messages) and assemble them
again on the consumer side.
Serialization and deserialization is performed by the `ProducerController` and `ConsumerController` respectively
instead of in the remote transport layer.
This is enabled by configuration `akka.reliable-delivery.producer-controller.chunk-large-messages` and defines
the maximum size in bytes of the chunked pieces. Messages smaller than the configured size are not chunked, but
serialization still takes place in the `ProducerController` and `ConsumerController`.
Aside from the configuration the API is the same as the @ref:[Point-to-point](#point-to-point) pattern. If
@ref:[Durable producer](#durable-producer) is enabled the chunked pieces are stored rather than the full large
message.
This feature is not implemented for @ref:[Work pulling](#work-pulling) and @ref:[Sharding](#sharding) yet.
## Configuration
There are several configuration properties, please refer to `akka.reliable-delivery` config section in the

View file

@ -5,8 +5,6 @@
package docs.persistence
import java.io.NotSerializableException
import scala.language.reflectiveCalls
import java.nio.charset.Charset
import akka.actor.ActorSystem
@ -16,8 +14,8 @@ import akka.testkit.TestKit
import com.typesafe.config._
import org.scalatest.wordspec.AnyWordSpec
import spray.json.JsObject
import scala.concurrent.duration._
import docs.persistence.proto.FlightAppModels
class PersistenceSchemaEvolutionDocSpec extends AnyWordSpec {
@ -284,7 +282,7 @@ class RemovedEventsAwareSerializer extends SerializerWithStringManifest {
case m if SkipEventManifestsEvents.contains(m) =>
EventDeserializationSkipped
case other => new String(bytes, utf8)
case _ => new String(bytes, utf8)
}
}
//#string-serializer-skip-deleved-event-by-manifest

View file

@ -221,77 +221,103 @@ private class EventSourcedProducerQueue[A](
import DurableProducerQueue._
private val traceEnabled = context.log.isTraceEnabled
// transient
private var initialCleanupDone = false
def onCommand(state: State[A], command: Command[A]): Effect[Event, State[A]] = {
command match {
case StoreMessageSent(sent, replyTo) =>
if (sent.seqNr == state.currentSeqNr) {
def onCommand(state: State[A], command: Command[A]): Effect[Event, State[A]] =
if (initialCleanupDone) {
command match {
case StoreMessageSent(sent, replyTo) =>
val currentSeqNr = state.currentSeqNr
if (sent.seqNr == currentSeqNr) {
if (traceEnabled)
context.log.trace(
"StoreMessageSent seqNr [{}], confirmationQualifier [{}]",
sent.seqNr,
sent.confirmationQualifier)
Effect.persist(sent).thenReply(replyTo)(_ => StoreMessageSentAck(sent.seqNr))
} else if (sent.seqNr == currentSeqNr - 1) {
// already stored, could be a retry after timeout
context.log.debug("Duplicate seqNr [{}], currentSeqNr [{}]", sent.seqNr, currentSeqNr)
Effect.reply(replyTo)(StoreMessageSentAck(sent.seqNr))
} else {
// may happen after failure
context.log.debug("Ignoring unexpected seqNr [{}], currentSeqNr [{}]", sent.seqNr, currentSeqNr)
Effect.unhandled // no reply, request will timeout
}
case StoreMessageConfirmed(seqNr, confirmationQualifier, timestampMillis) =>
if (traceEnabled)
context.log.trace(
"StoreMessageSent seqNr [{}], confirmationQualifier [{}]",
sent.seqNr,
sent.confirmationQualifier)
Effect.persist(sent).thenReply(replyTo)(_ => StoreMessageSentAck(sent.seqNr))
} else if (sent.seqNr == state.currentSeqNr - 1) {
// already stored, could be a retry after timout
context.log.debug("Duplicate seqNr [{}], currentSeqNr [{}]", sent.seqNr, state.currentSeqNr)
Effect.reply(replyTo)(StoreMessageSentAck(sent.seqNr))
} else {
// may happen after failure
context.log.debug("Ignoring unexpected seqNr [{}], currentSeqNr [{}]", sent.seqNr, state.currentSeqNr)
Effect.unhandled // no reply, request will timeout
}
"StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]",
seqNr,
confirmationQualifier)
val previousConfirmedSeqNr = state.confirmedSeqNr.get(confirmationQualifier) match {
case Some((nr, _)) => nr
case None => 0L
}
if (seqNr > previousConfirmedSeqNr)
Effect.persist(Confirmed(seqNr, confirmationQualifier, timestampMillis))
else
Effect.none // duplicate
case StoreMessageConfirmed(seqNr, confirmationQualifier, timestampMillis) =>
if (traceEnabled)
context.log.trace(
"StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]",
seqNr,
confirmationQualifier)
val previousConfirmedSeqNr = state.confirmedSeqNr.get(confirmationQualifier) match {
case Some((nr, _)) => nr
case None => 0L
}
if (seqNr > previousConfirmedSeqNr)
Effect.persist(Confirmed(seqNr, confirmationQualifier, timestampMillis))
else
Effect.none // duplicate
case LoadState(replyTo) =>
Effect.reply(replyTo)(state)
case LoadState(replyTo) =>
Effect.reply(replyTo)(state)
case _: CleanupTick[_] =>
onCleanupTick(state)
}
} else {
onCommandBeforeInitialCleanup(state, command)
}
private def onCleanupTick(state: State[A]): Effect[Event, State[A]] = {
val old = oldUnconfirmedToCleanup(state)
if (old.isEmpty) {
Effect.none
} else {
if (context.log.isDebugEnabled)
context.log.debug("Periodic cleanup [{}]", old.mkString(","))
Effect.persist(DurableProducerQueue.Cleanup(old))
}
}
private def oldUnconfirmedToCleanup(state: State[A]): Set[ConfirmationQualifier] = {
val now = System.currentTimeMillis()
state.confirmedSeqNr.collect {
case (confirmationQualifier, (_, timestampMillis))
if (now - timestampMillis) >= cleanupUnusedAfter.toMillis && !state.unconfirmed.exists(
_.confirmationQualifier != confirmationQualifier) =>
confirmationQualifier
}.toSet
}
def onCommandBeforeInitialCleanup(state: State[A], command: Command[A]): Effect[Event, State[A]] = {
command match {
case _: CleanupTick[_] =>
val now = System.currentTimeMillis()
val old = state.confirmedSeqNr.collect {
case (confirmationQualifier, (_, timestampMillis))
if (now - timestampMillis) >= cleanupUnusedAfter.toMillis && !state.unconfirmed.exists(
_.confirmationQualifier != confirmationQualifier) =>
confirmationQualifier
}.toSet
if (old.isEmpty) {
Effect.none
val old = oldUnconfirmedToCleanup(state)
val stateWithoutPartialChunkedMessages = state.cleanupPartialChunkedMessages()
initialCleanupDone = true
if (old.isEmpty && (stateWithoutPartialChunkedMessages eq state)) {
Effect.unstashAll()
} else {
if (context.log.isDebugEnabled)
context.log.debug("Cleanup [{}]", old.mkString(","))
Effect.persist(DurableProducerQueue.Cleanup(old))
context.log.debug("Initial cleanup [{}]", old.mkString(","))
Effect.persist(DurableProducerQueue.Cleanup(old)).thenUnstashAll()
}
case _ =>
Effect.stash()
}
}
def onEvent(state: State[A], event: Event): State[A] = {
event match {
case sent: MessageSent[A] @unchecked =>
state.copy(currentSeqNr = sent.seqNr + 1, unconfirmed = state.unconfirmed :+ sent)
state.addMessageSent(sent)
case Confirmed(seqNr, confirmationQualifier, timestampMillis) =>
val newUnconfirmed = state.unconfirmed.filterNot { u =>
u.confirmationQualifier == confirmationQualifier && u.seqNr <= seqNr
}
state.copy(
highestConfirmedSeqNr = math.max(state.highestConfirmedSeqNr, seqNr),
confirmedSeqNr = state.confirmedSeqNr.updated(confirmationQualifier, (seqNr, timestampMillis)),
unconfirmed = newUnconfirmed)
state.confirmed(seqNr, confirmationQualifier, timestampMillis)
case Cleanup(confirmationQualifiers) =>
state.copy(confirmedSeqNr = state.confirmedSeqNr -- confirmationQualifiers)
state.cleanup(confirmationQualifiers).cleanupPartialChunkedMessages()
}
}

View file

@ -28,11 +28,13 @@ object ReliableDeliveryWithEventSourcedProducerQueueSpec {
""")
}
class ReliableDeliveryWithEventSourcedProducerQueueSpec
extends ScalaTestWithActorTestKit(WorkPullingWithEventSourcedProducerQueueSpec.conf)
class ReliableDeliveryWithEventSourcedProducerQueueSpec(config: Config)
extends ScalaTestWithActorTestKit(config)
with AnyWordSpecLike
with LogCapturing {
def this() = this(ReliableDeliveryWithEventSourcedProducerQueueSpec.conf)
"ReliableDelivery with EventSourcedProducerQueue" must {
"deliver messages after full producer and consumer restart" in {
@ -168,3 +170,10 @@ class ReliableDeliveryWithEventSourcedProducerQueueSpec
}
}
// same tests but with chunked messages
class ReliableDeliveryWithEventSourcedProducerQueueChunkedSpec
extends ReliableDeliveryWithEventSourcedProducerQueueSpec(
ConfigFactory.parseString("""
akka.reliable-delivery.producer-controller.chunk-large-messages = 1b
""").withFallback(ReliableDeliveryWithEventSourcedProducerQueueSpec.conf))