最近拿到一批泄漏的 url - 账号 - 密码情报 为 txt 格式 夹杂大量脏数据 通过几次试错成功打通了数据读取 -> 数据清洗 -> 数据入库 -> 后处理(去重、优化)的整个流程 这篇文章是对这一个过程中的思考、产出的记录
# 基础设施
# 选型 & 搭建
使用的数据库选择了 clickhouse 作为列存储数据库 clickhouse 天生就比 mysql 这种行存储数据库精通于处理大量的数据 最直观的表现就是快 两百多万条测试数据只用了 15s 就能插入完毕 单条件查询十万条数据 处理时长也只需要三百毫秒
搭建:
官方提供了 docker 镜像可以一键搭建 只需要一行命令就可以得到一个开箱即用的 clickhouse
docker run -d -p 8123:8123 9000:9000 yandex/clickhouse-server:latest |
这样就开启了一个数据库并将端口映射到了宿主机 使用 datagrip 或者其他管理软件就可以连接了
# 表结构选取
我最终选择的表结构是:
create table credentials( | |
id int codec(ZSTD(12)) , | |
url String codec(ZSTD(12)), | |
username String codec(ZSTD(12)), | |
password String codec(ZSTD(12)) | |
) engine =ReplacingMergeTree(id) | |
order by (url, username, password) | |
settings index_granularity = 8192; |
选取了_ReplacingMergeTree_ 这个 engine 能够实现插入数据后自动优化去重 同时使用了 ZSTD 压缩 最大化节省存储空间
一开始我尝试过使用 LowCardinality 低基数类型包裹的 string 进行存储 但是发现在数据量大了以后他的性能和占用的存储空间都会快速增长 看了文档才发现
对于大量用户凭证信息 肯定是过万的数据量 这时使用 LowCardinality 就起反作用了
# 数据读取 & 清洗
我在这里选择的是 python 处理 因为写的快 -- 快速开发 & 快速试错并修复
基本的思路就是 open 文件 -> 读取一行 -> 分割成 url username password-> 清洗脏数据,最后插入到数据库
这里我让 gpt 给我了一个快速原型
import requests | |
# ClickHouse 服务器的地址 | |
clickhouse_url = 'http://localhost:8123/' | |
# 定义文件路径 | |
file_path = 'your_file.txt' | |
# 定义批量插入的大小 | |
batch_size = 1000 | |
data_batch = [] | |
# 初始化 ID 计数器 | |
id_counter = 1 | |
# 打开文件,逐行读取 | |
with open(file_path, 'r') as file: | |
for line in file: | |
# 假设数据以 '|' 分隔,并且只有 url、username 和 password 三栏 | |
fields = line.strip().split('|') | |
if len(fields) == 3: # 仅有 url、username 和 password 三栏 | |
url, username, password = fields | |
# 为每条记录增加一个自增 ID | |
data_batch.append([str(id_counter), url, username, password]) | |
id_counter += 1 | |
# 当数据达到批量大小时,将其插入 ClickHouse | |
if len(data_batch) >= batch_size: | |
# 将数据转换为 CSV 格式(或其他合适的格式) | |
csv_data = '\n'.join([','.join(row) for row in data_batch]) | |
# 插入数据到 ClickHouse | |
requests.post(clickhouse_url, params={ | |
'query': 'INSERT INTO userPasses FORMAT CSV'}, data=csv_data) | |
# 清空批次数据 | |
data_batch = [] | |
# 处理剩余的数据(如果最后一批不满 batch_size) | |
if data_batch: | |
csv_data = '\n'.join([','.join(row) for row in data_batch]) | |
requests.post(clickhouse_url, params={ | |
'query': 'INSERT INTO userPasses FORMAT CSV'}, data=csv_data) | |
print("数据导入完成!") |
不得不说这个原型是很好用的 很方便在其基础上进行扩展 添加需要的功能
在修改和实际使用的过程中 基本每个环节都出现过 bug--
# 文件读取
虽然数据源基本都是 utf8 编码的 txt 文件 但是中间可能会掺杂着几个非 utf8 字符 在读取的时候就会直接把脚本崩溃 但是如果用 try 包裹处理流程的话 又会导致脚本性能太低 因此我选择的是对文件进行 utf8 合法性检测 & 预处理
# utf8编码测试 | |
folder = '' | |
nonutf8File = [] | |
for filename in os.listdir(folder): | |
if filename.endswith('.txt'): | |
try: | |
file = open(os.path.join(folder, filename), 'r', encoding='utf-8') | |
for line in file: | |
pass | |
except: | |
nonutf8File.append(filename) | |
continue | |
if nonutf8File: | |
print(f"以下文件不是utf8编码:{nonutf8File}") | |
exit() |
这一段代码能很方便的测试出哪些文件存在问题 将检测出的文件在 vscode 中用 utf8 重新保存一下就能解决这个问题了(不是很优雅 但是我感觉这是效率比较高的方法了 直接在 python 里处理我觉得会更慢
# 数据分割
因为数据来自不同来源 对用户凭证的分割方式也可能不同 有的用 | 有的用:甚至还有混着用的 --
对于 | 或者空格还算好处理的 直接 split 就可以了 但是:因为 url 协议中也存在 就需要单独处理 (实际没有
line = line.replace('https://', '',).replace('http://', '').replace('android://','').replace(':',' ').replace('|'," ") | |
fields = line.strip().split(' ') | |
if len(fields) == 3: # 仅有 url、username 和 password 三栏 用来过滤一些脏数据 | |
url, username, password = fields |
这段代码先清除了 url 协议头 然后将:或者|都替换成空格处理 这样基本能通杀各种 txt 了
# 数据清洗
脏数据也是各有各的脏 有写了一堆不可见字符的 有把 url 写进 username 或者 password 的 一一处理就行了
if not url or not username or not password: | |
continue | |
if not url.isprintable() or not username.isprintable() or not password.isprintable(): | |
continue | |
if "http" in username or "http" in password: | |
continue | |
if len(username) > 50 or len(password) > 100: | |
continue |
最后这个长度判断我觉得还可以再收紧一点 因为常见网站的用户凭证长度限制也就 20 个字符吧
# 数据插入
clickhouse 提供了一个很好用的 http 协议 Api 同时支持直接插入 csv 这样就方便 python 进行交互并且批量插入 有效提高了吞吐量 有点像炼丹里的 minibatch 思想
# 当数据达到批量大小时,将其插入 ClickHouse | |
if len(data_batch) >= batch_size: | |
data_count += len(data_batch) | |
# 将数据转换为 CSV 格式(或其他合适的格式) | |
csv_data = '\n'.join([','.join(row) for row in data_batch]) | |
# process = psutil.Process(os.getpid()) | |
# mem_info = process.memory_info() | |
# # 查看 RSS(Resident Set Size,常驻内存) | |
# print(f"Memory Usage (RSS): {mem_info.rss / (1024 * 1024):.2f} MB") | |
# 插入数据到 ClickHouse | |
req = requests.post(clickhouse_url, params={ | |
'query': 'INSERT INTO credentials.userPasses FORMAT CSV','input_format_allow_errors_num': 1000}, data=csv_data) | |
print(f"已导入 {id_counter} 条数据") | |
# 清空批次数据 | |
data_batch = [] | |
# 处理剩余的数据(如果最后一批不满 batch_size) | |
if data_batch: | |
csv_data = '\n'.join([','.join(row) for row in data_batch]) | |
requests.post(clickhouse_url, params={ | |
'query': 'INSERT INTO credentials.userPasses FORMAT CSV','input_format_allow_errors_num': 1000}, data=csv_data) | |
print(f"已导入 {id_counter} 条数据") |
这里添加了 'input_format_allow_errors_num': 1000 参数 不添加这个参数的话只要有报错 整批数据都会被退回
这样就实现了数据的批量处理和插入 完整脚本如下:
import os | |
import time | |
import requests | |
# import psutil | |
# import os | |
# 获取当前进程的内存信息 | |
# ClickHouse 服务器的地址 | |
clickhouse_url = 'http://localhost:8123/' | |
# 定义批量插入的大小 | |
batch_size = 10000 | |
id_counter = 1 | |
# 打开文件,逐行读取 | |
def import_data(file_path): | |
global id_counter | |
data_count = 0 | |
data_batch = [] | |
with open(file_path, 'r',encoding='utf-8') as file: | |
print("开始导入数据...") | |
for line in file: | |
# 拆分数据,假设以 '|' 分隔 | |
line = line.replace('https://', '',).replace('http://', '').replace('android://','').replace(':',' ').replace('|'," ") | |
fields = line.strip().split(' ') | |
if len(fields) == 3: # 仅有 url、username 和 password 三栏 | |
url, username, password = fields | |
if not url or not username or not password: | |
continue | |
if not url.isprintable() or not username.isprintable() or not password.isprintable(): | |
continue | |
if "http" in username or "http" in password: | |
continue | |
if len(username) > 50 or len(password) > 100: | |
continue | |
data_batch.append([str(id_counter), url, username, password]) | |
id_counter += 1 | |
# 当数据达到批量大小时,将其插入 ClickHouse | |
if len(data_batch) >= batch_size: | |
data_count += len(data_batch) | |
# 将数据转换为 CSV 格式(或其他合适的格式) | |
csv_data = '\n'.join([','.join(row) for row in data_batch]) | |
# process = psutil.Process(os.getpid()) | |
# mem_info = process.memory_info() | |
# # 查看 RSS(Resident Set Size,常驻内存) | |
# print(f"Memory Usage (RSS): {mem_info.rss / (1024 * 1024):.2f} MB") | |
# 插入数据到 ClickHouse | |
req = requests.post(clickhouse_url, params={ | |
'query': 'INSERT INTO credentials.userPasses FORMAT CSV','input_format_allow_errors_num': 1000}, data=csv_data) | |
print(f"已导入 {id_counter} 条数据") | |
# 清空批次数据 | |
data_batch = [] | |
# 处理剩余的数据(如果最后一批不满 batch_size) | |
if data_batch: | |
csv_data = '\n'.join([','.join(row) for row in data_batch]) | |
requests.post(clickhouse_url, params={ | |
'query': 'INSERT INTO credentials.userPasses FORMAT CSV','input_format_allow_errors_num': 1000}, data=csv_data) | |
print(f"已导入 {id_counter} 条数据") | |
starttime = time.time() | |
# utf8编码测试 | |
folder = 'datatxt' | |
nonutf8File = [] | |
for filename in os.listdir(folder)[:]: | |
if filename.endswith('.txt'): | |
try: | |
file = open(os.path.join(folder, filename), 'r', encoding='utf-8') | |
for line in file: | |
pass | |
except: | |
nonutf8File.append(filename) | |
continue | |
if nonutf8File: | |
print(f"以下文件不是utf8编码:{nonutf8File}") | |
exit() | |
# | |
for filename in os.listdir(folder)[:]: | |
if filename.endswith('.txt'): | |
import_data(os.path.join(folder, filename)) | |
print("数据导入完成!") | |
print(f"耗时:{time.time() - starttime:.2f} 秒") |
中间的检测内存的几行是因为我当时想 python 是不是会把整个文件读到内存里 造成爆内存 后来想了想 不会爆 python 的 open 只是打开了一个 fd 实际是按行分段读取的 运行中监测的内存占用也一直在 40mb 左右
# 性能优化
这一块可以修改的参数就非常多了 很多选项都可能影响性能
例如 batch_size
我在将 batchsize 从 1k 上调到 10k 以后 用时就下降到了 12s
至于数据库端 replacingmergeTree 会出手 通过执行
OPTIMIZE TABLE xxx.xxx FINAL; |
引擎会自动对数据进行去重、搜索优化,结果是十分有意义的
插入时记录三亿条数据 自动去重后剩余 2.1 亿条
这部分数据源 txt 占用 26g 存储空间 经过数据库压缩后只剩 8.54g