A Distributed Quasi-Newton Algorithm for Empirical Risk Minimization with Nonsmooth Regularization

Ching-pei Lee, Cong Han Lim, Stephen J. Wright

Introduction

We consider solving the following regularized problem in a distributed manner:

where XX is a dd by nn real-valued matrix, gg is a convex, closed, and extended-valued proper function that can be nondifferentiable, and ff is a differentiable function whose gradient is Lipschitz continuous with parameter L>0L>0. Each column of XX represents a single data point or instance, and we assume that the set of data points is partitioned and spread across KK machines (i.e. distributed instance-wise). We can write XX as

where XkX_{k} is stored exclusively on the kkth machine. We further assume that ff shares the same block-separable structure and can be written as follows:

Unlike our instance-wise setting, some existing works consider the feature-wise partition setting, under which XX is partitioned by rows rather than columns. Although the feature-wise setting is a simpler one for algorithm design when gg is separable, storage of different features on different machines is often impractical.

The bottleneck in performing distributed optimization is often the high cost of communication between machines. For (1), the time required to retrieve XkX_{k} over a network can greatly exceed the time needed to compute fkf_{k} or its gradient with locally stored XkX_{k}. Moreover, we incur a delay at the beginning of each round of communication due to the overhead of establishing connections between machines. This latency prevents many efficient single-core algorithms such as coordinate descent (CD) and stochastic gradient and their asynchronous parallel variants from being employed in large-scale distributed computing setups. Thus, a key aim of algorithm design for distributed optimization is to improve the communication efficiency while keeping the computational cost affordable. Batch methods are preferred in this context, because fewer rounds of communication occur in distributed batch methods.

To the best of our knowledge, for ERMs with general nonsmooth regularizers in the instance-wise storage setting, proximal-gradient-like methods (Wright et al., 2009; Nesterov, 2013) are the only practical distributed optimization algorithms with convergence guarantees. Since these methods barely use the Hessian information of the smooth part (if at all), we suspect that proper utilization of second-order information has the potential to improve convergence speed and therefore communication efficiency dramatically. We thus propose a practical distributed inexact variable-metric algorithm for general (1) which uses gradients and which updates information from previous iterations to estimate curvature of the smooth part ff in a communication-efficient manner. We describe construction of this estimate and solution of the corresponding subproblem. We also provide convergence rate guarantees, which also bound communication complexity. These rates improve on existing distributed methods, even those tailor-made for specific regularizers.

Our algorithm leverages the more general framework provided in Lee and Wright (2018), and our major contribution in this work is to describe how the main steps of the framework can be implemented efficiently in a distributed environment. Our approach has both good communication and computational complexity, unlike certain approaches that focus only on communication at the expense of computation (and ultimately overall time). We believe that this work is the first to propose, analyze, and implement a practically feasible distributed optimization method for solving (1) with general nonsmooth regularizer gg under the instance-wise storage setting.

Our algorithm and implementation details are given in Section 2. Communication complexity and the effect of the subproblem solution inexactness are analyzed in Section 3. Section 4 discusses related works, and empirical comparisons are conducted in Section 5. Concluding observations appear in Section 6.

\|\cdot\| denotes the 2-norm, both for vectors and for matrices.

Algorithm

the update direction is obtained by approximately solving

A line search procedure determines a suitable stepsize α\alpha, and we perform the update ww+αp{\boldsymbol{w}}\leftarrow{\boldsymbol{w}}+\alpha{\boldsymbol{p}}.

For the ease of description, we assume the allreduce model of MPI (Message Passing Interface Forum, 1994), but it is also straightforward to extend the framework to a master-worker platform. Under this model, all machines simultaneously fulfill master and worker roles, and any data transmitted is broadcast to all machines. This can be considered as equivalent to conducting one map-reduce operation and then broadcasting the result to all nodes. The communication cost for the allreduce operation on a dd-dimensional vector under this model is

where TinitialT_{\text{initial}} is the latency to establish connection between machines, and TbyteT_{\text{byte}} is the per byte transmission time (see, for example, Chan et al. (2007, Section 6.3)).

The first term in (3) also explains why batch methods are preferable. Even if methods that frequently update the iterates communicate the same amount of bytes, it takes more rounds of communication to transmit the information, and the overhead of log(K)Tinitial\log(K)T_{\text{initial}} incurred at every round of communication makes this cost dominant, especially when KK is large.

In subsequent discussion, when an allreduce operation is performed on a vector of dimension O(d)O(d), we simply say that O(d)O(d) communication is conducted. We omit the latency term since batch methods like ours tend to have only a small constant number of rounds of communication per iteration. By contrast, non-batch methods such as CD or stochastic gradient require O(d)O(d) or O(n)O(n) rounds of communication per epoch and therefore face much more significant latency issues.

We see that, except for the sum over kk, the computation can be conducted locally provided w{\boldsymbol{w}} is available to all machines. Our algorithm maintains XkTwX_{k}^{T}{\boldsymbol{w}} on the kkth machine throughout, and the most costly steps are the matrix-vector multiplications between XkX_{k} and fk(XkTw)\nabla f_{k}(X_{k}^{T}{\boldsymbol{w}}), and XTwX^{T}{\boldsymbol{w}}. The local dd-dimensional partial gradients are then aggregated through an allreduce operation.

3. Constructing a good H𝐻H efficiently

The LBFGS Hessian approximation matrix is

At the first iteration where no si{\boldsymbol{s}}_{i} and yi{\boldsymbol{y}}_{i} are available, we set H0a0IH_{0}\coloneqq a_{0}I for some positive scalar a0a_{0}. When ff is twice-differentiable and convex, we use

If ff is not strongly convex, it is possible that (5) is only positive semi-definite. In this case, we follow Li and Fukushima (2001), taking the mm update pairs to be the most recent mm iterations for which the inequality

is satisfied, for some predefined δ>0\delta>0. It can be shown that this safeguard ensures that HtH_{t} are always positive definite and the eigenvalues are bounded within a positive range (see, for example, the appendix of Lee and Wright (2017)).

No additional communication is required to compute HtH_{t}. The gradients at all previous iterations have been shared with all machines through the allreduce operation, and the iterates wt{\boldsymbol{w}}_{t} are also available on each machine, as they are needed to compute the local gradient. Thus the information needed to form HtH_{t} is available locally on each machine.

We now consider the costs associated with the matrix MtM_{t}. In practice, mm is usually much smaller than dd, so the O(m3)O(m^{3}) cost of inverting the matrix directly is insignificant compared to the cost of the other steps. However, if dd is large, the computation of the inner products siTyj{\boldsymbol{s}}_{i}^{T}{\boldsymbol{y}}_{j} and siTsj{\boldsymbol{s}}_{i}^{T}{\boldsymbol{s}}_{j} can be expensive. We can significantly reduce this cost by computing and maintaining the inner products in parallel and assembling the results with O(m)O(m) communication cost. At the ttth iteration, given the new st1{\boldsymbol{s}}_{t-1}, we compute its inner products with both StS_{t} and YtY_{t} in parallel via the summations

requiring O(m)O(m) communication of the partial sums on each machine. We keep these results until st1{\boldsymbol{s}}_{t-1} and yt1{\boldsymbol{y}}_{t-1} are discarded, so that at each iteration, only 2m2m (not O(m2)O(m^{2})) inner products are computed.

4. Solving the Subproblem

The approximate Hessian HtH_{t} is generally not diagonal, so there is no easy closed-form solution to (2). We will instead use iterative algorithms to obtain an approximate solution to this subproblem. In single-core environments, coordinate descent (CD) is one of the most efficient approaches for solving (2) (Yuan et al., 2012; Zhong et al., 2014; Scheinberg and Tang, 2016). Since the subproblem (2) is formed locally on all machines, a local CD process can be applied when gg is separable and dd is not too large. The alternative approach of applying proximal-gradient methods to (2) may be more efficient in distributed settings, since they can be parallelized with little communication cost for large dd, and can be applied to larger classes of regularizers gg.

The fastest proximal-gradient-type methods are accelerated gradient (AG) (Nesterov, 2013) and SpaRSA (Wright et al., 2009). SpaRSA is a basic proximal-gradient method with spectral initialization of the parameter in the prox term. SpaRSA has a few key advantages over AG despite its weaker theoretical convergence rate guarantees. It tends to be faster in the early iterations of the algorithm (Yang and Zhang, 2011), thus possibly yielding a solution of acceptable accuracy in fewer iterations than AG. It is also a descent method, reducing the objective QHQ_{H} at every iteration, which ensures that the solution returned is at least as good as the original guess p=0{\boldsymbol{p}}=0

In the rest of this subsection, we will describe a distributed implementation of SpaRSA for (2), with HH as defined in (5). To distinguish between the iterations of our main algorithm (i.e. the entire process required to update w{\boldsymbol{w}} a single time) and the iterations of SpaRSA, we will refer to them by main iterations and SpaRSA iterations respectively.

Since HH and w{\boldsymbol{w}} are fixed in this subsection, we will write QH(;w)Q_{H}(\cdot;{\boldsymbol{w}}) simply as Q()Q(\cdot). We denote the iith iterate of the SpaRSA algorithm as p(i){\boldsymbol{p}}^{(i)}, and we initialize p(0)0{\boldsymbol{p}}^{(0)}\equiv 0. We denote the smooth part of QHQ_{H} by f^(p)\hat{f}({\boldsymbol{p}}), and the nonsmooth g(w+p)g({\boldsymbol{w}}+{\boldsymbol{p}}) by g^(p)\hat{g}({\boldsymbol{p}}). At the iith iteration of SpaRSA, we define

where ψi\psi_{i} is defined by the following “spectral” formula:

When i=0i=0, we use a pre-assigned value for ψ0\psi_{0} instead. (In our LBFGS choice for HtH_{t}, we use the value of γt\gamma_{t} from (6d) as the initial estimate of ψ0\psi_{0}.) The exact minimizer of (11) can be difficult to compute for general regularizers gg. However, approximate solutions of (11) suffice to provide a convergence rate guarantee for solving (2) (Schmidt et al., 2011; Scheinberg and Tang, 2016; Ghanbari and Scheinberg, 2016; Lee and Wright, 2018). Since it is known (see (Li and Fukushima, 2001)) that the eigenvalues of HH are upper- and lower-bounded in a positive range after the safeguard (9) is applied, we can guarantee that this initialization of ψi\psi_{i} is bounded within a positive range; see Section 3. The initial value of ψi\psi_{i} defined in (12) is increased successively by a chosen constant factor β>1\beta>1, and p(i+1){\boldsymbol{p}}^{(i+1)} is recalculated from (11), until the following sufficient decrease criterion is satisfied:

For general HH, the computational bottleneck of f^\nabla\hat{f} would take O(d2)O(d^{2}) operations to compute the Hp(i)H{\boldsymbol{p}}^{(i)} term. However, for our LBFGS choice of HkH_{k}, this cost is reduced to O(md+m2)O(md+m^{2}) by utilizing the matrix structure, as shown in the following formula:

The computation of (14) can be parallelized, by first parallelizing computation of the inner product UtTp(i)U_{t}^{T}{\boldsymbol{p}}^{(i)} via the formula

The distributed implementation of SpaRSA for solving (2) is summarized in Algorithm 1.

5. Line Search

After obtaining an update direction pk{\boldsymbol{p}}^{k} by approximately minimizing QHk(;wk)Q_{H_{k}}(\cdot;{\boldsymbol{w}}_{k}), a line search procedure is usually needed to find a step size αk\alpha_{k} that ensures sufficient decrease in the objective value. We follow Tseng and Yun (2009) by using a modified-Armijo-type backtracking line search to find a suitable step size α\alpha. Given the current iterate w{\boldsymbol{w}}, the update direction p{\boldsymbol{p}}, and parameters σ1,θ(0,1)\sigma_{1},\theta\in(0,1), we set

and pick the step size as the largest of θ0,θ1,\theta^{0},\theta^{1},\dotsc satisfying

The computation of Δ\Delta can again be done in a distributed manner. First, XkTpX_{k}^{T}{\boldsymbol{p}} can be computed locally on each machine, then the first term in (15) is obtained by sending a scalar over the network. When gg is block-separable, its computation can also be distributed across machines. The vector XkTpX_{k}^{T}{\boldsymbol{p}} is then used to compute the left-hand side of (16) for arbitrary values of α\alpha. Writing XkT(w+αp)=(XkTw)+α(XkTp)X_{k}^{T}({\boldsymbol{w}}+\alpha{\boldsymbol{p}})=\left(X_{k}^{T}{\boldsymbol{w}}\right)+\alpha\left(X_{k}^{T}{\boldsymbol{p}}\right), we see that once XkTwX_{k}^{T}{\boldsymbol{w}} and XkTpX_{k}^{T}{\boldsymbol{p}} are known, we can evaluate XkT(w+αp)X_{k}^{T}({\boldsymbol{w}}+\alpha{\boldsymbol{p}}) via an “axpy” operation (weighted sum of two vectors). Because HtH_{t} defined in (5) attempts to approximate the real Hessian, the unit step α=1\alpha=1 frequently satisfies (16), so we use the value 11 as the initial guess. Aside from the communication needed to compute the summation of the fkf_{k} terms in the evaluation of FF, the only other communication needed is to share the update direction p{\boldsymbol{p}} if (2) was solved in a distributed manner. Thus, two rounds of O(d)O(d) communication are incurred per main iteration. Otherwise, if each machine solves the same subproblem (2) locally, then only one round of O(d)O(d) communication is required.

Our distributed algorithm for (1) is summarized in Algorithm 2.

6. Cost Analysis

We now summarize the costs of our algorithm. For the distributed version of Algorithm 1, each iteration costs

in communication. In the next section, we will show that (13) is accepted in a constant number of times and thus the overall communication cost is O(m)O(m).

For Algorithm 2, the computational cost per iteration is

where #nnz is the number of nonzero elements in XX, and the communication cost is

We note that the costs of Algorithm 1 are dominated by those of Algorithm 2 if a fixed number of SpaRSA iterations is conducted every main iteration.

Communication Complexity

The use of an iterative solver for the subproblem (2) generally results in an inexact solution. We first show that running SpaRSA for any fixed number of iterations guarantees a step p{\boldsymbol{p}} whose accuracy is sufficient to prove overall convergence.

Using HtH_{t} as defined in (5) with the safeguard mechanism (9) in (2), we have the following.

There exist constants c1c2>0c_{1}\geq c_{2}>0 such that c1IHtc2Ic_{1}I\succeq H_{t}\succeq c_{2}I for all main iterations. Moreover, XTXLγtδ\|X^{T}X\|L\geq\gamma_{t}\geq\delta for all t>0t>0.

The initial estimate of ψi\psi_{i} at every SpaRSA iteration is bounded within the range of [min{c2,δ},max{c1,XTXL}][\min\{c_{2},\delta\},\max\{c_{1},\|X^{T}X\|L\}], and the final accepted value ψi\psi_{i} is upper-bounded.

SpaRSA is globally Q-linear convergent in solving (2). Therefore, there exists η[0,1)\eta\in[0,1) such that if we run at least SS iterations of SpaRSA for all main iterations for any S>0S>0, the approximate solution p{\boldsymbol{p}} satisfies

where QQ^{*} is the optimal objective of (2).

Lemma 3.1 establishes how the number of iterations of SpaRSA affects the inexactness of the subproblem solution. Given this measure, we can leverage the results developed in Lee and Wright (2018) to obtain iteration complexity guarantees for our algorithm. Since in our algorithm, communication complexity scales linearly with iteration complexity, this guarantee provides a bound on the amount of communication. In particular, our method communicates O(d+mS)O(d+mS) bytes per iteration (where SS is the number of SpaRSA iterations used, as in Lemma 3.1) and the second term can usually be ignored for small mm.

We show next that the step size generated by our line search procedure in Algorithm 2 is lower bounded by a positive value.

If SpaRSA is run at least SS iterations in solving (2), the corresponding Δ\Delta defined in (15) satisfies

Moreover, the backtracking subroutine in Algorithm 2 terminates in finite steps and produces a step size

This result is just a worst-case guarantee; in practice we often observe that the line search procedure terminates with α=1\alpha=1 for our choice of HH, as we see in our experiments.

