import os
import dvc.api
import json
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
import functools, itertools
import polars as pl
from pyprojroot.here import here

Generation Transform
The goal of this pipeline is to update a parquet file with generation data from the French grid.
In the transform step:
- Parse the response saved from the ingest step
- Concatenate with the existing parquet file
- Write the parquet file
First, let’s read the parameters:
params = dvc.api.params_show("dvc-params.yaml")

Let’s load in the results from the previous step. We also load the published results, so that we can append the current results. As in the ingest step, we do not declare the published file as a dependency, to avoid circularity.
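To make the reshaping below easier to follow, here is a sketch of the shape the ingested JSON is assumed to take: a list with one entry per production type, each carrying a list of interval values. The field names are the ones referenced in the code below; the values shown are purely illustrative.

# Assumed shape of data/01-ingest/generation.json (illustrative values only;
# the real payload may carry additional fields):
example_array = [
    {
        "production_type": "WIND",
        "values": [
            {
                "start_date": "2023-08-09T00:00:00+02:00",
                "end_date": "2023-08-09T00:15:00+02:00",
                "value": 1234,
            },
        ],
    },
]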
with open(here("data/01-ingest/generation.json"), "r") as file:
    array = json.load(file)
table_existing = pl.read_parquet(here("data/99-publish/standard/generation.parquet"))

Here, I use some functional-programming tools (because that’s what I know) to make one observation per interval and production-type:
by_type = list(
    map(
        lambda x: list(
            map(
                lambda v: {
                    "type": x["production_type"],
                    "interval_start": v["start_date"],
                    "interval_end": v["end_date"],
                    "generation": v["value"],
                },
                x["values"],
            )
        ),
        array,
    )
)
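# The reduce below chains these per-type lists into one flat sequence of
# records. An equivalent flattening, shown only for comparison (the pipeline
# uses the functional version), would be a single comprehension:
#
#     fixed = [
#         {
#             "type": x["production_type"],
#             "interval_start": v["start_date"],
#             "interval_end": v["end_date"],
#             "generation": v["value"],
#         }
#         for x in array
#         for v in x["values"]
#     ]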
fixed = functools.reduce(itertools.chain, by_type)

We now have the data in a form we can convert to a Polars DataFrame, parse, and so on. Let’s create a DataFrame from the list:
table_raw = pl.DataFrame(fixed)

We can use Polars expressions to parse the date-times:
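# The interval timestamps arrive as ISO 8601 strings; strptime infers the
# format. They are assumed to carry a UTC offset (convert_time_zone requires
# time-zone-aware values), and are then re-expressed in the local time zone
# given by params["tz_local"].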
table = table_raw.with_columns(
    [
        pl.col("interval_start")
        .str.strptime(pl.Datetime("ms"))
        .dt.convert_time_zone(time_zone=params["tz_local"]),
        pl.col("interval_end")
        .str.strptime(pl.Datetime("ms"))
        .dt.convert_time_zone(time_zone=params["tz_local"]),
    ]
)

Let’s remove duplicate entries:
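# Combine the previously published rows with the newly parsed rows, keep a
# single row per (interval_start, type) pair, and sort for a stable ordering.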
table_combined = (
    pl.concat([table_existing, table])
    .unique(subset=["interval_start", "type"])
    .sort(by=["interval_start", "type"])
)

For the combined table, for each type of generation, find the span of intervals covered, and count the observations and null values.
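# Per production type: earliest interval start, latest interval end, number of
# observations, and number of null generation values.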
table_combined.groupby(pl.col("type")).agg(
    pl.col("interval_start").min(),
    pl.col("interval_end").max(),
    pl.col("generation").count().alias("n_observations"),
    pl.col("generation").null_count().alias("n_value_null"),
)

| type | interval_start | interval_end | n_observations | n_value_null |
|---|---|---|---|---|
| str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | u32 | u32 |
| "WIND" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "EXCHANGE" | 2017-01-01 00:00:00 CET | 2023-07-27 18:30:00 CEST | 230254 | 0 |
| "PUMPING" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "FOSSIL_OIL" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "SOLAR" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "BIOENERGY" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "FOSSIL_GAS" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "NUCLEAR" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "FOSSIL_HARD_CO… | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231524 | 0 |
| "HYDRO" | 2017-01-01 00:00:00 CET | 2023-08-10 00:00:00 CEST | 231523 | 0 |
Finally, write the result:
table_combined.write_parquet(here("data/02-transform/generation.parquet"))
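If you want to spot-check the written file (a quick sketch, not part of the pipeline), re-read it and confirm there is at most one row per interval and type:

# Optional spot-check: re-read the file and confirm uniqueness of
# (interval_start, type); this is not part of the DVC stage.
check = pl.read_parquet(here("data/02-transform/generation.parquet"))
assert check.height == check.select(["interval_start", "type"]).unique().height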