This commit is contained in:
parent
16e6fb9d77
commit
2b8b946bc7
1 changed files with 5 additions and 8 deletions
|
|
@ -16,15 +16,12 @@ import akka.testkit.{ AkkaSpec, TestLatch }
|
||||||
|
|
||||||
class RateTransformationDocSpec extends AkkaSpec {
|
class RateTransformationDocSpec extends AkkaSpec {
|
||||||
|
|
||||||
type Seq[+A] = immutable.Seq[A]
|
|
||||||
val Seq = immutable.Seq
|
|
||||||
|
|
||||||
implicit val materializer = ActorMaterializer()
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
"conflate should summarize" in {
|
"conflate should summarize" in {
|
||||||
//#conflate-summarize
|
//#conflate-summarize
|
||||||
val statsFlow = Flow[Double]
|
val statsFlow = Flow[Double]
|
||||||
.conflateWithSeed(Seq(_))(_ :+ _)
|
.conflateWithSeed(immutable.Seq(_))(_ :+ _)
|
||||||
.map { s ⇒
|
.map { s ⇒
|
||||||
val μ = s.sum / s.size
|
val μ = s.sum / s.size
|
||||||
val se = s.map(x ⇒ pow(x - μ, 2))
|
val se = s.map(x ⇒ pow(x - μ, 2))
|
||||||
|
|
@ -38,14 +35,14 @@ class RateTransformationDocSpec extends AkkaSpec {
|
||||||
.grouped(10)
|
.grouped(10)
|
||||||
.runWith(Sink.head)
|
.runWith(Sink.head)
|
||||||
|
|
||||||
Await.result(fut, 100.millis)
|
fut.futureValue
|
||||||
}
|
}
|
||||||
|
|
||||||
"conflate should sample" in {
|
"conflate should sample" in {
|
||||||
//#conflate-sample
|
//#conflate-sample
|
||||||
val p = 0.01
|
val p = 0.01
|
||||||
val sampleFlow = Flow[Double]
|
val sampleFlow = Flow[Double]
|
||||||
.conflateWithSeed(Seq(_)) {
|
.conflateWithSeed(immutable.Seq(_)) {
|
||||||
case (acc, elem) if Random.nextDouble < p ⇒ acc :+ elem
|
case (acc, elem) if Random.nextDouble < p ⇒ acc :+ elem
|
||||||
case (acc, _) ⇒ acc
|
case (acc, _) ⇒ acc
|
||||||
}
|
}
|
||||||
|
|
@ -57,7 +54,7 @@ class RateTransformationDocSpec extends AkkaSpec {
|
||||||
.via(sampleFlow)
|
.via(sampleFlow)
|
||||||
.runWith(Sink.fold(Seq.empty[Double])(_ :+ _))
|
.runWith(Sink.fold(Seq.empty[Double])(_ :+ _))
|
||||||
|
|
||||||
val count = Await.result(fut, 1000.millis).size
|
fut.futureValue
|
||||||
}
|
}
|
||||||
|
|
||||||
"expand should repeat last" in {
|
"expand should repeat last" in {
|
||||||
|
|
@ -73,7 +70,7 @@ class RateTransformationDocSpec extends AkkaSpec {
|
||||||
.run()
|
.run()
|
||||||
|
|
||||||
probe.sendNext(1.0)
|
probe.sendNext(1.0)
|
||||||
val expanded = Await.result(fut, 100.millis)
|
val expanded = fut.futureValue
|
||||||
expanded.size shouldBe 10
|
expanded.size shouldBe 10
|
||||||
expanded.sum shouldBe 10
|
expanded.sum shouldBe 10
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue