Remove akka-contrib (#26769)

* Remove akka-contrib #26183

This module has been deprecated since 2.5 and is removed in 2.6
This commit is contained in:
Arnout Engelen 2019-05-02 12:09:11 +02:00 committed by GitHub
parent 80fddd2317
commit 282b38e832
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 7 additions and 3975 deletions

View file

@ -1,52 +0,0 @@
# External Contributions
This subproject provides a home to modules contributed by external developers
which may or may not move into the officially supported code base over time.
The conditions under which this transition can occur include:
* there must be enough interest in the module to warrant inclusion in the standard distribution,
* the module must be actively maintained and
* code quality must be good enough to allow efficient maintenance by the Akka core development team
If a contributions turns out to not “take off” it may be removed again at a
later time.
## Caveat Emptor
A module in this subproject doesn't have to obey the rule of staying binary
compatible between micro releases. Breaking API changes may be introduced in
minor releases without notice as we refine and simplify based on your feedback.
A module may be dropped in any release without prior deprecation. The Lightbend
subscription does not cover support for these modules.
## Suggested Format of Contributions
Each contribution should be a self-contained unit, consisting of one source
file or one exclusively used package, without dependencies to other modules in
this subproject; it may depend on everything else in the Akka distribution,
though. This ensures that contributions may be moved into the standard
distribution individually. The module shall be within a subpackage of
`akka.contrib`.
Each module must be accompanied by a test suite which verifies that the
provided features work, possibly complemented by integration and unit tests.
The tests should follow the [Developer
Guidelines](http://doc.akka.io/docs/akka/current/dev/developer-guidelines.html#testing)
and go into the `src/test/scala` or `src/test/java` directories (with package
name matching the module which is being tested). As an example, if the module
were called `akka.contrib.pattern.ReliableProxy`, then the test suite should be
called `akka.contrib.pattern.ReliableProxySpec`.
Each module must also have proper documentation in [reStructured Text
format](http://sphinx.pocoo.org/rest.html). The documentation should be a
single `<module>.rst` file in the `akka-contrib/docs` directory, including a
link from `index.rst`.
## Suggested Way of Using these Contributions
Since the Akka team does not restrict updates to this subproject even during
otherwise binary compatible releases, and modules may be removed without
deprecation, it is suggested to copy the source files into your own code base,
changing the package name. This way you can choose when to update or which
fixes to include (to keep binary compatibility if needed) and later releases of
Akka do not potentially break your application.

View file

@ -1,6 +0,0 @@
######################################
# Akka Contrib Reference Config File #
######################################
# This is the reference config file that contains all the default settings.
# Make your edits/overrides in your application.conf.

View file

@ -1,316 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.circuitbreaker
import akka.actor._
import akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure
import akka.event.LoggingAdapter
import akka.pattern._
import akka.util.Timeout
import scala.util.{ Failure, Success }
/**
* This is an Actor which implements the circuit breaker pattern,
* you may also be interested in the raw circuit breaker [[akka.pattern.CircuitBreaker]]
*/
@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0")
object CircuitBreakerProxy {
/**
* Creates an circuit breaker actor proxying a target actor intended for request-reply interactions.
* It is possible to send messages through this proxy without expecting a response wrapping them into a
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.TellOnly]] or a
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.Passthrough]] the difference between the two being that
* a message wrapped into a [[akka.contrib.circuitbreaker.CircuitBreakerProxy.Passthrough]] is going to be
* forwarded even when the circuit is open (e.g. if you need to terminate the target and proxy actors sending
* a [[akka.actor.PoisonPill]] message)
*
* The circuit breaker implements the same state machine documented in [[akka.pattern.CircuitBreaker]]
*
* @param target the actor to proxy
* @param maxFailures maximum number of failures before opening the circuit
* @param callTimeout timeout before considering the ongoing call a failure
* @param resetTimeout time after which the channel will be closed after entering the open state
* @param circuitEventListener an actor that will receive a series of messages of type
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitBreakerEvent]] (optional)
* @param failureDetector function to detect if a message received from the target actor as a
* response from the request represents a failure
* @param failureMap function to map a failure into a response message. The failing response message is wrapped
* into a [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] object
*/
def props(
target: ActorRef,
maxFailures: Int,
callTimeout: Timeout,
resetTimeout: Timeout,
circuitEventListener: Option[ActorRef],
failureDetector: Any => Boolean,
failureMap: CircuitOpenFailure => Any) =
Props(
new CircuitBreakerProxy(
target,
maxFailures,
callTimeout,
resetTimeout,
circuitEventListener,
failureDetector,
failureMap))
sealed trait CircuitBreakerCommand
final case class TellOnly(msg: Any) extends CircuitBreakerCommand
final case class Passthrough(msg: Any) extends CircuitBreakerCommand
sealed trait CircuitBreakerResponse
final case class CircuitOpenFailure(failedMsg: Any)
sealed trait CircuitBreakerEvent
final case class CircuitOpen(circuit: ActorRef) extends CircuitBreakerCommand
final case class CircuitClosed(circuit: ActorRef) extends CircuitBreakerCommand
final case class CircuitHalfOpen(circuit: ActorRef) extends CircuitBreakerCommand
sealed trait CircuitBreakerState
case object Open extends CircuitBreakerState
case object Closed extends CircuitBreakerState
case object HalfOpen extends CircuitBreakerState
final case class CircuitBreakerStateData(failureCount: Int = 0, firstHalfOpenMessageSent: Boolean = false)
final case class CircuitBreakerPropsBuilder(
maxFailures: Int,
callTimeout: Timeout,
resetTimeout: Timeout,
circuitEventListener: Option[ActorRef] = None,
failureDetector: Any => Boolean = { _ =>
false
},
openCircuitFailureConverter: CircuitOpenFailure => Any = identity) {
def withMaxFailures(value: Int) = copy(maxFailures = value)
def withCallTimeout(value: Timeout) = copy(callTimeout = value)
def withResetTimeout(value: Timeout) = copy(resetTimeout = value)
def withCircuitEventListener(value: Option[ActorRef]) = copy(circuitEventListener = value)
def withFailureDetector(value: Any => Boolean) = copy(failureDetector = value)
def withOpenCircuitFailureConverter(value: CircuitOpenFailure => Any) = copy(openCircuitFailureConverter = value)
/**
* Creates the props for a [[akka.contrib.circuitbreaker.CircuitBreakerProxy]] proxying the given target
*
* @param target the target actor ref
*/
def props(target: ActorRef) =
CircuitBreakerProxy.props(
target,
maxFailures,
callTimeout,
resetTimeout,
circuitEventListener,
failureDetector,
openCircuitFailureConverter)
}
private[CircuitBreakerProxy] object CircuitBreakerInternalEvents {
sealed trait CircuitBreakerInternalEvent
case object CallFailed extends CircuitBreakerInternalEvent
case object CallSucceeded extends CircuitBreakerInternalEvent
}
}
import akka.contrib.circuitbreaker.CircuitBreakerProxy._
@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0")
final class CircuitBreakerProxy(
target: ActorRef,
maxFailures: Int,
callTimeout: Timeout,
resetTimeout: Timeout,
circuitEventListener: Option[ActorRef],
failureDetector: Any => Boolean,
failureMap: CircuitOpenFailure => Any)
extends Actor
with ActorLogging
with FSM[CircuitBreakerState, CircuitBreakerStateData] {
import CircuitBreakerInternalEvents._
import FSM.`->`
context.watch(target)
startWith(Closed, CircuitBreakerStateData(failureCount = 0))
def callSucceededHandling: StateFunction = {
case Event(CallSucceeded, state) =>
log.debug("Received call succeeded notification in state {} resetting counter", state)
goto(Closed).using(CircuitBreakerStateData(failureCount = 0, firstHalfOpenMessageSent = false))
}
def passthroughHandling: StateFunction = {
case Event(Passthrough(message), state) =>
log.debug(
"Received a passthrough message in state {}, forwarding the message to the target actor without altering current state",
state)
target ! message
stay
}
def targetTerminationHandling: StateFunction = {
case Event(Terminated(`target`), state) =>
log.debug("Target actor {} terminated while in state {}, terminating this proxy too", target, state)
stop
}
def commonStateHandling: StateFunction = {
callSucceededHandling.orElse(passthroughHandling).orElse(targetTerminationHandling)
}
when(Closed) {
commonStateHandling.orElse {
case Event(TellOnly(message), _) =>
log.debug("Closed: Sending message {} without expecting any response", message)
target ! message
stay
case Event(CallFailed, state) =>
log.debug("Received call failed notification in state {} incrementing counter", state)
val newState = state.copy(failureCount = state.failureCount + 1)
if (newState.failureCount < maxFailures) {
stay.using(newState)
} else {
goto(Open).using(newState)
}
case Event(message, state) =>
log.debug("CLOSED: Sending message {} expecting a response within timeout {}", message, callTimeout)
val currentSender = sender()
forwardRequest(message, sender, state, log)
stay
}
}
when(Open, stateTimeout = resetTimeout.duration) {
commonStateHandling.orElse {
case Event(StateTimeout, state) =>
log.debug("Timeout expired for state OPEN, going to half open")
goto(HalfOpen).using(state.copy(firstHalfOpenMessageSent = false))
case Event(CallFailed, state) =>
log.debug(
"Open: Call received a further call failed notification, probably from a previous timed out event, ignoring")
stay
case Event(openNotification @ CircuitOpenFailure(_), _) =>
log.warning(
"Unexpected circuit open notification {} sent to myself. Please report this as a bug.",
openNotification)
stay
case Event(message, state) =>
val failureNotification = failureMap(CircuitOpenFailure(message))
log.debug(
"OPEN: Failing request for message {}, sending failure notification {} to sender {}",
message,
failureNotification,
sender)
sender ! failureNotification
stay
}
}
when(HalfOpen) {
commonStateHandling.orElse {
case Event(TellOnly(message), _) =>
log.debug("HalfOpen: Dropping TellOnly request for message {}", message)
stay
case Event(CallFailed, CircuitBreakerStateData(_, true)) =>
log.debug("HalfOpen: First forwarded call failed returning to OPEN state")
goto(Open)
case Event(CallFailed, CircuitBreakerStateData(_, false)) =>
log.debug(
"HalfOpen: Call received a further call failed notification, probably from a previous timed out event, ignoring")
stay
case Event(message, state @ CircuitBreakerStateData(_, false)) =>
log.debug("HalfOpen: First message {} received, forwarding it to target {}", message, target)
forwardRequest(message, sender, state, log)
stay.using(state.copy(firstHalfOpenMessageSent = true))
case Event(message, CircuitBreakerStateData(_, true)) =>
val failureNotification = failureMap(CircuitOpenFailure(message))
log.debug(
"HALF-OPEN: Failing request for message {}, sending failure notification {} to sender {}",
message,
failureNotification,
sender)
sender ! failureNotification
stay
}
}
def forwardRequest(message: Any, currentSender: ActorRef, state: CircuitBreakerStateData, log: LoggingAdapter) = {
import context.dispatcher
target.ask(message)(callTimeout).onComplete {
case Success(response) =>
log.debug(
"Request '{}' has been replied to with response {}, forwarding to original sender {}",
message,
currentSender)
currentSender ! response
val isFailure = failureDetector(response)
if (isFailure) {
log.debug(
"Response '{}' is considered as failure sending self-message to ask incrementing failure count (origin state was {})",
response,
state)
self ! CallFailed
} else {
log.debug(
"Request '{}' succeeded with response {}, returning response to sender {} and sending message to ask to reset failure count (origin state was {})",
message,
response,
currentSender,
state)
self ! CallSucceeded
}
case Failure(reason) =>
log.debug(
"Request '{}' to target {} failed with exception {}, sending self-message to ask incrementing failure count (origin state was {})",
message,
target,
reason,
state)
self ! CallFailed
}
}
onTransition {
case from -> Closed =>
log.debug("Moving from state {} to state CLOSED", from)
circuitEventListener.foreach { _ ! CircuitClosed(self) }
case from -> HalfOpen =>
log.debug("Moving from state {} to state HALF OPEN", from)
circuitEventListener.foreach { _ ! CircuitHalfOpen(self) }
case from -> Open =>
log.debug("Moving from state {} to state OPEN", from)
circuitEventListener.foreach { _ ! CircuitOpen(self) }
}
}

View file

@ -1,124 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.circuitbreaker
import akka.actor.{ Actor, ActorRef, ActorSelection }
import akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure
import akka.util.Timeout
import scala.language.implicitConversions
import scala.concurrent.{ ExecutionContext, Future }
@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0")
sealed class OpenCircuitException(message: String) extends RuntimeException(message)
private[circuitbreaker] final object OpenCircuitException
extends OpenCircuitException("Unable to complete operation since the Circuit Breaker Actor Proxy is in Open State")
/**
* Convenience implicit conversions to provide circuit-breaker aware management of the ask pattern,
* either directly replacing the `ask/?` with `askWithCircuitBreaker` or with an extension method to the
* `Future` result of an `ask` pattern to fail in case of
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] response
*/
@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0")
object Implicits {
/**
* Import this implicit to enable the methods `failForOpenCircuit` and `failForOpenCircuitWith`
* to [[scala.concurrent.Future]] converting
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] into a failure caused either by an
* [[akka.contrib.circuitbreaker.OpenCircuitException]] or by an exception built with the given
* exception builder
*/
implicit def futureExtensions(future: Future[Any]) = new CircuitBreakerAwareFuture(future)
/**
* Import this implicit method to get an extended versions of the `ask` pattern for
* [[akka.actor.ActorRef]] and [[akka.actor.ActorSelection]] converting
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] into a failure caused by an
* [[akka.contrib.circuitbreaker.OpenCircuitException]]
*/
implicit def askWithCircuitBreaker(actorRef: ActorRef) = new AskeableWithCircuitBreakerActor(actorRef)
/**
* Wraps the `ask` method in [[akka.pattern.AskSupport]] method to convert
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] responses into a failure response caused
* by an [[akka.contrib.circuitbreaker.OpenCircuitException]]
*/
@throws[akka.contrib.circuitbreaker.OpenCircuitException](
"if the call failed because the circuit breaker proxy state was OPEN")
def askWithCircuitBreaker(circuitBreakerProxy: ActorRef, message: Any)(
implicit executionContext: ExecutionContext,
timeout: Timeout): Future[Any] =
circuitBreakerProxy.internalAskWithCircuitBreaker(message, timeout, ActorRef.noSender)
/**
* Wraps the `ask` method in [[akka.pattern.AskSupport]] method to convert failures connected to the circuit
* breaker being in open state
*/
@throws[akka.contrib.circuitbreaker.OpenCircuitException](
"if the call failed because the circuit breaker proxy state was OPEN")
def askWithCircuitBreaker(circuitBreakerProxy: ActorRef, message: Any, sender: ActorRef)(
implicit executionContext: ExecutionContext,
timeout: Timeout): Future[Any] =
circuitBreakerProxy.internalAskWithCircuitBreaker(message, timeout, sender)
}
/**
* Extends [[scala.concurrent.Future]] with the method `failForOpenCircuitWith` to handle
* [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] failure responses throwing
* an exception built with the given exception builder
*/
@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0")
final class CircuitBreakerAwareFuture(val future: Future[Any]) extends AnyVal {
@throws[OpenCircuitException]
def failForOpenCircuit(implicit executionContext: ExecutionContext): Future[Any] =
failForOpenCircuitWith(OpenCircuitException)
def failForOpenCircuitWith(throwing: => Throwable)(implicit executionContext: ExecutionContext): Future[Any] = {
future.flatMap {
_ match {
case CircuitOpenFailure(_) => Future.failed(throwing)
case result => Future.successful(result)
}
}
}
}
@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0")
final class AskeableWithCircuitBreakerActor(val actorRef: ActorRef) extends AnyVal {
def askWithCircuitBreaker(message: Any)(
implicit executionContext: ExecutionContext,
timeout: Timeout,
sender: ActorRef = Actor.noSender): Future[Any] =
internalAskWithCircuitBreaker(message, timeout, sender)
@throws[OpenCircuitException]
private[circuitbreaker] def internalAskWithCircuitBreaker(message: Any, timeout: Timeout, sender: ActorRef)(
implicit executionContext: ExecutionContext) = {
import akka.pattern.ask
import Implicits.futureExtensions
ask(actorRef, message, sender)(timeout).failForOpenCircuit
}
}
@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0")
final class AskeableWithCircuitBreakerActorSelection(val actorSelection: ActorSelection) extends AnyVal {
def askWithCircuitBreaker(message: Any)(
implicit executionContext: ExecutionContext,
timeout: Timeout,
sender: ActorRef = Actor.noSender): Future[Any] =
internalAskWithCircuitBreaker(message, timeout, sender)
private[circuitbreaker] def internalAskWithCircuitBreaker(message: Any, timeout: Timeout, sender: ActorRef)(
implicit executionContext: ExecutionContext) = {
import akka.pattern.ask
import Implicits.futureExtensions
ask(actorSelection, message, sender)(timeout).failForOpenCircuit
}
}

View file

@ -1,125 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.jul
import akka.event.Logging._
import akka.actor._
import akka.event.LoggingAdapter
import java.util.logging
import scala.concurrent.{ ExecutionContext, Future }
import akka.dispatch.RequiresMessageQueue
import akka.event.LoggerMessageQueueSemantics
/**
* Makes the Akka `Logging` API available as the `log`
* field, using `java.util.logging` as the backend.
*
* This trait does not require an `ActorSystem` and is
* encouraged to be used as a general purpose Scala
* logging API.
*
* For `Actor`s, use `ActorLogging` instead.
*/
@deprecated("Feel free to copy", "2.5.0")
trait JavaLogging {
@transient
protected lazy val log = new JavaLoggingAdapter {
def logger = logging.Logger.getLogger(JavaLogging.this.getClass.getName)
}
}
/**
* `java.util.logging` logger.
*/
@deprecated("Use akka.event.jul.JavaLogger in akka-actor instead", "2.5.0")
class JavaLogger extends Actor with RequiresMessageQueue[LoggerMessageQueueSemantics] {
def receive = {
case event @ Error(cause, _, _, _) => log(logging.Level.SEVERE, cause, event)
case event: Warning => log(logging.Level.WARNING, null, event)
case event: Info => log(logging.Level.INFO, null, event)
case event: Debug => log(logging.Level.CONFIG, null, event)
case InitializeLogger(_) => sender() ! LoggerInitialized
}
@inline
def log(level: logging.Level, cause: Throwable, event: LogEvent): Unit = {
val logger = logging.Logger.getLogger(event.logSource)
val record = new logging.LogRecord(level, String.valueOf(event.message))
record.setLoggerName(logger.getName)
record.setThrown(cause)
record.setThreadID(event.thread.getId.toInt)
record.setSourceClassName(event.logClass.getName)
record.setSourceMethodName(null) // lost forever
logger.log(record)
}
}
@deprecated("Feel free to copy", "2.5.0")
trait JavaLoggingAdapter extends LoggingAdapter {
def logger: logging.Logger
/** Override-able option for asynchronous logging */
def loggingExecutionContext: Option[ExecutionContext] = None
def isErrorEnabled = logger.isLoggable(logging.Level.SEVERE)
def isWarningEnabled = logger.isLoggable(logging.Level.WARNING)
def isInfoEnabled = logger.isLoggable(logging.Level.INFO)
def isDebugEnabled = logger.isLoggable(logging.Level.CONFIG)
protected def notifyError(message: String): Unit =
log(logging.Level.SEVERE, null, message)
protected def notifyError(cause: Throwable, message: String): Unit =
log(logging.Level.SEVERE, cause, message)
protected def notifyWarning(message: String): Unit =
log(logging.Level.WARNING, null, message)
protected def notifyInfo(message: String): Unit =
log(logging.Level.INFO, null, message)
protected def notifyDebug(message: String): Unit =
log(logging.Level.CONFIG, null, message)
@inline
def log(level: logging.Level, cause: Throwable, message: String): Unit = {
val record = new logging.LogRecord(level, message)
record.setLoggerName(logger.getName)
record.setThrown(cause)
updateSource(record)
if (loggingExecutionContext.isDefined) {
implicit val context = loggingExecutionContext.get
Future(logger.log(record)).failed.foreach { _.printStackTrace() }
} else
logger.log(record)
}
// it is unfortunate that this workaround is needed
private def updateSource(record: logging.LogRecord): Unit = {
val stack = Thread.currentThread.getStackTrace
val source = stack.find { frame =>
val cname = frame.getClassName
!cname.startsWith("akka.contrib.jul.") &&
!cname.startsWith("akka.event.LoggingAdapter") &&
!cname.startsWith("java.lang.reflect.") &&
!cname.startsWith("sun.reflect.")
}
if (source.isDefined) {
record.setSourceClassName(source.get.getClassName)
record.setSourceMethodName(source.get.getMethodName)
} else {
record.setSourceClassName(null)
record.setSourceMethodName(null)
}
}
}

View file

@ -1,113 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.mailbox
import java.util.concurrent.{ ConcurrentHashMap, ConcurrentLinkedQueue }
import com.typesafe.config.Config
import akka.actor.{
ActorContext,
ActorRef,
ActorSystem,
ExtendedActorSystem,
Extension,
ExtensionId,
ExtensionIdProvider
}
import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedQueueBasedMessageQueue }
@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0")
object PeekMailboxExtension extends ExtensionId[PeekMailboxExtension] with ExtensionIdProvider {
def lookup = this
def createExtension(s: ExtendedActorSystem) = new PeekMailboxExtension(s)
def ack()(implicit context: ActorContext): Unit = PeekMailboxExtension(context.system).ack()
}
@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0")
class PeekMailboxExtension(val system: ExtendedActorSystem) extends Extension {
private val mailboxes = new ConcurrentHashMap[ActorRef, PeekMailbox]
def register(actorRef: ActorRef, mailbox: PeekMailbox): Unit =
mailboxes.put(actorRef, mailbox)
def unregister(actorRef: ActorRef): Unit = mailboxes.remove(actorRef)
def ack()(implicit context: ActorContext): Unit =
mailboxes.get(context.self) match {
case null => throw new IllegalArgumentException("Mailbox not registered for: " + context.self)
case mailbox => mailbox.ack()
}
}
/**
* configure the mailbox via dispatcher configuration:
* {{{
* peek-dispatcher {
* mailbox-type = "example.PeekMailboxType"
* }
* }}}
*/
@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0")
class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match {
case (Some(o), Some(s)) =>
val retries = config.getInt("max-retries")
if (retries < 1) throw new akka.ConfigurationException("max-retries must be at least 1")
val mailbox = new PeekMailbox(o, s, retries)
PeekMailboxExtension(s).register(o, mailbox)
mailbox
case _ => throw new Exception("no mailbox owner or system given")
}
}
@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0")
class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int) extends UnboundedQueueBasedMessageQueue {
final val queue = new ConcurrentLinkedQueue[Envelope]()
/*
* Since the queue itself is used to determine when to schedule the actor
* (see Mailbox.hasMessages), we cannot poll() on the first try and then
* continue handing back out that same message until ACKed, peek() must be
* used. The retry limit logic is then formulated in terms of the `tries`
* field, which holds
* 0 if clean slate (i.e. last dequeue was ack()ed)
* 1..maxRetries if not yet ack()ed
* Marker if last try was done (at which point we had to poll())
* -1 during cleanUp (in order to disable the ack() requirement)
*/
// the mutable state is only ever accessed by the actor (i.e. dequeue() side)
var tries = 0
val Marker = maxRetries + 1
// this logic does not work if maxRetries==0, but then you could also use a normal mailbox
override def dequeue(): Envelope = tries match {
case -1 =>
queue.poll()
case 0 | Marker =>
val e = queue.peek()
tries = if (e eq null) 0 else 1
e
case `maxRetries` =>
tries = Marker
queue.poll()
case n =>
tries = n + 1
queue.peek()
}
def ack(): Unit = {
// do not dequeue for real if double-ack() or ack() on last try
if (tries != 0 && tries != Marker) queue.poll()
tries = 0
}
override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
tries = -1 // put the queue into auto-ack mode
super.cleanUp(owner, deadLetters)
PeekMailboxExtension(system).unregister(owner)
}
}

