Use random port in remaining IO tests

This commit is contained in:
Derek Williams 2012-01-23 23:44:28 -07:00
parent ae79194118
commit 4bbae37ee7
2 changed files with 25 additions and 18 deletions

View file

@ -9,19 +9,22 @@ import akka.util.duration._
import scala.util.continuations._
import akka.testkit._
import akka.dispatch.{ Await, Future, Promise, ExecutionContext, MessageDispatcher }
import java.net.{ SocketAddress, InetSocketAddress }
import java.net.{ SocketAddress }
import akka.pattern.ask
object IOActorSpec {
class SimpleEchoServer(host: String, port: Int) extends Actor {
class SimpleEchoServer(addressPromise: Promise[SocketAddress]) extends Actor {
val server = IOManager(context.system) listen (host, port)
val server = IOManager(context.system) listen ("localhost", 0)
val state = IO.IterateeRef.Map.sync[IO.Handle]()
def receive = {
case IO.Listening(`server`, address)
addressPromise success address
case IO.NewClient(`server`)
val socket = server.accept()
state(socket) flatMap (_ IO repeat (IO.takeAny map socket.write))
@ -40,9 +43,9 @@ object IOActorSpec {
}
}
class SimpleEchoClient(host: String, port: Int) extends Actor {
class SimpleEchoClient(address: SocketAddress) extends Actor {
val socket = IOManager(context.system) connect (host, port)
val socket = IOManager(context.system) connect (address)
val state = IO.IterateeRef.sync()
@ -60,8 +63,6 @@ object IOActorSpec {
case IO.Read(`socket`, bytes)
state(IO Chunk bytes)
case IO.Connected(`socket`, _)
case IO.Closed(`socket`, cause)
state(IO EOF cause)
throw (cause getOrElse new RuntimeException("Socket closed"))
@ -99,7 +100,7 @@ object IOActorSpec {
var kvs: Map[String, String] = Map.empty
val server = IOManager(context.system) listen (new InetSocketAddress("localhost", 0))
val server = IOManager(context.system) listen ("localhost", 0)
val EOL = ByteString("\r\n")
@ -183,8 +184,6 @@ object IOActorSpec {
case IO.Read(`socket`, bytes)
state(IO Chunk bytes)
case IO.Connected(`socket`, _)
case IO.Closed(`socket`, cause)
state(IO EOF cause)
throw (cause getOrElse new RuntimeException("Socket closed"))
@ -274,8 +273,10 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
"an IO Actor" must {
"run echo server" in {
filterException[java.net.ConnectException] {
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8064)))
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8064)))
val addressPromise = Promise[SocketAddress]()
val server = system.actorOf(Props(new SimpleEchoServer(addressPromise)))
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
val client = system.actorOf(Props(new SimpleEchoClient(address)))
val f1 = retry() { client ? ByteString("Hello World!1") }
val f2 = retry() { client ? ByteString("Hello World!2") }
val f3 = retry() { client ? ByteString("Hello World!3") }
@ -289,8 +290,10 @@ class IOActorSpec extends AkkaSpec with DefaultTimeout {
"run echo server under high load" in {
filterException[java.net.ConnectException] {
val server = system.actorOf(Props(new SimpleEchoServer("localhost", 8065)))
val client = system.actorOf(Props(new SimpleEchoClient("localhost", 8065)))
val addressPromise = Promise[SocketAddress]()
val server = system.actorOf(Props(new SimpleEchoServer(addressPromise)))
val address = Await.result(addressPromise, TestLatch.DefaultTimeout)
val client = system.actorOf(Props(new SimpleEchoClient(address)))
val list = List.range(0, 100)
val f = Future.traverse(list)(i retry() { client ? ByteString(i.toString) })
assert(Await.result(f, TestLatch.DefaultTimeout).size === 100)

View file

@ -134,6 +134,12 @@ object IO {
*/
case class Listen(server: ServerHandle, address: SocketAddress) extends IOMessage
/**
* Message from an [[akka.actor.IOManager]] that the ServerSocketChannel is
* now listening for connections.
*
* No action is required by the receiving [[akka.actor.Actor]].
*/
case class Listening(server: ServerHandle, address: SocketAddress) extends IOMessage
/**
@ -161,8 +167,7 @@ object IO {
* Message from an [[akka.actor.IOManager]] that the SocketChannel has
* successfully connected.
*
* No action is required by the receiving [[akka.actor.Actor]], although
* the message still needs to be in it's receive method.
* No action is required by the receiving [[akka.actor.Actor]].
*/
case class Connected(socket: SocketHandle, address: SocketAddress) extends IOMessage
@ -178,8 +183,7 @@ object IO {
* optionally contain the Exception that caused the Channel to close, if
* applicable.
*
* No action is required by the receiving [[akka.actor.Actor]], although
* the message still needs to be in it's receive method.
* No action is required by the receiving [[akka.actor.Actor]].
*/
case class Closed(handle: Handle, cause: Option[Exception]) extends IOMessage