# Source code for spacetimeformer.data.csv_dataset

import random
from typing import List
import os
import tqdm

import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
import pytorch_lightning as pl
from sklearn.preprocessing import StandardScaler

import spacetimeformer as stf


class CSVTimeSeries:
    """Load a CSV time series, split it by holdout, and standardize the targets.

    The file is read with ``pandas.read_csv`` and must contain a ``"Datetime"``
    column formatted as ``%Y-%m-%d %H:%M``. Rows are divided into train /
    validation / test splits by position in the file: the final ``test_split``
    fraction of rows is the test set, the ``val_split`` fraction just before
    it is the validation set, and all remaining rows are training data. A
    ``StandardScaler`` is fit on the training rows of ``target_cols`` only and
    then applied to those columns in every split.

    Args:
        data_path: Path to the CSV file. Must exist.
        target_cols: Names of the columns that are standardized and exposed
            as ``self.target_cols``.
        read_csv_kwargs: Extra keyword arguments forwarded to
            ``pandas.read_csv``. ``None`` (the default) means none.
        val_split: Fraction of rows assigned to the validation split.
        test_split: Fraction of rows assigned to the test split.

    Raises:
        AssertionError: If ``data_path`` does not exist or a ``"Datetime"``
            value is missing / outside the representable ``Timestamp`` range.
    """

    def __init__(
        self,
        data_path: str,
        target_cols: List[str],
        read_csv_kwargs=None,
        val_split: float = 0.2,
        test_split: float = 0.15,
    ):
        self.data_path = data_path
        assert os.path.exists(self.data_path)

        # Build a fresh dict per call; a ``{}`` default would be one object
        # shared by every instance.
        if read_csv_kwargs is None:
            read_csv_kwargs = {}

        # Read the file and do some datetime conversions.
        raw_df = pd.read_csv(self.data_path, **read_csv_kwargs)
        time_df = pd.to_datetime(raw_df["Datetime"], format="%Y-%m-%d %H:%M")
        # Project helper; its result is expected to carry a "Datetime" column.
        df = stf.data.timefeatures.time_features(time_df, raw_df)
        # Comparisons against NaT are False, so these also reject missing dates.
        assert (df["Datetime"] > pd.Timestamp.min).all()
        assert (df["Datetime"] < pd.Timestamp.max).all()

        # Train/Val/Test split using a holdout approach: cutoffs are row
        # positions, but membership is decided by timestamp, so rows sharing
        # a boundary timestamp land in the same split.
        n_rows = len(time_df)
        test_cutoff = n_rows - round(test_split * n_rows)
        val_cutoff = test_cutoff - round(val_split * n_rows)

        datetimes = df["Datetime"]
        val_mask = self._interval_mask(
            datetimes, time_df, val_cutoff, test_cutoff - 1
        )
        test_mask = self._interval_mask(
            datetimes, time_df, test_cutoff, n_rows - 1
        )
        train_mask = ~(val_mask | test_mask)

        if not train_mask.any():
            print(f"No training data detected for file {data_path}")

        self.target_cols = target_cols
        # Fit on training rows only so val/test statistics never leak in.
        self._scaler = StandardScaler().fit(df[train_mask][target_cols].values)

        self._train_data = self.apply_scaling_df(df[train_mask])
        self._val_data = self.apply_scaling_df(df[val_mask])
        self._test_data = self.apply_scaling_df(df[test_mask])

    @staticmethod
    def _interval_mask(datetimes, time_df, low_pos, high_pos):
        """Return a boolean mask of rows whose timestamp lies in a closed interval.

        The interval runs from ``time_df.iloc[low_pos]`` to
        ``time_df.iloc[high_pos]`` inclusive. When ``low_pos`` is past the end
        of ``time_df`` (the split has zero rows) an all-False mask is returned
        instead of raising ``IndexError``.
        """
        if low_pos >= len(time_df):
            return pd.Series(False, index=datetimes.index)
        low, high = time_df.iloc[low_pos], time_df.iloc[high_pos]
        return (datetimes >= low) & (datetimes <= high)

    def get_slice(self, split, start, stop, skip):
        """Return rows ``start:stop:skip`` (positional) of the named split.

        ``split`` must be ``"train"``, ``"val"`` or ``"test"``.
        """
        assert split in ["train", "val", "test"]
        frames = {
            "train": self.train_data,
            "val": self.val_data,
            "test": self.test_data,
        }
        return frames[split].iloc[start:stop:skip]

    def apply_scaling_df(self, df):
        """Return a deep copy of ``df`` with ``target_cols`` standardized."""
        scaled = df.copy(deep=True)
        scaled[self.target_cols] = (
            df[self.target_cols].values - self._scaler.mean_
        ) / self._scaler.scale_
        return scaled

    def apply_scaling(self, array):
        """Standardize ``array`` with the fitted mean and scale."""
        return (array - self._scaler.mean_) / self._scaler.scale_

    def reverse_scaling_df(self, df):
        """Return a deep copy of ``df`` with ``target_cols`` mapped back to raw units."""
        unscaled = df.copy(deep=True)
        unscaled[self.target_cols] = (
            df[self.target_cols] * self._scaler.scale_
        ) + self._scaler.mean_
        return unscaled

    def reverse_scaling(self, array):
        """Invert ``apply_scaling`` on ``array``."""
        return (array * self._scaler.scale_) + self._scaler.mean_

    @property
    def train_data(self):
        """Scaled training split."""
        return self._train_data

    @property
    def val_data(self):
        """Scaled validation split."""
        return self._val_data

    @property
    def test_data(self):
        """Scaled test split."""
        return self._test_data

    def length(self, split):
        """Return the number of rows in ``split`` (``KeyError`` if unknown)."""
        return {
            "train": len(self.train_data),
            "val": len(self.val_data),
            "test": len(self.test_data),
        }[split]

    @classmethod
    def add_cli(cls, parser):
        """Register the ``--data_path`` option (default ``"auto"``) on ``parser``."""
        parser.add_argument("--data_path", type=str, default="auto")
