Skip to content

线程池

简单实现一个线程池

#include <signal.h>
#include <unistd.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Process-wide run flag. It starts out true and is cleared by signal_handler().
std::atomic<bool> is_running{true};

/// Signal handler that requests shutdown by clearing is_running.
/// @param sig  number of the delivered signal (not inspected).
void signal_handler(int sig)
{
    static_cast<void>(sig);
    is_running.store(false);
}

/**
 * @brief One worker thread that executes tasks from its own FIFO queue.
 *
 * Lifecycle: init() starts the worker, addTask() queues work and deinit()
 * stops the worker, discarding whatever is still queued. The destructor calls
 * deinit(), so a unit that is still running can be destroyed safely.
 *
 * addTask() and getTaskNum() take the internal mutex and may be called from
 * other threads while the worker runs. init() and deinit() are not
 * synchronized against each other and should be driven from a single thread.
 */
class ThreadUnit
{
public:
    /// Callable executed by the worker; receives Task::arg.
    using Fun = std::function<void(void *arg)>;

    /// A unit of work: the callable plus the opaque argument passed to it.
    struct Task
    {
        Fun fun;
        void *arg;
    };

    ThreadUnit() {}

    /// Stops and joins the worker; destroying a joinable std::thread would
    /// otherwise call std::terminate.
    ~ThreadUnit() { deinit(); }

    ThreadUnit(const ThreadUnit &) = delete;
    ThreadUnit &operator=(const ThreadUnit &) = delete;

    /**
     * @brief Start the worker thread.
     * @return always 0. Calling init() on a unit that is already running is a
     *         no-op (assigning to a joinable std::thread would terminate).
     */
    int init()
    {
        if (m_thread.joinable()) {
            return 0;
        }
        thread_is_running = true;
        m_thread = std::thread(&ThreadUnit::loop, this);
        return 0;
    }

    /**
     * @brief Stop the worker, drop all queued tasks and join the thread.
     *
     * A task that is already executing is allowed to finish first. Safe to
     * call repeatedly and on a unit that was never initialized.
     * @return always 0.
     */
    int deinit()
    {
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            // The flag is changed under the mutex so the worker cannot miss
            // it between evaluating its wait predicate and going to sleep.
            thread_is_running = false;
            while (!m_task.empty()) {
                m_task.pop();
            }
        }
        m_cond.notify_all();

        if (m_thread.joinable()) {
            m_thread.join();
        }
        return 0;
    }

    /**
     * @brief Append a task to the queue and wake the worker.
     * @param task work item; a task whose callable is empty is rejected with
     *             a message on stderr instead of being queued.
     */
    void addTask(Task task)
    {
        if (!task.fun) {
            std::cerr << "ThreadUnit::addTask: empty task ignored" << std::endl;
            return;
        }
        {
            std::unique_lock<std::mutex> lock(m_mutex);
            m_task.push(std::move(task));
        }
        // There is exactly one consumer (the worker), so one wake-up suffices.
        m_cond.notify_one();
    }

    /**
     * @brief Worker body: pop tasks in FIFO order and run them until stopped.
     *
     * The mutex is held while the queue is inspected and released while a
     * task executes, so producers are never blocked by a running task.
     */
    void loop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        while (thread_is_running) {
            std::cout << "wait......" << std::endl;
            m_cond.wait(lock, [this] {
                return !m_task.empty() || !thread_is_running;
            });
            std::cout << "m_task size = " << m_task.size() << std::endl;
            if (!thread_is_running) {
                break;
            }
            Task task = std::move(m_task.front());
            m_task.pop();
            m_busy = true;
            lock.unlock();

            // An exception leaving the thread function would terminate the
            // whole process, so a failing task is reported and skipped.
            try {
                task.fun(task.arg);
            } catch (...) {
                std::cerr << "ThreadUnit: task threw an exception, ignored"
                          << std::endl;
            }

            lock.lock();
            m_busy = false;
        }
    }

    /**
     * @brief Current load of this unit.
     * @return number of queued tasks plus one if a task is executing right
     *         now, so that a busy worker with an empty queue is not mistaken
     *         for an idle one.
     */
    int getTaskNum() const
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return static_cast<int>(m_task.size()) + (m_busy ? 1 : 0);
    }

private:
    std::atomic<bool> thread_is_running{false}; // false => worker must exit
    mutable std::mutex m_mutex;                 // guards m_task and m_busy
    std::condition_variable m_cond;             // signalled on new task / stop
    std::queue<Task> m_task;                    // pending tasks, FIFO
    bool m_busy{false};                         // true while a task executes
    std::thread m_thread;
};

class ThreadPool
{
public:
    ThreadPool() {}
    ~ThreadPool() {}

    int init(int num)
    {
        if (num < 1) {
            std::cout << "thread num must >= 1" << std::endl;
            return -1;
        }
        m_thread_num = num;
        thread_is_running = true;
        for (int i = 0; i < m_thread_num; i++) {
            m_thread_unit.push_back(new ThreadUnit);
            m_thread_unit[i]->init();
        }
        return 0;
    }
    int deinit()
    {
        thread_is_running = false;

        // auto it = m_thread_unit.begin();
        // for ( ; it != m_thread_unit.end(); it++) {
        //     (*it)->deinit();
        //     delete *it;
        // }
        for (auto i : m_thread_unit) {
            i->deinit();
            delete i;
        }
        return 0;
    }

    int selectThread()
    {
        int index = 0;
        int min_thread_task = m_thread_unit[0]->getTaskNum();
        for (int i = 0; i < m_thread_unit.size(); i++) {
            if (m_thread_unit[i]->getTaskNum() < min_thread_task) {
                min_thread_task = m_thread_unit[i]->getTaskNum();
                index = i;
            }
        }
        std::cout << "thread[" << index << "] has task[" << min_thread_task
                  << "] is lightly loaded, add task."
                  << std::endl;
        return index;
    }

    void submitTask(ThreadUnit::Task task)
    {
        m_thread_unit[selectThread()]->addTask(task);
    }

private:
    int m_thread_num;
    std::atomic<bool> thread_is_running;
    std::vector<ThreadUnit *> m_thread_unit;
};

/**
 * @brief Demo task: prints its argument as a C string, then sleeps one second.
 * @param arg pointer to a NUL-terminated string; a null pointer is printed
 *            as "(null)".
 */
void task(void *arg)
{
    const char *text = static_cast<const char *>(arg);
    // Streaming a null character pointer is undefined behaviour, so a
    // placeholder is printed instead.
    std::cout << "arg is " << (text != nullptr ? text : "(null)") << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

/**
 * @brief Demo driver: starts a 4-thread pool, submits six tasks, then idles
 *        until SIGINT, SIGQUIT or SIGTERM clears is_running.
 * @return 0 on normal shutdown, 1 if the pool could not be initialized.
 */
int main()
{
    // All three signals request the same orderly shutdown.
    const int shutdown_signals[] = {SIGINT, SIGQUIT, SIGTERM};
    for (int sig : shutdown_signals) {
        signal(sig, signal_handler);
    }

    ThreadPool threadPool;
    if (threadPool.init(4) != 0) {
        std::cout << "thread pool init failed" << std::endl;
        return 1;
    }
    // Give the worker threads a moment to start before submitting work.
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));

    // Task arguments are passed as void*, so they live in writable static
    // storage: no const is cast away and they outlive every worker.
    static char payloads[][4] = {"111", "222", "333", "444", "555", "666"};
    for (auto &payload : payloads) {
        threadPool.submitTask(ThreadUnit::Task{task, payload});
    }

    // Poll the run flag once per second until a signal clears it.
    while (is_running) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }

    threadPool.deinit();

    std::cout << "main exit" << std::endl;
    return 0;
}