Make TCP server unbinding observable #23798

This commit is contained in:
Johan Andrén 2017-10-13 14:50:00 +02:00
parent b51d720b18
commit 057e03ebe2
4 changed files with 43 additions and 19 deletions

View file

@ -469,9 +469,11 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
val resultFuture =
Source(testInput).via(Tcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) acc ++ in)
binding.whenUnbound.value should be(None)
resultFuture.futureValue should be(expectedOutput)
binding.unbind().futureValue
echoServerFinish.futureValue
binding.whenUnbound.futureValue should be(Done)
}
"work with a chain of echoes" in {
@ -484,6 +486,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
// make sure that the server has bound to the socket
val binding = bindingFuture.futureValue
binding.whenUnbound.value should be(None)
val echoConnection = Tcp().outgoingConnection(serverAddress)
@ -501,6 +504,7 @@ class TcpSpec extends StreamSpec("akka.stream.materializer.subscription-timeout.
resultFuture.futureValue should be(expectedOutput)
binding.unbind().futureValue
echoServerFinish.futureValue
binding.whenUnbound.futureValue should be(Done)
}
"bind and unbind correctly" in EventFilter[BindException](occurrences = 2).intercept {

View file

@ -5,10 +5,10 @@ package akka.stream.impl.io
import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import akka.NotUsed
import akka.actor.{ ActorRef, Terminated }
import akka.{Done, NotUsed}
import akka.actor.{ActorRef, Terminated}
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.io.Inet.SocketOption
@ -17,14 +17,14 @@ import akka.io.Tcp._
import akka.stream._
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.impl.fusing.GraphStages.detacher
import akka.stream.scaladsl.Tcp.{ OutgoingConnection, ServerBinding }
import akka.stream.scaladsl.{ BidiFlow, Flow, TcpIdleTimeoutException, Tcp StreamTcp }
import akka.stream.scaladsl.Tcp.{OutgoingConnection, ServerBinding}
import akka.stream.scaladsl.{BidiFlow, Flow, TcpIdleTimeoutException, Tcp => StreamTcp}
import akka.stream.stage._
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{Future, Promise}
/**
* INTERNAL API
@ -76,7 +76,7 @@ import scala.concurrent.{ Future, Promise }
// stopped.
thisStage.tell(Unbind, thisStage)
unbindPromise.future
}))
}, unbindPromise.future.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)))
case f: CommandFailed
val ex = new BindFailedException {
// cannot modify the actual exception class for compatibility reasons
@ -84,7 +84,7 @@ import scala.concurrent.{ Future, Promise }
}
f.cause.foreach(ex.initCause)
bindingPromise.failure(ex)
unbindPromise.success(() Future.successful(()))
unbindPromise.failure(ex)
failStage(ex)
case c: Connected
push(out, connectionFor(c, sender))
@ -96,8 +96,10 @@ import scala.concurrent.{ Future, Promise }
if (unbindStarted) {
unbindCompleted()
} else {
failStage(new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" +
endpoint.getHostString + ":" + endpoint.getPort + "]"))
val ex = new IllegalStateException("IO Listener actor terminated unexpectedly for remote endpoint [" +
endpoint.getHostString + ":" + endpoint.getPort + "]")
unbindPromise.failure(ex)
failStage(ex)
}
}
}

View file

@ -3,11 +3,14 @@
*/
package akka.stream.javadsl
import java.lang.{ Iterable JIterable }
import java.lang.{Iterable => JIterable}
import java.util.Optional
import akka.NotUsed
import akka.{Done, NotUsed}
import scala.concurrent.duration._
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.ExtensionId
@ -17,10 +20,13 @@ import akka.stream.scaladsl
import akka.util.ByteString
import akka.japi.Util.immutableSeq
import akka.io.Inet.SocketOption
import scala.compat.java8.OptionConverters._
import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage
import scala.concurrent.Future
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
/**
@ -39,6 +45,11 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
* The produced [[java.util.concurrent.CompletionStage]] is fulfilled when the unbinding has been completed.
*/
def unbind(): CompletionStage[Unit] = delegate.unbind().toJava
/**
* @return A completion stage that is completed when manually unbound, or failed if the server fails
*/
def whenUnbound(): CompletionStage[Done] = delegate.whenUnbound.toJava
}
/**

View file

@ -6,26 +6,33 @@ package akka.stream.scaladsl
import java.net.InetSocketAddress
import java.util.concurrent.TimeoutException
import akka.NotUsed
import akka.{Done, NotUsed}
import akka.actor._
import akka.io.Inet.SocketOption
import akka.io.{ IO, Tcp IoTcp }
import akka.io.{IO, Tcp => IoTcp}
import akka.stream._
import akka.stream.impl.fusing.GraphStages.detacher
import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout }
import akka.stream.impl.io.{ConnectionSourceStage, OutgoingConnectionStage, TcpIdleTimeout}
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.control.NoStackTrace
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
/**
* * Represents a successful TCP server binding.
* Represents a successful TCP server binding.
*
* @param localAddress The address the server was bound to
* @param unbindAction a function that will trigger unbind of the server
* @param whenUnbound A future that is completed when the server is unbound, or failed if the server binding fails
*/
final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () Future[Unit]) {
final case class ServerBinding(localAddress: InetSocketAddress)(
private val unbindAction: () Future[Unit],
val whenUnbound: Future[Done]
) {
def unbind(): Future[Unit] = unbindAction()
}