IP/域名Ping告警监控推送到webhook
import tkinter as tk
from tkinter import ttk, messagebox, simpledialog, filedialog
import subprocess
import threading
import time
import requests
import json
import os
import logging
from logging.handlers import TimedRotatingFileHandler
import re
from datetime import datetime
import ttkbootstrap as ttkbs
from ttkbootstrap.constants import *
import winreg
import sys
from PIL import Image, ImageDraw
from pystray import Icon as TrayIcon, Menu as TrayMenu, MenuItem as TrayMenuItem
# ========== 基础配置 ==========
# 使用程序运行目录作为默认存储位置
APP_DATA_DIR = os.path.abspath(".")
# 配置文件和日志目录路径(默认值)
CONFIG_FILE = os.path.join(APP_DATA_DIR, "ping_monitor_config.json")
DEFAULT_LOG_DIR = os.path.join(APP_DATA_DIR, "logs")
LOG_DIR = DEFAULT_LOG_DIR # 初始化为默认值
DEFAULT_PING_TIMEOUT = 2
# 确保默认日志目录存在
os.makedirs(DEFAULT_LOG_DIR, exist_ok=True)
# ========== 日志配置 ==========
def init_logger(days_to_keep=30):
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
logger = logging.getLogger("PingMonitor")
logger.setLevel(logging.INFO)
if logger.handlers:
# 清除现有处理器
for handler in logger.handlers[:]:
logger.removeHandler(handler)
handler.close()
log_handler = TimedRotatingFileHandler(
filename=os.path.join(LOG_DIR, "ping_monitor.log"),
when="midnight",
interval=1,
backupCount=days_to_keep,
encoding="utf-8"
)
formatter = logging.Formatter(
"%(asctime)s - %(levelname)s - %(module)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
log_handler.setFormatter(formatter)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.addHandler(console_handler)
return logger
def cleanup_old_logs(days_to_keep):
"""清理旧日志文件"""
try:
if not os.path.exists(LOG_DIR):
return
# 计算截止时间
cutoff_time = time.time() - (days_to_keep * 86400)
# 遍历日志目录
for file in os.listdir(LOG_DIR):
file_path = os.path.join(LOG_DIR, file)
if os.path.isfile(file_path) and file.startswith("ping_monitor"):
# 检查文件修改时间
if os.path.getmtime(file_path) < cutoff_time:
os.remove(file_path)
# 避免在logger未初始化时调用
try:
logger.info(f"已删除旧日志文件:{file}")
except NameError:
pass
except Exception as e:
# 避免在logger未初始化时调用
try:
logger.error(f"清理旧日志失败:{str(e)}")
except NameError:
print(f"清理旧日志失败:{str(e)}")
logger = init_logger()
# 清理旧日志(在logger初始化后调用)
cleanup_old_logs(30)
# ========== 主程序类 ==========
class PingMonitorApp:
def __init__(self, root):
self.root = root
self.root.title("IP/域名Ping告警监控")
self.root.geometry("900x600")
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
# 全局变量
self.monitor_list = [] # [{name, ip, interval, fail_threshold, status, fail_count, is_paused, webhook_id}]
self.webhooks = [] # [{id, name, url, type}]
self.status_var = tk.StringVar(value="就绪 | 监控项:0 | 告警中:0 | 最后推送:-")
# 拆分为异常模板和恢复模板
self.alert_template = tk.StringVar(value="""⚠️ 【监控告警】
📋 监控项:{name}
🌐 IP/域名:{ip}
⏰ 告警时间:{time}
📊 当前状态:{status}""")
self.recovery_template = tk.StringVar(value="""✅ 【监控恢复】
📋 监控项:{name}
🌐 IP/域名:{ip}
⏰ 恢复时间:{time}
📊 当前状态:{status}
⏱️ 问题持续时间:{duration}""")
self.is_running = True
self.lock = threading.Lock()
self.ping_timeout = DEFAULT_PING_TIMEOUT
self.ping_timeout_var = tk.IntVar(value=self.ping_timeout)
# 日志保存天数
self.log_retention_days = tk.IntVar(value=30)
self.log_retention_options = [1, 7, 30, 90, 180, 365] # 固定选项:1天/7天/30天/90天/180天/365天
# 托盘相关
self.tray_icon = None
self.startup_enabled = False
# 初始化UI
self.style = ttkbs.Style(theme="flatly")
self.build_ui()
# 设置窗口图标
try:
# 直接使用内置图标数据,不依赖外部文件
logger.info("使用内置图标数据,确保所有图标一致")
import sys
import os
from PIL import Image, ImageDraw
# 创建一个唯一的图标图案,确保所有图标一致
def create_unified_icon(width, height):
"""创建统一的图标图案"""
image = Image.new('RGBA', (width, height), color=(0, 0, 0, 0))
draw = ImageDraw.Draw(image)
# 绘制一个统一的绿色圆形图标
center = width // 2
radius = min(width, height) // 3
# 外圆环
draw.ellipse([center - radius*2, center - radius*2, center + radius*2, center + radius*2],
fill=(0, 150, 0, 255),
outline=(255, 255, 255, 255),
width=2)
# 内圆
draw.ellipse([center - radius, center - radius, center + radius, center + radius],
fill=(0, 200, 0, 255))
# 十字线
draw.line([center - radius, center, center + radius, center],
fill=(255, 255, 255, 255),
width=3)
draw.line([center, center - radius, center, center + radius],
fill=(255, 255, 255, 255),
width=3)
return image
# 设置窗口图标
if not getattr(sys, 'frozen', False):
# 开发模式
# 创建临时ICO文件
import tempfile
icon_image = create_unified_icon(32, 32)
with tempfile.NamedTemporaryFile(suffix='.ico', delete=False) as temp_file:
temp_ico_path = temp_file.name
# 保存为ICO文件
icon_image.save(temp_ico_path, format='ICO')
# 设置窗口图标
self.root.iconbitmap(temp_ico_path)
logger.info("开发模式窗口图标设置成功")
else:
# 打包模式,PyInstaller会处理窗口图标
logger.info("打包模式,使用PyInstaller图标")
# 保存统一图标到类变量,供托盘图标使用
self.unified_icon = create_unified_icon(64, 64)
logger.info("统一图标创建成功")
except Exception as e:
logger.error(f"窗口图标设置失败:{str(e)}")
import traceback
traceback.print_exc()
# 添加调试日志
logger.info(f"程序初始化完成,当前工作目录:{os.getcwd()}")
logger.info(f"配置文件路径:{CONFIG_FILE}")
logger.info(f"配置文件是否存在:{os.path.exists(CONFIG_FILE)}")
logger.info(f"日志目录:{LOG_DIR}")
logger.info(f"日志目录是否存在:{os.path.exists(LOG_DIR)}")
self.load_config()
# 初始化托盘图标
self.setup_tray_icon()
# 检查开机自启状态
self.check_startup_status()
self.start_monitoring()
def build_ui(self):
# 顶部框架
top_frame = ttkbs.Frame(self.root, padding="10")
top_frame.pack(fill=X)
# Webhook管理
ttkbs.Label(top_frame, text="webhook推送链接:").pack(side=LEFT)
ttkbs.Button(top_frame, text="管理", command=self.manage_webhooks, style="Info.TButton").pack(side=LEFT, padx=5)
# 告警模板
ttkbs.Button(top_frame, text="编辑告警模板", command=self.edit_alert_template, style="Info.TButton").pack(side=LEFT, padx=5)
# Ping超时设置
ttkbs.Label(top_frame, text="Ping超时(秒):").pack(side=LEFT, padx=10)
self.ping_timeout_var = tk.IntVar(value=self.ping_timeout)
self.ping_timeout_entry = ttkbs.Spinbox(top_frame, from_=1, to=10, textvariable=self.ping_timeout_var, width=5)
self.ping_timeout_entry.pack(side=LEFT)
ttkbs.Button(top_frame, text="保存", command=self.update_ping_timeout, style="Success.TButton").pack(side=LEFT, padx=5)
# 中部操作按钮
op_frame = ttkbs.Frame(self.root, padding="10")
op_frame.pack(fill=X)
# 定义日志弹窗
def show_log_popup():
log_win = ttkbs.Toplevel(self.root)
log_win.title("运行日志")
log_win.geometry("900x550")
log_win.transient(self.root)
# 标题
ttkbs.Label(log_win, text="运行日志(最近100行)", font=("微软雅黑", 10, "bold")).pack(fill=X, pady=5)
# 日志保存设置
settings_frame = ttkbs.Frame(log_win, padding="10")
settings_frame.pack(fill=X, pady=5, side=TOP)
# 日志目录选择
ttkbs.Label(settings_frame, text="日志保存目录:").pack(side=LEFT, padx=5)
# 当前日志目录显示
log_dir_var = tk.StringVar(value=LOG_DIR)
log_dir_entry = ttkbs.Entry(settings_frame, textvariable=log_dir_var, width=40, state="readonly")
log_dir_entry.pack(side=LEFT, padx=5)
# 选择目录按钮
def select_log_dir():
selected_dir = filedialog.askdirectory(title="选择日志保存目录", initialdir=LOG_DIR)
if selected_dir:
log_dir_var.set(selected_dir)
ttkbs.Button(settings_frame, text="选择目录", command=select_log_dir, style="Info.TButton").pack(side=LEFT, padx=5)
# 第二行:日志保存时间
settings_frame2 = ttkbs.Frame(log_win, padding="10")
settings_frame2.pack(fill=X, pady=5, side=TOP)
ttkbs.Label(settings_frame2, text="日志保存时间:").pack(side=LEFT, padx=5)
# 创建日志保存天数下拉框
retention_combo = ttkbs.Combobox(
settings_frame2,
values=["1天(当日)", "7天(一星期)", "30天(一个月)", "90天(一季度)", "180天(半年)", "365天(一年)"],
state="readonly",
width=20
)
# 设置当前值
current_days = self.log_retention_days.get()
option_text = {
1: "1天(当日)",
7: "7天(一星期)",
30: "30天(一个月)",
90: "90天(一季度)",
180: "180天(半年)",
365: "365天(一年)"
}
retention_combo.current(list(option_text.keys()).index(current_days))
retention_combo.pack(side=LEFT, padx=5)
# 保存设置
def save_log_settings():
selected_text = retention_combo.get()
# 提取天数
days_map = {
"1天(当日)": 1,
"7天(一星期)": 7,
"30天(一个月)": 30,
"90天(一季度)": 90,
"180天(半年)": 180,
"365天(一年)": 365
}
new_days = days_map[selected_text]
# 获取新的日志目录
new_log_dir = log_dir_var.get()
if new_log_dir:
global LOG_DIR
old_log_dir = LOG_DIR
LOG_DIR = new_log_dir
# 确保新目录存在
os.makedirs(LOG_DIR, exist_ok=True)
# 更新日志保存天数
self.log_retention_days.set(new_days)
self.save_config()
# 重新初始化logger,应用新的日志目录和保存天数
global logger
logger = init_logger(new_days)
# 立即执行日志清理
self.cleanup_old_logs()
messagebox.showinfo("成功", f"日志保存设置已更新!\n日志目录:{new_log_dir}\n保存时间:{selected_text}")
# 刷新日志显示
refresh_log()
else:
messagebox.showerror("错误", "请选择有效的日志目录!")
ttkbs.Button(settings_frame2, text="保存设置", command=save_log_settings, style="Success.TButton").pack(side=LEFT, padx=5)
# 日志文本框
log_text = tk.Text(log_win, height=25, width=90)
log_scroll = ttkbs.Scrollbar(log_win, orient=VERTICAL, command=log_text.yview)
log_text.configure(yscrollcommand=log_scroll.set)
def refresh_log():
try:
log_file = os.path.join(LOG_DIR, "ping_monitor.log")
if os.path.exists(log_file):
with open(log_file, "r", encoding="utf-8", errors="ignore") as f:
lines = f.readlines()[-100:]
log_text.config(state=tk.NORMAL)
log_text.delete(1.0, END)
log_text.insert(END, "".join(lines))
log_text.see(END)
log_text.config(state=tk.DISABLED)
except Exception as e:
logger.error(f"刷新日志失败:{str(e)}")
# 日志操作按钮
btn_frame = ttkbs.Frame(log_win)
btn_frame.pack(fill=X, pady=5)
ttkbs.Button(btn_frame, text="刷新日志", command=refresh_log).pack(side=LEFT, padx=5)
ttkbs.Button(btn_frame, text="关闭", command=log_win.destroy).pack(side=RIGHT, padx=5)
# 布局
log_text.pack(side=LEFT, fill=BOTH, expand=True)
log_scroll.pack(side=RIGHT, fill=Y)
# 初始刷新
refresh_log()
# 操作按钮
ttkbs.Button(op_frame, text="添加监控", command=self.add_monitor_popup, style="Success.TButton").pack(side=LEFT, padx=5)
ttkbs.Button(op_frame, text="暂停选中", command=self.pause_selected, style="Warning.TButton").pack(side=LEFT, padx=5)
ttkbs.Button(op_frame, text="恢复选中", command=self.resume_selected, style="Success.TButton").pack(side=LEFT, padx=5)
ttkbs.Button(op_frame, text="查看日志", command=show_log_popup, style="Info.TButton").pack(side=RIGHT, padx=5)
# 监控列表
list_frame = ttkbs.Frame(self.root, padding="10")
list_frame.pack(fill=BOTH, expand=True)
# 搜索和过滤
search_frame = ttkbs.Frame(list_frame)
search_frame.pack(fill=X, pady=5)
ttkbs.Label(search_frame, text="搜索:").pack(side=LEFT)
self.search_var = tk.StringVar()
search_entry = ttkbs.Entry(search_frame, textvariable=self.search_var, width=30)
search_entry.pack(side=LEFT, padx=5)
search_entry.bind("<KeyRelease>", self.filter_monitors)
ttkbs.Label(search_frame, text="状态过滤:").pack(side=LEFT, padx=5)
self.filter_var = tk.StringVar(value="全部")
filter_combo = ttkbs.Combobox(search_frame, textvariable=self.filter_var, values=["全部", "正常", "告警中", "运行中", "已暂停"], state="readonly", width=10)
filter_combo.pack(side=LEFT, padx=5)
filter_combo.bind("<<ComboboxSelected>>", self.filter_monitors)
# 树形列表
columns = ("名称", "IP/域名", "频率(秒)", "失败阈值", "告警状态", "失败次数", "执行状态")
# 创建自定义Treeview样式,使用浅色表头
style = ttkbs.Style()
# 配置Treeview表头样式:浅色背景
style.configure("Custom.Treeview.Heading",
background="#f8f9fa", # 浅灰色背景
foreground="#212529", # 深灰色文字
font=(".", 9, "bold"),
relief="flat",
borderwidth=1)
# 配置Treeview项样式
style.configure("Custom.Treeview",
background="white",
foreground="#212529",
fieldbackground="white",
bordercolor="#dee2e6",
borderwidth=1)
# 配置Treeview选中项样式
style.map("Custom.Treeview",
background=[("selected", "#e9ecef")],
foreground=[("selected", "#212529")])
# 使用自定义样式
self.tree = ttkbs.Treeview(list_frame, columns=columns, show="headings", style="Custom.Treeview")
# 配置列
for col in columns:
# 设置列标题
self.tree.heading(col, text=col, command=lambda c=col: self.sort_tree(c))
# 设置列宽度
if col == "告警状态" or col == "执行状态":
width = 120
else:
width = 100
# 设置对齐方式:名称和IP/域名靠左,其他字段居中
anchor = "w" if col == "名称" or col == "IP/域名" else "center"
# 配置列属性
self.tree.column(col, width=width, stretch=YES, anchor=anchor, minwidth=width)
# 配置Treeview项样式,增加分隔线效果
self.tree.tag_configure("treeitem", font=(".", 9))
# 配置Treeview的样式,使用ttkbootstrap特有的方式
# 为不同状态添加不同的前缀文字颜色,使用HTML格式
# 由于ttkbootstrap的Treeview不支持直接的标签颜色,我们通过修改状态文字的方式来实现颜色显示
# 滚动条
tree_scroll = ttkbs.Scrollbar(list_frame, orient=VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=tree_scroll.set)
# 列表操作按钮
tree_btn_frame = ttkbs.Frame(list_frame)
tree_btn_frame.pack(fill=X, pady=5)
ttkbs.Button(tree_btn_frame, text="编辑选中", command=self.edit_monitor, style="Info.TButton").grid(row=0, column=0, padx=5)
ttkbs.Button(tree_btn_frame, text="删除选中", command=self.delete_monitor, style="Danger.TButton").grid(row=0, column=1, padx=5)
ttkbs.Button(tree_btn_frame, text="刷新列表", command=self.refresh_tree, style="Primary.TButton").grid(row=0, column=2, padx=5)
# 布局列表
self.tree.pack(side=LEFT, fill=BOTH, expand=True)
tree_scroll.pack(side=RIGHT, fill=Y)
# 状态栏
status_frame = ttkbs.Frame(self.root, relief=SUNKEN, borderwidth=1)
status_frame.pack(side=BOTTOM, fill=X)
# 状态信息
status_label = ttkbs.Label(status_frame, textvariable=self.status_var)
status_label.pack(side=LEFT, padx=10, pady=2)
# 作者标识超链接
def open_author_link():
"""打开作者链接"""
import webbrowser
webbrowser.open("https://halo.taofile.cn/")
author_label = ttkbs.Label(status_frame, text="by_zhangt", cursor="hand2", foreground="#007bff")
author_label.pack(side=RIGHT, padx=10, pady=2)
author_label.bind("<Button-1>", lambda e: open_author_link())
def manage_webhooks(self):
win = ttkbs.Toplevel(self.root)
win.title("webhook管理")
win.geometry("700x400")
win.transient(self.root)
win.grab_set()
# 创建webhook列表
columns = ("ID", "名称", "类型", "URL")
tree = ttkbs.Treeview(win, columns=columns, show="headings", style="Custom.Treeview")
for col in columns:
tree.heading(col, text=col)
tree.column(col, width=50 if col == "ID" else 150 if col in ["名称", "类型"] else 300, stretch=YES)
scrollbar = ttkbs.Scrollbar(win, orient=VERTICAL, command=tree.yview)
tree.configure(yscrollcommand=scrollbar.set)
def refresh_webhook_list():
for item in tree.get_children():
tree.delete(item)
for webhook in self.webhooks:
tree.insert("", END, values=(webhook["id"], webhook["name"], webhook["type"], webhook["url"]))
# 添加webhook
def add_webhook():
add_win = ttkbs.Toplevel(win)
add_win.title("添加webhook")
add_win.geometry("500x250")
add_win.transient(win)
add_win.grab_set()
frame = ttkbs.Frame(add_win, padding="20")
frame.pack(fill=BOTH, expand=True)
ttkbs.Label(frame, text="名称:").grid(row=0, column=0, sticky=W, pady=10)
name_var = tk.StringVar()
ttkbs.Entry(frame, textvariable=name_var, width=30).grid(row=0, column=1)
ttkbs.Label(frame, text="类型:").grid(row=1, column=0, sticky=W, pady=10)
type_var = tk.StringVar(value="feishu")
type_combo = ttkbs.Combobox(frame, textvariable=type_var, values=["feishu", "dingtalk"], state="readonly", width=28)
type_combo.grid(row=1, column=1)
ttkbs.Label(frame, text="URL:").grid(row=2, column=0, sticky=W, pady=10)
url_var = tk.StringVar()
ttkbs.Entry(frame, textvariable=url_var, width=50).grid(row=2, column=1)
def save():
name = name_var.get().strip()
url = url_var.get().strip()
webhook_type = type_var.get()
if not name or not url:
messagebox.showerror("错误", "名称和URL不能为空!")
return
new_id = max([w["id"] for w in self.webhooks]) + 1 if self.webhooks else 1
self.webhooks.append({"id": new_id, "name": name, "url": url, "type": webhook_type})
self.save_config()
refresh_webhook_list()
add_win.destroy()
messagebox.showinfo("成功", "webhook添加成功!")
btn_frame = ttkbs.Frame(add_win, padding="10")
btn_frame.pack(fill=X)
ttkbs.Button(btn_frame, text="保存", command=save, style="Success.TButton").pack(side=RIGHT, padx=5)
ttkbs.Button(btn_frame, text="取消", command=add_win.destroy, style="Secondary.TButton").pack(side=RIGHT)
# 测试webhook
def test_webhook():
selected = tree.selection()
if not selected:
messagebox.showwarning("提示", "请先选择一个webhook!")
return
item = tree.item(selected[0])
webhook_id = int(item["values"][0])
webhook = next((w for w in self.webhooks if w["id"] == webhook_id), None)
if not webhook:
return
try:
# 创建模拟监控项数据用于测试
mock_monitor = {
"name": "测试监控项",
"ip": "127.0.0.1",
"webhook_id": webhook_id
}
# 使用告警模板发送测试消息
self.send_alert(mock_monitor, is_recovery=False)
messagebox.showinfo("提示", f"测试消息已发送,请查看{webhook['type']}群!")
except Exception as e:
messagebox.showerror("错误", f"测试失败:{str(e)}")
# 删除webhook
def delete_webhook():
selected = tree.selection()
if not selected:
messagebox.showwarning("提示", "请先选择一个webhook!")
return
item = tree.item(selected[0])
webhook_id = int(item["values"][0])
webhook = next((w for w in self.webhooks if w["id"] == webhook_id), None)
if not webhook:
return
if messagebox.askyesno("确认", f"确定要删除webhook '{webhook['name']}'吗?"):
# 检查是否有监控项正在使用该webhook
used = any(m["webhook_id"] == webhook_id for m in self.monitor_list)
if used:
messagebox.showerror("错误", "该webhook正在被监控项使用,无法删除!")
return
# 删除webhook
self.webhooks = [w for w in self.webhooks if w["id"] != webhook_id]
self.save_config()
refresh_webhook_list()
messagebox.showinfo("成功", "webhook删除成功!")
# 布局
btn_frame = ttkbs.Frame(win, padding="10")
btn_frame.pack(fill=X)
ttkbs.Button(btn_frame, text="添加", command=add_webhook, style="Success.TButton").pack(side=LEFT, padx=5)
ttkbs.Button(btn_frame, text="删除", command=delete_webhook, style="Danger.TButton").pack(side=LEFT, padx=5)
ttkbs.Button(btn_frame, text="测试选中", command=test_webhook, style="Warning.TButton").pack(side=LEFT, padx=5)
tree.pack(side=LEFT, fill=BOTH, expand=True)
scrollbar.pack(side=RIGHT, fill=Y)
refresh_webhook_list()
def add_monitor_popup(self):
add_win = ttkbs.Toplevel(self.root)
add_win.title("添加监控项")
add_win.geometry("500x300")
add_win.transient(self.root)
add_win.grab_set()
frame = ttkbs.Frame(add_win, padding="20")
frame.pack(fill=BOTH, expand=True)
# 表单
ttkbs.Label(frame, text="监控名称:").grid(row=0, column=0, sticky=W, pady=10)
name_var = tk.StringVar()
ttkbs.Entry(frame, textvariable=name_var, width=30).grid(row=0, column=1)
ttkbs.Label(frame, text="IP/域名:").grid(row=1, column=0, sticky=W, pady=10)
ip_var = tk.StringVar()
ttkbs.Entry(frame, textvariable=ip_var, width=30).grid(row=1, column=1)
ttkbs.Label(frame, text="Ping频率(秒):").grid(row=2, column=0, sticky=W, pady=10)
interval_var = tk.StringVar(value="10")
ttkbs.Entry(frame, textvariable=interval_var, width=30).grid(row=2, column=1)
ttkbs.Label(frame, text="失败阈值:").grid(row=3, column=0, sticky=W, pady=10)
fail_threshold_var = tk.StringVar(value="3")
ttkbs.Entry(frame, textvariable=fail_threshold_var, width=30).grid(row=3, column=1)
# webhook选择
ttkbs.Label(frame, text="推送webhook:").grid(row=4, column=0, sticky=W, pady=10)
webhook_var = tk.IntVar()
webhook_combo = ttkbs.Combobox(frame, state="readonly", width=28)
if self.webhooks:
webhook_combo['values'] = [f"{w['name']} ({w['type']})" for w in self.webhooks]
webhook_combo.current(0)
webhook_var.set(self.webhooks[0]['id'])
else:
webhook_combo['values'] = ["无可用webhook"]
webhook_combo.current(0)
webhook_var.set(None)
webhook_combo.grid(row=4, column=1)
def save():
name = name_var.get().strip()
ip = ip_var.get().strip()
interval = interval_var.get().strip()
fail_threshold = fail_threshold_var.get().strip()
if not name or not ip:
messagebox.showerror("错误", "名称和IP/域名不能为空!")
return
try:
interval = int(interval)
fail_threshold = int(fail_threshold)
if interval < 1 or fail_threshold < 1:
raise ValueError("频率和失败阈值必须大于0")
except ValueError as e:
messagebox.showerror("错误", f"参数错误:{str(e)}")
return
# 获取选中的webhook
selected_index = webhook_combo.current()
webhook_id = self.webhooks[selected_index]['id'] if self.webhooks and selected_index != -1 else None
# 添加监控项
self.monitor_list.append({
"name": name,
"ip": ip,
"interval": interval,
"fail_threshold": fail_threshold,
"status": "正常",
"fail_count": 0,
"is_paused": False,
"webhook_id": webhook_id,
"last_check_time": time.time(),
"last_alert_time": 0,
"is_recovered": True,
"alert_start_time": 0
})
self.save_config()
self.refresh_tree()
add_win.destroy()
messagebox.showinfo("成功", "监控项添加成功!")
# 按钮
btn_frame = ttkbs.Frame(add_win, padding="10")
btn_frame.pack(fill=X)
ttkbs.Button(btn_frame, text="添加监控", command=save, style="Success.TButton").pack(side=RIGHT, padx=5)
ttkbs.Button(btn_frame, text="取消", command=add_win.destroy, style="Secondary.TButton").pack(side=RIGHT)
def edit_monitor(self):
selected_items = self.tree.selection()
if not selected_items:
messagebox.showinfo("提示", "请先选中要编辑的监控项!")
return
item_id = selected_items[0]
values = self.tree.item(item_id, "values")
if not values:
return
# 找到对应的监控项
name = values[0]
monitor = next((m for m in self.monitor_list if m["name"] == name), None)
if not monitor:
return
# 创建编辑窗口
edit_win = ttkbs.Toplevel(self.root)
edit_win.title("编辑监控项")
edit_win.geometry("500x300")
edit_win.transient(self.root)
edit_win.grab_set()
frame = ttkbs.Frame(edit_win, padding="20")
frame.pack(fill=BOTH, expand=True)
# 表单
ttkbs.Label(frame, text="监控名称:").grid(row=0, column=0, sticky=W, pady=10)
name_var = tk.StringVar(value=monitor["name"])
ttkbs.Entry(frame, textvariable=name_var, width=30).grid(row=0, column=1)
ttkbs.Label(frame, text="IP/域名:").grid(row=1, column=0, sticky=W, pady=10)
ip_var = tk.StringVar(value=monitor["ip"])
ttkbs.Entry(frame, textvariable=ip_var, width=30).grid(row=1, column=1)
ttkbs.Label(frame, text="Ping频率(秒):").grid(row=2, column=0, sticky=W, pady=10)
interval_var = tk.StringVar(value=str(monitor["interval"]))
ttkbs.Entry(frame, textvariable=interval_var, width=30).grid(row=2, column=1)
ttkbs.Label(frame, text="失败阈值:").grid(row=3, column=0, sticky=W, pady=10)
fail_threshold_var = tk.StringVar(value=str(monitor["fail_threshold"]))
ttkbs.Entry(frame, textvariable=fail_threshold_var, width=30).grid(row=3, column=1)
# webhook选择
ttkbs.Label(frame, text="推送webhook:").grid(row=4, column=0, sticky=W, pady=10)
webhook_combo = ttkbs.Combobox(frame, state="readonly", width=28)
# 保存选中的webhook_id
selected_webhook_id = tk.IntVar()
if self.webhooks:
webhook_combo['values'] = [f"{w['name']} ({w['type']})" for w in self.webhooks]
# 找到当前webhook的索引,添加安全检查
webhook_id = monitor.get("webhook_id", 0)
current_webhook = next((w for w in self.webhooks if w["id"] == webhook_id), self.webhooks[0])
current_index = self.webhooks.index(current_webhook)
webhook_combo.current(current_index)
selected_webhook_id.set(self.webhooks[current_index]['id'])
# 绑定选择事件
def on_webhook_select(event):
selected_index = webhook_combo.current()
if selected_index != -1:
selected_webhook_id.set(self.webhooks[selected_index]['id'])
webhook_combo.bind("<<ComboboxSelected>>", on_webhook_select)
else:
webhook_combo['values'] = ["无可用webhook"]
webhook_combo.current(0)
selected_webhook_id.set(0)
webhook_combo.grid(row=4, column=1)
def save():
# 验证输入
name = name_var.get().strip()
ip = ip_var.get().strip()
interval = interval_var.get().strip()
fail_threshold = fail_threshold_var.get().strip()
if not name or not ip:
messagebox.showerror("错误", "名称和IP/域名不能为空!")
return
try:
interval = int(interval)
fail_threshold = int(fail_threshold)
if interval < 1 or fail_threshold < 1:
raise ValueError("频率和失败阈值必须大于0")
except ValueError as e:
messagebox.showerror("错误", f"参数错误:{str(e)}")
return
# 获取选中的webhook_id
webhook_id = selected_webhook_id.get()
# 更新监控项
monitor["name"] = name
monitor["ip"] = ip
monitor["interval"] = interval
monitor["fail_threshold"] = fail_threshold
monitor["webhook_id"] = webhook_id
self.save_config()
self.refresh_tree()
edit_win.destroy()
messagebox.showinfo("成功", "监控项更新成功!")
# 按钮
btn_frame = ttkbs.Frame(edit_win, padding="10")
btn_frame.pack(fill=X)
ttkbs.Button(btn_frame, text="保存修改", command=save, style="Success.TButton").pack(side=RIGHT, padx=5)
ttkbs.Button(btn_frame, text="取消", command=edit_win.destroy, style="Secondary.TButton").pack(side=RIGHT)
def delete_monitor(self):
selected_items = self.tree.selection()
if not selected_items:
messagebox.showinfo("提示", "请先选中要删除的监控项!")
return
if messagebox.askyesno("确认", "确定要删除选中的监控项吗?"):
for item_id in selected_items:
values = self.tree.item(item_id, "values")
if values:
name = values[0]
self.monitor_list = [m for m in self.monitor_list if m["name"] != name]
self.save_config()
self.refresh_tree()
messagebox.showinfo("成功", "监控项删除成功!")
def pause_selected(self):
selected_items = self.tree.selection()
if not selected_items:
messagebox.showinfo("提示", "请先选中要暂停的监控项!")
return
for item_id in selected_items:
values = self.tree.item(item_id, "values")
if values:
name = values[0]
for monitor in self.monitor_list:
if monitor["name"] == name:
monitor["is_paused"] = True
monitor["status"] = "已暂停"
break
self.save_config()
self.refresh_tree()
messagebox.showinfo("成功", "监控项已暂停!")
def resume_selected(self):
selected_items = self.tree.selection()
if not selected_items:
messagebox.showinfo("提示", "请先选中要恢复的监控项!")
return
for item_id in selected_items:
values = self.tree.item(item_id, "values")
if values:
name = values[0]
for monitor in self.monitor_list:
if monitor["name"] == name:
monitor["is_paused"] = False
monitor["status"] = "正常"
break
self.save_config()
self.refresh_tree()
messagebox.showinfo("成功", "监控项已恢复!")
def edit_alert_template(self):
edit_win = ttkbs.Toplevel(self.root)
edit_win.title("编辑告警模板")
edit_win.geometry("700x400")
edit_win.transient(self.root)
edit_win.grab_set()
frame = ttkbs.Frame(edit_win, padding="20")
frame.pack(fill=BOTH, expand=True)
# 模板类型选择
template_type_var = tk.StringVar(value="alert")
type_frame = ttkbs.Frame(frame)
type_frame.grid(row=0, column=0, sticky=W, pady=10)
ttkbs.Label(type_frame, text="模板类型:").pack(side=LEFT, padx=5)
ttkbs.Radiobutton(type_frame, text="异常告警模板", variable=template_type_var, value="alert").pack(side=LEFT, padx=10)
ttkbs.Radiobutton(type_frame, text="恢复通知模板", variable=template_type_var, value="recovery").pack(side=LEFT, padx=10)
# 模板变量说明
def update_template_help():
if template_type_var.get() == "alert":
help_text = "异常告警模板(支持变量:{name}-监控名称, {ip}-IP/域名, {time}-告警时间, {status}-当前状态):"
current_template = self.alert_template.get()
else:
help_text = "恢复通知模板(支持变量:{name}-监控名称, {ip}-IP/域名, {time}-恢复时间, {status}-当前状态, {duration}-问题持续时间):"
current_template = self.recovery_template.get()
help_label.config(text=help_text)
template_text.delete(1.0, END)
template_text.insert(END, current_template)
help_label = ttkbs.Label(frame, text="")
help_label.grid(row=1, column=0, sticky=W, pady=5)
# 模板文本框
template_text = tk.Text(frame, height=15, width=80)
template_scroll = ttkbs.Scrollbar(frame, orient=VERTICAL, command=template_text.yview)
template_text.configure(yscrollcommand=template_scroll.set)
template_text.grid(row=2, column=0, padx=10, pady=10, sticky=NSEW)
template_scroll.grid(row=2, column=1, sticky=NS, pady=10)
# 配置网格权重
frame.grid_rowconfigure(2, weight=1)
frame.grid_columnconfigure(0, weight=1)
# 绑定模板类型变化事件
template_type_var.trace_add("write", lambda *args: update_template_help())
# 初始加载
update_template_help()
def save():
content = template_text.get(1.0, END).strip()
if template_type_var.get() == "alert":
self.alert_template.set(content)
message = "异常告警模板已保存!"
else:
self.recovery_template.set(content)
message = "恢复通知模板已保存!"
self.save_config()
edit_win.destroy()
messagebox.showinfo("成功", message)
btn_frame = ttkbs.Frame(frame)
btn_frame.grid(row=3, column=0, columnspan=2, pady=10)
ttkbs.Button(btn_frame, text="保存", command=save, style="Success.TButton").grid(row=0, column=0, padx=10)
ttkbs.Button(btn_frame, text="取消", command=edit_win.destroy, style="Secondary.TButton").grid(row=0, column=1, padx=10)
def update_ping_timeout(self):
self.ping_timeout = self.ping_timeout_var.get()
self.save_config()
messagebox.showinfo("成功", f"Ping超时时间已更新为:{self.ping_timeout}秒")
def cleanup_old_logs(self):
"""清理旧日志文件"""
days_to_keep = self.log_retention_days.get()
cleanup_old_logs(days_to_keep)
# 重新初始化logger,应用新的保存天数
global logger
logger = init_logger(days_to_keep)
def start_monitoring(self):
"""开始监控"""
def monitor_loop():
while self.is_running:
time.sleep(1)
current_time = time.time()
with self.lock:
for monitor in self.monitor_list:
if not monitor["is_paused"]:
# 基于interval的动态调度
if current_time - monitor["last_check_time"] >= monitor["interval"]:
self.check_monitor(monitor)
monitor["last_check_time"] = current_time
threading.Thread(target=monitor_loop, daemon=True).start()
def check_monitor(self, monitor):
"""检查单个监控项(使用socket连接方式,完全避免命令行窗口)"""
ALERT_SUPPRESSION_INTERVAL = 300 # 告警抑制间隔(秒)
# 定义要尝试连接的端口列表
test_ports = [80, 443, 22, 21, 3389] # HTTP, HTTPS, SSH, FTP, RDP
ping_success = False
try:
import socket
# 使用socket连接方式检查主机是否可达
for port in test_ports:
sock = None
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.ping_timeout / 2) # 每个端口超时时间减半
sock.connect((monitor["ip"], port))
ping_success = True
break # 只要有一个端口连接成功,就认为主机可达
except (socket.timeout, ConnectionRefusedError, OSError):
# 连接失败,尝试下一个端口
continue
finally:
if sock:
sock.close()
# 如果所有端口都连接失败,直接认为主机不可达
# 不再依赖DNS解析,因为DNS解析成功不代表主机可达
pass
if ping_success:
# Ping成功
if monitor["status"] != "正常":
# 恢复通知:状态从告警中变为正常
was_alerting = monitor["status"] == "告警中"
monitor["status"] = "正常"
monitor["fail_count"] = 0
monitor["is_recovered"] = True
self.update_tree_row(monitor)
# 发送恢复通知
if was_alerting:
self.send_alert(monitor, is_recovery=True)
else:
# Ping失败
monitor["fail_count"] += 1
monitor["is_recovered"] = False
if monitor["fail_count"] >= monitor["fail_threshold"] and monitor["status"] != "告警中":
monitor["status"] = "告警中"
# 记录告警开始时间
monitor["alert_start_time"] = time.time()
self.update_tree_row(monitor)
# 告警抑制:避免短时间内重复发送相同告警
current_time = time.time()
if current_time - monitor["last_alert_time"] >= ALERT_SUPPRESSION_INTERVAL:
self.send_alert(monitor)
monitor["last_alert_time"] = current_time
except Exception as e:
logger.error(f"检查监控项 {monitor['name']} 时发生异常:{str(e)}")
monitor["fail_count"] += 1
monitor["is_recovered"] = False
if monitor["fail_count"] >= monitor["fail_threshold"] and monitor["status"] != "告警中":
monitor["status"] = "告警中"
self.update_tree_row(monitor)
# 告警抑制
current_time = time.time()
if current_time - monitor["last_alert_time"] >= ALERT_SUPPRESSION_INTERVAL:
self.send_alert(monitor)
monitor["last_alert_time"] = current_time
def send_alert(self, monitor, is_recovery=False):
"""发送告警或恢复通知"""
# 获取对应的webhook
webhook = next((w for w in self.webhooks if w["id"] == monitor["webhook_id"]), self.webhooks[0] if self.webhooks else None)
if not webhook:
logger.warning(f"监控项 {monitor['name']} 触发通知,但未配置webhook")
return
# 获取当前时间
current_time = datetime.now()
time_str = current_time.strftime("%Y-%m-%d %H:%M:%S")
# 格式化告警消息
if is_recovery:
# 计算持续时间
alert_start_time = monitor.get("alert_start_time", 0)
if alert_start_time > 0:
duration_seconds = int(current_time.timestamp() - alert_start_time)
# 格式化持续时间为易读格式
days, remainder = divmod(duration_seconds, 86400)
hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60)
duration_parts = []
if days > 0:
duration_parts.append(f"{days}天")
if hours > 0:
duration_parts.append(f"{hours}小时")
if minutes > 0:
duration_parts.append(f"{minutes}分钟")
if seconds > 0:
duration_parts.append(f"{seconds}秒")
duration_str = " ".join(duration_parts) if duration_parts else "0秒"
else:
duration_str = "未知"
# 使用恢复模板
message = self.recovery_template.get().format(
name=monitor["name"],
ip=monitor["ip"],
time=time_str,
status="正常",
duration=duration_str
)
else:
# 使用告警模板,为测试数据提供默认值
status = monitor.get("status", "告警中")
message = self.alert_template.get().format(
name=monitor["name"],
ip=monitor["ip"],
time=time_str,
status=status
)
# 发送webhook
try:
self.send_webhook(webhook, message)
logger.info(f"{'恢复通知已发送' if is_recovery else '告警已发送'}:{monitor['name']}")
except Exception as e:
logger.error(f"发送{'恢复通知' if is_recovery else '告警'}失败:{str(e)}")
def send_webhook(self, webhook, message):
"""发送webhook消息"""
headers = {'Content-Type': 'application/json'}
if webhook["type"] == "feishu":
# 飞书格式
data = {
"msg_type": "text",
"content": {"text": message}
}
else:
# 钉钉格式
data = {
"msgtype": "text",
"text": {"content": message}
}
response = requests.post(webhook["url"], headers=headers, json=data, timeout=10)
response.raise_for_status()
def update_tree_row(self, monitor):
"""更新单个监控项的显示(使用主线程更新UI,避免闪烁)"""
def update_ui():
# 查找对应的tree item
item_id = None
for item in self.tree.get_children():
values = self.tree.item(item, "values")
if values and values[0] == monitor["name"]:
item_id = item
break
if not item_id:
# 如果找不到,调用完整刷新(应该不会发生)
self.refresh_tree()
return
# 告警状态显示
if monitor["status"] == "正常":
alert_status = "🟢 正常"
alert_color = "#28a745" # 绿色
elif monitor["status"] == "告警中":
alert_status = "🔴 告警中"
alert_color = "#dc3545" # 红色
else:
alert_status = "🟡 " + monitor["status"]
alert_color = "#ffc107" # 黄色
# 执行状态显示
if not monitor["is_paused"]:
monitor_status = "✅ 运行中"
status_color = "#28a745" # 绿色
else:
monitor_status = "⏸️ 已暂停"
status_color = "#6c757d" # 灰色
# 更新行数据
self.tree.set(item_id, column="告警状态", value=alert_status)
self.tree.set(item_id, column="失败次数", value=monitor["fail_count"])
self.tree.set(item_id, column="执行状态", value=monitor_status)
# 使用ttkbootstrap的样式系统,为每一行创建自定义样式
# 首先移除旧的标签
self.tree.item(item_id, tags=())
# 创建唯一的标签名称
row_tag = f"row_{monitor['name']}"
# 配置标签颜色和样式
self.tree.tag_configure(row_tag, foreground=alert_color, font=(".", 9))
# 应用标签到整行,同时添加treeitem标签
self.tree.item(item_id, tags=(row_tag, "treeitem"))
# 更新状态栏
total = len(self.monitor_list)
alerting = sum(1 for m in self.monitor_list if m["status"] == "告警中")
self.status_var.set(f"就绪 | 监控项:{total} | 告警中:{alerting} | 最后推送:-")
# 使用主线程更新UI,避免线程冲突导致的闪烁
self.root.after(0, update_ui)
def refresh_tree(self):
"""刷新树形列表(优化闪烁问题)"""
def update_ui():
# 获取现有项目映射,用于高效更新
existing_items = {}
for item in self.tree.get_children():
values = self.tree.item(item, "values")
if values:
existing_items[values[0]] = item
# 需要添加的新项目
monitor_names = [m["name"] for m in self.monitor_list]
# 更新现有项目或添加新项目
for monitor in self.monitor_list:
# 告警状态显示
if monitor["status"] == "正常":
alert_status = "🟢 正常"
alert_color = "#28a745" # 绿色
elif monitor["status"] == "告警中":
alert_status = "🔴 告警中"
alert_color = "#dc3545" # 红色
else:
alert_status = "🟡 " + monitor["status"]
alert_color = "#ffc107" # 黄色
# 执行状态显示
if not monitor["is_paused"]:
monitor_status = "✅ 运行中"
status_color = "#28a745" # 绿色
else:
monitor_status = "⏸️ 已暂停"
status_color = "#6c757d" # 灰色
# 创建包含所有值的列表
values = [
monitor["name"],
monitor["ip"],
monitor["interval"],
monitor["fail_threshold"],
alert_status,
monitor["fail_count"],
monitor_status
]
if monitor["name"] in existing_items:
# 更新现有项目
item_id = existing_items[monitor["name"]]
self.tree.item(item_id, values=values)
# 逐个更新列值,避免整行重绘
self.tree.set(item_id, column="告警状态", value=alert_status)
self.tree.set(item_id, column="失败次数", value=monitor["fail_count"])
self.tree.set(item_id, column="执行状态", value=monitor_status)
# 更新标签颜色和样式
row_tag = f"row_{monitor['name']}"
self.tree.item(item_id, tags=())
self.tree.tag_configure(row_tag, foreground=alert_color, font=(".", 9))
self.tree.item(item_id, tags=(row_tag, "treeitem"))
else:
# 插入新项目
item_id = self.tree.insert("", END, values=values)
# 添加标签颜色和样式
row_tag = f"row_{monitor['name']}"
self.tree.tag_configure(row_tag, foreground=alert_color, font=(".", 9))
self.tree.item(item_id, tags=(row_tag, "treeitem"))
# 删除不再存在的项目
for item_name, item_id in existing_items.items():
if item_name not in monitor_names:
self.tree.delete(item_id)
# 只在必要时更新状态栏
total = len(self.monitor_list)
alerting = sum(1 for m in self.monitor_list if m["status"] == "告警中")
current_status = self.status_var.get()
new_status = f"就绪 | 监控项:{total} | 告警中:{alerting} | 最后推送:-"
if current_status != new_status:
self.status_var.set(new_status)
# 使用主线程更新UI,避免线程冲突导致的闪烁
self.root.after(0, update_ui)
def sort_tree(self, column):
"""对列表进行排序"""
items = [(self.tree.set(item, column), item) for item in self.tree.get_children()]
items.sort(reverse=self.tree.heading(column, "text") == "告警状态")
for i, (_, item) in enumerate(items):
self.tree.move(item, "", i)
def filter_monitors(self, event=None):
"""根据搜索关键词和状态过滤监控项"""
search_term = self.search_var.get().lower()
filter_status = self.filter_var.get()
# 清空现有内容
for item in self.tree.get_children():
self.tree.delete(item)
# 筛选并重新添加
for monitor in self.monitor_list:
# 应用搜索过滤
if search_term:
if search_term not in monitor["name"].lower() and search_term not in monitor["ip"].lower():
continue
# 应用状态过滤
if filter_status != "全部":
if filter_status in ["运行中", "已暂停"]:
is_running = not monitor["is_paused"]
if (filter_status == "运行中" and not is_running) or (filter_status == "已暂停" and is_running):
continue
else:
if monitor["status"] != filter_status:
continue
# 告警状态显示和标签
if monitor["status"] == "正常":
alert_status = "🟢 正常"
alert_tag = "normal"
elif monitor["status"] == "告警中":
alert_status = "🔴 告警中"
alert_tag = "alert"
else:
alert_status = "🟡 " + monitor["status"]
alert_tag = "warning"
# 执行状态显示和标签
if not monitor["is_paused"]:
monitor_status = "✅ 运行中"
status_tag = "running"
else:
monitor_status = "⏸️ 已暂停"
status_tag = "paused"
# 创建包含所有值的列表
values = [
monitor["name"],
monitor["ip"],
monitor["interval"],
monitor["fail_threshold"],
alert_status,
monitor["fail_count"],
monitor_status
]
# 插入到Treeview,应用颜色标签
item_id = self.tree.insert("", END, values=values, tags=(alert_tag, status_tag))
def load_config(self):
"""加载配置"""
# 声明全局变量
global LOG_DIR
global logger
try:
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
config = json.load(f)
logger.info(f"成功加载配置文件:{CONFIG_FILE}")
else:
logger.info(f"配置文件不存在:{CONFIG_FILE},使用默认配置")
config = {}
# 加载webhooks
self.webhooks = config.get("webhooks", [])
if not self.webhooks:
# 兼容旧配置
old_url = config.get("webhook_url", "")
if old_url:
self.webhooks = [{"id": 1, "name": "默认webhook", "url": old_url, "type": "feishu"}]
# 加载监控项
self.monitor_list = config.get("monitor_list", [])
# 为每个监控项添加必要字段(如果不存在)
current_time = time.time()
for monitor in self.monitor_list:
if "last_check_time" not in monitor:
monitor["last_check_time"] = current_time
if "last_alert_time" not in monitor:
monitor["last_alert_time"] = 0
if "is_recovered" not in monitor:
monitor["is_recovered"] = True
if "alert_start_time" not in monitor:
monitor["alert_start_time"] = 0
if "webhook_id" not in monitor and self.webhooks:
monitor["webhook_id"] = self.webhooks[0]["id"]
elif "webhook_id" not in monitor:
monitor["webhook_id"] = 0
# 加载其他配置
self.alert_template.set(config.get("alert_template", self.alert_template.get()))
self.recovery_template.set(config.get("recovery_template", self.recovery_template.get()))
self.ping_timeout = config.get("ping_timeout", self.ping_timeout)
self.ping_timeout_var.set(self.ping_timeout)
# 加载日志保存天数配置
log_retention = config.get("log_retention_days", 30)
self.log_retention_days.set(log_retention)
# 加载日志目录配置
log_dir = config.get("log_dir", DEFAULT_LOG_DIR)
LOG_DIR = log_dir
# 确保日志目录存在
os.makedirs(LOG_DIR, exist_ok=True)
self.refresh_tree()
# 更新logger配置
logger = init_logger(log_retention)
logger.info("配置加载成功")
except Exception as e:
# 使用默认配置
self.webhooks = []
self.monitor_list = []
# 重置日志目录为默认值
LOG_DIR = DEFAULT_LOG_DIR
os.makedirs(LOG_DIR, exist_ok=True)
# 重新初始化logger
logger = init_logger()
logger.error(f"加载配置时发生异常:{str(e)}")
def save_config(self):
"""保存配置"""
try:
config = {
"webhooks": self.webhooks,
"monitor_list": self.monitor_list,
"alert_template": self.alert_template.get(),
"recovery_template": self.recovery_template.get(),
"ping_timeout": self.ping_timeout,
"log_retention_days": self.log_retention_days.get(),
"log_dir": LOG_DIR
}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
logger.info("配置保存成功")
except Exception as e:
logger.error(f"保存配置时发生异常:{str(e)}")
def create_tray_image(self):
"""创建托盘图标(使用与窗口相同的统一图标)"""
try:
# 使用统一图标
if hasattr(self, 'unified_icon') and self.unified_icon is not None:
logger.info("使用统一图标作为托盘图标")
return self.unified_icon
# 如果统一图标不存在,创建一个与窗口图标相同风格的图标
logger.info("创建与窗口图标相同风格的托盘图标")
# 创建与窗口图标相同的图案
width, height = 64, 64
image = Image.new('RGBA', (width, height), color=(0, 0, 0, 0))
draw = ImageDraw.Draw(image)
# 绘制与窗口图标相同的图案
center = width // 2
radius = min(width, height) // 3
# 外圆环
draw.ellipse([center - radius*2, center - radius*2, center + radius*2, center + radius*2],
fill=(0, 150, 0, 255),
outline=(255, 255, 255, 255),
width=2)
# 内圆
draw.ellipse([center - radius, center - radius, center + radius, center + radius],
fill=(0, 200, 0, 255))
# 十字线
draw.line([center - radius, center, center + radius, center],
fill=(255, 255, 255, 255),
width=3)
draw.line([center, center - radius, center, center + radius],
fill=(255, 255, 255, 255),
width=3)
return image
except Exception as e:
logger.error(f"创建托盘图标失败:{str(e)}")
import traceback
traceback.print_exc()
# 作为最后的备选,创建一个简单的图标
width, height = 64, 64
image = Image.new('RGBA', (width, height), color=(0, 0, 0, 0))
draw = ImageDraw.Draw(image)
draw.ellipse([8, 8, 56, 56], fill=(0, 200, 0, 255), outline=(255, 255, 255, 255), width=2)
return image
def setup_tray_icon(self):
"""设置托盘图标"""
def on_double_click(icon, item):
self.on_show_main_window()
# 创建菜单
menu = TrayMenu(
TrayMenuItem("主界面", self.on_show_main_window, default=True),
TrayMenuItem("开机自启动", self.toggle_startup, checked=lambda item: self.startup_enabled),
TrayMenuItem("退出程序", self.exit_app)
)
# 创建图标
image = self.create_tray_image()
# 创建托盘图标
self.tray_icon = TrayIcon("PingMonitor", image, "IP/域名Ping告警监控", menu)
# 启动托盘图标线程
threading.Thread(target=self.tray_icon.run, daemon=True).start()
def on_show_main_window(self):
"""显示主界面"""
self.root.deiconify()
self.root.lift()
self.root.focus_force()
def toggle_startup(self):
"""切换开机自启状态"""
try:
self.startup_enabled = not self.startup_enabled
if self.startup_enabled:
# 添加到开机自启
key_path = r'SOFTWARE\Microsoft\Windows\CurrentVersion\Run'
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE) as key:
winreg.SetValueEx(key, "IP_Ping_Monitor", 0, winreg.REG_SZ, sys.argv[0])
messagebox.showinfo("成功", "已开启开机自启")
else:
# 移除开机自启
key_path = r'SOFTWARE\Microsoft\Windows\CurrentVersion\Run'
try:
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_SET_VALUE) as key:
winreg.DeleteValue(key, "IP_Ping_Monitor")
messagebox.showinfo("成功", "已关闭开机自启")
except FileNotFoundError:
# 不存在则忽略
messagebox.showinfo("成功", "已关闭开机自启")
except Exception as e:
messagebox.showerror("错误", f"修改开机自启失败:{str(e)}")
def check_startup_status(self):
"""检查开机自启状态"""
try:
key_path = r'SOFTWARE\Microsoft\Windows\CurrentVersion\Run'
with winreg.OpenKey(winreg.HKEY_CURRENT_USER, key_path, 0, winreg.KEY_READ) as key:
try:
value, _ = winreg.QueryValueEx(key, "IP_Ping_Monitor")
self.startup_enabled = True
except FileNotFoundError:
self.startup_enabled = False
except Exception as e:
logger.error(f"检查开机自启状态失败:{str(e)}")
self.startup_enabled = False
def on_close(self):
"""关闭应用 - 修改为最小化到托盘"""
self.root.withdraw()
def exit_app(self):
"""退出应用"""
self.is_running = False
self.save_config()
if self.tray_icon:
self.tray_icon.stop()
self.root.destroy()
# 主程序入口
if __name__ == "__main__":
try:
root = ttkbs.Window()
app = PingMonitorApp(root)
root.mainloop()
except Exception as e:
import traceback
print("程序运行时发生异常:")
traceback.print_exc()
input("按回车键退出...")
python -m PyInstaller --onefile --windowed --icon=resources\ping48.ico --name=IP_Ping_Monitor 123.py主界面:
添加webhook界面:
添加监控界面:
告警模板:
日志查看 :
- 感谢你赐予我前进的力量
赞赏者名单
因为你们的支持让我意识到写文章的价值🙏
本文是原创文章,采用CC BY-NC-ND 4.0协议,完整转载请注明来自 halo.taofile.cn。
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果

