304 lines
9.2 KiB
Python
304 lines
9.2 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
网络诊断工具
|
||
用于诊断服务器部署后访问外部网站时的网络连接问题
|
||
"""
|
||
|
||
import socket
|
||
import requests
|
||
import time
|
||
import urllib.parse
|
||
import subprocess
|
||
import platform
|
||
from typing import Dict, List
|
||
|
||
def diagnose_network_connectivity(url: str) -> Dict:
|
||
"""
|
||
综合诊断网络连接问题
|
||
"""
|
||
print(f"开始诊断网络连接: {url}")
|
||
results = {
|
||
'url': url,
|
||
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
|
||
'tests': {}
|
||
}
|
||
|
||
# 解析URL
|
||
try:
|
||
parsed_url = urllib.parse.urlparse(url)
|
||
hostname = parsed_url.hostname
|
||
port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
|
||
results['hostname'] = hostname
|
||
results['port'] = port
|
||
except Exception as e:
|
||
results['error'] = f"URL解析失败: {e}"
|
||
return results
|
||
|
||
# 1. DNS解析测试
|
||
results['tests']['dns_resolution'] = test_dns_resolution(hostname)
|
||
|
||
# 2. TCP连接测试
|
||
results['tests']['tcp_connection'] = test_tcp_connection(hostname, port)
|
||
|
||
# 3. HTTP请求测试
|
||
results['tests']['http_request'] = test_http_request(url)
|
||
|
||
# 4. 系统网络配置检查
|
||
results['tests']['system_info'] = get_system_network_info()
|
||
|
||
# 5. 建议解决方案
|
||
results['suggestions'] = generate_suggestions(results['tests'])
|
||
|
||
return results
|
||
|
||
def test_dns_resolution(hostname: str) -> Dict:
|
||
"""测试DNS解析"""
|
||
print(f"测试DNS解析: {hostname}")
|
||
test_result = {
|
||
'status': 'unknown',
|
||
'details': {}
|
||
}
|
||
|
||
try:
|
||
start_time = time.time()
|
||
ip_address = socket.gethostbyname(hostname)
|
||
dns_time = time.time() - start_time
|
||
|
||
test_result['status'] = 'success'
|
||
test_result['details'] = {
|
||
'ip_address': ip_address,
|
||
'resolution_time': f"{dns_time:.3f}秒"
|
||
}
|
||
print(f"DNS解析成功: {hostname} -> {ip_address} ({dns_time:.3f}秒)")
|
||
|
||
except socket.gaierror as e:
|
||
test_result['status'] = 'failed'
|
||
test_result['details'] = {
|
||
'error': str(e),
|
||
'error_code': e.errno if hasattr(e, 'errno') else 'unknown'
|
||
}
|
||
print(f"DNS解析失败: {e}")
|
||
|
||
return test_result
|
||
|
||
def test_tcp_connection(hostname: str, port: int) -> Dict:
|
||
"""测试TCP连接"""
|
||
print(f"测试TCP连接: {hostname}:{port}")
|
||
test_result = {
|
||
'status': 'unknown',
|
||
'details': {}
|
||
}
|
||
|
||
try:
|
||
start_time = time.time()
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(10) # 10秒超时
|
||
|
||
result = sock.connect_ex((hostname, port))
|
||
connect_time = time.time() - start_time
|
||
sock.close()
|
||
|
||
if result == 0:
|
||
test_result['status'] = 'success'
|
||
test_result['details'] = {
|
||
'connection_time': f"{connect_time:.3f}秒",
|
||
'port_open': True
|
||
}
|
||
print(f"TCP连接成功: {hostname}:{port} ({connect_time:.3f}秒)")
|
||
else:
|
||
test_result['status'] = 'failed'
|
||
test_result['details'] = {
|
||
'connection_time': f"{connect_time:.3f}秒",
|
||
'port_open': False,
|
||
'error_code': result
|
||
}
|
||
print(f"TCP连接失败: {hostname}:{port} (错误代码: {result})")
|
||
|
||
except Exception as e:
|
||
test_result['status'] = 'failed'
|
||
test_result['details'] = {
|
||
'error': str(e)
|
||
}
|
||
print(f"TCP连接测试异常: {e}")
|
||
|
||
return test_result
|
||
|
||
def test_http_request(url: str) -> Dict:
|
||
"""测试HTTP请求"""
|
||
print(f"测试HTTP请求: {url}")
|
||
test_result = {
|
||
'status': 'unknown',
|
||
'details': {}
|
||
}
|
||
|
||
headers = {
|
||
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
|
||
}
|
||
|
||
try:
|
||
start_time = time.time()
|
||
response = requests.get(
|
||
url,
|
||
headers=headers,
|
||
timeout=(10, 30), # (连接超时, 读取超时)
|
||
verify=False,
|
||
allow_redirects=True
|
||
)
|
||
request_time = time.time() - start_time
|
||
|
||
test_result['status'] = 'success'
|
||
test_result['details'] = {
|
||
'status_code': response.status_code,
|
||
'request_time': f"{request_time:.3f}秒",
|
||
'content_length': len(response.content),
|
||
'encoding': response.encoding,
|
||
'headers_count': len(response.headers)
|
||
}
|
||
print(f"HTTP请求成功: {response.status_code} ({request_time:.3f}秒)")
|
||
|
||
except requests.exceptions.Timeout as e:
|
||
test_result['status'] = 'timeout'
|
||
test_result['details'] = {
|
||
'error': '请求超时',
|
||
'timeout_type': str(type(e).__name__)
|
||
}
|
||
print(f"HTTP请求超时: {e}")
|
||
|
||
except requests.exceptions.ConnectionError as e:
|
||
test_result['status'] = 'connection_error'
|
||
test_result['details'] = {
|
||
'error': '连接错误',
|
||
'error_detail': str(e)
|
||
}
|
||
print(f"HTTP连接错误: {e}")
|
||
|
||
except Exception as e:
|
||
test_result['status'] = 'failed'
|
||
test_result['details'] = {
|
||
'error': str(e)
|
||
}
|
||
print(f"HTTP请求失败: {e}")
|
||
|
||
return test_result
|
||
|
||
def get_system_network_info() -> Dict:
|
||
"""获取系统网络信息"""
|
||
print("收集系统网络信息...")
|
||
info = {
|
||
'platform': platform.system(),
|
||
'platform_version': platform.version(),
|
||
'python_version': platform.python_version()
|
||
}
|
||
|
||
try:
|
||
# 获取本机IP地址
|
||
hostname = socket.gethostname()
|
||
local_ip = socket.gethostbyname(hostname)
|
||
info['hostname'] = hostname
|
||
info['local_ip'] = local_ip
|
||
except:
|
||
info['hostname'] = 'unknown'
|
||
info['local_ip'] = 'unknown'
|
||
|
||
# 检查网络配置(仅在Linux/Unix系统上)
|
||
if platform.system() in ['Linux', 'Darwin']:
|
||
try:
|
||
# 检查DNS服务器
|
||
with open('/etc/resolv.conf', 'r') as f:
|
||
dns_servers = []
|
||
for line in f:
|
||
if line.startswith('nameserver'):
|
||
dns_servers.append(line.split()[1])
|
||
info['dns_servers'] = dns_servers
|
||
except:
|
||
info['dns_servers'] = 'unavailable'
|
||
|
||
try:
|
||
# 检查默认网关
|
||
result = subprocess.run(['ip', 'route', 'show', 'default'],
|
||
capture_output=True, text=True, timeout=5)
|
||
if result.returncode == 0:
|
||
info['default_gateway'] = result.stdout.strip()
|
||
else:
|
||
info['default_gateway'] = 'unavailable'
|
||
except:
|
||
info['default_gateway'] = 'unavailable'
|
||
|
||
return info
|
||
|
||
def generate_suggestions(test_results: Dict) -> List[str]:
|
||
"""根据测试结果生成建议"""
|
||
suggestions = []
|
||
|
||
# DNS问题
|
||
if test_results.get('dns_resolution', {}).get('status') == 'failed':
|
||
suggestions.extend([
|
||
"DNS解析失败,请检查服务器的DNS配置",
|
||
"尝试使用公共DNS服务器 (8.8.8.8, 114.114.114.114)",
|
||
"检查 /etc/resolv.conf 文件中的DNS服务器配置"
|
||
])
|
||
|
||
# TCP连接问题
|
||
if test_results.get('tcp_connection', {}).get('status') == 'failed':
|
||
suggestions.extend([
|
||
"TCP连接失败,可能是防火墙阻止了连接",
|
||
"检查服务器的出站规则设置",
|
||
"确认目标端口是否开放"
|
||
])
|
||
|
||
# HTTP请求问题
|
||
http_status = test_results.get('http_request', {}).get('status')
|
||
if http_status == 'timeout':
|
||
suggestions.extend([
|
||
"HTTP请求超时,网络延迟较高",
|
||
"增加超时时间设置",
|
||
"考虑使用重试机制"
|
||
])
|
||
elif http_status == 'connection_error':
|
||
suggestions.extend([
|
||
"HTTP连接错误,可能是网络配置问题",
|
||
"检查代理设置",
|
||
"确认SSL/TLS证书配置"
|
||
])
|
||
|
||
# 通用建议
|
||
if not suggestions:
|
||
suggestions.append("网络连接测试基本正常,问题可能在应用层面")
|
||
|
||
suggestions.extend([
|
||
"联系系统管理员检查网络配置",
|
||
"考虑使用VPN或代理服务器",
|
||
"检查服务器所在云平台的安全组设置"
|
||
])
|
||
|
||
return suggestions
|
||
|
||
def main():
|
||
"""主函数,用于命令行测试"""
|
||
test_url = "https://ac.bit.edu.cn"
|
||
|
||
print("=" * 60)
|
||
print("网络连接诊断工具")
|
||
print("=" * 60)
|
||
|
||
results = diagnose_network_connectivity(test_url)
|
||
|
||
print("\n" + "=" * 60)
|
||
print("诊断结果")
|
||
print("=" * 60)
|
||
|
||
for test_name, test_result in results['tests'].items():
|
||
print(f"\n{test_name}: {test_result['status']}")
|
||
if 'details' in test_result:
|
||
for key, value in test_result['details'].items():
|
||
print(f" {key}: {value}")
|
||
|
||
print("\n" + "=" * 60)
|
||
print("建议解决方案")
|
||
print("=" * 60)
|
||
|
||
for i, suggestion in enumerate(results['suggestions'], 1):
|
||
print(f"{i}. {suggestion}")
|
||
|
||
if __name__ == "__main__":
|
||
main() |