代码源码
首先是第一段,也是一个可以正常运行的一段
#!/usr/bin/env python3
import sys
import os
import re
import json
import subprocess
import requests
import socket
# ====== 🔒 隐私脱敏处理 ======
TOKEN = os.getenv("TG_BOT_TOKEN", "YOUR_BOT_TOKEN_HERE")
MY_CHAT_ID = int(os.getenv("TG_CHAT_ID", "0000000000"))
# ============================
BASE_URL = f"https://api.telegram.org/bot{TOKEN}"
CACHE_FILE = "/tmp/bot_ping_multitarget.cache"
LOCK_FILE = "/tmp/bot_state_ping.lock"
def send_tg(text, reply_markup=None):
if TOKEN == "YOUR_BOT_TOKEN_HERE" or MY_CHAT_ID == 0000000000:
return # 未配置凭据时直接跳过,防止线上测试报错
url = f"{BASE_URL}/sendMessage"
payload = {"chat_id": MY_CHAT_ID, "text": text, "parse_mode": "Markdown"}
if reply_markup: payload["reply_markup"] = reply_markup
try: requests.post(url, json=payload, timeout=10)
except: pass
def get_default_gateway():
try:
with open("/proc/net/route", "r") as f:
for line in f.readlines():
parts = line.split()
if len(parts) > 2 and parts[1] == "00000000":
route_hex = parts[2]
return ".".join([str(int(route_hex[i:i+2], 16)) for i in (6, 4, 2, 0)])
except: pass
return "114.114.114.114"
def extract_ips_and_domains(text):
lines = text.replace(",", "\n").split("\n")
targets = []
for line in lines:
cleaned = line.strip().lstrip("/").strip()
if not cleaned: continue
if re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}$', cleaned) or re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', cleaned):
targets.append(cleaned)
return targets
# ─── 核心:最原始、最简单的普通 Ping 模式 ───
def execute_single_stream_ping(t):
# 直接调用系统标准 ping,发送 3 个包,超时 5 秒
cmd = ["ping", "-c", "3", "-w", "5", t]
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
output = res.stdout if res.stdout else res.stderr
if res.returncode == 0:
send_tg(f"🟢 **[{t}] Ping 成功**\n```text\n{output.strip()}\n```")
return "SUCCESS"
else:
send_tg(f"🔴 **[{t}] Ping 失败或有丢包**\n```text\n{output.strip()}\n```")
return "DROP"
except Exception as e:
send_tg(f"💥 **[{t}] 运行异常:** `{e}`")
return "ERROR"
# ─── 流水线逐个执行 ───
def execute_ping_pool(targets):
total = len(targets)
send_tg(f"🛰️ **[开始多目标 Ping 测试]**\n\n目标总数: `{total}` 个,正在按顺序逐个测试...")
stats = {"SUCCESS": 0, "DROP": 0, "ERROR": 0}
for t in targets:
status = execute_single_stream_ping(t)
if status in stats:
stats[status] += 1
summary_text = (
f"🏁 **[Ping 测试完成]**\n"
f"------------------------------------\n"
f"📊 **总测节点:** `{total}`\n"
f"🟢 **测试成功:** `{stats['SUCCESS']}`\n"
f"🔴 **测试失败:** `{stats['DROP']}`\n"
f"💥 **未知错误:** `{stats['ERROR']}`\n"
f"------------------------------------"
)
send_tg(summary_text)
def main():
if len(sys.argv) < 2:
target = get_default_gateway()
print(f"ℹ️ 未指定目标,自适应探测默认网关: {target}\n")
cmd = ["ping", "-c", "3", "-w", "4", target]
res = subprocess.run(cmd, capture_output=True, text=True)
print(res.stdout)
return
raw_arg = sys.argv[1].strip()
if raw_arg.lower() == "more":
send_tg("📝 **[多目标链路配置]**\n\n请直接在下方发送你需要**同时探测**的多个公网IPv4地址或域名,每行一个目标:")
with open(LOCK_FILE, "w") as f:
f.write("waiting_list")
sys.exit(0)
if os.path.exists(LOCK_FILE) and not raw_arg.startswith("callback:"):
try: os.remove(LOCK_FILE)
except: pass
targets = extract_ips_and_domains(raw_arg)
if not targets:
send_tg("❌ 未检测到任何合法的IPv4地址或域名,多路探测强行退出。")
sys.exit(0)
with open(CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(targets, f)
inline_keyboard = [[
{"text": "✅ [OK] 开始顺序探测", "callback_data": "k_ping_action_ok"},
{"text": "❌ [NO] 强行退出", "callback_data": "k_ping_action_no"}
]]
targets_str = ", ".join(targets)
send_tg(
f"🛡️ **[多路验证挑战核验面板]**\n\n"
f"确认将按顺序对以下 **[{len(targets)}]** 个节点建立拨测吗?\n"
f"📋 `({targets_str})`\n\n"
f"请点击下方控制板按钮直接核准:",
reply_markup={"inline_keyboard": inline_keyboard}
)
sys.exit(0)
if raw_arg.startswith("callback:"):
action = raw_arg.replace("callback:k_ping_action_", "")
if action == "no":
send_tg("🛑 多路探测指令已被手动取消。")
if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)
sys.exit(0)
if action == "ok":
if not os.path.exists(CACHE_FILE):
send_tg("❌ 缓存资产已过期,请重新发起 `/ping more`。")
sys.exit(0)
with open(CACHE_FILE, "r", encoding="utf-8") as f:
targets = json.load(f)
if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)
execute_ping_pool(targets)
sys.exit(0)
print(f"🔍 正在定向侦测目标: {raw_arg}\n")
cmd = ["ping", "-c", "3", "-w", "4", raw_arg]
res = subprocess.run(cmd, capture_output=True, text=True)
print(res.stdout or res.stderr)
if __name__ == "__main__":
main()接下来是第二段代码,也是有问题的一段
#!/usr/bin/env python3
import sys
import os
import re
import json
import subprocess
import requests
import socket
# ====== 🔒 隐私脱敏处理 ======
TOKEN = os.getenv("TG_BOT_TOKEN", "YOUR_BOT_TOKEN_HERE")
MY_CHAT_ID = int(os.getenv("TG_CHAT_ID", "0000000000"))
# ============================
BASE_URL = f"https://api.telegram.org/bot{TOKEN}"
CACHE_FILE = "/tmp/bot_ping_multitarget.cache"
LOCK_FILE = "/tmp/bot_state_ping.lock"
def send_tg(text, reply_markup=None):
if TOKEN == "YOUR_BOT_TOKEN_HERE" or MY_CHAT_ID == 0000000000:
return
url = f"{BASE_URL}/sendMessage"
payload = {"chat_id": MY_CHAT_ID, "text": text, "parse_mode": "Markdown"}
if reply_markup: payload["reply_markup"] = reply_markup
try: requests.post(url, json=payload, timeout=10)
except: pass
def get_default_gateway():
try:
with open("/proc/net/route", "r") as f:
for line in f.readlines():
parts = line.split()
if len(parts) > 2 and parts[1] == "00000000":
route_hex = parts[2]
return ".".join([str(int(route_hex[i:i+2], 16)) for i in (6, 4, 2, 0)])
except: pass
return "114.114.114.114"
def extract_ips_and_domains(text):
lines = text.replace(",", "\n").split("\n")
targets = []
for line in lines:
cleaned = line.strip().lstrip("/").strip()
if not cleaned: continue
if re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}$', cleaned) or re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', cleaned):
targets.append(cleaned)
return targets
# ─── ⚠️ 读者请注意:此段逻辑在运行时会导致丢包误报,请大触指正 ───
def execute_single_stream_ping(t):
# 1. DNS 试探
resolved_ip = t
if not re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', t):
try:
socket.setdefaulttimeout(0.4) # 400ms 硬限制
resolved_ip = socket.gethostbyname(t)
except Exception:
err_msg = f"🔴 **[{t}] 侦测异常报告**\n```text\n[DNS错误] 400ms内解析超时或域名被严重污染。\n```"
send_tg(err_msg)
return "DNS_FAIL"
# 2. 调用 fping
# 📝 提问:这里的 fping 参数配置或过滤逻辑是否会导致输出捕获不全,从而误判为 DROP?
cmd = ["fping", "-c", "5", "-i", "10", "-A", resolved_ip]
try:
res = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
raw_output = (res.stdout or "") + "\n" + (res.stderr or "")
lines = [line.strip() for line in raw_output.split("\n") if line.strip()]
detail_lines = []
summary_line = ""
for line in lines:
if "icmp_seq" in line:
detail_lines.append(f"📍 {line}")
elif "xmt/rcv" in line:
summary_line = line
if detail_lines:
formatted_detail = "\n".join(detail_lines)
formatted_summary = f"\n📊 统计快照: {summary_line}" if summary_line else ""
success_msg = f"🟢 **[{t}] 侦测完成报告**\n```text\n{formatted_detail}{formatted_summary}\n```"
send_tg(success_msg)
return "SUCCESS"
else:
# 兜底过滤
if summary_line and "0%" not in summary_line and "100%" not in summary_line:
send_tg(f"🟢 **[{t}] 侦测完成报告 (原始回显)**\n```text\n{raw_output.strip()}\n```")
return "SUCCESS"
# 🐛 问题现象:在此处高概率触发全包丢失误报
send_tg(f"🔴 **[{t}] 链路超时,全包丢失。**\n```text\n{raw_output.strip()}\n```")
return "DROP"
except Exception as e:
send_tg(f"💥 **[{t}] 拨测异常:** `{e}`")
return "ERROR"
def execute_ping_pool(targets):
total = len(targets)
send_tg(f"🛰️ **[链路流式滚动监测启动]**\n\n总体监测矩阵规模: `{total} 个节点`... ")
stats = {"SUCCESS": 0, "DNS_FAIL": 0, "DROP": 0, "ERROR": 0}
for i, t in enumerate(targets, 1):
status = execute_single_stream_ping(t)
if status in stats:
stats[status] += 1
summary_text = (
f"🏁 **[流式滚动侦测全盘终结报告]**\n"
f"------------------------------------\n"
f"📊 **监测节点总数:** `{total}`\n"
f"🟢 **通畅节点正常:** `{stats['SUCCESS']}`\n"
f"🔴 **彻底丢包断交:** `{stats['DROP']}`\n"
f"🚫 **域名寻址死锁:** `{stats['DNS_FAIL']}`\n"
f"💥 **未知运行崩溃:** `{stats['ERROR']}`\n"
f"------------------------------------"
)
send_tg(summary_text)
def main():
if len(sys.argv) < 2:
target = get_default_gateway()
print(f"ℹ️ 未指定目标,自适应探测默认网关: {target}\n")
cmd = ["ping", "-c", "3", "-w", "4", target]
res = subprocess.run(cmd, capture_output=True, text=True)
print(res.stdout)
return
raw_arg = sys.argv[1].strip()
if raw_arg.lower() == "more":
send_tg("📝 **[多目标链路配置]**\n\n请直接在下方发送你需要探测的多个目标:")
with open(LOCK_FILE, "w") as f:
f.write("waiting_list")
sys.exit(0)
if os.path.exists(LOCK_FILE) and not raw_arg.startswith("callback:"):
try: os.remove(LOCK_FILE)
except: pass
targets = extract_ips_and_domains(raw_arg)
if not targets:
send_tg("❌ 未检测到任何合法的IPv4地址或域名。")
sys.exit(0)
with open(CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(targets, f)
inline_keyboard = [[
{"text": "✅ [OK] 开始流式滚动探测", "callback_data": "k_ping_action_ok"},
{"text": "❌ [NO] 强行退出", "callback_data": "k_ping_action_no"}
]]
targets_str = ", ".join(targets)
send_tg(
f"🛡️ **[多路验证挑战核验面板]**\n\n确认将按顺序对以下 **[{len(targets)}]** 个节点建立拨测吗?\n📋 `({targets_str})`\n\n请点击下方控制板按钮直接核准:",
reply_markup={"inline_keyboard": inline_keyboard}
)
sys.exit(0)
if raw_arg.startswith("callback:"):
action = raw_arg.replace("callback:k_ping_action_", "")
if action == "no":
send_tg("🛑 多路联动探测指令已被用户手动销毁。")
if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)
sys.exit(0)
if action == "ok":
if not os.path.exists(CACHE_FILE):
send_tg("❌ 缓存资产已过期。")
sys.exit(0)
with open(CACHE_FILE, "r", encoding="utf-8") as f:
targets = json.load(f)
if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)
execute_ping_pool(targets)
sys.exit(0)
print(f"🔍 正在定向侦测目标: {raw_arg}\n")
cmd = ["ping", "-c", "3", "-w", "4", raw_arg]
res = subprocess.run(cmd, capture_output=True, text=True)
print(res.stdout or res.stderr)
if __name__ == "__main__":
main()第二段问题的表现
首先就是明明ping的通,但却无法正常显示能正常连接,例如这一段
[202X/07/04 15:50] user_test: ping more
[202X/07/04 15:50] my_ping_bot: ⏳ 正在执行系统运维脚本 ping...
[202X/07/04 15:50] my_ping_bot: 📝 多目标链路配置
请直接在下方发送你需要同时探测的多个公网IPv4地址或域名,每行一个目标:
[202X/07/04 15:50] user_test: baidu.com
com.com
[202X/07/04 15:50] my_ping_bot: 🛡️ 多路验证挑战核验面板
确认将按顺序对以下 2 个节点建立流式滚动拨测矩阵吗?
📋 (baidu.com, com.com)
请点击下方控制板按钮直接核准:
[202X/07/04 15:51] my_ping_bot: 🛰️ 链路流式滚动监测启动
总体监测矩阵规模: 2 个节点
状态: 正在按序线性下探,请留意后续逐个回传面板...
[202X/07/04 15:51] my_ping_bot: 🔴 baidu.com 链路超时,全包丢失。
110.242.74.102 : [0], 64 bytes, 43.7 ms (43.7 avg, 0% loss)
110.242.74.102 : [1], 64 bytes, 43.6 ms (43.7 avg, 0% loss)
110.242.74.102 : [2], 64 bytes, 43.5 ms (43.6 avg, 0% loss)
110.242.74.102 : [3], 64 bytes, 43.6 ms (43.6 avg, 0% loss)
110.242.74.102 : [4], 64 bytes, 43.5 ms (43.6 avg, 0% loss)
110.242.74.102 : xmt/rcv/%loss = 5/5/0%, min/avg/max = 43.5/43.6/43.7
[202X/07/04 15:51] my_ping_bot: 🔴 com.com 链路超时,全包丢失。
104.26.5.148 : [0], 64 bytes, 3.52 ms (3.52 avg, 0% loss)
104.26.5.148 : [1], 64 bytes, 3.28 ms (3.40 avg, 0% loss)
104.26.5.148 : [2], 64 bytes, 3.31 ms (3.37 avg, 0% loss)
104.26.5.148 : [3], 64 bytes, 3.18 ms (3.32 avg, 0% loss)
104.26.5.148 : [4], 64 bytes, 3.78 ms (3.41 avg, 0% loss)
104.26.5.148 : xmt/rcv/%loss = 5/5/0%, min/avg/max = 3.18/3.41/3.78
[202X/07/04 15:51] my_ping_bot: 🏁 流式滚动侦测全盘终结报告
------------------------------------
📊 监测节点总数: 2
🟢 通畅节点正常: 0
🔴 彻底丢包断交: 2
🚫 域名寻址死锁: 0
💥 未知运行崩溃: 0
------------------------------------
✨ 所有分配的拨测流水线已全部打完收工并安全解构。我也搞不懂为什么,因为我写到后面脑子就糊涂了,这玩意我反反复复研究了将近一个小时,但是还是没能弄好