我们面临的第一个问题不是模型,而是数据。具体来说,是为特定垂直领域的深度学习模型持续、自动化地获取高质量、动态渲染的网页数据。传统的爬虫库如 Scrapy 在面对重度依赖JavaScript的现代单页应用(SPA)时显得力不从心,而手动收集和标注数据的成本则高到无法接受。我们需要的是一个能自我驱动、自我优化的数据“永动机”。
这个系统的核心构想是一个闭环:
- 采集 (Acquisition): 使用无头浏览器技术深度抓取网页的最终渲染DOM和网络流量。
- 存储 (Storage): 将海量的原始数据和提取的特征存入一个可水平扩展的数据库。
- 学习 (Learning): 利用一个模型对采集到的数据进行分析和价值评估。
- 反馈 (Feedback): 将模型的评估结果反馈给采集模块,用以指导下一轮的采集目标,优先抓取高价值页面。
整个流程必须是自动化的,并且由一个中心化的、可交互的环境进行调度和监控。
架构选型与权衡
在真实项目中,技术选型从来不是追求“最好”,而是“最合适”。
graph TD subgraph Jupyter Orchestration A[Jupyter Notebook] end subgraph Acquisition Module B(Puppeteer Cluster) end subgraph Storage & Data Bus C[HBase] D((URL Queue in HBase)) end subgraph Learning & Inference Module E(PyTorch Training) F(PyTorch Inference) end A -- "1. Trigger Scrape Batch" --> B B -- "2. Scrape Pages from Queue" --> D B -- "3. Store Raw HTML/Content" --> C A -- "4. Trigger Training Job" --> E E -- "5. Read Data" --> C E -- "6. Save Trained Model" --> C A -- "7. Trigger Inference/Scoring" --> F F -- "8. Score New URLs" --> D
采集端: Puppeteer
- 为什么是Puppeteer? 相较于Selenium,Puppeteer提供了更底层的Chrome DevTools Protocol (CDP) 接口,能精细控制浏览器行为,比如拦截请求、模拟设备、追踪性能。这对于反爬和提取动态注入的数据至关重要。虽然它与Node.js绑定,但我们可以通过Python的
pyppeteer
库或将其封装为服务来调用。在我们的场景中,我们选择将其封装成一个独立的Node.js微服务,Python通过API调用,实现技术栈解耦。 - 坑点: 资源消耗。每个Puppeteer实例都是一个完整的Chrome浏览器,内存和CPU占用巨大。必须实现一个有效的池化和调度机制来管理浏览器实例,否则服务器会迅速崩溃。
- 为什么是Puppeteer? 相较于Selenium,Puppeteer提供了更底层的Chrome DevTools Protocol (CDP) 接口,能精细控制浏览器行为,比如拦截请求、模拟设备、追踪性能。这对于反爬和提取动态注入的数据至关重要。虽然它与Node.js绑定,但我们可以通过Python的
存储层: HBase
- 为什么是HBase? 我们要存储的数据类型多样:原始HTML文本(可能很大)、提取的结构化文本、图片URL、模型embedding向量、页面元数据等。HBase的宽表模型对此非常友好,可以动态增删列。更重要的是,它的横向扩展能力和基于行键(Row Key)的高性能读写,完美契合我们海量数据写入和按URL快速检索的需求。
- 设计决策: Row Key的设计是HBase性能的关键。我们采用
reversed_domain#timestamp_ms#url_hash
的结构。反转域名(com.example.www
)可以避免热点问题,时间戳用于时序分析,URL哈希则确保唯一性。
学习核心: PyTorch 与 TensorFlow 的考量
- 为什么训练用PyTorch? PyTorch的动态计算图和其高度Pythonic的API,在研究和原型验证阶段提供了极大的灵活性。在Jupyter中,我们可以逐行调试模型和数据处理流程,这对于快速迭代至关重要。我们的模型需要处理非结构化文本,使用Hugging Face的Transformers库(基于PyTorch)是自然的选择。
- TensorFlow的角色? 在真实生产环境中,模型的服务化部署是另一个话题。TensorFlow Serving (TFS) 在这方面非常成熟,提供高吞吐、低延迟的gRPC接口,并且支持模型版本管理和热更新。一个常见的生产模式是:用PyTorch完成训练,然后将模型转换为ONNX格式,再由TensorFlow Serving加载并提供服务。 这样既享受了PyTorch的研发便利,又获得了TFS的部署优势。本文的重点在闭环系统本身,因此主要实现PyTorch的训练和推理部分。
调度中心: Jupyter
- 为什么是Jupyter? 对于这个系统的早期和迭代阶段,Jupyter是完美的“胶水”。它不是一个生产级的调度器如Airflow,但它提供了无与伦比的交互性。我们可以手动触发各个阶段,检查中间结果,可视化数据分布,调试模型输出,然后再将稳定的代码固化成脚本。它充当了我们的大脑和控制台。
核心实现:代码与解析
1. Puppeteer 采集服务 (Node.js)
我们将Puppeteer封装成一个简单的Express.js服务。它接收一个URL,返回页面内容。在生产环境中,这会是一个更复杂的队列消费者。
// scraper-service/index.js
const express = require('express');
const puppeteer = require('puppeteer');
const pino = require('pino');
const app = express();
const port = 3000;
const logger = pino();
app.use(express.json());
// 浏览器实例池(简化版)
// 在生产中,你会使用 puppeteer-cluster 或类似的库
let browserInstance;
async function getBrowser() {
if (!browserInstance) {
logger.info('Launching new browser instance...');
browserInstance = await puppeteer.launch({
headless: true,
args: [
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-accelerated-2d-canvas',
'--no-first-run',
'--no-zygote',
'--single-process', // a controversial choice for memory, but helps in some container environments
'--disable-gpu'
]
});
}
return browserInstance;
}
app.post('/scrape', async (req, res) => {
const { url } = req.body;
if (!url) {
return res.status(400).json({ error: 'URL is required' });
}
let page;
try {
const browser = await getBrowser();
page = await browser.newPage();
// 基础的反爬措施
await page.setUserAgent('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36');
await page.setViewport({ width: 1920, height: 1080 });
logger.info({ url }, `Navigating to URL`);
await page.goto(url, { waitUntil: 'networkidle2', timeout: 30000 });
const content = await page.content();
const title = await page.title();
// 也可以在这里提取更结构化的数据
// const data = await page.evaluate(() => { ... });
logger.info({ url, title }, `Successfully scraped`);
res.json({ url, title, content });
} catch (error) {
logger.error({ url, error: error.message }, 'Scraping failed');
res.status(500).json({ error: 'Failed to scrape URL', details: error.message });
} finally {
if (page) {
await page.close();
}
}
});
app.listen(port, () => {
logger.info(`Scraper service listening at http://localhost:${port}`);
});
// 优雅地关闭浏览器
process.on('SIGINT', async () => {
if (browserInstance) {
await browserInstance.close();
}
process.exit();
});
代码解析:
- 配置与日志: 使用
pino
进行结构化日志记录,这在排查问题时至关重要。 - 浏览器管理: 我们实现了一个非常简化的单例浏览器实例。在真实项目中,
puppeteer-cluster
是更好的选择,它能管理一个浏览器实例池,处理任务队列、重试和错误。 - 反爬策略: 设置了常见的User-Agent和Viewport。更高级的策略可能包括使用代理、处理验证码(集成第三方服务)、模拟人类行为(随机延迟、鼠标移动)等。
- 错误处理:
try...catch...finally
结构确保即使页面加载失败,page实例也会被关闭,防止内存泄漏。这是一个常见的坑。
2. HBase数据存取层 (Python)
我们使用happybase
库在Python中与HBase交互。
# hbase_connector.py
import happybase
import hashlib
import time
from urllib.parse import urlparse
import logging
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class HBaseConnector:
def __init__(self, host='localhost', port=9090, table_name='web_data'):
"""
初始化HBase连接。
在生产环境中,host, port等应来自配置文件。
"""
try:
self.connection = happybase.Connection(host, port, timeout=10000)
self.table_name = table_name
self.connection.open()
if table_name.encode() not in self.connection.tables():
self.create_table()
self.table = self.connection.table(self.table_name)
logging.info(f"Successfully connected to HBase table '{self.table_name}'")
except Exception as e:
logging.error(f"Failed to connect to HBase: {e}")
raise
def create_table(self):
"""
创建HBase表,定义列族。
这是一个一次性操作。
"""
families = {
'raw': dict(), # 存放原始HTML内容
'meta': dict(), # 存放元数据,如标题、抓取时间
'features': dict(), # 存放提取的特征,如文本、embedding
'score': dict() # 存放模型的评分
}
self.connection.create_table(self.table_name, families)
logging.info(f"HBase table '{self.table_name}' created with families: {list(families.keys())}")
def _generate_row_key(self, url):
"""
生成HBase Row Key.
格式: reversed_domain#timestamp_ms#url_hash
"""
parsed_url = urlparse(url)
domain = parsed_url.netloc
reversed_domain = '.'.join(domain.split('.')[::-1])
timestamp_ms = int(time.time() * 1000)
url_hash = hashlib.md5(url.encode('utf-8')).hexdigest()
return f"{reversed_domain}#{timestamp_ms}#{url_hash}"
def write_scraped_data(self, url, title, content):
"""
将抓取的数据写入HBase
"""
row_key = self._generate_row_key(url)
data = {
'raw:content': content.encode('utf-8'),
'meta:title': title.encode('utf-8'),
'meta:url': url.encode('utf-8'),
'meta:scraped_at': str(time.time())
}
try:
self.table.put(row_key, data)
logging.info(f"Wrote data for URL {url} with key {row_key}")
return row_key
except Exception as e:
logging.error(f"Failed to write data for URL {url}: {e}")
return None
def read_batch_for_training(self, limit=1000, start_row=None):
"""
批量读取数据用于模型训练。
"""
# 在真实场景中,扫描可能会很慢,需要优化或使用MapReduce/Spark
scan_options = {'limit': limit}
if start_row:
scan_options['row_start'] = start_row
data = []
try:
# 只获取需要的列族以提升性能
scanner = self.table.scan(columns=['raw:content', 'meta:title'], **scan_options)
for key, row_data in scanner:
decoded_data = {
'row_key': key.decode('utf-8'),
'content': row_data.get(b'raw:content', b'').decode('utf-8', 'ignore'),
'title': row_data.get(b'meta:title', b'').decode('utf-8', 'ignore')
}
data.append(decoded_data)
logging.info(f"Read {len(data)} rows for training.")
return data
except Exception as e:
logging.error(f"Failed to read batch for training: {e}")
return []
def close(self):
if self.connection:
self.connection.close()
logging.info("HBase connection closed.")
# 单元测试思路
# def test_hbase_connector():
# connector = HBaseConnector(table_name='test_web_data')
# # 1. 测试写入
# test_url = "http://example.com/test"
# row_key = connector.write_scraped_data(test_url, "Test Title", "<html>Test</html>")
# assert row_key is not None
# # 2. 测试读取
# data = connector.table.row(row_key)
# assert data[b'meta:url'] == test_url.encode('utf-8')
# # 3. 清理
# connector.connection.delete_table('test_web_data', disable=True)
# connector.close()
代码解析:
- Row Key设计: 这是最重要的部分。
reversed_domain
防止了写入流量集中在少数几个Region Server上。 - 列族 (Column Families): 将不同类型或访问模式的数据分到不同列族 (
raw
,meta
,features
) 是HBase的最佳实践。这有助于优化物理存储和缓存。 - 数据读写:
put
用于写入,scan
用于批量读取。注意,全表扫描 (scan
) 对于非常大的表是昂贵的操作。在生产系统中,通常会结合Spark或MapReduce作业来并行处理HBase中的数据,或者设计更精细的Row Key来支持范围查询。
3. Jupyter 调度与 PyTorch 模型训练
这是将所有部分粘合在一起的Jupyter Notebook。
# In a Jupyter Notebook cell
import requests
import pandas as pd
from bs4 import BeautifulSoup
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from sklearn.model_selection import train_test_split
# --- Cell 1: 初始化连接 ---
from hbase_connector import HBaseConnector
# 确保HBase和Scraper服务正在运行
SCRAPER_API_URL = "http://localhost:3000/scrape"
hbase_conn = HBaseConnector(table_name='web_content_pipeline')
# --- Cell 2: 触发数据采集 ---
# 初始URL种子列表。在闭环系统中,这个列表会被模型输出动态填充
seed_urls = ["https://news.ycombinator.com", "https://github.com/trending", "http://www.example.com"]
for url in seed_urls:
try:
response = requests.post(SCRAPER_API_URL, json={'url': url})
if response.status_code == 200:
data = response.json()
hbase_conn.write_scraped_data(data['url'], data['title'], data['content'])
else:
print(f"Failed to scrape {url}: {response.text}")
except requests.exceptions.RequestException as e:
print(f"Error calling scraper for {url}: {e}")
# --- Cell 3: 加载数据并预处理 ---
# 这是一个简化的示例,假设我们已经有了一些带标签的数据用于初始训练
# 在真实场景中,第一批数据可能需要手动标注
raw_data = hbase_conn.read_batch_for_training(limit=500)
df = pd.DataFrame(raw_data)
# 简单的数据清洗和特征提取
def clean_html(html_content):
if not html_content:
return ""
soup = BeautifulSoup(html_content, 'html.parser')
# 移除脚本和样式
for script_or_style in soup(["script", "style"]):
script_or_style.decompose()
text = soup.get_text()
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
return ' '.join(chunk for chunk in chunks if chunk)
df['text'] = df['content'].apply(clean_html)
# 假设我们有一个'label'列 (0: low-value, 1: high-value)
# 这里我们随机生成标签作为演示
df['label'] = torch.randint(0, 2, (len(df),)).tolist()
# --- Cell 4: 定义PyTorch Dataset ---
class WebContentDataset(Dataset):
def __init__(self, texts, labels, tokenizer, max_len):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_len = max_len
def __len__(self):
return len(self.texts)
def __getitem__(self, item):
text = str(self.texts[item])
label = self.labels[item]
encoding = self.tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=self.max_len,
return_token_type_ids=False,
padding='max_length',
truncation=True,
return_attention_mask=True,
return_tensors='pt',
)
return {
'text': text,
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': torch.tensor(label, dtype=torch.long)
}
# --- Cell 5: 模型训练设置 ---
PRE_TRAINED_MODEL_NAME = 'bert-base-uncased'
MAX_LEN = 256
BATCH_SIZE = 8
EPOCHS = 1
tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)
model = BertForSequenceClassification.from_pretrained(PRE_TRAINED_MODEL_NAME, num_labels=2)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
# 划分数据集
train_texts, val_texts, train_labels, val_labels = train_test_split(
df['text'].tolist(), df['label'].tolist(), test_size=0.1
)
def create_data_loader(texts, labels, tokenizer, max_len, batch_size):
ds = WebContentDataset(texts, labels, tokenizer, max_len)
return DataLoader(ds, batch_size=batch_size, num_workers=2)
train_data_loader = create_data_loader(train_texts, train_labels, tokenizer, MAX_LEN, BATCH_SIZE)
val_data_loader = create_data_loader(val_texts, val_labels, tokenizer, MAX_LEN, BATCH_SIZE)
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
# --- Cell 6: 训练循环 ---
for epoch in range(EPOCHS):
model.train()
for batch in train_data_loader:
input_ids = batch["input_ids"].to(device)
attention_mask = batch["attention_mask"].to(device)
labels = batch["labels"].to(device)
outputs = model(
input_ids=input_ids,
attention_mask=attention_mask,
labels=labels
)
loss = outputs.loss
loss.backward()
optimizer.step()
optimizer.zero_grad()
print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {loss.item()}")
# --- Cell 7: 实现反馈循环 ---
# 保存模型
torch.save(model.state_dict(), 'content_classifier.bin')
# 加载模型
# model.load_state_dict(torch.load('content_classifier.bin'))
def predict_value(text, model, tokenizer, device, max_len=256):
model.eval()
encoding = tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=max_len,
return_token_type_ids=False,
padding='max_length',
truncation=True,
return_attention_mask=True,
return_tensors='pt',
)
input_ids = encoding['input_ids'].to(device)
attention_mask = encoding['attention_mask'].to(device)
with torch.no_grad():
outputs = model(input_ids=input_ids, attention_mask=attention_mask)
_, preds = torch.max(outputs.logits, dim=1)
return "high-value" if preds.item() == 1 else "low-value"
# 假设我们从某个页面发现了一批新的URL
new_urls_to_evaluate = [
# ... 从已抓取页面中提取出的链接 ...
]
# 在这里,我们会抓取这些新URL的标题或摘要,然后用模型打分
# 高分的URL将被放入一个高优先级的队列(在HBase中可以是另一个表或特定的行)
# 从而指导下一轮的Puppeteer抓取
# ... 此处省略具体实现,但逻辑是闭环的关键 ...
# --- Cell 8: 清理连接 ---
hbase_conn.close()
代码解析:
- Pipeline化: Notebook的每个Cell代表了pipeline的一个阶段,清晰明了。
- 数据清洗:
BeautifulSoup
用于从原始HTML中提取纯文本,这是一个非常基础但必要的预处理步骤。 - PyTorch/Transformers: 我们使用了
transformers
库,这是现代NLP任务的事实标准。代码展示了如何定义Dataset
、DataLoader
,以及一个标准的训练循环。 - 闭环逻辑: Cell 7是整个系统的核心思想。虽然代码被简化了,但它清晰地指出了如何使用训练好的模型去评估新的候选目标,并将结果反馈到采集系统中,实现“自优化”。
局限性与未来迭代路径
这个基于Jupyter的系统作为一个原型和迭代框架非常强大,但在直接投入大规模生产时存在明显局限:
- 调度与容错: Jupyter Notebook不是一个健壮的调度器。任何一个Cell的失败都可能导致整个流程中断。生产环境需要迁移到如Airflow, Dagster,或Argo Workflows这样的工作流管理系统,它们提供重试、依赖管理、监控和告警。
- Puppeteer集群管理: 单点的Node.js服务无法应对大规模抓取。需要一个真正的集群管理方案,能够动态增减浏览器实例,并将任务分发到多个节点。
- 数据处理瓶颈: 当数据量达到TB级别时,从HBase中拉取全量数据到单机进行训练是不可行的。届时必须引入分布式计算框架,如Apache Spark,利用其与HBase的连接器,在Spark集群上进行分布式的数据预处理和特征工程。
- 模型服务化: 在Jupyter中进行推理只适用于小批量或离线任务。对于需要实时评估URL价值的场景,应将模型部署到TensorFlow Serving或TorchServe等专用推理服务器上,提供稳定的API接口。
- 反馈循环的复杂性: 一个简单的“高价值/低价值”分类可能导致系统陷入“信息茧房”,反复抓取同类型内容。未来的模型需要更复杂,例如引入探索/利用 (Exploration/Exploitation) 机制,主动抓取一些未知类型的页面以避免局部最优。