一. 多线程要考虑的问题

前人总结出,一个线程安全的class应当满足的条件:

  1. 从多个线程访问时,其表现出正确的行为,无论操作系统如何调度这些线程,无论这些线程的执行顺序如何交织。

  2. 调用端代码无需额外的同步或其他协调动作

在写多线程程序时脑子里要有这样的意识,下面我总结了几条比较具体的注意事项。 使用多线程要考虑的问题:

  1. 线程访问资源安全(线程同步问题),这个问题,可用Mutex封装临界区,我的另一篇blog里有介绍.

  2. 线程退出安全(主线程结束前线程安全退出),可在主线程结束时,在对象析构里等待线程退出再析构.

  3. 跨线程使用的对象的生命周期的控制,要保证对象(指针)不再被使用时才析构。为了安全,少费精力,还是用shared_ptr代替原始指针吧。shared_ptr是boost库的智能指针,现在已加入标准库中,用法不说了,说一下线程中使用注意事项吧。

shared_ptr的线程安全:

shared_ptr的线程安全级别与内建类型一样,多个线程同时对一个shared_ptr进行读操作时是安全的,多个线程同时对多个shared_ptr进行写操作时是安全的,其余都是未定义的。

shared_ptr作为函数参数:

因为拷贝shared_ptr是要修改引用计数的,所以拷贝shared_ptr比拷贝原始指针成本要高,多数情况下可以以reference to const方式传递,只需要在最外层的函数有个实体对象,以后都可以用reference to const来使用这个shared_ptr。

二. 一个封装好的线程类

下面我写了个线程封装类(windows平台),线程时使用它能少写一些代码~ 源码呈上:

#ifndef HBASE_HTHREAD_H__  
#define HBASE_HTHREAD_H__  
  
#include <Windows.h>  
  
#ifdef Xxx_CALLBACK  
#include <Core/Core.h>  
#endif  
  
/** 在类A中使用HThread类: 
 *  将HThread类的对象作为类A的成员.  
 *  Start函数,启动一个线程  
 *  WaitExit函数,在A的析构里调用,保证在主线程退出前线程退出.   
 *  注意:一个HThread类同时最多只能运行一个线程。 
 *  如果使用Callback封装unsigned (__stdcall * _start_address) (void *)函数,使用更灵活. 
 */  
  
class HThread  
{  
public:  
  HThread();  
  ~HThread();  
#ifdef Xxx_CALLBACK  
  bool        Start(Xxx::Callback _cb);  
#endif  
  bool        Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list);  
  int         WaitExit();  
  
  bool        IsRunning();  
  void        Detach();  
  bool        IsOpen() const     { return (NULL != handle_); }  
  HANDLE      GetHandle() const  { return handle_; }  
  
protected:  
#ifdef Xxx_CALLBACK  
  static unsigned __stdcall ThreadRuntime(void* p);  
#endif  
  
private:  
  HThread(const HThread&);  
  HThread& operator=(const HThread&);  
  
private:  
  HANDLE      handle_;  
};  
  
  
#endif  
#include <process.h>  
#include "hthread.h"  
  
HThread::HThread() : handle_(NULL)  
{}  
  
HThread::~HThread()  
{  
  Detach();   // Wait() ?  
}  
  
#ifdef Xxx_CALLBACK  
unsigned __stdcall HThread::ThreadRuntime(void* p)  
{  
  Xxx::Callback *cb = (Xxx::Callback *)p;  
  (*cb)();  
  delete cb;  
  return 0;  
}  
  
bool HThread::Start(Xxx::Callback _cb)  
{  
  Detach();  
  Xxx::Callback *cb = new Xxx::Callback(_cb);  
  handle_ = (HANDLE)_beginthreadex(0, 0, &HThread::ThreadRuntime, cb, 0, NULL);  
  return (0 != handle_);  
}  
#endif  
  
bool HThread::Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list)  
{  
  Detach();  
  handle_ = (HANDLE)_beginthreadex(0, 0, _start_address, _arg_list, 0, NULL);  
  return (0 != handle_);  
}  
  
