在 Cloud Dataflow 中运行大数据文本处理流水线

Dataflow 是一种统一的编程模型,同时也是一项托管服务,用于开发和执行各种各样的数据处理模式,包括 ETL、批量计算和连续计算等。由于 Dataflow 是一项代管式服务,因此它可以按需分配资源,从而最大限度地缩短延迟时间,同时保持高使用效率。

Dataflow 模型将批处理和流处理相结合,因此开发者无需在正确性、费用和处理时间之间做出权衡。在此 Codelab 中,您将学习如何运行一个 Dataflow 流水线,该流水线用于统计文本文件中唯一字词的出现次数。

本教程改编自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

学习内容

  • 如何使用 Cloud Dataflow SDK 创建 Maven 项目
  • 使用 Google Cloud Platform Console 运行示例流水线
  • 如何删除关联的 Cloud Storage 存储分区及其内容

所需条件

您打算如何使用本教程?

仅阅读教程内容 阅读并完成练习

您如何评价自己在使用 Google Cloud Platform 服务方面的经验水平?

新手水平 中等水平 熟练水平

自定进度的环境设置

如果您还没有 Google 账号(Gmail 或 Google Apps),则必须创建一个。登录 Google Cloud Platform Console (console.cloud.google.com) 并创建一个新项目:

2016-02-10 12:45:26 的屏幕截图.png

请记住项目 ID,它在所有 Google Cloud 项目中都是唯一名称(很抱歉,上述名称已被占用,您无法使用!)。它稍后将在此 Codelab 中被称为 PROJECT_ID

接下来,您需要在 Cloud Console 中启用结算功能,才能使用 Google Cloud 资源。

在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高(请参阅本文档末尾的“清理”部分)。

Google Cloud Platform 的新用户有资格获享 $300 免费试用

启用 API

点击屏幕左上角的菜单图标。

从下拉菜单中选择 API 管理器

在搜索框中搜索“Google Compute Engine”。在显示的搜索结果列表中,点击“Google Compute Engine API”。

在 Google Compute Engine 页面上,点击启用

启用后,点击箭头返回。

现在,搜索以下 API 并启用它们:

  • Google Dataflow API
  • Stackdriver Logging API
  • Google Cloud Storage
  • Google Cloud Storage JSON API
  • BigQuery API
  • Google Cloud Pub/Sub API
  • Google Cloud Datastore API

Google Cloud Platform 控制台 中,点击屏幕左上角的菜单图标:

向下滚动,然后在存储空间子部分中选择 Cloud Storage

您现在应该会看到 Cloud Storage 浏览器,并且假设您使用的是目前没有任何 Cloud Storage 存储分区的项目,您会看到一个对话框,邀请您创建新的存储分区:

创建存储分区按钮以创建一个存储分区:

为存储分区输入名称。如对话框中所述,存储分区名称在整个 Cloud Storage 中必须具有唯一性。因此,如果您选择一个显而易见的名称(例如“test”),您可能会发现其他人已经创建了具有该名称的存储分区,并会收到错误消息。

此外,对于允许在存储分区名称中使用的字符,也有一些规则。如果您的存储分区名称以字母或数字开头和结尾,并且中间只使用短划线,则不会出现问题。如果您尝试使用特殊字符,或者尝试以字母或数字以外的字符开头或结尾来命名存储分区,系统会在对话框中提醒您相关规则。

为您的存储分区输入一个唯一的名称,然后按创建。如果您选择的名称已被使用,系统会显示上述错误消息。成功创建存储分区后,系统会将您转到浏览器中的新空存储分区:

您看到的存储分区名称当然会有所不同,因为存储分区名称在所有项目中必须是唯一的。

激活 Google Cloud Shell

在 GCP 控制台中,点击右上角工具栏上的 Cloud Shell 图标:

然后点击“启动 Cloud Shell”:

配置和连接到环境应该只需要片刻时间:

这个虚拟机已加载了您需要的所有开发工具。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行,从而大大增强了网络性能和身份验证功能。只需使用一个浏览器或 Google Chromebook 即可完成本实验中的大部分(甚至全部)工作。

在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的 PROJECT_ID。

在 Cloud Shell 中运行以下命令以确认您已通过身份验证:

gcloud auth list

命令输出

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

命令输出

[core]
project = <PROJECT_ID>

如果不是上述结果,您可以使用以下命令进行设置:

gcloud config set project <PROJECT_ID>

命令输出

Updated property [core/project].

Cloud Shell 启动后,我们先创建一个包含 Java 版 Cloud Dataflow SDK 的 Maven 项目。

在 shell 中运行 mvn archetype:generate 命令,如下所示:

  mvn archetype:generate \
     -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
     -DarchetypeGroupId=com.google.cloud.dataflow \
     -DarchetypeVersion=1.9.0 \
     -DgroupId=com.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -DinteractiveMode=false \
     -Dpackage=com.example

运行该命令后,您应该会在当前目录下看到一个名为 first-dataflow 的新目录。first-dataflow 包含一个 Maven 项目,其中包含 Java 版 Cloud Dataflow SDK 和示例流水线。

我们先将项目 ID 和 Cloud Storage 存储分区名称保存为环境变量。您可以在 Cloud Shell 中执行此操作。请务必将 <your_project_id> 替换为您自己的项目 ID。

 export PROJECT_ID=<your_project_id>

现在,我们将对 Cloud Storage 存储分区执行相同的操作。请注意,您需要将 <your_bucket_name> 替换为您在之前的步骤中用于创建存储分区的唯一名称。

 export BUCKET_NAME=<your_bucket_name>

切换到 first-dataflow/ 目录:

 cd first-dataflow

我们将运行一个名为 WordCount 的流水线,该流水线可读取文本、将文本行标记化为单个词,并对其中的每个词执行词频计数。首先,我们将运行流水线,并在其运行期间查看每个步骤中发生的情况。

在 shell 或终端窗口中运行 mvn compile exec:java 命令,以启动流水线。对于 --project, --stagingLocation,--output 实参,以下命令引用了您在此步骤中之前设置的环境变量。

 mvn compile exec:java \
      -Dexec.mainClass=com.example.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=BlockingDataflowPipelineRunner"

在作业运行期间,我们来在作业列表中找到该作业。

Google Cloud Platform Console 中打开 Cloud Dataflow 监控界面。您会发现,您的 wordcount 作业的状态是正在运行

现在,我们来看看流水线参数。首先,点击作业的名称:

选择作业后,您可以查看执行图。在流水线的执行图中,每个方框表示流水线中的一个转换,其中包含转换名称和一些状态信息。您可以点击每个步骤右上角的尖角来查看更多详细信息:

我们来看看流水线如何在每个步骤中转换数据:

  • 读取:在此步骤中,流水线从输入源读取数据。在本例中,它是 Cloud Storage 中的一个文本文件,其中包含莎士比亚戏剧《李尔王》的完整文本。我们的流水线逐行读取文件,并输出一个 PCollection,其中文本文件中的每一行都是集合中的一个元素。
  • CountWordsCountWords 步骤包含两个部分。首先,它使用名为 ExtractWords 的并行 do 函数 (ParDo) 将每行标记化为一个个字词。ExtractWords 的输出是一个新的 PCollection,其中的每个元素都是一个字词。下一步 Count 使用 Dataflow SDK 提供的转换,该转换会返回键值对,其中键是唯一字词,值是该字词出现的次数。以下是实现 CountWords 的方法,您可以在 GitHub 上查看完整的 WordCount.java 文件:
  /**
   * A PTransform that converts a PCollection containing lines of text 
   * into a PCollection of formatted word counts.
   */
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
  • FormatAsText:此函数可将每个键值对格式化为可打印的字符串。以下是实现此功能的 FormatAsText 转换:
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts:在此步骤中,我们将可打印的字符串写入多个分片文本文件。

我们将在几分钟后查看流水线的输出结果。

现在,我们来看看图表右侧的摘要页面,其中包含我们在 mvn compile exec:java 命令中添加的流水线参数。

您还可以看到流水线的自定义计数器,在本例中,该计数器显示了执行期间到目前为止遇到的空行数。您可以向流水线添加新的计数器,以便跟踪特定于应用的指标。

您可以点击日志图标来查看具体的错误消息。

您可以使用“最低严重级别”下拉菜单过滤“作业日志”标签页中显示的消息。

您可以使用日志标签页中的工作器日志按钮查看运行流水线的 Compute Engine 实例的工作器日志。工作器日志包含由您的代码和运行该代码的 Dataflow 生成的代码生成的日志行。

如果您尝试调试流水线中的失败,通常会在 Worker 日志中找到有助于解决问题的其他日志记录。请注意,这些日志是所有工作器的汇总日志,可以进行过滤和搜索。

在下一步中,我们将检查您的作业是否成功运行。

Google Cloud Platform Console 中打开 Cloud Dataflow 监控界面。

您应会看到 wordcount 作业的状态最初为正在运行,然后变为成功

该作业大约需要 3-4 分钟才能运行完成。

还记得您运行流水线并指定输出存储分区时的情况吗?让我们来看看结果(因为您肯定想知道《李尔王》中每个字词出现的次数!)。返回到 Google Cloud Platform 控制台中的 Cloud Storage 浏览器。在您的存储分区中,您应该会看到作业所创建的输出文件和暂存文件:

您可以通过 Google Cloud Platform Console 关闭资源。

在 Google Cloud Platform Console 中打开 Cloud Storage 浏览器。

选中所创建存储分区旁边的复选框。

点击删除以永久删除存储分区及其内容。

您已了解如何使用 Cloud Dataflow SDK 创建 Maven 项目、使用 Google Cloud Platform 控制台运行示例流水线,以及删除关联的 Cloud Storage 存储分区及其内容。

了解详情

许可

此作品已获得 Creative Commons Attribution 3.0 通用许可和 Apache 2.0 许可授权。