From 5bf8ca5b43cb5ede53e93b96093fe5309f8677ff Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Sat, 31 Dec 2011 15:31:08 -0700 Subject: [PATCH] Use correct implicits --- .../src/test/scala/akka/actor/IOActor.scala | 8 ++++ akka-actor/src/main/scala/akka/actor/IO.scala | 43 +++++++++---------- 2 files changed, 29 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index 06d3235409..5c2c162f25 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -14,6 +14,8 @@ object IOActorSpec { class SimpleEchoServer(host: String, port: Int, started: TestLatch) extends Actor { + implicit val system = context.system + IO listen (host, port) started.open @@ -44,6 +46,8 @@ object IOActorSpec { class SimpleEchoClient(host: String, port: Int) extends Actor { + implicit val system = context.system + val socket = IO connect (host, port) val state = IO.IterateeRef.sync() @@ -87,6 +91,8 @@ object IOActorSpec { // Basic Redis-style protocol class KVStore(host: String, port: Int, started: TestLatch) extends Actor { + implicit val system = context.system + val state = IO.IterateeRef.Map.sync[IO.Handle]() var kvs: Map[String, String] = Map.empty @@ -151,6 +157,8 @@ object IOActorSpec { class KVClient(host: String, port: Int) extends Actor { + implicit val system = context.system + val socket = IO connect (host, port) val state = IO.IterateeRef.sync() diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 89922c94ca..bf4a8fc84b 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -3,6 +3,7 @@ */ package akka.actor +import akka.dispatch.{ Future, ExecutionContext } import akka.util.ByteString import java.net.InetSocketAddress import java.io.IOException @@ -74,37 +75,37 @@ object IO { case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage - def listen(address: InetSocketAddress)(implicit context: ActorContext, owner: ActorRef): ServerHandle = { - val ioManager = IOManager.start()(context.system) + def listen(address: InetSocketAddress)(implicit system: ActorSystem, owner: ActorRef): ServerHandle = { + val ioManager = IOManager.start()(system) val server = ServerHandle(owner, ioManager) ioManager ! Listen(server, address) server } - def listen(host: String, port: Int)(implicit context: ActorContext, owner: ActorRef): ServerHandle = - listen(new InetSocketAddress(host, port))(context, owner) + def listen(host: String, port: Int)(implicit system: ActorSystem, owner: ActorRef): ServerHandle = + listen(new InetSocketAddress(host, port))(system, owner) - def listen(address: InetSocketAddress, owner: ActorRef)(implicit context: ActorContext): ServerHandle = - listen(address)(context, owner) + def listen(address: InetSocketAddress, owner: ActorRef)(implicit system: ActorSystem): ServerHandle = + listen(address)(system, owner) - def listen(host: String, port: Int, owner: ActorRef)(implicit context: ActorContext): ServerHandle = - listen(new InetSocketAddress(host, port))(context, owner) + def listen(host: String, port: Int, owner: ActorRef)(implicit system: ActorSystem): ServerHandle = + listen(new InetSocketAddress(host, port))(system, owner) - def connect(address: InetSocketAddress)(implicit context: ActorContext, owner: ActorRef): SocketHandle = { - val ioManager = IOManager.start()(context.system) + def connect(address: InetSocketAddress)(implicit system: ActorSystem, owner: ActorRef): SocketHandle = { + val ioManager = IOManager.start()(system) val socket = SocketHandle(owner, ioManager) ioManager ! Connect(socket, address) socket } - def connect(host: String, port: Int)(implicit context: ActorContext, owner: ActorRef): SocketHandle = - connect(new InetSocketAddress(host, port))(context, owner) + def connect(host: String, port: Int)(implicit system: ActorSystem, owner: ActorRef): SocketHandle = + connect(new InetSocketAddress(host, port))(system, owner) - def connect(address: InetSocketAddress, owner: ActorRef)(implicit context: ActorContext): SocketHandle = - connect(address)(context, owner) + def connect(address: InetSocketAddress, owner: ActorRef)(implicit system: ActorSystem): SocketHandle = + connect(address)(system, owner) - def connect(host: String, port: Int, owner: ActorRef)(implicit context: ActorContext): SocketHandle = - connect(new InetSocketAddress(host, port))(context, owner) + def connect(host: String, port: Int, owner: ActorRef)(implicit system: ActorSystem): SocketHandle = + connect(new InetSocketAddress(host, port))(system, owner) sealed trait Input { def ++(that: Input): Input @@ -190,8 +191,8 @@ object IO { def sync[A](initial: Iteratee[A]): IterateeRefSync[A] = new IterateeRefSync(initial) def sync(): IterateeRefSync[Unit] = new IterateeRefSync(Iteratee.unit) - def async[A](initial: Iteratee[A])(implicit app: ActorSystem): IterateeRefAsync[A] = new IterateeRefAsync(initial) - def async()(implicit app: ActorSystem): IterateeRefAsync[Unit] = new IterateeRefAsync(Iteratee.unit) + def async[A](initial: Iteratee[A])(implicit executor: ExecutionContext): IterateeRefAsync[A] = new IterateeRefAsync(initial) + def async()(implicit executor: ExecutionContext): IterateeRefAsync[Unit] = new IterateeRefAsync(Iteratee.unit) class Map[K, V] private (refFactory: ⇒ IterateeRef[V], underlying: mutable.Map[K, IterateeRef[V]] = mutable.Map.empty[K, IterateeRef[V]]) extends mutable.Map[K, IterateeRef[V]] { def get(key: K) = Some(underlying.getOrElseUpdate(key, refFactory)) @@ -203,7 +204,7 @@ object IO { object Map { def apply[K, V](refFactory: ⇒ IterateeRef[V]): IterateeRef.Map[K, V] = new Map(refFactory) def sync[K](): IterateeRef.Map[K, Unit] = new Map(IterateeRef.sync()) - def async[K]()(implicit app: ActorSystem): IterateeRef.Map[K, Unit] = new Map(IterateeRef.async()) + def async[K]()(implicit executor: ExecutionContext): IterateeRef.Map[K, Unit] = new Map(IterateeRef.async()) } } @@ -232,8 +233,7 @@ object IO { def value: (Iteratee[A], Input) = _value } - final class IterateeRefAsync[A](initial: Iteratee[A])(implicit app: ActorSystem) extends IterateeRef[A] { - import akka.dispatch.Future + final class IterateeRefAsync[A](initial: Iteratee[A])(implicit executor: ExecutionContext) extends IterateeRef[A] { private var _value: Future[(Iteratee[A], Input)] = Future((initial, Chunk.empty)) def flatMap(f: A ⇒ Iteratee[A]): Unit = _value = _value map { case (iter, chunk @ Chunk(bytes)) if bytes.nonEmpty ⇒ (iter flatMap f)(chunk) @@ -421,7 +421,6 @@ object IOManager { def stop()(implicit system: ActorSystem): Unit = { // TODO: send shutdown message to IOManager } - } final class WriteBuffer(bufferSize: Int) {