An Advanced Interceptor: The Weather Interceptor

Introduction

This sample interceptor, more complex than the random bid interceptor shown in Your First Bid Interceptor, covers these concepts:

  • Bidding rules — The bidding rules used to determine whether or not to bid in response to a particular request are based on complex criteria.
  • Using external services — The data used by the weather interceptor for its bidding logic comes from an external source. This creates challenges for low-latency operation, that we address with asynchronous caching techniques.
  • Cloud Storage and Protocol buffers — The weather interceptor uses protocol buffers to serialize and deserialize its rule data, and Cloud Storage to persist it.
  • Extensions — This interceptor will have bidding rules tied to the campaign structure. We will rely on campaign IDs accessible via an OpenRTB extension cid (the current OpenRTB specification has no support for this in bid requests).
  • Interceptor composition — The Weather interceptor doesn't create any bid; it will only adjust the prices of bids created by another interceptor.

Why a weather interceptor?

Demand for some products and services varies greatly with the weather. For example, an amusement park may want to place more ads when the weather is nice and people are more likely to go to the park. However, manually adjusting your campaigns every time the weather changes would be a lot of work.

One solution for that is to use weather conditions in bidding rules. We can create a bidder that considers the weather to make bidding decisions by defining one or more "weather targeting" rules that consider weather conditions in order to determine how much to bid. For example:

  • If the weather is perfect for the relevant business, make a bid with the full price we are willing to pay for a particular impression (as determined by another interceptor);
  • If the weather is acceptable, but not ideal, perhaps warm but with weak winds and a small chance of rain, we can still bid, but with a "discounted" price, e.g. only 80% of the impression's value;
  • If the weather is poor, raining for example, we place a greatly reduced bid, like 20% of the impression's value. This probably means we'll lose the bid, but if we win, at least it's a dirt-cheap ad!

Weather targeting

The first important aspect of this interceptor is that it needs a reasonably complex modeling of targeting and bidding rules; and this model is our burden, because RTB exchanges don't typically support weather targeting as part of their campaign model. We need to design and store this on our end.

Before writing any code, there's an important implementation decision: persistence. In our example we will store the targeting rules in Google Cloud Storage, which requires some form of object serialization. Our model entities will be coded in a .proto file, using the protoc compiler to generate Java classes that support efficient binary serialization. For more information on using protocol buffers, see the Protocol Buffers documentation.

So, the first thing we need is a representation of weather conditions. Weather information services provide a large number of data points, but for our purposes of ad bidding, we only need the basics:

message WeatherConditions {
  required int32 tempFahrenheit = 1;
  required int32 windMph = 2;
  required double humidityPercent = 3;
}

Weather data will be retrieved from a weather reporting service. Then we can match the weather conditions with targeting constraints. The WeatherTarget message contains minimum and maximum values for each weather factor:

message WeatherTarget {
  optional int32 minTemp = 1;
  optional int32 maxTemp = 2;
  optional int32 minWind = 3;
  optional int32 maxWind = 4;
  optional double minHumidity = 5;
  optional double maxHumidity = 6;
}

In the next step, a full weather bidding rule is built pairing the target with an output multiplier. The multiplier is an adjustment to the original bid price (determined by another interceptor that created the bid for a given impression), where 1.0 = 100%.

message WeatherBiddingRule {
  optional WeatherTarget target = 1;
  optional double multiplier = 2;
}

Finally, for every cid, we'll have a set of WeatherBiddingRules. This binding will be encapsulated by our last message:

message WeatherRules {
  required string cid = 1;
  required WeatherBiddingRule rules = 2;
}

Helper services

Here's a minimal DAO to access the weather rules:

public interface WeatherDao {
  void insert(WeatherRules rules);
  void deleteRules(String cid);
  List<WeatherRules> listRules();
}

We also need information about the weather conditions of the location associated to each bid request. This will be provided by a weather reporting service:

public interface WeatherService {
  @Nullable WeatherConditions getWeatherConditions(String location);
}

Bidding logic

The protoc compiler creates Java model classes that can store data and serialize it to and from protocol buffers, but we're missing the logic to match weather targets to conditions. Suppose we wanted to make a targeting model for the amusement-park campaign that has a single rule that reduces bid prices to 80% of their value when the temperature is between 50° and 60°, wind is less than 30 mph, and humidity is between 10% and 50%. Here's how you instantiate it:

WeatherRules rules = WeatherRules.newBuilder().setCid("1")
    .addRules(WeatherBiddingRule.newBuilder()
        .setTarget(WeatherTarget.newBuilder()
            .setMinTemp(50).setMaxTemp(60)
            .setMaxWind(30)
            .setMinHumidity(0.1).setMaxHumidity(0.5))
        .setMultiplier(0.8))
    .build();

Now, if some bid request's weather conditions are: temperature = 55°F, wind = 20mph, humidity = 30%:

WeatherConditions cond = WeatherConditions.newBuilder()
    .setTempFahrenheit(55)
    .setWindMph(20)
    .setHumidityPercent(0.3)
    .build();

Our goal is to apply the WeatherRules to the condition, and have the bid multiplier set to 0.8. We can conceptualize each rule as a function from a set of condition filters to a multiplier, like this:

The weather interceptor determines the bid multiplier according to weather conditions.
Figure 1: The weather interceptor determines the bid multiplier according to weather conditions.

We can use this as a design basis. Let's create a class that implements the Function interface; specifically, a Function<WeatherConditions, Double>. The domain for this function is all possible weather conditions, and the return value is the multiplier that's used for bidding when the condition is satisfied. Unsatisfied conditions will return null.

The WeatherBiddingFunction class is a wrapper for the WeatherRules class, which implements a Function interface. The apply method compares each individual WeatherTarget to a WeatherConditions.

Finally, we consider that a WeatherRules collection may contain many WeatherBiddingRules, and certain conditions may match more than one target. Our strategy in the overloaded apply() is to evaluate all rules and, if more than one has a matching target, we return the highest multiplier. For example, if rule A says bid 90% of the impression value if it's hot, and rule B says to bid 80% if there's no wind; and the weather is "hot with no wind"— we should return 90%.

public class WeatherBiddingFunction implements Function<WeatherConditions, Double> {
  private final WeatherRules rules;

  public WeatherBiddingFunction(WeatherRules rules) {
    this.rules = checkNotNull(rules);
  }

  public WeatherRules getRules() {
    return rules;
  }

  @Override
  public @Nullable Double apply(WeatherConditions conditions) {
    WeatherBiddingRule bestRule = null;
    for (WeatherBiddingRule rule : rules.getRulesList()) {
      if (apply(rule.getTarget(), conditions)
          && (bestRule == null || bestRule.getMultiplier() < rule.getMultiplier())) {
        bestRule = rule;
      }
    }
    return bestRule == null ? null : bestRule.getMultiplier();
  }

  private static boolean apply(WeatherTarget target, WeatherConditions cond) {
    return
           (!target.hasMinTemp()     || target.getMinTemp()     <= cond.getTempFahrenheit())
        && (!target.hasMaxTemp()     || target.getMaxTemp()     >= cond.getTempFahrenheit())
        && (!target.hasMinWind()     || target.getMinWind()     <= cond.getWindMph())
        && (!target.hasMaxWind()     || target.getMaxWind()     >= cond.getWindMph())
        && (!target.hasMinHumidity() || target.getMinHumidity() <= cond.getHumidityPercent())
        && (!target.hasMaxHumidity() || target.getMaxHumidity() >= cond.getHumidityPercent());
  }
}

The interceptor

Having all the important business logic behind us, we can finally write the actual interceptor!

Construction and initialization

We'll need references to two service objects— WeatherDao and WeatherService—which can be initialized with Guice (we'll see later how to set this up).

We also need an in-memory index of all weather rules; this will be a Map where the key is a campaign ID, and the value is a WeatherFunction. This initialization from an external database can be slow, and it can rely on other services that may still be uninitialized at construction time, so the best practice is performing this work asynchronously—the good news is that Open Bidder makes this easy to implement.

public class WeatherInterceptor implements BidInterceptor {
  private ImmutableMap<String, WeatherBiddingFunction> weatherFunctions;
  private final WeatherService weatherService;

