Content



Parallel training

This section contains articles and codes on the topic of accelerating training using parallel data processing. All this applies only to the Newton-Kaczmarz method. It is assumed that reader is familiar with the concept. If not, I recommend skim through the article The complete idiot's guide to Kolmogorov-Arnold Networks. Who wish to read it in more scientific form can find the following papers:

Discrete Urysohn Operator
Kolmogorov-Arnold Representation
Newton-Kaczmarz Method

The Newton-Kaczmarz method is fast already when executed sequentially in a single CPU. In different benchmark tests it is 5 to 50 times quicker than well optimized MLP code written in C++. However, we decided not to stop at this benchmark and move on to customizing the method for parallel processing on as many processors as available in a hardware. So, the goal is to expedite processing up to several hundred times faster compared to regular MLPs.

Infinite concurrency

The subtitle is conventional, all people understand that there is nothing infinite, including universe, however, it assumes an ability of running very large number of concurrent threads. As we show here it is possible.

We can define Kolmogorov-Arnold networks by sequential equations describing layers in a quantity two or more $$ z(t) = \int_{s_1}^{s_2} F_1[y(s),s,t]ds, \quad t \in [t_1, t_2] $$ $$ y(s) = \int_{p_1}^{p_2} F_2[x(p),p,s]dp, \quad s \in [s_1, s_2] $$ $$ ... $$ Assuming model formed by only two equations, $x(p)$ is an input or features, $z(t)$ is an output or target and $y(s)$ is hidden layer. The model is defined by two kernels $F_1$ and $F_2$ which have to be identified in a training process provided dataset of inputs and outputs. The integral equations used here were introduced by Pavel Urysohn in 1924 as conversion of one function into another $x$ to $y$ and $y$ to $z$. Urysohn assumed that the problem will be finding functions sitting under kernel provided the output and kernel. It appeared now that we need to find kernels in case of multiple unobserved layers and need to run it concurrently for given large set of inputs and outputs.

By applying quadrature formula we convert continuous kernels into tensors $$ z_i = \sum_{j=1}^{n} f_{i,j}(y_j), \quad i = 1,2, ..., m, $$ where $f_{i,j}$ is matrix of functions and, when each function is provided by the sequence of its values, it it is a tensor.

As it is already shown in all published articles the Newton-Kaczmarz training has two steps. First is backpropagation of the difference between model and actual values obtained by gradient descent. Second is tensor correction step. Theoretically computations and update of tensor elements can be performed concurrently.

Atomic type

It is std::atomic class template in the C++ Standard Library. Most of its part was introduced in C++11 and some more options in C++17. It supports thread-safe modification of the same element (defined by atomic template) at runtime and safe reading of this element in another thread. The important thing is that atomic type should be declared for each element individually and not for entire buffer. The change in the code is only different type declarations and method call instead of simple assignment for double and integer. Looks perfect, but ...

... only a type replacement for all involved variables and using corresponding assignment methods, slows down twice. Runing multiple concurrent threads improves the performance but not as fast as desired. So it is improved parallel processing, but not infinite concurrency. One example of using atomic type for a tensor is below:
#include <vector>
#include <atomic>
#include <thread>
#include <iostream>

constexpr int N1 = 10, N2 = 15, N3 = 20;
std::vector<std::vector<std::vector<std::atomic<double>>>> Tensor(N1);
std::vector<std::vector<std::vector<double>>> Test1(N1);
std::vector<std::vector<std::vector<double>>> Test2(N1);

void thread_safe_modification_of_single_matrix_element(std::atomic<double>& element, double addend) {
    double current_value = element.load(std::memory_order_relaxed);
    while (!element.compare_exchange_weak(current_value, current_value + addend, std::memory_order_relaxed));
}

void entire_tensor_modification_worker(int step, int value) {
    int n1 = 0;
    int n2 = 0;
    int n3 = 0;
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                thread_safe_modification_of_single_matrix_element(Tensor[n1][n2][n3], value);
                n3 += step;
                n3 = n3 % N3;
            }
            n2 += step;
            n2 = n2 % N2;
        }
        n1 += step;
        n1 = n1 % N1;
    }
}

void update_entire_tensor_function(int step, int value) {
    int n1 = 0;
    int n2 = 0;
    int n3 = 0;
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                Test2[n1][n2][n3] += value;
                n3 += step;
                n3 = n3 % N3;
            }
            n2 += step;
            n2 = n2 % N2;
        }
        n1 += step;
        n1 = n1 % N1;
    }
}

void modify_tensor_in_concurrent_threads() {
    clock_t start = clock();

    std::thread t1(entire_tensor_modification_worker, 1, 1);
    std::thread t2(entire_tensor_modification_worker, 2, 2);
    std::thread t3(entire_tensor_modification_worker, 3, 3);
    std::thread t4(entire_tensor_modification_worker, 4, 4);
    std::thread t5(entire_tensor_modification_worker, 5, 5);
    std::thread t6(entire_tensor_modification_worker, 6, 6);
    std::thread t7(entire_tensor_modification_worker, 7, 7);
    std::thread t8(entire_tensor_modification_worker, 8, 8);

    t1.join();
    t2.join();
    t3.join();
    t4.join();
    t5.join();
    t6.join();
    t7.join();
    t8.join();

    clock_t end = clock();
    printf("Time for concurrent %2.3f sec.\n", (double)(end - start) / CLOCKS_PER_SEC);
}

void modify_tensor_in_sequential_threads() {
    clock_t start = clock();

    std::thread t1(entire_tensor_modification_worker, 1, 1);
    t1.join();
    std::thread t2(entire_tensor_modification_worker, 2, 2);
    t2.join();
    std::thread t3(entire_tensor_modification_worker, 3, 3);
    t3.join();
    std::thread t4(entire_tensor_modification_worker, 4, 4);
    t4.join();
    std::thread t5(entire_tensor_modification_worker, 5, 5);
    t5.join();
    std::thread t6(entire_tensor_modification_worker, 6, 6);
    t6.join();
    std::thread t7(entire_tensor_modification_worker, 7, 7);
    t7.join();
    std::thread t8(entire_tensor_modification_worker, 8, 8);
    t8.join();

    clock_t end = clock();
    printf("Time for sequential %2.3f sec.\n", (double)(end - start) / CLOCKS_PER_SEC);
}

void modify_tensor_not_in_threads() {
    clock_t start = clock();

    update_entire_tensor_function(1, 1);
    update_entire_tensor_function(2, 2);
    update_entire_tensor_function(3, 3);
    update_entire_tensor_function(4, 4);
    update_entire_tensor_function(5, 5);
    update_entire_tensor_function(6, 6);
    update_entire_tensor_function(7, 7);
    update_entire_tensor_function(8, 8);

    clock_t end = clock();
    printf("Time for regular sequential %2.3f sec.\n", (double)(end - start) / CLOCKS_PER_SEC);
}

int main() {

    //set initial state
    for (int i = 0; i < N1; ++i) {
        Tensor[i] = std::vector<std::vector<std::atomic<double>>>(N2);
        Test1[i] = std::vector<std::vector<double>>(N2);
        Test2[i] = std::vector<std::vector<double>>(N2);
        for (int j = 0; j < N2; ++j) {
            Tensor[i][j] = std::vector<std::atomic<double>>(N3);
            Test1[i][j] = std::vector<double>(N3);
            Test2[i][j] = std::vector<double>(N3);
            for (int k = 0; k < N3; ++k) {
                Tensor[i][j][k].store(0.0);
                Test1[i][j][k] = 0.0;
                Test2[i][j][k] = 0.0;
            }
        }
    }

    modify_tensor_in_sequential_threads();

    //copy obtained matrix to buffer
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                Test1[i][j][k] = Tensor[i][j][k].load(std::memory_order_relaxed);
                Tensor[i][j][k].store(0.0);
            }
        }
    }

    modify_tensor_in_concurrent_threads();
    modify_tensor_not_in_threads();

    //compare test data
    bool isEqual = true;
    for (int i = 0; i < N1; ++i) {
        for (int j = 0; j < N2; ++j) {
            for (int k = 0; k < N3; ++k) {
                if (Test1[i][j][k] != Tensor[i][j][k].load(std::memory_order_relaxed)) {
                    isEqual = false;
                    break;
                }
                if (Test2[i][j][k] != Tensor[i][j][k].load(std::memory_order_relaxed)) {
                    isEqual = false;
                    break;
                }
            }
        }
    }

    if (isEqual) {
        printf("The data buffers are identical.\n\n");
    }
    else {
        printf("Data mismatch\n");
    }

    return 0;
}

Acceleration of descent by concurrent records' validation

The update of model parameters is going record by record. The magnitudes of update increments depend on residual errors. After first quick part of training, it can be found that residual errors are very different for different records and for some of them are near zero. So, the obvious acceleration is to run estimation of residuals in several concurrent threads and select for training only records with large residual errors.

The quick experiment showed that potential acceleration is near 5 times quicker compared to sequential mode. That is also fine and can be used but not an infinite concurrency again. For those who wish to repeat this experiment I can publish code for test only. It does not do acceleration, it only estimates possible gain
    int counter = 0;
    double error = 0.0;
    while (true) {
        //select
        auto residual = std::vector(nTrainingRecords);
        auto index = std::vector(nTrainingRecords);
        for (int i = 0; i < nTrainingRecords; ++i) {
            double model = 0.0;
            for (int j = 0; j < nAddends; ++j) {
                model += addends[j]->ComputeUsingInput(inputs_training[i], true);
            }
            residual[i] = abs(target_training[i] - model);
            index[i] = i;
        }
        std::sort(index.begin(), index.end(), [&](int i, int j) { return residual[i] > residual[j]; });

        //update
        int k = index[0];
        double model = 0.0;
        for (int j = 0; j < nAddends; ++j) {
            model += addends[j]->ComputeUsingInput(inputs_training[k]);
        }
        double res = target_training[k] - model;
        res *= mu;
        for (int j = 0; j < nAddends; ++j) {
            addends[j]->UpdateUsingMemory(res);
        }

        //accuracy
        if (0 == counter % 100) {
            error = 0.0;
            for (int i = 0; i < nValidationRecords; ++i) {
                double vmodel = 0.0;
                for (int j = 0; j < nAddends; ++j) {
                    vmodel += addends[j]->ComputeUsingInput(inputs_validation[i], true);
                }
                error += (target_validation[i] - vmodel) * (target_validation[i] - vmodel);
            }
            error /= nValidationRecords;
            error = sqrt(error) / (targetMax - targetMin);
        }

        printf("%d %d %f %f\n", counter, index[0], residual[index[0]], error);
        ++counter;
    }
In this code the record with largest residual error is selected and used for training, so the model is updated, then another record with largest residual error is selected and so on. This is sort of theoretical limit. In reality because of some overhead, it could be three or two times faster than sequential code. It is better than nothing, but not the desired solution yet.