Next, we analyze communication complexity of Algorithm 2.

If we apply Algorithm 2 to solve (1), and Algorithm 1 is run for SS iterations at each main iteration, then the following claims hold.

Suppose that the following variant of strong convexity holds: There exists μ>0\mu>0 such that for any w{\boldsymbol{w}} and any aa\in, we have

where FF^{*} is the optimal objective value of (1), Ω\Omega is the solution set, and PΩP_{\Omega} is the projection onto this set. Then Algorithm 2 converges globally at a Q-linear rate. That is,

Therefore, to get an approximate solution of (1) that is ϵ\epsilon-accurate in the sense of objective value, we need to perform at most

When FF is convex, and the level set defined by w0{\boldsymbol{w}}_{0} is bounded, define

Then we obtain the following expressions for rate of convergence of the objective value.

When F(wt)Fc1R02F({\boldsymbol{w}}_{t})-F^{*}\geq c_{1}R_{0}^{2},

Otherwise, we have globally for all tt that

This implies a communication complexity of

If FF is non-convex, the norm of the proximal gradient steps

converge to zero at a rate of O(1/t)O(1/\sqrt{t}) in the following sense:

Note that it is known that the norm of GtG_{t} is zero if and only if wt{\boldsymbol{w}}_{t} is a stationary point (Lee and Wright, 2018), so this measure serves as a first-order optimality condition.

Our computational experiments cover the case of FF convex; exploration of the method on nonconvex FF is left for future work.

Related Works

Scheinberg and Tang (2016) and Ghanbari and Scheinberg (2016) showed global convergence rate results for a method based on (2) with bounded HH, and suggested using randomized coordinate descent to solve (2). In the experiments of these two works, they used the same choice of HH as we do in this paper, with CD as the solver for (2), which is well suited to their single-machine setting. Aside from our extension to the distributed setting and the use of SpaRSA, the third major difference between their algorithm and ours is that they do not conduct line search on the step size. Instead, when the obtained solution with a unit step size does not result in sufficient objective value decrease, they add a scaled identity matrix to HH and solve (2) again starting from p(0)=0{\boldsymbol{p}}^{(0)}=0. The cost of repeatedly solving (2) from scratch can be high, which results in an algorithm with higher overall complexity. This potential inefficiency is exacerbated further by the inefficiency of coordinate descent in the distributed setting.

Some methods consider solving (1) in a distributed environment where XX is partitioned feature-wise (i.e. along rows instead of columns). There are two potential disadvantages of this approach. First, new data points can easily be assigned to one of the machines in our approach, whereas in the feature-wise approach, the features of all new points would need to be distributed around the machines. Second, local curvature information is obtained, so the update direction can be poor if the data is distributed nonuniformly across features. (Data is more likely to be distributed evenly across instances than across features.) In the extreme case in which each machine contains only one row of XX, only the diagonal entries of the Hessian can be obtained locally, so the method reduces to a scaled version of proximal gradient.

Numerical Experiments

where C>0C>0 is a parameter prespecified to trade-off between the loss term and the regularization term. We fix CC to 11 for simplicity in our experiments. We consider the publicly available binary classification data sets listed in Table 1Downloaded from https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/., and partitioned the instances evenly across machines.

The parameters of our algorithm were set as follows: θ=0.5\theta=0.5, β=2\beta=2, σ0=102\sigma_{0}=10^{-2}, σ1=104\sigma_{1}=10^{-4}, m=10m=10, δ=1010\delta=10^{-10}. The parameters in SpaRSA follow the setting in (Wright et al., 2009), θ\theta is set to halve the step size each time, the value of σ0\sigma_{0} follows the default experimental setting of (Lee et al., 2017), δ\delta is set to a small enough number, and m=10m=10 is a common choice for LBFGS.

We ran our experiments on a local cluster of 1616 machines running MPICH2, and all algorithms are implemented in C/C++. The inversion of MM defined in (6) is performed through LAPACK (Anderson et al., 1999). The comparison criteria are the relative objective error (F(w)F)/F(F({\boldsymbol{w}})-F^{*})/F^{*}, versus either the amount communicated (divided by dd) or the overall running time. The former criterion is useful in estimating the performance in environments in which communication cost is extremely high.

