线程池
简单实现一个线程池
#include <signal.h>
#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <functional>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <chrono>
#include <unistd.h>
std::atomic<bool> is_running(true);
void signal_handler(int sig)
{
is_running = false;
}
class ThreadUnit
{
public:
using Fun = std::function<void(void *arg)>;
struct Task
{
Fun fun;
void *arg;
};
ThreadUnit() {}
~ThreadUnit() {}
int init()
{
thread_is_running = true;
// std::function<void(void)> f = std::bind(&ThreadUnit::loop, this);
// m_thread = std::thread(f);
m_thread = std::thread(&ThreadUnit::loop, this);
return 0;
}
int deinit()
{
thread_is_running = false;
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cond.notify_all();
while (!m_task.empty()) {
m_task.pop();
}
}
if (m_thread.joinable()) {
m_thread.join();
}
return 0;
}
void addTask(Task task)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_task.push(task);
m_cond.notify_all();
}
void loop()
{
while (thread_is_running) {
std::unique_lock<std::mutex> lock(m_mutex);
std::cout << "wait......" << std::endl;
while(m_task.empty() && thread_is_running) {
m_cond.wait(lock);
}
std::cout << "m_task size = " << m_task.size() << std::endl;
if (thread_is_running == false) {
break;
}
Task task = m_task.front();
m_task.pop();
lock.unlock();
//do task
// std::cout << "thread id: " << std::this_thread::get_id() << std::endl;
task.fun(task.arg);
lock.lock();
}
}
const int getTaskNum()
{
std::unique_lock<std::mutex> lock(m_mutex);
return (int)m_task.size();
}
private:
std::atomic<bool> thread_is_running;
std::mutex m_mutex;
std::condition_variable m_cond;
std::queue<Task> m_task;
std::thread m_thread;
};
class ThreadPool
{
public:
ThreadPool() {}
~ThreadPool() {}
int init(int num)
{
if (num < 1) {
std::cout << "thread num must >= 1" << std::endl;
return -1;
}
m_thread_num = num;
thread_is_running = true;
for (int i = 0; i < m_thread_num; i++) {
m_thread_unit.push_back(new ThreadUnit);
m_thread_unit[i]->init();
}
return 0;
}
int deinit()
{
thread_is_running = false;
// auto it = m_thread_unit.begin();
// for ( ; it != m_thread_unit.end(); it++) {
// (*it)->deinit();
// delete *it;
// }
for (auto i : m_thread_unit) {
i->deinit();
delete i;
}
return 0;
}
int selectThread()
{
int index = 0;
int min_thread_task = m_thread_unit[0]->getTaskNum();
for (int i = 0; i < m_thread_unit.size(); i++) {
if (m_thread_unit[i]->getTaskNum() < min_thread_task) {
min_thread_task = m_thread_unit[i]->getTaskNum();
index = i;
}
}
std::cout << "thread[" << index << "] has task[" << min_thread_task
<< "] is lightly loaded, add task."
<< std::endl;
return index;
}
void submitTask(ThreadUnit::Task task)
{
m_thread_unit[selectThread()]->addTask(task);
}
private:
int m_thread_num;
std::atomic<bool> thread_is_running;
std::vector<ThreadUnit *> m_thread_unit;
};
void task(void *arg)
{
std::cout << "arg is " << (char *)arg << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
int main()
{
signal(SIGINT, signal_handler);
signal(SIGQUIT, signal_handler);
signal(SIGTERM, signal_handler);
ThreadPool threadPool;
threadPool.init(4);
//wait thread is start
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
threadPool.submitTask(ThreadUnit::Task{task, (void *)"111"});
threadPool.submitTask(ThreadUnit::Task{task, (void *)"222"});
threadPool.submitTask(ThreadUnit::Task{task, (void *)"333"});
threadPool.submitTask(ThreadUnit::Task{task, (void *)"444"});
threadPool.submitTask(ThreadUnit::Task{task, (void *)"555"});
threadPool.submitTask(ThreadUnit::Task{task, (void *)"666"});
while (is_running) {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
threadPool.deinit();
std::cout << "main exit" << std::endl;
return 0;
}