文件

§串流遷移指南

Play 2.5 對串流資料和回應主體的方式進行了多項重大變更。

  1. Play 2.5 使用Akka 串流進行串流。Play 的先前版本使用迭代器進行串流,以及其他幾種類型的串流,例如 WebSocketChunks 等。

    變更為使用 Akka 串流有兩個主要好處。首先,Java 使用者現在可以存取 Play 的完整功能集,例如撰寫主體剖析器和篩選器。其次,串流程式庫現在在 Play 中更加一致。

  2. Play 2.5 使用 ByteString 來存放位元組封包。先前,Play 使用位元組陣列 (byte[]/ArrayByte) 來存放位元組。

    ByteString 類別與 Java 的 String 類別一樣是不可變的,因此使用上更安全且容易。與 String 類別一樣,在建構時會複製資料,因此效能會稍微下降,但這點會被其便宜的串接和子字串操作所平衡。

  3. Play 2.5 有新的 HttpEntity 類型來處理回應主體。先前,回應主體是位元組的純粹串流。現在,HTTP 主體是 HttpEntity 的一種類型:StrictStreamedChunked

    透過告訴 Play 要使用哪種類型的實體,應用程式可以更進一步控制 Play 傳送 HTTP 回應的方式。這也讓 Play 更容易最佳化傳送主體的方式。

§變更摘要

Play API 的下列部分已更新

下列類型已變更

目的 舊類型 新類型
存放位元組 byte[]/Array[Byte] ByteString
產生串流 EnumeratorWebSocket.OutChunks.OutEventSource.Out Source
將串流轉換為另一個串流 Enumeratee Flow
將串流轉換為單一值 Iteratee Accumulator
使用串流 Iteratee Sink

§如何遷移 (依 API)

下列部分概述如何遷移使用 API 不同部分的程式碼。

§將分塊結果移轉 (chunkedResults.Chunked)

在 Play 2.4 中,你會使用 Enumerator 在 Scala 中建立分塊結果,並使用 Results.Chunked 物件在 Java 中建立分塊結果。在 Play 2.5 中,API 的這些部分仍然可用,但已標示為不建議使用。

如果你選擇移轉到新的 API,你可以呼叫 StatusHeader 物件上的 chunked 方法來建立分塊結果,並提供 Akka Streams Source 物件作為分塊串流。

進階使用者可能偏好明確建立 HttpEntity.Chunked 物件,並將其傳遞到 Result 物件建構函式中。

§將串流結果移轉 (feedstream) (僅限 Scala)

在 Play 2.4 中,Scala 使用者可以透過將 Enumerator 傳遞到 feedstream 方法來串流結果。(Java 使用者沒有串流結果的方法,除了分塊結果之外。) feed 方法會串流 Enumerator 的資料,然後關閉連線。stream 方法會串流或分塊結果,並可能關閉連線,這取決於連線的 HTTP 版本,以及是否存在 Content-Length 標頭。

在 Play 2.5 中,stream 方法已移除,feed 方法已標示為不建議使用。你可以選擇是否將 feed 方法移轉到新的 API。如果你使用 stream 方法,你的程式碼將需要變更。

新 API 是直接建立一個 Result 物件,並選擇一個 HttpEntity 來表示其主體。對於串流結果,您可以使用 HttpEntity.Streamed 類別。Streamed 類別將 Source 作為主體,並將 Content-Length 標頭值設為選用。Source 的內容將傳送給用戶端。如果實體具有 Content-Length 標頭,則連線將保持開啟,否則會關閉以表示串流結束。

§移轉 WebSocket (WebSocket)

在 Play 2.4 中,WebSocket 的雙向串流在 Java 中以一對 WebSocket.InWebSocket.Out 物件表示,在 Scala 中則以一對 EnumeratorIteratee 物件表示。在 Play 2.5 中,Java 和 Scala 現在都使用 Akka Streams Flow 來表示雙向串流。

若要在 Play 2.5 中移轉您的 WebSocket 程式碼,您有兩個選項。

