Add test of basic Redis-style key-value store
This commit is contained in:
parent
ec4e7f7b03
commit
8b166454bf
3 changed files with 99 additions and 18 deletions
|
|
@ -13,25 +13,18 @@ import akka.dispatch.Promise
|
||||||
|
|
||||||
object IOActorSpec {
|
object IOActorSpec {
|
||||||
|
|
||||||
class SimpleEchoServer(host: String, port: Int, ioManager: ActorRef) extends Actor with IO {
|
class SimpleEchoServer(host: String, port: Int, ioManager: ActorRef) extends IOActor {
|
||||||
|
|
||||||
var server: Option[IO.Handle] = None
|
sequentialIO = false
|
||||||
var clients: Set[IO.Handle] = Set.empty
|
idleWakeup = true
|
||||||
|
|
||||||
override def preStart = {
|
override def preStart = {
|
||||||
server = Some(listen(ioManager, host, port))
|
listen(ioManager, host, port)
|
||||||
}
|
}
|
||||||
|
|
||||||
def receive = {
|
def receiveIO = {
|
||||||
case IO.NewConnection(handle) ⇒
|
case IO.NewConnection(handle) ⇒ accept(handle)
|
||||||
println("S: Client connected")
|
case IO.WakeUp(handle) ⇒ write(handle, read(handle))
|
||||||
clients += accept(handle, self)
|
|
||||||
case IO.Read(handle, bytes) ⇒
|
|
||||||
println("S: Echoing data")
|
|
||||||
write(handle, bytes)
|
|
||||||
case IO.Closed(handle) ⇒
|
|
||||||
println("S: Connection closed")
|
|
||||||
clients -= handle
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -48,19 +41,79 @@ object IOActorSpec {
|
||||||
|
|
||||||
def receiveIO = {
|
def receiveIO = {
|
||||||
case bytes: ByteString ⇒
|
case bytes: ByteString ⇒
|
||||||
println("C: Sending Request")
|
|
||||||
write(handle, bytes)
|
write(handle, bytes)
|
||||||
self reply read(handle, bytes.length)
|
self reply read(handle, bytes.length)
|
||||||
println("C: Got Response")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Basic Redis-style protocol
|
||||||
|
class KVStore(host: String, port: Int, ioManager: ActorRef) extends IOActor {
|
||||||
|
|
||||||
|
sequentialIO = false
|
||||||
|
idleWakeup = true
|
||||||
|
|
||||||
|
var kvs: Map[String, ByteString] = Map.empty
|
||||||
|
|
||||||
|
override def preStart = {
|
||||||
|
listen(ioManager, host, port)
|
||||||
|
}
|
||||||
|
|
||||||
|
def receiveIO = {
|
||||||
|
case IO.NewConnection(handle) ⇒
|
||||||
|
accept(handle)
|
||||||
|
case IO.WakeUp(handle) ⇒
|
||||||
|
val cmd = read(handle, ByteString(" ")).utf8String.trim
|
||||||
|
cmd match {
|
||||||
|
case "SET" ⇒
|
||||||
|
val key = read(handle, ByteString(" ")).utf8String.trim
|
||||||
|
val len = read(handle, ByteString("\r\n")).utf8String.trim
|
||||||
|
val value = read(handle, len.toInt)
|
||||||
|
kvs += (key -> value)
|
||||||
|
write(handle, ByteString("+OK\r\n"))
|
||||||
|
case "GET" ⇒
|
||||||
|
val key = read(handle, ByteString("\r\n")).utf8String.trim
|
||||||
|
write(handle, kvs.get(key).map(v ⇒ ByteString("$" + v.length + "\r\n") ++ v).getOrElse(ByteString("$-1\r\n")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class KVClient(host: String, port: Int, ioManager: ActorRef) extends IOActor {
|
||||||
|
|
||||||
|
// FIXME: should prioritize reads from first message
|
||||||
|
// sequentialIO = false
|
||||||
|
|
||||||
|
var handle: IO.Handle = _
|
||||||
|
|
||||||
|
override def preStart: Unit = {
|
||||||
|
handle = connect(ioManager, host, port)
|
||||||
|
}
|
||||||
|
|
||||||
|
def receiveIO = {
|
||||||
|
case ('set, key: String, value: ByteString) ⇒
|
||||||
|
write(handle, ByteString("SET " + key + " " + value.length + "\r\n") ++ value)
|
||||||
|
val resultType = read(handle, 1).utf8String
|
||||||
|
if (resultType != "+") sys.error("Unexpected response")
|
||||||
|
val status = read(handle, ByteString("\r\n"))
|
||||||
|
self reply status.take(status.length - 2)
|
||||||
|
|
||||||
|
case ('get, key: String) ⇒
|
||||||
|
write(handle, ByteString("GET " + key + "\r\n"))
|
||||||
|
val resultType = read(handle, 1).utf8String
|
||||||
|
if (resultType != "$") sys.error("Unexpected response")
|
||||||
|
val len = read(handle, ByteString("\r\n")).utf8String.trim
|
||||||
|
val value = read(handle, len.toInt)
|
||||||
|
self reply value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
||||||
import IOActorSpec._
|
import IOActorSpec._
|
||||||
|
|
||||||
"an IO Actor" must {
|
"an IO Actor" must {
|
||||||
"run" in {
|
"run echo server" in {
|
||||||
val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer
|
val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer
|
||||||
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start
|
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start
|
||||||
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start
|
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start
|
||||||
|
|
@ -74,6 +127,26 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
||||||
server.stop
|
server.stop
|
||||||
ioManager.stop
|
ioManager.stop
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"run key-value store" in {
|
||||||
|
val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer
|
||||||
|
val server = Actor.actorOf(new KVStore("localhost", 8064, ioManager)).start
|
||||||
|
val client = Actor.actorOf(new KVClient("localhost", 8064, ioManager)).start
|
||||||
|
val promise1 = client !!! (('set, "hello", ByteString("World")))
|
||||||
|
val promise2 = client !!! (('set, "test", ByteString("No one will read me")))
|
||||||
|
val promise3 = client !!! (('get, "hello"))
|
||||||
|
val promise4 = client !!! (('set, "test", ByteString("I'm a test!")))
|
||||||
|
val promise5 = client !!! (('get, "test"))
|
||||||
|
(promise1.get: ByteString) must equal(ByteString("OK"))
|
||||||
|
(promise2.get: ByteString) must equal(ByteString("OK"))
|
||||||
|
(promise3.get: ByteString) must equal(ByteString("World"))
|
||||||
|
(promise4.get: ByteString) must equal(ByteString("OK"))
|
||||||
|
(promise5.get: ByteString) must equal(ByteString("I'm a test!"))
|
||||||
|
client.stop
|
||||||
|
server.stop
|
||||||
|
ioManager.stop
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ object IO {
|
||||||
case class Closed(handle: Handle) extends IOMessage
|
case class Closed(handle: Handle) extends IOMessage
|
||||||
case class Read(handle: Handle, bytes: ByteString) extends IOMessage
|
case class Read(handle: Handle, bytes: ByteString) extends IOMessage
|
||||||
case class Write(handle: Handle, bytes: ByteString) extends IOMessage
|
case class Write(handle: Handle, bytes: ByteString) extends IOMessage
|
||||||
|
case class WakeUp(handle: Handle) extends IOMessage
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -76,6 +77,8 @@ trait IO {
|
||||||
handle
|
handle
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def accept(source: IO.Handle): Unit = accept(source, self)
|
||||||
|
|
||||||
def write(handle: IO.Handle, bytes: ByteString): Unit =
|
def write(handle: IO.Handle, bytes: ByteString): Unit =
|
||||||
handle.ioManager ! IO.Write(handle, bytes)
|
handle.ioManager ! IO.Write(handle, bytes)
|
||||||
|
|
||||||
|
|
@ -100,6 +103,8 @@ trait IOActor extends Actor with IO {
|
||||||
|
|
||||||
protected var sequentialIO = true
|
protected var sequentialIO = true
|
||||||
|
|
||||||
|
protected var idleWakeup = false
|
||||||
|
|
||||||
private val _messages: mutable.Queue[MessageInvocation] = mutable.Queue.empty
|
private val _messages: mutable.Queue[MessageInvocation] = mutable.Queue.empty
|
||||||
|
|
||||||
private var _state: Map[IO.Handle, HandleState] = Map.empty
|
private var _state: Map[IO.Handle, HandleState] = Map.empty
|
||||||
|
|
@ -136,7 +141,8 @@ trait IOActor extends Actor with IO {
|
||||||
case IO.Read(handle, newBytes) ⇒
|
case IO.Read(handle, newBytes) ⇒
|
||||||
val st = state(handle)
|
val st = state(handle)
|
||||||
st.readBytes :+= newBytes
|
st.readBytes :+= newBytes
|
||||||
run(handle)
|
if (st.messages.isEmpty && idleWakeup) reset { _receiveIO(IO.WakeUp(handle)) }
|
||||||
|
else run(handle)
|
||||||
case IO.Connected(handle) ⇒
|
case IO.Connected(handle) ⇒
|
||||||
state(handle).connected = true
|
state(handle).connected = true
|
||||||
case IO.Closed(handle) ⇒
|
case IO.Closed(handle) ⇒
|
||||||
|
|
|
||||||
|
|
@ -93,6 +93,8 @@ final class ByteString private (bytes: Array[Byte], startIndex: Int, endIndex: I
|
||||||
|
|
||||||
def toByteBuffer: ByteBuffer = ByteBuffer.wrap(toArray)
|
def toByteBuffer: ByteBuffer = ByteBuffer.wrap(toArray)
|
||||||
|
|
||||||
|
def utf8String: String = new String(bytes, "UTF-8")
|
||||||
|
|
||||||
def mapI(f: Byte ⇒ Int): ByteString = map(f andThen (_.toByte))
|
def mapI(f: Byte ⇒ Int): ByteString = map(f andThen (_.toByte))
|
||||||
|
|
||||||
override def slice(from: Int, until: Int): ByteString = {
|
override def slice(from: Int, until: Int): ByteString = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue