This site is developed to XHTML and CSS2 W3C standards.
If you see this paragraph, your browser does not support those standards and you
need to upgrade. Visit WaSP
for a variety of options.
Paste #72
Posted by: AT2493
Posted on: 2025-11-01 21:25:43
Age: 23 days ago
Views: 21
import socket
import threading
import time
import queue
import os
import json
import re
import hashlib
import colorsys
import base64
import numpy as np
import sounddevice as sd
import opuslib
import requests
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
import ttkbootstrap as tb
from ttkbootstrap.constants import *
from tkinter import simpledialog, messagebox, scrolledtext, filedialog, ttk
import tkinter as tk
from tkinter import font as tkfont
CONFIG_FILE = "client_config.json"
KEYFILE_PRIV = "dm_priv.pem"
KEYFILE_PUB = "dm_pub.pem"
client_socket = None
stop_event = threading.Event()
ignore_list = []
message_history = []
message_history_index = None
VOICE_SERVER_IP = "hoho.ws"
VOICE_SERVER_PORT = 50007
RATE = 16000
CHANNELS = 1
FRAME_SIZE = 960
QUEUE_MAXSIZE = 50
voice_sock = None
encoder = opuslib.Encoder(RATE, CHANNELS, application="voip")
decoder = opuslib.Decoder(RATE, CHANNELS)
audio_queue = queue.Queue(maxsize=QUEUE_MAXSIZE)
send_audio = False
sd_stream = None
UPLOAD_SERVER_URL = ""
user_themes = {}
current_theme_name = "cyborg"
peers_pubkeys = {}
pending_msgs = {}
peers_e2ee_support = {}
last_send_time = 0.0
send_lock = threading.Lock()
e2ee_timers = {}
_control_pm_sent_at = {}
def ensure_keys():
if os.path.exists(KEYFILE_PRIV) and os.path.exists(KEYFILE_PUB):
with open(KEYFILE_PRIV, "rb") as f:
priv_pem = f.read()
with open(KEYFILE_PUB, "rb") as f:
pub_pem_local = f.read()
priv = RSA.import_key(priv_pem)
pub = RSA.import_key(pub_pem_local)
else:
key = RSA.generate(2048)
priv = key
pub = key.publickey()
with open(KEYFILE_PRIV, "wb") as f:
f.write(priv.export_key())
with open(KEYFILE_PUB, "wb") as f:
f.write(pub.export_key())
return priv, pub
priv_key, pub_key = ensure_keys()
pub_pem = pub_key.export_key()
def rsa_encrypt(pub_pem_bytes, message: str) -> str:
pub = RSA.import_key(pub_pem_bytes)
cipher = PKCS1_OAEP.new(pub)
ct = cipher.encrypt(message.encode("utf-8"))
return base64.b64encode(ct).decode("ascii")
def rsa_decrypt(priv_key_obj, b64cipher: str) -> str:
cipher = PKCS1_OAEP.new(priv_key_obj)
pt = cipher.decrypt(base64.b64decode(b64cipher))
return pt.decode("utf-8")
def ui_log(line: str):
try:
chat_box.after(0, lambda: (chat_box.insert(tk.END, line + "\n"), chat_box.see(tk.END)))
except Exception:
print(line)
def ui_notice_purple(line: str):
try:
def _ins():
chat_box.insert(tk.END, line + "\n", ("e2ee_notice",))
chat_box.see(tk.END)
chat_box.after(0, _ins)
except Exception:
print(line)
def ui_insert_encrypted(sender: str, text: str):
try:
def _ins():
chat_box.insert(tk.END, "● ", ("enc_dot",))
chat_box.insert(tk.END, f"(Private) {sender}: {text}\n")
chat_box.see(tk.END)
chat_box.after(0, _ins)
except Exception:
print(f"(Private) {sender}: {text}")
def ui_insert_plain(sender: str, text: str):
try:
def _ins():
chat_box.insert(tk.END, "● ", ("plain_dot",))
chat_box.insert(tk.END, f"(Private) {sender}: {text}\n")
chat_box.see(tk.END)
chat_box.after(0, _ins)
except Exception:
print(f"(Private) {sender}: {text}")
def send_raw(data: bytes):
global last_send_time
with send_lock:
now = time.time()
elapsed = now - last_send_time
if elapsed < 0.2:
time.sleep(0.2 - elapsed)
try:
if client_socket:
client_socket.send(data)
except Exception as e:
ui_log(f"Send failed: {e}")
stop_event.set()
last_send_time = time.time()
def send_text(text: str):
send_raw(text.encode("utf-8"))
def send_pubkey_to(user: str):
payload = base64.b64encode(pub_pem).decode("ascii")
send_text(f"/pm {user} PUBKEY:{payload}")
try:
_control_pm_sent_at[user] = time.time()
except Exception:
pass
def flush_pending_for(user: str):
pending = pending_msgs.pop(user, [])
for msg in pending:
send_encrypted_pm(user, msg)
def flush_pending_plain(user: str):
pending = pending_msgs.pop(user, [])
for msg in pending:
send_text(f"/pm {user} {msg}")
def cancel_e2ee_timer(user: str):
t = e2ee_timers.pop(user, None)
if t:
try:
t.cancel()
except Exception:
pass
private_re = re.compile(r'^\(Private\)\s+(\w+):\s*(.+)$')
def handle_private_message(sender: str, payload: str):
payload = payload.strip()
if payload == "E2EE_CHECK":
send_text(f"/pm {sender} E2EE_OK")
try:
_control_pm_sent_at[sender] = time.time()
except Exception:
pass
peers_e2ee_support[sender] = True
cancel_e2ee_timer(sender)
ui_notice_purple(f"С пользователем {sender} начат защищённый диалог")
flush_pending_for(sender)
return
if payload == "E2EE_OK":
peers_e2ee_support[sender] = True
cancel_e2ee_timer(sender)
ui_notice_purple(f"С пользователем {sender} начат защищённый диалог")
flush_pending_for(sender)
return
if payload.startswith("PUBKEY:"):
b64 = payload[len("PUBKEY:"):].strip()
try:
pem = base64.b64decode(b64)
peers_pubkeys[sender] = pem
send_pubkey_to(sender)
flush_pending_for(sender)
except Exception as e:
ui_log(f"Failed to process PUBKEY from {sender} {e}")
return
if payload.startswith("ENC:"):
if not peers_e2ee_support.get(sender, False):
ui_log(f"[WARN] Received encrypted message from {sender}, but E2EE not confirmed. Ignored.")
return
b64 = payload[len("ENC:"):].strip()
try:
pt = rsa_decrypt(priv_key, b64)
ui_insert_encrypted(sender, pt)
except Exception as e:
ui_log(f"(Private) {sender}: <failed to decrypt> ({e})")
return
if peers_e2ee_support.get(sender, None) is True:
ui_notice_purple(f"{sender} отправил незашифрованное сообщение — переключение на незашифрованный режим")
prev = peers_e2ee_support.get(sender, None)
peers_e2ee_support[sender] = False
cancel_e2ee_timer(sender)
if prev in (None, True):
flush_pending_plain(sender)
ui_insert_plain(sender, payload)
def send_encrypted_pm(user: str, plaintext: str):
support = peers_e2ee_support.get(user)
if support is None:
peers_e2ee_support[user] = False
send_text(f"/pm {user} E2EE_CHECK")
try:
_control_pm_sent_at[user] = time.time()
except Exception:
pass
pending_msgs.setdefault(user, []).append(plaintext)
try:
timer = threading.Timer(5.0, lambda u=user: (_e2ee_timeout(u)))
cancel_e2ee_timer(user)
e2ee_timers[user] = timer
timer.daemon = True
timer.start()
except Exception:
pass
return
if support:
pub = peers_pubkeys.get(user)
if not pub:
pending_msgs.setdefault(user, []).append(plaintext)
send_pubkey_to(user)
return
try:
ct_b64 = rsa_encrypt(pub, plaintext)
except Exception as e:
ui_log(f"Encryption failed: {e}")
return
send_text(f"/pm {user} ENC:{ct_b64}")
else:
send_text(f"/pm {user} {plaintext}")
def _e2ee_timeout(user: str):
if peers_e2ee_support.get(user) is True:
return
peers_e2ee_support[user] = False
cancel_e2ee_timer(user)
flush_pending_plain(user)
def save_config():
cfg = {
"voice_server_ip": VOICE_SERVER_IP,
"voice_server_port": VOICE_SERVER_PORT,
"upload_server_url": UPLOAD_SERVER_URL,
"ignore_list": ignore_list,
"user_themes": user_themes,
"current_theme_name": current_theme_name
}
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
json.dump(cfg, f, indent=2)
def load_config():
global VOICE_SERVER_IP, VOICE_SERVER_PORT, UPLOAD_SERVER_URL
global ignore_list, user_themes, current_theme_name
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
cfg = json.load(f)
VOICE_SERVER_IP = cfg.get("voice_server_ip", VOICE_SERVER_IP)
VOICE_SERVER_PORT = cfg.get("voice_server_port", VOICE_SERVER_PORT)
UPLOAD_SERVER_URL = cfg.get("upload_server_url", UPLOAD_SERVER_URL)
ignore_list = cfg.get("ignore_list", ignore_list)
user_themes = cfg.get("user_themes", user_themes)
current_theme_name = cfg.get("current_theme_name", current_theme_name)
except Exception as e:
print("Ошибка загрузки конфигурации:", e)
load_config()
def show_connect_dialog(initial_host: str = "", initial_port: str = ""):
dlg = tb.Toplevel(root)
dlg.title("Подключение к серверу")
dlg.transient(root)
dlg.grab_set()
container = tb.Frame(dlg)
container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Хост:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
host_var = tk.StringVar(value=initial_host)
host_entry = tb.Entry(container, textvariable=host_var, width=32)
host_entry.grid(row=0, column=1, sticky="ew", pady=(0,8))
tb.Label(container, text="Порт:").grid(row=1, column=0, sticky="w", padx=(0,8))
port_var = tk.StringVar(value=str(initial_port) if initial_port else "")
port_entry = tb.Entry(container, textvariable=port_var, width=32)
port_entry.grid(row=1, column=1, sticky="ew")
container.columnconfigure(1, weight=1)
error_var = tk.StringVar(value="")
error_lbl = tb.Label(container, textvariable=error_var, bootstyle=DANGER)
error_lbl.grid(row=2, column=0, columnspan=2, sticky="w", pady=(8,0))
btns = tb.Frame(container)
btns.grid(row=3, column=0, columnspan=2, sticky="e", pady=(8,0))
result = {"value": None}
def validate_and_close(evt=None):
host = host_var.get().strip()
port_text = port_var.get().strip()
if not host:
error_var.set("Укажите хост")
host_entry.focus_set()
return
if not port_text.isdigit():
error_var.set("Порт должен быть числом")
port_entry.focus_set()
return
port = int(port_text)
if port < 1 or port > 65535:
error_var.set("Порт должен быть 1–65535")
port_entry.focus_set()
return
result["value"] = (host, port)
dlg.destroy()
def cancel():
result["value"] = None
dlg.destroy()
ok_btn = tb.Button(btns, text="Подключиться", bootstyle=SUCCESS, command=validate_and_close)
ok_btn.pack(side="right", padx=(6,0))
cancel_btn = tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel)
cancel_btn.pack(side="right", padx=(0,6))
dlg.bind("<Return>", validate_and_close)
dlg.bind("<Escape>", lambda e: cancel())
host_entry.focus_set()
dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}")
root.wait_window(dlg)
return result["value"]
def show_login_dialog():
dlg = tb.Toplevel(root)
dlg.title("Логин")
dlg.transient(root); dlg.grab_set()
container = tb.Frame(dlg)
container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Логин:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
user_var = tk.StringVar()
user_entry = tb.Entry(container, textvariable=user_var, width=32)
user_entry.grid(row=0, column=1, sticky="ew", pady=(0,8))
tb.Label(container, text="Пароль:").grid(row=1, column=0, sticky="w", padx=(0,8))
pass_var = tk.StringVar()
pass_entry = tb.Entry(container, textvariable=pass_var, show='*', width=32)
pass_entry.grid(row=1, column=1, sticky="ew")
container.columnconfigure(1, weight=1)
error_var = tk.StringVar(value="")
tb.Label(container, textvariable=error_var, bootstyle=DANGER).grid(row=2, column=0, columnspan=2, sticky="w", pady=(8,0))
btns = tb.Frame(container)
btns.grid(row=3, column=0, columnspan=2, sticky="e", pady=(8,0))
result = {"value": None}
def submit(evt=None):
u, p = user_var.get().strip(), pass_var.get()
if not u:
error_var.set("Введите логин")
user_entry.focus_set(); return
if not p:
error_var.set("Введите пароль")
pass_entry.focus_set(); return
result["value"] = (u, p); dlg.destroy()
def cancel(): result["value"] = None; dlg.destroy()
ok_btn = tb.Button(btns, text="Войти", bootstyle=SUCCESS, command=submit)
ok_btn.pack(side="right", padx=(6,0))
tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel).pack(side="right", padx=(0,6))
dlg.bind("<Return>", submit); dlg.bind("<Escape>", lambda e: cancel())
user_entry.focus_set()
dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}")
root.wait_window(dlg)
return result["value"]
def show_register_dialog():
dlg = tb.Toplevel(root)
dlg.title("Регистрация")
dlg.transient(root); dlg.grab_set()
container = tb.Frame(dlg); container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Логин:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
user_var = tk.StringVar(); user_entry = tb.Entry(container, textvariable=user_var, width=32)
user_entry.grid(row=0, column=1, sticky="ew", pady=(0,8))
tb.Label(container, text="Пароль:").grid(row=1, column=0, sticky="w", padx=(0,8))
pass_var = tk.StringVar(); pass_entry = tb.Entry(container, textvariable=pass_var, show='*', width=32)
pass_entry.grid(row=1, column=1, sticky="ew")
container.columnconfigure(1, weight=1)
error_var = tk.StringVar(value=""); tb.Label(container, textvariable=error_var, bootstyle=DANGER).grid(row=2, column=0, columnspan=2, sticky="w", pady=(8,0))
btns = tb.Frame(container); btns.grid(row=3, column=0, columnspan=2, sticky="e", pady=(8,0))
result = {"value": None}
def submit(evt=None):
u, p = user_var.get().strip(), pass_var.get()
if not u:
error_var.set("Введите логин"); user_entry.focus_set(); return
if not p:
error_var.set("Введите пароль"); pass_entry.focus_set(); return
result["value"] = (u, p); dlg.destroy()
def cancel(): result["value"] = None; dlg.destroy()
tb.Button(btns, text="Зарегистрироваться", bootstyle=SUCCESS, command=submit).pack(side="right", padx=(6,0))
tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel).pack(side="right", padx=(0,6))
dlg.bind("<Return>", submit); dlg.bind("<Escape>", lambda e: cancel())
user_entry.focus_set(); dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}"); root.wait_window(dlg)
return result["value"]
def show_pm_dialog(initial_to: str = ""):
dlg = tb.Toplevel(root); dlg.title("Приватное сообщение"); dlg.transient(root); dlg.grab_set()
container = tb.Frame(dlg); container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Кому:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
to_var = tk.StringVar(value=initial_to); to_entry = tb.Entry(container, textvariable=to_var, width=32)
to_entry.grid(row=0, column=1, sticky="ew", pady=(0,8))
tb.Label(container, text="Текст:").grid(row=1, column=0, sticky="nw", padx=(0,8))
text_widget = tk.Text(container, width=40, height=5)
text_widget.grid(row=1, column=1, sticky="ew")
container.columnconfigure(1, weight=1)
error_var = tk.StringVar(value=""); tb.Label(container, textvariable=error_var, bootstyle=DANGER).grid(row=2, column=0, columnspan=2, sticky="w", pady=(8,0))
btns = tb.Frame(container); btns.grid(row=3, column=0, columnspan=2, sticky="e", pady=(8,0))
result = {"value": None}
def submit(evt=None):
recip = to_var.get().strip(); msg = text_widget.get("1.0", "end").strip()
if not recip:
error_var.set("Укажите получателя"); to_entry.focus_set(); return
if not msg:
error_var.set("Введите текст сообщения"); text_widget.focus_set(); return
result["value"] = (recip, msg); dlg.destroy()
def cancel(): result["value"] = None; dlg.destroy()
tb.Button(btns, text="Отправить", bootstyle=SUCCESS, command=submit).pack(side="right", padx=(6,0))
tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel).pack(side="right", padx=(0,6))
dlg.bind("<Return>", submit); dlg.bind("<Escape>", lambda e: cancel())
to_entry.focus_set(); dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}"); root.wait_window(dlg)
return result["value"]
def show_voice_server_dialog(initial_host: str, initial_port: int):
return show_connect_dialog(initial_host=str(initial_host or ""), initial_port=str(initial_port or ""))
def show_add_theme_dialog():
dlg = tb.Toplevel(root); dlg.title("Новая тема"); dlg.transient(root); dlg.grab_set()
container = tb.Frame(dlg); container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Имя темы:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
name_var = tk.StringVar(); name_entry = tb.Entry(container, textvariable=name_var, width=32)
name_entry.grid(row=0, column=1, sticky="ew", pady=(0,8))
tb.Label(container, text="JSON-конфиг:").grid(row=1, column=0, sticky="nw", padx=(0,8))
json_widget = tk.Text(container, width=48, height=6)
json_widget.grid(row=1, column=1, sticky="ew")
container.columnconfigure(1, weight=1)
error_var = tk.StringVar(value=""); tb.Label(container, textvariable=error_var, bootstyle=DANGER).grid(row=2, column=0, columnspan=2, sticky="w", pady=(8,0))
btns = tb.Frame(container); btns.grid(row=3, column=0, columnspan=2, sticky="e", pady=(8,0))
result = {"value": None}
def submit(evt=None):
name = name_var.get().strip(); js = json_widget.get("1.0", "end").strip()
if not name:
error_var.set("Введите имя темы"); name_entry.focus_set(); return
if not js:
error_var.set("Введите JSON-конфиг"); json_widget.focus_set(); return
try:
cfg = json.loads(js)
except Exception as e:
error_var.set(f"Ошибка JSON: {e}"); return
result["value"] = (name, cfg); dlg.destroy()
def cancel(): result["value"] = None; dlg.destroy()
tb.Button(btns, text="Сохранить", bootstyle=SUCCESS, command=submit).pack(side="right", padx=(6,0))
tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel).pack(side="right", padx=(0,6))
dlg.bind("<Return>", submit); dlg.bind("<Escape>", lambda e: cancel())
name_entry.focus_set(); dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}"); root.wait_window(dlg)
return result["value"]
def show_upload_server_dialog(initial_url: str):
dlg = tb.Toplevel(root); dlg.title("HTTP Upload сервер"); dlg.transient(root); dlg.grab_set()
container = tb.Frame(dlg); container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Базовый адрес:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
url_var = tk.StringVar(value=initial_url or "http://")
url_entry = tb.Entry(container, textvariable=url_var, width=48)
url_entry.grid(row=0, column=1, sticky="ew", pady=(0,8))
container.columnconfigure(1, weight=1)
error_var = tk.StringVar(value=""); tb.Label(container, textvariable=error_var, bootstyle=DANGER).grid(row=1, column=0, columnspan=2, sticky="w")
btns = tb.Frame(container); btns.grid(row=2, column=0, columnspan=2, sticky="e", pady=(8,0))
result = {"value": None}
def submit(evt=None):
url = url_var.get().strip()
if not url:
error_var.set("Введите адрес сервера"); url_entry.focus_set(); return
if not (url.startswith("http://") or url.startswith("https://")):
error_var.set("Адрес должен начинаться с http:// или https://"); return
result["value"] = url.rstrip('/')
dlg.destroy()
def cancel(): result["value"] = None; dlg.destroy()
tb.Button(btns, text="Сохранить", bootstyle=SUCCESS, command=submit).pack(side="right", padx=(6,0))
tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel).pack(side="right", padx=(0,6))
dlg.bind("<Return>", submit); dlg.bind("<Escape>", lambda e: cancel())
url_entry.focus_set(); dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}"); root.wait_window(dlg)
return result["value"]
def show_join_channel_dialog():
dlg = tb.Toplevel(root); dlg.title("Вход на канал"); dlg.transient(root); dlg.grab_set()
container = tb.Frame(dlg); container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Канал:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
name_var = tk.StringVar(); entry = tb.Entry(container, textvariable=name_var, width=32)
entry.grid(row=0, column=1, sticky="ew", pady=(0,8)); container.columnconfigure(1, weight=1)
error_var = tk.StringVar(value=""); tb.Label(container, textvariable=error_var, bootstyle=DANGER).grid(row=1, column=0, columnspan=2, sticky="w")
btns = tb.Frame(container); btns.grid(row=2, column=0, columnspan=2, sticky="e", pady=(8,0))
result = {"value": None}
def submit(evt=None):
ch = name_var.get().strip()
if not ch:
error_var.set("Введите название канала"); entry.focus_set(); return
result["value"] = ch; dlg.destroy()
def cancel(): result["value"] = None; dlg.destroy()
tb.Button(btns, text="Войти", bootstyle=SUCCESS, command=submit).pack(side="right", padx=(6,0))
tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel).pack(side="right", padx=(0,6))
dlg.bind("<Return>", submit); dlg.bind("<Escape>", lambda e: cancel())
entry.focus_set(); dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}"); root.wait_window(dlg)
return result["value"]
def show_ignore_list_dialog(initial_value: str = ""):
dlg = tb.Toplevel(root); dlg.title("Игнор-лист"); dlg.transient(root); dlg.grab_set()
container = tb.Frame(dlg); container.pack(padx=16, pady=16, fill="both", expand=True)
tb.Label(container, text="Слова через запятую:").grid(row=0, column=0, sticky="w", padx=(0,8), pady=(0,8))
text_var = tk.StringVar(value=initial_value)
entry = tb.Entry(container, textvariable=text_var, width=60)
entry.grid(row=0, column=1, sticky="ew", pady=(0,8)); container.columnconfigure(1, weight=1)
btns = tb.Frame(container); btns.grid(row=1, column=0, columnspan=2, sticky="e")
result = {"value": None}
def submit(evt=None):
result["value"] = text_var.get(); dlg.destroy()
def cancel(): result["value"] = None; dlg.destroy()
tb.Button(btns, text="Сохранить", bootstyle=SUCCESS, command=submit).pack(side="right", padx=(6,0))
tb.Button(btns, text="Отмена", bootstyle=SECONDARY, command=cancel).pack(side="right", padx=(0,6))
dlg.bind("<Return>", submit); dlg.bind("<Escape>", lambda e: cancel())
entry.focus_set(); dlg.update_idletasks()
x = root.winfo_rootx() + (root.winfo_width() // 2) - (dlg.winfo_width() // 2)
y = root.winfo_rooty() + (root.winfo_height() // 2) - (dlg.winfo_height() // 2)
dlg.geometry(f"+{x}+{y}"); root.wait_window(dlg)
return result["value"]
def connect_to_server():
global client_socket
result = show_connect_dialog(initial_host="dmconnect.hoho.ws", initial_port="42439")
if not result:
return
ip, port = result
try:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((ip, port))
threading.Thread(target=receive_messages, daemon=True).start()
threading.Thread(target=send_keepalive, daemon=True).start()
chat_box.insert(tk.END, f"[+] Подключено к {ip}:{port}\n")
chat_box.see(tk.END)
try:
update_connection_menu()
except Exception:
pass
except Exception as e:
messagebox.showerror("Ошибка подключения", str(e))
NICK_PATTERN = re.compile(
r"^(?:\[[^\]]*\]\s*)?(?:<)?([A-Za-z0-9_\-\.@/]+)(?:>)?:\s*(.*)$"
)
nick_tag_cache = {}
def color_for_nick(nick: str) -> str:
digest = hashlib.md5(nick.encode("utf-8")).hexdigest()
hue = int(digest[:6], 16) % 360
s, v = 0.70, 0.90
r, g, b = colorsys.hsv_to_rgb(hue/360.0, s, v)
return f"#{int(r*255):02X}{int(g*255):02X}{int(b*255):02X}"
def tag_for_nick(nick: str) -> str:
tag = nick_tag_cache.get(nick)
if tag:
return tag
tag = f"nick_{nick}"
chat_box.tag_configure(tag, font=bold_font, foreground=color_for_nick(nick))
def on_click(event, n=nick):
pm = show_pm_dialog(initial_to=n)
if pm and client_socket:
recipient, text = pm
try:
send_encrypted_pm(recipient, text)
except Exception:
pass
chat_box.tag_bind(tag, "<Button-1>", on_click)
chat_box.tag_bind(tag, "<Enter>", lambda e: chat_box.config(cursor="hand2"))
chat_box.tag_bind(tag, "<Leave>", lambda e: chat_box.config(cursor=""))
nick_tag_cache[nick] = tag
return tag
def insert_formatted_message(message: str):
if message.startswith('***'):
chat_box.insert(tk.END, message + "\n", ("important",))
chat_box.see(tk.END)
return
m = NICK_PATTERN.match(message)
if m:
nick, msg_text = m.group(1), m.group(2)
chat_box.insert(tk.END, nick, (tag_for_nick(nick),))
chat_box.insert(tk.END, f": {msg_text}\n")
else:
chat_box.insert(tk.END, message + "\n")
chat_box.see(tk.END)
def receive_messages():
global client_socket
while not stop_event.is_set():
try:
data = client_socket.recv(4096)
if not data:
break
message = data.decode('utf-8', errors='ignore').strip()
if not message:
continue
message = message.replace("*Ping!*", "").strip()
if not message:
continue
if message.startswith("(Private)") and "PUBKEY:" in message:
pass
m = private_re.match(message)
if m:
sender, payload = m.group(1), m.group(2)
if payload.startswith("PUBKEY:"):
handle_private_message(sender, payload)
continue
handle_private_message(sender, payload)
continue
if message.startswith("Private message sent to "):
try:
nick = message.split("to ", 1)[1].strip()
if nick.endswith('.'):
nick = nick[:-1].strip()
ts = _control_pm_sent_at.get(nick)
if ts and (time.time() - ts) <= 3.0:
continue
except Exception:
pass
if any(word in message for word in ignore_list):
continue
chat_box.after(0, insert_formatted_message, message)
except Exception:
break
def send_keepalive():
while not stop_event.is_set():
try:
if client_socket:
client_socket.send(b"/")
except:
break
time.sleep(5)
def send_message(event=None):
if not client_socket:
chat_box.insert(tk.END, "[!] Не подключено к серверу\n")
chat_box.see(tk.END)
return
msg = msg_entry.get()
if msg:
try:
pm_match = re.match(r'^/pm\s+(\w+)\s+(.+)$', msg.strip())
if pm_match:
user = pm_match.group(1)
text = pm_match.group(2)
send_encrypted_pm(user, text)
else:
client_socket.send(msg.encode('utf-8'))
try:
if msg.strip():
message_history.append(msg)
global message_history_index
message_history_index = None
except Exception:
pass
msg_entry.delete(0, tk.END)
except Exception:
chat_box.insert(tk.END, "[!] Ошибка отправки\n")
chat_box.see(tk.END)
def menu_login():
creds = show_login_dialog()
if creds and client_socket:
user, password = creds
client_socket.send(f"/login {user} {password}".encode('utf-8'))
def menu_register():
creds = show_register_dialog()
if creds and client_socket:
user, password = creds
client_socket.send(f"/register {user} {password}".encode('utf-8'))
def menu_pm():
pm = show_pm_dialog()
if pm and client_socket:
recipient, text = pm
send_encrypted_pm(recipient, text)
def menu_join():
channel = show_join_channel_dialog()
if channel and client_socket:
client_socket.send(f"/join_server {channel}".encode('utf-8'))
def menu_ignore():
global ignore_list
ignore_text = show_ignore_list_dialog(", ".join(ignore_list))
if ignore_text is not None:
ignore_list = [w.strip() for w in ignore_text.split(",") if w.strip()]
chat_box.insert(tk.END, f"[+] Игнор обновлён: {ignore_list}\n")
chat_box.see(tk.END)
save_config()
def menu_theme(theme_name: str):
global current_theme_name
if theme_name in user_themes:
theme_config = user_themes[theme_name]
root.style.configure("TEntry", foreground=theme_config.get("fg", "white"),
background=theme_config.get("bg", "#1e1e1e"))
root.style.configure("TLabel", foreground=theme_config.get("fg", "white"),
background=theme_config.get("bg", "#1e1e1e"))
chat_box.config(bg=theme_config.get("bg", "#1e1e1e"),
fg=theme_config.get("fg", "white"))
else:
root.style.theme_use(theme_name)
current_theme_name = theme_name
save_config()
def add_user_theme():
res = show_add_theme_dialog()
if res:
theme_name, config = res
try:
user_themes[theme_name] = config
menu_theme(theme_name)
save_config()
chat_box.insert(tk.END, f"[+] Пользовательская тема '{theme_name}' добавлена и применена\n")
chat_box.see(tk.END)
except Exception as e:
messagebox.showerror("Ошибка темы", f"Неверный формат темы: {e}")
def audio_callback(indata, frames, time_info, status):
if send_audio:
try:
audio_queue.put_nowait(indata[:,0].copy())
except queue.Full:
pass
def audio_send_thread():
global voice_sock
while True:
try:
frame = audio_queue.get(timeout=0.1)
if len(frame) < FRAME_SIZE:
frame = np.pad(frame, (0, FRAME_SIZE - len(frame)), 'constant')
pcm16 = (frame * 32767).astype(np.int16)
packet = encoder.encode(pcm16.tobytes(), FRAME_SIZE)
voice_sock.sendall(len(packet).to_bytes(4, 'big') + packet)
except queue.Empty:
continue
except Exception as e:
print("Send error:", e)
break
def audio_receive_thread():
global voice_sock
try:
with sd.OutputStream(samplerate=RATE, channels=CHANNELS, blocksize=FRAME_SIZE) as out_stream:
buffer = b''
while True:
try:
data = voice_sock.recv(4096)
if not data:
break
buffer += data
while len(buffer) >= 4:
length = int.from_bytes(buffer[:4], 'big')
if len(buffer) < 4 + length:
break
packet = buffer[4:4+length]
buffer = buffer[4+length:]
try:
pcm = decoder.decode(packet, FRAME_SIZE)
audio = np.frombuffer(pcm, dtype=np.int16).astype(np.float32)/32768
out_stream.write(audio)
except opuslib.OpusError:
continue
except Exception as e:
print("Receive error:", e)
break
except Exception as e:
print("Output stream error:", e)
def switch_device(device_name):
global sd_stream
if sd_stream:
try:
sd_stream.stop(); sd_stream.close()
except:
pass
try:
sd_stream = sd.InputStream(
device=device_name,
channels=CHANNELS,
samplerate=RATE,
blocksize=FRAME_SIZE,
dtype='float32',
latency='low',
callback=audio_callback
)
sd_stream.start()
except Exception as e:
print(f"Cannot open microphone '{device_name}': {e}")
def toggle_send():
global send_audio
send_audio = var.get()
def select_device(event):
threading.Thread(
target=switch_device,
args=(input_devices[devices_combo.current()]["name"],),
daemon=True
).start()
def connect_voice_server():
global voice_sock
try:
voice_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
voice_sock.connect((VOICE_SERVER_IP, VOICE_SERVER_PORT))
threading.Thread(target=audio_send_thread, daemon=True).start()
threading.Thread(target=audio_receive_thread, daemon=True).start()
chat_box.insert(tk.END, f"[+] Подключено к голосовому серверу {VOICE_SERVER_IP}:{VOICE_SERVER_PORT}\n")
chat_box.see(tk.END)
except Exception as e:
messagebox.showerror("Голос", f"Ошибка подключения к голосовому серверу: {e}")
def set_voice_server():
global VOICE_SERVER_IP, VOICE_SERVER_PORT
res = show_voice_server_dialog(VOICE_SERVER_IP, VOICE_SERVER_PORT)
if res:
ip, port = res
VOICE_SERVER_IP = ip
VOICE_SERVER_PORT = port
chat_box.insert(tk.END, f"[+] Голосовой сервер изменён на {ip}:{port}\n")
chat_box.see(tk.END)
save_config()
def set_upload_server():
global UPLOAD_SERVER_URL
new_url = show_upload_server_dialog(UPLOAD_SERVER_URL)
if new_url:
UPLOAD_SERVER_URL = new_url
chat_box.insert(tk.END, f"[+] Сервер загрузки изменён на {UPLOAD_SERVER_URL}\n")
chat_box.see(tk.END)
save_config()
def upload_file():
if not UPLOAD_SERVER_URL:
messagebox.showwarning("Upload", "Сначала настройте адрес HTTP Upload сервера в меню.")
return
file_path = filedialog.askopenfilename(title="Выберите файл для отправки")
if not file_path:
return
chat_box.insert(tk.END, f"[⏫] Загружаю файл: {os.path.basename(file_path)}\n")
chat_box.see(tk.END)
def worker(path):
try:
with open(path, "rb") as f:
r = requests.post(f"{UPLOAD_SERVER_URL}/upload", files={"file": f}, timeout=60)
if r.status_code == 200:
data = r.json()
url = data.get("url")
if url:
msg_entry.delete(0, tk.END)
msg_entry.insert(0, url)
chat_box.insert(tk.END, f"[✅] Файл загружен, URL вставлен в поле ввода\n")
else:
chat_box.insert(tk.END, "[!] Сервер вернул ответ без URL\n")
else:
chat_box.insert(tk.END, f"[!] Ошибка HTTP {r.status_code}\n")
except Exception as e:
chat_box.insert(tk.END, f"[!] Ошибка загрузки: {e}\n")
chat_box.see(tk.END)
threading.Thread(target=worker, args=(file_path,), daemon=True).start()
root = tb.Window(themename=current_theme_name)
root.title("Dsconnect")
root.geometry("920x700")
menubar = tk.Menu(root)
root.config(menu=menubar)
connection_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Подключение", menu=connection_menu)
connection_menu.add_command(label="Подключиться к серверу", command=connect_to_server)
CONNECTION_TOGGLE_IDX = connection_menu.index("end")
actions_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Действия", menu=actions_menu)
actions_menu.add_command(label="Логин", command=menu_login)
actions_menu.add_command(label="Регистрация", command=menu_register)
actions_menu.add_command(label="Приватное сообщение", command=menu_pm)
actions_menu.add_command(label="Войти на канал", command=menu_join)
actions_menu.add_command(label="Игнор-лист", command=menu_ignore)
theme_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Темы", menu=theme_menu)
for theme in ["cyborg","darkly","flatly","superhero","morph","solar"]:
theme_menu.add_command(label=theme, command=lambda t=theme: menu_theme(t))
theme_menu.add_command(label="Добавить пользовательскую тему", command=add_user_theme)
voice_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Голосовой сервер", menu=voice_menu)
voice_menu.add_command(label="Настроить адрес", command=set_voice_server)
voice_menu.add_command(label="Подключиться", command=connect_voice_server)
upload_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="HTTP Upload сервер", menu=upload_menu)
upload_menu.add_command(label="Настроить адрес", command=set_upload_server)
chat_box = scrolledtext.ScrolledText(root, wrap="word", width=120, height=30,
bg="#1e1e1e", fg="white",
insertbackground="white", font=("Consolas", 11))
chat_box.pack(padx=10, pady=10, fill="both", expand=True)
chat_box.configure(cursor="arrow", insertwidth=0)
def _chat_keyguard(event):
ctrl = (event.state & 0x4) != 0
if ctrl and event.keysym in ("c", "C", "a", "A"):
return None
return "break"
chat_box.bind("<Key>", _chat_keyguard)
chat_box.bind("<<Paste>>", lambda e: "break")
chat_box.bind("<Control-v>", lambda e: "break")
chat_box.bind("<Control-V>", lambda e: "break")
default_font = tkfont.Font(font=chat_box.cget("font"))
bold_font = default_font.copy()
bold_font.configure(weight="bold")
chat_box.tag_configure("nick", font=bold_font)
chat_box.tag_configure("important", font=bold_font, foreground="#2ABA27")
chat_box.tag_configure("e2ee_notice", font=bold_font, foreground="#8E44AD")
chat_box.tag_configure("enc_dot", foreground="#239E21", font=bold_font)
chat_box.tag_configure("plain_dot", foreground="#BA8727", font=bold_font)
tooltip_window = None
def _show_tooltip(event, text="Это сообщение защищено сквозным шифрованием"):
global tooltip_window
try:
_hide_tooltip()
tw = tk.Toplevel(root)
tw.wm_overrideredirect(True)
x = event.x_root + 12
y = event.y_root + 12
tw.wm_geometry(f"+{x}+{y}")
lbl = tk.Label(tw, text=text, bg="#333333", fg="white",
borderwidth=1, relief="solid", padx=6, pady=3,
font=("Consolas", 9))
lbl.pack()
tooltip_window = tw
except Exception:
pass
def _move_tooltip(event):
try:
if tooltip_window:
x = event.x_root + 12
y = event.y_root + 12
tooltip_window.wm_geometry(f"+{x}+{y}")
except Exception:
pass
def _hide_tooltip(event=None):
global tooltip_window
try:
if tooltip_window:
tooltip_window.destroy()
tooltip_window = None
except Exception:
tooltip_window = None
chat_box.tag_bind("enc_dot", "<Enter>", _show_tooltip)
chat_box.tag_bind("enc_dot", "<Leave>", _hide_tooltip)
chat_box.tag_bind("enc_dot", "<Motion>", _move_tooltip)
chat_box.tag_bind("plain_dot", "<Enter>", lambda e: _show_tooltip(e, "Это сообщение не зашифровано"))
chat_box.tag_bind("plain_dot", "<Leave>", _hide_tooltip)
chat_box.tag_bind("plain_dot", "<Motion>", _move_tooltip)
frame_bottom = tb.Frame(root)
frame_bottom.pack(fill="x", padx=10, pady=5)
msg_entry = tb.Entry(frame_bottom, width=80)
msg_entry.pack(side="left", padx=5, fill="x", expand=True)
msg_entry.bind("<Return>", send_message)
def _history_prev(event=None):
global message_history_index
try:
if not message_history:
return "break"
if message_history_index is None:
message_history_index = len(message_history) - 1
elif message_history_index > 0:
message_history_index -= 1
msg_entry.delete(0, tk.END)
msg_entry.insert(0, message_history[message_history_index])
except Exception:
pass
return "break"
def _history_next(event=None):
global message_history_index
try:
if not message_history:
return "break"
if message_history_index is None:
return "break"
if message_history_index < len(message_history) - 1:
message_history_index += 1
msg_entry.delete(0, tk.END)
msg_entry.insert(0, message_history[message_history_index])
else:
message_history_index = None
msg_entry.delete(0, tk.END)
except Exception:
pass
return "break"
msg_entry.bind("<Up>", _history_prev)
msg_entry.bind("<Down>", _history_next)
send_btn = tb.Button(frame_bottom, text="Отправить", bootstyle=SUCCESS, command=send_message)
send_btn.pack(side="left", padx=5)
file_btn = tb.Button(frame_bottom, text="📎", bootstyle="secondary-outline", command=upload_file)
file_btn.pack(side="left", padx=5)
voice_frame = tb.Frame(root)
voice_frame.pack(padx=10, pady=5, fill="x")
var = tk.BooleanVar()
ttk.Checkbutton(voice_frame, text="Говорить", variable=var, command=toggle_send).pack(side="left", padx=5)
ttk.Label(voice_frame, text="Выберите микрофон:").pack(side="left", padx=5)
devices = sd.query_devices()
input_devices = [d for d in devices if d["max_input_channels"] > 0]
device_names = [d["name"] for d in input_devices] if input_devices else []
devices_combo = ttk.Combobox(voice_frame, values=device_names, width=50)
if device_names:
devices_combo.current(0)
devices_combo.bind("<<ComboboxSelected>>", select_device)
devices_combo.pack(side="left", padx=5)
if input_devices:
threading.Thread(target=switch_device, args=(input_devices[0]["name"],), daemon=True).start()
def on_closing():
global send_audio, stop_event
send_audio = False
stop_event.set()
try:
if client_socket:
client_socket.close()
except: pass
try:
if voice_sock:
voice_sock.close()
except: pass
try:
if sd_stream:
sd_stream.stop(); sd_stream.close()
except: pass
save_config()
root.destroy()
root.protocol("WM_DELETE_WINDOW", on_closing)
def update_connection_menu():
try:
if client_socket:
connection_menu.entryconfig(CONNECTION_TOGGLE_IDX,
label="Отключиться",
command=disconnect_from_server)
else:
connection_menu.entryconfig(CONNECTION_TOGGLE_IDX,
label="Подключиться к серверу",
command=connect_to_server)
except Exception:
pass
def disconnect_from_server():
global client_socket
try:
if client_socket:
try:
client_socket.shutdown(socket.SHUT_RDWR)
except Exception:
pass
try:
client_socket.close()
except Exception:
pass
client_socket = None
chat_box.insert(tk.END, "[-] Отключено от сервера\n")
chat_box.see(tk.END)
finally:
update_connection_menu()
update_connection_menu()
root.mainloop()
Download raw |
Create new paste