  @Inject
  public WeatherInterceptor(
      final WeatherDao weatherDao,
      WeatherService weatherService,
      @InitExecutor ExecutorService initExecutor) {
    this.weatherService = weatherService;
    initExecutor.execute(() -> {
      ImmutableMap.Builder<String, WeatherBiddingFunction> map = ImmutableMap.builder();
      for (WeatherRules rules : weatherDao.listRules()) {
        map.put(rules.getCid(), new WeatherBiddingFunction(rules));
      }
      weatherFunctions = map.build();
    });
  }
  ...
}

The execute() method

Now comes the interceptor's entry point, execute(). The first thing it does is call chain.proceed(); this is because WeatherInterceptor is a post-processing interceptor — it runs after all other BidInterceptors further in the chain have executed, and modifies any bids they may have created. You could obtain a similar effect by having your code before the proceed() but placing it at the end of the chain.

Next, we need to look up the current weather conditions for the request's location. This information is only available in the exchange-specific request data; while this example is DoubleClick-specific, but you could refactor it in order to support multiple exchanges. Once we know the location from which the request was received, we can invoke the weather-reporting service's getWeatherConditions() method.

Now that we have the WeatherConditions, we can easily apply the WeatherBiddingFunction for each Bid's Impression. If the function returns a non-null multiplier, apply it to the Bid.price. Otherwise, if we're unable to obtain weather conditions for any reason or don't have any applicable rules, we place a very cheap bid — which means we'll almost certainly lose the auction, but if we do win it (e.g., because there are no other bidders), at least it was a bargain!

@Override
public void execute(final InterceptorChain<BidRequest, BidResponse> chain) {
  // We will post-process bids created by other interceptors down the chain.
  chain.proceed();

  Geo geo = chain.request().openRtb().getDevice().getGeo();
  final WeatherConditions cond = geo.hasCity() && geo.hasCountry()
      ? weatherService.getWeatherConditions(geo.getCity() + ',' + geo.getCountry())
      : null;

  chain.response().updateBids(bid -> {
    assert bid != null;
    WeatherBiddingFunction function = bid.hasCid() ? weatherFunctions.get(bid.getCid()) : null;

    if (function != null) {
      Double multiplier = cond == null ? null : function.apply(cond);
      // No weather data or applicable rules for this location? Make the bid very cheap
      multiplier = multiplier == null ? 0.2 : multiplier;
      double updatedPrice = multiplier * bid.getPrice();
      bid.setPrice(updatedPrice);
    }
    return true;
  });
}

Helper services

Wait, we're not done yet because we have only defined interfaces for our weather-storing DAO and our weather-reporting service! These are not central to the interceptor programming or to the weather-based bidding logic, but let's finish this code for completeness (or you can skip to the next section).

Storage service

We'll use the Google Cloud Storage Service to persist the weather rules. Cloud Storage keeps a set of named buckets, each of which contains named objects. It's a good option because it's native to the Google cloud with all capabilities you need for bidding setups at any scale, and Open Bidder itself uses Cloud Storage so you can trivially use it for your own purposes. Our schema is very simple: a single bucket with a record per campaign. The bucket name will be provided as a property (you need to create it with gsutil mb). Google Cloud Storage has a REST API, but Open Bidder (which also uses this service) has a client to simplify using it called GoogleCloudStorage.

Cloud Storage only stores raw byte streams in its objects, so we'll use protocol buffers to serialize the WeatherRules object. The listRules() method retrieves data for all rules, which is all the interceptor needs. The insert() and deleteRules() methods complete a minimal CRUD interace that enables a simple command-line tool to manage stored rules that are essential for testing (see the class WeatherTool in open-bidder-weather, and the shell scripts in /bin to help invoke it).

public final class WeatherDaoCloudStorage implements WeatherDao {
  private final GoogleCloudStorage cloudStorage;
  private final String storageBucket;

  public WeatherDaoCloudStorage(GoogleCloudStorage cloudStorage, String storageBucket) {
    this.cloudStorage = cloudStorage;
    this.storageBucket = storageBucket;
  }

  @Override
  public void insert(WeatherRules rules) {
    try {
      cloudStorage.putObject(storageBucket, rules.getCid(),
          new ByteArrayContent("binary/octet-stream", rules.toByteArray()), null);
    } catch (HttpResponseException e) {}
  }

  @Override
  public void deleteRules(String cid) {
    try {
      cloudStorage.removeObject(storageBucket, cid);
    } catch (HttpResponseException e) {}
  }

