pekko/docs/src/test/scala/docs/actor/FaultHandlingDocSample.scala
scala-steward-asf[bot] 5b2aab6f7a
Update scalafmt-core to 3.8.4 (#1717)
* Update scalafmt-core to 3.8.4

* Reformat with scalafmt 3.8.4

Executed command: scalafmt --non-interactive

* Add 'Reformat with scalafmt 3.8.4' to .git-blame-ignore-revs

* remove postfixOps imports

Assuming we want to keep the default
`rewrite.avoidInfix.excludePostfix = false` - otherwise
we can drop this commit

---------

Co-authored-by: scala-steward-asf[bot] <147768647+scala-steward-asf[bot]@users.noreply.github.com>
Co-authored-by: Arnout Engelen <arnout@bzzt.net>
2025-01-18 10:36:38 +08:00

307 lines
9 KiB
Scala

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.actor
//#all
//#imports
import org.apache.pekko
import pekko.actor._
import pekko.actor.SupervisorStrategy._
import scala.concurrent.duration._
import pekko.util.Timeout
import pekko.event.LoggingReceive
import pekko.pattern.{ ask, pipe }
import com.typesafe.config.ConfigFactory
//#imports
/**
 * Entry point of the sample: creates an actor system with debug logging
 * switched on, starts the `Worker` and `Listener` actors and kicks off the work.
 */
object FaultHandlingDocSample extends App {
  // DEBUG log level plus receive/lifecycle debug logging for actors.
  val config = ConfigFactory.parseString("""
pekko.loglevel = "DEBUG"
pekko.actor.debug {
receive = on
lifecycle = on
}
""")

  val system = ActorSystem("FaultToleranceSample", config)

  val worker = system.actorOf(Props[Worker](), name = "worker")
  val listener = system.actorOf(Props[Listener](), name = "listener")

  // Kick off the work. The listener is passed as the sender of the message,
  // so whatever the worker sends back to the sender ends up at the listener.
  worker.tell(Worker.Start, listener)
}
/**
 * Receives `Progress` reports and terminates the actor system, either once
 * the reported progress reaches 100 % or when no message arrives in time.
 */
class Listener extends Actor with ActorLogging {
  import Worker.Progress

  // A ReceiveTimeout message is delivered if nothing is received for 15 seconds;
  // the service is then considered unavailable.
  context.setReceiveTimeout(15.seconds)

  def receive: Receive = {
    case ReceiveTimeout =>
      // Nothing arrived within the receive timeout: give up.
      log.error("Shutting down due to unavailable service")
      context.system.terminate()

    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      val finished = percent >= 100.0
      if (finished) {
        log.info("That's all, shutting down")
        context.system.terminate()
      }
  }
}
//#messages
/** Messages exchanged with the `Worker` actor. */
object Worker {
  // Starts the work; the sender of this message is remembered by the Worker
  // as the progress listener.
  case object Start
  // Periodic tick the Worker schedules to itself; each tick sends increments
  // to the counter service and requests the current count.
  case object Do
  // Progress report piped to the progress listener; `percent` is computed by
  // the Worker as 100.0 * count / totalCount.
  final case class Progress(percent: Double)
}
//#messages
/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of the current `Progress`. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {
  import Worker._
  import CounterService._

  // Timeout applied to the `?` (ask) sent to the counter service.
  implicit val askTimeout: Timeout = Timeout(5.seconds)

  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }

  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService](), name = "counter")
  val totalCount = 51

  import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty =>
      // Only the first Start is accepted: remember who asked and begin ticking.
      progressListener = Some(sender())
      context.system.scheduler.scheduleWithFixedDelay(Duration.Zero, 1.second, self, Do)

    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)

      // Send current progress to the initial sender. The listener is only
      // known after Start has been handled, so guard instead of calling
      // `Option.get`, which would throw if a Do is processed without one.
      progressListener.foreach { listener =>
        (counterService ? GetCurrentCount)
          .map {
            case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
          }
          .pipeTo(listener)
      }
  }
}
//#messages
/** Messages and failure type used by the `CounterService` actor. */
object CounterService {

  /** Request to add `n` to the counter. */
  final case class Increment(n: Int)

  /** Request for the current counter value; the reply is a `CurrentCount`. */
  sealed abstract class GetCurrentCount
  case object GetCurrentCount extends GetCurrentCount

  /** Reply to `GetCurrentCount`, carrying the counter's key and its value. */
  final case class CurrentCount(key: String, count: Long)

  /** Internal message the service schedules to itself to re-create its storage child. */
  private case object Reconnect

  /** Thrown by the service when it cannot accept more messages while it has no counter yet. */
  class ServiceUnavailable(msg: String) extends RuntimeException(msg)
}
//#messages
/**
 * Adds the value received in an `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `GetCurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {
  import CounterService._
  import Counter._
  import Storage._

  // A storage child that throws StorageException is restarted.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5.seconds) {
    case _: Storage.StorageException => Restart
  }

  // The actor's own name doubles as the key of the stored counter value.
  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  // Messages (with their original senders) received before the counter exists.
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  import context.dispatcher // Use this Actors' Dispatcher as ExecutionContext

  override def preStart(): Unit = initStorage()

  /**
   * Creates and watches the storage child, hands it to the counter (if one
   * exists) and asks it for the value stored under `key`.
   *
   * The child storage is restarted in case of failure, but if it is still
   * failing after 3 restarts it will be stopped. Better to back off than to
   * keep failing. Because the child is watched, a Terminated message arrives
   * when it has stopped, and a Reconnect is then scheduled after a delay.
   */
  def initStorage(): Unit = {
    val storageRef = context.watch(context.actorOf(Props[Storage](), name = "storage"))
    storage = Some(storageRef)
    // Tell the counter, if any, to use the new storage
    counter.foreach(_ ! UseStorage(storage))
    // We need the initial value to be able to operate
    storageRef ! Get(key)
  }

  def receive = LoggingReceive {
    case Entry(entryKey, initialValue) if counter.isEmpty && entryKey == key =>
      // Storage answered with the initial value, so the Counter can be created now
      val newCounter = context.actorOf(Props(classOf[Counter], key, initialValue))
      counter = Some(newCounter)
      // Point the counter at the current storage
      newCounter ! UseStorage(storage)
      // Replay everything that was buffered, keeping the original senders
      backlog.foreach { case (replyTo, pending) => newCounter.tell(pending, replyTo) }
      backlog = IndexedSeq.empty

    case request @ (_: Increment | _: GetCurrentCount) =>
      forwardOrPlaceInBacklog(request)

    case Terminated(stopped) if storage.contains(stopped) =>
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter.foreach(_ ! UseStorage(None))
      // Try to re-establish storage after a while
      context.system.scheduler.scheduleOnce(10.seconds, self, Reconnect)

    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  /**
   * Forwards `msg` to the counter when it exists; otherwise buffers it
   * together with its sender until the counter has been created.
   *
   * The initial value from storage is needed before anything can be
   * delegated to the counter, hence the backlog.
   *
   * Throws `ServiceUnavailable` when the backlog already holds `MaxBacklog` messages.
   */
  def forwardOrPlaceInBacklog(msg: Any): Unit =
    counter match {
      case None if backlog.size >= MaxBacklog =>
        throw new ServiceUnavailable("CounterService not available, lack of initial value")
      case None =>
        backlog = backlog :+ (sender() -> msg)
      case Some(c) =>
        c.forward(msg)
    }
}
//#messages
/** Messages understood by the `Counter` actor. */
object Counter {
  // Tells the counter which storage actor to use from now on;
  // `None` means there is no storage for the moment.
  final case class UseStorage(storage: Option[ActorRef])
}
//#messages
/**
 * Keeps the count in memory and sends the current value to the
 * `Storage` whenever one is available.
 */
class Counter(key: String, initialValue: Long) extends Actor {
  import Counter.UseStorage
  import CounterService.{ CurrentCount, GetCurrentCount, Increment }
  import Storage.{ Entry, Store }

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)

    case Increment(n) =>
      count += n
      storeCount()

    case UseStorage(s) =>
      // Switch to the given storage (possibly none) and send it the current value.
      storage = s
      storeCount()
  }

  /** Sends the current count to the storage actor, if there is one. */
  def storeCount(): Unit = {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    for (target <- storage) target ! Store(Entry(key, count))
  }
}
//#messages
/** Messages and failure type used by the `Storage` actor. */
object Storage {

  /** A key together with its value; used both as reply to `Get` and as payload of `Store`. */
  final case class Entry(key: String, value: Long)

  /** Request for the value stored under `key`. */
  final case class Get(key: String)

  /** Request to save `entry`. */
  final case class Store(entry: Entry)

  /** Failure type of the storage layer. */
  class StorageException(msg: String) extends RuntimeException(msg)
}
//#messages
/**
 * Saves key/value pairs to persistent storage when receiving a `Store` message.
 * Replies with the current value when receiving a `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {
  import Storage.{ Entry, Get, Store }

  val db = DummyDB

  def receive = LoggingReceive {
    case Get(key) =>
      // A key that has never been stored is reported with the value 0.
      val current = db.load(key).getOrElse(0L)
      sender() ! Entry(key, current)

    case Store(Entry(key, count)) =>
      db.save(key, count)
  }
}
//#dummydb
/**
 * In-memory stand-in for a database: a map from key to value whose
 * accessors are synchronized on this object. Saving a value from 11 to 14
 * (inclusive) fails on purpose to simulate a broken data store.
 */
object DummyDB {
  import Storage.StorageException

  private var db = Map.empty[String, Long]

  /** Stores `value` under `key`; throws `StorageException` for the values 11 to 14. */
  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    val simulateFailure = value >= 11 && value <= 14
    if (simulateFailure)
      throw new StorageException("Simulated store failure " + value)
    db = db + (key -> value)
  }

  /** Returns the value stored under `key`, or `None` if nothing has been saved for it. */
  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}
//#dummydb
//#all