Use correct implicits

This commit is contained in:
Derek Williams 2011-12-31 15:31:08 -07:00
parent a1f9b81ce9
commit 5bf8ca5b43
2 changed files with 29 additions and 22 deletions

View file

@ -14,6 +14,8 @@ object IOActorSpec {
class SimpleEchoServer(host: String, port: Int, started: TestLatch) extends Actor {
implicit val system = context.system
IO listen (host, port)
started.open
@ -44,6 +46,8 @@ object IOActorSpec {
class SimpleEchoClient(host: String, port: Int) extends Actor {
implicit val system = context.system
val socket = IO connect (host, port)
val state = IO.IterateeRef.sync()
@ -87,6 +91,8 @@ object IOActorSpec {
// Basic Redis-style protocol
class KVStore(host: String, port: Int, started: TestLatch) extends Actor {
implicit val system = context.system
val state = IO.IterateeRef.Map.sync[IO.Handle]()
var kvs: Map[String, String] = Map.empty
@ -151,6 +157,8 @@ object IOActorSpec {
class KVClient(host: String, port: Int) extends Actor {
implicit val system = context.system
val socket = IO connect (host, port)
val state = IO.IterateeRef.sync()

View file

@ -3,6 +3,7 @@
*/
package akka.actor
import akka.dispatch.{ Future, ExecutionContext }
import akka.util.ByteString
import java.net.InetSocketAddress
import java.io.IOException
@ -74,37 +75,37 @@ object IO {
case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage
case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage
def listen(address: InetSocketAddress)(implicit context: ActorContext, owner: ActorRef): ServerHandle = {
val ioManager = IOManager.start()(context.system)
def listen(address: InetSocketAddress)(implicit system: ActorSystem, owner: ActorRef): ServerHandle = {
val ioManager = IOManager.start()(system)
val server = ServerHandle(owner, ioManager)
ioManager ! Listen(server, address)
server
}
def listen(host: String, port: Int)(implicit context: ActorContext, owner: ActorRef): ServerHandle =
listen(new InetSocketAddress(host, port))(context, owner)
def listen(host: String, port: Int)(implicit system: ActorSystem, owner: ActorRef): ServerHandle =
listen(new InetSocketAddress(host, port))(system, owner)
def listen(address: InetSocketAddress, owner: ActorRef)(implicit context: ActorContext): ServerHandle =
listen(address)(context, owner)
def listen(address: InetSocketAddress, owner: ActorRef)(implicit system: ActorSystem): ServerHandle =
listen(address)(system, owner)
def listen(host: String, port: Int, owner: ActorRef)(implicit context: ActorContext): ServerHandle =
listen(new InetSocketAddress(host, port))(context, owner)
def listen(host: String, port: Int, owner: ActorRef)(implicit system: ActorSystem): ServerHandle =
listen(new InetSocketAddress(host, port))(system, owner)
def connect(address: InetSocketAddress)(implicit context: ActorContext, owner: ActorRef): SocketHandle = {
val ioManager = IOManager.start()(context.system)
def connect(address: InetSocketAddress)(implicit system: ActorSystem, owner: ActorRef): SocketHandle = {
val ioManager = IOManager.start()(system)
val socket = SocketHandle(owner, ioManager)
ioManager ! Connect(socket, address)
socket
}
def connect(host: String, port: Int)(implicit context: ActorContext, owner: ActorRef): SocketHandle =
connect(new InetSocketAddress(host, port))(context, owner)
def connect(host: String, port: Int)(implicit system: ActorSystem, owner: ActorRef): SocketHandle =
connect(new InetSocketAddress(host, port))(system, owner)
def connect(address: InetSocketAddress, owner: ActorRef)(implicit context: ActorContext): SocketHandle =
connect(address)(context, owner)
def connect(address: InetSocketAddress, owner: ActorRef)(implicit system: ActorSystem): SocketHandle =
connect(address)(system, owner)
def connect(host: String, port: Int, owner: ActorRef)(implicit context: ActorContext): SocketHandle =
connect(new InetSocketAddress(host, port))(context, owner)
def connect(host: String, port: Int, owner: ActorRef)(implicit system: ActorSystem): SocketHandle =
connect(new InetSocketAddress(host, port))(system, owner)
sealed trait Input {
def ++(that: Input): Input
@ -190,8 +191,8 @@ object IO {
def sync[A](initial: Iteratee[A]): IterateeRefSync[A] = new IterateeRefSync(initial)
def sync(): IterateeRefSync[Unit] = new IterateeRefSync(Iteratee.unit)
def async[A](initial: Iteratee[A])(implicit app: ActorSystem): IterateeRefAsync[A] = new IterateeRefAsync(initial)
def async()(implicit app: ActorSystem): IterateeRefAsync[Unit] = new IterateeRefAsync(Iteratee.unit)
def async[A](initial: Iteratee[A])(implicit executor: ExecutionContext): IterateeRefAsync[A] = new IterateeRefAsync(initial)
def async()(implicit executor: ExecutionContext): IterateeRefAsync[Unit] = new IterateeRefAsync(Iteratee.unit)
class Map[K, V] private (refFactory: IterateeRef[V], underlying: mutable.Map[K, IterateeRef[V]] = mutable.Map.empty[K, IterateeRef[V]]) extends mutable.Map[K, IterateeRef[V]] {
def get(key: K) = Some(underlying.getOrElseUpdate(key, refFactory))
@ -203,7 +204,7 @@ object IO {
object Map {
def apply[K, V](refFactory: IterateeRef[V]): IterateeRef.Map[K, V] = new Map(refFactory)
def sync[K](): IterateeRef.Map[K, Unit] = new Map(IterateeRef.sync())
def async[K]()(implicit app: ActorSystem): IterateeRef.Map[K, Unit] = new Map(IterateeRef.async())
def async[K]()(implicit executor: ExecutionContext): IterateeRef.Map[K, Unit] = new Map(IterateeRef.async())
}
}
@ -232,8 +233,7 @@ object IO {
def value: (Iteratee[A], Input) = _value
}
final class IterateeRefAsync[A](initial: Iteratee[A])(implicit app: ActorSystem) extends IterateeRef[A] {
import akka.dispatch.Future
final class IterateeRefAsync[A](initial: Iteratee[A])(implicit executor: ExecutionContext) extends IterateeRef[A] {
private var _value: Future[(Iteratee[A], Input)] = Future((initial, Chunk.empty))
def flatMap(f: A Iteratee[A]): Unit = _value = _value map {
case (iter, chunk @ Chunk(bytes)) if bytes.nonEmpty (iter flatMap f)(chunk)
@ -421,7 +421,6 @@ object IOManager {
def stop()(implicit system: ActorSystem): Unit = {
// TODO: send shutdown message to IOManager
}
}
final class WriteBuffer(bufferSize: Int) {