  @Override
  public List<WeatherRules> listRules() {
    List<WeatherRules> rules = new ArrayList<>();
    try {
      ListBucketResult items = cloudStorage.listObjectsInBucket(storageBucket, null);
      for (ListBucketResult.Content item : items.getContents()) {
        StorageObject rule = cloudStorage.getObject(storageBucket, item.getKey(), null);
        rules.add(WeatherRules.parseFrom(rule.getInputStream()));
      }
    } catch (IOException e) {
    }
    return rules;
  }
}

The weather service and caching

Finally, we need to create a means of retrieving the WeatherConditions for a given location. There are plenty of web services with APIs that you could use to retrieve weather forecasts. Choose an API you would like to use, and then implement WeatherService with it (see the project for one example).

Two problems relevant to real-time bidding are latency and scale. Invoking a web service for each bid request will have a penalty of a few tens of milliseconds or more, which can be prohibitive. Furthermore, third-party weather services often have quotas or per-request costs, which make them unviable for bidders that handle thousands of requests per second! To mitigate both problems, we need to add a final trick: asynchronous caching.

public class WeatherServiceCache implements WeatherService {
  private final Integer deadlineMs;
  private final LoadingCache<String, Future<WeatherConditions>> cache;

  public WeatherServiceCache(
      Integer deadlineMs, final WeatherService weatherSource, final ExecutorService executor) {
    this.deadlineMs = deadlineMs;
    this.cache = CacheBuilder.newBuilder()
        .refreshAfterWrite(2, TimeUnit.HOURS)
        .maximumSize(1000000)
        .build(new CacheLoader<String, Future<WeatherConditions>>() {
          @Override public Future<WeatherConditions> load(final String location) {
            try {
              return executor.submit(() -> weatherSource.getWeatherConditions(location));
            } catch (RejectedExecutionException e) {
              return Futures.immediateFuture(null);
            }
          }});
  }

  public long size() {
    return cache.size();
  }

  @Override
  public @Nullable WeatherConditions getWeatherConditions(String location) {
    try {
      Stopwatch stopwatch = Stopwatch.createStarted();
      Future<WeatherConditions> future = cache.getUnchecked(location);
      return deadlineMs == null
          ? (future.isDone() ? future.get() : null)
          : future.get(deadlineMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } catch (ExecutionException | TimeoutException e) {
      return null;
    }
  }
}

Implementing this cache is simple thanks to Guava's caching APIs and Java's concurrency APIs. The cache maps locations to their corresponding WeatherConditions. It starts empty, and entries expire when they're considered old and stale. We still have a latency problem for the first lookup to each location (and each lookup after the cached WeatherConditions expire), so our load() method executes asynchronously, returning a Future that provides access to the cached value. Every lookup to the cache will read the Future with a maximum deadline. If that takes too long because the cache entry was just loaded and the weather service is slow, we return null— so the interceptor won't be able to apply the weather rules for that particular bid, but at least it won't exceed the exchange's maximum latency so the bidder is not punished with throttling. However this cache entry will be eventually loaded, so the value will be available for the next request for the same location. The default deadline is none, which means lookups never wait, they will return null immediately if the value is not cached already; this is the recommended setting.

Guice module

There's a single missing piece now: the dependency injection bindings required by the interceptor. For this we need the WeatherModule listed below; notice that aside from binding each service interface to some implementation, this is where we can interpose the cache for the weather service.

@Parameters(separators = "=")
public class WeatherModule extends AbstractModule {
  @Parameter(names = "--weather_bucket", required = false,
      description = "Google Cloud Storage bucket where Weather rules are to be loaded from")
  private String storageBucket;
  @Parameter(names = "--weather_cache_timeout", required = false,
      description = "Deadline for blocking reads to the weather data cache (ms; 0=never block)")
  private Integer cacheTimeout;

  @Override
  protected void configure() {
    if (!Strings.isNullOrEmpty(storageBucket)) {
      bind(String.class).annotatedWith(WeatherStorageBucket.class).toInstance(storageBucket);
      bind(Integer.class).annotatedWith(WeatherCacheTimeout.class)
          .toProvider(Providers.of(cacheTimeout));
      install(new ImplModule());
    }
  }

