=htc Various minor cleanups (#20451)

* minor fixes

* remove now superfluous buffer from MultipartUnmarshaller

* remove unused TokenSourceActor

* remove FIXME: add tests, see #16437

* removed unused param remoteAddress (comment: TODO: remove after #16168 is cleared)

* convert FIXME to TODO (#18709)

* reenable tests in {Request|Response}RendererSpec due to fixed #15981

* remove logging workaround in StreamTestDefaultMailbox due to fixed #15947
This commit is contained in:
2beaucoup 2016-05-06 10:32:06 +02:00 committed by Konrad Malawski
parent d24c1e0ede
commit bc7cd17bee
16 changed files with 33 additions and 95 deletions

View file

@ -11,11 +11,10 @@ import scala.concurrent.duration.Duration
import akka.actor.Address import akka.actor.Address
import akka.actor.AddressFromURIString import akka.actor.AddressFromURIString
import akka.dispatch.Dispatchers import akka.dispatch.Dispatchers
import akka.util.Helpers.{Requiring, ConfigOps, toRootLowerCase} import akka.util.Helpers.{ Requiring, ConfigOps, toRootLowerCase }
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import java.util.Locale
final class ClusterSettings(val config: Config, val systemName: String) { final class ClusterSettings(val config: Config, val systemName: String) {
@ -98,7 +97,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
val MinNrOfMembersOfRole: Map[String, Int] = { val MinNrOfMembersOfRole: Map[String, Int] = {
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
cc.getConfig("role").root.asScala.collect { cc.getConfig("role").root.asScala.collect {
case (key, value: ConfigObject) (key -> value.toConfig.getInt("min-nr-of-members")) case (key, value: ConfigObject) key -> value.toConfig.getInt("min-nr-of-members")
}.toMap }.toMap
} }
val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")

View file

@ -1,40 +0,0 @@
/*
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.impl.engine
import scala.annotation.tailrec
import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher }
/**
* An actor publisher for producing a simple stream of singleton tokens
* the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message.
*/
// FIXME #16520 move this into streams
private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] {
private var triggered = 0
def receive = {
case TokenSourceActor.Trigger
triggered += 1
tryDispatch()
case ActorPublisherMessage.Request(_)
tryDispatch()
case ActorPublisherMessage.Cancel
context.stop(self)
}
@tailrec private def tryDispatch(): Unit =
if (triggered > 0 && totalDemand > 0) {
onNext(token)
triggered -= 1
tryDispatch()
}
}
private[engine] object TokenSourceActor {
case object Trigger
}

View file

@ -4,7 +4,6 @@
package akka.http.impl.engine.client package akka.http.impl.engine.client
import java.net.InetSocketAddress
import akka.NotUsed import akka.NotUsed
import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.http.scaladsl.settings.ConnectionPoolSettings
@ -69,7 +68,7 @@ private object PoolFlow {
*/ */
def apply(connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]], def apply(connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]],
remoteAddress: InetSocketAddress, settings: ConnectionPoolSettings, log: LoggingAdapter)( settings: ConnectionPoolSettings, log: LoggingAdapter)(
implicit system: ActorSystem, fm: Materializer): Flow[RequestContext, ResponseContext, NotUsed] = implicit system: ActorSystem, fm: Materializer): Flow[RequestContext, ResponseContext, NotUsed] =
Flow.fromGraph(GraphDSL.create[FlowShape[RequestContext, ResponseContext]]() { implicit b Flow.fromGraph(GraphDSL.create[FlowShape[RequestContext, ResponseContext]]() { implicit b
import settings._ import settings._
@ -77,8 +76,8 @@ private object PoolFlow {
val conductor = b.add(PoolConductor(maxConnections, pipeliningLimit, log)) val conductor = b.add(PoolConductor(maxConnections, pipeliningLimit, log))
val slots = Vector val slots = Vector
.tabulate(maxConnections)(PoolSlot(_, connectionFlow, remoteAddress, settings)) .tabulate(maxConnections)(PoolSlot(_, connectionFlow, settings))
.map(b.add(_)) .map(b.add)
val responseMerge = b.add(Merge[ResponseContext](maxConnections)) val responseMerge = b.add(Merge[ResponseContext](maxConnections))
val slotEventMerge = b.add(Merge[PoolSlot.RawSlotEvent](maxConnections)) val slotEventMerge = b.add(Merge[PoolSlot.RawSlotEvent](maxConnections))

View file

@ -4,8 +4,6 @@
package akka.http.impl.engine.client package akka.http.impl.engine.client
import java.net.InetSocketAddress
import akka.actor._ import akka.actor._
import akka.http.impl.engine.client.PoolFlow._ import akka.http.impl.engine.client.PoolFlow._
import akka.http.scaladsl.model._ import akka.http.scaladsl.model._
@ -66,10 +64,9 @@ private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer
case _ Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log) case _ Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log)
} }
val poolFlow = PoolFlow( val poolFlow =
Flow[HttpRequest].viaMat(connectionFlow)(Keep.right), PoolFlow(Flow[HttpRequest].viaMat(connectionFlow)(Keep.right), settings, setup.log)
new InetSocketAddress(host, port), settings, setup.log) .named("PoolFlow")
.named("PoolFlow")
Source.fromPublisher(ActorPublisher(self)).via(poolFlow).runWith(Sink.fromSubscriber(ActorSubscriber[ResponseContext](self))) Source.fromPublisher(ActorPublisher(self)).via(poolFlow).runWith(Sink.fromSubscriber(ActorSubscriber[ResponseContext](self)))
} }

View file

@ -4,8 +4,6 @@
package akka.http.impl.engine.client package akka.http.impl.engine.client
import java.net.InetSocketAddress
import akka.actor._ import akka.actor._
import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse } import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse }
@ -50,7 +48,6 @@ private object PoolSlot {
v v
*/ */
def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any], def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any],
remoteAddress: InetSocketAddress, // TODO: remove after #16168 is cleared
settings: ConnectionPoolSettings)(implicit system: ActorSystem, settings: ConnectionPoolSettings)(implicit system: ActorSystem,
fm: Materializer): Graph[FanOutShape2[RequestContext, ResponseContext, RawSlotEvent], Any] = fm: Materializer): Graph[FanOutShape2[RequestContext, ResponseContext, RawSlotEvent], Any] =
GraphDSL.create() { implicit b GraphDSL.create() { implicit b
@ -63,7 +60,7 @@ private object PoolSlot {
val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local), val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local),
name) name)
ActorProcessor[RequestContext, List[ProcessorOut]](actor) ActorProcessor[RequestContext, List[ProcessorOut]](actor)
}.mapConcat(conforms) }.mapConcat(identity)
} }
val split = b.add(Broadcast[ProcessorOut](2)) val split = b.add(Broadcast[ProcessorOut](2))
@ -183,7 +180,7 @@ private object PoolSlot {
} else { } else {
inflightRequests.map { rc inflightRequests.map { rc
if (rc.retriesLeft == 0) { if (rc.retriesLeft == 0) {
val reason = error.fold[Throwable](new UnexpectedDisconnectException("Unexpected disconnect"))(conforms) val reason = error.fold[Throwable](new UnexpectedDisconnectException("Unexpected disconnect"))(identity)
connInport ! ActorPublisherMessage.Cancel connInport ! ActorPublisherMessage.Cancel
ResponseDelivery(ResponseContext(rc, Failure(reason))) ResponseDelivery(ResponseContext(rc, Failure(reason)))
} else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1)) } else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1))

View file

