Interact with Lakebase Tables
This recipe demonstrates how to build a complete orders management API using Lakebase PostgreSQL database. These endpoints provide CRUD operations and various query patterns for handling orders data synchronized from Databricks Unity Catalog.
- Lakebase resources must be created using the Create Lakebase Resources endpoint
- The synced table pipeline must be completed and orders data synchronized from
samples.tpch.orders
- Database connection must be configured in your application environment
In this example, we demonstrate multiple HTTP methods (GET, POST) which are the standard choices for data operations in REST APIs as defined in RFC 7231:
GET
requests are idempotent and cacheable, ideal for data retrievalPOST
requests for updates and modifications
For detailed specifications, refer to RFC 7231 which defines HTTP method semantics.
Code snippet
import logging
from config.database import get_async_db
from models.orders import (
CursorPaginationInfo,
Order,
OrderCount,
OrderListCursorResponse,
OrderListResponse,
OrderRead,
OrderSample,
OrderStatusUpdate,
OrderStatusUpdateResponse,
PaginationInfo,
)
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import APIRouter, Depends, HTTPException, Query
logger = logging.getLogger(__name__)
router = APIRouter(tags=["orders"])
# 1. GET ORDERS COUNT
@router.get("/count", response_model=OrderCount, summary="Get total order count")
async def get_order_count(db: AsyncSession = Depends(get_async_db)):
try:
stmt = select(func.count(Order.o_orderkey))
result = await db.execute(stmt)
count = result.scalar()
return OrderCount(total_orders=count)
except Exception as e:
logger.error(f"Error getting order count: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve order count")
# 2. GET SAMPLE ORDERS
@router.get("/sample", response_model=OrderSample, summary="Get 5 random order keys")
async def get_sample_orders(db: AsyncSession = Depends(get_async_db)):
try:
stmt = select(Order.o_orderkey).limit(5)
result = await db.execute(stmt)
order_keys = result.scalars().all()
return OrderSample(sample_order_keys=order_keys)
except Exception as e:
logger.error(f"Error getting sample orders: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve sample orders")
# 3. PAGE-BASED PAGINATION
@router.get("/pages", response_model=OrderListResponse, summary="Get orders with page-based pagination")
async def get_orders_by_page(
page: int = Query(1, ge=1, description="Page number (1-based)"),
page_size: int = Query(100, ge=1, le=1000, description="Number of records per page (max 1000)"),
include_count: bool = Query(True, description="Include total count for pagination info"),
db: AsyncSession = Depends(get_async_db),
):
try:
if include_count:
count_stmt = select(func.count(Order.o_orderkey))
count_result = await db.execute(count_stmt)
total_count = count_result.scalar()
total_pages = (total_count + page_size - 1) // page_size
else:
total_count = -1
total_pages = -1
offset = (page - 1) * page_size
stmt = (
select(Order)
.order_by(Order.o_orderkey)
.offset(offset)
.limit(page_size + 1)
)
result = await db.execute(stmt)
all_orders = result.scalars().all()
has_next = len(all_orders) > page_size
orders = all_orders[:page_size]
has_previous = page > 1
pagination_info = PaginationInfo(
page=page,
page_size=page_size,
total_pages=total_pages,
total_count=total_count,
has_next=has_next,
has_previous=has_previous,
)
return OrderListResponse(orders=orders, pagination=pagination_info)
except Exception as e:
logger.error(f"Error getting page-based orders: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve orders")
# 4. CURSOR-BASED PAGINATION
@router.get("/stream", response_model=OrderListCursorResponse, summary="Get orders with cursor-based pagination")
async def get_orders_by_cursor(
cursor: int = Query(0, ge=0, description="Start after this order key (0 for beginning)"),
page_size: int = Query(100, ge=1, le=1000, description="Number of records to fetch (max 1000)"),
db: AsyncSession = Depends(get_async_db),
):
try:
stmt = (
select(Order)
.where(Order.o_orderkey > cursor)
.order_by(Order.o_orderkey)
.limit(page_size + 1)
)
result = await db.execute(stmt)
all_orders = result.scalars().all()
has_next = len(all_orders) > page_size
orders = all_orders[:page_size]
has_previous = cursor > 0
next_cursor = orders[-1].o_orderkey if orders and has_next else None
previous_cursor = max(0, cursor - page_size) if has_previous else None
pagination_info = CursorPaginationInfo(
page_size=page_size,
has_next=has_next,
has_previous=has_previous,
next_cursor=next_cursor,
previous_cursor=previous_cursor,
)
return OrderListCursorResponse(orders=orders, pagination=pagination_info)
except Exception as e:
logger.error(f"Error getting cursor-based orders: {e}")
raise HTTPException(status_code=500, detail="Failed to retrieve orders")
# 5. GET SPECIFIC ORDER
@router.get("/{order_key}", response_model=OrderRead, summary="Get an order by its key")
async def read_order(order_key: int, db: AsyncSession = Depends(get_async_db)):
try:
if order_key <= 0:
raise HTTPException(status_code=400, detail="Invalid order key provided")
stmt = select(Order).where(Order.o_orderkey == order_key)
result = await db.execute(stmt)
order = result.scalars().first()
if not order:
raise HTTPException(status_code=404, detail=f"Order with key '{order_key}' not found")
return order
except HTTPException:
raise
except Exception as e:
logger.error(f"Unexpected error fetching order {order_key}: {e}")
raise HTTPException(status_code=500, detail="Internal server error occurred")
# 6. UPDATE ORDER STATUS
@router.post("/{order_key}/status", response_model=OrderStatusUpdateResponse, summary="Update order status")
async def update_order_status(
order_key: int,
status_data: OrderStatusUpdate,
db: AsyncSession = Depends(get_async_db),
):
try:
if order_key <= 0:
raise HTTPException(status_code=400, detail="Invalid order key provided")
check_stmt = select(Order).where(Order.o_orderkey == order_key)
check_result = await db.execute(check_stmt)
existing_order = check_result.scalars().first()
if not existing_order:
raise HTTPException(status_code=404, detail=f"Order with key '{order_key}' not found")
existing_order.o_orderstatus = status_data.o_orderstatus
await db.commit()
await db.refresh(existing_order)
return OrderStatusUpdateResponse(
o_orderkey=order_key,
o_orderstatus=status_data.o_orderstatus,
message="Order status updated successfully",
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating status for order {order_key}: {e}")
raise HTTPException(status_code=500, detail="Failed to update order status")
The above example is shortened for brevity and not suitable for production use. You can find a more advanced sample in the databricks-apps-cookbook GitHub repository.
Example Usage
The orders API provides six main endpoints for different use cases:
Endpoint | Method | Purpose | Best For |
---|---|---|---|
/orders/count | GET | Get total orders count | Dashboards, monitoring |
/orders/sample | GET | Get 5 sample order keys | Testing, development |
/orders/pages | GET | Page-based pagination | Traditional UIs with page numbers |
/orders/stream | GET | Cursor-based pagination | Large datasets, infinite scroll |
/orders/{order_key} | GET | Get specific order | Order details, lookups |
/orders/{order_key}/status | POST | Update order status | Order processing workflows |
Get Orders Count
curl -X GET "http://localhost:8000/api/v1/orders/count"
{
"total_orders": 1500000
}
Get Sample Orders
curl -X GET "http://localhost:8000/api/v1/orders/sample"
{
"sample_order_keys": [1, 32, 33, 34, 35]
}
Page-Based Pagination
curl -X GET "http://localhost:8000/api/v1/orders/pages?page=1&page_size=2"
{
"orders": [
{
"o_orderkey": 1,
"o_custkey": 370,
"o_orderstatus": "O",
"o_totalprice": 172799.49,
"o_orderdate": "1996-01-02",
"o_orderpriority": "5-LOW",
"o_clerk": "Clerk#000000951",
"o_shippriority": 0,
"o_comment": "nstructions sleep furiously among"
},
{
"o_orderkey": 2,
"o_custkey": 781,
"o_orderstatus": "O",
"o_totalprice": 46929.18,
"o_orderdate": "1996-12-01",
"o_orderpriority": "1-URGENT",
"o_clerk": "Clerk#000000880",
"o_shippriority": 0,
"o_comment": "foxes. pending accounts at the pending"
}
],
"pagination": {
"page": 1,
"page_size": 2,
"total_pages": 750000,
"total_count": 1500000,
"has_next": true,
"has_previous": false
}
}
Update Order Status
curl -X POST "http://localhost:8000/api/v1/orders/1/status" \
-H "Content-Type: application/json" \
-d '{"o_orderstatus": "F"}'
{
"o_orderkey": 1,
"o_orderstatus": "F",
"message": "Order status updated successfully"
}
Resources
Permissions
Your app service principal needs the following permissions:
- Database instance connection access via OAuth tokens
SELECT
permissions on theorders_synced
table in your Lakebase databaseUPDATE
permissions on theorders_synced
table for status updates- Database user role with appropriate table access
See Lakebase permissions for more information.
Dependencies
- Databricks SDK for Python -
databricks-sdk
- FastAPI -
fastapi
- SQLAlchemy -
sqlalchemy
- SQLModel -
sqlmodel
- asyncpg -
asyncpg
- uvicorn -
uvicorn
databricks-sdk>=0.60.0
fastapi
sqlalchemy
sqlmodel
asyncpg
uvicorn