
Insert data into a table

This recipe demonstrates how to insert data into a Databricks Unity Catalog table from a FastAPI application using the Databricks SQL Connector.

info

In this example, the API is called with the POST HTTP method, the standard choice for creating new resources in REST APIs, as defined in RFC 7231. Unlike GET, POST is not idempotent: making the same request multiple times may create multiple resources.

POST requests are typically used when submitting data to be processed or when creating new resources, with the request body containing the data to be added.

For detailed specifications, refer to RFC 7231 Section 4.3.3, which defines the POST method's semantics and requirements. RFC 7231 has since been obsoleted by RFC 9110, which covers POST in Section 9.3.3.
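The non-idempotent behavior described above can be illustrated without any web framework. The following is a minimal sketch, assuming a hypothetical in-memory `table` list and `handle_post` function that stand in for the endpoint and the Unity Catalog table; neither is part of the recipe.

```python
from typing import Dict, List

# Hypothetical in-memory stand-in for the target table (illustration only).
table: List[Dict] = []


def handle_post(rows: List[Dict]) -> int:
    """Append every submitted row, as an INSERT would, and return the count."""
    table.extend(rows)
    return len(rows)


payload = [{"trip_id": 1, "fare_amount": 15.0}]

first = handle_post(payload)
second = handle_post(payload)  # a retry of the very same request

# Each call reports one inserted row, but the table now holds a duplicate.
print(first, second, len(table))
```

A client that retries after a timeout can therefore insert the same rows twice, so retries against an endpoint like this one need care.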

Code snippet

app.py
import os
from typing import Dict, List

from fastapi import FastAPI, Request

from databricks import sql
from databricks.sdk.core import Config

DATABRICKS_WAREHOUSE_ID = os.environ.get("DATABRICKS_WAREHOUSE_ID") or None

app = FastAPI()
databricks_cfg = Config()


def get_connection(warehouse_id: str):
    # Build the HTTP path for the requested SQL warehouse
    http_path = f"/sql/1.0/warehouses/{warehouse_id}"
    return sql.connect(
        server_hostname=databricks_cfg.host,
        http_path=http_path,
        credentials_provider=lambda: databricks_cfg.authenticate,
    )


def insert_data(table_path: str, data: List[Dict], warehouse_id: str) -> int:
    # Nothing to insert, and data[0] below would fail on an empty list
    if not data:
        return 0

    try:
        # The context managers close the cursor and the connection on exit
        with get_connection(warehouse_id) as conn:
            with conn.cursor() as cursor:
                # Get columns from the first record
                columns = list(data[0].keys())
                columns_str = ", ".join(columns)
                placeholders = ", ".join(["?"] * len(columns))

                # Build the INSERT statement with multiple VALUES clauses
                values_clauses = []
                all_values = []

                for record in data:
                    values_clauses.append(f"({placeholders})")
                    all_values.extend(record[col] for col in columns)

                # Note: table and column names are interpolated as-is
                insert_query = f"""
                    INSERT INTO {table_path} ({columns_str})
                    VALUES {", ".join(values_clauses)}
                """

                # Execute the insert with all values in a single statement
                print(f"Executing query: {insert_query}")
                cursor.execute(insert_query, all_values)
                return cursor.rowcount

    except Exception as e:
        # Chain the original exception so the root cause stays visible
        raise Exception(f"Failed to insert data: {str(e)}") from e


@app.post("/api/v1/table")
async def insert_table_data(request: Request):
    request_as_json = await request.json()
    request_as_dict = dict(request_as_json)
    results = None
    try:
        # Build the table path
        table_path = f"{request_as_dict['catalog']}.{request_as_dict['schema']}.{request_as_dict['table']}"

        # Insert the data
        records_inserted = insert_data(
            table_path=table_path,
            data=request_as_dict["data"],
            warehouse_id=DATABRICKS_WAREHOUSE_ID,
        )

        # DBSQL may report a negative rowcount; fall back to the number of rows sent
        if records_inserted < 0:
            records_inserted = len(request_as_dict["data"])

        # Create the response
        results = {"data": request_as_dict["data"], "count": records_inserted}
    except Exception as e:
        raise Exception(f"FastAPI Request Failed: {str(e)}") from e

    return {"results": results}
warning

The above example is shortened for brevity and not suitable for production use. You can find a more advanced sample in the databricks-apps-cookbook GitHub repository.
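One gap a production version would need to close is that the catalog, schema, table, and column names are interpolated directly into the SQL text, since the `?` markers used in the example stand in for values, not identifiers. Below is a minimal sketch of one way to guard against this, assuming identifiers are restricted to letters, digits, and underscores. The helper names `quote_identifier` and `build_insert` are hypothetical and are not taken from the cookbook repository.

```python
import re
from typing import Any, Dict, List, Tuple

# Assumption: only simple names are allowed (letters, digits, underscores).
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Validate a single identifier and wrap it in backticks."""
    if not isinstance(name, str) or not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Invalid identifier: {name!r}")
    return f"`{name}`"


def build_insert(
    catalog: str, schema: str, table: str, rows: List[Dict[str, Any]]
) -> Tuple[str, List[Any]]:
    """Return a multi-row INSERT statement and its flattened parameter list."""
    if not rows:
        raise ValueError("No rows to insert")

    columns = list(rows[0].keys())
    # Every row must supply exactly the same columns as the first one.
    for row in rows:
        if set(row.keys()) != set(columns):
            raise ValueError("All rows must have the same columns")

    table_path = ".".join(quote_identifier(p) for p in (catalog, schema, table))
    columns_sql = ", ".join(quote_identifier(c) for c in columns)
    row_sql = "(" + ", ".join(["?"] * len(columns)) + ")"
    values_sql = ", ".join([row_sql] * len(rows))

    query = f"INSERT INTO {table_path} ({columns_sql}) VALUES {values_sql}"
    # Flatten row by row, in column order, to line up with the markers.
    params = [row[col] for row in rows for col in columns]
    return query, params


query, params = build_insert(
    "my_catalog",
    "my_schema",
    "trips",
    [{"trip_id": 1, "tip_amount": 2.0}, {"trip_id": 2, "tip_amount": 3.0}],
)
print(query)
print(params)
```

The returned `query` and `params` could then be passed to `cursor.execute` in place of the hand-built statement. Rejecting unexpected names outright is a deliberately strict choice; a real service might instead check them against an allow-list of known tables and columns.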

Example Usage

In this example, we create a Unity Catalog table called my_catalog.my_schema.trips using the Databricks SQL editor.
This assumes that the user or service principal has the Unity Catalog grants required to make these changes.

CREATE OR REPLACE TABLE my_catalog.my_schema.trips (
  trip_id INT,
  passenger_count INT,
  trip_distance FLOAT,
  pickup_datetime TIMESTAMP,
  dropoff_datetime TIMESTAMP,
  payment_type STRING,
  fare_amount FLOAT,
  tip_amount FLOAT
)
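The table definition above implies a type for each field of a row sent to the API. As a minimal sketch, a hypothetical `Trip` dataclass (not part of the recipe) could coerce and check a record on the client side before it is sent. It assumes timestamps arrive as `YYYY-MM-DD HH:MM:SS` strings, which is the format used by the sample rows in this recipe.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict

# Assumed wire format for the TIMESTAMP columns.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


@dataclass
class Trip:
    trip_id: int
    passenger_count: int
    trip_distance: float
    pickup_datetime: datetime
    dropoff_datetime: datetime
    payment_type: str
    fare_amount: float
    tip_amount: float

    @classmethod
    def from_dict(cls, record: Dict[str, Any]) -> "Trip":
        """Coerce a raw record to the column types; raises on bad input."""
        return cls(
            trip_id=int(record["trip_id"]),
            passenger_count=int(record["passenger_count"]),
            trip_distance=float(record["trip_distance"]),
            pickup_datetime=datetime.strptime(
                record["pickup_datetime"], TIMESTAMP_FORMAT
            ),
            dropoff_datetime=datetime.strptime(
                record["dropoff_datetime"], TIMESTAMP_FORMAT
            ),
            payment_type=str(record["payment_type"]),
            fare_amount=float(record["fare_amount"]),
            tip_amount=float(record["tip_amount"]),
        )


sample = {
    "trip_id": 1,
    "passenger_count": 1,
    "trip_distance": 10.0,
    "pickup_datetime": "2024-01-01 12:00:00",
    "dropoff_datetime": "2024-01-01 12:10:00",
    "payment_type": "credit_card",
    "fare_amount": 15.0,
    "tip_amount": 2.0,
}
trip = Trip.from_dict(sample)
print(trip)
```

A missing field raises `KeyError` and a malformed timestamp raises `ValueError`, so bad records fail before any request is made.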

Once the table has been created, you can send the rows to insert (a list of dicts) to the API endpoint defined above.
The example Python script below shows the POST request and its JSON payload.

insert_data_into_table.py
from databricks.sdk.core import Config
import requests

config = Config(profile="my-env")
token = config.oauth_token().access_token

rows_to_be_inserted = [
    {
        "trip_id": 1,
        "passenger_count": 1,
        "trip_distance": 10.0,
        "pickup_datetime": "2024-01-01 12:00:00",
        "dropoff_datetime": "2024-01-01 12:10:00",
        "payment_type": "credit_card",
        "fare_amount": 15.0,
        "tip_amount": 2.0,
    },
    {
        "trip_id": 2,
        "passenger_count": 1,
        "trip_distance": 86.0,
        "pickup_datetime": "2024-01-01 14:00:00",
        "dropoff_datetime": "2024-01-01 15:13:00",
        "payment_type": "cash",
        "fare_amount": 15.0,
        "tip_amount": 3.0,
    },
    {
        "trip_id": 3,
        "passenger_count": 1,
        "trip_distance": 6.0,
        "pickup_datetime": "2024-01-01 15:31:00",
        "dropoff_datetime": "2024-01-01 15:45:00",
        "payment_type": "cash",
        "fare_amount": 15.0,
        "tip_amount": 3.0,
    },
]

response = requests.post(
    "https://<your-app-url>.databricksapps.com/api/v1/table",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "catalog": "my_catalog",
        "schema": "my_schema",
        "table": "trips",
        "data": rows_to_be_inserted,
    },
)
print(response.json())

If the request was successful, you will get the following output in your terminal:

{'results': {'data': [{'trip_id': 1, 'passenger_count': 1, 'trip_distance': 10.0, 'pickup_datetime': '2024-01-01 12:00:00', 'dropoff_datetime': '2024-01-01 12:10:00', 'payment_type': 'credit_card', 'fare_amount': 15.0, 'tip_amount': 2.0}, {'trip_id': 2, 'passenger_count': 1, 'trip_distance': 86.0, 'pickup_datetime': '2024-01-01 14:00:00', 'dropoff_datetime': '2024-01-01 15:13:00', 'payment_type': 'cash', 'fare_amount': 15.0, 'tip_amount': 3.0}, {'trip_id': 3, 'passenger_count': 1, 'trip_distance': 6.0, 'pickup_datetime': '2024-01-01 15:31:00', 'dropoff_datetime': '2024-01-01 15:45:00', 'payment_type': 'cash', 'fare_amount': 15.0, 'tip_amount': 3.0}], 'count': 3}}
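A caller may want to confirm that the reported count matches the number of rows it sent. The following is a minimal sketch of such a check on the decoded JSON; the helper name `inserted_count` is hypothetical, and it assumes the summary may be nested under a `results` key or sit at the top level.

```python
from typing import Any, Dict


def inserted_count(payload: Dict[str, Any], rows_sent: int) -> int:
    """Return the reported row count, raising if it differs from rows_sent."""
    # Tolerate the summary being nested under "results" or at the top level.
    summary = payload.get("results", payload)
    count = summary["count"]
    if count != rows_sent:
        raise ValueError(f"Sent {rows_sent} rows but the API reported {count}")
    return count


# Example with a trimmed-down response body.
print(inserted_count({"results": {"data": [], "count": 3}}, rows_sent=3))
```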

Resources

Permissions

Your app service principal needs the following permissions:

  • SELECT and MODIFY on the Unity Catalog table
  • CAN USE on the SQL warehouse

See Unity Catalog privileges and securable objects for more information.
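The table privileges listed above can be granted with SQL. The sketch below only renders the statements as strings and does not execute anything. It assumes the service principal is referenced by an identifier you supply (the placeholder `<app-service-principal-id>` is left for you to fill in), and it also grants `USE CATALOG` and `USE SCHEMA`, which Unity Catalog requires on the parent objects before a table can be reached. The helper name `render_grants` is hypothetical. The `CAN USE` permission on the SQL warehouse is managed separately from Unity Catalog grants and is not covered here.

```python
from typing import List


def render_grants(catalog: str, schema: str, table: str, principal: str) -> List[str]:
    """Render GRANT statements giving a principal read/write access to one table."""
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO `{principal}`",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO `{principal}`",
        f"GRANT SELECT, MODIFY ON TABLE {catalog}.{schema}.{table} TO `{principal}`",
    ]


statements = render_grants(
    "my_catalog", "my_schema", "trips", "<app-service-principal-id>"
)
for statement in statements:
    print(statement + ";")
```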

Dependencies

requirements.txt
databricks-sdk
databricks-sql-connector
fastapi
uvicorn