add fusing

This commit is contained in:
Roland Kuhn 2015-12-14 17:02:00 +01:00
parent 0fb6654f4f
commit a20bbce433
64 changed files with 1397 additions and 534 deletions

View file

@ -4,16 +4,22 @@
package akka.stream.impl
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
import java.{ util ju }
import akka.stream.impl.MaterializerSession.MaterializationPanic
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.scaladsl.Keep
import akka.stream._
import org.reactivestreams.{ Processor, Subscription, Publisher, Subscriber }
import scala.collection.mutable
import scala.util.control.{ NoStackTrace, NonFatal }
import akka.event.Logging.simpleName
import scala.annotation.tailrec
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource
import akka.stream.impl.fusing.GraphModule
/**
* INTERNAL API
@ -23,14 +29,14 @@ private[akka] object StreamLayout {
// compile-time constant
final val Debug = false
final def validate(m: Module, level: Int = 0, doPrint: Boolean = false, idMap: mutable.Map[AnyRef, Int] = mutable.Map.empty): Unit = {
final def validate(m: Module, level: Int = 0, doPrint: Boolean = false, idMap: ju.Map[AnyRef, Integer] = new ju.HashMap): Unit = {
val ids = Iterator from 1
def id(obj: AnyRef) = idMap get obj match {
case Some(x) x
case None
case null
val x = ids.next()
idMap(obj) = x
idMap.put(obj, x)
x
case x x
}
def in(i: InPort) = s"${i.toString}@${id(i)}"
def out(o: OutPort) = s"${o.toString}@${id(o)}"
@ -78,7 +84,12 @@ private[akka] object StreamLayout {
case Combine(f, left, right) atomics(left) ++ atomics(right)
}
val atomic = atomics(materializedValueComputation)
if ((atomic -- subModules - m).nonEmpty) problems ::= s"computation refers to non-existent modules [${atomic -- subModules - m mkString ","}]"
val graphValues = subModules.flatMap {
case GraphModule(_, _, _, mvids) mvids
case _ Nil
}
if ((atomic -- subModules -- graphValues - m).nonEmpty)
problems ::= s"computation refers to non-existent modules [${atomic -- subModules -- graphValues - m mkString ","}]"
val print = doPrint || problems.nonEmpty
@ -98,10 +109,24 @@ private[akka] object StreamLayout {
// TODO: Special case linear composites
// TODO: Cycles
sealed trait MaterializedValueNode
case class Combine(f: (Any, Any) Any, dep1: MaterializedValueNode, dep2: MaterializedValueNode) extends MaterializedValueNode
case class Atomic(module: Module) extends MaterializedValueNode
case class Transform(f: Any Any, dep: MaterializedValueNode) extends MaterializedValueNode
/**
 * Node of the expression tree that describes how a module's materialized value is
 * computed. The tree is evaluated by `MaterializerSession.resolveMaterialized`.
 */
