Update to latest master
This commit is contained in:
parent
d1a71a1aba
commit
2cb7beabb6
2 changed files with 54 additions and 42 deletions
|
|
@ -41,7 +41,7 @@ object IOActorSpec {
|
|||
|
||||
class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO {
|
||||
|
||||
lazy val socket: SocketHandle = connect(ioManager, host, port)(Some(reader))
|
||||
lazy val socket: SocketHandle = connect(ioManager, host, port, reader)
|
||||
lazy val reader: ActorRef = Actor.actorOf {
|
||||
new Actor with IO {
|
||||
def receiveIO = {
|
||||
|
|
@ -77,20 +77,20 @@ object IOActorSpec {
|
|||
val result = matchC(cmd.split(' ')) {
|
||||
case Array("SET", key, length) ⇒
|
||||
val value = socket read length.toInt
|
||||
server.owner !!! (('set, key, value)) map ((_: Unit) ⇒ ByteString("+OK\r\n"))
|
||||
server.owner ? ('set, key, value) map ((x: Any) ⇒ ByteString("+OK\r\n"))
|
||||
case Array("GET", key) ⇒
|
||||
server.owner !!! (('get, key)) map { value: Option[ByteString] ⇒
|
||||
value map { bytes ⇒
|
||||
ByteString("$" + bytes.length + "\r\n") ++ bytes
|
||||
} getOrElse ByteString("$-1\r\n")
|
||||
server.owner ? ('get, key) collect {
|
||||
case Some(b: ByteString) ⇒ ByteString("$" + b.length + "\r\n") ++ b
|
||||
case None ⇒ ByteString("$-1\r\n")
|
||||
}
|
||||
case Array("GETALL") ⇒
|
||||
server.owner !!! 'getall map { all: Map[String, ByteString] ⇒
|
||||
(ByteString("*" + (all.size * 2) + "\r\n") /: all) {
|
||||
case (result, (k, v)) ⇒
|
||||
val kBytes = ByteString(k)
|
||||
result ++ ByteString("$" + kBytes.length + "\r\n") ++ kBytes ++ ByteString("$" + v.length + "\r\n") ++ v
|
||||
}
|
||||
server.owner ? 'getall collect {
|
||||
case m: Map[_, _] ⇒
|
||||
(ByteString("*" + (m.size * 2) + "\r\n") /: m) {
|
||||
case (result, (k: String, v: ByteString)) ⇒
|
||||
val kBytes = ByteString(k)
|
||||
result ++ ByteString("$" + kBytes.length + "\r\n") ++ kBytes ++ ByteString("$" + v.length + "\r\n") ++ v
|
||||
}
|
||||
}
|
||||
}
|
||||
result recover {
|
||||
|
|
@ -174,12 +174,12 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
|||
val ioManager = Actor.actorOf(new IOManager(2)).start // teeny tiny buffer
|
||||
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start
|
||||
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start
|
||||
val f1 = client !!! ByteString("Hello World!1")
|
||||
val f2 = client !!! ByteString("Hello World!2")
|
||||
val f3 = client !!! ByteString("Hello World!3")
|
||||
(f1.get: ByteString) must equal(ByteString("Hello World!1"))
|
||||
(f2.get: ByteString) must equal(ByteString("Hello World!2"))
|
||||
(f3.get: ByteString) must equal(ByteString("Hello World!3"))
|
||||
val f1 = client ? ByteString("Hello World!1")
|
||||
val f2 = client ? ByteString("Hello World!2")
|
||||
val f3 = client ? ByteString("Hello World!3")
|
||||
f1.get must equal(ByteString("Hello World!1"))
|
||||
f2.get must equal(ByteString("Hello World!2"))
|
||||
f3.get must equal(ByteString("Hello World!3"))
|
||||
client.stop
|
||||
server.stop
|
||||
ioManager.stop
|
||||
|
|
@ -190,7 +190,7 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
|||
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start
|
||||
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start
|
||||
val list = List.range(0, 100000)
|
||||
val f: Future[List[ByteString]] = Future.traverse(list)(i ⇒ client !!! ByteString(i.toString))
|
||||
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
|
||||
assert(f.get.size === 100000)
|
||||
client.stop
|
||||
server.stop
|
||||
|
|
@ -202,7 +202,7 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
|||
val server = Actor.actorOf(new SimpleEchoServer("localhost", 8064, ioManager)).start
|
||||
val client = Actor.actorOf(new SimpleEchoClient("localhost", 8064, ioManager)).start
|
||||
val list = List.range(0, 100000)
|
||||
val f: Future[List[ByteString]] = Future.traverse(list)(i ⇒ client !!! ByteString(i.toString))
|
||||
val f = Future.traverse(list)(i ⇒ client ? ByteString(i.toString))
|
||||
assert(f.get.size === 100000)
|
||||
client.stop
|
||||
server.stop
|
||||
|
|
@ -214,20 +214,20 @@ class IOActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
|
|||
val server = Actor.actorOf(new KVStore("localhost", 8064, ioManager)).start
|
||||
val client1 = Actor.actorOf(new KVClient("localhost", 8064, ioManager)).start
|
||||
val client2 = Actor.actorOf(new KVClient("localhost", 8064, ioManager)).start
|
||||
val f1 = client1 !!! (('set, "hello", ByteString("World")))
|
||||
val f2 = client1 !!! (('set, "test", ByteString("No one will read me")))
|
||||
val f3 = client1 !!! (('get, "hello"))
|
||||
val f1 = client1 ? ('set, "hello", ByteString("World"))
|
||||
val f2 = client1 ? ('set, "test", ByteString("No one will read me"))
|
||||
val f3 = client1 ? ('get, "hello")
|
||||
f2.await
|
||||
val f4 = client2 !!! (('set, "test", ByteString("I'm a test!")))
|
||||
val f4 = client2 ? ('set, "test", ByteString("I'm a test!"))
|
||||
f4.await
|
||||
val f5 = client1 !!! (('get, "test"))
|
||||
val f6 = client2 !!! 'getall
|
||||
(f1.get: String) must equal("OK")
|
||||
(f2.get: String) must equal("OK")
|
||||
(f3.get: ByteString) must equal(ByteString("World"))
|
||||
(f4.get: String) must equal("OK")
|
||||
(f5.get: ByteString) must equal(ByteString("I'm a test!"))
|
||||
(f6.get: Map[String, ByteString]) must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
|
||||
val f5 = client1 ? ('get, "test")
|
||||
val f6 = client2 ? 'getall
|
||||
f1.get must equal("OK")
|
||||
f2.get must equal("OK")
|
||||
f3.get must equal(ByteString("World"))
|
||||
f4.get must equal("OK")
|
||||
f5.get must equal(ByteString("I'm a test!"))
|
||||
f6.get must equal(Map("hello" -> ByteString("World"), "test" -> ByteString("I'm a test!")))
|
||||
client1.stop
|
||||
client2.stop
|
||||
server.stop
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ object IO {
|
|||
socket
|
||||
}
|
||||
|
||||
def accept()(implicit sender: Some[ActorRef]): SocketHandle = accept(sender.get)
|
||||
def accept()(implicit socketOwner: ScalaActorRef): SocketHandle = accept(socketOwner)
|
||||
}
|
||||
|
||||
sealed trait IOMessage
|
||||
|
|
@ -95,24 +95,36 @@ object IO {
|
|||
case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage
|
||||
case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage
|
||||
|
||||
def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: Some[ActorRef]): ServerHandle =
|
||||
listen(ioManager, new InetSocketAddress(host, port))
|
||||
|
||||
def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: Some[ActorRef]): ServerHandle = {
|
||||
val server = ServerHandle(sender.get, ioManager)
|
||||
def listen(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): ServerHandle = {
|
||||
val server = ServerHandle(owner, ioManager)
|
||||
ioManager ! Listen(server, address)
|
||||
server
|
||||
}
|
||||
|
||||
def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: Some[ActorRef]): SocketHandle =
|
||||
connect(ioManager, new InetSocketAddress(host, port))
|
||||
def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): ServerHandle =
|
||||
listen(ioManager, address, sender)
|
||||
|
||||
def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: Some[ActorRef]): SocketHandle = {
|
||||
val socket = SocketHandle(sender.get, ioManager)
|
||||
def listen(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): ServerHandle =
|
||||
listen(ioManager, new InetSocketAddress(host, port), owner)
|
||||
|
||||
def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): ServerHandle =
|
||||
listen(ioManager, new InetSocketAddress(host, port), sender)
|
||||
|
||||
def connect(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): SocketHandle = {
|
||||
val socket = SocketHandle(owner, ioManager)
|
||||
ioManager ! Connect(socket, address)
|
||||
socket
|
||||
}
|
||||
|
||||
def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): SocketHandle =
|
||||
connect(ioManager, address, sender)
|
||||
|
||||
def connect(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): SocketHandle =
|
||||
connect(ioManager, new InetSocketAddress(host, port), owner)
|
||||
|
||||
def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): SocketHandle =
|
||||
connect(ioManager, new InetSocketAddress(host, port), sender)
|
||||
|
||||
private class HandleState(var readBytes: ByteString, var connected: Boolean) {
|
||||
def this() = this(ByteString.empty, false)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue