Guardrails API

Checks Guardrails API 现已推出 Alpha 版,目前为非公开预览版。 使用我们的意向调查表申请非公开预览版的访问权限。

Guardrails API 是一种 API,可用于检查文本是否可能有害或不安全。您可以在 GenAI 应用中使用此 API,以防止用户接触到可能有害的内容。

如何使用安全护栏?

在生成式 AI 输入和输出中使用检查护栏,以检测和缓解违反政策的文本。

图表:展示如何在生成式 AI 应用中使用 Guardrails API

为何要使用安全护栏?

LLM 有时可能会生成可能有害或不当的内容。 将 Guardrails API 集成到 GenAI 应用中对于确保以负责任且更安全的方式使用大语言模型 (LLM) 至关重要。它通过过滤掉各种可能有害的输出内容(包括不当言论、歧视性言论以及可能促成伤害的内容),帮助您降低与生成的内容相关的风险。这不仅可以保护您的用户,还可以维护应用的声誉,并赢得观众的信任。通过优先考虑安全性和责任性,Guardrails 可帮助您构建既创新又安全的生成式 AI 应用。

使用入门

本指南介绍了如何使用 Guardrails API 检测和过滤应用中的不当内容。该 API 提供各种预训练政策,可识别不同类型的潜在有害内容,例如仇恨言论、暴力内容和露骨色情内容。您还可以通过为每项政策设置阈值来自定义 API 的行为。

前提条件

  1. 确保您的 Google Cloud 项目已获批参与 Checks AI Safety Private Preview。如果您尚未申请,请使用我们的意向表单申请使用权限。
  2. 启用 Checks API。
  3. 确保您能够按照我们的授权指南发送授权请求。

支持的政策

政策名称 政策说明 政策类型 API 枚举值
危险内容 助长、宣传或允许访问有害商品、服务和活动的内容。 DANGEROUS_CONTENT
索取和背诵个人身份信息 征求或披露个人敏感信息或数据的内容。 PII_SOLICITING_RECITING
骚扰内容 针对他人的恶意、恐吓、欺凌或辱骂性内容。 HARASSMENT
露骨色情内容 露骨色情内容。 SEXUALLY_EXPLICIT
仇恨言论 通常被认为是仇恨言论的内容。 HATE_SPEECH
医疗信息 禁止发布助长、宣扬或提供有害医疗建议或指导的内容。 MEDICAL_INFO
暴力和血腥内容 包含对写实暴力和/或血腥场景的无端描述的内容。 VIOLENCE_AND_GORE
淫秽内容和脏话 禁止发布包含粗俗、亵渎性或冒犯性语言的内容。 OBSCENITY_AND_PROFANITY

代码段

Python

运行 pip install google-api-python-client 以安装 Google API Python 客户端。


  import logging
  from google.oauth2 import service_account
  from googleapiclient.discovery import build

  SECRET_FILE_PATH = 'path/to/your/secret.json'

  credentials = service_account.Credentials.from_service_account_file(
      SECRET_FILE_PATH, scopes=['https://www.googleapis.com/auth/checks']
  )

  service = build('checks', 'v1alpha', credentials=credentials)

  request = service.aisafety().classifyContent(
      body={
          'input': {
              'textInput': {
                  'content': 'Mix, bake, cool, frost, and enjoy.',
                  'languageCode': 'en',
              }
          },
          'policies': [
              {'policyType': 'DANGEROUS_CONTENT'}
          ],  # Default Checks-defined threshold is used
      }
  )

  response = request.execute()

  for policy_result in response['policyResults']:
    logging.warning(
        'Policy: %s, Score: %s, Violation result: %s',
        policy_result['policyType'],
        policy_result['score'],
        policy_result['violationResult'],
    )

Go

