diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index 7be5fa76f3..d5b3b20bc8 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -15,7 +15,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.ActorSystem import akka.testkit.EventFilter -import akka.stream.{ ActorFlowMaterializer, BindFailedException } +import akka.stream.{ StreamTcpException, ActorFlowMaterializer, BindFailedException } import akka.stream.scaladsl._ import akka.stream.testkit._ import akka.http.scaladsl.model.HttpEntity._ @@ -31,7 +31,9 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { akka.loggers = ["akka.testkit.TestEventListener"] akka.loglevel = ERROR akka.stdout-loglevel = ERROR - akka.log-dead-letters = OFF""") + akka.log-dead-letters = OFF + akka.io.tcp.windows-connection-abort-workaround-enabled = auto + """) implicit val system = ActorSystem(getClass.getSimpleName, testConf) import system.dispatcher implicit val materializer = ActorFlowMaterializer() @@ -131,30 +133,40 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "log materialization errors in `bindAndHandle`" which { - "are triggered in `transform`" in { + val testConf2: Config = + ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s") + .withFallback(testConf) + val system2 = ActorSystem(getClass.getSimpleName, testConf2) + import system2.dispatcher + val materializer2 = ActorFlowMaterializer.create(system2) + + "are triggered in `transform`" in Utils.assertAllStagesStopped { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val flow = Flow[HttpRequest].transform[HttpResponse](() ⇒ sys.error("BOOM")) - val binding = Http().bindAndHandle(flow, hostname, port) + val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2) val b1 = Await.result(binding, 3.seconds) - EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept { - val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head) - Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException] - } + EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { + val (_, responseFuture) = + Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) + Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] + }(system2) Await.result(b1.unbind(), 1.second) - } - "are triggered in `mapMaterialized`" in { + }(materializer2) + + "are triggered in `mapMaterialized`" in Utils.assertAllStagesStopped { val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort() val flow = Flow[HttpRequest].map(_ ⇒ HttpResponse()).mapMaterializedValue(_ ⇒ sys.error("BOOM")) - val binding = Http().bindAndHandle(flow, hostname, port) - val b1 = Await.result(binding, 3.seconds) + val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2) + val b1 = Await.result(binding, 1.seconds) - EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept { - val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head) - Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException] - } + EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept { + val (_, responseFuture) = + Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2) + Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException] + }(system2) Await.result(b1.unbind(), 1.second) - } + }(materializer2) } "properly complete a simple request/response cycle" in Utils.assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala index 4eca857805..b603829699 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala @@ -150,6 +150,13 @@ class ActorInterpreterSpec extends AkkaSpec { } } + "handle failed stage factories" in { + a[RuntimeException] should be thrownBy + Await.result( + Source.empty[Int].transform(() ⇒ sys.error("test error")).runWith(Sink.head), + 3.seconds) + } + def largeDemand(extra: Int): Unit = { val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit") val large = new PushPullStage[Int, Int] { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 3e7173b091..e73b44e0f2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -3,26 +3,21 @@ */ package akka.stream.io -import akka.actor.{ ActorSystem, Kill } -import akka.stream.scaladsl.Tcp.OutgoingConnection +import akka.actor.{ActorSystem, Kill} +import akka.io.Tcp._ +import akka.stream.scaladsl.Tcp.IncomingConnection +import akka.stream.scaladsl.{Flow, _} +import akka.stream.testkit.TestUtils.temporaryServerAddress +import akka.stream.testkit.Utils._ +import akka.stream.testkit._ +import akka.stream.{ActorFlowMaterializer, BindFailedException, StreamTcpException} +import akka.util.{ByteString, Helpers} import scala.collection.immutable -import scala.concurrent.{ Future, Await } -import akka.io.Tcp._ - -import akka.stream.{ ActorFlowMaterializer, StreamTcpException, BindFailedException } - import scala.concurrent.Await import scala.concurrent.duration._ -import akka.util.{ Helpers, ByteString } -import akka.stream.scaladsl.Flow -import akka.stream.testkit._ -import akka.stream.testkit.Utils._ -import akka.stream.scaladsl._ -import akka.stream.testkit.TestUtils.temporaryServerAddress -class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto") with TcpHelper { - import akka.stream.io.TcpHelper._ +class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper { var demand = 0L "Outgoing TCP stream" must { @@ -494,6 +489,38 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- Await.result(binding4.unbind(), 1.second) } + "not shut down connections after the connection stream cancelled" in assertAllStagesStopped { + val address = temporaryServerAddress() + Tcp().bind(address.getHostName, address.getPort).take(1).runForeach(_.flow.join(Flow[ByteString]).run()) + + val total = Source(immutable.Iterable.fill(1000)(ByteString(0))) + .via(Tcp().outgoingConnection(address)) + .runFold(0)(_ + _.size) + + Await.result(total, 3.seconds) should ===(1000) + } + + "shut down properly even if some accepted connection Flows have not been subscribed to" in assertAllStagesStopped { + val address = temporaryServerAddress() + val takeTwoAndDropSecond = Flow[IncomingConnection].grouped(2).take(1).map(_.head) + Tcp().bind(address.getHostName, address.getPort) + .via(takeTwoAndDropSecond) + .runForeach(_.flow.join(Flow[ByteString]).run()) + + val folder = Source(immutable.Iterable.fill(1000)(ByteString(0))) + .via(Tcp().outgoingConnection(address)) + .toMat(Sink.fold(0)(_ + _.size))(Keep.right) + + val total = folder.run() + val rejected = folder.run() + + Await.result(total, 3.seconds) should ===(1000) + + a[StreamTcpException] should be thrownBy { + Await.result(rejected, 5.seconds) should ===(1000) + } + } + } def validateServerClientCommunication(testData: ByteString, diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 170893d4bc..8102bd1d08 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -172,6 +172,8 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D override def subreceive = _subreceive private val _subreceive = new SubReceive(waitingExposedPublisher) + def isSubscribed = subscriber ne null + def enqueueOutputElement(elem: Any): Unit = { ReactiveStreamsCompliance.requireNonNullElement(elem) downstreamDemand -= 1 diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 473ef4bce4..c2cccbb051 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -4,12 +4,13 @@ package akka.stream.impl import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference } +import akka.stream.impl.MaterializerSession.MaterializationPanic import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.Keep import akka.stream._ import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber } import scala.collection.mutable -import scala.util.control.NonFatal +import scala.util.control.{ NoStackTrace, NonFatal } import akka.event.Logging.simpleName import scala.annotation.tailrec import java.util.concurrent.atomic.AtomicLong @@ -540,6 +541,13 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] { } +/** + * INERNAL API + */ +private[stream] object MaterializerSession { + class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace +} + /** * INTERNAL API */ @@ -599,12 +607,38 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo } + // Cancels all intermediate Publishers and fails all intermediate Subscribers. + // (This is an attempt to clean up after an exception during materialization) + private def panic(cause: Throwable): Unit = { + val panicError = new MaterializationPanic(cause) + for (subMap ← subscribersStack; sub ← subMap.valuesIterator) { + sub.onSubscribe(new Subscription { + override def cancel(): Unit = () + override def request(n: Long): Unit = sub.onError(panicError) + }) + } + + for (pubMap ← publishersStack; pub ← pubMap.valuesIterator) { + pub.subscribe(new Subscriber[Any] { + override def onSubscribe(s: Subscription): Unit = s.cancel() + override def onComplete(): Unit = () + override def onError(t: Throwable): Unit = () + override def onNext(t: Any): Unit = () + }) + } + } + final def materialize(): Any = { require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)") require( topLevel.isRunnable, s"The top level module cannot be materialized because it has unconnected ports: ${(topLevel.inPorts ++ topLevel.outPorts).mkString(", ")}") - materializeModule(topLevel, topLevel.attributes) + try materializeModule(topLevel, topLevel.attributes) + catch { + case NonFatal(e) ⇒ + panic(e) + throw e + } } protected def mergeAttributes(parent: OperationAttributes, current: OperationAttributes): OperationAttributes = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 80ef8b52fd..bc53852e10 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -377,8 +377,10 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings try upstream.onInternalError(AbruptTerminationException(self)) // Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream // otherwise this will have no effect - finally downstream.fail(AbruptTerminationException(self)) - upstream.cancel() + finally { + downstream.fail(AbruptTerminationException(self)) + upstream.cancel() + } } override def postRestart(reason: Throwable): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index bf18bbe02b..694e708f1a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -5,16 +5,16 @@ package akka.stream.impl.io import java.net.InetSocketAddress import akka.io.{ IO, Tcp } +import akka.stream.impl.io.StreamTcpManager.ExposedProcessor import scala.concurrent.Promise -import scala.util.control.NoStackTrace import akka.actor._ import akka.util.ByteString import akka.io.Tcp._ -import akka.stream.ActorFlowMaterializerSettings -import akka.stream.StreamTcpException -import org.reactivestreams.Processor +import akka.stream.{ AbruptTerminationException, StreamSubscriptionTimeoutSettings, ActorFlowMaterializerSettings, StreamTcpException } +import org.reactivestreams.{ Publisher, Processor } import akka.stream.impl._ -import akka.actor.ActorLogging + +import scala.util.control.NoStackTrace /** * INTERNAL API @@ -33,6 +33,7 @@ private[akka] object TcpStreamActor { def inboundProps(connection: ActorRef, halfClose: Boolean, settings: ActorFlowMaterializerSettings): Props = Props(new InboundTcpStreamActor(connection, halfClose, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local) + case object SubscriptionTimeout extends NoSerializationVerificationNeeded } /** @@ -47,7 +48,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS override def inputOnError(e: Throwable): Unit = fail(e) } - val primaryOutputs: Outputs = new SimpleOutputs(self, readPump) + val primaryOutputs: SimpleOutputs = new SimpleOutputs(self, readPump) def fullClose: Boolean = !halfClose @@ -216,7 +217,14 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) { override def receiveExposedPublisher(ep: ExposedPublisher): Unit = { + import context.dispatcher primaryOutputs.subreceive(ep) + subscriptionTimer = Some( + context.system.scheduler.scheduleOnce( + settings.subscriptionTimeoutSettings.timeout, + self, + SubscriptionTimeout)) + context become activeReceive } } @@ -226,7 +234,8 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS primaryOutputs.subreceive orElse tcpInputs.subreceive orElse tcpOutputs.subreceive orElse - commonCloseHandling + commonCloseHandling orElse + handleSubscriptionTimeout def commonCloseHandling: Receive = { case Terminated(_) ⇒ fail(new StreamTcpException("The connection actor has terminated. Stopping now.")) @@ -240,9 +249,20 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS case Aborted ⇒ fail(new StreamTcpException("The connection has been aborted")) } + def handleSubscriptionTimeout: Receive = { + case SubscriptionTimeout ⇒ + val millis = settings.subscriptionTimeoutSettings.timeout.toMillis + if (!primaryOutputs.isSubscribed) { + fail(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${millis}) ms") with NoStackTrace) + context.stop(self) + } + } + readPump.nextPhase(readPump.running) writePump.nextPhase(writePump.running) + var subscriptionTimer: Option[Cancellable] = None + def fail(e: Throwable): Unit = { if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) @@ -259,10 +279,12 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS override def postStop(): Unit = { // Close if it has not yet been done + val abruptTermination = AbruptTerminationException(self) tcpInputs.cancel() - tcpOutputs.complete() + tcpOutputs.error(abruptTermination) primaryInputs.cancel() - primaryOutputs.complete() + primaryOutputs.error(abruptTermination) + subscriptionTimer.foreach(_.cancel()) super.postStop() // Remember, we have a Stash } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 63df82ae4d..306bb2f126 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -65,7 +65,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket finished = true incomingConnections.cancel() primaryOutputs.complete() - context.stop(self) + // Stop only after all already accepted connections have been shut down + if (context.children.isEmpty) context.stop(self) } } @@ -123,6 +124,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket if (!closed && listener != null) listener ! Unbind closed = true pendingConnection = null + pump() } override def dequeueInputElement(): Any = { val elem = pendingConnection @@ -139,11 +141,15 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket } } - def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive + def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive orElse { + case Terminated(_) ⇒ + // If the Source is cancelled, and this was our last child, stop ourselves + if (incomingConnections.isClosed && context.children.isEmpty) context.stop(self) + } def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { () ⇒ val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement() - val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, halfClose, settings)) + val tcpStreamActor = context.watch(context.actorOf(TcpStreamActor.inboundProps(connection, halfClose, settings))) val processor = ActorProcessor[ByteString, ByteString](tcpStreamActor) val conn = StreamTcp.IncomingConnection( connected.localAddress, @@ -154,6 +160,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket override def postStop(): Unit = { unboundPromise.trySuccess(()) + primaryOutputs.complete() super.postStop() }