import os
import dvc.api
import json
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
import functools, itertools
import polars as pl
from pyprojroot.here import here
Flow Transform
The goal of this pipeline is to update a parquet file with cross-border flow data for the French grid.
In the transform step:
- Parse the response saved from the ingest step
- Concatenate with the existing parquet file
- Write the parquet file
First, let’s read the parameters:
params = dvc.api.params_show("dvc-params.yaml")
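For reference, params is just a dict parsed from dvc-params.yaml. The only key this step relies on is tz_local; the value shown below is an assumption on my part, though it matches the Europe/Paris timestamps in the tables that follow.

```python
# Assumed contents; only tz_local is used in this step.
# params == {"tz_local": "Europe/Paris", ...}
params["tz_local"]  # e.g. "Europe/Paris"
```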
Let’s load the results from the previous step. We also load the published results, so that we can append the current results to them. As in the ingest step, we do not declare the published file as a dependency, to avoid circularity.
with open(here("data/01-ingest/flow.json"), "r") as file:
    array = json.load(file)

file_existing = here("data/99-publish/standard/flow.parquet")

table_existing = pl.DataFrame()
if os.path.isfile(file_existing):
    table_existing = pl.read_parquet(file_existing)
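To make the wrangling easier to follow, it helps to peek at the structure of array. The field names below are inferred from the select/explode/unnest calls in the next step, not from the API documentation, so treat this as a sketch.

```python
# Each element of `array` should be a dict with sender/receiver country names
# and a list of hourly values; the printed output depends on the ingested response.
print(list(array[0].keys()))        # expected: sender_country_name, receiver_country_name, values
print(list(array[0]["values"][0]))  # expected: start_date, end_date, updated_date, value
```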
We can use Polars to wrangle this data:
table = (
    pl.DataFrame(array)
    .select(
        [
            pl.col("sender_country_name").alias("sender"),
            pl.col("receiver_country_name").alias("receiver"),
            pl.col("values"),
        ]
    )
    .explode("values")  # list of 24 hourly values
    .unnest("values")  # dict with start_date, end_date, value
    .drop(["updated_date"])
    .rename(
        {"start_date": "interval_start", "end_date": "interval_end", "value": "flow"}
    )
    .with_columns(
        [
            pl.col("interval_start")
            .str.strptime(pl.Datetime("ms"))
            .dt.convert_time_zone(time_zone=params["tz_local"]),
            pl.col("interval_end")
            .str.strptime(pl.Datetime("ms"))
            .dt.convert_time_zone(time_zone=params["tz_local"]),
        ]
    )
    .sort(["interval_start"])
)

table.head(6)
sender | receiver | interval_start | interval_end | flow |
---|---|---|---|---|
str | str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | i64 |
"Germany" | "France" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 45 |
"Belgium" | "France" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 719 |
"France" | "Germany" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 0 |
"France" | "Belgium" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 0 |
"France" | "Italy" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 923 |
"France" | "England-IFA" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 853 |
This gives us a tidy table we can work with.
Note that, for each time, a country-pair is observed twice, with sender and receiver reversed.
table.filter(
    (pl.col("sender") == "Germany") | (pl.col("receiver") == "Germany")
).head(6)
sender | receiver | interval_start | interval_end | flow |
---|---|---|---|---|
str | str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | i64 |
"Germany" | "France" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 45 |
"France" | "Germany" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 0 |
"Germany" | "France" | 2017-06-28 01:00:00 CEST | 2017-06-28 02:00:00 CEST | 55 |
"France" | "Germany" | 2017-06-28 01:00:00 CEST | 2017-06-28 02:00:00 CEST | 0 |
"Germany" | "France" | 2017-06-28 02:00:00 CEST | 2017-06-28 03:00:00 CEST | 0 |
"France" | "Germany" | 2017-06-28 02:00:00 CEST | 2017-06-28 03:00:00 CEST | 20 |
Note also that, for each pair, there is at most one non-zero flow, and that "France" is always part of the transaction.
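A quick way to convince ourselves of that last claim (this check is my own addition, not part of the pipeline):

```python
# Every row should involve France as either sender or receiver.
assert (
    table.filter(
        (pl.col("sender") != "France") & (pl.col("receiver") != "France")
    ).height
    == 0
)
```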
We can simplify this table by:
- combining sender and receiver into a replacement variable, partner (France becomes implied).
- replacing flow with flow_net, which is positive when power is transmitted to France.
table_net = (
    table.with_columns(
        [
            pl.when(pl.col("sender") == "France")
            .then(pl.col("receiver"))
            .otherwise(pl.col("sender"))
            .alias("partner"),
            pl.when(pl.col("sender") == "France")
            .then(-pl.col("flow"))
            .otherwise(pl.col("flow"))
            .alias("flow_net"),
        ]
    )
    .groupby(["partner", "interval_start", "interval_end"])
    .agg([pl.sum("flow_net")])
    .sort(["interval_start", "partner"])
)
table_net.head()
partner | interval_start | interval_end | flow_net |
---|---|---|---|
str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | i64 |
"Belgium" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 719 |
"England-IFA" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | -853 |
"Germany" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 45 |
"Italy" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | -923 |
"Switzerland" | 2017-06-28 00:00:00 CEST | 2017-06-28 01:00:00 CEST | 450 |
Let’s combine with the previous results, and remove duplicates:
table_combined = (
    pl.concat([table_existing, table_net])
    .unique(subset=["interval_start", "partner"])
    .sort(by=["interval_start", "partner"])
)
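One design note on the deduplication: unique(subset=...) keeps a single row per (interval_start, partner) key, but with the default keep argument (which has varied across Polars versions) it is not guaranteed that the freshly transformed row is the one that survives. If we wanted the new values to win over the previously published ones, we could ask for the last occurrence explicitly; a toy sketch, not part of the pipeline:

```python
old = pl.DataFrame({"partner": ["Germany"], "interval_start": [1], "flow_net": [45]})
new = pl.DataFrame({"partner": ["Germany"], "interval_start": [1], "flow_net": [50]})

# keep="last" prefers the row from `new`, because it comes later in the concat.
pl.concat([old, new]).unique(subset=["interval_start", "partner"], keep="last")
```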
For this table, count the observations and null values for each partner:
"partner"]).agg(
table_combined.groupby(["interval_start").min(),
pl.col("interval_end").max(),
pl.col("flow_net").count().alias("n_observations"),
pl.col("flow_net").null_count().alias("n_value_null"),
pl.col( )
partner | interval_start | interval_end | n_observations | n_value_null |
---|---|---|---|---|
str | datetime[ms, Europe/Paris] | datetime[ms, Europe/Paris] | u32 | u32 |
"Switzerland" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
"Italy" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
"Spain" | 2017-01-01 00:00:00 CET | 2017-06-27 16:00:00 CEST | 4254 | 0 |
"Germany" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
"Belgium" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
"England-IFA" | 2017-01-01 00:00:00 CET | 2017-07-11 00:00:00 CEST | 4574 | 0 |
Finally, write the result:
"data/02-transform/flow.parquet")) table_combined.write_parquet(here(