Merge pull request #16550 from akka/wip-16400-javadsl-stream-io-patriknw

!str #16400 Add Java API for StreamTcp
This commit is contained in:
Patrik Nordwall 2014-12-18 13:19:20 +01:00
commit ef2835d60e
18 changed files with 548 additions and 167 deletions

View file

@ -12,7 +12,6 @@ import akka.event.LoggingAdapter
import akka.util.ByteString
import akka.io.Inet
import akka.stream.FlowMaterializer
import akka.stream.io.StreamTcp
import akka.stream.scaladsl._
import akka.http.engine.client.{ HttpClient, ClientConnectionSettings }
import akka.http.engine.server.{ HttpServer, ServerSettings }
@ -201,7 +200,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
* A flow representing the HTTP server on a single HTTP connection.
* This flow can be materialized several times, every materialization will open a new connection to the `remoteAddress`.
* If the connection cannot be established the materialized stream will immediately be terminated
* with a [[StreamTcp.ConnectionAttemptFailedException]].
* with a [[akka.stream.StreamTcpException]].
*/
def flow: Flow[HttpRequest, HttpResponse]
}

View file

@ -12,7 +12,8 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import akka.actor.ActorSystem
import akka.stream.io.StreamTcp
import akka.stream.scaladsl.StreamTcp
import akka.stream.BindFailedException
import akka.stream.FlowMaterializer
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe }
@ -55,7 +56,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll {
val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()
binding.connections.runWith(Sink(probe2))
probe2.expectError(StreamTcp.BindFailedException)
probe2.expectError(BindFailedException)
Await.result(binding.unbind(mm1), 1.second)
val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]()

View file

