From 6b23195385b28eeea91e0712a3158d669f23072b Mon Sep 17 00:00:00 2001 From: Arnaud Burlet Date: Wed, 29 Jan 2020 11:38:20 +0100 Subject: [PATCH 1/2] Shield unsafe journal plugins from corner case, #28541 --- .../scala/akka/persistence/journal/AsyncWriteJournal.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 51ef61739d..3ced600632 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -69,7 +69,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite]) val prepared = Try(preparePersistentBatch(messages)) val writeResult = (prepared match { - case Success(prep) => + case Success(prep) if prep.isEmpty => Future.successful(Nil) + case Success(prep) => // try in case the asyncWriteMessages throws try breaker.withCircuitBreaker(asyncWriteMessages(prep)) catch { case NonFatal(e) => Future.failed(e) } From 6c4aca16572024e0f6673d4c9e982e0c85117a78 Mon Sep 17 00:00:00 2001 From: Arnaud Burlet Date: Tue, 11 Feb 2020 13:30:51 +0100 Subject: [PATCH 2/2] Add comment --- .../akka/persistence/journal/AsyncWriteJournal.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 3ced600632..442236be49 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -69,8 +69,12 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { val atomicWriteCount = messages.count(_.isInstanceOf[AtomicWrite]) val prepared = Try(preparePersistentBatch(messages)) val writeResult = (prepared match { - case Success(prep) if prep.isEmpty => Future.successful(Nil) - case Success(prep) => + case Success(prep) if prep.isEmpty => + // prep is empty when all messages are instances of NonPersistentRepr (used for defer) in that case, + // we continue right away without calling the journal plugin (most plugins fail calling head on empty Seq). + // Ordering of the replies is handled by Resequencer + Future.successful(Nil) + case Success(prep) => // try in case the asyncWriteMessages throws try breaker.withCircuitBreaker(asyncWriteMessages(prep)) catch { case NonFatal(e) => Future.failed(e) }