部署连接器

本 Cloud Search 教程页面介绍了如何设置数据源和内容连接器以用于索引数据。如需从本教程的开头开始学习,请参阅 Cloud Search 入门教程

构建连接器

将工作目录更改为 cloud-search-samples/end-to-end/connector 目录,然后运行以下命令:

mvn package -DskipTests

该命令会下载构建内容连接器所需的必要依赖项,并编译代码。

创建服务账号凭据

连接器需要服务账号凭据才能调用 Cloud Search API。如需创建凭据,请执行以下操作:

  1. 返回 Google Cloud 控制台
  2. 在左侧导航栏中,点击凭据。系统会显示“凭据”页面。
  3. 点击 + 创建凭据下拉列表,然后选择服务账号。系统会显示“创建服务账号”页面。
  4. 服务账号名称字段中,输入“tutorial”。
  5. 记下服务账号 ID 值(紧随服务账号名称之后)。 此值稍后会用到。
  6. 点击创建。系统会显示“服务账号权限(可选)”对话框。
  7. 点击继续。系统会显示“向用户授予访问此服务账号的权限(可选)”对话框。
  8. 点击完成。系统会显示“凭据”界面。
  9. 在“服务账号”下,点击服务账号电子邮件地址。系统随即会显示“服务账号详情”页面。
  10. 在“密钥”下方,点击添加密钥下拉列表,然后选择创建新密钥。系统随即会显示“创建私钥”对话框。
  11. 点击创建
  12. (可选)如果系统显示“Do you want to allow downloads on console.cloud.google.com?”(您要允许 console.cloud.google.com 上的下载吗?)对话框,请点击允许
  13. 私钥文件已保存到您的计算机。记下下载文件的位置。此文件用于配置内容连接器,以便在调用 Google Cloud Search API 时进行身份验证。

初始化第三方支持

在调用任何其他 Cloud Search API 之前,您必须先初始化对 Google Cloud Search 的第三方支持。

如需初始化 Cloud Search 的第三方支持,请执行以下操作:

  1. 您的 Cloud Search 平台项目包含服务账号凭据。不过,为了初始化第三方支持,您必须创建 Web 应用凭据。如需了解如何创建 Web 应用凭据,请参阅创建凭据。 完成此步骤后,您应该会获得一个客户端 ID 和客户端密钥文件。

  2. 使用 Google 的 OAuth 2 Playground 获取访问令牌:

    1. 点击“设置”,然后选中使用您自己的身份验证凭据
    2. 输入第 1 步中的客户端 ID 和客户端密钥。
    3. 点击关闭
    4. 在“范围”字段中,输入 https://www.googleapis.com/auth/cloud_search.settings,然后点击授权。OAuth 2 Playground 会返回授权代码。
    5. 点击以授权代码交换令牌。系统会返回一个令牌。
  3. 如需初始化 Cloud Search 的第三方支持,请使用以下 curl 命令。请务必将 [YOUR_ACCESS_TOKEN] 替换为在第 2 步中获得的令牌。

    curl --request POST \
    'https://cloudsearch.googleapis.com/v1:initializeCustomer' \
      --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '{}' \
      --compressed
    

    如果成功,响应正文会包含一个 operation 实例。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    }
    

    如果无法成功完成,请与 Cloud Search 支持团队联系。

  4. 使用 operations.get 验证第三方支持是否已初始化:

    curl \
    'https://cloudsearch.googleapis.com/v1/operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY?key=
    [YOUR_API_KEY]' \
    --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
    --header 'Accept: application/json' \
    --compressed
    

    当第三方初始化完成时,它会包含设置为 true 的字段 done。例如:

    {
    name: "operations/customers/01b3fqdm/lro/AOIL6eBv7fEfiZ_hUSpm8KQDt1Mnd6dj5Ru3MXf-jri4xK6Pyb2-Lwfn8vQKg74pgxlxjrY"
    done: true
    }
    

创建数据源

接下来,在管理控制台中创建数据源。数据源提供了一个命名空间,用于使用连接器为内容编制索引。

  1. 打开 Google 管理控制台
  2. 点击“应用”图标。系统会显示“应用管理”页面。
  3. 点击 Google Workspace。系统随即会显示“Google Workspace 应用管理”页面。
  4. 向下滚动,然后点击 Cloud Search。系统随即会显示“Google Workspace 设置”页面。
  5. 点击第三方数据源。系统随即会显示“数据源”页面。
  6. 点击黄色圆圈中的 +。系统随即会显示“添加新的数据源”对话框。
  7. 显示名称字段中,输入“tutorial”。
  8. 服务账号电子邮件地址字段中,输入您在上一部分中创建的服务账号的电子邮件地址。如果您不知道服务账号的电子邮件地址,请在服务账号页面中查找相应值。
  9. 点击添加。系统会显示“已成功创建数据源”对话框。
  10. 点击 *OK。记下新创建的数据源的来源 ID。来源 ID 用于配置内容连接器。

