Skip to main content

Connect an OLTP database

This app connects to a Databricks Lakebase OLTP database instance for reads and writes, for example to persist app state. Provide the instance name, database, schema, and state table.

Code snippet

app.py
import uuid
import streamlit as st
import pandas as pd

from databricks.sdk import WorkspaceClient
import psycopg
from psycopg_pool import ConnectionPool


# Shared Databricks workspace client. The rest of this file uses it to
# generate database credentials, look up the current user, and resolve the
# database instance's read/write host name.
w = WorkspaceClient()


class RotatingTokenConnection(psycopg.Connection):
    """psycopg connection that logs in with a newly generated credential.

    A new database credential is requested from the workspace client every
    time ``connect`` is called, and its token is used as the password.
    """

    @classmethod
    def connect(cls, conninfo: str = "", **kwargs):
        """Open a connection using a freshly generated token as the password.

        Args:
            conninfo: Connection string forwarded unchanged to psycopg.
            **kwargs: Extra connection parameters. Must contain
                ``_instance_name`` (the database instance to generate a
                credential for); it is removed before the parameters are
                forwarded. ``sslmode`` falls back to ``"require"`` when the
                caller does not supply one.

        Raises:
            KeyError: If ``_instance_name`` is missing from ``kwargs``.
        """
        # Private key consumed here; psycopg must never see it.
        target_instance = kwargs.pop("_instance_name")
        credential = w.database.generate_database_credential(
            request_id=str(uuid.uuid4()),
            instance_names=[target_instance],
        )
        kwargs["password"] = credential.token
        if "sslmode" not in kwargs:
            kwargs["sslmode"] = "require"
        return super().connect(conninfo, **kwargs)


@st.cache_resource
def build_pool(instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
    """Create an opened connection pool for the given database instance.

    The result is cached with ``st.cache_resource``, so calls with the same
    arguments reuse one pool.

    Args:
        instance_name: Database instance name, handed to
            ``RotatingTokenConnection`` to generate credentials.
        host: Host name to connect to.
        user: Database user name.
        database: Database name.

    Returns:
        A ``ConnectionPool`` holding between 1 and 5 connections.
    """
    dsn = f"host={host} dbname={database} user={user}"
    # Forwarded to RotatingTokenConnection.connect for every new connection.
    connect_kwargs = {"_instance_name": instance_name}
    return ConnectionPool(
        conninfo=dsn,
        connection_class=RotatingTokenConnection,
        kwargs=connect_kwargs,
        min_size=1,
        max_size=5,
        open=True,
    )


def query_df(pool: ConnectionPool, sql: str, params=None) -> pd.DataFrame:
    """Run a SQL statement on a pooled connection and return a DataFrame.

    Args:
        pool: Connection pool to borrow a connection from.
        sql: SQL statement to execute. Use ``%s`` / ``%(name)s`` placeholders
            for values instead of formatting them into the string.
        params: Optional sequence or mapping of values bound to the
            placeholders in ``sql``. Defaults to ``None`` (no parameters),
            which matches the previous two-argument behaviour.

    Returns:
        A DataFrame with one column per result column, or an empty DataFrame
        when the statement produces no result set.
    """
    with pool.connection() as conn:
        with conn.cursor() as cur:
            cur.execute(sql, params)
            # description is None for statements that return no rows
            # (e.g. INSERT/UPDATE without RETURNING).
            if cur.description is None:
                return pd.DataFrame()
            cols = [d.name for d in cur.description]
            rows = cur.fetchall()
    return pd.DataFrame(rows, columns=cols)


# Create the session id once per Streamlit session and read it back from
# session state on every rerun, so all state rows written by this session
# share the same key instead of getting a new UUID each time the script runs.
if "session_id" not in st.session_state:
    st.session_state["session_id"] = str(uuid.uuid4())
session_id = st.session_state["session_id"]


# Connection settings: adjust these to match your Lakebase instance.
instance_name = "dbase_instance"
database = "databricks_postgres"
schema = "public"
table = "app_state"
user = w.current_user.me().user_name
host = w.database.get_database_instance(name=instance_name).read_write_dns

pool = build_pool(instance_name, host, user, database)

# NOTE: relies on psycopg_pool's pool.connection() context committing the
# transaction when the block exits without an exception.
with pool.connection() as conn:
    with conn.cursor() as cur:
        # Make sure the state table exists before writing to it.
        cur.execute(f"""
            CREATE TABLE IF NOT EXISTS {schema}.{table} (
                session_id TEXT,
                key TEXT,
                value TEXT,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                PRIMARY KEY (session_id, key)
            )
        """)

        # Upsert into the configured schema/table (the same one created
        # above and queried below); values are passed as bound parameters
        # rather than formatted into the SQL text.
        cur.execute(
            f"""
            INSERT INTO {schema}.{table} (session_id, key, value, updated_at)
            VALUES (%s, %s, %s, CURRENT_TIMESTAMP)
            ON CONFLICT (session_id, key) DO UPDATE
            SET value = EXCLUDED.value,
                updated_at = CURRENT_TIMESTAMP
            """,
            (session_id, "feedback_message", "true"),
        )

# session_id is a UUID generated by this app (never user input), so it is
# safe to embed in the query text passed to query_df.
df = query_df(pool, f"SELECT * FROM {schema}.{table} WHERE session_id = '{session_id}'")
st.dataframe(df)
info

This sample uses Streamlit's st.cache_resource to cache the database connection pool across users, sessions, and reruns. Use Streamlit's caching decorators to implement a caching strategy that works for your use case.

Resources

  • Lakebase database instance (PostgreSQL).
  • Target PostgreSQL database/schema/table.

Permissions

First, the database instance should be specified in your App resources.

Then, your app service principal needs the following permissions:

GRANT CONNECT ON DATABASE databricks_postgres TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT USAGE ON SCHEMA public TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE app_state TO "099f0306-9e29-4a87-84c0-3046e4bcea02";

See this guide for more information.

This guide shows you how to query your Lakebase.

Dependencies

requirements.txt
databricks-sdk
databricks-sql-connector
pandas
streamlit
psycopg[binary]
psycopg-pool