Dataflow 是一种统一的编程模型,同时也是一项托管服务,用于开发和执行各种各样的数据处理模式,包括 ETL、批量计算和连续计算等。由于 Dataflow 是一项代管式服务,因此它可以按需分配资源,从而最大限度地缩短延迟时间,同时保持高使用效率。
Dataflow 模型将批处理和流处理相结合,因此开发者无需在正确性、费用和处理时间之间做出权衡。在此 Codelab 中,您将学习如何运行一个 Dataflow 流水线,该流水线用于统计文本文件中唯一字词的出现次数。
本教程改编自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
学习内容
- 如何使用 Cloud Dataflow SDK 创建 Maven 项目
- 使用 Google Cloud Platform Console 运行示例流水线
- 如何删除关联的 Cloud Storage 存储分区及其内容
所需条件
您打算如何使用本教程?
您如何评价自己在使用 Google Cloud Platform 服务方面的经验水平?
自定进度的环境设置
如果您还没有 Google 账号(Gmail 或 Google Apps),则必须创建一个。登录 Google Cloud Platform Console (console.cloud.google.com) 并创建一个新项目:
请记住项目 ID,它在所有 Google Cloud 项目中都是唯一名称(很抱歉,上述名称已被占用,您无法使用!)。它稍后将在此 Codelab 中被称为 PROJECT_ID
。
接下来,您需要在 Cloud Console 中启用结算功能,才能使用 Google Cloud 资源。
在此 Codelab 中运行仅花费几美元,但是如果您决定使用更多资源或继续让它们运行,费用可能更高(请参阅本文档末尾的“清理”部分)。
Google Cloud Platform 的新用户有资格获享 $300 免费试用。
启用 API
点击屏幕左上角的菜单图标。
从下拉菜单中选择 API 管理器。
在搜索框中搜索“Google Compute Engine”。在显示的搜索结果列表中,点击“Google Compute Engine API”。
在 Google Compute Engine 页面上,点击启用
启用后,点击箭头返回。
现在,搜索以下 API 并启用它们:
- Google Dataflow API
- Stackdriver Logging API
- Google Cloud Storage
- Google Cloud Storage JSON API
- BigQuery API
- Google Cloud Pub/Sub API
- Google Cloud Datastore API
在 Google Cloud Platform 控制台 中,点击屏幕左上角的菜单图标:
向下滚动,然后在存储空间子部分中选择 Cloud Storage:
您现在应该会看到 Cloud Storage 浏览器,并且假设您使用的是目前没有任何 Cloud Storage 存储分区的项目,您会看到一个对话框,邀请您创建新的存储分区:
按创建存储分区按钮以创建一个存储分区:
为存储分区输入名称。如对话框中所述,存储分区名称在整个 Cloud Storage 中必须具有唯一性。因此,如果您选择一个显而易见的名称(例如“test”),您可能会发现其他人已经创建了具有该名称的存储分区,并会收到错误消息。
此外,对于允许在存储分区名称中使用的字符,也有一些规则。如果您的存储分区名称以字母或数字开头和结尾,并且中间只使用短划线,则不会出现问题。如果您尝试使用特殊字符,或者尝试以字母或数字以外的字符开头或结尾来命名存储分区,系统会在对话框中提醒您相关规则。
为您的存储分区输入一个唯一的名称,然后按创建。如果您选择的名称已被使用,系统会显示上述错误消息。成功创建存储分区后,系统会将您转到浏览器中的新空存储分区:
您看到的存储分区名称当然会有所不同,因为存储分区名称在所有项目中必须是唯一的。
激活 Google Cloud Shell
在 GCP 控制台中,点击右上角工具栏上的 Cloud Shell 图标:
然后点击“启动 Cloud Shell”:
配置和连接到环境应该只需要片刻时间:
这个虚拟机已加载了您需要的所有开发工具。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行,从而大大增强了网络性能和身份验证功能。只需使用一个浏览器或 Google Chromebook 即可完成本实验中的大部分(甚至全部)工作。
在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的 PROJECT_ID。
在 Cloud Shell 中运行以下命令以确认您已通过身份验证:
gcloud auth list
命令输出
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
命令输出
[core] project = <PROJECT_ID>
如果不是上述结果,您可以使用以下命令进行设置:
gcloud config set project <PROJECT_ID>
命令输出
Updated property [core/project].
Cloud Shell 启动后,我们先创建一个包含 Java 版 Cloud Dataflow SDK 的 Maven 项目。
在 shell 中运行 mvn archetype:generate
命令,如下所示:
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
运行该命令后,您应该会在当前目录下看到一个名为 first-dataflow
的新目录。first-dataflow
包含一个 Maven 项目,其中包含 Java 版 Cloud Dataflow SDK 和示例流水线。
我们先将项目 ID 和 Cloud Storage 存储分区名称保存为环境变量。您可以在 Cloud Shell 中执行此操作。请务必将 <your_project_id>
替换为您自己的项目 ID。
export PROJECT_ID=<your_project_id>
现在,我们将对 Cloud Storage 存储分区执行相同的操作。请注意,您需要将 <your_bucket_name>
替换为您在之前的步骤中用于创建存储分区的唯一名称。
export BUCKET_NAME=<your_bucket_name>
切换到 first-dataflow/
目录:
cd first-dataflow
我们将运行一个名为 WordCount 的流水线,该流水线可读取文本、将文本行标记化为单个词,并对其中的每个词执行词频计数。首先,我们将运行流水线,并在其运行期间查看每个步骤中发生的情况。
在 shell 或终端窗口中运行 mvn compile exec:java
命令,以启动流水线。对于 --project, --stagingLocation,
和 --output
实参,以下命令引用了您在此步骤中之前设置的环境变量。
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
在作业运行期间,我们来在作业列表中找到该作业。
在 Google Cloud Platform Console 中打开 Cloud Dataflow 监控界面。您会发现,您的 wordcount 作业的状态是正在运行:
现在,我们来看看流水线参数。首先,点击作业的名称:
选择作业后,您可以查看执行图。在流水线的执行图中,每个方框表示流水线中的一个转换,其中包含转换名称和一些状态信息。您可以点击每个步骤右上角的尖角来查看更多详细信息:
我们来看看流水线如何在每个步骤中转换数据:
- 读取:在此步骤中,流水线从输入源读取数据。在本例中,它是 Cloud Storage 中的一个文本文件,其中包含莎士比亚戏剧《李尔王》的完整文本。我们的流水线逐行读取文件,并输出一个
PCollection
,其中文本文件中的每一行都是集合中的一个元素。 - CountWords:
CountWords
步骤包含两个部分。首先,它使用名为ExtractWords
的并行 do 函数 (ParDo) 将每行标记化为一个个字词。ExtractWords 的输出是一个新的 PCollection,其中的每个元素都是一个字词。下一步Count
使用 Dataflow SDK 提供的转换,该转换会返回键值对,其中键是唯一字词,值是该字词出现的次数。以下是实现CountWords
的方法,您可以在 GitHub 上查看完整的 WordCount.java 文件:
/**
* A PTransform that converts a PCollection containing lines of text
* into a PCollection of formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
- FormatAsText:此函数可将每个键值对格式化为可打印的字符串。以下是实现此功能的
FormatAsText
转换:
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts:在此步骤中,我们将可打印的字符串写入多个分片文本文件。
我们将在几分钟后查看流水线的输出结果。
现在,我们来看看图表右侧的摘要页面,其中包含我们在 mvn compile exec:java
命令中添加的流水线参数。
您还可以看到流水线的自定义计数器,在本例中,该计数器显示了执行期间到目前为止遇到的空行数。您可以向流水线添加新的计数器,以便跟踪特定于应用的指标。
您可以点击日志图标来查看具体的错误消息。
您可以使用“最低严重级别”下拉菜单过滤“作业日志”标签页中显示的消息。
您可以使用日志标签页中的工作器日志按钮查看运行流水线的 Compute Engine 实例的工作器日志。工作器日志包含由您的代码和运行该代码的 Dataflow 生成的代码生成的日志行。
如果您尝试调试流水线中的失败,通常会在 Worker 日志中找到有助于解决问题的其他日志记录。请注意,这些日志是所有工作器的汇总日志,可以进行过滤和搜索。
在下一步中,我们将检查您的作业是否成功运行。
在 Google Cloud Platform Console 中打开 Cloud Dataflow 监控界面。
您应会看到 wordcount 作业的状态最初为正在运行,然后变为成功:
该作业大约需要 3-4 分钟才能运行完成。
还记得您运行流水线并指定输出存储分区时的情况吗?让我们来看看结果(因为您肯定想知道《李尔王》中每个字词出现的次数!)。返回到 Google Cloud Platform 控制台中的 Cloud Storage 浏览器。在您的存储分区中,您应该会看到作业所创建的输出文件和暂存文件:
您可以通过 Google Cloud Platform Console 关闭资源。
在 Google Cloud Platform Console 中打开 Cloud Storage 浏览器。
选中所创建存储分区旁边的复选框。
点击删除以永久删除存储分区及其内容。
您已了解如何使用 Cloud Dataflow SDK 创建 Maven 项目、使用 Google Cloud Platform 控制台运行示例流水线,以及删除关联的 Cloud Storage 存储分区及其内容。
了解详情
- Dataflow 文档:https://cloud.google.com/dataflow/docs/
许可
此作品已获得 Creative Commons Attribution 3.0 通用许可和 Apache 2.0 许可授权。