为 GitHub API 生成个人访问令牌

连接器需要经过身份验证才能访问 GitHub API,以便获得足够的配额。为简单起见,连接器利用的是个人访问令牌,而不是 OAuth。个人令牌允许以具有有限权限的用户身份进行身份验证,类似于 OAuth。

  1. 登录 GitHub。
  2. 点击右上角的个人资料照片。系统随即会显示一个下拉菜单。
  3. 点击设置
  4. 点击开发者设置
  5. 点击个人访问令牌
  6. 点击生成个人访问令牌
  7. 备注字段中,输入“Cloud Search 教程”。
  8. 检查 public_repo 范围。
  9. 点击 生成令牌
  10. 记下生成的令牌。连接器使用此令牌来调用 GitHub API,并提供 API 配额以执行索引编制。

配置连接器

创建凭据和数据源后,请更新连接器配置以添加以下值:

  1. 从命令行中,将目录更改为 cloud-search-samples/end-to-end/connector/
  2. 通过文本编辑器打开 sample-config.properties 文件。
  3. api.serviceAccountPrivateKeyFile 参数设置为您之前下载的服务凭据的文件路径。
  4. api.sourceId 参数设置为您之前创建的数据源的 ID。
  5. github.user 参数设置为您的 GitHub 用户名。
  6. github.token 参数设置为您之前创建的访问令牌。
  7. 保存文件。

更新架构

连接器可将结构化和非结构化内容编入索引。在为数据编制索引之前,您必须更新数据源的架构。运行以下命令以更新架构:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.SchemaTool \
    -Dexec.args="-Dconfig=sample-config.properties"

运行该连接器

如需运行连接器并开始编入索引,请运行以下命令:

mvn exec:java -Dexec.mainClass=com.google.cloudsearch.tutorial.GithubConnector \
    -Dexec.args="-Dconfig=sample-config.properties"

连接器的默认配置是将 googleworkspace 组织中的单个代码库编入索引。为代码库编制索引大约需要 1 分钟。 初始索引编制完成后,连接器会继续轮询代码库的更改,以便在 Cloud Search 索引中反映这些更改。

查看代码

其余部分将介绍如何构建连接器。

正在启动应用

连接器的入口点是 GithubConnector 类。main 方法会实例化 SDK 的 IndexingApplication 并启动它。

GithubConnector.java
/**
 * Main entry point for the connector. Creates and starts an indexing
 * application using the {@code ListingConnector} template and the sample's
 * custom {@code Repository} implementation.
 *
 * @param args program command line arguments
 * @throws InterruptedException thrown if an abort is issued during initialization
 */
public static void main(String[] args) throws InterruptedException {
  Repository repository = new GithubRepository();
  IndexingConnector connector = new ListingConnector(repository);
  IndexingApplication application = new IndexingApplication.Builder(connector, args)
      .build();
  application.start();
}

SDK 提供的 ListingConnector 实现了一种遍历策略,该策略利用 Cloud Search 队列来跟踪索引中商品的状态。它会委托给由示例连接器实现的 GithubRepository,以访问 GitHub 中的内容。

遍历 GitHub 代码库

在完整遍历期间,系统会调用 getIds() 方法,将可能需要编入索引的项推送到队列中。

连接器可以为多个代码库或组织编制索引。为了尽量减少故障的影响,系统一次遍历一个 GitHub 代码库。系统会随遍历结果返回一个检查点,其中包含要在后续对 getIds() 的调用中编制索引的仓库列表。如果发生错误,系统会从当前代码库恢复索引编制,而不是从头开始。

