为了提高MySQL数据库(基于C/S设计)的访问瓶颈,除了在服务器端增加缓存服务器缓存常用的数据之外(例如redis),还可以增加连接池,来提高MySQL Server的访问效率,在高并发情况下,大量的 TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗。
Gitee 源码
连接池变量设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| #ifndef MYSQLCONNECTIONPOOL_H #define MYSQLCONNECTIONPOOL_H
#include <string> #include <queue> #include <mutex> #include <atomic> #include <thread> #include<condition_variable> #include <memory> #include <functional>
#include "Connection.h"
using namespace std;
class ConnectionPool { public: static ConnectionPool* getConnectionPool(); shared_ptr<Connection> getConnection();
private: ConnectionPool();
bool loadConfigFile(); void produceConnectionTask();
void scannerConnectionTask();
string _ip; unsigned short _port; string _username; string _password; string _dbname; int _initSize; int _maxSize; int _maxIdleTime; int _connectionTimeout;
queue<Connection*> _connectionQue; mutex _queueMutex; atomic_int _connectionCnt; condition_variable cv;
};
#endif
|
连接池功能设计
- 连接池只需要一个实例,故
ConnectionPool
以单例模式进行设计。
1 2 3 4 5 6
| ConnectionPool* ConnectionPool::getConnectionPool() { static ConnectionPool pool; return &pool; }
|
从ConnectionPool
中可以获取和MySQL连接的Connection
。
空闲连接Connection
全部维护在一个线程安全的Connection
队列中,使用互斥锁来保证队列的线程安全。
1 2 3 4 5 6 7
| class ConnectionPool { private: queue<Connection*> _connectionQue; mutex _queueMutex; atomic_int _connectionCnt; }
|
- 如果
Connection
队列为空,还需要再获取连接,此时需要动态创建连接,上限数量是maxSize
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| void ConnectionPool::produceConnectionTask() {
for (;;) { unique_lock<mutex> lock(_queueMutex); while (!_connectionQue.empty()) { cv.wait(lock); } if (_connectionCnt < _maxSize) { Connection* p = new Connection(); p->connect(_ip, _port, _username, _password, _dbname); p->refreshAliveTime(); _connectionQue.push(p); _connectionCnt++; }
cv.notify_all(); } }
|
- 队列中空闲时间超过
maxIdleTime
会被释放掉,只保留初始的initSize
个连接即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void ConnectionPool::scannerConnectionTask() { for (;;) { this_thread::sleep_for(chrono::seconds(_maxIdleTime));
unique_lock<mutex> lock(_queueMutex); while (_connectionCnt > _initSize) { Connection* p = _connectionQue.front(); if (p->getAliveTime() > (_maxIdleTime * 1000)) { _connectionQue.pop(); _connectionCnt--; delete p; } else { break; } } } }
|
Connection
队列为空且连接数量已达上限maxSize
,则等待connectionTimeout
时候若还获取不到空闲的连接,那么获取连接失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| shared_ptr<Connection> ConnectionPool::getConnection() { unique_lock<mutex> lock(_queueMutex); while (_connectionQue.empty()) { if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout))) { if (_connectionQue.empty()) { LOG("获取空闲连接超时了...获取连接失败"); return nullptr; } } }
shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection* pcon) { unique_lock<mutex> lock(_queueMutex); pcon->refreshAliveTime(); _connectionQue.push(pcon); });
_connectionQue.pop(); cv.notify_all(); return sp; }
|
智能指针的合理使用
用户获取到的连接用shared_ptr
来管理,用lambda表达式定制连接释放的功能。
1 2 3 4 5 6 7 8 9 10 11 12
|
shared_ptr<Connection> sp(_connectionQue.front(), [&](Connection* pcon) { unique_lock<mutex> lock(_queueMutex); pcon->refreshAliveTime(); _connectionQue.push(pcon); });
|
- 这样可以保证不真正释放连接池里的MySQL连接,而是把连接归还到连接池中!
生产者-消费者线程模型
MySQL连接和销毁的操作采用生产者和消费者模型来设计,同时结合条件变量、互斥锁控制线程安全。
生产者和消费者线程各司其职,生产者线程一旦检测到存放MySQL连接的队列为空就会根据实际情况(连接量是否已达最大限制)来选择是否创建新的MySQL连接;而消费者线程会定期存放MySQL连接的队列,根据实际情况(连接是否达到最大空闲时间和连接数)选择是否释放MySQL连接。
连接池测试
1 2 3 4 5 6 7
| # 建表语句 CREATE TABLE user ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT NOT NULL, sex VARCHAR(10) NOT NULL );
|
- demo2:单线程往数据库里插入1000条数据。
- demo3:不用MySQL连接池,开启4个线程往数据库中插入1000条数据。
- demo4:使用MySQL连接池,开启4个线程往数据库中插入1000条数据。

- demo3需要43s,demo4仅需2s左右。(连接池初始10个连接,最大1024个连接,可以在配置文件修改。)