Source code for adaptdl.goodput

# Copyright 2020 Petuum, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import autograd
import numpy as np
import collections
import scipy.optimize
import scipy.stats

# Parameters for a performance model which predicts the per-step time of
# distributed SGD using all-reduce. At a high level, it models compute time
# and network time separately, and combines them with some degree of overlap.
# Compute time is modeled as a linear function of the local batch size.
# Network time is modeled using different parameters depending on whether the
# job is inter-node (there exists a pair of replicas on different nodes) or
# intra-node (all replicas are on the same node). In both cases, network time
# is modeled as a constant term plus a retrogression term which increases
# linearly with the total number of replicas.
PerfParams = collections.namedtuple("PerfParams", [
    # T_compute ~ (alpha_c + beta_c * local_bsz) * (accum_steps + 1), where
    # accum_steps is the number of gradient accumulation steps per update.
    "alpha_c",  # Constant term of compute time
    "beta_c",   # Multiplicative factor of compute time
    # If inter-node: T_network ~ alpha_n + beta_n * replicas
    "alpha_n",  # Constant term of inter-node network time
    "beta_n",   # Retrogression factor of inter-node network time
    # If intra-node: T_network ~ alpha_r + beta_r * replicas
    "alpha_r",  # Constant term of intra-node network time
    "beta_r",   # Retrogression factor of intra-node network time
    # T_step ~ (T_compute ^ gamma + T_network ^ gamma) ^ (1 / gamma)
    # This is essentially a p-norm where p = gamma. When p ~ 1 then
    # T_step ~ T_compute + T_network, indicating no overlap between compute
    # and network. When p -> infinity then T_step = max(T_compute, T_network),
    # indicating perfect overlap. We limit gamma to [1, 10] since 10 is close
    # enough to approximate the max function for our purposes.
    "gamma",    # Models the degree of overlap between compute and network
])
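
A small worked example may help make the model concrete. The snippet below is
illustrative only: the parameter values are made up, not measured from any real
job. It shows how compute time, network time, and the gamma p-norm combine into
a predicted step time for a hypothetical inter-node configuration.

# Illustrative only: made-up parameter values.
example_params = PerfParams(alpha_c=0.05, beta_c=0.002, alpha_n=0.1,
                            beta_n=0.01, alpha_r=0.02, beta_r=0.001,
                            gamma=2.0)
local_bsz, replicas = 64, 8  # hypothetical inter-node configuration
t_compute = example_params.alpha_c + example_params.beta_c * local_bsz
t_network = example_params.alpha_n + example_params.beta_n * max(replicas - 2, 0)
t_step = (t_compute ** example_params.gamma
          + t_network ** example_params.gamma) ** (1 / example_params.gamma)
# t_compute ~ 0.178, t_network ~ 0.16, t_step ~ 0.24 (arbitrary time units)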

# Parameters for the gradient statistics used to estimate statistical
# efficiency: sqr is the squared norm of the (estimated) true gradient, and
# var is the (estimated) gradient variance. See GoodputFunction.efficiency.
GradParams = collections.namedtuple("GradParams", ["sqr", "var"])


class GoodputFunction(object):

    def __init__(self, perf_params, grad_params, init_batch_size):
        self._perf_params = PerfParams(*perf_params)
        self._grad_params = GradParams(*grad_params)
        self._init_batch_size = init_batch_size

    def __call__(self, num_nodes, num_replicas, atomic_bsz, accum_steps):
        return self.evaluate(num_nodes, num_replicas, atomic_bsz, accum_steps)

    def evaluate(self, num_nodes, num_replicas, atomic_bsz, accum_steps):
        batch_size = num_replicas * atomic_bsz * (accum_steps + 1)
        assert np.all(self._init_batch_size <= batch_size)
        return self.throughput(num_nodes, num_replicas, atomic_bsz,
                               accum_steps) * self.efficiency(batch_size)

    def throughput(self, num_nodes, num_replicas, atomic_bsz, accum_steps):
        accum_time = _predict_accum_time(self._perf_params, atomic_bsz)
        network_time = _predict_network_time(self._perf_params,
                                             num_nodes, num_replicas)
        optim_time = np.exp(_predict_log_optim_time(self._perf_params,
                                                    accum_time, network_time))
        total_time = accum_steps * accum_time + optim_time
        batch_size = num_replicas * atomic_bsz * (accum_steps + 1)
        return batch_size / total_time

    def efficiency(self, batch_size):
        grad_sqr = self._grad_params.sqr
        grad_var = self._grad_params.var
        scale = batch_size / self._init_batch_size
        denom = grad_var / scale + grad_sqr
        gain = np.where(denom > 0, (grad_var + grad_sqr) / denom, 1.0)
        return gain / scale

    def optimize(self, num_nodes, num_replicas, max_batch_size=None,
                 atomic_bsz_range=None, accumulation=False):
        assert np.all(np.less_equal(1, num_nodes))
        assert np.all(np.less_equal(num_nodes, num_replicas))
        if max_batch_size is None:
            max_batch_size = self._init_batch_size
        assert self._init_batch_size <= max_batch_size
        atomic_bsz_range = atomic_bsz_range or (None, None)
        min_atomic_bsz = atomic_bsz_range[0] or 1
        max_atomic_bsz = atomic_bsz_range[1] or max_batch_size
        # Remember what the output shape/format should be and flatten inputs.
        output_shape = np.broadcast(num_nodes, num_replicas).shape
        output_scalar = np.isscalar(num_nodes) or np.isscalar(num_replicas)
        num_nodes = np.broadcast_to(num_nodes, output_shape).flatten()
        num_replicas = np.broadcast_to(num_replicas, output_shape).flatten()
        # Sample 50 different total batch sizes in geometric space.
        min_batch_size = np.maximum(self._init_batch_size,
                                    min_atomic_bsz * num_replicas)
        batch_size = np.geomspace(min_batch_size, max_batch_size)
        local_bsz = batch_size / num_replicas
        eps = 1e-8  # Tolerance for floor/ceil operations.
        if accumulation:
            # If local_bsz exceeds the max atomic batch size, split it into a
            # number of batches to form (atomic_bsz, accum_steps) such that
            # (atomic_bsz * (accum_steps + 1)) is close to local_bsz.
            #
            # If num_replicas == 1 and local_bsz > self._init_batch_size, then
            # set accum_steps to at least 1. This is because the gradient
            # statistics used for scaling up the learning rate are inaccurate
            # when there is only one atomic minibatch to estimate them from.
            accum_steps = np.ceil(local_bsz / max_atomic_bsz - eps) - 1
            accum_steps = np.where(
                np.logical_and(num_replicas == 1,
                               local_bsz > self._init_batch_size + eps),
                np.maximum(accum_steps, 1), accum_steps).astype(int)
            atomic_bsz = np.ceil(
                local_bsz / (accum_steps + 1) - eps).astype(int)
        else:
            accum_steps = np.zeros_like(local_bsz, dtype=int)
            atomic_bsz = np.where(
                num_replicas == 1, self._init_batch_size,
                np.ceil(local_bsz - eps)).astype(int)
        # Constrain the atomic_bsz before we evaluate the candidates.
        atomic_bsz = np.maximum(min_atomic_bsz, atomic_bsz)
        atomic_bsz = np.minimum(max_atomic_bsz, atomic_bsz)
        # Evaluate the goodput of all candidate configurations.
        goodput = self.evaluate(num_nodes, num_replicas,
                                atomic_bsz, accum_steps)
        # Find the indices of the best configurations.
        indices = np.argmax(goodput, axis=0), np.arange(goodput.shape[1])
        # Restore the correct output shape and return results.
        goodput = goodput[indices].reshape(output_shape)
        atomic_bsz = atomic_bsz[indices].reshape(output_shape)
        accum_steps = accum_steps[indices].reshape(output_shape)
        if output_scalar:
            goodput = goodput.item()
            atomic_bsz = atomic_bsz.item()
            accum_steps = accum_steps.item()
        return goodput, atomic_bsz, accum_steps
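
A minimal usage sketch for GoodputFunction, again with made-up performance and
gradient parameters rather than values fitted from a real job:

perf_params = PerfParams(alpha_c=0.05, beta_c=0.002, alpha_n=0.1,
                         beta_n=0.01, alpha_r=0.02, beta_r=0.001, gamma=2.0)
grad_params = GradParams(sqr=1.0, var=4.0)
goodput_fn = GoodputFunction(perf_params, grad_params, init_batch_size=128)

# Goodput of 2 nodes x 4 replicas each, atomic batch size 32, no accumulation.
value = goodput_fn(num_nodes=2, num_replicas=8, atomic_bsz=32, accum_steps=0)

# Search over total batch sizes up to 1024 for the best per-replica atomic
# batch size and number of gradient accumulation steps on 2 nodes, 8 replicas.
goodput, atomic_bsz, accum_steps = goodput_fn.optimize(
    num_nodes=2, num_replicas=8, max_batch_size=1024,
    atomic_bsz_range=(16, 256), accumulation=True)
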

def fit_perf_params(num_nodes, num_replicas, atomic_bsz,
                    accum_step_time, optim_step_time):
    # Fit the performance model given accum time and optim time measurements
    # for different configurations of num_nodes, num_replicas, and atomic_bsz.

    # HACK: We want to use the original numpy module for calls from the
    # SpeedupFunction for performance reasons, but also need those functions
    # to use autograd.numpy when we want to differentiate them. We patch the
    # global np reference only for the code invoked from this function.
    global np  # Replace numpy with the one from autograd.
    orig_np = np
    np = autograd.numpy

    num_nodes = np.array(num_nodes)
    num_replicas = np.array(num_replicas)
    accum_step_time = np.array(accum_step_time)
    optim_step_time = np.array(optim_step_time)

    # Set initial params to reasonable values.
    params = [1e-1, 1e-2] * 3 + [1.0 + 1e-3]
    # Set lower/upper bounds for each parameter. Add a small slack to lower
    # bounds to avoid numerical instability issues.
    lower = [1e-8, 1e-8] * 3 + [1.0]
    upper = [np.inf, np.inf] * 3 + [10.0]
    if len(np.unique(atomic_bsz)) == 1:
        # Fix alpha_c if we only observed a single atomic batch size.
        # This makes the speedup model optimistic with respect to scaling up
        # the batch size. It assigns equal weight to the constant and
        # multiplicative factors for accum time if there is only a single
        # datapoint (which is by far the most likely case for this scenario).
        params[0] = upper[0] = lower[0] = np.mean(accum_step_time) / 2
    if not np.any(num_nodes > 1):
        # Fix alpha_n and beta_n if there are no multi-node observations.
        params[2] = upper[2] = lower[2]
        params[3] = upper[3] = lower[3]
    if not np.any(np.logical_and(num_nodes == 1, num_replicas > 1)):
        # Fix alpha_r and beta_r if no single-node/multi-replica observations.
        params[4] = upper[4] = lower[4]
        params[5] = upper[5] = lower[5]
    if not np.any(num_replicas > 2):
        # Fix beta_n and beta_r if there are no observations with more than
        # 2 replicas.
        params[3] = upper[3] = lower[3]
        params[5] = upper[5] = lower[5]
    bounds = scipy.optimize.Bounds(lower, upper, keep_feasible=True)
    args = (num_nodes, num_replicas, atomic_bsz,
            accum_step_time, optim_step_time)
    # FIXME: need to handle optimization failures and propagate to the Trainer.
    grad_fn = autograd.grad(_obj_fn)
    result = scipy.optimize.minimize(_obj_fn, params, args=args,
                                     jac=grad_fn, bounds=bounds)
    params = result.x
    if not any(num_nodes > 1):
        # Enforce prior: alpha_n and beta_n are at least alpha_r and beta_r.
        params[2] = max(params[2], params[4] * 1.1)
        params[3] = max(params[3], params[5] * 1.1)
    np = orig_np  # Restore original numpy.
    return PerfParams(*params)
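
A usage sketch for fit_perf_params with hypothetical profiling measurements
(the configurations and timings below are invented for illustration):

# One entry per observed configuration.
nodes = np.array([1, 1, 2])                # number of nodes used
replicas = np.array([1, 2, 4])             # total number of replicas
atomic_bsz = np.array([128, 128, 128])     # per-replica atomic batch size
accum_time = np.array([0.20, 0.21, 0.21])  # seconds per accumulation step
optim_time = np.array([0.22, 0.30, 0.35])  # seconds per synchronization step
perf_params = fit_perf_params(nodes, replicas, atomic_bsz,
                              accum_time, optim_time)
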

def _rmse(pred, true):
    return np.sqrt(((pred - true) ** 2).mean())


def _obj_fn(params, num_nodes, num_replicas, atomic_bsz,
            accum_step_time, optim_step_time):
    params = PerfParams(*params)
    pred_accum = _predict_accum_time(params, atomic_bsz)
    pred_network = _predict_network_time(params, num_nodes, num_replicas)
    pred_log_optim = _predict_log_optim_time(params, pred_accum, pred_network)
    # RMSLE (root mean squared log error) of accum step time predictions.
    err1 = _rmse(np.log(pred_accum), np.log(accum_step_time))
    # RMSLE of optim step time predictions.
    err2 = _rmse(pred_log_optim, np.log(optim_step_time))
    # L2 regularization towards a smaller gamma, because it's easier to
    # optimize the alpha and beta parameters when gamma is smaller.
    reg1 = 1e-3 * (params.gamma - 1) ** 2
    # Penalize retrogression terms to prefer a more optimistic model.
    reg2 = 1e-2 * ((params.beta_n / params.alpha_n) ** 2 +
                   (params.beta_r / params.alpha_r) ** 2)
    return err1 + err2 + reg1 + reg2


def _predict_accum_time(params, atomic_bsz):
    params = PerfParams(*params)
    # Forward/backward passes should scale linearly with the batch size.
    return params.alpha_c + params.beta_c * atomic_bsz


def _predict_log_optim_time(params, accum_time, network_time):
    gamma = PerfParams(*params).gamma
    return np.log(accum_time ** gamma + network_time ** gamma) / gamma


def _predict_network_time(params, num_nodes, num_replicas):
    params = PerfParams(*params)
    # Select the most significant link between replicas, currently either
    # inter-node (nodes > 1) or intra-node (replicas > 1). Note that if
    # replicas == 1 then neither of these two conditions is matched.
    conds = [num_nodes > 1, num_replicas > 1]
    # Bandwidth is bottlenecked by the most significant link, and alpha models
    # the overhead of transferring data across that link.
    bottleneck = np.select(conds, [params.alpha_n, params.alpha_r], 1e-8)
    # Assuming ring all-reduce, communication happens in a number of rounds
    # equal to the number of replicas. beta models the performance
    # retrogression from increasing the number of replicas beyond 2.
    retrogress = np.select(conds, [params.beta_n, params.beta_r], 1e-8)
    retrogress = retrogress * np.maximum(num_replicas - 2, 1e-8)
    return bottleneck + retrogress
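
Finally, an illustration (with the same made-up parameters as above) of how the
network-time helper distinguishes the inter-node, intra-node, and
single-replica cases:

params = PerfParams(alpha_c=0.05, beta_c=0.002, alpha_n=0.1,
                    beta_n=0.01, alpha_r=0.02, beta_r=0.001, gamma=2.0)
nodes = np.array([2, 1, 1])
replicas = np.array([8, 8, 1])
print(_predict_network_time(params, nodes, replicas))
# -> approximately [0.16, 0.026, 0.0], i.e. alpha_n + beta_n * 6 for the
#    inter-node case, alpha_r + beta_r * 6 for the intra-node case, and
#    effectively zero when there is only a single replica.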