Initial port of Dataflow API, added it as its own subproject, switched to implicit value classes with cps transformation annotations

This commit is contained in:
Viktor Klang 2012-08-07 14:30:16 +02:00
parent 1510f10842
commit 71fa783452
3 changed files with 216 additions and 1 deletions

View file

@ -0,0 +1,71 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import language.implicitConversions
import scala.util.continuations._
import scala.concurrent.{ Promise, Future, ExecutionContext }
import scala.util.control.NonFatal
package object dataflow {
/**
* Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
* Continuations plugin.
*
* Within the block, the result of a Future may be accessed by calling Future.apply. At that point
* execution is suspended with the rest of the block being stored in a continuation until the result
* of the Future is available. If an Exception is thrown while processing, it will be contained
* within the resulting Future.
*
* This allows working with Futures in an imperative style without blocking for each result.
*
* Completing a Future using 'Promise << Future' will also suspend execution until the
* value of the other Future is available.
*
* The Delimited Continuations compiler plugin must be enabled in order to use this method.
*/
def flow[A](body: A @cps[Future[Any]])(implicit executor: ExecutionContext): Future[A] = {
val p = Promise[A]
executor.execute(
new Runnable {
def run = try {
(reify(body) foreachFull (r p.success(r).future, f p.failure(f).future): Future[Any]) onFailure {
case NonFatal(e) p tryComplete Left(e)
}
} catch {
case NonFatal(e) p tryComplete Left(e)
}
})
p.future
}
implicit class DataflowPromise[T](val promise: Promise[T]) extends AnyVal {
final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
cont: (Future[T] Future[Any]) cont(promise.success(value).future)
}
final def <<(other: Future[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = shift {
cont: (Future[T] Future[Any])
val fr = Promise[Any]()
(promise completeWith other).future onComplete {
v try { fr completeWith cont(promise.future) } catch { case NonFatal(e) fr failure e }
}
fr.future
}
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T Future[Any]))
}
implicit class DataflowFuture[T](val future: Future[T]) extends AnyVal {
/**
* For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
* Returns the result of this Future without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(future flatMap (_: T Future[Any]))
}
}

View file

@ -0,0 +1,132 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dataflow
/*import language.postfixOps
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.actor.ActorLogging
import scala.concurrent.util.duration._
import akka.event.Logging
import akka.actor.Props
import ch.qos.logback.core.OutputStreamAppender
import java.io.StringWriter
import java.io.ByteArrayOutputStream
import org.scalatest.BeforeAndAfterEach
object Slf4jEventHandlerSpec {
// This test depends on logback configuration in src/test/resources/logback-test.xml
val config = """
akka {
loglevel = INFO
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
event-handler-startup-timeout = 30s
}
"""
class LogProducer extends Actor with ActorLogging {
def receive = {
case e: Exception
log.error(e, e.getMessage)
case (s: String, x: Int, y: Int)
log.info(s, x, y)
}
}
class MyLogSource
val output = new ByteArrayOutputStream
def outputString: String = output.toString("UTF-8")
class TestAppender extends OutputStreamAppender {
override def start(): Unit = {
setOutputStream(output)
super.start()
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with BeforeAndAfterEach {
import Slf4jEventHandlerSpec._
val producer = system.actorOf(Props[LogProducer], name = "logProducer")
override def beforeEach(): Unit = {
output.reset()
}
val sourceThreadRegex = "sourceThread=\\[Slf4jEventHandlerSpec-akka.actor.default-dispatcher-[1-9][0-9]*\\]"
"Slf4jEventHandler" must {
"log error with stackTrace" in {
producer ! new RuntimeException("Simulated error")
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]")
s must include("level=[ERROR]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]")
s must include regex (sourceThreadRegex)
s must include("msg=[Simulated error]")
s must include("java.lang.RuntimeException: Simulated error")
s must include("at akka.event.slf4j.Slf4jEventHandlerSpec")
}
"log info with parameters" in {
producer ! ("test x={} y={}", 3, 17)
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]")
s must include("level=[INFO]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]")
s must include regex (sourceThreadRegex)
s must include("msg=[test x=3 y=17]")
}
"include system info in akkaSource when creating Logging with system" in {
val log = Logging(system, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource")
log.info("test")
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]")
}
"not include system info in akkaSource when creating Logging with system.eventStream" in {
val log = Logging(system.eventStream, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource")
log.info("test")
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]")
}
"use short class name and include system info in akkaSource when creating Logging with system and class" in {
val log = Logging(system, classOf[MyLogSource])
log.info("test")
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource(akka://Slf4jEventHandlerSpec)]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]")
}
"use short class name in akkaSource when creating Logging with system.eventStream and class" in {
val log = Logging(system.eventStream, classOf[MyLogSource])
log.info("test")
awaitCond(outputString.contains("----"), 5 seconds)
val s = outputString
s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource]")
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]")
}
}
}*/

View file

@ -67,7 +67,7 @@ object AkkaBuild extends Build {
sphinxLatex <<= sphinxLatex in LocalProject(docs.id) map identity,
sphinxPdf <<= sphinxPdf in LocalProject(docs.id) map identity
),
aggregate = Seq(actor, testkit, actorTests, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, samples, tutorials, osgi, osgiAries, docs)
aggregate = Seq(actor, testkit, actorTests, dataflow, remote, remoteTests, camel, cluster, slf4j, agent, transactor, mailboxes, zeroMQ, kernel, akkaSbtPlugin, samples, tutorials, osgi, osgiAries, docs)
)
lazy val actor = Project(
@ -84,6 +84,16 @@ object AkkaBuild extends Build {
)
)
lazy val dataflow = Project(
id = "akka-dataflow",
base = file("akka-dataflow"),
dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ OSGi.dataflow ++ Seq(
libraryDependencies <+= scalaVersion { v => compilerPlugin("org.scala-lang.plugins" % "continuations" % v) },
scalacOptions += "-P:continuations:enable"
)
)
lazy val testkit = Project(
id = "akka-testkit",
base = file("akka-testkit"),
@ -573,6 +583,8 @@ object OSGi {
val slf4j = exports(Seq("akka.event.slf4j.*"))
val dataflow = exports(Seq("akka.dataflow.*"))
val transactor = exports(Seq("akka.transactor.*"))
val zeroMQ = exports(Seq("akka.zeromq.*"))