From 057e03ebe2d820a722b6052fcd6a13488fa1326d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 13 Oct 2017 14:50:00 +0200 Subject: [PATCH] Make TCP server unbinding observable #23798 --- .../test/scala/akka/stream/io/TcpSpec.scala | 4 ++++ .../scala/akka/stream/impl/io/TcpStages.scala | 24 ++++++++++--------- .../main/scala/akka/stream/javadsl/Tcp.scala | 15 ++++++++++-- .../main/scala/akka/stream/scaladsl/Tcp.scala | 19 ++++++++++----- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 00dad374f0..955ab1b60e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -469,9 +469,11 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. val resultFuture = Source(testInput).via(Tcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) + binding.whenUnbound.value should be(None) resultFuture.futureValue should be(expectedOutput) binding.unbind().futureValue echoServerFinish.futureValue + binding.whenUnbound.futureValue should be(Done) } "work with a chain of echoes" in { @@ -484,6 +486,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. // make sure that the server has bound to the socket val binding = bindingFuture.futureValue + binding.whenUnbound.value should be(None) val echoConnection = Tcp().outgoingConnection(serverAddress) @@ -501,6 +504,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. resultFuture.futureValue should be(expectedOutput) binding.unbind().futureValue echoServerFinish.futureValue + binding.whenUnbound.futureValue should be(Done) } "bind and unbind correctly" in EventFilter[BindException](occurrences = 2).intercept { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 8c06148b55..89d585deff 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -5,10 +5,10 @@ package akka.stream.impl.io import java.net.InetSocketAddress import java.util.concurrent.TimeoutException -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import akka.NotUsed -import akka.actor.{ ActorRef, Terminated } +import akka.{Done, NotUsed} +import akka.actor.{ActorRef, Terminated} import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.io.Inet.SocketOption @@ -17,14 +17,14 @@ import akka.io.Tcp._ import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance import akka.stream.impl.fusing.GraphStages.detacher -import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding } -import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp ⇒ StreamTcp } +import akka.stream.scaladsl.Tcp.{OutgoingConnection, ServerBinding} +import akka.stream.scaladsl.{BidiFlow, Flow, TcpIdleTimeoutException, Tcp => StreamTcp} import akka.stream.stage._ import akka.util.ByteString import scala.collection.immutable -import scala.concurrent.duration.{ Duration, FiniteDuration } -import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration.{Duration, FiniteDuration} +import scala.concurrent.{Future, Promise} /** * INTERNAL API @@ -76,7 +76,7 @@ import scala.concurrent.{ Future, Promise } // stopped. thisStage.tell(Unbind, thisStage) unbindPromise.future - })) + }, unbindPromise.future.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext))) case f: CommandFailed ⇒ val ex = new BindFailedException { // cannot modify the actual exception class for compatibility reasons @@ -84,7 +84,7 @@ import scala.concurrent.{ Future, Promise } } f.cause.foreach(ex.initCause) bindingPromise.failure(ex) - unbindPromise.success(() ⇒ Future.successful(())) + unbindPromise.failure(ex) failStage(ex) case c: Connected ⇒ push(out, connectionFor(c, sender)) @@ -96,8 +96,10 @@ import scala.concurrent.{ Future, Promise } if (unbindStarted) { unbindCompleted() } else { - failStage(new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" + - endpoint.getHostString + ":" + endpoint.getPort + "]")) + val ex = new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" + + endpoint.getHostString + ":" + endpoint.getPort + "]") + unbindPromise.failure(ex) + failStage(ex) } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index e6b13cf0ce..ccd0f4d47e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -3,11 +3,14 @@ */ package akka.stream.javadsl -import java.lang.{ Iterable ⇒ JIterable } +import java.lang.{Iterable => JIterable} import java.util.Optional -import akka.NotUsed + +import akka.{Done, NotUsed} + import scala.concurrent.duration._ import java.net.InetSocketAddress + import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem import akka.actor.ExtensionId @@ -17,10 +20,13 @@ import akka.stream.scaladsl import akka.util.ByteString import akka.japi.Util.immutableSeq import akka.io.Inet.SocketOption + import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage +import scala.concurrent.Future + object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** @@ -39,6 +45,11 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { * The produced [[java.util.concurrent.CompletionStage]] is fulfilled when the unbinding has been completed. */ def unbind(): CompletionStage[Unit] = delegate.unbind().toJava + + /** + * @return A completion stage that is completed when manually unbound, or failed if the server fails + */ + def whenUnbound(): CompletionStage[Done] = delegate.whenUnbound.toJava } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index c7770ac05a..60d4119718 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -6,26 +6,33 @@ package akka.stream.scaladsl import java.net.InetSocketAddress import java.util.concurrent.TimeoutException -import akka.NotUsed +import akka.{Done, NotUsed} import akka.actor._ import akka.io.Inet.SocketOption -import akka.io.{ IO, Tcp ⇒ IoTcp } +import akka.io.{IO, Tcp => IoTcp} import akka.stream._ import akka.stream.impl.fusing.GraphStages.detacher -import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout } +import akka.stream.impl.io.{ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout} import akka.util.ByteString import scala.collection.immutable import scala.concurrent.Future -import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.duration.{Duration, FiniteDuration} import scala.util.control.NoStackTrace object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** - * * Represents a successful TCP server binding. + * Represents a successful TCP server binding. + * + * @param localAddress The address the server was bound to + * @param unbindAction a function that will trigger unbind of the server + * @param whenUnbound A future that is completed when the server is unbound, or failed if the server binding fails */ - final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) { + final case class ServerBinding(localAddress: InetSocketAddress)( + private val unbindAction: () ⇒ Future[Unit], + val whenUnbound: Future[Done] + ) { def unbind(): Future[Unit] = unbindAction() }