=str #16699 fix some FIXMEs
This commit is contained in:
parent
aad8704085
commit
c63b9c801a
11 changed files with 45 additions and 40 deletions
|
|
@ -62,9 +62,22 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def nameLifted: Option[String] =
|
||||
attributes.collect {
|
||||
case Name(name) ⇒ name
|
||||
}.reduceOption(_ + "-" + _) // FIXME don't do a double-traversal, use a fold instead
|
||||
if (attributes.isEmpty)
|
||||
None
|
||||
else {
|
||||
val sb = new java.lang.StringBuilder
|
||||
val iter = attributes.iterator
|
||||
while (iter.hasNext) {
|
||||
iter.next() match {
|
||||
case Name(name) ⇒
|
||||
if (sb.length == 0) sb.append(name)
|
||||
else sb.append("-").append(name)
|
||||
case _ ⇒
|
||||
}
|
||||
}
|
||||
if (sb.length == 0) None
|
||||
else Some(sb.toString)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
|
|||
|
|
@ -118,7 +118,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF
|
|||
(UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out)
|
||||
|
||||
case ConcatModule(shape, _) ⇒
|
||||
require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // FIXME
|
||||
require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
|
||||
(Concat.props(effectiveSettings), shape.inArray.toSeq, shape.out)
|
||||
|
||||
case zip: ZipWithModule ⇒
|
||||
|
|
@ -269,7 +269,7 @@ private[akka] object ActorProcessorFactory {
|
|||
case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ())
|
||||
case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ())
|
||||
case SplitWhen(p, _) ⇒ (SplitWhenProcessorImpl.props(settings, p), ())
|
||||
case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) //FIXME closes over the materializer, is this good?
|
||||
case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ())
|
||||
case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer), ())
|
||||
case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ())
|
||||
case MaterializingStageFactory(mkStageAndMat, _) ⇒
|
||||
|
|
|
|||
|
|
@ -248,7 +248,6 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali
|
|||
with ActorLogging
|
||||
with Pump {
|
||||
|
||||
// FIXME: make pump a member
|
||||
protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
|
||||
override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,11 +79,9 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
|
|||
case SubscribePending ⇒
|
||||
subscribePending()
|
||||
case RequestMore(subscription, elements) ⇒
|
||||
// FIXME can we avoid this cast?
|
||||
moreRequested(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]], elements)
|
||||
pump.pump()
|
||||
case Cancel(subscription) ⇒
|
||||
// FIXME can we avoid this cast?
|
||||
unregisterSubscription(subscription.asInstanceOf[ActorSubscriptionWithCursor[Any]])
|
||||
pump.pump()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate
|
|||
}
|
||||
}
|
||||
|
||||
override def postStop(): Unit = // FIXME if something blows up, are the subscribers onErrored?
|
||||
override def postStop(): Unit =
|
||||
if (exposedPublisher ne null)
|
||||
exposedPublisher.shutdown(shutdownReason)
|
||||
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
|
|||
override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective =
|
||||
if (ctx.isFinishing) {
|
||||
val elem = buf.result()
|
||||
buf.clear() //FIXME null out the reference to the `buf`?
|
||||
buf.clear()
|
||||
left = n
|
||||
ctx.pushAndFinish(elem)
|
||||
} else ctx.pull()
|
||||
|
|
|
|||
|
|
@ -220,7 +220,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* `n` must be positive, otherwise IllegalArgumentException is thrown.
|
||||
*/
|
||||
def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new Flow(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step
|
||||
new Flow(delegate.grouped(n).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* Similar to `fold` but is not a terminal operation,
|
||||
|
|
@ -246,7 +246,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* IllegalArgumentException is thrown.
|
||||
*/
|
||||
def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step
|
||||
new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* Discard the given number of elements at the beginning of the stream.
|
||||
|
|
@ -368,7 +368,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
|
|||
* the element is dropped and the stream and substreams continue.
|
||||
*/
|
||||
def groupBy[K](f: japi.Function[Out, K]): javadsl.Flow[In, akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
|
||||
new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step
|
||||
new Flow(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* This operation applies the given predicate to all incoming elements and
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
package akka.stream.javadsl
|
||||
|
||||
import scala.collection.immutable
|
||||
import java.util.concurrent.Callable
|
||||
import akka.actor.{ Cancellable, ActorRef, Props }
|
||||
import akka.japi.Util
|
||||
|
|
@ -95,9 +96,21 @@ object Source {
|
|||
* Iterator, but every Subscriber directly attached to the Publisher of this
|
||||
* stream will see an individual flow of elements (always starting from the
|
||||
* beginning) regardless of when they subscribed.
|
||||
*
|
||||
* Make sure that the `Iterable` is immutable or at least not modified after
|
||||
* being used as a `Source`. Otherwise the stream may fail with
|
||||
* `ConcurrentModificationException` or other more subtle errors may occur.
|
||||
*/
|
||||
def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O, Unit] =
|
||||
new Source(scaladsl.Source(akka.stream.javadsl.japi.Util.immutableIterable(iterable)))
|
||||
def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O, Unit] = {
|
||||
// this adapter is not immutable if the the underlying java.lang.Iterable is modified
|
||||
// but there is not anything we can do to prevent that from happening.
|
||||
// ConcurrentModificationException will be thrown in some cases.
|
||||
val scalaIterable = new immutable.Iterable[O] {
|
||||
import collection.JavaConverters._
|
||||
override def iterator: Iterator[O] = iterable.iterator().asScala
|
||||
}
|
||||
new Source(scaladsl.Source(scalaIterable))
|
||||
}
|
||||
|
||||
/**
|
||||
* Start a new `Source` from the given `Future`. The stream will consist of
|
||||
|
|
@ -363,7 +376,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* @param n must be positive, and `d` must be greater than 0 seconds, otherwise [[IllegalArgumentException]] is thrown.
|
||||
*/
|
||||
def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
|
||||
new Source(delegate.groupedWithin(n, d).map(_.asJava)) // FIXME optimize to one step
|
||||
new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* Discard the given number of elements at the beginning of the stream.
|
||||
|
|
@ -473,7 +486,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
|
|||
* to consume only one of them.
|
||||
*/
|
||||
def groupBy[K](f: japi.Function[Out, K]): javadsl.Source[akka.japi.Pair[K, javadsl.Source[Out @uncheckedVariance, Unit]], Mat] =
|
||||
new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // FIXME optimize to one step
|
||||
new Source(delegate.groupBy(f.apply).map { case (k, p) ⇒ akka.japi.Pair(k, p.asJava) }) // TODO optimize to one step
|
||||
|
||||
/**
|
||||
* This operation applies the given predicate to all incoming elements and
|
||||
|
|
|
|||
|
|
@ -1,17 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.javadsl.japi
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
object Util {
|
||||
|
||||
import collection.JavaConverters._
|
||||
// FIXME this does not make something an immutable iterable!!
|
||||
def immutableIterable[T](iterable: java.lang.Iterable[T]): immutable.Iterable[T] =
|
||||
new immutable.Iterable[T] {
|
||||
override def iterator: Iterator[T] = iterable.iterator().asScala
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -9,7 +9,7 @@ package akka.stream.javadsl.japi
|
|||
/**
|
||||
* A Function interface. Used to create first-class-functions is Java.
|
||||
*/
|
||||
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Function[-T, +R] {
|
||||
@throws(classOf[Exception])
|
||||
def apply(param: T): R
|
||||
|
|
@ -18,7 +18,7 @@ trait Function[-T, +R] {
|
|||
/**
|
||||
* A Function interface. Used to create 2-arg first-class-functions is Java.
|
||||
*/
|
||||
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Function2[-T1, -T2, +R] {
|
||||
@throws(classOf[Exception])
|
||||
def apply(arg1: T1, arg2: T2): R
|
||||
|
|
@ -27,7 +27,7 @@ trait Function2[-T1, -T2, +R] {
|
|||
/**
|
||||
* A constructor/factory, takes no parameters but creates a new value of type T every call.
|
||||
*/
|
||||
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Creator[+T] extends Serializable {
|
||||
/**
|
||||
* This method must return a different instance upon every call.
|
||||
|
|
@ -39,7 +39,7 @@ trait Creator[+T] extends Serializable {
|
|||
/**
|
||||
* A Procedure is like a Function, but it doesn't produce a return value.
|
||||
*/
|
||||
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Procedure[-T] {
|
||||
@throws(classOf[Exception])
|
||||
def apply(param: T): Unit
|
||||
|
|
@ -48,7 +48,7 @@ trait Procedure[-T] {
|
|||
/**
|
||||
* Java API: Defines a criteria and determines whether the parameter meets this criteria.
|
||||
*/
|
||||
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
|
||||
@SerialVersionUID(1L) // TODO: add variance to akka.japi and remove this akka.stream.japi!
|
||||
trait Predicate[-T] {
|
||||
def test(param: T): Boolean
|
||||
}
|
||||
|
|
|
|||
|
|
@ -240,7 +240,6 @@ abstract class FlexiRoute[In, S <: Shape](val shape: S, attributes: OperationAtt
|
|||
case None ⇒ super.toString
|
||||
}
|
||||
|
||||
// FIXME what to do about this?
|
||||
override def withAttributes(attr: OperationAttributes): Graph[S, Unit] =
|
||||
throw new UnsupportedOperationException(
|
||||
"withAttributes not supported by default by FlexiRoute, subclass may override and implement it")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue