from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
import time
from urllib.parse import urlparse
from pathlib import Path
import pandas as pd
from .base import BASE_DIR
import os
from selenium.common.exceptions import TimeoutException

chrome_driver_file = os.path.join(BASE_DIR, 'mycode', 'chromedriver.exe')
failed_sites_file = os.path.join(BASE_DIR, 'mycode/failed_sites.xlsx')


def fix_url_scheme(url, default_scheme='http'):
    # Check whether the URL already includes a scheme
    if not url.startswith('http://') and not url.startswith('https://'):
        # If not, prepend the default scheme
        url = f'{default_scheme}://{url}'
    return url


def init_driver():
    # Set up Chrome WebDriver with a custom User-Agent
    options = Options()
    options.add_argument(
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36")
    # Block image loading and notifications to speed up crawling
    prefs = {"profile.managed_default_content_settings.images": 2,
             "profile.managed_default_content_settings.notifications": 2}
    options.add_argument("--disable-default-apps")  # Disable default apps
    # Disable "tel"/"mailto" protocol handling so external-app prompts do not block the crawl
    options.add_argument(
        "--disable-features=WebAppInstallForceList,ProtocolHandler,"
        "ProtocolHandlerMixedSchemes,PreloadMediaEngagementData")
    options.add_argument("--disable-protocol-handler")
    options.add_experimental_option("prefs", prefs)
    driver = webdriver.Chrome(chrome_driver_file, options=options)
    return driver


def open_website(url):
    # Set up Chrome WebDriver with a custom User-Agent
    options = Options()
    options.add_argument(
        "--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36")
    prefs = {"profile.managed_default_content_settings.images": 2,
             "profile.managed_default_content_settings.notifications": 2}
    options.add_experimental_option("prefs", prefs)
    driver = webdriver.Chrome(chrome_driver_file, options=options)
    driver.get(url)
    return driver


def extract_hyperlinks(driver):
    # Find all elements with an href attribute
    elements = driver.find_elements(By.XPATH, '//*[@href]')
    # Extract the href values from the elements
    hrefs = [element.get_attribute('href') for element in elements]
    return hrefs


def ignore_image_and_document_hrefs(href):
    parsed_url = urlparse(href)
    path = parsed_url.path
    file_extension = Path(path).suffix
    # Check whether the href points to an image, document, archive, or static asset
    return file_extension.lower() in ('.png', '.jpg', '.jpeg', '.pdf', '.doc', '.docx',
                                      '.xls', '.xlsx', '.ppt', '.pptx', '.rar', '.zip',
                                      '.ico', '.css', '.js')


def process_page(driver, url, visited_pages, start_domain, data, group, name):
    if not url.startswith('http'):
        return
    # Add the URL to visited pages
    visited_pages.add(url)
    # Navigate to the URL
    driver.get(url)
    # Wait for the page to load
    time.sleep(2)
    # Extract the content from the page
    content_element = driver.find_element(By.XPATH, '//body')
    content_text = content_element.text
    # print(content_text)
    # Add group, name, domain, URL, and content to the data list
    data.append([group, name, start_domain, url, content_text])
    # Find and process hyperlinks
    hrefs = extract_hyperlinks(driver)
    for href in hrefs:
        # Check if the href should be ignored
        if ignore_image_and_document_hrefs(href):
            continue
        # Check if the href is of type "javascript:void(0)"
        if href.startswith("javascript:void(0)"):
            continue
        # Check if the href leads back to the original page or has already been visited
        if check_href(href, driver.current_url, visited_pages):
            continue
        try:
            # Check if the new href belongs to the same domain as the original URL
            parsed_href = urlparse(href)
            if parsed_href.netloc.replace("www.", "") != start_domain:
                continue
            # # Open the href in the same tab and retrieve data
            # driver.get(href)
            # # print(href)
            # # Wait for the page to load
            # time.sleep(2)
            # # Extract the content from the hyperlink page
            # hyperlink_content_element = driver.find_element(By.XPATH, '//body')
            # hyperlink_content_text = hyperlink_content_element.text
            # # print(hyperlink_content_text)
            # # Add URL, Domain, and Content of the hyperlink to the data list
            # data.append([start_domain, href, hyperlink_content_text])
            # Recursively process the page and follow hyperlinks
            process_page(driver, href, visited_pages, start_domain, data, group, name)
        except Exception as e:
            print(f"Error processing hyperlink: {href}")
            print(f"Error message: {str(e)}")
            continue
    # Return to the original page
    # driver.get(url)


def check_href(href, original_url, visited_pages):
    parsed_href = urlparse(href)
    parsed_original_url = urlparse(original_url)
    # Check if the href leads back to the original page
    if (parsed_href.netloc.replace("www.", "") == parsed_original_url.netloc.replace("www.", "")
            and parsed_href.path == parsed_original_url.path):
        return True
    # Check if the href has already been visited
    if href in visited_pages:
        return True
    return False


def export_to_excel(data, output_filename):
    # Create separate lists for each column
    groups = [item[0] for item in data]
    names = [item[1] for item in data]
    domains = [item[2] for item in data]
    urls = [item[3] for item in data]
    texts = [item[4] for item in data]
    # Create a DataFrame from the data lists
    df = pd.DataFrame({'group': groups, 'name': names, 'domain': domains,
                       'url': urls, 'text': texts})
    # Export the DataFrame to an Excel file
    df.to_excel(output_filename, index=False)


def get_cookies_from_previous_session(driver):
    cookies = {}
    try:
        # Execute JavaScript to get the cookies using the jQuery Cookie Plugin
        # (only works on pages that actually load the plugin)
        cookie_script = """
            var cookies = $.cookie();
            return Object.entries(cookies).map(([name, value]) => `${name}=${value}`);
        """
        cookie_values = driver.execute_script(cookie_script)
        # Parse the cookie values and store them in a dictionary
        for cookie_value in cookie_values:
            cookie_name, cookie_value = cookie_value.split('=', 1)
            cookies[cookie_name] = cookie_value
    except Exception as e:
        print('Error getting cookies:', e)
    return cookies


def add_cookies(driver, cookies):
    for name, value in cookies.items():
        driver.add_cookie({'name': name, 'value': value})


def chrom_main_from_list(sites):
    for ind, item in enumerate(sites):
        # Each item is expected to be a [group, name, url] triple
        group = item[0]
        name = item[1]
        url = item[2]
        url = fix_url_scheme(url)
        # Compute the bare domain after the scheme has been normalised;
        # urlparse() returns an empty netloc for scheme-less URLs
        domain = urlparse(url).netloc.replace("www.", "")
        if domain in ['xdjstc.com', 'epcyiqizu.com', 'cbra.ctc.ac.cn']:
            continue
        driver = init_driver()
        # Open the website
        # driver.get(url)
        # # Retrieve cookies from previous session
        # cookies = get_cookies_from_previous_session(driver)
        # # Add cookies to the WebDriver
        # add_cookies(driver, cookies)
        # Initialize the set to store visited pages
        visited_pages = set()
        # Initialize the data list
        data = []
        try:
            # Set the page-load timeout to 10 seconds
            driver.set_page_load_timeout(10)
            # Set the script-execution timeout to 10 seconds
            driver.set_script_timeout(10)
            # Process the starting page and follow hyperlinks recursively
            process_page(driver, url, visited_pages, domain, data, group, name)
        except TimeoutException:
            # On a timeout, skip the site rather than abort the whole run
            print(f"Timeout exception: {url}")
        finally:
            # Close the WebDriver whether or not the crawl succeeded
            driver.quit()
        # Export data to a separate Excel file in the web_dir directory
        output_filename = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
        export_to_excel(data, output_filename)


def chrome_main():
    # Read failed URLs from the list
    df = pd.read_excel(failed_sites_file)
    for ind, row in df.iterrows():
        group = row['单位']  # Replace with the actual column name for group
        name = row['主办']
        url = row['地址']
        domain = urlparse(url).netloc.replace("www.", "")
        # Open the website
        driver = open_website(url)
        # Retrieve cookies from previous session
        cookies = get_cookies_from_previous_session(driver)
        # Add cookies to the WebDriver
        add_cookies(driver, cookies)
        # Initialize the set to store visited pages
        visited_pages = set()
        # Initialize the data list
        data = []
        # Process the starting page and follow hyperlinks recursively
        process_page(driver, url, visited_pages, domain, data, group, name)
        # Export data to a separate Excel file in the web_dir directory
        output_filename = os.path.join(BASE_DIR, f'web_dir/{name}_{domain}.xlsx')
        export_to_excel(data, output_filename)
        # Close the WebDriver
        driver.quit()


if __name__ == "__main__":
    chrome_main()
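# Usage sketch (illustrative assumption, not part of the original workflow):
# chrom_main_from_list expects an iterable of [group, name, url] triples, mirroring
# the '单位' / '主办' / '地址' columns that chrome_main reads from failed_sites.xlsx.
# The entry below is hypothetical:
#
#     sites = [
#         ['示例单位', '示例主办', 'www.example.com'],
#     ]
#     chrom_main_from_list(sites)
#
# Each site is exported to BASE_DIR/web_dir/<name>_<domain>.xlsx, one file per site.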