If an application-layer traffic analysis system uses rule matching + machine learning + an LLM all at once, does that make it a hybrid F1 car?

# Background

I'd wanted to build this for a long time, but with no suitable dataset and no motivation to go hunting for one, it got shelved, predictably. Then last month the company ran an AI challenge on using AI / LLMs to classify traffic. My mentor and I talked it over, roped in another colleague, and signed up. Right after registering they told me they were both pretty busy and I'd be driving as the number one. -_- Fine, saw that coming.

The biggest asset in this competition is, without question, a set of 500k real production traffic records. It can't be pulled to a local machine, so training has to happen on the provided online environment, but every single record carries a human-assigned label. So no matter how you slice it, as long as you pick a sensible algorithm, this high-quality data alone means your results can't be bad.

# Design

For a traffic engine, the first question is whether it can run online, which means caring about latency. Imagine training a model with an F1 score of 0.99999 that takes a minute per record: obviously it can't go inline, it's only useful as an offline analysis tool.

So every design decision in this engine, other than accuracy itself, was made to cut latency. Which is where I have to rant about the damn scoring: it only measures accuracy, and the final test set is another 120k records? The pure-LLM entries couldn't finish that in a month.

The concrete design ideas boil down to a few points:

  • Minimize LLM usage

    Mindlessly reaching for an LLM is only human, I suppose: ever since LLMs arrived, everyone has been reimplementing everything already invented with an LLM. Finishing a task with one prompt feels great, but as things stand, the moment you call an LLM your response time drops into the seconds range. So to keep latency down, the design uses the LLM as little as possible: only when the other models' confidence is low does a fine-tuned LLM get called to make the final judgment.

  • Prune as much as possible

    Pruning means shifting decisions left. If a decision can be made at the rule layer, why call a model? If machine learning can settle it, why call an LLM? So I split the classification pipeline into three tiers by latency (a routing sketch follows this list):

    Rule layer → ML layer → LLM layer

    Each tier has an ejection mechanism: once a flow is ejected, it already has a high-confidence label and can be emitted directly.
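
A minimal sketch of what that three-tier routing could look like. Everything here is illustrative: the rule_match, ml_model and llm_classify callables and the 0.9 confidence threshold are assumptions about the interfaces, not the exact code of the final system.

# Illustrative cascade: rules -> ML -> LLM, with early exit at each tier.
# rule_match / ml_model / llm_classify and the 0.9 threshold are placeholders.
def classify(flow, rule_match, ml_model, llm_classify, threshold=0.9):
    # Tier 1: rule layer, effectively free
    label = rule_match(flow)
    if label is not None:
        return label, 'rule'
    # Tier 2: ML layer, milliseconds; eject if the model is confident enough
    proba = ml_model.predict_proba([flow['vector']])[0]
    if proba.max() >= threshold:
        return int(proba.argmax()), 'ml'
    # Tier 3: fine-tuned LLM, seconds; only the leftovers ever reach it
    return llm_classify(flow), 'llm'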

That's roughly the whole idea; the finer details will have to be figured out along the way.

I'm no Adrian Newey; I can't picture in my head exactly how this thing will perform once finished, or which parts of the design will need changing midway, so the design written here only represents what I was thinking at this point in time.

# F-Duct: the rule layer

This is really a loophole I spotted while mining data features by eye: why is the given data correlated with the URL? My anti-crawler operations experience kicking in.

While analyzing the data (really, while supervising the LLM that was preparing my fine-tuning data), I noticed that some labels correlate with the URL path being accessed; in other words, a given path only ever appears in the black / white samples of that specific label. Based on that, I can run incoming data through a first pass of special rules and rule combinations: if it matches a particular URL/body, the label can be inferred directly.

import json
import multiprocessing as mp
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from tqdm import tqdm
# 读取数据
metadata = pd.concat([metadata1, metadata2, metadata3], ignore_index=True)
print(metadata.head())
# URL 路径模式发现和分析
class URLPathPatternAnalyzer:
    def __init__(self, df, n_workers=None):
        self.df = df
        # 更新标签映射
        self.label_mapping = {
            ('白样本', '基础安全'): 0,
            ('白样本', '数据安全'): 0,
            ('白样本', '内容安全'): 0,
            ('白样本', '账号安全'): 0,
            ('正常', '交易安全'): 0,
            ('正常流量', '反爬'): 0,
            ('刷单', '交易安全'): 1,
            ('数据安全-API数据恶意拉取', '数据安全'): 2,
            ('数据安全-未授权敏感数据查询', '数据安全'): 3,
            ('数据安全-绕过基线防护', '数据安全'): 4,
            ('爬虫流量', '反爬'): 5,
            ('虚假账号', '账号安全'): 6,
            ('账号囤积', '账号安全'): 7,
            ('黄牛', '交易安全'): 8,
            ('黑样本', '内容安全'): 9,
            ('黑样本', '基础安全'): 10,
        }
        # 创建反向映射
        self.class_names = {v: k for k, v in self.label_mapping.items()}
        # 白样本类别(标签为 0)
        self.white_labels = [k[0] for k, v in self.label_mapping.items() if v == 0]  # 只取 data_label
        self.n_workers = n_workers or min(58, mp.cpu_count())
        print(f"使用 {self.n_workers} 个工作线程")
        print(f"识别到的类别数量: {len(set(self.label_mapping.values()))}")
    def get_samples_by_class(self, class_id):
        """根据类别ID获取样本"""
        if class_id not in self.class_names:
            return pd.DataFrame()
        target_labels = [k for k, v in self.label_mapping.items() if v == class_id]
        # 筛选对应的样本
        mask = False
        for data_label, data_source in target_labels:
            current_mask = (self.df['data_label'] == data_label) & (self.df['data_source'] == data_source)
            mask = mask | current_mask
        return self.df[mask]
    def find_class_specific_patterns_parallel(self, target_class_id, min_frequency=3, purity_threshold=0.8):
        """并行找出特定类别的纯净路径模式"""
        target_samples = self.get_samples_by_class(target_class_id)
        # 修复获取其他样本的逻辑
        target_label_source = self.class_names[target_class_id]  # 这是一个元组 (data_label, data_source)
        target_data_label, target_data_source = target_label_source
        # 获取不属于目标类别的所有样本
        other_samples = self.df[~((self.df['data_label'] == target_data_label) &
                                 (self.df['data_source'] == target_data_source))]
        if target_samples.empty:
            print(f"类别 {target_class_id} ({self.class_names[target_class_id]}) 没有找到样本")
            return []
        print(f"类别 {target_class_id} ({self.class_names[target_class_id]}) 样本数量: {len(target_samples)}")
        print(f"其他类别样本数量: {len(other_samples)}")
        # 准备数据
        target_paths = target_samples['url_path'].dropna().tolist()
        other_paths = other_samples['url_path'].dropna().tolist()
        if not target_paths:
            print(f"类别 {target_class_id} 没有有效的URL路径")
            return []
        # 分批处理
        target_batch_size = max(1, len(target_paths) // (self.n_workers * 2))
        other_batch_size = max(1, len(other_paths) // (self.n_workers * 2)) if other_paths else 1
        target_batches = [target_paths[i:i + target_batch_size]
                         for i in range(0, len(target_paths), target_batch_size)]
        other_batches = [other_paths[i:i + other_batch_size]
                        for i in range(0, len(other_paths), other_batch_size)] if other_paths else []
        print(f"目标类别分成 {len(target_batches)} 个批次,其他类别分成 {len(other_batches)} 个批次")
        # 并行提取泛化路径模式
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            print(f"并行处理类别 {target_class_id} 样本...")
            target_futures = [executor.submit(self._extract_patterns_from_paths, batch)
                             for batch in target_batches]
            print("并行处理其他类别样本...")
            other_futures = [executor.submit(self._extract_patterns_from_paths, batch)
                            for batch in other_batches] if other_batches else []
            # 收集目标类别结果
            target_generalized_paths = []
            for future in tqdm(target_futures, desc=f"收集类别{target_class_id}模式"):
                target_generalized_paths.extend(future.result())
            # 收集其他类别结果
            other_generalized_paths = []
            if other_futures:
                for future in tqdm(other_futures, desc="收集其他类别模式"):
                    other_generalized_paths.extend(future.result())
        print(f"类别 {target_class_id} 模式数量: {len(target_generalized_paths)}")
        print(f"其他类别模式数量: {len(other_generalized_paths)}")
        # 统计模式频次
        target_pattern_counts = Counter(target_generalized_paths)
        other_pattern_counts = Counter(other_generalized_paths)
        # 找出纯净模式
        pure_patterns = []
        for pattern, target_count in target_pattern_counts.items():
            if target_count >= min_frequency:
                other_count = other_pattern_counts.get(pattern, 0)
                total_count = target_count + other_count
                purity = target_count / total_count if total_count > 0 else 0
                if purity >= purity_threshold:
                    pure_patterns.append({
                        'pattern': pattern,
                        'target_count': target_count,
                        'other_count': other_count,
                        'total_count': total_count,
                        'purity': purity,
                        'type': 'generalized_path',
                        'class_id': target_class_id,
                        'class_name': self.class_names[target_class_id]
                    })
        # 处理路径前缀模式
        print("并行处理路径前缀模式...")
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            target_prefix_futures = [executor.submit(self._extract_prefixes_from_paths, batch)
                                   for batch in target_batches]
            other_prefix_futures = [executor.submit(self._extract_prefixes_from_paths, batch)
                                  for batch in other_batches] if other_batches else []
            # 收集前缀结果
            target_prefixes = []
            for future in tqdm(target_prefix_futures, desc=f"收集类别{target_class_id}前缀"):
                target_prefixes.extend(future.result())
            other_prefixes = []
            if other_prefix_futures:
                for future in tqdm(other_prefix_futures, desc="收集其他类别前缀"):
                    other_prefixes.extend(future.result())
        target_prefix_counts = Counter([p for p in target_prefixes if p])
        other_prefix_counts = Counter([p for p in other_prefixes if p])
        # 找出纯净前缀模式
        for prefix, target_count in target_prefix_counts.items():
            if target_count >= min_frequency:
                other_count = other_prefix_counts.get(prefix, 0)
                total_count = target_count + other_count
                purity = target_count / total_count if total_count > 0 else 0
                if purity >= purity_threshold:
                    pure_patterns.append({
                        'pattern': prefix,
                        'target_count': target_count,
                        'other_count': other_count,
                        'total_count': total_count,
                        'purity': purity,
                        'type': 'path_prefix',
                        'class_id': target_class_id,
                        'class_name': self.class_names[target_class_id]
                    })
        return sorted(pure_patterns, key=lambda x: (x['purity'], x['target_count']), reverse=True)
    def generate_all_class_filter_rules(self, min_frequency=3, purity_threshold=0.8):
        """为所有类别生成过滤规则"""
        all_rules = {}
        for class_id in self.class_names.keys():
            print(f"\n=== 处理类别 {class_id}: {self.class_names[class_id]} ===")
            patterns = self.find_class_specific_patterns_parallel(
                class_id, min_frequency, purity_threshold
            )
            if patterns:
                rules = self.generate_pure_filter_rules(patterns)
                all_rules[class_id] = {
                    'class_name': self.class_names[class_id],
                    'patterns': patterns,
                    'rules': rules
                }
                print(f"为类别 {class_id} 生成了 {len(rules)} 条规则")
            else:
                print(f"类别 {class_id} 没有找到符合条件的纯净模式")
                all_rules[class_id] = {
                    'class_name': self.class_names[class_id],
                    'patterns': [],
                    'rules': []
                }
        return all_rules
    def extract_path_patterns(self, path):
        """提取URL路径的各种模式特征"""
        if pd.isna(path) or not isinstance(path, str):
            return {}
        try:
            # 确保路径以 / 开头
            if not path.startswith('/'):
                path = '/' + path
            patterns = {
                'segments_count': len([seg for seg in path.split('/') if seg]),
                'depth': path.count('/') - 1 if path.startswith('/') else path.count('/'),
                'has_api': '/api/' in path.lower(),
                'api_version': self._extract_api_version(path),
                'resource_type': self._classify_path_resource(path),
                'generalized_path': self._generalize_path(path),
                'path_prefix': self._extract_path_prefix(path),
                'path_suffix': self._extract_path_suffix(path),
                'contains_numbers': bool(re.search(r'\d+', path)),
                'path_structure': self._analyze_path_structure(path)
            }
            return patterns
        except Exception as e:
            print(f"处理路径时出错: {path}, 错误: {e}")
            return {}
    def _extract_api_version(self, path):
        """提取API版本信息"""
        # 匹配 v1, v2, v3 等版本号
        version_match = re.search(r'/v(\d+)(?:/|$)', path.lower())
        if version_match:
            return f"v{version_match.group(1)}"
        return 'no_version'
    def _classify_path_resource(self, path):
        """根据路径分类资源类型"""
        path_lower = path.lower()
        if '/api/' in path_lower:
            # 进一步分类 API 类型
            if any(keyword in path_lower for keyword in ['poi', 'shop', 'store']):
                return 'poi_api'
            elif any(keyword in path_lower for keyword in ['user', 'account', 'profile']):
                return 'user_api'
            elif any(keyword in path_lower for keyword in ['order', 'trade', 'pay']):
                return 'trade_api'
            elif any(keyword in path_lower for keyword in ['search', 'query']):
                return 'search_api'
            else:
                return 'general_api'
        elif any(keyword in path_lower for keyword in ['admin', 'manage', 'dashboard']):
            return 'admin_page'
        elif any(keyword in path_lower for keyword in ['static', 'assets', 'resource']):
            return 'static_resource'
        else:
            return 'general_page'
    def _generalize_path(self, path):
        """将路径中的变量部分替换为占位符"""
        # 替换数字为 {num}
        generalized = re.sub(r'\b\d+\b', '{num}', path)
        # 替换 UUID 格式为 {uuid}
        generalized = re.sub(r'\b[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\b', '{uuid}', generalized)
        # 替换长字符串 ID 为 {id}(长度大于 8 的字母数字组合)
        generalized = re.sub(r'\b[a-zA-Z0-9]{8,}\b', '{id}', generalized)
        # 替换时间戳格式
        generalized = re.sub(r'\b\d{10,13}\b', '{timestamp}', generalized)
        return generalized
    def _extract_path_prefix(self, path, depth=2):
        """提取路径前缀(前N层)"""
        segments = [seg for seg in path.split('/') if seg]
        if len(segments) >= depth:
            return '/' + '/'.join(segments[:depth])
        return path
    def _extract_path_suffix(self, path):
        """提取路径后缀(最后一段)"""
        segments = [seg for seg in path.split('/') if seg]
        return segments[-1] if segments else ''
    def _analyze_path_structure(self, path):
        """分析路径结构模式"""
        segments = [seg for seg in path.split('/') if seg]
        # 创建结构模式:字母段用 A 表示,数字段用 N 表示,混合段用 M 表示
        structure = []
        for seg in segments:
            if seg.isdigit():
                structure.append('N')
            elif seg.isalpha():
                structure.append('A')
            else:
                structure.append('M')
        return '-'.join(structure) if structure else 'empty'
    def _process_paths_batch(self, paths_batch):
        """批量处理路径模式提取"""
        patterns_list = []
        for path in paths_batch:
            patterns = self.extract_path_patterns(path)
            if patterns:
                patterns_list.append(patterns)
        return patterns_list
    def analyze_white_patterns_parallel(self):
        """并行分析白样本的路径模式"""
        # 筛选白样本数据
        white_samples = self.df[self.df['data_label'].isin(self.white_labels)]
        if white_samples.empty:
            print("未找到白样本数据")
            return None
        print(f"白样本数量: {len(white_samples)}")
        # 将路径分批处理
        paths = white_samples['url_path'].dropna().tolist()
        batch_size = max(1, len(paths) // (self.n_workers * 4))  # 每个线程处理多个批次
        path_batches = [paths[i:i + batch_size] for i in range(0, len(paths), batch_size)]
        print(f"分成 {len(path_batches)} 个批次进行并行处理")
        # 并行处理
        all_patterns = []
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            # 使用 tqdm 显示进度
            futures = [executor.submit(self._process_paths_batch, batch) for batch in path_batches]
            for future in tqdm(futures, desc="处理路径模式"):
                batch_patterns = future.result()
                all_patterns.extend(batch_patterns)
        if not all_patterns:
            print("未能提取到有效的路径模式")
            return None
        print(f"成功提取 {len(all_patterns)} 个路径模式")
        # 统计各种模式
        results = {
            'generalized_paths': Counter([p['generalized_path'] for p in all_patterns if p.get('generalized_path')]),
            'path_prefixes': Counter([p['path_prefix'] for p in all_patterns if p.get('path_prefix')]),
            'resource_types': Counter([p['resource_type'] for p in all_patterns if p.get('resource_type')]),
            'api_versions': Counter([p['api_version'] for p in all_patterns if p.get('api_version')]),
            'path_structures': Counter([p['path_structure'] for p in all_patterns if p.get('path_structure')]),
            'segments_distribution': Counter([p['segments_count'] for p in all_patterns if p.get('segments_count')]),
            'depth_distribution': Counter([p['depth'] for p in all_patterns if p.get('depth')])
        }
        return results
    def _extract_patterns_from_paths(self, paths):
        """从路径列表中提取泛化模式"""
        patterns = []
        for path in paths:
            pattern_info = self.extract_path_patterns(path)
            if pattern_info.get('generalized_path'):
                patterns.append(pattern_info['generalized_path'])
        return patterns
    def find_pure_white_patterns_parallel(self, min_frequency=3, purity_threshold=1.0):
        """并行找出只对应白样本的纯净路径模式"""
        white_samples = self.df[self.df['data_label'].isin(self.white_labels)]
        non_white_samples = self.df[~self.df['data_label'].isin(self.white_labels)]
        print(f"白样本数量: {len(white_samples)}, 非白样本数量: {len(non_white_samples)}")
        # 准备数据
        white_paths = white_samples['url_path'].dropna().tolist()
        non_white_paths = non_white_samples['url_path'].dropna().tolist()
        # 分批处理
        white_batch_size = max(1, len(white_paths) // (self.n_workers * 2))
        non_white_batch_size = max(1, len(non_white_paths) // (self.n_workers * 2))
        white_batches = [white_paths[i:i + white_batch_size]
                        for i in range(0, len(white_paths), white_batch_size)]
        non_white_batches = [non_white_paths[i:i + non_white_batch_size]
                            for i in range(0, len(non_white_paths), non_white_batch_size)]
        print(f"白样本分成 {len(white_batches)} 个批次,非白样本分成 {len(non_white_batches)} 个批次")
        # 并行提取泛化路径模式
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            print("并行处理白样本...")
            white_futures = [executor.submit(self._extract_patterns_from_paths, batch)
                           for batch in white_batches]
            print("并行处理非白样本...")
            non_white_futures = [executor.submit(self._extract_patterns_from_paths, batch)
                               for batch in non_white_batches]
            # 收集白样本结果
            white_generalized_paths = []
            for future in tqdm(white_futures, desc="收集白样本模式"):
                white_generalized_paths.extend(future.result())
            # 收集非白样本结果
            non_white_generalized_paths = []
            for future in tqdm(non_white_futures, desc="收集非白样本模式"):
                non_white_generalized_paths.extend(future.result())
        print(f"白样本模式数量: {len(white_generalized_paths)}, 非白样本模式数量: {len(non_white_generalized_paths)}")
        # 统计模式频次
        white_pattern_counts = Counter(white_generalized_paths)
        non_white_pattern_counts = Counter(non_white_generalized_paths)
        # 找出纯净模式
        pure_patterns = []
        for pattern, white_count in white_pattern_counts.items():
            if white_count >= min_frequency:
                non_white_count = non_white_pattern_counts.get(pattern, 0)
                total_count = white_count + non_white_count
                purity = white_count / total_count if total_count > 0 else 0
                if purity >= purity_threshold:
                    pure_patterns.append({
                        'pattern': pattern,
                        'white_count': white_count,
                        'non_white_count': non_white_count,
                        'total_count': total_count,
                        'purity': purity,
                        'type': 'generalized_path'
                    })
        # 并行处理路径前缀模式
        print("并行处理路径前缀模式...")
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            white_prefix_futures = [executor.submit(self._extract_prefixes_from_paths, batch)
                                  for batch in white_batches]
            non_white_prefix_futures = [executor.submit(self._extract_prefixes_from_paths, batch)
                                      for batch in non_white_batches]
            # 收集前缀结果
            white_prefixes = []
            for future in tqdm(white_prefix_futures, desc="收集白样本前缀"):
                white_prefixes.extend(future.result())
            non_white_prefixes = []
            for future in tqdm(non_white_prefix_futures, desc="收集非白样本前缀"):
                non_white_prefixes.extend(future.result())
        white_prefix_counts = Counter([p for p in white_prefixes if p])
        non_white_prefix_counts = Counter([p for p in non_white_prefixes if p])
        # 找出纯净前缀模式
        for prefix, white_count in white_prefix_counts.items():
            if white_count >= min_frequency:
                non_white_count = non_white_prefix_counts.get(prefix, 0)
                total_count = white_count + non_white_count
                purity = white_count / total_count if total_count > 0 else 0
                if purity >= purity_threshold:
                    pure_patterns.append({
                        'pattern': prefix,
                        'white_count': white_count,
                        'non_white_count': non_white_count,
                        'total_count': total_count,
                        'purity': purity,
                        'type': 'path_prefix'
                    })
        return sorted(pure_patterns, key=lambda x: (x['purity'], x['white_count']), reverse=True)
    def _extract_prefixes_from_paths(self, paths):
        """从路径列表中提取前缀"""
        prefixes = []
        for path in paths:
            pattern_info = self.extract_path_patterns(path)
            if pattern_info.get('path_prefix'):
                prefixes.append(pattern_info['path_prefix'])
        return prefixes
    def _validate_pattern_batch(self, args):
        """批量验证模式纯净度"""
        pattern, pattern_type, paths_with_labels = args
        matches = []
        for path, label in paths_with_labels:
            if pd.isna(path):
                continue
            if self._pattern_matches(pattern, pattern_type, str(path)):
                matches.append({
                    'path': path,
                    'label': label
                })
        return matches
    def validate_pattern_purity_parallel(self, pattern, pattern_type):
        """并行验证模式的纯净度"""
        white_samples = self.df[self.df['data_label'].isin(self.white_labels)]
        non_white_samples = self.df[~self.df['data_label'].isin(self.white_labels)]
        # 准备数据
        white_data = [(row['url_path'], row['data_label']) for _, row in white_samples.iterrows()]
        non_white_data = [(row['url_path'], row['data_label']) for _, row in non_white_samples.iterrows()]
        # 分批处理
        white_batch_size = max(1, len(white_data) // self.n_workers)
        non_white_batch_size = max(1, len(non_white_data) // self.n_workers)
        white_batches = [white_data[i:i + white_batch_size]
                        for i in range(0, len(white_data), white_batch_size)]
        non_white_batches = [non_white_data[i:i + non_white_batch_size]
                            for i in range(0, len(non_white_data), non_white_batch_size)]
        # 并行验证
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            # 验证白样本
            white_futures = [executor.submit(self._validate_pattern_batch,
                                           (pattern, pattern_type, batch))
                           for batch in white_batches]
            # 验证非白样本
            non_white_futures = [executor.submit(self._validate_pattern_batch,
                                               (pattern, pattern_type, batch))
                               for batch in non_white_batches]
            # 收集结果
            white_matches = []
            for future in white_futures:
                white_matches.extend(future.result())
            non_white_matches = []
            for future in non_white_futures:
                non_white_matches.extend(future.result())
        return {
            'white_matches': white_matches,
            'non_white_matches': non_white_matches,
            'is_pure': len(non_white_matches) == 0,
            'purity': len(white_matches) / (len(white_matches) + len(non_white_matches)) if (len(white_matches) + len(non_white_matches)) > 0 else 0
        }
    def _pattern_matches(self, pattern, pattern_type, path):
        """检查路径是否匹配模式"""
        if pattern_type == 'generalized_path':
            generalized = self._generalize_path(path)
            return generalized == pattern
        elif pattern_type == 'path_prefix':
            return path.startswith(pattern)
        return False
    def generate_pure_filter_rules(self, patterns):
        """生成纯净的过滤规则"""
        rules = []
        for pattern_info in patterns:
            pattern = pattern_info['pattern']
            pattern_type = pattern_info['type']
            purity = pattern_info['purity']
            class_id = pattern_info.get('class_id')
            class_name = pattern_info.get('class_name')
            if pattern_type == 'generalized_path':
                # 将泛化模式转换为正则表达式
                regex_pattern = re.escape(pattern)
                regex_pattern = regex_pattern.replace(r'\{num\}', r'\d+')
                regex_pattern = regex_pattern.replace(r'\{uuid\}', r'[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}')
                regex_pattern = regex_pattern.replace(r'\{id\}', r'[a-zA-Z0-9]+')
                regex_pattern = regex_pattern.replace(r'\{timestamp\}', r'\d{10,13}')
                regex_pattern = '^' + regex_pattern + '$'
                rules.append({
                    'type': 'path_regex',
                    'pattern': regex_pattern,
                    'purity': purity,
                    'description': f'纯净泛化路径模式: {pattern}',
                    'original_pattern': pattern,
                    'target_count': pattern_info.get('target_count', pattern_info.get('white_count', 0)),
                    'other_count': pattern_info.get('other_count', pattern_info.get('non_white_count', 0)),
                    'class_id': class_id,
                    'class_name': class_name,
                    'data_label': class_name[0] if class_name else None,
                    'data_source': class_name[1] if class_name else None
                })
            elif pattern_type == 'path_prefix':
                rules.append({
                    'type': 'path_prefix',
                    'pattern': pattern,
                    'purity': purity,
                    'description': f'纯净路径前缀: {pattern}',
                    'original_pattern': pattern,
                    'target_count': pattern_info.get('target_count', pattern_info.get('white_count', 0)),
                    'other_count': pattern_info.get('other_count', pattern_info.get('non_white_count', 0)),
                    'class_id': class_id,
                    'class_name': class_name,
                    'data_label': class_name[0] if class_name else None,
                    'data_source': class_name[1] if class_name else None
                })
        return rules
# 使用修改后的分析器
print("初始化多线程分析器...")
analyzer = URLPathPatternAnalyzer(metadata, n_workers=58)
# 为所有类别生成纯净过滤规则
print("\n=== 为所有类别生成纯净过滤规则 ===")
all_class_rules = analyzer.generate_all_class_filter_rules(min_frequency=3, purity_threshold=1)
# 收集所有规则并排序
all_rules = []
for class_id, rule_info in all_class_rules.items():
    all_rules.extend(rule_info['rules'])
# 按纯度和目标样本数排序,取前 60 条
top_rules = sorted(all_rules, key=lambda x: (x['purity'], x['target_count']), reverse=True)[:60]
# 准备写入 JSON 的数据
json_data = {
    'metadata': {
        'total_rules_generated': len(all_rules),
        'top_rules_count': len(top_rules),
        'generation_timestamp': pd.Timestamp.now().isoformat(),
        'min_frequency': 3,
        'purity_threshold': 1
    },
    'class_mapping': {
        str(k): {
            'data_label': v[0],
            'data_source': v[1]
        } for k, v in analyzer.class_names.items()
    },
    'top_rules': []
}
# 添加规则到 JSON 数据
for i, rule in enumerate(top_rules, 1):
    json_rule = {
        'rank': i,
        'type': rule['type'],
        'pattern': rule['pattern'],
        'original_pattern': rule['original_pattern'],
        'description': rule['description'],
        'purity': round(rule['purity'], 4),
        'target_count': rule['target_count'],
        'other_count': rule['other_count'],
        'class_id': rule['class_id'],
        'data_label': rule['data_label'],
        'data_source': rule['data_source']
    }
    json_data['top_rules'].append(json_rule)
# 写入 JSON 文件
with open(output_file, 'w', encoding='utf-8') as f:
    json.dump(json_data, f, ensure_ascii=False, indent=2)
print(f"\n前60条过滤规则已写入文件: {output_file}")
# 输出每个类别的规则统计
for class_id, rule_info in all_class_rules.items():
    print(f"\n{'='*60}")
    print(f"类别 {class_id}: {rule_info['class_name']}")
    print(f"{'='*60}")
    if rule_info['rules']:
        print(f"发现 {len(rule_info['patterns'])} 个纯净模式,生成 {len(rule_info['rules'])} 条过滤规则:")
        for i, rule in enumerate(rule_info['rules'][:5], 1):  # 显示前 5 条规则
            print(f"\n{i}. {rule['description']}")
            print(f"   类型: {rule['type']}")
            print(f"   模式: {rule['pattern']}")
            print(f"   纯度: {rule['purity']:.3f}")
            print(f"   目标类别样本数: {rule['target_count']}")
            print(f"   其他类别样本数: {rule['other_count']}")
            print(f"   所属类别: {rule['data_label']} - {rule['data_source']}")
    else:
        print("未找到符合条件的纯净模式")
# 生成规则统计报告
print(f"\n{'='*60}")
print("规则生成统计报告")
print(f"{'='*60}")
total_rules = 0
class_rule_stats = {}
for class_id, rule_info in all_class_rules.items():
    rule_count = len(rule_info['rules'])
    total_rules += rule_count
    class_rule_stats[class_id] = {
        'count': rule_count,
        'data_label': rule_info['class_name'][0],
        'data_source': rule_info['class_name'][1]
    }
    print(f"类别 {class_id} ({rule_info['class_name'][0]}, {rule_info['class_name'][1]}): {rule_count} 条规则")
print(f"\n总计生成 {total_rules} 条过滤规则")
print(f"前60条规则已保存到: {output_file}")
# 显示前 60 条规则的类别分布
print(f"\n{'='*60}")
print("前60条规则的类别分布")
print(f"{'='*60}")
top_rules_by_class = {}
for rule in top_rules:
    class_key = f"{rule['data_label']} - {rule['data_source']}"
    if class_key not in top_rules_by_class:
        top_rules_by_class[class_key] = 0
    top_rules_by_class[class_key] += 1
for class_key, count in sorted(top_rules_by_class.items(), key=lambda x: x[1], reverse=True):
    print(f"{class_key}: {count} 条规则")
print("所有类别的纯净过滤规则生成完成!")

# Power Unit: a random forest machine learning model

# Feature extraction

For a single, discrete HTTP flow, the core inputs are the two HTTP messages, one request and one response. From the content of those two messages plus auxiliary features such as the HTTP request handling time, you can fully quantify how threatening a flow is.

def extract_features(row):
    """提取特征函数"""
    domain = row['domain'] if pd.notna(row['domain']) else 'null'
    method = row['method'] if pd.notna(row['method']) else 'null'
    decoded_url = unquote(row['url']) if pd.notna(row['url']) else 'null'
    decoded_request_body = unquote(row['request_body']) if row['request_body_length'] != 0 and pd.notna(
        row['request_body']) else 'null'
    decoded_request_head = unquote(row['request_head']) if not isinstance(row['request_head'], float) and pd.notna(
        row['request_head']) else 'null'
    handle_time = row['request_time'] if pd.notna(row['request_time']) else 0
    UA = row['user_agent'] if pd.notna(row['user_agent']) else 'null'
    response_head = unquote(row['response_head']) if not isinstance(row['response_head'], float) and pd.notna(
        row['response_head']) else 'null'
    response_body = unquote(row['response_body']) if not isinstance(row['response_body'], float) and pd.notna(
        row['response_body']) else 'null'
    status_code = row['status'] if pd.notna(row['status']) else 0
    referer = unquote(row['referer']) if not isinstance(row['referer'], float) and pd.notna(row['referer']) else 'null'
    extra_map = row['extra_map'] if pd.notna(row['extra_map']) else 'null'

Once these fields are extracted, they need to be vectorized in a sensible way.

Numerical features are standardized and appended to the vector directly.

# 数值特征
numerical_features = ['handle_time', 'status_code']
# 处理数值特征
print('处理数值')
scaler = StandardScaler()
numerical_data = scaler.fit_transform(feature_df[numerical_features].fillna(0))

Enum-like values such as the domain and the HTTP method are vectorized with a plain LabelEncoder.

le_domain = LabelEncoder()
le_method = LabelEncoder()
domain_encoded = le_domain.fit_transform(feature_df['domain'].astype(str))
method_encoded = le_method.fit_transform(feature_df['method'].astype(str))

All the remaining text gets thrown at TF-IDF wholesale.

# 处理文本特征 - 使用 TF-IDF
# 合并所有文本特征为一个字符串
print('处理文本')
combined_text = []
for idx, row in feature_df.iterrows():
    text_parts = []
    for col in text_features:
        if row[col] != 'null':
            text_parts.append(str(row[col]))
    combined_text.append(' '.join(text_parts))
# TF-IDF 向量化
tfidf = TfidfVectorizer(max_features=1000, stop_words='english',
                        ngram_range=(1, 2), min_df=2)
text_features_tfidf = tfidf.fit_transform(combined_text).toarray()

URLs get dedicated handling: path depth, counts of sensitive keywords, counts of special characters, and so on.

url_features = []
for url in feature_df['url']:
    url_str = str(url)
    url_feat = {
        'url_length': len(url_str),
        'param_count': url_str.count('&'),
        'has_suspicious_chars': int(any(char in url_str for char in ['<', '>', '"', "'", 'script', 'alert'])),
        'path_depth': url_str.count('/'),
        'query_length': len(url_str.split('?')[1]) if '?' in url_str else 0
    }
    url_features.append(list(url_feat.values()))
url_features = np.array(url_features)

Finally, everything is stacked into a single feature matrix and fed to the random forest for training.

# Random forest

The model and parameters I settled on:

'RandomForest3': RandomForestClassifier(n_estimators=800, n_jobs=-1,criterion='entropy')

It was chosen by racing it against XGBoost, a transformer, and a parameter search space.
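
The race itself is just a loop over candidate models. Roughly what it looked like, assuming X and y are the feature matrix and labels built by the code below (the cross-validation settings are illustrative, not the exact ones used):

from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

# Candidate zoo; the XGBoost / transformer entries are omitted here, this only shows the shape of the race
candidates = {
    'RandomForest3': RandomForestClassifier(n_estimators=800, n_jobs=-1, criterion='entropy'),
    'RandomForest4': RandomForestClassifier(n_estimators=500, n_jobs=-1, criterion='entropy'),
}
for name, model in candidates.items():
    scores = cross_val_score(model, X, y, cv=3, scoring='f1_macro', n_jobs=-1)
    print(f"{name}: mean f1_macro = {scores.mean():.4f}")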

Final code:

import time
from urllib.parse import unquote
import numpy as np
import pandas as pd
import tqdm
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
# 读取数据
metadata = pd.concat([metadata1, metadata2, metadata3], ignore_index=True)
print(metadata.head())
# 修复标签赋值问题 - 基于 data_label 和 data_source 的组合
# 创建标签映射字典
label_mapping = {
    ('白样本', '基础安全'): 0,
    ('白样本', '数据安全'): 0,
    ('白样本', '内容安全'): 0,
    ('白样本', '账号安全'): 0,
    ('正常', '交易安全'): 0,
    ('正常流量', '反爬'): 0,
    ('刷单', '交易安全'): 1,
    ('数据安全-API数据恶意拉取', '数据安全'): 2,
    ('数据安全-未授权敏感数据查询', '数据安全'): 3,
    ('数据安全-绕过基线防护', '数据安全'): 4,
    ('爬虫流量', '反爬'): 5,
    ('虚假账号', '账号安全'): 6,
    ('账号囤积', '账号安全'): 7,
    ('黄牛', '交易安全'): 8,
    ('黑样本', '内容安全'): 9,
    ('黑样本', '基础安全'): 10,
}
# 首先处理白样本和正常流量,设置为 0
# white_combinations = [
#     (' 白样本 ', ' 基础安全 '),
#     (' 白样本 ', ' 数据安全 '),
#     (' 白样本 ', ' 内容安全 '),
#     (' 白样本 ', ' 账号安全 '),
#     (' 正常 ', ' 交易安全 '),
#     (' 正常流量 ', ' 反爬 ')
# ]
#
# for combination in white_combinations:
#     label_mapping[combination] = 0
#
# # 获取所有其他标签组合并分配递增的数字
# unique_combinations = metadata[['data_label', 'data_source']].drop_duplicates()
# other_combinations = []
#
# for _, row in unique_combinations.iterrows():
#     combination = (row['data_label'], row['data_source'])
#     if combination not in white_combinations and pd.notna(row['data_label']) and pd.notna(row['data_source']):
#         other_combinations.append(combination)
#
# # 按组合排序确保一致性
# other_combinations = sorted(other_combinations)
#
# for combination in other_combinations:
#     label_mapping[combination] = label_counter
#     label_counter += 1
print("标签映射关系 (data_label, data_source -> 编码):")
for combination, code in label_mapping.items():
    print(f"{combination}: {code}")
# 应用标签映射
def assign_label(row):
    combination = (row['data_label'], row['data_source'])
    return label_mapping.get(combination, -1)  # 未知组合设为 - 1
metadata['act_label'] = metadata.apply(assign_label, axis=1)
print(f"\n标签分布:")
print(metadata['act_label'].value_counts().sort_index())
print(f"\n详细标签分布:")
label_distribution = metadata.groupby(['data_label', 'data_source', 'act_label']).size().reset_index(name='count')
print(label_distribution)
def extract_features(row):
    """提取特征函数"""
    domain = row['domain'] if pd.notna(row['domain']) else 'null'
    method = row['method'] if pd.notna(row['method']) else 'null'
    decoded_url = unquote(row['url']) if pd.notna(row['url']) else 'null'
    decoded_request_body = unquote(row['request_body']) if row['request_body_length'] != 0 and pd.notna(
        row['request_body']) else 'null'
    decoded_request_head = unquote(row['request_head']) if not isinstance(row['request_head'], float) and pd.notna(
        row['request_head']) else 'null'
    handle_time = row['request_time'] if pd.notna(row['request_time']) else 0
    UA = row['user_agent'] if pd.notna(row['user_agent']) else 'null'
    response_head = unquote(row['response_head']) if not isinstance(row['response_head'], float) and pd.notna(
        row['response_head']) else 'null'
    response_body = unquote(row['response_body']) if not isinstance(row['response_body'], float) and pd.notna(
        row['response_body']) else 'null'
    status_code = row['status'] if pd.notna(row['status']) else 0
    referer = unquote(row['referer']) if not isinstance(row['referer'], float) and pd.notna(row['referer']) else 'null'
    extra_map = row['extra_map'] if pd.notna(row['extra_map']) else 'null'
    return {
        'domain': domain,
        'method': method,
        'url': decoded_url,
        'request_body': decoded_request_body,
        'request_head': decoded_request_head,
        'handle_time': handle_time,
        'user_agent': UA,
        'response_head': response_head,
        'response_body': response_body,
        'status_code': status_code,
        'referer': referer,
        'extra_map': extra_map,
    }
def create_feature_vectors(df):
    """创建特征向量"""
    features = []
    # 提取所有特征
    for idx, row in tqdm.tqdm(df.iterrows()):
        feature_dict = extract_features(row)
        features.append(feature_dict)
    feature_df = pd.DataFrame(features)
    # 数值特征
    numerical_features = ['handle_time', 'status_code']
    # 文本特征列表
    text_features = ['domain', 'method', 'url', 'request_body', 'request_head',
                     'user_agent', 'response_head', 'response_body', 'referer','extra_map']
    # 处理数值特征
    print('处理数值')
    scaler = StandardScaler()
    numerical_data = scaler.fit_transform(feature_df[numerical_features].fillna(0))
    # 处理分类特征(domain, method, status_code)
    print('处理分类')
    le_domain = LabelEncoder()
    le_method = LabelEncoder()
    domain_encoded = le_domain.fit_transform(feature_df['domain'].astype(str))
    method_encoded = le_method.fit_transform(feature_df['method'].astype(str))
    # 处理文本特征 - 使用 TF-IDF
    # 合并所有文本特征为一个字符串
    print('处理文本')
    combined_text = []
    for idx, row in feature_df.iterrows():
        text_parts = []
        for col in text_features:
            if row[col] != 'null':
                text_parts.append(str(row[col]))
        combined_text.append(' '.join(text_parts))
    # TF-IDF 向量化
    tfidf = TfidfVectorizer(max_features=1000, stop_words='english',
                            ngram_range=(1, 2), min_df=2)
    text_features_tfidf = tfidf.fit_transform(combined_text).toarray()
    # URL 特征工程
    print('处理url')
    url_features = []
    for url in feature_df['url']:
        url_str = str(url)
        url_feat = {
            'url_length': len(url_str),
            'param_count': url_str.count('&'),
            'has_suspicious_chars': int(any(char in url_str for char in ['<', '>', '"', "'", 'script', 'alert'])),
            'path_depth': url_str.count('/'),
            'query_length': len(url_str.split('?')[1]) if '?' in url_str else 0
        }
        url_features.append(list(url_feat.values()))
    url_features = np.array(url_features)
    # 合并所有特征
    final_features = np.hstack([
        numerical_data,
        domain_encoded.reshape(-1, 1),
        method_encoded.reshape(-1, 1),
        text_features_tfidf,
        url_features
    ])
    return final_features, (scaler, le_domain, le_method, tfidf)
# 创建特征向量
print("正在提取特征...")
X, encoders = create_feature_vectors(metadata)
import pickle
f = open(f'encoder-{time.time()}.pkl', 'wb')
pickle.dump(encoders, f)
f.close()
y = metadata['act_label'].values
print(f"特征矩阵形状: {X.shape}")
print(f"标签分布: {np.bincount(y)}")
# 划分训练集和测试集
# X_train, X_test, y_train, y_test = train_test_split(
#     X, y, test_size=0, stratify=y
# )
X_train, y_train = X, y
# 训练多个模型
models = {
    'RandomForest3': RandomForestClassifier(n_estimators=800, n_jobs=-1,criterion='entropy'),
    # 'RandomForest4': RandomForestClassifier(n_estimators=500, n_jobs=-1, criterion='entropy'),
    # 'RandomForest5': RandomForestClassifier(n_estimators=800, n_jobs=-1, criterion='entropy'),
    # 'XGBoost': XGBClassifier(
    #     n_estimators=100,
    #     learning_rate=0.1,
    #     max_depth=6,
    #     min_child_weight=1,
    #     n_jobs=-1
    # )
}
print("\n开始训练模型...")
best_model = None
best_score = 0
best_name = ""
for name, model in models.items():
    print(f"\n训练 {name}...")
    model.fit(X_train, y_train)
    # 预测
    # y_pred = model.predict(X_test)
    # accuracy = accuracy_score(y_test, y_pred)
    #
    # print (f"{name} 准确率: {accuracy:.4f}")
    # print (f"{name} 分类报告:")
    # print(classification_report(y_test, y_pred))
    #
    # if accuracy > best_score:
    #     best_score = accuracy
    best_model = model
    best_name = name
# print (f"\n 最佳模型: {best_name}, 准确率: {best_score:.4f}")
# 保存最佳模型
import pickle
f = open(f'model-{time.time()}.pkl', 'wb')
pickle.dump(best_model, f)
f.close()
# 特征重要性分析(如果是随机森林)
if best_name.startswith('RandomForest'):
    feature_importance = best_model.feature_importances_
    print(f"\n前10个重要特征的重要性:")
    top_indices = np.argsort(feature_importance)[-10:][::-1]
    for i, idx in enumerate(top_indices):
        print(f"特征 {idx}: {feature_importance[idx]:.4f}")

All in all, the system ends up with an F1 score of 0.97 and an average response time of 4.2 ms, which is basically usable in production.

That's the general idea. For post-processing I added an LLM that runs offline, multi-threaded packet analysis to explain why each flow was classified the way it was, which makes later operations work easier; a rough sketch follows.
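
The explanation pass is essentially just fanning flows out to an LLM with a thread pool. A minimal sketch, assuming some generic chat-completion client wrapped as ask_llm (a hypothetical helper, not any specific library's API):

from concurrent.futures import ThreadPoolExecutor

def explain(flow, predicted_label):
    # ask_llm is a placeholder for whatever offline chat-completion client is available
    prompt = (f"The request below was classified as label {predicted_label}. "
              f"Explain in one or two sentences which fields support that verdict.\n"
              f"URL: {flow['url']}\nUA: {flow['user_agent']}\nBody: {str(flow['request_body'])[:500]}")
    return ask_llm(prompt)

def explain_all(flows_with_labels, n_workers=16):
    # Offline pass, so per-call latency does not matter; only throughput does
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        return list(pool.map(lambda item: explain(*item), flows_with_labels))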