diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala index 3241d92f9d..9134c33210 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteProxy.scala @@ -27,37 +27,65 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash import context.dispatcher private var isInitialized = false - private var store: ActorRef = _ + private var isInitTimedOut = false + private var store: Option[ActorRef] = None + private val storeNotInitialized = + Future.failed(new TimeoutException("Store not initialized. " + + "Use `SharedLeveldbJournal.setStore(sharedStore, system)`")) + + override protected[akka] def aroundPreStart(): Unit = { + context.system.scheduler.scheduleOnce(timeout.duration, self, InitTimeout) + super.aroundPreStart() + } override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = - if (isInitialized) super.aroundReceive(receive, msg) - else msg match { + if (isInitialized) { + if (msg != InitTimeout) super.aroundReceive(receive, msg) + } else msg match { case SetStore(ref) ⇒ - store = ref + store = Some(ref) unstashAll() isInitialized = true - case _ ⇒ stash() + case InitTimeout ⇒ + isInitTimedOut = true + unstashAll() // will trigger appropriate failures + case _ if isInitTimedOut ⇒ super.aroundReceive(receive, msg) + case _ ⇒ stash() } implicit def timeout: Timeout def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = - (store ? WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]] + store match { + case Some(s) ⇒ (s ? WriteMessages(messages)).mapTo[immutable.Seq[Try[Unit]]] + case None ⇒ storeNotInitialized + } def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = - (store ? DeleteMessagesTo(persistenceId, toSequenceNr)).mapTo[Unit] + store match { + case Some(s) ⇒ (s ? DeleteMessagesTo(persistenceId, toSequenceNr)).mapTo[Unit] + case None ⇒ storeNotInitialized + } - def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = { - val replayCompletionPromise = Promise[Unit]() - val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local)) - store.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator) - replayCompletionPromise.future - } + def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Unit] = + store match { + case Some(s) ⇒ + val replayCompletionPromise = Promise[Unit]() + val mediator = context.actorOf(Props(classOf[ReplayMediator], replayCallback, replayCompletionPromise, timeout.duration).withDeploy(Deploy.local)) + s.tell(ReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max), mediator) + replayCompletionPromise.future + case None ⇒ storeNotInitialized + } def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = - (store ? ReplayMessages(persistenceId, fromSequenceNr = 0L, toSequenceNr = 0L, max = 0L)).map { - case ReplaySuccess(highest) ⇒ highest + store match { + case Some(s) ⇒ + (s ? ReplayMessages(persistenceId, fromSequenceNr = 0L, toSequenceNr = 0L, max = 0L)).map { + case ReplaySuccess(highest) ⇒ highest + } + case None ⇒ storeNotInitialized } + } /** @@ -65,6 +93,7 @@ private[persistence] trait AsyncWriteProxy extends AsyncWriteJournal with Stash */ private[persistence] object AsyncWriteProxy { final case class SetStore(ref: ActorRef) + final case object InitTimeout } /**