Playwright
在现代 Web 数据采集场景下,为什么选择 Playwright 作为页面采集框架,以及官方推荐的使用方式。
一、Playwright 的定位与作用
Section titled “一、Playwright 的定位与作用”Playwright 是一个跨浏览器自动化框架,提供:
- Chromium / Firefox / WebKit 支持
- 真实浏览器控制(可接入指纹浏览器)
- JS 执行、DOM 操作、事件模拟
- 异步 API,适合高并发采集
Playwright 不模拟浏览器,而是直接驱动真实浏览器执行网页逻辑。
二、官方推荐写法
Section titled “二、官方推荐写法”1️⃣ 连接远程指纹浏览器
Section titled “1️⃣ 连接远程指纹浏览器”try: Auth = os.environ.get("PROXY_AUTH") CoreSDK.Log.info(f"当前获取的浏览器认证信息: {Auth}")except Exception as e: #捕获其他未知异常 CoreSDK.Log.error(f"当前获取浏览器认证信息失败: {e}") Auth = Nonereturn
# 指纹浏览器的 CDP 连接地址(从环境变量读取,支持灵活部署)chrome_ws = os.environ.get("ChromeWs") or "chrome-ws-inner.coreclaw.com"CoreSDK.Log.info(f"Chrome WebSocket 地址: {chrome_ws}")
browser_url = f'ws://{Auth}@{chrome_ws}'rest_item = { "url": url, "html": "", "resp_status": "200"}
async with async_playwright() as playwright: CoreSDK.Log.info(f "请求的url:{url}")
try: browser = await playwright.chromium.connect_over_cdp(browser_url)except Exception as e: CoreSDK.Log.info(f "[错误] 指纹浏览器连接失败: {e}") rest_item['resp_status'] = "403" await asyncio.sleep(5) await browser.close()return2️⃣ 页面访问与内容获取
Section titled “2️⃣ 页面访问与内容获取”try: page = await browser.new_page(no_viewport=True) await page.goto(url, timeout=3 * 60 * 1000) html = await page.content() rest_item["html"] = htmlexcept Exception as e: CoreSDK.Log.info(f"[错误] 获取浏览器html失败: {e}") rest_item['resp_status'] = "500" CoreSDK.Result.push_data(rest_item) await asyncio.sleep(5) await browser.close()3️⃣ 完整平台脚本入口示例(推荐直接使用)
Section titled “3️⃣ 完整平台脚本入口示例(推荐直接使用)”import asyncio, sys,traceback,reimport randomimport timeimport os
from lxml import etreefrom urllib.parse import urlparse,urlencodefrom playwright.async_api import async_playwright
from sdk import CoreSDK
async def run(): CoreSDK.Log.info("🚀 Init...") CoreSDK.Log.info("====================================================") CoreSDK.Log.info("🚀 CoreClaw Playwright Browser Scrape Demo") CoreSDK.Log.info("====================================================") headers = [ { "label": "url", "key": "url", "format": "text", }, { "label": "html", "key": "html", "format": "text", }, { "label": "resp_status", "key": "resp_status", "format": "text", } ] res = CoreSDK.Result.set_table_header(headers)
input_json_dict = CoreSDK.Parameter.get_input_json_dict() CoreSDK.Log.debug(f"======input_json_dict====== {input_json_dict}") url = input_json_dict['url']
try: Auth = os.environ.get("PROXY_AUTH") CoreSDK.Log.info(f"当前获取的浏览器认证信息: {Auth}") except Exception as e: # 捕获其他未知异常 CoreSDK.Log.error(f"当前获取浏览器认证信息失败: {e}") Auth = None return
# 指纹浏览器的 CDP 连接地址(从环境变量读取,支持灵活部署) chrome_ws = os.environ.get("ChromeWs") or "chrome-ws-inner.coreclaw.com" CoreSDK.Log.info(f"Chrome WebSocket 地址: {chrome_ws}")
browser_url = f'ws://{Auth}@{chrome_ws}' rest_item = {"url": url, "html": "", "resp_status": "200"} async with async_playwright() as playwright: CoreSDK.Log.info(f"请求的url:{url}") try: browser = await playwright.chromium.connect_over_cdp(browser_url) except Exception as e: CoreSDK.Log.info(f"[错误] 指纹浏览器连接失败: {e}") rest_item['resp_status'] = "403" await asyncio.sleep(5) await browser.close() return
try: page = await browser.new_page(no_viewport=True) await page.goto(url, timeout=3 * 60 * 1000) html = await page.content() rest_item["html"] = html except Exception as e: CoreSDK.Log.info(f"[错误] 获取浏览器html失败: {e}") rest_item['resp_status'] = "500" CoreSDK.Result.push_data(rest_item) await asyncio.sleep(5) await browser.close()
if __name__ == "__main__": asyncio.run(run())4️⃣ 连接 Firefox 指纹浏览器
Section titled “4️⃣ 连接 Firefox 指纹浏览器”Playwright 支持通过 CDP 连接 Firefox 浏览器:
try: Auth = os.environ.get("PROXY_AUTH") CoreSDK.Log.info(f"当前获取的浏览器认证信息: {Auth}")except Exception as e: # 捕获其他未知异常 CoreSDK.Log.error(f"当前获取浏览器认证信息失败: {e}") Auth = None return
# Firefox CDP 连接地址(从环境变量读取,支持灵活部署)firefox_ws = os.environ.get("FirefoxWs") or "firefox-ws-inner.coreclaw.com"CoreSDK.Log.info(f"Firefox WebSocket 地址: {firefox_ws}")
browser_url = f'ws://{Auth}@{firefox_ws}'rest_item = { "url": url, "html": "", "resp_status": "200"}
async with async_playwright() as playwright: CoreSDK.Log.info(f"请求的url:{url}")
try: browser = await playwright.firefox.connect_over_cdp(browser_url) except Exception as e: CoreSDK.Log.info(f"[错误] Firefox 指纹浏览器连接失败: {e}") rest_item['resp_status'] = "403" await asyncio.sleep(5) await browser.close() return三、动态内容与 DOM 操作
Section titled “三、动态内容与 DOM 操作”获取单个元素
Section titled “获取单个元素”# 方法1:CSS选择器(推荐)element = page.locator('.product-title').firstelement = page.locator('#main-content')element = page.locator('h1')
# 方法2:XPathelement = page.locator('xpath=//div[@class="container"]')
# 方法3:文本定位element = page.locator('text=立即购买')element = page.get_by_text('立即购买') # 更推荐的方式element = page.get_by_role('button', name='提交')
# 检查元素是否存在并获取属性if element.count() > 0: text = element.text_content() inner_html = element.inner_html() is_visible = element.is_visible() class_name = element.get_attribute('class')
# 等待元素可见await element.wait_for(state='visible')
# 获取元素边界框bbox = element.bounding_box()批量元素处理
Section titled “批量元素处理”# 获取所有匹配元素product_items = page.locator('.product-item')count = await product_items.count()print(f"找到 {count} 个商品")
# 方法1:遍历处理products_data = []for i in range(count): item = product_items.nth(i) product = { 'name': await item.locator('.name').text_content() if await item.locator('.name').count() > 0 else '', 'price': await item.locator('.price').text_content() if await item.locator('.price').count() > 0 else '', 'link': await item.locator('.link').get_attribute('href') if await item.locator('.link').count() > 0 else '', } products_data.append(product)
# 方法2:使用evaluate_all批量处理(更高效)products_data = await page.evaluate('''() => { const items = document.querySelectorAll('.product-item'); return Array.from(items).map(item => { const nameElem = item.querySelector('.name'); const priceElem = item.querySelector('.price'); const linkElem = item.querySelector('.link'); return { name: nameElem ? nameElem.textContent.trim() : '', price: priceElem ? priceElem.textContent.trim() : '', link: linkElem ? linkElem.href : '' }; });}''')
# 方法3:使用async for循环(Python 3.8+)products_data = []items = page.locator('.product-item')async for i in range(await items.count()): item = items.nth(i) # ... 处理逻辑
# 使用列表推导式(需要异步处理)names = await asyncio.gather(*[ item.locator('.name').text_content() for i in range(count) if await items.nth(i).locator('.name').count() > 0])优势:
- 操作真实 DOM
- JS 渲染内容可直接获取
- 与前端逻辑一致
四、官方不推荐写法(反例)
Section titled “四、官方不推荐写法(反例)”❌ sleep 等待页面加载
Section titled “❌ sleep 等待页面加载”await asyncio.sleep(5)- 不保证 JS 执行完成
- 页面慢时失败,快时浪费时间
❌ requests 模拟浏览器页面
Section titled “❌ requests 模拟浏览器页面”requests.get(url)- 页面内容不完整
- 易触发反爬机制
- 成功率不可控