Parallel training
This section contains articles and code on accelerating training by parallel data processing.
All of it applies only to the Newton-Kaczmarz method, and the reader is assumed to be familiar with the concept. If not,
I recommend skimming through the article The complete idiot's guide to Kolmogorov-Arnold Networks.
Those who wish to read about it in a more scientific form can consult the following papers:
Discrete Urysohn Operator
Kolmogorov-Arnold Representation
Newton-Kaczmarz Method
The Newton-Kaczmarz method is already fast when executed sequentially on a single CPU. In different benchmark tests
it is 5 to 50 times quicker than well-optimized MLP code written in C++. However, we decided not to stop at that benchmark
and moved on to adapting the method for parallel processing on as many processors as the hardware provides.
So, the goal is to make training up to several hundred times faster than regular MLPs.
Infinite concurrency
The subtitle is figurative, of course; everyone understands that nothing is infinite, the universe included. What it means here
is the ability to run a very large number of concurrent threads, and as we show below, that is possible.
We can define Kolmogorov-Arnold networks by a chain of equations describing two or more layers:
$$
z(t) = \int_{s_1}^{s_2} F_1[y(s),s,t]ds, \quad t \in [t_1, t_2]
$$
$$
y(s) = \int_{p_1}^{p_2} F_2[x(p),p,s]dp, \quad s \in [s_1, s_2]
$$
$$
...
$$
Assuming a model formed by only two equations, $x(p)$ is the input (the features), $z(t)$ is the output (the target), and $y(s)$ is the hidden layer.
The model is defined by the two kernels $F_1$ and $F_2$, which have to be identified in a training process, given a dataset
of inputs and outputs. The integral equations used here were introduced by Pavel Urysohn in 1924 as a conversion of
one function into another: $x$ to $y$ and $y$ to $z$. Urysohn considered the problem of finding the function under the kernel,
given the output and the kernel. In our case the task is reversed: we need to find the kernels themselves, in the presence of
multiple unobserved layers, and run the identification concurrently for a given large set of inputs and outputs.
By applying a quadrature formula, we convert the continuous kernels into tensors:
$$
z_i = \sum_{j=1}^{n} f_{i,j}(y_j), \quad i = 1,2, ..., m,
$$
where $f_{i,j}$ is a matrix of functions; when each function is represented by the sequence of its values, this matrix becomes a tensor.
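The hidden layer is discretized in the same way; denoting the discretized kernel $F_2$ by $g_{j,k}$ and the number of input discretization points by $l$ (notation introduced here only for this illustration), the two-layer model becomes
$$
y_j = \sum_{k=1}^{l} g_{j,k}(x_k), \quad j = 1,2, ..., n.
$$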
As shown in the published articles, Newton-Kaczmarz training has two steps. The first is backpropagation of the difference between
the model and the actual values, obtained by gradient descent. The second is the tensor correction step. Theoretically, the computations
and the updates of the tensor elements can be performed concurrently.
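To make these two steps concrete, here is a minimal sketch (not the published implementation) of one discretized Urysohn layer: each $f_{i,j}$ is stored as its values on a uniform grid and evaluated by linear interpolation, and Update performs a Kaczmarz-style correction that spreads a residual over the two grid points bracketing each input. The uniform grid, the equal split of the residual across addends, and all names here are simplifying assumptions.
#include <vector>
//one discretized Urysohn layer: z_i = sum_j f_ij(y_j)
struct UrysohnLayer {
    int m, n, points;          //outputs, inputs, grid points per function
    double ymin, ymax;         //assumed common input range
    std::vector<double> f;     //flattened m x n x points tensor, zero-initialized
    UrysohnLayer(int m_, int n_, int points_, double ymin_, double ymax_)
        : m(m_), n(n_), points(points_), ymin(ymin_), ymax(ymax_),
          f((size_t)m_ * n_ * points_, 0.0) {}
    double& at(int i, int j, int p) { return f[((size_t)i * n + j) * points + p]; }
    //locate the grid cell bracketing y and the interpolation weight w
    void locate(double y, int& p, double& w) {
        double t = (y - ymin) / (ymax - ymin) * (points - 1);
        p = (int)t;
        if (p < 0) p = 0;
        if (p > points - 2) p = points - 2;
        w = t - p;
    }
    //model output z_i for one input vector y (linear interpolation per addend)
    double Evaluate(int i, const std::vector<double>& y) {
        double z = 0.0;
        for (int j = 0; j < n; ++j) {
            int p; double w;
            locate(y[j], p, w);
            z += (1.0 - w) * at(i, j, p) + w * at(i, j, p + 1);
        }
        return z;
    }
    //Kaczmarz-style correction: split mu * residual evenly over the n addends,
    //touching only the two grid points that bracket each y_j
    void Update(int i, const std::vector<double>& y, double residual, double mu) {
        double delta = mu * residual / n;
        for (int j = 0; j < n; ++j) {
            int p; double w;
            locate(y[j], p, w);
            at(i, j, p)     += (1.0 - w) * delta;
            at(i, j, p + 1) += w * delta;
        }
    }
};
Note that each Update touches only a few tensor elements, which is exactly why the element-wise concurrent updates discussed below are attractive.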
Atomic type
The std::atomic class template in the C++ Standard Library fits this goal. Most of it
was introduced in C++11, with more options added in C++17. It supports thread-safe modification of an element (wrapped in the atomic template)
at runtime and safe reading of that element from another thread. The important point is that the atomic type must be declared for each
element individually, not for the entire buffer. The only change in the code is the different type declarations and method calls
instead of plain assignment for double and int. Looks perfect, but ...
... the type replacement alone, with the corresponding assignment methods, slows single-threaded code down roughly twofold; the
compare-and-swap loop is costlier than a plain addition, and when neighboring atomic elements are updated by different threads,
cache-line contention (false sharing) likely adds further overhead. Running multiple concurrent threads improves performance, but
not as much as desired. So it is improved parallel processing, but not infinite concurrency. An example of using atomic types for a tensor is below:
#include <vector>
#include <atomic>
#include <thread>
#include <chrono>
#include <cstdio>

constexpr int N1 = 10, N2 = 15, N3 = 20;

//tensor updated through atomic elements
std::vector<std::vector<std::vector<std::atomic<double>>>> Tensor(N1);
//plain buffers used to verify the results
std::vector<std::vector<std::vector<double>>> Test1(N1);
std::vector<std::vector<std::vector<double>>> Test2(N1);
//lock-free increment of one tensor element: the CAS loop retries until no other
//thread has changed the element in between (since C++20 the same can be done
//directly with element.fetch_add(addend))
void thread_safe_modification_of_single_matrix_element(std::atomic<double>& element, double addend) {
    double current_value = element.load(std::memory_order_relaxed);
    while (!element.compare_exchange_weak(current_value, current_value + addend, std::memory_order_relaxed));
}
//walks the whole tensor with a given stride and atomically adds 'value'
void entire_tensor_modification_worker(int step, int value) {
    int n1 = 0, n2 = 0, n3 = 0;
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                thread_safe_modification_of_single_matrix_element(Tensor[n1][n2][n3], value);
                n3 = (n3 + step) % N3;
            }
            n2 = (n2 + step) % N2;
        }
        n1 = (n1 + step) % N1;
    }
}

//same traversal over a plain (non-atomic) buffer, used as a reference
void update_entire_tensor_function(int step, int value) {
    int n1 = 0, n2 = 0, n3 = 0;
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                Test2[n1][n2][n3] += value;
                n3 = (n3 + step) % N3;
            }
            n2 = (n2 + step) % N2;
        }
        n1 = (n1 + step) % N1;
    }
}
void modify_tensor_in_concurrent_threads() {
    //steady_clock measures wall time; clock() would sum the CPU time of all threads
    auto start = std::chrono::steady_clock::now();
    std::thread t1(entire_tensor_modification_worker, 1, 1);
    std::thread t2(entire_tensor_modification_worker, 2, 2);
    std::thread t3(entire_tensor_modification_worker, 3, 3);
    std::thread t4(entire_tensor_modification_worker, 4, 4);
    std::thread t5(entire_tensor_modification_worker, 5, 5);
    std::thread t6(entire_tensor_modification_worker, 6, 6);
    std::thread t7(entire_tensor_modification_worker, 7, 7);
    std::thread t8(entire_tensor_modification_worker, 8, 8);
    t1.join(); t2.join(); t3.join(); t4.join();
    t5.join(); t6.join(); t7.join(); t8.join();
    std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
    printf("Time for concurrent %2.3f sec.\n", elapsed.count());
}

void modify_tensor_in_sequential_threads() {
    auto start = std::chrono::steady_clock::now();
    //same eight workers, but each thread is joined before the next one starts
    for (int i = 1; i <= 8; ++i) {
        std::thread t(entire_tensor_modification_worker, i, i);
        t.join();
    }
    std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
    printf("Time for sequential %2.3f sec.\n", elapsed.count());
}

void modify_tensor_not_in_threads() {
    auto start = std::chrono::steady_clock::now();
    //plain single-threaded updates of the non-atomic buffer
    for (int i = 1; i <= 8; ++i) {
        update_entire_tensor_function(i, i);
    }
    std::chrono::duration<double> elapsed = std::chrono::steady_clock::now() - start;
    printf("Time for regular sequential %2.3f sec.\n", elapsed.count());
}
int main() {
    //set initial state
    for (int i = 0; i < N1; ++i) {
        Tensor[i] = std::vector<std::vector<std::atomic<double>>>(N2);
        Test1[i] = std::vector<std::vector<double>>(N2);
        Test2[i] = std::vector<std::vector<double>>(N2);
        for (int j = 0; j < N2; ++j) {
            Tensor[i][j] = std::vector<std::atomic<double>>(N3);
            Test1[i][j] = std::vector<double>(N3);
            Test2[i][j] = std::vector<double>(N3);
            for (int k = 0; k < N3; ++k) {
                Tensor[i][j][k].store(0.0);
                Test1[i][j][k] = 0.0;
                Test2[i][j][k] = 0.0;
            }
        }
    }
    modify_tensor_in_sequential_threads();
    //copy the result of the sequential run to a buffer and reset the tensor
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                Test1[i][j][k] = Tensor[i][j][k].load(std::memory_order_relaxed);
                Tensor[i][j][k].store(0.0);
            }
        }
    }
    modify_tensor_in_concurrent_threads();
    modify_tensor_not_in_threads();
    //compare test data: sequential threads, concurrent threads and the plain
    //single-threaded run must all produce the same tensor
    bool isEqual = true;
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                double atomicValue = Tensor[i][j][k].load(std::memory_order_relaxed);
                if (Test1[i][j][k] != atomicValue || Test2[i][j][k] != atomicValue) {
                    isEqual = false;
                    break;
                }
            }
        }
    }
    if (isEqual) {
        printf("The data buffers are identical.\n\n");
    }
    else {
        printf("Data mismatch\n");
    }
    return 0;
}
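For reference, the test above has no external dependencies and should build with any C++11-or-later compiler; for example, with g++ on Linux (the file name is arbitrary):
g++ -std=c++17 -O2 -pthread atomic_tensor_test.cpp -o atomic_tensor_test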
Acceleration of descent by concurrent records' validation
The model parameters are updated record by record, and the magnitudes of the update increments depend on the residual errors.
After the first, quick part of training, the residual errors turn out to be very different for different records, and
for some of them they are near zero. So the obvious acceleration is to estimate the residuals in several concurrent
threads and select for training only the records with large residual errors.
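As an illustration (not the production code), residual estimation parallelizes naturally because each thread can write to a disjoint stripe of the residual buffer; ComputeModel below is a hypothetical stand-in for the real model evaluation:
#include <cmath>
#include <thread>
#include <vector>
//compute |target - model| for all records in nThreads concurrent threads
void ComputeResidualsConcurrently(
    const std::vector<std::vector<double>>& inputs,
    const std::vector<double>& targets,
    std::vector<double>& residuals,
    double (*ComputeModel)(const std::vector<double>&),
    int nThreads)
{
    int n = (int)inputs.size();
    std::vector<std::thread> workers;
    for (int t = 0; t < nThreads; ++t) {
        workers.emplace_back([&, t]() {
            //each thread handles a disjoint stripe of records, so no
            //synchronization is needed for the writes
            for (int i = t; i < n; i += nThreads) {
                residuals[i] = std::fabs(targets[i] - ComputeModel(inputs[i]));
            }
        });
    }
    for (auto& w : workers) w.join();
}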
A quick experiment showed that the potential acceleration is about 5 times over the sequential mode. That is
also useful, but again not infinite concurrency. For those who wish to repeat the experiment, the test code is
below (an excerpt that relies on the surrounding training code for addends and the data buffers). It does not
perform the acceleration itself; it only estimates the possible gain:
int counter = 0;
double error = 0.0;
while (true) {
    //select: compute the residual of every training record and sort record
    //indices by descending residual (needs <algorithm> and <cmath>)
    std::vector<double> residual(nTrainingRecords);
    std::vector<int> index(nTrainingRecords);
    for (int i = 0; i < nTrainingRecords; ++i) {
        double model = 0.0;
        for (int j = 0; j < nAddends; ++j) {
            model += addends[j]->ComputeUsingInput(inputs_training[i], true);
        }
        residual[i] = std::fabs(target_training[i] - model);
        index[i] = i;
    }
    std::sort(index.begin(), index.end(), [&](int i, int j) { return residual[i] > residual[j]; });
    //update: train on the record with the largest residual
    int k = index[0];
    double model = 0.0;
    for (int j = 0; j < nAddends; ++j) {
        model += addends[j]->ComputeUsingInput(inputs_training[k]);
    }
    double res = target_training[k] - model;
    res *= mu;
    for (int j = 0; j < nAddends; ++j) {
        addends[j]->UpdateUsingMemory(res);
    }
    //accuracy: recompute the relative RMSE on the validation set every 100 steps
    if (0 == counter % 100) {
        error = 0.0;
        for (int i = 0; i < nValidationRecords; ++i) {
            double vmodel = 0.0;
            for (int j = 0; j < nAddends; ++j) {
                vmodel += addends[j]->ComputeUsingInput(inputs_validation[i], true);
            }
            error += (target_validation[i] - vmodel) * (target_validation[i] - vmodel);
        }
        error /= nValidationRecords;
        error = sqrt(error) / (targetMax - targetMin);
    }
    printf("%d %d %f %f\n", counter, index[0], residual[index[0]], error);
    ++counter;
}
In this code the record with the largest residual error is selected and used for training, the model is updated, then
another record with the largest residual error is selected, and so on. This is a sort of theoretical limit. In reality,
because of the overhead, the result could be two or three times faster than the sequential code. It is better than nothing,
but not the desired solution yet.