2014-11-28 10:41:57 +01:00
|
|
|
/**
|
2016-02-23 12:58:39 +01:00
|
|
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
2014-11-28 10:41:57 +01:00
|
|
|
*/
|
2014-12-16 11:48:42 +01:00
|
|
|
package akka.stream.scaladsl
|
2014-11-28 10:41:57 +01:00
|
|
|
|
2015-10-23 06:41:02 -07:00
|
|
|
import java.net.InetSocketAddress
|
|
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.NotUsed
|
2015-05-29 16:43:02 +02:00
|
|
|
import akka.actor._
|
2014-11-27 22:57:10 +01:00
|
|
|
import akka.io.Inet.SocketOption
|
2015-11-12 12:58:58 +01:00
|
|
|
import akka.io.{ IO, Tcp ⇒ IoTcp }
|
2015-01-28 14:19:50 +01:00
|
|
|
import akka.stream._
|
2015-12-14 17:02:00 +01:00
|
|
|
import akka.stream.impl.fusing.GraphStages.detacher
|
2015-11-12 12:58:58 +01:00
|
|
|
import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage }
|
2014-12-16 11:48:42 +01:00
|
|
|
import akka.util.ByteString
|
2015-10-23 06:41:02 -07:00
|
|
|
|
|
|
|
|
import scala.collection.immutable
|
2015-11-12 12:58:58 +01:00
|
|
|
import scala.concurrent.Future
|
2015-10-23 06:41:02 -07:00
|
|
|
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
2014-11-28 10:41:57 +01:00
|
|
|
|
2015-04-24 13:15:02 +02:00
|
|
|
/**
 * Extension id of the stream-based [[Tcp]] extension. Besides the extension lookup plumbing it
 * hosts the model classes describing server bindings and incoming / outgoing connections.
 */
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {

  /**
   * Represents a successful TCP server binding.
   *
   * The second parameter list carries the action that is run by [[ServerBinding#unbind]]; it is
   * private and, being in a secondary parameter list, not part of the case class equality.
   */
  final case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) {

    /**
     * Runs the unbind action this binding was created with and returns the future it produces.
     */
    def unbind(): Future[Unit] = unbindAction.apply()
  }

  /**
   * Represents an accepted incoming TCP connection.
   */
  final case class IncomingConnection(
    localAddress:  InetSocketAddress,
    remoteAddress: InetSocketAddress,
    flow:          Flow[ByteString, ByteString, NotUsed]) {

    /**
     * Handles the connection using the given flow, which is materialized exactly once and the respective
     * materialized instance is returned.
     *
     * Convenience shortcut for: `flow.joinMat(handler)(Keep.right).run()`.
     */
    def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat])(implicit materializer: Materializer): Mat = {
      // Keep.right: the materialized value of `handler` is the one handed back to the caller.
      val connectionGraph = flow.joinMat(handler)(Keep.right)
      connectionGraph.run()
    }

  }

  /**
   * Represents a prospective outgoing TCP connection.
   */
  final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)

  /**
   * Returns the [[Tcp]] extension of the implicitly available actor system.
   */
  def apply()(implicit system: ActorSystem): Tcp = super.apply(system)

  /**
   * Returns the [[Tcp]] extension of the given actor system (delegates to the inherited `get`).
   */
  override def get(system: ActorSystem): Tcp = super.get(system)

  /**
   * The extension id to register: this object itself.
   */
  def lookup(): Tcp.type = Tcp

  /**
   * Instantiates the extension for the given actor system.
   */
  def createExtension(system: ExtendedActorSystem): Tcp = new Tcp(system)
}
|
|
|
|
|
|
2016-01-25 11:49:02 +01:00
|
|
|
/**
 * Stream-based TCP support, available as an actor system extension (see the companion `Tcp` object).
 * Offers server side binding (`bind`, `bindAndHandle`) and client side connections
 * (`outgoingConnection`) as stream `Source`s and `Flow`s of [[akka.util.ByteString]].
 */
final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
  import Tcp._

  // TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead?
  // The value is read straight from the materializer settings of `system`. Creating an ActorMaterializer
  // just to look at its settings would leave a materializer behind that nobody ever shuts down.
  val bindShutdownTimeout: FiniteDuration = ActorMaterializerSettings(system).subscriptionTimeoutSettings.timeout

  /**
   * Creates a [[Source]] of [[Tcp.IncomingConnection]]s for a prospective TCP server binding on the given
   * `interface` and `port`. The source materializes a future of the resulting [[Tcp.ServerBinding]].
   *
   * Please note that the startup of the server is asynchronous, i.e. after materializing the enclosing
   * [[akka.stream.scaladsl.RunnableGraph]] the server is not immediately available. Only after the materialized future
   * completes is the server ready to accept client connections.
   *
   * @param interface The interface to listen on
   * @param port      The port to listen on
   * @param backlog   Controls the size of the connection backlog
   * @param options   TCP options for the connections, see [[akka.io.Tcp]] for details
   * @param halfClose
   *                  Controls whether the connection is kept open even after writing has been completed to the accepted
   *                  TCP connections.
   *                  If set to true, the connection will implement the TCP half-close mechanism, allowing the client to
   *                  write to the connection even after the server has finished writing. The TCP socket is only closed
   *                  after both the client and server finished writing.
   *                  If set to false, the connection will be closed immediately once the server closes its write side,
   *                  independently of whether the client is still attempting to write. This setting is recommended
   *                  for servers, and therefore it is the default setting.
   * @param idleTimeout Idle timeout handed to the underlying connection source stage; defaults to `Duration.Inf`
   */
  def bind(
    interface:   String,
    port:        Int,
    backlog:     Int                                 = 100,
    options:     immutable.Traversable[SocketOption] = Nil,
    halfClose:   Boolean                             = false,
    idleTimeout: Duration                            = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] =
    Source.fromGraph(new ConnectionSourceStage(
      IO(IoTcp)(system),
      new InetSocketAddress(interface, port),
      backlog,
      options,
      halfClose,
      idleTimeout,
      bindShutdownTimeout))

  /**
   * Binds a TCP server on the given `interface` and `port` (see `bind`) and handles every incoming
   * connection using the provided Flow. The returned future carries the [[Tcp.ServerBinding]].
   *
   * Please note that the startup of the server is asynchronous, i.e. after this method returns
   * the server is not immediately available. Only after the returned future
   * completes is the server ready to accept client connections.
   *
   * @param handler   A Flow that represents the server logic
   * @param interface The interface to listen on
   * @param port      The port to listen on
   * @param backlog   Controls the size of the connection backlog
   * @param options   TCP options for the connections, see [[akka.io.Tcp]] for details
   * @param halfClose
   *                  Controls whether the connection is kept open even after writing has been completed to the accepted
   *                  TCP connections.
   *                  If set to true, the connection will implement the TCP half-close mechanism, allowing the client to
   *                  write to the connection even after the server has finished writing. The TCP socket is only closed
   *                  after both the client and server finished writing.
   *                  If set to false, the connection will be closed immediately once the server closes its write side,
   *                  independently of whether the client is still attempting to write. This setting is recommended
   *                  for servers, and therefore it is the default setting.
   * @param idleTimeout Idle timeout passed on to `bind`; defaults to `Duration.Inf`
   */
  def bindAndHandle(
    handler:     Flow[ByteString, ByteString, _],
    interface:   String,
    port:        Int,
    backlog:     Int                                 = 100,
    options:     immutable.Traversable[SocketOption] = Nil,
    halfClose:   Boolean                             = false,
    idleTimeout: Duration                            = Duration.Inf)(implicit m: Materializer): Future[ServerBinding] = {
    // Each accepted connection gets its own materialization of `handler`.
    bind(interface, port, backlog, options, halfClose, idleTimeout).to(Sink.foreach { conn: IncomingConnection ⇒
      conn.flow.join(handler).run()
    }).run()
  }

  /**
   * Creates a [[Flow]] representing a prospective TCP client connection to the given endpoint. The flow
   * materializes a future of the resulting [[Tcp.OutgoingConnection]].
   *
   * @param remoteAddress The remote address to connect to
   * @param localAddress  Optional local address for the connection
   * @param options       TCP options for the connections, see [[akka.io.Tcp]] for details
   * @param halfClose
   *                  Controls whether the connection is kept open even after writing has been completed to the
   *                  TCP connection.
   *                  If set to true, the connection will implement the TCP half-close mechanism, allowing the server to
   *                  write to the connection even after the client has finished writing. The TCP socket is only closed
   *                  after both the client and server finished writing. This setting is recommended for clients and
   *                  therefore it is the default setting.
   *                  If set to false, the connection will be closed immediately once the client closes its write side,
   *                  independently of whether the server is still attempting to write.
   * @param connectTimeout Connect timeout handed to the underlying outgoing connection stage;
   *                  defaults to `Duration.Inf`
   * @param idleTimeout If finite, the connection flow is joined with a bidirectional idle timeout of that
   *                  duration; otherwise (the default, `Duration.Inf`) the flow is returned without one
   */
  def outgoingConnection(
    remoteAddress:  InetSocketAddress,
    localAddress:   Option[InetSocketAddress]           = None,
    options:        immutable.Traversable[SocketOption] = Nil,
    halfClose:      Boolean                             = true,
    connectTimeout: Duration                            = Duration.Inf,
    idleTimeout:    Duration                            = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {

    val tcpFlow = Flow.fromGraph(new OutgoingConnectionStage(
      IO(IoTcp)(system),
      remoteAddress,
      localAddress,
      options,
      halfClose,
      connectTimeout)).via(detacher[ByteString]) // must read ahead for proper completions

    idleTimeout match {
      case d: FiniteDuration ⇒ tcpFlow.join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d))
      case _                 ⇒ tcpFlow
    }

  }

  /**
   * Creates a [[Flow]] for a [[Tcp.OutgoingConnection]] without specifying options.
   * It represents a prospective TCP client connection to the given endpoint; all other settings
   * take the defaults of the `InetSocketAddress` based `outgoingConnection` overload.
   */
  def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] =
    outgoingConnection(new InetSocketAddress(host, port))
}
|