diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 5942f285de..1911770a63 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -23,8 +23,17 @@ import scala.annotation.tailrec import scala.collection.generic.CanBuildFrom import com.eaio.uuid.UUID +/** + * IO messages and iteratees. + */ object IO { + /** + * An immutable handle to a Java NIO Channel. Contains a reference to the + * [[akka.actor.ActorRef]] that will receive events related to the Channel, + * a reference to the [[akka.actor.IOManager]] that manages the Channel, and + * a [[com.eaio.uuid.UUID]] to uniquely identify the Channel. + */ sealed trait Handle { this: Product ⇒ def owner: ActorRef @@ -37,26 +46,69 @@ object IO { def asSocket: SocketHandle = sys error "Not a socket" def asServer: ServerHandle = sys error "Not a server" + /** + * Sends a request to the [[akka.actor.IOManager]] to close the Channel + * associated with this [[akka.actor.IO.Handle]]. + * + * This can also be performed by sending [[akka.actor.IO.Close]] to the + * [[akka.actor.IOManager]]. + */ def close(): Unit = ioManager ! Close(this) } + /** + * A [[akka.actor.IO.Handle]] to a ReadableByteChannel. + */ sealed trait ReadHandle extends Handle with Product { override def asReadable = this } + /** + * A [[akka.actor.IO.Handle]] to a WritableByteChannel. + */ sealed trait WriteHandle extends Handle with Product { override def asWritable = this + /** + * Sends a request to the [[akka.actor.IOManager]] to write to the + * Channel associated with this [[akka.actor.IO.Handle]]. + * + * This can also be performed by sending [[akka.actor.IO.Write]] to the + * [[akka.actor.IOManager]]. + */ def write(bytes: ByteString): Unit = ioManager ! Write(this, bytes) } + /** + * A [[akka.actor.IO.Handle]] to a SocketChannel. Instances are normally + * created by [[akka.actor.IOManager]].connect() and + * [[akka.actor.IO.ServerHandle]].accept(). + */ case class SocketHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends ReadHandle with WriteHandle { override def asSocket = this } + /** + * A [[akka.actor.IO.Handle]] to a ServerSocketChannel. Instances are + * normally created by [[akka.actor.IOManager]].listen(). + */ case class ServerHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends Handle { override def asServer = this + /** + * Sends a request to the [[akka.actor.IOManager]] to accept an incoming + * connection to the ServerSocketChannel associated with this + * [[akka.actor.IO.Handle]]. + * + * This can also be performed by creating a new [[akka.actor.IO.SocketHandle]] + * and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]]. + * + * @param socketOwner the [[akka.actor.ActorRef]] that should receive events + * associated with the SocketChannel. The ActorRef for the + * current Actor will be used implicitly. + * @return a new SocketHandle that can be used to perform actions on the + * new connection's SocketChannel. + */ def accept()(implicit socketOwner: ActorRef): SocketHandle = { val socket = SocketHandle(socketOwner, ioManager) ioManager ! Accept(socket, this) @@ -64,18 +116,91 @@ object IO { } } + /** + * Messages used to communicate with an [[akka.actor.IOManager]]. + */ sealed trait IOMessage + + /** + * Message to an [[akka.actor.IOManager]] to create a ServerSocketChannel + * listening on the provided address. + * + * Normally sent using IOManager.listen() + */ case class Listen(server: ServerHandle, address: InetSocketAddress) extends IOMessage + + /** + * Message from an [[akka.actor.IOManager]] that a new connection has been + * made to the ServerSocketChannel and needs to be accepted. + */ case class NewClient(server: ServerHandle) extends IOMessage + + /** + * Message to an [[akka.actor.IOManager]] to accept a new connection. + * + * Normally sent using [[akka.actor.IO.ServerHandle]].accept() + */ case class Accept(socket: SocketHandle, server: ServerHandle) extends IOMessage + + /** + * Message to an [[akka.actor.IOManager]] to create a SocketChannel connected + * to the provided address. + * + * Normally sent using IOManager.connect() + */ case class Connect(socket: SocketHandle, address: InetSocketAddress) extends IOMessage + + /** + * Message from an [[akka.actor.IOManager]] that the SocketChannel has + * successfully connected. + * + * No action is required by the receiving [[akka.actor.Actor]], although + * the message still needs to be in it's receive method. + */ case class Connected(socket: SocketHandle) extends IOMessage + + /** + * Message to an [[akka.actor.IOManager]] to close the Channel. + * + * Normally sent using [[akka.actor.IO.Handle]].close() + */ case class Close(handle: Handle) extends IOMessage + + /** + * Message from an [[akka.actor.IOManager]] that the Channel has closed. Can + * optionally contain the Exception that caused the Channel to close, if + * applicable. + * + * No action is required by the receiving [[akka.actor.Actor]], although + * the message still needs to be in it's receive method. + */ case class Closed(handle: Handle, cause: Option[Exception]) extends IOMessage + + /** + * Message from an [[akka.actor.IOManager]] that contains bytes read from + * the SocketChannel. + */ case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage + + /** + * Message to an [[akka.actor.IOManager]] to write to the SocketChannel. + * + * Normally sent using [[akka.actor.IO.SocketHandle]].write() + */ case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage + /** + * Represents part of a stream of bytes that can be processed by an + * [[akka.actor.IO.Iteratee]]. + */ sealed trait Input { + /** + * Append another Input to this one. + * + * If 'that' is an [[akka.actor.IO.EOF]] then it will replace any + * remaining bytes in this Input. If 'this' is an [[akka.actor.IO.EOF]] + * then it will be replaced by 'that'. + */ def ++(that: Input): Input } @@ -83,6 +208,9 @@ object IO { val empty = Chunk(ByteString.empty) } + /** + * Part of an [[akka.actor.IO.Input]] stream that contains a chunk of bytes. + */ case class Chunk(bytes: ByteString) extends Input { def ++(that: Input) = that match { case Chunk(more) ⇒ Chunk(bytes ++ more) @@ -90,11 +218,24 @@ object IO { } } + /** + * Part of an [[akka.actor.IO.Input]] stream that represents the end of the + * stream. + * + * This will cause the [[akka.actor.IO.Iteratee]] that processes it + * to terminate early. If a cause is defined it can be 'caught' by + * Iteratee.recover() in order to handle it properly. + */ case class EOF(cause: Option[Exception]) extends Input { def ++(that: Input) = that } object Iteratee { + /** + * Wrap the provided value within a [[akka.actor.IO.Done]] + * [[akka.actor.IO.Iteratee]]. This is a helper for cases where the type should be + * inferred as an Iteratee and not as a Done. + */ def apply[A](value: A): Iteratee[A] = Done(value) def apply(): Iteratee[Unit] = unit val unit: Iteratee[Unit] = Done(()) @@ -102,37 +243,67 @@ object IO { /** * A basic Iteratee implementation of Oleg's Iteratee (http://okmij.org/ftp/Streams.html). - * No support for Enumerator or Input types other then ByteString at the moment. + * To keep this implementation simple it has no support for Enumerator or Input types + * other then ByteString. + * + * Other Iteratee implementations can be used in place of this one if any + * missing features are required. */ sealed abstract class Iteratee[+A] { /** - * Applies the given input to the Iteratee, returning the resulting Iteratee - * and the unused Input. + * Processes the given [[akka.actor.IO.Input]], returning the resulting + * Iteratee and the remaining Input. */ final def apply(input: Input): (Iteratee[A], Input) = this match { case Cont(f, None) ⇒ f(input) case iter ⇒ (iter, input) } + /** + * Passes an [[akka.actor.IO.EOF]] to this Iteratee and returns the + * result if available. + * + * If this Iteratee is in a failure state then the Exception will be thrown. + * + * If this Iteratee is not well behaved (does not return a result on EOF) + * then a "Divergent Iteratee" Exception will be thrown. + */ final def get: A = this(EOF(None))._1 match { case Done(value) ⇒ value case Cont(_, None) ⇒ sys.error("Divergent Iteratee") case Cont(_, Some(err)) ⇒ throw err } + /** + * Applies a function to the result of this Iteratee, resulting in a new + * Iteratee. Any unused [[akka.actor.IO.Input]] that is given to this + * Iteratee will be passed to that resulting Iteratee. This is the + * primary method of composing Iteratees together in order to process + * an Input stream. + */ final def flatMap[B](f: A ⇒ Iteratee[B]): Iteratee[B] = this match { case Done(value) ⇒ f(value) case Cont(k: Chain[_], err) ⇒ Cont(k :+ f, err) case Cont(k, err) ⇒ Cont(Chain(k, f), err) } + /** + * Applies a function to transform the result of this Iteratee. + */ final def map[B](f: A ⇒ B): Iteratee[B] = this match { case Done(value) ⇒ Done(f(value)) case Cont(k: Chain[_], err) ⇒ Cont(k :+ ((a: A) ⇒ Done(f(a))), err) case Cont(k, err) ⇒ Cont(Chain(k, (a: A) ⇒ Done(f(a))), err) } + /** + * Provides a handler for any matching errors that may have occured while + * running this Iteratee. + * + * Errors are usually raised within the Iteratee with [[akka.actor.IO]].throwErr + * or by processing an [[akka.actor.IO.EOF]] that contains an Exception. + */ def recover[B >: A](pf: PartialFunction[Exception, B]): Iteratee[B] = this match { case done @ Done(_) ⇒ done case Cont(_, Some(err)) if pf isDefinedAt err ⇒ Done(pf(err)) @@ -142,24 +313,54 @@ object IO { } /** - * An Iteratee representing a result and the remaining ByteString. Also used to - * wrap any constants or precalculated values that need to be composed with - * other Iteratees. + * An Iteratee representing a result, usually returned by the successful + * completion of an Iteratee. Also used to wrap any constants or + * precalculated values that need to be composed with other Iteratees. */ final case class Done[+A](result: A) extends Iteratee[A] /** - * An Iteratee that still requires more input to calculate it's result. + * An [[akka.actor.IO.Iteratee]] that still requires more input to calculate + * it's result. It may also contain an optional error, which can be handled + * with 'recover()'. + * + * It is possible to recover from an error and continue processing this + * Iteratee without losing the continuation, although that has not yet + * been tested. An example use case of this is resuming a failed download. */ final case class Cont[+A](f: Input ⇒ (Iteratee[A], Input), error: Option[Exception] = None) extends Iteratee[A] object IterateeRef { + + /** + * Creates an [[akka.actor.IO.IterateeRefSync]] containing an initial + * [[akka.actor.IO.Iteratee]]. + */ def sync[A](initial: Iteratee[A]): IterateeRefSync[A] = new IterateeRefSync(initial) + + /** + * Creates an empty [[akka.actor.IO.IterateeRefSync]]. + */ def sync(): IterateeRefSync[Unit] = new IterateeRefSync(Iteratee.unit) + /** + * Creates an [[akka.actor.IO.IterateeRefAsync]] containing an initial + * [[akka.actor.IO.Iteratee]]. + */ def async[A](initial: Iteratee[A])(implicit executor: ExecutionContext): IterateeRefAsync[A] = new IterateeRefAsync(initial) + + /** + * Creates an empty [[akka.actor.IO.IterateeRefAsync]]. + */ def async()(implicit executor: ExecutionContext): IterateeRefAsync[Unit] = new IterateeRefAsync(Iteratee.unit) + /** + * A mutable Map to contain multiple IterateeRefs. + * + * This Map differs from the mutable Map within Scala's standard library + * by automatically including any keys used to lookup an IterateeRef. The + * 'refFactory' is used to provide the default value for new keys. + */ class Map[K, V] private (refFactory: ⇒ IterateeRef[V], underlying: mutable.Map[K, IterateeRef[V]] = mutable.Map.empty[K, IterateeRef[V]]) extends mutable.Map[K, IterateeRef[V]] { def get(key: K) = Some(underlying.getOrElseUpdate(key, refFactory)) def iterator = underlying.iterator @@ -167,20 +368,31 @@ object IO { def -=(key: K) = { underlying -= key; this } override def empty = new Map[K, V](refFactory) } + object Map { + /** + * Uses a factory to create the initial IterateeRef for each new key. + */ def apply[K, V](refFactory: ⇒ IterateeRef[V]): IterateeRef.Map[K, V] = new Map(refFactory) + + /** + * Creates an empty [[akka.actor.IO.IterateeRefSync]] for each new key. + */ def sync[K](): IterateeRef.Map[K, Unit] = new Map(IterateeRef.sync()) + + /** + * Creates an empty [[akka.actor.IO.IterateeRefAsync]] for each new key. + */ def async[K]()(implicit executor: ExecutionContext): IterateeRef.Map[K, Unit] = new Map(IterateeRef.async()) } + } /** - * A mutable reference to an Iteratee. Not thread safe. + * A mutable reference to an Iteratee designed for use within an Actor. * - * Designed for use within an Actor. - * - * Includes mutable implementations of flatMap, map, and apply which - * update the internal reference and return Unit. + * See [[akka.actor.IO.IterateeRefSync]] and [[akka.actor.IO.IterateeRefAsync]] + * for details. */ trait IterateeRef[A] { def flatMap(f: A ⇒ Iteratee[A]): Unit @@ -188,6 +400,17 @@ object IO { def apply(input: Input): Unit } + /** + * A mutable reference to an [[akka.actor.IO.Iteratee]]. Not thread safe. + * + * Designed for use within an [[akka.actor.Actor]]. + * + * Includes mutable implementations of flatMap, map, and apply which + * update the internal reference and return Unit. + * + * [[akka.actor.IO.Input]] remaining after processing the Iteratee will + * be stored and processed later when 'flatMap' is used next. + */ final class IterateeRefSync[A](initial: Iteratee[A]) extends IterateeRef[A] { private var _value: (Iteratee[A], Input) = (initial, Chunk.empty) def flatMap(f: A ⇒ Iteratee[A]): Unit = _value = _value match { @@ -199,6 +422,20 @@ object IO { def value: (Iteratee[A], Input) = _value } + /** + * A mutable reference to an [[akka.actor.IO.Iteratee]]. Not thread safe. + * + * Designed for use within an [[akka.actor.Actor]], although all actions + * perfomed on the Iteratee are processed within a [[akka.dispatch.Future]] + * so it is not safe to refer to the Actor's state from within this Iteratee. + * Messages should instead be sent to the Actor in order to modify state. + * + * Includes mutable implementations of flatMap, map, and apply which + * update the internal reference and return Unit. + * + * [[akka.actor.IO.Input]] remaining after processing the Iteratee will + * be stored and processed later when 'flatMap' is used next. + */ final class IterateeRefAsync[A](initial: Iteratee[A])(implicit executor: ExecutionContext) extends IterateeRef[A] { private var _value: Future[(Iteratee[A], Input)] = Future((initial, Chunk.empty)) def flatMap(f: A ⇒ Iteratee[A]): Unit = _value = _value map { @@ -210,6 +447,10 @@ object IO { def future: Future[(Iteratee[A], Input)] = _value } + /** + * An [[akka.actor.IO.Iteratee]] that contains an Exception. The Exception + * can be handled with Iteratee.recover(). + */ final def throwErr(err: Exception): Iteratee[Nothing] = Cont(input ⇒ (throwErr(err), input), Some(err)) /**