diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index f8cdccd072..dcbc74b597 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -32,7 +32,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E val effectiveSettings = ServerSettings(settings) val tcpBinding = StreamTcp().bind(endpoint, backlog, options, effectiveSettings.timeouts.idleTimeout) new ServerBinding { - def localAddress(mm: MaterializedMap) = tcpBinding.localAddress(mm) + def localAddress(mm: MaterializedMap): Future[InetSocketAddress] = tcpBinding.localAddress(mm) val connections = tcpBinding.connections map { tcpConn ⇒ new IncomingConnection { def localAddress = tcpConn.localAddress diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 745a59b192..acb93c39e4 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -12,12 +12,11 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.ActorSystem -import akka.stream.scaladsl.StreamTcp +import akka.stream.scaladsl._ import akka.stream.BindFailedException import akka.stream.ActorFlowMaterializer import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } -import akka.stream.scaladsl._ import akka.http.engine.client.ClientConnectionSettings import akka.http.engine.server.ServerSettings import akka.http.model._ @@ -42,27 +41,52 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val (hostname, port) = temporaryServerHostnameAndPort() val binding = Http().bind(hostname, port) val probe = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - binding.connections.runWith(Sink(probe)) - val sub = probe.expectSubscription() // if we get it we are bound + val mm = binding.connections.to(Sink(probe)).run() + val sub = probe.expectSubscription() + // if the future finishes successfully, we are bound + val address = Await.result(binding.localAddress(mm), 1.second) sub.cancel() } - "report failure if bind fails" in pendingUntilFixed { // FIXME: "unpend"! + "report failure if bind fails" in { val (hostname, port) = temporaryServerHostnameAndPort() val binding = Http().bind(hostname, port) val probe1 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() val mm1 = binding.connections.to(Sink(probe1)).run() probe1.expectSubscription() - val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() - binding.connections.runWith(Sink(probe2)) - probe2.expectError(BindFailedException) + // Bind succeeded, we have a local address + Await.result(binding.localAddress(mm1), 1.second) + + val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + val mm2 = binding.connections.to(Sink(probe2)).run() + probe2.expectErrorOrSubscriptionFollowedByError() - Await.result(binding.unbind(mm1), 1.second) val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() val mm3 = binding.connections.to(Sink(probe3)).run() - probe3.expectSubscription() // we bound a second time, which means the previous unbind was successful + probe3.expectErrorOrSubscriptionFollowedByError() + + an[BindFailedException] shouldBe thrownBy { Await.result(binding.localAddress(mm2), 1.second) } + an[BindFailedException] shouldBe thrownBy { Await.result(binding.localAddress(mm3), 1.second) } + + // The unbind should NOT fail even though the bind failed. + Await.result(binding.unbind(mm2), 1.second) Await.result(binding.unbind(mm3), 1.second) + + // Now unbind the first + Await.result(binding.unbind(mm1), 1.second) + probe1.expectComplete() + + if (!akka.util.Helpers.isWindows) { + val probe4 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() + val mm4 = binding.connections.to(Sink(probe4)).run() + probe4.expectSubscription() + + // Bind succeeded, we have a local address + Await.result(binding.localAddress(mm4), 1.second) + // clean up + Await.result(binding.unbind(mm4), 1.second) + } } "properly complete a simple request/response cycle" in new TestSetup { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala index 42b5b56ecd..d6b0284c84 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala @@ -3,11 +3,13 @@ */ package akka.stream.io +import akka.stream.BindFailedException + import scala.concurrent.Await import scala.concurrent.duration._ import akka.util.ByteString import akka.stream.scaladsl.Flow -import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import akka.stream.scaladsl._ import akka.stream.testkit.TestUtils.temporaryServerAddress @@ -233,6 +235,45 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { Await.result(echoServerFinish, 1.second) } + "bind and unbind correctly" in { + val address = temporaryServerAddress() + val binding = StreamTcp(system).bind(address) + val probe1 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]() + val mm1 = binding.connections.to(Sink(probe1)).run() + probe1.expectSubscription() + + // Bind succeeded, we have a local address + Await.result(binding.localAddress(mm1), 1.second) + + val probe2 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]() + val mm2 = binding.connections.to(Sink(probe2)).run() + probe2.expectErrorOrSubscriptionFollowedByError(BindFailedException) + + val probe3 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]() + val mm3 = binding.connections.to(Sink(probe3)).run() + probe3.expectErrorOrSubscriptionFollowedByError() + + // The unbind should NOT fail even though the bind failed. + Await.result(binding.unbind(mm2), 1.second) + Await.result(binding.unbind(mm3), 1.second) + + an[BindFailedException] shouldBe thrownBy { Await.result(binding.localAddress(mm2), 1.second) } + an[BindFailedException] shouldBe thrownBy { Await.result(binding.localAddress(mm3), 1.second) } + + // Now unbind first + Await.result(binding.unbind(mm1), 1.second) + probe1.expectComplete() + + val probe4 = StreamTestKit.SubscriberProbe[StreamTcp.IncomingConnection]() + val mm4 = binding.connections.to(Sink(probe4)).run() + probe4.expectSubscription() + + // Bind succeeded, we have a local address + Await.result(binding.localAddress(mm4), 1.second) + // clean up + Await.result(binding.unbind(mm4), 1.second) + } + } def validateServerClientCommunication(testData: ByteString, diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 824497b485..07540f2496 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -89,7 +89,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket case f: CommandFailed ⇒ val ex = BindFailedException localAddressPromise.failure(ex) - unbindPromise.failure(ex) + unbindPromise.success(() ⇒ Future.successful(())) try tryOnError(flowSubscriber, ex) finally fail(ex) } @@ -100,7 +100,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket pump() case f: CommandFailed ⇒ val ex = new ConnectionException(s"Command [${f.cmd}] failed") - unbindPromise.tryFailure(ex) + if (f.cmd.isInstanceOf[Unbind.type]) unboundPromise.tryFailure(BindFailedException) fail(ex) case Unbind ⇒ if (!closed && listener != null) listener ! Unbind