!str #16168: Expose materialized value in the graph as a "source"

This commit is contained in:
Endre Sándor Varga 2015-03-30 14:22:12 +02:00
parent 37aa2cb886
commit a7af773e2c
33 changed files with 601 additions and 173 deletions

View file

@ -3,11 +3,15 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference }
import akka.stream.impl.StreamLayout.Module
import akka.stream.scaladsl.{ Keep, OperationAttributes }
import akka.stream._
import org.reactivestreams.{ Subscription, Publisher, Subscriber }
import akka.event.Logging.simpleName
import scala.collection.mutable
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -310,6 +314,148 @@ private[stream] class VirtualPublisher[T]() extends Publisher[T] {
override def subscribe(s: Subscriber[_ >: T]): Unit = realPublisher.subscribe(s)
}
/**
* INTERNAL API
*/
private[stream] case class MaterializedValueSource[M](
shape: SourceShape[M] = SourceShape[M](new Outlet[M]("Materialized.out")),
attributes: OperationAttributes = OperationAttributes.name("Materialized")) extends StreamLayout.Module {
override def subModules: Set[Module] = Set.empty
override def withAttributes(attr: OperationAttributes): Module = this.copy(shape = amendShape(attr), attributes = attr)
override def carbonCopy: Module = this.copy(shape = SourceShape(new Outlet[M]("Materialized.out")))
override def replaceShape(s: Shape): Module =
if (s == shape) this
else throw new UnsupportedOperationException("cannot replace the shape of MaterializedValueSource")
def amendShape(attr: OperationAttributes): SourceShape[M] = {
attr.nameOption match {
case None shape
case s: Some[String] if s == attributes.nameOption shape
case Some(name) shape.copy(outlet = new Outlet(name + ".out"))
}
}
}
/**
* INTERNAL API
*/
private[stream] object MaterializedValuePublisher {
final val NotRequested = 0
final val Requested = 1
final val Completed = 2
final val NoValue = new AnyRef
}
/**
* INTERNAL API
*/
private[stream] class MaterializedValuePublisher extends Publisher[Any] {
import MaterializedValuePublisher._
private val value = new AtomicReference[AnyRef](NoValue)
private val registeredSubscriber = new AtomicReference[Subscriber[_ >: Any]](null)
private val requestState = new AtomicInteger(NotRequested)
private def close(): Unit = {
requestState.set(Completed)
value.set(NoValue)
registeredSubscriber.set(null)
}
private def tryOrClose(block: Unit): Unit = {
try block catch {
case v: ReactiveStreamsCompliance.SpecViolation
close()
// What else can we do here?
case NonFatal(e)
val sub = registeredSubscriber.get()
if ((sub ne null) &&
requestState.compareAndSet(NotRequested, Completed) || requestState.compareAndSet(Requested, Completed)) {
sub.onError(e)
}
close()
throw e
}
}
def setValue(m: Any): Unit =
tryOrClose {
if (value.compareAndSet(NoValue, m.asInstanceOf[AnyRef]) && requestState.get() == Requested)
pushAndClose(m)
}
/*
* Both call-sites do a CAS on their "own" side and a GET on the other side. The possible overlaps
* are (removing symmetric cases where you can relabel A->B, B->A):
*
* A-CAS
* A-GET
* B-CAS
* B-GET - pushAndClose fires here
*
* A-CAS
* B-CAS
* A-GET - pushAndClose fires here
* B-GET - pushAndClose fires here
*
* A-CAS
* B-CAS
* B-GET - pushAndClose fires here
* A-GET - pushAndClose fires here
*
* The proof that there are no cases:
*
* - all permutations of 4 operations are 4! = 24
* - the operations of A and B are cannot be reordered, so there are 24 / (2 * 2) = 6 actual orderings
* - if we don't count cases which are a simple relabeling A->B, B->A, we get 6 / 2 = 3 reorderings
* which are all enumerated above.
*
* pushAndClose protects against double onNext by doing a CAS itself.
*/
private def pushAndClose(m: Any): Unit = {
if (requestState.compareAndSet(Requested, Completed)) {
val sub = registeredSubscriber.get()
ReactiveStreamsCompliance.tryOnNext(sub, m)
ReactiveStreamsCompliance.tryOnComplete(sub)
close()
}
}
override def subscribe(subscriber: Subscriber[_ >: Any]): Unit = {
tryOrClose {
ReactiveStreamsCompliance.requireNonNullSubscriber(subscriber)
if (registeredSubscriber.compareAndSet(null, subscriber)) {
ReactiveStreamsCompliance.tryOnSubscribe(subscriber, new Subscription {
override def cancel(): Unit = close()
override def request(n: Long): Unit = {
if (n <= 0) {
ReactiveStreamsCompliance.tryOnError(
subscriber,
ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
} else {
if (requestState.compareAndSet(NotRequested, Requested)) {
val m = value.get()
if (m ne NoValue) pushAndClose(m)
}
}
}
})
} else {
if (subscriber == registeredSubscriber.get())
ReactiveStreamsCompliance.rejectDuplicateSubscriber(subscriber)
else
ReactiveStreamsCompliance.rejectAdditionalSubscriber(subscriber, "MaterializedValuePublisher")
}
}
}
}
/**
* INTERNAL API
*/
@ -332,12 +478,25 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
protected def materializeModule(module: Module, effectiveAttributes: OperationAttributes): Any = {
val materializedValues = collection.mutable.HashMap.empty[Module, Any]
var materializedValuePublishers: List[MaterializedValuePublisher] = Nil
for (submodule module.subModules) {
val subEffectiveAttributes = mergeAttributes(effectiveAttributes, submodule.attributes)
if (submodule.isAtomic) materializedValues.put(submodule, materializeAtomic(submodule, subEffectiveAttributes))
else materializedValues.put(submodule, materializeComposite(submodule, subEffectiveAttributes))
submodule match {
case mv: MaterializedValueSource[_]
val pub = new MaterializedValuePublisher
materializedValuePublishers ::= pub
assignPort(mv.shape.outlet, pub)
case atomic if atomic.isAtomic
materializedValues.put(atomic, materializeAtomic(atomic, subEffectiveAttributes))
case composite
materializedValues.put(composite, materializeComposite(composite, subEffectiveAttributes))
}
}
resolveMaterialized(module.materializedValueComputation, materializedValues)
val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)
materializedValuePublishers foreach { pub pub.setValue(mat) }
mat
}
protected def materializeComposite(composite: Module, effectiveAttributes: OperationAttributes): Any = {