Run a workflow
This recipe triggers a Databricks Workflows job using the Databricks SDK for Python.
Code snippet
app.py
import asyncio
import json

import reflex as rx
from databricks.sdk import WorkspaceClient
def trigger_workflow(job_id: str, params: dict) -> dict:
    """Start a run of a Databricks job and report the outcome as a dict.

    Args:
        job_id: Job identifier as text; it must parse as an integer.
        params: Job parameters passed to ``run_now`` as ``job_parameters``.

    Returns:
        ``{"run_id": ..., "number_in_job": ...}`` when the run was started,
        or ``{"error": "<message>"}`` when anything failed. This function
        never raises for ordinary failures; callers check for the "error" key.
    """
    try:
        # Validate the ID first so a malformed value fails fast, before any
        # client is constructed.
        numeric_job_id = int(job_id)
        # Build the client inside the try so that construction failures are
        # reported through the same {"error": ...} contract as API failures.
        w = WorkspaceClient()
        run = w.jobs.run_now(job_id=numeric_job_id, job_parameters=params)
        return {"run_id": run.run_id, "number_in_job": run.number_in_job}
    except Exception as e:
        # Boundary handler: the caller turns this dict into a UI message.
        return {"error": str(e)}
class TriggerJobState(rx.State):
    """Reflex state for a form that triggers a Databricks job run."""

    # Job ID as typed by the user; trigger_workflow converts it to an int.
    job_id: str = ""
    # Raw JSON text for the job parameters; blank means "no parameters".
    parameters_input: str = ""
    # Pretty-printed JSON of the last successful trigger result.
    result_data: str = ""
    # Message from the most recent failure, or "" when there is none.
    error_message: str = ""
    # True while a trigger request is in progress.
    is_loading: bool = False

    @rx.event(background=True)
    async def trigger_job(self):
        """Trigger the job named by ``job_id`` using ``parameters_input``.

        Runs as a background event: state is only modified inside
        ``async with self`` blocks. Yields toast events describing the
        outcome and always clears ``is_loading`` before finishing.
        """
        async with self:
            self.is_loading = True
            self.error_message = ""
            self.result_data = ""
            if not self.job_id:
                yield rx.toast("Please specify a Job ID.", level="warning")
                self.is_loading = False
                return
            # Snapshot the inputs while holding the state lock so the rest of
            # the handler works from one consistent view of the form.
            job_id = self.job_id
            raw_params = self.parameters_input.strip()
        try:
            # A blank parameters field means "run without parameters";
            # json.loads("") would otherwise raise and block every such run.
            params = json.loads(raw_params) if raw_params else {}
            if not isinstance(params, dict):
                # Valid JSON, but not a key/value object (e.g. a list or a
                # bare string), so it cannot be used as job parameters.
                async with self:
                    yield rx.toast(
                        "Job parameters must be a JSON object", level="error"
                    )
                return
            # trigger_workflow is a synchronous call; run it in a worker
            # thread so it does not stall the event loop while it waits.
            result = await asyncio.to_thread(trigger_workflow, job_id, params)
            async with self:
                if "error" in result:
                    self.error_message = result["error"]
                    yield rx.toast(f"Error: {result['error']}", level="error")
                else:
                    self.result_data = json.dumps(result, indent=2)
                    yield rx.toast(
                        f"Run started: {result.get('run_id')}", level="success"
                    )
        except json.JSONDecodeError:
            async with self:
                yield rx.toast("Invalid JSON parameters", level="error")
        except Exception as e:
            async with self:
                self.error_message = str(e)
        finally:
            # Runs on every exit path from the try block, including the
            # early return for non-object parameters.
            async with self:
                self.is_loading = False
Resources
Permissions
Your app service principal needs the following permissions:
`CAN MANAGE RUN` permission on the job
See Control access to a job for more information.
Dependencies
- Databricks SDK for Python - `databricks-sdk`
- Reflex - `reflex`
requirements.txt
databricks-sdk
reflex