implement ResumeWriting, see #3200

also included:
- a complete rewrite of the TCP docs based on real/tested/working code
  samples
- an EchoServer implementation which handles all the edge cases,
  available in Java & Scala
- renamed StopReading to SuspendReading to match up with ResumeReading
- addition of Inbox.watch()
- Inbox RST docs for Java(!) and Scala

not included:
- ScalaDoc / JavaDoc for all IO stuff
This commit is contained in:
Roland 2013-04-16 22:31:09 +02:00
parent 489c00b913
commit 0e34edbcb3
20 changed files with 1874 additions and 187 deletions

View file

@ -0,0 +1,302 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io
import java.net.InetSocketAddress
import scala.concurrent.duration.DurationInt
import com.typesafe.config.ConfigFactory
import akka.actor.{ Actor, ActorDSL, ActorLogging, ActorRef, ActorSystem, Props, SupervisorStrategy }
import akka.actor.ActorDSL.inbox
import akka.io.{ IO, Tcp }
import akka.util.ByteString
object EchoServer extends App {
val config = ConfigFactory.parseString("akka.loglevel = DEBUG")
implicit val system = ActorSystem("EchoServer", config)
// make sure to stop the system so that the application stops
try run()
finally system.shutdown()
def run(): Unit = {
import ActorDSL._
// create two EchoManager and stop the application once one dies
val watcher = inbox()
watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[EchoHandler]), "echo"))
watcher.watch(system.actorOf(Props(classOf[EchoManager], classOf[SimpleEchoHandler]), "simple"))
watcher.receive(10.minutes)
}
}
class EchoManager(handlerClass: Class[_]) extends Actor with ActorLogging {
import Tcp._
import context.system
// there is not recovery for broken connections
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
// bind to the listen port; the port will automatically be closed once this actor dies
override def preStart(): Unit = {
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
}
// do not restart
override def postRestart(thr: Throwable): Unit = context stop self
def receive = {
case Bound(localAddress)
log.info("listening on port {}", localAddress.getPort)
case CommandFailed(Bind(_, local, _, _))
log.warning(s"cannot bind to [$local]")
context stop self
//#echo-manager
case Connected(remote, local)
log.info("received connection from {}", remote)
val handler = context.actorOf(Props(handlerClass, sender, remote))
sender ! Register(handler, keepOpenOnPeerClosed = true)
//#echo-manager
}
}
object EchoHandler {
def apply(connection: ActorRef, remote: InetSocketAddress): Props =
Props(classOf[EchoHandler], connection, remote)
}
//#echo-handler
class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
extends Actor with ActorLogging {
import Tcp._
// sign death pact: this actor terminates when connection breaks
context watch connection
// start out in optimistic write-through mode
def receive = writing
//#writing
def writing: Receive = {
case Received(data)
connection ! Write(data, currentOffset)
buffer(data)
case ack: Int
acknowledge(ack)
case CommandFailed(Write(_, ack: Int))
connection ! ResumeWriting
context become buffering(ack)
case PeerClosed
if (storage.isEmpty) context stop self
else context become closing
}
//#writing
//#buffering
def buffering(nack: Int): Receive = {
var toAck = 10
var peerClosed = false
{
case Received(data) buffer(data)
case WritingResumed writeFirst()
case PeerClosed peerClosed = true
case ack: Int if ack < nack acknowledge(ack)
case ack: Int
acknowledge(ack)
if (storage.nonEmpty) {
if (toAck > 0) {
// stay in ACK-based mode for a while
writeFirst()
toAck -= 1
} else {
// then return to NACK-based again
writeAll()
context become (if (peerClosed) closing else writing)
}
} else if (peerClosed) context stop self
else context become writing
}
}
//#buffering
//#closing
def closing: Receive = {
case CommandFailed(_: Write)
connection ! ResumeWriting
context.become({
case WritingResumed
writeAll()
context.unbecome()
case ack: Int acknowledge(ack)
}, discardOld = false)
case ack: Int
acknowledge(ack)
if (storage.isEmpty) context stop self
}
//#closing
override def postStop(): Unit = {
log.info(s"transferred $transferred bytes from/to [$remote]")
}
//#storage-omitted
var storageOffset = 0
var storage = Vector.empty[ByteString]
var stored = 0L
var transferred = 0L
val maxStored = 100000000L
val highWatermark = maxStored * 5 / 10
val lowWatermark = maxStored * 3 / 10
var suspended = false
private def currentOffset = storageOffset + storage.size
//#helpers
private def buffer(data: ByteString): Unit = {
storage :+= data
stored += data.size
if (stored > maxStored) {
log.warning(s"drop connection to [$remote] (buffer overrun)")
context stop self
} else if (stored > highWatermark) {
log.debug(s"suspending reading at $currentOffset")
connection ! SuspendReading
suspended = true
}
}
private def acknowledge(ack: Int): Unit = {
require(ack == storageOffset, s"received ack $ack at $storageOffset")
require(storage.nonEmpty, s"storage was empty at ack $ack")
val size = storage(0).size
stored -= size
transferred += size
storageOffset += 1
storage = storage drop 1
if (suspended && stored < lowWatermark) {
log.debug("resuming reading")
connection ! ResumeReading
suspended = false
}
}
//#helpers
private def writeFirst(): Unit = {
connection ! Write(storage(0), storageOffset)
}
private def writeAll(): Unit = {
for ((data, i) storage.zipWithIndex) {
connection ! Write(data, storageOffset + i)
}
}
//#storage-omitted
}
//#echo-handler
//#simple-echo-handler
class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress)
extends Actor with ActorLogging {
import Tcp._
// sign death pact: this actor terminates when connection breaks
context watch connection
case object Ack
def receive = {
case Received(data)
buffer(data)
connection ! Write(data, Ack)
context.become({
case Received(data) buffer(data)
case Ack acknowledge()
case PeerClosed closing = true
}, discardOld = false)
case PeerClosed context stop self
}
//#storage-omitted
override def postStop(): Unit = {
log.info(s"transferred $transferred bytes from/to [$remote]")
}
var storage = Vector.empty[ByteString]
var stored = 0L
var transferred = 0L
var closing = false
val maxStored = 100000000L
val highWatermark = maxStored * 5 / 10
val lowWatermark = maxStored * 3 / 10
var suspended = false
//#simple-helpers
private def buffer(data: ByteString): Unit = {
storage :+= data
stored += data.size
if (stored > maxStored) {
log.warning(s"drop connection to [$remote] (buffer overrun)")
context stop self
} else if (stored > highWatermark) {
log.debug(s"suspending reading")
connection ! SuspendReading
suspended = true
}
}
private def acknowledge(): Unit = {
require(storage.nonEmpty, "storage was empty")
val size = storage(0).size
stored -= size
transferred += size
storage = storage drop 1
if (suspended && stored < lowWatermark) {
log.debug("resuming reading")
connection ! ResumeReading
suspended = false
}
if (storage.isEmpty) {
if (closing) context stop self
else context.unbecome()
} else connection ! Write(storage(0), Ack)
}
//#simple-helpers
//#storage-omitted
}
//#simple-echo-handler

View file

@ -0,0 +1,117 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io
//#imports
import akka.actor.{ Actor, ActorRef, Props }
import akka.io.{ IO, Tcp }
import akka.util.ByteString
import java.net.InetSocketAddress
//#imports
import akka.testkit.AkkaSpec
import scala.concurrent.duration._
class DemoActor extends Actor {
//#manager
import akka.io.{ IO, Tcp }
import context.system // implicitly used by IO(Tcp)
val manager = IO(Tcp)
//#manager
def receive = Actor.emptyBehavior
}
//#server
object Server {
def apply(manager: ActorRef) = Props(classOf[Server], manager)
}
class Server(manager: ActorRef) extends Actor {
import Tcp._
import context.system
IO(Tcp) ! Bind(self, new InetSocketAddress("localhost", 0))
def receive = {
case b @ Bound(localAddress) manager ! b
case CommandFailed(_: Bind) context stop self
case c @ Connected(remote, local)
manager ! c
val handler = context.actorOf(Props[SimplisticHandler])
val connection = sender
connection ! Register(handler)
}
}
//#server
//#simplistic-handler
class SimplisticHandler extends Actor {
import Tcp._
def receive = {
case Received(data) sender ! Write(data)
case PeerClosed context stop self
}
}
//#simplistic-handler
//#client
object Client {
def apply(remote: InetSocketAddress, replies: ActorRef) =
Props(classOf[Client], remote, replies)
}
class Client(remote: InetSocketAddress, listener: ActorRef) extends Actor {
import Tcp._
import context.system
IO(Tcp) ! Connect(remote)
def receive = {
case CommandFailed(_: Connect)
listener ! "failed"
context stop self
case c @ Connected(remote, local)
listener ! c
val connection = sender
connection ! Register(self)
context become {
case data: ByteString connection ! Write(data)
case CommandFailed(w: Write) // O/S buffer was full
case Received(data) listener ! data
case "close" connection ! Close
case _: ConnectionClosed context stop self
}
}
}
//#client
class IODocSpec extends AkkaSpec {
"demonstrate connect" in {
val server = system.actorOf(Server(testActor), "server1")
val listen = expectMsgType[Tcp.Bound].localAddress
val client = system.actorOf(Client(listen, testActor), "client1")
val c1, c2 = expectMsgType[Tcp.Connected]
c1.localAddress must be(c2.remoteAddress)
c2.localAddress must be(c1.remoteAddress)
client ! ByteString("hello")
expectMsgType[ByteString].utf8String must be("hello")
watch(client)
client ! "close"
expectTerminated(client, 1.second)
}
}