```python
import os
import dvc.api
import json
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
import functools, itertools
import polars as pl
from pyprojroot.here import here
```

# Flow Transform
The goal of this pipeline is to update a Parquet file with cross-border flow data for the French grid.
In the transform step:
- Parse response saved from the ingest step
- Concatenate with existing parquet files
- Write parquet files
First, let’s read the parameters:
```python
params = dvc.api.params_show("dvc-params.yaml")
```
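The exact contents of `dvc-params.yaml` are not shown here; the only key this step relies on is the local time zone used for the timestamp conversion below. As a rough, hypothetical illustration:

```python
# Hypothetical illustration of the parameters this step uses; the real
# dvc-params.yaml may define additional keys for other pipeline stages.
params_example = {"tz_local": "Europe/Paris"}  # assumed value, consistent with the Europe/Paris timestamps shown below
```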
Let’s load in the results from the previous step. We also load the published results, so that we can append the current results. Similar to the ingest step, we do not declare this file as a dependency, to avoid circularity.

```python
with open(here("data/01-ingest/flow.json"), "r") as file:
    array = json.load(file)

file_existing = here("data/99-publish/standard/flow.parquet")

table_existing = pl.DataFrame()
if os.path.isfile(file_existing):
    table_existing = pl.read_parquet(file_existing)
```
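For reference, each element of `array` has roughly the shape sketched below. The values are invented; the field names are the ones referenced in the pipeline that follows.

```python
# Sketch of a single element of `array` (values invented; field names taken
# from the select/unnest/rename calls below). Each element describes one
# border and direction, with a list of hourly observations.
example_element = {
    "sender_country_name": "Germany",
    "receiver_country_name": "France",
    "values": [
        {
            "start_date": "2017-06-28T00:00:00+02:00",  # assumed ISO-8601 strings
            "end_date": "2017-06-28T01:00:00+02:00",
            "updated_date": "2017-06-28T02:00:00+02:00",
            "value": 45,
        },
        # ... one entry per hour
    ],
}
```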
We can use Polars to wrangle this data:

```python
table = (
    pl.DataFrame(array)
    .select(
        [
            pl.col("sender_country_name").alias("sender"),
            pl.col("receiver_country_name").alias("receiver"),
            pl.col("values"),
        ]
    )
    .explode("values")  # list of 24 hourly values
    .unnest("values")  # dict with start_date, end_date, value
    .drop(["updated_date"])
    .rename(
        {"start_date": "interval_start", "end_date": "interval_end", "value": "flow"}
    )
    .with_columns(
        [
            pl.col("interval_start")
            .str.strptime(pl.Datetime("ms"))
            .dt.convert_time_zone(time_zone=params["tz_local"]),
            pl.col("interval_end")
            .str.strptime(pl.Datetime("ms"))
            .dt.convert_time_zone(time_zone=params["tz_local"]),
        ]
    )
    .sort(["interval_start"])
)
```
```python
table.head(6)
```

| sender | receiver | interval_start | interval_end | flow |
|---|---|---|---|---|
| str | str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | i64 |
| "Germany" | "France" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 45 |
| "Belgium" | "France" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 719 |
| "France" | "Germany" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 0 |
| "France" | "Belgium" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 0 |
| "France" | "Italy" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 923 |
| "France" | "England-IFA" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 853 |
This gives us a tidy table we can work with.
Note that for each time, a country-pair is observed twice, with sender and receiver reversed.
```python
table.filter((pl.col("sender") == "Germany") | (pl.col("receiver") == "Germany")).head(
    6
)
```

| sender | receiver | interval_start | interval_end | flow |
|---|---|---|---|---|
| str | str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | i64 |
| "Germany" | "France" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 45 |
| "France" | "Germany" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 0 |
| "Germany" | "France" | 2017-06-28 01:00:00 CEST | 2017-06-28 02:00:00 CEST | 55 |
| "France" | "Germany" | 2017-06-28 01:00:00 CEST | 2017-06-28 02:00:00 CEST | 0 |
| "Germany" | "France" | 2017-06-28 02:00:00 CEST | 2017-06-28 03:00:00 CEST | 0 |
| "France" | "Germany" | 2017-06-28 02:00:00 CEST | 2017-06-28 03:00:00 CEST | 20 |
Note also that, for each pair, there is at most one non-zero flow, and that "France" is always part of the transaction.
We can simplify this table by:
- combining `sender` and `receiver` into a replacement variable, `partner` (France becomes implied);
- replacing `flow` with `flow_net`, which is positive when power is transmitted to France and negative when France exports.

For example, in the first hour above, Germany → France with a flow of 45 becomes partner "Germany" with a `flow_net` of 45, while France → Italy with a flow of 923 becomes partner "Italy" with a `flow_net` of -923.
```python
table_net = (
    table.with_columns(
        [
            pl.when(pl.col("sender") == "France")
            .then(pl.col("receiver"))
            .otherwise(pl.col("sender"))
            .alias("partner"),
            pl.when(pl.col("sender") == "France")
            .then(-pl.col("flow"))
            .otherwise(pl.col("flow"))
            .alias("flow_net"),
        ]
    )
    .groupby(["partner", "interval_start", "interval_end"])
    .agg([pl.sum("flow_net")])
    .sort(["interval_start", "partner"])
)
```
```python
table_net.head()
```

| partner | interval_start | interval_end | flow_net |
|---|---|---|---|
| str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | i64 |
| "Belgium" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 719 |
| "England-IFA" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | -853 |
| "Germany" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 45 |
| "Italy" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | -923 |
| "Switzerland" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 450 |
Let’s combine with the previous results, and remove duplicates:
```python
table_combined = (
    pl.concat([table_existing, table_net])
    .unique(subset=["interval_start", "partner"])
    .sort(by=["interval_start", "partner"])
)
```
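One detail to keep in mind: when an (`interval_start`, `partner`) key appears in both the existing table and the new one, `unique()` as written does not specify which copy survives. If the freshly transformed rows should take precedence, something along these lines could be used instead (a sketch; `keep="last"` keeps the last occurrence, and `table_net` is concatenated after `table_existing`):

```python
# Sketch: prefer newly transformed rows over previously published ones when
# both cover the same (interval_start, partner) key.
table_combined = (
    pl.concat([table_existing, table_net])
    .unique(subset=["interval_start", "partner"], keep="last")
    .sort(by=["interval_start", "partner"])
)
```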
For this table, let’s count, for each partner, the observations and null values.

```python
table_combined.groupby(["partner"]).agg(
    pl.col("interval_start").min(),
    pl.col("interval_end").max(),
    pl.col("flow_net").count().alias("n_observations"),
    pl.col("flow_net").null_count().alias("n_value_null"),
)
```

| partner | interval_start | interval_end | n_observations | n_value_null |
|---|---|---|---|---|
| str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | u32 | u32 |
| "Switzerland" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
| "Italy" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
| "Spain" | 2017-01-01 00:00:00 CET | 2017-06-27 16:00:00 CEST | 4254 | 0 |
| "Germany" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
| "Belgium" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
| "England-IFA" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
Finally, write the result:
```python
table_combined.write_parquet(here("data/02-transform/flow.parquet"))
```