View file

@ -1,202 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern
import akka.actor.Actor
import scala.annotation.tailrec
/**
* The aggregator is to be mixed into an actor for the aggregator behavior.
*/
@deprecated("Feel free to copy", "2.5.0")
trait Aggregator {
this: Actor =>
private var processing = false
private val expectList = WorkList.empty[Actor.Receive]
private val addBuffer = WorkList.empty[Actor.Receive]
/**
* Adds the partial function to the receive set, to be removed on first match.
* @param fn The receive function.
* @return The same receive function.
*/
def expectOnce(fn: Actor.Receive): Actor.Receive = {
if (processing) addBuffer.add(fn, permanent = false)
else expectList.add(fn, permanent = false)
fn
}
/**
* Adds the partial function to the receive set and keeping it in the receive set till removed.
* @param fn The receive function.
* @return The same receive function.
*/
def expect(fn: Actor.Receive): Actor.Receive = {
if (processing) addBuffer.add(fn, permanent = true)
else expectList.add(fn, permanent = true)
fn
}
/**
* Removes the partial function from the receive set.
* @param fn The receive function.
* @return True if the partial function is removed, false if not found.
*/
def unexpect(fn: Actor.Receive): Boolean = {
if (expectList.remove(fn)) true
else if (processing && (addBuffer.remove(fn))) true
else false
}
/**
* Receive function for handling the aggregations.
*/
def receive: Actor.Receive = {
case msg if handleMessage(msg) => // already dealt with in handleMessage
}
/**
* Handles messages and matches against the expect list.
* @param msg The message to be handled.
* @return true if message is successfully processed, false otherwise.
*/
def handleMessage(msg: Any): Boolean = {
processing = true
try {
expectList.process { fn =>
var processed = true
fn.applyOrElse(msg, (_: Any) => processed = false)
processed
}
} finally {
processing = false
expectList.addAll(addBuffer)
addBuffer.removeAll()
}
}
}
/**
* Provides the utility methods and constructors to the WorkList class.
*/
@deprecated("Feel free to copy", "2.5.0")
object WorkList {
def empty[T] = new WorkList[T]
/**
* Singly linked list entry implementation for WorkList.
* @param ref The item reference, None for head entry
* @param permanent If the entry is to be kept after processing
*/
class Entry[T](val ref: Option[T], val permanent: Boolean) {
var next: Entry[T] = null
var isDeleted = false
}
}
/**
* Fast, small, and dirty implementation of a linked list that removes transient work entries once they are processed.
* The list is not thread safe! However it is expected to be reentrant. This means a processing function can add/remove
* entries from the list while processing. Most important, a processing function can remove its own entry from the list.
* The first remove must return true and any subsequent removes must return false.
*/
@deprecated("Feel free to copy", "2.5.0")
class WorkList[T] {
import WorkList._
val head = new Entry[T](None, true)
var tail = head
/**
* Appends an entry to the work list.
* @param ref The entry.
* @return The updated work list.
*/
def add(ref: T, permanent: Boolean) = {
if (tail == head) {
tail = new Entry[T](Some(ref), permanent)
head.next = tail
} else {
tail.next = new Entry[T](Some(ref), permanent)
tail = tail.next
}
this
}
/**
* Removes an entry from the work list
* @param ref The entry.
* @return True if the entry is removed, false if the entry is not found.
*/
def remove(ref: T): Boolean = {
@tailrec
def remove(parent: Entry[T], entry: Entry[T]): Boolean = {
if (entry.ref.get == ref) {
parent.next = entry.next // Remove entry
if (tail == entry) tail = parent
entry.isDeleted = true
true
} else if (entry.next != null) remove(entry, entry.next)
else false
}
if (head.next == null) false else remove(head, head.next)
}
/**
* Tries to process each entry using the processing function. Stops at the first entry processing succeeds.
* If the entry is not permanent, the entry is removed.
* @param processFn The processing function, returns true if processing succeeds.
* @return true if an entry has been processed, false if no entries are processed successfully.
*/
def process(processFn: T => Boolean): Boolean = {
@tailrec
def process(parent: Entry[T], entry: Entry[T]): Boolean = {
val processed = processFn(entry.ref.get)
if (processed) {
if (!entry.permanent && !entry.isDeleted) {
parent.next = entry.next // Remove entry
if (tail == entry) tail = parent
entry.isDeleted = true
}
true // Handled
} else if (entry.next != null) process(entry, entry.next)
else false
}
if (head.next == null) false else process(head, head.next)
}
/**
* Appends another WorkList to this WorkList.
* @param other The other WorkList
* @return This WorkList
*/
def addAll(other: WorkList[T]) = {
if (other.head.next != null) {
tail.next = other.head.next
tail = other.tail
}
this
}
/**
* Removes all entries from this WorkList
* @return True if at least one entry is removed. False if none is removed.
*/
def removeAll() = {
if (head.next == null) false
else {
head.next = null
tail = head
true
}
}
}

