Google App Engine

Using Push Queues in Python

In App Engine push queues, a task is a unit of work to be performed by the application. Each task is an object of the Task class. Each Task object contains an application-specific URL with a request handler for the task, and an optional data payload that parameterizes the task.

For example, consider a calendaring application that needs to notify an invitee, via email, that an event has been updated. The data payload for this task consists of the email address and name of the invitee, along with a description of the event. The webhook might live at /app_worker/send_email and contain a function that adds the relevant strings to an email template and sends the email. The app can create a separate task for each email it needs to send.

You can use push queues only within the App Engine environment; if you need to access App Engine tasks from outside of App Engine, use pull queues.

  1. Using push queues
  2. Push task execution
  3. Deferred tasks
  4. URL endpoints
  5. Push queues and the development server
  6. Push queues and backends
  7. Quotas and limits for push queues

Using push queues

A Python app sets up queues using a configuration file named queue.yaml (see Python Task Queue Configuration). If an app does not have a queue.yaml file, it has a queue named default with some default settings.

To enqueue a task, you call the taskqueue.add() function. (You can also create a Task object and call its add() method.) The task consists of data for a request, including a URL path, parameters, HTTP headers, and an HTTP payload. It can also include the earliest time to execute the task (the default is as soon as possible) and a name for the task. The task is added to a queue, then performed by the Task Queue service as the queue is processed.

The following example defines a task handler (CounterWorker) that increments a counter in the datastore, mapped to the URL /worker. It also defines a user-accessible request handler that displays the current value of the counter for a GET request, and for a POST request enqueues a task and returns. It's difficult to visualize the execution of tasks with the user-accessible handler if the queue processes them too quickly. Therefore, the task in this example should run at a rate no greater than once per second.

import os

import jinja2
import webapp2

from google.appengine.api import taskqueue
from google.appengine.ext import ndb


JINJA_ENV = jinja2.Environment(
    loader=jinja2.FileSystemLoader(os.path.dirname(__file__)))


class Counter(ndb.Model):
    count = ndb.IntegerProperty(indexed=False)

class CounterHandler(webapp2.RequestHandler):
    def get(self):
        template_values = {'counters': Counter.query()}
        counter_template = JINJA_ENV.get_template('counter.html')
        self.response.out.write(counter_template.render(template_values))

    def post(self):
        key = self.request.get('key')

        # Add the task to the default queue.
        taskqueue.add(url='/worker', params={'key': key})

        self.redirect('/')

class CounterWorker(webapp2.RequestHandler):
    def post(self): # should run at most 1/s due to entity group limit
        key = self.request.get('key')
        @ndb.transactional
        def update_counter():
            counter = Counter.get_or_insert(key, count=0)
            counter.count += 1
            counter.put()
        update_counter()


APP = webapp2.WSGIApplication(
    [
        ('/', CounterHandler),
        ('/worker', CounterWorker)
    ], debug=True)

(In this example, 'counter.html' refers to a Jinja2 template that contains the HTML for a page that displays the counter value, and a button to trigger a POST request to the / URL.) The full application is available at our GitHub repository.

Note that this example is not idempotent. It is possible for the task queue to execute a task more than once. In this case, the counter is incremented each time the task is run, possibly skewing the results.

Push task execution

App Engine executes push tasks by sending HTTP requests to your app. Specifying a programmatic asynchronous callback as an HTTP request is sometimes called a web hook. The web hook model enables efficient parallel processing.

The task's URL determines the handler for the task and the module that runs the handler.

The handler is determined by the path part of the URL (the forward-slash separated string following the hostname), which is specified by using the url parameter of the Task class. The url must be relative and local to your application's root directory.

The module (or frontend or backend) and version in which the handler runs is determined by:

  • The target or headers keyword arguments in your call to the Task() constructor.
  • The target directive in the queue.yaml file.

If you do not specify any of these parameters, the task will run in the same module/version in which it was enqueued, subject to these rules:

  • If the default version of the app enqueues a task, the task will run on the default version. Note that if the app enqueues a task and the default version is changed before the task actually runs, the task will be executed in the new default version.
  • If a non-default version enqueues a task, the task will always run on that same version.

The namespace in which a push task runs is determined when the task is added to the queue. By default, a task will run in the current namespace of the process that created the task. You can override this behavior by explicitly setting the namespace before adding a task to a queue, as described on the multitenancy page.

A task must finish executing and send an HTTP response with a status code in the range 200–299 within 10 minutes of the original request. This deadline is separate from the deadline for user requests, which is 60 seconds. If your task's execution nears the limit, App Engine raises a DeadlineExceededError (from the module google.appengine.runtime) that you can catch to save your work or log progress before the deadline passes. If the task fails to execute, App Engine retries it based on criteria that you can configure.

Task request headers

Requests from the Task Queue service contain the following HTTP headers:

  • X-AppEngine-QueueName, the name of the queue (possibly default)
  • X-AppEngine-TaskName, the name of the task, or a system-generated unique ID if no name was specified
  • X-AppEngine-TaskRetryCount, the number of times this task has been retried; for the first attempt, this value is 0. This number includes attempts where the task failed due to a lack of available instances and never reached the execution phase.
  • X-AppEngine-TaskExecutionCount, the number of times this task has previously failed during the execution phase. This number does not include failures due to a lack of available instances.
  • X-AppEngine-TaskETA, the target execution time of the task, specified in milliseconds since January 1st 1970.

These headers are set internally by Google App Engine. If your request handler finds any of these headers, it can trust that the request is a Task Queue request. If any of the above headers are present in an external user request to your app, they are stripped. The one exception is requests from logged-in administrators of the application, who are allowed to set the headers for testing purposes.
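As a rough illustration of how a handler might use these headers, the sketch below reads them from a dictionary of request headers. The function name task_info and the shape of the returned dictionary are hypothetical and not part of the Task Queue API; only the header names come from the list above.

```python
def task_info(headers):
    """Return Task Queue metadata found in `headers`, or None.

    `headers` is any mapping of header names to string values. None is
    returned when X-AppEngine-QueueName is absent, meaning the request
    did not come from the Task Queue service.
    """
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    queue_name = lowered.get('x-appengine-queuename')
    if queue_name is None:
        return None
    return {
        'queue': queue_name,
        'task': lowered.get('x-appengine-taskname'),
        'retry_count': int(lowered.get('x-appengine-taskretrycount', 0)),
        'execution_count': int(
            lowered.get('x-appengine-taskexecutioncount', 0)),
    }
```

In a webapp2 handler, the headers of the current request would typically be available as self.request.headers. A worker could, for example, reject the request when task_info() returns None.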

Tasks may be created with the X-AppEngine-FailFast header, which specifies that a task running on a backend fails immediately instead of waiting in a pending queue.

Google App Engine issues Task Queue requests from the IP address 0.1.0.2.

The rate of task execution

You set the maximum processing rate for the entire queue when you configure the queue. App Engine uses a token bucket algorithm to execute tasks once they've been delivered to the queue. Each queue has a token bucket, and each bucket holds a certain number of tokens. Your app consumes a token each time it executes a task. If the bucket runs out of tokens, the system pauses until the bucket has more tokens. The rate at which the bucket is refilled is the limiting factor that determines the rate of the queue. See Defining Push Queues and Processing Rates for more details.
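The token bucket idea can be illustrated with a small, self-contained simulation. This is a toy model for intuition only, not App Engine's implementation: the class name TokenBucket, its methods, and the explicit now argument are all assumptions made for the sketch.

```python
class TokenBucket(object):
    """Toy token bucket: refills `rate` tokens per second, up to `bucket_size`."""

    def __init__(self, rate, bucket_size):
        self.rate = float(rate)
        self.bucket_size = float(bucket_size)
        self.tokens = float(bucket_size)  # the bucket starts full
        self.last_refill = 0.0            # time of the last refill, in seconds

    def _refill(self, now):
        # Add tokens for the elapsed time, never exceeding the bucket size.
        elapsed = now - self.last_refill
        self.tokens = min(self.bucket_size, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def try_consume(self, now):
        """Take one token at time `now` if one is available."""
        self._refill(now)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


bucket = TokenBucket(rate=1, bucket_size=2)
# Five tasks are ready at t=0, but the bucket holds only two tokens.
burst = [bucket.try_consume(now=0.0) for _ in range(5)]
# One second later a single token has been refilled.
later = bucket.try_consume(now=1.0)
```

In this model the bucket size bounds how many tasks can run in a burst, while the refill rate bounds the sustained rate once the bucket is empty.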

To ensure that the Task Queue system does not overwhelm your application, it may throttle the rate at which requests are sent. This throttled rate is known as the enforced rate. The enforced rate may be decreased when your application returns a 503 HTTP response code, or if there are no instances able to execute a request for an extended period of time. You can view the enforced rate on the Task Queue tab of the Administration Console.

The order of task execution

The order in which tasks are executed depends on several factors:

  • The position of the task in the queue. App Engine attempts to process tasks based on FIFO (first in, first out) order. In general, tasks are inserted into the end of a queue, and executed from the head of the queue.
  • The backlog of tasks in the queue. The system attempts to deliver the lowest latency possible for any given task via specially optimized notifications to the scheduler. Thus, in the case that a queue has a large backlog of tasks, the system's scheduling may "jump" new tasks to the head of the queue.
  • The value of the task's eta property. This specifies the earliest time that a task can execute. App Engine always waits until after the specified ETA to process push tasks.
  • The value of the task's countdown property. This specifies the minimum number of seconds to wait before executing a task. Countdown and eta are mutually exclusive; if you specify one, do not specify the other.
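The relationship between countdown and eta can be sketched as follows. The helper earliest_run_time is hypothetical and only models the rules described above, together with the 30-day maximum listed under the push queue limits; it does not call the Task Queue API.

```python
import datetime

# Maximum countdown/ETA, taken from the push queue limits on this page.
MAX_DELAY = datetime.timedelta(days=30)


def earliest_run_time(now, countdown=None, eta=None):
    """Return the earliest time a task may run, given countdown or eta."""
    if countdown is not None and eta is not None:
        raise ValueError('countdown and eta are mutually exclusive')
    if countdown is not None:
        run_at = now + datetime.timedelta(seconds=countdown)
    elif eta is not None:
        run_at = eta
    else:
        run_at = now  # default: as soon as possible
    if run_at - now > MAX_DELAY:
        raise ValueError('task cannot be scheduled more than 30 days ahead')
    return run_at
```

With the real API, the same choice is expressed by passing either countdown or eta (but not both) as a keyword argument when the task is created.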

Task retries

If a push task request handler returns an HTTP status code within the range 200–299, App Engine considers the task to have completed successfully. If the task returns a status code outside of this range, App Engine retries the task until it succeeds. The system backs off gradually to avoid flooding your application with too many requests, but schedules retry attempts for failed tasks to recur at a maximum of once per hour.
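To give a feel for gradual backoff, the sketch below generates retry delays that double after each failure and are capped at one hour. The actual schedule App Engine uses is not specified here; the function retry_delays, the one-second starting delay, and the doubling factor are assumptions chosen purely for illustration.

```python
def retry_delays(attempts, first_delay=1.0, max_delay=3600.0):
    """Return the wait, in seconds, before each of `attempts` retries."""
    delays = []
    delay = first_delay
    for _ in range(attempts):
        # Never wait longer than max_delay (one hour by default).
        delays.append(min(delay, max_delay))
        delay *= 2
    return delays


first_five = retry_delays(5)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```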

You can also configure your own scheme for task retries using the retry_parameters directive in queue.yaml.

When implementing the code for tasks (as worker URLs within your app), it is important to consider whether the task is idempotent. App Engine's Task Queue API is designed to only invoke a given task once; however, it is possible in exceptional circumstances that a task may execute multiple times (such as in the unlikely case of major system failure). Thus, your code must ensure that there are no harmful side-effects of repeated execution.
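One common way to make a worker tolerate repeated execution is to record which tasks have already been applied and skip duplicates. The sketch below shows the idea with an in-memory set. The class IdempotentCounter is hypothetical, and a real application would need to keep the record of seen task names in durable storage and update it in the same transaction as the work itself.

```python
class IdempotentCounter(object):
    """Counter that applies each named task at most once."""

    def __init__(self):
        self.count = 0
        self._seen_task_names = set()  # stand-in for durable storage

    def handle_task(self, task_name):
        """Apply the task unless it was already applied; return True if applied."""
        if task_name in self._seen_task_names:
            return False  # duplicate delivery: do nothing
        self._seen_task_names.add(task_name)
        self.count += 1
        return True


counter = IdempotentCounter()
counter.handle_task('task-1')
counter.handle_task('task-1')  # repeated execution of the same task
counter.handle_task('task-2')
```

After these calls counter.count is 2, because the second delivery of task-1 is ignored.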

Deferred tasks

Setting up a handler for each distinct task (as described in the previous sections) can be cumbersome, as can serializing and deserializing complex arguments for the task—particularly if you have many diverse but small tasks that you want to run on the queue. The Python SDK includes a library (google.appengine.ext.deferred) exposing a simple function that allows you to bypass all the work of setting up dedicated task handlers and serializing and deserializing your parameters.

To use this library, you need to add the deferred builtin to app.yaml. For more information, please see the Built-in Handlers section of the Python Application Configuration page.

To use the deferred library, simply pass the function and its arguments to deferred.defer():

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, c=True)

The deferred library packages your function call and its arguments, then adds it to the task queue. When the task is executed, the deferred library executes do_something_expensive("Hello, world!", 42, c=True).
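Conceptually, deferring a call means capturing the function and its arguments now and invoking them later. The in-memory sketch below illustrates only that idea. The names defer, run_pending, and _pending are hypothetical stand-ins; the real library serializes the call with pickle and delivers it through the task queue rather than keeping it in a local list.

```python
import collections

_pending = collections.deque()  # stand-in for the task queue


def defer(func, *args, **kwargs):
    """Record a call to be made later instead of making it now."""
    _pending.append((func, args, kwargs))


def run_pending():
    """Run every recorded call in FIFO order and collect the results."""
    results = []
    while _pending:
        func, args, kwargs = _pending.popleft()
        results.append(func(*args, **kwargs))
    return results


def describe(a, b, c=None):
    return (a, b, c)


defer(describe, "Hello, world!", 42, c=True)
results = run_pending()
```

Here results holds one entry, the return value of describe("Hello, world!", 42, c=True).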

For more information about using the deferred library in Python, please refer to Background Work with the Deferred Library.

URL endpoints

Push tasks reference their implementation via URL. For example, a task which fetches and parses an RSS feed might use a worker URL called /app_worker/fetch_feed. You can specify this worker URL or use the default. In general, you can use any URL as the worker for a task, so long as it is within your application; all task worker URLs must be specified as relative URLs:

from google.appengine.api import taskqueue

taskqueue.add(url='/path/to/my/worker')
taskqueue.add(url='/path?a=b&c=d', method='GET')

If you do not specify a worker URL, the task uses a default worker URL named after the queue:

/_ah/queue/queue_name

A queue's default URL is used if, and only if, a task does not have a worker URL of its own. If a task does have its own worker URL, then it is only invoked at the worker URL, never another. Once a task has been inserted into a queue, its URL endpoint cannot be changed.
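The rule for choosing between a task's own worker URL and the queue's default can be written as a small helper. The function worker_url is hypothetical and only mirrors the behavior described above; the Task Queue service performs this resolution itself.

```python
def worker_url(queue_name, task_url=None):
    """Return the URL at which a task would be invoked."""
    if task_url is not None:
        # Worker URLs must be relative to the application.
        if not task_url.startswith('/'):
            raise ValueError('worker URL must be a relative URL')
        return task_url
    # No URL given: fall back to the default named after the queue.
    return '/_ah/queue/' + queue_name
```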

You can also target tasks to App Engine Backends. Backends allow you to process tasks beyond the 10-minute deadline for task execution. See Push Queues and Backends for more information.

Securing URLs for tasks

You can prevent users from accessing URLs of tasks by restricting access to administrator accounts. Task queues can access admin-only URLs. You can restrict a URL by adding login: admin to the handler configuration in app.yaml.

An example might look like this in app.yaml:

application: hello-tasks
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:
- url: /tasks/process
  script: process.app
  login: admin

For more information see Python Application Configuration: Requiring Login or Administrator Status.

To test a task web hook, sign in as an administrator and visit the URL of the handler in your browser.

Push queues and the development server

When your app is running in the development server, tasks are automatically executed at the appropriate time just as in production.

To disable tasks from running in the development server, run the following command:

dev_appserver.py --disable_task_running

You can examine and manipulate tasks from the developer console at: http://localhost:8000/_ah/admin/taskqueue.

To execute tasks, select the queue by clicking on its name, select the tasks to execute, and click Run Now. To clear a queue without executing any tasks, click Purge Queue.

The development server and the production server behave differently:

  • The development server doesn't respect the rate and bucket_size directives of your queues. As a result, tasks are executed as close to their ETA as possible. Setting a rate of 0 doesn't prevent tasks from being executed automatically.
  • The development server doesn't retry tasks.
  • The development server doesn't preserve queue state across server restarts.

Push queues and backends

Push tasks typically must finish execution within 10 minutes. If you have push tasks that require more time or computing resources to process, you can use App Engine Backends to process these tasks outside of the normal limits of App Engine applications. The following code sample demonstrates how to create a push task addressed to instance 1 of a backend named backend1:

from google.appengine.api import taskqueue
# ...
    def post(self):
        key = self.request.get('key')

        # Add the task to the default queue.
        taskqueue.add(url='/path/to/my/worker/', params={'key': key},
                      target='1.backend1')

Quotas and limits for push queues

Enqueuing a task in a push queue counts toward the following quotas:

  • Task Queue Stored Task Count
  • Task Queue Stored Task Bytes
  • Task Queue API Calls

The Task Queue Stored Task Bytes quota is configurable in queue.yaml by setting total_storage_limit. This quota counts towards your Stored Data (billable) quota.

Execution of a task counts toward the following quotas:

  • Requests
  • Incoming Bandwidth
  • Outgoing Bandwidth

The act of executing a task consumes bandwidth-related quotas for the request and response data, just as if the request handler were called by a remote client. When the task queue processes a task, the response data is discarded.

Once a task has been executed or deleted, the storage used by that task is reclaimed. The reclaiming of storage quota for tasks happens at regular intervals, and this may not be reflected in the storage quota immediately after the task is deleted.

For more information on quotas, see Quotas, and the "Quota Details" section of the Admin Console.

The following limits apply to the use of push queues:

Push Queue Limits
Limit                                                                Amount
Maximum task size                                                    100KB
Maximum number of active queues (not including the default queue)    Free apps: 10 queues; billed apps: 100 queues
Queue execution rate                                                 500 task invocations per second per queue
Maximum countdown/ETA for a task                                     30 days from the current date and time
Maximum number of tasks that can be added in a batch                 100 tasks
Maximum number of tasks that can be added in a transaction           5 tasks
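When an app needs to enqueue more tasks than the batch limit allows, one approach is to split the work into groups of at most 100. The sketch below shows only the chunking step. The helper batches and the constant MAX_TASKS_PER_BATCH are hypothetical names, and the items are plain integers standing in for tasks.

```python
MAX_TASKS_PER_BATCH = 100  # batch limit from the push queue limits above


def batches(items, size=MAX_TASKS_PER_BATCH):
    """Yield consecutive slices of `items`, each at most `size` long."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


pending = list(range(250))  # stand-ins for 250 tasks
batch_sizes = [len(batch) for batch in batches(pending)]
```

For 250 items this produces batches of 100, 100, and 50, each of which could then be added to a queue in a single call.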
