=per #16090 Fail loudly if SharedLeveldbJournal is not initialized
This commit is contained in:
parent
986c5e8934
commit
1d5f3726c9
1 changed files with 44 additions and 15 deletions
|
|
@ -27,37 +27,65 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
|
||||||
import context.dispatcher
|
import context.dispatcher
|
||||||
|
|
||||||
private var isInitialized = false
|
private var isInitialized = false
|
||||||
private var store: ActorRef = _
|
private var isInitTimedOut = false
|
||||||
|
private var store: Option[ActorRef] = None
|
||||||
|
private val storeNotInitialized =
|
||||||
|
Future.failed(new TimeoutException("Store not initialized. " +
|
||||||
|
"Use `SharedLeveldbJournal.setStore(sharedStore, system)`"))
|
||||||
|
|
||||||
|
override protected[akka] def aroundPreStart(): Unit = {
|
||||||
|
context.system.scheduler.scheduleOnce(timeout.duration, self, InitTimeout)
|
||||||
|
super.aroundPreStart()
|
||||||
|
}
|
||||||
|
|
||||||
override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit =
|
override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit =
|
||||||
if (isInitialized) super.aroundReceive(receive, msg)
|
if (isInitialized) {
|
||||||
else msg match {
|
if (msg != InitTimeout) super.aroundReceive(receive, msg)
|
||||||
|
} else msg match {
|
||||||
case SetStore(ref) ⇒
|
case SetStore(ref) ⇒
|
||||||
store = ref
|
store = Some(ref)
|
||||||
unstashAll()
|
unstashAll()
|
||||||
isInitialized = true
|
isInitialized = true
|
||||||
case _ ⇒ stash()
|
case InitTimeout ⇒
|
||||||
|
isInitTimedOut = true
|
||||||
|
unstashAll() // will trigger appropriate failures
|
||||||
|
case _ if isInitTimedOut ⇒ super.aroundReceive(receive, msg)
|
||||||
|
case _ ⇒ stash()
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit def timeout: Timeout
|
implicit def timeout: Timeout
|
||||||
|
|
||||||
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
|
def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] =
|
||||||
(store ? WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]]
|
store match {
|
||||||
|
case Some(s) ⇒ (s ? WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]]
|
||||||
|
case None ⇒ storeNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
|
def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
|
||||||
(store ? DeleteMessagesTo(persistenceId, toSequenceNr)).mapTo[Unit]
|
store match {
|
||||||
|
case Some(s) ⇒ (s ? DeleteMessagesTo(persistenceId, toSequenceNr)).mapTo[Unit]
|
||||||
|
case None ⇒ storeNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = {
|
def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] =
|
||||||
val replayCompletionPromise = Promise[Unit]()
|
store match {
|
||||||
val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local))
|
case Some(s) ⇒
|
||||||
store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator)
|
val replayCompletionPromise = Promise[Unit]()
|
||||||
replayCompletionPromise.future
|
val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local))
|
||||||
}
|
s.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator)
|
||||||
|
replayCompletionPromise.future
|
||||||
|
case None ⇒ storeNotInitialized
|
||||||
|
}
|
||||||
|
|
||||||
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
|
def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
|
||||||
(store ? ReplayMessages(persistenceId, fromSequenceNr = 0L, toSequenceNr = 0L, max = 0L)).map {
|
store match {
|
||||||
case ReplaySuccess(highest) ⇒ highest
|
case Some(s) ⇒
|
||||||
|
(s ? ReplayMessages(persistenceId, fromSequenceNr = 0L, toSequenceNr = 0L, max = 0L)).map {
|
||||||
|
case ReplaySuccess(highest) ⇒ highest
|
||||||
|
}
|
||||||
|
case None ⇒ storeNotInitialized
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -65,6 +93,7 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash
|
||||||
*/
|
*/
|
||||||
private[persistence] object AsyncWriteProxy {
|
private[persistence] object AsyncWriteProxy {
|
||||||
final case class SetStore(ref: ActorRef)
|
final case class SetStore(ref: ActorRef)
|
||||||
|
final case object InitTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue