=str #15739 Hook-up grouped, groupedWithin, timerTransform to new DSL

This commit is contained in:
Patrik Nordwall 2014-09-09 13:44:16 +02:00
parent 913b34ae5f
commit 9732051019
8 changed files with 435 additions and 14 deletions

View file

@ -13,6 +13,8 @@ import akka.pattern.ask
import akka.stream.{ MaterializerSettings, Transformer }
import akka.stream.impl.{ ActorProcessor, ActorPublisher, ExposedPublisher, TransformProcessorImpl }
import akka.stream.scaladsl2._
import akka.stream.TimerTransformer
import akka.stream.impl.TimerTransformerProcessorsImpl
/**
* INTERNAL API
@ -24,6 +26,8 @@ private[akka] object Ast {
case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode
case class TimerTransform(name: String, mkTransformer: () TimerTransformer[Any, Any]) extends AstNode
case class GroupBy(f: Any Any) extends AstNode {
override def name = "groupBy"
}
@ -196,11 +200,11 @@ private[akka] object ActorProcessorFactory {
val settings = materializer.settings
(op match {
case t: Transform Props(new TransformProcessorImpl(settings, t.mkTransformer()))
case t: TimerTransform Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer()))
case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f))
case tt: PrefixAndTail Props(new PrefixAndTailImpl(settings, tt.n))
case s: SplitWhen Props(new SplitWhenProcessorImpl(settings, s.p))
case ConcatAll Props(new ConcatAllImpl(materializer))
}).withDispatcher(settings.dispatcher)
}

View file

@ -3,12 +3,17 @@
*/
package akka.stream.scaladsl2
import scala.collection.immutable
import scala.collection.immutable
import akka.stream.impl2.Ast._
import org.reactivestreams._
import scala.annotation.unchecked.uncheckedVariance
import scala.language.higherKinds
import akka.stream.Transformer
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.Duration
import akka.util.Collections.EmptyImmutableSeq
import akka.stream.TimerTransformer
/**
* This is the interface from which all concrete Flows inherit. No generic
@ -17,10 +22,15 @@ import akka.stream.Transformer
*/
sealed trait Flow
object FlowOps {
private case object GroupedWithinTimerKey
}
/**
* Operations offered by flows with a free output side: the DSL flows left-to-right only.
*/
trait FlowOps[-In, +Out] {
import FlowOps._
type Repr[-I, +O] <: FlowOps[I, O]
// Storing ops in reverse order
@ -53,6 +63,66 @@ trait FlowOps[-In, +Out] {
override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil
})
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
*/
def grouped(n: Int): Repr[In, immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
transform("grouped", () new Transformer[Out, immutable.Seq[Out]] {
var buf: Vector[Out] = Vector.empty
override def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
val group = buf
buf = Vector.empty
List(group)
} else
Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
})
}
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.
* Empty groups will not be emitted if no elements are received from upstream.
* The last group before end-of-stream will contain the buffered elements
* since the previously emitted group.
*
* `n` must be positive, and `d` must be greater than 0 seconds, otherwise
* IllegalArgumentException is thrown.
*/
def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
timerTransform("groupedWithin", () new TimerTransformer[Out, immutable.Seq[Out]] {
schedulePeriodically(GroupedWithinTimerKey, d)
var buf: Vector[Out] = Vector.empty
override def onNext(in: Out) = {
buf :+= in
if (buf.size == n) {
// start new time window
schedulePeriodically(GroupedWithinTimerKey, d)
emitGroup()
} else Nil
}
override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf)
override def onTimer(timerKey: Any) = emitGroup()
private def emitGroup(): immutable.Seq[immutable.Seq[Out]] =
if (buf.isEmpty) EmptyImmutableSeq
else {
val group = buf
buf = Vector.empty
List(group)
}
})
}
/**
* Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
@ -125,6 +195,33 @@ trait FlowOps[-In, +Out] {
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll)
case _ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]")
}
/**
* Transformation of a stream, with additional support for scheduled events.
*
* For each element the [[akka.stream.Transformer#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced.
* After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
* stream processing at this point; in that case the upstream subscription is
* canceled. Before signaling normal completion to the downstream subscribers,
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event.
*
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* Note that you can use [[#transform]] if you just need to transform elements time plays no role in the transformation.
*/
def timerTransform[U](name: String, mkTransformer: () TimerTransformer[Out, U]): Repr[In, U] =
andThen(TimerTransform(name, mkTransformer.asInstanceOf[() TimerTransformer[Any, Any]]))
}
/**

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.collection.immutable
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit2.ScriptedTest
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
import akka.stream.MaterializerSettings
class FlowGroupedSpec extends AkkaSpec with ScriptedTest {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
.withFanOutBuffer(initialSize = 1, maxSize = 16)
"A Grouped" must {
"group evenly" in {
def script = Script((1 to 20) map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*)
(1 to 30) foreach (_ runScript(script, settings)(_.grouped(3)))
}
"group with rest" in {
def script = Script(((1 to 20).map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }
:+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*)
(1 to 30) foreach (_ runScript(script, settings)(_.grouped(3)))
}
}
}

View file

@ -0,0 +1,139 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit2.ScriptedTest
import akka.stream.MaterializerSettings
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val settings = MaterializerSettings(system)
implicit val materializer = FlowMaterializer()
"A GroupedWithin" must {
"group elements within the duration" in {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
FlowFrom(p).groupedWithin(1000, 1.second).publishTo(c)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(100)
val demand1 = pSub.expectRequest
(1 to demand1) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest
(1 to demand2) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest
c.expectNext((1 to (demand1 + demand2)).toVector)
(1 to demand3) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(300.millis)
c.expectNext(((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)).toVector)
c.expectNoMsg(300.millis)
pSub.expectRequest
val last = input.next()
pSub.sendNext(last)
pSub.sendComplete()
c.expectNext(List(last))
c.expectComplete
c.expectNoMsg(200.millis)
}
"deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
FlowFrom(1 to 3).groupedWithin(1000, 10.second).publishTo(c)
val cSub = c.expectSubscription
cSub.request(100)
c.expectNext((1 to 3).toList)
c.expectComplete
c.expectNoMsg(200.millis)
}
"buffer groups until requested from downstream" in {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
FlowFrom(p).groupedWithin(1000, 1.second).publishTo(c)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(1)
val demand1 = pSub.expectRequest
(1 to demand1) foreach { _ pSub.sendNext(input.next()) }
c.expectNext((1 to demand1).toVector)
val demand2 = pSub.expectRequest
(1 to demand2) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(300.millis)
cSub.request(1)
c.expectNext(((demand1 + 1) to (demand1 + demand2)).toVector)
pSub.sendComplete()
c.expectComplete
c.expectNoMsg(100.millis)
}
"drop empty groups" in {
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
FlowFrom(p).groupedWithin(1000, 500.millis).publishTo(c)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(2)
pSub.expectRequest
c.expectNoMsg(600.millis)
pSub.sendNext(1)
pSub.sendNext(2)
c.expectNext(List(1, 2))
// nothing more requested
c.expectNoMsg(1100.millis)
cSub.request(3)
c.expectNoMsg(600.millis)
pSub.sendComplete()
c.expectComplete
c.expectNoMsg(100.millis)
}
"reset time window when max elements reached" in {
val input = Iterator.from(1)
val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
FlowFrom(p).groupedWithin(3, 2.second).publishTo(c)
val pSub = p.expectSubscription
val cSub = c.expectSubscription
cSub.request(4)
val demand1 = pSub.expectRequest
demand1 should be(4)
c.expectNoMsg(1000.millis)
(1 to demand1) foreach { _ pSub.sendNext(input.next()) }
c.probe.within(1000.millis) {
c.expectNext((1 to 3).toVector)
}
c.expectNoMsg(1500.millis)
c.probe.within(1000.millis) {
c.expectNext(List(4))
}
pSub.sendComplete()
c.expectComplete
c.expectNoMsg(100.millis)
}
"group evenly" in {
def script = Script((1 to 20) map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*)
(1 to 30) foreach (_ runScript(script, settings)(_.groupedWithin(3, 10.minutes)))
}
"group with rest" in {
def script = Script(((1 to 20).map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }
:+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*)
(1 to 30) foreach (_ runScript(script, settings)(_.groupedWithin(3, 10.minutes)))
}
}
}

View file

@ -3,7 +3,9 @@
*/
package akka.stream.scaladsl2
import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit }
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit2.ScriptedTest
import akka.testkit.TestProbe
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }

