הפעלת צינור לעיבוד טקסט של Big Data ב-Cloud Dataflow

‫Dataflow הוא מודל תכנות מאוחד ושירות מנוהל לפיתוח ולהרצה של מגוון רחב של דפוסי עיבוד נתונים, כולל ETL, חישוב אצווה וחישוב רציף. מכיוון ש-Dataflow הוא שירות מנוהל, הוא יכול להקצות משאבים לפי דרישה כדי לצמצם את זמן האחזור, תוך שמירה על יעילות גבוהה של ניצול המשאבים.

מודל Dataflow משלב עיבוד באצווה ועיבוד ברצף, כך שמפתחים לא צריכים להתפשר על דיוק, עלות וזמן עיבוד. ב-codelab הזה תלמדו איך להפעיל צינור Dataflow שסופר את המופעים של מילים ייחודיות בקובץ טקסט.

המדריך הזה מבוסס על המדריך https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

מה תלמדו

  • איך יוצרים פרויקט Maven באמצעות Cloud Dataflow SDK
  • הפעלת צינור לדוגמה באמצעות מסוף Google Cloud Platform
  • איך מוחקים את קטגוריית Cloud Storage המשויכת ואת התוכן שלה

מה צריך להכין

איך תשתמשו במדריך הזה?

רק קוראים את המאמר קוראים את המאמר ומבצעים את התרגילים

איזה דירוג מגיע לדעתך לחוויית השימוש שלך בשירותים של Google Cloud Platform?

מתחילים ביניים מומחים

הגדרת סביבה בקצב אישי

אם עדיין אין לכם חשבון Google (Gmail או Google Apps), אתם צריכים ליצור חשבון. נכנסים ל-Google Cloud Platform Console‏ (console.cloud.google.com) ויוצרים פרויקט חדש:

Screenshot from 2016-02-10 12:45:26.png

חשוב לזכור את מזהה הפרויקט, שהוא שם ייחודי בכל הפרויקטים ב-Google Cloud (השם שמופיע למעלה כבר תפוס ולא יתאים לכם, מצטערים!). בהמשך ה-codelab הזה, נתייחס אליו כאל PROJECT_ID.

בשלב הבא, תצטרכו להפעיל את החיוב ב-Cloud Console כדי להשתמש במשאבים של Google Cloud.

העלות של התרגיל הזה לא אמורה להיות גבוהה, אבל היא יכולה להיות גבוהה יותר אם תחליטו להשתמש ביותר משאבים או אם תשאירו אותם פועלים (ראו את הקטע 'ניקוי' בסוף המסמך הזה).

משתמשים חדשים ב-Google Cloud Platform זכאים לתקופת ניסיון בחינם בשווי 300$.

הפעלת ממשקי ה-API

לוחצים על סמל התפריט בפינה השמאלית העליונה.

בתפריט הנפתח, בוחרים באפשרות API Manager.

בתיבת החיפוש, מחפשים את Google Compute Engine. לוחצים על Google Compute Engine API ברשימת התוצאות שמופיעה.

בדף Google Compute Engine לוחצים על הפעלה.

אחרי ההפעלה, לוחצים על החץ כדי לחזור.

עכשיו מחפשים את ממשקי ה-API הבאים ומפעילים אותם גם כן:

  • Google Dataflow API
  • Stackdriver Logging API
  • Google Cloud Storage
  • Google Cloud Storage JSON API
  • BigQuery API
  • Google Cloud Pub/Sub API
  • Google Cloud Datastore API

במסוף Google Cloud Platform, לוחצים על סמל התפריט בפינה הימנית העליונה של המסך:

גוללים למטה ובוחרים באפשרות Cloud Storage בקטע המשנה אחסון:

עכשיו אמור להופיע הדפדפן של Cloud Storage. אם אתם משתמשים בפרויקט שאין בו כרגע קטגוריות של Cloud Storage, יופיע תיבת דו-שיח עם הצעה ליצור קטגוריה חדשה:

כדי ליצור מאגר, לוחצים על הלחצן Create bucket:

מזינים שם לקטגוריה. כמו שכתוב בתיבת הדו-שיח, שמות הקטגוריות חייבים להיות ייחודיים בכל Cloud Storage. לכן, אם תבחרו שם ברור, כמו test, סביר להניח שמישהו אחר כבר יצר קטגוריה עם השם הזה ותקבלו שגיאה.

יש גם כמה כללים לגבי התווים שמותרים בשמות של קטגוריות. אם השם של הקטגוריה מתחיל ומסתיים באות או בספרה, ואם משתמשים במקפים רק באמצע, אז הכול בסדר. אם תנסו להשתמש בתווים מיוחדים, או להתחיל או לסיים את שם הקטגוריה במשהו שאינו אות או מספר, תיבת הדו-שיח תזכיר לכם את הכללים.