  private static class ImplModule extends AbstractModule {
    @Override protected void configure() {}

    @Provides @Singleton
    public WeatherDao provideWeatherDao(
        GoogleCloudStorage cloudStorage,
        @WeatherStorageBucket String storageBucket) {
      return new WeatherDaoCloudStorage(cloudStorage, storageBucket);
    }

    @Provides @Singleton
    public WeatherService provideWeatherService(
        JsonFactory jsonFactory,
        HttpTransport httpTransport,
        MetricRegistry metricRegistry,
        @Nullable @WeatherCacheTimeout Integer cacheTimeout) {
      final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
      metricRegistry.register(MetricRegistry.name(getClass(), "CacheQueue"),
          new Gauge<Integer>() {
            @Override public Integer getValue() { return queue.size(); }
          });
      final WeatherServiceCache cache = new WeatherServiceCache(
          cacheTimeout,
          new WeatherServiceOpenWeatherMap(jsonFactory, httpTransport),
          // Limit threads to not overload the server
          new ThreadPoolExecutor(1, 8, 60L, TimeUnit.SECONDS, queue));
      metricRegistry.register(MetricRegistry.name(getClass(), "CacheSize"),
          new Gauge<Long>() {
            @Override public Long getValue() { return cache.size(); }
          });
      return cache;
    }
  }
}

As a bonus, you can see in the last method above an example of how to instrument services with custom metrics. Just inject the MetricProvider object, create and register your metrics. Here we create two Gauges, which are the simplest kind of metric, allowing inspection of the current value of some variable. Our gauges will show the current amount of cached weather conditions, and the size of the queue that holds pending requests for lookups.

In order to use the WeatherInterceptor on a bidder, you need a "bidder binary" project with an extension of BidderServer that adds the WeatherModule. See the Bidder Server Guide for more information. For this example, the binary project is already provided: see Open Bidder's module open-bidder-samples-binary.

Unit testing

No system is complete without its tests! The test itself is not difficult, but it will need some structure and helpers.

The setUp method

The first problem is that it's very impractical to execute tests that depend on live, external services; they can be slow, have a cost, need authorization, or not return the same results consistently. To solve this problem, our test will use a WeatherServiceMock class that's very simple: it implements the getWeatherConditions() method by simply returning some fixed data that you populate in the mock object. Check the project for its source code, but it's just some data structures so we don't show it here.

The setUp() method also initializes a GoogleCloudStorageMock, which can be imported from Open Bidder's testing support packages. This is a full mock of the entire Google Cloud Storage, so you don't need to create mocks for each DAO class: just connect the unmodified DAO to the mock GCS!

Next, we populate the data store with weather rules for two campaign IDs used in the tests, create the weather-reporting service's mock, and then populate it with fake weather data for the locations used in the test. Now we can create the WeatherInterceptor connected to the two mock services, and also another simple interceptor to create the initial bids (remember the weather interceptor works as a postprocessor). Finally, initialize a BidController configured with this chain of two interceptors.

The tearDown method

Let's be neat and shutdown the controller after all testing is done, which has the added benefit of allowing you to test the interceptors' @PreDestroy method.

Writing the test cases

All prerequisites behind us, we can finally write the test cases proper. We'll have two tests, the only distinction being the inputs and outputs, so most of the code can go in a shared method; which also makes each test cleaner, containing only the inputs, outputs and assertions.

The helper run() method runs the bidder for each test. It contains the standard code to test interceptors:

  • Creates the BidRequest populated with relevant data
  • Invokes the bidder's onBidRequest()
  • Extracts the bid value from the response.
public class WeatherInterceptorTest {
  private static final String CID_AMUSEMENT_PARK = "1";
  private static final String CID_SAILING_CRUISE = "2";
  private BidController controller;

