#19447 remove compatibility fallbacks for Java < 8
This commit is contained in:
parent
aed67715af
commit
f7a5151bbb
18 changed files with 46 additions and 260 deletions
|
|
@ -1,60 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.impl.util
|
||||
|
||||
import java.lang.reflect.{ InvocationTargetException, Method }
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* Provides getHostString support for Java 6.
|
||||
*
|
||||
* TODO: can be removed once support for Java 6 is dropped.
|
||||
*
|
||||
* Internal API
|
||||
*/
|
||||
private[http] class EnhancedInetSocketAddress(val address: InetSocketAddress) extends AnyVal {
|
||||
/**
|
||||
* Retrieve the original host string that was given (IP or DNS name) if the current JDK has
|
||||
* a `getHostString` method with the right signature that can be made accessible.
|
||||
*
|
||||
* This avoids a reverse DNS query from calling getHostName() if the original host string is an IP address.
|
||||
* If the reflective call doesn't work it falls back to getHostName.
|
||||
*/
|
||||
def getHostStringJava6Compatible: String = EnhancedInetSocketAddress.getHostStringFunction(address)
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
private[http] object EnhancedInetSocketAddress {
|
||||
private[http] val getHostStringFunction: InetSocketAddress ⇒ String = {
|
||||
def fallbackToGetHostName = (_: InetSocketAddress).getHostName
|
||||
def callReflectively(m: Method) =
|
||||
(address: InetSocketAddress) ⇒
|
||||
try m.invoke(address).asInstanceOf[String]
|
||||
catch {
|
||||
case ite: InvocationTargetException ⇒ throw ite.getTargetException
|
||||
}
|
||||
|
||||
try {
|
||||
val m = classOf[InetSocketAddress].getDeclaredMethod("getHostString")
|
||||
|
||||
val candidate =
|
||||
if (m.getReturnType == classOf[String] && m.getParameterTypes.isEmpty) {
|
||||
if (!m.isAccessible) m.setAccessible(true)
|
||||
callReflectively(m)
|
||||
} else fallbackToGetHostName
|
||||
|
||||
// probe so that we can be sure a reflective problem only turns up once
|
||||
// here during construction
|
||||
candidate(new InetSocketAddress("127.0.0.1", 80))
|
||||
candidate
|
||||
} catch {
|
||||
case NonFatal(_) ⇒ fallbackToGetHostName
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,60 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.impl.util
|
||||
|
||||
import java.lang.reflect.{ InvocationTargetException, Method }
|
||||
import javax.net.ssl.SSLParameters
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Enables accessing SslParameters even if compiled against Java 6.
|
||||
*/
|
||||
private[http] object Java6Compat {
|
||||
|
||||
def isJava6: Boolean =
|
||||
System.getProperty("java.version").take(4) match {
|
||||
case "1.6." ⇒ true
|
||||
case _ ⇒ false
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if setting the algorithm was successful.
|
||||
*/
|
||||
def trySetEndpointIdentificationAlgorithm(parameters: SSLParameters, algorithm: String): Boolean =
|
||||
setEndpointIdentificationAlgorithmFunction(parameters, algorithm)
|
||||
|
||||
private[this] val setEndpointIdentificationAlgorithmFunction: (SSLParameters, String) ⇒ Boolean = {
|
||||
def unsupported: (SSLParameters, String) ⇒ Boolean = (_, _) ⇒ false
|
||||
|
||||
def callReflectively(m: Method) =
|
||||
(params: SSLParameters, algorithm: String) ⇒
|
||||
try {
|
||||
m.invoke(params, algorithm)
|
||||
true
|
||||
} catch {
|
||||
case ite: InvocationTargetException ⇒ throw ite.getTargetException
|
||||
}
|
||||
|
||||
try {
|
||||
val m = classOf[SSLParameters].getMethod("setEndpointIdentificationAlgorithm", classOf[java.lang.String])
|
||||
|
||||
val candidate =
|
||||
if (m.getReturnType == Void.TYPE && m.getParameterTypes.toSeq == Seq(classOf[java.lang.String])) {
|
||||
if (!m.isAccessible) m.setAccessible(true)
|
||||
callReflectively(m)
|
||||
} else unsupported
|
||||
|
||||
// probe so that we can be sure a reflective problem only turns up once
|
||||
// here during construction
|
||||
candidate(new SSLParameters(), "https")
|
||||
candidate
|
||||
} catch {
|
||||
case NonFatal(_) ⇒ unsupported
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -49,7 +49,7 @@ private[http] abstract class SettingsCompanion[T](protected val prefix: String)
|
|||
private[http] object SettingsCompanion {
|
||||
lazy val configAdditions: Config = {
|
||||
val localHostName =
|
||||
try new InetSocketAddress(InetAddress.getLocalHost, 80).getHostStringJava6Compatible
|
||||
try new InetSocketAddress(InetAddress.getLocalHost, 80).getHostString
|
||||
catch { case NonFatal(_) ⇒ "" }
|
||||
ConfigFactory.parseMap(Map("akka.http.hostname" -> localHostName).asJava)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,12 +4,9 @@
|
|||
|
||||
package akka.http.impl
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import language.implicitConversions
|
||||
import language.higherKinds
|
||||
import java.nio.charset.Charset
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import com.typesafe.config.Config
|
||||
import akka.stream.scaladsl.{ Flow, Source }
|
||||
import akka.stream.stage._
|
||||
|
|
@ -18,7 +15,6 @@ import scala.concurrent.{ Await, Future }
|
|||
import scala.reflect.ClassTag
|
||||
import scala.util.{ Failure, Success }
|
||||
import scala.util.matching.Regex
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.util.ByteString
|
||||
import akka.actor._
|
||||
|
||||
|
|
@ -40,8 +36,6 @@ package object util {
|
|||
private[http] implicit def enhanceConfig(config: Config): EnhancedConfig = new EnhancedConfig(config)
|
||||
private[http] implicit def enhanceString_(s: String): EnhancedString = new EnhancedString(s)
|
||||
private[http] implicit def enhanceRegex(regex: Regex): EnhancedRegex = new EnhancedRegex(regex)
|
||||
private[http] implicit def enhanceInetSocketAddress(address: InetSocketAddress): EnhancedInetSocketAddress =
|
||||
new EnhancedInetSocketAddress(address)
|
||||
private[http] implicit def enhanceByteStrings(byteStrings: TraversableOnce[ByteString]): EnhancedByteStringTraversableOnce =
|
||||
new EnhancedByteStringTraversableOnce(byteStrings)
|
||||
private[http] implicit def enhanceByteStringsMat[Mat](byteStrings: Source[ByteString, Mat]): EnhancedByteStringSource[Mat] =
|
||||
|
|
@ -75,7 +69,7 @@ package object util {
|
|||
private[http] def installEventStreamLoggerFor(channel: Class[_])(implicit system: ActorSystem): Unit = {
|
||||
synchronized {
|
||||
if (eventStreamLogger == null)
|
||||
eventStreamLogger = system.actorOf(Props[util.EventStreamLogger].withDeploy(Deploy.local), name = "event-stream-logger")
|
||||
eventStreamLogger = system.actorOf(Props[util.EventStreamLogger]().withDeploy(Deploy.local), name = "event-stream-logger")
|
||||
}
|
||||
system.eventStream.subscribe(eventStreamLogger, channel)
|
||||
}
|
||||
|
|
@ -178,6 +172,4 @@ package util {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[http] class ReadTheDocumentationException(message: String) extends RuntimeException(message)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import akka.http.impl.engine.HttpConnectionTimeoutException
|
|||
import akka.http.impl.engine.client._
|
||||
import akka.http.impl.engine.server._
|
||||
import akka.http.impl.engine.ws.WebsocketClientBlueprint
|
||||
import akka.http.impl.util.{ Java6Compat, ReadTheDocumentationException, StreamUtils }
|
||||
import akka.http.impl.util.StreamUtils
|
||||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers.Host
|
||||
import akka.http.scaladsl.model.ws.{ WebsocketUpgradeResponse, WebsocketRequest, Message }
|
||||
|
|
@ -770,12 +770,7 @@ trait DefaultSSLContextCreation {
|
|||
defaultParams.setCipherSuites(cipherSuites)
|
||||
|
||||
// hostname!
|
||||
if (!Java6Compat.trySetEndpointIdentificationAlgorithm(defaultParams, "https")) {
|
||||
log.info("Unable to use JDK built-in hostname verification, please consider upgrading your Java runtime to " +
|
||||
"a more up to date version (JDK7+). Using Typesafe ssl-config hostname verification.")
|
||||
// enabling the JDK7+ solution did not work, however this is fine since we do handle hostname
|
||||
// verification directly in SslTlsCipherActor manually by applying an ssl-config provider verifier
|
||||
}
|
||||
defaultParams.setEndpointIdentificationAlgorithm("https")
|
||||
|
||||
HttpsContext(sslContext, sslParameters = Some(defaultParams))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import java.security.MessageDigest
|
|||
import java.util
|
||||
import javax.net.ssl.SSLSession
|
||||
|
||||
import akka.event.Logging
|
||||
import akka.stream.io.ScalaSessionAPI
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
|
@ -151,7 +150,7 @@ sealed abstract case class Expect private () extends ModeledHeader {
|
|||
// http://tools.ietf.org/html/rfc7230#section-5.4
|
||||
object Host extends ModeledCompanion[Host] {
|
||||
def apply(authority: Uri.Authority): Host = apply(authority.host, authority.port)
|
||||
def apply(address: InetSocketAddress): Host = apply(address.getHostStringJava6Compatible, address.getPort)
|
||||
def apply(address: InetSocketAddress): Host = apply(address.getHostString, address.getPort)
|
||||
def apply(host: String): Host = apply(host, 0)
|
||||
def apply(host: String, port: Int): Host = apply(Uri.Host(host), port)
|
||||
val empty = Host("")
|
||||
|
|
|
|||
|
|
@ -69,19 +69,14 @@ class TlsEndpointVerificationSpec extends AkkaSpec("""
|
|||
val ex = intercept[Exception] {
|
||||
Http().singleRequest(req).futureValue
|
||||
}
|
||||
if (Java6Compat.isJava6) {
|
||||
// our manual verification
|
||||
ex.getMessage should include("Hostname verification failed")
|
||||
} else {
|
||||
// JDK built-in verification
|
||||
val expectedMsg = "No subject alternative DNS name matching www.howsmyssl.com found"
|
||||
// JDK built-in verification
|
||||
val expectedMsg = "No subject alternative DNS name matching www.howsmyssl.com found"
|
||||
|
||||
var e: Throwable = ex
|
||||
while (e.getCause != null) e = e.getCause
|
||||
var e: Throwable = ex
|
||||
while (e.getCause != null) e = e.getCause
|
||||
|
||||
info("TLS failure cause: " + e.getMessage)
|
||||
e.getMessage should include(expectedMsg)
|
||||
}
|
||||
info("TLS failure cause: " + e.getMessage)
|
||||
e.getMessage should include(expectedMsg)
|
||||
}
|
||||
|
||||
"pass hostname verification on https://www.playframework.com/" in {
|
||||
|
|
|
|||
|
|
@ -1,34 +0,0 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.impl.util
|
||||
|
||||
import java.net.{ InetAddress, InetSocketAddress }
|
||||
|
||||
import org.scalatest.{ Matchers, WordSpec }
|
||||
|
||||
class EnhancedInetSocketAddressSpec extends WordSpec with Matchers {
|
||||
"getHostStringJava6Compatible" should {
|
||||
"return IPv4 address if InetSocketAddress was created with the address" in {
|
||||
val addr = likelyReverseResolvableAddress
|
||||
val socketAddress = new InetSocketAddress(addr, 80)
|
||||
socketAddress.getHostStringJava6Compatible shouldEqual addr.getHostAddress
|
||||
}
|
||||
"return host name if InetSocketAddress was created with host name" in {
|
||||
val address = new InetSocketAddress("github.com", 80)
|
||||
address.getHostStringJava6Compatible shouldEqual "github.com"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an InetAddress that can likely be reverse looked up, so that
|
||||
* getHostName returns a DNS address and not the IP. Unfortunately, we
|
||||
* cannot be sure that a host name was already cached somewhere in which
|
||||
* case getHostString may still return a host name even without doing
|
||||
* a reverse lookup at this time. If this start to fail non-deterministically,
|
||||
* it may be decided that this test needs to be disabled.
|
||||
*/
|
||||
def likelyReverseResolvableAddress: InetAddress =
|
||||
InetAddress.getByAddress(InetAddress.getByName("google.com").getAddress)
|
||||
}
|
||||
|
|
@ -43,7 +43,7 @@ object ExampleHttpContexts {
|
|||
context.init(null, certManagerFactory.getTrustManagers, new SecureRandom)
|
||||
|
||||
val params = new SSLParameters()
|
||||
Java6Compat.trySetEndpointIdentificationAlgorithm(params, "https")
|
||||
params.setEndpointIdentificationAlgorithm("https")
|
||||
HttpsContext(context, sslParameters = Some(params))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -11,11 +11,9 @@ import akka.http.javadsl.server.RequestVal;
|
|||
import akka.http.javadsl.testkit.JUnitRouteTest;
|
||||
import akka.http.javadsl.testkit.TestRoute;
|
||||
import akka.japi.Option;
|
||||
import akka.japi.Pair;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.AbstractMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class FormFieldsTest extends JUnitRouteTest {
|
||||
static FormField<String> stringParam = FormFields.stringValue("stringParam");
|
||||
static FormField<Byte> byteParam = FormFields.byteValue("byteParam");
|
||||
|
|
@ -33,21 +31,21 @@ public class FormFieldsTest extends JUnitRouteTest {
|
|||
static RequestVal<String> nameWithDefault = FormFields.stringValue("nameWithDefault").withDefault("John Doe");
|
||||
static RequestVal<Option<Integer>> optionalIntParam = FormFields.intValue("optionalIntParam").optional();
|
||||
|
||||
private Map.Entry<String, String> entry(String name, String value) {
|
||||
return new AbstractMap.SimpleImmutableEntry<>(name, value);
|
||||
private Pair<String, String> param(String name, String value) {
|
||||
return Pair.create(name, value);
|
||||
}
|
||||
@SafeVarargs
|
||||
final private HttpRequest urlEncodedRequest(Map.Entry<String, String>... entries) {
|
||||
final private HttpRequest urlEncodedRequest(Pair<String, String>... params) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
boolean next = false;
|
||||
for (Map.Entry<String, String> entry: entries) {
|
||||
for (Pair<String, String> param: params) {
|
||||
if (next) {
|
||||
sb.append('&');
|
||||
}
|
||||
next = true;
|
||||
sb.append(entry.getKey());
|
||||
sb.append(param.first());
|
||||
sb.append('=');
|
||||
sb.append(entry.getValue());
|
||||
sb.append(param.second());
|
||||
}
|
||||
|
||||
return
|
||||
|
|
@ -55,7 +53,7 @@ public class FormFieldsTest extends JUnitRouteTest {
|
|||
.withEntity(MediaTypes.APPLICATION_X_WWW_FORM_URLENCODED.toContentType(HttpCharsets.UTF_8), sb.toString());
|
||||
}
|
||||
private HttpRequest singleParameterUrlEncodedRequest(String name, String value) {
|
||||
return urlEncodedRequest(entry(name, value));
|
||||
return urlEncodedRequest(param(name, value));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
package akka.http.scaladsl.coding
|
||||
|
||||
import java.lang.reflect.{ Method, InvocationTargetException }
|
||||
import java.util.zip.{ Inflater, Deflater }
|
||||
import akka.stream.stage._
|
||||
import akka.util.{ ByteStringBuilder, ByteString }
|
||||
|
|
@ -14,8 +13,6 @@ import akka.http.impl.util._
|
|||
import akka.http.scaladsl.model._
|
||||
import akka.http.scaladsl.model.headers.HttpEncodings
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
class Deflate(val messageFilter: HttpMessage ⇒ Boolean) extends Coder with StreamDecoder {
|
||||
val encoding = HttpEncodings.deflate
|
||||
def newCompressor = new DeflateCompressor
|
||||
|
|
@ -47,7 +44,10 @@ class DeflateCompressor extends Compressor {
|
|||
deflater.setInput(input.toArray)
|
||||
drainDeflater(deflater, buffer)
|
||||
}
|
||||
protected def flushWithBuffer(buffer: Array[Byte]): ByteString = DeflateCompressor.flush(deflater, buffer)
|
||||
protected def flushWithBuffer(buffer: Array[Byte]): ByteString = {
|
||||
val written = deflater.deflate(buffer, 0, buffer.length, Deflater.SYNC_FLUSH)
|
||||
ByteString.fromArray(buffer, 0, written)
|
||||
}
|
||||
protected def finishWithBuffer(buffer: Array[Byte]): ByteString = {
|
||||
deflater.finish()
|
||||
val res = drainDeflater(deflater, buffer)
|
||||
|
|
@ -72,40 +72,6 @@ class DeflateCompressor extends Compressor {
|
|||
private[http] object DeflateCompressor {
|
||||
val MinBufferSize = 1024
|
||||
|
||||
// TODO: remove reflective call once Java 6 support is dropped
|
||||
/**
|
||||
* Compatibility mode: reflectively call deflate(..., flushMode) if available or use a hack otherwise
|
||||
*/
|
||||
private[this] val flushImplementation: (Deflater, Array[Byte]) ⇒ ByteString = {
|
||||
def flushHack(deflater: Deflater, buffer: Array[Byte]): ByteString = {
|
||||
// hack: change compression mode to provoke flushing
|
||||
deflater.deflate(EmptyByteArray, 0, 0)
|
||||
deflater.setLevel(Deflater.NO_COMPRESSION)
|
||||
val res1 = drainDeflater(deflater, buffer)
|
||||
deflater.setLevel(Deflater.BEST_COMPRESSION)
|
||||
val res2 = drainDeflater(deflater, buffer)
|
||||
res1 ++ res2
|
||||
}
|
||||
def reflectiveDeflateWithSyncMode(method: Method, syncFlushConstant: Int)(deflater: Deflater, buffer: Array[Byte]): ByteString =
|
||||
try {
|
||||
val written = method.invoke(deflater, buffer, 0: java.lang.Integer, buffer.length: java.lang.Integer, syncFlushConstant: java.lang.Integer).asInstanceOf[Int]
|
||||
ByteString.fromArray(buffer, 0, written)
|
||||
} catch {
|
||||
case t: InvocationTargetException ⇒ throw t.getTargetException
|
||||
}
|
||||
|
||||
try {
|
||||
val deflateWithFlush = classOf[Deflater].getMethod("deflate", classOf[Array[Byte]], classOf[Int], classOf[Int], classOf[Int])
|
||||
require(deflateWithFlush.getReturnType == classOf[Int])
|
||||
val flushModeSync = classOf[Deflater].getField("SYNC_FLUSH").get(null).asInstanceOf[Int]
|
||||
reflectiveDeflateWithSyncMode(deflateWithFlush, flushModeSync)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ flushHack
|
||||
}
|
||||
}
|
||||
|
||||
def flush(deflater: Deflater, buffer: Array[Byte]): ByteString = flushImplementation(deflater, buffer)
|
||||
|
||||
@tailrec
|
||||
def drainDeflater(deflater: Deflater, buffer: Array[Byte], result: ByteStringBuilder = new ByteStringBuilder()): ByteString = {
|
||||
val len = deflater.deflate(buffer)
|
||||
|
|
|
|||
|
|
@ -240,7 +240,7 @@ private[transport] object NettyTransport {
|
|||
def addressFromSocketAddress(addr: SocketAddress, schemeIdentifier: String, systemName: String,
|
||||
hostName: Option[String], port: Option[Int]): Option[Address] = addr match {
|
||||
case sa: InetSocketAddress ⇒ Some(Address(schemeIdentifier, systemName,
|
||||
hostName.getOrElse(sa.getAddress.getHostAddress), port.getOrElse(sa.getPort))) // perhaps use getHostString in jdk 1.7
|
||||
hostName.getOrElse(sa.getHostString), port.getOrElse(sa.getPort)))
|
||||
case _ ⇒ None
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -183,7 +183,7 @@ object TestSubscriber {
|
|||
trait SubscriberEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
|
||||
final case class OnNext[I](element: I) extends SubscriberEvent
|
||||
final case object OnComplete extends SubscriberEvent
|
||||
case object OnComplete extends SubscriberEvent
|
||||
final case class OnError(cause: Throwable) extends SubscriberEvent {
|
||||
override def toString: String = {
|
||||
val str = new StringWriter
|
||||
|
|
@ -397,7 +397,7 @@ object TestSubscriber {
|
|||
* See also [[#expectSubscriptionAndComplete(Throwable, Boolean)]] if no demand should be signalled.
|
||||
*/
|
||||
def expectSubscriptionAndError(cause: Throwable): Self =
|
||||
expectSubscriptionAndError(cause, true)
|
||||
expectSubscriptionAndError(cause, signalDemand = true)
|
||||
|
||||
/**
|
||||
* Fluent DSL
|
||||
|
|
@ -555,8 +555,7 @@ object TestSubscriber {
|
|||
@tailrec def drain(): immutable.Seq[I] =
|
||||
self.expectEvent(deadline.timeLeft) match {
|
||||
case OnError(ex) ⇒
|
||||
// TODO once on JDK7+ this could be made an AssertionError, since it can carry ex in its cause param
|
||||
throw new AssertionError(s"toStrict received OnError(${ex.getMessage}) while draining stream! Accumulated elements: ${b.result()}")
|
||||
throw new AssertionError(s"toStrict received OnError while draining stream! Accumulated elements: ${b.result()}", ex)
|
||||
case OnComplete ⇒
|
||||
b.result()
|
||||
case OnNext(i: I @unchecked) ⇒
|
||||
|
|
|
|||
|
|
@ -7,9 +7,9 @@ import java.util.concurrent.CountDownLatch
|
|||
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.testkit.{AkkaSpec, TestPublisher, TestSubscriber}
|
||||
import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber }
|
||||
import akka.testkit.EventFilter
|
||||
|
||||
import scala.concurrent.Await
|
||||
|
|
|
|||
|
|
@ -9,10 +9,9 @@ import akka.actor.ActorSystem
|
|||
import akka.stream.impl.ActorMaterializerImpl
|
||||
import akka.stream.impl.StreamSupervisor
|
||||
import akka.stream.impl.StreamSupervisor.Children
|
||||
import akka.stream.scaladsl.{ FileIO, Sink, Source }
|
||||
import akka.stream.scaladsl.{ FileIO, Source }
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.testkit.StreamTestKit
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
import akka.stream.ActorAttributes
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
*/
|
||||
package akka.stream.impl.io
|
||||
|
||||
import java.io.{ File, RandomAccessFile }
|
||||
import java.io.File
|
||||
import java.nio.ByteBuffer
|
||||
import java.nio.channels.FileChannel
|
||||
|
||||
|
|
@ -26,8 +26,9 @@ private[akka] object FilePublisher {
|
|||
.withDeploy(Deploy.local)
|
||||
}
|
||||
|
||||
private final case object Continue extends DeadLetterSuppression
|
||||
private case object Continue extends DeadLetterSuppression
|
||||
|
||||
val Read = java.util.Collections.singleton(java.nio.file.StandardOpenOption.READ)
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
|
|
@ -41,13 +42,11 @@ private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long]
|
|||
var readBytesTotal = 0L
|
||||
var availableChunks: Vector[ByteString] = Vector.empty // TODO possibly resign read-ahead-ing and make fusable as Stage
|
||||
|
||||
private var raf: RandomAccessFile = _
|
||||
private var chan: FileChannel = _
|
||||
|
||||
override def preStart() = {
|
||||
try {
|
||||
raf = new RandomAccessFile(f, "r") // best way to express this in JDK6, OpenOption are available since JDK7
|
||||
chan = raf.getChannel
|
||||
chan = FileChannel.open(f.toPath, FilePublisher.Read)
|
||||
} catch {
|
||||
case ex: Exception ⇒
|
||||
onErrorThenStop(ex)
|
||||
|
|
@ -80,7 +79,7 @@ private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long]
|
|||
}
|
||||
|
||||
/** BLOCKING I/O READ */
|
||||
@tailrec final def readAhead(maxChunks: Int, chunks: Vector[ByteString]): Vector[ByteString] =
|
||||
@tailrec def readAhead(maxChunks: Int, chunks: Vector[ByteString]): Vector[ByteString] =
|
||||
if (chunks.size <= maxChunks && isActive) {
|
||||
(try chan.read(buf) catch { case NonFatal(ex) ⇒ onErrorThenStop(ex); Int.MinValue }) match {
|
||||
case -1 ⇒ // EOF
|
||||
|
|
@ -98,13 +97,12 @@ private[akka] final class FilePublisher(f: File, bytesReadPromise: Promise[Long]
|
|||
}
|
||||
} else chunks
|
||||
|
||||
private final def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue
|
||||
private def eofEncountered: Boolean = eofReachedAtOffset != Long.MinValue
|
||||
|
||||
override def postStop(): Unit = {
|
||||
super.postStop()
|
||||
bytesReadPromise.trySuccess(readBytesTotal)
|
||||
|
||||
try if (chan ne null) chan.close()
|
||||
finally if (raf ne null) raf.close()
|
||||
if (chan ne null) chan.close()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,8 +3,9 @@
|
|||
*/
|
||||
package akka.stream.impl.io
|
||||
|
||||
import java.io.{ File, RandomAccessFile }
|
||||
import java.io.File
|
||||
import java.nio.channels.FileChannel
|
||||
import java.util.Collections
|
||||
|
||||
import akka.actor.{ Deploy, ActorLogging, Props }
|
||||
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
|
||||
|
|
@ -19,6 +20,9 @@ private[akka] object FileSubscriber {
|
|||
Props(classOf[FileSubscriber], f, completionPromise, bufSize, append).withDeploy(Deploy.local)
|
||||
}
|
||||
|
||||
import java.nio.file.StandardOpenOption._
|
||||
val Write = Collections.singleton(WRITE)
|
||||
val Append = Collections.singleton(APPEND)
|
||||
}
|
||||
|
||||
/** INTERNAL API */
|
||||
|
|
@ -28,17 +32,13 @@ private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long],
|
|||
|
||||
override protected val requestStrategy = WatermarkRequestStrategy(highWatermark = bufSize)
|
||||
|
||||
private var raf: RandomAccessFile = _
|
||||
private var chan: FileChannel = _
|
||||
|
||||
private var bytesWritten: Long = 0
|
||||
|
||||
override def preStart(): Unit = try {
|
||||
raf = new RandomAccessFile(f, "rw") // best way to express this in JDK6, OpenOption are available since JDK7
|
||||
chan = raf.getChannel
|
||||
|
||||
// manually supporting appending to files - in Java 7 we could use OpenModes: FileChannel.open(f, openOptions.asJava)
|
||||
if (append) chan.position(chan.size())
|
||||
val openOptions = if (append) FileSubscriber.Append else FileSubscriber.Write
|
||||
chan = FileChannel.open(f.toPath, openOptions)
|
||||
|
||||
super.preStart()
|
||||
} catch {
|
||||
|
|
@ -75,7 +75,6 @@ private[akka] class FileSubscriber(f: File, bytesWrittenPromise: Promise[Long],
|
|||
bytesWrittenPromise.trySuccess(bytesWritten)
|
||||
|
||||
if (chan ne null) chan.close()
|
||||
if (raf ne null) raf.close()
|
||||
super.postStop()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -239,7 +239,7 @@ object Source {
|
|||
*/
|
||||
def repeat[T](element: T): Source[T, Unit] = {
|
||||
val next = Some((element, element))
|
||||
unfold(element)(_ => next).withAttributes(DefaultAttributes.repeat)
|
||||
unfold(element)(_ ⇒ next).withAttributes(DefaultAttributes.repeat)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue