pekko/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala

115 lines
3.6 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.journal
import akka.AkkaException
import akka.actor._
import akka.pattern.ask
import akka.persistence._
import akka.util._
import scala.util.Try
import scala.collection.immutable
import scala.concurrent._
import scala.concurrent.duration.Duration
import scala.language.postfixOps
/**
 * INTERNAL API.
 *
 * A journal that delegates actual storage to a target actor. For testing only.
 *
 * Until a [[AsyncWriteProxy.SetStore]] message has been received, every other incoming
 * message is stashed. Once the store reference is set, the stashed messages are
 * unstashed and all further messages are handled by the regular journal behavior.
 */
private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash with ActorLogging {
  import AsyncWriteProxy._
  import AsyncWriteTarget._
  import context.dispatcher

  // Flipped to true by the first SetStore message; before that everything is stashed.
  private var isInitialized = false
  // Target actor that performs the actual storage; unset (null) until SetStore arrives.
  private var store: ActorRef = _

  /**
   * Intercepts all messages until the store has been configured: `SetStore` installs the
   * target and releases the stash, anything else is stashed for later processing.
   */
  override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit =
    if (isInitialized) super.aroundReceive(receive, msg)
    else msg match {
      case SetStore(ref) =>
        store = ref
        unstashAll()
        isInitialized = true
      case _ => stash()
    }

  /** Timeout applied to every `ask` sent to the store and to replay inactivity. */
  implicit def timeout: Timeout

  /** Forwards the writes to the store and expects one `Try[Unit]` result per atomic write. */
  def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
    (store ? WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]]

  /** Asks the store to delete all messages of `persistenceId` up to `toSequenceNr`. */
  def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
    (store ? DeleteMessagesTo(persistenceId, toSequenceNr)).mapTo[Unit]

  /**
   * Requests a replay from the store. The replies are received by a dedicated, locally
   * deployed [[ReplayMediator]] child actor which feeds each `PersistentRepr` to
   * `replayCallback` and completes the returned future on success, failure or timeout.
   */
  def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr => Unit): Future[Unit] = {
    val replayCompletionPromise = Promise[Unit]()
    val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local))
    // The mediator is passed as sender so that the store replies to it directly.
    store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator)
    replayCompletionPromise.future
  }

  /**
   * Obtains the highest stored sequence number by issuing a replay request with
   * `fromSequenceNr`, `toSequenceNr` and `max` all set to 0 and reading the value carried
   * by the `ReplaySuccess` reply. The `fromSequenceNr` argument of this method is not
   * forwarded to the store.
   *
   * A `ReplayFailure` reply fails the returned future with the reported cause; any other
   * unexpected reply fails it with a `MatchError`.
   */
  def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
    (store ? ReplayMessages(persistenceId, fromSequenceNr = 0L, toSequenceNr = 0L, max = 0L)).flatMap {
      case ReplaySuccess(highest) => Future.successful(highest)
      case ReplayFailure(cause)   => Future.failed(cause)
    }
}
/**
 * INTERNAL API.
 *
 * Messages understood by the [[AsyncWriteProxy]] journal itself.
 */
private[persistence] object AsyncWriteProxy {
  /** Tells the proxy which actor to delegate storage operations to. */
  final case class SetStore(ref: ActorRef)
}
/**
 * INTERNAL API.
 *
 * Message protocol exchanged between an [[AsyncWriteProxy]] and its target store actor.
 */
private[persistence] object AsyncWriteTarget {
  /** Request to persist the given atomic writes. */
  @SerialVersionUID(1L)
  final case class WriteMessages(messages: immutable.Seq[AtomicWrite])

  /** Request to delete all messages of `persistenceId` up to `toSequenceNr`. */
  @SerialVersionUID(1L)
  final case class DeleteMessagesTo(persistenceId: String, toSequenceNr: Long)

  /** Request to replay messages of `persistenceId` within the given bounds. */
  @SerialVersionUID(1L)
  final case class ReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)

  /** Reply signalling a completed replay, carrying the highest sequence number. */
  @SerialVersionUID(1L)
  final case class ReplaySuccess(highestSequenceNr: Long)

  /** Reply signalling a failed replay together with its cause. */
  @SerialVersionUID(1L)
  final case class ReplayFailure(cause: Throwable)
}
/**
 * Signals that a replay was abandoned because the period of inactivity
 * exceeded the specified timeout.
 *
 * @param msg description of the timeout that occurred
 */
@SerialVersionUID(1L)
class AsyncReplayTimeoutException(msg: String)
  extends AkkaException(msg)
/**
 * Short-lived actor that receives the replies of a single replay request.
 *
 * Each `PersistentRepr` is handed to `replayCallback`. The actor completes
 * `replayCompletionPromise` and stops itself on the first `ReplaySuccess`,
 * `ReplayFailure` or `ReceiveTimeout` it receives.
 *
 * @param replayCallback          invoked for every replayed message
 * @param replayCompletionPromise completed once the replay ends, fails or times out
 * @param replayTimeout           receive timeout set on this actor's context
 */
private class ReplayMediator(replayCallback: PersistentRepr => Unit, replayCompletionPromise: Promise[Unit], replayTimeout: Duration) extends Actor {
  import AsyncWriteTarget._

  // A ReceiveTimeout message is delivered when nothing arrives within replayTimeout.
  context.setReceiveTimeout(replayTimeout)

  def receive = {
    case p: PersistentRepr => replayCallback(p)
    case _: ReplaySuccess =>
      replayCompletionPromise.success(())
      context.stop(self)
    case ReplayFailure(cause) =>
      replayCompletionPromise.failure(cause)
      context.stop(self)
    case ReceiveTimeout =>
      replayCompletionPromise.failure(new AsyncReplayTimeoutException(s"replay timed out after ${replayTimeout.toSeconds} seconds inactivity"))
      context.stop(self)
  }
}