PersistencePluginProxy

* Rename to PersistencePluginProxy.
* Watch target journal.
* Create PersistencePluginProxyExtension to enable eager initialization of
  the persistence plugin.
* Add initialization via configuration.
* Add tests and documentation.
* Clearer log messages.
This commit is contained in:
Tal Pressman 2015-11-17 16:50:54 +02:00
parent d39b368cac
commit f610952ae7
9 changed files with 338 additions and 40 deletions

View file

@ -245,7 +245,7 @@ akka.persistence.journal.leveldb-shared {
akka.persistence.journal.proxy {
# Class name of the plugin.
class = "akka.persistence.journal.JournalProxy"
class = "akka.persistence.journal.PersistencePluginProxy"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
# Set this to on in the configuration of the ActorSystem
@ -253,13 +253,15 @@ akka.persistence.journal.proxy {
start-target-journal = off
# The journal plugin config path to use for the target journal
target-journal-plugin = ""
# The address of the proxy to connect to from other nodes. Optional setting.
target-journal-address = ""
# Initialization timeout of target lookup
init-timeout = 10s
}
akka.persistence.snapshot-store.proxy {
# Class name of the plugin.
class = "akka.persistence.journal.JournalProxy"
class = "akka.persistence.journal.PersistencePluginProxy"
# Dispatcher for the plugin actor.
plugin-dispatcher = "akka.actor.default-dispatcher"
# Set this to on in the configuration of the ActorSystem
@ -267,6 +269,8 @@ akka.persistence.snapshot-store.proxy {
start-target-snapshot-store = off
# The journal plugin config path to use for the target snapshot-store
target-snapshot-store-plugin = ""
# The address of the proxy to connect to from other nodes. Optional setting.
target-snapshot-store-address = ""
# Initialization timeout of target lookup
init-timeout = 10s
}

View file

@ -3,34 +3,17 @@
*/
package akka.persistence.journal
import akka.util.Helpers.Requiring
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.Stash
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRef
import akka.persistence.JournalProtocol
import akka.actor.ActorSystem
import akka.persistence.Persistence
import scala.util.control.NoStackTrace
import java.net.URISyntaxException
import java.util.concurrent.TimeoutException
import akka.persistence.AtomicWrite
import akka.persistence.NonPersistentRepr
import akka.persistence.DeleteMessagesFailure
import akka.actor.ActorLogging
import com.typesafe.config.Config
import akka.actor.Address
import akka.actor.ActorIdentity
import akka.actor.RootActorPath
import akka.actor.Identify
import akka.actor.ReceiveTimeout
import akka.actor.ExtendedActorSystem
import akka.persistence.SaveSnapshotFailure
import akka.persistence.DeleteSnapshotFailure
import akka.persistence.DeleteSnapshotsFailure
import akka.persistence.SnapshotProtocol
object JournalProxy {
import akka.actor._
import akka.persistence.{ AtomicWrite, DeleteMessagesFailure, DeleteSnapshotFailure, DeleteSnapshotsFailure, JournalProtocol, NonPersistentRepr, Persistence, SaveSnapshotFailure, SnapshotProtocol }
import akka.util.Helpers.Requiring
import com.typesafe.config.Config
import scala.concurrent.duration._
object PersistencePluginProxy {
final case class TargetLocation(address: Address)
private case object InitTimeout
@ -40,6 +23,13 @@ object JournalProxy {
Persistence(system).snapshotStoreFor(null) ! TargetLocation(address)
}
def start(system: ActorSystem): Unit = {
Persistence(system).journalFor(null)
if (system.settings.config.getString("akka.persistence.snapshot-store.plugin") != "")
Persistence(system).snapshotStoreFor(null)
}
private sealed trait PluginType {
def qualifier: String
}
@ -51,9 +41,23 @@ object JournalProxy {
}
}
// FIXME document me
final class JournalProxy(config: Config) extends Actor with Stash with ActorLogging {
import JournalProxy._
/**
* PersistencePluginProxyExtensionImpl is an `Extension` that enables initialization of the `PersistencePluginProxy`
* via configuration, without requiring any code changes or the creation of any actors.
* @param system The actor system to initialize the extension for
*/
class PersistencePluginProxyExtensionImpl(system: ActorSystem) extends Extension {
PersistencePluginProxy.start(system)
}
object PersistencePluginProxyExtension extends ExtensionId[PersistencePluginProxyExtensionImpl] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): PersistencePluginProxyExtensionImpl = new PersistencePluginProxyExtensionImpl(system)
override def lookup(): ExtensionId[_ <: Extension] = PersistencePluginProxyExtension
override def get(system: ActorSystem): PersistencePluginProxyExtensionImpl = super.get(system)
}
final class PersistencePluginProxy(config: Config) extends Actor with Stash with ActorLogging {
import PersistencePluginProxy._
import JournalProtocol._
import SnapshotProtocol._
@ -65,7 +69,7 @@ final class JournalProxy(config: Config) extends Actor with Stash with ActorLogg
throw new IllegalArgumentException("Unknown plugin type: " + other)
}
private val timeout: FiniteDuration = config.getDuration("init-timeout", MILLISECONDS).millis
private val initTimeout: FiniteDuration = config.getDuration("init-timeout", MILLISECONDS).millis
private val targetPluginId: String = {
val key = s"target-${pluginType.qualifier}-plugin"
config.getString(key).requiring(_ != "", s"$pluginId.$key must be defined")
@ -84,7 +88,18 @@ final class JournalProxy(config: Config) extends Actor with Stash with ActorLogg
}
context.become(active(target, targetAtThisNode = true))
} else {
context.system.scheduler.scheduleOnce(timeout, self, InitTimeout)(context.dispatcher)
val targetAddressKey = s"target-${pluginType.qualifier}-address"
val targetAddress = config.getString(targetAddressKey)
if (targetAddress != "") {
try {
log.info("Setting target {} address to {}", pluginType.qualifier, targetAddress)
PersistencePluginProxy.setTargetLocation(context.system, AddressFromURIString(targetAddress))
} catch {
case _: URISyntaxException log.warning("Invalid URL provided for target {} address: {}", pluginType.qualifier, targetAddress)
}
}
context.system.scheduler.scheduleOnce(initTimeout, self, InitTimeout)(context.dispatcher)
}
}
@ -92,7 +107,7 @@ final class JournalProxy(config: Config) extends Actor with Stash with ActorLogg
context.system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
private def timeoutException() = new TimeoutException(s"Target ${pluginType.qualifier} not initialized. " +
"Use `JournalProxy.setTargetLocation`")
s"Use `PersistencePluginProxy.setTargetLocation` or set `target-${pluginType.qualifier}-address`")
def receive = init
@ -101,9 +116,10 @@ final class JournalProxy(config: Config) extends Actor with Stash with ActorLogg
context.setReceiveTimeout(1.second) // for retries
context.become(identifying(address))
case InitTimeout
log.info("Initialization timeout, Use `JournalProxy.setTargetLocation`")
log.info("Initialization timed-out (after {}), Use `PersistencePluginProxy.setTargetLocation` or set `target-{}-address`", initTimeout, pluginType.qualifier)
context.become(initTimedOut)
unstashAll() // will trigger appropriate failures
case Terminated(_)
case msg
stash()
}
@ -124,9 +140,11 @@ final class JournalProxy(config: Config) extends Actor with Stash with ActorLogg
case ActorIdentity(`targetPluginId`, Some(target))
log.info("Found target {} at [{}]", pluginType.qualifier, address)
context.setReceiveTimeout(Duration.Undefined)
context.watch(target)
unstashAll()
context.become(active(target, address == selfAddress))
case _: ActorIdentity // will retry after ReceiveTimeout
case Terminated(_)
case ReceiveTimeout
sendIdentify(address)
}: Receive).orElse(init)
@ -135,9 +153,13 @@ final class JournalProxy(config: Config) extends Actor with Stash with ActorLogg
case TargetLocation(address)
if (targetAtThisNode && address != selfAddress)
becomeIdentifying(address)
case InitTimeout
case Terminated(`targetJournal`)
context.unwatch(targetJournal)
context.become(initTimedOut)
case Terminated(_)
case InitTimeout
case msg
targetJournal.forward(msg)
targetJournal forward msg
}
def initTimedOut: Receive = {
@ -173,9 +195,11 @@ final class JournalProxy(config: Config) extends Actor with Stash with ActorLogg
case TargetLocation(address)
becomeIdentifying(address)
case Terminated(_)
case other
val e = timeoutException()
log.error(e, "Failed JournalProxy request: {}", e.getMessage)
log.error(e, "Failed PersistencePluginProxy request: {}", e.getMessage)
}
}

