Skip to main content

Interact with Lakebase Tables

This recipe demonstrates how to build a complete orders management API using a Lakebase PostgreSQL database. These endpoints provide CRUD operations and various query patterns for handling orders data synchronized from Databricks Unity Catalog.

Prerequisites
  • Lakebase resources must be created using the Create Lakebase Resources endpoint
  • The synced table pipeline must be completed and orders data synchronized from samples.tpch.orders
  • Database connection must be configured in your application environment
info

In this example, we demonstrate multiple HTTP methods (GET, POST) which are the standard choices for data operations in REST APIs as defined in RFC 7231:

  • GET requests are idempotent and cacheable, ideal for data retrieval
  • POST requests are used for updates and modifications

For detailed specifications, refer to RFC 7231 which defines HTTP method semantics.

Code snippet

routes/v1/orders.py
import logging

from config.database import get_async_db
from models.orders import (
CursorPaginationInfo,
Order,
OrderCount,
OrderListCursorResponse,
OrderListResponse,
OrderRead,
OrderSample,
OrderStatusUpdate,
OrderStatusUpdateResponse,
PaginationInfo,
)
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from fastapi import APIRouter, Depends, HTTPException, Query

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router the order endpoints below are registered on; they are tagged "orders".
router = APIRouter(tags=["orders"])

# 1. GET ORDERS COUNT
# Runs COUNT(o_orderkey) over the orders table and returns it wrapped in
# OrderCount. Any failure is logged and reported to the client as HTTP 500.
@router.get("/count", response_model=OrderCount, summary="Get total order count")
async def get_order_count(db: AsyncSession = Depends(get_async_db)):
    try:
        count_query = select(func.count(Order.o_orderkey))
        total = (await db.execute(count_query)).scalar()
        return OrderCount(total_orders=total)
    except Exception as exc:
        logger.error(f"Error getting order count: {exc}")
        raise HTTPException(status_code=500, detail="Failed to retrieve order count")

# 2. GET SAMPLE ORDERS
# Returns up to five order keys selected with a plain LIMIT 5 (no ORDER BY),
# so which keys come back is whatever the database yields first.
# Any failure is logged and reported to the client as HTTP 500.
@router.get("/sample", response_model=OrderSample, summary="Get 5 random order keys")
async def get_sample_orders(db: AsyncSession = Depends(get_async_db)):
    try:
        rows = await db.execute(select(Order.o_orderkey).limit(5))
        return OrderSample(sample_order_keys=rows.scalars().all())
    except Exception as exc:
        logger.error(f"Error getting sample orders: {exc}")
        raise HTTPException(status_code=500, detail="Failed to retrieve sample orders")

# 3. PAGE-BASED PAGINATION
# Returns one page of orders sorted by o_orderkey using OFFSET/LIMIT.
#   page          - 1-based page number.
#   page_size     - rows per page (1..1000).
#   include_count - when true, an extra COUNT query fills total_count and
#                   total_pages; when false both are reported as -1.
# Any failure is logged and reported to the client as HTTP 500.
@router.get("/pages", response_model=OrderListResponse, summary="Get orders with page-based pagination")
async def get_orders_by_page(
    page: int = Query(1, ge=1, description="Page number (1-based)"),
    page_size: int = Query(100, ge=1, le=1000, description="Number of records per page (max 1000)"),
    include_count: bool = Query(True, description="Include total count for pagination info"),
    db: AsyncSession = Depends(get_async_db),
):
    try:
        # -1 is the "not computed" marker used when the caller skips the count.
        total_count = -1
        total_pages = -1
        if include_count:
            counted = await db.execute(select(func.count(Order.o_orderkey)))
            total_count = counted.scalar()
            # Ceiling division: a partially filled last page still counts.
            full_pages, leftover = divmod(total_count, page_size)
            total_pages = full_pages + (1 if leftover else 0)

        # Ask for one row beyond the page; its presence means a next page exists.
        rows_to_skip = page_size * (page - 1)
        page_query = (
            select(Order)
            .order_by(Order.o_orderkey)
            .offset(rows_to_skip)
            .limit(page_size + 1)
        )
        fetched = (await db.execute(page_query)).scalars().all()

        return OrderListResponse(
            orders=fetched[:page_size],
            pagination=PaginationInfo(
                page=page,
                page_size=page_size,
                total_pages=total_pages,
                total_count=total_count,
                has_next=len(fetched) > page_size,
                has_previous=page > 1,
            ),
        )
    except Exception as exc:
        logger.error(f"Error getting page-based orders: {exc}")
        raise HTTPException(status_code=500, detail="Failed to retrieve orders")