View file

@ -1,125 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern
import akka.actor.Actor
@deprecated("Feel free to copy", "2.5.0")
object ReceivePipeline {
/**
* Result returned by an interceptor PF to determine what/whether to delegate to the next inner interceptor
*/
sealed trait Delegation
case class Inner(transformedMsg: Any) extends Delegation {
/**
* Add a block of code to be executed after the message (which may be further transformed and processed by
* inner interceptors) is handled by the actor's receive.
*
* The block of code will be executed before similar blocks in outer interceptors.
*/
def andAfter(after: => Unit): Delegation = InnerAndAfter(transformedMsg, (_ => after))
}
private[ReceivePipeline] case class InnerAndAfter(transformedMsg: Any, after: Unit => Unit) extends Delegation
/**
* Interceptor return value that indicates that the message has been handled
* completely. The message will not be passed to inner interceptors
* (or to the decorated actor's receive).
*/
case object HandledCompletely extends Delegation
private def withDefault(interceptor: Interceptor): Interceptor = interceptor.orElse({ case msg => Inner(msg) })
type Interceptor = PartialFunction[Any, Delegation]
private sealed trait HandlerResult
private case object Done extends HandlerResult
private case object Undefined extends HandlerResult
private type Handler = Any => HandlerResult
}
/**
* Trait implementing Receive Pipeline Pattern. Mixin this trait
* for configuring a chain of interceptors to be applied around
* Actor's current behavior.
*/
@deprecated("Feel free to copy", "2.5.0")
trait ReceivePipeline extends Actor {
import ReceivePipeline._
private var pipeline: Vector[Interceptor] = Vector.empty
private var decoratorCache: Option[(Receive, Receive)] = None
/**
* Adds an inner interceptor, it will be applied lastly, near to Actor's original behavior
* @param interceptor an interceptor
*/
def pipelineInner(interceptor: Interceptor): Unit = {
pipeline :+= withDefault(interceptor)
decoratorCache = None
}
/**
* Adds an outer interceptor, it will be applied firstly, far from Actor's original behavior
* @param interceptor an interceptor
*/
def pipelineOuter(interceptor: Interceptor): Unit = {
pipeline +:= withDefault(interceptor)
decoratorCache = None
}
private def combinedDecorator: Receive => Receive = { receive =>
// So that reconstructed Receive PF is undefined only when the actor's
// receive is undefined for a transformed message that reaches it...
val innerReceiveHandler: Handler = {
case msg => receive.lift(msg).map(_ => Done).getOrElse(Undefined)
}
val zipped = pipeline.foldRight(innerReceiveHandler) { (outerInterceptor, innerHandler) =>
outerInterceptor.andThen {
case Inner(msg) => innerHandler(msg)
case InnerAndAfter(msg, after) =>
try innerHandler(msg)
finally after(())
case HandledCompletely => Done
}
}
toReceive(zipped)
}
private def toReceive(handler: Handler) = new Receive {
def isDefinedAt(m: Any): Boolean = evaluate(m) != Undefined
def apply(m: Any): Unit = evaluate(m)
override def applyOrElse[A1 <: Any, B1 >: Unit](m: A1, default: A1 => B1): B1 = {
val result = handler(m)
if (result == Undefined) default(m)
}
private def evaluate(m: Any) = handler(m)
}
/**
* INTERNAL API.
*/
override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = {
def withCachedDecoration(decorator: Receive => Receive): Receive = decoratorCache match {
case Some((`receive`, cached)) => cached
case _ =>
val decorated = decorator(receive)
decoratorCache = Some((receive, decorated))
decorated
}
super.aroundReceive(withCachedDecoration(combinedDecorator), msg)
}
}

View file

@ -1,407 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern
import akka.actor._
import akka.remote.RemoteScope
import scala.concurrent.duration._
import scala.util.Try
import java.util.concurrent.TimeUnit
@deprecated("Use AtLeastOnceDelivery instead", "2.5.0")
object ReliableProxy {
/**
* Scala API Props. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]]
* constructor.
*/
def props(
targetPath: ActorPath,
retryAfter: FiniteDuration,
reconnectAfter: Option[FiniteDuration],
maxReconnects: Option[Int]): Props = {
Props(new ReliableProxy(targetPath, retryAfter, reconnectAfter, maxReconnects))
}
/**
* Java API Props. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]]
* constructor.
*/
def props(
targetPath: ActorPath,
retryAfter: FiniteDuration,
reconnectAfter: FiniteDuration,
maxReconnects: Int): Props = {
props(targetPath, retryAfter, Option(reconnectAfter), if (maxReconnects > 0) Some(maxReconnects) else None)
}
/**
* Props with no limit on reconnections. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]]
* constructor.
*/
def props(targetPath: ActorPath, retryAfter: FiniteDuration, reconnectAfter: FiniteDuration): Props = {
props(targetPath, retryAfter, Option(reconnectAfter), None)
}
/**
* Props with no reconnections. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]]
* constructor.
*/
def props(targetPath: ActorPath, retryAfter: FiniteDuration): Props = {
props(targetPath, retryAfter, None, None)
}
class Receiver(target: ActorRef, initialSerial: Int) extends Actor with ReliableProxyDebugLogging {
var lastSerial = initialSerial
context.watch(target)
def receive = {
case Message(msg, snd, serial) =>
if (serial == lastSerial + 1) {
target.tell(msg, snd)
sender() ! Ack(serial)
lastSerial = serial
} else if (compare(serial, lastSerial) <= 0) {
sender() ! Ack(serial)
} else {
logDebug("Received message from {} with wrong serial: {}", snd, msg)
}
case Terminated(`target`) => context.stop(self)
}
}
/**
* Wrap-around aware comparison of integers: differences limited to 2**31-1
* in magnitude will work correctly.
*/
def compare(a: Int, b: Int): Int = {
val c = a - b
c match {
case x if x < 0 => -1
case x if x == 0 => 0
case x if x > 0 => 1
}
}
def receiver(target: ActorRef, currentSerial: Int): Props = Props(classOf[Receiver], target, currentSerial)
// Internal messages
final case class Message(msg: Any, sender: ActorRef, serial: Int)
private final case class Ack(serial: Int)
private case object Tick
private case object ReconnectTick
/**
* `TargetChanged` is sent to transition subscribers when the initial connection is made
* the target and when the target `ActorRef` has changed (for example, the target system
* crashed and has been restarted).
*/
final case class TargetChanged(ref: ActorRef)
/**
* `ProxyTerminated` is sent to transition subscribers during `postStop`. Any outstanding
* unsent messages are contained the `Unsent` object.
*/
final case class ProxyTerminated(actor: ActorRef, outstanding: Unsent)
final case class Unsent(queue: Vector[Message])
sealed trait State
case object Idle extends State
case object Active extends State
case object Connecting extends State
// Java API
val idle = Idle
val active = Active
val reconnecting = Connecting
}
/**
* INTERNAL API
*/
private[akka] trait ReliableProxyDebugLogging extends ActorLogging { this: Actor =>
val debug: Boolean =
Try(context.system.settings.config.getBoolean("akka.reliable-proxy.debug")).getOrElse(false)
def enabled: Boolean = debug && log.isDebugEnabled
def addSelf(template: String): String = s"$template [$self]"
def logDebug(template: String, arg1: Any, arg2: Any): Unit =
if (enabled) log.debug(addSelf(template), arg1, arg2)
def logDebug(template: String, arg1: Any): Unit =
if (enabled) log.debug(addSelf(template), arg1)
}
import ReliableProxy._
/**
* A ReliableProxy is a means to wrap a remote actor reference in order to
* obtain certain improved delivery guarantees:
*
* - as long as the proxy is not terminated before it sends all of its queued
* messages then no messages will be lost
* - messages re-sent due to the first point will not be delivered out-of-order,
* message ordering is preserved
*
* These guarantees are valid for the communication between the two end-points
* of the reliable tunnel, which usually spans an unreliable network.
*
* Note that the ReliableProxy guarantees at-least-once, not exactly-once, delivery.
*
* Delivery from the remote end-point to the target actor is still subject to in-JVM
* delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory
* situations or other VM errors).
*
* You can create a reliable connection like this:
*
* In Scala:
* {{{
* val proxy = context.actorOf(ReliableProxy.props(target, 100.millis, 120.seconds)
* }}}
* or in Java:
* {{{
* final ActorRef proxy = getContext().actorOf(ReliableProxy.props(
* target, Duration.create(100, "millis"), Duration.create(120, "seconds")));
* }}}
*
* '''''Please note:''''' the tunnel is uni-directional, and original sender
* information is retained, hence replies by the wrapped target reference will
* go back in the normal unreliable way unless also secured by a ReliableProxy
* from the remote end.
*
* ==Message Types==
*
* This actor is an [[akka.actor.FSM]], hence it offers the service of
* transition callbacks to those actors which subscribe using the
* `SubscribeTransitionCallBack` and `UnsubscribeTransitionCallBack`
* messages; see [[akka.actor.FSM]] for more documentation. The proxy will
* transition into `ReliableProxy.Active` state when ACKs
* are outstanding and return to the `ReliableProxy.Idle`
* state when every message send so far has been confirmed by the peer end-point.
*
* The initial state of the proxy is `ReliableProxy.Connecting`. In this state the
* proxy will repeatedly send [[akka.actor.Identify]] messages to `ActorSelection(targetPath)`
* in order to obtain a new `ActorRef` for the target. When an [[akka.actor.ActorIdentity]]
* for the target is received a new tunnel will be created, a [[ReliableProxy.TargetChanged]]
* message containing the target `ActorRef` will be sent to the proxy's transition subscribers
* and the proxy will transition into either the `ReliableProxy.Idle` or `ReliableProxy.Active`
* state, depending if there are any outstanding messages that need to be delivered. If
* `maxConnectAttempts` is defined this actor will stop itself after `Identify` is sent
* `maxConnectAttempts` times.
*
* While in the `Idle` or `Active` states, if a communication failure causes the tunnel to
* terminate via Remote Deathwatch the proxy will transition into the `ReliableProxy.Connecting`
* state as described above. After reconnecting `TargetChanged` will be sent only if the target
* `ActorRef` has changed.
*
* If this actor is stopped and it still has outstanding messages a
* [[ReliableProxy.ProxyTerminated]] message will be sent to the
* transition subscribers. It contains an `Unsent` object with the outstanding messages.
*
* If an [[ReliableProxy.Unsent]] message is sent to this actor
* the messages contained within it will be relayed through the tunnel to the target.
*
* Any other message type sent to this actor will be delivered via a remote-deployed
* child actor to the designated target.
*
* ==Failure Cases==
*
* All failures of either the local or the remote end-point are escalated to the
* parent of this actor; there are no specific error cases which are predefined.
*
* ==Arguments==
* See the constructor below for the arguments for this actor. However, prefer using
* [[akka.contrib.pattern.ReliableProxy#props]] to this actor's constructor.
*
* @param targetPath is the `ActorPath` to the actor to which all messages will be forwarded.
* `targetPath` can point to a local or remote actor, but the tunnel endpoint will be
* deployed remotely on the node where the target actor lives.
* @param retryAfter is the ACK timeout after which all outstanding messages
* will be resent. There is no limit on the queue size or the number of retries.
* @param reconnectAfter &nbsp;is an optional interval between connection attempts.
* It is also used as the interval between receiving a `Terminated` for the tunnel and
* attempting to reconnect to the target actor. The minimum recommended value for this is
* the value of the configuration setting `akka.remote.retry-gate-closed-for`. Use `None`
* to never reconnect after a disconnection.
* @param maxConnectAttempts &nbsp;is an optional maximum number of attempts to connect to the
* target actor. Use `None` for no limit. If `reconnectAfter` is `None` this value is ignored.
*/
@deprecated("Use AtLeastOnceDelivery instead", "2.5.0")
class ReliableProxy(
targetPath: ActorPath,
retryAfter: FiniteDuration,
reconnectAfter: Option[FiniteDuration],
maxConnectAttempts: Option[Int])
extends Actor
with LoggingFSM[State, Vector[Message]]
with ReliableProxyDebugLogging {
import FSM.`->`
var tunnel: ActorRef = _
var currentSerial: Int = 0
var lastAckSerial: Int = _
var currentTarget: ActorRef = _
var attemptedReconnects: Int = _
val resendTimer = "resend"
val reconnectTimer = "reconnect"
val retryGateClosedFor =
Try(context.system.settings.config.getDuration("akka.remote.retry-gate-closed-for", TimeUnit.MILLISECONDS))
.map(_.longValue)
.getOrElse(5000L)
val defaultConnectInterval =
Try(
context.system.settings.config.getDuration("akka.reliable-proxy.default-connect-interval", TimeUnit.MILLISECONDS))
.map(_.longValue)
.getOrElse(retryGateClosedFor)
.millis
val initialState = Connecting
self ! ReconnectTick
def createTunnel(target: ActorRef): Unit = {
logDebug("Creating new tunnel for {}", target)
tunnel = context.actorOf(
receiver(target, lastAckSerial).withDeploy(Deploy(scope = RemoteScope(target.path.address))),
"tunnel")
context.watch(tunnel)
currentTarget = target
attemptedReconnects = 0
resetBackoff()
}
if (targetPath.address.host.isEmpty && self.path.address == targetPath.address) {
logDebug("Unnecessary to use ReliableProxy for local target: {}", targetPath)
}
override def supervisorStrategy = OneForOneStrategy() {
case _ => SupervisorStrategy.Escalate
}
override def postStop(): Unit = {
logDebug("Stopping proxy and sending {} messages to subscribers in Unsent", stateData.size)
gossip(ProxyTerminated(self, Unsent(stateData)))
super.postStop()
}
startWith(initialState, Vector.empty)
when(Idle) {
case Event(Terminated(_), _) => terminated()
case Event(Ack(_), _) => stay()
case Event(Unsent(msgs), _) => goto(Active).using(resend(updateSerial(msgs)))
case Event(msg, _) => goto(Active).using(Vector(send(msg, sender())))
}
onTransition {
case _ -> Active => scheduleTick()
case Active -> Idle => cancelTimer(resendTimer)
case _ -> Connecting => scheduleReconnectTick()
}
when(Active) {
case Event(Terminated(_), _) =>
terminated()
case Event(Ack(serial), queue) =>
val q = queue.dropWhile(m => compare(m.serial, serial) <= 0)
if (compare(serial, lastAckSerial) > 0) lastAckSerial = serial
scheduleTick()
if (q.isEmpty) goto(Idle).using(Vector.empty)
else stay.using(q)
case Event(Tick, queue) =>
logResend(queue.size)
queue.foreach { tunnel ! _ }
scheduleTick()
stay()
case Event(Unsent(msgs), queue) =>
stay.using(queue ++ resend(updateSerial(msgs)))
case Event(msg, queue) =>
stay.using(queue :+ send(msg, sender()))
}
when(Connecting) {
case Event(Terminated(_), _) =>
stay()
case Event(ActorIdentity(_, Some(actor)), queue) =>
val curr = currentTarget
cancelTimer(reconnectTimer)
createTunnel(actor)
if (currentTarget != curr) gossip(TargetChanged(currentTarget))
if (queue.isEmpty) goto(Idle) else goto(Active).using(resend(queue))
case Event(ActorIdentity(_, None), _) =>
stay()
case Event(ReconnectTick, _) =>
if (maxConnectAttempts.exists(_ == attemptedReconnects)) {
logDebug("Failed to reconnect after {}", attemptedReconnects)
stop()
} else {
logDebug("{} ! {}", context.actorSelection(targetPath), Identify(targetPath))
context.actorSelection(targetPath) ! Identify(targetPath)
scheduleReconnectTick()
attemptedReconnects += 1
stay()
}
case Event(Unsent(msgs), queue) =>
stay.using(queue ++ updateSerial(msgs))
case Event(msg, queue) =>
stay.using(queue :+ Message(msg, sender(), nextSerial()))
}
def scheduleTick(): Unit = setTimer(resendTimer, Tick, retryAfter, repeat = false)
def nextSerial(): Int = {
currentSerial += 1
currentSerial
}
def send(msg: Any, snd: ActorRef): Message = {
val m = Message(msg, snd, nextSerial())
tunnel ! m
m
}
def updateSerial(q: Vector[Message]) = q.map(_.copy(serial = nextSerial()))
def resend(q: Vector[Message]): Vector[Message] = {
logResend(q.size)
q.foreach { tunnel ! _ }
q
}
def logResend(size: Int): Unit =
logDebug("Resending {} messages through tunnel", size)
def terminated(): State = {
logDebug("Terminated: {}", targetPath)
if (reconnectAfter.isDefined) goto(Connecting)
else stop()
}
def scheduleReconnectTick(): Unit = {
val delay = nextBackoff()
logDebug("Will attempt to reconnect to {} in {}", targetPath, delay)
setTimer(reconnectTimer, ReconnectTick, delay, repeat = false)
}
/**
* Reset backoff interval.
*
* This and nextBackoff are meant to be implemented by subclasses.
*/
def resetBackoff(): Unit = {}
/**
* Returns the next retry interval duration. By default each interval is the same, reconnectAfter.
*/
def nextBackoff(): FiniteDuration = reconnectAfter.getOrElse(defaultConnectInterval)
}

