
Connect FastAPI to Lakebase

This guide demonstrates how to connect your FastAPI application to a Lakebase PostgreSQL database with automatic token refresh and connection pooling. Lakebase provides a managed PostgreSQL database instance within your Databricks workspace that seamlessly integrates with Unity Catalog.

What is Lakebase?

Lakebase is Databricks' managed PostgreSQL service that provides:

  • Fully managed PostgreSQL instances within your Databricks workspace
  • Automatic OAuth token authentication using Databricks credentials
  • Unity Catalog integration for unified data governance
  • High availability and automatic backups
  • Seamless data synchronization with Unity Catalog tables

Prerequisites

Before connecting to Lakebase, ensure you have:

  • A Lakebase PostgreSQL instance created (see Create Lakebase Resources)
  • A FastAPI application with the required dependencies installed (see Dependencies below)
  • Databricks workspace authentication configured locally

Database Configuration

Environment Variables

.env
# Lakebase Configuration
LAKEBASE_INSTANCE_NAME=my-lakebase-instance
LAKEBASE_DATABASE_NAME=my_database
LAKEBASE_CATALOG_NAME=my-pg-catalog

# Database Connection Pool Settings
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_COMMAND_TIMEOUT=30
DB_POOL_TIMEOUT=10
DB_POOL_RECYCLE_INTERVAL=3600
DATABRICKS_DATABASE_PORT=5432
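
These settings are read with os.getenv when the engine is initialized. If you want to fail fast on a misconfigured deployment, you can validate them before calling init_engine(); the helper below is an illustrative sketch, not part of the implementation that follows:

import os

def validate_lakebase_env() -> None:
    """Raise early if required Lakebase settings are missing or malformed (illustrative helper)."""
    if not os.getenv("LAKEBASE_INSTANCE_NAME"):
        raise RuntimeError("LAKEBASE_INSTANCE_NAME must be set")
    # Every numeric setting must parse as an integer
    for var, default in [
        ("DB_POOL_SIZE", "5"),
        ("DB_MAX_OVERFLOW", "10"),
        ("DB_POOL_TIMEOUT", "10"),
        ("DB_COMMAND_TIMEOUT", "30"),
        ("DB_POOL_RECYCLE_INTERVAL", "3600"),
        ("DATABRICKS_DATABASE_PORT", "5432"),
    ]:
        int(os.getenv(var, default))  # raises ValueError on non-numeric values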

Complete Implementation

config/database.py
import asyncio
import logging
import os
import time
import uuid
from typing import AsyncGenerator

from databricks.sdk import WorkspaceClient
from dotenv import load_dotenv
from sqlalchemy import URL, event, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

load_dotenv()
logger = logging.getLogger(__name__)

# Global variables
engine: AsyncEngine | None = None
AsyncSessionLocal: sessionmaker | None = None
workspace_client: WorkspaceClient | None = None
database_instance = None

# Token management for background refresh
postgres_password: str | None = None
last_password_refresh: float = 0
token_refresh_task: asyncio.Task | None = None

async def refresh_token_background():
    """Background task to refresh tokens every 50 minutes"""
    global postgres_password, last_password_refresh, workspace_client, database_instance

    while True:
        try:
            await asyncio.sleep(50 * 60)  # Wait 50 minutes
            logger.info("Background token refresh: Generating fresh PostgreSQL OAuth token")

            cred = workspace_client.database.generate_database_credential(
                request_id=str(uuid.uuid4()),
                instance_names=[database_instance.name],
            )
            postgres_password = cred.token
            last_password_refresh = time.time()
            logger.info("Background token refresh: Token updated successfully")

        except Exception as e:
            logger.error(f"Background token refresh failed: {e}")

def init_engine():
    """Initialize database connection using SQLAlchemy with automatic token refresh"""
    global engine, AsyncSessionLocal, workspace_client, database_instance, postgres_password, last_password_refresh

    try:
        # Initialize Databricks SDK client
        workspace_client = WorkspaceClient()

        # Get database instance details
        instance_name = os.getenv("LAKEBASE_INSTANCE_NAME")
        if not instance_name:
            raise RuntimeError("LAKEBASE_INSTANCE_NAME environment variable is required")

        database_instance = workspace_client.database.get_database_instance(name=instance_name)

        # Generate initial OAuth credentials
        cred = workspace_client.database.generate_database_credential(
            request_id=str(uuid.uuid4()),
            instance_names=[database_instance.name],
        )
        postgres_password = cred.token
        last_password_refresh = time.time()
        logger.info("Database: Initial credentials generated")

        # Build connection URL
        database_name = os.getenv("LAKEBASE_DATABASE_NAME", database_instance.name)
        username = (
            os.getenv("DATABRICKS_CLIENT_ID")
            or workspace_client.current_user.me().user_name
        )

        url = URL.create(
            drivername="postgresql+asyncpg",
            username=username,
            password="",  # Will be set by the do_connect event handler
            host=database_instance.read_write_dns,
            port=int(os.getenv("DATABRICKS_DATABASE_PORT", "5432")),
            database=database_name,
        )

        # Create async engine with connection pooling
        engine = create_async_engine(
            url,
            pool_pre_ping=False,
            echo=False,
            pool_size=int(os.getenv("DB_POOL_SIZE", "5")),
            max_overflow=int(os.getenv("DB_MAX_OVERFLOW", "10")),
            pool_timeout=int(os.getenv("DB_POOL_TIMEOUT", "10")),
            pool_recycle=int(os.getenv("DB_POOL_RECYCLE_INTERVAL", "3600")),
            connect_args={
                "command_timeout": int(os.getenv("DB_COMMAND_TIMEOUT", "30")),
                "server_settings": {
                    "application_name": "fastapi_orders_app",
                },
                "ssl": "require",
            },
        )

        # Register token provider for new connections
        @event.listens_for(engine.sync_engine, "do_connect")
        def provide_token(dialect, conn_rec, cargs, cparams):
            # Use the current token maintained by the background refresh task
            cparams["password"] = postgres_password

        # Create session factory
        AsyncSessionLocal = sessionmaker(
            bind=engine,
            class_=AsyncSession,
            expire_on_commit=False,
        )

        logger.info(f"Database engine initialized for {database_name} with background token refresh")

    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        raise RuntimeError(f"Failed to initialize database: {e}") from e

async def start_token_refresh():
    """Start the background token refresh task"""
    global token_refresh_task
    if token_refresh_task is None or token_refresh_task.done():
        token_refresh_task = asyncio.create_task(refresh_token_background())
        logger.info("Background token refresh task started")

async def stop_token_refresh():
    """Stop the background token refresh task"""
    global token_refresh_task
    if token_refresh_task and not token_refresh_task.done():
        token_refresh_task.cancel()
        try:
            await token_refresh_task
        except asyncio.CancelledError:
            pass
        logger.info("Background token refresh task stopped")

async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    """Get a database session with automatic token refresh"""
    if AsyncSessionLocal is None:
        raise RuntimeError("Engine not initialized; call init_engine() first")
    async with AsyncSessionLocal() as session:
        yield session

def check_database_exists() -> bool:
    """Check if the Lakebase database instance exists"""
    instance_name = os.getenv("LAKEBASE_INSTANCE_NAME")
    try:
        workspace_client = WorkspaceClient()

        if not instance_name:
            logger.warning("LAKEBASE_INSTANCE_NAME not set - database instance check skipped")
            return False

        workspace_client.database.get_database_instance(name=instance_name)
        logger.info(f"Lakebase database instance '{instance_name}' exists")
        return True
    except Exception as e:
        if "not found" in str(e).lower() or "resource not found" in str(e).lower():
            logger.info(f"Lakebase database instance '{instance_name}' does not exist")
        else:
            logger.error(f"Error checking database instance existence: {e}")
        return False

async def database_health() -> bool:
    """Check database connection health"""
    if engine is None:
        logger.error("Database engine is not initialized.")
        return False

    try:
        async with engine.connect() as connection:
            await connection.execute(text("SELECT 1"))
        logger.info("Database connection is healthy.")
        return True
    except Exception as e:
        logger.error("Database health check failed: %s", e)
        return False
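
Before wiring the module into FastAPI, you can smoke-test it from a local shell where Databricks authentication is already configured. A minimal sketch using only the functions defined above (the filename smoke_test.py is illustrative):

smoke_test.py
import asyncio

from config.database import database_health, init_engine

async def main() -> None:
    init_engine()                       # generates the initial OAuth token and builds the pool
    healthy = await database_health()   # runs SELECT 1 through the pooled engine
    print("healthy" if healthy else "unhealthy")

asyncio.run(main())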

Integration with FastAPI

Application Startup

main.py
import logging
from contextlib import asynccontextmanager

from fastapi import FastAPI
from config.database import init_engine, start_token_refresh, stop_token_refresh, check_database_exists

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management"""
    # Startup
    if check_database_exists():
        init_engine()
        await start_token_refresh()
        logger.info("Application started with Lakebase connection")
    else:
        logger.warning("Lakebase database not found - orders endpoints disabled")

    yield

    # Shutdown
    await stop_token_refresh()
    logger.info("Application shutdown complete")

app = FastAPI(lifespan=lifespan)
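
main.py above only creates the application; the routers defined in routes/orders.py and routes/health.py (both shown later in this guide) still need to be mounted. A minimal sketch, assuming that module layout:

from routes.health import router as health_router
from routes.orders import router as orders_router

app.include_router(orders_router)
app.include_router(health_router)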

Using Database Sessions

routes/orders.py
from fastapi import APIRouter, Depends
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from config.database import get_async_db
from models.orders import Order

router = APIRouter()

@router.get("/orders/count")
async def get_order_count(db: AsyncSession = Depends(get_async_db)):
    """Get total orders count using dependency injection"""
    stmt = select(func.count(Order.o_orderkey))
    result = await db.execute(stmt)
    count = result.scalar()
    return {"total_orders": count}
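
The Order model imported above is not defined in this guide. A minimal sketch of models/orders.py, assuming a TPC-H-style orders table in which o_orderkey is the integer primary key (the table and column names here are assumptions; adjust them to your schema):

models/orders.py
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Order(Base):
    # Assumed table name; align with the table synced into your Lakebase database
    __tablename__ = "orders"

    o_orderkey: Mapped[int] = mapped_column(primary_key=True)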

Key Architecture Decisions

1. Automatic Token Refresh

# OAuth tokens expire after 1 hour
# We refresh every 50 minutes to ensure continuous connectivity
await asyncio.sleep(50 * 60) # 50 minutes

2. Connection Event Handler

# SQLAlchemy event listener injects fresh tokens for new connections
@event.listens_for(engine.sync_engine, "do_connect")
def provide_token(dialect, conn_rec, cargs, cparams):
    cparams["password"] = postgres_password

3. Connection Pooling Configuration

engine = create_async_engine(
    url,
    pool_size=5,        # Base number of connections
    max_overflow=10,    # Additional connections under load
    pool_timeout=10,    # Max wait time for a connection
    pool_recycle=3600,  # Recycle connections every hour
)

Configuration Options

Connection Pool Settings

| Parameter | Default | Description | Recommended |
|---|---|---|---|
| DB_POOL_SIZE | 5 | Base connection pool size | 5-10 for most apps |
| DB_MAX_OVERFLOW | 10 | Additional connections under load | 2x pool_size |
| DB_POOL_TIMEOUT | 10 | Max seconds to wait for a connection | 10-30 seconds |
| DB_COMMAND_TIMEOUT | 30 | Query timeout in seconds | 30-60 seconds |
| DB_POOL_RECYCLE_INTERVAL | 3600 | Recycle connections (seconds) | 3600 (1 hour) |
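
Each worker process holds its own pool, so the effective ceiling against the instance is (pool_size + max_overflow) × workers. A quick back-of-the-envelope check, assuming a deployment with 4 uvicorn workers (the worker count is an assumption, not part of this guide):

pool_size = 10
max_overflow = 20
workers = 4  # assumed: uvicorn --workers 4

per_worker = pool_size + max_overflow  # 30 connections max per process
total = per_worker * workers           # 120 connections against Lakebase
print(per_worker, total)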

Performance Tuning

# For high-traffic applications
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30

# For development/testing
DB_POOL_SIZE=2
DB_MAX_OVERFLOW=5
DB_POOL_TIMEOUT=10

Health Monitoring

Database Health Check Endpoint

routes/health.py
from fastapi import APIRouter
from config.database import database_health, check_database_exists

router = APIRouter()

@router.get("/health/database")
async def health_check():
    """Comprehensive database health check"""
    instance_exists = check_database_exists()
    connection_healthy = await database_health() if instance_exists else False

    return {
        "database_instance_exists": instance_exists,
        "connection_healthy": connection_healthy,
        "status": "healthy" if (instance_exists and connection_healthy) else "unhealthy",
    }
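
When the instance exists and the connection check succeeds, the endpoint returns a body like this (illustrative output):

{
  "database_instance_exists": true,
  "connection_healthy": true,
  "status": "healthy"
}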

Error Handling and Troubleshooting

Common Issues

1. Token Refresh Failures

# Monitor token refresh in logs
logger.error(f"Background token refresh failed: {e}")

# Check Databricks SDK authentication
workspace_client = WorkspaceClient()
print(workspace_client.current_user.me())

2. Connection Pool Exhaustion

# Increase pool settings
DB_POOL_SIZE=10
DB_MAX_OVERFLOW=20
DB_POOL_TIMEOUT=30

3. Instance Not Found

# Verify instance name matches exactly
LAKEBASE_INSTANCE_NAME=your-exact-instance-name

# Check instance exists in Databricks workspace
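
If the name does not match, listing the instances visible to your credentials can help; a minimal sketch, assuming your databricks-sdk version exposes list_database_instances on the database API:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
for instance in w.database.list_database_instances():
    print(instance.name)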

Security Considerations

  1. OAuth Token Security: Tokens are stored in memory only, never persisted to disk
  2. SSL Enforcement: All connections require SSL encryption
  3. Connection Isolation: Each request gets its own database session
  4. Automatic Cleanup: Sessions are automatically closed after request completion
  5. No Hardcoded Credentials: All authentication uses Databricks SDK

Dependencies

requirements.txt
databricks-sdk>=0.60.0
sqlalchemy
asyncpg
python-dotenv