¿Qué es SQLite?
SQLite es una base de datos ligera, rápida y serverless que se ejecuta completamente en un archivo local. Es perfecta para aplicaciones pequeñas y medianas, prototipos y aprendizaje.
Ventajas de SQLite
- Sin configuración ni servidor
- Base de datos en un solo archivo
- Multiplataforma
- Incluido en Python por defecto
- Rápido para aplicaciones pequeñas/medianas
- Ideal para desarrollo y prototipos
Conexión Básica
import sqlite3
# Conectar a base de datos (la crea si no existe)
conexion = sqlite3.connect('mi_base_datos.db')
# Crear cursor
cursor = conexion.cursor()
# Ejecutar consultas...
# Cerrar conexión
conexion.close()
# Mejor práctica: usar context manager
with sqlite3.connect('mi_base_datos.db') as conexion:
cursor = conexion.cursor()
# Ejecutar consultas
# Se cierra automáticamenteCrear Tablas
import sqlite3
conexion = sqlite3.connect('usuarios.db')
cursor = conexion.cursor()
# Crear tabla
cursor.execute('''
CREATE TABLE IF NOT EXISTS usuarios (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
edad INTEGER,
fecha_registro TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conexion.commit()
conexion.close()Insertar Datos
Insertar un Registro
import sqlite3
conexion = sqlite3.connect('usuarios.db')
cursor = conexion.cursor()
# Forma 1: Con valores directos (no recomendado)
cursor.execute(
"INSERT INTO usuarios (nombre, email, edad) VALUES ('Ana', 'ana@email.com', 25)"
)
# Forma 2: Con placeholders (recomendado - previene SQL injection)
cursor.execute(
"INSERT INTO usuarios (nombre, email, edad) VALUES (?, ?, ?)",
('Luis', 'luis@email.com', 30)
)
# Forma 3: Con diccionario
cursor.execute(
"INSERT INTO usuarios (nombre, email, edad) VALUES (:nombre, :email, :edad)",
{'nombre': 'María', 'email': 'maria@email.com', 'edad': 28}
)
conexion.commit()
conexion.close()Insertar Múltiples Registros
conexion = sqlite3.connect('usuarios.db')
cursor = conexion.cursor()
usuarios = [
('Carlos', 'carlos@email.com', 35),
('Elena', 'elena@email.com', 27),
('Pedro', 'pedro@email.com', 32)
]
cursor.executemany(
"INSERT INTO usuarios (nombre, email, edad) VALUES (?, ?, ?)",
usuarios
)
conexion.commit()
print(f"{cursor.rowcount} usuarios insertados")
conexion.close()Consultar Datos (SELECT)
Seleccionar Todos
conexion = sqlite3.connect('usuarios.db')
cursor = conexion.cursor()
# Obtener todos los registros
cursor.execute("SELECT * FROM usuarios")
resultados = cursor.fetchall()
for usuario in resultados:
print(usuario)
# (1, 'Ana', 'ana@email.com', 25, '2025-10-30 10:00:00')
conexion.close()Seleccionar Uno
cursor.execute("SELECT * FROM usuarios WHERE id = ?", (1,))
usuario = cursor.fetchone()
print(usuario)Seleccionar con Límite
# Primeros 5 usuarios
cursor.execute("SELECT * FROM usuarios LIMIT 5")
resultados = cursor.fetchmany(5)
for usuario in resultados:
print(usuario)Row Factory - Resultados como Diccionarios
conexion = sqlite3.connect('usuarios.db')
conexion.row_factory = sqlite3.Row
cursor = conexion.cursor()
cursor.execute("SELECT * FROM usuarios")
resultados = cursor.fetchall()
for usuario in resultados:
# Acceder por nombre de columna
print(f"Nombre: {usuario['nombre']}, Email: {usuario['email']}")
conexion.close()Filtrar y Ordenar
# WHERE - Filtrar
cursor.execute("SELECT * FROM usuarios WHERE edad > ?", (25,))
# AND
cursor.execute(
"SELECT * FROM usuarios WHERE edad > ? AND nombre LIKE ?",
(25, 'A%')
)
# OR
cursor.execute(
"SELECT * FROM usuarios WHERE edad < ? OR edad > ?",
(25, 35)
)
# IN
cursor.execute(
"SELECT * FROM usuarios WHERE nombre IN (?, ?, ?)",
('Ana', 'Luis', 'María')
)
# ORDER BY
cursor.execute("SELECT * FROM usuarios ORDER BY edad DESC")
# LIMIT y OFFSET (paginación)
cursor.execute("SELECT * FROM usuarios LIMIT 10 OFFSET 20")Actualizar Datos (UPDATE)
conexion = sqlite3.connect('usuarios.db')
cursor = conexion.cursor()
# Actualizar un registro
cursor.execute(
"UPDATE usuarios SET edad = ? WHERE id = ?",
(26, 1)
)
# Actualizar múltiples campos
cursor.execute(
"UPDATE usuarios SET nombre = ?, email = ? WHERE id = ?",
('Ana García', 'ana.garcia@email.com', 1)
)
# Actualizar con condición
cursor.execute(
"UPDATE usuarios SET edad = edad + 1 WHERE edad < ?",
(30,)
)
conexion.commit()
print(f"{cursor.rowcount} registros actualizados")
conexion.close()Eliminar Datos (DELETE)
conexion = sqlite3.connect('usuarios.db')
cursor = conexion.cursor()
# Eliminar un registro
cursor.execute("DELETE FROM usuarios WHERE id = ?", (1,))
# Eliminar con condición
cursor.execute("DELETE FROM usuarios WHERE edad < ?", (18,))
# Eliminar todos (¡cuidado!)
# cursor.execute("DELETE FROM usuarios")
conexion.commit()
print(f"{cursor.rowcount} registros eliminados")
conexion.close()Funciones Agregadas
cursor = conexion.cursor()
# COUNT - Contar registros
cursor.execute("SELECT COUNT(*) FROM usuarios")
total = cursor.fetchone()[0]
print(f"Total usuarios: {total}")
# SUM - Sumar
cursor.execute("SELECT SUM(edad) FROM usuarios")
total_edades = cursor.fetchone()[0]
# AVG - Promedio
cursor.execute("SELECT AVG(edad) FROM usuarios")
promedio_edad = cursor.fetchone()[0]
print(f"Edad promedio: {promedio_edad:.2f}")
# MAX y MIN
cursor.execute("SELECT MAX(edad), MIN(edad) FROM usuarios")
max_edad, min_edad = cursor.fetchone()
# GROUP BY
cursor.execute("""
SELECT edad, COUNT(*) as cantidad
FROM usuarios
GROUP BY edad
ORDER BY cantidad DESC
""")
resultados = cursor.fetchall()Joins - Relacionar Tablas
# Crear tabla de pedidos
cursor.execute('''
CREATE TABLE IF NOT EXISTS pedidos (
id INTEGER PRIMARY KEY AUTOINCREMENT,
usuario_id INTEGER,
producto TEXT,
precio REAL,
fecha TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (usuario_id) REFERENCES usuarios(id)
)
''')
# INNER JOIN
cursor.execute("""
SELECT usuarios.nombre, pedidos.producto, pedidos.precio
FROM usuarios
INNER JOIN pedidos ON usuarios.id = pedidos.usuario_id
""")
# LEFT JOIN
cursor.execute("""
SELECT usuarios.nombre, pedidos.producto
FROM usuarios
LEFT JOIN pedidos ON usuarios.id = pedidos.usuario_id
""")
resultados = cursor.fetchall()Transacciones
conexion = sqlite3.connect('usuarios.db')
cursor = conexion.cursor()
try:
# Iniciar transacción (automático en Python)
cursor.execute(
"INSERT INTO usuarios (nombre, email, edad) VALUES (?, ?, ?)",
('Test', 'test@email.com', 25)
)
# Más operaciones...
cursor.execute(
"UPDATE usuarios SET edad = ? WHERE nombre = ?",
(26, 'Test')
)
# Confirmar cambios
conexion.commit()
print("Transacción completada")
except sqlite3.Error as e:
# Revertir cambios en caso de error
conexion.rollback()
print(f"Error: {e}")
finally:
conexion.close()Clase Base de Datos Reutilizable
import sqlite3
from contextlib import contextmanager
class BaseDatos:
def __init__(self, nombre_db):
self.nombre_db = nombre_db
@contextmanager
def obtener_conexion(self):
conexion = sqlite3.connect(self.nombre_db)
conexion.row_factory = sqlite3.Row
try:
yield conexion
finally:
conexion.close()
def ejecutar_query(self, query, parametros=()):
with self.obtener_conexion() as conexion:
cursor = conexion.cursor()
cursor.execute(query, parametros)
conexion.commit()
return cursor.rowcount
def obtener_uno(self, query, parametros=()):
with self.obtener_conexion() as conexion:
cursor = conexion.cursor()
cursor.execute(query, parametros)
return cursor.fetchone()
def obtener_todos(self, query, parametros=()):
with self.obtener_conexion() as conexion:
cursor = conexion.cursor()
cursor.execute(query, parametros)
return cursor.fetchall()
# Usar la clase
db = BaseDatos('usuarios.db')
# Insertar
db.ejecutar_query(
"INSERT INTO usuarios (nombre, email, edad) VALUES (?, ?, ?)",
('Test', 'test@email.com', 25)
)
# Consultar
usuarios = db.obtener_todos("SELECT * FROM usuarios WHERE edad > ?", (25,))
for usuario in usuarios:
print(usuario['nombre'])Modelo ORM Simple
class Usuario:
def __init__(self, nombre, email, edad, id=None):
self.id = id
self.nombre = nombre
self.email = email
self.edad = edad
@staticmethod
def crear_tabla():
with sqlite3.connect('usuarios.db') as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS usuarios (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
edad INTEGER
)
''')
def guardar(self):
with sqlite3.connect('usuarios.db') as conn:
cursor = conn.cursor()
if self.id is None:
cursor.execute(
"INSERT INTO usuarios (nombre, email, edad) VALUES (?, ?, ?)",
(self.nombre, self.email, self.edad)
)
self.id = cursor.lastrowid
else:
cursor.execute(
"UPDATE usuarios SET nombre=?, email=?, edad=? WHERE id=?",
(self.nombre, self.email, self.edad, self.id)
)
conn.commit()
@staticmethod
def obtener_todos():
with sqlite3.connect('usuarios.db') as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM usuarios")
return [Usuario(
nombre=row['nombre'],
email=row['email'],
edad=row['edad'],
id=row['id']
) for row in cursor.fetchall()]
@staticmethod
def obtener_por_id(id):
with sqlite3.connect('usuarios.db') as conn:
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute("SELECT * FROM usuarios WHERE id = ?", (id,))
row = cursor.fetchone()
if row:
return Usuario(
nombre=row['nombre'],
email=row['email'],
edad=row['edad'],
id=row['id']
)
return None
def eliminar(self):
with sqlite3.connect('usuarios.db') as conn:
conn.execute("DELETE FROM usuarios WHERE id = ?", (self.id,))
conn.commit()
# Usar el modelo
Usuario.crear_tabla()
# Crear usuario
usuario = Usuario('Ana', 'ana@email.com', 25)
usuario.guardar()
print(f"Usuario creado con ID: {usuario.id}")
# Obtener todos
usuarios = Usuario.obtener_todos()
for u in usuarios:
print(f"{u.nombre} - {u.email}")
# Obtener uno
usuario = Usuario.obtener_por_id(1)
if usuario:
print(f"Usuario encontrado: {usuario.nombre}")
# Actualizar
usuario.edad = 26
usuario.guardar()
# Eliminar
# usuario.eliminar()Backup y Restore
import sqlite3
import shutil
from datetime import datetime
# Backup simple
def hacer_backup(db_origen, ruta_backup=None):
if ruta_backup is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
ruta_backup = f'backup_{timestamp}.db'
shutil.copy2(db_origen, ruta_backup)
print(f"Backup creado: {ruta_backup}")
return ruta_backup
# Backup con conexión
def backup_con_conexion(db_origen, db_destino):
origen = sqlite3.connect(db_origen)
destino = sqlite3.connect(db_destino)
origen.backup(destino)
origen.close()
destino.close()
print(f"Backup completado: {db_destino}")
# Usar
hacer_backup('usuarios.db')
backup_con_conexion('usuarios.db', 'backup_manual.db')Migración de Datos
# Exportar a CSV
import csv
def exportar_a_csv(db_nombre, tabla, archivo_csv):
conexion = sqlite3.connect(db_nombre)
conexion.row_factory = sqlite3.Row
cursor = conexion.cursor()
cursor.execute(f"SELECT * FROM {tabla}")
rows = cursor.fetchall()
if rows:
with open(archivo_csv, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=rows[0].keys())
writer.writeheader()
for row in rows:
writer.writerow(dict(row))
print(f"Datos exportados a {archivo_csv}")
conexion.close()
# Importar desde CSV
def importar_desde_csv(db_nombre, tabla, archivo_csv):
conexion = sqlite3.connect(db_nombre)
cursor = conexion.cursor()
with open(archivo_csv, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
for row in reader:
placeholders = ', '.join(['?' for _ in row])
columns = ', '.join(row.keys())
query = f"INSERT INTO {tabla} ({columns}) VALUES ({placeholders})"
cursor.execute(query, list(row.values()))
conexion.commit()
conexion.close()
print(f"Datos importados desde {archivo_csv}")Buenas Prácticas
- Usa placeholders: Previene SQL injection
- Context managers: Usa with para cerrar conexiones
- Índices: Crea índices en columnas frecuentemente consultadas
- Transacciones: Agrupa operaciones relacionadas
- Backups: Haz backups regulares
- Normalización: Diseña bien tus tablas
- Validación: Valida datos antes de insertar
Conclusión
SQLite es perfecto para comenzar con bases de datos en Python. Es simple, rápido y no requiere configuración. Dominar SQLite te prepara para trabajar con bases de datos más complejas como PostgreSQL o MySQL en el futuro.