w.pipelines
: Pipelines¶
- class databricks.sdk.service.pipelines.PipelinesAPI¶
The Delta Live Tables API allows you to create, edit, delete, start, and view details about pipelines.
Delta Live Tables is a framework for building reliable, maintainable, and testable data processing pipelines. You define the transformations to perform on your data, and Delta Live Tables manages task orchestration, cluster management, monitoring, data quality, and error handling.
Instead of defining your data pipelines using a series of separate Apache Spark tasks, Delta Live Tables manages how your data is transformed based on a target schema you define for each processing step. You can also enforce data quality with Delta Live Tables expectations. Expectations allow you to define expected data quality and specify how to handle records that fail those expectations.
- create([, allow_duplicate_names: Optional[bool], catalog: Optional[str], channel: Optional[str], clusters: Optional[List[PipelineCluster]], configuration: Optional[Dict[str, str]], continuous: Optional[bool], deployment: Optional[PipelineDeployment], development: Optional[bool], dry_run: Optional[bool], edition: Optional[str], filters: Optional[Filters], id: Optional[str], ingestion_definition: Optional[ManagedIngestionPipelineDefinition], libraries: Optional[List[PipelineLibrary]], name: Optional[str], notifications: Optional[List[Notifications]], photon: Optional[bool], serverless: Optional[bool], storage: Optional[str], target: Optional[str], trigger: Optional[PipelineTrigger]]) CreatePipelineResponse ¶
Usage:
import os import time from databricks.sdk import WorkspaceClient from databricks.sdk.service import pipelines w = WorkspaceClient() notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}' created = w.pipelines.create( continuous=False, name=f'sdk-{time.time_ns()}', libraries=[pipelines.PipelineLibrary(notebook=pipelines.NotebookLibrary(path=notebook_path))], clusters=[ pipelines.PipelineCluster(instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"], label="default", num_workers=1, custom_tags={ "cluster_type": "default", }) ]) # cleanup w.pipelines.delete(pipeline_id=created.pipeline_id)
Create a pipeline.
Creates a new data processing pipeline based on the requested configuration. If successful, this method returns the ID of the new pipeline.
- Parameters:
allow_duplicate_names – bool (optional) If false, deployment will fail if name conflicts with that of another pipeline.
catalog – str (optional) A catalog in Unity Catalog to publish data from this pipeline to. If target is specified, tables in this pipeline are published to a target schema inside catalog (for example, catalog.`target`.`table`). If target is not specified, no data is published to Unity Catalog.
channel – str (optional) DLT Release Channel that specifies which version to use.
clusters – List[
PipelineCluster
] (optional) Cluster settings for this pipeline deployment.configuration – Dict[str,str] (optional) String-String configuration for this pipeline execution.
continuous – bool (optional) Whether the pipeline is continuous or triggered. This replaces trigger.
deployment –
PipelineDeployment
(optional) Deployment type of this pipeline.development – bool (optional) Whether the pipeline is in Development mode. Defaults to false.
dry_run – bool (optional)
edition – str (optional) Pipeline product edition.
filters –
Filters
(optional) Filters on which Pipeline packages to include in the deployed graph.id – str (optional) Unique identifier for this pipeline.
ingestion_definition –
ManagedIngestionPipelineDefinition
(optional) The configuration for a managed ingestion pipeline. These settings cannot be used with the ‘libraries’, ‘target’ or ‘catalog’ settings.libraries – List[
PipelineLibrary
] (optional) Libraries or code needed by this deployment.name – str (optional) Friendly identifier for this pipeline.
notifications – List[
Notifications
] (optional) List of notification settings for this pipeline.photon – bool (optional) Whether Photon is enabled for this pipeline.
serverless – bool (optional) Whether serverless compute is enabled for this pipeline.
storage – str (optional) DBFS root directory for storing checkpoints and tables.
target – str (optional) Target schema (database) to add tables in this pipeline to. If not specified, no data is published to the Hive metastore or Unity Catalog. To publish to Unity Catalog, also specify catalog.
trigger –
PipelineTrigger
(optional) Which pipeline trigger to use. Deprecated: Use continuous instead.
- Returns:
- delete(pipeline_id: str)¶
Delete a pipeline.
Deletes a pipeline.
- Parameters:
pipeline_id – str
- get(pipeline_id: str) GetPipelineResponse ¶
Usage:
import os import time from databricks.sdk import WorkspaceClient from databricks.sdk.service import pipelines w = WorkspaceClient() notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}' created = w.pipelines.create( continuous=False, name=f'sdk-{time.time_ns()}', libraries=[pipelines.PipelineLibrary(notebook=pipelines.NotebookLibrary(path=notebook_path))], clusters=[ pipelines.PipelineCluster(instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"], label="default", num_workers=1, custom_tags={ "cluster_type": "default", }) ]) by_id = w.pipelines.get(pipeline_id=created.pipeline_id) # cleanup w.pipelines.delete(pipeline_id=created.pipeline_id)
Get a pipeline.
- Parameters:
pipeline_id – str
- Returns:
- get_permission_levels(pipeline_id: str) GetPipelinePermissionLevelsResponse ¶
Get pipeline permission levels.
Gets the permission levels that a user can have on an object.
- Parameters:
pipeline_id – str The pipeline for which to get or manage permissions.
- Returns:
- get_permissions(pipeline_id: str) PipelinePermissions ¶
Get pipeline permissions.
Gets the permissions of a pipeline. Pipelines can inherit permissions from their root object.
- Parameters:
pipeline_id – str The pipeline for which to get or manage permissions.
- Returns:
- get_update(pipeline_id: str, update_id: str) GetUpdateResponse ¶
Get a pipeline update.
Gets an update from an active pipeline.
- Parameters:
pipeline_id – str The ID of the pipeline.
update_id – str The ID of the update.
- Returns:
- list_pipeline_events(pipeline_id: str [, filter: Optional[str], max_results: Optional[int], order_by: Optional[List[str]], page_token: Optional[str]]) Iterator[PipelineEvent] ¶
Usage:
import os import time from databricks.sdk import WorkspaceClient from databricks.sdk.service import pipelines w = WorkspaceClient() notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}' created = w.pipelines.create( continuous=False, name=f'sdk-{time.time_ns()}', libraries=[pipelines.PipelineLibrary(notebook=pipelines.NotebookLibrary(path=notebook_path))], clusters=[ pipelines.PipelineCluster(instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"], label="default", num_workers=1, custom_tags={ "cluster_type": "default", }) ]) events = w.pipelines.list_pipeline_events(pipeline_id=created.pipeline_id) # cleanup w.pipelines.delete(pipeline_id=created.pipeline_id)
List pipeline events.
Retrieves events for a pipeline.
- Parameters:
pipeline_id – str
filter –
str (optional) Criteria to select a subset of results, expressed using a SQL-like syntax. The supported filters are: 1. level=’INFO’ (or WARN or ERROR) 2. level in (‘INFO’, ‘WARN’) 3. id=’[event-id]’ 4. timestamp > ‘TIMESTAMP’ (or >=,<,<=,=)
Composite expressions are supported, for example: level in (‘ERROR’, ‘WARN’) AND timestamp> ‘2021-07-22T06:37:33.083Z’
max_results – int (optional) Max number of entries to return in a single page. The system may return fewer than max_results events in a response, even if there are more events available.
order_by – List[str] (optional) A string indicating a sort order by timestamp for the results, for example, [“timestamp asc”]. The sort order can be ascending or descending. By default, events are returned in descending order by timestamp.
page_token – str (optional) Page token returned by previous call. This field is mutually exclusive with all fields in this request except max_results. An error is returned if any fields other than max_results are set when this field is set.
- Returns:
Iterator over
PipelineEvent
- list_pipelines([, filter: Optional[str], max_results: Optional[int], order_by: Optional[List[str]], page_token: Optional[str]]) Iterator[PipelineStateInfo] ¶
Usage:
from databricks.sdk import WorkspaceClient from databricks.sdk.service import pipelines w = WorkspaceClient() all = w.pipelines.list_pipelines(pipelines.ListPipelinesRequest())
List pipelines.
Lists pipelines defined in the Delta Live Tables system.
- Parameters:
filter –
str (optional) Select a subset of results based on the specified criteria. The supported filters are:
notebook=’<path>’ to select pipelines that reference the provided notebook path. * `name LIKE
’[pattern]’` to select pipelines with a name that matches pattern. Wildcards are supported, for example: name LIKE ‘%shopping%’
Composite filters are not supported. This field is optional.
max_results – int (optional) The maximum number of entries to return in a single page. The system may return fewer than max_results events in a response, even if there are more events available. This field is optional. The default value is 25. The maximum value is 100. An error is returned if the value of max_results is greater than 100.
order_by – List[str] (optional) A list of strings specifying the order of results. Supported order_by fields are id and name. The default is id asc. This field is optional.
page_token – str (optional) Page token returned by previous call
- Returns:
Iterator over
PipelineStateInfo
- list_updates(pipeline_id: str [, max_results: Optional[int], page_token: Optional[str], until_update_id: Optional[str]]) ListUpdatesResponse ¶
List pipeline updates.
List updates for an active pipeline.
- Parameters:
pipeline_id – str The pipeline to return updates for.
max_results – int (optional) Max number of entries to return in a single page.
page_token – str (optional) Page token returned by previous call
until_update_id – str (optional) If present, returns updates until and including this update_id.
- Returns:
- set_permissions(pipeline_id: str [, access_control_list: Optional[List[PipelineAccessControlRequest]]]) PipelinePermissions ¶
Set pipeline permissions.
Sets permissions on a pipeline. Pipelines can inherit permissions from their root object.
- Parameters:
pipeline_id – str The pipeline for which to get or manage permissions.
access_control_list – List[
PipelineAccessControlRequest
] (optional)
- Returns:
- start_update(pipeline_id: str [, cause: Optional[StartUpdateCause], full_refresh: Optional[bool], full_refresh_selection: Optional[List[str]], refresh_selection: Optional[List[str]], validate_only: Optional[bool]]) StartUpdateResponse ¶
Start a pipeline.
Starts a new update for the pipeline. If there is already an active update for the pipeline, the request will fail and the active update will remain running.
- Parameters:
pipeline_id – str
cause –
StartUpdateCause
(optional)full_refresh – bool (optional) If true, this update will reset all tables before running.
full_refresh_selection – List[str] (optional) A list of tables to update with fullRefresh. If both refresh_selection and full_refresh_selection are empty, this is a full graph update. Full Refresh on a table means that the states of the table will be reset before the refresh.
refresh_selection – List[str] (optional) A list of tables to update without fullRefresh. If both refresh_selection and full_refresh_selection are empty, this is a full graph update. Full Refresh on a table means that the states of the table will be reset before the refresh.
validate_only – bool (optional) If true, this update only validates the correctness of pipeline source code but does not materialize or publish any datasets.
- Returns:
- stop(pipeline_id: str) Wait[GetPipelineResponse] ¶
Stop a pipeline.
Stops the pipeline by canceling the active update. If there is no active update for the pipeline, this request is a no-op.
- Parameters:
pipeline_id – str
- Returns:
Long-running operation waiter for
GetPipelineResponse
. See :method:wait_get_pipeline_idle for more details.
- stop_and_wait(pipeline_id: str, timeout: datetime.timedelta = 0:20:00) GetPipelineResponse ¶
- update(pipeline_id: str [, allow_duplicate_names: Optional[bool], catalog: Optional[str], channel: Optional[str], clusters: Optional[List[PipelineCluster]], configuration: Optional[Dict[str, str]], continuous: Optional[bool], deployment: Optional[PipelineDeployment], development: Optional[bool], edition: Optional[str], expected_last_modified: Optional[int], filters: Optional[Filters], id: Optional[str], ingestion_definition: Optional[ManagedIngestionPipelineDefinition], libraries: Optional[List[PipelineLibrary]], name: Optional[str], notifications: Optional[List[Notifications]], photon: Optional[bool], serverless: Optional[bool], storage: Optional[str], target: Optional[str], trigger: Optional[PipelineTrigger]])¶
Usage:
import os import time from databricks.sdk import WorkspaceClient from databricks.sdk.service import pipelines w = WorkspaceClient() notebook_path = f'/Users/{w.current_user.me().user_name}/sdk-{time.time_ns()}' created = w.pipelines.create( continuous=False, name=f'sdk-{time.time_ns()}', libraries=[pipelines.PipelineLibrary(notebook=pipelines.NotebookLibrary(path=notebook_path))], clusters=[ pipelines.PipelineCluster(instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"], label="default", num_workers=1, custom_tags={ "cluster_type": "default", }) ]) w.pipelines.update( pipeline_id=created.pipeline_id, name=f'sdk-{time.time_ns()}', libraries=[pipelines.PipelineLibrary(notebook=pipelines.NotebookLibrary(path=notebook_path))], clusters=[ pipelines.PipelineCluster(instance_pool_id=os.environ["TEST_INSTANCE_POOL_ID"], label="default", num_workers=1, custom_tags={ "cluster_type": "default", }) ]) # cleanup w.pipelines.delete(pipeline_id=created.pipeline_id)
Edit a pipeline.
Updates a pipeline with the supplied configuration.
- Parameters:
pipeline_id – str Unique identifier for this pipeline.
allow_duplicate_names – bool (optional) If false, deployment will fail if name has changed and conflicts the name of another pipeline.
catalog – str (optional) A catalog in Unity Catalog to publish data from this pipeline to. If target is specified, tables in this pipeline are published to a target schema inside catalog (for example, catalog.`target`.`table`). If target is not specified, no data is published to Unity Catalog.
channel – str (optional) DLT Release Channel that specifies which version to use.
clusters – List[
PipelineCluster
] (optional) Cluster settings for this pipeline deployment.configuration – Dict[str,str] (optional) String-String configuration for this pipeline execution.
continuous – bool (optional) Whether the pipeline is continuous or triggered. This replaces trigger.
deployment –
PipelineDeployment
(optional) Deployment type of this pipeline.development – bool (optional) Whether the pipeline is in Development mode. Defaults to false.
edition – str (optional) Pipeline product edition.
expected_last_modified – int (optional) If present, the last-modified time of the pipeline settings before the edit. If the settings were modified after that time, then the request will fail with a conflict.
filters –
Filters
(optional) Filters on which Pipeline packages to include in the deployed graph.id – str (optional) Unique identifier for this pipeline.
ingestion_definition –
ManagedIngestionPipelineDefinition
(optional) The configuration for a managed ingestion pipeline. These settings cannot be used with the ‘libraries’, ‘target’ or ‘catalog’ settings.libraries – List[
PipelineLibrary
] (optional) Libraries or code needed by this deployment.name – str (optional) Friendly identifier for this pipeline.
notifications – List[
Notifications
] (optional) List of notification settings for this pipeline.photon – bool (optional) Whether Photon is enabled for this pipeline.
serverless – bool (optional) Whether serverless compute is enabled for this pipeline.
storage – str (optional) DBFS root directory for storing checkpoints and tables.
target – str (optional) Target schema (database) to add tables in this pipeline to. If not specified, no data is published to the Hive metastore or Unity Catalog. To publish to Unity Catalog, also specify catalog.
trigger –
PipelineTrigger
(optional) Which pipeline trigger to use. Deprecated: Use continuous instead.
- update_permissions(pipeline_id: str [, access_control_list: Optional[List[PipelineAccessControlRequest]]]) PipelinePermissions ¶
Update pipeline permissions.
Updates the permissions on a pipeline. Pipelines can inherit permissions from their root object.
- Parameters:
pipeline_id – str The pipeline for which to get or manage permissions.
access_control_list – List[
PipelineAccessControlRequest
] (optional)
- Returns:
- wait_get_pipeline_idle(pipeline_id: str, timeout: datetime.timedelta = 0:20:00, callback: Optional[Callable[[GetPipelineResponse], None]]) GetPipelineResponse ¶
- wait_get_pipeline_running(pipeline_id: str, timeout: datetime.timedelta = 0:20:00, callback: Optional[Callable[[GetPipelineResponse], None]]) GetPipelineResponse ¶