@ -8,7 +8,7 @@ import akka.actor.ActorSystem
import akka.dispatch.MailboxType
import akka.actor.ActorRef
import akka.actor.ActorRefWithCell
import akka.stream.io.StreamTcpManager
import akka.stream.impl.io.StreamTcpManager
import akka.actor.Actor
/**

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import scala.collection.immutable
import java.nio.channels.DatagramChannel
import java.nio.channels.ServerSocketChannel
import java.net.InetSocketAddress
import java.net.SocketAddress
object TestUtils { // FIXME: remove once going back to project dependencies
// Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object
type GeneralSocket = {
def bind(sa: SocketAddress): Unit
def close(): Unit
def getLocalPort(): Int
}
def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress =
temporaryServerAddresses(1, address, udp).head
def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "127.0.0.1", udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress] = {
Vector.fill(numberOfAddresses) {
val serverSocket: GeneralSocket =
if (udp) DatagramChannel.open().socket()
else ServerSocketChannel.open().socket()
serverSocket.bind(new InetSocketAddress(hostname, 0))
(serverSocket, new InetSocketAddress(hostname, serverSocket.getLocalPort))
} collect { case (socket, address) socket.close(); address }
}
}

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl;
import akka.actor.ActorRef;

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl;
import akka.actor.ActorRef;

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl;
import akka.actor.ActorRef;

View file

@ -0,0 +1,123 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import akka.stream.BindFailedException;
import akka.stream.StreamTcpException;
import akka.stream.StreamTest;
import akka.stream.javadsl.StreamTcp.IncomingConnection;
import akka.stream.javadsl.StreamTcp.ServerBinding;
import akka.stream.javadsl.japi.Function2;
import akka.stream.javadsl.japi.Procedure;
import akka.stream.testkit.AkkaSpec;
import akka.stream.testkit.TestUtils;
import akka.util.ByteString;
public class StreamTcpTest extends StreamTest {
public StreamTcpTest() {
super(actorSystemResource);
}
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("StreamTcpTest",
AkkaSpec.testConf());
final Sink<IncomingConnection> echoHandler =
Sink.foreach(new Procedure<IncomingConnection>() {
public void apply(IncomingConnection conn) {
conn.handleWith(Flow.<ByteString>empty(), materializer);
}
});
final List<ByteString> testInput = new ArrayList<ByteString>();
{
for (char c = 'a'; c <= 'z'; c++) {
testInput.add(ByteString.fromString(String.valueOf(c)));
}
}
@Test
public void mustWorkInHappyCase() throws Exception {
final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false);
final ServerBinding binding = StreamTcp.get(system).bind(serverAddress);
final MaterializedMap materializedServer = binding.connections().to(echoHandler).run(materializer);
final Future<InetSocketAddress> serverFuture = binding.localAddress(materializedServer);
final InetSocketAddress s = Await.result(serverFuture, FiniteDuration.create(5, TimeUnit.SECONDS));
assertEquals(s.getPort(), serverAddress.getPort());
final Source<ByteString> responseStream =
Source.from(testInput).via(StreamTcp.get(system).outgoingConnection(serverAddress).flow());
final Future<ByteString> resultFuture = responseStream.fold(
ByteString.empty(), new Function2<ByteString, ByteString, ByteString>() {
public ByteString apply(ByteString acc, ByteString elem) {
return acc.concat(elem);
}
}, materializer);
final byte[] result = Await.result(resultFuture, FiniteDuration.create(5, TimeUnit.SECONDS)).toArray();
for (int i = 0; i < testInput.size(); i ++) {
assertEquals(testInput.get(i).head(), result[i]);
}
}
@Test
public void mustReportServerBindFailure() throws Exception {
final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false);
final ServerBinding binding = StreamTcp.get(system).bind(serverAddress);
final MaterializedMap materializedServer = binding.connections().to(echoHandler).run(materializer);
final Future<InetSocketAddress> serverFuture = binding.localAddress(materializedServer);
final InetSocketAddress s = Await.result(serverFuture, FiniteDuration.create(5, TimeUnit.SECONDS));
assertEquals(s.getPort(), serverAddress.getPort());
// bind again, to same port
final MaterializedMap materializedServer2 = binding.connections().to(echoHandler).run(materializer);
final Future<InetSocketAddress> serverFuture2 = binding.localAddress(materializedServer2);
boolean bindFailed = false;
try {
Await.result(serverFuture2, FiniteDuration.create(5, TimeUnit.SECONDS));
} catch (BindFailedException e) {
// as expected
bindFailed = true;
}
assertTrue("Expected BindFailedException, but nothing was reported", bindFailed);
}
@Test
public void mustReportClientConnectFailure() throws Exception {
final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false);
final Source<ByteString> responseStream =
Source.from(testInput).via(StreamTcp.get(system).outgoingConnection(serverAddress).flow());
final Future<ByteString> resultFuture = responseStream.runWith(Sink.<ByteString>head(), materializer);
boolean streamTcpException = false;
try {
Await.result(resultFuture, FiniteDuration.create(5, TimeUnit.SECONDS));
} catch (StreamTcpException e) {
// as expected
streamTcpException = true;
}
assertTrue("Expected StreamTcpException, but nothing was reported", streamTcpException);
}
}

View file

@ -9,8 +9,9 @@ import akka.util.ByteString
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl._
import akka.stream.testkit.TestUtils.temporaryServerAddress
class TcpFlowSpec extends AkkaSpec with TcpHelper {
class StreamTcpSpec extends AkkaSpec with TcpHelper {
import akka.stream.io.TcpHelper._
var demand = 0L
@ -185,7 +186,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val echoHandler = ForeachSink[StreamTcp.IncomingConnection] { _ handleWith Flow[ByteString] }
"be able to implement echo" in {
val serverAddress = temporaryServerAddress
val serverAddress = temporaryServerAddress()
val binding = StreamTcp().bind(serverAddress)
val echoServerMM = binding.connections.to(echoHandler).run()
@ -205,7 +206,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
}
"work with a chain of echoes" in {
val serverAddress = temporaryServerAddress
val serverAddress = temporaryServerAddress()
val binding = StreamTcp(system).bind(serverAddress)
val echoServerMM = binding.connections.to(echoHandler).run()

View file

@ -18,6 +18,7 @@ import scala.collection.immutable.Queue
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration.Duration
import akka.stream.scaladsl.Source
import akka.stream.testkit.TestUtils.temporaryServerAddress
object TcpHelper {
case class ClientWrite(bytes: ByteString)
@ -103,14 +104,6 @@ object TcpHelper {
}
// FIXME: get it from TestUtil
def temporaryServerAddress: InetSocketAddress = {
val serverSocket = ServerSocketChannel.open().socket()
serverSocket.bind(new InetSocketAddress("127.0.0.1", 0))
val address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort)
serverSocket.close()
address
}
}
trait TcpHelper { this: TestKitBase
@ -121,7 +114,7 @@ trait TcpHelper { this: TestKitBase ⇒
implicit val materializer = FlowMaterializer(settings)
class Server(val address: InetSocketAddress = temporaryServerAddress) {
class Server(val address: InetSocketAddress = temporaryServerAddress()) {
val serverProbe = TestProbe()
val serverRef = system.actorOf(testServerProps(address, serverProbe.ref))
serverProbe.expectMsgType[Tcp.Bound]

View file

@ -0,0 +1,16 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.util.control.NoStackTrace
import java.net.InetSocketAddress
class StreamTcpException(msg: String) extends RuntimeException(msg) with NoStackTrace
abstract class BindFailedException extends StreamTcpException("bind failed")
case object BindFailedException extends BindFailedException
class ConnectionException(msg: String) extends StreamTcpException(msg)

View file

@ -0,0 +1,42 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.io
import scala.concurrent.ExecutionContext
import org.reactivestreams.Subscription
import org.reactivestreams.Processor
import org.reactivestreams.Subscriber
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
/**
* INTERNAL API
*/
private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[I, O]])(implicit ec: ExecutionContext) extends Processor[I, O] {
@volatile private var impl: Processor[I, O] = _
private val setVarFuture = implFuture.andThen { case Success(p) impl = p }
override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete {
case Success(x) x.onSubscribe(s)
case Failure(_) s.cancel()
}
override def onError(t: Throwable): Unit = {
if (impl eq null) setVarFuture.onSuccess { case p p.onError(t) }
else impl.onError(t)
}
override def onComplete(): Unit = {
if (impl eq null) setVarFuture.onSuccess { case p p.onComplete() }
else impl.onComplete()
}
override def onNext(t: I): Unit = impl.onNext(t)
override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete {
case Success(x) x.subscribe(s)
case Failure(e) s.onError(e)
}
}

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl.io
import java.net.InetSocketAddress
import java.net.URLEncoder
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import akka.actor.Actor
import akka.io.Inet.SocketOption
import akka.io.Tcp
import akka.stream.MaterializerSettings
import akka.stream.impl.ActorProcessor
import akka.stream.impl.ActorPublisher
import akka.stream.scaladsl.StreamTcp
import akka.util.ByteString
import org.reactivestreams.Processor
import org.reactivestreams.Subscriber
/**
* INTERNAL API
*/
private[akka] object StreamTcpManager {
/**
* INTERNAL API
*/
private[akka] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[SocketOption],
connectTimeout: Duration,
idleTimeout: Duration)
/**
* INTERNAL API
*/
private[akka] case class Bind(localAddressPromise: Promise[InetSocketAddress],
unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[SocketOption],
idleTimeout: Duration)
/**
* INTERNAL API
*/
private[akka] case class ExposedProcessor(processor: Processor[ByteString, ByteString])
}
/**
* INTERNAL API
*/
private[akka] class StreamTcpManager extends Actor {
import StreamTcpManager._
var nameCounter = 0
def encName(prefix: String, endpoint: InetSocketAddress) = {
nameCounter += 1
s"$prefix-$nameCounter-${URLEncoder.encode(endpoint.toString, "utf-8")}"
}
def receive: Receive = {
case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, connectTimeout, _)
val connTimeout = connectTimeout match {
case x: FiniteDuration Some(x)
case _ None
}
val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise,
Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true),
materializerSettings = MaterializerSettings(context.system)), name = encName("client", remoteAddress))
processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor))
case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, _)
val publisherActor = context.actorOf(TcpListenStreamActor.props(localAddressPromise, unbindPromise,
flowSubscriber, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true),
MaterializerSettings(context.system)), name = encName("server", endpoint))
// this sends the ExposedPublisher message to the publisher actor automatically
ActorPublisher[Any](publisherActor)
}
}

View file

@ -1,27 +1,26 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
package akka.stream.impl.io
import java.net.InetSocketAddress
import akka.io.{ IO, Tcp }
import scala.concurrent.Promise
import scala.util.control.NoStackTrace
import akka.actor.{ ActorRefFactory, Actor, Props, ActorRef, Status }
import akka.stream.impl._
import akka.util.ByteString
import akka.io.Tcp._
import akka.stream.MaterializerSettings
import akka.stream.StreamTcpException
import org.reactivestreams.Processor
import akka.actor.Stash
import akka.stream.impl._
/**
* INTERNAL API
*/
private[akka] object TcpStreamActor {
case object WriteAck extends Tcp.Event
class TcpStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace
def outboundProps(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
@ -77,9 +76,9 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings)
case PeerClosed
closed = true
readPump.pump()
case ErrorClosed(cause) fail(new TcpStreamException(s"The connection closed with error $cause"))
case CommandFailed(cmd) fail(new TcpStreamException(s"Tcp command [$cmd] failed"))
case Aborted fail(new TcpStreamException("The connection has been aborted"))
case ErrorClosed(cause) fail(new StreamTcpException(s"The connection closed with error $cause"))
case CommandFailed(cmd) fail(new StreamTcpException(s"Tcp command [$cmd] failed"))
case Aborted fail(new StreamTcpException("The connection has been aborted"))
}
override def inputsAvailable: Boolean = pendingElement ne null
@ -237,9 +236,9 @@ private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[B
initSteps.become(Actor.emptyBehavior)
case f: CommandFailed
val ex = new TcpStreamException("Connection failed.")
val ex = new StreamTcpException("Connection failed.")
localAddressPromise.failure(ex)
processorPromise.failure(ex)
fail(ex)
}
}
}

