Insert data into a table
This recipe demonstrates how to insert data into a Databricks Unity Catalog table from a FastAPI application using the Databricks SQL Connector.
In this example, we set up our API to be called using the POST HTTP method, which is the standard choice for creating new resources in REST APIs as defined in RFC 7231.
Unlike GET, POST is not idempotent: making the same request multiple times may create multiple resources.
POST requests are typically used when submitting data to be processed or when creating new resources, with the request body containing the data to be added.
For detailed specifications, refer to RFC 7231 Section 4.3.3 which defines the POST method's semantics and requirements.
Code snippet
import os
import re
from typing import Dict, List

from databricks import sql
from databricks.sdk.core import Config
from fastapi import FastAPI, HTTPException, Request
# SQL warehouse to run statements on; an unset or empty variable becomes None.
DATABRICKS_WAREHOUSE_ID = os.getenv("DATABRICKS_WAREHOUSE_ID") or None

# The FastAPI application that the route decorators below register against.
app = FastAPI()

# Databricks SDK configuration; supplies the workspace host and the
# authentication callable used when opening SQL connections.
databricks_cfg = Config()
def get_connection(warehouse_id: str):
    """Open a Databricks SQL connection to the given SQL warehouse.

    Args:
        warehouse_id: ID of the SQL warehouse to connect to.

    Returns:
        The connection object returned by ``sql.connect``. The caller is
        responsible for closing it.

    Raises:
        ValueError: If ``warehouse_id`` is empty or ``None``.
    """
    # Fail early with a clear message instead of building a path that ends
    # in "None" and failing later inside the connector.
    if not warehouse_id:
        raise ValueError(
            "warehouse_id is required (is DATABRICKS_WAREHOUSE_ID set?)"
        )

    # Use the argument rather than the module-level constant so callers can
    # actually target the warehouse they ask for.
    http_path = f"/sql/1.0/warehouses/{warehouse_id}"
    return sql.connect(
        server_hostname=databricks_cfg.host,
        http_path=http_path,
        credentials_provider=lambda: databricks_cfg.authenticate,
    )
def insert_data(table_path: str, data: List[Dict], warehouse_id: str) -> int:
conn = get_connection(warehouse_id)
try:
with conn.cursor() as cursor:
# Get columns
columns = list(data[0].keys())
columns_str = ", ".join(columns)
placeholders = ", ".join(["?"] * len(columns))
# Build the INSERT statement with multiple VALUES clauses
values_clauses = []
all_values = []
for record in data:
values_clauses.append(f"({placeholders})")
all_values.extend(record[col] for col in columns)
insert_query = f"""
INSERT INTO {table_path} ({columns_str})
VALUES {", ".join(values_clauses)}
"""
# Execute the insert with all values in a single statement
print(f"Executing query: {insert_query}")
cursor.execute(insert_query, all_values)
return cursor.rowcount
except Exception as e:
raise Exception(f"Failed to insert data: {str(e)}")
@app.post("/api/v1/table")
async def insert_table_data(request: Request):
    """Insert the rows from the JSON request body into a Unity Catalog table.

    The body must be a JSON object with the keys ``catalog``, ``schema``,
    ``table`` and ``data``, where ``data`` is a list of row objects.

    Returns:
        ``{"results": {"data": <rows>, "count": <rows inserted>}}``.

    Raises:
        HTTPException: 400 when the body is malformed or incomplete, 500 when
            the insert itself fails.
    """
    # Parse the body separately so client errors are reported as 400, not 500.
    try:
        request_as_dict = dict(await request.json())
    except Exception as e:
        raise HTTPException(
            status_code=400, detail=f"Invalid JSON body: {str(e)}"
        ) from e

    missing = [
        key
        for key in ("catalog", "schema", "table", "data")
        if key not in request_as_dict
    ]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"Missing required field(s): {', '.join(missing)}",
        )

    rows = request_as_dict["data"]
    if not isinstance(rows, list) or not all(isinstance(r, dict) for r in rows):
        raise HTTPException(
            status_code=400, detail="'data' must be a list of objects"
        )

    # An empty list is a valid no-op; do not touch the warehouse.
    if not rows:
        return {"results": {"data": rows, "count": 0}}

    # Build the table path
    table_path = (
        f"{request_as_dict['catalog']}."
        f"{request_as_dict['schema']}."
        f"{request_as_dict['table']}"
    )

    # NOTE: insert_data is a synchronous call, so the event loop is blocked
    # while the statement runs.
    try:
        records_inserted = insert_data(
            table_path=table_path,
            data=rows,
            warehouse_id=DATABRICKS_WAREHOUSE_ID,
        )
    except ValueError as e:
        # Input rejected before execution (e.g. an invalid identifier).
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"FastAPI Request Failed: {str(e)}"
        ) from e

    # Ensure records_inserted is not negative (DBSQL Side Effect)
    if records_inserted < 0:
        records_inserted = len(rows)

    return {"results": {"data": rows, "count": records_inserted}}
