PyJaws: A Pythonic Way to Define Databricks Jobs and Workflows
Folks who have used Python-based orchestration tools such as Apache Airflow, Luigi and Mage will be familiar with the concepts and the API of PyJaws.
Install pyjaws:
pip install pyjaws
export DATABRICKS_HOST=...
export DATABRICKS_TOKEN=...
Define your workflow tasks (see examples) and run:
pyjaws create path/to/your/workflow_definitions
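Before deploying, it can help to confirm that both authentication variables are actually set. The following is a minimal sketch and not part of PyJaws: the helper names `missing_env_vars` and `deploy` are hypothetical, and only the variable names and the `pyjaws create` command come from the steps above.

```python
import os
import subprocess

# Variable names taken from the setup steps above.
REQUIRED_VARS = ("DATABRICKS_HOST", "DATABRICKS_TOKEN")


def missing_env_vars(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


def deploy(definitions_path):
    """Run `pyjaws create` on a folder, failing early if auth is missing."""
    missing = missing_env_vars()
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    # Equivalent to typing `pyjaws create <definitions_path>` in a shell.
    subprocess.run(["pyjaws", "create", definitions_path], check=True)
```

Calling `deploy("path/to/your/workflow_definitions")` would then raise a readable error instead of failing partway through a deployment when a variable is missing.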
from pyjaws.api.base import (
    Cluster,
    Runtime,
    Workflow,
)
from pyjaws.api.tasks import PythonWheelTask

# Define the job cluster that the tasks will run on.
cluster = Cluster(
    job_cluster_key="ai_cluster",
    spark_version=Runtime.DBR_13_ML,
    num_workers=2,
    node_type_id="Standard_DS3_v2",
    cluster_log_conf={
        "dbfs": {
            "destination": "dbfs:/home/cluster_log"
        }
    },
)
# Create a Task object.
ingest_task = PythonWheelTask(
    key="ingest",
    cluster=cluster,
    entrypoint="iot",
    task_name="ingest",
    parameters=[
        "my_parameter_value",
        "--output-table", "my_table",
    ],
)

# Create a second task that runs only after ingest_task has finished.
transform_task = PythonWheelTask(
    key="transform",
    cluster=cluster,
    entrypoint="iot",
    task_name="ingest",
    dependencies=[ingest_task],
    parameters=[
        "my_parameter_value2",
        "--input-table", "my_table",
        "--output-table", "output_table",
    ],
)
# Create a Workflow object to define dependencies
# between previously defined tasks.
workflow = Workflow(
    name="my_workflow",
    tasks=[ingest_task, transform_task],
)
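The `dependencies` argument is what orders the tasks: `transform` runs only after `ingest`. As an illustration of that ordering, and not of how PyJaws resolves it internally, the sketch below models the two tasks as a plain dictionary (a hypothetical stand-in for the task objects) and sorts them with the standard library's `graphlib` (Python 3.9+).

```python
from graphlib import TopologicalSorter

# Map each task key to the set of task keys it depends on.
# This dictionary is a hypothetical stand-in for the Task objects above.
task_dependencies = {
    "ingest": set(),
    "transform": {"ingest"},
}


def execution_order(dependencies):
    """Return task keys in an order that respects every dependency."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(task_dependencies)
print(order)  # ['ingest', 'transform']
```

A circular dependency would make `static_order()` raise `graphlib.CycleError`, which is a cheap way to catch a mis-declared workflow before deploying it.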
# Cluster created with a context manager.
# SparkPythonTask and Source must also be imported;
# their import path is not shown in this example.
with Cluster(
    job_cluster_key="mycluster_2",
    spark_version=Runtime.DBR_13_ML,
    node_type_id="Standard_E4ds_v4",
    num_workers=3,
) as cluster_2:
    task_2 = SparkPythonTask(
        key="task_2",
        cluster=cluster_2,
        python_file="/Workspace/Repos/[email protected]/utils/task_2.py",
        source=Source.WORKSPACE,
    )
display(workflow)
Result:
If you have a folder containing multiple workflow definition files written in Python with PyJaws, you can deploy all of them to your Databricks Workspace with a one-liner:
pyjaws create examples/simple_workflow
This would result in the following Workflow being deployed to your workspace:
By default, pyjaws also adds some useful tags to the workflows, indicating which Git repo hosts the Python definition, the commit hash, and when the workflow was last updated. For example:
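To make the tagging idea concrete, here is a rough sketch of how such tags could be assembled from a local Git checkout. It is an illustration under assumptions, not PyJaws' implementation: the tag keys (`git_repo`, `commit_hash`, `last_updated`) and all three helper functions are hypothetical; only the `git` commands themselves are standard.

```python
import subprocess
from datetime import datetime, timezone


def build_tags(repo_url, commit_hash, updated_at):
    """Assemble workflow tags; the key names here are hypothetical."""
    return {
        "git_repo": repo_url,
        "commit_hash": commit_hash,
        "last_updated": updated_at.isoformat(),
    }


def _git(*args):
    """Run a git command in the current directory and return its output."""
    result = subprocess.run(
        ["git", *args], capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


def tags_from_checkout():
    """Read the remote URL and commit hash from the current Git checkout."""
    return build_tags(
        repo_url=_git("config", "--get", "remote.origin.url"),
        commit_hash=_git("rev-parse", "HEAD"),
        updated_at=datetime.now(timezone.utc),
    )
```

Keeping `build_tags` free of side effects means the tag format can be checked without a Git repository at hand.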
To run the tests, make sure tox, pytest, pytest-cov, and coverage are installed and, from a bash terminal, simply run tox.
Feel free to create an issue if you feel something is not right. Contribution guidelines can be found here.