diff --git a/akka-http-core/src/main/scala/akka/http/Http.scala b/akka-http-core/src/main/scala/akka/http/Http.scala index e12d6d79f8..a108634900 100644 --- a/akka-http-core/src/main/scala/akka/http/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/Http.scala @@ -12,7 +12,6 @@ import akka.event.LoggingAdapter import akka.util.ByteString import akka.io.Inet import akka.stream.FlowMaterializer -import akka.stream.io.StreamTcp import akka.stream.scaladsl._ import akka.http.engine.client.{ HttpClient, ClientConnectionSettings } import akka.http.engine.server.{ HttpServer, ServerSettings } @@ -201,7 +200,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * A flow representing the HTTP server on a single HTTP connection. * This flow can be materialized several times, every materialization will open a new connection to the `remoteAddress`. * If the connection cannot be established the materialized stream will immediately be terminated - * with a [[StreamTcp.ConnectionAttemptFailedException]]. + * with a [[akka.stream.StreamTcpException]]. */ def flow: Flow[HttpRequest, HttpResponse] } diff --git a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala index 1e971ef694..6b4fceea45 100644 --- a/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/ClientServerSpec.scala @@ -12,7 +12,8 @@ import scala.concurrent.Await import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec } import akka.actor.ActorSystem -import akka.stream.io.StreamTcp +import akka.stream.scaladsl.StreamTcp +import akka.stream.BindFailedException import akka.stream.FlowMaterializer import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.{ PublisherProbe, SubscriberProbe } @@ -55,7 +56,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll { val probe2 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() binding.connections.runWith(Sink(probe2)) - probe2.expectError(StreamTcp.BindFailedException) + probe2.expectError(BindFailedException) Await.result(binding.unbind(mm1), 1.second) val probe3 = StreamTestKit.SubscriberProbe[Http.IncomingConnection]() diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala index 1db73d399f..fb66ea5e4c 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala @@ -8,7 +8,7 @@ import akka.actor.ActorSystem import akka.dispatch.MailboxType import akka.actor.ActorRef import akka.actor.ActorRefWithCell -import akka.stream.io.StreamTcpManager +import akka.stream.impl.io.StreamTcpManager import akka.actor.Actor /** diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala new file mode 100644 index 0000000000..878701ff6e --- /dev/null +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestUtils.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import scala.collection.immutable +import java.nio.channels.DatagramChannel +import java.nio.channels.ServerSocketChannel +import java.net.InetSocketAddress +import java.net.SocketAddress + +object TestUtils { // FIXME: remove once going back to project dependencies + // Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object + type GeneralSocket = { + def bind(sa: SocketAddress): Unit + def close(): Unit + def getLocalPort(): Int + } + + def temporaryServerAddress(address: String = "127.0.0.1", udp: Boolean = false): InetSocketAddress = + temporaryServerAddresses(1, address, udp).head + + def temporaryServerAddresses(numberOfAddresses: Int, hostname: String = "127.0.0.1", udp: Boolean = false): immutable.IndexedSeq[InetSocketAddress] = { + Vector.fill(numberOfAddresses) { + val serverSocket: GeneralSocket = + if (udp) DatagramChannel.open().socket() + else ServerSocketChannel.open().socket() + + serverSocket.bind(new InetSocketAddress(hostname, 0)) + (serverSocket, new InetSocketAddress(hostname, serverSocket.getLocalPort)) + } collect { case (socket, address) ⇒ socket.close(); address } + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index fbbfa9f9b3..d7bafe8edb 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ package akka.stream.javadsl; import akka.actor.ActorRef; diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 0b99164127..1b1e61d5a3 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ package akka.stream.javadsl; import akka.actor.ActorRef; diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 16149a8618..b6df362cdc 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ package akka.stream.javadsl; import akka.actor.ActorRef; diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java new file mode 100644 index 0000000000..f2932186b5 --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.ClassRule; +import org.junit.Test; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import akka.stream.BindFailedException; +import akka.stream.StreamTcpException; +import akka.stream.StreamTest; +import akka.stream.javadsl.StreamTcp.IncomingConnection; +import akka.stream.javadsl.StreamTcp.ServerBinding; +import akka.stream.javadsl.japi.Function2; +import akka.stream.javadsl.japi.Procedure; +import akka.stream.testkit.AkkaSpec; +import akka.stream.testkit.TestUtils; +import akka.util.ByteString; + +public class StreamTcpTest extends StreamTest { + public StreamTcpTest() { + super(actorSystemResource); + } + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("StreamTcpTest", + AkkaSpec.testConf()); + + final Sink echoHandler = + Sink.foreach(new Procedure() { + public void apply(IncomingConnection conn) { + conn.handleWith(Flow.empty(), materializer); + } + }); + + final List testInput = new ArrayList(); + { + for (char c = 'a'; c <= 'z'; c++) { + testInput.add(ByteString.fromString(String.valueOf(c))); + } + } + + @Test + public void mustWorkInHappyCase() throws Exception { + + final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); + final ServerBinding binding = StreamTcp.get(system).bind(serverAddress); + + final MaterializedMap materializedServer = binding.connections().to(echoHandler).run(materializer); + final Future serverFuture = binding.localAddress(materializedServer); + final InetSocketAddress s = Await.result(serverFuture, FiniteDuration.create(5, TimeUnit.SECONDS)); + assertEquals(s.getPort(), serverAddress.getPort()); + + final Source responseStream = + Source.from(testInput).via(StreamTcp.get(system).outgoingConnection(serverAddress).flow()); + + final Future resultFuture = responseStream.fold( + ByteString.empty(), new Function2() { + public ByteString apply(ByteString acc, ByteString elem) { + return acc.concat(elem); + } + }, materializer); + + final byte[] result = Await.result(resultFuture, FiniteDuration.create(5, TimeUnit.SECONDS)).toArray(); + for (int i = 0; i < testInput.size(); i ++) { + assertEquals(testInput.get(i).head(), result[i]); + } + + } + + @Test + public void mustReportServerBindFailure() throws Exception { + + final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); + final ServerBinding binding = StreamTcp.get(system).bind(serverAddress); + + final MaterializedMap materializedServer = binding.connections().to(echoHandler).run(materializer); + final Future serverFuture = binding.localAddress(materializedServer); + final InetSocketAddress s = Await.result(serverFuture, FiniteDuration.create(5, TimeUnit.SECONDS)); + assertEquals(s.getPort(), serverAddress.getPort()); + + // bind again, to same port + final MaterializedMap materializedServer2 = binding.connections().to(echoHandler).run(materializer); + final Future serverFuture2 = binding.localAddress(materializedServer2); + boolean bindFailed = false; + try { + Await.result(serverFuture2, FiniteDuration.create(5, TimeUnit.SECONDS)); + } catch (BindFailedException e) { + // as expected + bindFailed = true; + } + assertTrue("Expected BindFailedException, but nothing was reported", bindFailed); + } + + @Test + public void mustReportClientConnectFailure() throws Exception { + + final InetSocketAddress serverAddress = TestUtils.temporaryServerAddress("127.0.0.1", false); + final Source responseStream = + Source.from(testInput).via(StreamTcp.get(system).outgoingConnection(serverAddress).flow()); + final Future resultFuture = responseStream.runWith(Sink.head(), materializer); + + boolean streamTcpException = false; + try { + Await.result(resultFuture, FiniteDuration.create(5, TimeUnit.SECONDS)); + } catch (StreamTcpException e) { + // as expected + streamTcpException = true; + } + assertTrue("Expected StreamTcpException, but nothing was reported", streamTcpException); + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala similarity index 97% rename from akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala index a21c12ab9b..6bffa63ad5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala @@ -9,8 +9,9 @@ import akka.util.ByteString import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec import akka.stream.scaladsl._ +import akka.stream.testkit.TestUtils.temporaryServerAddress -class TcpFlowSpec extends AkkaSpec with TcpHelper { +class StreamTcpSpec extends AkkaSpec with TcpHelper { import akka.stream.io.TcpHelper._ var demand = 0L @@ -185,7 +186,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val echoHandler = ForeachSink[StreamTcp.IncomingConnection] { _ handleWith Flow[ByteString] } "be able to implement echo" in { - val serverAddress = temporaryServerAddress + val serverAddress = temporaryServerAddress() val binding = StreamTcp().bind(serverAddress) val echoServerMM = binding.connections.to(echoHandler).run() @@ -205,7 +206,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { } "work with a chain of echoes" in { - val serverAddress = temporaryServerAddress + val serverAddress = temporaryServerAddress() val binding = StreamTcp(system).bind(serverAddress) val echoServerMM = binding.connections.to(echoHandler).run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala index 1d97f43837..b58b9a00ef 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpHelper.scala @@ -18,6 +18,7 @@ import scala.collection.immutable.Queue import scala.concurrent.{ Await, Future } import scala.concurrent.duration.Duration import akka.stream.scaladsl.Source +import akka.stream.testkit.TestUtils.temporaryServerAddress object TcpHelper { case class ClientWrite(bytes: ByteString) @@ -103,14 +104,6 @@ object TcpHelper { } - // FIXME: get it from TestUtil - def temporaryServerAddress: InetSocketAddress = { - val serverSocket = ServerSocketChannel.open().socket() - serverSocket.bind(new InetSocketAddress("127.0.0.1", 0)) - val address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort) - serverSocket.close() - address - } } trait TcpHelper { this: TestKitBase ⇒ @@ -121,7 +114,7 @@ trait TcpHelper { this: TestKitBase ⇒ implicit val materializer = FlowMaterializer(settings) - class Server(val address: InetSocketAddress = temporaryServerAddress) { + class Server(val address: InetSocketAddress = temporaryServerAddress()) { val serverProbe = TestProbe() val serverRef = system.actorOf(testServerProps(address, serverProbe.ref)) serverProbe.expectMsgType[Tcp.Bound] diff --git a/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala b/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala new file mode 100644 index 0000000000..6fde9797cf --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/StreamTcpException.scala @@ -0,0 +1,16 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.util.control.NoStackTrace +import java.net.InetSocketAddress + +class StreamTcpException(msg: String) extends RuntimeException(msg) with NoStackTrace + +abstract class BindFailedException extends StreamTcpException("bind failed") + +case object BindFailedException extends BindFailedException + +class ConnectionException(msg: String) extends StreamTcpException(msg) + diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala new file mode 100644 index 0000000000..8e4d3518f1 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/DelayedInitProcessor.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl.io + +import scala.concurrent.ExecutionContext +import org.reactivestreams.Subscription +import org.reactivestreams.Processor +import org.reactivestreams.Subscriber +import scala.concurrent.Future +import scala.util.Failure +import scala.util.Success + +/** + * INTERNAL API + */ +private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[I, O]])(implicit ec: ExecutionContext) extends Processor[I, O] { + @volatile private var impl: Processor[I, O] = _ + private val setVarFuture = implFuture.andThen { case Success(p) ⇒ impl = p } + + override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete { + case Success(x) ⇒ x.onSubscribe(s) + case Failure(_) ⇒ s.cancel() + } + + override def onError(t: Throwable): Unit = { + if (impl eq null) setVarFuture.onSuccess { case p ⇒ p.onError(t) } + else impl.onError(t) + } + + override def onComplete(): Unit = { + if (impl eq null) setVarFuture.onSuccess { case p ⇒ p.onComplete() } + else impl.onComplete() + } + + override def onNext(t: I): Unit = impl.onNext(t) + + override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete { + case Success(x) ⇒ x.subscribe(s) + case Failure(e) ⇒ s.onError(e) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala new file mode 100644 index 0000000000..1b8ce3936d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/io/StreamTcpManager.scala @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl.io + +import java.net.InetSocketAddress +import java.net.URLEncoder +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import akka.actor.Actor +import akka.io.Inet.SocketOption +import akka.io.Tcp +import akka.stream.MaterializerSettings +import akka.stream.impl.ActorProcessor +import akka.stream.impl.ActorPublisher +import akka.stream.scaladsl.StreamTcp +import akka.util.ByteString +import org.reactivestreams.Processor +import org.reactivestreams.Subscriber + +/** + * INTERNAL API + */ +private[akka] object StreamTcpManager { + /** + * INTERNAL API + */ + private[akka] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]], + localAddressPromise: Promise[InetSocketAddress], + remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress], + options: immutable.Traversable[SocketOption], + connectTimeout: Duration, + idleTimeout: Duration) + + /** + * INTERNAL API + */ + private[akka] case class Bind(localAddressPromise: Promise[InetSocketAddress], + unbindPromise: Promise[() ⇒ Future[Unit]], + flowSubscriber: Subscriber[StreamTcp.IncomingConnection], + endpoint: InetSocketAddress, + backlog: Int, + options: immutable.Traversable[SocketOption], + idleTimeout: Duration) + + /** + * INTERNAL API + */ + private[akka] case class ExposedProcessor(processor: Processor[ByteString, ByteString]) + +} + +/** + * INTERNAL API + */ +private[akka] class StreamTcpManager extends Actor { + import StreamTcpManager._ + + var nameCounter = 0 + def encName(prefix: String, endpoint: InetSocketAddress) = { + nameCounter += 1 + s"$prefix-$nameCounter-${URLEncoder.encode(endpoint.toString, "utf-8")}" + } + + def receive: Receive = { + case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, connectTimeout, _) ⇒ + val connTimeout = connectTimeout match { + case x: FiniteDuration ⇒ Some(x) + case _ ⇒ None + } + val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, + Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true), + materializerSettings = MaterializerSettings(context.system)), name = encName("client", remoteAddress)) + processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor)) + + case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, _) ⇒ + val publisherActor = context.actorOf(TcpListenStreamActor.props(localAddressPromise, unbindPromise, + flowSubscriber, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true), + MaterializerSettings(context.system)), name = encName("server", endpoint)) + // this sends the ExposedPublisher message to the publisher actor automatically + ActorPublisher[Any](publisherActor) + } +} diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala similarity index 95% rename from akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index 1aaff85999..10307a6e7c 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -1,27 +1,26 @@ /** * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.stream.io +package akka.stream.impl.io import java.net.InetSocketAddress - import akka.io.{ IO, Tcp } import scala.concurrent.Promise import scala.util.control.NoStackTrace import akka.actor.{ ActorRefFactory, Actor, Props, ActorRef, Status } -import akka.stream.impl._ import akka.util.ByteString import akka.io.Tcp._ import akka.stream.MaterializerSettings +import akka.stream.StreamTcpException import org.reactivestreams.Processor import akka.actor.Stash +import akka.stream.impl._ /** * INTERNAL API */ private[akka] object TcpStreamActor { case object WriteAck extends Tcp.Event - class TcpStreamException(msg: String) extends RuntimeException(msg) with NoStackTrace def outboundProps(processorPromise: Promise[Processor[ByteString, ByteString]], localAddressPromise: Promise[InetSocketAddress], @@ -77,9 +76,9 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) case PeerClosed ⇒ closed = true readPump.pump() - case ErrorClosed(cause) ⇒ fail(new TcpStreamException(s"The connection closed with error $cause")) - case CommandFailed(cmd) ⇒ fail(new TcpStreamException(s"Tcp command [$cmd] failed")) - case Aborted ⇒ fail(new TcpStreamException("The connection has been aborted")) + case ErrorClosed(cause) ⇒ fail(new StreamTcpException(s"The connection closed with error $cause")) + case CommandFailed(cmd) ⇒ fail(new StreamTcpException(s"Tcp command [$cmd] failed")) + case Aborted ⇒ fail(new StreamTcpException("The connection has been aborted")) } override def inputsAvailable: Boolean = pendingElement ne null @@ -237,9 +236,9 @@ private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[B initSteps.become(Actor.emptyBehavior) case f: CommandFailed ⇒ - val ex = new TcpStreamException("Connection failed.") + val ex = new StreamTcpException("Connection failed.") localAddressPromise.failure(ex) processorPromise.failure(ex) fail(ex) } -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala similarity index 95% rename from akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala index 41111c1642..3b7303bc36 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpListenStreamActor.scala @@ -1,19 +1,24 @@ /** * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.stream.io +package akka.stream.impl.io import java.net.InetSocketAddress -import akka.stream.io.StreamTcp.ConnectionException -import org.reactivestreams.Subscriber import scala.concurrent.{ Future, Promise } -import akka.util.ByteString -import akka.io.Tcp._ +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.Stash import akka.io.{ IO, Tcp } +import akka.io.Tcp._ import akka.stream.{ FlowMaterializer, MaterializerSettings } -import akka.stream.scaladsl.{ Flow, Pipe } import akka.stream.impl._ -import akka.actor._ +import akka.stream.scaladsl.{ Flow, Pipe } +import akka.stream.scaladsl.StreamTcp +import akka.util.ByteString +import org.reactivestreams.Subscriber +import akka.stream.ConnectionException +import akka.stream.BindFailedException /** * INTERNAL API @@ -81,7 +86,7 @@ private[akka] class TcpListenStreamActor(localAddressPromise: Promise[InetSocket primaryOutputs.getExposedPublisher.subscribe(flowSubscriber.asInstanceOf[Subscriber[Any]]) subreceive.become(running) case f: CommandFailed ⇒ - val ex = StreamTcp.BindFailedException + val ex = BindFailedException localAddressPromise.failure(ex) unbindPromise.failure(ex) flowSubscriber.onError(ex) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala new file mode 100644 index 0000000000..ac64e55039 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/StreamTcp.scala @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl + +import java.lang.{ Iterable ⇒ JIterable } +import scala.collection.immutable +import scala.concurrent.duration._ +import java.net.InetSocketAddress +import scala.concurrent.Future +import scala.util.control.NoStackTrace +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.stream.FlowMaterializer +import akka.stream.scaladsl +import akka.util.ByteString +import akka.japi.Util.immutableSeq +import akka.io.Inet.SocketOption + +object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { + + /** + * Represents a prospective TCP server binding. + */ + class ServerBinding private[akka] (delegate: scaladsl.StreamTcp.ServerBinding) { + /** + * The local address of the endpoint bound by the materialization of the `connections` [[Source]] + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(materializedMap: MaterializedMap): Future[InetSocketAddress] = + delegate.localAddress(materializedMap.asScala) + + /** + * The stream of accepted incoming connections. + * Can be materialized several times but only one subscription can be "live" at one time, i.e. + * subsequent materializations will reject subscriptions with an [[BindFailedException]] if the previous + * materialization still has an uncancelled subscription. + * Cancelling the subscription to a materialization of this source will cause the listening port to be unbound. + */ + def connections: Source[IncomingConnection] = + Source.adapt(delegate.connections.map(new IncomingConnection(_))) + + /** + * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections` + * [[Source]] whose [[MaterializedMap]] is passed as parameter. + * + * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed. + */ + def unbind(materializedMap: MaterializedMap): Future[Unit] = + delegate.unbind(materializedMap.asScala) + } + + /** + * Represents an accepted incoming TCP connection. + */ + class IncomingConnection private[akka] (delegate: scaladsl.StreamTcp.IncomingConnection) { + /** + * The local address this connection is bound to. + */ + def localAddress: InetSocketAddress = delegate.localAddress + + /** + * The remote address this connection is bound to. + */ + def remoteAddress: InetSocketAddress = delegate.remoteAddress + + /** + * Handles the connection using the given flow, which is materialized exactly once and the respective + * [[MaterializedMap]] returned. + * + * Convenience shortcut for: `flow.join(handler).run()`. + */ + def handleWith(handler: Flow[ByteString, ByteString], materializer: FlowMaterializer): MaterializedMap = + new MaterializedMap(delegate.handleWith(handler.asScala)(materializer)) + + /** + * A flow representing the client on the other side of the connection. + * This flow can be materialized only once. + */ + def flow: Flow[ByteString, ByteString] = Flow.adapt(delegate.flow) + } + + /** + * Represents a prospective outgoing TCP connection. + */ + class OutgoingConnection private[akka] (delegate: scaladsl.StreamTcp.OutgoingConnection) { + /** + * The remote address this connection is or will be bound to. + */ + def remoteAddress: InetSocketAddress = delegate.remoteAddress + + /** + * The local address of the endpoint bound by the materialization of the connection materialization + * whose [[MaterializedMap]] is passed as parameter. + */ + def localAddress(mMap: MaterializedMap): Future[InetSocketAddress] = + delegate.localAddress(mMap.asScala) + + /** + * Handles the connection using the given flow. + * This method can be called several times, every call will materialize the given flow exactly once thereby + * triggering a new connection attempt to the `remoteAddress`. + * If the connection cannot be established the materialized stream will immediately be terminated + * with a [[akka.stream.StreamTcpException]]. + * + * Convenience shortcut for: `flow.join(handler).run()`. + */ + def handleWith(handler: Flow[ByteString, ByteString], materializer: FlowMaterializer): MaterializedMap = + new MaterializedMap(delegate.handleWith(handler.asScala)(materializer)) + + /** + * A flow representing the server on the other side of the connection. + * This flow can be materialized several times, every materialization will open a new connection to the + * `remoteAddress`. If the connection cannot be established the materialized stream will immediately be terminated + * with a [[akka.stream.StreamTcpException]]. + */ + def flow: Flow[ByteString, ByteString] = Flow.adapt(delegate.flow) + } + + override def get(system: ActorSystem): StreamTcp = super.get(system) + + def lookup() = StreamTcp + + def createExtension(system: ExtendedActorSystem): StreamTcp = new StreamTcp(system) +} + +class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { + import StreamTcp._ + + private lazy val delegate: scaladsl.StreamTcp = scaladsl.StreamTcp(system) + + /** + * Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + */ + def bind(endpoint: InetSocketAddress, + backlog: Int, + options: JIterable[SocketOption], + idleTimeout: Duration): ServerBinding = + new ServerBinding(delegate.bind(endpoint, backlog, immutableSeq(options), idleTimeout)) + + /** + * Creates a [[StreamTcp.ServerBinding]] without specifying options. + * It represents a prospective TCP server binding on the given `endpoint`. + */ + def bind(endpoint: InetSocketAddress): ServerBinding = + new ServerBinding(delegate.bind(endpoint)) + + /** + * Creates an [[StreamTcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + */ + def outgoingConnection(remoteAddress: InetSocketAddress, + localAddress: Option[InetSocketAddress], + options: JIterable[SocketOption], + connectTimeout: Duration, + idleTimeout: Duration): OutgoingConnection = + new OutgoingConnection(delegate.outgoingConnection( + remoteAddress, localAddress, immutableSeq(options), connectTimeout, idleTimeout)) + + /** + * Creates an [[StreamTcp.OutgoingConnection]] without specifying options. + * It represents a prospective TCP client connection to the given endpoint. + */ + def outgoingConnection(remoteAddress: InetSocketAddress): OutgoingConnection = + new OutgoingConnection(delegate.outgoingConnection(remoteAddress)) + +} diff --git a/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala similarity index 56% rename from akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala rename to akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala index 0cf197d66b..59ca81255f 100644 --- a/akka-stream/src/main/scala/akka/stream/io/StreamTcp.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/StreamTcp.scala @@ -1,24 +1,35 @@ /** * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.stream.io +package akka.stream.scaladsl import java.net.{ InetSocketAddress, URLEncoder } -import org.reactivestreams.{ Processor, Subscriber, Subscription } -import scala.util.control.NoStackTrace -import scala.util.{ Failure, Success } import scala.collection.immutable -import scala.concurrent.duration._ import scala.concurrent.{ Promise, ExecutionContext, Future } -import akka.util.ByteString +import scala.concurrent.duration.Duration +import scala.util.{ Failure, Success } +import scala.util.control.NoStackTrace +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.Props import akka.io.Inet.SocketOption import akka.io.Tcp import akka.stream.{ FlowMaterializer, MaterializerSettings } -import akka.stream.scaladsl._ import akka.stream.impl._ -import akka.actor._ +import akka.stream.scaladsl._ +import akka.util.ByteString +import org.reactivestreams.{ Processor, Subscriber, Subscription } +import akka.actor.actorRef2Scala +import akka.stream.impl.io.TcpStreamActor +import akka.stream.impl.io.TcpListenStreamActor +import akka.stream.impl.io.DelayedInitProcessor +import akka.stream.impl.io.StreamTcpManager -object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { +object StreamTcp extends ExtensionId[StreamTcp] with ExtensionIdProvider { /** * Represents a prospective TCP server binding. @@ -43,7 +54,7 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { * Asynchronously triggers the unbinding of the port that was bound by the materialization of the `connections` * [[Source]] whose [[MaterializedMap]] is passed as parameter. * - * The produced [[Future]] is fulfilled when the unbinding has been completed. + * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed. */ def unbind(materializedMap: MaterializedMap): Future[Unit] } @@ -80,7 +91,7 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { /** * Represents a prospective outgoing TCP connection. */ - sealed trait OutgoingConnection { + trait OutgoingConnection { /** * The remote address this connection is or will be bound to. */ @@ -97,7 +108,7 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { * This method can be called several times, every call will materialize the given flow exactly once thereby * triggering a new connection attempt to the `remoteAddress`. * If the connection cannot be established the materialized stream will immediately be terminated - * with a [[ConnectionAttemptFailedException]]. + * with a [[akka.stream.StreamTcpException]]. * * Convenience shortcut for: `flow.join(handler).run()`. */ @@ -107,35 +118,36 @@ object StreamTcp extends ExtensionId[StreamTcpExt] with ExtensionIdProvider { * A flow representing the server on the other side of the connection. * This flow can be materialized several times, every materialization will open a new connection to the * `remoteAddress`. If the connection cannot be established the materialized stream will immediately be terminated - * with a [[ConnectionAttemptFailedException]]. + * with a [[akka.stream.StreamTcpException]]. */ def flow: Flow[ByteString, ByteString] } - case object BindFailedException extends RuntimeException with NoStackTrace + /** + * INTERNAL API + */ + private[akka] class PreMaterializedOutgoingKey extends Key[Future[InetSocketAddress]] { + override def materialize(map: MaterializedMap) = + throw new IllegalStateException("This key has already been materialized by the TCP Processor") + } - class ConnectionException(message: String) extends RuntimeException(message) + def apply()(implicit system: ActorSystem): StreamTcp = super.apply(system) - class ConnectionAttemptFailedException(val endpoint: InetSocketAddress) extends ConnectionException(s"Connection attempt to $endpoint failed") - - //////////////////// EXTENSION SETUP /////////////////// - - def apply()(implicit system: ActorSystem): StreamTcpExt = super.apply(system) + override def get(system: ActorSystem): StreamTcp = super.get(system) def lookup() = StreamTcp - def createExtension(system: ExtendedActorSystem): StreamTcpExt = new StreamTcpExt(system) + def createExtension(system: ExtendedActorSystem): StreamTcp = new StreamTcp(system) } -class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension { - import StreamTcpExt._ +class StreamTcp(system: ExtendedActorSystem) extends akka.actor.Extension { import StreamTcp._ import system.dispatcher private val manager: ActorRef = system.systemActorOf(Props[StreamTcpManager], name = "IO-TCP-STREAM") /** - * Creates a [[ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. + * Creates a [[StreamTcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. */ def bind(endpoint: InetSocketAddress, backlog: Int = 100, @@ -160,7 +172,7 @@ class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension { } /** - * Creates an [[OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. + * Creates an [[StreamTcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. */ def outgoingConnection(remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, @@ -186,110 +198,3 @@ class StreamTcpExt(system: ExtendedActorSystem) extends akka.actor.Extension { } } -/** - * INTERNAL API - */ -private[akka] object StreamTcpExt { - /** - * INTERNAL API - */ - class PreMaterializedOutgoingKey extends Key[Future[InetSocketAddress]] { - override def materialize(map: MaterializedMap) = - throw new IllegalStateException("This key has already been materialized by the TCP Processor") - } -} - -/** - * INTERNAL API - */ -private[akka] class DelayedInitProcessor[I, O](val implFuture: Future[Processor[I, O]])(implicit ec: ExecutionContext) extends Processor[I, O] { - @volatile private var impl: Processor[I, O] = _ - private val setVarFuture = implFuture.andThen { case Success(p) ⇒ impl = p } - - override def onSubscribe(s: Subscription): Unit = setVarFuture.onComplete { - case Success(x) ⇒ x.onSubscribe(s) - case Failure(_) ⇒ s.cancel() - } - - override def onError(t: Throwable): Unit = { - if (impl eq null) setVarFuture.onSuccess { case p ⇒ p.onError(t) } - else impl.onError(t) - } - - override def onComplete(): Unit = { - if (impl eq null) setVarFuture.onSuccess { case p ⇒ p.onComplete() } - else impl.onComplete() - } - - override def onNext(t: I): Unit = impl.onNext(t) - - override def subscribe(s: Subscriber[_ >: O]): Unit = setVarFuture.onComplete { - case Success(x) ⇒ x.subscribe(s) - case Failure(e) ⇒ s.onError(e) - } -} - -/** - * INTERNAL API - */ -private[io] object StreamTcpManager { - /** - * INTERNAL API - */ - private[io] case class Connect(processorPromise: Promise[Processor[ByteString, ByteString]], - localAddressPromise: Promise[InetSocketAddress], - remoteAddress: InetSocketAddress, - localAddress: Option[InetSocketAddress], - options: immutable.Traversable[SocketOption], - connectTimeout: Duration, - idleTimeout: Duration) - - /** - * INTERNAL API - */ - private[io] case class Bind(localAddressPromise: Promise[InetSocketAddress], - unbindPromise: Promise[() ⇒ Future[Unit]], - flowSubscriber: Subscriber[StreamTcp.IncomingConnection], - endpoint: InetSocketAddress, - backlog: Int, - options: immutable.Traversable[SocketOption], - idleTimeout: Duration) - - /** - * INTERNAL API - */ - private[io] case class ExposedProcessor(processor: Processor[ByteString, ByteString]) - -} - -/** - * INTERNAL API - */ -private[akka] class StreamTcpManager extends Actor { - import akka.stream.io.StreamTcpManager._ - - var nameCounter = 0 - def encName(prefix: String, endpoint: InetSocketAddress) = { - nameCounter += 1 - s"$prefix-$nameCounter-${URLEncoder.encode(endpoint.toString, "utf-8")}" - } - - def receive: Receive = { - case Connect(processorPromise, localAddressPromise, remoteAddress, localAddress, options, connectTimeout, _) ⇒ - val connTimeout = connectTimeout match { - case x: FiniteDuration ⇒ Some(x) - case _ ⇒ None - } - val processorActor = context.actorOf(TcpStreamActor.outboundProps(processorPromise, localAddressPromise, - Tcp.Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true), - materializerSettings = MaterializerSettings(context.system)), name = encName("client", remoteAddress)) - processorActor ! ExposedProcessor(ActorProcessor[ByteString, ByteString](processorActor)) - - case Bind(localAddressPromise, unbindPromise, flowSubscriber, endpoint, backlog, options, _) ⇒ - val publisherActor = context.actorOf(TcpListenStreamActor.props(localAddressPromise, unbindPromise, - flowSubscriber, Tcp.Bind(context.system.deadLetters, endpoint, backlog, options, pullMode = true), - MaterializerSettings(context.system)), name = encName("server", endpoint)) - // this sends the ExposedPublisher message to the publisher actor automatically - ActorPublisher[Any](publisherActor) - } -} diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTlsCipher.scala b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala similarity index 99% rename from akka-stream/src/main/scala/akka/stream/io/SslTlsCipher.scala rename to akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala index a1ca79ca10..69565d6c19 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTlsCipher.scala +++ b/akka-stream/src/main/scala/akka/stream/ssl/SslTlsCipher.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2014 Typesafe Inc. */ -package akka.stream.io +package akka.stream.ssl import java.nio.ByteBuffer import java.security.Principal