Dataflow는 ETL, 일괄 계산, 연속 계산 등 광범위한 데이터 처리 패턴을 개발하고 실행하는 통합 프로그래밍 모델이자 관리형 서비스입니다. Dataflow는 관리형 서비스이므로 필요에 따라 리소스를 할당하여 높은 활용 효율성을 유지하면서도 지연 시간을 최소화할 수 있습니다.
Dataflow 모델은 일괄 처리와 스트림 처리를 결합하므로 개발자는 정확성, 비용, 처리 시간 간에 절충하지 않아도 됩니다. 이 Codelab에서는 텍스트 파일에 있는 고유 단어의 발생 횟수를 계산하는 Dataflow 파이프라인을 실행하는 방법을 알아봅니다.
이 튜토리얼은 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven에서 가져왔습니다.
학습할 내용
- Cloud Dataflow SDK로 Maven 프로젝트를 만드는 방법
- Google Cloud Platform 콘솔을 사용하여 예시 파이프라인 실행
- 연결된 Cloud Storage 버킷 및 콘텐츠를 삭제하는 방법
필요한 항목
본 가이드를 어떻게 사용하실 계획인가요?
귀하의 Google Cloud Platform 서비스 사용 경험을 평가해 주세요.
자습형 환경 설정
아직 Google 계정 (Gmail 또는 Google Apps)이 없으면 계정을 만들어야 합니다. Google Cloud Platform 콘솔 (console.cloud.google.com)에 로그인하고 새 프로젝트를 만듭니다.
모든 Google Cloud 프로젝트에서 고유한 이름인 프로젝트 ID를 기억하세요(위의 이름은 이미 사용되었으므로 사용할 수 없습니다). 이 ID는 나중에 이 Codelab에서 PROJECT_ID
라고 부릅니다.
그런 다음 Google Cloud 리소스를 사용할 수 있도록 Cloud 콘솔에서 결제를 사용 설정해야 합니다.
이 codelab을 실행하는 과정에는 많은 비용이 들지 않지만 더 많은 리소스를 사용하려고 하거나 실행 중일 경우 비용이 더 들 수 있습니다(이 문서 마지막의 '삭제' 섹션 참조).
Google Cloud Platform 신규 사용자는 $300 상당의 무료 체험판을 사용할 수 있습니다.
API 사용 설정
화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.
드롭다운에서 API 관리자를 선택합니다.
검색창에서 'Google Compute Engine'을 검색합니다. 결과 목록에서 'Google Compute Engine API'를 클릭합니다.
Google Compute Engine 페이지에서 사용 설정을 클릭합니다.
사용 설정한 후 화살표를 클릭하여 뒤로 돌아갑니다.
이제 다음 API를 검색하여 사용 설정합니다.
- Google Dataflow API
- Stackdriver Logging API
- Google Cloud Storage
- Google Cloud Storage JSON API
- BigQuery API
- Google Cloud Pub/Sub API
- Google Cloud Datastore API
Google Cloud Platform Console에서 화면 왼쪽 상단의 메뉴 아이콘을 클릭합니다.
아래로 스크롤하여 스토리지 하위 섹션에서 Cloud Storage를 선택합니다.
이제 Cloud Storage 브라우저가 표시됩니다. 현재 Cloud Storage 버킷이 없는 프로젝트를 사용하고 있다면 새 버킷을 만들라는 메시지가 표시됩니다.
버킷 만들기 버튼을 눌러 버킷을 만듭니다.
버킷 이름을 입력합니다. 대화상자에 나와 있듯이 버킷 이름은 Cloud Storage 전체에서 고유해야 합니다. 따라서 'test'와 같은 명확한 이름을 선택하면 다른 사용자가 이미 해당 이름으로 버킷을 만들어 오류가 발생할 수 있습니다.
버킷 이름에 허용되는 문자에 관한 규칙도 있습니다. 버킷 이름을 문자 또는 숫자로 시작하고 끝내고 중간에 대시만 사용하면 됩니다. 특수 문자를 사용하거나 버킷 이름을 문자나 숫자가 아닌 다른 것으로 시작하거나 끝내려고 하면 대화상자에 규칙이 표시됩니다.
버킷의 고유한 이름을 입력하고 만들기를 누릅니다. 이미 사용 중인 이름을 선택하면 위와 같은 오류 메시지가 표시됩니다. 버킷을 성공적으로 만들면 브라우저에서 비어 있는 새 버킷으로 이동합니다.
버킷 이름은 모든 프로젝트에서 고유해야 하므로 표시되는 이름은 다릅니다.
Google Cloud Shell 활성화하기
GCP 콘솔에서 오른쪽 상단 툴바의 Cloud Shell 아이콘을 클릭합니다.
그런 다음 'Cloud Shell 시작'을 클릭합니다.
환경을 프로비저닝하고 연결하는 데 몇 분 정도 소요됩니다.
가상 머신은 필요한 모든 개발 도구와 함께 로드됩니다. 영구적인 5GB 홈 디렉토리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 이 실습에서 대부분의 작업은 브라우저나 Google Chromebook만을 이용하여 수행할 수 있습니다.
Cloud Shell에 연결되면 인증이 완료되었고 프로젝트가 PROJECT_ID로 이미 설정된 것을 확인할 수 있습니다.
Cloud Shell에서 다음 명령어를 실행하여 인증되었는지 확인합니다.
gcloud auth list
명령어 결과
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
명령어 결과
[core] project = <PROJECT_ID>
또는 다음 명령어로 설정할 수 있습니다.
gcloud config set project <PROJECT_ID>
명령어 결과
Updated property [core/project].
Cloud Shell이 실행되면 Java용 Cloud Dataflow SDK가 포함된 Maven 프로젝트를 만들어 시작해 보겠습니다.
셸에서 다음과 같이 mvn archetype:generate
명령어를 실행합니다.
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
명령어를 실행하면 현재 디렉터리 아래에 first-dataflow
이라는 새 디렉터리가 표시됩니다. first-dataflow
에는 Java용 Cloud Dataflow SDK와 예시 파이프라인이 포함된 Maven 프로젝트가 포함되어 있습니다.
먼저 프로젝트 ID와 Cloud Storage 버킷 이름을 환경 변수로 저장해 보겠습니다. Cloud Shell에서 이 작업을 수행할 수 있습니다. <your_project_id>
를 자체 프로젝트 ID로 바꿔야 합니다.
export PROJECT_ID=<your_project_id>
이제 Cloud Storage 버킷에 대해서도 동일한 작업을 실행합니다. <your_bucket_name>
을 이전 단계에서 버킷을 만들 때 사용한 고유한 이름으로 바꿔야 합니다.
export BUCKET_NAME=<your_bucket_name>
first-dataflow/
디렉터리로 변경합니다.
cd first-dataflow
텍스트를 읽고, 텍스트 줄을 개별 단어로 토큰화하고, 각 단어의 출현 빈도를 세는 WordCount라는 파이프라인을 실행합니다. 먼저 파이프라인을 실행하고 실행되는 동안 각 단계에서 어떤 일이 일어나는지 살펴보겠습니다.
셸 또는 터미널 창에서 mvn compile exec:java
명령어를 실행하여 파이프라인을 시작합니다. --project, --stagingLocation,
및 --output
인수와 관련해 아래 명령어는 이 단계의 앞부분에서 설정한 환경 변수를 참조합니다.
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
작업이 실행되는 동안 작업 목록에서 작업을 찾아보겠습니다.
Google Cloud Platform Console에서 Cloud Dataflow 모니터링 UI를 엽니다. 상태가 실행 중인 wordcount 작업이 표시됩니다.
이제 파이프라인 매개변수를 살펴보겠습니다. 작업 이름을 클릭하여 시작합니다.
작업을 선택하면 실행 그래프를 볼 수 있습니다. 파이프라인의 실행 그래프는 파이프라인 내의 각 변환을 변환 이름과 일부 상태 정보가 포함되어 있는 상자로 표현합니다. 각 단계의 오른쪽 상단에 있는 캐럿을 클릭하면 세부정보를 확인할 수 있습니다.
각 단계에서 파이프라인이 데이터를 어떻게 변환하는지 살펴보겠습니다.
- 읽기: 이 단계에서 파이프라인은 입력 소스에서 읽습니다. 이 경우 셰익스피어의 희곡 리어왕의 전체 텍스트가 포함된 Cloud Storage의 텍스트 파일입니다. 파이프라인은 파일을 한 줄씩 읽고 각 줄을
PCollection
로 출력합니다. 텍스트 파일의 각 줄은 컬렉션의 요소입니다. - CountWords:
CountWords
단계는 두 부분으로 구성됩니다. 먼저ExtractWords
라는 병렬 do 함수 (ParDo)를 사용하여 각 줄을 개별 단어로 토큰화합니다. ExtractWords의 출력은 각 요소가 단어인 새 PCollection입니다. 다음 단계인Count
에서는 Dataflow SDK에서 제공하는 변환을 활용합니다. 이 변환은 키가 고유한 단어이고 값이 단어가 발생한 횟수인 키-값 쌍을 반환합니다. 다음은CountWords
를 구현하는 메서드입니다. GitHub에서 전체 WordCount.java 파일을 확인할 수 있습니다.
/**
* A PTransform that converts a PCollection containing lines of text
* into a PCollection of formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
- FormatAsText: 각 키-값 쌍을 인쇄 가능한 문자열로 형식을 지정하는 함수입니다. 이를 구현하는
FormatAsText
변환은 다음과 같습니다.
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: 이 단계에서는 인쇄 가능한 문자열을 여러 샤드 텍스트 파일에 작성합니다.
몇 분 후에 파이프라인의 결과 출력을 살펴보겠습니다.
이제 그래프 오른쪽에 있는 요약 페이지를 살펴보세요. 여기에는 mvn compile exec:java
명령어에 포함된 파이프라인 매개변수가 포함되어 있습니다.
파이프라인의 맞춤 카운터도 확인할 수 있습니다. 이 경우 실행 중에 지금까지 발견된 빈 줄의 수를 보여줍니다. 애플리케이션별 측정항목을 추적하기 위해 파이프라인에 새 카운터를 추가할 수 있습니다.
로그 아이콘을 클릭하여 구체적인 오류 메시지를 확인할 수 있습니다.
최소 심각도 드롭다운 메뉴를 사용하여 작업 로그 탭에 표시되는 메시지를 필터링합니다.
로그 탭의 작업자 로그 버튼을 사용하여 파이프라인을 실행하는 Compute Engine 인스턴스의 작업자 로그를 볼 수 있습니다. 작업자 로그는 코드와 코드를 실행하는 Dataflow 생성 코드로 구성됩니다.
파이프라인의 실패를 디버깅하는 경우 문제 해결에 도움이 되는 추가 로깅이 작업자 로그에 있는 경우가 많습니다. 이러한 로그는 모든 작업자에서 집계되며 필터링하고 검색할 수 있습니다.
다음 단계에서는 작업이 성공했는지 확인합니다.
Google Cloud Platform Console에서 Cloud Dataflow 모니터링 UI를 엽니다.
초기 상태가 실행 중인 wordcount 작업이 나타난 후 성공으로 상태가 바뀝니다.
작업을 실행하는 데 약 3~4분이 소요됩니다.
파이프라인을 실행하고 출력 버킷을 지정했던 것을 기억하시나요? 결과를 살펴보겠습니다. 리어왕에 각 단어가 몇 번이나 나오는지 궁금하지 않으신가요? Google Cloud Platform Console에서 Cloud Storage 브라우저로 다시 이동합니다. 버킷에서 작업을 통해 만든 출력 파일 및 스테이징 파일을 확인할 수 있습니다.
Google Cloud Platform 콘솔에서 리소스를 종료할 수 있습니다.
Google Cloud Platform Console에서 Cloud Storage 브라우저를 엽니다.
만든 버킷 옆에 있는 확인란을 선택합니다.
삭제를 클릭하여 버킷과 콘텐츠를 영구 삭제합니다.
Cloud Dataflow SDK로 Maven 프로젝트를 만들고, Google Cloud Platform Console을 사용하여 예시 파이프라인을 실행하고, 연결된 Cloud Storage 버킷과 콘텐츠를 삭제하는 방법을 알아봤습니다.
자세히 알아보기
- Dataflow 문서: https://cloud.google.com/dataflow/docs/
라이선스
이 작업물은 크리에이티브 커먼즈 저작자 표시 3.0 일반 라이선스 및 Apache 2.0 라이선스에 따라 사용이 허가되었습니다.