View file

@ -1,320 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.throttle
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.collection.immutable.{ Queue => Q }
import akka.actor.{ Actor, ActorRef, FSM }
import Throttler._
import TimerBasedThrottler._
import java.util.concurrent.TimeUnit
/**
* @see [[akka.contrib.throttle.TimerBasedThrottler]]
* @see [[akka.contrib.throttle.Throttler.Rate]]
* @see [[akka.contrib.throttle.Throttler.SetRate]]
* @see [[akka.contrib.throttle.Throttler.SetTarget]]
*/
@deprecated("Use streams, see migration guide", "2.5.0")
object Throttler {
/**
* A rate used for throttling.
*
* Scala API: There are some shorthands available to construct rates:
* {{{
* import java.util.concurrent.TimeUnit._
* import scala.concurrent.duration.{ Duration, FiniteDuration }
*
* val rate1 = 1 msgsPer (1, SECONDS)
* val rate2 = 1 msgsPer Duration(1, SECONDS)
* val rate3 = 1 msgsPer (1 seconds)
* val rate4 = 1 msgsPerSecond
* val rate5 = 1 msgsPerMinute
* val rate6 = 1 msgsPerHour
* }}}
*
* @param numberOfCalls the number of calls that may take place in a period
* @param duration the length of the period
* @see [[akka.contrib.throttle.Throttler]]
*/
final case class Rate(val numberOfCalls: Int, val duration: FiniteDuration) {
/**
* The duration in milliseconds.
*/
def durationInMillis(): Long = duration.toMillis
}
/**
* Set the target of a throttler.
*
* You may change a throttler's target at any time.
*
* Notice that the messages sent by the throttler to the target will have the original sender (and
* not the throttler) as the sender. (In Akka terms, the throttler `forward`s the message.)
*
* @param target if `target` is `None`, the throttler will stop delivering messages and the messages already received
* but not yet delivered, as well as any messages received in the future will be queued
* and eventually be delivered when a new target is set. If `target` is not `None`, the currently queued messages
* as well as any messages received in the future will be delivered to the new target at a rate not exceeding the current throttler's rate.
*/
final case class SetTarget(target: Option[ActorRef]) {
/**
* Java API:
* @param target if `target` is `null`, the throttler will stop delivering messages and the messages already received
* but not yet delivered, as well as any messages received in the future will be queued
* and eventually be delivered when a new target is set. If `target` is not `null`, the currently queued messages
* as well as any messages received in the future will be delivered to the new target at a rate not exceeding
* the current throttler's rate.
*/
def this(target: ActorRef) = this(Option(target))
}
/**
* Set the rate of a throttler.
*
* You may change a throttler's rate at any time.
*
* @param rate the rate at which messages will be delivered to the target of the throttler
*/
final case class SetRate(rate: Rate)
/**
* Helper for some syntactic sugar.
*
* @see [[akka.contrib.throttle.Throttler.Rate]]
*/
implicit class RateInt(val numberOfCalls: Int) extends AnyVal {
def msgsPer(duration: Int, timeUnit: TimeUnit) = Rate(numberOfCalls, Duration(duration, timeUnit))
def msgsPer(duration: FiniteDuration) = Rate(numberOfCalls, duration)
def msgsPerSecond = Rate(numberOfCalls, Duration(1, TimeUnit.SECONDS))
def msgsPerMinute = Rate(numberOfCalls, Duration(1, TimeUnit.MINUTES))
def msgsPerHour = Rate(numberOfCalls, Duration(1, TimeUnit.HOURS))
}
}
/**
* INTERNAL API
*/
private[throttle] object TimerBasedThrottler {
case object Tick
// States of the FSM: A `TimerBasedThrottler` is in state `Active` iff the timer is running.
sealed trait State
case object Idle extends State
case object Active extends State
// Messages, as we queue them to be sent later
final case class Message(message: Any, sender: ActorRef)
// The data of the FSM
final case class Data(target: Option[ActorRef], callsLeftInThisPeriod: Int, queue: Q[Message])
}
/**
* A throttler that uses a timer to control the message delivery rate.
*
* == Throttling ==
* A <em>throttler</em> is an actor that is defined through a <em>target actor</em> and a <em>rate</em>
* (of type [[akka.contrib.throttle.Throttler.Rate]]). You set or change the target and rate at any time through the
* [[akka.contrib.throttle.Throttler.SetTarget]] and [[akka.contrib.throttle.Throttler.SetRate]]
* messages, respectively. When you send the throttler any other message `msg`, it will
* put the message `msg` into an internal queue and eventually send all queued messages to the target, at
* a speed that respects the given rate. If no target is currently defined then the messages will be queued
* and will be delivered as soon as a target gets set.
*
* A throttler understands actor messages of type
* [[akka.contrib.throttle.Throttler.SetTarget]], [[akka.contrib.throttle.Throttler.SetRate]], in
* addition to any other messages, which the throttler will consider as messages to be sent to
* the target.
*
* == Transparency ==
* Notice that the throttler `forward`s messages, i.e., the target will see the original message sender
* (and not the throttler) as the sender of the message.
*
* == Persistence ==
* Throttlers usually use an internal queue to keep the messages that need to be sent to the target.
* You therefore cannot rely on the throttler's inbox size in order to learn how much messages are
* outstanding.
*
* It is left to the implementation whether the internal queue is persisted over application restarts or
* actor failure.
*
* == Processing messages ==
* The target should process messages as fast as possible. If the target requires substantial time to
* process messages, it should distribute its work to other actors (using for example something like
* a `BalancingDispatcher`), otherwise the resulting system will always work <em>below</em>
* the threshold rate.
*
* <em>Example:</em> Suppose the throttler has a rate of 3msg/s and the target requires 1s to process a message.
* This system will only process messages at a rate of 1msg/s: the target will receive messages at at most 3msg/s
* but as it handles them synchronously and each of them takes 1s, its inbox will grow and grow. In such
* a situation, the target should <em>distribute</em> its messages to a set of worker actors so that individual messages
* can be handled in parallel.
*
* ==Example==
* For example, if you set a rate like "3 messages in 1 second", the throttler
* will send the first three messages immediately to the target actor but will need to impose a delay before
* sending out further messages:
* {{{
* // A simple actor that prints whatever it receives
* class Printer extends Actor {
* def receive = {
* case x => println(x)
* }
* }
*
* val printer = system.actorOf(Props[Printer], "printer")
*
* // The throttler for this example, setting the rate
* val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second))
*
* // Set the target
* throttler ! SetTarget(Some(printer))
* // These three messages will be sent to the printer immediately
* throttler ! "1"
* throttler ! "2"
* throttler ! "3"
* // These two will wait at least until 1 second has passed
* throttler ! "4"
* throttler ! "5"
* }}}
*
* ==Implementation notes==
* This throttler implementation internally installs a timer that repeats every `rate.durationInMillis` and enables `rate.numberOfCalls`
* additional calls to take place. A `TimerBasedThrottler` uses very few system resources, provided the rate's duration is not too
* fine-grained (which would cause a lot of timer invocations); for example, it does not store the calling history
* as other throttlers may need to do.
*
* However, a `TimerBasedThrottler` only provides ''weak guarantees'' on the rate (see also
* <a href='http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2'>this blog post</a>):
*
* - Only ''delivery'' times are taken into account: if, for example, the throttler is used to throttle
* requests to an external web service then only the start times of the web requests are considered.
* If a web request takes very long on the server then more than `rate.numberOfCalls`-many requests
* may be observed on the server in an interval of duration `rate.durationInMillis()`.
* - There may be intervals of duration `rate.durationInMillis()` that contain more than `rate.numberOfCalls`
* message deliveries: a `TimerBasedThrottler` only makes guarantees for the intervals
* of its ''own'' timer, namely that no more than `rate.numberOfCalls`-many messages are delivered within such intervals. Other intervals on the
* timeline may contain more calls.
*
* For some applications, these guarantees may not be sufficient.
*
* ==Known issues==
*
* - If you change the rate using `SetRate(rate)`, the actual rate may in fact be higher for the
* overlapping period (i.e., `durationInMillis()`) of the new and old rate. Therefore,
* changing the rate frequently is not recommended with the current implementation.
* - The queue of messages to be delivered is not persisted in any way; actor or system failure will
* cause the queued messages to be lost.
*
* @see [[akka.contrib.throttle.Throttler]]
*/
@deprecated("Use streams, see migration guide", "2.5.0")
class TimerBasedThrottler(var rate: Rate) extends Actor with FSM[State, Data] {
import FSM.`->`
this.rate = normalizedRate(rate)
startWith(Idle, Data(None, rate.numberOfCalls, Q()))
// Idle: no messages, or target not set
when(Idle) {
// Set the rate
case Event(SetRate(newRate), d) =>
this.rate = normalizedRate(newRate)
stay.using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls))
// Set the target
case Event(SetTarget(t @ Some(_)), d) if !d.queue.isEmpty =>
goto(Active).using(deliverMessages(d.copy(target = t)))
case Event(SetTarget(t), d) =>
stay.using(d.copy(target = t))
// Queuing
case Event(msg, d @ Data(None, _, queue)) =>
stay.using(d.copy(queue = queue.enqueue(Message(msg, context.sender()))))
case Event(msg, d @ Data(Some(_), _, Seq())) =>
goto(Active).using(deliverMessages(d.copy(queue = Q(Message(msg, context.sender())))))
// Note: The case Event(msg, t @ Data(Some(_), _, _, Seq(_*))) should never happen here.
}
when(Active) {
// Set the rate
case Event(SetRate(newRate), d) =>
this.rate = normalizedRate(newRate)
// Note: this should be improved (see "Known issues" in class comments)
stopTimer()
startTimer(rate)
stay.using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls))
// Set the target (when the new target is None)
case Event(SetTarget(None), d) =>
// Note: We do not yet switch to state `Inactive` because we need the timer to tick once more before
stay.using(d.copy(target = None))
// Set the target (when the new target is not None)
case Event(SetTarget(t @ Some(_)), d) =>
stay.using(d.copy(target = t))
// Tick after a `SetTarget(None)`: take the additional permits and go to `Idle`
case Event(Tick, d @ Data(None, _, _)) =>
goto(Idle).using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls))
// Period ends and we have no more messages: take the additional permits and go to `Idle`
case Event(Tick, d @ Data(_, _, Seq())) =>
goto(Idle).using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls))
// Period ends and we get more occasions to send messages
case Event(Tick, d @ Data(_, _, _)) =>
stay.using(deliverMessages(d.copy(callsLeftInThisPeriod = rate.numberOfCalls)))
// Queue a message (when we cannot send messages in the current period anymore)
case Event(msg, d @ Data(_, 0, queue)) =>
stay.using(d.copy(queue = queue.enqueue(Message(msg, context.sender()))))
// Queue a message (when we can send some more messages in the current period)
case Event(msg, d @ Data(_, _, queue)) =>
stay.using(deliverMessages(d.copy(queue = queue.enqueue(Message(msg, context.sender())))))
}
onTransition {
case Idle -> Active => startTimer(rate)
case Active -> Idle => stopTimer()
}
initialize()
private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true)
private def stopTimer() = cancelTimer("morePermits")
// Rate.numberOfCalls is an integer. So, the finest granularity of timing (i.e., highest
// precision) is achieved when it equals 1. So, the following function normalizes
// a Rate to 1 numberOfCall per calculated unit time.
private def normalizedRate(rate: Rate): Rate = {
// If number of calls is zero then we don't need to do anything
if (rate.numberOfCalls == 0) {
rate
} else {
Rate(1, FiniteDuration(rate.duration.toNanos / rate.numberOfCalls, TimeUnit.NANOSECONDS))
}
}
/**
* Send as many messages as we can (while respecting the rate) to the target and
* return the state data (with the queue containing the remaining ones).
*/
private def deliverMessages(data: Data): Data = {
val queue = data.queue
val nrOfMsgToSend = scala.math.min(queue.length, data.callsLeftInThisPeriod)
queue.take(nrOfMsgToSend).foreach(x => data.target.get.tell(x.message, x.sender))
data.copy(queue = queue.drop(nrOfMsgToSend), callsLeftInThisPeriod = data.callsLeftInThisPeriod - nrOfMsgToSend)
}
}

