diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala index 3d742e0afb..6087caa905 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala @@ -26,7 +26,10 @@ object IOActorSpec { def receiveIO = { case NewClient(server) ⇒ val socket = server.accept() - loopC { socket write socket.read() } + loopC { + val bytes = socket.read() + socket write bytes + } } }) @@ -38,16 +41,21 @@ object IOActorSpec { class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO { - var socket: SocketHandle = _ - - override def preStart: Unit = { - socket = connect(ioManager, host, port) - } + lazy val socket: SocketHandle = connect(ioManager, host, port)(Some(reader)) + lazy val reader: ActorRef = Actor.actorOf { + new Actor with IO { + def receiveIO = { + case length: Int ⇒ + val bytes = socket.read(length) + self reply bytes + } + } + }.start def receiveIO = { case bytes: ByteString ⇒ socket write bytes - self reply socket.read(bytes.length) + reader forward bytes.length } } @@ -177,6 +185,30 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { ioManager.stop } + "run echo server under high load" in { + val ioManager = Actor.actorOf(new IOManager()).start + val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start + val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start + val list = List.range(0, 100000) + val f: Future[List[ByteString]] = Future.traverse(list)(i ⇒ client !!! ByteString(i.toString)) + assert(f.get.size === 100000) + client.stop + server.stop + ioManager.stop + } + + "run echo server under high load with small buffer" in { + val ioManager = Actor.actorOf(new IOManager(2)).start + val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start + val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start + val list = List.range(0, 100000) + val f: Future[List[ByteString]] = Future.traverse(list)(i ⇒ client !!! ByteString(i.toString)) + assert(f.get.size === 100000) + client.stop + server.stop + ioManager.stop + } + "run key-value store" in { val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer val server = Actor.actorOf(new KVStore("localhost", 8064, ioManager)).start