IO ScalaDocs about half done

This commit is contained in:
Derek Williams 2012-01-18 18:01:33 -07:00
parent cac7c8cdda
commit d2275ec4d2

View file

@ -23,8 +23,17 @@ import scala.annotation.tailrec
import scala.collection.generic.CanBuildFrom
import com.eaio.uuid.UUID
/**
* IO messages and iteratees.
*/
object IO {
/**
* An immutable handle to a Java NIO Channel. Contains a reference to the
* [[akka.actor.ActorRef]] that will receive events related to the Channel,
* a reference to the [[akka.actor.IOManager]] that manages the Channel, and
* a [[com.eaio.uuid.UUID]] to uniquely identify the Channel.
*/
sealed trait Handle {
this: Product
def owner: ActorRef
@ -37,26 +46,69 @@ object IO {
def asSocket: SocketHandle = sys error "Not a socket"
def asServer: ServerHandle = sys error "Not a server"
/**
* Sends a request to the [[akka.actor.IOManager]] to close the Channel
* associated with this [[akka.actor.IO.Handle]].
*
* This can also be performed by sending [[akka.actor.IO.Close]] to the
* [[akka.actor.IOManager]].
*/
def close(): Unit = ioManager ! Close(this)
}
/**
* A [[akka.actor.IO.Handle]] to a ReadableByteChannel.
*/
sealed trait ReadHandle extends Handle with Product {
override def asReadable = this
}
/**
* A [[akka.actor.IO.Handle]] to a WritableByteChannel.
*/
sealed trait WriteHandle extends Handle with Product {
override def asWritable = this
/**
* Sends a request to the [[akka.actor.IOManager]] to write to the
* Channel associated with this [[akka.actor.IO.Handle]].
*
* This can also be performed by sending [[akka.actor.IO.Write]] to the
* [[akka.actor.IOManager]].
*/
def write(bytes: ByteString): Unit = ioManager ! Write(this, bytes)
}
/**
* A [[akka.actor.IO.Handle]] to a SocketChannel. Instances are normally
* created by [[akka.actor.IOManager]].connect() and
* [[akka.actor.IO.ServerHandle]].accept().
*/
case class SocketHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends ReadHandle with WriteHandle {
override def asSocket = this
}
/**
* A [[akka.actor.IO.Handle]] to a ServerSocketChannel. Instances are
* normally created by [[akka.actor.IOManager]].listen().
*/
case class ServerHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends Handle {
override def asServer = this
/**
* Sends a request to the [[akka.actor.IOManager]] to accept an incoming
* connection to the ServerSocketChannel associated with this
* [[akka.actor.IO.Handle]].
*
* This can also be performed by creating a new [[akka.actor.IO.SocketHandle]]
* and sending it within an [[akka.actor.IO.Accept]] to the [[akka.actor.IOManager]].
*
* @param socketOwner the [[akka.actor.ActorRef]] that should receive events
* associated with the SocketChannel. The ActorRef for the
* current Actor will be used implicitly.
* @return a new SocketHandle that can be used to perform actions on the
* new connection's SocketChannel.
*/
def accept()(implicit socketOwner: ActorRef): SocketHandle = {
val socket = SocketHandle(socketOwner, ioManager)
ioManager ! Accept(socket, this)
@ -64,18 +116,91 @@ object IO {
}
}
/**
* Messages used to communicate with an [[akka.actor.IOManager]].
*/
sealed trait IOMessage
/**
* Message to an [[akka.actor.IOManager]] to create a ServerSocketChannel
* listening on the provided address.
*
* Normally sent using IOManager.listen()
*/
case class Listen(server: ServerHandle, address: InetSocketAddress) extends IOMessage
/**
* Message from an [[akka.actor.IOManager]] that a new connection has been
* made to the ServerSocketChannel and needs to be accepted.
*/
case class NewClient(server: ServerHandle) extends IOMessage
/**
* Message to an [[akka.actor.IOManager]] to accept a new connection.
*
* Normally sent using [[akka.actor.IO.ServerHandle]].accept()
*/
case class Accept(socket: SocketHandle, server: ServerHandle) extends IOMessage
/**
* Message to an [[akka.actor.IOManager]] to create a SocketChannel connected
* to the provided address.
*
* Normally sent using IOManager.connect()
*/
case class Connect(socket: SocketHandle, address: InetSocketAddress) extends IOMessage
/**
* Message from an [[akka.actor.IOManager]] that the SocketChannel has
* successfully connected.
*
* No action is required by the receiving [[akka.actor.Actor]], although
* the message still needs to be in it's receive method.
*/
case class Connected(socket: SocketHandle) extends IOMessage
/**
* Message to an [[akka.actor.IOManager]] to close the Channel.
*
* Normally sent using [[akka.actor.IO.Handle]].close()
*/
case class Close(handle: Handle) extends IOMessage
/**
* Message from an [[akka.actor.IOManager]] that the Channel has closed. Can
* optionally contain the Exception that caused the Channel to close, if
* applicable.
*
* No action is required by the receiving [[akka.actor.Actor]], although
* the message still needs to be in it's receive method.
*/
case class Closed(handle: Handle, cause: Option[Exception]) extends IOMessage
/**
* Message from an [[akka.actor.IOManager]] that contains bytes read from
* the SocketChannel.
*/
case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage
/**
* Message to an [[akka.actor.IOManager]] to write to the SocketChannel.
*
* Normally sent using [[akka.actor.IO.SocketHandle]].write()
*/
case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage
/**
* Represents part of a stream of bytes that can be processed by an
* [[akka.actor.IO.Iteratee]].
*/
sealed trait Input {
/**
* Append another Input to this one.
*
* If 'that' is an [[akka.actor.IO.EOF]] then it will replace any
* remaining bytes in this Input. If 'this' is an [[akka.actor.IO.EOF]]
* then it will be replaced by 'that'.
*/
def ++(that: Input): Input
}
@ -83,6 +208,9 @@ object IO {
val empty = Chunk(ByteString.empty)
}
/**
* Part of an [[akka.actor.IO.Input]] stream that contains a chunk of bytes.
*/
case class Chunk(bytes: ByteString) extends Input {
def ++(that: Input) = that match {
case Chunk(more) Chunk(bytes ++ more)
@ -90,11 +218,24 @@ object IO {
}
}
/**
* Part of an [[akka.actor.IO.Input]] stream that represents the end of the
* stream.
*
* This will cause the [[akka.actor.IO.Iteratee]] that processes it
* to terminate early. If a cause is defined it can be 'caught' by
* Iteratee.recover() in order to handle it properly.
*/
case class EOF(cause: Option[Exception]) extends Input {
def ++(that: Input) = that
}
object Iteratee {
/**
* Wrap the provided value within a [[akka.actor.IO.Done]]
* [[akka.actor.IO.Iteratee]]. This is a helper for cases where the type should be
* inferred as an Iteratee and not as a Done.
*/
def apply[A](value: A): Iteratee[A] = Done(value)
def apply(): Iteratee[Unit] = unit
val unit: Iteratee[Unit] = Done(())
@ -102,37 +243,67 @@ object IO {
/**
* A basic Iteratee implementation of Oleg's Iteratee (http://okmij.org/ftp/Streams.html).
* No support for Enumerator or Input types other then ByteString at the moment.
* To keep this implementation simple it has no support for Enumerator or Input types
* other then ByteString.
*
* Other Iteratee implementations can be used in place of this one if any
* missing features are required.
*/
sealed abstract class Iteratee[+A] {
/**
* Applies the given input to the Iteratee, returning the resulting Iteratee
* and the unused Input.
* Processes the given [[akka.actor.IO.Input]], returning the resulting
* Iteratee and the remaining Input.
*/
final def apply(input: Input): (Iteratee[A], Input) = this match {
case Cont(f, None) f(input)
case iter (iter, input)
}
/**
* Passes an [[akka.actor.IO.EOF]] to this Iteratee and returns the
* result if available.
*
* If this Iteratee is in a failure state then the Exception will be thrown.
*
* If this Iteratee is not well behaved (does not return a result on EOF)
* then a "Divergent Iteratee" Exception will be thrown.
*/
final def get: A = this(EOF(None))._1 match {
case Done(value) value
case Cont(_, None) sys.error("Divergent Iteratee")
case Cont(_, Some(err)) throw err
}
/**
* Applies a function to the result of this Iteratee, resulting in a new
* Iteratee. Any unused [[akka.actor.IO.Input]] that is given to this
* Iteratee will be passed to that resulting Iteratee. This is the
* primary method of composing Iteratees together in order to process
* an Input stream.
*/
final def flatMap[B](f: A Iteratee[B]): Iteratee[B] = this match {
case Done(value) f(value)
case Cont(k: Chain[_], err) Cont(k :+ f, err)
case Cont(k, err) Cont(Chain(k, f), err)
}
/**
* Applies a function to transform the result of this Iteratee.
*/
final def map[B](f: A B): Iteratee[B] = this match {
case Done(value) Done(f(value))
case Cont(k: Chain[_], err) Cont(k :+ ((a: A) Done(f(a))), err)
case Cont(k, err) Cont(Chain(k, (a: A) Done(f(a))), err)
}
/**
* Provides a handler for any matching errors that may have occured while
* running this Iteratee.
*
* Errors are usually raised within the Iteratee with [[akka.actor.IO]].throwErr
* or by processing an [[akka.actor.IO.EOF]] that contains an Exception.
*/
def recover[B >: A](pf: PartialFunction[Exception, B]): Iteratee[B] = this match {
case done @ Done(_) done
case Cont(_, Some(err)) if pf isDefinedAt err Done(pf(err))
@ -142,24 +313,54 @@ object IO {
}
/**
* An Iteratee representing a result and the remaining ByteString. Also used to
* wrap any constants or precalculated values that need to be composed with
* other Iteratees.
* An Iteratee representing a result, usually returned by the successful
* completion of an Iteratee. Also used to wrap any constants or
* precalculated values that need to be composed with other Iteratees.
*/
final case class Done[+A](result: A) extends Iteratee[A]
/**
* An Iteratee that still requires more input to calculate it's result.
* An [[akka.actor.IO.Iteratee]] that still requires more input to calculate
* it's result. It may also contain an optional error, which can be handled
* with 'recover()'.
*
* It is possible to recover from an error and continue processing this
* Iteratee without losing the continuation, although that has not yet
* been tested. An example use case of this is resuming a failed download.
*/
final case class Cont[+A](f: Input (Iteratee[A], Input), error: Option[Exception] = None) extends Iteratee[A]
object IterateeRef {
/**
* Creates an [[akka.actor.IO.IterateeRefSync]] containing an initial
* [[akka.actor.IO.Iteratee]].
*/
def sync[A](initial: Iteratee[A]): IterateeRefSync[A] = new IterateeRefSync(initial)
/**
* Creates an empty [[akka.actor.IO.IterateeRefSync]].
*/
def sync(): IterateeRefSync[Unit] = new IterateeRefSync(Iteratee.unit)
/**
* Creates an [[akka.actor.IO.IterateeRefAsync]] containing an initial
* [[akka.actor.IO.Iteratee]].
*/
def async[A](initial: Iteratee[A])(implicit executor: ExecutionContext): IterateeRefAsync[A] = new IterateeRefAsync(initial)
/**
* Creates an empty [[akka.actor.IO.IterateeRefAsync]].
*/
def async()(implicit executor: ExecutionContext): IterateeRefAsync[Unit] = new IterateeRefAsync(Iteratee.unit)
/**
* A mutable Map to contain multiple IterateeRefs.
*
* This Map differs from the mutable Map within Scala's standard library
* by automatically including any keys used to lookup an IterateeRef. The
* 'refFactory' is used to provide the default value for new keys.
*/
class Map[K, V] private (refFactory: IterateeRef[V], underlying: mutable.Map[K, IterateeRef[V]] = mutable.Map.empty[K, IterateeRef[V]]) extends mutable.Map[K, IterateeRef[V]] {
def get(key: K) = Some(underlying.getOrElseUpdate(key, refFactory))
def iterator = underlying.iterator
@ -167,20 +368,31 @@ object IO {
def -=(key: K) = { underlying -= key; this }
override def empty = new Map[K, V](refFactory)
}
object Map {
/**
* Uses a factory to create the initial IterateeRef for each new key.
*/
def apply[K, V](refFactory: IterateeRef[V]): IterateeRef.Map[K, V] = new Map(refFactory)
/**
* Creates an empty [[akka.actor.IO.IterateeRefSync]] for each new key.
*/
def sync[K](): IterateeRef.Map[K, Unit] = new Map(IterateeRef.sync())
/**
* Creates an empty [[akka.actor.IO.IterateeRefAsync]] for each new key.
*/
def async[K]()(implicit executor: ExecutionContext): IterateeRef.Map[K, Unit] = new Map(IterateeRef.async())
}
}
/**
* A mutable reference to an Iteratee. Not thread safe.
* A mutable reference to an Iteratee designed for use within an Actor.
*
* Designed for use within an Actor.
*
* Includes mutable implementations of flatMap, map, and apply which
* update the internal reference and return Unit.
* See [[akka.actor.IO.IterateeRefSync]] and [[akka.actor.IO.IterateeRefAsync]]
* for details.
*/
trait IterateeRef[A] {
def flatMap(f: A Iteratee[A]): Unit
@ -188,6 +400,17 @@ object IO {
def apply(input: Input): Unit
}
/**
* A mutable reference to an [[akka.actor.IO.Iteratee]]. Not thread safe.
*
* Designed for use within an [[akka.actor.Actor]].
*
* Includes mutable implementations of flatMap, map, and apply which
* update the internal reference and return Unit.
*
* [[akka.actor.IO.Input]] remaining after processing the Iteratee will
* be stored and processed later when 'flatMap' is used next.
*/
final class IterateeRefSync[A](initial: Iteratee[A]) extends IterateeRef[A] {
private var _value: (Iteratee[A], Input) = (initial, Chunk.empty)
def flatMap(f: A Iteratee[A]): Unit = _value = _value match {
@ -199,6 +422,20 @@ object IO {
def value: (Iteratee[A], Input) = _value
}
/**
* A mutable reference to an [[akka.actor.IO.Iteratee]]. Not thread safe.
*
* Designed for use within an [[akka.actor.Actor]], although all actions
* perfomed on the Iteratee are processed within a [[akka.dispatch.Future]]
* so it is not safe to refer to the Actor's state from within this Iteratee.
* Messages should instead be sent to the Actor in order to modify state.
*
* Includes mutable implementations of flatMap, map, and apply which
* update the internal reference and return Unit.
*
* [[akka.actor.IO.Input]] remaining after processing the Iteratee will
* be stored and processed later when 'flatMap' is used next.
*/
final class IterateeRefAsync[A](initial: Iteratee[A])(implicit executor: ExecutionContext) extends IterateeRef[A] {
private var _value: Future[(Iteratee[A], Input)] = Future((initial, Chunk.empty))
def flatMap(f: A Iteratee[A]): Unit = _value = _value map {
@ -210,6 +447,10 @@ object IO {
def future: Future[(Iteratee[A], Input)] = _value
}
/**
* An [[akka.actor.IO.Iteratee]] that contains an Exception. The Exception
* can be handled with Iteratee.recover().
*/
final def throwErr(err: Exception): Iteratee[Nothing] = Cont(input (throwErr(err), input), Some(err))
/**