第一個選項是使用舊的 Play API,該 API 已棄用並重新命名為 LegacyWebSocket。這是最簡單的選項。您只需要將參考 WebSocket 的程式碼變更為參考 LegacyWebSocket 即可。LegacyWebSocket 類別提供從 Play 2.4 到 Play 2.5 的簡易移轉路徑。

第二個選項是變更為新的 Play API。為此,您需要將 WebSocket 程式碼變更為使用 Akka Streams Flow 物件。

§移轉 Scala WebSocket

Play 2.4 Scala WebSocket API 需要一個 Enumerator/Iteratee 對,該對會產生 In 物件並使用 Out 物件。一對 FrameFormatter 負責將資料從 InOut 物件中取出。

case class WebSocket[In, Out](f: RequestHeader => Future[Either[Result, (Enumerator[In], Iteratee[Out, Unit]) => Unit]])(implicit val inFormatter: WebSocket.FrameFormatter[In], val outFormatter: WebSocket.FrameFormatter[Out]) extends Handler {
trait FrameFormatter[A] {
  def transform[B](fba: B => A, fab: A => B): FrameFormatter[B]
}

Play 2.5 Scala WebSocket API 是圍繞著 MessageFlow 所建構的。Message 代表 WebSocket 框架MessageFlowTransformer 類型處理將高階物件(例如 JSON、XML 和位元組)轉換成 Message 框架。提供一組內建的隱式 MessageFlowTransformer,您也可以撰寫自己的 MessageFlowTransformer

trait WebSocket extends Handler {
  def apply(request: RequestHeader): Future[Either[Result, Flow[Message, Message, _]]]
}
sealed trait Message
case class TextMessage(data: String) extends Message
case class BinaryMessage(data: ByteString) extends Message
case class CloseMessage(statusCode: Option[Int] = Some(CloseCodes.Regular), reason: String = "") extends Message
case class PingMessage(data: ByteString) extends Message
case class PongMessage(data: ByteString) extends Message
trait MessageFlowTransformer[+In, -Out] { self =>
  def transform(flow: Flow[In, Out, _]): Flow[Message, Message, _]
}

若要進行移轉,您需要將雙向 Enumerator/Iteratee 串流轉換成 Flow。您可能還需要使用 MessageFlowTransformer 將您的 In/Out 物件轉換成 Message,不過對於 JSON 等常見類型來說,這是沒有必要的,因為提供了一些內建的隱式轉換。

§移轉 Java WebSockets

Play 2.4 Java WebSocket API 使用 WebSocket.In 物件來處理傳入訊息,並使用 WebSocket.Out 物件來傳送傳出訊息。該 API 支援傳輸文字、位元組或 JSON 框架的 WebSockets。

return WebSocket.whenReady((in, out) -> {
    out.write("Hello!");
    out.close();
});

新的 Play 2.5 API 強大許多。您現在可以建立 WebSocket,並傳回任意 WebSocket Message 框架。雙向 Message 串流表示為 Flow

public abstract class WebSocket {
    public abstract CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request);
}

如果您想將 WebSocket Message 框架轉換成您自己的類型,可以使用 MappedWebSocketAcceptor 類別。為您提供了幾個這樣的類別:TextBinaryJson。例如

return WebSocket.Text.accept(requestHeader -> {
  // return a Flow<String, String, ?>
})

您也可以透過定義如何轉換傳入傳出訊息來建立自己的 MappedWebSocketAcceptor

§移轉 Comet

