HPC

HPC lab2 Pthreads并行矩阵乘法

Posted by RogersLuo's Blog on October 9, 2024

高性能计算程序设计(2) 秋季2024

by 22336173罗弘杰

多线程程序计时

0
1
2
3
4
5
#include <ctime>
#include <sys/time.h>
	gettimeofday(&start, NULL);
    float start_time = clock();
    gettimeofday(&end, NULL);
    float end_time = clock();

? gettimeofday()来自linux系统库,用以计算操作系统视角下程序运行时间。

? clock()用来计算CPU时间(主要和问题规模相关)。

通过 Pthreads实现通用矩阵乘法

通过Pthreads实现通用矩阵乘法(Lab0)的并行版本,Pthreads并行线程从1增加至8,矩阵规模从512增加至2048.

通用矩阵乘法(GEMM)通常定义为:

\[C = AB\] \[C_{m,n} = \sum_{n = 1}^{N}{A_{m,n}B_{n,k}}\]

输入:M , N, K三个整数(512 ~2048)

问题描述:随机生成M*N和N*K的两个矩阵A,B,对这两个矩阵做乘法得到矩阵C.

输出:A,B,C三个矩阵以及矩阵计算的时间(单位:(秒))

? 主函数中,创建线程函数,然后多线程并行处理数据

0
1
2
3
4
5
6
7
8
9
10
11
    gettimeofday(&start, NULL);
    pthread_t *pthread_set = (pthread_t*)malloc(THREAD_NUM * sizeof(pthread_t));
    for (int i = 0; i < THREAD_NUM; i++) {
        pthread_create(&pthread_set[i], NULL, thread_matrix_multiply, (void *)(intptr_t)i);
    }

    float start_time = clock();
    for (int i = 0; i < THREAD_NUM; i++) {
        pthread_join(pthread_set[i], NULL);
    }
    gettimeofday(&end, NULL);
    float end_time = clock();

? 调用的矩阵计算函数

0
1
2
3
4
5
6
7
8
void *thread_matrix_multiply(void *arg) {
    int threadId = (intptr_t)arg;
    int rowStart = threadId * AVG_ROW;
    int rowEnd = (threadId == THREAD_NUM - 1) ? M : (threadId + 1) * AVG_ROW; // 最后一个线程处理剩余的行
    // printf("start: %d end: %d\n", rowStart, rowEnd);
    matrix_multiply_s(rowEnd - rowStart, N, K, A + N*rowStart, B, C +rowStart);
    pthread_exit(0); 
}

线程数 128 256 512 1024 2048
1 0.0071 0.0498 0.3294 2.5041 21.2992
2 0.0050 0.0404 0.2097 1.6313 12.3914
3 0.0048 0.0218 0.1825 1.2449 7.6367
4 0.0025 0.0213 0.1105 0.9706 6.7229
5 0.0055 0.0241 0.1073 0.7554 6.3505
6 0.0048 0.0219 0.1152 0.7427 5.2654
7 0.0038 0.0198 0.0996 0.7533 5.2605
8 0.0032 0.0172 0.1157 0.7763 4.7793

基于Pthreads的数组求和

使用多个线程对数组a[1000]求和的简单程序,演示Pthreads的用法。创建n个线程,每个线程通过共享变量global_index获取a数组的下一个未加元素,注意不能在临界区(critical section)外访问共享变量global_index,避免出现race condition。

主函数

? 避免时间太小,难以比较大小,扩大数组到 10000.测试10次取平均值。

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//全局变量
#define THREAD_NUM 8
const int ARRAY_SIZE = 100000;
const int MAX_ELEMENTS = 1; //根据这个变量来定义一次取出多少个下标
float *Numbers; //数组
int Global_index = 0; // 共享变量
float Sum = 0;        // 共享结果
float Local_sum[THREAD_NUM] = {0};  //局部变量sum
pthread_mutex_t mutex;

int main(int argc, char *argv[]) {
    double totle_time = 0;
    Numbers = (float *)malloc(ARRAY_SIZE * sizeof(float)); // 开辟1000个数组空间
    // 初始化数组
    for (int i = 0; i < ARRAY_SIZE; i++) {
        Numbers[i] = i + 1.0 ;
    }
    int Test_Rnd = 10; //计算10次取平均
    int ori = Test_Rnd;
    struct timeval start, end;
    while(Test_Rnd --){
        pthread_mutex_init(&mutex, NULL); // 初始化互斥锁
        pthread_t *pthread_set = (pthread_t *)malloc(THREAD_NUM * sizeof(pthread_t)); // 开辟多线程

        for (int i = 0; i < THREAD_NUM; i++) {
            pthread_create(&pthread_set[i], NULL, thread_Numbers_Add, (void *)(intptr_t)i);
        }
        gettimeofday(&start, NULL);
        // 等待所有线程完成
        for (int i = 0; i < THREAD_NUM; i++) {
            pthread_join(pthread_set[i], NULL);
            Sum += Local_sum[i];
            Local_sum[i] = 0;
        }
        gettimeofday(&end, NULL);
        double time_use = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); // 微秒
        time_use /= 1000000;
        printf("Index: %d, Sum: %.2f, Time: %.8f\n", ori - Test_Rnd, Sum, time_use);
        totle_time += time_use;
        free(pthread_set);
        Sum = 0;
        Global_index = 0; //恢复数据为0
        }

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const int ARRAY_SIZE = 100000;
const int MAX_ELEMENTS = 1;
float *Numbers; //数组
int Global_index = 0; // 共享变量
float Sum = 0;        // 共享结果
float Local_sum[THREAD_NUM] = {0};
pthread_mutex_t mutex;

void *thread_Numbers_Add(void *arg) {
    while (Global_index <= ARRAY_SIZE) {
        int rank = (intptr_t)arg;
        pthread_mutex_lock(&mutex); // 进入临界区
        if (Global_index >= ARRAY_SIZE) {
            pthread_mutex_unlock(&mutex);
            break; // 如果已处理所有元素,退出循环
        }
        int start = Global_index;
        int end = start + std::min(MAX_ELEMENTS, ARRAY_SIZE - Global_index); // 确保不越界
        Global_index += end - start;
        pthread_mutex_unlock(&mutex); // 退出临界区

        for (int i = start; i < end; i++) {
            Local_sum[rank] += Numbers[i]; // 求和操作
        }
    }
    return NULL;
}

? 时间测试如下:

image-20241018202539446

重写上面的例子,使得各线程可以一次最多提取10个连续的数组元素,取数组元素策略可以自己定义,可以是随机读取【1-10】个连续的元素,也可以是固定数量的元素,并进行累加求和,从而减少对下标的访问

? 只要把全局变量MAX ELEMENTS改为10, 就可以达到一次检索10个元素局部求和的效果。

测试时间如下:

image-20241018202727042

Pthreads求解二次方程组的根

编写一个多线程程序来求解二次方程组???+??+?=0的根,使用下面的公式

image-20241018203053676

中间值被不同的线程计算,使用条件变量来识别何时所有的线程都完成了计算

思想:

  • ? 线程1计算b^2,线程2计算-4ac,最后线程3计算开方,最后线程0计算剩下的运算:
  • ? 需要两个条件变量,分别对应B方和-4AC。
0
1
2
3
4
5
6
7
8
9
10
int b_pow_2;
int minus_4ac;
double sqrt_result;

pthread_mutex_t mutex_b_pow_2; //for b^2
pthread_mutex_t mutex_minus_4ac; //for -4ac
pthread_mutex_t mutex_sqrt_result; //for sqrt result

pthread_cond_t cond_b_pow_2; //for b^2 ready
pthread_cond_t cond_minus_4ac; //for -4ac ready

线程函数编写:

前置知识:

? pthread_cond_wait(&cond,&mutex)是一个挂起函数,一旦挂起后续函数无法执行,直到sinal发生;那么如果siganl发生早于wait,则wait就不会再进行,就发生了无限期等待。

? 有两个方法:

? 1,使用while{}或者if{}加一层判断,在signal后,保证不会出现相关wait(否则会导致饿死!)

? 2, 使用互斥锁,保证先wait().后signal();

? 这里注意在访问条件变量的时候需要注意使用互斥锁,避免数据冲突。这里有两个版本

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void* mul_fun(void* arg) {
    int con = *(static_cast<int*>(arg));
    if (con == 1) { // calculate b^2
        pthread_mutex_lock(&mutex_b_pow_2);
        int coef1 = *(static_cast<int*>(arg) + 1);
        b_pow_2 = coef1 * coef1;
        pthread_cond_signal(&cond_b_pow_2);
        pthread_mutex_unlock(&mutex_b_pow_2);
    } else { // calculate -4ac
        pthread_mutex_lock(&mutex_minus_4ac);
        int coef1 = *(static_cast<int*>(arg) + 1);
        int coef2 = *(static_cast<int*>(arg) + 2);
        minus_4ac = -4 * coef1 * coef2;
        pthread_cond_signal(&cond_minus_4ac);
        pthread_mutex_unlock(&mutex_minus_4ac);
    }
    return NULL;
}

void* sqrt_fun(void* arg) {
    pthread_mutex_lock(&mutex_b_pow_2);
    while(b_pow_2 == 0){ //加入while确保前提变量已经被正确修改
        pthread_cond_wait(&cond_b_pow_2, &mutex_b_pow_2);
    }
    pthread_mutex_unlock(&mutex_b_pow_2);

    pthread_mutex_lock(&mutex_minus_4ac);
    while(minus_4ac == 0){
        pthread_cond_wait(&cond_minus_4ac, &mutex_minus_4ac);
    }
    pthread_mutex_unlock(&mutex_minus_4ac);

    // Now it's safe to calculate the square root
    pthread_mutex_lock(&mutex_sqrt_result);
    printf("%d, %d\n", b_pow_2, minus_4ac);
    sqrt_result = sqrt(b_pow_2 + minus_4ac);
    pthread_mutex_unlock(&mutex_sqrt_result);
    return NULL;
}

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
    pthread_mutex_lock(&mutex_b_pow_2);
    pthread_mutex_lock(&mutex_minus_4ac); //在主函数声明

void* mul_fun(void* arg) {
    int con = *(static_cast<int*>(arg));
    if (con == 1) { // calculate b^2
        pthread_mutex_lock(&mutex_b_pow_2);
        int coef1 = *(static_cast<int*>(arg) + 1);
        b_pow_2 = coef1 * coef1;
        pthread_cond_signal(&cond_b_pow_2);
        pthread_mutex_unlock(&mutex_b_pow_2);
    } else { // calculate -4ac
        pthread_mutex_lock(&mutex_minus_4ac);
        int coef1 = *(static_cast<int*>(arg) + 1);
        int coef2 = *(static_cast<int*>(arg) + 2);
        minus_4ac = -4 * coef1 * coef2;
        pthread_cond_signal(&cond_minus_4ac);
        pthread_mutex_unlock(&mutex_minus_4ac);
    }
    return NULL;
}

void* sqrt_fun(void* arg) {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    // 设置等待时间为 5 秒
    ts.tv_sec += 5;
    // Wait for both b^2 and -4ac to be ready
    // pthread_mutex_lock(&mutex_b_pow_2);

    pthread_cond_wait(&cond_b_pow_2, &mutex_b_pow_2);

    // pthread_mutex_unlock(&mutex_b_pow_2);

    // pthread_mutex_lock(&mutex_minus_4ac);

    pthread_cond_wait(&cond_minus_4ac, &mutex_minus_4ac);

    // pthread_mutex_unlock(&mutex_minus_4ac);

    // Now it's safe to calculate the square root
    pthread_mutex_lock(&mutex_sqrt_result);
    printf("%d, %d\n", b_pow_2, minus_4ac);
    sqrt_result = sqrt(b_pow_2 + minus_4ac);
    pthread_mutex_unlock(&mutex_sqrt_result);
    return NULL;
}

结果image-20241022230406893

编写一个多线程程序实现Monte-carlo方法

参考课本137页4.2题和本次实验作业的补充材料。

思路:

? 使用N个线程,并行采样M次二维【0,1】采样,计算是否满足y<=x^2,若满足,计数加一(全局共享变量需要用互斥锁保护),最后用频率计算概率,根据几何概型,可以估计积分面积。

? 由于要使用cpu,所以需要把线程绑定到cpu, 这方面是一个新的知识,需要总结

? 本实验在一个12核心16线程的机器上运行。

? 线程绑定cpu函数

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//reference:
//https://stackoverflow.com/questions/1407786/how-to-set-cpu-affinity-of-a-particular-pthread
#include <pthread.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
int stick_this_thread_to_core(int rank) {
   int num_cores = sysconf(_SC_NPROCESSORS_ONLN);
   int core_id = rank % num_cores; //将线程哈希绑定到一个固定的core
   if (core_id < 0 || core_id >= num_cores)
      return EINVAL; //need #include <errno.h>

   cpu_set_t cpuset;
   CPU_ZERO(&cpuset);
   CPU_SET(core_id, &cpuset);

   pthread_t current_thread = pthread_self(); 
   printf("tie the rank %d thread to cpu id %d of total core sum of %d\n", rank, core_id, num_cores);   
   return pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
}

? 实验可以看到确实可以绑定到固定cpuimage-20241023141500262

? 频率计算线程函数:

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void * compute_thread(void *arg){
    int rank = (intptr_t)arg; //the rank of the thread
    stick_this_thread_to_core(rank); 

    printf("%d compute_thread start to work\n", rank+1);
    double x , y;
    int time = avg_time;
    int local_count = 0;
    while(time--){
        x = rand() / (double)RAND_MAX;
        y = rand() / (double)RAND_MAX;
        if (y <= x *x ){
            local_count ++;
        }
    }
    pthread_mutex_lock(&mutex);
    total_count += local_count;
    printf("%d compute_thread finish work with local_count of %d\n", rank+1, local_count);
    pthread_mutex_unlock(&mutex); 

    return NULL;   
}

image-20241023141539238

结果

image-20241023141348237

Creative Commons License本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.