Connect an OLTP database
This app connects to a Databricks Lakebase OLTP database instance for reads and writes, e.g., of an App state. Provide the instance name, database, schema, and state table.
Code snippet
app.py
import reflex as rx
import uuid
import pandas as pd
from databricks.sdk import WorkspaceClient
import psycopg
from psycopg_pool import ConnectionPool
from typing import Union, Optional
import datetime
# This module-level variable will cache the connection pool across the app.
# It stays None until the first OltpDatabaseState.run_query() builds the pool;
# query_df() raises ConnectionError if it is used before that happens.
_pool: Optional[ConnectionPool] = None
# Define a custom connection class to handle token rotation.
class RotatingTokenConnection(psycopg.Connection):
    """psycopg Connection that injects a fresh Databricks OAuth token as the
    password each time a new physical connection is opened.

    The pool forwards ``_instance_name`` through its ``kwargs``; this class
    must remove that custom kwarg before delegating to psycopg, which treats
    any leftover keyword argument as a libpq connection option and rejects
    unknown ones.
    """

    def __init__(self, *args, **kwargs):
        # Pop with a default: the kwarg is normally consumed in connect(),
        # so it is usually absent here. A bare pop() would raise KeyError.
        self._instance_name = kwargs.pop("_instance_name", None)
        super().__init__(*args, **kwargs)

    @classmethod
    def connect(cls, *args, **kwargs):
        # Pop (not just read) the custom kwarg: psycopg folds remaining
        # kwargs into the conninfo string and fails on unknown options.
        instance_name = kwargs.pop("_instance_name", None)
        if instance_name:
            # Generate a fresh OAuth token for each new connection; tokens
            # are short-lived, so a cached one could be expired by the time
            # the pool opens a replacement connection.
            w = WorkspaceClient()
            credential = w.database.generate_database_credential(
                request_id=str(uuid.uuid4()), instance_names=[instance_name]
            )
            kwargs["password"] = credential.token
        return super().connect(*args, **kwargs)
def build_pool(instance_name: str, host: str, user: str, database: str) -> ConnectionPool:
    """Create a ConnectionPool whose connections mint their own OAuth tokens.

    The instance name is smuggled to RotatingTokenConnection through the
    pool's ``kwargs`` so every new connection can request a fresh credential.
    """
    # sslmode=require keeps every connection encrypted in transit.
    conninfo = f"host={host} dbname={database} user={user} sslmode=require"
    return ConnectionPool(
        conninfo=conninfo,
        connection_class=RotatingTokenConnection,
        kwargs={"_instance_name": instance_name},
        min_size=1,
        max_size=5,
        open=True,
    )
def query_df(query: str, params=None) -> pd.DataFrame:
    """Execute *query* against the shared pool and return a DataFrame.

    Statements that produce no result set (DDL / plain DML) yield an
    empty DataFrame.

    Raises:
        ConnectionError: if the module-level pool has not been built yet.
    """
    global _pool
    if _pool is None:
        raise ConnectionError("Connection pool is not initialized.")
    with _pool.connection() as conn, conn.cursor() as cursor:
        cursor.execute(query, params)
        description = cursor.description
        if description is None:
            # No rows to fetch (e.g. CREATE TABLE / INSERT without RETURNING).
            return pd.DataFrame()
        column_names = [column[0] for column in description]
        return pd.DataFrame(cursor.fetchall(), columns=column_names)
def upsert_app_state(schema: str, table: str, session_id: str, key: str, value: str):
    """Create the state table if needed and upsert one (session, key) row.

    Args:
        schema: Target schema name; must be a plain SQL identifier.
        table: Target table name; must be a plain SQL identifier.
        session_id: Per-session identifier; part of the primary key.
        key: State key; part of the primary key.
        value: State value to insert or overwrite.

    Raises:
        ValueError: if *schema* or *table* is not a simple identifier.
            Identifiers cannot be bound as query parameters, so they are
            validated before being interpolated into the SQL text to
            prevent SQL injection via the table/schema names.
    """
    for identifier in (schema, table):
        if not identifier.isidentifier():
            raise ValueError(f"Invalid SQL identifier: {identifier!r}")
    create_sql = f"""
        CREATE TABLE IF NOT EXISTS {schema}.{table} (
            session_id TEXT,
            key TEXT,
            value TEXT,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (session_id, key)
        );
    """
    query_df(create_sql)
    # Data values are bound as parameters (never interpolated), so user
    # supplied session_id/key/value cannot alter the SQL.
    upsert_sql = f"""
        INSERT INTO {schema}.{table} (session_id, key, value, updated_at)
        VALUES (%(session_id)s, %(key)s, %(value)s, CURRENT_TIMESTAMP)
        ON CONFLICT (session_id, key) DO UPDATE SET
            value = EXCLUDED.value,
            updated_at = EXCLUDED.updated_at;
    """
    query_df(upsert_sql, params={"session_id": session_id, "key": key, "value": value})
class OltpDatabaseState(rx.State):
    """Reflex state for the OLTP demo: connection settings, query results,
    and a background event that lazily builds the shared connection pool."""

    # NOTE(review): this default is evaluated once at class-definition time,
    # so it may be shared across user sessions rather than unique per session
    # — confirm against Reflex's per-client default handling before relying
    # on it as a session identifier.
    session_id: str = str(uuid.uuid4())
    # Lakebase database instance name; replace with your instance.
    selected_instance: str = "your_instance_name"
    # Target database / schema / table for the app-state rows.
    database: str = "databricks_postgres"
    schema_name: str = "public"
    table_name: str = "app_state"
    # Rows fetched for this session, as list-of-dicts for the UI table.
    result_data: list[dict[str, Union[str, int, float, bool, None, datetime.datetime]]] = []
    is_loading: bool = False
    error_message: str = ""

    @rx.event(background=True)
    async def run_query(self):
        # Background task: state mutations must happen inside `async with self`.
        global _pool
        async with self:
            self.is_loading = True
            self.error_message = ""
        try:
            # Initialize the pool if it's the first run.
            if _pool is None:
                w = WorkspaceClient()
                # Connect as the current principal (app service principal).
                user = w.current_user.me().user_name
                instance = w.database.get_database_instance(name=self.selected_instance)
                # Use the read-write endpoint since we both read and write.
                host = instance.read_write_dns
                _pool = build_pool(
                    instance_name=self.selected_instance,
                    host=host, user=user, database=self.database
                )
            # Create table and insert/update a record for this session.
            upsert_app_state(
                self.schema_name, self.table_name, self.session_id, "feedback_message", "true"
            )
            # Fetch the data to display.
            select_sql = f"SELECT * FROM {self.schema_name}.{self.table_name} WHERE session_id = %(session_id)s"
            df = query_df(select_sql, params={"session_id": self.session_id})
            async with self:
                self.result_data = df.to_dict("records")
        except Exception as e:
            # Surface any connection/SQL error to the UI instead of crashing.
            async with self:
                self.error_message = f"An error occurred: {e}"
        finally:
            # Always clear the loading flag, success or failure.
            async with self:
                self.is_loading = False
info
This sample keeps a module-level ConnectionPool in _pool so connections can be reused within the same app process. For multi-worker deployments, configure pooling appropriately for your workload.
Resources
- Lakebase database instance (PostgreSQL).
- Target PostgreSQL database/schema/table.
Permissions
First, the database instance should be specified in your App resources.
Then, your app service principal needs the following permissions:
GRANT CONNECT ON DATABASE databricks_postgres TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT USAGE, CREATE ON SCHEMA public TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE app_state TO "099f0306-9e29-4a87-84c0-3046e4bcea02";
See this guide for more information.
This guide shows you how to query your Lakebase.
Dependencies
- Databricks SDK - databricks-sdk>=0.60.0
- psycopg - psycopg[binary], psycopg-pool
- Pandas - pandas
- Reflex - reflex
requirements.txt
databricks-sdk
pandas
reflex
psycopg[binary]
psycopg-pool