Zdalne-systemy-z-kit-uVPN/utils.py

101 lines
2.3 KiB
Python

import datetime
import hashlib
import secrets
import string
import random
import os
import subprocess
import threading
from time import sleep
import db
import config
import ipaddress
from os import chmod
from Crypto.PublicKey import RSA
DELETE_TIMEOUT = 45
RESTART_DELETE_THREAD = 3
def generate_random_string(length):
letters = string.ascii_letters
random_string = ''.join(random.choice(letters) for i in range(length))
return random_string
def hash_password(password):
return hashlib.sha512(password.encode('utf-8')).hexdigest()
def generate_auth_token():
return secrets.token_urlsafe(32)
def ping_client(ip):
response = os.system("ping -c 1 " + ip + "> /dev/null")
if response == 0:
return True
else:
return False
def ssh_thread_function():
subprocess.run(['wssh', '--fbidhttp=False', '--port='+config.webssh_port])
def check_allocation_thread_function():
while True:
ids = db.get_image_allocation_all()
if ids is not None:
for x in ids:
ip = db.get_image_allocation_clientip_id_vpn(x[0])
ping_thread = PingThread(ip, x[0])
ping_thread.start()
sleep(RESTART_DELETE_THREAD)
class PingThread(threading.Thread):
def __init__(self, ip, id):
super(PingThread, self).__init__()
self.Ip = ip
self.Id = id
def run(self):
if self.Ip is None:
return
if ping_client(self.Ip) == False:
date = db.get_image_allocation_time_id(self.Id)
if date is None:
return
delta = datetime.datetime.utcnow() - date
if delta.total_seconds() > DELETE_TIMEOUT:
db.del_image_allocation_id(self.Id)
print("deleted:", self.Id)
else:
db.update_image_allocation_time(self.Id)
def init_threads():
ssh_thread = threading.Thread(target=ssh_thread_function)
ssh_thread.start()
allocation_thread = threading.Thread(
target=check_allocation_thread_function)
allocation_thread.start()
def is_valid_ip_address(ip: str) -> bool:
try:
ipaddress.IPv4Address(ip)
return True
except ipaddress.AddressValueError:
try:
ipaddress.IPv6Address(ip)
return True
except:
pass
return False