在前面講共享內存的IPC時曾說共享內存本身不具備同步機制,如果要實現同步需要使用信號量等手段來實現之,現在我們就來說說使用posix的信號量來實現posix多進程共享內存的同步。其實信號量也可以使用在同一進程的不同線程之間。
信號量是一個包含非負整數的變量,有兩個原子操作,wait和post,也可以說是P操作和V操作,P操作將信號量減一,V操作將信號量加一。如果信號量大於0,P操作直接減一,否則如果等於0,P操作將調用進程(線程)阻塞起來,直到另外進程(線程)執行V操作。
信號量可以理解為一種資源的數量,通過這種資源的分配來實現同步或者互斥。當信號量的值為1時,可以實現互斥的功能,此時信號量就是二值信號量,如果信號量的值大於一時,可以實現進程(線程)併發執行。信號量和互斥鎖條件變量之間的區別是:互斥鎖必須由給它上鎖的進程(線程)來解鎖,而信號燈P操作不必由執行過它V操作的進程(線程)來執行;互斥鎖類似於二值信號量,要麼加鎖,要麼解鎖;當向條件變量發信號時,如果此時沒有等待在該條件變量上的線程,信號將丟失,而信號量不會。信號量主要用於之間同步,但也可以用在線程之間。互斥鎖和條件變量主要用於線程同步,但也可以用於進程間的同步。
POSIX信號量是一個sem_t的變量類型,分有名信號量和無名信號量,無名信號量用在共享信號量內存的情況下,比如同一個進程的不同線程之間,或父子進程之間,有名信號量隨內核持續的,所以我們可以跨多個程序操作它們。
#include <fcntl.h> /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <semaphore.h>
sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
Link with -pthread
參數name為posix IPC 名稱, 參數oflag可以為0, O_CREAT 或O_CREAT | O_EXCL, 如果指定了O_CREAT,第三、四參數需要指定。mode為指定權限位,value參數指定信號量初始值,二值信號量的值通常為1,計數信號量的值通常大於1。 函數返回執行sem_t數據類型的指針,出錯返回SEM_FAILED。
###2. 函數sem_close關閉有名信號量。
#include <semaphore.h>
int sem_close(sem_t *sem);
Link with -pthread.
#include <semaphore.h>
int sem_unlink(const char *name);
Link with -pthread.
###4.函數sem_wait和函數sem_trywait測試信號量的值,如果信號量值大於0,函數將信號量的值減一併立即返回,如果信號量的值等於0,sem_wait函數開始睡眠,直到信號量的值大於0,這是再減一併立即返回,而sem_trywait函數不睡眠直接返回,並返回EAGAIN錯誤。函數原型為:
#include <semaphore.h>
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);
Link with -pthread.
#include <semaphore.h>
int sem_post(sem_t *sem);
Link with -pthread.
#include <semaphore.h>
int sem_getvalue(sem_t *sem, int *sval);
Link with -pthread.
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
Link with -pthread.
sem為無名信號量地址,value為信號量初始值,pshared指定信號量是在進程的各個線程之間共享還是在進程之間共享,如果該值為0,表示在進程的線程之間共享,如果非0則在進出之間共享,無名信號量主要用在有共同祖先的進程或線程之間。
這一節使用posix共享內存來實現各個進程之間的通信,並使用posix信號量來達到互斥與同步的目的。首先我們來看看使用信號量實現對共享內存段的互斥訪問。
之前線程的訪問使用如下方式來達到對公共的內存互斥訪問:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
......
pthread_mutex_lock(&mutex);//加鎖
......
/*share memory handle*/
......
pthread_mutex_unlock(&mutex);//解鎖
......
現在我們也使用類似方式來實現:
sem_t *sem_mutex = NULL;
......
SLN_MUTEX_SHM_LOCK(SEM_MUTEX_FILE, sem_mutex);//加鎖
......
/*share memory handle*/
......
SLN_MUTEX_SHM_UNLOCK(sem_mutex);//解鎖
......
其中SEM_MUTEX_FILE為sem_open函數需要的有名信號量名稱。
其中兩個加鎖解鎖的實現為:
#define SLN_MUTEX_SHM_LOCK(shmfile, sem_mutex) do {\
sem_mutex = sem_open(shmfile, O_RDWR | O_CREAT, 0666, 1);\
if (SEM_FAILED == sem_mutex) {\
printf("sem_open(%d): %s\n", __LINE__, strerror(errno));\
}\
sem_wait(sem_mutex);\
}while(0)
#define SLN_MUTEX_SHM_UNLOCK(sem_mutex) do {sem_post(sem_mutex);} while(0)
其實就是初始化一個二值信號量,其初始值為1,並執行wait操作,使信號量的值變為0,此時其它進程想要操作共享內存時也需要執行wait操作,但此時信號量的值為0,所以開始等待信號量的值變為1。噹噹前進程操作完共享內存後,開始解鎖,執行post操作將信號量的值加一,此時其它進程的wait可以返回了。
下面為一個互斥訪問共享內存的示例,posix共享內存實現請查看前面IPC的系列文章。 ser process:
int nms_shm_get(char* shm_file, void** shm, int mem_len)
{
int fd;
fd = shm_open(shm_file, O_RDWR | O_CREAT, 0666);
if (fd < 0) {
printf("shm_pen <%s> failed: %s\n", shm_file, strerror(errno));
return -1;
}
ftruncate(fd, mem_len);
*shm = mmap(NULL, mem_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (MAP_FAILED == *shm) {
printf("mmap: %s\n", strerror(errno));
return -1;
}
return 0;
}
int main(int argc, const char* argv[])
{
sem_t* sem_mutex = NULL;
char* str = NULL;
SLN_MUTEX_SHM_LOCK(SEM_MUTEX_FILE, sem_mutex); //加鎖
nms_shm_get(SHM_FILE, (void**)&str,
SHM_MAX_LEN); //下面三行互斥訪問共享內存
sleep(6);
snprintf(str, SHM_MAX_LEN, "posix semphore server!");
SLN_MUTEX_SHM_UNLOCK(sem_mutex); //解鎖
sleep(6);
shm_unlink(SHM_FILE);
return 0;
}
client process:
int main(int argc, const char *argv[])
{
sem_t *sem_mutex;
char *str = NULL;
SLN_MUTEX_SHM_LOCK(SEM_MUTEX_FILE, sem_mutex);
nms_shm_get(SHM_FILE, (void **)&str, SHM_MAX_LEN);
printf("client get: %s\n", str);
SLN_MUTEX_SHM_UNLOCK(sem_mutex);
return 0;
}
先啟動服務進程首先加鎖,創建共享內存並操作它,加鎖中sleep 6秒,以便測試客戶進程是否在服務進程未釋放鎖時處於等待狀態。客戶進程在服務進程啟動之後馬上啟動,此時處於等待狀態,當服務進程6秒之後解鎖,客戶進程獲得共享內存信息。再過6秒之後,服務進程刪除共享內存,客戶進程再此獲取共享內存失敗。
# ./server &
[1] 21690
# ./client
client get: posix semphore server!
# ./client
shm_open <share_memory_file> failed: No such file or directory
client get: (null)
[1]+ Done ./server
posix有名信號量創建的信號量文件和共享內存文件在/dev/shm/目錄下:
# ls /dev/shm/
sem.sem_mutex share_memory_file
在兩個進程共享數據時,當一個進程向共享內存寫入了數據後需要通知另外的進程,這就需要兩個進程之間實現同步,這裡我們給上面的程序在互斥的基礎上加上同步操作。同步也是使用posix信號量來實現。 server process:
int main(int argc, const char* argv[])
{
sem_t* sem_mutex = NULL;
sem_t* sem_consumer = NULL, *sem_productor = NULL;
int semval;
char* sharememory = NULL;
sem_consumer = sem_open(SEM_CONSUMER_FILE, O_CREAT, 0666,
0); //初始化信號量sem_consumer ,並設置初始值為0
if (SEM_FAILED == sem_consumer) {
printf("sem_open <%s>: %s\n", SEM_CONSUMER_FILE, strerror(errno));
return -1;
}
sem_productor = sem_open(SEM_PRODUCTOR_FILE, O_CREAT, 0666,
0);//初始化信號量sem_productor ,並設置初始值為0
if (SEM_FAILED == sem_productor) {
printf("sem_open <%s>: %s\n", SEM_PRODUCTOR_FILE, strerror(errno));
return -1;
}
for (;;) {//服務進程一直循環處理客戶進程請求
sem_getvalue(sem_consumer, &semval);
printf("%d waiting...\n", semval);
if (sem_wait(sem_consumer) <
0) {//如果sem_consumer為0,則阻塞在此,等待客戶進程post操作使sem_consumer大於0,此處和客戶進程同步
printf("sem_wait: %s\n", strerror(errno));
return -1;
}
printf("Get request...\n");
SLN_MUTEX_SHM_LOCK(SEM_MUTEX,
sem_mutex);//此處開始互斥訪問共享內存
nms_shm_get(SHM_FILE, (void**)&sharememory, SHM_MAX_LEN);
sleep(6);
snprintf(sharememory, SHM_MAX_LEN, "Hello, this is server's message!");
SLN_MUTEX_SHM_UNLOCK(sem_mutex);
sem_post(sem_productor);//使信號量sem_productor加一,使阻塞的客戶進程繼續執行
printf("Response request...\n");
}
sem_close(sem_consumer);
sem_close(sem_productor);
return 0;
}
client process:
int main(int argc, const char* argv[])
{
sem_t* sem_consumer = NULL, *sem_productor = NULL;
struct timespec timeout;
int ret;
char* sharememory = NULL;
sem_t* sem_mutex;
sem_consumer = sem_open(SEM_CONSUMER_FILE,
O_RDWR);//獲取信號量sem_consumer的值
if (SEM_FAILED == sem_consumer) {
printf("sem_open <%s>: %s\n", SEM_CONSUMER_FILE, strerror(errno));
return -1;
}
sem_productor = sem_open(SEM_PRODUCTOR_FILE,
O_RDWR);//獲取信號量sem_productor 的值
if (SEM_FAILED == sem_productor) {
printf("sem_open <%s>: %s\n", SEM_PRODUCTOR_FILE, strerror(errno));
return -1;
}
//clear_exist_sem(sem_productor);
SLN_MUTEX_SHM_LOCK(SEM_MUTEX, sem_mutex);//互斥訪問共享內存
nms_shm_get(SHM_FILE, (void**)&sharememory, SHM_MAX_LEN);
printf("sharememory: %s\n", sharememory);
SLN_MUTEX_SHM_UNLOCK(sem_mutex);
sem_post(sem_consumer);//信號量sem_consumer加一,喚醒是阻塞在該信號量上的服務進程
printf("Post...\n");
sem_wait(sem_productor);//等待服務進程迴應
/*
timeout.tv_sec = time(NULL) + SEM_TIMEOUT_SEC;
timeout.tv_nsec = 0;
ret = sem_timedwait(sem_productor, &timeout);
if (ret < 0) {
printf("sem_timedwait: %s\n", strerror(errno));
}
*/
printf("Get response...\n");
sem_close(sem_consumer);
sem_close(sem_productor);
return 0;
}
http://download.csdn.net/detail/gentleliu/8292189