O Dataflow é um modelo de programação unificado e um serviço gerenciado para desenvolvimento e execução de diversos padrões de processamento de dados, inclusive ETL, computação em lote e computação contínua. Como o Dataflow é um serviço gerenciado, ele pode alocar recursos sob demanda para minimizar a latência enquanto mantém uma alta eficiência de utilização.
O modelo do Dataflow combina processamento em lote e de stream para que os desenvolvedores não precisem fazer concessões entre exatidão, custo e tempo de processamento. Neste codelab, você vai aprender a executar um pipeline do Dataflow que conta as ocorrências de palavras únicas em um arquivo de texto.
Este tutorial foi adaptado de https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
O que você vai aprender
- Como criar um projeto Maven com o SDK do Cloud Dataflow
- Como executar um pipeline de exemplo com o Console do Google Cloud Platform
- Como excluir o bucket do Cloud Storage associado e o conteúdo dele
O que é necessário
Como você usará este tutorial?
Como você classificaria sua experiência com o uso dos serviços do Google Cloud Platform?
Configuração de ambiente autoguiada
Se você ainda não tem uma Conta do Google (Gmail ou Google Apps), crie uma. Faça login no Console do Google Cloud Platform (console.cloud.google.com) e crie um projeto:
Lembre-se do código do projeto, um nome exclusivo em todos os projetos do Google Cloud. O nome acima já foi escolhido e não servirá para você. Faremos referência a ele mais adiante neste codelab como PROJECT_ID
.
Em seguida, ative o faturamento no console do Cloud para usar os recursos do Google Cloud.
A execução por meio deste codelab terá um custo baixo, mas poderá ser mais se você decidir usar mais recursos ou se deixá-los em execução. Consulte a seção "limpeza" no final deste documento.
Novos usuários do Google Cloud Platform têm direito a uma avaliação sem custo financeiro de US$300.
Ativar as APIs
Clique no ícone de menu no canto superior esquerdo da tela.
Selecione API Manager no menu suspenso.
Procure "Google Compute Engine" na caixa de pesquisa. Clique em "API Google Compute Engine" na lista de resultados que aparece.
Na página do Google Compute Engine, clique em Ativar.
Depois de fazer a ativação, clique na seta para voltar.
Agora, pesquise e ative as seguintes APIs:
- API Google Dataflow
- API Stackdriver Logging
- Google Cloud Storage
- API Google Cloud Storage JSON
- API BigQuery
- API Google Cloud Pub/Sub
- API Google Cloud Datastore
No console do Google Cloud Platform, clique no ícone Menu no canto superior esquerdo da tela:
Role a tela para baixo e selecione Cloud Storage na subseção Armazenamento:
Agora você vai ver o navegador do Cloud Storage. Se estiver usando um projeto que não tem buckets do Cloud Storage, uma caixa de diálogo vai aparecer para você criar um bucket:
Clique no botão Criar bucket para criar um:
Digite um nome para o bucket. Como a caixa de diálogo observa, os nomes de bucket precisam ser exclusivos em todo o Cloud Storage. Portanto, se você escolher um nome óbvio, como "test", provavelmente vai descobrir que outra pessoa já criou um bucket com esse nome e vai receber um erro.
Também há algumas regras sobre os caracteres permitidos nos nomes de buckets. Se você começar e terminar o nome do bucket com uma letra ou um número e usar apenas traços no meio, não haverá problemas. Se você tentar usar caracteres especiais ou começar ou terminar o nome do bucket com algo diferente de uma letra ou um número, a caixa de diálogo vai lembrar as regras.
Digite um nome exclusivo para o bucket e clique em Criar. Se você escolher algo que já está em uso, vai receber a mensagem de erro mostrada acima. Depois de criar um bucket com sucesso, você verá seu novo bucket vazio no navegador:
O nome do bucket que você vê será diferente, já que eles precisam ser exclusivos em todos os projetos.
Ativar o Google Cloud Shell
No Console do GCP, clique no ícone do Cloud Shell na barra de ferramentas localizada no canto superior direito:
Em seguida, clique em "Start Cloud Shell":
O provisionamento e a conexão ao ambiente levarão apenas alguns instantes para serem concluídos:
Essa máquina virtual contém todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal persistente de 5 GB, além de ser executada no Google Cloud. Isso aprimora o desempenho e a autenticação da rede. Praticamente todo o seu trabalho neste laboratório pode ser feito em um navegador ou no seu Google Chromebook.
Depois de se conectar ao Cloud Shell, sua conta já estará autenticada e o projeto estará configurado com seu PROJECT_ID.
Execute o seguinte comando no Cloud Shell para confirmar se a conta está autenticada:
gcloud auth list
Resposta ao comando
Credentialed accounts: - <myaccount>@<mydomain>.com (active)
gcloud config list project
Resposta ao comando
[core] project = <PROJECT_ID>
Se o projeto não estiver configurado, configure-o usando este comando:
gcloud config set project <PROJECT_ID>
Resposta ao comando
Updated property [core/project].
Depois que o Cloud Shell for iniciado, vamos começar criando um projeto Maven que contenha o SDK do Cloud Dataflow para Java.
Execute o comando mvn archetype:generate
no shell da seguinte maneira:
mvn archetype:generate \
-DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-examples \
-DarchetypeGroupId=com.google.cloud.dataflow \
-DarchetypeVersion=1.9.0 \
-DgroupId=com.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-DinteractiveMode=false \
-Dpackage=com.example
Depois de executar o comando, você verá um novo diretório chamado first-dataflow
abaixo do diretório atual. first-dataflow
contém um projeto Maven que inclui o SDK do Cloud Dataflow para Java e pipelines de exemplo.
Vamos começar salvando o ID do projeto e os nomes dos buckets do Cloud Storage como variáveis de ambiente. É possível fazer isso no Cloud Shell. Substitua <your_project_id>
pelo ID do seu projeto.
export PROJECT_ID=<your_project_id>
Agora vamos fazer o mesmo para o bucket do Cloud Storage. Substitua <your_bucket_name>
pelo nome exclusivo que você usou para criar o bucket em uma etapa anterior.
export BUCKET_NAME=<your_bucket_name>
Altere para o diretório first-dataflow/
.
cd first-dataflow
Será executado um pipeline chamado "WordCount", que lê o texto, tokeniza as linhas de texto em palavras individuais e executa uma contagem de frequência em cada uma dessas palavras. Primeiro, vamos executar o pipeline e, enquanto ele estiver em execução, vamos analisar o que está acontecendo em cada etapa.
Inicie o pipeline executando o comando mvn compile exec:java
no shell ou na janela do terminal. Nos argumentos --project, --stagingLocation,
e --output
, o comando abaixo faz referência às variáveis de ambiente que você configurou anteriormente nesta etapa.
mvn compile exec:java \
-Dexec.mainClass=com.example.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=BlockingDataflowPipelineRunner"
Enquanto o job estiver em execução, vamos encontrá-lo na lista de jobs.
Abra a interface de monitoramento do Cloud Dataflow no Console do Google Cloud Platform. Você vai ver seu job de contagem de palavras com o status Em execução:
Agora vamos conhecer os parâmetros do pipeline. Para começar, clique no nome do seu job:
Ao selecionar um job, é possível conferir o gráfico de execução. O gráfico de execução de um pipeline representa cada transformação no pipeline como uma caixa que contém o nome da transformação e algumas informações de status. Para ver mais detalhes, clique no sinal de seta no canto superior direito de cada etapa:
Como o pipeline transforma os dados em cada etapa:
- Read: nesta etapa, o pipeline lê o texto a partir de uma fonte de entrada. Nesse caso, é um arquivo de texto do Cloud Storage com todo o texto da peça Rei Lear, de Shakespeare. Nosso pipeline lê o arquivo linha por linha e gera uma
PCollection
, em que cada linha no arquivo de texto é um elemento da coleção. - CountWords: a etapa
CountWords
tem duas partes. Primeiro, ele usa uma função paralela (ParDo) chamadaExtractWords
para dividir cada linha em palavras individuais. A saída de "ExtractWords" é uma nova PCollection em que cada elemento é uma palavra. A próxima etapa,Count
, usa uma transformação fornecida pelo SDK do Dataflow que retorna pares de chave-valor em que a chave é uma palavra única e o valor é o número de vezes que ela aparece. Confira o método que implementaCountWords
e o arquivo WordCount.java completo no GitHub:
/**
* A PTransform that converts a PCollection containing lines of text
* into a PCollection of formatted word counts.
*/
public static class CountWords extends PTransform<PCollection<String>,
PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> apply(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.<String>perElement());
return wordCounts;
}
}
- FormatAsText: esta função formata cada par de chave-valor como uma string apresentável. Confira a transformação
FormatAsText
para implementar isso:
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts: nesta etapa, gravamos as strings imprimíveis em vários arquivos de texto fragmentados.
A resposta resultante do pipeline será exibida em alguns minutos.
Agora veja a página Resumo à direita do gráfico, que inclui os parâmetros do pipeline que adicionamos ao comando mvn compile exec:java
.
Você também pode ver Contadores personalizados para o pipeline. Neste caso, eles mostram quantas linhas vazias foram encontradas até o momento durante a execução. É possível adicionar novos contadores ao pipeline para rastrear métricas específicas do aplicativo.
Clique no ícone Registros para ver as mensagens de erro específicas.
Filtre as mensagens que aparecem na guia "Registros do job" com o menu suspenso "Gravidade mínima".
Use o botão Registros de worker na guia "Registros" para ver os registros de worker das instâncias do Compute Engine que executam seu pipeline. Esses registros consistem em linhas de registro geradas pelo seu código e pelo código gerado pelo Dataflow que os executa.
Se você estiver tentando depurar uma falha no pipeline, muitas vezes haverá outros dados nos registros do worker para ajudar a resolver o problema. Esses registros são agregados em todos os workers e podem ser filtrados e pesquisados.
Na próxima etapa, vamos verificar se o job foi concluído.
Abra a interface de monitoramento do Cloud Dataflow no Console do Google Cloud Platform.
Você verá o job de contagem de palavras com o status Em execução que mudará para Concluído:
A execução do job levará entre três e quatro minutos.
Lembra quando você executou o canal e especificou um bucket de saída? Vamos dar uma olhada no resultado. Afinal, você não quer saber quantas ocorrências de cada palavra há no Rei Lear? Volte ao navegador do Cloud Storage no console do Google Cloud Platform. No seu bucket, os arquivos de saída e os arquivos de preparação que foram criados pelo job serão exibidos:
É possível encerrar os recursos no Console do Google Cloud Platform.
Abra o navegador do Cloud Storage no Console do Google Cloud Platform.
Marque a caixa de seleção ao lado do bucket criado.
Clique em EXCLUIR para remover permanentemente o bucket e o conteúdo dele.
Você aprendeu a criar um projeto Maven com o SDK do Cloud Dataflow e a executar um pipeline de exemplo usando o Console do Google Cloud Platform, além de excluir o bucket do Cloud Storage associado e o conteúdo dele.
Saiba mais
- Documentação do Dataflow: https://cloud.google.com/dataflow/docs/
Licença
Este trabalho está licenciado sob uma Licença Creative Commons Atribuição 3.0 Genérica e uma licença Apache 2.0.