!act #3812: Remove Pipelines

Endre Sándor Varga 2014-01-16 22:06:24 +01:00
parent 8d2bc2bc40
commit 293dd0b9d2
31 changed files with 12 additions and 4400 deletions

View file

@@ -1,254 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import java.net.InetSocketAddress
import java.security.MessageDigest
import scala.concurrent.Await
import scala.concurrent.duration.{ Duration, DurationInt }
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, ReceiveTimeout, Stash, Terminated }
import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext }
import akka.pattern.ask
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.util.{ ByteString, Timeout }
import akka.actor.Deploy
object BackpressureSpec {
final val ChunkSize = 1024
case class StartSending(n: Int)
case class Done(hash: ByteString)
case object Failed
case object Close
case object GetStatus
case class SenderStatus(restarted: Throwable, sent: Int, buffering: Boolean)
class Sender(receiver: InetSocketAddress) extends Actor with Stash with ActorLogging {
val digest = MessageDigest.getInstance("SHA-1")
digest.reset()
import context.system
IO(Tcp) ! Tcp.Connect(receiver)
var restarted: Throwable = _
var sent = 0
var buffering = false
override def postRestart(thr: Throwable): Unit = {
restarted = thr
context.stop(self)
}
def receive = {
case _: Tcp.Connected ⇒
val init = TcpPipelineHandler.withLogger(log,
new TcpReadWriteAdapter >>
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline")
sender ! Tcp.Register(handler)
unstashAll()
context.become(connected(init, handler))
case _: Tcp.CommandFailed ⇒
unstashAll()
context.become(failed)
case _ ⇒ stash()
}
def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = {
case StartSending(0) ⇒ sender ! Done(ByteString(digest.digest()))
case StartSending(n) ⇒
val rnd = ThreadLocalRandom.current
val data = Array.tabulate[Byte](ChunkSize)(_ ⇒ rnd.nextInt().toByte)
digest.update(data)
connection ! init.Command(ByteString(data))
self forward StartSending(n - 1)
sent += 1
case BackpressureBuffer.HighWatermarkReached ⇒
context.setReceiveTimeout(5.seconds)
buffering = true
context.become({
case BackpressureBuffer.LowWatermarkReached ⇒
unstashAll()
context.setReceiveTimeout(Duration.Undefined)
buffering = false
context.unbecome()
case ReceiveTimeout ⇒
log.error("receive timeout while throttled")
context.stop(self)
case _: StartSending ⇒ stash()
}, discardOld = false)
case ReceiveTimeout ⇒ // that old cancellation race
case Close ⇒ connection ! Management(Tcp.Close)
case Tcp.Closed ⇒ context.stop(self)
}
override def unhandled(msg: Any): Unit = msg match {
case GetStatus ⇒ sender ! SenderStatus(restarted, sent, buffering)
}
val failed: Receive = {
case _ ⇒ sender ! Failed
}
}
case object GetPort
case class Port(p: Int)
case object GetProgress
case class Progress(n: Int)
case object GetHash
case class Hash(hash: ByteString)
case class ReceiverStatus(received: Long, hiccupStarted: Int, hiccupEnded: Int)
class Receiver(hiccups: Boolean) extends Actor with Stash with ActorLogging {
val digest = MessageDigest.getInstance("SHA-1")
digest.reset()
import context.system
IO(Tcp) ! Tcp.Bind(self, new InetSocketAddress("localhost", 0))
var listener: ActorRef = _
var received = 0L
var hiccupStarted = 0
var hiccupEnded = 0
override def postRestart(thr: Throwable): Unit = {
context.stop(self)
}
def receive = {
case Tcp.Bound(local) ⇒
listener = sender
unstashAll()
context.become(bound(local.getPort))
case _: Tcp.CommandFailed ⇒
unstashAll()
context.become(failed)
case _ ⇒ stash()
}
def bound(port: Int): Receive = {
case GetPort ⇒ sender ! Port(port)
case Tcp.Connected(local, remote) ⇒
val init = TcpPipelineHandler.withLogger(log,
new TcpReadWriteAdapter >>
new BackpressureBuffer(10000, 1000000, Long.MaxValue))
val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline")
sender ! Tcp.Register(handler)
unstashAll()
context.become(connected(init, handler))
case _ ⇒ stash()
}
def connected(init: Init[WithinActorContext, ByteString, ByteString], connection: ActorRef): Receive = {
case init.Event(data) ⇒
digest.update(data.toArray)
received += data.length
if (hiccups && hiccupStarted == hiccupEnded && ThreadLocalRandom.current.nextInt(1000) == 0) {
connection ! Management(Tcp.SuspendReading)
import context.dispatcher
system.scheduler.scheduleOnce(100.millis, self, Management(Tcp.ResumeReading))
hiccupStarted += 1
}
case m: Management ⇒
hiccupEnded += 1
connection ! m
case GetProgress ⇒
sender ! Progress((received / ChunkSize).toInt)
case GetHash ⇒
sender ! Hash(ByteString(digest.digest()))
case Tcp.PeerClosed ⇒
listener ! Tcp.Unbind
context.become {
case Tcp.Unbound ⇒ context.stop(self)
case _: Management ⇒
}
}
override def unhandled(msg: Any): Unit = msg match {
case GetStatus ⇒ sender ! ReceiverStatus(received, hiccupStarted, hiccupEnded)
}
val failed: Receive = {
case _ ⇒ sender ! Failed
}
}
}
class BackpressureSpec extends AkkaSpec("akka.actor.serialize-creators=on") with ImplicitSender {
import BackpressureSpec._
"A BackpressureBuffer" must {
"transmit the right bytes" in {
val N = 100000
val recv = watch(system.actorOf(Props(classOf[Receiver], false), "receiver1"))
recv ! GetPort
val port = expectMsgType[Port].p
val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender1"))
try {
within(20.seconds) {
send ! StartSending(N)
val hash = expectMsgType[Done].hash
implicit val t = Timeout(100.millis)
awaitAssert(Await.result(recv ? GetProgress, t.duration) should equal(Progress(N)))
recv ! GetHash
expectMsgType[Hash].hash should equal(hash)
}
} catch {
case NonFatal(e) ⇒
system.log.error(e, "timeout")
send ! GetStatus
println(expectMsgType[SenderStatus])
recv ! GetStatus
println(expectMsgType[ReceiverStatus])
throw e
}
send ! Close
val terminated = receiveWhile(1.second, messages = 2) {
case Terminated(t) ⇒ t
}
terminated.toSet should equal(Set(send, recv))
}
"transmit the right bytes with hiccups" in {
val N = 100000
val recv = watch(system.actorOf(Props(classOf[Receiver], true), "receiver2"))
recv ! GetPort
val port = expectMsgType[Port].p
val send = watch(system.actorOf(Props(classOf[Sender], new InetSocketAddress("localhost", port)), "sender2"))
try {
within(20.seconds) {
send ! StartSending(N)
val hash = expectMsgType[Done].hash
implicit val t = Timeout(100.millis)
awaitAssert(Await.result(recv ? GetProgress, t.duration) should equal(Progress(N)))
recv ! GetHash
expectMsgType[Hash].hash should equal(hash)
}
} catch {
case NonFatal(e) ⇒
system.log.error(e, "timeout")
send ! GetStatus
println(expectMsgType[SenderStatus])
recv ! GetStatus
println(expectMsgType[ReceiverStatus])
throw e
}
send ! Close
val terminated = receiveWhile(1.second, messages = 2) {
case Terminated(t) ⇒ t
}
terminated.toSet should equal(Set(send, recv))
}
}
}

View file

@@ -1,149 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import akka.testkit.{ TestProbe, AkkaSpec }
import java.net.InetSocketAddress
import akka.util.ByteString
import akka.actor.{ Props, ActorLogging, Actor }
import akka.TestUtils
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import akka.io.TcpPipelineHandler.Management
import akka.actor.ActorRef
import akka.actor.Deploy
object DelimiterFramingSpec {
case class Listener(ref: ActorRef)
}
class DelimiterFramingSpec extends AkkaSpec("akka.actor.serialize-creators = on") {
import DelimiterFramingSpec._
val addresses = TestUtils.temporaryServerAddresses(4)
"DelimiterFramingSpec" must {
"send and receive delimiter based frames correctly (one byte delimiter, exclude)" in {
testSetup(serverAddress = addresses(0), delimiter = "\n", includeDelimiter = false)
}
"send and receive delimiter based frames correctly (multi-byte delimiter, exclude)" in {
testSetup(serverAddress = addresses(1), delimiter = "DELIMITER", includeDelimiter = false)
}
"send and receive delimiter based frames correctly (one byte delimiter, include)" in {
testSetup(serverAddress = addresses(2), delimiter = "\n", includeDelimiter = true)
}
"send and receive delimiter based frames correctly (multi-byte delimiter, include)" in {
testSetup(serverAddress = addresses(3), delimiter = "DELIMITER", includeDelimiter = true)
}
}
val counter = new AtomicInteger
def testSetup(serverAddress: InetSocketAddress, delimiter: String, includeDelimiter: Boolean): Unit = {
val bindHandler = system.actorOf(Props(classOf[AkkaLineEchoServer], this, delimiter, includeDelimiter).withDeploy(Deploy.local))
val probe = TestProbe()
probe.send(IO(Tcp), Tcp.Bind(bindHandler, serverAddress))
probe.expectMsgType[Tcp.Bound]
bindHandler ! Listener(probe.lastSender)
val client = new AkkaLineClient(serverAddress, delimiter, includeDelimiter)
client.run()
client.close()
}
class AkkaLineClient(address: InetSocketAddress, delimiter: String, includeDelimiter: Boolean) {
val expectedDelimiter = if (includeDelimiter) delimiter else ""
val probe = TestProbe()
probe.send(IO(Tcp), Tcp.Connect(address))
val connected = probe.expectMsgType[Tcp.Connected]
val connection = probe.sender
val init = TcpPipelineHandler.withLogger(system.log,
new StringByteStringAdapter >>
new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >>
new TcpReadWriteAdapter)
import init._
val handler = system.actorOf(TcpPipelineHandler.props(init, connection, probe.ref).withDeploy(Deploy.local),
"client" + counter.incrementAndGet())
probe.send(connection, Tcp.Register(handler))
def run() {
probe.send(handler, Command(s"testone$delimiter"))
probe.expectMsg(Event(s"testone$expectedDelimiter"))
probe.send(handler, Command(s"two${delimiter}thr"))
probe.expectMsg(Event(s"two$expectedDelimiter"))
probe.expectNoMsg(1.seconds)
probe.send(handler, Command(s"ee$delimiter"))
probe.expectMsg(Event(s"three$expectedDelimiter"))
if (delimiter.size > 1) {
val (first, second) = delimiter.splitAt(1)
// Test a fragmented delimiter
probe.send(handler, Command(s"four$first"))
probe.expectNoMsg(1.seconds)
probe.send(handler, Command(second))
probe.expectMsg(Event(s"four$expectedDelimiter"))
// Test cases of false match on a delimiter fragment
for (piece s"${first}five${first}$delimiter") {
probe.expectNoMsg(100.milliseconds)
probe.send(handler, Command(String.valueOf(piece)))
}
probe.expectMsg(Event(s"${first}five${first}$expectedDelimiter"))
}
probe.send(handler, Command(s"${delimiter}${delimiter}"))
probe.expectMsg(Event(expectedDelimiter))
probe.expectMsg(Event(expectedDelimiter))
}
def close() {
probe.send(handler, Management(Tcp.Close))
probe.expectMsgType[Tcp.ConnectionClosed]
TestUtils.verifyActorTermination(handler)
}
}
class AkkaLineEchoServer(delimiter: String, includeDelimiter: Boolean) extends Actor with ActorLogging {
import Tcp.Connected
var listener: ActorRef = _
def receive: Receive = {
case Listener(ref) ⇒ listener = ref
case Connected(remote, _) ⇒
val init =
TcpPipelineHandler.withLogger(log,
new StringByteStringAdapter >>
new DelimiterFraming(maxSize = 1024, delimiter = ByteString(delimiter), includeDelimiter = includeDelimiter) >>
new TcpReadWriteAdapter)
import init._
val connection = sender
val handler = context.actorOf(TcpPipelineHandler.props(init, sender, self).withDeploy(Deploy.local), "pipeline")
connection ! Tcp.Register(handler)
context become {
case Event(data) ⇒
if (includeDelimiter) sender ! Command(data)
else sender ! Command(data + delimiter)
case Tcp.PeerClosed ⇒ listener ! Tcp.Unbind
case Tcp.Unbound ⇒ context.stop(self)
}
}
}
}

View file

@@ -1,222 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import scala.annotation.tailrec
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.Success
import akka.testkit.AkkaSpec
import akka.util.ByteString
class PipelineSpec extends AkkaSpec("akka.actor.serialize-creators = on") {
trait Level1
trait Level2
trait Level3
trait Level4
trait LevelFactory[Lvl] {
def msgA: Lvl
def msgB: Lvl
}
implicit object Level1 extends LevelFactory[Level1] {
object msgA extends Level1 { override def toString = "Lvl1msgA" }
object msgB extends Level1 { override def toString = "Lvl1msgB" }
}
implicit object Level2 extends LevelFactory[Level2] {
object msgA extends Level2 { override def toString = "Lvl2msgA" }
object msgB extends Level2 { override def toString = "Lvl2msgB" }
}
implicit object Level3 extends LevelFactory[Level3] {
object msgA extends Level3 { override def toString = "Lvl3msgA" }
object msgB extends Level3 { override def toString = "Lvl3msgB" }
}
implicit object Level4 extends LevelFactory[Level4] {
object msgA extends Level4 { override def toString = "Lvl4msgA" }
object msgB extends Level4 { override def toString = "Lvl4msgB" }
}
val ctx = new PipelineContext {}
"A Pipeline" must {
"be correctly evaluated if single stage" in {
val PipelinePorts(cmd, evt, _) =
PipelineFactory.buildFunctionTriple(ctx, stage[Level2, Level1](1, 0, false))
cmd(Level2.msgA) should be(Nil -> Seq(Level1.msgA))
evt(Level1.msgA) should be(Seq(Level2.msgA) -> Nil)
cmd(Level2.msgB) should be(Nil -> Seq(Level1.msgB))
evt(Level1.msgB) should be(Seq(Level2.msgB) -> Nil)
}
"be correctly evaluated when two combined" in {
val stage1 = stage[Level3, Level2](1, 0, false)
val stage2 = stage[Level2, Level1](1, 0, false)
val PipelinePorts(cmd, evt, _) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2)
cmd(Level3.msgA) should be(Nil -> Seq(Level1.msgA))
evt(Level1.msgA) should be(Seq(Level3.msgA) -> Nil)
cmd(Level3.msgB) should be(Nil -> Seq(Level1.msgB))
evt(Level1.msgB) should be(Seq(Level3.msgB) -> Nil)
}
"be correctly evaluated when three combined" in {
val stage1 = stage[Level4, Level3](1, 0, false)
val stage2 = stage[Level3, Level2](2, 0, false)
val stage3 = stage[Level2, Level1](1, 0, false)
val PipelinePorts(cmd, evt, _) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2 >> stage3)
cmd(Level4.msgA) should be(Nil -> Seq(Level1.msgA, Level1.msgA))
evt(Level1.msgA) should be(Seq(Level4.msgA, Level4.msgA) -> Nil)
cmd(Level4.msgB) should be(Nil -> Seq(Level1.msgB, Level1.msgB))
evt(Level1.msgB) should be(Seq(Level4.msgB, Level4.msgB) -> Nil)
}
"be correctly evaluated with back-scatter" in {
val stage1 = stage[Level4, Level3](1, 0, true)
val stage2 = stage[Level3, Level2](1, 1, true)
val stage3 = stage[Level2, Level1](1, 0, false)
val PipelinePorts(cmd, evt, _) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2 >> stage3)
cmd(Level4.msgA) should be(Seq(Level4.msgB) -> Seq(Level1.msgA))
evt(Level1.msgA) should be(Seq(Level4.msgA) -> Seq(Level1.msgB))
}
"handle management commands" in {
val stage1 = stage[Level4, Level3](1, 0, true, { case "doit" ⇒ Seq(Left(Level4.msgA), Right(Level3.msgA)) })
val stage2 = stage[Level3, Level2](2, 0, true, { case "doit" ⇒ Seq(Left(Level3.msgA), Right(Level2.msgA)) })
val stage3 = stage[Level2, Level1](1, 0, true, { case "doit" ⇒ Seq(Left(Level2.msgA), Right(Level1.msgA)) })
val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2 >> stage3)
mgmt(42: java.lang.Integer) should be(Seq() -> Seq())
val (events, commands) = mgmt("doit")
events should have size 4
events count (_ == Level4.msgA) should equal(3)
events count (_ == Level4.msgB) should equal(1)
commands should have size 4
commands count (_ == Level1.msgA) should equal(3)
commands count (_ == Level1.msgB) should equal(1)
}
}
def stage[Above: LevelFactory, Below: LevelFactory](forward: Int, backward: Int, invert: Boolean,
mgmt: SymmetricPipePair[Above, Below]#Mgmt = PartialFunction.empty) =
new SymmetricPipelineStage[PipelineContext, Above, Below] {
override def apply(ctx: PipelineContext) = {
val above = implicitly[LevelFactory[Above]]
val below = implicitly[LevelFactory[Below]]
PipePairFactory(
{ a ⇒
val msgA = a == above.msgA
val msgAbove = if (invert ^ msgA) above.msgA else above.msgB
val msgBelow = if (invert ^ msgA) below.msgA else below.msgB
(for (_ ← 1 to forward) yield Right(msgBelow)) ++ (for (_ ← 1 to backward) yield Left(msgAbove))
},
{ b ⇒
val msgA = b == below.msgA
val msgAbove = if (invert ^ msgA) above.msgA else above.msgB
val msgBelow = if (invert ^ msgA) below.msgA else below.msgB
(for (_ ← 1 to forward) yield Left(msgAbove)) ++ (for (_ ← 1 to backward) yield Right(msgBelow))
},
},
mgmt)
}
}
}
object PipelineBench extends App {
val frame = new LengthFieldFrame(32000)
val frames = frame >> frame >> frame >> frame
val ctx = new PipelineContext {}
// this way of creating a pipeline is not user API
val pipe = frames(ctx)
val hello = ByteString("hello")
// ctx.dealias is only necessary because this is a raw pipe, not user API
val bytes = ctx.dealias(pipe.commandPipeline(ByteString("hello"))).head.fold(identity, identity).compact
println(bytes)
println(pipe.eventPipeline(bytes))
class Bytes {
var pos = 0
var emitted = 0
def get(): ByteString = {
val r = ThreadLocalRandom.current()
val l = r.nextInt(2 * bytes.length)
@tailrec def rec(left: Int, acc: ByteString): ByteString = {
if (pos + left <= bytes.length) {
val result = acc ++ bytes.slice(pos, pos + left)
pos = (pos + left) % bytes.length
result
} else {
val oldpos = pos
pos = 0
rec(left - bytes.length + oldpos, acc ++ bytes.slice(oldpos, bytes.length))
}
}
emitted += l
rec(l, ByteString.empty)
}
}
println("warming up")
val bpp = new Bytes
{
println(" ... PipePair")
val y = for (_ ← 1 to 500000; x ← ctx.dealias(pipe.eventPipeline(bpp.get()))) yield x
assert(y forall { case Left(b) ⇒ b == ByteString("hello"); case _ ⇒ false })
assert(y.size == bpp.emitted / bytes.length)
}
val PipelinePorts(_, evt, _) = PipelineFactory.buildFunctionTriple(ctx, frames)
val bft = new Bytes
{
println(" ... FunctionTriple")
val y = for (_ ← 1 to 500000; x ← evt(bft.get())._1) yield x
assert(y forall (_ == ByteString("hello")))
assert(y.size == bft.emitted / bytes.length)
}
var injected = 0
val inj = PipelineFactory.buildWithSinkFunctions(ctx, frames)(_ ⇒ Nil, { case Success(bs) if bs == hello ⇒ injected += 1 })
val bij = new Bytes
{
println(" ... Injector")
for (_ ← 1 to 500000) inj.injectEvent(bij.get())
assert(injected == bij.emitted / bytes.length)
}
val N = 1000000
{
val start = System.nanoTime
val y = for (_ ← 1 to N; x ← ctx.dealias(pipe.eventPipeline(bpp.get()))) yield x
val time = System.nanoTime - start
println(s"PipePair: 1 iteration took ${time / N}ns (${y.size})")
}
{
val start = System.nanoTime
val y = for (_ ← 1 to N; x ← evt(bft.get())._1) yield x
val time = System.nanoTime - start
println(s"FunctionTriple: 1 iteration took ${time / N}ns (${y.size})")
}
{
injected = 0
val start = System.nanoTime
for (_ ← 1 to N) inj.injectEvent(bij.get())
val time = System.nanoTime - start
println(s"Injector: 1 iteration took ${time / N}ns ($injected)")
}
}

View file

@@ -13,12 +13,6 @@ import akka.event.Logging
/**
* Entry point to Akka's IO layer.
*
* <b>All contents of the `akka.io` package is marked experimental.</b>
*
* This marker signifies that APIs may still change in response to user feedback
* through-out the 2.2 release cycle. The implementation itself is considered
* stable and ready for production use.
*
* @see <a href="http://doc.akka.io/">the Akka online documentation</a>
*/
object IO {

File diff suppressed because it is too large

View file

@@ -1,267 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
// adapted from
// https://github.com/spray/spray/blob/eef5c4f54a0cadaf9e98298faf5b337f9adc04bb/spray-io/src/main/scala/spray/io/SslTlsSupport.scala
// original copyright notice follows:
/*
* Copyright (C) 2011-2013 spray.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.io
import java.nio.ByteBuffer
import javax.net.ssl.{ SSLContext, SSLException, SSLEngineResult, SSLEngine }
import javax.net.ssl.SSLEngineResult.HandshakeStatus._
import javax.net.ssl.SSLEngineResult.Status._
import scala.collection.immutable
import scala.annotation.tailrec
import akka.util.ByteString
import Tcp.{ Command, Event }
object SslTlsSupport {
// we are using Netty's default values:
// 16665 + 1024 (room for compressed data) + 1024 (for OpenJDK compatibility)
private final val MaxPacketSize = 16665 + 2048
private final val EmptyByteArray = new Array[Byte](0)
}
/**
* This pipeline stage implements SSL / TLS support, using an externally
* configured [[javax.net.ssl.SSLEngine]]. It operates on the level of [[Tcp.Event]] and
* [[Tcp.Command]] messages, which means that it will typically be one of
* the lowest stages in a protocol stack. Since SSLEngine relies on contiguous
* transmission of a data stream you will need to handle backpressure from
* the TCP connection actor, for example by using a [[BackpressureBuffer]]
* underneath the SSL stage.
*
* Each instance of this stage has a scratch [[ByteBuffer]] of approx. 18kiB
* allocated which is used by the SSLEngine.
*
* One thing to keep in mind is that there's no support for half-closed connections
* in SSL (but SSL on the other side requires half-closed connections from its transport
* layer).
*
* This means:
* 1. keepOpenOnPeerClosed is not supported on top of SSL (once you receive PeerClosed
* the connection is closed, further CloseCommands are ignored)
* 2. keepOpenOnPeerClosed should always be enabled on the transport layer beneath SSL so
* that one can wait for the other side's SSL level close_notify message without barfing
* RST to the peer because this socket is already gone
*
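* A typical stack (a sketch drawn from the usage examples removed in this
* commit) places the backpressure buffer below the SSL stage:
* {{{
* val init = TcpPipelineHandler.withLogger(log,
*   new StringByteStringAdapter("utf-8") >>
*   new DelimiterFraming(1024, ByteString("\n"), true) >>
*   new TcpReadWriteAdapter >>
*   new SslTlsSupport(engine) >>
*   new BackpressureBuffer(1000, 10000, 1000000))
* }}}
*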
*/
class SslTlsSupport(engine: SSLEngine) extends PipelineStage[HasLogging, Command, Command, Event, Event] {
override def apply(ctx: HasLogging) =
new PipePair[Command, Command, Event, Event] {
var pendingSends = immutable.Queue.empty[Send]
var inboundReceptacle: ByteBuffer = _ // holds incoming data that are too small to be decrypted yet
val log = ctx.getLogger
// TODO: should this be a ThreadLocal?
val tempBuf = ByteBuffer.allocate(SslTlsSupport.MaxPacketSize)
var originalCloseCommand: Tcp.CloseCommand = _
override val commandPipeline = (cmd: Command) ⇒ cmd match {
case x: Tcp.Write ⇒
if (pendingSends.isEmpty) encrypt(Send(x))
else {
pendingSends = pendingSends enqueue Send(x)
Nil
}
case x @ (Tcp.Close | Tcp.ConfirmedClose) ⇒
originalCloseCommand = x.asInstanceOf[Tcp.CloseCommand]
log.debug("Closing SSLEngine due to reception of [{}]", x)
engine.closeOutbound()
// don't send close command to network here, it's the job of the SSL engine
// to shutdown the connection when getting CLOSED in encrypt
closeEngine()
case x: Tcp.WriteCommand ⇒
throw new IllegalArgumentException(
"SslTlsSupport doesn't support Tcp.WriteCommands of type " + x.getClass.getSimpleName)
case cmd ⇒ ctx.singleCommand(cmd)
}
val eventPipeline = (evt: Event) ⇒ evt match {
case Tcp.Received(data) ⇒
val buf = if (inboundReceptacle != null) {
try ByteBuffer.allocate(inboundReceptacle.remaining + data.length).put(inboundReceptacle)
finally inboundReceptacle = null
} else ByteBuffer allocate data.length
data copyToBuffer buf
buf.flip()
decrypt(buf)
case x: Tcp.ConnectionClosed ⇒
// After we have closed the connection we ignore FIN from the other side.
// That's to avoid a strange condition where we know that no truncation attack
// can happen any more (because we actively closed the connection) but the peer
// isn't behaving properly and didn't send close_notify. Why is this condition strange?
// Because if we had closed the connection directly after we sent close_notify (which
// is allowed by the spec) we wouldn't even have noticed.
if (!engine.isOutboundDone)
try engine.closeInbound()
catch { case e: SSLException ⇒ } // ignore warning about possible truncation attacks
if (x.isAborted || (originalCloseCommand eq null)) ctx.singleEvent(x)
else if (!engine.isInboundDone) ctx.singleEvent(originalCloseCommand.event)
// else the close message was sent by decrypt case CLOSED
else ctx.singleEvent(x)
case ev ⇒ ctx.singleEvent(ev)
}
/**
* Encrypts the given buffers and dispatches the results as Tcp.Write commands.
*/
@tailrec
def encrypt(send: Send, fromQueue: Boolean = false, commands: Vector[Result] = Vector.empty): Vector[Result] = {
import send.{ ack, buffer }
tempBuf.clear()
val ackDefinedAndPreContentLeft = ack != Tcp.NoAck && buffer.remaining > 0
val result = engine.wrap(buffer, tempBuf)
val postContentLeft = buffer.remaining > 0
tempBuf.flip()
val nextCmds =
if (tempBuf.remaining > 0) {
val writeAck = if (ackDefinedAndPreContentLeft && !postContentLeft) ack else Tcp.NoAck
commands :+ Right(Tcp.Write(ByteString(tempBuf), writeAck))
} else commands
result.getStatus match {
case OK ⇒ result.getHandshakeStatus match {
case NOT_HANDSHAKING | FINISHED ⇒
if (postContentLeft) encrypt(send, fromQueue, nextCmds)
else nextCmds
case NEED_WRAP ⇒
encrypt(send, fromQueue, nextCmds)
case NEED_UNWRAP ⇒
pendingSends =
if (fromQueue) send +: pendingSends // output coming from the queue needs to go to the front
else pendingSends enqueue send // "new" output to the back of the queue
nextCmds
case NEED_TASK ⇒
runDelegatedTasks()
encrypt(send, fromQueue, nextCmds)
}
case CLOSED ⇒
if (postContentLeft) {
log.warning("SSLEngine closed prematurely while sending")
nextCmds :+ Right(Tcp.Abort)
} else nextCmds :+ Right(Tcp.ConfirmedClose)
case BUFFER_OVERFLOW ⇒
throw new IllegalStateException("BUFFER_OVERFLOW: the SslBufferPool should make sure that buffers are never too small")
case BUFFER_UNDERFLOW ⇒
throw new IllegalStateException("BUFFER_UNDERFLOW should never appear as a result of a wrap")
}
}
/**
* Decrypts the given buffer and dispatches the results as Tcp.Received events.
*/
@tailrec
def decrypt(buffer: ByteBuffer, output: Vector[Result] = Vector.empty): Vector[Result] = {
tempBuf.clear()
val result = engine.unwrap(buffer, tempBuf)
tempBuf.flip()
val nextOutput =
if (tempBuf.remaining > 0) output :+ Left(Tcp.Received(ByteString(tempBuf)))
else output
result.getStatus match {
case OK ⇒ result.getHandshakeStatus match {
case NOT_HANDSHAKING | FINISHED ⇒
if (buffer.remaining > 0) decrypt(buffer, nextOutput)
else nextOutput ++ processPendingSends(tempBuf)
case NEED_UNWRAP ⇒
decrypt(buffer, nextOutput)
case NEED_WRAP ⇒
val n = nextOutput ++ (
if (pendingSends.isEmpty) encrypt(Send.Empty)
else processPendingSends(tempBuf))
if (buffer.remaining > 0) decrypt(buffer, n)
else n
case NEED_TASK ⇒
runDelegatedTasks()
decrypt(buffer, nextOutput)
}
case CLOSED ⇒
if (!engine.isOutboundDone) {
closeEngine(nextOutput :+ Left(Tcp.PeerClosed))
} else { // now both sides are closed on the SSL level
// close the underlying connection, we don't need it any more
nextOutput :+ Left(originalCloseCommand.event) :+ Right(Tcp.Close)
}
case BUFFER_UNDERFLOW ⇒
inboundReceptacle = buffer // save buffer so we can append the next one to it
nextOutput
case BUFFER_OVERFLOW ⇒
throw new IllegalStateException("BUFFER_OVERFLOW: the SslBufferPool should make sure that buffers are never too small")
}
}
@tailrec
def runDelegatedTasks() {
val task = engine.getDelegatedTask
if (task != null) {
task.run()
runDelegatedTasks()
}
}
@tailrec
def processPendingSends(tempBuf: ByteBuffer, commands: Vector[Result] = Vector.empty): Vector[Result] = {
if (pendingSends.nonEmpty) {
val next = pendingSends.head
pendingSends = pendingSends.tail
val nextCmds = commands ++ encrypt(next, fromQueue = true)
// it may be that the send we just passed to `encrypt` was put back into the queue because
// the SSLEngine demands a `NEED_UNWRAP`, in this case we want to stop looping
if (pendingSends.nonEmpty && pendingSends.head != next)
processPendingSends(tempBuf)
else nextCmds
} else commands
}
@tailrec
def closeEngine(commands: Vector[Result] = Vector.empty): Vector[Result] = {
if (!engine.isOutboundDone) {
closeEngine(commands ++ encrypt(Send.Empty))
} else commands
}
}
private final class Send(val buffer: ByteBuffer, val ack: Event)
private object Send {
val Empty = new Send(ByteBuffer wrap SslTlsSupport.EmptyByteArray, Tcp.NoAck)
def apply(write: Tcp.Write) = {
val buffer = ByteBuffer allocate write.data.length
write.data copyToBuffer buffer
buffer.flip()
new Send(buffer, write.ack)
}
}
}

View file

@@ -19,12 +19,6 @@ import java.lang.{ Iterable ⇒ JIterable }
/**
* TCP Extension for Akka's IO layer.
*
* <b>All contents of the `akka.io` package is marked experimental.</b>
*
* This marker signifies that APIs may still change in response to user feedback
* through-out the 2.2 release cycle. The implementation itself is considered
* stable and ready for production use.
*
* For a full description of the design and philosophy behind this IO
* implementation please refer to <a href="http://doc.akka.io/">the Akka online documentation</a>.
*

View file

@@ -1,188 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.io
import scala.beans.BeanProperty
import scala.util.{ Failure, Success }
import akka.actor._
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.util.ByteString
import akka.event.Logging
import akka.event.LoggingAdapter
object TcpPipelineHandler {
/**
* This class wraps up a pipeline with its external (i.e. top) command and
* event types, providing unique wrappers for sending commands and
* receiving events (nested and non-static classes which are specific to each
* instance of [[Init]]). All events emitted by the pipeline will be sent to
* the registered handler wrapped in an Event.
*/
abstract class Init[Ctx <: PipelineContext, Cmd, Evt](
val stages: PipelineStage[_ >: Ctx <: PipelineContext, Cmd, Tcp.Command, Evt, Tcp.Event]) {
/**
* This method must be implemented to return the [[PipelineContext]]
* necessary for the operation of the given [[PipelineStage]].
*/
def makeContext(actorContext: ActorContext): Ctx
/**
* Java API: construct a command to be sent to the [[TcpPipelineHandler]]
* actor.
*/
def command(cmd: Cmd): Command = Command(cmd)
/**
* Java API: extract a wrapped event received from the [[TcpPipelineHandler]]
* actor.
*
* @throws MatchError if the given object is not an Event matching this
* specific Init instance.
*/
def event(evt: AnyRef): Evt = evt match {
case Event(evt) ⇒ evt
}
/**
* Wrapper class for commands to be sent to the [[TcpPipelineHandler]] actor.
*/
case class Command(@BeanProperty cmd: Cmd) extends NoSerializationVerificationNeeded
/**
* Wrapper class for events emitted by the [[TcpPipelineHandler]] actor.
*/
case class Event(@BeanProperty evt: Evt) extends NoSerializationVerificationNeeded
}
/**
* This interface bundles logging and ActorContext for Java.
*/
trait WithinActorContext extends HasLogging with HasActorContext
def withLogger[Cmd, Evt](log: LoggingAdapter,
stages: PipelineStage[_ >: WithinActorContext <: PipelineContext, Cmd, Tcp.Command, Evt, Tcp.Event]): Init[WithinActorContext, Cmd, Evt] =
new Init[WithinActorContext, Cmd, Evt](stages) {
override def makeContext(ctx: ActorContext): WithinActorContext = new WithinActorContext {
override def getLogger = log
override def getContext = ctx
}
}
/**
* Wrapper class for management commands sent to the [[TcpPipelineHandler]] actor.
*/
case class Management(@BeanProperty cmd: AnyRef)
/**
* This is a new Tcp.Command which the pipeline can emit to effect the
* sending of a message to another actor. Using this instead of doing the send
* directly has the advantage that other pipeline stages can also see and
* possibly transform the send.
*/
case class Tell(receiver: ActorRef, msg: Any, sender: ActorRef) extends Tcp.Command
/**
* The pipeline may want to emit a [[Tcp.Event]] to the registered handler
* actor, which is enabled by emitting this [[Tcp.Command]] wrapping an event
* instead. The [[TcpPipelineHandler]] actor will upon reception of this command
* forward the wrapped event to the handler.
*/
case class TcpEvent(@BeanProperty evt: Tcp.Event) extends Tcp.Command
/**
* create [[akka.actor.Props]] for a pipeline handler
*/
def props[Ctx <: PipelineContext, Cmd, Evt](init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) =
Props(classOf[TcpPipelineHandler[_, _, _]], init, connection, handler)
}
/**
* This actor wraps a pipeline and forwards commands and events between that
* one and a [[Tcp]] connection actor. In order to inject commands into the
* pipeline send an [[TcpPipelineHandler.Init.Command]] message to this actor; events will be sent
* to the designated handler wrapped in [[TcpPipelineHandler.Init.Event]] messages.
*
* When the designated handler terminates the TCP connection is aborted. When
* the connection actor terminates this actor terminates as well; the designated
* handler may want to watch this actor's lifecycle.
*
* <b>IMPORTANT:</b>
*
* Proper function of this actor (and of other pipeline stages like [[TcpReadWriteAdapter]])
* depends on the fact that stages handling TCP commands and events pass unknown
* subtypes through unaltered. There are more commands and events than are declared
* within the [[Tcp]] object and you can even define your own.
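*
* A minimal usage sketch (mirroring the specs removed in this commit;
* `connection` stands for the Tcp connection actor that sent [[Tcp.Connected]]):
* {{{
* val init = TcpPipelineHandler.withLogger(log,
*   new TcpReadWriteAdapter >>
*   new BackpressureBuffer(10000, 1000000, Long.MaxValue))
* val handler = context.actorOf(TcpPipelineHandler.props(init, connection, self))
* connection ! Tcp.Register(handler)
* handler ! init.Command(ByteString("hello")) // commands go in wrapped ...
* // ... and events arrive at the designated handler as init.Event(bytes)
* }}}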
*/
class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt](
init: TcpPipelineHandler.Init[Ctx, Cmd, Evt],
connection: ActorRef,
handler: ActorRef)
extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import init._
import TcpPipelineHandler._
// sign death pact
context watch connection
// watch so we can Close
context watch handler
val ctx = init.makeContext(context)
val pipes = PipelineFactory.buildWithSinkFunctions(ctx, init.stages)({
case Success(cmd) ⇒
cmd match {
case Tell(receiver, msg, sender) ⇒ receiver.tell(msg, sender)
case TcpEvent(ev) ⇒ handler ! ev
case _ ⇒ connection ! cmd
}
case Failure(ex) ⇒ throw ex
}, {
case Success(evt) ⇒ handler ! Event(evt)
case Failure(ex) ⇒ throw ex
})
def receive = {
case Command(cmd) ⇒ pipes.injectCommand(cmd)
case evt: Tcp.Event ⇒ pipes.injectEvent(evt)
case Management(cmd) ⇒ pipes.managementCommand(cmd)
case Terminated(`handler`) ⇒ connection ! Tcp.Abort
case Terminated(`connection`) ⇒ context.stop(self)
}
}
/**
* Adapts a ByteString oriented pipeline stage to a stage that communicates via Tcp Commands and Events. Every ByteString
* passed down to this stage will be converted to Tcp.Write commands, while incoming Tcp.Received events will be unwrapped
* and their contents passed up as raw ByteStrings. This adapter should be used together with TcpPipelineHandler.
*
* While this adapter communicates to the stage above it via raw ByteStrings, it is possible to inject Tcp Commands
* by sending them to the management port, and the adapter will simply pass them down to the stage below. Incoming Tcp Events
* that are not Received events will be passed downwards wrapped in a [[TcpPipelineHandler.TcpEvent]]; the [[TcpPipelineHandler]] will
* send these notifications to the registered event handler actor.
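*
* A typical composition (a sketch, mirroring the DelimiterFramingSpec removed
* in this commit) places this adapter at the bottom of a string protocol:
* {{{
* val init = TcpPipelineHandler.withLogger(log,
*   new StringByteStringAdapter >>
*   new DelimiterFraming(1024, ByteString("\n"), includeDelimiter = true) >>
*   new TcpReadWriteAdapter)
* }}}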
*/
class TcpReadWriteAdapter extends PipelineStage[PipelineContext, ByteString, Tcp.Command, ByteString, Tcp.Event] {
import TcpPipelineHandler.TcpEvent
override def apply(ctx: PipelineContext) = new PipePair[ByteString, Tcp.Command, ByteString, Tcp.Event] {
override val commandPipeline = {
data: ByteString ⇒ ctx.singleCommand(Tcp.Write(data))
}
override val eventPipeline = (evt: Tcp.Event) ⇒ evt match {
case Tcp.Received(data) ⇒ ctx.singleEvent(data)
case ev: Tcp.Event ⇒ ctx.singleCommand(TcpEvent(ev))
}
override val managementPort: Mgmt = {
case cmd: Tcp.Command ⇒ ctx.singleCommand(cmd)
}
}
}

View file

@@ -15,12 +15,6 @@ import akka.actor._
/**
* UDP Extension for Akka's IO layer.
*
* <b>All contents of the `akka.io` package is marked experimental.</b>
*
* This marker signifies that APIs may still change in response to user feedback
* through-out the 2.2 release cycle. The implementation itself is considered
* stable and ready for production use.
*
* This extension implements the connectionless UDP protocol without
* calling `connect` on the underlying sockets, i.e. without restricting
* from whom data can be received. For connected UDP mode see [[UdpConnected]].

View file

@@ -14,12 +14,6 @@ import akka.actor._
/**
* UDP Extension for Akka's IO layer.
*
* <b>All contents of the `akka.io` package is marked experimental.</b>
*
* This marker signifies that APIs may still change in response to user feedback
* through-out the 2.2 release cycle. The implementation itself is considered
* stable and ready for production use.
*
* This extension implements the connectionless UDP protocol with
* calling `connect` on the underlying sockets, i.e. with restricting
* from whom data can be received. For unconnected UDP mode see [[Udp]].

View file

@@ -1,16 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import akka.actor.ActorContext;
import akka.io.PipelineContext;
//#actor-context
public interface HasActorContext extends PipelineContext {
public ActorContext getContext();
}
//#actor-context

View file

@@ -1,15 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import akka.io.PipelineContext;
public interface HasByteOrder extends PipelineContext {
public ByteOrder byteOrder();
}

View file

@@ -1,84 +0,0 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
//#frame
import java.nio.ByteOrder;
import java.util.ArrayList;
import scala.util.Either;
import akka.io.AbstractSymmetricPipePair;
import akka.io.PipePairFactory;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;
import akka.io.SymmetricPipelineStage;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;
public class LengthFieldFrame extends
SymmetricPipelineStage<PipelineContext, ByteString, ByteString> {
final int maxSize;
public LengthFieldFrame(int maxSize) {
this.maxSize = maxSize;
}
@Override
public SymmetricPipePair<ByteString, ByteString> apply(final PipelineContext ctx) {
return PipePairFactory
.create(ctx, new AbstractSymmetricPipePair<ByteString, ByteString>() {
final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
ByteString buffer = null;
@Override
public Iterable<Either<ByteString, ByteString>> onCommand(
ByteString cmd) {
final int length = cmd.length() + 4;
if (length > maxSize) {
return new ArrayList<Either<ByteString, ByteString>>(0);
}
final ByteStringBuilder bb = new ByteStringBuilder();
bb.putInt(length, byteOrder);
bb.append(cmd);
return singleCommand(bb.result());
}
@Override
public Iterable<Either<ByteString, ByteString>> onEvent(
ByteString event) {
final ArrayList<Either<ByteString, ByteString>> res =
new ArrayList<Either<ByteString, ByteString>>();
ByteString current = buffer == null ? event : buffer.concat(event);
while (true) {
if (current.length() == 0) {
buffer = null;
return res;
} else if (current.length() < 4) {
buffer = current;
return res;
} else {
final int length = current.iterator().getInt(byteOrder);
if (length > maxSize)
throw new IllegalArgumentException(
"received too large frame of size " + length + " (max = "
+ maxSize + ")");
if (current.length() < length) {
buffer = current;
return res;
} else {
res.add(makeEvent(current.slice(4, length)));
current = current.drop(length);
}
}
}
}
});
}
}
//#frame

View file

@@ -1,118 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import java.util.Collections;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Either;
import akka.actor.ActorRef;
import akka.io.AbstractSymmetricPipePair;
import akka.io.PipePairFactory;
import akka.io.SymmetricPipePair;
import akka.io.SymmetricPipelineStage;
import akka.util.ByteIterator;
import akka.util.ByteString;
import akka.util.ByteStringBuilder;
//#format
public class MessageStage extends
SymmetricPipelineStage<HasByteOrder, Message, ByteString> {
@Override
public SymmetricPipePair<Message, ByteString> apply(final HasByteOrder context) {
return PipePairFactory
.create(context, new AbstractSymmetricPipePair<Message, ByteString>() {
final ByteOrder byteOrder = context.byteOrder();
private void putString(ByteStringBuilder builder, String str) {
final byte[] bytes = ByteString.fromString(str, "UTF-8").toArray();
builder.putInt(bytes.length, byteOrder);
builder.putBytes(bytes);
}
@Override
public Iterable<Either<Message, ByteString>> onCommand(Message cmd) {
final ByteStringBuilder builder = new ByteStringBuilder();
builder.putInt(cmd.getPersons().length, byteOrder);
for (Message.Person p : cmd.getPersons()) {
putString(builder, p.getFirst());
putString(builder, p.getLast());
}
builder.putInt(cmd.getHappinessCurve().length, byteOrder);
builder.putDoubles(cmd.getHappinessCurve(), byteOrder);
return singleCommand(builder.result());
}
//#decoding-omitted
//#decoding
private String getString(ByteIterator iter) {
final int length = iter.getInt(byteOrder);
final byte[] bytes = new byte[length];
iter.getBytes(bytes);
return ByteString.fromArray(bytes).utf8String();
}
@Override
public Iterable<Either<Message, ByteString>> onEvent(ByteString evt) {
final ByteIterator iter = evt.iterator();
final int personLength = iter.getInt(byteOrder);
final Message.Person[] persons = new Message.Person[personLength];
for (int i = 0; i < personLength; ++i) {
persons[i] = new Message.Person(getString(iter), getString(iter));
}
final int curveLength = iter.getInt(byteOrder);
final double[] curve = new double[curveLength];
iter.getDoubles(curve, byteOrder);
// verify that this was all; could be left out to allow future
// extensions
assert iter.isEmpty();
return singleEvent(new Message(persons, curve));
}
//#decoding
ActorRef target = null;
//#mgmt-ticks
private FiniteDuration lastTick = Duration.Zero();
@Override
public Iterable<Either<Message, ByteString>> onManagementCommand(Object cmd) {
//#omitted
if (cmd instanceof PipelineTest.SetTarget) {
target = ((PipelineTest.SetTarget) cmd).getRef();
} else if (cmd instanceof TickGenerator.Tick && target != null) {
target.tell(cmd, ActorRef.noSender());
}
//#omitted
if (cmd instanceof TickGenerator.Tick) {
final FiniteDuration timestamp = ((TickGenerator.Tick) cmd)
.getTimestamp();
System.out.println("time since last tick: "
+ timestamp.minus(lastTick));
lastTick = timestamp;
}
return Collections.emptyList();
}
//#mgmt-ticks
//#decoding-omitted
});
}
}
//#format

View file

@@ -1,157 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import org.junit.Test;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.io.AbstractPipelineContext;
import akka.io.PipelineFactory;
import akka.io.PipelineInjector;
import akka.io.PipelineSink;
import akka.io.PipelineStage;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import akka.util.ByteString;
public class PipelineTest {
//#message
final Message msg = new Message(
new Message.Person[] {
new Message.Person("Alice", "Gibbons"),
new Message.Person("Bob", "Sparseley")
},
new double[] { 1.0, 3.0, 5.0 });
//#message
//#byteorder
class Context extends AbstractPipelineContext implements HasByteOrder {
@Override
public ByteOrder byteOrder() {
return java.nio.ByteOrder.BIG_ENDIAN;
}
}
final Context ctx = new Context();
//#byteorder
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("PipelineTest");
private final ActorSystem system = actorSystemResource.getSystem();
@Test
public void demonstratePipeline() throws Exception {
final TestProbe probe = TestProbe.apply(system);
final ActorRef commandHandler = probe.ref();
final ActorRef eventHandler = probe.ref();
//#build-sink
final PipelineStage<Context, Message, ByteString, Message, ByteString> stages =
PipelineStage.sequence(
new MessageStage(),
new LengthFieldFrame(10000)
);
final PipelineSink<ByteString, Message> sink =
new PipelineSink<ByteString, Message>() {
@Override
public void onCommand(ByteString cmd) throws Throwable {
commandHandler.tell(cmd, ActorRef.noSender());
}
@Override
public void onEvent(Message evt) throws Throwable {
eventHandler.tell(evt, ActorRef.noSender());
}
};
final PipelineInjector<Message, ByteString> injector =
PipelineFactory.buildWithSink(ctx, stages, sink);
injector.injectCommand(msg);
//#build-sink
final ByteString encoded = probe.expectMsgClass(ByteString.class);
injector.injectEvent(encoded);
final Message decoded = probe.expectMsgClass(Message.class);
assert msg == decoded;
}
static class SetTarget {
final ActorRef ref;
public SetTarget(ActorRef ref) {
super();
this.ref = ref;
}
public ActorRef getRef() {
return ref;
}
}
@Test
public void testTick() {
new JavaTestKit(system) {
{
class P extends Processor {
public P(ActorRef cmds, ActorRef evts) throws Exception {
super(cmds, evts);
}
@Override
public void onReceive(Object obj) throws Exception {
if (obj.equals("fail!")) {
throw new RuntimeException("FAIL!");
}
super.onReceive(obj);
}
}
final ActorRef proc = system.actorOf(Props.create(
P.class, this, getRef(), getRef()), "processor");
expectMsgClass(TickGenerator.Tick.class);
proc.tell(msg, ActorRef.noSender());
final ByteString encoded = expectMsgClass(ByteString.class);
proc.tell(encoded, ActorRef.noSender());
final Message decoded = expectMsgClass(Message.class);
assert msg == decoded;
new Within(Duration.create(1500, TimeUnit.MILLISECONDS),
Duration.create(3, TimeUnit.SECONDS)) {
protected void run() {
expectMsgClass(TickGenerator.Tick.class);
expectMsgClass(TickGenerator.Tick.class);
}
};
proc.tell("fail!", ActorRef.noSender());
new Within(Duration.create(1700, TimeUnit.MILLISECONDS),
Duration.create(3, TimeUnit.SECONDS)) {
protected void run() {
expectMsgClass(TickGenerator.Tick.class);
expectMsgClass(TickGenerator.Tick.class);
proc.tell(PoisonPill.getInstance(), ActorRef.noSender());
expectNoMsg();
}
};
}
};
}
}

View file

@ -1,94 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.io.AbstractPipelineContext;
import akka.io.PipelineFactory;
import akka.io.PipelineInjector;
import akka.io.PipelineSink;
import akka.io.PipelineStage;
import akka.util.ByteString;
import scala.concurrent.duration.*;
//#actor
public class Processor extends UntypedActor {
private class Context extends AbstractPipelineContext
implements HasByteOrder, HasActorContext {
@Override
public ActorContext getContext() {
return Processor.this.getContext();
}
@Override
public ByteOrder byteOrder() {
return java.nio.ByteOrder.BIG_ENDIAN;
}
}
final Context ctx = new Context();
final FiniteDuration interval = Duration.apply(1, TimeUnit.SECONDS);
final PipelineStage<Context, Message, ByteString, Message, ByteString> stages =
PipelineStage.sequence(
// Java 7 can infer these types, Java 6 cannot
PipelineStage.<Context, Message, Message, ByteString, Message, Message,
ByteString> sequence( //
new TickGenerator<Message, Message>(interval), //
new MessageStage()), //
new LengthFieldFrame(10000));
private final ActorRef evts;
private final ActorRef cmds;
final PipelineInjector<Message, ByteString> injector = PipelineFactory
.buildWithSink(ctx, stages, new PipelineSink<ByteString, Message>() {
@Override
public void onCommand(ByteString cmd) {
cmds.tell(cmd, getSelf());
}
@Override
public void onEvent(Message evt) {
evts.tell(evt, getSelf());
}
});
public Processor(ActorRef cmds, ActorRef evts) throws Exception {
this.cmds = cmds;
this.evts = evts;
}
//#omitted
@Override
public void preStart() throws Exception {
injector.managementCommand(new PipelineTest.SetTarget(cmds));
}
//#omitted
@Override
public void onReceive(Object obj) throws Exception {
if (obj instanceof Message) {
injector.injectCommand((Message) obj);
} else if (obj instanceof ByteString) {
injector.injectEvent((ByteString) obj);
} else if (obj instanceof TickGenerator.Trigger) {
injector.managementCommand(obj);
}
}
}
//#actor

View file

@@ -1,202 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.net.InetSocketAddress;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.ClassRule;
import org.junit.Test;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.io.AbstractPipelineContext;
import akka.io.BackpressureBuffer;
import akka.io.DelimiterFraming;
import akka.io.HasLogging;
import akka.io.PipelineStage;
import static akka.io.PipelineStage.sequence;
import akka.io.SslTlsSupport;
import akka.io.StringByteStringAdapter;
import akka.io.Tcp;
import akka.io.Tcp.Bound;
import akka.io.Tcp.Command;
import akka.io.Tcp.CommandFailed;
import akka.io.Tcp.Connected;
import akka.io.Tcp.Event;
import akka.io.Tcp.Received;
import akka.io.TcpMessage;
import akka.io.TcpPipelineHandler;
import akka.io.TcpPipelineHandler.Init;
import akka.io.TcpPipelineHandler.WithinActorContext;
import akka.io.TcpReadWriteAdapter;
import akka.io.ssl.SslTlsSupportSpec;
import akka.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
import akka.util.ByteString;
public class SslDocTest {
static
//#client
public class SslClient extends UntypedActor {
final InetSocketAddress remote;
final SSLContext sslContext;
final ActorRef listener;
final LoggingAdapter log = Logging
.getLogger(getContext().system(), getSelf());
public SslClient(InetSocketAddress remote, SSLContext sslContext,
ActorRef listener) {
this.remote = remote;
this.sslContext = sslContext;
this.listener = listener;
// open a connection to the remote TCP port
Tcp.get(getContext().system()).getManager()
.tell(TcpMessage.connect(remote), getSelf());
}
// this will hold the pipeline handler's context
Init<WithinActorContext, String, String> init = null;
@Override
public void onReceive(Object msg) {
if (msg instanceof CommandFailed) {
getContext().stop(getSelf());
} else if (msg instanceof Connected) {
// create a javax.net.ssl.SSLEngine for our peer in client mode
final SSLEngine engine = sslContext.createSSLEngine(
remote.getHostName(), remote.getPort());
engine.setUseClientMode(true);
// build pipeline and set up context for communicating with TcpPipelineHandler
init = TcpPipelineHandler.withLogger(log, sequence(sequence(sequence(sequence(
new StringByteStringAdapter("utf-8"),
new DelimiterFraming(1024, ByteString.fromString("\n"), true)),
new TcpReadWriteAdapter()),
new SslTlsSupport(engine)),
new BackpressureBuffer(1000, 10000, 1000000)));
// create handler for pipeline, setting ourselves as payload recipient
final ActorRef handler = getContext().actorOf(
TcpPipelineHandler.props(init, getSender(), getSelf()));
// register the SSL handler with the connection
getSender().tell(TcpMessage.register(handler), getSelf());
// and send a message across the SSL channel
handler.tell(init.command("hello\n"), getSelf());
} else if (msg instanceof Init.Event) {
// unwrap TcpPipelineHandler's event to get the decoded String payload
final String recv = init.event(msg);
// and inform someone of the received payload
listener.tell(recv, getSelf());
}
}
}
//#client
static
//#server
public class SslServer extends UntypedActor {
final SSLContext sslContext;
final ActorRef listener;
final LoggingAdapter log = Logging
.getLogger(getContext().system(), getSelf());
public SslServer(SSLContext sslContext, ActorRef listener) {
this.sslContext = sslContext;
this.listener = listener;
// bind to a socket, registering ourselves as incoming connection handler
Tcp.get(getContext().system()).getManager().tell(
TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100),
getSelf());
}
// this will hold the pipeline handler's context
Init<WithinActorContext, String, String> init = null;
@Override
public void onReceive(Object msg) {
if (msg instanceof CommandFailed) {
getContext().stop(getSelf());
} else if (msg instanceof Bound) {
listener.tell(msg, getSelf());
} else if (msg instanceof Connected) {
// create a javax.net.ssl.SSLEngine for our peer in server mode
final InetSocketAddress remote = ((Connected) msg).remoteAddress();
final SSLEngine engine = sslContext.createSSLEngine(
remote.getHostName(), remote.getPort());
engine.setUseClientMode(false);
// build pipeline and set up context for communicating with TcpPipelineHandler
init = TcpPipelineHandler.withLogger(log, sequence(sequence(sequence(sequence(
new StringByteStringAdapter("utf-8"),
new DelimiterFraming(1024, ByteString.fromString("\n"), true)),
new TcpReadWriteAdapter()),
new SslTlsSupport(engine)),
new BackpressureBuffer(1000, 10000, 1000000)));
// create handler for pipeline, setting ourselves as payload recipient
final ActorRef handler = getContext().actorOf(
TcpPipelineHandler.props(init, getSender(), getSelf()));
// register the SSL handler with the connection
getSender().tell(TcpMessage.register(handler), getSelf());
} else if (msg instanceof Init.Event) {
// unwrap TcpPipelineHandler's event to get the decoded String payload
final String recv = init.event(msg);
// inform someone of the received message
listener.tell(recv, getSelf());
// and reply (sender is the SSL handler created above)
getSender().tell(init.command("world\n"), getSelf());
}
}
}
//#server
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("SslDocTest", AkkaSpec.testConf());
private final ActorSystem system = actorSystemResource.getSystem();
@Test
public void demonstrateSslClient() {
new JavaTestKit(system) {
{
final SSLContext ctx = SslTlsSupportSpec.createSslContext("/keystore", "/truststore", "changeme");
final ActorRef server = system.actorOf(Props.create(SslServer.class, ctx, getRef()));
final Bound bound = expectMsgClass(Bound.class);
assert getLastSender() == server;
final ActorRef client = system.actorOf(Props.create(SslClient.class, bound.localAddress(), ctx, getRef()));
expectMsgEquals("hello\n");
assert getLastSender() == server;
expectMsgEquals("world\n");
assert getLastSender() == client;
}
};
}
}

View file

@@ -1,88 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io.japi;
import java.util.Collections;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Either;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.io.AbstractPipePair;
import akka.io.PipePair;
import akka.io.PipePairFactory;
import akka.io.PipelineStage;
//#tick-generator
public class TickGenerator<Cmd, Evt> extends
PipelineStage<HasActorContext, Cmd, Cmd, Evt, Evt> {
public static interface Trigger {};
public static class Tick implements Trigger {
final FiniteDuration timestamp;
public Tick(FiniteDuration timestamp) {
super();
this.timestamp = timestamp;
}
public FiniteDuration getTimestamp() {
return timestamp;
}
}
private final FiniteDuration interval;
public TickGenerator(FiniteDuration interval) {
this.interval = interval;
}
@Override
public PipePair<Cmd, Cmd, Evt, Evt> apply(final HasActorContext ctx) {
return PipePairFactory.create(ctx,
new AbstractPipePair<Cmd, Cmd, Evt, Evt>() {
private final Trigger trigger = new Trigger() {
public String toString() {
return "Tick[" + ctx.getContext().self().path() + "]";
}
};
private void schedule() {
final ActorSystem system = ctx.getContext().system();
system.scheduler().scheduleOnce(interval,
ctx.getContext().self(), trigger, system.dispatcher(), null);
}
{
schedule();
}
@Override
public Iterable<Either<Evt, Cmd>> onCommand(Cmd cmd) {
return singleCommand(cmd);
}
@Override
public Iterable<Either<Evt, Cmd>> onEvent(Evt evt) {
return singleEvent(evt);
}
@Override
public Iterable<Either<Evt, Cmd>> onManagementCommand(Object cmd) {
if (cmd == trigger) {
ctx.getContext().self().tell(new Tick(Deadline.now().time()),
ActorRef.noSender());
schedule();
}
return Collections.emptyList();
}
});
}
}
//#tick-generator

View file

@ -9,7 +9,6 @@ Networking
remoting
serialization
io
io-codec
io-tcp
io-udp
zeromq

View file

@ -1,254 +0,0 @@
.. _io-java-codec:
Encoding and decoding binary data
=================================
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users'
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
Akka adopted and adapted the implementation of data processing pipelines found
in the ``spray-io`` module. The idea is that encoding and decoding often
go hand in hand and keeping the code pertaining to one protocol layer together
is deemed more important than writing down the complete read side—say—in the
iteratee style in one go; pipelines encourage packaging the stages in a form
which lends itself better to reuse in a protocol stack. Another reason for
choosing this abstraction is that it is at times necessary to change the
behavior of encoding and decoding within a stage based on a message stream's
state, and pipeline stages allow communication between the read and write
halves quite naturally.
The actual byte-fiddling can be done within pipeline stages, for example using
the rich API of :class:`ByteIterator` and :class:`ByteStringBuilder` as shown
below. All these activities are synchronous transformations which benefit
greatly from CPU affinity to make good use of those data caches. Therefore the
design of the pipeline infrastructure is completely synchronous: every stage's
handler code can only directly return the events and/or commands resulting from
an input; there are no callbacks. Exceptions thrown within a pipeline stage
will abort processing of the whole pipeline under the assumption that
recoverable error conditions will be signaled in-band to the next stage instead
of raising an exception.
An overall “logical” pipeline can span multiple execution contexts, for example
starting with the low-level protocol layers directly within an actor handling
the reads and writes to a TCP connection and then being passed to a number of
higher-level actors which do the costly application level processing. This is
supported by feeding the generated events into a sink which sends them to
another actor, and that other actor will then upon reception feed them into its
own pipeline.
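For illustration, here is a minimal Scala sketch of such a sink, using the
:meth:`buildWithSinkFunctions` factory from the Scala API (the Java API uses a
:class:`PipelineSink` callback object instead); ``wireActor`` and ``appActor``
are hypothetical ``ActorRef`` instances:

.. code-block:: scala

   // sketch: handing pipeline output to other actors via sink functions;
   // the sinks receive Try-wrapped results, hence the .get
   val pipeline = PipelineFactory.buildWithSinkFunctions(ctx, stages)(
     cmd ⇒ wireActor ! cmd.get, // encoded ByteStrings go towards the wire
     evt ⇒ appActor ! evt.get)  // decoded messages go to the application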
Introducing the Sample Protocol
-------------------------------
In the following the process of implementing a protocol stack using pipelines
is demonstrated on the following simple example:
.. code-block:: text
frameLen: Int
persons: Int
persons times {
first: String
last: String
}
points: Int
points times Double
mapping to the following data type:
.. includecode:: code/docs/io/japi/Message.java#message
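A Scala rendering of this data type, matching the case classes used in the
Scala documentation's sample code, looks like this:

.. code-block:: scala

   case class Person(first: String, last: String)
   case class HappinessCurve(points: IndexedSeq[Double])
   case class Message(persons: Seq[Person], stats: HappinessCurve)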
We will split the handling of this protocol into two parts: the frame-length
encoding handles the buffering necessary on the read side and the actual
encoding of the frame contents is done in a separate stage.
Building a Pipeline Stage
-------------------------
As a common example, which is also included in the ``akka-actor`` package, let
us look at a framing protocol which works by prepending a length field to each
message (the following is a simplified version for demonstration purposes; the
real implementation is more configurable and implemented in Scala).
.. includecode:: code/docs/io/japi/LengthFieldFrame.java
:include: frame
In the end a pipeline stage is nothing more than a set of three methods: one
transforming commands arriving from above, one transforming events arriving
from below and the third transforming incoming management commands (not shown
here, see below for more information). The result of the transformation can in
either case be a sequence of commands flowing downwards or events flowing
upwards (or a combination thereof).
In the case above the data type for commands and events are equal as both
functions operate only on ``ByteString``, and the transformation does not
change that type because it only adds or removes four octets at the front.
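To make that shape concrete, here is a deliberately naive Scala sketch of such
a length-field stage (unlike the real :class:`LengthFieldFrame` it neither
buffers partial frames nor checks any bounds):

.. code-block:: scala

   import java.nio.ByteOrder
   import akka.io._
   import akka.util.ByteString

   class NaiveLengthFieldFrame
       extends SymmetricPipelineStage[PipelineContext, ByteString, ByteString] {

     override def apply(ctx: PipelineContext) =
       new SymmetricPipePair[ByteString, ByteString] {
         implicit val byteOrder = ByteOrder.BIG_ENDIAN

         // command side: prepend four octets holding the payload length
         override val commandPipeline = { bs: ByteString ⇒
           val builder = ByteString.newBuilder
           builder putInt bs.length
           builder ++= bs
           ctx.singleCommand(builder.result)
         }

         // event side: strip the four octets again (assuming one complete
         // frame per chunk, which real framing code must not assume)
         override val eventPipeline = { bs: ByteString ⇒
           val iter = bs.iterator
           val length = iter.getInt
           val frame = new Array[Byte](length)
           iter getBytes frame
           ctx.singleEvent(ByteString(frame))
         }
       }
   }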
The pair of command and event transformation functions is represented by an
object of type :class:`AbstractPipePair`, or in this case a
:class:`AbstractSymmetricPipePair`. This object could benefit from knowledge
about the context it is running in, for example an :class:`Actor`, and this
context is introduced by making a :class:`PipelineStage` be a factory for
producing a :class:`PipePair`. The factory method is called :meth:`apply` (a
Scala tradition) and receives the context object as its argument. The
implementation of this factory method could now make use of the context in
whatever way it sees fit; you will see an example further down.
Manipulating ByteStrings
------------------------
The second stage of our sample protocol stack illustrates in more depth what
was only hinted at in the pipeline stage built above: constructing and
deconstructing byte strings. Let us first take a look at the encoder:
.. includecode:: code/docs/io/japi/MessageStage.java
:include: format
:exclude: decoding-omitted,omitted
Note how the byte order to be used by this stage is fixed in exactly one place,
making it impossible to get it wrong between commands and events; the way the
byte order is passed into the stage demonstrates one possible use for the
stage's ``context`` parameter.
The basic tool for constructing a :class:`ByteString` is a
:class:`ByteStringBuilder`. This builder is specialized for concatenating byte
representations of the primitive data types like ``Int`` and ``Double`` or
arrays thereof. Encoding a ``String`` requires a bit more work because not
only the sequence of bytes needs to be encoded but also the length, otherwise
the decoding stage would not know where the ``String`` terminates. When all
values making up the :class:`Message` have been appended to the builder, we
simply pass the resulting :class:`ByteString` on to the next stage as a command
using the optimized :meth:`singleCommand` facility.
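As a quick illustration of the builder and iterator APIs, here is a Scala
sketch of a round trip; the only requirement is that encoding and decoding
agree on the order of the fields and on the byte order:

.. code-block:: scala

   import akka.util.ByteString
   implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN

   // encode a length-prefixed UTF-8 string followed by a Double ...
   val builder = ByteString.newBuilder
   val name = ByteString("Alice", "UTF-8")
   builder putInt name.length
   builder ++= name
   builder putDouble 1.0
   val bytes = builder.result

   // ... and decode both values again in the same order
   val iter = bytes.iterator
   val strBytes = new Array[Byte](iter.getInt)
   iter getBytes strBytes
   val first = ByteString(strBytes).utf8String // "Alice"
   val happiness = iter.getDouble              // 1.0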
.. warning::
The :meth:`singleCommand` and :meth:`singleEvent` methods provide a way to
generate responses which transfer exactly one result from one pipeline stage
to the next without suffering the overhead of object allocations. This means
that the returned collection object will not work for anything else (you will
get :class:`ClassCastExceptions`!) and this facility can only be used *EXACTLY
ONCE* during the processing of one input (command or event).
Now let us look at the decoder side:
.. includecode:: code/docs/io/japi/MessageStage.java
:include: decoding
The decoding side does the same things that the encoder does in the same order;
it just uses a :class:`ByteIterator` to retrieve primitive data types or arrays
of those from the underlying :class:`ByteString`. And in the end it hands the
assembled :class:`Message` as an event to the next stage using the optimized
:meth:`singleEvent` facility (see warning above).
Building a Pipeline
-------------------
Given the two pipeline stages introduced in the sections above we can now put
them to some use. First we define some message to be encoded:
.. includecode:: code/docs/io/japi/PipelineTest.java
:include: message
Then we need to create a pipeline context which satisfies our declared needs:
.. includecode:: code/docs/io/japi/PipelineTest.java
:include: byteorder
Building the pipeline and encoding this message then is quite simple:
.. includecode:: code/docs/io/japi/PipelineTest.java
:include: build-sink
First we *sequence* the two stages, i.e. attach them such that the output of
one becomes the input of the other. Then we create a :class:`PipelineSink`
which is essentially a callback interface for what shall happen with the
encoded commands or decoded events, respectively. Then we build the pipeline
using the :class:`PipelineFactory`, which returns an interface for feeding
commands and events into this pipeline instance. As a demonstration of how to
use this, we simply encode the message shown above and the resulting
:class:`ByteString` will then be sent to the ``commandHandler`` actor. Decoding
works in the same way, only using :meth:`injectEvent`.
Injecting into a pipeline using a :class:`PipelineInjector` will catch
exceptions resulting from processing the input, in which case the exception
(there can only be one per injection) is passed into the respective sink. The
default implementation of :meth:`onCommandFailure` and :meth:`onEventFailure`
will re-throw the exception (whence originates the ``throws`` declaration of
the ``inject*`` method).
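A caller that prefers to handle such failures at the injection site can wrap
the call itself; a Scala sketch, assuming an ``injector`` obtained from
:meth:`buildWithSinkFunctions` and some ``log`` adapter:

.. code-block:: scala

   import scala.util.control.NonFatal

   // with the default sink implementations a failing stage surfaces
   // as an exception thrown by injectCommand/injectEvent
   try injector.injectCommand(msg)
   catch { case NonFatal(e) ⇒ log.error(e, "pipeline failed") }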
Using the Pipeline's Context
----------------------------
Up to this point there was always a parameter ``ctx`` which was used when
constructing a pipeline, but it was not explained in full. The context is a
piece of information which is made available to all stages of a pipeline. The
context may also carry behavior, provide infrastructure or helper methods etc.
It should be noted that the context is bound to the pipeline and as such must
not be accessed concurrently from different threads unless care is taken to
properly synchronize such access. Since the context will in many cases be
provided by an actor, it is not recommended to share this context with code
executing outside of the actor's message handling.
.. warning::
A PipelineContext instance *MUST NOT* be used by two different pipelines
since it contains mutable fields which are used during message processing.
Using Management Commands
-------------------------
Since pipeline stages do not have any reference to the pipeline or even to
their neighbors they cannot directly effect the injection of commands or events
outside of their normal processing. But sometimes things need to happen driven
by a timer, for example. In this case the timer would need to cause tick
messages to be sent to the whole pipeline, and the stages interested in them
would act upon those. In order to keep the type signatures for events and
commands useful, such external triggers are sent out-of-band, via a different
channel—the management port. One example which makes use of this facility is
the :class:`TickGenerator` which comes included with ``akka-actor`` (this is a
transcription of the Scala version which is actually included in the
``akka-actor`` JAR):
.. includecode:: code/docs/io/japi/HasActorContext.java#actor-context
.. includecode:: code/docs/io/japi/TickGenerator.java#tick-generator
This pipeline stage is to be used within an actor, and it will make use of this
context in order to schedule the delivery of ``Tick`` messages; the actor is
then supposed to feed these messages into the management port of the pipeline.
An example could look like this:
.. includecode:: code/docs/io/japi/Processor.java
:include: actor
:exclude: omitted
This actor extends our well-known pipeline with the tick generator and attaches
the outputs to functions which send commands and events to actors for further
processing. The pipeline stages will then all receive one ``Tick`` per second
which can be used like so:
.. includecode:: code/docs/io/japi/MessageStage.java
:include: mgmt-ticks
:exclude: omitted
.. note::
Management commands are delivered to all stages of a pipeline “effectively
parallel”, like on a broadcast medium. No code will actually run concurrently
since a pipeline is strictly single-threaded, but the order in which these
commands are processed is not specified.
The intended purpose of management commands is for each stage to define its
special command types and then listen only to those (where the aforementioned
``Tick`` message is a useful counter-example), exactly like sending packets on
a wifi network where every station receives all traffic but reacts only to
those messages which are destined for it.
If you need all stages to react upon something in their defined order, then
this must be modeled either as a command or event, i.e. it will be part of the
“business” type of the pipeline.

View file

@ -3,14 +3,6 @@
Using TCP
=========
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users'
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
The code snippets throughout this section assume the following imports:
.. includecode:: code/docs/io/japi/IODocTest.java#imports
@ -278,79 +270,3 @@ behavior to await the :class:`WritingResumed` event and start over.
The helper functions are very similar to the ACK-based case:
.. includecode:: code/docs/io/japi/EchoHandler.java#helpers
Usage Example: TcpPipelineHandler and SSL
-----------------------------------------
This example shows the different parts described above working together. Let us
first look at the SSL server:
.. includecode:: code/docs/io/japi/SslDocTest.java#server
Please refer to `the source code`_ to see all imports.
.. _the source code: @github@/akka-docs/rst/java/code/docs/io/japi/SslDocTest.java
The actor above binds to a local port and registers itself as the handler for
new connections. When a new connection comes in it will create a
:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary widely
for different setups, please refer to the JDK documentation) and wrap that in
an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``).
This sample demonstrates a few more things: below the SSL pipeline stage we
have inserted a backpressure buffer which will generate a
:class:`HighWatermarkReached` event to tell the upper stages to suspend writing
(generated at 10000 buffered bytes) and a :class:`LowWatermarkReached` when
they can resume writing (when buffer empties below 1000 bytes); the buffer has
a maximum capacity of 1MB. The implementation is very similar to the NACK-based
backpressure approach presented above, please refer to the API documentation
for details about its usage. Above the SSL stage comes an adapter which
extracts only the payload data from the TCP commands and events, i.e. it speaks
:class:`ByteString` above. The resulting byte streams are broken into frames by
a :class:`DelimiterFraming` stage which chops them up on newline characters.
The top-most stage then converts between :class:`String` and UTF-8 encoded
:class:`ByteString`.
As a result the pipeline will accept simple :class:`String` commands, encode
them using UTF-8, delimit them with newlines (which are expected to be already
present in the sending direction), transform them into TCP commands and events,
encrypt them and send them off to the connection actor while buffering writes.
This pipeline is driven by a :class:`TcpPipelineHandler` actor which is also
included in ``akka-actor``. In order to capture the generic command and event
types consumed and emitted by that actor we need to create a wrapper—the nested
:class:`Init` class—which also provides the pipeline context needed by the
supplied pipeline; in this case we use the :meth:`withLogger` convenience
method which supplies a context that implements :class:`HasLogger` and
:class:`HasActorContext` and should be sufficient for typical pipelines. With
those things bundled up all that remains is creating a
:class:`TcpPipelineHandler` and registering that one as the recipient of
inbound traffic from the TCP connection.
Since we instructed that handler actor to send any events which are emitted by
the SSL pipeline to ourselves, we can then just wait for the reception of the
decrypted payload messages, compute a response—just ``"world\n"`` in this
case—and reply by sending back an ``Init.Command``. It should be noted that
communication with the handler wraps commands and events in the inner types of
the ``init`` object in order to keep things well separated. To ease handling of
such path-dependent types there exist two helper methods, namely
:class:`Init.command` for creating a command and :class:`Init.event` for
unwrapping an event.
Looking at the client side we see that not much needs to be changed:
.. includecode:: code/docs/io/japi/SslDocTest.java#client
Once the connection is established we again create a
:class:`TcpPipelineHandler` wrapping an :class:`SslTlsSupport` (in client mode)
and register that as the recipient of inbound traffic and ourselves as
recipient for the decrypted payload data. Then we send a greeting to the server
and forward any replies to some ``listener`` actor.
.. warning::
The SslTlsSupport currently does not support using a ``Tcp.WriteCommand``
other than ``Tcp.Write``, like for example ``Tcp.WriteFile``. It also doesn't
support messages that are larger than the size of the send buffer on the socket.
Trying to send such a message will result in a ``CommandFailed``. If you need
to send large messages over SSL, then they have to be sent in chunks.

View file

@ -3,14 +3,6 @@
Using UDP
=========
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users'
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
UDP is a connectionless datagram protocol which offers two different ways of
communication on the JDK level:

View file

@ -11,14 +11,6 @@ and `spray.io`_ teams. Its design combines experiences from the
``spray-io`` module with improvements that were jointly developed for
more general consumption as an actor-based service.
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users'
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
The guiding design goal for this I/O implementation was to reach extreme
scalability, make no compromises in providing an API correctly matching the
underlying transport mechanism and to be fully event-driven, non-blocking and

View file

@ -95,6 +95,18 @@ without much trouble.
Read more about the new routers in the :ref:`documentation for Scala <routing-scala>` and
:ref:`documentation for Java <routing-java>`.
Akka IO is no longer experimental
=================================
The core IO layer introduced in Akka 2.2 is now a fully supported module of Akka.
Experimental Pipelines IO abstraction has been removed
======================================================
Pipelines in the form introduced in 2.2 have been found unintuitive and are therefore discontinued.
A new, more flexible and easier-to-use abstraction will replace them in the future. Pipelines
will still be available in the 2.2 series.
Changed cluster expected-response-after configuration
=====================================================

View file

@ -1,227 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.io
import docs.io.japi.LengthFieldFrame
import akka.testkit.{ AkkaSpec, EventFilter }
import akka.io._
import akka.util._
import akka.actor.{ Actor, ActorRef, Props, PoisonPill }
import scala.util.Success
import scala.util.Try
import scala.concurrent.duration._
class PipelinesDocSpec extends AkkaSpec {
//#data
case class Person(first: String, last: String)
case class HappinessCurve(points: IndexedSeq[Double])
case class Message(persons: Seq[Person], stats: HappinessCurve)
//#data
//#format
/**
* This trait is used to formulate a requirement for the pipeline context.
* In this example it is used to configure the byte order to be used.
*/
trait HasByteOrder extends PipelineContext {
def byteOrder: java.nio.ByteOrder
}
class MessageStage extends SymmetricPipelineStage[HasByteOrder, Message, ByteString] {
override def apply(ctx: HasByteOrder) = new SymmetricPipePair[Message, ByteString] {
implicit val byteOrder = ctx.byteOrder
/**
* Append a length-prefixed UTF-8 encoded string to the ByteStringBuilder.
*/
def putString(builder: ByteStringBuilder, str: String): Unit = {
val bs = ByteString(str, "UTF-8")
builder putInt bs.length
builder ++= bs
}
override val commandPipeline = { msg: Message =>
val bs = ByteString.newBuilder
// first store the persons
bs putInt msg.persons.size
msg.persons foreach { p =>
putString(bs, p.first)
putString(bs, p.last)
}
// then store the doubles
bs putInt msg.stats.points.length
bs putDoubles (msg.stats.points.toArray)
// and return the result as a command
ctx.singleCommand(bs.result)
}
//#decoding-omitted
//#decoding
def getString(iter: ByteIterator): String = {
val length = iter.getInt
val bytes = new Array[Byte](length)
iter getBytes bytes
ByteString(bytes).utf8String
}
override val eventPipeline = { bs: ByteString =>
val iter = bs.iterator
val personLength = iter.getInt
val persons =
(1 to personLength) map (_ => Person(getString(iter), getString(iter)))
val curveLength = iter.getInt
val curve = new Array[Double](curveLength)
iter getDoubles curve
// verify that this was all; could be left out to allow future extensions
assert(iter.isEmpty)
ctx.singleEvent(Message(persons, HappinessCurve(curve)))
}
//#decoding
//#mgmt-ticks
var lastTick = Duration.Zero
override val managementPort: Mgmt = {
case TickGenerator.Tick(timestamp) =>
//#omitted
testActor ! TickGenerator.Tick(timestamp)
import java.lang.String.{ valueOf => println }
//#omitted
println(s"time since last tick: ${timestamp - lastTick}")
lastTick = timestamp
Nil
}
//#mgmt-ticks
//#decoding-omitted
}
}
//#format
"A MessageStage" must {
//#message
val msg =
Message(
Seq(
Person("Alice", "Gibbons"),
Person("Bob", "Sparsely")),
HappinessCurve(Array(1.0, 3.0, 5.0)))
//#message
//#byteorder
val ctx = new HasByteOrder {
def byteOrder = java.nio.ByteOrder.BIG_ENDIAN
}
//#byteorder
"correctly encode and decode" in {
//#build-pipeline
val stages =
new MessageStage >>
new LengthFieldFrame(10000)
// using the extractor for the returned case class here
val PipelinePorts(cmd, evt, mgmt) =
PipelineFactory.buildFunctionTriple(ctx, stages)
val encoded: (Iterable[Message], Iterable[ByteString]) = cmd(msg)
//#build-pipeline
encoded._1 should have size 0
encoded._2 should have size 1
evt(encoded._2.head)._1 should equal(Seq(msg))
}
"demonstrate Injector/Sink" in {
val commandHandler = testActor
val eventHandler = testActor
//#build-sink
val stages =
new MessageStage >>
new LengthFieldFrame(10000)
val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)(
commandHandler ! _, // will receive messages of type Try[ByteString]
eventHandler ! _ // will receive messages of type Try[Message]
)
injector.injectCommand(msg)
//#build-sink
val encoded = expectMsgType[Success[ByteString]].get
injector.injectEvent(encoded)
expectMsgType[Try[Message]].get should equal(msg)
}
"demonstrate management port and context" in {
import TickGenerator.Tick
val proc = system.actorOf(Props(classOf[P], this, testActor, testActor), "processor")
expectMsgType[Tick]
proc ! msg
val encoded = expectMsgType[ByteString]
proc ! encoded
val decoded = expectMsgType[Message]
decoded should equal(msg)
within(1.5.seconds, 3.seconds) {
expectMsgType[Tick]
expectMsgType[Tick]
}
EventFilter[RuntimeException]("FAIL!", occurrences = 1) intercept {
proc ! "fail!"
}
within(1.5.seconds, 3.seconds) {
expectMsgType[Tick]
expectMsgType[Tick]
proc ! PoisonPill
expectNoMsg
}
}
}
//#actor
class Processor(cmds: ActorRef, evts: ActorRef) extends Actor {
val ctx = new HasActorContext with HasByteOrder {
def getContext = Processor.this.context
def byteOrder = java.nio.ByteOrder.BIG_ENDIAN
}
val pipeline = PipelineFactory.buildWithSinkFunctions(ctx,
new TickGenerator(1000.millis) >>
new MessageStage >>
new LengthFieldFrame(10000) //
)(
// failure in the pipeline will fail this actor
cmd => cmds ! cmd.get,
evt => evts ! evt.get)
def receive = {
case m: Message => pipeline.injectCommand(m)
case b: ByteString => pipeline.injectEvent(b)
case t: TickGenerator.Trigger => pipeline.managementCommand(t)
}
}
//#actor
class P(cmds: ActorRef, evts: ActorRef) extends Processor(cmds, evts) {
override def receive = ({
case "fail!" => throw new RuntimeException("FAIL!")
}: Receive) orElse super.receive
}
}

View file

@ -9,7 +9,6 @@ Networking
remoting
serialization
io
io-codec
io-tcp
io-udp
zeromq

View file

@ -1,269 +0,0 @@
.. _io-scala-codec:
Encoding and decoding binary data
=================================
.. note::
Previously Akka offered a specialized Iteratee implementation in the
``akka.actor.IO`` object which is now deprecated in favor of the pipeline
mechanism described here. The documentation for Iteratees can be found `here
<http://doc.akka.io/docs/akka/2.1.4/scala/io.html#Encoding_and_decoding_of_binary_data>`_.
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users'
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
Akka adopted and adapted the implementation of data processing pipelines found
in the ``spray-io`` module. The idea is that encoding and decoding often
go hand in hand and keeping the code pertaining to one protocol layer together
is deemed more important than writing down the complete read side—say—in the
iteratee style in one go; pipelines encourage packaging the stages in a form
which lends itself better to reuse in a protocol stack. Another reason for
choosing this abstraction is that it is at times necessary to change the
behavior of encoding and decoding within a stage based on a message stream's
state, and pipeline stages allow communication between the read and write
halves quite naturally.
The actual byte-fiddling can be done within pipeline stages, for example using
the rich API of :class:`ByteIterator` and :class:`ByteStringBuilder` as shown
below. All these activities are synchronous transformations which benefit
greatly from CPU affinity to make good use of those data caches. Therefore the
design of the pipeline infrastructure is completely synchronous: every stage's
handler code can only directly return the events and/or commands resulting from
an input; there are no callbacks. Exceptions thrown within a pipeline stage
will abort processing of the whole pipeline under the assumption that
recoverable error conditions will be signaled in-band to the next stage instead
of raising an exception.
An overall “logical” pipeline can span multiple execution contexts, for example
starting with the low-level protocol layers directly within an actor handling
the reads and writes to a TCP connection and then being passed to a number of
higher-level actors which do the costly application level processing. This is
supported by feeding the generated events into a sink which sends them to
another actor, and that other actor will then upon reception feed them into its
own pipeline.
Introducing the Sample Protocol
-------------------------------
In the following the process of implementing a protocol stack using pipelines
is demonstrated on the following simple example:
.. code-block:: text
frameLen: Int
persons: Int
persons times {
first: String
last: String
}
points: Int
points times Double
mapping to the following data type:
.. includecode:: code/docs/io/Pipelines.scala#data
We will split the handling of this protocol into two parts: the frame-length
encoding handles the buffering necessary on the read side and the actual
encoding of the frame contents is done in a separate stage.
Building a Pipeline Stage
-------------------------
As a common example, which is also included in the ``akka-actor`` package, let
us look at a framing protocol which works by prepending a length field to each
message.
.. includecode:: ../../../akka-actor/src/main/scala/akka/io/Pipelines.scala
:include: length-field-frame
:exclude: range-checks-omitted
In the end a pipeline stage is nothing more than a set of three functions: one
transforming commands arriving from above, one transforming events arriving
from below and the third transforming incoming management commands (not shown
here, see below for more information). The result of the transformation can in
either case be a sequence of commands flowing downwards or events flowing
upwards (or a combination thereof).
In the case above the data type for commands and events are equal as both
functions operate only on ``ByteString``, and the transformation does not
change that type because it only adds or removes four octets at the front.
The pair of command and event transformation functions is represented by an
object of type :class:`PipePair`, or in this case a :class:`SymmetricPipePair`.
This object could benefit from knowledge about the context it is running in,
for example an :class:`Actor`, and this context is introduced by making a
:class:`PipelineStage` be a factory for producing a :class:`PipePair`. The
factory method is called :meth:`apply` (in good Scala tradition) and receives
the context object as its argument. The implementation of this factory method
could now make use of the context in whatever way it sees fit; you will see an
example further down.
Manipulating ByteStrings
------------------------
The second stage of our sample protocol stack illustrates in more depth what
was only hinted at in the pipeline stage built above: constructing and
deconstructing byte strings. Let us first take a look at the encoder:
.. includecode:: code/docs/io/Pipelines.scala
:include: format
:exclude: decoding-omitted,omitted
Note how the byte order to be used by this stage is fixed in exactly one place,
making it impossible to get it wrong between commands and events; the way the
byte order is passed into the stage demonstrates one possible use for the
stage's ``context`` parameter.
The basic tool for constructing a :class:`ByteString` is a
:class:`ByteStringBuilder` which can be obtained by calling
:meth:`ByteString.newBuilder` since byte strings implement the
:class:`IndexedSeq[Byte]` interface of the standard Scala collections. This
builder knows a few extra tricks, though, for appending byte representations of
the primitive data types like ``Int`` and ``Double`` or arrays thereof.
Encoding a ``String`` requires a bit more work because not only the sequence of
bytes needs to be encoded but also the length, otherwise the decoding stage
would not know where the ``String`` terminates. When all values making up the
:class:`Message` have been appended to the builder, we simply pass the
resulting :class:`ByteString` on to the next stage as a command using the
optimized :meth:`singleCommand` facility.
.. warning::
The :meth:`singleCommand` and :meth:`singleEvent` methods provide a way to
generate responses which transfer exactly one result from one pipeline stage
to the next without suffering the overhead of object allocations. This means
that the returned collection object will not work for anything else (you will
get :class:`ClassCastExceptions`!) and this facility can only be used *EXACTLY
ONCE* during the processing of one input (command or event).
Now let us look at the decoder side:
.. includecode:: code/docs/io/Pipelines.scala
:include: decoding
The decoding side does the same things that the encoder does in the same order;
it just uses a :class:`ByteIterator` to retrieve primitive data types or arrays
of those from the underlying :class:`ByteString`. And in the end it hands the
assembled :class:`Message` as an event to the next stage using the optimized
:meth:`singleEvent` facility (see warning above).
Building a Pipeline
-------------------
Given the two pipeline stages introduced in the sections above we can now put
them to some use. First we define some message to be encoded:
.. includecode:: code/docs/io/Pipelines.scala
:include: message
Then we need to create a pipeline context which satisfies our declared needs:
.. includecode:: code/docs/io/Pipelines.scala
:include: byteorder
Building the pipeline and encoding this message then is quite simple:
.. includecode:: code/docs/io/Pipelines.scala
:include: build-pipeline
The tuple returned from :meth:`buildFunctionTriple` contains one function for
injecting commands, one for events and a third for injecting management
commands (see below). In this case we demonstrate how a single message ``msg``
is encoded by passing it into the ``cmd`` function. The return value is a pair
of sequences, one for the resulting events and the other for the resulting
commands. For the sample pipeline this will contain exactly one command—one
:class:`ByteString`. Decoding works in the same way, only with the ``evt``
function (which can again also result in commands being generated, although
that is not demonstrated in this sample).
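A sketch of that decoding direction, continuing with the ``cmd``/``evt`` pair
and the ``encoded`` result from the snippet above:

.. code-block:: scala

   // feed the single encoded frame back in through the event function;
   // for this pipeline it yields the original message and no commands
   val (decodedEvents, commands) = evt(encoded._2.head)
   // decodedEvents == Seq(msg), commands is empty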
Besides the more functional style there is also an explicitly side-effecting one:
.. includecode:: code/docs/io/Pipelines.scala
:include: build-sink
The functions passed into the :meth:`buildWithSinkFunctions` factory method
describe what shall happen to the commands and events as they fall out of the
pipeline. In this case we just send those to some actors, since that is usually
quite a good strategy for distributing the work represented by the messages.
The types of commands or events fed into the provided sink functions are
wrapped within :class:`Try` so that failures can also be encoded and acted
upon. This means that injecting into a pipeline using a
:class:`PipelineInjector` will catch exceptions resulting from processing the
input, in which case the exception (there can only be one per injection) is
passed into the respective sink.
Using the Pipeline's Context
----------------------------
Up to this point there was always a parameter ``ctx`` which was used when
constructing a pipeline, but it was not explained in full. The context is a
piece of information which is made available to all stages of a pipeline. The
context may also carry behavior, provide infrastructure or helper methods etc.
It should be noted that the context is bound to the pipeline and as such must
not be accessed concurrently from different threads unless care is taken to
properly synchronize such access. Since the context will in many cases be
provided by an actor, it is not recommended to share this context with code
executing outside of the actor's message handling.
.. warning::
A PipelineContext instance *MUST NOT* be used by two different pipelines
since it contains mutable fields which are used during message processing.
Using Management Commands
-------------------------
Since pipeline stages do not have any reference to the pipeline or even to
their neighbors they cannot directly effect the injection of commands or events
outside of their normal processing. But sometimes things need to happen driven
by a timer, for example. In this case the timer would need to cause tick
messages to be sent to the whole pipeline, and the stages interested in them
would act upon those. In order to keep the type signatures for events and
commands useful, such external triggers are sent out-of-band, via a different
channel—the management port. One example which makes use of this facility is
the :class:`TickGenerator` which comes included with ``akka-actor``:
.. includecode:: ../../../akka-actor/src/main/scala/akka/io/Pipelines.scala
:include: tick-generator
This pipeline stage is to be used within an actor, and it will make use of this
context in order to schedule the delivery of :class:`TickGenerator.Trigger`
messages; the actor is then supposed to feed these messages into the management
port of the pipeline. An example could look like this:
.. includecode:: code/docs/io/Pipelines.scala#actor
This actor extends our well-known pipeline with the tick generator and attaches
the outputs to functions which send commands and events to actors for further
processing. The pipeline stages will then all receive one ``Tick`` per second
which can be used like so:
.. includecode:: code/docs/io/Pipelines.scala
:include: mgmt-ticks
:exclude: omitted
.. note::
Management commands are delivered to all stages of a pipeline “effectively
parallel”, like on a broadcast medium. No code will actually run concurrently
since a pipeline is strictly single-threaded, but the order in which these
commands are processed is not specified.
The intended purpose of management commands is for each stage to define its
special command types and then listen only to those (where the aforementioned
``Tick`` message is a useful counter-example), exactly like sending packets on
a wifi network where every station receives all traffic but reacts only to
those messages which are destined for it.
If you need all stages to react upon something in their defined order, then
this must be modeled either as a command or event, i.e. it will be part of the
“business” type of the pipeline.

View file

@ -3,14 +3,6 @@
Using TCP
=========
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users'
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
The code snippets throughout this section assume the following imports:
.. includecode:: code/docs/io/IODocSpec.scala#imports
@ -279,60 +271,3 @@ behavior to await the :class:`WritingResumed` event and start over.
The helper functions are very similar to the ACK-based case:
.. includecode:: code/docs/io/EchoServer.scala#helpers
Usage Example: TcpPipelineHandler and SSL
-----------------------------------------
This example shows the different parts described above working together:
.. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#server
The actor above binds to a local port and registers itself as the handler for
new connections. When a new connection comes in it will create a
:class:`javax.net.ssl.SSLEngine` (details not shown here since they vary widely
for different setups, please refer to the JDK documentation) and wrap that in
an :class:`SslTlsSupport` pipeline stage (which is included in ``akka-actor``).
This sample demonstrates a few more things: below the SSL pipeline stage we
have inserted a backpressure buffer which will generate a
:class:`HighWatermarkReached` event to tell the upper stages to suspend writing
and a :class:`LowWatermarkReached` when they can resume writing. The
implementation is very similar to the NACK-based backpressure approach
presented above, please refer to the API docs for details on its usage. Above
the SSL stage comes an adapter which extracts only the payload data from the
TCP commands and events, i.e. it speaks :class:`ByteString` above. The
resulting byte streams are broken into frames by a :class:`DelimiterFraming`
stage which chops them up on newline characters. The top-most stage then
converts between :class:`String` and UTF-8 encoded :class:`ByteString`.
As a result the pipeline will accept simple :class:`String` commands, encode
them using UTF-8, delimit them with newlines (which are expected to be already
present in the sending direction), transform them into TCP commands and events,
encrypt them and send them off to the connection actor while buffering writes.
This pipeline is driven by a :class:`TcpPipelineHandler` actor which is also
included in ``akka-actor``. In order to capture the generic command and event
types consumed and emitted by that actor we need to create a wrapper—the nested
:class:`Init` class—which also provides the pipeline context needed by the
supplied pipeline; in this case we use the :meth:`withLogger` convenience
method which supplies a context that implements :class:`HasLogger` and
:class:`HasActorContext` and should be sufficient for typical pipelines. With
those things bundled up all that remains is creating a
:class:`TcpPipelineHandler` and registering that one as the recipient of
inbound traffic from the TCP connection. The pipeline handler is instructed to
send the decrypted payload data to the following actor:
.. includecode:: ../../../akka-remote/src/test/scala/akka/io/ssl/SslTlsSupportSpec.scala#handler
This actor computes a response and replies by sending back a :class:`String`.
It should be noted that communication with the :class:`TcpPipelineHandler`
wraps commands and events in the inner types of the ``init`` object in order to
keep things well separated.
.. warning::
The SslTlsSupport currently does not support using a ``Tcp.WriteCommand``
other than ``Tcp.Write``, like for example ``Tcp.WriteFile``. It also doesn't
support messages that are larger than the size of the send buffer on the socket.
Trying to send such a message will result in a ``CommandFailed``. If you need
to send large messages over SSL, then they have to be sent in chunks.
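A possible chunking workaround is sketched below; ``largeText`` and
``chunkSize`` are hypothetical, the latter chosen to fit the socket's send
buffer, and ``pipeline`` is the :class:`TcpPipelineHandler` from the example
above:

.. code-block:: scala

   // sketch: split a large payload into buffer-sized chunks before
   // handing it to the SSL pipeline
   largeText.grouped(chunkSize) foreach { part ⇒
     pipeline ! init.Command(part)
   }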

View file

@ -3,14 +3,6 @@
Using UDP
=========
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users'
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
UDP is a connectionless datagram protocol which offers two different ways of
communication on the JDK level:

View file

@ -11,14 +11,6 @@ and `spray.io`_ teams. Its design combines experiences from the
``spray-io`` module with improvements that were jointly developed for
more general consumption as an actor-based service.
.. warning::
The IO implementation is marked as **“experimental”** as of its introduction
in Akka 2.2.0. We will continue to improve this API based on our users
feedback, which implies that while we try to keep incompatible changes to a
minimum the binary compatibility guarantee for maintenance releases does not
apply to the contents of the `akka.io` package.
The guiding design goal for this I/O implementation was to reach extreme
scalability, make no compromises in providing an API correctly matching the
underlying transport mechanism and to be fully event-driven, non-blocking and

View file

@ -1,358 +0,0 @@
/**
* Copyright (C) 2013 Typesafe Inc. <http://www.typesafe.com>
*/
// adapted from
// https://github.com/spray/spray/blob/eef5c4f54a0cadaf9e98298faf5b337f9adc04bb/spray-io-tests/src/test/scala/spray/io/SslTlsSupportSpec.scala
// original copyright notice follows:
/*
* Copyright (C) 2011-2013 spray.io
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.io.ssl
import java.io.{ BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter }
import java.net.{ InetSocketAddress, SocketException }
import java.security.{ KeyStore, SecureRandom }
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration.DurationInt
import akka.TestUtils
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.event.{ Logging, LoggingAdapter }
import akka.io.{ BackpressureBuffer, DelimiterFraming, IO, SslTlsSupport, StringByteStringAdapter, Tcp }
import akka.io.TcpPipelineHandler
import akka.io.TcpPipelineHandler.{ Init, Management, WithinActorContext }
import akka.io.TcpReadWriteAdapter
import akka.remote.security.provider.AkkaProvider
import akka.testkit.{ AkkaSpec, TestProbe }
import akka.util.{ ByteString, Timeout }
import javax.net.ssl._
import akka.actor.Deploy
// TODO move this into akka-actor once AkkaProvider for SecureRandom does not have external dependencies
class SslTlsSupportSpec extends AkkaSpec {
implicit val timeOut: Timeout = 1.second
val sslContext = SslTlsSupportSpec.createSslContext("/keystore", "/truststore", "changeme")
"The SslTlsSupport" should {
"work between a Java client and a Java server" in {
invalidateSessions()
val server = new JavaSslServer
val client = new JavaSslClient(server.address)
client.run()
val baselineSessionCounts = sessionCounts()
client.close()
// make sure not to lose sessions by invalid session closure
sessionCounts() === baselineSessionCounts
server.close()
sessionCounts() === baselineSessionCounts // see above
}
"work between a akka client and a Java server" in {
invalidateSessions()
val server = new JavaSslServer
val client = new AkkaSslClient(server.address)
client.run()
val baselineSessionCounts = sessionCounts()
client.close()
sessionCounts() === baselineSessionCounts // see above
server.close()
sessionCounts() === baselineSessionCounts // see above
}
"work between a Java client and a akka server" in {
invalidateSessions()
val serverAddress = TestUtils.temporaryServerAddress()
val probe = TestProbe()
val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server1"))
expectMsg(Tcp.Bound)
val client = new JavaSslClient(serverAddress)
client.run()
val baselineSessionCounts = sessionCounts()
client.close()
sessionCounts() === baselineSessionCounts // see above
probe.expectTerminated(bindHandler)
}
"work between a akka client and a akka server" in {
invalidateSessions()
val serverAddress = TestUtils.temporaryServerAddress()
val probe = TestProbe()
val bindHandler = probe.watch(system.actorOf(Props(new AkkaSslServer(serverAddress)).withDeploy(Deploy.local), "server2"))
expectMsg(Tcp.Bound)
val client = new AkkaSslClient(serverAddress)
client.run()
val baselineSessionCounts = sessionCounts()
client.close()
sessionCounts() === baselineSessionCounts // see above
probe.expectTerminated(bindHandler)
}
"work between an akka client and a Java server with confirmedClose" in {
invalidateSessions()
val server = new JavaSslServer
val client = new AkkaSslClient(server.address)
client.run()
val baselineSessionCounts = sessionCounts()
client.closeConfirmed()
sessionCounts() === baselineSessionCounts // see above
server.close()
sessionCounts() === baselineSessionCounts // see above
}
"akka client runs the full shutdown sequence if peer closes" in {
invalidateSessions()
val server = new JavaSslServer
val client = new AkkaSslClient(server.address)
client.run()
val baselineSessionCounts = serverSessions().length
server.close()
client.peerClosed()
// we only check the akka side server sessions here
// the java client seems to lose the session for some reason
serverSessions().length === baselineSessionCounts
}
}
val counter = new AtomicInteger
class AkkaSslClient(address: InetSocketAddress) {
val probe = TestProbe()
probe.send(IO(Tcp), Tcp.Connect(address))
val connected = probe.expectMsgType[Tcp.Connected]
val connection = probe.sender
val init = TcpPipelineHandler.withLogger(system.log,
new StringByteStringAdapter >>
new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'), includeDelimiter = true) >>
new TcpReadWriteAdapter >>
new SslTlsSupport(sslEngine(connected.remoteAddress, client = true)))
import init._
val handler = system.actorOf(TcpPipelineHandler.props(init, connection, probe.ref).withDeploy(Deploy.local),
"client" + counter.incrementAndGet())
probe.send(connection, Tcp.Register(handler, keepOpenOnPeerClosed = true))
def run() {
probe.send(handler, Command("3+4\n"))
probe.expectMsg(Event("7\n"))
probe.send(handler, Command("20+22\n"))
probe.expectMsg(Event("42\n"))
probe.send(handler, Command("12+24\n11+1"))
Thread.sleep(1000) // Exercise framing by waiting at a mid-frame point
probe.send(handler, Command("1\n0+0\n"))
probe.expectMsg(Event("36\n"))
probe.expectMsg(Event("22\n"))
probe.expectMsg(Event("0\n"))
}
def peerClosed(): Unit = {
probe.expectMsg(Tcp.PeerClosed)
TestUtils.verifyActorTermination(handler)
}
def close() {
probe.send(handler, Management(Tcp.Close))
probe.expectMsgType[Tcp.ConnectionClosed]
TestUtils.verifyActorTermination(handler)
}
def closeConfirmed(): Unit = {
probe.send(handler, Management(Tcp.ConfirmedClose))
probe.expectMsg(Tcp.ConfirmedClosed)
TestUtils.verifyActorTermination(handler)
}
}
//#server
class AkkaSslServer(local: InetSocketAddress) extends Actor with ActorLogging {
import Tcp._
implicit def system = context.system
IO(Tcp) ! Bind(self, local)
def receive: Receive = {
case _: Bound ⇒
context.become(bound(sender))
//#server
testActor ! Bound
//#server
}
def bound(listener: ActorRef): Receive = {
case Connected(remote, _) ⇒
val init = TcpPipelineHandler.withLogger(log,
new StringByteStringAdapter("utf-8") >>
new DelimiterFraming(maxSize = 1024, delimiter = ByteString('\n'),
includeDelimiter = true) >>
new TcpReadWriteAdapter >>
new SslTlsSupport(sslEngine(remote, client = false)) >>
new BackpressureBuffer(lowBytes = 100, highBytes = 1000, maxBytes = 1000000))
val connection = sender
val handler = context.actorOf(Props(new AkkaSslHandler(init)).withDeploy(Deploy.local))
//#server
context watch handler
//#server
val pipeline = context.actorOf(TcpPipelineHandler.props(
init, connection, handler).withDeploy(Deploy.local))
connection ! Tcp.Register(pipeline)
//#server
case _: Terminated ⇒
listener ! Unbind
context.become {
case Unbound ⇒ context stop self
}
//#server
}
}
//#server
//#handler
class AkkaSslHandler(init: Init[WithinActorContext, String, String])
extends Actor with ActorLogging {
def receive = {
case init.Event(data) ⇒
val input = data.dropRight(1)
log.debug("akka-io Server received {} from {}", input, sender)
val response = serverResponse(input)
sender ! init.Command(response)
log.debug("akka-io Server sent: {}", response.dropRight(1))
case _: Tcp.ConnectionClosed ⇒ context.stop(self)
}
}
//#handler
class JavaSslServer extends Thread {
val log: LoggingAdapter = Logging(system, getClass)
val address = TestUtils.temporaryServerAddress()
private val serverSocket =
sslContext.getServerSocketFactory.createServerSocket(address.getPort).asInstanceOf[SSLServerSocket]
@volatile private var socket: SSLSocket = _
start()
def close() {
serverSocket.close()
if (socket != null) socket.close()
}
override def run() {
try {
socket = serverSocket.accept().asInstanceOf[SSLSocket]
val (reader, writer) = readerAndWriter(socket)
while (true) {
val line = reader.readLine()
log.debug("SSLServerSocket Server received: {}", line)
if (line == null) throw new SocketException("closed")
val result = serverResponse(line)
writer.write(result)
writer.flush()
log.debug("SSLServerSocket Server sent: {}", result.dropRight(1))
}
} catch {
case _: SocketException ⇒ // expected during shutdown
} finally close()
}
}
class JavaSslClient(address: InetSocketAddress) {
val socket = sslContext.getSocketFactory.createSocket(address.getHostName, address.getPort).asInstanceOf[SSLSocket]
val (reader, writer) = readerAndWriter(socket)
val log: LoggingAdapter = Logging(system, getClass)
def run() {
write("1+2")
readLine() should equal("3")
write("12+24")
readLine() should equal("36")
}
def write(string: String) {
writer.write(string + "\n")
writer.flush()
log.debug("SSLSocket Client sent: {}", string)
}
def readLine() = {
val string = reader.readLine()
log.debug("SSLSocket Client received: {}", string)
string
}
def close() { socket.close() }
}
def readerAndWriter(socket: SSLSocket) = {
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream))
reader -> writer
}
def serverResponse(input: String): String = input.split('+').map(_.toInt).reduceLeft(_ + _).toString + '\n'
def sslEngine(address: InetSocketAddress, client: Boolean) = {
val engine = sslContext.createSSLEngine(address.getHostName, address.getPort)
engine.setUseClientMode(client)
engine
}
import collection.JavaConverters._
def clientSessions() = sessions(_.getServerSessionContext)
def serverSessions() = sessions(_.getClientSessionContext)
def sessionCounts() = (clientSessions().length, serverSessions().length)
def sessions(f: SSLContext ⇒ SSLSessionContext): Seq[SSLSession] = {
val ctx = f(sslContext)
val ids = ctx.getIds().asScala.toIndexedSeq
ids.map(ctx.getSession)
}
def invalidateSessions() = {
clientSessions().foreach(_.invalidate())
serverSessions().foreach(_.invalidate())
}
}
object SslTlsSupportSpec {
def createSslContext(keyStoreResource: String, trustStoreResource: String, password: String): SSLContext = {
val keyStore = KeyStore.getInstance("jks")
keyStore.load(getClass.getResourceAsStream(keyStoreResource), password.toCharArray)
val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
keyManagerFactory.init(keyStore, password.toCharArray)
val trustStore = KeyStore.getInstance("jks")
trustStore.load(getClass.getResourceAsStream(trustStoreResource), password.toCharArray())
val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
trustManagerFactory.init(trustStore)
val context = SSLContext.getInstance("SSL")
val rng = SecureRandom.getInstance("AES128CounterSecureRNG", AkkaProvider)
rng.nextInt() // if it stalls then it stalls here
context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, rng)
context
}
}