From 2cb7beabb6fd207107e3f1ccb0d4efc151dd3f39 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Mon, 13 Jun 2011 19:19:45 -0600 Subject: [PATCH] Update to latest master --- .../test/scala/akka/actor/actor/IOActor.scala | 64 +++++++++---------- akka-actor/src/main/scala/akka/actor/IO.scala | 32 +++++++--- 2 files changed, 54 insertions(+), 42 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala index f0d30be6d7..b888771888 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala @@ -41,7 +41,7 @@ object IOActorSpec { class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO { - lazy val socket: SocketHandle = connect(ioManager, host, port)(Some(reader)) + lazy val socket: SocketHandle = connect(ioManager, host, port, reader) lazy val reader: ActorRef = Actor.actorOf { new Actor with IO { def receiveIO = { @@ -77,20 +77,20 @@ object IOActorSpec { val result = matchC(cmd.split(' ')) { case Array("SET", key, length) ⇒ val value = socket read length.toInt - server.owner !!! (('set, key, value)) map ((_: Unit) ⇒ ByteString("+OK\r\n")) + server.owner ? ('set, key, value) map ((x: Any) ⇒ ByteString("+OK\r\n")) case Array("GET", key) ⇒ - server.owner !!! (('get, key)) map { value: Option[ByteString] ⇒ - value map { bytes ⇒ - ByteString("$" + bytes.length + "\r\n") ++ bytes - } getOrElse ByteString("$-1\r\n") + server.owner ? ('get, key) collect { + case Some(b: ByteString) ⇒ ByteString("$" + b.length + "\r\n") ++ b + case None ⇒ ByteString("$-1\r\n") } case Array("GETALL") ⇒ - server.owner !!! 'getall map { all: Map[String, ByteString] ⇒ - (ByteString("*" + (all.size * 2) + "\r\n") /: all) { - case (result, (k, v)) ⇒ - val kBytes = ByteString(k) - result ++ ByteString("$" + kBytes.length + "\r\n") ++ kBytes ++ ByteString("$" + v.length + "\r\n") ++ v - } + server.owner ? 'getall collect { + case m: Map[_, _] ⇒ + (ByteString("*" + (m.size * 2) + "\r\n") /: m) { + case (result, (k: String, v: ByteString)) ⇒ + val kBytes = ByteString(k) + result ++ ByteString("$" + kBytes.length + "\r\n") ++ kBytes ++ ByteString("$" + v.length + "\r\n") ++ v + } } } result recover { @@ -174,12 +174,12 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start - val f1 = client !!! ByteString("Hello World!1") - val f2 = client !!! ByteString("Hello World!2") - val f3 = client !!! ByteString("Hello World!3") - (f1.get: ByteString) must equal(ByteString("Hello World!1")) - (f2.get: ByteString) must equal(ByteString("Hello World!2")) - (f3.get: ByteString) must equal(ByteString("Hello World!3")) + val f1 = client ? ByteString("Hello World!1") + val f2 = client ? ByteString("Hello World!2") + val f3 = client ? ByteString("Hello World!3") + f1.get must equal(ByteString("Hello World!1")) + f2.get must equal(ByteString("Hello World!2")) + f3.get must equal(ByteString("Hello World!3")) client.stop server.stop ioManager.stop @@ -190,7 +190,7 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start val list = List.range(0, 100000) - val f: Future[List[ByteString]] = Future.traverse(list)(i ⇒ client !!! ByteString(i.toString)) + val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) assert(f.get.size === 100000) client.stop server.stop @@ -202,7 +202,7 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start val list = List.range(0, 100000) - val f: Future[List[ByteString]] = Future.traverse(list)(i ⇒ client !!! ByteString(i.toString)) + val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString)) assert(f.get.size === 100000) client.stop server.stop @@ -214,20 +214,20 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { val server = Actor.actorOf(new KVStore("localhost", 8064, ioManager)).start val client1 = Actor.actorOf(new KVClient("localhost", 8064, ioManager)).start val client2 = Actor.actorOf(new KVClient("localhost", 8064, ioManager)).start - val f1 = client1 !!! (('set, "hello", ByteString("World"))) - val f2 = client1 !!! (('set, "test", ByteString("No one will read me"))) - val f3 = client1 !!! (('get, "hello")) + val f1 = client1 ? ('set, "hello", ByteString("World")) + val f2 = client1 ? ('set, "test", ByteString("No one will read me")) + val f3 = client1 ? ('get, "hello") f2.await - val f4 = client2 !!! (('set, "test", ByteString("I'm a test!"))) + val f4 = client2 ? ('set, "test", ByteString("I'm a test!")) f4.await - val f5 = client1 !!! (('get, "test")) - val f6 = client2 !!! 'getall - (f1.get: String) must equal("OK") - (f2.get: String) must equal("OK") - (f3.get: ByteString) must equal(ByteString("World")) - (f4.get: String) must equal("OK") - (f5.get: ByteString) must equal(ByteString("I'm a test!")) - (f6.get: Map[String, ByteString]) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) + val f5 = client1 ? ('get, "test") + val f6 = client2 ? 'getall + f1.get must equal("OK") + f2.get must equal("OK") + f3.get must equal(ByteString("World")) + f4.get must equal("OK") + f5.get must equal(ByteString("I'm a test!")) + f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!"))) client1.stop client2.stop server.stop diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index f177240d08..c7fe40343d 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -81,7 +81,7 @@ object IO { socket } - def accept()(implicit sender: Some[ActorRef]): SocketHandle = accept(sender.get) + def accept()(implicit socketOwner: ScalaActorRef): SocketHandle = accept(socketOwner) } sealed trait IOMessage @@ -95,24 +95,36 @@ object IO { case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage - def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: Some[ActorRef]): ServerHandle = - listen(ioManager, new InetSocketAddress(host, port)) - - def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: Some[ActorRef]): ServerHandle = { - val server = ServerHandle(sender.get, ioManager) + def listen(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): ServerHandle = { + val server = ServerHandle(owner, ioManager) ioManager ! Listen(server, address) server } - def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: Some[ActorRef]): SocketHandle = - connect(ioManager, new InetSocketAddress(host, port)) + def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): ServerHandle = + listen(ioManager, address, sender) - def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: Some[ActorRef]): SocketHandle = { - val socket = SocketHandle(sender.get, ioManager) + def listen(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): ServerHandle = + listen(ioManager, new InetSocketAddress(host, port), owner) + + def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): ServerHandle = + listen(ioManager, new InetSocketAddress(host, port), sender) + + def connect(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): SocketHandle = { + val socket = SocketHandle(owner, ioManager) ioManager ! Connect(socket, address) socket } + def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): SocketHandle = + connect(ioManager, address, sender) + + def connect(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): SocketHandle = + connect(ioManager, new InetSocketAddress(host, port), owner) + + def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): SocketHandle = + connect(ioManager, new InetSocketAddress(host, port), sender) + private class HandleState(var readBytes: ByteString, var connected: Boolean) { def this() = this(ByteString.empty, false) }