在 Cloud Dataflow 中執行大數據文字處理管道

Dataflow 是整合式的程式設計模型,也是一項代管服務,可用於開發和執行各種資料處理模式,包括 ETL、批次運算和持續不間斷的運算作業。由於 Dataflow 是代管服務,因此可視需求分配資源,盡量減少延遲時間,同時維持高使用率。

Dataflow 模型結合了批次和串流處理,因此開發人員不必在正確性、成本和處理時間之間做出取捨。在本程式碼研究室中,您將瞭解如何執行 Dataflow 管道,計算文字檔中不重複字詞的出現次數。

本教學課程改編自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

課程內容

  • 如何使用 Cloud Dataflow SDK 建立 Maven 專案
  • 使用 Google Cloud Platform Console 執行範例管道
  • 如何刪除相關聯的 Cloud Storage bucket 和當中內容

軟硬體需求

您會如何使用本教學課程?

僅閱讀 閱讀並完成練習

您對使用 Google Cloud Platform 服務的體驗有何評價?

新手 中級 熟練

自行設定環境

如果您還沒有 Google 帳戶 (Gmail 或 Google 應用程式),請先建立帳戶。登入 Google Cloud Platform 主控台 (console.cloud.google.com),然後建立新專案:

Screenshot from 2016-02-10 12:45:26.png

請記住專案 ID,這是所有 Google Cloud 專案中不重複的名稱 (上述名稱已遭占用,因此不適用於您,抱歉!)。本程式碼研究室稍後會將其稱為 PROJECT_ID

接著,您必須在 Cloud 控制台中啟用帳單,才能使用 Google Cloud 資源。

完成本程式碼研究室的費用不應超過數美元,但如果您決定使用更多資源,或是將資源繼續執行 (請參閱本文件結尾的「清除」一節),則可能會增加費用。

Google Cloud Platform 新使用者享有價值 $300 美元的免費試用期

啟用 API

按一下畫面左上方的「選單」圖示。

從下拉式選單中選取「API 管理工具」

在搜尋框中搜尋「Google Compute Engine」。在顯示的結果清單中,按一下「Google Compute Engine API」。

在 Google Compute Engine 頁面中,按一下「啟用」

啟用後,按一下箭頭即可返回。

現在請搜尋並啟用下列 API:

  • Google Dataflow API
  • Stackdriver Logging API
  • Google Cloud Storage
  • Google Cloud Storage JSON API
  • BigQuery API
  • Google Cloud Pub/Sub API
  • Google Cloud Datastore API

Google Cloud Platform Console 中,按一下畫面左上方的「選單」圖示:

向下捲動,然後在「儲存空間」子部分中選取「Cloud Storage」

現在應該會看到 Cloud Storage 瀏覽器,如果您使用的專案目前沒有任何 Cloud Storage 值區,系統會顯示對話方塊,邀請您建立新的值區:

按下「建立值區」按鈕即可建立值區:

輸入 bucket 的名稱。如對話方塊所述,Cloud Storage 中所有值區的名稱皆不得重複。因此,如果您選擇「test」等顯而易見的名稱,可能會發現其他人已建立同名 bucket,並收到錯誤訊息。

此外,值區名稱可使用的字元也有一些規則。只要值區名稱的開頭和結尾是英文字母或數字,中間只使用連字號,就沒問題。如果嘗試使用特殊字元,或嘗試以英文字母或數字以外的字元做為值區名稱的開頭或結尾,對話方塊會提醒你相關規則。

輸入 bucket 的專屬名稱,然後按一下「建立」。如果選擇的名稱已在使用中,系統會顯示上述錯誤訊息。成功建立 bucket 後,瀏覽器會顯示新的空白 bucket:

當然,您看到的值區名稱會有所不同,因為所有專案的值區名稱都不得重複。

啟用 Google Cloud Shell

在 GCP 主控台的右上角工具列中,按一下 Cloud Shell 圖示:

然後按一下「啟動 Cloud Shell」:

佈建並連線至環境的作業只需幾分鐘的時間:

這部虛擬機器搭載各種您需要的開發工具,提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,可大幅提升網路效能和驗證功能。您只需要瀏覽器或 Google Chromebook,就能完成這個實驗室的大部分工作 (甚至全部)。

連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的「PROJECT_ID」PROJECT_ID

在 Cloud Shell 中執行下列指令,確認您已通過驗證:

gcloud auth list

指令輸出

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

指令輸出

[core]
project = <PROJECT_ID>

如未設定,請輸入下列指令設定專案:

gcloud config set project <PROJECT_ID>

指令輸出

Updated property [core/project].

啟動 Cloud Shell 後,請先建立包含 Java 適用的 Cloud Dataflow SDK 的 Maven 專案。

在殼層中執行 mvn archetype:generate 指令,如下所示:

  mvn archetype:generate \
     -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
     -DarchetypeGroupId=com.google.cloud.dataflow \
     -DarchetypeVersion=1.9.0 \
     -DgroupId=com.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -DinteractiveMode=false \
     -Dpackage=com.example

執行指令後,您應該會在目前的目錄下看到名為 first-dataflow 的新目錄。包含 Maven 專案,其中包含 Java 適用的 Cloud Dataflow SDK 和範例管道。first-dataflow

