!str #15742 Hook-up groupBy to new DSL

* Produces Flows instead of Publisher
This commit is contained in:
Patrik Nordwall 2014-09-08 13:58:52 +02:00
parent bf1e264028
commit 42e9718c7a
4 changed files with 286 additions and 3 deletions
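For orientation, a minimal usage sketch of what this commit wires up. It uses only names that appear in the diffs below (FlowFrom, groupBy, toPublisher, FlowWithSource, FlowMaterializer, MaterializerSettings); the ActorSystem setup is an assumption for the sketch, not part of the commit.

import org.reactivestreams.Publisher
import akka.stream.MaterializerSettings
import akka.stream.scaladsl2.{ FlowFrom, FlowMaterializer, FlowWithSource }

implicit val system = akka.actor.ActorSystem("groupBySketch") // assumed setup, not from the commit
implicit val materializer = FlowMaterializer(MaterializerSettings(system))

// Each group now arrives as a FlowWithSource that can be composed further,
// rather than as a bare Publisher as in the old API.
val groups: Publisher[(Int, FlowWithSource[Int, Int])] =
  FlowFrom((1 to 10).iterator)
    .groupBy(_ % 2) // key each element by parity
    .toPublisher()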

View file

@@ -4,13 +4,10 @@
package akka.stream.impl2
import java.util.concurrent.atomic.AtomicLong
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Await
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import akka.actor._
import akka.pattern.ask
import akka.stream.{ MaterializerSettings, Transformer }
@@ -27,6 +24,10 @@ private[akka] object Ast {
case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode
case class GroupBy(f: Any ⇒ Any) extends AstNode {
override def name = "groupBy"
}
}
/**
@@ -182,6 +183,7 @@ private[akka] object ActorProcessorFactory {
def props(settings: MaterializerSettings, op: AstNode): Props =
(op match {
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer()))
case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f))
}).withDispatcher(settings.dispatcher)
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {

View file

@@ -0,0 +1,71 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl2
import akka.stream.MaterializerSettings
import akka.stream.impl.TransferPhase
import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey
import akka.stream.scaladsl2.FlowFrom
import akka.stream.impl.MultiStreamOutputProcessor
/**
* INTERNAL API
*/
private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) {
var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs]
var pendingSubstreamOutputs: SubstreamOutputs = _
// No substream is open yet. If downstream cancels now, we are complete
val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
nextPhase(openSubstream(elem, key))
}
// some substreams are open now. If downstream cancels, we still continue until the substreams are closed
val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒
val elem = primaryInputs.dequeueInputElement()
val key = keyFor(elem)
keyToSubstreamOutputs.get(key) match {
case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key)))
case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key))
case _ ⇒ // stay
}
}
def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
if (primaryOutputs.isClosed) {
// Just drop, we do not open any more substreams
nextPhase(waitNext)
} else {
val substreamOutput = newSubstream()
val substreamFlow = FlowFrom(substreamOutput) // substreamOutput is a Publisher
primaryOutputs.enqueueOutputElement((key, substreamFlow))
keyToSubstreamOutputs(key) = substreamOutput
nextPhase(dispatchToSubstream(elem, substreamOutput))
}
}
def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = {
pendingSubstreamOutputs = substream
TransferPhase(substream.NeedsDemand) { () ⇒
substream.enqueueOutputElement(elem)
pendingSubstreamOutputs = null
nextPhase(waitNext)
}
}
nextPhase(waitFirst)
override def invalidateSubstream(substream: SubstreamKey): Unit = {
if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.key) {
pendingSubstreamOutputs = null
nextPhase(waitNext)
}
super.invalidateSubstream(substream)
}
}
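Not part of the commit, but as a reading aid: stripped of demand handling and substream lifecycle, the phases above route each element to a per-key bucket that is created the first time its key is seen. A stand-alone sketch of just that grouping semantics (plain Scala, no Akka):

// Hedged illustration only: the real processor above additionally respects demand,
// opens one substream at a time and reacts to cancellation; this sketch does not.
def groupBySemantics[T, K](elems: Seq[T])(keyFor: T ⇒ K): Map[K, Vector[T]] =
  elems.foldLeft(Map.empty[K, Vector[T]]) { (acc, elem) ⇒
    val key = keyFor(elem)                                     // same role as keyFor above
    acc.updated(key, acc.getOrElse(key, Vector.empty) :+ elem) // dispatch to that key's bucket
  }

// groupBySemantics(1 to 6)(_ % 2) == Map(1 -> Vector(1, 3, 5), 0 -> Vector(2, 4, 6))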

View file

@@ -44,6 +44,20 @@ trait FlowOps[-In, +Out] extends HasNoSink[Out] {
def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = {
andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]]))
}
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
* using the given function. When a new key is encountered for the first time
* it is emitted to the downstream subscriber together with a fresh
* flow that will eventually produce all the elements of the substream
for that key. Not consuming the elements from the created streams will
stop this processor from processing more elements; therefore, you must
take care to unblock (or cancel) all of the produced streams, even if you
want to consume only one of them.
*/
def groupBy[K, U >: Out](f: Out ⇒ K): Repr[In, (K, FlowWithSource[U, U])] =
andThen(GroupBy(f.asInstanceOf[Any ⇒ Any]))
}
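A hedged illustration of the warning above, condensed from the happy-case test below: the master stream and its substreams must be driven together, because requesting (or cancelling) a substream is what lets groupBy pull the next element from upstream. It assumes the implicit materializer and the StreamPuppet / probe helpers defined in the spec below.

val groups = FlowFrom((1 to 4).iterator).groupBy(_ % 2).toPublisher()

val master = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]()
groups.subscribe(master)
val masterSubscription = master.expectSubscription()

masterSubscription.request(1)
val (key1, odds) = master.expectNext()            // first key seen is 1 (from element 1)
val oddsPuppet = StreamPuppet(odds.toPublisher()) // StreamPuppet is defined in the spec below
oddsPuppet.request(1)
oddsPuppet.expectNext(1)                          // consuming here unblocks groupBy to read element 2

masterSubscription.request(1)
val (key0, evens) = master.expectNext()           // now the substream for key 0 is emitted
StreamPuppet(evens.toPublisher()).cancel()        // cancelling an unwanted substream also unblocks the stage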
/**

View file

@@ -0,0 +1,196 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.concurrent.duration._
import akka.stream.testkit._
import org.reactivestreams.Publisher
import scala.util.control.NoStackTrace
import akka.stream.MaterializerSettings
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowGroupBySpec extends AkkaSpec {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
.withFanOutBuffer(initialSize = 2, maxSize = 2)
implicit val materializer = FlowMaterializer(settings)
case class StreamPuppet(p: Publisher[Int]) {
val probe = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(probe)
val subscription = probe.expectSubscription()
def request(demand: Int): Unit = subscription.request(demand)
def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete()
def expectError(e: Throwable) = probe.expectError(e)
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) {
val source = FlowFrom((1 to elementCount).iterator).toPublisher()
val groupStream = FlowFrom(source).groupBy(_ % groupCount).toPublisher()
val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]()
groupStream.subscribe(masterSubscriber)
val masterSubscription = masterSubscriber.expectSubscription()
def getSubFlow(expectedKey: Int): FlowWithSource[Int, Int] = {
masterSubscription.request(1)
expectSubFlow(expectedKey: Int)
}
def expectSubFlow(expectedKey: Int): FlowWithSource[Int, Int] = {
val (key, substream) = masterSubscriber.expectNext()
key should be(expectedKey)
substream
}
}
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
"groupBy" must {
"work in the happy case" in new SubstreamsSupport(groupCount = 2) {
val s1 = StreamPuppet(getSubFlow(1).toPublisher())
masterSubscriber.expectNoMsg(100.millis)
s1.expectNoMsg(100.millis)
s1.request(1)
s1.expectNext(1)
s1.expectNoMsg(100.millis)
val s2 = StreamPuppet(getSubFlow(0).toPublisher())
s2.expectNoMsg(100.millis)
s2.request(2)
s2.expectNext(2)
// Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box
s1.request(1)
s2.expectNext(4)
s2.expectNoMsg(100.millis)
s1.expectNext(3)
s2.request(1)
// Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box
s1.request(1)
s2.expectNext(6)
s2.expectComplete()
s1.expectNext(5)
s1.expectComplete()
masterSubscriber.expectComplete()
}
"accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) {
StreamPuppet(getSubFlow(1).toPublisher()).cancel()
val substream = StreamPuppet(getSubFlow(0).toPublisher())
substream.request(2)
substream.expectNext(2)
substream.expectNext(4)
substream.expectNoMsg(100.millis)
substream.request(2)
substream.expectNext(6)
substream.expectComplete()
masterSubscriber.expectComplete()
}
"accept cancellation of master stream when not consumed anything" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = FlowFrom(publisherProbeProbe).groupBy(_ % 2).toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.cancel()
upstreamSubscription.expectCancellation()
}
"accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) {
pending
// FIXME: Needs handling of loose substreams that no one refers to anymore.
// val substream = StreamPuppet(getSubproducer(1))
//
// substream.request(1)
// substream.expectNext(1)
//
// masterSubscription.cancel()
// masterSubscriber.expectNoMsg(100.millis)
//
// // Open substreams still work, others are discarded
// substream.request(4)
// substream.expectNext(4)
// substream.expectNext(7)
// substream.expectNext(10)
// substream.expectNext(13)
// substream.expectComplete()
}
"work with empty input stream" in {
val publisher = FlowFrom(List.empty[Int]).groupBy(_ % 2).toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]()
publisher.subscribe(subscriber)
subscriber.expectCompletedOrSubscriptionFollowedByComplete()
}
"abort on onError from upstream" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = FlowFrom(publisherProbeProbe).groupBy(_ % 2).toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
val e = TE("test")
upstreamSubscription.sendError(e)
subscriber.expectError(e)
}
"abort on onError from upstream when substreams are running" in {
val publisherProbeProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = FlowFrom(publisherProbeProbe).groupBy(_ % 2).toPublisher()
val subscriber = StreamTestKit.SubscriberProbe[(Int, FlowWithSource[Int, Int])]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbeProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
val (_, substream) = subscriber.expectNext()
val substreamPuppet = StreamPuppet(substream.toPublisher())
substreamPuppet.request(1)
substreamPuppet.expectNext(1)
val e = TE("test")
upstreamSubscription.sendError(e)
substreamPuppet.expectError(e)
subscriber.expectError(e)
}
}
}