
API Reference

This section provides an overview and links to the Kedro-Dagster API documentation.

Command Line Interface

Kedro-Dagster provides CLI commands to initialize and run the translation of your Kedro project into Dagster.

kedro dagster init

Initializes Dagster integration for your Kedro project by generating the necessary definitions.py and configuration files.

Updates the template of a Kedro project.

Running this command is mandatory to use Kedro-Dagster.

This adds:
  • "conf/base/dagster.yml": the configuration file used to parametrize Dagster runs.
  • "src/<python_package>/definitions.py": the file where all Dagster definitions are set.
Source code in src/kedro_dagster/cli.py
@dagster_commands.command()
@click.option(
    "--env",
    "-e",
    default="base",
    help="The name of the kedro environment where the 'dagster.yml' should be created. Default to 'local'",
)
@click.option(
    "--force",
    "-f",
    is_flag=True,
    default=False,
    help="Update the template without any checks.",
)
@click.option(
    "--silent",
    "-s",
    is_flag=True,
    default=False,
    help="Should message be logged when files are modified?",
)
def init(env: str, force: bool, silent: bool) -> None:
    """Updates the template of a kedro project.

    Running this command is mandatory to use Kedro-Dagster.

    This adds:
     - "conf/base/dagster.yml": This is a configuration file
     used for the dagster run parametrization.
     - "src/<python_package>/definitions.py": This is the
     dagster file where all dagster definitions are set.
    """

    dagster_yml = "dagster.yml"
    project_path = _find_kedro_project(Path.cwd()) or Path.cwd()
    project_metadata = bootstrap_project(project_path)
    package_name = project_metadata.package_name
    dagster_yml_path = project_path / settings.CONF_SOURCE / env / dagster_yml

    if dagster_yml_path.is_file() and not force:
        click.secho(
            click.style(
                f"A 'dagster.yml' already exists at '{dagster_yml_path}' You can use the ``--force`` option to override it.",
                fg="red",
            )
        )
    else:
        try:
            write_jinja_template(
                src=TEMPLATE_FOLDER_PATH / dagster_yml,
                is_cookiecutter=False,
                dst=dagster_yml_path,
                python_package=package_name,
            )
            if not silent:
                click.secho(
                    click.style(
                        f"'{settings.CONF_SOURCE}/{env}/{dagster_yml}' successfully updated.",
                        fg="green",
                    )
                )
        except FileNotFoundError:
            click.secho(
                click.style(
                    f"No env '{env}' found. Please check this folder exists inside '{settings.CONF_SOURCE}' folder.",
                    fg="red",
                )
            )

    definitions_py = "definitions.py"
    definitions_py_path = project_path / "src" / package_name / definitions_py

    if definitions_py_path.is_file() and not force:
        click.secho(
            click.style(
                f"A 'definitions.py' already exists at '{definitions_py_path}' You can use the ``--force`` option to override it.",
                fg="red",
            )
        )
    else:
        write_jinja_template(
            src=TEMPLATE_FOLDER_PATH / definitions_py,
            is_cookiecutter=False,
            dst=definitions_py_path,
            python_package=package_name,
        )
        if not silent:
            click.secho(
                click.style(
                    f"'src/{package_name}/{definitions_py}' successfully updated.",
                    fg="green",
                )
            )

Usage:

uv run kedro dagster init --env <ENV_NAME> --force --silent
  • --env: The Kedro environment where the dagster.yml should be created (default: base).
  • --force: Overwrite existing files without prompting.
  • --silent: Suppress output messages when files are modified.
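
The command can also be invoked programmatically, for instance from a test, using click's CliRunner. This is a minimal sketch; importing init from kedro_dagster.cli is assumed from the source path shown above.

from click.testing import CliRunner
from kedro_dagster.cli import init  # module path taken from the source listing above

runner = CliRunner()
result = runner.invoke(init, ["--env", "base", "--force"])
print(result.output)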

kedro dagster dev

Starts the Dagster development UI and launches your Kedro pipelines as Dagster jobs for interactive development and monitoring.

Opens the dagster dev user interface with the project-specific settings of dagster.yml.

Source code in src/kedro_dagster/cli.py
@dagster_commands.command()
@click.option(
    "--env",
    "-e",
    required=False,
    default="local",
    help="The environment within conf folder we want to retrieve",
)
@click.option(
    "--log-level",
    required=False,
    help="The level of the event tracked by the loggers",
)
@click.option(
    "--log-format",
    required=False,
    help="The format of the logs",
)
@click.option(
    "--port",
    "-p",
    required=False,
    help="The port to listen on",
)
@click.option(
    "--host",
    "-h",
    required=False,
    help="The network address to listen on",
)
@click.option(
    "--live-data-poll-rate",
    required=False,
    help="The rate at which to poll for new data",
)
def dev(
    env: str,
    log_level: Literal["debug", "info", "warning", "error", "critical"],
    log_format: Literal["color", "json", "default"],
    port: str,
    host: str,
    live_data_poll_rate: str,
) -> None:
    """Opens the dagster dev user interface with the
    project-specific settings of `dagster.yml`.
    """

    project_path = _find_kedro_project(Path.cwd()) or Path.cwd()
    bootstrap_project(project_path)

    with KedroSession.create(
        project_path=project_path,
        env=env,
    ) as session:
        context = session.load_context()
        dagster_config = get_dagster_config(context)
        python_file = dagster_config.dev.python_file  # type: ignore[union-attr]
        log_level = log_level or dagster_config.dev.log_level
        log_format = log_format or dagster_config.dev.log_format
        host = host or dagster_config.dev.host  # type: ignore[union-attr]
        port = port or dagster_config.dev.port  # type: ignore[union-attr]
        live_data_poll_rate = live_data_poll_rate or dagster_config.dev.live_data_poll_rate  # type: ignore[union-attr]

        # call dagster dev with specific options
        subprocess.call([
            "dagster",
            "dev",
            "--python-file",
            python_file,
            "--log-level",
            log_level,
            "--log-format",
            log_format,
            "--host",
            host,
            "--port",
            port,
            "--live-data-poll-rate",
            live_data_poll_rate,
        ])

Usage:

uv run kedro dagster dev --env <ENV_NAME> --log-level <LEVEL> --log-format <FORMAT> --port <PORT> --host <HOST> --live-data-poll-rate <RATE>
  • --env: The Kedro environment to use (e.g., local).
  • --log-level: Logging level (debug, info, warning, error, or critical).
  • --log-format: Log output format (colored, json, or rich).
  • --port: Port for the Dagster UI.
  • --host: Host address for the Dagster UI.
  • --live-data-poll-rate: Polling rate for live data in milliseconds.

If provided, these options override the ones set in your conf/<ENV_NAME>/dagster.yml. See DevOptions.

Configuration

The following classes define the configuration schema for Kedro-Dagster's dagster.yml, using Pydantic models.

KedroDagsterConfig

Main configuration class for Kedro-Dagster, representing the structure of the dagster.yml file.

Bases: BaseModel

Main configuration class for Kedro-Dagster, representing the structure of the dagster.yml file.

Attributes:

  • dev (DevOptions | None) –

    Options for kedro dagster dev command.

  • executors (dict[str, ExecutorOptions] | None) –

    Mapping of executor names to executor options.

  • schedules (dict[str, ScheduleOptions] | None) –

    Mapping of schedule names to schedule options.

  • jobs (dict[str, JobOptions] | None) –

    Mapping of job names to job options.
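
As an illustration, the same structure that dagster.yml encodes can be built programmatically from the Pydantic models documented below. This is a sketch only; the import path kedro_dagster.config is an assumption.

from kedro_dagster.config import (  # import path assumed
    KedroDagsterConfig,
    JobOptions,
    PipelineOptions,
    InProcessExecutorOptions,
    ScheduleOptions,
)

config = KedroDagsterConfig(
    executors={"sequential": InProcessExecutorOptions()},
    schedules={"daily": ScheduleOptions(cron_schedule="0 0 * * *")},
    jobs={
        "default": JobOptions(
            pipeline=PipelineOptions(pipeline_name="__default__"),
            executor="sequential",  # string key referencing the executor defined above
            schedule="daily",       # string key referencing the schedule defined above
        )
    },
)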


DevOptions

Options for the kedro dagster dev command.

Bases: BaseModel

Development configuration options for the kedro dagster dev command.

Attributes:

  • log_level (Literal['critical', 'error', 'warning', 'info', 'debug']) –

    Logging level.

  • log_format (Literal['colored', 'json', 'rich']) –

    Format for log output.

  • port (str) –

    Port for the dev server.

  • host (str) –

    Host for the dev server.

  • live_data_poll_rate (str) –

    Poll rate for live data updates in milliseconds.
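
For example, the options backing kedro dagster dev could be instantiated as follows. This is a sketch; defaults and any additional fields (such as the python_file used by the dev command) are omitted, and the import path is assumed.

from kedro_dagster.config import DevOptions  # import path assumed

dev_options = DevOptions(
    log_level="info",
    log_format="colored",
    host="127.0.0.1",
    port="3000",
    live_data_poll_rate="2000",
)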


JobOptions

Configuration options for a Dagster job, including pipeline filtering, executor, and schedule.

Bases: BaseModel

Configuration options for a Dagster job.

Attributes:

  • pipeline (PipelineOptions) –

    PipelineOptions specifying which pipeline and nodes to run.

  • executor (ExecutorOptions | str | None) –

    ExecutorOptions instance or string key referencing an executor.

  • schedule (ScheduleOptions | str | None) –

    ScheduleOptions instance or string key referencing a schedule.


PipelineOptions

Options for filtering and configuring Kedro pipelines within jobs.

Bases: BaseModel

Options for filtering and configuring Kedro pipelines within a Dagster job.

Attributes:

  • pipeline_name (str | None) –

    Name of the Kedro pipeline to run.

  • from_nodes (list[str] | None) –

    List of node names to start execution from.

  • to_nodes (list[str] | None) –

    List of node names to end execution at.

  • node_names (list[str] | None) –

    List of specific node names to include in the pipeline.

  • from_inputs (list[str] | None) –

    List of dataset names to use as entry points.

  • to_outputs (list[str] | None) –

    List of dataset names to use as exit points.

  • node_namespace (str | None) –

    Namespace to filter nodes by.

  • tags (list[str] | None) –

    List of tags to filter nodes by.
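
These fields mirror Kedro's usual pipeline filtering arguments. A sketch, using assumed pipeline, node, and tag names:

from kedro_dagster.config import PipelineOptions  # import path assumed

pipeline_options = PipelineOptions(
    pipeline_name="data_processing",      # assumed pipeline name
    from_nodes=["preprocess_companies"],  # assumed node name
    tags=["training"],                    # assumed tag
)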


ExecutorOptions

Base class for executor configuration. See specific executor option classes below.


InProcessExecutorOptions

Options for the in-process executor.

Bases: BaseModel

Options for the in-process executor.

Attributes:

  • retries (RetriesEnableOptions | RetriesDisableOptions) –

    Retry configuration for the executor.


MultiprocessExecutorOptions

Options for the multiprocess executor.

Bases: InProcessExecutorOptions

Options for the multiprocess executor.

Attributes:

  • retries (RetriesEnableOptions | RetriesDisableOptions) –

    Retry configuration for the executor.

  • max_concurrent (int | None) –

    Maximum number of concurrent processes.
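
A sketch limiting concurrency to four processes; retry options are left untouched, assuming the model provides a default for them.

from kedro_dagster.config import MultiprocessExecutorOptions  # import path assumed

executor_options = MultiprocessExecutorOptions(max_concurrent=4)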


DaskExecutorOptions

Options for the Dask executor.

Bases: BaseModel

Options for the Dask executor.

Attributes:

  • cluster (DaskClusterConfig) –

    Configuration for the Dask cluster.

where DaskClusterConfig is defined as:

Bases: BaseModel

Configuration for the Dask cluster.

Attributes:

  • existing (dict[str, str] | None) –

    Connect to an existing scheduler.

  • local (dict[str, Any] | None) –

    Local cluster configuration.

  • yarn (dict[str, Any] | None) –

    YARN cluster configuration.

  • ssh (dict[str, Any] | None) –

    SSH cluster configuration.

  • pbs (dict[str, Any] | None) –

    PBS cluster configuration.

  • moab (dict[str, Any] | None) –

    Moab cluster configuration.

  • sge (dict[str, Any] | None) –

    SGE cluster configuration.

  • lsf (dict[str, Any] | None) –

    LSF cluster configuration.

  • slurm (dict[str, Any] | None) –

    SLURM cluster configuration.

  • oar (dict[str, Any] | None) –

    OAR cluster configuration.

  • kube (dict[str, Any] | None) –

    Kubernetes cluster configuration.
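
For instance, a local Dask cluster could be configured as follows. This is a sketch; n_workers is a dask.distributed LocalCluster argument used here as an assumed example key.

from kedro_dagster.config import DaskClusterConfig, DaskExecutorOptions  # import path assumed

dask_executor = DaskExecutorOptions(
    cluster=DaskClusterConfig(local={"n_workers": 2}),
)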


DockerExecutorOptions

Options for the Docker-based executor.

Bases: MultiprocessExecutorOptions

Options for the Docker-based executor.

Attributes:

  • retries (RetriesEnableOptions | RetriesDisableOptions) –

    Retry configuration for the executor.

  • max_concurrent (int | None) –

    Maximum number of concurrent processes.

  • image (str | None) –

    Docker image to use.

  • network (str | None) –

    Name of the network to connect the container at creation time.

  • registry (dict[str, str] | None) –

    Information for using a non local/public docker registry.

  • env_vars (list[str]) –

    Environment variables for the container.

  • container_kwargs (dict[str, Any] | None) –

    Key-value pairs for containers.create.

  • networks (list[str]) –

    Names of the networks to connect the container at creation time.


CeleryExecutorOptions

Options for the Celery-based executor.

Bases: BaseModel

Options for the Celery-based executor.

Attributes:

  • broker (str | None) –

    Celery broker URL.

  • backend (str | None) –

    Celery backend URL.

  • include (list[str]) –

    List of modules every worker should import.

  • config_source (dict[str, Any] | None) –

    Additional settings for the Celery app.

  • retries (int | None) –

    Number of retries for the Celery tasks.
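
For instance (a sketch; the broker and backend URLs and the included module are placeholders):

from kedro_dagster.config import CeleryExecutorOptions  # import path assumed

celery_executor = CeleryExecutorOptions(
    broker="pyamqp://guest@localhost//",
    backend="rpc://",
    include=["my_package.tasks"],
)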


CeleryDockerExecutorOptions

Options for the Celery executor with Docker support.

Bases: CeleryExecutorOptions, DockerExecutorOptions

Options for the Celery-based executor which launches tasks as Docker containers.

Attributes:

  • broker (str | None) –

    Celery broker URL.

  • backend (str | None) –

    Celery backend URL.

  • include (list[str]) –

    List of modules every worker should import.

  • config_source (dict[str, Any] | None) –

    Additional settings for the Celery app.

  • retries (int | None) –

    Number of retries for the Celery tasks.

  • image (str | None) –

    Docker image to use.

  • network (str | None) –

    Name of the network to connect the container at creation time.

  • registry (dict[str, str] | None) –

    Information for using a non local/public docker registry.

  • env_vars (list[str]) –

    Environment variables for the container.

  • container_kwargs (dict[str, Any] | None) –

    Key-value pairs for containers.create.

  • networks (list[str]) –

    Names of the networks to connect the container at creation time.

  • max_concurrent (int | None) –

    Maximum number of concurrent processes.

  • retries (RetriesEnableOptions | RetriesDisableOptions) –

    Retry configuration for the executor.


K8sJobExecutorOptions

Options for the Kubernetes-based executor.

Bases: MultiprocessExecutorOptions

Options for the Kubernetes-based executor.

Attributes:

  • retries (RetriesEnableOptions | RetriesDisableOptions) –

    Retry configuration for the executor.

  • max_concurrent (int | None) –

    Maximum number of concurrent processes.

  • job_namespace (str | None) –

    Kubernetes namespace for jobs.

  • load_incluster_config (bool | None) –

    Whether the executor is running within a k8s cluster.

  • kubeconfig_file (str | None) –

    Path to a kubeconfig file to use.

  • step_k8s_config (K8sJobConfig) –

    Raw Kubernetes configuration for each step.

  • per_step_k8s_config (dict[str, K8sJobConfig]) –

    Per op k8s configuration overrides.

  • image_pull_policy (str | None) –

    Image pull policy for Pods.

  • image_pull_secrets (list[dict[str, str]] | None) –

    Credentials for pulling images.

  • service_account_name (str | None) –

    Kubernetes service account name.

  • env_config_maps (list[str] | None) –

    ConfigMapEnvSource names for environment variables.

  • env_secrets (list[str] | None) –

    Secret names for environment variables.

  • env_vars (list[str] | None) –

    Environment variables for the job.

  • volume_mounts (list[dict[str, str]]) –

    Volume mounts for the container.

  • volumes (list[dict[str, str]]) –

    Volumes for the Pod.

  • labels (dict[str, str]) –

    Labels for created pods.

  • resources (dict[str, dict[str, str]] | None) –

    Compute resource requirements.

  • scheduler_name (str | None) –

    Custom Kubernetes scheduler for Pods.

  • security_context (dict[str, str]) –

    Security settings for the container.

where K8sJobConfig is defined as:

Bases: BaseModel

Configuration for Kubernetes jobs.

Attributes:

  • container_config (dict[str, Any] | None) –

    Configuration for the Kubernetes container.

  • pod_spec_config (dict[str, Any] | None) –

    Configuration for the Pod specification.

  • pod_template_spec_metadata (dict[str, Any] | None) –

    Metadata for the Pod template specification.

  • job_spec_config (dict[str, Any] | None) –

    Configuration for the Job specification.

  • job_metadata (dict[str, Any] | None) –

    Metadata for the Job.
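
As a sketch, a Kubernetes executor with a per-step override might look like this; the namespace, resource values, and step name are placeholders, and the import path is assumed.

from kedro_dagster.config import K8sJobConfig, K8sJobExecutorOptions  # import path assumed

k8s_executor = K8sJobExecutorOptions(
    job_namespace="kedro-pipelines",
    max_concurrent=2,
    resources={"requests": {"cpu": "500m", "memory": "1Gi"}},
    per_step_k8s_config={
        "train_model": K8sJobConfig(
            container_config={"resources": {"limits": {"memory": "4Gi"}}},
        )
    },
)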


CeleryK8sJobExecutorOptions

Options for the Celery executor with Kubernetes support.

Bases: CeleryExecutorOptions, K8sJobExecutorOptions

Options for the Celery-based executor which launches tasks as Kubernetes jobs.

Attributes:

  • broker (str | None) –

    Celery broker URL.

  • backend (str | None) –

    Celery backend URL.

  • include (list[str]) –

    List of modules every worker should import.

  • config_source (dict[str, Any] | None) –

    Additional settings for the Celery app.

  • retries (int | None) –

    Number of retries for the Celery tasks.

  • job_namespace (str | None) –

    Kubernetes namespace for jobs.

  • load_incluster_config (bool | None) –

    Whether the executor is running within a k8s cluster.

  • kubeconfig_file (str | None) –

    Path to a kubeconfig file to use.

  • step_k8s_config (K8sJobConfig) –

    Raw Kubernetes configuration for each step.

  • per_step_k8s_config (dict[str, K8sJobConfig]) –

    Per op k8s configuration overrides.

  • image_pull_policy (str | None) –

    Image pull policy for Pods.

  • image_pull_secrets (list[dict[str, str]] | None) –

    Credentials for pulling images.

  • service_account_name (str | None) –

    Kubernetes service account name.

  • env_config_maps (list[str] | None) –

    ConfigMapEnvSource names for environment variables.

  • env_secrets (list[str] | None) –

    Secret names for environment variables.

  • env_vars (list[str] | None) –

    Environment variables for the job.

  • volume_mounts (list[dict[str, str]]) –

    Volume mounts for the container.

  • volumes (list[dict[str, str]]) –

    Volumes for the Pod.

  • labels (dict[str, str]) –

    Labels for created pods.

  • resources (dict[str, dict[str, str]] | None) –

    Compute resource requirements.

  • scheduler_name (str | None) –

    Custom Kubernetes scheduler for Pods.

  • security_context (dict[str, str]) –

    Security settings for the container.

  • job_wait_timeout (float) –

    Wait time in seconds for a job to complete before marking as failed.

where K8sJobConfig is defined as:

Bases: BaseModel

Configuration for Kubernetes jobs.

Attributes:

  • container_config (dict[str, Any] | None) –

    Configuration for the Kubernetes container.

  • pod_spec_config (dict[str, Any] | None) –

    Configuration for the Pod specification.

  • pod_template_spec_metadata (dict[str, Any] | None) –

    Metadata for the Pod template specification.

  • job_spec_config (dict[str, Any] | None) –

    Configuration for the Job specification.

  • job_metadata (dict[str, Any] | None) –

    Metadata for the Job.


ScheduleOptions

Options for defining Dagster schedules.

Bases: BaseModel

Options for defining Dagster schedules.

Attributes:

  • cron_schedule (str) –

    Cron expression for the schedule.

  • execution_timezone (str | None) –

    Timezone in which the schedule should execute.

  • description (str | None) –

    Optional description of the schedule.

  • metadata (dict[str, Any] | None) –

    Additional metadata for the schedule.
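
For example (a sketch; the import path is assumed):

from kedro_dagster.config import ScheduleOptions  # import path assumed

schedule_options = ScheduleOptions(
    cron_schedule="0 6 * * 1-5",       # weekdays at 06:00
    execution_timezone="Europe/Paris",
    description="Morning run of the default job",
)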


Translation Modules

The following classes are responsible for translating Kedro concepts into Dagster constructs:

KedroProjectTranslator

Translates an entire Kedro project into a Dagster code location, orchestrating the translation of pipelines, datasets, hooks, and loggers.

Translate Kedro project into Dagster code location.

Parameters:

  • project_path (Path | None, default: None ) –

    The path to the Kedro project.

  • env (str | None, default: None ) –

    Kedro environment to use.

  • conf_source (str | None, default: None ) –

    Path to the Kedro configuration source directory.

Source code in src/kedro_dagster/translator.py
def __init__(
    self,
    project_path: Path | None = None,
    env: str | None = None,
    conf_source: str | None = None,
) -> None:
    self._project_path: Path
    if project_path is None:
        self._project_path = _find_kedro_project(Path.cwd()) or Path.cwd()
    else:
        self._project_path = project_path

    if env is None:
        # TODO: Double check if this is the right way to get the default environment
        default_run_env = settings._CONFIG_LOADER_ARGS["default_run_env"]
        env = os.getenv("KEDRO_ENV", default_run_env) or ""

    self._env: str = env

    self.initialize_kedro(conf_source=conf_source)
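
A typical entry point, such as the definitions.py generated by kedro dagster init, instantiates the translator and calls to_dagster. A minimal sketch (the import path is assumed); a fuller example is given under DagsterCodeLocation below.

from kedro_dagster import KedroProjectTranslator  # import path assumed

translator = KedroProjectTranslator(env="local")
code_location = translator.to_dagster()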

get_defined_pipelines

get_defined_pipelines(dagster_config, translate_all)

Get pipelines to translate.

Parameters:

  • dagster_config (dict[str, Any]) –

    The configuration of the Dagster job.

  • translate_all (bool) –

    Whether to translate the whole Kedro project.

Returns:

  • list[Pipeline]

    List of Kedro pipelines to translate.

Source code in src/kedro_dagster/translator.py
def get_defined_pipelines(self, dagster_config: "BaseModel", translate_all: bool) -> list["Pipeline"]:
    """Get pipelines to translate.

    Args:
        dagster_config (dict[str, Any]): The configuration of the Dagster job.
        translate_all (bool): Whether to translate the whole Kedro project.
        If ``False``, translates only the pipelines defined in `dagster.yml`.

    Returns:
        list[Pipeline]: List of Kedro pipelines to translate.
    """
    if translate_all:
        return list(find_pipelines().values())

    defined_pipelines = []
    for job_config in dagster_config.jobs.values():
        pipeline_config = job_config.pipeline.model_dump()

        pipeline_name = pipeline_config.get("pipeline_name", "__default__")
        filter_params = get_filter_params_dict(pipeline_config)
        pipeline = pipelines.get(pipeline_name).filter(**filter_params)
        defined_pipelines.append(pipeline)

    return defined_pipelines

initialize_kedro

initialize_kedro(conf_source=None)

Initialize Kedro context and pipelines for translation.

Parameters:

  • conf_source (str | None, default: None ) –

    Optional configuration source directory.

Source code in src/kedro_dagster/translator.py
def initialize_kedro(self, conf_source: str | None = None) -> None:
    """Initialize Kedro context and pipelines for translation.

    Args:
        conf_source (str | None): Optional configuration source directory.
    """
    LOGGER.info("Initializing Kedro project...")

    LOGGER.info("Bootstrapping Kedro project at path: %s", self._project_path)
    self._project_metadata = bootstrap_project(self._project_path)
    LOGGER.info("Project name: %s", self._project_metadata.project_name)

    LOGGER.info("Creating Kedro session...")
    self._session = KedroSession.create(
        project_path=self._project_path,
        env=self._env,
        conf_source=conf_source,
    )

    self._session_id = self._session.session_id
    LOGGER.info("Session created with ID: %s", self._session_id)

    LOGGER.info("Loading Kedro context...")
    self._context = self._session.load_context()

    self._pipelines = find_pipelines()

    LOGGER.info("Kedro initialization complete.")

to_dagster

to_dagster(translate_all=False)

Translate Kedro project into Dagster.

Parameters:

  • translate_all (bool, default: False ) –

    Whether to translate the whole Kedro project.

Returns:

  • DagsterCodeLocation

    The translated Dagster code location.

Source code in src/kedro_dagster/translator.py
def to_dagster(self, translate_all: bool = False) -> DagsterCodeLocation:
    """Translate Kedro project into Dagster.

    Args:
        translate_all (bool): Whether to translate the whole Kedro project.
        If ``False``, translates only the pipelines defined in `dagster.yml`.

    Returns:
        DagsterCodeLocation: The translated Dagster code location.
    """
    LOGGER.info("Translating Kedro project into Dagster...")

    LOGGER.info("Loading Dagster configuration...")
    dagster_config = get_dagster_config(self._context)

    LOGGER.info("Creating Dagster run resources...")
    kedro_run_translator = KedroRunTranslator(
        context=self._context,
        project_path=str(self._project_path),
        env=self._env,
        session_id=self._session_id,
    )
    kedro_run_resource = kedro_run_translator.to_dagster(
        pipeline_name="__default__",
        filter_params={},
    )
    named_resources: dict[str, dg.ResourceDefinition] = {"kedro_run": kedro_run_resource}

    if is_mlflow_enabled():
        # Add MLflow resource if enabled in the Kedro context
        named_resources["mlflow"] = get_mlflow_resource_from_config(self._context.mlflow)

    LOGGER.info("Mapping Dagster loggers...")
    self.logger_creator = LoggerTranslator(
        dagster_config=dagster_config, package_name=self._project_metadata.package_name
    )
    named_loggers = self.logger_creator.to_dagster()

    LOGGER.info("Translating Kedro catalog to Dagster IO managers...")
    defined_pipelines = self.get_defined_pipelines(dagster_config=dagster_config, translate_all=translate_all)
    self.catalog_translator = CatalogTranslator(
        catalog=self._context.catalog,
        pipelines=defined_pipelines,
        hook_manager=self._context._hook_manager,
        env=self._env,
    )
    named_io_managers = self.catalog_translator.to_dagster()
    named_resources |= named_io_managers

    LOGGER.info("Translating Kedro nodes to Dagster ops and assets...")
    self.node_translator = NodeTranslator(
        pipelines=defined_pipelines,
        catalog=self._context.catalog,
        hook_manager=self._context._hook_manager,
        session_id=self._session_id,
        named_resources=named_resources,
        env=self._env,
    )
    named_ops, named_assets = self.node_translator.to_dagster()

    LOGGER.info("Creating Dagster executors...")
    self.executor_creator = ExecutorCreator(dagster_config=dagster_config)
    named_executors = self.executor_creator.create_executors()

    LOGGER.info("Translating Kedro pipelines to Dagster jobs...")
    self.pipeline_translator = PipelineTranslator(
        dagster_config=dagster_config,
        context=self._context,
        project_path=str(self._project_path),
        env=self._env,
        session_id=self._session_id,
        named_assets=named_assets,
        named_ops=named_ops,
        named_resources=named_resources,
        named_executors=named_executors,
        enable_mlflow=is_mlflow_enabled(),
    )
    named_jobs = self.pipeline_translator.to_dagster()

    LOGGER.info("Creating Dagster schedules...")
    self.schedule_creator = ScheduleCreator(dagster_config=dagster_config, named_jobs=named_jobs)
    named_schedules = self.schedule_creator.create_schedules()

    LOGGER.info("Creating Dagster run sensors...")
    named_sensors = kedro_run_translator._translate_on_pipeline_error_hook(named_jobs=named_jobs)

    LOGGER.info("Kedro project successfully translated into Dagster.")

    return DagsterCodeLocation(
        named_resources=named_resources,
        named_assets=named_assets,
        named_ops=named_ops,
        named_jobs=named_jobs,
        named_executors=named_executors,
        named_schedules=named_schedules,
        named_sensors=named_sensors,
        named_loggers=named_loggers,
    )

DagsterCodeLocation

Collects the Dagster job, asset, resource, executor, schedule, sensor, and logger definitions generated for the Dagster code location derived from the Kedro project.

Represents a Kedro-based Dagster code location.

Attributes:

  • named_ops (dict[str, OpDefinition]) –

    A dictionary of named Dagster operations.

  • named_assets (dict[str, AssetSpec | AssetsDefinition]) –

    A dictionary of named Dagster assets.

  • named_resources (dict[str, ResourceDefinition]) –

    A dictionary of named Dagster resources.

  • named_jobs (dict[str, JobDefinition]) –

    A dictionary of named Dagster jobs.

  • named_executors (dict[str, ExecutorDefinition]) –

    A dictionary of named Dagster executors.

  • named_schedules (dict[str, ScheduleDefinition]) –

    A dictionary of named Dagster schedules.

  • named_sensors (dict[str, SensorDefinition]) –

    A dictionary of named Dagster sensors.

  • named_loggers (dict[str, LoggerDefinition]) –

    A dictionary of named Dagster loggers.
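
A minimal sketch of wiring a DagsterCodeLocation into a dagster.Definitions object; the definitions.py generated by kedro dagster init may differ, and the KedroProjectTranslator import path is assumed.

import dagster as dg

from kedro_dagster import KedroProjectTranslator  # import path assumed

location = KedroProjectTranslator(env="local").to_dagster()

defs = dg.Definitions(
    assets=list(location.named_assets.values()),
    jobs=list(location.named_jobs.values()),
    resources=location.named_resources,
    schedules=list(location.named_schedules.values()),
    sensors=list(location.named_sensors.values()),
    loggers=location.named_loggers,
)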


CatalogTranslator

Translates Kedro datasets into Dagster IO managers and assets, enabling seamless data handling between Kedro and Dagster.

Translate Kedro datasets into Dagster IO managers.

Parameters:

  • catalog (CatalogProtocol) –

    The Kedro catalog.

  • pipelines (list[Pipeline]) –

    List of Kedro pipelines to translate.

  • hook_manager (PluginManager) –

    The hook manager to call Kedro hooks.

  • env (str) –

    The Kedro environment.

Source code in src/kedro_dagster/catalog.py
def __init__(
    self, catalog: "CatalogProtocol", pipelines: list["Pipeline"], hook_manager: "PluginManager", env: str
):
    self._catalog = catalog
    self._pipelines = pipelines
    self._hook_manager = hook_manager
    self._env = env

to_dagster

to_dagster()

Get the IO managers from Kedro datasets.

Returns:

  • Dict[str, IOManagerDefinition]

    A dictionary of Dagster IO managers.

Source code in src/kedro_dagster/catalog.py
def to_dagster(self) -> dict[str, dg.IOManagerDefinition]:
    """Get the IO managers from Kedro datasets.

    Returns:
        Dict[str, IOManagerDefinition]: A dictionary of DagsterIO managers.
    """
    named_io_managers = {}
    for dataset_name in sum(self._pipelines, start=Pipeline([])).datasets():
        asset_name = format_dataset_name(dataset_name)
        if _is_asset_name(asset_name):
            try:
                dataset = self._catalog._get_dataset(dataset_name)

            except DatasetNotFoundError:
                LOGGER.debug(
                    f"Dataset `{dataset_name}` not in catalog. It will be "
                    "handled by default IO manager `io_manager`."
                )
                continue

            if isinstance(dataset, MemoryDataset):
                continue

            named_io_managers[f"{self._env}__{asset_name}_io_manager"] = self._translate_dataset(
                dataset,
                dataset_name,
            )

    return named_io_managers

NodeTranslator

Converts Kedro nodes into Dagster ops and assets, handling Kedro parameter passing.

Translate Kedro nodes into Dagster ops and assets.

Parameters:

  • pipelines (list[Pipeline]) –

    List of Kedro pipelines.

  • catalog (CatalogProtocol) –

    Kedro catalog instance.

  • hook_manager (PluginManager) –

    Kedro hook manager.

  • session_id (str) –

    Kedro session ID.

  • named_resources (dict[str, ResourceDefinition]) –

    Named Dagster resources.

  • env (str) –

    Kedro environment.

Source code in src/kedro_dagster/nodes.py
def __init__(
    self,
    pipelines: list[Pipeline],
    catalog: "CatalogProtocol",
    hook_manager: "PluginManager",
    session_id: str,
    named_resources: dict[str, dg.ResourceDefinition],
    env: str,
):
    self._pipelines = pipelines
    self._catalog = catalog
    self._hook_manager = hook_manager
    self._session_id = session_id
    self._named_resources = named_resources
    self._env = env

asset_names property

asset_names

Return a list of all asset names in the pipelines.

create_asset

create_asset(node)

Create a Dagster asset from a Kedro node.

Parameters:

  • node (Node) –

    The Kedro node to wrap into an asset.

Returns:

  • AssetsDefinition

    A Dagster asset.

Source code in src/kedro_dagster/nodes.py
def create_asset(self, node: "Node") -> dg.AssetsDefinition:
    """Create a Dagster asset from a Kedro node.

    Args:
        node (Node): The Kedro node to wrap into an asset.

    Returns:
        AssetsDefinition: A Dagster asset.
    """

    ins = {}
    for dataset_name in node.inputs:
        asset_name = format_dataset_name(dataset_name)
        if _is_asset_name(asset_name):
            asset_key = get_asset_key_from_dataset_name(dataset_name, self._env)
            ins[asset_name] = dg.AssetIn(key=asset_key)

    outs = {}
    for dataset_name in node.outputs:
        asset_name = format_dataset_name(dataset_name)
        asset_key = get_asset_key_from_dataset_name(dataset_name, self._env)
        out_asset_params = self._get_out_asset_params(dataset_name, asset_name, return_kinds=True)
        outs[asset_name] = dg.AssetOut(key=asset_key, **out_asset_params)

    NodeParametersConfig = self._get_node_parameters_config(node)

    required_resource_keys = None
    if is_mlflow_enabled():
        required_resource_keys = {"mlflow"}

    @dg.multi_asset(
        name=f"{format_node_name(node.name)}_asset",
        description=f"Kedro node {node.name} wrapped as a Dagster multi asset.",
        group_name=_get_node_pipeline_name(node),
        ins=ins,
        outs=outs,
        required_resource_keys=required_resource_keys,
        op_tags={f"node_tag_{i + 1}": tag for i, tag in enumerate(node.tags)},
    )
    def dagster_asset(context: dg.AssetExecutionContext, config: NodeParametersConfig, **inputs):  # type: ignore[no-untyped-def, valid-type]
        """Execute the Kedro node as a Dagster asset."""
        context.log.info(f"Running node `{node.name}` in asset.")

        inputs |= config.model_dump()  # type: ignore[attr-defined]
        inputs = {
            unformat_asset_name(input_asset_name): input_asset for input_asset_name, input_asset in inputs.items()
        }

        outputs = node.run(inputs)

        if len(outputs) == 1:
            return list(outputs.values())[0]
        elif len(outputs) > 1:
            return tuple(outputs.values())

    return dagster_asset

create_op

create_op(node)

Create a Dagster op from a Kedro node for use in a Dagster graph.

Parameters:

  • node (Node) –

    Kedro node.

Returns:

  • OpDefinition

    A Dagster op.

Source code in src/kedro_dagster/nodes.py
def create_op(self, node: "Node") -> dg.OpDefinition:
    """Create a Dagster op from a Kedro node for use in a Dagster graph.

    Args:
        node (Node): Kedro node.

    Returns:
        OpDefinition: A Dagster op.
    """
    ins = {}
    for dataset_name in node.inputs:
        asset_name = format_dataset_name(dataset_name)
        if _is_asset_name(asset_name):
            ins[asset_name] = dg.In(asset_key=dg.AssetKey(asset_name))

    out = {}
    for dataset_name in node.outputs:
        asset_name = format_dataset_name(dataset_name)
        out_asset_params = self._get_out_asset_params(dataset_name, asset_name)
        out[asset_name] = dg.Out(**out_asset_params)

    NodeParametersConfig = self._get_node_parameters_config(node)
    op_name = format_node_name(node.name)

    required_resource_keys = []
    for dataset_name in node.inputs + node.outputs:
        asset_name = format_dataset_name(dataset_name)
        if f"{self._env}__{asset_name}_io_manager" in self._named_resources:
            required_resource_keys.append(f"{self._env}__{asset_name}_io_manager")

    if is_mlflow_enabled():
        required_resource_keys.append("mlflow")

    @dg.op(
        name=f"{op_name}",
        description=f"Kedro node {node.name} wrapped as a Dagster op.",
        ins=ins | {"before_pipeline_run_hook_output": dg.In(dagster_type=dg.Nothing)},
        out=out | {f"{op_name}_after_pipeline_run_hook_input": dg.Out(dagster_type=dg.Nothing)},
        required_resource_keys=required_resource_keys,
        tags={f"node_tag_{i + 1}": tag for i, tag in enumerate(node.tags)},
    )
    def node_graph_op(context: dg.OpExecutionContext, config: NodeParametersConfig, **inputs):  # type: ignore[no-untyped-def, valid-type]
        """Execute the Kedro node as a Dagster op."""
        context.log.info(f"Running node `{node.name}` in graph.")

        inputs |= config.model_dump()  # type: ignore[attr-defined]
        inputs = {
            unformat_asset_name(input_asset_name): input_asset for input_asset_name, input_asset in inputs.items()
        }

        self._hook_manager.hook.before_node_run(
            node=node,
            catalog=self._catalog,
            inputs=inputs,
            is_async=False,  # TODO: Should this be True?
            session_id=self._session_id,
        )

        try:
            outputs = node.run(inputs)

        except Exception as exc:
            self._hook_manager.hook.on_node_error(
                error=exc,
                node=node,
                catalog=self._catalog,
                inputs=inputs,
                is_async=False,
                session_id=self._session_id,
            )
            raise exc

        self._hook_manager.hook.after_node_run(
            node=node,
            catalog=self._catalog,
            inputs=inputs,
            outputs=outputs,
            is_async=False,
            session_id=self._session_id,
        )

        for output_dataset_name in node.outputs:
            output_asset_key = get_asset_key_from_dataset_name(output_dataset_name, self._env)
            context.log_event(dg.AssetMaterialization(asset_key=output_asset_key))

        if len(outputs) > 0:
            return tuple(outputs.values()) + (None,)

        return None

    return node_graph_op

to_dagster

to_dagster()

Translate Kedro nodes into Dagster ops and assets.

Returns:

  • dict[str, OpDefinition]

    Dictionary of named ops.

  • dict[str, AssetSpec | AssetsDefinition]

    Dictionary of named assets.

Source code in src/kedro_dagster/nodes.py
def to_dagster(self) -> tuple[dict[str, dg.OpDefinition], dict[str, dg.AssetSpec | dg.AssetsDefinition]]:
    """Translate Kedro nodes into Dagster ops and assets.

    Returns:
        dict[str, dg.OpDefinition]: Dictionary of named ops.
        dict[str, dg.AssetSpec | dg.AssetsDefinition]]: Dictionary of named assets.
    """
    default_pipeline: Pipeline = sum(self._pipelines, start=Pipeline([]))

    # Assets that are not generated through dagster are external and
    # registered with AssetSpec
    named_assets = {}
    for external_dataset_name in default_pipeline.inputs():
        external_asset_name = format_dataset_name(external_dataset_name)
        if _is_asset_name(external_asset_name):
            dataset = self._catalog._get_dataset(external_dataset_name)
            metadata = getattr(dataset, "metadata", None) or {}
            description = metadata.pop("description", "")

            io_manager_key = "io_manager"
            if not isinstance(dataset, MemoryDataset):
                io_manager_key = f"{self._env}__{external_asset_name}_io_manager"

            external_asset_key = get_asset_key_from_dataset_name(external_dataset_name, env=self._env)
            external_asset = dg.AssetSpec(
                key=external_asset_key,
                group_name="external",
                description=description,
                metadata=metadata,
                kinds={"kedro"},
            ).with_io_manager_key(io_manager_key=io_manager_key)
            named_assets[external_asset_name] = external_asset

    # Create assets from Kedro nodes that have outputs
    named_ops = {}
    for node in default_pipeline.nodes:
        op_name = format_node_name(node.name)
        graph_op = self.create_op(node)
        named_ops[f"{op_name}_graph"] = graph_op

        if len(node.outputs):
            asset = self.create_asset(node)
            named_assets[op_name] = asset

    return named_ops, named_assets

PipelineTranslator

Maps Kedro pipelines to Dagster jobs, supporting pipeline filtering, hooks, job configuration, and resource assignment.

Translator for Kedro pipelines to Dagster jobs.

Parameters:

  • dagster_config (dict[str, Any]) –

    The configuration of the Dagster job.

  • context (KedroContext) –

    The Kedro context.

  • project_path (str) –

    The path to the Kedro project.

  • env (str) –

    The Kedro environment.

  • session_id (str) –

    The Kedro session ID.

  • named_assets (dict[str, AssetsDefinition]) –

    The named assets.

  • named_ops (dict[str, OpDefinition]) –

    The named ops.

  • named_resources (dict[str, ResourceDefinition]) –

    The named resources.

  • named_executors (dict[str, ExecutorDefinition]) –

    The named executors.

Source code in src/kedro_dagster/pipelines.py
def __init__(
    self,
    dagster_config: dict[str, Any],
    context: "KedroContext",
    project_path: str,
    env: str,
    session_id: str,
    named_assets: dict[str, dg.AssetsDefinition],
    named_ops: dict[str, dg.OpDefinition],
    named_resources: dict[str, dg.ResourceDefinition],
    named_executors: dict[str, dg.ExecutorDefinition],
    enable_mlflow: bool,
):
    self._dagster_config = dagster_config
    self._context = context
    self._project_path = project_path
    self._env = env
    self._session_id = session_id
    self._catalog = context.catalog
    self._hook_manager = context._hook_manager
    self._named_assets = named_assets
    self._named_ops = named_ops
    self._named_resources = named_resources
    self._named_executors = named_executors
    self._enable_mlflow = enable_mlflow

to_dagster

to_dagster()

Translate the Kedro pipelines into Dagster jobs.

Returns:

  • dict[str, JobDefinition]

    The translated Dagster jobs.

Source code in src/kedro_dagster/pipelines.py
def to_dagster(self) -> dict[str, dg.JobDefinition]:
    """Translate the Kedro pipelines into Dagster jobs.

    Returns:
        dict[str, JobDefinition]: The translated Dagster jobs.
    """
    named_jobs = {}
    for job_name, job_config in self._dagster_config.jobs.items():  # type: ignore[attr-defined]
        pipeline_config = job_config.pipeline.model_dump()

        pipeline_name = pipeline_config.get("pipeline_name", "__default__")
        filter_params = get_filter_params_dict(pipeline_config)
        pipeline = pipelines.get(pipeline_name).filter(**filter_params)

        executor_config = job_config.executor
        if executor_config in self._named_executors:
            executor_def = self._named_executors[executor_config]
        else:
            raise ValueError(f"Executor `{executor_config}` not found.")

        job = self.translate_pipeline(
            pipeline=pipeline,
            pipeline_name=pipeline_name,
            filter_params=filter_params,
            job_name=job_name,
            executor_def=executor_def,
        )

        named_jobs[job_name] = job

    return named_jobs

translate_pipeline

translate_pipeline(pipeline, pipeline_name, filter_params, job_name, executor_def=None, logger_defs=None)

Translate a Kedro pipeline into a Dagster job.

Parameters:

  • pipeline (Pipeline) –

    The Kedro pipeline.

  • pipeline_name (str) –

    The name of the Kedro pipeline.

  • filter_params (dict[str, Any]) –

    Filter parameters for the pipeline.

  • job_name (str) –

    The name of the job.

  • executor_def (ExecutorDefinition, default: None ) –

    The executor definition.

  • logger_defs (dict[str, LoggerDefinition] | None, default: None ) –

    The logger definitions.

Returns:

  • JobDefinition

    A Dagster job definition.

Source code in src/kedro_dagster/pipelines.py
def translate_pipeline(
    self,
    pipeline: Pipeline,
    pipeline_name: str,
    filter_params: dict[str, Any],
    job_name: str,
    executor_def: dg.ExecutorDefinition | None = None,
    logger_defs: dict[str, dg.LoggerDefinition] | None = None,
) -> dg.JobDefinition:
    """Translate a Kedro pipeline into a Dagster job.

    Args:
        pipeline (Pipeline): The Kedro pipeline.
        pipeline_name (str): The name of the Kedro pipeline.
        filter_params (dict[str, Any]): Filter parameters for the pipeline.
        job_name (str): The name of the job.
        executor_def (ExecutorDefinition): The executor definition.
        logger_defs (dict[str, LoggerDefinition] | None): The logger definitions.

    Returns:
        JobDefinition: A Dagster job definition.
    """
    (
        before_pipeline_run_hook,
        after_pipeline_run_hook,
    ) = self._create_pipeline_hook_ops(job_name, pipeline)

    @dg.graph(
        name=f"{self._env}__{job_name}",
        description=f"Job derived from pipeline associated to `{job_name}` in env `{self._env}`.",
        out=None,
    )
    def pipeline_graph() -> None:
        before_pipeline_run_hook_output = before_pipeline_run_hook()

        # Fill up materialized_assets with pipeline input assets
        materialized_input_assets = {}
        for dataset_name in pipeline.inputs():
            asset_name = format_dataset_name(dataset_name)
            if _is_asset_name(asset_name):
                # First, we account for external assets
                if asset_name in self._named_assets:
                    materialized_input_assets[asset_name] = self._named_assets[asset_name]
                else:
                    asset_key = get_asset_key_from_dataset_name(dataset_name, self._env)
                    materialized_input_assets[asset_name] = dg.AssetSpec(
                        key=asset_key,
                    ).with_io_manager_key(f"{self._env}__{asset_name}_io_manager")

        materialized_output_assets: dict[str, Any] = {}
        for layer in pipeline.grouped_nodes:
            for node in layer:
                op_name = format_node_name(node.name) + "_graph"
                op = self._named_ops[op_name]

                materialized_input_assets_op = {}
                for input_dataset_name in node.inputs:
                    input_asset_name = format_dataset_name(input_dataset_name)
                    if input_asset_name in materialized_input_assets:
                        materialized_input_assets_op[input_asset_name] = materialized_input_assets[input_asset_name]

                materialized_outputs = op(
                    before_pipeline_run_hook_output=before_pipeline_run_hook_output,
                    **materialized_input_assets_op,
                )

                if len(node.outputs) == 0:
                    materialized_output_assets_op = {materialized_outputs.output_name: materialized_outputs}
                else:
                    materialized_output_assets_op = {
                        materialized_output.output_name: materialized_output
                        for materialized_output in materialized_outputs
                    }
                materialized_input_assets |= materialized_output_assets_op
                materialized_output_assets |= materialized_output_assets_op

        after_pipeline_run_hook(**materialized_output_assets)

    # Overrides the kedro_run resource with the one created for the job
    kedro_run_translator = KedroRunTranslator(
        context=self._context,
        project_path=self._project_path,
        env=self._env,
        session_id=self._session_id,
    )
    kedro_run_resource = kedro_run_translator.to_dagster(
        pipeline_name=pipeline_name,
        filter_params=filter_params,
    )
    resource_defs = {"kedro_run": kedro_run_resource}

    for dataset_name in pipeline.all_inputs() | pipeline.all_outputs():
        asset_name = format_dataset_name(dataset_name)
        if f"{self._env}__{asset_name}_io_manager" in self._named_resources:
            resource_defs[f"{self._env}__{asset_name}_io_manager"] = self._named_resources[
                f"{self._env}__{asset_name}_io_manager"
            ]

    if self._enable_mlflow and is_mlflow_enabled():
        resource_defs |= {"mlflow": self._named_resources["mlflow"]}

    job = pipeline_graph.to_job(
        name=f"{self._env}__{job_name}",
        resource_defs=resource_defs,
        executor_def=executor_def,
        logger_defs=logger_defs,
    )

    return job

KedroRunTranslator

Manages translation of Kedro run parameters and hooks into Dagster resources and sensors, including error handling and context propagation.

Translator for Kedro run params.

Parameters:

  • context (KedroContext) –

    Kedro context.

  • project_path (str) –

    Path to the Kedro project.

  • env (str) –

    Kedro environment.

  • session_id (str) –

    Kedro session ID.

Source code in src/kedro_dagster/kedro.py
def __init__(self, context: "KedroContext", project_path: str, env: str, session_id: str):
    self._context = context
    self._catalog = context.catalog
    self._hook_manager = context._hook_manager
    self._kedro_params = dict(
        project_path=project_path,
        env=env,
        session_id=session_id,
        kedro_version=kedro_version,
    )

to_dagster

to_dagster(pipeline_name, filter_params)

Create a Dagster resource for Kedro pipeline hooks.

Parameters:

  • pipeline_name (str) –

    Name of the Kedro pipeline.

  • filter_params (dict[str, Any]) –

    Parameters used to filter the pipeline.

Returns:

  • ConfigurableResource

    A Dagster resource for Kedro pipeline hooks.

Source code in src/kedro_dagster/kedro.py
def to_dagster(
    self,
    pipeline_name: str,
    filter_params: dict[str, Any],
) -> dg.ConfigurableResource:
    """Create a Dagster resource for Kedro pipeline hooks.

    Args:
        pipeline_name (str): Name of the Kedro pipeline.
        filter_params (dict[str, Any]): Parameters used to filter the pipeline.

    Returns:
        ConfigurableResource: A Dagster resource for Kedro pipeline hooks.

    """

    context = self._context
    hook_manager = self._hook_manager

    class RunParamsModel(dg.Config):
        session_id: str
        project_path: str
        env: str
        kedro_version: str
        pipeline_name: str
        load_versions: list[str] | None = None
        extra_params: dict[str, Any] | None = None
        runner: str | None = None
        node_names: list[str] | None = None
        from_nodes: list[str] | None = None
        to_nodes: list[str] | None = None
        from_inputs: list[str] | None = None
        to_outputs: list[str] | None = None
        node_namespace: str | None = None
        tags: list[str] | None = None

        class Config:
            # force triggering type control when setting value instead of init
            validate_assignment = True
            # raise an error if an unknown key is passed to the constructor
            extra = "forbid"

    class KedroRunResource(RunParamsModel, dg.ConfigurableResource):
        """Resource for Kedro context."""

        @property
        def run_params(self) -> dict[str, Any]:
            return self.model_dump()  # type: ignore[no-any-return]

        @property
        def pipeline(self) -> dict[str, Any]:
            return pipelines.get(self.pipeline_name).filter(  # type: ignore[no-any-return]
                tags=self.tags,
                from_nodes=self.from_nodes,
                to_nodes=self.to_nodes,
                node_names=self.node_names,
                from_inputs=self.from_inputs,
                to_outputs=self.to_outputs,
                node_namespace=self.node_namespace,
            )

        def after_context_created_hook(self) -> None:
            hook_manager.hook.after_context_created(context=context)

    run_params = (
        self._kedro_params
        | filter_params
        | dict(
            pipeline_name=pipeline_name,
            load_versions=None,
            extra_params=None,
            runner=None,
        )
    )

    return KedroRunResource(**run_params)

ExecutorCreator

Creates Dagster executors from configuration, allowing for granular execution strategies.

Creates Dagster executor definitions from Kedro configuration.

Parameters:

  • dagster_config (BaseModel) –

    The Dagster configuration.

Source code in src/kedro_dagster/dagster.py
def __init__(self, dagster_config: "BaseModel"):
    self._dagster_config = dagster_config

create_executors

create_executors()

Create executor definitions from the configuration.

Returns:

  • Dict[str, ExecutorDefinition]

    A dict of executor definitions.

Source code in src/kedro_dagster/dagster.py
def create_executors(self) -> dict[str, dg.ExecutorDefinition]:
    """Create executor definitions from the configuration.

    Returns:
        Dict[str, ExecutorDefinition]: A dict of executor definitions.
    """
    # Register all available executors dynamically
    for executor_option, module_name, executor_name in self._EXECUTOR_CONFIGS:
        try:
            module = __import__(module_name, fromlist=[executor_name])
            executor = getattr(module, executor_name)
            self.register_executor(executor_option, executor)
        except ImportError:
            pass

    named_executors = {}
    for executor_name, executor_config in self._dagster_config.executors.items():
        # Make use of the executor map to create the executor
        executor = self._OPTION_EXECUTOR_MAP.get(type(executor_config), None)
        if executor is None:
            raise ValueError(
                f"Executor {executor_name} not supported. "
                "Please use one of the following executors: "
                f"{', '.join([str(k) for k in self._OPTION_EXECUTOR_MAP.keys()])}"
            )
        executor = executor.configured(executor_config.model_dump())
        named_executors[executor_name] = executor

    return named_executors

register_executor

register_executor(executor_option, executor)

Register an executor option with a Dagster executor.

Parameters:

  • executor_option (BaseModel) –

    The executor option to register.

  • executor (ExecutorDefinition) –

    The executor to register the option with.

Source code in src/kedro_dagster/dagster.py
def register_executor(self, executor_option: "BaseModel", executor: dg.ExecutorDefinition) -> None:
    """Register an executor option with a Dagster executor.

    Args:
        executor_option (BaseModel): The executor option to register.
        executor (ExecutorDefinition): The executor to register the option with.
    """
    self._OPTION_EXECUTOR_MAP[executor_option] = executor
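
A sketch of registering an additional mapping before calling create_executors. Here MyExecutorOptions is a hypothetical Pydantic options model; dagster.in_process_executor is one of Dagster's built-in executors. Note that create_executors looks the executor up by the type of the configured option, so the option class itself serves as the key:

import dagster as dg

# MyExecutorOptions is a placeholder for your own options model
executor_creator.register_executor(MyExecutorOptions, dg.in_process_executor)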

LoggerTranslator

Translates Kedro loggers to Dagster loggers for unified logging across both frameworks.

Translates Kedro loggers to Dagster loggers.

Source code in src/kedro_dagster/dagster.py
def __init__(self, dagster_config: "BaseModel", package_name: str):
    self._dagster_config = dagster_config
    self._package_name = package_name

to_dagster

to_dagster()

Translate Kedro loggers to Dagster loggers.

Source code in src/kedro_dagster/dagster.py
def to_dagster(self) -> dict[str, dg.LoggerDefinition]:
    """Translate Kedro loggers to Dagster loggers."""
    named_loggers = {}
    for pipeline_name in pipelines:
        if pipeline_name != "__default__":
            named_loggers[f"{self._package_name}.pipelines.{pipeline_name}.nodes"] = self._get_logger_definition(
                self._package_name, pipeline_name
            )

    return named_loggers
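
A usage sketch, assuming `dagster_config` is the loaded Kedro-Dagster configuration and "my_package" stands in for your project's package name:

from kedro_dagster.dagster import LoggerTranslator

logger_translator = LoggerTranslator(dagster_config=dagster_config, package_name="my_package")
named_loggers = logger_translator.to_dagster()
# keys follow the pattern "my_package.pipelines.<pipeline_name>.nodes"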

ScheduleCreator

Generates Dagster schedules from configuration, enabling automated pipeline execution.

Creates Dagster schedule definitions from Kedro configuration.

Source code in src/kedro_dagster/dagster.py
def __init__(self, dagster_config: "BaseModel", named_jobs: dict[str, dg.JobDefinition]):
    self._dagster_config = dagster_config
    self._named_jobs = named_jobs

create_schedules

create_schedules()

Create schedule definitions from the configuration.

Returns:

  • Dict[str, ScheduleDefinition]

    A dict of schedule definitions.

Source code in src/kedro_dagster/dagster.py
def create_schedules(self) -> dict[str, dg.ScheduleDefinition]:
    """Create schedule definitions from the configuration.

    Returns:
        Dict[str, ScheduleDefinition]: A dict of schedule definitions.

    """
    named_schedule_config = {}
    if self._dagster_config.schedules is not None:
        for schedule_name, schedule_config in self._dagster_config.schedules.items():
            named_schedule_config[schedule_name] = schedule_config.model_dump()

    named_schedules = {}
    for job_name, job_config in self._dagster_config.jobs.items():
        schedule_config = job_config.schedule
        if isinstance(schedule_config, str):
            schedule_name = schedule_config
            if schedule_name in named_schedule_config:
                schedule = dg.ScheduleDefinition(
                    name=f"{job_name}_{schedule_name}_schedule",
                    job=self._named_jobs[job_name],
                    **named_schedule_config[schedule_name],
                )
            else:
                raise ValueError(
                    f"Schedule defined by {schedule_config} not found. "
                    "Please make sure the schedule is defined in the configuration."
                )

            named_schedules[job_name] = schedule

    return named_schedules
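
A usage sketch, assuming `dagster_config` is the loaded configuration and `named_jobs` is the dict of Dagster job definitions already created for the project (both variable names are illustrative):

from kedro_dagster.dagster import ScheduleCreator

schedule_creator = ScheduleCreator(dagster_config=dagster_config, named_jobs=named_jobs)
named_schedules = schedule_creator.create_schedules()
# one ScheduleDefinition per job whose `schedule` key references an entry
# of the `schedules` section of the configuration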

Utilities

Helper functions for formatting, filtering, and supporting translation between Kedro and Dagster concepts.

Utility functions.

format_dataset_name

format_dataset_name(name)

Convert a dataset name so that it is valid under Dagster's naming convention.

Parameters:

  • name (str) –

    The name to format.

Returns:

  • str

    The formatted name.

Source code in src/kedro_dagster/utils.py
def format_dataset_name(name: str) -> str:
    """Convert a dataset name so that it is valid under Dagster's naming convention.

    Args:
        name (str): The name to format.

    Returns:
        str: The formatted name.
    """

    return name.replace(".", "__")
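
For example, a namespaced Kedro dataset name becomes:

from kedro_dagster.utils import format_dataset_name

format_dataset_name("companies.preprocessed")  # -> "companies__preprocessed"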

format_node_name

format_node_name(name)

Convert a node name so that it is valid under Dagster's naming convention.

Parameters:

  • name (str) –

    The node name to format.

Returns:

  • str

    The formatted name.

Source code in src/kedro_dagster/utils.py
def format_node_name(name: str) -> str:
    """Convert a node name so that it is valid under Dagster's naming convention.

    Args:
        name (str): The node name to format.

    Returns:
        str: The formatted name.
    """
    dagster_name = name.replace(".", "__")

    allowed_pattern = re.compile(r"^[A-Za-z0-9_]+$")
    if not allowed_pattern.match(dagster_name):
        dagster_name = f"unnamed_node_{hashlib.md5(name.encode('utf-8')).hexdigest()}"
        LOGGER.warning(
            "Node is either unnamed or not in regex ^[A-Za-z0-9_]+$. "
            "Prefer naming your Kedro nodes directly using a `name`. "
            f"Node named `{name}` has been converted to `{dagster_name}`."
        )

    return dagster_name
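
For example (the node names are illustrative):

from kedro_dagster.utils import format_node_name

format_node_name("data_processing.preprocess_companies")
# -> "data_processing__preprocess_companies"

format_node_name("<lambda>")
# -> "unnamed_node_<md5 of the original name>", and a warning is logged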

get_asset_key_from_dataset_name

get_asset_key_from_dataset_name(dataset_name, env)

Get a Dagster AssetKey from a Kedro dataset name and environment.

Parameters:

  • dataset_name (str) –

    The Kedro dataset name.

  • env (str) –

    The Kedro environment.

Returns:

  • AssetKey

    The corresponding Dagster AssetKey.

Source code in src/kedro_dagster/utils.py
def get_asset_key_from_dataset_name(dataset_name: str, env: str) -> dg.AssetKey:
    """Get a Dagster AssetKey from a Kedro dataset name and environment.

    Args:
        dataset_name (str): The Kedro dataset name.
        env (str): The Kedro environment.

    Returns:
        AssetKey: The corresponding Dagster AssetKey.
    """
    return dg.AssetKey([env] + dataset_name.split("."))
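
For example:

from kedro_dagster.utils import get_asset_key_from_dataset_name

get_asset_key_from_dataset_name("companies.preprocessed", env="base")
# -> AssetKey(["base", "companies", "preprocessed"])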

get_filter_params_dict

get_filter_params_dict(pipeline_config)

Extract filter parameters from a pipeline config dict.

Parameters:

  • pipeline_config (dict[str, Any]) –

    Pipeline configuration.

Returns:

  • dict[str, Any]

    Filter parameters.

Source code in src/kedro_dagster/utils.py
def get_filter_params_dict(pipeline_config: dict[str, Any]) -> dict[str, Any]:
    """Extract filter parameters from a pipeline config dict.

    Args:
        pipeline_config (dict[str, Any]): Pipeline configuration.

    Returns:
        dict[str, Any]: Filter parameters.
    """
    filter_params = dict(
        tags=pipeline_config.get("tags"),
        from_nodes=pipeline_config.get("from_nodes"),
        to_nodes=pipeline_config.get("to_nodes"),
        node_names=pipeline_config.get("node_names"),
        from_inputs=pipeline_config.get("from_inputs"),
        to_outputs=pipeline_config.get("to_outputs"),
        node_namespace=pipeline_config.get("node_namespace"),
    )

    return filter_params
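
For example, only the recognised filter keys are picked out of a pipeline configuration; missing keys default to None (the config dict below is illustrative):

from kedro_dagster.utils import get_filter_params_dict

pipeline_config = {"pipeline_name": "__default__", "tags": ["training"]}
get_filter_params_dict(pipeline_config)
# -> {"tags": ["training"], "from_nodes": None, "to_nodes": None,
#     "node_names": None, "from_inputs": None, "to_outputs": None,
#     "node_namespace": None}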

get_mlflow_resource_from_config

get_mlflow_resource_from_config(mlflow_config)

Create a Dagster resource definition from MLflow config.

Parameters:

  • mlflow_config (BaseModel) –

    MLflow configuration.

Returns:

  • ResourceDefinition

    Dagster resource definition for MLflow.

Source code in src/kedro_dagster/utils.py
def get_mlflow_resource_from_config(mlflow_config: "BaseModel") -> dg.ResourceDefinition:
    """Create a Dagster resource definition from MLflow config.

    Args:
        mlflow_config (BaseModel): MLflow configuration.

    Returns:
        ResourceDefinition: Dagster resource definition for MLflow.
    """
    from dagster_mlflow import mlflow_tracking

    mlflow_resource = mlflow_tracking.configured({
        "experiment_name": mlflow_config.tracking.experiment.name,
        "mlflow_tracking_uri": mlflow_config.server.mlflow_tracking_uri,
        "parent_run_id": None,
    })

    return mlflow_resource

is_mlflow_enabled

is_mlflow_enabled()

Check if MLflow is enabled in the Kedro context.

Returns:

  • bool

    True if MLflow is enabled, False otherwise.

Source code in src/kedro_dagster/utils.py
def is_mlflow_enabled() -> bool:
    """Check if MLflow is enabled in the Kedro context.

    Returns:
        bool: True if MLflow is enabled, False otherwise.
    """
    try:
        import kedro_mlflow  # NOQA
        import mlflow  # NOQA

        return True
    except ImportError:
        return False
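
A sketch combining this check with get_mlflow_resource_from_config above; `mlflow_config` stands in for a kedro-mlflow configuration object exposing the `tracking.experiment.name` and `server.mlflow_tracking_uri` attributes read by that function:

from kedro_dagster.utils import get_mlflow_resource_from_config, is_mlflow_enabled

resources = {}
if is_mlflow_enabled():
    # only wire the MLflow resource when kedro-mlflow and mlflow are installed
    resources["mlflow"] = get_mlflow_resource_from_config(mlflow_config)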

render_jinja_template

render_jinja_template(src, is_cookiecutter=False, **kwargs)

Render a Jinja template from a file or string.

Parameters:

  • src (str | Path) –

    Path to the template file or template string.

  • is_cookiecutter (bool, default: False ) –

    Whether to use cookiecutter-style rendering.

  • **kwargs

    Variables to pass to the template.

Returns:

  • str

    Rendered template as a string.

Source code in src/kedro_dagster/utils.py
def render_jinja_template(src: str | Path, is_cookiecutter=False, **kwargs) -> str:  # type: ignore[no-untyped-def]
    """Render a Jinja template from a file or string.

    Args:
        src (str | Path): Path to the template file or template string.
        is_cookiecutter (bool): Whether to use cookiecutter-style rendering.
        **kwargs: Variables to pass to the template.

    Returns:
        str: Rendered template as a string.
    """
    src = Path(src)

    template_loader = FileSystemLoader(searchpath=src.parent.as_posix())
    # the keep_trailing_newline option is mandatory to
    # make sure that black formatting will be preserved
    template_env = Environment(loader=template_loader, keep_trailing_newline=True)
    template = template_env.get_template(src.name)
    if is_cookiecutter:
        # we need to match tags from a cookiecutter object
        # but cookiecutter only deals with folder, not file
        # thus we need to create an object with all necessary attributes
        class FalseCookieCutter:
            def __init__(self, **kwargs):  # type: ignore[no-untyped-def]
                self.__dict__.update(kwargs)

        parsed_template = template.render(cookiecutter=FalseCookieCutter(**kwargs))  # type: ignore[no-untyped-call]
    else:
        parsed_template = template.render(**kwargs)

    return parsed_template  # type: ignore[no-any-return]
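
A sketch with an illustrative template path; every keyword argument becomes a variable available inside the template:

from kedro_dagster.utils import render_jinja_template

rendered = render_jinja_template("templates/definitions.py", python_package="my_package")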

unformat_asset_name

unformat_asset_name(name)

Convert a Dagster-formatted asset name back to Kedro's naming convention.

Parameters:

  • name (str) –

    The Dagster-formatted name.

Returns:

  • str

    The original Kedro name.

Source code in src/kedro_dagster/utils.py
def unformat_asset_name(name: str) -> str:
    """Convert a Dagster-formatted asset name back to Kedro's naming convention.

    Args:
        name (str): The Dagster-formatted name.

    Returns:
        str: The original Kedro name.
    """

    return name.replace("__", ".")
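
This is the inverse of format_dataset_name, so names round-trip as long as the original Kedro name contains no double underscores:

from kedro_dagster.utils import format_dataset_name, unformat_asset_name

unformat_asset_name("companies__preprocessed")  # -> "companies.preprocessed"
unformat_asset_name(format_dataset_name("companies.preprocessed"))  # round-trips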

write_jinja_template

write_jinja_template(src, dst, **kwargs)

Render and write a Jinja template to a destination file.

Parameters:

  • src (str | Path) –

    Path to the template file.

  • dst (str | Path) –

    Path to the output file.

  • **kwargs

    Variables to pass to the template.

Source code in src/kedro_dagster/utils.py
def write_jinja_template(src: str | Path, dst: str | Path, **kwargs) -> None:  # type: ignore[no-untyped-def]
    """Render and write a Jinja template to a destination file.

    Args:
        src (str | Path): Path to the template file.
        dst (str | Path): Path to the output file.
        **kwargs: Variables to pass to the template.
    """
    dst = Path(dst)
    parsed_template = render_jinja_template(src, **kwargs)
    with open(dst, "w") as file_handler:
        file_handler.write(parsed_template)
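
A sketch with illustrative paths; additional keyword arguments are forwarded to render_jinja_template as template variables:

from kedro_dagster.utils import write_jinja_template

write_jinja_template(
    src="templates/dagster.yml",
    dst="conf/base/dagster.yml",
    python_package="my_package",
)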