  @Before
  public void setUp() throws HttpResponseException {
    GoogleCloudStorage cloudStorage = new FakeGoogleCloudStorage(new FakeClock());
    cloudStorage.putBucket("weather-0");

    // Fake data for our tests
    WeatherDao weatherDao = new WeatherDaoCloudStorage(cloudStorage, "weather-0");

    weatherDao.insert(WeatherRules.newBuilder().setCid(CID_AMUSEMENT_PARK)
        .addRules(WeatherBiddingRule.newBuilder()
            .setTarget(WeatherTarget.newBuilder()
                .setMinTemp(50).setMaxTemp(60)
                .setMaxWind(30)
                .setMinHumidity(0.1).setMaxHumidity(0.5))
            .setMultiplier(0.8))
            // Omitted: more rules
        .build());
    // Omitted: insert rule for CID_SAILING_CRUISE

    // Fake weather information for our tests
    WeatherService weatherService = new WeatherServiceMock(
        new WeatherData("New York,USA", WeatherConditions.newBuilder()
            .setTempFahrenheit(72).setWindMph(14).setHumidityPercent(0.3).build()),
        new WeatherData("Rio de Janeiro,BRA", WeatherConditions.newBuilder()
            .setTempFahrenheit(90).setWindMph(35).setHumidityPercent(0.2).build()),
        new WeatherData("Paris,FRA", WeatherConditions.newBuilder()
            .setTempFahrenheit(55).setWindMph(5).setHumidityPercent(0.1).build())
    );

    controller = BiddingTestUtil.newBidController(
        new WeatherInterceptor(
            weatherDao, weatherService, MoreExecutors.newDirectExecutorService()),
        (BidInterceptor) chain -> {
          for (Imp imp : chain.request().imps()) {
            for (String cid : chain.request().exchange().getCidList(imp)) {
              chain.response().addBid(Bid.newBuilder()
                  .setId("1")
                  .setImpid(imp.getId())
                  .setCid(cid)
                  .setPrice(1.0)
                  .setAdm("snippet"));
            }
          }
        });
  }

  @After
  public void tearDown() {
    if (controller != null) {
      controller.stopAsync().awaitTerminated();
    }
  }

  @Test
  public void testAmusementPark() {
    // Good weather; max bid
    assertThat(run(CID_AMUSEMENT_PARK, "New York,USA")).isWithin(1e-9).of(1.0);
    // Too hot; min bid
    assertThat(run(CID_AMUSEMENT_PARK, "Rio de Janeiro,BRA")).isWithin(1e-9).of(0.2);
    // Acceptable; low bid
    assertThat(run(CID_AMUSEMENT_PARK, "Paris,FRA")).isWithin(1e-9).of(0.8);
  }

  // Omitted: @Test testSailingCruise()

  @Test
  public void testNoLocation() {
    // No location; min bid
    assertThat(run(CID_AMUSEMENT_PARK, null)).isWithin(1e-9).of(0.2);
  }

  private double run(String cid, @Nullable String location) {
    OpenRtb.BidRequest.Builder openrtbRequest = OpenRtb.BidRequest.newBuilder()
        .setId("1")
        .addImp(Imp.newBuilder()
            .setId("1")
            .setBanner(Banner.newBuilder().setId("1").setW(728).setH(90))
            .setBidfloor(0.1)
            .setExtension(AdxExt.imp, AdxExt.ImpExt.newBuilder()
                .addBillingId(Long.valueOf(cid)).build()));
    if (location != null) {
      openrtbRequest.setDevice(Device.newBuilder().setGeo(Geo.newBuilder()
          .setCity(location.split(",")[0])
          .setCountry(location.split(",")[1])));
    }
    BidRequest request = TestBidRequestBuilder.create()
        .setExchange(DoubleClickConstants.EXCHANGE)
        .setRequest(openrtbRequest).build();
    BidResponse response = TestBidResponseBuilder.create()
        .setExchange(DoubleClickConstants.EXCHANGE)
        .build();
    controller.onRequest(request, response);
    Iterator<Bid.Builder> bids = response.bids().iterator();
    return bids.hasNext() ? Iterators.getOnlyElement(bids).getPrice() : 0;
  }
}

Thanks to the mock services, the unit test doesn't need any kind of external setup. You can just run the WeatherInterceptorTest with mvn test, or from an IDE with JUnit integration.

The open-bidder-weather project includes a command-line utility (see above) to help create some test data in Cloud Storage, so a "live" interceptor will have some weather rules to execute in a real deployment. The full code in the project includes logging; set the logger com.google.openbidder.sample.weather to DEBUG in the bidder's /usr/local/open-bidder/etc/logback.xml and you can observe details like the execution of rules and price adjustments made by the interceptor.

Enviar comentarios sobre…