View file

@ -1,388 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern
import language.postfixOps
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import org.scalatest.BeforeAndAfterEach
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.actor._
import akka.testkit.ImplicitSender
import scala.concurrent.duration._
import akka.actor.FSM
import akka.actor.ActorRef
import akka.testkit.TestKitExtension
import akka.actor.ActorIdentity
import akka.actor.Identify
import com.typesafe.config.ConfigFactory
object ReliableProxySpec extends MultiNodeConfig {
val local = role("local")
val remote = role("remote")
commonConfig(ConfigFactory.parseString("""
akka.remote.artery.enabled = false
"""))
testTransport(on = true)
}
class ReliableProxyMultiJvmNode1 extends ReliableProxySpec
class ReliableProxyMultiJvmNode2 extends ReliableProxySpec
class ReliableProxySpec
extends MultiNodeSpec(ReliableProxySpec)
with STMultiNodeSpec
with BeforeAndAfterEach
with ImplicitSender {
import ReliableProxySpec._
import ReliableProxy._
override def initialParticipants = roles.size
override def afterEach(): Unit = {
runOn(local) {
testConductor.passThrough(local, remote, Direction.Both).await
}
enterBarrier("after-each")
}
@volatile var target: ActorRef = system.deadLetters
@volatile var proxy: ActorRef = system.deadLetters
def idTarget(): Unit = {
system.actorSelection(node(remote) / "user" / "echo") ! Identify("echo")
target = expectMsgType[ActorIdentity].ref.get
}
def startTarget(): Unit = {
target = system.actorOf(Props(new Actor {
def receive = {
case x => testActor ! x
}
}).withDeploy(Deploy.local), "echo")
}
def stopProxy(): Unit = {
val currentProxy = proxy
currentProxy ! FSM.UnsubscribeTransitionCallBack(testActor)
currentProxy ! PoisonPill
expectTerminated(currentProxy)
}
def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s))
def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2))
def expectTransition(max: FiniteDuration, s1: State, s2: State) = expectMsg(max, FSM.Transition(proxy, s1, s2))
def sendN(n: Int) = (1 to n).foreach(proxy ! _)
def expectN(n: Int) = (1 to n).foreach { n =>
expectMsg(n); lastSender should ===(target)
}
// avoid too long timeout for expectNoMsg when using dilated timeouts, because
// blackhole will trigger failure detection
val expectNoMsgTimeout = {
val timeFactor = TestKitExtension(system).TestTimeFactor
if (timeFactor > 1.0) (1.0 / timeFactor).seconds else 1.second
}
"A ReliableProxy" must {
"initialize properly" in {
runOn(remote) {
startTarget()
}
enterBarrier("initialize")
runOn(local) {
import akka.contrib.pattern.ReliableProxy
idTarget()
proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 5.seconds), "proxy1")
watch(proxy)
proxy ! FSM.SubscribeTransitionCallBack(testActor)
expectState(Connecting)
proxy ! "hello"
expectMsgType[TargetChanged]
expectTransition(Connecting, Active)
expectTransition(Active, Idle)
}
runOn(remote) {
expectMsg(1.second, "hello")
}
enterBarrier("initialize-done")
}
"forward messages in sequence" in {
runOn(local) {
sendN(100)
expectTransition(Idle, Active)
expectTransition(Active, Idle)
}
runOn(remote) {
within(5 seconds) {
expectN(100)
}
}
enterBarrier("test1a")
runOn(local) {
sendN(100)
expectTransition(Idle, Active)
expectTransition(Active, Idle)
}
runOn(remote) {
within(5 seconds) {
expectN(100)
}
}
enterBarrier("test1b")
}
"retry when sending fails" in {
runOn(local) {
testConductor.blackhole(local, remote, Direction.Send).await
sendN(100)
expectTransition(1 second, Idle, Active)
expectNoMsg(expectNoMsgTimeout)
}
enterBarrier("test2a")
runOn(remote) {
expectNoMsg(0 seconds)
}
enterBarrier("test2b")
runOn(local) {
testConductor.passThrough(local, remote, Direction.Send).await
expectTransition(5 seconds, Active, Idle)
}
runOn(remote) {
within(5 seconds) {
expectN(100)
}
}
enterBarrier("test2c")
}
"retry when receiving fails" in {
runOn(local) {
testConductor.blackhole(local, remote, Direction.Receive).await
sendN(100)
expectTransition(1 second, Idle, Active)
expectNoMsg(expectNoMsgTimeout)
}
runOn(remote) {
within(5 second) {
expectN(100)
}
}
enterBarrier("test3a")
runOn(local) {
testConductor.passThrough(local, remote, Direction.Receive).await
expectTransition(5 seconds, Active, Idle)
}
enterBarrier("test3b")
}
"resend across a slow outbound link" in {
runOn(local) {
// the rateMBit value is derived from empirical studies so that it will trigger resends,
// the exact value is not important, but it should not be too large
testConductor.throttle(local, remote, Direction.Send, rateMBit = 0.02).await
sendN(50)
within(5 seconds) {
expectTransition(Idle, Active)
// use the slow link for a while, which will trigger resends
Thread.sleep(2000)
// full speed, and it will catch up outstanding messages
testConductor.passThrough(local, remote, Direction.Send).await
expectTransition(Active, Idle)
}
}
runOn(remote) {
within(5 seconds) {
expectN(50)
}
expectNoMsg(expectNoMsgTimeout)
}
enterBarrier("test4")
}
"resend across a slow inbound link" in {
runOn(local) {
testConductor.passThrough(local, remote, Direction.Send).await
// the rateMBit value is derived from empirical studies so that it will trigger resends,
// the exact value is not important, but it should not be too large
testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.02).await
sendN(50)
within(5 seconds) {
expectTransition(Idle, Active)
// use the slow link for a while, which will trigger resends
Thread.sleep(2000)
// full speed, and it will catch up outstanding messages
testConductor.passThrough(local, remote, Direction.Receive).await
expectTransition(Active, Idle)
}
}
runOn(remote) {
within(5 second) {
expectN(50)
}
expectNoMsg(1 seconds)
}
enterBarrier("test5")
}
"reconnect to target" in {
runOn(remote) {
// Stop the target
system.stop(target)
}
runOn(local) {
// After the target stops the proxy will change to Reconnecting
within(5 seconds) {
expectTransition(Idle, Connecting)
}
// Send some messages while it's reconnecting
sendN(50)
}
enterBarrier("test6a")
runOn(remote) {
// Restart the target to have something to reconnect to
startTarget()
}
runOn(local) {
// After reconnecting a we'll get a TargetChanged message
// and the proxy will transition to Active to send the outstanding messages
within(10 seconds) {
expectMsgType[TargetChanged]
expectTransition(Connecting, Active)
}
}
enterBarrier("test6b")
runOn(local) {
// After the messages have been delivered, proxy is back to idle
expectTransition(Active, Idle)
}
runOn(remote) {
expectN(50)
}
enterBarrier("test6c")
}
"stop proxy if target stops and no reconnection" in {
runOn(local) {
stopProxy() // Stop previous proxy
// Start new proxy with no reconnections
proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis), "proxy2")
proxy ! FSM.SubscribeTransitionCallBack(testActor)
watch(proxy)
expectState(Connecting)
expectMsgType[TargetChanged]
expectTransition(Connecting, Idle)
}
enterBarrier("test7a")
runOn(remote) {
// Stop the target, this will cause the proxy to stop
system.stop(target)
}
runOn(local) {
within(5 seconds) {
expectMsgType[ProxyTerminated]
expectTerminated(proxy)
}
}
enterBarrier("test7b")
}
"stop proxy after max reconnections" in {
runOn(remote) {
// Target is not running after previous test, start it
startTarget()
}
enterBarrier("target-started")
runOn(local) {
// Get new target's ref
idTarget()
}
enterBarrier("test8a")
runOn(local) {
// Proxy is not running after previous test
// Start new proxy with 3 reconnections every 2 sec
proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 2.seconds, 3), "proxy3")
proxy ! FSM.SubscribeTransitionCallBack(testActor)
watch(proxy)
expectState(Connecting)
expectMsgType[TargetChanged]
expectTransition(Connecting, Idle)
}
enterBarrier("test8b")
runOn(remote) {
// Stop target
system.stop(target)
}
runOn(local) {
// Wait for transition to Connecting, then send messages
within(5 seconds) {
expectTransition(Idle, Connecting)
}
sendN(50)
}
enterBarrier("test8c")
runOn(local) {
// After max reconnections, proxy stops itself. Expect ProxyTerminated(Unsent(msgs, sender, serial)).
within(5 * 2.seconds) {
val proxyTerm = expectMsgType[ProxyTerminated]
// Validate that the unsent messages are 50 ints
val unsentInts = proxyTerm.outstanding.queue.collect { case Message(i: Int, _, _) if i > 0 && i <= 50 => i }
unsentInts should have size 50
expectTerminated(proxy)
}
}
enterBarrier("test8d")
}
}
}

View file

@ -1,117 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern;
import java.util.concurrent.TimeUnit;
import akka.actor.*;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration;
import akka.testkit.TestProbe;
// #import
import akka.contrib.pattern.ReliableProxy;
// #import
public class ReliableProxyTest extends JUnitSuite {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("ReliableProxyTest");
private final ActorSystem system = actorSystemResource.getSystem();
public // #demo-proxy
static class ProxyParent extends AbstractActor {
private final ActorRef proxy;
public ProxyParent(ActorPath targetPath) {
proxy =
getContext()
.actorOf(
ReliableProxy.props(targetPath, Duration.create(100, TimeUnit.MILLISECONDS)));
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"hello",
m -> {
proxy.tell("world!", self());
})
.build();
}
}
// #demo-proxy
public // #demo-transition
static class ProxyTransitionParent extends AbstractActor {
private final ActorRef proxy;
private ActorRef client = null;
public ProxyTransitionParent(ActorPath targetPath) {
proxy =
getContext()
.actorOf(
ReliableProxy.props(targetPath, Duration.create(100, TimeUnit.MILLISECONDS)));
proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.matchEquals(
"hello",
message -> {
proxy.tell("world!", self());
client = sender();
})
.matchUnchecked(
FSM.CurrentState.class,
(FSM.CurrentState<ReliableProxy.State> state) -> {
// get initial state
})
.matchUnchecked(
FSM.Transition.class,
(FSM.Transition<ReliableProxy.State> transition) -> {
assert transition.fsmRef().equals(proxy);
if (transition.from().equals(ReliableProxy.active())
&& transition.to().equals(ReliableProxy.idle())) {
client.tell("done", self());
}
})
.build();
}
}
// #demo-transition
@Test
public void demonstrateUsage() {
final TestProbe probe = TestProbe.apply(system);
final ActorRef target = probe.ref();
final ActorRef parent = system.actorOf(Props.create(ProxyParent.class, target.path()));
parent.tell("hello", null);
probe.expectMsg("world!");
}
@Test
public void demonstrateTransitions() {
final ActorRef target = TestProbe.apply(system).ref();
final ActorRef parent =
system.actorOf(Props.create(ProxyTransitionParent.class, target.path()));
final TestProbe probe = TestProbe.apply(system);
parent.tell("hello", probe.ref());
probe.expectMsg("done");
}
}

View file

@ -1,78 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.throttle;
import org.junit.ClassRule;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.duration.Duration;
import com.typesafe.config.ConfigFactory;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.AbstractActor;
import akka.testkit.AkkaJUnitActorSystemResource;
public class TimerBasedThrottlerTest extends JUnitSuite {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource(
"TimerBasedThrottlerTest", ConfigFactory.parseString("akka.log-dead-letters=off"));
private final ActorSystem system = actorSystemResource.getSystem();
@Test
public void demonstrateUsage() {
// #demo-code
// A simple actor that prints whatever it receives
ActorRef printer = system.actorOf(Props.create(Printer.class));
// The throttler for this example, setting the rate
ActorRef throttler =
system.actorOf(
Props.create(
TimerBasedThrottler.class,
new Throttler.Rate(3, Duration.create(1, TimeUnit.SECONDS))));
// Set the target
throttler.tell(new Throttler.SetTarget(printer), null);
// These three messages will be sent to the target immediately
throttler.tell("1", null);
throttler.tell("2", null);
throttler.tell("3", null);
// These two will wait until a second has passed
throttler.tell("4", null);
throttler.tell("5", null);
// #demo-code
}
public // #demo-code
// A simple actor that prints whatever it receives
static class Printer extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.matchAny(
message -> {
System.out.println(message);
})
.build();
}
}
// #demo-code
static class System {
static Out out = new Out();
static class Out {
void println(Object s) {}
}
}
}

