C++でのスレッドクラスの作成はじめに先日、私の兄弟から、オブジェクト指向のスレッド処理を楽にするC++クラスを簡単に作成する方法はないだろうかという相談を受けました。私はこれまで、さまざまなマルチスレッドライブラリを作成した経験がありますが、いずれもC言語によるものでした。低レベルのプログラミングには私はいつもC言語を使用し、C++はもっぱらGUI開発に使用しています。CodeGuruにはオブジェクト指向でスレッド処理を行うクラスの優れた例が色々と掲載されていますが、彼の要望をすべて満たし、かつ私の好奇心をも満たすクラスは見当たりませんでした。彼が求めていたのは、次のような特徴を備えたスレッドクラスでした。
そこで私は、CThreadという新しいクラスと、併せて使用するサポートクラスをいくつか作成しました。サポートクラスとして用意したのは、CMutexClassクラス、CEventClassクラス、CTaskクラスです。CMutexClassとCEventClassは、リソース管理を行うためのクラスです。CTaskは、均一型の非同期スレッド処理を行うクラスを作成するための基本クラスです。 スレッド処理とは何かプロセスは必ず1つ以上のスレッドを制御しており、各プロセスは一度に少なくとも1つのタスクを実行できます。複数のスレッドを制御しているプロセスのことを、マルチスレッドプロセスといいます。マルチスレッドプロセスでは、同じプロセスの環境内から複数のタスクを非同期で実行できます。 リソース管理――スレッドの同期同じマルチスレッドプロセスに属する各スレッドは、同じリソースを共有します。したがって、データ整合性を確保するために、OSレベルの制御機構が必要です。データ整合性が損なわれるのは、たとえばあるスレッドが変数を変更しようとしているときに別のスレッドがその変数を読み込もうとした場合や、2つのスレッドが同じ変数を同時に変更しようとした場合です。OSには、こうした状況を防ぐためのしくみが用意されています。その名は、相互排他オブジェクト(Mutual Exclusion Object)、略して「ミューテックス(mutex)」です。マルチスレッドアプリケーションでは、プログラムで配置したミューテックスによって、複数のスレッドが単一のリソースに同時にアクセスする事態を防ぎます。 スレッドは、リソースにアクセスする必要が生じると、まずミューテックスを取得します。いずれかのスレッドがミューテックスを取得しているときには、同じミューテックスを取得しようとした他のスレッドはブロックされ、CPU使用率の低い待ち状態に置かれます。データアクセスが完了したスレッドは、対応するミューテックスを解放します。これを受けて、他のスレッドがそのミューテックスを取得できるようになり、対応するデータにアクセスできます。 ミューテックスの実装に問題があると、いわゆる「デッドロック」が発生することがあります。デッドロックとは、1つまたは複数のスレッドが、同じリソースへのアクセスを求めて競合している状態です。デッドロックは、スレッドがミューテックスを2回取得しようとした場合にも発生することがあります。 デッドロックの例
上の例ではデッドロックが発生します。スレッドAは、スレッドBが保持するミューテックス(2)を取得しようとしてブロック状態になり、スレッドBは、スレッドAが保持するミューテックス(1)を取得しようとしてブロック状態になるからです。 UNIXの条件変数も、ミューテックスと同様、スレッドを同期させるためのしくみの1つです。条件変数では、スレッド同士を連係させることが可能です。あるスレッドから別のスレッドに対し、変更が発生したことを通知できます。Windowsでは、その機能はイベントで実現されます。 オペレーティングシステムコール次の表は、CMutexClass、CEventClass、CTask、CThreadの各クラスでスレッド処理の実装に使用した関数の一覧です。
CMutexClassクラスCMutexClassクラスは、システムレベルのミューテックス関数とミューテックス同期オブジェクトをカプセル化するクラスです。ミューテックスの作成は、オブジェクトをインスタンス化するときに行い、非ブロック状態のミューテックスを作成します。 このクラスには、LockとUnlockという2つのメンバ関数があります。Lockは、ミューテックスをロックして呼び出し元のスレッドに割り当てるメンバ関数です。ミューテックスは、呼び出し元のスレッドがUnlockメンバ関数で解放するまでは、そのスレッドにロックされたままとなります。ロック状態のミューテックスのLockメンバ関数を呼び出して取得しようとしたスレッドはブロックされ、CPU使用率が低い待ち状態に置かれます。ブロックしているスレッドがミューテックスを解放するまではその状態が続きます。 CMutexClass::CMutexClass(void) :m_bCreated(TRUE) { #ifdef WINDOWS m_mutex = CreateMutex(NULL,FALSE,NULL); if( !m_mutex ) m_bCreated = FALSE; #else pthread_mutexattr_t mattr; pthread_mutexattr_init( &mattr ); pthread_mutex_init(&m_mutex,&mattr); #endif memset(&m_owner,0,sizeof(ThreadId_t)); } CMutexClass::~CMutexClass(void) { #ifdef WINDOWS WaitForSingleObject(m_mutex,INFINITE); CloseHandle(m_mutex); #else pthread_mutex_lock(&m_mutex); pthread_mutex_unlock(&m_mutex); pthread_mutex_destroy(&m_mutex); #endif } /** * * Lock * the same thread cannot lock the same mutex * more than once * **/ void CMutexClass::Lock() { ThreadId_t id = CThread::ThreadId(); try { if(CThread::ThreadIdsEqual(&m_owner,&id) ) // the mutex is already locked by this thread throw "¥n¥tthe same thread can not acquire a mutex twice!¥n"; #ifdef WINDOWS WaitForSingleObject(m_mutex,INFINITE); #else pthread_mutex_lock(&m_mutex); #endif m_owner = CThread::ThreadId(); } catch( char *psz ) { #ifdef WINDOWS MessageBoxA(NULL,&psz[2],"Fatal exception CMutexClass::Lock", MB_ICONHAND); exit(-1); #else cerr << "Fatal exception CMutexClass::Lock : " << psz; #endif } } /** * * Unlock * releases a mutex. Only the thread that acquires * the mutex can release it. * **/ void CMutexClass::Unlock() { ThreadId_t id = CThread::ThreadId(); try { if( ! CThread::ThreadIdsEqual(&id,&m_owner) ) throw "¥n¥tonly the thread that acquires a mutex can release it!"; memset(&m_owner,0,sizeof(ThreadId_t)); #ifdef WINDOWS ReleaseMutex(m_mutex); #else pthread_mutex_unlock(&m_mutex); #endif } catch ( char *psz) { #ifdef WINDOWS MessageBoxA(NULL,&psz[2],"Fatal exception CMutexClass::Unlock", MB_ICONHAND); exit(-1); #else cerr << "Fatal exception CMutexClass::Unlock : " << psz; #endif } } メンバ関数
例
int g_iStorage = 0; CMutexClass MyMutex; void StoreValue( int *pInt ) { MyMutex.Lock(); //the gate keeper. only one thread //allowed in at a time g_iStorage = *pInt; //protected data, critical code section MyMutex.Unlock(); //unblocks, allowing another thread to //access g_iStorage } CEventClassクラスCEventClassクラスは、Windowsのイベント関数、Windowsのイベントオブジェクト、UNIXの条件変数関数、およびUNIXの条件変数をカプセル化したクラスです。CEventClassクラスに組み込まれている関数は、WindowsのSetEventとCreateEvent、およびUNIXのphtread_cond_init、pthread_cond_destroy、pthread_cond_signal、pthread_cond_waitです。UNIXでは、イベント同期オブジェクトのことを条件変数といいますが、説明を簡略化するために、ここでは条件変数とイベントオブジェクトの両方をイベントオブジェクトと称しています。 #include "Thread.h" #include <iostream> using namespace std; CEventClass::CEventClass(void) :m_bCreated(TRUE) { memset(&m_owner,0,sizeof(ThreadId_t)); #ifdef WINDOWS m_event = CreateEvent(NULL,FALSE,FALSE,NULL); if( !m_event ) { m_bCreated = FALSE; } #else pthread_mutexattr_t mattr; pthread_mutexattr_init(&mattr); pthread_mutex_init(&m_lock,&mattr); pthread_cond_init(&m_ready,NULL); #endif } CEventClass::~CEventClass(void) { #ifdef WINDOWS CloseHandle(m_event); #else pthread_cond_destroy(&m_ready); pthread_mutex_destroy(&m_lock); #endif } /** * * Set * set an event to signaled * **/ void CEventClass::Set() { #ifdef WINDOWS SetEvent(m_event); #else pthread_cond_signal(&m_ready); #endif } /** * * Wait * wait for an event -- wait for an event object * to be set to signaled. Must be paired with a * call to reset within the same thread. * **/ BOOL CEventClass::Wait() { try { ThreadId_t id = CThread::ThreadId(); if( CThread::ThreadIdsEqual(&id,&m_owner) ) { throw "¥n¥tinvalid Wait call, Wait can not be called more than once" "¥n¥twithout a corresponding call to Reset!¥n"; } ThreadId_t zero; memset(&zero,0,sizeof(ThreadId_t)); if( memcmp(&zero,&m_owner,sizeof(ThreadId_t)) != 0 ) { throw "¥n¥tanother thread is already waiting on this event!¥n"; } m_owner = CThread::ThreadId(); #ifdef WINDOWS if( WaitForSingleObject(m_event,INFINITE) != WAIT_OBJECT_0 ) { return FALSE; } #else pthread_mutex_lock(&m_lock); pthread_cond_wait(&m_ready,&m_lock); return TRUE; #endif } catch( char *psz ) { #ifdef WINDOWS MessageBoxA(NULL,&psz[2],"Fatal exception CEventClass::Wait", MB_ICONHAND); exit(-1); #else cerr << "Fatal exception CEventClass::Wait: " << psz; #endif } return TRUE; } /** * * Reset * reset an event flag to unsignaled * wait must be paired with reset within the * same thread. * **/ void CEventClass::Reset() { try { ThreadId_t id = CThread::ThreadId(); if( !CThread::ThreadIdsEqual(&id,&m_owner) ) { throw "¥n¥tunbalanced call to Reset, Reset must be called from¥n" "¥n¥tthe same Wait-Reset pair!¥n"; } memset(&m_owner,0,sizeof(ThreadId_t)); #ifndef WINDOWS pthread_mutex_unlock(&m_lock); #endif } catch( char *psz ) { #ifdef WINDOWS MessageBoxA(NULL,&psz[2],"Fatal exception CEventClass::Reset", MB_ICONHAND); exit(-1); #else cerr << "Fatal exception CEventClass::Reset: " << psz; #endif } } メンバ関数
待機側スレッドでのイベントオブジェクトの使用例
CEventClass event; . . //thread code . . while(bContinueRunning) { event.Wait(); // wait for an event to occur // perform some task . . event.Reset(); // reset the event to un-signaled } . . 他のスレッドへの通知でのイベントオブジェクトの使用例
CEventClass event; . . // change some data . . event.Set(); // notify thread that an event has occurred, // set event to signaled . . CTaskクラスと非細分型スレッドこれまで私が目にしたスレッドプログラミングの例の多くは、スレッド処理用のデータをグローバル変数に格納し、ミューテックスで保護するという形でした。データを操作する処理はスレッド関数に組み込まれています。私は、そのような形のスレッド処理を、「細分型非同期スレッド処理(SAT:Specialized Asynchronous Threading)」と呼んでいます。 しかし実際には、データとその処理機能は同じオブジェクトにカプセル化されているのが理想です。私は、そのような形のスレッド処理を、「均一型非同期スレッド処理(HAT:Homogeneous Asynchronous Threading)」と呼んでいます。HATでは、各スレッドは細分化されていません。たとえば、HATのソリューションでは、印刷スレッド、I/Oスレッド、という分類はありません。代わりに、単一のスレッドで両方の種類のタスクに対応できます。タスクは、すべてを完備したオブジェクトとして実装されている(つまり、データ自体とその処理に必要な機能の両方がひとまとまりになっている)からです。CTaskクラスは、HATベースのスレッド処理を扱いやすくするための基本クラスです。 typedef enum { TaskStatusNotSubmitted, TaskStatusWaitingOnQueue, TaskStatusBeingProcessed, TaskStatusCompleted } TaskStatus_t; class CTask { private: CMutexClass m_mutex; TaskStatus_t m_state; ThreadId_t m_dwThread; public: void SetTaskStatus(TaskStatus_t state) { m_mutex.Lock(); m_state=state; m_mutex.Unlock(); } void SetId(ThreadId_t *pid) { memcpy(&m_dwThread,pid,sizeof(ThreadId_t)); } /** * * Wait * waits for up to timeoutSeconds for a task * to complete * **/ BOOL Wait(int timeoutSeconds) { timeoutSeconds = timeoutSeconds * 1000; if( Status() != TaskStatusCompleted && timeoutSeconds > 0 ) { Sleep(100); timeoutSeconds = timeoutSeconds - 100; } if( Status() == TaskStatusCompleted ) return TRUE; return FALSE; } /** * * Where * returns current state of a task * **/ TaskStatus_t Status() { TaskStatus_t state ; m_mutex.Lock(); state = m_state; m_mutex.Unlock(); return state; } void Thread(ThreadId_t *pId) { memcpy(pId,&m_dwThread,sizeof(ThreadId_t)); } CTask(){m_state=TaskStatusNotSubmitted; memset(&m_dwThread,sizeof(ThreadId_t),0); } ~CTask(){} virtual BOOL Task()=0; }; メンバ関数
CThreadクラスをまだ定義していませんが、CThreadオブジェクトとCTaskオブジェクトがどのように連係するかは、定義がなくても理解できます。これら2つのオブジェクト型の連係の概要を次に示します。 CTaskオブジェクトの処理手順
メインのCThreadクラスメンバ関数
ここまでサポートクラスについて見てきました。次はいよいよ、主役となるCThreadクラスの出番です。CThreadクラスは2種類のスレッドをサポートしています。1つはイベントドリブンのスレッド、もう1つはインターバル(間隔)ドリブンのスレッドです。イベントドリブンのスレッドとは、イベントオブジェクトに基づいてブロックされ、待ち状態が続くスレッドです。イベントオブジェクトの状態が非シグナル状態からシグナル状態に変わるまでは待ち状態が続きます。新しいイベントが発生するのは、別のスレッドがCThreadオブジェクトのキューにタスクを格納し、そのオブジェクトのスレッドに通知した(イベントオブジェクトをシグナル状態に設定した)ときです。シグナル状態になると、スレッドが起き、イベントキューからタスクをポップします。キューが空になるまでこれを続けます。 CThreadオブジェクトはタスクごとにOnTaskメンバ関数を呼び出します。タスクは先着順(FCFS:First Come First Serve)で処理されます。つまり、CThreadオブジェクトのキューに最初に格納されたタスクがまず処理され、次に2番目のタスクが処理され、という形です。キューへのアクセスはミューテックスオブジェクトによって同期されます。このため、スレッドが前のイベントを処理している最中に、別のイベントをキューに配置できます。キューが空になると、スレッドはイベントオブジェクトを非シグナル状態にリセットし、イベントオブジェクトの待ち状態に戻ります。CThreadクラスは2種類のイベントドリブンスレッドをサポートしています。細分型スレッドと非細分型スレッドです。詳細についてはCTaskを参照してください。 細分型のスレッドを実装するには、CThreadクラスから新しいクラスを派生する必要があります。派生クラスでは、そのオブジェクトのデータ型を処理するようにOnTaskの実装を再定義します。 例
#include "Thread.h" class CIncrementThread : public CThread { public: int counter; virtual BOOL OnTask( LPVOID lpv ) { ThreadId_t id; GetId(&id); if( lpv ) { int *pInt = (int *)lpv; //don't use cout here, output could be broken up due to //threading printf("¥tthread(%ld, counter+%d=%d, counter incremented¥n", id,*pInt,(counter+=*pInt)); } return TRUE; } virtual BOOL OnTask() { ThreadId_t id; GetId(&id); //don't use cout here, output could be broken up due to //threading m_mutex.Lock(); // protect the counter variable printf("¥tthread(%ld, counter++= %d, counter incremented)¥n", id,(++counter)); m_mutex.Unlock(); return TRUE; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } void Reset() { m_mutex.Lock(); counter = 0; m_mutex.Unlock(); } CIncrementThread(){counter=0;} ~CIncrementThread(){} }; int main( int argc, char *argv[]) { // object allocated and thread started CIncrementThread MyThread; int two=2; while( MyThread.GetValue() < 20 ) { MyThread.Event(); // increment value by one Sleep(100); // pauses the root thread for 100 // milliseconds } MyThread.Reset(); while( MyThread.GetValue() < 40 ) { MyThread.Event(&two); Sleep(100); } } 出力
thread(5220, counter++= 1, counter incremented)
thread(5220, counter++= 2, counter incremented)
thread(5220, counter++= 3, counter incremented)
thread(5220, counter++= 4, counter incremented)
thread(5220, counter++= 5, counter incremented)
thread(5220, counter++= 6, counter incremented)
thread(5220, counter++= 7, counter incremented)
thread(5220, counter++= 8, counter incremented)
thread(5220, counter++= 9, counter incremented)
thread(5220, counter++= 10, counter incremented)
thread(5220, counter++= 11, counter incremented)
thread(5220, counter++= 12, counter incremented)
thread(5220, counter++= 13, counter incremented)
thread(5220, counter++= 14, counter incremented)
thread(5220, counter++= 15, counter incremented)
thread(5220, counter++= 16, counter incremented)
thread(5220, counter++= 17, counter incremented)
thread(5220, counter++= 18, counter incremented)
thread(5220, counter++= 19, counter incremented)
thread(5220, counter++= 20, counter incremented)
thread(5220, counter+2=2, counter incremented
thread(5220, counter+2=4, counter incremented
thread(5220, counter+2=6, counter incremented
thread(5220, counter+2=8, counter incremented
thread(5220, counter+2=10, counter incremented
thread(5220, counter+2=12, counter incremented
thread(5220, counter+2=14, counter incremented
thread(5220, counter+2=16, counter incremented
thread(5220, counter+2=18, counter incremented
thread(5220, counter+2=20, counter incremented
thread(5220, counter+2=22, counter incremented
thread(5220, counter+2=24, counter incremented
thread(5220, counter+2=26, counter incremented
thread(5220, counter+2=28, counter incremented
thread(5220, counter+2=30, counter incremented
thread(5220, counter+2=32, counter incremented
thread(5220, counter+2=34, counter incremented
thread(5220, counter+2=36, counter incremented
thread(5220, counter+2=38, counter incremented
thread(5220, counter+2=40, counter incremented
上の例では、CThreadクラスからCIncrementThreadクラスを派生しています。クラス定義で、仮想メンバ関数OnTask()とOnTask(LPVOID)を再定義しています。OnTask()の実装では、オブジェクトのカウンタ変数に1を加えます。もう一方のOnTask(LPVOID)では、整数値へのポインタを引数で受け取り、そのポインタの値をカウンタのメンバ変数に加えます。この例は、スレッドが処理できる2種類のイベントに対応しています。カウンタ変数は複数のスレッドからアクセスされる可能性があるので、CThread::m_mutexオブジェクトを使用して、単一のスレッドからのみアクセスされるようにしています。 均一型非同期スレッド処理(HAT:Homogeneous Asynchronous Threading)のスレッドの実装では、CThreadクラスとCTaskクラスの両方を使用します。 例
#include "Thread.h" class CTaskIncrementer: public CTask { private: int counter; int incr; public: void SetIncr(int iValue) { m_mutex.Lock(); incr = iValue; m_mutex.Unlock(); } int GetIncrementValue() { int incrValue; m_mutex.Lock(); incrValue=incr; m_mutex.Unlock(); return incrValue; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } BOOL Task() { ThreadId_t id; Thread(&id); m_mutex.Lock(); printf("¥tthread(%ld, counter+%d=%d, counter incremented¥n", id,incr,(counter+=incr)); m_mutex.Unlock(); return TRUE; } CTaskIncrementer(){counter=0;} ~CTaskIncrementer(){} }; int main(int argc, char *argv[]) { CTaskIncrementer incr; CThread thr; incr.SetIncr(2); while( incr.GetValue() < 40 ) thr.Event(&incr); } 出力
thread(5700, counter+2=2, counter incremented
thread(5700, counter+2=4, counter incremented
thread(5700, counter+2=6, counter incremented
thread(5700, counter+2=8, counter incremented
thread(5700, counter+2=10, counter incremented
thread(5700, counter+2=12, counter incremented
thread(5700, counter+2=14, counter incremented
thread(5700, counter+2=16, counter incremented
thread(5700, counter+2=18, counter incremented
thread(5700, counter+2=20, counter incremented
thread(5700, counter+2=22, counter incremented
thread(5700, counter+2=24, counter incremented
thread(5700, counter+2=26, counter incremented
thread(5700, counter+2=28, counter incremented
thread(5700, counter+2=30, counter incremented
thread(5700, counter+2=32, counter incremented
thread(5700, counter+2=34, counter incremented
thread(5700, counter+2=36, counter incremented
thread(5700, counter+2=38, counter incremented
thread(5700, counter+2=40, counter incremented
インターバル(間隔)ドリブンのスレッドは、定められた間隔ごとに起き、環境が変化しているかを確認して、その変化に対応します。そして、次の間隔までスリープしてから、同じことを繰り返す、というスレッドです。インターバルドリブンのスレッドを実装するには、CThreadクラスを派生して、OnTask(LPVOID)を再定義します。スレッドのインスタンスを作成したら、SetThreadTypeメンバ関数を呼び出し、そのパラメータとして、ThreadTypeIntervalDrivenと、ミリ秒単位の間隔を指定します。 例
#include "Thread.h" class CIncrementThread : public CThread { public: int counter; virtual BOOL OnTask() { ThreadId_t id; GetId(&id); //don't use cout here, output could be broken up due to //threading m_mutex.Lock(); // protect the counter variable printf("¥tthread(%ld, counter++= %d, counter incremented)¥n", id,(++counter)); m_mutex.Unlock(); return TRUE; } int GetValue() { int counterValue = 0; m_mutex.Lock(); // protect the counter variable counterValue = counter; m_mutex.Unlock(); return counter; } void Reset() { m_mutex.Lock(); counter = 0; m_mutex.Unlock(); } CIncrementThread(){counter=0;} ~CIncrementThread(){} }; int main( int argc, char *argv[] ) { CIncrementThread thr; thr->SetThreadType(ThreadTypeIntervalDriven,100); Sleep(500); } 出力
thread(6104, counter++= 12, counter incremented)
thread(6104, counter++= 13, counter incremented)
thread(6104, counter++= 14, counter incremented)
thread(6104, counter++= 15, counter incremented)
thread(6104, counter++= 16, counter incremented)
まとめ以上で、スレッドオブジェクトは完成です。Linuxでテストしてみたところ、各クラスはきちんと動作しました。SunOSや他のUNIXプラットフォームにも対応しているはずですが、そちらはまだテストしていません。Windowsでコンパイルするときには、コード生成時に/Mtまたは/Mtdを必ず指定してください。このアプリケーションはマルチスレッドアプリケーションであるという意味の指定です。Linuxの場合は、次のようなメイクファイルでOKです。
CC=g++
LIBS=-lpthread -lrt
CFLAGS=-DSUNOS -DNANO_SECOND_SLEEP
OBJS=Thread.cpp EventClass.cpp MutexClass.cpp main.cpp
EXECS = thread
all: $(EXECS)
thread: $(OBJS)
$(CC) $(CFLAGS) -o thread $(OBJS) $(LIBS)
clean:; rm -f *.o $(EXECS)
著者紹介Walter Capers(Walter Capers)
Compuware社のソフトウェアアーキテクトで、ソフトウェアセキュリティを専門としている。言語はC/C++、FORTRAN、COBOL、Java。ライブラリはOpenGL、MFC、WIN32。経験のあるプラットフォームはAIX、HP-UX、SunOS、Open VMS、OSF、AS400、AIX、SGI、Linux、Windows CE、Windows。3Dグラフィックスプログラミング、TCP/IP開発、スレッド処理、クロスプラットフォーム開発、暗号化とセキュア通信、システムレベルプログラミングに精通している。PC界にOpenGLがまだ登場していなかった当時、ハイパフォーマンスな最初期のダブルバッファリング3Dグラフィックスエンジンを開発した経歴を持つ。これは、DOSとWindowsの両方をサポートし、Engineering Technology Associate社のFinite Element Model Builder(FEMB)でリアルタイム3Dグラフィックスに使われた(同社製品の情報についてはwww.eta.comを参照)。米国特許も保持している(http://www.patentstorm.us/patents/6894690.html)。
関連記事 最新トップニュース
|
|