关于两段代码的错误性指点与修改

关于两段代码的错误性指点与修改

 次点击
34 分钟阅读

代码源码

首先是第一段,也是一个可以正常运行的一段

#!/usr/bin/env python3
import sys
import os
import re
import json
import subprocess
import requests
import socket

# ====== 🔒 隐私脱敏处理 ======
TOKEN = os.getenv("TG_BOT_TOKEN", "YOUR_BOT_TOKEN_HERE")
MY_CHAT_ID = int(os.getenv("TG_CHAT_ID", "0000000000"))
# ============================

BASE_URL = f"https://api.telegram.org/bot{TOKEN}"
CACHE_FILE = "/tmp/bot_ping_multitarget.cache"
LOCK_FILE = "/tmp/bot_state_ping.lock"

def send_tg(text, reply_markup=None):
    if TOKEN == "YOUR_BOT_TOKEN_HERE" or MY_CHAT_ID == 0000000000:
        return  # 未配置凭据时直接跳过,防止线上测试报错
    url = f"{BASE_URL}/sendMessage"
    payload = {"chat_id": MY_CHAT_ID, "text": text, "parse_mode": "Markdown"}
    if reply_markup: payload["reply_markup"] = reply_markup
    try: requests.post(url, json=payload, timeout=10)
    except: pass

def get_default_gateway():
    try:
        with open("/proc/net/route", "r") as f:
            for line in f.readlines():
                parts = line.split()
                if len(parts) > 2 and parts[1] == "00000000":
                    route_hex = parts[2]
                    return ".".join([str(int(route_hex[i:i+2], 16)) for i in (6, 4, 2, 0)])
    except: pass
    return "114.114.114.114"

def extract_ips_and_domains(text):
    lines = text.replace(",", "\n").split("\n")
    targets = []
    for line in lines:
        cleaned = line.strip().lstrip("/").strip()
        if not cleaned: continue
        if re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}$', cleaned) or re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', cleaned):
            targets.append(cleaned)
    return targets

# ─── 核心:最原始、最简单的普通 Ping 模式 ───
def execute_single_stream_ping(t):
    # 直接调用系统标准 ping,发送 3 个包,超时 5 秒
    cmd = ["ping", "-c", "3", "-w", "5", t]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
        output = res.stdout if res.stdout else res.stderr
        
        if res.returncode == 0:
            send_tg(f"🟢 **[{t}] Ping 成功**\n```text\n{output.strip()}\n```")
            return "SUCCESS"
        else:
            send_tg(f"🔴 **[{t}] Ping 失败或有丢包**\n```text\n{output.strip()}\n```")
            return "DROP"
    except Exception as e:
        send_tg(f"💥 **[{t}] 运行异常:** `{e}`")
        return "ERROR"

# ─── 流水线逐个执行 ───
def execute_ping_pool(targets):
    total = len(targets)
    send_tg(f"🛰️ **[开始多目标 Ping 测试]**\n\n目标总数: `{total}` 个,正在按顺序逐个测试...")
    
    stats = {"SUCCESS": 0, "DROP": 0, "ERROR": 0}
    
    for t in targets:
        status = execute_single_stream_ping(t)
        if status in stats:
            stats[status] += 1
            
    summary_text = (
        f"🏁 **[Ping 测试完成]**\n"
        f"------------------------------------\n"
        f"📊 **总测节点:** `{total}`\n"
        f"🟢 **测试成功:** `{stats['SUCCESS']}`\n"
        f"🔴 **测试失败:** `{stats['DROP']}`\n"
        f"💥 **未知错误:** `{stats['ERROR']}`\n"
        f"------------------------------------"
    )
    send_tg(summary_text)

