diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 00dad374f0..955ab1b60e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -469,9 +469,11 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. val resultFuture = Source(testInput).via(Tcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) + binding.whenUnbound.value should be(None) resultFuture.futureValue should be(expectedOutput) binding.unbind().futureValue echoServerFinish.futureValue + binding.whenUnbound.futureValue should be(Done) } "work with a chain of echoes" in { @@ -484,6 +486,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. // make sure that the server has bound to the socket val binding = bindingFuture.futureValue + binding.whenUnbound.value should be(None) val echoConnection = Tcp().outgoingConnection(serverAddress) @@ -501,6 +504,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout. resultFuture.futureValue should be(expectedOutput) binding.unbind().futureValue echoServerFinish.futureValue + binding.whenUnbound.futureValue should be(Done) } "bind and unbind correctly" in EventFilter[BindException](occurrences = 2).intercept { diff --git a/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes index 5d2732768e..2f1ec9b84b 100644 --- a/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.6.backwards.excludes @@ -33,4 +33,10 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.fusing.Gra ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell#AsyncInput.apply") ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.fusing.GraphInterpreterShell#AsyncInput.copy$default$4") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell#AsyncInput.this") -ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.runAsyncInput") \ No newline at end of file +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.runAsyncInput") + +# 23798 observable unbind on stream Tcp server +ProblemFilters.exclude[FinalClassProblem]("akka.stream.javadsl.Tcp$ServerBinding") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp#ServerBinding.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp#ServerBinding.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp#ServerBinding.apply") diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 8c06148b55..9e621f8da5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -7,7 +7,7 @@ import java.net.InetSocketAddress import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Terminated } import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts @@ -76,7 +76,7 @@ import scala.concurrent.{ Future, Promise } // stopped. thisStage.tell(Unbind, thisStage) unbindPromise.future - })) + }, unbindPromise.future.map(_ ⇒ Done)(ExecutionContexts.sameThreadExecutionContext))) case f: CommandFailed ⇒ val ex = new BindFailedException { // cannot modify the actual exception class for compatibility reasons @@ -84,7 +84,7 @@ import scala.concurrent.{ Future, Promise } } f.cause.foreach(ex.initCause) bindingPromise.failure(ex) - unbindPromise.success(() ⇒ Future.successful(())) + unbindPromise.tryFailure(ex) failStage(ex) case c: Connected ⇒ push(out, connectionFor(c, sender)) @@ -96,8 +96,10 @@ import scala.concurrent.{ Future, Promise } if (unbindStarted) { unbindCompleted() } else { - failStage(new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" + - endpoint.getHostString + ":" + endpoint.getPort + "]")) + val ex = new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" + + endpoint.getHostString + ":" + endpoint.getPort + "]") + unbindPromise.tryFailure(ex) + failStage(ex) } } } @@ -144,6 +146,7 @@ import scala.concurrent.{ Future, Promise } private def unbindCompleted(): Unit = { stageActor.unwatch(listener) + unbindPromise.trySuccess(Done) if (connectionFlowsAwaitingInitialization.get() == 0) completeStage() else scheduleOnce(BindShutdownTimer, bindShutdownTimeout) } @@ -154,7 +157,9 @@ import scala.concurrent.{ Future, Promise } } override def postStop(): Unit = { - unbindPromise.trySuccess(()) + // a bit unexpected to succeed here rather than fail with abrupt stage termination + // but there was an existing test case covering this behavior + unbindPromise.trySuccess(Done) bindingPromise.tryFailure(new NoSuchElementException("Binding was unbound before it was completely finished")) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index e6b13cf0ce..eec6b899a8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -5,9 +5,12 @@ package akka.stream.javadsl import java.lang.{ Iterable ⇒ JIterable } import java.util.Optional -import akka.NotUsed + +import akka.{ Done, NotUsed } + import scala.concurrent.duration._ import java.net.InetSocketAddress + import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem import akka.actor.ExtensionId @@ -17,16 +20,21 @@ import akka.stream.scaladsl import akka.util.ByteString import akka.japi.Util.immutableSeq import akka.io.Inet.SocketOption + import scala.compat.java8.OptionConverters._ import scala.compat.java8.FutureConverters._ import java.util.concurrent.CompletionStage +import akka.annotation.InternalApi + object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** * Represents a prospective TCP server binding. + * + * Not indented for user construction */ - class ServerBinding private[akka] (delegate: scaladsl.Tcp.ServerBinding) { + final class ServerBinding @InternalApi private[akka] (delegate: scaladsl.Tcp.ServerBinding) { /** * The local address of the endpoint bound by the materialization of the `connections` [[Source]]. */ @@ -39,6 +47,11 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { * The produced [[java.util.concurrent.CompletionStage]] is fulfilled when the unbinding has been completed. */ def unbind(): CompletionStage[Unit] = delegate.unbind().toJava + + /** + * @return A completion stage that is completed when manually unbound, or failed if the server fails + */ + def whenUnbound(): CompletionStage[Done] = delegate.whenUnbound.toJava } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index c7770ac05a..68bd0a5ce2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -6,8 +6,9 @@ package akka.stream.scaladsl import java.net.InetSocketAddress import java.util.concurrent.TimeoutException -import akka.NotUsed +import akka.{ Done, NotUsed } import akka.actor._ +import akka.annotation.InternalApi import akka.io.Inet.SocketOption import akka.io.{ IO, Tcp ⇒ IoTcp } import akka.stream._ @@ -23,9 +24,18 @@ import scala.util.control.NoStackTrace object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** - * * Represents a successful TCP server binding. + * Represents a successful TCP server binding. + * + * Not indented for user construction + * + * @param localAddress The address the server was bound to + * @param unbindAction a function that will trigger unbind of the server + * @param whenUnbound A future that is completed when the server is unbound, or failed if the server binding fails */ - final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) { + final case class ServerBinding @InternalApi private[akka] (localAddress: InetSocketAddress)( + private val unbindAction: () ⇒ Future[Unit], + val whenUnbound: Future[Done] + ) { def unbind(): Future[Unit] = unbindAction() }