要在 Play 中使用 [Comet](https://en.wikipedia.org/wiki/Comet_(programming)),您需要產生一個分塊的 HTTP 回應,其中包含特別格式化的分塊。Play 有 Comet 類別,用於協助在伺服器上產生可傳送至瀏覽器的事件。在 Play 2.4.x 中,必須建立新的 Comet 實例,並為 Java 使用回呼,而 Scala 則使用枚舉。在 Play 2.5 中,新增了基於 Akka Streams 的新 API。

§移轉 Java Comet

為您的物件建立 Akka Streams 來源,並將它們轉換成 StringJsonNode 物件。從那裡,您可以使用 play.libs.Comet.stringplay.libs.Comet.json 將您的物件轉換成適合 Results.ok().chunked() 的格式。在 JavaComet 中有其他文件。

由於 Java Comet 輔助程式是圍繞回呼建立的,因此將基於回呼的類別直接轉換成 org.reactivestreams.Publisher 並使用 Source.fromPublisher 建立來源可能會比較容易。

§移轉 Scala Comet

為您的物件建立 Akka Streams 來源,並將它們轉換成 StringJsValue 物件。從那裡,您可以使用 play.api.libs.Comet.stringplay.api.libs.Comet.json 將您的物件轉換成適合 Ok.chunked() 的格式。在 ScalaComet 中有其他文件。

§移轉伺服器傳送事件 (EventSource)

要使用 Server-Sent Events,您需要產生一個具有特別格式區塊的區塊 HTTP 回應。Play 有個 EventSource 介面,用於協助在伺服器上產生事件,這些事件可傳送至瀏覽器。在 Play 2.4 中,Java 和 Scala 各有非常不同的 API,但在 Play 2.5 中,它們已變更,因此都基於 Akka Streams。

§遷移 Java Server-Sent 事件

在 Play 2.4 的 Java API 中,您可以使用 EventSource 產生區塊串流,這是一個延伸 Chunks<String> 的類別。您可以從字串或 JSON 物件建構 Event 物件,然後透過呼叫 EventSourcesend 方法將它們傳送至回應中。

EventSource eventSource = new EventSource() {
    @Override
    public void onConnected() {
        send(Event.event("hello"));
        send(Event.event("world"));
        ...
    }
};
return ok(eventSource);

在 Play 2.5 中,您通常會為應用程式物件建立一個 Akka Streams Source,使用 Source.map 將物件轉換成 Event,最後使用 EventSource.chunkedEvent 轉換成區塊值。以下範例顯示如何將其用於傳送字串串流。

Source<String, ?> stringSource = ...;
Source<EventSource.Event, ?> eventSource = myStrings.map(Event::event);
return ok().chunked(EventSource.chunked(eventSource)).as("text/event-stream");

如果您仍想使用與 Play 2.4 相同的 API,您可以使用 LegacyEventSource 類別。此類別與 Play 2.4 API 相同,但已重新命名並標示為不建議使用。如果您想使用新的 API,但保留與舊命令式 API 相同的感覺,您可以嘗試 GraphStage

§遷移 Scala Server-Sent 事件

要使用 Play 2.4 的 Scala API,您可以提供一個應用程式物件的 Enumerator,然後使用 EventSource Enumeratee 將它們轉換成 Event。最後,您將 Event 傳遞給 chunked 方法,將它們轉換成區塊。

val someDataStream: Enumerator[SomeData] = ???
Ok.chunked(someDataStream &> EventSource())

在 Play 2.5 中,使用 EventSource 搭配 EnumeratorEnumeratee 已被棄用。您仍然可以使用 EnumeratorEnumeratee,但建議您將程式碼轉換為使用 SourceFlowSource 會產生物件串流,而 EventSource.flowFlow 會將它們轉換成 Event。例如,上述程式碼會改寫成

val someDataStream: Source[SomeData, Unit] = ???
Ok.chunked(someDataStream via EventSource.flow).as("text/event-stream")

§遷移自訂動作 (EssentialAction)(僅限 Scala)

大多數 Scala 使用者會為其動作使用 Action 類別。Action 類別是一種 EssentialAction,它會在執行其邏輯並傳送結果之前,總是完整地解析其主體。有些使用者可能已撰寫自己的自訂 EssentialAction,以便執行諸如遞增處理請求主體等動作。

如果您在 Play 2.4 應用程式中只使用一般的 Action,則它們不需要任何遷移。但是,如果您已撰寫 EssentialAction,則需要將它遷移到 Play 2.5 中的新 API。EssentialAction 的行為仍然相同,但簽章已從 Play 2.4

trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])

變更為 Play 2.5 中的新簽章

trait EssentialAction extends (RequestHeader => Accumulator[ByteString, Result])

要進行遷移,您需要將 Iteratee 取代為 Accumulator,並將 Array[Byte] 取代為 ByteString

§移轉自訂主體剖析器 (BodyParser)(僅限 Scala)