# 4. CURSOR-BASED PAGINATION
# Returns up to page_size orders whose o_orderkey is strictly greater than
# `cursor`, sorted by o_orderkey (keyset pagination).
#   cursor    - start after this order key; 0 means "from the beginning".
#   page_size - rows per request (1..1000).
# The response carries next_cursor (last key of this page, or None when there
# is no further page) and previous_cursor (the cursor that yields the page
# ending at `cursor`, or None when cursor is 0).
# Any failure is logged and reported to the client as HTTP 500.
@router.get("/stream", response_model=OrderListCursorResponse, summary="Get orders with cursor-based pagination")
async def get_orders_by_cursor(
    cursor: int = Query(0, ge=0, description="Start after this order key (0 for beginning)"),
    page_size: int = Query(100, ge=1, le=1000, description="Number of records to fetch (max 1000)"),
    db: AsyncSession = Depends(get_async_db),
):
    try:
        # Fetch one extra row to detect whether another page follows.
        stmt = (
            select(Order)
            .where(Order.o_orderkey > cursor)
            .order_by(Order.o_orderkey)
            .limit(page_size + 1)
        )

        result = await db.execute(stmt)
        all_orders = result.scalars().all()

        has_next = len(all_orders) > page_size
        orders = all_orders[:page_size]
        has_previous = cursor > 0

        next_cursor = orders[-1].o_orderkey if orders and has_next else None

        # Order keys are not guaranteed to be contiguous, so the previous
        # cursor cannot be derived arithmetically (cursor - page_size).
        # Instead, walk backwards from `cursor`: the previous page is the
        # page_size keys <= cursor, so its cursor is the key just before
        # them, i.e. the (page_size + 1)-th key in descending order.
        previous_cursor = None
        if has_previous:
            prev_stmt = (
                select(Order.o_orderkey)
                .where(Order.o_orderkey <= cursor)
                .order_by(Order.o_orderkey.desc())
                .offset(page_size)
                .limit(1)
            )
            prev_key = (await db.execute(prev_stmt)).scalar()
            # Fewer than page_size + 1 keys precede us: previous page is the first.
            previous_cursor = prev_key if prev_key is not None else 0

        pagination_info = CursorPaginationInfo(
            page_size=page_size,
            has_next=has_next,
            has_previous=has_previous,
            next_cursor=next_cursor,
            previous_cursor=previous_cursor,
        )

        return OrderListCursorResponse(orders=orders, pagination=pagination_info)

    except Exception as e:
        logger.error(f"Error getting cursor-based orders: {e}")
        raise HTTPException(status_code=500, detail="Failed to retrieve orders")

# 5. GET SPECIFIC ORDER
# Looks up a single order by its o_orderkey and returns it.
#   400 - order_key is zero or negative.
#   404 - no order has that key.
#   500 - any other failure (logged before being reported).
@router.get("/{order_key}", response_model=OrderRead, summary="Get an order by its key")
async def read_order(order_key: int, db: AsyncSession = Depends(get_async_db)):
    # Reject impossible keys up front, before touching the database.
    if order_key <= 0:
        raise HTTPException(status_code=400, detail="Invalid order key provided")

    try:
        lookup = select(Order).where(Order.o_orderkey == order_key)
        match = (await db.execute(lookup)).scalars().first()
        if match:
            return match
        raise HTTPException(status_code=404, detail=f"Order with key '{order_key}' not found")
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        logger.error(f"Unexpected error fetching order {order_key}: {exc}")
        raise HTTPException(status_code=500, detail="Internal server error occurred")

