Flow Transform

The goal of this pipeline is to update a parquet file with cross-border electricity-flow data for the French grid.

In the transform step:

import os
import json

import dvc.api
import polars as pl
from pyprojroot.here import here

First, let’s read the parameters:

params = dvc.api.params_show("dvc-params.yaml")
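
The only parameter this step relies on is the local time zone, tz_local (here, "Europe/Paris"). As a lightweight guard against a misconfigured dvc-params.yaml, a check along these lines could be added:

# illustrative guard: this step assumes the params include the local time zone
assert "tz_local" in params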

Let’s load the results from the previous (ingest) step. We also load the most-recently published results, so that we can append the current results to them. As in the ingest step, we do not declare the published file as a DVC dependency, to avoid circularity.

with open(here("data/01-ingest/flow.json"), "r") as file:
    array = json.load(file)
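
For reference, given the columns selected below, each element of array should look roughly like this; the field names come from the wrangling code, while the concrete values and the date-string format are assumptions for illustration:

# illustrative sketch of one element of `array` (values and date format assumed)
example_record = {
    "sender_country_name": "Germany",
    "receiver_country_name": "France",
    "values": [
        {
            "start_date": "2017-06-28T00:00:00+02:00",
            "end_date": "2017-06-28T01:00:00+02:00",
            "updated_date": "2017-06-28T01:00:00+02:00",
            "value": 45,
        },
        # ...one dict per hourly interval
    ],
}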

file_existing = here("data/99-publish/standard/flow.parquet")
table_existing = pl.DataFrame()
if os.path.isfile(file_existing):
    table_existing = pl.read_parquet(file_existing)

We can use Polars to wrangle this data:

table = (
    pl.DataFrame(array)
    .select(
        [
            pl.col("sender_country_name").alias("sender"),
            pl.col("receiver_country_name").alias("receiver"),
            pl.col("values"),
        ]
    )
    .explode("values")  # list of 24 hourly values
    .unnest("values")  # dict with start_date, end_date, updated_date, value
    .drop(["updated_date"])
    .rename(
        {"start_date": "interval_start", "end_date": "interval_end", "value": "flow"}
    )
    .with_columns(
        [
            pl.col("interval_start")
            .str.strptime(pl.Datetime("ms"))
            .dt.convert_time_zone(time_zone=params["tz_local"]),
            pl.col("interval_end")
            .str.strptime(pl.Datetime("ms"))
            .dt.convert_time_zone(time_zone=params["tz_local"]),
        ]
    )
    .sort(["interval_start"])
)

table.head(6)
shape: (6, 5)
sender receiver interval_start interval_end flow
str str datetime[ms, Europe/Paris] datetime[ms, Europe/Paris] i64
"Germany" "France" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 45
"Belgium" "France" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 719
"France" "Germany" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 0
"France" "Belgium" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 0
"France" "Italy" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 923
"France" "England-IFA" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 853

This gives us a tidy table we can work with.
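
If we want to confirm that each observation covers exactly one hour, a quick check along these lines (illustrative) could be added:

# illustrative check: every interval should span exactly one hour
assert (
    table.filter(
        (pl.col("interval_end") - pl.col("interval_start")) != pl.duration(hours=1)
    ).height
    == 0
)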

Note that, for each time interval, each country pair is observed twice, with sender and receiver reversed.

table.filter((pl.col("sender") == "Germany") | (pl.col("receiver") == "Germany")).head(
    6
)
shape: (6, 5)
sender receiver interval_start interval_end flow
str str datetime[ms, Europe/Paris] datetime[ms, Europe/Paris] i64
"Germany" "France" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 45
"France" "Germany" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 0
"Germany" "France" 2017-06-28 01:00:00 CEST 2017-06-28 02:00:00 CEST 55
"France" "Germany" 2017-06-28 01:00:00 CEST 2017-06-28 02:00:00 CEST 0
"Germany" "France" 2017-06-28 02:00:00 CEST 2017-06-28 03:00:00 CEST 0
"France" "Germany" 2017-06-28 02:00:00 CEST 2017-06-28 03:00:00 CEST 20

Note also that, for each pair at a given interval, at most one direction has a non-zero flow, and that "France" is always one side of the transaction.
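
Both observations can be verified directly; for example, an illustrative check that France is always one side of the transaction:

# illustrative check: "France" should appear as sender or receiver in every row
assert (
    table.filter(
        (pl.col("sender") != "France") & (pl.col("receiver") != "France")
    ).height
    == 0
)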

We can simplify this table by collapsing each pair into a single row per interval, keeping the partner country and a net flow, where positive values indicate flow into France and negative values indicate flow out of France:

table_net = (
    table.with_columns(
        [
            pl.when(pl.col("sender") == "France")
            .then(pl.col("receiver"))
            .otherwise(pl.col("sender"))
            .alias("partner"),
            pl.when(pl.col("sender") == "France")
            .then(-pl.col("flow"))
            .otherwise(pl.col("flow"))
            .alias("flow_net"),
        ]
    )
    .groupby(["partner", "interval_start", "interval_end"])
    .agg([pl.sum("flow_net")])
    .sort(["interval_start", "partner"])
)

table_net.head()
shape: (5, 4)
partner interval_start interval_end flow_net
str datetime[ms, Europe/Paris] datetime[ms, Europe/Paris] i64
"Belgium" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 719
"England-IFA" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST -853
"Germany" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 45
"Italy" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST -923
"Switzerland" 2017-06-28 00:00:00 CEST 2017-06-28 01:00:00 CEST 450

Let’s combine the new results with the existing published results, and remove duplicates:

table_combined = (
    pl.concat([table_existing, table_net])
    .unique(subset=["interval_start", "partner"])
    .sort(by=["interval_start", "partner"])
)
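
To confirm the deduplication, an illustrative check is to look for any (partner, interval_start) combination that still appears more than once:

# illustrative check: each (partner, interval_start) should now appear exactly once
n_duplicated = (
    table_combined.groupby(["partner", "interval_start"])
    .agg(pl.count().alias("n"))
    .filter(pl.col("n") > 1)
    .height
)
assert n_duplicated == 0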

As a check, for each partner, let’s find the range of intervals and count the observations and null values:

table_combined.groupby(["partner"]).agg(
    pl.col("interval_start").min(),
    pl.col("interval_end").max(),
    pl.col("flow_net").count().alias("n_observations"),
    pl.col("flow_net").null_count().alias("n_value_null"),
)
shape: (6, 5)
partner interval_start interval_end n_observations n_value_null
str datetime[ms, Europe/Paris] datetime[ms, Europe/Paris] u32 u32
"Switzerland" 2017-01-01 00:00:00 CET 2017-07-11 00:00:00 CEST 4574 0
"Italy" 2017-01-01 00:00:00 CET 2017-07-11 00:00:00 CEST 4574 0
"Spain" 2017-01-01 00:00:00 CET 2017-06-27 16:00:00 CEST 4254 0
"Germany" 2017-01-01 00:00:00 CET 2017-07-11 00:00:00 CEST 4574 0
"Belgium" 2017-01-01 00:00:00 CET 2017-07-11 00:00:00 CEST 4574 0
"England-IFA" 2017-01-01 00:00:00 CET 2017-07-11 00:00:00 CEST 4574 0

Finally, write the result:

table_combined.write_parquet(here("data/02-transform/flow.parquet"))
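
One practical note: write_parquet does not create missing directories. On a fresh checkout, where data/02-transform may not exist yet, a small guard could precede the write (a sketch; the existing pipeline may already guarantee the directory exists):

# illustrative: ensure the output directory exists before writing
os.makedirs(here("data/02-transform"), exist_ok=True)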