Long-running operations
When you invoke a long-running operation, the SDK provides a high-level API to trigger the operation and wait for the related entities to reach the correct state, or to return the error message in case of failure. All long-running operations return a generic Wait instance with a result() method that returns the result of the operation once it has finished. The Databricks SDK for Python picks reasonable default timeouts for every method, but you can override them by passing a datetime.timedelta() as the timeout argument to result().
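The waiter pattern described above can be sketched as follows. This is a minimal illustration, not the SDK's actual implementation: the Wait class below is a simplified stand-in, and wait_for_demo_entity and the entity_id parameter are hypothetical names made up for the example.

```python
import datetime
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class Wait(Generic[T]):
    """Simplified stand-in for a waiter object (illustration only)."""

    def __init__(self, waiter: Callable[..., T], **bind):
        self._waiter = waiter  # function that blocks until the entity is ready
        self._bind = bind      # identifiers of the entity being waited on

    def result(self,
               timeout: datetime.timedelta = datetime.timedelta(minutes=20),
               callback: Optional[Callable[[T], None]] = None) -> T:
        # Delegate to the blocking waiter, forwarding timeout and callback.
        return self._waiter(timeout=timeout, callback=callback, **self._bind)


def wait_for_demo_entity(entity_id: str,
                         timeout: datetime.timedelta,
                         callback=None) -> dict:
    # Hypothetical waiter: a real one would poll a service until ready.
    entity = {"id": entity_id, "state": "RUNNING",
              "timeout_s": timeout.total_seconds()}
    if callback:
        callback(entity)
    return entity


# The operation returns immediately with a waiter; result() blocks.
waiter = Wait(wait_for_demo_entity, entity_id="demo-123")
info = waiter.result(timeout=datetime.timedelta(minutes=10))
print(info["state"])  # RUNNING
```

The point of the design is that triggering the operation and waiting for it are separate steps, so the caller decides when, and for how long, to block.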
Several Databricks APIs involve long-running operations, such as those for managing:
Clusters
Command execution
Jobs
Libraries
Delta Live Tables pipelines
Databricks SQL warehouses
For example, in the Clusters API, once you create a cluster, you receive a cluster ID, and the cluster is in the PENDING state. Meanwhile, Databricks provisions virtual machines from the cloud provider in the background. The cluster is only usable in the RUNNING state, so you have to wait for that state to be reached.
Another example is the API for running a job or repairing a run: right after the run starts, it is in the PENDING state. The run is only considered finished when it is in either the TERMINATED or SKIPPED state. You will also likely need the error message if the long-running operation times out or fails with an error code. At other times, you may want to configure a custom timeout instead of the default of 20 minutes.
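The waiting logic described above boils down to a polling loop. The sketch below assumes a hypothetical get_state callable that returns a dict with a life_cycle_state key; the function name wait_until_terminal, the poll_interval parameter, and the treatment of INTERNAL_ERROR as a failure state are assumptions for illustration, not the SDK's own code.

```python
import datetime
import time
from typing import Callable, Optional


def wait_until_terminal(get_state: Callable[[], dict],
                        timeout: datetime.timedelta = datetime.timedelta(minutes=20),
                        callback: Optional[Callable[[dict], None]] = None,
                        poll_interval: float = 0.01) -> dict:
    """Poll get_state() until TERMINATED or SKIPPED, or raise on timeout."""
    deadline = time.monotonic() + timeout.total_seconds()
    status_message = "polling..."
    while time.monotonic() < deadline:
        run = get_state()
        state = run["life_cycle_state"]
        status_message = run.get("state_message", state)
        if state in ("TERMINATED", "SKIPPED"):
            return run  # target state reached
        if state == "INTERNAL_ERROR":  # assumed failure state
            raise RuntimeError(f"run failed: {status_message}")
        if callback:
            callback(run)  # report intermediate state to the caller
        time.sleep(poll_interval)
    # Include the last known status so the caller sees why the wait ended.
    raise TimeoutError(f"timed out after {timeout}: {status_message}")


# Simulated run that moves through PENDING and RUNNING before finishing.
states = iter(["PENDING", "RUNNING", "TERMINATED"])
seen = []


def fake_get_run() -> dict:
    return {"life_cycle_state": next(states)}


run = wait_until_terminal(fake_get_run,
                          timeout=datetime.timedelta(seconds=5),
                          callback=lambda r: seen.append(r["life_cycle_state"]))
print(run["life_cycle_state"], seen)  # TERMINATED ['PENDING', 'RUNNING']
```

Carrying the last status message into the timeout error is what lets the caller see why the entity never reached the expected state.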
In the following example, calling result() on the waiter from w.clusters.create returns ClusterInfo only once the cluster is in the RUNNING state; otherwise, it times out after 10 minutes:
import datetime
import logging
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
info = w.clusters.create(cluster_name='Created cluster',
                         spark_version='12.0.x-scala2.12',
                         node_type_id='m5d.large',
                         autotermination_minutes=10,
                         num_workers=1).result(timeout=datetime.timedelta(minutes=10))
logging.info(f'Created: {info}')
See examples/starting_job_and_waiting.py for more advanced usage:
import datetime
import logging
import time
from databricks.sdk import WorkspaceClient
import databricks.sdk.service.jobs as j
w = WorkspaceClient()
# create a sample Python script on DBFS that sleeps for 10 seconds
py_on_dbfs = f'/home/{w.current_user.me().user_name}/sample.py'
with w.dbfs.open(py_on_dbfs, write=True, overwrite=True) as f:
    f.write(b'import time; time.sleep(10); print("Hello, World!")')
# trigger one-time-run job and get waiter object
waiter = w.jobs.submit(run_name=f'py-sdk-run-{time.time()}', tasks=[
    j.RunSubmitTaskSettings(
        task_key='hello_world',
        new_cluster=j.BaseClusterInfo(
            spark_version=w.clusters.select_spark_version(long_term_support=True),
            node_type_id=w.clusters.select_node_type(local_disk=True),
            num_workers=1
        ),
        spark_python_task=j.SparkPythonTask(
            python_file=f'dbfs:{py_on_dbfs}'
        ),
    )
])
logging.info(f'starting to poll: {waiter.run_id}')
# callback that receives the polled entity between state updates
def print_status(run: j.Run):
    statuses = [f'{t.task_key}: {t.state.life_cycle_state}' for t in run.tasks]
    logging.info(f'workflow intermediate status: {", ".join(statuses)}')
# If you want to perform polling in a separate thread, process, or service,
# you can achieve the same result with:
#
#   w.jobs.wait_get_run_job_terminated_or_skipped(
#       run_id=waiter.run_id,
#       timeout=datetime.timedelta(minutes=15),
#       callback=print_status)
#
# The Waiter interface allows for `w.jobs.submit(..).result()` simplicity in
# scenarios where you need to block the calling thread until the job finishes.
run = waiter.result(timeout=datetime.timedelta(minutes=15),
                    callback=print_status)
logging.info(f'job finished: {run.run_page_url}')
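The comments in the example above mention polling from a separate thread. A minimal sketch of that idea follows, using only the Python standard library. The blocking_wait function is a hypothetical stand-in for a blocking call such as waiter.result(...), so the snippet runs without a workspace; the run ID 42 is likewise made up.

```python
import concurrent.futures
import datetime
import time


def blocking_wait(run_id: int, timeout: datetime.timedelta) -> dict:
    # Hypothetical stand-in for a blocking wait on a run; a real call
    # would poll the service until the run finishes or times out.
    time.sleep(0.05)
    return {"run_id": run_id, "life_cycle_state": "TERMINATED"}


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    # Start waiting in a worker thread.
    future = pool.submit(blocking_wait, 42, datetime.timedelta(minutes=15))
    # The calling thread is free to do other work in the meantime.
    other_work = sum(range(1000))
    # Collect the finished run; this timeout bounds the wait on the future.
    run = future.result(timeout=5)

print(run["life_cycle_state"])  # TERMINATED
```

Any exception raised inside the worker, including a timeout from the wait itself, is re-raised by future.result() in the calling thread.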