Distributed Block Coordinate Descent for Minimizing Partially Separable Functions

Jakub Marecek, Peter Richtarik, Martin Takac

Introduction

With the ever increasing availability of data comes the need to solve ever larger instances of problems in data science and machine learning, many of which turn out to be convex optimization problems of enormous dimensions. A single machine is often unable to store the complete data in its main memory. This suggests the need for efficient algorithms, which can benefit from distributing the data and computations across many computers.

In this paper, we study optimization problems of the form:

where ff is a smooth, convex and partially block separable function, and Ω\Omega is a possibly non-smooth, convex, block separable, and “simple” extended real valued function. The technical definitions of these terms are given in Section 2.

We propose and study the performance of a distributed block coordinate descent method applied to problem (1).

In our method, the blocks of coordinates are first partitioned among C{\it C} computers of a cluster. Likewise, data associated with these blocks are partitioned accordingly and stored in a distributed way. In each of the subsequent iterations, each computer chooses τ{\tau} blocks out of those stored locally, uniformly at random. Then, each computer computes and applies an update to the selected blocks, in parallel, out of information available to it locally. An update, which happens to be the residual in data-fitting problems, is then transmitted to other computers, which receive it either by the beginning of the next iteration or at some later time. In the former case, we denote the methods “sychronous” and we analyse them in detail. In the latter case, we denote the methods “asynchronous” and we include them for the sake of comparison in Section 7.

The main contributions of this paper are, in no particular order:

Partial separability. This is the first time such a distributed block-coordinate descent method is analyzed under the assumption that ff is partially separable.

New step-length. Our method and analysis is based on an expected separable overapproximation (ESO) inequality for partially separable functions and distributed samplings in Theorem 4.1 in Section 4. The length of the step we take in each iteration is given by the optimum of this ESO.

Iteration complexity. We show that the iteration complexity of the method depends on the degree of block separability of ff: the more separable the instance, the fewer iterations the method requires. The complexity results are stated in two theorems in Section 5 and are of the order of O(log(1/ϵ))O(\log(1/\epsilon)) for strongly convex FF and O(1/ϵ)O(1/\epsilon) for general convex FF. At the same time, the separability also reduces the run-time per iteration.

Efficient implementation. When we replace the natural synchronous communications between computers, as analysed in Section 5, with asynchronous communication, we obtain a major speed-up in the computational performance. An efficient open-source implementation of both synchronous and asynchronous methods is available as part of the package http://code.google.com/p/ac-dc/.

Our method and results are valid not only for a cluster setting, where there really are C{\it C} computers which do not share any memory, and hence have to communicate by sending messages to each other, but also for computers using the Non-Uniform Memory Access (NUMA) architecture, where the memory-access time depends on the memory location relative to a processor, and accessing local memory is much faster than accessing memory elsewhere. NUMA architectures are increasingly more common in multi-processor machines.

2 Related work

Before we proceed, we give a brief overview of some existing literature on coordinate descent methods. For further references, we refer the reader to Bertsekas1989 ; RT:PCDM ; fercoq2013accelerated .

Block-coordinate descent. Block-coordinate descent is a simple iterative optimization strategy, where two subsequent iterates differ only in a single block of coordinates. In a very common special case, each block consists of a single coordinate. The choice of the block can be deterministic, e.g., cyclic (Saha10finite ), greedy (RT:TTD2011 ), or randomized. Recent theoretical guarantees for randomized coordinate-descent algorithms can be found in Nesterov:2010RCDM ; RT:UCDC ; fercoq2013smooth ; lu2013complexity ; necoara2013distributed ; lee2013efficient . Coordinate descent algorithms are also closely related to coordinate relaxation, linear and non-linear Gauss-Seidel methods, subspace correction, and domain decomposition (see Bertsekas-Book for references). For classical references on non-randomized variants, we refer to the work of Tseng Tseng:CCMCDM:Smooth ; Tseng:CGDM:Nonsmooth ; Tseng:CGDMLC:Nonsmooth ; Tseng:CBCDM:Nonsmooth .

Parallel block-coordinate descent. Clearly, one can parallelize coordinate descent by updating several blocks in parallel. The related complexity issues were studied by a number of authors. Richtárik and Takáč studied a broad class of parallel methods for the same problem we study in this paper, and introduced the concept of ESO RT:PCDM . The complexity was improved by Tappenden et al. rachael:Improved . An efficient accelerated version was introduced by Fercoq and Richtárik fercoq2013accelerated and an inexact version was studied in T2 . An asynchronous variant was studied by Liu et al. liu2013asynchronous . A non-uniform sampling and a method for dealing with non-smooth functions were described in richtarik2013optimal and fercoq2013smooth , respectively. Further related work can be found in scherrer2012feature ; T1 ; zhao2014stochastic ; hogwild .

Distributed block-coordinate descent. Distributed coordinate descent was first proposed by Bertsekas and Tsitsiklis Bertsekas1989 . The literature on this topic was rather sparse, c.f. DCD , until the research presented in this paper raised the interest, which lead to the analyses of Richtárik and Takáč richtarik2013distributed and Fercoq et al. fercoq2014fast . These papers do not consider blocks, and specialise our results to convex functions admitting a quadratic upper bound.

In the machine-learning community, distributed algorithms have been studied for particular problems, e.g., training of support vector machines 6122723 . Google Chang_psvm:parallelizing developed a library called PSVM, where parallel row-based incomplete Cholesky factorization is employed in an interior-point method. A MapReduce-based distributed algorithm for SVM was found to be effective in automatic image annotation Alham20112801 . Nevertheless, none of these papers use coordinate descent.

Notation and assumptions

In this section, we introduce the notation used in the rest of the paper and state our assumptions formally. We aim to keep our notation consistent with that of Nesterov Nesterov:2010RCDM and Richtárik & Takáč RT:PCDM .

Block structure. We decompose RN\mathbf{R}^{N} into nn subspaces as follows. Let URN×NU\in\mathbf{R}^{N\times N} be the N×NN\times N identity matrix and further let U=[U1,U2,,Un]U=[U_{1},U_{2},\dots,U_{n}] be a column decomposition of UU into nn submatrices, with UiU_{i} being of size N×NiN\times N_{i}, where iNi=N\sum_{i}N_{i}=N. It is easy to observe that any vector xRNx\in\mathbf{R}^{N} can be written uniquely as x=i=1nUix(i)x=\sum_{i=1}^{n}U_{i}x^{(i)}, where x(i)RNix^{(i)}\in\mathbf{R}^{N_{i}}. Moreover, x(i)=UiTxx^{(i)}=U_{i}^{T}x. In view of the above, from now on we write x(i):=UiTxRNix^{(i)}:=U_{i}^{T}x\in\mathbf{R}^{N_{i}}, and call x(i)x^{(i)} the block ii of xx.

Projection onto a set of blocks. Let us denote {1,2,,n}\{1,2,\ldots,n\} by [n][n], a set of blocks S[n]S\subseteq{[n]}, xRNx\in\mathbf{R}^{N}, and let x[S]x_{[S]} be the vector in RN\mathbf{R}^{N} whose blocks iSi\in S are identical to those of xx, but whose other blocks are zeroed out. Block-by-block, we thus have (x[S])(i)=x(i)(x_{[S]})^{(i)}=x^{(i)} for iSi\in S and (x[S])(i)=0RNi(x_{[S]})^{(i)}=0\in\mathbf{R}^{N_{i}}, otherwise. It will be more useful to us however to write

where we adopt the convention that if S=S=\emptyset, the sum is equal 0RN0\in\mathbf{R}^{N}.

Norms. Spaces RNi\mathbf{R}^{N_{i}}, i[n]i\in{[n]}, are equipped with a pair of conjugate norms: t(i)\|t\|_{(i)} and t(i):=maxs(i)1s,t\|t\|_{(i)}^{*}:=\max_{\|s\|_{(i)}\leq 1}\langle s,t\rangle, tRNit\in\mathbf{R}^{N_{i}}. For wR>0nw\in\mathbf{R}^{n}_{>0}, where R>0\mathbf{R}_{>0} is a set of positive real numbers, define a pair of conjugate norms in RN\mathbf{R}^{N} by

We shall assume throughout the paper that ff has the following properties.

Function f:RNRf:\mathbf{R}^{N}\to\mathbf{R} satisfies:

Partial separability. Function ff is of the form

where J\mathcal{J} is a collection of subsets of [n][n] and function fJf_{J} depends on xx through blocks x(i)x^{(i)} for iJi\in J only. The quantity ω:=maxJJJ\omega:=\max_{J\in\mathcal{J}}|J| is the degree of separability of ff.

Convexity. Functions fJf_{J}, JJJ\in\mathcal{J} in (4) are convex.

Smoothness. The gradient of ff is block Lipschitz, uniformly in xx, with positive constants L1,,LnL_{1},\dots,L_{n}. That is, for all xRNx\in\mathbf{R}^{N}, i[n]i\in{[n]} and tRNit\in\mathbf{R}^{N_{i}},

where if(x):=(f(x))(i)=UiTf(x)RNi\nabla_{i}f(x):=(\nabla f(x))^{(i)}=U^{T}_{i}\nabla f(x)\in\mathbf{R}^{N_{i}}.

Note that every function ff is trivially of the form (4): we can always assume that J\mathcal{J} contains just the single set J=[n]J=[n] and let fJ=ff_{J}=f. In this case we would have ω=n\omega=n. However, many functions appearing in applications can naturally be decomposed as a sum of a number of functions each of which depends on a small number of blocks of xx only. That is, many functions have degree of separability ω\omega that is much smaller than nn.

Note that since fJf_{J} are convex, so is ff. While it is possible to remove this assumption and provide an analysis in the non-convex case, this is beyond the scope of this paper.

An important consequence of (5) is the following standard inequality NesterovBook :

We assume that Ω:RNR{+}\Omega:\mathbf{R}^{N}\to\mathbf{R}\cup\{+\infty\} is (block) separable, i.e., that it can be decomposed as follows:

where the functions Ωi:RNiR{+}\Omega_{i}:\mathbf{R}^{N_{i}}\to\mathbf{R}\cup\{+\infty\} are convex and closed.

Distributed block coordinate descent method

In this section we describe our distributed block coordinate descent method (Algorithm 1). It is designed to solve convex optimization problems of the form (1), where the data describing the instance are so large that it is impossible to store these in memory of a single computer.

Pre-processing. Before the method is run, the set of blocks is partitioned into C{\it C} sets P(c)P^{(c)}, c=1,2,,Cc=1,2,\dots,C. Each computer “owns” one partition and will only store and update blocks of xx it owns. That is, the blocks iP(c)i\in P^{(c)} of xx are stored on and updated by computer cc only. Likewise, “all data” relevant to these blocks are stored on computer cc. We deal with the issues of data distribution and communication only in Section 6.

Distributed sampling of blocks. In Step 6 of Algorithm 1, each computer c{\it c} chooses a random subset Zk(c)Z_{k}^{(c)} of blocks from its partition P(c)\it{P}^{({\it c})}. We assume that Zk(c)=τ|Z_{k}^{(c)}|=\tau, and that it is chosen uniformly at random from all subsets of P(c)P^{(c)} of cardinality τ\tau. Moreover, we assume the choice is done independently from all history and from what the other computers do in the same iteration. Formally, we say that the set of blocks chosen by all computers in iteration kk, i.e., Zk=c=1CZk(c)Z_{k}=\cup_{c=1}^{C}Z_{k}^{(c)}, is a (C,τ)(C,\tau)-distributed sampling.

Computing and applying block updates. In Steps 7-9, each computer cc first computes and then applies updates to blocks iZk(c)i\in Z_{k}^{(c)} to xkx_{k}. This is done on each computer in parallel. Hence, we have two levels of parallelism: across the nodes/computers and within each computer. The update to block ii is denoted by h(i)(xk)h^{(i)}(x_{k}) and arises as a solution of an optimization problem in the lower dimensional space RNiR^{N_{i}}:

Our method is most effective when this optimization problem has a closed form solution, which is the case in many applications. Note that nearly all information that describes problem (8) for iP(c)i\in P^{(c)} is available at node cc. In particular, xk(i)x_{k}^{(i)} is stored on cc. Moreover, we can store the description of Ωi\Omega_{i}, norm (i)\|\cdot\|_{(i)} and the pair (β,wi)(\beta,w_{i}), for iP(c)i\in P^{(c)}, on node cc and only there.

Note that we did not specify yet the values of the parameters β\beta and w=(w1,,wn)w=(w_{1},\dots,w_{n}). These depend on the properties of ff and sampling Z^\hat{Z}. We shall give theoretically justified formulas for these parameters in Section 4.

Communication. Finally, note that in order to find h(i)(xk)h^{(i)}(x_{k}), each computer needs to be able to compute if(xk)\nabla_{i}f(x_{k}) for blocks iZk(c)P(c)i\in Z_{k}^{(c)}\subseteq P^{(c)}. This is the only information that an individual computer can not obtain from the data stored locally. We shall describe an efficient communication protocol that allows each node to compute if(xk)\nabla_{i}f(x_{k}) in Section 6.

Balanced partitioning. The set of blocks is partitioned into C{\it C} groups P(1),,P(C)\it{P}^{(1)},\dots,\it{P}^{({\it C})}, each of size s:=n/Cs:=n/{\it C}. That is,

{1,2,,n}=c=1CP(c)\{1,2,\dots,n\}=\cup_{{\it c}=1}^{\it C}\it{P}^{({\it c})},

P(c)P(c)=\it{P}^{(c^{\prime})}\cap\it{P}^{(c^{\prime\prime})}=\emptyset for ccc^{\prime}\neq c^{\prime\prime},

Sampling. For each c{1,,C}c\in\{1,\dots,C\}, the set Z^(c)\hat{Z}^{(c)} is a random subset of P(c)P^{(c)} of size τ{1,2,,s}\tau\in\{1,2,\dots,s\}, where each subset of size τ\tau is chosen with equal probability.

We refer call the random set-valued mapping Z^:=c=1CZ^(c)\hat{{\it{Z}}}:=\cup_{c=1}^{C}\hat{{\it{Z}}}^{(c)} by the name (C,τ)(C,\tau)-distributed sampling.

Expected separable overapproximation (ESO)

The following concept was first defined in RT:PCDM . It plays a key role in the complexity analysis of randomized coordinate descent methods.

Let Z^\hat{\it{Z}} be any uniform sampling, i.e., a random sampling of blocks for which Prob(iZ^)=Prob(jZ^)\mathbf{Prob}(i\in\hat{{\it{Z}}})=\mathbf{Prob}(j\in\hat{{\it{Z}}}) for all i,j[n]i,j\in[n]. We say that function ff admits an ESO with respect to sampling Z^\hat{\it{Z}}, with parameters β>0\beta>0 and wR>0nw\in\mathbf{R}^{n}_{>0}, if the following inequality holds for all x,hRNx,h\in\mathbf{R}^{N}:

For simplicity, we will sometimes write (f,Z^)ESO(β,w)(f,\hat{{\it{Z}}})\sim ESO(\beta,w).

In the rest of this section we derive an ESO inequality for ff satisfying Assumption 2.1 (smooth, convex, partially separable) and for sampling Z^\hat{Z} satisfying Assumption 3.1 ((C,τ)(C,\tau)-distributed sampling). This has not been done before in the literature. In particular, we give simple closed-form formulas for parameters β\beta and ww, which we shall use in Section 5 to shed light on the performance of the method.

We first need to establish an auxiliary result. We use [n][n] to denote {1,2,,n}\{1,2,\ldots,n\}.

Let Z^=c=1CZ^(c)\hat{{\it{Z}}}=\cup_{c=1}^{\it C}\hat{{\it{Z}}}^{(c)} be a (C,τ)({\it C},\tau)-distributed sampling. Pick J[n]J\subseteq[n] and assume that P(c)J=ξ|\it{P}^{({\it c})}\cap J|=\xi for some ξ1\xi\geq 1 and all c{\it c}. Let κ=κ(Z^J,i)\kappa=\kappa(|\hat{{\it{Z}}}\cap J|,i) be any function that depends on Z^J|\hat{{\it{Z}}}\cap J| and i[n]i\in[n] only. Then

Let us denote by J(c)=JP(c)J^{({\it c})}=J\cap\it{P}^{({\it c})}, ζ=Z^J\zeta=|\hat{\it{Z}}\cap J| and ζ(c)=Z^J(c)\zeta^{({\it c})}=|\hat{\it{Z}}\cap J^{({\it c})}|. Then

The main technical result of this paper follows. This is a generalization of a result from RT:PCDM for partially separable ff and τ\tau-nice sampling to the distributed (c>1c>1) case. Notice that for C=1{\it C}=1 we have ξ=ω\xi=\omega.

Let ff satisfy Assumption 2.1 and Z^\hat{\it{Z}} satisfy Assumption 3.1. Let Note that ξ{ωC,,ω}\xi\in\{\lceil\tfrac{\omega}{{\it C}}\rceil,\dots,\omega\}. ξ:=max{P(c)J  :  c{1,,C},  JJ}\xi:=\max\{|\it{P}^{({\it c})}\cap J|\;:\;c\in\{1,\dots,{\it C}\},\;J\in\mathcal{J}\}. Then (f,Z^)(f,\hat{\it{Z}}) admits ESO with parameters β\beta and ww given by

For fixed xRNx\in\mathbf{R}^{N}, define ϕ(h):=f(x+h)f(x)f(x),h\phi(h):=f(x+h)-f(x)-\langle\nabla f(x),h\rangle. Likewise, for all JJJ\in\mathcal{J} we define ϕJ(h):=fJ(x+h)fJ(x)fJ(x),h\phi_{J}(h):=f_{J}(x+h)-f_{J}(x)-\langle\nabla f_{J}(x),h\rangle. Note that

Also note that the functions ϕJ\phi_{J} and ϕ\phi are convex and minimized at h=0h=0, where they attain the value of . For any uniform sampling, and hence for Z^\hat{{\it{Z}}} in particular, and any aRNa\in\mathbf{R}^{N}, one has E[a,h[Z^]]=E[Z^]na,h,\mathbf{E}[\langle a,h_{[\hat{{\it{Z}}}]}\rangle]=\tfrac{\mathbf{E}[|\hat{{\it{Z}}}|]}{n}\langle a,h\rangle, and therefore

Because of this, and in view of (9) and the fact that as E[Z^]=Cτ\mathbf{E}[|\hat{{\it{Z}}}|]={\it C}\tau,In fact, Z^=Cτ|\hat{{\it{Z}}}|={\it C}\tau with probability 1. we only need to show that

Our starting point in establishing (14) will be the observation that from (6) used with t=h(i)t=h^{(i)} we get

To simplify the proof, we shall without loss of generality assume that P(c)J=ξ|\it{P}^{({\it c})}\cap J|=\xi for all c{1,2,,C}c\in\{1,2,\dots,{\it C}\} and JJJ\in\mathcal{J} for some constant ξ>1\xi>1. This can be achieved by extending the sets JJJ\in\mathcal{J} by introducing dummy dependencies (note that the assumptions of the theorem are still satisfied after this change). For brevity, let us write θJ,Z^:=JZ^\theta_{J,\hat{\it{Z}}}:=|J\cap\hat{\it{Z}}| and h[i]:=Uih(i)h_{[i]}:=U_{i}h^{(i)}. Fixing JJJ\in\mathcal{J} and hRNh\in\mathbf{R}^{N}, we can estimate:

In the second equation above we have used the assumption that ϕJ\phi_{J} depends on blocks iJi\in J only. The only inequality above follows from convexity of ϕJ\phi_{J}. Note that this step can only be performed if the sum is over a nonempty index set, which happens precisely when θJ,Z^1\theta_{J,\hat{{\it{Z}}}}\geq 1. This technicality can be handled at the expense of introducing a heavier notation (which we shall not do here), and (16) still holds. Finally, in one of the last steps we have used (10) with κ(Z^J,i)ϕJ(θJ,Z^h[i])\kappa(|\hat{{\it{Z}}}\cap J|,i)\leftarrow\phi_{J}(\theta_{J,\hat{{\it{Z}}}}h_{[i]}).

By summing up inequalities (16) for JJJ\in\mathcal{J}, we get

We now need to compute E[θJ,Z^2]\mathbf{E}[\theta_{J,\hat{\it{Z}}}^{2}]. Note that the random variable θJ,Z^\theta_{J,\hat{\it{Z}}} is the sum of C{\it C} independent random variables θJ,Z^=c=1CθJ,Z^(c)\theta_{J,\hat{\it{Z}}}=\sum_{{\it c}=1}^{\it C}\theta_{J,\hat{\it{Z}}^{({\it c})}}, where θJ,Z^(c)\theta_{J,\hat{\it{Z}}^{({\it c})}} has the simple law

It only remains to combine (17) and (18) to get (14). ∎

Note that ESO inequalities have recently been used in the analysis of distributed coordinate descent methods by Richtárik and Takáč richtarik2013distributed and Fercoq et al. fercoq2014fast However, their assumptions on ff and derivation of ESO are very different and hence our results apply to a different class of functions.

Iteration complexity

In this section, we state two iteration complexity results for Algorithm 1. Theorem 5.1 deals with a non-strongly convex objective and shows that the algorithm achieves sub-linear rate of convergence O(1ϵ)\mathcal{O}(\frac{1}{\epsilon}). Theorem 5.2 shows Algorithm 1 achieves linear convergence rate O(log1ϵ)\mathcal{O}(\log\frac{1}{\epsilon}) for a strongly convex objective.

However, we wish to stress that in high dimensional settings, and especially in applications where low- or medium-accuracy solutions are acceptable, the dependence of the method on ϵ\epsilon is somewhat less important than its dependence on data size through quantities such as the dimension NN and the number of blocks nn, and on quantities such as the number of computers C{\it C} and number of parallel updates per computer τ\tau, which is related to the number of cores.

Notice that once the ESO is established by Theorem 4.1, the complexity results, Theorems 5.1 and 5.2, follow from the generic complexity results in rachael:Improved and RT:PCDM , respectively.

Let ff satisfy Assumption 2.1 and sampling Z^\hat{{\it{Z}}} satisfy Assumption 3.1. Let xkx_{k} be the iterates of Algorithm 1 applied to problem (1), where parameters β\beta and ww are chosen as in Theorem 4.1 and the random sets ZkZ_{k} are iid, following the law of Z^\hat{{\it{Z}}}. Then for all k1k\geq 1,

Note that the leading term in the bound decreases as the number of blocks updated in a single (parallel) iteration, Cτ{\it C}\tau, increases. However, notice that the parameter β\beta also depends on C{\it C} and τ\tau. We shall investigate this phenomenon in Section 5.3 and show that the level of speed-up one gets by increasing C{\it C} and/or τ\tau (where by speed-up we mean the decrease of the upper bound established by the theorem) depends on the degree of separability ω\omega of ff. The smaller ω\omega is, the more speed-up one obtains.

2 Strongly-convex functions

If we assume that FF is strongly convex with respect to the norm w\|\cdot\|_{w} then the following theorem shows that F(xk)F(x_{k}) converges to FF^{*} linearly, with high probability.

Function ϕ:RNR{+}\phi:\mathbf{R}^{N}\to\mathbf{R}\cup\{+\infty\} is strongly convex with respect to the norm w\|\cdot\|_{w} with convexity parameter μϕ(w)0\mu_{\phi}(w)\geq 0 if

where ϕ(x)\phi^{\prime}(x) is any subgradient of ϕ\phi at xx.

Notice that by setting μϕ(w)=0\mu_{\phi}(w)=0, one obtains the usual notion of convexity. Strong convexity of FF may come from ff or Ω\Omega (or both); we write μf(w)\mu_{f}(w) (resp. μΩ(w)\mu_{\Omega}(w)) for the (strong) convexity parameter of ff (resp. Ω\Omega). It follows from (20) that if ff and Ω\Omega are strongly convex, then FF is strongly convex with, e.g., μF(w)μf(w)+μΩ(w).\mu_{F}(w)\geq\mu_{f}(w)+\mu_{\Omega}(w).

Let us adopt the same assumptions as in Theorem 5.1. Moreover, assume that FF is strongly convex with μf(w)+μΩ(w)>0\mu_{f}(w)+\mu_{\Omega}(w)>0. Choose initial point x0RNx_{0}\in\mathbf{R}^{N}, target confidence level 0<ρ<10<\rho<1, target accuracy level 0<ϵ<F(x0)F0<\epsilon<F(x_{0})-F^{*} and

If {xk}\{x_{k}\} are the random points generated by Algorithm 1, then Prob(F(xK)Fϵ)1ρ\mathbf{Prob}(F(x_{K})-F^{*}\leq\epsilon)\geq 1-\rho.

Notice that now both ϵ\epsilon and ρ\rho appear inside a logarithm. Hence, it is easy to obtain accurate solutions with high probability.

3 Parallelization speed-up is governed by sparsity

If we assume that x0xw2F(x0)F\|x_{0}-x^{*}\|_{w}^{2}\gg F(x_{0})-F^{*}, then in view of Theorem 5.1, the number of iterations required by our method to get an ϵ\epsilon solution in expectation is O(βCτϵ)O(\frac{\beta}{{\it C}\tau\epsilon}). Hence, the smaller βCτϵ\frac{\beta}{{\it C}\tau\epsilon} is, the fewer are the iterations required. If β\beta were a constant independent of C{\it C} and τ{\tau}, one would achieve linear speed-up by increasing workload (i.e., by increasing Cτ{\it C}\tau). However, this is the case for C=1{\it C}=1 and ω=1\omega=1 only (see Theorem 4.1). Let us look at the general case. If we write η:=ξs\eta:=\frac{\xi}{s} (this a measure of sparsity of the partitioned data), then

As expected, the first term represents linear speed-up. The second term represents a penalty for the lack of sparsity (correlations) in the data. As Cτ{\it C}{\tau} increases, the second term becomes increasingly dominant, and hence slows the speed-up from almost linear to none. Notice that for fixed η\eta, the ratio βCτ\frac{\beta}{{\it C}\tau} as a function of Cτ{\it C}{\tau} is decreasing and hence we always get some speed-up by increasing Cτ{\it C}\tau.

Figure 1 (left) shows the speed-up factor (Cτβ\frac{{\it C}\tau}{\beta}; high values are good) as a function of Cτ{\it C}{\tau} for different sparsity levels η\eta. One can observe that sparse problems achieve almost linear speed-up even for bigger value of Cτ{\it C}{\tau}, whereas for, e.g., η=0.2\eta=0.2, almost linear speed-up is possible only up to Cτ=10{\it C}{\tau}=10. For sparser data with η=0.01\eta=0.01, linear speed-up can be achieved up to Cτ=100{\it C}{\tau}=100. For η=0.001\eta=0.001, we can use Cτ=103{\it C}{\tau}=10^{3}. The right part of Figure 1 shows how sparsity affects speed-up for a fixed number of updates Cτ{\it C}{\tau}. Again, the break-point of almost linear speed-up is visibly present.

Similar observations in the non-distributed setting were reported in RT:PCDM . The phenomenon is not merely a by-product of our theoretical analysis; it also appears in practice.

4 The cost of distribution

Notice that in a certain intuitive sense, variants of Algorithm 1 are comparable, as long as each iteration updates the same number Cτ{\it C}\tau of blocks. This allows us to vary C{\it C} and τ\tau, while keeping the product constant. In particular, let us consider two scenarios:

Consider C{\it C} computers, each updating τ\tau blocks in parallel, and

Consider 11 computer updating Cτ{\it C}\tau blocks in each iteration in parallel.

For the sake of comparison, we assume that the underlying problem is small enough so that it can be stored on and solved by a single computer. Further, we assume that FF is strongly convex, μ(Ω)=0\mu(\Omega)=0 and s=nC2s=\tfrac{n}{{\it C}}\geq 2. Similar comparisons can be made in other settings as well, but given the page restrictions, we restrict ourselves to this case only.

In the iteration-complexity bound (21), we notice that the only difference is in the value of β\beta. Let β1\beta_{1} be the β\beta parameter in the first situation with C{\it C} computers, and β2\beta_{2} be the β\beta parameter in the second situation with 1 computer. The ratio of the complexity bounds (21) is hence equal to the ratio

Notice that ωCξω\tfrac{\omega}{{\it C}}\leq\xi\leq\omega. The ratio β1/β2\beta_{1}/\beta_{2} is increasing in ξ\xi. We thus obtain the following bounds:

Table 1 presents the values of LB and UB for various parameter choices and problem sizes. We observe that the value of β2\beta_{2} is around 1. The value of β1\beta_{1} depends on a particular partition, but we are sure that β1[β2\mboxLU,β2\mboxUB]\beta_{1}\in[\beta_{2}\cdot\mbox{LU},\beta_{2}\cdot\mbox{UB}]. In Table 1, UB is less than 2, which means that by distributing the computation, the method will at most double the number of iterations. However, larger values of UB, albeit \mboxUBC\mbox{UB}\precsim{\it C}, are possible for different settings of the parameters. For a different class of functions ff, an upper bound of 2 was proven in richtarik2013distributed and improved in fercoq2014fast to the factor 1+1/(τ1)1+1/(\tau-1) whenever τ>1\tau>1.

Of course, if the problem size exceeds the memory available at a single computer, the option of not distributing the data and computation may not be available. It is reassuring, though, to know that the price we pay for distributing the data and computation, in terms of the number of iterations, is bounded. Having said that, a major complication associated with any distributed method is the communication, which we discuss in the two following sections.

Two implementations

Although our algorithm and results apply to a rather broad class of functions, we focus on two important problems in statistics and machine learning in describing our computational experience, so as to highlight the finer details of the implementations.

Once the value of gk=g(xk)g_{k}=g(x_{k}) is available, a new iterate is

and gk+1=g(xk+1)g_{k+1}=g(x_{k+1}) can be easily expressed as

Note that the value δg(c)\delta g^{({\it c})} can be computed on computer c{\it c} as all required data are available on computer c{\it c}. Subsequently, gk+1g_{k+1} can be obtained by summation and the formula for if(x)\nabla_{i}f(x) will take the form if(x)=A:iTg=j=1mAj,ig(j).\nabla_{i}f(x)=A_{{:}{i}}^{T}g=\sum_{j=1}^{m}A_{{j},{i}}g^{(j)}. Once we know how to compute if(x)\nabla_{i}f(x) and LiL_{i}, all that remains to be done is to solve the problem

where a,b,dRa,b,d\in\mathbf{R} and c,λR>0c,\lambda\in\mathbf{R}_{>0}, which is given by a soft-thresholding formula t=sgn(ζ)(ζλc)+d,t^{*}=\textrm{sgn}(\zeta)(|\zeta|-\tfrac{\lambda}{c})_{+}-d, where ζ=dbc.\zeta=d-\tfrac{b}{c}.

2 An implementation for training support vector machines

Let us present another example implementation. The key problem in supervised machine learning is the training of classifiers. Given a matrix ARm×NA\in\mathbf{R}^{m\times N}, a compatible vector yRmy\in\mathbf{R}^{m}, and constant γ>0\gamma>0, the goal is to find a vector xRNx\in\mathbf{R}^{N} which solves the following optimization problem:

where Aj:A_{{j}{:}} again denotes jj-th row of matrix AA and L\mathcal{L} is a loss function, such as

The input (A,y)(A,y) is often referred to as the training data. Rows of matrix AA represent observations of NN features each and yy are the corresponding classifications to train the classifier on.

Square hinge loss is a popular choice of L\mathcal{L}, but is not smooth. It is well known that the dual has the form DCD ; shalev2013stochastic ; TR:MINIBATCH :

where Φ\Phi_{} is the characteristic (or “indicator”) function of the interval $andandQ\in\mathbf{R}^{m\times m}istheGrammatrixofthedata,i.e.,is the Gram matrix of the data, i.e.,Q_{i,j}=y^{(i)}y^{(j)}A_{{i}{:}}A_{{j}{:}}^{T}.If. Ifx^{*}isanoptimalsolutionof(SVMDUAL)thenis an optimal solution of (SVM-DUAL) thenw^{*}=w^{*}(x^{*})=\frac{1}{\lambda m}\sum_{i=1}^{m}y^{(i)}(x^{*})^{(i)}A_{{i}{:}}^{T}$ is an optimal solution of the primal problem

where L(w,Ai:,y(i))=max{0,1y(i)Ai:w}\mathcal{L}(w,A_{{i}{:}},y^{(i)})=\max\{0,1-y^{(i)}A_{{i}{:}}w\}.

Our second example implementation is a distributed coordinate-descent algorithm for support vector machines (SVM) in the (SVM-DUAL) formulation. In this case, we define

The optimal step length is then solution of a one-dimensional problem:

The new value of the auxiliary vector gk+1=g(xk+1)g_{k+1}=g(x_{k+1}) is given by

and the duality gap G(xk)=P(gk)+F(xk)G(x_{k})=P(g_{k})+F(x_{k}) can be easily obtained TR:MINIBATCH ; shalev2013stochastic ; jaggi2014communication as

Per-iteration complexity

Using to the auxiliary vector gkg_{k}, which was introduced in the previous section, Algorithm 1 has two alternating and time consuming sub-procedures, namely:

computation of an update iZk(c)Uih(i)(xk)\sum_{i\in Z_{k}^{({\it c})}}U_{i}h^{(i)}(x_{k}) and the accumulation of gkg_{k}: δg(c)\delta g^{({\it c})},

Let us denote the run-time of the first sub-procedure by T1(τ)\mathcal{T}_{1}({\tau}), considering this depends on τ{\tau}, and the run-time of a second one by T2\mathcal{T}_{2}. We will neglect the rest of the run-time cost, such as managing a loop, evaluation of termination criteria, measuring a computation time, etc. The total run-time cost TT\mathcal{T}_{T} is hence given by

where we consider the case when μΩ(w)0\mu_{\Omega}(w)\equiv 0 in (21). Let us now for simplicity assume that the first sub-procedure is linear in τ{\tau}, i.e., T1(τ)=τT1(1)=:τT1\mathcal{T}_{1}({\tau})={\tau}\mathcal{T}_{1}(1)=:{\tau}\mathcal{T}_{1}. Then

Numerical values of T1\mathcal{T}_{1} and T2\mathcal{T}_{2} could be estimated, given problem sparsity and underlying hardware, or can be measured during the run.

Optimal choice of sampling parameter τ{\tau}. In the previous paragraph, we gave an estimate of the complexity of a single iteration. In this paragraph, we answer the question of how to choose a τ{\tau} given times T1,T2\mathcal{T}_{1},\mathcal{T}_{2}. For variable β\beta, we have more options, but we stick to the most general one given in (11). Given that s2s\geq 2, we have

where r1,2=T1T2r_{1,2}=\frac{\mathcal{T}_{1}}{\mathcal{T}_{2}} is a work to communication ratio. The optimal parameter τ{\tau}^{*} can be obtain by minimizing (36) and is given by

Therefore, smaller values of r1,2r_{1,2} imply that we should do more work in each iteration, and hence bigger values of τ{\tau} should be chosen. This is quite natural, as one should tune the parameters in such a way that time spent in communication should be in comparable with that of effective computation.

Message Passing Interface (MPI). In order to discuss finer details of the implementations, we need to introduce the architecture we use. We use OpenMP OpenMP for dealing with concurrency within a single computer and Message Passing Interface (MPI) MPI as the abstraction layer for network communication. In MPI, one passes data from one MPI process to another MPI process, which may run on another computer. (We disregard the concept of groups for brevity.) Communication can involve any subset of computers, which run MPI processes. Communication can be either blocking (“synchronous”) or non-blocking (“asynchronous”). A collective operation involves the communication among two or more MPI processes. An example of a collective operation is a barrier, where computers wait until all of them reach the same point in the algorithm. Another common collective operation is reduce all, which is parametrized by an arbitrary operation that takes a set of elements and produces a single element of the same type. This “reduce” operation is applied to all elements of the particular type stored across all MPI processes and the result is returned to all MPI processes. For example, let us assume that each computer stores a vector δg(c)Rm\delta g^{({\it c})}\in\mathbf{R}^{m} and the goal is to sum it up, i.e., to compute δg(1,,C)=c=1Cδg(c)\delta g^{(1,\dots,{\it C})}=\sum_{{\it c}=1}^{\it C}\delta g^{({\it c})} and to make this result available on each computer. Figure 3 shows a standard approach, which leads to the desired result. From the performance point of view, however, the use of reduce all should be minimized, as it involves an implicit synchronisation and leaves most of the computers idle throughout the collective operation.

This suggests the following range of progressively better-performing variants:

Alternating Parallel and Serial regions (PS). The naïve implementation alternates two sub-procedures. One, which is computationally heavy and is done in parallel, but with no MPI communication, and another one, which is purely communicational. As an easy fix, one can dedicate one thread to the communication and other threads within the same computer to computation. We call this approach Fully Parallel (FP)). Figure 2 compares the naïve strategy (left) with the FP (right),

Reduce All (RA). As mentioned above, the use of reduce all operations significantly decreases the performance of many distributed algorithms. It is, however, the preferred form of communication between computers close to each other in the computer network, such as computers directly connected by a network cable. The use of asynchronous methods is also preferred over synchronous methods.

Asynchronous StreamLined (ASL). We propose another pattern of communication, where each computer in one iteration sends only one message to the closest computer, asynchronously, and receives only one message from another computer close-by, asynchronously. The communication hence takes place in an ring. This tweak, however, requires a significant change in the algorithm. Figure 4 illustrates the data flow of messages at the end of iteration kk for C=4{\it C}=4.

We fix an order of computers in a ring, denoting predR(c)\textrm{pred}_{R}({\it c}) and succR(c)\textrm{succ}_{R}({\it c}) the two computers neighbouring computer c{\it c} along the two directions on the ring. Computer c{\it c} always receives data only from computer predR(c)\textrm{pred}_{R}({\it c}) and sends data only to computer succR(c)\textrm{succ}_{R}({\it c}). Let us denote by δGk(c)\delta G_{k}^{({\it c})} the data, which computer c{\it c} sends to computer succR(c)\textrm{succ}_{R}({\it c}) at the end of iteration kk. When computer c{\it c} starts iteration kk, it has already received δGk1(predR(c))\delta G_{k-1}^{(\textrm{pred}_{R}({\it c}))}.For the start of the algorithm we define δgl(c)=δGl(c)=0\delta g_{l}^{({\it c})}=\delta G_{l}^{({\it c})}={\bf 0} for all l<0l<0. Hence the data, which will be sent at the end of iteration kk by computer c{\it c} are:

It should be noticed that at the end of each iteration in the ASL procedure, each computer has a different vector gkg_{k}, which we denote gk(c)g_{k}^{({\it c})}. The update rule is

The clear advantage of the ASL method is a decrease in communication time. On the other hand it comes with a cost of slower propagation of information. Indeed, it takes C1{\it C}-1 iterations to propagate information to all computers. It also comes with bigger storage requirements, as at iteration kk, we have to have all vectors δgl(c)\delta g_{l}^{({\it c})} for kClkk-{\it C}\leq l\leq k stored on computer c{\it c}.

Asynchronous Torus (AST). There is a compromise solution, though, which inherits many desirable features of both RA and ASL. This employs a toroidal networking topology, which is common in high-performance computing (HPC) in general, and HPC using InfiniBand networks InfiniBand , in particular. Let us assume that C{\it C} is a multiple of rNr\in N, where rr represents the width of a torus, i.e., C{\it C} computers are partitioned into subsets RiR_{i} each with size rr. Each group RiR_{i} has a root computer. These root computers aggregate updates from their respective groups, e.g., using a local reduce all operation, in each iteration and exchange those update in an asynchronous ring with two other adjacent root computers. Thus the communication between the root nodes follows the ASL communication pattern. The AST approach decreases the propagation time from C{\it C} to Cr\frac{{\it C}}{r}, additional storage is also decrease by factor rr, and the overall communication complexity remains low.

The Comparison. Changing from the FP approach to the PS approach does not require much computational or storage overhead, but can reduce the idle time of processors. However, changing from RA to SLA or AST brings significant storage requirements, while it reduces both communication and idle time significantly. Table 2 summarize maximum memory requirements on each single node of the cluster, time spent in communication, and amount of data transferred over the network. Once the time spent in communication is measured or estimated, one can pick the most appropriate strategy. Notice that the wall-clock time required for the reduce all operation, Tra\mathcal{T}_{ra}, is typically of the order O(logC)Tp2p,\mathcal{O}(\log{\it C})\cdot\mathcal{T}_{p2p}, where Tp2p\mathcal{T}_{p2p} is the time required by the point-to-point transmission.

Numerical experiments

In this section we present numerical evidence of the efficiency of the distributed (block) coordinate-descent method.

The code. The code of the distributed (block) coordinate-descent solver is part of our AC-DC library, available at http://code.google.com/p/ac-dc/. The library is written in C++ using OpenMP. The extensive use of template classes, Boost::MPI, and Boost.Serialization makes it easy to change the composite function and the precision of the computation. Both wall-clock and CPU-time were measured using Boost::Timers, which achieve nano-second accuracy on recent processors running recent versions of Linux.

The facility. Our empirical tests were conducted in UK’s high-performance computing facility, HECToR, equipped with multi-core computers connected using Infiniband InfiniBand . In particular, in Phase 3 of the facility, which is a Cray XE6 cluster, we have used up to 128 nodes, equipped with two AMD Opteron Interlagos 16-core processors and 32 GB of memory each. This gave us 4,096 cores in total, interconnected using Cray Gemini routers in a 3D torus. Each Gemini router was connected to processors and random-access memory of two nodes via HyperTransport links. Each router is then connected to ten other routers. In practice, the latency is about 1–1.5 microseconds and the capacity of each link is 8 GBs-1. The facility ran a Cray Linux Environment, based on SuSE Linux.

Support Vector Machines (SVM). One of the goals of this paper is to train huge sparse support-vector machines (SVM) that do not fit into the memory of a single computer. In the machine learning literature, one often performs experiments on instances of moderate size, e.g., 100 MB pegasos ; DCD ; TR:MINIBATCH . Well-known instances of this scale include, e.g., CCAT variant of RCV1 Lewis2004 , Astro-ph pegasos , and COV pegasos . In this Section, we focus on a larger dataset, known as WebSpam libsvm . This dataset consists of 350,000 observations (rows) and 16,609,143 features (columns). The size of the instance is 25 GB. Figure 5 show the execution time and duality gap for WebSpam dataset, using C=16{\it C}=16 MPI processes, with each process using 8 threads. τ\tau is the number of coordinates updated by one MPI process during one iteration. As expected, the main run-time cost it not computing the updates, but updating gg.

Let us remark that ϵ\epsilon is usually not particularly small in the machine-learning community. In experimenting with small ϵ\epsilon, we just wanted to demonstrate that our algorithm is able to close the duality gap within the limits of machine precision. The truly important measures of the performance of the classifier, e.g., 0-1 loss or prediction error, are actually within 10 % after the first minute, which is the first time we compute it. In practice, a duality gap of 0.1 or 0.01 can be sufficient for machine learning problems.

Sparse least squares (LASSO). Next, we solved an artificial instance of sparse least squares with a matrix of n=109n=10^{9} rows and d=5108d=5\cdot 10^{8} columns in block-angular form:

requiring 3 TB to store. Such matrices often arise in stochastic optimization. We used 128 nodes with 4 MPI processes on each node. Each MPI process ran 8 OpenMP threads, giving a total of 4,096 hardware threads. Each node c{\it c} stored two matrices: Aloc(c)R1,952,148×976,562A_{loc}^{({\it c})}\in\mathbf{R}^{1,952,148\times 976,562} and Aglob(c)R500,224×976,562A_{glob}^{({\it c})}\in\mathbf{R}^{500,224\times 976,562}. The average number of non-zero elements per row is 175175 and 1,0001,000 for Aloc(c)A^{({\it c})}_{loc} and Aglob(c)A_{glob}^{({\it c})}, respectively. When communicating gk(c)g_{k}^{({\it c})}, only entries corresponding to the global part of A(c)A^{({\it c})} need to be communicated, and hence in RA, a reduce all operation is applied to vectors δgglob(c)R500,224\delta g_{glob}^{({\it c})}\in\mathbf{R}^{500,224}. In ASL, vectors with the same length are sent. The optimal solution xx^{*} has exactly 160,000160,000 nonzero elements. Figure 6 compares the evolution of F(xk)FF(x_{k})-F^{*} for ASL-FP and RA-FP.

Conclusions

Overall, distributed algorithms can be both very efficient and easy to implement, when one picks the right approach. The first steps taken by the present authors over the past two years seem to have been validated by the considerable interest richtarik2013distributed ; fercoq2014fast ; jaggi2014communication they have generated.

References

Notation Glossary