def main():
    if len(sys.argv) < 2:
        target = get_default_gateway()
        print(f"ℹ️ 未指定目标,自适应探测默认网关: {target}\n")
        cmd = ["ping", "-c", "3", "-w", "4", target]
        res = subprocess.run(cmd, capture_output=True, text=True)
        print(res.stdout)
        return

    raw_arg = sys.argv[1].strip()

    if raw_arg.lower() == "more":
        send_tg("📝 **[多目标链路配置]**\n\n请直接在下方发送你需要**同时探测**的多个公网IPv4地址或域名,每行一个目标:")
        with open(LOCK_FILE, "w") as f: 
            f.write("waiting_list")
        sys.exit(0)

    if os.path.exists(LOCK_FILE) and not raw_arg.startswith("callback:"):
        try: os.remove(LOCK_FILE)
        except: pass
        
        targets = extract_ips_and_domains(raw_arg)
        if not targets:
            send_tg("❌ 未检测到任何合法的IPv4地址或域名,多路探测强行退出。")
            sys.exit(0)
            
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(targets, f)
            
        inline_keyboard = [[
            {"text": "✅ [OK] 开始顺序探测", "callback_data": "k_ping_action_ok"},
            {"text": "❌ [NO] 强行退出", "callback_data": "k_ping_action_no"}
        ]]
        
        targets_str = ", ".join(targets)
        send_tg(
            f"🛡️ **[多路验证挑战核验面板]**\n\n"
            f"确认将按顺序对以下 **[{len(targets)}]** 个节点建立拨测吗?\n"
            f"📋 `({targets_str})`\n\n"
            f"请点击下方控制板按钮直接核准:", 
            reply_markup={"inline_keyboard": inline_keyboard}
        )
        sys.exit(0)

    if raw_arg.startswith("callback:"):
        action = raw_arg.replace("callback:k_ping_action_", "")
        if action == "no":
            send_tg("🛑 多路探测指令已被手动取消。")
            if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)
            sys.exit(0)
            
        if action == "ok":
            if not os.path.exists(CACHE_FILE):
                send_tg("❌ 缓存资产已过期,请重新发起 `/ping more`。")
                sys.exit(0)
            with open(CACHE_FILE, "r", encoding="utf-8") as f:
                targets = json.load(f)
            if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)
            
            execute_ping_pool(targets)
            sys.exit(0)

    print(f"🔍 正在定向侦测目标: {raw_arg}\n")
    cmd = ["ping", "-c", "3", "-w", "4", raw_arg]
    res = subprocess.run(cmd, capture_output=True, text=True)
    print(res.stdout or res.stderr)

if __name__ == "__main__":
    main()

接下来是第二段代码,也是有问题的一段

#!/usr/bin/env python3
import sys
import os
import re
import json
import subprocess
import requests
import socket

# ====== 🔒 隐私脱敏处理 ======
TOKEN = os.getenv("TG_BOT_TOKEN", "YOUR_BOT_TOKEN_HERE")
MY_CHAT_ID = int(os.getenv("TG_CHAT_ID", "0000000000"))
# ============================

BASE_URL = f"https://api.telegram.org/bot{TOKEN}"
CACHE_FILE = "/tmp/bot_ping_multitarget.cache"
LOCK_FILE = "/tmp/bot_state_ping.lock"

def send_tg(text, reply_markup=None):
    if TOKEN == "YOUR_BOT_TOKEN_HERE" or MY_CHAT_ID == 0000000000:
        return
    url = f"{BASE_URL}/sendMessage"
    payload = {"chat_id": MY_CHAT_ID, "text": text, "parse_mode": "Markdown"}
    if reply_markup: payload["reply_markup"] = reply_markup
    try: requests.post(url, json=payload, timeout=10)
    except: pass

def get_default_gateway():
    try:
        with open("/proc/net/route", "r") as f:
            for line in f.readlines():
                parts = line.split()
                if len(parts) > 2 and parts[1] == "00000000":
                    route_hex = parts[2]
                    return ".".join([str(int(route_hex[i:i+2], 16)) for i in (6, 4, 2, 0)])
    except: pass
    return "114.114.114.114"

def extract_ips_and_domains(text):
    lines = text.replace(",", "\n").split("\n")
    targets = []
    for line in lines:
        cleaned = line.strip().lstrip("/").strip()
        if not cleaned: continue
        if re.match(r'^[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}$', cleaned) or re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', cleaned):
            targets.append(cleaned)
    return targets

# ─── ⚠️ 读者请注意:此段逻辑在运行时会导致丢包误报,请大触指正 ───
def execute_single_stream_ping(t):
    # 1. DNS 试探
    resolved_ip = t
    if not re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', t):
        try:
            socket.setdefaulttimeout(0.4) # 400ms 硬限制
            resolved_ip = socket.gethostbyname(t)
        except Exception:
            err_msg = f"🔴 **[{t}] 侦测异常报告**\n```text\n[DNS错误] 400ms内解析超时或域名被严重污染。\n```"
            send_tg(err_msg)
            return "DNS_FAIL"

    # 2. 调用 fping
    # 📝 提问:这里的 fping 参数配置或过滤逻辑是否会导致输出捕获不全,从而误判为 DROP?
    cmd = ["fping", "-c", "5", "-i", "10", "-A", resolved_ip]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True, timeout=5)
        raw_output = (res.stdout or "") + "\n" + (res.stderr or "")
        lines = [line.strip() for line in raw_output.split("\n") if line.strip()]

        detail_lines = []
        summary_line = ""

        for line in lines:
            if "icmp_seq" in line:
                detail_lines.append(f"📍 {line}")
            elif "xmt/rcv" in line:
                summary_line = line

        if detail_lines:
            formatted_detail = "\n".join(detail_lines)
            formatted_summary = f"\n📊 统计快照: {summary_line}" if summary_line else ""
            success_msg = f"🟢 **[{t}] 侦测完成报告**\n```text\n{formatted_detail}{formatted_summary}\n```"
            send_tg(success_msg)
            return "SUCCESS"
        else:
            # 兜底过滤
            if summary_line and "0%" not in summary_line and "100%" not in summary_line:
                send_tg(f"🟢 **[{t}] 侦测完成报告 (原始回显)**\n```text\n{raw_output.strip()}\n```")
                return "SUCCESS"

            # 🐛 问题现象:在此处高概率触发全包丢失误报
            send_tg(f"🔴 **[{t}] 链路超时,全包丢失。**\n```text\n{raw_output.strip()}\n```")
            return "DROP"
    except Exception as e:
        send_tg(f"💥 **[{t}] 拨测异常:** `{e}`")
        return "ERROR"

def execute_ping_pool(targets):
    total = len(targets)
    send_tg(f"🛰️ **[链路流式滚动监测启动]**\n\n总体监测矩阵规模: `{total} 个节点`... ")
    
    stats = {"SUCCESS": 0, "DNS_FAIL": 0, "DROP": 0, "ERROR": 0}

    for i, t in enumerate(targets, 1):
        status = execute_single_stream_ping(t)
        if status in stats:
            stats[status] += 1

    summary_text = (
        f"🏁 **[流式滚动侦测全盘终结报告]**\n"
        f"------------------------------------\n"
        f"📊 **监测节点总数:** `{total}`\n"
        f"🟢 **通畅节点正常:** `{stats['SUCCESS']}`\n"
        f"🔴 **彻底丢包断交:** `{stats['DROP']}`\n"
        f"🚫 **域名寻址死锁:** `{stats['DNS_FAIL']}`\n"
        f"💥 **未知运行崩溃:** `{stats['ERROR']}`\n"
        f"------------------------------------"
    )
    send_tg(summary_text)

def main():
    if len(sys.argv) < 2:
        target = get_default_gateway()
        print(f"ℹ️ 未指定目标,自适应探测默认网关: {target}\n")
        cmd = ["ping", "-c", "3", "-w", "4", target]
        res = subprocess.run(cmd, capture_output=True, text=True)
        print(res.stdout)
        return

    raw_arg = sys.argv[1].strip()

    if raw_arg.lower() == "more":
        send_tg("📝 **[多目标链路配置]**\n\n请直接在下方发送你需要探测的多个目标:")
        with open(LOCK_FILE, "w") as f:
            f.write("waiting_list")
        sys.exit(0)

    if os.path.exists(LOCK_FILE) and not raw_arg.startswith("callback:"):
        try: os.remove(LOCK_FILE)
        except: pass

        targets = extract_ips_and_domains(raw_arg)
        if not targets:
            send_tg("❌ 未检测到任何合法的IPv4地址或域名。")
            sys.exit(0)

        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(targets, f)

        inline_keyboard = [[
            {"text": "✅ [OK] 开始流式滚动探测", "callback_data": "k_ping_action_ok"},
            {"text": "❌ [NO] 强行退出", "callback_data": "k_ping_action_no"}
        ]]

        targets_str = ", ".join(targets)
        send_tg(
            f"🛡️ **[多路验证挑战核验面板]**\n\n确认将按顺序对以下 **[{len(targets)}]** 个节点建立拨测吗?\n📋 `({targets_str})`\n\n请点击下方控制板按钮直接核准:",
            reply_markup={"inline_keyboard": inline_keyboard}
        )
        sys.exit(0)

    if raw_arg.startswith("callback:"):
        action = raw_arg.replace("callback:k_ping_action_", "")
        if action == "no":
            send_tg("🛑 多路联动探测指令已被用户手动销毁。")
            if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)
            sys.exit(0)

        if action == "ok":
            if not os.path.exists(CACHE_FILE):
                send_tg("❌ 缓存资产已过期。")
                sys.exit(0)
            with open(CACHE_FILE, "r", encoding="utf-8") as f:
                targets = json.load(f)
            if os.path.exists(CACHE_FILE): os.remove(CACHE_FILE)

            execute_ping_pool(targets)
            sys.exit(0)

    print(f"🔍 正在定向侦测目标: {raw_arg}\n")
    cmd = ["ping", "-c", "3", "-w", "4", raw_arg]
    res = subprocess.run(cmd, capture_output=True, text=True)
    print(res.stdout or res.stderr)

