2014-11-28 10:41:57 +01:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
2014-12-16 11:48:42 +01:00
|
|
|
package akka.stream.scaladsl
|
2014-11-28 10:41:57 +01:00
|
|
|
|
|
|
|
|
import java.net.{ InetSocketAddress, URLEncoder }
|
2015-01-28 14:19:50 +01:00
|
|
|
import akka.stream.impl.StreamLayout.Module
|
2014-11-27 22:57:10 +01:00
|
|
|
import scala.collection.immutable
|
|
|
|
|
import scala.concurrent.{ Promise, ExecutionContext, Future }
|
2014-12-16 11:48:42 +01:00
|
|
|
import scala.concurrent.duration.Duration
|
|
|
|
|
import scala.util.{ Failure, Success }
|
|
|
|
|
import scala.util.control.NoStackTrace
|
|
|
|
|
import akka.actor.Actor
|
|
|
|
|
import akka.actor.ActorRef
|
|
|
|
|
import akka.actor.ActorSystem
|
|
|
|
|
import akka.actor.ExtendedActorSystem
|
|
|
|
|
import akka.actor.ExtensionId
|
|
|
|
|
import akka.actor.ExtensionIdProvider
|
|
|
|
|
import akka.actor.Props
|
2014-11-27 22:57:10 +01:00
|
|
|
import akka.io.Inet.SocketOption
|
|
|
|
|
import akka.io.Tcp
|
2015-01-28 14:19:50 +01:00
|
|
|
import akka.stream._
|
2014-11-27 22:57:10 +01:00
|
|
|
import akka.stream.impl._
|
2014-12-16 11:48:42 +01:00
|
|
|
import akka.stream.scaladsl._
|
|
|
|
|
import akka.util.ByteString
|
2015-01-28 14:19:50 +01:00
|
|
|
import org.reactivestreams.{ Publisher, Processor, Subscriber, Subscription }
|
|
|
|
|
import akka.actor.actorRef2Scala
|
2014-12-16 11:48:42 +01:00
|
|
|
import akka.stream.impl.io.TcpStreamActor
|
|
|
|
|
import akka.stream.impl.io.TcpListenStreamActor
|
|
|
|
|
import akka.stream.impl.io.DelayedInitProcessor
|
|
|
|
|
import akka.stream.impl.io.StreamTcpManager
|
2014-11-28 10:41:57 +01:00
|
|
|
|
2014-12-16 11:48:42 +01:00
|
|
|
object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider {

  /**
   * Represents a successful TCP server binding.
   *
   * @param localAddress the address reported for this binding
   * @param unbindAction the function that [[ServerBinding#unbind]] delegates to
   */
  case class ServerBinding(localAddress: InetSocketAddress)(private val unbindAction: () ⇒ Future[Unit]) {

    /**
     * Invokes the unbind action this binding was created with.
     *
     * @return the future produced by that action
     */
    def unbind(): Future[Unit] = unbindAction()
  }

  /**
   * Represents an accepted incoming TCP connection.
   *
   * @param localAddress  local address of the connection
   * @param remoteAddress remote address of the connection
   * @param flow          flow of bytes that a handler is joined with, see [[IncomingConnection#handleWith]]
   */
  case class IncomingConnection(
    localAddress: InetSocketAddress,
    remoteAddress: InetSocketAddress,
    flow: Flow[ByteString, ByteString, Unit]) {

    /**
     * Handles the connection using the given flow. Each call joins `handler` with this
     * connection's `flow`, runs the result once and returns the materialized value of `handler`.
     *
     * Convenience shortcut for: `flow.joinMat(handler)(Keep.right).run()`.
     *
     * @param handler      the flow that processes the bytes of this connection
     * @param materializer the materializer used to run the joined flow
     * @return the materialized value of `handler`
     */
    def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat])(implicit materializer: ActorFlowMaterializer): Mat = {
      val joined = flow.joinMat(handler)(Keep.right)
      joined.run()
    }
  }

  /**
   * Represents a prospective outgoing TCP connection.
   *
   * @param remoteAddress the address being connected to
   * @param localAddress  the local address of the connection
   */
  case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)

  /** Looks up the extension for the implicitly available actor system. */
  def apply()(implicit system: ActorSystem): StreamTcp = super.apply(system)

  /** Looks up the extension for the given actor system (delegates to the inherited `get`). */
  override def get(system: ActorSystem): StreamTcp = super.get(system)

  /** The extension id under which this extension is registered: this object itself. */
  def lookup(): StreamTcp.type = StreamTcp

  /** Instantiates the extension for the given actor system. */
  def createExtension(system: ExtendedActorSystem): StreamTcp = new StreamTcp(system)
}
|
|
|
|
|
|
2014-12-16 11:48:42 +01:00
|
|
|
class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension {
  import StreamTcp._
  import system.dispatcher

  // System actor that receives the `StreamTcpManager.Bind` / `StreamTcpManager.Connect` requests sent below.
  private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager], name = "IO-TCP-STREAM")

  /**
   * Source module backing [[StreamTcp#bind]]. Emits [[StreamTcp.IncomingConnection]]s and
   * materializes a `Future[ServerBinding]`.
   *
   * The constructor parameters other than `attributes` and `_shape` are forwarded unchanged
   * to `StreamTcpManager.Bind`.
   */
  private class BindSource(
    val endpoint: InetSocketAddress,
    val backlog: Int,
    val options: immutable.Traversable[SocketOption],
    val idleTimeout: Duration = Duration.Inf,
    val attributes: OperationAttributes,
    _shape: SourceShape[IncomingConnection]) extends SourceModule[IncomingConnection, Future[ServerBinding]](_shape) {

    /**
     * Creates the publisher of incoming connections together with the future server binding.
     *
     * The `Bind` request is only sent to the manager once a subscriber subscribes to the returned
     * publisher. The binding future is built from two promises handed to the manager in that
     * request: one for the local address and one for the unbind action.
     */
    override def create(materializer: ActorFlowMaterializerImpl, flowName: String): (Publisher[IncomingConnection], Future[ServerBinding]) = {
      val localAddressPromise = Promise[InetSocketAddress]()
      val unbindPromise = Promise[() ⇒ Future[Unit]]()

      val publisher = new Publisher[IncomingConnection] {
        override def subscribe(s: Subscriber[_ >: IncomingConnection]): Unit = {
          manager ! StreamTcpManager.Bind(
            localAddressPromise,
            unbindPromise,
            // Bind is handed a Subscriber[IncomingConnection]; the wildcard-typed subscriber is cast to fit.
            s.asInstanceOf[Subscriber[IncomingConnection]],
            endpoint,
            backlog,
            options,
            idleTimeout)
        }
      }

      // Completes once both the unbind action and the local address are available.
      val bindingFuture = unbindPromise.future.zip(localAddressPromise.future).map {
        case (unbindAction, localAddress) ⇒
          ServerBinding(localAddress)(unbindAction)
      }

      (publisher, bindingFuture)
    }

    /**
     * Creates a copy of this module that uses the supplied shape `s`.
     *
     * The previous implementation ignored `s` and reused this module's own `shape`, so the
     * "new" instance kept the old ports.
     * NOTE(review): presumably SourceModule calls this with a freshly created shape when copying
     * the module — confirm against SourceModule.
     */
    override protected def newInstance(s: SourceShape[IncomingConnection]): SourceModule[IncomingConnection, Future[ServerBinding]] =
      new BindSource(endpoint, backlog, options, idleTimeout, attributes, s)

    /** Creates a copy of this module with the same shape but the given attributes. */
    override def withAttributes(attr: OperationAttributes): Module =
      new BindSource(endpoint, backlog, options, idleTimeout, attr, shape)
  }

  /**
   * Creates a `Source` of [[StreamTcp.IncomingConnection]]s which represents a prospective TCP server
   * binding on the given `endpoint`. The materialized value is a `Future` of the
   * [[StreamTcp.ServerBinding]].
   *
   * @param endpoint    the address to bind to
   * @param backlog     forwarded to the bind request, defaults to 100
   * @param options     socket options forwarded to the bind request
   * @param idleTimeout forwarded to the bind request, defaults to `Duration.Inf`
   */
  def bind(endpoint: InetSocketAddress,
           backlog: Int = 100,
           options: immutable.Traversable[SocketOption] = Nil,
           idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = {
    new Source(new BindSource(endpoint, backlog, options, idleTimeout, OperationAttributes.none, SourceShape(new Outlet("BindSource.out"))))
  }

  /**
   * Binds to the given `endpoint` (see [[StreamTcp#bind]]) and runs the resulting source, joining
   * the flow of every emitted [[StreamTcp.IncomingConnection]] with `handler` and running it.
   *
   * @param handler the flow that is joined with each connection's flow
   * @param m       the materializer used for the bind source and for each connection
   * @return the future server binding materialized by the bind source
   */
  def bindAndHandle(
    handler: Flow[ByteString, ByteString, _],
    endpoint: InetSocketAddress,
    backlog: Int = 100,
    options: immutable.Traversable[SocketOption] = Nil,
    idleTimeout: Duration = Duration.Inf)(implicit m: ActorFlowMaterializer): Future[ServerBinding] = {
    bind(endpoint, backlog, options, idleTimeout).to(Sink.foreach { conn: IncomingConnection ⇒
      conn.flow.join(handler).run()
    }).run()
  }

  /**
   * Creates a `Flow` representing a prospective TCP client connection to the given endpoint. The
   * materialized value is a `Future` of the [[StreamTcp.OutgoingConnection]].
   *
   * A `Connect` request is sent to the manager each time the function passed to `andThenMat` is
   * invoked.
   *
   * @param remoteAddress  the address to connect to
   * @param localAddress   optional local address, forwarded to the connect request
   * @param options        socket options forwarded to the connect request
   * @param connectTimeout forwarded to the connect request, defaults to `Duration.Inf`
   * @param idleTimeout    forwarded to the connect request, defaults to `Duration.Inf`
   */
  def outgoingConnection(remoteAddress: InetSocketAddress,
                         localAddress: Option[InetSocketAddress] = None,
                         options: immutable.Traversable[SocketOption] = Nil,
                         connectTimeout: Duration = Duration.Inf,
                         idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {

    Flow[ByteString].andThenMat(() ⇒ {
      val processorPromise = Promise[Processor[ByteString, ByteString]]()
      val localAddressPromise = Promise[InetSocketAddress]()
      manager ! StreamTcpManager.Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options,
        connectTimeout, idleTimeout)
      // The materialized value becomes available once the local address promise is completed.
      val outgoingConnection = localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))
      (new DelayedInitProcessor[ByteString, ByteString](processorPromise.future), outgoingConnection)
    })
  }
}
|
2014-11-28 10:41:57 +01:00
|
|
|
|