The Job Shop Problem

One common scheduling problem is the job shop, in which multiple jobs are processed on several machines. Each job consists of a sequence of tasks, which must be performed in a given order, and each task must be processed on a specific machine. For example, the job could be the manufacture of a single consumer item, such as an automobile. The problem is to schedule the tasks on the machines so as to minimize the length of the schedule—the time it takes for all the jobs to be completed.

There are several constraints for the job shop problem:

  • No task for a job can be started until the previous task for that job is completed.
  • A machine can only work on one task at a time.
  • A task, once started, must run to completion.

Example Problem

Below is a simple example of a job shop problem, in which each task is labeled by a pair of numbers (m, p) where m is the number of the machine the task must be processed on and p is the processing time of the task — the amount of time it requires. (The numbering of jobs and machines starts at 0.)

  • job 0 = [(0, 3), (1, 2), (2, 2)]
  • job 1 = [(0, 2), (2, 1), (1, 4)]
  • job 2 = [(1, 4), (2, 3)]

In the example, job 0 has three tasks. The first, (0, 3), must be processed on machine 0 in 3 units of time. The second, (1, 2), must be processed on machine 1 in 2 units of time, and so on. Altogether, there are eight tasks.

A solution for the problem

A solution to the job shop problem is an assignment of a start time for each task, which meets the constraints given above. The diagram below shows one possible solution for the problem:

You can check that the tasks for each job are scheduled at non-overlapping time intervals, in the order given by the problem.

The length of this solution is 12, which is the first time when all three jobs are complete. However, as you will see below, this is not the optimal solution to the problem.

Variables and constraints for the problem

This section describes how to set up the variables and constraints for the problem. First, let task(i, j) denote the jth task in the sequence for job i. For example, task(0, 2) denotes the second task for job 0, which corresponds to the pair (1, 2) in the problem description.

Next, define ti, j to be the start time for task(i, j). The ti, j are the variables in the job shop problem. Finding a solution involves determining values for these variables that meet the requirement of the problem.

There are two types of constraints for the job shop problem:

  • Precedence constraints — These arise from the condition that for any two consecutive tasks in the same job, the first must be completed before the second can be started. For example, task(0, 2) and task(0, 3) are consecutive tasks for job 0. Since the processing time for task(0, 2) is 2, the start time for task(0, 3) must be at least 2 units of time after the start time for task 2. (Perhaps task 2 is painting a door, and it takes two hours for the paint to dry.) As a result, you get the following constraint:

        t0, 2   + 2  ≤  t0, 3

  • No overlap constraints — These arise from the restriction that a machine can't work on two tasks at the same time. For example, task(0, 2) and task(2, 1) are both processed on machine 1. Since their processing times are 2 and 4, respectively, one of the following constraints must hold:

        t0, 2   + 2  ≤  t2, 1 (if task(0, 2) is scheduled before task(2, 1))

          or

        t2, 1   + 4  ≤  t0, 2 (if task(2, 1) is scheduled before task(0, 2)).

Objective for the problem

The objective of the job shop problem is to minimize the makespan: the length of time from the earliest start time of the jobs to the latest end time.

A Python solution

The following sections describe the main elements of a Python program that solves the job shop problem.

Declare the model

The following code declares the model for the problem.

# Import Python wrapper for or-tools CP-SAT solver.
from ortools.sat.python import cp_model


def MinimalJobshopSat():
    """Minimal jobshop problem."""
    # Create the model.
    model = cp_model.CpModel()

Define the data

Next, the program defines the data for the problem.

    jobs_data = [  # task = (machine_id, processing_time).
        [(0, 3), (1, 2), (2, 2)],  # Job0
        [(0, 2), (2, 1), (1, 4)],  # Job1
        [(1, 4), (2, 3)]  # Job2
    ]

    machines_count = 1 + max(task[0] for job in jobs_data for task in job)
    all_machines = range(machines_count)
    jobs_count = len(jobs_data)
    all_jobs = range(jobs_count)

Define the variables

