diff --git a/akka-http-core/src/main/java/akka/http/model/japi/ServerBinding.java b/akka-http-core/src/main/java/akka/http/model/japi/ServerBinding.java index 2d87765558..acd3763803 100644 --- a/akka-http-core/src/main/java/akka/http/model/japi/ServerBinding.java +++ b/akka-http-core/src/main/java/akka/http/model/japi/ServerBinding.java @@ -6,13 +6,14 @@ package akka.http.model.japi; import org.reactivestreams.Publisher; +import java.io.Closeable; import java.net.InetSocketAddress; /** * The binding of a server. Allows access to its own address and to the stream * of incoming connections. */ -public interface ServerBinding { +public interface ServerBinding extends Closeable { /** * The local address this server is listening on. */ diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index c1101e33a4..f9509865c7 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -4,6 +4,7 @@ package akka.http +import java.io.Closeable import java.net.InetSocketAddress import com.typesafe.config.Config import org.reactivestreams.{ Publisher, Subscriber } @@ -109,12 +110,19 @@ object Http extends ExtensionKey[HttpExt] { apply(new InetSocketAddress(interface, port), backlog, options, serverSettings, materializerSettings) } - final case class ServerBinding(localAddress: InetSocketAddress, - connectionStream: Publisher[IncomingConnection]) extends model.japi.ServerBinding { + sealed abstract case class ServerBinding(localAddress: InetSocketAddress, + connectionStream: Publisher[IncomingConnection]) extends model.japi.ServerBinding { /** Java API */ def getConnectionStream: Publisher[japi.IncomingConnection] = connectionStream.asInstanceOf[Publisher[japi.IncomingConnection]] } + /** INTERNAL API */ + private[http] final class InternalServerBinding(_localAddress: InetSocketAddress, + _connectionStream: Publisher[IncomingConnection], + closeable: Closeable) extends ServerBinding(_localAddress, _connectionStream) { + override def close() = closeable.close() + } + final case class IncomingConnection(remoteAddress: InetSocketAddress, requestPublisher: Publisher[HttpRequest], responseSubscriber: Subscriber[HttpResponse]) extends model.japi.IncomingConnection { diff --git a/akka-http-core/src/main/scala/akka/http/HttpManager.scala b/akka-http-core/src/main/scala/akka/http/HttpManager.scala index e0404b7082..0b7d0fa064 100644 --- a/akka-http-core/src/main/scala/akka/http/HttpManager.scala +++ b/akka-http-core/src/main/scala/akka/http/HttpManager.scala @@ -60,14 +60,14 @@ private[http] class HttpManager(httpSettings: HttpExt#Settings) extends Actor wi val askTimeout = Timeout(effectiveSettings.bindTimeout + 5.seconds) // FIXME: how can we improve this? val tcpServerBindingFuture = IO(StreamTcp)(context.system).ask(tcpBind)(askTimeout) tcpServerBindingFuture onComplete { - case Success(StreamTcp.TcpServerBinding(localAddress, connectionStream)) ⇒ + case Success(tcpServerBinding @ StreamTcp.TcpServerBinding(localAddress, connectionStream)) ⇒ log.info("Bound to {}", endpoint) implicit val materializer = FlowMaterializer() val httpServerPipeline = new HttpServerPipeline(effectiveSettings, log) val httpConnectionStream = Flow(connectionStream) .map(httpServerPipeline) .toPublisher() - commander ! Http.ServerBinding(localAddress, httpConnectionStream) + commander ! new Http.InternalServerBinding(localAddress, httpConnectionStream, tcpServerBinding) case Failure(error) ⇒ log.warning("Bind to {} failed due to {}", endpoint, error) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala index 4d3f177d91..414df44b2a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -160,7 +160,8 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val resultFuture = Flow(conn.inputStream).fold(ByteString.empty)((acc, in) ⇒ acc ++ in).toFuture() Await.result(resultFuture, 3.seconds) should be(expectedOutput) - + server.close() + server.awaitTermination(3.seconds) } "work with a chain of echoes" in { @@ -181,7 +182,8 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val resultFuture = Flow(conn3.inputStream).fold(ByteString.empty)((acc, in) ⇒ acc ++ in).toFuture() Await.result(resultFuture, 3.seconds) should be(expectedOutput) - + server.close() + server.awaitTermination(3.seconds) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala index 18a896cea4..bc5593c750 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala @@ -3,6 +3,8 @@ */ package akka.stream.io +import java.io.Closeable + import akka.actor.{ Actor, ActorRef, Props } import akka.io.{ IO, Tcp } import akka.stream.scaladsl.Flow @@ -14,7 +16,8 @@ import java.net.InetSocketAddress import java.nio.channels.ServerSocketChannel import org.reactivestreams.Processor import scala.collection.immutable.Queue -import scala.concurrent.Future +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration.Duration object TcpHelper { case class ClientWrite(bytes: ByteString) @@ -172,6 +175,12 @@ trait TcpHelper { this: TestKitBase ⇒ def close(): Unit = tcpWriteSubscription.sendComplete() } + class EchoServer(termination: Future[Unit], closeable: Closeable) extends Closeable { + def close(): Unit = closeable.close() + def awaitTermination(atMost: Duration): Unit = Await.result(termination, atMost) + def terminationFuture: Future[Unit] = termination + } + def connect(server: Server): (Processor[ByteString, ByteString], ServerConnection) = { val tcpProbe = TestProbe() tcpProbe.send(IO(StreamTcp), StreamTcp.Connect(server.address)) @@ -193,8 +202,10 @@ trait TcpHelper { this: TestKitBase ⇒ bindProbe.expectMsgType[StreamTcp.TcpServerBinding] } - def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): Future[Unit] = - Flow(bind(serverAddress).connectionStream).foreach { conn ⇒ + def echoServer(serverAddress: InetSocketAddress = temporaryServerAddress): EchoServer = { + val binding = bind(serverAddress) + new EchoServer(Flow(binding.connectionStream).foreach { conn ⇒ conn.inputStream.subscribe(conn.outputStream) - } + }, binding) + } } diff --git a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala index 51d16487ae..23a4ee79f1 100644 --- a/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala +++ b/akka-stream/src/main/scala/akka/stream/io/StreamIO.scala @@ -3,6 +3,8 @@ */ package akka.stream.io +import java.io.Closeable + import akka.util.ByteString import org.reactivestreams.{ Processor, Publisher, Subscriber } import java.net.InetSocketAddress @@ -30,8 +32,15 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { def inputStream: Publisher[ByteString] = processor } - case class TcpServerBinding(localAddress: InetSocketAddress, - connectionStream: Publisher[IncomingTcpConnection]) + abstract sealed case class TcpServerBinding(localAddress: InetSocketAddress, + connectionStream: Publisher[IncomingTcpConnection]) extends Closeable + + /** INTERNAL API */ + private[io] class InternalTcpServerBinding(_localAddress: InetSocketAddress, + _connectionStream: Publisher[IncomingTcpConnection], + closeable: Closeable) extends TcpServerBinding(_localAddress, _connectionStream) { + override def close() = closeable.close() + } case class IncomingTcpConnection(remoteAddress: InetSocketAddress, inputStream: Publisher[ByteString], diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index f93c8bfc27..5a7b363748 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -3,6 +3,8 @@ */ package akka.stream.io +import java.io.Closeable + import akka.actor._ import akka.io.Tcp._ import akka.io.{ IO, Tcp } @@ -47,9 +49,14 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, def getExposedPublisher = exposedPublisher } + private var finished = false override protected def pumpFinished(): Unit = { - incomingConnections.cancel() - context.stop(self) + if (!finished) { + finished = true + incomingConnections.cancel() + primaryOutputs.complete() + context.stop(self) + } } override protected def pumpFailed(e: Throwable): Unit = fail(e) @@ -64,9 +71,13 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, listener = sender() nextPhase(runningPhase) listener ! ResumeAccepting(1) - requester ! StreamTcp.TcpServerBinding( + val target = self + requester ! new StreamTcp.InternalTcpServerBinding( localAddress, - primaryOutputs.getExposedPublisher.asInstanceOf[Publisher[StreamTcp.IncomingTcpConnection]]) + primaryOutputs.getExposedPublisher.asInstanceOf[Publisher[StreamTcp.IncomingTcpConnection]], + new Closeable { + override def close() = target ! Unbind + }) subreceive.become(running) case f: CommandFailed ⇒ val ex = new TcpListenStreamException("Bind failed") @@ -80,6 +91,12 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, pump() case f: CommandFailed ⇒ fail(new TcpListenStreamException(s"Command [${f.cmd}] failed")) + case Unbind ⇒ + cancel() + pump() + case Unbound ⇒ // If we're unbound then just shut down + closed = true + pump() } override val subreceive = new SubReceive(waitBound) @@ -123,5 +140,4 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, incomingConnections.cancel() primaryOutputs.cancel(e) } - }