Dataflow เป็นโมเดลการเขียนโปรแกรมแบบรวมและบริการที่มีการจัดการสำหรับการพัฒนาและดำเนินการรูปแบบการประมวลผลข้อมูลที่หลากหลาย รวมถึง ETL, การคำนวณแบบกลุ่ม และการคำนวณอย่างต่อเนื่อง เนื่องจาก Dataflow เป็นบริการที่มีการจัดการ จึงสามารถจัดสรรทรัพยากรตามความต้องการเพื่อลดเวลาในการตอบสนองในขณะที่ยังคงประสิทธิภาพการใช้งานสูงไว้ได้
โมเดล Dataflow ผสานรวมการประมวลผลแบบกลุ่มและแบบสตรีม เพื่อให้นักพัฒนาแอปไม่ต้องเลือกระหว่างความถูกต้อง ต้นทุน และเวลาในการประมวลผล ในโค้ดแล็บนี้ คุณจะได้เรียนรู้วิธีเรียกใช้ไปป์ไลน์ Dataflow ที่นับจำนวนคำที่ไม่ซ้ำกันในไฟล์ข้อความ
บทแนะนำนี้ดัดแปลงมาจาก https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
สิ่งที่คุณจะได้เรียนรู้
- วิธีสร้างโปรเจ็กต์ Maven ด้วย Cloud Dataflow SDK
- เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้คอนโซล Google Cloud Platform
- วิธีลบที่เก็บข้อมูล Cloud Storage ที่เชื่อมโยงและเนื้อหาของที่เก็บข้อมูล
สิ่งที่ต้องมี
คุณจะใช้บทแนะนำนี้อย่างไร
คุณจะให้คะแนนประสบการณ์การใช้บริการ Google Cloud Platform เท่าใด
การตั้งค่าสภาพแวดล้อมแบบเรียนรู้ด้วยตนเอง
หากยังไม่มีบัญชี Google (Gmail หรือ Google Apps) คุณต้องสร้างบัญชี ลงชื่อเข้าใช้คอนโซล Google Cloud Platform (console.cloud.google.com) แล้วสร้างโปรเจ็กต์ใหม่โดยทำดังนี้
โปรดจดจำรหัสโปรเจ็กต์ ซึ่งเป็นชื่อที่ไม่ซ้ำกันในโปรเจ็กต์ Google Cloud ทั้งหมด (ชื่อด้านบนถูกใช้ไปแล้วและจะใช้ไม่ได้ ขออภัย) ซึ่งจะเรียกว่า PROJECT_ID
ในภายหลังใน Codelab นี้
จากนั้นคุณจะต้องเปิดใช้การเรียกเก็บเงินใน Cloud Console เพื่อใช้ทรัพยากร Google Cloud
การทำ Codelab นี้ไม่ควรมีค่าใช้จ่ายเกิน 2-3 ดอลลาร์ แต่ก็อาจมีค่าใช้จ่ายมากกว่านี้หากคุณตัดสินใจใช้ทรัพยากรเพิ่มเติมหรือปล่อยให้ทรัพยากรทำงานต่อไป (ดูส่วน "การล้างข้อมูล" ที่ท้ายเอกสารนี้)
ผู้ใช้ใหม่ของ Google Cloud Platform มีสิทธิ์ทดลองใช้ฟรี$300
เปิดใช้ API
คลิกไอคอนเมนูที่ด้านซ้ายบนของหน้าจอ
เลือก API Manager จากเมนูแบบเลื่อนลง
ค้นหา "Google Compute Engine" ในช่องค้นหา คลิก "Google Compute Engine API" ในรายการผลลัพธ์ที่ปรากฏ
ในหน้า Google Compute Engine ให้คลิกเปิดใช้
เมื่อเปิดใช้แล้ว ให้คลิกลูกศรเพื่อกลับไป
ตอนนี้ให้ค้นหา API ต่อไปนี้และเปิดใช้ด้วย
- Google Dataflow API
- Stackdriver Logging API
- Google Cloud Storage
- Google Cloud Storage JSON API
- BigQuery API
- Google Cloud Pub/Sub API
- Google Cloud Datastore API
ในคอนโซล Google Cloud Platform ให้คลิกไอคอนเมนูที่ด้านซ้ายบนของหน้าจอ
เลื่อนลงแล้วเลือก Cloud Storage ในส่วนย่อยพื้นที่เก็บข้อมูล
ตอนนี้คุณควรเห็นเบราว์เซอร์ Cloud Storage และหากคุณใช้โปรเจ็กต์ที่ไม่มีที่เก็บข้อมูล Cloud Storage ในขณะนี้ คุณจะเห็นกล่องโต้ตอบที่เชิญให้คุณสร้างที่เก็บข้อมูลใหม่
กดปุ่มสร้างที่เก็บข้อมูลเพื่อสร้างที่เก็บข้อมูล
ป้อนชื่อสำหรับที่เก็บข้อมูล ดังที่กล่องโต้ตอบระบุไว้ ชื่อที่เก็บข้อมูลต้องไม่ซ้ำกันใน Cloud Storage ทั้งหมด ดังนั้นหากเลือกชื่อที่ชัดเจน เช่น "test" คุณอาจพบว่ามีคนสร้างที่เก็บข้อมูลที่มีชื่อนั้นอยู่แล้ว และจะได้รับข้อผิดพลาด
นอกจากนี้ ยังมีกฎบางอย่างเกี่ยวกับอักขระที่อนุญาตในชื่อที่เก็บข้อมูลด้วย หากคุณขึ้นต้นและลงท้ายชื่อที่เก็บข้อมูลด้วยตัวอักษรหรือตัวเลข และใช้เฉพาะขีดกลางตรงกลาง คุณก็จะไม่พบปัญหา หากคุณพยายามใช้สัญลักษณ์พิเศษ หรือพยายามขึ้นต้นหรือลงท้ายชื่อที่เก็บข้อมูลด้วยสิ่งอื่นที่ไม่ใช่ตัวอักษรหรือตัวเลข กล่องโต้ตอบจะแจ้งเตือนกฎให้คุณทราบ
ป้อนชื่อที่ไม่ซ้ำกันสำหรับที่เก็บข้อมูล แล้วกดสร้าง หากเลือกสิ่งที่ใช้อยู่แล้ว คุณจะเห็นข้อความแสดงข้อผิดพลาดที่แสดงด้านบน เมื่อสร้างที่เก็บข้อมูลเรียบร้อยแล้ว ระบบจะนำคุณไปยังที่เก็บข้อมูลใหม่ที่ว่างเปล่าในเบราว์เซอร์
แน่นอนว่าชื่อที่เก็บข้อมูลที่คุณเห็นจะแตกต่างกัน เนื่องจากชื่อที่เก็บข้อมูลต้องไม่ซ้ำกันในทุกโปรเจ็กต์
เปิดใช้งาน Google Cloud Shell
จาก GCP Console ให้คลิกไอคอน Cloud Shell ในแถบเครื่องมือด้านขวาบน
จากนั้นคลิก "เริ่ม Cloud Shell"
การจัดสรรและเชื่อมต่อกับสภาพแวดล้อมจะใช้เวลาเพียงไม่กี่นาที
เครื่องเสมือนนี้มาพร้อมเครื่องมือพัฒนาทั้งหมดที่คุณต้องการ โดยมีไดเรกทอรีหลักขนาด 5 GB ที่คงอยู่ถาวร และทำงานบน Google Cloud ซึ่งช่วยเพิ่มประสิทธิภาพเครือข่ายและการตรวจสอบสิทธิ์ได้อย่างมาก คุณสามารถทำงานส่วนใหญ่ในแล็บนี้ได้โดยใช้เพียงเบราว์เซอร์หรือ Google Chromebook
เมื่อเชื่อมต่อกับ Cloud Shell แล้ว คุณควรเห็นว่าคุณได้รับการตรวจสอบสิทธิ์แล้วและระบบได้ตั้งค่าโปรเจ็กต์เป็น PROJECT_ID แล้ว
เรียกใช้คำสั่งต่อไปนี้ใน Cloud Shell เพื่อยืนยันว่าคุณได้รับการตรวจสอบสิทธิ์แล้ว
gcloud auth list
เอาต์พุตของคำสั่ง
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
เอาต์พุตของคำสั่ง
[core] project = <PROJECT_ID>
หากไม่ได้ตั้งค่าไว้ คุณตั้งค่าได้ด้วยคำสั่งนี้
gcloud config set project <PROJECT_ID>
เอาต์พุตของคำสั่ง
Updated property [core/project].
หลังจากเปิดใช้ Cloud Shell แล้ว มาเริ่มต้นใช้งานโดยการสร้างโปรเจ็กต์ Maven ที่มี Cloud Dataflow SDK สำหรับ Java กัน
เรียกใช้คำสั่ง mvn archetype:generate
ในเชลล์ดังนี้
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
หลังจากเรียกใช้คำสั่งแล้ว คุณควรเห็นไดเรกทอรีใหม่ชื่อ first-dataflow
ในไดเรกทอรีปัจจุบัน first-dataflow
มีโปรเจ็กต์ Maven ที่มี Cloud Dataflow SDK สำหรับ Java และไปป์ไลน์ตัวอย่าง
มาเริ่มต้นด้วยการบันทึกรหัสโปรเจ็กต์และชื่อที่เก็บข้อมูล Cloud Storage เป็นตัวแปรสภาพแวดล้อมกัน โดยทำได้ใน Cloud Shell อย่าลืมแทนที่ <your_project_id>
ด้วยรหัสโปรเจ็กต์ของคุณเอง
export PROJECT_ID=<your_project_id>
ตอนนี้เราจะทำเช่นเดียวกันกับที่เก็บข้อมูล Cloud Storage อย่าลืมแทนที่ <your_bucket_name>
ด้วยชื่อที่ไม่ซ้ำกันที่คุณใช้สร้างที่เก็บข้อมูลในขั้นตอนก่อนหน้า
export BUCKET_NAME=<your_bucket_name>
เปลี่ยนเป็นไดเรกทอรี first-dataflow/
cd first-dataflow
เราจะเรียกใช้ไปป์ไลน์ที่ชื่อ WordCount ซึ่งอ่านข้อความ เปลี่ยนแต่ละคำในบรรทัดข้อความให้เป็นโทเคน และนับจำนวนความถี่ในแต่ละคำ ก่อนอื่นเราจะเรียกใช้ไปป์ไลน์ และในขณะที่ไปป์ไลน์ทำงานอยู่ เราจะมาดูสิ่งที่เกิดขึ้นในแต่ละขั้นตอน
เริ่มไปป์ไลน์โดยเรียกใช้คำสั่ง mvn compile exec:java
ในเชลล์หรือหน้าต่างเทอร์มินัล สำหรับอาร์กิวเมนต์ --project, --stagingLocation,
และ --output
คำสั่งด้านล่างจะอ้างอิงตัวแปรสภาพแวดล้อมที่คุณตั้งค่าไว้ก่อนหน้านี้ในขั้นตอนนี้
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
ขณะที่งานกำลังทำงานอยู่ ให้ค้นหางานในรายการงาน
เปิด UI ของ Cloud Dataflow Monitoring ใน Google Cloud Platform Console คุณควรเห็นงานนับคำที่มีสถานะเป็นกำลังทำงาน ดังนี้
ตอนนี้เรามาดูพารามิเตอร์ของไปป์ไลน์กัน เริ่มต้นโดยคลิกชื่องานของคุณ
เมื่อเลือกงาน คุณจะดูกราฟการดำเนินการได้ กราฟการดำเนินการของไปป์ไลน์จะแสดงการเปลี่ยนรูปแบบแต่ละรายการในไปป์ไลน์เป็นกล่องที่มีชื่อการเปลี่ยนรูปแบบและข้อมูลสถานะบางอย่าง คุณคลิกเครื่องหมายแคริเอตที่มุมขวาบนของแต่ละขั้นตอนเพื่อดูรายละเอียดเพิ่มเติมได้
มาดูกันว่าไปป์ไลน์จะเปลี่ยนรูปแบบข้อมูลในแต่ละขั้นตอนอย่างไร
- อ่าน: ในขั้นตอนนี้ ไปป์ไลน์จะอ่านจากแหล่งข้อมูลอินพุต ในกรณีนี้คือไฟล์ข้อความจาก Cloud Storage ที่มีข้อความทั้งหมดของบทละคร King Lear ของเชกสเปียร์ ไปป์ไลน์ของเราจะอ่านไฟล์ทีละบรรทัดและเอาต์พุตแต่ละบรรทัดเป็น
PCollection
โดยแต่ละบรรทัดในไฟล์ข้อความคือองค์ประกอบในคอลเล็กชัน - CountWords: ขั้นตอนที่
CountWords
มี 2 ส่วน ขั้นแรก ฟังก์ชันนี้จะใช้ฟังก์ชัน do แบบขนาน (ParDo) ที่ชื่อExtractWords
เพื่อแยกแต่ละบรรทัดเป็นคำแต่ละคำ เอาต์พุตของ ExtractWords คือ PCollection ใหม่ที่แต่ละองค์ประกอบเป็นคำ ขั้นตอนถัดไปCount
ใช้การแปลงที่ SDK ของ Dataflow จัดเตรียมไว้ ซึ่งจะแสดงผลคู่คีย์-ค่า โดยที่คีย์คือคำที่ไม่ซ้ำกัน และค่าคือจำนวนครั้งที่คำนั้นปรากฏ ต่อไปนี้เป็นวิธีการใช้CountWords
และคุณสามารถดูไฟล์ WordCount.java แบบเต็มได้ใน GitHub
/**
* A PTransform that converts a PCollection containing lines of text
* into a PCollection of formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
- FormatAsText: ฟังก์ชันนี้จะจัดรูปแบบคู่คีย์-ค่าแต่ละคู่เป็นสตริงที่พิมพ์ได้
FormatAsText
การแปลงเพื่อใช้ฟีเจอร์นี้มีดังนี้
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: ในขั้นตอนนี้ เราจะเขียนสตริงที่พิมพ์ได้ลงในไฟล์ข้อความที่แยกส่วนหลายไฟล์
เราจะมาดูเอาต์พุตที่ได้จากไปป์ไลน์ในอีกไม่กี่นาที
ตอนนี้ให้ดูหน้าสรุปทางด้านขวาของกราฟ ซึ่งมีพารามิเตอร์ไปป์ไลน์ที่เราใส่ไว้ในคำสั่ง mvn compile exec:java
นอกจากนี้ คุณยังดูตัวนับที่กำหนดเองสำหรับไปป์ไลน์ได้ด้วย ซึ่งในกรณีนี้จะแสดงจำนวนบรรทัดว่างที่พบจนถึงขณะนี้ในระหว่างการดำเนินการ คุณเพิ่มตัวนับใหม่ลงในไปป์ไลน์เพื่อติดตามเมตริกเฉพาะแอปพลิเคชันได้
คุณคลิกไอคอนบันทึกเพื่อดูข้อความแสดงข้อผิดพลาดที่เฉพาะเจาะจงได้
คุณกรองข้อความที่ปรากฏในแท็บบันทึกงานได้โดยใช้เมนูแบบเลื่อนลงความรุนแรงขั้นต่ำ
คุณสามารถใช้ปุ่มบันทึกของผู้ปฏิบัติงานในแท็บบันทึกเพื่อดูบันทึกของผู้ปฏิบัติงานสำหรับอินสแตนซ์ Compute Engine ที่เรียกใช้ไปป์ไลน์ บันทึกของ Worker ประกอบด้วยบรรทัดบันทึกที่โค้ดของคุณสร้างขึ้นและโค้ดที่ Dataflow สร้างขึ้นซึ่งเรียกใช้โค้ดของคุณ
หากคุณพยายามแก้ไขข้อบกพร่องในไปป์ไลน์ มักจะมีบันทึกเพิ่มเติมในบันทึกของ Worker ที่ช่วยแก้ปัญหาได้ โปรดทราบว่าระบบจะรวบรวมบันทึกเหล่านี้จากผู้ปฏิบัติงานทั้งหมด และสามารถกรองและค้นหาได้
ในขั้นตอนถัดไป เราจะตรวจสอบว่างานของคุณสำเร็จหรือไม่
เปิด UI ของ Cloud Dataflow Monitoring ใน Google Cloud Platform Console
คุณควรเห็นงานนับคำที่มีสถานะเป็นกำลังทำงานในตอนแรก จากนั้นเป็นสำเร็จ
งานจะใช้เวลาประมาณ 3-4 นาทีในการเรียกใช้
ยังจำตอนที่คุณเรียกใช้ไปป์ไลน์และระบุที่เก็บข้อมูลเอาต์พุตได้ไหม มาดูผลลัพธ์กัน (เพราะคุณคงอยากรู้ว่าคำแต่ละคำในKing Lear ปรากฏกี่ครั้งใช่ไหม) กลับไปที่เบราว์เซอร์ Cloud Storage ในคอนโซล Google Cloud Platform คุณควรเห็นไฟล์เอาต์พุตและไฟล์ Staging ที่งานสร้างขึ้นในที่เก็บข้อมูล
คุณปิดทรัพยากรได้จากคอนโซล Google Cloud Platform
เปิดเบราว์เซอร์ Cloud Storage ในคอนโซล Google Cloud Platform
เลือกช่องทำเครื่องหมายข้างที่เก็บข้อมูลที่คุณสร้างขึ้น
คลิกลบเพื่อลบบัคเก็ตและเนื้อหาอย่างถาวร
คุณได้เรียนรู้วิธีสร้างโปรเจ็กต์ Maven ด้วย Cloud Dataflow SDK, เรียกใช้ไปป์ไลน์ตัวอย่างโดยใช้คอนโซล Google Cloud Platform และลบที่เก็บข้อมูล Cloud Storage ที่เชื่อมโยงและเนื้อหาของที่เก็บข้อมูล
ดูข้อมูลเพิ่มเติม
- เอกสารประกอบของ Dataflow: https://cloud.google.com/dataflow/docs/
ใบอนุญาต
ผลงานนี้ได้รับอนุญาตภายใต้สัญญาอนุญาตครีเอทีฟคอมมอนส์สำหรับยอมรับสิทธิของผู้สร้าง (Creative Commons Attribution License) 3.0 แบบทั่วไป และสัญญาอนุญาต Apache 2.0