View file

@ -1,8 +0,0 @@
akka {
actor {
serialize-creators = on
serialize-messages = on
warn-about-java-serializer-usage = off
}
remote.netty.tcp.port = 0
}

View file

@ -1,202 +0,0 @@
/*
* Copyright (C) 2014-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.circuitbreaker.sample
import akka.actor.{ Actor, ActorLogging, ActorRef }
import akka.contrib.circuitbreaker.CircuitBreakerProxy.{ CircuitBreakerPropsBuilder, CircuitOpenFailure }
import akka.contrib.circuitbreaker.sample.CircuitBreaker.AskFor
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{ Failure, Random, Success }
//#simple-service
object SimpleService {
case class Request(content: String)
case class Response(content: Either[String, String])
case object ResetCount
}
/**
* This is a simple actor simulating a service
* - Becoming slower with the increase of frequency of input requests
* - Failing around 30% of the requests
*/
class SimpleService extends Actor with ActorLogging {
import SimpleService._
var messageCount = 0
import context.dispatcher
context.system.scheduler.schedule(1.second, 1.second, self, ResetCount)
override def receive = {
case ResetCount =>
messageCount = 0
case Request(content) =>
messageCount += 1
// simulate workload
Thread.sleep(100 * messageCount)
// Fails around 30% of the times
if (Random.nextInt(100) < 70) {
sender ! Response(Right(s"Successfully processed $content"))
} else {
sender ! Response(Left(s"Failure processing $content"))
}
}
}
//#simple-service
object CircuitBreaker {
case class AskFor(what: String)
}
//#basic-sample
class CircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
import SimpleService._
val serviceCircuitBreaker =
context.actorOf(
CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds)
.copy(failureDetector = {
_ match {
case Response(Left(_)) => true
case _ => false
}
})
.props(potentiallyFailingService),
"serviceCircuitBreaker")
override def receive: Receive = {
case AskFor(requestToForward) =>
serviceCircuitBreaker ! Request(requestToForward)
case Right(Response(content)) =>
//handle response
log.info("Got successful response {}", content)
case Response(Right(content)) =>
//handle response
log.info("Got successful response {}", content)
case Response(Left(content)) =>
//handle response
log.info("Got failed response {}", content)
case CircuitOpenFailure(failedMsg) =>
log.warning("Unable to send message {}", failedMsg)
}
}
//#basic-sample
//#ask-sample
class CircuitBreakerAsk(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
import SimpleService._
import akka.pattern._
implicit val askTimeout: Timeout = 2.seconds
val serviceCircuitBreaker =
context.actorOf(
CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds)
.copy(failureDetector = {
_ match {
case Response(Left(_)) => true
case _ => false
}
})
.copy(openCircuitFailureConverter = { failure =>
Left(s"Circuit open when processing ${failure.failedMsg}")
})
.props(potentiallyFailingService),
"serviceCircuitBreaker")
import context.dispatcher
override def receive: Receive = {
case AskFor(requestToForward) =>
(serviceCircuitBreaker ? Request(requestToForward)).mapTo[Either[String, String]].onComplete {
case Success(Right(successResponse)) =>
//handle response
log.info("Got successful response {}", successResponse)
case Success(Left(failureResponse)) =>
//handle response
log.info("Got successful response {}", failureResponse)
case Failure(exception) =>
//handle response
log.info("Got successful response {}", exception)
}
}
}
//#ask-sample
//#ask-with-failure-sample
class CircuitBreakerAskWithFailure(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
import SimpleService._
import akka.pattern._
import akka.contrib.circuitbreaker.Implicits.futureExtensions
implicit val askTimeout: Timeout = 2.seconds
val serviceCircuitBreaker =
context.actorOf(
CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds)
.props(target = potentiallyFailingService),
"serviceCircuitBreaker")
import context.dispatcher
override def receive: Receive = {
case AskFor(requestToForward) =>
(serviceCircuitBreaker ? Request(requestToForward)).failForOpenCircuit.mapTo[String].onComplete {
case Success(successResponse) =>
//handle response
log.info("Got successful response {}", successResponse)
case Failure(exception) =>
//handle response
log.info("Got successful response {}", exception)
}
}
}
//#ask-with-failure-sample
//#ask-with-circuit-breaker-sample
class CircuitBreakerAskWithCircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with ActorLogging {
import SimpleService._
import akka.contrib.circuitbreaker.Implicits.askWithCircuitBreaker
implicit val askTimeout: Timeout = 2.seconds
val serviceCircuitBreaker =
context.actorOf(
CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds)
.props(target = potentiallyFailingService),
"serviceCircuitBreaker")
import context.dispatcher
override def receive: Receive = {
case AskFor(requestToForward) =>
serviceCircuitBreaker.askWithCircuitBreaker(Request(requestToForward)).mapTo[String].onComplete {
case Success(successResponse) =>
//handle response
log.info("Got successful response {}", successResponse)
case Failure(exception) =>
//handle response
log.info("Got successful response {}", exception)
}
}
}
//#ask-with-circuit-breaker-sample

View file

@ -1,78 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.jul
import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorLogging, Props }
import akka.testkit.AkkaSpec
import java.util.logging
object JavaLoggerSpec {
val config = ConfigFactory.parseString("""
akka {
loglevel = INFO
loggers = ["akka.contrib.jul.JavaLogger"]
}""")
class LogProducer extends Actor with ActorLogging {
def receive = {
case e: Exception =>
log.error(e, e.getMessage)
case (s: String, x: Int) =>
log.info(s, x)
}
}
}
class JavaLoggerSpec extends AkkaSpec(JavaLoggerSpec.config) {
val logger = logging.Logger.getLogger("akka://JavaLoggerSpec/user/log")
logger.setUseParentHandlers(false) // turn off output of test LogRecords
logger.addHandler(new logging.Handler {
def publish(record: logging.LogRecord): Unit = {
testActor ! record
}
def flush(): Unit = {}
def close(): Unit = {}
})
val producer = system.actorOf(Props[JavaLoggerSpec.LogProducer], name = "log")
"JavaLogger" must {
"log error with stackTrace" in {
producer ! new RuntimeException("Simulated error")
val record = expectMsgType[logging.LogRecord]
record should not be (null)
record.getMillis should not be (0)
record.getThreadID should not be (0)
record.getLevel should ===(logging.Level.SEVERE)
record.getMessage should ===("Simulated error")
record.getThrown.isInstanceOf[RuntimeException] should ===(true)
record.getSourceClassName should ===(classOf[JavaLoggerSpec.LogProducer].getName)
record.getSourceMethodName should ===(null)
}
"log info without stackTrace" in {
producer ! (("{} is the magic number", 3))
val record = expectMsgType[logging.LogRecord]
record should not be (null)
record.getMillis should not be (0)
record.getThreadID should not be (0)
record.getLevel should ===(logging.Level.INFO)
record.getMessage should ===("3 is the magic number")
record.getThrown should ===(null)
record.getSourceClassName should ===(classOf[JavaLoggerSpec.LogProducer].getName)
record.getSourceMethodName should ===(null)
}
}
}

View file

@ -1,133 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.mailbox
import com.typesafe.config.ConfigFactory
import akka.actor.{ actorRef2Scala, Actor, ActorSystem, DeadLetter, PoisonPill, Props }
import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender }
object PeekMailboxSpec {
case object Check
case object DoubleAck
class PeekActor(tries: Int) extends Actor {
var togo = tries
def receive = {
case Check =>
sender() ! Check
PeekMailboxExtension.ack()
case DoubleAck =>
PeekMailboxExtension.ack()
PeekMailboxExtension.ack()
case msg =>
sender() ! msg
if (togo == 0) throw new RuntimeException("DONTWANNA")
togo -= 1
PeekMailboxExtension.ack()
}
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
for (m <- msg if m == "DIE") context.stop(self) // for testing the case of mailbox.cleanUp
}
}
}
class PeekMailboxSpec extends AkkaSpec("""
peek-dispatcher {
mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
max-retries = 2
}
""") with ImplicitSender {
import PeekMailboxSpec._
"A PeekMailbox" must {
"retry messages" in {
val a = system.actorOf(Props(classOf[PeekActor], 1).withDispatcher("peek-dispatcher"))
a ! "hello"
expectMsg("hello")
EventFilter[RuntimeException]("DONTWANNA", occurrences = 1).intercept {
a ! "world"
}
expectMsg("world")
expectMsg("world")
a ! Check
expectMsg(Check)
}
"put a bound on retries" in {
val a = system.actorOf(Props(classOf[PeekActor], 0).withDispatcher("peek-dispatcher"))
EventFilter[RuntimeException]("DONTWANNA", occurrences = 3).intercept {
a ! "hello"
}
a ! Check
expectMsg("hello")
expectMsg("hello")
expectMsg("hello")
expectMsg(Check)
}
"not waste messages on double-ack()" in {
val a = system.actorOf(Props(classOf[PeekActor], 0).withDispatcher("peek-dispatcher"))
a ! DoubleAck
a ! Check
expectMsg(Check)
}
"support cleanup" in {
system.eventStream.subscribe(testActor, classOf[DeadLetter])
val a = system.actorOf(Props(classOf[PeekActor], 0).withDispatcher("peek-dispatcher"))
watch(a)
EventFilter[RuntimeException]("DONTWANNA", occurrences = 1).intercept {
a ! "DIE" // stays in the mailbox
}
expectMsg("DIE")
expectMsgType[DeadLetter].message should ===("DIE")
expectTerminated(a)
}
}
}
//#demo
class MyActor extends Actor {
def receive = {
case msg =>
println(msg)
doStuff(msg) // may fail
PeekMailboxExtension.ack()
}
//#business-logic-elided
var i = 0
def doStuff(m: Any): Unit = {
if (i == 1) throw new Exception("DONTWANNA")
i += 1
}
override def postStop(): Unit = {
context.system.terminate()
}
//#business-logic-elided
}
object MyApp extends App {
val system = ActorSystem(
"MySystem",
ConfigFactory.parseString("""
peek-dispatcher {
mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
max-retries = 2
}
"""))
val myActor = system.actorOf(Props[MyActor].withDispatcher("peek-dispatcher"), name = "myActor")
myActor ! "Hello"
myActor ! "World"
myActor ! PoisonPill
}
//#demo

View file

