feat: 添加异步任务功能类 DAsync

DAsync
能够使得 GUI 程序极为方便的支持异步任务,实现非阻塞界面,能够在同一上下文中执行
任务,该特性使得对即有程序的异步化改造变得非常简单容易,可以保持现有的程序逻辑
而实现异步化的特性,且尽可能的减少了线程同步带来的使用不便以及封装了底层实现的
复杂性,对外提供精简好用的接口。

DTimedLoop
提供了两种执行方式,一种是定时多少毫秒后退出,另一种是调用 exit 后退出循环继续往
下执行。并且提供了计时接口对执行时间进行统计,打印,方便程序的调试、性能把控。

Log: 添加功能类,方便 GUI 程序使用异步任务
Change-Id: Ia214d746ab421302e8106c96221249264654401d
This commit is contained in:
Wang Penga 2021-09-09 13:01:04 +08:00
parent 3c671e35be
commit 90bc4ae323
11 changed files with 1632 additions and 5 deletions

4
debian/api.json vendored

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,48 @@
######################################################################
# Automatically generated by qmake (3.1) Thu Aug 19 09:48:31 2021
######################################################################
TEMPLATE = app
TARGET = thread_util
INCLUDEPATH += .
QT+= core widgets testlib
CONFIG += c++11
# The following define makes your compiler warn you if you use any
# feature of Qt which has been marked as deprecated (the exact warnings
# depend on your compiler). Please consult the documentation of the
# deprecated API in order to know how to port your code away from it.
DEFINES += QT_DEPRECATED_WARNINGS
CONFIG(debug, debug|release) {
LIBS += -lgtest -lgmock
QMAKE_CXXFLAGS += -g -Wall -fprofile-arcs -ftest-coverage -fsanitize=address -fsanitize-recover=address -O2
QMAKE_LFLAGS += -g -Wall -fprofile-arcs -ftest-coverage -fsanitize=address -fsanitize-recover=address -O2
QMAKE_CXX += -g -fprofile-arcs -ftest-coverage -fsanitize=address -fsanitize-recover=address -O2
}
LIBS += -pthread
QMAKE_CXXFLAGS += -pthread
#QMAKE_CXXFLAGS_RELEASE += -fvisibility=hidden
#DEFINES += LIBDTKCORE_LIBRARY
# You can also make your code fail to compile if you use deprecated APIs.
# In order to do so, uncomment the following line.
# You can also select to disable deprecated APIs only up to a certain version of Qt.
#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000 # disables all the APIs deprecated before Qt 6.0.0
INCLUDEPATH += $$PWD/../../src
INCLUDEPATH += $$PWD/../../src/base
INCLUDEPATH += $$PWD/../../src/util
# Input
HEADERS += \
$${PWD}/../../src/dtkcore_global.h \
$${PWD}/../../src/util/dasync.h \
$${PWD}/../../src/util/dthreadutils.h
SOURCES += \
$${PWD}/../../src/util/dthreadutils.cpp \
main.cpp

View File

