From caab7909d797aa5cc23b00d51a1700dc3a80fb56 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 27 May 2013 14:26:34 +0200 Subject: [PATCH] a few fixes after review of the IO doc changes --- akka-actor/src/main/scala/akka/io/Pipelines.scala | 14 +++++++------- akka-actor/src/main/scala/akka/io/Tcp.scala | 3 +-- .../main/scala/akka/io/TcpPipelineHandler.scala | 2 +- akka-docs/rst/java/io-tcp.rst | 13 +++++++------ akka-docs/rst/java/io-udp.rst | 2 +- akka-docs/rst/scala/io-tcp.rst | 12 ++++++------ .../test/scala/akka/io/ssl/SslTlsSupportSpec.scala | 3 +-- 7 files changed, 24 insertions(+), 25 deletions(-) diff --git a/akka-actor/src/main/scala/akka/io/Pipelines.scala b/akka-actor/src/main/scala/akka/io/Pipelines.scala index cfa131a554..71f3f90417 100644 --- a/akka-actor/src/main/scala/akka/io/Pipelines.scala +++ b/akka-actor/src/main/scala/akka/io/Pipelines.scala @@ -762,12 +762,12 @@ object BackpressureBuffer { * difference was small then the buffer would more quickly oscillate between * these two limits). */ -class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: Long) +class BackpressureBuffer(lowBytes: Long, highBytes: Long, maxBytes: Long) extends PipelineStage[HasLogging, Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] { - require(lowWatermark >= 0, "lowWatermark needs to be non-negative") - require(highWatermark >= lowWatermark, "highWatermark needs to be at least as large as lowWatermark") - require(maxCapacity >= highWatermark, "maxCapacity needs to be at least as large as highWatermark") + require(lowBytes >= 0, "lowWatermark needs to be non-negative") + require(highBytes >= lowBytes, "highWatermark needs to be at least as large as lowWatermark") + require(maxBytes >= highBytes, "maxCapacity needs to be at least as large as highWatermark") case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event @@ -899,11 +899,11 @@ class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: L storage :+= w stored += w.data.size - if (stored > maxCapacity) { + if (stored > maxBytes) { log.warning("aborting connection (buffer overrun)") become(finished) ctx.singleCommand(Abort) - } else if (stored > highWatermark && !suspended) { + } else if (stored > highBytes && !suspended) { log.debug("suspending writes") suspended = true if (doWrite) { @@ -926,7 +926,7 @@ class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: L storageOffset += 1 storage = storage drop 1 - if (suspended && stored < lowWatermark) { + if (suspended && stored < lowBytes) { log.debug("resuming writes") suspended = false if (ack == NoAck) ctx.singleEvent(LowWatermarkReached) diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 5e4619fadd..0e7fbf6e1a 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -695,7 +695,6 @@ object TcpMessage { def resumeReading: Command = ResumeReading implicit private def fromJava[T](coll: JIterable[T]): immutable.Traversable[T] = { - import scala.collection.JavaConverters._ - coll.asScala.to[immutable.Traversable] + akka.japi.Util.immutableSeq(coll) } } diff --git a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala index dad73f3519..193d1d7e72 100644 --- a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala +++ b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala @@ -118,7 +118,7 @@ object TcpPipelineHandler { * handler may want to watch this actor’s lifecycle. * * IMPORTANT: - * + * * Proper function of this actor (and of other pipeline stages like [[TcpReadWriteAdapter]] * depends on the fact that stages handling TCP commands and events pass unknown * subtypes through unaltered. There are more commands and events than are declared diff --git a/akka-docs/rst/java/io-tcp.rst b/akka-docs/rst/java/io-tcp.rst index 1cbccb71be..73164b21de 100644 --- a/akka-docs/rst/java/io-tcp.rst +++ b/akka-docs/rst/java/io-tcp.rst @@ -266,12 +266,13 @@ have inserted a backpressure buffer which will generate a (generated at 10000 buffered bytes) and a :class:`LowWatermarkReached` when they can resume writing (when buffer empties below 1000 bytes); the buffer has a maximum capacity of 1MB. The implementation is very similar to the NACK-based -backpressure approach presented above. Above the SSL stage comes an adapter -which extracts only the payload data from the TCP commands and events, i.e. it -speaks :class:`ByteString` above. The resulting byte streams are broken into -frames by a :class:`DelimiterFraming` stage which chops them up on newline -characters. The top-most stage then converts between :class:`String` and UTF-8 -encoded :class:`ByteString`. +backpressure approach presented above, please refer to the API documentation +for details about its usage. Above the SSL stage comes an adapter which +extracts only the payload data from the TCP commands and events, i.e. it speaks +:class:`ByteString` above. The resulting byte streams are broken into frames by +a :class:`DelimiterFraming` stage which chops them up on newline characters. +The top-most stage then converts between :class:`String` and UTF-8 encoded +:class:`ByteString`. As a result the pipeline will accept simple :class:`String` commands, encode them using UTF-8, delimit them with newlines (which are expected to be already diff --git a/akka-docs/rst/java/io-udp.rst b/akka-docs/rst/java/io-udp.rst index 6d2a4be024..85e2d71882 100644 --- a/akka-docs/rst/java/io-udp.rst +++ b/akka-docs/rst/java/io-udp.rst @@ -29,7 +29,7 @@ Unconnected UDP --------------- Simple Send -^^^^^^^^^^^^ +^^^^^^^^^^^ .. includecode:: code/docs/io/UdpDocTest.java#sender diff --git a/akka-docs/rst/scala/io-tcp.rst b/akka-docs/rst/scala/io-tcp.rst index de3ffa4227..c60b360f0d 100644 --- a/akka-docs/rst/scala/io-tcp.rst +++ b/akka-docs/rst/scala/io-tcp.rst @@ -261,12 +261,12 @@ have inserted a backpressure buffer which will generate a :class:`HighWatermarkReached` event to tell the upper stages to suspend writing and a :class:`LowWatermarkReached` when they can resume writing. The implementation is very similar to the NACK-based backpressure approach -presented above. Above the SSL stage comes an adapter which extracts only the -payload data from the TCP commands and events, i.e. it speaks -:class:`ByteString` above. The resulting byte streams are broken into frames by -a :class:`DelimiterFraming` stage which chops them up on newline characters. -The top-most stage then converts between :class:`String` and UTF-8 encoded -:class:`ByteString`. +presented above, please refer to the API docs for details on its usage. Above +the SSL stage comes an adapter which extracts only the payload data from the +TCP commands and events, i.e. it speaks :class:`ByteString` above. The +resulting byte streams are broken into frames by a :class:`DelimiterFraming` +stage which chops them up on newline characters. The top-most stage then +converts between :class:`String` and UTF-8 encoded :class:`ByteString`. As a result the pipeline will accept simple :class:`String` commands, encode them using UTF-8, delimit them with newlines (which are expected to be already diff --git a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala index 6aa83b80a6..671f788402 100644 --- a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala +++ b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala @@ -160,8 +160,7 @@ class SslTlsSupportSpec extends AkkaSpec { includeDelimiter = true) >> new TcpReadWriteAdapter >> new SslTlsSupport(sslEngine(remote, client = false)) >> - new BackpressureBuffer(lowWatermark = 1000, highWatermark = 10000, - maxCapacity = 1000000)) + new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000)) val connection = sender val handler = context.actorOf(Props(new AkkaSslHandler(init)))