=str: Make TCP fusable (first attempts)
This commit is contained in:
parent
248bad9ddd
commit
18843de175
7 changed files with 385 additions and 701 deletions
|
|
@ -7,18 +7,15 @@ import java.net.InetSocketAddress
|
|||
|
||||
import akka.actor._
|
||||
import akka.io.Inet.SocketOption
|
||||
import akka.io.{ Tcp ⇒ IoTcp }
|
||||
import akka.io.{ IO, Tcp ⇒ IoTcp }
|
||||
import akka.stream._
|
||||
import akka.stream.impl.ReactiveStreamsCompliance._
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.impl._
|
||||
import akka.stream.impl.io.{ DelayedInitProcessor, StreamTcpManager }
|
||||
import akka.stream.impl.fusing.GraphStages.Detacher
|
||||
import akka.stream.impl.io.{ ConnectionSourceStage, OutgoingConnectionStage }
|
||||
import akka.util.ByteString
|
||||
import org.reactivestreams.{ Processor, Publisher, Subscriber }
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
|
||||
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
|
||||
|
||||
|
|
@ -65,53 +62,6 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
|
|||
class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||
import Tcp._
|
||||
|
||||
private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager]
|
||||
.withDispatcher(IoTcp(system).Settings.ManagementDispatcher).withDeploy(Deploy.local), name = "IO-TCP-STREAM")
|
||||
|
||||
private class BindSource(
|
||||
val endpoint: InetSocketAddress,
|
||||
val backlog: Int,
|
||||
val options: immutable.Traversable[SocketOption],
|
||||
val halfClose: Boolean,
|
||||
val idleTimeout: Duration = Duration.Inf,
|
||||
val attributes: Attributes,
|
||||
_shape: SourceShape[IncomingConnection]) extends SourceModule[IncomingConnection, Future[ServerBinding]](_shape) {
|
||||
|
||||
override def create(context: MaterializationContext): (Publisher[IncomingConnection], Future[ServerBinding]) = {
|
||||
val localAddressPromise = Promise[InetSocketAddress]()
|
||||
val unbindPromise = Promise[() ⇒ Future[Unit]]()
|
||||
val publisher = new Publisher[IncomingConnection] {
|
||||
|
||||
override def subscribe(s: Subscriber[_ >: IncomingConnection]): Unit = {
|
||||
requireNonNullSubscriber(s)
|
||||
manager ! StreamTcpManager.Bind(
|
||||
localAddressPromise,
|
||||
unbindPromise,
|
||||
s.asInstanceOf[Subscriber[IncomingConnection]],
|
||||
endpoint,
|
||||
backlog,
|
||||
halfClose,
|
||||
options,
|
||||
idleTimeout)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
import system.dispatcher
|
||||
val bindingFuture = unbindPromise.future.zip(localAddressPromise.future).map {
|
||||
case (unbindAction, localAddress) ⇒
|
||||
ServerBinding(localAddress)(unbindAction)
|
||||
}
|
||||
|
||||
(publisher, bindingFuture)
|
||||
}
|
||||
|
||||
override protected def newInstance(s: SourceShape[IncomingConnection]): SourceModule[IncomingConnection, Future[ServerBinding]] =
|
||||
new BindSource(endpoint, backlog, options, halfClose, idleTimeout, attributes, shape)
|
||||
override def withAttributes(attr: Attributes): Module =
|
||||
new BindSource(endpoint, backlog, options, halfClose, idleTimeout, attr, shape)
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
|
||||
*
|
||||
|
|
@ -138,10 +88,14 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
backlog: Int = 100,
|
||||
options: immutable.Traversable[SocketOption] = Nil,
|
||||
halfClose: Boolean = false,
|
||||
idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = {
|
||||
new Source(new BindSource(new InetSocketAddress(interface, port), backlog, options, halfClose, idleTimeout,
|
||||
Attributes.none, SourceShape(Outlet("BindSource.out"))))
|
||||
}
|
||||
idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] =
|
||||
Source.fromGraph(new ConnectionSourceStage(
|
||||
IO(IoTcp)(system),
|
||||
new InetSocketAddress(interface, port),
|
||||
backlog,
|
||||
options,
|
||||
halfClose,
|
||||
idleTimeout))
|
||||
|
||||
/**
|
||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
|
||||
|
|
@ -202,20 +156,18 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
connectTimeout: Duration = Duration.Inf,
|
||||
idleTimeout: Duration = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = {
|
||||
|
||||
val timeoutHandling = idleTimeout match {
|
||||
case d: FiniteDuration ⇒ Flow[ByteString].join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d))
|
||||
case _ ⇒ Flow[ByteString]
|
||||
}
|
||||
val tcpFlow = Flow.fromGraph(new OutgoingConnectionStage(
|
||||
IO(IoTcp)(system),
|
||||
remoteAddress,
|
||||
localAddress,
|
||||
options,
|
||||
halfClose,
|
||||
connectTimeout)).via(new Detacher[ByteString]) // must read ahead for proper completions
|
||||
|
||||
Flow[ByteString].deprecatedAndThenMat(() ⇒ {
|
||||
val processorPromise = Promise[Processor[ByteString, ByteString]]()
|
||||
val localAddressPromise = Promise[InetSocketAddress]()
|
||||
manager ! StreamTcpManager.Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, halfClose, options,
|
||||
connectTimeout, idleTimeout)
|
||||
import system.dispatcher
|
||||
val outgoingConnection = localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))
|
||||
(new DelayedInitProcessor[ByteString, ByteString](processorPromise.future), outgoingConnection)
|
||||
}).via(timeoutHandling)
|
||||
idleTimeout match {
|
||||
case d: FiniteDuration ⇒ tcpFlow.join(BidiFlow.bidirectionalIdleTimeout[ByteString, ByteString](d))
|
||||
case _ ⇒ tcpFlow
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue