Google App Engine

Background work with the deferred library

Nick Johnson
October 15, 2009

Introduction

Thanks to the Task Queue API released in SDK 1.2.3, it's easier than ever to do work 'offline', separate from user-serving requests. In some cases, however, setting up a handler for each distinct task you want to run can be cumbersome, as can serializing and deserializing complex arguments for the task - particularly if you have many diverse but small tasks that you want to run on the queue.

Fortunately, a new library in release 1.2.5 of the SDK makes these ad-hoc tasks much easier to write and execute. This library is found in google.appengine.ext.deferred, and from here on in we'll refer to it as the 'deferred' library. The deferred library lets you bypass all the work of setting up dedicated task handlers and serializing and deserializing your parameters by exposing a simple function, deferred.defer(). To call a function later, simply pass the function and its arguments to deferred.defer, like this:

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, c=True)

That's all there is to it. The deferred library will package up your function call and its arguments, and add it to the task queue. When the task gets executed, the deferred library will execute do_something_expensive("Hello, world!", 42, c=True). There's one other thing you need to do to get this working - you need to add the deferred library's task handler to your app.yaml. Just add the following entry to the builtins section of your app.yaml:

- deferred: on
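
For context, here is what that entry looks like inside a minimal app.yaml (the application name, version, and catch-all handler are placeholders, not part of the deferred setup itself):

```yaml
application: myapp
version: 1
runtime: python
api_version: 1

builtins:
- deferred: on

handlers:
- url: /.*
  script: main.py
```

The builtin registers the deferred library's task handler URL for you, so you never have to map it by hand.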

The capabilities of the deferred library aren't limited to simple function calls with straightforward arguments. In fact, you can use nearly any Python 'callable', including functions, methods, class methods, and callable objects. deferred.defer also supports the optional task arguments accepted by the Task Queue API, such as countdown, eta, and name, but you need to prefix them with an underscore to prevent them from conflicting with keyword arguments to your own function. For example, to run do_something_expensive in 30 seconds, on a custom queue, you could do this:

deferred.defer(do_something_expensive, "Foobie bletch", 12, _countdown=30, _queue="myqueue")

Example: A datastore mapper

To demonstrate how powerful the deferred library can be, we're going to reprise an example from the remote_api article - the Mapper class. Like the example in the remote_api article, this class will make it easy to iterate over a large set of entities, making changes or calculating totals. Unlike the remote_api version, though, our version won't require an external computer to run it on, and it'll be more efficient to boot!

Here's our example Mapper implementation:

from google.appengine.ext import db
from google.appengine.ext import deferred
from google.appengine.runtime import DeadlineExceededError

class Mapper(object):
    # Subclasses should replace this with a model class (e.g. model.Person).
    KIND = None

    # Subclasses can replace this with a list of (property, value) tuples to filter by.
    FILTERS = []

    def __init__(self):
        self.to_put = []
        self.to_delete = []

    def map(self, entity):
        """Updates a single entity.

        Implementers should return a tuple containing two iterables (to_update, to_delete).
        """
        return ([], [])

    def finish(self):
        """Called when the mapper has finished, to allow for any final work to be done."""
        pass

    def get_query(self):
        """Returns a query over the specified kind, with any appropriate filters applied."""
        q = self.KIND.all()
        for prop, value in self.FILTERS:
            q.filter("%s =" % prop, value)
        q.order("__key__")
        return q

    def run(self, batch_size=100):
        """Starts the mapper running."""
        self._continue(None, batch_size)

    def _batch_write(self):
        """Writes updates and deletes entities in a batch."""
        if self.to_put:
            db.put(self.to_put)
            self.to_put = []
        if self.to_delete:
            db.delete(self.to_delete)
            self.to_delete = []

    def _continue(self, start_key, batch_size):
        q = self.get_query()
        # If we're resuming, pick up where we left off last time.
        if start_key:
            q.filter("__key__ >", start_key)
        # Keep updating records until we run out of time.
        try:
            # Steps over the results, returning each entity and its index.
            for i, entity in enumerate(q):
                map_updates, map_deletes = self.map(entity)
                self.to_put.extend(map_updates)
                self.to_delete.extend(map_deletes)
                # Do updates and deletes in batches.
                if (i + 1) % batch_size == 0:
                    self._batch_write()
                # Record the last entity we processed.
                start_key = entity.key()
            self._batch_write()
        except DeadlineExceededError:
            # Write any unfinished updates to the datastore.
            self._batch_write()
            # Queue a new task to pick up where we left off.
            deferred.defer(self._continue, start_key, batch_size)
            return
        self.finish()

If you've read the remote_api article, this will seem very familiar, but there are a couple of differences worth noting.

First, the main loop is wrapped in a try/except block that catches DeadlineExceededError. This lets us process as many results as we can in the time available; when time runs out, the runtime raises a DeadlineExceededError, which leaves us just enough time to queue the next task before returning. We're also no longer fetching results in batches - instead, we iterate over the query, which lets us fetch as many results as we have time to process. Updates and deletes are still batched, however: whenever we've processed enough records, we write the accumulated changes to the datastore and record the current entity as the point to continue from.

The other difference is that we've added a finish() method. This gets called when every matching entity has been processed, and allows us to use the Mapper class to do things like calculate totals.

Using the mapper

Using this class is almost identical to using the one we defined for remote_api, so the examples from that article apply here, too. Here's the example mapper that adds 'bar' to every guestbook entry containing 'foo', rewritten for our new mapper class:

class GuestbookUpdater(Mapper):
    KIND = Greeting

    def map(self, entity):
        if entity.content.lower().find('foo') != -1:
            entity.content += ' Bar!'
            return ([entity], [])
        return ([], [])

mapper = GuestbookUpdater()
deferred.defer(mapper.run)

As you can see, the mapper subclass itself is unchanged; the only change is how we invoke it - by 'deferring' the mapper.run method.

The new finish method makes it easy to calculate totals, too. Suppose we have a class that records files and how many times they've been downloaded, and we want to count up the total number of files and the total number of downloads. We could implement it like this:

class File(db.Model):
    name = db.StringProperty(required=True)
    download_count = db.IntegerProperty(required=True, default=0)

class DailyTotal(db.Model):
    date = db.DateProperty(required=True, auto_now_add=True)
    file_count = db.IntegerProperty(required=True)
    download_count = db.IntegerProperty(required=True)

class DownloadCountMapper(Mapper):
    KIND = File

    def __init__(self):
        Mapper.__init__(self)
        self.file_count = 0
        self.download_count = 0

    def map(self, file):
        self.file_count += 1
        self.download_count += file.download_count
        return ([], [])

    def finish(self):
        total = DailyTotal(file_count=self.file_count,
                           download_count=self.download_count)
        total.put()

mapper = DownloadCountMapper()
deferred.defer(mapper.run)

In just a few lines of code, we've written a mapper that can be run from a cron job each day to count up the total number of files and downloads, storing the results in another entity. It will work even if we have many more File entities than we could count in a single request, and it will automatically retry in the event of timeouts and other transient errors. At no point did we have to write a task queue handler or map URLs, and our Mapper class can be put in a library and used in multiple applications, even ones using different frameworks!
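
To schedule the mapper daily, you could pair it with a cron.yaml entry pointing at a small request handler of your own that calls deferred.defer(mapper.run); the URL and description below are illustrative placeholders:

```yaml
cron:
- description: count files and downloads
  url: /cron/count_downloads
  schedule: every 24 hours
```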

The Mapper framework we wrote above isn't full-featured, of course, and it could do with a few improvements to make it more robust and versatile, but it serves to demonstrate the power of the deferred API.

Deferred tips and tricks

As always, there are a few tips and tricks you should be aware of in order to make optimal use of the deferred library.

Make tasks as small as possible

Task Queue items are limited to 100 KB of associated data. This means that when the deferred library serializes the details of your call, it must amount to less than 100 KB in order to fit on the Task Queue directly. No need to panic, though: If you try to enqueue a task that is too big to fit on the queue by itself, the deferred library will automatically create a new Entity in the datastore to hold information about the task, and will delete the entity once the task has been run. This means that in practice, your function call can be up to 1 MB once serialized.

As you might expect, however, inserting and deleting datastore entities for each task adds some overhead, so if you can, make the arguments to your call (and the object itself, if you're calling a method) as small as possible to avoid the extra overhead.
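
To get a feel for how much space a deferred call will occupy, you can measure the pickled size of the call yourself. This is only a rough proxy for what the deferred library actually serializes, but it shows where the payload limit bites:

```python
import pickle

def rough_task_size(func_name, args, kwargs):
    """Approximates the serialized size of a deferred call.

    The deferred library pickles the callable and its arguments; here we
    pickle the function's name plus the arguments as a stand-in.
    """
    return len(pickle.dumps((func_name, args, kwargs)))

small = rough_task_size("do_something_expensive", ("Hello, world!", 42), {"c": True})
big = rough_task_size("process_blob", ("x" * 200000,), {})

print(small)              # well under the limit: fits directly on the queue
print(big > 100 * 1024)   # True: would spill over into a datastore entity
```

If you find your calls regularly landing on the large side, that's a sign to pass identifiers (such as keys) instead of bulky objects.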

Don't pass entities to deferred.defer

If you're working with datastore entities, you might be tempted to do something like this:

def do_something(entity):
    # Do something with entity
    entity.put()

entity = MyModel.get_by_id(123)
deferred.defer(do_something, entity, _countdown=60)

This is a bad idea, because the deferred library will package up the entity itself and store it to the task queue; when the time comes to execute your task, it will deserialize the entity and run the code. When your code writes the entity back to the datastore, it'll overwrite any changes that have happened since you enqueued the task with deferred.defer!

Instead of passing entities around, then, you should pass keys. For example:

def do_something_with_key(k):
    entity = MyModel.get(k)
    # Do something with entity
    entity.put()

k = db.Key.from_path('MyModel', 123)
deferred.defer(do_something_with_key, k, _countdown=60)

Dealing with task failures

Tasks created using deferred.defer are retried on failure, just like regular Task Queue tasks. 'Failure', for a deferred task, means the task raised an uncaught exception. Normally, automatic retries are what you want - for example, if the task performs datastore operations and the datastore raises a Timeout exception, a retry may well succeed. If you want to deliberately cause your task to fail so that it will be retried, simply raise an exception.

Sometimes, though, you know your task will never succeed, and you want it to fail permanently. In such situations, you have two options:

  • Return from the deferred function normally. The handler will treat the task as having executed successfully and will not retry it. This is best for non-critical failures - for example, when you deferred some work for later, then discovered it had already been done.
  • Raise a deferred.PermanentTaskFailure exception. This special exception is treated as a permanent failure: it is logged as an exception, so it will show up in your admin console as one, but the task is not retried.
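
The retry contract boils down to three outcomes: a normal return means success, PermanentTaskFailure means log-and-drop, and any other uncaught exception means retry. Here is a pure-Python sketch of that dispatch logic, using a stand-in exception class so the snippet doesn't depend on the SDK:

```python
class PermanentTaskFailure(Exception):
    """Stand-in for deferred.PermanentTaskFailure."""

def run_task(func):
    """Mimics the deferred handler's success/retry decision."""
    try:
        func()
        return "done"      # normal return: the task is marked successful
    except PermanentTaskFailure:
        return "dropped"   # logged as an exception, but never retried
    except Exception:
        return "retry"     # any other exception: the Task Queue retries

def already_done():
    return  # non-critical failure: just return normally

def transient_failure():
    raise RuntimeError("datastore timeout")

def hopeless_failure():
    raise PermanentTaskFailure("malformed input")

print(run_task(already_done))       # done
print(run_task(transient_failure))  # retry
print(run_task(hopeless_failure))   # dropped
```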

Handling import path manipulation

Some applications, or the frameworks they use, rely on manipulating the Python import path in order to make all the libraries they need available. While this is a perfectly legitimate technique, the deferred library has no way of knowing what path manipulations you've engaged in, so if the task you're deferring relies on modules that aren't on the import path by default, you need to give it a helping hand. Failing to do this can result in your tasks failing to run - or worse, only failing intermittently.

Fortunately, handling this is easy. Make sure your code that changes the import path is in a module all of its own, such as 'fix_path.py'. Such a module might look like this:

import os
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), 'lib'))

Then, import the 'fix_path' module along with your usual imports anywhere you rely on the modified path, such as in the module where you define the functions you pass to deferred.defer.

Limitations of the deferred library

There are a few limitations on what you can pass to deferred.defer:

  • All arguments must be picklable. That means you can't pass exotic things like instances of nested classes as function arguments. If you're calling an instance method, the instance must be picklable too. If you're not familiar with what 'pickling' is, don't worry - most stuff that you're likely to be passing to deferred.defer will work just fine.
  • You can't call nested functions, or methods of nested classes.
  • You can't call lambda functions (but you probably wouldn't want to anyway).
  • You can't call a static method.
  • You can't call a method in the request handler module.

The last point above deserves special attention: Calling a method in the request handler module - the module specified as a request handler in app.yaml - will not work. You can call deferred.defer from the request handler module, but the function you are calling must be defined elsewhere!
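
Most of these restrictions stem from how pickle locates callables: it records a module path and a name, and anything it can't find again by that path at unpickling time fails. You can check a callable's eligibility up front with the standard pickle module, which is a rough proxy for whether deferred.defer would accept it:

```python
import pickle

def top_level(x):
    return x * 2

def make_nested():
    def nested(x):  # defined inside another function
        return x
    return nested

def is_deferrable(obj):
    """Returns True if pickle can serialize obj - a rough proxy for
    whether deferred.defer could enqueue it."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False

print(is_deferrable(top_level))      # True: found by module path and name
print(is_deferrable(make_nested()))  # False: no importable name
print(is_deferrable(lambda x: x))    # False: lambdas have no stable name
```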

When to use ext.deferred

You may be wondering when to use ext.deferred, and when to stick with the built-in task queue API. Here are our suggestions.

You may want to use the deferred library if:

  • You only use the task queue lightly.
  • You want to refactor existing code to run on the Task Queue with a minimum of changes.
  • You're writing a one-off maintenance task, such as a schema migration.
  • Your app has many different types of background task, and writing a separate handler for each would be burdensome.
  • Your task requires complex arguments that aren't easily serialized without using Pickle.
  • You are writing a library for other apps that needs to do background work.

You may want to use the Task Queue API if:

  • You need complete control over how tasks are queued and executed.
  • You need better queue management or monitoring than deferred provides.
  • You have high throughput, and overhead is important.
  • You are building larger abstractions and need direct control over tasks.
  • You like the webhook model better than the RPC model.

Naturally, you can use both the Task Queue API and the deferred library side-by-side, if your app has requirements that fit into both groups.

If you've come up with your own uses for the deferred API, please let us and the rest of the community know about them in the discussion groups.
