Generation Transform

The goal of this pipeline is to update a parquet file with generation data from the French grid. In the transform step:

- Parse the response saved from the ingest step
- Concatenate with existing parquet files
- Write parquet files

The packages used in this step:

import os
import json
import functools, itertools
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import requests
import dvc.api
import polars as pl
from pyprojroot.here import here
First, let’s read the parameters:
params = dvc.api.params_show("dvc-params.yaml")
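For reference, params_show() returns the parameters as a plain dict, so the local time zone used later in this step is just a lookup. A minimal sketch, assuming dvc-params.yaml defines tz_local (the value shown is illustrative):

# params is a plain dict keyed by parameter name, e.g.:
# {"tz_local": "Europe/Paris", ...}  # illustrative values only
tz_local = params["tz_local"]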
Let’s load the results from the previous step. We also load the published results, so that we can append the current results to them. As in the ingest step, we do not declare the published file as a dependency, to avoid circularity.
with open(here("data/01-ingest/generation.json"), "r") as file:
    array = json.load(file)

table_existing = pl.read_parquet(here("data/99-publish/standard/generation.parquet"))
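Based on how the response is unpacked below, array is a list of per-production-type records, each carrying its own list of interval values. A sketch of the assumed shape (field values are illustrative, not taken from an actual response):

# Assumed shape of the ingested JSON:
# array = [
#     {
#         "production_type": "WIND",
#         "values": [
#             {"start_date": "2023-08-09T00:00:00+02:00",
#              "end_date": "2023-08-09T00:15:00+02:00",
#              "value": 1234},
#             ...
#         ],
#     },
#     ...
# ]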
Here, I use some functional-programming tools (because that’s what I know) to make one observation per interval and production-type:
# One list of records per production type
by_type = list(
    map(
        lambda x: list(
            map(
                lambda v: {
                    "type": x["production_type"],
                    "interval_start": v["start_date"],
                    "interval_end": v["end_date"],
                    "generation": v["value"],
                },
                x["values"],
            )
        ),
        array,
    )
)

# Flatten into a single list of records
fixed = list(functools.reduce(itertools.chain, by_type))
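As an aside, if you prefer comprehensions to map/reduce, the same reshaping can be written as a nested list comprehension; a sketch that should produce the same list of records:

records = [
    {
        "type": x["production_type"],
        "interval_start": v["start_date"],
        "interval_end": v["end_date"],
        "generation": v["value"],
    }
    for x in array        # one entry per production type
    for v in x["values"]  # one entry per interval
]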
We’re now in a position to convert this to a Polars DataFrame and parse the columns. Let’s create a DataFrame from the list:
table_raw = pl.DataFrame(fixed)
We can use Polars expressions to parse the date-times:
table = table_raw.with_columns(
    [
        pl.col("interval_start")
        .str.strptime(pl.Datetime("ms"))
        .dt.convert_time_zone(time_zone=params["tz_local"]),
        pl.col("interval_end")
        .str.strptime(pl.Datetime("ms"))
        .dt.convert_time_zone(time_zone=params["tz_local"]),
    ]
)
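To make the two-step conversion concrete, here is a standalone sketch; the input string and time zone are illustrative, not taken from the actual response:

# Parse an ISO-8601 string (with UTC offset) into a timezone-aware datetime,
# then express it in the local time zone.
example = pl.DataFrame({"ts": ["2023-08-09T00:00:00+02:00"]})
example.select(
    pl.col("ts")
    .str.strptime(pl.Datetime("ms"))
    .dt.convert_time_zone(time_zone="Europe/Paris")
)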
Let’s remove duplicate entries:
table_combined = (
    pl.concat([table_existing, table])
    .unique(subset=["interval_start", "type"])
    .sort(by=["interval_start", "type"])
)
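As a quick sanity check (not part of the pipeline itself), we can confirm that no duplicate (interval_start, type) pairs survive the concatenation:

# Should return a single False if the de-duplication worked.
table_combined.select(
    pl.struct(["interval_start", "type"]).is_duplicated().any()
)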
For the combined table, for each type of generation, find the range of intervals and count the observations and null values:
"type")).agg(
table_combined.groupby(pl.col("interval_start").min(),
pl.col("interval_end").max(),
pl.col("generation").count().alias("n_observations"),
pl.col("generation").null_count().alias("n_value_null"),
pl.col( )
type | interval_start | interval_end | n_observations | n_value_null |
---|---|---|---|---|
str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | u32 | u32 |
"WIND" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"EXCHANGE" | 2017-01-01 00:00:00 CET | 2023-07-27 18:30:00 CEST | 230254 | 0 |
"PUMPING" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"FOSSIL_OIL" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"SOLAR" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"BIOENERGY" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"FOSSIL_GAS" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"NUCLEAR" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"FOSSIL_HARD_CO… | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
"HYDRO" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231523 | 0 |
Finally, write the result:
"data/02-transform/generation.parquet")) table_combined.write_parquet(here(