Introduction

MLFlow is an open-source platform for, among other things, tracking experiments so that you can compare hyperparameters and results, and serving ML models on different platforms. In this tutorial, we provide basic scripts that you can use to track experiments run with TimeGPT or to serve TimeGPT through MLFlow. Each script is meant as an easy-to-use template that you can customize and extend to fit your own needs.

Experiment Tracking with TimeGPT and MLFlow

The following scripts provide functions for logging experiment results when testing different parameter combinations in TimeGPT. To use them, make sure that MLFlow and the other required dependencies are installed. To install MLFlow, run pip install mlflow. Here, we use the following packages:
import json
import os

import mlflow
import pandas as pd

from datetime import datetime
from typing import Optional

from dotenv import load_dotenv         # Used when working with the API directly or a Docker deployment
from nixtla import NixtlaClient        # Needed for the type hints in the functions below
from api.serverless import make_client # Used when working with the Python wheel
In all scripts, we require that you initialize the NixtlaClient and pass it to the functions. You can see how to set up your client with an API key here.
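As a minimal sketch of the API-key route, the helper below reads the key from the environment instead of hard-coding it in a script. The helper name read_api_key is hypothetical, and the variable name NIXTLA_API_KEY is an assumption you should adapt to your setup. If you keep the key in a .env file, call load_dotenv() first so that the variable is populated.

```python
import os
from typing import Mapping, Optional


def read_api_key(
    env: Optional[Mapping[str, str]] = None,
    var_name: str = "NIXTLA_API_KEY",  # assumed variable name; adjust to your setup
) -> str:
    """Return the API key stored in `var_name`, or raise if it is missing."""
    env = os.environ if env is None else env
    key = env.get(var_name, "").strip()
    if not key:
        raise RuntimeError(f"Set the {var_name} environment variable first.")
    return key


# Hypothetical usage (requires the nixtla package and a valid key):
# client = NixtlaClient(api_key=read_api_key())
```

Failing early with a clear message tends to be easier to debug than an authentication error raised later inside a tracked run.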

Logging experiments with forecasting

The following script can be used to log results when using the forecast method.
def log_timegpt_forecast(
    client: NixtlaClient,
    df: pd.DataFrame,
    h: int = 12,
    freq: str = "MS",
    level: list = None,
    model: str = "timegpt-2-mini",
    experiment_name: str = "basic_forecast",
    time_col: str = "ds",
    target_col: str = "y",
    id_col: str = "unique_id",
    **kwargs
):
    """
    Perform TimeGPT forecast and log to MLFlow.
    
    Parameters:
    -----------
    client : NixtlaClient
        Initialized Nixtla client
    df : pd.DataFrame
        Input dataframe with time series data
    h : int
        Forecast horizon
    freq : str
        Frequency of the time series
    level : list
        Confidence levels for prediction intervals
    model : str
        TimeGPT model variant to use
    experiment_name : str
        Name for this MLFlow run
    time_col : str
        Name of the time column in df
    target_col : str
        Name of the target column in df
    id_col : str
        Name of the series identifier column in df
    **kwargs : dict
        Additional parameters to pass to forecast()
    """
    
    with mlflow.start_run(run_name=experiment_name):
        # Log parameters
        mlflow.log_param("horizon", h)
        mlflow.log_param("frequency", freq)
        mlflow.log_param("model", model)
        mlflow.log_param("n_series", df[id_col].nunique() if id_col in df.columns else 1)
        mlflow.log_param("n_observations", len(df))
        
        if level:
            mlflow.log_param("prediction_intervals", level)
        
        # Log any additional parameters
        for key, value in kwargs.items():
            mlflow.log_param(key, value)
        
        # Log dataset info
        mlflow.log_param("start_date", df[time_col].min())
        mlflow.log_param("end_date", df[time_col].max())

        # Forecast
        forecast_df = client.forecast(
            df=df,
            h=h,
            freq=freq,
            level=level,
            model=model,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            **kwargs
        )
        
        # Log metrics
        mlflow.log_metric("forecast_points", len(forecast_df))
        
        # Log forecast as artifact
        forecast_path = "forecast_output.csv"
        forecast_df.to_csv(forecast_path, index=False)
        mlflow.log_artifact(forecast_path)
        
        # Log input data as artifact
        input_path = "input_data.csv"
        df.to_csv(input_path, index=False)
        mlflow.log_artifact(input_path)
        
        # Add tags
        mlflow.set_tag("task_type", "forecasting")
        mlflow.set_tag("timestamp", datetime.now().isoformat())
        
        # Clean up temporary files
        os.remove(forecast_path)
        os.remove(input_path)
        
        return forecast_df
If you want to add more explicit parameters for fine-tuning or to reproduce the exact type hints of the SDK, refer to the SDK reference.
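The function above writes its CSV files to the current working directory and deletes them afterwards, so a failure partway through can leave files behind. One possible alternative, sketched here with only the standard library, writes each artifact into a temporary directory that is cleaned up automatically. The helper name log_rows_as_csv is hypothetical, and log_artifact stands for any callable that accepts a file path, such as mlflow.log_artifact.

```python
import csv
import os
import tempfile
from typing import Callable, Iterable, Sequence


def log_rows_as_csv(
    header: Sequence[str],
    rows: Iterable[Sequence[object]],
    filename: str,
    log_artifact: Callable[[str], None],
) -> None:
    """Write rows to a CSV in a temporary directory and pass its path to log_artifact."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, filename)
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(rows)
        # The file only exists inside this block, so it must be logged here.
        log_artifact(path)
    # The directory and its contents are removed here, even if log_artifact raised.


# Stand-in for mlflow.log_artifact that records the file name and contents.
logged = []


def fake_log_artifact(path: str) -> None:
    with open(path) as f:
        logged.append((os.path.basename(path), f.read().splitlines()))


log_rows_as_csv(
    ["ds", "TimeGPT"],
    [["1961-01-01", 450.2]],
    "forecast_output.csv",
    fake_log_artifact,
)
```

With a DataFrame, a call such as forecast_df.to_csv(path, index=False) can take the place of the csv.writer calls inside the same with block.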

Logging experiments with cross-validation

The following is a simple script to log metrics when running experiments with the cross_validation method. It requires utilsforecast to compute performance metrics, so make sure to run pip install utilsforecast first. The script logs overall performance metrics across all windows as well as per-window metrics.
def log_timegpt_cross_validation(
    client: NixtlaClient,
    df: pd.DataFrame,
    h: int = 12,
    n_windows: int = 1,
    step_size: int = None,
    freq: str = "MS",
    level: list = None,
    model: str = "timegpt-2-mini",
    experiment_name: str = "cross_validation",
    time_col: str = "ds",
    target_col: str = "y",
    id_col: str = "unique_id",
    **kwargs
):
    """
    Perform TimeGPT cross-validation and log to MLFlow.
    
    Parameters:
    -----------
    client : NixtlaClient
        Initialized Nixtla client
    df : pd.DataFrame
        Input dataframe with time series data
    h : int
        Forecast horizon
    n_windows : int
        Number of cross-validation windows
    step_size : int
        Step size between windows (default: h)
    freq : str
        Frequency of the time series
    level : list
        Confidence levels for prediction intervals
    model : str
        TimeGPT model variant to use
    experiment_name : str
        Name for this MLFlow run
    time_col : str
        Name of the time column in df
    target_col : str
        Name of the target column in df
    id_col : str
        Name of the series identifier column in df
    **kwargs : dict
        Additional parameters to pass to cross_validation()
    """
    
    with mlflow.start_run(run_name=experiment_name):
        # Log parameters
        mlflow.log_param("horizon", h)
        mlflow.log_param("n_windows", n_windows)
        mlflow.log_param("step_size", step_size or h)
        mlflow.log_param("frequency", freq)
        mlflow.log_param("model", model)
        mlflow.log_param("n_series", df[id_col].nunique() if id_col in df.columns else 1)
        
        if level:
            mlflow.log_param("prediction_intervals", level)
        
        for key, value in kwargs.items():
            mlflow.log_param(key, value)
        
        # Perform cross-validation
        cv_df = client.cross_validation(
            df=df,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            freq=freq,
            level=level,
            model=model,
            time_col=time_col,
            target_col=target_col,
            id_col=id_col,
            **kwargs
        )
        
        # Calculate and log metrics
        from utilsforecast.losses import mae, mse, rmse

        # utilsforecast returns one row per series, so we average across
        # series to obtain a single overall value for each metric.
        eval_df = cv_df.drop(columns=['cutoff'])

        # MAE
        mae_value = mae(
            df=eval_df,
            models=['TimeGPT'],
            target_col=target_col,
            id_col=id_col,
        )['TimeGPT'].mean()

        # MSE
        mse_value = mse(
            df=eval_df,
            models=['TimeGPT'],
            target_col=target_col,
            id_col=id_col,
        )['TimeGPT'].mean()

        # RMSE
        rmse_value = rmse(
            df=eval_df,
            models=['TimeGPT'],
            target_col=target_col,
            id_col=id_col,
        )['TimeGPT'].mean()

        mlflow.log_metric("mae", mae_value)
        mlflow.log_metric("mse", mse_value)
        mlflow.log_metric("rmse", rmse_value)
        mlflow.log_metric("total_cv_predictions", len(cv_df))

        # Log per-window metrics (MAE as an example), averaged across series
        cutoffs = cv_df['cutoff'].unique()
        for i, cutoff in enumerate(cutoffs):
            window_df = cv_df[cv_df['cutoff'] == cutoff]
            window_mae = mae(
                df=window_df.drop(columns=['cutoff']),
                models=['TimeGPT'],
                target_col=target_col,
                id_col=id_col,
            )['TimeGPT'].mean()
            mlflow.log_metric(f"mae_window_{i+1}", window_mae)
        
        # Log artifacts
        cv_path = "cross_validation_results.csv"
        cv_df.to_csv(cv_path, index=False)
        mlflow.log_artifact(cv_path)
        
        # Log summary statistics
        summary = {
            'metric': ['MAE', 'MSE', 'RMSE'],
            'value': [mae_value, mse_value, rmse_value]
        }
        summary_df = pd.DataFrame(summary)
        summary_path = "metrics_summary.csv"
        summary_df.to_csv(summary_path, index=False)
        mlflow.log_artifact(summary_path)
        
        mlflow.set_tag("task_type", "cross_validation")
        mlflow.set_tag("timestamp", datetime.now().isoformat())
        
        # Clean up
        os.remove(cv_path)
        os.remove(summary_path)
        
        return cv_df
If you want to add more explicit parameters for fine-tuning or to reproduce the exact type hints of the SDK, refer to the SDK reference.
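To make the logged numbers concrete, the sketch below recomputes MAE, MSE, and RMSE by hand on a tiny, made-up set of cross-validation rows, both overall and per cutoff. It uses plain Python rather than utilsforecast, and the rows and the helper name error_metrics are illustrative only.

```python
import math
from collections import defaultdict

# Made-up rows for a single series: (cutoff, actual y, TimeGPT prediction)
rows = [
    ("2020-01", 10.0, 12.0),
    ("2020-01", 20.0, 17.0),
    ("2020-02", 30.0, 31.0),
    ("2020-02", 40.0, 42.0),
]


def error_metrics(pairs):
    """Return MAE, MSE, and RMSE for a list of (actual, predicted) pairs."""
    errors = [actual - predicted for actual, predicted in pairs]
    mae = sum(abs(e) for e in errors) / len(errors)
    mse = sum(e * e for e in errors) / len(errors)
    return {"mae": mae, "mse": mse, "rmse": math.sqrt(mse)}


# Overall metrics pool the predictions from every window.
overall = error_metrics([(y, y_hat) for _, y, y_hat in rows])

# Per-window metrics group the same rows by cutoff first.
by_cutoff = defaultdict(list)
for cutoff, y, y_hat in rows:
    by_cutoff[cutoff].append((y, y_hat))
per_window = {cutoff: error_metrics(pairs) for cutoff, pairs in by_cutoff.items()}
```

Here the overall MAE is 2.0, while the two windows score 2.5 and 1.5, which is the kind of spread the per-window metrics are meant to expose. Keep in mind that utilsforecast reports each metric per series, so with several series the values must be aggregated before being logged as a single number.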

Logging experiments with anomaly detection

This script showcases how you can track experiments done with the detect_anomalies_online method.
def log_timegpt_online_anomaly_detection(
    client: NixtlaClient,
    df: pd.DataFrame,
    h: int,
    detection_size: int,
    threshold_method: str = "univariate",
    freq: str = "D",
    level: int | float = 99,
    model: str = "timegpt-2-mini",
    experiment_name: str = "anomaly_detection",
    time_col: str = "ds",
    target_col: str = "y",
    id_col: str = "unique_id",
    **kwargs
):
    """
    Perform TimeGPT online anomaly detection and log results to MLFlow.
    
    Parameters:
    -----------
    client : NixtlaClient
        Initialized Nixtla client
    df : pd.DataFrame
        Input dataframe with time series data
    h : int
        Forecast horizon
    detection_size : int
        Number of most recent observations to check for anomalies
    threshold_method : str
        Method used to compute the anomaly threshold (e.g. "univariate")
    freq : str
        Frequency of the time series
    level : int | float
        Confidence level for anomaly detection threshold
    model : str
        TimeGPT model variant to use
    experiment_name : str
        Name for this MLFlow run
    time_col : str
        Name of the time column in df
    target_col : str
        Name of the target column in df
    id_col : str
        Name of the series identifier column in df
    **kwargs : dict
        Additional parameters to pass to detect_anomalies_online()
    """
    
    with mlflow.start_run(run_name=experiment_name):        
        # Log parameters
        mlflow.log_param("horizon", h)
        mlflow.log_param("detection_size", detection_size)
        mlflow.log_param("threshold_method", threshold_method)
        mlflow.log_param("frequency", freq)
        mlflow.log_param("detection_level", level)
        mlflow.log_param("model", model)
        mlflow.log_param("n_observations", len(df))
        
        for key, value in kwargs.items():
            mlflow.log_param(key, value)
        
        # Detect anomalies
        anomalies_df = client.detect_anomalies_online(
            df=df,
            h=h,
            detection_size=detection_size,
            threshold_method=threshold_method,
            freq=freq,
            level=level,
            model=model,
            time_col=time_col,
            target_col=target_col,
            id_col=id_col,
            **kwargs
        )
        
        # Calculate metrics
        n_anomalies = anomalies_df['anomaly'].sum()
        
        mlflow.log_metric("n_anomalies", n_anomalies)
        
        # Log results
        anomaly_path = "anomaly_detection_results.csv"
        anomalies_df.to_csv(anomaly_path, index=False)
        mlflow.log_artifact(anomaly_path)
        
        # Log only the detected anomalies
        if n_anomalies > 0:
            detected_anomalies = anomalies_df[anomalies_df['anomaly'] == True]
            detected_path = "detected_anomalies_only.csv"
            detected_anomalies.to_csv(detected_path, index=False)
            mlflow.log_artifact(detected_path)
            os.remove(detected_path)
        
        mlflow.set_tag("task_type", "anomaly_detection")
        mlflow.set_tag("timestamp", datetime.now().isoformat())
        
        # Clean up
        os.remove(anomaly_path)
        
        return anomalies_df
If you want to add more explicit parameters for fine-tuning or to reproduce the exact type hints of the SDK, refer to the SDK reference.
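All three logging functions pass level and any extra keyword arguments directly to mlflow.log_param, which stores values as strings. A small normalization step, sketched below, can keep lists and dictionaries readable and consistent across runs. The helper name normalize_params is hypothetical, and JSON encoding is one possible convention rather than something MLFlow requires.

```python
import json
from typing import Any, Dict


def normalize_params(params: Dict[str, Any]) -> Dict[str, str]:
    """Convert parameter values to stable strings before logging them."""
    normalized = {}
    for key, value in params.items():
        if value is None:
            continue  # skip unset options so they do not clutter the run
        if isinstance(value, (list, tuple, dict)):
            # sort_keys makes equal dictionaries produce equal strings
            normalized[key] = json.dumps(value, sort_keys=True)
        else:
            normalized[key] = str(value)
    return normalized


params = normalize_params(
    {"level": [80, 90], "finetune_steps": 10, "step_size": None, "clean_ex_first": True}
)
```

The resulting dictionary can then be logged in a single call with mlflow.log_params(params) inside an active run.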

Sample usage

With the above functions, you can now run experiments and log them to MLFlow. First, you must instantiate your client using either:
from nixtla import NixtlaClient

client = NixtlaClient(api_key='your_api_key_here')
Or, if you are using a Python wheel:
from api.serverless import make_client

client = make_client()
Then, load your data. Here, we use the simple air passengers dataset.
df = pd.read_csv(
    'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/air_passengers.csv'
)
df.columns = ['ds', 'y']
df["unique_id"] = 0
df = df[["unique_id", "ds", "y"]]
Next, set your tracking URI and experiment name for MLFlow. Note that here, we use the local filesystem for tracking.
mlflow.set_tracking_uri("mlruns")
experiment = mlflow.set_experiment("timegpt_experiments")
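A relative path such as "mlruns" is typically resolved against the current working directory, so runs can end up in different folders depending on where a script is launched. One way to avoid this, assuming a local file store is what you want, is to build an absolute file URI first. The helper name local_tracking_uri is hypothetical.

```python
from pathlib import Path


def local_tracking_uri(directory: str = "mlruns") -> str:
    """Return an absolute file:// URI for a local tracking directory."""
    # resolve() makes the path absolute, which as_uri() requires.
    return Path(directory).expanduser().resolve().as_uri()


uri = local_tracking_uri("mlruns")

# Hypothetical usage (requires mlflow):
# mlflow.set_tracking_uri(uri)
```

For a tracking directory that should not depend on the launch location at all, pass an absolute path to the helper.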
Finally, you can run any function defined above.
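# Example settings for the monthly air passengers data; adjust as needed
h = 12
freq = "MS"
time_col, target_col, id_col = "ds", "y", "unique_id"

forecast_df = log_timegpt_forecast(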
    client=client,
    df=df,
    h=h,
    freq=freq,
    level=[80, 90],
    experiment_name="forecast_example",
    time_col=time_col,
    target_col=target_col,
    id_col=id_col,
    model="timegpt-2"
)

cv_df = log_timegpt_cross_validation(
    client=client,
    df=df,
    h=h,
    n_windows=1,
    freq=freq,
    level=[90],
    experiment_name="cv_example",
    time_col=time_col,
    target_col=target_col,
    id_col=id_col,
)

anomaly_df = log_timegpt_online_anomaly_detection(
    client=client,
    df=df,
    h=h,
    detection_size=h,
    threshold_method="univariate",
    freq=freq,
    level=60,
    model="timegpt-2-mini",
    experiment_name="anomaly_detection_example",
    time_col=time_col,
    target_col=target_col,
    id_col=id_col,
)

Serving TimeGPT with MLFlow

You can also use MLFlow for model serving. This can be useful if you need a unified interface or want to serve the model on many endpoints. MLFlow is not required to use TimeGPT, but your organization may mandate it as standard practice. The following script shows how to wrap the TimeGPT model in an mlflow.pyfunc.PythonModel so that you can save the model and call it.
class MLFLowTimeGPTModel(mlflow.pyfunc.PythonModel):
    """
    Unified MLflow pyfunc wrapper for TimeGPT
    
    Can perform forecasting, cross-validation, and anomaly detection
    based on the 'operation' parameter in the input.
    """
    
    def __init__(self, 
                 client: NixtlaClient,
                 model: str = "timegpt-2-mini",
                 default_h: int = 96,
                 default_freq: str = "15min",
                 default_level: Optional[list] = None,
                 default_n_windows: int = 1,
                 default_anomaly_level: int = 99,
                 ):
        """
        Initialize the unified TimeGPT model wrapper.
        
        Parameters:
        -----------
        client : NixtlaClient
            Initialized Nixtla client
        model : str
            TimeGPT model variant (timegpt-2-mini, timegpt-2)
        default_h : int
            Default forecast horizon
        default_freq : str
            Default frequency
        default_level : list
            Default confidence levels for prediction intervals
        default_n_windows : int
            Default number of cross-validation windows
        default_anomaly_level : int
            Default confidence level for anomaly detection
        """
        self.model = model
        self.default_h = default_h
        self.default_freq = default_freq
        self.default_level = default_level or [80, 90]
        self.default_n_windows = default_n_windows
        self.default_anomaly_level = default_anomaly_level
        self.client = client
    
    def load_context(self, context):
        """
        Load the model context. Called once when the model is loaded.
        
        Parameters:
        -----------
        context : mlflow.pyfunc.PythonModelContext
            Context containing artifacts and other model metadata
        """
        # Load configuration from artifacts if present
        if context.artifacts and "config" in context.artifacts:
            with open(context.artifacts["config"], 'r') as f:
                config = json.load(f)
                self.model = config.get("model", self.model)
                self.default_h = config.get("h", self.default_h)
                self.default_freq = config.get("freq", self.default_freq)
                self.default_level = config.get("level", self.default_level)
                self.default_n_windows = config.get("n_windows", self.default_n_windows)
                self.default_anomaly_level = config.get("anomaly_level", self.default_anomaly_level)
    
    def predict(self, context, model_input):
        """Perform operation"""
        # Parse input
        if isinstance(model_input, dict):
            # Work on a shallow copy so the caller's dictionary is not modified
            model_input = dict(model_input)
            df = model_input.get('data')
            operation = model_input.get('operation', 'forecast')
            # Remove the column names so they are not forwarded twice
            time_col = model_input.pop('time_col', 'ds')
            target_col = model_input.pop('target_col', 'y')
            id_col = model_input.pop('id_col', 'unique_id')
        else:
            # If just DataFrame is passed, default to forecast
            df = model_input
            operation = 'forecast'
            id_col = 'unique_id'
            time_col = 'ds'
            target_col = 'y'
        
        # Validate input
        if df is None or not isinstance(df, pd.DataFrame):
            raise ValueError("Input must contain a pandas DataFrame under 'data' key")
        
        # Route to appropriate operation
        if operation == 'forecast':
            return self._forecast(df, model_input, id_col, time_col, target_col)
        elif operation == 'cross_validation':
            return self._cross_validation(df, model_input, id_col, time_col, target_col)
        elif operation == 'anomaly_detection':
            return self._anomaly_detection(df, model_input, id_col, time_col, target_col)
        else:
            raise ValueError(f"Unknown operation: {operation}. Must be 'forecast', 'cross_validation', or 'anomaly_detection'")

    def _forecast(self, df, params, id_col, time_col, target_col):
        """Perform forecasting operation."""
        h = params.get('h', self.default_h) if isinstance(params, dict) else self.default_h
        freq = params.get('freq', self.default_freq) if isinstance(params, dict) else self.default_freq
        level = params.get('level', self.default_level) if isinstance(params, dict) else self.default_level
        
        # Extract additional parameters
        additional_params = {}
        if isinstance(params, dict):
            exclude_keys = {'data', 'operation', 'h', 'freq', 'level', 'time_col', 'target_col'}
            additional_params = {k: v for k, v in params.items() if k not in exclude_keys}
        
        result = self.client.forecast(
            df=df,
            h=h,
            freq=freq,
            level=level,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            model=self.model,
            **additional_params
        )
        
        return result

    def _cross_validation(self, df, params, id_col, time_col, target_col):
        """Perform cross-validation operation."""
        h = params.get('h', self.default_h) if isinstance(params, dict) else self.default_h
        n_windows = params.get('n_windows', self.default_n_windows) if isinstance(params, dict) else self.default_n_windows
        freq = params.get('freq', self.default_freq) if isinstance(params, dict) else self.default_freq
        level = params.get('level', self.default_level) if isinstance(params, dict) else self.default_level
        step_size = params.get('step_size') if isinstance(params, dict) else None
        
        # Extract additional parameters
        additional_params = {}
        if isinstance(params, dict):
            exclude_keys = {'data', 'operation', 'h', 'n_windows', 'freq', 'level', 'step_size', 'time_col', 'target_col'}
            additional_params = {k: v for k, v in params.items() if k not in exclude_keys}
        
        result = self.client.cross_validation(
            df=df,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            freq=freq,
            level=level,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            model=self.model,
            **additional_params
        )
        
        return result

    def _anomaly_detection(self, df, params, id_col, time_col, target_col):
        """Perform anomaly detection operation."""
        h = params.get('h', self.default_h) if isinstance(params, dict) else self.default_h
        detection_size = params.get("detection_size", self.default_h) if isinstance(params, dict) else self.default_h
        threshold_method = params.get("threshold_method", "univariate") if isinstance(params, dict) else "univariate"
        freq = params.get('freq', self.default_freq) if isinstance(params, dict) else self.default_freq
        level = params.get('level', self.default_anomaly_level) if isinstance(params, dict) else self.default_anomaly_level
        
        # Extract additional parameters
        additional_params = {}
        if isinstance(params, dict):
            exclude_keys = {'data', 'operation', 'h', 'detection_size', 
                'threshold_method', 'freq', 'level', 'time_col', 'target_col'}
            additional_params = {k: v for k, v in params.items() if k not in exclude_keys}
        
        result = self.client.detect_anomalies_online(
            df=df,
            h=h,
            detection_size=detection_size,
            threshold_method=threshold_method,
            freq=freq,
            level=level,
            time_col=time_col,
            id_col=id_col,
            target_col=target_col,
            model=self.model,
            **additional_params
        )
        
        return result


def save_unified_model(
    client: NixtlaClient,
    model_path: str,
    model_variant: str = "timegpt-2-mini",
    **default_params
):
    """
    Save a unified TimeGPT model that can perform all operations.
    
    Parameters:
    -----------
    client : NixtlaClient
        Initialized Nixtla client
    model_path : str
        Path where the model will be saved
    model_variant : str
        TimeGPT model variant to use
    **default_params : dict
        Default parameters for operations
    """
    python_model = MLFLowTimeGPTModel(
        client=client,
        model=model_variant,
        **default_params
    )
    
    # Save the model
    mlflow.pyfunc.save_model(
        path=model_path,
        python_model=python_model,
    )

Sample usage

First, you must instantiate your client using either:
from nixtla import NixtlaClient

client = NixtlaClient(api_key='your_api_key_here')
Or, if you are using a Python wheel:
from api.serverless import make_client

client = make_client()
Then, load your data. Here, we use the simple air passengers dataset.
df = pd.read_csv(
    'https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/air_passengers.csv'
)
df.columns = ['ds', 'y']
df["unique_id"] = 0
df = df[["unique_id", "ds", "y"]]
Then, save the model with:
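# Example settings for the monthly air passengers data; adjust as needed
h, freq = 12, "MS"
id_col, time_col, target_col = "unique_id", "ds", "y"

save_unified_model(client=client, model_path="test_model", model_variant="timegpt-2-mini", default_h=h, default_freq=freq)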
Note that if you want to use another variant of TimeGPT, say timegpt-2, then you must save another instance and specify that model variant. Now, you can perform forecasting, cross-validation and anomaly detection with the saved model in MLFlow.
# Load model
model = mlflow.pyfunc.load_model("test_model")

# Forecast
forecast = model.predict({'data': df, 'operation': 'forecast', 'h': h, 'id_col': id_col, 'time_col': time_col, 'target_col': target_col, 'freq': freq})

# Cross-validation
cv = model.predict({'data': df, 'operation': 'cross_validation', 'h': h, 'n_windows': 1, 'id_col': id_col, 'time_col': time_col, 'target_col': target_col, 'freq': freq})

# Anomaly detection
anomalies = model.predict({'data': df, 'operation': 'anomaly_detection', 'detection_size': h, 'id_col': id_col, 'time_col': time_col, 'target_col': target_col, 'freq': freq})
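The request dictionaries above combine the data, the operation name, the column names, and the operation parameters in a single object. The sketch below mirrors, in simplified form, how such a request could be split into those parts without modifying the dictionary that was passed in. The function name split_request and the RESERVED and OPERATIONS sets are illustrative and are not part of MLFlow or the Nixtla SDK.

```python
from typing import Any, Dict, Tuple

# Keys that carry routing or column information rather than operation parameters.
RESERVED = {"data", "operation", "time_col", "target_col", "id_col"}
OPERATIONS = {"forecast", "cross_validation", "anomaly_detection"}


def split_request(request: Dict[str, Any]) -> Tuple[str, Dict[str, str], Dict[str, Any]]:
    """Split a request into (operation, column names, remaining parameters)."""
    operation = request.get("operation", "forecast")
    if operation not in OPERATIONS:
        raise ValueError(f"Unknown operation: {operation}")
    columns = {
        "time_col": request.get("time_col", "ds"),
        "target_col": request.get("target_col", "y"),
        "id_col": request.get("id_col", "unique_id"),
    }
    # Build a new dict instead of popping keys from the caller's request.
    params = {k: v for k, v in request.items() if k not in RESERVED}
    return operation, columns, params


request = {
    "data": None,  # a DataFrame in real use
    "operation": "cross_validation",
    "h": 12,
    "n_windows": 2,
    "time_col": "timestamp",
}
operation, columns, params = split_request(request)
```

Validating the operation name before any data is sent keeps typos such as 'cross-validation' from reaching the API, and leaving the request untouched means the same dictionary can be reused for several calls.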