diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index ed9731de03..0cb6686173 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -11,11 +11,10 @@ import scala.concurrent.duration.Duration import akka.actor.Address import akka.actor.AddressFromURIString import akka.dispatch.Dispatchers -import akka.util.Helpers.{Requiring, ConfigOps, toRootLowerCase} +import akka.util.Helpers.{ Requiring, ConfigOps, toRootLowerCase } import scala.concurrent.duration.FiniteDuration import akka.japi.Util.immutableSeq -import java.util.Locale final class ClusterSettings(val config: Config, val systemName: String) { @@ -98,7 +97,7 @@ final class ClusterSettings(val config: Config, val systemName: String) { val MinNrOfMembersOfRole: Map[String, Int] = { import scala.collection.JavaConverters._ cc.getConfig("role").root.asScala.collect { - case (key, value: ConfigObject) ⇒ (key -> value.toConfig.getInt("min-nr-of-members")) + case (key, value: ConfigObject) ⇒ key -> value.toConfig.getInt("min-nr-of-members") }.toMap } val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/TokenSourceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/TokenSourceActor.scala deleted file mode 100644 index 8966e97b26..0000000000 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/TokenSourceActor.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package akka.http.impl.engine - -import scala.annotation.tailrec -import akka.stream.actor.{ ActorPublisherMessage, ActorPublisher } - -/** - * An actor publisher for producing a simple stream of singleton tokens - * the release of which is triggered by the reception of a [[TokenSourceActor.Trigger]] message. - */ -// FIXME #16520 move this into streams -private[engine] class TokenSourceActor[T](token: T) extends ActorPublisher[T] { - private var triggered = 0 - - def receive = { - case TokenSourceActor.Trigger ⇒ - triggered += 1 - tryDispatch() - - case ActorPublisherMessage.Request(_) ⇒ - tryDispatch() - - case ActorPublisherMessage.Cancel ⇒ - context.stop(self) - } - - @tailrec private def tryDispatch(): Unit = - if (triggered > 0 && totalDemand > 0) { - onNext(token) - triggered -= 1 - tryDispatch() - } -} - -private[engine] object TokenSourceActor { - case object Trigger -} diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala index 5f65b3c263..457ff9b3aa 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolFlow.scala @@ -4,7 +4,6 @@ package akka.http.impl.engine.client -import java.net.InetSocketAddress import akka.NotUsed import akka.http.scaladsl.settings.ConnectionPoolSettings @@ -69,7 +68,7 @@ private object PoolFlow { */ def apply(connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]], - remoteAddress: InetSocketAddress, settings: ConnectionPoolSettings, log: LoggingAdapter)( + settings: ConnectionPoolSettings, log: LoggingAdapter)( implicit system: ActorSystem, fm: Materializer): Flow[RequestContext, ResponseContext, NotUsed] = Flow.fromGraph(GraphDSL.create[FlowShape[RequestContext, ResponseContext]]() { implicit b ⇒ import settings._ @@ -77,8 +76,8 @@ private object PoolFlow { val conductor = b.add(PoolConductor(maxConnections, pipeliningLimit, log)) val slots = Vector - .tabulate(maxConnections)(PoolSlot(_, connectionFlow, remoteAddress, settings)) - .map(b.add(_)) + .tabulate(maxConnections)(PoolSlot(_, connectionFlow, settings)) + .map(b.add) val responseMerge = b.add(Merge[ResponseContext](maxConnections)) val slotEventMerge = b.add(Merge[PoolSlot.RawSlotEvent](maxConnections)) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala index 104a10ea0f..8deb9c687b 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala @@ -4,8 +4,6 @@ package akka.http.impl.engine.client -import java.net.InetSocketAddress - import akka.actor._ import akka.http.impl.engine.client.PoolFlow._ import akka.http.scaladsl.model._ @@ -66,10 +64,9 @@ private class PoolInterfaceActor(gateway: PoolGateway)(implicit fm: Materializer case _ ⇒ Http().outgoingConnection(host, port, None, settings.connectionSettings, setup.log) } - val poolFlow = PoolFlow( - Flow[HttpRequest].viaMat(connectionFlow)(Keep.right), - new InetSocketAddress(host, port), settings, setup.log) - .named("PoolFlow") + val poolFlow = + PoolFlow(Flow[HttpRequest].viaMat(connectionFlow)(Keep.right), settings, setup.log) + .named("PoolFlow") Source.fromPublisher(ActorPublisher(self)).via(poolFlow).runWith(Sink.fromSubscriber(ActorSubscriber[ResponseContext](self))) } diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala index 833003e8be..0667fc8212 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolSlot.scala @@ -4,8 +4,6 @@ package akka.http.impl.engine.client -import java.net.InetSocketAddress - import akka.actor._ import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.http.scaladsl.model.{ HttpEntity, HttpRequest, HttpResponse } @@ -50,7 +48,6 @@ private object PoolSlot {                              v */ def apply(slotIx: Int, connectionFlow: Flow[HttpRequest, HttpResponse, Any], - remoteAddress: InetSocketAddress, // TODO: remove after #16168 is cleared settings: ConnectionPoolSettings)(implicit system: ActorSystem, fm: Materializer): Graph[FanOutShape2[RequestContext, ResponseContext, RawSlotEvent], Any] = GraphDSL.create() { implicit b ⇒ @@ -63,7 +60,7 @@ private object PoolSlot { val actor = system.actorOf(Props(new SlotProcessor(slotIx, connectionFlow, settings)).withDeploy(Deploy.local), name) ActorProcessor[RequestContext, List[ProcessorOut]](actor) - }.mapConcat(conforms) + }.mapConcat(identity) } val split = b.add(Broadcast[ProcessorOut](2)) @@ -183,7 +180,7 @@ private object PoolSlot { } else { inflightRequests.map { rc ⇒ if (rc.retriesLeft == 0) { - val reason = error.fold[Throwable](new UnexpectedDisconnectException("Unexpected disconnect"))(conforms) + val reason = error.fold[Throwable](new UnexpectedDisconnectException("Unexpected disconnect"))(identity) connInport ! ActorPublisherMessage.Cancel ResponseDelivery(ResponseContext(rc, Failure(reason))) } else SlotEvent.RetryRequest(rc.copy(retriesLeft = rc.retriesLeft - 1)) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala index 5a9e0ba8e2..885392262f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/Handshake.scala @@ -80,7 +80,7 @@ private[http] object Handshake { val protocol = find[`Sec-WebSocket-Protocol`] val clientSupportedSubprotocols = protocol.toList.flatMap(_.protocols) // Extension support is optional in WS and currently unsupported. - // FIXME See #18709 + // TODO See #18709 // val extensions = find[`Sec-WebSocket-Extensions`] if (upgrade.exists(_.hasWebSocket) && diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala index ef99312355..036e704e1d 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/RequestRendererSpec.scala @@ -140,8 +140,7 @@ class RequestRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "proper render a chunked" - { "PUT request with empty chunk stream and custom Content-Type" in new TestSetup() { - pending // Disabled until #15981 is fixed - HttpRequest(PUT, "/abc/xyz", entity = Chunked(ContentTypes.`text/plain(UTF-8)`, source())) should renderTo { + HttpRequest(PUT, "/abc/xyz", entity = Chunked(ContentTypes.`text/plain(UTF-8)`, Source.empty)) should renderTo { """PUT /abc/xyz HTTP/1.1 |Host: test.com:8080 |User-Agent: akka-http/1.0.0 diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala index b416f2d6b3..c8759f5bae 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala @@ -258,9 +258,8 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll "a chunked response" - { "with empty entity" in new TestSetup() { - pending // Disabled until #15981 is fixed HttpResponse(200, List(Age(30)), - Chunked(ContentTypes.NoContentType, source())) should renderTo { + Chunked(ContentTypes.NoContentType, Source.empty)) should renderTo { """HTTP/1.1 200 OK |Age: 30 |Server: akka-http/1.0.0 @@ -271,9 +270,8 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll } "with empty entity but non-default Content-Type" in new TestSetup() { - pending // Disabled until #15981 is fixed HttpResponse(200, List(Age(30)), - Chunked(ContentTypes.`application/json`, source())) should renderTo { + Chunked(ContentTypes.`application/json`, Source.empty)) should renderTo { """HTTP/1.1 200 OK |Age: 30 |Server: akka-http/1.0.0 diff --git a/akka-http/src/main/scala/akka/http/javadsl/server/values/Parameter.scala b/akka-http/src/main/scala/akka/http/javadsl/server/values/Parameter.scala index 7b72ccebb0..bdf924e40a 100644 --- a/akka-http/src/main/scala/akka/http/javadsl/server/values/Parameter.scala +++ b/akka-http/src/main/scala/akka/http/javadsl/server/values/Parameter.scala @@ -26,7 +26,6 @@ trait Parameter[T] extends RequestVal[T] { /** * A collection of predefined parameters. - * FIXME: add tests, see #16437 */ object Parameters { import akka.http.scaladsl.common.ToNameReceptacleEnhancements._ diff --git a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala index 21429636ba..2933073b39 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/unmarshalling/MultipartUnmarshallers.scala @@ -10,7 +10,7 @@ import scala.collection.immutable import scala.collection.immutable.VectorBuilder import akka.util.ByteString import akka.event.{ NoLogging, LoggingAdapter } -import akka.stream.{ActorMaterializer, OverflowStrategy} +import akka.stream.ActorMaterializer import akka.stream.impl.fusing.IteratorInterpreter import akka.stream.scaladsl._ import akka.http.impl.engine.parsing.BodyPartParser @@ -99,7 +99,6 @@ trait MultipartUnmarshallers { val bodyParts = entity.dataBytes .via(parser) .splitWhen(_.isInstanceOf[PartStart]) - .buffer(100, OverflowStrategy.backpressure) // FIXME remove (#19240) .prefixAndTail(1) .collect { case (Seq(BodyPartStart(headers, createEntity)), entityParts) ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 5638ac1806..e35aee092a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -7,13 +7,12 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import akka.util.Timeout import scala.collection.immutable -import akka.util.Helpers.{ConfigOps, Requiring, toRootLowerCase} +import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase } import akka.japi.Util._ import akka.actor.Props import akka.event.Logging import akka.event.Logging.LogLevel import akka.ConfigurationException -import java.util.Locale final class RemoteSettings(val config: Config) { import config._ diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala index d05c709471..6799addf66 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/StreamTestDefaultMailbox.scala @@ -22,21 +22,14 @@ private[akka] final case class StreamTestDefaultMailbox() extends MailboxType wi final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = { owner match { case Some(r: ActorRefWithCell) ⇒ - try { - val actorClass = r.underlying.props.actorClass - assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]") - // StreamTcpManager is allowed to use another dispatcher - assert(!actorClass.getName.startsWith("akka.stream."), - s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " + - "Did you forget to define `props.withDispatcher` when creating the actor? " + - "Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " + - """dispatcher using `ActorMaterializerSettings(sys).withDispatcher("akka.test.stream-dispatcher")` in the test?""") - } catch { - // this logging should not be needed when issue #15947 has been fixed - case e: AssertionError ⇒ - system.foreach(_.log.error(e, s"StreamTestDefaultMailbox assertion failed: ${e.getMessage}")) - throw e - } + val actorClass = r.underlying.props.actorClass + assert(actorClass != classOf[Actor], s"Don't use anonymous actor classes, actor class for $r was [${actorClass.getName}]") + // StreamTcpManager is allowed to use another dispatcher + assert(!actorClass.getName.startsWith("akka.stream."), + s"$r with actor class [${actorClass.getName}] must not run on default dispatcher in tests. " + + "Did you forget to define `props.withDispatcher` when creating the actor? " + + "Or did you forget to configure the `akka.stream.materializer` setting accordingly or force the " + + """dispatcher using `ActorMaterializerSettings(sys).withDispatcher("akka.test.stream-dispatcher")` in the test?""") case _ ⇒ } new UnboundedMailbox.MessageQueue diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index e88bd7c674..ef92c42119 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -600,7 +600,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { setHandlers(in, out, this) } - override val shape: FlowShape[T, T] = FlowShape(in, out) + override val shape: FlowShape[T, T] = FlowShape(in, out) } private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] { diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 939a077f55..ee930b4b24 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -122,7 +122,6 @@ object ActorPublisherMessage { * failure, completed or canceled. */ trait ActorPublisher[T] extends Actor { - import akka.stream.actor.ActorPublisherMessage._ import ActorPublisher.Internal._ import ActorPublisherMessage._ import ReactiveStreamsCompliance._ @@ -412,7 +411,7 @@ private[akka] object ActorPublisherState extends ExtensionId[ActorPublisherState override def get(system: ActorSystem): ActorPublisherState = super.get(system) - override def lookup = ActorPublisherState + override def lookup() = ActorPublisherState override def createExtension(system: ExtendedActorSystem): ActorPublisherState = new ActorPublisherState diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index c6ee369d65..1a8e449e20 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -9,7 +9,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels } import akka.stream.OverflowStrategies._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ Buffer ⇒ BufferImpl, ReactiveStreamsCompliance } -import akka.stream.scaladsl.{ SourceQueue, Source } +import akka.stream.scaladsl.Source import akka.stream.stage._ import akka.stream.{ Supervision, _ } import scala.annotation.tailrec @@ -212,11 +212,11 @@ private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) ext override def onPull(): Unit = { recovered match { - case Some(elem) ⇒ { + case Some(elem) ⇒ push(out, elem) completeStage() - } - case None ⇒ pull(in) + case None ⇒ + pull(in) } } @@ -749,7 +749,7 @@ private[akka] final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) ext setHandler(out, new OutHandler { override def onPull(): Unit = { if (iterator.hasNext) { - if (expanded == false) { + if (!expanded) { expanded = true if (isClosed(in)) { push(out, iterator.next()) @@ -818,7 +818,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut if (buffer.isEmpty) { if (isClosed(in)) completeStage() else if (!hasBeenPulled(in)) pull(in) - } else if (buffer.peek.elem == NotYetThere) { + } else if (buffer.peek().elem == NotYetThere) { if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) } else buffer.dequeue().elem match { case Success(elem) ⇒ diff --git a/project/MiMa.scala b/project/MiMa.scala index 9485d4216b..3a1c84250d 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -763,7 +763,7 @@ object MiMa extends AutoPlugin { // internal api FilterAnyProblemStartingWith("akka.stream.impl"), - FilterAnyProblemStartingWith("akka.http.impl.engine.parsing.BodyPartParser"), + FilterAnyProblemStartingWith("akka.http.impl"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.util.package.printEvent"), // #20362 - parser private