Dataflow는 ETL, 일괄 계산, 지속적 계산을 포함한 다양한 데이터 처리 패턴을 개발하고 실행하는 통합 프로그래밍 모델 및 관리형 서비스입니다. Dataflow는 관리형 서비스이므로 높은 사용률을 유지하면서 지연 시간을 최소화하도록 주문형 리소스를 할당할 수 있습니다.
Dataflow 모델은 일괄 처리와 스트림 처리를 결합하므로 개발자가 정확성, 비용, 처리 시간 사이에 절충안을 찾을 필요가 없습니다. 이 Codelab에서는 텍스트 파일에서 고유한 단어 일치를 계산하는 Dataflow 파이프라인을 실행하는 방법을 알아봅니다.
이 튜토리얼은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven에서 조정됩니다.
학습할 내용
- Cloud Dataflow SDK로 Maven 프로젝트를 만드는 방법
- Google Cloud Platform Console을 사용하여 예시 파이프라인 실행
- 연결된 Cloud Storage 버킷 및 콘텐츠 삭제 방법
필요한 항목
본 가이드를 어떻게 사용하실 계획인가요?
귀하의 Google Cloud Platform 서비스 사용 경험을 평가해 주세요.
자습형 환경 설정
Google 계정 (Gmail 또는 Google 앱)이 아직 없다면 계정을 만들어야 합니다. Google Cloud Platform Console (console.cloud.google.com)에 로그인하여 새 프로젝트를 만듭니다.
모든 Google Cloud 프로젝트에서 고유한 이름인 프로젝트 ID를 기억하세요(위의 이름은 이미 사용되었으므로 사용할 수 없습니다). 이 ID는 나중에 이 Codelab에서 PROJECT_ID
라고 부릅니다.
다음으로 Google Cloud 리소스를 사용하려면 Cloud Console에서 결제를 사용 설정해야 합니다.
이 codelab을 실행하는 과정에는 많은 비용이 들지 않지만 더 많은 리소스를 사용하려고 하거나 실행 중일 경우 비용이 더 들 수 있습니다(이 문서 마지막의 '삭제' 섹션 참조).
Google Cloud Platform의 신규 사용자는 $300 무료 체험판을 사용할 수 있습니다.
API 사용 설정
화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.
드롭다운에서 API 관리자를 선택합니다.
검색창에서 'Google Compute Engine'을 검색합니다. 표시되는 결과 목록에서 'Google Compute Engine API'를 클릭합니다.
Google Compute Engine 페이지에서 사용 설정을 클릭합니다.
사용 설정되면 화살표를 클릭하여 뒤로 돌아갑니다.
이제 다음 API를 검색하여 사용 설정합니다.
- Google Dataflow API
- Stackdriver Logging API
- Google Cloud Storage
- Google Cloud Storage JSON API
- BigQuery API
- Google Cloud Pub/Sub API
- Google Cloud Datastore API
Google Cloud Platform Console에서 화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.
아래로 스크롤하여 Storage 하위 섹션에서 Cloud Storage를 선택합니다.
이제 Cloud Storage 브라우저가 표시됩니다. 현재 Cloud Storage 버킷이 없는 프로젝트를 사용 중이라고 가정하는 경우 새 버킷을 만들도록 초대하는 대화상자가 표시됩니다.
버킷 만들기 버튼을 눌러 버킷을 만듭니다.
버킷 이름을 입력합니다. 대화상자에서 알 수 있듯이 버킷 이름은 Cloud Storage 전체에서 고유해야 합니다. 따라서 "test"와 같이 명확한 이름을 선택하면 다른 사용자가 이미 해당 이름으로 버킷을 만든 후 오류가 발생할 수 있습니다.
버킷 이름에 허용되는 문자에 대한 규칙도 있습니다. 문자 또는 숫자로 버킷 이름을 시작하고 끝낼 때 중간에 대시만 사용해도 괜찮으며 괜찮습니다. 특수문자를 사용하거나 버킷 이름이 문자 또는 숫자가 아닌 다른 값으로 시작하거나 종료하려고 하면 대화상자에 규칙이 표시됩니다.
버킷에 대해 고유 이름을 입력하고 만들기를 누릅니다. 이미 사용 중인 항목을 선택하면 위에 표시된 오류 메시지가 나타납니다. 버킷이 만들어지면 브라우저에서 비어 있는 새 버킷으로 이동합니다.
물론 표시되는 버킷 이름은 모든 프로젝트에서 고유해야 하므로 다릅니다.
Google Cloud Shell 활성화
GCP 콘솔에서 오른쪽 상단 툴바의 Cloud Shell 아이콘을 클릭합니다.
그런 다음 'Cloud Shell 시작'을 클릭합니다.
환경을 프로비저닝하고 연결하는 데 몇 분 정도 소요됩니다.
가상 머신은 필요한 모든 개발 도구와 함께 로드됩니다. 영구적인 5GB 홈 디렉토리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 이 실습에서 대부분의 작업은 브라우저나 Google Chromebook만을 이용하여 수행할 수 있습니다.
Cloud Shell에 연결되면 인증이 이미 완료되어 있고 프로젝트가 이미 PROJECT_ID로 설정되어 있음을 확인할 수 있습니다.
Cloud Shell에서 다음 명령어를 실행하여 인증되었는지 확인합니다.
gcloud auth list
명령어 결과
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
명령어 결과
[core] project = <PROJECT_ID>
또는 다음 명령어로 설정할 수 있습니다.
gcloud config set project <PROJECT_ID>
명령어 결과
Updated property [core/project].
Cloud Shell이 출시된 후에 자바용 Cloud Dataflow SDK가 포함된 Maven 프로젝트를 만들어 시작해 보겠습니다.
셸에서 다음과 같이 mvn archetype:generate
명령어를 실행합니다.
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
명령어를 실행하면 현재 디렉터리 아래에 first-dataflow
라는 새 디렉터리가 표시됩니다. first-dataflow
에는 자바용 Cloud Dataflow SDK 및 예시 파이프라인이 포함된 Maven 프로젝트가 포함되어 있습니다.
먼저 프로젝트 ID와 Cloud Storage 버킷 이름을 환경 변수로 저장합니다. Cloud Shell에서 이 작업을 수행할 수 있습니다. <your_project_id>
를 본인의 프로젝트 ID로 바꿔야 합니다.
export PROJECT_ID=<your_project_id>
이제 Cloud Storage 버킷에 대해서도 동일한 작업을 수행합니다. <your_bucket_name>
을 이전 단계에서 버킷을 만드는 데 사용한 고유 이름으로 바꾸어야 합니다.
export BUCKET_NAME=<your_bucket_name>
first-dataflow/
디렉터리로 변경합니다.
cd first-dataflow
텍스트를 읽고, 텍스트 줄을 개별 단어로 토큰화하고, 각 단어에 대해 빈도를 세는 Memcache라는 파이프라인을 실행합니다. 먼저 파이프라인을 실행해 보겠습니다. 실행 중인 각 단계에서 어떤 일이 일어나는지 살펴보겠습니다.
셸 또는 터미널 창에서 mvn compile exec:java
명령어를 실행하여 파이프라인을 시작합니다. --project, --stagingLocation,
및 --output
인수의 경우 아래 명령어는 이 단계의 앞부분에서 설정한 환경 변수를 참조합니다.
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
작업이 실행되는 동안 작업 목록에서 작업을 찾을 수 있습니다.
Google Cloud Platform Console에서 Cloud Dataflow Monitoring UI를 엽니다. 실행 중 상태인 워드카운트 작업이 표시됩니다.
이제 파이프라인 매개변수를 살펴보겠습니다. 먼저 작업 이름을 클릭합니다.
작업을 선택하면 실행 그래프를 볼 수 있습니다. 파이프라인의 실행 그래프는 파이프라인의 각 변환을 변환 이름과 일부 상태 정보를 포함하는 상자로 나타냅니다. 각 단계의 오른쪽 상단에 있는 캐럿을 클릭하여 자세한 내용을 확인할 수 있습니다.
파이프라인이 각 단계에서 데이터를 어떻게 변환하는지 살펴보겠습니다.
- 읽기: 이 단계에서는 파이프라인이 입력 소스에서 읽습니다. 이 경우에는 Cloud Storage의 텍스트 파일로, 셰익스피어 King Lear의 전체 텍스트입니다. 파이프라인은 파일을 한 줄씩 읽고 각각
PCollection
을 출력합니다. 여기서 텍스트 파일의 각 줄은 컬렉션의 요소입니다. - CountWords:
CountWords
단계는 두 부분으로 구성됩니다. 먼저ExtractWords
이라는 동시 실행 함수 (vCPM)를 사용하여 각 행을 개별 단어로 토큰화합니다. ExtractWords의 출력은 각 요소가 단어인 새로운 PCollection입니다. 다음 단계인Count
는 Dataflow SDK에서 제공하는 변환을 활용하여 키가 고유한 단어이고 값이 발생하는 횟수인 키-값 쌍을 반환합니다. 다음은CountWords
를 구현하는 메서드입니다. GitHub에서 전체 gclid.java 파일을 확인할 수 있습니다.
/**
* A PTransform that converts a PCollection containing lines of text
* into a PCollection of formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
- FormatAsText: 각 키의 값 쌍을 인쇄 가능한 문자열로 포맷하는 함수입니다. 다음은 이를 구현하는
FormatAsText
변환입니다.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: 이 단계에서는 인쇄 가능한 문자열을 샤딩된 여러 텍스트 파일에 씁니다.
잠시 후 파이프라인의 결과 결과를 살펴보겠습니다.
이제 mvn compile exec:java
명령어에 포함된 파이프라인 매개변수가 포함된 그래프 오른쪽에 있는 요약 페이지를 살펴봅니다.
파이프라인의 커스텀 카운터도 확인할 수 있습니다. 이 경우 실행 중에 현재까지 발생한 빈 줄의 수를 표시합니다. 파이프라인에 새 카운터를 추가하여 애플리케이션별 측정항목을 추적할 수 있습니다.
로그 아이콘을 클릭하면 특정 오류 메시지를 볼 수 있습니다.
최소 심각도 드롭다운 메뉴를 사용하여 작업 로그 탭에 표시되는 메시지를 필터링합니다.
로그 탭의 작업자 로그 버튼을 사용하여 파이프라인을 실행하는 Compute Engine 인스턴스의 작업자 로그를 확인할 수 있습니다. 작업자 로그는 코드로 생성된 로그 줄과 이를 실행하는 Dataflow로 생성된 코드로 구성됩니다.
파이프라인에서 오류를 디버그하려고 하는 경우 문제 해결에 도움이 되는 Worker Logs에 로깅이 추가되기도 합니다. 이러한 로그는 모든 작업자에서 집계되며, 필터링하고 검색할 수 있습니다.
다음 단계에서 작업이 성공했는지 확인합니다.
Google Cloud Platform Console에서 Cloud Dataflow Monitoring UI를 엽니다.
처음에는 실행 중 상태로 된 단어 수 작업을 확인한 후 성공 상태로 표시합니다.
작업을 실행하는 데 약 3~4분이 소요됩니다.
언제 파이프라인을 실행하고 출력 버킷을 지정했는지 기억하시나요? 결과를 살펴보겠습니다. 후면의 왕국이 각 단어가 몇 번이나 나올까요? Google Cloud Platform Console에서 Cloud Storage 브라우저로 돌아갑니다. 버킷에서 작업을 통해 만든 출력 파일 및 스테이징 파일을 확인할 수 있습니다.
Google Cloud Platform Console에서 리소스를 종료할 수 있습니다.
Google Cloud Platform Console에서 Cloud Storage 브라우저를 엽니다.
만든 버킷 옆에 있는 체크박스를 선택합니다.
삭제를 클릭하여 버킷과 콘텐츠를 영구 삭제합니다.
Cloud Dataflow SDK로 Maven 프로젝트를 만들고, Google Cloud Platform Console을 사용해 예시 파이프라인을 실행하고, 연결된 Cloud Storage 버킷과 그 콘텐츠를 삭제하는 방법을 배웠습니다.
자세히 알아보기
- Dataflow 문서: https://cloud.google.com/dataflow/docs/
라이선스
이 저작물은 크리에이티브 커먼즈 저작자 표시 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 라이선스가 부여됩니다.