diff --git a/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala b/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala new file mode 100644 index 0000000000..1d09b92bfd --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/io/PipelineSpec.scala @@ -0,0 +1,224 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.io + +import akka.testkit.AkkaSpec +import akka.util.ByteString +import scala.annotation.tailrec +import java.nio.ByteOrder +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.util.Try +import scala.util.Success + +class PipelineSpec extends AkkaSpec { + + trait Level1 + trait Level2 + trait Level3 + trait Level4 + + trait LevelFactory[Lvl] { + def msgA: Lvl + def msgB: Lvl + } + + implicit object Level1 extends LevelFactory[Level1] { + object msgA extends Level1 { override def toString = "Lvl1msgA" } + object msgB extends Level1 { override def toString = "Lvl1msgB" } + } + + implicit object Level2 extends LevelFactory[Level2] { + object msgA extends Level2 { override def toString = "Lvl2msgA" } + object msgB extends Level2 { override def toString = "Lvl2msgB" } + } + + implicit object Level3 extends LevelFactory[Level3] { + object msgA extends Level3 { override def toString = "Lvl3msgA" } + object msgB extends Level3 { override def toString = "Lvl3msgB" } + } + + implicit object Level4 extends LevelFactory[Level4] { + object msgA extends Level4 { override def toString = "Lvl4msgA" } + object msgB extends Level4 { override def toString = "Lvl4msgB" } + } + + val ctx = new PipelineContext {} + + "A Pipeline" must { + + "be correctly evaluated if single stage" in { + val PipelinePorts(cmd, evt, _) = + PipelineFactory.buildFunctionTriple(ctx, stage[Level2, Level1](1, 0, false)) + cmd(Level2.msgA) must be(Nil -> Seq(Level1.msgA)) + evt(Level1.msgA) must be(Seq(Level2.msgA) -> Nil) + cmd(Level2.msgB) must be(Nil -> Seq(Level1.msgB)) + evt(Level1.msgB) must be(Seq(Level2.msgB) -> Nil) + } + + "be correctly evaluated when two combined" in { + val stage1 = stage[Level3, Level2](1, 0, false) + val stage2 = stage[Level2, Level1](1, 0, false) + val PipelinePorts(cmd, evt, _) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2) + cmd(Level3.msgA) must be(Nil -> Seq(Level1.msgA)) + evt(Level1.msgA) must be(Seq(Level3.msgA) -> Nil) + cmd(Level3.msgB) must be(Nil -> Seq(Level1.msgB)) + evt(Level1.msgB) must be(Seq(Level3.msgB) -> Nil) + } + + "be correctly evaluated when three combined" in { + val stage1 = stage[Level4, Level3](1, 0, false) + val stage2 = stage[Level3, Level2](2, 0, false) + val stage3 = stage[Level2, Level1](1, 0, false) + val PipelinePorts(cmd, evt, _) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2 >> stage3) + cmd(Level4.msgA) must be(Nil -> Seq(Level1.msgA, Level1.msgA)) + evt(Level1.msgA) must be(Seq(Level4.msgA, Level4.msgA) -> Nil) + cmd(Level4.msgB) must be(Nil -> Seq(Level1.msgB, Level1.msgB)) + evt(Level1.msgB) must be(Seq(Level4.msgB, Level4.msgB) -> Nil) + } + + "be correctly evaluated with back-scatter" in { + val stage1 = stage[Level4, Level3](1, 0, true) + val stage2 = stage[Level3, Level2](1, 1, true) + val stage3 = stage[Level2, Level1](1, 0, false) + val PipelinePorts(cmd, evt, _) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2 >> stage3) + cmd(Level4.msgA) must be(Seq(Level4.msgB) -> Seq(Level1.msgA)) + evt(Level1.msgA) must be(Seq(Level4.msgA) -> Seq(Level1.msgB)) + } + + "handle management commands" in { + val stage1 = stage[Level4, Level3](1, 0, true, { case "doit" ⇒ Seq(Left(Level4.msgA), Right(Level3.msgA)) }) + val stage2 = stage[Level3, Level2](2, 0, true, { case "doit" ⇒ Seq(Left(Level3.msgA), Right(Level2.msgA)) }) + val stage3 = stage[Level2, Level1](1, 0, true, { case "doit" ⇒ Seq(Left(Level2.msgA), Right(Level1.msgA)) }) + val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stage1 >> stage2 >> stage3) + mgmt(42: java.lang.Integer) must be(Seq() -> Seq()) + val (events, commands) = mgmt("doit") + events must have size 4 + events count (_ == Level4.msgA) must be === 3 + events count (_ == Level4.msgB) must be === 1 + commands must have size 4 + commands count (_ == Level1.msgA) must be === 3 + commands count (_ == Level1.msgB) must be === 1 + } + + } + + def stage[Above: LevelFactory, Below: LevelFactory](forward: Int, backward: Int, invert: Boolean, + mgmt: SymmetricPipePair[Above, Below]#Mgmt = PartialFunction.empty) = + new SymmetricPipelineStage[PipelineContext, Above, Below] { + override def apply(ctx: PipelineContext) = { + val above = implicitly[LevelFactory[Above]] + val below = implicitly[LevelFactory[Below]] + PipePairFactory( + { a ⇒ + val msgA = a == above.msgA + val msgAbove = if (invert ^ msgA) above.msgA else above.msgB + val msgBelow = if (invert ^ msgA) below.msgA else below.msgB + (for (_ ← 1 to forward) yield Right(msgBelow)) ++ (for (_ ← 1 to backward) yield Left(msgAbove)) + }, + { b ⇒ + val msgA = b == below.msgA + val msgAbove = if (invert ^ msgA) above.msgA else above.msgB + val msgBelow = if (invert ^ msgA) below.msgA else below.msgB + (for (_ ← 1 to forward) yield Left(msgAbove)) ++ (for (_ ← 1 to backward) yield Right(msgBelow)) + }, + mgmt) + } + } + +} + +object PipelineBench extends App { + + val frame = new LengthFieldFrame(32000) + val frames = frame >> frame >> frame >> frame + + val ctx = new PipelineContext {} + // this way of creating a pipeline is not user API + val pipe = frames(ctx) + + val hello = ByteString("hello") + // ctx.dealias is only necessary because this is a “raw” pipe, not user API + val bytes = ctx.dealias(pipe.commandPipeline(ByteString("hello"))).head.fold(identity, identity).compact + println(bytes) + println(pipe.eventPipeline(bytes)) + + class Bytes { + var pos = 0 + var emitted = 0 + def get(): ByteString = { + val r = ThreadLocalRandom.current() + val l = r.nextInt(2 * bytes.length) + @tailrec def rec(left: Int, acc: ByteString): ByteString = { + if (pos + left <= bytes.length) { + val result = acc ++ bytes.slice(pos, pos + left) + pos = (pos + left) % bytes.length + result + } else { + val oldpos = pos + pos = 0 + rec(left - bytes.length + oldpos, acc ++ bytes.slice(oldpos, bytes.length)) + } + } + emitted += l + rec(l, ByteString.empty) + } + } + + println("warming up") + + val bpp = new Bytes + + { + println(" ... PipePair") + val y = for (_ ← 1 to 500000; x ← ctx.dealias(pipe.eventPipeline(bpp.get()))) yield x + assert(y forall { case Left(b) ⇒ b == ByteString("hello"); case _ ⇒ false }) + assert(y.size == bpp.emitted / bytes.length) + } + + val PipelinePorts(_, evt, _) = PipelineFactory.buildFunctionTriple(ctx, frames) + val bft = new Bytes + + { + println(" ... FunctionTriple") + val y = for (_ ← 1 to 500000; x ← evt(bft.get())._1) yield x + assert(y forall (_ == ByteString("hello"))) + assert(y.size == bft.emitted / bytes.length) + } + + var injected = 0 + val inj = PipelineFactory.buildWithSinkFunctions(ctx, frames)(_ ⇒ Nil, { case Success(bs) if bs == hello ⇒ injected += 1 }) + val bij = new Bytes + + { + println(" ... Injector") + for (_ ← 1 to 500000) inj.injectEvent(bij.get()) + assert(injected == bij.emitted / bytes.length) + } + + val N = 1000000 + + { + val start = System.nanoTime + val y = for (_ ← 1 to N; x ← ctx.dealias(pipe.eventPipeline(bpp.get()))) yield x + val time = System.nanoTime - start + println(s"PipePair: 1 iteration took ${time / N}ns (${y.size})") + } + + { + val start = System.nanoTime + val y = for (_ ← 1 to N; x ← evt(bft.get())._1) yield x + val time = System.nanoTime - start + println(s"FunctionTriple: 1 iteration took ${time / N}ns (${y.size})") + } + + { + injected = 0 + val start = System.nanoTime + for (_ ← 1 to N) inj.injectEvent(bij.get()) + val time = System.nanoTime - start + println(s"Injector: 1 iteration took ${time / N}ns ($injected)") + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 04fbdc69eb..2bfa1c60ea 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -393,7 +393,7 @@ case class AllForOneStrategy( */ private val retriesWindow = (maxNrOfRetriesOption(maxNrOfRetries), withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt)) - def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = () def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { @@ -440,7 +440,7 @@ case class OneForOneStrategy( SupervisorStrategy.maxNrOfRetriesOption(maxNrOfRetries), SupervisorStrategy.withinTimeRangeOption(withinTimeRange).map(_.toMillis.toInt)) - def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = {} + def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit = () def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) diff --git a/akka-actor/src/main/scala/akka/io/Pipelines.scala b/akka-actor/src/main/scala/akka/io/Pipelines.scala new file mode 100644 index 0000000000..0126547bbf --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/Pipelines.scala @@ -0,0 +1,885 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package akka.io + +import java.lang.{ Iterable ⇒ JIterable } +import scala.annotation.tailrec +import scala.util.{ Try, Success, Failure } +import java.nio.ByteOrder +import akka.util.ByteString +import scala.collection.mutable +import akka.actor.ActorContext +import scala.concurrent.duration.FiniteDuration +import scala.collection.mutable.WrappedArray +import scala.concurrent.duration.Deadline +import scala.beans.BeanProperty + +/** + * Scala API: A pair of pipes, one for commands and one for events, plus a + * management port. Commands travel from top to bottom, events from bottom to + * top. All messages which need to be handled “in-order” (e.g. top-down or + * bottom-up) need to be either events or commands; management messages are + * processed in no particular order. + * + * Java base classes are provided in the form of [[AbstractPipePair]] + * and [[AbstractSymmetricPipePair]] since the Scala function types can be + * awkward to handle in Java. + * + * @see [[PipelineStage]] + * @see [[AbstractPipePair]] + * @see [[AbstractSymmetricPipePair]] + * @see [[PipePairFactory]] + */ +trait PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] { + + type Mgmt = PartialFunction[AnyRef, Iterable[Either[EvtAbove, CmdBelow]]] + + /** + * The command pipeline transforms injected commands from the upper stage + * into commands for the stage below, but it can also emit events for the + * upper stage. Any number of each can be generated. + */ + def commandPipeline: CmdAbove ⇒ Iterable[Either[EvtAbove, CmdBelow]] + + /** + * The event pipeline transforms injected event from the lower stage + * into event for the stage above, but it can also emit commands for the + * stage below. Any number of each can be generated. + */ + def eventPipeline: EvtBelow ⇒ Iterable[Either[EvtAbove, CmdBelow]] + + /** + * The management port allows sending broadcast messages to all stages + * within this pipeline. This can be used to communicate with stages in the + * middle without having to thread those messages through the surrounding + * stages. Each stage can generate events and commands in response to a + * command, and the aggregation of all those is returned. + * + * The default implementation ignores all management commands. + */ + def managementPort: Mgmt = PartialFunction.empty +} + +/** + * A convenience type for expressing a [[PipePair]] which has the same types + * for commands and events. + */ +trait SymmetricPipePair[Above, Below] extends PipePair[Above, Below, Above, Below] + +/** + * Java API: A pair of pipes, one for commands and one for events. Commands travel from + * top to bottom, events from bottom to top. + * + * @see [[PipelineStage]] + * @see [[AbstractSymmetricPipePair]] + * @see [[PipePairFactory]] + */ +abstract class AbstractPipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] { + + /** + * Commands reaching this pipe pair are transformed into a sequence of + * commands for the next or events for the previous stage. + * + * Throwing exceptions within this method will abort processing of the whole + * pipeline which this pipe pair is part of. + * + * @param cmd the incoming command + * @return an Iterable of elements which are either events or commands + * + * @see [[#makeCommand]] + * @see [[#makeEvent]] + */ + def onCommand(cmd: CmdAbove): JIterable[Either[EvtAbove, CmdBelow]] + + /** + * Events reaching this pipe pair are transformed into a sequence of + * commands for the next or events for the previous stage. + * + * Throwing exceptions within this method will abort processing of the whole + * pipeline which this pipe pair is part of. + * + * @param cmd the incoming command + * @return an Iterable of elements which are either events or commands + * + * @see [[#makeCommand]] + * @see [[#makeEvent]] + */ + def onEvent(event: EvtBelow): JIterable[Either[EvtAbove, CmdBelow]] + + /** + * Management commands are sent to all stages in a broadcast fashion, + * conceptually in parallel (but not actually executing a stage + * reentrantly in case of events or commands being generated in response + * to a management command). + */ + def onManagementCommand(cmd: AnyRef): JIterable[Either[EvtAbove, CmdBelow]] = + java.util.Collections.emptyList() + + /** + * Helper method for wrapping a command which shall be emitted. + */ + def makeCommand(cmd: CmdBelow): Either[EvtAbove, CmdBelow] = Right(cmd) + + /** + * Helper method for wrapping an event which shall be emitted. + */ + def makeEvent(event: EvtAbove): Either[EvtAbove, CmdBelow] = Left(event) + + /** + * INTERNAL API: do not touch! + */ + private[io] val _internal$cmd = { + val l = new java.util.ArrayList[AnyRef](1) + l add null + l + } + /** + * INTERNAL API: do not touch! + */ + private[io] val _internal$evt = { + val l = new java.util.ArrayList[AnyRef](1) + l add null + l + } + + /** + * Wrap a single command for efficient return to the pipeline’s machinery. + * This method avoids allocating a [[Right]] and an [[java.lang.Iterable]] by reusing + * one such instance within the AbstractPipePair, hence it can be used ONLY ONCE by + * each pipeline stage. Prototypic and safe usage looks like this: + * + * {{{ + * final MyResult result = ... ; + * return singleCommand(result); + * }}} + * + * @see PipelineContext#singleCommand + */ + def singleCommand(cmd: CmdBelow): JIterable[Either[EvtAbove, CmdBelow]] = { + _internal$cmd.set(0, cmd.asInstanceOf[AnyRef]) + _internal$cmd.asInstanceOf[JIterable[Either[EvtAbove, CmdBelow]]] + } + + /** + * Wrap a single event for efficient return to the pipeline’s machinery. + * This method avoids allocating a [[Left]] and an [[Iterable]] by reusing + * one such instance within the AbstractPipePair, hence it can be used ONLY ONCE by + * each pipeline stage. Prototypic and safe usage looks like this: + * + * {{{ + * final MyResult result = ... ; + * return singleEvent(result); + * }}} + * + * @see PipelineContext#singleEvent + */ + def singleEvent(evt: EvtAbove): JIterable[Either[EvtAbove, CmdBelow]] = { + _internal$evt.set(0, evt.asInstanceOf[AnyRef]) + _internal$evt.asInstanceOf[JIterable[Either[EvtAbove, CmdBelow]]] + } + + /** + * INTERNAL API: Dealias a possibly optimized return value such that it can + * be safely used; this is never needed when only using public API. + */ + def dealias[Cmd, Evt](msg: JIterable[Either[Evt, Cmd]]): JIterable[Either[Evt, Cmd]] = { + import java.util.Collections.singletonList + if (msg eq _internal$cmd) singletonList(Right(_internal$cmd.get(0).asInstanceOf[Cmd])) + else if (msg eq _internal$evt) singletonList(Left(_internal$evt.get(0).asInstanceOf[Evt])) + else msg + } +} + +/** + * A convenience type for expressing a [[AbstractPipePair]] which has the same types + * for commands and events. + */ +abstract class AbstractSymmetricPipePair[Above, Below] extends AbstractPipePair[Above, Below, Above, Below] + +/** + * This class contains static factory methods which produce [[PipePair]] + * instances; those are needed within the implementation of [[PipelineStage#apply]]. + */ +object PipePairFactory { + + /** + * Scala API: construct a [[PipePair]] from the two given functions; useful for not capturing `$outer` references. + */ + def apply[CmdAbove, CmdBelow, EvtAbove, EvtBelow] // + (commandPL: CmdAbove ⇒ Iterable[Either[EvtAbove, CmdBelow]], + eventPL: EvtBelow ⇒ Iterable[Either[EvtAbove, CmdBelow]], + management: PartialFunction[AnyRef, Iterable[Either[EvtAbove, CmdBelow]]] = PartialFunction.empty) = + new PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] { + override def commandPipeline = commandPL + override def eventPipeline = eventPL + override def managementPort = management + } + + private abstract class Converter[CmdAbove <: AnyRef, CmdBelow <: AnyRef, EvtAbove <: AnyRef, EvtBelow <: AnyRef] // + (val ap: AbstractPipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow], ctx: PipelineContext) { + import scala.collection.JavaConverters._ + protected def normalize(output: JIterable[Either[EvtAbove, CmdBelow]]): Iterable[Either[EvtAbove, CmdBelow]] = + if (output == java.util.Collections.EMPTY_LIST) Nil + else if (output eq ap._internal$cmd) ctx.singleCommand(ap._internal$cmd.get(0).asInstanceOf[CmdBelow]) + else if (output eq ap._internal$evt) ctx.singleEvent(ap._internal$evt.get(0).asInstanceOf[EvtAbove]) + else output.asScala + } + + /** + * Java API: construct a [[PipePair]] from the given [[AbstractPipePair]]. + */ + def create[CmdAbove <: AnyRef, CmdBelow <: AnyRef, EvtAbove <: AnyRef, EvtBelow <: AnyRef] // + (ctx: PipelineContext, ap: AbstractPipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow]) // + : PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] = + new Converter(ap, ctx) with PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] { + override val commandPipeline = { cmd: CmdAbove ⇒ normalize(ap.onCommand(cmd)) } + override val eventPipeline = { evt: EvtBelow ⇒ normalize(ap.onEvent(evt)) } + override val managementPort: Mgmt = { case x ⇒ normalize(ap.onManagementCommand(x)) } + } + + /** + * Java API: construct a [[PipePair]] from the given [[AbstractSymmetricPipePair]]. + */ + def create[Above <: AnyRef, Below <: AnyRef] // + (ctx: PipelineContext, ap: AbstractSymmetricPipePair[Above, Below]): SymmetricPipePair[Above, Below] = + new Converter(ap, ctx) with SymmetricPipePair[Above, Below] { + override val commandPipeline = { cmd: Above ⇒ normalize(ap.onCommand(cmd)) } + override val eventPipeline = { evt: Below ⇒ normalize(ap.onEvent(evt)) } + override val managementPort: Mgmt = { case x ⇒ normalize(ap.onManagementCommand(x)) } + } +} + +case class PipelinePorts[CmdAbove, CmdBelow, EvtAbove, EvtBelow]( + commands: CmdAbove ⇒ (Iterable[EvtAbove], Iterable[CmdBelow]), + events: EvtBelow ⇒ (Iterable[EvtAbove], Iterable[CmdBelow]), + management: PartialFunction[AnyRef, (Iterable[EvtAbove], Iterable[CmdBelow])]) + +/** + * This class contains static factory methods which turn a pipeline context + * and a [[PipelineStage]] into readily usable pipelines. + */ +object PipelineFactory { + + /** + * Scala API: build the pipeline and return a pair of functions representing + * the command and event pipelines. Each function returns the commands and + * events resulting from running the pipeline on the given input, where the + * the sequence of events is the first element of the returned pair and the + * sequence of commands the second element. + * + * Exceptions thrown by the pipeline stages will not be caught. + * + * @param ctx The context object for this pipeline + * @param stage The (composite) pipeline stage from whcih to build the pipeline + * @return a pair of command and event pipeline functions + */ + def buildFunctionTriple[Ctx <: PipelineContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow] // + (ctx: Ctx, stage: PipelineStage[Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow]) // + : PipelinePorts[CmdAbove, CmdBelow, EvtAbove, EvtBelow] = { + val pp = stage apply ctx + val split: (Iterable[Either[EvtAbove, CmdBelow]]) ⇒ (Iterable[EvtAbove], Iterable[CmdBelow]) = { in ⇒ + if (in.isEmpty) (Nil, Nil) + else if (in eq ctx.cmd) (Nil, Seq[CmdBelow](ctx.cmd(0))) + else if (in eq ctx.evt) (Seq[EvtAbove](ctx.evt(0)), Nil) + else { + val cmds = Vector.newBuilder[CmdBelow] + val evts = Vector.newBuilder[EvtAbove] + in foreach { + case Right(cmd) ⇒ cmds += cmd + case Left(evt) ⇒ evts += evt + } + (evts.result, cmds.result) + } + } + PipelinePorts(pp.commandPipeline andThen split, pp.eventPipeline andThen split, pp.managementPort andThen split) + } + + /** + * Scala API: build the pipeline attaching the given command and event sinks + * to its outputs. Exceptions thrown within the pipeline stages will abort + * processing (i.e. will not be processed in following stages) but will be + * caught and passed as [[scala.util.Failure]] into the respective sink. + * + * Exceptions thrown while processing management commands are not caught. + * + * @param ctx The context object for this pipeline + * @param stage The (composite) pipeline stage from whcih to build the pipeline + * @param commandSink The function to invoke for commands or command failures + * @param eventSink The function to invoke for events or event failures + * @return a handle for injecting events or commands into the pipeline + */ + def buildWithSinkFunctions[Ctx <: PipelineContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow] // + (ctx: Ctx, + stage: PipelineStage[Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow])( + commandSink: Try[CmdBelow] ⇒ Unit, + eventSink: Try[EvtAbove] ⇒ Unit): PipelineInjector[CmdAbove, EvtBelow] = + new PipelineInjector[CmdAbove, EvtBelow] { + val pl = stage(ctx) + override def injectCommand(cmd: CmdAbove): Unit = { + Try(pl.commandPipeline(cmd)) match { + case f: Failure[_] ⇒ commandSink(f.asInstanceOf[Try[CmdBelow]]) + case Success(out) ⇒ + if (out.isEmpty) () // nothing + else if (out eq ctx.cmd) commandSink(Success(ctx.cmd(0))) + else if (out eq ctx.evt) eventSink(Success(ctx.evt(0))) + else out foreach { + case Right(cmd) ⇒ commandSink(Success(cmd)) + case Left(evt) ⇒ eventSink(Success(evt)) + } + } + } + override def injectEvent(evt: EvtBelow): Unit = { + Try(pl.eventPipeline(evt)) match { + case f: Failure[_] ⇒ eventSink(f.asInstanceOf[Try[EvtAbove]]) + case Success(out) ⇒ + if (out.isEmpty) () // nothing + else if (out eq ctx.cmd) commandSink(Success(ctx.cmd(0))) + else if (out eq ctx.evt) eventSink(Success(ctx.evt(0))) + else out foreach { + case Right(cmd) ⇒ commandSink(Success(cmd)) + case Left(evt) ⇒ eventSink(Success(evt)) + } + } + } + override def managementCommand(cmd: AnyRef): Unit = { + val out = pl.managementPort(cmd) + if (out.isEmpty) () // nothing + else if (out eq ctx.cmd) commandSink(Success(ctx.cmd(0))) + else if (out eq ctx.evt) eventSink(Success(ctx.evt(0))) + else out foreach { + case Right(cmd) ⇒ commandSink(Success(cmd)) + case Left(evt) ⇒ eventSink(Success(evt)) + } + } + } + + /** + * Java API: build the pipeline attaching the given callback object to its + * outputs. Exceptions thrown within the pipeline stages will abort + * processing (i.e. will not be processed in following stages) but will be + * caught and passed as [[scala.util.Failure]] into the respective sink. + * + * Exceptions thrown while processing management commands are not caught. + * + * @param ctx The context object for this pipeline + * @param stage The (composite) pipeline stage from whcih to build the pipeline + * @param callback The [[PipelineSink]] to attach to the built pipeline + * @return a handle for injecting events or commands into the pipeline + */ + def buildWithSink[Ctx <: PipelineContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow] // + (ctx: Ctx, + stage: PipelineStage[Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow], + callback: PipelineSink[CmdBelow, EvtAbove]): PipelineInjector[CmdAbove, EvtBelow] = + buildWithSinkFunctions[Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow](ctx, stage)({ + case Failure(thr) ⇒ callback.onCommandFailure(thr) + case Success(cmd) ⇒ callback.onCommand(cmd) + }, { + case Failure(thr) ⇒ callback.onEventFailure(thr) + case Success(evt) ⇒ callback.onEvent(evt) + }) +} + +/** + * A handle for injecting commands and events into a pipeline. Commands travel + * down (or to the right) through the stages, events travel in the opposite + * direction. + * + * @see [[PipelineFactory#buildWithSinkFunctions]] + * @see [[PipelineFactory#buildWithSink]] + */ +trait PipelineInjector[Cmd, Evt] { + + /** + * Inject the given command into the connected pipeline. + */ + @throws(classOf[Exception]) + def injectCommand(cmd: Cmd): Unit + + /** + * Inject the given event into the connected pipeline. + */ + @throws(classOf[Exception]) + def injectEvent(event: Evt): Unit + + /** + * Send a management command to all stages (in an unspecified order). + */ + @throws(classOf[Exception]) + def managementCommand(cmd: AnyRef): Unit +} + +/** + * A sink which can be attached by [[PipelineFactory#buildWithSink]] to a + * pipeline when it is being built. The methods are called when commands, + * events or their failures occur during evaluation of the pipeline (i.e. + * when injection is triggered using the associated [[PipelineInjector]]). + */ +abstract class PipelineSink[Cmd, Evt] { + + /** + * This callback is invoked for every command generated by the pipeline. + * + * By default this does nothing. + */ + @throws(classOf[Throwable]) + def onCommand(cmd: Cmd): Unit = () + + /** + * This callback is invoked if an exception occurred while processing an + * injected command. If this callback is invoked that no other callbacks will + * be invoked for the same injection. + * + * By default this will just throw the exception. + */ + @throws(classOf[Throwable]) + def onCommandFailure(thr: Throwable): Unit = throw thr + + /** + * This callback is invoked for every event generated by the pipeline. + * + * By default this does nothing. + */ + @throws(classOf[Throwable]) + def onEvent(event: Evt): Unit = () + + /** + * This callback is invoked if an exception occurred while processing an + * injected event. If this callback is invoked that no other callbacks will + * be invoked for the same injection. + * + * By default this will just throw the exception. + */ + @throws(classOf[Throwable]) + def onEventFailure(thr: Throwable): Unit = throw thr +} + +/** + * This base trait of each pipeline’s context provides optimized facilities + * for generating single commands or events (i.e. the fast common case of 1:1 + * message transformations). + * + * IMPORTANT NOTICE: + * + * A PipelineContext MUST NOT be shared between multiple pipelines, it contains mutable + * state without synchronization. You have been warned! + * + * @see AbstractPipelineContext see AbstractPipelineContext for a default implementation (Java) + */ +trait PipelineContext { + + /** + * INTERNAL API: do not touch! + */ + private val cmdHolder = new Array[AnyRef](1) + /** + * INTERNAL API: do not touch! + */ + private val evtHolder = new Array[AnyRef](1) + /** + * INTERNAL API: do not touch! + */ + private[io] val cmd = WrappedArray.make(cmdHolder) + /** + * INTERNAL API: do not touch! + */ + private[io] val evt = WrappedArray.make(evtHolder) + + /** + * Scala API: Wrap a single command for efficient return to the pipeline’s machinery. + * This method avoids allocating a [[Right]] and an [[Iterable]] by reusing + * one such instance within the PipelineContext, hence it can be used ONLY ONCE by + * each pipeline stage. Prototypic and safe usage looks like this: + * + * {{{ + * override val commandPipeline = { cmd => + * val myResult = ... + * ctx.singleCommand(myResult) + * } + * }}} + * + * @see AbstractPipePair#singleCommand see AbstractPipePair for the Java API + */ + def singleCommand[Cmd <: AnyRef, Evt <: AnyRef](cmd: Cmd): Iterable[Either[Evt, Cmd]] = { + cmdHolder(0) = cmd + this.cmd + } + + /** + * Scala API: Wrap a single event for efficient return to the pipeline’s machinery. + * This method avoids allocating a [[Left]] and an [[Iterable]] by reusing + * one such instance within the context, hence it can be used ONLY ONCE by + * each pipeline stage. Prototypic and safe usage looks like this: + * + * {{{ + * override val eventPipeline = { cmd => + * val myResult = ... + * ctx.singleEvent(myResult) + * } + * }}} + * + * @see AbstractPipePair#singleEvent see AbstractPipePair for the Java API + */ + def singleEvent[Cmd <: AnyRef, Evt <: AnyRef](evt: Evt): Iterable[Either[Evt, Cmd]] = { + evtHolder(0) = evt + this.evt + } + + /** + * A shared (and shareable) instance of an empty `Iterable[Either[EvtAbove, CmdBelow]]`. + * Use this when processing does not yield any commands or events as result. + */ + def nothing[Cmd, Evt]: Iterable[Either[Evt, Cmd]] = Nil + + /** + * INTERNAL API: Dealias a possibly optimized return value such that it can + * be safely used; this is never needed when only using public API. + */ + def dealias[Cmd, Evt](msg: Iterable[Either[Evt, Cmd]]): Iterable[Either[Evt, Cmd]] = { + if (msg.isEmpty) Nil + else if (msg eq cmd) Seq(Right(cmd(0))) + else if (msg eq evt) Seq(Left(evt(0))) + else msg + } +} + +/** + * This base trait of each pipeline’s context provides optimized facilities + * for generating single commands or events (i.e. the fast common case of 1:1 + * message transformations). + * + * IMPORTANT NOTICE: + * + * A PipelineContext MUST NOT be shared between multiple pipelines, it contains mutable + * state without synchronization. You have been warned! + */ +abstract class AbstractPipelineContext extends PipelineContext + +object PipelineStage { + + /** + * Java API: attach the two given stages such that the command output of the + * first is fed into the command input of the second, and the event output of + * the second is fed into the event input of the first. In other words: + * sequence the stages such that the left one is on top of the right one. + * + * @param left the left or upper pipeline stage + * @param right the right or lower pipeline stage + * @return a pipeline stage representing the sequence of the two stages + */ + def sequence[Ctx <: PipelineContext, CmdAbove, CmdBelow, CmdBelowBelow, EvtAbove, EvtBelow, EvtBelowBelow] // + (left: PipelineStage[_ >: Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow], + right: PipelineStage[_ >: Ctx, CmdBelow, CmdBelowBelow, EvtBelow, EvtBelowBelow]) // + : PipelineStage[Ctx, CmdAbove, CmdBelowBelow, EvtAbove, EvtBelowBelow] = + left >> right + + /** + * Java API: combine the two stages such that the command pipeline of the + * left stage is used and the event pipeline of the right, discarding the + * other two sub-pipelines. + * + * @param left the command pipeline + * @param right the event pipeline + * @return a pipeline stage using the left command pipeline and the right event pipeline + */ + def combine[Ctx <: PipelineContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow] // + (left: PipelineStage[Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow], + right: PipelineStage[Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow]) // + : PipelineStage[Ctx, CmdAbove, CmdBelow, EvtAbove, EvtBelow] = + left | right +} + +/** + * A [[PipelineStage]] which is symmetric in command and event types, i.e. it only + * has one command and event type above and one below. + */ +abstract class SymmetricPipelineStage[Context <: PipelineContext, Above, Below] extends PipelineStage[Context, Above, Below, Above, Below] + +/** + * A pipeline stage which can be combined with other stages to build a + * protocol stack. The main function of this class is to serve as a factory + * for the actual [[PipePair]] generated by the [[#apply]] method so that a + * context object can be passed in. + * + * @see [[PipelineFactory]] + */ +abstract class PipelineStage[Context <: PipelineContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow] { left ⇒ + + /** + * Implement this method to generate this stage’s pair of command and event + * functions. + * + * INTERNAL API: do not use this method to instantiate a pipeline! + * + * @see [[PipelineFactory]] + * @see [[AbstractPipePair]] + * @see [[AbstractSymmetricPipePair]] + */ + protected[io] def apply(ctx: Context): PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] + + /** + * Scala API: attach the two given stages such that the command output of the + * first is fed into the command input of the second, and the event output of + * the second is fed into the event input of the first. In other words: + * sequence the stages such that the left one is on top of the right one. + * + * @param right the right or lower pipeline stage + * @return a pipeline stage representing the sequence of the two stages + */ + def >>[CmdBelowBelow, EvtBelowBelow, BelowContext <: Context] // + (right: PipelineStage[_ >: BelowContext, CmdBelow, CmdBelowBelow, EvtBelow, EvtBelowBelow]) // + : PipelineStage[BelowContext, CmdAbove, CmdBelowBelow, EvtAbove, EvtBelowBelow] = + new PipelineStage[BelowContext, CmdAbove, CmdBelowBelow, EvtAbove, EvtBelowBelow] { + + protected[io] override def apply(ctx: BelowContext): PipePair[CmdAbove, CmdBelowBelow, EvtAbove, EvtBelowBelow] = { + + val leftPL = left(ctx) + val rightPL = right(ctx) + + new PipePair[CmdAbove, CmdBelowBelow, EvtAbove, EvtBelowBelow] { + + type Output = Either[EvtAbove, CmdBelowBelow] + + import language.implicitConversions + @inline implicit def narrowRight[A, B, C](in: Right[A, B]): Right[C, B] = in.asInstanceOf[Right[C, B]] + @inline implicit def narrowLeft[A, B, C](in: Left[A, B]): Left[A, C] = in.asInstanceOf[Left[A, C]] + + def loopLeft(input: Iterable[Either[EvtAbove, CmdBelow]]): Iterable[Output] = { + if (input.isEmpty) Nil + else if (input eq ctx.cmd) loopRight(rightPL.commandPipeline(ctx.cmd(0))) + else if (input eq ctx.evt) ctx.evt + else { + val output = Vector.newBuilder[Output] + input foreach { + case Right(cmd) ⇒ output ++= ctx.dealias(loopRight(rightPL.commandPipeline(cmd))) + case l @ Left(_) ⇒ output += l + } + output.result + } + } + + def loopRight(input: Iterable[Either[EvtBelow, CmdBelowBelow]]): Iterable[Output] = { + if (input.isEmpty) Nil + else if (input eq ctx.cmd) ctx.cmd + else if (input eq ctx.evt) loopLeft(leftPL.eventPipeline(ctx.evt(0))) + else { + val output = Vector.newBuilder[Output] + input foreach { + case r @ Right(_) ⇒ output += r + case Left(evt) ⇒ output ++= ctx.dealias(loopLeft(leftPL.eventPipeline(evt))) + } + output.result + } + } + + override val commandPipeline = { a: CmdAbove ⇒ loopLeft(leftPL.commandPipeline(a)) } + + override val eventPipeline = { b: EvtBelowBelow ⇒ loopRight(rightPL.eventPipeline(b)) } + + override val managementPort: PartialFunction[AnyRef, Iterable[Either[EvtAbove, CmdBelowBelow]]] = { + case x ⇒ + val output = Vector.newBuilder[Output] + output ++= ctx.dealias(loopLeft(leftPL.managementPort.applyOrElse(x, (_: AnyRef) ⇒ Nil))) + output ++= ctx.dealias(loopRight(rightPL.managementPort.applyOrElse(x, (_: AnyRef) ⇒ Nil))) + output.result + } + } + } + } + + /** + * Scala API: combine the two stages such that the command pipeline of the + * left stage is used and the event pipeline of the right, discarding the + * other two sub-pipelines. + * + * @param right the event pipeline + * @return a pipeline stage using the left command pipeline and the right event pipeline + */ + def |[RightContext <: Context] // + (right: PipelineStage[_ >: RightContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow]) // + : PipelineStage[RightContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow] = + new PipelineStage[RightContext, CmdAbove, CmdBelow, EvtAbove, EvtBelow] { + override def apply(ctx: RightContext): PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] = + new PipePair[CmdAbove, CmdBelow, EvtAbove, EvtBelow] { + + val leftPL = left(ctx) + val rightPL = right(ctx) + + override val commandPipeline = leftPL.commandPipeline + override val eventPipeline = rightPL.eventPipeline + override val managementPort: Mgmt = { + case x ⇒ + val output = Vector.newBuilder[Either[EvtAbove, CmdBelow]] + output ++= ctx.dealias(leftPL.managementPort(x)) + output ++= ctx.dealias(rightPL.managementPort(x)) + output.result + } + } + } +} + +//#length-field-frame +/** + * Pipeline stage for length-field encoded framing. It will prepend a + * four-byte length header to the message; the header contains the length of + * the resulting frame including header in big-endian representation. + * + * The `maxSize` argument is used to protect the communication channel sanity: + * larger frames will not be sent (silently dropped) or received (in which case + * stream decoding would be broken, hence throwing an IllegalArgumentException). + */ +class LengthFieldFrame(maxSize: Int, + byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN, + headerSize: Int = 4, + lengthIncludesHeader: Boolean = true) + extends SymmetricPipelineStage[PipelineContext, ByteString, ByteString] { + + //#range-checks-omitted + require(byteOrder ne null, "byteOrder must not be null") + require(headerSize > 0 && headerSize <= 4, "headerSize must be in (0, 4]") + require(maxSize > 0, "maxSize must be positive") + require(maxSize <= (Int.MaxValue >> (4 - headerSize) * 8) * (if (headerSize == 4) 1 else 2), + "maxSize cannot exceed 256**headerSize") + //#range-checks-omitted + + override def apply(ctx: PipelineContext) = + new SymmetricPipePair[ByteString, ByteString] { + var buffer = None: Option[ByteString] + implicit val byteOrder = LengthFieldFrame.this.byteOrder + + /** + * Extract as many complete frames as possible from the given ByteString + * and return the remainder together with the extracted frames in reverse + * order. + */ + @tailrec + def extractFrames(bs: ByteString, acc: List[ByteString]) // + : (Option[ByteString], Seq[ByteString]) = { + if (bs.isEmpty) { + (None, acc) + } else if (bs.length < headerSize) { + (Some(bs.compact), acc) + } else { + val length = bs.iterator.getLongPart(headerSize).toInt + if (length < 0 || length > maxSize) + throw new IllegalArgumentException( + s"received too large frame of size $length (max = $maxSize)") + val total = if (lengthIncludesHeader) length else length + headerSize + if (bs.length >= total) { + extractFrames(bs drop total, bs.slice(headerSize, total) :: acc) + } else { + (Some(bs.compact), acc) + } + } + } + + /* + * This is how commands (writes) are transformed: calculate length + * including header, write that to a ByteStringBuilder and append the + * payload data. The result is a single command (i.e. `Right(...)`). + */ + override def commandPipeline = + { bs: ByteString ⇒ + val length = + if (lengthIncludesHeader) bs.length + headerSize else bs.length + if (length > maxSize) Seq() + else { + val bb = ByteString.newBuilder + bb.putLongPart(length, headerSize) + bb ++= bs + ctx.singleCommand(bb.result) + } + } + + /* + * This is how events (reads) are transformed: append the received + * ByteString to the buffer (if any) and extract the frames from the + * result. In the end store the new buffer contents and return the + * list of events (i.e. `Left(...)`). + */ + override def eventPipeline = + { bs: ByteString ⇒ + val data = if (buffer.isEmpty) bs else buffer.get ++ bs + val (nb, frames) = extractFrames(data, Nil) + buffer = nb + /* + * please note the specialized (optimized) facility for emitting + * just a single event + */ + frames match { + case Nil ⇒ Nil + case one :: Nil ⇒ ctx.singleEvent(one) + case many ⇒ many.reverse map (Left(_)) + } + } + } +} +//#length-field-frame + +//#tick-generator +/** + * This trait expresses that the pipeline’s context needs to live within an + * actor and provide its ActorContext. + */ +trait HasActorContext extends PipelineContext { + /** + * Retrieve the [[ActorContext]] for this pipeline’s context. + */ + def getContext: ActorContext +} + +object TickGenerator { + /** + * This message type is used by the TickGenerator to trigger + * the rescheduling of the next Tick. The actor hosting the pipeline + * which includes a TickGenerator must arrange for messages of this + * type to be injected into the management port of the pipeline. + */ + trait Trigger + + /** + * This message type is emitted by the TickGenerator to the whole + * pipeline, informing all stages about the time at which this Tick + * was emitted (relative to some arbitrary epoch). + */ + case class Tick(@BeanProperty timestamp: FiniteDuration) extends Trigger +} + +/** + * This pipeline stage does not alter the events or commands + */ +class TickGenerator[Cmd <: AnyRef, Evt <: AnyRef](interval: FiniteDuration) + extends PipelineStage[HasActorContext, Cmd, Cmd, Evt, Evt] { + import TickGenerator._ + + override def apply(ctx: HasActorContext) = + new PipePair[Cmd, Cmd, Evt, Evt] { + + // use unique object to avoid double-activation on actor restart + private val trigger: Trigger = + new Trigger { + override def toString = s"Tick[${ctx.getContext.self.path}]" + } + + private def schedule() = + ctx.getContext.system.scheduler.scheduleOnce( + interval, ctx.getContext.self, trigger)(ctx.getContext.dispatcher) + + // automatically activate this generator + schedule() + + override val commandPipeline = (cmd: Cmd) ⇒ ctx.singleCommand(cmd) + + override val eventPipeline = (evt: Evt) ⇒ ctx.singleEvent(evt) + + override val managementPort: Mgmt = { + case `trigger` ⇒ + ctx.getContext.self ! Tick(Deadline.now.time) + schedule() + Nil + } + } +} +//#tick-generator + diff --git a/akka-actor/src/main/scala/akka/util/ByteIterator.scala b/akka-actor/src/main/scala/akka/util/ByteIterator.scala index ded49f63a6..3b7f0aab60 100644 --- a/akka-actor/src/main/scala/akka/util/ByteIterator.scala +++ b/akka-actor/src/main/scala/akka/util/ByteIterator.scala @@ -522,6 +522,22 @@ abstract class ByteIterator extends BufferedIterator[Byte] { else throw new IllegalArgumentException("Unknown byte order " + byteOrder) } + /** + * Get a Long from this iterator where only the least significant `n` + * bytes were encoded. + */ + def getLongPart(n: Int)(implicit byteOrder: ByteOrder): Long = { + if (byteOrder == ByteOrder.BIG_ENDIAN) { + var x = 0L + (1 to n) foreach (_ ⇒ x = (x << 8) | next()) + x + } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + var x = 0L + (0 until n) foreach (i ⇒ x |= next() << 8 * i) + x + } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + def getFloat(implicit byteOrder: ByteOrder): Float = java.lang.Float.intBitsToFloat(getInt(byteOrder)) diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 39639ae5f4..2f188f2a0a 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -69,6 +69,11 @@ object ByteString { */ def fromString(string: String, charset: String): ByteString = apply(string, charset) + /** + * Creates a new ByteString by copying bytes out of a ByteBuffer. + */ + def fromByteBuffer(buffer: ByteBuffer): ByteString = apply(buffer) + val empty: ByteString = CompactByteString(Array.empty[Byte]) def newBuilder: ByteStringBuilder = new ByteStringBuilder @@ -324,6 +329,11 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz */ def ++(that: ByteString): ByteString + /** + * Java API: efficiently concatenate another ByteString. + */ + def concat(that: ByteString): ByteString = this ++ that + /** * Copy as many bytes as possible to a ByteBuffer, starting from it's * current position. This method will not overflow the buffer. @@ -570,6 +580,11 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { this } + /** + * Java API: append a ByteString to this builder. + */ + def append(bs: ByteString): this.type = this ++= bs + /** * Add a single Byte to this builder. */ @@ -592,19 +607,18 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { * Add a single Int to this builder. */ def putInt(x: Int)(implicit byteOrder: ByteOrder): this.type = { - fillArray(4) { - case (target, offset) ⇒ - if (byteOrder == ByteOrder.BIG_ENDIAN) { - target(offset + 0) = (x >>> 24).toByte - target(offset + 1) = (x >>> 16).toByte - target(offset + 2) = (x >>> 8).toByte - target(offset + 3) = (x >>> 0).toByte - } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { - target(offset + 0) = (x >>> 0).toByte - target(offset + 1) = (x >>> 8).toByte - target(offset + 2) = (x >>> 16).toByte - target(offset + 3) = (x >>> 24).toByte - } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + fillArray(4) { (target, offset) ⇒ + if (byteOrder == ByteOrder.BIG_ENDIAN) { + target(offset + 0) = (x >>> 24).toByte + target(offset + 1) = (x >>> 16).toByte + target(offset + 2) = (x >>> 8).toByte + target(offset + 3) = (x >>> 0).toByte + } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + target(offset + 0) = (x >>> 0).toByte + target(offset + 1) = (x >>> 8).toByte + target(offset + 2) = (x >>> 16).toByte + target(offset + 3) = (x >>> 24).toByte + } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) } this } @@ -613,31 +627,45 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { * Add a single Long to this builder. */ def putLong(x: Long)(implicit byteOrder: ByteOrder): this.type = { - fillArray(8) { - case (target, offset) ⇒ - if (byteOrder == ByteOrder.BIG_ENDIAN) { - target(offset + 0) = (x >>> 56).toByte - target(offset + 1) = (x >>> 48).toByte - target(offset + 2) = (x >>> 40).toByte - target(offset + 3) = (x >>> 32).toByte - target(offset + 4) = (x >>> 24).toByte - target(offset + 5) = (x >>> 16).toByte - target(offset + 6) = (x >>> 8).toByte - target(offset + 7) = (x >>> 0).toByte - } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { - target(offset + 0) = (x >>> 0).toByte - target(offset + 1) = (x >>> 8).toByte - target(offset + 2) = (x >>> 16).toByte - target(offset + 3) = (x >>> 24).toByte - target(offset + 4) = (x >>> 32).toByte - target(offset + 5) = (x >>> 40).toByte - target(offset + 6) = (x >>> 48).toByte - target(offset + 7) = (x >>> 56).toByte - } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + fillArray(8) { (target, offset) ⇒ + if (byteOrder == ByteOrder.BIG_ENDIAN) { + target(offset + 0) = (x >>> 56).toByte + target(offset + 1) = (x >>> 48).toByte + target(offset + 2) = (x >>> 40).toByte + target(offset + 3) = (x >>> 32).toByte + target(offset + 4) = (x >>> 24).toByte + target(offset + 5) = (x >>> 16).toByte + target(offset + 6) = (x >>> 8).toByte + target(offset + 7) = (x >>> 0).toByte + } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + target(offset + 0) = (x >>> 0).toByte + target(offset + 1) = (x >>> 8).toByte + target(offset + 2) = (x >>> 16).toByte + target(offset + 3) = (x >>> 24).toByte + target(offset + 4) = (x >>> 32).toByte + target(offset + 5) = (x >>> 40).toByte + target(offset + 6) = (x >>> 48).toByte + target(offset + 7) = (x >>> 56).toByte + } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) } this } + /** + * Add the `n` least significant bytes of the given Long to this builder. + */ + def putLongPart(x: Long, n: Int)(implicit byteOrder: ByteOrder): this.type = { + fillArray(n) { (target, offset) ⇒ + if (byteOrder == ByteOrder.BIG_ENDIAN) { + val start = n * 8 - 8 + (0 until n) foreach (i ⇒ target(offset + i) = (x >>> start - 8 * i).toByte) + } else if (byteOrder == ByteOrder.LITTLE_ENDIAN) { + val end = offset + n - 1 + (0 until n) foreach (i ⇒ target(end - i) = (x >>> 8 * i).toByte) + } else throw new IllegalArgumentException("Unknown byte order " + byteOrder) + } + } + /** * Add a single Float to this builder. */ diff --git a/akka-docs/rst/java/code/docs/io/japi/HasActorContext.java b/akka-docs/rst/java/code/docs/io/japi/HasActorContext.java new file mode 100644 index 0000000000..8987d4b9bb --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/HasActorContext.java @@ -0,0 +1,16 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +import akka.actor.ActorContext; +import akka.io.PipelineContext; + +//#actor-context +public interface HasActorContext extends PipelineContext { + + public ActorContext getContext(); + +} +//#actor-context diff --git a/akka-docs/rst/java/code/docs/io/japi/HasByteOrder.java b/akka-docs/rst/java/code/docs/io/japi/HasByteOrder.java new file mode 100644 index 0000000000..0462a1c587 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/HasByteOrder.java @@ -0,0 +1,15 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.nio.ByteOrder; + +import akka.io.PipelineContext; + +public interface HasByteOrder extends PipelineContext { + + public ByteOrder byteOrder(); + +} diff --git a/akka-docs/rst/java/code/docs/io/japi/LengthFieldFrame.java b/akka-docs/rst/java/code/docs/io/japi/LengthFieldFrame.java new file mode 100644 index 0000000000..a94ee27313 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/LengthFieldFrame.java @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ + +package docs.io.japi; + +//#frame +import java.nio.ByteOrder; +import java.util.ArrayList; + +import scala.util.Either; +import akka.io.AbstractSymmetricPipePair; +import akka.io.PipePairFactory; +import akka.io.PipelineContext; +import akka.io.SymmetricPipePair; +import akka.io.SymmetricPipelineStage; +import akka.util.ByteString; +import akka.util.ByteStringBuilder; + +public class LengthFieldFrame extends + SymmetricPipelineStage { + + final int maxSize; + + public LengthFieldFrame(int maxSize) { + this.maxSize = maxSize; + } + + @Override + public SymmetricPipePair apply(final PipelineContext ctx) { + return PipePairFactory + .create(ctx, new AbstractSymmetricPipePair() { + + final ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + ByteString buffer = null; + + @Override + public Iterable> onCommand( + ByteString cmd) { + final int length = cmd.length() + 4; + if (length > maxSize) { + return new ArrayList>(0); + } + final ByteStringBuilder bb = new ByteStringBuilder(); + bb.putInt(length, byteOrder); + bb.append(cmd); + return singleCommand(bb.result()); + } + + @Override + public Iterable> onEvent( + ByteString event) { + final ArrayList> res = + new ArrayList>(); + ByteString current = buffer == null ? event : buffer.concat(event); + while (true) { + if (current.length() == 0) { + buffer = null; + return res; + } else if (current.length() < 4) { + buffer = current; + return res; + } else { + final int length = current.iterator().getInt(byteOrder); + if (length > maxSize) + throw new IllegalArgumentException( + "received too large frame of size " + length + " (max = " + + maxSize + ")"); + if (current.length() < length) { + buffer = current; + return res; + } else { + res.add(makeEvent(current.slice(4, length))); + current = current.drop(length); + } + } + } + } + + }); + } + +} +//#frame diff --git a/akka-docs/rst/java/code/docs/io/japi/Message.java b/akka-docs/rst/java/code/docs/io/japi/Message.java new file mode 100644 index 0000000000..c028822769 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/Message.java @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +//#message +public class Message { + + static public class Person { + private final String first; + private final String last; + + public Person(String first, String last) { + this.first = first; + this.last = last; + } + + public String getFirst() { + return first; + } + + public String getLast() { + return last; + } + + } + + private final Person[] persons; + private final double[] happinessCurve; + + public Message(Person[] persons, double[] happinessCurve) { + this.persons = persons; + this.happinessCurve = happinessCurve; + } + + public Person[] getPersons() { + return persons; + } + + public double[] getHappinessCurve() { + return happinessCurve; + } +} +//#message diff --git a/akka-docs/rst/java/code/docs/io/japi/MessageStage.java b/akka-docs/rst/java/code/docs/io/japi/MessageStage.java new file mode 100644 index 0000000000..777f93e256 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/MessageStage.java @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.nio.ByteOrder; +import java.util.Collections; + +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; +import scala.util.Either; +import akka.actor.ActorRef; +import akka.io.AbstractSymmetricPipePair; +import akka.io.PipePairFactory; +import akka.io.SymmetricPipePair; +import akka.io.SymmetricPipelineStage; +import akka.util.ByteIterator; +import akka.util.ByteString; +import akka.util.ByteStringBuilder; + +//#format +public class MessageStage extends + SymmetricPipelineStage { + + @Override + public SymmetricPipePair apply(final HasByteOrder context) { + + return PipePairFactory + .create(context, new AbstractSymmetricPipePair() { + + final ByteOrder byteOrder = context.byteOrder(); + + private void putString(ByteStringBuilder builder, String str) { + final byte[] bytes = ByteString.fromString(str, "UTF-8").toArray(); + builder.putInt(bytes.length, byteOrder); + builder.putBytes(bytes); + } + + @Override + public Iterable> onCommand(Message cmd) { + final ByteStringBuilder builder = new ByteStringBuilder(); + + builder.putInt(cmd.getPersons().length, byteOrder); + for (Message.Person p : cmd.getPersons()) { + putString(builder, p.getFirst()); + putString(builder, p.getLast()); + } + + builder.putInt(cmd.getHappinessCurve().length, byteOrder); + builder.putDoubles(cmd.getHappinessCurve(), byteOrder); + + return singleCommand(builder.result()); + } + + //#decoding-omitted + //#decoding + private String getString(ByteIterator iter) { + final int length = iter.getInt(byteOrder); + final byte[] bytes = new byte[length]; + iter.getBytes(bytes); + return ByteString.fromArray(bytes).utf8String(); + } + + @Override + public Iterable> onEvent(ByteString evt) { + final ByteIterator iter = evt.iterator(); + + final int personLength = iter.getInt(byteOrder); + final Message.Person[] persons = new Message.Person[personLength]; + for (int i = 0; i < personLength; ++i) { + persons[i] = new Message.Person(getString(iter), getString(iter)); + } + + final int curveLength = iter.getInt(byteOrder); + final double[] curve = new double[curveLength]; + iter.getDoubles(curve, byteOrder); + + // verify that this was all; could be left out to allow future + // extensions + assert iter.isEmpty(); + + return singleEvent(new Message(persons, curve)); + } + //#decoding + + ActorRef target = null; + + //#mgmt-ticks + private FiniteDuration lastTick = Duration.Zero(); + + @Override + public Iterable> onManagementCommand(Object cmd) { + //#omitted + if (cmd instanceof PipelineTest.SetTarget) { + target = ((PipelineTest.SetTarget) cmd).getRef(); + } else if (cmd instanceof TickGenerator.Tick && target != null) { + target.tell(cmd, null); + } + //#omitted + if (cmd instanceof TickGenerator.Tick) { + final FiniteDuration timestamp = ((TickGenerator.Tick) cmd) + .getTimestamp(); + System.out.println("time since last tick: " + + timestamp.minus(lastTick)); + lastTick = timestamp; + } + return Collections.emptyList(); + } + //#mgmt-ticks + //#decoding-omitted + + }); + + } + +} +//#format diff --git a/akka-docs/rst/java/code/docs/io/japi/PipelineTest.java b/akka-docs/rst/java/code/docs/io/japi/PipelineTest.java new file mode 100644 index 0000000000..cd1c0744f7 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/PipelineTest.java @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.concurrent.duration.Duration; + +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.actor.UntypedActorFactory; +import akka.io.AbstractPipelineContext; +import akka.io.PipelineFactory; +import akka.io.PipelineInjector; +import akka.io.PipelineSink; +import akka.io.PipelineStage; +import akka.testkit.JavaTestKit; +import akka.testkit.TestProbe; +import akka.util.ByteString; + +public class PipelineTest { + + //#message + final Message msg = new Message( + new Message.Person[] { + new Message.Person("Alice", "Gibbons"), + new Message.Person("Bob", "Sparseley") + }, + new double[] { 1.0, 3.0, 5.0 }); + //#message + + //#byteorder + class Context extends AbstractPipelineContext implements HasByteOrder { + + @Override + public ByteOrder byteOrder() { + return java.nio.ByteOrder.BIG_ENDIAN; + } + + } + final Context ctx = new Context(); + //#byteorder + + static ActorSystem system = null; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("PipelineTest"); + } + + @AfterClass + public static void teardown() { + system.shutdown(); + } + + @Test + public void demonstratePipeline() throws Exception { + final TestProbe probe = TestProbe.apply(system); + final ActorRef commandHandler = probe.ref(); + final ActorRef eventHandler = probe.ref(); + //#build-sink + final PipelineStage stages = + PipelineStage.sequence( + new MessageStage(), + new LengthFieldFrame(10000) + ); + + final PipelineSink sink = + new PipelineSink() { + + @Override + public void onCommand(ByteString cmd) throws Throwable { + commandHandler.tell(cmd, null); + } + + @Override + public void onEvent(Message evt) throws Throwable { + eventHandler.tell(evt, null); + } + }; + + final PipelineInjector injector = + PipelineFactory.buildWithSink(ctx, stages, sink); + + injector.injectCommand(msg); + //#build-sink + final ByteString encoded = probe.expectMsgClass(ByteString.class); + injector.injectEvent(encoded); + final Message decoded = probe.expectMsgClass(Message.class); + assert msg == decoded; + } + + static class SetTarget { + final ActorRef ref; + + public SetTarget(ActorRef ref) { + super(); + this.ref = ref; + } + + public ActorRef getRef() { + return ref; + } + } + + @Test + public void testTick() { + new JavaTestKit(system) { + { + final ActorRef proc = system.actorOf(new Props( + new UntypedActorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public Actor create() throws Exception { + return new Processor(getRef(), getRef()) { + + @Override + public void onReceive(Object obj) throws Exception { + if (obj.equals("fail!")) { + throw new RuntimeException("FAIL!"); + } + super.onReceive(obj); + } + + }; + } + }), "processor"); + expectMsgClass(TickGenerator.Tick.class); + proc.tell(msg, null); + final ByteString encoded = expectMsgClass(ByteString.class); + proc.tell(encoded, null); + final Message decoded = expectMsgClass(Message.class); + assert msg == decoded; + + new Within(Duration.create(1500, TimeUnit.MILLISECONDS), + Duration.create(3, TimeUnit.SECONDS)) { + protected void run() { + expectMsgClass(TickGenerator.Tick.class); + expectMsgClass(TickGenerator.Tick.class); + } + }; + proc.tell("fail!", null); + new Within(Duration.create(1700, TimeUnit.MILLISECONDS), + Duration.create(3, TimeUnit.SECONDS)) { + protected void run() { + expectMsgClass(TickGenerator.Tick.class); + expectMsgClass(TickGenerator.Tick.class); + proc.tell(PoisonPill.getInstance(), null); + expectNoMsg(); + } + }; + } + }; + } + +} diff --git a/akka-docs/rst/java/code/docs/io/japi/Processor.java b/akka-docs/rst/java/code/docs/io/japi/Processor.java new file mode 100644 index 0000000000..ee0b704121 --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/Processor.java @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; + +import akka.actor.ActorContext; +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.io.AbstractPipelineContext; +import akka.io.PipelineFactory; +import akka.io.PipelineInjector; +import akka.io.PipelineSink; +import akka.io.PipelineStage; +import akka.util.ByteString; +import scala.concurrent.duration.*; + +//#actor +public class Processor extends UntypedActor { + + private class Context extends AbstractPipelineContext + implements HasByteOrder, HasActorContext { + + @Override + public ActorContext getContext() { + return Processor.this.getContext(); + } + + @Override + public ByteOrder byteOrder() { + return java.nio.ByteOrder.BIG_ENDIAN; + } + + } + + final Context ctx = new Context(); + + final FiniteDuration interval = Duration.apply(1, TimeUnit.SECONDS); + + final PipelineStage stages = + PipelineStage.sequence( + // Java 7 can infer these types, Java 6 cannot + PipelineStage. sequence( // + new TickGenerator(interval), // + new MessageStage()), // + new LengthFieldFrame(10000)); + + private final ActorRef evts; + private final ActorRef cmds; + + final PipelineInjector injector = PipelineFactory + .buildWithSink(ctx, stages, new PipelineSink() { + + @Override + public void onCommand(ByteString cmd) { + cmds.tell(cmd, getSelf()); + } + + @Override + public void onEvent(Message evt) { + evts.tell(evt, getSelf()); + } + }); + + public Processor(ActorRef cmds, ActorRef evts) throws Exception { + this.cmds = cmds; + this.evts = evts; + } + + //#omitted + @Override + public void preStart() throws Exception { + injector.managementCommand(new PipelineTest.SetTarget(cmds)); + } + //#omitted + + @Override + public void onReceive(Object obj) throws Exception { + if (obj instanceof Message) { + injector.injectCommand((Message) obj); + } else if (obj instanceof ByteString) { + injector.injectEvent((ByteString) obj); + } else if (obj instanceof TickGenerator.Trigger) { + injector.managementCommand(obj); + } + } + +} +//#actor + diff --git a/akka-docs/rst/java/code/docs/io/japi/TickGenerator.java b/akka-docs/rst/java/code/docs/io/japi/TickGenerator.java new file mode 100644 index 0000000000..95c39f8fbc --- /dev/null +++ b/akka-docs/rst/java/code/docs/io/japi/TickGenerator.java @@ -0,0 +1,86 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io.japi; + +import java.util.Collections; + +import scala.concurrent.duration.Deadline; +import scala.concurrent.duration.FiniteDuration; +import scala.util.Either; +import akka.actor.ActorSystem; +import akka.io.AbstractPipePair; +import akka.io.PipePair; +import akka.io.PipePairFactory; +import akka.io.PipelineStage; + +//#tick-generator +public class TickGenerator extends + PipelineStage { + + public static interface Trigger {}; + + public static class Tick implements Trigger { + final FiniteDuration timestamp; + + public Tick(FiniteDuration timestamp) { + super(); + this.timestamp = timestamp; + } + + public FiniteDuration getTimestamp() { + return timestamp; + } + } + + private final FiniteDuration interval; + + public TickGenerator(FiniteDuration interval) { + this.interval = interval; + } + + @Override + public PipePair apply(final HasActorContext ctx) { + return PipePairFactory.create(ctx, + new AbstractPipePair() { + + private final Trigger trigger = new Trigger() { + public String toString() { + return "Tick[" + ctx.getContext().self().path() + "]"; + } + }; + + private void schedule() { + final ActorSystem system = ctx.getContext().system(); + system.scheduler().scheduleOnce(interval, + ctx.getContext().self(), trigger, system.dispatcher(), null); + } + + { + schedule(); + } + + @Override + public Iterable> onCommand(Cmd cmd) { + return singleCommand(cmd); + } + + @Override + public Iterable> onEvent(Evt evt) { + return singleEvent(evt); + } + + @Override + public Iterable> onManagementCommand(Object cmd) { + if (cmd == trigger) { + ctx.getContext().self().tell(new Tick(Deadline.now().time()), null); + schedule(); + } + return Collections.emptyList(); + } + + }); + } +} +//#tick-generator diff --git a/akka-docs/rst/java/io.rst b/akka-docs/rst/java/io.rst index 2da259b0b1..54d47804dc 100644 --- a/akka-docs/rst/java/io.rst +++ b/akka-docs/rst/java/io.rst @@ -99,6 +99,250 @@ Compatibility with java.io A ``ByteStringBuilder`` can be wrapped in a ``java.io.OutputStream`` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can we wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams. +Encoding and decoding binary data +--------------------------------- + +Akka adopted and adapted the implementation of data processing pipelines found +in the ``spray-io`` module. The idea is that encoding and decoding often +go hand in hand and keeping the code pertaining to one protocol layer together +is deemed more important than writing down the complete read side—say—in the +iteratee style in one go; pipelines encourage packaging the stages in a form +which lends itself better to reuse in a protocol stack. Another reason for +choosing this abstraction is that it is at times necessary to change the +behavior of encoding and decoding within a stage based on a message stream’s +state, and pipeline stages allow communication between the read and write +halves quite naturally. + +The actual byte-fiddling can be done within pipeline stages, for example using +the rich API of :class:`ByteIterator` and :class:`ByteStringBuilder` as shown +below. All these activities are synchronous transformations which benefit +greatly from CPU affinity to make good use of those data caches. Therefore the +design of the pipeline infrastructure is completely synchronous, every stage’s +handler code can only directly return the events and/or commands resulting from +an input, there are no callbacks. Exceptions thrown within a pipeline stage +will abort processing of the whole pipeline under the assumption that +recoverable error conditions will be signaled in-band to the next stage instead +of raising an exception. + +An overall “logical” pipeline can span multiple execution contexts, for example +starting with the low-level protocol layers directly within an actor handling +the reads and writes to a TCP connection and then being passed to a number of +higher-level actors which do the costly application level processing. This is +supported by feeding the generated events into a sink which sends them to +another actor, and that other actor will then upon reception feed them into its +own pipeline. + +Introducing the Sample Protocol +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In the following the process of implementing a protocol stack using pipelines +is demonstrated on the following simple example: + +.. code-block:: text + + frameLen: Int + persons: Int + persons times { + first: String + last: String + } + points: Int + points times Double + +mapping to the following data type: + +.. includecode:: code/docs/io/japi/Message.java#message + +We will split the handling of this protocol into two parts: the frame-length +encoding handles the buffering necessary on the read side and the actual +encoding of the frame contents is done in a separate stage. + +Building a Pipeline Stage +^^^^^^^^^^^^^^^^^^^^^^^^^ + +As a common example, which is also included in the ``akka-actor`` package, let +us look at a framing protocol which works by prepending a length field to each +message (the following is a simplified version for demonstration purposes, the +real implementation is more configurable and implemented in Scala). + +.. includecode:: code/docs/io/japi/LengthFieldFrame.java + :include: frame + +In the end a pipeline stage is nothing more than a set of three methods: one +transforming commands arriving from above, one transforming events arriving +from below and the third transforming incoming management commands (not shown +here, see below for more information). The result of the transformation can in +either case be a sequence of commands flowing downwards or events flowing +upwards (or a combination thereof). + +In the case above the data type for commands and events are equal as both +functions operate only on ``ByteString``, and the transformation does not +change that type because it only adds or removes four octets at the front. + +The pair of command and event transformation functions is represented by an +object of type :class:`AbstractPipePair`, or in this case a +:class:`AbstractSymmetricPipePair`. This object could benefit from knowledge +about the context it is running in, for example an :class:`Actor`, and this +context is introduced by making a :class:`PipelineStage` be a factory for +producing a :class:`PipePair`. The factory method is called :meth:`apply` (a +Scala tradition) and receives the context object as its argument. The +implementation of this factory method could now make use of the context in +whatever way it sees fit, you will see an example further down. + +Manipulating ByteStrings +^^^^^^^^^^^^^^^^^^^^^^^^ + +The second stage of our sample protocol stack illustrates in more depth what +showed only a little in the pipeline stage built above: constructing and +deconstructing byte strings. Let us first take a look at the encoder: + +.. includecode:: code/docs/io/japi/MessageStage.java + :include: format + :exclude: decoding-omitted,omitted + +Note how the byte order to be used by this stage is fixed in exactly one place, +making it impossible get wrong between commands and events; the way how the +byte order is passed into the stage demonstrates one possible use for the +stage’s ``context`` parameter. + +The basic tool for constucting a :class:`ByteString` is a +:class:`ByteStringBuilder`. This builder is specialized for concatenating byte +representations of the primitive data types like ``Int`` and ``Double`` or +arrays thereof. Encoding a ``String`` requires a bit more work because not +only the sequence of bytes needs to be encoded but also the length, otherwise +the decoding stage would not know where the ``String`` terminates. When all +values making up the :class:`Message` have been appended to the builder, we +simply pass the resulting :class:`ByteString` on to the next stage as a command +using the optimized :meth:`singleCommand` facility. + +.. warning:: + + The :meth:`singleCommand` and :meth:`singleEvent` methods provide a way to + generate responses which transfer exactly one result from one pipeline stage + to the next without suffering the overhead of object allocations. This means + that the returned collection object will not work for anything else (you will + get :class:`ClassCastExceptions`!) and this facility can only be used *EXACTLY + ONCE* during the processing of one input (command or event). + +Now let us look at the decoder side: + +.. includecode:: code/docs/io/japi/MessageStage.java + :include: decoding + +The decoding side does the same things that the encoder does in the same order, +it just uses a :class:`ByteIterator` to retrieve primitive data types or arrays +of those from the underlying :class:`ByteString`. And in the end it hands the +assembled :class:`Message` as an event to the next stage using the optimized +:meth:`singleEvent` facility (see warning above). + +Building a Pipeline +^^^^^^^^^^^^^^^^^^^ + +Given the two pipeline stages introduced in the sections above we can now put +them to some use. First we define some message to be encoded: + +.. includecode:: code/docs/io/japi/PipelineTest.java + :include: message + +Then we need to create a pipeline context which satisfies our declared needs: + +.. includecode:: code/docs/io/japi/PipelineTest.java + :include: byteorder + +Building the pipeline and encoding this message then is quite simple: + +.. includecode:: code/docs/io/japi/PipelineTest.java + :include: build-sink + +First we *sequence* the two stages, i.e. attach them such that the output of +one becomes the input of the other. Then we create a :class:`PipelineSink` +which is essentially a callback interface for what shall happen with the +encoded commands or decoded events, respectively. Then we build the pipeline +using the :class:`PipelineFactory`, which returns an interface for feeding +commands and events into this pipeline instance. As a demonstration of how to +use this, we simply encode the message shown above and the resulting +:class:`ByteString` will then be sent to the ``commandHandler`` actor. Decoding +works in the same way, only using :meth:`injectEvent`. + +Injecting into a pipeline using a :class:`PipelineInjector` will catch +exceptions resulting from processing the input, in which case the exception +(there can only be one per injection) is passed into the respective sink. The +default implementation of :meth:`onCommandFailure` and :meth:`onEventFailure` +will re-throw the exception (whence originates the ``throws`` declaration of +the ``inject*`` method). + +Using the Pipeline’s Context +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Up to this point there was always a parameter ``ctx`` which was used when +constructing a pipeline, but it was not explained in full. The context is a +piece of information which is made available to all stages of a pipeline. The +context may also carry behavior, provide infrastructure or helper methods etc. +It should be noted that the context is bound to the pipeline and as such must +not be accessed concurrently from different threads unless care is taken to +properly synchronize such access. Since the context will in many cases be +provided by an actor it is not recommended to share this context with code +executing outside of the actor’s message handling. + +.. warning:: + + A PipelineContext instance *MUST NOT* be used by two different pipelines + since it contains mutable fields which are used during message processing. + +Using Management Commands +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Since pipeline stages do not have any reference to the pipeline or even to +their neighbors they cannot directly effect the injection of commands or events +outside of their normal processing. But sometimes things need to happen driven +by a timer, for example. In this case the timer would need to cause sending +tick messages to the whole pipeline, and those stages which wanted to receive +them would act upon those. In order to keep the type signatures for events and +commands useful, such external triggers are sent out-of-band, via a different +channel—the management port. One example which makes use of this facility is +the :class:`TickGenerator` which comes included with ``akka-actor`` (this is a +transcription of the Scala version which is actually included in the +``akka-actor`` JAR): + +.. includecode:: code/docs/io/japi/HasActorContext.java#actor-context + +.. includecode:: code/docs/io/japi/TickGenerator.java#tick-generator + +This pipeline stage is to be used within an actor, and it will make use of this +context in order to schedule the delivery of ``Tick`` messages; the actor is +then supposed to feed these messages into the management port of the pipeline. +An example could look like this: + +.. includecode:: code/docs/io/japi/Processor.java + :include: actor + :exclude: omitted + +This actor extends our well-known pipeline with the tick generator and attaches +the outputs to functions which send commands and events to actors for further +processing. The pipeline stages will then all receive on ``Tick`` per second +which can be used like so: + +.. includecode:: code/docs/io/japi/MessageStage.java + :include: mgmt-ticks + :exclude: omitted + +.. note:: + + Management commands are delivered to all stages of a pipeline “effectively + parallel”, like on a broadcast medium. No code will actually run concurrently + since a pipeline is strictly single-threaded, but the order in which these + commands are processed is not specified. + +The intended purpose of management commands is for each stage to define its +special command types and then listen only to those (where the aforementioned +``Tick`` message is a useful counter-example), exactly like sending packets on +a wifi network where every station receives all traffic but reacts only to +those messages which are destined for it. + +If you need all stages to react upon something in their defined order, then +this must be modeled either as a command or event, i.e. it will be part of the +“business” type of the pipeline. + Using TCP --------- diff --git a/akka-docs/rst/scala/code/docs/io/Pipelines.scala b/akka-docs/rst/scala/code/docs/io/Pipelines.scala new file mode 100644 index 0000000000..d853c4b644 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/io/Pipelines.scala @@ -0,0 +1,225 @@ +/** + * Copyright (C) 2013 Typesafe Inc. + */ + +package docs.io + +import docs.io.japi.LengthFieldFrame; +import akka.testkit.{ AkkaSpec, EventFilter } +import akka.io._ +import akka.util._ +import akka.actor.{ Actor, ActorRef, Props, PoisonPill } +import scala.util.Success +import scala.util.Try +import scala.concurrent.duration._ + +class PipelinesDocSpec extends AkkaSpec { + + //#data + case class Person(first: String, last: String) + case class HappinessCurve(points: IndexedSeq[Double]) + case class Message(persons: Seq[Person], stats: HappinessCurve) + //#data + + //#format + /** + * This trait is used to formualate a requirement for the pipeline context. + * In this example it is used to configure the byte order to be used. + */ + trait HasByteOrder extends PipelineContext { + def byteOrder: java.nio.ByteOrder + } + + class MessageStage extends SymmetricPipelineStage[HasByteOrder, Message, ByteString] { + + override def apply(ctx: HasByteOrder) = new SymmetricPipePair[Message, ByteString] { + + implicit val byteOrder = ctx.byteOrder + + /** + * Append a length-prefixed UTF-8 encoded string to the ByteStringBuilder. + */ + def putString(builder: ByteStringBuilder, str: String): Unit = { + val bs = ByteString(str, "UTF-8") + builder putInt bs.length + builder ++= bs + } + + override val commandPipeline = { msg: Message ⇒ + val bs = ByteString.newBuilder + + // first store the persons + bs putInt msg.persons.size + msg.persons foreach { p ⇒ + putString(bs, p.first) + putString(bs, p.last) + } + + // then store the doubles + bs putInt msg.stats.points.length + bs putDoubles (msg.stats.points.toArray) + + // and return the result as a command + ctx.singleCommand(bs.result) + } + + //#decoding-omitted + //#decoding + def getString(iter: ByteIterator): String = { + val length = iter.getInt + val bytes = new Array[Byte](length) + iter getBytes bytes + ByteString(bytes).utf8String + } + + override val eventPipeline = { bs: ByteString ⇒ + val iter = bs.iterator + + val personLength = iter.getInt + val persons = + (1 to personLength) map (_ ⇒ Person(getString(iter), getString(iter))) + + val curveLength = iter.getInt + val curve = new Array[Double](curveLength) + iter getDoubles curve + + // verify that this was all; could be left out to allow future extensions + assert(iter.isEmpty) + + ctx.singleEvent(Message(persons, HappinessCurve(curve))) + } + //#decoding + + //#mgmt-ticks + var lastTick = Duration.Zero + + override val managementPort: Mgmt = { + case TickGenerator.Tick(timestamp) ⇒ + //#omitted + testActor ! TickGenerator.Tick(timestamp) + import java.lang.String.{ valueOf ⇒ println } + //#omitted + println(s"time since last tick: ${timestamp - lastTick}") + lastTick = timestamp + Nil + } + //#mgmt-ticks + //#decoding-omitted + } + } + //#format + + "A MessageStage" must { + + //#message + val msg = + Message( + Seq( + Person("Alice", "Gibbons"), + Person("Bob", "Sparsely")), + HappinessCurve(Array(1.0, 3.0, 5.0))) + //#message + + //#byteorder + val ctx = new HasByteOrder { + def byteOrder = java.nio.ByteOrder.BIG_ENDIAN + } + //#byteorder + + "correctly encode and decode" in { + //#build-pipeline + val stages = + new MessageStage >> + new LengthFieldFrame(10000) + + // using the extractor for the returned case class here + val PipelinePorts(cmd, evt, mgmt) = + PipelineFactory.buildFunctionTriple(ctx, stages) + + val encoded: (Iterable[Message], Iterable[ByteString]) = cmd(msg) + //#build-pipeline + encoded._1 must have size 0 + encoded._2 must have size 1 + + evt(encoded._2.head)._1 must be === Seq(msg) + } + + "demonstrate Injector/Sink" in { + val commandHandler = testActor + val eventHandler = testActor + + //#build-sink + val stages = + new MessageStage >> + new LengthFieldFrame(10000) + + val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)( + commandHandler ! _, // will receive messages of type Try[ByteString] + eventHandler ! _ // will receive messages of type Try[Message] + ) + + injector.injectCommand(msg) + //#build-sink + val encoded = expectMsgType[Success[ByteString]].get + + injector.injectEvent(encoded) + expectMsgType[Try[Message]].get must be === msg + } + + "demonstrate management port and context" in { + //#actor + class Processor(cmds: ActorRef, evts: ActorRef) extends Actor { + + val ctx = new HasActorContext with HasByteOrder { + def getContext = Processor.this.context + def byteOrder = java.nio.ByteOrder.BIG_ENDIAN + } + + val pipeline = PipelineFactory.buildWithSinkFunctions(ctx, + new TickGenerator(1000.millis) >> + new MessageStage >> + new LengthFieldFrame(10000) // + )( + // failure in the pipeline will fail this actor + cmd ⇒ cmds ! cmd.get, + evt ⇒ evts ! evt.get) + + def receive = { + case m: Message ⇒ pipeline.injectCommand(m) + case b: ByteString ⇒ pipeline.injectEvent(b) + case t: TickGenerator.Trigger ⇒ pipeline.managementCommand(t) + } + } + //#actor + + import TickGenerator.Tick + val proc = system.actorOf(Props(new Processor(testActor, testActor) { + override def receive = ({ + case "fail!" ⇒ throw new RuntimeException("FAIL!") + }: Receive) orElse super.receive + }), "processor") + expectMsgType[Tick] + proc ! msg + val encoded = expectMsgType[ByteString] + proc ! encoded + val decoded = expectMsgType[Message] + decoded must be === msg + + within(1.5.seconds, 3.seconds) { + expectMsgType[Tick] + expectMsgType[Tick] + } + EventFilter[RuntimeException]("FAIL!", occurrences = 1) intercept { + proc ! "fail!" + } + within(1.5.seconds, 3.seconds) { + expectMsgType[Tick] + expectMsgType[Tick] + proc ! PoisonPill + expectNoMsg + } + } + + } + +} \ No newline at end of file diff --git a/akka-docs/rst/scala/io.rst b/akka-docs/rst/scala/io.rst index 7235abbb9e..e05f61197e 100644 --- a/akka-docs/rst/scala/io.rst +++ b/akka-docs/rst/scala/io.rst @@ -105,43 +105,263 @@ Compatibility with java.io A ``ByteStringBuilder`` can be wrapped in a ``java.io.OutputStream`` via the ``asOutputStream`` method. Likewise, ``ByteIterator`` can be wrapped in a ``java.io.InputStream`` via ``asInputStream``. Using these, ``akka.io`` applications can integrate legacy code based on ``java.io`` streams. Encoding and decoding binary data -.................................... +--------------------------------- -``ByteStringBuilder`` and ``ByteIterator`` support encoding and decoding of binary data. As an example, consider a stream of binary data frames with the following format: +.. note:: + + Previously Akka offered a specialized Iteratee implementation in the + ``akka.actor.IO`` object which is now deprecated in favor of the pipeline + mechanism described here. The documentation for Iteratees can be found `here + `_. + +Akka adopted and adapted the implementation of data processing pipelines found +in the ``spray-io`` module. The idea is that encoding and decoding often +go hand in hand and keeping the code pertaining to one protocol layer together +is deemed more important than writing down the complete read side—say—in the +iteratee style in one go; pipelines encourage packaging the stages in a form +which lends itself better to reuse in a protocol stack. Another reason for +choosing this abstraction is that it is at times necessary to change the +behavior of encoding and decoding within a stage based on a message stream’s +state, and pipeline stages allow communication between the read and write +halves quite naturally. + +The actual byte-fiddling can be done within pipeline stages, for example using +the rich API of :class:`ByteIterator` and :class:`ByteStringBuilder` as shown +below. All these activities are synchronous transformations which benefit +greatly from CPU affinity to make good use of those data caches. Therefore the +design of the pipeline infrastructure is completely synchronous, every stage’s +handler code can only directly return the events and/or commands resulting from +an input, there are no callbacks. Exceptions thrown within a pipeline stage +will abort processing of the whole pipeline under the assumption that +recoverable error conditions will be signaled in-band to the next stage instead +of raising an exception. + +An overall “logical” pipeline can span multiple execution contexts, for example +starting with the low-level protocol layers directly within an actor handling +the reads and writes to a TCP connection and then being passed to a number of +higher-level actors which do the costly application level processing. This is +supported by feeding the generated events into a sink which sends them to +another actor, and that other actor will then upon reception feed them into its +own pipeline. + +Introducing the Sample Protocol +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In the following the process of implementing a protocol stack using pipelines +is demonstrated on the following simple example: .. code-block:: text frameLen: Int - n: Int - m: Int - n times { - a: Short - b: Long + persons: Int + persons times { + first: String + last: String } - data: m times Double + points: Int + points times Double -In this example, the data will be stored in arrays of ``a``, ``b`` of length ``n`` and ``data`` of length ``m``. +mapping to the following data type: -Decoding of such frames can be efficiently implemented in the following fashion: +.. includecode:: code/docs/io/Pipelines.scala#data -.. includecode:: code/docs/io/BinaryCoding.scala +We will split the handling of this protocol into two parts: the frame-length +encoding handles the buffering necessary on the read side and the actual +encoding of the frame contents is done in a separate stage. + +Building a Pipeline Stage +^^^^^^^^^^^^^^^^^^^^^^^^^ + +As a common example, which is also included in the ``akka-actor`` package, let +us look at a framing protocol which works by prepending a length field to each +message. + +.. includecode:: ../../../akka-actor/src/main/scala/akka/io/Pipelines.scala + :include: length-field-frame + :exclude: range-checks-omitted + +In the end a pipeline stage is nothing more than a set of three functions: one +transforming commands arriving from above, one transforming events arriving +from below and the third transforming incoming management commands (not shown +here, see below for more information). The result of the transformation can in +either case be a sequence of commands flowing downwards or events flowing +upwards (or a combination thereof). + +In the case above the data type for commands and events are equal as both +functions operate only on ``ByteString``, and the transformation does not +change that type because it only adds or removes four octets at the front. + +The pair of command and event transformation functions is represented by an +object of type :class:`PipePair`, or in this case a :class:`SymmetricPipePair`. +This object could benefit from knowledge about the context it is running in, +for example an :class:`Actor`, and this context is introduced by making a +:class:`PipelineStage` be a factory for producing a :class:`PipePair`. The +factory method is called :meth:`apply` (in good Scala tradition) and receives +the context object as its argument. The implementation of this factory method +could now make use of the context in whatever way it sees fit, you will see an +example further down. + +Manipulating ByteStrings +^^^^^^^^^^^^^^^^^^^^^^^^ + +The second stage of our sample protocol stack illustrates in more depth what +showed only a little in the pipeline stage built above: constructing and +deconstructing byte strings. Let us first take a look at the encoder: + +.. includecode:: code/docs/io/Pipelines.scala + :include: format + :exclude: decoding-omitted,omitted + +Note how the byte order to be used by this stage is fixed in exactly one place, +making it impossible get wrong between commands and events; the way how the +byte order is passed into the stage demonstrates one possible use for the +stage’s ``context`` parameter. + +The basic tool for constucting a :class:`ByteString` is a +:class:`ByteStringBuilder` which can be obtained by calling +:meth:`ByteString.newBuilder` since byte strings implement the +:class:`IndexesSeq[Byte]` interface of the standard Scala collections. This +builder knows a few extra tricks, though, for appending byte representations of +the primitive data types like ``Int`` and ``Double`` or arrays thereof. +Encoding a ``String`` requires a bit more work because not only the sequence of +bytes needs to be encoded but also the length, otherwise the decoding stage +would not know where the ``String`` terminates. When all values making up the +:class:`Message` have been appended to the builder, we simply pass the +resulting :class:`ByteString` on to the next stage as a command using the +optimized :meth:`singleCommand` facility. + +.. warning:: + + The :meth:`singleCommand` and :meth:`singleEvent` methods provide a way to + generate responses which transfer exactly one result from one pipeline stage + to the next without suffering the overhead of object allocations. This means + that the returned collection object will not work for anything else (you will + get :class:`ClassCastExceptions`!) and this facility can only be used *EXACTLY + ONCE* during the processing of one input (command or event). + +Now let us look at the decoder side: + +.. includecode:: code/docs/io/Pipelines.scala :include: decoding -This implementation naturally follows the example data format. In a true Scala application one might of course want to use specialized immutable ``Short``/``Long``/``Double`` containers instead of mutable Arrays. +The decoding side does the same things that the encoder does in the same order, +it just uses a :class:`ByteIterator` to retrieve primitive data types or arrays +of those from the underlying :class:`ByteString`. And in the end it hands the +assembled :class:`Message` as an event to the next stage using the optimized +:meth:`singleEvent` facility (see warning above). -After extracting data from a ``ByteIterator``, the remaining content can also be turned back into a ``ByteString`` using -the ``toSeq`` method. No bytes are copied. Because of immutability the underlying bytes can be shared between both the -``ByteIterator`` and the ``ByteString``. +Building a Pipeline +^^^^^^^^^^^^^^^^^^^ -.. includecode:: code/docs/io/BinaryCoding.scala - :include: rest-to-seq +Given the two pipeline stages introduced in the sections above we can now put +them to some use. First we define some message to be encoded: -In general, conversions from ``ByteString`` to ``ByteIterator`` and vice versa are O(1) for non-chunked ByteStrings and (at worst) O(nChunks) for chunked ByteStrings. +.. includecode:: code/docs/io/Pipelines.scala + :include: message -Encoding of data also is very natural, using ``ByteStringBuilder`` +Then we need to create a pipeline context which satisfies our declared needs: -.. includecode:: code/docs/io/BinaryCoding.scala - :include: encoding +.. includecode:: code/docs/io/Pipelines.scala + :include: byteorder + +Building the pipeline and encoding this message then is quite simple: + +.. includecode:: code/docs/io/Pipelines.scala + :include: build-pipeline + +The tuple returned from :meth:`buildFunctionTriple` contains one function for +injecting commands, one for events and a third for injecting management +commands (see below). In this case we demonstrate how a single message ``msg`` +is encoded by passing it into the ``cmd`` function. The return value is a pair +of sequences, one for the resulting events and the other for the resulting +commands. For the sample pipeline this will contain exactly one command—one +:class:`ByteString`. Decoding works in the same way, only with the ``evt`` +function (which can again also result in commands being generated, although +that is not demonstrated in this sample). + +Besides the more functional style there is also an explicitly side-effecting one: + +.. includecode:: code/docs/io/Pipelines.scala + :include: build-sink + +The functions passed into the :meth:`buildWithSinkFunctions` factory method +describe what shall happen to the commands and events as they fall out of the +pipeline. In this case we just send those to some actors, since that is usually +quite a good strategy for distributing the work represented by the messages. + +The types of commands or events fed into the provided sink functions are +wrapped within :class:`Try` so that failures can also be encoded and acted +upon. This means that injecting into a pipeline using a +:class:`PipelineInjector` will catch exceptions resulting from processing the +input, in which case the exception (there can only be one per injection) is +passed into the respective sink. + +Using the Pipeline’s Context +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Up to this point there was always a parameter ``ctx`` which was used when +constructing a pipeline, but it was not explained in full. The context is a +piece of information which is made available to all stages of a pipeline. The +context may also carry behavior, provide infrastructure or helper methods etc. +It should be noted that the context is bound to the pipeline and as such must +not be accessed concurrently from different threads unless care is taken to +properly synchronize such access. Since the context will in many cases be +provided by an actor it is not recommended to share this context with code +executing outside of the actor’s message handling. + +.. warning:: + + A PipelineContext instance *MUST NOT* be used by two different pipelines + since it contains mutable fields which are used during message processing. + +Using Management Commands +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Since pipeline stages do not have any reference to the pipeline or even to +their neighbors they cannot directly effect the injection of commands or events +outside of their normal processing. But sometimes things need to happen driven +by a timer, for example. In this case the timer would need to cause sending +tick messages to the whole pipeline, and those stages which wanted to receive +them would act upon those. In order to keep the type signatures for events and +commands useful, such external triggers are sent out-of-band, via a different +channel—the management port. One example which makes use of this facility is +the :class:`TickGenerator` which comes included with ``akka-actor``: + +.. includecode:: ../../../akka-actor/src/main/scala/akka/io/Pipelines.scala + :include: tick-generator + +This pipeline stage is to be used within an actor, and it will make use of this +context in order to schedule the delivery of :class:`TickGenerator.Trigger` +messages; the actor is then supposed to feed these messages into the management +port of the pipeline. An example could look like this: + +.. includecode:: code/docs/io/Pipelines.scala#actor + +This actor extends our well-known pipeline with the tick generator and attaches +the outputs to functions which send commands and events to actors for further +processing. The pipeline stages will then all receive one ``Tick`` per second +which can be used like so: + +.. includecode:: code/docs/io/Pipelines.scala + :include: mgmt-ticks + :exclude: omitted + +.. note:: + + Management commands are delivered to all stages of a pipeline “effectively + parallel”, like on a broadcast medium. No code will actually run concurrently + since a pipeline is strictly single-threaded, but the order in which these + commands are processed is not specified. + +The intended purpose of management commands is for each stage to define its +special command types and then listen only to those (where the aforementioned +``Tick`` message is a useful counter-example), exactly like sending packets on +a wifi network where every station receives all traffic but reacts only to +those messages which are destined for it. + +If you need all stages to react upon something in their defined order, then +this must be modeled either as a command or event, i.e. it will be part of the +“business” type of the pipeline. Using TCP --------- diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index e3f760ccfe..6815f754c7 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -293,12 +293,12 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: /** * Override this method to do something when the whole test is starting up. */ - protected def atStartup(): Unit = {} + protected def atStartup(): Unit = () /** * Override this method to do something when the whole test is terminating. */ - protected def afterTermination(): Unit = {} + protected def afterTermination(): Unit = () /** * All registered roles diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index e3d793f9e5..15829bf449 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -13,15 +13,15 @@ import scala.util.control.NonFatal */ private[netty] trait NettyHelpers { - protected def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {} + protected def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = () - protected def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {} + protected def onDisconnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = () - protected def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {} + protected def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = () - protected def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {} + protected def onMessage(ctx: ChannelHandlerContext, e: MessageEvent): Unit = () - protected def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = {} + protected def onException(ctx: ChannelHandlerContext, e: ExceptionEvent): Unit = () final protected def transformException(ctx: ChannelHandlerContext, ev: ExceptionEvent): Unit = { val cause = if (ev.getCause ne null) ev.getCause else new AkkaException("Unknown cause")