diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 7ef9f1c229..13ed9d8c7e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -9,19 +9,22 @@ import akka.util.duration._ import scala.util.continuations._ import akka.testkit._ import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher } -import java.net.{ SocketAddress, InetSocketAddress } +import java.net.{ SocketAddress } import akka.pattern.ask object IOActorSpec { - class SimpleEchoServer(host: String, port: Int) extends Actor { + class SimpleEchoServer(addressPromise: Promise[SocketAddress]) extends Actor { - val server = IOManager(context.system) listen (host, port) + val server = IOManager(context.system) listen ("localhost", 0) val state = IO.IterateeRef.Map.sync[IO.Handle]() def receive = { + case IO.Listening(`server`, address) ⇒ + addressPromise success address + case IO.NewClient(`server`) ⇒ val socket = server.accept() state(socket) flatMap (_ ⇒ IO repeat (IO.takeAny map socket.write)) @@ -40,9 +43,9 @@ object IOActorSpec { } } - class SimpleEchoClient(host: String, port: Int) extends Actor { + class SimpleEchoClient(address: SocketAddress) extends Actor { - val socket = IOManager(context.system) connect (host, port) + val socket = IOManager(context.system) connect (address) val state = IO.IterateeRef.sync() @@ -60,8 +63,6 @@ object IOActorSpec { case IO.Read(`socket`, bytes) ⇒ state(IO Chunk bytes) - case IO.Connected(`socket`, _) ⇒ - case IO.Closed(`socket`, cause) ⇒ state(IO EOF cause) throw (cause getOrElse new RuntimeException("Socket closed")) @@ -99,7 +100,7 @@ object IOActorSpec { var kvs: Map[String, String] = Map.empty - val server = IOManager(context.system) listen (new InetSocketAddress("localhost", 0)) + val server = IOManager(context.system) listen ("localhost", 0) val EOL = ByteString("\r\n") @@ -183,8 +184,6 @@ object IOActorSpec { case IO.Read(`socket`, bytes) ⇒ state(IO Chunk bytes) - case IO.Connected(`socket`, _) ⇒ - case IO.Closed(`socket`, cause) ⇒ state(IO EOF cause) throw (cause getOrElse new RuntimeException("Socket closed")) @@ -274,8 +273,10 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { "an IO Actor" must { "run echo server" in { filterException[java.net.ConnectException] { - val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8064))) - val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8064))) + val addressPromise = Promise[SocketAddress]() + val server = system.actorOf(Props(new SimpleEchoServer(addressPromise))) + val address = Await.result(addressPromise, TestLatch.DefaultTimeout) + val client = system.actorOf(Props(new SimpleEchoClient(address))) val f1 = retry() { client ? ByteString("Hello World!1") } val f2 = retry() { client ? ByteString("Hello World!2") } val f3 = retry() { client ? ByteString("Hello World!3") } @@ -289,8 +290,10 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout { "run echo server under high load" in { filterException[java.net.ConnectException] { - val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8065))) - val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8065))) + val addressPromise = Promise[SocketAddress]() + val server = system.actorOf(Props(new SimpleEchoServer(addressPromise))) + val address = Await.result(addressPromise, TestLatch.DefaultTimeout) + val client = system.actorOf(Props(new SimpleEchoClient(address))) val list = List.range(0, 100) val f = Future.traverse(list)(i ⇒ retry() { client ? ByteString(i.toString) }) assert(Await.result(f, TestLatch.DefaultTimeout).size === 100) diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index b7bfe9f0b4..b7942975d9 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -134,6 +134,12 @@ object IO { */ case class Listen(server: ServerHandle, address: SocketAddress) extends IOMessage + /** + * Message from an [[akka.actor.IOManager]] that the ServerSocketChannel is + * now listening for connections. + * + * No action is required by the receiving [[akka.actor.Actor]]. + */ case class Listening(server: ServerHandle, address: SocketAddress) extends IOMessage /** @@ -161,8 +167,7 @@ object IO { * Message from an [[akka.actor.IOManager]] that the SocketChannel has * successfully connected. * - * No action is required by the receiving [[akka.actor.Actor]], although - * the message still needs to be in it's receive method. + * No action is required by the receiving [[akka.actor.Actor]]. */ case class Connected(socket: SocketHandle, address: SocketAddress) extends IOMessage @@ -178,8 +183,7 @@ object IO { * optionally contain the Exception that caused the Channel to close, if * applicable. * - * No action is required by the receiving [[akka.actor.Actor]], although - * the message still needs to be in it's receive method. + * No action is required by the receiving [[akka.actor.Actor]]. */ case class Closed(handle: Handle, cause: Option[Exception]) extends IOMessage