Skip to main content

Create Lakebase Resources

This recipe demonstrates how to programmatically create Lakebase PostgreSQL resources in your Databricks workspace using FastAPI. This endpoint sets up a complete Lakebase environment including a database instance, catalog, and synced table pipeline.

Cost Alert

This endpoint creates billable resources in your Databricks environment including:

  • A Lakebase PostgreSQL database instance
  • A synced table pipeline

These resources will incur ongoing costs until deleted. Monitor your usage and delete resources when no longer needed using the delete endpoint.

info

In this example, we set up our API to be called using the POST HTTP method which is the standard choice for creating new resources in REST APIs as defined in RFC 7231. Unlike GET, POST is not idempotent - making the same request multiple times may create multiple resources.

For detailed specifications, refer to RFC 7231 Section 4.3.3 (now superseded by RFC 9110 Section 9.3.3), which defines the POST method's semantics and requirements.

Code snippet

routes/v1/lakebase.py
import logging
import os

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
DatabaseCatalog,
DatabaseInstance,
DatabaseInstanceRole,
DatabaseInstanceRoleAttributes,
DatabaseInstanceRoleIdentityType,
DatabaseInstanceRoleMembershipRole,
NewPipelineSpec,
SyncedDatabaseTable,
SyncedTableSchedulingPolicy,
SyncedTableSpec,
)
from models.lakebase import LakebaseResourcesResponse

from fastapi import APIRouter, HTTPException, Query

# Environment variables used:
# LAKEBASE_INSTANCE_NAME=my-lakebase-instance
# LAKEBASE_DATABASE_NAME=my_database
# LAKEBASE_CATALOG_NAME=my-pg-catalog
# SYNCHED_TABLE_STORAGE_CATALOG=my_storage_catalog
# SYNCHED_TABLE_STORAGE_SCHEMA=my_storage_schema

logger = logging.getLogger(__name__)
# NOTE(review): constructing the client and resolving the current user happen at
# import time; `w.current_user.me()` is a network call and will fail app startup
# if Databricks authentication is not configured -- confirm this is intended.
w = WorkspaceClient()
router = APIRouter(tags=["lakebase"])
# Cached once so per-request default resource names don't re-hit the API.
current_user_id = w.current_user.me().id

@router.post(
    "/resources/create-lakebase-resources",
    response_model=LakebaseResourcesResponse,
    summary="Create Lakebase Resources",
)
async def create_lakebase_resources(
    create_resources: bool = Query(
        description="""🚨 This endpoint creates resources in your Databricks environment that will incur a cost.
By setting this value to true you understand the costs associated with this action. 🚨
⌛️ This endpoint may take a few minutes to complete.⌛️""",
    ),
    capacity: str = Query("CU_1", description="Capacity of the Lakebase instance"),
    node_count: int = Query(1, description="Number of nodes in the Lakebase instance"),
    enable_readable_secondaries: bool = Query(
        False, description="Enable readable secondaries"
    ),
    retention_window_in_days: int = Query(
        7, description="Retention window in days for the Lakebase instance"
    ),
) -> LakebaseResourcesResponse:
    """Create a complete Lakebase environment: instance, catalog, synced table.

    Billable resources are created only when ``create_resources`` is true;
    otherwise the endpoint returns immediately without touching the workspace.

    Steps performed:
      1. Skip everything if an instance with the target name already exists.
      2. Create the Lakebase PostgreSQL instance and wait for it to be ready.
      3. Grant the calling user a superuser role on the instance
         (best effort -- a failure here is logged, not fatal).
      4. Register a database catalog backed by the new instance.
      5. Start a synced-table pipeline snapshotting ``samples.tpch.orders``.

    Returns:
        LakebaseResourcesResponse with the instance, catalog, and pipeline id
        (or sentinel text when pipeline creation had to fall back to the UI).

    Raises:
        HTTPException: 500 if the instance-existence check fails for any
            reason other than the instance not being found.
    """
    if not create_resources:
        # Explicit opt-out: acknowledge without creating anything billable.
        return LakebaseResourcesResponse(
            instance="",
            catalog="",
            synced_table="",
            message="No resources were created (create_resources=False)",
        )

    instance_name = os.getenv(
        "LAKEBASE_INSTANCE_NAME", f"{current_user_id}-lakebase-demo"
    )

    # Idempotency guard: if the instance already exists, report and stop
    # instead of attempting a duplicate creation.
    try:
        w.database.get_database_instance(name=instance_name)
        logger.info("Instance %s already exists. Skipping creation.", instance_name)
        return LakebaseResourcesResponse(
            instance=instance_name,
            catalog="",
            synced_table="",
            message="Instance already exists, skipping creation.",
        )
    except Exception as e:
        # NOTE(review): matching on the error text is fragile; catching the
        # SDK's NotFound error type would be sturdier -- confirm and switch.
        if "not found" not in str(e).lower():
            raise HTTPException(
                status_code=500,
                detail=f"Error checking instance existence: {str(e)}",
            )

    # Create the database instance and block until it is available.
    instance = DatabaseInstance(
        name=instance_name,
        capacity=capacity,
        node_count=node_count,
        enable_readable_secondaries=enable_readable_secondaries,
        retention_window_in_days=retention_window_in_days,
    )
    instance_create = w.database.create_database_instance_and_wait(instance)

    # Grant the calling user full administrative control over the instance.
    superuser_role = DatabaseInstanceRole(
        name=w.current_user.me().user_name,
        identity_type=DatabaseInstanceRoleIdentityType.USER,
        membership_role=DatabaseInstanceRoleMembershipRole.DATABRICKS_SUPERUSER,
        attributes=DatabaseInstanceRoleAttributes(
            bypassrls=True, createdb=True, createrole=True
        ),
    )

    try:
        w.database.create_database_instance_role(
            instance_name=instance_create.name,
            database_instance_role=superuser_role,
        )
    except Exception as e:
        # Best effort: the instance is still usable without the role grant.
        logger.exception("Failed to create superuser role: %s", e)

    # Register a Unity Catalog catalog backed by the instance's database.
    lakebase_database_name = os.getenv("LAKEBASE_DATABASE_NAME", "demo_database")
    catalog_name = os.getenv("LAKEBASE_CATALOG_NAME", f"{current_user_id}-pg-catalog")

    catalog = DatabaseCatalog(
        name=catalog_name,
        database_instance_name=instance_create.name,
        database_name=lakebase_database_name,
        create_database_if_not_exists=True,
    )
    database_create = w.database.create_database_catalog(catalog)

    # Synced table: snapshot samples.tpch.orders into the new instance via a
    # fresh pipeline. The env var names keep the existing "SYNCHED" spelling
    # for backward compatibility.
    synced_table_storage_catalog = os.getenv(
        "SYNCHED_TABLE_STORAGE_CATALOG", "default_storage_catalog"
    )
    synced_table_storage_schema = os.getenv(
        "SYNCHED_TABLE_STORAGE_SCHEMA", "default_storage_schema"
    )

    new_pipeline = NewPipelineSpec(
        storage_catalog=synced_table_storage_catalog,
        storage_schema=synced_table_storage_schema,
    )

    spec = SyncedTableSpec(
        source_table_full_name="samples.tpch.orders",
        primary_key_columns=["o_orderkey"],
        timeseries_key="o_orderdate",
        create_database_objects_if_missing=True,
        new_pipeline_spec=new_pipeline,
        scheduling_policy=SyncedTableSchedulingPolicy.SNAPSHOT,
    )

    synced_table = SyncedDatabaseTable(
        name=f"{catalog_name}.public.orders_synced",
        database_instance_name=instance_create.name,
        logical_database_name=lakebase_database_name,
        spec=spec,
    )

    try:
        synced_table_create = w.database.create_synced_database_table(synced_table)
        pipeline_id = synced_table_create.id
    except Exception as e:
        # Pipeline provisioning is asynchronous and may fail transiently; fall
        # back to a sentinel so the response can point the user at the UI.
        logger.exception("API error during synced table creation: %s", e)
        pipeline_id = "check-workspace-ui"

    workspace_url = w.config.host
    if pipeline_id != "check-workspace-ui":
        pipeline_url = f"{workspace_url}/pipelines/{pipeline_id}"
        message = f"Resources created successfully. Synced table pipeline {pipeline_id} is provisioning asynchronously. Monitor progress at: {pipeline_url}"
    else:
        message = f"Resources created successfully. Synced table pipeline initiated. Check pipelines in workspace: {workspace_url}/pipelines"

    return LakebaseResourcesResponse(
        instance=instance_create.name,
        catalog=database_create.name,
        synced_table=pipeline_id,
        message=message,
    )
warning

The above example is shortened for brevity and not suitable for production use. You can find a more advanced sample in the databricks-apps-cookbook GitHub repository.

Example Usage

# Create Lakebase resources with default settings
curl -X POST "http://localhost:8000/api/v1/resources/create-lakebase-resources?create_resources=true"

# Create with custom capacity and multiple nodes
curl -X POST "http://localhost:8000/api/v1/resources/create-lakebase-resources?create_resources=true&capacity=CU_2&node_count=2&enable_readable_secondaries=true"

If the request was successful, you will get the following output:

{
"instance": "user123-lakebase-demo",
"catalog": "user123-pg-catalog",
"synced_table": "pipeline-abc123",
"message": "Resources created successfully. Synced table pipeline pipeline-abc123 is provisioning asynchronously. Monitor progress at: https://workspace.databricks.com/pipelines/pipeline-abc123"
}

Resources

Permissions

Your app service principal needs the following permissions:

  • CREATE permissions on the Unity Catalog metastore
  • CAN USE on the storage catalog and schema for synced tables
  • Access to the source table samples.tpch.orders

See Lakebase permissions for more information.

Dependencies

requirements.txt
databricks-sdk>=0.60.0
fastapi
uvicorn