# 6. UPDATE ORDER STATUS
# Sets o_orderstatus on an existing order, commits, and returns a confirmation.
#   order_key   - key of the order to update (path parameter).
#   status_data - request body carrying the new o_orderstatus.
#   400 - order_key is zero or negative.
#   404 - no order has that key.
#   500 - any other failure; the pending transaction is rolled back first.
@router.post("/{order_key}/status", response_model=OrderStatusUpdateResponse, summary="Update order status")
async def update_order_status(
    order_key: int,
    status_data: OrderStatusUpdate,
    db: AsyncSession = Depends(get_async_db),
):
    try:
        if order_key <= 0:
            raise HTTPException(status_code=400, detail="Invalid order key provided")

        # Load the row first so a missing order yields 404 rather than a silent no-op.
        check_stmt = select(Order).where(Order.o_orderkey == order_key)
        check_result = await db.execute(check_stmt)
        existing_order = check_result.scalars().first()

        if not existing_order:
            raise HTTPException(status_code=404, detail=f"Order with key '{order_key}' not found")

        existing_order.o_orderstatus = status_data.o_orderstatus
        await db.commit()
        await db.refresh(existing_order)

        return OrderStatusUpdateResponse(
            o_orderkey=order_key,
            o_orderstatus=status_data.o_orderstatus,
            message="Order status updated successfully",
        )

    except HTTPException:
        # Deliberate HTTP errors (400/404) are raised before any write; pass through.
        raise
    except Exception as e:
        logger.error(f"Error updating status for order {order_key}: {e}")
        # A failed commit/refresh leaves the session's transaction unusable;
        # roll back explicitly so the pending change is discarded.
        await db.rollback()
        raise HTTPException(status_code=500, detail="Failed to update order status")
warning

The above example is shortened for brevity and not suitable for production use. You can find a more advanced sample in the databricks-apps-cookbook GitHub repository.

Example Usage

The orders API provides six main endpoints for different use cases:

| Endpoint | Method | Purpose | Best For |
| --- | --- | --- | --- |
| /orders/count | GET | Get total orders count | Dashboards, monitoring |
| /orders/sample | GET | Get 5 sample order keys | Testing, development |
| /orders/pages | GET | Page-based pagination | Traditional UIs with page numbers |
| /orders/stream | GET | Cursor-based pagination | Large datasets, infinite scroll |
| /orders/{order_key} | GET | Get specific order | Order details, lookups |
| /orders/{order_key}/status | POST | Update order status | Order processing workflows |

Get Orders Count

curl -X GET "http://localhost:8000/api/v1/orders/count"
{
"total_orders": 1500000
}

Get Sample Orders

curl -X GET "http://localhost:8000/api/v1/orders/sample"
{
"sample_order_keys": [1, 32, 33, 34, 35]
}

Page-Based Pagination

curl -X GET "http://localhost:8000/api/v1/orders/pages?page=1&page_size=2"
{
"orders": [
{
"o_orderkey": 1,
"o_custkey": 370,
"o_orderstatus": "O",
"o_totalprice": 172799.49,
"o_orderdate": "1996-01-02",
"o_orderpriority": "5-LOW",
"o_clerk": "Clerk#000000951",
"o_shippriority": 0,
"o_comment": "nstructions sleep furiously among"
},
{
"o_orderkey": 2,
"o_custkey": 781,
"o_orderstatus": "O",
"o_totalprice": 46929.18,
"o_orderdate": "1996-12-01",
"o_orderpriority": "1-URGENT",
"o_clerk": "Clerk#000000880",
"o_shippriority": 0,
"o_comment": "foxes. pending accounts at the pending"
}
],
"pagination": {
"page": 1,
"page_size": 2,
"total_pages": 750000,
"total_count": 1500000,
"has_next": true,
"has_previous": false
}
}

Update Order Status

curl -X POST "http://localhost:8000/api/v1/orders/1/status" \
-H "Content-Type: application/json" \
-d '{"o_orderstatus": "F"}'
{
"o_orderkey": 1,
"o_orderstatus": "F",
"message": "Order status updated successfully"
}

Resources

Permissions

Your app service principal needs the following permissions:

  • Database instance connection access via OAuth tokens
  • SELECT permissions on the orders_synced table in your Lakebase database
  • UPDATE permissions on the orders_synced table for status updates
  • Database user role with appropriate table access

See Lakebase permissions for more information.

Dependencies

requirements.txt
databricks-sdk>=0.60.0
fastapi
sqlalchemy
sqlmodel
asyncpg
uvicorn