dynamic config path for levelDBJournal and localSnapshotStore #21394

This commit is contained in:
ortigali 2016-12-13 18:04:39 +05:00 committed by Johan "Party Cannon" Andrén
parent b250d03cc9
commit 5b5cf4fc7b
8 changed files with 28 additions and 11 deletions

View file

@ -3,6 +3,7 @@
*/
package akka.actor
import java.lang.reflect.InvocationTargetException
import language.implicitConversions
import java.lang.{ Iterable JIterable }
import java.util.concurrent.TimeUnit
@ -328,7 +329,10 @@ abstract class SupervisorStrategy {
def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, decision: Directive): Unit =
if (loggingEnabled) {
val logMessage = cause match {
case e: ActorInitializationException if e.getCause ne null e.getCause.getMessage
case e: ActorInitializationException if e.getCause ne null e.getCause match {
case ex: InvocationTargetException if ex.getCause ne null ex.getCause.getMessage
case ex ex.getMessage
}
case e e.getMessage
}
decision match {

View file

@ -83,7 +83,8 @@ class SharedLeveldbJournalSpec extends AkkaSpec(SharedLeveldbJournalSpec.config)
val probeA = new TestProbe(systemA)
val probeB = new TestProbe(systemB)
system.actorOf(Props[SharedLeveldbStore], "store")
val storeConfig = system.settings.config.getConfig("akka.persistence.journal.leveldb-shared")
system.actorOf(Props(classOf[SharedLeveldbStore], storeConfig), "store")
val storePath = RootActorPath(system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress) / "user" / "store"
val appA = systemA.actorOf(Props(classOf[ExampleApp], probeA.ref, storePath))

View file

@ -13,13 +13,14 @@ import scala.concurrent.Future
import akka.persistence.JournalProtocol.RecoverySuccess
import akka.persistence.JournalProtocol.ReplayMessagesFailure
import akka.pattern.pipe
import com.typesafe.config.Config
/**
* INTERNAL API.
*
* Journal backed by a local LevelDB store. For production use.
*/
private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore {
private[persistence] class LeveldbJournal(val config: Config) extends AsyncWriteJournal with LeveldbStore {
import LeveldbJournal._
override def receivePluginInternal: Receive = {

View file

@ -9,7 +9,7 @@ import java.io.File
import scala.collection.mutable
import akka.actor._
import akka.persistence._
import akka.persistence.journal.{ WriteJournalBase }
import akka.persistence.journal.WriteJournalBase
import akka.serialization.SerializationExtension
import org.iq80.leveldb._
import scala.collection.immutable
@ -17,14 +17,14 @@ import scala.util._
import scala.concurrent.Future
import scala.util.control.NonFatal
import akka.persistence.journal.Tagged
import com.typesafe.config.Config
/**
* INTERNAL API.
*/
private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery {
val configPath: String
val config = context.system.settings.config.getConfig(configPath)
val config: Config
val nativeLeveldb = config.getBoolean("native")
val leveldbOptions = new Options().createIfMissing(true)

View file

@ -10,6 +10,7 @@ import scala.util.Success
import scala.util.Failure
import scala.util.control.NonFatal
import akka.persistence.AtomicWrite
import com.typesafe.config.Config
import scala.concurrent.Future
/**
@ -17,7 +18,7 @@ import scala.concurrent.Future
* set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The
* shared LevelDB store is for testing only.
*/
class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore {
class SharedLeveldbStore(cfg: Config) extends { override val config = cfg.getConfig("store") } with LeveldbStore {
import AsyncWriteTarget._
import context.dispatcher

View file

@ -14,6 +14,7 @@ import akka.persistence.serialization._
import akka.persistence.snapshot._
import akka.serialization.SerializationExtension
import akka.util.ByteString.UTF_8
import com.typesafe.config.Config
import scala.collection.immutable
import scala.concurrent.Future
@ -24,12 +25,11 @@ import scala.util._
*
* Local filesystem backed snapshot store.
*/
private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging {
private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotStore with ActorLogging {
private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r
private val persistenceIdStartIdx = 9 // Persistence ID starts after the "snapshot-" substring
import akka.util.Helpers._
private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local")
private val maxLoadAttempts = config.getInt("max-load-attempts")
.requiring(_ > 1, "max-load-attempts must be >= 1")

View file

@ -10,6 +10,7 @@ import akka.actor.{ ActorRef, Props }
import akka.event.Logging
import akka.persistence.snapshot.local.LocalSnapshotStore
import akka.testkit.{ EventFilter, ImplicitSender, TestEvent }
import com.typesafe.config.Config
import scala.concurrent.Future
import scala.concurrent.duration._
@ -71,7 +72,7 @@ object SnapshotFailureRobustnessSpec {
}
}
class FailingLocalSnapshotStore extends LocalSnapshotStore {
class FailingLocalSnapshotStore(config: Config) extends LocalSnapshotStore(config) {
override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = {
if (metadata.sequenceNr == 2 || snapshot == "boom") {
val bytes = "b0rk".getBytes("UTF-8")
@ -81,7 +82,7 @@ object SnapshotFailureRobustnessSpec {
}
}
class DeleteFailingLocalSnapshotStore extends LocalSnapshotStore {
class DeleteFailingLocalSnapshotStore(config: Config) extends LocalSnapshotStore(config) {
override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = {
super.deleteAsync(metadata) // we actually delete it properly, but act as if it failed
Future.failed(new IOException("Failed to delete snapshot for some reason!"))
@ -97,6 +98,7 @@ object SnapshotFailureRobustnessSpec {
class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some(
"""
akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore"
akka.persistence.snapshot-store.local-delete-fail = ${akka.persistence.snapshot-store.local}
akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$DeleteFailingLocalSnapshotStore"
"""))) with ImplicitSender {

View file

@ -629,6 +629,14 @@ object MiMa extends AutoPlugin {
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write"),
// #21394 remove static config path of levelDBJournal and localSnapshotStore
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.snapshot.local.LocalSnapshotStore.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.configPath"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbJournal.configPath"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbJournal.this"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.SharedLeveldbStore.configPath"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.SharedLeveldbStore.this"),
// #20737 aligned test sink and test source stage factory methods types
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.TestSinkStage.apply"),