quantex.sources

DataSource abstractions for the QuantEx library.

BacktestingDataSource

Bases: DataSource

A data source for backtesting that must have a defined length.

Source code in src/quantex/sources.py
class BacktestingDataSource(DataSource):
    """A data source for backtesting that must have a defined length."""

    @abstractmethod
    def __len__(self) -> int:  # pragma: no cover – abstract contract
        raise NotImplementedError

    @abstractmethod
    def get_raw_data(self) -> pd.DataFrame:
        """Returns the entire underlying data as a DataFrame.

        This method is intended for use by the backtesting engine for
        pre-computation and should not be used in strategy logic.
        """
        raise NotImplementedError

get_raw_data() abstractmethod

Returns the entire underlying data as a DataFrame.

This method is intended for use by the backtesting engine for pre-computation and should not be used in strategy logic.
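
A typical engine-side pre-computation pass might look like the following sketch (the file path and the 20-bar moving average are purely illustrative, not part of the API):

from quantex.sources import CSVDataSource

source = CSVDataSource("data/AAPL.csv")          # hypothetical file; any BacktestingDataSource works
raw = source.get_raw_data()                      # full history, fetched once before the run
sma_20 = raw["close"].rolling(window=20).mean()  # vectorised indicator pre-computation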

Source code in src/quantex/sources.py
@abstractmethod
def get_raw_data(self) -> pd.DataFrame:
    """Returns the entire underlying data as a DataFrame.

    This method is intended for use by the backtesting engine for
    pre-computation and should not be used in strategy logic.
    """
    raise NotImplementedError

CSVDataSource

Bases: BacktestingDataSource

Backtesting data source backed by a local OHLCV CSV file.

The CSV must contain 'timestamp', 'open', 'high', 'low', 'close', and 'volume' columns. The 'timestamp' column will be parsed as dates.
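
For example, loading such a file and stepping through it bar by bar might look like this minimal sketch (the path is hypothetical, and the explicit stepping loop stands in for what the backtesting engine normally does):

from quantex.sources import CSVDataSource

source = CSVDataSource("data/AAPL.csv")      # symbol inferred as "AAPL" from the file stem

while source.peek_timestamp() is not None:
    bar = source.get_current_bar()           # Bar at the current index
    window = source.get_lookback_data(20)    # up to 20 bars, current bar included
    # ... strategy / engine logic would go here ...
    source._increment_index()                # advance to the next bar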

Source code in src/quantex/sources.py
class CSVDataSource(BacktestingDataSource):
    """Backtesting data source backed by a local OHLCV CSV file.

    The CSV must contain 'timestamp', 'open', 'high', 'low', 'close', and
    'volume' columns. The 'timestamp' column will be parsed as dates.
    """

    def __init__(self, path: str | Path, symbol: Optional[str] = None):
        """Initializes the CSVDataSource.

        Args:
            path: The path to the CSV file.
            symbol: The symbol for the data. If None, it's inferred from the
                file name.

        Raises:
            FileNotFoundError: If the CSV file does not exist.
            ValueError: If the CSV is missing required columns.
        """
        self.path = Path(path)
        if not self.path.exists():
            raise FileNotFoundError(self.path)

        df = pd.read_csv(self.path, parse_dates=["timestamp"])
        df = df.set_index("timestamp")
        df = df.sort_index()
        required_cols = {"open", "high", "low", "close", "volume"}
        if not required_cols.issubset(df.columns):
            missing = required_cols.difference(df.columns)
            raise ValueError(f"CSV missing required columns: {missing}")

        self._df = df  # immutable reference
        self.symbol = symbol or self.path.stem
        self.index = 0

    def get_raw_data(self) -> pd.DataFrame:
        return self._df

    def __len__(self) -> int:
        """Returns the number of bars in the data source."""
        return len(self._df)

    def peek_timestamp(self) -> datetime | None:
        """Peeks at the timestamp of the next available bar from the CSV.

        Returns:
            The next timestamp, or None if the source is exhausted.
        """
        if self.index < len(self):
            return self._df.index[self.index]
        return None

    def get_current_bar(self) -> Bar:
        """Returns the current bar from the CSV data."""
        row = self._df.iloc[self.index]
        ts = self._df.index[self.index]
        return Bar(
            timestamp=ts,
            open=row["open"],
            high=row["high"],
            low=row["low"],
            close=row["close"],
            volume=row["volume"],
            symbol=self.symbol,
        )

    def get_lookback_data(self, lookback_period: int) -> pd.DataFrame:
        """Returns a lookback window of data from the CSV.

        Args:
            lookback_period: The size of the lookback window.

        Returns:
            A pandas DataFrame containing the lookback data, inclusive of the current bar.
        """
        start = max(0, self.index - lookback_period + 1)
        return self._df.iloc[start : self.index + 1].copy()

__init__(path, symbol=None)

Initializes the CSVDataSource.

Parameters:

path (str | Path, required): The path to the CSV file.
symbol (Optional[str], default None): The symbol for the data. If None, it's inferred from the file name.

Raises:

FileNotFoundError: If the CSV file does not exist.
ValueError: If the CSV is missing required columns.

Source code in src/quantex/sources.py
def __init__(self, path: str | Path, symbol: Optional[str] = None):
    """Initializes the CSVDataSource.

    Args:
        path: The path to the CSV file.
        symbol: The symbol for the data. If None, it's inferred from the
            file name.

    Raises:
        FileNotFoundError: If the CSV file does not exist.
        ValueError: If the CSV is missing required columns.
    """
    self.path = Path(path)
    if not self.path.exists():
        raise FileNotFoundError(self.path)

    df = pd.read_csv(self.path, parse_dates=["timestamp"])
    df = df.set_index("timestamp")
    df = df.sort_index()
    required_cols = {"open", "high", "low", "close", "volume"}
    if not required_cols.issubset(df.columns):
        missing = required_cols.difference(df.columns)
        raise ValueError(f"CSV missing required columns: {missing}")

    self._df = df  # immutable reference
    self.symbol = symbol or self.path.stem
    self.index = 0

__len__()

Returns the number of bars in the data source.

Source code in src/quantex/sources.py
def __len__(self) -> int:
    """Returns the number of bars in the data source."""
    return len(self._df)

get_current_bar()

Returns the current bar from the CSV data.

Source code in src/quantex/sources.py
def get_current_bar(self) -> Bar:
    """Returns the current bar from the CSV data."""
    row = self._df.iloc[self.index]
    ts = self._df.index[self.index]
    return Bar(
        timestamp=ts,
        open=row["open"],
        high=row["high"],
        low=row["low"],
        close=row["close"],
        volume=row["volume"],
        symbol=self.symbol,
    )

get_lookback_data(lookback_period)

Returns a lookback window of data from the CSV.

Parameters:

lookback_period (int, required): The size of the lookback window.

Returns:

DataFrame: A pandas DataFrame containing the lookback data, inclusive of the current bar.
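
For example, assuming `source` is a CSVDataSource whose pointer has advanced to the sixth bar (index 5), the window bounds work out as follows:

source.index = 5                       # illustrative: the engine has advanced to bar 5
window = source.get_lookback_data(3)   # start = max(0, 5 - 3 + 1) == 3, so rows 3, 4 and 5
assert len(window) == 3                # inclusive of the current bar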

Source code in src/quantex/sources.py
def get_lookback_data(self, lookback_period: int) -> pd.DataFrame:
    """Returns a lookback window of data from the CSV.

    Args:
        lookback_period: The size of the lookback window.

    Returns:
        A pandas DataFrame containing the lookback data, inclusive of the current bar.
    """
    start = max(0, self.index - lookback_period + 1)
    return self._df.iloc[start : self.index + 1].copy()

peek_timestamp()

Peeks at the timestamp of the next available bar from the CSV.

Returns:

datetime | None: The next timestamp, or None if the source is exhausted.

Source code in src/quantex/sources.py
def peek_timestamp(self) -> datetime | None:
    """Peeks at the timestamp of the next available bar from the CSV.

    Returns:
        The next timestamp, or None if the source is exhausted.
    """
    if self.index < len(self):
        return self._df.index[self.index]
    return None

DataSource

Bases: ABC

Abstract data source for providing market data.

Implementations must provide the current bar via get_current_bar and allow a rolling historical window via get_lookback_data. The internal pointer index starts at 0 and should be advanced by calling _increment_index once the engine has finished processing a bar.
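
A sketch of how an engine might drive that contract (illustrative only; the lookback length and the strategy hook are assumptions, not part of this module):

def run(source, strategy) -> None:
    """Drive a DataSource until it is exhausted (illustrative sketch)."""
    while source.peek_timestamp() is not None:
        bar = source.get_current_bar()            # bar at the current index
        history = source.get_lookback_data(50)    # rolling window, current bar included
        strategy.on_bar(bar, history)             # hypothetical strategy hook
        source._increment_index()                 # advance only after the bar is processed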

Source code in src/quantex/sources.py
class DataSource(ABC):
    """Abstract data source for providing market data.

    Implementations must provide the *current* bar via `get_current_bar`
    and allow a rolling historical window via `get_lookback_data`.
    The internal pointer `index` starts at 0 and should be advanced by calling
    `_increment_index` once the engine has finished processing a bar.
    """

    index: int = 0
    symbol: str | None = None

    @abstractmethod
    def get_current_bar(self) -> Bar:
        """Returns the bar at the current `index` position."""
        raise NotImplementedError

    @abstractmethod
    def get_lookback_data(self, lookback_period: int) -> pd.DataFrame:
        """Returns a lookback window of data.

        Args:
            lookback_period: The size of the lookback window.

        Returns:
            A pandas DataFrame containing the lookback data, inclusive of the
            current bar.
        """
        raise NotImplementedError

    @abstractmethod
    def peek_timestamp(self) -> datetime | None:
        """Peeks at the timestamp of the next available bar.

        This method should return the timestamp of the bar at the current
        `index` without advancing the index. If the source is exhausted,
        it should return `None`.

        Returns:
            The next timestamp, or None if the source is exhausted.
        """
        raise NotImplementedError

    def _increment_index(self) -> None:
        """Advances the internal pointer to the next bar."""
        self.index += 1

get_current_bar() abstractmethod

Returns the bar at the current index position.

Source code in src/quantex/sources.py
@abstractmethod
def get_current_bar(self) -> Bar:
    """Returns the bar at the current `index` position."""
    raise NotImplementedError

get_lookback_data(lookback_period) abstractmethod

Returns a lookback window of data.

Parameters:

lookback_period (int, required): The size of the lookback window.

Returns:

DataFrame: A pandas DataFrame containing the lookback data, inclusive of the current bar.

Source code in src/quantex/sources.py
@abstractmethod
def get_lookback_data(self, lookback_period: int) -> pd.DataFrame:
    """Returns a lookback window of data.

    Args:
        lookback_period: The size of the lookback window.

    Returns:
        A pandas DataFrame containing the lookback data, inclusive of the
        current bar.
    """
    raise NotImplementedError

peek_timestamp() abstractmethod

Peeks at the timestamp of the next available bar.

This method should return the timestamp of the bar at the current index without advancing the index. If the source is exhausted, it should return None.

Returns:

datetime | None: The next timestamp, or None if the source is exhausted.
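
One plausible use is merging several sources chronologically: step whichever source has the earliest pending bar. A sketch under that assumption (not necessarily how the QuantEx engine schedules sources):

def next_source(sources):
    """Return the source with the earliest pending timestamp, or None if all are exhausted."""
    pending = [(s.peek_timestamp(), s) for s in sources]
    pending = [(ts, s) for ts, s in pending if ts is not None]
    return min(pending, key=lambda p: p[0])[1] if pending else None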

Source code in src/quantex/sources.py
@abstractmethod
def peek_timestamp(self) -> datetime | None:
    """Peeks at the timestamp of the next available bar.

    This method should return the timestamp of the bar at the current
    `index` without advancing the index. If the source is exhausted,
    it should return `None`.

    Returns:
        The next timestamp, or None if the source is exhausted.
    """
    raise NotImplementedError

ParquetDataSource

Bases: BacktestingDataSource

Backtesting data source backed by a local OHLCV Parquet file.

The Parquet file must contain either an index of timestamps or a column named 'timestamp', as well as the standard OHLCV columns. If a 'timestamp' column exists, it will be parsed and set as the index.
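
Usage mirrors CSVDataSource. For example (hypothetical path; pandas.read_parquet additionally needs a Parquet engine such as pyarrow or fastparquet installed):

from quantex.sources import ParquetDataSource

source = ParquetDataSource("data/BTC-USD.parquet")   # symbol inferred as "BTC-USD"
print(len(source), source.peek_timestamp())          # bar count and first timestamp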

Source code in src/quantex/sources.py
class ParquetDataSource(BacktestingDataSource):
    """Backtesting data source backed by a local OHLCV Parquet file.

    The Parquet file must contain either an index of timestamps or a column
    named 'timestamp', as well as the standard OHLCV columns. If a
    'timestamp' column exists, it will be parsed and set as the index.
    """

    def __init__(self, path: str | Path, symbol: Optional[str] = None):
        """Initializes the ParquetDataSource.

        Args:
            path: Path to the Parquet file on disk.
            symbol: Optional symbol name. If omitted, the stem of the path is
                used instead.
        """
        if not isinstance(path, Path):
            path = Path(path)
        self.path = path
        if not self.path.exists():
            raise FileNotFoundError(self.path)

        # Load data – let pandas/FastParquet handle decompression & column types
        df = pd.read_parquet(self.path)

        # If timestamp is a regular column, make it the index
        if "timestamp" in df.columns:
            df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
            df = df.set_index("timestamp")

        # Ensure chronological order
        df = df.sort_index()

        required_cols = {"open", "high", "low", "close", "volume"}
        if not required_cols.issubset(df.columns):
            missing = required_cols.difference(df.columns)
            raise ValueError(f"Parquet missing required columns: {missing}")

        # Immutable reference to underlying data
        self._df = df
        self.symbol = symbol or self.path.stem
        self.index = 0

    # --- BacktestingDataSource API -----------------------------------------
    def get_raw_data(self) -> pd.DataFrame:  # type: ignore[override]
        return self._df

    def __len__(self) -> int:  # type: ignore[override]
        return len(self._df)

    def peek_timestamp(self) -> datetime | None:  # type: ignore[override]
        if self.index < len(self):
            return self._df.index[self.index]
        return None

    def get_current_bar(self) -> Bar:  # type: ignore[override]
        row = self._df.iloc[self.index]
        ts = self._df.index[self.index]
        return Bar(
            timestamp=ts,
            open=row["open"],
            high=row["high"],
            low=row["low"],
            close=row["close"],
            volume=row["volume"],
            symbol=self.symbol,
        )

    def get_lookback_data(self, lookback_period: int) -> pd.DataFrame:  # type: ignore[override]
        start = max(0, self.index - lookback_period + 1)
        return self._df.iloc[start : self.index + 1].copy()

__init__(path, symbol=None)

Initializes the ParquetDataSource.

Parameters:

path (str | Path, required): Path to the Parquet file on disk.
symbol (Optional[str], default None): Optional symbol name. If omitted, the stem of the path is used instead.

Source code in src/quantex/sources.py
def __init__(self, path: str | Path, symbol: Optional[str] = None):
    """Initializes the ParquetDataSource.

    Args:
        path: Path to the Parquet file on disk.
        symbol: Optional symbol name. If omitted, the stem of the path is
            used instead.
    """
    if not isinstance(path, Path):
        path = Path(path)
    self.path = path
    if not self.path.exists():
        raise FileNotFoundError(self.path)

    # Load data – let pandas/FastParquet handle decompression & column types
    df = pd.read_parquet(self.path)

    # If timestamp is a regular column, make it the index
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        df = df.set_index("timestamp")

    # Ensure chronological order
    df = df.sort_index()

    required_cols = {"open", "high", "low", "close", "volume"}
    if not required_cols.issubset(df.columns):
        missing = required_cols.difference(df.columns)
        raise ValueError(f"Parquet missing required columns: {missing}")

    # Immutable reference to underlying data
    self._df = df
    self.symbol = symbol or self.path.stem
    self.index = 0