diff --git a/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala new file mode 100644 index 0000000000..f2818ae2d0 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/io/BackpressureSpec.scala @@ -0,0 +1,203 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.io + +import java.net.InetSocketAddress +import java.security.MessageDigest + +import scala.concurrent.Await +import scala.concurrent.duration.{ Duration, DurationInt } +import scala.concurrent.forkjoin.ThreadLocalRandom + +import akka.actor.{ Actor, ActorContext, ActorLogging, ActorRef, Props, ReceiveTimeout, Stash, Terminated } +import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext } +import akka.pattern.ask +import akka.testkit.{ AkkaSpec, ImplicitSender } +import akka.util.{ ByteString, Timeout } + +object BackpressureSpec { + + final val ChunkSize = 1024 + + case class StartSending(n: Int) + case class Done(hash: ByteString) + case object Failed + case object Close + + class Sender(receiver: InetSocketAddress) extends Actor with Stash with ActorLogging { + val digest = MessageDigest.getInstance("SHA-1") + digest.reset() + + import context.system + IO(Tcp) ! Tcp.Connect(receiver) + + def receive = { + case _: Tcp.Connected ⇒ + val init = TcpPipelineHandler.withLogger(log, + new TcpReadWriteAdapter >> + new BackpressureBuffer(10000, 1000000, Long.MaxValue)) + val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + sender ! Tcp.Register(handler) + unstashAll() + context.become(connected(init, handler)) + + case _: Tcp.CommandFailed ⇒ + unstashAll() + context.become(failed) + + case _ ⇒ stash() + } + + def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = { + case StartSending(0) ⇒ sender ! Done(ByteString(digest.digest())) + case StartSending(n) ⇒ + val rnd = ThreadLocalRandom.current + val data = Array.tabulate[Byte](ChunkSize)(_ ⇒ rnd.nextInt().toByte) + digest.update(data) + connection ! init.Command(ByteString(data)) + self forward StartSending(n - 1) + case BackpressureBuffer.HighWatermarkReached ⇒ + context.setReceiveTimeout(5.seconds) + context.become({ + case BackpressureBuffer.LowWatermarkReached ⇒ + unstashAll() + context.setReceiveTimeout(Duration.Undefined) + context.unbecome() + case ReceiveTimeout ⇒ + log.error("receive timeout while throttled") + context.stop(self) + case _ ⇒ stash() + }, discardOld = false) + case ReceiveTimeout ⇒ // that old cancellation race + case Close ⇒ connection ! Management(Tcp.Close) + case Tcp.Closed ⇒ context.stop(self) + } + + val failed: Receive = { + case _ ⇒ sender ! Failed + } + } + + case object GetPort + case class Port(p: Int) + case object GetProgress + case class Progress(n: Int) + case object GetHash + case class Hash(hash: ByteString) + + class Receiver(hiccups: Boolean) extends Actor with Stash with ActorLogging { + val digest = MessageDigest.getInstance("SHA-1") + digest.reset() + + import context.system + IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("localhost", 0)) + + var listener: ActorRef = _ + + def receive = { + case Tcp.Bound(local) ⇒ + listener = sender + unstashAll() + context.become(bound(local.getPort)) + case _: Tcp.CommandFailed ⇒ + unstashAll() + context.become(failed) + case _ ⇒ stash() + } + + def bound(port: Int): Receive = { + case GetPort ⇒ sender ! 
Port(port) + case Tcp.Connected(local, remote) ⇒ + val init = TcpPipelineHandler.withLogger(log, + new TcpReadWriteAdapter >> + new BackpressureBuffer(10000, 1000000, Long.MaxValue)) + val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") + sender ! Tcp.Register(handler) + unstashAll() + context.become(connected(init, handler)) + case _ ⇒ stash() + } + + def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = { + var received = 0L + + { + case init.Event(data) ⇒ + digest.update(data.toArray) + received += data.length + if (hiccups && ThreadLocalRandom.current.nextInt(1000) == 0) { + connection ! Management(Tcp.SuspendReading) + import context.dispatcher + system.scheduler.scheduleOnce(100.millis, connection, Management(Tcp.ResumeReading)) + } + case GetProgress ⇒ + sender ! Progress((received / ChunkSize).toInt) + case GetHash ⇒ + sender ! Hash(ByteString(digest.digest())) + case Tcp.PeerClosed ⇒ + listener ! Tcp.Unbind + context.become { + case Tcp.Unbound ⇒ context.stop(self) + } + } + } + + val failed: Receive = { + case _ ⇒ sender ! Failed + } + } +} + +class BackpressureSpec extends AkkaSpec with ImplicitSender { + + import BackpressureSpec._ + + "A BackpressureBuffer" must { + + "transmit the right bytes" in { + val N = 100000 + val recv = watch(system.actorOf(Props(classOf[Receiver], false), "receiver1")) + recv ! GetPort + val port = expectMsgType[Port].p + val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender1")) + within(20.seconds) { + send ! StartSending(N) + val hash = expectMsgType[Done].hash + implicit val t = Timeout(100.millis) + awaitAssert(Await.result(recv ? GetProgress, t.duration) === N) + recv ! GetHash + expectMsgType[Hash].hash === hash + } + send ! Close + val terminated = receiveWhile(1.second, messages = 2) { + case Terminated(t) ⇒ t + } + terminated === Set(send, recv) + } + + "transmit the right bytes with hiccups" in { + val N = 100000 + val recv = watch(system.actorOf(Props(classOf[Receiver], true), "receiver2")) + recv ! GetPort + val port = expectMsgType[Port].p + val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender2")) + within(20.seconds) { + send ! StartSending(N) + val hash = expectMsgType[Done].hash + implicit val t = Timeout(100.millis) + awaitAssert(Await.result(recv ? GetProgress, t.duration) === N) + recv ! GetHash + expectMsgType[Hash].hash === hash + } + send ! Close + val terminated = receiveWhile(1.second, messages = 2) { + case Terminated(t) ⇒ t + } + terminated === Set(send, recv) + } + + } + +} \ No newline at end of file diff --git a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala index 1147dd96de..42a1f8f6a7 100644 --- a/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/DelimiterFramingSpec.scala @@ -10,6 +10,8 @@ import akka.actor.{ Props, ActorLogging, Actor, ActorContext } import akka.TestUtils import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ +import akka.io.TcpPipelineHandler.Management +import akka.actor.ActorRef class DelimiterFramingSpec extends AkkaSpec { @@ -42,6 +44,7 @@ class DelimiterFramingSpec extends AkkaSpec { val probe = TestProbe() probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) probe.expectMsgType[Tcp.Bound] + bindHandler ! 
Listener(probe.lastSender) val client = new AkkaLineClient(serverAddress, delimiter, includeDelimiter) client.run() @@ -58,14 +61,10 @@ class DelimiterFramingSpec extends AkkaSpec { val connected = probe.expectMsgType[Tcp.Connected] val connection = probe.sender - val init = new TcpPipelineHandler.Init( + val init = TcpPipelineHandler.withLogger(system.log, new StringByteStringAdapter >> new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >> - new TcpReadWriteAdapter) { - override def makeContext(actorContext: ActorContext): HasLogging = new HasLogging { - override def getLogger = system.log - } - } + new TcpReadWriteAdapter) import init._ @@ -104,36 +103,32 @@ class DelimiterFramingSpec extends AkkaSpec { } def close() { - probe.send(handler, Tcp.Close) - probe.expectMsgType[Tcp.Event] match { - case _: Tcp.ConnectionClosed ⇒ true - } + probe.send(handler, Management(Tcp.Close)) + probe.expectMsgType[Tcp.ConnectionClosed] TestUtils.verifyActorTermination(handler) } - } + case class Listener(ref: ActorRef) + class AkkaLineEchoServer(delimiter: String, includeDelimiter: Boolean) extends Actor with ActorLogging { import Tcp.Connected + var listener: ActorRef = _ + def receive: Receive = { + case Listener(ref) ⇒ listener = ref case Connected(remote, _) ⇒ val init = - new TcpPipelineHandler.Init( + TcpPipelineHandler.withLogger(log, new StringByteStringAdapter >> new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >> - new TcpReadWriteAdapter) { - override def makeContext(actorContext: ActorContext): HasLogging = - new HasLogging { - override def getLogger = log - } - } + new TcpReadWriteAdapter) import init._ val connection = sender - val handler = system.actorOf( - TcpPipelineHandler(init, sender, self), "server" + counter.incrementAndGet()) + val handler = context.actorOf(TcpPipelineHandler(init, sender, self), "pipeline") connection ! Tcp.Register(handler) @@ -141,6 +136,8 @@ class DelimiterFramingSpec extends AkkaSpec { case Event(data) ⇒ if (includeDelimiter) sender ! Command(data) else sender ! Command(data + delimiter) + case Tcp.PeerClosed ⇒ listener ! 
Tcp.Unbind + case Tcp.Unbound ⇒ context.stop(self) } } } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 98cf97cbaf..13d67e5e4b 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -140,7 +140,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "write data to network (and acknowledge)" in new EstablishedConnectionTest() { run { - object Ack + object Ack extends Event val writer = TestProbe() // directly acknowledge an empty write @@ -172,7 +172,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "write data after not acknowledged data" in new EstablishedConnectionTest() { run { - object Ack + object Ack extends Event val writer = TestProbe() writer.send(connectionActor, Write(ByteString(42.toByte))) writer.expectNoMsg(500.millis) @@ -197,7 +197,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") // maximum of 100 MB val size = math.min(testFile.length(), 100000000).toInt - object Ack + object Ack extends Event val writer = TestProbe() writer.send(connectionActor, WriteFile(testFile.getAbsolutePath, 0, size, Ack)) pullFromServerSide(size, 1000000) @@ -225,8 +225,8 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") "backpressure present.") pending ignoreIfWindows() - object Ack1 - object Ack2 + object Ack1 extends Event + object Ack2 extends Event clientSideChannel.socket.setSendBufferSize(1024) @@ -281,7 +281,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") new EstablishedConnectionTest() with SmallRcvBuffer { run { // we should test here that a pending write command is properly finished first - object Ack + object Ack extends Event // set an artificially small send buffer size so that the write is queued // inside the connection actor clientSideChannel.socket.setSendBufferSize(1024) @@ -343,7 +343,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") new EstablishedConnectionTest() with SmallRcvBuffer { run { // we should test here that a pending write command is properly finished first - object Ack + object Ack extends Event // set an artificially small send buffer size so that the write is queued // inside the connection actor clientSideChannel.socket.setSendBufferSize(1024) @@ -376,7 +376,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") ignoreIfWindows() // we should test here that a pending write command is properly finished first - object Ack + object Ack extends Event // set an artificially small send buffer size so that the write is queued // inside the connection actor clientSideChannel.socket.setSendBufferSize(1024) @@ -423,7 +423,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") selector.send(connectionActor, ChannelReadable) connectionHandler.expectMsg(PeerClosed) - object Ack + object Ack extends Event connectionHandler.send(connectionActor, writeCmd(Ack)) pullFromServerSide(TestSize) connectionHandler.expectMsg(Ack) @@ -441,7 +441,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") selector.send(connectionActor, ChannelReadable) connectionHandler.expectMsg(PeerClosed) - object Ack + object Ack extends Event connectionHandler.send(connectionActor, writeCmd(Ack)) 
pullFromServerSide(TestSize) connectionHandler.expectMsg(Ack) @@ -571,8 +571,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") writer.expectMsg(Duration.Zero, WritingResumed) // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") + object works extends Event + writer.send(connectionActor, writeCmd(works)) + writer.expectMsg(works) } } @@ -606,8 +607,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") writer.expectMsg(1.second, WritingResumed) // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") + object works extends Event + writer.send(connectionActor, writeCmd(works)) + writer.expectMsg(works) } } @@ -638,8 +640,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") pullFromServerSide(TestSize * written) // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") + object works extends Event + writer.send(connectionActor, writeCmd(works)) + writer.expectMsg(works) } } @@ -663,8 +666,9 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") pullFromServerSide(TestSize * written) // now write should work again - writer.send(connectionActor, writeCmd("works")) - writer.expectMsg("works") + object works extends Event + writer.send(connectionActor, writeCmd(works)) + writer.expectMsg(works) } } } @@ -767,7 +771,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") final val TestSize = 10000 // compile-time constant - def writeCmd(ack: AnyRef) = + def writeCmd(ack: Event) = Write(ByteString(Array.fill[Byte](TestSize)(0)), ack) def closeServerSideAndWaitForClientReadable(fullClose: Boolean = true): Unit = { diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index cac2ab5a88..383e0fe225 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -41,12 +41,15 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr "properly complete one client/server request/response cycle" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() - clientHandler.send(clientConnection, Write(ByteString("Captain on the bridge!"), 'Aye)) - clientHandler.expectMsg('Aye) + object Aye extends Event + object Yes extends Event + + clientHandler.send(clientConnection, Write(ByteString("Captain on the bridge!"), Aye)) + clientHandler.expectMsg(Aye) serverHandler.expectMsgType[Received].data.decodeString("ASCII") must be("Captain on the bridge!") - serverHandler.send(serverConnection, Write(ByteString("For the king!"), 'Yes)) - serverHandler.expectMsg('Yes) + serverHandler.send(serverConnection, Write(ByteString("For the king!"), Yes)) + serverHandler.expectMsg(Yes) clientHandler.expectMsgType[Received].data.decodeString("ASCII") must be("For the king!") serverHandler.send(serverConnection, Close) @@ -60,8 +63,10 @@ class TcpIntegrationSpec extends AkkaSpec("akka.loglevel = INFO") with TcpIntegr "support waiting for writes with backpressure" in new TestSetup { val (clientHandler, clientConnection, serverHandler, serverConnection) = establishNewClientConnection() - serverHandler.send(serverConnection, 
Write(ByteString(Array.fill[Byte](100000)(0)), 'Ack)) - serverHandler.expectMsg('Ack) + object Ack extends Event + + serverHandler.send(serverConnection, Write(ByteString(Array.fill[Byte](100000)(0)), Ack)) + serverHandler.expectMsg(Ack) expectReceivedData(clientHandler, 100000) diff --git a/akka-actor/src/main/scala/akka/io/Pipelines.scala b/akka-actor/src/main/scala/akka/io/Pipelines.scala index 187815c183..cfa131a554 100644 --- a/akka-actor/src/main/scala/akka/io/Pipelines.scala +++ b/akka-actor/src/main/scala/akka/io/Pipelines.scala @@ -721,6 +721,223 @@ abstract class PipelineStage[Context <: PipelineContext, CmdAbove, CmdBelow, Evt } } +object BackpressureBuffer { + /** + * Message type which is sent when the buffer’s high watermark has been + * reached, which means that further write requests should not be sent + * until the low watermark has been reached again. + */ + trait HighWatermarkReached extends Tcp.Event + case object HighWatermarkReached extends HighWatermarkReached + + /** + * Message type which is sent when the buffer’s fill level falls below + * the low watermark, which means that writing can commence again. + */ + trait LowWatermarkReached extends Tcp.Event + case object LowWatermarkReached extends LowWatermarkReached +} + +/** + * This pipeline stage implements a configurable buffer for transforming the + * per-write ACK/NACK-based backpressure model of a TCP connection actor into + * an edge-triggered back-pressure model: the upper stages will receive + * notification when the buffer runs full ([[HighWatermarkReached]]) and when + * it subsequently empties ([[LowWatermarkReached]]). The upper layers should + * respond by not generating more writes when the buffer is full. There is also + * a hard limit upon which this buffer will abort the connection. + * + * All limits are configurable and are given in number of bytes. + * The `highWatermark` should be set such that the + * amount of data generated before reception of the asynchronous + * [[HighWatermarkReached]] notification does not lead to exceeding the + * `maxCapacity` hard limit; if the writes may arrive in bursts then the + * difference between these two should allow for at least one burst to be sent + * after the high watermark has been reached. The `lowWatermark` must be less + * than or equal to the `highWatermark`, where the difference between these two + * defines the hysteresis, i.e. how often these notifications are sent out (i.e. + * if the difference is rather large then it will take some time for the buffer + * to empty below the low watermark, and that room is then available for data + * sent in response to the [[LowWatermarkReached]] notification; if the + * difference was small then the buffer would more quickly oscillate between + * these two limits). 
+ */ +class BackpressureBuffer(lowWatermark: Long, highWatermark: Long, maxCapacity: Long) + extends PipelineStage[HasLogging, Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] { + + require(lowWatermark >= 0, "lowWatermark needs to be non-negative") + require(highWatermark >= lowWatermark, "highWatermark needs to be at least as large as lowWatermark") + require(maxCapacity >= highWatermark, "maxCapacity needs to be at least as large as highWatermark") + + case class Ack(num: Int, ack: Tcp.Event) extends Tcp.Event + + override def apply(ctx: HasLogging) = new PipePair[Tcp.Command, Tcp.Command, Tcp.Event, Tcp.Event] { + + import Tcp._ + import BackpressureBuffer._ + + private val log = ctx.getLogger + + private var storageOffset = 0 + private var storage = Vector.empty[Write] + private def currentOffset = storageOffset + storage.size + + private var stored = 0L + private var suspended = false + + private var behavior = writing + override def commandPipeline = behavior + override def eventPipeline = behavior + + private def become(f: Message ⇒ Iterable[Result]) { behavior = f } + + private lazy val writing: Message ⇒ Iterable[Result] = { + case Write(data, ack) ⇒ + buffer(Write(data, Ack(currentOffset, ack)), doWrite = true) + + case CommandFailed(Write(_, Ack(offset, _))) ⇒ + become(buffering(offset)) + ctx.singleCommand(ResumeWriting) + + case cmd: CloseCommand ⇒ cmd match { + case _ if storage.isEmpty ⇒ + become(finished) + ctx.singleCommand(cmd) + case Abort ⇒ + storage = Vector.empty + become(finished) + ctx.singleCommand(Abort) + case _ ⇒ + become(closing(cmd)) + ctx.nothing + } + + case Ack(seq, ack) ⇒ acknowledge(seq, ack) + + case cmd: Command ⇒ ctx.singleCommand(cmd) + case evt: Event ⇒ ctx.singleEvent(evt) + } + + private def buffering(nack: Int): Message ⇒ Iterable[Result] = { + var toAck = 10 + var closed: CloseCommand = null + + { + case Write(data, ack) ⇒ + buffer(Write(data, Ack(currentOffset, ack)), doWrite = false) + + case WritingResumed ⇒ + ctx.singleCommand(storage(0)) + + case cmd: CloseCommand ⇒ cmd match { + case Abort ⇒ + storage = Vector.empty + become(finished) + ctx.singleCommand(Abort) + case _ ⇒ + closed = cmd + ctx.nothing + } + + case Ack(seq, ack) if seq < nack ⇒ acknowledge(seq, ack) + + case Ack(seq, ack) ⇒ + val ackMsg = acknowledge(seq, ack) + if (storage.nonEmpty) { + if (toAck > 0) { + toAck -= 1 + ctx.dealias(ackMsg) ++ Seq(Right(storage(0))) + } else { + become(if (closed != null) closing(closed) else writing) + ctx.dealias(ackMsg) ++ storage.map(Right(_)) + } + } else if (closed != null) { + become(finished) + ctx.dealias(ackMsg) ++ Seq(Right(closed)) + } else { + become(writing) + ackMsg + } + + case CommandFailed(_: Write) ⇒ ctx.nothing + case cmd: Command ⇒ ctx.singleCommand(cmd) + case evt: Event ⇒ ctx.singleEvent(evt) + } + } + + private def closing(cmd: CloseCommand): Message ⇒ Iterable[Result] = { + case Ack(seq, ack) ⇒ + val result = acknowledge(seq, ack) + if (storage.isEmpty) { + become(finished) + ctx.dealias(result) ++ Seq(Right(cmd)) + } else result + + case CommandFailed(_: Write) ⇒ + become({ + case WritingResumed ⇒ + become(closing(cmd)) + storage.map(Right(_)) + case CommandFailed(_: Write) ⇒ ctx.nothing + case cmd: Command ⇒ ctx.singleCommand(cmd) + case evt: Event ⇒ ctx.singleEvent(evt) + }) + ctx.singleCommand(ResumeWriting) + + case cmd: Command ⇒ ctx.singleCommand(cmd) + case evt: Event ⇒ ctx.singleEvent(evt) + } + + private val finished: Message ⇒ Iterable[Result] = { + case _: Write ⇒ ctx.nothing + case CommandFailed(_: Write) ⇒ 
ctx.nothing + case cmd: Command ⇒ ctx.singleCommand(cmd) + case evt: Event ⇒ ctx.singleEvent(evt) + } + + private def buffer(w: Write, doWrite: Boolean): Iterable[Result] = { + storage :+= w + stored += w.data.size + + if (stored > maxCapacity) { + log.warning("aborting connection (buffer overrun)") + become(finished) + ctx.singleCommand(Abort) + } else if (stored > highWatermark && !suspended) { + log.debug("suspending writes") + suspended = true + if (doWrite) { + Seq(Right(w), Left(HighWatermarkReached)) + } else { + ctx.singleEvent(HighWatermarkReached) + } + } else if (doWrite) { + ctx.singleCommand(w) + } else Nil + } + + private def acknowledge(seq: Int, ack: Event): Iterable[Result] = { + require(seq == storageOffset, s"received ack $seq at $storageOffset") + require(storage.nonEmpty, s"storage was empty at ack $seq") + + val size = storage(0).data.size + stored -= size + + storageOffset += 1 + storage = storage drop 1 + + if (suspended && stored < lowWatermark) { + log.debug("resuming writes") + suspended = false + if (ack == NoAck) ctx.singleEvent(LowWatermarkReached) + else Vector(Left(ack), Left(LowWatermarkReached)) + } else if (ack == NoAck) ctx.nothing + else ctx.singleEvent(ack) + } + } + +} + //#length-field-frame /** * Pipeline stage for length-field encoded framing. It will prepend a diff --git a/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala index d90325a9fb..1114c64cec 100644 --- a/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala +++ b/akka-actor/src/main/scala/akka/io/SslTlsSupport.scala @@ -219,7 +219,7 @@ class SslTlsSupport(engine: SSLEngine) extends PipelineStage[HasLogging, Command } } - private final class Send(val buffer: ByteBuffer, val ack: Any) + private final class Send(val buffer: ByteBuffer, val ack: Event) private object Send { val Empty = new Send(ByteBuffer wrap SslTlsSupport.EmptyByteArray, Tcp.NoAck) diff --git a/akka-actor/src/main/scala/akka/io/Tcp.scala b/akka-actor/src/main/scala/akka/io/Tcp.scala index 48c58335ac..a333037fb9 100644 --- a/akka-actor/src/main/scala/akka/io/Tcp.scala +++ b/akka-actor/src/main/scala/akka/io/Tcp.scala @@ -63,12 +63,14 @@ object Tcp extends ExtensionKey[TcpExt] { } + trait Message + /// COMMANDS /** * This is the common trait for all commands understood by TCP actors. */ - trait Command extends IO.HasFailureMessage { + trait Command extends Message with IO.HasFailureMessage { def failureMessage = CommandFailed(this) } @@ -102,7 +104,7 @@ object Tcp extends ExtensionKey[TcpExt] { override def event = Aborted } - case class NoAck(token: Any) + case class NoAck(token: Any) extends Event object NoAck extends NoAck(null) sealed trait WriteCommand extends Command { @@ -116,7 +118,7 @@ object Tcp extends ExtensionKey[TcpExt] { * Write data to the TCP connection. If no ack is needed use the special * `NoAck` object. */ - case class Write(data: ByteString, ack: Any) extends WriteCommand + case class Write(data: ByteString, ack: Event) extends WriteCommand object Write { /** * The empty Write doesn't write anything and isn't acknowledged. 
@@ -149,7 +151,7 @@ object Tcp extends ExtensionKey[TcpExt] { case object ResumeReading extends Command /// EVENTS - trait Event + trait Event extends Message case class Received(data: ByteString) extends Event case class Connected(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress) extends Event @@ -274,7 +276,7 @@ object TcpMessage { def noAck(token: AnyRef): NoAck = NoAck(token) def write(data: ByteString): Command = Write(data) - def write(data: ByteString, ack: AnyRef): Command = Write(data, ack) + def write(data: ByteString, ack: Event): Command = Write(data, ack) def suspendReading: Command = SuspendReading def resumeReading: Command = ResumeReading diff --git a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala index 12fd72d2ab..e3e6a8ee3d 100644 --- a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala +++ b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala @@ -4,21 +4,16 @@ package akka.io -import akka.actor.Actor -import akka.actor.ActorContext import scala.beans.BeanProperty -import akka.actor.ActorRef -import scala.util.Success -import scala.util.Failure -import akka.actor.Terminated -import akka.actor.Props +import scala.util.{ Failure, Success } +import akka.actor.{ Actor, ActorContext, ActorRef, Props, Terminated } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.util.ByteString -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.event.Logging +import akka.event.LoggingAdapter object TcpPipelineHandler { - case class EscapeEvent(ev: Tcp.Event) extends Tcp.Command - /** * This class wraps up a pipeline with its external (i.e. “top”) command and * event types and providing unique wrappers for sending commands and @@ -26,25 +21,61 @@ object TcpPipelineHandler { * instance of [[Init]]). All events emitted by the pipeline will be sent to * the registered handler wrapped in an Event. */ - abstract class Init[Ctx <: PipelineContext, Cmd, Evt](val stages: PipelineStage[Ctx, Cmd, Tcp.Command, Evt, Tcp.Event]) { + abstract class Init[Ctx <: PipelineContext, Cmd, Evt]( + val stages: PipelineStage[_ >: Ctx <: PipelineContext, Cmd, Tcp.Command, Evt, Tcp.Event]) { + + /** + * This method must be implemented to return the [[PipelineContext]] + * necessary for the operation of the given [[PipelineStage]]. + */ def makeContext(actorContext: ActorContext): Ctx + /** + * Java API: construct a command to be sent to the [[TcpPipelineHandler]] + * actor. + */ def command(cmd: Cmd): Command = Command(cmd) + + /** + * Java API: extract a wrapped event received from the [[TcpPipelineHandler]] + * actor. + * + * @throws MatchError if the given object is not an Event matching this + * specific Init instance. + */ def event(evt: AnyRef): Evt = evt match { case Event(evt) ⇒ evt } - final case class Command(@BeanProperty cmd: Cmd) - final case class Event(@BeanProperty evt: Evt) + /** + * Wrapper class for commands to be sent to the [[TcpPipelineHandler]] actor. + */ + case class Command(@BeanProperty cmd: Cmd) + + /** + * Wrapper class for events emitted by the [[TcpPipelineHandler]] actor. + */ + case class Event(@BeanProperty evt: Evt) } /** - * Wrapper around acknowledgements: if a Tcp.Write is generated which - * request an ACK then it is wrapped such that the ACK can flow back up the - * pipeline later, allowing you to use arbitrary ACK messages (not just - * subtypes of Tcp.Event). 
+ * This interface bundles logging and ActorContext for Java. */ - case class Ack(ack: Any) extends Tcp.Event + trait WithinActorContext extends HasLogging with HasActorContext + + def withLogger[Cmd, Evt](log: LoggingAdapter, + stages: PipelineStage[_ >: WithinActorContext <: PipelineContext, Cmd, Tcp.Command, Evt, Tcp.Event]): Init[WithinActorContext, Cmd, Evt] = + new Init[WithinActorContext, Cmd, Evt](stages) { + override def makeContext(ctx: ActorContext): WithinActorContext = new WithinActorContext { + override def getLogger = log + override def getContext = ctx + } + } + + /** + * Wrapper class for management commands sent to the [[TcpPipelineHandler]] actor. + */ + case class Management(@BeanProperty cmd: AnyRef) /** * This is a new Tcp.Command which the pipeline can emit to effect the @@ -54,6 +85,14 @@ object TcpPipelineHandler { */ case class Tell(receiver: ActorRef, msg: Any, sender: ActorRef) extends Tcp.Command + /** + * The pipeline may want to emit a [[Tcp.Event]] to the registered handler + * actor, which is enabled by emitting this [[Tcp.Command]] wrapping an event + * instead. The [[TcpPipelineHandler]] actor will upon reception of this command + * forward the wrapped event to the handler. + */ + case class TcpEvent(@BeanProperty evt: Tcp.Event) extends Tcp.Command + /** * Scala API: create [[Props]] for a pipeline handler */ @@ -103,10 +142,8 @@ class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt]( val pipes = PipelineFactory.buildWithSinkFunctions(ctx, init.stages)({ case Success(cmd) ⇒ cmd match { - case Tcp.Write(data, Tcp.NoAck) ⇒ connection ! cmd - case Tcp.Write(data, ack) ⇒ connection ! Tcp.Write(data, Ack(ack)) case Tell(receiver, msg, sender) ⇒ receiver.tell(msg, sender) - case EscapeEvent(ev) ⇒ handler ! ev + case TcpEvent(ev) ⇒ handler ! ev case _ ⇒ connection ! cmd } case Failure(ex) ⇒ throw ex @@ -116,10 +153,11 @@ class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt]( }) def receive = { - case Command(cmd) ⇒ pipes.injectCommand(cmd) - case evt: Tcp.Event ⇒ pipes.injectEvent(evt) - case Terminated(`handler`) ⇒ connection ! Tcp.Abort - case cmd: Tcp.Command ⇒ pipes.managementCommand(cmd) + case Command(cmd) ⇒ pipes.injectCommand(cmd) + case evt: Tcp.Event ⇒ pipes.injectEvent(evt) + case Management(cmd) ⇒ pipes.managementCommand(cmd) + case Terminated(`handler`) ⇒ connection ! Tcp.Abort + case Terminated(`connection`) ⇒ context.stop(self) } } @@ -131,10 +169,11 @@ class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt]( * * While this adapter communicates to the stage above it via raw ByteStrings, it is possible to inject Tcp Command * by sending them to the management port, and the adapter will simply pass them down to the stage below. Incoming Tcp Events - * that are not Receive events will be passed directly to the handler registered for TcpPipelineHandler. - * @tparam Ctx + * that are not Receive events will be passed downwards wrapped in a [[TcpEvent]]; the [[TcpPipelineHandler]] will + * send these notifications to the registered event handler actor. 
*/ class TcpReadWriteAdapter extends PipelineStage[PipelineContext, ByteString, Tcp.Command, ByteString, Tcp.Event] { + import TcpPipelineHandler.TcpEvent override def apply(ctx: PipelineContext) = new PipePair[ByteString, Tcp.Command, ByteString, Tcp.Event] { @@ -144,7 +183,7 @@ class TcpReadWriteAdapter extends PipelineStage[PipelineContext, ByteString, Tcp override val eventPipeline = (evt: Tcp.Event) ⇒ evt match { case Tcp.Received(data) ⇒ ctx.singleEvent(data) - case ev: Tcp.Event ⇒ ctx.singleCommand(TcpPipelineHandler.EscapeEvent(ev)) + case ev: Tcp.Event ⇒ ctx.singleCommand(TcpEvent(ev)) } override val managementPort: Mgmt = { diff --git a/akka-docs/rst/java/code/docs/io/japi/EchoHandler.java b/akka-docs/rst/java/code/docs/io/japi/EchoHandler.java index 53afaa524b..4021098b96 100644 --- a/akka-docs/rst/java/code/docs/io/japi/EchoHandler.java +++ b/akka-docs/rst/java/code/docs/io/japi/EchoHandler.java @@ -14,6 +14,7 @@ import akka.event.Logging; import akka.event.LoggingAdapter; import akka.io.Tcp.CommandFailed; import akka.io.Tcp.ConnectionClosed; +import akka.io.Tcp.Event; import akka.io.Tcp.Received; import akka.io.Tcp.Write; import akka.io.Tcp.WritingResumed; @@ -33,6 +34,13 @@ public class EchoHandler extends UntypedActor { public static final long MAX_STORED = 100000000; public static final long HIGH_WATERMARK = MAX_STORED * 5 / 10; public static final long LOW_WATERMARK = MAX_STORED * 2 / 10; + + private static class Ack implements Event { + public final int ack; + public Ack(int ack) { + this.ack = ack; + } + } public EchoHandler(ActorRef connection, InetSocketAddress remote) { this.connection = connection; @@ -50,7 +58,7 @@ public class EchoHandler extends UntypedActor { public void apply(Object msg) throws Exception { if (msg instanceof Received) { final ByteString data = ((Received) msg).data(); - connection.tell(TcpMessage.write(data, currentOffset()), getSelf()); + connection.tell(TcpMessage.write(data, new Ack(currentOffset())), getSelf()); buffer(data); } else if (msg instanceof Integer) { @@ -59,7 +67,7 @@ public class EchoHandler extends UntypedActor { } else if (msg instanceof CommandFailed) { final Write w = (Write) ((CommandFailed) msg).cmd(); connection.tell(TcpMessage.resumeWriting(), getSelf()); - getContext().become(buffering((Integer) w.ack())); + getContext().become(buffering((Ack) w.ack())); } else if (msg instanceof ConnectionClosed) { final ConnectionClosed cl = (ConnectionClosed) msg; @@ -75,7 +83,7 @@ public class EchoHandler extends UntypedActor { }; //#buffering - protected Procedure buffering(final int nack) { + protected Procedure buffering(final Ack nack) { return new Procedure() { private int toAck = 10; @@ -99,7 +107,7 @@ public class EchoHandler extends UntypedActor { final int ack = (Integer) msg; acknowledge(ack); - if (ack >= nack) { + if (ack >= nack.ack) { // otherwise it was the ack of the last successful write if (storage.isEmpty()) { @@ -216,12 +224,12 @@ public class EchoHandler extends UntypedActor { protected void writeAll() { int i = 0; for (ByteString data : storage) { - connection.tell(TcpMessage.write(data, storageOffset + i++), getSelf()); + connection.tell(TcpMessage.write(data, new Ack(storageOffset + i++)), getSelf()); } } protected void writeFirst() { - connection.tell(TcpMessage.write(storage.peek(), storageOffset), getSelf()); + connection.tell(TcpMessage.write(storage.peek(), new Ack(storageOffset)), getSelf()); } //#storage-omitted diff --git a/akka-docs/rst/java/code/docs/io/japi/SimpleEchoHandler.java 
b/akka-docs/rst/java/code/docs/io/japi/SimpleEchoHandler.java index 980e2f4768..64092de806 100644 --- a/akka-docs/rst/java/code/docs/io/japi/SimpleEchoHandler.java +++ b/akka-docs/rst/java/code/docs/io/japi/SimpleEchoHandler.java @@ -13,6 +13,7 @@ import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.io.Tcp.ConnectionClosed; +import akka.io.Tcp.Event; import akka.io.Tcp.Received; import akka.io.TcpMessage; import akka.japi.Procedure; @@ -85,7 +86,7 @@ public class SimpleEchoHandler extends UntypedActor { private boolean suspended = false; private boolean closing = false; - private final Object ACK = new Object(); + private final Event ACK = new Event() {}; //#simple-helpers protected void buffer(ByteString data) { diff --git a/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java b/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java index 956507a323..e1d75faa87 100644 --- a/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java +++ b/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java @@ -21,8 +21,13 @@ import akka.actor.UntypedActor; import akka.event.Logging; import akka.event.LoggingAdapter; import akka.io.AbstractPipelineContext; +import akka.io.BackpressureBuffer; +import akka.io.DelimiterFraming; import akka.io.HasLogging; +import akka.io.PipelineStage; +import static akka.io.PipelineStage.sequence; import akka.io.SslTlsSupport; +import akka.io.StringByteStringAdapter; import akka.io.Tcp; import akka.io.Tcp.Bound; import akka.io.Tcp.Command; @@ -33,6 +38,8 @@ import akka.io.Tcp.Received; import akka.io.TcpMessage; import akka.io.TcpPipelineHandler; import akka.io.TcpPipelineHandler.Init; +import akka.io.TcpPipelineHandler.WithinActorContext; +import akka.io.TcpReadWriteAdapter; import akka.io.ssl.SslTlsSupportSpec; import akka.testkit.AkkaSpec; import akka.testkit.JavaTestKit; @@ -60,14 +67,8 @@ public class SslDocTest { .tell(TcpMessage.connect(remote), getSelf()); } - class Context extends AbstractPipelineContext implements HasLogging { - @Override - public LoggingAdapter getLogger() { - return log; - } - } - - Init init = null; + // this will hold the pipeline handler’s context + Init init = null; @Override public void onReceive(Object msg) { @@ -79,33 +80,30 @@ public class SslDocTest { final SSLEngine engine = sslContext.createSSLEngine( remote.getHostName(), remote.getPort()); engine.setUseClientMode(true); - final SslTlsSupport ssl = new SslTlsSupport(engine); - // set up the context for communicating with TcpPipelineHandler - init = new Init(ssl) { - @Override - public HasLogging makeContext(ActorContext ctx) { - return new Context(); - } - }; + // build pipeline and set up context for communicating with TcpPipelineHandler + init = TcpPipelineHandler.withLogger(log, sequence(sequence(sequence(sequence( + new StringByteStringAdapter("utf-8"), + new DelimiterFraming(1024, ByteString.fromString("\n"), true)), + new TcpReadWriteAdapter()), + new SslTlsSupport(engine)), + new BackpressureBuffer(1000, 10000, 1000000))); + // create handler for pipeline, setting ourselves as payload recipient final ActorRef handler = getContext().actorOf( TcpPipelineHandler.create(init, getSender(), getSelf())); // register the SSL handler with the connection getSender().tell(TcpMessage.register(handler), getSelf()); + // and send a message across the SSL channel - handler.tell( - init.command(TcpMessage.write(ByteString.fromString("hello"))), - getSelf()); + handler.tell(init.command("hello\n"), getSelf()); } else if (msg instanceof Init.Event) { // 
unwrap TcpPipelineHandler’s event into a Tcp.Event - final Event recv = init.event(msg); - if (recv instanceof Received) { - // and inform someone of the received payload - listener.tell(((Received) recv).data().utf8String(), getSelf()); - } + final String recv = init.event(msg); + // and inform someone of the received payload + listener.tell(recv, getSelf()); } } } @@ -130,14 +128,8 @@ public class SslDocTest { getSelf()); } - class Context extends AbstractPipelineContext implements HasLogging { - @Override - public LoggingAdapter getLogger() { - return log; - } - } - - Init init = null; + // this will hold the pipeline handler’s context + Init init = null; @Override public void onReceive(Object msg) { @@ -153,15 +145,15 @@ public class SslDocTest { final SSLEngine engine = sslContext.createSSLEngine( remote.getHostName(), remote.getPort()); engine.setUseClientMode(false); - final SslTlsSupport ssl = new SslTlsSupport(engine); - - // set up the context for communicating with TcpPipelineHandler - init = new Init(ssl) { - @Override - public HasLogging makeContext(ActorContext ctx) { - return new Context(); - } - }; + + // build pipeline and set up context for communicating with TcpPipelineHandler + init = TcpPipelineHandler.withLogger(log, sequence(sequence(sequence(sequence( + new StringByteStringAdapter("utf-8"), + new DelimiterFraming(1024, ByteString.fromString("\n"), true)), + new TcpReadWriteAdapter()), + new SslTlsSupport(engine)), + new BackpressureBuffer(1000, 10000, 1000000))); + // create handler for pipeline, setting ourselves as payload recipient final ActorRef handler = getContext().actorOf( TcpPipelineHandler.create(init, getSender(), getSelf())); @@ -171,14 +163,11 @@ public class SslDocTest { } else if (msg instanceof Init.Event) { // unwrap TcpPipelineHandler’s event to get a Tcp.Event - final Event recv = init.event(msg); - if (recv instanceof Received) { - // inform someone of the received message - listener.tell(((Received) recv).data().utf8String(), getSelf()); - // and reply (sender is the SSL handler created above) - getSender().tell(init.command( - TcpMessage.write(ByteString.fromString("world"))), getSelf()); - } + final String recv = init.event(msg); + // inform someone of the received message + listener.tell(recv, getSelf()); + // and reply (sender is the SSL handler created above) + getSender().tell(init.command("world\n"), getSelf()); } } } @@ -201,9 +190,9 @@ public class SslDocTest { assert getLastSender() == server; final ActorRef client = system.actorOf(Props.create(SslClient.class, bound.localAddress(), ctx, getRef())); - expectMsgEquals("hello"); + expectMsgEquals("hello\n"); assert getLastSender() == server; - expectMsgEquals("world"); + expectMsgEquals("world\n"); assert getLastSender() == client; } }; diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index 94f99351f1..47fba4e5aa 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -563,7 +563,7 @@ resending of all queued data: It should be noted that all writes which are currently buffered have also been sent to the connection actor upon entering this state, which means that the :class:`ResumeWriting` message is enqueued after those writes, leading to the -reception of all outstanding :class:`CommandFailre` messages (which are ignored +reception of all outstanding :class:`CommandFailed` messages (which are ignored in this state) before receiving the :class:`WritingResumed` signal. 
That latter message is sent by the connection actor only once the internally queued write has been fully completed, meaning that a subsequent write will not fail. This @@ -590,36 +590,55 @@ first look at the SSL server: .. includecode:: code/docs/io/japi/SslDocTest.java#server +Please refer to `the source code`_ to see all imports. + +.. _the source code: @github@/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java + The actor above binds to a local port and registers itself as the handler for new connections. When a new connection comes in it will create a -:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly +:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary widely for different setups, please refer to the JDK documentation) and wrap that in an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``). -This single-stage pipeline will be driven by a :class:`TcpPipelineHandler` -actor which is also included in ``akka-actor``. In order to capture the generic -command and event types consumed and emitted by that actor we need to create a -wrapper—the nested :class:`Init` class—which also provides the -:meth:`makeContext` method for creating the pipeline context needed by the -supplied pipeline. With those things bundled up all that remains is creating a + +This sample demonstrates a few more things: below the SSL pipeline stage we +have inserted a backpressure buffer which will generate a +:class:`HighWatermarkReached` event to tell the upper stages to suspend writing +(generated at 10000 buffered bytes) and a :class:`LowWatermarkReached` when +they can resume writing (when the buffer empties below 1000 bytes); the buffer has +a maximum capacity of 1MB. The implementation is very similar to the NACK-based +backpressure approach presented above. Above the SSL stage comes an adapter +which extracts only the payload data from the TCP commands and events, i.e. it +speaks :class:`ByteString` above. The resulting byte streams are broken into +frames by a :class:`DelimiterFraming` stage which chops them up on newline +characters. The top-most stage then converts between :class:`String` and UTF-8 +encoded :class:`ByteString`. + +As a result the pipeline will accept simple :class:`String` commands, encode +them using UTF-8, delimit them with newlines (which are expected to be already +present in the sending direction), transform them into TCP commands and events, +encrypt them and send them off to the connection actor while buffering writes. + +This pipeline is driven by a :class:`TcpPipelineHandler` actor which is also +included in ``akka-actor``. In order to capture the generic command and event +types consumed and emitted by that actor we need to create a wrapper—the nested +:class:`Init` class—which also provides the pipeline context needed by the +supplied pipeline; in this case we use the :meth:`withLogger` convenience +method which supplies a context that implements :class:`HasLogging` and +:class:`HasActorContext` and should be sufficient for typical pipelines. With +those things bundled up all that remains is creating a :class:`TcpPipelineHandler` and registering that one as the recipient of inbound traffic from the TCP connection. Since we instructed that handler actor to send any events which are emitted by the SSL pipeline to ourselves, we can then just wait for the reception of the -decrypted payload messages, compute a response—just ``"world"`` in this -case—and reply by sending back a ``Tcp.Write``. 
It should be noted that +decrypted payload messages, compute a response—just ``"world\n"`` in this +case—and reply by sending back an ``Init.Command``. It should be noted that communication with the handler wraps commands and events in the inner types of the ``init`` object in order to keep things well separated. To ease handling of such path-dependent types there exist two helper methods, namely :class:`Init.command` for creating a command and :class:`Init.event` for unwrapping an event. -.. warning:: - - The :class:`TcpPipelineHandler` does currently not handle back-pressure from - the TCP socket, i.e. it will just lose data when the kernel buffer - overflows. This will be fixed before Akka 2.2 final. - Looking at the client side we see that not much needs to be changed: .. includecode:: code/docs/io/japi/SslDocTest.java#client diff --git a/akka-docs/rst/scala/code/docs/io/EchoServer.scala b/akka-docs/rst/scala/code/docs/io/EchoServer.scala index bdefdcee6f..2523f53bcc 100644 --- a/akka-docs/rst/scala/code/docs/io/EchoServer.scala +++ b/akka-docs/rst/scala/code/docs/io/EchoServer.scala @@ -81,6 +81,8 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress) import Tcp._ + case class Ack(offset: Int) extends Event + // sign death pact: this actor terminates when connection breaks context watch connection @@ -90,13 +92,13 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress) //#writing def writing: Receive = { case Received(data) ⇒ - connection ! Write(data, currentOffset) + connection ! Write(data, Ack(currentOffset)) buffer(data) - case ack: Int ⇒ + case Ack(ack) ⇒ acknowledge(ack) - case CommandFailed(Write(_, ack: Int)) ⇒ + case CommandFailed(Write(_, Ack(ack))) ⇒ connection ! ResumeWriting context become buffering(ack) @@ -115,8 +117,8 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress) case Received(data) ⇒ buffer(data) case WritingResumed ⇒ writeFirst() case PeerClosed ⇒ peerClosed = true - case ack: Int if ack < nack ⇒ acknowledge(ack) - case ack: Int ⇒ + case Ack(ack) if ack < nack ⇒ acknowledge(ack) + case Ack(ack) ⇒ acknowledge(ack) if (storage.nonEmpty) { if (toAck > 0) { @@ -148,7 +150,7 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress) }, discardOld = false) - case ack: Int ⇒ + case Ack(ack) ⇒ acknowledge(ack) if (storage.isEmpty) context stop self } @@ -159,15 +161,15 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress) } //#storage-omitted - var storageOffset = 0 - var storage = Vector.empty[ByteString] - var stored = 0L - var transferred = 0L + private var storageOffset = 0 + private var storage = Vector.empty[ByteString] + private var stored = 0L + private var transferred = 0L val maxStored = 100000000L val highWatermark = maxStored * 5 / 10 val lowWatermark = maxStored * 3 / 10 - var suspended = false + private var suspended = false private def currentOffset = storageOffset + storage.size @@ -207,12 +209,12 @@ class EchoHandler(connection: ActorRef, remote: InetSocketAddress) //#helpers private def writeFirst(): Unit = { - connection ! Write(storage(0), storageOffset) + connection ! Write(storage(0), Ack(storageOffset)) } private def writeAll(): Unit = { for ((data, i) ← storage.zipWithIndex) { - connection ! Write(data, storageOffset + i) + connection ! 
Write(data, Ack(storageOffset + i)) } } @@ -229,7 +231,7 @@ class SimpleEchoHandler(connection: ActorRef, remote: InetSocketAddress) // sign death pact: this actor terminates when connection breaks context watch connection - case object Ack + case object Ack extends Event def receive = { case Received(data) ⇒ diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index c530a85062..10ada05f38 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -583,7 +583,7 @@ resending of all queued data: It should be noted that all writes which are currently buffered have also been sent to the connection actor upon entering this state, which means that the :class:`ResumeWriting` message is enqueued after those writes, leading to the -reception of all outstanding :class:`CommandFailre` messages (which are ignored +reception of all outstanding :class:`CommandFailed` messages (which are ignored in this state) before receiving the :class:`WritingResumed` signal. That latter message is sent by the connection actor only once the internally queued write has been fully completed, meaning that a subsequent write will not fail. This @@ -609,32 +609,47 @@ This example shows the different parts described above working together: .. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#server -The actor above is meant to be registered as the inbound connection handler for -a listen socket. When a new connection comes in it will create a -:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary wildly +The actor above binds to a local port and registers itself as the handler for +new connections. When a new connection comes in it will create a +:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary widely for different setups, please refer to the JDK documentation) and wrap that in an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``). -This single-stage pipeline will be driven by a :class:`TcpPipelineHandler` -actor which is also included in ``akka-actor``. In order to capture the generic -command and event types consumed and emitted by that actor we need to create a -wrapper—the nested :class:`Init` class—which also provides the -:meth:`makeContext` method for creating the pipeline context needed by the -supplied pipeline. With those things bundled up all that remains is creating a + +This sample demonstrates a few more things: below the SSL pipeline stage we +have inserted a backpressure buffer which will generate a +:class:`HighWatermarkReached` event to tell the upper stages to suspend writing +and a :class:`LowWatermarkReached` when they can resume writing. The +implementation is very similar to the NACK-based backpressure approach +presented above. Above the SSL stage comes an adapter which extracts only the +payload data from the TCP commands and events, i.e. it speaks +:class:`ByteString` above. The resulting byte streams are broken into frames by +a :class:`DelimiterFraming` stage which chops them up on newline characters. +The top-most stage then converts between :class:`String` and UTF-8 encoded +:class:`ByteString`. + +As a result the pipeline will accept simple :class:`String` commands, encode +them using UTF-8, delimit them with newlines (which are expected to be already +present in the sending direction), transform them into TCP commands and events, +encrypt them and send them off to the connection actor while buffering writes. 
+ +This pipeline is driven by a :class:`TcpPipelineHandler` actor which is also +included in ``akka-actor``. In order to capture the generic command and event +types consumed and emitted by that actor we need to create a wrapper—the nested +:class:`Init` class—which also provides the pipeline context needed by the +supplied pipeline; in this case we use the :meth:`withLogger` convenience +method which supplies a context that implements :class:`HasLogging` and +:class:`HasActorContext` and should be sufficient for typical pipelines. With +those things bundled up all that remains is creating a :class:`TcpPipelineHandler` and registering that one as the recipient of -inbound traffic from the TCP connection. +inbound traffic from the TCP connection. The pipeline handler is instructed to +send the decrypted payload data to the following actor: -Since we instructed that handler actor to send any events which are emitted by -the SSL pipeline to ourselves, we can then just switch behavior to receive the -decrypted payload message, compute a response and reply by sending back a -``Tcp.Write``. It should be noted that communication with the handler wraps -commands and events in the inner types of the ``init`` object in order to keep -things well separated. +.. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#handler -.. warning:: - - The :class:`TcpPipelineHandler` does currently not handle back-pressure from - the TCP socket, i.e. it will just lose data when the kernel buffer - overflows. This will be fixed before Akka 2.2 final. +This actor computes a response and replies by sending back a :class:`String`. +It should be noted that communication with the :class:`TcpPipelineHandler` +wraps commands and events in the inner types of the ``init`` object in order to +keep things well separated. 
Using UDP --------- diff --git a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala index f3323c50d6..1f2dc7a344 100644 --- a/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala +++ b/akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala @@ -24,20 +24,24 @@ package akka.io.ssl -import akka.TestUtils -import akka.event.Logging -import akka.event.LoggingAdapter -import akka.io._ -import akka.remote.security.provider.AkkaProvider -import akka.testkit.{ TestProbe, AkkaSpec } -import akka.util.{ ByteString, Timeout } -import java.io.{ BufferedWriter, OutputStreamWriter, InputStreamReader, BufferedReader } +import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter } import java.net.{ InetSocketAddress, SocketException } import java.security.{ KeyStore, SecureRandom } import java.util.concurrent.atomic.AtomicInteger -import javax.net.ssl._ -import scala.concurrent.duration._ -import akka.actor.{ Props, ActorLogging, Actor, ActorContext } + +import scala.concurrent.duration.DurationInt + +import akka.TestUtils +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } +import akka.event.{ Logging, LoggingAdapter } +import akka.io.{ BackpressureBuffer, DelimiterFraming, IO, SslTlsSupport, StringByteStringAdapter, Tcp } +import akka.io.TcpPipelineHandler +import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext } +import akka.io.TcpReadWriteAdapter +import akka.remote.security.provider.AkkaProvider +import akka.testkit.{ AkkaSpec, TestProbe } +import akka.util.{ ByteString, Timeout } +import javax.net.ssl.{ KeyManagerFactory, SSLContext, SSLServerSocket, SSLSocket, TrustManagerFactory } // TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies class SslTlsSupportSpec extends AkkaSpec { @@ -66,26 +70,26 @@ class SslTlsSupportSpec extends AkkaSpec { "work between a Java client and a akka server" in { val serverAddress = TestUtils.temporaryServerAddress() - val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this)) val probe = TestProbe() - probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) - probe.expectMsgType[Tcp.Bound] + val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server1")) + expectMsg(Tcp.Bound) val client = new JavaSslClient(serverAddress) client.run() client.close() + probe.expectTerminated(bindHandler) } "work between a akka client and a akka server" in { val serverAddress = TestUtils.temporaryServerAddress() - val bindHandler = system.actorOf(Props(classOf[AkkaSslServer], this)) val probe = TestProbe() - probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress)) - probe.expectMsgType[Tcp.Bound] + val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)), "server2")) + expectMsg(Tcp.Bound) val client = new AkkaSslClient(serverAddress) client.run() client.close() + probe.expectTerminated(bindHandler) } } @@ -99,15 +103,11 @@ class SslTlsSupportSpec extends AkkaSpec { val connected = probe.expectMsgType[Tcp.Connected] val connection = probe.sender - val init = new TcpPipelineHandler.Init( + val init = TcpPipelineHandler.withLogger(system.log, new StringByteStringAdapter >> new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'), includeDelimiter = true) >> new TcpReadWriteAdapter >> - new SslTlsSupport(sslEngine(connected.remoteAddress, client = true))) { - override def makeContext(actorContext: 
ActorContext): HasLogging = new HasLogging { - override def getLogger = system.log - } - } + new SslTlsSupport(sslEngine(connected.remoteAddress, client = true))) import init._ @@ -129,60 +129,75 @@ class SslTlsSupportSpec extends AkkaSpec { } def close() { - probe.send(handler, Tcp.Close) - probe.expectMsgType[Tcp.Event] match { - case _: Tcp.ConnectionClosed ⇒ true - } + probe.send(handler, Management(Tcp.Close)) + probe.expectMsgType[Tcp.ConnectionClosed] TestUtils.verifyActorTermination(handler) } } //#server - class AkkaSslServer extends Actor with ActorLogging { + class AkkaSslServer(local: InetSocketAddress) extends Actor with ActorLogging { - import Tcp.Connected + import Tcp._ + + implicit def system = context.system + IO(Tcp) ! Bind(self, local) def receive: Receive = { + case _: Bound ⇒ + context.become(bound(sender)) + //#server + testActor ! Bound + //#server + } + + def bound(listener: ActorRef): Receive = { case Connected(remote, _) ⇒ - val init = - new TcpPipelineHandler.Init( - new StringByteStringAdapter >> - new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'), - includeDelimiter = true) >> - new TcpReadWriteAdapter >> - new SslTlsSupport(sslEngine(remote, client = false))) { - /* - * When creating an `Init` the abstract `makeContext` method needs to be - * implemented. If the type of the returned context does not satisfy the - * requirements of all pipeline stages, then you’ll get an error that - * `makeContext` has an incompatible type. - */ - override def makeContext(actorContext: ActorContext): HasLogging = - new HasLogging { - override def getLogger = log - } - } - import init._ + val init = TcpPipelineHandler.withLogger(log, + new StringByteStringAdapter("utf-8") >> + new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'), + includeDelimiter = true) >> + new TcpReadWriteAdapter >> + new SslTlsSupport(sslEngine(remote, client = false)) >> + new BackpressureBuffer(lowWatermark = 1000, highWatermark = 10000, + maxCapacity = 1000000)) val connection = sender - val handler = system.actorOf( - TcpPipelineHandler(init, sender, self), "server" + counter.incrementAndGet()) + val handler = context.actorOf(Props(new AkkaSslHandler(init))) + //#server + context watch handler + //#server + val pipeline = context.actorOf(TcpPipelineHandler(init, sender, handler)) - connection ! Tcp.Register(handler) - - context become { - case Event(data) ⇒ - val input = data.dropRight(1) - log.debug("akka-io Server received {} from {}", input, sender) - val response = serverResponse(input) - sender ! Command(response) - log.debug("akka-io Server sent: {}", response.dropRight(1)) + connection ! Tcp.Register(pipeline) + //#server + case _: Terminated ⇒ + listener ! Unbind + context.become { + case Unbound ⇒ context stop self } + //#server } } //#server + //#handler + class AkkaSslHandler(init: Init[WithinActorContext, String, String]) + extends Actor with ActorLogging { + + def receive = { + case init.Event(data) ⇒ + val input = data.dropRight(1) + log.debug("akka-io Server received {} from {}", input, sender) + val response = serverResponse(input) + sender ! init.Command(response) + log.debug("akka-io Server sent: {}", response.dropRight(1)) + case Tcp.PeerClosed ⇒ context.stop(self) + } + } + //#handler + class JavaSslServer extends Thread { val log: LoggingAdapter = Logging(system, getClass) val address = TestUtils.temporaryServerAddress()