* Add AsyncWriteJournal option for disabling Resequencer (#2026) * Add @InternalApi as requested in review * Update AsyncWriteJournal.scala --------- Co-authored-by: PJ Fanning <pjfanning@users.noreply.github.com>
This commit is contained in:
parent
e0619c34fb
commit
5d139e8b7c
3 changed files with 166 additions and 7 deletions
|
|
@ -151,6 +151,20 @@ pekko.persistence {
|
||||||
# replayed event.
|
# replayed event.
|
||||||
debug = off
|
debug = off
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Controls whether the journal plugin sends back write responses in the same order
|
||||||
|
# as it received requests.
|
||||||
|
#
|
||||||
|
# Originally Akka-Persistence implementation rearranged responses to match the request order.
|
||||||
|
# But this feature wasn't guaranteed by the Akka's test suite, and nothing in Akka itself relied on it.
|
||||||
|
#
|
||||||
|
# As this ordering is global, slow write requests for some entities can stall writes for all,
|
||||||
|
# which can cause latency issues under load.
|
||||||
|
#
|
||||||
|
# The old behaviour is still enabled by default ("on"). After more testing on existing applications,
|
||||||
|
# the default might be switched to "off", and eventually this option might be removed altogeter, leaving
|
||||||
|
# "off" the only behaviour available.
|
||||||
|
write-response-global-order = on
|
||||||
}
|
}
|
||||||
|
|
||||||
# Fallback settings for snapshot store plugin configurations
|
# Fallback settings for snapshot store plugin configurations
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ import scala.util.control.NonFatal
|
||||||
|
|
||||||
import org.apache.pekko
|
import org.apache.pekko
|
||||||
import pekko.actor._
|
import pekko.actor._
|
||||||
|
import pekko.annotation.InternalApi
|
||||||
import pekko.pattern.CircuitBreaker
|
import pekko.pattern.CircuitBreaker
|
||||||
import pekko.pattern.pipe
|
import pekko.pattern.pipe
|
||||||
import pekko.persistence._
|
import pekko.persistence._
|
||||||
|
|
@ -67,9 +68,22 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
final val receiveWriteJournal: Actor.Receive = {
|
final val receiveWriteJournal: Actor.Receive = {
|
||||||
// cannot be a val in the trait due to binary compatibility
|
// cannot be a val in the trait due to binary compatibility
|
||||||
val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
|
val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
|
||||||
|
val enableGlobalWriteResponseOrder: Boolean = config.getBoolean("write-response-global-order")
|
||||||
|
|
||||||
val eventStream = context.system.eventStream // used from Future callbacks
|
val eventStream = context.system.eventStream // used from Future callbacks
|
||||||
implicit val ec: ExecutionContext = context.dispatcher
|
implicit val ec: ExecutionContext = context.dispatcher
|
||||||
|
|
||||||
|
// should be a private method in the trait, but it needs the enableGlobalWriteResponseOrder field which can't be
|
||||||
|
// moved to the trait level because adding any fields there breaks bincompat
|
||||||
|
@InternalApi
|
||||||
|
def sendWriteResponse(msg: Any, snr: Long, target: ActorRef, sender: ActorRef): Unit = {
|
||||||
|
if (enableGlobalWriteResponseOrder) {
|
||||||
|
resequencer ! Desequenced(msg, snr, target, sender)
|
||||||
|
} else {
|
||||||
|
target.tell(msg, sender)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
case WriteMessages(messages, persistentActor, actorInstanceId) =>
|
case WriteMessages(messages, persistentActor, actorInstanceId) =>
|
||||||
val cctr = resequencerCounter
|
val cctr = resequencerCounter
|
||||||
|
|
@ -100,7 +114,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
|
|
||||||
writeResult.onComplete {
|
writeResult.onComplete {
|
||||||
case Success(results) =>
|
case Success(results) =>
|
||||||
resequencer ! Desequenced(WriteMessagesSuccessful, cctr, persistentActor, self)
|
sendWriteResponse(WriteMessagesSuccessful, cctr, persistentActor, self)
|
||||||
|
|
||||||
val resultsIter =
|
val resultsIter =
|
||||||
if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
|
if (results.isEmpty) Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
|
||||||
|
|
@ -111,12 +125,12 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
resultsIter.next() match {
|
resultsIter.next() match {
|
||||||
case Success(_) =>
|
case Success(_) =>
|
||||||
a.payload.foreach { p =>
|
a.payload.foreach { p =>
|
||||||
resequencer ! Desequenced(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender)
|
sendWriteResponse(WriteMessageSuccess(p, actorInstanceId), n, persistentActor, p.sender)
|
||||||
n += 1
|
n += 1
|
||||||
}
|
}
|
||||||
case Failure(e) =>
|
case Failure(e) =>
|
||||||
a.payload.foreach { p =>
|
a.payload.foreach { p =>
|
||||||
resequencer ! Desequenced(
|
sendWriteResponse(
|
||||||
WriteMessageRejected(p, e, actorInstanceId),
|
WriteMessageRejected(p, e, actorInstanceId),
|
||||||
n,
|
n,
|
||||||
persistentActor,
|
persistentActor,
|
||||||
|
|
@ -126,21 +140,21 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
|
||||||
}
|
}
|
||||||
|
|
||||||
case r: NonPersistentRepr =>
|
case r: NonPersistentRepr =>
|
||||||
resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
|
sendWriteResponse(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
|
||||||
n += 1
|
n += 1
|
||||||
}
|
}
|
||||||
|
|
||||||
case Failure(e) =>
|
case Failure(e) =>
|
||||||
resequencer ! Desequenced(WriteMessagesFailed(e, atomicWriteCount), cctr, persistentActor, self)
|
sendWriteResponse(WriteMessagesFailed(e, atomicWriteCount), cctr, persistentActor, self)
|
||||||
var n = cctr + 1
|
var n = cctr + 1
|
||||||
messages.foreach {
|
messages.foreach {
|
||||||
case a: AtomicWrite =>
|
case a: AtomicWrite =>
|
||||||
a.payload.foreach { p =>
|
a.payload.foreach { p =>
|
||||||
resequencer ! Desequenced(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender)
|
sendWriteResponse(WriteMessageFailure(p, e, actorInstanceId), n, persistentActor, p.sender)
|
||||||
n += 1
|
n += 1
|
||||||
}
|
}
|
||||||
case r: NonPersistentRepr =>
|
case r: NonPersistentRepr =>
|
||||||
resequencer ! Desequenced(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
|
sendWriteResponse(LoopMessageSuccess(r.payload, actorInstanceId), n, persistentActor, r.sender)
|
||||||
n += 1
|
n += 1
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,131 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.pekko.persistence.journal
|
||||||
|
|
||||||
|
import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol, PersistenceSpec, PersistentRepr }
|
||||||
|
import org.apache.pekko.testkit.ImplicitSender
|
||||||
|
|
||||||
|
import scala.collection.immutable
|
||||||
|
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
||||||
|
import scala.util.Try
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies write response ordering logic for [[AsyncWriteJournal]].
|
||||||
|
*
|
||||||
|
* Checkout write-response-global-order config option for more information.
|
||||||
|
*/
|
||||||
|
class AsyncWriteJournalResponseOrderSpec
|
||||||
|
extends PersistenceSpec(
|
||||||
|
PersistenceSpec.config(
|
||||||
|
plugin = "", // we will provide explicit plugin IDs later
|
||||||
|
test = classOf[AsyncWriteJournalResponseOrderSpec].getSimpleName,
|
||||||
|
extraConfig = Some(
|
||||||
|
s"""
|
||||||
|
|pekko.persistence.journal.reverse-plugin {
|
||||||
|
| with-global-order {
|
||||||
|
| class = "${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
|
||||||
|
|
|
||||||
|
| write-response-global-order = on
|
||||||
|
| }
|
||||||
|
| no-global-order {
|
||||||
|
| class = "${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
|
||||||
|
|
|
||||||
|
| write-response-global-order = off
|
||||||
|
| }
|
||||||
|
|}
|
||||||
|
|""".stripMargin
|
||||||
|
))) with ImplicitSender {
|
||||||
|
|
||||||
|
import AsyncWriteJournalResponseOrderSpec._
|
||||||
|
|
||||||
|
"AsyncWriteJournal" must {
|
||||||
|
"return write responses in request order if global response order is enabled" in {
|
||||||
|
val pluginRef =
|
||||||
|
extension.journalFor(journalPluginId = "pekko.persistence.journal.reverse-plugin.with-global-order")
|
||||||
|
|
||||||
|
pluginRef ! mkWriteMessages(1)
|
||||||
|
pluginRef ! mkWriteMessages(2)
|
||||||
|
pluginRef ! mkWriteMessages(3)
|
||||||
|
|
||||||
|
pluginRef ! CompleteWriteOps
|
||||||
|
|
||||||
|
getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(1, 2, 3)
|
||||||
|
}
|
||||||
|
|
||||||
|
"return write responses in completion order if global response order is disabled" in {
|
||||||
|
val pluginRef =
|
||||||
|
extension.journalFor(journalPluginId = "pekko.persistence.journal.reverse-plugin.no-global-order")
|
||||||
|
|
||||||
|
pluginRef ! mkWriteMessages(1)
|
||||||
|
pluginRef ! mkWriteMessages(2)
|
||||||
|
pluginRef ! mkWriteMessages(3)
|
||||||
|
|
||||||
|
pluginRef ! CompleteWriteOps
|
||||||
|
|
||||||
|
getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(3, 2, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def mkWriteMessages(num: Int): JournalProtocol.WriteMessages = JournalProtocol.WriteMessages(
|
||||||
|
messages = Vector(AtomicWrite(PersistentRepr(
|
||||||
|
payload = num,
|
||||||
|
sequenceNr = 0L,
|
||||||
|
persistenceId = num.toString
|
||||||
|
))),
|
||||||
|
persistentActor = self,
|
||||||
|
actorInstanceId = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
private def getMessageNumsFromResponses(responses: Seq[AnyRef]): Vector[Int] = responses.collect {
|
||||||
|
case successResponse: JournalProtocol.WriteMessageSuccess =>
|
||||||
|
successResponse.persistent.payload.asInstanceOf[Int]
|
||||||
|
}.toVector
|
||||||
|
}
|
||||||
|
|
||||||
|
private object AsyncWriteJournalResponseOrderSpec {
|
||||||
|
case object CompleteWriteOps
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Accumulates asyncWriteMessages requests and completes them in reverse receive order on [[CompleteWriteOps]] command
|
||||||
|
*/
|
||||||
|
class ReversePlugin extends AsyncWriteJournal {
|
||||||
|
|
||||||
|
private implicit val ec: ExecutionContext = context.dispatcher
|
||||||
|
|
||||||
|
private var pendingOps: Vector[Promise[Unit]] = Vector.empty
|
||||||
|
|
||||||
|
override def receivePluginInternal: Receive = {
|
||||||
|
case CompleteWriteOps =>
|
||||||
|
pendingOps.reverse.foreach(_.success(()))
|
||||||
|
pendingOps = Vector.empty
|
||||||
|
}
|
||||||
|
|
||||||
|
override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = {
|
||||||
|
val responsePromise = Promise[Unit]()
|
||||||
|
pendingOps = pendingOps :+ responsePromise
|
||||||
|
responsePromise.future.map(_ => Vector.empty)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = ???
|
||||||
|
|
||||||
|
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
|
||||||
|
recoveryCallback: PersistentRepr => Unit): Future[Unit] = ???
|
||||||
|
|
||||||
|
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = ???
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue