C語言中文網 目錄

C語言線程間通信

C11 標準為線程間通信提供了條件變量(condition variable)。線程可以使用條件變量,以等待來自另一個線程的通知,通知告知了指定的條件已被滿足。例如,這類通知可能代表某些數據已經準備好進行處理。

條件變量由類型為 cnd_t 的對象表示,并配合互斥一起使用。一般過程如下:線程獲得互斥,然后測試條件。如果條件不滿足,則線程繼續等待條件變量(釋放互斥),直到另一個線程再次喚醒它,然后該線程再次獲得互斥,并再次測試條件,重復上述過程,直到條件滿足。

頭文件 threads.h 定義了使用條件變量的函數,它們如下所示:

int cnd_init(cnd_t*cond);
初始化 cond 引用的條件變量。

void cnd_destroy(cnd_t*cond);
釋放指定條件變量使用的所有資源。

int cnd_signal(cnd_t*cond);
在等待指定條件變量的任意數量的線程中,喚醒其中一個線程。

int cnd_broadcast(cnd_t*cond);
喚醒所有等待指定條件變量的線程。

int cnd_wait(cnd_t*cond,mtx_t*mtx);
阻塞正在調用的線程,并釋放指定的互斥。在調用 cnd_wait()之前,線程必須持有互斥。如果另一線程通過發送一個信號解除當前線程的阻塞(也就是說,通過指定同樣的條件變量作為參數調用 cond_signal()或 cnd_broadcast()),那么調用 cnd_wait()的線程在 cnd_wait()返回之前會再次獲得互斥。

int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);
與 cnd_wait()類似,cnd_timedwait()阻塞調用它們的線程,但僅維持由參數 ts 指定的時間。可以通過調用函數 timespec_get()獲得一個 struct timespec 對象,它表示當前時間。

除 cnd_destroy()以外的所有條件變量函數,如果它們引發錯誤,則返回值 thrd_error,否則返回值 thrd_success。當時間達到限定值時,函數 cnd_timedwait()也會返回值 thrd_timedout。

例 1 與例 2 中的程序展示了在常見的“生產者-消費者”模型中使用條件變量。程序為每個生產者和消費者開啟一個新線程。生產者將一個新產品(在我們的示例中,新產品為一個 int 變量)放入一個環形緩沖區中,假設這個緩沖區沒有滿,然后通知等待的消費者:產品已經準備好。每個消費者從該緩沖區中取出產品,然后將實際情況通知給正在等待的生產者。

在任一特定時間,只有一個線程可以修改環形緩沖器。因此,在函數 bufPut()和 bufGet()間將存在線程同步問題,函數 bufPut()將一個元素插入到緩沖區,函數 buf-Get()將一個元素從緩沖區移除。

有兩個條件變量:生產者等待其中一個條件變量,以判斷緩沖器是否滿了;消費者等待另一個條件變量,以判斷緩沖器是否空了。緩沖區的所有必需元素都包括在結構 Buffer 中。函數 bufInit()初始化具有指定大小的 Buffer 對象,而函數 bufDestroy()銷毀 Buffer 對象。

【例1】用于“生產者-消費者”模型的環形緩沖區
/* buffer.h
* 用于線程安全緩沖區的所有聲明
*/
#include <stdbool.h>
#include <threads.h>

typedef struct Buffer
{
    int *data;                          // 指向數據數組的指針
    size_t size, count;                 // 元素數量的最大值和當前值
    size_t tip, tail;                   // tip = 下一個空點的索引
    mtx_t mtx;                          // 一個互斥
    cnd_t cndPut, cndGet;               // 兩個條件變量
} Buffer;

bool bufInit( Buffer *bufPtr, size_t size );
void bufDestroy(Buffer *bufPtr);

bool bufPut(Buffer *bufPtr, int data);
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec);
/* -------------------------------------------------------------
* buffer.c
* 定義用于處理Buffer的函數
*/
#include "buffer.h"
#include <stdlib.h>                       // 為了使用malloc()和free()
bool bufInit( Buffer *bufPtr, size_t size)
{
    if ((bufPtr->data = malloc( size * sizeof(int))) == NULL)
       return false;
    bufPtr->size = size;
    bufPtr->count = 0;
    bufPtr->tip = bufPtr->tail = 0;
    return    mtx_init( &bufPtr->mtx, mtx_plain) == thrd_success
           && cnd_init( &bufPtr->cndPut) == thrd_success
           && cnd_init( &bufPtr->cndGet) == thrd_success;
}

void bufDestroy(Buffer *bufPtr)
{
    cnd_destroy( &bufPtr->cndGet );
    cnd_destroy( &bufPtr->cndPut );
    mtx_destroy( &bufPtr->mtx );
    free( bufPtr->data );
}

// 在緩沖區中插入一個新元素
bool bufPut(Buffer *bufPtr, int data)
{
    mtx_lock( &bufPtr->mtx );

    while (bufPtr->count == bufPtr->size)
       if (cnd_wait( &bufPtr->cndPut, &bufPtr->mtx ) != thrd_success)
          return false;

    bufPtr->data[bufPtr->tip] = data;
    bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size;
    ++bufPtr->count;

    mtx_unlock( &bufPtr->mtx );
    cnd_signal( &bufPtr->cndGet );

    return true;
}

// 從緩沖區中移除一個元素
// 如果緩沖區是空的,則等待不超過sec秒
bool bufGet(Buffer *bufPtr, int *dataPtr, int sec)
{
    struct timespec ts;
    timespec_get( &ts, TIME_UTC );             // 當前時間
    ts.tv_sec += sec;                              // + sec秒延時

    mtx_lock( &bufPtr->mtx );
    while ( bufPtr->count == 0 )
       if (cnd_timedwait(&bufPtr->cndGet,
                         &bufPtr->mtx, &ts) != thrd_success)
          return false;

    *dataPtr = bufPtr->data[bufPtr->tail];
    bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size;
    --bufPtr->count;

    mtx_unlock( &bufPtr->mtx );
    cnd_signal( &bufPtr->cndPut );

    return true;
}

例 2 中的 main()函數創建了一個緩沖區,并啟動了若干個生產者和消費者線程,給予每個線程一個識別號碼和一個指向緩沖區的指針。每個生產者線程創建一定數量的“產品”,然后用一個 return 語句退出。一個消費者線程如果在給定延時期間無法獲得產品以進行消費,則直接返回。

【例2】啟動生產者和消費者線程
// producer_consumer.c
#include "buffer.h"
#include <stdio.h>
#include <stdlib.h>

#define NP 2                             // 生產者的數量
#define NC 3                             // 消費者的數量

int producer(void *);                    // 線程函數
int consumer(void *);

struct Arg { int id; Buffer *bufPtr; };          // 線程函數的參數
_Noreturn void errorExit(const char* msg)
{
    fprintf(stderr, "%s\n", msg); exit(0xff);
}

int main(void)
{
    printf("Producer-Consumer Demo\n\n");
    Buffer buf;                                  // 為5個產品創建一個緩沖區
    bufInit( &buf, 5 );

    thrd_t prod[NP], cons[NC];           // 線程
    struct Arg prodArg[NP], consArg[NC]; // 線程的參數
    int i = 0, res = 0;

    for ( i = 0; i < NP; ++i )                // 啟動生產者
    {
        prodArg[i].id = i+1, prodArg[i].bufPtr = &buf;
        if (thrd_create( &prod[i], producer, &prodArg[i] ) != thrd_success)
           errorExit("Thread error.");
    }

    for ( i = 0; i < NC; ++i )                // 啟動消費者
    {
       consArg[i].id = i+1, consArg[i].bufPtr = &buf;
       if ( thrd_create( &cons[i], consumer, &consArg[i] ) != thrd_success)
          errorExit("Thread error.");
    }

    for ( i = 0; i < NP; ++i )                // 等待線程結束
      thrd_join(prod[i], &res),
      printf("\nProducer %d ended with result %d.\n", prodArg[i].id, res);

      for ( i = 0; i < NC; ++i )
         thrd_join(cons[i], &res),
         printf("Consumer %d ended with result %d.\n", consArg[i].id, res);
      bufDestroy( &buf );
      return 0;
}

int producer(void *arg)                         // 生產者線程函數
{
    struct Arg *argPtr = (struct Arg *)arg;
    int id = argPtr->id;
    Buffer *bufPtr = argPtr->bufPtr;
    int count = 0;
    for (int i = 0; i < 10; ++i)
    {
        int data = 10*id + i;
        if (bufPut( bufPtr, data ))
            printf("Producer %d produced %d\n", id, data), ++count;
        else
        { fprintf( stderr,
                 "Producer %d: error storing %d\n", id, data);
          return -id;
        }
    }
    return count;
}

int consumer(void *arg)                         // 消費者線程函數
{
    struct Arg *argPtr = (struct Arg *)arg;
    int id = argPtr->id;
    Buffer *bufPtr = argPtr->bufPtr;
  
    int count = 0;
    int data = 0;
    while (bufGet( bufPtr, &data, 2 ))
    {
        ++count;
        printf("Consumer %d consumed %d\n", id, data);
    }
    return count;
}

精美而實用的網站,提供C語言C++STLLinuxShellJavaGo語言等教程,以及socketGCCviSwing設計模式JSP等專題。

Copyright ?2011-2018 biancheng.net, 陜ICP備15000209號

底部Logo