openclaw_config.py

OpenClaw is a powerful Python web-scraping framework (formerly ClawPack). Manually adapting it to a target site usually involves the following areas:

Basic configuration adaptation

from openclaw import Claw
# Basic configuration
claw = Claw(
    # Request settings
    timeout=30,
    retry_times=3,
    retry_delay=1.0,
    # Concurrency control
    concurrent_requests=5,
    rate_limit=10,  # requests per second
    # Proxy settings
    proxy={
        'http': 'http://proxy.example.com:8080',
        'https': 'https://proxy.example.com:8080',
    },
    # Request headers
    headers={
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
    }
)
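
A minimal usage sketch of the configured client; Claw.get() and the response.css() selectors mirror the API used in the examples below, and the URL is a placeholder:

response = claw.get('https://example.com/products')
# Extract data with the selector API shown in the parser section
titles = response.css('h1::text').getall()
print(titles)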

Parser adaptation

CSS selector adaptation

class MyParser:
    def parse_product(self, response):
        """解析产品页面"""
        return {
            'title': response.css('h1.product-title::text').get(),
            'price': response.css('.price::text').get(),
            'description': response.css('.description::text').get(),
            'images': response.css('.product-images img::attr(src)').getall(),
            'stock': response.xpath('//div[@class="stock"]/text()').get(),
        }
    def parse_list(self, response):
        """解析列表页面"""
        items = []
        for item in response.css('.product-item'):
            items.append({
                'name': item.css('.name::text').get(),
                'url': item.css('a::attr(href)').get(),
                'price': item.css('.price::text').get(),
            })
        # Pagination handling
        next_page = response.css('.next-page::attr(href)').get()
        return {'items': items, 'next_page': next_page}
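
A hedged sketch of driving MyParser by hand, reusing the claw client from the previous section (the claw.get() call and urljoin-based pagination follow the complete example later in this article):

from urllib.parse import urljoin

parser = MyParser()
all_items = []
url = 'https://example.com/products'
while url:
    response = claw.get(url)
    result = parser.parse_list(response)
    all_items.extend(result['items'])
    # Follow the next_page link until the parser stops finding one
    url = urljoin(url, result['next_page']) if result['next_page'] else None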

Dynamic page adaptation

from openclaw.drivers import SeleniumDriver
class DynamicSiteParser:
    def __init__(self):
        self.driver = SeleniumDriver(
            headless=True,
            proxy="http://proxy:8080",
            user_agent="Custom UA String",
            disable_images=True,  # speeds up page loads
            page_load_timeout=30,
        )
    def parse_with_js(self, url):
        """处理需要 JavaScript 的页面"""
        self.driver.get(url)
        # 等待元素加载
        self.driver.wait_for_element('.product-info', timeout=10)
        # 执行 JavaScript
        self.driver.execute_script(
            "window.scrollTo(0, document.body.scrollHeight);"
        )
        # 获取页面内容
        content = self.driver.page_source
        # 解析
        return self.parse_product(content)
    def close(self):
        self.driver.quit()
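
Because the Selenium driver owns a real browser process, it is worth closing it deterministically; a minimal usage sketch:

parser = DynamicSiteParser()
try:
    data = parser.parse_with_js('https://example.com/product/123')
finally:
    # Release the browser process even if parsing raises
    parser.close()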

Anti-scraping strategy adaptation

import random
import time
# Built-in middlewares are available as an alternative to the manual
# rotation implemented below
from openclaw.middlewares import (
    RandomUserAgentMiddleware,
    ProxyMiddleware,
    DelayMiddleware,
    RetryMiddleware
)
class AntiScrapingAdapter:
    def __init__(self):
        # User-Agent rotation
        self.user_agents = [
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
        ]
        # Proxy pool
        self.proxies = [
            'http://proxy1:8080',
            'http://proxy2:8080',
            'http://proxy3:8080',
        ]
        # Delay settings
        self.delay_range = (1, 3)  # random 1-3 second delay
    def get_headers(self):
        return {
            'User-Agent': random.choice(self.user_agents),
            'Accept-Language': 'zh-CN,zh;q=0.9',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
    def get_proxy(self):
        return random.choice(self.proxies)
    def random_delay(self):
        time.sleep(random.uniform(*self.delay_range))
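
A sketch of applying the adapter on every request; note that the per-call headers and proxy keyword arguments are assumptions, so adjust them to however Claw actually accepts per-request overrides:

adapter = AntiScrapingAdapter()
for url in ['https://example.com/p/1', 'https://example.com/p/2']:
    adapter.random_delay()  # space requests out before each fetch
    response = claw.get(
        url,
        headers=adapter.get_headers(),  # rotated User-Agent
        proxy=adapter.get_proxy(),      # rotated proxy
    )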

Data storage adaptation

from openclaw.pipelines import BasePipeline
import pymongo
import mysql.connector
import json
class MultiStoragePipeline(BasePipeline):
    def __init__(self):
        # MongoDB storage
        self.mongo_client = pymongo.MongoClient('localhost', 27017)
        self.mongo_db = self.mongo_client['scraping_data']
        # MySQL storage
        self.mysql_conn = mysql.connector.connect(
            host="localhost",
            user="root",
            password="password",
            database="scraping_data"
        )
        # File storage (appended as JSON lines)
        self.json_file = open('data.json', 'a', encoding='utf-8')
    def process_item(self, item, spider):
        # Store in MongoDB
        self.mongo_db[spider.name].insert_one(dict(item))
        # Store in MySQL
        cursor = self.mysql_conn.cursor()
        sql = """
            INSERT INTO products (title, price, description)
            VALUES (%s, %s, %s)
        """
        cursor.execute(sql, (item['title'], item['price'], item['description']))
        self.mysql_conn.commit()
        # Append as a JSON line
        json.dump(dict(item), self.json_file, ensure_ascii=False)
        self.json_file.write('\n')
        return item
    def close_spider(self, spider):
        self.json_file.close()
        self.mysql_conn.close()
        self.mongo_client.close()
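
The MySQL branch assumes a products table already exists; a hypothetical one-time setup whose schema is inferred from the INSERT statement above:

def create_products_table(conn):
    # Column types are assumptions; size them for the data you scrape
    cursor = conn.cursor()
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS products (
            id INT AUTO_INCREMENT PRIMARY KEY,
            title VARCHAR(255),
            price VARCHAR(64),
            description TEXT
        )
    """)
    conn.commit()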

Monitoring and logging adaptation

import logging
from datetime import datetime
from openclaw.signals import spider_opened, spider_closed
class MonitoringAdapter:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.start_time = None
        self.request_count = 0
        # Configure logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'scrape_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'),
                logging.StreamHandler()
            ]
        )
    @spider_opened.connect
    def on_spider_opened(self, spider):
        self.start_time = datetime.now()
        self.logger.info(f"Spider {spider.name} opened")
    @spider_closed.connect
    def on_spider_closed(self, spider, reason):
        duration = datetime.now() - self.start_time
        self.logger.info(
            f"Spider {spider.name} closed. "
            f"Reason: {reason}. "
            f"Duration: {duration}. "
            f"Total requests: {self.request_count}"
        )
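
If openclaw's signals follow the blinker convention that the decorator usage suggests, the adapter can also be registered explicitly; a sketch under that assumption:

monitor = MonitoringAdapter()
# Explicit registration binds the handlers to this instance, which the
# class-body decorators above cannot do on their own
spider_opened.connect(monitor.on_spider_opened)
spider_closed.connect(monitor.on_spider_closed)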

Complete example: an e-commerce spider

from openclaw import Claw, signals
from datetime import datetime
from urllib.parse import urljoin
import json
class EcommerceSpider:
    def __init__(self):
        self.claw = Claw(
            concurrent_requests=3,
            rate_limit=2,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            }
        )
        self.base_url = 'https://example.com'
        self.data = []
    def start(self):
        """启动爬虫"""
        # 从分类页面开始
        categories = self.get_categories()
        for category in categories:
            # 爬取分类下的产品列表
            products = self.get_products(category['url'])
            for product in products:
                # 爬取产品详情
                detail = self.get_product_detail(product['url'])
                self.data.append(detail)
                # 保存进度
                self.save_progress()
    def get_categories(self):
        """获取分类列表"""
        response = self.claw.get(self.base_url + '/categories')
        categories = []
        for cat in response.css('.category-item'):
            categories.append({
                'name': cat.css('::text').get(),
                'url': cat.css('a::attr(href)').get()
            })
        return categories
    def get_products(self, url):
        """获取产品列表(支持分页)"""
        products = []
        page = 1
        while True:
            page_url = f"{url}?page={page}"
            response = self.claw.get(page_url)
            items = response.css('.product-list .item')
            if not items:
                break
            for item in items:
                products.append({
                    'name': item.css('.name::text').get(),
                    'url': urljoin(self.base_url, item.css('a::attr(href)').get()),
                    'price': item.css('.price::text').get(),
                })
            # Check whether there is a next page
            next_page = response.css('.next-page')
            if not next_page:
                break
            page += 1
        return products
    def get_product_detail(self, url):
        """获取产品详情"""
        response = self.claw.get(url)
        return {
            'url': url,
            'title': response.css('h1.product-title::text').get(),
            'price': response.css('.price::text').get(),
            'description': response.css('.description::text').getall(),
            # parse_specifications and parse_reviews are assumed helper
            # methods, omitted here for brevity
            'specifications': self.parse_specifications(response),
            'reviews': self.parse_reviews(response),
            'timestamp': datetime.now().isoformat(),
        }
    def save_progress(self):
        """保存进度"""
        with open('scraped_data.json', 'w', encoding='utf-8') as f:
            json.dump(self.data, f, ensure_ascii=False, indent=2)
if __name__ == '__main__':
    spider = EcommerceSpider()
    spider.start()
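
save_progress() rewrites the full JSON file after every item, so an interrupted run can be resumed from it. A hedged companion sketch using only the stdlib; load_progress is a hypothetical helper:

import json
import os

def load_progress(path='scraped_data.json'):
    """Reload previously scraped items so a restarted run can skip them."""
    if not os.path.exists(path):
        return [], set()
    with open(path, encoding='utf-8') as f:
        items = json.load(f)
    # Each detail dict stores its source URL, so URLs double as resume keys
    return items, {item['url'] for item in items}

items, seen_urls = load_progress()

start() can then skip any product whose URL is already in seen_urls.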

Performance optimization tips

  1. Cache visited URLs

    from functools import lru_cache

    @lru_cache(maxsize=10000)
    def normalize_url(url):
        """Normalize a URL and cache the result"""
        return url.rstrip('/')

  2. Asynchronous processing

    import asyncio
    from openclaw.async_claw import AsyncClaw

    async def async_crawl():
        async with AsyncClaw() as claw:
            urls = ['url1', 'url2', 'url3']
            tasks = [claw.get(url) for url in urls]
            responses = await asyncio.gather(*tasks)

  3. Incremental crawling (see the sketch after this list)

    import hashlib

    def get_content_hash(content):
        """Compute a content hash for deduplication"""
        return hashlib.md5(content.encode()).hexdigest()

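A minimal sketch of using get_content_hash() for incremental crawling; the seen_hashes.json persistence format and the response.text attribute are assumptions:

import json
import os

SEEN_PATH = 'seen_hashes.json'

def load_seen(path=SEEN_PATH):
    """Load previously seen content hashes from disk."""
    if not os.path.exists(path):
        return set()
    with open(path, encoding='utf-8') as f:
        return set(json.load(f))

seen = load_seen()
response = claw.get('https://example.com/products')
digest = get_content_hash(response.text)  # response.text is an assumption
if digest not in seen:
    # Parse and store only pages whose content actually changed
    seen.add(digest)
    with open(SEEN_PATH, 'w', encoding='utf-8') as f:
        json.dump(sorted(seen), f)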

These adaptation strategies should be tuned to the specific target site. The key points are:
- Set sensible request intervals and concurrency limits
- Handle anti-scraping mechanisms
- Design robust parsing logic (see the sketch below)
- Implement reliable data storage
- Add appropriate monitoring and logging
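
On the robust-parsing point, a small hedged helper (safe_css is a hypothetical name) that keeps one missing selector from crashing a whole item, built on the response.css API used throughout:

def safe_css(response, selector, default=None):
    """Extract the first CSS match, falling back to a default instead of
    failing when the page layout changes."""
    try:
        value = response.css(selector).get()
    except Exception:
        return default
    return value.strip() if value else default

# Usage: the title falls back to None rather than breaking the pipeline
# title = safe_css(response, 'h1.product-title::text')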
