#include #include #include #include #include #include #include #include #include class CThreadPool { private: static int mChannel; private: int mRcvid; int mCoid; struct _msg_info mMsgInfo; unsigned char mBuffer[1000]; public: enum msgcode { MSGCODE_HELLO, MSGCODE_GOODBYE, MSGCODE_UNBLOCK, MSGCODE_INVALID=-1 }; CThreadPool(); ~CThreadPool(); static int getChannel() { return mChannel; }; void receiveMsg(void); void replyStatus(int status = EOK); void replyOk(void); void unblock(); int handleMsg(); }; int CThreadPool::mChannel = -1; CThreadPool::CThreadPool() { printf("%s\n", __PRETTY_FUNCTION__ ); if ( -1 == mChannel ) { mChannel = ChannelCreate( 0 ); } mCoid = ConnectAttach( ND_LOCAL_NODE, getpid(), mChannel, _NTO_SIDE_CHANNEL, 0 ); } CThreadPool::~CThreadPool() { printf("%s\n", __PRETTY_FUNCTION__ ); ConnectDetach( mCoid ); } void CThreadPool::receiveMsg() { do { mRcvid = MsgReceive( mChannel, mBuffer, sizeof mBuffer, &mMsgInfo ); } while ( mRcvid == -1 && errno == EINTR ); } void CThreadPool::replyStatus(int status) { MsgError( mRcvid, status ); } void CThreadPool::replyOk() { replyStatus(EOK); } void CThreadPool::unblock() { CThreadPool::msgcode code; MsgSendPulse( mCoid, -1, CThreadPool::MSGCODE_UNBLOCK, -1 ); } int CThreadPool::handleMsg() { CThreadPool::msgcode *code = (CThreadPool::msgcode *)mBuffer; switch( *code ) { case MSGCODE_HELLO: printf("Hello from tid %d!\n", mMsgInfo.tid ); replyOk(); break; case MSGCODE_GOODBYE: printf("Goodbye from tid %d!\n", mMsgInfo.tid ); sleep(1); // long goodbye! :v) replyOk(); break; case MSGCODE_UNBLOCK: break; default: replyStatus(ENOSYS); break; } return 0; } #define THREAD_POOL_PARAM_T CThreadPool #include THREAD_POOL_PARAM_T *block_func( THREAD_POOL_PARAM_T *ctp ) { ctp->receiveMsg(); return ctp; } THREAD_POOL_PARAM_T *context_alloc( THREAD_POOL_HANDLE_T *handle ) { CThreadPool *pool = new CThreadPool(); return pool; } void unblock_func( THREAD_POOL_PARAM_T *ctp ) { ctp->unblock(); } int handler_func( THREAD_POOL_PARAM_T *ctp ) { return ctp->handleMsg(); } void context_free( THREAD_POOL_PARAM_T *ctp ) { delete ctp; } void *worker_thread( void *arg ) { int chid, coid; CThreadPool::msgcode code; int ret; chid = CThreadPool::getChannel(); coid = ConnectAttach( ND_LOCAL_NODE, getpid(), chid, _NTO_SIDE_CHANNEL, 0 ); while(1) { code = CThreadPool::MSGCODE_HELLO; ret = MsgSend( coid, &code, sizeof code, 0, 0 ); printf("code = %d, ret = %d\n", code, ret ); code = CThreadPool::MSGCODE_GOODBYE; ret = MsgSend( coid, &code, sizeof code, 0, 0 ); printf("code = %d, ret = %d\n", code, ret ); code = CThreadPool::MSGCODE_INVALID; ret = MsgSend( coid, &code, sizeof code, 0, 0 ); printf("code = %d, ret = %d\n", code, ret ); delay(100); } return NULL; } int main(int argc, char **argv) { thread_pool_attr_t pool_attr; thread_pool_t *tpp; int id; memset( &pool_attr, 0, sizeof pool_attr ); pool_attr.handle = NULL; pool_attr.context_alloc = context_alloc; pool_attr.block_func = block_func; pool_attr.handler_func = handler_func; pool_attr.context_free = context_free; pool_attr.lo_water = 2; pool_attr.hi_water = 4; pool_attr.increment = 1; pool_attr.maximum = 50; if ((tpp = thread_pool_create( &pool_attr, 0)) == NULL) { fprintf( stderr, "%s: Unable to initialize thread pool.\n", argv[0] ); return EXIT_FAILURE; } thread_pool_start( tpp ); pthread_create( &id, NULL, worker_thread, NULL ); pthread_create( &id, NULL, worker_thread, NULL ); pthread_create( &id, NULL, worker_thread, NULL ); pthread_create( &id, NULL, worker_thread, NULL ); pthread_create( &id, NULL, worker_thread, NULL ); pthread_join( id, NULL ); return 0; }