sealed trait MaterializedValueNode {
/*
* These nodes are used in hash maps and therefore must have efficient implementations
* of hashCode and equals. There is no value in allowing aliases to be equal, so using
* reference equality.
*
* NOTE(review): the case classes below inherit these concrete overrides, so the compiler
* is expected not to synthesize structural equals/hashCode for them; two nodes with
* equal fields are then still distinct map keys. Confirm against the language spec.
*/
override def hashCode: Int = super.hashCode
override def equals(other: Any): Boolean = super.equals(other)
}
// Binary node: `f` is applied to the resolved values of `dep1` and `dep2`.
// toString deliberately prints only the dependencies, not the function.
case class Combine(f: (Any, Any) Any, dep1: MaterializedValueNode, dep2: MaterializedValueNode) extends MaterializedValueNode {
override def toString: String = s"Combine($dep1,$dep2)"
}
// Leaf node: stands for the materialized value of `module` itself.
// toString prints the module's name attribute, falling back to its class name.
case class Atomic(module: Module) extends MaterializedValueNode {
override def toString: String = s"Atomic(${module.attributes.nameOrDefault(module.getClass.getName)})"
}
// Unary node: `f` is applied to the resolved value of `dep`.
case class Transform(f: Any Any, dep: MaterializedValueNode) extends MaterializedValueNode {
override def toString: String = s"Transform($dep)"
}
// Leaf node that resolves to the unit value.
case object Ignore extends MaterializedValueNode
trait Module {
@ -179,7 +204,7 @@ private[akka] object StreamLayout {
if (Debug) validate(this)
CompositeModule(
subModules = if (this.isSealed) Set(this) else this.subModules,
if (this.isSealed) Set(this) else this.subModules,
shape,
downstreams,
upstreams,
@ -228,6 +253,40 @@ private[akka] object StreamLayout {
Attributes.none)
}
/**
* Creates a new Module which is `this` Module composed with `that` Module.
*
* The difference to compose(that) is that this version completely ignores the materialized value
* computation of `that` while the normal version executes the computation and discards its result.
* This means that this version must not be used for user-provided `that` modules because users may
* transform materialized values only to achieve some side-effect; it can only be
* used where we know that there is no meaningful computation to be done (like for
* MaterializedValueSource).
*
* @param that a Module to be composed with (cannot be itself)
* @return a Module that represents the composition of `this` and `that`
*/
def composeNoMat(that: Module): Module = {
  if (Debug) validate(this)

  require(that ne this, "A module cannot be added to itself. You should pass a separate instance to compose().")
  require(!subModules(that), "An existing submodule cannot be added again. All contained modules must be unique.")

  // A sealed module is added as a single unit; an unsealed one contributes its submodules directly.
  def constituents(m: Module): Set[Module] =
    if (m.isSealed) Set(m) else m.subModules

  val ownModules = constituents(this)
  val otherModules = constituents(that)

  // Only the computation of `this` is carried over; the computation of `that` is dropped entirely.
  val ownComputation =
    if (this.isSealed) Atomic(this)
    else this.materializedValueComputation

  val combinedShape =
    AmorphousShape(shape.inlets ++ that.shape.inlets, shape.outlets ++ that.shape.outlets)

  CompositeModule(
    ownModules ++ otherModules,
    combinedShape,
    downstreams ++ that.downstreams,
    upstreams ++ that.upstreams,
    ownComputation,
    Attributes.none)
}
/**
* Creates a new Module which contains `this` Module
* @return a new Module
@ -236,7 +295,7 @@ private[akka] object StreamLayout {
if (Debug) validate(this)
CompositeModule(
subModules = Set(this),
Set(this),
shape,
/*
* Composite modules always maintain the flattened upstreams/downstreams map (i.e. they contain all the
@ -296,7 +355,9 @@ private[akka] object StreamLayout {
override def materializedValueComputation: MaterializedValueNode = Ignore
}
final case class CopiedModule(shape: Shape, attributes: Attributes, copyOf: Module) extends Module {
final case class CopiedModule(override val shape: Shape,
override val attributes: Attributes,
copyOf: Module) extends Module {
override val subModules: Set[Module] = Set(copyOf)
override def withAttributes(attr: Attributes): Module = this.copy(attributes = attr)
@ -316,12 +377,12 @@ private[akka] object StreamLayout {
}
final case class CompositeModule(
subModules: Set[Module],
shape: Shape,
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
attributes: Attributes) extends Module {
override val attributes: Attributes) extends Module {
override def replaceShape(s: Shape): Module = {
shape.requireSamePortsAs(s)
@ -340,6 +401,34 @@ private[akka] object StreamLayout {
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin
}
/**
 * Module that carries the same data as a composite module (submodules, shape, wiring maps,
 * materialized value computation, attributes) plus a `Fusing.StructuralInfo`.
 *
 * NOTE(review): presumably this is the result of the fusing step; the contents and use of
 * `info` are not visible here, confirm against `Fusing`.
 */
final case class FusedModule(
override val subModules: Set[Module],
override val shape: Shape,
override val downstreams: Map[OutPort, InPort],
override val upstreams: Map[InPort, OutPort],
override val materializedValueComputation: MaterializedValueNode,
override val attributes: Attributes,
info: Fusing.StructuralInfo) extends Module {
// Returns a copy with shape `s`, after `requireSamePortsAs` has checked `s` against the current shape.
override def replaceShape(s: Shape): Module = {
shape.requireSamePortsAs(s)
copy(shape = s)
}
// The copy is a CopiedModule that wraps this instance behind a deep-copied shape.
override def carbonCopy: Module = CopiedModule(shape.deepCopy(), attributes, copyOf = this)
override def withAttributes(attributes: Attributes): FusedModule = copy(attributes = attributes)
// Multi-line dump of the name, the submodules and both wiring maps.
// NOTE(review): the pattern variables are named (in, out) for downstreams and (out, in) for
// upstreams, the reverse of the declared key/value types; the output is still "key -> value".
override def toString =
s"""
| Module: ${this.attributes.nameOrDefault("unnamed")}
| Modules:
| ${subModules.iterator.map(m m.toString.split("\n").mkString("\n ")).mkString("\n ")}
| Downstreams: ${downstreams.iterator.map { case (in, out) s"\n $in -> $out" }.mkString("")}
| Upstreams: ${upstreams.iterator.map { case (out, in) s"\n $out -> $in" }.mkString("")}
|""".stripMargin
}
}
private[stream] object VirtualProcessor {
@ -442,152 +531,13 @@ private[stream] final class VirtualProcessor[T] extends Processor[T, T] {
}
}
/**
 * INTERNAL API
 *
 * Atomic module (it has no submodules) with a single outlet. By default the outlet is
 * named "Materialized.out" and the module carries the name attribute "Materialized".
 */
private[stream] final case class MaterializedValueSource[M](
shape: SourceShape[M] = SourceShape[M](Outlet[M]("Materialized.out")),
attributes: Attributes = Attributes.name("Materialized")) extends StreamLayout.Module {
override def subModules: Set[Module] = Set.empty
// Replaces the attributes and, if they carry a different name, renames the outlet to match.
override def withAttributes(attr: Attributes): Module = this.copy(shape = amendShape(attr), attributes = attr)
// The copy always gets a fresh outlet named "Materialized.out", whatever the current outlet is called.
override def carbonCopy: Module = this.copy(shape = SourceShape(Outlet[M]("Materialized.out")))
// Only a shape equal to the current one is accepted; anything else throws UnsupportedOperationException.
override def replaceShape(s: Shape): Module =
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of MaterializedValueSource")
// Keeps the current shape when `attr` has no name or the same name as the current attributes;
// otherwise returns the shape with a new outlet named "<name>.out".
private def amendShape(attr: Attributes): SourceShape[M] = {
val thisN = attributes.nameOrDefault(null)
val thatN = attr.nameOrDefault(null)
if ((thatN eq null) || thisN == thatN) shape
else shape.copy(outlet = Outlet(thatN + ".out"))
}
}
/**
 * INTERNAL API
 *
 * Constants shared with the `MaterializedValuePublisher` class.
 */
private[stream] object MaterializedValuePublisher {
// Request states held in the publisher's `requestState`:
// NotRequested is the initial state, before the subscriber has signalled demand.
final val NotRequested = 0
// Requested is entered by the first valid request(n).
final val Requested = 1
// Completed is entered once the value has been pushed or the publisher has been closed.
final val Completed = 2
// Sentinel stored in the publisher's `value` while no materialized value is set (compared by reference).
final val NoValue = new AnyRef
}
/**
 * INTERNAL API
 *
 * Publisher that delivers a single value to a single subscriber and then completes.
 * The value arrives through [[setValue]]; demand arrives through the subscription's
 * `request`. Whichever of the two happens last pushes the element.
 */
private[stream] class MaterializedValuePublisher extends Publisher[Any] {
  import MaterializedValuePublisher._

  private val value = new AtomicReference[AnyRef](NoValue)
  private val registeredSubscriber = new AtomicReference[Subscriber[_ >: Any]](null)
  private val requestState = new AtomicInteger(NotRequested)

  /** Moves to the terminal state and drops the references to the value and the subscriber. */
  private def close(): Unit = {
    requestState.set(Completed)
    value.set(NoValue)
    registeredSubscriber.set(null)
  }

  /**
   * Runs `block` and closes this publisher if it throws.
   *
   * A spec violation is swallowed after closing. Any other non-fatal exception is
   * signalled to the registered subscriber (if there is one and the publisher has not
   * completed yet) and then rethrown to the caller.
   *
   * @param block the action to run; passed by name so that it executes inside the `try`
   */
  private def tryOrClose(block: => Unit): Unit = {
    try block catch {
      case _: ReactiveStreamsCompliance.SpecViolation =>
        close()
      // What else can we do here?
      case NonFatal(e) =>
        val sub = registeredSubscriber.get()
        // `&&` binds tighter than `||`, so the two CAS attempts must be grouped explicitly;
        // otherwise a null subscriber in state Requested would reach sub.onError and throw an NPE.
        if ((sub ne null) &&
          (requestState.compareAndSet(NotRequested, Completed) || requestState.compareAndSet(Requested, Completed))) {
          sub.onError(e)
        }
        close()
        throw e
    }
  }

  /**
   * Stores the materialized value. Only the first call has an effect; if demand has
   * already been signalled the value is pushed immediately.
   *
   * @param m the materialized value to publish
   */
  def setValue(m: Any): Unit =
    tryOrClose {
      if (value.compareAndSet(NoValue, m.asInstanceOf[AnyRef]) && requestState.get() == Requested)
        pushAndClose(m)
    }

  /*
   * Both call-sites do a CAS on their "own" side and a GET on the other side. The possible overlaps
   * are (removing symmetric cases where you can relabel A->B, B->A):
   *
   * A-CAS
   * A-GET
   * B-CAS
   * B-GET - pushAndClose fires here
   *
   * A-CAS
   * B-CAS
   * A-GET - pushAndClose fires here
   * B-GET - pushAndClose fires here
   *
   * A-CAS
   * B-CAS
   * B-GET - pushAndClose fires here
   * A-GET - pushAndClose fires here
   *
   * The proof that there are no other cases:
   *
   * - all permutations of 4 operations are 4! = 24
   * - the operations of A and B cannot be reordered, so there are 24 / (2 * 2) = 6 actual orderings
   * - if we don't count cases which are a simple relabeling A->B, B->A, we get 6 / 2 = 3 reorderings
   *   which are all enumerated above.
   *
   * pushAndClose protects against double onNext by doing a CAS itself.
   */
  private def pushAndClose(m: Any): Unit = {
    if (requestState.compareAndSet(Requested, Completed)) {
      val sub = registeredSubscriber.get()
      ReactiveStreamsCompliance.tryOnNext(sub, m)
      ReactiveStreamsCompliance.tryOnComplete(sub)
      close()
    }
  }

  /**
   * Registers the one and only subscriber. A second subscription attempt is rejected,
   * as a duplicate if it is the same subscriber and as an additional subscriber otherwise.
   *
   * @param subscriber the subscriber to register; must not be null
   */
  override def subscribe(subscriber: Subscriber[_ >: Any]): Unit = {
    tryOrClose {
      ReactiveStreamsCompliance.requireNonNullSubscriber(subscriber)
      if (registeredSubscriber.compareAndSet(null, subscriber)) {
        ReactiveStreamsCompliance.tryOnSubscribe(subscriber, new Subscription {
          override def cancel(): Unit = close()
          override def request(n: Long): Unit = {
            if (n <= 0) {
              ReactiveStreamsCompliance.tryOnError(
                subscriber,
                ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
            } else {
              // Only the first valid request changes state; push at once if the value is already there.
              if (requestState.compareAndSet(NotRequested, Requested)) {
                val m = value.get()
                if (m ne NoValue) pushAndClose(m)
              }
            }
          }
        })
      } else {
        if (subscriber == registeredSubscriber.get())
          ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
        else
          ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "MaterializedValuePublisher")
      }
    }
  }
}
/**
 * INTERNAL API
 *
 * Companion holding the panic exception type and the debug switch of the materializer session.
 */
private[stream] object MaterializerSession {
// Wraps the original failure with the fixed message "Materialization aborted."; carries no stack trace of its own.
class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace
// When true, the session prints its registration and resolution steps via println.
final val Debug = false
}
/**
@ -596,10 +546,10 @@ private[stream] object MaterializerSession {
private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) {
import StreamLayout._
private var subscribersStack: List[mutable.Map[InPort, Subscriber[Any]]] =
mutable.Map.empty[InPort, Subscriber[Any]].withDefaultValue(null) :: Nil
private var publishersStack: List[mutable.Map[OutPort, Publisher[Any]]] =
mutable.Map.empty[OutPort, Publisher[Any]].withDefaultValue(null) :: Nil
private var subscribersStack: List[ju.Map[InPort, Subscriber[Any]]] =
new ju.HashMap[InPort, Subscriber[Any]] :: Nil
private var publishersStack: List[ju.Map[OutPort, Publisher[Any]]] =
new ju.HashMap[OutPort, Publisher[Any]] :: Nil
/*
* Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule
@ -611,16 +561,16 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
*/
private var moduleStack: List[Module] = topLevel :: Nil
private def subscribers: mutable.Map[InPort, Subscriber[Any]] = subscribersStack.head
private def publishers: mutable.Map[OutPort, Publisher[Any]] = publishersStack.head
private def subscribers: ju.Map[InPort, Subscriber[Any]] = subscribersStack.head
private def publishers: ju.Map[OutPort, Publisher[Any]] = publishersStack.head
private def currentLayout: Module = moduleStack.head
// Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
// of the same module.
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def enterScope(enclosing: CopiedModule): Unit = {
subscribersStack ::= mutable.Map.empty.withDefaultValue(null)
publishersStack ::= mutable.Map.empty.withDefaultValue(null)
subscribersStack ::= new ju.HashMap
publishersStack ::= new ju.HashMap
moduleStack ::= enclosing.copyOf
}
@ -638,11 +588,11 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of
// the original module and assign them to the copy ports in the outer scope that we will return to
enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach {
case (original, exposed) assignPort(exposed, scopeSubscribers(original))
case (original, exposed) assignPort(exposed, scopeSubscribers.get(original))
}
enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach {
case (original, exposed) assignPort(exposed, scopePublishers(original))
case (original, exposed) assignPort(exposed, scopePublishers.get(original))
}
}
@ -658,10 +608,10 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// Cancels all intermediate Publishers and fails all intermediate Subscribers.
// (This is an attempt to clean up after an exception during materialization)
val errorPublisher = new ErrorPublisher(new MaterializationPanic(cause), "")
for (subMap subscribersStack; sub subMap.valuesIterator)
for (subMap subscribersStack; sub subMap.asScala.valuesIterator)
errorPublisher.subscribe(sub)
for (pubMap publishersStack; pub pubMap.valuesIterator)
for (pubMap publishersStack; pub pubMap.asScala.valuesIterator)
pub.subscribe(new CancellingSubscriber)
throw cause
@ -671,20 +621,27 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
/**
 * Combines the attributes inherited from the enclosing module with those of the current
 * module, using `Attributes.and` with the parent as the receiver.
 */
protected def mergeAttributes(parent: Attributes, current: Attributes): Attributes = {
  parent.and(current)
}
private val matValSrc: ju.Map[MaterializedValueNode, List[MaterializedValueSource[Any]]] = new ju.HashMap
/**
 * Registers a materialized value source under the computation node it refers to, so that
 * `resolveMaterialized` can hand it the value once that node has been resolved.
 * Several sources may be registered for the same node; the newest is prepended to the list.
 */
def registerSrc(ms: MaterializedValueSource[Any]): Unit = {
if (MaterializerSession.Debug) println(s"registering source $ms")
matValSrc.get(ms.computation) match {
// java.util.Map.get yields null when there is no entry yet for this node
case null matValSrc.put(ms.computation, ms :: Nil)
case xs matValSrc.put(ms.computation, ms :: xs)
}
}
protected def materializeModule(module: Module, effectiveAttributes: Attributes): Any = {
val materializedValues = collection.mutable.HashMap.empty[Module, Any]
var materializedValuePublishers: List[MaterializedValuePublisher] = Nil
val materializedValues: ju.Map[Module, Any] = new ju.HashMap
for (submodule module.subModules) {
val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes)
submodule match {
case mv: MaterializedValueSource[_]
val pub = new MaterializedValuePublisher
materializedValuePublishers ::= pub
materializedValues.put(mv, ())
assignPort(mv.shape.outlet, pub)
case GraphStageModule(shape, attributes, mv: MaterializedValueSource[_])
val copy = mv.copySrc.asInstanceOf[MaterializedValueSource[Any]]
registerSrc(copy)
materializeAtomic(copy.module, subEffectiveAttributes, materializedValues)
case atomic if atomic.isAtomic
materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes))
materializeAtomic(atomic, subEffectiveAttributes, materializedValues)
case copied: CopiedModule
enterScope(copied)
materializedValues.put(copied, materializeModule(copied, subEffectiveAttributes))
@ -694,38 +651,54 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
}
}
val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
materializedValuePublishers foreach { pub pub.setValue(mat) }
mat
if (MaterializerSession.Debug) {
println("RESOLVING")
println(s" module = $module")
println(s" computation = ${module.materializedValueComputation}")
println(s" matValSrc = $matValSrc")
println(s" matVals = $materializedValues")
}
resolveMaterialized(module.materializedValueComputation, materializedValues, " ")
}
/**
 * Materializes a composite module by delegating to `materializeModule` with the same
 * arguments; the result of that call is returned unchanged.
 */