class CSVTorchDset(Dataset):
    """Torch ``Dataset`` of (context, target) windows over one split of a series.

    Each item is a window of ``context_points + target_points`` rows taken
    from ``csv_time_series`` with a stride of ``time_resolution`` rows. The
    window is cut into a context part and a target part, and each part is
    separated into target columns (``y``) and all remaining columns (``x``).

    Args:
        csv_time_series: Source object. It must provide ``length(split)``,
            ``get_slice(split, start, stop, skip)`` and ``target_cols``.
        split: One of ``"train"``, ``"val"`` or ``"test"``.
        context_points: Number of rows in the context part of a window.
        target_points: Number of rows in the target part of a window.
        time_resolution: Stride, in rows, between consecutive window rows.
    """

    def __init__(
        self,
        csv_time_series: "CSVTimeSeries",
        split: str = "train",
        context_points: int = 128,
        target_points: int = 32,
        time_resolution: int = 1,
    ):
        assert split in ["train", "val", "test"]
        self.split = split
        self.series = csv_time_series
        self.context_points = context_points
        self.target_points = target_points
        self.time_resolution = time_resolution

        # Every start row from which a full window still fits in the split.
        # A split shorter than one window gives an empty range, i.e. len 0.
        window_span = time_resolution * (context_points + target_points)
        n_windows = self.series.length(split) - window_span + 1
        self._slice_start_points = list(range(0, n_windows))
        # Uses the global ``random`` state, so item order varies per instance.
        random.shuffle(self._slice_start_points)

    def __len__(self):
        """Return the number of available windows."""
        return len(self._slice_start_points)

    def _torch(self, *np_arrays):
        """Convert each numpy array to a float32 tensor; return them as a tuple."""
        return tuple(torch.from_numpy(x).float() for x in np_arrays)

    def _split_xy(self, frame):
        """Split ``frame`` into (non-target values, target values) arrays."""
        target_cols = self.series.target_cols
        # ``Index.difference`` returns the remaining labels sorted where
        # possible, so x columns are not necessarily in file order.
        x = frame[frame.columns.difference(target_cols)].values
        y = frame[target_cols].values
        return x, y

    def __getitem__(self, i):
        """Return ``(context_x, context_y, target_x, target_y)`` for window ``i``."""
        start = self._slice_start_points[i]
        window_span = self.time_resolution * (
            self.context_points + self.target_points
        )
        series_slice = self.series.get_slice(
            self.split,
            start=start,
            stop=start + window_span,
            skip=self.time_resolution,
        ).drop(columns=["Datetime"])

        ctxt_slice = series_slice.iloc[: self.context_points]
        trgt_slice = series_slice.iloc[self.context_points :]

        ctxt_x, ctxt_y = self._split_xy(ctxt_slice)
        trgt_x, trgt_y = self._split_xy(trgt_slice)
        return self._torch(ctxt_x, ctxt_y, trgt_x, trgt_y)

    @classmethod
    def add_cli(cls, parser):
        """Register the windowing options on ``parser``."""
        parser.add_argument(
            "--context_points",
            type=int,
            default=128,
            help="number of previous timesteps given to the model in order to make predictions",
        )
        parser.add_argument(
            "--target_points",
            type=int,
            default=32,
            help="number of future timesteps to predict",
        )
        parser.add_argument(
            "--time_resolution",
            type=int,
            default=1,
        )
if __name__ == "__main__":
    # Manual smoke test: load a hard-coded CSV, build the default training
    # dataset, and check that every item's first tensor has the same shape.
    # The breakpoint() calls pause for interactive inspection.
    csv_path = "/p/qdatatext/jcg6dn/asos/temperature-v1.csv"
    target_columns = ["ABI", "AMA", "ACT", "ALB", "JFK", "LGA"]
    series = CSVTimeSeries(csv_path, target_columns)
    breakpoint()

    dset = CSVTorchDset(series)
    reference = dset[0][0]
    for sample_idx in range(len(dset)):
        context_x = dset[sample_idx][0]
        assert reference.shape == context_x.shape
    breakpoint()