From 282b38e832b0c5b6e51aa6a18151701951b03f46 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Thu, 2 May 2019 12:09:11 +0200 Subject: [PATCH] Remove akka-contrib (#26769) * Remove akka-contrib #26183 This module has been deprecated since 2.5 and is removed in 2.6 --- akka-contrib/README.md | 52 -- .../src/main/resources/reference.conf | 6 - .../circuitbreaker/CircuitBreakerProxy.scala | 316 ----------- .../circuitbreaker/askExtensions.scala | 124 ----- .../scala/akka/contrib/jul/JavaLogger.scala | 125 ----- .../akka/contrib/mailbox/PeekMailbox.scala | 113 ---- .../akka/contrib/pattern/Aggregator.scala | 202 ------- .../contrib/pattern/ReceivePipeline.scala | 125 ----- .../akka/contrib/pattern/ReliableProxy.scala | 407 -------------- .../throttle/TimerBasedThrottler.scala | 320 ----------- .../contrib/pattern/ReliableProxySpec.scala | 388 -------------- .../contrib/pattern/ReliableProxyTest.java | 117 ---- .../throttle/TimerBasedThrottlerTest.java | 78 --- .../src/test/resources/reference.conf | 8 - .../sample/CircuitBreaker.scala | 202 ------- .../akka/contrib/jul/JavaLoggerSpec.scala | 78 --- .../contrib/mailbox/PeekMailboxSpec.scala | 133 ----- .../akka/contrib/pattern/AggregatorSpec.scala | 396 -------------- .../contrib/pattern/ReceivePipelineSpec.scala | 498 ------------------ .../pattern/ReliableProxyDocSpec.scala | 92 ---- .../throttle/TimerBasedThrottleTest.scala | 42 -- .../throttle/TimerBasedThrottlerSpec.scala | 127 ----- .../project/migration-guide-2.5.x-2.6.x.md | 8 +- build.sbt | 21 - project/Dependencies.scala | 2 - project/OSGi.scala | 2 - 26 files changed, 7 insertions(+), 3975 deletions(-) delete mode 100644 akka-contrib/README.md delete mode 100644 akka-contrib/src/main/resources/reference.conf delete mode 100644 akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerProxy.scala delete mode 100644 akka-contrib/src/main/scala/akka/contrib/circuitbreaker/askExtensions.scala delete mode 100644 akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala delete mode 100644 akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala delete mode 100644 akka-contrib/src/main/scala/akka/contrib/pattern/Aggregator.scala delete mode 100644 akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala delete mode 100644 akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala delete mode 100644 akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala delete mode 100644 akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala delete mode 100644 akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java delete mode 100644 akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java delete mode 100644 akka-contrib/src/test/resources/reference.conf delete mode 100644 akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/pattern/AggregatorSpec.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottleTest.scala delete mode 100644 akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala diff --git a/akka-contrib/README.md b/akka-contrib/README.md deleted file mode 100644 index 8261dc6b25..0000000000 --- a/akka-contrib/README.md +++ /dev/null @@ -1,52 +0,0 @@ -# External Contributions - -This subproject provides a home to modules contributed by external developers -which may or may not move into the officially supported code base over time. -The conditions under which this transition can occur include: - -* there must be enough interest in the module to warrant inclusion in the standard distribution, -* the module must be actively maintained and -* code quality must be good enough to allow efficient maintenance by the Akka core development team - -If a contributions turns out to not “take off” it may be removed again at a -later time. - -## Caveat Emptor - -A module in this subproject doesn't have to obey the rule of staying binary -compatible between micro releases. Breaking API changes may be introduced in -minor releases without notice as we refine and simplify based on your feedback. -A module may be dropped in any release without prior deprecation. The Lightbend -subscription does not cover support for these modules. - -## Suggested Format of Contributions - -Each contribution should be a self-contained unit, consisting of one source -file or one exclusively used package, without dependencies to other modules in -this subproject; it may depend on everything else in the Akka distribution, -though. This ensures that contributions may be moved into the standard -distribution individually. The module shall be within a subpackage of -`akka.contrib`. - -Each module must be accompanied by a test suite which verifies that the -provided features work, possibly complemented by integration and unit tests. -The tests should follow the [Developer -Guidelines](http://doc.akka.io/docs/akka/current/dev/developer-guidelines.html#testing) -and go into the `src/test/scala` or `src/test/java` directories (with package -name matching the module which is being tested). As an example, if the module -were called `akka.contrib.pattern.ReliableProxy`, then the test suite should be -called `akka.contrib.pattern.ReliableProxySpec`. - -Each module must also have proper documentation in [reStructured Text -format](http://sphinx.pocoo.org/rest.html). The documentation should be a -single `.rst` file in the `akka-contrib/docs` directory, including a -link from `index.rst`. - -## Suggested Way of Using these Contributions - -Since the Akka team does not restrict updates to this subproject even during -otherwise binary compatible releases, and modules may be removed without -deprecation, it is suggested to copy the source files into your own code base, -changing the package name. This way you can choose when to update or which -fixes to include (to keep binary compatibility if needed) and later releases of -Akka do not potentially break your application. diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf deleted file mode 100644 index 0f53f264ac..0000000000 --- a/akka-contrib/src/main/resources/reference.conf +++ /dev/null @@ -1,6 +0,0 @@ -###################################### -# Akka Contrib Reference Config File # -###################################### - -# This is the reference config file that contains all the default settings. -# Make your edits/overrides in your application.conf. diff --git a/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerProxy.scala b/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerProxy.scala deleted file mode 100644 index 694bb463a8..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/CircuitBreakerProxy.scala +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.contrib.circuitbreaker - -import akka.actor._ -import akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure -import akka.event.LoggingAdapter -import akka.pattern._ -import akka.util.Timeout - -import scala.util.{ Failure, Success } - -/** - * This is an Actor which implements the circuit breaker pattern, - * you may also be interested in the raw circuit breaker [[akka.pattern.CircuitBreaker]] - */ -@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0") -object CircuitBreakerProxy { - - /** - * Creates an circuit breaker actor proxying a target actor intended for request-reply interactions. - * It is possible to send messages through this proxy without expecting a response wrapping them into a - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.TellOnly]] or a - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.Passthrough]] the difference between the two being that - * a message wrapped into a [[akka.contrib.circuitbreaker.CircuitBreakerProxy.Passthrough]] is going to be - * forwarded even when the circuit is open (e.g. if you need to terminate the target and proxy actors sending - * a [[akka.actor.PoisonPill]] message) - * - * The circuit breaker implements the same state machine documented in [[akka.pattern.CircuitBreaker]] - * - * @param target the actor to proxy - * @param maxFailures maximum number of failures before opening the circuit - * @param callTimeout timeout before considering the ongoing call a failure - * @param resetTimeout time after which the channel will be closed after entering the open state - * @param circuitEventListener an actor that will receive a series of messages of type - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitBreakerEvent]] (optional) - * @param failureDetector function to detect if a message received from the target actor as a - * response from the request represents a failure - * @param failureMap function to map a failure into a response message. The failing response message is wrapped - * into a [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] object - */ - def props( - target: ActorRef, - maxFailures: Int, - callTimeout: Timeout, - resetTimeout: Timeout, - circuitEventListener: Option[ActorRef], - failureDetector: Any => Boolean, - failureMap: CircuitOpenFailure => Any) = - Props( - new CircuitBreakerProxy( - target, - maxFailures, - callTimeout, - resetTimeout, - circuitEventListener, - failureDetector, - failureMap)) - - sealed trait CircuitBreakerCommand - - final case class TellOnly(msg: Any) extends CircuitBreakerCommand - final case class Passthrough(msg: Any) extends CircuitBreakerCommand - - sealed trait CircuitBreakerResponse - final case class CircuitOpenFailure(failedMsg: Any) - - sealed trait CircuitBreakerEvent - final case class CircuitOpen(circuit: ActorRef) extends CircuitBreakerCommand - final case class CircuitClosed(circuit: ActorRef) extends CircuitBreakerCommand - final case class CircuitHalfOpen(circuit: ActorRef) extends CircuitBreakerCommand - - sealed trait CircuitBreakerState - case object Open extends CircuitBreakerState - case object Closed extends CircuitBreakerState - case object HalfOpen extends CircuitBreakerState - - final case class CircuitBreakerStateData(failureCount: Int = 0, firstHalfOpenMessageSent: Boolean = false) - - final case class CircuitBreakerPropsBuilder( - maxFailures: Int, - callTimeout: Timeout, - resetTimeout: Timeout, - circuitEventListener: Option[ActorRef] = None, - failureDetector: Any => Boolean = { _ => - false - }, - openCircuitFailureConverter: CircuitOpenFailure => Any = identity) { - - def withMaxFailures(value: Int) = copy(maxFailures = value) - def withCallTimeout(value: Timeout) = copy(callTimeout = value) - def withResetTimeout(value: Timeout) = copy(resetTimeout = value) - def withCircuitEventListener(value: Option[ActorRef]) = copy(circuitEventListener = value) - def withFailureDetector(value: Any => Boolean) = copy(failureDetector = value) - def withOpenCircuitFailureConverter(value: CircuitOpenFailure => Any) = copy(openCircuitFailureConverter = value) - - /** - * Creates the props for a [[akka.contrib.circuitbreaker.CircuitBreakerProxy]] proxying the given target - * - * @param target the target actor ref - */ - def props(target: ActorRef) = - CircuitBreakerProxy.props( - target, - maxFailures, - callTimeout, - resetTimeout, - circuitEventListener, - failureDetector, - openCircuitFailureConverter) - - } - - private[CircuitBreakerProxy] object CircuitBreakerInternalEvents { - sealed trait CircuitBreakerInternalEvent - case object CallFailed extends CircuitBreakerInternalEvent - case object CallSucceeded extends CircuitBreakerInternalEvent - } -} - -import akka.contrib.circuitbreaker.CircuitBreakerProxy._ - -@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0") -final class CircuitBreakerProxy( - target: ActorRef, - maxFailures: Int, - callTimeout: Timeout, - resetTimeout: Timeout, - circuitEventListener: Option[ActorRef], - failureDetector: Any => Boolean, - failureMap: CircuitOpenFailure => Any) - extends Actor - with ActorLogging - with FSM[CircuitBreakerState, CircuitBreakerStateData] { - - import CircuitBreakerInternalEvents._ - import FSM.`->` - - context.watch(target) - - startWith(Closed, CircuitBreakerStateData(failureCount = 0)) - - def callSucceededHandling: StateFunction = { - case Event(CallSucceeded, state) => - log.debug("Received call succeeded notification in state {} resetting counter", state) - goto(Closed).using(CircuitBreakerStateData(failureCount = 0, firstHalfOpenMessageSent = false)) - } - - def passthroughHandling: StateFunction = { - case Event(Passthrough(message), state) => - log.debug( - "Received a passthrough message in state {}, forwarding the message to the target actor without altering current state", - state) - target ! message - stay - } - - def targetTerminationHandling: StateFunction = { - case Event(Terminated(`target`), state) => - log.debug("Target actor {} terminated while in state {}, terminating this proxy too", target, state) - stop - } - - def commonStateHandling: StateFunction = { - callSucceededHandling.orElse(passthroughHandling).orElse(targetTerminationHandling) - } - - when(Closed) { - commonStateHandling.orElse { - case Event(TellOnly(message), _) => - log.debug("Closed: Sending message {} without expecting any response", message) - target ! message - stay - - case Event(CallFailed, state) => - log.debug("Received call failed notification in state {} incrementing counter", state) - val newState = state.copy(failureCount = state.failureCount + 1) - if (newState.failureCount < maxFailures) { - stay.using(newState) - } else { - goto(Open).using(newState) - } - - case Event(message, state) => - log.debug("CLOSED: Sending message {} expecting a response within timeout {}", message, callTimeout) - val currentSender = sender() - forwardRequest(message, sender, state, log) - stay - - } - } - - when(Open, stateTimeout = resetTimeout.duration) { - commonStateHandling.orElse { - case Event(StateTimeout, state) => - log.debug("Timeout expired for state OPEN, going to half open") - goto(HalfOpen).using(state.copy(firstHalfOpenMessageSent = false)) - - case Event(CallFailed, state) => - log.debug( - "Open: Call received a further call failed notification, probably from a previous timed out event, ignoring") - stay - - case Event(openNotification @ CircuitOpenFailure(_), _) => - log.warning( - "Unexpected circuit open notification {} sent to myself. Please report this as a bug.", - openNotification) - stay - - case Event(message, state) => - val failureNotification = failureMap(CircuitOpenFailure(message)) - log.debug( - "OPEN: Failing request for message {}, sending failure notification {} to sender {}", - message, - failureNotification, - sender) - sender ! failureNotification - stay - - } - } - - when(HalfOpen) { - commonStateHandling.orElse { - case Event(TellOnly(message), _) => - log.debug("HalfOpen: Dropping TellOnly request for message {}", message) - stay - - case Event(CallFailed, CircuitBreakerStateData(_, true)) => - log.debug("HalfOpen: First forwarded call failed returning to OPEN state") - goto(Open) - - case Event(CallFailed, CircuitBreakerStateData(_, false)) => - log.debug( - "HalfOpen: Call received a further call failed notification, probably from a previous timed out event, ignoring") - stay - - case Event(message, state @ CircuitBreakerStateData(_, false)) => - log.debug("HalfOpen: First message {} received, forwarding it to target {}", message, target) - forwardRequest(message, sender, state, log) - stay.using(state.copy(firstHalfOpenMessageSent = true)) - - case Event(message, CircuitBreakerStateData(_, true)) => - val failureNotification = failureMap(CircuitOpenFailure(message)) - log.debug( - "HALF-OPEN: Failing request for message {}, sending failure notification {} to sender {}", - message, - failureNotification, - sender) - sender ! failureNotification - stay - } - } - - def forwardRequest(message: Any, currentSender: ActorRef, state: CircuitBreakerStateData, log: LoggingAdapter) = { - import context.dispatcher - - target.ask(message)(callTimeout).onComplete { - case Success(response) => - log.debug( - "Request '{}' has been replied to with response {}, forwarding to original sender {}", - message, - currentSender) - - currentSender ! response - - val isFailure = failureDetector(response) - - if (isFailure) { - log.debug( - "Response '{}' is considered as failure sending self-message to ask incrementing failure count (origin state was {})", - response, - state) - - self ! CallFailed - } else { - - log.debug( - "Request '{}' succeeded with response {}, returning response to sender {} and sending message to ask to reset failure count (origin state was {})", - message, - response, - currentSender, - state) - - self ! CallSucceeded - } - - case Failure(reason) => - log.debug( - "Request '{}' to target {} failed with exception {}, sending self-message to ask incrementing failure count (origin state was {})", - message, - target, - reason, - state) - - self ! CallFailed - } - } - - onTransition { - case from -> Closed => - log.debug("Moving from state {} to state CLOSED", from) - circuitEventListener.foreach { _ ! CircuitClosed(self) } - - case from -> HalfOpen => - log.debug("Moving from state {} to state HALF OPEN", from) - circuitEventListener.foreach { _ ! CircuitHalfOpen(self) } - - case from -> Open => - log.debug("Moving from state {} to state OPEN", from) - circuitEventListener.foreach { _ ! CircuitOpen(self) } - } - -} diff --git a/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/askExtensions.scala b/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/askExtensions.scala deleted file mode 100644 index 7a6795cc87..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/circuitbreaker/askExtensions.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.circuitbreaker - -import akka.actor.{ Actor, ActorRef, ActorSelection } -import akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure -import akka.util.Timeout -import scala.language.implicitConversions - -import scala.concurrent.{ ExecutionContext, Future } - -@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0") -sealed class OpenCircuitException(message: String) extends RuntimeException(message) -private[circuitbreaker] final object OpenCircuitException - extends OpenCircuitException("Unable to complete operation since the Circuit Breaker Actor Proxy is in Open State") - -/** - * Convenience implicit conversions to provide circuit-breaker aware management of the ask pattern, - * either directly replacing the `ask/?` with `askWithCircuitBreaker` or with an extension method to the - * `Future` result of an `ask` pattern to fail in case of - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] response - */ -@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0") -object Implicits { - - /** - * Import this implicit to enable the methods `failForOpenCircuit` and `failForOpenCircuitWith` - * to [[scala.concurrent.Future]] converting - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] into a failure caused either by an - * [[akka.contrib.circuitbreaker.OpenCircuitException]] or by an exception built with the given - * exception builder - */ - implicit def futureExtensions(future: Future[Any]) = new CircuitBreakerAwareFuture(future) - - /** - * Import this implicit method to get an extended versions of the `ask` pattern for - * [[akka.actor.ActorRef]] and [[akka.actor.ActorSelection]] converting - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] into a failure caused by an - * [[akka.contrib.circuitbreaker.OpenCircuitException]] - */ - implicit def askWithCircuitBreaker(actorRef: ActorRef) = new AskeableWithCircuitBreakerActor(actorRef) - - /** - * Wraps the `ask` method in [[akka.pattern.AskSupport]] method to convert - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] responses into a failure response caused - * by an [[akka.contrib.circuitbreaker.OpenCircuitException]] - */ - @throws[akka.contrib.circuitbreaker.OpenCircuitException]( - "if the call failed because the circuit breaker proxy state was OPEN") - def askWithCircuitBreaker(circuitBreakerProxy: ActorRef, message: Any)( - implicit executionContext: ExecutionContext, - timeout: Timeout): Future[Any] = - circuitBreakerProxy.internalAskWithCircuitBreaker(message, timeout, ActorRef.noSender) - - /** - * Wraps the `ask` method in [[akka.pattern.AskSupport]] method to convert failures connected to the circuit - * breaker being in open state - */ - @throws[akka.contrib.circuitbreaker.OpenCircuitException]( - "if the call failed because the circuit breaker proxy state was OPEN") - def askWithCircuitBreaker(circuitBreakerProxy: ActorRef, message: Any, sender: ActorRef)( - implicit executionContext: ExecutionContext, - timeout: Timeout): Future[Any] = - circuitBreakerProxy.internalAskWithCircuitBreaker(message, timeout, sender) - -} - -/** - * Extends [[scala.concurrent.Future]] with the method `failForOpenCircuitWith` to handle - * [[akka.contrib.circuitbreaker.CircuitBreakerProxy.CircuitOpenFailure]] failure responses throwing - * an exception built with the given exception builder - */ -@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0") -final class CircuitBreakerAwareFuture(val future: Future[Any]) extends AnyVal { - @throws[OpenCircuitException] - def failForOpenCircuit(implicit executionContext: ExecutionContext): Future[Any] = - failForOpenCircuitWith(OpenCircuitException) - - def failForOpenCircuitWith(throwing: => Throwable)(implicit executionContext: ExecutionContext): Future[Any] = { - future.flatMap { - _ match { - case CircuitOpenFailure(_) => Future.failed(throwing) - case result => Future.successful(result) - } - } - } -} - -@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0") -final class AskeableWithCircuitBreakerActor(val actorRef: ActorRef) extends AnyVal { - def askWithCircuitBreaker(message: Any)( - implicit executionContext: ExecutionContext, - timeout: Timeout, - sender: ActorRef = Actor.noSender): Future[Any] = - internalAskWithCircuitBreaker(message, timeout, sender) - - @throws[OpenCircuitException] - private[circuitbreaker] def internalAskWithCircuitBreaker(message: Any, timeout: Timeout, sender: ActorRef)( - implicit executionContext: ExecutionContext) = { - import akka.pattern.ask - import Implicits.futureExtensions - - ask(actorRef, message, sender)(timeout).failForOpenCircuit - } -} - -@deprecated("Use akka.pattern.CircuitBreaker + ask instead", "2.5.0") -final class AskeableWithCircuitBreakerActorSelection(val actorSelection: ActorSelection) extends AnyVal { - def askWithCircuitBreaker(message: Any)( - implicit executionContext: ExecutionContext, - timeout: Timeout, - sender: ActorRef = Actor.noSender): Future[Any] = - internalAskWithCircuitBreaker(message, timeout, sender) - - private[circuitbreaker] def internalAskWithCircuitBreaker(message: Any, timeout: Timeout, sender: ActorRef)( - implicit executionContext: ExecutionContext) = { - import akka.pattern.ask - import Implicits.futureExtensions - - ask(actorSelection, message, sender)(timeout).failForOpenCircuit - } -} diff --git a/akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala b/akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala deleted file mode 100644 index 76378a854e..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/jul/JavaLogger.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.jul - -import akka.event.Logging._ -import akka.actor._ -import akka.event.LoggingAdapter -import java.util.logging -import scala.concurrent.{ ExecutionContext, Future } -import akka.dispatch.RequiresMessageQueue -import akka.event.LoggerMessageQueueSemantics - -/** - * Makes the Akka `Logging` API available as the `log` - * field, using `java.util.logging` as the backend. - * - * This trait does not require an `ActorSystem` and is - * encouraged to be used as a general purpose Scala - * logging API. - * - * For `Actor`s, use `ActorLogging` instead. - */ -@deprecated("Feel free to copy", "2.5.0") -trait JavaLogging { - - @transient - protected lazy val log = new JavaLoggingAdapter { - def logger = logging.Logger.getLogger(JavaLogging.this.getClass.getName) - } -} - -/** - * `java.util.logging` logger. - */ -@deprecated("Use akka.event.jul.JavaLogger in akka-actor instead", "2.5.0") -class JavaLogger extends Actor with RequiresMessageQueue[LoggerMessageQueueSemantics] { - - def receive = { - case event @ Error(cause, _, _, _) => log(logging.Level.SEVERE, cause, event) - case event: Warning => log(logging.Level.WARNING, null, event) - case event: Info => log(logging.Level.INFO, null, event) - case event: Debug => log(logging.Level.CONFIG, null, event) - case InitializeLogger(_) => sender() ! LoggerInitialized - } - - @inline - def log(level: logging.Level, cause: Throwable, event: LogEvent): Unit = { - val logger = logging.Logger.getLogger(event.logSource) - val record = new logging.LogRecord(level, String.valueOf(event.message)) - record.setLoggerName(logger.getName) - record.setThrown(cause) - record.setThreadID(event.thread.getId.toInt) - record.setSourceClassName(event.logClass.getName) - record.setSourceMethodName(null) // lost forever - logger.log(record) - } -} - -@deprecated("Feel free to copy", "2.5.0") -trait JavaLoggingAdapter extends LoggingAdapter { - - def logger: logging.Logger - - /** Override-able option for asynchronous logging */ - def loggingExecutionContext: Option[ExecutionContext] = None - - def isErrorEnabled = logger.isLoggable(logging.Level.SEVERE) - - def isWarningEnabled = logger.isLoggable(logging.Level.WARNING) - - def isInfoEnabled = logger.isLoggable(logging.Level.INFO) - - def isDebugEnabled = logger.isLoggable(logging.Level.CONFIG) - - protected def notifyError(message: String): Unit = - log(logging.Level.SEVERE, null, message) - - protected def notifyError(cause: Throwable, message: String): Unit = - log(logging.Level.SEVERE, cause, message) - - protected def notifyWarning(message: String): Unit = - log(logging.Level.WARNING, null, message) - - protected def notifyInfo(message: String): Unit = - log(logging.Level.INFO, null, message) - - protected def notifyDebug(message: String): Unit = - log(logging.Level.CONFIG, null, message) - - @inline - def log(level: logging.Level, cause: Throwable, message: String): Unit = { - val record = new logging.LogRecord(level, message) - record.setLoggerName(logger.getName) - record.setThrown(cause) - updateSource(record) - - if (loggingExecutionContext.isDefined) { - implicit val context = loggingExecutionContext.get - Future(logger.log(record)).failed.foreach { _.printStackTrace() } - } else - logger.log(record) - } - - // it is unfortunate that this workaround is needed - private def updateSource(record: logging.LogRecord): Unit = { - val stack = Thread.currentThread.getStackTrace - val source = stack.find { frame => - val cname = frame.getClassName - !cname.startsWith("akka.contrib.jul.") && - !cname.startsWith("akka.event.LoggingAdapter") && - !cname.startsWith("java.lang.reflect.") && - !cname.startsWith("sun.reflect.") - } - if (source.isDefined) { - record.setSourceClassName(source.get.getClassName) - record.setSourceMethodName(source.get.getMethodName) - } else { - record.setSourceClassName(null) - record.setSourceMethodName(null) - } - } - -} diff --git a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala b/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala deleted file mode 100644 index 8a228d3d91..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.mailbox - -import java.util.concurrent.{ ConcurrentHashMap, ConcurrentLinkedQueue } - -import com.typesafe.config.Config - -import akka.actor.{ - ActorContext, - ActorRef, - ActorSystem, - ExtendedActorSystem, - Extension, - ExtensionId, - ExtensionIdProvider -} -import akka.dispatch.{ Envelope, MailboxType, MessageQueue, UnboundedQueueBasedMessageQueue } - -@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0") -object PeekMailboxExtension extends ExtensionId[PeekMailboxExtension] with ExtensionIdProvider { - def lookup = this - def createExtension(s: ExtendedActorSystem) = new PeekMailboxExtension(s) - - def ack()(implicit context: ActorContext): Unit = PeekMailboxExtension(context.system).ack() -} - -@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0") -class PeekMailboxExtension(val system: ExtendedActorSystem) extends Extension { - private val mailboxes = new ConcurrentHashMap[ActorRef, PeekMailbox] - - def register(actorRef: ActorRef, mailbox: PeekMailbox): Unit = - mailboxes.put(actorRef, mailbox) - - def unregister(actorRef: ActorRef): Unit = mailboxes.remove(actorRef) - - def ack()(implicit context: ActorContext): Unit = - mailboxes.get(context.self) match { - case null => throw new IllegalArgumentException("Mailbox not registered for: " + context.self) - case mailbox => mailbox.ack() - } -} - -/** - * configure the mailbox via dispatcher configuration: - * {{{ - * peek-dispatcher { - * mailbox-type = "example.PeekMailboxType" - * } - * }}} - */ -@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0") -class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType { - override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match { - case (Some(o), Some(s)) => - val retries = config.getInt("max-retries") - if (retries < 1) throw new akka.ConfigurationException("max-retries must be at least 1") - val mailbox = new PeekMailbox(o, s, retries) - PeekMailboxExtension(s).register(o, mailbox) - mailbox - case _ => throw new Exception("no mailbox owner or system given") - } -} - -@deprecated("Use an explicit supervisor or proxy actor instead", "2.5.0") -class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int) extends UnboundedQueueBasedMessageQueue { - final val queue = new ConcurrentLinkedQueue[Envelope]() - - /* - * Since the queue itself is used to determine when to schedule the actor - * (see Mailbox.hasMessages), we cannot poll() on the first try and then - * continue handing back out that same message until ACKed, peek() must be - * used. The retry limit logic is then formulated in terms of the `tries` - * field, which holds - * 0 if clean slate (i.e. last dequeue was ack()ed) - * 1..maxRetries if not yet ack()ed - * Marker if last try was done (at which point we had to poll()) - * -1 during cleanUp (in order to disable the ack() requirement) - */ - // the mutable state is only ever accessed by the actor (i.e. dequeue() side) - var tries = 0 - val Marker = maxRetries + 1 - - // this logic does not work if maxRetries==0, but then you could also use a normal mailbox - override def dequeue(): Envelope = tries match { - case -1 => - queue.poll() - case 0 | Marker => - val e = queue.peek() - tries = if (e eq null) 0 else 1 - e - case `maxRetries` => - tries = Marker - queue.poll() - case n => - tries = n + 1 - queue.peek() - } - - def ack(): Unit = { - // do not dequeue for real if double-ack() or ack() on last try - if (tries != 0 && tries != Marker) queue.poll() - tries = 0 - } - - override def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = { - tries = -1 // put the queue into auto-ack mode - super.cleanUp(owner, deadLetters) - PeekMailboxExtension(system).unregister(owner) - } -} diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/Aggregator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/Aggregator.scala deleted file mode 100644 index a4dd08b5bb..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/Aggregator.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.pattern - -import akka.actor.Actor -import scala.annotation.tailrec - -/** - * The aggregator is to be mixed into an actor for the aggregator behavior. - */ -@deprecated("Feel free to copy", "2.5.0") -trait Aggregator { - this: Actor => - - private var processing = false - private val expectList = WorkList.empty[Actor.Receive] - private val addBuffer = WorkList.empty[Actor.Receive] - - /** - * Adds the partial function to the receive set, to be removed on first match. - * @param fn The receive function. - * @return The same receive function. - */ - def expectOnce(fn: Actor.Receive): Actor.Receive = { - if (processing) addBuffer.add(fn, permanent = false) - else expectList.add(fn, permanent = false) - fn - } - - /** - * Adds the partial function to the receive set and keeping it in the receive set till removed. - * @param fn The receive function. - * @return The same receive function. - */ - def expect(fn: Actor.Receive): Actor.Receive = { - if (processing) addBuffer.add(fn, permanent = true) - else expectList.add(fn, permanent = true) - fn - } - - /** - * Removes the partial function from the receive set. - * @param fn The receive function. - * @return True if the partial function is removed, false if not found. - */ - def unexpect(fn: Actor.Receive): Boolean = { - if (expectList.remove(fn)) true - else if (processing && (addBuffer.remove(fn))) true - else false - } - - /** - * Receive function for handling the aggregations. - */ - def receive: Actor.Receive = { - case msg if handleMessage(msg) => // already dealt with in handleMessage - } - - /** - * Handles messages and matches against the expect list. - * @param msg The message to be handled. - * @return true if message is successfully processed, false otherwise. - */ - def handleMessage(msg: Any): Boolean = { - processing = true - try { - expectList.process { fn => - var processed = true - fn.applyOrElse(msg, (_: Any) => processed = false) - processed - } - } finally { - processing = false - expectList.addAll(addBuffer) - addBuffer.removeAll() - } - } -} - -/** - * Provides the utility methods and constructors to the WorkList class. - */ -@deprecated("Feel free to copy", "2.5.0") -object WorkList { - - def empty[T] = new WorkList[T] - - /** - * Singly linked list entry implementation for WorkList. - * @param ref The item reference, None for head entry - * @param permanent If the entry is to be kept after processing - */ - class Entry[T](val ref: Option[T], val permanent: Boolean) { - var next: Entry[T] = null - var isDeleted = false - } -} - -/** - * Fast, small, and dirty implementation of a linked list that removes transient work entries once they are processed. - * The list is not thread safe! However it is expected to be reentrant. This means a processing function can add/remove - * entries from the list while processing. Most important, a processing function can remove its own entry from the list. - * The first remove must return true and any subsequent removes must return false. - */ -@deprecated("Feel free to copy", "2.5.0") -class WorkList[T] { - - import WorkList._ - - val head = new Entry[T](None, true) - var tail = head - - /** - * Appends an entry to the work list. - * @param ref The entry. - * @return The updated work list. - */ - def add(ref: T, permanent: Boolean) = { - if (tail == head) { - tail = new Entry[T](Some(ref), permanent) - head.next = tail - } else { - tail.next = new Entry[T](Some(ref), permanent) - tail = tail.next - } - this - } - - /** - * Removes an entry from the work list - * @param ref The entry. - * @return True if the entry is removed, false if the entry is not found. - */ - def remove(ref: T): Boolean = { - - @tailrec - def remove(parent: Entry[T], entry: Entry[T]): Boolean = { - if (entry.ref.get == ref) { - parent.next = entry.next // Remove entry - if (tail == entry) tail = parent - entry.isDeleted = true - true - } else if (entry.next != null) remove(entry, entry.next) - else false - } - - if (head.next == null) false else remove(head, head.next) - } - - /** - * Tries to process each entry using the processing function. Stops at the first entry processing succeeds. - * If the entry is not permanent, the entry is removed. - * @param processFn The processing function, returns true if processing succeeds. - * @return true if an entry has been processed, false if no entries are processed successfully. - */ - def process(processFn: T => Boolean): Boolean = { - - @tailrec - def process(parent: Entry[T], entry: Entry[T]): Boolean = { - val processed = processFn(entry.ref.get) - if (processed) { - if (!entry.permanent && !entry.isDeleted) { - parent.next = entry.next // Remove entry - if (tail == entry) tail = parent - entry.isDeleted = true - } - true // Handled - } else if (entry.next != null) process(entry, entry.next) - else false - } - - if (head.next == null) false else process(head, head.next) - } - - /** - * Appends another WorkList to this WorkList. - * @param other The other WorkList - * @return This WorkList - */ - def addAll(other: WorkList[T]) = { - if (other.head.next != null) { - tail.next = other.head.next - tail = other.tail - } - this - } - - /** - * Removes all entries from this WorkList - * @return True if at least one entry is removed. False if none is removed. - */ - def removeAll() = { - if (head.next == null) false - else { - head.next = null - tail = head - true - } - } -} diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala deleted file mode 100644 index ac105ff681..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ReceivePipeline.scala +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.contrib.pattern - -import akka.actor.Actor - -@deprecated("Feel free to copy", "2.5.0") -object ReceivePipeline { - - /** - * Result returned by an interceptor PF to determine what/whether to delegate to the next inner interceptor - */ - sealed trait Delegation - - case class Inner(transformedMsg: Any) extends Delegation { - - /** - * Add a block of code to be executed after the message (which may be further transformed and processed by - * inner interceptors) is handled by the actor's receive. - * - * The block of code will be executed before similar blocks in outer interceptors. - */ - def andAfter(after: => Unit): Delegation = InnerAndAfter(transformedMsg, (_ => after)) - } - - private[ReceivePipeline] case class InnerAndAfter(transformedMsg: Any, after: Unit => Unit) extends Delegation - - /** - * Interceptor return value that indicates that the message has been handled - * completely. The message will not be passed to inner interceptors - * (or to the decorated actor's receive). - */ - case object HandledCompletely extends Delegation - - private def withDefault(interceptor: Interceptor): Interceptor = interceptor.orElse({ case msg => Inner(msg) }) - - type Interceptor = PartialFunction[Any, Delegation] - - private sealed trait HandlerResult - private case object Done extends HandlerResult - private case object Undefined extends HandlerResult - - private type Handler = Any => HandlerResult -} - -/** - * Trait implementing Receive Pipeline Pattern. Mixin this trait - * for configuring a chain of interceptors to be applied around - * Actor's current behavior. - */ -@deprecated("Feel free to copy", "2.5.0") -trait ReceivePipeline extends Actor { - import ReceivePipeline._ - - private var pipeline: Vector[Interceptor] = Vector.empty - private var decoratorCache: Option[(Receive, Receive)] = None - - /** - * Adds an inner interceptor, it will be applied lastly, near to Actor's original behavior - * @param interceptor an interceptor - */ - def pipelineInner(interceptor: Interceptor): Unit = { - pipeline :+= withDefault(interceptor) - decoratorCache = None - } - - /** - * Adds an outer interceptor, it will be applied firstly, far from Actor's original behavior - * @param interceptor an interceptor - */ - def pipelineOuter(interceptor: Interceptor): Unit = { - pipeline +:= withDefault(interceptor) - decoratorCache = None - } - - private def combinedDecorator: Receive => Receive = { receive => - // So that reconstructed Receive PF is undefined only when the actor's - // receive is undefined for a transformed message that reaches it... - val innerReceiveHandler: Handler = { - case msg => receive.lift(msg).map(_ => Done).getOrElse(Undefined) - } - - val zipped = pipeline.foldRight(innerReceiveHandler) { (outerInterceptor, innerHandler) => - outerInterceptor.andThen { - case Inner(msg) => innerHandler(msg) - case InnerAndAfter(msg, after) => - try innerHandler(msg) - finally after(()) - case HandledCompletely => Done - } - } - - toReceive(zipped) - } - - private def toReceive(handler: Handler) = new Receive { - def isDefinedAt(m: Any): Boolean = evaluate(m) != Undefined - def apply(m: Any): Unit = evaluate(m) - - override def applyOrElse[A1 <: Any, B1 >: Unit](m: A1, default: A1 => B1): B1 = { - val result = handler(m) - - if (result == Undefined) default(m) - } - - private def evaluate(m: Any) = handler(m) - } - - /** - * INTERNAL API. - */ - override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = { - def withCachedDecoration(decorator: Receive => Receive): Receive = decoratorCache match { - case Some((`receive`, cached)) => cached - case _ => - val decorated = decorator(receive) - decoratorCache = Some((receive, decorated)) - decorated - } - - super.aroundReceive(withCachedDecoration(combinedDecorator), msg) - } -} diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala deleted file mode 100644 index fe3b5c4284..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala +++ /dev/null @@ -1,407 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.pattern - -import akka.actor._ -import akka.remote.RemoteScope -import scala.concurrent.duration._ -import scala.util.Try -import java.util.concurrent.TimeUnit - -@deprecated("Use AtLeastOnceDelivery instead", "2.5.0") -object ReliableProxy { - - /** - * Scala API Props. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]] - * constructor. - */ - def props( - targetPath: ActorPath, - retryAfter: FiniteDuration, - reconnectAfter: Option[FiniteDuration], - maxReconnects: Option[Int]): Props = { - Props(new ReliableProxy(targetPath, retryAfter, reconnectAfter, maxReconnects)) - } - - /** - * Java API Props. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]] - * constructor. - */ - def props( - targetPath: ActorPath, - retryAfter: FiniteDuration, - reconnectAfter: FiniteDuration, - maxReconnects: Int): Props = { - props(targetPath, retryAfter, Option(reconnectAfter), if (maxReconnects > 0) Some(maxReconnects) else None) - } - - /** - * Props with no limit on reconnections. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]] - * constructor. - */ - def props(targetPath: ActorPath, retryAfter: FiniteDuration, reconnectAfter: FiniteDuration): Props = { - props(targetPath, retryAfter, Option(reconnectAfter), None) - } - - /** - * Props with no reconnections. Arguments are detailed in the [[akka.contrib.pattern.ReliableProxy]] - * constructor. - */ - def props(targetPath: ActorPath, retryAfter: FiniteDuration): Props = { - props(targetPath, retryAfter, None, None) - } - - class Receiver(target: ActorRef, initialSerial: Int) extends Actor with ReliableProxyDebugLogging { - var lastSerial = initialSerial - - context.watch(target) - - def receive = { - case Message(msg, snd, serial) => - if (serial == lastSerial + 1) { - target.tell(msg, snd) - sender() ! Ack(serial) - lastSerial = serial - } else if (compare(serial, lastSerial) <= 0) { - sender() ! Ack(serial) - } else { - logDebug("Received message from {} with wrong serial: {}", snd, msg) - } - case Terminated(`target`) => context.stop(self) - } - } - - /** - * Wrap-around aware comparison of integers: differences limited to 2**31-1 - * in magnitude will work correctly. - */ - def compare(a: Int, b: Int): Int = { - val c = a - b - c match { - case x if x < 0 => -1 - case x if x == 0 => 0 - case x if x > 0 => 1 - } - } - - def receiver(target: ActorRef, currentSerial: Int): Props = Props(classOf[Receiver], target, currentSerial) - - // Internal messages - final case class Message(msg: Any, sender: ActorRef, serial: Int) - private final case class Ack(serial: Int) - private case object Tick - private case object ReconnectTick - - /** - * `TargetChanged` is sent to transition subscribers when the initial connection is made - * the target and when the target `ActorRef` has changed (for example, the target system - * crashed and has been restarted). - */ - final case class TargetChanged(ref: ActorRef) - - /** - * `ProxyTerminated` is sent to transition subscribers during `postStop`. Any outstanding - * unsent messages are contained the `Unsent` object. - */ - final case class ProxyTerminated(actor: ActorRef, outstanding: Unsent) - final case class Unsent(queue: Vector[Message]) - - sealed trait State - case object Idle extends State - case object Active extends State - case object Connecting extends State - - // Java API - val idle = Idle - val active = Active - val reconnecting = Connecting - -} - -/** - * INTERNAL API - */ -private[akka] trait ReliableProxyDebugLogging extends ActorLogging { this: Actor => - val debug: Boolean = - Try(context.system.settings.config.getBoolean("akka.reliable-proxy.debug")).getOrElse(false) - - def enabled: Boolean = debug && log.isDebugEnabled - - def addSelf(template: String): String = s"$template [$self]" - - def logDebug(template: String, arg1: Any, arg2: Any): Unit = - if (enabled) log.debug(addSelf(template), arg1, arg2) - - def logDebug(template: String, arg1: Any): Unit = - if (enabled) log.debug(addSelf(template), arg1) -} - -import ReliableProxy._ - -/** - * A ReliableProxy is a means to wrap a remote actor reference in order to - * obtain certain improved delivery guarantees: - * - * - as long as the proxy is not terminated before it sends all of its queued - * messages then no messages will be lost - * - messages re-sent due to the first point will not be delivered out-of-order, - * message ordering is preserved - * - * These guarantees are valid for the communication between the two end-points - * of the reliable “tunnel”, which usually spans an unreliable network. - * - * Note that the ReliableProxy guarantees at-least-once, not exactly-once, delivery. - * - * Delivery from the remote end-point to the target actor is still subject to in-JVM - * delivery semantics (i.e. not strictly guaranteed due to possible OutOfMemory - * situations or other VM errors). - * - * You can create a reliable connection like this: - * - * In Scala: - * {{{ - * val proxy = context.actorOf(ReliableProxy.props(target, 100.millis, 120.seconds) - * }}} - * or in Java: - * {{{ - * final ActorRef proxy = getContext().actorOf(ReliableProxy.props( - * target, Duration.create(100, "millis"), Duration.create(120, "seconds"))); - * }}} - * - * '''''Please note:''''' the tunnel is uni-directional, and original sender - * information is retained, hence replies by the wrapped target reference will - * go back in the normal “unreliable” way unless also secured by a ReliableProxy - * from the remote end. - * - * ==Message Types== - * - * This actor is an [[akka.actor.FSM]], hence it offers the service of - * transition callbacks to those actors which subscribe using the - * `SubscribeTransitionCallBack` and `UnsubscribeTransitionCallBack` - * messages; see [[akka.actor.FSM]] for more documentation. The proxy will - * transition into `ReliableProxy.Active` state when ACKs - * are outstanding and return to the `ReliableProxy.Idle` - * state when every message send so far has been confirmed by the peer end-point. - * - * The initial state of the proxy is `ReliableProxy.Connecting`. In this state the - * proxy will repeatedly send [[akka.actor.Identify]] messages to `ActorSelection(targetPath)` - * in order to obtain a new `ActorRef` for the target. When an [[akka.actor.ActorIdentity]] - * for the target is received a new tunnel will be created, a [[ReliableProxy.TargetChanged]] - * message containing the target `ActorRef` will be sent to the proxy's transition subscribers - * and the proxy will transition into either the `ReliableProxy.Idle` or `ReliableProxy.Active` - * state, depending if there are any outstanding messages that need to be delivered. If - * `maxConnectAttempts` is defined this actor will stop itself after `Identify` is sent - * `maxConnectAttempts` times. - * - * While in the `Idle` or `Active` states, if a communication failure causes the tunnel to - * terminate via Remote Deathwatch the proxy will transition into the `ReliableProxy.Connecting` - * state as described above. After reconnecting `TargetChanged` will be sent only if the target - * `ActorRef` has changed. - * - * If this actor is stopped and it still has outstanding messages a - * [[ReliableProxy.ProxyTerminated]] message will be sent to the - * transition subscribers. It contains an `Unsent` object with the outstanding messages. - * - * If an [[ReliableProxy.Unsent]] message is sent to this actor - * the messages contained within it will be relayed through the tunnel to the target. - * - * Any other message type sent to this actor will be delivered via a remote-deployed - * child actor to the designated target. - * - * ==Failure Cases== - * - * All failures of either the local or the remote end-point are escalated to the - * parent of this actor; there are no specific error cases which are predefined. - * - * ==Arguments== - * See the constructor below for the arguments for this actor. However, prefer using - * [[akka.contrib.pattern.ReliableProxy#props]] to this actor's constructor. - * - * @param targetPath is the `ActorPath` to the actor to which all messages will be forwarded. - * `targetPath` can point to a local or remote actor, but the tunnel endpoint will be - * deployed remotely on the node where the target actor lives. - * @param retryAfter is the ACK timeout after which all outstanding messages - * will be resent. There is no limit on the queue size or the number of retries. - * @param reconnectAfter  is an optional interval between connection attempts. - * It is also used as the interval between receiving a `Terminated` for the tunnel and - * attempting to reconnect to the target actor. The minimum recommended value for this is - * the value of the configuration setting `akka.remote.retry-gate-closed-for`. Use `None` - * to never reconnect after a disconnection. - * @param maxConnectAttempts  is an optional maximum number of attempts to connect to the - * target actor. Use `None` for no limit. If `reconnectAfter` is `None` this value is ignored. - */ -@deprecated("Use AtLeastOnceDelivery instead", "2.5.0") -class ReliableProxy( - targetPath: ActorPath, - retryAfter: FiniteDuration, - reconnectAfter: Option[FiniteDuration], - maxConnectAttempts: Option[Int]) - extends Actor - with LoggingFSM[State, Vector[Message]] - with ReliableProxyDebugLogging { - import FSM.`->` - - var tunnel: ActorRef = _ - var currentSerial: Int = 0 - var lastAckSerial: Int = _ - var currentTarget: ActorRef = _ - var attemptedReconnects: Int = _ - - val resendTimer = "resend" - val reconnectTimer = "reconnect" - - val retryGateClosedFor = - Try(context.system.settings.config.getDuration("akka.remote.retry-gate-closed-for", TimeUnit.MILLISECONDS)) - .map(_.longValue) - .getOrElse(5000L) - - val defaultConnectInterval = - Try( - context.system.settings.config.getDuration("akka.reliable-proxy.default-connect-interval", TimeUnit.MILLISECONDS)) - .map(_.longValue) - .getOrElse(retryGateClosedFor) - .millis - - val initialState = Connecting - - self ! ReconnectTick - - def createTunnel(target: ActorRef): Unit = { - logDebug("Creating new tunnel for {}", target) - tunnel = context.actorOf( - receiver(target, lastAckSerial).withDeploy(Deploy(scope = RemoteScope(target.path.address))), - "tunnel") - - context.watch(tunnel) - currentTarget = target - attemptedReconnects = 0 - resetBackoff() - } - - if (targetPath.address.host.isEmpty && self.path.address == targetPath.address) { - logDebug("Unnecessary to use ReliableProxy for local target: {}", targetPath) - } - - override def supervisorStrategy = OneForOneStrategy() { - case _ => SupervisorStrategy.Escalate - } - - override def postStop(): Unit = { - logDebug("Stopping proxy and sending {} messages to subscribers in Unsent", stateData.size) - gossip(ProxyTerminated(self, Unsent(stateData))) - super.postStop() - } - - startWith(initialState, Vector.empty) - - when(Idle) { - case Event(Terminated(_), _) => terminated() - case Event(Ack(_), _) => stay() - case Event(Unsent(msgs), _) => goto(Active).using(resend(updateSerial(msgs))) - case Event(msg, _) => goto(Active).using(Vector(send(msg, sender()))) - } - - onTransition { - case _ -> Active => scheduleTick() - case Active -> Idle => cancelTimer(resendTimer) - case _ -> Connecting => scheduleReconnectTick() - } - - when(Active) { - case Event(Terminated(_), _) => - terminated() - case Event(Ack(serial), queue) => - val q = queue.dropWhile(m => compare(m.serial, serial) <= 0) - if (compare(serial, lastAckSerial) > 0) lastAckSerial = serial - scheduleTick() - if (q.isEmpty) goto(Idle).using(Vector.empty) - else stay.using(q) - case Event(Tick, queue) => - logResend(queue.size) - queue.foreach { tunnel ! _ } - scheduleTick() - stay() - case Event(Unsent(msgs), queue) => - stay.using(queue ++ resend(updateSerial(msgs))) - case Event(msg, queue) => - stay.using(queue :+ send(msg, sender())) - } - - when(Connecting) { - case Event(Terminated(_), _) => - stay() - case Event(ActorIdentity(_, Some(actor)), queue) => - val curr = currentTarget - cancelTimer(reconnectTimer) - createTunnel(actor) - if (currentTarget != curr) gossip(TargetChanged(currentTarget)) - if (queue.isEmpty) goto(Idle) else goto(Active).using(resend(queue)) - case Event(ActorIdentity(_, None), _) => - stay() - case Event(ReconnectTick, _) => - if (maxConnectAttempts.exists(_ == attemptedReconnects)) { - logDebug("Failed to reconnect after {}", attemptedReconnects) - stop() - } else { - logDebug("{} ! {}", context.actorSelection(targetPath), Identify(targetPath)) - context.actorSelection(targetPath) ! Identify(targetPath) - scheduleReconnectTick() - attemptedReconnects += 1 - stay() - } - case Event(Unsent(msgs), queue) => - stay.using(queue ++ updateSerial(msgs)) - case Event(msg, queue) => - stay.using(queue :+ Message(msg, sender(), nextSerial())) - } - - def scheduleTick(): Unit = setTimer(resendTimer, Tick, retryAfter, repeat = false) - - def nextSerial(): Int = { - currentSerial += 1 - currentSerial - } - - def send(msg: Any, snd: ActorRef): Message = { - val m = Message(msg, snd, nextSerial()) - tunnel ! m - m - } - - def updateSerial(q: Vector[Message]) = q.map(_.copy(serial = nextSerial())) - - def resend(q: Vector[Message]): Vector[Message] = { - logResend(q.size) - q.foreach { tunnel ! _ } - q - } - - def logResend(size: Int): Unit = - logDebug("Resending {} messages through tunnel", size) - - def terminated(): State = { - logDebug("Terminated: {}", targetPath) - if (reconnectAfter.isDefined) goto(Connecting) - else stop() - } - - def scheduleReconnectTick(): Unit = { - val delay = nextBackoff() - logDebug("Will attempt to reconnect to {} in {}", targetPath, delay) - setTimer(reconnectTimer, ReconnectTick, delay, repeat = false) - } - - /** - * Reset backoff interval. - * - * This and nextBackoff are meant to be implemented by subclasses. - */ - def resetBackoff(): Unit = {} - - /** - * Returns the next retry interval duration. By default each interval is the same, reconnectAfter. - */ - def nextBackoff(): FiniteDuration = reconnectAfter.getOrElse(defaultConnectInterval) -} diff --git a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala b/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala deleted file mode 100644 index 31591a883c..0000000000 --- a/akka-contrib/src/main/scala/akka/contrib/throttle/TimerBasedThrottler.scala +++ /dev/null @@ -1,320 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.throttle - -import scala.concurrent.duration.{ Duration, FiniteDuration } -import scala.collection.immutable.{ Queue => Q } -import akka.actor.{ Actor, ActorRef, FSM } -import Throttler._ -import TimerBasedThrottler._ -import java.util.concurrent.TimeUnit - -/** - * @see [[akka.contrib.throttle.TimerBasedThrottler]] - * @see [[akka.contrib.throttle.Throttler.Rate]] - * @see [[akka.contrib.throttle.Throttler.SetRate]] - * @see [[akka.contrib.throttle.Throttler.SetTarget]] - */ -@deprecated("Use streams, see migration guide", "2.5.0") -object Throttler { - - /** - * A rate used for throttling. - * - * Scala API: There are some shorthands available to construct rates: - * {{{ - * import java.util.concurrent.TimeUnit._ - * import scala.concurrent.duration.{ Duration, FiniteDuration } - * - * val rate1 = 1 msgsPer (1, SECONDS) - * val rate2 = 1 msgsPer Duration(1, SECONDS) - * val rate3 = 1 msgsPer (1 seconds) - * val rate4 = 1 msgsPerSecond - * val rate5 = 1 msgsPerMinute - * val rate6 = 1 msgsPerHour - * }}} - * - * @param numberOfCalls the number of calls that may take place in a period - * @param duration the length of the period - * @see [[akka.contrib.throttle.Throttler]] - */ - final case class Rate(val numberOfCalls: Int, val duration: FiniteDuration) { - - /** - * The duration in milliseconds. - */ - def durationInMillis(): Long = duration.toMillis - } - - /** - * Set the target of a throttler. - * - * You may change a throttler's target at any time. - * - * Notice that the messages sent by the throttler to the target will have the original sender (and - * not the throttler) as the sender. (In Akka terms, the throttler `forward`s the message.) - * - * @param target if `target` is `None`, the throttler will stop delivering messages and the messages already received - * but not yet delivered, as well as any messages received in the future will be queued - * and eventually be delivered when a new target is set. If `target` is not `None`, the currently queued messages - * as well as any messages received in the future will be delivered to the new target at a rate not exceeding the current throttler's rate. - */ - final case class SetTarget(target: Option[ActorRef]) { - - /** - * Java API: - * @param target if `target` is `null`, the throttler will stop delivering messages and the messages already received - * but not yet delivered, as well as any messages received in the future will be queued - * and eventually be delivered when a new target is set. If `target` is not `null`, the currently queued messages - * as well as any messages received in the future will be delivered to the new target at a rate not exceeding - * the current throttler's rate. - */ - def this(target: ActorRef) = this(Option(target)) - } - - /** - * Set the rate of a throttler. - * - * You may change a throttler's rate at any time. - * - * @param rate the rate at which messages will be delivered to the target of the throttler - */ - final case class SetRate(rate: Rate) - - /** - * Helper for some syntactic sugar. - * - * @see [[akka.contrib.throttle.Throttler.Rate]] - */ - implicit class RateInt(val numberOfCalls: Int) extends AnyVal { - def msgsPer(duration: Int, timeUnit: TimeUnit) = Rate(numberOfCalls, Duration(duration, timeUnit)) - def msgsPer(duration: FiniteDuration) = Rate(numberOfCalls, duration) - def msgsPerSecond = Rate(numberOfCalls, Duration(1, TimeUnit.SECONDS)) - def msgsPerMinute = Rate(numberOfCalls, Duration(1, TimeUnit.MINUTES)) - def msgsPerHour = Rate(numberOfCalls, Duration(1, TimeUnit.HOURS)) - } - -} - -/** - * INTERNAL API - */ -private[throttle] object TimerBasedThrottler { - case object Tick - - // States of the FSM: A `TimerBasedThrottler` is in state `Active` iff the timer is running. - sealed trait State - case object Idle extends State - case object Active extends State - - // Messages, as we queue them to be sent later - final case class Message(message: Any, sender: ActorRef) - - // The data of the FSM - final case class Data(target: Option[ActorRef], callsLeftInThisPeriod: Int, queue: Q[Message]) -} - -/** - * A throttler that uses a timer to control the message delivery rate. - * - * == Throttling == - * A throttler is an actor that is defined through a target actor and a rate - * (of type [[akka.contrib.throttle.Throttler.Rate]]). You set or change the target and rate at any time through the - * [[akka.contrib.throttle.Throttler.SetTarget]] and [[akka.contrib.throttle.Throttler.SetRate]] - * messages, respectively. When you send the throttler any other message `msg`, it will - * put the message `msg` into an internal queue and eventually send all queued messages to the target, at - * a speed that respects the given rate. If no target is currently defined then the messages will be queued - * and will be delivered as soon as a target gets set. - * - * A throttler understands actor messages of type - * [[akka.contrib.throttle.Throttler.SetTarget]], [[akka.contrib.throttle.Throttler.SetRate]], in - * addition to any other messages, which the throttler will consider as messages to be sent to - * the target. - * - * == Transparency == - * Notice that the throttler `forward`s messages, i.e., the target will see the original message sender - * (and not the throttler) as the sender of the message. - * - * == Persistence == - * Throttlers usually use an internal queue to keep the messages that need to be sent to the target. - * You therefore cannot rely on the throttler's inbox size in order to learn how much messages are - * outstanding. - * - * It is left to the implementation whether the internal queue is persisted over application restarts or - * actor failure. - * - * == Processing messages == - * The target should process messages as fast as possible. If the target requires substantial time to - * process messages, it should distribute its work to other actors (using for example something like - * a `BalancingDispatcher`), otherwise the resulting system will always work below - * the threshold rate. - * - * Example: Suppose the throttler has a rate of 3msg/s and the target requires 1s to process a message. - * This system will only process messages at a rate of 1msg/s: the target will receive messages at at most 3msg/s - * but as it handles them synchronously and each of them takes 1s, its inbox will grow and grow. In such - * a situation, the target should distribute its messages to a set of worker actors so that individual messages - * can be handled in parallel. - * - * ==Example== - * For example, if you set a rate like "3 messages in 1 second", the throttler - * will send the first three messages immediately to the target actor but will need to impose a delay before - * sending out further messages: - * {{{ - * // A simple actor that prints whatever it receives - * class Printer extends Actor { - * def receive = { - * case x => println(x) - * } - * } - * - * val printer = system.actorOf(Props[Printer], "printer") - * - * // The throttler for this example, setting the rate - * val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3 msgsPer 1.second)) - * - * // Set the target - * throttler ! SetTarget(Some(printer)) - * // These three messages will be sent to the printer immediately - * throttler ! "1" - * throttler ! "2" - * throttler ! "3" - * // These two will wait at least until 1 second has passed - * throttler ! "4" - * throttler ! "5" - * }}} - * - * ==Implementation notes== - * This throttler implementation internally installs a timer that repeats every `rate.durationInMillis` and enables `rate.numberOfCalls` - * additional calls to take place. A `TimerBasedThrottler` uses very few system resources, provided the rate's duration is not too - * fine-grained (which would cause a lot of timer invocations); for example, it does not store the calling history - * as other throttlers may need to do. - * - * However, a `TimerBasedThrottler` only provides ''weak guarantees'' on the rate (see also - * this blog post): - * - * - Only ''delivery'' times are taken into account: if, for example, the throttler is used to throttle - * requests to an external web service then only the start times of the web requests are considered. - * If a web request takes very long on the server then more than `rate.numberOfCalls`-many requests - * may be observed on the server in an interval of duration `rate.durationInMillis()`. - * - There may be intervals of duration `rate.durationInMillis()` that contain more than `rate.numberOfCalls` - * message deliveries: a `TimerBasedThrottler` only makes guarantees for the intervals - * of its ''own'' timer, namely that no more than `rate.numberOfCalls`-many messages are delivered within such intervals. Other intervals on the - * timeline may contain more calls. - * - * For some applications, these guarantees may not be sufficient. - * - * ==Known issues== - * - * - If you change the rate using `SetRate(rate)`, the actual rate may in fact be higher for the - * overlapping period (i.e., `durationInMillis()`) of the new and old rate. Therefore, - * changing the rate frequently is not recommended with the current implementation. - * - The queue of messages to be delivered is not persisted in any way; actor or system failure will - * cause the queued messages to be lost. - * - * @see [[akka.contrib.throttle.Throttler]] - */ -@deprecated("Use streams, see migration guide", "2.5.0") -class TimerBasedThrottler(var rate: Rate) extends Actor with FSM[State, Data] { - import FSM.`->` - - this.rate = normalizedRate(rate) - - startWith(Idle, Data(None, rate.numberOfCalls, Q())) - - // Idle: no messages, or target not set - when(Idle) { - // Set the rate - case Event(SetRate(newRate), d) => - this.rate = normalizedRate(newRate) - stay.using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls)) - - // Set the target - case Event(SetTarget(t @ Some(_)), d) if !d.queue.isEmpty => - goto(Active).using(deliverMessages(d.copy(target = t))) - case Event(SetTarget(t), d) => - stay.using(d.copy(target = t)) - - // Queuing - case Event(msg, d @ Data(None, _, queue)) => - stay.using(d.copy(queue = queue.enqueue(Message(msg, context.sender())))) - case Event(msg, d @ Data(Some(_), _, Seq())) => - goto(Active).using(deliverMessages(d.copy(queue = Q(Message(msg, context.sender()))))) - // Note: The case Event(msg, t @ Data(Some(_), _, _, Seq(_*))) should never happen here. - } - - when(Active) { - // Set the rate - case Event(SetRate(newRate), d) => - this.rate = normalizedRate(newRate) - // Note: this should be improved (see "Known issues" in class comments) - stopTimer() - startTimer(rate) - stay.using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls)) - - // Set the target (when the new target is None) - case Event(SetTarget(None), d) => - // Note: We do not yet switch to state `Inactive` because we need the timer to tick once more before - stay.using(d.copy(target = None)) - - // Set the target (when the new target is not None) - case Event(SetTarget(t @ Some(_)), d) => - stay.using(d.copy(target = t)) - - // Tick after a `SetTarget(None)`: take the additional permits and go to `Idle` - case Event(Tick, d @ Data(None, _, _)) => - goto(Idle).using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls)) - - // Period ends and we have no more messages: take the additional permits and go to `Idle` - case Event(Tick, d @ Data(_, _, Seq())) => - goto(Idle).using(d.copy(callsLeftInThisPeriod = rate.numberOfCalls)) - - // Period ends and we get more occasions to send messages - case Event(Tick, d @ Data(_, _, _)) => - stay.using(deliverMessages(d.copy(callsLeftInThisPeriod = rate.numberOfCalls))) - - // Queue a message (when we cannot send messages in the current period anymore) - case Event(msg, d @ Data(_, 0, queue)) => - stay.using(d.copy(queue = queue.enqueue(Message(msg, context.sender())))) - - // Queue a message (when we can send some more messages in the current period) - case Event(msg, d @ Data(_, _, queue)) => - stay.using(deliverMessages(d.copy(queue = queue.enqueue(Message(msg, context.sender()))))) - } - - onTransition { - case Idle -> Active => startTimer(rate) - case Active -> Idle => stopTimer() - } - - initialize() - - private def startTimer(rate: Rate) = setTimer("morePermits", Tick, rate.duration, true) - private def stopTimer() = cancelTimer("morePermits") - - // Rate.numberOfCalls is an integer. So, the finest granularity of timing (i.e., highest - // precision) is achieved when it equals 1. So, the following function normalizes - // a Rate to 1 numberOfCall per calculated unit time. - private def normalizedRate(rate: Rate): Rate = { - // If number of calls is zero then we don't need to do anything - if (rate.numberOfCalls == 0) { - rate - } else { - Rate(1, FiniteDuration(rate.duration.toNanos / rate.numberOfCalls, TimeUnit.NANOSECONDS)) - } - } - - /** - * Send as many messages as we can (while respecting the rate) to the target and - * return the state data (with the queue containing the remaining ones). - */ - private def deliverMessages(data: Data): Data = { - val queue = data.queue - val nrOfMsgToSend = scala.math.min(queue.length, data.callsLeftInThisPeriod) - - queue.take(nrOfMsgToSend).foreach(x => data.target.get.tell(x.message, x.sender)) - - data.copy(queue = queue.drop(nrOfMsgToSend), callsLeftInThisPeriod = data.callsLeftInThisPeriod - nrOfMsgToSend) - } -} diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala deleted file mode 100644 index dd9fd54f0c..0000000000 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala +++ /dev/null @@ -1,388 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.pattern - -import language.postfixOps -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec -import org.scalatest.BeforeAndAfterEach -import akka.remote.transport.ThrottlerTransportAdapter.Direction -import akka.actor._ -import akka.testkit.ImplicitSender - -import scala.concurrent.duration._ -import akka.actor.FSM -import akka.actor.ActorRef -import akka.testkit.TestKitExtension -import akka.actor.ActorIdentity -import akka.actor.Identify -import com.typesafe.config.ConfigFactory - -object ReliableProxySpec extends MultiNodeConfig { - val local = role("local") - val remote = role("remote") - - commonConfig(ConfigFactory.parseString(""" - akka.remote.artery.enabled = false - """)) - - testTransport(on = true) -} - -class ReliableProxyMultiJvmNode1 extends ReliableProxySpec -class ReliableProxyMultiJvmNode2 extends ReliableProxySpec - -class ReliableProxySpec - extends MultiNodeSpec(ReliableProxySpec) - with STMultiNodeSpec - with BeforeAndAfterEach - with ImplicitSender { - import ReliableProxySpec._ - import ReliableProxy._ - - override def initialParticipants = roles.size - - override def afterEach(): Unit = { - runOn(local) { - testConductor.passThrough(local, remote, Direction.Both).await - } - enterBarrier("after-each") - } - - @volatile var target: ActorRef = system.deadLetters - @volatile var proxy: ActorRef = system.deadLetters - - def idTarget(): Unit = { - system.actorSelection(node(remote) / "user" / "echo") ! Identify("echo") - target = expectMsgType[ActorIdentity].ref.get - } - - def startTarget(): Unit = { - target = system.actorOf(Props(new Actor { - def receive = { - case x => testActor ! x - } - }).withDeploy(Deploy.local), "echo") - } - - def stopProxy(): Unit = { - val currentProxy = proxy - currentProxy ! FSM.UnsubscribeTransitionCallBack(testActor) - currentProxy ! PoisonPill - expectTerminated(currentProxy) - } - - def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s)) - def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2)) - def expectTransition(max: FiniteDuration, s1: State, s2: State) = expectMsg(max, FSM.Transition(proxy, s1, s2)) - - def sendN(n: Int) = (1 to n).foreach(proxy ! _) - def expectN(n: Int) = (1 to n).foreach { n => - expectMsg(n); lastSender should ===(target) - } - - // avoid too long timeout for expectNoMsg when using dilated timeouts, because - // blackhole will trigger failure detection - val expectNoMsgTimeout = { - val timeFactor = TestKitExtension(system).TestTimeFactor - if (timeFactor > 1.0) (1.0 / timeFactor).seconds else 1.second - } - - "A ReliableProxy" must { - - "initialize properly" in { - runOn(remote) { - startTarget() - } - - enterBarrier("initialize") - - runOn(local) { - import akka.contrib.pattern.ReliableProxy - - idTarget() - proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 5.seconds), "proxy1") - watch(proxy) - proxy ! FSM.SubscribeTransitionCallBack(testActor) - expectState(Connecting) - proxy ! "hello" - expectMsgType[TargetChanged] - expectTransition(Connecting, Active) - expectTransition(Active, Idle) - } - - runOn(remote) { - expectMsg(1.second, "hello") - } - - enterBarrier("initialize-done") - } - - "forward messages in sequence" in { - runOn(local) { - sendN(100) - expectTransition(Idle, Active) - expectTransition(Active, Idle) - } - runOn(remote) { - within(5 seconds) { - expectN(100) - } - } - - enterBarrier("test1a") - - runOn(local) { - sendN(100) - expectTransition(Idle, Active) - expectTransition(Active, Idle) - } - runOn(remote) { - within(5 seconds) { - expectN(100) - } - } - - enterBarrier("test1b") - } - - "retry when sending fails" in { - runOn(local) { - testConductor.blackhole(local, remote, Direction.Send).await - sendN(100) - expectTransition(1 second, Idle, Active) - expectNoMsg(expectNoMsgTimeout) - } - - enterBarrier("test2a") - - runOn(remote) { - expectNoMsg(0 seconds) - } - - enterBarrier("test2b") - - runOn(local) { - testConductor.passThrough(local, remote, Direction.Send).await - expectTransition(5 seconds, Active, Idle) - } - runOn(remote) { - within(5 seconds) { - expectN(100) - } - } - - enterBarrier("test2c") - } - - "retry when receiving fails" in { - runOn(local) { - testConductor.blackhole(local, remote, Direction.Receive).await - sendN(100) - expectTransition(1 second, Idle, Active) - expectNoMsg(expectNoMsgTimeout) - } - runOn(remote) { - within(5 second) { - expectN(100) - } - } - - enterBarrier("test3a") - - runOn(local) { - testConductor.passThrough(local, remote, Direction.Receive).await - expectTransition(5 seconds, Active, Idle) - } - - enterBarrier("test3b") - } - - "resend across a slow outbound link" in { - runOn(local) { - // the rateMBit value is derived from empirical studies so that it will trigger resends, - // the exact value is not important, but it should not be too large - testConductor.throttle(local, remote, Direction.Send, rateMBit = 0.02).await - sendN(50) - within(5 seconds) { - expectTransition(Idle, Active) - // use the slow link for a while, which will trigger resends - Thread.sleep(2000) - // full speed, and it will catch up outstanding messages - testConductor.passThrough(local, remote, Direction.Send).await - expectTransition(Active, Idle) - } - } - runOn(remote) { - within(5 seconds) { - expectN(50) - } - expectNoMsg(expectNoMsgTimeout) - } - - enterBarrier("test4") - } - - "resend across a slow inbound link" in { - runOn(local) { - testConductor.passThrough(local, remote, Direction.Send).await - // the rateMBit value is derived from empirical studies so that it will trigger resends, - // the exact value is not important, but it should not be too large - testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.02).await - sendN(50) - within(5 seconds) { - expectTransition(Idle, Active) - // use the slow link for a while, which will trigger resends - Thread.sleep(2000) - // full speed, and it will catch up outstanding messages - testConductor.passThrough(local, remote, Direction.Receive).await - expectTransition(Active, Idle) - } - } - runOn(remote) { - within(5 second) { - expectN(50) - } - expectNoMsg(1 seconds) - } - - enterBarrier("test5") - } - - "reconnect to target" in { - runOn(remote) { - // Stop the target - system.stop(target) - } - - runOn(local) { - // After the target stops the proxy will change to Reconnecting - within(5 seconds) { - expectTransition(Idle, Connecting) - } - // Send some messages while it's reconnecting - sendN(50) - } - - enterBarrier("test6a") - - runOn(remote) { - // Restart the target to have something to reconnect to - startTarget() - } - - runOn(local) { - // After reconnecting a we'll get a TargetChanged message - // and the proxy will transition to Active to send the outstanding messages - within(10 seconds) { - expectMsgType[TargetChanged] - expectTransition(Connecting, Active) - } - } - - enterBarrier("test6b") - - runOn(local) { - // After the messages have been delivered, proxy is back to idle - expectTransition(Active, Idle) - } - - runOn(remote) { - expectN(50) - } - - enterBarrier("test6c") - } - - "stop proxy if target stops and no reconnection" in { - runOn(local) { - stopProxy() // Stop previous proxy - - // Start new proxy with no reconnections - proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis), "proxy2") - proxy ! FSM.SubscribeTransitionCallBack(testActor) - watch(proxy) - - expectState(Connecting) - expectMsgType[TargetChanged] - expectTransition(Connecting, Idle) - } - - enterBarrier("test7a") - - runOn(remote) { - // Stop the target, this will cause the proxy to stop - system.stop(target) - } - - runOn(local) { - within(5 seconds) { - expectMsgType[ProxyTerminated] - expectTerminated(proxy) - } - } - - enterBarrier("test7b") - - } - - "stop proxy after max reconnections" in { - runOn(remote) { - // Target is not running after previous test, start it - startTarget() - } - enterBarrier("target-started") - - runOn(local) { - // Get new target's ref - idTarget() - } - - enterBarrier("test8a") - - runOn(local) { - // Proxy is not running after previous test - // Start new proxy with 3 reconnections every 2 sec - proxy = system.actorOf(ReliableProxy.props(target.path, 100.millis, 2.seconds, 3), "proxy3") - proxy ! FSM.SubscribeTransitionCallBack(testActor) - watch(proxy) - expectState(Connecting) - expectMsgType[TargetChanged] - expectTransition(Connecting, Idle) - } - - enterBarrier("test8b") - - runOn(remote) { - // Stop target - system.stop(target) - } - - runOn(local) { - // Wait for transition to Connecting, then send messages - within(5 seconds) { - expectTransition(Idle, Connecting) - } - sendN(50) - } - - enterBarrier("test8c") - - runOn(local) { - // After max reconnections, proxy stops itself. Expect ProxyTerminated(Unsent(msgs, sender, serial)). - within(5 * 2.seconds) { - val proxyTerm = expectMsgType[ProxyTerminated] - // Validate that the unsent messages are 50 ints - val unsentInts = proxyTerm.outstanding.queue.collect { case Message(i: Int, _, _) if i > 0 && i <= 50 => i } - unsentInts should have size 50 - expectTerminated(proxy) - } - } - - enterBarrier("test8d") - } - } -} diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java deleted file mode 100644 index 5cd10b689e..0000000000 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ReliableProxyTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.pattern; - -import java.util.concurrent.TimeUnit; - -import akka.actor.*; -import akka.testkit.AkkaJUnitActorSystemResource; - -import org.junit.ClassRule; -import org.junit.Test; - -import org.scalatest.junit.JUnitSuite; -import scala.concurrent.duration.Duration; -import akka.testkit.TestProbe; - -// #import -import akka.contrib.pattern.ReliableProxy; - -// #import - -public class ReliableProxyTest extends JUnitSuite { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource("ReliableProxyTest"); - - private final ActorSystem system = actorSystemResource.getSystem(); - - public // #demo-proxy - static class ProxyParent extends AbstractActor { - private final ActorRef proxy; - - public ProxyParent(ActorPath targetPath) { - proxy = - getContext() - .actorOf( - ReliableProxy.props(targetPath, Duration.create(100, TimeUnit.MILLISECONDS))); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .matchEquals( - "hello", - m -> { - proxy.tell("world!", self()); - }) - .build(); - } - } - - // #demo-proxy - - public // #demo-transition - static class ProxyTransitionParent extends AbstractActor { - private final ActorRef proxy; - private ActorRef client = null; - - public ProxyTransitionParent(ActorPath targetPath) { - proxy = - getContext() - .actorOf( - ReliableProxy.props(targetPath, Duration.create(100, TimeUnit.MILLISECONDS))); - proxy.tell(new FSM.SubscribeTransitionCallBack(getSelf()), getSelf()); - } - - @Override - public Receive createReceive() { - return receiveBuilder() - .matchEquals( - "hello", - message -> { - proxy.tell("world!", self()); - client = sender(); - }) - .matchUnchecked( - FSM.CurrentState.class, - (FSM.CurrentState state) -> { - // get initial state - }) - .matchUnchecked( - FSM.Transition.class, - (FSM.Transition transition) -> { - assert transition.fsmRef().equals(proxy); - if (transition.from().equals(ReliableProxy.active()) - && transition.to().equals(ReliableProxy.idle())) { - client.tell("done", self()); - } - }) - .build(); - } - } - - // #demo-transition - - @Test - public void demonstrateUsage() { - final TestProbe probe = TestProbe.apply(system); - final ActorRef target = probe.ref(); - final ActorRef parent = system.actorOf(Props.create(ProxyParent.class, target.path())); - parent.tell("hello", null); - probe.expectMsg("world!"); - } - - @Test - public void demonstrateTransitions() { - final ActorRef target = TestProbe.apply(system).ref(); - final ActorRef parent = - system.actorOf(Props.create(ProxyTransitionParent.class, target.path())); - final TestProbe probe = TestProbe.apply(system); - parent.tell("hello", probe.ref()); - probe.expectMsg("done"); - } -} diff --git a/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java b/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java deleted file mode 100644 index d9caa59bfb..0000000000 --- a/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.throttle; - -import org.junit.ClassRule; -import org.junit.Test; - -import java.util.concurrent.TimeUnit; - -import org.scalatest.junit.JUnitSuite; -import scala.concurrent.duration.Duration; -import com.typesafe.config.ConfigFactory; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Props; -import akka.actor.AbstractActor; -import akka.testkit.AkkaJUnitActorSystemResource; - -public class TimerBasedThrottlerTest extends JUnitSuite { - - @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = - new AkkaJUnitActorSystemResource( - "TimerBasedThrottlerTest", ConfigFactory.parseString("akka.log-dead-letters=off")); - - private final ActorSystem system = actorSystemResource.getSystem(); - - @Test - public void demonstrateUsage() { - // #demo-code - // A simple actor that prints whatever it receives - ActorRef printer = system.actorOf(Props.create(Printer.class)); - // The throttler for this example, setting the rate - ActorRef throttler = - system.actorOf( - Props.create( - TimerBasedThrottler.class, - new Throttler.Rate(3, Duration.create(1, TimeUnit.SECONDS)))); - // Set the target - throttler.tell(new Throttler.SetTarget(printer), null); - // These three messages will be sent to the target immediately - throttler.tell("1", null); - throttler.tell("2", null); - throttler.tell("3", null); - // These two will wait until a second has passed - throttler.tell("4", null); - throttler.tell("5", null); - - // #demo-code - - } - - public // #demo-code - // A simple actor that prints whatever it receives - static class Printer extends AbstractActor { - @Override - public Receive createReceive() { - return receiveBuilder() - .matchAny( - message -> { - System.out.println(message); - }) - .build(); - } - } - - // #demo-code - - static class System { - static Out out = new Out(); - - static class Out { - void println(Object s) {} - } - } -} diff --git a/akka-contrib/src/test/resources/reference.conf b/akka-contrib/src/test/resources/reference.conf deleted file mode 100644 index e43aa8711e..0000000000 --- a/akka-contrib/src/test/resources/reference.conf +++ /dev/null @@ -1,8 +0,0 @@ -akka { - actor { - serialize-creators = on - serialize-messages = on - warn-about-java-serializer-usage = off - } - remote.netty.tcp.port = 0 -} diff --git a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala b/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala deleted file mode 100644 index cb6a35f2c4..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/circuitbreaker/sample/CircuitBreaker.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (C) 2014-2019 Lightbend Inc. - */ - -package akka.contrib.circuitbreaker.sample - -import akka.actor.{ Actor, ActorLogging, ActorRef } -import akka.contrib.circuitbreaker.CircuitBreakerProxy.{ CircuitBreakerPropsBuilder, CircuitOpenFailure } -import akka.contrib.circuitbreaker.sample.CircuitBreaker.AskFor -import akka.util.Timeout - -import scala.concurrent.duration._ -import scala.util.{ Failure, Random, Success } - -//#simple-service -object SimpleService { - case class Request(content: String) - case class Response(content: Either[String, String]) - case object ResetCount -} - -/** - * This is a simple actor simulating a service - * - Becoming slower with the increase of frequency of input requests - * - Failing around 30% of the requests - */ -class SimpleService extends Actor with ActorLogging { - import SimpleService._ - - var messageCount = 0 - - import context.dispatcher - - context.system.scheduler.schedule(1.second, 1.second, self, ResetCount) - - override def receive = { - case ResetCount => - messageCount = 0 - - case Request(content) => - messageCount += 1 - // simulate workload - Thread.sleep(100 * messageCount) - // Fails around 30% of the times - if (Random.nextInt(100) < 70) { - sender ! Response(Right(s"Successfully processed $content")) - } else { - sender ! Response(Left(s"Failure processing $content")) - } - - } -} -//#simple-service - -object CircuitBreaker { - case class AskFor(what: String) -} - -//#basic-sample -class CircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with ActorLogging { - import SimpleService._ - - val serviceCircuitBreaker = - context.actorOf( - CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = 2.seconds, resetTimeout = 30.seconds) - .copy(failureDetector = { - _ match { - case Response(Left(_)) => true - case _ => false - } - }) - .props(potentiallyFailingService), - "serviceCircuitBreaker") - - override def receive: Receive = { - case AskFor(requestToForward) => - serviceCircuitBreaker ! Request(requestToForward) - - case Right(Response(content)) => - //handle response - log.info("Got successful response {}", content) - - case Response(Right(content)) => - //handle response - log.info("Got successful response {}", content) - - case Response(Left(content)) => - //handle response - log.info("Got failed response {}", content) - - case CircuitOpenFailure(failedMsg) => - log.warning("Unable to send message {}", failedMsg) - } -} -//#basic-sample - -//#ask-sample -class CircuitBreakerAsk(potentiallyFailingService: ActorRef) extends Actor with ActorLogging { - import SimpleService._ - import akka.pattern._ - - implicit val askTimeout: Timeout = 2.seconds - - val serviceCircuitBreaker = - context.actorOf( - CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds) - .copy(failureDetector = { - _ match { - case Response(Left(_)) => true - case _ => false - } - }) - .copy(openCircuitFailureConverter = { failure => - Left(s"Circuit open when processing ${failure.failedMsg}") - }) - .props(potentiallyFailingService), - "serviceCircuitBreaker") - - import context.dispatcher - - override def receive: Receive = { - case AskFor(requestToForward) => - (serviceCircuitBreaker ? Request(requestToForward)).mapTo[Either[String, String]].onComplete { - case Success(Right(successResponse)) => - //handle response - log.info("Got successful response {}", successResponse) - - case Success(Left(failureResponse)) => - //handle response - log.info("Got successful response {}", failureResponse) - - case Failure(exception) => - //handle response - log.info("Got successful response {}", exception) - - } - } -} -//#ask-sample - -//#ask-with-failure-sample -class CircuitBreakerAskWithFailure(potentiallyFailingService: ActorRef) extends Actor with ActorLogging { - import SimpleService._ - import akka.pattern._ - import akka.contrib.circuitbreaker.Implicits.futureExtensions - - implicit val askTimeout: Timeout = 2.seconds - - val serviceCircuitBreaker = - context.actorOf( - CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds) - .props(target = potentiallyFailingService), - "serviceCircuitBreaker") - - import context.dispatcher - - override def receive: Receive = { - case AskFor(requestToForward) => - (serviceCircuitBreaker ? Request(requestToForward)).failForOpenCircuit.mapTo[String].onComplete { - case Success(successResponse) => - //handle response - log.info("Got successful response {}", successResponse) - - case Failure(exception) => - //handle response - log.info("Got successful response {}", exception) - - } - } -} -//#ask-with-failure-sample - -//#ask-with-circuit-breaker-sample -class CircuitBreakerAskWithCircuitBreaker(potentiallyFailingService: ActorRef) extends Actor with ActorLogging { - import SimpleService._ - import akka.contrib.circuitbreaker.Implicits.askWithCircuitBreaker - - implicit val askTimeout: Timeout = 2.seconds - - val serviceCircuitBreaker = - context.actorOf( - CircuitBreakerPropsBuilder(maxFailures = 3, callTimeout = askTimeout, resetTimeout = 30.seconds) - .props(target = potentiallyFailingService), - "serviceCircuitBreaker") - - import context.dispatcher - - override def receive: Receive = { - case AskFor(requestToForward) => - serviceCircuitBreaker.askWithCircuitBreaker(Request(requestToForward)).mapTo[String].onComplete { - case Success(successResponse) => - //handle response - log.info("Got successful response {}", successResponse) - - case Failure(exception) => - //handle response - log.info("Got successful response {}", exception) - - } - } -} -//#ask-with-circuit-breaker-sample diff --git a/akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala deleted file mode 100644 index c9890d20e3..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/jul/JavaLoggerSpec.scala +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.jul - -import com.typesafe.config.ConfigFactory -import akka.actor.{ Actor, ActorLogging, Props } -import akka.testkit.AkkaSpec -import java.util.logging - -object JavaLoggerSpec { - - val config = ConfigFactory.parseString(""" - akka { - loglevel = INFO - loggers = ["akka.contrib.jul.JavaLogger"] - }""") - - class LogProducer extends Actor with ActorLogging { - def receive = { - case e: Exception => - log.error(e, e.getMessage) - case (s: String, x: Int) => - log.info(s, x) - } - } -} - -class JavaLoggerSpec extends AkkaSpec(JavaLoggerSpec.config) { - - val logger = logging.Logger.getLogger("akka://JavaLoggerSpec/user/log") - logger.setUseParentHandlers(false) // turn off output of test LogRecords - logger.addHandler(new logging.Handler { - def publish(record: logging.LogRecord): Unit = { - testActor ! record - } - - def flush(): Unit = {} - def close(): Unit = {} - }) - - val producer = system.actorOf(Props[JavaLoggerSpec.LogProducer], name = "log") - - "JavaLogger" must { - - "log error with stackTrace" in { - producer ! new RuntimeException("Simulated error") - - val record = expectMsgType[logging.LogRecord] - - record should not be (null) - record.getMillis should not be (0) - record.getThreadID should not be (0) - record.getLevel should ===(logging.Level.SEVERE) - record.getMessage should ===("Simulated error") - record.getThrown.isInstanceOf[RuntimeException] should ===(true) - record.getSourceClassName should ===(classOf[JavaLoggerSpec.LogProducer].getName) - record.getSourceMethodName should ===(null) - } - - "log info without stackTrace" in { - producer ! (("{} is the magic number", 3)) - - val record = expectMsgType[logging.LogRecord] - - record should not be (null) - record.getMillis should not be (0) - record.getThreadID should not be (0) - record.getLevel should ===(logging.Level.INFO) - record.getMessage should ===("3 is the magic number") - record.getThrown should ===(null) - record.getSourceClassName should ===(classOf[JavaLoggerSpec.LogProducer].getName) - record.getSourceMethodName should ===(null) - } - } - -} diff --git a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala b/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala deleted file mode 100644 index 63b8b99f47..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.mailbox - -import com.typesafe.config.ConfigFactory - -import akka.actor.{ actorRef2Scala, Actor, ActorSystem, DeadLetter, PoisonPill, Props } -import akka.testkit.{ AkkaSpec, EventFilter, ImplicitSender } - -object PeekMailboxSpec { - case object Check - case object DoubleAck - class PeekActor(tries: Int) extends Actor { - var togo = tries - def receive = { - case Check => - sender() ! Check - PeekMailboxExtension.ack() - case DoubleAck => - PeekMailboxExtension.ack() - PeekMailboxExtension.ack() - case msg => - sender() ! msg - if (togo == 0) throw new RuntimeException("DONTWANNA") - togo -= 1 - PeekMailboxExtension.ack() - } - override def preRestart(cause: Throwable, msg: Option[Any]): Unit = { - for (m <- msg if m == "DIE") context.stop(self) // for testing the case of mailbox.cleanUp - } - } -} - -class PeekMailboxSpec extends AkkaSpec(""" - peek-dispatcher { - mailbox-type = "akka.contrib.mailbox.PeekMailboxType" - max-retries = 2 - } - """) with ImplicitSender { - - import PeekMailboxSpec._ - - "A PeekMailbox" must { - - "retry messages" in { - val a = system.actorOf(Props(classOf[PeekActor], 1).withDispatcher("peek-dispatcher")) - a ! "hello" - expectMsg("hello") - EventFilter[RuntimeException]("DONTWANNA", occurrences = 1).intercept { - a ! "world" - } - expectMsg("world") - expectMsg("world") - a ! Check - expectMsg(Check) - } - - "put a bound on retries" in { - val a = system.actorOf(Props(classOf[PeekActor], 0).withDispatcher("peek-dispatcher")) - EventFilter[RuntimeException]("DONTWANNA", occurrences = 3).intercept { - a ! "hello" - } - a ! Check - expectMsg("hello") - expectMsg("hello") - expectMsg("hello") - expectMsg(Check) - } - - "not waste messages on double-ack()" in { - val a = system.actorOf(Props(classOf[PeekActor], 0).withDispatcher("peek-dispatcher")) - a ! DoubleAck - a ! Check - expectMsg(Check) - } - - "support cleanup" in { - system.eventStream.subscribe(testActor, classOf[DeadLetter]) - val a = system.actorOf(Props(classOf[PeekActor], 0).withDispatcher("peek-dispatcher")) - watch(a) - EventFilter[RuntimeException]("DONTWANNA", occurrences = 1).intercept { - a ! "DIE" // stays in the mailbox - } - expectMsg("DIE") - expectMsgType[DeadLetter].message should ===("DIE") - expectTerminated(a) - } - - } - -} - -//#demo -class MyActor extends Actor { - def receive = { - case msg => - println(msg) - doStuff(msg) // may fail - PeekMailboxExtension.ack() - } - - //#business-logic-elided - var i = 0 - def doStuff(m: Any): Unit = { - if (i == 1) throw new Exception("DONTWANNA") - i += 1 - } - - override def postStop(): Unit = { - context.system.terminate() - } - //#business-logic-elided -} - -object MyApp extends App { - val system = ActorSystem( - "MySystem", - ConfigFactory.parseString(""" - peek-dispatcher { - mailbox-type = "akka.contrib.mailbox.PeekMailboxType" - max-retries = 2 - } - """)) - - val myActor = system.actorOf(Props[MyActor].withDispatcher("peek-dispatcher"), name = "myActor") - - myActor ! "Hello" - myActor ! "World" - myActor ! PoisonPill -} -//#demo diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/AggregatorSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/AggregatorSpec.scala deleted file mode 100644 index 7b0b48ed08..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/AggregatorSpec.scala +++ /dev/null @@ -1,396 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.pattern - -import akka.testkit.{ ImplicitSender, TestKit } -import org.scalatest.FunSuiteLike -import org.scalatest.Matchers -import scala.annotation.tailrec -import scala.collection._ -import scala.concurrent.duration._ -import scala.math.BigDecimal.int2bigDecimal -import akka.actor._ -import org.scalatest.BeforeAndAfterAll - -/** - * Sample and test code for the aggregator patter. - * This is based on Jamie Allen's tutorial at - * http://jaxenter.com/tutorial-asynchronous-programming-with-akka-actors-46220.html - */ -sealed trait AccountType -case object Checking extends AccountType -case object Savings extends AccountType -case object MoneyMarket extends AccountType - -final case class GetCustomerAccountBalances(id: Long, accountTypes: Set[AccountType]) -final case class GetAccountBalances(id: Long) - -final case class AccountBalances(accountType: AccountType, balance: Option[List[(Long, BigDecimal)]]) - -final case class CheckingAccountBalances(balances: Option[List[(Long, BigDecimal)]]) -final case class SavingsAccountBalances(balances: Option[List[(Long, BigDecimal)]]) -final case class MoneyMarketAccountBalances(balances: Option[List[(Long, BigDecimal)]]) - -case object TimedOut -case object CantUnderstand - -class SavingsAccountProxy extends Actor { - def receive = { - case GetAccountBalances(id: Long) => - sender() ! SavingsAccountBalances(Some(List((1, 150000), (2, 29000)))) - } -} -class CheckingAccountProxy extends Actor { - def receive = { - case GetAccountBalances(id: Long) => - sender() ! CheckingAccountBalances(Some(List((3, 15000)))) - } -} -class MoneyMarketAccountProxy extends Actor { - def receive = { - case GetAccountBalances(id: Long) => - sender() ! MoneyMarketAccountBalances(None) - } -} - -class AccountBalanceRetriever extends Actor with Aggregator { - - import context._ - - //#initial-expect - expectOnce { - case GetCustomerAccountBalances(id, types) => - new AccountAggregator(sender(), id, types) - case _ => - sender() ! CantUnderstand - context.stop(self) - } - //#initial-expect - - class AccountAggregator(originalSender: ActorRef, id: Long, types: Set[AccountType]) { - - val results = - mutable.ArrayBuffer.empty[(AccountType, Option[List[(Long, BigDecimal)]])] - - if (types.size > 0) - types.foreach { - case Checking => fetchCheckingAccountsBalance() - case Savings => fetchSavingsAccountsBalance() - case MoneyMarket => fetchMoneyMarketAccountsBalance() - } else collectBalances() // Empty type list yields empty response - - context.system.scheduler.scheduleOnce(1.second, self, TimedOut) - //#expect-timeout - expect { - case TimedOut => collectBalances(force = true) - } - //#expect-timeout - - //#expect-balance - def fetchCheckingAccountsBalance(): Unit = { - context.actorOf(Props[CheckingAccountProxy]) ! GetAccountBalances(id) - expectOnce { - case CheckingAccountBalances(balances) => - results += (Checking -> balances) - collectBalances() - } - } - //#expect-balance - - def fetchSavingsAccountsBalance(): Unit = { - context.actorOf(Props[SavingsAccountProxy]) ! GetAccountBalances(id) - expectOnce { - case SavingsAccountBalances(balances) => - results += (Savings -> balances) - collectBalances() - } - } - - def fetchMoneyMarketAccountsBalance(): Unit = { - context.actorOf(Props[MoneyMarketAccountProxy]) ! GetAccountBalances(id) - expectOnce { - case MoneyMarketAccountBalances(balances) => - results += (MoneyMarket -> balances) - collectBalances() - } - } - - def collectBalances(force: Boolean = false): Unit = { - if (results.size == types.size || force) { - originalSender ! results.toList // Make sure it becomes immutable - context.stop(self) - } - } - } -} -//#demo-code - -//#chain-sample -final case class InitialRequest(name: String) -final case class Request(name: String) -final case class Response(name: String, value: String) -final case class EvaluationResults(name: String, eval: List[Int]) -final case class FinalResponse(qualifiedValues: List[String]) - -/** - * An actor sample demonstrating use of unexpect and chaining. - * This is just an example and not a complete test case. - */ -class ChainingSample extends Actor with Aggregator { - - expectOnce { - case InitialRequest(name) => new MultipleResponseHandler(sender(), name) - } - - class MultipleResponseHandler(originalSender: ActorRef, propName: String) { - - import context.dispatcher - import collection.mutable.ArrayBuffer - - val values = ArrayBuffer.empty[String] - - context.actorSelection("/user/request_proxies") ! Request(propName) - context.system.scheduler.scheduleOnce(50.milliseconds, self, TimedOut) - - //#unexpect-sample - val handle = expect { - case Response(name, value) => - values += value - if (values.size > 3) processList() - case TimedOut => processList() - } - - def processList(): Unit = { - unexpect(handle) - - if (values.size > 0) { - context.actorSelection("/user/evaluator") ! values.toList - expectOnce { - case EvaluationResults(name, eval) => processFinal(eval) - } - } else processFinal(List.empty[Int]) - } - //#unexpect-sample - - def processFinal(eval: List[Int]): Unit = { - // Select only the entries coming back from eval - originalSender ! FinalResponse(eval.map(values)) - context.stop(self) - } - } -} -//#chain-sample - -class AggregatorSpec - extends TestKit(ActorSystem("AggregatorSpec")) - with ImplicitSender - with FunSuiteLike - with Matchers - with BeforeAndAfterAll { - - override def afterAll(): Unit = { - shutdown() - } - - test("Test request 1 account type") { - system.actorOf(Props[AccountBalanceRetriever]) ! GetCustomerAccountBalances(1, Set(Savings)) - receiveOne(10.seconds) match { - case result: List[_] => - result should have size 1 - case result => - assert(false, s"Expect List, got ${result.getClass}") - } - } - - test("Test request 3 account types") { - system.actorOf(Props[AccountBalanceRetriever]) ! - GetCustomerAccountBalances(1, Set(Checking, Savings, MoneyMarket)) - receiveOne(10.seconds) match { - case result: List[_] => - result should have size 3 - case result => - assert(false, s"Expect List, got ${result.getClass}") - } - } -} - -final case class TestEntry(id: Int) - -class WorkListSpec extends FunSuiteLike { - - val workList = WorkList.empty[TestEntry] - var entry2: TestEntry = null - var entry4: TestEntry = null - - test("Processing empty WorkList") { - // ProcessAndRemove something in the middle - val processed = workList.process { - case TestEntry(9) => true - case _ => false - } - assert(!processed) - } - - test("Insert temp entries") { - assert(workList.head === workList.tail) - - val entry0 = TestEntry(0) - workList.add(entry0, permanent = false) - - assert(workList.head.next != null) - assert(workList.tail === workList.head.next) - assert(workList.tail.ref.get === entry0) - - val entry1 = TestEntry(1) - workList.add(entry1, permanent = false) - - assert(workList.head.next != workList.tail) - assert(workList.head.next.ref.get === entry0) - assert(workList.tail.ref.get === entry1) - - entry2 = TestEntry(2) - workList.add(entry2, permanent = false) - - assert(workList.tail.ref.get === entry2) - - val entry3 = TestEntry(3) - workList.add(entry3, permanent = false) - - assert(workList.tail.ref.get === entry3) - } - - test("Process temp entries") { - - // ProcessAndRemove something in the middle - assert(workList.process { - case TestEntry(2) => true - case _ => false - }) - - // ProcessAndRemove the head - assert(workList.process { - case TestEntry(0) => true - case _ => false - }) - - // ProcessAndRemove the tail - assert(workList.process { - case TestEntry(3) => true - case _ => false - }) - } - - test("Re-insert permanent entry") { - entry4 = TestEntry(4) - workList.add(entry4, permanent = true) - - assert(workList.tail.ref.get === entry4) - } - - test("Process permanent entry") { - assert(workList.process { - case TestEntry(4) => true - case _ => false - }) - } - - test("Remove permanent entry") { - val removed = workList.remove(entry4) - assert(removed) - } - - test("Remove temp entry already processed") { - val removed = workList.remove(entry2) - assert(!removed) - } - - test("Process non-matching entries") { - - val processed = - workList.process { - case TestEntry(2) => true - case _ => false - } - - assert(!processed) - - val processed2 = - workList.process { - case TestEntry(5) => true - case _ => false - } - - assert(!processed2) - - } - - test("Append two lists") { - workList.removeAll() - (0 to 4).foreach { id => - workList.add(TestEntry(id), permanent = false) - } - - val l2 = new WorkList[TestEntry] - (5 to 9).foreach { id => - l2.add(TestEntry(id), permanent = true) - } - - workList.addAll(l2) - - @tailrec - def checkEntries(id: Int, entry: WorkList.Entry[TestEntry]): Int = { - if (entry == null) id - else { - assert(entry.ref.get.id === id) - checkEntries(id + 1, entry.next) - } - } - - assert(checkEntries(0, workList.head.next) === 10) - } - - test("Clear list") { - workList.removeAll() - assert(workList.head.next === null) - assert(workList.tail === workList.head) - } - - val workList2 = WorkList.empty[PartialFunction[Any, Unit]] - - val fn1: PartialFunction[Any, Unit] = { - case s: String => - val result1 = workList2.remove(fn1) - assert(result1 === true, "First remove must return true") - val result2 = workList2.remove(fn1) - assert(result2 === false, "Second remove must return false") - } - - val fn2: PartialFunction[Any, Unit] = { - case s: String => - workList2.add(fn1, permanent = true) - } - - test("Reentrant insert") { - workList2.add(fn2, permanent = false) - assert(workList2.head.next != null) - assert(workList2.tail == workList2.head.next) - - // Processing inserted fn1, reentrant adding fn2 - workList2.process { fn => - var processed = true - fn.applyOrElse("Foo", (_: Any) => processed = false) - processed - } - } - - test("Reentrant delete") { - // Processing inserted fn2, should delete itself - workList2.process { fn => - var processed = true - fn.applyOrElse("Foo", (_: Any) => processed = false) - processed - } - } -} diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala deleted file mode 100644 index 50e2382913..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/ReceivePipelineSpec.scala +++ /dev/null @@ -1,498 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.contrib.pattern - -import akka.actor.{ Actor, Props } -import akka.persistence.{ PersistentActor } -import akka.testkit.{ AkkaSpec, ImplicitSender } -import com.typesafe.config.{ Config, ConfigFactory } -import scala.concurrent.duration._ -import akka.testkit.TestProbe -import akka.actor.ActorLogging - -object ReceivePipelineSpec { - import ReceivePipeline._ - - class ReplierActor extends Actor with ReceivePipeline { - def receive: Actor.Receive = becomeAndReply - def becomeAndReply: Actor.Receive = { - case "become" => context.become(justReply) - case m => sender ! m - } - def justReply: Actor.Receive = { - case m => sender ! m - } - } - - class IntReplierActor(max: Int) extends Actor with ReceivePipeline { - def receive: Actor.Receive = { - case m: Int if (m <= max) => sender ! m - } - } - - class TotallerActor extends Actor with ReceivePipeline { - var total = 0 - def receive: Actor.Receive = { - case m: Int => total += m - case "get" => sender ! total - } - } - - case class IntList(l: List[Int]) { - override def toString: String = s"IntList(${l.mkString(", ")})" - } - - trait ListBuilderInterceptor { - this: ReceivePipeline => - - pipelineOuter { - case n: Int => Inner(IntList((n until n + 3).toList)) - } - } - - trait AdderInterceptor { - this: ReceivePipeline => - - pipelineInner { - case n: Int => Inner(n + 10) - case IntList(l) => Inner(IntList(l.map(_ + 10))) - case "explicitly ignored" => HandledCompletely - } - } - - trait ToStringInterceptor { - this: ReceivePipeline => - - pipelineInner { - case i: Int => Inner(i.toString) - case IntList(l) => Inner(l.toString) - case other: Iterable[_] => Inner(other.toString) - } - } - - trait OddDoublerInterceptor { - this: ReceivePipeline => - - pipelineInner { - case i: Int if (i % 2 != 0) => Inner(i * 2) - } - } - - trait EvenHalverInterceptor { - this: ReceivePipeline => - - pipelineInner { - case i: Int if (i % 2 == 0) => Inner(i / 2) - } - } - - trait Timer { - this: ReceivePipeline => - - def notifyDuration(duration: Long): Unit - - pipelineInner { - case msg: Any => - val start = 1L // = currentTimeMillis - Inner(msg).andAfter { - val end = 100L // = currentTimeMillis - notifyDuration(end - start) - } - } - } -} - -class ReceivePipelineSpec extends AkkaSpec with ImplicitSender { - import ReceivePipelineSpec._ - - "A ReceivePipeline" must { - - "just invoke Actor's behavior when it's empty" in { - val replier = system.actorOf(Props[ReplierActor]) - replier ! 3 - expectMsg(3) - } - - "invoke decorated Actor's behavior when has one interceptor" in { - val replier = system.actorOf(Props(new ReplierActor with AdderInterceptor)) - replier ! 5 - expectMsg(15) - } - - "support any number of interceptors" in { - val replier = system.actorOf( - Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor)) - replier ! 8 - expectMsg("List(18, 19, 20)") - } - - "delegate messages unhandled by interceptors to the inner behavior" in { - - val replier = system.actorOf( - Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor)) - replier ! 8L // unhandled by all interceptors but still replied - expectMsg(8L) - replier ! Set(8f) // unhandled by all but ToString Interceptor, so replied as String - expectMsg("Set(8.0)") - } - - "let any interceptor to explicitly ignore some messages" in { - - val replier = system.actorOf( - Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor)) - replier ! "explicitly ignored" - replier ! 8L // unhandled by all interceptors but still replied - expectMsg(8L) - } - - "support changing behavior without losing the interceptions" in { - val replier = system.actorOf( - Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor)) - replier ! 8 - expectMsg("List(18, 19, 20)") - replier ! "become" - replier ! 3 - expectMsg("List(13, 14, 15)") - } - - "support swapping inner and outer interceptors mixin order" in { - val outerInnerReplier = system.actorOf(Props(new ReplierActor with ListBuilderInterceptor with AdderInterceptor)) - val innerOuterReplier = system.actorOf(Props(new ReplierActor with AdderInterceptor with ListBuilderInterceptor)) - outerInnerReplier ! 4 - expectMsg(IntList(List(14, 15, 16))) - innerOuterReplier ! 6 - expectMsg(IntList(List(16, 17, 18))) - } - } - -} - -object PersistentReceivePipelineSpec { - class PersistentReplierActor extends PersistentActor with ReceivePipeline { - override def persistenceId: String = "p-1" - - def becomeAndReply: Actor.Receive = { - case "become" => context.become(justReply) - case m => sender ! m - } - def justReply: Actor.Receive = { - case m => sender ! m - } - - override def receiveCommand: Receive = becomeAndReply - override def receiveRecover: Receive = { - case _ => // ... - } - } - -} -class PersistentReceivePipelineSpec(config: Config) extends AkkaSpec(config) with ImplicitSender { - import ReceivePipelineSpec._ - import PersistentReceivePipelineSpec._ - - def this() { - this(ConfigFactory.parseString(s""" - |akka.persistence.journal.plugin = "akka.persistence.journal.inmem" - |akka.persistence.journal.leveldb.dir = "target/journal-${getClass.getSimpleName}" - """.stripMargin)) - } - - "A PersistentActor with ReceivePipeline" must { - "support any number of interceptors" in { - val replier = system.actorOf( - Props(new PersistentReplierActor with ListBuilderInterceptor with AdderInterceptor with ToStringInterceptor)) - replier ! 8 - expectMsg("List(18, 19, 20)") - } - "allow messages explicitly passed on by interceptors to be handled by the actor" in { - val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor)) - - // 6 -> 3 -> 6 - replier ! 6 - expectMsg(6) - } - - "allow messages not handled by some interceptors to be handled by the actor" in { - val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor)) - - // 8 -> 4 ( -> not handled by OddDoublerInterceptor) - replier ! 8 - expectMsg(4) - } - - "allow messages explicitly passed on by interceptors but not handled by the actor to be treated as unhandled" in { - val probe = new TestProbe(system) - val probeRef = probe.ref - - val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor { - override def unhandled(message: Any) = probeRef ! message - })) - - // 22 -> 11 -> 22 but > 10 so not handled in main receive: falls back to unhandled implementation... - replier ! 22 - probe.expectMsg(22) - } - - "allow messages not handled by some interceptors or by the actor to be treated as unhandled" in { - val probe = new TestProbe(system) - val probeRef = probe.ref - - val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor { - override def unhandled(message: Any) = probeRef ! message - })) - - // 11 ( -> not handled by EvenHalverInterceptor) -> 22 but > 10 so not handled in main receive: - // original message falls back to unhandled implementation... - replier ! 11 - probe.expectMsg(11) - } - - "allow messages not handled by any interceptors or by the actor to be treated as unhandled" in { - val probe = new TestProbe(system) - val probeRef = probe.ref - - val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor { - override def unhandled(message: Any) = probeRef ! message - })) - - replier ! "hi there!" - probe.expectMsg("hi there!") - } - - "not treat messages handled by the actor as unhandled" in { - val probe = new TestProbe(system) - val probeRef = probe.ref - - val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor { - override def unhandled(message: Any) = probeRef ! message - })) - - replier ! 4 - expectMsg(2) - probe.expectNoMsg(100.millis) - } - - "continue to handle messages normally after unhandled messages" in { - val probe = new TestProbe(system) - val probeRef = probe.ref - - val replier = system.actorOf(Props(new IntReplierActor(10) with EvenHalverInterceptor with OddDoublerInterceptor { - override def unhandled(message: Any) = probeRef ! message - })) - - replier ! "hi there!" - replier ! 8 - probe.expectMsg("hi there!") - expectMsg(4) - } - - "call side-effecting receive code only once" in { - val totaller = system.actorOf(Props(new TotallerActor with EvenHalverInterceptor with OddDoublerInterceptor)) - - totaller ! 8 - totaller ! 6 - totaller ! "get" - expectMsg(10) - } - - "not cache the result of the same message" in { - val totaller = system.actorOf(Props(new TotallerActor with EvenHalverInterceptor with OddDoublerInterceptor)) - - totaller ! 6 - totaller ! 6 - totaller ! "get" - expectMsg(12) - } - - "run code in 'after' block" in { - val probe = new TestProbe(system) - val probeRef = probe.ref - - val totaller = system.actorOf(Props(new TotallerActor with Timer { - def notifyDuration(d: Long) = probeRef ! d - })) - - totaller ! 6 - totaller ! "get" - expectMsg(6) - probe.expectMsg(99L) - } - } -} - -// Just compiling code samples for documentation. Not intended to be tests. - -object InActorSample extends App { - import ReceivePipeline._ - - import akka.actor.ActorSystem - - val system = ActorSystem("pipeline") - - val actor = system.actorOf(Props[PipelinedActor]()) - - //#in-actor - class PipelinedActor extends Actor with ReceivePipeline { - - // Increment - pipelineInner { case i: Int => Inner(i + 1) } - // Double - pipelineInner { case i: Int => Inner(i * 2) } - - def receive: Receive = { case any => println(any) } - } - - actor ! 5 // prints 12 = (5 + 1) * 2 - //#in-actor - - val withOuterActor = system.actorOf(Props[PipelinedOuterActor]()) - - class PipelinedOuterActor extends Actor with ReceivePipeline { - - //#in-actor-outer - // Increment - pipelineInner { case i: Int => Inner(i + 1) } - // Double - pipelineOuter { case i: Int => Inner(i * 2) } - - // prints 11 = (5 * 2) + 1 - //#in-actor-outer - - def receive: Receive = { case any => println(any) } - } - - withOuterActor ! 5 - -} - -object InterceptorSamples { - import ReceivePipeline._ - - //#interceptor-sample1 - val incrementInterceptor: Interceptor = { - case i: Int => Inner(i + 1) - } - //#interceptor-sample1 - - def logTimeTaken(time: Long) = ??? - - //#interceptor-sample2 - val timerInterceptor: Interceptor = { - case e => - val start = System.nanoTime - Inner(e).andAfter { - val end = System.nanoTime - logTimeTaken(end - start) - } - } - //#interceptor-sample2 - -} - -object MixinSample extends App { - import ReceivePipeline._ - - import akka.actor.{ ActorSystem, Props } - - val system = ActorSystem("pipeline") - - //#mixin-model - val texts = Map( - "that.rug_EN" -> "That rug really tied the room together.", - "your.opinion_EN" -> "Yeah, well, you know, that's just, like, your opinion, man.", - "that.rug_ES" -> "Esa alfombra realmente completaba la sala.", - "your.opinion_ES" -> "Sí, bueno, ya sabes, eso es solo, como, tu opinion, amigo.") - - case class I18nText(locale: String, key: String) - case class Message(author: Option[String], text: Any) - //#mixin-model - - //#mixin-interceptors - trait I18nInterceptor { - this: ReceivePipeline => - - pipelineInner { - case m @ Message(_, I18nText(loc, key)) => - Inner(m.copy(text = texts(s"${key}_$loc"))) - } - } - - trait AuditInterceptor { - this: ReceivePipeline => - - pipelineOuter { - case m @ Message(Some(author), text) => - println(s"$author is about to say: $text") - Inner(m) - } - } - //#mixin-interceptors - - val printerActor = system.actorOf(Props[PrinterActor]()) - - //#mixin-actor - class PrinterActor extends Actor with ReceivePipeline with I18nInterceptor with AuditInterceptor { - - override def receive: Receive = { - case Message(author, text) => - println(s"${author.getOrElse("Unknown")} says '$text'") - } - } - - printerActor ! Message(Some("The Dude"), I18nText("EN", "that.rug")) - // The Dude is about to say: I18nText(EN,that.rug) - // The Dude says 'That rug really tied the room together.' - - printerActor ! Message(Some("The Dude"), I18nText("EN", "your.opinion")) - // The Dude is about to say: I18nText(EN,your.opinion) - // The Dude says 'Yeah, well, you know, that's just, like, your opinion, man.' - //#mixin-actor - - system.terminate() -} - -object UnhandledSample extends App { - import ReceivePipeline._ - - def isGranted(userId: Long) = true - - //#unhandled - case class PrivateMessage(userId: Option[Long], msg: Any) - - trait PrivateInterceptor { - this: ReceivePipeline => - - pipelineInner { - case PrivateMessage(Some(userId), msg) => - if (isGranted(userId)) - Inner(msg) - else - HandledCompletely - } - } - //#unhandled - -} - -object AfterSamples { - import ReceivePipeline._ - - //#interceptor-after - trait TimerInterceptor extends ActorLogging { - this: ReceivePipeline => - - def logTimeTaken(time: Long) = log.debug(s"Time taken: $time ns") - - pipelineOuter { - case e => - val start = System.nanoTime - Inner(e).andAfter { - val end = System.nanoTime - logTimeTaken(end - start) - } - } - } - //#interceptor-after -} diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala deleted file mode 100644 index 73849a5aaa..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/pattern/ReliableProxyDocSpec.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.pattern - -import akka.testkit.AkkaSpec -import akka.actor._ -import scala.concurrent.duration._ -import akka.testkit.TestProbe - -object ReliableProxyDocSpec { - - //#demo - import akka.contrib.pattern.ReliableProxy - - class ProxyParent(targetPath: ActorPath) extends Actor { - val proxy = context.actorOf(ReliableProxy.props(targetPath, 100.millis)) - - def receive = { - case "hello" => proxy ! "world!" - } - } - //#demo - - //#demo-transition - class ProxyTransitionParent(targetPath: ActorPath) extends Actor { - val proxy = context.actorOf(ReliableProxy.props(targetPath, 100.millis)) - proxy ! FSM.SubscribeTransitionCallBack(self) - - var client: ActorRef = _ - - def receive = { - case "go" => - proxy ! 42 - client = sender() - case FSM.CurrentState(`proxy`, initial) => - case FSM.Transition(`proxy`, from, to) => - if (to == ReliableProxy.Idle) - client ! "done" - } - } - //#demo-transition - - class WatchingProxyParent(targetPath: ActorPath) extends Actor { - val proxy = context.watch( - context.actorOf(ReliableProxy.props(targetPath, 100.millis, reconnectAfter = 500.millis, maxReconnects = 3))) - - var client: Option[ActorRef] = None - - def receive = { - case "hello" => - proxy ! "world!" - client = Some(sender()) - case Terminated(`proxy`) => - client.foreach { _ ! "terminated" } - } - } -} - -class ReliableProxyDocSpec extends AkkaSpec { - - import ReliableProxyDocSpec._ - - "A ReliableProxy" must { - - "show usage" in { - val probe = TestProbe() - val a = system.actorOf(Props(classOf[ProxyParent], probe.ref.path)) - a.tell("hello", probe.ref) - probe.expectMsg("world!") - } - - "show state transitions" in { - val target = TestProbe().ref - val probe = TestProbe() - val a = system.actorOf(Props(classOf[ProxyTransitionParent], target.path)) - a.tell("go", probe.ref) - probe.expectMsg("done") - } - - "show terminated after maxReconnects" in within(5.seconds) { - val target = system.deadLetters - val probe = TestProbe() - val a = system.actorOf(Props(classOf[WatchingProxyParent], target.path)) - a.tell("hello", probe.ref) - probe.expectMsg("terminated") - } - - } - -} diff --git a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottleTest.scala b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottleTest.scala deleted file mode 100644 index ea351a2c92..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottleTest.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.throttle - -import java.util.concurrent.TimeUnit._ - -import akka.actor.ActorSystem -import akka.contrib.throttle.Throttler._ -import akka.testkit.{ TestActorRef, TestKit } -import org.scalatest.WordSpecLike - -class TimerBasedThrottleTest extends TestKit(ActorSystem("TimerBasedThrottler")) with WordSpecLike { - "A throttler" must { - "normalize all rates to the highest precision (nanoseconds)" in { - val throttler = TestActorRef(new TimerBasedThrottler(1.msgsPer(1, SECONDS))) - val throttler2 = TestActorRef(new TimerBasedThrottler(5.msgsPer(1, SECONDS))) - val throttler3 = TestActorRef(new TimerBasedThrottler(10.msgsPer(10, MILLISECONDS))) - val throttler4 = TestActorRef(new TimerBasedThrottler(1.msgsPer(1, MINUTES))) - - assert(throttler.underlyingActor.rate.duration.toNanos == 1e9) - assert(throttler.underlyingActor.rate.numberOfCalls == 1) - - assert(throttler2.underlyingActor.rate.duration.toNanos == 1e9 / 5) - assert(throttler2.underlyingActor.rate.numberOfCalls == 1) - - assert(throttler3.underlyingActor.rate.duration.toNanos == 1e6 * 10 / 10) // Convert ms to nanos - assert(throttler3.underlyingActor.rate.numberOfCalls == 1) - - assert(throttler4.underlyingActor.rate.duration.toNanos == 1e9 * 60) - assert(throttler4.underlyingActor.rate.numberOfCalls == 1) - } - - "handle zero number of calls gracefully" in { - val throttler = TestActorRef(new TimerBasedThrottler(0.msgsPer(1, SECONDS))) - - assert(throttler.underlyingActor.rate.duration.toSeconds == 1) - assert(throttler.underlyingActor.rate.numberOfCalls == 0) - } - } -} diff --git a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala b/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala deleted file mode 100644 index afbe66a692..0000000000 --- a/akka-contrib/src/test/scala/akka/contrib/throttle/TimerBasedThrottlerSpec.scala +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.contrib.throttle - -import language.postfixOps -import scala.concurrent.duration._ -import akka.actor.ActorSystem -import akka.actor.Actor -import akka.actor.Props -import akka.testkit.TestKit -import akka.contrib.throttle.Throttler._ -import org.scalatest.WordSpecLike -import org.scalatest.Matchers -import org.scalatest.BeforeAndAfterAll -import akka.testkit._ - -object TimerBasedThrottlerSpec { - def println(a: Any) = () - - //#demo-code - // A simple actor that prints whatever it receives - class PrintActor extends Actor { - def receive = { - case x => println(x) - } - } - - //#demo-code -} - -class TimerBasedThrottlerSpec - extends TestKit(ActorSystem("TimerBasedThrottlerSpec")) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll { - - import TimerBasedThrottlerSpec._ - - override def afterAll: Unit = { - shutdown() - } - - "A throttler" must { - def println(a: Any) = () - "pass the ScalaDoc class documentation example program" in { - //#demo-code - val printer = system.actorOf(Props[PrintActor]) - // The throttler for this example, setting the rate - val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(1.second))) - // Set the target - throttler ! SetTarget(Some(printer)) - // These three messages will be sent to the target immediately - throttler ! "1" - throttler ! "2" - throttler ! "3" - // These two will wait until a second has passed - throttler ! "4" - throttler ! "5" - //#demo-code - } - - "keep messages until a target is set" in { - val echo = system.actorOf(TestActors.echoActorProps) - val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(1.second.dilated))) - (1 to 6).foreach { throttler ! _ } - expectNoMsg(1 second) - throttler ! SetTarget(Some(echo)) - within(2.5 seconds) { - (1 to 6).foreach { expectMsg(_) } - } - } - - "send messages after a `SetTarget(None)` pause" in { - val echo = system.actorOf(TestActors.echoActorProps) - val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(5.second.dilated))) - throttler ! SetTarget(Some(echo)) - (1 to 3).foreach { throttler ! _ } - throttler ! SetTarget(None) - within(1.7 second) { - expectMsg(1) - expectNoMsg() - } - expectNoMsg(1 second) - throttler ! SetTarget(Some(echo)) - (4 to 7).foreach { throttler ! _ } - within(10.5 seconds) { - (2 to 7).foreach { expectMsg(_) } - } - } - - "keep messages when the target is set to None" in { - val echo = system.actorOf(TestActors.echoActorProps) - val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(5.second.dilated))) - throttler ! SetTarget(Some(echo)) - (1 to 7).foreach { throttler ! _ } - throttler ! SetTarget(None) - within(1.7 second) { - expectMsg(1) - expectNoMsg() - } - expectNoMsg(1 second) - throttler ! SetTarget(Some(echo)) - within(10.5 seconds) { - (2 to 7).foreach { expectMsg(_) } - } - } - - "respect the rate (3 msg/s)" in within(1.5 seconds, 2.5 seconds) { - val echo = system.actorOf(TestActors.echoActorProps) - val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 3.msgsPer(1.second.dilated))) - throttler ! SetTarget(Some(echo)) - (1 to 7).foreach { throttler ! _ } - (1 to 7).foreach { expectMsg(_) } - } - - "respect the rate (4 msg/s)" in within(1.5 seconds, 2.5 seconds) { - val echo = system.actorOf(TestActors.echoActorProps) - val throttler = system.actorOf(Props(classOf[TimerBasedThrottler], 4.msgsPer(1.second.dilated))) - throttler ! SetTarget(Some(echo)) - (1 to 9).foreach { throttler ! _ } - (1 to 9).foreach { expectMsg(_) } - } - } -} diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index e7ac1d7905..71c2006a78 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -15,6 +15,12 @@ After being deprecated in 2.5.0, the akka-agent module has been removed in 2.6. If there is interest it may be moved to a separate, community-maintained repository. +## akka-contrib removed + +The akka-contrib module was deprecated in 2.5 and has been removed in 2.6. +To migrate, take the components you are using from [Akka 2.5](https://github.com/akka/akka/tree/release-2.5/akka-contrib) +and include them in your own project or library under your own package name. + ## Scala 2.11 no longer supported If you are still using Scala 2.11 then you must upgrade to 2.12 or 2.13 @@ -102,4 +108,4 @@ A full cluster restart is required to change to Artery. ### Passivate idle entity The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default. Sharding will passivate entities when they have not received any messages after this duration. -Set \ No newline at end of file +Set diff --git a/build.sbt b/build.sbt index 623c2b1218..3f6a403624 100644 --- a/build.sbt +++ b/build.sbt @@ -48,7 +48,6 @@ lazy val aggregatedProjects: Seq[ProjectReference] = List[ProjectReference]( clusterShardingTyped, clusterTools, clusterTyped, - contrib, coordination, discovery, distributedData, @@ -162,26 +161,6 @@ lazy val clusterTools = akkaModule("akka-cluster-tools") .configs(MultiJvm) .enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams) -lazy val contrib = akkaModule("akka-contrib") - .dependsOn(remote, remoteTests % "test->test", cluster, clusterTools, persistence % "compile->compile") - .settings(Dependencies.contrib) - .settings(AutomaticModuleName.settings("akka.contrib")) - .settings(OSGi.contrib) - .settings(description := - """| - |This subproject provides a home to modules contributed by external - |developers which may or may not move into the officially supported code - |base over time. A module in this subproject doesn't have to obey the rule - |of staying binary compatible between minor releases. Breaking API changes - |may be introduced in minor releases without notice as we refine and - |simplify based on your feedback. A module may be dropped in any release - |without prior deprecation. The Lightbend subscription does not cover - |support for these modules. - |""".stripMargin) - .configs(MultiJvm) - .enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams) - .disablePlugins(MimaPlugin) - lazy val distributedData = akkaModule("akka-distributed-data") .dependsOn(cluster % "compile->compile;test->test;multi-jvm->multi-jvm") .settings(Dependencies.distributedData) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 86ef831922..41fec9f200 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -210,8 +210,6 @@ object Dependencies { val docs = l ++= Seq(Test.scalatest.value, Test.junit, Docs.sprayJson, Docs.gson, Provided.levelDB) - val contrib = l ++= Seq(Test.commonsIo) - val benchJmh = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Compile.jctools) // akka stream diff --git a/project/OSGi.scala b/project/OSGi.scala index c03a12d75a..319886d671 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -56,8 +56,6 @@ object OSGi { val distributedData = exports(Seq("akka.cluster.ddata.*")) - val contrib = exports(Seq("akka.contrib.*")) - val osgi = exports(Seq("akka.osgi.*")) val protobuf = exports(Seq("akka.protobuf.*"))