@ -80,7 +80,7 @@ private[http] object Handshake {
val protocol = find[`Sec-WebSocket-Protocol`] val protocol = find[`Sec-WebSocket-Protocol`]
val clientSupportedSubprotocols = protocol.toList.flatMap(_.protocols) val clientSupportedSubprotocols = protocol.toList.flatMap(_.protocols)
// Extension support is optional in WS and currently unsupported. // Extension support is optional in WS and currently unsupported.
// FIXME See #18709 // TODO See #18709
// val extensions = find[`Sec-WebSocket-Extensions`] // val extensions = find[`Sec-WebSocket-Extensions`]
if (upgrade.exists(_.hasWebSocket) && if (upgrade.exists(_.hasWebSocket) &&

View file

@ -140,8 +140,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
"proper render a chunked" - { "proper render a chunked" - {
"PUT request with empty chunk stream and custom Content-Type" in new TestSetup() { "PUT request with empty chunk stream and custom Content-Type" in new TestSetup() {
pending // Disabled until #15981 is fixed HttpRequest(PUT, "/abc/xyz", entity = Chunked(ContentTypes.`text/plain(UTF-8)`, Source.empty)) should renderTo {
HttpRequest(PUT, "/abc/xyz", entity = Chunked(ContentTypes.`text/plain(UTF-8)`, source())) should renderTo {
"""PUT /abc/xyz HTTP/1.1 """PUT /abc/xyz HTTP/1.1
|Host: test.com:8080 |Host: test.com:8080
|User-Agent: akka-http/1.0.0 |User-Agent: akka-http/1.0.0

View file

@ -258,9 +258,8 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
"a chunked response" - { "a chunked response" - {
"with empty entity" in new TestSetup() { "with empty entity" in new TestSetup() {
pending // Disabled until #15981 is fixed
HttpResponse(200, List(Age(30)), HttpResponse(200, List(Age(30)),
Chunked(ContentTypes.NoContentType, source())) should renderTo { Chunked(ContentTypes.NoContentType, Source.empty)) should renderTo {
"""HTTP/1.1 200 OK """HTTP/1.1 200 OK
|Age: 30 |Age: 30
|Server: akka-http/1.0.0 |Server: akka-http/1.0.0
@ -271,9 +270,8 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
} }
"with empty entity but non-default Content-Type" in new TestSetup() { "with empty entity but non-default Content-Type" in new TestSetup() {
pending // Disabled until #15981 is fixed
HttpResponse(200, List(Age(30)), HttpResponse(200, List(Age(30)),
Chunked(ContentTypes.`application/json`, source())) should renderTo { Chunked(ContentTypes.`application/json`, Source.empty)) should renderTo {
"""HTTP/1.1 200 OK """HTTP/1.1 200 OK
|Age: 30 |Age: 30
|Server: akka-http/1.0.0 |Server: akka-http/1.0.0

View file

@ -26,7 +26,6 @@ trait Parameter[T] extends RequestVal[T] {
/** /**
* A collection of predefined parameters. * A collection of predefined parameters.
* FIXME: add tests, see #16437
*/ */
object Parameters { object Parameters {
import akka.http.scaladsl.common.ToNameReceptacleEnhancements._ import akka.http.scaladsl.common.ToNameReceptacleEnhancements._

View file

@ -10,7 +10,7 @@ import scala.collection.immutable
import scala.collection.immutable.VectorBuilder import scala.collection.immutable.VectorBuilder
import akka.util.ByteString import akka.util.ByteString
import akka.event.{ NoLogging, LoggingAdapter } import akka.event.{ NoLogging, LoggingAdapter }
import akka.stream.{ActorMaterializer, OverflowStrategy} import akka.stream.ActorMaterializer
import akka.stream.impl.fusing.IteratorInterpreter import akka.stream.impl.fusing.IteratorInterpreter
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.http.impl.engine.parsing.BodyPartParser import akka.http.impl.engine.parsing.BodyPartParser
@ -99,7 +99,6 @@ trait MultipartUnmarshallers {
val bodyParts = entity.dataBytes val bodyParts = entity.dataBytes
.via(parser) .via(parser)
.splitWhen(_.isInstanceOf[PartStart]) .splitWhen(_.isInstanceOf[PartStart])
.buffer(100, OverflowStrategy.backpressure) // FIXME remove (#19240)
.prefixAndTail(1) .prefixAndTail(1)
.collect { .collect {
case (Seq(BodyPartStart(headers, createEntity)), entityParts) case (Seq(BodyPartStart(headers, createEntity)), entityParts)

View file

@ -7,13 +7,12 @@ import com.typesafe.config.Config
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.util.Timeout import akka.util.Timeout
import scala.collection.immutable import scala.collection.immutable
import akka.util.Helpers.{ConfigOps, Requiring, toRootLowerCase} import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase }
import akka.japi.Util._ import akka.japi.Util._
import akka.actor.Props import akka.actor.Props
import akka.event.Logging import akka.event.Logging
import akka.event.Logging.LogLevel import akka.event.Logging.LogLevel
import akka.ConfigurationException import akka.ConfigurationException
import java.util.Locale
final class RemoteSettings(val config: Config) { final class RemoteSettings(val config: Config) {
import config._ import config._

View file

@ -22,21 +22,14 @@ private[akka] final case class StreamTestDefaultMailbox() extends MailboxType wi
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = {
owner match { owner match {
case Some(r: ActorRefWithCell) case Some(r: ActorRefWithCell)
try { val actorClass = r.underlying.props.actorClass
val actorClass = r.underlying.props.actorClass assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]")
assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]") // StreamTcpManager is allowed to use another dispatcher
// StreamTcpManager is allowed to use another dispatcher assert(!actorClass.getName.startsWith("akka.stream."),
assert(!actorClass.getName.startsWith("akka.stream."), s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " +
s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " + "Did you forget to define `props.withDispatcher` when creating the actor? " +
"Did you forget to define `props.withDispatcher` when creating the actor? " + "Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " +
"Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " + """dispatcher using `ActorMaterializerSettings(sys).withDispatcher("akka.test.stream-dispatcher")` in the test?""")
"""dispatcher using `ActorMaterializerSettings(sys).withDispatcher("akka.test.stream-dispatcher")` in the test?""")
} catch {
// this logging should not be needed when issue #15947 has been fixed
case e: AssertionError
system.foreach(_.log.error(e, s"StreamTestDefaultMailbox assertion failed: ${e.getMessage}"))
throw e
}
case _ case _
} }
new UnboundedMailbox.MessageQueue new UnboundedMailbox.MessageQueue

View file

@ -600,7 +600,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
setHandlers(in, out, this) setHandlers(in, out, this)
} }
override val shape: FlowShape[T, T] = FlowShape(in, out) override val shape: FlowShape[T, T] = FlowShape(in, out)
} }
private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] { private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] {

View file

@ -122,7 +122,6 @@ object ActorPublisherMessage {
* failure, completed or canceled. * failure, completed or canceled.
*/ */
trait ActorPublisher[T] extends Actor { trait ActorPublisher[T] extends Actor {
import akka.stream.actor.ActorPublisherMessage._
import ActorPublisher.Internal._ import ActorPublisher.Internal._
import ActorPublisherMessage._ import ActorPublisherMessage._
import ReactiveStreamsCompliance._ import ReactiveStreamsCompliance._
@ -412,7 +411,7 @@ private[akka] object ActorPublisherState extends ExtensionId[ActorPublisherState
override def get(system: ActorSystem): ActorPublisherState = super.get(system) override def get(system: ActorSystem): ActorPublisherState = super.get(system)
override def lookup = ActorPublisherState override def lookup() = ActorPublisherState
override def createExtension(system: ExtendedActorSystem): ActorPublisherState = override def createExtension(system: ExtendedActorSystem): ActorPublisherState =
new ActorPublisherState new ActorPublisherState

View file

@ -9,7 +9,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._ import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ Buffer BufferImpl, ReactiveStreamsCompliance } import akka.stream.impl.{ Buffer BufferImpl, ReactiveStreamsCompliance }
import akka.stream.scaladsl.{ SourceQueue, Source } import akka.stream.scaladsl.Source
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.{ Supervision, _ } import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec import scala.annotation.tailrec
@ -212,11 +212,11 @@ private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) ext
override def onPull(): Unit = { override def onPull(): Unit = {
recovered match { recovered match {
case Some(elem) { case Some(elem)
push(out, elem) push(out, elem)
completeStage() completeStage()
} case None
case None pull(in) pull(in)
} }
} }
@ -749,7 +749,7 @@ private[akka] final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) ext
setHandler(out, new OutHandler { setHandler(out, new OutHandler {
override def onPull(): Unit = { override def onPull(): Unit = {
if (iterator.hasNext) { if (iterator.hasNext) {
if (expanded == false) { if (!expanded) {
expanded = true expanded = true
if (isClosed(in)) { if (isClosed(in)) {
push(out, iterator.next()) push(out, iterator.next())
@ -818,7 +818,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
if (buffer.isEmpty) { if (buffer.isEmpty) {
if (isClosed(in)) completeStage() if (isClosed(in)) completeStage()
else if (!hasBeenPulled(in)) pull(in) else if (!hasBeenPulled(in)) pull(in)
} else if (buffer.peek.elem == NotYetThere) { } else if (buffer.peek().elem == NotYetThere) {
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
} else buffer.dequeue().elem match { } else buffer.dequeue().elem match {
case Success(elem) case Success(elem)

View file

@ -763,7 +763,7 @@ object MiMa extends AutoPlugin {
// internal api // internal api
FilterAnyProblemStartingWith("akka.stream.impl"), FilterAnyProblemStartingWith("akka.stream.impl"),
FilterAnyProblemStartingWith("akka.http.impl.engine.parsing.BodyPartParser"), FilterAnyProblemStartingWith("akka.http.impl"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.util.package.printEvent"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.util.package.printEvent"),
// #20362 - parser private // #20362 - parser private