Source code for code_genie.pipeline

import json
import os
import time
from typing import Any, Dict, List, Optional, TypeVar, Union

from pydantic import BaseModel, validator

from code_genie.genie import Genie, GenieResult
from code_genie.io import (
    BigQueryToDataframeSource,
    BoolArg,
    CsvToDataFrameSource,
    DataFrameToCsvSink,
    IntArg,
    StringArg,
)
from code_genie.io.argument import GenieArgument
from code_genie.io.base import GenieSource

Source = TypeVar("Source", CsvToDataFrameSource, BigQueryToDataframeSource)
Sink = TypeVar("Sink", bound=DataFrameToCsvSink)
Argument = TypeVar("Argument", StringArg, IntArg, BoolArg)


class PipelineStep(BaseModel):
    """A single step of a pipeline: the genie to run, the inputs it receives and an optional sink."""

    genie_result: GenieResult
    """Genie result whose code is executed when this step runs"""

    data: Optional[Union[Source, GenieResult]] = None
    """Base data handed to the genie for computation: either a data source, or the genie result of a previous
    step"""

    additional_inputs: Optional[Dict[str, Union[Source, GenieResult, Argument]]] = None
    """Extra named inputs for the genie. Each key is the name of the input; each value is one of:

    1. A genie data source
    2. A genie result from a previous step
    3. A constant value to be passed as an argument to the pipeline"""

    sink: Optional[Sink] = None
    """Optional sink; provide one when the output of this step needs to be exported"""
class GeniePipeline(BaseModel):
    """An ordered list of genie steps which can be exported to json, loaded back and run end to end."""

    name: str
    """Name of the pipeline"""

    version: str
    """Version of the pipeline"""

    cache_dir: str
    """Directory where the genies being used in this pipeline are cached. The pipeline will also be cached at the
    same location"""

    steps: List[PipelineStep]
    """List of steps in the pipeline"""

    def add(self, step: PipelineStep) -> "GeniePipeline":
        """Return a copy of this pipeline with ``step`` appended; this pipeline is left unchanged.

        Args:
            step: The step to append after the existing steps.

        Returns:
            A new pipeline whose steps are the current steps followed by ``step``.
        """
        return self.copy(update={"steps": self.steps + [step]})

    @validator("steps")
    def _validate_steps(cls, v: List[PipelineStep]):
        """Check that the steps form a runnable pipeline.

        Raises:
            ValueError: If there are no steps, if the first step has no data, or if no step has a sink.
        """
        # guard against an empty list first; indexing v[0] below would otherwise raise a bare IndexError
        if not v:
            raise ValueError("pipeline should have atleast one step; none found")
        # base_input of the first step should be a genie source
        if v[0].data is None:
            raise ValueError("base_input_source of the first step should be set, found None")
        # there should be atleast 1 sink
        if all(step.sink is None for step in v):
            raise ValueError("atleast one of the steps should have a sink; none found")
        return v

    def _get_filepath(self, filename: str) -> str:
        """Return the path of ``filename`` inside ``cache_dir``, appending ``.json`` if it is missing."""
        # match on the full ".json" suffix so that a name merely ending in the letters "json" still gets the extension
        if not filename.endswith(".json"):
            filename += ".json"
        return os.path.join(self.cache_dir, filename)

    def export(self, filename: str) -> None:
        """Export the pipeline as a json file.

        Args:
            filename: Name of the file to write inside ``cache_dir``; ``.json`` is appended if missing.
        """
        with open(self._get_filepath(filename), "w") as f:
            json.dump(self.dict(), f)

    @classmethod
    def load(cls, filepath: str) -> "GeniePipeline":
        """Load the pipeline from a json file.

        Args:
            filepath: Path of the json file to read.

        Returns:
            The pipeline parsed from the contents of the file.
        """
        with open(filepath, "r") as f:
            return cls.parse_obj(json.load(f))

    @classmethod
    def _get_cached_genie_result(cls, step_id: str, genie_id: str, cached_results: Dict[str, GenieResult]):
        """Return the result stored for ``genie_id`` by a step which has already run.

        Args:
            step_id: Id of the step requesting the result; only used in the error message.
            genie_id: Id of the genie whose result is needed.
            cached_results: Results of the steps run so far, keyed by genie id.

        Raises:
            ValueError: If no result for ``genie_id`` has been stored yet.
        """
        if genie_id not in cached_results:
            raise ValueError(
                f"Error in step id: {step_id}; You are attempting to use the results of genie with id: {genie_id} "
                f"in this step but the genie has not been run in a previous step. Add a step before this step in the "
                f"pipeline to run genie id {genie_id}."
            )
        return cached_results[genie_id].result

    def run(self, args: Dict[str, Any]) -> None:
        """Run the pipeline using the value of the arguments passed.

        Note that all arguments which do not have a default value need to be passed here for the pipeline to run.

        Args:
            args: Argument values; forwarded as keyword arguments to every source, argument and sink.

        Raises:
            ValueError: If the data or an additional input of a step has an unsupported type, or if a step uses
                the result of a genie which has not been run in an earlier step.
        """
        cached_genie_results: Dict[str, GenieResult] = {}
        for i, step in enumerate(self.steps):
            step_id = step.genie_result.id
            print(f"Running step {i+1}: {step_id}")

            # initialize timer; perf_counter is monotonic, so the duration is immune to system clock changes
            start_time = time.perf_counter()

            # get the base input
            if isinstance(step.data, GenieSource):
                base_input = step.data.get(**args)
            elif isinstance(step.data, GenieResult):
                base_input = self._get_cached_genie_result(
                    step_id=step_id, genie_id=step.data.id, cached_results=cached_genie_results
                )
            else:
                raise ValueError(f"Invalid type for base_input: {type(step.data)}")

            # get the additional inputs
            additional_inputs = {}
            for name, add_input in (step.additional_inputs or {}).items():
                if isinstance(add_input, (GenieSource, GenieArgument)):
                    # sources and arguments are both resolved from the pipeline arguments
                    additional_inputs[name] = add_input.get(**args)
                elif isinstance(add_input, GenieResult):
                    additional_inputs[name] = self._get_cached_genie_result(
                        step_id, add_input.id, cached_genie_results
                    )
                else:
                    # fail loudly, like the base input does, rather than silently dropping the input
                    raise ValueError(f"Invalid type for additional input {name}: {type(add_input)}")

            # run the genie
            genie = Genie(data=base_input)
            genie_result = genie.run(step.genie_result.code, additional_inputs)
            # store the result so that later steps can use it as an input
            cached_genie_results[step_id] = GenieResult(
                id=step_id, result=genie_result, code=step.genie_result.code, cache_dir=self.cache_dir
            )

            # write the output
            if step.sink is not None:
                step.sink.put(genie_result, **args)

            end_time = time.perf_counter()
            print(f"\tCompleted in {end_time - start_time:.1f} seconds")