GANs for time series analysis (synthetic data generation, anomaly detection, and interpolation), with hyperparameter tuning using Optuna, MLflow, and Databricks
The model in this repository combines a stack of dilated causal convolutional layers with LSTM layers. Although GANs are quite challenging to train, this approach has produced great results on various types of datasets, such as stock market and weather measurements.
├── data
│ └── sample.csv
├── databricks-ci
│ ├── create_cls.sh
│ ├── runNotebook.sh
│ └── start_cls.sh
├── .gitlab-ci.yml
├── Standalone_example.ipynb
└── TSGAN_Hypertuning.ipynb
Databricks ML runtime (e.g. 10.5.x-cpu-ml)
Optuna 3.0.1
MLflow 1.8.0
torch 1.5.0
torchvision 0.6.0
# Symbols to download; each one becomes a single-column Spark row so the
# download UDF can run once per ticker.
tickers = ["GOOG", "AAPL"]
df = spark.createDataFrame(
    [(symbol,) for symbol in tickers],
    ("ticker",),
)

# Declared UDF result type: an array of string -> string maps.
return_type = ArrayType(
    MapType(StringType(), StringType())
)
@udf(returnType=return_type)
def yahoo_udf(ticker_label):
    """Download the last week of 1-minute Yahoo Finance bars for a ticker.

    Args:
        ticker_label: Ticker symbol to query, e.g. "GOOG".

    Returns:
        A list with one dict per bar, keyed by 'timestamp', 'low', 'high',
        'close', 'open' and 'volume'. The list is empty when the request
        fails, the status is not 200, or the response cannot be parsed.
    """
    fields = ['timestamp', 'low', 'high', 'close', 'open', 'volume']
    headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.12; rv:74.0) Gecko/20100101 Firefox/74.0'}
    end_ts = int(time.time())
    start_ts = end_ts - 3600 * 24 * 7  # seven days back, in seconds
    ticker = ticker_label
    data_url = f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker}?symbol={ticker}&period1={start_ts}&period2={end_ts}&useYfid=true&interval=1m&includePrePost=true"
    rows = []
    try:
        request = urllib.request.Request(data_url, headers=headers)
        # The context manager closes the HTTP response even if parsing fails.
        with urllib.request.urlopen(request) as response:
            if response.status == 200:
                jresp = json.loads(response.read().decode("utf-8"))
                result = jresp['chart']['result'][0]
                quote = result['indicators']['quote'][0]
                # Collect every column before building the frame so that a
                # partially valid response never yields ragged columns.
                fetched = {'timestamp': result['timestamp']}
                for k in fields[1:]:
                    fetched[k] = quote[k]
                frame = pd.DataFrame(fetched)
                rows = list(frame.to_dict("index").values())
    except (OSError, ValueError, KeyError, IndexError, TypeError):
        # Best effort: a ticker that cannot be fetched or parsed contributes
        # no rows instead of failing the whole Spark job. OSError covers
        # urllib's URLError/HTTPError and socket errors; ValueError covers
        # JSON/UTF-8 decoding errors and columns of unequal length.
        rows = []
    return rows
# Apply the download UDF to the "ticker" column, then turn every element of
# the returned array into its own row.
extracted = yahoo_udf("ticker")
exploded = explode(extracted).alias("exploded")

# One output column per key of the exploded map.
expanded = [
    col("exploded").getItem(field).alias(field)
    for field in ['timestamp', 'low', 'high', 'close', 'open', 'volume']
]

# NOTE: despite its name, `pdf` is produced by Spark `select` calls on `df`.
pdf = (
    df.select("ticker", exploded)
    .select("ticker", *expanded)
)
def suggest_hyperparameters(trial):
    """Ask the trial for the hyperparameters of one tuning run.

    Args:
        trial: Object exposing `suggest_float` and `suggest_categorical`.

    Returns:
        Tuple `(lr, dropoutG, dropoutD, optimizer_name)`.
    """
    # Learning rate is searched on a log scale between 1e-4 and 1e-1.
    learning_rate = trial.suggest_float("lr", 1e-4, 1e-1, log=True)
    # Both dropout rates share the same grid: 0.0 to 0.4 in steps of 0.1.
    # The generator is asked first, then the discriminator.
    gen_dropout, disc_dropout = (
        trial.suggest_float(name, 0.0, 0.4, step=0.1)
        for name in ("dropoutG", "dropoutD")
    )
    optimizer_choice = trial.suggest_categorical(
        "optimizer_name", ["Adam", "Adadelta"]
    )
    return learning_rate, gen_dropout, disc_dropout, optimizer_choice
def objective(trial):
    """Run one tuning trial and return the generator's test MSE.

    Inside a fresh MLflow run the function samples hyperparameters, builds
    the generator and discriminator, trains them, evaluates the generator
    with `test`, and logs the parameters and the `mse_errG` metric.

    The `...` lines are placeholders kept from the original excerpt.
    NOTE(review): `device`, `train_dataloader`, `test_dataloader` and
    `n_epochs` are not defined in the visible code; presumably the omitted
    parts provide them. Confirm against the notebook.

    Args:
        trial: Trial object forwarded to `suggest_hyperparameters`; its
            `params` attribute is logged to MLflow.

    Returns:
        The value of `best_mse_val` after evaluation.
    """
    best_val_loss = float('Inf')
    nz_dim = 100
    best_mse_val = None
    ...
    with mlflow.start_run():
        lr, dropoutG, dropoutD, optimizer_name = suggest_hyperparameters(trial)
        ...
        mlflow.log_params(trial.params)
        mlflow.log_param("device", device)

        netG = Generator(
            in_dim=nz_dim,
            n_channel=10,
            kernel_size=8,
            out_dim=1,
            hidden_dim=100,
            dropout=dropoutG,
        ).to(device)
        netD = Discriminator(
            in_dim=1,
            cnn_layers=4,
            n_layers=1,
            kernel_size=8,
            n_channel=10,
            dropout=dropoutD,
            hidden_dim=100,
        ).to(device)

        # Both networks use the optimizer class named by the trial.
        optimizer_cls = getattr(optim, optimizer_name)
        optimizerD = optimizer_cls(netD.parameters(), lr=lr)
        optimizerG = optimizer_cls(netG.parameters(), lr=lr)

        train(netG, netD, device, train_dataloader, optimizerG, optimizerD, n_epochs)
        mse_errG = test(netG, device, test_dataloader)

        # Keep the smaller of the previous best (if any) and this result.
        best_mse_val = (
            mse_errG if best_mse_val is None else min(best_mse_val, mse_errG)
        )
        mlflow.log_metric("mse_errG", mse_errG)
    return best_mse_val
# Minute-resolution timestamp shared by the experiment name and study name.
run_tag = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M")

# Create a dedicated MLflow experiment and make it the active one.
experiment_id = mlflow.create_experiment(
    name=f"/Users/user/TimeSeriesGAN_Exp_{run_tag}",
    tags={"version": "v1", "priority": "P1"},
)
mlflow.set_experiment(experiment_id=experiment_id)

# Minimise the value returned by `objective` over ten trials.
study = optuna.create_study(
    direction="minimize",
    study_name=f"TimeSeriesGAN_study_{run_tag}",
)
study.optimize(objective, n_trials=10)
sample.csv
Evolution of the generated time series