View file

@ -11,6 +11,7 @@ import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.MaterializerSettings
import akka.stream.testkit.StreamTestKit.SubscriberProbe
class FlowPrefixAndTailSpec extends AkkaSpec {
@ -30,14 +31,22 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val futureSink = newFutureSink
val mf = FlowFrom(Nil).prefixAndTail(10).withSink(futureSink).run()
val fut = futureSink.future(mf)
Await.result(fut, 3.seconds) should be((Nil, FlowFrom(EmptyPublisher[Int])))
val (prefix, tailFlow) = Await.result(fut, 3.seconds)
prefix should be(Nil)
val tailSubscriber = SubscriberProbe[Int]
tailFlow.publishTo(tailSubscriber)
tailSubscriber.expectComplete()
}
"work on short input" in {
val futureSink = newFutureSink
val mf = FlowFrom(List(1, 2, 3)).prefixAndTail(10).withSink(futureSink).run()
val fut = futureSink.future(mf)
Await.result(fut, 3.seconds) should be((List(1, 2, 3), FlowFrom(EmptyPublisher[Int])))
val (prefix, tailFlow) = Await.result(fut, 3.seconds)
prefix should be(List(1, 2, 3))
val tailSubscriber = SubscriberProbe[Int]
tailFlow.publishTo(tailSubscriber)
tailSubscriber.expectComplete()
}
"work on longer inputs" in {
@ -47,11 +56,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(1 to 5)
// FIXME enable this again, when grouped is implemented
// val futureSink2 = ???
// val mf2 = tail.grouped(6).withSink(futureSink2).run()
// val fut2 = futureSink2.future(mf2)
// Await.result(fut2, 3.seconds) should be(6 to 10)
val futureSink2 = FutureSink[immutable.Seq[Int]]
val mf2 = tail.grouped(6).withSink(futureSink2).run()
val fut2 = futureSink2.future(mf2)
Await.result(fut2, 3.seconds) should be(6 to 10)
}
"handle zero take count" in {
@ -61,8 +69,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Nil)
// FIXME enable this again, when grouped is implemented
// Await.result(FlowFrom(tail).grouped(11).toFuture(), 3.seconds) should be(1 to 10)
val futureSink2 = FutureSink[immutable.Seq[Int]]
val mf2 = tail.grouped(11).withSink(futureSink2).run()
val fut2 = futureSink2.future(mf2)
Await.result(fut2, 3.seconds) should be(1 to 10)
}
"handle negative take count" in {
@ -72,8 +82,10 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Nil)
// FIXME enable this again, when grouped is implemented
// Await.result(FlowFrom(tail).grouped(11).toFuture(), 3.seconds) should be(1 to 10)
val futureSink2 = FutureSink[immutable.Seq[Int]]
val mf2 = tail.grouped(11).withSink(futureSink2).run()
val fut2 = futureSink2.future(mf2)
Await.result(fut2, 3.seconds) should be(1 to 10)
}
"work if size of take is equal to stream size" in {

View file

@ -3,7 +3,8 @@
*/
package akka.stream.scaladsl2
import akka.stream.testkit.{ AkkaSpec, ScriptedTest, StreamTestKit }
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.stream.testkit2.ScriptedTest
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }

View file

@ -0,0 +1,133 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorCell
import akka.actor.ActorRef
import akka.actor.Props
import akka.stream.TimerTransformer.Scheduled
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestDuration
import akka.testkit.TestKit
import akka.stream.TimerTransformer
object TimerTransformerSpec {
case object TestSingleTimer
case object TestSingleTimerResubmit
case object TestCancelTimer
case object TestCancelTimerAck
case object TestRepeatedTimer
case class Tick(n: Int)
def driverProps(probe: ActorRef): Props =
Props(classOf[Driver], probe).withDispatcher("akka.test.stream-dispatcher")
class Driver(probe: ActorRef) extends Actor {
// need implicit system for dilated
import context.system
val tickCount = Iterator from 1
val transformer = new TimerTransformer[Int, Int] {
override def onNext(elem: Int): immutable.Seq[Int] = List(elem)
override def onTimer(timerKey: Any): immutable.Seq[Int] = {
val tick = Tick(tickCount.next())
probe ! tick
if (timerKey == "TestSingleTimerResubmit" && tick.n == 1)
scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated)
else if (timerKey == "TestRepeatedTimer" && tick.n == 5)
cancelTimer("TestRepeatedTimer")
Nil
}
}
override def preStart(): Unit = {
super.preStart()
transformer.start(context)
}
override def postStop(): Unit = {
super.postStop()
transformer.stop()
}
def receive = {
case TestSingleTimer
transformer.scheduleOnce("TestSingleTimer", 500.millis.dilated)
case TestSingleTimerResubmit
transformer.scheduleOnce("TestSingleTimerResubmit", 500.millis.dilated)
case TestCancelTimer
transformer.scheduleOnce("TestCancelTimer", 1.milli.dilated)
TestKit.awaitCond(context.asInstanceOf[ActorCell].mailbox.hasMessages, 1.second.dilated)
transformer.cancelTimer("TestCancelTimer")
probe ! TestCancelTimerAck
transformer.scheduleOnce("TestCancelTimer", 500.milli.dilated)
case TestRepeatedTimer
transformer.schedulePeriodically("TestRepeatedTimer", 100.millis.dilated)
case s: Scheduled transformer.onScheduled(s)
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class TimerTransformerSpec extends AkkaSpec {
import TimerTransformerSpec._
"A TimerTransformer" must {
"receive single-shot timer" in {
val driver = system.actorOf(driverProps(testActor))
within(2 seconds) {
within(500 millis, 1 second) {
driver ! TestSingleTimer
expectMsg(Tick(1))
}
expectNoMsg(1 second)
}
}
"resubmit single-shot timer" in {
val driver = system.actorOf(driverProps(testActor))
within(2.5 seconds) {
within(500 millis, 1 second) {
driver ! TestSingleTimerResubmit
expectMsg(Tick(1))
}
within(1 second) {
expectMsg(Tick(2))
}
expectNoMsg(1 second)
}
}
"correctly cancel a named timer" in {
val driver = system.actorOf(driverProps(testActor))
driver ! TestCancelTimer
within(500 millis) {
expectMsg(TestCancelTimerAck)
}
within(300 millis, 1 second) {
expectMsg(Tick(1))
}
expectNoMsg(1 second)
}
"receive and cancel a repeated timer" in {
val driver = system.actorOf(driverProps(testActor))
driver ! TestRepeatedTimer
val seq = receiveWhile(2 seconds) {
case t: Tick t
}
seq should have length 5
expectNoMsg(1 second)
}
}
}