diff --git a/akka-dataflow/src/main/scala/akka/package.scala b/akka-dataflow/src/main/scala/akka/package.scala new file mode 100644 index 0000000000..9b8579fa83 --- /dev/null +++ b/akka-dataflow/src/main/scala/akka/package.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka + +import language.implicitConversions + +import scala.util.continuations._ +import scala.concurrent.{ Promise, Future, ExecutionContext } +import scala.util.control.NonFatal + +package object dataflow { + /** + * Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited + * Continuations plugin. + * + * Within the block, the result of a Future may be accessed by calling Future.apply. At that point + * execution is suspended with the rest of the block being stored in a continuation until the result + * of the Future is available. If an Exception is thrown while processing, it will be contained + * within the resulting Future. + * + * This allows working with Futures in an imperative style without blocking for each result. + * + * Completing a Future using 'Promise << Future' will also suspend execution until the + * value of the other Future is available. + * + * The Delimited Continuations compiler plugin must be enabled in order to use this method. + */ + def flow[A](body: ⇒ A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = { + val p = Promise[A] + executor.execute( + new Runnable { + def run = try { + (reify(body) foreachFull (r ⇒ p.success(r).future, f ⇒ p.failure(f).future): Future[Any]) onFailure { + case NonFatal(e) ⇒ p tryComplete Left(e) + } + } catch { + case NonFatal(e) ⇒ p tryComplete Left(e) + } + }) + p.future + } + + implicit class DataflowPromise[T](val promise: Promise[T]) extends AnyVal { + final def <<(value: T): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] ⇒ Future[Any]) ⇒ cont(promise.success(value).future) + } + + final def <<(other: Future[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] ⇒ Future[Any]) ⇒ + val fr = Promise[Any]() + (promise completeWith other).future onComplete { + v ⇒ try { fr completeWith cont(promise.future) } catch { case NonFatal(e) ⇒ fr failure e } + } + fr.future + } + + final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T ⇒ Future[Any])) + } + + implicit class DataflowFuture[T](val future: Future[T]) extends AnyVal { + /** + * For use only within a Future.flow block or another compatible Delimited Continuations reset block. + * + * Returns the result of this Future without blocking, by suspending execution and storing it as a + * continuation until the result is available. + */ + final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(future flatMap (_: T ⇒ Future[Any])) + } +} \ No newline at end of file diff --git a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala new file mode 100644 index 0000000000..5c284f8946 --- /dev/null +++ b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala @@ -0,0 +1,132 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.dataflow + +/*import language.postfixOps + +import akka.testkit.AkkaSpec +import akka.actor.Actor +import akka.actor.ActorLogging +import scala.concurrent.util.duration._ +import akka.event.Logging +import akka.actor.Props +import ch.qos.logback.core.OutputStreamAppender +import java.io.StringWriter +import java.io.ByteArrayOutputStream +import org.scalatest.BeforeAndAfterEach + +object Slf4jEventHandlerSpec { + + // This test depends on logback configuration in src/test/resources/logback-test.xml + + val config = """ + akka { + loglevel = INFO + event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] + event-handler-startup-timeout = 30s + } + """ + + class LogProducer extends Actor with ActorLogging { + def receive = { + case e: Exception ⇒ + log.error(e, e.getMessage) + case (s: String, x: Int, y: Int) ⇒ + log.info(s, x, y) + } + } + + class MyLogSource + + val output = new ByteArrayOutputStream + def outputString: String = output.toString("UTF-8") + + class TestAppender extends OutputStreamAppender { + + override def start(): Unit = { + setOutputStream(output) + super.start() + } + } + +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with BeforeAndAfterEach { + import Slf4jEventHandlerSpec._ + + val producer = system.actorOf(Props[LogProducer], name = "logProducer") + + override def beforeEach(): Unit = { + output.reset() + } + + val sourceThreadRegex = "sourceThread=\\[Slf4jEventHandlerSpec-akka.actor.default-dispatcher-[1-9][0-9]*\\]" + + "Slf4jEventHandler" must { + + "log error with stackTrace" in { + producer ! new RuntimeException("Simulated error") + + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]") + s must include("level=[ERROR]") + s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]") + s must include regex (sourceThreadRegex) + s must include("msg=[Simulated error]") + s must include("java.lang.RuntimeException: Simulated error") + s must include("at akka.event.slf4j.Slf4jEventHandlerSpec") + } + + "log info with parameters" in { + producer ! ("test x={} y={}", 3, 17) + + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]") + s must include("level=[INFO]") + s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]") + s must include regex (sourceThreadRegex) + s must include("msg=[test x=3 y=17]") + } + + "include system info in akkaSource when creating Logging with system" in { + val log = Logging(system, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource") + log.info("test") + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]") + s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]") + } + + "not include system info in akkaSource when creating Logging with system.eventStream" in { + val log = Logging(system.eventStream, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource") + log.info("test") + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]") + s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]") + } + + "use short class name and include system info in akkaSource when creating Logging with system and class" in { + val log = Logging(system, classOf[MyLogSource]) + log.info("test") + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource(akka://Slf4jEventHandlerSpec)]") + s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]") + } + + "use short class name in akkaSource when creating Logging with system.eventStream and class" in { + val log = Logging(system.eventStream, classOf[MyLogSource]) + log.info("test") + awaitCond(outputString.contains("----"), 5 seconds) + val s = outputString + s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource]") + s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]") + } + } + +}*/ diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index d8e30f4d66..7cb609aabc 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -67,7 +67,7 @@ object AkkaBuild extends Build { sphinxLatex <<= sphinxLatex in LocalProject(docs.id) map identity, sphinxPdf <<= sphinxPdf in LocalProject(docs.id) map identity ), - aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, samples, tutorials, osgi, osgiAries, docs) + aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, samples, tutorials, osgi, osgiAries, docs) ) lazy val actor = Project( @@ -84,6 +84,16 @@ object AkkaBuild extends Build { ) ) + lazy val dataflow = Project( + id = "akka-dataflow", + base = file("akka-dataflow"), + dependencies = Seq(actor, testkit % "test->test"), + settings = defaultSettings ++ OSGi.dataflow ++ Seq( + libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) }, + scalacOptions += "-P:continuations:enable" + ) + ) + lazy val testkit = Project( id = "akka-testkit", base = file("akka-testkit"), @@ -573,6 +583,8 @@ object OSGi { val slf4j = exports(Seq("akka.event.slf4j.*")) + val dataflow = exports(Seq("akka.dataflow.*")) + val transactor = exports(Seq("akka.transactor.*")) val zeroMQ = exports(Seq("akka.zeromq.*"))