如果您是 Scala 使用者,且在 Play 2.4 應用程式中擁有自訂 BodyParser,則需要將其移轉至新的 Play 2.5 API。BodyParser 特質簽章在 Play 2.4 中如下所示

trait BodyParser[+A] extends (RequestHeader => Iteratee[Array[Byte], Either[Result, A]])

在 Play 2.5 中,它已變更為使用 Akka Streams 類型

trait BodyParser[+A] extends (RequestHeader => Accumulator[ByteString, Either[Result, A]])

要進行遷移,您需要將 Iteratee 取代為 Accumulator,並將 Array[Byte] 取代為 ByteString

§移轉 Result 主體(僅限 Scala)

Result 物件已變更其表示結果主體和連線關閉標記的方式。它不再採用 body: Enumerator[Array[Byte]], connection: Connection,而是採用 body: HttpEntityHttpEntity 類型包含有關主體的資訊,以及有關如何關閉連線的隱含資訊。

您可以透過使用包含 Source 以及選用 Content-LengthContent-Type 標頭的 Streamed 實體,來移轉現有的 Enumerator

val bodyPublisher: Publisher[ByteString] = Streams.enumeratorToPublisher(bodyEnumerator)
val bodySource: Source[ByteString, _] = Source.fromPublisher(bodyPublisher)
val entity: HttpEntity = HttpEntity.Streamed(bodySource)
new Result(headers, entity)

請參閱有關移轉 Enumerator 和移轉至 ByteString 的區段,以取得有關移轉至這些類型的更多資訊。

您可能會發現您根本不需要 Result 主體的串流。如果是這樣,您可能想對主體使用 Strict 實體。

new Result(headers, HttpEntity.Strict(bytes))

§如何移轉(依類型)

本區段說明如何將您的位元組陣列和串流移轉至新的 Akka Streams API。

Akka Streams 是 Akka 專案的一部分。Play 使用 Akka Streams 提供串流功能:傳送和接收位元組序列和其他物件。Akka 專案有許多關於 Akka Streams 的良好文件。在 Play 中開始使用 Akka Streams 之前,值得查看 Akka Streams 文件,以了解有哪些資訊可用。

可以在主要 Akka API 文件中的 akka.stream 套件中找到 API 文件

在您第一次開始使用 Akka Streams 時,Akka 文件中的基礎和使用流程部分值得一看。它將向您介紹 Akka Streams API 中最重要的部分。

您不必一次轉換整個應用程式。應用程式的部分可以繼續使用迭代器,而其他部分則使用 Akka Streams。Akka Streams 提供 反應式串流 實作,而 Play 的迭代器函式庫也提供反應式串流實作,因此,Play 的迭代器可以輕鬆封裝在 Akka Streams 中,反之亦然。

§將位元組陣列 (byte[]/Array[Byte]) 移轉到 ByteString

請參閱 JavaScala API 文件以取得 ByteString

範例

Scala

// Get the empty ByteString (this instance is cached)
ByteString.empty
// Create a ByteString from a String
ByteString("hello")
ByteString.fromString("hello")
// Create a ByteString from an Array[Byte]
ByteString(arr)
ByteString.fromArray(arr)

Java

// Get the empty ByteString (this instance is cached)
ByteString.empty();
// Create a ByteString from a String
ByteString.fromString("hello");
// Create a ByteString from an Array[Byte]
ByteString.fromArray(arr);

§*.Out 移轉到 Source

Play 現在使用 Source 來產生事件,而不是舊有的 WebSocket.OutChunks.OutEventSource.Out 類別。這些類別雖然使用簡單,但缺乏彈性,而且沒有正確實作 背壓

