Dataflow هو نموذج برمجة موحَّد وخدمة مُدارة لتطوير وتنفيذ مجموعة كبيرة من أنماط معالجة البيانات، بما في ذلك استخراج البيانات وتحويلها وتحميلها (ETL) والحساب المجمَّع والحساب المتواصل. بما أنّ Dataflow هي خدمة مُدارة، يمكنها تخصيص الموارد عند الطلب لتقليل وقت الاستجابة إلى الحدّ الأدنى مع الحفاظ على كفاءة عالية في الاستخدام.
يجمع نموذج Dataflow بين معالجة البيانات المجمّعة والمتواصلة، ما يتيح للمطوّرين عدم التنازل عن الدقة والتكلفة ووقت المعالجة. في هذا الدرس التطبيقي حول الترميز، ستتعرّف على كيفية تشغيل مسار Dataflow يحسب عدد مرات تكرار الكلمات الفريدة في ملف نصي.
هذا الفيديو التعليمي مقتبس من https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
ما ستتعرّف عليه
- كيفية إنشاء مشروع Maven باستخدام Cloud Dataflow SDK
- تشغيل نموذج مسار باستخدام Google Cloud Platform Console
- كيفية حذف حزمة Cloud Storage المرتبطة ومحتواها
المتطلبات
كيف ستستخدم هذا البرنامج التعليمي؟
ما هو تقييمك لتجربة استخدام خدمات Google Cloud Platform؟
إعداد البيئة بالسرعة التي تناسبك
إذا لم يكن لديك حساب على Google (Gmail أو Google Apps)، عليك إنشاء حساب. سجِّل الدخول إلى "وحدة تحكّم Google Cloud Platform" (console.cloud.google.com) وأنشِئ مشروعًا جديدًا:
تذكَّر معرّف المشروع، وهو اسم فريد في جميع مشاريع Google Cloud (الاسم أعلاه مستخدَم حاليًا ولن يكون متاحًا لك، نأسف لذلك). سيتم الإشارة إليه لاحقًا في هذا الدرس العملي باسم PROJECT_ID
.
بعد ذلك، عليك تفعيل الفوترة في Cloud Console من أجل استخدام موارد Google Cloud.
لن تكلفك تجربة هذا الدرس البرمجي أكثر من بضعة دولارات، ولكن قد تكون التكلفة أعلى إذا قررت استخدام المزيد من الموارد أو إذا تركتها قيد التشغيل (راجِع قسم "التنظيف" في نهاية هذا المستند).
يمكن للمستخدمين الجدد في Google Cloud Platform الاستفادة من فترة تجريبية مجانية بقيمة 300 دولار أمريكي.
تفعيل واجهات برمجة التطبيقات
انقر على رمز القائمة في أعلى يمين الشاشة.
اختَر API Manager من القائمة المنسدلة.
ابحث عن "Google Compute Engine" في مربّع البحث. انقر على "Google Compute Engine API" في قائمة النتائج التي تظهر.
في صفحة Google Compute Engine، انقر على تفعيل.
بعد تفعيلها، انقر على السهم للرجوع.
ابحث الآن عن واجهات برمجة التطبيقات التالية وفعِّلها أيضًا:
- Google Dataflow API
- Stackdriver Logging API
- Google Cloud Storage
- Google Cloud Storage JSON API
- BigQuery API
- Google Cloud Pub/Sub API
- Google Cloud Datastore API
في وحدة تحكّم Google Cloud Platform، انقر على رمز القائمة في أعلى يمين الشاشة:
انتقِل للأسفل واختَر Cloud Storage في القسم الفرعي مساحة التخزين:
من المفترض أن يظهر لك الآن "متصفّح Cloud Storage"، وإذا كنت تستخدم مشروعًا لا يتضمّن حاليًا أي حِزم Cloud Storage، سيظهر لك مربّع حوار يدعوك إلى إنشاء حزمة جديدة:
اضغط على الزر إنشاء حزمة لإنشاء حزمة:
أدخِل اسمًا للحزمة. كما يوضّح مربّع الحوار، يجب أن تكون أسماء الحِزم فريدة في جميع أنحاء Cloud Storage. لذا، إذا اخترت اسمًا واضحًا، مثل "اختبار"، من المحتمل أن تجد أنّ مستخدمًا آخر قد أنشأ مجموعة بهذا الاسم، وستتلقّى رسالة خطأ.
هناك أيضًا بعض القواعد المتعلقة بالأحرف المسموح بها في أسماء الحِزم. إذا بدأت اسم الحزمة وانتهيت منه بحرف أو رقم، واستخدمت الشرطات في المنتصف فقط، لن تواجه أي مشاكل. إذا حاولت استخدام رموز خاصة أو بدء اسم الحزمة أو إنهائه بشيء آخر غير حرف أو رقم، سيذكّرك مربّع الحوار بالقواعد.
أدخِل اسمًا فريدًا للحزمة واضغط على إنشاء. إذا اخترت اسمًا مستخدمًا من قبل، ستظهر لك رسالة الخطأ الموضّحة أعلاه. بعد إنشاء حزمة بنجاح، سيتم نقلك إلى حزمتك الجديدة الفارغة في المتصفّح:
بالطبع، سيختلف اسم الحزمة الذي تراه، لأنّه يجب أن يكون فريدًا في جميع المشاريع.
تفعيل Google Cloud Shell
من وحدة تحكّم Google Cloud Platform، انقر على رمز Cloud Shell في شريط الأدوات العلوي الأيسر:
ثم انقر على "بدء Cloud Shell":
ينبغي ألا تستغرق إدارة الحسابات والاتصال بالبيئة أكثر من بضع لحظات.
يتم تحميل هذه الآلة الافتراضية مزوّدة بكل أدوات التطوير التي ستحتاج إليها. وتوفِّر هذه الآلة دليلاً رئيسيًا دائمًا بسعة 5 غيغابايت ويتمّ تشغيله على Google Cloud، ما يحسّن كثيرًا أداء الشبكة والمصادقة. ويمكن إتمام معظم عملك إن لم يكن كلّه في هذا الدرس ببساطة من خلال متصفّح أو جهاز Google Chromebook فقط.
بعد الاتصال بـ Cloud Shell، من المفترض أن ترى أنّه تم إثبات هويتك وأنّ المشروع تم ضبطه على PROJECT_ID.
نفِّذ الأمر التالي في Cloud Shell للتأكّد من إكمال عملية المصادقة:
gcloud auth list
ناتج الأمر
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
ناتج الأمر
[core] project = <PROJECT_ID>
إذا لم يكن كذلك، يمكنك تعيينه من خلال هذا الأمر:
gcloud config set project <PROJECT_ID>
ناتج الأمر
Updated property [core/project].
بعد تشغيل Cloud Shell، لنبدأ بإنشاء مشروع Maven يحتوي على حزمة تطوير البرامج (SDK) الخاصة بـ Cloud Dataflow للغة Java.
نفِّذ الأمر mvn archetype:generate
في الصدفة على النحو التالي:
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
بعد تنفيذ الأمر، من المفترض أن يظهر دليل جديد باسم first-dataflow
ضمن دليلك الحالي. يحتوي first-dataflow
على مشروع Maven يتضمّن حزمة تطوير البرامج (SDK) الخاصة بـ Cloud Dataflow للغة Java ومسارات نموذجية.
لنبدأ بحفظ رقم تعريف مشروعنا وأسماء حِزم Cloud Storage كمتغيّرات بيئية. يمكنك إجراء ذلك في Cloud Shell. احرص على استبدال <your_project_id>
برقم تعريف مشروعك.
export PROJECT_ID=<your_project_id>
سننفّذ الآن الإجراء نفسه لحزمة Cloud Storage. تذكَّر استبدال <your_bucket_name>
بالاسم الفريد الذي استخدمته لإنشاء الحزمة في خطوة سابقة.
export BUCKET_NAME=<your_bucket_name>
انتقِل إلى الدليل first-dataflow/
.
cd first-dataflow
سننفّذ مسارًا للبيانات يُسمى WordCount، وهو يقرأ النص ويقسّم أسطر النص إلى كلمات فردية ويحصي عدد مرات تكرار كل كلمة من هذه الكلمات. سنشغّل أولاً مسار البيانات، وأثناء تشغيله، سنلقي نظرة على ما يحدث في كل خطوة.
ابدأ عملية النقل عن طريق تنفيذ الأمر mvn compile exec:java
في واجهة الأوامر أو نافذة المحطة الطرفية. بالنسبة إلى الوسيطتين --project, --stagingLocation,
و--output
، يشير الأمر أدناه إلى متغيرات البيئة التي أعددتها سابقًا في هذه الخطوة.
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
أثناء تنفيذ المهمة، لنبحث عنها في قائمة المهام.
افتح واجهة مستخدم المراقبة في Cloud Dataflow ضِمن وحدة تحكّم Google Cloud Platform. من المفترض أن تظهر مهمة عدد الكلمات بالحالة قيد التنفيذ:
لنلقِ الآن نظرة على مَعلمات مسار العرض. ابدأ بالنقر على اسم وظيفتك:
عند اختيار مهمة، يمكنك الاطّلاع على الرسم البياني للتنفيذ. يمثّل الرسم البياني لتنفيذ المسار كل عملية تحويل في المسار كمربّع يحتوي على اسم عملية التحويل وبعض معلومات الحالة. يمكنك النقر على علامة السهم في أعلى يسار كل خطوة للاطّلاع على مزيد من التفاصيل:
لنتعرّف على كيفية تحويل المسار للبيانات في كل خطوة:
- القراءة: في هذه الخطوة، تقرأ سلسلة نقل البيانات من مصدر إدخال. في هذه الحالة، يكون الملف عبارة عن ملف نصي من Cloud Storage يتضمّن النص الكامل لمسرحية شكسبير الملك لير. تقرأ عملية المعالجة الملف سطرًا سطرًا وتنتج
PCollection
لكل سطر، حيث يمثّل كل سطر في ملفنا النصي عنصرًا في المجموعة. - CountWords: تحتوي الخطوة
CountWords
على جزأين. أولاً، تستخدِم دالة ParDo متوازية باسمExtractWords
لتقسيم كل سطر إلى كلمات فردية. ناتج عملية ExtractWords هو PCollection جديد يكون كل عنصر فيه عبارة عن كلمة. تستخدِم الخطوة التالية،Count
، عملية تحويل توفّرها حزمة تطوير البرامج (SDK) الخاصة بخدمة Dataflow، وتُرجع أزواجًا من المفاتيح والقيم، حيث يكون المفتاح كلمة فريدة والقيمة هي عدد مرات تكرارها. في ما يلي الطريقة التي تنفّذCountWords
، ويمكنك الاطّلاع على ملف WordCount.java الكامل على GitHub:
/**
* A PTransform that converts a PCollection containing lines of text
* into a PCollection of formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
- FormatAsText: هذه دالة تعمل على تنسيق كل زوج من المفاتيح والقيم في سلسلة قابلة للطباعة. في ما يلي عملية التحويل
FormatAsText
لتنفيذ ذلك:
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: في هذه الخطوة، نكتب السلاسل القابلة للطباعة في ملفات نصية متعددة مقسّمة.
سنلقي نظرة على الناتج الناتج من خط الأنابيب في غضون بضع دقائق.
اطّلِع الآن على صفحة الملخّص على يسار الرسم البياني، والتي تتضمّن مَعلمات خط الأنابيب التي أدرجناها في الأمر mvn compile exec:java
.
يمكنك أيضًا الاطّلاع على عدادات مخصّصة للمسار، والتي توضّح في هذه الحالة عدد الأسطر الفارغة التي تمت مصادفتها حتى الآن أثناء التنفيذ. يمكنك إضافة عدّادات جديدة إلى مسار المعالجة لتتبُّع مقاييس خاصة بالتطبيق.
يمكنك النقر على رمز السجلات لعرض رسائل الخطأ المحدّدة.
يمكنك فلترة الرسائل التي تظهر في علامة التبويب "سجلّ المهام" باستخدام القائمة المنسدلة "الحدّ الأدنى للخطورة".
يمكنك استخدام الزر سجلات العامل في علامة التبويب "السجلات" لعرض سجلات العامل الخاصة بمثيلات Compute Engine التي تشغّل خط الأنابيب. تتألف سجلّات العامل من أسطر سجلّات تم إنشاؤها بواسطة الرمز البرمجي والرمز البرمجي الذي أنشأته Dataflow لتشغيله.
إذا كنت تحاول تصحيح خطأ في مسار البيانات، ستتوفّر غالبًا سجلّات إضافية في سجلّات العامل تساعد في حلّ المشكلة. يُرجى العِلم أنّه يتم تجميع هذه السجلات على مستوى جميع الموظفين، ويمكن فلترتها والبحث فيها.
في الخطوة التالية، سنتأكّد من أنّ مهمتك قد اكتملت بنجاح.
افتح واجهة مستخدم المراقبة في Cloud Dataflow ضِمن وحدة تحكّم Google Cloud Platform.
من المفترض أن تظهر مهمة عدد الكلمات بحالة قيد التشغيل في البداية، ثم تم بنجاح:
سيستغرق تشغيل المهمة من 3 إلى 4 دقائق تقريبًا.
هل تتذكر عندما شغّلت خط الأنابيب وحدّدت حزمة إخراج؟ لنلقِ نظرة على النتيجة (ألا تريد معرفة عدد المرات التي وردت فيها كل كلمة في الملك لير؟). ارجع إلى "متصفّح Cloud Storage" في وحدة تحكّم Google Cloud Platform. في الحزمة، من المفترض أن تظهر لك ملفات الإخراج وملفات التخزين المؤقت التي أنشأتها مهمتك:
يمكنك إيقاف مواردك من وحدة تحكّم Google Cloud Platform.
افتح متصفّح Cloud Storage في "وحدة تحكّم Google Cloud Platform".
ضَع علامة في مربّع الاختيار بجانب المجموعة التي أنشأتها.
انقر على حذف لحذف الحزمة ومحتواها نهائيًا.
تعرّفت على كيفية إنشاء مشروع Maven باستخدام حزمة تطوير البرامج (SDK) الخاصة بخدمة Cloud Dataflow، وتشغيل نموذج مسار باستخدام "وحدة تحكّم Google Cloud Platform"، وحذف حزمة Cloud Storage المرتبطة ومحتوياتها.
مزيد من المعلومات
- مستندات Dataflow: https://cloud.google.com/dataflow/docs/
الترخيص
يخضع هذا العمل لترخيص Creative Commons Attribution 3.0 Generic License وترخيص Apache 2.0.