Create Lakebase Resources
This recipe demonstrates how to programmatically create Lakebase PostgreSQL resources in your Databricks workspace using FastAPI. This endpoint sets up a complete Lakebase environment including a database instance, catalog, and synced table pipeline.
This endpoint creates billable resources in your Databricks environment including:
- A Lakebase PostgreSQL database instance
- A synced table pipeline
These resources will incur ongoing costs until deleted. Monitor your usage and delete resources when no longer needed using the delete endpoint.
In this example, we set up our API to be called using the POST HTTP method, which is the standard choice for creating new resources in REST APIs as defined in RFC 7231.
Unlike GET, POST is not idempotent — making the same request multiple times may create multiple resources.
For detailed specifications, refer to RFC 7231 Section 4.3.3, which defines the POST method's semantics and requirements.
Code snippet
import logging
import os
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
DatabaseCatalog,
DatabaseInstance,
DatabaseInstanceRole,
DatabaseInstanceRoleAttributes,
DatabaseInstanceRoleIdentityType,
DatabaseInstanceRoleMembershipRole,
NewPipelineSpec,
SyncedDatabaseTable,
SyncedTableSchedulingPolicy,
SyncedTableSpec,
)
from models.lakebase import LakebaseResourcesResponse
from fastapi import APIRouter, HTTPException, Query
# Environment variables used:
# LAKEBASE_INSTANCE_NAME=my-lakebase-instance
# LAKEBASE_DATABASE_NAME=my_database
# LAKEBASE_CATALOG_NAME=my-pg-catalog
# SYNCHED_TABLE_STORAGE_CATALOG=my_storage_catalog
# SYNCHED_TABLE_STORAGE_SCHEMA=my_storage_schema
# Module-level logger, named after this module per the stdlib logging convention.
logger = logging.getLogger(__name__)
# Workspace client; authenticates from the app's ambient Databricks credentials.
w = WorkspaceClient()
# Router grouping these endpoints under the "lakebase" tag in the OpenAPI docs.
router = APIRouter(tags=["lakebase"])
# Cached once at import time; used to build default resource names below.
# NOTE(review): this performs a network call on import — confirm the app
# tolerates module import failing when credentials are unavailable.
current_user_id = w.current_user.me().id
@router.post(
    "/resources/create-lakebase-resources",
    response_model=LakebaseResourcesResponse,
    summary="Create Lakebase Resources",
)
async def create_lakebase_resources(
    create_resources: bool = Query(
        description="""🚨 This endpoint creates resources in your Databricks environment that will incur a cost.
By setting this value to true you understand the costs associated with this action. 🚨
⌛️ This endpoint may take a few minutes to complete.⌛️""",
    ),
    capacity: str = Query("CU_1", description="Capacity of the Lakebase instance"),
    node_count: int = Query(1, description="Number of nodes in the Lakebase instance"),
    enable_readable_secondaries: bool = Query(
        False, description="Enable readable secondaries"
    ),
    retention_window_in_days: int = Query(
        7, description="Retention window in days for the Lakebase instance"
    ),
):
    """Provision a complete Lakebase environment in the current workspace.

    Creates, in order: a Lakebase PostgreSQL database instance (blocking until
    ready), a superuser role for the current user (best-effort), a Unity
    Catalog database catalog, and a synced table pipeline for
    ``samples.tpch.orders`` (provisioned asynchronously).

    All created resources are billable. The endpoint is idempotent on the
    instance name: if the instance already exists, nothing is created.

    Args:
        create_resources: Explicit cost acknowledgement; nothing is created
            when False.
        capacity: Compute capacity of the instance (e.g. ``CU_1``).
        node_count: Number of nodes in the instance.
        enable_readable_secondaries: Whether to enable readable secondaries.
        retention_window_in_days: Point-in-time restore window.

    Returns:
        LakebaseResourcesResponse describing what was (or was not) created.

    Raises:
        HTTPException: 500 when the instance existence check fails for any
            reason other than the instance not being found.
    """
    # Explicit opt-in guard: never create billable resources by accident.
    if not create_resources:
        return LakebaseResourcesResponse(
            instance="",
            catalog="",
            synced_table="",
            message="No resources were created (create_resources=False)",
        )

    instance_name = os.getenv("LAKEBASE_INSTANCE_NAME", f"{current_user_id}-lakebase-demo")

    # The SDK signals absence via an exception, so a successful lookup means
    # the instance already exists and we short-circuit. Any error other than
    # "not found" is surfaced to the caller with the original cause chained.
    try:
        w.database.get_database_instance(name=instance_name)
    except Exception as e:
        if "not found" not in str(e).lower():
            raise HTTPException(
                status_code=500,
                detail=f"Error checking instance existence: {str(e)}",
            ) from e
    else:
        logger.info("Instance %s already exists. Skipping creation.", instance_name)
        return LakebaseResourcesResponse(
            instance=instance_name,
            catalog="",
            synced_table="",
            message="Instance already exists, skipping creation.",
        )

    # Create the database instance and block until it is available.
    instance = DatabaseInstance(
        name=instance_name,
        capacity=capacity,
        node_count=node_count,
        enable_readable_secondaries=enable_readable_secondaries,
        retention_window_in_days=retention_window_in_days,
    )
    instance_create = w.database.create_database_instance_and_wait(instance)

    # Grant the calling user superuser privileges on the new instance.
    # Best-effort: a failure here is logged but does not abort provisioning.
    superuser_role = DatabaseInstanceRole(
        name=w.current_user.me().user_name,
        identity_type=DatabaseInstanceRoleIdentityType.USER,
        membership_role=DatabaseInstanceRoleMembershipRole.DATABRICKS_SUPERUSER,
        attributes=DatabaseInstanceRoleAttributes(bypassrls=True, createdb=True, createrole=True),
    )
    try:
        w.database.create_database_instance_role(
            instance_name=instance_create.name,
            database_instance_role=superuser_role,
        )
    except Exception as e:
        logger.error("Failed to create superuser role: %s", e)

    # Register a Unity Catalog database catalog backed by the new instance,
    # creating the underlying Postgres database if it does not yet exist.
    lakebase_database_name = os.getenv("LAKEBASE_DATABASE_NAME", "demo_database")
    catalog_name = os.getenv("LAKEBASE_CATALOG_NAME", f"{current_user_id}-pg-catalog")
    catalog = DatabaseCatalog(
        name=catalog_name,
        database_instance_name=instance_create.name,
        database_name=lakebase_database_name,
        create_database_if_not_exists=True,
    )
    database_create = w.database.create_database_catalog(catalog)

    # Set up a one-shot (SNAPSHOT) sync of samples.tpch.orders into Postgres.
    synced_table_storage_catalog = os.getenv("SYNCHED_TABLE_STORAGE_CATALOG", "default_storage_catalog")
    synced_table_storage_schema = os.getenv("SYNCHED_TABLE_STORAGE_SCHEMA", "default_storage_schema")
    new_pipeline = NewPipelineSpec(
        storage_catalog=synced_table_storage_catalog,
        storage_schema=synced_table_storage_schema,
    )
    spec = SyncedTableSpec(
        source_table_full_name="samples.tpch.orders",
        primary_key_columns=["o_orderkey"],
        timeseries_key="o_orderdate",
        create_database_objects_if_missing=True,
        new_pipeline_spec=new_pipeline,
        scheduling_policy=SyncedTableSchedulingPolicy.SNAPSHOT,
    )
    synced_table = SyncedDatabaseTable(
        name=f"{catalog_name}.public.orders_synced",
        database_instance_name=instance_create.name,
        logical_database_name=lakebase_database_name,
        spec=spec,
    )
    try:
        synced_table_create = w.database.create_synced_database_table(synced_table)
        pipeline_id = synced_table_create.id
    except Exception as e:
        # Best-effort: the pipeline may still be provisioning in the workspace;
        # fall back to a sentinel so the caller is pointed at the UI instead of
        # failing the whole request.
        logger.error("API error during synced table creation: %s", e)
        pipeline_id = "check-workspace-ui"

    # Build a response message that links directly to the pipeline when its
    # id is known, otherwise to the workspace pipelines overview.
    workspace_url = w.config.host
    if pipeline_id != "check-workspace-ui":
        pipeline_url = f"{workspace_url}/pipelines/{pipeline_id}"
        message = f"Resources created successfully. Synced table pipeline {pipeline_id} is provisioning asynchronously. Monitor progress at: {pipeline_url}"
    else:
        message = f"Resources created successfully. Synced table pipeline initiated. Check pipelines in workspace: {workspace_url}/pipelines"

    return LakebaseResourcesResponse(
        instance=instance_create.name,
        catalog=database_create.name,
        synced_table=pipeline_id,
        message=message,
    )