@ -1,396 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern
import akka.testkit.{ ImplicitSender, TestKit }
import org.scalatest.FunSuiteLike
import org.scalatest.Matchers
import scala.annotation.tailrec
import scala.collection._
import scala.concurrent.duration._
import scala.math.BigDecimal.int2bigDecimal
import akka.actor._
import org.scalatest.BeforeAndAfterAll
/**
* Sample and test code for the aggregator patter.
* This is based on Jamie Allen's tutorial at
* http://jaxenter.com/tutorial-asynchronous-programming-with-akka-actors-46220.html
*/
sealed trait AccountType
case object Checking extends AccountType
case object Savings extends AccountType
case object MoneyMarket extends AccountType
final case class GetCustomerAccountBalances(id: Long, accountTypes: Set[AccountType])
final case class GetAccountBalances(id: Long)
final case class AccountBalances(accountType: AccountType, balance: Option[List[(Long, BigDecimal)]])
final case class CheckingAccountBalances(balances: Option[List[(Long, BigDecimal)]])
final case class SavingsAccountBalances(balances: Option[List[(Long, BigDecimal)]])
final case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]])
case object TimedOut
case object CantUnderstand
class SavingsAccountProxy extends Actor {
def receive = {
case GetAccountBalances(id: Long) =>
sender() ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000))))
}
}
class CheckingAccountProxy extends Actor {
def receive = {
case GetAccountBalances(id: Long) =>
sender() ! CheckingAccountBalances(Some(List((3, 15000))))
}
}
class MoneyMarketAccountProxy extends Actor {
def receive = {
case GetAccountBalances(id: Long) =>
sender() ! MoneyMarketAccountBalances(None)
}
}
class AccountBalanceRetriever extends Actor with Aggregator {
import context._
//#initial-expect
expectOnce {
case GetCustomerAccountBalances(id, types) =>
new AccountAggregator(sender(), id, types)
case _ =>
sender() ! CantUnderstand
context.stop(self)
}
//#initial-expect
class AccountAggregator(originalSender: ActorRef, id: Long, types: Set[AccountType]) {
val results =
mutable.ArrayBuffer.empty[(AccountType, Option[List[(Long, BigDecimal)]])]
if (types.size > 0)
types.foreach {
case Checking => fetchCheckingAccountsBalance()
case Savings => fetchSavingsAccountsBalance()
case MoneyMarket => fetchMoneyMarketAccountsBalance()
} else collectBalances() // Empty type list yields empty response
context.system.scheduler.scheduleOnce(1.second, self, TimedOut)
//#expect-timeout
expect {
case TimedOut => collectBalances(force = true)
}
//#expect-timeout
//#expect-balance
def fetchCheckingAccountsBalance(): Unit = {
context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id)
expectOnce {
case CheckingAccountBalances(balances) =>
results += (Checking -> balances)
collectBalances()
}
}
//#expect-balance
def fetchSavingsAccountsBalance(): Unit = {
context.actorOf(Props[SavingsAccountProxy]) ! GetAccountBalances(id)
expectOnce {
case SavingsAccountBalances(balances) =>
results += (Savings -> balances)
collectBalances()
}
}
def fetchMoneyMarketAccountsBalance(): Unit = {
context.actorOf(Props[MoneyMarketAccountProxy]) ! GetAccountBalances(id)
expectOnce {
case MoneyMarketAccountBalances(balances) =>
results += (MoneyMarket -> balances)
collectBalances()
}
}
def collectBalances(force: Boolean = false): Unit = {
if (results.size == types.size || force) {
originalSender ! results.toList // Make sure it becomes immutable
context.stop(self)
}
}
}
}
//#demo-code
//#chain-sample
final case class InitialRequest(name: String)
final case class Request(name: String)
final case class Response(name: String, value: String)
final case class EvaluationResults(name: String, eval: List[Int])
final case class FinalResponse(qualifiedValues: List[String])
/**
* An actor sample demonstrating use of unexpect and chaining.
* This is just an example and not a complete test case.
*/
class ChainingSample extends Actor with Aggregator {
expectOnce {
case InitialRequest(name) => new MultipleResponseHandler(sender(), name)
}
class MultipleResponseHandler(originalSender: ActorRef, propName: String) {
import context.dispatcher
import collection.mutable.ArrayBuffer
val values = ArrayBuffer.empty[String]
context.actorSelection("/user/request_proxies") ! Request(propName)
context.system.scheduler.scheduleOnce(50.milliseconds, self, TimedOut)
//#unexpect-sample
val handle = expect {
case Response(name, value) =>
values += value
if (values.size > 3) processList()
case TimedOut => processList()
}
def processList(): Unit = {
unexpect(handle)
if (values.size > 0) {
context.actorSelection("/user/evaluator") ! values.toList
expectOnce {
case EvaluationResults(name, eval) => processFinal(eval)
}
} else processFinal(List.empty[Int])
}
//#unexpect-sample
def processFinal(eval: List[Int]): Unit = {
// Select only the entries coming back from eval
originalSender ! FinalResponse(eval.map(values))
context.stop(self)
}
}
}
//#chain-sample
class AggregatorSpec
extends TestKit(ActorSystem("AggregatorSpec"))
with ImplicitSender
with FunSuiteLike
with Matchers
with BeforeAndAfterAll {
override def afterAll(): Unit = {
shutdown()
}
test("Test request 1 account type") {
system.actorOf(Props[AccountBalanceRetriever]) ! GetCustomerAccountBalances(1, Set(Savings))
receiveOne(10.seconds) match {
case result: List[_] =>
result should have size 1
case result =>
assert(false, s"Expect List, got ${result.getClass}")
}
}
test("Test request 3 account types") {
system.actorOf(Props[AccountBalanceRetriever]) !
GetCustomerAccountBalances(1, Set(Checking, Savings, MoneyMarket))
receiveOne(10.seconds) match {
case result: List[_] =>
result should have size 3
case result =>
assert(false, s"Expect List, got ${result.getClass}")
}
}
}
final case class TestEntry(id: Int)
class WorkListSpec extends FunSuiteLike {
val workList = WorkList.empty[TestEntry]
var entry2: TestEntry = null
var entry4: TestEntry = null
test("Processing empty WorkList") {
// ProcessAndRemove something in the middle
val processed = workList.process {
case TestEntry(9) => true
case _ => false
}
assert(!processed)
}
test("Insert temp entries") {
assert(workList.head === workList.tail)
val entry0 = TestEntry(0)
workList.add(entry0, permanent = false)
assert(workList.head.next != null)
assert(workList.tail === workList.head.next)
assert(workList.tail.ref.get === entry0)
val entry1 = TestEntry(1)
workList.add(entry1, permanent = false)
assert(workList.head.next != workList.tail)
assert(workList.head.next.ref.get === entry0)
assert(workList.tail.ref.get === entry1)
entry2 = TestEntry(2)
workList.add(entry2, permanent = false)
assert(workList.tail.ref.get === entry2)
val entry3 = TestEntry(3)
workList.add(entry3, permanent = false)
assert(workList.tail.ref.get === entry3)
}
test("Process temp entries") {
// ProcessAndRemove something in the middle
assert(workList.process {
case TestEntry(2) => true
case _ => false
})
// ProcessAndRemove the head
assert(workList.process {
case TestEntry(0) => true
case _ => false
})
// ProcessAndRemove the tail
assert(workList.process {
case TestEntry(3) => true
case _ => false
})
}
test("Re-insert permanent entry") {
entry4 = TestEntry(4)
workList.add(entry4, permanent = true)
assert(workList.tail.ref.get === entry4)
}
test("Process permanent entry") {
assert(workList.process {
case TestEntry(4) => true
case _ => false
})
}
test("Remove permanent entry") {
val removed = workList.remove(entry4)
assert(removed)
}
test("Remove temp entry already processed") {
val removed = workList.remove(entry2)
assert(!removed)
}
test("Process non-matching entries") {
val processed =
workList.process {
case TestEntry(2) => true
case _ => false
}
assert(!processed)
val processed2 =
workList.process {
case TestEntry(5) => true
case _ => false
}
assert(!processed2)
}
test("Append two lists") {
workList.removeAll()
(0 to 4).foreach { id =>
workList.add(TestEntry(id), permanent = false)
}
val l2 = new WorkList[TestEntry]
(5 to 9).foreach { id =>
l2.add(TestEntry(id), permanent = true)
}
workList.addAll(l2)
@tailrec
def checkEntries(id: Int, entry: WorkList.Entry[TestEntry]): Int = {
if (entry == null) id
else {
assert(entry.ref.get.id === id)
checkEntries(id + 1, entry.next)
}
}
assert(checkEntries(0, workList.head.next) === 10)
}
test("Clear list") {
workList.removeAll()
assert(workList.head.next === null)
assert(workList.tail === workList.head)
}
val workList2 = WorkList.empty[PartialFunction[Any, Unit]]
val fn1: PartialFunction[Any, Unit] = {
case s: String =>
val result1 = workList2.remove(fn1)
assert(result1 === true, "First remove must return true")
val result2 = workList2.remove(fn1)
assert(result2 === false, "Second remove must return false")
}
val fn2: PartialFunction[Any, Unit] = {
case s: String =>
workList2.add(fn1, permanent = true)
}
test("Reentrant insert") {
workList2.add(fn2, permanent = false)
assert(workList2.head.next != null)
assert(workList2.tail == workList2.head.next)
// Processing inserted fn1, reentrant adding fn2
workList2.process { fn =>
var processed = true
fn.applyOrElse("Foo", (_: Any) => processed = false)
processed
}
}
test("Reentrant delete") {
// Processing inserted fn2, should delete itself
workList2.process { fn =>
var processed = true
fn.applyOrElse("Foo", (_: Any) => processed = false)
processed
}
}
}

View file

@ -1,498 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern
import akka.actor.{ Actor, Props }
import akka.persistence.{ PersistentActor }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import akka.testkit.TestProbe
import akka.actor.ActorLogging
object ReceivePipelineSpec {
import ReceivePipeline._
class ReplierActor extends Actor with ReceivePipeline {
def receive: Actor.Receive = becomeAndReply
def becomeAndReply: Actor.Receive = {
case "become" => context.become(justReply)
case m => sender ! m
}
def justReply: Actor.Receive = {
case m => sender ! m
}
}
class IntReplierActor(max: Int) extends Actor with ReceivePipeline {
def receive: Actor.Receive = {
case m: Int if (m <= max) => sender ! m
}
}
class TotallerActor extends Actor with ReceivePipeline {
var total = 0
def receive: Actor.Receive = {
case m: Int => total += m
case "get" => sender ! total
}
}
case class IntList(l: List[Int]) {
override def toString: String = s"IntList(${l.mkString(", ")})"
}
trait ListBuilderInterceptor {
this: ReceivePipeline =>
pipelineOuter {
case n: Int => Inner(IntList((n until n + 3).toList))
}
}
trait AdderInterceptor {
this: ReceivePipeline =>
pipelineInner {
case n: Int => Inner(n + 10)
case IntList(l) => Inner(IntList(l.map(_ + 10)))
case "explicitly ignored" => HandledCompletely
}
}
trait ToStringInterceptor {
this: ReceivePipeline =>
pipelineInner {
case i: Int => Inner(i.toString)
case IntList(l) => Inner(l.toString)
case other: Iterable[_] => Inner(other.toString)
}
}
trait OddDoublerInterceptor {
this: ReceivePipeline =>
pipelineInner {
case i: Int if (i % 2 != 0) => Inner(i * 2)
}
}
trait EvenHalverInterceptor {
this: ReceivePipeline =>
pipelineInner {
case i: Int if (i % 2 == 0) => Inner(i / 2)
}
}
trait Timer {
this: ReceivePipeline =>
def notifyDuration(duration: Long): Unit
pipelineInner {
case msg: Any =>
val start = 1L // = currentTimeMillis
Inner(msg).andAfter {
val end = 100L // = currentTimeMillis
notifyDuration(end - start)
}
}
}
}
class ReceivePipelineSpec extends AkkaSpec with ImplicitSender {
import ReceivePipelineSpec._
"A ReceivePipeline" must {
"just invoke Actor's behavior when it's empty" in {
val replier = system.actorOf(Props[ReplierActor])
replier ! 3
expectMsg(3)
}
"invoke decorated Actor's behavior when has one interceptor" in {
val replier = system.actorOf(Props(new ReplierActor with AdderInterceptor))
replier ! 5
expectMsg(15)
}
"support any number of interceptors" in {
val replier = system.actorOf(
Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor))
replier ! 8
expectMsg("List(18, 19, 20)")
}
"delegate messages unhandled by interceptors to the inner behavior" in {
val replier = system.actorOf(
Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor))
replier ! 8L // unhandled by all interceptors but still replied
expectMsg(8L)
replier ! Set(8f) // unhandled by all but ToString Interceptor, so replied as String
expectMsg("Set(8.0)")
}
"let any interceptor to explicitly ignore some messages" in {
val replier = system.actorOf(
Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor))
replier ! "explicitly ignored"
replier ! 8L // unhandled by all interceptors but still replied
expectMsg(8L)
}
"support changing behavior without losing the interceptions" in {
val replier = system.actorOf(
Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor))
replier ! 8
expectMsg("List(18, 19, 20)")
replier ! "become"
replier ! 3
expectMsg("List(13, 14, 15)")
}
"support swapping inner and outer interceptors mixin order" in {
val outerInnerReplier = system.actorOf(Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor))
val innerOuterReplier = system.actorOf(Props(new ReplierActor with AdderInterceptor with ListBuilderInterceptor))
outerInnerReplier ! 4
expectMsg(IntList(List(14, 15, 16)))
innerOuterReplier ! 6
expectMsg(IntList(List(16, 17, 18)))
}
}
}
object PersistentReceivePipelineSpec {
class PersistentReplierActor extends PersistentActor with ReceivePipeline {
override def persistenceId: String = "p-1"
def becomeAndReply: Actor.Receive = {
case "become" => context.become(justReply)
case m => sender ! m
}
def justReply: Actor.Receive = {
case m => sender ! m
}
override def receiveCommand: Receive = becomeAndReply
override def receiveRecover: Receive = {
case _ => // ...
}
}
}
class PersistentReceivePipelineSpec(config: Config) extends AkkaSpec(config) with ImplicitSender {
import ReceivePipelineSpec._
import PersistentReceivePipelineSpec._
def this() {
this(ConfigFactory.parseString(s"""
|akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
|akka.persistence.journal.leveldb.dir = "target/journal-${getClass.getSimpleName}"
""".stripMargin))
}
"A PersistentActor with ReceivePipeline" must {
"support any number of interceptors" in {
val replier = system.actorOf(
Props(new PersistentReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor))
replier ! 8
expectMsg("List(18, 19, 20)")
}
"allow messages explicitly passed on by interceptors to be handled by the actor" in {
val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor))
// 6 -> 3 -> 6
replier ! 6
expectMsg(6)
}
"allow messages not handled by some interceptors to be handled by the actor" in {
val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor))
// 8 -> 4 ( -> not handled by OddDoublerInterceptor)
replier ! 8
expectMsg(4)
}
"allow messages explicitly passed on by interceptors but not handled by the actor to be treated as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
// 22 -> 11 -> 22 but > 10 so not handled in main receive: falls back to unhandled implementation...
replier ! 22
probe.expectMsg(22)
}
"allow messages not handled by some interceptors or by the actor to be treated as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
// 11 ( -> not handled by EvenHalverInterceptor) -> 22 but > 10 so not handled in main receive:
// original message falls back to unhandled implementation...
replier ! 11
probe.expectMsg(11)
}
"allow messages not handled by any interceptors or by the actor to be treated as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
replier ! "hi there!"
probe.expectMsg("hi there!")
}
"not treat messages handled by the actor as unhandled" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
replier ! 4
expectMsg(2)
probe.expectNoMsg(100.millis)
}
"continue to handle messages normally after unhandled messages" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor {
override def unhandled(message: Any) = probeRef ! message
}))
replier ! "hi there!"
replier ! 8
probe.expectMsg("hi there!")
expectMsg(4)
}
"call side-effecting receive code only once" in {
val totaller = system.actorOf(Props(new TotallerActor with EvenHalverInterceptor with OddDoublerInterceptor))
totaller ! 8
totaller ! 6
totaller ! "get"
expectMsg(10)
}
"not cache the result of the same message" in {
val totaller = system.actorOf(Props(new TotallerActor with EvenHalverInterceptor with OddDoublerInterceptor))
totaller ! 6
totaller ! 6
totaller ! "get"
expectMsg(12)
}
"run code in 'after' block" in {
val probe = new TestProbe(system)
val probeRef = probe.ref
val totaller = system.actorOf(Props(new TotallerActor with Timer {
def notifyDuration(d: Long) = probeRef ! d
}))
totaller ! 6
totaller ! "get"
expectMsg(6)
probe.expectMsg(99L)
}
}
}
// Just compiling code samples for documentation. Not intended to be tests.
object InActorSample extends App {
import ReceivePipeline._
import akka.actor.ActorSystem
val system = ActorSystem("pipeline")
val actor = system.actorOf(Props[PipelinedActor]())
//#in-actor
class PipelinedActor extends Actor with ReceivePipeline {
// Increment
pipelineInner { case i: Int => Inner(i + 1) }
// Double
pipelineInner { case i: Int => Inner(i * 2) }
def receive: Receive = { case any => println(any) }
}
actor ! 5 // prints 12 = (5 + 1) * 2
//#in-actor
val withOuterActor = system.actorOf(Props[PipelinedOuterActor]())
class PipelinedOuterActor extends Actor with ReceivePipeline {
//#in-actor-outer
// Increment
pipelineInner { case i: Int => Inner(i + 1) }
// Double
pipelineOuter { case i: Int => Inner(i * 2) }
// prints 11 = (5 * 2) + 1
//#in-actor-outer
def receive: Receive = { case any => println(any) }
}
withOuterActor ! 5
}
object InterceptorSamples {
import ReceivePipeline._
//#interceptor-sample1
val incrementInterceptor: Interceptor = {
case i: Int => Inner(i + 1)
}
//#interceptor-sample1
def logTimeTaken(time: Long) = ???
//#interceptor-sample2
val timerInterceptor: Interceptor = {
case e =>
val start = System.nanoTime
Inner(e).andAfter {
val end = System.nanoTime
logTimeTaken(end - start)
}
}
//#interceptor-sample2
}
object MixinSample extends App {
import ReceivePipeline._
import akka.actor.{ ActorSystem, Props }
val system = ActorSystem("pipeline")
//#mixin-model
val texts = Map(
"that.rug_EN" -> "That rug really tied the room together.",
"your.opinion_EN" -> "Yeah, well, you know, that's just, like, your opinion, man.",
"that.rug_ES" -> "Esa alfombra realmente completaba la sala.",
"your.opinion_ES" -> "Sí, bueno, ya sabes, eso es solo, como, tu opinion, amigo.")
case class I18nText(locale: String, key: String)
case class Message(author: Option[String], text: Any)
//#mixin-model
//#mixin-interceptors
trait I18nInterceptor {
this: ReceivePipeline =>
pipelineInner {
case m @ Message(_, I18nText(loc, key)) =>
Inner(m.copy(text = texts(s"${key}_$loc")))
}
}
trait AuditInterceptor {
this: ReceivePipeline =>
pipelineOuter {
case m @ Message(Some(author), text) =>
println(s"$author is about to say: $text")
Inner(m)
}
}
//#mixin-interceptors
val printerActor = system.actorOf(Props[PrinterActor]())
//#mixin-actor
class PrinterActor extends Actor with ReceivePipeline with I18nInterceptor with AuditInterceptor {
override def receive: Receive = {
case Message(author, text) =>
println(s"${author.getOrElse("Unknown")} says '$text'")
}
}
printerActor ! Message(Some("The Dude"), I18nText("EN", "that.rug"))
// The Dude is about to say: I18nText(EN,that.rug)
// The Dude says 'That rug really tied the room together.'
printerActor ! Message(Some("The Dude"), I18nText("EN", "your.opinion"))
// The Dude is about to say: I18nText(EN,your.opinion)
// The Dude says 'Yeah, well, you know, that's just, like, your opinion, man.'
//#mixin-actor
system.terminate()
}
object UnhandledSample extends App {
import ReceivePipeline._
def isGranted(userId: Long) = true
//#unhandled
case class PrivateMessage(userId: Option[Long], msg: Any)
trait PrivateInterceptor {
this: ReceivePipeline =>
pipelineInner {
case PrivateMessage(Some(userId), msg) =>
if (isGranted(userId))
Inner(msg)
else
HandledCompletely
}
}
//#unhandled
}
object AfterSamples {
import ReceivePipeline._
//#interceptor-after
trait TimerInterceptor extends ActorLogging {
this: ReceivePipeline =>
def logTimeTaken(time: Long) = log.debug(s"Time taken: $time ns")
pipelineOuter {
case e =>
val start = System.nanoTime
Inner(e).andAfter {
val end = System.nanoTime
logTimeTaken(end - start)
}
}
}
//#interceptor-after
}