protected def materializeComposite(composite: Module, effectiveAttributes: Attributes): Any =
  materializeModule(composite, effectiveAttributes)
protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes): Any
protected def materializeAtomic(atomic: Module, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit
private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, Any]): Any = matNode match {
case Atomic(m) materializedValues(m)
case Combine(f, d1, d2) f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
case Transform(f, d) f(resolveMaterialized(d, materializedValues))
case Ignore ()
/**
 * Evaluates a materialized value computation tree and returns the resulting value.
 * After a node has been evaluated, every source registered for that node in `matValSrc`
 * is removed from the map and receives the node's value via `setValue`.
 *
 * @param matNode the node to evaluate
 * @param matVal materialized values collected so far, keyed by module
 * @param indent prefix for debug output only; it grows with recursion depth
 */
private def resolveMaterialized(matNode: MaterializedValueNode, matVal: ju.Map[Module, Any], indent: String): Any = {
if (MaterializerSession.Debug) println(indent + matNode)
val ret = matNode match {
// Leaf: look the module's value up in the map.
// NOTE(review): java.util.Map.get returns null for a missing key; presumably every
// referenced module has been materialized before this runs. Confirm.
case Atomic(m) matVal.get(m)
// Inner nodes: resolve the dependencies recursively, then apply the stored function.
case Combine(f, d1, d2) f(resolveMaterialized(d1, matVal, indent + " "), resolveMaterialized(d2, matVal, indent + " "))
case Transform(f, d) f(resolveMaterialized(d, matVal, indent + " "))
case Ignore ()
}
if (MaterializerSession.Debug) println(indent + s"result = $ret")
// remove() both fetches and clears the registration, so each source is triggered at most once
matValSrc.remove(matNode) match {
case null // nothing to do
case srcs
if (MaterializerSession.Debug) println(indent + s"triggering sources $srcs")
srcs.foreach(_.setValue(ret))
}
ret
}
final protected def assignPort(in: InPort, subscriber: Subscriber[Any]): Unit = {
subscribers(in) = subscriber
subscribers.put(in, subscriber)
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.inPorts(in)) {
val publisher = publishers(currentLayout.upstreams(in))
val publisher = publishers.get(currentLayout.upstreams(in))
if (publisher ne null) publisher.subscribe(subscriber)
}
}
final protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = {
publishers(out) = publisher
publishers.put(out, publisher)
// Interface (unconnected) ports of the current scope will be wired when exiting the scope
if (!currentLayout.outPorts(out)) {
val subscriber = subscribers(currentLayout.downstreams(out))
val subscriber = subscribers.get(currentLayout.downstreams(out))
if (subscriber ne null) publisher.subscribe(subscriber)
}
}