add BackpressureBuffer, see #3253

- also make a Write’s “ack” be a Tcp.Event (to suit pipelines)
- add stress test for BackpressureBuffer
- add it to SslTlsSupportSpec
- add it to the docs
Roland 2013-05-26 10:58:55 +02:00
parent 025a91ecc2
commit ea5b79e562
15 changed files with 764 additions and 248 deletions
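Besides the new buffer stage, the central API change is that the ack of a Tcp.Write must now be a Tcp.Event instead of an arbitrary object, so that acknowledgements can be routed back up through a pipeline. A minimal sketch of the new usage; the actor and the Ack object are illustrative names only:

    import akka.actor.{ Actor, ActorRef }
    import akka.io.Tcp
    import akka.util.ByteString

    // illustrative handler: write acks are now Tcp.Event instances, not Symbols or Ints
    class GreetingWriter(connection: ActorRef) extends Actor {
      case object Ack extends Tcp.Event

      connection ! Tcp.Write(ByteString("hello"), Ack)

      def receive = {
        case Ack ⇒ context.stop(self) // the connection actor sends the ack back once the write went out
      }
    }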

View file

@ -0,0 +1,203 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import java.net.InetSocketAddress
import java.security.MessageDigest
import scala.concurrent.Await
import scala.concurrent.duration.{ Duration, DurationInt }
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.{ Actor, ActorContext, ActorLogging, ActorRef, Props, ReceiveTimeout, Stash, Terminated }
import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext }
import akka.pattern.ask
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.util.{ ByteString, Timeout }
object BackpressureSpec {
final val ChunkSize = 1024
case class StartSending(n: Int)
case class Done(hash: ByteString)
case object Failed
case object Close
class Sender(receiver: InetSocketAddress) extends Actor with Stash with ActorLogging {
val digest = MessageDigest.getInstance("SHA-1")
digest.reset()
import context.system
IO(Tcp) ! Tcp.Connect(receiver)
def receive = {
case _: Tcp.Connected ⇒
val init = TcpPipelineHandler.withLogger(log,
new TcpReadWriteAdapter >>
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline")
sender ! Tcp.Register(handler)
unstashAll()
context.become(connected(init, handler))
case _: Tcp.CommandFailed ⇒
unstashAll()
context.become(failed)
case _ ⇒ stash()
}
def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = {
case StartSending(0) ⇒ sender ! Done(ByteString(digest.digest()))
case StartSending(n) ⇒
val rnd = ThreadLocalRandom.current
val data = Array.tabulate[Byte](ChunkSize)(_ ⇒ rnd.nextInt().toByte)
digest.update(data)
connection ! init.Command(ByteString(data))
self forward StartSending(n - 1)
case BackpressureBuffer.HighWatermarkReached ⇒
context.setReceiveTimeout(5.seconds)
context.become({
case BackpressureBuffer.LowWatermarkReached ⇒
unstashAll()
context.setReceiveTimeout(Duration.Undefined)
context.unbecome()
case ReceiveTimeout ⇒
log.error("receive timeout while throttled")
context.stop(self)
case _ ⇒ stash()
}, discardOld = false)
case ReceiveTimeout ⇒ // that old cancellation race
case Close ⇒ connection ! Management(Tcp.Close)
case Tcp.Closed ⇒ context.stop(self)
}
val failed: Receive = {
case _ ⇒ sender ! Failed
}
}
case object GetPort
case class Port(p: Int)
case object GetProgress
case class Progress(n: Int)
case object GetHash
case class Hash(hash: ByteString)
class Receiver(hiccups: Boolean) extends Actor with Stash with ActorLogging {
val digest = MessageDigest.getInstance("SHA-1")
digest.reset()
import context.system
IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("localhost", 0))
var listener: ActorRef = _
def receive = {
case Tcp.Bound(local) ⇒
listener = sender
unstashAll()
context.become(bound(local.getPort))
case _: Tcp.CommandFailed ⇒
unstashAll()
context.become(failed)
case _ ⇒ stash()
}
def bound(port: Int): Receive = {
case GetPort ⇒ sender ! Port(port)
case Tcp.Connected(local, remote) ⇒
val init = TcpPipelineHandler.withLogger(log,
new TcpReadWriteAdapter >>
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline")
sender ! Tcp.Register(handler)
unstashAll()
context.become(connected(init, handler))
case _ ⇒ stash()
}
def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = {
var received = 0L
{
case init.Event(data) ⇒
digest.update(data.toArray)
received += data.length
if (hiccups && ThreadLocalRandom.current.nextInt(1000) == 0) {
connection ! Management(Tcp.SuspendReading)
import context.dispatcher
system.scheduler.scheduleOnce(100.millis, connection, Management(Tcp.ResumeReading))
}
case GetProgress ⇒
sender ! Progress((received / ChunkSize).toInt)
case GetHash ⇒
sender ! Hash(ByteString(digest.digest()))
case Tcp.PeerClosed ⇒
listener ! Tcp.Unbind
context.become {
case Tcp.Unbound ⇒ context.stop(self)
}
}
}
val failed: Receive = {
case _ ⇒ sender ! Failed
}
}
}
class BackpressureSpec extends AkkaSpec with ImplicitSender {
import BackpressureSpec._
"A BackpressureBuffer" must {
"transmit the right bytes" in {
val N = 100000
val recv = watch(system.actorOf(Props(classOf[Receiver], false), "receiver1"))
recv ! GetPort
val port = expectMsgType[Port].p
val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender1"))
within(20.seconds) {
send ! StartSending(N)
val hash = expectMsgType[Done].hash
implicit val t = Timeout(100.millis)
awaitAssert(Await.result(recv ? GetProgress, t.duration) === N)
recv ! GetHash
expectMsgType[Hash].hash === hash
}
send ! Close
val terminated = receiveWhile(1.second, messages = 2) {
case Terminated(t) ⇒ t
}
terminated === Set(send, recv)
}
"transmit the right bytes with hiccups" in {
val N = 100000
val recv = watch(system.actorOf(Props(classOf[Receiver], true), "receiver2"))
recv ! GetPort
val port = expectMsgType[Port].p
val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender2"))
within(20.seconds) {
send ! StartSending(N)
val hash = expectMsgType[Done].hash
implicit val t = Timeout(100.millis)
awaitAssert(Await.result(recv ? GetProgress, t.duration) === N)
recv ! GetHash
expectMsgType[Hash].hash === hash
}
send ! Close
val terminated = receiveWhile(1.second, messages = 2) {
case Terminated(t) ⇒ t
}
terminated === Set(send, recv)
}
}
}

View file

@ -10,6 +10,8 @@ import akka.actor.{ Props, ActorLogging, Actor, ActorContext }
import akka.TestUtils import akka.TestUtils
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.io.TcpPipelineHandler.Management
import akka.actor.ActorRef
class DelimiterFramingSpec extends AkkaSpec { class DelimiterFramingSpec extends AkkaSpec {
@ -42,6 +44,7 @@ class DelimiterFramingSpec extends AkkaSpec {
val probe = TestProbe() val probe = TestProbe()
probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress))
probe.expectMsgType[Tcp.Bound] probe.expectMsgType[Tcp.Bound]
bindHandler ! Listener(probe.lastSender)
val client = new AkkaLineClient(serverAddress, delimiter, includeDelimiter) val client = new AkkaLineClient(serverAddress, delimiter, includeDelimiter)
client.run() client.run()
@ -58,14 +61,10 @@ class DelimiterFramingSpec extends AkkaSpec {
val connected = probe.expectMsgType[Tcp.Connected] val connected = probe.expectMsgType[Tcp.Connected]
val connection = probe.sender val connection = probe.sender
val init = new TcpPipelineHandler.Init( val init = TcpPipelineHandler.withLogger(system.log,
new StringByteStringAdapter >> new StringByteStringAdapter >>
new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >> new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >>
new TcpReadWriteAdapter) { new TcpReadWriteAdapter)
override def makeContext(actorContext: ActorContext): HasLogging = new HasLogging {
override def getLogger = system.log
}
}
import init._ import init._
@ -104,36 +103,32 @@ class DelimiterFramingSpec extends AkkaSpec {
} }
def close() { def close() {
probe.send(handler, Tcp.Close) probe.send(handler, Management(Tcp.Close))
probe.expectMsgType[Tcp.Event] match { probe.expectMsgType[Tcp.ConnectionClosed]
case _: Tcp.ConnectionClosed true
}
TestUtils.verifyActorTermination(handler) TestUtils.verifyActorTermination(handler)
} }
} }
case class Listener(ref: ActorRef)
class AkkaLineEchoServer(delimiter: String, includeDelimiter: Boolean) extends Actor with ActorLogging { class AkkaLineEchoServer(delimiter: String, includeDelimiter: Boolean) extends Actor with ActorLogging {
import Tcp.Connected import Tcp.Connected
var listener: ActorRef = _
def receive: Receive = { def receive: Receive = {
case Listener(ref) ⇒ listener = ref
case Connected(remote, _) case Connected(remote, _)
val init = val init =
new TcpPipelineHandler.Init( TcpPipelineHandler.withLogger(log,
new StringByteStringAdapter >> new StringByteStringAdapter >>
new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >> new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >>
new TcpReadWriteAdapter) { new TcpReadWriteAdapter)
override def makeContext(actorContext: ActorContext): HasLogging =
new HasLogging {
override def getLogger = log
}
}
import init._ import init._
val connection = sender val connection = sender
val handler = system.actorOf( val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline")
TcpPipelineHandler(init, sender, self), "server" + counter.incrementAndGet())
connection ! Tcp.Register(handler) connection ! Tcp.Register(handler)
@ -141,6 +136,8 @@ class DelimiterFramingSpec extends AkkaSpec {
case Event(data) case Event(data)
if (includeDelimiter) sender ! Command(data) if (includeDelimiter) sender ! Command(data)
else sender ! Command(data + delimiter) else sender ! Command(data + delimiter)
case Tcp.PeerClosed ⇒ listener ! Tcp.Unbind
case Tcp.Unbound ⇒ context.stop(self)
} }
} }
} }

View file

@ -140,7 +140,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
"write data to network (and acknowledge)" in new EstablishedConnectionTest() { "write data to network (and acknowledge)" in new EstablishedConnectionTest() {
run { run {
object Ack object Ack extends Event
val writer = TestProbe() val writer = TestProbe()
// directly acknowledge an empty write // directly acknowledge an empty write
@ -172,7 +172,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
"write data after not acknowledged data" in new EstablishedConnectionTest() { "write data after not acknowledged data" in new EstablishedConnectionTest() {
run { run {
object Ack object Ack extends Event
val writer = TestProbe() val writer = TestProbe()
writer.send(connectionActor, Write(ByteString(42.toByte))) writer.send(connectionActor, Write(ByteString(42.toByte)))
writer.expectNoMsg(500.millis) writer.expectNoMsg(500.millis)
@ -197,7 +197,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
// maximum of 100 MB // maximum of 100 MB
val size = math.min(testFile.length(), 100000000).toInt val size = math.min(testFile.length(), 100000000).toInt
object Ack object Ack extends Event
val writer = TestProbe() val writer = TestProbe()
writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack)) writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack))
pullFromServerSide(size, 1000000) pullFromServerSide(size, 1000000)
@ -225,8 +225,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
"backpressure present.") "backpressure present.")
pending pending
ignoreIfWindows() ignoreIfWindows()
object Ack1 object Ack1 extends Event
object Ack2 object Ack2 extends Event
clientSideChannel.socket.setSendBufferSize(1024) clientSideChannel.socket.setSendBufferSize(1024)
@ -281,7 +281,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
new EstablishedConnectionTest() with SmallRcvBuffer { new EstablishedConnectionTest() with SmallRcvBuffer {
run { run {
// we should test here that a pending write command is properly finished first // we should test here that a pending write command is properly finished first
object Ack object Ack extends Event
// set an artificially small send buffer size so that the write is queued // set an artificially small send buffer size so that the write is queued
// inside the connection actor // inside the connection actor
clientSideChannel.socket.setSendBufferSize(1024) clientSideChannel.socket.setSendBufferSize(1024)
@ -343,7 +343,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
new EstablishedConnectionTest() with SmallRcvBuffer { new EstablishedConnectionTest() with SmallRcvBuffer {
run { run {
// we should test here that a pending write command is properly finished first // we should test here that a pending write command is properly finished first
object Ack object Ack extends Event
// set an artificially small send buffer size so that the write is queued // set an artificially small send buffer size so that the write is queued
// inside the connection actor // inside the connection actor
clientSideChannel.socket.setSendBufferSize(1024) clientSideChannel.socket.setSendBufferSize(1024)
@ -376,7 +376,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
ignoreIfWindows() ignoreIfWindows()
// we should test here that a pending write command is properly finished first // we should test here that a pending write command is properly finished first
object Ack object Ack extends Event
// set an artificially small send buffer size so that the write is queued // set an artificially small send buffer size so that the write is queued
// inside the connection actor // inside the connection actor
clientSideChannel.socket.setSendBufferSize(1024) clientSideChannel.socket.setSendBufferSize(1024)
@ -423,7 +423,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
selector.send(connectionActor, ChannelReadable) selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsg(PeerClosed) connectionHandler.expectMsg(PeerClosed)
object Ack object Ack extends Event
connectionHandler.send(connectionActor, writeCmd(Ack)) connectionHandler.send(connectionActor, writeCmd(Ack))
pullFromServerSide(TestSize) pullFromServerSide(TestSize)
connectionHandler.expectMsg(Ack) connectionHandler.expectMsg(Ack)
@ -441,7 +441,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
selector.send(connectionActor, ChannelReadable) selector.send(connectionActor, ChannelReadable)
connectionHandler.expectMsg(PeerClosed) connectionHandler.expectMsg(PeerClosed)
object Ack object Ack extends Event
connectionHandler.send(connectionActor, writeCmd(Ack)) connectionHandler.send(connectionActor, writeCmd(Ack))
pullFromServerSide(TestSize) pullFromServerSide(TestSize)
connectionHandler.expectMsg(Ack) connectionHandler.expectMsg(Ack)
@ -571,8 +571,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
writer.expectMsg(Duration.Zero, WritingResumed) writer.expectMsg(Duration.Zero, WritingResumed)
// now write should work again // now write should work again
writer.send(connectionActor, writeCmd("works")) object works extends Event
writer.expectMsg("works") writer.send(connectionActor, writeCmd(works))
writer.expectMsg(works)
} }
} }
@ -606,8 +607,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
writer.expectMsg(1.second, WritingResumed) writer.expectMsg(1.second, WritingResumed)
// now write should work again // now write should work again
writer.send(connectionActor, writeCmd("works")) object works extends Event
writer.expectMsg("works") writer.send(connectionActor, writeCmd(works))
writer.expectMsg(works)
} }
} }
@ -638,8 +640,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
pullFromServerSide(TestSize * written) pullFromServerSide(TestSize * written)
// now write should work again // now write should work again
writer.send(connectionActor, writeCmd("works")) object works extends Event
writer.expectMsg("works") writer.send(connectionActor, writeCmd(works))
writer.expectMsg(works)
} }
} }
@ -663,8 +666,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
pullFromServerSide(TestSize * written) pullFromServerSide(TestSize * written)
// now write should work again // now write should work again
writer.send(connectionActor, writeCmd("works")) object works extends Event
writer.expectMsg("works") writer.send(connectionActor, writeCmd(works))
writer.expectMsg(works)
} }
} }
} }
@ -767,7 +771,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
final val TestSize = 10000 // compile-time constant final val TestSize = 10000 // compile-time constant
def writeCmd(ack: AnyRef) = def writeCmd(ack: Event) =
Write(ByteString(Array.fill[Byte](TestSize)(0)), ack) Write(ByteString(Array.fill[Byte](TestSize)(0)), ack)
def closeServerSideAndWaitForClientReadable(fullClose: Boolean = true): Unit = { def closeServerSideAndWaitForClientReadable(fullClose: Boolean = true): Unit = {
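The recurring pattern these tests exercise is the write-failure recovery handshake: when a Write is NACKed with CommandFailed, the writer sends ResumeWriting and only resends once WritingResumed arrives. A rough sketch of that protocol in a handler actor, with illustrative names and the buffering kept minimal:

    import akka.actor.{ Actor, ActorRef }
    import akka.io.Tcp._
    import akka.util.ByteString

    class RecoveringEchoHandler(connection: ActorRef) extends Actor {
      case class Ack(offset: Int) extends Event // acks must be Tcp.Events now
      var buffered = Vector.empty[ByteString]
      var offset = 0

      def receive = {
        case Received(data) ⇒
          connection ! Write(data, Ack(offset)); offset += 1
        case Ack(_) ⇒ // write confirmed, nothing to do in the happy path
        case CommandFailed(Write(data, Ack(_))) ⇒
          buffered :+= data // keep the rejected payload so it can be resent later
          connection ! ResumeWriting // ask to be told when writing is possible again
          context.become(recovering, discardOld = false)
      }

      def recovering: Receive = {
        case Received(data) ⇒ buffered :+= data // do not write while the connection is choked
        case CommandFailed(Write(data, _)) ⇒ buffered :+= data // failures of queued writes are expected here
        case WritingResumed ⇒
          buffered foreach { data ⇒ connection ! Write(data, Ack(offset)); offset += 1 }
          buffered = Vector.empty
          context.unbecome()
      }
    }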

View file

@ -41,12 +41,15 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr
"properly complete one client/server request/response cycle" in new TestSetup { "properly complete one client/server request/response cycle" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
clientHandler.send(clientConnection, Write(ByteString("Captain on the bridge!"), 'Aye)) object Aye extends Event
clientHandler.expectMsg('Aye) object Yes extends Event
clientHandler.send(clientConnection, Write(ByteString("Captain on the bridge!"), Aye))
clientHandler.expectMsg(Aye)
serverHandler.expectMsgType[Received].data.decodeString("ASCII") must be("Captain on the bridge!") serverHandler.expectMsgType[Received].data.decodeString("ASCII") must be("Captain on the bridge!")
serverHandler.send(serverConnection, Write(ByteString("For the king!"), 'Yes)) serverHandler.send(serverConnection, Write(ByteString("For the king!"), Yes))
serverHandler.expectMsg('Yes) serverHandler.expectMsg(Yes)
clientHandler.expectMsgType[Received].data.decodeString("ASCII") must be("For the king!") clientHandler.expectMsgType[Received].data.decodeString("ASCII") must be("For the king!")
serverHandler.send(serverConnection, Close) serverHandler.send(serverConnection, Close)
@ -60,8 +63,10 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr
"support waiting for writes with backpressure" in new TestSetup { "support waiting for writes with backpressure" in new TestSetup {
val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection()
serverHandler.send(serverConnection, Write(ByteString(Array.fill[Byte](100000)(0)), 'Ack)) object Ack extends Event
serverHandler.expectMsg('Ack)
serverHandler.send(serverConnection, Write(ByteString(Array.fill[Byte](100000)(0)), Ack))
serverHandler.expectMsg(Ack)
expectReceivedData(clientHandler, 100000) expectReceivedData(clientHandler, 100000)

View file

@ -721,6 +721,223 @@ abstract class PipelineStage[Context <: PipelineContext, CmdAbove, CmdBelow, Evt
} }
} }
object BackpressureBuffer {
/**
* Message type which is sent when the buffer's high watermark has been
* reached, which means that further write requests should not be sent
* until the low watermark has been reached again.
*/
trait HighWatermarkReached extends Tcp.Event
case object HighWatermarkReached extends HighWatermarkReached
/**
* Message type which is sent when the buffer's fill level falls below
* the low watermark, which means that writing can commence again.
*/
trait LowWatermarkReached extends Tcp.Event
case object LowWatermarkReached extends LowWatermarkReached
}
/**
* This pipeline stage implements a configurable buffer for transforming the
* per-write ACK/NACK-based backpressure model of a TCP connection actor into
* an edge-triggered back-pressure model: the upper stages will receive
* notification when the buffer runs full ([[HighWatermarkReached]]) and when
* it subsequently empties ([[LowWatermarkReached]]). The upper layers should
* respond by not generating more writes when the buffer is full. There is also
* a hard limit upon which this buffer will abort the connection.
*
* All limits are configurable and are given in number of bytes.
* The `highWatermark` should be set such that the
* amount of data generated before reception of the asynchronous
* [[HighWatermarkReached]] notification does not lead to exceeding the
* `maxCapacity` hard limit; if the writes may arrive in bursts then the
* difference between these two should allow for at least one burst to be sent
* after the high watermark has been reached. The `lowWatermark` must be less
* than or equal to the `highWatermark`, where the difference between these two
* defines the hysteresis, i.e. how often these notifications are sent out (i.e.
* if the difference is rather large then it will take some time for the buffer
* to empty below the low watermark, and that room is then available for data
* sent in response to the [[LowWatermarkReached]] notification; if the
* difference was small then the buffer would more quickly oscillate between
* these two limits).
*/
class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: Long)
extends PipelineStage[HasLogging, Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] {
require(lowWatermark >= 0, "lowWatermark needs to be non-negative")
require(highWatermark >= lowWatermark, "highWatermark needs to be at least as large as lowWatermark")
require(maxCapacity >= highWatermark, "maxCapacity needs to be at least as large as highWatermark")
case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event
override def apply(ctx: HasLogging) = new PipePair[Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] {
import Tcp._
import BackpressureBuffer._
private val log = ctx.getLogger
private var storageOffset = 0
private var storage = Vector.empty[Write]
private def currentOffset = storageOffset + storage.size
private var stored = 0L
private var suspended = false
private var behavior = writing
override def commandPipeline = behavior
override def eventPipeline = behavior
private def become(f: Message ⇒ Iterable[Result]) { behavior = f }
private lazy val writing: Message ⇒ Iterable[Result] = {
case Write(data, ack) ⇒
buffer(Write(data, Ack(currentOffset, ack)), doWrite = true)
case CommandFailed(Write(_, Ack(offset, _))) ⇒
become(buffering(offset))
ctx.singleCommand(ResumeWriting)
case cmd: CloseCommand ⇒ cmd match {
case _ if storage.isEmpty ⇒
become(finished)
ctx.singleCommand(cmd)
case Abort ⇒
storage = Vector.empty
become(finished)
ctx.singleCommand(Abort)
case _ ⇒
become(closing(cmd))
ctx.nothing
}
case Ack(seq, ack) ⇒ acknowledge(seq, ack)
case cmd: Command ⇒ ctx.singleCommand(cmd)
case evt: Event ⇒ ctx.singleEvent(evt)
}
private def buffering(nack: Int): Message ⇒ Iterable[Result] = {
var toAck = 10
var closed: CloseCommand = null
{
case Write(data, ack) ⇒
buffer(Write(data, Ack(currentOffset, ack)), doWrite = false)
case WritingResumed ⇒
ctx.singleCommand(storage(0))
case cmd: CloseCommand ⇒ cmd match {
case Abort ⇒
storage = Vector.empty
become(finished)
ctx.singleCommand(Abort)
case _ ⇒
closed = cmd
ctx.nothing
}
case Ack(seq, ack) if seq < nack ⇒ acknowledge(seq, ack)
case Ack(seq, ack) ⇒
val ackMsg = acknowledge(seq, ack)
if (storage.nonEmpty) {
if (toAck > 0) {
toAck -= 1
ctx.dealias(ackMsg) ++ Seq(Right(storage(0)))
} else {
become(if (closed != null) closing(closed) else writing)
ctx.dealias(ackMsg) ++ storage.map(Right(_))
}
} else if (closed != null) {
become(finished)
ctx.dealias(ackMsg) ++ Seq(Right(closed))
} else {
become(writing)
ackMsg
}
case CommandFailed(_: Write) ⇒ ctx.nothing
case cmd: Command ⇒ ctx.singleCommand(cmd)
case evt: Event ⇒ ctx.singleEvent(evt)
}
}
private def closing(cmd: CloseCommand): Message ⇒ Iterable[Result] = {
case Ack(seq, ack) ⇒
val result = acknowledge(seq, ack)
if (storage.isEmpty) {
become(finished)
ctx.dealias(result) ++ Seq(Right(cmd))
} else result
case CommandFailed(_: Write) ⇒
become({
case WritingResumed ⇒
become(closing(cmd))
storage.map(Right(_))
case CommandFailed(_: Write) ⇒ ctx.nothing
case cmd: Command ⇒ ctx.singleCommand(cmd)
case evt: Event ⇒ ctx.singleEvent(evt)
})
ctx.singleCommand(ResumeWriting)
case cmd: Command ⇒ ctx.singleCommand(cmd)
case evt: Event ⇒ ctx.singleEvent(evt)
}
private val finished: Message ⇒ Iterable[Result] = {
case _: Write ⇒ ctx.nothing
case CommandFailed(_: Write) ⇒ ctx.nothing
case cmd: Command ⇒ ctx.singleCommand(cmd)
case evt: Event ⇒ ctx.singleEvent(evt)
}
private def buffer(w: Write, doWrite: Boolean): Iterable[Result] = {
storage :+= w
stored += w.data.size
if (stored > maxCapacity) {
log.warning("aborting connection (buffer overrun)")
become(finished)
ctx.singleCommand(Abort)
} else if (stored > highWatermark && !suspended) {
log.debug("suspending writes")
suspended = true
if (doWrite) {
Seq(Right(w), Left(HighWatermarkReached))
} else {
ctx.singleEvent(HighWatermarkReached)
}
} else if (doWrite) {
ctx.singleCommand(w)
} else Nil
}
private def acknowledge(seq: Int, ack: Event): Iterable[Result] = {
require(seq == storageOffset, s"received ack $seq at $storageOffset")
require(storage.nonEmpty, s"storage was empty at ack $seq")
val size = storage(0).data.size
stored -= size
storageOffset += 1
storage = storage drop 1
if (suspended && stored < lowWatermark) {
log.debug("resuming writes")
suspended = false
if (ack == NoAck) ctx.singleEvent(LowWatermarkReached)
else Vector(Left(ack), Left(LowWatermarkReached))
} else if (ack == NoAck) ctx.nothing
else ctx.singleEvent(ack)
}
}
}
//#length-field-frame //#length-field-frame
/** /**
* Pipeline stage for length-field encoded framing. It will prepend a * Pipeline stage for length-field encoded framing. It will prepend a
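Putting the pieces of the new stage together, the intended usage (as exercised by the BackpressureSpec above) is to compose it below a TcpReadWriteAdapter and let the driving actor pause on HighWatermarkReached and resume on LowWatermarkReached. A condensed sketch; the watermark values are taken from the spec and the actor name is illustrative:

    import akka.actor.{ Actor, ActorLogging, ActorRef, Stash }
    import akka.io.{ BackpressureBuffer, Tcp, TcpPipelineHandler, TcpReadWriteAdapter }
    import akka.util.ByteString

    class ThrottledWriter(connection: ActorRef) extends Actor with Stash with ActorLogging {
      val init = TcpPipelineHandler.withLogger(log,
        new TcpReadWriteAdapter >>
          new BackpressureBuffer(lowWatermark = 10000, highWatermark = 1000000, maxCapacity = Long.MaxValue))
      val pipeline = context.actorOf(TcpPipelineHandler(init, connection, self), "pipeline")
      connection ! Tcp.Register(pipeline) // the pipeline handler sits between us and the connection

      def receive = {
        case data: ByteString ⇒ pipeline ! init.Command(data) // writes flow through the buffer stage
        case BackpressureBuffer.HighWatermarkReached ⇒ context.become(throttled, discardOld = false)
      }

      def throttled: Receive = {
        case BackpressureBuffer.LowWatermarkReached ⇒
          unstashAll()
          context.unbecome()
        case _ ⇒ stash() // hold further writes until the buffer has drained
      }
    }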

View file

@ -219,7 +219,7 @@ class SslTlsSupport(engine: SSLEngine) extends PipelineStage[HasLogging, Command
} }
} }
private final class Send(val buffer: ByteBuffer, val ack: Any) private final class Send(val buffer: ByteBuffer, val ack: Event)
private object Send { private object Send {
val Empty = new Send(ByteBuffer wrap SslTlsSupport.EmptyByteArray, Tcp.NoAck) val Empty = new Send(ByteBuffer wrap SslTlsSupport.EmptyByteArray, Tcp.NoAck)

View file

@ -63,12 +63,14 @@ object Tcp extends ExtensionKey[TcpExt] {
} }
trait Message
/// COMMANDS /// COMMANDS
/** /**
* This is the common trait for all commands understood by TCP actors. * This is the common trait for all commands understood by TCP actors.
*/ */
trait Command extends IO.HasFailureMessage { trait Command extends Message with IO.HasFailureMessage {
def failureMessage = CommandFailed(this) def failureMessage = CommandFailed(this)
} }
@ -102,7 +104,7 @@ object Tcp extends ExtensionKey[TcpExt] {
override def event = Aborted override def event = Aborted
} }
case class NoAck(token: Any) case class NoAck(token: Any) extends Event
object NoAck extends NoAck(null) object NoAck extends NoAck(null)
sealed trait WriteCommand extends Command { sealed trait WriteCommand extends Command {
@ -116,7 +118,7 @@ object Tcp extends ExtensionKey[TcpExt] {
* Write data to the TCP connection. If no ack is needed use the special * Write data to the TCP connection. If no ack is needed use the special
* `NoAck` object. * `NoAck` object.
*/ */
case class Write(data: ByteString, ack: Any) extends WriteCommand case class Write(data: ByteString, ack: Event) extends WriteCommand
object Write { object Write {
/** /**
* The empty Write doesn't write anything and isn't acknowledged. * The empty Write doesn't write anything and isn't acknowledged.
@ -149,7 +151,7 @@ object Tcp extends ExtensionKey[TcpExt] {
case object ResumeReading extends Command case object ResumeReading extends Command
/// EVENTS /// EVENTS
trait Event trait Event extends Message
case class Received(data: ByteString) extends Event case class Received(data: ByteString) extends Event
case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event
@ -274,7 +276,7 @@ object TcpMessage {
def noAck(token: AnyRef): NoAck = NoAck(token) def noAck(token: AnyRef): NoAck = NoAck(token)
def write(data: ByteString): Command = Write(data) def write(data: ByteString): Command = Write(data)
def write(data: ByteString, ack: AnyRef): Command = Write(data, ack) def write(data: ByteString, ack: Event): Command = Write(data, ack)
def suspendReading: Command = SuspendReading def suspendReading: Command = SuspendReading
def resumeReading: Command = ResumeReading def resumeReading: Command = ResumeReading

View file

@ -4,21 +4,16 @@
package akka.io package akka.io
import akka.actor.Actor
import akka.actor.ActorContext
import scala.beans.BeanProperty import scala.beans.BeanProperty
import akka.actor.ActorRef import scala.util.{ Failure, Success }
import scala.util.Success import akka.actor.{ Actor, ActorContext, ActorRef, Props, Terminated }
import scala.util.Failure import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.actor.Terminated
import akka.actor.Props
import akka.util.ByteString import akka.util.ByteString
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } import akka.event.Logging
import akka.event.LoggingAdapter
object TcpPipelineHandler { object TcpPipelineHandler {
case class EscapeEvent(ev: Tcp.Event) extends Tcp.Command
/** /**
* This class wraps up a pipeline with its external (i.e. top) command and * This class wraps up a pipeline with its external (i.e. top) command and
* event types and providing unique wrappers for sending commands and * event types and providing unique wrappers for sending commands and
@ -26,25 +21,61 @@ object TcpPipelineHandler {
* instance of [[Init]]). All events emitted by the pipeline will be sent to * instance of [[Init]]). All events emitted by the pipeline will be sent to
* the registered handler wrapped in an Event. * the registered handler wrapped in an Event.
*/ */
abstract class Init[Ctx <: PipelineContext, Cmd, Evt](val stages: PipelineStage[Ctx, Cmd, Tcp.Command, Evt, Tcp.Event]) { abstract class Init[Ctx <: PipelineContext, Cmd, Evt](
val stages: PipelineStage[_ >: Ctx <: PipelineContext, Cmd, Tcp.Command, Evt, Tcp.Event]) {
/**
* This method must be implemented to return the [[PipelineContext]]
* necessary for the operation of the given [[PipelineStage]].
*/
def makeContext(actorContext: ActorContext): Ctx def makeContext(actorContext: ActorContext): Ctx
/**
* Java API: construct a command to be sent to the [[TcpPipelineHandler]]
* actor.
*/
def command(cmd: Cmd): Command = Command(cmd) def command(cmd: Cmd): Command = Command(cmd)
/**
* Java API: extract a wrapped event received from the [[TcpPipelineHandler]]
* actor.
*
* @throws MatchError if the given object is not an Event matching this
* specific Init instance.
*/
def event(evt: AnyRef): Evt = evt match { def event(evt: AnyRef): Evt = evt match {
case Event(evt) evt case Event(evt) evt
} }
final case class Command(@BeanProperty cmd: Cmd) /**
final case class Event(@BeanProperty evt: Evt) * Wrapper class for commands to be sent to the [[TcpPipelineHandler]] actor.
*/
case class Command(@BeanProperty cmd: Cmd)
/**
* Wrapper class for events emitted by the [[TcpPipelineHandler]] actor.
*/
case class Event(@BeanProperty evt: Evt)
} }
/** /**
* Wrapper around acknowledgements: if a Tcp.Write is generated which * This interface bundles logging and ActorContext for Java.
* request an ACK then it is wrapped such that the ACK can flow back up the
* pipeline later, allowing you to use arbitrary ACK messages (not just
* subtypes of Tcp.Event).
*/ */
case class Ack(ack: Any) extends Tcp.Event trait WithinActorContext extends HasLogging with HasActorContext
def withLogger[Cmd, Evt](log: LoggingAdapter,
stages: PipelineStage[_ >: WithinActorContext <: PipelineContext, Cmd, Tcp.Command, Evt, Tcp.Event]): Init[WithinActorContext, Cmd, Evt] =
new Init[WithinActorContext, Cmd, Evt](stages) {
override def makeContext(ctx: ActorContext): WithinActorContext = new WithinActorContext {
override def getLogger = log
override def getContext = ctx
}
}
/**
* Wrapper class for management commands sent to the [[TcpPipelineHandler]] actor.
*/
case class Management(@BeanProperty cmd: AnyRef)
/** /**
* This is a new Tcp.Command which the pipeline can emit to effect the * This is a new Tcp.Command which the pipeline can emit to effect the
@ -54,6 +85,14 @@ object TcpPipelineHandler {
*/ */
case class Tell(receiver: ActorRef, msg: Any, sender: ActorRef) extends Tcp.Command case class Tell(receiver: ActorRef, msg: Any, sender: ActorRef) extends Tcp.Command
/**
* The pipeline may want to emit a [[Tcp.Event]] to the registered handler
* actor, which is enabled by emitting this [[Tcp.Command]] wrapping an event
* instead. The [[TcpPipelineHandler]] actor will upon reception of this command
* forward the wrapped event to the handler.
*/
case class TcpEvent(@BeanProperty evt: Tcp.Event) extends Tcp.Command
/** /**
* Scala API: create [[Props]] for a pipeline handler * Scala API: create [[Props]] for a pipeline handler
*/ */
@ -103,10 +142,8 @@ class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt](
val pipes = PipelineFactory.buildWithSinkFunctions(ctx, init.stages)({ val pipes = PipelineFactory.buildWithSinkFunctions(ctx, init.stages)({
case Success(cmd) case Success(cmd)
cmd match { cmd match {
case Tcp.Write(data, Tcp.NoAck) connection ! cmd
case Tcp.Write(data, ack) connection ! Tcp.Write(data, Ack(ack))
case Tell(receiver, msg, sender) receiver.tell(msg, sender) case Tell(receiver, msg, sender) receiver.tell(msg, sender)
case EscapeEvent(ev) handler ! ev case TcpEvent(ev) handler ! ev
case _ connection ! cmd case _ connection ! cmd
} }
case Failure(ex) throw ex case Failure(ex) throw ex
@ -118,8 +155,9 @@ class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt](
def receive = { def receive = {
case Command(cmd) pipes.injectCommand(cmd) case Command(cmd) pipes.injectCommand(cmd)
case evt: Tcp.Event pipes.injectEvent(evt) case evt: Tcp.Event pipes.injectEvent(evt)
case Management(cmd) ⇒ pipes.managementCommand(cmd)
case Terminated(`handler`) connection ! Tcp.Abort case Terminated(`handler`) connection ! Tcp.Abort
case cmd: Tcp.Command pipes.managementCommand(cmd) case Terminated(`connection`) context.stop(self)
} }
} }
@ -131,10 +169,11 @@ class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt](
* *
* While this adapter communicates to the stage above it via raw ByteStrings, it is possible to inject Tcp Command * While this adapter communicates to the stage above it via raw ByteStrings, it is possible to inject Tcp Command
* by sending them to the management port, and the adapter will simply pass them down to the stage below. Incoming Tcp Events * by sending them to the management port, and the adapter will simply pass them down to the stage below. Incoming Tcp Events
* that are not Receive events will be passed directly to the handler registered for TcpPipelineHandler. * that are not Receive events will be passed downwards wrapped in a [[TcpEvent]]; the [[TcpPipelineHandler]] will
* @tparam Ctx * send these notifications to the registered event handler actor.
*/ */
class TcpReadWriteAdapter extends PipelineStage[PipelineContext, ByteString, Tcp.Command, ByteString, Tcp.Event] { class TcpReadWriteAdapter extends PipelineStage[PipelineContext, ByteString, Tcp.Command, ByteString, Tcp.Event] {
import TcpPipelineHandler.TcpEvent
override def apply(ctx: PipelineContext) = new PipePair[ByteString, Tcp.Command, ByteString, Tcp.Event] { override def apply(ctx: PipelineContext) = new PipePair[ByteString, Tcp.Command, ByteString, Tcp.Event] {
@ -144,7 +183,7 @@ class TcpReadWriteAdapter extends PipelineStage[PipelineContext, ByteString, Tcp
override val eventPipeline = (evt: Tcp.Event) evt match { override val eventPipeline = (evt: Tcp.Event) evt match {
case Tcp.Received(data) ctx.singleEvent(data) case Tcp.Received(data) ctx.singleEvent(data)
case ev: Tcp.Event ctx.singleCommand(TcpPipelineHandler.EscapeEvent(ev)) case ev: Tcp.Event ctx.singleCommand(TcpEvent(ev))
} }
override val managementPort: Mgmt = { override val managementPort: Mgmt = {
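As the updated specs show, the interaction protocol with the pipeline handler is now: payload goes through init.Command and init.Event, connection-level commands such as Tcp.Close are wrapped in Management, and non-payload TCP events (wrapped in TcpEvent by the stages) are delivered to the registered handler directly. A small illustrative sketch of the close sequence; the handler class and trigger message are hypothetical:

    import akka.actor.{ Actor, ActorRef }
    import akka.io.Tcp
    import akka.io.TcpPipelineHandler.Management

    // `pipeline` is assumed to be a TcpPipelineHandler actor already registered with the connection
    class ClosingHandler(pipeline: ActorRef) extends Actor {
      case object Shutdown // illustrative trigger message

      def receive = {
        case Shutdown ⇒
          pipeline ! Management(Tcp.Close) // routed via the pipeline's management port to the connection
        case _: Tcp.ConnectionClosed ⇒
          context.stop(self) // close notifications arrive at the handler as plain Tcp events
      }
    }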

View file

@ -14,6 +14,7 @@ import akka.event.Logging;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import akka.io.Tcp.CommandFailed; import akka.io.Tcp.CommandFailed;
import akka.io.Tcp.ConnectionClosed; import akka.io.Tcp.ConnectionClosed;
import akka.io.Tcp.Event;
import akka.io.Tcp.Received; import akka.io.Tcp.Received;
import akka.io.Tcp.Write; import akka.io.Tcp.Write;
import akka.io.Tcp.WritingResumed; import akka.io.Tcp.WritingResumed;
@ -34,6 +35,13 @@ public class EchoHandler extends UntypedActor {
public static final long HIGH_WATERMARK = MAX_STORED * 5 / 10; public static final long HIGH_WATERMARK = MAX_STORED * 5 / 10;
public static final long LOW_WATERMARK = MAX_STORED * 2 / 10; public static final long LOW_WATERMARK = MAX_STORED * 2 / 10;
private static class Ack implements Event {
public final int ack;
public Ack(int ack) {
this.ack = ack;
}
}
public EchoHandler(ActorRef connection, InetSocketAddress remote) { public EchoHandler(ActorRef connection, InetSocketAddress remote) {
this.connection = connection; this.connection = connection;
this.remote = remote; this.remote = remote;
@ -50,7 +58,7 @@ public class EchoHandler extends UntypedActor {
public void apply(Object msg) throws Exception { public void apply(Object msg) throws Exception {
if (msg instanceof Received) { if (msg instanceof Received) {
final ByteString data = ((Received) msg).data(); final ByteString data = ((Received) msg).data();
connection.tell(TcpMessage.write(data, currentOffset()), getSelf()); connection.tell(TcpMessage.write(data, new Ack(currentOffset())), getSelf());
buffer(data); buffer(data);
} else if (msg instanceof Integer) { } else if (msg instanceof Integer) {
@ -59,7 +67,7 @@ public class EchoHandler extends UntypedActor {
} else if (msg instanceof CommandFailed) { } else if (msg instanceof CommandFailed) {
final Write w = (Write) ((CommandFailed) msg).cmd(); final Write w = (Write) ((CommandFailed) msg).cmd();
connection.tell(TcpMessage.resumeWriting(), getSelf()); connection.tell(TcpMessage.resumeWriting(), getSelf());
getContext().become(buffering((Integer) w.ack())); getContext().become(buffering((Ack) w.ack()));
} else if (msg instanceof ConnectionClosed) { } else if (msg instanceof ConnectionClosed) {
final ConnectionClosed cl = (ConnectionClosed) msg; final ConnectionClosed cl = (ConnectionClosed) msg;
@ -75,7 +83,7 @@ public class EchoHandler extends UntypedActor {
}; };
//#buffering //#buffering
protected Procedure<Object> buffering(final int nack) { protected Procedure<Object> buffering(final Ack nack) {
return new Procedure<Object>() { return new Procedure<Object>() {
private int toAck = 10; private int toAck = 10;
@ -99,7 +107,7 @@ public class EchoHandler extends UntypedActor {
final int ack = (Integer) msg; final int ack = (Integer) msg;
acknowledge(ack); acknowledge(ack);
if (ack >= nack) { if (ack >= nack.ack) {
// otherwise it was the ack of the last successful write // otherwise it was the ack of the last successful write
if (storage.isEmpty()) { if (storage.isEmpty()) {
@ -216,12 +224,12 @@ public class EchoHandler extends UntypedActor {
protected void writeAll() { protected void writeAll() {
int i = 0; int i = 0;
for (ByteString data : storage) { for (ByteString data : storage) {
connection.tell(TcpMessage.write(data, storageOffset + i++), getSelf()); connection.tell(TcpMessage.write(data, new Ack(storageOffset + i++)), getSelf());
} }
} }
protected void writeFirst() { protected void writeFirst() {
connection.tell(TcpMessage.write(storage.peek(), storageOffset), getSelf()); connection.tell(TcpMessage.write(storage.peek(), new Ack(storageOffset)), getSelf());
} }
//#storage-omitted //#storage-omitted

View file

@ -13,6 +13,7 @@ import akka.actor.UntypedActor;
import akka.event.Logging; import akka.event.Logging;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import akka.io.Tcp.ConnectionClosed; import akka.io.Tcp.ConnectionClosed;
import akka.io.Tcp.Event;
import akka.io.Tcp.Received; import akka.io.Tcp.Received;
import akka.io.TcpMessage; import akka.io.TcpMessage;
import akka.japi.Procedure; import akka.japi.Procedure;
@ -85,7 +86,7 @@ public class SimpleEchoHandler extends UntypedActor {
private boolean suspended = false; private boolean suspended = false;
private boolean closing = false; private boolean closing = false;
private final Object ACK = new Object(); private final Event ACK = new Event() {};
//#simple-helpers //#simple-helpers
protected void buffer(ByteString data) { protected void buffer(ByteString data) {

View file

@ -21,8 +21,13 @@ import akka.actor.UntypedActor;
import akka.event.Logging; import akka.event.Logging;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import akka.io.AbstractPipelineContext; import akka.io.AbstractPipelineContext;
import akka.io.BackpressureBuffer;
import akka.io.DelimiterFraming;
import akka.io.HasLogging; import akka.io.HasLogging;
import akka.io.PipelineStage;
import static akka.io.PipelineStage.sequence;
import akka.io.SslTlsSupport; import akka.io.SslTlsSupport;
import akka.io.StringByteStringAdapter;
import akka.io.Tcp; import akka.io.Tcp;
import akka.io.Tcp.Bound; import akka.io.Tcp.Bound;
import akka.io.Tcp.Command; import akka.io.Tcp.Command;
@ -33,6 +38,8 @@ import akka.io.Tcp.Received;
import akka.io.TcpMessage; import akka.io.TcpMessage;
import akka.io.TcpPipelineHandler; import akka.io.TcpPipelineHandler;
import akka.io.TcpPipelineHandler.Init; import akka.io.TcpPipelineHandler.Init;
import akka.io.TcpPipelineHandler.WithinActorContext;
import akka.io.TcpReadWriteAdapter;
import akka.io.ssl.SslTlsSupportSpec; import akka.io.ssl.SslTlsSupportSpec;
import akka.testkit.AkkaSpec; import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
@ -60,14 +67,8 @@ public class SslDocTest {
.tell(TcpMessage.connect(remote), getSelf()); .tell(TcpMessage.connect(remote), getSelf());
} }
class Context extends AbstractPipelineContext implements HasLogging { // this will hold the pipeline handlers context
@Override Init<WithinActorContext, String, String> init = null;
public LoggingAdapter getLogger() {
return log;
}
}
Init<HasLogging, Command, Event> init = null;
@Override @Override
public void onReceive(Object msg) { public void onReceive(Object msg) {
@ -79,33 +80,30 @@ public class SslDocTest {
final SSLEngine engine = sslContext.createSSLEngine( final SSLEngine engine = sslContext.createSSLEngine(
remote.getHostName(), remote.getPort()); remote.getHostName(), remote.getPort());
engine.setUseClientMode(true); engine.setUseClientMode(true);
final SslTlsSupport ssl = new SslTlsSupport(engine);
// set up the context for communicating with TcpPipelineHandler // build pipeline and set up context for communicating with TcpPipelineHandler
init = new Init<HasLogging, Command, Event>(ssl) { init = TcpPipelineHandler.withLogger(log, sequence(sequence(sequence(sequence(
@Override new StringByteStringAdapter("utf-8"),
public HasLogging makeContext(ActorContext ctx) { new DelimiterFraming(1024, ByteString.fromString("\n"), true)),
return new Context(); new TcpReadWriteAdapter()),
} new SslTlsSupport(engine)),
}; new BackpressureBuffer(1000, 10000, 1000000)));
// create handler for pipeline, setting ourselves as payload recipient // create handler for pipeline, setting ourselves as payload recipient
final ActorRef handler = getContext().actorOf( final ActorRef handler = getContext().actorOf(
TcpPipelineHandler.create(init, getSender(), getSelf())); TcpPipelineHandler.create(init, getSender(), getSelf()));
// register the SSL handler with the connection // register the SSL handler with the connection
getSender().tell(TcpMessage.register(handler), getSelf()); getSender().tell(TcpMessage.register(handler), getSelf());
// and send a message across the SSL channel // and send a message across the SSL channel
handler.tell( handler.tell(init.command("hello\n"), getSelf());
init.command(TcpMessage.write(ByteString.fromString("hello"))),
getSelf());
} else if (msg instanceof Init.Event) { } else if (msg instanceof Init.Event) {
// unwrap TcpPipelineHandlers event into a Tcp.Event // unwrap TcpPipelineHandlers event into a Tcp.Event
final Event recv = init.event(msg); final String recv = init.event(msg);
if (recv instanceof Received) {
// and inform someone of the received payload // and inform someone of the received payload
listener.tell(((Received) recv).data().utf8String(), getSelf()); listener.tell(recv, getSelf());
}
} }
} }
} }
@ -130,14 +128,8 @@ public class SslDocTest {
getSelf()); getSelf());
} }
class Context extends AbstractPipelineContext implements HasLogging { // this will hold the pipeline handlers context
@Override Init<WithinActorContext, String, String> init = null;
public LoggingAdapter getLogger() {
return log;
}
}
Init<HasLogging, Command, Event> init = null;
@Override @Override
public void onReceive(Object msg) { public void onReceive(Object msg) {
@ -153,15 +145,15 @@ public class SslDocTest {
final SSLEngine engine = sslContext.createSSLEngine( final SSLEngine engine = sslContext.createSSLEngine(
remote.getHostName(), remote.getPort()); remote.getHostName(), remote.getPort());
engine.setUseClientMode(false); engine.setUseClientMode(false);
final SslTlsSupport ssl = new SslTlsSupport(engine);
// set up the context for communicating with TcpPipelineHandler // build pipeline and set up context for communicating with TcpPipelineHandler
init = new Init<HasLogging, Command, Event>(ssl) { init = TcpPipelineHandler.withLogger(log, sequence(sequence(sequence(sequence(
@Override new StringByteStringAdapter("utf-8"),
public HasLogging makeContext(ActorContext ctx) { new DelimiterFraming(1024, ByteString.fromString("\n"), true)),
return new Context(); new TcpReadWriteAdapter()),
} new SslTlsSupport(engine)),
}; new BackpressureBuffer(1000, 10000, 1000000)));
// create handler for pipeline, setting ourselves as payload recipient // create handler for pipeline, setting ourselves as payload recipient
final ActorRef handler = getContext().actorOf( final ActorRef handler = getContext().actorOf(
TcpPipelineHandler.create(init, getSender(), getSelf())); TcpPipelineHandler.create(init, getSender(), getSelf()));
@ -171,14 +163,11 @@ public class SslDocTest {
} else if (msg instanceof Init.Event) { } else if (msg instanceof Init.Event) {
// unwrap TcpPipelineHandlers event to get a Tcp.Event // unwrap TcpPipelineHandlers event to get a Tcp.Event
final Event recv = init.event(msg); final String recv = init.event(msg);
if (recv instanceof Received) {
// inform someone of the received message // inform someone of the received message
listener.tell(((Received) recv).data().utf8String(), getSelf()); listener.tell(recv, getSelf());
// and reply (sender is the SSL handler created above) // and reply (sender is the SSL handler created above)
getSender().tell(init.command( getSender().tell(init.command("world\n"), getSelf());
TcpMessage.write(ByteString.fromString("world"))), getSelf());
}
} }
} }
} }
@ -201,9 +190,9 @@ public class SslDocTest {
assert getLastSender() == server; assert getLastSender() == server;
final ActorRef client = system.actorOf(Props.create(SslClient.class, bound.localAddress(), ctx, getRef())); final ActorRef client = system.actorOf(Props.create(SslClient.class, bound.localAddress(), ctx, getRef()));
expectMsgEquals("hello"); expectMsgEquals("hello\n");
assert getLastSender() == server; assert getLastSender() == server;
expectMsgEquals("world"); expectMsgEquals("world\n");
assert getLastSender() == client; assert getLastSender() == client;
} }
}; };

View file

@ -563,7 +563,7 @@ resending of all queued data:
It should be noted that all writes which are currently buffered have also been
sent to the connection actor upon entering this state, which means that the
:class:`ResumeWriting` message is enqueued after those writes, leading to the
-reception of all outstanding :class:`CommandFailre` messages (which are ignored
+reception of all outstanding :class:`CommandFailed` messages (which are ignored
in this state) before receiving the :class:`WritingResumed` signal. That latter
message is sent by the connection actor only once the internally queued write
has been fully completed, meaning that a subsequent write will not fail. This
@ -590,36 +590,55 @@ first look at the SSL server:
.. includecode:: code/docs/io/japi/SslDocTest.java#server

+Please refer to `the source code`_ to see all imports.
+
+.. _the source code: @github@/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java
+
The actor above binds to a local port and registers itself as the handler for
new connections. When a new connection comes in it will create a
-:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly
+:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary widely
for different setups, please refer to the JDK documentation) and wrap that in
an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``).
-This single-stage pipeline will be driven by a :class:`TcpPipelineHandler`
-actor which is also included in ``akka-actor``. In order to capture the generic
-command and event types consumed and emitted by that actor we need to create a
-wrapper—the nested :class:`Init` class—which also provides the
-:meth:`makeContext` method for creating the pipeline context needed by the
-supplied pipeline. With those things bundled up all that remains is creating a
+This sample demonstrates a few more things: below the SSL pipeline stage we
+have inserted a backpressure buffer which will generate a
+:class:`HighWatermarkReached` event to tell the upper stages to suspend writing
+(generated at 10000 buffered bytes) and a :class:`LowWatermarkReached` when
+they can resume writing (when the buffer empties below 1000 bytes); the buffer
+has a maximum capacity of 1MB. The implementation is very similar to the
+NACK-based backpressure approach presented above. Above the SSL stage comes an
+adapter which extracts only the payload data from the TCP commands and events,
+i.e. it speaks :class:`ByteString` above. The resulting byte streams are broken
+into frames by a :class:`DelimiterFraming` stage which chops them up on newline
+characters. The top-most stage then converts between :class:`String` and UTF-8
+encoded :class:`ByteString`.
+
+As a result the pipeline will accept simple :class:`String` commands, encode
+them using UTF-8, delimit them with newlines (which are expected to be already
+present in the sending direction), transform them into TCP commands and events,
+encrypt them and send them off to the connection actor while buffering writes.
+
+This pipeline is driven by a :class:`TcpPipelineHandler` actor which is also
+included in ``akka-actor``. In order to capture the generic command and event
+types consumed and emitted by that actor we need to create a wrapper—the nested
+:class:`Init` class—which also provides the pipeline context needed by the
+supplied pipeline; in this case we use the :meth:`withLogger` convenience
+method which supplies a context that implements :class:`HasLogging` and
+:class:`HasActorContext` and should be sufficient for typical pipelines. With
+those things bundled up all that remains is creating a
:class:`TcpPipelineHandler` and registering that one as the recipient of
inbound traffic from the TCP connection.

Since we instructed that handler actor to send any events which are emitted by
the SSL pipeline to ourselves, we can then just wait for the reception of the
-decrypted payload messages, compute a response—just ``"world"`` in this
-case—and reply by sending back a ``Tcp.Write``. It should be noted that
+decrypted payload messages, compute a response—just ``"world\n"`` in this
+case—and reply by sending back an ``Init.Command``. It should be noted that
communication with the handler wraps commands and events in the inner types of
the ``init`` object in order to keep things well separated. To ease handling of
such path-dependent types there exist two helper methods, namely
:class:`Init.command` for creating a command and :class:`Init.event` for
unwrapping an event.

+.. warning::
+
+   The :class:`TcpPipelineHandler` does currently not handle back-pressure from
+   the TCP socket, i.e. it will just lose data when the kernel buffer
+   overflows. This will be fixed before Akka 2.2 final.
+
Looking at the client side we see that not much needs to be changed:

.. includecode:: code/docs/io/japi/SslDocTest.java#client
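For orientation, the pipeline this documentation describes corresponds to roughly the following composition on the Scala side (the Java sample builds the same chain with PipelineStage.sequence). This is an illustrative sketch; the SSLEngine and the logger are assumed to come from the surrounding actor:

    import javax.net.ssl.SSLEngine
    import akka.event.LoggingAdapter
    import akka.io._
    import akka.util.ByteString

    def sslPipelineInit(engine: SSLEngine, log: LoggingAdapter) =
      TcpPipelineHandler.withLogger(log,
        new StringByteStringAdapter("utf-8") >>              // String <-> UTF-8 ByteString
          new DelimiterFraming(maxSize = 1024, delimiter = ByteString("\n"), includeDelimiter = true) >>
          new TcpReadWriteAdapter >>                          // ByteString <-> Tcp commands/events
          new SslTlsSupport(engine) >>                        // encryption / decryption
          new BackpressureBuffer(lowWatermark = 1000, highWatermark = 10000, maxCapacity = 1000000))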

View file

@@ -81,6 +81,8 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
   import Tcp._

+  case class Ack(offset: Int) extends Event
+
   // sign death pact: this actor terminates when connection breaks
   context watch connection

@@ -90,13 +92,13 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
   //#writing
   def writing: Receive = {
     case Received(data) ⇒
-      connection ! Write(data, currentOffset)
+      connection ! Write(data, Ack(currentOffset))
       buffer(data)

-    case ack: Int ⇒
+    case Ack(ack) ⇒
       acknowledge(ack)

-    case CommandFailed(Write(_, ack: Int)) ⇒
+    case CommandFailed(Write(_, Ack(ack))) ⇒
       connection ! ResumeWriting
       context become buffering(ack)

@@ -115,8 +117,8 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
     case Received(data) ⇒ buffer(data)
     case WritingResumed ⇒ writeFirst()
     case PeerClosed ⇒ peerClosed = true
-    case ack: Int if ack < nack ⇒ acknowledge(ack)
-    case ack: Int ⇒
+    case Ack(ack) if ack < nack ⇒ acknowledge(ack)
+    case Ack(ack) ⇒
       acknowledge(ack)
       if (storage.nonEmpty) {
         if (toAck > 0) {

@@ -148,7 +150,7 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
       }, discardOld = false)

-    case ack: Int ⇒
+    case Ack(ack) ⇒
       acknowledge(ack)
       if (storage.isEmpty) context stop self
   }

@@ -159,15 +161,15 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
   }
   //#storage-omitted

-  var storageOffset = 0
-  var storage = Vector.empty[ByteString]
-  var stored = 0L
-  var transferred = 0L
+  private var storageOffset = 0
+  private var storage = Vector.empty[ByteString]
+  private var stored = 0L
+  private var transferred = 0L

   val maxStored = 100000000L
   val highWatermark = maxStored * 5 / 10
   val lowWatermark = maxStored * 3 / 10
-  var suspended = false
+  private var suspended = false

   private def currentOffset = storageOffset + storage.size

@@ -207,12 +209,12 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress)
   //#helpers
   private def writeFirst(): Unit = {
-    connection ! Write(storage(0), storageOffset)
+    connection ! Write(storage(0), Ack(storageOffset))
   }

   private def writeAll(): Unit = {
     for ((data, i) ← storage.zipWithIndex) {
-      connection ! Write(data, storageOffset + i)
+      connection ! Write(data, Ack(storageOffset + i))
     }
   }

@@ -229,7 +231,7 @@ class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress)
   // sign death pact: this actor terminates when connection breaks
   context watch connection

-  case object Ack
+  case object Ack extends Event

   def receive = {
     case Received(data) ⇒
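The change running through this diff is that write acknowledgements are no longer plain ``Int`` values but dedicated messages extending ``Tcp.Event``. A compact sketch of that pattern, with a made-up one-shot sender (it assumes ``connection`` is an already registered TCP connection actor)::

    import akka.actor.{ Actor, ActorRef }
    import akka.io.Tcp
    import akka.util.ByteString

    // The ack attached to a Write is modelled as a Tcp.Event; the connection actor
    // sends it back to the writer once the data has been handed to the socket.
    case class Ack(offset: Int) extends Tcp.Event

    class OneShotWriter(connection: ActorRef, payload: ByteString) extends Actor {
      connection ! Tcp.Write(payload, Ack(0))

      def receive = {
        case Ack(0) ⇒ context.stop(self) // write acknowledged, nothing left to do
      }
    }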

@@ -583,7 +583,7 @@ resending of all queued data:
 It should be noted that all writes which are currently buffered have also been
 sent to the connection actor upon entering this state, which means that the
 :class:`ResumeWriting` message is enqueued after those writes, leading to the
-reception of all outstanding :class:`CommandFailre` messages (which are ignored
+reception of all outstanding :class:`CommandFailed` messages (which are ignored
 in this state) before receiving the :class:`WritingResumed` signal. That latter
 message is sent by the connection actor only once the internally queued write
 has been fully completed, meaning that a subsequent write will not fail. This
@@ -609,32 +609,47 @@ This example shows the different parts described above working together:

 .. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#server

-The actor above is meant to be registered as the inbound connection handler for
-a listen socket. When a new connection comes in it will create a
-:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly
+The actor above binds to a local port and registers itself as the handler for
+new connections. When a new connection comes in it will create a
+:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary widely
 for different setups, please refer to the JDK documentation) and wrap that in
 an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``).

-This single-stage pipeline will be driven by a :class:`TcpPipelineHandler`
-actor which is also included in ``akka-actor``. In order to capture the generic
-command and event types consumed and emitted by that actor we need to create a
-wrapper—the nested :class:`Init` class—which also provides the
-:meth:`makeContext` method for creating the pipeline context needed by the
-supplied pipeline. With those things bundled up all that remains is creating a
-:class:`TcpPipelineHandler` and registering that one as the recipient of
-inbound traffic from the TCP connection.
-
-Since we instructed that handler actor to send any events which are emitted by
-the SSL pipeline to ourselves, we can then just switch behavior to receive the
-decrypted payload message, compute a response and reply by sending back a
-``Tcp.Write``. It should be noted that communication with the handler wraps
-commands and events in the inner types of the ``init`` object in order to keep
-things well separated.
-
-.. warning::
-  The :class:`TcpPipelineHandler` does currently not handle back-pressure from
-  the TCP socket, i.e. it will just lose data when the kernel buffer
-  overflows. This will be fixed before Akka 2.2 final.
+This sample demonstrates a few more things: below the SSL pipeline stage we
+have inserted a backpressure buffer which will generate a
+:class:`HighWatermarkReached` event to tell the upper stages to suspend writing
+and a :class:`LowWatermarkReached` when they can resume writing. The
+implementation is very similar to the NACK-based backpressure approach
+presented above. Above the SSL stage comes an adapter which extracts only the
+payload data from the TCP commands and events, i.e. it speaks
+:class:`ByteString` above. The resulting byte streams are broken into frames by
+a :class:`DelimiterFraming` stage which chops them up on newline characters.
+The top-most stage then converts between :class:`String` and UTF-8 encoded
+:class:`ByteString`.
+
+As a result the pipeline will accept simple :class:`String` commands, encode
+them using UTF-8, delimit them with newlines (which are expected to be already
+present in the sending direction), transform them into TCP commands and events,
+encrypt them and send them off to the connection actor while buffering writes.
+
+This pipeline is driven by a :class:`TcpPipelineHandler` actor which is also
+included in ``akka-actor``. In order to capture the generic command and event
+types consumed and emitted by that actor we need to create a wrapper—the nested
+:class:`Init` class—which also provides the pipeline context needed by the
+supplied pipeline; in this case we use the :meth:`withLogger` convenience
+method which supplies a context that implements :class:`HasLogger` and
+:class:`HasActorContext` and should be sufficient for typical pipelines. With
+those things bundled up all that remains is creating a
+:class:`TcpPipelineHandler` and registering that one as the recipient of
+inbound traffic from the TCP connection. The pipeline handler is instructed to
+send the decrypted payload data to the following actor:
+
+.. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#handler
+
+This actor computes a response and replies by sending back a :class:`String`.
+It should be noted that communication with the :class:`TcpPipelineHandler`
+wraps commands and events in the inner types of the ``init`` object in order to
+keep things well separated.

 Using UDP
 ---------
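To make the stage ordering described above concrete, the following sketch assembles the same pipeline in one place; the surrounding ``PipelineWiring`` actor and its constructor parameters are invented for this example, while the stages and the ``withLogger`` helper are exactly the ones used in the server code further down::

    import javax.net.ssl.SSLEngine

    import akka.actor.{ Actor, ActorLogging, ActorRef }
    import akka.io.{ BackpressureBuffer, DelimiterFraming, SslTlsSupport, StringByteStringAdapter, Tcp, TcpPipelineHandler, TcpReadWriteAdapter }
    import akka.util.ByteString

    // Sketch only: `engine` is a readily configured SSLEngine, `connection` the TCP
    // connection actor and `handler` the actor that should see the decrypted strings.
    class PipelineWiring(engine: SSLEngine, connection: ActorRef, handler: ActorRef)
      extends Actor with ActorLogging {

      val init = TcpPipelineHandler.withLogger(log,
        new StringByteStringAdapter("utf-8") >>                       // String <-> UTF-8 bytes
          new DelimiterFraming(maxSize = 1024,
            delimiter = ByteString('\n'), includeDelimiter = true) >> // newline framing
          new TcpReadWriteAdapter >>                                  // ByteString <-> Tcp messages
          new SslTlsSupport(engine) >>                                // encryption
          new BackpressureBuffer(lowWatermark = 1000,
            highWatermark = 10000, maxCapacity = 1000000))            // buffering towards the socket

      // the pipeline actor sits between the connection (below) and `handler` (above)
      val pipeline = context.actorOf(TcpPipelineHandler(init, connection, handler))
      connection ! Tcp.Register(pipeline)

      def receive = Actor.emptyBehavior
    }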
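The paragraph above also mentions the two watermark events emitted by the backpressure buffer. One possible way for a producer on top of the pipeline to react to them is to stash its work while the buffer is full; the actor below is invented for this illustration, only the ``BackpressureBuffer.HighWatermarkReached`` and ``BackpressureBuffer.LowWatermarkReached`` events come from the library::

    import akka.actor.{ Actor, ActorRef, Stash }
    import akka.io.BackpressureBuffer

    // Sketch: `pipeline` is the TcpPipelineHandler of the connection; everything this
    // actor receives from its own clients is simply forwarded into the pipeline.
    class ThrottledProducer(pipeline: ActorRef) extends Actor with Stash {
      def receive = {
        case BackpressureBuffer.HighWatermarkReached ⇒
          context.become(throttled, discardOld = false) // stop writing until the buffer drains
        case msg ⇒
          pipeline forward msg
      }

      def throttled: Receive = {
        case BackpressureBuffer.LowWatermarkReached ⇒
          unstashAll()       // replay what piled up while the buffer was full
          context.unbecome()
        case _ ⇒ stash()
      }
    }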

@@ -24,20 +24,24 @@
 package akka.io.ssl

-import akka.TestUtils
-import akka.event.Logging
-import akka.event.LoggingAdapter
-import akka.io._
-import akka.remote.security.provider.AkkaProvider
-import akka.testkit.{ TestProbe, AkkaSpec }
-import akka.util.{ ByteString, Timeout }
-import java.io.{ BufferedWriter, OutputStreamWriter, InputStreamReader, BufferedReader }
+import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
 import java.net.{ InetSocketAddress, SocketException }
 import java.security.{ KeyStore, SecureRandom }
 import java.util.concurrent.atomic.AtomicInteger
-import javax.net.ssl._
-import scala.concurrent.duration._
-import akka.actor.{ Props, ActorLogging, Actor, ActorContext }
+import scala.concurrent.duration.DurationInt
+import akka.TestUtils
+import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
+import akka.event.{ Logging, LoggingAdapter }
+import akka.io.{ BackpressureBuffer, DelimiterFraming, IO, SslTlsSupport, StringByteStringAdapter, Tcp }
+import akka.io.TcpPipelineHandler
+import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext }
+import akka.io.TcpReadWriteAdapter
+import akka.remote.security.provider.AkkaProvider
+import akka.testkit.{ AkkaSpec, TestProbe }
+import akka.util.{ ByteString, Timeout }
+import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLServerSocket, SSLSocket, TrustManagerFactory }

 // TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies

 class SslTlsSupportSpec extends AkkaSpec {
@@ -66,26 +70,26 @@ class SslTlsSupportSpec extends AkkaSpec {
     "work between a Java client and a akka server" in {
       val serverAddress = TestUtils.temporaryServerAddress()
-      val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this))
       val probe = TestProbe()
-      probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress))
-      probe.expectMsgType[Tcp.Bound]
+      val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server1"))
+      expectMsg(Tcp.Bound)

       val client = new JavaSslClient(serverAddress)
       client.run()
       client.close()
+      probe.expectTerminated(bindHandler)
     }

     "work between a akka client and a akka server" in {
       val serverAddress = TestUtils.temporaryServerAddress()
-      val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this))
       val probe = TestProbe()
-      probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress))
-      probe.expectMsgType[Tcp.Bound]
+      val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server2"))
+      expectMsg(Tcp.Bound)

       val client = new AkkaSslClient(serverAddress)
       client.run()
       client.close()
+      probe.expectTerminated(bindHandler)
     }
   }
@@ -99,15 +103,11 @@ class SslTlsSupportSpec extends AkkaSpec {
     val connected = probe.expectMsgType[Tcp.Connected]
     val connection = probe.sender

-    val init = new TcpPipelineHandler.Init(
+    val init = TcpPipelineHandler.withLogger(system.log,
       new StringByteStringAdapter >>
         new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'), includeDelimiter = true) >>
         new TcpReadWriteAdapter >>
-        new SslTlsSupport(sslEngine(connected.remoteAddress, client = true))) {
-      override def makeContext(actorContext: ActorContext): HasLogging = new HasLogging {
-        override def getLogger = system.log
-      }
-    }
+        new SslTlsSupport(sslEngine(connected.remoteAddress, client = true)))

     import init._
@@ -129,60 +129,75 @@ class SslTlsSupportSpec extends AkkaSpec {
     }

     def close() {
-      probe.send(handler, Tcp.Close)
-      probe.expectMsgType[Tcp.Event] match {
-        case _: Tcp.ConnectionClosed ⇒ true
-      }
+      probe.send(handler, Management(Tcp.Close))
+      probe.expectMsgType[Tcp.ConnectionClosed]
       TestUtils.verifyActorTermination(handler)
     }
   }

   //#server
-  class AkkaSslServer extends Actor with ActorLogging {
+  class AkkaSslServer(local: InetSocketAddress) extends Actor with ActorLogging {

-    import Tcp.Connected
+    import Tcp._
+
+    implicit def system = context.system
+
+    IO(Tcp) ! Bind(self, local)

     def receive: Receive = {
+      case _: Bound ⇒
+        context.become(bound(sender))
+        //#server
+        testActor ! Bound
+      //#server
+    }
+
+    def bound(listener: ActorRef): Receive = {
       case Connected(remote, _) ⇒
-        val init =
-          new TcpPipelineHandler.Init(
-            new StringByteStringAdapter >>
+        val init = TcpPipelineHandler.withLogger(log,
+          new StringByteStringAdapter("utf-8") >>
             new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'),
               includeDelimiter = true) >>
             new TcpReadWriteAdapter >>
-            new SslTlsSupport(sslEngine(remote, client = false))) {
-            /*
-             * When creating an `Init` the abstract `makeContext` method needs to be
-             * implemented. If the type of the returned context does not satisfy the
-             * requirements of all pipeline stages, then you'll get an error that
-             * `makeContext` has an incompatible type.
-             */
-            override def makeContext(actorContext: ActorContext): HasLogging =
-              new HasLogging {
-                override def getLogger = log
-              }
-          }
-        import init._
+            new SslTlsSupport(sslEngine(remote, client = false)) >>
+            new BackpressureBuffer(lowWatermark = 1000, highWatermark = 10000,
+              maxCapacity = 1000000))

         val connection = sender
-        val handler = system.actorOf(
-          TcpPipelineHandler(init, sender, self), "server" + counter.incrementAndGet())
+        val handler = context.actorOf(Props(new AkkaSslHandler(init)))
+        //#server
+        context watch handler
+        //#server
+        val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler))

-        connection ! Tcp.Register(handler)
+        connection ! Tcp.Register(pipeline)
+        //#server

-        context become {
-          case Event(data) ⇒
-            val input = data.dropRight(1)
-            log.debug("akka-io Server received {} from {}", input, sender)
-            val response = serverResponse(input)
-            sender ! Command(response)
-            log.debug("akka-io Server sent: {}", response.dropRight(1))
-        }
+      case _: Terminated ⇒
+        listener ! Unbind
+        context.become {
+          case Unbound ⇒ context stop self
+        }
+      //#server
     }
   }
   //#server

+  //#handler
+  class AkkaSslHandler(init: Init[WithinActorContext, String, String])
+    extends Actor with ActorLogging {
+
+    def receive = {
+      case init.Event(data) ⇒
+        val input = data.dropRight(1)
+        log.debug("akka-io Server received {} from {}", input, sender)
+        val response = serverResponse(input)
+        sender ! init.Command(response)
+        log.debug("akka-io Server sent: {}", response.dropRight(1))
+      case Tcp.PeerClosed ⇒ context.stop(self)
+    }
+  }
+  //#handler
+
   class JavaSslServer extends Thread {
     val log: LoggingAdapter = Logging(system, getClass)
     val address = TestUtils.temporaryServerAddress()