diff --git a/akka-dataflow/src/main/scala/akka/package.scala b/akka-dataflow/src/main/scala/akka/package.scala index 9b8579fa83..27b7ba2275 100644 --- a/akka-dataflow/src/main/scala/akka/package.scala +++ b/akka-dataflow/src/main/scala/akka/package.scala @@ -56,6 +56,8 @@ package object dataflow { fr.future } + final def <<(other: Promise[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = <<(other.future) + final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T ⇒ Future[Any])) } diff --git a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala index 5c284f8946..c5d543f84d 100644 --- a/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala +++ b/akka-dataflow/src/test/scala/akka/dataflow/DataflowSpec.scala @@ -3,130 +3,298 @@ */ package akka.dataflow -/*import language.postfixOps +import language.postfixOps -import akka.testkit.AkkaSpec -import akka.actor.Actor -import akka.actor.ActorLogging +import scala.reflect.ClassTag +import akka.actor.{ Actor, Status, Props } +import akka.actor.Status._ +import akka.pattern.ask +import akka.testkit.{ EventFilter, filterEvents, filterException } +import scala.concurrent.{ Await, Promise, Future } import scala.concurrent.util.duration._ -import akka.event.Logging -import akka.actor.Props -import ch.qos.logback.core.OutputStreamAppender -import java.io.StringWriter -import java.io.ByteArrayOutputStream -import org.scalatest.BeforeAndAfterEach +import akka.testkit.{ DefaultTimeout, TestLatch, AkkaSpec } +import java.util.concurrent.TimeoutException -object Slf4jEventHandlerSpec { - - // This test depends on logback configuration in src/test/resources/logback-test.xml - - val config = """ - akka { - loglevel = INFO - event-handlers = ["akka.event.slf4j.Slf4jEventHandler"] - event-handler-startup-timeout = 30s - } - """ - - class LogProducer extends Actor with ActorLogging { +object DataflowSpec { + class TestActor extends Actor { def receive = { - case e: Exception ⇒ - log.error(e, e.getMessage) - case (s: String, x: Int, y: Int) ⇒ - log.info(s, x, y) + case "Hello" ⇒ sender ! "World" + case "Failure" ⇒ + sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) + case "NoReply" ⇒ } } - class MyLogSource - - val output = new ByteArrayOutputStream - def outputString: String = output.toString("UTF-8") - - class TestAppender extends OutputStreamAppender { - - override def start(): Unit = { - setOutputStream(output) - super.start() + class TestDelayActor(await: TestLatch) extends Actor { + def receive = { + case "Hello" ⇒ Await.ready(await, TestLatch.DefaultTimeout); sender ! "World" + case "NoReply" ⇒ Await.ready(await, TestLatch.DefaultTimeout) + case "Failure" ⇒ + Await.ready(await, TestLatch.DefaultTimeout) + sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance")) } } - } -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with BeforeAndAfterEach { - import Slf4jEventHandlerSpec._ +class DataflowSpec extends AkkaSpec with DefaultTimeout { + import DataflowSpec._ + import system.dispatcher + "Dataflow API" must { + "futureComposingWithContinuations" in { - val producer = system.actorOf(Props[LogProducer], name = "logProducer") + val actor = system.actorOf(Props[TestActor]) - override def beforeEach(): Unit = { - output.reset() - } + val x = Future("Hello") + val y = x flatMap (actor ? _) mapTo manifest[String] - val sourceThreadRegex = "sourceThread=\\[Slf4jEventHandlerSpec-akka.actor.default-dispatcher-[1-9][0-9]*\\]" + val r = flow(x() + " " + y() + "!") - "Slf4jEventHandler" must { + assert(Await.result(r, timeout.duration) === "Hello World!") - "log error with stackTrace" in { - producer ! new RuntimeException("Simulated error") - - awaitCond(outputString.contains("----"), 5 seconds) - val s = outputString - s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]") - s must include("level=[ERROR]") - s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]") - s must include regex (sourceThreadRegex) - s must include("msg=[Simulated error]") - s must include("java.lang.RuntimeException: Simulated error") - s must include("at akka.event.slf4j.Slf4jEventHandlerSpec") + system.stop(actor) } - "log info with parameters" in { - producer ! ("test x={} y={}", 3, 17) + "futureComposingWithContinuationsFailureDivideZero" in { + filterException[ArithmeticException] { - awaitCond(outputString.contains("----"), 5 seconds) - val s = outputString - s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]") - s must include("level=[INFO]") - s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]") - s must include regex (sourceThreadRegex) - s must include("msg=[test x=3 y=17]") + val x = Future("Hello") + val y = x map (_.length) + + val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100) + + intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration)) + } } - "include system info in akkaSource when creating Logging with system" in { - val log = Logging(system, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource") - log.info("test") - awaitCond(outputString.contains("----"), 5 seconds) - val s = outputString - s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]") - s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]") + "futureComposingWithContinuationsFailureCastInt" in { + filterException[ClassCastException] { + + val actor = system.actorOf(Props[TestActor]) + + val x = Future(3) + val y = (actor ? "Hello").mapTo[Int] + + val r = flow(x() + y(), 100) + + intercept[ClassCastException](Await.result(r, timeout.duration)) + } } - "not include system info in akkaSource when creating Logging with system.eventStream" in { - val log = Logging(system.eventStream, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource") - log.info("test") - awaitCond(outputString.contains("----"), 5 seconds) - val s = outputString - s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]") - s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]") + "futureComposingWithContinuationsFailureCastNothing" in { + filterException[ClassCastException] { + + val actor = system.actorOf(Props[TestActor]) + + val x = Future("Hello") + val y = (actor ? "Hello").mapTo[Nothing] + + val r = flow(x() + y()) + + intercept[ClassCastException](Await.result(r, timeout.duration)) + } } - "use short class name and include system info in akkaSource when creating Logging with system and class" in { - val log = Logging(system, classOf[MyLogSource]) - log.info("test") - awaitCond(outputString.contains("----"), 5 seconds) - val s = outputString - s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource(akka://Slf4jEventHandlerSpec)]") - s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]") + "futureCompletingWithContinuations" in { + + val x, y, z = Promise[Int]() + val ly, lz = new TestLatch + + val result = flow { + y completeWith x.future + ly.open() // not within continuation + + z << x + lz.open() // within continuation, will wait for 'z' to complete + z() + y() + } + + Await.ready(ly, 100 milliseconds) + intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } + + flow { x << 5 } + + assert(Await.result(y.future, timeout.duration) === 5) + assert(Await.result(z.future, timeout.duration) === 5) + Await.ready(lz, timeout.duration) + assert(Await.result(result, timeout.duration) === 10) + + val a, b, c = Promise[Int]() + + val result2 = flow { + val n = (a << c).value.get.right.get + 10 + b << (c() - 2) + a() + n * b() + } + + c completeWith Future(5) + + assert(Await.result(a.future, timeout.duration) === 5) + assert(Await.result(b.future, timeout.duration) === 3) + assert(Await.result(result2, timeout.duration) === 50) } - "use short class name in akkaSource when creating Logging with system.eventStream and class" in { - val log = Logging(system.eventStream, classOf[MyLogSource]) - log.info("test") - awaitCond(outputString.contains("----"), 5 seconds) - val s = outputString - s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource]") - s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]") + "futureDataFlowShouldEmulateBlocking1" in { + + val one, two = Promise[Int]() + val simpleResult = flow { + one() + two() + } + + assert(Seq(one.future, two.future, simpleResult).forall(_.isCompleted == false)) + + flow { one << 1 } + + Await.ready(one.future, 1 minute) + + assert(one.isCompleted) + assert(Seq(two.future, simpleResult).forall(_.isCompleted == false)) + + flow { two << 9 } + + Await.ready(two.future, 1 minute) + + assert(Seq(one, two).forall(_.isCompleted == true)) + assert(Await.result(simpleResult, timeout.duration) === 10) + + } + + "futureDataFlowShouldEmulateBlocking2" in { + + val x1, x2, y1, y2 = Promise[Int]() + val lx, ly, lz = new TestLatch + val result = flow { + lx.open() + x1 << y1 + ly.open() + x2 << y2 + lz.open() + x1() + x2() + } + Await.ready(lx, 2 seconds) + assert(!ly.isOpen) + assert(!lz.isOpen) + assert(List(x1, x2, y1, y2).forall(_.isCompleted == false)) + + flow { y1 << 1 } // When this is set, it should cascade down the line + + Await.ready(ly, 2 seconds) + assert(Await.result(x1.future, 1 minute) === 1) + assert(!lz.isOpen) + + flow { y2 << 9 } // When this is set, it should cascade down the line + + Await.ready(lz, 2 seconds) + assert(Await.result(x2.future, 1 minute) === 9) + + assert(List(x1, x2, y1, y2).forall(_.isCompleted)) + + assert(Await.result(result, 1 minute) === 10) + } + + "dataFlowAPIshouldbeSlick" in { + + val i1, i2, s1, s2 = new TestLatch + + val callService1 = Future { i1.open(); Await.ready(s1, TestLatch.DefaultTimeout); 1 } + val callService2 = Future { i2.open(); Await.ready(s2, TestLatch.DefaultTimeout); 9 } + + val result = flow { callService1() + callService2() } + + assert(!s1.isOpen) + assert(!s2.isOpen) + assert(!result.isCompleted) + Await.ready(i1, 2 seconds) + Await.ready(i2, 2 seconds) + s1.open() + s2.open() + assert(Await.result(result, timeout.duration) === 10) + } + + "futureCompletingWithContinuationsFailure" in { + filterException[ArithmeticException] { + + val x, y, z = Promise[Int]() + val ly, lz = new TestLatch + + val result = flow { + y << x + ly.open() + val oops = 1 / 0 + z << x + lz.open() + z() + y() + oops + } + intercept[TimeoutException] { Await.ready(ly, 100 milliseconds) } + intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) } + flow { x << 5 } + + assert(Await.result(y.future, timeout.duration) === 5) + intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration)) + assert(z.future.value === None) + assert(!lz.isOpen) + } + } + + "futureContinuationsShouldNotBlock" in { + + val latch = new TestLatch + val future = Future { + Await.ready(latch, TestLatch.DefaultTimeout) + "Hello" + } + + val result = flow { + Some(future()).filter(_ == "Hello") + } + + assert(!result.isCompleted) + + latch.open() + + assert(Await.result(result, timeout.duration) === Some("Hello")) + } + + "futureFlowShouldBeTypeSafe" in { + + val rString = flow { + val x = Future(5) + x().toString + } + + val rInt = flow { + val x = rString.apply + val y = Future(5) + x.length + y() + } + + assert(checkType(rString, manifest[String])) + assert(checkType(rInt, manifest[Int])) + assert(!checkType(rInt, manifest[String])) + assert(!checkType(rInt, manifest[Nothing])) + assert(!checkType(rInt, manifest[Any])) + + Await.result(rString, timeout.duration) + Await.result(rInt, timeout.duration) + } + + "futureFlowSimpleAssign" in { + val x, y, z = Promise[Int]() + + flow { + z << x() + y() + } + flow { x << 40 } + flow { y << 2 } + + assert(Await.result(z.future, timeout.duration) === 42) + } + + "should capture first exception with dataflow" in { + val f1 = flow { 40 / 0 } + intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout)) } } -}*/ + def checkType[A: ClassTag, B](in: Future[A], ref: ClassTag[B]): Boolean = implicitly[ClassTag[A]].runtimeClass eq ref.runtimeClass + +}