Source code for aiscalator.core.config

# -*- coding: utf-8 -*-
# Apache Software License 2.0
#
# Copyright (c) 2018, Christophe Duong
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Handles configurations files for the application
"""
import os
import uuid
from datetime import datetime
from logging import DEBUG
from logging import Formatter
from logging import StreamHandler
from logging import getLogger
from logging.config import dictConfig
from platform import uname
from tempfile import mkstemp
from urllib.error import HTTPError
from urllib.error import URLError

import pyhocon
from pytz import timezone

from aiscalator import __version__
from aiscalator.core.utils import copy_replace
from aiscalator.core.utils import data_file


def _generate_global_config() -> str:
    """Generate a standard configuration file for the application in the
    user's home folder ~/.aiscalator/config/aiscalator.conf from the
    template file in aiscalator/config/template/aiscalator.conf
    """
    logger = getLogger(__name__)
    dst = os.path.join(os.path.expanduser("~"),
                       ".aiscalator/config/aiscalator.conf")
    logger.info("Generating a new configuration file for aiscalator:\n\t%s",
                dst)
    pattern = [
        "testUserID",
        "generation_date",
    ]
    replace_value = [
        generate_user_id(),
        '"' + str(datetime
                  .utcnow()
                  .replace(tzinfo=timezone("UTC"))) +
        '" // in UTC timezone',
    ]
    dst_dir = os.path.dirname(dst)
    if dst_dir:
        os.makedirs(dst_dir, exist_ok=True)
    copy_replace(data_file("../config/template/aiscalator.conf"),
                 dst, pattern=pattern, replace_value=replace_value)
    open(os.path.join(dst_dir, "apt_packages.txt"), 'a').close()
    open(os.path.join(dst_dir, "requirements.txt"), 'a').close()
    open(os.path.join(dst_dir, "lab_extensions.txt"), 'a').close()
    return dst


[docs]def generate_user_id() -> str: """ Returns ------- str Returns a string identifying this user when the setup was run first """ return 'u' + str((uuid.getnode()))
def _app_config_file() -> str: """Return the path to the app configuration file.""" if 'AISCALATOR_HOME' in os.environ: home = os.environ['AISCALATOR_HOME'] file = os.path.join(home, "config", "aiscalator.conf") if os.path.exists(file): return file return os.path.join(os.path.expanduser("~"), '.aiscalator', 'config', 'aiscalator.conf') # TODO refactor, splitting up the Global App Config part from # Jupyter Config (step) and Airflow config (DAG) into 3 classes # with separate APIs.
[docs]class AiscalatorConfig: """ A configuration object for the Aiscalator application. This object stores: - global configuration for the whole application - configuration for a particular context specified in a step configuration file. - In this case, we might even focus on a particular step. ... Attributes ---------- _app_conf global configuration object for the application _config_path : str path to the configuration file (or plain configuration as string) _step_name : str name of the currently processed step _step configuration object for the currently processed step _dag_name : str name of the currently processed dag _dag configuration object for the currently processed dag """ def __init__(self, config=None, step_selection=None, dag_selection=None): """ Parameters ---------- config : str path to the step configuration file (or plain configuration string) step_selection : str Name of step from the configuration file to focus on dag_selection : str Name of dag from the configuration file to focus on """ self._config_path = config self._app_conf = _setup_app_config() self._setup_logging() parsed_config = _parse_config(config) self._step_name = None self._step = None self._dag_name = None self._dag = None if parsed_config: step_sel = step_selection if "steps" in parsed_config: self._step_name, self._step = ( _select_config(parsed_config, root_node='steps', child_node='task', selection=step_sel) ) if "dags" in parsed_config: self._dag_name, self._dag = ( _select_config(parsed_config, root_node='dags', child_node='definition', selection=dag_selection) ) ################################################### # Global App Config methods # ################################################### def _setup_logging(self): """ Setup the logging configuration of the application """ if self.app_config_has("logging"): log_config = self.app_config()["logging"] filename_list = [ v['filename'] for k, v in _find_config_tree(log_config, "filename") ] # pre-create directory in advance for all loggers for file in filename_list: file_dir = os.path.dirname(file) if file_dir and not os.path.isdir(file_dir): os.makedirs(file_dir, exist_ok=True) dictConfig(log_config) else: log = getLogger() handler = StreamHandler() formatter = Formatter( "%(asctime)s-%(threadName)s-%(name)s-%(levelname)s-%(message)s" ) handler.setFormatter(formatter) log.addHandler(handler) log.setLevel(DEBUG) msg = ("Starting " + os.path.basename(__name__) + " version " + __version__ + " on " + "_".join(uname()).replace(" ", "_")) logger = getLogger(__name__) logger.debug(msg)
[docs] def app_config_home(self) -> str: """Return the path to the app configuration folder.""" if self.app_config_has("app_config_home_directory"): return self.app_config()["app_config_home_directory"] return os.path.join(os.path.expanduser("~"), '.aiscalator')
[docs] def redefine_app_config_home(self, config_home): """ Modify the configuration file to change the value of the application configuration home directory. Parameters ---------- config_home : str path to the new configuration home Returns ------- AiscalatorConfig the new configuration object """ dst = _app_config_file() new_config = ( pyhocon.ConfigFactory.parse_string( "aiscalator.app_config_home_directory = " + config_home ) ).with_fallback(_app_config_file(), resolve=False) with open(dst, "w") as output: output.write( pyhocon.converter.HOCONConverter.to_hocon(new_config) ) self._app_conf = new_config return new_config
[docs] def redefine_airflow_workspaces(self, workspaces): """ Modify the configuration file to change the value of the airflow workspaces Parameters ---------- workspaces : list list of workspaces to bind to airflow Returns ------- AiscalatorConfig the new configuration object """ dst = _app_config_file() new_config = ( pyhocon.ConfigFactory.parse_string( "aiscalator.airflow.setup.workspace_paths = [\n" + "\n".join([ws for ws in workspaces]) + "]" ) ).with_fallback(_app_config_file(), resolve=False) with open(dst, "w") as output: output.write( pyhocon.converter.HOCONConverter.to_hocon(new_config) ) self._app_conf = new_config return new_config
[docs] def user_env_file(self, job=None) -> list: """ Find a list of env files to pass to docker containers Parameters ---------- job Optional step or dag config Returns ------- List env files """ logger = getLogger(__name__) result = [] # Look if any env file or variables were defined in the step/dag if job: (_, env_filename) = mkstemp(prefix="aiscalator_", text=True) with open(env_filename, mode="w") as env_file: # concatenate all the env files and variables into one for env in job: if isinstance(env, pyhocon.config_tree.ConfigTree): for k in env.keys(): env_file.write(k + '=' + env.get(k) + '\n') elif os.path.isfile(os.path.join(self.root_dir(), env)): with open(os.path.join(self.root_dir(), env), mode="r") as file: for line in file: env_file.write(line) else: msg = ("Undefined env" + env + ": expecting a dict of environment variables" + " or path to environment configuration file.") logger.warning("Warning %s", msg) result.append(env_filename) # TODO look in user config if env file has been redefined result.append( os.path.join(self.app_config_home(), "config", ".env") ) return result
def _timestamp_now(self) -> str: """ Depending on how the timezone is configured, returns the timestamp for this instant. """ date_now = datetime.utcnow().replace(tzinfo=timezone("UTC")) if self._app_conf["aiscalator"]: pst = timezone(self.app_config().timezone) else: pst = timezone('Europe/Paris') return date_now.astimezone(pst).strftime("%Y%m%d%H%M%S")
[docs] def app_config(self): """ Returns ------- str the configuration object for the aiscalator application """ return self._app_conf["aiscalator"]
[docs] def config_path(self): """ Returns ------- str Returns the path to the step configuration file. If it was an URL, it will return the path to the temporary downloaded version of it. If it was a plain string, then returns None """ if os.path.exists(self._config_path): if pyhocon.ConfigFactory.parse_file(self._config_path): return os.path.realpath(self._config_path) # TODO if string is url/git repo, download file locally first return None
[docs] def root_dir(self): """ Returns ------- str Returns the path to the folder containing the configuration file """ path = self.config_path() if path: root_dir = os.path.dirname(path) if not root_dir.endswith("/"): root_dir += "/" return root_dir return None
[docs] def user_id(self) -> str: """ Returns ------- str the user id stored when the application was first setup """ return self.app_config()["metadata.user.id"]
[docs] def app_config_has(self, field) -> bool: """ Tests if the applicatin config has a configuration value for the field. """ if not self.app_config(): return False return field in self.app_config()
[docs] def airflow_docker_compose_file(self): """Return the configuration file to bring airflow services up.""" if self.app_config_has("airflow.docker_compose_file"): return self.app_config()["airflow.docker_compose_file"] return None
[docs] def validate_config(self): """ Check if all the fields in the reference config are defined in focused steps too. Otherwise raise an Exception (either pyhocon.ConfigMissingException or pyhocon.ConfigWrongTypeException) """ reference = data_file("../config/template/minimum_aiscalator.conf") ref = pyhocon.ConfigFactory.parse_file(reference) msg = "In Global Application Configuration file " _validate_configs(self._app_conf, ref, msg, missing_exception=True, type_mismatch_exception=True) reference = data_file("../config/template/aiscalator.conf") ref = pyhocon.ConfigFactory.parse_file(reference) msg = "In Global Application Configuration file " _validate_configs(self._app_conf, ref, msg, missing_exception=False, type_mismatch_exception=True) if self._step_name: reference = data_file("../config/template/minimum_step.conf") ref = pyhocon.ConfigFactory.parse_file(reference) msg = "in step named " + self._step_name _validate_configs(self._step, ref["steps"]["Untitled"], msg, missing_exception=True, type_mismatch_exception=True) reference = data_file("../config/template/step.conf") ref = pyhocon.ConfigFactory.parse_file(reference) msg = "in step named " + self._step_name _validate_configs(self._step, ref["steps"]["Untitled"], msg, missing_exception=False, type_mismatch_exception=True) if self._dag_name: reference = data_file("../config/template/minimum_dag.conf") ref = pyhocon.ConfigFactory.parse_file(reference) msg = "in dag named " + self._dag_name _validate_configs(self._dag, ref["dags"]["Untitled"], msg, missing_exception=True, type_mismatch_exception=True) reference = data_file("../config/template/step.conf") ref = pyhocon.ConfigFactory.parse_file(reference) msg = "in dag named " + self._dag_name _validate_configs(self._dag, ref["dags"]["Untitled"], msg, missing_exception=False, type_mismatch_exception=True)
################################################### # Step methods # ###################################################
[docs] def step_notebook_output_path(self, notebook) -> str: """Generates the name of the output notebook""" return ("/home/jovyan/work/notebook_run/" + os.path.basename(notebook).replace(".ipynb", "") + "_" + self._timestamp_now() + self.user_id() + ".ipynb")
[docs] def step_field(self, field): """ Returns the value associated with the field for the currently focused step. """ if self.has_step_field(field): return self._step[field] return None
[docs] def has_step_field(self, field) -> bool: """ Tests if the currently focused step has a configuration value for the field. """ if not self._step: return False return field in self._step
[docs] def step_name(self): """ Returns the name of the currently focused step """ return self._step_name
[docs] def step_file_path(self, string): """ Returns absolute path of a file from a field of the currently focused step. """ if not self.has_step_field(string): return None # TODO handle url root_dir = self.root_dir() if root_dir: path = os.path.join(root_dir, self.step_field(string)) return os.path.realpath(path) return os.path.realpath(self.step_field(string))
[docs] def step_container_name(self) -> str: """Return the docker container name to execute this step""" return ( self.step_field("task.type") + "_" + self.step_name().replace(".", "_") )
[docs] def step_extract_parameters(self) -> list: """Returns a list of docker parameters""" result = [] if self.has_step_field("task.parameters"): for param in self.step_field("task.parameters"): for key in param: result += ["-p", key, param[key]] return result
################################################### # DAG methods # ###################################################
[docs] def dag_field(self, field): """ Returns the value associated with the field for the currently focused dag. """ if self.has_dag_field(field): return self._dag[field] return None
[docs] def has_dag_field(self, field) -> bool: """ Tests if the currently focused dag has a configuration value for the field. """ if not self._dag: return False return field in self._dag
[docs] def dag_name(self): """ Returns the name of the currently focused dag """ return self._dag_name
[docs] def dag_file_path(self, string): """ Returns absolute path of a file from a field of the currently focused dag. """ if not self.has_dag_field(string): return None # TODO handle url root_dir = self.root_dir() if root_dir: path = os.path.join(root_dir, self.dag_field(string)) return os.path.realpath(path) return os.path.realpath(self.dag_field(string))
[docs] def dag_container_name(self) -> str: """Return the docker container name to execute this step""" return ( "airflow_" + self.dag_name().replace(".", "_") )
def _setup_app_config(): """ Setup global application configuration. If not found in the default location, this method will generate a brand new one. """ try: file = _app_config_file() conf = pyhocon.ConfigFactory.parse_file(file) except FileNotFoundError: conf = pyhocon.ConfigFactory.parse_file(_generate_global_config()) # test if since_version is deprecated and regenerate a newer config return conf def _validate_configs(test, reference, path, missing_exception=True, type_mismatch_exception=True): """ Recursively check two configs if they match Parameters ---------- test configuration object to test reference reference configuration object path : str this accumulates the recursive path for details in Exceptions missing_exception : bool when a missing field is found, raise xception? type_mismatch_exception : bool when a field has type mismatch, raise xception? """ logger = getLogger(__name__) if isinstance(reference, pyhocon.config_tree.ConfigTree): for key in reference.keys(): if key not in test.keys(): msg = (path + ": Missing definition of " + key) if missing_exception: raise pyhocon.ConfigMissingException( message="Exception " + msg ) else: logger.warning("Warning %s", msg) elif not isinstance(test[key], type(reference[key])): msg = (path + ": Type mismatch of " + key + " found type " + str(type(test[key])) + " instead of " + str(type(reference[key]))) if type_mismatch_exception: raise pyhocon.ConfigWrongTypeException( message="Exception " + msg ) else: logger.warning("Warning %s", msg) elif (isinstance(test[key], pyhocon.config_tree.ConfigTree) and isinstance(reference[key], pyhocon.config_tree.ConfigTree)): # test recursively _validate_configs(test[key], reference[key], ".".join([path, key]), missing_exception, type_mismatch_exception) elif (isinstance(test[key], list) and isinstance(reference[key], list)): # iterate through both collections for i in test[key]: for j in reference[key]: _validate_configs(i, j, ".".join([path, key]), missing_exception, type_mismatch_exception) def _parse_config(step_config): """ Interpret the step_config to produce a step configuration object. It could be provided as: - a path to a local file - a url to a remote file - the plain configuration stored as string Returns ------- Step configuration object """ if not step_config: return None if os.path.exists(step_config): conf = pyhocon.ConfigFactory.parse_file(step_config) else: try: conf = pyhocon.ConfigFactory.parse_URL(step_config) except (HTTPError, URLError): conf = pyhocon.ConfigFactory.parse_string(step_config) return conf def _select_config(conf, root_node: str, child_node: str, selection: str): """ Extract the list of step objects corresponding to the list of names provided. Parameters ---------- conf step configuration object root_node : str node to start looking from child_node : str node that represents the leaves we are searching for. The path from root_node to child_node is compared with selection to check for a match. selection : str name of node to extract Returns ------- tuple of (node_name, node) of selected configuration object """ result = None candidates = [] if conf and root_node in conf: candidates = _find_config_tree(conf[root_node], child_node) if selection: for name, candidate in candidates: if name == selection: result = (name, candidate) break else: result = candidates[0] if selection and not result: msg = (selection + "'s " + child_node + " was not found in " + root_node + " configurations.\n ") if candidates: msg += ("Available candidates are: " + " ".join([name for name, _ in candidates])) raise pyhocon.ConfigMissingException(msg) return result def _find_config_tree(tree: pyhocon.ConfigTree, target_node, path="") -> list: """ Find all target_node objects in the Configuration object and report their paths. Parameters ---------- tree : pyhocon.ConfigTree Configuration object target_node : str key of Config to find path : str path that was traversed to get to this tree Returns ------- list list of names of Configuration objects containing a definition of a section 'task' """ result = [] if path: next_path = path + "." else: next_path = "" for key in tree.keys(): if key == target_node: result += [(path, tree)] else: if isinstance(tree[key], pyhocon.config_tree.ConfigTree): value = _find_config_tree(tree[key], target_node, path=next_path + key) if value: result += value return result
[docs]def convert_to_format(file: str, output: str, output_format: str): """ Converts a HOCON file to another format Parameters ---------- file : str hocon file to convert output : str output file to produce output_format : str format of the output file Returns ------- str the output file """ (pyhocon .converter .HOCONConverter .convert_from_file(file, output_file=output, output_format=output_format)) os.remove(file) return output