高性能计算程序设计(2) 秋季2024
by 22336173罗弘杰
多线程程序计时
0
1
2
3
4
5
#include <ctime>
#include <sys/time.h>
gettimeofday(&start, NULL);
float start_time = clock();
gettimeofday(&end, NULL);
float end_time = clock();
? gettimeofday()来自linux系统库,用以计算操作系统视角下程序运行时间。
? clock()用来计算CPU时间(主要和问题规模相关)。
通过 Pthreads实现通用矩阵乘法
通过Pthreads实现通用矩阵乘法(Lab0)的并行版本,Pthreads并行线程从1增加至8,矩阵规模从512增加至2048.
通用矩阵乘法(GEMM)通常定义为:
\[C = AB\] \[C_{m,n} = \sum_{n = 1}^{N}{A_{m,n}B_{n,k}}\]输入:M , N, K三个整数(512 ~2048)
问题描述:随机生成M*N和N*K的两个矩阵A,B,对这两个矩阵做乘法得到矩阵C.
输出:A,B,C三个矩阵以及矩阵计算的时间(单位:(秒))
? 主函数中,创建线程函数,然后多线程并行处理数据
0
1
2
3
4
5
6
7
8
9
10
11
gettimeofday(&start, NULL);
pthread_t *pthread_set = (pthread_t*)malloc(THREAD_NUM * sizeof(pthread_t));
for (int i = 0; i < THREAD_NUM; i++) {
pthread_create(&pthread_set[i], NULL, thread_matrix_multiply, (void *)(intptr_t)i);
}
float start_time = clock();
for (int i = 0; i < THREAD_NUM; i++) {
pthread_join(pthread_set[i], NULL);
}
gettimeofday(&end, NULL);
float end_time = clock();
? 调用的矩阵计算函数
0
1
2
3
4
5
6
7
8
void *thread_matrix_multiply(void *arg) {
int threadId = (intptr_t)arg;
int rowStart = threadId * AVG_ROW;
int rowEnd = (threadId == THREAD_NUM - 1) ? M : (threadId + 1) * AVG_ROW; // 最后一个线程处理剩余的行
// printf("start: %d end: %d\n", rowStart, rowEnd);
matrix_multiply_s(rowEnd - rowStart, N, K, A + N*rowStart, B, C +rowStart);
pthread_exit(0);
}
线程数 | 128 | 256 | 512 | 1024 | 2048 |
---|---|---|---|---|---|
1 | 0.0071 | 0.0498 | 0.3294 | 2.5041 | 21.2992 |
2 | 0.0050 | 0.0404 | 0.2097 | 1.6313 | 12.3914 |
3 | 0.0048 | 0.0218 | 0.1825 | 1.2449 | 7.6367 |
4 | 0.0025 | 0.0213 | 0.1105 | 0.9706 | 6.7229 |
5 | 0.0055 | 0.0241 | 0.1073 | 0.7554 | 6.3505 |
6 | 0.0048 | 0.0219 | 0.1152 | 0.7427 | 5.2654 |
7 | 0.0038 | 0.0198 | 0.0996 | 0.7533 | 5.2605 |
8 | 0.0032 | 0.0172 | 0.1157 | 0.7763 | 4.7793 |
基于Pthreads的数组求和
使用多个线程对数组a[1000]求和的简单程序,演示Pthreads的用法。创建n个线程,每个线程通过共享变量global_index获取a数组的下一个未加元素,注意不能在临界区(critical section)外访问共享变量global_index,避免出现race condition。
主函数
? 避免时间太小,难以比较大小,扩大数组到 10000.测试10次取平均值。
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//全局变量
#define THREAD_NUM 8
const int ARRAY_SIZE = 100000;
const int MAX_ELEMENTS = 1; //根据这个变量来定义一次取出多少个下标
float *Numbers; //数组
int Global_index = 0; // 共享变量
float Sum = 0; // 共享结果
float Local_sum[THREAD_NUM] = {0}; //局部变量sum
pthread_mutex_t mutex;
int main(int argc, char *argv[]) {
double totle_time = 0;
Numbers = (float *)malloc(ARRAY_SIZE * sizeof(float)); // 开辟1000个数组空间
// 初始化数组
for (int i = 0; i < ARRAY_SIZE; i++) {
Numbers[i] = i + 1.0 ;
}
int Test_Rnd = 10; //计算10次取平均
int ori = Test_Rnd;
struct timeval start, end;
while(Test_Rnd --){
pthread_mutex_init(&mutex, NULL); // 初始化互斥锁
pthread_t *pthread_set = (pthread_t *)malloc(THREAD_NUM * sizeof(pthread_t)); // 开辟多线程
for (int i = 0; i < THREAD_NUM; i++) {
pthread_create(&pthread_set[i], NULL, thread_Numbers_Add, (void *)(intptr_t)i);
}
gettimeofday(&start, NULL);
// 等待所有线程完成
for (int i = 0; i < THREAD_NUM; i++) {
pthread_join(pthread_set[i], NULL);
Sum += Local_sum[i];
Local_sum[i] = 0;
}
gettimeofday(&end, NULL);
double time_use = (end.tv_sec - start.tv_sec) * 1000000 + (end.tv_usec - start.tv_usec); // 微秒
time_use /= 1000000;
printf("Index: %d, Sum: %.2f, Time: %.8f\n", ori - Test_Rnd, Sum, time_use);
totle_time += time_use;
free(pthread_set);
Sum = 0;
Global_index = 0; //恢复数据为0
}
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const int ARRAY_SIZE = 100000;
const int MAX_ELEMENTS = 1;
float *Numbers; //数组
int Global_index = 0; // 共享变量
float Sum = 0; // 共享结果
float Local_sum[THREAD_NUM] = {0};
pthread_mutex_t mutex;
void *thread_Numbers_Add(void *arg) {
while (Global_index <= ARRAY_SIZE) {
int rank = (intptr_t)arg;
pthread_mutex_lock(&mutex); // 进入临界区
if (Global_index >= ARRAY_SIZE) {
pthread_mutex_unlock(&mutex);
break; // 如果已处理所有元素,退出循环
}
int start = Global_index;
int end = start + std::min(MAX_ELEMENTS, ARRAY_SIZE - Global_index); // 确保不越界
Global_index += end - start;
pthread_mutex_unlock(&mutex); // 退出临界区
for (int i = start; i < end; i++) {
Local_sum[rank] += Numbers[i]; // 求和操作
}
}
return NULL;
}
? 时间测试如下:
重写上面的例子,使得各线程可以一次最多提取10个连续的数组元素,取数组元素策略可以自己定义,可以是随机读取【1-10】个连续的元素,也可以是固定数量的元素,并进行累加求和,从而减少对下标的访问
? 只要把全局变量MAX ELEMENTS改为10, 就可以达到一次检索10个元素局部求和的效果。
测试时间如下:
Pthreads求解二次方程组的根
编写一个多线程程序来求解二次方程组???+??+?=0的根,使用下面的公式
中间值被不同的线程计算,使用条件变量来识别何时所有的线程都完成了计算
思想:
- ? 线程1计算b^2,线程2计算-4ac,最后线程3计算开方,最后线程0计算剩下的运算:
- ? 需要两个条件变量,分别对应B方和-4AC。
0
1
2
3
4
5
6
7
8
9
10
int b_pow_2;
int minus_4ac;
double sqrt_result;
pthread_mutex_t mutex_b_pow_2; //for b^2
pthread_mutex_t mutex_minus_4ac; //for -4ac
pthread_mutex_t mutex_sqrt_result; //for sqrt result
pthread_cond_t cond_b_pow_2; //for b^2 ready
pthread_cond_t cond_minus_4ac; //for -4ac ready
线程函数编写:
前置知识:
? pthread_cond_wait(&cond,&mutex)是一个挂起函数,一旦挂起后续函数无法执行,直到sinal发生;那么如果siganl发生早于wait,则wait就不会再进行,就发生了无限期等待。
? 有两个方法:
? 1,使用while{}或者if{}加一层判断,在signal后,保证不会出现相关wait(否则会导致饿死!)
? 2, 使用互斥锁,保证先wait().后signal();
? 这里注意在访问条件变量的时候需要注意使用互斥锁,避免数据冲突。这里有两个版本
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
void* mul_fun(void* arg) {
int con = *(static_cast<int*>(arg));
if (con == 1) { // calculate b^2
pthread_mutex_lock(&mutex_b_pow_2);
int coef1 = *(static_cast<int*>(arg) + 1);
b_pow_2 = coef1 * coef1;
pthread_cond_signal(&cond_b_pow_2);
pthread_mutex_unlock(&mutex_b_pow_2);
} else { // calculate -4ac
pthread_mutex_lock(&mutex_minus_4ac);
int coef1 = *(static_cast<int*>(arg) + 1);
int coef2 = *(static_cast<int*>(arg) + 2);
minus_4ac = -4 * coef1 * coef2;
pthread_cond_signal(&cond_minus_4ac);
pthread_mutex_unlock(&mutex_minus_4ac);
}
return NULL;
}
void* sqrt_fun(void* arg) {
pthread_mutex_lock(&mutex_b_pow_2);
while(b_pow_2 == 0){ //加入while确保前提变量已经被正确修改
pthread_cond_wait(&cond_b_pow_2, &mutex_b_pow_2);
}
pthread_mutex_unlock(&mutex_b_pow_2);
pthread_mutex_lock(&mutex_minus_4ac);
while(minus_4ac == 0){
pthread_cond_wait(&cond_minus_4ac, &mutex_minus_4ac);
}
pthread_mutex_unlock(&mutex_minus_4ac);
// Now it's safe to calculate the square root
pthread_mutex_lock(&mutex_sqrt_result);
printf("%d, %d\n", b_pow_2, minus_4ac);
sqrt_result = sqrt(b_pow_2 + minus_4ac);
pthread_mutex_unlock(&mutex_sqrt_result);
return NULL;
}
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
pthread_mutex_lock(&mutex_b_pow_2);
pthread_mutex_lock(&mutex_minus_4ac); //在主函数声明
void* mul_fun(void* arg) {
int con = *(static_cast<int*>(arg));
if (con == 1) { // calculate b^2
pthread_mutex_lock(&mutex_b_pow_2);
int coef1 = *(static_cast<int*>(arg) + 1);
b_pow_2 = coef1 * coef1;
pthread_cond_signal(&cond_b_pow_2);
pthread_mutex_unlock(&mutex_b_pow_2);
} else { // calculate -4ac
pthread_mutex_lock(&mutex_minus_4ac);
int coef1 = *(static_cast<int*>(arg) + 1);
int coef2 = *(static_cast<int*>(arg) + 2);
minus_4ac = -4 * coef1 * coef2;
pthread_cond_signal(&cond_minus_4ac);
pthread_mutex_unlock(&mutex_minus_4ac);
}
return NULL;
}
void* sqrt_fun(void* arg) {
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
// 设置等待时间为 5 秒
ts.tv_sec += 5;
// Wait for both b^2 and -4ac to be ready
// pthread_mutex_lock(&mutex_b_pow_2);
pthread_cond_wait(&cond_b_pow_2, &mutex_b_pow_2);
// pthread_mutex_unlock(&mutex_b_pow_2);
// pthread_mutex_lock(&mutex_minus_4ac);
pthread_cond_wait(&cond_minus_4ac, &mutex_minus_4ac);
// pthread_mutex_unlock(&mutex_minus_4ac);
// Now it's safe to calculate the square root
pthread_mutex_lock(&mutex_sqrt_result);
printf("%d, %d\n", b_pow_2, minus_4ac);
sqrt_result = sqrt(b_pow_2 + minus_4ac);
pthread_mutex_unlock(&mutex_sqrt_result);
return NULL;
}
结果
编写一个多线程程序实现Monte-carlo方法
参考课本137页4.2题和本次实验作业的补充材料。
思路:
? 使用N个线程,并行采样M次二维【0,1】采样,计算是否满足y<=x^2,若满足,计数加一(全局共享变量需要用互斥锁保护),最后用频率计算概率,根据几何概型,可以估计积分面积。
? 由于要使用cpu,所以需要把线程绑定到cpu, 这方面是一个新的知识,需要总结
? 本实验在一个12核心16线程的机器上运行。
? 线程绑定cpu函数
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//reference:
//https://stackoverflow.com/questions/1407786/how-to-set-cpu-affinity-of-a-particular-pthread
#include <pthread.h>
#include <unistd.h>
#include <sched.h>
#include <errno.h>
int stick_this_thread_to_core(int rank) {
int num_cores = sysconf(_SC_NPROCESSORS_ONLN);
int core_id = rank % num_cores; //将线程哈希绑定到一个固定的core
if (core_id < 0 || core_id >= num_cores)
return EINVAL; //need #include <errno.h>
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(core_id, &cpuset);
pthread_t current_thread = pthread_self();
printf("tie the rank %d thread to cpu id %d of total core sum of %d\n", rank, core_id, num_cores);
return pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
}
? 实验可以看到确实可以绑定到固定cpu
? 频率计算线程函数:
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void * compute_thread(void *arg){
int rank = (intptr_t)arg; //the rank of the thread
stick_this_thread_to_core(rank);
printf("%d compute_thread start to work\n", rank+1);
double x , y;
int time = avg_time;
int local_count = 0;
while(time--){
x = rand() / (double)RAND_MAX;
y = rand() / (double)RAND_MAX;
if (y <= x *x ){
local_count ++;
}
}
pthread_mutex_lock(&mutex);
total_count += local_count;
printf("%d compute_thread finish work with local_count of %d\n", rank+1, local_count);
pthread_mutex_unlock(&mutex);
return NULL;
}
结果
This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.