OLTP Database
This recipe connects to a Databricks Lakebase OLTP database instance to read data from PostgreSQL tables. It uses OAuth token-based authentication with connection pooling for efficient database access.
Code snippet
app.py
import uuid
import pandas as pd
from databricks.sdk import WorkspaceClient
import psycopg
from psycopg_pool import ConnectionPool
w = WorkspaceClient()
class RotatingTokenConnection(psycopg.Connection):
"""psycopg3 Connection that injects a fresh OAuth token as the password."""
@classmethod
def connect(cls, conninfo: str = "", **kwargs):
kwargs["password"] = w.database.generate_database_credential(
request_id=str(uuid.uuid4()),
instance_names=[kwargs.pop("_instance_name")]
).token
kwargs.setdefault("sslmode", "require")
return super().connect(conninfo, **kwargs)
def build_pool(instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
return ConnectionPool(
conninfo=f"host={host} dbname={database} user={user}",
connection_class=RotatingTokenConnection,
kwargs={"_instance_name": instance_name},
min_size=1,
max_size=5,
open=True,
)
def query_df(pool: ConnectionPool, sql: str) -> pd.DataFrame:
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute(sql)
if cur.description is None:
return pd.DataFrame()
cols = [d.name for d in cur.description]
rows = cur.fetchall()
return pd.DataFrame(rows, columns=cols)
# Usage
instance_name = "dbase_instance"
database = "databricks_postgres"
schema = "public"
table = "app_state"
user = w.current_user.me().user_name
host = w.database.get_database_instance(name=instance_name).read_write_dns
pool = build_pool(instance_name, host, user, database)
# Query existing data
df = query_df(pool, f"SELECT * FROM {schema}.{table} LIMIT 10")
Requirements
Permissions (app service principal)
- The database instance should be specified in your App resources.
- A PostgreSQL role for the service principal is required. See this guide.
- The PostgreSQL service principal role should have these example grants:
GRANT CONNECT ON DATABASE databricks_postgres TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT USAGE, CREATE ON SCHEMA public TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT SELECT ON TABLE app_state TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
Databricks resources
- Lakebase database instance (PostgreSQL).
- Target PostgreSQL database/schema/table.
Dependencies
- Databricks SDK -
databricks-sdk>=0.60.0
psycopg[binary]
,psycopg-pool
- Pandas -
pandas
- Dash -
dash
info
Tokens expire periodically; this app refreshes on each new connection and enforces TLS (sslmode=require).