The above example is shortened for brevity and not suitable for production use. You can find a more advanced sample in the databricks-apps-cookbook GitHub repository.
Example Usage
In this example, we will create the following Unity Catalog table, called my_catalog.my_schema.trips, via the Databricks SQL Editor.
It is assumed that the user/service principal identity has the appropriate Unity Catalog grants to make changes as required.
-- Target table for the insert example; the column names match the keys of
-- the row dicts sent by the example client script.
CREATE OR REPLACE TABLE my_catalog.my_schema.trips (
  trip_id INT,
  passenger_count INT,
  trip_distance FLOAT,
  pickup_datetime TIMESTAMP,
  dropoff_datetime TIMESTAMP,
  payment_type STRING,
  fare_amount FLOAT,
  tip_amount FLOAT
)
Once the table has been created, you can provide data (list of dicts) to be inserted using the API example provided above.
To illustrate this, see the example Python script below, noting the POST verb and the JSON data payload.
from databricks.sdk.core import Config
import requests

# Obtain an OAuth access token for the "my-env" configuration profile.
config = Config(profile="my-env")
token = config.oauth_token().access_token

# Rows to insert: one dict per row, keyed by column name.
rows_to_be_inserted = [
    {
        "trip_id": 1,
        "passenger_count": 1,
        "trip_distance": 10.0,
        "pickup_datetime": "2024-01-01 12:00:00",
        "dropoff_datetime": "2024-01-01 12:10:00",
        "payment_type": "credit_card",
        "fare_amount": 15.0,
        "tip_amount": 2.0,
    },
    {
        "trip_id": 2,
        "passenger_count": 1,
        "trip_distance": 86.0,
        "pickup_datetime": "2024-01-01 14:00:00",
        "dropoff_datetime": "2024-01-01 15:13:00",
        "payment_type": "cash",
        "fare_amount": 15.0,
        "tip_amount": 3.0,
    },
    {
        "trip_id": 3,
        "passenger_count": 1,
        "trip_distance": 6.0,
        "pickup_datetime": "2024-01-01 15:31:00",
        "dropoff_datetime": "2024-01-01 15:45:00",
        "payment_type": "cash",
        "fare_amount": 15.0,
        "tip_amount": 3.0,
    },
]

response = requests.post(
    "https://<your-app-url>.databricksapps.com/api/v1/table",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "catalog": "my_catalog",
        "schema": "my_schema",
        "table": "trips",
        "data": rows_to_be_inserted,
    },
    # Without a timeout, requests waits indefinitely for an unresponsive app.
    timeout=30,
)

# Show the error body before raising, so a failed request is not mistaken
# for a JSON decoding problem.
if not response.ok:
    print(f"Request failed with status {response.status_code}: {response.text}")
response.raise_for_status()

print(response.json())
If the request was successful, you will get the following output in your terminal:
{'results': {'data': [{'trip_id': 1, 'passenger_count': 1, 'trip_distance': 10.0, 'pickup_datetime': '2024-01-01 12:00:00', 'dropoff_datetime': '2024-01-01 12:10:00', 'payment_type': 'credit_card', 'fare_amount': 15.0, 'tip_amount': 2.0}, {'trip_id': 2, 'passenger_count': 1, 'trip_distance': 86.0, 'pickup_datetime': '2024-01-01 14:00:00', 'dropoff_datetime': '2024-01-01 15:13:00', 'payment_type': 'cash', 'fare_amount': 15.0, 'tip_amount': 3.0}, {'trip_id': 3, 'passenger_count': 1, 'trip_distance': 6.0, 'pickup_datetime': '2024-01-01 15:31:00', 'dropoff_datetime': '2024-01-01 15:45:00', 'payment_type': 'cash', 'fare_amount': 15.0, 'tip_amount': 3.0}], 'count': 3}}
Resources
Permissions
Your app service principal needs the following permissions:
- SELECT and MODIFY on the Unity Catalog table
- CAN USE on the SQL warehouse
See Unity Catalog privileges and securable objects for more information.
Dependencies
- Databricks SDK for Python - databricks-sdk
- Databricks SQL Connector for Python - databricks-sql-connector
- FastAPI - fastapi
- uvicorn - uvicorn
databricks-sdk
databricks-sql-connector
fastapi
uvicorn