View file

@ -1,19 +1,24 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
package akka.stream.impl.io
import java.net.InetSocketAddress
import akka.stream.io.StreamTcp.ConnectionException
import org.reactivestreams.Subscriber
import scala.concurrent.{ Future, Promise }
import akka.util.ByteString
import akka.io.Tcp._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Stash
import akka.io.{ IO, Tcp }
import akka.io.Tcp._
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.{ Flow, Pipe }
import akka.stream.impl._
import akka.actor._
import akka.stream.scaladsl.{ Flow, Pipe }
import akka.stream.scaladsl.StreamTcp
import akka.util.ByteString
import org.reactivestreams.Subscriber
import akka.stream.ConnectionException
import akka.stream.BindFailedException
/**
* INTERNAL API
@ -81,7 +86,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket
primaryOutputs.getExposedPublisher.subscribe(flowSubscriber.asInstanceOf[Subscriber[Any]])
subreceive.become(running)
case f: CommandFailed
val ex = StreamTcp.BindFailedException
val ex = BindFailedException
localAddressPromise.failure(ex)
unbindPromise.failure(ex)
flowSubscriber.onError(ex)

View file

@ -0,0 +1,168 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl
import java.lang.{ Iterable JIterable }
import scala.collection.immutable
import scala.concurrent.duration._
import java.net.InetSocketAddress
import scala.concurrent.Future
import scala.util.control.NoStackTrace
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.stream.FlowMaterializer
import akka.stream.scaladsl
import akka.util.ByteString
import akka.japi.Util.immutableSeq
import akka.io.Inet.SocketOption
object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider {
/**
* Represents a prospective TCP server binding.
*/
class ServerBinding private[akka] (delegate: scaladsl.StreamTcp.ServerBinding) {
/**
* The local address of the endpoint bound by the materialization of the `connections` [[Source]]
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress] =
delegate.localAddress(materializedMap.asScala)
/**
* The stream of accepted incoming connections.
* Can be materialized several times but only one subscription can be "live" at one time, i.e.
* subsequent materializations will reject subscriptions with an [[BindFailedException]] if the previous
* materialization still has an uncancelled subscription.
* Cancelling the subscription to a materialization of this source will cause the listening port to be unbound.
*/
def connections: Source[IncomingConnection] =
Source.adapt(delegate.connections.map(new IncomingConnection(_)))
/**
* Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
* [[Source]] whose [[MaterializedMap]] is passed as parameter.
*
* The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed.
*/
def unbind(materializedMap: MaterializedMap): Future[Unit] =
delegate.unbind(materializedMap.asScala)
}
/**
* Represents an accepted incoming TCP connection.
*/
class IncomingConnection private[akka] (delegate: scaladsl.StreamTcp.IncomingConnection) {
/**
* The local address this connection is bound to.
*/
def localAddress: InetSocketAddress = delegate.localAddress
/**
* The remote address this connection is bound to.
*/
def remoteAddress: InetSocketAddress = delegate.remoteAddress
/**
* Handles the connection using the given flow, which is materialized exactly once and the respective
* [[MaterializedMap]] returned.
*
* Convenience shortcut for: `flow.join(handler).run()`.
*/
def handleWith(handler: Flow[ByteString, ByteString], materializer: FlowMaterializer): MaterializedMap =
new MaterializedMap(delegate.handleWith(handler.asScala)(materializer))
/**
* A flow representing the client on the other side of the connection.
* This flow can be materialized only once.
*/
def flow: Flow[ByteString, ByteString] = Flow.adapt(delegate.flow)
}
/**
* Represents a prospective outgoing TCP connection.
*/
class OutgoingConnection private[akka] (delegate: scaladsl.StreamTcp.OutgoingConnection) {
/**
* The remote address this connection is or will be bound to.
*/
def remoteAddress: InetSocketAddress = delegate.remoteAddress
/**
* The local address of the endpoint bound by the materialization of the connection materialization
* whose [[MaterializedMap]] is passed as parameter.
*/
def localAddress(mMap: MaterializedMap): Future[InetSocketAddress] =
delegate.localAddress(mMap.asScala)
/**
* Handles the connection using the given flow.
* This method can be called several times, every call will materialize the given flow exactly once thereby
* triggering a new connection attempt to the `remoteAddress`.
* If the connection cannot be established the materialized stream will immediately be terminated
* with a [[akka.stream.StreamTcpException]].
*
* Convenience shortcut for: `flow.join(handler).run()`.
*/
def handleWith(handler: Flow[ByteString, ByteString], materializer: FlowMaterializer): MaterializedMap =
new MaterializedMap(delegate.handleWith(handler.asScala)(materializer))
/**
* A flow representing the server on the other side of the connection.
* This flow can be materialized several times, every materialization will open a new connection to the
* `remoteAddress`. If the connection cannot be established the materialized stream will immediately be terminated
* with a [[akka.stream.StreamTcpException]].
*/
def flow: Flow[ByteString, ByteString] = Flow.adapt(delegate.flow)
}
override def get(system: ActorSystem): StreamTcp = super.get(system)
def lookup() = StreamTcp
def createExtension(system: ExtendedActorSystem): StreamTcp = new StreamTcp(system)
}
class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension {
import StreamTcp._
private lazy val delegate: scaladsl.StreamTcp = scaladsl.StreamTcp(system)
/**
* Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
*/
def bind(endpoint: InetSocketAddress,
backlog: Int,
options: JIterable[SocketOption],
idleTimeout: Duration): ServerBinding =
new ServerBinding(delegate.bind(endpoint, backlog, immutableSeq(options), idleTimeout))
/**
* Creates a [[StreamTcp.ServerBinding]] without specifying options.
* It represents a prospective TCP server binding on the given `endpoint`.
*/
def bind(endpoint: InetSocketAddress): ServerBinding =
new ServerBinding(delegate.bind(endpoint))
/**
* Creates an [[StreamTcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
*/
def outgoingConnection(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: JIterable[SocketOption],
connectTimeout: Duration,
idleTimeout: Duration): OutgoingConnection =
new OutgoingConnection(delegate.outgoingConnection(
remoteAddress, localAddress, immutableSeq(options), connectTimeout, idleTimeout))
/**
* Creates an [[StreamTcp.OutgoingConnection]] without specifying options.
* It represents a prospective TCP client connection to the given endpoint.
*/
def outgoingConnection(remoteAddress: InetSocketAddress): OutgoingConnection =
new OutgoingConnection(delegate.outgoingConnection(remoteAddress))
}

View file

@ -1,24 +1,35 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
package akka.stream.scaladsl
import java.net.{ InetSocketAddress, URLEncoder }
import org.reactivestreams.{ Processor, Subscriber, Subscription }
import scala.util.control.NoStackTrace
import scala.util.{ Failure, Success }
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Promise, ExecutionContext, Future }
import akka.util.ByteString
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }
import scala.util.control.NoStackTrace
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Props
import akka.io.Inet.SocketOption
import akka.io.Tcp
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl._
import akka.stream.impl._
import akka.actor._
import akka.stream.scaladsl._
import akka.util.ByteString
import org.reactivestreams.{ Processor, Subscriber, Subscription }
import akka.actor.actorRef2Scala
import akka.stream.impl.io.TcpStreamActor
import akka.stream.impl.io.TcpListenStreamActor
import akka.stream.impl.io.DelayedInitProcessor
import akka.stream.impl.io.StreamTcpManager
object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider {
/**
* Represents a prospective TCP server binding.
@ -43,7 +54,7 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
* Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections`
* [[Source]] whose [[MaterializedMap]] is passed as parameter.
*
* The produced [[Future]] is fulfilled when the unbinding has been completed.
* The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed.
*/
def unbind(materializedMap: MaterializedMap): Future[Unit]
}
@ -80,7 +91,7 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
/**
* Represents a prospective outgoing TCP connection.
*/
sealed trait OutgoingConnection {
trait OutgoingConnection {
/**
* The remote address this connection is or will be bound to.
*/
@ -97,7 +108,7 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
* This method can be called several times, every call will materialize the given flow exactly once thereby
* triggering a new connection attempt to the `remoteAddress`.
* If the connection cannot be established the materialized stream will immediately be terminated
* with a [[ConnectionAttemptFailedException]].
* with a [[akka.stream.StreamTcpException]].
*
* Convenience shortcut for: `flow.join(handler).run()`.
*/
@ -107,35 +118,36 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider {
* A flow representing the server on the other side of the connection.
* This flow can be materialized several times, every materialization will open a new connection to the
* `remoteAddress`. If the connection cannot be established the materialized stream will immediately be terminated
* with a [[ConnectionAttemptFailedException]].
* with a [[akka.stream.StreamTcpException]].
*/
def flow: Flow[ByteString, ByteString]
}
case object BindFailedException extends RuntimeException with NoStackTrace
/**
* INTERNAL API
*/
private[akka] class PreMaterializedOutgoingKey extends Key[Future[InetSocketAddress]] {
override def materialize(map: MaterializedMap) =
throw new IllegalStateException("This key has already been materialized by the TCP Processor")
}
class ConnectionException(message: String) extends RuntimeException(message)
def apply()(implicit system: ActorSystem): StreamTcp = super.apply(system)
class ConnectionAttemptFailedException(val endpoint: InetSocketAddress) extends ConnectionException(s"Connection attempt to $endpoint failed")
//////////////////// EXTENSION SETUP ///////////////////
def apply()(implicit system: ActorSystem): StreamTcpExt = super.apply(system)
override def get(system: ActorSystem): StreamTcp = super.get(system)
def lookup() = StreamTcp
def createExtension(system: ExtendedActorSystem): StreamTcpExt = new StreamTcpExt(system)
def createExtension(system: ExtendedActorSystem): StreamTcp = new StreamTcp(system)
}
class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension {
import StreamTcpExt._
class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension {
import StreamTcp._
import system.dispatcher
private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager], name = "IO-TCP-STREAM")
/**
* Creates a [[ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
* Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
*/
def bind(endpoint: InetSocketAddress,
backlog: Int = 100,
@ -160,7 +172,7 @@ class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension {
}
/**
* Creates an [[OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
* Creates an [[StreamTcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
*/
def outgoingConnection(remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
@ -186,110 +198,3 @@ class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension {
}
}
/**
* INTERNAL API
*/
private[akka] object StreamTcpExt {
/**
* INTERNAL API
*/
class PreMaterializedOutgoingKey extends Key[Future[InetSocketAddress]] {
override def materialize(map: MaterializedMap) =
throw new IllegalStateException("This key has already been materialized by the TCP Processor")
}
}
/**
* INTERNAL API
*/
private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[I, O]])(implicit ec: ExecutionContext) extends Processor[I, O] {
@volatile private var impl: Processor[I, O] = _
private val setVarFuture = implFuture.andThen { case Success(p) impl = p }
override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete {
case Success(x) x.onSubscribe(s)
case Failure(_) s.cancel()
}
override def onError(t: Throwable): Unit = {
if (impl eq null) setVarFuture.onSuccess { case p p.onError(t) }
else impl.onError(t)
}
override def onComplete(): Unit = {
if (impl eq null) setVarFuture.onSuccess { case p p.onComplete() }
else impl.onComplete()
}
override def onNext(t: I): Unit = impl.onNext(t)
override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete {
case Success(x) x.subscribe(s)
case Failure(e) s.onError(e)
}
}
/**
* INTERNAL API
*/
private[io] object StreamTcpManager {
/**
* INTERNAL API
*/
private[io] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]],
localAddressPromise: Promise[InetSocketAddress],
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress],
options: immutable.Traversable[SocketOption],
connectTimeout: Duration,
idleTimeout: Duration)
/**
* INTERNAL API
*/
private[io] case class Bind(localAddressPromise: Promise[InetSocketAddress],
unbindPromise: Promise[() Future[Unit]],
flowSubscriber: Subscriber[StreamTcp.IncomingConnection],
endpoint: InetSocketAddress,
backlog: Int,
options: immutable.Traversable[SocketOption],
idleTimeout: Duration)
/**
* INTERNAL API
*/
private[io] case class ExposedProcessor(processor: Processor[ByteString, ByteString])
}
/**
* INTERNAL API
*/
private[akka] class StreamTcpManager extends Actor {
import akka.stream.io.StreamTcpManager._
var nameCounter = 0
def encName(prefix: String, endpoint: InetSocketAddress) = {
nameCounter += 1
s"$prefix-$nameCounter-${URLEncoder.encode(endpoint.toString, "utf-8")}"
}
def receive: Receive = {
case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, connectTimeout, _)
val connTimeout = connectTimeout match {
case x: FiniteDuration Some(x)
case _ None
}
val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise,
Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true),
materializerSettings = MaterializerSettings(context.system)), name = encName("client", remoteAddress))
processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor))
case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, _)
val publisherActor = context.actorOf(TcpListenStreamActor.props(localAddressPromise, unbindPromise,
flowSubscriber, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true),
MaterializerSettings(context.system)), name = encName("server", endpoint))
// this sends the ExposedPublisher message to the publisher actor automatically
ActorPublisher[Any](publisherActor)
}
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.io
package akka.stream.ssl
import java.nio.ByteBuffer
import java.security.Principal