diff --git a/persistence/src/main/resources/reference.conf b/persistence/src/main/resources/reference.conf index 145257bec0..e3f3694267 100644 --- a/persistence/src/main/resources/reference.conf +++ b/persistence/src/main/resources/reference.conf @@ -151,6 +151,20 @@ pekko.persistence { # replayed event. debug = off } + + # Controls whether the journal plugin sends back write responses in the same order + # as it received requests. + # + # Originally, the Akka Persistence implementation rearranged responses to match the request order. + # But this feature wasn't guaranteed by Akka's test suite, and nothing in Akka itself relied on it. + # + # As this ordering is global, slow write requests for some entities can stall writes for all, + # which can cause latency issues under load. + # + # The old behaviour is still enabled by default ("on"). After more testing on existing applications, + # the default might be switched to "off", and eventually this option might be removed altogether, leaving + # "off" as the only behaviour available. 
+ write-response-global-order = on } # Fallback settings for snapshot store plugin configurations diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala b/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala index 9bede9e068..ae27bd5c4f 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala @@ -22,6 +22,7 @@ import scala.util.control.NonFatal import org.apache.pekko import pekko.actor._ +import pekko.annotation.InternalApi import pekko.pattern.CircuitBreaker import pekko.pattern.pipe import pekko.persistence._ @@ -67,9 +68,22 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { final val receiveWriteJournal: Actor.Receive = { // cannot be a val in the trait due to binary compatibility val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug") + val enableGlobalWriteResponseOrder: Boolean = config.getBoolean("write-response-global-order") + val eventStream = context.system.eventStream // used from Future callbacks implicit val ec: ExecutionContext = context.dispatcher + // should be a private method in the trait, but it needs the enableGlobalWriteResponseOrder field which can't be + // moved to the trait level because adding any fields there breaks bincompat + @InternalApi + def sendWriteResponse(msg: Any, snr: Long, target: ActorRef, sender: ActorRef): Unit = { + if (enableGlobalWriteResponseOrder) { + resequencer ! Desequenced(msg, snr, target, sender) + } else { + target.tell(msg, sender) + } + } + { case WriteMessages(messages, persistentActor, actorInstanceId) => val cctr = resequencerCounter @@ -100,7 +114,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { writeResult.onComplete { case Success(results) => - resequencer ! 
Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self) + sendWriteResponse(WriteMessagesSuccessful, cctr, persistentActor, self) val resultsIter = if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit) @@ -111,12 +125,12 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { resultsIter.next() match { case Success(_) => a.payload.foreach { p => - resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender) + sendWriteResponse(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender) n += 1 } case Failure(e) => a.payload.foreach { p => - resequencer ! Desequenced( + sendWriteResponse( WriteMessageRejected(p, e, actorInstanceId), n, persistentActor, @@ -126,21 +140,21 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } case r: NonPersistentRepr => - resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) + sendWriteResponse(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) n += 1 } case Failure(e) => - resequencer ! Desequenced(WriteMessagesFailed(e, atomicWriteCount), cctr, persistentActor, self) + sendWriteResponse(WriteMessagesFailed(e, atomicWriteCount), cctr, persistentActor, self) var n = cctr + 1 messages.foreach { case a: AtomicWrite => a.payload.foreach { p => - resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender) + sendWriteResponse(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender) n += 1 } case r: NonPersistentRepr => - resequencer ! 
Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) + sendWriteResponse(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender) n += 1 } } diff --git a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala new file mode 100644 index 0000000000..a0faa633c0 --- /dev/null +++ b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.persistence.journal + +import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol, PersistenceSpec, PersistentRepr } +import org.apache.pekko.testkit.ImplicitSender + +import scala.collection.immutable +import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.util.Try + +/** + * Verifies write response ordering logic for [[AsyncWriteJournal]]. + * + * See the write-response-global-order config option for more information. 
+ */ +class AsyncWriteJournalResponseOrderSpec + extends PersistenceSpec( + PersistenceSpec.config( + plugin = "", // we will provide explicit plugin IDs later + test = classOf[AsyncWriteJournalResponseOrderSpec].getSimpleName, + extraConfig = Some( + s""" + |pekko.persistence.journal.reverse-plugin { + | with-global-order { + | class = "${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}" + | + | write-response-global-order = on + | } + | no-global-order { + | class = "${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}" + | + | write-response-global-order = off + | } + |} + |""".stripMargin + ))) with ImplicitSender { + + import AsyncWriteJournalResponseOrderSpec._ + + "AsyncWriteJournal" must { + "return write responses in request order if global response order is enabled" in { + val pluginRef = + extension.journalFor(journalPluginId = "pekko.persistence.journal.reverse-plugin.with-global-order") + + pluginRef ! mkWriteMessages(1) + pluginRef ! mkWriteMessages(2) + pluginRef ! mkWriteMessages(3) + + pluginRef ! CompleteWriteOps + + getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(1, 2, 3) + } + + "return write responses in completion order if global response order is disabled" in { + val pluginRef = + extension.journalFor(journalPluginId = "pekko.persistence.journal.reverse-plugin.no-global-order") + + pluginRef ! mkWriteMessages(1) + pluginRef ! mkWriteMessages(2) + pluginRef ! mkWriteMessages(3) + + pluginRef ! 
CompleteWriteOps + + getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(3, 2, 1) + } + } + + private def mkWriteMessages(num: Int): JournalProtocol.WriteMessages = JournalProtocol.WriteMessages( + messages = Vector(AtomicWrite(PersistentRepr( + payload = num, + sequenceNr = 0L, + persistenceId = num.toString + ))), + persistentActor = self, + actorInstanceId = 1 + ) + + private def getMessageNumsFromResponses(responses: Seq[AnyRef]): Vector[Int] = responses.collect { + case successResponse: JournalProtocol.WriteMessageSuccess => + successResponse.persistent.payload.asInstanceOf[Int] + }.toVector +} + +private object AsyncWriteJournalResponseOrderSpec { + case object CompleteWriteOps + + /** + * Accumulates asyncWriteMessages requests and completes them in reverse receive order on [[CompleteWriteOps]] command + */ + class ReversePlugin extends AsyncWriteJournal { + + private implicit val ec: ExecutionContext = context.dispatcher + + private var pendingOps: Vector[Promise[Unit]] = Vector.empty + + override def receivePluginInternal: Receive = { + case CompleteWriteOps => + pendingOps.reverse.foreach(_.success(())) + pendingOps = Vector.empty + } + + override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = { + val responsePromise = Promise[Unit]() + pendingOps = pendingOps :+ responsePromise + responsePromise.future.map(_ => Vector.empty) + } + + override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = ??? + + override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)( + recoveryCallback: PersistentRepr => Unit): Future[Unit] = ??? + + override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ??? + } +}