Merge pull request #17282 from spray/w/17278-change-to-javadsl-in-java-model
!htc #17278 replace remaining usages of scaladsl.Source in akka.http.model.japi
This commit is contained in:
commit
0ac7900e9e
8 changed files with 20 additions and 28 deletions
|
|
@ -7,7 +7,7 @@ package akka.http.model.japi;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
|
||||||
import akka.util.ByteString;
|
import akka.util.ByteString;
|
||||||
import akka.stream.scaladsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import akka.http.model.HttpEntity$;
|
import akka.http.model.HttpEntity$;
|
||||||
|
|
||||||
/** Constructors for HttpEntity instances */
|
/** Constructors for HttpEntity instances */
|
||||||
|
|
@ -43,20 +43,20 @@ public final class HttpEntities {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HttpEntityDefault create(ContentType contentType, long contentLength, Source<ByteString, Object> data) {
|
public static HttpEntityDefault create(ContentType contentType, long contentLength, Source<ByteString, Object> data) {
|
||||||
return new akka.http.model.HttpEntity.Default((akka.http.model.ContentType) contentType, contentLength, data);
|
return new akka.http.model.HttpEntity.Default((akka.http.model.ContentType) contentType, contentLength, data.asScala());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HttpEntityCloseDelimited createCloseDelimited(ContentType contentType, Source<ByteString, Object> data) {
|
public static HttpEntityCloseDelimited createCloseDelimited(ContentType contentType, Source<ByteString, Object> data) {
|
||||||
return new akka.http.model.HttpEntity.CloseDelimited((akka.http.model.ContentType) contentType, data);
|
return new akka.http.model.HttpEntity.CloseDelimited((akka.http.model.ContentType) contentType, data.asScala());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HttpEntityIndefiniteLength createIndefiniteLength(ContentType contentType, Source<ByteString, Object> data) {
|
public static HttpEntityIndefiniteLength createIndefiniteLength(ContentType contentType, Source<ByteString, Object> data) {
|
||||||
return new akka.http.model.HttpEntity.IndefiniteLength((akka.http.model.ContentType) contentType, data);
|
return new akka.http.model.HttpEntity.IndefiniteLength((akka.http.model.ContentType) contentType, data.asScala());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static HttpEntityChunked createChunked(ContentType contentType, Source<ByteString, Object> data) {
|
public static HttpEntityChunked createChunked(ContentType contentType, Source<ByteString, Object> data) {
|
||||||
return akka.http.model.HttpEntity.Chunked$.MODULE$.fromData(
|
return akka.http.model.HttpEntity.Chunked$.MODULE$.fromData(
|
||||||
(akka.http.model.ContentType) contentType,
|
(akka.http.model.ContentType) contentType,
|
||||||
data);
|
data.asScala());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
package akka.http.model.japi;
|
package akka.http.model.japi;
|
||||||
|
|
||||||
import akka.http.model.HttpEntity$;
|
import akka.http.model.HttpEntity$;
|
||||||
import akka.stream.scaladsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
import akka.util.ByteString;
|
import akka.util.ByteString;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
package akka.http.model.japi;
|
package akka.http.model.japi;
|
||||||
|
|
||||||
import akka.stream.scaladsl.Source;
|
import akka.stream.javadsl.Source;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents an entity transferred using `Transfer-Encoding: chunked`. It consists of a
|
* Represents an entity transferred using `Transfer-Encoding: chunked`. It consists of a
|
||||||
|
|
|
||||||
|
|
@ -4,14 +4,9 @@
|
||||||
|
|
||||||
package akka.http.model.japi;
|
package akka.http.model.japi;
|
||||||
|
|
||||||
import akka.util.ByteString;
|
|
||||||
import akka.stream.scaladsl.Source;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents an entity without a predetermined content-length. Its length is implicitly
|
* Represents an entity without a predetermined content-length. Its length is implicitly
|
||||||
* determined by closing the underlying connection. Therefore, this entity type is only
|
* determined by closing the underlying connection. Therefore, this entity type is only
|
||||||
* available for Http responses.
|
* available for Http responses.
|
||||||
*/
|
*/
|
||||||
public abstract class HttpEntityCloseDelimited implements ResponseEntity {
|
public abstract class HttpEntityCloseDelimited implements ResponseEntity {}
|
||||||
public abstract Source<ByteString, ?> data();
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -4,13 +4,9 @@
|
||||||
|
|
||||||
package akka.http.model.japi;
|
package akka.http.model.japi;
|
||||||
|
|
||||||
import akka.util.ByteString;
|
|
||||||
import akka.stream.scaladsl.Source;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default entity type which has a predetermined length and a stream of data bytes.
|
* The default entity type which has a predetermined length and a stream of data bytes.
|
||||||
*/
|
*/
|
||||||
public abstract class HttpEntityDefault implements BodyPartEntity, RequestEntity, ResponseEntity {
|
public abstract class HttpEntityDefault implements BodyPartEntity, RequestEntity, ResponseEntity {
|
||||||
public abstract long contentLength();
|
public abstract long contentLength();
|
||||||
public abstract Source<ByteString, ?> data();
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,7 @@
|
||||||
|
|
||||||
package akka.http.model.japi;
|
package akka.http.model.japi;
|
||||||
|
|
||||||
import akka.util.ByteString;
|
|
||||||
import akka.stream.scaladsl.Source;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents an entity without a predetermined content-length to use in a BodyParts.
|
* Represents an entity without a predetermined content-length to use in a BodyParts.
|
||||||
*/
|
*/
|
||||||
public abstract class HttpEntityIndefiniteLength implements BodyPartEntity {
|
public abstract class HttpEntityIndefiniteLength implements BodyPartEntity {}
|
||||||
public abstract Source<ByteString, ?> data();
|
|
||||||
}
|
|
||||||
|
|
@ -14,6 +14,7 @@ import akka.util.ByteString
|
||||||
import akka.stream.OperationAttributes._
|
import akka.stream.OperationAttributes._
|
||||||
import akka.stream.FlowMaterializer
|
import akka.stream.FlowMaterializer
|
||||||
import akka.stream.scaladsl._
|
import akka.stream.scaladsl._
|
||||||
|
import akka.stream
|
||||||
import akka.stream.TimerTransformer
|
import akka.stream.TimerTransformer
|
||||||
import akka.http.util._
|
import akka.http.util._
|
||||||
import japi.JavaMapping.Implicits._
|
import japi.JavaMapping.Implicits._
|
||||||
|
|
@ -80,7 +81,7 @@ sealed trait HttpEntity extends japi.HttpEntity {
|
||||||
def withContentType(contentType: ContentType): HttpEntity
|
def withContentType(contentType: ContentType): HttpEntity
|
||||||
|
|
||||||
/** Java API */
|
/** Java API */
|
||||||
def getDataBytes: Source[ByteString, _] = dataBytes
|
def getDataBytes: stream.javadsl.Source[ByteString, _] = stream.javadsl.Source.adapt(dataBytes)
|
||||||
|
|
||||||
// default implementations, should be overridden
|
// default implementations, should be overridden
|
||||||
def isCloseDelimited: Boolean = false
|
def isCloseDelimited: Boolean = false
|
||||||
|
|
@ -277,7 +278,7 @@ object HttpEntity {
|
||||||
override def productPrefix = "HttpEntity.Chunked"
|
override def productPrefix = "HttpEntity.Chunked"
|
||||||
|
|
||||||
/** Java API */
|
/** Java API */
|
||||||
def getChunks: Source[japi.ChunkStreamPart, Any] = chunks.asInstanceOf[Source[japi.ChunkStreamPart, Any]]
|
def getChunks: stream.javadsl.Source[japi.ChunkStreamPart, Any] = stream.javadsl.Source.adapt(chunks)
|
||||||
}
|
}
|
||||||
object Chunked {
|
object Chunked {
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -433,7 +433,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
||||||
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
|
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
|
||||||
messageIn.expectComplete()
|
messageIn.expectComplete()
|
||||||
|
|
||||||
netIn.expectNoMsg(1.second) // especially the cancellation not yet
|
netIn.expectNoMsg(100.millis) // especially the cancellation not yet
|
||||||
expectNoNetworkData()
|
expectNoNetworkData()
|
||||||
messageOutSub.sendComplete()
|
messageOutSub.sendComplete()
|
||||||
|
|
||||||
|
|
@ -478,6 +478,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
||||||
netOut.expectComplete()
|
netOut.expectComplete()
|
||||||
}
|
}
|
||||||
"after receiving regular close frame when fragmented message is still open" in pendingUntilFixed {
|
"after receiving regular close frame when fragmented message is still open" in pendingUntilFixed {
|
||||||
|
pending
|
||||||
new ServerTestSetup {
|
new ServerTestSetup {
|
||||||
netOutSub.request(10)
|
netOutSub.request(10)
|
||||||
messageInSub.request(10)
|
messageInSub.request(10)
|
||||||
|
|
@ -534,7 +535,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
||||||
messageOutSub.sendComplete()
|
messageOutSub.sendComplete()
|
||||||
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
|
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
|
||||||
|
|
||||||
netOut.expectNoMsg(1.second) // wait for peer to close regularly
|
netOut.expectNoMsg(100.millis) // wait for peer to close regularly
|
||||||
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
|
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
|
||||||
|
|
||||||
messageIn.expectComplete()
|
messageIn.expectComplete()
|
||||||
|
|
@ -562,7 +563,7 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
||||||
dataSub.sendComplete()
|
dataSub.sendComplete()
|
||||||
expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true)
|
expectFrameOnNetwork(Opcode.Continuation, ByteString.empty, fin = true)
|
||||||
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
|
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
|
||||||
netOut.expectNoMsg(1.second) // wait for peer to close regularly
|
netOut.expectNoMsg(100.millis) // wait for peer to close regularly
|
||||||
|
|
||||||
val mask = Random.nextInt()
|
val mask = Random.nextInt()
|
||||||
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
|
pushInput(closeFrame(Protocol.CloseCodes.Regular, mask = true))
|
||||||
|
|
@ -596,6 +597,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
||||||
"reason is no valid utf8 data" in pending
|
"reason is no valid utf8 data" in pending
|
||||||
}
|
}
|
||||||
"timeout if user handler closes and peer doesn't send a close frame" in new ServerTestSetup {
|
"timeout if user handler closes and peer doesn't send a close frame" in new ServerTestSetup {
|
||||||
|
override protected def closeTimeout: FiniteDuration = 100.millis
|
||||||
|
|
||||||
netInSub.expectRequest()
|
netInSub.expectRequest()
|
||||||
messageOutSub.sendComplete()
|
messageOutSub.sendComplete()
|
||||||
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
|
expectCloseCodeOnNetwork(Protocol.CloseCodes.Regular)
|
||||||
|
|
@ -604,6 +607,8 @@ class MessageSpec extends FreeSpec with Matchers with WithMaterializerSpec {
|
||||||
netInSub.expectCancellation()
|
netInSub.expectCancellation()
|
||||||
}
|
}
|
||||||
"timeout after we close after error and peer doesn't send a close frame" in new ServerTestSetup {
|
"timeout after we close after error and peer doesn't send a close frame" in new ServerTestSetup {
|
||||||
|
override protected def closeTimeout: FiniteDuration = 100.millis
|
||||||
|
|
||||||
netInSub.expectRequest()
|
netInSub.expectRequest()
|
||||||
|
|
||||||
pushInput(frameHeader(Opcode.Binary, 0, fin = true, rsv1 = true))
|
pushInput(frameHeader(Opcode.Binary, 0, fin = true, rsv1 = true))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue