From 1595a8a9113f05eb45fbc321a2a15b026810acec Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 24 Apr 2015 13:15:02 +0200 Subject: [PATCH] !str #17031 Rename StreamTcp to Tcp --- .../docs/stream/io/StreamTcpDocSpec.scala | 12 ++--- .../engine/client/ConnectionPoolSpec.scala | 2 +- .../{StreamTcpTest.java => TcpTest.java} | 16 +++---- .../io/{StreamTcpSpec.scala => TcpSpec.scala} | 48 +++++++++---------- .../test/scala/akka/stream/io/TlsSpec.scala | 10 ++-- .../stream/impl/io/StreamTcpManager.scala | 2 +- .../stream/impl/io/TcpListenStreamActor.scala | 2 +- .../javadsl/{StreamTcp.scala => Tcp.scala} | 28 +++++------ .../scaladsl/{StreamTcp.scala => Tcp.scala} | 24 +++++----- 9 files changed, 72 insertions(+), 72 deletions(-) rename akka-stream-tests/src/test/java/akka/stream/javadsl/{StreamTcpTest.java => TcpTest.java} (89%) rename akka-stream-tests/src/test/scala/akka/stream/io/{StreamTcpSpec.scala => TcpSpec.scala} (85%) rename akka-stream/src/main/scala/akka/stream/javadsl/{StreamTcp.scala => Tcp.scala} (79%) rename akka-stream/src/main/scala/akka/stream/scaladsl/{StreamTcp.scala => Tcp.scala} (87%) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala index 12b0a85c1c..c7de46f93c 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/io/StreamTcpDocSpec.scala @@ -7,7 +7,7 @@ import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference import akka.stream._ -import akka.stream.scaladsl.StreamTcp._ +import akka.stream.scaladsl.Tcp._ import akka.stream.scaladsl._ import akka.stream.stage.Context import akka.stream.stage.PushStage @@ -32,13 +32,13 @@ class StreamTcpDocSpec extends AkkaSpec { { //#echo-server-simple-bind val connections: Source[IncomingConnection, Future[ServerBinding]] = - StreamTcp().bind("127.0.0.1", 8888) + Tcp().bind("127.0.0.1", 8888) //#echo-server-simple-bind } { val localhost = TestUtils.temporaryServerAddress() val connections: Source[IncomingConnection, Future[ServerBinding]] = - StreamTcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 + Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 //#echo-server-simple-handle connections runForeach { connection => @@ -57,7 +57,7 @@ class StreamTcpDocSpec extends AkkaSpec { "initial server banner echo server" in { val localhost = TestUtils.temporaryServerAddress() - val connections = StreamTcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 + val connections = Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7 val serverProbe = TestProbe() //#welcome-banner-chat-server @@ -113,12 +113,12 @@ class StreamTcpDocSpec extends AkkaSpec { { //#repl-client - val connection = StreamTcp().outgoingConnection("127.0.0.1", 8888) + val connection = Tcp().outgoingConnection("127.0.0.1", 8888) //#repl-client } { - val connection = StreamTcp().outgoingConnection(localhost) + val connection = Tcp().outgoingConnection(localhost) //#repl-client val replParser = new PushStage[String, ByteString] { diff --git a/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala index 0d2c7ee986..d54402e4dc 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/client/ConnectionPoolSpec.scala @@ -260,7 +260,7 @@ class ConnectionPoolSpec extends AkkaSpec("akka.loggers = []\n akka.loglevel = O } val sink = if (autoAccept) Sink.foreach[Http.IncomingConnection](handleConnection) else Sink(incomingConnections) // TODO getHostString in Java7 - StreamTcp().bind(serverEndpoint.getHostName, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout) + Tcp().bind(serverEndpoint.getHostName, serverEndpoint.getPort, idleTimeout = serverSettings.timeouts.idleTimeout) .map { c ⇒ val layer = Http().serverLayer(serverSettings, log) Http.IncomingConnection(c.localAddress, c.remoteAddress, layer atop rawBytesInjection join c.flow) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java similarity index 89% rename from akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java rename to akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java index 3aa69278f5..5ec4a5b6c9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/TcpTest.java @@ -17,19 +17,19 @@ import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; import akka.stream.*; -import akka.stream.javadsl.StreamTcp.*; +import akka.stream.javadsl.Tcp.*; import akka.japi.function.*; import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.TestUtils; import akka.util.ByteString; -public class StreamTcpTest extends StreamTest { - public StreamTcpTest() { +public class TcpTest extends StreamTest { + public TcpTest() { super(actorSystemResource); } @ClassRule - public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("StreamTcpTest", + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("TcpTest", AkkaSpec.testConf()); final Sink> echoHandler = @@ -49,7 +49,7 @@ public class StreamTcpTest extends StreamTest { @Test public void mustWorkInHappyCase() throws Exception { final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); - final Source> binding = StreamTcp.get(system) + final Source> binding = Tcp.get(system) .bind(serverAddress.getHostName(), serverAddress.getPort()); // TODO getHostString in Java7 final Future future = binding.to(echoHandler).run(materializer); @@ -59,7 +59,7 @@ public class StreamTcpTest extends StreamTest { final Future resultFuture = Source .from(testInput) // TODO getHostString in Java7 - .via(StreamTcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort())) + .via(Tcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort())) .runFold(ByteString.empty(), new Function2() { public ByteString apply(ByteString acc, ByteString elem) { @@ -76,7 +76,7 @@ public class StreamTcpTest extends StreamTest { @Test public void mustReportServerBindFailure() throws Exception { final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); - final Source> binding = StreamTcp.get(system) + final Source> binding = Tcp.get(system) .bind(serverAddress.getHostName(), serverAddress.getPort()); // TODO getHostString in Java7 final Future future = binding.to(echoHandler).run(materializer); @@ -99,7 +99,7 @@ public class StreamTcpTest extends StreamTest { Await.result( Source.from(testInput) // TODO getHostString in Java7 - .via(StreamTcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort()), + .via(Tcp.get(system).outgoingConnection(serverAddress.getHostName(), serverAddress.getPort()), Keep.> right()) .to(Sink. ignore()) .run(materializer), diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala similarity index 85% rename from akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 7852e0b2f4..4011f5f73c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -3,7 +3,7 @@ */ package akka.stream.io -import akka.stream.scaladsl.StreamTcp.OutgoingConnection +import akka.stream.scaladsl.Tcp.OutgoingConnection import scala.concurrent.{ Future, Await } import akka.io.Tcp._ @@ -19,7 +19,7 @@ import akka.stream.testkit.Utils._ import akka.stream.scaladsl._ import akka.stream.testkit.TestUtils.temporaryServerAddress -class StreamTcpSpec extends AkkaSpec with TcpHelper { +class TcpSpec extends AkkaSpec with TcpHelper { import akka.stream.io.TcpHelper._ var demand = 0L @@ -32,7 +32,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpReadProbe = new TcpReadProbe() val tcpWriteProbe = new TcpWriteProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() validateServerClientCommunication(testData, serverConnection, tcpReadProbe, tcpWriteProbe) @@ -48,7 +48,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) - Source(testInput).via(StreamTcp().outgoingConnection(server.address)).to(Sink.ignore).run() + Source(testInput).via(Tcp().outgoingConnection(server.address)).to(Sink.ignore).run() val serverConnection = server.waitAccept() serverConnection.read(256) @@ -63,7 +63,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val idle = new TcpWriteProbe() // Just register an idle upstream val resultFuture = Source(idle.publisherProbe) - .via(StreamTcp().outgoingConnection(server.address)) + .via(Tcp().outgoingConnection(server.address)) .runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) val serverConnection = server.waitAccept() @@ -82,7 +82,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // Client can still write @@ -112,7 +112,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // Server can still write @@ -140,7 +140,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // Server can still write @@ -172,7 +172,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // Client can still write @@ -205,7 +205,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // Server can still write @@ -235,7 +235,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // Server can still write @@ -262,7 +262,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() // Server can still write @@ -291,7 +291,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe = new TcpWriteProbe() val tcpReadProbe = new TcpReadProbe() - Source(tcpWriteProbe.publisherProbe).via(StreamTcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() + Source(tcpWriteProbe.publisherProbe).via(Tcp().outgoingConnection(server.address)).to(Sink(tcpReadProbe.subscriberProbe)).run() val serverConnection = server.waitAccept() serverConnection.abort() @@ -310,7 +310,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val tcpWriteProbe1 = new TcpWriteProbe() val tcpReadProbe2 = new TcpReadProbe() val tcpWriteProbe2 = new TcpWriteProbe() - val outgoingConnection = StreamTcp().outgoingConnection(server.address) + val outgoingConnection = Tcp().outgoingConnection(server.address) val conn1F = Source(tcpWriteProbe1.publisherProbe) @@ -346,12 +346,12 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { "TCP listen stream" must { // Reusing handler - val echoHandler = Sink.foreach[StreamTcp.IncomingConnection] { _.flow.join(Flow[ByteString]).run() } + val echoHandler = Sink.foreach[Tcp.IncomingConnection] { _.flow.join(Flow[ByteString]).run() } "be able to implement echo" in { val serverAddress = temporaryServerAddress() val (bindingFuture, echoServerFinish) = - StreamTcp() + Tcp() .bind(serverAddress.getHostName, serverAddress.getPort) // TODO getHostString in Java7 .toMat(echoHandler)(Keep.both) .run() @@ -362,7 +362,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val resultFuture = - Source(testInput).via(StreamTcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) + Source(testInput).via(Tcp().outgoingConnection(serverAddress)).runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) Await.result(resultFuture, 3.seconds) should be(expectedOutput) Await.result(binding.unbind(), 3.seconds) @@ -372,7 +372,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { "work with a chain of echoes" in { val serverAddress = temporaryServerAddress() val (bindingFuture, echoServerFinish) = - StreamTcp() + Tcp() .bind(serverAddress.getHostName, serverAddress.getPort) // TODO getHostString in Java7 .toMat(echoHandler)(Keep.both) .run() @@ -380,7 +380,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { // make sure that the server has bound to the socket val binding = Await.result(bindingFuture, 100.millis) - val echoConnection = StreamTcp().outgoingConnection(serverAddress) + val echoConnection = Tcp().outgoingConnection(serverAddress) val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) @@ -400,18 +400,18 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { "bind and unbind correctly" in { val address = temporaryServerAddress() - val probe1 = TestSubscriber.manualProbe[StreamTcp.IncomingConnection]() - val bind = StreamTcp(system).bind(address.getHostName, address.getPort) // TODO getHostString in Java7 + val probe1 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() + val bind = Tcp(system).bind(address.getHostName, address.getPort) // TODO getHostString in Java7 // Bind succeeded, we have a local address val binding1 = Await.result(bind.to(Sink(probe1)).run(), 3.second) probe1.expectSubscription() - val probe2 = TestSubscriber.manualProbe[StreamTcp.IncomingConnection]() + val probe2 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val binding2F = bind.to(Sink(probe2)).run() probe2.expectSubscriptionAndError(BindFailedException) - val probe3 = TestSubscriber.manualProbe[StreamTcp.IncomingConnection]() + val probe3 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() val binding3F = bind.to(Sink(probe3)).run() probe3.expectSubscriptionAndError() @@ -422,7 +422,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { Await.result(binding1.unbind(), 1.second) probe1.expectComplete() - val probe4 = TestSubscriber.manualProbe[StreamTcp.IncomingConnection]() + val probe4 = TestSubscriber.manualProbe[Tcp.IncomingConnection]() // Bind succeeded, we have a local address val binding4 = Await.result(bind.to(Sink(probe4)).run(), 3.second) probe4.expectSubscription() diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 4f8f3b57d8..aaca97a2c3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -154,7 +154,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off } def server(flow: Flow[ByteString, ByteString, Any]) = { - val server = StreamTcp() + val server = Tcp() .bind("localhost", 0) .to(Sink.foreach(c ⇒ c.flow.join(flow).run())) .run() @@ -162,21 +162,21 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off } object ClientInitiatesViaTcp extends CommunicationSetup { - var binding: StreamTcp.ServerBinding = null + var binding: Tcp.ServerBinding = null def decorateFlow(leftClosing: Closing, rightClosing: Closing, rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { binding = server(serverTls(rightClosing).reversed join rhs) - clientTls(leftClosing) join StreamTcp().outgoingConnection(binding.localAddress) + clientTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress) } override def cleanup(): Unit = binding.unbind() } object ServerInitiatesViaTcp extends CommunicationSetup { - var binding: StreamTcp.ServerBinding = null + var binding: Tcp.ServerBinding = null def decorateFlow(leftClosing: Closing, rightClosing: Closing, rhs: Flow[SslTlsInbound, SslTlsOutbound, Any]) = { binding = server(clientTls(rightClosing).reversed join rhs) - serverTls(leftClosing) join StreamTcp().outgoingConnection(binding.localAddress) + serverTls(leftClosing) join Tcp().outgoingConnection(binding.localAddress) } override def cleanup(): Unit = binding.unbind() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala index e60ae0eef1..8778a3c763 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala @@ -16,7 +16,7 @@ import akka.io.Tcp import akka.stream.ActorFlowMaterializerSettings import akka.stream.impl.ActorProcessor import akka.stream.impl.ActorPublisher -import akka.stream.scaladsl.StreamTcp +import akka.stream.scaladsl.{ Tcp ⇒ StreamTcp } import akka.util.ByteString import org.reactivestreams.Processor import org.reactivestreams.Subscriber diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 69d602763a..6ad135f912 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -13,7 +13,7 @@ import akka.io.Tcp._ import akka.stream.{ FlowMaterializer, ActorFlowMaterializerSettings } import akka.stream.impl._ import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.StreamTcp +import akka.stream.scaladsl.{ Tcp ⇒ StreamTcp } import akka.util.ByteString import org.reactivestreams.Subscriber import akka.stream.ConnectionException diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala similarity index 79% rename from akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala rename to akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index 385134bf01..b4df1b3d3e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -19,12 +19,12 @@ import akka.util.ByteString import akka.japi.Util.immutableSeq import akka.io.Inet.SocketOption -object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { +object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** * Represents a prospective TCP server binding. */ - class ServerBinding private[akka] (delegate: scaladsl.StreamTcp.ServerBinding) { + class ServerBinding private[akka] (delegate: scaladsl.Tcp.ServerBinding) { /** * The local address of the endpoint bound by the materialization of the `connections` [[Source]]. */ @@ -42,7 +42,7 @@ object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { /** * Represents an accepted incoming TCP connection. */ - class IncomingConnection private[akka] (delegate: scaladsl.StreamTcp.IncomingConnection) { + class IncomingConnection private[akka] (delegate: scaladsl.Tcp.IncomingConnection) { /** * The local address this connection is bound to. */ @@ -72,7 +72,7 @@ object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { /** * Represents a prospective outgoing TCP connection. */ - class OutgoingConnection private[akka] (delegate: scaladsl.StreamTcp.OutgoingConnection) { + class OutgoingConnection private[akka] (delegate: scaladsl.Tcp.OutgoingConnection) { /** * The remote address this connection is or will be bound to. */ @@ -84,21 +84,21 @@ object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { def localAddress: InetSocketAddress = delegate.localAddress } - override def get(system: ActorSystem): StreamTcp = super.get(system) + override def get(system: ActorSystem): Tcp = super.get(system) - def lookup() = StreamTcp + def lookup() = Tcp - def createExtension(system: ExtendedActorSystem): StreamTcp = new StreamTcp(system) + def createExtension(system: ExtendedActorSystem): Tcp = new Tcp(system) } -class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { - import StreamTcp._ +class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { + import Tcp._ import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ ec } - private lazy val delegate: scaladsl.StreamTcp = scaladsl.StreamTcp(system) + private lazy val delegate: scaladsl.Tcp = scaladsl.Tcp(system) /** - * Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. */ def bind(interface: String, port: Int, @@ -110,7 +110,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { .mapMaterialized(_.map(new ServerBinding(_))(ec))) /** - * Creates a [[StreamTcp.ServerBinding]] without specifying options. + * Creates a [[Tcp.ServerBinding]] without specifying options. * It represents a prospective TCP server binding on the given `endpoint`. */ def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]] = @@ -119,7 +119,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { .mapMaterialized(_.map(new ServerBinding(_))(ec))) /** - * Creates an [[StreamTcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. */ def outgoingConnection(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress], @@ -130,7 +130,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { .mapMaterialized(_.map(new OutgoingConnection(_))(ec))) /** - * Creates an [[StreamTcp.OutgoingConnection]] without specifying options. + * Creates an [[Tcp.OutgoingConnection]] without specifying options. * It represents a prospective TCP client connection to the given endpoint. */ def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala similarity index 87% rename from akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala index e759eb1004..a7bae8a5d5 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Tcp.scala @@ -18,7 +18,7 @@ import akka.actor.ExtensionId import akka.actor.ExtensionIdProvider import akka.actor.Props import akka.io.Inet.SocketOption -import akka.io.Tcp +import akka.io.{ Tcp ⇒ IoTcp } import akka.stream._ import akka.stream.impl._ import akka.stream.impl.ReactiveStreamsCompliance._ @@ -31,7 +31,7 @@ import akka.stream.impl.io.TcpListenStreamActor import akka.stream.impl.io.DelayedInitProcessor import akka.stream.impl.io.StreamTcpManager -object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { +object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { /** * * Represents a succdessful TCP server binding. @@ -64,20 +64,20 @@ object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { */ case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) - def apply()(implicit system: ActorSystem): StreamTcp = super.apply(system) + def apply()(implicit system: ActorSystem): Tcp = super.apply(system) - override def get(system: ActorSystem): StreamTcp = super.get(system) + override def get(system: ActorSystem): Tcp = super.get(system) - def lookup() = StreamTcp + def lookup() = Tcp - def createExtension(system: ExtendedActorSystem): StreamTcp = new StreamTcp(system) + def createExtension(system: ExtendedActorSystem): Tcp = new Tcp(system) } -class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { - import StreamTcp._ +class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { + import Tcp._ private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager] - .withDispatcher(Tcp(system).Settings.ManagementDispatcher), name = "IO-TCP-STREAM") + .withDispatcher(IoTcp(system).Settings.ManagementDispatcher), name = "IO-TCP-STREAM") private class BindSource( val endpoint: InetSocketAddress, @@ -122,7 +122,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { } /** - * Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. */ def bind(interface: String, port: Int, @@ -146,7 +146,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { } /** - * Creates an [[StreamTcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. */ def outgoingConnection(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, @@ -169,7 +169,7 @@ class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { } /** - * Creates an [[StreamTcp.OutgoingConnection]] without specifying options. + * Creates an [[Tcp.OutgoingConnection]] without specifying options. * It represents a prospective TCP client connection to the given endpoint. */ def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]] =