Dataflow 是整合式的程式設計模型,也是一項代管服務,可用於開發和執行各種資料處理模式,包括 ETL、批次運算和持續不間斷的運算作業。由於 Dataflow 是代管服務,因此可視需求分配資源,盡量減少延遲時間,同時維持高使用率。
Dataflow 模型結合了批次和串流處理,因此開發人員不必在正確性、成本和處理時間之間做出取捨。在本程式碼研究室中,您將瞭解如何執行 Dataflow 管道,計算文字檔中不重複字詞的出現次數。
本教學課程改編自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
課程內容
- 如何使用 Cloud Dataflow SDK 建立 Maven 專案
- 使用 Google Cloud Platform Console 執行範例管道
- 如何刪除相關聯的 Cloud Storage bucket 和當中內容
軟硬體需求
您會如何使用本教學課程?
您對使用 Google Cloud Platform 服務的體驗有何評價?
自行設定環境
如果您還沒有 Google 帳戶 (Gmail 或 Google 應用程式),請先建立帳戶。登入 Google Cloud Platform 主控台 (console.cloud.google.com),然後建立新專案:
請記住專案 ID,這是所有 Google Cloud 專案中不重複的名稱 (上述名稱已遭占用,因此不適用於您,抱歉!)。本程式碼研究室稍後會將其稱為 PROJECT_ID
。
接著,您必須在 Cloud 控制台中啟用帳單,才能使用 Google Cloud 資源。
完成本程式碼研究室的費用不應超過數美元,但如果您決定使用更多資源,或是將資源繼續執行 (請參閱本文件結尾的「清除」一節),則可能會增加費用。
Google Cloud Platform 新使用者享有價值 $300 美元的免費試用期。
啟用 API
按一下畫面左上方的「選單」圖示。
從下拉式選單中選取「API 管理工具」。
在搜尋框中搜尋「Google Compute Engine」。在顯示的結果清單中,按一下「Google Compute Engine API」。
在 Google Compute Engine 頁面中,按一下「啟用」
啟用後,按一下箭頭即可返回。
現在請搜尋並啟用下列 API:
- Google Dataflow API
- Stackdriver Logging API
- Google Cloud Storage
- Google Cloud Storage JSON API
- BigQuery API
- Google Cloud Pub/Sub API
- Google Cloud Datastore API
在 Google Cloud Platform Console 中,按一下畫面左上方的「選單」圖示:
向下捲動,然後在「儲存空間」子部分中選取「Cloud Storage」:
現在應該會看到 Cloud Storage 瀏覽器,如果您使用的專案目前沒有任何 Cloud Storage 值區,系統會顯示對話方塊,邀請您建立新的值區:
按下「建立值區」按鈕即可建立值區:
輸入 bucket 的名稱。如對話方塊所述,Cloud Storage 中所有值區的名稱皆不得重複。因此,如果您選擇「test」等顯而易見的名稱,可能會發現其他人已建立同名 bucket,並收到錯誤訊息。
此外,值區名稱可使用的字元也有一些規則。只要值區名稱的開頭和結尾是英文字母或數字,中間只使用連字號,就沒問題。如果嘗試使用特殊字元,或嘗試以英文字母或數字以外的字元做為值區名稱的開頭或結尾,對話方塊會提醒你相關規則。
輸入 bucket 的專屬名稱,然後按一下「建立」。如果選擇的名稱已在使用中,系統會顯示上述錯誤訊息。成功建立 bucket 後,瀏覽器會顯示新的空白 bucket:
當然,您看到的值區名稱會有所不同,因為所有專案的值區名稱都不得重複。
啟用 Google Cloud Shell
在 GCP 主控台的右上角工具列中,按一下 Cloud Shell 圖示:
然後按一下「啟動 Cloud Shell」:
佈建並連線至環境的作業只需幾分鐘的時間:
這部虛擬機器搭載各種您需要的開發工具,提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,可大幅提升網路效能和驗證功能。您只需要瀏覽器或 Google Chromebook,就能完成這個實驗室的大部分工作 (甚至全部)。
連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的「PROJECT_ID」PROJECT_ID。
在 Cloud Shell 中執行下列指令,確認您已通過驗證:
gcloud auth list
指令輸出
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
指令輸出
[core] project = <PROJECT_ID>
如未設定,請輸入下列指令設定專案:
gcloud config set project <PROJECT_ID>
指令輸出
Updated property [core/project].
啟動 Cloud Shell 後,請先建立包含 Java 適用的 Cloud Dataflow SDK 的 Maven 專案。
在殼層中執行 mvn archetype:generate
指令,如下所示:
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
執行指令後,您應該會在目前的目錄下看到名為 first-dataflow
的新目錄。包含 Maven 專案,其中包含 Java 適用的 Cloud Dataflow SDK 和範例管道。first-dataflow
首先,請將專案 ID 和 Cloud Storage 值區名稱儲存為環境變數。您可以在 Cloud Shell 中執行此操作。請務必將 <your_project_id>
替換為您自己的專案 ID。
export PROJECT_ID=<your_project_id>
現在,我們將對 Cloud Storage 值區執行相同操作。請記得將 <your_bucket_name>
替換為您在先前步驟中建立值區時使用的專屬名稱。
export BUCKET_NAME=<your_bucket_name>
切換至 first-dataflow/
目錄。
cd first-dataflow
我們將執行名為 WordCount 的管道,這個管道會讀取文字、將文字行代碼化為個別字詞,然後計算每個字詞出現的頻率。首先,我們會執行管道,並在執行期間查看每個步驟的狀況。
在殼層或終端機視窗中執行 mvn compile exec:java
指令,啟動管道。對於 --project, --stagingLocation,
和 --output
引數,下列指令會參照您在本步驟稍早設定的環境變數。
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
工作執行期間,請在工作清單中找出該工作。
在 Google Cloud Platform Console 中開啟 Cloud Dataflow 監控 UI。畫面會顯示 wordcount 工作,狀態為「執行中」:
現在來看看管道參數。首先,請按一下工作名稱:
選取工作後,您可以查看執行圖。管道的執行圖會以包含轉換名稱和部分狀態資訊的方塊,呈現管道中的各個轉換。按一下每個步驟右上角的插入號,即可查看更多詳細資料:
現在來看看管道在每個步驟中轉換資料的方式:
- 讀取:在這個步驟中,管道會從輸入來源讀取資料。在本例中,這是 Cloud Storage 中的文字檔案,內含莎士比亞戲劇《李爾王》的全文。管道會逐行讀取檔案,並輸出每個
PCollection
,其中文字檔案中的每一行都是集合中的元素。 - CountWords:
CountWords
步驟包含兩個部分。首先,它會使用名為ExtractWords
的平行 do 函式 (ParDo),將每行代碼化為個別字詞。ExtractWords 的輸出內容是新的 PCollection,其中每個元素都是一個字。下一個步驟Count
會使用 Dataflow SDK 提供的轉換,傳回鍵/值組合,其中鍵是唯一字詞,值則是出現次數。以下是實作CountWords
的方法,您可以在 GitHub 上查看完整的 WordCount.java 檔案:
/**
* A PTransform that converts a PCollection containing lines of text
* into a PCollection of formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
- FormatAsText:這個函式會將每個鍵/值組合格式化為可列印的字串。以下是實作這項功能的
FormatAsText
轉換:
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts:在這個步驟中,我們會將可列印的字串寫入多個分片文字檔案。
我們會在幾分鐘後查看管道產生的輸出內容。
現在請查看圖表右側的「摘要」頁面,其中包含我們在 mvn compile exec:java
指令中加入的管道參數。
您也可以查看管道的自訂計數器,在本例中,這會顯示執行期間目前遇到的空白行數。您可以在管道中新增計數器,追蹤應用程式專屬指標。
您可以按一下「記錄」圖示,查看具體的錯誤訊息。
使用「最低嚴重性」下拉式選單,篩選「工作記錄」分頁中顯示的訊息。
您可以使用記錄分頁中的「工作站記錄」按鈕,查看執行管道的 Compute Engine 執行個體工作站記錄。工作站記錄包含程式碼產生的記錄行,以及執行程式碼的 Dataflow 產生程式碼。
如果您嘗試對管道中的失敗進行偵錯,通常「工作人員記錄」中會有額外的記錄,有助於解決問題。請注意,這些記錄是匯總所有工作站的記錄,可以篩選及搜尋。
在下一個步驟中,我們會檢查工作是否成功。
在 Google Cloud Platform Console 中開啟 Cloud Dataflow 監控 UI。
畫面會先後顯示 wordcount 工作的「Status」(狀態) 為「Running」(執行中) 和「Succeeded」(成功):
這項工作大約需要 3 到 4 分鐘才能完成。
還記得您執行管道並指定輸出值 bucket 嗎?讓我們看看結果 (因為您一定想知道《李爾王》中每個字詞出現的次數!)。返回 Google Cloud Platform Console 中的 Cloud Storage 瀏覽器。在您的值區中,您應該會看見工作所建立的輸出檔案和暫存檔案:
您可以透過 Google Cloud Platform 主控台關閉資源。
在 Google Cloud Platform 主控台中開啟 Cloud Storage 瀏覽器。
找出您建立的值區,並選取旁邊的核取方塊。
按一下「刪除」,即可永久刪除值區及其內容。
您已瞭解如何使用 Cloud Dataflow SDK 建立 Maven 專案、使用 Google Cloud Platform 控制台執行範例管道,以及刪除相關聯的 Cloud Storage 值區及其內容。
瞭解詳情
- Dataflow 說明文件:https://cloud.google.com/dataflow/docs/
授權
這項內容採用的授權為創用 CC 姓名標示 3.0 通用授權和 Apache 2.0 授權。