From 610a89c2d01d863bfa7684e8383bcab70312108c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 20 Mar 2019 13:10:52 +0100 Subject: [PATCH] fix NPE in SnapshotStore, #26580 * access of context.system from Future callback * also changed import context.dispatcher, shouldn't be needed but rather safe than sorry --- .../journal/AsyncWriteJournal.scala | 12 +- .../persistence/snapshot/SnapshotStore.scala | 146 +++++++++--------- 2 files changed, 84 insertions(+), 74 deletions(-) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index dd8e9f496b..3d41ef6dd0 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -5,14 +5,17 @@ package akka.persistence.journal import scala.concurrent.duration._ + import akka.actor._ import akka.pattern.pipe import akka.persistence._ import akka.util.Helpers.toRootLowerCase import scala.collection.immutable +import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal + import akka.pattern.CircuitBreaker /** @@ -21,7 +24,6 @@ import akka.pattern.CircuitBreaker trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { import AsyncWriteJournal._ import JournalProtocol._ - import context.dispatcher private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands @@ -56,6 +58,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { final val receiveWriteJournal: Actor.Receive = { // cannot be a val in the trait due to binary compatibility val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug") + val eventStream = context.system.eventStream // used from Future callbacks + implicit val ec: ExecutionContext = context.dispatcher { case WriteMessages(messages, persistentActor, actorInstanceId) => @@ -71,7 +75,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { catch { case NonFatal(e) => Future.failed(e) } case f @ Failure(_) => // exception from preparePersistentBatch => rejected - Future.successful(messages.collect { case a: AtomicWrite => f }) + Future.successful(messages.collect { case _: AtomicWrite => f }) }).map { results => if (results.nonEmpty && results.size != atomicWriteCount) throw new IllegalStateException( @@ -171,7 +175,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } .pipeTo(replyTo) .foreach { _ => - if (publish) context.system.eventStream.publish(r) + if (publish) eventStream.publish(r) } case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) => @@ -185,7 +189,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { } .pipeTo(persistentActor) .onComplete { _ => - if (publish) context.system.eventStream.publish(d) + if (publish) eventStream.publish(d) } } } diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index 59d51d4f52..9539bf7a3e 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -4,8 +4,10 @@ package akka.persistence.snapshot +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.concurrent.Future + import akka.actor._ import akka.pattern.pipe import akka.persistence._ @@ -16,7 +18,6 @@ import akka.pattern.CircuitBreaker */ trait SnapshotStore extends Actor with ActorLogging { import SnapshotProtocol._ - import context.dispatcher private val extension = Persistence(context.system) private val publish = extension.settings.internal.publishPluginCommands @@ -32,82 +33,87 @@ trait SnapshotStore extends Actor with ActorLogging { final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal) final val receiveSnapshotStore: Actor.Receive = { - case LoadSnapshot(persistenceId, criteria, toSequenceNr) => - if (criteria == SnapshotSelectionCriteria.None) { - senderPersistentActor() ! LoadSnapshotResult(snapshot = None, toSequenceNr) - } else { + val eventStream = context.system.eventStream // used from Future callbacks + implicit val ec: ExecutionContext = context.dispatcher + + { + case LoadSnapshot(persistenceId, criteria, toSequenceNr) => + if (criteria == SnapshotSelectionCriteria.None) { + senderPersistentActor() ! LoadSnapshotResult(snapshot = None, toSequenceNr) + } else { + breaker + .withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) + .map { sso => + LoadSnapshotResult(sso, toSequenceNr) + } + .recover { + case e => LoadSnapshotFailed(e) + } + .pipeTo(senderPersistentActor()) + } + + case SaveSnapshot(metadata, snapshot) => + val md = metadata.copy(timestamp = System.currentTimeMillis) breaker - .withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) - .map { sso => - LoadSnapshotResult(sso, toSequenceNr) + .withCircuitBreaker(saveAsync(md, snapshot)) + .map { _ => + SaveSnapshotSuccess(md) } .recover { - case e => LoadSnapshotFailed(e) + case e => SaveSnapshotFailure(metadata, e) + } to (self, senderPersistentActor()) + + case evt: SaveSnapshotSuccess => + try tryReceivePluginInternal(evt) + finally senderPersistentActor ! evt // sender is persistentActor + case evt @ SaveSnapshotFailure(metadata, _) => + try { + tryReceivePluginInternal(evt) + breaker.withCircuitBreaker(deleteAsync(metadata)) + } finally senderPersistentActor() ! evt // sender is persistentActor + + case d @ DeleteSnapshot(metadata) => + breaker + .withCircuitBreaker(deleteAsync(metadata)) + .map { + case _ => DeleteSnapshotSuccess(metadata) + } + .recover { + case e => DeleteSnapshotFailure(metadata, e) + } + .pipeTo(self)(senderPersistentActor()) + .onComplete { + case _ => if (publish) eventStream.publish(d) } - .pipeTo(senderPersistentActor()) - } - case SaveSnapshot(metadata, snapshot) => - val md = metadata.copy(timestamp = System.currentTimeMillis) - breaker - .withCircuitBreaker(saveAsync(md, snapshot)) - .map { _ => - SaveSnapshotSuccess(md) - } - .recover { - case e => SaveSnapshotFailure(metadata, e) - } to (self, senderPersistentActor()) + case evt: DeleteSnapshotSuccess => + try tryReceivePluginInternal(evt) + finally senderPersistentActor() ! evt + case evt: DeleteSnapshotFailure => + try tryReceivePluginInternal(evt) + finally senderPersistentActor() ! evt - case evt: SaveSnapshotSuccess => - try tryReceivePluginInternal(evt) - finally senderPersistentActor ! evt // sender is persistentActor - case evt @ SaveSnapshotFailure(metadata, _) => - try { - tryReceivePluginInternal(evt) - breaker.withCircuitBreaker(deleteAsync(metadata)) - } finally senderPersistentActor() ! evt // sender is persistentActor + case d @ DeleteSnapshots(persistenceId, criteria) => + breaker + .withCircuitBreaker(deleteAsync(persistenceId, criteria)) + .map { + case _ => DeleteSnapshotsSuccess(criteria) + } + .recover { + case e => DeleteSnapshotsFailure(criteria, e) + } + .pipeTo(self)(senderPersistentActor()) + .onComplete { + case _ => if (publish) eventStream.publish(d) + } - case d @ DeleteSnapshot(metadata) => - breaker - .withCircuitBreaker(deleteAsync(metadata)) - .map { - case _ => DeleteSnapshotSuccess(metadata) - } - .recover { - case e => DeleteSnapshotFailure(metadata, e) - } - .pipeTo(self)(senderPersistentActor()) - .onComplete { - case _ => if (publish) context.system.eventStream.publish(d) - } - - case evt: DeleteSnapshotSuccess => - try tryReceivePluginInternal(evt) - finally senderPersistentActor() ! evt - case evt: DeleteSnapshotFailure => - try tryReceivePluginInternal(evt) - finally senderPersistentActor() ! evt - - case d @ DeleteSnapshots(persistenceId, criteria) => - breaker - .withCircuitBreaker(deleteAsync(persistenceId, criteria)) - .map { - case _ => DeleteSnapshotsSuccess(criteria) - } - .recover { - case e => DeleteSnapshotsFailure(criteria, e) - } - .pipeTo(self)(senderPersistentActor()) - .onComplete { - case _ => if (publish) context.system.eventStream.publish(d) - } - - case evt: DeleteSnapshotsFailure => - try tryReceivePluginInternal(evt) - finally senderPersistentActor() ! evt // sender is persistentActor - case evt: DeleteSnapshotsSuccess => - try tryReceivePluginInternal(evt) - finally senderPersistentActor() ! evt + case evt: DeleteSnapshotsFailure => + try tryReceivePluginInternal(evt) + finally senderPersistentActor() ! evt // sender is persistentActor + case evt: DeleteSnapshotsSuccess => + try tryReceivePluginInternal(evt) + finally senderPersistentActor() ! evt + } } /** Documents intent that the sender() is expected to be the PersistentActor */