Merge pull request #23805 from johanandren/wip-23798-observe-tcp-unbind-johanandren

Make TCP server unbinding observable
This commit is contained in:
Patrik Nordwall 2017-11-17 12:22:27 +01:00 committed by GitHub
commit edcc2b2d75
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 12 deletions

View file

@ -469,9 +469,11 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
val resultFuture =
Source(testInput).via(Tcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) acc ++ in)
binding.whenUnbound.value should be(None)
resultFuture.futureValue should be(expectedOutput)
binding.unbind().futureValue
echoServerFinish.futureValue
binding.whenUnbound.futureValue should be(Done)
}
"work with a chain of echoes" in {
@ -484,6 +486,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
// make sure that the server has bound to the socket
val binding = bindingFuture.futureValue
binding.whenUnbound.value should be(None)
val echoConnection = Tcp().outgoingConnection(serverAddress)
@ -501,6 +504,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
resultFuture.futureValue should be(expectedOutput)
binding.unbind().futureValue
echoServerFinish.futureValue
binding.whenUnbound.futureValue should be(Done)
}
"bind and unbind correctly" in EventFilter[BindException](occurrences = 2).intercept {

View file

@ -33,4 +33,10 @@ ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.fusing.Gra
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell#AsyncInput.apply")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.impl.fusing.GraphInterpreterShell#AsyncInput.copy$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreterShell#AsyncInput.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.runAsyncInput")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.GraphInterpreter.runAsyncInput")
# 23798 observable unbind on stream Tcp server
ProblemFilters.exclude[FinalClassProblem]("akka.stream.javadsl.Tcp$ServerBinding")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp#ServerBinding.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp#ServerBinding.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.Tcp#ServerBinding.apply")

View file

@ -7,7 +7,7 @@ import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.NotUsed
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
@ -76,7 +76,7 @@ import scala.concurrent.{ Future, Promise }
// stopped.
thisStage.tell(Unbind, thisStage)
unbindPromise.future
}))
}, unbindPromise.future.map(_ Done)(ExecutionContexts.sameThreadExecutionContext)))
case f: CommandFailed
val ex = new BindFailedException {
// cannot modify the actual exception class for compatibility reasons
@ -84,7 +84,7 @@ import scala.concurrent.{ Future, Promise }
}
f.cause.foreach(ex.initCause)
bindingPromise.failure(ex)
unbindPromise.success(() Future.successful(()))
unbindPromise.tryFailure(ex)
failStage(ex)
case c: Connected
push(out, connectionFor(c, sender))
@ -96,8 +96,10 @@ import scala.concurrent.{ Future, Promise }
if (unbindStarted) {
unbindCompleted()
} else {
failStage(new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" +
endpoint.getHostString + ":" + endpoint.getPort + "]"))
val ex = new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" +
endpoint.getHostString + ":" + endpoint.getPort + "]")
unbindPromise.tryFailure(ex)
failStage(ex)
}
}
}
@ -144,6 +146,7 @@ import scala.concurrent.{ Future, Promise }
private def unbindCompleted(): Unit = {
stageActor.unwatch(listener)
unbindPromise.trySuccess(Done)
if (connectionFlowsAwaitingInitialization.get() == 0) completeStage()
else scheduleOnce(BindShutdownTimer, bindShutdownTimeout)
}
@ -154,7 +157,9 @@ import scala.concurrent.{ Future, Promise }
}
override def postStop(): Unit = {
unbindPromise.trySuccess(())
// a bit unexpected to succeed here rather than fail with abrupt stage termination
// but there was an existing test case covering this behavior
unbindPromise.trySuccess(Done)
bindingPromise.tryFailure(new NoSuchElementException("Binding was unbound before it was completely finished"))
}
}

View file

@ -5,9 +5,12 @@ package akka.stream.javadsl
import java.lang.{ Iterable JIterable }
import java.util.Optional
import akka.NotUsed
import akka.{ Done, NotUsed }
import scala.concurrent.duration._
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.ExtensionId
@ -17,16 +20,21 @@ import akka.stream.scaladsl
import akka.util.ByteString
import akka.japi.Util.immutableSeq
import akka.io.Inet.SocketOption
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage
import akka.annotation.InternalApi
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
/**
* Represents a prospective TCP server binding.
*
* Not indented for user construction
*/
class ServerBinding private[akka] (delegate: scaladsl.Tcp.ServerBinding) {
final class ServerBinding @InternalApi private[akka] (delegate: scaladsl.Tcp.ServerBinding) {
/**
* The local address of the endpoint bound by the materialization of the `connections` [[Source]].
*/
@ -39,6 +47,11 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
* The produced [[java.util.concurrent.CompletionStage]] is fulfilled when the unbinding has been completed.
*/
def unbind(): CompletionStage[Unit] = delegate.unbind().toJava
/**
* @return A completion stage that is completed when manually unbound, or failed if the server fails
*/
def whenUnbound(): CompletionStage[Done] = delegate.whenUnbound.toJava
}
/**

View file

@ -6,8 +6,9 @@ package akka.stream.scaladsl
import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import akka.NotUsed
import akka.{ Done, NotUsed }
import akka.actor._
import akka.annotation.InternalApi
import akka.io.Inet.SocketOption
import akka.io.{ IO, Tcp IoTcp }
import akka.stream._
@ -23,9 +24,18 @@ import scala.util.control.NoStackTrace
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
/**
* * Represents a successful TCP server binding.
* Represents a successful TCP server binding.
*
* Not indented for user construction
*
* @param localAddress The address the server was bound to
* @param unbindAction a function that will trigger unbind of the server
* @param whenUnbound A future that is completed when the server is unbound, or failed if the server binding fails
*/
final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () Future[Unit]) {
final case class ServerBinding @InternalApi private[akka] (localAddress: InetSocketAddress)(
private val unbindAction: () Future[Unit],
val whenUnbound: Future[Done]
) {
def unbind(): Future[Unit] = unbindAction()
}