In Table 3, we show the distribution of the step sizes over the main iterations, for the same set of values of ϵ1\epsilon_{1}. As we discussed in Section 3, although the smallest α\alpha can be much smaller than one, the unit step is usually accepted. Therefore, although the worst-case communication complexity analysis is dominated by the smallest step encountered, the practical behavior is much better.

2. Comparison with Other Methods

DPLBFGS: our Distributed Proximal LBFGS approach. We fix ϵ1=102\epsilon_{1}=10^{-2} in this experiment.

SPARSA (Wright et al., 2009): the method described in Section 2.4, but applied directly to (1).

We implement all methods in C/C++ and MPI. Note that the AG method (Nesterov, 2013) can also be used, but its empirical performance has been shown to be similar to SpaRSA (Yang and Zhang, 2011) and it requires strong convexity and Lipschitz parameters to be estimated, which induces an additional cost. A further examination on different values of mm indicates that convergence speed of our method improves with larger mm, while in OWLQN, larger mm usually does not lead to better results. We use the same value of mm for both methods and choose a value that favors OWLQN.

The results are provided in Figure 1. Our method is always the fastest in both criteria. For epsilon, our method is orders of magnitude faster, showing that correctly using the curvature information of the smooth part is indeed beneficial in reducing the communication complexity.

Conclusions

In this work, we propose a practical and communication-efficient distributed algorithm for solving general regularized nonsmooth ERM problems. Our algorithm enjoys fast performance both theoretically and empirically and can be applied to a wide range of ERM problems. It is possible to extend our approach for solving the distributed dual ERM problem with a strongly convex primal regularizer, and we expect our framework to outperform state of the art, which only uses block-diagonal parts of the Hessian that can be computed locally. These topics are left for future work.

Acknowledgement

This work was supported by NSF Awards IIS-1447449, 1628384, 1634597, and 1740707; AFOSR Award FA9550-13-1-0138; and Subcontract 3F-30222 from Argonne National Laboratory. The authors thank En-Hsu Yen for fruitful discussions.

References

Appendix A Proofs

By directly expanding f^\nabla\hat{f}, we have that for any p1,p2{\boldsymbol{p}}_{1},{\boldsymbol{p}}_{2},

for bounding ψi\psi_{i} for i>0i>0, and the bound for ψ0\psi_{0} is directly from the bounds of γt\gamma_{t}. The combined bound is therefore [min{c2,δ},max{c1,XTXL}][\min\{c_{2},\delta\},\max\{c_{1},\|X^{T}X\|L\}]. Next, we show that the final ψi\psi_{i} is always upper-bounded. The right-hand side of (11) is equivalent to the following:

Denote the optimal solution by d{\boldsymbol{d}}^{*}, then we have p(i+1)=p(i)+d{\boldsymbol{p}}^{(i+1)}={\boldsymbol{p}}^{(i)}+{\boldsymbol{d}}^{*}. Because HH is upper-bounded by c1c_{1}, we have that f^\nabla\hat{f} is c1c_{1}-Lipschitz continuous. Therefore, using Lemma 12 of Lee and Wright (2018), we get

We then have from c1c_{1}-Lipschitz continuity of f^\nabla\hat{f} that

Since σ0(0,1)\sigma_{0}\in(0,1), we must have c1/(2σ0)(c1/2,c1)c_{1}/(2-\sigma_{0})\in(c_{1}/2,c_{1}), Note that the initialization of ψi\psi_{i} is upper-bounded by c1c_{1} for all i>1i>1, so the final ψi\psi_{i} is upper bounded by 2c12c_{1}. Together with the first iteration that we start with ψ0=γt\psi_{0}=\gamma_{t}, we have that ψi\psi_{i} are always upper-bounded by max{2c1,γt}\max\{2c_{1},\gamma_{t}\}, and we have already proven γt\gamma_{t} is upper-bounded by XTXL\|X^{T}X\|L.

We note that since QQ is c2c_{2}-strongly convex, the following condition holds.

On the other hand, from the optimality condition of (25), we have that

showing Q-linear convergence of SpaRSA, with