diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 94e5574071..5768324378 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -5,18 +5,19 @@ package akka.http.impl.engine.client import java.net.InetSocketAddress +import akka.event.Logging + import scala.annotation.tailrec import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration import akka.actor._ -import akka.stream.Materializer +import akka.stream.{ ActorAttributes, Materializer } import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy } import akka.stream.actor.ActorPublisherMessage._ import akka.stream.actor.ActorSubscriberMessage._ -import akka.stream.impl.FixedSizeBuffer -import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.impl.{ SeqActorName, FixedSizeBuffer } +import akka.stream.scaladsl.{ Keep, Flow, Sink, Source } import akka.http.HostConnectionPoolSetup -import akka.http.impl.util.SeqActorName import akka.http.scaladsl.model._ import akka.http.scaladsl.Http import PoolFlow._ @@ -54,14 +55,23 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, log.debug("(Re-)starting host connection pool to {}:{}", hcps.host, hcps.port) - { // start the pool flow with this actor acting as source as well as sink + initConnectionFlow() + + /** Start the pool flow with this actor acting as source as well as sink */ + private def initConnectionFlow() = { import context.system import hcps._ import setup._ + val connectionFlow = if (httpsContext.isEmpty) Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log) else Http().outgoingConnectionTls(host, port, None, settings.connectionSettings, httpsContext, setup.log) - val poolFlow = PoolFlow(connectionFlow, new InetSocketAddress(host, port), settings, setup.log) + + val poolFlow = PoolFlow( + Flow[HttpRequest].viaMat(connectionFlow)(Keep.right), + new InetSocketAddress(host, port), settings, setup.log) + .named("PoolFlow") + Source(ActorPublisher(self)).via(poolFlow).runWith(Sink(ActorSubscriber[ResponseContext](self))) } @@ -106,7 +116,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup, // if we can't dispatch right now we buffer and dispatch when demand from the pool arrives if (inputBuffer.isFull) { x.responsePromise.failure( - new RuntimeException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}]")) + new RuntimeException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}]")) // TODO maybe named exception? } else inputBuffer.enqueue(x) } else dispatchRequest(x) // if we can dispatch right now, do it request(1) // for every incoming request we demand one response from the pool diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 7ac40d9998..01c3584c5e 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -4,20 +4,22 @@ package akka.http.impl.engine.client -import language.existentials import java.net.InetSocketAddress -import scala.concurrent.Future -import scala.util.{ Failure, Success } -import scala.collection.immutable + import akka.actor._ -import akka.http.scaladsl.model.{ HttpEntity, HttpResponse, HttpRequest } -import akka.http.scaladsl.util.FastFuture import akka.http.ConnectionPoolSettings import akka.http.impl.util._ -import akka.stream.impl.{ SubscribePending, ExposedPublisher, ActorProcessor } -import akka.stream.actor._ -import akka.stream.scaladsl._ +import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse } +import akka.http.scaladsl.util.FastFuture import akka.stream._ +import akka.stream.actor._ +import akka.stream.impl.{ ActorProcessor, ExposedPublisher, SeqActorName, SubscribePending } +import akka.stream.scaladsl._ + +import scala.collection.immutable +import scala.concurrent.Future +import scala.language.existentials +import scala.util.{ Failure, Success } private object PoolSlot { import PoolFlow.{ RequestContext, ResponseContext } @@ -56,10 +58,12 @@ private object PoolSlot { FlowGraph.create() { implicit b ⇒ import FlowGraph.Implicits._ + // TODO wouldn't be better to have them under a known parent? /user/SlotProcessor-0 seems weird + val name = slotProcessorActorName.next() val slotProcessor = b.add { Flow.fromProcessor { () ⇒ val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local), - slotProcessorActorName.next()) + name) ActorProcessor[RequestContext, List[ProcessorOut]](actor) }.mapConcat(identity) } @@ -72,8 +76,8 @@ private object PoolSlot { split.out(1).collect { case r: RawSlotEvent ⇒ r }.outlet) } - import ActorSubscriberMessage._ import ActorPublisherMessage._ + import ActorSubscriberMessage._ /** * An actor mananging a series of materializations of the given `connectionFlow`. @@ -87,15 +91,15 @@ private object PoolSlot { private class SlotProcessor(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any], settings: ConnectionPoolSettings)(implicit fm: Materializer) extends ActorSubscriber with ActorPublisher[List[ProcessorOut]] with ActorLogging { - var exposedPublisher: akka.stream.impl.ActorPublisher[Any] = _ var inflightRequests = immutable.Queue.empty[RequestContext] val runnableGraph = Source.actorPublisher[HttpRequest](Props(new FlowInportActor(self)).withDeploy(Deploy.local)) .via(connectionFlow) .toMat(Sink.actorSubscriber[HttpResponse](Props(new FlowOutportActor(self)).withDeploy(Deploy.local)))(Keep.both) + .named("SlotProcessorInternalConnectionFlow") - def requestStrategy = ZeroRequestStrategy - def receive = waitingExposedPublisher + override def requestStrategy = ZeroRequestStrategy + override def receive = waitingExposedPublisher def waitingExposedPublisher: Receive = { case ExposedPublisher(publisher) ⇒ @@ -107,20 +111,25 @@ private object PoolSlot { def waitingForSubscribePending: Receive = { case SubscribePending ⇒ exposedPublisher.takePendingSubscribers() foreach (s ⇒ self ! ActorPublisher.Internal.Subscribe(s)) + log.debug("become unconnected, from subscriber pending") context.become(unconnected) } val unconnected: Receive = { - case OnNext(rc: RequestContext) ⇒ + case m @ OnNext(rc: RequestContext) ⇒ val (connInport, connOutport) = runnableGraph.run() connOutport ! Request(totalDemand) context.become(waitingForDemandFromConnection(connInport, connOutport, rc)) - case Request(_) ⇒ if (remainingRequested == 0) request(1) // ask for first request if necessary + case m @ Request(_) ⇒ if (remainingRequested == 0) request(1) // ask for first request if necessary - case Cancel ⇒ { cancel(); shutdown() } - case OnComplete ⇒ onComplete() - case OnError(e) ⇒ onError(e) + case m @ OnComplete ⇒ onComplete() + case m @ OnError(e) ⇒ onError(e) + case m @ Cancel ⇒ + cancel() + shutdown() + + case c @ FromConnection(msg) ⇒ // ignore ... } def waitingForDemandFromConnection(connInport: ActorRef, connOutport: ActorRef, @@ -136,8 +145,8 @@ private object PoolSlot { context.become(running(connInport, connOutport)) case FromConnection(Cancel) ⇒ if (!isActive) { cancel(); shutdown() } // else ignore and wait for accompanying OnComplete or OnError - case FromConnection(OnComplete) ⇒ handleDisconnect(None) - case FromConnection(OnError(e)) ⇒ handleDisconnect(Some(e)) + case FromConnection(OnComplete) ⇒ handleDisconnect(sender(), None, Some(firstRequest)) + case FromConnection(OnError(e)) ⇒ handleDisconnect(sender(), Some(e), Some(firstRequest)) case FromConnection(OnNext(x)) ⇒ throw new IllegalStateException("Unexpected HttpResponse: " + x) } @@ -171,18 +180,29 @@ private object PoolSlot { val requestCompleted = SlotEvent.RequestCompletedFuture(whenCompleted.map(_ ⇒ SlotEvent.RequestCompleted(slotIx))) onNext(delivery :: requestCompleted :: Nil) - case FromConnection(OnComplete) ⇒ handleDisconnect(None) - case FromConnection(OnError(e)) ⇒ handleDisconnect(Some(e)) + case FromConnection(OnComplete) ⇒ handleDisconnect(sender(), None) + case FromConnection(OnError(e)) ⇒ handleDisconnect(sender(), Some(e)) } - def handleDisconnect(error: Option[Throwable]): Unit = { + def handleDisconnect(connInport: ActorRef, error: Option[Throwable], firstContext: Option[RequestContext] = None): Unit = { log.debug("Slot {} disconnected after {}", slotIx, error getOrElse "regular connection close") - val results: List[ProcessorOut] = inflightRequests.map { rc ⇒ - if (rc.retriesLeft == 0) { - val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(identityFunc) - ResponseDelivery(ResponseContext(rc, Failure(reason))) - } else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1)) - }(collection.breakOut) + + val results: List[ProcessorOut] = { + if (inflightRequests.isEmpty && firstContext.isDefined) { + (error match { + case Some(err) ⇒ ResponseDelivery(ResponseContext(firstContext.get, Failure(new RuntimeException("Unexpected (early) disconnect", err)))) + case _ ⇒ ResponseDelivery(ResponseContext(firstContext.get, Failure(new RuntimeException("Unexpected (early) disconnect")))) + }) :: Nil + } else { + inflightRequests.map { rc ⇒ + if (rc.retriesLeft == 0) { + val reason = error.fold[Throwable](new RuntimeException("Unexpected disconnect"))(identityFunc) + connInport ! ActorPublisherMessage.Cancel + ResponseDelivery(ResponseContext(rc, Failure(reason))) + } else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1)) + }(collection.breakOut) + } + } inflightRequests = immutable.Queue.empty onNext(SlotEvent.Disconnected(slotIx, results.size) :: results) if (canceled) onComplete() @@ -207,23 +227,27 @@ private object PoolSlot { private case class FromConnection(ev: Any) extends NoSerializationVerificationNeeded - private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] { + private class FlowInportActor(slotProcessor: ActorRef) extends ActorPublisher[HttpRequest] with ActorLogging { def receive: Receive = { case ev: Request ⇒ slotProcessor ! FromConnection(ev) - case Cancel ⇒ { slotProcessor ! FromConnection(Cancel); context.stop(self) } case OnNext(r: HttpRequest) ⇒ onNext(r) - case OnComplete ⇒ { onComplete(); context.stop(self) } - case OnError(e) ⇒ { onError(e); context.stop(self) } + case OnComplete ⇒ onCompleteThenStop() + case OnError(e) ⇒ onErrorThenStop(e) + case Cancel ⇒ + slotProcessor ! FromConnection(Cancel) + context.stop(self) } } - private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber { + private class FlowOutportActor(slotProcessor: ActorRef) extends ActorSubscriber with ActorLogging { def requestStrategy = ZeroRequestStrategy def receive: Receive = { - case Request(n) ⇒ request(n) - case Cancel ⇒ cancel() - case ev: OnNext ⇒ slotProcessor ! FromConnection(ev) - case ev @ (OnComplete | OnError(_)) ⇒ { slotProcessor ! FromConnection(ev); context.stop(self) } + case Request(n) ⇒ request(n) + case Cancel ⇒ cancel() + case ev: OnNext ⇒ slotProcessor ! FromConnection(ev) + case ev @ (OnComplete | OnError(_)) ⇒ + slotProcessor ! FromConnection(ev) + context.stop(self) } } } diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala index fdff173f60..e04fb8fd14 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/package.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/package.scala @@ -177,11 +177,6 @@ package util { def receive = { case x ⇒ log.warning(x.toString) } } - // Provisioning of actor names composed of a common prefix + a counter. According to #16613 not in scope as public API. - private[http] final class SeqActorName(prefix: String) extends AtomicInteger { - def next(): String = prefix + '-' + getAndIncrement() - } - private[http] trait LogMessages extends ActorLogging { this: Actor ⇒ def logMessages(mark: String = "")(r: Receive): Receive = new Receive { diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/TestUtils.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/TestUtils.scala index 7eb2da3e9f..bbb3d4520f 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/TestUtils.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/TestUtils.scala @@ -5,6 +5,8 @@ package akka.http.scaladsl import java.io.{ FileOutputStream, File } +import java.net.InetSocketAddress +import java.nio.channels.ServerSocketChannel object TestUtils { def writeAllText(text: String, file: File): Unit = { @@ -13,4 +15,20 @@ object TestUtils { fos.write(text.getBytes("UTF-8")) } finally fos.close() } + + // TODO duplicated code from akka-http-core-tests + def temporaryServerAddress(interface: String = "127.0.0.1"): InetSocketAddress = { + val serverSocket = ServerSocketChannel.open() + try { + serverSocket.socket.bind(new InetSocketAddress(interface, 0)) + val port = serverSocket.socket.getLocalPort + new InetSocketAddress(interface, port) + } finally serverSocket.close() + } + + // TODO duplicated code from akka-http-core-tests + def temporaryServerHostnameAndPort(interface: String = "127.0.0.1"): (InetSocketAddress, String, Int) = { + val socketAddress = temporaryServerAddress(interface) + (socketAddress, socketAddress.getHostName, socketAddress.getPort) + } } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala new file mode 100644 index 0000000000..72d21574b5 --- /dev/null +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.http.scaladsl.server + +import java.net.InetSocketAddress +import java.nio.channels.ServerSocketChannel +import java.util.concurrent.{ TimeUnit, CountDownLatch } + +import akka.actor.ActorSystem +import akka.event.Logging +import akka.http.scaladsl.{ TestUtils, Http } +import akka.http.scaladsl.model.{ HttpResponse, Uri, HttpRequest } +import akka.stream.impl.{ StreamSupervisor, ActorMaterializerImpl } +import akka.stream.{ ActorMaterializer, Materializer, OverflowStrategy } +import akka.stream.scaladsl.{ Sink, Source } +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import org.scalatest.{ Matchers, BeforeAndAfterAll, WordSpecLike } +import sun.security.provider.ConfigFile + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ +import scala.util.{ Failure, Success, Try } + +class DontLeakActorsOnFailingConnectionSpecs extends WordSpecLike with Matchers with BeforeAndAfterAll { + + val config = ConfigFactory.parseString(""" + akka { + # disable logs (very noisy tests - 100 exepected errors) + loglevel = OFF + stdout-loglevel = OFF + }""").withFallback(ConfigFactory.load()) + implicit val system = ActorSystem("DontLeakActorsOnFailingConnectionSpecs", config) + import system.dispatcher + implicit val mat = ActorMaterializer() + + val log = Logging(system, getClass) + + // TODO DUPLICATED + def assertAllStagesStopped[T](name: String)(block: ⇒ T)(implicit materializer: Materializer): T = + materializer match { + case impl: ActorMaterializerImpl ⇒ + val probe = TestProbe()(impl.system) + probe.send(impl.supervisor, StreamSupervisor.StopChildren) + probe.expectMsg(StreamSupervisor.StoppedChildren) + val result = block + probe.within(5.seconds) { + probe.awaitAssert { + impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) + val children = probe.expectMsgType[StreamSupervisor.Children].children.filter { c ⇒ + c.path.toString contains name + } + assert(children.isEmpty, + s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]") + } + } + result + case _ ⇒ block + } + + "Http.superPool" should { + + "not leak connection Actors when hitting non-existing endpoint" in { + assertAllStagesStopped("InternalConnectionFlow") { + val reqsCount = 100 + val clientFlow = Http().superPool[Int]() + val (_, _, port) = TestUtils.temporaryServerHostnameAndPort() + val source = Source(1 to reqsCount).map(i ⇒ HttpRequest(uri = Uri(s"http://127.0.0.1:$port/test/$i")) -> i) + + val countDown = new CountDownLatch(reqsCount) + val sink = Sink.foreach[(Try[HttpResponse], Int)] { + case (resp, id) ⇒ handleResponse(resp, id) + } + + val resps = source.via(clientFlow).runWith(sink) + resps.onComplete({ case _ ⇒ countDown.countDown() }) + + countDown.await(10, TimeUnit.SECONDS) + } + } + } + + private def handleResponse(httpResp: Try[HttpResponse], id: Int): Unit = { + httpResp match { + case Success(httpRes) ⇒ + log.warning(s"$id: OK: (${httpRes.status.intValue}") + httpRes.entity.dataBytes.runWith(Sink.ignore) + + case Failure(ex) ⇒ + log.warning(s"$id: FAIL: $ex") + } + } + + override def afterAll = { + system.shutdown() + system.awaitTermination(3.seconds) + } + +} \ No newline at end of file diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala index 30a0c9affe..b9add24ce7 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala @@ -4,8 +4,15 @@ package akka.http.scaladsl.server +import java.io.File + +import akka.http.scaladsl.model.Multipart +import akka.stream.scaladsl.{ Source, Sink } +import akka.util.ByteString import directives._ +import scala.util.Try + trait Directives extends RouteConcatenation with BasicDirectives with CacheConditionDirectives diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 5067ac614b..fcef4e3243 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -5,7 +5,7 @@ package akka.stream import java.util.Locale import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean } import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.stream.impl._ @@ -57,7 +57,7 @@ object ActorMaterializer { system, materializerSettings, system.dispatchers, - context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher)), + context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher), StreamSupervisor.nextName()), haveShutDown, FlowNameCounter(system).counter, namePrefix) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 71c88bcf25..037c488f31 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -44,7 +44,7 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, override def withNamePrefix(name: String): Materializer = this.copy(namePrefix = name) - private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() + private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() // TODO use SeqActorName instead private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" @@ -215,6 +215,9 @@ private[akka] object StreamSupervisor { def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) + private val actorName = new SeqActorName("StreamSupervisor") + def nextName(): String = actorName.next() + final case class Materialize(props: Props, name: String) extends DeadLetterSuppression with NoSerializationVerificationNeeded diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index b9320af0b3..4b13df795c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl +import java.util.concurrent.atomic.AtomicInteger + import akka.actor._ import akka.stream._ import akka.stream.impl.AcknowledgePublisher.{ Ok, Rejected } @@ -96,7 +98,8 @@ private[akka] final class ActorPublisherSource[Out](props: Props, val attributes (akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef) } - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = new ActorPublisherSource[Out](props, attributes, shape) + override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, ActorRef] = + new ActorPublisherSource[Out](props, attributes, shape) override def withAttributes(attr: Attributes): Module = new ActorPublisherSource(props, attr, amendShape(attr)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala new file mode 100644 index 0000000000..f88949c1a2 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2009-2015 Typesafe Inc. + */ + +package akka.stream.impl + +import java.util.concurrent.atomic.AtomicLong + +/** + * INTERNAL API + * As discussed in https://github.com/akka/akka/issues/16613 + * + * Generator of sequentially numbered actor names. + * Pulled out from HTTP internals, most often used used by streams which materialize actors directly + */ +private[akka] final class SeqActorName(prefix: String) extends AtomicLong { + def next(): String = prefix + '-' + getAndIncrement() +}