From a54894914342d114f7b149370e3b04f8dd46e910 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 7 Sep 2020 14:02:52 +0200 Subject: [PATCH] Chunked messages in reliable delivery, #24276 (#28915) * To avoid head of line blocking from serialization and transfer of large messages this can be enabled. * ProducerController setting to chunk messages * split up large messages in chunks in ProducerController and assemble again in ConsumerController * serialization moved to these actors instead of in the Artery stream * other messages (for other actors) can interleave with the chunks * serializer for ChunkedMessage in SequencedMessage and MessageSent * cleanup partially stored chunked messages * reference docs * mima filters * additional test for sending the Request after half window size * enforce that chunk-large-messages=off for sharding and work-pulling --- .../delivery/ConsumerControllerSpec.scala | 120 ++++- .../DurableProducerControllerSpec.scala | 130 +++++- .../delivery/DurableProducerQueueSpec.scala | 93 ++++ .../delivery/DurableWorkPullingSpec.scala | 5 +- .../delivery/ProducerControllerSpec.scala | 53 ++- .../delivery/ReliableDeliveryRandomSpec.scala | 44 +- .../typed/delivery/ReliableDeliverySpec.scala | 43 +- .../actor/typed/delivery/TestConsumer.scala | 50 ++- .../delivery/TestDurableProducerQueue.scala | 19 +- .../typed/delivery/TestProducerWithAsk.scala | 19 +- .../typed/delivery/WorkPullingSpec.scala | 4 +- .../issue-24276-reliable-chunks.excludes | 14 + .../src/main/resources/reference.conf | 13 + .../typed/delivery/ConsumerController.scala | 45 +- .../typed/delivery/DurableProducerQueue.scala | 149 +++++- .../typed/delivery/ProducerController.scala | 27 +- .../WorkPullingProducerController.scala | 3 + .../delivery/internal/ChunkedMessage.scala | 22 + .../internal/ConsumerControllerImpl.scala | 65 ++- .../internal/ProducerControllerImpl.scala | 298 +++++++++--- .../WorkPullingProducerControllerImpl.scala | 7 +- .../src/main/resources/reference.conf | 4 + .../delivery/ShardingProducerController.scala | 3 + .../ShardingProducerControllerImpl.scala | 7 +- .../ReliableDeliveryShardingSpec.scala | 6 +- .../internal/protobuf/ReliableDelivery.java | 424 +++++++++++++++++- .../issue-24276-reliable-chunks.excludes | 2 + .../src/main/protobuf/ReliableDelivery.proto | 4 + .../delivery/ReliableDeliverySerializer.scala | 62 ++- .../cluster/typed/ChunkLargeMessageSpec.scala | 217 +++++++++ .../typed/MultiNodeTypedClusterSpec.scala | 12 +- .../ReliableDeliverySerializerSpec.scala | 14 +- akka-docs/src/main/paradox/remoting-artery.md | 7 +- .../main/paradox/typed/reliable-delivery.md | 23 + .../PersistenceSchemaEvolutionDocSpec.scala | 6 +- .../delivery/EventSourcedProducerQueue.scala | 132 +++--- ...eryWithEventSourcedProducerQueueSpec.scala | 13 +- 37 files changed, 1916 insertions(+), 243 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerQueueSpec.scala create mode 100644 akka-actor-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes create mode 100644 akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ChunkedMessage.scala create mode 100644 akka-cluster-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes create mode 100644 akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala index 6f3a62c14b..6f380d331f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ConsumerControllerSpec.scala @@ -6,6 +6,7 @@ package akka.actor.typed.delivery import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -13,13 +14,17 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.delivery.ConsumerController.DeliverThenStop import akka.actor.typed.delivery.internal.ConsumerControllerImpl import akka.actor.typed.delivery.internal.ProducerControllerImpl +import akka.serialization.SerializationExtension -class ConsumerControllerSpec extends ScalaTestWithActorTestKit(""" +class ConsumerControllerSpec + extends ScalaTestWithActorTestKit(ConfigFactory.parseString(""" akka.reliable-delivery.consumer-controller { flow-control-window = 20 resend-interval-min = 1s } - """) with AnyWordSpecLike with LogCapturing { + """).withFallback(TestSerializer.config)) + with AnyWordSpecLike + with LogCapturing { import TestConsumer.sequencedMessage private var idCount = 0 @@ -33,6 +38,8 @@ class ConsumerControllerSpec extends ScalaTestWithActorTestKit(""" private val settings = ConsumerController.Settings(system) import settings.flowControlWindow + private val serialization = SerializationExtension(system) + "ConsumerController" must { "resend RegisterConsumer" in { nextId() @@ -537,6 +544,115 @@ class ConsumerControllerSpec extends ScalaTestWithActorTestKit(""" } } + "ConsumerController with chunked messages" must { + + "collect and assemble chunks" in { + nextId() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}") + .unsafeUpcast[ConsumerControllerImpl.InternalCommand] + val producerControllerProbe = createTestProbe[ProducerControllerImpl.InternalCommand]() + + val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]() + consumerController ! ConsumerController.Start(consumerProbe.ref) + + // one chunk for each letter, "123" => 3 chunks + val chunks1 = ProducerControllerImpl.createChunks(TestConsumer.Job(s"123"), chunkSize = 1, serialization) + val seqMessages1 = chunks1.zipWithIndex.map { + case (chunk, i) => + ConsumerController.SequencedMessage.fromChunked( + producerId, + 1 + i, + chunk, + first = i == 0, + ack = false, + producerControllerProbe.ref) + } + + consumerController ! seqMessages1.head + consumerProbe.expectNoMessage() // need all chunks before delivery + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false)) + consumerController ! seqMessages1(1) + consumerController ! seqMessages1(2) + consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].message.payload should ===("123") + consumerController ! ConsumerController.Confirmed + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(3, 22, true, false)) + + val chunks2 = ProducerControllerImpl.createChunks(TestConsumer.Job(s"45"), chunkSize = 1, serialization) + val seqMessages2 = chunks2.zipWithIndex.map { + case (chunk, i) => + ConsumerController.SequencedMessage.fromChunked( + producerId, + 4 + i, + chunk, + first = false, + ack = true, + producerControllerProbe.ref) + } + + consumerController ! seqMessages2.head + consumerController ! seqMessages2(1) + consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].message.payload should ===("45") + consumerController ! ConsumerController.Confirmed + producerControllerProbe.expectMessage(ProducerControllerImpl.Ack(5)) + + testKit.stop(consumerController) + } + + "send Request after half window size when many chunks" in { + nextId() + val consumerController = + spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}") + .unsafeUpcast[ConsumerControllerImpl.InternalCommand] + val producerControllerProbe = createTestProbe[ProducerControllerImpl.InternalCommand]() + + val consumerProbe = createTestProbe[ConsumerController.Delivery[TestConsumer.Job]]() + consumerController ! ConsumerController.Start(consumerProbe.ref) + + // one chunk for each letter, => 25 chunks + val chunks1 = + ProducerControllerImpl.createChunks( + TestConsumer.Job(s"1234567890123456789012345"), + chunkSize = 1, + serialization) + val seqMessages1 = chunks1.zipWithIndex.map { + case (chunk, i) => + ConsumerController.SequencedMessage.fromChunked( + producerId, + 1 + i, + chunk, + first = i == 0, + ack = false, + producerControllerProbe.ref) + } + + consumerController ! seqMessages1.head + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 20, true, false)) + producerControllerProbe.expectNoMessage() // no more Request yet + (1 to 8).foreach(i => consumerController ! seqMessages1(i)) + producerControllerProbe.expectNoMessage() // sent 9, no more Request yet + + consumerController ! seqMessages1(9) + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 30, true, false)) + + (10 to 18).foreach(i => consumerController ! seqMessages1(i)) + producerControllerProbe.expectNoMessage() // sent 19, no more Request yet + + consumerController ! seqMessages1(19) + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 40, true, false)) + + // not sending more for a while, timeout will trigger new Request + producerControllerProbe.expectMessage(ProducerControllerImpl.Request(0, 40, true, true)) + + (20 to 24).foreach(i => consumerController ! seqMessages1(i)) + consumerProbe.expectMessageType[ConsumerController.Delivery[TestConsumer.Job]].message.payload should ===( + "1234567890123456789012345") + consumerController ! ConsumerController.Confirmed + + testKit.stop(consumerController) + } + } + "ConsumerController without resending" must { "accept lost message" in { nextId() diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala index 6fffc2bb2d..5dc9c28e3c 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerControllerSpec.scala @@ -8,19 +8,23 @@ import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration._ -import DurableProducerQueue.MessageSent -import ProducerController.MessageWithConfirmation +import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.delivery.DurableProducerQueue.MessageSent +import akka.actor.typed.delivery.ProducerController.MessageWithConfirmation +import akka.actor.typed.delivery.internal.ChunkedMessage import akka.actor.typed.delivery.internal.ProducerControllerImpl +import akka.util.ByteString class DurableProducerControllerSpec - extends ScalaTestWithActorTestKit(""" + extends ScalaTestWithActorTestKit( + ConfigFactory.parseString(""" akka.reliable-delivery.consumer-controller.flow-control-window = 20 akka.reliable-delivery.consumer-controller.resend-interval-min = 1s - """) + """).withFallback(TestSerializer.config)) with AnyWordSpecLike with LogCapturing { import DurableProducerQueue.NoQualifier @@ -157,6 +161,124 @@ class DurableProducerControllerSpec testKit.stop(producerController) } + + "store chunked messages" in { + nextId() + val consumerControllerProbe = createTestProbe[ConsumerController.Command[TestConsumer.Job]]() + + val stateHolder = + new AtomicReference[DurableProducerQueue.State[TestConsumer.Job]](DurableProducerQueue.State.empty) + val durable = + TestDurableProducerQueue[TestConsumer.Job]( + Duration.Zero, + stateHolder, + (_: DurableProducerQueue.Command[_]) => false) + + val producerController = + spawn( + ProducerController[TestConsumer.Job]( + producerId, + Some(durable), + ProducerController.Settings(system).withChunkLargeMessagesBytes(1)), + s"producerController-${idCount}").unsafeUpcast[ProducerControllerImpl.InternalCommand] + val producerProbe = createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]() + producerController ! ProducerController.Start(producerProbe.ref) + + producerController ! ProducerController.RegisterConsumer(consumerControllerProbe.ref) + + producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("abc") + consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + + producerProbe.awaitAssert { + val durableState = stateHolder.get() + durableState.currentSeqNr should ===(2) + durableState.unconfirmed.size should ===(1) + durableState.unconfirmed.head.message.getClass should ===(classOf[ChunkedMessage]) + } + + producerController ! ProducerControllerImpl.Request(0L, 10L, true, false) + + consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + + val seqMsg3 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + seqMsg3.isFirstChunk should ===(false) + seqMsg3.isLastChunk should ===(true) + seqMsg3.seqNr should ===(3L) + + producerProbe.awaitAssert { + val durableState = stateHolder.get() + durableState.currentSeqNr should ===(4) + durableState.unconfirmed.size should ===(3) + durableState.unconfirmed.head.message.getClass should ===(classOf[ChunkedMessage]) + } + + testKit.stop(producerController) + } + + "load initial state but don't resend partially stored chunked messages" in { + // may happen if crashed before all chunked messages have been stored, + // should be treated as if none of them were stored (they were not confirmed) + nextId() + val consumerControllerProbe = createTestProbe[ConsumerController.Command[TestConsumer.Job]]() + + val durable = TestDurableProducerQueue[TestConsumer.Job]( + Duration.Zero, + DurableProducerQueue.State( + currentSeqNr = 5, + highestConfirmedSeqNr = 2, + confirmedSeqNr = Map(NoQualifier -> (2L -> TestTimestamp)), + unconfirmed = Vector( + DurableProducerQueue.MessageSent.fromChunked[TestConsumer.Job]( + 3, + ChunkedMessage(ByteString.fromString("abc"), true, true, 20, ""), + false, + NoQualifier, + TestTimestamp), + DurableProducerQueue.MessageSent.fromChunked[TestConsumer.Job]( + 4, + ChunkedMessage(ByteString.fromString("d"), true, false, 20, ""), + false, + NoQualifier, + TestTimestamp), + DurableProducerQueue.MessageSent.fromChunked[TestConsumer.Job]( + 5, + ChunkedMessage(ByteString.fromString("e"), false, false, 20, ""), + false, + NoQualifier, + TestTimestamp) + // missing last chunk + ))) + + val producerController = + spawn( + ProducerController[TestConsumer.Job]( + producerId, + Some(durable), + ProducerController.Settings(system).withChunkLargeMessagesBytes(1)), + s"producerController-${idCount}").unsafeUpcast[ProducerControllerImpl.InternalCommand] + val producerProbe = createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]() + producerController ! ProducerController.Start(producerProbe.ref) + + producerController ! ProducerController.RegisterConsumer(consumerControllerProbe.ref) + + val seqMsg3 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + seqMsg3.seqNr should ===(3) + seqMsg3.isFirstChunk should ===(true) + seqMsg3.isLastChunk should ===(true) + + producerController ! ProducerControllerImpl.Request(0L, 10L, true, false) + + // 4 and 5 discarded because missing last chunk + consumerControllerProbe.expectNoMessage() + + producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("g") + val seqMsg4 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + seqMsg4.seqNr should ===(4) + seqMsg4.isFirstChunk should ===(true) + seqMsg4.isLastChunk should ===(true) + + testKit.stop(producerController) + } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerQueueSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerQueueSpec.scala new file mode 100644 index 0000000000..e1208b3823 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableProducerQueueSpec.scala @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.actor.typed.delivery + +import org.scalatest.TestSuite +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +import akka.actor.typed.delivery.DurableProducerQueue.MessageSent +import akka.actor.typed.delivery.DurableProducerQueue.State +import akka.actor.typed.delivery.internal.ChunkedMessage +import akka.util.ByteString + +class DurableProducerQueueSpec extends AnyWordSpec with TestSuite with Matchers { + + "DurableProducerQueue.State" must { + "addMessageSent" in { + val state1 = State.empty.addMessageSent(MessageSent(1, "a", false, "", 0L)) + state1.unconfirmed.size should ===(1) + state1.unconfirmed.head.message should ===("a") + state1.currentSeqNr should ===(2L) + + val state2 = state1.addMessageSent(MessageSent(2, "b", false, "", 0L)) + state2.unconfirmed.size should ===(2) + state2.unconfirmed.last.message should ===("b") + state2.currentSeqNr should ===(3L) + } + + "confirm" in { + val state1 = State.empty + .addMessageSent(MessageSent(1, "a", false, "", 0L)) + .addMessageSent(MessageSent(2, "b", false, "", 0L)) + val state2 = state1.confirmed(1L, "", 0L) + state2.unconfirmed.size should ===(1) + state2.unconfirmed.head.message should ===("b") + state2.currentSeqNr should ===(3L) + } + + "filter partially stored chunked messages" in { + val state1 = State + .empty[String] + .addMessageSent( + MessageSent.fromChunked(1, ChunkedMessage(ByteString.fromString("a"), true, true, 20, ""), false, "", 0L)) + .addMessageSent( + MessageSent.fromChunked(2, ChunkedMessage(ByteString.fromString("b"), true, false, 20, ""), false, "", 0L)) + .addMessageSent( + MessageSent.fromChunked(3, ChunkedMessage(ByteString.fromString("c"), false, false, 20, ""), false, "", 0L)) + // last chunk was never stored + + val state2 = state1.cleanupPartialChunkedMessages() + state2.unconfirmed.size should ===(1) + state2.unconfirmed.head.message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("a")) + state2.currentSeqNr should ===(2L) + + val state3 = state1 + .addMessageSent( + MessageSent.fromChunked(2, ChunkedMessage(ByteString.fromString("d"), true, false, 20, ""), false, "", 0L)) + .addMessageSent( + MessageSent.fromChunked(3, ChunkedMessage(ByteString.fromString("e"), false, true, 20, ""), false, "", 0L)) + + val state4 = state3.cleanupPartialChunkedMessages() + state4.unconfirmed.size should ===(3) + state4.unconfirmed.head.message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("a")) + state4.unconfirmed(1).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("d")) + state4.unconfirmed(2).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("e")) + state4.currentSeqNr should ===(4L) + + val state5 = state3 + .addMessageSent( + MessageSent.fromChunked(4, ChunkedMessage(ByteString.fromString("f"), true, false, 20, ""), false, "", 0L)) + .addMessageSent( + MessageSent.fromChunked(5, ChunkedMessage(ByteString.fromString("g"), false, false, 20, ""), false, "", 0L)) + .addMessageSent( + MessageSent.fromChunked(4, ChunkedMessage(ByteString.fromString("h"), true, true, 20, ""), false, "", 0L)) + .addMessageSent( + MessageSent.fromChunked(5, ChunkedMessage(ByteString.fromString("i"), true, false, 20, ""), false, "", 0L)) + .addMessageSent( + MessageSent.fromChunked(6, ChunkedMessage(ByteString.fromString("j"), false, false, 20, ""), false, "", 0L)) + + val state6 = state5.cleanupPartialChunkedMessages() + state6.unconfirmed.size should ===(4) + state6.unconfirmed.head.message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("a")) + state6.unconfirmed(1).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("d")) + state6.unconfirmed(2).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("e")) + state6.unconfirmed(3).message.asInstanceOf[ChunkedMessage].serialized should be(ByteString.fromString("h")) + state6.currentSeqNr should ===(5L) + } + + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableWorkPullingSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableWorkPullingSpec.scala index 8cb63f6281..de23d55cf6 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableWorkPullingSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/DurableWorkPullingSpec.scala @@ -53,10 +53,11 @@ class DurableWorkPullingSpec s: DurableProducerQueue.State[TestConsumer.Job], expected: DurableProducerQueue.State[TestConsumer.Job]): Unit = { - def cleanup(a: DurableProducerQueue.State[TestConsumer.Job]) = + def cleanup(a: DurableProducerQueue.State[TestConsumer.Job]): DurableProducerQueue.State[TestConsumer.Job] = { a.copy( confirmedSeqNr = Map.empty, - unconfirmed = s.unconfirmed.map(m => m.copy(confirmationQualifier = DurableProducerQueue.NoQualifier))) + unconfirmed = s.unconfirmed.map(m => m.withConfirmationQualifier(DurableProducerQueue.NoQualifier))) + } cleanup(s) should ===(cleanup(expected)) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ProducerControllerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ProducerControllerSpec.scala index e2b930d093..0391a73549 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ProducerControllerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ProducerControllerSpec.scala @@ -6,17 +6,20 @@ package akka.actor.typed.delivery import scala.concurrent.duration._ -import ProducerController.MessageWithConfirmation +import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.delivery.ProducerController.MessageWithConfirmation +import akka.actor.typed.delivery.internal.ChunkedMessage import akka.actor.typed.delivery.internal.ProducerControllerImpl class ProducerControllerSpec - extends ScalaTestWithActorTestKit(""" + extends ScalaTestWithActorTestKit( + ConfigFactory.parseString(""" akka.reliable-delivery.consumer-controller.flow-control-window = 20 - """) + """).withFallback(TestSerializer.config)) with AnyWordSpecLike with LogCapturing { import TestConsumer.sequencedMessage @@ -247,6 +250,50 @@ class ProducerControllerSpec testKit.stop(producerController) } + "chunk large messages" in { + nextId() + val consumerControllerProbe = createTestProbe[ConsumerController.Command[TestConsumer.Job]]() + + val producerController = + spawn( + ProducerController[TestConsumer.Job]( + producerId, + None, + ProducerController.Settings(system).withChunkLargeMessagesBytes(1)), + s"producerController-${idCount}").unsafeUpcast[ProducerControllerImpl.InternalCommand] + val producerProbe = createTestProbe[ProducerController.RequestNext[TestConsumer.Job]]() + producerController ! ProducerController.Start(producerProbe.ref) + + producerController ! ProducerController.RegisterConsumer(consumerControllerProbe.ref) + + producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("abc") + val seqMsg1 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + seqMsg1.message.getClass should ===(classOf[ChunkedMessage]) + seqMsg1.isFirstChunk should ===(true) + seqMsg1.isLastChunk should ===(false) + seqMsg1.seqNr should ===(1L) + + producerController ! ProducerControllerImpl.Request(0L, 10L, true, false) + + val seqMsg2 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + seqMsg2.isFirstChunk should ===(false) + seqMsg2.isLastChunk should ===(false) + seqMsg2.seqNr should ===(2L) + + val seqMsg3 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + seqMsg3.isFirstChunk should ===(false) + seqMsg3.isLastChunk should ===(true) + seqMsg3.seqNr should ===(3L) + + producerProbe.receiveMessage().sendNextTo ! TestConsumer.Job("d") + val seqMsg4 = consumerControllerProbe.expectMessageType[ConsumerController.SequencedMessage[TestConsumer.Job]] + seqMsg4.isFirstChunk should ===(true) + seqMsg4.isLastChunk should ===(true) + seqMsg4.seqNr should ===(4L) + + testKit.stop(producerController) + } + } "ProducerController without resends" must { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala index 9ef9fd1fd9..d872d014e8 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliveryRandomSpec.scala @@ -9,6 +9,8 @@ import java.util.concurrent.atomic.AtomicReference import scala.concurrent.duration._ import scala.util.Random +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -21,6 +23,14 @@ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps object ReliableDeliveryRandomSpec { + val config: Config = ConfigFactory.parseString(""" + akka.reliable-delivery.consumer-controller { + flow-control-window = 20 + resend-interval-min = 500 ms + resend-interval-max = 2 s + } + """) + object RandomFlakyNetwork { def apply[T](rnd: Random, dropProbability: Any => Double): BehaviorInterceptor[T, T] = new RandomFlakyNetwork(rnd, dropProbability).asInstanceOf[BehaviorInterceptor[T, T]] @@ -42,15 +52,14 @@ object ReliableDeliveryRandomSpec { } } -class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" - akka.reliable-delivery.consumer-controller { - flow-control-window = 20 - resend-interval-min = 500 ms - resend-interval-max = 2 s - } - """) with AnyWordSpecLike with LogCapturing { +class ReliableDeliveryRandomSpec(config: Config) + extends ScalaTestWithActorTestKit(config) + with AnyWordSpecLike + with LogCapturing { import ReliableDeliveryRandomSpec._ + def this() = this(ReliableDeliveryRandomSpec.config) + private var idCount = 0 private def nextId(): Int = { idCount += 1 @@ -90,7 +99,7 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" case _ => 0.0 } - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val consumerController = spawn( Behaviors.intercept(() => RandomFlakyNetwork[ConsumerController.Command[TestConsumer.Job]](rnd, consumerDrop))( @@ -137,8 +146,8 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" nextId() val rndSeed = System.currentTimeMillis() val rnd = new Random(rndSeed) - val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.4 - val producerDropProbability = 0.1 + rnd.nextDouble() * 0.3 + val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.2 + val producerDropProbability = 0.1 + rnd.nextDouble() * 0.2 test( rndSeed, rnd, @@ -153,7 +162,7 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" nextId() val rndSeed = System.currentTimeMillis() val rnd = new Random(rndSeed) - val durableFailProbability = 0.1 + rnd.nextDouble() * 0.2 + val durableFailProbability = 0.1 + rnd.nextDouble() * 0.1 test( rndSeed, rnd, @@ -168,9 +177,9 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" nextId() val rndSeed = System.currentTimeMillis() val rnd = new Random(rndSeed) - val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.4 - val producerDropProbability = 0.1 + rnd.nextDouble() * 0.3 - val durableFailProbability = 0.1 + rnd.nextDouble() * 0.2 + val consumerDropProbability = 0.1 + rnd.nextDouble() * 0.1 + val producerDropProbability = 0.1 + rnd.nextDouble() * 0.1 + val durableFailProbability = 0.1 + rnd.nextDouble() * 0.1 test( rndSeed, rnd, @@ -200,3 +209,10 @@ class ReliableDeliveryRandomSpec extends ScalaTestWithActorTestKit(""" } } + +// same tests but with chunked messages +class ReliableDeliveryRandomChunkedSpec + extends ReliableDeliveryRandomSpec( + ConfigFactory.parseString(""" + akka.reliable-delivery.producer-controller.chunk-large-messages = 1b + """).withFallback(TestSerializer.config).withFallback(ReliableDeliveryRandomSpec.config)) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliverySpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliverySpec.scala index c81a56f296..8834a5f7e6 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliverySpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/ReliableDeliverySpec.scala @@ -6,20 +6,30 @@ package akka.actor.typed.delivery import scala.concurrent.duration._ +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit -class ReliableDeliverySpec - extends ScalaTestWithActorTestKit(""" - akka.reliable-delivery.consumer-controller.flow-control-window = 20 - """) +object ReliableDeliverySpec { + val config: Config = ConfigFactory.parseString(""" + akka.reliable-delivery.consumer-controller.flow-control-window = 20 + """) +} + +class ReliableDeliverySpec(config: Config) + extends ScalaTestWithActorTestKit(config) with AnyWordSpecLike with LogCapturing { import TestConsumer.defaultConsumerDelay import TestProducer.defaultProducerDelay + def this() = this(ReliableDeliverySpec.config) + + private val chunked = ProducerController.Settings(system).chunkLargeMessagesBytes > 0 + private var idCount = 0 private def nextId(): Int = { idCount += 1 @@ -30,7 +40,7 @@ class ReliableDeliverySpec "illustrate point-to-point usage" in { nextId() - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val consumerController = spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}") spawn( @@ -52,7 +62,7 @@ class ReliableDeliverySpec "illustrate point-to-point usage with ask" in { nextId() - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val consumerController = spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}") spawn( @@ -70,9 +80,11 @@ class ReliableDeliverySpec consumerController ! ConsumerController.RegisterToProducerController(producerController) - consumerEndProbe.receiveMessage(5.seconds) - - replyProbe.receiveMessages(42, 5.seconds).toSet should ===((1L to 42L).toSet) + val messageCount = consumerEndProbe.receiveMessage(5.seconds).messageCount + if (chunked) + replyProbe.receiveMessages(messageCount, 5.seconds) + else + replyProbe.receiveMessages(messageCount, 5.seconds).toSet should ===((1L to 42).toSet) testKit.stop(producer) testKit.stop(producerController) @@ -81,7 +93,7 @@ class ReliableDeliverySpec def testWithDelays(producerDelay: FiniteDuration, consumerDelay: FiniteDuration): Unit = { nextId() - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val consumerController = spawn(ConsumerController[TestConsumer.Job](), s"consumerController-${idCount}") spawn(TestConsumer(consumerDelay, 42, consumerEndProbe.ref, consumerController), name = s"destination-${idCount}") @@ -113,7 +125,7 @@ class ReliableDeliverySpec "allow replacement of destination" in { nextId() - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val consumerController = spawn(ConsumerController[TestConsumer.Job](), s"consumerController1-${idCount}") spawn(TestConsumer(defaultConsumerDelay, 42, consumerEndProbe.ref, consumerController), s"consumer1-${idCount}") @@ -126,7 +138,7 @@ class ReliableDeliverySpec consumerEndProbe.receiveMessage(5.seconds) - val consumerEndProbe2 = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe2 = createTestProbe[TestConsumer.Collected]() val consumerController2 = spawn(ConsumerController[TestConsumer.Job](), s"consumerController2-${idCount}") spawn(TestConsumer(defaultConsumerDelay, 42, consumerEndProbe2.ref, consumerController2), s"consumer2-${idCount}") @@ -189,3 +201,10 @@ class ReliableDeliverySpec } } + +// Same tests but with chunked messages +class ReliableDeliveryChunkedSpec + extends ReliableDeliverySpec( + ConfigFactory.parseString(""" + akka.reliable-delivery.producer-controller.chunk-large-messages = 1b + """).withFallback(TestSerializer.config).withFallback(ReliableDeliverySpec.config)) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestConsumer.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestConsumer.scala index d71ef74971..51a6bcd179 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestConsumer.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestConsumer.scala @@ -4,17 +4,22 @@ package akka.actor.typed.delivery -import scala.concurrent.duration._ +import java.nio.charset.StandardCharsets + import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ -import ConsumerController.SequencedMessage +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory import akka.actor.typed.ActorRef import akka.actor.typed.Behavior +import akka.actor.typed.delivery.ConsumerController.SequencedMessage import akka.actor.typed.delivery.internal.ProducerControllerImpl import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors +import akka.serialization.SerializerWithStringManifest object TestConsumer { @@ -33,7 +38,7 @@ object TestConsumer { seqNr: Long) extends Command - final case class CollectedProducerIds(producerIds: Set[String]) + final case class Collected(producerIds: Set[String], messageCount: Int) val defaultConsumerDelay: FiniteDuration = 10.millis @@ -53,17 +58,17 @@ object TestConsumer { def apply( delay: FiniteDuration, endSeqNr: Long, - endReplyTo: ActorRef[CollectedProducerIds], + endReplyTo: ActorRef[Collected], controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]): Behavior[Command] = apply(delay, consumerEndCondition(endSeqNr), endReplyTo, controller) def apply( delay: FiniteDuration, endCondition: SomeAsyncJob => Boolean, - endReplyTo: ActorRef[CollectedProducerIds], + endReplyTo: ActorRef[Collected], controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]): Behavior[Command] = Behaviors.setup[Command] { ctx => - new TestConsumer(ctx, delay, endCondition, endReplyTo, controller).active(Set.empty) + new TestConsumer(ctx, delay, endCondition, endReplyTo, controller).active(Set.empty, 0) } } @@ -72,7 +77,7 @@ class TestConsumer( ctx: ActorContext[TestConsumer.Command], delay: FiniteDuration, endCondition: TestConsumer.SomeAsyncJob => Boolean, - endReplyTo: ActorRef[TestConsumer.CollectedProducerIds], + endReplyTo: ActorRef[TestConsumer.Collected], controller: ActorRef[ConsumerController.Start[TestConsumer.Job]]) { import TestConsumer._ @@ -83,10 +88,11 @@ class TestConsumer( controller ! ConsumerController.Start(deliverTo) - private def active(processed: Set[(String, Long)]): Behavior[Command] = { + private def active(processed: Set[(String, Long)], messageCount: Int): Behavior[Command] = { Behaviors.receive { (ctx, m) => m match { case JobDelivery(msg, confirmTo, producerId, seqNr) => + ctx.log.trace("SeqNr [{}] was delivered to consumer.", seqNr) // confirmation can be later, asynchronously if (delay == Duration.Zero) ctx.self ! SomeAsyncJob(msg, confirmTo, producerId, seqNr) @@ -106,12 +112,36 @@ class TestConsumer( confirmTo ! ConsumerController.Confirmed if (endCondition(job)) { - endReplyTo ! CollectedProducerIds(processed.map(_._1)) + ctx.log.debug("End at [{}]", seqNr) + endReplyTo ! Collected(processed.map(_._1), messageCount + 1) Behaviors.stopped } else - active(cleanProcessed + (producerId -> seqNr)) + active(cleanProcessed + (producerId -> seqNr), messageCount + 1) } } } } + +object TestSerializer { + val config: Config = ConfigFactory.parseString(s""" + akka.actor.serializers.delivery-test = ${classOf[TestSerializer].getName} + akka.actor.serialization-bindings { + "${classOf[TestConsumer.Job].getName}" = delivery-test + } + """) +} + +class TestSerializer extends SerializerWithStringManifest { + override def identifier: Int = 787878 + + override def manifest(o: AnyRef): String = "" + + override def toBinary(o: AnyRef): Array[Byte] = + o match { + case TestConsumer.Job(payload) => payload.getBytes(StandardCharsets.UTF_8) + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = + TestConsumer.Job(new String(bytes, StandardCharsets.UTF_8)) +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestDurableProducerQueue.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestDurableProducerQueue.scala index 2f1cc9cc58..6e9a07a3fc 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestDurableProducerQueue.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestDurableProducerQueue.scala @@ -28,7 +28,7 @@ object TestDurableProducerQueue { .supervise { Behaviors.setup[Command[A]] { context => context.setLoggerName("TestDurableProducerQueue") - val state = stateHolder.get() + val state = stateHolder.get().cleanupPartialChunkedMessages() context.log.info("Starting with seqNr [{}], confirmedSeqNr [{}]", state.currentSeqNr, state.confirmedSeqNr) new TestDurableProducerQueue[A](context, delay, stateHolder, failWhen).active(state) } @@ -70,12 +70,9 @@ class TestDurableProducerQueue[A]( maybeFail(cmd) val reply = StoreMessageSentAck(cmd.sent.seqNr) if (delay == Duration.Zero) cmd.replyTo ! reply else context.scheduleOnce(delay, cmd.replyTo, reply) - active( - state.copy( - currentSeqNr = cmd.sent.seqNr + 1, - unconfirmed = state.unconfirmed :+ cmd.sent.copy(timestampMillis = TestTimestamp))) + active(state.addMessageSent(cmd.sent.withTimestampMillis(TestTimestamp))) } else if (cmd.sent.seqNr == state.currentSeqNr - 1) { - // already stored, could be a retry after timout + // already stored, could be a retry after timeout context.log.info("Duplicate seqNr [{}], currentSeqNr [{}]", cmd.sent.seqNr, state.currentSeqNr) val reply = StoreMessageSentAck(cmd.sent.seqNr) if (delay == Duration.Zero) cmd.replyTo ! reply else context.scheduleOnce(delay, cmd.replyTo, reply) @@ -92,15 +89,7 @@ class TestDurableProducerQueue[A]( cmd.seqNr, cmd.confirmationQualifier) maybeFail(cmd) - val newUnconfirmed = state.unconfirmed.filterNot { u => - u.confirmationQualifier == cmd.confirmationQualifier && u.seqNr <= cmd.seqNr - } - val newHighestConfirmed = math.max(state.highestConfirmedSeqNr, cmd.seqNr) - active( - state.copy( - highestConfirmedSeqNr = newHighestConfirmed, - confirmedSeqNr = state.confirmedSeqNr.updated(cmd.confirmationQualifier, (cmd.seqNr, TestTimestamp)), - unconfirmed = newUnconfirmed)) + active(state.confirmed(cmd.seqNr, cmd.confirmationQualifier, TestTimestamp)) } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestProducerWithAsk.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestProducerWithAsk.scala index 9b02332ccc..ee7d228e66 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestProducerWithAsk.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/TestProducerWithAsk.scala @@ -4,8 +4,8 @@ package akka.actor.typed.delivery -import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success @@ -43,12 +43,17 @@ object TestProducerWithAsk { } private def idle(n: Int, replyProbe: ActorRef[Long]): Behavior[Command] = { - Behaviors.receiveMessage { - case Tick => Behaviors.same - case RequestNext(sendTo) => active(n + 1, replyProbe, sendTo) - case Confirmed(seqNr) => - replyProbe ! seqNr - Behaviors.same + Behaviors.receive { (ctx, msg) => + msg match { + case Tick => Behaviors.same + case RequestNext(sendTo) => active(n + 1, replyProbe, sendTo) + case Confirmed(seqNr) => + replyProbe ! seqNr + Behaviors.same + case AskTimeout => + ctx.log.warn("Timeout") + Behaviors.same + } } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/WorkPullingSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/WorkPullingSpec.scala index 09a3198f62..d27db4b63d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/WorkPullingSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/delivery/WorkPullingSpec.scala @@ -56,14 +56,14 @@ class WorkPullingSpec val jobProducer = spawn(TestProducerWorkPulling(defaultProducerDelay, workPullingController), name = s"jobProducer-${idCount}") - val consumerEndProbe1 = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe1 = createTestProbe[TestConsumer.Collected]() val workerController1 = spawn(ConsumerController[TestConsumer.Job](workerServiceKey), s"workerController1-${idCount}") spawn( TestConsumer(defaultConsumerDelay, 42, consumerEndProbe1.ref, workerController1), name = s"worker1-${idCount}") - val consumerEndProbe2 = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe2 = createTestProbe[TestConsumer.Collected]() val workerController2 = spawn(ConsumerController[TestConsumer.Job](workerServiceKey), s"workerController2-${idCount}") spawn( diff --git a/akka-actor-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes b/akka-actor-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes new file mode 100644 index 0000000000..0907ffe175 --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes @@ -0,0 +1,14 @@ +# #24276 Chunked messages in Reliable Delivery +ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.internal.*") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.delivery.ProducerController#Settings.this") + +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.DurableProducerQueue#MessageSent.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.DurableProducerQueue#MessageSent.unapply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.unapply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.message") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.copy") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.copy$default$3") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.actor.typed.delivery.ConsumerController#SequencedMessage.this") +ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.DurableProducerQueue$MessageSent*") +ProblemFilters.exclude[Problem]("akka.actor.typed.delivery.DurableProducerQueue#MessageSent*") diff --git a/akka-actor-typed/src/main/resources/reference.conf b/akka-actor-typed/src/main/resources/reference.conf index 3cb166b939..d34d52aeef 100644 --- a/akka-actor-typed/src/main/resources/reference.conf +++ b/akka-actor-typed/src/main/resources/reference.conf @@ -67,6 +67,15 @@ akka.use-slf4j = on akka.reliable-delivery { producer-controller { + + # To avoid head of line blocking from serialization and transfer + # of large messages this can be enabled. + # Large messages are chunked into pieces of the given size in bytes. The + # chunked messages are sent separatetely and assembled on the consumer side. + # Serialization and deserialization is performed by the ProducerController and + # ConsumerController respectively instead of in the remote transport layer. + chunk-large-messages = off + durable-queue { # The ProducerController uses this timeout for the requests to # the durable queue. If there is no reply within the timeout it @@ -111,6 +120,10 @@ akka.reliable-delivery { # Ask timeout for sending message to worker until receiving Ack from worker internal-ask-timeout = 60s + + # Chunked messages not implemented for work-pulling yet. Override to not + # propagate property from akka.reliable-delivery.producer-controller. + chunk-large-messages = off } } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala index 7df92fe216..eba9bc2587 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ConsumerController.scala @@ -14,6 +14,7 @@ import akka.actor.DeadLetterSuppression import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior +import akka.actor.typed.delivery.internal.ChunkedMessage import akka.actor.typed.delivery.internal.ConsumerControllerImpl import akka.actor.typed.delivery.internal.DeliverySerializable import akka.actor.typed.delivery.internal.ProducerControllerImpl @@ -124,6 +125,26 @@ object ConsumerController { final case class DeliverThenStop[A]() extends Command[A] + object SequencedMessage { + + /** + * SequencedMessage.message can be `A` or `ChunkedMessage`. + */ + type MessageOrChunk = Any + + /** + * INTERNAL API + */ + @InternalApi private[akka] def fromChunked[A]( + producerId: String, + seqNr: SeqNr, + chunk: ChunkedMessage, + first: Boolean, + ack: Boolean, + producerController: ActorRef[ProducerControllerImpl.InternalCommand]): SequencedMessage[A] = + new SequencedMessage(producerId, seqNr, chunk, first, ack)(producerController) + } + /** * This is used between the `ProducerController` and `ConsumerController`. Should rarely be used in * application code but is public because it's in the signature for the `EntityTypeKey` when using @@ -135,8 +156,12 @@ object ConsumerController { * * @param producerController INTERNAL API: construction of SequencedMessage is internal */ - final case class SequencedMessage[A](producerId: String, seqNr: SeqNr, message: A, first: Boolean, ack: Boolean)( - @InternalApi private[akka] val producerController: ActorRef[ProducerControllerImpl.InternalCommand]) + final case class SequencedMessage[A]( + producerId: String, + seqNr: SeqNr, + message: SequencedMessage.MessageOrChunk, + first: Boolean, + ack: Boolean)(@InternalApi private[akka] val producerController: ActorRef[ProducerControllerImpl.InternalCommand]) extends Command[A] with DeliverySerializable with DeadLetterSuppression { @@ -144,6 +169,22 @@ object ConsumerController { /** INTERNAL API */ @InternalApi private[akka] def asFirst: SequencedMessage[A] = copy(first = true)(producerController) + + /** INTERNAL API */ + @InternalApi private[akka] def isFirstChunk: Boolean = { + message match { + case c: ChunkedMessage => c.firstChunk + case _ => true + } + } + + /** INTERNAL API */ + @InternalApi private[akka] def isLastChunk: Boolean = { + message match { + case c: ChunkedMessage => c.lastChunk + case _ => true + } + } } object Settings { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/DurableProducerQueue.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/DurableProducerQueue.scala index 38d577ab49..e8be18d992 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/DurableProducerQueue.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/DurableProducerQueue.scala @@ -7,6 +7,7 @@ package akka.actor.typed.delivery import scala.collection.immutable import akka.actor.typed.ActorRef +import akka.actor.typed.delivery.internal.ChunkedMessage import akka.actor.typed.delivery.internal.DeliverySerializable import akka.annotation.ApiMayChange import akka.annotation.InternalApi @@ -67,7 +68,60 @@ object DurableProducerQueue { highestConfirmedSeqNr: SeqNr, confirmedSeqNr: Map[ConfirmationQualifier, (SeqNr, TimestampMillis)], unconfirmed: immutable.IndexedSeq[MessageSent[A]]) - extends DeliverySerializable + extends DeliverySerializable { + + def addMessageSent(sent: MessageSent[A]): State[A] = { + copy(currentSeqNr = sent.seqNr + 1, unconfirmed = unconfirmed :+ sent) + } + + def confirmed( + seqNr: SeqNr, + confirmationQualifier: ConfirmationQualifier, + timestampMillis: TimestampMillis): State[A] = { + val newUnconfirmed = unconfirmed.filterNot { u => + u.confirmationQualifier == confirmationQualifier && u.seqNr <= seqNr + } + copy( + highestConfirmedSeqNr = math.max(highestConfirmedSeqNr, seqNr), + confirmedSeqNr = confirmedSeqNr.updated(confirmationQualifier, (seqNr, timestampMillis)), + unconfirmed = newUnconfirmed) + } + + def cleanup(confirmationQualifiers: Set[String]): State[A] = { + copy(confirmedSeqNr = confirmedSeqNr -- confirmationQualifiers) + } + + /** + * If not all chunked messages were stored before crash those partial chunked messages should not be resent. + */ + def cleanupPartialChunkedMessages(): State[A] = { + if (unconfirmed.isEmpty || unconfirmed.forall(u => u.isFirstChunk && u.isLastChunk)) { + this + } else { + val tmp = Vector.newBuilder[MessageSent[A]] + val newUnconfirmed = Vector.newBuilder[MessageSent[A]] + var newCurrentSeqNr = highestConfirmedSeqNr + 1 + unconfirmed.foreach { u => + if (u.isFirstChunk && u.isLastChunk) { + tmp.clear() + newUnconfirmed += u + newCurrentSeqNr = u.seqNr + 1 + } else if (u.isFirstChunk && !u.isLastChunk) { + tmp.clear() + tmp += u + } else if (!u.isLastChunk) { + tmp += u + } else if (u.isLastChunk) { + newUnconfirmed ++= tmp.result() + newUnconfirmed += u + newCurrentSeqNr = u.seqNr + 1 + tmp.clear() + } + } + copy(currentSeqNr = newCurrentSeqNr, unconfirmed = newUnconfirmed.result()) + } + } + } /** * INTERNAL API @@ -77,13 +131,92 @@ object DurableProducerQueue { /** * The fact (event) that a message has been sent. */ - final case class MessageSent[A]( - seqNr: SeqNr, - message: A, - ack: Boolean, - confirmationQualifier: ConfirmationQualifier, - timestampMillis: TimestampMillis) - extends Event + final class MessageSent[A]( + val seqNr: SeqNr, + val message: MessageSent.MessageOrChunk, + val ack: Boolean, + val confirmationQualifier: ConfirmationQualifier, + val timestampMillis: TimestampMillis) + extends Event { + + /** INTERNAL API */ + @InternalApi private[akka] def isFirstChunk: Boolean = { + message match { + case c: ChunkedMessage => c.firstChunk + case _ => true + } + } + + /** INTERNAL API */ + @InternalApi private[akka] def isLastChunk: Boolean = { + message match { + case c: ChunkedMessage => c.lastChunk + case _ => true + } + } + + def withConfirmationQualifier(qualifier: ConfirmationQualifier): MessageSent[A] = + new MessageSent(seqNr, message, ack, qualifier, timestampMillis) + + def withTimestampMillis(timestamp: TimestampMillis): MessageSent[A] = + new MessageSent(seqNr, message, ack, confirmationQualifier, timestamp) + + override def equals(obj: Any): Boolean = { + obj match { + case other: MessageSent[_] => + seqNr == other.seqNr && message == other.message && ack == other.ack && confirmationQualifier == other.confirmationQualifier && timestampMillis == other.timestampMillis + case _ => false + } + } + + override def hashCode(): Int = seqNr.hashCode() + + override def toString: ConfirmationQualifier = + s"MessageSent($seqNr,$message,$ack,$confirmationQualifier,$timestampMillis)" + + } + + object MessageSent { + + /** + * SequencedMessage.message can be `A` or `ChunkedMessage`. + */ + type MessageOrChunk = Any + + def apply[A]( + seqNr: SeqNr, + message: A, + ack: Boolean, + confirmationQualifier: ConfirmationQualifier, + timestampMillis: TimestampMillis): MessageSent[A] = + new MessageSent(seqNr, message, ack, confirmationQualifier, timestampMillis) + + /** + * INTERNAL API + */ + @InternalApi private[akka] def fromChunked[A]( + seqNr: SeqNr, + chunkedMessage: ChunkedMessage, + ack: Boolean, + confirmationQualifier: ConfirmationQualifier, + timestampMillis: TimestampMillis): MessageSent[A] = + new MessageSent(seqNr, chunkedMessage, ack, confirmationQualifier, timestampMillis) + + /** + * INTERNAL API + */ + @InternalApi private[akka] def fromMessageOrChunked[A]( + seqNr: SeqNr, + message: MessageOrChunk, + ack: Boolean, + confirmationQualifier: ConfirmationQualifier, + timestampMillis: TimestampMillis): MessageSent[A] = + new MessageSent(seqNr, message, ack, confirmationQualifier, timestampMillis) + + def unapply( + sent: MessageSent[_]): Option[(SeqNr, MessageOrChunk, Boolean, ConfirmationQualifier, TimestampMillis)] = + Some((sent.seqNr, sent.message, sent.ack, sent.confirmationQualifier, sent.timestampMillis)) + } /** * INTERNAL API: The fact (event) that a message has been confirmed to be delivered and processed. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala index c9eca025dd..3a21237fe1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/ProducerController.scala @@ -21,6 +21,8 @@ import akka.actor.typed.delivery.internal.ProducerControllerImpl import akka.actor.typed.scaladsl.Behaviors import akka.annotation.ApiMayChange import akka.annotation.InternalApi +import akka.util.Helpers.toRootLowerCase +import akka.util.Helpers.Requiring import akka.util.JavaDurationConverters._ /** @@ -153,10 +155,16 @@ object ProducerController { * `akka.reliable-delivery.producer-controller`. */ def apply(config: Config): Settings = { + val chunkLargeMessagesBytes = toRootLowerCase(config.getString("chunk-large-messages")) match { + case "off" => 0 + case _ => + config.getBytes("chunk-large-messages").requiring(_ <= Int.MaxValue, "Too large chunk-large-messages.").toInt + } new Settings( durableQueueRequestTimeout = config.getDuration("durable-queue.request-timeout").asScala, durableQueueRetryAttempts = config.getInt("durable-queue.retry-attempts"), - durableQueueResendFirstInterval = config.getDuration("durable-queue.resend-first-interval").asScala) + durableQueueResendFirstInterval = config.getDuration("durable-queue.resend-first-interval").asScala, + chunkLargeMessagesBytes) } /** @@ -177,7 +185,8 @@ object ProducerController { final class Settings private ( val durableQueueRequestTimeout: FiniteDuration, val durableQueueRetryAttempts: Int, - val durableQueueResendFirstInterval: FiniteDuration) { + val durableQueueResendFirstInterval: FiniteDuration, + val chunkLargeMessagesBytes: Int) { def withDurableQueueRetryAttempts(newDurableQueueRetryAttempts: Int): Settings = copy(durableQueueRetryAttempts = newDurableQueueRetryAttempts) @@ -212,17 +221,25 @@ object ProducerController { def getDurableQueueRequestTimeout(): JavaDuration = durableQueueRequestTimeout.asJava + def withChunkLargeMessagesBytes(newChunkLargeMessagesBytes: Int): Settings = + copy(chunkLargeMessagesBytes = newChunkLargeMessagesBytes) + /** * Private copy method for internal use only. */ private def copy( durableQueueRequestTimeout: FiniteDuration = durableQueueRequestTimeout, durableQueueRetryAttempts: Int = durableQueueRetryAttempts, - durableQueueResendFirstInterval: FiniteDuration = durableQueueResendFirstInterval) = - new Settings(durableQueueRequestTimeout, durableQueueRetryAttempts, durableQueueResendFirstInterval) + durableQueueResendFirstInterval: FiniteDuration = durableQueueResendFirstInterval, + chunkLargeMessagesBytes: Int = chunkLargeMessagesBytes) = + new Settings( + durableQueueRequestTimeout, + durableQueueRetryAttempts, + durableQueueResendFirstInterval, + chunkLargeMessagesBytes) override def toString: String = - s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts, $durableQueueResendFirstInterval)" + s"Settings($durableQueueRequestTimeout, $durableQueueRetryAttempts, $durableQueueResendFirstInterval, $chunkLargeMessagesBytes)" } def apply[A: ClassTag]( diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala index 9135adf33c..2a4383615f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/WorkPullingProducerController.scala @@ -178,6 +178,9 @@ object WorkPullingProducerController { val internalAskTimeout: FiniteDuration, val producerControllerSettings: ProducerController.Settings) { + if (producerControllerSettings.chunkLargeMessagesBytes > 0) + throw new IllegalArgumentException("Chunked messages not implemented for work-pulling yet.") + def withBufferSize(newBufferSize: Int): Settings = copy(bufferSize = newBufferSize) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ChunkedMessage.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ChunkedMessage.scala new file mode 100644 index 0000000000..63f1416e8d --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ChunkedMessage.scala @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.actor.typed.delivery.internal + +import akka.annotation.InternalApi +import akka.util.ByteString + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class ChunkedMessage( + serialized: ByteString, + firstChunk: Boolean, + lastChunk: Boolean, + serializerId: Int, + manifest: String) { + + override def toString: String = + s"ChunkedMessage(${serialized.size},$firstChunk,$lastChunk,$serializerId,$manifest)" +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala index 362b6efc87..230c989dec 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala @@ -21,6 +21,8 @@ import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.StashBuffer import akka.actor.typed.scaladsl.TimerScheduler import akka.annotation.InternalApi +import akka.serialization.SerializationExtension +import akka.util.ByteString import akka.util.ConstantFun.scalaIdentityFunction /** @@ -86,6 +88,7 @@ import akka.util.ConstantFun.scalaIdentityFunction receivedSeqNr: SeqNr, confirmedSeqNr: SeqNr, requestedSeqNr: SeqNr, + collectedChunks: List[SequencedMessage[A]], registering: Option[ActorRef[ProducerController.Command[A]]], stopping: Boolean) { @@ -101,6 +104,11 @@ import akka.util.ConstantFun.scalaIdentityFunction case s @ Some(reg) => if (seqMsg.producerController == reg) None else s } } + + def clearCollectedChunks(): State[A] = { + if (collectedChunks == Nil) this + else copy(collectedChunks = Nil) + } } def apply[A]( @@ -202,6 +210,7 @@ import akka.util.ConstantFun.scalaIdentityFunction receivedSeqNr = 0, confirmedSeqNr = 0, requestedSeqNr = 0, + collectedChunks = Nil, registering, stopping) } @@ -269,6 +278,8 @@ private class ConsumerControllerImpl[A]( private val traceEnabled = context.log.isTraceEnabled + private lazy val serialization = SerializationExtension(context.system) + retryTimer.start() private def resendLost = !settings.onlyFlowControl @@ -475,8 +486,56 @@ private class ConsumerControllerImpl[A]( } private def deliver(s: State[A], seqMsg: SequencedMessage[A]): Behavior[InternalCommand] = { - s.consumer ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr) - waitingForConfirmation(s, seqMsg) + def previouslyCollectedChunks = if (seqMsg.isFirstChunk) Nil else s.collectedChunks + if (seqMsg.isLastChunk) { + val assembledSeqMsg = + if (seqMsg.message.isInstanceOf[ChunkedMessage]) assembleChunks(seqMsg :: previouslyCollectedChunks) + else seqMsg + s.consumer ! Delivery(assembledSeqMsg.message.asInstanceOf[A], context.self, seqMsg.producerId, seqMsg.seqNr) + waitingForConfirmation(s.clearCollectedChunks(), assembledSeqMsg) + } else { + // collecting chunks + val newRequestedSeqNr = + if ((s.requestedSeqNr - seqMsg.seqNr) == flowControlWindow / 2) { + val newRequestedSeqNr = s.requestedSeqNr + flowControlWindow / 2 + flightRecorder.consumerSentRequest(seqMsg.producerId, newRequestedSeqNr) + context.log.debugN( + "Sending Request when collecting chunks seqNr [{}], confirmedSeqNr [{}], requestUpToSeqNr [{}].", + seqMsg.seqNr, + s.confirmedSeqNr, + newRequestedSeqNr) + s.producerController ! Request( + confirmedSeqNr = s.confirmedSeqNr, + newRequestedSeqNr, + resendLost, + viaTimeout = false) + retryTimer.start() // reset interval since Request was just sent + newRequestedSeqNr + } else { + s.requestedSeqNr + } + + stashBuffer.unstash( + active(s.copy(collectedChunks = seqMsg :: previouslyCollectedChunks, requestedSeqNr = newRequestedSeqNr)), + 1, + scalaIdentityFunction) + } + } + + private def assembleChunks(collectedChunks: List[SequencedMessage[A]]): SequencedMessage[A] = { + val reverseCollectedChunks = collectedChunks.reverse + val builder = ByteString.createBuilder + reverseCollectedChunks.foreach { seqMsg => + builder ++= seqMsg.message.asInstanceOf[ChunkedMessage].serialized + } + val bytes = builder.result().toArray + val head = collectedChunks.head // this is the last chunk + val headMessage = head.message.asInstanceOf[ChunkedMessage] + // serialization exceptions are thrown, because it will anyway be stuck with same error if retried and + // we can't just ignore the message + val message = serialization.deserialize(bytes, headMessage.serializerId, headMessage.manifest).get + SequencedMessage(head.producerId, head.seqNr, message, reverseCollectedChunks.head.first, head.ack)( + head.producerController) } // The message has been delivered to the consumer and it is now waiting for Confirmed from @@ -564,7 +623,7 @@ private class ConsumerControllerImpl[A]( Behaviors.same case start: Start[A] => - start.deliverTo ! Delivery(seqMsg.message, context.self, seqMsg.producerId, seqMsg.seqNr) + start.deliverTo ! Delivery(seqMsg.message.asInstanceOf[A], context.self, seqMsg.producerId, seqMsg.seqNr) receiveStart(s, start, newState => waitingForConfirmation(newState, seqMsg)) case ConsumerTerminated(c) => diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala index c08aaa3908..aecd1154c1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -6,6 +6,7 @@ package akka.actor.typed.delivery.internal import java.util.concurrent.TimeoutException +import scala.collection.immutable import scala.reflect.ClassTag import scala.util.Failure import scala.util.Success @@ -23,6 +24,10 @@ import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.LoggerOps import akka.actor.typed.scaladsl.TimerScheduler +import akka.serialization.Serialization +import akka.serialization.SerializationExtension +import akka.serialization.Serializers +import akka.util.ByteString import akka.util.Timeout /** @@ -100,16 +105,16 @@ object ProducerControllerImpl { private case class Msg[A](msg: A) extends InternalCommand private case object ResendFirst extends InternalCommand case object ResendFirstUnconfirmed extends InternalCommand + private case object SendChunk extends InternalCommand private case class LoadStateReply[A](state: DurableProducerQueue.State[A]) extends InternalCommand private case class LoadStateFailed(attempt: Int) extends InternalCommand private case class StoreMessageSentReply(ack: DurableProducerQueue.StoreMessageSentAck) private case class StoreMessageSentFailed[A](messageSent: DurableProducerQueue.MessageSent[A], attempt: Int) extends InternalCommand - private case object DurableQueueTerminated extends InternalCommand - private case class StoreMessageSentCompleted[A](messageSent: DurableProducerQueue.MessageSent[A]) extends InternalCommand + private case object DurableQueueTerminated extends InternalCommand private final case class State[A]( requested: Boolean, @@ -119,6 +124,8 @@ object ProducerControllerImpl { replyAfterStore: Map[SeqNr, ActorRef[SeqNr]], supportResend: Boolean, unconfirmed: Vector[ConsumerController.SequencedMessage[A]], + remainingChunks: immutable.Seq[SequencedMessage[A]], + storeMessageSentInProgress: SeqNr, firstSeqNr: SeqNr, producer: ActorRef[ProducerController.RequestNext[A]], send: ConsumerController.SequencedMessage[A] => Unit) @@ -236,6 +243,8 @@ object ProducerControllerImpl { replyAfterStore = Map.empty, supportResend = true, unconfirmed = unconfirmed, + remainingChunks = Nil, + storeMessageSentInProgress = 0, firstSeqNr = loadedState.highestConfirmedSeqNr + 1, producer, send) @@ -329,6 +338,30 @@ object ProducerControllerImpl { throw new IllegalArgumentException(s"Consumer [$ref] should be local.") } + def createChunks[A](m: A, chunkSize: Int, serialization: Serialization): immutable.Seq[ChunkedMessage] = { + val mAnyRef = m.asInstanceOf[AnyRef] + // serialization exceptions are thrown + val bytes = serialization.serialize(mAnyRef).get + val ser = serialization.findSerializerFor(mAnyRef) + val manifest = Serializers.manifestFor(ser, mAnyRef) + val serializerId = ser.identifier + if (bytes.length <= chunkSize) { + ChunkedMessage(ByteString(bytes), firstChunk = true, lastChunk = true, serializerId, manifest) :: Nil + } else { + val builder = Vector.newBuilder[ChunkedMessage] + val chunksIter = ByteString(bytes).grouped(chunkSize) + var first = true + while (chunksIter.hasNext) { + val chunk = chunksIter.next() + val firstChunk = first + first = false + val lastChunk = !chunksIter.hasNext + builder += ChunkedMessage(chunk, firstChunk, lastChunk, serializerId, manifest) + } + builder.result() + } + } + } private class ProducerControllerImpl[A: ClassTag]( @@ -356,13 +389,20 @@ private class ProducerControllerImpl[A: ClassTag]( // for the durableQueue StoreMessageSent ask private implicit val askTimeout: Timeout = settings.durableQueueRequestTimeout + private lazy val serialization = SerializationExtension(context.system) + private def active(s: State[A]): Behavior[InternalCommand] = { - def onMsg(m: A, newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]], ack: Boolean): Behavior[InternalCommand] = { + def onMsg( + seqMsg: SequencedMessage[A], + newReplyAfterStore: Map[SeqNr, ActorRef[SeqNr]], + newRemainingChunks: immutable.Seq[SequencedMessage[A]]): Behavior[InternalCommand] = { checkOnMsgRequestedState() + if (seqMsg.isLastChunk != newRemainingChunks.isEmpty) + throw new IllegalStateException( + s"seqMsg [${seqMsg.seqNr}] was lastChunk but remaining [${newRemainingChunks.size}] chunks.") if (traceEnabled) - context.log.trace("Sending [{}] with seqNr [{}].", m.getClass.getName, s.currentSeqNr) - val seqMsg = SequencedMessage(producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self) + context.log.trace("Sending [{}] with seqNr [{}].", seqMsg.message.getClass.getName, s.currentSeqNr) val newUnconfirmed = if (s.supportResend) s.unconfirmed :+ seqMsg else Vector.empty // no resending, no need to keep unconfirmed @@ -375,18 +415,24 @@ private class ProducerControllerImpl[A: ClassTag]( val newRequested = if (s.currentSeqNr == s.requestedSeqNr) { flightRecorder.producerWaitingForRequest(producerId, s.currentSeqNr) - false - } else { + newRemainingChunks.nonEmpty // keep it true until lastChunk + } else if (seqMsg.isLastChunk) { flightRecorder.producerRequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr) s.producer ! RequestNext(producerId, s.currentSeqNr + 1, s.confirmedSeqNr, msgAdapter, context.self) true + } else { + context.self ! SendChunk + true // keep it true until lastChunk } + active( s.copy( requested = newRequested, currentSeqNr = s.currentSeqNr + 1, replyAfterStore = newReplyAfterStore, - unconfirmed = newUnconfirmed)) + unconfirmed = newUnconfirmed, + remainingChunks = newRemainingChunks, + storeMessageSentInProgress = 0)) } def checkOnMsgRequestedState(): Unit = { @@ -397,6 +443,12 @@ private class ProducerControllerImpl[A: ClassTag]( } } + def checkReceiveMessageRemainingChunksState(): Unit = { + if (s.remainingChunks.nonEmpty) + throw new IllegalStateException( + s"Received unexpected message before sending remaining [${s.remainingChunks.size}] chunks.") + } + def receiveRequest( newConfirmedSeqNr: SeqNr, newRequestedSeqNr: SeqNr, @@ -434,13 +486,23 @@ private class ProducerControllerImpl[A: ClassTag]( stateAfterAck.currentSeqNr) if (newRequestedSeqNr2 > s.requestedSeqNr) { - if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) { - flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr) - s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self) - } + val newRequested = + if (s.storeMessageSentInProgress != 0) { + s.requested + } else if (s.remainingChunks.nonEmpty) { + context.self ! SendChunk + s.requested + } else if (!s.requested && (newRequestedSeqNr2 - s.currentSeqNr) > 0) { + flightRecorder.producerRequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr) + s.producer ! RequestNext(producerId, s.currentSeqNr, newConfirmedSeqNr, msgAdapter, context.self) + true + } else { + s.requested + } + active( stateAfterAck.copy( - requested = true, + requested = newRequested, requestedSeqNr = newRequestedSeqNr2, supportResend = supportResend, unconfirmed = newUnconfirmed)) @@ -486,29 +548,97 @@ private class ProducerControllerImpl[A: ClassTag]( s.copy(confirmedSeqNr = newMaxConfirmedSeqNr, replyAfterStore = newReplyAfterStore, unconfirmed = newUnconfirmed) } - def receiveStoreMessageSentCompleted(seqNr: SeqNr, m: A, ack: Boolean) = { - if (seqNr != s.currentSeqNr) - throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]") + def receiveStoreMessageSentCompleted(seqNr: SeqNr): Behavior[InternalCommand] = { + if (seqNr == s.storeMessageSentInProgress) { + if (seqNr != s.currentSeqNr) + throw new IllegalStateException(s"currentSeqNr [${s.currentSeqNr}] not matching stored seqNr [$seqNr]") + val seqMsg = s.remainingChunks.head + if (seqNr != seqMsg.seqNr) + throw new IllegalStateException(s"seqNr [${seqMsg.seqNr}] not matching stored seqNr [$seqNr]") - s.replyAfterStore.get(seqNr).foreach { replyTo => - if (traceEnabled) - context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr) - replyTo ! seqNr + s.replyAfterStore.get(seqNr).foreach { replyTo => + if (traceEnabled) + context.log.trace("Sending confirmation reply to [{}] after storage.", seqNr) + replyTo ! seqNr + } + val newReplyAfterStore = s.replyAfterStore - seqNr + + onMsg(seqMsg, newReplyAfterStore, s.remainingChunks.tail) + } else { + context.log.debug( + "Received StoreMessageSentCompleted for seqNr [{}] but waiting for [{}]. " + + "Probably due to retry.", + seqNr, + s.storeMessageSentInProgress) + Behaviors.same } - val newReplyAfterStore = s.replyAfterStore - seqNr + } - onMsg(m, newReplyAfterStore, ack) + def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = { + if (f.messageSent.seqNr == s.storeMessageSentInProgress) { + if (f.attempt >= settings.durableQueueRetryAttempts) { + val errorMessage = + s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up." + context.log.error(errorMessage) + throw new TimeoutException(errorMessage) + } else { + context.log.warnN( + "StoreMessageSent seqNr [{}] failed, attempt [{}] of [{}], retrying.", + f.messageSent.seqNr, + f.attempt, + settings.durableQueueRetryAttempts) + // retry + if (f.messageSent.isFirstChunk) { + storeMessageSent(f.messageSent, attempt = f.attempt + 1) + Behaviors.same + } else { + // store all chunks again, because partially stored chunks are discarded by the DurableQueue + // when it's restarted + val unconfirmedReverse = s.unconfirmed.reverse + val xs = unconfirmedReverse.takeWhile(!_.isFirstChunk) + if (unconfirmedReverse.size == xs.size) + throw new IllegalStateException(s"First chunk not found in unconfirmed: ${s.unconfirmed}") + val firstChunk = unconfirmedReverse.drop(xs.size).head + val newRemainingChunks = (firstChunk +: xs.reverse) ++ s.remainingChunks + val newUnconfirmed = s.unconfirmed.dropRight(xs.size + 1) + + context.log.debug( + "Store all [{}] chunks again, starting at seqNr [{}].", + newRemainingChunks.size, + firstChunk.seqNr) + + if (!newRemainingChunks.head.isFirstChunk || !newRemainingChunks.last.isLastChunk) + throw new IllegalStateException(s"Wrong remainingChunks: $newRemainingChunks") + + storeMessageSent( + MessageSent.fromMessageOrChunked( + firstChunk.seqNr, + firstChunk.message, + firstChunk.ack, + NoQualifier, + System.currentTimeMillis()), + attempt = f.attempt + 1) + active( + s.copy( + storeMessageSentInProgress = firstChunk.seqNr, + remainingChunks = newRemainingChunks, + unconfirmed = newUnconfirmed, + currentSeqNr = firstChunk.seqNr)) + } + } + } else { + Behaviors.same + } } def receiveResend(fromSeqNr: SeqNr): Behavior[InternalCommand] = { flightRecorder.producerReceivedResend(producerId, fromSeqNr) - val newUnconfirmed = - if (fromSeqNr == 0 && s.unconfirmed.nonEmpty) - s.unconfirmed.head.asFirst +: s.unconfirmed.tail - else - s.unconfirmed.dropWhile(_.seqNr < fromSeqNr) - resendUnconfirmed(newUnconfirmed) - active(s.copy(unconfirmed = newUnconfirmed)) + resendUnconfirmed(s.unconfirmed.dropWhile(_.seqNr < fromSeqNr)) + if (fromSeqNr == 0 && s.unconfirmed.nonEmpty) { + val newUnconfirmed = s.unconfirmed.head.asFirst +: s.unconfirmed.tail + active(s.copy(unconfirmed = newUnconfirmed)) + } else + Behaviors.same } def resendUnconfirmed(newUnconfirmed: Vector[SequencedMessage[A]]): Unit = { @@ -545,7 +675,7 @@ private class ProducerControllerImpl[A: ClassTag]( def receiveStart(start: Start[A]): Behavior[InternalCommand] = { ProducerControllerImpl.enforceLocalProducer(start.producer) context.log.debug("Register new Producer [{}], currentSeqNr [{}].", start.producer, s.currentSeqNr) - if (s.requested) { + if (s.requested && s.remainingChunks.isEmpty) { flightRecorder.producerRequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr) start.producer ! RequestNext(producerId, s.currentSeqNr, s.confirmedSeqNr, msgAdapter, context.self) } @@ -570,32 +700,101 @@ private class ProducerControllerImpl[A: ClassTag]( active(s.copy(firstSeqNr = newFirstSeqNr, send = newSend)) } + def receiveSendChunk(): Behavior[InternalCommand] = { + if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <= s.requestedSeqNr && s.storeMessageSentInProgress == 0) { + if (traceEnabled) + context.log.trace("Send next chunk seqNr [{}].", s.remainingChunks.head.seqNr) + if (durableQueue.isEmpty) { + onMsg(s.remainingChunks.head, s.replyAfterStore, s.remainingChunks.tail) + } else { + val seqMsg = s.remainingChunks.head + storeMessageSent( + MessageSent + .fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()), + attempt = 1) + active(s.copy(storeMessageSentInProgress = seqMsg.seqNr)) // still same s.remainingChunks + } + } else { + Behaviors.same + } + } + + def chunk(m: A, ack: Boolean): immutable.Seq[SequencedMessage[A]] = { + val chunkSize = settings.chunkLargeMessagesBytes + if (chunkSize > 0) { + val chunkedMessages = createChunks(m, chunkSize, serialization) + + if (traceEnabled) { + if (chunkedMessages.size == 1) + context.log.trace( + "No chunking of seqNr [{}], size [{} bytes].", + s.currentSeqNr, + chunkedMessages.head.serialized.size) + else + context.log.traceN( + "Chunked seqNr [{}] into [{}] pieces, total size [{} bytes].", + s.currentSeqNr, + chunkedMessages.size, + chunkedMessages.map(_.serialized.size).sum) + } + + var i = 0 + chunkedMessages.map { chunkedMessage => + val seqNr = s.currentSeqNr + i + i += 1 + SequencedMessage.fromChunked[A]( + producerId, + seqNr, + chunkedMessage, + seqNr == s.firstSeqNr, + ack && chunkedMessage.lastChunk, // only the last need ack = true + context.self) + } + } else { + val seqMsg = + SequencedMessage[A](producerId, s.currentSeqNr, m, s.currentSeqNr == s.firstSeqNr, ack)(context.self) + seqMsg :: Nil + } + } + Behaviors.receiveMessage { case MessageWithConfirmation(m: A, replyTo) => + checkReceiveMessageRemainingChunksState() flightRecorder.producerReceived(producerId, s.currentSeqNr) - val newReplyAfterStore = s.replyAfterStore.updated(s.currentSeqNr, replyTo) + val chunks = chunk(m, ack = true) + val newReplyAfterStore = s.replyAfterStore.updated(chunks.last.seqNr, replyTo) if (durableQueue.isEmpty) { - onMsg(m, newReplyAfterStore, ack = true) + onMsg(chunks.head, newReplyAfterStore, chunks.tail) } else { + val seqMsg = chunks.head storeMessageSent( - MessageSent(s.currentSeqNr, m, ack = true, NoQualifier, System.currentTimeMillis()), + MessageSent + .fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()), attempt = 1) - active(s.copy(replyAfterStore = newReplyAfterStore)) + active( + s.copy( + replyAfterStore = newReplyAfterStore, + remainingChunks = chunks, + storeMessageSentInProgress = seqMsg.seqNr)) } case Msg(m: A) => + checkReceiveMessageRemainingChunksState() flightRecorder.producerReceived(producerId, s.currentSeqNr) + val chunks = chunk(m, ack = false) if (durableQueue.isEmpty) { - onMsg(m, s.replyAfterStore, ack = false) + onMsg(chunks.head, s.replyAfterStore, chunks.tail) } else { + val seqMsg = chunks.head storeMessageSent( - MessageSent(s.currentSeqNr, m, ack = false, NoQualifier, System.currentTimeMillis()), + MessageSent + .fromMessageOrChunked(seqMsg.seqNr, seqMsg.message, seqMsg.ack, NoQualifier, System.currentTimeMillis()), attempt = 1) - Behaviors.same + active(s.copy(remainingChunks = chunks, storeMessageSentInProgress = seqMsg.seqNr)) } - case StoreMessageSentCompleted(MessageSent(seqNr, m: A, ack, NoQualifier, _)) => - receiveStoreMessageSentCompleted(seqNr, m, ack) + case StoreMessageSentCompleted(sent: MessageSent[_]) => + receiveStoreMessageSentCompleted(sent.seqNr) case f: StoreMessageSentFailed[A] => receiveStoreMessageSentFailed(f) @@ -606,6 +805,9 @@ private class ProducerControllerImpl[A: ClassTag]( case Ack(newConfirmedSeqNr) => receiveAck(newConfirmedSeqNr) + case SendChunk => + receiveSendChunk() + case Resend(fromSeqNr) => receiveResend(fromSeqNr) @@ -626,24 +828,6 @@ private class ProducerControllerImpl[A: ClassTag]( } } - private def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = { - if (f.attempt >= settings.durableQueueRetryAttempts) { - val errorMessage = - s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up." - context.log.error(errorMessage) - throw new TimeoutException(errorMessage) - } else { - context.log.warnN( - "StoreMessageSent seqNr [{}] failed, attempt [{}] of [{}], retrying.", - f.messageSent.seqNr, - f.attempt, - settings.durableQueueRetryAttempts) - // retry - storeMessageSent(f.messageSent, attempt = f.attempt + 1) - Behaviors.same - } - } - private def storeMessageSent(messageSent: MessageSent[A], attempt: Int): Unit = { context.ask[StoreMessageSent[A], StoreMessageSentAck]( durableQueue.get, diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala index d6c846b83c..3abb7c8363 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala @@ -278,8 +278,9 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( import WorkPullingProducerController.WorkerStats import WorkPullingProducerControllerImpl._ + private val producerControllerSettings = settings.producerControllerSettings private val traceEnabled = context.log.isTraceEnabled - private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout + private val durableQueueAskTimeout: Timeout = producerControllerSettings.durableQueueRequestTimeout private val workerAskTimeout: Timeout = settings.internalAskTimeout private val workerRequestNextAdapter: ActorRef[ProducerController.RequestNext[A]] = @@ -556,7 +557,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( val outKey = s"$producerId-$uuid" context.log.debug2("Registered worker [{}], with producerId [{}].", c, outKey) val p = context.spawn( - ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings), + ProducerController[A](outKey, durableQueueBehavior = None, producerControllerSettings), uuid, DispatcherSelector.sameAsParent()) p ! ProducerController.Start(workerRequestNextAdapter) @@ -657,7 +658,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( } private def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = { - if (f.attempt >= settings.producerControllerSettings.durableQueueRetryAttempts) { + if (f.attempt >= producerControllerSettings.durableQueueRetryAttempts) { val errorMessage = s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up." context.log.error(errorMessage) diff --git a/akka-cluster-sharding-typed/src/main/resources/reference.conf b/akka-cluster-sharding-typed/src/main/resources/reference.conf index c8dc27eb3b..b108f9cc6b 100644 --- a/akka-cluster-sharding-typed/src/main/resources/reference.conf +++ b/akka-cluster-sharding-typed/src/main/resources/reference.conf @@ -63,6 +63,10 @@ akka.reliable-delivery { # unconfirmed messages the ShardingConsumerController has to "wake up" # the consumer again by resending the first unconfirmed message. resend-first-unconfirmed-idle-timeout = 10s + + # Chunked messages not implemented for sharding yet. Override to not + # propagate property from akka.reliable-delivery.producer-controller. + chunk-large-messages = off } consumer-controller = ${akka.reliable-delivery.consumer-controller} diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala index a00e16fa58..9731277c9e 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/ShardingProducerController.scala @@ -203,6 +203,9 @@ object ShardingProducerController { val resendFirsUnconfirmedIdleTimeout: FiniteDuration, val producerControllerSettings: ProducerController.Settings) { + if (producerControllerSettings.chunkLargeMessagesBytes > 0) + throw new IllegalArgumentException("Chunked messages not implemented for sharding yet.") + def withBufferSize(newBufferSize: Int): Settings = copy(bufferSize = newBufferSize) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala index 395a5c47ed..9eb1ac61c1 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/delivery/internal/ShardingProducerControllerImpl.scala @@ -279,7 +279,8 @@ private class ShardingProducerControllerImpl[A: ClassTag]( import ShardingProducerController.Start import ShardingProducerControllerImpl._ - private val durableQueueAskTimeout: Timeout = settings.producerControllerSettings.durableQueueRequestTimeout + private val producerControllerSettings = settings.producerControllerSettings + private val durableQueueAskTimeout: Timeout = producerControllerSettings.durableQueueRequestTimeout private val entityAskTimeout: Timeout = settings.internalAskTimeout private val traceEnabled = context.log.isTraceEnabled @@ -333,7 +334,7 @@ private class ShardingProducerControllerImpl[A: ClassTag]( region ! ShardingEnvelope(entityId, seqMsg) } val p = context.spawn( - ProducerController[A](outKey, durableQueueBehavior = None, settings.producerControllerSettings, send), + ProducerController[A](outKey, durableQueueBehavior = None, producerControllerSettings, send), entityId, DispatcherSelector.sameAsParent()) p ! ProducerController.Start(requestNextAdapter) @@ -389,7 +390,7 @@ private class ShardingProducerControllerImpl[A: ClassTag]( } def receiveStoreMessageSentFailed(f: StoreMessageSentFailed[A]): Behavior[InternalCommand] = { - if (f.attempt >= settings.producerControllerSettings.durableQueueRetryAttempts) { + if (f.attempt >= producerControllerSettings.durableQueueRetryAttempts) { val errorMessage = s"StoreMessageSentFailed seqNr [${f.messageSent.seqNr}] failed after [${f.attempt}] attempts, giving up." context.log.error(errorMessage) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala index 175e2cae6b..829dee5561 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/delivery/ReliableDeliveryShardingSpec.scala @@ -110,7 +110,7 @@ class ReliableDeliveryShardingSpec "illustrate sharding usage" in { nextId() - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val typeKey = EntityTypeKey[SequencedMessage[TestConsumer.Job]](s"TestConsumer-$idCount") val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] = ClusterSharding(system).init(Entity(typeKey)(_ => @@ -130,7 +130,7 @@ class ReliableDeliveryShardingSpec "illustrate sharding usage with several producers" in { nextId() - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val typeKey = EntityTypeKey[SequencedMessage[TestConsumer.Job]](s"TestConsumer-$idCount") val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] = ClusterSharding(system).init(Entity(typeKey)(_ => @@ -175,7 +175,7 @@ class ReliableDeliveryShardingSpec "reply to MessageWithConfirmation" in { nextId() - val consumerEndProbe = createTestProbe[TestConsumer.CollectedProducerIds]() + val consumerEndProbe = createTestProbe[TestConsumer.Collected]() val typeKey = EntityTypeKey[SequencedMessage[TestConsumer.Job]](s"TestConsumer-$idCount") val sharding: ActorRef[ShardingEnvelope[SequencedMessage[TestConsumer.Job]]] = ClusterSharding(system).init(Entity(typeKey)(_ => diff --git a/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ReliableDelivery.java b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ReliableDelivery.java index 7f3407d223..bfa4e834c9 100644 --- a/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ReliableDelivery.java +++ b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ReliableDelivery.java @@ -103,6 +103,28 @@ public final class ReliableDelivery { * required .Payload message = 6; */ akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder(); + + /** + * optional bool firstChunk = 7; + * @return Whether the firstChunk field is set. + */ + boolean hasFirstChunk(); + /** + * optional bool firstChunk = 7; + * @return The firstChunk. + */ + boolean getFirstChunk(); + + /** + * optional bool lastChunk = 8; + * @return Whether the lastChunk field is set. + */ + boolean hasLastChunk(); + /** + * optional bool lastChunk = 8; + * @return The lastChunk. + */ + boolean getLastChunk(); } /** *
@@ -196,6 +218,16 @@ public final class ReliableDelivery {
               bitField0_ |= 0x00000020;
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              firstChunk_ = input.readBool();
+              break;
+            }
+            case 64: {
+              bitField0_ |= 0x00000080;
+              lastChunk_ = input.readBool();
+              break;
+            }
             default: {
               if (!parseUnknownField(
                   input, unknownFields, extensionRegistry, tag)) {
@@ -393,6 +425,40 @@ public final class ReliableDelivery {
       return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
     }
 
+    public static final int FIRSTCHUNK_FIELD_NUMBER = 7;
+    private boolean firstChunk_;
+    /**
+     * optional bool firstChunk = 7;
+     * @return Whether the firstChunk field is set.
+     */
+    public boolean hasFirstChunk() {
+      return ((bitField0_ & 0x00000040) != 0);
+    }
+    /**
+     * optional bool firstChunk = 7;
+     * @return The firstChunk.
+     */
+    public boolean getFirstChunk() {
+      return firstChunk_;
+    }
+
+    public static final int LASTCHUNK_FIELD_NUMBER = 8;
+    private boolean lastChunk_;
+    /**
+     * optional bool lastChunk = 8;
+     * @return Whether the lastChunk field is set.
+     */
+    public boolean hasLastChunk() {
+      return ((bitField0_ & 0x00000080) != 0);
+    }
+    /**
+     * optional bool lastChunk = 8;
+     * @return The lastChunk.
+     */
+    public boolean getLastChunk() {
+      return lastChunk_;
+    }
+
     private byte memoizedIsInitialized = -1;
     @java.lang.Override
     public final boolean isInitialized() {
@@ -453,6 +519,12 @@ public final class ReliableDelivery {
       if (((bitField0_ & 0x00000020) != 0)) {
         output.writeMessage(6, getMessage());
       }
+      if (((bitField0_ & 0x00000040) != 0)) {
+        output.writeBool(7, firstChunk_);
+      }
+      if (((bitField0_ & 0x00000080) != 0)) {
+        output.writeBool(8, lastChunk_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -484,6 +556,14 @@ public final class ReliableDelivery {
         size += akka.protobufv3.internal.CodedOutputStream
           .computeMessageSize(6, getMessage());
       }
+      if (((bitField0_ & 0x00000040) != 0)) {
+        size += akka.protobufv3.internal.CodedOutputStream
+          .computeBoolSize(7, firstChunk_);
+      }
+      if (((bitField0_ & 0x00000080) != 0)) {
+        size += akka.protobufv3.internal.CodedOutputStream
+          .computeBoolSize(8, lastChunk_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -529,6 +609,16 @@ public final class ReliableDelivery {
         if (!getMessage()
             .equals(other.getMessage())) return false;
       }
+      if (hasFirstChunk() != other.hasFirstChunk()) return false;
+      if (hasFirstChunk()) {
+        if (getFirstChunk()
+            != other.getFirstChunk()) return false;
+      }
+      if (hasLastChunk() != other.hasLastChunk()) return false;
+      if (hasLastChunk()) {
+        if (getLastChunk()
+            != other.getLastChunk()) return false;
+      }
       if (!unknownFields.equals(other.unknownFields)) return false;
       return true;
     }
@@ -567,6 +657,16 @@ public final class ReliableDelivery {
         hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
         hash = (53 * hash) + getMessage().hashCode();
       }
+      if (hasFirstChunk()) {
+        hash = (37 * hash) + FIRSTCHUNK_FIELD_NUMBER;
+        hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
+            getFirstChunk());
+      }
+      if (hasLastChunk()) {
+        hash = (37 * hash) + LASTCHUNK_FIELD_NUMBER;
+        hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
+            getLastChunk());
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -721,6 +821,10 @@ public final class ReliableDelivery {
           messageBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000020);
+        firstChunk_ = false;
+        bitField0_ = (bitField0_ & ~0x00000040);
+        lastChunk_ = false;
+        bitField0_ = (bitField0_ & ~0x00000080);
         return this;
       }
 
@@ -777,6 +881,14 @@ public final class ReliableDelivery {
           }
           to_bitField0_ |= 0x00000020;
         }
+        if (((from_bitField0_ & 0x00000040) != 0)) {
+          result.firstChunk_ = firstChunk_;
+          to_bitField0_ |= 0x00000040;
+        }
+        if (((from_bitField0_ & 0x00000080) != 0)) {
+          result.lastChunk_ = lastChunk_;
+          to_bitField0_ |= 0x00000080;
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -848,6 +960,12 @@ public final class ReliableDelivery {
         if (other.hasMessage()) {
           mergeMessage(other.getMessage());
         }
+        if (other.hasFirstChunk()) {
+          setFirstChunk(other.getFirstChunk());
+        }
+        if (other.hasLastChunk()) {
+          setLastChunk(other.getLastChunk());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -1297,6 +1415,80 @@ public final class ReliableDelivery {
         }
         return messageBuilder_;
       }
+
+      private boolean firstChunk_ ;
+      /**
+       * optional bool firstChunk = 7;
+       * @return Whether the firstChunk field is set.
+       */
+      public boolean hasFirstChunk() {
+        return ((bitField0_ & 0x00000040) != 0);
+      }
+      /**
+       * optional bool firstChunk = 7;
+       * @return The firstChunk.
+       */
+      public boolean getFirstChunk() {
+        return firstChunk_;
+      }
+      /**
+       * optional bool firstChunk = 7;
+       * @param value The firstChunk to set.
+       * @return This builder for chaining.
+       */
+      public Builder setFirstChunk(boolean value) {
+        bitField0_ |= 0x00000040;
+        firstChunk_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional bool firstChunk = 7;
+       * @return This builder for chaining.
+       */
+      public Builder clearFirstChunk() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        firstChunk_ = false;
+        onChanged();
+        return this;
+      }
+
+      private boolean lastChunk_ ;
+      /**
+       * optional bool lastChunk = 8;
+       * @return Whether the lastChunk field is set.
+       */
+      public boolean hasLastChunk() {
+        return ((bitField0_ & 0x00000080) != 0);
+      }
+      /**
+       * optional bool lastChunk = 8;
+       * @return The lastChunk.
+       */
+      public boolean getLastChunk() {
+        return lastChunk_;
+      }
+      /**
+       * optional bool lastChunk = 8;
+       * @param value The lastChunk to set.
+       * @return This builder for chaining.
+       */
+      public Builder setLastChunk(boolean value) {
+        bitField0_ |= 0x00000080;
+        lastChunk_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional bool lastChunk = 8;
+       * @return This builder for chaining.
+       */
+      public Builder clearLastChunk() {
+        bitField0_ = (bitField0_ & ~0x00000080);
+        lastChunk_ = false;
+        onChanged();
+        return this;
+      }
       @java.lang.Override
       public final Builder setUnknownFields(
           final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@@ -6168,6 +6360,28 @@ public final class ReliableDelivery {
      * required .Payload message = 5;
      */
     akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder();
+
+    /**
+     * optional bool firstChunk = 6;
+     * @return Whether the firstChunk field is set.
+     */
+    boolean hasFirstChunk();
+    /**
+     * optional bool firstChunk = 6;
+     * @return The firstChunk.
+     */
+    boolean getFirstChunk();
+
+    /**
+     * optional bool lastChunk = 7;
+     * @return Whether the lastChunk field is set.
+     */
+    boolean hasLastChunk();
+    /**
+     * optional bool lastChunk = 7;
+     * @return The lastChunk.
+     */
+    boolean getLastChunk();
   }
   /**
    * 
@@ -6254,6 +6468,16 @@ public final class ReliableDelivery {
               bitField0_ |= 0x00000010;
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              firstChunk_ = input.readBool();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              lastChunk_ = input.readBool();
+              break;
+            }
             default: {
               if (!parseUnknownField(
                   input, unknownFields, extensionRegistry, tag)) {
@@ -6406,6 +6630,40 @@ public final class ReliableDelivery {
       return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
     }
 
+    public static final int FIRSTCHUNK_FIELD_NUMBER = 6;
+    private boolean firstChunk_;
+    /**
+     * optional bool firstChunk = 6;
+     * @return Whether the firstChunk field is set.
+     */
+    public boolean hasFirstChunk() {
+      return ((bitField0_ & 0x00000020) != 0);
+    }
+    /**
+     * optional bool firstChunk = 6;
+     * @return The firstChunk.
+     */
+    public boolean getFirstChunk() {
+      return firstChunk_;
+    }
+
+    public static final int LASTCHUNK_FIELD_NUMBER = 7;
+    private boolean lastChunk_;
+    /**
+     * optional bool lastChunk = 7;
+     * @return Whether the lastChunk field is set.
+     */
+    public boolean hasLastChunk() {
+      return ((bitField0_ & 0x00000040) != 0);
+    }
+    /**
+     * optional bool lastChunk = 7;
+     * @return The lastChunk.
+     */
+    public boolean getLastChunk() {
+      return lastChunk_;
+    }
+
     private byte memoizedIsInitialized = -1;
     @java.lang.Override
     public final boolean isInitialized() {
@@ -6459,6 +6717,12 @@ public final class ReliableDelivery {
       if (((bitField0_ & 0x00000010) != 0)) {
         output.writeMessage(5, getMessage());
       }
+      if (((bitField0_ & 0x00000020) != 0)) {
+        output.writeBool(6, firstChunk_);
+      }
+      if (((bitField0_ & 0x00000040) != 0)) {
+        output.writeBool(7, lastChunk_);
+      }
       unknownFields.writeTo(output);
     }
 
@@ -6487,6 +6751,14 @@ public final class ReliableDelivery {
         size += akka.protobufv3.internal.CodedOutputStream
           .computeMessageSize(5, getMessage());
       }
+      if (((bitField0_ & 0x00000020) != 0)) {
+        size += akka.protobufv3.internal.CodedOutputStream
+          .computeBoolSize(6, firstChunk_);
+      }
+      if (((bitField0_ & 0x00000040) != 0)) {
+        size += akka.protobufv3.internal.CodedOutputStream
+          .computeBoolSize(7, lastChunk_);
+      }
       size += unknownFields.getSerializedSize();
       memoizedSize = size;
       return size;
@@ -6527,6 +6799,16 @@ public final class ReliableDelivery {
         if (!getMessage()
             .equals(other.getMessage())) return false;
       }
+      if (hasFirstChunk() != other.hasFirstChunk()) return false;
+      if (hasFirstChunk()) {
+        if (getFirstChunk()
+            != other.getFirstChunk()) return false;
+      }
+      if (hasLastChunk() != other.hasLastChunk()) return false;
+      if (hasLastChunk()) {
+        if (getLastChunk()
+            != other.getLastChunk()) return false;
+      }
       if (!unknownFields.equals(other.unknownFields)) return false;
       return true;
     }
@@ -6561,6 +6843,16 @@ public final class ReliableDelivery {
         hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
         hash = (53 * hash) + getMessage().hashCode();
       }
+      if (hasFirstChunk()) {
+        hash = (37 * hash) + FIRSTCHUNK_FIELD_NUMBER;
+        hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
+            getFirstChunk());
+      }
+      if (hasLastChunk()) {
+        hash = (37 * hash) + LASTCHUNK_FIELD_NUMBER;
+        hash = (53 * hash) + akka.protobufv3.internal.Internal.hashBoolean(
+            getLastChunk());
+      }
       hash = (29 * hash) + unknownFields.hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -6713,6 +7005,10 @@ public final class ReliableDelivery {
           messageBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000010);
+        firstChunk_ = false;
+        bitField0_ = (bitField0_ & ~0x00000020);
+        lastChunk_ = false;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -6765,6 +7061,14 @@ public final class ReliableDelivery {
           }
           to_bitField0_ |= 0x00000010;
         }
+        if (((from_bitField0_ & 0x00000020) != 0)) {
+          result.firstChunk_ = firstChunk_;
+          to_bitField0_ |= 0x00000020;
+        }
+        if (((from_bitField0_ & 0x00000040) != 0)) {
+          result.lastChunk_ = lastChunk_;
+          to_bitField0_ |= 0x00000040;
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6831,6 +7135,12 @@ public final class ReliableDelivery {
         if (other.hasMessage()) {
           mergeMessage(other.getMessage());
         }
+        if (other.hasFirstChunk()) {
+          setFirstChunk(other.getFirstChunk());
+        }
+        if (other.hasLastChunk()) {
+          setLastChunk(other.getLastChunk());
+        }
         this.mergeUnknownFields(other.unknownFields);
         onChanged();
         return this;
@@ -7193,6 +7503,80 @@ public final class ReliableDelivery {
         }
         return messageBuilder_;
       }
+
+      private boolean firstChunk_ ;
+      /**
+       * optional bool firstChunk = 6;
+       * @return Whether the firstChunk field is set.
+       */
+      public boolean hasFirstChunk() {
+        return ((bitField0_ & 0x00000020) != 0);
+      }
+      /**
+       * optional bool firstChunk = 6;
+       * @return The firstChunk.
+       */
+      public boolean getFirstChunk() {
+        return firstChunk_;
+      }
+      /**
+       * optional bool firstChunk = 6;
+       * @param value The firstChunk to set.
+       * @return This builder for chaining.
+       */
+      public Builder setFirstChunk(boolean value) {
+        bitField0_ |= 0x00000020;
+        firstChunk_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional bool firstChunk = 6;
+       * @return This builder for chaining.
+       */
+      public Builder clearFirstChunk() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        firstChunk_ = false;
+        onChanged();
+        return this;
+      }
+
+      private boolean lastChunk_ ;
+      /**
+       * optional bool lastChunk = 7;
+       * @return Whether the lastChunk field is set.
+       */
+      public boolean hasLastChunk() {
+        return ((bitField0_ & 0x00000040) != 0);
+      }
+      /**
+       * optional bool lastChunk = 7;
+       * @return The lastChunk.
+       */
+      public boolean getLastChunk() {
+        return lastChunk_;
+      }
+      /**
+       * optional bool lastChunk = 7;
+       * @param value The lastChunk to set.
+       * @return This builder for chaining.
+       */
+      public Builder setLastChunk(boolean value) {
+        bitField0_ |= 0x00000040;
+        lastChunk_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * optional bool lastChunk = 7;
+       * @return This builder for chaining.
+       */
+      public Builder clearLastChunk() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        lastChunk_ = false;
+        onChanged();
+        return this;
+      }
       @java.lang.Override
       public final Builder setUnknownFields(
           final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@@ -7946,27 +8330,29 @@ public final class ReliableDelivery {
   static {
     java.lang.String[] descriptorData = {
       "\n\026ReliableDelivery.proto\022\033akka.cluster.t" +
-      "yped.delivery\032\026ContainerFormats.proto\"\213\001" +
+      "yped.delivery\032\026ContainerFormats.proto\"\262\001" +
       "\n\020SequencedMessage\022\022\n\nproducerId\030\001 \002(\t\022\r" +
       "\n\005seqNr\030\002 \002(\003\022\r\n\005first\030\003 \002(\010\022\013\n\003ack\030\004 \002(" +
       "\010\022\035\n\025producerControllerRef\030\005 \002(\t\022\031\n\007mess" +
-      "age\030\006 \002(\0132\010.Payload\"1\n\020RegisterConsumer\022" +
-      "\035\n\025consumerControllerRef\030\001 \002(\t\"f\n\007Reques" +
-      "t\022\026\n\016confirmedSeqNr\030\001 \002(\003\022\030\n\020requestUpTo" +
-      "SeqNr\030\002 \002(\003\022\025\n\rsupportResend\030\003 \002(\010\022\022\n\nvi" +
-      "aTimeout\030\004 \002(\010\"\033\n\006Resend\022\021\n\tfromSeqNr\030\001 " +
-      "\002(\003\"\035\n\003Ack\022\026\n\016confirmedSeqNr\030\001 \002(\003\"\266\001\n\005S" +
-      "tate\022\024\n\014currentSeqNr\030\001 \002(\003\022\035\n\025highestCon" +
-      "firmedSeqNr\030\002 \002(\003\0229\n\tconfirmed\030\003 \003(\0132&.a" +
-      "kka.cluster.typed.delivery.Confirmed\022=\n\013" +
-      "unconfirmed\030\004 \003(\0132(.akka.cluster.typed.d" +
-      "elivery.MessageSent\"@\n\tConfirmed\022\r\n\005seqN" +
-      "r\030\001 \002(\003\022\021\n\tqualifier\030\002 \002(\t\022\021\n\ttimestamp\030" +
-      "\003 \002(\003\"j\n\013MessageSent\022\r\n\005seqNr\030\001 \002(\003\022\021\n\tq" +
+      "age\030\006 \002(\0132\010.Payload\022\022\n\nfirstChunk\030\007 \001(\010\022" +
+      "\021\n\tlastChunk\030\010 \001(\010\"1\n\020RegisterConsumer\022\035" +
+      "\n\025consumerControllerRef\030\001 \002(\t\"f\n\007Request" +
+      "\022\026\n\016confirmedSeqNr\030\001 \002(\003\022\030\n\020requestUpToS" +
+      "eqNr\030\002 \002(\003\022\025\n\rsupportResend\030\003 \002(\010\022\022\n\nvia" +
+      "Timeout\030\004 \002(\010\"\033\n\006Resend\022\021\n\tfromSeqNr\030\001 \002" +
+      "(\003\"\035\n\003Ack\022\026\n\016confirmedSeqNr\030\001 \002(\003\"\266\001\n\005St" +
+      "ate\022\024\n\014currentSeqNr\030\001 \002(\003\022\035\n\025highestConf" +
+      "irmedSeqNr\030\002 \002(\003\0229\n\tconfirmed\030\003 \003(\0132&.ak" +
+      "ka.cluster.typed.delivery.Confirmed\022=\n\013u" +
+      "nconfirmed\030\004 \003(\0132(.akka.cluster.typed.de" +
+      "livery.MessageSent\"@\n\tConfirmed\022\r\n\005seqNr" +
+      "\030\001 \002(\003\022\021\n\tqualifier\030\002 \002(\t\022\021\n\ttimestamp\030\003" +
+      " \002(\003\"\221\001\n\013MessageSent\022\r\n\005seqNr\030\001 \002(\003\022\021\n\tq" +
       "ualifier\030\002 \002(\t\022\013\n\003ack\030\003 \002(\010\022\021\n\ttimestamp" +
-      "\030\004 \002(\003\022\031\n\007message\030\005 \002(\0132\010.Payload\"\035\n\007Cle" +
-      "anup\022\022\n\nqualifiers\030\001 \003(\tB(\n$akka.cluster" +
-      ".typed.internal.protobufH\001"
+      "\030\004 \002(\003\022\031\n\007message\030\005 \002(\0132\010.Payload\022\022\n\nfir" +
+      "stChunk\030\006 \001(\010\022\021\n\tlastChunk\030\007 \001(\010\"\035\n\007Clea" +
+      "nup\022\022\n\nqualifiers\030\001 \003(\tB(\n$akka.cluster." +
+      "typed.internal.protobufH\001"
     };
     descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
       .internalBuildGeneratedFileFrom(descriptorData,
@@ -7978,7 +8364,7 @@ public final class ReliableDelivery {
     internal_static_akka_cluster_typed_delivery_SequencedMessage_fieldAccessorTable = new
       akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
         internal_static_akka_cluster_typed_delivery_SequencedMessage_descriptor,
-        new java.lang.String[] { "ProducerId", "SeqNr", "First", "Ack", "ProducerControllerRef", "Message", });
+        new java.lang.String[] { "ProducerId", "SeqNr", "First", "Ack", "ProducerControllerRef", "Message", "FirstChunk", "LastChunk", });
     internal_static_akka_cluster_typed_delivery_RegisterConsumer_descriptor =
       getDescriptor().getMessageTypes().get(1);
     internal_static_akka_cluster_typed_delivery_RegisterConsumer_fieldAccessorTable = new
@@ -8020,7 +8406,7 @@ public final class ReliableDelivery {
     internal_static_akka_cluster_typed_delivery_MessageSent_fieldAccessorTable = new
       akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
         internal_static_akka_cluster_typed_delivery_MessageSent_descriptor,
-        new java.lang.String[] { "SeqNr", "Qualifier", "Ack", "Timestamp", "Message", });
+        new java.lang.String[] { "SeqNr", "Qualifier", "Ack", "Timestamp", "Message", "FirstChunk", "LastChunk", });
     internal_static_akka_cluster_typed_delivery_Cleanup_descriptor =
       getDescriptor().getMessageTypes().get(8);
     internal_static_akka_cluster_typed_delivery_Cleanup_fieldAccessorTable = new
diff --git a/akka-cluster-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes b/akka-cluster-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes
new file mode 100644
index 0000000000..4124fb103e
--- /dev/null
+++ b/akka-cluster-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-24276-reliable-chunks.excludes
@@ -0,0 +1,2 @@
+# #24276 Chunked messages in Reliable Delivery
+ProblemFilters.exclude[Problem]("akka.cluster.typed.internal.protobuf.ReliableDelivery*")
diff --git a/akka-cluster-typed/src/main/protobuf/ReliableDelivery.proto b/akka-cluster-typed/src/main/protobuf/ReliableDelivery.proto
index 39fe9e0be3..be996e5c0f 100644
--- a/akka-cluster-typed/src/main/protobuf/ReliableDelivery.proto
+++ b/akka-cluster-typed/src/main/protobuf/ReliableDelivery.proto
@@ -18,6 +18,8 @@ message SequencedMessage {
   required bool ack = 4;
   required string producerControllerRef = 5;
   required Payload message = 6;
+  optional bool firstChunk = 7;
+  optional bool lastChunk = 8;
 }
 
 // ProducerController
@@ -65,6 +67,8 @@ message MessageSent {
   required bool ack = 3;
   required int64 timestamp = 4;
   required Payload message = 5;
+  optional bool firstChunk = 6;
+  optional bool lastChunk = 7;
 }
 
 // DurableProducerQueue
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala
index ff0cd75ba3..6ac6b21651 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala
@@ -10,15 +10,19 @@ import akka.actor.typed.ActorRefResolver
 import akka.actor.typed.delivery.ConsumerController
 import akka.actor.typed.delivery.DurableProducerQueue
 import akka.actor.typed.delivery.ProducerController
+import akka.actor.typed.delivery.internal.ChunkedMessage
 import akka.actor.typed.delivery.internal.ProducerControllerImpl
 import akka.actor.typed.scaladsl.adapter._
 import akka.annotation.InternalApi
 import akka.cluster.typed.internal.protobuf.ReliableDelivery
 import akka.cluster.typed.internal.protobuf.ReliableDelivery.Confirmed
+import akka.remote.ContainerFormats
+import akka.remote.ContainerFormats.Payload
 import akka.remote.serialization.WrappedPayloadSupport
 import akka.serialization.BaseSerializer
 import akka.serialization.SerializerWithStringManifest
 import akka.util.ccompat.JavaConverters._
+import akka.protobufv3.internal.ByteString
 
 /**
  * INTERNAL API
@@ -78,10 +82,27 @@ import akka.util.ccompat.JavaConverters._
     b.setFirst(m.first)
     b.setAck(m.ack)
     b.setProducerControllerRef(resolver.toSerializationFormat(m.producerController))
-    b.setMessage(payloadSupport.payloadBuilder(m.message))
+
+    m.message match {
+      case chunk: ChunkedMessage =>
+        b.setMessage(chunkedMessageToProto(chunk))
+        b.setFirstChunk(chunk.firstChunk)
+        b.setLastChunk(chunk.lastChunk)
+      case _ =>
+        b.setMessage(payloadSupport.payloadBuilder(m.message))
+    }
+
     b.build().toByteArray()
   }
 
+  private def chunkedMessageToProto(chunk: ChunkedMessage): Payload.Builder = {
+    val payloadBuilder = ContainerFormats.Payload.newBuilder()
+    payloadBuilder.setEnclosedMessage(ByteString.copyFrom(chunk.serialized.toArray))
+    payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(chunk.manifest))
+    payloadBuilder.setSerializerId(chunk.serializerId)
+    payloadBuilder
+  }
+
   private def ackToBinary(m: ProducerControllerImpl.Ack): Array[Byte] = {
     val b = ReliableDelivery.Ack.newBuilder()
     b.setConfirmedSeqNr(m.confirmedSeqNr)
@@ -119,7 +140,16 @@ import akka.util.ccompat.JavaConverters._
     b.setQualifier(m.confirmationQualifier)
     b.setAck(m.ack)
     b.setTimestamp(m.timestampMillis)
-    b.setMessage(payloadSupport.payloadBuilder(m.message))
+
+    m.message match {
+      case chunk: ChunkedMessage =>
+        b.setMessage(chunkedMessageToProto(chunk))
+        b.setFirstChunk(chunk.firstChunk)
+        b.setLastChunk(chunk.lastChunk)
+      case _ =>
+        b.setMessage(payloadSupport.payloadBuilder(m.message))
+    }
+
     b.build()
   }
 
@@ -172,7 +202,19 @@ import akka.util.ccompat.JavaConverters._
 
   private def sequencedMessageFromBinary(bytes: Array[Byte]): AnyRef = {
     val seqMsg = ReliableDelivery.SequencedMessage.parseFrom(bytes)
-    val wrappedMsg = payloadSupport.deserializePayload(seqMsg.getMessage)
+    val wrappedMsg =
+      if (seqMsg.hasFirstChunk) {
+        val manifest =
+          if (seqMsg.getMessage.hasMessageManifest) seqMsg.getMessage.getMessageManifest.toStringUtf8 else ""
+        ChunkedMessage(
+          akka.util.ByteString(seqMsg.getMessage.getEnclosedMessage.toByteArray),
+          seqMsg.getFirstChunk,
+          seqMsg.getLastChunk,
+          seqMsg.getMessage.getSerializerId,
+          manifest)
+      } else {
+        payloadSupport.deserializePayload(seqMsg.getMessage)
+      }
     ConsumerController.SequencedMessage(
       seqMsg.getProducerId,
       seqMsg.getSeqNr,
@@ -213,7 +255,19 @@ import akka.util.ccompat.JavaConverters._
 
   private def durableQueueMessageSentFromProto(
       sent: ReliableDelivery.MessageSent): DurableProducerQueue.MessageSent[Any] = {
-    val wrappedMsg = payloadSupport.deserializePayload(sent.getMessage)
+    val wrappedMsg =
+      if (sent.hasFirstChunk) {
+        val manifest =
+          if (sent.getMessage.hasMessageManifest) sent.getMessage.getMessageManifest.toStringUtf8 else ""
+        ChunkedMessage(
+          akka.util.ByteString(sent.getMessage.getEnclosedMessage.toByteArray),
+          sent.getFirstChunk,
+          sent.getLastChunk,
+          sent.getMessage.getSerializerId,
+          manifest)
+      } else {
+        payloadSupport.deserializePayload(sent.getMessage)
+      }
     DurableProducerQueue.MessageSent(sent.getSeqNr, wrappedMsg, sent.getAck, sent.getQualifier, sent.getTimestamp)
   }
 
diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala
new file mode 100644
index 0000000000..548a94769b
--- /dev/null
+++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/ChunkLargeMessageSpec.scala
@@ -0,0 +1,217 @@
+/*
+ * Copyright (C) 2020 Lightbend Inc. 
+ */
+
+package akka.cluster.typed
+
+import scala.concurrent.duration._
+import scala.util.Random
+
+import com.typesafe.config.ConfigFactory
+import org.HdrHistogram.Histogram
+
+import akka.actor.ActorIdentity
+import akka.actor.Identify
+import akka.actor.testkit.typed.scaladsl.TestProbe
+import akka.actor.typed.ActorRef
+import akka.actor.typed.Behavior
+import akka.actor.typed.PostStop
+import akka.actor.typed.delivery.ConsumerController
+import akka.actor.typed.delivery.ProducerController
+import akka.actor.typed.scaladsl.Behaviors
+import akka.cluster.MultiNodeClusterSpec
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.serialization.jackson.CborSerializable
+
+object ChunkLargeMessageSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+
+  commonConfig(ConfigFactory.parseString("""
+        akka.loglevel = INFO
+        #akka.serialization.jackson.verbose-debug-logging = on
+        akka.remote.artery {
+          advanced.inbound-lanes = 1
+          advanced.maximum-frame-size = 2 MB
+        }
+      """).withFallback(MultiNodeClusterSpec.clusterConfig))
+
+  object Producer {
+    sealed trait Command
+    case object Stop extends Command
+    final case class Reply(timestamp: Long) extends Command with CborSerializable
+
+    private case class WrappedRequestNext(r: ProducerController.RequestNext[Consumer.TheMessage]) extends Command
+    private case class SendNext(to: ActorRef[Consumer.TheMessage]) extends Command
+
+    def apply(
+        numberOfMessages: Int,
+        large: Boolean,
+        delay: FiniteDuration,
+        producerController: ActorRef[ProducerController.Command[Consumer.TheMessage]]): Behavior[Command] = {
+
+      Behaviors.setup { context =>
+        val requestNextAdapter =
+          context.messageAdapter[ProducerController.RequestNext[Consumer.TheMessage]](WrappedRequestNext(_))
+        producerController ! ProducerController.Start(requestNextAdapter)
+
+        val histogram = new Histogram(SECONDS.toNanos(10), 3)
+
+        def percentile(p: Double): Double = histogram.getValueAtPercentile(p) / 1000.0
+
+        val rnd = new Random
+        val elements = if (large) Vector.fill(500)(rnd.nextString(1000)) else Vector("a")
+
+        if (numberOfMessages == 0)
+          Behaviors.stopped
+        else
+          Behaviors
+            .receive[Command] { (context, msg) =>
+              msg match {
+                case WrappedRequestNext(next) =>
+                  if (delay > Duration.Zero)
+                    context.scheduleOnce(delay, context.self, SendNext(next.sendNextTo))
+                  else
+                    next.sendNextTo ! Consumer.TheMessage(System.nanoTime(), context.self, Vector("a"))
+                  Behaviors.same
+                case SendNext(to) =>
+                  to ! Consumer.TheMessage(System.nanoTime(), context.self, elements)
+                  Behaviors.same
+                case Reply(timestamp) =>
+                  histogram.recordValue(System.nanoTime() - timestamp)
+                  if (histogram.getTotalCount == numberOfMessages)
+                    Behaviors.stopped
+                  else
+                    Behaviors.same
+                case Stop =>
+                  Behaviors.stopped
+              }
+            }
+            .receiveSignal {
+              case (context, PostStop) =>
+                if (histogram.getTotalCount > 0) {
+                  context.log.info(
+                    s"=== Latency for [${context.self.path.name}] " +
+                    f"50%%ile: ${percentile(50.0)}%.0f µs, " +
+                    f"90%%ile: ${percentile(90.0)}%.0f µs, " +
+                    f"99%%ile: ${percentile(99.0)}%.0f µs")
+                  println(s"Histogram for [${context.self.path.name}] of RTT latencies in microseconds.")
+                  histogram.outputPercentileDistribution(System.out, 1000.0)
+                }
+                Behaviors.same
+            }
+
+      }
+    }
+
+  }
+
+  object Consumer {
+
+    sealed trait Command
+    final case class TheMessage(sendTimstamp: Long, replyTo: ActorRef[Producer.Reply], elements: Vector[String])
+        extends CborSerializable {
+      override def toString: String = s"TheMessage($sendTimstamp,$replyTo,${elements.size})"
+    }
+    private final case class WrappedDelivery(d: ConsumerController.Delivery[TheMessage]) extends Command
+    case object Stop extends Command
+
+    def apply(consumerController: ActorRef[ConsumerController.Start[TheMessage]]): Behavior[Command] = {
+      Behaviors.setup { context =>
+        val deliveryAdapter =
+          context.messageAdapter[ConsumerController.Delivery[TheMessage]](WrappedDelivery(_))
+        consumerController ! ConsumerController.Start(deliveryAdapter)
+
+        Behaviors.receiveMessage {
+          case WrappedDelivery(d) =>
+            d.message.replyTo ! Producer.Reply(d.message.sendTimstamp)
+            d.confirmTo ! ConsumerController.Confirmed
+            Behaviors.same
+          case Stop =>
+            Behaviors.stopped
+        }
+      }
+    }
+  }
+
+}
+
+class ChunkLargeMessageMultiJvmNode1 extends ChunkLargeMessageSpec
+class ChunkLargeMessageMultiJvmNode2 extends ChunkLargeMessageSpec
+
+abstract class ChunkLargeMessageSpec extends MultiNodeSpec(ChunkLargeMessageSpec) with MultiNodeTypedClusterSpec {
+  import ChunkLargeMessageSpec._
+
+  // TODO move this to MultiNodeTypedClusterSpec
+  def identify[A](name: String, r: RoleName): ActorRef[A] = {
+    import akka.actor.typed.scaladsl.adapter._
+    val sel = system.actorSelection(node(r) / "user" / "testSpawn" / name)
+    sel.tell(Identify(None), testActor)
+    expectMsgType[ActorIdentity](10.seconds).ref.get.toTyped
+  }
+
+  private def test(n: Int, numberOfMessages: Int, includeLarge: Boolean): Unit = {
+    runOn(first) {
+      val producerController = spawn(ProducerController[Consumer.TheMessage](s"p$n", None), s"producerController$n")
+      val producer =
+        spawn(Producer(numberOfMessages, large = false, delay = 10.millis, producerController), s"producer$n")
+      val largeProducerController =
+        spawn(
+          ProducerController[Consumer.TheMessage](
+            s"p$n",
+            None,
+            ProducerController.Settings(typedSystem).withChunkLargeMessagesBytes(50000)),
+          s"largeProducerController$n")
+      val largeProducer =
+        spawn(
+          Producer(if (includeLarge) Int.MaxValue else 0, large = true, delay = 10.millis, largeProducerController),
+          s"largeProducer$n")
+      enterBarrier(s"producer$n-started")
+      val probe = TestProbe[Any]()
+      probe.expectTerminated(producer, 25.seconds)
+      largeProducer ! Producer.Stop
+      enterBarrier(s"producer$n-stopped")
+    }
+    runOn(second) {
+      enterBarrier(s"producer$n-started")
+      val consumerController = spawn(ConsumerController[Consumer.TheMessage](), s"consumerController$n")
+      val consumer = spawn(Consumer(consumerController), s"consumer$n")
+      val largeConsumerController = spawn(ConsumerController[Consumer.TheMessage](), s"largeConsumerController$n")
+      val largeConsumer = spawn(Consumer(largeConsumerController), s"largeConsumer$n")
+      val producerController: ActorRef[ProducerController.Command[Consumer.TheMessage]] =
+        identify(s"producerController$n", first)
+      consumerController ! ConsumerController.RegisterToProducerController(producerController)
+      val largeProducerController: ActorRef[ProducerController.Command[Consumer.TheMessage]] =
+        identify(s"largeProducerController$n", first)
+      largeConsumerController ! ConsumerController.RegisterToProducerController(largeProducerController)
+      enterBarrier(s"producer$n-stopped")
+      consumer ! Consumer.Stop
+      largeConsumer ! Consumer.Stop
+    }
+    enterBarrier(s"after-$n")
+  }
+
+  "Reliable delivery with chunked messages" must {
+
+    "form a cluster" in {
+      formCluster(first, second)
+      enterBarrier("cluster started")
+    }
+
+    "warmup" in {
+      test(1, 100, includeLarge = true)
+    }
+
+    "measure latency without large messages" in {
+      test(2, 250, includeLarge = false)
+    }
+
+    "measure latency with large messages" in {
+      test(3, 250, includeLarge = true)
+    }
+
+  }
+
+}
diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala
index 368645d233..f1fdd516a1 100644
--- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala
+++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala
@@ -14,7 +14,6 @@ import scala.language.implicitConversions
 import org.scalatest.Suite
 import org.scalatest.matchers.should.Matchers
 
-import akka.actor.{ Address, Scheduler }
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
 import akka.actor.typed.Behavior
@@ -22,9 +21,13 @@ import akka.actor.typed.Props
 import akka.actor.typed.SpawnProtocol
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.actor.typed.scaladsl.adapter._
-import akka.cluster.{ ClusterEvent, MemberStatus }
+import akka.actor.Address
+import akka.actor.Scheduler
+import akka.cluster.ClusterEvent
+import akka.cluster.MemberStatus
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{ MultiNodeSpec, STMultiNodeSpec }
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
 import akka.testkit.WatchedByCoroner
 import akka.util.Timeout
 
@@ -84,7 +87,8 @@ trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedB
     enterBarrier("all-joined")
   }
 
-  private lazy val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command]
+  private lazy val spawnActor =
+    system.actorOf(PropsAdapter(SpawnProtocol()), "testSpawn").toTyped[SpawnProtocol.Command]
   def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = {
     implicit val timeout: Timeout = testKitSettings.DefaultTimeout
     val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _))
diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializerSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializerSpec.scala
index 19ddb50f68..0d2998c93b 100644
--- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializerSpec.scala
+++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializerSpec.scala
@@ -12,10 +12,12 @@ import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import akka.actor.typed.delivery.ConsumerController
 import akka.actor.typed.delivery.DurableProducerQueue
 import akka.actor.typed.delivery.ProducerController
+import akka.actor.typed.delivery.internal.ChunkedMessage
 import akka.actor.typed.delivery.internal.ProducerControllerImpl
 import akka.actor.typed.scaladsl.Behaviors
 import akka.actor.typed.scaladsl.adapter._
 import akka.serialization.SerializationExtension
+import akka.util.ByteString
 
 class ReliableDeliverySerializerSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
 
@@ -53,7 +55,17 @@ class ReliableDeliverySerializerSpec extends ScalaTestWithActorTestKit with AnyW
         Vector(
           DurableProducerQueue.MessageSent(15L, "msg15", true, "q4", timestamp),
           DurableProducerQueue.MessageSent(16L, "msg16", true, "q4", timestamp))),
-      "DurableProducerQueue.Cleanup" -> DurableProducerQueue.Cleanup(Set("q1", "q2", "q3"))).foreach {
+      "DurableProducerQueue.Cleanup" -> DurableProducerQueue.Cleanup(Set("q1", "q2", "q3")),
+      "SequencedMessage-chunked-1" -> ConsumerController.SequencedMessage
+        .fromChunked("prod-1", 1L, ChunkedMessage(ByteString.fromString("abc"), true, true, 20, ""), true, true, ref),
+      "SequencedMessage-chunked-2" -> ConsumerController.SequencedMessage
+        .fromChunked("prod-1", 1L, ChunkedMessage(ByteString(1, 2, 3), true, false, 123456, "A"), false, false, ref),
+      "DurableProducerQueue.MessageSent-chunked" -> DurableProducerQueue.MessageSent.fromChunked(
+        3L,
+        ChunkedMessage(ByteString.fromString("abc"), true, true, 20, ""),
+        false,
+        "",
+        timestamp)).foreach {
       case (scenario, item) =>
         s"resolve serializer for $scenario" in {
           val serializer = SerializationExtension(classicSystem)
diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md
index da6bcfd7a3..b3893dcc1d 100644
--- a/akka-docs/src/main/paradox/remoting-artery.md
+++ b/akka-docs/src/main/paradox/remoting-artery.md
@@ -664,7 +664,7 @@ See `inbound-lanes` and `outbound-lanes` in the @ref:[reference configuration](g
 
 All the communication between user defined remote actors are isolated from the channel of Akka internal messages so
 a large user message cannot block an urgent system message. While this provides good isolation for Akka services, all
-user communications by default happen through a shared network connection (an Aeron stream). When some actors
+user communications by default happen through a shared network connection. When some actors
 send large messages this can cause other messages to suffer higher latency as they need to wait until the full
 message has been transported on the shared channel (and hence, shared bottleneck). In these cases it is usually
 helpful to separate actors that have different QoS requirements: large messages vs. low latency.
@@ -695,6 +695,11 @@ This means that all messages sent to the following actors will pass through the
 
 Messages destined for actors not matching any of these patterns are sent using the default channel as before.
 
+The large messages channel can still not be used for extremely large messages, a few MB per message at most.
+An alternative is to use the @ref:[Reliable delivery](typed/reliable-delivery.md) that has support for 
+automatically @ref[splitting up large messages](typed/reliable-delivery.md#chunk-large-messages) and assemble
+them again on the receiving side.
+
 ### External, shared Aeron media driver
 
 The Aeron transport is running in a so called [media driver](https://github.com/real-logic/Aeron/wiki/Media-Driver-Operation).
diff --git a/akka-docs/src/main/paradox/typed/reliable-delivery.md b/akka-docs/src/main/paradox/typed/reliable-delivery.md
index 2fee1bc017..3c5a212247 100644
--- a/akka-docs/src/main/paradox/typed/reliable-delivery.md
+++ b/akka-docs/src/main/paradox/typed/reliable-delivery.md
@@ -50,6 +50,10 @@ There are 3 supported patterns, which are described in the following sections:
 * @ref:[Work pulling](#work-pulling)
 * @ref:[Sharding](#sharding)
 
+The Point-to-Point pattern has support for automatically @ref:[splitting up large messages](#chunk-large-messages)
+and assemble them again on the consumer side. This feature is useful for avoiding head of line blocking from
+serialization and transfer of large messages.
+
 ## Point-to-point
 
 This pattern implements point-to-point reliable delivery between a single producer actor sending messages and a single consumer actor
@@ -410,6 +414,25 @@ This can be more efficient since messages don't have to be kept in memory in the
 they have been confirmed, but the drawback is that lost messages will not be delivered. See configuration
 `only-flow-control` of the `ConsumerController`.
 
+## Chunk large messages
+
+To avoid head of line blocking from serialization and transfer of large messages the @ref:[Point-to-Point](#point-to-point) 
+pattern has support for automatically @ref:[splitting up large messages](#chunk-large-messages) and assemble them
+again on the consumer side.
+
+Serialization and deserialization is performed by the `ProducerController` and `ConsumerController` respectively
+instead of in the remote transport layer.
+
+This is enabled by configuration `akka.reliable-delivery.producer-controller.chunk-large-messages` and defines
+the maximum size in bytes of the chunked pieces. Messages smaller than the configured size are not chunked, but
+serialization still takes place in the `ProducerController` and `ConsumerController`. 
+
+Aside from the configuration the API is the same as the @ref:[Point-to-point](#point-to-point) pattern. If
+@ref:[Durable producer](#durable-producer) is enabled the chunked pieces are stored rather than the full large
+message.
+
+This feature is not implemented for @ref:[Work pulling](#work-pulling) and @ref:[Sharding](#sharding) yet.
+
 ## Configuration
 
 There are several configuration properties, please refer to `akka.reliable-delivery` config section in the
diff --git a/akka-docs/src/test/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala b/akka-docs/src/test/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala
index 574c94b35b..5cb1b7f099 100644
--- a/akka-docs/src/test/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala
+++ b/akka-docs/src/test/scala/docs/persistence/PersistenceSchemaEvolutionDocSpec.scala
@@ -5,8 +5,6 @@
 package docs.persistence
 
 import java.io.NotSerializableException
-
-import scala.language.reflectiveCalls
 import java.nio.charset.Charset
 
 import akka.actor.ActorSystem
@@ -16,8 +14,8 @@ import akka.testkit.TestKit
 import com.typesafe.config._
 import org.scalatest.wordspec.AnyWordSpec
 import spray.json.JsObject
-
 import scala.concurrent.duration._
+
 import docs.persistence.proto.FlightAppModels
 
 class PersistenceSchemaEvolutionDocSpec extends AnyWordSpec {
@@ -284,7 +282,7 @@ class RemovedEventsAwareSerializer extends SerializerWithStringManifest {
       case m if SkipEventManifestsEvents.contains(m) =>
         EventDeserializationSkipped
 
-      case other => new String(bytes, utf8)
+      case _ => new String(bytes, utf8)
     }
 }
 //#string-serializer-skip-deleved-event-by-manifest
diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala
index c923343570..de29bb80f9 100644
--- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala
+++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/delivery/EventSourcedProducerQueue.scala
@@ -221,77 +221,103 @@ private class EventSourcedProducerQueue[A](
   import DurableProducerQueue._
 
   private val traceEnabled = context.log.isTraceEnabled
+  // transient
+  private var initialCleanupDone = false
 
-  def onCommand(state: State[A], command: Command[A]): Effect[Event, State[A]] = {
-    command match {
-      case StoreMessageSent(sent, replyTo) =>
-        if (sent.seqNr == state.currentSeqNr) {
+  def onCommand(state: State[A], command: Command[A]): Effect[Event, State[A]] =
+    if (initialCleanupDone) {
+      command match {
+        case StoreMessageSent(sent, replyTo) =>
+          val currentSeqNr = state.currentSeqNr
+          if (sent.seqNr == currentSeqNr) {
+            if (traceEnabled)
+              context.log.trace(
+                "StoreMessageSent seqNr [{}], confirmationQualifier [{}]",
+                sent.seqNr,
+                sent.confirmationQualifier)
+            Effect.persist(sent).thenReply(replyTo)(_ => StoreMessageSentAck(sent.seqNr))
+          } else if (sent.seqNr == currentSeqNr - 1) {
+            // already stored, could be a retry after timeout
+            context.log.debug("Duplicate seqNr [{}], currentSeqNr [{}]", sent.seqNr, currentSeqNr)
+            Effect.reply(replyTo)(StoreMessageSentAck(sent.seqNr))
+          } else {
+            // may happen after failure
+            context.log.debug("Ignoring unexpected seqNr [{}], currentSeqNr [{}]", sent.seqNr, currentSeqNr)
+            Effect.unhandled // no reply, request will timeout
+          }
+
+        case StoreMessageConfirmed(seqNr, confirmationQualifier, timestampMillis) =>
           if (traceEnabled)
             context.log.trace(
-              "StoreMessageSent seqNr [{}], confirmationQualifier [{}]",
-              sent.seqNr,
-              sent.confirmationQualifier)
-          Effect.persist(sent).thenReply(replyTo)(_ => StoreMessageSentAck(sent.seqNr))
-        } else if (sent.seqNr == state.currentSeqNr - 1) {
-          // already stored, could be a retry after timout
-          context.log.debug("Duplicate seqNr [{}], currentSeqNr [{}]", sent.seqNr, state.currentSeqNr)
-          Effect.reply(replyTo)(StoreMessageSentAck(sent.seqNr))
-        } else {
-          // may happen after failure
-          context.log.debug("Ignoring unexpected seqNr [{}], currentSeqNr [{}]", sent.seqNr, state.currentSeqNr)
-          Effect.unhandled // no reply, request will timeout
-        }
+              "StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]",
+              seqNr,
+              confirmationQualifier)
+          val previousConfirmedSeqNr = state.confirmedSeqNr.get(confirmationQualifier) match {
+            case Some((nr, _)) => nr
+            case None          => 0L
+          }
+          if (seqNr > previousConfirmedSeqNr)
+            Effect.persist(Confirmed(seqNr, confirmationQualifier, timestampMillis))
+          else
+            Effect.none // duplicate
 
-      case StoreMessageConfirmed(seqNr, confirmationQualifier, timestampMillis) =>
-        if (traceEnabled)
-          context.log.trace(
-            "StoreMessageConfirmed seqNr [{}], confirmationQualifier [{}]",
-            seqNr,
-            confirmationQualifier)
-        val previousConfirmedSeqNr = state.confirmedSeqNr.get(confirmationQualifier) match {
-          case Some((nr, _)) => nr
-          case None          => 0L
-        }
-        if (seqNr > previousConfirmedSeqNr)
-          Effect.persist(Confirmed(seqNr, confirmationQualifier, timestampMillis))
-        else
-          Effect.none // duplicate
+        case LoadState(replyTo) =>
+          Effect.reply(replyTo)(state)
 
-      case LoadState(replyTo) =>
-        Effect.reply(replyTo)(state)
+        case _: CleanupTick[_] =>
+          onCleanupTick(state)
+      }
+    } else {
+      onCommandBeforeInitialCleanup(state, command)
+    }
 
+  private def onCleanupTick(state: State[A]): Effect[Event, State[A]] = {
+    val old = oldUnconfirmedToCleanup(state)
+    if (old.isEmpty) {
+      Effect.none
+    } else {
+      if (context.log.isDebugEnabled)
+        context.log.debug("Periodic cleanup [{}]", old.mkString(","))
+      Effect.persist(DurableProducerQueue.Cleanup(old))
+    }
+  }
+
+  private def oldUnconfirmedToCleanup(state: State[A]): Set[ConfirmationQualifier] = {
+    val now = System.currentTimeMillis()
+    state.confirmedSeqNr.collect {
+      case (confirmationQualifier, (_, timestampMillis))
+          if (now - timestampMillis) >= cleanupUnusedAfter.toMillis && !state.unconfirmed.exists(
+            _.confirmationQualifier != confirmationQualifier) =>
+        confirmationQualifier
+    }.toSet
+  }
+
+  def onCommandBeforeInitialCleanup(state: State[A], command: Command[A]): Effect[Event, State[A]] = {
+    command match {
       case _: CleanupTick[_] =>
-        val now = System.currentTimeMillis()
-        val old = state.confirmedSeqNr.collect {
-          case (confirmationQualifier, (_, timestampMillis))
-              if (now - timestampMillis) >= cleanupUnusedAfter.toMillis && !state.unconfirmed.exists(
-                _.confirmationQualifier != confirmationQualifier) =>
-            confirmationQualifier
-        }.toSet
-        if (old.isEmpty) {
-          Effect.none
+        val old = oldUnconfirmedToCleanup(state)
+        val stateWithoutPartialChunkedMessages = state.cleanupPartialChunkedMessages()
+        initialCleanupDone = true
+        if (old.isEmpty && (stateWithoutPartialChunkedMessages eq state)) {
+          Effect.unstashAll()
         } else {
           if (context.log.isDebugEnabled)
-            context.log.debug("Cleanup [{}]", old.mkString(","))
-          Effect.persist(DurableProducerQueue.Cleanup(old))
+            context.log.debug("Initial cleanup [{}]", old.mkString(","))
+          Effect.persist(DurableProducerQueue.Cleanup(old)).thenUnstashAll()
         }
+      case _ =>
+        Effect.stash()
     }
   }
 
   def onEvent(state: State[A], event: Event): State[A] = {
     event match {
       case sent: MessageSent[A] @unchecked =>
-        state.copy(currentSeqNr = sent.seqNr + 1, unconfirmed = state.unconfirmed :+ sent)
+        state.addMessageSent(sent)
       case Confirmed(seqNr, confirmationQualifier, timestampMillis) =>
-        val newUnconfirmed = state.unconfirmed.filterNot { u =>
-          u.confirmationQualifier == confirmationQualifier && u.seqNr <= seqNr
-        }
-        state.copy(
-          highestConfirmedSeqNr = math.max(state.highestConfirmedSeqNr, seqNr),
-          confirmedSeqNr = state.confirmedSeqNr.updated(confirmationQualifier, (seqNr, timestampMillis)),
-          unconfirmed = newUnconfirmed)
+        state.confirmed(seqNr, confirmationQualifier, timestampMillis)
       case Cleanup(confirmationQualifiers) =>
-        state.copy(confirmedSeqNr = state.confirmedSeqNr -- confirmationQualifiers)
+        state.cleanup(confirmationQualifiers).cleanupPartialChunkedMessages()
     }
   }
 
diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
index 9b719c337f..bb03405ecd 100644
--- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
+++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/delivery/ReliableDeliveryWithEventSourcedProducerQueueSpec.scala
@@ -28,11 +28,13 @@ object ReliableDeliveryWithEventSourcedProducerQueueSpec {
     """)
 }
 
-class ReliableDeliveryWithEventSourcedProducerQueueSpec
-    extends ScalaTestWithActorTestKit(WorkPullingWithEventSourcedProducerQueueSpec.conf)
+class ReliableDeliveryWithEventSourcedProducerQueueSpec(config: Config)
+    extends ScalaTestWithActorTestKit(config)
     with AnyWordSpecLike
     with LogCapturing {
 
+  def this() = this(ReliableDeliveryWithEventSourcedProducerQueueSpec.conf)
+
   "ReliableDelivery with EventSourcedProducerQueue" must {
 
     "deliver messages after full producer and consumer restart" in {
@@ -168,3 +170,10 @@ class ReliableDeliveryWithEventSourcedProducerQueueSpec
   }
 
 }
+
+// same tests but with chunked messages
+class ReliableDeliveryWithEventSourcedProducerQueueChunkedSpec
+    extends ReliableDeliveryWithEventSourcedProducerQueueSpec(
+      ConfigFactory.parseString("""
+    akka.reliable-delivery.producer-controller.chunk-large-messages = 1b
+    """).withFallback(ReliableDeliveryWithEventSourcedProducerQueueSpec.conf))