@ -0,0 +1,460 @@
/*
* Copyright (C) 2021 ~ 2021 UnionTech Technology Co., Ltd.
*
* Author: Wang Peng <993381@qq.com>
*
* Maintainer: Wang Peng <wangpenga@uniontech.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <QWidget>
#include <QTimer>
#include <iostream>
#include <QEventLoop>
#include <QApplication>
#include <unistd.h>
#include "util/dasync.h"
#include "util/dthreadutils.h"
#ifdef QT_DEBUG
#include <sanitizer/asan_interface.h>
#endif
DCORE_USE_NAMESPACE
#define XLog() qDebug() << __LINE__ << " "
#define RUN_IN_SUB_THREAD 1
#define THREAD_BEGIN std::thread thread([&]{
#define THREAD_END }); thread.detach();
#if RUN_IN_SUB_THREAD
# define OPT_THREAD_BEGIN THREAD_BEGIN
# define OPT_THREAD_END THREAD_END
#else
# define OPT_THREAD_BEGIN
# define OPT_THREAD_END
#endif
#define TIMED_EXIT(second, loop) QTimer::singleShot(second * 1000, [&]{ loop.exit(); })
struct Configure
{
QObject *o = nullptr;
QWidget *w = nullptr;
} conf;
static Configure *config = &conf;
int main1(int argc, char *argv[]);
int main2(int argc, char *argv[]);
int main3(int argc, char *argv[]);
int main(int argc, char *argv[]) {
QApplication app(argc, argv);
// 将以下所有新建的对象都托管给 w、o
config->w = new QWidget;
config->o = new QObject;
config->w->show();
main1(argc, argv);
main2(argc, argv);
main3(argc, argv);
QTimer::singleShot(1 * 1000, [&]{
config->w->deleteLater();
config->o->deleteLater();
});
QTimer::singleShot(2 * 1000, [&]{
qApp->exit(0);
});
qDebug() << "finished xxxxxxxxxxxxxxxxxxxxxxxxxxxx";
return app.exec();
}
#pragma mark main1 ------------------------------------------------------------
// 这是一个最基础的示例程序
DAsync<int, int> *testTask() {
auto task = new DAsync<int, int>(config->o);
int i = 0;
while (i < 100) {
task->postData(i++);
}
task->post([](int arg) {
Q_ASSERT(!D_THREAD_IN_MAIN());
XLog() << "run in child thread: " << arg;
return arg * 2;
})->then([&](int arg) {
Q_ASSERT(D_THREAD_IN_MAIN());
XLog() << "run in main thread:" << arg;
})->start();
task->postData(i++);
return task;
}
void runTest()
{
OPT_THREAD_BEGIN
auto task = testTask();
// 删除前应该先等待所有的任务执行完或取消未执行的任务
// 主线程中只能用 isFinished 查询状态,用 cancelAll 取消之后的任务队列
// 其它子线程中(非 post、非 then 函数)中可以直接 waitForFinished 然后删除
// 也可以使用 task->setParent 去托管,自动释放
if (!D_THREAD_IN_MAIN()) {
task->waitForFinished(false);
// task->deleteLater();
}
OPT_THREAD_END
}
int main1(int argc, char *argv[]) {
QEventLoop loop;
TIMED_EXIT(3, loop);
XLog() << "in main thread: " << pthread_self();
// DAsync 依赖事件循环,不能被阻塞,比如 thread.join 就不行
// 运行在主线程中和运行在子线程中应该有一样的结果才对
OPT_THREAD_BEGIN
runTest();
OPT_THREAD_END
return loop.exec();
}
#pragma mark main2 ------------------------------------------------------------
int main2(int argc, char *argv[]) {
QEventLoop loop;
TIMED_EXIT(3, loop);
OPT_THREAD_BEGIN
QWidget *w = DThreadUtil::runInMainThread([&](){
QWidget *w = new QWidget(config->w);
w->setBackgroundRole(QPalette::HighlightedText);
w->show();
return w;
});
// w->show();
OPT_THREAD_END
return loop.exec();
}
#pragma mark main3 ------------------------------------------------------------
int test1();
int test2();
int test3();
int test4();
int test5();
int test6();
int test7();
int test8();
int main3(int argc, char *argv[]) {
std::clog << "in main thread: " << pthread_self() << std::endl;
// 示例 1输入输出都是基本类型
test1();
// 示例 2输入基本类型输出复合类型
test2();
// 示例 3输入输出都是复合类型
test3();
// 示例 4输入输出都是自定义类型的指针
test4();
// 示例 5, 异步执行一个输入复合类型、没有输出的一次性任务,执行结束后通知主线程
// 间歇性输入数据,要保证生产者消费者模型的正确性。
test5();
// 示例 6, 异步执行一个没有输入、输出参数的一次性任务,执行结束后通知在主线
test6();
// 示例 7, 异步运行一个没有输入参数的一次性任务,执行后在主线程处理结果
test7();
// 示例 8, 在子线程中异步创建一个 widget 并显示出来:
test8();
// std::thread thread([&]{ test8(); });
// thread.detach();
return 0;
}
int test1() {
QEventLoop loop;
TIMED_EXIT(2, loop);
// 加 static 防止函数执行结束后线程中继续 postData 访问已经释放的栈上变量
static auto task1 = new DAsync<int, int>(config->o);
static int i = 0;
while (i < 100) {
task1->postData(i++);
}
task1->post([](int arg) {
XLog() << "async task: " << arg;
return arg * 2;
})->then([](int arg) {
XLog() << "get result: " << arg;
})->start();
return loop.exec();
}
int test2() {
QEventLoop loop;
static auto task2 = new DAsync<int, QString>(config->o);
static int i = 0;
static bool stopFlag = false;
// TIMED_EXIT(3, loop);
QTimer::singleShot(3 * 1000, [&]{
stopFlag = true;
loop.exit();
});
while(i < 100) {
task2->postData(i++);
}
task2->post([](int arg) -> QString {
XLog() << "async task: " << arg;
return QString::number(arg);
})->then([](QString arg) {
XLog() << "get result: " << arg;
})->start();
THREAD_BEGIN
while (!stopFlag && i < 220) {
XLog() << "post data: " << i;
task2->postData(i++);
usleep(200 * 1000);
}
THREAD_END
// task2->waitForFinished();
// task2->deleteLater();
return loop.exec();
}
int test3() {
QEventLoop loop;
TIMED_EXIT(3, loop);
static auto task3 = new DAsync<QString, QString>(config->o);
static int i = 0;
while (i < 100) {
task3->postData(QString::number(i++));
}
task3->post([](QString arg) -> QString {
XLog() << "async task: " << arg;
return arg;
})->then([](QString arg) {
XLog() << "get result " << arg;
})->start();
// task3->waitForFinished();
// task3->deleteLater();
return loop.exec();
}
int test4() {
QEventLoop loop;
TIMED_EXIT(3, loop);
class Test : public QObject {
public:
Test(int in, QObject *parent = nullptr)
: QObject (parent)
, count (in)
{
}
int count = 0;
};
static auto task4 = new DAsync<Test *, Test*>(config->o);
static int i = 0;
while (i < 100) {
task4->postData(new Test(i++, config->o));
}
task4->post([](Test *arg) -> Test * {
XLog() << "async task: " << arg->count;
return arg;
})->then([](Test *arg) {
XLog() << "get result " << arg->count;
})->start();
return loop.exec();
}
int test5() {
QEventLoop loop;
// TIMED_EXIT(3, loop);
static bool stopFlag = false;
QTimer::singleShot(3 * 1000, [&]{
stopFlag = true;
loop.exit();
});
static auto task5 = new DAsync<QString, void>(config->o);
static int i = 0;
while (i < 100) {
task5->postData(QString::number(i++));
}
task5->post([](QString arg) {
XLog() << "async task." << arg;
})->then([]() {
XLog() << "get void";
})->start();
OPT_THREAD_BEGIN
while (!stopFlag) {
usleep(200 * 1000);
task5->postData(QString::number(i++));
}
OPT_THREAD_END
return loop.exec();
}
int test6() {
QEventLoop loop;
TIMED_EXIT(1, loop);
static auto task6 = new DAsync<void, void>(config->o);
task6->post([]() {
XLog() << "async task.";
})->then([]() {
XLog() << "get result.";
})->start();
// 如果只想在子线程执行一个任务,不需要主线程的任何处理,按照以下方式,
// 其实也只是只设置一个函数就可以了:
// task6->post([]() { XLog() << "async task."; });
// task6->startUp();
return loop.exec();
}
int test7() {
QEventLoop loop;
TIMED_EXIT(1, loop);
static auto task7 = new DAsync<void, QString>(config->o);
static int i = 0;
task7->post([&]() {
XLog() << "async task.";
return QString("%1").arg(i++);
})->then([](QString arg) {
XLog() << "get result " << arg;
})->start();
return loop.exec();
}
int test8() {
QEventLoop loop;
TIMED_EXIT(1, loop);
static auto task8 = new DAsync<void, QString>(config->o);
// 注意,任务是异步执行的,传进去的一定不能是栈区变量!
static int i = 0;
task8->post([&]() -> QString {
Q_ASSERT(!D_THREAD_IN_MAIN());
QWidget *w = DThreadUtil::runInMainThread([](){
Q_ASSERT(D_THREAD_IN_MAIN());
QWidget *w = new QWidget(config->w);
w->setBackgroundRole(QPalette::Text);
w->show();
return w;
});
// 在外面调用并不合适,虽然也能显示出来。比如 mac 上这么用就显示不出来
// w->setBackgroundRole(QPalette::Text);
// w->show();
XLog() << "async task." << QString("%1").arg(i++);
return QString("%1").arg(i++);
})->then([](QString str) {
XLog() << "get result " << str;
})->start();
return loop.exec();
}

View File

@ -1,2 +1,4 @@
TEMPLATE = subdirs
SUBDIRS += expintf-example
SUBDIRS += dasync-example

505
src/util/dasync.h Normal file
View File

@ -0,0 +1,505 @@
/*
* Copyright (C) 2021 ~ 2021 UnionTech Technology Co., Ltd.
*
* Author: Wang Peng <993381@qq.com>
*
* Maintainer: Wang Peng <wangpenga@uniontech.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef DASYNC_H
#define DASYNC_H
#include <dtkcore_global.h>
#include <QQueue>
#include <QMutex>
#include <QThread>
#include <QMutexLocker>
#include <QCoreApplication>
#include <functional>
#include <type_traits>
DCORE_BEGIN_NAMESPACE
#define GUARDED_BY(...)
#define D_THREAD_IN_MAIN() (qApp->instance() && qApp->instance()->thread() == QThread::currentThread())
// TODO: 添加 DtkCorePrivate 到 dtkcore_global.h
namespace DtkCorePrivate
{
// 本类是继承实现的,只有子类方法是安全的,暂不对外提供接口
template<class T>
class DSafeQueue : public QQueue<T> {
public:
inline void enqueue(const T &t) {
QMutexLocker lkc(&m_mtx);
QQueue<T>::enqueue(t);
}
inline T dequeue() {
QMutexLocker lkc(&m_mtx);
return QQueue<T>::dequeue();
}
inline int size() {
QMutexLocker lkc(&m_mtx);
return QQueue<T>::size();
}
inline T &head() {
QMutexLocker lkc(&m_mtx);
return QQueue<T>::head();
}
inline const T &head() const {
QMutexLocker lkc(&m_mtx);
return QQueue<T>::head();
}
private:
QMutex m_mtx;
};
// 内部使用,不对外提供接口
class MainWorker : public QObject {
Q_OBJECT
std::function<void(void *)> m_handle;
std::function<void(void *)> m_handleProxy;
std::function<void(void)> m_handleV;
std::function<void(void)> m_handleVProxy;
bool m_dasyncDestroyed = false;
char __padding[7];
public:
void setDAsyncDestroyed() {
m_dasyncDestroyed = true;
}
bool dasyncDestroyed() {
return m_dasyncDestroyed;
}
public:
MainWorker(QObject *parent = nullptr)
: QObject (parent)
{
// Ensure that QApplication is initialized
Q_ASSERT(qApp->instance() && qApp->instance()->thread());
moveToThread(qApp->instance()->thread());
bool isStartInMain = D_THREAD_IN_MAIN();
QObject::connect(this, &MainWorker::sigRunInMain,
this, &MainWorker::slotRunInMain,
isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
QObject::connect(this, &MainWorker::sigRunInMainVoid,
this, &MainWorker::slotRunInMainVoid,
isStartInMain ? Qt::AutoConnection : Qt::BlockingQueuedConnection);
}
// 1. handle arg is non void
template <typename FUNC, typename ArgType>
typename std::enable_if<!std::is_void<ArgType>::value>::type
setHandle(FUNC &&func) {
m_handle = [&] (void *arg) {
DSafeQueue<ArgType> *q = static_cast<DSafeQueue<ArgType>*>(arg);
while (q && q->size()) {
// 这里是 then 回调真正执行到的地方
func(q->dequeue());
}
};
m_handleProxy = [this] (void *arg) {
if (m_handle) {
m_handle(arg);
}
};
}
// 2. handle arg is void
template <typename FUNC, typename ArgType>
typename std::enable_if<std::is_void<ArgType>::value>::type
setHandle(FUNC &&func) {
m_handleV = [&] (void) {
// 这里是 then 回调真正执行到的地方
func();
};
m_handleVProxy = [this] (void) {
if (m_handleV) {
m_handleV();
}
};
}
Q_SIGNALS:
void sigRunInMain(void *arg);
void sigRunInMainVoid();
public Q_SLOTS:
void slotRunInMain(void *arg) {
Q_ASSERT(D_THREAD_IN_MAIN());
if (m_handleProxy && !m_dasyncDestroyed) {
m_handleProxy(arg);
}
}
void slotRunInMainVoid(void) {
Q_ASSERT(D_THREAD_IN_MAIN());
if (m_handleVProxy && !m_dasyncDestroyed) {
m_handleVProxy();
}
}
};
}
class DAsyncState : public QObject {
Q_OBJECT
public:
explicit DAsyncState(QObject *parent = nullptr) noexcept
: QObject (parent)
{
}
enum AsyncTaskState {
NotReady = 0x00, // initial state
Ready = 0x02, // deffered = false
Running = 0x04, // thread started
Pending = Ready | Running, // condition wait
Cancel = 0x08, // set thread canceled
WaitFinished = 0x10, // wiaitForFinished
Finished = 0x20, // thread exit
Forever = 0x30, // TODO: DAsync<void, xxx>::post execute forever
};
Q_DECLARE_FLAGS(AsyncTaskStatus, AsyncTaskState)
};
// Template classes not supported by Q_OBJECT, so class MainWorker is independent
template <typename DataTypeIn, typename DataTypeOut>
class DAsync : public QObject {
class Helper;
std::mutex m_mtxIn;
std::condition_variable m_cvIn;
std::mutex m_mtxForWaitTask;
std::condition_variable m_cvForWaitTask;
class Guard {
DAsync *m_as;
// 如果 DAsync 已经析构了,工作线程还没结束
// DAsync 中的有些数据就不能在 guard 的析构里面访问了
bool m_dasDestructed = false;
public:
bool destructed() {
return m_dasDestructed;
}
void setDestructed() {
m_dasDestructed = true;
}
public:
explicit Guard(DAsync *as) noexcept : m_as (as)
{
m_as->m_status.setFlag(DAsyncState::Ready);
m_as->m_status.setFlag(DAsyncState::Finished, false); // 防止重入
}
~Guard() {
if (destructed()) {
return;
}
m_as->m_threadGuard = nullptr;
m_as->m_status.setFlag(DAsyncState::Finished);
m_as->m_status.setFlag(DAsyncState::Ready, false); // 防止重入
if (m_as->m_status.testFlag(DAsyncState::WaitFinished)) {
m_as->m_cvForWaitTask.notify_one();
}
setPending(false);
}
void setPending(bool isPending) {
if (!destructed()) {
m_as->m_status.setFlag(DAsyncState::Pending, isPending);
}
}
};
Guard *m_threadGuard = nullptr;
/*
* m_QueueIn PostData
* m_QueueOut post then
* emitHelper post 线
* 使 void * 使 qRegisterMetaType
*/
template<typename T, typename Enable = void>
struct DataQueueType { DtkCorePrivate::DSafeQueue<T> m_queue; };
template<class T>
struct DataQueueType<T, typename std::enable_if<std::is_void<T>::value>::type> { };
using DataInQueue = DataQueueType<DataTypeIn>;
using DataOutQueue = DataQueueType<DataTypeOut>;
// Queue 中处理完的结果经由 m_QueueIn 变量暂存,然后经由 signal、slot 传给 then 中的回调函数做参数
DataInQueue m_QueueIn;
DataOutQueue m_QueueOut;
// 存储不同类型的输入函数
template<typename T1, typename T2, typename Enable1 = void, typename Enable2 = void>
struct FuncType {
};
template<typename T1, typename T2>
struct FuncType<T1, T2,
typename std::enable_if<std::is_void<T1>::value>::type,
typename std::enable_if<std::is_void<T2>::value>::type> {
std::function <void(void)> cbp;
};
template<typename T1, typename T2>
struct FuncType<T1, T2,
typename std::enable_if<!std::is_void<T1>::value>::type,
typename std::enable_if<!std::is_void<T2>::value>::type> {
std::function <T2(T1)> cbp;
};
template<typename T1, typename T2>
struct FuncType<T1, T2,
typename std::enable_if<std::is_void<T1>::value>::type,
typename std::enable_if<!std::is_void<T2>::value>::type> {
std::function <T2(void)> cbp;
};
template<typename T1, typename T2>
struct FuncType<T1, T2,
typename std::enable_if<!std::is_void<T1>::value>::type,
typename std::enable_if<std::is_void<T2>::value>::type> {
std::function <void(T1)> cbp;
};
std::mutex m_mtxFunc;
FuncType<DataTypeIn, DataTypeOut> m_func GUARDED_BY(m_mtxFunc);
DAsyncState::AsyncTaskStatus m_status;
public:
explicit DAsync(QObject *parent = nullptr) noexcept
: QObject (parent)
, m_func ({nullptr})
, m_status (DAsyncState::NotReady)
{
m_mainWorker = new DtkCorePrivate::MainWorker();
m_helper = new Helper(this, this);
}
~DAsync() {
if (m_threadGuard) {
m_threadGuard->setDestructed();
}
m_status.setFlag(DAsyncState::Cancel);
if (m_status.testFlag(DAsyncState::Pending)) {
m_cvIn.notify_one();
}
if (m_mainWorker) {
m_mainWorker->setDAsyncDestroyed();
m_mainWorker->deleteLater();
m_mainWorker = nullptr;
}
}
private:
// 1. input void & emit void
template <typename PostInType, typename EmitInType>
typename std::enable_if<std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type
emitHelper() {
m_func.cbp();
Q_EMIT m_mainWorker->sigRunInMainVoid();
}
// 2. input non void & emit non void
template <typename PostInType, typename EmitInType>
typename std::enable_if<!std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type
emitHelper() {
m_QueueOut.m_queue.enqueue(m_func.cbp(m_QueueIn.m_queue.dequeue()));
Q_EMIT m_mainWorker->sigRunInMain(static_cast<void *>(&(m_QueueOut.m_queue)));
}
// 3. input non void & emit void
template <typename PostInType, typename EmitInType>
typename std::enable_if<!std::is_void<PostInType>::value && std::is_void<EmitInType>::value>::type
emitHelper() {
m_func.cbp(m_QueueIn.m_queue.dequeue());
Q_EMIT m_mainWorker->sigRunInMainVoid();
}
// 4. input void & emit non void
template <typename PostInType, typename EmitInType>
typename std::enable_if<std::is_void<PostInType>::value && !std::is_void<EmitInType>::value>::type
emitHelper() {
m_QueueOut.m_queue.enqueue(m_func.cbp());
Q_EMIT m_mainWorker->sigRunInMain(static_cast<void *>(&(m_QueueOut.m_queue)));
}
public:
void startUp() {
if (m_status.testFlag(DAsyncState::Cancel)) {
return;
}
m_helper->start();
}
void cancelAll() {
m_status.setFlag(DAsyncState::Cancel);
if (m_status.testFlag(DAsyncState::Pending)) {
m_cvIn.notify_one();
}
}
bool isFinished() {
return m_status.testFlag(DAsyncState::Finished);
}
/*
* QTimer 使 waitForFinished线
* 线使 waitForFinished()
* true waitForFinished(false) +
* cancelAll,
* cancelAll waitForFinished
* 退DAsync了
* QObject 线使
* cancelAll + isFinished
* DAsync 线退
*/
void waitForFinished(bool cancelAllWorks = true) {
Q_ASSERT(!D_THREAD_IN_MAIN());
if (cancelAllWorks) {
cancelAll();
}
if (!m_status.testFlag(DAsyncState::Finished)) {
if (m_status.testFlag(DAsyncState::Pending)) {
m_cvIn.notify_one();
}
m_status.setFlag(DAsyncState::WaitFinished);
std::unique_lock <std::mutex> lck(m_mtxForWaitTask);
m_cvForWaitTask.wait(lck);
}
}
// 输入数据不是 void 类型则依赖于 m_QueueIn
template <typename FUNC, typename InputType = DataTypeIn>
typename std::enable_if<!std::is_void<InputType>::value, Helper *>::type
post(FUNC &&func) {
m_func.cbp = std::forward<FUNC>(func);
if (m_postProxy) {
return m_helper;
}
m_postProxy = [this] () {
std::thread thread([this] {
if (m_status.testFlag(DAsyncState::Cancel)) {
return;
}
Guard guard(this);
m_threadGuard = &guard;
std::unique_lock <std::mutex> lck(m_mtxIn);
while (true) {
while (!m_status.testFlag(DAsyncState::Ready) || !m_QueueIn.m_queue.size()) {
guard.setPending(true);
// 定时查询 flag防止睡死的情况发生
m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)) {
return;
}
}
guard.setPending(false);
while (m_func.cbp && m_QueueIn.m_queue.size()) {
emitHelper<DataTypeIn, DataTypeOut>();
}
}
});
thread.detach();
};
return m_helper;
}
template <typename FUNC, typename InputType = DataTypeIn>
typename std::enable_if<std::is_void<InputType>::value, Helper *>::type
post(FUNC &&func) {
{
std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
m_func.cbp = std::forward<FUNC>(func);
}
if (m_postProxy) {
return m_helper;
}
m_postProxy = [this] () {
std::thread thread([this] {
if (m_status.testFlag(DAsyncState::Cancel)) {
return;
}
Guard guard(this);
m_threadGuard = &guard;
std::unique_lock <std::mutex> lck(m_mtxIn);
while (true) {
if (!m_status.testFlag(DAsyncState::Ready)) {
guard.setPending(true);
// 定时查询 flag防止睡死的情况发生
m_cvIn.wait_for(lck, std::chrono::milliseconds(200));
if (guard.destructed() || m_status.testFlag(DAsyncState::Cancel)){
return;
}
}
guard.setPending(false);
if (m_func.cbp) {
std::lock_guard<std::mutex> lckFunc(m_mtxFunc);
emitHelper<DataTypeIn, DataTypeOut>();
m_func.cbp = nullptr; // reset
}
}
});
thread.detach();
};
return m_helper;
}
// only support DAsync<non void type, ...>
template <typename InputType = DataTypeIn>
typename std::enable_if<!std::is_void<InputType>::value>::type
postData(const InputType &data) {
if (Q_UNLIKELY(!m_status.testFlag(DAsyncState::Cancel))) {
m_QueueIn.m_queue.enqueue(data);
if (m_status.testFlag(DAsyncState::Pending)) {
m_cvIn.notify_one();
}
}
}
private:
std::function<void()> m_postProxy;
class Helper : public QObject {
DAsync *m_async;
public:
explicit Helper(DAsync *async, QObject *parent = nullptr) noexcept
: QObject (parent)
, m_async (async)
{
}
template <typename FUNC>
Helper *then(FUNC &&func) {
m_async->m_mainWorker->template setHandle<FUNC, DataTypeOut>(std::forward<FUNC>(func));
return this;
}
// 仅启动,非阻塞
void start(bool immediately = true) {
if (m_async->m_postProxy) {
m_async->m_postProxy();
}
if (!immediately) {
m_async->m_status.setFlag(DAsyncState::Ready, false);
} else {
m_async->m_status.setFlag(DAsyncState::Ready);
if (m_async->m_status.testFlag(DAsyncState::Pending)) {
m_async->m_cvIn.notify_one();
}
}
}
};
Helper *m_helper = nullptr;
DtkCorePrivate::MainWorker *m_mainWorker = nullptr;
};
DCORE_END_NAMESPACE
#endif //DASYNC_H