GithubRepository.java
/**
 * Gets all of the existing item IDs from the data repository. While
 * multiple repositories are supported, only one repository is traversed
 * per call. The remaining repositories are saved in the checkpoint
 * are traversed on subsequent calls. This minimizes the amount of
 * data that needs to be reindex in the event of an error.
 *
 * <p>This method is called by {@link ListingConnector#traverse()} during
 * <em>full traversals</em>. Every document ID and metadata hash value in
 * the <em>repository</em> is pushed to the Cloud Search queue. Each pushed
 * document is later polled and processed in the {@link #getDoc(Item)} method.
 * <p>
 * The metadata hash values are pushed to aid document change detection. The
 * queue sets the document status depending on the hash comparison. If the
 * pushed ID doesn't yet exist in Cloud Search, the document's status is
 * set to <em>new</em>. If the ID exists but has a mismatched hash value,
 * its status is set to <em>modified</em>. If the ID exists and matches
 * the hash value, its status is unchanged.
 *
 * <p>In every case, the pushed content hash value is only used for
 * comparison. The hash value is only set in the queue during an
 * update (see {@link #getDoc(Item)}).
 *
 * @param checkpoint value defined and maintained by this connector
 * @return this is typically a {@link PushItems} instance
 */
@Override
public CheckpointCloseableIterable<ApiOperation> getIds(byte[] checkpoint)
    throws RepositoryException {
  List<String> repositories;
  // Decode the checkpoint if present to get the list of remaining
  // repositories to index.
  if (checkpoint != null) {
    try {
      FullTraversalCheckpoint decodedCheckpoint = FullTraversalCheckpoint
          .fromBytes(checkpoint);
      repositories = decodedCheckpoint.getRemainingRepositories();
    } catch (IOException e) {
      throw new RepositoryException.Builder()
          .setErrorMessage("Unable to deserialize checkpoint")
          .setCause(e)
          .build();
    }
  } else {
    // No previous checkpoint, scan for repositories to index
    // based on the connector configuration.
    try {
      repositories = scanRepositories();
    } catch (IOException e) {
      throw toRepositoryError(e, Optional.of("Unable to scan repositories"));
    }
  }

  if (repositories.isEmpty()) {
    // Nothing left to index. Reset the checkpoint to null so the
    // next full traversal starts from the beginning
    Collection<ApiOperation> empty = Collections.emptyList();
    return new CheckpointCloseableIterableImpl.Builder<>(empty)
        .setCheckpoint((byte[]) null)
        .setHasMore(false)
        .build();
  }

  // Still have more repositories to index. Pop the next repository to
  // index off the list. The remaining repositories make up the next
  // checkpoint.
  String repositoryToIndex = repositories.get(0);
  repositories = repositories.subList(1, repositories.size());

  try {
    log.info(() -> String.format("Traversing repository %s", repositoryToIndex));
    Collection<ApiOperation> items = collectRepositoryItems(repositoryToIndex);
    FullTraversalCheckpoint newCheckpoint = new FullTraversalCheckpoint(repositories);
    return new CheckpointCloseableIterableImpl.Builder<>(items)
        .setHasMore(true)
        .setCheckpoint(newCheckpoint.toBytes())
        .build();
  } catch (IOException e) {
    String errorMessage = String.format("Unable to traverse repo: %s",
        repositoryToIndex);
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

方法 collectRepositoryItems() 用于处理单个 GitHub 代码库的遍历。此方法会返回一个 ApiOperations 集合,表示要推送到队列中的项。推送的项包括资源名称和表示项当前状态的哈希值。

哈希值用于后续对 GitHub 代码库的遍历。此值提供了一种轻量级检查,用于确定内容是否已更改,而无需上传其他内容。连接器会盲目地将所有商品排入队列。如果商品是新商品或哈希值已更改,则该商品可供在队列中轮询。否则,该项会被视为未修改。

GithubRepository.java
/**
 * Fetch IDs to  push in to the queue for all items in the repository.
 * Currently captures issues & content in the master branch.
 *
 * @param name Name of repository to index
 * @return Items to push into the queue for later indexing
 * @throws IOException if error reading issues
 */
private Collection<ApiOperation> collectRepositoryItems(String name)
    throws IOException {
  List<ApiOperation> operations = new ArrayList<>();
  GHRepository repo = github.getRepository(name);

  // Add the repository as an item to be indexed
  String metadataHash = repo.getUpdatedAt().toString();
  String resourceName = repo.getHtmlUrl().getPath();
  PushItem repositoryPushItem = new PushItem()
      .setMetadataHash(metadataHash);
  PushItems items = new PushItems.Builder()
      .addPushItem(resourceName, repositoryPushItem)
      .build();

  operations.add(items);
  // Add issues/pull requests & files
  operations.add(collectIssues(repo));
  operations.add(collectContent(repo));
  return operations;
}

处理队列

完全遍历完成后,连接器会开始轮询队列中需要编入索引的项目。系统会针对从队列中提取的每个项调用 getDoc() 方法。该方法从 GitHub 读取相应项,并将其转换为适合编入索引的表示形式。

由于连接器是针对可能随时更改的实时数据运行的,因此 getDoc() 还会验证队列中的商品是否仍然有效,并从索引中删除不再存在的任何商品。

GithubRepository.java
/**
 * Gets a single data repository item and indexes it if required.
 *
 * <p>This method is called by the {@link ListingConnector} during a poll
 * of the Cloud Search queue. Each queued item is processed
 * individually depending on its state in the data repository.
 *
 * @param item the data repository item to retrieve
 * @return the item's state determines which type of
 * {@link ApiOperation} is returned:
 * {@link RepositoryDoc}, {@link DeleteItem}, or {@link PushItem}
 */
@Override
public ApiOperation getDoc(Item item) throws RepositoryException {
  log.info(() -> String.format("Processing item: %s ", item.getName()));
  Object githubObject;
  try {
    // Retrieve the item from GitHub
    githubObject = getGithubObject(item.getName());
    if (githubObject instanceof GHRepository) {
      return indexItem((GHRepository) githubObject, item);
    } else if (githubObject instanceof GHPullRequest) {
      return indexItem((GHPullRequest) githubObject, item);
    } else if (githubObject instanceof GHIssue) {
      return indexItem((GHIssue) githubObject, item);
    } else if (githubObject instanceof GHContent) {
      return indexItem((GHContent) githubObject, item);
    } else {
      String errorMessage = String.format("Unexpected item received: %s",
          item.getName());
      throw new RepositoryException.Builder()
          .setErrorMessage(errorMessage)
          .setErrorType(RepositoryException.ErrorType.UNKNOWN)
          .build();
    }
  } catch (FileNotFoundException e) {
    log.info(() -> String.format("Deleting item: %s ", item.getName()));
    return ApiOperations.deleteItem(item.getName());
  } catch (IOException e) {
    String errorMessage = String.format("Unable to retrieve item: %s",
        item.getName());
    throw toRepositoryError(e, Optional.of(errorMessage));
  }
}

对于连接器编入索引的每个 GitHub 对象,相应的 indexItem() 方法会处理为 Cloud Search 构建项目表示形式。例如,如需为内容项构建表示形式,请执行以下操作:

GithubRepository.java
/**
 * Build the ApiOperation to index a content item (file).
 *
 * @param content      Content item to index
 * @param previousItem Previous item state in the index
 * @return ApiOperation (RepositoryDoc if indexing,  PushItem if not modified)
 * @throws IOException if unable to create operation
 */
private ApiOperation indexItem(GHContent content, Item previousItem)
    throws IOException {
  String metadataHash = content.getSha();

  // If previously indexed and unchanged, just requeue as unmodified
  if (canSkipIndexing(previousItem, metadataHash)) {
    return notModified(previousItem.getName());
  }

  String resourceName = new URL(content.getHtmlUrl()).getPath();
  FieldOrValue<String> title = FieldOrValue.withValue(content.getName());
  FieldOrValue<String> url = FieldOrValue.withValue(content.getHtmlUrl());

  String containerName = content.getOwner().getHtmlUrl().getPath();
  String programmingLanguage = FileExtensions.getLanguageForFile(content.getName());

  // Structured data based on the schema
  Multimap<String, Object> structuredData = ArrayListMultimap.create();
  structuredData.put("organization", content.getOwner().getOwnerName());
  structuredData.put("repository", content.getOwner().getName());
  structuredData.put("path", content.getPath());
  structuredData.put("language", programmingLanguage);

  Item item = IndexingItemBuilder.fromConfiguration(resourceName)
      .setTitle(title)
      .setContainerName(containerName)
      .setSourceRepositoryUrl(url)
      .setItemType(IndexingItemBuilder.ItemType.CONTAINER_ITEM)
      .setObjectType("file")
      .setValues(structuredData)
      .setVersion(Longs.toByteArray(System.currentTimeMillis()))
      .setHash(content.getSha())
      .build();

  // Index the file content too
  String mimeType = FileTypeMap.getDefaultFileTypeMap()
      .getContentType(content.getName());
  AbstractInputStreamContent fileContent = new InputStreamContent(
      mimeType, content.read())
      .setLength(content.getSize())
      .setCloseInputStream(true);
  return new RepositoryDoc.Builder()
      .setItem(item)
      .setContent(fileContent, IndexingService.ContentFormat.RAW)
      .setRequestMode(IndexingService.RequestMode.SYNCHRONOUS)
      .build();
}

接下来,部署搜索界面。

上一个 下一个