fix NPE in SnapshotStore, #26580

* access of context.system from Future callback
* also changed import context.dispatcher, shouldn't be needed but
  rather safe than sorry
This commit is contained in:
Patrik Nordwall 2019-03-20 13:10:52 +01:00
parent 886088f03b
commit 610a89c2d0
2 changed files with 84 additions and 74 deletions

View file

@ -5,14 +5,17 @@
package akka.persistence.journal package akka.persistence.journal
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.actor._ import akka.actor._
import akka.pattern.pipe import akka.pattern.pipe
import akka.persistence._ import akka.persistence._
import akka.util.Helpers.toRootLowerCase import akka.util.Helpers.toRootLowerCase
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.pattern.CircuitBreaker import akka.pattern.CircuitBreaker
/** /**
@ -21,7 +24,6 @@ import akka.pattern.CircuitBreaker
trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
import AsyncWriteJournal._ import AsyncWriteJournal._
import JournalProtocol._ import JournalProtocol._
import context.dispatcher
private val extension = Persistence(context.system) private val extension = Persistence(context.system)
private val publish = extension.settings.internal.publishPluginCommands private val publish = extension.settings.internal.publishPluginCommands
@ -56,6 +58,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
final val receiveWriteJournal: Actor.Receive = { final val receiveWriteJournal: Actor.Receive = {
// cannot be a val in the trait due to binary compatibility // cannot be a val in the trait due to binary compatibility
val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug") val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
val eventStream = context.system.eventStream // used from Future callbacks
implicit val ec: ExecutionContext = context.dispatcher
{ {
case WriteMessages(messages, persistentActor, actorInstanceId) => case WriteMessages(messages, persistentActor, actorInstanceId) =>
@ -71,7 +75,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
catch { case NonFatal(e) => Future.failed(e) } catch { case NonFatal(e) => Future.failed(e) }
case f @ Failure(_) => case f @ Failure(_) =>
// exception from preparePersistentBatch => rejected // exception from preparePersistentBatch => rejected
Future.successful(messages.collect { case a: AtomicWrite => f }) Future.successful(messages.collect { case _: AtomicWrite => f })
}).map { results => }).map { results =>
if (results.nonEmpty && results.size != atomicWriteCount) if (results.nonEmpty && results.size != atomicWriteCount)
throw new IllegalStateException( throw new IllegalStateException(
@ -171,7 +175,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
} }
.pipeTo(replyTo) .pipeTo(replyTo)
.foreach { _ => .foreach { _ =>
if (publish) context.system.eventStream.publish(r) if (publish) eventStream.publish(r)
} }
case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) => case d @ DeleteMessagesTo(persistenceId, toSequenceNr, persistentActor) =>
@ -185,7 +189,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
} }
.pipeTo(persistentActor) .pipeTo(persistentActor)
.onComplete { _ => .onComplete { _ =>
if (publish) context.system.eventStream.publish(d) if (publish) eventStream.publish(d)
} }
} }
} }

View file

@ -4,8 +4,10 @@
package akka.persistence.snapshot package akka.persistence.snapshot
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Future import scala.concurrent.Future
import akka.actor._ import akka.actor._
import akka.pattern.pipe import akka.pattern.pipe
import akka.persistence._ import akka.persistence._
@ -16,7 +18,6 @@ import akka.pattern.CircuitBreaker
*/ */
trait SnapshotStore extends Actor with ActorLogging { trait SnapshotStore extends Actor with ActorLogging {
import SnapshotProtocol._ import SnapshotProtocol._
import context.dispatcher
private val extension = Persistence(context.system) private val extension = Persistence(context.system)
private val publish = extension.settings.internal.publishPluginCommands private val publish = extension.settings.internal.publishPluginCommands
@ -32,82 +33,87 @@ trait SnapshotStore extends Actor with ActorLogging {
final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal) final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal)
final val receiveSnapshotStore: Actor.Receive = { final val receiveSnapshotStore: Actor.Receive = {
case LoadSnapshot(persistenceId, criteria, toSequenceNr) => val eventStream = context.system.eventStream // used from Future callbacks
if (criteria == SnapshotSelectionCriteria.None) { implicit val ec: ExecutionContext = context.dispatcher
senderPersistentActor() ! LoadSnapshotResult(snapshot = None, toSequenceNr)
} else { {
case LoadSnapshot(persistenceId, criteria, toSequenceNr) =>
if (criteria == SnapshotSelectionCriteria.None) {
senderPersistentActor() ! LoadSnapshotResult(snapshot = None, toSequenceNr)
} else {
breaker
.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr)))
.map { sso =>
LoadSnapshotResult(sso, toSequenceNr)
}
.recover {
case e => LoadSnapshotFailed(e)
}
.pipeTo(senderPersistentActor())
}
case SaveSnapshot(metadata, snapshot) =>
val md = metadata.copy(timestamp = System.currentTimeMillis)
breaker breaker
.withCircuitBreaker(loadAsync(persistenceId, criteria.limit(toSequenceNr))) .withCircuitBreaker(saveAsync(md, snapshot))
.map { sso => .map { _ =>
LoadSnapshotResult(sso, toSequenceNr) SaveSnapshotSuccess(md)
} }
.recover { .recover {
case e => LoadSnapshotFailed(e) case e => SaveSnapshotFailure(metadata, e)
} to (self, senderPersistentActor())
case evt: SaveSnapshotSuccess =>
try tryReceivePluginInternal(evt)
finally senderPersistentActor ! evt // sender is persistentActor
case evt @ SaveSnapshotFailure(metadata, _) =>
try {
tryReceivePluginInternal(evt)
breaker.withCircuitBreaker(deleteAsync(metadata))
} finally senderPersistentActor() ! evt // sender is persistentActor
case d @ DeleteSnapshot(metadata) =>
breaker
.withCircuitBreaker(deleteAsync(metadata))
.map {
case _ => DeleteSnapshotSuccess(metadata)
}
.recover {
case e => DeleteSnapshotFailure(metadata, e)
}
.pipeTo(self)(senderPersistentActor())
.onComplete {
case _ => if (publish) eventStream.publish(d)
} }
.pipeTo(senderPersistentActor())
}
case SaveSnapshot(metadata, snapshot) => case evt: DeleteSnapshotSuccess =>
val md = metadata.copy(timestamp = System.currentTimeMillis) try tryReceivePluginInternal(evt)
breaker finally senderPersistentActor() ! evt
.withCircuitBreaker(saveAsync(md, snapshot)) case evt: DeleteSnapshotFailure =>
.map { _ => try tryReceivePluginInternal(evt)
SaveSnapshotSuccess(md) finally senderPersistentActor() ! evt
}
.recover {
case e => SaveSnapshotFailure(metadata, e)
} to (self, senderPersistentActor())
case evt: SaveSnapshotSuccess => case d @ DeleteSnapshots(persistenceId, criteria) =>
try tryReceivePluginInternal(evt) breaker
finally senderPersistentActor ! evt // sender is persistentActor .withCircuitBreaker(deleteAsync(persistenceId, criteria))
case evt @ SaveSnapshotFailure(metadata, _) => .map {
try { case _ => DeleteSnapshotsSuccess(criteria)
tryReceivePluginInternal(evt) }
breaker.withCircuitBreaker(deleteAsync(metadata)) .recover {
} finally senderPersistentActor() ! evt // sender is persistentActor case e => DeleteSnapshotsFailure(criteria, e)
}
.pipeTo(self)(senderPersistentActor())
.onComplete {
case _ => if (publish) eventStream.publish(d)
}
case d @ DeleteSnapshot(metadata) => case evt: DeleteSnapshotsFailure =>
breaker try tryReceivePluginInternal(evt)
.withCircuitBreaker(deleteAsync(metadata)) finally senderPersistentActor() ! evt // sender is persistentActor
.map { case evt: DeleteSnapshotsSuccess =>
case _ => DeleteSnapshotSuccess(metadata) try tryReceivePluginInternal(evt)
} finally senderPersistentActor() ! evt
.recover { }
case e => DeleteSnapshotFailure(metadata, e)
}
.pipeTo(self)(senderPersistentActor())
.onComplete {
case _ => if (publish) context.system.eventStream.publish(d)
}
case evt: DeleteSnapshotSuccess =>
try tryReceivePluginInternal(evt)
finally senderPersistentActor() ! evt
case evt: DeleteSnapshotFailure =>
try tryReceivePluginInternal(evt)
finally senderPersistentActor() ! evt
case d @ DeleteSnapshots(persistenceId, criteria) =>
breaker
.withCircuitBreaker(deleteAsync(persistenceId, criteria))
.map {
case _ => DeleteSnapshotsSuccess(criteria)
}
.recover {
case e => DeleteSnapshotsFailure(criteria, e)
}
.pipeTo(self)(senderPersistentActor())
.onComplete {
case _ => if (publish) context.system.eventStream.publish(d)
}
case evt: DeleteSnapshotsFailure =>
try tryReceivePluginInternal(evt)
finally senderPersistentActor() ! evt // sender is persistentActor
case evt: DeleteSnapshotsSuccess =>
try tryReceivePluginInternal(evt)
finally senderPersistentActor() ! evt
} }
/** Documents intent that the sender() is expected to be the PersistentActor */ /** Documents intent that the sender() is expected to be the PersistentActor */