diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala index 2b20232c1b..da4f7c3421 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/IOActor.scala @@ -13,25 +13,18 @@ import akka.dispatch.Promise object IOActorSpec { - class SimpleEchoServer(host: String, port: Int, ioManager: ActorRef) extends Actor with IO { + class SimpleEchoServer(host: String, port: Int, ioManager: ActorRef) extends IOActor { - var server: Option[IO.Handle] = None - var clients: Set[IO.Handle] = Set.empty + sequentialIO = false + idleWakeup = true override def preStart = { - server = Some(listen(ioManager, host, port)) + listen(ioManager, host, port) } - def receive = { - case IO.NewConnection(handle) ⇒ - println("S: Client connected") - clients += accept(handle, self) - case IO.Read(handle, bytes) ⇒ - println("S: Echoing data") - write(handle, bytes) - case IO.Closed(handle) ⇒ - println("S: Connection closed") - clients -= handle + def receiveIO = { + case IO.NewConnection(handle) ⇒ accept(handle) + case IO.WakeUp(handle) ⇒ write(handle, read(handle)) } } @@ -48,19 +41,79 @@ object IOActorSpec { def receiveIO = { case bytes: ByteString ⇒ - println("C: Sending Request") write(handle, bytes) self reply read(handle, bytes.length) - println("C: Got Response") } } + + // Basic Redis-style protocol + class KVStore(host: String, port: Int, ioManager: ActorRef) extends IOActor { + + sequentialIO = false + idleWakeup = true + + var kvs: Map[String, ByteString] = Map.empty + + override def preStart = { + listen(ioManager, host, port) + } + + def receiveIO = { + case IO.NewConnection(handle) ⇒ + accept(handle) + case IO.WakeUp(handle) ⇒ + val cmd = read(handle, ByteString(" ")).utf8String.trim + cmd match { + case "SET" ⇒ + val key = read(handle, ByteString(" ")).utf8String.trim + val len = read(handle, ByteString("\r\n")).utf8String.trim + val value = read(handle, len.toInt) + kvs += (key -> value) + write(handle, ByteString("+OK\r\n")) + case "GET" ⇒ + val key = read(handle, ByteString("\r\n")).utf8String.trim + write(handle, kvs.get(key).map(v ⇒ ByteString("$" + v.length + "\r\n") ++ v).getOrElse(ByteString("$-1\r\n"))) + } + } + + } + + class KVClient(host: String, port: Int, ioManager: ActorRef) extends IOActor { + + // FIXME: should prioritize reads from first message + // sequentialIO = false + + var handle: IO.Handle = _ + + override def preStart: Unit = { + handle = connect(ioManager, host, port) + } + + def receiveIO = { + case ('set, key: String, value: ByteString) ⇒ + write(handle, ByteString("SET " + key + " " + value.length + "\r\n") ++ value) + val resultType = read(handle, 1).utf8String + if (resultType != "+") sys.error("Unexpected response") + val status = read(handle, ByteString("\r\n")) + self reply status.take(status.length - 2) + + case ('get, key: String) ⇒ + write(handle, ByteString("GET " + key + "\r\n")) + val resultType = read(handle, 1).utf8String + if (resultType != "$") sys.error("Unexpected response") + val len = read(handle, ByteString("\r\n")).utf8String.trim + val value = read(handle, len.toInt) + self reply value + } + } + } class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { import IOActorSpec._ "an IO Actor" must { - "run" in { + "run echo server" in { val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start @@ -74,6 +127,26 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach { server.stop ioManager.stop } + + "run key-value store" in { + val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer + val server = Actor.actorOf(new KVStore("localhost", 8064, ioManager)).start + val client = Actor.actorOf(new KVClient("localhost", 8064, ioManager)).start + val promise1 = client !!! (('set, "hello", ByteString("World"))) + val promise2 = client !!! (('set, "test", ByteString("No one will read me"))) + val promise3 = client !!! (('get, "hello")) + val promise4 = client !!! (('set, "test", ByteString("I'm a test!"))) + val promise5 = client !!! (('get, "test")) + (promise1.get: ByteString) must equal(ByteString("OK")) + (promise2.get: ByteString) must equal(ByteString("OK")) + (promise3.get: ByteString) must equal(ByteString("World")) + (promise4.get: ByteString) must equal(ByteString("OK")) + (promise5.get: ByteString) must equal(ByteString("I'm a test!")) + client.stop + server.stop + ioManager.stop + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index d6220f5d85..a27fc365f2 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -45,6 +45,7 @@ object IO { case class Closed(handle: Handle) extends IOMessage case class Read(handle: Handle, bytes: ByteString) extends IOMessage case class Write(handle: Handle, bytes: ByteString) extends IOMessage + case class WakeUp(handle: Handle) extends IOMessage } @@ -76,6 +77,8 @@ trait IO { handle } + def accept(source: IO.Handle): Unit = accept(source, self) + def write(handle: IO.Handle, bytes: ByteString): Unit = handle.ioManager ! IO.Write(handle, bytes) @@ -100,6 +103,8 @@ trait IOActor extends Actor with IO { protected var sequentialIO = true + protected var idleWakeup = false + private val _messages: mutable.Queue[MessageInvocation] = mutable.Queue.empty private var _state: Map[IO.Handle, HandleState] = Map.empty @@ -136,7 +141,8 @@ trait IOActor extends Actor with IO { case IO.Read(handle, newBytes) ⇒ val st = state(handle) st.readBytes :+= newBytes - run(handle) + if (st.messages.isEmpty && idleWakeup) reset { _receiveIO(IO.WakeUp(handle)) } + else run(handle) case IO.Connected(handle) ⇒ state(handle).connected = true case IO.Closed(handle) ⇒ diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 5729892fa5..e20191b7c5 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -93,6 +93,8 @@ final class ByteString private (bytes: Array[Byte], startIndex: Int, endIndex: I def toByteBuffer: ByteBuffer = ByteBuffer.wrap(toArray) + def utf8String: String = new String(bytes, "UTF-8") + def mapI(f: Byte ⇒ Int): ByteString = map(f andThen (_.toByte)) override def slice(from: Int, until: Int): ByteString = {