2015-11-06 16:10:46 +01:00
/* *
2016-02-23 12:58:39 +01:00
* Copyright ( C ) 2015 - 2016 Lightbend Inc . < http : //www.lightbend.com>
2015-11-06 16:10:46 +01:00
*/
package akka.persistence.journal
2015-11-17 16:50:54 +02:00
import java.net.URISyntaxException
2015-11-06 16:10:46 +01:00
import java.util.concurrent.TimeoutException
2015-11-17 16:50:54 +02:00
import akka.actor._
import akka.persistence. { AtomicWrite , DeleteMessagesFailure , DeleteSnapshotFailure , DeleteSnapshotsFailure , JournalProtocol , NonPersistentRepr , Persistence , SaveSnapshotFailure , SnapshotProtocol }
import akka.util.Helpers.Requiring
2015-11-06 16:10:46 +01:00
import com.typesafe.config.Config
2015-11-17 16:50:54 +02:00
import scala.concurrent.duration._
object PersistencePluginProxy {
2015-11-06 16:10:46 +01:00
final case class TargetLocation ( address : Address )
private case object InitTimeout
def setTargetLocation ( system : ActorSystem , address : Address ) : Unit = {
Persistence ( system ) . journalFor ( null ) ! TargetLocation ( address )
if ( system . settings . config . getString ( "akka.persistence.snapshot-store.plugin" ) != "" )
Persistence ( system ) . snapshotStoreFor ( null ) ! TargetLocation ( address )
}
2015-11-17 16:50:54 +02:00
def start ( system : ActorSystem ) : Unit = {
Persistence ( system ) . journalFor ( null )
if ( system . settings . config . getString ( "akka.persistence.snapshot-store.plugin" ) != "" )
Persistence ( system ) . snapshotStoreFor ( null )
}
2015-11-06 16:10:46 +01:00
private sealed trait PluginType {
def qualifier : String
}
private case object Journal extends PluginType {
override def qualifier : String = "journal"
}
private case object SnapshotStore extends PluginType {
override def qualifier : String = "snapshot-store"
}
}
2015-11-17 16:50:54 +02:00
/* *
* PersistencePluginProxyExtensionImpl is an `Extension` that enables initialization of the `PersistencePluginProxy`
* via configuration , without requiring any code changes or the creation of any actors .
* @param system The actor system to initialize the extension for
*/
class PersistencePluginProxyExtensionImpl ( system : ActorSystem ) extends Extension {
PersistencePluginProxy . start ( system )
}
object PersistencePluginProxyExtension extends ExtensionId [ PersistencePluginProxyExtensionImpl ] with ExtensionIdProvider {
override def createExtension ( system : ExtendedActorSystem ) : PersistencePluginProxyExtensionImpl = new PersistencePluginProxyExtensionImpl ( system )
override def lookup ( ) : ExtensionId [ _ <: Extension ] = PersistencePluginProxyExtension
override def get ( system : ActorSystem ) : PersistencePluginProxyExtensionImpl = super . get ( system )
}
final class PersistencePluginProxy ( config : Config ) extends Actor with Stash with ActorLogging {
import PersistencePluginProxy._
2015-11-06 16:10:46 +01:00
import JournalProtocol._
import SnapshotProtocol._
private val pluginId = self . path . name
private val pluginType : PluginType = pluginId match {
case "akka.persistence.journal.proxy" ⇒ Journal
case "akka.persistence.snapshot-store.proxy" ⇒ SnapshotStore
case other ⇒
throw new IllegalArgumentException ( "Unknown plugin type: " + other )
}
2015-11-17 16:50:54 +02:00
private val initTimeout : FiniteDuration = config . getDuration ( "init-timeout" , MILLISECONDS ) . millis
2015-11-06 16:10:46 +01:00
private val targetPluginId : String = {
val key = s" target- ${ pluginType . qualifier } -plugin "
config . getString ( key ) . requiring ( _ != "" , s" $pluginId . $key must be defined " )
}
private val startTarget : Boolean = config . getBoolean ( s" start-target- ${ pluginType . qualifier } " )
override def preStart ( ) : Unit = {
if ( startTarget ) {
val target = pluginType match {
case Journal ⇒
log . info ( "Starting target journal [{}]" , targetPluginId )
Persistence ( context . system ) . journalFor ( targetPluginId )
case SnapshotStore ⇒
log . info ( "Starting target snapshot-store [{}]" , targetPluginId )
Persistence ( context . system ) . snapshotStoreFor ( targetPluginId )
}
context . become ( active ( target , targetAtThisNode = true ) )
} else {
2015-11-17 16:50:54 +02:00
val targetAddressKey = s" target- ${ pluginType . qualifier } -address "
val targetAddress = config . getString ( targetAddressKey )
if ( targetAddress != "" ) {
try {
log . info ( "Setting target {} address to {}" , pluginType . qualifier , targetAddress )
PersistencePluginProxy . setTargetLocation ( context . system , AddressFromURIString ( targetAddress ) )
} catch {
case _ : URISyntaxException ⇒ log . warning ( "Invalid URL provided for target {} address: {}" , pluginType . qualifier , targetAddress )
}
}
context . system . scheduler . scheduleOnce ( initTimeout , self , InitTimeout ) ( context . dispatcher )
2015-11-06 16:10:46 +01:00
}
}
private val selfAddress : Address =
context . system . asInstanceOf [ ExtendedActorSystem ] . provider . getDefaultAddress
private def timeoutException ( ) = new TimeoutException ( s" Target ${ pluginType . qualifier } not initialized. " +
2015-11-17 16:50:54 +02:00
s" Use `PersistencePluginProxy.setTargetLocation` or set `target- ${ pluginType . qualifier } -address` " )
2015-11-06 16:10:46 +01:00
def receive = init
def init : Receive = {
case TargetLocation ( address ) ⇒
context . setReceiveTimeout ( 1. second ) // for retries
context . become ( identifying ( address ) )
case InitTimeout ⇒
2015-11-17 16:50:54 +02:00
log . info ( "Initialization timed-out (after {}), Use `PersistencePluginProxy.setTargetLocation` or set `target-{}-address`" , initTimeout , pluginType . qualifier )
2015-11-06 16:10:46 +01:00
context . become ( initTimedOut )
unstashAll ( ) // will trigger appropriate failures
2015-11-17 16:50:54 +02:00
case Terminated ( _ ) ⇒
2015-11-06 16:10:46 +01:00
case msg ⇒
stash ( )
}
def becomeIdentifying ( address : Address ) : Unit = {
sendIdentify ( address )
context . setReceiveTimeout ( 1. second ) // for retries
context . become ( identifying ( address ) )
}
def sendIdentify ( address : Address ) : Unit = {
val sel = context . actorSelection ( RootActorPath ( address ) / "system" / targetPluginId )
log . info ( "Trying to identify target {} at {}" , pluginType . qualifier , sel )
sel ! Identify ( targetPluginId )
}
def identifying ( address : Address ) : Receive = ( {
case ActorIdentity ( `targetPluginId` , Some ( target ) ) ⇒
log . info ( "Found target {} at [{}]" , pluginType . qualifier , address )
context . setReceiveTimeout ( Duration . Undefined )
2015-11-17 16:50:54 +02:00
context . watch ( target )
2015-11-06 16:10:46 +01:00
unstashAll ( )
context . become ( active ( target , address == selfAddress ) )
case _ : ActorIdentity ⇒ // will retry after ReceiveTimeout
2015-11-17 16:50:54 +02:00
case Terminated ( _ ) ⇒
2015-11-06 16:10:46 +01:00
case ReceiveTimeout ⇒
sendIdentify ( address )
} : Receive ) . orElse ( init )
def active ( targetJournal : ActorRef , targetAtThisNode : Boolean ) : Receive = {
case TargetLocation ( address ) ⇒
if ( targetAtThisNode && address != selfAddress )
becomeIdentifying ( address )
2015-11-17 16:50:54 +02:00
case Terminated ( `targetJournal` ) ⇒
context . unwatch ( targetJournal )
context . become ( initTimedOut )
case Terminated ( _ ) ⇒
case InitTimeout ⇒
2015-11-06 16:10:46 +01:00
case msg ⇒
2015-11-17 16:50:54 +02:00
targetJournal forward msg
2015-11-06 16:10:46 +01:00
}
def initTimedOut : Receive = {
case req : JournalProtocol . Request ⇒ req match { // exhaustive match
case WriteMessages ( messages , persistentActor , actorInstanceId ) ⇒
persistentActor ! WriteMessagesFailed ( timeoutException )
messages . foreach {
case a : AtomicWrite ⇒
a . payload . foreach { p ⇒
persistentActor ! WriteMessageFailure ( p , timeoutException , actorInstanceId )
}
case r : NonPersistentRepr ⇒
persistentActor ! LoopMessageSuccess ( r . payload , actorInstanceId )
}
case ReplayMessages ( fromSequenceNr , toSequenceNr , max , persistenceId , persistentActor ) ⇒
persistentActor ! ReplayMessagesFailure ( timeoutException )
case DeleteMessagesTo ( persistenceId , toSequenceNr , persistentActor ) ⇒
persistentActor ! DeleteMessagesFailure ( timeoutException , toSequenceNr )
}
case req : SnapshotProtocol . Request ⇒ req match { // exhaustive match
case LoadSnapshot ( persistenceId , criteria , toSequenceNr ) ⇒
sender ( ) ! LoadSnapshotResult ( None , toSequenceNr )
case SaveSnapshot ( metadata , snapshot ) ⇒
sender ( ) ! SaveSnapshotFailure ( metadata , timeoutException )
case DeleteSnapshot ( metadata ) ⇒
sender ( ) ! DeleteSnapshotFailure ( metadata , timeoutException )
case DeleteSnapshots ( persistenceId , criteria ) ⇒
sender ( ) ! DeleteSnapshotsFailure ( criteria , timeoutException )
}
case TargetLocation ( address ) ⇒
becomeIdentifying ( address )
2015-11-17 16:50:54 +02:00
case Terminated ( _ ) ⇒
2015-11-06 16:10:46 +01:00
case other ⇒
val e = timeoutException ( )
2015-11-17 16:50:54 +02:00
log . error ( e , "Failed PersistencePluginProxy request: {}" , e . getMessage )
2015-11-06 16:10:46 +01:00
}
}