openGauss数据库源码学习指标采集、预测与异常检测

opengauss 源码解析

指标采集、预测与异常检测

代码位置:/gausskernel/dbmind/tools/anomaly_detection

各模组在整体结构上的组合在摩天轮论坛上官方解析文章已经叙述的相对完整详尽。该报告对应为具体模块内部的实现。

Agent 模块

代码位置: ~/agent Agent模块负责数据库指标数据采集与发送,从代码的结构上看,可以分为 3 个子模块,即DBSource(/db_source.py)MemoryChannel(/channel.py)HttpSink(/sink.py),负责整合组织各个模块进行协作部分的代码位于~/metric_agent.py 的 agent_main()方法中。

DBSource

代码位置:~/agent/db_source.py DBSource承担数据采集的功能,其承担的三个 task 可以在 agent_main()部分直观的看到,分别为database_exporter,os_exporter,wdr。该三个 task 的代码位于 anomaly_detection/task 中.

在 metric_agent()中的使用

os_exporter

负责收集部署 opengauss 的设备上系统的部分性能数据。

对象方法 对应实现
cpu_usage()/memory_usage() linux 命令 ps -ux 获取
io_read()/io_write() linux 命令 pidstat 获取
io_wait() iostat 获取
disk_space() 通过 sql 获取路径,再通过命令 du -sh 获取

可以看到该 task 收集的数据包括 cpu 使用率,io 读写,io 等待,内存使用以及硬盘空间存储部分。

DatabaseExporter

负责收集数据库方面的性能数据。

对象方法 对应实现
guc*parameter() 通过 sql 语句查询 pg_setting 中的数据,包括工作内存,共享缓冲区的大小以及最大的连接数量
current_connections() select count(1) from pg_stat_activity
gps()每秒事务量 通过 sql 语句从 gs_sql_count 中获取 select/update/insert/delete 计数,间隔 0.1s 采样,乘以 10 作为结果
process() linux 命令 ps -aux 获取进程信息,通过分割筛选等处理获取进程数据,返回 key 为(pid)*(process),value为(cpu_usage:memory_usage)的字典
WDR(Workload Diagnosis Report)

WDR 基于两次不同时间点系统的性能快照数据,生成两个时间点之间的性能表现报表,用于诊断数据库内核的性能故障。而该部分的 wdr 相关的仅仅是一小部分,wdrsummary级和detail级别的性能数据比起内置的这部分数据要丰富的多。该 task 模块中主要通过 sql 语句进行计数器的查询

MemoryChannel

代码位置:~/agent/channel.py 整体为一个存放数据的队列,结构比较简单

对象方法 对应实现
put() 尝试向队列中放置数据,超过最大限度时 log 提醒并舍去新数据
take() 数据出列并返回其值
size() 返回队列数据量

HttpSink

代码位置:~/agent/sink.py 其从MemoryChannel获取数据,并根据 metric_agent.py 中提供的协议ssl/http进行转发,重试次数为 5 次,间隔 1s

def process(self): agent_logger.info('Begin send data to {url}.'.format(url=self._url)) while self.running: contents = self._channel.take() if not contents: time.sleep(0.5) continue contents.update(**{'flag': {'host': self.db_host, 'port': self.db_port, 'type': self.db_type}}) retry_times = 5 while retry_times: try: req = request.Request(self._url, headers=_JSON_HEADER, data=json.dumps(contents).encode('utf-8'), method='POST') request.urlopen(req, context=self.context) break except Exception as e: agent_logger.error("{error}, retry...".format(error=str(e))) retry_times -= 1 if not retry_times: raise time.sleep(1.0) time.sleep(self._interval)