Merge pull request #17790 from drewhk/wip-17786-bindstream-cancel-not-stop-children-drewhk
=str #17786: Cancelling bind stream should not cancel children
This commit is contained in:
commit
2bd82617b4
8 changed files with 161 additions and 48 deletions
|
|
@ -15,7 +15,7 @@ import scala.concurrent.duration._
|
|||
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
|
||||
import akka.actor.ActorSystem
|
||||
import akka.testkit.EventFilter
|
||||
import akka.stream.{ ActorFlowMaterializer, BindFailedException }
|
||||
import akka.stream.{ StreamTcpException, ActorFlowMaterializer, BindFailedException }
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit._
|
||||
import akka.http.scaladsl.model.HttpEntity._
|
||||
|
|
@ -31,7 +31,9 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
|
|||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
akka.loglevel = ERROR
|
||||
akka.stdout-loglevel = ERROR
|
||||
akka.log-dead-letters = OFF""")
|
||||
akka.log-dead-letters = OFF
|
||||
akka.io.tcp.windows-connection-abort-workaround-enabled = auto
|
||||
""")
|
||||
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
|
||||
import system.dispatcher
|
||||
implicit val materializer = ActorFlowMaterializer()
|
||||
|
|
@ -131,30 +133,40 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
|
|||
}
|
||||
|
||||
"log materialization errors in `bindAndHandle`" which {
|
||||
"are triggered in `transform`" in {
|
||||
val testConf2: Config =
|
||||
ConfigFactory.parseString("akka.stream.materializer.subscription-timeout.timeout = 1 s")
|
||||
.withFallback(testConf)
|
||||
val system2 = ActorSystem(getClass.getSimpleName, testConf2)
|
||||
import system2.dispatcher
|
||||
val materializer2 = ActorFlowMaterializer.create(system2)
|
||||
|
||||
"are triggered in `transform`" in Utils.assertAllStagesStopped {
|
||||
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||
val flow = Flow[HttpRequest].transform[HttpResponse](() ⇒ sys.error("BOOM"))
|
||||
val binding = Http().bindAndHandle(flow, hostname, port)
|
||||
val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2)
|
||||
val b1 = Await.result(binding, 3.seconds)
|
||||
|
||||
EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept {
|
||||
val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)
|
||||
Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException]
|
||||
}
|
||||
EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept {
|
||||
val (_, responseFuture) =
|
||||
Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2)
|
||||
Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException]
|
||||
}(system2)
|
||||
Await.result(b1.unbind(), 1.second)
|
||||
}
|
||||
"are triggered in `mapMaterialized`" in {
|
||||
}(materializer2)
|
||||
|
||||
"are triggered in `mapMaterialized`" in Utils.assertAllStagesStopped {
|
||||
val (_, hostname, port) = TestUtils.temporaryServerHostnameAndPort()
|
||||
val flow = Flow[HttpRequest].map(_ ⇒ HttpResponse()).mapMaterializedValue(_ ⇒ sys.error("BOOM"))
|
||||
val binding = Http().bindAndHandle(flow, hostname, port)
|
||||
val b1 = Await.result(binding, 3.seconds)
|
||||
val binding = Http(system2).bindAndHandle(flow, hostname, port)(materializer2)
|
||||
val b1 = Await.result(binding, 1.seconds)
|
||||
|
||||
EventFilter[RuntimeException](message = "BOOM", occurrences = 1) intercept {
|
||||
val (_, responseFuture) = Http().outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)
|
||||
Await.result(responseFuture.failed, 1.second) shouldBe a[NoSuchElementException]
|
||||
}
|
||||
EventFilter[RuntimeException](message = "BOOM", occurrences = 1).intercept {
|
||||
val (_, responseFuture) =
|
||||
Http(system2).outgoingConnection(hostname, port).runWith(Source.single(HttpRequest()), Sink.head)(materializer2)
|
||||
Await.result(responseFuture.failed, 5.second) shouldBe a[StreamTcpException]
|
||||
}(system2)
|
||||
Await.result(b1.unbind(), 1.second)
|
||||
}
|
||||
}(materializer2)
|
||||
}
|
||||
|
||||
"properly complete a simple request/response cycle" in Utils.assertAllStagesStopped {
|
||||
|
|
|
|||
|
|
@ -150,6 +150,13 @@ class ActorInterpreterSpec extends AkkaSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"handle failed stage factories" in {
|
||||
a[RuntimeException] should be thrownBy
|
||||
Await.result(
|
||||
Source.empty[Int].transform(() ⇒ sys.error("test error")).runWith(Sink.head),
|
||||
3.seconds)
|
||||
}
|
||||
|
||||
def largeDemand(extra: Int): Unit = {
|
||||
val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit")
|
||||
val large = new PushPullStage[Int, Int] {
|
||||
|
|
|
|||
|
|
@ -3,26 +3,21 @@
|
|||
*/
|
||||
package akka.stream.io
|
||||
|
||||
import akka.actor.{ ActorSystem, Kill }
|
||||
import akka.stream.scaladsl.Tcp.OutgoingConnection
|
||||
import akka.actor.{ActorSystem, Kill}
|
||||
import akka.io.Tcp._
|
||||
import akka.stream.scaladsl.Tcp.IncomingConnection
|
||||
import akka.stream.scaladsl.{Flow, _}
|
||||
import akka.stream.testkit.TestUtils.temporaryServerAddress
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.{ActorFlowMaterializer, BindFailedException, StreamTcpException}
|
||||
import akka.util.{ByteString, Helpers}
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Future, Await }
|
||||
import akka.io.Tcp._
|
||||
|
||||
import akka.stream.{ ActorFlowMaterializer, StreamTcpException, BindFailedException }
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import akka.util.{ Helpers, ByteString }
|
||||
import akka.stream.scaladsl.Flow
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit.TestUtils.temporaryServerAddress
|
||||
|
||||
class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto") with TcpHelper {
|
||||
import akka.stream.io.TcpHelper._
|
||||
class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper {
|
||||
var demand = 0L
|
||||
|
||||
"Outgoing TCP stream" must {
|
||||
|
|
@ -494,6 +489,38 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-
|
|||
Await.result(binding4.unbind(), 1.second)
|
||||
}
|
||||
|
||||
"not shut down connections after the connection stream cancelled" in assertAllStagesStopped {
|
||||
val address = temporaryServerAddress()
|
||||
Tcp().bind(address.getHostName, address.getPort).take(1).runForeach(_.flow.join(Flow[ByteString]).run())
|
||||
|
||||
val total = Source(immutable.Iterable.fill(1000)(ByteString(0)))
|
||||
.via(Tcp().outgoingConnection(address))
|
||||
.runFold(0)(_ + _.size)
|
||||
|
||||
Await.result(total, 3.seconds) should ===(1000)
|
||||
}
|
||||
|
||||
"shut down properly even if some accepted connection Flows have not been subscribed to" in assertAllStagesStopped {
|
||||
val address = temporaryServerAddress()
|
||||
val takeTwoAndDropSecond = Flow[IncomingConnection].grouped(2).take(1).map(_.head)
|
||||
Tcp().bind(address.getHostName, address.getPort)
|
||||
.via(takeTwoAndDropSecond)
|
||||
.runForeach(_.flow.join(Flow[ByteString]).run())
|
||||
|
||||
val folder = Source(immutable.Iterable.fill(1000)(ByteString(0)))
|
||||
.via(Tcp().outgoingConnection(address))
|
||||
.toMat(Sink.fold(0)(_ + _.size))(Keep.right)
|
||||
|
||||
val total = folder.run()
|
||||
val rejected = folder.run()
|
||||
|
||||
Await.result(total, 3.seconds) should ===(1000)
|
||||
|
||||
a[StreamTcpException] should be thrownBy {
|
||||
Await.result(rejected, 5.seconds) should ===(1000)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def validateServerClientCommunication(testData: ByteString,
|
||||
|
|
|
|||
|
|
@ -172,6 +172,8 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
|
|||
override def subreceive = _subreceive
|
||||
private val _subreceive = new SubReceive(waitingExposedPublisher)
|
||||
|
||||
def isSubscribed = subscriber ne null
|
||||
|
||||
def enqueueOutputElement(elem: Any): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
||||
downstreamDemand -= 1
|
||||
|
|
|
|||
|
|
@ -4,12 +4,13 @@
|
|||
package akka.stream.impl
|
||||
|
||||
import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference }
|
||||
import akka.stream.impl.MaterializerSession.MaterializationPanic
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.scaladsl.Keep
|
||||
import akka.stream._
|
||||
import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber }
|
||||
import scala.collection.mutable
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||
import akka.event.Logging.simpleName
|
||||
import scala.annotation.tailrec
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
|
@ -540,6 +541,13 @@ private[stream] class MaterializedValuePublisher extends Publisher[Any] {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* INERNAL API
|
||||
*/
|
||||
private[stream] object MaterializerSession {
|
||||
class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -599,12 +607,38 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
|
|||
|
||||
}
|
||||
|
||||
// Cancels all intermediate Publishers and fails all intermediate Subscribers.
|
||||
// (This is an attempt to clean up after an exception during materialization)
|
||||
private def panic(cause: Throwable): Unit = {
|
||||
val panicError = new MaterializationPanic(cause)
|
||||
for (subMap ← subscribersStack; sub ← subMap.valuesIterator) {
|
||||
sub.onSubscribe(new Subscription {
|
||||
override def cancel(): Unit = ()
|
||||
override def request(n: Long): Unit = sub.onError(panicError)
|
||||
})
|
||||
}
|
||||
|
||||
for (pubMap ← publishersStack; pub ← pubMap.valuesIterator) {
|
||||
pub.subscribe(new Subscriber[Any] {
|
||||
override def onSubscribe(s: Subscription): Unit = s.cancel()
|
||||
override def onComplete(): Unit = ()
|
||||
override def onError(t: Throwable): Unit = ()
|
||||
override def onNext(t: Any): Unit = ()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
final def materialize(): Any = {
|
||||
require(topLevel ne EmptyModule, "An empty module cannot be materialized (EmptyModule was given)")
|
||||
require(
|
||||
topLevel.isRunnable,
|
||||
s"The top level module cannot be materialized because it has unconnected ports: ${(topLevel.inPorts ++ topLevel.outPorts).mkString(", ")}")
|
||||
materializeModule(topLevel, topLevel.attributes)
|
||||
try materializeModule(topLevel, topLevel.attributes)
|
||||
catch {
|
||||
case NonFatal(e) ⇒
|
||||
panic(e)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
protected def mergeAttributes(parent: OperationAttributes, current: OperationAttributes): OperationAttributes =
|
||||
|
|
|
|||
|
|
@ -377,8 +377,10 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
|
|||
try upstream.onInternalError(AbruptTerminationException(self))
|
||||
// Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream
|
||||
// otherwise this will have no effect
|
||||
finally downstream.fail(AbruptTerminationException(self))
|
||||
upstream.cancel()
|
||||
finally {
|
||||
downstream.fail(AbruptTerminationException(self))
|
||||
upstream.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
override def postRestart(reason: Throwable): Unit = {
|
||||
|
|
|
|||
|
|
@ -5,16 +5,16 @@ package akka.stream.impl.io
|
|||
|
||||
import java.net.InetSocketAddress
|
||||
import akka.io.{ IO, Tcp }
|
||||
import akka.stream.impl.io.StreamTcpManager.ExposedProcessor
|
||||
import scala.concurrent.Promise
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.actor._
|
||||
import akka.util.ByteString
|
||||
import akka.io.Tcp._
|
||||
import akka.stream.ActorFlowMaterializerSettings
|
||||
import akka.stream.StreamTcpException
|
||||
import org.reactivestreams.Processor
|
||||
import akka.stream.{ AbruptTerminationException, StreamSubscriptionTimeoutSettings, ActorFlowMaterializerSettings, StreamTcpException }
|
||||
import org.reactivestreams.{ Publisher, Processor }
|
||||
import akka.stream.impl._
|
||||
import akka.actor.ActorLogging
|
||||
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -33,6 +33,7 @@ private[akka] object TcpStreamActor {
|
|||
def inboundProps(connection: ActorRef, halfClose: Boolean, settings: ActorFlowMaterializerSettings): Props =
|
||||
Props(new InboundTcpStreamActor(connection, halfClose, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local)
|
||||
|
||||
case object SubscriptionTimeout extends NoSerializationVerificationNeeded
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -47,7 +48,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
|
|||
override def inputOnError(e: Throwable): Unit = fail(e)
|
||||
}
|
||||
|
||||
val primaryOutputs: Outputs = new SimpleOutputs(self, readPump)
|
||||
val primaryOutputs: SimpleOutputs = new SimpleOutputs(self, readPump)
|
||||
|
||||
def fullClose: Boolean = !halfClose
|
||||
|
||||
|
|
@ -216,7 +217,14 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
|
|||
|
||||
final override def receive = new ExposedPublisherReceive(activeReceive, unhandled) {
|
||||
override def receiveExposedPublisher(ep: ExposedPublisher): Unit = {
|
||||
import context.dispatcher
|
||||
primaryOutputs.subreceive(ep)
|
||||
subscriptionTimer = Some(
|
||||
context.system.scheduler.scheduleOnce(
|
||||
settings.subscriptionTimeoutSettings.timeout,
|
||||
self,
|
||||
SubscriptionTimeout))
|
||||
|
||||
context become activeReceive
|
||||
}
|
||||
}
|
||||
|
|
@ -226,7 +234,8 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
|
|||
primaryOutputs.subreceive orElse
|
||||
tcpInputs.subreceive orElse
|
||||
tcpOutputs.subreceive orElse
|
||||
commonCloseHandling
|
||||
commonCloseHandling orElse
|
||||
handleSubscriptionTimeout
|
||||
|
||||
def commonCloseHandling: Receive = {
|
||||
case Terminated(_) ⇒ fail(new StreamTcpException("The connection actor has terminated. Stopping now."))
|
||||
|
|
@ -240,9 +249,20 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
|
|||
case Aborted ⇒ fail(new StreamTcpException("The connection has been aborted"))
|
||||
}
|
||||
|
||||
def handleSubscriptionTimeout: Receive = {
|
||||
case SubscriptionTimeout ⇒
|
||||
val millis = settings.subscriptionTimeoutSettings.timeout.toMillis
|
||||
if (!primaryOutputs.isSubscribed) {
|
||||
fail(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${millis}) ms") with NoStackTrace)
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
readPump.nextPhase(readPump.running)
|
||||
writePump.nextPhase(writePump.running)
|
||||
|
||||
var subscriptionTimer: Option[Cancellable] = None
|
||||
|
||||
def fail(e: Throwable): Unit = {
|
||||
if (settings.debugLogging)
|
||||
log.debug("fail due to: {}", e.getMessage)
|
||||
|
|
@ -259,10 +279,12 @@ private[akka] abstract class TcpStreamActor(val settings: ActorFlowMaterializerS
|
|||
|
||||
override def postStop(): Unit = {
|
||||
// Close if it has not yet been done
|
||||
val abruptTermination = AbruptTerminationException(self)
|
||||
tcpInputs.cancel()
|
||||
tcpOutputs.complete()
|
||||
tcpOutputs.error(abruptTermination)
|
||||
primaryInputs.cancel()
|
||||
primaryOutputs.complete()
|
||||
primaryOutputs.error(abruptTermination)
|
||||
subscriptionTimer.foreach(_.cancel())
|
||||
super.postStop() // Remember, we have a Stash
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -65,7 +65,8 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
|
|||
finished = true
|
||||
incomingConnections.cancel()
|
||||
primaryOutputs.complete()
|
||||
context.stop(self)
|
||||
// Stop only after all already accepted connections have been shut down
|
||||
if (context.children.isEmpty) context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -123,6 +124,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
|
|||
if (!closed && listener != null) listener ! Unbind
|
||||
closed = true
|
||||
pendingConnection = null
|
||||
pump()
|
||||
}
|
||||
override def dequeueInputElement(): Any = {
|
||||
val elem = pendingConnection
|
||||
|
|
@ -139,11 +141,15 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
|
|||
}
|
||||
}
|
||||
|
||||
def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive
|
||||
def activeReceive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive orElse {
|
||||
case Terminated(_) ⇒
|
||||
// If the Source is cancelled, and this was our last child, stop ourselves
|
||||
if (incomingConnections.isClosed && context.children.isEmpty) context.stop(self)
|
||||
}
|
||||
|
||||
def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { () ⇒
|
||||
val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement()
|
||||
val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, halfClose, settings))
|
||||
val tcpStreamActor = context.watch(context.actorOf(TcpStreamActor.inboundProps(connection, halfClose, settings)))
|
||||
val processor = ActorProcessor[ByteString, ByteString](tcpStreamActor)
|
||||
val conn = StreamTcp.IncomingConnection(
|
||||
connected.localAddress,
|
||||
|
|
@ -154,6 +160,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
|
|||
|
||||
override def postStop(): Unit = {
|
||||
unboundPromise.trySuccess(())
|
||||
primaryOutputs.complete()
|
||||
super.postStop()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue