zcspider/mycode/crawl_chrome.py


import os
import time
from pathlib import Path
from urllib.parse import urlparse

import pandas as pd
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By

from .base import BASE_DIR

chrome_driver_file = os.path.join(BASE_DIR, 'mycode', 'chromedriver.exe')
failed_sites_file = os.path.join(BASE_DIR, 'mycode', 'failed_sites.xlsx')

def fix_url_scheme(url, default_scheme='http'):
    # Check whether the URL already carries a scheme
    if not url.startswith('http://') and not url.startswith('https://'):
        # No scheme present, so prepend the default one
        url = f'{default_scheme}://{url}'
    return url
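
# Example: fix_url_scheme('example.com') returns 'http://example.com', while a
# URL that already starts with 'http://' or 'https://' is returned unchanged.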

def init_driver():
    # Set up Chrome WebDriver with a custom User-Agent
    options = Options()
    options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36")
    # Block image loading and notifications to speed up crawling
    prefs = {
        "profile.managed_default_content_settings.images": 2,
        "profile.managed_default_content_settings.notifications": 2,
    }
    options.add_argument("--disable-default-apps")  # Disable default applications
    # Disable "tel"/"mailto" protocol handling so external handlers are not launched
    options.add_argument("--disable-features=WebAppInstallForceList,ProtocolHandler,ProtocolHandlerMixedSchemes,PreloadMediaEngagementData")
    options.add_argument("--disable-protocol-handler")
    options.add_experimental_option("prefs", prefs)
    driver = webdriver.Chrome(chrome_driver_file, options=options)
    return driver
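
# Note: the positional executable_path argument used above targets Selenium 3;
# Selenium 4 instead expects webdriver.Chrome(service=Service(chrome_driver_file), ...).
# chromedriver.exe must also match the major version of the installed Chrome.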

def open_website(url):
    # Set up Chrome WebDriver with a custom User-Agent
    options = Options()
    options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36")
    # Block image loading and notifications to speed up crawling
    prefs = {
        "profile.managed_default_content_settings.images": 2,
        "profile.managed_default_content_settings.notifications": 2,
    }
    options.add_experimental_option("prefs", prefs)
    driver = webdriver.Chrome(chrome_driver_file, options=options)
    driver.get(url)
    return driver
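
# Note: open_website repeats most of init_driver's setup but also navigates to
# the given URL; the caller is responsible for calling driver.quit() afterwards.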

def extract_hyperlinks(driver):
    # Find all elements with an href attribute
    elements = driver.find_elements(By.XPATH, '//*[@href]')
    # Extract the href values from the elements
    hrefs = [element.get_attribute('href') for element in elements]
    return hrefs
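
# Note: the XPath '//*[@href]' matches any element carrying an href attribute,
# so the result can include <link> (stylesheet) and <area> targets as well as
# ordinary <a> anchors; ignore_image_and_document_hrefs filters most of these out.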

def ignore_image_and_document_hrefs(href):
    parsed_url = urlparse(href)
    path = parsed_url.path
    file_extension = Path(path).suffix
    # Check whether the href's path ends in an image, document, archive, or asset extension
    return file_extension.lower() in ('.png', '.jpg', '.jpeg', '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.rar', '.zip', '.ico', '.css', '.js')
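
# Example: ignore_image_and_document_hrefs('http://example.com/report.pdf')
# returns True, while a plain page URL such as 'http://example.com/about'
# returns False.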

def process_page(driver, url, visited_pages, start_domain, data, group, name):
    if not url.startswith('http'):
        return
    # Add the URL to visited pages
    visited_pages.add(url)
    # Navigate to the URL
    driver.get(url)
    # Wait for the page to load
    time.sleep(2)
    # Extract the content from the page
    content_element = driver.find_element(By.XPATH, '//body')
    content_text = content_element.text
    # print(content_text)
    # Add URL, Domain, and Content to the data list
    data.append([group, name, start_domain, url, content_text])
    # Find and process hyperlinks
    hrefs = extract_hyperlinks(driver)
    for href in hrefs:
        # Check if the href should be ignored
        if ignore_image_and_document_hrefs(href):
            continue
        # Check if the href is of type "javascript:void(0)"
        if href.startswith("javascript:void(0)"):
            continue
        # Check if the href leads back to the original page or has already been visited
        if check_href(href, driver.current_url, visited_pages):
            continue
        try:
            # Check if the new href belongs to the same domain as the original URL
            parsed_href = urlparse(href)
            if parsed_href.netloc.replace("www.", "") != start_domain:
                continue
            # # Open the href in the same tab and retrieve data
            # driver.get(href)
            # # print(href)
            # # Wait for the page to load
            # time.sleep(2)
            # # Extract the content from the hyperlink page
            # hyperlink_content_element = driver.find_element(By.XPATH, '//body')
            # hyperlink_content_text = hyperlink_content_element.text
            # # print(hyperlink_content_text)
            # # Add URL, Domain, and Content of the hyperlink to the data list
            # data.append([start_domain, href, hyperlink_content_text])
            # Recursively process the page and follow hyperlinks
            process_page(driver, href, visited_pages, start_domain, data, group, name)
        except Exception as e:
            print(f"Error processing hyperlink: {href}")
            print(f"Error message: {str(e)}")
            continue
    # Return to the original page
    # driver.get(url)
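
# Note: process_page recurses once per newly discovered same-domain link, so a
# very large site can exhaust Python's default recursion limit (about 1000
# frames); an iterative worklist would avoid that, at the cost of restructuring.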

def check_href(href, original_url, visited_pages):
    parsed_href = urlparse(href)
    parsed_original_url = urlparse(original_url)
    # Check if the href leads back to the original page
    if parsed_href.netloc.replace("www.", "") == parsed_original_url.netloc.replace("www.", "") and parsed_href.path == parsed_original_url.path:
        return True
    # Check if the href has already been visited
    if href in visited_pages:
        return True
    return False
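
# Example: check_href('http://example.com/about', 'http://www.example.com/about',
# set()) returns True, since the netlocs match once 'www.' is stripped and the
# paths are identical.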

def export_to_excel(data, output_filename):
    # Create separate lists for each column
    groups = [item[0] for item in data]
    names = [item[1] for item in data]
    domains = [item[2] for item in data]
    urls = [item[3] for item in data]
    texts = [item[4] for item in data]
    # Create a DataFrame from the data lists
    df = pd.DataFrame({'group': groups, 'name': names, 'domain': domains, 'url': urls, 'text': texts})
    # Export the DataFrame to an Excel file
    df.to_excel(output_filename, index=False)
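
# Note: writing .xlsx files through DataFrame.to_excel requires an Excel engine
# such as openpyxl to be installed alongside pandas.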

def get_cookies_from_previous_session(driver):
    cookies = {}
    try:
        # Execute JavaScript to read the cookies via the jQuery Cookie plugin
        cookie_script = """
        var cookies = $.cookie();
        return Object.entries(cookies).map(([name, value]) => `${name}=${value}`);
        """
        cookie_values = driver.execute_script(cookie_script)
        # Parse each "name=value" string into the dictionary; split on the first
        # '=' only, since cookie values may themselves contain '='
        for cookie_value in cookie_values:
            cookie_name, cookie_value = cookie_value.split('=', 1)
            cookies[cookie_name] = cookie_value
    except Exception as e:
        print('Error getting cookies:', e)
    return cookies
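
# Note: this helper only works on pages that actually bundle the jQuery Cookie
# plugin ($.cookie); Selenium's native driver.get_cookies() returns the same
# information without that dependency.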

def add_cookies(driver, cookies):
    for name, value in cookies.items():
        driver.add_cookie({'name': name, 'value': value})
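
# Note: WebDriver only accepts a cookie for the domain of the currently loaded
# page; calling add_cookies before navigating to the target site raises
# InvalidCookieDomainException.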

def chrom_main_from_list(sites):
    for ind, item in enumerate(sites):
        group = item[0]  # Replace with the actual column index for group
        name = item[1]
        url = item[2]
        # Normalize the scheme before parsing, otherwise urlparse yields an empty netloc
        url = fix_url_scheme(url)
        domain = urlparse(url).netloc.replace("www.", "")
        if domain in ['xdjstc.com', 'epcyiqizu.com', 'cbra.ctc.ac.cn']:
            continue
        driver = init_driver()
        # Open the website
        # driver.get(url)
        # # Retrieve cookies from previous session
        # cookies = get_cookies_from_previous_session(driver)
        # # Add cookies to the WebDriver
        # add_cookies(driver, cookies)
        # Initialize the set to store visited pages
        visited_pages = set()
        # Initialize the data list
        data = []
        try:
            # Set the page-load timeout to 10 seconds
            driver.set_page_load_timeout(10)
            # Set the script-execution timeout to 10 seconds
            driver.set_script_timeout(10)
            # Process the starting page and follow hyperlinks recursively
            process_page(driver, url, visited_pages, domain, data, group, name)
        except TimeoutException:
            # A page or script exceeded the timeout; skip this site
            print(f"Timed out while crawling {url}")
        finally:
            # Close the WebDriver
            driver.quit()
        # Export data to a separate Excel file in the web_dir directory
        output_filename = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
        export_to_excel(data, output_filename)
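
# Illustrative call, assuming each item is a (group, name, url) triple:
#   chrom_main_from_list([('GroupA', 'Example Site', 'www.example.com')])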

def chrome_main():
    # Read failed URLs from the list
    df = pd.read_excel(failed_sites_file)
    for ind, row in df.iterrows():
        group = row['单位']  # Replace with the actual column name for group
        name = row['主办']
        url = row['地址']
        url = fix_url_scheme(url)
        domain = urlparse(url).netloc.replace("www.", "")
        # Open the website
        driver = open_website(url)
        # Retrieve cookies from previous session
        cookies = get_cookies_from_previous_session(driver)
        # Add cookies to the WebDriver
        add_cookies(driver, cookies)
        # Initialize the set to store visited pages
        visited_pages = set()
        # Initialize the data list
        data = []
        # Process the starting page and follow hyperlinks recursively
        process_page(driver, url, visited_pages, domain, data, group, name)
        # Export data to a separate Excel file in the web_dir directory
        output_filename = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
        export_to_excel(data, output_filename)
        # Close the WebDriver
        driver.quit()

if __name__ == "__main__":
    chrome_main()