首先,請將專案 ID 和 Cloud Storage 值區名稱儲存為環境變數。您可以在 Cloud Shell 中執行此操作。請務必將 <your_project_id> 替換為您自己的專案 ID。

 export PROJECT_ID=<your_project_id>

現在,我們將對 Cloud Storage 值區執行相同操作。請記得將 <your_bucket_name> 替換為您在先前步驟中建立值區時使用的專屬名稱。

 export BUCKET_NAME=<your_bucket_name>

切換至 first-dataflow/ 目錄。

 cd first-dataflow

我們將執行名為 WordCount 的管道,這個管道會讀取文字、將文字行代碼化為個別字詞,然後計算每個字詞出現的頻率。首先,我們會執行管道,並在執行期間查看每個步驟的狀況。

在殼層或終端機視窗中執行 mvn compile exec:java 指令,啟動管道。對於 --project, --stagingLocation,--output 引數,下列指令會參照您在本步驟稍早設定的環境變數。

 mvn compile exec:java \
      -Dexec.mainClass=com.example.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=BlockingDataflowPipelineRunner"

工作執行期間,請在工作清單中找出該工作。

Google Cloud Platform Console 中開啟 Cloud Dataflow 監控 UI。畫面會顯示 wordcount 工作,狀態為「執行中」

現在來看看管道參數。首先,請按一下工作名稱:

選取工作後,您可以查看執行圖。管道的執行圖會以包含轉換名稱和部分狀態資訊的方塊,呈現管道中的各個轉換。按一下每個步驟右上角的插入號,即可查看更多詳細資料:

現在來看看管道在每個步驟中轉換資料的方式:

  • 讀取:在這個步驟中,管道會從輸入來源讀取資料。在本例中,這是 Cloud Storage 中的文字檔案,內含莎士比亞戲劇《李爾王》的全文。管道會逐行讀取檔案,並輸出每個 PCollection,其中文字檔案中的每一行都是集合中的元素。
  • CountWordsCountWords 步驟包含兩個部分。首先,它會使用名為 ExtractWords 的平行 do 函式 (ParDo),將每行代碼化為個別字詞。ExtractWords 的輸出內容是新的 PCollection,其中每個元素都是一個字。下一個步驟 Count 會使用 Dataflow SDK 提供的轉換,傳回鍵/值組合,其中鍵是唯一字詞,值則是出現次數。以下是實作 CountWords 的方法,您可以在 GitHub 上查看完整的 WordCount.java 檔案:
  /**
   * A PTransform that converts a PCollection containing lines of text 
   * into a PCollection of formatted word counts.
   */
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
  • FormatAsText:這個函式會將每個鍵/值組合格式化為可列印的字串。以下是實作這項功能的 FormatAsText 轉換:
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts:在這個步驟中,我們會將可列印的字串寫入多個分片文字檔案。

我們會在幾分鐘後查看管道產生的輸出內容。

現在請查看圖表右側的「摘要」頁面,其中包含我們在 mvn compile exec:java 指令中加入的管道參數。

您也可以查看管道的自訂計數器,在本例中,這會顯示執行期間目前遇到的空白行數。您可以在管道中新增計數器,追蹤應用程式專屬指標。

您可以按一下「記錄」圖示,查看具體的錯誤訊息。

使用「最低嚴重性」下拉式選單,篩選「工作記錄」分頁中顯示的訊息。

您可以使用記錄分頁中的「工作站記錄」按鈕,查看執行管道的 Compute Engine 執行個體工作站記錄。工作站記錄包含程式碼產生的記錄行,以及執行程式碼的 Dataflow 產生程式碼。

如果您嘗試對管道中的失敗進行偵錯,通常「工作人員記錄」中會有額外的記錄,有助於解決問題。請注意,這些記錄是匯總所有工作站的記錄,可以篩選及搜尋。

在下一個步驟中,我們會檢查工作是否成功。

Google Cloud Platform Console 中開啟 Cloud Dataflow 監控 UI。

畫面會先後顯示 wordcount 工作的「Status」(狀態) 為「Running」(執行中) 和「Succeeded」(成功)

這項工作大約需要 3 到 4 分鐘才能完成。

還記得您執行管道並指定輸出值 bucket 嗎?讓我們看看結果 (因為您一定想知道《李爾王》中每個字詞出現的次數!)。返回 Google Cloud Platform Console 中的 Cloud Storage 瀏覽器。在您的值區中,您應該會看見工作所建立的輸出檔案和暫存檔案:

您可以透過 Google Cloud Platform 主控台關閉資源。

在 Google Cloud Platform 主控台中開啟 Cloud Storage 瀏覽器。

找出您建立的值區,並選取旁邊的核取方塊。

按一下「刪除」,即可永久刪除值區及其內容。

您已瞭解如何使用 Cloud Dataflow SDK 建立 Maven 專案、使用 Google Cloud Platform 控制台執行範例管道,以及刪除相關聯的 Cloud Storage 值區及其內容。

瞭解詳情

授權

這項內容採用的授權為創用 CC 姓名標示 3.0 通用授權和 Apache 2.0 授權。