Connect FastAPI to Lakebase
This guide demonstrates how to connect a FastAPI application to a Lakebase PostgreSQL database with automatic token refresh and connection pooling. Lakebase provides a managed PostgreSQL instance within your Databricks workspace that integrates with Unity Catalog.
What is Lakebase?
Lakebase is Databricks' managed PostgreSQL service that provides:
- Fully managed PostgreSQL instances within your Databricks workspace
- Automatic OAuth token authentication using Databricks credentials
- Unity Catalog integration for unified data governance
- High availability and automatic backups
- Seamless data synchronization with Unity Catalog tables
Prerequisites
Before connecting to Lakebase, ensure you have:
- A Lakebase PostgreSQL instance created (see Create Lakebase Resources)
- FastAPI application with required dependencies installed
- Databricks workspace authentication configured locally
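Before going further, you can confirm that local Databricks authentication works with a quick SDK check (a minimal sketch; it assumes credentials come from a configured profile or environment variables):

# auth_check.py (hypothetical filename) - prints the authenticated user
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
print(w.current_user.me().user_name)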
Database Configuration
Environment Variables
.env
# Lakebase Configuration
LAKEBASE_INSTANCE_NAME=my-lakebase-instance
LAKEBASE_DATABASE_NAME=my_database
LAKEBASE_CATALOG_NAME=my-pg-catalog
# Database Connection Pool Settings
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_COMMAND_TIMEOUT=30
DB_POOL_TIMEOUT=10
DB_POOL_RECYCLE_INTERVAL=3600
DATABRICKS_DATABASE_PORT=5432
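Note that the connection code below reads only LAKEBASE_INSTANCE_NAME, LAKEBASE_DATABASE_NAME, and the pool settings; LAKEBASE_CATALOG_NAME is included for completeness and typically only matters if you also register the instance as a Unity Catalog catalog.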
Complete Implementation
config/database.py
import asyncio
import logging
import os
import time
import uuid
from typing import AsyncGenerator
from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv
from sqlalchemy import URL, event, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
load_dotenv()
logger = logging.getLogger(__name__)
# Global variables
engine: AsyncEngine | None = None
AsyncSessionLocal: sessionmaker | None = None
workspace_client: WorkspaceClient | None = None
database_instance = None
# Token management for background refresh
postgres_password: str | None = None
last_password_refresh: float = 0
token_refresh_task: asyncio.Task | None = None
async def refresh_token_background():
"""Background task to refresh tokens every 50 minutes"""
global postgres_password, last_password_refresh, workspace_client, database_instance
while True:
try:
await asyncio.sleep(50 * 60) # Wait 50 minutes
logger.info("Background token refresh: Generating fresh PostgreSQL OAuth token")
cred = workspace_client.database.generate_database_credential(
request_id=str(uuid.uuid4()),
instance_names=[database_instance.name],
)
postgres_password = cred.token
last_password_refresh = time.time()
logger.info("Background token refresh: Token updated successfully")
        except Exception as e:
            # Keep the loop alive; the previous token stays in use until the next attempt
            logger.error(f"Background token refresh failed: {e}")
def init_engine():
"""Initialize database connection using SQLAlchemy with automatic token refresh"""
global engine, AsyncSessionLocal, workspace_client, database_instance, postgres_password, last_password_refresh
try:
# Initialize Databricks SDK client
workspace_client = WorkspaceClient()
# Get database instance details
instance_name = os.getenv("LAKEBASE_INSTANCE_NAME")
if not instance_name:
raise RuntimeError("LAKEBASE_INSTANCE_NAME environment variable is required")
database_instance = workspace_client.database.get_database_instance(name=instance_name)
# Generate initial OAuth credentials
cred = workspace_client.database.generate_database_credential(
request_id=str(uuid.uuid4()),
instance_names=[database_instance.name]
)
postgres_password = cred.token
last_password_refresh = time.time()
logger.info("Database: Initial credentials generated")
        # Build connection URL; prefer a service principal client ID and fall
        # back to the current user's identity for local development
        database_name = os.getenv("LAKEBASE_DATABASE_NAME", database_instance.name)
        username = (
            os.getenv("DATABRICKS_CLIENT_ID")
            or workspace_client.current_user.me().user_name
        )
url = URL.create(
drivername="postgresql+asyncpg",
username=username,
password="", # Will be set by event handler
host=database_instance.read_write_dns,
port=int(os.getenv("DATABRICKS_DATABASE_PORT", "5432")),
database=database_name,
)
# Create async engine with connection pooling
engine = create_async_engine(
url,
pool_pre_ping=False,
echo=False,
pool_size=int(os.getenv("DB_POOL_SIZE", "5")),
max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "10")),
pool_timeout=int(os.getenv("DB_POOL_TIMEOUT", "10")),
pool_recycle=int(os.getenv("DB_POOL_RECYCLE_INTERVAL", "3600")),
connect_args={
"command_timeout": int(os.getenv("DB_COMMAND_TIMEOUT", "30")),
"server_settings": {
"application_name": "fastapi_orders_app",
},
"ssl": "require",
},
)
# Register token provider for new connections
@event.listens_for(engine.sync_engine, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
global postgres_password
# Use current token from background refresh
cparams["password"] = postgres_password
# Create session factory
AsyncSessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
expire_on_commit=False
)
logger.info(f"Database engine initialized for {database_name} with background token refresh")
except Exception as e:
logger.error(f"Error initializing database: {e}")
raise RuntimeError(f"Failed to initialize database: {e}") from e
async def start_token_refresh():
"""Start the background token refresh task"""
global token_refresh_task
if token_refresh_task is None or token_refresh_task.done():
token_refresh_task = asyncio.create_task(refresh_token_background())
logger.info("Background token refresh task started")
async def stop_token_refresh():
"""Stop the background token refresh task"""
global token_refresh_task
if token_refresh_task and not token_refresh_task.done():
token_refresh_task.cancel()
try:
await token_refresh_task
except asyncio.CancelledError:
pass
logger.info("Background token refresh task stopped")
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
"""Get a database session with automatic token refresh"""
if AsyncSessionLocal is None:
raise RuntimeError("Engine not initialized; call init_engine() first")
async with AsyncSessionLocal() as session:
yield session
def check_database_exists() -> bool:
    """Check if the Lakebase database instance exists"""
    # Resolve the name outside the try block so the except handler can reference it
    instance_name = os.getenv("LAKEBASE_INSTANCE_NAME")
    if not instance_name:
        logger.warning("LAKEBASE_INSTANCE_NAME not set - database instance check skipped")
        return False
    try:
        workspace_client = WorkspaceClient()
        workspace_client.database.get_database_instance(name=instance_name)
        logger.info(f"Lakebase database instance '{instance_name}' exists")
        return True
    except Exception as e:
        if "not found" in str(e).lower():
            logger.info(f"Lakebase database instance '{instance_name}' does not exist")
        else:
            logger.error(f"Error checking database instance existence: {e}")
        return False
async def database_health() -> bool:
"""Check database connection health"""
global engine
if engine is None:
logger.error("Database engine failed to initialize.")
return False
try:
async with engine.connect() as connection:
await connection.execute(text("SELECT 1"))
logger.info("Database connection is healthy.")
return True
except Exception as e:
logger.error("Database health check failed: %s", e)
return False
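Before wiring this module into FastAPI, you can exercise it on its own; a minimal smoke test, assuming the file lives at config/database.py and the environment variables above are set:

# smoke_test.py (hypothetical) - standalone connectivity check
import asyncio

from config.database import database_health, init_engine

async def main() -> None:
    init_engine()
    print("healthy" if await database_health() else "unhealthy")

asyncio.run(main())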
Integration with FastAPI
Application Startup
main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI

from config.database import init_engine, start_token_refresh, stop_token_refresh, check_database_exists

logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan management"""
# Startup
if check_database_exists():
init_engine()
await start_token_refresh()
logger.info("Application started with Lakebase connection")
else:
logger.warning("Lakebase database not found - orders endpoints disabled")
yield
# Shutdown
await stop_token_refresh()
logger.info("Application shutdown complete")
app = FastAPI(lifespan=lifespan)
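For local runs you can start Uvicorn programmatically (a sketch; production deployments typically invoke the uvicorn CLI or a process manager instead):

# run.py (hypothetical) - assumes main.py defines `app` as above
import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)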
Using Database Sessions
routes/orders.py
from fastapi import APIRouter, Depends
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from config.database import get_async_db
from models.orders import Order
router = APIRouter()
@router.get("/orders/count")
async def get_order_count(db: AsyncSession = Depends(get_async_db)):
"""Get total orders count using dependency injection"""
stmt = select(func.count(Order.o_orderkey))
result = await db.execute(stmt)
count = result.scalar()
return {"total_orders": count}
Key Architecture Decisions
1. Automatic Token Refresh
# OAuth tokens expire after 1 hour
# We refresh every 50 minutes to ensure continuous connectivity
await asyncio.sleep(50 * 60) # 50 minutes
2. Connection Event Handler
# SQLAlchemy event listener injects fresh tokens for new connections
@event.listens_for(engine.sync_engine, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
cparams["password"] = postgres_password
3. Connection Pooling Configuration
engine = create_async_engine(
url,
pool_size=5, # Base number of connections
max_overflow=10, # Additional connections under load
pool_timeout=10, # Max wait time for connection
pool_recycle=3600, # Recycle connections every hour
)
Configuration Options
Connection Pool Settings
| Parameter | Default | Description | Recommended |
|---|---|---|---|
| DB_POOL_SIZE | 5 | Base connection pool size | 5-10 for most apps |
| DB_MAX_OVERFLOW | 10 | Additional connections under load | 2x pool_size |
| DB_POOL_TIMEOUT | 10 | Max seconds to wait for a connection | 10-30 seconds |
| DB_COMMAND_TIMEOUT | 30 | Query timeout in seconds | 30-60 seconds |
| DB_POOL_RECYCLE_INTERVAL | 3600 | Connection recycle interval in seconds | 3600 (1 hour) |
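A single application instance can open up to DB_POOL_SIZE + DB_MAX_OVERFLOW connections at peak, so multiply by your replica count when sizing against the instance's connection limit.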
Performance Tuning
# For high-traffic applications
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30
# For development/testing
DB_POOL_SIZE=2
DB_MAX_OVERFLOW=5
DB_POOL_TIMEOUT=10
Health Monitoring
Database Health Check Endpoint
routes/health.py
from fastapi import APIRouter
from config.database import database_health, check_database_exists
router = APIRouter()
@router.get("/health/database")
async def health_check():
"""Comprehensive database health check"""
instance_exists = check_database_exists()
connection_healthy = await database_health() if instance_exists else False
return {
"database_instance_exists": instance_exists,
"connection_healthy": connection_healthy,
"status": "healthy" if (instance_exists and connection_healthy) else "unhealthy"
}
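A lightweight way to exercise this endpoint end to end is FastAPI's TestClient, which also runs the lifespan hooks (a sketch; it requires httpx and assumes the health router is included in the app defined in main.py):

# test_health.py (hypothetical) - integration test for the health endpoint
from fastapi.testclient import TestClient

from main import app

def test_database_health():
    with TestClient(app) as client:  # the context manager triggers startup/shutdown
        resp = client.get("/health/database")
        assert resp.status_code == 200
        assert resp.json()["status"] in {"healthy", "unhealthy"}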
Error Handling and Troubleshooting
Common Issues
1. Token Refresh Failures
# Monitor token refresh in logs
logger.error(f"Background token refresh failed: {e}")
# Check Databricks SDK authentication
workspace_client = WorkspaceClient()
print(workspace_client.current_user.me())
2. Connection Pool Exhaustion
# Increase pool settings
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30
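To tell whether the pool, rather than the database, is the bottleneck, you can log the pool's status string (a sketch; engine is the module-level global from config/database.py, so access it through the module to pick up the current binding):

# pool_debug.py (hypothetical)
import config.database as db

def log_pool_status() -> None:
    if db.engine is not None:
        # QueuePool.status() returns a human-readable summary of the pool state
        print(db.engine.sync_engine.pool.status())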
3. Instance Not Found
# Verify instance name matches exactly
LAKEBASE_INSTANCE_NAME=your-exact-instance-name
# Check instance exists in Databricks workspace
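If the name looks correct but the lookup still fails, listing the instances your credentials can see helps narrow things down (a sketch; assumes a recent databricks-sdk where the database API exposes a listing call):

# list_instances.py (hypothetical)
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
for instance in w.database.list_database_instances():
    print(instance.name)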
Security Considerations
- OAuth Token Security: Tokens are stored in memory only, never persisted to disk
- SSL Enforcement: All connections require SSL encryption
- Connection Isolation: Each request gets its own database session
- Automatic Cleanup: Sessions are automatically closed after request completion
- No Hardcoded Credentials: All authentication uses Databricks SDK
Related Documentation
- Create Lakebase Resources - Set up your Lakebase instance
- Lakebase Orders Management - Use the database connection
- Delete Lakebase Resources - Clean up resources
Dependencies
- Databricks SDK for Python - databricks-sdk
- SQLAlchemy - sqlalchemy
- asyncpg - asyncpg
- python-dotenv - python-dotenv
requirements.txt
databricks-sdk>=0.60.0
sqlalchemy
asyncpg
python-dotenv
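Install these with pip install -r requirements.txt. Note that fastapi itself (and uvicorn, if you use the launcher above) must also be installed; they are presumably tracked elsewhere in the project.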