View file

@ -1,92 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.pattern
import akka.testkit.AkkaSpec
import akka.actor._
import scala.concurrent.duration._
import akka.testkit.TestProbe
object ReliableProxyDocSpec {
//#demo
import akka.contrib.pattern.ReliableProxy
class ProxyParent(targetPath: ActorPath) extends Actor {
val proxy = context.actorOf(ReliableProxy.props(targetPath, 100.millis))
def receive = {
case "hello" => proxy ! "world!"
}
}
//#demo
//#demo-transition
class ProxyTransitionParent(targetPath: ActorPath) extends Actor {
val proxy = context.actorOf(ReliableProxy.props(targetPath, 100.millis))
proxy ! FSM.SubscribeTransitionCallBack(self)
var client: ActorRef = _
def receive = {
case "go" =>
proxy ! 42
client = sender()
case FSM.CurrentState(`proxy`, initial) =>
case FSM.Transition(`proxy`, from, to) =>
if (to == ReliableProxy.Idle)
client ! "done"
}
}
//#demo-transition
class WatchingProxyParent(targetPath: ActorPath) extends Actor {
val proxy = context.watch(
context.actorOf(ReliableProxy.props(targetPath, 100.millis, reconnectAfter = 500.millis, maxReconnects = 3)))
var client: Option[ActorRef] = None
def receive = {
case "hello" =>
proxy ! "world!"
client = Some(sender())
case Terminated(`proxy`) =>
client.foreach { _ ! "terminated" }
}
}
}
class ReliableProxyDocSpec extends AkkaSpec {
import ReliableProxyDocSpec._
"A ReliableProxy" must {
"show usage" in {
val probe = TestProbe()
val a = system.actorOf(Props(classOf[ProxyParent], probe.ref.path))
a.tell("hello", probe.ref)
probe.expectMsg("world!")
}
"show state transitions" in {
val target = TestProbe().ref
val probe = TestProbe()
val a = system.actorOf(Props(classOf[ProxyTransitionParent], target.path))
a.tell("go", probe.ref)
probe.expectMsg("done")
}
"show terminated after maxReconnects" in within(5.seconds) {
val target = system.deadLetters
val probe = TestProbe()
val a = system.actorOf(Props(classOf[WatchingProxyParent], target.path))
a.tell("hello", probe.ref)
probe.expectMsg("terminated")
}
}
}

View file

@ -1,42 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.throttle
import java.util.concurrent.TimeUnit._
import akka.actor.ActorSystem
import akka.contrib.throttle.Throttler._
import akka.testkit.{ TestActorRef, TestKit }
import org.scalatest.WordSpecLike
class TimerBasedThrottleTest extends TestKit(ActorSystem("TimerBasedThrottler")) with WordSpecLike {
"A throttler" must {
"normalize all rates to the highest precision (nanoseconds)" in {
val throttler = TestActorRef(new TimerBasedThrottler(1.msgsPer(1, SECONDS)))
val throttler2 = TestActorRef(new TimerBasedThrottler(5.msgsPer(1, SECONDS)))
val throttler3 = TestActorRef(new TimerBasedThrottler(10.msgsPer(10, MILLISECONDS)))
val throttler4 = TestActorRef(new TimerBasedThrottler(1.msgsPer(1, MINUTES)))
assert(throttler.underlyingActor.rate.duration.toNanos == 1e9)
assert(throttler.underlyingActor.rate.numberOfCalls == 1)
assert(throttler2.underlyingActor.rate.duration.toNanos == 1e9 / 5)
assert(throttler2.underlyingActor.rate.numberOfCalls == 1)
assert(throttler3.underlyingActor.rate.duration.toNanos == 1e6 * 10 / 10) // Convert ms to nanos
assert(throttler3.underlyingActor.rate.numberOfCalls == 1)
assert(throttler4.underlyingActor.rate.duration.toNanos == 1e9 * 60)
assert(throttler4.underlyingActor.rate.numberOfCalls == 1)
}
"handle zero number of calls gracefully" in {
val throttler = TestActorRef(new TimerBasedThrottler(0.msgsPer(1, SECONDS)))
assert(throttler.underlyingActor.rate.duration.toSeconds == 1)
assert(throttler.underlyingActor.rate.numberOfCalls == 0)
}
}
}

View file

@ -1,127 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.contrib.throttle
import language.postfixOps
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.TestKit
import akka.contrib.throttle.Throttler._
import org.scalatest.WordSpecLike
import org.scalatest.Matchers
import org.scalatest.BeforeAndAfterAll
import akka.testkit._
object TimerBasedThrottlerSpec {
def println(a: Any) = ()
//#demo-code
// A simple actor that prints whatever it receives
class PrintActor extends Actor {
def receive = {
case x => println(x)
}
}
//#demo-code
}
class TimerBasedThrottlerSpec
extends TestKit(ActorSystem("TimerBasedThrottlerSpec"))
with ImplicitSender
with WordSpecLike
with Matchers
with BeforeAndAfterAll {
import TimerBasedThrottlerSpec._
override def afterAll: Unit = {
shutdown()
}
"A throttler" must {
def println(a: Any) = ()
"pass the ScalaDoc class documentation example program" in {
//#demo-code
val printer = system.actorOf(Props[PrintActor])
// The throttler for this example, setting the rate
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(1.second)))
// Set the target
throttler ! SetTarget(Some(printer))
// These three messages will be sent to the target immediately
throttler ! "1"
throttler ! "2"
throttler ! "3"
// These two will wait until a second has passed
throttler ! "4"
throttler ! "5"
//#demo-code
}
"keep messages until a target is set" in {
val echo = system.actorOf(TestActors.echoActorProps)
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(1.second.dilated)))
(1 to 6).foreach { throttler ! _ }
expectNoMsg(1 second)
throttler ! SetTarget(Some(echo))
within(2.5 seconds) {
(1 to 6).foreach { expectMsg(_) }
}
}
"send messages after a `SetTarget(None)` pause" in {
val echo = system.actorOf(TestActors.echoActorProps)
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(5.second.dilated)))
throttler ! SetTarget(Some(echo))
(1 to 3).foreach { throttler ! _ }
throttler ! SetTarget(None)
within(1.7 second) {
expectMsg(1)
expectNoMsg()
}
expectNoMsg(1 second)
throttler ! SetTarget(Some(echo))
(4 to 7).foreach { throttler ! _ }
within(10.5 seconds) {
(2 to 7).foreach { expectMsg(_) }
}
}
"keep messages when the target is set to None" in {
val echo = system.actorOf(TestActors.echoActorProps)
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(5.second.dilated)))
throttler ! SetTarget(Some(echo))
(1 to 7).foreach { throttler ! _ }
throttler ! SetTarget(None)
within(1.7 second) {
expectMsg(1)
expectNoMsg()
}
expectNoMsg(1 second)
throttler ! SetTarget(Some(echo))
within(10.5 seconds) {
(2 to 7).foreach { expectMsg(_) }
}
}
"respect the rate (3 msg/s)" in within(1.5 seconds, 2.5 seconds) {
val echo = system.actorOf(TestActors.echoActorProps)
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(1.second.dilated)))
throttler ! SetTarget(Some(echo))
(1 to 7).foreach { throttler ! _ }
(1 to 7).foreach { expectMsg(_) }
}
"respect the rate (4 msg/s)" in within(1.5 seconds, 2.5 seconds) {
val echo = system.actorOf(TestActors.echoActorProps)
val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 4.msgsPer(1.second.dilated)))
throttler ! SetTarget(Some(echo))
(1 to 9).foreach { throttler ! _ }
(1 to 9).foreach { expectMsg(_) }
}
}
}

View file

@ -15,6 +15,12 @@ After being deprecated in 2.5.0, the akka-agent module has been removed in 2.6.
If there is interest it may be moved to a separate, community-maintained
repository.
## akka-contrib removed
The akka-contrib module was deprecated in 2.5 and has been removed in 2.6.
To migrate, take the components you are using from [Akka 2.5](https://github.com/akka/akka/tree/release-2.5/akka-contrib)
and include them in your own project or library under your own package name.
## Scala 2.11 no longer supported
If you are still using Scala 2.11 then you must upgrade to 2.12 or 2.13
@ -102,4 +108,4 @@ A full cluster restart is required to change to Artery.
### Passivate idle entity
The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default.
Sharding will passivate entities when they have not received any messages after this duration.
Set
Set

View file

@ -48,7 +48,6 @@ lazy val aggregatedProjects: Seq[ProjectReference] = List[ProjectReference](
clusterShardingTyped,
clusterTools,
clusterTyped,
contrib,
coordination,
discovery,
distributedData,
@ -162,26 +161,6 @@ lazy val clusterTools = akkaModule("akka-cluster-tools")
.configs(MultiJvm)
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)
lazy val contrib = akkaModule("akka-contrib")
.dependsOn(remote, remoteTests % "test->test", cluster, clusterTools, persistence % "compile->compile")
.settings(Dependencies.contrib)
.settings(AutomaticModuleName.settings("akka.contrib"))
.settings(OSGi.contrib)
.settings(description :=
"""|
|This subproject provides a home to modules contributed by external
|developers which may or may not move into the officially supported code
|base over time. A module in this subproject doesn't have to obey the rule
|of staying binary compatible between minor releases. Breaking API changes
|may be introduced in minor releases without notice as we refine and
|simplify based on your feedback. A module may be dropped in any release
|without prior deprecation. The Lightbend subscription does not cover
|support for these modules.
|""".stripMargin)
.configs(MultiJvm)
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)
.disablePlugins(MimaPlugin)
lazy val distributedData = akkaModule("akka-distributed-data")
.dependsOn(cluster % "compile->compile;test->test;multi-jvm->multi-jvm")
.settings(Dependencies.distributedData)

View file

@ -210,8 +210,6 @@ object Dependencies {
val docs = l ++= Seq(Test.scalatest.value, Test.junit, Docs.sprayJson, Docs.gson, Provided.levelDB)
val contrib = l ++= Seq(Test.commonsIo)
val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Compile.jctools)
// akka stream

View file

@ -56,8 +56,6 @@ object OSGi {
val distributedData = exports(Seq("akka.cluster.ddata.*"))
val contrib = exports(Seq("akka.contrib.*"))
val osgi = exports(Seq("akka.osgi.*"))
val protobuf = exports(Seq("akka.protobuf.*"))