分散システム
電子・情報工学系
新城 靖
<yas@is.tsukuba.ac.jp>
このページは、次の URL にあります。
http://www.hlla.is.tsukuba.ac.jp/~yas/coins/dsys-1998/1999-01-26
/pthread-sync.html
あるいは、次のページから手繰っていくこともできます。
http://www.hlla.is.tsukuba.ac.jp/~yas/coins/
http://www.hlla.is.tsukuba.ac.jp/~yas/index-j.html
プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。
主記憶やCPU時間といった資源は、横取りしても平気なので、あまり問題になりない。
スレッドの場合は、プログラミング言語の変数。
変更されない変数の値を読むだけなら、特に問題は起きない。それ意外の時、 特に、複数のスレッドで値を読み書きする時に問題が起きる。
----------------------------------------------------------------------
1:
2: /*
3: mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム
4: このファイルは次の場所にあります。
5: ~yas/dsys/pthread/mutex-nolock.c
6: cp コマンドでコピーできます。
7: $Header: /home/lab2/OS/yas/dsys/pthread/RCS/mutex-nolock.c,v 1.1 1999/01/25 13:31:48 yas Exp $
8: Start: 1999/01/25 22:13:15
9: */
10:
11: #include <pthread.h>
12:
13: #if defined(sun)
14: #define YIELD() thr_yield()
15: #endif
16: #if defined(sgi) || defined(__alpha)
17: #define YIELD() work( 500000 )
18: int work( int n ) {
19: int i,x ;
20: for( i = 0 ; i<n ; i++ )
21: x = x + i ;
22: return( x );
23: }
24: #endif
25: #if defined(__FreeBSD__) || defined(linux)
26: #define YIELD() usleep( 100 )
27: #endif
28:
29: void thread_A(), thread_B();
30: int shared_resource ;
31:
32: main() {
33: pthread_t t1 ;
34: pthread_t t2 ;
35: shared_resource = 0 ;
36: pthread_create( &t1, NULL, (void *)thread_A, 0 );
37: pthread_create( &t2, NULL, (void *)thread_B, 0 );
38: pthread_join( t1, NULL );
39: pthread_join( t2, NULL );
40: printf("main(): shared_resource == %d\n", shared_resource );
41: }
42:
43: void thread_A() {
44: int i, x ;
45: for( i = 0 ; i<5 ; i++ ) {
46: YIELD(); x = shared_resource ;
47: YIELD(); x = x + 1 ;
48: YIELD(); shared_resource = x ;
49: }
50: }
51:
52: void thread_B() {
53: int i, x ;
54: for( i = 0 ; i<5 ; i++ ) {
55: YIELD(); x = shared_resource ;
56: YIELD(); x = x + 1 ;
57: YIELD(); shared_resource = x ;
58: }
59: }
----------------------------------------------------------------------
YIELD() は、後述。
---------------------------------------------------------------------- % cp ~yas/dsys/pthread/mutex-nolock.c .% cc -o mutex-nolock mutex-nolock.c -lpthread
% ./mutex-nolock
main(): shared_resource == 6 % ./mutex-nolock
main(): shared_resource == 5 % ./mutex-nolock
main(): shared_resource == 5 % ./mutex-nolock
main(): shared_resource == 5 %
----------------------------------------------------------------------
thread_A(), thread_B()と2つのスレッドが作
られている。整数型の変数 shared_resource は、その2つの
スレッドの両方からアクセスされる。2つのスレッドで合計10増える予定だ
が、実際に実行すると、5か6しか増えない。
このプログラムが共有メモリ型マルチプロセッサで動いているとして、動きを 考えてえる。
---------------------------------------------------------------------- thread_A() thread_B() : : : : 46: x = shared_resource ; 55: x = shared_resource ; 47: x = x + 1 ; 56: x = x + 1 ; 48: shared_resource = x ; 57: shared_resource = x ; : : : : ----------------------------------------------------------------------
shared_resource++と書いてもだめ。複数の機械語命令に分割
されるので。
上のプログラムは、 シングルプロセッサ で、CPU資源の横取りを使いながら、マルチプロセッサの動きをエミュレート している。
O2 で動いている Pthread は、横取りのあるスケジューリングを行なっている。横取り(preemption) とは、実行中(CPUがあれば処理を続けられる)のスレッドやプロセスの実行を 中断し、別のスレッドやプロセスを実行する。
相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つにする。
プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。
Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。
pthread_mutex_t mutex1 ;
:
:
pthread_mutex_lock( &murex1 );
<相互排除したい部分(際どい部分)>
pthread_mutex_unlock( &mutex1 );
YHM_SSS_Ref(thread-mutex-nolock,ロックなし)のプログラムを、mutex を使っ て書き直したもの。
----------------------------------------------------------------------
1:
2: /*
3: mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム
4: このファイルは次の場所にあります。
5: ~yas/dsys/pthread/mutex-lock.c
6: cp コマンドでコピーできます。
7: $Header: /home/lab2/OS/yas/dsys/pthread/RCS/mutex-lock.c,v 1.2 1999/01/25 14:03:56 yas Exp $
8: Start: 1999/01/25 22:13:15
9: */
10:
11: #include <pthread.h>
12:
13: #if defined(sun)
14: #define YIELD() thr_yield()
15: #endif
16: #if defined(sgi) || defined(__alpha)
17: #define YIELD() work( 500000 )
18: int work( int n ) {
19: int i,x ;
20: for( i = 0 ; i<n ; i++ )
21: x = x + i ;
22: return( x );
23: }
24: #endif
25: #if defined(__FreeBSD__) || defined(linux)
26: #define YIELD() usleep( 100 )
27: #endif
28:
29: void thread_A(), thread_B();
30: int shared_resource ;
31: pthread_mutex_t mutex1 ;
32:
33: main() {
34: pthread_t t1 ;
35: pthread_t t2 ;
36: shared_resource = 0 ;
37: pthread_mutex_init( &mutex1, NULL );
38: pthread_create( &t1, NULL, (void *)thread_A, 0 );
39: pthread_create( &t2, NULL, (void *)thread_B, 0 );
40: pthread_join( t1, NULL );
41: pthread_join( t2, NULL );
42: printf("main(): shared_resource == %d\n", shared_resource );
43: }
44:
45: void thread_A() {
46: int i, x ;
47: for( i = 0 ; i<5 ; i++ ) {
48: YIELD(); pthread_mutex_lock( &mutex1 );
49: YIELD(); x = shared_resource ;
50: YIELD(); x = x + 1 ;
51: YIELD(); shared_resource = x ;
52: YIELD(); pthread_mutex_unlock( &mutex1 );
53: }
54: }
55:
56: void thread_B() {
57: int i, x ;
58: for( i = 0 ; i<5 ; i++ ) {
59: YIELD(); pthread_mutex_lock( &mutex1 );
60: YIELD(); x = shared_resource ;
61: YIELD(); x = x + 1 ;
62: YIELD(); shared_resource = x ;
63: YIELD(); pthread_mutex_unlock( &mutex1 );
64: }
65: }
----------------------------------------------------------------------
---------------------------------------------------------------------- % cp ~yas/dsys/pthread/mutex-lock.c .% cc -o mutex-lock mutex-lock.c -lpthread
% ./mutex-lock
main(): shared_resource == 10 % ./mutex-lock
main(): shared_resource == 10 % ./mutex-lock
main(): shared_resource == 10 % ./mutex-lock
main(): shared_resource == 10 %
----------------------------------------------------------------------
このプログラムが共有メモリ型マルチプロセッサで動いているとして、動きを 考えてえる。
---------------------------------------------------------------------- thread_A() thread_B() : : 48: pthread_mutex_lock( &mutex1 ); : 59: pthread_mutex_lock( &mutex1 ); 49: x = shared_resource ; <他のスレッドが実行中なので 50: x = x + 1 ; この状態でしばらく待たされる> 51: shared_resource = x ; : 52: pthread_mutex_unlock( &mutex1 ); : : <実行再開> : 60: x = shared_resource ; 61: x = x + 1 ; 62: shared_resource = x ; 63: pthread_mutex_unlock( &mutex1 ); ----------------------------------------------------------------------後から来た
thread_B() は、他のスレッドが実行中は、待たさ
れる。
2つのスレッドの間には、バッファを置く。---------------------------------------------------------------------- thread_A | thread_B ----------------------------------------------------------------------
バッファが空の時、YHM_thread_B() は、YHM_thread_A() が何かデー タをバッファに入れるのを待つ。バッファがいっぱいの時、YHM_thread_A() は、YHM_thread_B() がバッファから何かデータを取り出すのを待つ。
条件変数(condition variable) で、ある条件が生じたことを待つ。条件変数の操作:
----------------------------------------------------------------------
1:
2: /*
3: condv-buffer.c -- 条件変数を使った環状バッファ
4: このファイルは次の場所にあります。
5: ~yas/dsys/pthread/mutex-nolock.c
6: cp コマンドでコピーできます。
7: $Header: /home/lab2/OS/yas/dsys/pthread/RCS/condv-buffer.c,v 1.1 1999/01/25 14:08:31 yas Exp $
8: Start: 1999/01/25 22:13:15
9: */
10:
11: #include <pthread.h>
12:
13: void thread_A(), thread_B();
14:
15: #define BUFFER_SIZE 3 /* バッファの大きさ */
16: struct buffer {
17: int rp ; /* 読み出す位置 */
18: int wp ; /* 書き込む位置 */
19: int data[BUFFER_SIZE]; /* データを保存する場所 */
20: pthread_mutex_t mutex ; /* この構造体の相互排除のための mutex */
21: pthread_cond_t not_full ; /* バッファが一杯ではない状態を待つための条件変数 */
22: pthread_cond_t not_empty ; /* バッファが空ではない状態を待つための条件変数 */
23: };
24:
25: void put( struct buffer *b,int x ) {
26: pthread_mutex_lock( &b->mutex );
27: loop: if( ((b->wp+1)%BUFFER_SIZE) == b->rp ) {
28: pthread_cond_wait( &b->not_full,&b->mutex );
29: goto loop;
30: }
31: b->data[ b->wp++ ] = x ;
32: if( b->wp >= BUFFER_SIZE )
33: b->wp = 0 ;
34: pthread_cond_signal( &b->not_empty );
35: pthread_mutex_unlock( &b->mutex );
36: }
37:
38:
39: int get( struct buffer *b ) {
40: int x ;
41: pthread_mutex_lock( &b->mutex );
42: loop: if( b->rp == b->wp ) {
43: pthread_cond_wait( &b->not_empty,&b->mutex );
44: goto loop;
45: }
46: x = b->data[ b->rp++ ] ;
47: if( b->rp >= BUFFER_SIZE )
48: b->rp = 0 ;
49: pthread_cond_signal( &b->not_full );
50: pthread_mutex_unlock( &b->mutex );
51: return( x );
52: }
53:
54: main() {
55: pthread_t t1 ;
56: pthread_t t2 ;
57: struct buffer *b ;
58: if( (b = (struct buffer *)malloc(sizeof(struct buffer))) == NULL )
59: {
60: perror("no memory for struct buffer\n");
61: exit( -1 );
62: }
63: b->rp = 0 ;
64: b->wp = 0 ;
65: pthread_mutex_init( &b->mutex, NULL );
66: pthread_cond_init( &b->not_full,NULL );
67: pthread_cond_init( &b->not_empty,NULL );
68: pthread_create( &t1, NULL, (void *)thread_A, (void *)b );
69: pthread_create( &t2, NULL, (void *)thread_B, (void *)b );
70: pthread_join( t1, NULL );
71: pthread_join( t2, NULL );
72: }
73:
74: void thread_A( struct buffer *b ) { /* producer */
75: int i,x ;
76: for( i = 0 ; i<10 ; i++ ) {
77: x = i ;
78: printf("thread_A(): put( %d )\n",x );
79: put( b,x );
80: }
81: }
82:
83: void thread_B( struct buffer *b ) { /* consumer */
84: int i, x ;
85: for( i = 0 ; i<10 ; i++ ) {
86: x = get( b );
87: printf("thread_B(): get() %d.\n", x );
88: }
89: }
----------------------------------------------------------------------
put() は、バッファにデータを追加する時に使う手続き。
基本的には、入口で pthread_mutex_lock() し、
出口で pthread_mutex_unlock() する。
バッファが一杯の時には、条件変数b->not_full で、
一杯でないという条件になるまで待つ。
待っている間は、mutex のロックは解除される。
pthread_cond_wait() からリターンして来る時には、もう一度
ロックされた状態に戻るが、待っている間に、他の変数
(rp,wp,data)が書き換えられている可能性があるので、もう一
度最初から調べる。
get() は、バッファからデータを取り出す時に使う手
続き。put()とほぼ対称形。バッファが空の時に、wait
し、バッファがもはや一杯ではないことをsignal する。
thread_A() は、10回バッファにデータを書き込むスレッド。
thread_B() は逆に、10回バッファからデータを読み出すス
レッド。
整数を1つバッファに書き込むだけでロック/アンロックを行なっていると、 実際の並列処理では重たい。ロックの回数を減らすために、ダブルバッファリ ングと呼ばれる技術がよく使われる。読み手と書き手で別々にバッファをもう け、1つのバッファの処理をしている間は、ロックを行なわない。---------------------------------------------------------------------- % cp ~yas/dsys/pthread/condv-buffer.c .% cc -o condv-buffer condv-buffer.c -lpthread
% ./condv-buffer
thread_A(): put( 0 ) thread_A(): put( 1 ) thread_A(): put( 2 ) thread_B(): get() 0. thread_B(): get() 1. thread_A(): put( 3 ) thread_A(): put( 4 ) thread_B(): get() 2. thread_B(): get() 3. thread_A(): put( 5 ) thread_A(): put( 6 ) thread_B(): get() 4. thread_B(): get() 5. thread_A(): put( 7 ) thread_A(): put( 8 ) thread_B(): get() 6. thread_B(): get() 7. thread_A(): put( 9 ) thread_B(): get() 8. thread_B(): get() 9. %
----------------------------------------------------------------------
バッファに要素を1つずつ追加しているので、
pthread_cond_signal() でもよい。
pthread_cond_broadcast() に変えても動く。
複数個同時に読み書きする時には、
pthread_cond_broadcast() でないとだめ。
<−>再帰呼出し
スレッド間でポインタを渡す時には、スレッドの寿命にも注意。
シングルスレッドのプログラムでは、static変数は、プログラムのモジュール性 を高めるために有効に使われてきた。
マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。
TCP/IP でプログラムを書く時に使うgethostbyname() は、
static変数に値をセットして、リターン・バリューとして返してる。
----------------------------------------------------------------------
struct hostent *gethostbyname( char *name ){
static struct hostent ret ;
.....
return( &ret );
}
----------------------------------------------------------------------
複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ
る。
複数のスレッドで呼び出してもきちんと動作することを、
スレッド・セーフ(thread-safe)
という。
MT-Safe(multi-thread-safe)
や
再入可能(reentrant)
ということもある。
externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。
別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。
スレッド・セーフになるようにするには、インタフェースを変更する必要があ る。
Sun のマニュアルより:
----------------------------------------------------------------------
struct hostent *gethostbyname(const char *name);
struct hostent *gethostbyname_r(const char *name,
struct hostent *result, char *buffer, int buflen,
int *h_errnop);
----------------------------------------------------------------------
O2には、gethostbyname_r() はない。
YHM_Chapter_Ref(09-100,Java) には、最初から言語のレベルでスレッドの機能が入っている。
---------------------------------------------------------------------- Java Pthread ---------------------------------------------------------------------- new Thread(); start(); pthread_create() join() pthread_join() synchronized pthread_mutex_lock()とpthread_mutex_unlock()の組 wait() pthread_cond_wait() wait(long timeout) pthread_cond_timedwait() notify() pthread_cond_signal() notifyAll() pthread_cond_broadcast() ----------------------------------------------------------------------1つの mutex と1つの条件変数なら、書き直せる。 上の循環バッファのプログラムは、1つの mutex で2つの条件変数を使って いるので、単純には Java で書き直せない。
ヒント: rp==wp の時に、バッファが空かフルかが判定ができればよい。
循環バッファのプログラムで、条件変数を1つになるように変更しなさい。一 度に wait する必要があるのは、put() 側か get() のどちらか一方だけ。あ るいは、両方とも同じキューにつないでも、効率は悪いが動く事は動く。 循環バッファのプログラムで、条件変数を1つにしたものについて、Java で 書き直しなさい。