if __name__ == "__main__":
    main()

第二段问题的表现

首先就是明明ping的通,但却无法正常显示能正常连接,例如这一段

[202X/07/04 15:50] user_test: ping more
[202X/07/04 15:50] my_ping_bot: ⏳ 正在执行系统运维脚本 ping...
[202X/07/04 15:50] my_ping_bot: 📝 多目标链路配置

请直接在下方发送你需要同时探测的多个公网IPv4地址或域名,每行一个目标:
[202X/07/04 15:50] user_test: baidu.com
com.com
[202X/07/04 15:50] my_ping_bot: 🛡️ 多路验证挑战核验面板

确认将按顺序对以下 2 个节点建立流式滚动拨测矩阵吗?
📋 (baidu.com, com.com)

请点击下方控制板按钮直接核准:
[202X/07/04 15:51] my_ping_bot: 🛰️ 链路流式滚动监测启动

总体监测矩阵规模: 2 个节点
状态: 正在按序线性下探,请留意后续逐个回传面板...
[202X/07/04 15:51] my_ping_bot: 🔴 baidu.com 链路超时,全包丢失。
110.242.74.102 : [0], 64 bytes, 43.7 ms (43.7 avg, 0% loss)
110.242.74.102 : [1], 64 bytes, 43.6 ms (43.7 avg, 0% loss)
110.242.74.102 : [2], 64 bytes, 43.5 ms (43.6 avg, 0% loss)
110.242.74.102 : [3], 64 bytes, 43.6 ms (43.6 avg, 0% loss)
110.242.74.102 : [4], 64 bytes, 43.5 ms (43.6 avg, 0% loss)


110.242.74.102 : xmt/rcv/%loss = 5/5/0%, min/avg/max = 43.5/43.6/43.7

[202X/07/04 15:51] my_ping_bot: 🔴 com.com 链路超时,全包丢失。
104.26.5.148 : [0], 64 bytes, 3.52 ms (3.52 avg, 0% loss)
104.26.5.148 : [1], 64 bytes, 3.28 ms (3.40 avg, 0% loss)
104.26.5.148 : [2], 64 bytes, 3.31 ms (3.37 avg, 0% loss)
104.26.5.148 : [3], 64 bytes, 3.18 ms (3.32 avg, 0% loss)
104.26.5.148 : [4], 64 bytes, 3.78 ms (3.41 avg, 0% loss)


104.26.5.148 : xmt/rcv/%loss = 5/5/0%, min/avg/max = 3.18/3.41/3.78

[202X/07/04 15:51] my_ping_bot: 🏁 流式滚动侦测全盘终结报告
------------------------------------
📊 监测节点总数: 2
🟢 通畅节点正常: 0
🔴 彻底丢包断交: 2
🚫 域名寻址死锁: 0
💥 未知运行崩溃: 0
------------------------------------
✨ 所有分配的拨测流水线已全部打完收工并安全解构。

我也搞不懂为什么,因为我写到后面脑子就糊涂了,这玩意我反反复复研究了将近一个小时,但是还是没能弄好

© 本文著作权归作者所有,未经许可不得转载使用。