The above example is shortened for brevity and not suitable for production use. You can find a more advanced sample in the databricks-apps-cookbook GitHub repository.
Example Usage
# Create Lakebase resources with default settings
curl -X POST "http://localhost:8000/api/v1/resources/create-lakebase-resources?create_resources=true"
# Create with custom capacity and multiple nodes
curl -X POST "http://localhost:8000/api/v1/resources/create-lakebase-resources?create_resources=true&capacity=CU_2&node_count=2&enable_readable_secondaries=true"
If the request was successful, you will get the following output:
{
"instance": "user123-lakebase-demo",
"catalog": "user123-pg-catalog",
"synced_table": "pipeline-abc123",
"message": "Resources created successfully. Synced table pipeline abc123 is provisioning asynchronously. Monitor progress at: https://workspace.databricks.com/pipelines/abc123"
}
Resources
Permissions
Your app service principal needs the following permissions:
- CREATE permissions on the Unity Catalog metastore
- CAN USE on the storage catalog and schema used for synced tables
- Access to the source table `samples.tpch.orders`
See Lakebase permissions for more information.
Dependencies
- Databricks SDK for Python - `databricks-sdk`
- FastAPI - `fastapi`
- uvicorn - `uvicorn`
databricks-sdk>=0.60.0
fastapi
uvicorn