#!/usr/bin/env python3
import os
import socket
import sqlite3
import threading
import time
import queue
from datetime import datetime
from typing import Set, Tuple, List
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
try:
from dnslib import DNSRecord, DNSHeader, RR, QTYPE, A, AAAA, RCODE
from fastapi import FastAPI, UploadFile, File, Response
from fastapi.responses import JSONResponse, HTMLResponse
import uvicorn
except ImportError as e:
print(f"Error: Missing dependency. Install it with: pip install dnslib fastapi uvicorn python-multipart")
exit(1)
DB_PATH = "dns_data.db"
WHITELIST_FILE = "whitelist.txt"
WHITELIST_DENY_FILE = "whitelist_deny.txt"
BLACKLIST_FILE = "blacklist.txt"
MODE_FILE = "mode.txt"
HOST_IP = "0.0.0.0"
DNS_PORT = 53
API_PORT = 8009
MAX_WORKERS = 10
DNS_TIMEOUT = 2.0
CACHE_TTL = 300
MAX_CACHE_SIZE = 2000
UPSTREAM_SERVERS = [
("94.140.14.14", 53, "AdGuard Default"),
("8.8.8.8", 53, "Google DNS")
]
POPULAR_DOMAINS = [
"google.com", "yandex.ru", "vk.com", "mail.ru", "github.com", "telegram.org"
]
WHITELIST_SET: Set[str] = set()
WHITELIST_DENY_SET: Set[str] = set()
BLACKLIST_SET: Set[str] = set()
DATA_LOCK = threading.Lock()
LOG_QUEUE = queue.Queue()
HTML_DASHBOARD = """
DNS Guard V2
🛡 DNS Guard V2 Online
Режим
...
"""
def init_db():
conn = sqlite3.connect(DB_PATH)
conn.execute("PRAGMA journal_mode=WAL;")
cur = conn.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS logs(id INTEGER PRIMARY KEY AUTOINCREMENT, ts TEXT, src_ip TEXT, qname TEXT, action TEXT, mode TEXT, latency_ms REAL, upstream TEXT);")
cur.execute("CREATE TABLE IF NOT EXISTS cache(domain TEXT PRIMARY KEY, rtype INTEGER, rdata BLOB, expires_at REAL);")
conn.commit()
conn.close()
def db_writer_worker():
conn = sqlite3.connect(DB_PATH, check_same_thread=False)
conn.execute("PRAGMA journal_mode=WAL;")
cur = conn.cursor()
while True:
try:
records = [LOG_QUEUE.get()]
while len(records) < 50:
try: records.append(LOG_QUEUE.get_nowait())
except queue.Empty: break
if records:
cur.executemany("INSERT INTO logs(ts, src_ip, qname, action, mode, latency_ms, upstream) VALUES(?,?,?,?,?,?,?)", records)
conn.commit()
for _ in records: LOG_QUEUE.task_done()
except Exception as e:
print(f"DB Error: {e}")
time.sleep(1)
def enqueue_log(src_ip, qname, action, mode, start_time, upstream):
latency = (time.time() - start_time) * 1000
ts = datetime.now().isoformat()
LOG_QUEUE.put((ts, src_ip, qname, action, mode, latency, upstream))
def get_cache(domain: str, qtype: int):
try:
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()
cur.execute("SELECT rdata FROM cache WHERE domain=? AND rtype=? AND expires_at > ?", (domain, qtype, time.time()))
row = cur.fetchone()
conn.close()
return row[0] if row else None
except: return None
def set_cache(domain: str, qtype: int, rdata: bytes):
try:
conn = sqlite3.connect(DB_PATH)
conn.execute("INSERT OR REPLACE INTO cache VALUES(?,?,?,?)", (domain, qtype, rdata, time.time() + CACHE_TTL))
conn.commit()
conn.close()
except: pass
def load_file_to_set(filepath: str) -> Set[str]:
res = set()
if os.path.exists(filepath):
with open(filepath, "r", encoding="utf-8", errors='ignore') as f:
for line in f:
d = line.strip().lower()
if d and not d.startswith("#"): res.add(d)
return res
def reload_lists():
global WHITELIST_SET, BLACKLIST_SET, WHITELIST_DENY_SET
with DATA_LOCK:
WHITELIST_SET = load_file_to_set(WHITELIST_FILE)
BLACKLIST_SET = load_file_to_set(BLACKLIST_FILE)
WHITELIST_DENY_SET = load_file_to_set(WHITELIST_DENY_FILE)
print(f"Lists loaded: BL={len(BLACKLIST_SET)}")
def get_mode():
try:
with open(MODE_FILE, "r") as f: return f.read().strip().upper() or "NORMAL"
except: return "NORMAL"
def set_mode_file(mode):
with open(MODE_FILE, "w") as f: f.write(mode)
def is_domain_in_set(domain: str, target_set: Set[str]) -> bool:
if domain in target_set: return True
parts = domain.split('.')
for i in range(len(parts)):
if ".".join(parts[i:]) in target_set: return True
return False
def forward_dns(data) -> Tuple[bytes, str]:
for ip, port, name in UPSTREAM_SERVERS:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(DNS_TIMEOUT)
sock.sendto(data, (ip, port))
resp, _ = sock.recvfrom(4096)
sock.close()
return resp, name
except: continue
return None, "NONE"
def dns_handler(sock, data, addr):
start_time = time.time()
try: request = DNSRecord.parse(data)
except: return
if not request.questions: return
code
Code
download
content_copy
expand_less
qname = str(request.q.qname).lower().rstrip('.')
qtype = request.q.qtype
cached = get_cache(qname, qtype)
if cached:
try:
reply = DNSRecord.parse(cached)
reply.header.id = request.header.id
sock.sendto(reply.pack(), addr)
enqueue_log(addr[0], qname, "CACHE", get_mode(), start_time, "LOCAL_CACHE")
return
except: pass
mode = get_mode()
action, block_it, up_name = "FORWARD", False, "UNKNOWN"
with DATA_LOCK:
if mode == "WHITELIST":
if is_domain_in_set(qname, WHITELIST_SET): action = "ALLOWED_WL"
else: block_it, action = True, "BLOCKED_WL"
else:
if is_domain_in_set(qname, WHITELIST_SET): action = "ALLOWED_WL"
elif is_domain_in_set(qname, BLACKLIST_SET): block_it, action = True, "BLOCKED_BL"
if block_it:
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q)
reply.add_answer(RR(rname=request.q.qname, rtype=QTYPE.A, rdata=A("0.0.0.0"), ttl=60))
reply_data = reply.pack()
else:
resp, up_name = forward_dns(data)
if resp:
reply_data = resp
set_cache(qname, qtype, resp)
else:
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, rcode=RCODE.SERVFAIL), q=request.q)
reply_data = reply.pack()
action = "FAIL"
try: sock.sendto(reply_data, addr)
except: pass
enqueue_log(addr[0], qname, action, mode, start_time, up_name)
def dns_server_loop():
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((HOST_IP, DNS_PORT))
print(f"DNS Server on port {DNS_PORT}...")
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
while True:
try:
data, addr = sock.recvfrom(4096)
pool.submit(dns_handler, sock, data, addr)
except: pass
@asynccontextmanager
async def lifespan(app: FastAPI):
init_db()
reload_lists()
threading.Thread(target=db_writer_worker, daemon=True).start()
threading.Thread(target=dns_server_loop, daemon=True).start()
yield
app = FastAPI(lifespan=lifespan)
@app.get("/", response_class=HTMLResponse)
def index():
return HTML_DASHBOARD
@app.get("/stats")
def stats():
conn = sqlite3.connect(DB_PATH)
cache_count = conn.execute("SELECT count(*) FROM cache").fetchone()[0]
conn.close()
return {
"mode": get_mode(),
"blacklist": len(BLACKLIST_SET),
"cache_size": cache_count
}
@app.get("/logs/new")
def get_logs(since_id: int = 0):
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
rows = conn.execute("SELECT * FROM logs WHERE id > ? ORDER BY id DESC LIMIT 50", (since_id,)).fetchall()
conn.close()
if not rows:
return Response(status_code=204)
return [dict(r) for r in rows]
@app.post("/mode/{mode_name}")
def change_mode(mode_name: str):
m = mode_name.upper()
if m in ["NORMAL", "STRICT", "WHITELIST"]:
set_mode_file(m)
return {"status": "ok"}
return Response(status_code=400)
if name == "main":
SSL_KEY = "/etc/letsencrypt/live/cid.h4ck.me/privkey.pem"
SSL_CERT = "/etc/letsencrypt/live/cid.h4ck.me/fullchain.pem"
code
Code
download
content_copy
expand_less
use_ssl = os.path.exists(SSL_KEY) and os.path.exists(SSL_CERT)
uvicorn.run(
app,
host="0.0.0.0",
port=API_PORT,
ssl_keyfile=SSL_KEY if use_ssl else None,
ssl_certfile=SSL_CERT if use_ssl else None
)