import os
import dvc.api
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import requests
import json
import functools, itertools
import polars as pl
from pyprojroot.here import here
Flow Ingest
The goal of this pipeline is to update a parquet file with flow data from the French grid.
This is the ingest step:
- Try to read a data file to get start date, use default otherwise.
- Use the earlier of today’s date and the start date plus two weeks as the end date.
- Assemble the query and send it to the API.
The flow data is available from 2014-12-16 onwards. However, because the generation data are only available from 2017-01-01, we’ll use that as the start date.
Let’s determine the start_date and the end_date for the request:
the start date will be the larger of:
- 2017-01-01
- the end of the most-recent interval in the dataset
the end date will be the smaller of:
- the start date plus 14 days
- the current date
Dates are expressed as midnight, Paris time.
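The two rules above can be sketched as a small pure function. This is only an illustration: the name `request_window` and its parameters are hypothetical, and a fixed +01:00 offset stands in for Paris winter time so that the example does not depend on the pipeline's parameters.

```python
from datetime import datetime, timedelta, timezone


def request_window(date_end_previous, date_today, tz, max_days=14):
    """Return (date_start, date_end) for the next API request.

    Hypothetical helper: date_end_previous may be None when no data
    has been ingested yet.
    """
    # start: the larger of 2017-01-01 and the end of the most recent interval
    candidates = [datetime(2017, 1, 1, tzinfo=tz)]
    if date_end_previous is not None:
        candidates.append(date_end_previous)
    date_start = max(candidates)

    # end: the smaller of start + 14 days and the current date
    date_end = min(date_start + timedelta(days=max_days), date_today)
    return date_start, date_end


# Fixed offset used as a stand-in for Paris time in January (an assumption).
tz = timezone(timedelta(hours=1))
start, end = request_window(None, datetime(2017, 1, 10, tzinfo=tz), tz)
print(start.isoformat())  # 2017-01-01T00:00:00+01:00
print(end.isoformat())    # 2017-01-10T00:00:00+01:00
```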
The timezone is a parameter shared among multiple files in each of the pipelines, so we read it from the parameters file:
params = dvc.api.params_show("dvc-params.yaml")
To get the pipeline started, we read the published metadata, which contains the most-recent valid date. Note that although we read from this file, we do not declare it as a dependency in dvc.yaml. Declaring it would create a circular dependency, which DVC would refuse to accept.
tz_local = ZoneInfo(params["tz_local"])

# not including this as a DVC dependency, as this would make things "circular"
file_transform_meta = here("data/99-publish/flow-meta.json")

# if file exists, get most-recent date
date_end_previous = None
if os.path.isfile(file_transform_meta):
    with open(file_transform_meta, "r") as file:
        meta = json.load(file)
    date_end_previous = datetime.fromisoformat(meta.get("interval_end")).astimezone(
        tz_local
    )
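If the metadata file exists but has no `interval_end` key, `meta.get("interval_end")` returns `None` and `datetime.fromisoformat()` raises a `TypeError`. A minimal sketch of a more defensive read follows; the helper name `read_previous_end` is hypothetical, and UTC stands in for `tz_local` in the usage lines.

```python
import json
import os
import tempfile
from datetime import datetime, timezone


def read_previous_end(path, tz):
    """Return the stored interval end as an aware datetime, or None.

    Hypothetical helper: None is returned when the file or the
    "interval_end" key is missing, so the caller can fall back to
    the default start date.
    """
    if not os.path.isfile(path):
        return None
    with open(path, "r") as file:
        meta = json.load(file)
    raw = meta.get("interval_end")
    if raw is None:
        return None
    return datetime.fromisoformat(raw).astimezone(tz)


# Usage with a throwaway file; UTC stands in for tz_local here.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "flow-meta.json")
    with open(path, "w") as file:
        json.dump({"interval_end": "2017-06-27T16:00:00+02:00"}, file)
    print(read_previous_end(path, timezone.utc))  # 2017-06-27 14:00:00+00:00
    print(read_previous_end(os.path.join(tmp, "missing.json"), timezone.utc))  # None
```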
Let’s also get the current date (start of today, Paris time):
date_today = datetime.now(tz_local).replace(hour=0, minute=0, second=0, microsecond=0)
We now have enough information to determine the start and end dates for the API call:
# timedelta() seems to use periods rather than intervals;
# i.e., it takes DST into account and returns same wall-clock time.
date_start = datetime(2017, 1, 1, tzinfo=tz_local)
if date_end_previous is not None:
    date_start = date_end_previous

date_end = date_start + timedelta(days=14)

# amend date_end if in the future
date_end = min(date_end, date_today)
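The wall-clock behaviour mentioned in the comment can be checked with a short experiment across the spring 2017 DST change. This sketch assumes that `tz_local` resolves to `Europe/Paris`; the parameter's actual value is not shown here.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

paris = ZoneInfo("Europe/Paris")  # assumed value of tz_local

# 2017-03-20 is before the DST change on 2017-03-26
start = datetime(2017, 3, 20, tzinfo=paris)
end = start + timedelta(days=14)  # same wall-clock time, 14 calendar days later

# Elapsed time is measured in UTC to see the real interval length.
elapsed = end.astimezone(timezone.utc) - start.astimezone(timezone.utc)

print(start.isoformat())  # 2017-03-20T00:00:00+01:00
print(end.isoformat())    # 2017-04-03T00:00:00+02:00
print(elapsed)            # 13 days, 23:00:00
```

The end date stays at midnight Paris time even though the window is one hour short of fourteen full days.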
Given the context, here are the start and end dates:
print(date_start.isoformat())
print(date_end.isoformat())
2017-06-27T16:00:00+02:00
2017-07-11T16:00:00+02:00
We need to request a token:
auth = requests.post(
    "https://digital.iservices.rte-france.com/token/oauth/",
    headers={
        "Authorization": f'Basic {os.environ["RTE_FRANCE_BASE64"]}',
        "Content-Type": "application/x-www-form-urlencoded",
    },
)
token = auth.json()["access_token"]
We compose a request, gather the response, then pull out the data:
endpoint = "https://digital.iservices.rte-france.com/open_api/physical_flow/v1/physical_flows"

response = requests.get(
    f"{endpoint}/?start_date={date_start.isoformat()}&end_date={date_end.isoformat()}",
    headers={
        "Host": "digital.iservices.rte-france.com",
        "Authorization": f"Bearer {token}",
    },
)
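The request above places the ISO strings directly in the URL, including the `+` of the UTC offset. As an alternative, the query string could be built with the standard library so that reserved characters are percent-encoded. This is a sketch only: the helper `build_flow_url` and the placeholder endpoint are hypothetical, and nothing above establishes whether the API prefers the encoded form.

```python
from datetime import datetime, timedelta, timezone
from urllib.parse import urlencode


def build_flow_url(endpoint, date_start, date_end):
    """Hypothetical helper: append percent-encoded start/end dates to endpoint."""
    query = urlencode(
        {
            "start_date": date_start.isoformat(),
            "end_date": date_end.isoformat(),
        }
    )
    return f"{endpoint}/?{query}"


# A fixed +02:00 offset stands in for Paris summer time (an assumption).
tz = timezone(timedelta(hours=2))
url = build_flow_url(
    "https://example.invalid/physical_flows",  # placeholder, not the real endpoint
    datetime(2017, 6, 27, 16, tzinfo=tz),
    datetime(2017, 7, 11, 16, tzinfo=tz),
)
print(url)
```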
Check the response:
response.ok
True
If the response is OK, write out JSON content to file:
if response.ok:
    array = response.json()["physical_flows"]

    with open(here("data/01-ingest/flow.json"), "w") as file:
        json.dump(array, file)
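The write step assumes that the target directory already exists. A small sketch of a helper that creates it when needed is shown here; the name `write_flows` is hypothetical, and a temporary directory stands in for the project's data folder.

```python
import json
import tempfile
from pathlib import Path


def write_flows(array, path):
    """Hypothetical helper: write the flow records as JSON, creating parent folders."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w") as file:
        json.dump(array, file)
    return path


# Usage with made-up records in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    target = write_flows([{"example": 1}], Path(tmp) / "01-ingest" / "flow.json")
    with open(target, "r") as file:
        print(json.load(file))  # [{'example': 1}]
```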