Generation Transform

The goal of this pipeline is to update a parquet file with generation data from the French grid.

In the transform step:

import json
import itertools

import dvc.api
import polars as pl
from pyprojroot.here import here

First, let’s read the parameters:

params = dvc.api.params_show("dvc-params.yaml")
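The only key this step reads is `tz_local`. As a point of reference, a sketch of what the loaded parameters would contain; the `Europe/Paris` value is an assumption, inferred from the time zone shown in the summary table below:

# Sketch of the parameters this step relies on; "Europe/Paris" is an
# assumption, inferred from the datetime[ms, Europe/Paris] dtype below.
assert "tz_local" in params  # e.g. params == {"tz_local": "Europe/Paris"}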

Let’s load the results from the previous step. We also load the published results, so that we can append the current results to them. As in the ingest step, we do not declare the published file as a dependency, to avoid circularity.

with open(here("data/01-ingest/generation.json"), "r") as file:
    array = json.load(file)
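Judging from how the reshaping code below indexes it, the ingest payload is a list of per-production-type records, each wrapping a list of interval values. A sketch of the assumed structure, with illustrative values:

# Assumed shape of the ingest payload (all values here are illustrative):
example = [
    {
        "production_type": "WIND",
        "values": [
            {
                "start_date": "2017-01-01T00:00:00+01:00",  # ISO-8601 with offset (assumed)
                "end_date": "2017-01-01T00:15:00+01:00",
                "value": 1234,
            },
        ],
    },
]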

table_existing = pl.read_parquet(here("data/99-publish/standard/generation.parquet"))
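Because the published file is not a declared dependency, a fresh clone will not necessarily have it. A defensive variant for that first run; the schema here is an assumption based on the columns created below, and the `generation` dtype in particular is a guess:

# Hypothetical guard for a first run, where the published parquet does not exist yet.
published = here("data/99-publish/standard/generation.parquet")
if published.exists():
    table_existing = pl.read_parquet(published)
else:
    # Same columns the transform produces below; Int64 for generation is an assumption.
    table_existing = pl.DataFrame(
        schema={
            "type": pl.Utf8,
            "interval_start": pl.Datetime("ms", params["tz_local"]),
            "interval_end": pl.Datetime("ms", params["tz_local"]),
            "generation": pl.Int64,
        }
    )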

Here, I use some functional-programming tools (because that’s what I know) to reshape the nested response into one observation per interval and production type:

by_type = list(
    map(
        lambda x: list(
            map(
                lambda v: {
                    "type": x["production_type"],
                    "interval_start": v["start_date"],
                    "interval_end": v["end_date"],
                    "generation": v["value"],
                },
                x["values"],
            )
        ),
        array,
    )
)

# Flatten the per-type lists into one list of records.
fixed = list(itertools.chain.from_iterable(by_type))
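For readers who prefer comprehensions, the two steps above collapse into a single nested list comprehension:

# Equivalent to the map/flatten above: one dict per (production type, interval) pair.
fixed = [
    {
        "type": x["production_type"],
        "interval_start": v["start_date"],
        "interval_end": v["end_date"],
        "generation": v["value"],
    }
    for x in array
    for v in x["values"]
]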

The data is now in a form we can convert to a Polars DataFrame for parsing. Let’s create the DataFrame from the list:

table_raw = pl.DataFrame(fixed)
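At this point the interval columns are still strings; a quick way to confirm before parsing:

# The interval columns should show up as strings (Utf8) here; we parse them next.
print(table_raw.schema)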

We can use Polars expressions to parse the date-times:

table = table_raw.with_columns(
    [
        pl.col("interval_start")
        .str.strptime(pl.Datetime("ms"))
        .dt.convert_time_zone(time_zone=params["tz_local"]),
        pl.col("interval_end")
        .str.strptime(pl.Datetime("ms"))
        .dt.convert_time_zone(time_zone=params["tz_local"]),
    ]
)
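To see what this does to a single value, here is a sketch with a made-up timestamp; the exact format in the feed is an assumption, and strptime infers it when no format is given:

# Illustrative only: parse an offset-bearing string, then re-express it in local time.
s = pl.Series(["2023-08-09T00:00:00+02:00"])
parsed = s.str.strptime(pl.Datetime("ms")).dt.convert_time_zone("Europe/Paris")
print(parsed)  # expect a datetime[ms, Europe/Paris] value for 2023-08-09 00:00:00 CEST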

Let’s remove duplicate entries:

table_combined = (
    pl.concat([table_existing, table])
    .unique(subset=["interval_start", "type"])
    .sort(by=["interval_start", "type"])
)
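One detail worth noting: by default, unique() keeps an arbitrary row among duplicates (keep="any"). If we wanted freshly transformed rows to win over previously published ones, we could rely on the concatenation order, as in this variant:

# Variant: prefer the freshly transformed rows when intervals collide.
# keep="last" retains the last occurrence, i.e. the row from `table`.
table_combined = (
    pl.concat([table_existing, table])
    .unique(subset=["interval_start", "type"], keep="last")
    .sort(by=["interval_start", "type"])
)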

For the combined table, let’s summarize each generation type: the extent of the intervals, the number of observations, and the number of null values.

table_combined.groupby(pl.col("type")).agg(
    pl.col("interval_start").min(),
    pl.col("interval_end").max(),
    pl.col("generation").count().alias("n_observations"),
    pl.col("generation").null_count().alias("n_value_null"),
)
shape: (10, 5)

type              interval_start              interval_end                n_observations  n_value_null
str               datetime[ms, Europe/Paris]  datetime[ms, Europe/Paris]  u32             u32
"WIND"            2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"EXCHANGE"        2017-01-01 00:00:00 CET     2023-07-27 18:30:00 CEST    230254          0
"PUMPING"         2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"FOSSIL_OIL"      2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"SOLAR"           2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"BIOENERGY"       2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"FOSSIL_GAS"      2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"NUCLEAR"         2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"FOSSIL_HARD_CO…  2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231524          0
"HYDRO"           2017-01-01 00:00:00 CET     2023-08-10 00:00:00 CEST    231523          0

Finally, write the result:

table_combined.write_parquet(here("data/02-transform/generation.parquet"))
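On a fresh checkout the output directory may not exist yet; since here() returns a pathlib.Path, a defensive variant is straightforward:

# Variant: ensure the output directory exists before writing.
out_path = here("data/02-transform/generation.parquet")
out_path.parent.mkdir(parents=True, exist_ok=True)
table_combined.write_parquet(out_path)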