a few fixes after review of the IO doc changes

This commit is contained in:
Roland 2013-05-27 14:26:34 +02:00
parent 214c29a459
commit caab7909d7
7 changed files with 24 additions and 25 deletions

View file

@ -762,12 +762,12 @@ object BackpressureBuffer {
* difference was small then the buffer would more quickly oscillate between * difference was small then the buffer would more quickly oscillate between
* these two limits). * these two limits).
*/ */
class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: Long) class BackpressureBuffer(lowBytes: Long, highBytes: Long, maxBytes: Long)
extends PipelineStage[HasLogging, Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] { extends PipelineStage[HasLogging, Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] {
require(lowWatermark >= 0, "lowWatermark needs to be non-negative") require(lowBytes >= 0, "lowWatermark needs to be non-negative")
require(highWatermark >= lowWatermark, "highWatermark needs to be at least as large as lowWatermark") require(highBytes >= lowBytes, "highWatermark needs to be at least as large as lowWatermark")
require(maxCapacity >= highWatermark, "maxCapacity needs to be at least as large as highWatermark") require(maxBytes >= highBytes, "maxCapacity needs to be at least as large as highWatermark")
case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event
@ -899,11 +899,11 @@ class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: L
storage :+= w storage :+= w
stored += w.data.size stored += w.data.size
if (stored > maxCapacity) { if (stored > maxBytes) {
log.warning("aborting connection (buffer overrun)") log.warning("aborting connection (buffer overrun)")
become(finished) become(finished)
ctx.singleCommand(Abort) ctx.singleCommand(Abort)
} else if (stored > highWatermark && !suspended) { } else if (stored > highBytes && !suspended) {
log.debug("suspending writes") log.debug("suspending writes")
suspended = true suspended = true
if (doWrite) { if (doWrite) {
@ -926,7 +926,7 @@ class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: L
storageOffset += 1 storageOffset += 1
storage = storage drop 1 storage = storage drop 1
if (suspended && stored < lowWatermark) { if (suspended && stored < lowBytes) {
log.debug("resuming writes") log.debug("resuming writes")
suspended = false suspended = false
if (ack == NoAck) ctx.singleEvent(LowWatermarkReached) if (ack == NoAck) ctx.singleEvent(LowWatermarkReached)

View file

@ -695,7 +695,6 @@ object TcpMessage {
def resumeReading: Command = ResumeReading def resumeReading: Command = ResumeReading
implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = {
import scala.collection.JavaConverters._ akka.japi.Util.immutableSeq(coll)
coll.asScala.to[immutable.Traversable]
} }
} }

View file

@ -266,12 +266,13 @@ have inserted a backpressure buffer which will generate a
(generated at 10000 buffered bytes) and a :class:`LowWatermarkReached` when (generated at 10000 buffered bytes) and a :class:`LowWatermarkReached` when
they can resume writing (when buffer empties below 1000 bytes); the buffer has they can resume writing (when buffer empties below 1000 bytes); the buffer has
a maximum capacity of 1MB. The implementation is very similar to the NACK-based a maximum capacity of 1MB. The implementation is very similar to the NACK-based
backpressure approach presented above. Above the SSL stage comes an adapter backpressure approach presented above, please refer to the API documentation
which extracts only the payload data from the TCP commands and events, i.e. it for details about its usage. Above the SSL stage comes an adapter which
speaks :class:`ByteString` above. The resulting byte streams are broken into extracts only the payload data from the TCP commands and events, i.e. it speaks
frames by a :class:`DelimiterFraming` stage which chops them up on newline :class:`ByteString` above. The resulting byte streams are broken into frames by
characters. The top-most stage then converts between :class:`String` and UTF-8 a :class:`DelimiterFraming` stage which chops them up on newline characters.
encoded :class:`ByteString`. The top-most stage then converts between :class:`String` and UTF-8 encoded
:class:`ByteString`.
As a result the pipeline will accept simple :class:`String` commands, encode As a result the pipeline will accept simple :class:`String` commands, encode
them using UTF-8, delimit them with newlines (which are expected to be already them using UTF-8, delimit them with newlines (which are expected to be already

View file

@ -29,7 +29,7 @@ Unconnected UDP
--------------- ---------------
Simple Send Simple Send
^^^^^^^^^^^^ ^^^^^^^^^^^
.. includecode:: code/docs/io/UdpDocTest.java#sender .. includecode:: code/docs/io/UdpDocTest.java#sender

View file

@ -261,12 +261,12 @@ have inserted a backpressure buffer which will generate a
:class:`HighWatermarkReached` event to tell the upper stages to suspend writing :class:`HighWatermarkReached` event to tell the upper stages to suspend writing
and a :class:`LowWatermarkReached` when they can resume writing. The and a :class:`LowWatermarkReached` when they can resume writing. The
implementation is very similar to the NACK-based backpressure approach implementation is very similar to the NACK-based backpressure approach
presented above. Above the SSL stage comes an adapter which extracts only the presented above, please refer to the API docs for details on its usage. Above
payload data from the TCP commands and events, i.e. it speaks the SSL stage comes an adapter which extracts only the payload data from the
:class:`ByteString` above. The resulting byte streams are broken into frames by TCP commands and events, i.e. it speaks :class:`ByteString` above. The
a :class:`DelimiterFraming` stage which chops them up on newline characters. resulting byte streams are broken into frames by a :class:`DelimiterFraming`
The top-most stage then converts between :class:`String` and UTF-8 encoded stage which chops them up on newline characters. The top-most stage then
:class:`ByteString`. converts between :class:`String` and UTF-8 encoded :class:`ByteString`.
As a result the pipeline will accept simple :class:`String` commands, encode As a result the pipeline will accept simple :class:`String` commands, encode
them using UTF-8, delimit them with newlines (which are expected to be already them using UTF-8, delimit them with newlines (which are expected to be already

View file

@ -160,8 +160,7 @@ class SslTlsSupportSpec extends AkkaSpec {
includeDelimiter = true) >> includeDelimiter = true) >>
new TcpReadWriteAdapter >> new TcpReadWriteAdapter >>
new SslTlsSupport(sslEngine(remote, client = false)) >> new SslTlsSupport(sslEngine(remote, client = false)) >>
new BackpressureBuffer(lowWatermark = 1000, highWatermark = 10000, new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000))
maxCapacity = 1000000))
val connection = sender val connection = sender
val handler = context.actorOf(Props(new AkkaSslHandler(init))) val handler = context.actorOf(Props(new AkkaSslHandler(init)))