diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 377dbc978c..878c29adbf 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -40,6 +40,9 @@ object CircuitBreakerSpec { def multiFailureCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = new Breaker(new CircuitBreaker(system.scheduler, 5, 200.millis.dilated, 500.millis.dilated)) + + def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker = + new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5)) } class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { @@ -209,6 +212,28 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter { breaker().withCircuitBreaker(Future(throwException)) checkLatch(breaker.halfOpenLatch) } + + "increase the reset timeout after it transits to open again" in { + val breaker = CircuitBreakerSpec.nonOneFactorCb() + breaker().withCircuitBreaker(Future(throwException)) + checkLatch(breaker.openLatch) + + val e1 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } + val shortRemainingDuration = e1.remainingDuration + + Thread.sleep(1000.millis.dilated.toMillis) + checkLatch(breaker.halfOpenLatch) + + // transit to open again + breaker().withCircuitBreaker(Future(throwException)) + checkLatch(breaker.openLatch) + + val e2 = intercept[CircuitBreakerOpenException] { breaker().withSyncCircuitBreaker(sayHi) } + val longRemainingDuration = e2.remainingDuration + + (shortRemainingDuration < longRemainingDuration) should ===(true) + + } } "An asynchronous circuit breaker that is half-open" must { diff --git a/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java index 45fbc8b3ab..5dc9c24c46 100644 --- a/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java +++ b/akka-actor/src/main/java/akka/pattern/AbstractCircuitBreaker.java @@ -7,10 +7,12 @@ import akka.util.Unsafe; class AbstractCircuitBreaker { protected final static long stateOffset; + protected final static long resetTimeoutOffset; static { try { stateOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentStateDoNotCallMeDirectly")); + resetTimeoutOffset = Unsafe.instance.objectFieldOffset(CircuitBreaker.class.getDeclaredField("_currentResetTimeoutDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 9a09ba483a..ee16d2cddd 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -18,7 +18,6 @@ import scala.concurrent.TimeoutException import scala.util.control.NonFatal import scala.util.Success import akka.dispatch.ExecutionContexts.sameThreadExecutionContext -import akka.japi.function.Creator import scala.compat.java8.FutureConverters @@ -41,7 +40,6 @@ object CircuitBreaker { */ def apply(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext) - /** * Java API: Create a new CircuitBreaker. * @@ -71,17 +69,39 @@ object CircuitBreaker { * closed state. If it fails, the circuit breaker will re-open to open state. All calls beyond the first that * execute while the first is running will fail-fast with an exception. * - * * @param scheduler Reference to Akka scheduler * @param maxFailures Maximum number of failures before opening the circuit * @param callTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to consider a call a failure * @param resetTimeout [[scala.concurrent.duration.FiniteDuration]] of time after which to attempt to close the circuit * @param executor [[scala.concurrent.ExecutionContext]] used for execution of state transition listeners */ -class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { +class CircuitBreaker( + scheduler: Scheduler, + maxFailures: Int, + callTimeout: FiniteDuration, + resetTimeout: FiniteDuration, + maxResetTimeout: FiniteDuration, + exponentialBackoffFactor: Double)(implicit executor: ExecutionContext) extends AbstractCircuitBreaker { + + require(exponentialBackoffFactor >= 1.0, "factor must be >= 1.0") def this(executor: ExecutionContext, scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration) = { - this(scheduler, maxFailures, callTimeout, resetTimeout)(executor) + this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) + } + + // add the old constructor to make it binary compatible + def this(scheduler: Scheduler, maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration)(implicit executor: ExecutionContext) = { + this(scheduler, maxFailures, callTimeout, resetTimeout, 36500.days, 1.0)(executor) + } + + /** + * The `resetTimeout` will be increased exponentially for each failed attempt to close the circuit. + * The default exponential backoff factor is 2. + * + * @param maxResetTimeout the upper bound of resetTimeout + */ + def withExponentialBackoff(maxResetTimeout: FiniteDuration): CircuitBreaker = { + new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout, maxResetTimeout, 2.0)(executor) } /** @@ -90,6 +110,12 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite @volatile private[this] var _currentStateDoNotCallMeDirectly: State = Closed + /** + * Holds reference to current resetTimeout of CircuitBreaker - *access only via helper methods* + */ + @volatile + private[this] var _currentResetTimeoutDoNotCallMeDirectly: FiniteDuration = resetTimeout + /** * Helper method for access to underlying state via Unsafe * @@ -110,6 +136,20 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite private[this] def currentState: State = Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.stateOffset).asInstanceOf[State] + /** + * Helper method for updating the underlying resetTimeout via Unsafe + */ + @inline + private[this] def swapResetTimeout(oldResetTimeout: FiniteDuration, newResetTimeout: FiniteDuration): Boolean = + Unsafe.instance.compareAndSwapObject(this, AbstractCircuitBreaker.resetTimeoutOffset, oldResetTimeout, newResetTimeout) + + /** + * Helper method for accessing to the underlying resetTimeout via Unsafe + */ + @inline + private[this] def currentResetTimeout: FiniteDuration = + Unsafe.instance.getObjectVolatile(this, AbstractCircuitBreaker.resetTimeoutOffset).asInstanceOf[FiniteDuration] + /** * Wraps invocations of asynchronous calls that need to be protected * @@ -451,11 +491,14 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite override def callFails(): Unit = if (incrementAndGet() == maxFailures) tripBreaker(Closed) /** - * On entry of this state, failure count is reset. + * On entry of this state, failure count and resetTimeout is reset. * * @return */ - override def _enter(): Unit = set(0) + override def _enter(): Unit = { + set(0) + swapResetTimeout(currentResetTimeout, resetTimeout) + } /** * Override for more descriptive toString @@ -530,7 +573,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite */ private def remainingDuration(): FiniteDuration = { val fromOpened = System.nanoTime() - get - val diff = resetTimeout.toNanos - fromOpened + val diff = currentResetTimeout.toNanos - fromOpened if (diff <= 0L) Duration.Zero else diff.nanos } @@ -557,9 +600,16 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite */ override def _enter(): Unit = { set(System.nanoTime()) - scheduler.scheduleOnce(resetTimeout) { + scheduler.scheduleOnce(currentResetTimeout) { attemptReset() } + val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor match { + case f: FiniteDuration ⇒ f + case _ ⇒ currentResetTimeout + } + + if (nextResetTimeout < maxResetTimeout) + swapResetTimeout(currentResetTimeout, nextResetTimeout) } /** diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 0b07bbdcc5..d2600320d8 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -561,7 +561,7 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz override def init: ByteString = dropRight(1) // *must* be overridden by derived classes. - override def take(n: Int): ByteString = throw new UnsupportedOperationException("Method slice is not implemented in ByteString") + override def take(n: Int): ByteString = throw new UnsupportedOperationException("Method take is not implemented in ByteString") override def takeRight(n: Int): ByteString = slice(length - n, length) // these methods are optimized in derived classes utilising the maximum knowlage about data layout available to them: diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 3dc83b0db4..978438b4be 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -26,6 +26,10 @@ akka { # unreachable nodes as DOWN after a configured time of unreachability? # Using auto-down implies that two separate clusters will automatically be # formed in case of network partition. + # + # Don't enable this in production, see 'Auto-downing (DO NOT USE)' section + # of Akka Cluster documentation. + # # Disable with "off" or specify a duration to enable auto-down. # If a downing-provider-class is configured this setting is ignored. auto-down-unreachable-after = off diff --git a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala index 6274ae8d3a..8bfab0f59e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AutoDown.scala @@ -10,6 +10,7 @@ import scala.concurrent.duration.FiniteDuration import akka.cluster.ClusterEvent._ import scala.concurrent.duration.Duration +import akka.actor.ActorLogging /** * INTERNAL API @@ -51,7 +52,7 @@ final class AutoDowning(system: ActorSystem) extends DowningProvider { * able to unit test the logic without running cluster. */ private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration) - extends AutoDownBase(autoDownUnreachableAfter) { + extends AutoDownBase(autoDownUnreachableAfter) with ActorLogging { val cluster = Cluster(context.system) import cluster.InfoLogger._ @@ -62,6 +63,8 @@ private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration) // re-subscribe when restart override def preStart(): Unit = { + log.warning("Don't use auto-down feature of Akka Cluster in production. " + + "See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.") cluster.subscribe(self, classOf[ClusterDomainEvent]) super.preStart() } @@ -72,7 +75,9 @@ private[cluster] class AutoDown(autoDownUnreachableAfter: FiniteDuration) override def down(node: Address): Unit = { require(leader) - logInfo("Leader is auto-downing unreachable node [{}]", node) + logInfo("Leader is auto-downing unreachable node [{}]. " + + "Don't use auto-down feature of Akka Cluster in production. " + + "See 'Auto-downing (DO NOT USE)' section of Akka Cluster documentation.", node) cluster.down(node) } @@ -162,4 +167,4 @@ private[cluster] abstract class AutoDownBase(autoDownUnreachableAfter: FiniteDur pendingUnreachable -= node } -} \ No newline at end of file +} diff --git a/akka-docs/rst/common/circuitbreaker.rst b/akka-docs/rst/common/circuitbreaker.rst index 052a9cbfec..79528ef65f 100644 --- a/akka-docs/rst/common/circuitbreaker.rst +++ b/akka-docs/rst/common/circuitbreaker.rst @@ -45,8 +45,8 @@ What do they do? * In `Half-Open` state: * The first call attempted is allowed through without failing fast * All other calls fail-fast with an exception just as in `Open` state - * If the first call succeeds, the breaker is reset back to `Closed` state - * If the first call fails, the breaker is tripped again into the `Open` state for another full `resetTimeout` + * If the first call succeeds, the breaker is reset back to `Closed` state and the `resetTimeout` is reset + * If the first call fails, the breaker is tripped again into the `Open` state (as for exponential backoff circuit breaker, the `resetTimeout` is multiplied by the exponential backoff factor) * State transition listeners: * Callbacks can be provided for every state entry via `onOpen`, `onClose`, and `onHalfOpen` * These are executed in the :class:`ExecutionContext` provided. diff --git a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala index 5f6c8c6d16..324ce19e04 100644 --- a/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala +++ b/akka-docs/rst/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -8,7 +8,7 @@ package docs.circuitbreaker import scala.concurrent.duration._ import akka.pattern.CircuitBreaker import akka.pattern.pipe -import akka.actor.{Actor, ActorLogging, ActorRef} +import akka.actor.{ Actor, ActorLogging, ActorRef } import scala.concurrent.Future @@ -44,7 +44,7 @@ class DangerousActor extends Actor with ActorLogging { } -class TellPatternActor(recipient : ActorRef) extends Actor with ActorLogging { +class TellPatternActor(recipient: ActorRef) extends Actor with ActorLogging { import context.dispatcher val breaker = diff --git a/akka-docs/rst/intro/getting-started.rst b/akka-docs/rst/intro/getting-started.rst index 08191b0006..65d346a765 100644 --- a/akka-docs/rst/intro/getting-started.rst +++ b/akka-docs/rst/intro/getting-started.rst @@ -8,7 +8,7 @@ Akka requires that you have `Java 8 `_ provides a commercial build of Akka and related projects such as Scala or Play -as part of the `Reactive Platform `_ which is made available +as part of the `Lightbend Reactive Platform `_ which is made available for Java 6 in case your project can not upgrade to Java 8 just yet. It also includes additional commercial features or libraries. Getting Started Guides and Template Projects diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index aa130ddc8f..f272fed48a 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -25,6 +25,12 @@ to route the message with the entity id to the final destination. Cluster sharding will not be active on members with status :ref:`WeaklyUp ` if that feature is enabled. +.. warning:: + **Don't use Cluster Sharding together with Automatic Downing**, + since it allows the cluster to split up into two separate clusters, which in turn will result + in *multiple shards and entities* being started, one in each separate cluster! + See :ref:`automatic-vs-manual-downing-java`. + An Example ---------- @@ -304,6 +310,12 @@ cannot startup because of corrupt data, which may happen if accidentally two clusters were running at the same time, e.g. caused by using auto-down and there was a network partition. +.. warning:: + **Don't use Cluster Sharding together with Automatic Downing**, + since it allows the cluster to split up into two separate clusters, which in turn will result + in *multiple shards and entities* being started, one in each separate cluster! + See :ref:`automatic-vs-manual-downing-java`. + Use this program as a standalone Java main program:: java -classpath diff --git a/akka-docs/rst/java/cluster-singleton.rst b/akka-docs/rst/java/cluster-singleton.rst index 64d6327a6c..f4d5457050 100644 --- a/akka-docs/rst/java/cluster-singleton.rst +++ b/akka-docs/rst/java/cluster-singleton.rst @@ -73,7 +73,7 @@ Especially the last point is something you should be aware of — in general whe you should take care of downing nodes yourself and not rely on the timing based auto-down feature. .. warning:: - **Be very careful when using Cluster Singleton together with Automatic Downing**, + **Don't use Cluster Singleton together with Automatic Downing**, since it allows the cluster to split up into two separate clusters, which in turn will result in *multiple Singletons* being started, one in each separate cluster! diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index c7a521af80..b012ec35d5 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -126,8 +126,8 @@ be allowed to join. .. _automatic-vs-manual-downing-java: -Automatic vs. Manual Downing -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Downing +^^^^^^^ When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of @@ -138,7 +138,17 @@ can be performed automatically or manually. By default it must be done manually, It can also be performed programmatically with ``Cluster.get(system).down(address)``. -You can enable automatic downing with configuration:: +A pre-packaged solution for the downing problem is provided by +`Split Brain Resolver `_, +which is part of the `Lightbend Reactive Platform `_. +If you don’t use RP, you should anyway carefully read the `documentation `_ +of the Split Brain Resolver and make sure that the solution you are using handles the concerns +described there. + +Auto-downing (DO NOT USE) +------------------------- + +There is an atomatic downing feature that you should not use in production. For testing purpose you can enable it with configuration:: akka.cluster.auto-down-unreachable-after = 120s @@ -157,19 +167,8 @@ can also happen because of long GC pauses or system overload. We recommend against using the auto-down feature of Akka Cluster in production. This is crucial for correct behavior if you use :ref:`cluster-singleton-java` or :ref:`cluster_sharding_java`, especially together with Akka :ref:`persistence-java`. - -A pre-packaged solution for the downing problem is provided by -`Split Brain Resolver `_, -which is part of the Lightbend Reactive Platform. If you don’t use RP, you should anyway carefully -read the `documentation `_ -of the Split Brain Resolver and make sure that the solution you are using handles the concerns -described there. - -.. note:: If you have *auto-down* enabled and the failure detector triggers, you - can over time end up with a lot of single node clusters if you don't put - measures in place to shut down nodes that have become ``unreachable``. This - follows from the fact that the ``unreachable`` node will likely see the rest of - the cluster as ``unreachable``, become its own leader and form its own cluster. + For Akka Persistence with Cluster Sharding it can result in corrupt data in case + of network partitions. Leaving ^^^^^^^ diff --git a/akka-docs/rst/java/code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java new file mode 100644 index 0000000000..14f15fa1ae --- /dev/null +++ b/akka-docs/rst/java/code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2016-2016 Lightbend Inc. + */ +package docs.http.javadsl.server.directives; + +import akka.NotUsed; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.StatusCodes; +import akka.http.javadsl.model.Uri; +import akka.http.javadsl.model.headers.SecWebSocketProtocol; +import akka.http.javadsl.model.ws.BinaryMessage; +import akka.http.javadsl.model.ws.Message; +import akka.http.javadsl.model.ws.TextMessage; +import akka.http.javadsl.server.Route; +import akka.http.javadsl.testkit.JUnitRouteTest; +import akka.http.javadsl.testkit.WSProbe; +import akka.stream.OverflowStrategy; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.util.ByteString; +import org.junit.Test; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +public class WebSocketDirectivesExamplesTest extends JUnitRouteTest { + + @Test + public void testHandleWebSocketMessages() { + //#handleWebSocketMessages + final Flow greeter = Flow.of(Message.class).mapConcat(msg -> { + if (msg instanceof TextMessage) { + final TextMessage tm = (TextMessage) msg; + final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!"))); + return Collections.singletonList(ret); + } else if (msg instanceof BinaryMessage) { + final BinaryMessage bm = (BinaryMessage) msg; + bm.getStreamedData().runWith(Sink.ignore(), materializer()); + return Collections.emptyList(); + } else { + throw new IllegalArgumentException("Unsupported message type!"); + } + }); + + final Route websocketRoute = path("greeter", () -> + handleWebSocketMessages(greeter) + ); + + // create a testing probe representing the client-side + final WSProbe wsClient = WSProbe.create(system(), materializer()); + + // WS creates a WebSocket request for testing + testRoute(websocketRoute).run(WS(Uri.create("/greeter"), wsClient.flow(), materializer())) + .assertStatusCode(StatusCodes.SWITCHING_PROTOCOLS); + + // manually run a WS conversation + wsClient.sendMessage("Peter"); + wsClient.expectMessage("Hello Peter!"); + + wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef"))); + wsClient.expectNoMessage(FiniteDuration.create(100, TimeUnit.MILLISECONDS)); + + wsClient.sendMessage("John"); + wsClient.expectMessage("Hello John!"); + + wsClient.sendCompletion(); + wsClient.expectCompletion(); + //#handleWebSocketMessages + } + + @Test + public void testHandleWebSocketMessagesForProtocol() { + //#handleWebSocketMessagesForProtocol + final Flow greeterService = Flow.of(Message.class).mapConcat(msg -> { + if (msg instanceof TextMessage) { + final TextMessage tm = (TextMessage) msg; + final TextMessage ret = TextMessage.create(Source.single("Hello ").concat(tm.getStreamedText()).concat(Source.single("!"))); + return Collections.singletonList(ret); + } else if (msg instanceof BinaryMessage) { + final BinaryMessage bm = (BinaryMessage) msg; + bm.getStreamedData().runWith(Sink.ignore(), materializer()); + return Collections.emptyList(); + } else { + throw new IllegalArgumentException("Unsupported message type!"); + } + }); + + final Flow echoService = Flow.of(Message.class).buffer(1, OverflowStrategy.backpressure()); + + final Route websocketMultipleProtocolRoute = path("services", () -> + route( + handleWebSocketMessagesForProtocol(greeterService, "greeter"), + handleWebSocketMessagesForProtocol(echoService, "echo") + ) + ); + + // create a testing probe representing the client-side + final WSProbe wsClient = WSProbe.create(system(), materializer()); + + // WS creates a WebSocket request for testing + testRoute(websocketMultipleProtocolRoute) + .run(WS(Uri.create("/services"), wsClient.flow(), materializer(), Arrays.asList("other", "echo"))) + .assertHeaderExists(SecWebSocketProtocol.create("echo")); + + wsClient.sendMessage("Peter"); + wsClient.expectMessage("Peter"); + + wsClient.sendMessage(BinaryMessage.create(ByteString.fromString("abcdef"))); + wsClient.expectMessage(ByteString.fromString("abcdef")); + + wsClient.sendMessage("John"); + wsClient.expectMessage("John"); + + wsClient.sendCompletion(); + wsClient.expectCompletion(); + //#handleWebSocketMessagesForProtocol + } +} diff --git a/akka-docs/rst/java/code/docs/stream/StreamTestKitDocTest.java b/akka-docs/rst/java/code/docs/stream/StreamTestKitDocTest.java index 9580bbaddd..e3fa5a3694 100644 --- a/akka-docs/rst/java/code/docs/stream/StreamTestKitDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/StreamTestKitDocTest.java @@ -57,7 +57,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { final CompletionStage future = Source.from(Arrays.asList(1, 2, 3, 4)) .runWith(sinkUnderTest, mat); - final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); + final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assert(result == 20); //#strict-collection } @@ -69,9 +69,9 @@ public class StreamTestKitDocTest extends AbstractJavaTest { .map(i -> i * 2); final CompletionStage> future = sourceUnderTest - .grouped(10) - .runWith(Sink.head(), mat); - final List result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); + .take(10) + .runWith(Sink.seq(), mat); + final List result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assertEquals(result, Collections.nCopies(10, 2)); //#grouped-infinite } @@ -84,7 +84,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { final CompletionStage future = Source.from(Arrays.asList(1, 2, 3, 4, 5, 6)) .via(flowUnderTest).runWith(Sink.fold(0, (agg, next) -> agg + next), mat); - final Integer result = future.toCompletableFuture().get(1, TimeUnit.SECONDS); + final Integer result = future.toCompletableFuture().get(3, TimeUnit.SECONDS); assert(result == 10); //#folded-stream } @@ -101,7 +101,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { .grouped(2) .runWith(Sink.head(), mat); akka.pattern.PatternsCS.pipe(future, system.dispatcher()).to(probe.ref()); - probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), + probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)) ); //#pipeto-testprobe @@ -120,11 +120,11 @@ public class StreamTestKitDocTest extends AbstractJavaTest { final TestProbe probe = new TestProbe(system); final Cancellable cancellable = sourceUnderTest .to(Sink.actorRef(probe.ref(), Tick.COMPLETED)).run(mat); - probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK); + probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK); probe.expectNoMsg(Duration.create(100, TimeUnit.MILLISECONDS)); - probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.TOCK); + probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.TOCK); cancellable.cancel(); - probe.expectMsg(Duration.create(1, TimeUnit.SECONDS), Tick.COMPLETED); + probe.expectMsg(Duration.create(3, TimeUnit.SECONDS), Tick.COMPLETED); //#sink-actorref } @@ -193,7 +193,7 @@ public class StreamTestKitDocTest extends AbstractJavaTest { probe.sendError(new Exception("boom")); try { - future.toCompletableFuture().get(1, TimeUnit.SECONDS); + future.toCompletableFuture().get(3, TimeUnit.SECONDS); assert false; } catch (ExecutionException ee) { final Throwable exception = ee.getCause(); diff --git a/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java b/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java index 1fcc747bc4..1408f1c033 100644 --- a/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java +++ b/akka-docs/rst/java/code/docs/testkit/ParentChildTest.java @@ -4,14 +4,16 @@ package docs.testkit; import static org.junit.Assert.*; - import akka.actor.*; import akka.japi.Creator; import akka.japi.Function; import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.JavaTestKit; import akka.testkit.TestActorRef; import akka.testkit.TestProbe; + import com.typesafe.config.ConfigFactory; + import org.junit.ClassRule; import org.junit.Test; @@ -187,6 +189,17 @@ public class ParentChildTest { } //#test-fabricated-parent-creator + @Test + public void testProbeParentTest() throws Exception { + //#test-TestProbe-parent + JavaTestKit parent = new JavaTestKit(system); + ActorRef child = parent.childActorOf(Props.create(Child.class)); + + parent.send(child, "ping"); + parent.expectMsgEquals("pong"); + //#test-TestProbe-parent + } + @Test public void fabricatedParentTestsItsChildResponses() throws Exception { // didn't put final on these in order to make the parent fit in one line in the html docs diff --git a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst index f63b3bb4c8..33d844bd52 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessages.rst @@ -16,4 +16,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke Example ------- -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. + +.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessages diff --git a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst index c4f981d96b..5f88c2c8d3 100644 --- a/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst +++ b/akka-docs/rst/java/http/routing-dsl/directives/websocket-directives/handleWebSocketMessagesForProtocol.rst @@ -20,4 +20,5 @@ For more information about the WebSocket support, see :ref:`server-side-websocke Example ------- -TODO: Example snippets for JavaDSL are subject to community contributions! Help us complete the docs, read more about it here: `write example snippets for Akka HTTP Java DSL #20466 `_. + +.. includecode:: ../../../../code/docs/http/javadsl/server/directives/WebSocketDirectivesExamplesTest.java#handleWebSocketMessagesForProtocol diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 0dc8219df0..29aceb61d8 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -150,6 +150,13 @@ The identifier must be defined with the ``persistenceId`` method. .. _recovery-java: +.. note:: + ``persistenceId`` must be unique to a given entity in the journal (database table/keyspace). + When replaying messages persisted to the journal, you query messages with a ``persistenceId``. + So, if two different entities share the same ``persistenceId``, message-replaying + behavior is corrupted. + + Recovery -------- diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index 166d2323cf..228ac0b580 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -588,7 +588,7 @@ it returns false the element is discarded. **completes** when upstream completes filterNot -^^^^^^^^ +^^^^^^^^^ Filter the incoming elements using a predicate. If the predicate returns false the element is passed downstream, if it returns true the element is discarded. diff --git a/akka-docs/rst/java/stream/stream-testkit.rst b/akka-docs/rst/java/stream/stream-testkit.rst index 6f863f0810..e604fa2529 100644 --- a/akka-docs/rst/java/stream/stream-testkit.rst +++ b/akka-docs/rst/java/stream/stream-testkit.rst @@ -30,7 +30,7 @@ sink: The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some -condition. Here the ``grouped`` combinator and ``Sink.head`` are very useful. +condition. Here the ``take`` combinator and ``Sink.seq`` are very useful. .. includecode:: ../code/docs/stream/StreamTestKitDocTest.java#grouped-infinite diff --git a/akka-docs/rst/java/testing.rst b/akka-docs/rst/java/testing.rst index 05f574c90d..20417ab6dd 100644 --- a/akka-docs/rst/java/testing.rst +++ b/akka-docs/rst/java/testing.rst @@ -470,25 +470,51 @@ Testing parent-child relationships The parent of an actor is always the actor that created it. At times this leads to a coupling between the two that may not be straightforward to test. -Broadly, there are three approaches to improve testability of parent-child -relationships: +There are several approaches to improve testability of a child actor that +needs to refer to its parent: 1. when creating a child, pass an explicit reference to its parent -2. when creating a parent, tell the parent how to create its child +2. create the child with a ``TestProbe`` as parent 3. create a fabricated parent when testing +Conversely, a parent's binding to its child can be lessened as follows: + +4. when creating a parent, tell the parent how to create its child + For example, the structure of the code you want to test may follow this pattern: .. includecode:: code/docs/testkit/ParentChildTest.java#test-example -Using dependency-injection -^^^^^^^^^^^^^^^^^^^^^^^^^^ +Introduce child to its parent +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The first option is to avoid use of the :meth:`context.parent` function and create a child with a custom parent by passing an explicit reference to its parent instead. .. includecode:: code/docs/testkit/ParentChildTest.java#test-dependentchild +Create the child using JavaTestKit +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``JavaTestKit`` class can in fact create actors that will run with the test probe as parent. +This will cause any messages the the child actor sends to `context().getParent()` to +end up in the test probe. + +.. includecode:: code/docs/testkit/ParentChildTest.java#test-TestProbe-parent + +Using a fabricated parent +^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you prefer to avoid modifying the child constructor you can +create a fabricated parent in your test. This, however, does not enable you to test +the parent actor in isolation. + +.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent-creator +.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent + +Externalize child making from the parent +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + Alternatively, you can tell the parent how to create its child. There are two ways to do this: by giving it a :class:`Props` object or by giving it a function which takes care of creating the child actor: @@ -503,19 +529,10 @@ And like this in your application code: .. includecode:: code/docs/testkit/ParentChildTest.java#child-maker-prod -Using a fabricated parent -^^^^^^^^^^^^^^^^^^^^^^^^^ - -If you prefer to avoid modifying the parent or child constructor you can -create a fabricated parent in your test. This, however, does not enable you to test -the parent actor in isolation. - -.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent-creator -.. includecode:: code/docs/testkit/ParentChildTest.java#test-fabricated-parent Which of these methods is the best depends on what is most important to test. The most generic option is to create the parent actor by passing it a function that is -responsible for the Actor creation, but the fabricated parent is often sufficient. +responsible for the Actor creation, but using TestProbe or having a fabricated parent is often sufficient. .. _Java-CallingThreadDispatcher: diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 875c216bdd..f9e7a2b0c6 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -25,6 +25,12 @@ to route the message with the entity id to the final destination. Cluster sharding will not be active on members with status :ref:`WeaklyUp ` if that feature is enabled. +.. warning:: + **Don't use Cluster Sharding together with Automatic Downing**, + since it allows the cluster to split up into two separate clusters, which in turn will result + in *multiple shards and entities* being started, one in each separate cluster! + See :ref:`automatic-vs-manual-downing-java`. + An Example ---------- @@ -306,6 +312,12 @@ cannot startup because of corrupt data, which may happen if accidentally two clusters were running at the same time, e.g. caused by using auto-down and there was a network partition. +.. warning:: + **Don't use Cluster Sharding together with Automatic Downing**, + since it allows the cluster to split up into two separate clusters, which in turn will result + in *multiple shards and entities* being started, one in each separate cluster! + See :ref:`automatic-vs-manual-downing-scala`. + Use this program as a standalone Java main program:: java -classpath diff --git a/akka-docs/rst/scala/cluster-singleton.rst b/akka-docs/rst/scala/cluster-singleton.rst index 50cfdd1900..8b7406b56b 100644 --- a/akka-docs/rst/scala/cluster-singleton.rst +++ b/akka-docs/rst/scala/cluster-singleton.rst @@ -73,7 +73,7 @@ Especially the last point is something you should be aware of — in general whe you should take care of downing nodes yourself and not rely on the timing based auto-down feature. .. warning:: - **Be very careful when using Cluster Singleton together with Automatic Downing**, + **Don't use Cluster Singleton together with Automatic Downing**, since it allows the cluster to split up into two separate clusters, which in turn will result in *multiple Singletons* being started, one in each separate cluster! diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index fc34e35698..4c3b3ebec7 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -121,8 +121,8 @@ be allowed to join. .. _automatic-vs-manual-downing-scala: -Automatic vs. Manual Downing -^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Downing +^^^^^^^ When a member is considered by the failure detector to be unreachable the leader is not allowed to perform its duties, such as changing status of @@ -133,7 +133,17 @@ can be performed automatically or manually. By default it must be done manually, It can also be performed programmatically with ``Cluster(system).down(address)``. -You can enable automatic downing with configuration:: +A pre-packaged solution for the downing problem is provided by +`Split Brain Resolver `_, +which is part of the `Lightbend Reactive Platform `_. +If you don’t use RP, you should anyway carefully read the `documentation `_ +of the Split Brain Resolver and make sure that the solution you are using handles the concerns +described there. + +Auto-downing (DO NOT USE) +------------------------- + +There is an atomatic downing feature that you should not use in production. For testing purpose you can enable it with configuration:: akka.cluster.auto-down-unreachable-after = 120s @@ -152,19 +162,9 @@ can also happen because of long GC pauses or system overload. We recommend against using the auto-down feature of Akka Cluster in production. This is crucial for correct behavior if you use :ref:`cluster-singleton-scala` or :ref:`cluster_sharding_scala`, especially together with Akka :ref:`persistence-scala`. + For Akka Persistence with Cluster Sharding it can result in corrupt data in case + of network partitions. -A pre-packaged solution for the downing problem is provided by -`Split Brain Resolver `_, -which is part of the Lightbend Reactive Platform. If you don’t use RP, you should anyway carefully -read the `documentation `_ -of the Split Brain Resolver and make sure that the solution you are using handles the concerns -described there. - -.. note:: If you have *auto-down* enabled and the failure detector triggers, you - can over time end up with a lot of single node clusters if you don't put - measures in place to shut down nodes that have become ``unreachable``. This - follows from the fact that the ``unreachable`` node will likely see the rest of - the cluster as ``unreachable``, become its own leader and form its own cluster. Leaving ^^^^^^^ diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala index 415219fb77..191215dac7 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/server/directives/TimeoutDirectivesExamplesSpec.scala @@ -18,25 +18,23 @@ import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } import akka.testkit.AkkaSpec -private[this] object TimeoutDirectivesTestConfig { +private[this] object TimeoutDirectivesInfiniteTimeoutTestConfig { val testConf: Config = ConfigFactory.parseString(""" akka.loggers = ["akka.testkit.TestEventListener"] akka.loglevel = ERROR akka.stdout-loglevel = ERROR windows-connection-abort-workaround-enabled = auto akka.log-dead-letters = OFF - akka.http.server.request-timeout = 1000s""") - // large timeout - 1000s (please note - setting to infinite will disable Timeout-Access header - // and withRequestTimeout will not work) + akka.http.server.request-timeout = infinite""") } -class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig.testConf) +class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesInfiniteTimeoutTestConfig.testConf) with ScalaFutures with CompileOnlySpec { //#testSetup import system.dispatcher implicit val materializer = ActorMaterializer() - def slowFuture(): Future[String] = Promise[String].future // move to Future.never in Scala 2.12 + def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12 def runRoute(route: Route, routePath: String): HttpResponse = { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() @@ -51,8 +49,9 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig //# + // demonstrates that timeout is correctly set despite infinite initial value of akka.http.server.request-timeout "Request Timeout" should { - "be configurable in routing layer" in { + "be configurable in routing layer despite infinite initial value of request-timeout" in { //#withRequestTimeout-plain val route = path("timeout") { @@ -124,3 +123,47 @@ class TimeoutDirectivesExamplesSpec extends AkkaSpec(TimeoutDirectivesTestConfig } } + +private[this] object TimeoutDirectivesFiniteTimeoutTestConfig { + val testConf: Config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.loglevel = ERROR + akka.stdout-loglevel = ERROR + windows-connection-abort-workaround-enabled = auto + akka.log-dead-letters = OFF + akka.http.server.request-timeout = 1000s""") +} + +class TimeoutDirectivesFiniteTimeoutExamplesSpec extends AkkaSpec(TimeoutDirectivesFiniteTimeoutTestConfig.testConf) + with ScalaFutures with CompileOnlySpec { + import system.dispatcher + implicit val materializer = ActorMaterializer() + + def slowFuture(): Future[String] = Promise[String].future // TODO: move to Future.never in Scala 2.12 + + def runRoute(route: Route, routePath: String): HttpResponse = { + val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() + val binding = Http().bindAndHandle(route, hostname, port) + + val response = Http().singleRequest(HttpRequest(uri = s"http://$hostname:$port/$routePath")).futureValue + + binding.flatMap(_.unbind()).futureValue + + response + } + + // demonstrates that timeout is correctly modified for finite initial values of akka.http.server.request-timeout + "Request Timeout" should { + "be configurable in routing layer for finite initial value of request-timeout" in { + val route = + path("timeout") { + withRequestTimeout(1.seconds) { // modifies the global akka.http.server.request-timeout for this request + val response: Future[String] = slowFuture() // very slow + complete(response) + } + } + runRoute(route, "timeout").status should ===(StatusCodes.ServiceUnavailable) // the timeout response + } + } + +} diff --git a/akka-docs/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala index 65bee809e4..e139368129 100644 --- a/akka-docs/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/StreamTestKitDocSpec.scala @@ -22,7 +22,7 @@ class StreamTestKitDocSpec extends AkkaSpec { val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right) val future = Source(1 to 4).runWith(sinkUnderTest) - val result = Await.result(future, 100.millis) + val result = Await.result(future, 3.seconds) assert(result == 20) //#strict-collection } @@ -34,8 +34,8 @@ class StreamTestKitDocSpec extends AkkaSpec { val sourceUnderTest = Source.repeat(1).map(_ * 2) - val future = sourceUnderTest.grouped(10).runWith(Sink.head) - val result = Await.result(future, 100.millis) + val future = sourceUnderTest.take(10).runWith(Sink.seq) + val result = Await.result(future, 3.seconds) assert(result == Seq.fill(10)(2)) //#grouped-infinite } @@ -45,7 +45,7 @@ class StreamTestKitDocSpec extends AkkaSpec { val flowUnderTest = Flow[Int].takeWhile(_ < 5) val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _)) - val result = Await.result(future, 100.millis) + val result = Await.result(future, 3.seconds) assert(result == (1 to 4)) //#folded-stream } @@ -58,8 +58,8 @@ class StreamTestKitDocSpec extends AkkaSpec { val sourceUnderTest = Source(1 to 4).grouped(2) val probe = TestProbe() - sourceUnderTest.grouped(2).runWith(Sink.head).pipeTo(probe.ref) - probe.expectMsg(100.millis, Seq(Seq(1, 2), Seq(3, 4))) + sourceUnderTest.runWith(Sink.seq).pipeTo(probe.ref) + probe.expectMsg(3.seconds, Seq(Seq(1, 2), Seq(3, 4))) //#pipeto-testprobe } @@ -73,9 +73,9 @@ class StreamTestKitDocSpec extends AkkaSpec { probe.expectMsg(1.second, Tick) probe.expectNoMsg(100.millis) - probe.expectMsg(200.millis, Tick) + probe.expectMsg(3.seconds, Tick) cancellable.cancel() - probe.expectMsg(200.millis, "completed") + probe.expectMsg(3.seconds, "completed") //#sink-actorref } @@ -91,7 +91,7 @@ class StreamTestKitDocSpec extends AkkaSpec { ref ! 3 ref ! akka.actor.Status.Success("done") - val result = Await.result(future, 100.millis) + val result = Await.result(future, 3.seconds) assert(result == "123") //#source-actorref } @@ -128,7 +128,7 @@ class StreamTestKitDocSpec extends AkkaSpec { .run() probe.sendError(new Exception("boom")) - Await.ready(future, 100.millis) + Await.ready(future, 3.seconds) val Failure(exception) = future.value.get assert(exception.getMessage == "boom") //#injecting-failure diff --git a/akka-docs/rst/scala/code/docs/testkit/ParentChildSpec.scala b/akka-docs/rst/scala/code/docs/testkit/ParentChildSpec.scala index 2219ca10ad..fa87dec678 100644 --- a/akka-docs/rst/scala/code/docs/testkit/ParentChildSpec.scala +++ b/akka-docs/rst/scala/code/docs/testkit/ParentChildSpec.scala @@ -121,6 +121,17 @@ class ParentChildSpec extends WordSpec with Matchers with TestKitBase with Befor } } + //#test-TestProbe-parent + "A TestProbe serving as parent" should { + "test its child responses" in { + val parent = TestProbe() + val child = parent.childActorOf(Props[Child]) + parent.send(child, "ping") + parent.expectMsg("pong") + } + } + //#test-TestProbe-parent + //#test-fabricated-parent "A fabricated parent" should { "test its child responses" in { diff --git a/akka-docs/rst/scala/dispatchers.rst b/akka-docs/rst/scala/dispatchers.rst index 83d53776e8..a1caf2cc6f 100644 --- a/akka-docs/rst/scala/dispatchers.rst +++ b/akka-docs/rst/scala/dispatchers.rst @@ -70,6 +70,7 @@ of programmatically provided parameter. .. _ForkJoinPool documentation: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html .. _ThreadPoolExecutor documentation: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html + Types of dispatchers -------------------- diff --git a/akka-docs/rst/scala/http/routing-dsl/directives/debugging-directives/logRequestResult.rst b/akka-docs/rst/scala/http/routing-dsl/directives/debugging-directives/logRequestResult.rst index 7bc199dc36..532f3c1192 100644 --- a/akka-docs/rst/scala/http/routing-dsl/directives/debugging-directives/logRequestResult.rst +++ b/akka-docs/rst/scala/http/routing-dsl/directives/debugging-directives/logRequestResult.rst @@ -36,7 +36,7 @@ Building Advanced Directives ---------------------------- This example will showcase the advanced logging using the ``DebuggingDirectives``. -The built `logResponseTime ` directive will log the request time (or rejection reason): +The built `logResponseTime` directive will log the request time (or rejection reason): .. includecode2:: ../../../../code/docs/http/scaladsl/server/directives/DebuggingDirectivesExamplesSpec.scala :snippet: logRequestResultWithResponseTime diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index a8649454ac..4cd8870625 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -133,6 +133,13 @@ The identifier must be defined with the ``persistenceId`` method. .. _recovery: +.. note:: + ``persistenceId`` must be unique to a given entity in the journal (database table/keyspace). + When replaying messages persisted to the journal, you query messages with a ``persistenceId``. + So, if two different entities share the same ``persistenceId``, message-replaying + behavior is corrupted. + + Recovery -------- diff --git a/akka-docs/rst/scala/stream/stream-testkit.rst b/akka-docs/rst/scala/stream/stream-testkit.rst index 377315e26c..293fecf3f0 100644 --- a/akka-docs/rst/scala/stream/stream-testkit.rst +++ b/akka-docs/rst/scala/stream/stream-testkit.rst @@ -30,7 +30,7 @@ sink: The same strategy can be applied for sources as well. In the next example we have a source that produces an infinite stream of elements. Such source can be tested by asserting that first arbitrary number of elements hold some -condition. Here the ``grouped`` combinator and ``Sink.head`` are very useful. +condition. Here the ``take`` combinator and ``Sink.seq`` are very useful. .. includecode:: ../code/docs/stream/StreamTestKitDocSpec.scala#grouped-infinite diff --git a/akka-docs/rst/scala/testing.rst b/akka-docs/rst/scala/testing.rst index 7401614c50..7b9dcc8737 100644 --- a/akka-docs/rst/scala/testing.rst +++ b/akka-docs/rst/scala/testing.rst @@ -548,26 +548,51 @@ Testing parent-child relationships ---------------------------------- The parent of an actor is always the actor that created it. At times this leads to -a coupling between the two that may not be straightforward to test. -Broadly, there are three approaches to improve testability of parent-child -relationships: +a coupling between the two that may not be straightforward to test. +There are several approaches to improve testability of a child actor that +needs to refer to its parent: -1. when creating a child, pass an explicit reference to its parent -2. when creating a parent, tell the parent how to create its child +1. when creating a child, pass an explicit reference to its parent +2. create the child with a ``TestProbe`` as parent 3. create a fabricated parent when testing +Conversely, a parent's binding to its child can be lessened as follows: + +4. when creating a parent, tell the parent how to create its child + For example, the structure of the code you want to test may follow this pattern: .. includecode:: code/docs/testkit/ParentChildSpec.scala#test-example -Using dependency-injection -^^^^^^^^^^^^^^^^^^^^^^^^^^ +Introduce child to its parent +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ The first option is to avoid use of the :meth:`context.parent` function and create a child with a custom parent by passing an explicit reference to its parent instead. .. includecode:: code/docs/testkit/ParentChildSpec.scala#test-dependentchild +Create the child using TestProbe +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The ``TestProbe`` class can in fact create actors that will run with the test probe as parent. +This will cause any messages the the child actor sends to `context.parent` to +end up in the test probe. + +.. includecode:: code/docs/testkit/ParentChildSpec.scala##test-TestProbe-parent + +Using a fabricated parent +^^^^^^^^^^^^^^^^^^^^^^^^^ + +If you prefer to avoid modifying the parent or child constructor you can +create a fabricated parent in your test. This, however, does not enable you to test +the parent actor in isolation. + +.. includecode:: code/docs/testkit/ParentChildSpec.scala#test-fabricated-parent + +Externalize child making from the parent +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + Alternatively, you can tell the parent how to create its child. There are two ways to do this: by giving it a :class:`Props` object or by giving it a function which takes care of creating the child actor: @@ -581,14 +606,6 @@ And like this in your application code: .. includecode:: code/docs/testkit/ParentChildSpec.scala#child-maker-prod -Using a fabricated parent -^^^^^^^^^^^^^^^^^^^^^^^^^ - -If you prefer to avoid modifying the parent or child constructor you can -create a fabricated parent in your test. This, however, does not enable you to test -the parent actor in isolation. - -.. includecode:: code/docs/testkit/ParentChildSpec.scala#test-fabricated-parent Which of these methods is the best depends on what is most important to test. The most generic option is to create the parent actor by passing it a function that is diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/headers/SecWebSocketProtocol.java b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/SecWebSocketProtocol.java new file mode 100644 index 0000000000..2683e5a45b --- /dev/null +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/headers/SecWebSocketProtocol.java @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2016-2016 Lightbend Inc. + */ + +package akka.http.javadsl.model.headers; + +import akka.http.impl.util.Util; + +/** + * Model for the `Sec-WebSocket-Protocol` header. + */ +public abstract class SecWebSocketProtocol extends akka.http.scaladsl.model.HttpHeader { + public abstract Iterable getProtocols(); + + public static SecWebSocketProtocol create(String... protocols) { + return new akka.http.scaladsl.model.headers.Sec$minusWebSocket$minusProtocol(Util.convertArray(protocols)); + } + +} + diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index b73819f407..cb1ebb51e8 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -4,25 +4,24 @@ package akka.http.impl.engine.client -import akka.actor._ +import akka.event.LoggingAdapter import akka.http.impl.engine.client.PoolConductor.{ ConnectEagerlyCommand, DispatchCommand, SlotCommand } +import akka.http.impl.engine.client.PoolSlot.SlotEvent.ConnectedEagerly import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse } import akka.stream._ -import akka.stream.actor._ -import akka.stream.impl.{ ActorProcessor, ConstantFun, ExposedPublisher, SeqActorName, SubscribePending } import akka.stream.scaladsl._ +import akka.stream.stage.GraphStageLogic.EagerTerminateOutput +import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import scala.collection.immutable import scala.concurrent.Future import scala.language.existentials import scala.util.{ Failure, Success } +import scala.collection.JavaConverters._ private object PoolSlot { import PoolFlow.{ RequestContext, ResponseContext } - sealed trait ProcessorOut - final case class ResponseDelivery(response: ResponseContext) extends ProcessorOut - sealed trait RawSlotEvent extends ProcessorOut + sealed trait RawSlotEvent sealed trait SlotEvent extends RawSlotEvent object SlotEvent { final case class RequestCompletedFuture(future: Future[RequestCompleted]) extends RawSlotEvent @@ -36,242 +35,136 @@ private object PoolSlot { final case class ConnectedEagerly(slotIx: Int) extends SlotEvent } - private val slotProcessorActorName = SeqActorName("SlotProcessor") - - /* - Stream Setup - ============ - - Request- +-----------+ +-------------+ +-------------+ +------------+ - Context | Slot- | List[ | flatten | Processor- | doubler | | SlotEvent- | Response- - +--------->| Processor +------------->| (MapConcat) +------------->| (MapConcat) +---->| Split +-------------> - | | Processor- | | Out | | | | Context - +-----------+ Out] +-------------+ +-------------+ +-----+------+ - | RawSlotEvent - | (to Conductor - | via slotEventMerge) - v - */ - def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit system: ActorSystem, fm: Materializer): Graph[FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent], Any] = - GraphDSL.create() { implicit b ⇒ - import GraphDSL.Implicits._ - - // TODO wouldn't be better to have them under a known parent? /user/SlotProcessor-0 seems weird - val name = slotProcessorActorName.next() - - val slotProcessor = b.add { - Flow.fromProcessor { () ⇒ - val actor = system.actorOf( - Props(new SlotProcessor(slotIx, connectionFlow)).withDeploy(Deploy.local), - name) - ActorProcessor[SlotCommand, List[ProcessorOut]](actor) - }.mapConcat(ConstantFun.scalaIdentityFunction) - } - val split = b.add(Broadcast[ProcessorOut](2)) - - slotProcessor ~> split.in - - new FanOutShape2( - slotProcessor.in, - split.out(0).collect { case ResponseDelivery(r) ⇒ r }.outlet, - split.out(1).collect { case r: RawSlotEvent ⇒ r }.outlet) - } - - import ActorPublisherMessage._ - import ActorSubscriberMessage._ + def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit m: Materializer): Graph[FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent], Any] = + new SlotProcessor(slotIx, connectionFlow, ActorMaterializerHelper.downcast(m).logger) /** - * An actor managing a series of materializations of the given `connectionFlow`. * To the outside it provides a stable flow stage, consuming `SlotCommand` instances on its - * input (ActorSubscriber) side and producing `List[ProcessorOut]` instances on its output - * (ActorPublisher) side. + * input side and producing `ResponseContext` and `RawSlotEvent` instances on its outputs. * The given `connectionFlow` is materialized into a running flow whenever required. * Completion and errors from the connection are not surfaced to the outside (unless we are * shutting down completely). */ - private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any])(implicit fm: Materializer) - extends ActorSubscriber with ActorPublisher[List[ProcessorOut]] with ActorLogging { - var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _ - var inflightRequests = immutable.Queue.empty[RequestContext] + private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any], log: LoggingAdapter)(implicit fm: Materializer) + extends GraphStage[FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent]] { - val runnableGraph = Source.actorPublisher[HttpRequest](flowInportProps(self)) - .via(connectionFlow) - .toMat(Sink.actorSubscriber[HttpResponse](flowOutportProps(self)))(Keep.both) - .named("SlotProcessorInternalConnectionFlow") + val in: Inlet[SlotCommand] = Inlet("SlotProcessor.in") + val responsesOut: Outlet[ResponseContext] = Outlet("SlotProcessor.responsesOut") + val eventsOut: Outlet[RawSlotEvent] = Outlet("SlotProcessor.eventsOut") - override def requestStrategy = ZeroRequestStrategy + override def shape: FanOutShape2[SlotCommand, ResponseContext, RawSlotEvent] = new FanOutShape2(in, responsesOut, eventsOut) - /** - * How PoolProcessor changes its `receive`: - * waitingExposedPublisher -> waitingForSubscribePending -> unconnected -> - * waitingForDemandFromConnection OR waitingEagerlyConnected -> running - * Given slot can become get to 'running' state via 'waitingForDemandFromConnection' or 'waitingEagerlyConnected'. - * The difference between those two paths is that the first one is lazy - reacts to DispatchCommand and then uses - * inport and outport actors to obtain more items. - * Where the second one is eager - reacts to SlotShouldConnectCommand from PoolConductor, sends SlotEvent.ConnectedEagerly - * back to conductor and then waits for the first DispatchCommand - */ - override def receive = waitingExposedPublisher + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { + private var firstRequest: RequestContext = _ + private val inflightRequests = new java.util.ArrayDeque[RequestContext]() - def waitingExposedPublisher: Receive = { - case ExposedPublisher(publisher) ⇒ - exposedPublisher = publisher - context.become(waitingForSubscribePending) - case other ⇒ throw new IllegalStateException(s"The first message must be `ExposedPublisher` but was [$other]") - } + private var connectionFlowSource: SubSourceOutlet[HttpRequest] = _ + private var connectionFlowSink: SubSinkInlet[HttpResponse] = _ - def waitingForSubscribePending: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach (s ⇒ self ! ActorPublisher.Internal.Subscribe(s)) - log.debug("become unconnected, from subscriber pending") - context.become(unconnected) - } + private var isConnected = false - val unconnected: Receive = { - case OnNext(DispatchCommand(rc: RequestContext)) ⇒ - val (connInport, connOutport) = runnableGraph.run() - connOutport ! Request(totalDemand) - context.become(waitingForDemandFromConnection(connInport = connInport, connOutport = connOutport, rc)) + def disconnect(ex: Option[Throwable] = None) = { + connectionFlowSource.complete() + if (isConnected) { + isConnected = false - case OnNext(ConnectEagerlyCommand) ⇒ - val (in, out) = runnableGraph.run() - onNext(SlotEvent.ConnectedEagerly(slotIx) :: Nil) - out ! Request(totalDemand) - context.become(waitingEagerlyConnected(connInport = in, connOutport = out)) + // if there was an error sending the request may have been sent so decrement retriesLeft + // otherwise the downstream hasn't sent so sent them back without modifying retriesLeft + val (retries, failures) = ex.map { fail ⇒ + val (inflightRetry, inflightFail) = inflightRequests.iterator().asScala.partition(_.retriesLeft > 0) + val retries = inflightRetry.map(rc ⇒ SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))).toList + val failures = inflightFail.map(rc ⇒ ResponseContext(rc, Failure(fail))).toList + (retries, failures) + }.getOrElse((inflightRequests.iterator().asScala.map(rc ⇒ SlotEvent.RetryRequest(rc)).toList, Nil)) - case Request(_) ⇒ if (remainingRequested == 0) request(1) // ask for first request if necessary + inflightRequests.clear() - case OnComplete ⇒ onComplete() - case OnError(e) ⇒ onError(e) - case Cancel ⇒ - cancel() - shutdown() - - case c @ FromConnection(msg) ⇒ // ignore ... - } - - def waitingEagerlyConnected(connInport: ActorRef, connOutport: ActorRef): Receive = { - case FromConnection(Request(n)) ⇒ - request(n) - - case OnNext(DispatchCommand(rc: RequestContext)) ⇒ - inflightRequests = inflightRequests.enqueue(rc) - request(1) - connInport ! OnNext(rc.request) - context.become(running(connInport, connOutport)) - } - - def waitingForDemandFromConnection(connInport: ActorRef, connOutport: ActorRef, - firstRequest: RequestContext): Receive = { - case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev - case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev - case OnNext(x) ⇒ throw new IllegalStateException("Unrequested RequestContext: " + x) - - case FromConnection(Request(n)) ⇒ - inflightRequests = inflightRequests.enqueue(firstRequest) - request(n - remainingRequested) - connInport ! OnNext(firstRequest.request) - context.become(running(connInport, connOutport)) - - case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError - case FromConnection(OnComplete) ⇒ handleDisconnect(sender(), None, Some(firstRequest)) - case FromConnection(OnError(e)) ⇒ handleDisconnect(sender(), Some(e), Some(firstRequest)) - case FromConnection(OnNext(x)) ⇒ throw new IllegalStateException("Unexpected HttpResponse: " + x) - } - - def running(connInport: ActorRef, connOutport: ActorRef): Receive = { - case ev @ (Request(_) | Cancel) ⇒ connOutport ! ev - case ev @ (OnComplete | OnError(_)) ⇒ connInport ! ev - case OnNext(DispatchCommand(rc: RequestContext)) ⇒ - inflightRequests = inflightRequests.enqueue(rc) - connInport ! OnNext(rc.request) - - case FromConnection(Request(n)) ⇒ request(n) - case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError - - case FromConnection(OnNext(response: HttpResponse)) ⇒ - val requestContext = inflightRequests.head - inflightRequests = inflightRequests.tail - val (entity, whenCompleted) = HttpEntity.captureTermination(response.entity) - val delivery = ResponseDelivery(ResponseContext(requestContext, Success(response withEntity entity))) - import fm.executionContext - val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ ⇒ SlotEvent.RequestCompleted(slotIx))) - onNext(delivery :: requestCompleted :: Nil) - - case FromConnection(OnComplete) ⇒ handleDisconnect(sender(), None) - case FromConnection(OnError(e)) ⇒ handleDisconnect(sender(), Some(e)) - } - - def handleDisconnect(connInport: ActorRef, error: Option[Throwable], firstContext: Option[RequestContext] = None): Unit = { - log.debug("Slot {} disconnected after {}", slotIx, error getOrElse "regular connection close") - - val results: List[ProcessorOut] = { - if (inflightRequests.isEmpty && firstContext.isDefined) { - (error match { - case Some(err) ⇒ ResponseDelivery(ResponseContext(firstContext.get, Failure(new UnexpectedDisconnectException("Unexpected (early) disconnect", err)))) - case _ ⇒ ResponseDelivery(ResponseContext(firstContext.get, Failure(new UnexpectedDisconnectException("Unexpected (early) disconnect")))) - }) :: Nil - } else { - inflightRequests.map { rc ⇒ - if (rc.retriesLeft == 0) { - val reason = error.fold[Throwable](new UnexpectedDisconnectException("Unexpected disconnect"))(ConstantFun.scalaIdentityFunction) - connInport ! ActorPublisherMessage.Cancel - ResponseDelivery(ResponseContext(rc, Failure(reason))) - } else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1)) - }(collection.breakOut) + emitMultiple(responsesOut, failures) + emitMultiple(eventsOut, SlotEvent.Disconnected(slotIx, retries.size + failures.size) :: retries, () ⇒ if (failures.isEmpty && !hasBeenPulled(in)) pull(in)) } } - inflightRequests = immutable.Queue.empty - onNext(SlotEvent.Disconnected(slotIx, results.size) :: results) - if (canceled) onComplete() - context.become(unconnected) + // SourceOutlet is connected to the connectionFlow's inlet, when the connectionFlow + // completes (e.g. connection closed) complete the subflow and emit the Disconnected event + private val connectionOutFlowHandler = new OutHandler { + // inner stream pulls, we either give first request or pull upstream + override def onPull(): Unit = { + if (firstRequest != null) { + inflightRequests.add(firstRequest) + connectionFlowSource.push(firstRequest.request) + firstRequest = null + } else pull(in) + } + + override def onDownstreamFinish(): Unit = connectionFlowSource.complete() + } + + // SinkInlet is connected to the connectionFlow's outlet, an upstream + // complete indicates the remote has shutdown cleanly, a failure is + // abnormal termination/connection refused. Successful requests + // will show up in `onPush` + private val connectionInFlowHandler = new InHandler { + // inner stream pushes we push downstream + override def onPush(): Unit = { + val response = connectionFlowSink.grab() + val requestContext = inflightRequests.pop + + val (entity, whenCompleted) = HttpEntity.captureTermination(response.entity) + import fm.executionContext + push(responsesOut, ResponseContext(requestContext, Success(response withEntity entity))) + push(eventsOut, SlotEvent.RequestCompletedFuture(whenCompleted.map(_ ⇒ SlotEvent.RequestCompleted(slotIx)))) + } + + override def onUpstreamFinish(): Unit = disconnect() + + override def onUpstreamFailure(ex: Throwable): Unit = disconnect(Some(ex)) + } + + // upstream pushes we create the inner stream if necessary or push if we're already connected + override def onPush(): Unit = { + def establishConnectionFlow() = { + connectionFlowSource = new SubSourceOutlet[HttpRequest]("RequestSource") + connectionFlowSource.setHandler(connectionOutFlowHandler) + + connectionFlowSink = new SubSinkInlet[HttpResponse]("ResponseSink") + connectionFlowSink.setHandler(connectionInFlowHandler) + + isConnected = true + + Source.fromGraph(connectionFlowSource.source) + .via(connectionFlow).runWith(Sink.fromGraph(connectionFlowSink.sink))(subFusingMaterializer) + + connectionFlowSink.pull() + } + + grab(in) match { + case ConnectEagerlyCommand ⇒ + if (!isConnected) establishConnectionFlow() + + emit(eventsOut, ConnectedEagerly(slotIx)) + + case DispatchCommand(rc: RequestContext) ⇒ + if (isConnected) { + inflightRequests.add(rc) + connectionFlowSource.push(rc.request) + } else { + firstRequest = rc + establishConnectionFlow() + } + } + } + + setHandler(in, this) + + setHandler(responsesOut, new OutHandler { + override def onPull(): Unit = { + // downstream pulls, if connected we pull inner + if (isConnected) connectionFlowSink.pull() + else if (!hasBeenPulled(in)) pull(in) + } + }) + + setHandler(eventsOut, EagerTerminateOutput) } - - override def onComplete(): Unit = { - exposedPublisher.shutdown(None) - super.onComplete() - shutdown() - } - - override def onError(cause: Throwable): Unit = { - exposedPublisher.shutdown(Some(cause)) - super.onError(cause) - shutdown() - } - - def shutdown(): Unit = context.stop(self) - } - - private case class FromConnection(ev: Any) extends NoSerializationVerificationNeeded - - private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] with ActorLogging { - def receive: Receive = { - case ev: Request ⇒ slotProcessor ! FromConnection(ev) - case OnNext(r: HttpRequest) ⇒ onNext(r) - case OnComplete ⇒ onCompleteThenStop() - case OnError(e) ⇒ onErrorThenStop(e) - case Cancel ⇒ - slotProcessor ! FromConnection(Cancel) - context.stop(self) - } - } - def flowInportProps(s: ActorRef) = Props(new FlowInportActor(s)).withDeploy(Deploy.local) - - private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber with ActorLogging { - def requestStrategy = ZeroRequestStrategy - def receive: Receive = { - case Request(n) ⇒ request(n) - case Cancel ⇒ cancel() - case ev: OnNext ⇒ slotProcessor ! FromConnection(ev) - case ev @ (OnComplete | OnError(_)) ⇒ - slotProcessor ! FromConnection(ev) - context.stop(self) - } - } - def flowOutportProps(s: ActorRef) = Props(new FlowOutportActor(s)).withDeploy(Deploy.local) - - final class UnexpectedDisconnectException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) { - def this(msg: String) = this(msg, null) } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 38d6a8b688..80414e2be3 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -85,10 +85,7 @@ private[http] object HttpServerBluePrint { BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings)) def requestTimeoutSupport(timeout: Duration): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, NotUsed] = - timeout match { - case x: FiniteDuration ⇒ BidiFlow.fromGraph(new RequestTimeoutSupport(x)).reversed - case _ ⇒ BidiFlow.identity - } + BidiFlow.fromGraph(new RequestTimeoutSupport(timeout)).reversed /** * Two state stage, either transforms an incoming RequestOutput into a HttpRequest with strict entity and then pushes @@ -249,7 +246,7 @@ private[http] object HttpServerBluePrint { .via(MapError[ResponseRenderingOutput](errorHandler).named("errorLogger")) } - class RequestTimeoutSupport(initialTimeout: FiniteDuration) + class RequestTimeoutSupport(initialTimeout: Duration) extends GraphStage[BidiShape[HttpRequest, HttpRequest, HttpResponse, HttpResponse]] { private val requestIn = Inlet[HttpRequest]("requestIn") private val requestOut = Outlet[HttpRequest]("requestOut") @@ -307,13 +304,23 @@ private[http] object HttpServerBluePrint { val timeout: Duration, val handler: HttpRequest ⇒ HttpResponse) - private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: FiniteDuration, requestEnd: Future[Unit], + private object DummyCancellable extends Cancellable { + override def isCancelled: Boolean = true + override def cancel(): Boolean = true + } + + private class TimeoutAccessImpl(request: HttpRequest, initialTimeout: Duration, requestEnd: Future[Unit], trigger: AsyncCallback[(TimeoutAccess, HttpResponse)], materializer: Materializer) extends AtomicReference[Future[TimeoutSetup]] with TimeoutAccess with (HttpRequest ⇒ HttpResponse) { self ⇒ import materializer.executionContext - set { - requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(initialTimeout, this), initialTimeout, this)) + initialTimeout match { + case timeout: FiniteDuration ⇒ set { + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, schedule(timeout, this), timeout, this)) + } + case _ ⇒ set { + requestEnd.fast.map(_ ⇒ new TimeoutSetup(Deadline.now, DummyCancellable, Duration.Inf, this)) + } } override def apply(request: HttpRequest) = diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala index 6ec86a849b..0524f02464 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/model/ws/Message.scala @@ -11,7 +11,7 @@ import akka.util.ByteString /** * Represents a WebSocket message. A message can either be a binary message or a text message. */ -sealed abstract class Message { +abstract class Message { /** * Is this message a text message? If true, [[asTextMessage]] will return this * text message, if false, [[asBinaryMessage]] will return this binary message. @@ -150,4 +150,4 @@ object BinaryMessage { case sm.ws.BinaryMessage.Strict(data) ⇒ create(data) case bm: sm.ws.BinaryMessage ⇒ create(bm.dataStream.asJava) } -} \ No newline at end of file +} diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala index 917243c58c..e70c530f00 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/headers/headers.scala @@ -766,11 +766,14 @@ private[http] object `Sec-WebSocket-Protocol` extends ModeledCompanion[`Sec-WebS * INTERNAL API */ private[http] final case class `Sec-WebSocket-Protocol`(protocols: immutable.Seq[String]) - extends RequestResponseHeader { + extends jm.headers.SecWebSocketProtocol with RequestResponseHeader { require(protocols.nonEmpty, "Sec-WebSocket-Protocol.protocols must not be empty") import `Sec-WebSocket-Protocol`.protocolsRenderer protected[http] def renderValue[R <: Rendering](r: R): r.type = r ~~ protocols protected def companion = `Sec-WebSocket-Protocol` + + /** Java API */ + override def getProtocols: Iterable[String] = protocols.asJava } // http://tools.ietf.org/html/rfc6455#section-4.3 diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala index 355e45a161..c19fbc8adc 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/ws/Message.scala @@ -4,6 +4,7 @@ package akka.http.scaladsl.model.ws +import akka.stream.javadsl import akka.stream.scaladsl.Source import akka.util.ByteString @@ -11,18 +12,22 @@ import akka.util.ByteString /** * The ADT for WebSocket messages. A message can either be a binary or a text message. */ -sealed trait Message // FIXME: Why don't we extend akka.http.javadsl.model.ws.Message here? +sealed trait Message extends akka.http.javadsl.model.ws.Message /** * Represents a WebSocket text message. A text message can either be a [[TextMessage.Strict]] in which case * the complete data is already available or it can be [[TextMessage.Streamed]] in which case `textStream` * will return a Source streaming the data as it comes in. */ -sealed trait TextMessage extends Message { +sealed trait TextMessage extends akka.http.javadsl.model.ws.TextMessage with Message { /** * The contents of this message as a stream. */ def textStream: Source[String, _] + + /** Java API */ + override def getStreamedText: javadsl.Source[String, _] = textStream.asJava + override def asScala: TextMessage = this } //#message-model object TextMessage { @@ -36,9 +41,18 @@ object TextMessage { final case class Strict(text: String) extends TextMessage { def textStream: Source[String, _] = Source.single(text) override def toString: String = s"TextMessage.Strict($text)" + + /** Java API */ + override def getStrictText: String = text + override def isStrict: Boolean = true } + final case class Streamed(textStream: Source[String, _]) extends TextMessage { override def toString: String = s"TextMessage.Streamed($textStream)" + + /** Java API */ + override def getStrictText: String = throw new IllegalStateException("Cannot get strict text for streamed message.") + override def isStrict: Boolean = false } } @@ -48,11 +62,15 @@ object TextMessage { * will return a Source streaming the data as it comes in. */ //#message-model -sealed trait BinaryMessage extends Message { +sealed trait BinaryMessage extends akka.http.javadsl.model.ws.BinaryMessage with Message { /** * The contents of this message as a stream. */ def dataStream: Source[ByteString, _] + + /** Java API */ + override def getStreamedData: javadsl.Source[ByteString, _] = dataStream.asJava + override def asScala: BinaryMessage = this } //#message-model object BinaryMessage { @@ -66,8 +84,16 @@ object BinaryMessage { final case class Strict(data: ByteString) extends BinaryMessage { def dataStream: Source[ByteString, _] = Source.single(data) override def toString: String = s"BinaryMessage.Strict($data)" + + /** Java API */ + override def getStrictData: ByteString = data + override def isStrict: Boolean = true } final case class Streamed(dataStream: Source[ByteString, _]) extends BinaryMessage { override def toString: String = s"BinaryMessage.Streamed($dataStream)" + + /** Java API */ + override def getStrictData: ByteString = throw new IllegalStateException("Cannot get strict data for streamed message.") + override def isStrict: Boolean = false } } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala index 09f62b348a..ccb658bc79 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ConnectionPoolSpec.scala @@ -11,7 +11,7 @@ import java.util.concurrent.atomic.AtomicInteger import akka.http.impl.engine.client.PoolMasterActor.PoolInterfaceRunning import akka.http.impl.settings.ConnectionPoolSettingsImpl -import akka.http.impl.util.{ SingletonException, StreamUtils } +import akka.http.impl.util.SingletonException import akka.http.scaladsl.model._ import akka.http.scaladsl.model.headers._ import akka.http.scaladsl.settings.{ ClientConnectionSettings, ConnectionPoolSettings, ServerSettings } @@ -335,6 +335,67 @@ class ConnectionPoolSpec extends AkkaSpec(""" } } + "be able to handle 500 `Connection: close` requests against the test server" in new TestSetup { + val settings = ConnectionPoolSettings(system).withMaxConnections(4) + val poolFlow = Http().cachedHostConnectionPool[Int](serverHostName, serverPort, settings = settings) + + val N = 500 + val requestIds = Source.fromIterator(() ⇒ Iterator.from(1)).take(N) + val idSum = requestIds.map(id ⇒ HttpRequest(uri = s"/r$id").withHeaders(Connection("close")) → id).via(poolFlow).map { + case (Success(response), id) ⇒ + requestUri(response) should endWith(s"/r$id") + id + case x ⇒ fail(x.toString) + }.runFold(0)(_ + _) + + (1 to N).foreach(_ ⇒ acceptIncomingConnection()) + + Await.result(idSum, 10.seconds) shouldEqual N * (N + 1) / 2 + } + + "be able to handle 500 pipelined requests with connection termination" in new TestSetup(autoAccept = true) { + def closeHeader(): List[Connection] = + if (util.Random.nextInt(8) == 0) Connection("close") :: Nil + else Nil + + override def testServerHandler(connNr: Int): HttpRequest ⇒ HttpResponse = { r ⇒ + val idx = r.uri.path.tail.head.toString + HttpResponse() + .withHeaders(RawHeader("Req-Idx", idx) +: responseHeaders(r, connNr)) + .withDefaultHeaders(closeHeader()) + } + + for (pipeliningLimit ← Iterator.from(1).map(math.pow(2, _).toInt).take(4)) { + val settings = ConnectionPoolSettings(system).withMaxConnections(4).withPipeliningLimit(pipeliningLimit).withMaxOpenRequests(4 * pipeliningLimit) + val poolFlow = Http().cachedHostConnectionPool[Int](serverHostName, serverPort, settings = settings) + + def method() = + if (util.Random.nextInt(2) == 0) HttpMethods.POST else HttpMethods.GET + + def request(i: Int) = + HttpRequest(method = method(), headers = closeHeader(), uri = s"/$i") → i + + try { + val N = 200 + val (_, idSum) = + Source.fromIterator(() ⇒ Iterator.from(1)).take(N) + .map(request) + .viaMat(poolFlow)(Keep.right) + .map { + case (Success(response), id) ⇒ + requestUri(response) should endWith(s"/$id") + id + case x ⇒ fail(x.toString) + }.toMat(Sink.fold(0)(_ + _))(Keep.both).run() + + Await.result(idSum, 30.seconds) shouldEqual N * (N + 1) / 2 + } catch { + case thr: Throwable ⇒ + throw new RuntimeException(s"Failed at pipeliningLimit=$pipeliningLimit, poolFlow=$poolFlow", thr) + } + } + } + class TestSetup( serverSettings: ServerSettings = ServerSettings(system), autoAccept: Boolean = false) { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala index b83f6be39e..17c4ffdc20 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/server/HttpServerSpec.scala @@ -40,7 +40,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -141,7 +141,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -205,7 +205,7 @@ class HttpServerSpec extends AkkaSpec( | |abcdefghijkl""") - expectRequest() shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/strict", @@ -218,7 +218,7 @@ class HttpServerSpec extends AkkaSpec( | |mnopqrstuvwx""") - expectRequest() shouldEqual + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest( method = POST, uri = "http://example.com/next-strict", @@ -446,7 +446,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(GET, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -456,7 +456,7 @@ class HttpServerSpec extends AkkaSpec( |Host: example.com | |""") - expectRequest() shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(HEAD, uri = "http://example.com/", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -696,7 +696,7 @@ class HttpServerSpec extends AkkaSpec( | |""".stripMarginWithNewline("\r\n")) - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/", headers = List(Host("example.com"))) responses.expectRequest() responses.sendError(new RuntimeException("CRASH BOOM BANG")) @@ -758,7 +758,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com//foo", headers = List(Host("example.com"))) shutdownBlueprint() }) @@ -767,7 +767,7 @@ class HttpServerSpec extends AkkaSpec( | |""") - expectRequest() shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) + expectRequest() mapHeaders (_.filterNot(_.is("timeout-access"))) shouldEqual HttpRequest(uri = "http://example.com/abc", protocol = HttpProtocols.`HTTP/1.0`) override def settings: ServerSettings = super.settings.withDefaultHostHeader(Host("example.com")) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 264c3fbae0..60869305d6 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -447,7 +447,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit val serverInSub = serverIn.expectSubscription() serverInSub.request(1) private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)), - Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() + Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() mapHeaders (_.filterNot(_.is("timeout-access"))) uri shouldEqual Uri(s"http://$hostname:$port/chunked") Await.result(chunkStream.limit(5).runWith(Sink.seq), 100.millis) shouldEqual chunks diff --git a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala index e41ebed1b4..bb41a2ad8f 100644 --- a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala +++ b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/RouteTest.scala @@ -29,7 +29,7 @@ import akka.stream.Materializer * * See `JUnitRouteTest` for an example of a concrete implementation. */ -abstract class RouteTest extends AllDirectives { +abstract class RouteTest extends AllDirectives with WSTestRequestBuilding { implicit def system: ActorSystem implicit def materializer: Materializer implicit def executionContext: ExecutionContextExecutor = system.dispatcher diff --git a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSProbe.scala b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSProbe.scala new file mode 100644 index 0000000000..d00e03c8d2 --- /dev/null +++ b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSProbe.scala @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2016-2016 Lightbend Inc. + */ + +package akka.http.javadsl.testkit + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.http.javadsl.model.ws.Message +import akka.stream.Materializer +import akka.stream.javadsl.Flow +import akka.stream.scaladsl +import akka.util.ByteString + +import akka.http.scaladsl.{ testkit ⇒ st } + +import akka.http.impl.util.JavaMapping.Implicits._ + +import scala.concurrent.duration._ + +/** + * A WSProbe is a probe that implements a `Flow[Message, Message, Unit]` for testing + * websocket code. + * + * Requesting elements is handled automatically. + */ +class WSProbe(delegate: st.WSProbe) { + + def flow: Flow[Message, Message, Any] = { + val underlying = scaladsl.Flow[Message].map(_.asScala).via(delegate.flow).map(_.asJava) + new Flow[Message, Message, NotUsed](underlying) + } + + /** + * Send the given messages out of the flow. + */ + def sendMessage(message: Message): Unit = delegate.sendMessage(message.asScala) + + /** + * Send a text message containing the given string out of the flow. + */ + def sendMessage(text: String): Unit = delegate.sendMessage(text) + + /** + * Send a binary message containing the given bytes out of the flow. + */ + def sendMessage(bytes: ByteString): Unit = delegate.sendMessage(bytes) + + /** + * Complete the output side of the flow. + */ + def sendCompletion(): Unit = delegate.sendCompletion() + + /** + * Expect a message on the input side of the flow. + */ + def expectMessage(): Message = delegate.expectMessage() + + /** + * Expect a text message on the input side of the flow and compares its payload with the given one. + * If the received message is streamed its contents are collected and then asserted against the given + * String. + */ + def expectMessage(text: String): Unit = delegate.expectMessage(text) + + /** + * Expect a binary message on the input side of the flow and compares its payload with the given one. + * If the received message is streamed its contents are collected and then asserted against the given + * ByteString. + */ + def expectMessage(bytes: ByteString): Unit = delegate.expectMessage(bytes) + + /** + * Expect no message on the input side of the flow. + */ + def expectNoMessage(): Unit = delegate.expectNoMessage() + + /** + * Expect no message on the input side of the flow for the given maximum duration. + */ + def expectNoMessage(max: FiniteDuration): Unit = delegate.expectNoMessage(max) + + /** + * Expect completion on the input side of the flow. + */ + def expectCompletion(): Unit = delegate.expectCompletion() + +} + +object WSProbe { + + // A convenient method to create WSProbe with default maxChunks and maxChunkCollectionMills + def create(system: ActorSystem, materializer: Materializer): WSProbe = { + create(system, materializer, 1000, 5000) + } + + /** + * Creates a WSProbe to use in tests against websocket handlers. + * + * @param maxChunks The maximum number of chunks to collect for streamed messages. + * @param maxChunkCollectionMills The maximum time in milliseconds to collect chunks for streamed messages. + */ + def create(system: ActorSystem, materializer: Materializer, maxChunks: Int, maxChunkCollectionMills: Long): WSProbe = { + val delegate = st.WSProbe(maxChunks, maxChunkCollectionMills)(system, materializer) + new WSProbe(delegate) + } + +} diff --git a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSTestRequestBuilding.scala b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSTestRequestBuilding.scala new file mode 100644 index 0000000000..abd4340b19 --- /dev/null +++ b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/WSTestRequestBuilding.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2016-2016 Lightbend Inc. + */ + +package akka.http.javadsl.testkit + +import akka.http.javadsl.model.ws.Message +import akka.http.javadsl.model.{ HttpRequest, Uri } +import akka.http.scaladsl.{ model ⇒ sm } +import akka.stream.javadsl.Flow + +import akka.http.scaladsl.{ testkit ⇒ st } + +import akka.http.impl.util.JavaMapping.Implicits._ +import scala.collection.JavaConverters._ +import akka.stream.{ Materializer, scaladsl } + +trait WSTestRequestBuilding { + + def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], materializer: Materializer): HttpRequest = { + WS(uri, clientSideHandler, materializer, java.util.Collections.emptyList()) + } + + def WS( + uri: Uri, + clientSideHandler: Flow[Message, Message, Any], + materializer: Materializer, + subprotocols: java.util.List[String]): HttpRequest = { + + val handler = scaladsl.Flow[sm.ws.Message].map(_.asJava).via(clientSideHandler).map(_.asScala) + st.WSTestRequestBuilding.WS(uri.asScala, handler, subprotocols.asScala)(materializer) + } + +} diff --git a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala index ea9e0a92ba..edf3ad041b 100644 --- a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala +++ b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/WSTestRequestBuilding.scala @@ -9,11 +9,11 @@ import akka.http.scaladsl.model.headers.{ UpgradeProtocol, Upgrade, `Sec-WebSock import akka.http.scaladsl.model.{ StatusCodes, HttpResponse, HttpRequest, Uri } import akka.http.scaladsl.model.ws.{ UpgradeToWebSocket, Message } import scala.collection.immutable -import akka.stream.{ Graph, FlowShape } +import akka.stream.{ Materializer, Graph, FlowShape } import akka.stream.scaladsl.Flow -trait WSTestRequestBuilding { self: RouteTest ⇒ - def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(): HttpRequest = +trait WSTestRequestBuilding { + def WS(uri: Uri, clientSideHandler: Flow[Message, Message, Any], subprotocols: Seq[String] = Nil)(implicit materializer: Materializer): HttpRequest = HttpRequest(uri = uri) .addHeader(new InternalCustomHeader("UpgradeToWebSocketTestHeader") with UpgradeToWebSocket { def requestedProtocols: immutable.Seq[String] = subprotocols.toList @@ -28,3 +28,5 @@ trait WSTestRequestBuilding { self: RouteTest ⇒ } }) } + +object WSTestRequestBuilding extends WSTestRequestBuilding diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala b/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala index 79b78f9071..c807744f0f 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/directives/WebSocketDirectives.scala @@ -52,9 +52,8 @@ abstract class WebSocketDirectives extends SecurityDirectives { * Handles WebSocket requests with the given handler if the given subprotocol is offered in the request and * rejects other requests with an [[ExpectedWebSocketRequestRejection]] or an [[UnsupportedWebSocketSubprotocolRejection]]. */ - def handleWebSocketMessagesForProtocol(handler: Flow[Message, Message, Any], subprotocol: String): Route = RouteAdapter { - val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala) - D.handleWebSocketMessagesForProtocol(adapted, subprotocol) + def handleWebSocketMessagesForProtocol[T](handler: Flow[Message, Message, T], subprotocol: String): Route = RouteAdapter { + D.handleWebSocketMessagesForProtocol(adapt(handler), subprotocol) } /** @@ -68,12 +67,10 @@ abstract class WebSocketDirectives extends SecurityDirectives { * * To support several subprotocols you may chain several `handleWebSocketMessage` Routes. */ - def handleWebSocketMessagesForOptionalProtocol(handler: Flow[Message, Message, Any], subprotocol: Optional[String]): Route = RouteAdapter { - val adapted = scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala) - D.handleWebSocketMessagesForOptionalProtocol(adapted, subprotocol.asScala) + def handleWebSocketMessagesForOptionalProtocol[T](handler: Flow[Message, Message, T], subprotocol: Optional[String]): Route = RouteAdapter { + D.handleWebSocketMessagesForOptionalProtocol(adapt(handler), subprotocol.asScala) } - // TODO this is because scala Message does not extend java Message - we could fix that, but http-core is stable private def adapt[T](handler: Flow[Message, Message, T]): scaladsl.Flow[s.Message, s.Message, NotUsed] = { scaladsl.Flow[s.Message].map(_.asJava).via(handler).map(_.asScala) } diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 1f65987741..497f179141 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -805,7 +805,10 @@ private[remote] class EndpointWriter( } } catch { case e: NotSerializableException ⇒ - log.error(e, "Transient association error (association remains live)") + log.error(e, "Serializer not defined for message type []. Transient association error (association remains live)", s.message.getClass) + true + case e: MessageSerializer.SerializationException ⇒ + log.error(e, "{} Transient association error (association remains live)", e.getMessage) true case e: EndpointException ⇒ publishAndThrow(e, Logging.ErrorLevel) diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index b09f9cbd01..f1422c7a1a 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -8,8 +8,11 @@ import akka.remote.WireFormats._ import akka.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.remote.artery.{ EnvelopeBuffer, HeaderBuilder } -import akka.serialization.{ Serialization, SerializationExtension, SerializerWithStringManifest } +import akka.serialization.Serialization import akka.serialization.ByteBufferSerializer +import akka.serialization.SerializationExtension +import akka.serialization.SerializerWithStringManifest +import scala.util.control.NonFatal /** * INTERNAL API @@ -18,6 +21,8 @@ import akka.serialization.ByteBufferSerializer */ private[akka] object MessageSerializer { + class SerializationException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) + /** * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message */ @@ -30,23 +35,32 @@ private[akka] object MessageSerializer { /** * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol + * Throws `NotSerializableException` if serializer was not configured for the message type. + * Throws `MessageSerializer.SerializationException` if exception was thrown from `toBinary` of the + * serializer. */ def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = { val s = SerializationExtension(system) val serializer = s.findSerializerFor(message) val builder = SerializedMessage.newBuilder - builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) - builder.setSerializerId(serializer.identifier) - serializer match { - case ser2: SerializerWithStringManifest ⇒ - val manifest = ser2.manifest(message) - if (manifest != "") - builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) - case _ ⇒ - if (serializer.includeManifest) - builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + try { + builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) + builder.setSerializerId(serializer.identifier) + serializer match { + case ser2: SerializerWithStringManifest ⇒ + val manifest = ser2.manifest(message) + if (manifest != "") + builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) + case _ ⇒ + if (serializer.includeManifest) + builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) + } + builder.build + } catch { + case NonFatal(e) ⇒ + throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " + + s"using serializer [${serializer.getClass}].", e) } - builder.build } def serializeForArtery(serialization: Serialization, message: AnyRef, headerBuilder: HeaderBuilder, envelope: EnvelopeBuffer): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 85a3b7e90f..269f38a41c 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -909,7 +909,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: * The messages are looped through the [[getAsyncCallback]] mechanism of [[GraphStage]] so they are safe to modify * internal state of this stage. * - * This method must not (the earliest) be called after the [[GraphStageLogic]] constructor has finished running, + * This method must (the earliest) be called after the [[GraphStageLogic]] constructor has finished running, * for example from the [[preStart]] callback the graph stage logic provides. * * Created [[StageActorRef]] to get messages and watch other actors in synchronous way. diff --git a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java index 7dabcaebac..ce0a98097d 100644 --- a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java +++ b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java @@ -4,9 +4,12 @@ package akka.testkit; import akka.actor.Terminated; +import scala.Option; import scala.runtime.AbstractFunction0; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.actor.SupervisorStrategy; import akka.event.Logging; import akka.event.Logging.LogEvent; import akka.japi.JavaPartialFunction; @@ -684,4 +687,43 @@ public class JavaTestKit { public void shutdown(ActorSystem actorSystem, Boolean verifySystemShutdown) { shutdown(actorSystem, null, verifySystemShutdown); } + + /** + * Spawns an actor as a child of this test actor, and returns the child's ActorRef. + * @param props Props to create the child actor + * @param name Actor name for the child actor + * @param supervisorStrategy Strategy should decide what to do with failures in the actor. + */ + public ActorRef childActorOf(Props props, String name, SupervisorStrategy supervisorStrategy) { + return p.childActorOf(props, name, supervisorStrategy); + } + + /** + * Spawns an actor as a child of this test actor, and returns the child's ActorRef. + * The actor will have an auto-generated name. + * @param props Props to create the child actor + * @param supervisorStrategy Strategy should decide what to do with failures in the actor. + */ + public ActorRef childActorOf(Props props, SupervisorStrategy supervisorStrategy) { + return p.childActorOf(props, supervisorStrategy); + } + + /** + * Spawns an actor as a child of this test actor, and returns the child's ActorRef. + * The actor will be supervised using {@link SupervisorStrategy.stoppingStrategy}. + * @param props Props to create the child actor + * @param name Actor name for the child actor + */ + public ActorRef childActorOf(Props props, String name) { + return p.childActorOf(props, name); + } + + /** + * Spawns an actor as a child of this test actor, and returns the child's ActorRef. + * The actor will have an auto-generated name and will be supervised using {@link SupervisorStrategy.stoppingStrategy}. + * @param props Props to create the child actor + */ + public ActorRef childActorOf(Props props) { + return p.childActorOf(props); + } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 2016816ae6..fe106986ac 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -41,6 +41,12 @@ object TestActor { final case class Watch(ref: ActorRef) extends NoSerializationVerificationNeeded final case class UnWatch(ref: ActorRef) extends NoSerializationVerificationNeeded final case class SetAutoPilot(ap: AutoPilot) extends NoSerializationVerificationNeeded + final case class Spawn(props: Props, name: Option[String] = None, strategy: Option[SupervisorStrategy] = None) extends NoSerializationVerificationNeeded { + def apply(context: ActorRefFactory): ActorRef = name match { + case Some(n) ⇒ context.actorOf(props, n) + case None ⇒ context.actorOf(props) + } + } trait Message { def msg: AnyRef @@ -54,6 +60,31 @@ object TestActor { val FALSE = (x: Any) ⇒ false + /** INTERNAL API */ + private[TestActor] class DelegatingSupervisorStrategy extends SupervisorStrategy { + import SupervisorStrategy._ + + private var delegates = Map.empty[ActorRef, SupervisorStrategy] + + private def delegate(child: ActorRef) = delegates.get(child).getOrElse(stoppingStrategy) + + def update(child: ActorRef, supervisor: SupervisorStrategy): Unit = delegates += (child → supervisor) + + override def decider = defaultDecider // not actually invoked + + override def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = { + delegates -= child + } + + override def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { + delegates(child).processFailure(context, restart, child, cause, stats, children) + } + + override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { + delegates(child).handleFailure(context, child, cause, stats, children) + } + } + // make creator serializable, for VerifySerializabilitySpec def props(queue: BlockingDeque[Message]): Props = Props(classOf[TestActor], queue) } @@ -61,6 +92,8 @@ object TestActor { class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor { import TestActor._ + override val supervisorStrategy: DelegatingSupervisorStrategy = new DelegatingSupervisorStrategy + var ignore: Ignore = None var autopilot: AutoPilot = NoAutoPilot @@ -70,6 +103,10 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor { case Watch(ref) ⇒ context.watch(ref) case UnWatch(ref) ⇒ context.unwatch(ref) case SetAutoPilot(pilot) ⇒ autopilot = pilot + case spawn: Spawn ⇒ + val actor = spawn(context) + for (s ← spawn.strategy) supervisorStrategy(actor) = s + queue.offerLast(RealMessage(actor, self)) case x: AnyRef ⇒ autopilot = autopilot.run(sender(), x) match { case KeepRunning ⇒ autopilot @@ -102,7 +139,7 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor { */ trait TestKitBase { - import TestActor.{ Message, RealMessage, NullMessage } + import TestActor.{ Message, RealMessage, NullMessage, Spawn } implicit val system: ActorSystem val testKitSettings = TestKitExtension(system) @@ -688,6 +725,46 @@ trait TestKitBase { TestKit.shutdownActorSystem(actorSystem, duration, verifySystemShutdown) } + /** + * Spawns an actor as a child of this test actor, and returns the child's ActorRef. + * @param props Props to create the child actor + * @param name Actor name for the child actor + * @param supervisorStrategy Strategy should decide what to do with failures in the actor. + */ + def childActorOf(props: Props, name: String, supervisorStrategy: SupervisorStrategy): ActorRef = { + testActor ! Spawn(props, Some(name), Some(supervisorStrategy)) + expectMsgType[ActorRef] + } + + /** + * Spawns an actor as a child of this test actor with an auto-generated name, and returns the child's ActorRef. + * @param props Props to create the child actor + * @param supervisorStrategy Strategy should decide what to do with failures in the actor. + */ + def childActorOf(props: Props, supervisorStrategy: SupervisorStrategy): ActorRef = { + testActor ! Spawn(props, None, Some(supervisorStrategy)) + expectMsgType[ActorRef] + } + + /** + * Spawns an actor as a child of this test actor with a stopping supervisor strategy, and returns the child's ActorRef. + * @param props Props to create the child actor + * @param name Actor name for the child actor + */ + def childActorOf(props: Props, name: String): ActorRef = { + testActor ! Spawn(props, Some(name), None) + expectMsgType[ActorRef] + } + + /** + * Spawns an actor as a child of this test actor with an auto-generated name and stopping supervisor strategy, returning the child's ActorRef. + * @param props Props to create the child actor + */ + def childActorOf(props: Props): ActorRef = { + testActor ! Spawn(props, None, None) + expectMsgType[ActorRef] + } + private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase) } diff --git a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala index 1a81181a97..a8862a16b6 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestProbeSpec.scala @@ -7,6 +7,7 @@ import scala.concurrent.{ Await } import scala.concurrent.duration._ import akka.pattern.ask import scala.util.Try +import java.util.concurrent.atomic.AtomicInteger class TestProbeSpec extends AkkaSpec with DefaultTimeout { @@ -39,6 +40,38 @@ class TestProbeSpec extends AkkaSpec with DefaultTimeout { probe1.expectMsg(0 millis, "some hint here", "world") } + "create a child when invoking actorOf" in { + val probe = TestProbe() + val child = probe.childActorOf(TestActors.echoActorProps) + child.path.parent should be(probe.ref.path) + + val namedChild = probe.childActorOf(TestActors.echoActorProps, "actorName") + namedChild.path.name should be("actorName") + } + + "restart a failing child if the given supervisor says so" in { + val restarts = new AtomicInteger(0) + + class FailingActor extends Actor { + override def receive = msg ⇒ msg match { + case _ ⇒ + throw new RuntimeException("simulated failure") + } + + override def postRestart(reason: Throwable): Unit = { + restarts.incrementAndGet() + } + } + + val probe = TestProbe() + val child = probe.childActorOf(Props(new FailingActor), SupervisorStrategy.defaultStrategy) + + awaitAssert { + child ! "hello" + restarts.get() should be > (1) + } + } + def assertFailureMessageContains(expectedHint: String)(block: ⇒ Unit) { Try { block diff --git a/project/MiMa.scala b/project/MiMa.scala index 3c293e9b4f..3d896c3d31 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -969,7 +969,19 @@ object MiMa extends AutoPlugin { ), "2.4.10" -> Seq( ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider.quarantine"), - ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine") + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteWatcher.quarantine"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOpsMat.orElseMat"), + + // #21201 adding childActorOf to TestActor / TestKit / TestProbe + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$3"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf$default$2"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.childActorOf"), + + // #21184 add java api for ws testkit + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.asScala"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.TextMessage.getStreamedText"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.asScala"), + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.model.ws.BinaryMessage.getStreamedData") ) ) }