From 07f299a1e07bfd9d1a340d4ef498acaaa1e4d153 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 24 Apr 2015 12:31:23 +0200 Subject: [PATCH] !str #17031 Use host and port instead of InetSocketAddress * as convenience in bind and outgoingConnection --- .../docs/stream/io/StreamTcpDocSpec.scala | 49 +++++++++++-------- .../src/main/scala/akka/http/Http.scala | 9 ++-- .../engine/client/ConnectionPoolSpec.scala | 3 +- .../akka/stream/javadsl/StreamTcpTest.java | 13 +++-- .../scala/akka/stream/io/StreamTcpSpec.scala | 6 +-- .../test/scala/akka/stream/io/TlsSpec.scala | 2 +- .../scala/akka/stream/javadsl/StreamTcp.scala | 13 ++--- .../akka/stream/scaladsl/StreamTcp.scala | 18 +++++-- 8 files changed, 68 insertions(+), 45 deletions(-) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index a035af288c..12b0a85c1c 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -31,15 +31,14 @@ class StreamTcpDocSpec extends AkkaSpec { "simple server connection" in { { //#echo-server-simple-bind - val localhost = new InetSocketAddress("127.0.0.1", 8888) + val connections: Source[IncomingConnection, Future[ServerBinding]] = + StreamTcp().bind("127.0.0.1", 8888) //#echo-server-simple-bind } { val localhost = TestUtils.temporaryServerAddress() - //#echo-server-simple-bind val connections: Source[IncomingConnection, Future[ServerBinding]] = - StreamTcp().bind(localhost) - //#echo-server-simple-bind + StreamTcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 //#echo-server-simple-handle connections runForeach { connection => @@ -58,7 +57,7 @@ class StreamTcpDocSpec extends AkkaSpec { "initial server banner echo server" in { val localhost = TestUtils.temporaryServerAddress() - val connections = StreamTcp().bind(localhost) + val connections = StreamTcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 val serverProbe = TestProbe() //#welcome-banner-chat-server @@ -112,25 +111,33 @@ class StreamTcpDocSpec extends AkkaSpec { } } - //#repl-client - val connection = StreamTcp().outgoingConnection(localhost) - - val replParser = new PushStage[String, ByteString] { - override def onPush(elem: String, ctx: Context[ByteString]): SyncDirective = { - elem match { - case "q" ⇒ ctx.pushAndFinish(ByteString("BYE\n")) - case _ ⇒ ctx.push(ByteString(s"$elem\n")) - } - } + { + //#repl-client + val connection = StreamTcp().outgoingConnection("127.0.0.1", 8888) + //#repl-client } - val repl = Flow[ByteString] - .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256)) - .map(text => println("Server: " + text)) - .map(_ => readLine("> ")) - .transform(() ⇒ replParser) + { + val connection = StreamTcp().outgoingConnection(localhost) + //#repl-client - connection.join(repl).run() + val replParser = new PushStage[String, ByteString] { + override def onPush(elem: String, ctx: Context[ByteString]): SyncDirective = { + elem match { + case "q" ⇒ ctx.pushAndFinish(ByteString("BYE\n")) + case _ ⇒ ctx.push(ByteString(s"$elem\n")) + } + } + } + + val repl = Flow[ByteString] + .transform(() => RecipeParseLines.parseLines("\n", maximumLineBytes = 256)) + .map(text => println("Server: " + text)) + .map(_ => readLine("> ")) + .transform(() ⇒ replParser) + + connection.join(repl).run() + } //#repl-client serverProbe.expectMsg("Hello world") diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index 76382318af..684f971ef7 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -39,11 +39,10 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ServerSettings = ServerSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Source[IncomingConnection, Future[ServerBinding]] = { - val endpoint = new InetSocketAddress(interface, port) - val connections: Source[StreamTcp.IncomingConnection, Future[StreamTcp.ServerBinding]] = - StreamTcp().bind(endpoint, backlog, options, settings.timeouts.idleTimeout) + val connections: Source[Tcp.IncomingConnection, Future[Tcp.ServerBinding]] = + Tcp().bind(interface, port, backlog, options, settings.timeouts.idleTimeout) connections.map { - case StreamTcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ + case Tcp.IncomingConnection(localAddress, remoteAddress, flow) ⇒ val layer = serverLayer(settings, log) IncomingConnection(localAddress, remoteAddress, layer join flow) }.mapMaterialized { @@ -145,7 +144,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E val remoteAddr = new InetSocketAddress(host, port) val layer = clientLayer(remoteAddr, settings, log) - val transportFlow = StreamTcp().outgoingConnection(remoteAddr, localAddress, + val transportFlow = Tcp().outgoingConnection(remoteAddr, localAddress, options, settings.connectingTimeout, settings.idleTimeout) layer.joinMat(transportFlow) { (_, tcpConnFuture) ⇒ diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala index 101e802a9b..0d2c7ee986 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala @@ -259,7 +259,8 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O BidiShape(top.inlet, top.outlet, bottom.inlet, bottom.outlet) } val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink(incomingConnections) - StreamTcp().bind(serverEndpoint, idleTimeout = serverSettings.timeouts.idleTimeout) + // TODO getHostString in Java7 + StreamTcp().bind(serverEndpoint.getHostName, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout) .map { c ⇒ val layer = Http().serverLayer(serverSettings, log) Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java index 19b9851ad3..3aa69278f5 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java @@ -49,7 +49,8 @@ public class StreamTcpTest extends StreamTest { @Test public void mustWorkInHappyCase() throws Exception { final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); - final Source> binding = StreamTcp.get(system).bind(serverAddress); + final Source> binding = StreamTcp.get(system) + .bind(serverAddress.getHostName(), serverAddress.getPort()); // TODO getHostString in Java7 final Future future = binding.to(echoHandler).run(materializer); final ServerBinding b = Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)); @@ -57,7 +58,8 @@ public class StreamTcpTest extends StreamTest { final Future resultFuture = Source .from(testInput) - .via(StreamTcp.get(system).outgoingConnection(serverAddress)) + // TODO getHostString in Java7 + .via(StreamTcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort())) .runFold(ByteString.empty(), new Function2() { public ByteString apply(ByteString acc, ByteString elem) { @@ -74,7 +76,8 @@ public class StreamTcpTest extends StreamTest { @Test public void mustReportServerBindFailure() throws Exception { final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); - final Source> binding = StreamTcp.get(system).bind(serverAddress); + final Source> binding = StreamTcp.get(system) + .bind(serverAddress.getHostName(), serverAddress.getPort()); // TODO getHostString in Java7 final Future future = binding.to(echoHandler).run(materializer); final ServerBinding b = Await.result(future, FiniteDuration.create(5, TimeUnit.SECONDS)); @@ -95,7 +98,9 @@ public class StreamTcpTest extends StreamTest { try { Await.result( Source.from(testInput) - .via(StreamTcp.get(system).outgoingConnection(serverAddress), Keep.> right()) + // TODO getHostString in Java7 + .via(StreamTcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort()), + Keep.> right()) .to(Sink. ignore()) .run(materializer), FiniteDuration.create(5, TimeUnit.SECONDS)); diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala index 0bf5aa53be..7852e0b2f4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala @@ -352,7 +352,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val serverAddress = temporaryServerAddress() val (bindingFuture, echoServerFinish) = StreamTcp() - .bind(serverAddress) + .bind(serverAddress.getHostName, serverAddress.getPort) // TODO getHostString in Java7 .toMat(echoHandler)(Keep.both) .run() @@ -373,7 +373,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val serverAddress = temporaryServerAddress() val (bindingFuture, echoServerFinish) = StreamTcp() - .bind(serverAddress) + .bind(serverAddress.getHostName, serverAddress.getPort) // TODO getHostString in Java7 .toMat(echoHandler)(Keep.both) .run() @@ -401,7 +401,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { "bind and unbind correctly" in { val address = temporaryServerAddress() val probe1 = TestSubscriber.manualProbe[StreamTcp.IncomingConnection]() - val bind = StreamTcp(system).bind(address) + val bind = StreamTcp(system).bind(address.getHostName, address.getPort) // TODO getHostString in Java7 // Bind succeeded, we have a local address val binding1 = Await.result(bind.to(Sink(probe1)).run(), 3.second) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 106770b274..4f8f3b57d8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -155,7 +155,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off def server(flow: Flow[ByteString, ByteString, Any]) = { val server = StreamTcp() - .bind(new InetSocketAddress("localhost", 0)) + .bind("localhost", 0) .to(Sink.foreach(c ⇒ c.flow.join(flow).run())) .run() Await.result(server, 2.seconds) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala index d5f9f89541..385134bf01 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala @@ -100,11 +100,12 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. */ - def bind(endpoint: InetSocketAddress, + def bind(interface: String, + port: Int, backlog: Int, options: JIterable[SocketOption], idleTimeout: Duration): Source[IncomingConnection, Future[ServerBinding]] = - Source.adapt(delegate.bind(endpoint, backlog, immutableSeq(options), idleTimeout) + Source.adapt(delegate.bind(interface, port, backlog, immutableSeq(options), idleTimeout) .map(new IncomingConnection(_)) .mapMaterialized(_.map(new ServerBinding(_))(ec))) @@ -112,8 +113,8 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { * Creates a [[StreamTcp.ServerBinding]] without specifying options. * It represents a prospective TCP server binding on the given `endpoint`. */ - def bind(endpoint: InetSocketAddress): Source[IncomingConnection, Future[ServerBinding]] = - Source.adapt(delegate.bind(endpoint) + def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]] = + Source.adapt(delegate.bind(interface, port) .map(new IncomingConnection(_)) .mapMaterialized(_.map(new ServerBinding(_))(ec))) @@ -132,8 +133,8 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { * Creates an [[StreamTcp.OutgoingConnection]] without specifying options. * It represents a prospective TCP client connection to the given endpoint. */ - def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]] = - Flow.adapt(delegate.outgoingConnection(remoteAddress) + def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] = + Flow.adapt(delegate.outgoingConnection(new InetSocketAddress(host, port)) .mapMaterialized(_.map(new OutgoingConnection(_))(ec))) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala index 051a81050a..e759eb1004 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala @@ -124,20 +124,23 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { /** * Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. */ - def bind(endpoint: InetSocketAddress, + def bind(interface: String, + port: Int, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf): Source[IncomingConnection, Future[ServerBinding]] = { - new Source(new BindSource(endpoint, backlog, options, idleTimeout, OperationAttributes.none, SourceShape(new Outlet("BindSource.out")))) + new Source(new BindSource(new InetSocketAddress(interface, port), backlog, options, idleTimeout, + OperationAttributes.none, SourceShape(new Outlet("BindSource.out")))) } def bindAndHandle( handler: Flow[ByteString, ByteString, _], - endpoint: InetSocketAddress, + interface: String, + port: Int, backlog: Int = 100, options: immutable.Traversable[SocketOption] = Nil, idleTimeout: Duration = Duration.Inf)(implicit m: FlowMaterializer): Future[ServerBinding] = { - bind(endpoint, backlog, options, idleTimeout).to(Sink.foreach { conn: IncomingConnection ⇒ + bind(interface, port, backlog, options, idleTimeout).to(Sink.foreach { conn: IncomingConnection ⇒ conn.flow.join(handler).run() }).run() } @@ -164,5 +167,12 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { }) } + + /** + * Creates an [[StreamTcp.OutgoingConnection]] without specifying options. + * It represents a prospective TCP client connection to the given endpoint. + */ + def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] = + outgoingConnection(new InetSocketAddress(host, port)) }