View file

@ -0,0 +1,129 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal.leveldb
import akka.actor._
import akka.persistence._
import akka.persistence.journal.PersistencePluginProxy
import akka.testkit.{ TestProbe, AkkaSpec }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object PersistencePluginProxySpec {
lazy val config = ConfigFactory.parseString(
"""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
persistence {
journal {
plugin = "akka.persistence.journal.proxy"
proxy.target-journal-plugin = "akka.persistence.journal.inmem"
}
snapshot-store {
plugin = "akka.persistence.snapshot-store.proxy"
proxy.target-snapshot-store-plugin = "akka.persistence.snapshot-store.local"
local.dir = target/snapshots-PersistencePluginProxySpec
}
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
loglevel = ERROR
log-dead-letters = 0
log-dead-letters-during-shutdown = off
test.single-expect-default = 10s
}
""")
lazy val startTargetConfig = ConfigFactory.parseString(
"""
|akka.extensions = ["akka.persistence.journal.PersistencePluginProxyExtension"]
|akka.persistence {
| journal.proxy.start-target-journal = on
| snapshot-store.proxy.start-target-snapshot-store = on
|}
""".stripMargin)
def targetAddressConfig(system: ActorSystem) = ConfigFactory.parseString(
s"""
|akka.persistence.journal.proxy.target-journal-address = "${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}"
|akka.persistence.snapshot-store.proxy.target-snapshot-store-address = "${system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress}"
""".stripMargin)
class ExamplePersistentActor(probe: ActorRef, name: String) extends NamedPersistentActor(name) {
override def receiveRecover = {
case RecoveryCompleted // ignore
case payload
probe ! payload
}
override def receiveCommand = {
case payload
persist(payload) { _
probe ! payload
}
}
}
class ExampleApp(probe: ActorRef) extends Actor {
val p = context.actorOf(Props(classOf[ExamplePersistentActor], probe, context.system.name))
def receive = {
case m p forward m
}
}
}
class PersistencePluginProxySpec extends AkkaSpec(PersistencePluginProxySpec.startTargetConfig withFallback PersistencePluginProxySpec.config) with Cleanup {
import PersistencePluginProxySpec._
val systemA = ActorSystem("SysA", config)
val systemB = ActorSystem("SysB", targetAddressConfig(system) withFallback PersistencePluginProxySpec.config)
override protected def afterTermination() {
shutdown(systemA)
shutdown(systemB)
super.afterTermination()
}
"A persistence proxy" can {
"be shared by multiple actor systems" in {
val address = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val probeA = new TestProbe(systemA)
val probeB = new TestProbe(systemB)
PersistencePluginProxy.setTargetLocation(systemA, address)
val appA = systemA.actorOf(Props(classOf[ExampleApp], probeA.ref))
val appB = systemB.actorOf(Props(classOf[ExampleApp], probeB.ref))
appA ! "a1"
appB ! "b1"
probeA.expectMsg("a1")
probeB.expectMsg("b1")
val recoveredAppA = systemA.actorOf(Props(classOf[ExampleApp], probeA.ref))
val recoveredAppB = systemB.actorOf(Props(classOf[ExampleApp], probeB.ref))
recoveredAppA ! "a2"
recoveredAppB ! "b2"
probeA.expectMsg("a1")
probeA.expectMsg("a2")
probeB.expectMsg("b1")
probeB.expectMsg("b2")
}
}
}