aiscalator.core package

Submodules

aiscalator.core.config module

Handles configuration files for the application.

class aiscalator.core.config.AiscalatorConfig(config=None, step_selection=None, dag_selection=None)[source]

Bases: object

A configuration object for the Aiscalator application.

This object stores:
  • global configuration for the whole application

  • configuration for a particular context specified in a step configuration file. In this case, we might even focus on a particular step.

Variables
  • _app_conf – global configuration object for the application

  • _config_path (str) – path to the configuration file (or plain configuration as string)

  • _step_name (str) – name of the currently processed step

  • _step – configuration object for the currently processed step

  • _dag_name (str) – name of the currently processed dag

  • _dag – configuration object for the currently processed dag

Methods

airflow_docker_compose_file()

Return the configuration file to bring airflow services up.

app_config()

returns

str – the configuration object for the aiscalator application

app_config_has(field)

Tests if the application config has a configuration value for the field.

app_config_home()

Return the path to the app configuration folder.

config_path()

returns

str – Returns the path to the step configuration file.

dag_container_name()

Return the docker container name to execute this dag

dag_field(field)

Returns the value associated with the field for the currently focused dag.

dag_file_path(string)

Returns absolute path of a file from a field of the currently focused dag.

dag_name()

Returns the name of the currently focused dag

has_dag_field(field)

Tests if the currently focused dag has a configuration value for the field.

has_step_field(field)

Tests if the currently focused step has a configuration value for the field.

redefine_airflow_workspaces(workspaces)

Modify the configuration file to change the value of the airflow workspaces

redefine_app_config_home(config_home)

Modify the configuration file to change the value of the application configuration home directory.

root_dir()

returns

str – Returns the path to the folder containing the configuration file

step_container_name()

Return the docker container name to execute this step

step_extract_parameters()

Returns a list of docker parameters

step_field(field)

Returns the value associated with the field for the currently focused step.

step_file_path(string)

Returns absolute path of a file from a field of the currently focused step.

step_name()

Returns the name of the currently focused step

step_notebook_output_path(notebook)

Generates the name of the output notebook

user_env_file([job])

Find a list of env files to pass to docker containers

user_id()

returns

str – the user id stored when the application was first set up

validate_config()

Check if all the fields in the reference config are defined in focused steps too.

airflow_docker_compose_file()[source]

Return the configuration file to bring airflow services up.

app_config()[source]
Returns

str – the configuration object for the aiscalator application

app_config_has(field) → bool[source]

Tests if the application config has a configuration value for the field.

app_config_home() → str[source]

Return the path to the app configuration folder.

config_path()[source]
Returns

str – Returns the path to the step configuration file. If the configuration was given as a URL, returns the path to the temporary downloaded copy of it. If it was given as a plain string, returns None.

dag_container_name() → str[source]

Return the docker container name to execute this dag

dag_field(field)[source]

Returns the value associated with the field for the currently focused dag.

dag_file_path(string)[source]

Returns absolute path of a file from a field of the currently focused dag.

dag_name()[source]

Returns the name of the currently focused dag

has_dag_field(field) → bool[source]

Tests if the currently focused dag has a configuration value for the field.

has_step_field(field) → bool[source]

Tests if the currently focused step has a configuration value for the field.

redefine_airflow_workspaces(workspaces)[source]

Modify the configuration file to change the value of the airflow workspaces

Parameters

workspaces (list) – list of workspaces to bind to airflow

Returns

AiscalatorConfig – the new configuration object

redefine_app_config_home(config_home)[source]

Modify the configuration file to change the value of the application configuration home directory.

Parameters

config_home (str) – path to the new configuration home

Returns

AiscalatorConfig – the new configuration object

root_dir()[source]
Returns

str – Returns the path to the folder containing the configuration file

step_container_name() → str[source]

Return the docker container name to execute this step

step_extract_parameters() → list[source]

Returns a list of docker parameters

step_field(field)[source]

Returns the value associated with the field for the currently focused step.

step_file_path(string)[source]

Returns absolute path of a file from a field of the currently focused step.

step_name()[source]

Returns the name of the currently focused step

step_notebook_output_path(notebook) → str[source]

Generates the name of the output notebook

user_env_file(job=None) → list[source]

Find a list of env files to pass to docker containers

Parameters

job – Optional step or dag config

Returns

List – env files

user_id() → str[source]
Returns

str – the user id stored when the application was first set up

validate_config()[source]

Check if all the fields in the reference config are defined in focused steps too. Otherwise raise an Exception (either pyhocon.ConfigMissingException or pyhocon.ConfigWrongTypeException)
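The check described above can be illustrated with a minimal stand-alone sketch. It is not the library's implementation: it works on plain dictionaries instead of pyhocon objects, uses the built-in KeyError and TypeError as stand-ins for the two pyhocon exceptions, and the function name `sketch_validate` and the field names `task`, `type` and `code_path` are hypothetical.

```python
def sketch_validate(reference, candidate, path=""):
    """Hypothetical sketch: every reference field must exist with the same type."""
    for key, expected in reference.items():
        location = path + key
        if key not in candidate:
            # Stand-in for pyhocon.ConfigMissingException.
            raise KeyError(location)
        if not isinstance(candidate[key], type(expected)):
            # Stand-in for pyhocon.ConfigWrongTypeException.
            raise TypeError(location)
        if isinstance(expected, dict):
            # Recurse into nested sections, extending the dotted path.
            sketch_validate(expected, candidate[key], location + ".")


# Hypothetical field names, used for illustration only.
reference = {"task": {"type": "", "code_path": ""}}
valid = {"task": {"type": "jupyter", "code_path": "notebook.ipynb"}}
invalid = {"task": {"type": "jupyter"}}

sketch_validate(reference, valid)  # passes silently
try:
    sketch_validate(reference, invalid)
    error = None
except KeyError as exc:
    error = exc.args[0]
```

In this sketch, `error` holds the dotted path of the first missing field.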

aiscalator.core.config.convert_to_format(file: str, output: str, output_format: str)[source]

Converts a HOCON file to another format

Parameters
  • file (str) – hocon file to convert

  • output (str) – output file to produce

  • output_format (str) – format of the output file

Returns

str – the output file

aiscalator.core.config.generate_user_id() → str[source]
Returns

str – Returns a string identifying this user, generated when the setup was first run

aiscalator.core.log_regex_analyzer module

Class to parse output logs from subprocess and catch particular expressions

class aiscalator.core.log_regex_analyzer.LogRegexAnalyzer(pattern=None, log_level=10)[source]

Bases: object

A regular expression analyzer object to parse logs and extract values from patterns in the logs. …

Variables
  • _artifact (str) – Value of the pattern found in the logs

  • _pattern (bytes) – Regular expression to search for in the logs

Methods

artifact()

Returns the artifact extracted from the logs.

grep_logs(pipe)

Reads the logs and extracts values defined by the pattern

artifact()[source]

Returns the artifact extracted from the logs.

grep_logs(pipe)[source]

Reads the logs and extracts values defined by the pattern

Parameters

pipe – Stream of logs to analyze
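To illustrate the idea of extracting a value from a log stream, here is a minimal stand-alone sketch rather than the library's implementation. It assumes the pipe yields bytes lines and that the first capture group of the pattern is the value to keep; the class name `SketchLogAnalyzer` and the sample log text are hypothetical.

```python
import io
import re


class SketchLogAnalyzer:
    """Hypothetical stand-in illustrating the idea behind LogRegexAnalyzer."""

    def __init__(self, pattern=None):
        # Assumption: the pattern is a bytes regex with one capture group.
        self._pattern = re.compile(pattern) if pattern else None
        self._artifact = None

    def artifact(self):
        return self._artifact

    def grep_logs(self, pipe):
        # Read the stream line by line and remember the last match found.
        for line in iter(pipe.readline, b""):
            if self._pattern is None:
                continue
            match = self._pattern.search(line)
            if match:
                self._artifact = match.group(1).decode("utf-8")


# An in-memory stream stands in for the output of a subprocess.
logs = io.BytesIO(b"starting\nServing at http://localhost:8888/lab\ndone\n")
analyzer = SketchLogAnalyzer(pattern=rb"Serving at (\S+)")
analyzer.grep_logs(logs)
```

After the call, `analyzer.artifact()` returns the text captured by the group.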

aiscalator.core.utils module

Various Utility functions

class aiscalator.core.utils.BackgroundThreadRunner(command, log_function, no_redirect=False)[source]

Bases: object

Worker Thread to run logging output in the background

Variables
  • _process – Process object of the command running in the background

  • _log_function (function(stream -> bool)) – callback function to log the output of the command

  • _no_redirect (bool) – whether the subprocess STDOUT and STDERR should be redirected to logs

  • _worker (Thread) – Thread object

Methods

process()

Returns the process object.

run()

Starts the thread and processes the output of the process.

process()[source]

Returns the process object.

run()[source]

Starts the thread and processes the output of the process.

aiscalator.core.utils.check_notebook(logger, code_path, from_format='py:percent')[source]

Checks the existence of the notebook file and regenerates it using jupytext from the associated .py file if possible. Otherwise, creates an empty notebook file.

Parameters
  • code_path (str) – path to the notebook to check

  • from_format (str) – jupytext format of the .py input file

aiscalator.core.utils.check_notebook_dir(logger, code_path, from_format='py:percent')[source]

Check a folder and generate all notebook files that might be required in that folder.

Parameters
  • code_path (str) – path to a file in the folder

  • from_format (str) – jupytext format of potential .py files

aiscalator.core.utils.copy_replace(src, dst, pattern=None, replace_value=None)[source]

Copies a file from src to dst replacing pattern by replace_value

Parameters
  • src (string) – Path to the source filename to copy from

  • dst (string) – Path to the output filename to copy to

  • pattern – list of Patterns to replace inside the src file

  • replace_value – list of Values to replace by in the dst file
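A minimal sketch of this copy-and-substitute behaviour follows. It is an illustration under assumptions, not the library's code: it pairs each pattern with the value at the same index and applies them as regular expressions with `re.sub`, which may differ from what the real function does. The name `sketch_copy_replace` and the placeholder tokens are hypothetical.

```python
import os
import re
import tempfile


def sketch_copy_replace(src, dst, pattern=None, replace_value=None):
    """Hypothetical sketch: copy src to dst, substituting each pattern."""
    with open(src, "r") as source:
        content = source.read()
    # Pair each pattern with its replacement; assumption: regex semantics.
    for pat, value in zip(pattern or [], replace_value or []):
        content = re.sub(pat, value, content)
    with open(dst, "w") as target:
        target.write(content)


workdir = tempfile.mkdtemp()
src_path = os.path.join(workdir, "template.txt")
dst_path = os.path.join(workdir, "output.txt")
with open(src_path, "w") as handle:
    handle.write("user=USER_ID\nhome=HOME_DIR\n")

sketch_copy_replace(
    src_path, dst_path, ["USER_ID", "HOME_DIR"], ["1000", "/tmp/app"]
)
with open(dst_path) as handle:
    result = handle.read()
```

With no patterns given, the sketch degenerates to a plain copy.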

aiscalator.core.utils.data_file(path)[source]

Utility function to find resources data file packaged along with code

Parameters

path (path) – path to the resource file in the package

Returns

absolute path to the resource data file

aiscalator.core.utils.find(collection, item, field='name')[source]

Finds an element in a collection whose field equals a particular item value

Parameters
  • collection (Set) – Collection of objects

  • item – value of the item that we are looking for

  • field (string) – Name of the field from the object to inspect

Returns

object – Corresponding element that has a field matching item in the collection
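The lookup can be sketched as below. This is an illustration only: it assumes the elements are mappings and that None is returned when nothing matches, neither of which is confirmed by the text above. The name `sketch_find` and the sample data are hypothetical.

```python
def sketch_find(collection, item, field="name"):
    """Hypothetical sketch: return the first element whose field equals item."""
    for element in collection:
        # Assumption: elements are mappings that may or may not carry the field.
        if field in element and element[field] == item:
            return element
    # Assumption: a missing item yields None rather than an exception.
    return None


steps = [{"name": "extract", "image": "a"}, {"name": "train", "image": "b"}]
match = sketch_find(steps, "train")
missing = sketch_find(steps, "deploy")
by_image = sketch_find(steps, "a", field="image")
```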

aiscalator.core.utils.format_file_content(content, prefix='', suffix='')[source]

Reformat the content of a file line by line, adding prefix and suffix strings.

Parameters
  • content (str) – path to the file whose content is reformatted

  • prefix (str) – add to each line this prefix string

  • suffix (str) – add to each line this suffix string

Returns

str – Formatted content of the file
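A minimal sketch of the line-by-line reformatting is shown here. It assumes each output line ends with a newline and that the original trailing newline is dropped before the suffix is appended; the real function may treat line endings differently. The name `sketch_format_file_content` is hypothetical.

```python
import os
import tempfile


def sketch_format_file_content(content, prefix="", suffix=""):
    """Hypothetical sketch: wrap every line of a file in prefix and suffix."""
    result = ""
    with open(content, "r") as handle:
        for line in handle:
            # Strip the trailing newline so the suffix lands before it.
            result += prefix + line.rstrip("\n") + suffix + "\n"
    return result


# Write a small temporary file to reformat.
fd, path = tempfile.mkstemp(suffix=".txt")
with os.fdopen(fd, "w") as handle:
    handle.write("numpy\npandas\n")

formatted = sketch_format_file_content(path, prefix="- ", suffix=";")
os.remove(path)
```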

aiscalator.core.utils.log_info(pipe)[source]

Default logging function

aiscalator.core.utils.notebook_file(code_path, from_format='py:percent')[source]

Parse a path to return both the ipynb and py versions of the file.

Parameters
  • code_path (str) – path to a file

  • from_format (str) – jupytext format of potential .py files

Returns

(str, str) – tuple of 2 paths to ipynb and py files
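The path handling can be sketched as follows. This simplified illustration only swaps the file extension and ignores the from_format parameter, so it should not be read as the library's behaviour; the name `sketch_notebook_file` and the example paths are hypothetical.

```python
import os


def sketch_notebook_file(code_path):
    """Hypothetical sketch: derive the .ipynb and .py paths from one path."""
    # Drop whatever extension was given and rebuild both variants.
    base, _extension = os.path.splitext(code_path)
    return base + ".ipynb", base + ".py"


from_py = sketch_notebook_file("steps/train.py")
from_ipynb = sketch_notebook_file("steps/train.ipynb")
```

Either input form yields the same pair of paths in this sketch.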

aiscalator.core.utils.sha256(file: str)[source]

Reads a file content and returns its sha256 hash.
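For illustration, a file hash of this kind can be computed with the standard hashlib module. The sketch below reads the file in chunks and assumes the result is returned as a hexadecimal string, which the text above does not specify; the name `sketch_sha256` is hypothetical.

```python
import hashlib
import os
import tempfile


def sketch_sha256(file):
    """Hypothetical sketch: hash a file's content in fixed-size chunks."""
    digest = hashlib.sha256()
    with open(file, "rb") as handle:
        # Reading in chunks keeps memory use flat for large files.
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    # Assumption: the hash is returned as a hexadecimal string.
    return digest.hexdigest()


fd, path = tempfile.mkstemp()
with os.fdopen(fd, "wb") as handle:
    handle.write(b"hello")

file_hash = sketch_sha256(path)
os.remove(path)
```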

aiscalator.core.utils.subprocess_run(command, log_function=<function log_info>, no_redirect=False, wait=True)[source]

Run command in a subprocess while redirecting output to log_function.

The subprocess runs either synchronously or in the background, depending on the wait parameter.

Parameters
  • command (List) – Command to run in the subprocess

  • log_function (function) – Callback function to log the output of the subprocess

  • no_redirect (bool) – whether the subprocess STDOUT and STDERR should be redirected to logs

  • wait (bool) – Whether the subprocess should be run synchronously or in the background

Returns

  • int – return code of the subprocess, when it runs synchronously

  • BackgroundThreadRunner – the thread running in the background, otherwise
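The two modes of execution can be sketched with the standard subprocess and threading modules. This is a simplified illustration, not the library's implementation: the no_redirect option is left out, and the background branch returns a plain (process, thread) tuple instead of a BackgroundThreadRunner. The names `sketch_subprocess_run` and `collect` are hypothetical.

```python
import subprocess
import sys
import threading


def sketch_subprocess_run(command, log_function, wait=True):
    """Hypothetical sketch: run command, feeding its output to log_function."""
    process = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )
    if wait:
        # Synchronous: consume the output, then return the exit code.
        log_function(process.stdout)
        return process.wait()
    # Background: let a worker thread consume the output instead.
    worker = threading.Thread(target=log_function, args=(process.stdout,))
    worker.start()
    return process, worker


lines = []


def collect(pipe):
    # Callback receiving the output stream, in the role of log_function.
    for line in iter(pipe.readline, b""):
        lines.append(line.decode().strip())


code = sketch_subprocess_run([sys.executable, "-c", "print('hello')"], collect)
```

Passing `wait=False` to the sketch returns immediately, and the caller would then join the worker thread before reading the collected lines.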

aiscalator.core.utils.wait_for_jupyter_lab(commands, logger, notebook, port, folder)[source]

Starts Jupyter Lab and waits for it to start, returning the URL it is served from.

Parameters
  • commands (list) – List of commands to run to start the process

  • logger (logging.Logger) – Logger object

  • notebook (str) – path to the notebook

  • port – port on which the jupyter lab is listening

  • folder (str) – path in the container to reach the notebook

Returns

str – url from which it is serving the jupyter lab

Module contents