168
src/util/dtimedloop.cpp Normal file
View File

@ -0,0 +1,168 @@
/*
* Copyright (C) 2021 ~ 2021 UnionTech Technology Co., Ltd.
*
* Author: Wang Peng <993381@qq.com>
*
* Maintainer: Wang Peng <wangpenga@uniontech.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dtimedloop.h"
#include <DObject>
#include <DObjectPrivate>
#include <dthreadutils.h>
#include <QTime>
#include <QTimer>
#include <QLoggingCategory>
DCORE_BEGIN_NAMESPACE
#ifdef QT_DEBUG
Q_LOGGING_CATEGORY(logTimedLoop, "dtk.dtimedloop")
#else
Q_LOGGING_CATEGORY(logTimedLoop, "dtk.dtimedloop", QtInfoMsg)
#endif
class DTimedLoopPrivate : public DObjectPrivate
{
D_DECLARE_PUBLIC(DTimedLoop)
public:
DTimedLoopPrivate(DTimedLoop *qq = nullptr);
~DTimedLoopPrivate();
int m_returnCode = 0;
QTime m_startTime;
QTime m_stopTime;
bool m_timeDumpFlag = false;
char __padding[3];
QString m_exectionName;
void setExecutionName(const QString &executionName);
class LoopGuard {
DTimedLoopPrivate *m_p = nullptr;
public:
LoopGuard(DTimedLoopPrivate *p)
: m_p (p)
{
m_p->m_startTime = QTime::currentTime();
}
~LoopGuard() {
m_p->m_stopTime = QTime::currentTime();
if (!m_p->m_timeDumpFlag) {
return;
}
if (Q_UNLIKELY(m_p->m_exectionName.isEmpty())) {
qCDebug(logTimedLoop(),
"The execution time is %-5d ms",
m_p->m_startTime.msecsTo(QTime::currentTime()));
} else {
qCDebug(logTimedLoop(),
"The execution time is %-5d ms for \"%s\"",
m_p->m_startTime.msecsTo(QTime::currentTime()),
m_p->m_exectionName.toLocal8Bit().data());
m_p->m_exectionName.clear();
}
}
};
};
DTimedLoopPrivate::DTimedLoopPrivate(DTimedLoop *qq)
: DObjectPrivate (qq)
{
}
DTimedLoopPrivate::~DTimedLoopPrivate()
{
}
void DTimedLoopPrivate::setExecutionName(const QString &executionName)
{
m_exectionName = executionName;
}
DTimedLoop::DTimedLoop(QObject *parent) noexcept
: QEventLoop (parent)
, DObject (*new DTimedLoopPrivate(this))
{
}
DTimedLoop::DTimedLoop() noexcept
: QEventLoop ()
, DObject (*new DTimedLoopPrivate(this))
{
}
DTimedLoop::~DTimedLoop()
{
}
int DTimedLoop::runningTime() {
Q_D(DTimedLoop);
if (QEventLoop::isRunning()) {
return d->m_startTime.msecsTo(QTime::currentTime());
}
return d->m_startTime.msecsTo(d->m_stopTime);
}
void DTimedLoop::setTimeDump(bool flag)
{
Q_D(DTimedLoop);
d->m_timeDumpFlag = flag;
}
void DTimedLoop::exit(int returnCode)
{
// 避免在子线程中提前被执行
DThreadUtil::runInMainThread([this, returnCode]{
QEventLoop::exit(returnCode);
});
}
int DTimedLoop::exec(QEventLoop::ProcessEventsFlags flags)
{
Q_D(DTimedLoop);
DTimedLoopPrivate::LoopGuard guard(d);
return QEventLoop::exec(flags);
}
int DTimedLoop::exec(int durationTimeMs, QEventLoop::ProcessEventsFlags flags)
{
Q_D(DTimedLoop);
int runningTime = durationTimeMs < 0 ? 0 : durationTimeMs;
QTimer::singleShot(runningTime, [this] {
QEventLoop::exit(0);
});
DTimedLoopPrivate::LoopGuard guard(d);
return QEventLoop::exec(flags);
}
int DTimedLoop::exec(const QString &executionName, QEventLoop::ProcessEventsFlags flags)
{
Q_D(DTimedLoop);
d->setExecutionName(executionName);
return exec(flags);
}
int DTimedLoop::exec(int durationMs, const QString &executionName, QEventLoop::ProcessEventsFlags flags)
{
Q_D(DTimedLoop);
d->setExecutionName(executionName);
return exec(durationMs, flags);
}
DCORE_END_NAMESPACE

62
src/util/dtimedloop.h Normal file
View File

@ -0,0 +1,62 @@
/*
* Copyright (C) 2021 ~ 2021 UnionTech Technology Co., Ltd.
*
* Author: Wang Peng <993381@qq.com>
*
* Maintainer: Wang Peng <wangpenga@uniontech.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef DTIMEDLOOP_H
#define DTIMEDLOOP_H
#include <dtkcore_global.h>
#include <DObject>
#include <QEventLoop>
DCORE_BEGIN_NAMESPACE
class DObject;
class DTimedLoopPrivate;
class DTimedLoop : public QEventLoop, public DObject {
Q_OBJECT
public:
explicit DTimedLoop() noexcept;
explicit DTimedLoop(QObject *parent) noexcept;
~DTimedLoop();
// 如果是 isRunning 则返回从开始到现在的 exec 执行时间,否则返回上次运行的时间
int runningTime();
void setTimeDump(bool flag = true);
void exit(int returnCode = 0);
// 方式1不传定时时间如果不退出就一直执行配合 exit 使用
// 方式2传入durationMs 参数的是定时执行的,也能调用 exit 提前退出
// 如果传入了 executionName 就会为本次执行设置一个名字,会输出到 log
// 在执行结束将会打印 exec 的执行时间,可以用 setTimeDump 控制其是否打印
int exec(QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents);
int exec(int durationMs, QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents);
int exec(const QString &executionName, QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents);
int exec(int durationMs, const QString &executionName, QEventLoop::ProcessEventsFlags flags = QEventLoop::AllEvents);
private:
Q_DISABLE_COPY(DTimedLoop)
D_DECLARE_PRIVATE(DTimedLoop)
};
DCORE_END_NAMESPACE
#endif // DTIMEDLOOP_H

View File

@ -10,7 +10,9 @@ HEADERS += \
$$PWD/dexportedinterface.h \
$$PWD/dvtablehook.h \
$$PWD/dfileservices.h \
$$PWD/dthreadutils.h
$$PWD/dthreadutils.h \
$$PWD/dasync.h \
$$PWD/dtimedloop.h
INCLUDEPATH += $$PWD
@ -39,7 +41,8 @@ SOURCES += \
$$PWD/dpinyin.cpp \
$$PWD/dexportedinterface.cpp \
$$PWD/dvtablehook.cpp \
$$PWD/dthreadutils.cpp
$$PWD/dthreadutils.cpp \
$$PWD/dtimedloop.cpp
linux {
QT += dbus

View File

@ -26,12 +26,14 @@ int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);
DTimedLoop loop;
testing::InitGoogleTest(&argc, argv);
int ret = RUN_ALL_TESTS();
int retVal = RUN_ALL_TESTS();
#ifdef QT_DEBUG
__sanitizer_set_report_path("asan.log");
#endif
return ret;
return loop.exec(0, "main execution") + retVal;
}

366
tests/ut_dasync.cpp Normal file
View File

@ -0,0 +1,366 @@
/*
* Copyright (C) 2021 ~ 2021 UnionTech Technology Co., Ltd.
*
* Author: Wang Peng <993381@qq.com>
*
* Maintainer: Wang Peng <wangpenga@uniontech.com>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <QTest>
#include <QTimer>
#include <gtest/gtest.h>
#include "dasync.h"
#include "dtimedloop.h"
#include "ut_dutil.h"
DCORE_USE_NAMESPACE
// 为了方便托管 std::thread 而创建的辅助类
class Thread : public QObject {
std::thread *m_thread = nullptr;
public:
template<typename FUNC>
Thread(FUNC &&func, QObject *parent = nullptr)
: QObject (parent)
, m_thread (new std::thread(func))
{
}
void detach() {
m_thread->detach();
}
void join() {
m_thread->join();
}
virtual ~Thread()
{
if (m_thread) {
delete m_thread;
m_thread = nullptr;
}
}
};
bool gInSubFlag = true;
// 全局内存托管,防止 asan 报错
QObject gRoot;
template <typename FUNC>
void DetachedRun(FUNC &&func) {
Thread *thread = new Thread(func, &gRoot);
if (!gInSubFlag) {
func();
}
thread->detach();
}
class ut_DAsync : public testing::Test, public QObject
{
public:
class Test : public QObject {
public:
Test(int in, QObject *parent = nullptr)
: QObject (parent)
, count (in)
{
}
int count = 0;
};
ut_DAsync() { }
virtual ~ut_DAsync() {}
virtual void SetUp() {
task1 = new DAsync<int, int>(this);
// 测试 task2 在线程内部new能正常工作
task3 = new DAsync<int, QString>(this);
// task4~task7 测试固定的API功能大同小异在函数内部创建
task8 = new DAsync<void, QString>(this);
task9 = new DAsync<void, void>(this);
task10 = new DAsync<int, void>(this);
m_loop = new DTimedLoop(this);
m_loop->setTimeDump(true);
}
virtual void TearDown() {
// 释放资源要用 deleteLater 或者托管内存
// 避免线程不同步时直接 delete 导致 asan 偶发性报使用释放掉的堆内存
}
// 首先要保证这些不同类型的模板参数的声明没有编译问题
DAsync<int ,int> *task1 = nullptr;
DAsync<int ,int> *task2 = nullptr;
DAsync<int, QString> *task3 = nullptr;
DAsync<QString, QString> *task4 = nullptr;
DAsync<Test *, Test*> *task5 = nullptr;
DAsync<QString, void> *task6 = nullptr;
// 第一个模板参数是 void 的类型的仅执行一次函数调用
DAsync<void, void> *task7 = nullptr;
DAsync<void, QString> *task8 = nullptr;
DAsync<void, void> *task9 = nullptr;
DAsync<int, void> *task10 = nullptr;
// m_loop 须是 static 的asan 会有误报
static DTimedLoop *m_loop;
};
DTimedLoop *ut_DAsync::m_loop = nullptr;
TEST_F(ut_DAsync, testRunInCorrectThread)
{
// 测试 post 中的函数一定在非主线程异步调用
// 返回结果传到 then 中的函数在主线程中调用
task1->post([](int arg) {
HAVE_FUN(ASSERT_TRUE(!D_THREAD_IN_MAIN()));
return arg;
})->then([&](int arg) {
ASSERT_EQ(arg, 1);
HAVE_FUN(ASSERT_TRUE(D_THREAD_IN_MAIN()));
m_loop->exit();
})->start();
task1->postData(1);
m_loop->exec("testRunInCorrectThread");
}
TEST_F(ut_DAsync, testRunInSubThread)
{
// 和上面 testRunInCorrectThread 测项类似
// task2, 测试 task 在非主线程中依然能正确创建和运行
bool startedFlag = false;
DetachedRun([&]{
// 这里用托管也可以的,但是会有警告
task2 = new DAsync<int, int>(/*this*/);
task2->post([](int arg) {
HAVE_FUN(ASSERT_TRUE(!D_THREAD_IN_MAIN()));
return arg;
})->then([&](int arg) {
static int i = 0;
ASSERT_EQ(arg, i++);
HAVE_FUN(ASSERT_TRUE(D_THREAD_IN_MAIN()));
if (i > 3) m_loop->exit();
})->start();
startedFlag = true;
});
DetachedRun([&]{
static int i = 0;
while (true) {
// 要自己设置 flag因为在不同的线程中
// 到这里 task2 还不一定已经被创建完毕
if (startedFlag) {
task2->postData(i++);
if (i > 3) {
break;
}
}
usleep(100*1000);
}
});
m_loop->exec("testRunInSubThread");
}
TEST_F(ut_DAsync, testMultiThreadSynchronization)
{
// task3, 在子线程中输入 0~999, 在 post 中乘以 2 输出到主线程 then 中
static int n = 1000;
static int result = 0;
task3->post([](int arg) -> QString {
return QString("%1").arg(arg * 2);
})->then([](QString arg) {
static int i = 0;
ASSERT_TRUE(arg == QString("%1").arg(i * 2));
i++;
result = n;
})->start();
DetachedRun([&] {
int i = 0;
while (i < n) {
if (!task3->isFinished()) {
task3->postData(i++);
}
}
task3->cancelAll();
});
DetachedRun([&] {
// 该线程启动后会一直阻塞等待,直到 cancelAll 被调用,
// 说明任务结束了,就可以往下走,判断执行结果
task3->waitForFinished(false);
ASSERT_EQ(result, n);
m_loop->exit();
});
m_loop->exec("testMultiThreadSynchronization");
}
TEST_F(ut_DAsync, testOneTimeTask)
{
// task8, 测试一次性任务,确保两个函数只会进来执行一次
task8->post([] {
static int i = 0;
return QString("testOneTimeTask%1").arg(i++);
})->then([&](const QString &arg) {
ASSERT_TRUE(arg == "testOneTimeTask0");
m_loop->exit();
})->start();
m_loop->exec("test task8");
// task9, 测试一次性任务,确保只有 post 的函数能够被执行到
task9->post([&]{
m_loop->exit();
})->start();
// task9->startUp(); # 或者在合适的时候调用
m_loop->exec("test task9");
// task10, 测试仅有 post 的任务的正确执行
task10->post([&] (int arg) {
static int j = 0;
ASSERT_EQ(arg, j++);
if (j == 2) {
m_loop->exit();
}
});
task10->postData(0);
task10->startUp();
task10->postData(1);
m_loop->exec("test task10");
}
TEST_F(ut_DAsync, testFixedApi)
{
// 测试这些固定的 API 能够正确处理不同的参数类型
// task4
task4 = new DAsync<QString, QString>(this);
static int i = 0;
while (i < 100) {
task4->postData(QString::number(i++));
}
task4->post([](const QString &arg) -> QString {
static int j = 0;
HAVE_FUN(ASSERT_TRUE(arg == QString::number(j++)));
return arg;
})->then([](QString arg) {
static int k = 0;
ASSERT_TRUE(arg == QString::number(k++));
if (k == 100) {
m_loop->exit();
}
})->start();
m_loop->exec("test task4");
// 和上面的不一样的地方就是 postData 在 start 前后都调用了
// task5
i = 0;
task5 = new DAsync<Test *, Test*>(this);
while (i < 50) {
task5->postData(new Test(i++, this));
}
task5->post([](Test *arg) -> Test * {
static int j = 0;
HAVE_FUN(ASSERT_TRUE(arg->count == j++));
return arg;
})->then([](Test *arg) {
static int k = 0;
HAVE_FUN(ASSERT_TRUE(arg->count == k++));
if (k == 100) {
m_loop->exit();
}
})->start();
while (i < 100) {
task5->postData(new Test(i++, this));
}
m_loop->exec("test task5");
// task6
i = 0;
task6 = new DAsync<QString, void>(this);
while (i < 50) {
task6->postData(QString::number(i++));
}
task6->post([](QString arg) {
static int j = 0;
HAVE_FUN(ASSERT_TRUE(arg == QString::number(j++)));
})->then([]() {
static int k = 0;
k++;
if (k == 100) {
m_loop->exit();
}
})->start(false);
DetachedRun([&]{
usleep(100 * 1000);
while (i < 100) {
task6->postData(QString::number(i++));
}
task6->startUp();
});
m_loop->exec("test task6");
// task7
task7 = new DAsync<void, void>(this);
task7->post([]() {
static int j = 0;
HAVE_FUN(ASSERT_TRUE(0 == j++));
})->then([]() {
static int k = 0;
HAVE_FUN(ASSERT_TRUE(0 == k++));
m_loop->exit();
})->start();
m_loop->exec("test task7");
}

View File

@ -18,6 +18,15 @@
#pragma once
#include <gtest/gtest.h>
#include "dtimedloop.h"
DCORE_USE_NAMESPACE
// 有返回值的 lambda 表达式、函数里面使用 ASSERT_XXX
// 总之,不管有没有返回值,用它就对了
#ifndef HAVE_FUN
#define HAVE_FUN(X) [&](){X;}();
#endif
class ut_DUtil : public testing::Test
{