diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 186888d725..74bdf1671a 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -3,6 +3,7 @@ */ package akka.actor +import java.lang.reflect.InvocationTargetException import language.implicitConversions import java.lang.{ Iterable ⇒ JIterable } import java.util.concurrent.TimeUnit @@ -328,7 +329,10 @@ abstract class SupervisorStrategy { def logFailure(context: ActorContext, child: ActorRef, cause: Throwable, decision: Directive): Unit = if (loggingEnabled) { val logMessage = cause match { - case e: ActorInitializationException if e.getCause ne null ⇒ e.getCause.getMessage + case e: ActorInitializationException if e.getCause ne null ⇒ e.getCause match { + case ex: InvocationTargetException if ex.getCause ne null ⇒ ex.getCause.getMessage + case ex ⇒ ex.getMessage + } case e ⇒ e.getMessage } decision match { diff --git a/akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala b/akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala index 6d3609372f..0df885ab91 100644 --- a/akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala +++ b/akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/SharedLeveldbJournalSpec.scala @@ -83,7 +83,8 @@ class SharedLeveldbJournalSpec extends AkkaSpec(SharedLeveldbJournalSpec.config) val probeA = new TestProbe(systemA) val probeB = new TestProbe(systemB) - system.actorOf(Props[SharedLeveldbStore], "store") + val storeConfig = system.settings.config.getConfig("akka.persistence.journal.leveldb-shared") + system.actorOf(Props(classOf[SharedLeveldbStore], storeConfig), "store") val storePath = RootActorPath(system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress) / "user" / "store" val appA = systemA.actorOf(Props(classOf[ExampleApp], probeA.ref, storePath)) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala index 01422af369..a6e8c3b3e2 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbJournal.scala @@ -13,13 +13,14 @@ import scala.concurrent.Future import akka.persistence.JournalProtocol.RecoverySuccess import akka.persistence.JournalProtocol.ReplayMessagesFailure import akka.pattern.pipe +import com.typesafe.config.Config /** * INTERNAL API. * * Journal backed by a local LevelDB store. For production use. */ -private[persistence] class LeveldbJournal extends { val configPath = "akka.persistence.journal.leveldb" } with AsyncWriteJournal with LeveldbStore { +private[persistence] class LeveldbJournal(val config: Config) extends AsyncWriteJournal with LeveldbStore { import LeveldbJournal._ override def receivePluginInternal: Receive = { diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala index 6eb1931a9d..d9e0533d5b 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/LeveldbStore.scala @@ -9,7 +9,7 @@ import java.io.File import scala.collection.mutable import akka.actor._ import akka.persistence._ -import akka.persistence.journal.{ WriteJournalBase } +import akka.persistence.journal.WriteJournalBase import akka.serialization.SerializationExtension import org.iq80.leveldb._ import scala.collection.immutable @@ -17,14 +17,14 @@ import scala.util._ import scala.concurrent.Future import scala.util.control.NonFatal import akka.persistence.journal.Tagged +import com.typesafe.config.Config /** * INTERNAL API. */ private[persistence] trait LeveldbStore extends Actor with WriteJournalBase with LeveldbIdMapping with LeveldbRecovery { - val configPath: String - val config = context.system.settings.config.getConfig(configPath) + val config: Config val nativeLeveldb = config.getBoolean("native") val leveldbOptions = new Options().createIfMissing(true) diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala index e24b52e1fa..d4f260da50 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/leveldb/SharedLeveldbStore.scala @@ -10,6 +10,7 @@ import scala.util.Success import scala.util.Failure import scala.util.control.NonFatal import akka.persistence.AtomicWrite +import com.typesafe.config.Config import scala.concurrent.Future /** @@ -17,7 +18,7 @@ import scala.concurrent.Future * set for each actor system that uses the store via `SharedLeveldbJournal.setStore`. The * shared LevelDB store is for testing only. */ -class SharedLeveldbStore extends { val configPath = "akka.persistence.journal.leveldb-shared.store" } with LeveldbStore { +class SharedLeveldbStore(cfg: Config) extends { override val config = cfg.getConfig("store") } with LeveldbStore { import AsyncWriteTarget._ import context.dispatcher diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala index 030e21c7dd..e20448b2ef 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/local/LocalSnapshotStore.scala @@ -14,6 +14,7 @@ import akka.persistence.serialization._ import akka.persistence.snapshot._ import akka.serialization.SerializationExtension import akka.util.ByteString.UTF_8 +import com.typesafe.config.Config import scala.collection.immutable import scala.concurrent.Future @@ -24,12 +25,11 @@ import scala.util._ * * Local filesystem backed snapshot store. */ -private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLogging { +private[persistence] class LocalSnapshotStore(config: Config) extends SnapshotStore with ActorLogging { private val FilenamePattern = """^snapshot-(.+)-(\d+)-(\d+)""".r private val persistenceIdStartIdx = 9 // Persistence ID starts after the "snapshot-" substring import akka.util.Helpers._ - private val config = context.system.settings.config.getConfig("akka.persistence.snapshot-store.local") private val maxLoadAttempts = config.getInt("max-load-attempts") .requiring(_ > 1, "max-load-attempts must be >= 1") diff --git a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala index 81859da6c9..599c7428ea 100644 --- a/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/SnapshotFailureRobustnessSpec.scala @@ -10,6 +10,7 @@ import akka.actor.{ ActorRef, Props } import akka.event.Logging import akka.persistence.snapshot.local.LocalSnapshotStore import akka.testkit.{ EventFilter, ImplicitSender, TestEvent } +import com.typesafe.config.Config import scala.concurrent.Future import scala.concurrent.duration._ @@ -71,7 +72,7 @@ object SnapshotFailureRobustnessSpec { } } - class FailingLocalSnapshotStore extends LocalSnapshotStore { + class FailingLocalSnapshotStore(config: Config) extends LocalSnapshotStore(config) { override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = { if (metadata.sequenceNr == 2 || snapshot == "boom") { val bytes = "b0rk".getBytes("UTF-8") @@ -81,7 +82,7 @@ object SnapshotFailureRobustnessSpec { } } - class DeleteFailingLocalSnapshotStore extends LocalSnapshotStore { + class DeleteFailingLocalSnapshotStore(config: Config) extends LocalSnapshotStore(config) { override def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = { super.deleteAsync(metadata) // we actually delete it properly, but act as if it failed Future.failed(new IOException("Failed to delete snapshot for some reason!")) @@ -97,6 +98,7 @@ object SnapshotFailureRobustnessSpec { class SnapshotFailureRobustnessSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some( """ akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore" + akka.persistence.snapshot-store.local-delete-fail = ${akka.persistence.snapshot-store.local} akka.persistence.snapshot-store.local-delete-fail.class = "akka.persistence.SnapshotFailureRobustnessSpec$DeleteFailingLocalSnapshotStore" """))) with ImplicitSender { diff --git a/project/MiMa.scala b/project/MiMa.scala index f4edf6089a..36cbec3bac 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -629,6 +629,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write"), + // #21394 remove static config path of levelDBJournal and localSnapshotStore + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.snapshot.local.LocalSnapshotStore.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbStore.configPath"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbJournal.configPath"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.LeveldbJournal.this"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.SharedLeveldbStore.configPath"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.journal.leveldb.SharedLeveldbStore.this"), + // #20737 aligned test sink and test source stage factory methods types ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.TestSinkStage.apply"),