MySQL 连接池
2025-04-01 17:41:42

为了提高MySQL数据库(基于C/S设计)的访问瓶颈,除了在服务器端增加缓存服务器缓存常用的数据之外(例如redis),还可以增加连接池,来提高MySQL Server的访问效率,在高并发情况下,大量的 TCP三次握手、MySQL Server连接认证、MySQL Server关闭连接回收资源和TCP四次挥手所耗费的性能时间也是很明显的,增加连接池就是为了减少这一部分的性能损耗。

Gitee 源码

连接池变量设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// MySQLConnectionPool.h
#ifndef MYSQLCONNECTIONPOOL_H
#define MYSQLCONNECTIONPOOL_H

#include <string>
#include <queue>
#include <mutex>
#include <atomic>
#include <thread>
#include<condition_variable>
#include <memory>
#include <functional>

#include "Connection.h"

using namespace std;

/*
实现连接池功能模块
*/

class ConnectionPool
{
public:
// 获取连接池对象实例
static ConnectionPool* getConnectionPool();
// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> getConnection(); // 智能指针自动管理外部用完连接的“释放” -> 归还连接池

private:
ConnectionPool(); // 单例 -> 构造函数私有化

bool loadConfigFile(); // 从配置文件中加载配置项

// 运行在独立的线程中,专门负责生产新连接
void produceConnectionTask();

// 运行在独立的线程中,专门负责释放空闲连接
void scannerConnectionTask();

string _ip; // MySQL的ip地址
unsigned short _port; // MySQL的端口号(默认3306)
string _username; // MySQL登录用户名
string _password; // MySQL登录密码
string _dbname; // MySQL数据库名称
int _initSize; // 连接池的初始连接量
int _maxSize; // 连接池的最大连接量
int _maxIdleTime; // 连接池最大空闲时间
int _connectionTimeout; // 连接池获取连接的超时时间

queue<Connection*> _connectionQue; // 存储MySQL连接的队列
mutex _queueMutex; // 维护连接队列的线程安全互斥锁
atomic_int _connectionCnt; // 记录所创建的connection连接的总数量
condition_variable cv; // 设置条件变量,用于连接生产线程和连接消费线程的通信

};

#endif

连接池功能设计

  1. 连接池只需要一个实例,故ConnectionPool单例模式进行设计。
1
2
3
4
5
6
// 线程安全的懒汉单例函数接口
ConnectionPool* ConnectionPool::getConnectionPool()
{
static ConnectionPool pool; // 编译器lock和unlock
return &pool;
}
  1. ConnectionPool中可以获取和MySQL连接的Connection

  2. 空闲连接Connection全部维护在一个线程安全的Connection队列中,使用互斥锁来保证队列的线程安全。

1
2
3
4
5
6
7
class ConnectionPool
{
private:
queue<Connection*> _connectionQue; // 存储MySQL连接的队列
mutex _queueMutex; // 维护连接队列的线程安全互斥锁
atomic_int _connectionCnt; // 记录所创建的connection连接的总数量
}
  1. 如果Connection队列为空,还需要再获取连接,此时需要动态创建连接,上限数量是maxSize
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 运行在独立的线程中,专门负责生产新连接
void ConnectionPool::produceConnectionTask()
{
/*
队列为空 → 生产连接 → 通知消费者
队列非空 → 进入等到状态(等待状态锁是释放的),以便消费者消费
*/
for (;;)
{
// 生产者拿到锁后,消费者无法加锁
unique_lock<mutex> lock(_queueMutex);
while (!_connectionQue.empty())
{
cv.wait(lock); // 队列不空,此时生产线程进入等待状态
}

// 连接数量未达上限,继续创建新的连接
if (_connectionCnt < _maxSize)
{
Connection* p = new Connection();
p->connect(_ip, _port, _username, _password, _dbname);
p->refreshAliveTime(); // 刷新开始空闲的起始时间
_connectionQue.push(p);
_connectionCnt++;
}

// 通知消费者线程,可以消费连接了
cv.notify_all();
} // 解锁,消费者有机会获得锁开始消费
}
  1. 队列中空闲时间超过maxIdleTime会被释放掉,只保留初始的initSize个连接即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 运行在独立的线程中,专门负责释放空闲连接
void ConnectionPool::scannerConnectionTask()
{
for (;;)
{
// 通过sleep模拟定时效果
this_thread::sleep_for(chrono::seconds(_maxIdleTime));

// 扫描整个队列,释放多余的连接
unique_lock<mutex> lock(_queueMutex);
while (_connectionCnt > _initSize)
{
Connection* p = _connectionQue.front();
if (p->getAliveTime() > (_maxIdleTime * 1000))
{
_connectionQue.pop();
_connectionCnt--;
delete p; // 调用~Connection()释放连接
}
else
{
break; // 队头的连接未超过最大存活时间,其他时间肯定没有
}
}
}
}
  1. Connection队列为空且连接数量已达上限maxSize,则等待connectionTimeout时候若还获取不到空闲的连接,那么获取连接失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 给外部提供接口,从连接池中获取一个可用的空闲连接
shared_ptr<Connection> ConnectionPool::getConnection()
{
unique_lock<mutex> lock(_queueMutex);
while (_connectionQue.empty())
{
// 在_connectionTimeout这段时间内,若连接池非空,都会被通知可以消费了;若超时(未被唤醒或被唤醒但未抢到锁),则视为获取连接失败
if (cv_status::timeout == cv.wait_for(lock, chrono::milliseconds(_connectionTimeout)))
{
if (_connectionQue.empty())
{
LOG("获取空闲连接超时了...获取连接失败");
return nullptr;
}
}
}

/*
shared_ptr智能指针析构时,会把connection资源直接delete
相当于调用connection的析构函数,connection会被close
故需要自定义shared_ptr释放资源的方式,将connection直接归还到queue当中
*/
shared_ptr<Connection> sp(_connectionQue.front(),
[&](Connection* pcon) {
// 此处是在服务器应用线程中调用,故一定要考虑队列的线程安全
unique_lock<mutex> lock(_queueMutex);
pcon->refreshAliveTime(); // 刷新开始空闲的起始时间
_connectionQue.push(pcon);
});

_connectionQue.pop();
cv.notify_all(); // 消费完通知生产者线程检查一下,若队列为空,则进行生产
return sp;
}

智能指针的合理使用

用户获取到的连接用shared_ptr来管理,用lambda表达式定制连接释放的功能。

1
2
3
4
5
6
7
8
9
10
11
12
   /*
shared_ptr智能指针析构时,会把connection资源直接delete
相当于调用connection的析构函数,connection会被close
故需要自定义shared_ptr释放资源的方式,将connection直接归还到queue当中
*/
shared_ptr<Connection> sp(_connectionQue.front(),
[&](Connection* pcon) {
// 此处是在服务器应用线程中调用,故一定要考虑队列的线程安全
unique_lock<mutex> lock(_queueMutex);
pcon->refreshAliveTime(); // 刷新开始空闲的起始时间
_connectionQue.push(pcon);
});
  • 这样可以保证不真正释放连接池里的MySQL连接,而是把连接归还到连接池中!

生产者-消费者线程模型

MySQL连接和销毁的操作采用生产者和消费者模型来设计,同时结合条件变量、互斥锁控制线程安全。
生产者和消费者线程各司其职,生产者线程一旦检测到存放MySQL连接的队列为空就会根据实际情况(连接量是否已达最大限制)来选择是否创建新的MySQL连接;而消费者线程会定期存放MySQL连接的队列,根据实际情况(连接是否达到最大空闲时间和连接数)选择是否释放MySQL连接。

连接池测试

1
2
3
4
5
6
7
# 建表语句
CREATE TABLE user (
id INT AUTO_INCREMENT PRIMARY KEY, -- 添加一个自增主键
name VARCHAR(50) NOT NULL, -- 用户名,最大长度为 50
age INT NOT NULL, -- 年龄
sex VARCHAR(10) NOT NULL -- 性别
);
  • demo2:单线程往数据库里插入1000条数据。
  • demo3:不用MySQL连接池,开启4个线程往数据库中插入1000条数据。
  • demo4:使用MySQL连接池,开启4个线程往数据库中插入1000条数据。

image.png

  • demo3需要43s,demo4仅需2s左右。(连接池初始10个连接,最大1024个连接,可以在配置文件修改。