int HThread::WaitExit()  
{  
  if(!IsOpen())  
    return -1;  
  int out;  
  DWORD exit;  
  if(!GetExitCodeThread(handle_, &exit))  
    return -1;  
  if(exit != STILL_ACTIVE)  
    out = (int)exit;  
  else  
  {  
    if(WaitForSingleObject(handle_, INFINITE) != WAIT_OBJECT_0)  
      return 0;  
    out = GetExitCodeThread(handle_, &exit) ? int(exit) : int(0);  
  }  
  Detach();  
  return out;  
}  
  
bool HThread::IsRunning()  
{  
  if(!IsOpen())  
    return false;  
  
  DWORD exit;  
  if(!GetExitCodeThread(handle_, &exit))  
    return true;  
  
  return (exit == STILL_ACTIVE)? true : false;  
}  
  
void HThread::Detach()  
{  
  if(handle_) {  
    CloseHandle(handle_);  
    handle_ = 0;  
  }  
}  

三. 根据任务类型封装的线程类

在用线程处理问题时常常分两种情况:

A. 这个线程只做一件事,或循环做一件事情,做完这件事情之后就不再用它了,此时就要结束这个线程,称这种线程为单任务型。

B. 启动一个线程后,有个事情A需要在线程里完成,此时把任务A交给线程,没过多久又有个任务B要做,再把B交给这个线程去完成,也就是说这个线程能完成各种类型的任务,这种类型的线程称为多任务型,如果管理多个线程,就成线程池了。

单任务型适合用于耗时的重复性操作,比如上传1000张图片,这种线程为该任务而生,任务结束线程也就结束了。

多任务型线程长期活着,接各种耗时短的杂活,有时候会下载个文件,有时候数据库插入个图片。

说了这么多,看代码吧~

#ifndef MATERIAL_THREAD_H__  
#define MATERIAL_THREAD_H__  
  
#include <Windows.h>  
#include <Core/Core.h>  
#include <deque>  
  
class TaskThreadBase  
{  
public:  
  TaskThreadBase();  
  virtual ~TaskThreadBase();  
  
  virtual int WaitExit();  
  
  bool        IsRunning();  
  void        Detach();  
  bool        IsOpen() const     { return (NULL != handle_); }  
  HANDLE      GetHandle() const  { return handle_; }  
  
private:  
  TaskThreadBase(const TaskThreadBase&);  
  TaskThreadBase& operator=(const TaskThreadBase&);  
  
protected:  
  HANDLE      handle_;  
  bool        running_;  
};  
  
class TaskThread : public TaskThreadBase  
{  
public:  
  TaskThread();  
  ~TaskThread();  
  bool        Start(Upp::Callback _cb);  
  bool        Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list);  
  
protected:  
  static unsigned __stdcall ThreadRuntime(void* p);  
};  
  
class MutiTaskThread : public TaskThreadBase  
{  
private:  
  struct Task  
  {  
    Task() {}  
    Task(Upp::Callback cb) : cb(cb) {}  
    Upp::Callback cb;      
  };  
  
public:  
  MutiTaskThread();  
  ~MutiTaskThread();  
  
  void SetMaxTaskCount(size_t task_count);  
  bool Start();  
  bool AddTask(Upp::Callback cb, bool priority = false);  
    
  virtual int WaitExit();  
    
private:  
  static unsigned __stdcall ThreadRuntime(void* p);  
  void Run();  
  void DoTask(const Task& task);  
  void RequestStop();  
  
private:  
  std::deque<Task>  tasks_;  
  size_t            max_task_count_;  
  
  Upp::Mutex        mutex_;  
  Upp::Semaphore    semaphore_;  
  bool              quit_;  
};  
  
  
#endif  
#include <process.h>  
#include "materialthread.h"  
  
TaskThreadBase::TaskThreadBase() : handle_(NULL)  
{}  
  
TaskThreadBase::~TaskThreadBase()  
{  
  Detach();   // Wait() ?  
}  
  
int TaskThreadBase::WaitExit()  
{  
  if(!IsOpen())  
    return -1;  
  int out;  
  DWORD exit;  
  if(!GetExitCodeThread(handle_, &exit))  
    return -1;  
  if(exit != STILL_ACTIVE)  
    out = (int)exit;  
  else  
  {  
    if(WaitForSingleObject(handle_, INFINITE) != WAIT_OBJECT_0)  
      return 0;  
    out = GetExitCodeThread(handle_, &exit) ? int(exit) : int(0);  
  }  
  Detach();  
  return out;  
}  
  
bool TaskThreadBase::IsRunning()  
{  
  if(!IsOpen())  
    return false;  
  
  DWORD exit;  
  if(!GetExitCodeThread(handle_, &exit))  
    return true;  
  
  return (exit == STILL_ACTIVE)? true : false;  
}  
  
void TaskThreadBase::Detach()  
{  
  if(handle_) {  
    CloseHandle(handle_);  
    handle_ = 0;  
  }  
}  
  
TaskThread::TaskThread()  
{  
}  
  
TaskThread::~TaskThread()  
{  
}  
  
unsigned __stdcall TaskThread::ThreadRuntime(void* p)  
{  
  Upp::Callback *cb = (Upp::Callback *)p;  
  (*cb)();  
  delete cb;  
  return 0;  
}  
  
bool TaskThread::Start(Upp::Callback _cb)  
{  
  Detach();  
  Upp::Callback *cb = new Upp::Callback(_cb);  
  handle_ = (HANDLE)_beginthreadex(0, 0, &TaskThread::ThreadRuntime, cb, 0, NULL);  
  return (0 != handle_);  
}  
  
bool TaskThread::Start(unsigned (__stdcall * _start_address) (void *), void * _arg_list)  
{  
  Detach();  
  handle_ = (HANDLE)_beginthreadex(0, 0, _start_address, _arg_list, 0, NULL);  
  return (0 != handle_);  
}  
  
MutiTaskThread::MutiTaskThread() : max_task_count_(1), quit_(false)  
{  
}  
  
MutiTaskThread::~MutiTaskThread()  
{  
}  
  
void MutiTaskThread::SetMaxTaskCount(size_t task_count)  
{  
  max_task_count_ = task_count;  
}  
  
bool MutiTaskThread::Start()  
{  
  if (running_) return false;  
  running_ = true;  
  handle_ = (HANDLE)_beginthreadex(0, 0, ThreadRuntime, this, 0, NULL);  
  return (0 != handle_);  
}  
  
unsigned __stdcall MutiTaskThread::ThreadRuntime(void* p)  
{  
  MutiTaskThread* self = (MutiTaskThread*)p;  
  self ->Run();  
  return 0;  
}  
  
void MutiTaskThread::RequestStop()  
{  
  quit_ = true;  
  semaphore_.Release();  
}  
  
void MutiTaskThread::Run()  
{  
  while(true)  
  {  
    Task cur_task;  
    bool empty = false;      
    {  
      Upp::Mutex::Lock lock(mutex_);  
      if (!tasks_.empty())  
        cur_task = tasks_.front();          
      else  
        empty = true;  
    }  
  
    if (quit_) break;  
  
    if (!empty) {  
      DoTask(cur_task);  
      {  
        Upp::Mutex::Lock lock(mutex_);  
        if (!tasks_.empty())  
          tasks_.pop_front();  
      }  
    }  
    else  
      semaphore_.Wait();  
  }  
  running_ = false;  
}  
  
bool MutiTaskThread::AddTask(Upp::Callback cb, bool priority /*= false*/)  
{  
  bool ret = false;  
  Upp::Mutex::Lock lock(mutex_);  
  Task task(cb);  
  size_t n = tasks_.size();  
  if (n < max_task_count_) {  
    if (priority)  
      tasks_.push_front(task);  
    else  
      tasks_.push_back(task);  
    ret = true;  
    if (0 == n)  
      semaphore_.Release();  
  }  
  
  return ret;  
}  
  
void MutiTaskThread::DoTask(const Task& task)  
{  
  task.cb();  
}  
  
int MutiTaskThread::WaitExit()  
{  
  RequestStop();  
  return TaskThreadBase::WaitExit();  
}  

(完)

作者 侯振永
写于2012 年 9月 24日