你可以用任何產生串流的 Source 取代你的 *.Out 類別。有許多方法可以建立 Source (Java/Scala.

如果你想用一個簡單的物件取代你的 *.Out,你可以寫入訊息並關閉它,而不必擔心背壓,那麼你可以使用 Source.actorRef 方法

Java

Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
  .mapMaterializedValue(sourceActor -> {
    sourceActor.tell(ByteString.fromString("hello"), null);
    sourceActor.tell(ByteString.fromString("world"), null);
    sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
    return null;
  });

Scala

val source = Source.actorRef[ByteString](256, OverflowStrategy.dropNew).mapMaterializedValue { sourceActor =>
  sourceActor ! ByteString("hello")
  sourceActor ! ByteString("world")
  sourceActor ! Status.Success(()) // close the source
}

§Enumerator 遷移到 Source

Play 在許多地方使用 Enumerator 來產生值串流。

步驟 1:使用過渡 API(如果可用)

如果你使用 Results.chunkedResults.feed,你可以繼續使用現有的方法。這些方法已經標示為已過時,所以你可能還是想變更你的程式碼。

步驟 2:使用轉接器將 Enumerator 轉換為 Source

你可以先使用 Streams.enumeratorToPublisher 將現有的 Enumerator 轉換為 Reactive Streams Publisher,然後再使用 Source.fromPublisher 將 Publisher 轉換為 Source,例如

val enumerator: Enumerator[T] = ...
val source = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator))

步驟 3:(選用)改寫為 Source

以下是 Enumerator 工廠方法的一些常見對應

Iteratee Akka Streams 備註
Enumerator.apply(a) Source.single(a)
Enumerator.apply(a, b) Source.apply(List(a, b)))
Enumerator.enumerate(seq) Source.apply(seq) seq 必須是不可變的
Enumerator.repeat Source.repeat 在 Akka Streams 中,重複的元素並非每次都經過評估
Enumerator.empty Source.empty
Enumerator.unfold Source.unfold
Enumerator.generateM Source.unfoldAsync
Enumerator.fromStream StreamConverters.fromInputStream
Enumerator.fromFile StreamConverters.fromInputStream 您必須為 java.io.File 建立一個 InputStream

§Iteratee 遷移至 SinkAccumulator

步驟 1:使用轉接器進行轉換

您可以使用 Streams.iterateeToSubscriber 將現有的 Iteratee 轉換為 Reactive Streams Subscriber,然後再使用 Sink.fromSubscriber 將訂閱者轉換為接收器,例如

val iteratee: Iteratee[T, U] = ...
val (subscriber, resultIteratee) = Streams.iterateeToSubscriber(iteratee)
val sink = Sink.fromSubscriber(subscriber)

如果您需要傳回 Accumulator,您可以改用 Streams.iterateeToAccumulator 方法。

步驟 2:(選用)改寫為 Sink

以下是常見的 Iteratee 工廠方法對應的清單

Iteratee Akka Streams 備註
Iteratee.fold Sink.fold
Iteratee.head Sink.headOption
Iteratee.getChunks Sink.seq
Iteratee.foreach Sink.foreach
Iteratee.ignore Sink.ignore
Done Sink.cancelled 可以對已具體化的值進行對應,以產生結果,或者如果使用累加器,則可以使用 Accumulator.done 取代。

§Enumeratee 遷移至 Processor

步驟 1:使用轉接器進行轉換

你可以使用 Streams.enumerateeToProcessor 將現有的 Enumeratee 轉換為 reactive streams Processor,然後使用 Flow.fromProcessor 將處理器轉換為流程,例如

val enumeratee: Enumeratee[A, B] = ...
val flow = Flow.fromProcessor(() => Streams.enumerateeToProcessor(enumeratee))

步驟 2:(選用)改寫為 Flow

以下列出一些列舉工廠方法的常見對應

Iteratee Akka Streams 備註
Enumeratee.map Flow.map
Enumeratee.mapM Flow.mapAsync 你必須在 Akka Streams 中指定並行度,即一次並行對應的元素數量。
Enumeratee.mapConcat Flow.mapConcat
Enumeratee.filter Flow.filter
Enumeratee.take Flow.take
Enumeratee.takeWhile Flow.takeWhile
Enumeratee.drop Flow.drop
Enumeratee.dropWhile Flow.dropWhile
Enumeratee.collect Flow.collect

下一步:Java 遷移指南


在這個文件中發現錯誤?此頁面的原始程式碼可以在 這裡 找到。閱讀 文件指南 後,請隨時提交 pull request。有問題或建議要分享?前往 我們的社群論壇 與社群展開對話。