Adding old Dataflow tests, passing like a baws.
This commit is contained in:
parent
71fa783452
commit
984c456b16
2 changed files with 267 additions and 97 deletions
|
|
@ -56,6 +56,8 @@ package object dataflow {
|
||||||
fr.future
|
fr.future
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final def <<(other: Promise[T])(implicit ec: ExecutionContext): Future[T] @cps[Future[Any]] = <<(other.future)
|
||||||
|
|
||||||
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T ⇒ Future[Any]))
|
final def apply()(implicit ec: ExecutionContext): T @cps[Future[Any]] = shift(promise.future flatMap (_: T ⇒ Future[Any]))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,130 +3,298 @@
|
||||||
*/
|
*/
|
||||||
package akka.dataflow
|
package akka.dataflow
|
||||||
|
|
||||||
/*import language.postfixOps
|
import language.postfixOps
|
||||||
|
|
||||||
import akka.testkit.AkkaSpec
|
import scala.reflect.ClassTag
|
||||||
import akka.actor.Actor
|
import akka.actor.{ Actor, Status, Props }
|
||||||
import akka.actor.ActorLogging
|
import akka.actor.Status._
|
||||||
|
import akka.pattern.ask
|
||||||
|
import akka.testkit.{ EventFilter, filterEvents, filterException }
|
||||||
|
import scala.concurrent.{ Await, Promise, Future }
|
||||||
import scala.concurrent.util.duration._
|
import scala.concurrent.util.duration._
|
||||||
import akka.event.Logging
|
import akka.testkit.{ DefaultTimeout, TestLatch, AkkaSpec }
|
||||||
import akka.actor.Props
|
import java.util.concurrent.TimeoutException
|
||||||
import ch.qos.logback.core.OutputStreamAppender
|
|
||||||
import java.io.StringWriter
|
|
||||||
import java.io.ByteArrayOutputStream
|
|
||||||
import org.scalatest.BeforeAndAfterEach
|
|
||||||
|
|
||||||
object Slf4jEventHandlerSpec {
|
object DataflowSpec {
|
||||||
|
class TestActor extends Actor {
|
||||||
// This test depends on logback configuration in src/test/resources/logback-test.xml
|
|
||||||
|
|
||||||
val config = """
|
|
||||||
akka {
|
|
||||||
loglevel = INFO
|
|
||||||
event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
|
|
||||||
event-handler-startup-timeout = 30s
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
|
|
||||||
class LogProducer extends Actor with ActorLogging {
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case e: Exception ⇒
|
case "Hello" ⇒ sender ! "World"
|
||||||
log.error(e, e.getMessage)
|
case "Failure" ⇒
|
||||||
case (s: String, x: Int, y: Int) ⇒
|
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
|
||||||
log.info(s, x, y)
|
case "NoReply" ⇒
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class MyLogSource
|
class TestDelayActor(await: TestLatch) extends Actor {
|
||||||
|
def receive = {
|
||||||
val output = new ByteArrayOutputStream
|
case "Hello" ⇒ Await.ready(await, TestLatch.DefaultTimeout); sender ! "World"
|
||||||
def outputString: String = output.toString("UTF-8")
|
case "NoReply" ⇒ Await.ready(await, TestLatch.DefaultTimeout)
|
||||||
|
case "Failure" ⇒
|
||||||
class TestAppender extends OutputStreamAppender {
|
Await.ready(await, TestLatch.DefaultTimeout)
|
||||||
|
sender ! Status.Failure(new RuntimeException("Expected exception; to test fault-tolerance"))
|
||||||
override def start(): Unit = {
|
|
||||||
setOutputStream(output)
|
|
||||||
super.start()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
class DataflowSpec extends AkkaSpec with DefaultTimeout {
|
||||||
class Slf4jEventHandlerSpec extends AkkaSpec(Slf4jEventHandlerSpec.config) with BeforeAndAfterEach {
|
import DataflowSpec._
|
||||||
import Slf4jEventHandlerSpec._
|
import system.dispatcher
|
||||||
|
"Dataflow API" must {
|
||||||
|
"futureComposingWithContinuations" in {
|
||||||
|
|
||||||
val producer = system.actorOf(Props[LogProducer], name = "logProducer")
|
val actor = system.actorOf(Props[TestActor])
|
||||||
|
|
||||||
override def beforeEach(): Unit = {
|
val x = Future("Hello")
|
||||||
output.reset()
|
val y = x flatMap (actor ? _) mapTo manifest[String]
|
||||||
}
|
|
||||||
|
|
||||||
val sourceThreadRegex = "sourceThread=\\[Slf4jEventHandlerSpec-akka.actor.default-dispatcher-[1-9][0-9]*\\]"
|
val r = flow(x() + " " + y() + "!")
|
||||||
|
|
||||||
"Slf4jEventHandler" must {
|
assert(Await.result(r, timeout.duration) === "Hello World!")
|
||||||
|
|
||||||
"log error with stackTrace" in {
|
system.stop(actor)
|
||||||
producer ! new RuntimeException("Simulated error")
|
|
||||||
|
|
||||||
awaitCond(outputString.contains("----"), 5 seconds)
|
|
||||||
val s = outputString
|
|
||||||
s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]")
|
|
||||||
s must include("level=[ERROR]")
|
|
||||||
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]")
|
|
||||||
s must include regex (sourceThreadRegex)
|
|
||||||
s must include("msg=[Simulated error]")
|
|
||||||
s must include("java.lang.RuntimeException: Simulated error")
|
|
||||||
s must include("at akka.event.slf4j.Slf4jEventHandlerSpec")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"log info with parameters" in {
|
"futureComposingWithContinuationsFailureDivideZero" in {
|
||||||
producer ! ("test x={} y={}", 3, 17)
|
filterException[ArithmeticException] {
|
||||||
|
|
||||||
awaitCond(outputString.contains("----"), 5 seconds)
|
val x = Future("Hello")
|
||||||
val s = outputString
|
val y = x map (_.length)
|
||||||
s must include("akkaSource=[akka://Slf4jEventHandlerSpec/user/logProducer]")
|
|
||||||
s must include("level=[INFO]")
|
val r = flow(x() + " " + y.map(_ / 0).map(_.toString).apply, 100)
|
||||||
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$LogProducer]")
|
|
||||||
s must include regex (sourceThreadRegex)
|
intercept[java.lang.ArithmeticException](Await.result(r, timeout.duration))
|
||||||
s must include("msg=[test x=3 y=17]")
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"include system info in akkaSource when creating Logging with system" in {
|
"futureComposingWithContinuationsFailureCastInt" in {
|
||||||
val log = Logging(system, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource")
|
filterException[ClassCastException] {
|
||||||
log.info("test")
|
|
||||||
awaitCond(outputString.contains("----"), 5 seconds)
|
val actor = system.actorOf(Props[TestActor])
|
||||||
val s = outputString
|
|
||||||
s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]")
|
val x = Future(3)
|
||||||
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource(akka://Slf4jEventHandlerSpec)]")
|
val y = (actor ? "Hello").mapTo[Int]
|
||||||
|
|
||||||
|
val r = flow(x() + y(), 100)
|
||||||
|
|
||||||
|
intercept[ClassCastException](Await.result(r, timeout.duration))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"not include system info in akkaSource when creating Logging with system.eventStream" in {
|
"futureComposingWithContinuationsFailureCastNothing" in {
|
||||||
val log = Logging(system.eventStream, "akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource")
|
filterException[ClassCastException] {
|
||||||
log.info("test")
|
|
||||||
awaitCond(outputString.contains("----"), 5 seconds)
|
val actor = system.actorOf(Props[TestActor])
|
||||||
val s = outputString
|
|
||||||
s must include("akkaSource=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]")
|
val x = Future("Hello")
|
||||||
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec.MyLogSource]")
|
val y = (actor ? "Hello").mapTo[Nothing]
|
||||||
|
|
||||||
|
val r = flow(x() + y())
|
||||||
|
|
||||||
|
intercept[ClassCastException](Await.result(r, timeout.duration))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"use short class name and include system info in akkaSource when creating Logging with system and class" in {
|
"futureCompletingWithContinuations" in {
|
||||||
val log = Logging(system, classOf[MyLogSource])
|
|
||||||
log.info("test")
|
val x, y, z = Promise[Int]()
|
||||||
awaitCond(outputString.contains("----"), 5 seconds)
|
val ly, lz = new TestLatch
|
||||||
val s = outputString
|
|
||||||
s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource(akka://Slf4jEventHandlerSpec)]")
|
val result = flow {
|
||||||
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]")
|
y completeWith x.future
|
||||||
|
ly.open() // not within continuation
|
||||||
|
|
||||||
|
z << x
|
||||||
|
lz.open() // within continuation, will wait for 'z' to complete
|
||||||
|
z() + y()
|
||||||
|
}
|
||||||
|
|
||||||
|
Await.ready(ly, 100 milliseconds)
|
||||||
|
intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) }
|
||||||
|
|
||||||
|
flow { x << 5 }
|
||||||
|
|
||||||
|
assert(Await.result(y.future, timeout.duration) === 5)
|
||||||
|
assert(Await.result(z.future, timeout.duration) === 5)
|
||||||
|
Await.ready(lz, timeout.duration)
|
||||||
|
assert(Await.result(result, timeout.duration) === 10)
|
||||||
|
|
||||||
|
val a, b, c = Promise[Int]()
|
||||||
|
|
||||||
|
val result2 = flow {
|
||||||
|
val n = (a << c).value.get.right.get + 10
|
||||||
|
b << (c() - 2)
|
||||||
|
a() + n * b()
|
||||||
|
}
|
||||||
|
|
||||||
|
c completeWith Future(5)
|
||||||
|
|
||||||
|
assert(Await.result(a.future, timeout.duration) === 5)
|
||||||
|
assert(Await.result(b.future, timeout.duration) === 3)
|
||||||
|
assert(Await.result(result2, timeout.duration) === 50)
|
||||||
}
|
}
|
||||||
|
|
||||||
"use short class name in akkaSource when creating Logging with system.eventStream and class" in {
|
"futureDataFlowShouldEmulateBlocking1" in {
|
||||||
val log = Logging(system.eventStream, classOf[MyLogSource])
|
|
||||||
log.info("test")
|
val one, two = Promise[Int]()
|
||||||
awaitCond(outputString.contains("----"), 5 seconds)
|
val simpleResult = flow {
|
||||||
val s = outputString
|
one() + two()
|
||||||
s must include("akkaSource=[Slf4jEventHandlerSpec$MyLogSource]")
|
}
|
||||||
s must include("logger=[akka.event.slf4j.Slf4jEventHandlerSpec$MyLogSource]")
|
|
||||||
|
assert(Seq(one.future, two.future, simpleResult).forall(_.isCompleted == false))
|
||||||
|
|
||||||
|
flow { one << 1 }
|
||||||
|
|
||||||
|
Await.ready(one.future, 1 minute)
|
||||||
|
|
||||||
|
assert(one.isCompleted)
|
||||||
|
assert(Seq(two.future, simpleResult).forall(_.isCompleted == false))
|
||||||
|
|
||||||
|
flow { two << 9 }
|
||||||
|
|
||||||
|
Await.ready(two.future, 1 minute)
|
||||||
|
|
||||||
|
assert(Seq(one, two).forall(_.isCompleted == true))
|
||||||
|
assert(Await.result(simpleResult, timeout.duration) === 10)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
"futureDataFlowShouldEmulateBlocking2" in {
|
||||||
|
|
||||||
|
val x1, x2, y1, y2 = Promise[Int]()
|
||||||
|
val lx, ly, lz = new TestLatch
|
||||||
|
val result = flow {
|
||||||
|
lx.open()
|
||||||
|
x1 << y1
|
||||||
|
ly.open()
|
||||||
|
x2 << y2
|
||||||
|
lz.open()
|
||||||
|
x1() + x2()
|
||||||
|
}
|
||||||
|
Await.ready(lx, 2 seconds)
|
||||||
|
assert(!ly.isOpen)
|
||||||
|
assert(!lz.isOpen)
|
||||||
|
assert(List(x1, x2, y1, y2).forall(_.isCompleted == false))
|
||||||
|
|
||||||
|
flow { y1 << 1 } // When this is set, it should cascade down the line
|
||||||
|
|
||||||
|
Await.ready(ly, 2 seconds)
|
||||||
|
assert(Await.result(x1.future, 1 minute) === 1)
|
||||||
|
assert(!lz.isOpen)
|
||||||
|
|
||||||
|
flow { y2 << 9 } // When this is set, it should cascade down the line
|
||||||
|
|
||||||
|
Await.ready(lz, 2 seconds)
|
||||||
|
assert(Await.result(x2.future, 1 minute) === 9)
|
||||||
|
|
||||||
|
assert(List(x1, x2, y1, y2).forall(_.isCompleted))
|
||||||
|
|
||||||
|
assert(Await.result(result, 1 minute) === 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
"dataFlowAPIshouldbeSlick" in {
|
||||||
|
|
||||||
|
val i1, i2, s1, s2 = new TestLatch
|
||||||
|
|
||||||
|
val callService1 = Future { i1.open(); Await.ready(s1, TestLatch.DefaultTimeout); 1 }
|
||||||
|
val callService2 = Future { i2.open(); Await.ready(s2, TestLatch.DefaultTimeout); 9 }
|
||||||
|
|
||||||
|
val result = flow { callService1() + callService2() }
|
||||||
|
|
||||||
|
assert(!s1.isOpen)
|
||||||
|
assert(!s2.isOpen)
|
||||||
|
assert(!result.isCompleted)
|
||||||
|
Await.ready(i1, 2 seconds)
|
||||||
|
Await.ready(i2, 2 seconds)
|
||||||
|
s1.open()
|
||||||
|
s2.open()
|
||||||
|
assert(Await.result(result, timeout.duration) === 10)
|
||||||
|
}
|
||||||
|
|
||||||
|
"futureCompletingWithContinuationsFailure" in {
|
||||||
|
filterException[ArithmeticException] {
|
||||||
|
|
||||||
|
val x, y, z = Promise[Int]()
|
||||||
|
val ly, lz = new TestLatch
|
||||||
|
|
||||||
|
val result = flow {
|
||||||
|
y << x
|
||||||
|
ly.open()
|
||||||
|
val oops = 1 / 0
|
||||||
|
z << x
|
||||||
|
lz.open()
|
||||||
|
z() + y() + oops
|
||||||
|
}
|
||||||
|
intercept[TimeoutException] { Await.ready(ly, 100 milliseconds) }
|
||||||
|
intercept[TimeoutException] { Await.ready(lz, 100 milliseconds) }
|
||||||
|
flow { x << 5 }
|
||||||
|
|
||||||
|
assert(Await.result(y.future, timeout.duration) === 5)
|
||||||
|
intercept[java.lang.ArithmeticException](Await.result(result, timeout.duration))
|
||||||
|
assert(z.future.value === None)
|
||||||
|
assert(!lz.isOpen)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"futureContinuationsShouldNotBlock" in {
|
||||||
|
|
||||||
|
val latch = new TestLatch
|
||||||
|
val future = Future {
|
||||||
|
Await.ready(latch, TestLatch.DefaultTimeout)
|
||||||
|
"Hello"
|
||||||
|
}
|
||||||
|
|
||||||
|
val result = flow {
|
||||||
|
Some(future()).filter(_ == "Hello")
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(!result.isCompleted)
|
||||||
|
|
||||||
|
latch.open()
|
||||||
|
|
||||||
|
assert(Await.result(result, timeout.duration) === Some("Hello"))
|
||||||
|
}
|
||||||
|
|
||||||
|
"futureFlowShouldBeTypeSafe" in {
|
||||||
|
|
||||||
|
val rString = flow {
|
||||||
|
val x = Future(5)
|
||||||
|
x().toString
|
||||||
|
}
|
||||||
|
|
||||||
|
val rInt = flow {
|
||||||
|
val x = rString.apply
|
||||||
|
val y = Future(5)
|
||||||
|
x.length + y()
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(checkType(rString, manifest[String]))
|
||||||
|
assert(checkType(rInt, manifest[Int]))
|
||||||
|
assert(!checkType(rInt, manifest[String]))
|
||||||
|
assert(!checkType(rInt, manifest[Nothing]))
|
||||||
|
assert(!checkType(rInt, manifest[Any]))
|
||||||
|
|
||||||
|
Await.result(rString, timeout.duration)
|
||||||
|
Await.result(rInt, timeout.duration)
|
||||||
|
}
|
||||||
|
|
||||||
|
"futureFlowSimpleAssign" in {
|
||||||
|
val x, y, z = Promise[Int]()
|
||||||
|
|
||||||
|
flow {
|
||||||
|
z << x() + y()
|
||||||
|
}
|
||||||
|
flow { x << 40 }
|
||||||
|
flow { y << 2 }
|
||||||
|
|
||||||
|
assert(Await.result(z.future, timeout.duration) === 42)
|
||||||
|
}
|
||||||
|
|
||||||
|
"should capture first exception with dataflow" in {
|
||||||
|
val f1 = flow { 40 / 0 }
|
||||||
|
intercept[java.lang.ArithmeticException](Await result (f1, TestLatch.DefaultTimeout))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}*/
|
def checkType[A: ClassTag, B](in: Future[A], ref: ClassTag[B]): Boolean = implicitly[ClassTag[A]].runtimeClass eq ref.runtimeClass
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue