スレッド・プログラミング(その2)

分散システム

                                       電子・情報工学系
                                       新城 靖
                                       <yas@is.tsukuba.ac.jp>

このページは、次の URL にあります。
http://www.hlla.is.tsukuba.ac.jp/~yas/coins/dsys-1998/1999-01-26 /pthread-sync.html
あるいは、次のページから手繰っていくこともできます。
http://www.hlla.is.tsukuba.ac.jp/~yas/coins/
http://www.hlla.is.tsukuba.ac.jp/~yas/index-j.html

■スレッド・プログラミング(その2)

この授業は、オペレーティング・システムIIです。

■復習

独立して実行したい処理がある時にスレッドを作る。

■mutexによるスレッド間の同期

◆共有資源

複数のプロセス/スレッドで共有しなければならないもの。

プロセスの場合、ファイル、端末、ウインドウ、主記憶、CPU時間。

主記憶やCPU時間といった資源は、横取りしても平気なので、あまり問題になりない。

スレッドの場合は、プログラミング言語の変数。

変更されない変数の値を読むだけなら、特に問題は起きない。それ意外の時、 特に、複数のスレッドで値を読み書きする時に問題が起きる。

◆複数のスレッドによる共有資源のアクセス(ロックなし)


----------------------------------------------------------------------
   1:	
   2:	/*
   3:	        mutex-nolock.c -- 共有資源をロックなしでアクセスするプログラム
   4:	        このファイルは次の場所にあります。
   5:	                ~yas/dsys/pthread/mutex-nolock.c
   6:	        cp コマンドでコピーできます。
   7:	        $Header: /home/lab2/OS/yas/dsys/pthread/RCS/mutex-nolock.c,v 1.1 1999/01/25 13:31:48 yas Exp $
   8:	        Start: 1999/01/25 22:13:15
   9:	*/
  10:	
  11:	#include <pthread.h>
  12:	
  13:	#if     defined(sun)
  14:	#define YIELD() thr_yield()
  15:	#endif
  16:	#if     defined(sgi) || defined(__alpha)
  17:	#define YIELD() work( 500000 )
  18:	int work( int n ) {
  19:	    int i,x ;
  20:	        for( i = 0 ; i<n ; i++ )
  21:	            x = x + i ;
  22:	        return( x );
  23:	}
  24:	#endif
  25:	#if     defined(__FreeBSD__) || defined(linux)
  26:	#define YIELD() usleep( 100 )
  27:	#endif
  28:	
  29:	void thread_A(), thread_B();
  30:	int     shared_resource ;
  31:	
  32:	main() {
  33:	    pthread_t t1 ;
  34:	    pthread_t t2 ;
  35:	        shared_resource = 0 ;
  36:	        pthread_create( &t1, NULL, (void *)thread_A, 0 );
  37:	        pthread_create( &t2, NULL, (void *)thread_B, 0 );
  38:	        pthread_join( t1, NULL );
  39:	        pthread_join( t2, NULL );
  40:	        printf("main(): shared_resource == %d\n", shared_resource );
  41:	}
  42:	
  43:	void thread_A() {
  44:	    int i, x ;
  45:	        for( i = 0 ; i<5 ; i++ ) {
  46:	YIELD();    x = shared_resource ;
  47:	YIELD();    x = x + 1 ;
  48:	YIELD();    shared_resource = x ;
  49:	        }
  50:	}
  51:	
  52:	void thread_B() {
  53:	    int i, x ;
  54:	        for( i = 0 ; i<5 ; i++ ) {
  55:	YIELD();    x = shared_resource ;
  56:	YIELD();    x = x + 1 ;
  57:	YIELD();    shared_resource = x ;
  58:	        }
  59:	}
----------------------------------------------------------------------

YIELD() は、後述。

◆実行結果(ロックなし)


----------------------------------------------------------------------
% cp ~yas/dsys/pthread/mutex-nolock.c . [←]
% cc -o mutex-nolock mutex-nolock.c -lpthread [←]
% ./mutex-nolock  [←]
main(): shared_resource == 6
% ./mutex-nolock [←]
main(): shared_resource == 5
% ./mutex-nolock [←]
main(): shared_resource == 5
% ./mutex-nolock [←]
main(): shared_resource == 5
% []
----------------------------------------------------------------------
thread_A(), thread_B()と2つのスレッドが作 られている。整数型の変数 shared_resource は、その2つの スレッドの両方からアクセスされる。2つのスレッドで合計10増える予定だ が、実際に実行すると、5か6しか増えない。

◆考察(ロックなし)

このプログラムが共有メモリ型マルチプロセッサで動いているとして、動きを 考えてえる。


----------------------------------------------------------------------
thread_A()			thread_B()
	:				:
	:				:
46: x = shared_resource ;
				55: x = shared_resource ;
47: x = x + 1 ;
				56: x = x + 1 ;
48: shared_resource = x ;
				57: shared_resource = x ;
	:				:
	:				:
----------------------------------------------------------------------

shared_resource++と書いてもだめ。複数の機械語命令に分割 されるので。

上のプログラムは、 シングルプロセッサ で、CPU資源の横取りを使いながら、マルチプロセッサの動きをエミュレート している。

◆O2のPthreadライブラリ

O2 で動いている Pthread は、横取りのあるスケジューリングを行なっている。

横取り(preemption) とは、実行中(CPUがあれば処理を続けられる)のスレッドやプロセスの実行を 中断し、別のスレッドやプロセスを実行する。

◆相互排除

相互排除(mutual exclusion): ある資源をアクセスできるスレッドの数を 多くても1つにする。

プログラムの字面上、相互排除が必要な部分を 際どい部分(critical section) ( クリティカルセクション ) という。

◆Pthreadでの相互排除

Pthread では、相互排除を行なうために、 mutex という仕組みがある。次のようにして、相互排除を行ないたい部分を lock と unlock で囲む。

    pthread_mutex_t mutex1 ;
	:
	:
    pthread_mutex_lock( &murex1 );
	<相互排除したい部分(際どい部分)>
    pthread_mutex_unlock( &mutex1 );

◆複数のスレッドによる共有資源のアクセス(ロック付き)

YHM_SSS_Ref(thread-mutex-nolock,ロックなし)のプログラムを、mutex を使っ て書き直したもの。

----------------------------------------------------------------------
   1:	
   2:	/*
   3:	        mutex-lock.c -- 共有資源をロックしながらアクセスするプログラム
   4:	        このファイルは次の場所にあります。
   5:	                ~yas/dsys/pthread/mutex-lock.c
   6:	        cp コマンドでコピーできます。
   7:	        $Header: /home/lab2/OS/yas/dsys/pthread/RCS/mutex-lock.c,v 1.2 1999/01/25 14:03:56 yas Exp $
   8:	        Start: 1999/01/25 22:13:15
   9:	*/
  10:	
  11:	#include <pthread.h>
  12:	
  13:	#if     defined(sun)
  14:	#define YIELD() thr_yield()
  15:	#endif
  16:	#if     defined(sgi) || defined(__alpha)
  17:	#define YIELD() work( 500000 )
  18:	int work( int n ) {
  19:	    int i,x ;
  20:	        for( i = 0 ; i<n ; i++ )
  21:	            x = x + i ;
  22:	        return( x );
  23:	}
  24:	#endif
  25:	#if     defined(__FreeBSD__) || defined(linux)
  26:	#define YIELD() usleep( 100 )
  27:	#endif
  28:	
  29:	void thread_A(), thread_B();
  30:	int     shared_resource ;
  31:	pthread_mutex_t mutex1 ;
  32:	
  33:	main() {
  34:	    pthread_t t1 ;
  35:	    pthread_t t2 ;
  36:	        shared_resource = 0 ;
  37:	        pthread_mutex_init( &mutex1, NULL );
  38:	        pthread_create( &t1, NULL, (void *)thread_A, 0 );
  39:	        pthread_create( &t2, NULL, (void *)thread_B, 0 );
  40:	        pthread_join( t1, NULL );
  41:	        pthread_join( t2, NULL );
  42:	        printf("main(): shared_resource == %d\n", shared_resource );
  43:	}
  44:	
  45:	void thread_A() {
  46:	    int i, x ;
  47:	        for( i = 0 ; i<5 ; i++ ) {
  48:	YIELD();    pthread_mutex_lock( &mutex1 );
  49:	YIELD();    x = shared_resource ;
  50:	YIELD();    x = x + 1 ;
  51:	YIELD();    shared_resource = x ;
  52:	YIELD();    pthread_mutex_unlock( &mutex1 );
  53:	        }
  54:	}
  55:	
  56:	void thread_B() {
  57:	    int i, x ;
  58:	        for( i = 0 ; i<5 ; i++ ) {
  59:	YIELD();    pthread_mutex_lock( &mutex1 );
  60:	YIELD();    x = shared_resource ;
  61:	YIELD();    x = x + 1 ;
  62:	YIELD();    shared_resource = x ;
  63:	YIELD();    pthread_mutex_unlock( &mutex1 );
  64:	        }
  65:	}
----------------------------------------------------------------------

◆実行結果(ロック付き)


----------------------------------------------------------------------
% cp ~yas/dsys/pthread/mutex-lock.c . [←]
% cc -o mutex-lock mutex-lock.c -lpthread [←]
% ./mutex-lock  [←]
main(): shared_resource == 10
% ./mutex-lock [←]
main(): shared_resource == 10
% ./mutex-lock [←]
main(): shared_resource == 10
% ./mutex-lock [←]
main(): shared_resource == 10
% []
----------------------------------------------------------------------

◆考察(ロック付き)

このプログラムが共有メモリ型マルチプロセッサで動いているとして、動きを 考えてえる。

----------------------------------------------------------------------
thread_A()				thread_B()

	:					:
48: pthread_mutex_lock( &mutex1 );		:
					59: pthread_mutex_lock( &mutex1 );
49: x = shared_resource ;		<他のスレッドが実行中なので
50: x = x + 1 ;				 この状態でしばらく待たされる>
51: shared_resource = x ;			:
52: pthread_mutex_unlock( &mutex1 );		:
	:				<実行再開>
	:				60: x = shared_resource ;
					61: x = x + 1 ;
					62: shared_resource = x ;
					63: pthread_mutex_unlock( &mutex1 );
----------------------------------------------------------------------

後から来た thread_B() は、他のスレッドが実行中は、待たさ れる。

■条件変数によるスレッド間の同期

スレッドでプログラムを作っていると、あるスレッドが別のスレッドの仕事の 完了を待つ必要が出がある。

◆パイプと循環バッファ

Unix のパイプのようなことをスレッドを使って実行するしたい。

----------------------------------------------------------------------
thread_A | thread_B
----------------------------------------------------------------------
2つのスレッドの間には、バッファを置く。

図? 環状バッファ、生産者スレッド、消費者スレッド

図? 環状バッファ、生産者スレッド、消費者スレッド

バッファが空の時、YHM_thread_B() は、YHM_thread_A() が何かデー タをバッファに入れるのを待つ。バッファがいっぱいの時、YHM_thread_A() は、YHM_thread_B() がバッファから何かデータを取り出すのを待つ。

◆条件変数

条件変数(condition variable) で、ある条件が生じたことを待つ。

条件変数の操作:

wait
ある条件が満たされるまで待つ
signal
ある条件が満たされたことを伝える。待っているスレッドが1つだけ起き上がる。
broadcast
ある条件が満たされたことを伝える。待っているスレッドが全て起き上がる。

◆条件変数を使った環状バッファ


----------------------------------------------------------------------
   1:	
   2:	/*
   3:	        condv-buffer.c -- 条件変数を使った環状バッファ
   4:	        このファイルは次の場所にあります。
   5:	                ~yas/dsys/pthread/mutex-nolock.c
   6:	        cp コマンドでコピーできます。
   7:	        $Header: /home/lab2/OS/yas/dsys/pthread/RCS/condv-buffer.c,v 1.1 1999/01/25 14:08:31 yas Exp $
   8:	        Start: 1999/01/25 22:13:15
   9:	*/
  10:	
  11:	#include <pthread.h>
  12:	
  13:	void thread_A(), thread_B();
  14:	
  15:	#define BUFFER_SIZE     3       /* バッファの大きさ */
  16:	struct buffer {
  17:	        int rp ;                        /* 読み出す位置 */
  18:	        int wp ;                        /* 書き込む位置 */
  19:	        int data[BUFFER_SIZE];          /* データを保存する場所 */
  20:	        pthread_mutex_t mutex ;         /* この構造体の相互排除のための mutex */
  21:	        pthread_cond_t  not_full ;      /* バッファが一杯ではない状態を待つための条件変数 */
  22:	        pthread_cond_t  not_empty ;     /* バッファが空ではない状態を待つための条件変数 */
  23:	};
  24:	
  25:	void put( struct buffer *b,int x ) {
  26:	        pthread_mutex_lock( &b->mutex );
  27:	loop:   if( ((b->wp+1)%BUFFER_SIZE) == b->rp ) {
  28:	            pthread_cond_wait( &b->not_full,&b->mutex );
  29:	            goto loop;
  30:	        }
  31:	        b->data[ b->wp++ ] = x ;
  32:	        if( b->wp >= BUFFER_SIZE )
  33:	            b->wp = 0 ;
  34:	        pthread_cond_signal( &b->not_empty );
  35:	        pthread_mutex_unlock( &b->mutex );
  36:	}
  37:	
  38:	
  39:	int get( struct buffer *b ) {
  40:	    int x ;
  41:	        pthread_mutex_lock( &b->mutex );
  42:	loop:   if( b->rp == b->wp ) {
  43:	            pthread_cond_wait( &b->not_empty,&b->mutex );
  44:	            goto loop;
  45:	        }
  46:	        x = b->data[ b->rp++ ] ;
  47:	        if( b->rp >= BUFFER_SIZE )
  48:	            b->rp = 0 ;
  49:	        pthread_cond_signal( &b->not_full );
  50:	        pthread_mutex_unlock( &b->mutex );
  51:	        return( x );
  52:	}
  53:	
  54:	main() {
  55:	    pthread_t t1 ;
  56:	    pthread_t t2 ;
  57:	        struct buffer *b  ;
  58:	        if( (b = (struct buffer *)malloc(sizeof(struct buffer))) == NULL )
  59:	        {
  60:	            perror("no memory for struct buffer\n");
  61:	            exit( -1 );
  62:	        }
  63:	        b->rp = 0 ;
  64:	        b->wp = 0 ;
  65:	        pthread_mutex_init( &b->mutex, NULL );
  66:	        pthread_cond_init( &b->not_full,NULL );
  67:	        pthread_cond_init( &b->not_empty,NULL );
  68:	        pthread_create( &t1, NULL, (void *)thread_A, (void *)b );
  69:	        pthread_create( &t2, NULL, (void *)thread_B, (void *)b );
  70:	        pthread_join( t1, NULL );
  71:	        pthread_join( t2, NULL );
  72:	}
  73:	
  74:	void thread_A( struct buffer *b ) { /* producer */
  75:	    int i,x ;
  76:	        for( i = 0 ; i<10 ; i++ ) {
  77:	            x = i ;
  78:	            printf("thread_A(): put( %d )\n",x );
  79:	            put( b,x );
  80:	        }
  81:	}
  82:	
  83:	void thread_B( struct buffer *b ) { /* consumer */
  84:	    int i, x ;
  85:	        for( i = 0 ; i<10 ; i++ ) {
  86:	            x = get( b );
  87:	            printf("thread_B(): get() %d.\n", x );
  88:	        }
  89:	}
----------------------------------------------------------------------

put() は、バッファにデータを追加する時に使う手続き。

基本的には、入口で pthread_mutex_lock() し、 出口で pthread_mutex_unlock() する。

バッファが一杯の時には、条件変数b->not_full で、 一杯でないという条件になるまで待つ。

待っている間は、mutex のロックは解除される。

pthread_cond_wait() からリターンして来る時には、もう一度 ロックされた状態に戻るが、待っている間に、他の変数 (rp,wp,data)が書き換えられている可能性があるので、もう一 度最初から調べる。

get() は、バッファからデータを取り出す時に使う手 続き。put()とほぼ対称形。バッファが空の時に、wait し、バッファがもはや一杯ではないことをsignal する。

thread_A() は、10回バッファにデータを書き込むスレッド。 thread_B() は逆に、10回バッファからデータを読み出すス レッド。

◆実行結果


----------------------------------------------------------------------
% cp ~yas/dsys/pthread/condv-buffer.c . [←]
% cc -o condv-buffer condv-buffer.c -lpthread [←]
% ./condv-buffer  [←]
thread_A(): put( 0 )
thread_A(): put( 1 )
thread_A(): put( 2 )
thread_B(): get() 0.
thread_B(): get() 1.
thread_A(): put( 3 )
thread_A(): put( 4 )
thread_B(): get() 2.
thread_B(): get() 3.
thread_A(): put( 5 )
thread_A(): put( 6 )
thread_B(): get() 4.
thread_B(): get() 5.
thread_A(): put( 7 )
thread_A(): put( 8 )
thread_B(): get() 6.
thread_B(): get() 7.
thread_A(): put( 9 )
thread_B(): get() 8.
thread_B(): get() 9.
% []
----------------------------------------------------------------------

◆ダブルバッファリング

整数を1つバッファに書き込むだけでロック/アンロックを行なっていると、 実際の並列処理では重たい。ロックの回数を減らすために、ダブルバッファリ ングと呼ばれる技術がよく使われる。読み手と書き手で別々にバッファをもう け、1つのバッファの処理をしている間は、ロックを行なわない。

◆signalかbroadcastか

バッファに要素を1つずつ追加しているので、 pthread_cond_signal() でもよい。 pthread_cond_broadcast() に変えても動く。 複数個同時に読み書きする時には、 pthread_cond_broadcast() でないとだめ。

■スレッドとメモリ

◆auto変数

各スレッドには、独立したスタックが割り当てられる。C言語の auto 変数は、 スレッドごとにコピーが作られる。

<−>再帰呼出し

スレッド間でポインタを渡す時には、スレッドの寿命にも注意。

◆static変数

シングルスレッドのプログラムでは、static変数は、プログラムのモジュール性 を高めるために有効に使われてきた。

マルチスレッドと相性が非常に悪い。static変数もextern変数と同様に複数の スレッドで共有される。変更する場合には、mutex でロックが必要になる。

◆static変数を使ったライブラリ関数

TCP/IP でプログラムを書く時に使う gethostbyname() は、 static変数に値をセットして、リターン・バリューとして返してる。

----------------------------------------------------------------------
struct hostent *gethostbyname( char *name ){
    static struct hostent ret ;
	.....
	return( &ret );
}
----------------------------------------------------------------------

複数のスレッドが同時にこの関数を呼び出した場合、同じstatic変数が使われ る。

◆スレッド・セーフ

複数のスレッドで呼び出してもきちんと動作することを、 スレッド・セーフ(thread-safe) という。 MT-Safe(multi-thread-safe)再入可能(reentrant) ということもある。

externやstaticを使わず、auto変数やmalloc()だけを使っているような手続き は、スレッド・セーフ。

別のスレッド・セーフでない手続きを呼んでいれば、それはスレッド・セーフ ではない。

◆スレッド・セーフなインタフェース

スレッド・セーフになるようにするには、インタフェースを変更する必要があ る。
Sun のマニュアルより:
----------------------------------------------------------------------
struct hostent *gethostbyname(const char *name);

struct hostent *gethostbyname_r(const char *name,
     struct hostent *result, char *buffer, int buflen,
     int *h_errnop);

----------------------------------------------------------------------
O2には、gethostbyname_r() はない。

◆スレッド・セーフではない手続きを使う

一見無関係の手続きが内部で変数を共有している場合がある。

■Javaのスレッド

YHM_Chapter_Ref(09-100,Java) には、最初から言語のレベルでスレッドの機能が入っている。

----------------------------------------------------------------------
Java			Pthread
----------------------------------------------------------------------
new Thread(); start();	pthread_create()
join()			pthread_join()
synchronized		pthread_mutex_lock()とpthread_mutex_unlock()の組
wait()			pthread_cond_wait()
wait(long timeout)	pthread_cond_timedwait()
notify()		pthread_cond_signal()
notifyAll()		pthread_cond_broadcast()
----------------------------------------------------------------------
1つの mutex と1つの条件変数なら、書き直せる。 上の循環バッファのプログラムは、1つの mutex で2つの条件変数を使って いるので、単純には Java で書き直せない。

■練習問題

★スレッドの数

相互排除のプログラムや条件変数プログラムでスレッドの数を増やしてみなさい。

★手続きとスレッド

1つの手続き(C言語の関数)から複数のスレッドを生成してみなさい。

★ダブルバッファリング

上の循環バッファのプログラムを YHM_SSS_Ref(double-buffering,ダブルバッファリング)を行なうプログラムに 変更しなさい。

★循環バッファの改良

上の循環バッファのプログラムは、バッファが3つあるのに、実質的には2つ しか使われていない。これを3つ使えるようにしなさい。

ヒント: rp==wp の時に、バッファが空かフルかが判定ができればよい。

★条件変数の削減

循環バッファのプログラムで、条件変数を1つになるように変更しなさい。一 度に wait する必要があるのは、put() 側か get() のどちらか一方だけ。あ るいは、両方とも同じキューにつないでも、効率は悪いが動く事は動く。

★Javaへの書き直し

循環バッファのプログラムで、条件変数を1つにしたものについて、Java で 書き直しなさい。
[SunRPC] [SunRPCデータ型] [SunRPC練習問題] [NFS] [Pthread] [Pthread同期]
↑[もどる] ←[1月19日] ・[1月26日] →[2月2日]
Last updated: 1999/02/09 01:18:08
Yasushi Shinjo / <yas@is.tsukuba.ac.jp>