The following code defines the variables in the problem.

    task_type = collections.namedtuple('task_type', 'start end interval')
    assigned_task_type = collections.namedtuple('assigned_task_type',
                                                'start job index')

    # Create jobs.
    all_tasks = {}
    for job in all_jobs:
        for task_id, task in enumerate(jobs_data[job]):
            start_var = model.NewIntVar(0, horizon,
                                        'start_%i_%i' % (job, task_id))
            duration = task[1]
            end_var = model.NewIntVar(0, horizon, 'end_%i_%i' % (job, task_id))
            interval_var = model.NewIntervalVar(
                start_var, duration, end_var, 'interval_%i_%i' % (job, task_id))
            all_tasks[job, task_id] = task_type(
                start=start_var, end=end_var, interval=interval_var)

First, the program uses the solver's NewIntVar method to create interval_var, which contains the time interval for each task.

Define the constraints

The following code defines the constraints for the problem.

    # Create and add disjunctive constraints.
    for machine in all_machines:
        intervals = []
        for job in all_jobs:
            for task_id, task in enumerate(jobs_data[job]):
                if task[0] == machine:
                    intervals.append(all_tasks[job, task_id].interval)
        model.AddNoOverlap(intervals)

    # Add precedence contraints.
    for job in all_jobs:
        for task_id in range(0, len(jobs_data[job]) - 1):
            model.Add(all_tasks[job, task_id +
                                1].start >= all_tasks[job, task_id].end)

The program uses the solver's AddNoOverlap method to create the no overlap constraints, which prevent tasks for the same machine from overlapping in time.

Next, the program adds the precedence constraints, which prevent consecutive tasks for the same job from overlapping in time. For each job, the line

  model.Add(
          all_tasks[job, task_id + 1].start >= all_tasks[job, task_id].end)

requires the end time of a task to occur before the start time of the next task in the job.

Define the objective

The following code defines the objective in the problem.

    # Makespan objective.
    obj_var = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(
        obj_var,
        [all_tasks[(job, len(jobs_data[job]) - 1)].end for job in all_jobs])
    model.Minimize(obj_var)

The expression

   model.AddMaxEquality(
      obj_var,
      [all_tasks[(job, len(jobs_data[job]) - 1)].end for job in all_jobs])

creates a variable obj_var whose value is the maximum of the end times for all jobs —that is, the makespan.

Declares the solver

The following code declares and calls the solver.

    # Solve model.
    solver = cp_model.CpSolver()
    status = solver.Solve(model)

Display the results

The following code displays the results, including the optimal schedule and task intervals.

        # Print out makespan.
        print('Optimal Schedule Length: %i' % solver.ObjectiveValue())
        print()

        # Create one list of assigned tasks per machine.
        assigned_jobs = [[] for _ in all_machines]
        for job in all_jobs:
            for task_id, task in enumerate(jobs_data[job]):
                machine = task[0]
                assigned_jobs[machine].append(
                    assigned_task_type(
                        start=solver.Value(all_tasks[job, task_id].start),
                        job=job,
                        index=task_id))

        disp_col_width = 10
        sol_line = ''
        sol_line_tasks = ''

        print('Optimal Schedule', '\n')

        for machine in all_machines:
            # Sort by starting time.
            assigned_jobs[machine].sort()
            sol_line += 'Machine ' + str(machine) + ': '
            sol_line_tasks += 'Machine ' + str(machine) + ': '

            for assigned_task in assigned_jobs[machine]:
                name = 'job_%i_%i' % (assigned_task.job, assigned_task.index)
                # Add spaces to output to align columns.
                sol_line_tasks += name + ' ' * (disp_col_width - len(name))
                start = assigned_task.start
                duration = jobs_data[assigned_task.job][assigned_task.index][1]

                sol_tmp = '[%i,%i]' % (start, start + duration)
                # Add spaces to output to align columns.
                sol_line += sol_tmp + ' ' * (disp_col_width - len(sol_tmp))

            sol_line += '\n'
            sol_line_tasks += '\n'

        print(sol_line_tasks)
        print('Task Time Intervals\n')
        print(sol_line)

The output is shown below:

Optimal Schedule Length: 11

Optimal Schedule

Machine 0: Job_0_0   Job_1_0
Machine 1: Job_2_0   Job_0_1   Job_1_2
Machine 2: Job_1_1   Job_0_2   Job_2_1

The length of the optimal schedule is 11, which is an improvement over the solution shown previously. The optimal schedule is displayed for each machine, where Job_i_j represents the jth task for job i.

Here are the time intervals corresponding to the scheduled tasks above.

Task Time Intervals

Machine 0: [0,3]     [3,5]
Machine 1: [0,4]     [4,6]     [6,10]
Machine 2: [5,6]     [6,8]     [8,11]

Here's the timeline for the optimal schedule.

Entire program

Finally, here is the entire program for the job shop problem.

from __future__ import print_function

import collections

# Import Python wrapper for or-tools CP-SAT solver.
from ortools.sat.python import cp_model


def MinimalJobshopSat():
    """Minimal jobshop problem."""
    # Create the model.
    model = cp_model.CpModel()

    jobs_data = [  # task = (machine_id, processing_time).
        [(0, 3), (1, 2), (2, 2)],  # Job0
        [(0, 2), (2, 1), (1, 4)],  # Job1
        [(1, 4), (2, 3)]  # Job2
    ]

    machines_count = 1 + max(task[0] for job in jobs_data for task in job)
    all_machines = range(machines_count)
    jobs_count = len(jobs_data)
    all_jobs = range(jobs_count)

    # Compute horizon.
    horizon = sum(task[1] for job in jobs_data for task in job)

    task_type = collections.namedtuple('task_type', 'start end interval')
    assigned_task_type = collections.namedtuple('assigned_task_type',
                                                'start job index')

    # Create jobs.
    all_tasks = {}
    for job in all_jobs:
        for task_id, task in enumerate(jobs_data[job]):
            start_var = model.NewIntVar(0, horizon,
                                        'start_%i_%i' % (job, task_id))
            duration = task[1]
            end_var = model.NewIntVar(0, horizon, 'end_%i_%i' % (job, task_id))
            interval_var = model.NewIntervalVar(
                start_var, duration, end_var, 'interval_%i_%i' % (job, task_id))
            all_tasks[job, task_id] = task_type(
                start=start_var, end=end_var, interval=interval_var)

    # Create and add disjunctive constraints.
    for machine in all_machines:
        intervals = []
        for job in all_jobs:
            for task_id, task in enumerate(jobs_data[job]):
                if task[0] == machine:
                    intervals.append(all_tasks[job, task_id].interval)
        model.AddNoOverlap(intervals)

    # Add precedence contraints.
    for job in all_jobs:
        for task_id in range(0, len(jobs_data[job]) - 1):
            model.Add(all_tasks[job, task_id +
                                1].start >= all_tasks[job, task_id].end)

    # Makespan objective.
    obj_var = model.NewIntVar(0, horizon, 'makespan')
    model.AddMaxEquality(
        obj_var,
        [all_tasks[(job, len(jobs_data[job]) - 1)].end for job in all_jobs])
    model.Minimize(obj_var)

    # Solve model.
    solver = cp_model.CpSolver()
    status = solver.Solve(model)

    if status == cp_model.OPTIMAL:
        # Print out makespan.
        print('Optimal Schedule Length: %i' % solver.ObjectiveValue())
        print()

        # Create one list of assigned tasks per machine.
        assigned_jobs = [[] for _ in all_machines]
        for job in all_jobs:
            for task_id, task in enumerate(jobs_data[job]):
                machine = task[0]
                assigned_jobs[machine].append(
                    assigned_task_type(
                        start=solver.Value(all_tasks[job, task_id].start),
                        job=job,
                        index=task_id))

        disp_col_width = 10
        sol_line = ''
        sol_line_tasks = ''

        print('Optimal Schedule', '\n')

        for machine in all_machines:
            # Sort by starting time.
            assigned_jobs[machine].sort()
            sol_line += 'Machine ' + str(machine) + ': '
            sol_line_tasks += 'Machine ' + str(machine) + ': '

            for assigned_task in assigned_jobs[machine]:
                name = 'job_%i_%i' % (assigned_task.job, assigned_task.index)
                # Add spaces to output to align columns.
                sol_line_tasks += name + ' ' * (disp_col_width - len(name))
                start = assigned_task.start
                duration = jobs_data[assigned_task.job][assigned_task.index][1]

                sol_tmp = '[%i,%i]' % (start, start + duration)
                # Add spaces to output to align columns.
                sol_line += sol_tmp + ' ' * (disp_col_width - len(sol_tmp))

            sol_line += '\n'
            sol_line_tasks += '\n'

        print(sol_line_tasks)
        print('Task Time Intervals\n')
        print(sol_line)


MinimalJobshopSat()

Send feedback about...