Se ha cambiado la base de datos a PostgreSQL; además, se han realizado algunos cambios de lógica.
137 lines
4.4 KiB
Python
137 lines
4.4 KiB
Python
import csv
import json
import os
import re

import chardet
import psycopg2
from psycopg2 import sql
# ------------------------------------------------------------
# CONNECTION SETTINGS
# ------------------------------------------------------------
# The defaults point at a PostgreSQL server on localhost:5433. Each value
# can be overridden with the standard libpq environment variables so that
# credentials do not have to be edited into the source file.
DB_CONFIG = {
    "host": os.environ.get("PGHOST", "localhost"),
    "port": int(os.environ.get("PGPORT", "5433")),
    "user": os.environ.get("PGUSER", "postgres"),
    "password": os.environ.get("PGPASSWORD", "postgres"),
    "dbname": os.environ.get("PGDATABASE", "adicciones"),
}

# Directory that main() scans for CSV files to import.
DATASETS_DIR = "datasets"
# JSON file that main() writes the table -> column-names mapping to.
OUTPUT_FILE = "columnas_info.json"
def sanitize_name(name: str) -> str:
    """Turn an arbitrary label into a safe, unquoted SQL identifier.

    The label is trimmed, lower-cased, and every run of characters outside
    ``[a-z0-9_]`` is collapsed into a single underscore. The result is
    capped at 50 characters.

    Two cases that would otherwise produce an invalid identifier are
    handled explicitly: a result starting with a digit is prefixed with an
    underscore, and an empty result is replaced by ``"unnamed"``.

    Args:
        name: Raw label, e.g. a file stem or a CSV header cell.

    Returns:
        A non-empty string of at most 50 characters made only of
        ``[a-z0-9_]`` and not starting with a digit.
    """
    cleaned = re.sub(r"[^a-z0-9_]+", "_", name.strip().lower())
    if not cleaned:
        return "unnamed"
    if cleaned[0].isdigit():
        # Unquoted SQL identifiers must not begin with a digit.
        cleaned = f"_{cleaned}"
    return cleaned[:50]
def detect_encoding(file_path):
    """Guess the text encoding of a file with chardet.

    Only the first 10000 bytes of the file are inspected.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The encoding name reported by chardet, or ``"utf-8"`` when chardet
        cannot decide or reports plain ASCII.
    """
    with open(file_path, "rb") as f:
        raw = f.read(10000)

    encoding = chardet.detect(raw)["encoding"]
    if not encoding or encoding.lower() == "ascii":
        # ASCII is a strict subset of UTF-8, and an all-ASCII sample says
        # nothing about the rest of the file. Callers decode with
        # errors="ignore", so returning "ascii" would silently drop every
        # non-ASCII character that appears after the sampled prefix.
        return "utf-8"
    return encoding
# PostgreSQL encoding names for the ISO-8859 parts it supports, keyed by
# the part number as it appears in the encoding label.
_ISO_8859_TO_PG = {
    "1": "LATIN1",
    "2": "LATIN2",
    "3": "LATIN3",
    "4": "LATIN4",
    "5": "ISO_8859_5",
    "6": "ISO_8859_6",
    "7": "ISO_8859_7",
    "8": "ISO_8859_8",
    "9": "LATIN5",
    "10": "LATIN6",
    "13": "LATIN7",
    "14": "LATIN8",
    "15": "LATIN9",
    "16": "LATIN10",
}


def normalize_encoding(enc: str) -> str:
    """Map a detected encoding label to a PostgreSQL encoding name.

    The label is upper-cased and stripped of ``-`` and ``_`` before
    matching. Anything that cannot be mapped falls back to ``"UTF8"``.

    Args:
        enc: Encoding label such as ``"utf-8"``, ``"ISO-8859-15"`` or
            ``"Windows-1252"``. May be empty or ``None``.

    Returns:
        ``"UTF8"``, a ``LATINn`` / ``ISO_8859_n`` name, or a ``WINnnn`` name.
    """
    if not enc:
        return "UTF8"

    key = enc.upper().replace("-", "").replace("_", "")

    if "UTF" in key:
        return "UTF8"

    # Match the full part number: a plain substring test for "ISO88591"
    # would also catch ISO-8859-10 .. ISO-8859-16.
    iso = re.search(r"ISO8859(\d{1,2})(?!\d)", key)
    if iso:
        return _ISO_8859_TO_PG.get(iso.group(1), "UTF8")

    latin = re.search(r"LATIN(\d{1,2})?", key)
    if latin:
        number = int(latin.group(1) or 1)
        return f"LATIN{number}" if 1 <= number <= 10 else "LATIN1"

    # Keep the actual code page instead of treating every Windows label
    # as Windows-1252.
    codepage = re.search(r"(?:WINDOWS|WIN|CP|IBM)(125[0-8]|874|866)", key)
    if codepage:
        return f"WIN{codepage.group(1)}"
    if "1252" in key or "WINDOWS" in key:
        return "WIN1252"

    return "UTF8"  # default fallback
def detect_delimiter(file_path, encoding):
    """Guess the column delimiter of a CSV file.

    The first 4096 characters are handed to ``csv.Sniffer`` restricted to
    ``;``, ``,``, ``|`` and tab. If the sniffer cannot decide, the candidate
    that occurs most often in the header row wins; failing that, the first
    of ``;``, tab, ``|`` present anywhere in the sample is used, and ``,``
    is the final default.

    Args:
        file_path: Path of the CSV file.
        encoding: Codec name used to decode the file; undecodable bytes
            are ignored.

    Returns:
        A single-character delimiter.
    """
    sample_size = 4096
    with open(file_path, "r", encoding=encoding, errors="ignore") as f:
        sample = f.read(sample_size)

    # A sample that fills the buffer almost certainly ends mid-row; a
    # truncated row has fewer delimiters and skews the sniffer's
    # consistency check, so keep only complete rows.
    if len(sample) == sample_size:
        last_newline = sample.rfind("\n")
        if last_newline != -1:
            sample = sample[:last_newline + 1]

    try:
        return csv.Sniffer().sniff(sample, delimiters=";,|\t").delimiter
    except csv.Error:
        pass

    # Fallback 1: most frequent candidate in the header row. max() keeps
    # the first maximum, so ties resolve in the order ; tab | comma.
    candidates = ";\t|,"
    rows = sample.splitlines()
    header = rows[0] if rows else ""
    best = max(candidates, key=header.count)
    if header.count(best):
        return best

    # Fallback 2: first candidate present anywhere in the sample.
    for candidate in ";\t|":
        if candidate in sample:
            return candidate
    return ","
def _unique_column_names(raw_headers):
    """Sanitize CSV header cells into distinct, non-empty column names."""
    names = []
    seen = set()
    for position, raw in enumerate(raw_headers, start=1):
        base = sanitize_name(raw) or f"col_{position}"
        candidate = base
        suffix = 2
        # Two headers can collapse to the same sanitized name; CREATE TABLE
        # rejects duplicate columns, so disambiguate with a numeric suffix.
        while candidate in seen:
            candidate = f"{base}_{suffix}"
            suffix += 1
        seen.add(candidate)
        names.append(candidate)
    return names


def _copy_csv(cur, path, encoding, table_name, columns, delimiter):
    """Stream one CSV file into ``table_name`` with COPY ... FROM STDIN."""
    statement = sql.SQL(
        "COPY {table} ({columns}) FROM STDIN WITH "
        "(FORMAT CSV, DELIMITER {delimiter}, NULL '', HEADER TRUE, "
        "ENCODING 'UTF8')"
    ).format(
        table=sql.Identifier(table_name),
        columns=sql.SQL(", ").join(sql.Identifier(c) for c in columns),
        delimiter=sql.Literal(delimiter),
    )
    # The file is decoded here and psycopg2 re-encodes the text with the
    # client encoding (pinned to UTF8 in main), which is why COPY declares
    # UTF8 regardless of the file's on-disk encoding. HEADER TRUE lets the
    # server skip the header row, including headers with quoted newlines.
    with open(path, "r", encoding=encoding, errors="ignore") as f:
        cur.copy_expert(sql=statement.as_string(cur), file=f)


def _import_file(conn, cur, filename):
    """Load one CSV from DATASETS_DIR into a freshly created table.

    Returns ``(table_name, columns)``, or ``None`` when the file has no
    header row and is skipped.
    """
    path = os.path.join(DATASETS_DIR, filename)
    table_name = sanitize_name(os.path.splitext(filename)[0])

    print(f"\n📊 Procesando: {filename} → tabla '{table_name}'")

    encoding_detected = detect_encoding(path)
    delimiter = detect_delimiter(path, encoding_detected)

    print(f" → detectado: delimitador '{delimiter}' | codificación '{encoding_detected}'")

    # Read the header row only; the data rows are streamed by COPY.
    with open(path, "r", encoding=encoding_detected, errors="ignore") as f:
        raw_headers = next(csv.reader(f, delimiter=delimiter), None)
    if not raw_headers:
        print("⚠️ Archivo sin encabezados, se omite.")
        return None
    headers = _unique_column_names(raw_headers)

    # Recreate the table from scratch with one TEXT column per header.
    # Identifiers are quoted so reserved words and digit-leading names work.
    table = sql.Identifier(table_name)
    column_defs = sql.SQL(", ").join(
        sql.SQL("{} TEXT").format(sql.Identifier(col)) for col in headers
    )
    cur.execute(sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(table))
    cur.execute(sql.SQL("CREATE TABLE {} ({})").format(table, column_defs))
    conn.commit()

    try:
        _copy_csv(cur, path, encoding_detected, table_name, headers, delimiter)
        conn.commit()
    except psycopg2.Error as e:
        # A failed statement aborts the transaction; without a rollback
        # the retry below would be rejected as well.
        conn.rollback()
        print(f"⚠️ Error al copiar con '{encoding_detected}': {e}")
        print(" → Reintentando con ENCODING 'UTF8'")
        _copy_csv(cur, path, "utf-8", table_name, headers, delimiter)
        conn.commit()

    print(f"✅ Tabla '{table_name}' importada correctamente ({len(headers)} columnas).")
    return table_name, headers


def main():
    """Import every CSV in DATASETS_DIR into its own all-TEXT table.

    For each ``.csv`` file (case-insensitive, processed in sorted order)
    the encoding and delimiter are detected, the table named after the
    file is dropped and recreated with one TEXT column per header cell,
    and the rows are bulk-loaded with COPY. The table -> column-names
    mapping is then written to OUTPUT_FILE as JSON.

    Raises:
        psycopg2.Error: If the connection fails, or a file still cannot be
            loaded after the UTF-8 retry.
    """
    columnas_info = {}

    conn = psycopg2.connect(**DB_CONFIG)
    try:
        # Must match the ENCODING declared by the COPY statement.
        conn.set_client_encoding("UTF8")
        cur = conn.cursor()
        for filename in sorted(os.listdir(DATASETS_DIR)):
            if not filename.lower().endswith(".csv"):
                continue
            imported = _import_file(conn, cur, filename)
            if imported is not None:
                table_name, headers = imported
                columnas_info[table_name] = headers
        cur.close()
    finally:
        # Release the connection even when an import fails midway.
        conn.close()

    with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
        json.dump(columnas_info, f, indent=4, ensure_ascii=False)

    print(f"\n✅ Importación completada. Estructuras guardadas en '{OUTPUT_FILE}'")
# Run the import only when the file is executed as a script.
if __name__ == "__main__":
    main()