Connect to a cluster

This recipe uses Databricks Connect to run pre-defined Python and SQL code on a shared cluster, using a cluster ID supplied through the app's UI.

Code snippet

app.py
import reflex as rx
import os
import asyncio
from typing import Any
from databricks.connect import DatabricksSession
import pandas as pd

def run_spark_workload(host: str, cluster_id: str):
    # Connect to the shared cluster over Databricks Connect (Spark Connect).
    spark = DatabricksSession.builder.remote(
        host=host,
        cluster_id=cluster_id,
    ).getOrCreate()

    # Basic session details to show in the UI.
    session_info = {
        "App Name": spark.conf.get("spark.app.name", "Unknown"),
        "Master URL": spark.conf.get("spark.master", "Unknown"),
    }

    # Run a pre-defined SQL query and a simple DataFrame workload.
    query = "SELECT 'I''m a stellar cook!' AS message"
    df_sql = spark.sql(query).toPandas()

    df_range = spark.range(10).toPandas()

    return session_info, df_sql.values.tolist(), df_range.values.tolist()


class ConnectClusterState(rx.State):
    cluster_id: str = ""
    session_info: dict = {}
    sql_output: list[list[Any]] = []
    range_output: list[list[Any]] = []
    is_loading: bool = False
    error_message: str = ""
    success_message: str = ""

    @rx.event(background=True)
    async def connect_and_run(self):
        # Background event: state changes require `async with self`.
        async with self:
            self.is_loading = True
            self.error_message = ""
            self.success_message = ""
        try:
            host = os.getenv("DATABRICKS_HOST")
            # Run the blocking Spark workload in a worker thread so the
            # event loop stays responsive.
            loop = asyncio.get_running_loop()
            info, sql_res, range_res = await loop.run_in_executor(
                None, run_spark_workload, host, self.cluster_id
            )
            async with self:
                self.session_info = info
                self.sql_output = sql_res
                self.range_output = range_res
                self.success_message = "Successfully connected to Spark"
        except Exception as e:
            async with self:
                self.error_message = str(e)
        finally:
            async with self:
                self.is_loading = False
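
The state above expects the cluster ID to come from the UI. As a minimal sketch of that wiring (not part of the snippet above, and assuming a recent Reflex release), a page can bind cluster_id to a text input and trigger connect_and_run from a button:

def index() -> rx.Component:
    # Illustrative page: a text input for the cluster ID and a button
    # that triggers the background event defined above.
    return rx.vstack(
        rx.input(
            placeholder="Cluster ID",
            value=ConnectClusterState.cluster_id,
            on_change=ConnectClusterState.set_cluster_id,  # auto-generated setter
        ),
        rx.button(
            "Connect and run",
            on_click=ConnectClusterState.connect_and_run,
            disabled=ConnectClusterState.is_loading,
        ),
        rx.cond(
            ConnectClusterState.error_message != "",
            rx.text(ConnectClusterState.error_message, color="red"),
        ),
        rx.cond(
            ConnectClusterState.success_message != "",
            rx.text(ConnectClusterState.success_message, color="green"),
        ),
        spacing="4",
    )


app = rx.App()
app.add_page(index)

Rendering session_info, sql_output, and range_output is omitted here; any table or text component bound to those state vars works.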

Resources

Permissions

Your app service principal needs the following permissions:

  • CAN ATTACH TO permission on the cluster

See Compute permissions for more information.
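
If the app cannot attach to the cluster, an optional way to check whether its service principal can see the cluster at all is to look it up with the Databricks SDK for Python. This is an illustrative check, not part of the recipe, and assumes the databricks-sdk package is available in the environment; replace the placeholder cluster ID with your own:

from databricks.sdk import WorkspaceClient

# Uses the same DATABRICKS_HOST and service principal credentials as the app.
w = WorkspaceClient()
cluster = w.clusters.get(cluster_id="<your-cluster-id>")  # placeholder cluster ID
print(cluster.cluster_name, cluster.state)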

Dependencies

requirements.txt
databricks-connect
pandas
reflex