import csv
import json
import os
import re

import chardet
import psycopg2

# ------------------------------------------------------------
# CONNECTION CONFIGURATION
# ------------------------------------------------------------
DB_CONFIG = {
    "host": "localhost",
    "port": 5433,
    "user": "postgres",
    "password": "postgres",
    "dbname": "adicciones",
}

DATASETS_DIR = "datasets"
OUTPUT_FILE = "columnas_info.json"


def sanitize_name(name: str) -> str:
    """Clean a raw header/file name into a safe SQL identifier.

    Lowercases, collapses every run of characters outside [a-z0-9_] into a
    single underscore, and truncates to 50 characters.  A result that is
    empty or starts with a digit gets a leading underscore so PostgreSQL
    accepts it as an unquoted identifier.
    """
    name = name.strip().lower()
    name = re.sub(r"[^a-z0-9_]+", "_", name)
    name = name[:50]
    # PostgreSQL unquoted identifiers must be non-empty and not start
    # with a digit; the original code let such names through and the
    # CREATE TABLE then failed.
    if not name or name[0].isdigit():
        name = ("_" + name)[:50]
    return name


def detect_encoding(file_path: str) -> str:
    """Detect the file's encoding from its first 10 KB using chardet.

    Falls back to 'utf-8' when chardet cannot decide.
    """
    with open(file_path, "rb") as f:
        raw = f.read(10000)
    result = chardet.detect(raw)
    return result["encoding"] or "utf-8"


def normalize_encoding(enc: str) -> str:
    """Map a chardet encoding name to a valid PostgreSQL encoding name.

    Unknown encodings fall back to UTF8.
    """
    if not enc:
        return "UTF8"
    enc = enc.upper().replace("-", "").replace("_", "")
    if "UTF" in enc:
        return "UTF8"
    if "LATIN" in enc or "ISO88591" in enc:
        return "LATIN1"
    if "1252" in enc or "WINDOWS" in enc:
        return "WIN1252"
    return "UTF8"  # default fallback


def detect_delimiter(file_path: str, encoding: str) -> str:
    """Guess the CSV delimiter from a 4 KB sample.

    Tries csv.Sniffer first; on failure falls back to a simple
    presence check (';', tab, '|'), defaulting to ','.
    """
    with open(file_path, "r", encoding=encoding, errors="ignore") as f:
        sample = f.read(4096)
    try:
        dialect = csv.Sniffer().sniff(sample, delimiters=";,|\t")
        return dialect.delimiter
    except csv.Error:
        if ";" in sample:
            return ";"
        elif "\t" in sample:
            return "\t"
        elif "|" in sample:
            return "|"
        else:
            return ","


def _dedupe_headers(headers: list[str]) -> list[str]:
    """Make sanitized column names unique by suffixing _2, _3, ...

    Duplicate column names (common after sanitization collapses
    distinct headers) would otherwise abort CREATE TABLE.
    """
    seen: dict[str, int] = {}
    unique = []
    for col in headers:
        count = seen.get(col, 0) + 1
        seen[col] = count
        unique.append(col if count == 1 else f"{col}_{count}")
    return unique


def main():
    """Import every CSV in DATASETS_DIR into PostgreSQL (one TEXT table
    per file) and dump the table → columns mapping to OUTPUT_FILE."""
    conn = psycopg2.connect(**DB_CONFIG)
    # The CSVs are decoded by Python (open(..., encoding=...)) before being
    # handed to copy_expert, so psycopg2 re-encodes the stream in the
    # *client* encoding.  Pin it to UTF8 so the COPY ENCODING below is
    # guaranteed to match what actually goes over the wire.
    conn.set_client_encoding("UTF8")
    cur = conn.cursor()

    columnas_info = {}

    for filename in os.listdir(DATASETS_DIR):
        if not filename.endswith(".csv"):
            continue

        path = os.path.join(DATASETS_DIR, filename)
        table_name = sanitize_name(os.path.splitext(filename)[0])
        print(f"\n📊 Procesando: {filename} → tabla '{table_name}'")

        encoding_detected = detect_encoding(path)
        pg_encoding = normalize_encoding(encoding_detected)
        delimiter = detect_delimiter(path, encoding_detected)
        print(f"   → detectado: delimitador '{delimiter}' | codificación "
              f"'{encoding_detected}' (→ {pg_encoding})")

        # Read the header row manually.
        with open(path, "r", encoding=encoding_detected, errors="ignore") as f:
            reader = csv.reader(f, delimiter=delimiter)
            raw_headers = next(reader, None)
        if not raw_headers:
            # Empty file: next(reader) would have raised StopIteration.
            print(f"⚠️  Archivo vacío, omitido: {filename}")
            continue

        headers = _dedupe_headers([sanitize_name(h) for h in raw_headers])
        columnas_info[table_name] = headers

        # (Re)create the target table with all-TEXT columns.
        cur.execute(f"DROP TABLE IF EXISTS {table_name} CASCADE;")
        cols_sql = ", ".join(f"{col} TEXT" for col in headers)
        cur.execute(f"CREATE TABLE {table_name} ({cols_sql});")
        conn.commit()

        # Python already decoded the file, so the data reaching the server
        # is always UTF8 (client encoding set above) — regardless of the
        # file's original encoding.
        copy_sql = (
            f"COPY {table_name} ({', '.join(headers)}) FROM STDIN WITH "
            f"(FORMAT CSV, DELIMITER '{delimiter}', NULL '', "
            f"HEADER FALSE, ENCODING 'UTF8');"
        )

        try:
            with open(path, "r", encoding=encoding_detected,
                      errors="ignore") as f:
                next(f)  # skip the header line
                cur.copy_expert(sql=copy_sql, file=f)
            conn.commit()
        except Exception as e:
            print(f"⚠️ Error al copiar con '{encoding_detected}': {e}")
            print("   → Reintentando con lectura UTF-8")
            # The failed COPY left the transaction aborted; without a
            # rollback the retry would fail with "current transaction is
            # aborted" no matter what.
            conn.rollback()
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                next(f)
                cur.copy_expert(sql=copy_sql, file=f)
            conn.commit()

        print(f"✅ Tabla '{table_name}' importada correctamente "
              f"({len(headers)} columnas).")

    cur.close()
    conn.close()

    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(columnas_info, f, indent=4, ensure_ascii=False)

    print(f"\n✅ Importación completada. Estructuras guardadas en "
          f"'{OUTPUT_FILE}'")


if __name__ == "__main__":
    main()