#!/usr/bin/env python3 """ 网络诊断工具 用于诊断服务器部署后访问外部网站时的网络连接问题 """ import socket import requests import time import urllib.parse import subprocess import platform from typing import Dict, List def diagnose_network_connectivity(url: str) -> Dict: """ 综合诊断网络连接问题 """ print(f"开始诊断网络连接: {url}") results = { 'url': url, 'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'), 'tests': {} } # 解析URL try: parsed_url = urllib.parse.urlparse(url) hostname = parsed_url.hostname port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80) results['hostname'] = hostname results['port'] = port except Exception as e: results['error'] = f"URL解析失败: {e}" return results # 1. DNS解析测试 results['tests']['dns_resolution'] = test_dns_resolution(hostname) # 2. TCP连接测试 results['tests']['tcp_connection'] = test_tcp_connection(hostname, port) # 3. HTTP请求测试 results['tests']['http_request'] = test_http_request(url) # 4. 系统网络配置检查 results['tests']['system_info'] = get_system_network_info() # 5. 建议解决方案 results['suggestions'] = generate_suggestions(results['tests']) return results def test_dns_resolution(hostname: str) -> Dict: """测试DNS解析""" print(f"测试DNS解析: {hostname}") test_result = { 'status': 'unknown', 'details': {} } try: start_time = time.time() ip_address = socket.gethostbyname(hostname) dns_time = time.time() - start_time test_result['status'] = 'success' test_result['details'] = { 'ip_address': ip_address, 'resolution_time': f"{dns_time:.3f}秒" } print(f"DNS解析成功: {hostname} -> {ip_address} ({dns_time:.3f}秒)") except socket.gaierror as e: test_result['status'] = 'failed' test_result['details'] = { 'error': str(e), 'error_code': e.errno if hasattr(e, 'errno') else 'unknown' } print(f"DNS解析失败: {e}") return test_result def test_tcp_connection(hostname: str, port: int) -> Dict: """测试TCP连接""" print(f"测试TCP连接: {hostname}:{port}") test_result = { 'status': 'unknown', 'details': {} } try: start_time = time.time() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(10) # 10秒超时 result = sock.connect_ex((hostname, port)) connect_time = time.time() - start_time sock.close() if result == 0: test_result['status'] = 'success' test_result['details'] = { 'connection_time': f"{connect_time:.3f}秒", 'port_open': True } print(f"TCP连接成功: {hostname}:{port} ({connect_time:.3f}秒)") else: test_result['status'] = 'failed' test_result['details'] = { 'connection_time': f"{connect_time:.3f}秒", 'port_open': False, 'error_code': result } print(f"TCP连接失败: {hostname}:{port} (错误代码: {result})") except Exception as e: test_result['status'] = 'failed' test_result['details'] = { 'error': str(e) } print(f"TCP连接测试异常: {e}") return test_result def test_http_request(url: str) -> Dict: """测试HTTP请求""" print(f"测试HTTP请求: {url}") test_result = { 'status': 'unknown', 'details': {} } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' } try: start_time = time.time() response = requests.get( url, headers=headers, timeout=(10, 30), # (连接超时, 读取超时) verify=False, allow_redirects=True ) request_time = time.time() - start_time test_result['status'] = 'success' test_result['details'] = { 'status_code': response.status_code, 'request_time': f"{request_time:.3f}秒", 'content_length': len(response.content), 'encoding': response.encoding, 'headers_count': len(response.headers) } print(f"HTTP请求成功: {response.status_code} ({request_time:.3f}秒)") except requests.exceptions.Timeout as e: test_result['status'] = 'timeout' test_result['details'] = { 'error': '请求超时', 'timeout_type': str(type(e).__name__) } print(f"HTTP请求超时: {e}") except requests.exceptions.ConnectionError as e: test_result['status'] = 'connection_error' test_result['details'] = { 'error': '连接错误', 'error_detail': str(e) } print(f"HTTP连接错误: {e}") except Exception as e: test_result['status'] = 'failed' test_result['details'] = { 'error': str(e) } print(f"HTTP请求失败: {e}") return test_result def get_system_network_info() -> Dict: """获取系统网络信息""" print("收集系统网络信息...") info = { 'platform': platform.system(), 'platform_version': platform.version(), 'python_version': platform.python_version() } try: # 获取本机IP地址 hostname = socket.gethostname() local_ip = socket.gethostbyname(hostname) info['hostname'] = hostname info['local_ip'] = local_ip except: info['hostname'] = 'unknown' info['local_ip'] = 'unknown' # 检查网络配置(仅在Linux/Unix系统上) if platform.system() in ['Linux', 'Darwin']: try: # 检查DNS服务器 with open('/etc/resolv.conf', 'r') as f: dns_servers = [] for line in f: if line.startswith('nameserver'): dns_servers.append(line.split()[1]) info['dns_servers'] = dns_servers except: info['dns_servers'] = 'unavailable' try: # 检查默认网关 result = subprocess.run(['ip', 'route', 'show', 'default'], capture_output=True, text=True, timeout=5) if result.returncode == 0: info['default_gateway'] = result.stdout.strip() else: info['default_gateway'] = 'unavailable' except: info['default_gateway'] = 'unavailable' return info def generate_suggestions(test_results: Dict) -> List[str]: """根据测试结果生成建议""" suggestions = [] # DNS问题 if test_results.get('dns_resolution', {}).get('status') == 'failed': suggestions.extend([ "DNS解析失败,请检查服务器的DNS配置", "尝试使用公共DNS服务器 (8.8.8.8, 114.114.114.114)", "检查 /etc/resolv.conf 文件中的DNS服务器配置" ]) # TCP连接问题 if test_results.get('tcp_connection', {}).get('status') == 'failed': suggestions.extend([ "TCP连接失败,可能是防火墙阻止了连接", "检查服务器的出站规则设置", "确认目标端口是否开放" ]) # HTTP请求问题 http_status = test_results.get('http_request', {}).get('status') if http_status == 'timeout': suggestions.extend([ "HTTP请求超时,网络延迟较高", "增加超时时间设置", "考虑使用重试机制" ]) elif http_status == 'connection_error': suggestions.extend([ "HTTP连接错误,可能是网络配置问题", "检查代理设置", "确认SSL/TLS证书配置" ]) # 通用建议 if not suggestions: suggestions.append("网络连接测试基本正常,问题可能在应用层面") suggestions.extend([ "联系系统管理员检查网络配置", "考虑使用VPN或代理服务器", "检查服务器所在云平台的安全组设置" ]) return suggestions def main(): """主函数,用于命令行测试""" test_url = "https://ac.bit.edu.cn" print("=" * 60) print("网络连接诊断工具") print("=" * 60) results = diagnose_network_connectivity(test_url) print("\n" + "=" * 60) print("诊断结果") print("=" * 60) for test_name, test_result in results['tests'].items(): print(f"\n{test_name}: {test_result['status']}") if 'details' in test_result: for key, value in test_result['details'].items(): print(f" {key}: {value}") print("\n" + "=" * 60) print("建议解决方案") print("=" * 60) for i, suggestion in enumerate(results['suggestions'], 1): print(f"{i}. {suggestion}") if __name__ == "__main__": main()