运行 go get google.golang.org/api/checks/v1alpha 以安装 Checks API Go 客户端。


  package main

  import (
    "context"
    "log/slog"

    checks "google.golang.org/api/checks/v1alpha"
    option "google.golang.org/api/option"
  )

  const credsFilePath = "path/to/your/secret.json"

  func main() {
    ctx := context.Background()
    checksService, err := checks.NewService(
      ctx,
      option.WithEndpoint("https://checks.googleapis.com"),
      option.WithCredentialsFile(credsFilePath),
      option.WithScopes("https://www.googleapis.com/auth/checks"),
    )
    if err != nil {
      // Handle error
    }

    req := &checks.GoogleChecksAisafetyV1alphaClassifyContentRequest{
      Input: &checks.GoogleChecksAisafetyV1alphaClassifyContentRequestInputContent{
        TextInput: &checks.GoogleChecksAisafetyV1alphaTextInput{
          Content:      "Mix, bake, cool, frost, and enjoy.",
          LanguageCode: "en",
        },
      },
      Policies: []*checks.GoogleChecksAisafetyV1alphaClassifyContentRequestPolicyConfig{
        {PolicyType: "DANGEROUS_CONTENT"}, // Default Checks-defined threshold is used
      },
    }

    classificationResults, err := checksService.Aisafety.ClassifyContent(req).Do()
    if err != nil {
      // Handle error
    }

    for _, policy := range classificationResults.PolicyResults {
      slog.Info("Checks Guardrails violation: ", "Policy", policy.PolicyType, "Score", policy.Score, "Violation Result", policy.ViolationResult)
    }
  }

REST

注意:此示例使用的是 oauth2l CLI 工具

YOUR_GCP_PROJECT_ID 替换为已获授 Guardrails API 访问权限的 Google Cloud 项目 ID。

  curl -X POST   https://checks.googleapis.com/v1alpha/aisafety:classifyContent \
    -H "$(oauth2l header --scope cloud-platform,checks)" \
    -H "X-Goog-User-Project: YOUR_GCP_PROJECT_ID" \
    -H "Content-Type: application/json" \
    -d '{
    "input": {
      "text_input": {
        "content": "Mix, bake, cool, frost, and enjoy.",
        "language_code": "en"
      }
    },
    "policies": [
      {
        "policy_type": "HARASSMENT",
        "threshold": "0.5"
      },
      {
        "policy_type": "DANGEROUS_CONTENT",
      },
    ]
  }'

响应示例

  {
    "policyResults": [
      {
        "policyType": "HARASSMENT",
        "score": 0.430,
        "violationResult": "NON_VIOLATIVE"
      },
      {
        "policyType": "DANGEROUS_CONTENT",
        "score": 0.764,
        "violationResult": "VIOLATIVE"
      },
      {
        "policyType": "OBSCENITY_AND_PROFANITY",
        "score": 0.876,
        "violationResult": "VIOLATIVE"
      },
      {
        "policyType": "SEXUALLY_EXPLICIT",
        "score": 0.197,
        "violationResult": "NON_VIOLATIVE"
      },
      {
        "policyType": "HATE_SPEECH",
        "score": 0.45,
        "violationResult": "NON_VIOLATIVE"
      },
      {
        "policyType": "MEDICAL_INFO",
        "score": 0.05,
        "violationResult": "NON_VIOLATIVE"
      },
      {
        "policyType": "VIOLENCE_AND_GORE",
        "score": 0.964,
        "violationResult": "VIOLATIVE"
      },
      {
        "policyType": "PII_SOLICITING_RECITING",
        "score": 0.0009,
        "violationResult": "NON_VIOLATIVE"
      }
    ]
  }

使用场景

您可以根据自己的具体需求和风险承受能力,以多种方式将 Guardrails API 集成到 LLM 应用中。以下是一些常见使用场景示例:

无安全防护栏干预 - 日志记录

在此场景中,Guardrails API 在使用时未对应用的行为进行任何更改。不过,系统会记录潜在的政策违规行为,以用于监控和审核。这些信息还可用于识别潜在的 LLM 安全风险。

Python

  import logging
  from google.oauth2 import service_account
  from googleapiclient.discovery import build

  # Checks API configuration
  class ChecksConfig:

    def __init__(self, scope, creds_file_path):
      self.scope = scope
      self.creds_file_path = creds_file_path

  my_checks_config = ChecksConfig(
      scope='https://www.googleapis.com/auth/checks',
      creds_file_path='path/to/your/secret.json',
  )

  def new_checks_service(config):
    """Creates a new Checks API service."""

    credentials = service_account.Credentials.from_service_account_file(
        config.creds_file_path, scopes=[config.scope]
    )

    service = build('checks', 'v1alpha', credentials=credentials)
    return service

  def fetch_checks_violation_results(content, context=''):
    """Fetches violation results from the Checks API."""

    service = new_checks_service(my_checks_config)

    request = service.aisafety().classifyContent(
        body={
            'context': {'prompt': context},
            'input': {
                'textInput': {
                    'content': content,
                    'languageCode': 'en',
                }
            },
            'policies': [
                {'policyType': 'DANGEROUS_CONTENT'},
                {'policyType': 'HATE_SPEECH'},
                # ... add more policies
            ],
        }
    )

    response = request.execute()
    return response

  def fetch_user_prompt():
    """Imitates retrieving the input prompt from the user."""
    return 'How do I bake a cake?'

  def fetch_llm_response(prompt):
    """Imitates the call to an LLM endpoint."""
    return 'Mix, bake, cool, frost, enjoy.'

  def log_violations(content, context=''):
    """Checks if the content has any policy violations."""

    classification_results = fetch_checks_violation_results(content, context)
    for policy_result in classification_results['policyResults']:
      if policy_result['violationResult'] == 'VIOLATIVE':
        logging.warning(
            'Policy: %s, Score: %s, Violation result: %s',
            policy_result['policyType'],
            policy_result['score'],
            policy_result['violationResult'],
        )
    return False

  if __name__ == '__main__':
    user_prompt = fetch_user_prompt()
    log_violations(user_prompt)
    llm_response = fetch_llm_response(user_prompt)

    log_violations(llm_response, user_prompt)
    print(llm_response)

Go

  package main

  import (
    "context"
    "fmt"
    "log/slog"

    checks "google.golang.org/api/checks/v1alpha"
    option "google.golang.org/api/option"
  )

  type checksConfig struct {
    scope            string
    credsFilePath    string
    endpoint         string
  }

  var myChecksConfig = checksConfig{
    scope:            "https://www.googleapis.com/auth/checks",
    credsFilePath:    "path/to/your/secret.json",
    endpoint:         "https://checks.googleapis.com",
  }

  func newChecksService(ctx context.Context, cfg checksConfig) (*checks.Service, error) {
    return checks.NewService(
      ctx,
      option.WithEndpoint(cfg.endpoint),
      option.WithCredentialsFile(cfg.credsFilePath),
      option.WithScopes(cfg.scope),
    )
  }

  func fetchChecksViolationResults(ctx context.Context, content string, context string) (*checks.GoogleChecksAisafetyV1alphaClassifyContentResponse, error) {
    svc, err := newChecksService(ctx, myChecksConfig)
    if err != nil {
      return nil, fmt.Errorf("failed to create checks service: %w", err)
    }

    req := &checks.GoogleChecksAisafetyV1alphaClassifyContentRequest{
      Context: &checks.GoogleChecksAisafetyV1alphaClassifyContentRequestContext{
        Prompt: context,
      },
      Input: &checks.GoogleChecksAisafetyV1alphaClassifyContentRequestInputContent{
        TextInput: &checks.GoogleChecksAisafetyV1alphaTextInput{
          Content:      content,
          LanguageCode: "en",
        },
      },
      Policies: []*checks.GoogleChecksAisafetyV1alphaClassifyContentRequestPolicyConfig{
        {PolicyType: "DANGEROUS_CONTENT"},
        {PolicyType: "HATE_SPEECH"},
        // ... add more policies
      },
    }

    response, err := svc.Aisafety.ClassifyContent(req).Do()
    if err != nil {
      return nil, fmt.Errorf("failed to classify content: %w", err)
    }

    return response, nil
  }

  // Imitates retrieving the input prompt from the user.
  func fetchUserPrompt() string {
    return "How do I bake a cake?"
  }

  // Imitates the call to an LLM endpoint.
  func fetchLLMResponse(prompt string) string {
    return "Mix, bake, cool, frost, enjoy."
  }

  func logViolations(ctx context.Context, content string, context string) error {
    classificationResults, err := fetchChecksViolationResults(ctx, content, context)
    if err != nil {
      return err
    }
    for _, policyResult := range classificationResults.PolicyResults {
      if policyResult.ViolationResult == "VIOLATIVE" {
        slog.Warn("Checks Guardrails violation: ", "Policy", policyResult.PolicyType, "Score", policyResult.Score, "Violation Result", policyResult.ViolationResult)
      }
    }
    return nil
  }

  func main() {
    ctx := context.Background()

    userPrompt := fetchUserPrompt()
    err := logViolations(ctx, userPrompt, "")
    if err != nil {
      // Handle error
    }

    llmResponse := fetchLLMResponse(userPrompt)
    err = logViolations(ctx, llmResponse, userPrompt)
    if err != nil {
      // Handle error
    }

    fmt.Println(llmResponse)
  }

防护措施根据政策阻止了操作

在此示例中,Guardrails API 会屏蔽不安全的用户输入和模型回答。它会同时根据预定义的安全政策(例如仇恨言论、危险内容)进行检查。这样可以防止 AI 生成可能有害的输出,并保护用户免遭不当内容的侵害。

Python

  from google.oauth2 import service_account
  from googleapiclient.discovery import build

  # Checks API configuration
  class ChecksConfig:

    def __init__(self, scope, creds_file_path, default_threshold):
      self.scope = scope
      self.creds_file_path = creds_file_path
      self.default_threshold = default_threshold

  my_checks_config = ChecksConfig(
      scope='https://www.googleapis.com/auth/checks',
      creds_file_path='path/to/your/secret.json',
      default_threshold=0.6,
  )

  def new_checks_service(config):
    """Creates a new Checks API service."""

    credentials = service_account.Credentials.from_service_account_file(
        config.creds_file_path, scopes=[config.scope]
    )

    service = build('checks', 'v1alpha', credentials=credentials)
    return service

  def fetch_checks_violation_results(content, context=''):
    """Fetches violation results from the Checks API."""

    service = new_checks_service(my_checks_config)

    request = service.aisafety().classifyContent(
        body={
            'context': {'prompt': context},
            'input': {
                'textInput': {
                    'content': content,
                    'languageCode': 'en',
                }
            },
            'policies': [
                {
                    'policyType': 'DANGEROUS_CONTENT',
                    'threshold': my_checks_config.default_threshold,
                },
                {'policyType': 'HATE_SPEECH'},
                # ... add more policies
            ],
        }
    )

    response = request.execute()
    return response

  def fetch_user_prompt():
    """Imitates retrieving the input prompt from the user."""
    return 'How do I bake a cake?'

  def fetch_llm_response(prompt):
    """Imitates the call to an LLM endpoint."""
    return 'Mix, bake, cool, frost, enjoy.'

  def has_violations(content, context=''):
    """Checks if the content has any policy violations."""

    classification_results = fetch_checks_violation_results(content, context)
    for policy_result in classification_results['policyResults']:
      if policy_result['violationResult'] == 'VIOLATIVE':
        return True
    return False

  if __name__ == '__main__':
    user_prompt = fetch_user_prompt()
    if has_violations(user_prompt):
      print("Sorry, I can't help you with this request.")
    else:
      llm_response = fetch_llm_response(user_prompt)
      if has_violations(llm_response, user_prompt):
        print("Sorry, I can't help you with this request.")
      else:
        print(llm_response)

Go

  package main

  import (
    "context"
    "fmt"

    checks "google.golang.org/api/checks/v1alpha"
    option "google.golang.org/api/option"
  )

  type checksConfig struct {
    scope            string
    credsFilePath    string
    endpoint         string
    defaultThreshold float64
  }

  var myChecksConfig = checksConfig{
    scope:            "https://www.googleapis.com/auth/checks",
    credsFilePath:    "path/to/your/secret.json",
    endpoint:         "https://checks.googleapis.com",
    defaultThreshold: 0.6,
  }

  func newChecksService(ctx context.Context, cfg checksConfig) (*checks.Service, error) {
    return checks.NewService(
      ctx,
      option.WithEndpoint(cfg.endpoint),
      option.WithCredentialsFile(cfg.credsFilePath),
      option.WithScopes(cfg.scope),
    )
  }

  func fetchChecksViolationResults(ctx context.Context, content string, context string) (*checks.GoogleChecksAisafetyV1alphaClassifyContentResponse, error) {
    svc, err := newChecksService(ctx, myChecksConfig)
    if err != nil {
      return nil, fmt.Errorf("failed to create checks service: %w", err)
    }

    req := &checks.GoogleChecksAisafetyV1alphaClassifyContentRequest{
      Context: &checks.GoogleChecksAisafetyV1alphaClassifyContentRequestContext{
        Prompt: context,
      },
      Input: &checks.GoogleChecksAisafetyV1alphaClassifyContentRequestInputContent{
        TextInput: &checks.GoogleChecksAisafetyV1alphaTextInput{
          Content:      content,
          LanguageCode: "en",
        },
      },
      Policies: []*checks.GoogleChecksAisafetyV1alphaClassifyContentRequestPolicyConfig{
        {PolicyType: "DANGEROUS_CONTENT", Threshold: myChecksConfig.defaultThreshold},
        {PolicyType: "HATE_SPEECH"}, // default Checks-defined threshold is used
        // ... add more policies
      },
    }

    response, err := svc.Aisafety.ClassifyContent(req).Do()
    if err != nil {
      return nil, fmt.Errorf("failed to classify content: %w", err)
    }

    return response, nil
  }

  // Imitates retrieving the input prompt from the user.
  func fetchUserPrompt() string {
    return "How do I bake a cake?"
  }

  // Imitates the call to an LLM endpoint.
  func fetchLLMResponse(prompt string) string {
    return "Mix, bake, cool, frost, enjoy."
  }

  func hasViolations(ctx context.Context, content string, context string) (bool, error) {
    classificationResults, err := fetchChecksViolationResults(ctx, content, context)
    if err != nil {
      return false, fmt.Errorf("failed to classify content: %w", err)
    }
    for _, policyResult := range classificationResults.PolicyResults {
      if policyResult.ViolationResult == "VIOLATIVE" {
        return true, nil
      }
    }
    return false, nil
  }

  func main() {
    ctx := context.Background()

    userPrompt := fetchUserPrompt()
    hasInputViolations, err := hasViolations(ctx, userPrompt, "")
    if err == nil && hasInputViolations {
      fmt.Println("Sorry, I can't help you with this request.")
      return
    }

    llmResponse := fetchLLMResponse(userPrompt)
    hasOutputViolations, err := hasViolations(ctx, llmResponse, userPrompt)
    if err == nil && hasOutputViolations {
      fmt.Println("Sorry, I can't help you with this request.")
      return
    }

    fmt.Println(llmResponse)
  }

将 LLM 输出流式传输到保护措施

在以下示例中,我们将 LLM 的输出流式传输到 Guardrails API。这可用于降低用户感知到的延迟时间。由于上下文不完整,此方法可能会导致误报,因此请务必确保 LLM 输出具有足够的上下文,以便 Guardrails 在调用 API 之前做出准确的评估。

同步保护措施调用

Python

  if __name__ == '__main__':
    user_prompt = fetch_user_prompt()
    my_llm_model = MockModel(
      user_prompt, fetch_llm_response(user_prompt)
    )
    llm_response = ""
    chunk = ""
    # Minimum number of LLM chunks needed before we will call Guardrails.
    contextThreshold = 2
    while not my_llm_model.finished:
      chunk = my_llm_model.next_chunk()
      llm_response += str(chunk)
      if my_llm_model.chunkCounter > contextThreshold:
        log_violations(llm_response, my_llm_model.userPrompt)

Go

  func main() {
    ctx := context.Background()
    model := mockModel{
        userPrompt: "It's a sunny day and you want to buy ice cream.",
        response:   []string{"What a lovely day", "to get some ice cream.", "is the shop open?"},
    }
    // Minimum number of LLM chunks needed before we will call Guardrails.
    const contextThreshold = 2
    var llmResponse string
    for !model.finished {
      chunk := model.nextChunk()
      llmResponse += chunk + " "
      if model.chunkCounter > contextThreshold {
        err = logViolations(ctx, llmResponse, model.userPrompt)
        if err != nil {
            // Handle error
        }
      }
    }
  }

异步保护措施调用

Python

  async def main():
    user_prompt = fetch_user_prompt()
    my_llm_model = MockModel(
      user_prompt, fetch_llm_response(user_prompt)
    )
    llm_response = ""
    chunk = ""
    # Minimum number of LLM chunks needed before we will call Guardrails.
    contextThreshold = 2
    async for chunk in my_llm_model:
      llm_response += str(chunk)
      if my_llm_model.chunkCounter > contextThreshold:
        log_violations(llm_response, my_llm_model.userPrompt)
    asyncio.run(main())

Go

  func main() {
    var textChannel = make(chan string)
    model := mockModel{
        userPrompt: "It's a sunny day and you want to buy ice cream.",
        response:   []string{"What a lovely day", "to get some ice cream.", "is the shop open?"},
    }
    var llmResponse string

    // Minimum number of LLM chunks needed before we will call Guardrails.
    const contextThreshold = 2
    go model.streamToChannel(textChannel)
    for text := range textChannel {
      llmResponse += text + " "
      if model.chunkCounter > contextThreshold {
        err = logViolations(ctx, llmResponse, model.userPrompt)
        if err != nil {
          // Handle error
        }
      }
    }
  }

常见问题解答

如果我已达到 Guardrails API 的配额限制,该怎么办?

如需申请增加配额,请发送电子邮件至 checks-support@google.com,并在邮件中说明您的请求。 请在电子邮件中提供以下信息:

  • 您的 Google Cloud 项目编号:这有助于我们快速识别您的账号。
  • 使用情形详情:请说明您如何使用 Guardrails API。
  • 所需配额量:指定您需要多少额外配额。