מזינים שם ייחודי לקטגוריה ולוחצים על Create (יצירה). אם תבחרו משהו שכבר נמצא בשימוש, תוצג הודעת השגיאה שמופיעה למעלה. אחרי שיוצרים את הקטגוריה, מועברים לקטגוריה החדשה והריקה בדפדפן:

שם הקטגוריה שיוצג לכם יהיה שונה, כי הוא חייב להיות ייחודי בכל הפרויקטים.

הפעלת Google Cloud Shell

ב-GCP Console, לוחצים על סמל Cloud Shell בסרגל הכלים שבפינה השמאלית העליונה:

לאחר מכן לוחצים על 'הפעלת Cloud Shell':

יחלפו כמה רגעים עד שההקצאה והחיבור לסביבת העבודה יושלמו:

המכונה הווירטואלית הזו כוללת את כל הכלים הדרושים למפתחים. יש בה ספריית בית בנפח מתמיד של 5GB והיא פועלת ב-Google Cloud, מה שמשפר מאוד את הביצועים והאימות ברשת. את רוב העבודה במעבדה הזו, אם לא את כולה, אפשר לבצע באמצעות דפדפן או Google Chromebook.

אחרי שמתחברים ל-Cloud Shell, אמור להופיע אימות שכבר בוצע ושהפרויקט כבר הוגדר לפי PROJECT_ID.

מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שהאימות בוצע:

gcloud auth list

פלט הפקודה

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)
gcloud config list project

פלט הפקודה

[core]
project = <PROJECT_ID>

אם הוא לא מוגדר, אפשר להגדיר אותו באמצעות הפקודה הבאה:

gcloud config set project <PROJECT_ID>

פלט הפקודה

Updated property [core/project].

אחרי שמפעילים את Cloud Shell, מתחילים ליצור פרויקט Maven שמכיל את Cloud Dataflow SDK for Java.

מריצים את הפקודה mvn archetype:generate בשורת הפקודה:

  mvn archetype:generate \
     -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
     -DarchetypeGroupId=com.google.cloud.dataflow \
     -DarchetypeVersion=1.9.0 \
     -DgroupId=com.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -DinteractiveMode=false \
     -Dpackage=com.example

אחרי שמריצים את הפקודה, אמורה להופיע ספרייה חדשה בשם first-dataflow בספרייה הנוכחית. ‫first-dataflow מכיל פרויקט Maven שכולל את Cloud Dataflow SDK ל-Java וצינורות לדוגמה.

נתחיל בשמירת מזהה הפרויקט ושמות הקטגוריות של Cloud Storage כמשתני סביבה. אפשר לעשות את זה ב-Cloud Shell. חשוב להחליף את <your_project_id> במזהה הפרויקט שלכם.

 export PROJECT_ID=<your_project_id>

עכשיו נבצע את אותה הפעולה עבור קטגוריית Cloud Storage. חשוב לזכור להחליף את <your_bucket_name> בשם הייחודי שבו השתמשתם כדי ליצור את הקטגוריה בשלב מוקדם יותר.

 export BUCKET_NAME=<your_bucket_name>

עוברים לספרייה first-dataflow/.

 cd first-dataflow

אנחנו הולכים להריץ צינור (pipeline) שנקרא WordCount, שקורא טקסט, מבצע טוקניזציה של שורות הטקסט למילים בודדות ומבצע ספירת תדירות של כל אחת מהמילים האלה. קודם נריץ את צינור העיבוד, ובזמן שהוא פועל נבדוק מה קורה בכל שלב.

מריצים את הפקודה mvn compile exec:java בחלון המעטפת או הטרמינל כדי להפעיל את צינור הנתונים. בפקודה הבאה, הארגומנטים --project, --stagingLocation, ו---output מתייחסים למשתני הסביבה שהגדרתם קודם בשלב הזה.

 mvn compile exec:java \
      -Dexec.mainClass=com.example.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=BlockingDataflowPipelineRunner"

בזמן שהעבודה פועלת, נחפש את העבודה ברשימת העבודות.

פותחים את ממשק המשתמש של Cloud Dataflow Monitoring ב-Google Cloud Platform Console. אמור להופיע סטטוס של Running (פועל) למשימת ספירת המילים:

עכשיו נבחן את הפרמטרים של צינור הנתונים. מתחילים בלחיצה על שם המשרה:

כשבוחרים משימה, אפשר לראות את תרשים הביצוע. גרף ההפעלה של צינור הנתונים מייצג כל טרנספורמציה בצינור כתיבה שמכילה את שם הטרנספורמציה ומידע על הסטטוס. כדי לראות פרטים נוספים, לוחצים על החץ הקטן בפינה השמאלית העליונה של כל שלב:

עכשיו נראה איך הצינור מעבד את הנתונים בכל שלב:

  • קריאה: בשלב הזה, צינור העיבוד קורא ממקור קלט. במקרה הזה, זהו קובץ טקסט מ-Cloud Storage עם הטקסט המלא של המחזה המלך ליר של שייקספיר. תהליך הצינור שלנו קורא את הקובץ שורה אחר שורה ומוציא כל PCollection, כאשר כל שורה בקובץ הטקסט היא רכיב באוסף.
  • CountWords: לשלב CountWords יש שני חלקים. קודם כול, הפונקציה משתמשת בפונקציית do מקבילית (ParDo) בשם ExtractWords כדי ליצור טוקניזציה של כל שורה למילים נפרדות. הפלט של ExtractWords הוא PCollection חדש שבו כל רכיב הוא מילה. בשלב הבא, Count, נעשה שימוש בטרנספורמציה שסופקה על ידי Dataflow SDK, שמחזירה זוגות של מפתח וערך, כאשר המפתח הוא מילה ייחודית והערך הוא מספר הפעמים שהמילה מופיעה. זו השיטה שמיישמת את CountWords, ואפשר לעיין בקובץ WordCount.java המלא ב-GitHub:
  /**
   * A PTransform that converts a PCollection containing lines of text 
   * into a PCollection of formatted word counts.
   */
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }
  • FormatAsText: פונקציה שמעצבת כל זוג של מפתח וערך למחרוזת שאפשר להדפיס. כך נראה הטרנספורמציה FormatAsText שצריך להטמיע:
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts: בשלב הזה אנחנו כותבים את המחרוזות שניתנות להדפסה לכמה קובצי טקסט מחולקים.

בעוד כמה דקות נבדוק את הפלט שמתקבל מהצינור.

עכשיו מעיינים בדף סיכום שמשמאל לתרשים, שכולל פרמטרים של צינורות שכללנו בפקודה mvn compile exec:java.

אפשר גם לראות מונים מותאמים אישית עבור צינור הנתונים, שבמקרה הזה מראים כמה שורות ריקות נתקלו עד עכשיו במהלך ההפעלה. אפשר להוסיף מוניטורים חדשים לצינור כדי לעקוב אחרי מדדים ספציפיים לאפליקציה.

כדי לראות את הודעות השגיאה הספציפיות, לוחצים על סמל היומנים.

כדי לסנן את ההודעות שמופיעות בכרטיסייה 'יומן המשימות', משתמשים בתפריט הנפתח 'חומרה מינימלית'.

אתם יכולים להשתמש בלחצן Worker Logs בכרטיסייה Logs כדי לראות את יומני העובדים של מכונות Compute Engine שמריצות את צינור הנתונים. יומני Worker מורכבים משורות יומן שנוצרו על ידי הקוד שלכם ועל ידי הקוד שנוצר על ידי Dataflow שמריץ אותו.

אם אתם מנסים לנפות באגים בכשל בצינור, לרוב יהיו יומנים נוספים ביומני העובדים שיעזרו לפתור את הבעיה. חשוב לזכור שהיומנים האלה הם צבירה של כל העובדים, ואפשר לסנן ולחפש בהם.

בשלב הבא נבדוק שהעבודה הסתיימה בהצלחה.

פותחים את ממשק המשתמש של Cloud Dataflow Monitoring ב-Google Cloud Platform Console.

בהתחלה, סטטוס העבודה של ספירת המילים יהיה Running (פועל), ואחר כך Succeeded (הושלם):

התהליך יימשך כ-3 או 4 דקות.

זוכר מתי הפעלת את צינור הנתונים וציינת דלי פלט? בואו נראה את התוצאה (כי בטח תרצו לדעת כמה פעמים כל מילה בהמלך ליר מופיעה!). חוזרים לדף Cloud Storage Browser ב-Google Cloud Platform Console. בקטגוריה שלכם אמורים להופיע קובצי הפלט וקובצי ההכנה שהעבודה יצרה:

אפשר להשבית את המשאבים ממסוף Google Cloud Platform.

פותחים את הדף Cloud Storage browser ב-Google Cloud Platform Console.

מסמנים את התיבה ליד הקטגוריה שיצרתם.

לוחצים על DELETE כדי למחוק סופית את הקטגוריה ואת התוכן שלה.

למדתם איך ליצור פרויקט Maven באמצעות Cloud Dataflow SDK, להפעיל צינור לדוגמה באמצעות Google Cloud Platform Console ולמחוק את קטגוריית Cloud Storage המשויכת ואת התוכן שלה.

מידע נוסף

רישיון

העבודה הזו בשימוש במסגרת רישיון Creative Commons כללי מגרסה 3.0 ורישיון Apache מגרסה 2.0.