Merge branch 'master' into wip-sync-2.4.10-patriknw
This commit is contained in:
commit
e8ce261faf
53 changed files with 1071 additions and 391 deletions
|
|
@ -40,6 +40,9 @@ object CircuitBreakerSpec {
|
|||
|
||||
def multiFailureCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
|
||||
new Breaker(new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated))
|
||||
|
||||
def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
|
||||
new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5))
|
||||
}
|
||||
|
||||
class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
|
||||
|
|
@ -209,6 +212,28 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter {
|
|||
breaker().withCircuitBreaker(Future(throwException))
|
||||
checkLatch(breaker.halfOpenLatch)
|
||||
}
|
||||
|
||||
"increase the reset timeout after it transits to open again" in {
|
||||
val breaker = CircuitBreakerSpec.nonOneFactorCb()
|
||||
breaker().withCircuitBreaker(Future(throwException))
|
||||
checkLatch(breaker.openLatch)
|
||||
|
||||
val e1 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
|
||||
val shortRemainingDuration = e1.remainingDuration
|
||||
|
||||
Thread.sleep(1000.millis.dilated.toMillis)
|
||||
checkLatch(breaker.halfOpenLatch)
|
||||
|
||||
// transit to open again
|
||||
breaker().withCircuitBreaker(Future(throwException))
|
||||
checkLatch(breaker.openLatch)
|
||||
|
||||
val e2 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) }
|
||||
val longRemainingDuration = e2.remainingDuration
|
||||
|
||||
(shortRemainingDuration < longRemainingDuration) should ===(true)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
"An asynchronous circuit breaker that is half-open" must {
|
||||
|
|
|
|||
|
|
@ -7,10 +7,12 @@ import akka.util.Unsafe;
|
|||
|
||||
class AbstractCircuitBreaker {
|
||||
protected final static long stateOffset;
|
||||
protected final static long resetTimeoutOffset;
|
||||
|
||||
static {
|
||||
try {
|
||||
stateOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentStateDoNotCallMeDirectly"));
|
||||
resetTimeoutOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentResetTimeoutDoNotCallMeDirectly"));
|
||||
} catch(Throwable t){
|
||||
throw new ExceptionInInitializerError(t);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@ import scala.concurrent.TimeoutException
|
|||
import scala.util.control.NonFatal
|
||||
import scala.util.Success
|
||||
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
|
||||
import akka.japi.function.Creator
|
||||
|
||||
import scala.compat.java8.FutureConverters
|
||||
|
||||
|
|
@ -41,7 +40,6 @@ object CircuitBreaker {
|
|||
*/
|
||||
def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker =
|
||||
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext)
|
||||
|
||||
/**
|
||||
* Java API: Create a new CircuitBreaker.
|
||||
*
|
||||
|
|
@ -71,17 +69,39 @@ object CircuitBreaker {
|
|||
* closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that
|
||||
* execute while the first is running will fail-fast with an exception.
|
||||
*
|
||||
*
|
||||
* @param scheduler Reference to Akka scheduler
|
||||
* @param maxFailures Maximum number of failures before opening the circuit
|
||||
* @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure
|
||||
* @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit
|
||||
* @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners
|
||||
*/
|
||||
class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
|
||||
class CircuitBreaker(
|
||||
scheduler: Scheduler,
|
||||
maxFailures: Int,
|
||||
callTimeout: FiniteDuration,
|
||||
resetTimeout: FiniteDuration,
|
||||
maxResetTimeout: FiniteDuration,
|
||||
exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker {
|
||||
|
||||
require(exponentialBackoffFactor >= 1.0, "factor must be >= 1.0")
|
||||
|
||||
def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = {
|
||||
this(scheduler, maxFailures, callTimeout, resetTimeout)(executor)
|
||||
this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor)
|
||||
}
|
||||
|
||||
// add the old constructor to make it binary compatible
|
||||
def this(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) = {
|
||||
this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor)
|
||||
}
|
||||
|
||||
/**
|
||||
* The `resetTimeout` will be increased exponentially for each failed attempt to close the circuit.
|
||||
* The default exponential backoff factor is 2.
|
||||
*
|
||||
* @param maxResetTimeout the upper bound of resetTimeout
|
||||
*/
|
||||
def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = {
|
||||
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0)(executor)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -90,6 +110,12 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
@volatile
|
||||
private[this] var _currentStateDoNotCallMeDirectly: State = Closed
|
||||
|
||||
/**
|
||||
* Holds reference to current resetTimeout of CircuitBreaker - *access only via helper methods*
|
||||
*/
|
||||
@volatile
|
||||
private[this] var _currentResetTimeoutDoNotCallMeDirectly: FiniteDuration = resetTimeout
|
||||
|
||||
/**
|
||||
* Helper method for access to underlying state via Unsafe
|
||||
*
|
||||
|
|
@ -110,6 +136,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
private[this] def currentState: State =
|
||||
Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State]
|
||||
|
||||
/**
|
||||
* Helper method for updating the underlying resetTimeout via Unsafe
|
||||
*/
|
||||
@inline
|
||||
private[this] def swapResetTimeout(oldResetTimeout: FiniteDuration, newResetTimeout: FiniteDuration): Boolean =
|
||||
Unsafe.instance.compareAndSwapObject(this, AbstractCircuitBreaker.resetTimeoutOffset, oldResetTimeout, newResetTimeout)
|
||||
|
||||
/**
|
||||
* Helper method for accessing to the underlying resetTimeout via Unsafe
|
||||
*/
|
||||
@inline
|
||||
private[this] def currentResetTimeout: FiniteDuration =
|
||||
Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration]
|
||||
|
||||
/**
|
||||
* Wraps invocations of asynchronous calls that need to be protected
|
||||
*
|
||||
|
|
@ -451,11 +491,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed)
|
||||
|
||||
/**
|
||||
* On entry of this state, failure count is reset.
|
||||
* On entry of this state, failure count and resetTimeout is reset.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
override def _enter(): Unit = set(0)
|
||||
override def _enter(): Unit = {
|
||||
set(0)
|
||||
swapResetTimeout(currentResetTimeout, resetTimeout)
|
||||
}
|
||||
|
||||
/**
|
||||
* Override for more descriptive toString
|
||||
|
|
@ -530,7 +573,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
*/
|
||||
private def remainingDuration(): FiniteDuration = {
|
||||
val fromOpened = System.nanoTime() - get
|
||||
val diff = resetTimeout.toNanos - fromOpened
|
||||
val diff = currentResetTimeout.toNanos - fromOpened
|
||||
if (diff <= 0L) Duration.Zero
|
||||
else diff.nanos
|
||||
}
|
||||
|
|
@ -557,9 +600,16 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite
|
|||
*/
|
||||
override def _enter(): Unit = {
|
||||
set(System.nanoTime())
|
||||
scheduler.scheduleOnce(resetTimeout) {
|
||||
scheduler.scheduleOnce(currentResetTimeout) {
|
||||
attemptReset()
|
||||
}
|
||||
val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match {
|
||||
case f: FiniteDuration ⇒ f
|
||||
case _ ⇒ currentResetTimeout
|
||||
}
|
||||
|
||||
if (nextResetTimeout < maxResetTimeout)
|
||||
swapResetTimeout(currentResetTimeout, nextResetTimeout)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -561,7 +561,7 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz
|
|||
override def init: ByteString = dropRight(1)
|
||||
|
||||
// *must* be overridden by derived classes.
|
||||
override def take(n: Int): ByteString = throw new UnsupportedOperationException("Method slice is not implemented in ByteString")
|
||||
override def take(n: Int): ByteString = throw new UnsupportedOperationException("Method take is not implemented in ByteString")
|
||||
override def takeRight(n: Int): ByteString = slice(length - n, length)
|
||||
|
||||
// these methods are optimized in derived classes utilising the maximum knowlage about data layout available to them:
|
||||
|
|
|
|||
|
|
@ -26,6 +26,10 @@ akka {
|
|||
# unreachable nodes as DOWN after a configured time of unreachability?
|
||||
# Using auto-down implies that two separate clusters will automatically be
|
||||
# formed in case of network partition.
|
||||
#
|
||||
# Don't enable this in production, see 'Auto-downing (DO NOT USE)' section
|
||||
# of Akka Cluster documentation.
|
||||
#
|
||||
# Disable with "off" or specify a duration to enable auto-down.
|
||||
# If a downing-provider-class is configured this setting is ignored.
|
||||
auto-down-unreachable-after = off
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import scala.concurrent.duration.FiniteDuration
|
|||
import akka.cluster.ClusterEvent._
|
||||
|
||||
import scala.concurrent.duration.Duration
|
||||
import akka.actor.ActorLogging
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -51,7 +52,7 @@ final class AutoDowning(system: ActorSystem) extends DowningProvider {
|
|||
* able to unit test the logic without running cluster.
|
||||
*/
|
||||
private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration)
|
||||
extends AutoDownBase(autoDownUnreachableAfter) {
|
||||
extends AutoDownBase(autoDownUnreachableAfter) with ActorLogging {
|
||||
|
||||
val cluster = Cluster(context.system)
|
||||
import cluster.InfoLogger._
|
||||
|
|
@ -62,6 +63,8 @@ private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration)
|
|||
|
||||
// re-subscribe when restart
|
||||
override def preStart(): Unit = {
|
||||
log.warning("Don't use auto-down feature of Akka Cluster in production. " +
|
||||
"See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.")
|
||||
cluster.subscribe(self, classOf[ClusterDomainEvent])
|
||||
super.preStart()
|
||||
}
|
||||
|
|
@ -72,7 +75,9 @@ private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration)
|
|||
|
||||
override def down(node: Address): Unit = {
|
||||
require(leader)
|
||||
logInfo("Leader is auto-downing unreachable node [{}]", node)
|
||||
logInfo("Leader is auto-downing unreachable node [{}]. " +
|
||||
"Don't use auto-down feature of Akka Cluster in production. " +
|
||||
"See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.", node)
|
||||
cluster.down(node)
|
||||
}
|
||||
|
||||
|
|
@ -162,4 +167,4 @@ private[cluster] abstract class AutoDownBase(autoDownUnreachableAfter: FiniteDur
|
|||
pendingUnreachable -= node
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,8 +45,8 @@ What do they do?
|
|||
* In `Half-Open` state:
|
||||
* The first call attempted is allowed through without failing fast
|
||||
* All other calls fail-fast with an exception just as in `Open` state
|
||||
* If the first call succeeds, the breaker is reset back to `Closed` state
|
||||
* If the first call fails, the breaker is tripped again into the `Open` state for another full `resetTimeout`
|
||||
* If the first call succeeds, the breaker is reset back to `Closed` state and the `resetTimeout` is reset
|
||||
* If the first call fails, the breaker is tripped again into the `Open` state (as for exponential backoff circuit breaker, the `resetTimeout` is multiplied by the exponential backoff factor)
|
||||
* State transition listeners:
|
||||
* Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen`
|
||||
* These are executed in the :class:`ExecutionContext` provided.
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ package docs.circuitbreaker
|
|||
import scala.concurrent.duration._
|
||||
import akka.pattern.CircuitBreaker
|
||||
import akka.pattern.pipe
|
||||
import akka.actor.{Actor, ActorLogging, ActorRef}
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef }
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
|
|
@ -44,7 +44,7 @@ class DangerousActor extends Actor with ActorLogging {
|
|||
|
||||
}
|
||||
|
||||
class TellPatternActor(recipient : ActorRef) extends Actor with ActorLogging {
|
||||
class TellPatternActor(recipient: ActorRef) extends Actor with ActorLogging {
|
||||
import context.dispatcher
|
||||
|
||||
val breaker =
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ Akka requires that you have `Java 8 <http://www.oracle.com/technetwork/java/java
|
|||
later installed on your machine.
|
||||
|
||||
`Lightbend Inc. <http://www.lightbend.com>`_ provides a commercial build of Akka and related projects such as Scala or Play
|
||||
as part of the `Reactive Platform <http://www.lightbend.com/products/lightbend-reactive-platform>`_ which is made available
|
||||
as part of the `Lightbend Reactive Platform <http://www.lightbend.com/platform>`_ which is made available
|
||||
for Java 6 in case your project can not upgrade to Java 8 just yet. It also includes additional commercial features or libraries.
|
||||
|
||||
Getting Started Guides and Template Projects
|
||||
|
|
|
|||
|
|
@ -25,6 +25,12 @@ to route the message with the entity id to the final destination.
|
|||
Cluster sharding will not be active on members with status :ref:`WeaklyUp <weakly_up_java>`
|
||||
if that feature is enabled.
|
||||
|
||||
.. warning::
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See :ref:`automatic-vs-manual-downing-java`.
|
||||
|
||||
An Example
|
||||
----------
|
||||
|
||||
|
|
@ -304,6 +310,12 @@ cannot startup because of corrupt data, which may happen if accidentally
|
|||
two clusters were running at the same time, e.g. caused by using auto-down
|
||||
and there was a network partition.
|
||||
|
||||
.. warning::
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See :ref:`automatic-vs-manual-downing-java`.
|
||||
|
||||
Use this program as a standalone Java main program::
|
||||
|
||||
java -classpath <jar files, including akka-cluster-sharding>
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ Especially the last point is something you should be aware of — in general whe
|
|||
you should take care of downing nodes yourself and not rely on the timing based auto-down feature.
|
||||
|
||||
.. warning::
|
||||
**Be very careful when using Cluster Singleton together with Automatic Downing**,
|
||||
**Don't use Cluster Singleton together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple Singletons* being started, one in each separate cluster!
|
||||
|
||||
|
|
|
|||
|
|
@ -126,8 +126,8 @@ be allowed to join.
|
|||
|
||||
.. _automatic-vs-manual-downing-java:
|
||||
|
||||
Automatic vs. Manual Downing
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Downing
|
||||
^^^^^^^
|
||||
|
||||
When a member is considered by the failure detector to be unreachable the
|
||||
leader is not allowed to perform its duties, such as changing status of
|
||||
|
|
@ -138,7 +138,17 @@ can be performed automatically or manually. By default it must be done manually,
|
|||
|
||||
It can also be performed programmatically with ``Cluster.get(system).down(address)``.
|
||||
|
||||
You can enable automatic downing with configuration::
|
||||
A pre-packaged solution for the downing problem is provided by
|
||||
`Split Brain Resolver <http://doc.akka.io/docs/akka/akka-commercial-addons-1.0/java/split-brain-resolver.html>`_,
|
||||
which is part of the `Lightbend Reactive Platform <http://www.lightbend.com/platform>`_.
|
||||
If you don’t use RP, you should anyway carefully read the `documentation <http://doc.akka.io/docs/akka/akka-commercial-addons-1.0/java/split-brain-resolver.html>`_
|
||||
of the Split Brain Resolver and make sure that the solution you are using handles the concerns
|
||||
described there.
|
||||
|
||||
Auto-downing (DO NOT USE)
|
||||
-------------------------
|
||||
|
||||
There is an atomatic downing feature that you should not use in production. For testing purpose you can enable it with configuration::
|
||||
|
||||
akka.cluster.auto-down-unreachable-after = 120s
|
||||
|
||||
|
|
@ -157,19 +167,8 @@ can also happen because of long GC pauses or system overload.
|
|||
We recommend against using the auto-down feature of Akka Cluster in production.
|
||||
This is crucial for correct behavior if you use :ref:`cluster-singleton-java` or
|
||||
:ref:`cluster_sharding_java`, especially together with Akka :ref:`persistence-java`.
|
||||
|
||||
A pre-packaged solution for the downing problem is provided by
|
||||
`Split Brain Resolver <http://doc.akka.io/docs/akka/rp-16s01p03/java/split-brain-resolver.html>`_,
|
||||
which is part of the Lightbend Reactive Platform. If you don’t use RP, you should anyway carefully
|
||||
read the `documentation <http://doc.akka.io/docs/akka/rp-16s01p03/java/split-brain-resolver.html>`_
|
||||
of the Split Brain Resolver and make sure that the solution you are using handles the concerns
|
||||
described there.
|
||||
|
||||
.. note:: If you have *auto-down* enabled and the failure detector triggers, you
|
||||
can over time end up with a lot of single node clusters if you don't put
|
||||
measures in place to shut down nodes that have become ``unreachable``. This
|
||||
follows from the fact that the ``unreachable`` node will likely see the rest of
|
||||
the cluster as ``unreachable``, become its own leader and form its own cluster.
|
||||
For Akka Persistence with Cluster Sharding it can result in corrupt data in case
|
||||
of network partitions.
|
||||
|
||||
Leaving
|
||||
^^^^^^^
|
||||
|
|
|
|||
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package docs.http.javadsl.server.directives;
|
||||
|
||||
import akka.NotUsed;
|
||||
import akka.http.javadsl.model.HttpRequest;
|
||||
import akka.http.javadsl.model.StatusCodes;
|
||||
import akka.http.javadsl.model.Uri;
|
||||
import akka.http.javadsl.model.headers.SecWebSocketProtocol;
|
||||
import akka.http.javadsl.model.ws.BinaryMessage;
|
||||
import akka.http.javadsl.model.ws.Message;
|
||||
import akka.http.javadsl.model.ws.TextMessage;
|
||||
import akka.http.javadsl.server.Route;
|
||||
import akka.http.javadsl.testkit.JUnitRouteTest;
|
||||
import akka.http.javadsl.testkit.WSProbe;
|
||||
import akka.stream.OverflowStrategy;
|
||||
import akka.stream.javadsl.Flow;
|
||||
import akka.stream.javadsl.Sink;
|
||||
import akka.stream.javadsl.Source;
|
||||
import akka.util.ByteString;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.duration.FiniteDuration;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class WebSocketDirectivesExamplesTest extends JUnitRouteTest {
|
||||
|
||||
@Test
|
||||
public void testHandleWebSocketMessages() {
|
||||
//#handleWebSocketMessages
|
||||
final Flow<Message, Message, NotUsed> greeter = Flow.of(Message.class).mapConcat(msg -> {
|
||||
if (msg instanceof TextMessage) {
|
||||
final TextMessage tm = (TextMessage) msg;
|
||||
final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!")));
|
||||
return Collections.singletonList(ret);
|
||||
} else if (msg instanceof BinaryMessage) {
|
||||
final BinaryMessage bm = (BinaryMessage) msg;
|
||||
bm.getStreamedData().runWith(Sink.ignore(), materializer());
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported message type!");
|
||||
}
|
||||
});
|
||||
|
||||
final Route websocketRoute = path("greeter", () ->
|
||||
handleWebSocketMessages(greeter)
|
||||
);
|
||||
|
||||
// create a testing probe representing the client-side
|
||||
final WSProbe wsClient = WSProbe.create(system(), materializer());
|
||||
|
||||
// WS creates a WebSocket request for testing
|
||||
testRoute(websocketRoute).run(WS(Uri.create("/greeter"), wsClient.flow(), materializer()))
|
||||
.assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS);
|
||||
|
||||
// manually run a WS conversation
|
||||
wsClient.sendMessage("Peter");
|
||||
wsClient.expectMessage("Hello Peter!");
|
||||
|
||||
wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
|
||||
wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS));
|
||||
|
||||
wsClient.sendMessage("John");
|
||||
wsClient.expectMessage("Hello John!");
|
||||
|
||||
wsClient.sendCompletion();
|
||||
wsClient.expectCompletion();
|
||||
//#handleWebSocketMessages
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandleWebSocketMessagesForProtocol() {
|
||||
//#handleWebSocketMessagesForProtocol
|
||||
final Flow<Message, Message, NotUsed> greeterService = Flow.of(Message.class).mapConcat(msg -> {
|
||||
if (msg instanceof TextMessage) {
|
||||
final TextMessage tm = (TextMessage) msg;
|
||||
final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!")));
|
||||
return Collections.singletonList(ret);
|
||||
} else if (msg instanceof BinaryMessage) {
|
||||
final BinaryMessage bm = (BinaryMessage) msg;
|
||||
bm.getStreamedData().runWith(Sink.ignore(), materializer());
|
||||
return Collections.emptyList();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported message type!");
|
||||
}
|
||||
});
|
||||
|
||||
final Flow<Message, Message, NotUsed> echoService = Flow.of(Message.class).buffer(1, OverflowStrategy.backpressure());
|
||||
|
||||
final Route websocketMultipleProtocolRoute = path("services", () ->
|
||||
route(
|
||||
handleWebSocketMessagesForProtocol(greeterService, "greeter"),
|
||||
handleWebSocketMessagesForProtocol(echoService, "echo")
|
||||
)
|
||||
);
|
||||
|
||||
// create a testing probe representing the client-side
|
||||
final WSProbe wsClient = WSProbe.create(system(), materializer());
|
||||
|
||||
// WS creates a WebSocket request for testing
|
||||
testRoute(websocketMultipleProtocolRoute)
|
||||
.run(WS(Uri.create("/services"), wsClient.flow(), materializer(), Arrays.asList("other", "echo")))
|
||||
.assertHeaderExists(SecWebSocketProtocol.create("echo"));
|
||||
|
||||
wsClient.sendMessage("Peter");
|
||||
wsClient.expectMessage("Peter");
|
||||
|
||||
wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef")));
|
||||
wsClient.expectMessage(ByteString.fromString("abcdef"));
|
||||
|
||||
wsClient.sendMessage("John");
|
||||
wsClient.expectMessage("John");
|
||||
|
||||
wsClient.sendCompletion();
|
||||
wsClient.expectCompletion();
|
||||
//#handleWebSocketMessagesForProtocol
|
||||
}
|
||||
}
|
||||
|
|
@ -57,7 +57,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
|
||||
final CompletionStage<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4))
|
||||
.runWith(sinkUnderTest, mat);
|
||||
final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assert(result == 20);
|
||||
//#strict-collection
|
||||
}
|
||||
|
|
@ -69,9 +69,9 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
.map(i -> i * 2);
|
||||
|
||||
final CompletionStage<List<Integer>> future = sourceUnderTest
|
||||
.grouped(10)
|
||||
.runWith(Sink.head(), mat);
|
||||
final List<Integer> result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
.take(10)
|
||||
.runWith(Sink.seq(), mat);
|
||||
final List<Integer> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assertEquals(result, Collections.nCopies(10, 2));
|
||||
//#grouped-infinite
|
||||
}
|
||||
|
|
@ -84,7 +84,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
|
||||
final CompletionStage<Integer> future = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6))
|
||||
.via(flowUnderTest).runWith(Sink.fold(0, (agg, next) -> agg + next), mat);
|
||||
final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assert(result == 10);
|
||||
//#folded-stream
|
||||
}
|
||||
|
|
@ -101,7 +101,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
.grouped(2)
|
||||
.runWith(Sink.head(), mat);
|
||||
akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref());
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS),
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS),
|
||||
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4))
|
||||
);
|
||||
//#pipeto-testprobe
|
||||
|
|
@ -120,11 +120,11 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
final TestProbe probe = new TestProbe(system);
|
||||
final Cancellable cancellable = sourceUnderTest
|
||||
.to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat);
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectNoMsg(Duration.create(100, TimeUnit.MILLISECONDS));
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK);
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK);
|
||||
cancellable.cancel();
|
||||
probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.COMPLETED);
|
||||
probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.COMPLETED);
|
||||
//#sink-actorref
|
||||
}
|
||||
|
||||
|
|
@ -193,7 +193,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest {
|
|||
probe.sendError(new Exception("boom"));
|
||||
|
||||
try {
|
||||
future.toCompletableFuture().get(1, TimeUnit.SECONDS);
|
||||
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||
assert false;
|
||||
} catch (ExecutionException ee) {
|
||||
final Throwable exception = ee.getCause();
|
||||
|
|
|
|||
|
|
@ -4,14 +4,16 @@
|
|||
package docs.testkit;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.japi.Creator;
|
||||
import akka.japi.Function;
|
||||
import akka.testkit.AkkaJUnitActorSystemResource;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import akka.testkit.TestActorRef;
|
||||
import akka.testkit.TestProbe;
|
||||
|
||||
import com.typesafe.config.ConfigFactory;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
@ -187,6 +189,17 @@ public class ParentChildTest {
|
|||
}
|
||||
//#test-fabricated-parent-creator
|
||||
|
||||
@Test
|
||||
public void testProbeParentTest() throws Exception {
|
||||
//#test-TestProbe-parent
|
||||
JavaTestKit parent = new JavaTestKit(system);
|
||||
ActorRef child = parent.childActorOf(Props.create(Child.class));
|
||||
|
||||
parent.send(child, "ping");
|
||||
parent.expectMsgEquals("pong");
|
||||
//#test-TestProbe-parent
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fabricatedParentTestsItsChildResponses() throws Exception {
|
||||
// didn't put final on these in order to make the parent fit in one line in the html docs
|
||||
|
|
|
|||
|
|
@ -16,4 +16,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke
|
|||
|
||||
Example
|
||||
-------
|
||||
TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 <https://github.com/akka/akka/issues/20466>`_.
|
||||
|
||||
.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessages
|
||||
|
|
|
|||
|
|
@ -20,4 +20,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke
|
|||
|
||||
Example
|
||||
-------
|
||||
TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 <https://github.com/akka/akka/issues/20466>`_.
|
||||
|
||||
.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessagesForProtocol
|
||||
|
|
|
|||
|
|
@ -150,6 +150,13 @@ The identifier must be defined with the ``persistenceId`` method.
|
|||
|
||||
.. _recovery-java:
|
||||
|
||||
.. note::
|
||||
``persistenceId`` must be unique to a given entity in the journal (database table/keyspace).
|
||||
When replaying messages persisted to the journal, you query messages with a ``persistenceId``.
|
||||
So, if two different entities share the same ``persistenceId``, message-replaying
|
||||
behavior is corrupted.
|
||||
|
||||
|
||||
Recovery
|
||||
--------
|
||||
|
||||
|
|
|
|||
|
|
@ -588,7 +588,7 @@ it returns false the element is discarded.
|
|||
**completes** when upstream completes
|
||||
|
||||
filterNot
|
||||
^^^^^^^^
|
||||
^^^^^^^^^
|
||||
Filter the incoming elements using a predicate. If the predicate returns false the element is passed downstream, if
|
||||
it returns true the element is discarded.
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ sink:
|
|||
The same strategy can be applied for sources as well. In the next example we
|
||||
have a source that produces an infinite stream of elements. Such source can be
|
||||
tested by asserting that first arbitrary number of elements hold some
|
||||
condition. Here the ``grouped`` combinator and ``Sink.head`` are very useful.
|
||||
condition. Here the ``take`` combinator and ``Sink.seq`` are very useful.
|
||||
|
||||
.. includecode:: ../code/docs/stream/StreamTestKitDocTest.java#grouped-infinite
|
||||
|
||||
|
|
|
|||
|
|
@ -470,25 +470,51 @@ Testing parent-child relationships
|
|||
|
||||
The parent of an actor is always the actor that created it. At times this leads to
|
||||
a coupling between the two that may not be straightforward to test.
|
||||
Broadly, there are three approaches to improve testability of parent-child
|
||||
relationships:
|
||||
There are several approaches to improve testability of a child actor that
|
||||
needs to refer to its parent:
|
||||
|
||||
1. when creating a child, pass an explicit reference to its parent
|
||||
2. when creating a parent, tell the parent how to create its child
|
||||
2. create the child with a ``TestProbe`` as parent
|
||||
3. create a fabricated parent when testing
|
||||
|
||||
Conversely, a parent's binding to its child can be lessened as follows:
|
||||
|
||||
4. when creating a parent, tell the parent how to create its child
|
||||
|
||||
For example, the structure of the code you want to test may follow this pattern:
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#test-example
|
||||
|
||||
Using dependency-injection
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Introduce child to its parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The first option is to avoid use of the :meth:`context.parent` function and create
|
||||
a child with a custom parent by passing an explicit reference to its parent instead.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#test-dependentchild
|
||||
|
||||
Create the child using JavaTestKit
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The ``JavaTestKit`` class can in fact create actors that will run with the test probe as parent.
|
||||
This will cause any messages the the child actor sends to `context().getParent()` to
|
||||
end up in the test probe.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#test-TestProbe-parent
|
||||
|
||||
Using a fabricated parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
If you prefer to avoid modifying the child constructor you can
|
||||
create a fabricated parent in your test. This, however, does not enable you to test
|
||||
the parent actor in isolation.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent-creator
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent
|
||||
|
||||
Externalize child making from the parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Alternatively, you can tell the parent how to create its child. There are two ways
|
||||
to do this: by giving it a :class:`Props` object or by giving it a function which takes care of creating the child actor:
|
||||
|
||||
|
|
@ -503,19 +529,10 @@ And like this in your application code:
|
|||
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#child-maker-prod
|
||||
|
||||
Using a fabricated parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
If you prefer to avoid modifying the parent or child constructor you can
|
||||
create a fabricated parent in your test. This, however, does not enable you to test
|
||||
the parent actor in isolation.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent-creator
|
||||
.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent
|
||||
|
||||
Which of these methods is the best depends on what is most important to test. The
|
||||
most generic option is to create the parent actor by passing it a function that is
|
||||
responsible for the Actor creation, but the fabricated parent is often sufficient.
|
||||
responsible for the Actor creation, but using TestProbe or having a fabricated parent is often sufficient.
|
||||
|
||||
.. _Java-CallingThreadDispatcher:
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,12 @@ to route the message with the entity id to the final destination.
|
|||
Cluster sharding will not be active on members with status :ref:`WeaklyUp <weakly_up_scala>`
|
||||
if that feature is enabled.
|
||||
|
||||
.. warning::
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See :ref:`automatic-vs-manual-downing-java`.
|
||||
|
||||
An Example
|
||||
----------
|
||||
|
||||
|
|
@ -306,6 +312,12 @@ cannot startup because of corrupt data, which may happen if accidentally
|
|||
two clusters were running at the same time, e.g. caused by using auto-down
|
||||
and there was a network partition.
|
||||
|
||||
.. warning::
|
||||
**Don't use Cluster Sharding together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple shards and entities* being started, one in each separate cluster!
|
||||
See :ref:`automatic-vs-manual-downing-scala`.
|
||||
|
||||
Use this program as a standalone Java main program::
|
||||
|
||||
java -classpath <jar files, including akka-cluster-sharding>
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ Especially the last point is something you should be aware of — in general whe
|
|||
you should take care of downing nodes yourself and not rely on the timing based auto-down feature.
|
||||
|
||||
.. warning::
|
||||
**Be very careful when using Cluster Singleton together with Automatic Downing**,
|
||||
**Don't use Cluster Singleton together with Automatic Downing**,
|
||||
since it allows the cluster to split up into two separate clusters, which in turn will result
|
||||
in *multiple Singletons* being started, one in each separate cluster!
|
||||
|
||||
|
|
|
|||
|
|
@ -121,8 +121,8 @@ be allowed to join.
|
|||
|
||||
.. _automatic-vs-manual-downing-scala:
|
||||
|
||||
Automatic vs. Manual Downing
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Downing
|
||||
^^^^^^^
|
||||
|
||||
When a member is considered by the failure detector to be unreachable the
|
||||
leader is not allowed to perform its duties, such as changing status of
|
||||
|
|
@ -133,7 +133,17 @@ can be performed automatically or manually. By default it must be done manually,
|
|||
|
||||
It can also be performed programmatically with ``Cluster(system).down(address)``.
|
||||
|
||||
You can enable automatic downing with configuration::
|
||||
A pre-packaged solution for the downing problem is provided by
|
||||
`Split Brain Resolver <http://doc.akka.io/docs/akka/akka-commercial-addons-1.0/scala/split-brain-resolver.html>`_,
|
||||
which is part of the `Lightbend Reactive Platform <http://www.lightbend.com/platform>`_.
|
||||
If you don’t use RP, you should anyway carefully read the `documentation <http://doc.akka.io/docs/akka/akka-commercial-addons-1.0/scala/split-brain-resolver.html>`_
|
||||
of the Split Brain Resolver and make sure that the solution you are using handles the concerns
|
||||
described there.
|
||||
|
||||
Auto-downing (DO NOT USE)
|
||||
-------------------------
|
||||
|
||||
There is an atomatic downing feature that you should not use in production. For testing purpose you can enable it with configuration::
|
||||
|
||||
akka.cluster.auto-down-unreachable-after = 120s
|
||||
|
||||
|
|
@ -152,19 +162,9 @@ can also happen because of long GC pauses or system overload.
|
|||
We recommend against using the auto-down feature of Akka Cluster in production.
|
||||
This is crucial for correct behavior if you use :ref:`cluster-singleton-scala` or
|
||||
:ref:`cluster_sharding_scala`, especially together with Akka :ref:`persistence-scala`.
|
||||
For Akka Persistence with Cluster Sharding it can result in corrupt data in case
|
||||
of network partitions.
|
||||
|
||||
A pre-packaged solution for the downing problem is provided by
|
||||
`Split Brain Resolver <http://doc.akka.io/docs/akka/rp-16s01p03/scala/split-brain-resolver.html>`_,
|
||||
which is part of the Lightbend Reactive Platform. If you don’t use RP, you should anyway carefully
|
||||
read the `documentation <http://doc.akka.io/docs/akka/rp-16s01p03/scala/split-brain-resolver.html>`_
|
||||
of the Split Brain Resolver and make sure that the solution you are using handles the concerns
|
||||
described there.
|
||||
|
||||
.. note:: If you have *auto-down* enabled and the failure detector triggers, you
|
||||
can over time end up with a lot of single node clusters if you don't put
|
||||
measures in place to shut down nodes that have become ``unreachable``. This
|
||||
follows from the fact that the ``unreachable`` node will likely see the rest of
|
||||
the cluster as ``unreachable``, become its own leader and form its own cluster.
|
||||
|
||||
Leaving
|
||||
^^^^^^^
|
||||
|
|
|
|||
|
|
@ -18,25 +18,23 @@ import scala.concurrent.duration._
|
|||
import scala.concurrent.{ Future, Promise }
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
private[this] object TimeoutDirectivesTestConfig {
|
||||
private[this] object TimeoutDirectivesInfiniteTimeoutTestConfig {
|
||||
val testConf: Config = ConfigFactory.parseString("""
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
akka.loglevel = ERROR
|
||||
akka.stdout-loglevel = ERROR
|
||||
windows-connection-abort-workaround-enabled = auto
|
||||
akka.log-dead-letters = OFF
|
||||
akka.http.server.request-timeout = 1000s""")
|
||||
// large timeout - 1000s (please note - setting to infinite will disable Timeout-Access header
|
||||
// and withRequestTimeout will not work)
|
||||
akka.http.server.request-timeout = infinite""")
|
||||
}
|
||||
|
||||
class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig.testConf)
|
||||
class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesInfiniteTimeoutTestConfig.testConf)
|
||||
with ScalaFutures with CompileOnlySpec {
|
||||
//#testSetup
|
||||
import system.dispatcher
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
def slowFuture(): Future[String] = Promise[String].future // move to Future.never in Scala 2.12
|
||||
def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12
|
||||
|
||||
def runRoute(route: Route, routePath: String): HttpResponse = {
|
||||
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||
|
|
@ -51,8 +49,9 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig
|
|||
|
||||
//#
|
||||
|
||||
// demonstrates that timeout is correctly set despite infinite initial value of akka.http.server.request-timeout
|
||||
"Request Timeout" should {
|
||||
"be configurable in routing layer" in {
|
||||
"be configurable in routing layer despite infinite initial value of request-timeout" in {
|
||||
//#withRequestTimeout-plain
|
||||
val route =
|
||||
path("timeout") {
|
||||
|
|
@ -124,3 +123,47 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
private[this] object TimeoutDirectivesFiniteTimeoutTestConfig {
|
||||
val testConf: Config = ConfigFactory.parseString("""
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
akka.loglevel = ERROR
|
||||
akka.stdout-loglevel = ERROR
|
||||
windows-connection-abort-workaround-enabled = auto
|
||||
akka.log-dead-letters = OFF
|
||||
akka.http.server.request-timeout = 1000s""")
|
||||
}
|
||||
|
||||
class TimeoutDirectivesFiniteTimeoutExamplesSpec extends AkkaSpec(TimeoutDirectivesFiniteTimeoutTestConfig.testConf)
|
||||
with ScalaFutures with CompileOnlySpec {
|
||||
import system.dispatcher
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12
|
||||
|
||||
def runRoute(route: Route, routePath: String): HttpResponse = {
|
||||
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||
val binding = Http().bindAndHandle(route, hostname, port)
|
||||
|
||||
val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/$routePath")).futureValue
|
||||
|
||||
binding.flatMap(_.unbind()).futureValue
|
||||
|
||||
response
|
||||
}
|
||||
|
||||
// demonstrates that timeout is correctly modified for finite initial values of akka.http.server.request-timeout
|
||||
"Request Timeout" should {
|
||||
"be configurable in routing layer for finite initial value of request-timeout" in {
|
||||
val route =
|
||||
path("timeout") {
|
||||
withRequestTimeout(1.seconds) { // modifies the global akka.http.server.request-timeout for this request
|
||||
val response: Future[String] = slowFuture() // very slow
|
||||
complete(response)
|
||||
}
|
||||
}
|
||||
runRoute(route, "timeout").status should ===(StatusCodes.ServiceUnavailable) // the timeout response
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right)
|
||||
|
||||
val future = Source(1 to 4).runWith(sinkUnderTest)
|
||||
val result = Await.result(future, 100.millis)
|
||||
val result = Await.result(future, 3.seconds)
|
||||
assert(result == 20)
|
||||
//#strict-collection
|
||||
}
|
||||
|
|
@ -34,8 +34,8 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
|
||||
val sourceUnderTest = Source.repeat(1).map(_ * 2)
|
||||
|
||||
val future = sourceUnderTest.grouped(10).runWith(Sink.head)
|
||||
val result = Await.result(future, 100.millis)
|
||||
val future = sourceUnderTest.take(10).runWith(Sink.seq)
|
||||
val result = Await.result(future, 3.seconds)
|
||||
assert(result == Seq.fill(10)(2))
|
||||
//#grouped-infinite
|
||||
}
|
||||
|
|
@ -45,7 +45,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
val flowUnderTest = Flow[Int].takeWhile(_ < 5)
|
||||
|
||||
val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
|
||||
val result = Await.result(future, 100.millis)
|
||||
val result = Await.result(future, 3.seconds)
|
||||
assert(result == (1 to 4))
|
||||
//#folded-stream
|
||||
}
|
||||
|
|
@ -58,8 +58,8 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
val sourceUnderTest = Source(1 to 4).grouped(2)
|
||||
|
||||
val probe = TestProbe()
|
||||
sourceUnderTest.grouped(2).runWith(Sink.head).pipeTo(probe.ref)
|
||||
probe.expectMsg(100.millis, Seq(Seq(1, 2), Seq(3, 4)))
|
||||
sourceUnderTest.runWith(Sink.seq).pipeTo(probe.ref)
|
||||
probe.expectMsg(3.seconds, Seq(Seq(1, 2), Seq(3, 4)))
|
||||
//#pipeto-testprobe
|
||||
}
|
||||
|
||||
|
|
@ -73,9 +73,9 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
|
||||
probe.expectMsg(1.second, Tick)
|
||||
probe.expectNoMsg(100.millis)
|
||||
probe.expectMsg(200.millis, Tick)
|
||||
probe.expectMsg(3.seconds, Tick)
|
||||
cancellable.cancel()
|
||||
probe.expectMsg(200.millis, "completed")
|
||||
probe.expectMsg(3.seconds, "completed")
|
||||
//#sink-actorref
|
||||
}
|
||||
|
||||
|
|
@ -91,7 +91,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
ref ! 3
|
||||
ref ! akka.actor.Status.Success("done")
|
||||
|
||||
val result = Await.result(future, 100.millis)
|
||||
val result = Await.result(future, 3.seconds)
|
||||
assert(result == "123")
|
||||
//#source-actorref
|
||||
}
|
||||
|
|
@ -128,7 +128,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
|
|||
.run()
|
||||
probe.sendError(new Exception("boom"))
|
||||
|
||||
Await.ready(future, 100.millis)
|
||||
Await.ready(future, 3.seconds)
|
||||
val Failure(exception) = future.value.get
|
||||
assert(exception.getMessage == "boom")
|
||||
//#injecting-failure
|
||||
|
|
|
|||
|
|
@ -121,6 +121,17 @@ class ParentChildSpec extends WordSpec with Matchers with TestKitBase with Befor
|
|||
}
|
||||
}
|
||||
|
||||
//#test-TestProbe-parent
|
||||
"A TestProbe serving as parent" should {
|
||||
"test its child responses" in {
|
||||
val parent = TestProbe()
|
||||
val child = parent.childActorOf(Props[Child])
|
||||
parent.send(child, "ping")
|
||||
parent.expectMsg("pong")
|
||||
}
|
||||
}
|
||||
//#test-TestProbe-parent
|
||||
|
||||
//#test-fabricated-parent
|
||||
"A fabricated parent" should {
|
||||
"test its child responses" in {
|
||||
|
|
|
|||
|
|
@ -70,6 +70,7 @@ of programmatically provided parameter.
|
|||
|
||||
.. _ForkJoinPool documentation: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html
|
||||
.. _ThreadPoolExecutor documentation: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html
|
||||
|
||||
Types of dispatchers
|
||||
--------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ Building Advanced Directives
|
|||
----------------------------
|
||||
|
||||
This example will showcase the advanced logging using the ``DebuggingDirectives``.
|
||||
The built `logResponseTime ` directive will log the request time (or rejection reason):
|
||||
The built `logResponseTime` directive will log the request time (or rejection reason):
|
||||
|
||||
.. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala
|
||||
:snippet: logRequestResultWithResponseTime
|
||||
|
|
|
|||
|
|
@ -133,6 +133,13 @@ The identifier must be defined with the ``persistenceId`` method.
|
|||
|
||||
.. _recovery:
|
||||
|
||||
.. note::
|
||||
``persistenceId`` must be unique to a given entity in the journal (database table/keyspace).
|
||||
When replaying messages persisted to the journal, you query messages with a ``persistenceId``.
|
||||
So, if two different entities share the same ``persistenceId``, message-replaying
|
||||
behavior is corrupted.
|
||||
|
||||
|
||||
Recovery
|
||||
--------
|
||||
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ sink:
|
|||
The same strategy can be applied for sources as well. In the next example we
|
||||
have a source that produces an infinite stream of elements. Such source can be
|
||||
tested by asserting that first arbitrary number of elements hold some
|
||||
condition. Here the ``grouped`` combinator and ``Sink.head`` are very useful.
|
||||
condition. Here the ``take`` combinator and ``Sink.seq`` are very useful.
|
||||
|
||||
.. includecode:: ../code/docs/stream/StreamTestKitDocSpec.scala#grouped-infinite
|
||||
|
||||
|
|
|
|||
|
|
@ -548,26 +548,51 @@ Testing parent-child relationships
|
|||
----------------------------------
|
||||
|
||||
The parent of an actor is always the actor that created it. At times this leads to
|
||||
a coupling between the two that may not be straightforward to test.
|
||||
Broadly, there are three approaches to improve testability of parent-child
|
||||
relationships:
|
||||
a coupling between the two that may not be straightforward to test.
|
||||
There are several approaches to improve testability of a child actor that
|
||||
needs to refer to its parent:
|
||||
|
||||
1. when creating a child, pass an explicit reference to its parent
|
||||
2. when creating a parent, tell the parent how to create its child
|
||||
1. when creating a child, pass an explicit reference to its parent
|
||||
2. create the child with a ``TestProbe`` as parent
|
||||
3. create a fabricated parent when testing
|
||||
|
||||
Conversely, a parent's binding to its child can be lessened as follows:
|
||||
|
||||
4. when creating a parent, tell the parent how to create its child
|
||||
|
||||
For example, the structure of the code you want to test may follow this pattern:
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildSpec.scala#test-example
|
||||
|
||||
Using dependency-injection
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
Introduce child to its parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The first option is to avoid use of the :meth:`context.parent` function and create
|
||||
a child with a custom parent by passing an explicit reference to its parent instead.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildSpec.scala#test-dependentchild
|
||||
|
||||
Create the child using TestProbe
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
The ``TestProbe`` class can in fact create actors that will run with the test probe as parent.
|
||||
This will cause any messages the the child actor sends to `context.parent` to
|
||||
end up in the test probe.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildSpec.scala##test-TestProbe-parent
|
||||
|
||||
Using a fabricated parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
If you prefer to avoid modifying the parent or child constructor you can
|
||||
create a fabricated parent in your test. This, however, does not enable you to test
|
||||
the parent actor in isolation.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildSpec.scala#test-fabricated-parent
|
||||
|
||||
Externalize child making from the parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
Alternatively, you can tell the parent how to create its child. There are two ways
|
||||
to do this: by giving it a :class:`Props` object or by giving it a function which takes care of creating the child actor:
|
||||
|
||||
|
|
@ -581,14 +606,6 @@ And like this in your application code:
|
|||
|
||||
.. includecode:: code/docs/testkit/ParentChildSpec.scala#child-maker-prod
|
||||
|
||||
Using a fabricated parent
|
||||
^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||
|
||||
If you prefer to avoid modifying the parent or child constructor you can
|
||||
create a fabricated parent in your test. This, however, does not enable you to test
|
||||
the parent actor in isolation.
|
||||
|
||||
.. includecode:: code/docs/testkit/ParentChildSpec.scala#test-fabricated-parent
|
||||
|
||||
Which of these methods is the best depends on what is most important to test. The
|
||||
most generic option is to create the parent actor by passing it a function that is
|
||||
|
|
|
|||
|
|
@ -0,0 +1,20 @@
|
|||
/**
|
||||
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.model.headers;
|
||||
|
||||
import akka.http.impl.util.Util;
|
||||
|
||||
/**
|
||||
* Model for the `Sec-WebSocket-Protocol` header.
|
||||
*/
|
||||
public abstract class SecWebSocketProtocol extends akka.http.scaladsl.model.HttpHeader {
|
||||
public abstract Iterable<String> getProtocols();
|
||||
|
||||
public static SecWebSocketProtocol create(String... protocols) {
|
||||
return new akka.http.scaladsl.model.headers.Sec$minusWebSocket$minusProtocol(Util.convertArray(protocols));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -4,25 +4,24 @@
|
|||
|
||||
package akka.http.impl.engine.client
|
||||
|
||||
import akka.actor._
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.http.impl.engine.client.PoolConductor.{ ConnectEagerlyCommand, DispatchCommand, SlotCommand }
|
||||
import akka.http.impl.engine.client.PoolSlot.SlotEvent.ConnectedEagerly
|
||||
import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse }
|
||||
import akka.stream._
|
||||
import akka.stream.actor._
|
||||
import akka.stream.impl.{ ActorProcessor, ConstantFun, ExposedPublisher, SeqActorName, SubscribePending }
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.stage.GraphStageLogic.EagerTerminateOutput
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.language.existentials
|
||||
import scala.util.{ Failure, Success }
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
private object PoolSlot {
|
||||
import PoolFlow.{ RequestContext, ResponseContext }
|
||||
|
||||
sealed trait ProcessorOut
|
||||
final case class ResponseDelivery(response: ResponseContext) extends ProcessorOut
|
||||
sealed trait RawSlotEvent extends ProcessorOut
|
||||
sealed trait RawSlotEvent
|
||||
sealed trait SlotEvent extends RawSlotEvent
|
||||
object SlotEvent {
|
||||
final case class RequestCompletedFuture(future: Future[RequestCompleted]) extends RawSlotEvent
|
||||
|
|
@ -36,242 +35,136 @@ private object PoolSlot {
|
|||
final case class ConnectedEagerly(slotIx: Int) extends SlotEvent
|
||||
}
|
||||
|
||||
private val slotProcessorActorName = SeqActorName("SlotProcessor")
|
||||
|
||||
/*
|
||||
Stream Setup
|
||||
============
|
||||
|
||||
Request- +-----------+ +-------------+ +-------------+ +------------+
|
||||
Context | Slot- | List[ | flatten | Processor- | doubler | | SlotEvent- | Response-
|
||||
+--------->| Processor +------------->| (MapConcat) +------------->| (MapConcat) +---->| Split +------------->
|
||||
| | Processor- | | Out | | | | Context
|
||||
+-----------+ Out] +-------------+ +-------------+ +-----+------+
|
||||
| RawSlotEvent
|
||||
| (to Conductor
|
||||
| via slotEventMerge)
|
||||
v
|
||||
*/
|
||||
def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit system: ActorSystem, fm: Materializer): Graph[FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent], Any] =
|
||||
GraphDSL.create() { implicit b ⇒
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
// TODO wouldn't be better to have them under a known parent? /user/SlotProcessor-0 seems weird
|
||||
val name = slotProcessorActorName.next()
|
||||
|
||||
val slotProcessor = b.add {
|
||||
Flow.fromProcessor { () ⇒
|
||||
val actor = system.actorOf(
|
||||
Props(new SlotProcessor(slotIx, connectionFlow)).withDeploy(Deploy.local),
|
||||
name)
|
||||
ActorProcessor[SlotCommand, List[ProcessorOut]](actor)
|
||||
}.mapConcat(ConstantFun.scalaIdentityFunction)
|
||||
}
|
||||
val split = b.add(Broadcast[ProcessorOut](2))
|
||||
|
||||
slotProcessor ~> split.in
|
||||
|
||||
new FanOutShape2(
|
||||
slotProcessor.in,
|
||||
split.out(0).collect { case ResponseDelivery(r) ⇒ r }.outlet,
|
||||
split.out(1).collect { case r: RawSlotEvent ⇒ r }.outlet)
|
||||
}
|
||||
|
||||
import ActorPublisherMessage._
|
||||
import ActorSubscriberMessage._
|
||||
def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit m: Materializer): Graph[FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent], Any] =
|
||||
new SlotProcessor(slotIx, connectionFlow, ActorMaterializerHelper.downcast(m).logger)
|
||||
|
||||
/**
|
||||
* An actor managing a series of materializations of the given `connectionFlow`.
|
||||
* To the outside it provides a stable flow stage, consuming `SlotCommand` instances on its
|
||||
* input (ActorSubscriber) side and producing `List[ProcessorOut]` instances on its output
|
||||
* (ActorPublisher) side.
|
||||
* input side and producing `ResponseContext` and `RawSlotEvent` instances on its outputs.
|
||||
* The given `connectionFlow` is materialized into a running flow whenever required.
|
||||
* Completion and errors from the connection are not surfaced to the outside (unless we are
|
||||
* shutting down completely).
|
||||
*/
|
||||
private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit fm: Materializer)
|
||||
extends ActorSubscriber with ActorPublisher[List[ProcessorOut]] with ActorLogging {
|
||||
var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _
|
||||
var inflightRequests = immutable.Queue.empty[RequestContext]
|
||||
private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any], log: LoggingAdapter)(implicit fm: Materializer)
|
||||
extends GraphStage[FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent]] {
|
||||
|
||||
val runnableGraph = Source.actorPublisher[HttpRequest](flowInportProps(self))
|
||||
.via(connectionFlow)
|
||||
.toMat(Sink.actorSubscriber[HttpResponse](flowOutportProps(self)))(Keep.both)
|
||||
.named("SlotProcessorInternalConnectionFlow")
|
||||
val in: Inlet[SlotCommand] = Inlet("SlotProcessor.in")
|
||||
val responsesOut: Outlet[ResponseContext] = Outlet("SlotProcessor.responsesOut")
|
||||
val eventsOut: Outlet[RawSlotEvent] = Outlet("SlotProcessor.eventsOut")
|
||||
|
||||
override def requestStrategy = ZeroRequestStrategy
|
||||
override def shape: FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent] = new FanOutShape2(in, responsesOut, eventsOut)
|
||||
|
||||
/**
|
||||
* How PoolProcessor changes its `receive`:
|
||||
* waitingExposedPublisher -> waitingForSubscribePending -> unconnected ->
|
||||
* waitingForDemandFromConnection OR waitingEagerlyConnected -> running
|
||||
* Given slot can become get to 'running' state via 'waitingForDemandFromConnection' or 'waitingEagerlyConnected'.
|
||||
* The difference between those two paths is that the first one is lazy - reacts to DispatchCommand and then uses
|
||||
* inport and outport actors to obtain more items.
|
||||
* Where the second one is eager - reacts to SlotShouldConnectCommand from PoolConductor, sends SlotEvent.ConnectedEagerly
|
||||
* back to conductor and then waits for the first DispatchCommand
|
||||
*/
|
||||
override def receive = waitingExposedPublisher
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler {
|
||||
private var firstRequest: RequestContext = _
|
||||
private val inflightRequests = new java.util.ArrayDeque[RequestContext]()
|
||||
|
||||
def waitingExposedPublisher: Receive = {
|
||||
case ExposedPublisher(publisher) ⇒
|
||||
exposedPublisher = publisher
|
||||
context.become(waitingForSubscribePending)
|
||||
case other ⇒ throw new IllegalStateException(s"The first message must be `ExposedPublisher` but was [$other]")
|
||||
}
|
||||
private var connectionFlowSource: SubSourceOutlet[HttpRequest] = _
|
||||
private var connectionFlowSink: SubSinkInlet[HttpResponse] = _
|
||||
|
||||
def waitingForSubscribePending: Receive = {
|
||||
case SubscribePending ⇒
|
||||
exposedPublisher.takePendingSubscribers() foreach (s ⇒ self ! ActorPublisher.Internal.Subscribe(s))
|
||||
log.debug("become unconnected, from subscriber pending")
|
||||
context.become(unconnected)
|
||||
}
|
||||
private var isConnected = false
|
||||
|
||||
val unconnected: Receive = {
|
||||
case OnNext(DispatchCommand(rc: RequestContext)) ⇒
|
||||
val (connInport, connOutport) = runnableGraph.run()
|
||||
connOutport ! Request(totalDemand)
|
||||
context.become(waitingForDemandFromConnection(connInport = connInport, connOutport = connOutport, rc))
|
||||
def disconnect(ex: Option[Throwable] = None) = {
|
||||
connectionFlowSource.complete()
|
||||
if (isConnected) {
|
||||
isConnected = false
|
||||
|
||||
case OnNext(ConnectEagerlyCommand) ⇒
|
||||
val (in, out) = runnableGraph.run()
|
||||
onNext(SlotEvent.ConnectedEagerly(slotIx) :: Nil)
|
||||
out ! Request(totalDemand)
|
||||
context.become(waitingEagerlyConnected(connInport = in, connOutport = out))
|
||||
// if there was an error sending the request may have been sent so decrement retriesLeft
|
||||
// otherwise the downstream hasn't sent so sent them back without modifying retriesLeft
|
||||
val (retries, failures) = ex.map { fail ⇒
|
||||
val (inflightRetry, inflightFail) = inflightRequests.iterator().asScala.partition(_.retriesLeft > 0)
|
||||
val retries = inflightRetry.map(rc ⇒ SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))).toList
|
||||
val failures = inflightFail.map(rc ⇒ ResponseContext(rc, Failure(fail))).toList
|
||||
(retries, failures)
|
||||
}.getOrElse((inflightRequests.iterator().asScala.map(rc ⇒ SlotEvent.RetryRequest(rc)).toList, Nil))
|
||||
|
||||
case Request(_) ⇒ if (remainingRequested == 0) request(1) // ask for first request if necessary
|
||||
inflightRequests.clear()
|
||||
|
||||
case OnComplete ⇒ onComplete()
|
||||
case OnError(e) ⇒ onError(e)
|
||||
case Cancel ⇒
|
||||
cancel()
|
||||
shutdown()
|
||||
|
||||
case c @ FromConnection(msg) ⇒ // ignore ...
|
||||
}
|
||||
|
||||
def waitingEagerlyConnected(connInport: ActorRef, connOutport: ActorRef): Receive = {
|
||||
case FromConnection(Request(n)) ⇒
|
||||
request(n)
|
||||
|
||||
case OnNext(DispatchCommand(rc: RequestContext)) ⇒
|
||||
inflightRequests = inflightRequests.enqueue(rc)
|
||||
request(1)
|
||||
connInport ! OnNext(rc.request)
|
||||
context.become(running(connInport, connOutport))
|
||||
}
|
||||
|
||||
def waitingForDemandFromConnection(connInport: ActorRef, connOutport: ActorRef,
|
||||
firstRequest: RequestContext): Receive = {
|
||||
case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev
|
||||
case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev
|
||||
case OnNext(x) ⇒ throw new IllegalStateException("Unrequested RequestContext: " + x)
|
||||
|
||||
case FromConnection(Request(n)) ⇒
|
||||
inflightRequests = inflightRequests.enqueue(firstRequest)
|
||||
request(n - remainingRequested)
|
||||
connInport ! OnNext(firstRequest.request)
|
||||
context.become(running(connInport, connOutport))
|
||||
|
||||
case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError
|
||||
case FromConnection(OnComplete) ⇒ handleDisconnect(sender(), None, Some(firstRequest))
|
||||
case FromConnection(OnError(e)) ⇒ handleDisconnect(sender(), Some(e), Some(firstRequest))
|
||||
case FromConnection(OnNext(x)) ⇒ throw new IllegalStateException("Unexpected HttpResponse: " + x)
|
||||
}
|
||||
|
||||
def running(connInport: ActorRef, connOutport: ActorRef): Receive = {
|
||||
case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev
|
||||
case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev
|
||||
case OnNext(DispatchCommand(rc: RequestContext)) ⇒
|
||||
inflightRequests = inflightRequests.enqueue(rc)
|
||||
connInport ! OnNext(rc.request)
|
||||
|
||||
case FromConnection(Request(n)) ⇒ request(n)
|
||||
case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError
|
||||
|
||||
case FromConnection(OnNext(response: HttpResponse)) ⇒
|
||||
val requestContext = inflightRequests.head
|
||||
inflightRequests = inflightRequests.tail
|
||||
val (entity, whenCompleted) = HttpEntity.captureTermination(response.entity)
|
||||
val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response withEntity entity)))
|
||||
import fm.executionContext
|
||||
val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ ⇒ SlotEvent.RequestCompleted(slotIx)))
|
||||
onNext(delivery :: requestCompleted :: Nil)
|
||||
|
||||
case FromConnection(OnComplete) ⇒ handleDisconnect(sender(), None)
|
||||
case FromConnection(OnError(e)) ⇒ handleDisconnect(sender(), Some(e))
|
||||
}
|
||||
|
||||
def handleDisconnect(connInport: ActorRef, error: Option[Throwable], firstContext: Option[RequestContext] = None): Unit = {
|
||||
log.debug("Slot {} disconnected after {}", slotIx, error getOrElse "regular connection close")
|
||||
|
||||
val results: List[ProcessorOut] = {
|
||||
if (inflightRequests.isEmpty && firstContext.isDefined) {
|
||||
(error match {
|
||||
case Some(err) ⇒ ResponseDelivery(ResponseContext(firstContext.get, Failure(new UnexpectedDisconnectException("Unexpected (early) disconnect", err))))
|
||||
case _ ⇒ ResponseDelivery(ResponseContext(firstContext.get, Failure(new UnexpectedDisconnectException("Unexpected (early) disconnect"))))
|
||||
}) :: Nil
|
||||
} else {
|
||||
inflightRequests.map { rc ⇒
|
||||
if (rc.retriesLeft == 0) {
|
||||
val reason = error.fold[Throwable](new UnexpectedDisconnectException("Unexpected disconnect"))(ConstantFun.scalaIdentityFunction)
|
||||
connInport ! ActorPublisherMessage.Cancel
|
||||
ResponseDelivery(ResponseContext(rc, Failure(reason)))
|
||||
} else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))
|
||||
}(collection.breakOut)
|
||||
emitMultiple(responsesOut, failures)
|
||||
emitMultiple(eventsOut, SlotEvent.Disconnected(slotIx, retries.size + failures.size) :: retries, () ⇒ if (failures.isEmpty && !hasBeenPulled(in)) pull(in))
|
||||
}
|
||||
}
|
||||
inflightRequests = immutable.Queue.empty
|
||||
onNext(SlotEvent.Disconnected(slotIx, results.size) :: results)
|
||||
if (canceled) onComplete()
|
||||
|
||||
context.become(unconnected)
|
||||
// SourceOutlet is connected to the connectionFlow's inlet, when the connectionFlow
|
||||
// completes (e.g. connection closed) complete the subflow and emit the Disconnected event
|
||||
private val connectionOutFlowHandler = new OutHandler {
|
||||
// inner stream pulls, we either give first request or pull upstream
|
||||
override def onPull(): Unit = {
|
||||
if (firstRequest != null) {
|
||||
inflightRequests.add(firstRequest)
|
||||
connectionFlowSource.push(firstRequest.request)
|
||||
firstRequest = null
|
||||
} else pull(in)
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(): Unit = connectionFlowSource.complete()
|
||||
}
|
||||
|
||||
// SinkInlet is connected to the connectionFlow's outlet, an upstream
|
||||
// complete indicates the remote has shutdown cleanly, a failure is
|
||||
// abnormal termination/connection refused. Successful requests
|
||||
// will show up in `onPush`
|
||||
private val connectionInFlowHandler = new InHandler {
|
||||
// inner stream pushes we push downstream
|
||||
override def onPush(): Unit = {
|
||||
val response = connectionFlowSink.grab()
|
||||
val requestContext = inflightRequests.pop
|
||||
|
||||
val (entity, whenCompleted) = HttpEntity.captureTermination(response.entity)
|
||||
import fm.executionContext
|
||||
push(responsesOut, ResponseContext(requestContext, Success(response withEntity entity)))
|
||||
push(eventsOut, SlotEvent.RequestCompletedFuture(whenCompleted.map(_ ⇒ SlotEvent.RequestCompleted(slotIx))))
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = disconnect()
|
||||
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = disconnect(Some(ex))
|
||||
}
|
||||
|
||||
// upstream pushes we create the inner stream if necessary or push if we're already connected
|
||||
override def onPush(): Unit = {
|
||||
def establishConnectionFlow() = {
|
||||
connectionFlowSource = new SubSourceOutlet[HttpRequest]("RequestSource")
|
||||
connectionFlowSource.setHandler(connectionOutFlowHandler)
|
||||
|
||||
connectionFlowSink = new SubSinkInlet[HttpResponse]("ResponseSink")
|
||||
connectionFlowSink.setHandler(connectionInFlowHandler)
|
||||
|
||||
isConnected = true
|
||||
|
||||
Source.fromGraph(connectionFlowSource.source)
|
||||
.via(connectionFlow).runWith(Sink.fromGraph(connectionFlowSink.sink))(subFusingMaterializer)
|
||||
|
||||
connectionFlowSink.pull()
|
||||
}
|
||||
|
||||
grab(in) match {
|
||||
case ConnectEagerlyCommand ⇒
|
||||
if (!isConnected) establishConnectionFlow()
|
||||
|
||||
emit(eventsOut, ConnectedEagerly(slotIx))
|
||||
|
||||
case DispatchCommand(rc: RequestContext) ⇒
|
||||
if (isConnected) {
|
||||
inflightRequests.add(rc)
|
||||
connectionFlowSource.push(rc.request)
|
||||
} else {
|
||||
firstRequest = rc
|
||||
establishConnectionFlow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
setHandler(in, this)
|
||||
|
||||
setHandler(responsesOut, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
// downstream pulls, if connected we pull inner
|
||||
if (isConnected) connectionFlowSink.pull()
|
||||
else if (!hasBeenPulled(in)) pull(in)
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(eventsOut, EagerTerminateOutput)
|
||||
}
|
||||
|
||||
override def onComplete(): Unit = {
|
||||
exposedPublisher.shutdown(None)
|
||||
super.onComplete()
|
||||
shutdown()
|
||||
}
|
||||
|
||||
override def onError(cause: Throwable): Unit = {
|
||||
exposedPublisher.shutdown(Some(cause))
|
||||
super.onError(cause)
|
||||
shutdown()
|
||||
}
|
||||
|
||||
def shutdown(): Unit = context.stop(self)
|
||||
}
|
||||
|
||||
private case class FromConnection(ev: Any) extends NoSerializationVerificationNeeded
|
||||
|
||||
private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] with ActorLogging {
|
||||
def receive: Receive = {
|
||||
case ev: Request ⇒ slotProcessor ! FromConnection(ev)
|
||||
case OnNext(r: HttpRequest) ⇒ onNext(r)
|
||||
case OnComplete ⇒ onCompleteThenStop()
|
||||
case OnError(e) ⇒ onErrorThenStop(e)
|
||||
case Cancel ⇒
|
||||
slotProcessor ! FromConnection(Cancel)
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
def flowInportProps(s: ActorRef) = Props(new FlowInportActor(s)).withDeploy(Deploy.local)
|
||||
|
||||
private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber with ActorLogging {
|
||||
def requestStrategy = ZeroRequestStrategy
|
||||
def receive: Receive = {
|
||||
case Request(n) ⇒ request(n)
|
||||
case Cancel ⇒ cancel()
|
||||
case ev: OnNext ⇒ slotProcessor ! FromConnection(ev)
|
||||
case ev @ (OnComplete | OnError(_)) ⇒
|
||||
slotProcessor ! FromConnection(ev)
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
def flowOutportProps(s: ActorRef) = Props(new FlowOutportActor(s)).withDeploy(Deploy.local)
|
||||
|
||||
final class UnexpectedDisconnectException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) {
|
||||
def this(msg: String) = this(msg, null)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,10 +85,7 @@ private[http] object HttpServerBluePrint {
|
|||
BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings))
|
||||
|
||||
def requestTimeoutSupport(timeout: Duration): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] =
|
||||
timeout match {
|
||||
case x: FiniteDuration ⇒ BidiFlow.fromGraph(new RequestTimeoutSupport(x)).reversed
|
||||
case _ ⇒ BidiFlow.identity
|
||||
}
|
||||
BidiFlow.fromGraph(new RequestTimeoutSupport(timeout)).reversed
|
||||
|
||||
/**
|
||||
* Two state stage, either transforms an incoming RequestOutput into a HttpRequest with strict entity and then pushes
|
||||
|
|
@ -249,7 +246,7 @@ private[http] object HttpServerBluePrint {
|
|||
.via(MapError[ResponseRenderingOutput](errorHandler).named("errorLogger"))
|
||||
}
|
||||
|
||||
class RequestTimeoutSupport(initialTimeout: FiniteDuration)
|
||||
class RequestTimeoutSupport(initialTimeout: Duration)
|
||||
extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] {
|
||||
private val requestIn = Inlet[HttpRequest]("requestIn")
|
||||
private val requestOut = Outlet[HttpRequest]("requestOut")
|
||||
|
|
@ -307,13 +304,23 @@ private[http] object HttpServerBluePrint {
|
|||
val timeout: Duration,
|
||||
val handler: HttpRequest ⇒ HttpResponse)
|
||||
|
||||
private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit],
|
||||
private object DummyCancellable extends Cancellable {
|
||||
override def isCancelled: Boolean = true
|
||||
override def cancel(): Boolean = true
|
||||
}
|
||||
|
||||
private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, requestEnd: Future[Unit],
|
||||
trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer)
|
||||
extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒
|
||||
import materializer.executionContext
|
||||
|
||||
set {
|
||||
requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this))
|
||||
initialTimeout match {
|
||||
case timeout: FiniteDuration ⇒ set {
|
||||
requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this))
|
||||
}
|
||||
case _ ⇒ set {
|
||||
requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this))
|
||||
}
|
||||
}
|
||||
|
||||
override def apply(request: HttpRequest) =
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import akka.util.ByteString
|
|||
/**
|
||||
* Represents a WebSocket message. A message can either be a binary message or a text message.
|
||||
*/
|
||||
sealed abstract class Message {
|
||||
abstract class Message {
|
||||
/**
|
||||
* Is this message a text message? If true, [[asTextMessage]] will return this
|
||||
* text message, if false, [[asBinaryMessage]] will return this binary message.
|
||||
|
|
@ -150,4 +150,4 @@ object BinaryMessage {
|
|||
case sm.ws.BinaryMessage.Strict(data) ⇒ create(data)
|
||||
case bm: sm.ws.BinaryMessage ⇒ create(bm.dataStream.asJava)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -766,11 +766,14 @@ private[http] object `Sec-WebSocket-Protocol` extends ModeledCompanion[`Sec-WebS
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[http] final case class `Sec-WebSocket-Protocol`(protocols: immutable.Seq[String])
|
||||
extends RequestResponseHeader {
|
||||
extends jm.headers.SecWebSocketProtocol with RequestResponseHeader {
|
||||
require(protocols.nonEmpty, "Sec-WebSocket-Protocol.protocols must not be empty")
|
||||
import `Sec-WebSocket-Protocol`.protocolsRenderer
|
||||
protected[http] def renderValue[R <: Rendering](r: R): r.type = r ~~ protocols
|
||||
protected def companion = `Sec-WebSocket-Protocol`
|
||||
|
||||
/** Java API */
|
||||
override def getProtocols: Iterable[String] = protocols.asJava
|
||||
}
|
||||
|
||||
// http://tools.ietf.org/html/rfc6455#section-4.3
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.http.scaladsl.model.ws
|
||||
|
||||
import akka.stream.javadsl
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.util.ByteString
|
||||
|
||||
|
|
@ -11,18 +12,22 @@ import akka.util.ByteString
|
|||
/**
|
||||
* The ADT for WebSocket messages. A message can either be a binary or a text message.
|
||||
*/
|
||||
sealed trait Message // FIXME: Why don't we extend akka.http.javadsl.model.ws.Message here?
|
||||
sealed trait Message extends akka.http.javadsl.model.ws.Message
|
||||
|
||||
/**
|
||||
* Represents a WebSocket text message. A text message can either be a [[TextMessage.Strict]] in which case
|
||||
* the complete data is already available or it can be [[TextMessage.Streamed]] in which case `textStream`
|
||||
* will return a Source streaming the data as it comes in.
|
||||
*/
|
||||
sealed trait TextMessage extends Message {
|
||||
sealed trait TextMessage extends akka.http.javadsl.model.ws.TextMessage with Message {
|
||||
/**
|
||||
* The contents of this message as a stream.
|
||||
*/
|
||||
def textStream: Source[String, _]
|
||||
|
||||
/** Java API */
|
||||
override def getStreamedText: javadsl.Source[String, _] = textStream.asJava
|
||||
override def asScala: TextMessage = this
|
||||
}
|
||||
//#message-model
|
||||
object TextMessage {
|
||||
|
|
@ -36,9 +41,18 @@ object TextMessage {
|
|||
final case class Strict(text: String) extends TextMessage {
|
||||
def textStream: Source[String, _] = Source.single(text)
|
||||
override def toString: String = s"TextMessage.Strict($text)"
|
||||
|
||||
/** Java API */
|
||||
override def getStrictText: String = text
|
||||
override def isStrict: Boolean = true
|
||||
}
|
||||
|
||||
final case class Streamed(textStream: Source[String, _]) extends TextMessage {
|
||||
override def toString: String = s"TextMessage.Streamed($textStream)"
|
||||
|
||||
/** Java API */
|
||||
override def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.")
|
||||
override def isStrict: Boolean = false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -48,11 +62,15 @@ object TextMessage {
|
|||
* will return a Source streaming the data as it comes in.
|
||||
*/
|
||||
//#message-model
|
||||
sealed trait BinaryMessage extends Message {
|
||||
sealed trait BinaryMessage extends akka.http.javadsl.model.ws.BinaryMessage with Message {
|
||||
/**
|
||||
* The contents of this message as a stream.
|
||||
*/
|
||||
def dataStream: Source[ByteString, _]
|
||||
|
||||
/** Java API */
|
||||
override def getStreamedData: javadsl.Source[ByteString, _] = dataStream.asJava
|
||||
override def asScala: BinaryMessage = this
|
||||
}
|
||||
//#message-model
|
||||
object BinaryMessage {
|
||||
|
|
@ -66,8 +84,16 @@ object BinaryMessage {
|
|||
final case class Strict(data: ByteString) extends BinaryMessage {
|
||||
def dataStream: Source[ByteString, _] = Source.single(data)
|
||||
override def toString: String = s"BinaryMessage.Strict($data)"
|
||||
|
||||
/** Java API */
|
||||
override def getStrictData: ByteString = data
|
||||
override def isStrict: Boolean = true
|
||||
}
|
||||
final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage {
|
||||
override def toString: String = s"BinaryMessage.Streamed($dataStream)"
|
||||
|
||||
/** Java API */
|
||||
override def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.")
|
||||
override def isStrict: Boolean = false
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||
|
||||
import akka.http.impl.engine.client.PoolMasterActor.PoolInterfaceRunning
|
||||
import akka.http.impl.settings.ConnectionPoolSettingsImpl
|
||||
import akka.http.impl.util.{ SingletonException, StreamUtils }
|
||||
import akka.http.impl.util.SingletonException
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers._
|
||||
import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings }
|
||||
|
|
@ -335,6 +335,67 @@ class ConnectionPoolSpec extends AkkaSpec("""
|
|||
}
|
||||
}
|
||||
|
||||
"be able to handle 500 `Connection: close` requests against the test server" in new TestSetup {
|
||||
val settings = ConnectionPoolSettings(system).withMaxConnections(4)
|
||||
val poolFlow = Http().cachedHostConnectionPool[Int](serverHostName, serverPort, settings = settings)
|
||||
|
||||
val N = 500
|
||||
val requestIds = Source.fromIterator(() ⇒ Iterator.from(1)).take(N)
|
||||
val idSum = requestIds.map(id ⇒ HttpRequest(uri = s"/r$id").withHeaders(Connection("close")) → id).via(poolFlow).map {
|
||||
case (Success(response), id) ⇒
|
||||
requestUri(response) should endWith(s"/r$id")
|
||||
id
|
||||
case x ⇒ fail(x.toString)
|
||||
}.runFold(0)(_ + _)
|
||||
|
||||
(1 to N).foreach(_ ⇒ acceptIncomingConnection())
|
||||
|
||||
Await.result(idSum, 10.seconds) shouldEqual N * (N + 1) / 2
|
||||
}
|
||||
|
||||
"be able to handle 500 pipelined requests with connection termination" in new TestSetup(autoAccept = true) {
|
||||
def closeHeader(): List[Connection] =
|
||||
if (util.Random.nextInt(8) == 0) Connection("close") :: Nil
|
||||
else Nil
|
||||
|
||||
override def testServerHandler(connNr: Int): HttpRequest ⇒ HttpResponse = { r ⇒
|
||||
val idx = r.uri.path.tail.head.toString
|
||||
HttpResponse()
|
||||
.withHeaders(RawHeader("Req-Idx", idx) +: responseHeaders(r, connNr))
|
||||
.withDefaultHeaders(closeHeader())
|
||||
}
|
||||
|
||||
for (pipeliningLimit ← Iterator.from(1).map(math.pow(2, _).toInt).take(4)) {
|
||||
val settings = ConnectionPoolSettings(system).withMaxConnections(4).withPipeliningLimit(pipeliningLimit).withMaxOpenRequests(4 * pipeliningLimit)
|
||||
val poolFlow = Http().cachedHostConnectionPool[Int](serverHostName, serverPort, settings = settings)
|
||||
|
||||
def method() =
|
||||
if (util.Random.nextInt(2) == 0) HttpMethods.POST else HttpMethods.GET
|
||||
|
||||
def request(i: Int) =
|
||||
HttpRequest(method = method(), headers = closeHeader(), uri = s"/$i") → i
|
||||
|
||||
try {
|
||||
val N = 200
|
||||
val (_, idSum) =
|
||||
Source.fromIterator(() ⇒ Iterator.from(1)).take(N)
|
||||
.map(request)
|
||||
.viaMat(poolFlow)(Keep.right)
|
||||
.map {
|
||||
case (Success(response), id) ⇒
|
||||
requestUri(response) should endWith(s"/$id")
|
||||
id
|
||||
case x ⇒ fail(x.toString)
|
||||
}.toMat(Sink.fold(0)(_ + _))(Keep.both).run()
|
||||
|
||||
Await.result(idSum, 30.seconds) shouldEqual N * (N + 1) / 2
|
||||
} catch {
|
||||
case thr: Throwable ⇒
|
||||
throw new RuntimeException(s"Failed at pipeliningLimit=$pipeliningLimit, poolFlow=$poolFlow", thr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class TestSetup(
|
||||
serverSettings: ServerSettings = ServerSettings(system),
|
||||
autoAccept: Boolean = false) {
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|
|
||||
|""")
|
||||
|
||||
expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
|
||||
shutdownBlueprint()
|
||||
})
|
||||
|
|
@ -141,7 +141,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|
|
||||
|abcdefghijkl""")
|
||||
|
||||
expectRequest() shouldEqual
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual
|
||||
HttpRequest(
|
||||
method = POST,
|
||||
uri = "http://example.com/strict",
|
||||
|
|
@ -205,7 +205,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|
|
||||
|abcdefghijkl""")
|
||||
|
||||
expectRequest() shouldEqual
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual
|
||||
HttpRequest(
|
||||
method = POST,
|
||||
uri = "http://example.com/strict",
|
||||
|
|
@ -218,7 +218,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|
|
||||
|mnopqrstuvwx""")
|
||||
|
||||
expectRequest() shouldEqual
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual
|
||||
HttpRequest(
|
||||
method = POST,
|
||||
uri = "http://example.com/next-strict",
|
||||
|
|
@ -446,7 +446,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|Host: example.com
|
||||
|
|
||||
|""")
|
||||
expectRequest() shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
shutdownBlueprint()
|
||||
})
|
||||
|
||||
|
|
@ -456,7 +456,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|Host: example.com
|
||||
|
|
||||
|""")
|
||||
expectRequest() shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
shutdownBlueprint()
|
||||
})
|
||||
|
||||
|
|
@ -696,7 +696,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|
|
||||
|""".stripMarginWithNewline("\r\n"))
|
||||
|
||||
expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com")))
|
||||
|
||||
responses.expectRequest()
|
||||
responses.sendError(new RuntimeException("CRASH BOOM BANG"))
|
||||
|
|
@ -758,7 +758,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|
|
||||
|""")
|
||||
|
||||
expectRequest() shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com")))
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com")))
|
||||
shutdownBlueprint()
|
||||
})
|
||||
|
||||
|
|
@ -767,7 +767,7 @@ class HttpServerSpec extends AkkaSpec(
|
|||
|
|
||||
|""")
|
||||
|
||||
expectRequest() shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`)
|
||||
expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`)
|
||||
|
||||
override def settings: ServerSettings = super.settings.withDefaultHostHeader(Host("example.com"))
|
||||
|
||||
|
|
|
|||
|
|
@ -447,7 +447,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit
|
|||
val serverInSub = serverIn.expectSubscription()
|
||||
serverInSub.request(1)
|
||||
private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)),
|
||||
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext()
|
||||
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() mapHeaders (_.filterNot(_.is("timeout-access")))
|
||||
uri shouldEqual Uri(s"http://$hostname:$port/chunked")
|
||||
Await.result(chunkStream.limit(5).runWith(Sink.seq), 100.millis) shouldEqual chunks
|
||||
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ import akka.stream.Materializer
|
|||
*
|
||||
* See `JUnitRouteTest` for an example of a concrete implementation.
|
||||
*/
|
||||
abstract class RouteTest extends AllDirectives {
|
||||
abstract class RouteTest extends AllDirectives with WSTestRequestBuilding {
|
||||
implicit def system: ActorSystem
|
||||
implicit def materializer: Materializer
|
||||
implicit def executionContext: ExecutionContextExecutor = system.dispatcher
|
||||
|
|
|
|||
|
|
@ -0,0 +1,108 @@
|
|||
/*
|
||||
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.testkit
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.actor.ActorSystem
|
||||
import akka.http.javadsl.model.ws.Message
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.javadsl.Flow
|
||||
import akka.stream.scaladsl
|
||||
import akka.util.ByteString
|
||||
|
||||
import akka.http.scaladsl.{ testkit ⇒ st }
|
||||
|
||||
import akka.http.impl.util.JavaMapping.Implicits._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* A WSProbe is a probe that implements a `Flow[Message, Message, Unit]` for testing
|
||||
* websocket code.
|
||||
*
|
||||
* Requesting elements is handled automatically.
|
||||
*/
|
||||
class WSProbe(delegate: st.WSProbe) {
|
||||
|
||||
def flow: Flow[Message, Message, Any] = {
|
||||
val underlying = scaladsl.Flow[Message].map(_.asScala).via(delegate.flow).map(_.asJava)
|
||||
new Flow[Message, Message, NotUsed](underlying)
|
||||
}
|
||||
|
||||
/**
|
||||
* Send the given messages out of the flow.
|
||||
*/
|
||||
def sendMessage(message: Message): Unit = delegate.sendMessage(message.asScala)
|
||||
|
||||
/**
|
||||
* Send a text message containing the given string out of the flow.
|
||||
*/
|
||||
def sendMessage(text: String): Unit = delegate.sendMessage(text)
|
||||
|
||||
/**
|
||||
* Send a binary message containing the given bytes out of the flow.
|
||||
*/
|
||||
def sendMessage(bytes: ByteString): Unit = delegate.sendMessage(bytes)
|
||||
|
||||
/**
|
||||
* Complete the output side of the flow.
|
||||
*/
|
||||
def sendCompletion(): Unit = delegate.sendCompletion()
|
||||
|
||||
/**
|
||||
* Expect a message on the input side of the flow.
|
||||
*/
|
||||
def expectMessage(): Message = delegate.expectMessage()
|
||||
|
||||
/**
|
||||
* Expect a text message on the input side of the flow and compares its payload with the given one.
|
||||
* If the received message is streamed its contents are collected and then asserted against the given
|
||||
* String.
|
||||
*/
|
||||
def expectMessage(text: String): Unit = delegate.expectMessage(text)
|
||||
|
||||
/**
|
||||
* Expect a binary message on the input side of the flow and compares its payload with the given one.
|
||||
* If the received message is streamed its contents are collected and then asserted against the given
|
||||
* ByteString.
|
||||
*/
|
||||
def expectMessage(bytes: ByteString): Unit = delegate.expectMessage(bytes)
|
||||
|
||||
/**
|
||||
* Expect no message on the input side of the flow.
|
||||
*/
|
||||
def expectNoMessage(): Unit = delegate.expectNoMessage()
|
||||
|
||||
/**
|
||||
* Expect no message on the input side of the flow for the given maximum duration.
|
||||
*/
|
||||
def expectNoMessage(max: FiniteDuration): Unit = delegate.expectNoMessage(max)
|
||||
|
||||
/**
|
||||
* Expect completion on the input side of the flow.
|
||||
*/
|
||||
def expectCompletion(): Unit = delegate.expectCompletion()
|
||||
|
||||
}
|
||||
|
||||
object WSProbe {
|
||||
|
||||
// A convenient method to create WSProbe with default maxChunks and maxChunkCollectionMills
|
||||
def create(system: ActorSystem, materializer: Materializer): WSProbe = {
|
||||
create(system, materializer, 1000, 5000)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a WSProbe to use in tests against websocket handlers.
|
||||
*
|
||||
* @param maxChunks The maximum number of chunks to collect for streamed messages.
|
||||
* @param maxChunkCollectionMills The maximum time in milliseconds to collect chunks for streamed messages.
|
||||
*/
|
||||
def create(system: ActorSystem, materializer: Materializer, maxChunks: Int, maxChunkCollectionMills: Long): WSProbe = {
|
||||
val delegate = st.WSProbe(maxChunks, maxChunkCollectionMills)(system, materializer)
|
||||
new WSProbe(delegate)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright (C) 2016-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.http.javadsl.testkit
|
||||
|
||||
import akka.http.javadsl.model.ws.Message
|
||||
import akka.http.javadsl.model.{ HttpRequest, Uri }
|
||||
import akka.http.scaladsl.{ model ⇒ sm }
|
||||
import akka.stream.javadsl.Flow
|
||||
|
||||
import akka.http.scaladsl.{ testkit ⇒ st }
|
||||
|
||||
import akka.http.impl.util.JavaMapping.Implicits._
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.stream.{ Materializer, scaladsl }
|
||||
|
||||
trait WSTestRequestBuilding {
|
||||
|
||||
def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], materializer: Materializer): HttpRequest = {
|
||||
WS(uri, clientSideHandler, materializer, java.util.Collections.emptyList())
|
||||
}
|
||||
|
||||
def WS(
|
||||
uri: Uri,
|
||||
clientSideHandler: Flow[Message, Message, Any],
|
||||
materializer: Materializer,
|
||||
subprotocols: java.util.List[String]): HttpRequest = {
|
||||
|
||||
val handler = scaladsl.Flow[sm.ws.Message].map(_.asJava).via(clientSideHandler).map(_.asScala)
|
||||
st.WSTestRequestBuilding.WS(uri.asScala, handler, subprotocols.asScala)(materializer)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -9,11 +9,11 @@ import akka.http.scaladsl.model.headers.{ UpgradeProtocol, Upgrade, `Sec-WebSock
|
|||
import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest, Uri }
|
||||
import akka.http.scaladsl.model.ws.{ UpgradeToWebSocket, Message }
|
||||
import scala.collection.immutable
|
||||
import akka.stream.{ Graph, FlowShape }
|
||||
import akka.stream.{ Materializer, Graph, FlowShape }
|
||||
import akka.stream.scaladsl.Flow
|
||||
|
||||
trait WSTestRequestBuilding { self: RouteTest ⇒
|
||||
def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(): HttpRequest =
|
||||
trait WSTestRequestBuilding {
|
||||
def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(implicit materializer: Materializer): HttpRequest =
|
||||
HttpRequest(uri = uri)
|
||||
.addHeader(new InternalCustomHeader("UpgradeToWebSocketTestHeader") with UpgradeToWebSocket {
|
||||
def requestedProtocols: immutable.Seq[String] = subprotocols.toList
|
||||
|
|
@ -28,3 +28,5 @@ trait WSTestRequestBuilding { self: RouteTest ⇒
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
object WSTestRequestBuilding extends WSTestRequestBuilding
|
||||
|
|
|
|||
|
|
@ -52,9 +52,8 @@ abstract class WebSocketDirectives extends SecurityDirectives {
|
|||
* Handles WebSocket requests with the given handler if the given subprotocol is offered in the request and
|
||||
* rejects other requests with an [[ExpectedWebSocketRequestRejection]] or an [[UnsupportedWebSocketSubprotocolRejection]].
|
||||
*/
|
||||
def handleWebSocketMessagesForProtocol(handler: Flow[Message, Message, Any], subprotocol: String): Route = RouteAdapter {
|
||||
val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala)
|
||||
D.handleWebSocketMessagesForProtocol(adapted, subprotocol)
|
||||
def handleWebSocketMessagesForProtocol[T](handler: Flow[Message, Message, T], subprotocol: String): Route = RouteAdapter {
|
||||
D.handleWebSocketMessagesForProtocol(adapt(handler), subprotocol)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -68,12 +67,10 @@ abstract class WebSocketDirectives extends SecurityDirectives {
|
|||
*
|
||||
* To support several subprotocols you may chain several `handleWebSocketMessage` Routes.
|
||||
*/
|
||||
def handleWebSocketMessagesForOptionalProtocol(handler: Flow[Message, Message, Any], subprotocol: Optional[String]): Route = RouteAdapter {
|
||||
val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala)
|
||||
D.handleWebSocketMessagesForOptionalProtocol(adapted, subprotocol.asScala)
|
||||
def handleWebSocketMessagesForOptionalProtocol[T](handler: Flow[Message, Message, T], subprotocol: Optional[String]): Route = RouteAdapter {
|
||||
D.handleWebSocketMessagesForOptionalProtocol(adapt(handler), subprotocol.asScala)
|
||||
}
|
||||
|
||||
// TODO this is because scala Message does not extend java Message - we could fix that, but http-core is stable
|
||||
private def adapt[T](handler: Flow[Message, Message, T]): scaladsl.Flow[s.Message, s.Message, NotUsed] = {
|
||||
scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -805,7 +805,10 @@ private[remote] class EndpointWriter(
|
|||
}
|
||||
} catch {
|
||||
case e: NotSerializableException ⇒
|
||||
log.error(e, "Transient association error (association remains live)")
|
||||
log.error(e, "Serializer not defined for message type []. Transient association error (association remains live)", s.message.getClass)
|
||||
true
|
||||
case e: MessageSerializer.SerializationException ⇒
|
||||
log.error(e, "{} Transient association error (association remains live)", e.getMessage)
|
||||
true
|
||||
case e: EndpointException ⇒
|
||||
publishAndThrow(e, Logging.ErrorLevel)
|
||||
|
|
|
|||
|
|
@ -8,8 +8,11 @@ import akka.remote.WireFormats._
|
|||
import akka.protobuf.ByteString
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder }
|
||||
import akka.serialization.{ Serialization, SerializationExtension, SerializerWithStringManifest }
|
||||
import akka.serialization.Serialization
|
||||
import akka.serialization.ByteBufferSerializer
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -18,6 +21,8 @@ import akka.serialization.ByteBufferSerializer
|
|||
*/
|
||||
private[akka] object MessageSerializer {
|
||||
|
||||
class SerializationException(msg: String, cause: Throwable) extends RuntimeException(msg, cause)
|
||||
|
||||
/**
|
||||
* Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
|
||||
*/
|
||||
|
|
@ -30,23 +35,32 @@ private[akka] object MessageSerializer {
|
|||
|
||||
/**
|
||||
* Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol
|
||||
* Throws `NotSerializableException` if serializer was not configured for the message type.
|
||||
* Throws `MessageSerializer.SerializationException` if exception was thrown from `toBinary` of the
|
||||
* serializer.
|
||||
*/
|
||||
def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = {
|
||||
val s = SerializationExtension(system)
|
||||
val serializer = s.findSerializerFor(message)
|
||||
val builder = SerializedMessage.newBuilder
|
||||
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
|
||||
builder.setSerializerId(serializer.identifier)
|
||||
serializer match {
|
||||
case ser2: SerializerWithStringManifest ⇒
|
||||
val manifest = ser2.manifest(message)
|
||||
if (manifest != "")
|
||||
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
|
||||
case _ ⇒
|
||||
if (serializer.includeManifest)
|
||||
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
|
||||
try {
|
||||
builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
|
||||
builder.setSerializerId(serializer.identifier)
|
||||
serializer match {
|
||||
case ser2: SerializerWithStringManifest ⇒
|
||||
val manifest = ser2.manifest(message)
|
||||
if (manifest != "")
|
||||
builder.setMessageManifest(ByteString.copyFromUtf8(manifest))
|
||||
case _ ⇒
|
||||
if (serializer.includeManifest)
|
||||
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
|
||||
}
|
||||
builder.build
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " +
|
||||
s"using serializer [${serializer.getClass}].", e)
|
||||
}
|
||||
builder.build
|
||||
}
|
||||
|
||||
def serializeForArtery(serialization: Serialization, message: AnyRef, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = {
|
||||
|
|
|
|||
|
|
@ -909,7 +909,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
|
|||
* The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify
|
||||
* internal state of this stage.
|
||||
*
|
||||
* This method must not (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
|
||||
* This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running,
|
||||
* for example from the [[preStart]] callback the graph stage logic provides.
|
||||
*
|
||||
* Created [[StageActorRef]] to get messages and watch other actors in synchronous way.
|
||||
|
|
|
|||
|
|
@ -4,9 +4,12 @@
|
|||
package akka.testkit;
|
||||
|
||||
import akka.actor.Terminated;
|
||||
import scala.Option;
|
||||
import scala.runtime.AbstractFunction0;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.actor.Props;
|
||||
import akka.actor.SupervisorStrategy;
|
||||
import akka.event.Logging;
|
||||
import akka.event.Logging.LogEvent;
|
||||
import akka.japi.JavaPartialFunction;
|
||||
|
|
@ -684,4 +687,43 @@ public class JavaTestKit {
|
|||
public void shutdown(ActorSystem actorSystem, Boolean verifySystemShutdown) {
|
||||
shutdown(actorSystem, null, verifySystemShutdown);
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor, and returns the child's ActorRef.
|
||||
* @param props Props to create the child actor
|
||||
* @param name Actor name for the child actor
|
||||
* @param supervisorStrategy Strategy should decide what to do with failures in the actor.
|
||||
*/
|
||||
public ActorRef childActorOf(Props props, String name, SupervisorStrategy supervisorStrategy) {
|
||||
return p.childActorOf(props, name, supervisorStrategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor, and returns the child's ActorRef.
|
||||
* The actor will have an auto-generated name.
|
||||
* @param props Props to create the child actor
|
||||
* @param supervisorStrategy Strategy should decide what to do with failures in the actor.
|
||||
*/
|
||||
public ActorRef childActorOf(Props props, SupervisorStrategy supervisorStrategy) {
|
||||
return p.childActorOf(props, supervisorStrategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor, and returns the child's ActorRef.
|
||||
* The actor will be supervised using {@link SupervisorStrategy.stoppingStrategy}.
|
||||
* @param props Props to create the child actor
|
||||
* @param name Actor name for the child actor
|
||||
*/
|
||||
public ActorRef childActorOf(Props props, String name) {
|
||||
return p.childActorOf(props, name);
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor, and returns the child's ActorRef.
|
||||
* The actor will have an auto-generated name and will be supervised using {@link SupervisorStrategy.stoppingStrategy}.
|
||||
* @param props Props to create the child actor
|
||||
*/
|
||||
public ActorRef childActorOf(Props props) {
|
||||
return p.childActorOf(props);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,6 +41,12 @@ object TestActor {
|
|||
final case class Watch(ref: ActorRef) extends NoSerializationVerificationNeeded
|
||||
final case class UnWatch(ref: ActorRef) extends NoSerializationVerificationNeeded
|
||||
final case class SetAutoPilot(ap: AutoPilot) extends NoSerializationVerificationNeeded
|
||||
final case class Spawn(props: Props, name: Option[String] = None, strategy: Option[SupervisorStrategy] = None) extends NoSerializationVerificationNeeded {
|
||||
def apply(context: ActorRefFactory): ActorRef = name match {
|
||||
case Some(n) ⇒ context.actorOf(props, n)
|
||||
case None ⇒ context.actorOf(props)
|
||||
}
|
||||
}
|
||||
|
||||
trait Message {
|
||||
def msg: AnyRef
|
||||
|
|
@ -54,6 +60,31 @@ object TestActor {
|
|||
|
||||
val FALSE = (x: Any) ⇒ false
|
||||
|
||||
/** INTERNAL API */
|
||||
private[TestActor] class DelegatingSupervisorStrategy extends SupervisorStrategy {
|
||||
import SupervisorStrategy._
|
||||
|
||||
private var delegates = Map.empty[ActorRef, SupervisorStrategy]
|
||||
|
||||
private def delegate(child: ActorRef) = delegates.get(child).getOrElse(stoppingStrategy)
|
||||
|
||||
def update(child: ActorRef, supervisor: SupervisorStrategy): Unit = delegates += (child → supervisor)
|
||||
|
||||
override def decider = defaultDecider // not actually invoked
|
||||
|
||||
override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {
|
||||
delegates -= child
|
||||
}
|
||||
|
||||
override def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
|
||||
delegates(child).processFailure(context, restart, child, cause, stats, children)
|
||||
}
|
||||
|
||||
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
|
||||
delegates(child).handleFailure(context, child, cause, stats, children)
|
||||
}
|
||||
}
|
||||
|
||||
// make creator serializable, for VerifySerializabilitySpec
|
||||
def props(queue: BlockingDeque[Message]): Props = Props(classOf[TestActor], queue)
|
||||
}
|
||||
|
|
@ -61,6 +92,8 @@ object TestActor {
|
|||
class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
|
||||
import TestActor._
|
||||
|
||||
override val supervisorStrategy: DelegatingSupervisorStrategy = new DelegatingSupervisorStrategy
|
||||
|
||||
var ignore: Ignore = None
|
||||
|
||||
var autopilot: AutoPilot = NoAutoPilot
|
||||
|
|
@ -70,6 +103,10 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
|
|||
case Watch(ref) ⇒ context.watch(ref)
|
||||
case UnWatch(ref) ⇒ context.unwatch(ref)
|
||||
case SetAutoPilot(pilot) ⇒ autopilot = pilot
|
||||
case spawn: Spawn ⇒
|
||||
val actor = spawn(context)
|
||||
for (s ← spawn.strategy) supervisorStrategy(actor) = s
|
||||
queue.offerLast(RealMessage(actor, self))
|
||||
case x: AnyRef ⇒
|
||||
autopilot = autopilot.run(sender(), x) match {
|
||||
case KeepRunning ⇒ autopilot
|
||||
|
|
@ -102,7 +139,7 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
|
|||
*/
|
||||
trait TestKitBase {
|
||||
|
||||
import TestActor.{ Message, RealMessage, NullMessage }
|
||||
import TestActor.{ Message, RealMessage, NullMessage, Spawn }
|
||||
|
||||
implicit val system: ActorSystem
|
||||
val testKitSettings = TestKitExtension(system)
|
||||
|
|
@ -688,6 +725,46 @@ trait TestKitBase {
|
|||
TestKit.shutdownActorSystem(actorSystem, duration, verifySystemShutdown)
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor, and returns the child's ActorRef.
|
||||
* @param props Props to create the child actor
|
||||
* @param name Actor name for the child actor
|
||||
* @param supervisorStrategy Strategy should decide what to do with failures in the actor.
|
||||
*/
|
||||
def childActorOf(props: Props, name: String, supervisorStrategy: SupervisorStrategy): ActorRef = {
|
||||
testActor ! Spawn(props, Some(name), Some(supervisorStrategy))
|
||||
expectMsgType[ActorRef]
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor with an auto-generated name, and returns the child's ActorRef.
|
||||
* @param props Props to create the child actor
|
||||
* @param supervisorStrategy Strategy should decide what to do with failures in the actor.
|
||||
*/
|
||||
def childActorOf(props: Props, supervisorStrategy: SupervisorStrategy): ActorRef = {
|
||||
testActor ! Spawn(props, None, Some(supervisorStrategy))
|
||||
expectMsgType[ActorRef]
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor with a stopping supervisor strategy, and returns the child's ActorRef.
|
||||
* @param props Props to create the child actor
|
||||
* @param name Actor name for the child actor
|
||||
*/
|
||||
def childActorOf(props: Props, name: String): ActorRef = {
|
||||
testActor ! Spawn(props, Some(name), None)
|
||||
expectMsgType[ActorRef]
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawns an actor as a child of this test actor with an auto-generated name and stopping supervisor strategy, returning the child's ActorRef.
|
||||
* @param props Props to create the child actor
|
||||
*/
|
||||
def childActorOf(props: Props): ActorRef = {
|
||||
testActor ! Spawn(props, None, None)
|
||||
expectMsgType[ActorRef]
|
||||
}
|
||||
|
||||
private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import scala.concurrent.{ Await }
|
|||
import scala.concurrent.duration._
|
||||
import akka.pattern.ask
|
||||
import scala.util.Try
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
class TestProbeSpec extends AkkaSpec with DefaultTimeout {
|
||||
|
||||
|
|
@ -39,6 +40,38 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout {
|
|||
probe1.expectMsg(0 millis, "some hint here", "world")
|
||||
}
|
||||
|
||||
"create a child when invoking actorOf" in {
|
||||
val probe = TestProbe()
|
||||
val child = probe.childActorOf(TestActors.echoActorProps)
|
||||
child.path.parent should be(probe.ref.path)
|
||||
|
||||
val namedChild = probe.childActorOf(TestActors.echoActorProps, "actorName")
|
||||
namedChild.path.name should be("actorName")
|
||||
}
|
||||
|
||||
"restart a failing child if the given supervisor says so" in {
|
||||
val restarts = new AtomicInteger(0)
|
||||
|
||||
class FailingActor extends Actor {
|
||||
override def receive = msg ⇒ msg match {
|
||||
case _ ⇒
|
||||
throw new RuntimeException("simulated failure")
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable): Unit = {
|
||||
restarts.incrementAndGet()
|
||||
}
|
||||
}
|
||||
|
||||
val probe = TestProbe()
|
||||
val child = probe.childActorOf(Props(new FailingActor), SupervisorStrategy.defaultStrategy)
|
||||
|
||||
awaitAssert {
|
||||
child ! "hello"
|
||||
restarts.get() should be > (1)
|
||||
}
|
||||
}
|
||||
|
||||
def assertFailureMessageContains(expectedHint: String)(block: ⇒ Unit) {
|
||||
Try {
|
||||
block
|
||||
|
|
|
|||
|
|
@ -969,7 +969,19 @@ object MiMa extends AutoPlugin {
|
|||
),
|
||||
"2.4.10" -> Seq(
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat"),
|
||||
|
||||
// #21201 adding childActorOf to TestActor / TestKit / TestProbe
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$3"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$2"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf"),
|
||||
|
||||
// #21184 add java api for ws testkit
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.asScala"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.getStreamedText"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.asScala"),
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.getStreamedData")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue