1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| import asyncio import traceback import platform from DrissionPage import ChromiumPage, ChromiumOptions from loguru import logger
# User-agent string (desktop Chrome 126 on macOS) that Cloudflare5sBypass
# passes to ChromiumOptions.set_user_agent when it configures the browser.
ua = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/126.0.0.0 Safari/537.36"
)
class Cloudflare5sBypass(object):
    """Fetch a page's HTML through a DrissionPage-driven Chromium browser.

    The browser is launched once in ``__init__`` and reused. Each call to
    ``get_web_content`` opens the URL in a new tab, repeatedly tries to click
    the verification element and returns the page source once a
    ``cf_clearance`` cookie is present (or the current source as a fallback).
    """

    # Class-level placeholder; every instance overwrites it in __init__.
    driver = None

    # Chrome executable used when the caller does not pass ``browser_path``.
    # This is a Windows location; callers on other systems should pass their own.
    _DEFAULT_BROWSER_PATH = r"C:\Program Files\Google\Chrome\Application\chrome.exe"

    def __init__(self, proxy_server=None, browser_path=None):
        """Configure and launch the browser.

        Args:
            proxy_server: Optional proxy passed to ``ChromiumOptions.set_proxy``.
                Skipped when falsy.
            browser_path: Optional path to the browser executable. When omitted,
                the original hard-coded Windows Chrome path is used.
        """
        if browser_path is None:
            browser_path = self._DEFAULT_BROWSER_PATH

        options = ChromiumOptions()
        options.set_paths(browser_path=browser_path)
        options.set_user_agent(ua)

        arguments = [
            "--accept-lang=en-US",
            "--no-first-run",
            "--force-color-profile=srgb",
            "--metrics-recording-only",
            "--password-store=basic",
            "--use-mock-keychain",
            "--export-tagged-pdf",
            "--no-default-browser-check",
            "--enable-features=NetworkService,NetworkServiceInProcess,LoadCryptoTokenExtension,PermuteTLSExtensions",
            "--disable-gpu",
            "--disable-infobars",
            "--disable-popup-blocking",
            "--disable-background-mode",
            "--disable-features=FlashDeprecationWarning,EnablePasswordsAccountStorage,PrivacySandboxSettings4",
            "--deny-permission-prompts",
            "--disable-suggestions-ui",
            "--hide-crash-restore-bubble",
            "--window-size=800,600",
            "--disable-mobile-emulation",
        ]

        # On Linux the browser runs headless and without the sandbox.
        if platform.system().lower() == "linux":
            options.headless(True)
            arguments.append("--no-sandbox")

        if proxy_server:
            options.set_proxy(proxy_server)

        for argument in arguments:
            options.set_argument(argument)

        self.driver = ChromiumPage(addr_or_opts=options)

    async def get_web_content(self, _url):
        """Open ``_url`` in a new tab and return its HTML.

        Clears the browser's cookies first, then makes up to five bypass
        attempts three seconds apart. If none of them yields content, the
        tab's current HTML is returned instead. The tab is always closed.

        Args:
            _url: Address to load in the new tab.

        Returns:
            The page HTML, or ``None`` if an exception was raised (and logged)
            before any content was obtained.
        """
        self.driver.set.cookies.clear()
        web_content = None
        tab_id = self.driver.new_tab(_url).tab_id
        tab = self.driver.get_tab(tab_id)
        try:
            await asyncio.sleep(3)
            for _ in range(5):
                # loguru formats with str.format, so the title needs a "{}"
                # placeholder; without it the argument is silently dropped.
                logger.debug("正在检测是否人机验证页面... {}", tab.title)
                web_content = await self.bypass(tab)
                if web_content:
                    break
                await asyncio.sleep(3)
            if not web_content:
                logger.info("默认返回页面源码")
                web_content = tab.html
        except Exception:
            # Deliberately not a bare ``except``: task cancellation and
            # KeyboardInterrupt/SystemExit must propagate to the caller.
            logger.error(traceback.format_exc())
        finally:
            tab.close()
        return web_content

    async def bypass(self, _tab):
        """Make one attempt to pass the verification page shown in ``_tab``.

        If the ``.spacer`` element is displayed, waits five seconds, clicks it
        and waits five more. Afterwards the tab's cookies are inspected.

        Args:
            _tab: The browser tab object to operate on.

        Returns:
            The tab's HTML when a ``cf_clearance`` cookie exists, else ``None``.
        """
        # Locator of the element that is clicked as the verification control.
        ele_flag = ".spacer"
        logger.debug("-------------debug1-------------")
        if _tab.wait.ele_displayed(ele_flag, timeout=1.5):
            logger.debug("-------------debug2-------------")
            verify_element = _tab.ele(ele_flag, timeout=2.5)
            logger.debug("-------------debug3-------------")
            if verify_element:
                await asyncio.sleep(5)
                logger.debug("-------------debug4-------------")
                verify_element.click()
                logger.debug("点击了人机验证按钮")
                await asyncio.sleep(5)

        # The presence of a cf_clearance cookie is treated as success.
        for cookie in _tab.cookies():
            if cookie.get("name") == "cf_clearance":
                logger.success("成功绕过cloudflare5s盾,返回页面源码!")
                return _tab.html
        return None
|