Disha-code64/Network-port-scanner-

GitHub: Disha-code64/Network-port-scanner-

一款基于Python Tkinter的图形化TCP端口扫描工具,支持多线程并发扫描、实时进度显示和结果导出。

Stars: 0 | Forks: 0

``` self.total_ports = max(0, end_port - start_port + 1) self.scanned_count = 0 self.open_ports = [] # list[(port, service)] self._lock = threading.Lock() self.result_queue = queue.Queue() def stop(self): self._stop_event.set() def _scan_port(self, port): if self._stop_event.is_set(): return try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(self.timeout) result = s.connect_ex((self.target, port)) if result == 0: service = COMMON_PORTS.get(port, 'Unknown') with self._lock: self.open_ports.append((port, service)) self.result_queue.put(('open', port, service)) s.close() except Exception as e: self.result_queue.put(('error', port, str(e))) finally: with self._lock: self.scanned_count += 1 self.result_queue.put(('progress', self.scanned_count, self.total_ports)) def resolve_target(self): return socket.gethostbyname(self.target) def run(self): sem = threading.Semaphore(self.max_workers) threads = [] for port in range(self.start_port, self.end_port + 1): if self._stop_event.is_set(): break sem.acquire() t = threading.Thread(target=self._worker_wrapper, args=(sem, port), daemon=True) threads.append(t) t.start() for t in threads: t.join() self.result_queue.put(('done', None, None)) def _worker_wrapper(self, sem, port): try: self._scan_port(port) finally: sem.release() ``` ``` self.scanner_thread = None self.scanner = None self.start_time = None self.poll_after_ms = 40 self._build_ui() def _build_ui(self): # --- Top Frame: Inputs (Only 3 fields) --- frm_top = ttk.LabelFrame(self, text="Scan Settings") frm_top.pack(fill="x", padx=10, pady=10) ttk.Label(frm_top, text="Target (IP / Hostname):").grid(row=0, column=0, padx=8, pady=8, sticky="e") self.ent_target = ttk.Entry(frm_top, width=36) self.ent_target.grid(row=0, column=1, padx=8, pady=8, sticky="w") ttk.Label(frm_top, text="Start Port:").grid(row=0, column=2, padx=8, pady=8, sticky="e") self.ent_start = ttk.Entry(frm_top, width=10) self.ent_start.insert(0, "1") self.ent_start.grid(row=0, column=3, padx=8, pady=8, sticky="w") ttk.Label(frm_top, text="End Port:").grid(row=0, column=4, padx=8, pady=8, sticky="e") self.ent_end = ttk.Entry(frm_top, width=10) self.ent_end.insert(0, "1024") self.ent_end.grid(row=0, column=5, padx=8, pady=8, sticky="w") self.btn_start = ttk.Button(frm_top, text="Start Scan", command=self.start_scan) self.btn_start.grid(row=1, column=4, padx=8, pady=8, sticky="e") self.btn_stop = ttk.Button(frm_top, text="Stop", command=self.stop_scan, state="disabled") self.btn_stop.grid(row=1, column=5, padx=8, pady=8, sticky="w") for i in range(6): frm_top.grid_columnconfigure(i, weight=1) # --- Progress / Status --- frm_status = ttk.LabelFrame(self, text="Status") frm_status.pack(fill="x", padx=10, pady=(0,10)) self.var_status = tk.StringVar(value="Idle") self.lbl_status = ttk.Label(frm_status, textvariable=self.var_status) self.lbl_status.pack(side="left", padx=10, pady=8) self.var_elapsed = tk.StringVar(value="Elapsed: 0.00s") self.lbl_elapsed = ttk.Label(frm_status, textvariable=self.var_elapsed) self.lbl_elapsed.pack(side="right", padx=10, pady=8) self.progress = ttk.Progressbar(frm_status, orient="horizontal", mode="determinate") self.progress.pack(fill="x", padx=10, pady=(0,10)) # --- Results --- frm_results = ttk.LabelFrame(self, text="Open Ports") frm_results.pack(fill="both", expand=True, padx=10, pady=(0,10)) self.txt_results = tk.Text(frm_results, height=16, wrap="none") self.txt_results.pack(fill="both", expand=True, side="left", padx=(10,0), pady=10) yscroll = ttk.Scrollbar(frm_results, orient="vertical", command=self.txt_results.yview) yscroll.pack(side="right", fill="y", pady=10) self.txt_results.configure(yscrollcommand=yscroll.set) xscroll = ttk.Scrollbar(self, orient="horizontal", command=self.txt_results.xview) xscroll.pack(fill="x", padx=10, pady=(0,10)) self.txt_results.configure(xscrollcommand=xscroll.set) # --- Bottom Buttons --- frm_bottom = ttk.Frame(self) frm_bottom.pack(fill="x", padx=10, pady=(0,12)) self.btn_clear = ttk.Button(frm_bottom, text="Clear", command=self.clear_results) self.btn_clear.pack(side="left") self.btn_save = ttk.Button(frm_bottom, text="Save Results", command=self.save_results, state="disabled") self.btn_save.pack(side="right") # ----------------------- # Control Handlers # ----------------------- def start_scan(self): if self.scanner_thread and self.scanner_thread.is_alive(): messagebox.showinfo("Scanner", "A scan is already running.") return target = self.ent_target.get().strip() if not target: messagebox.showerror("Input Error", "Please enter a target IP or hostname.") return try: start_port = int(self.ent_start.get().strip()) end_port = int(self.ent_end.get().strip()) except ValueError: messagebox.showerror("Input Error", "Ports must be integers.") return if not (0 <= start_port <= 65535 and 0 <= end_port <= 65535 and start_port <= end_port): messagebox.showerror("Input Error", "Port range must be within 0–65535 and start ≤ end.") return # Internal defaults (not shown in UI) timeout = 0.5 max_threads = 500 self.scanner = PortScanner(target, start_port, end_port, timeout=timeout, max_workers=max_threads) # Pre-resolve target to catch DNS issues early try: resolved_ip = self.scanner.resolve_target() self.append_text(f"Target: {target} ({resolved_ip})\n") self.append_text(f"Range: {start_port}-{end_port}\n\n") except Exception as e: messagebox.showerror("Resolution Error", f"Failed to resolve target '{target}'.\n{e}") self.scanner = None return self.btn_start.configure(state="disabled") self.btn_stop.configure(state="normal") self.btn_save.configure(state="disabled") self.clear_progress() self.start_time = time.time() self.var_status.set("Scanning...") self.update_elapsed() self.scanner_thread = threading.Thread(target=self.scanner.run, daemon=True) self.scanner_thread.start() self.after(self.poll_after_ms, self.poll_results) def stop_scan(self): if self.scanner: self.scanner.stop() self.var_status.set("Stopping...") def clear_results(self): self.txt_results.delete("1.0", tk.END) self.clear_progress() self.var_status.set("Idle") self.var_elapsed.set("Elapsed: 0.00s") self.btn_save.configure(state="disabled") def save_results(self): if not self.scanner or not self.scanner.open_ports: messagebox.showinfo("Save Results", "No open ports to save.") return default_name = f"open_ports_{int(time.time())}.txt" file_path = filedialog.asksaveasfilename( title="Save results", defaultextension=".txt", initialfile=default_name, filetypes=[("Text Files", "*.txt"), ("All Files", "*.*")] ) if not file_path: return try: with open(file_path, "w", encoding="utf-8") as f: f.write("Open Ports:\n") for port, service in sorted(self.scanner.open_ports, key=lambda x: x[0]): f.write(f"Port {port} ({service}) is open\n") messagebox.showinfo("Saved", f"Results saved to:\n{file_path}") except Exception as e: messagebox.showerror("Save Error", f"Failed to save file.\n{e}") # ----------------------- # UI Helpers # ----------------------- def append_text(self, text): self.txt_results.insert(tk.END, text) self.txt_results.see(tk.END) def clear_progress(self): self.progress.configure(value=0, maximum=1) def update_elapsed(self): if self.start_time and self.var_status.get() in ("Scanning...", "Stopping..."): elapsed = time.time() - self.start_time self.var_elapsed.set(f"Elapsed: {elapsed:.2f}s") self.after(200, self.update_elapsed) def poll_results(self): if not self.scanner: return try: while True: msg_type, a, b = self.scanner.result_queue.get_nowait() if msg_type == 'open': port, service = a, b self.append_text(f"[+] Port {port} ({service}) is open\n") elif msg_type == 'progress': scanned, total = a, b self.progress.configure(maximum=max(total, 1), value=scanned) self.var_status.set(f"Scanning... {scanned}/{total}") elif msg_type == 'done': total_open = len(self.scanner.open_ports) self.append_text("\nScan complete.\n") self.append_text(f"Open ports found: {total_open}\n") self.var_status.set("Completed") self.btn_start.configure(state="normal") self.btn_stop.configure(state="disabled") self.btn_save.configure(state="normal" if total_open else "disabled") self.start_time = None except queue.Empty: pass if self.scanner_thread and self.scanner_thread.is_alive(): self.after(self.poll_after_ms, self.poll_results) else: if self.var_status.get() in ("Scanning...", "Stopping..."): self.var_status.set("Completed") self.btn_start.configure(state="normal") self.btn_stop.configure(state="disabled") if self.scanner and self.scanner.open_ports: self.btn_save.configure(state="normal") ```
标签:DNS查询工具, GUI开发, Nmap, Python, Qt框架, Socket编程, Tkinter, 图形用户界面, 密码管理, 插件系统, 无后门, 服务识别, 漏洞扫描辅助, 端口扫描器, 系统独立性, 网络安全, 虚拟驱动器, 逆向工具, 隐私保护