This site is developed to XHTML and CSS2 W3C standards.
If you see this paragraph, your browser does not support those standards and you
need to upgrade. Visit WaSP
for a variety of options.
Paste #63
Posted by: NORTH_archive
Posted on: 2025-10-27 22:23:23
Age: 29 days ago
Views: 27
import socket
import threading
import re
import time
import base64
import os
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
# (host, port) of the server this client connects to.
SERVER_ADDR = ("147.185.221.19", 42439)
# Relative paths of the files holding this client's RSA private and public key.
KEYFILE_PRIV = "dm_priv.pem"
KEYFILE_PUB = "dm_pub.pem"
# NOTE: the TCP connection is opened here, at module load time.
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect(SERVER_ADDR)
# Set on send/receive failure and on exit; the background loops poll it to stop.
stop_event = threading.Event()
def ensure_keys():
    """Load the RSA key pair from disk, generating and saving one if needed.

    The stored pair is used only when both ``KEYFILE_PRIV`` and
    ``KEYFILE_PUB`` exist; otherwise a fresh 2048-bit key is generated and
    both files are (re)written.

    Returns:
        A ``(private_key, public_key)`` tuple of RSA key objects.
    """
    if os.path.exists(KEYFILE_PRIV) and os.path.exists(KEYFILE_PUB):
        with open(KEYFILE_PRIV, "rb") as f:
            priv = RSA.import_key(f.read())
        with open(KEYFILE_PUB, "rb") as f:
            pub = RSA.import_key(f.read())
        return priv, pub

    priv = RSA.generate(2048)
    pub = priv.publickey()

    # Create the private key file readable/writable by the owner only, instead
    # of with the process's default permissions. The mode applies when the
    # file is created; an already existing file keeps its current mode.
    fd = os.open(KEYFILE_PRIV, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as f:
        f.write(priv.export_key())
    with open(KEYFILE_PUB, "wb") as f:
        f.write(pub.export_key())
    return priv, pub
# Load (or create) this client's key pair once, at module load time.
priv_key, pub_key = ensure_keys()
# Exported form of our public key; send_pubkey_to() base64-encodes it for sending.
pub_pem = pub_key.export_key()
def rsa_encrypt(pub_pem_bytes, message: str) -> str:
    """Encrypt *message* for the owner of the given public key.

    The key is imported from ``pub_pem_bytes``, the UTF-8 encoded message is
    encrypted with PKCS#1 OAEP, and the ciphertext is returned as base64
    ASCII text.
    """
    recipient_key = RSA.import_key(pub_pem_bytes)
    oaep = PKCS1_OAEP.new(recipient_key)
    ciphertext = oaep.encrypt(message.encode("utf-8"))
    encoded = base64.b64encode(ciphertext)
    return encoded.decode("ascii")
def rsa_decrypt(priv_key_obj, b64cipher: str) -> str:
    """Decrypt a base64-encoded PKCS#1 OAEP ciphertext with our private key.

    Returns the plaintext decoded as UTF-8.
    """
    raw_ciphertext = base64.b64decode(b64cipher)
    oaep = PKCS1_OAEP.new(priv_key_obj)
    plaintext_bytes = oaep.decrypt(raw_ciphertext)
    return plaintext_bytes.decode("utf-8")
# sender name -> that peer's public key bytes, as decoded from their PUBKEY message
peers_pubkeys = {}
# recipient name -> list of plaintext messages waiting to be sent
pending_msgs = {}
# peer name -> True/False E2EE flag; when a peer has no entry,
# send_encrypted_pm() starts the E2EE_CHECK handshake
peers_e2ee_support = {}
# time.time() of the most recent send attempt; send_raw() uses it to space sends
last_send_time = 0.0
# Held by send_raw() for the duration of each send
send_lock = threading.Lock()
def send_raw(data: bytes):
    """Send *data* to the server, keeping sends at least 0.2 s apart.

    Sends are serialized with ``send_lock``. A socket error is reported on
    stdout and sets ``stop_event``; it is not raised to the caller.
    """
    global last_send_time
    with send_lock:
        # Sleep off whatever remains of the 0.2 s gap since the last send.
        remaining = 0.2 - (time.time() - last_send_time)
        if remaining > 0:
            time.sleep(remaining)
        try:
            # sendall() keeps writing until the whole buffer is out; a plain
            # send() may transmit only part of it and silently drop the rest.
            client_socket.sendall(data)
        except OSError as e:
            print("Send failed:", e)
            stop_event.set()
        last_send_time = time.time()
def send_text(text: str):
    """Encode *text* as UTF-8 and hand it to send_raw()."""
    encoded = text.encode("utf-8")
    send_raw(encoded)
def send_pubkey_to(user: str):
    """Private-message our public key to *user* as ``PUBKEY:<base64>``."""
    encoded_key = base64.b64encode(pub_pem).decode("ascii")
    command = f"/pm {user} PUBKEY:{encoded_key}"
    send_text(command)
def send_encrypted_pm(user: str, plaintext: str):
    """Encrypt *plaintext* for *user* and send it as an ``ENC:`` private message.

    When no public key is cached for *user*, the message is queued in
    ``pending_msgs`` and our own public key is sent instead.

    NOTE: a second ``def send_encrypted_pm`` later in this module rebinds the
    name, so once the module has finished loading, calls resolve to that
    later definition rather than this one.
    """
    peer_key = peers_pubkeys.get(user)
    if peer_key:
        try:
            ciphertext = rsa_encrypt(peer_key, plaintext)
        except Exception as e:
            print("Encryption failed:", e)
        else:
            send_text(f"/pm {user} ENC:{ciphertext}")
        return

    # No key yet: park the message and offer our key to get theirs back.
    pending_msgs.setdefault(user, []).append(plaintext)
    send_pubkey_to(user)
    print(f"[LOG] Queued message to {user}, sent our PUBKEY and waiting for theirs")
def flush_pending_for(user: str):
    """Re-submit every message queued for *user* through send_encrypted_pm().

    The queue entry is removed before sending, so a message that still cannot
    go out is re-queued by send_encrypted_pm() in a fresh list.
    """
    queued = pending_msgs.pop(user, None)
    if not queued:
        return
    for text in queued:
        send_encrypted_pm(user, text)
# Matches a received line of the form "(Private) <name>: <text>", capturing the
# sender name (word characters only) and the message text.
private_re = re.compile(r'^\(Private\)\s+(\w+):\s*(.+)$')
def handle_private_message(sender: str, payload: str):
    """Process one private message received from *sender*.

    Control payloads drive the end-to-end encryption handshake:

    * ``E2EE_CHECK``      - reply ``E2EE_OK`` and mark the sender as E2EE-capable.
    * ``E2EE_OK``         - mark the sender as E2EE-capable.
    * ``PUBKEY:<base64>`` - validate and cache the sender's public key.
    * ``ENC:<base64>``    - decrypt and print, if E2EE was confirmed for the
      sender; otherwise the message is ignored with a warning.

    Any other payload is printed as an ordinary private message; if the
    sender was marked E2EE-capable, the flag is cleared first.
    """
    payload = payload.strip()

    if payload == "E2EE_CHECK":
        # A new handshake means the peer may no longer hold our key (e.g. it
        # restarted). Forget their cached key so their next PUBKEY counts as
        # new and is answered with ours.
        peers_pubkeys.pop(sender, None)
        send_text(f"/pm {sender} E2EE_OK")
        peers_e2ee_support[sender] = True
        print(f"[E2EE] Responded to E2EE_CHECK from {sender}")
        flush_pending_for(sender)
        return

    if payload == "E2EE_OK":
        peers_e2ee_support[sender] = True
        print(f"[E2EE] {sender} supports end-to-end encryption")
        flush_pending_for(sender)
        return

    if payload.startswith("PUBKEY:"):
        b64 = payload[len("PUBKEY:"):].strip()
        try:
            pem = base64.b64decode(b64)
            # Reject data that is not a parseable key now, rather than
            # failing on every later encryption attempt.
            RSA.import_key(pem)
            previous = peers_pubkeys.get(sender)
            peers_pubkeys[sender] = pem
            print(f"[key] saved public key for {sender}")
            # Answer with our key only when theirs is new or changed.
            # Replying to every PUBKEY makes two clients echo PUBKEY
            # messages back and forth without end.
            if previous != pem:
                send_pubkey_to(sender)
            flush_pending_for(sender)
        except Exception as e:
            print("Failed to process PUBKEY from", sender, e)
        return

    if payload.startswith("ENC:"):
        if not peers_e2ee_support.get(sender, False):
            print(f"[WARN] Received encrypted message from {sender}, but E2EE not confirmed. Ignored.")
            return
        b64 = payload[len("ENC:"):].strip()
        try:
            pt = rsa_decrypt(priv_key, b64)
            print(f"(Private) {sender}: {pt}")
        except Exception as e:
            print(f"(Private) {sender}: <failed to decrypt> ({e})")
        return

    # Plain text from a peer previously marked as E2EE-capable: clear the flag
    # so our own messages to them go out unencrypted from now on.
    if peers_e2ee_support.get(sender, None) is True:
        print(f"[E2EE] {sender} sent unencrypted message. Switching to open chat.")
        peers_e2ee_support[sender] = False
    print(f"(Private) {sender}: {payload}")
def send_encrypted_pm(user: str, plaintext: str):
    """Send a private message to *user*, encrypted when the peer supports it.

    Behaviour depends on the E2EE state recorded for *user*:

    * no entry  - send ``E2EE_CHECK``, queue the message, and record the peer
      as not supporting E2EE for now;
    * truthy    - encrypt with the peer's cached public key, or, when no key
      is cached, queue the message and send our public key;
    * falsy     - send the text as an ordinary private message.
    """
    state = peers_e2ee_support.get(user)

    if state is None:
        peers_e2ee_support[user] = False
        send_text(f"/pm {user} E2EE_CHECK")
        pending_msgs.setdefault(user, []).append(plaintext)
        print(f"[E2EE] Initiated E2EE handshake with {user}, message queued")
        return

    if not state:
        send_text(f"/pm {user} {plaintext}")
        return

    peer_key = peers_pubkeys.get(user)
    if not peer_key:
        pending_msgs.setdefault(user, []).append(plaintext)
        send_pubkey_to(user)
        print(f"[local] queued message to {user}, sent PUBKEY, waiting for theirs")
        return

    try:
        encrypted = rsa_encrypt(peer_key, plaintext)
    except Exception as e:
        print("Encryption failed:", e)
        return
    send_text(f"/pm {user} ENC:{encrypted}")
def receive_messages():
    """Read from the server until stopped, dispatching each received chunk.

    Each ``recv()`` result is treated as one message: ``*Ping!*`` markers are
    removed, lines matching ``private_re`` go to handle_private_message(),
    and everything else is printed. A socket error or a closed connection
    sets ``stop_event`` and ends the loop.
    """
    while not stop_event.is_set():
        try:
            data = client_socket.recv(32768)
        except OSError as e:
            print("Receive error:", e)
            stop_event.set()
            break
        if not data:
            print("Connection closed by server")
            stop_event.set()
            break

        # errors='ignore' drops undecodable bytes instead of raising, so no
        # fallback decoding path is needed.
        message = data.decode("utf-8", errors="ignore").strip()
        msg = message.replace("*Ping!*", "").strip()
        if not msg:
            continue

        m = private_re.match(msg)
        if m:
            handle_private_message(m.group(1), m.group(2))
        else:
            print(msg)
def send_keepalive():
    """Send a bare ``/`` to the server every 5 seconds until stop_event is set."""
    # Event.wait() doubles as the sleep: it returns True as soon as stop_event
    # is set, so the loop ends promptly and never sends after a stop request.
    # send_raw() reports its own socket errors, so none are handled here.
    while not stop_event.wait(5):
        send_raw(b"/")
def start_receiving_thread():
    """Run receive_messages() on a background daemon thread."""
    threading.Thread(target=receive_messages, daemon=True).start()
def start_keepalive_thread():
    """Run send_keepalive() on a background daemon thread."""
    threading.Thread(target=send_keepalive, daemon=True).start()
# Start the background workers, then forward console input to the server.
start_receiving_thread()
start_keepalive_thread()
try:
    # Stop prompting once a send/receive failure has been flagged. input()
    # blocks, so the flag is noticed after the next line is entered.
    while not stop_event.is_set():
        # Strip before the emptiness check so whitespace-only lines are
        # skipped instead of being sent as empty payloads.
        line = input().strip()
        if not line:
            continue
        # "/pm <user> <text>" goes through the E2EE path; everything else is
        # sent to the server verbatim.
        pm_match = re.match(r'^/pm\s+(\w+)\s+(.+)$', line)
        if pm_match:
            send_encrypted_pm(pm_match.group(1), pm_match.group(2))
            continue
        send_text(line)
except (KeyboardInterrupt, EOFError):
    # Ctrl-C and end of input (closed stdin) both end the session cleanly.
    print("Exiting...")
finally:
    stop_event.set()
    try:
        client_socket.close()
    except OSError:
        pass
Download raw |
Create new paste