Skip to main content

Run a workflow

This recipe triggers a Databricks Workflows job using the Databricks SDK for Python.

Code snippet

app.py
import reflex as rx
import json
from databricks.sdk import WorkspaceClient

def trigger_workflow(job_id: str, params: dict):
w = WorkspaceClient()
try:
run = w.jobs.run_now(job_id=int(job_id), job_parameters=params)
return {"run_id": run.run_id, "number_in_job": run.number_in_job}
except Exception as e:
return {"error": str(e)}

class TriggerJobState(rx.State):
job_id: str = ""
parameters_input: str = ""
result_data: str = ""
error_message: str = ""
is_loading: bool = False

@rx.event(background=True)
async def trigger_job(self):
async with self:
self.is_loading = True
self.error_message = ""
self.result_data = ""

if not self.job_id:
yield rx.toast("Please specify a Job ID.", level="warning")
self.is_loading = False
return

try:
params = json.loads(self.parameters_input.strip())
result = trigger_workflow(self.job_id, params)

async with self:
if "error" in result:
self.error_message = result["error"]
yield rx.toast(f"Error: {result['error']}", level="error")
else:
self.result_data = json.dumps(result, indent=2)
yield rx.toast(f"Run started: {result.get('run_id')}", level="success")
except json.JSONDecodeError:
async with self:
yield rx.toast("Invalid JSON parameters", level="error")
except Exception as e:
async with self:
self.error_message = str(e)
finally:
async with self:
self.is_loading = False

Resources

Permissions

Your app service principal needs the following permissions:

  • CAN MANAGE RUN permission on the job

See Control access to a job for more information.

Dependencies

requirements.txt
databricks-sdk
reflex