From 8c6223e8788e260c6aaf413206d2fe2ba00aa581 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 24 Jun 2015 14:43:52 +0200 Subject: [PATCH 1/4] Fix master cancellation in SplitWhere --- .../stream/scaladsl/FlowSplitWhenSpec.scala | 66 +++++++++++++++++++ .../stream/impl/SplitWhereProcessorImpl.scala | 2 +- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index ad67f0afb3..9ab3b14345 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -114,6 +114,72 @@ class FlowSplitWhenSpec extends AkkaSpec { masterSubscriber.expectComplete() } } + + "support cancelling both master and substream" in assertAllStagesStopped { + val inputs = TestPublisher.probe[Int]() + + val substream = TestSubscriber.probe[Int]() + val masterStream = TestSubscriber.probe[Any]() + + Source(inputs) + .splitWhen(_ == 2) + .map(_.runWith(Sink(substream))) + .runWith(Sink(masterStream)) + + masterStream.request(1) + inputs.sendNext(1) + + substream.cancel() + + masterStream.expectNext(()) + masterStream.expectNoMsg(1.second) + masterStream.cancel() + inputs.expectCancellation() + + val inputs2 = TestPublisher.probe[Int]() + Source(inputs2) + .splitWhen(_ == 2) + .map(_.runWith(Sink.cancelled)) + .runWith(Sink.cancelled) + + inputs2.expectCancellation() + + val inputs3 = TestPublisher.probe[Int]() + + val substream3 = TestSubscriber.probe[Int]() + val masterStream3 = TestSubscriber.probe[Source[Int, Any]]() + + Source(inputs3) + .splitWhen(_ == 2) + .runWith(Sink(masterStream3)) + + masterStream3.request(1) + inputs3.sendNext(1) + + val src = masterStream3.expectNext() + src.runWith(Sink.cancelled) + + masterStream3.request(1) + inputs3.sendNext(2) + val src2 = masterStream3.expectNext() + val substream4 = TestSubscriber.probe[Int]() + src2.runWith(Sink(substream4)) + + substream4.requestNext(2) + substream4.expectNoMsg(1.second) + masterStream3.expectNoMsg(1.second) + inputs3.expectRequest() + inputs3.expectRequest() + inputs3.expectNoMsg(1.second) + + substream4.cancel() + inputs3.expectNoMsg(1.second) + masterStream3.expectNoMsg(1.second) + + masterStream3.cancel() + inputs3.expectCancellation() + + } } "support cancelling the master stream" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala index cf1d69b8ce..44c814956b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhereProcessorImpl.scala @@ -121,7 +121,7 @@ private[akka] class SplitWhereProcessorImpl(_settings: ActorMaterializerSettings } // Ignore elements for a cancelled substream until a new substream needs to be opened - val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val elem = primaryInputs.dequeueInputElement() decideSplit(elem) match { case Continue | Drop ⇒ // ignore elem From df35562c0b5b3f32c6d6703fcdaac31e2ce7378d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 24 Jun 2015 14:44:07 +0200 Subject: [PATCH 2/4] Make broadcast cancellation configurable --- .../client/OutgoingConnectionBlueprint.scala | 2 +- .../akka/stream/impl/ActorMaterializerImpl.scala | 4 ++-- .../src/main/scala/akka/stream/impl/FanOut.scala | 7 ++++--- .../main/scala/akka/stream/impl/Junctions.scala | 3 ++- .../main/scala/akka/stream/javadsl/Graph.scala | 15 +++++++++++++-- .../main/scala/akka/stream/scaladsl/Graph.scala | 9 ++++++--- 6 files changed, 28 insertions(+), 12 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala index e19abc372d..ad924c435f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/OutgoingConnectionBlueprint.scala @@ -78,7 +78,7 @@ private[http] object OutgoingConnectionBlueprint { FlowGraph.partial() { implicit b ⇒ import FlowGraph.Implicits._ - val methodBypassFanout = b.add(Broadcast[HttpRequest](2)) + val methodBypassFanout = b.add(Broadcast[HttpRequest](2, eagerCancel = true)) val responseParsingMerge = b.add(new ResponseParsingMerge(rootParser)) val terminationFanout = b.add(Broadcast[HttpResponse](2)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index d8a005676b..29a1360f2d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -173,8 +173,8 @@ private[akka] case class ActorMaterializerImpl( val flexi = r.flexi(r.shape) (FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets) - case BroadcastModule(shape, _) ⇒ - (Broadcast.props(effectiveSettings, shape.outArray.size), shape.in, shape.outArray.toSeq) + case BroadcastModule(shape, eagerCancel, _) ⇒ + (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq) case BalanceModule(shape, waitForDownstreams, _) ⇒ (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index 8aea7288bf..6791d26854 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -289,14 +289,15 @@ private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val * INTERNAL API */ private[akka] object Broadcast { - def props(settings: ActorMaterializerSettings, outputPorts: Int): Props = - Props(new Broadcast(settings, outputPorts)).withDeploy(Deploy.local) + def props(settings: ActorMaterializerSettings, eagerCancel: Boolean, outputPorts: Int): Props = + Props(new Broadcast(settings, outputPorts, eagerCancel)).withDeploy(Deploy.local) } /** * INTERNAL API */ -private[akka] class Broadcast(_settings: ActorMaterializerSettings, _outputPorts: Int) extends FanOut(_settings, _outputPorts) { +private[akka] class Broadcast(_settings: ActorMaterializerSettings, _outputPorts: Int, eagerCancel: Boolean) extends FanOut(_settings, _outputPorts) { + outputBunch.unmarkCancelledOutputs(!eagerCancel) outputBunch.markAllOutputs() initialPhase(1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { () ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala index 56f65d1207..fb280a67cd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala @@ -44,11 +44,12 @@ private[stream] object Junctions { final case class BroadcastModule[T]( shape: UniformFanOutShape[T, T], + eagerCancel: Boolean, override val attributes: Attributes = name("broadcast")) extends FanOutModule { override def withAttributes(attr: Attributes): Module = copy(attributes = attr) - override def carbonCopy: Module = BroadcastModule(shape.deepCopy(), attributes) + override def carbonCopy: Module = BroadcastModule(shape.deepCopy(), eagerCancel, attributes) } final case class MergePreferredModule[T]( diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 0da7b5cf9d..29ed4ccb9e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -92,15 +92,26 @@ object MergePreferred { * * '''Completes when''' upstream completes * - * '''Cancels when''' all downstreams cancel + * '''Cancels when''' + * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel */ object Broadcast { /** * Create a new `Broadcast` vertex with the specified input type. + * + * @param outputCount number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ - def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = + def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], Unit] = scaladsl.Broadcast(outputCount) + /** + * Create a new `Broadcast` vertex with the specified input type. + * + * @param outputCount number of output ports + */ + def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, eagerCancel = false) + /** * Create a new `Broadcast` vertex with the specified input type. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 75d0e3a8b2..2a5e05cad7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -104,10 +104,11 @@ object Broadcast { * Create a new `Broadcast` with the specified number of output ports. * * @param outputPorts number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel. */ - def apply[T](outputPorts: Int): Broadcast[T] = { + def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = { val shape = new UniformFanOutShape[T, T](outputPorts) - new Broadcast(outputPorts, shape, new BroadcastModule(shape, Attributes.name("Broadcast"))) + new Broadcast(outputPorts, shape, new BroadcastModule(shape, eagerCancel, Attributes.name("Broadcast"))) } } @@ -121,7 +122,9 @@ object Broadcast { * * '''Completes when''' upstream completes * - * '''Cancels when''' all downstreams cancel + * '''Cancels when''' + * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel + * */ class Broadcast[T] private (outputPorts: Int, override val shape: UniformFanOutShape[T, T], From 234aaadaa704b6eef9f142e1a35a4674c5555ba0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 24 Jun 2015 16:15:01 +0200 Subject: [PATCH 3/4] Added test for cancellation scenarios in Http client/server --- .../client/ClientCancellationSpec.scala | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala new file mode 100644 index 0000000000..52c7b40653 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala @@ -0,0 +1,76 @@ +package akka.http.impl.engine.client + +import javax.net.ssl.SSLContext + +import akka.http.scaladsl.{ HttpsContext, Http } +import akka.http.scaladsl.model.{ HttpHeader, HttpResponse, HttpRequest } +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.stream.testkit.{ TestSubscriber, TestPublisher, AkkaSpec, TestUtils, Utils } +import akka.http.scaladsl.model.headers + +class ClientCancellationSpec extends AkkaSpec(""" + #akka.loggers = [] + akka.loglevel = DEBUG + #akka.io.tcp.trace-logging = off + akka.io.tcp.windows-connection-abort-workaround-enabled=auto""") { + + implicit val materializer = ActorMaterializer() + val noncheckedMaterializer = ActorMaterializer() + + "Http client connections" must { + val address = TestUtils.temporaryServerAddress() + Http().bindAndHandleSync( + { req ⇒ HttpResponse(headers = headers.Connection("close") :: Nil) }, + address.getHostName, + address.getPort)(noncheckedMaterializer) + + val addressTls = TestUtils.temporaryServerAddress() + Http().bindAndHandleSync( + { req ⇒ HttpResponse() }, // TLS client does full-close, no need for the connection:close header + addressTls.getHostName, + addressTls.getPort, + httpsContext = Some(HttpsContext(SSLContext.getDefault)))(noncheckedMaterializer) + + def testCase(connection: Flow[HttpRequest, HttpResponse, Any]): Unit = Utils.assertAllStagesStopped { + val requests = TestPublisher.probe[HttpRequest]() + val responses = TestSubscriber.probe[HttpResponse]() + Source(requests).via(connection).runWith(Sink(responses)) + responses.request(1) + requests.sendNext(HttpRequest()) + responses.expectNext().entity.dataBytes.runWith(Sink.cancelled) + responses.cancel() + requests.expectCancellation() + } + + "support cancellation in simple outgoing connection" in { + testCase( + Http().outgoingConnection(address.getHostName, address.getPort)) + } + + "support cancellation in pooled outgoing connection" in { + testCase( + Flow[HttpRequest] + .map((_, ())) + .via(Http().cachedHostConnectionPool(address.getHostName, address.getPort)(noncheckedMaterializer)) + .map(_._1.get)) + } + + "support cancellation in simple outgoing connection with TLS" in { + pending + testCase( + Http().outgoingConnectionTls(addressTls.getHostName, addressTls.getPort)) + } + + "support cancellation in pooled outgoing connection with TLS" in { + pending + testCase( + Flow[HttpRequest] + .map((_, ())) + .via(Http().cachedHostConnectionPoolTls(addressTls.getHostName, addressTls.getPort)(noncheckedMaterializer)) + .map(_._1.get)) + } + + } + +} From cb4aa33e1168ad70f675951137bfda9b3a43a991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 24 Jun 2015 16:19:04 +0200 Subject: [PATCH 4/4] Removed problematic and wrong test --- .../GraphJunctionAttributesSpec.scala | 55 ------------------- 1 file changed, 55 deletions(-) delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala deleted file mode 100644 index fbdc51ead3..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl - -import akka.stream.Attributes._ -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.testkit._ -import scala.concurrent.duration._ -import scala.concurrent.Await - -class GraphJunctionAttributesSpec extends AkkaSpec { - - implicit val set = ActorMaterializerSettings(system).withInputBuffer(4, 4) - implicit val mat = ActorMaterializer(set) - - "A zip" should { - - "take custom inputBuffer settings" in { - - sealed abstract class SlowTick - case object SlowTick extends SlowTick - - sealed abstract class FastTick - case object FastTick extends FastTick - - val source = Source[(SlowTick, List[FastTick])]() { implicit b ⇒ - import FlowGraph.Implicits._ - - val slow = Source(100.millis, 100.millis, SlowTick) - val fast = Source(0.seconds, 10.millis, FastTick) - - val zip = b add Zip[SlowTick, List[FastTick]]().withAttributes(inputBuffer(1, 1)) - - slow ~> zip.in0 - fast.conflate(tick ⇒ List(tick)) { case (list, tick) ⇒ tick :: list } ~> zip.in1 - - zip.out - } - - val future = source - .drop(1) // account for prefetch - .grouped(10) - .runWith(Sink.head) - val fastTicks = Await.result(future, 2.seconds).map(_._2.size) - - // Account for the possibility for the zip to act as a buffer of two. - // If that happens there would be one fast tick for one slow tick in the results. - // More explanation in #16435 - atLeast(8, fastTicks) shouldBe 10 +- 1 - } - } - -}