diff --git a/akka-docs/scala/code/akka/docs/io/HTTPServer.scala b/akka-docs/scala/code/akka/docs/io/HTTPServer.scala new file mode 100644 index 0000000000..b47b7ace44 --- /dev/null +++ b/akka-docs/scala/code/akka/docs/io/HTTPServer.scala @@ -0,0 +1,216 @@ +/** + * Copyright (C) 2009-2011 Typesafe Inc. + */ +package akka.docs.io + +import akka.actor._ +import akka.util.{ ByteString, ByteStringBuilder } +import java.net.InetSocketAddress + +class HttpServer(port: Int) extends Actor { + + val state = IO.IterateeRef.Map.async[IO.Handle]()(context.dispatcher) + + override def preStart { + IOManager(context.system) listen new InetSocketAddress(port) + } + + def receive = { + + case IO.NewClient(server) ⇒ + val socket = server.accept() + state(socket) flatMap (_ ⇒ HttpServer.processRequest(socket)) + + case IO.Read(socket, bytes) ⇒ + state(socket)(IO Chunk bytes) + + case IO.Closed(socket, cause) ⇒ + state(socket)(IO EOF None) + state -= socket + + } + +} + +object HttpServer { + import HttpIteratees._ + + def processRequest(socket: IO.SocketHandle): IO.Iteratee[Unit] = + IO repeat { + for { + request ← readRequest + } yield { + val rsp = request match { + case Request("GET", "ping" :: Nil, _, _, headers, _) ⇒ + OKResponse(ByteString("

pong

"), + request.headers.exists { case Header(n, v) ⇒ n.toLowerCase == "connection" && v.toLowerCase == "keep-alive" }) + case req ⇒ + OKResponse(ByteString("

" + req.toString + "

"), + request.headers.exists { case Header(n, v) ⇒ n.toLowerCase == "connection" && v.toLowerCase == "keep-alive" }) + } + socket write OKResponse.bytes(rsp).compact + if (!rsp.keepAlive) socket.close() + } + } + +} + +//#request-class +case class Request(meth: String, path: List[String], query: Option[String], httpver: String, headers: List[Header], body: Option[ByteString]) +case class Header(name: String, value: String) +//#request-class + +//#constants +object HttpConstants { + val SP = ByteString(" ") + val HT = ByteString("\t") + val CRLF = ByteString("\r\n") + val COLON = ByteString(":") + val PERCENT = ByteString("%") + val PATH = ByteString("/") + val QUERY = ByteString("?") +} +//#constants + +//#read-request +object HttpIteratees { + import HttpConstants._ + + def readRequest = + for { + requestLine ← readRequestLine + (meth, (path, query), httpver) = requestLine + headers ← readHeaders + body ← readBody(headers) + } yield Request(meth, path, query, httpver, headers, body) + //#read-request + + //#read-request-line + def ascii(bytes: ByteString): String = bytes.decodeString("US-ASCII").trim + + def readRequestLine = + for { + meth ← IO takeUntil SP + uri ← readRequestURI + _ ← IO takeUntil SP // ignore the rest + httpver ← IO takeUntil CRLF + } yield (ascii(meth), uri, ascii(httpver)) + //#read-request-line + + //#read-request-uri + def readRequestURI = IO peek 1 flatMap { + case PATH ⇒ + for { + path ← readPath + query ← readQuery + } yield (path, query) + case _ ⇒ sys.error("Not Implemented") + } + //#read-request-uri + + //#read-path + def readPath = { + def step(segments: List[String]): IO.Iteratee[List[String]] = IO peek 1 flatMap { + case PATH ⇒ IO drop 1 flatMap (_ ⇒ readUriPart(pathchar) flatMap (segment ⇒ step(segment :: segments))) + case _ ⇒ segments match { + case "" :: rest ⇒ IO Done rest.reverse + case _ ⇒ IO Done segments.reverse + } + } + step(Nil) + } + //#read-path + + //#read-query + def readQuery: IO.Iteratee[Option[String]] = IO peek 1 flatMap { + case QUERY ⇒ IO drop 1 flatMap (_ ⇒ readUriPart(querychar) map (Some(_))) + case _ ⇒ IO Done None + } + //#read-query + + //#read-uri-part + val alpha = Set.empty ++ ('a' to 'z') ++ ('A' to 'Z') map (_.toByte) + val digit = Set.empty ++ ('0' to '9') map (_.toByte) + val hexdigit = digit ++ (Set.empty ++ ('a' to 'f') ++ ('A' to 'F') map (_.toByte)) + val subdelim = Set('!', '$', '&', '\'', '(', ')', '*', '+', ',', ';', '=') map (_.toByte) + val pathchar = alpha ++ digit ++ subdelim ++ (Set(':', '@') map (_.toByte)) + val querychar = pathchar ++ (Set('/', '?') map (_.toByte)) + + def readUriPart(allowed: Set[Byte]): IO.Iteratee[String] = for { + str ← IO takeWhile allowed map ascii + pchar ← IO peek 1 map (_ == PERCENT) + all ← if (pchar) readPChar flatMap (ch ⇒ readUriPart(allowed) map (str + ch + _)) else IO Done str + } yield all + + def readPChar = IO take 3 map { + case Seq('%', rest @ _*) if rest forall hexdigit ⇒ + java.lang.Integer.parseInt(rest map (_.toChar) mkString, 16).toChar + } + //#read-uri-part + + //#read-headers + def readHeaders = { + def step(found: List[Header]): IO.Iteratee[List[Header]] = { + IO peek 2 flatMap { + case CRLF ⇒ IO takeUntil CRLF flatMap (_ ⇒ IO Done found) + case _ ⇒ readHeader flatMap (header ⇒ step(header :: found)) + } + } + step(Nil) + } + + def readHeader = + for { + name ← IO takeUntil COLON + value ← IO takeUntil CRLF flatMap readMultiLineValue + } yield Header(ascii(name), ascii(value)) + + def readMultiLineValue(initial: ByteString): IO.Iteratee[ByteString] = IO peek 1 flatMap { + case SP ⇒ IO takeUntil CRLF flatMap (bytes ⇒ readMultiLineValue(initial ++ bytes)) + case _ ⇒ IO Done initial + } + //#read-headers + + //#read-body + def readBody(headers: List[Header]) = + if (headers.exists(header ⇒ header.name == "Content-Length" || header.name == "Transfer-Encoding")) + IO.takeAll map (Some(_)) + else + IO Done None + //#read-body +} + +//#ok-response +object OKResponse { + import HttpConstants.CRLF + + val okStatus = ByteString("HTTP/1.1 200 OK") + val contentType = ByteString("Content-Type: text/html; charset=utf-8") + val cacheControl = ByteString("Cache-Control: no-cache") + val date = ByteString("Date: ") + val server = ByteString("Server: Akka") + val contentLength = ByteString("Content-Length: ") + val connection = ByteString("Connection: ") + val keepAlive = ByteString("Keep-Alive") + val close = ByteString("Close") + + def bytes(rsp: OKResponse) = { + new ByteStringBuilder ++= + okStatus ++= CRLF ++= + contentType ++= CRLF ++= + cacheControl ++= CRLF ++= + date ++= ByteString(new java.util.Date().toString) ++= CRLF ++= + server ++= CRLF ++= + contentLength ++= ByteString(rsp.body.length.toString) ++= CRLF ++= + connection ++= (if (rsp.keepAlive) keepAlive else close) ++= CRLF ++= CRLF ++= rsp.body result + } + +} +case class OKResponse(body: ByteString, keepAlive: Boolean) +//#ok-response + +object Main extends App { + val port = Option(System.getenv("PORT")) map (_.toInt) getOrElse 8080 + val system = ActorSystem() + val server = system.actorOf(Props(new HttpServer(port))) +} diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index 803dd26799..268fc06981 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -22,5 +22,6 @@ Scala API stm agents transactors + io testing extending-akka diff --git a/akka-docs/scala/io.rst b/akka-docs/scala/io.rst new file mode 100644 index 0000000000..2379902716 --- /dev/null +++ b/akka-docs/scala/io.rst @@ -0,0 +1,74 @@ +.. _io-scala: + +IO (Scala) +========== + +.. sidebar:: Contents + + .. contents:: :local: + +Introduction +------------ + +This documentation is in progress. More to come. + +Examples +-------- + +Http Server +^^^^^^^^^^^ + +Some commonly used constants: + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: constants + +And case classes to hold the resulting request: + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: request-class + +Now for our first ``Iteratee``. There are 3 main sections of a HTTP request: the request line, the headers, and an optional body. The main request ``Iteratee`` handles each section separately: + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-request + +In the above code ``readRequest`` takes the results of 3 different ``Iteratees`` (``readRequestLine``, ``readHeaders``, ``readBody``) and combines them into a single ``Request`` object. ``readRequestLine`` actually returns a tuple, so we extract it's individual components. ``readBody`` depends on values contained within the header section, so we must pass those to the method. + +The request line has 3 parts to it: the HTTP method, the requested URI, and the HTTP version. The parts are separated by a single space, and the entire request line ends with a ``CRLF``. + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-request-line + +Reading the request method is simple as it is a single string ending in a space. The simple ``Iteratee`` that performs this is ``IO.takeUntil(delimiter: ByteString): Iteratee[ByteString]``. It keeps consuming input until the specified delimiter is found. Reading the HTTP version is also a simple string that ends with a ``CRLF``. + +The ``ascii`` method is a helper that takes a ``ByteString`` and parses it as a ``US-ASCII`` ``String``. + +Reading the request URI is a bit more complicated because we want to parse the individual components of the URI instead of just returning a simple string: + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-request-uri + +For this example we are only interested in handling absolute paths. To detect if we the URI is an absolute path we use ``IO.peek(length: Int): Iteratee[ByteString]``, which returns a ``ByteString`` of the request length but doesn't actually consume the input. We peek at the next bit of input and see if it matches our ``PATH`` constant (defined above as ``ByteString("/")``). If it doesn't match we throw an error, but for a more robust solution we would want to handle other valid URIs. + +Reading the URI path will be our most complex ``Iteratee``. It involves a recursive method that reads in each path segment of the URI: + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-path + +The ``step`` method is a recursive method that takes a ``List`` of the accumulated path segments. It first checks if the remaining input starts with the ``PATH`` constant, and if it does, it drops that input, and returns the ``readUriPart`` ``Iteratee`` which has it's result added to the path segment accumulator and the ``step`` method is run again. + +If after reading in a path segment the next input does not start with a path, we reverse the accumulated segments and return it (dropping the last segment if it is blank). + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-query + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-uri-part + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-headers + +.. includecode:: code/akka/docs/io/HTTPServer.scala + :include: read-body +