Skip to content

quantex.engine

Core event loop orchestrating data flow between sources, strategy and simulator.

The EventBus pre-computes a global timeline, vectorises price data into NumPy arrays for speed, injects market snapshots into the running quantex.strategy.Strategy and records NAV, orders and fills.

Google-style docstrings are used throughout for consistency.

EventBus

Lightweight dispatcher that coordinates data, strategy, and execution.

This class is the central coordinator of the backtesting engine. It fetches data from data sources, passes it to the strategy for processing, and sends any generated orders to the execution simulator.

Source code in src/quantex/engine.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
class EventBus:
    """Lightweight dispatcher that coordinates data, strategy, and execution.

    This class is the central coordinator of the backtesting engine. It fetches
    data from data sources, passes it to the strategy for processing, and
    sends any generated orders to the execution simulator.

    Attributes:
        strategy: The strategy driven by this bus; it receives an
            ``event_bus`` back-reference on construction.
        data_sources: Mapping of data sources feeding the simulation.
        simulator: Execution simulator that turns orders into fills.
        orders: Every order popped from the strategy, in emission order.
        fills: Fills returned by the simulator.
        nav: Net asset value recorded after each processed timestamp.
        timestamps: Timestamps processed so far (parallel to ``nav``).
    """

    def __init__(
        self,
        strategy: Strategy,
        data_sources: Mapping[str, BacktestingDataSource],
        simulator: ImmediateFillSimulator,
    ) -> None:
        """Initializes the EventBus.

        Args:
            strategy: The trading strategy to be executed.
            data_sources: A dictionary of data sources.
            simulator: The execution simulator.
        """
        self.strategy = strategy
        self.data_sources = data_sources
        self.simulator = simulator

        # Give the strategy a back-reference to the bus that drives it.
        setattr(self.strategy, "event_bus", self)

        self.orders: list[Order] = []
        self.fills: list[Fill] = []
        self.nav: list[float] = []
        self.timestamps: list[datetime] = []
        # Pre-computed event timeline
        self._timeline: list[datetime] = []
        self._price_df: pd.DataFrame | None = None
        # Price-matrix state, (re)built by ``_precompute_price_data``. Declared
        # here so the attributes always exist, even when no price data is
        # available.
        self._price_array = None  # 2-D float matrix (rows x symbols) or None
        self._symbols: list[str] = []
        self._symbol_idx: dict[str, int] = {}

    def _precompute_timeline(self) -> None:
        """Computes a *synchronised* global timeline.

        The timeline contains **only** those timestamps that are present in
        *all* data sources (i.e. the set intersection of their raw-data
        indexes), sorted in increasing order. With no data sources the
        timeline is empty.
        """

        timeline: pd.Index | None = None
        for ds in self.data_sources.values():
            idx = ds.get_raw_data().index
            timeline = idx if timeline is None else timeline.intersection(idx)

        # ``timeline`` is still ``None`` only when there are no data sources.
        # Sources without any common timestamp yield an empty index instead,
        # which the ``else`` branch turns into an empty list.
        if timeline is None:
            self._timeline = []
        else:
            # Sort explicitly so the event loop always sees monotonically
            # increasing timestamps, whatever order the indexes came in.
            self._timeline = timeline.sort_values().to_list()

    def _precompute_price_data(self) -> None:
        """Builds a price matrix strictly aligned to the global timeline.

        Collects the ``close`` column of every data source that has one and a
        truthy ``symbol``, forward-fills gaps, and restricts the rows to
        ``self._timeline``. When no price data is available the matrix is left
        as ``None`` instead of raising on the timeline lookup.
        """

        # Gather close price series for each symbol
        price_series: dict[str, pd.Series] = {}
        for ds in self.data_sources.values():
            raw_data = ds.get_raw_data()
            if "close" in raw_data.columns and ds.symbol:
                price_series[ds.symbol] = raw_data["close"]

        # Assemble into a single DataFrame and forward-fill within each column
        price_df = pd.DataFrame(price_series).sort_index().ffill()

        # Restrict to *only* the timestamps present in ``self._timeline`` so the
        # row index aligns 1-to-1 with the event loop. An empty frame is
        # skipped: looking up timeline labels in it would raise ``KeyError``.
        if self._timeline and not price_df.empty:
            price_df = price_df.loc[self._timeline]

        self._price_df = price_df

        # Always refresh the derived state so nothing stale survives a rebuild.
        self._symbols = list(price_df.columns)
        self._symbol_idx = {sym: i for i, sym in enumerate(self._symbols)}
        self._price_array = (
            None if price_df.empty else price_df.to_numpy(dtype=float)
        )

    def run(self) -> None:
        """Runs the simulation over the pre-computed timeline.

        The global timeline and the aligned price matrix are built first. Then,
        for every timestamp in order, the method:

        1. sets ``strategy.timestamp`` and injects the current price row,
        2. advances each data source whose next timestamp equals the current
           one,
        3. runs the strategy logic,
        4. pops the strategy's pending orders and sends those with a known
           symbol to the simulator at the current row's price,
        5. records the portfolio NAV and the timestamp.

        If no price data is available the method returns without processing
        any event.
        """
        self._precompute_timeline()
        self._precompute_price_data()

        # An empty frame means no price matrix was built, so there is nothing
        # to index into below.
        if self._price_df is None or self._price_df.empty:
            return  # No data to process

        for row_idx, ts in enumerate(self._timeline):
            self.strategy.timestamp = ts

            price_row = self._price_array[row_idx]

            # Inject current market snapshot into strategy (very low overhead)
            self.strategy._update_market_data(
                price_row, self._symbols, self._symbol_idx
            )

            # Advance every data source whose next timestamp is the current one.
            # NOTE(review): a source whose next timestamp is *not* on the
            # timeline is never advanced here, so it looks like it would stay
            # at that position for the rest of the run - confirm against
            # BacktestingDataSource.
            for ds in self.data_sources.values():
                if ds.peek_timestamp() == ts:
                    ds._increment_index()

            # Run strategy logic
            self.strategy.run()

            # Execute orders
            new_orders = self.strategy._pop_pending_orders()
            self.orders.extend(new_orders)
            for order in new_orders:
                # Guard: an order for a symbol without a price column is
                # recorded in ``self.orders`` above but never executed.
                idx = self._symbol_idx.get(order.symbol)
                if idx is None:
                    continue
                execution_price = float(price_row[idx])
                fill = self.simulator.execute(order, execution_price, ts)
                self.fills.append(fill)

            # Record NAV straight from the price row and symbol -> column map.
            nav = self.strategy.portfolio.net_asset_value_array(
                price_row, self._symbol_idx
            )
            self.nav.append(nav)
            self.timestamps.append(ts)

            # Advance strategy book-keeping pointer
            self.strategy._increment_index()

__init__(strategy, data_sources, simulator)

Initializes the EventBus.

Parameters:

Name Type Description Default
strategy Strategy

The trading strategy to be executed.

required
data_sources Mapping[str, BacktestingDataSource]

A dictionary of data sources.

required
simulator ImmediateFillSimulator

The execution simulator.

required
Source code in src/quantex/engine.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def __init__(
    self,
    strategy: Strategy,
    data_sources: Mapping[str, BacktestingDataSource],
    simulator: ImmediateFillSimulator,
) -> None:
    """Initializes the EventBus.

    Args:
        strategy: The trading strategy to be executed.
        data_sources: A dictionary of data sources.
        simulator: The execution simulator.
    """
    self.strategy = strategy
    self.data_sources = data_sources
    self.simulator = simulator

    # Give the strategy a back-reference to the bus that drives it.
    setattr(self.strategy, "event_bus", self)

    # Run history, appended to while the simulation runs.
    self.orders: list[Order] = []
    self.fills: list[Fill] = []
    self.nav: list[float] = []
    self.timestamps: list[datetime] = []
    # Pre-computed event timeline
    self._timeline: list[datetime] = []
    self._price_df: pd.DataFrame | None = None
    # Price-matrix state read by the event loop. Declared here so the
    # attributes exist on every instance, even before (or without) any
    # price data being computed.
    self._price_array = None  # 2-D float matrix (rows x symbols) or None
    self._symbols: list[str] = []
    self._symbol_idx: dict[str, int] = {}

run()

Runs the simulation until all data is exhausted.

This method drives the event loop in timestamp order. It first pre-computes the global timeline (the timestamps common to every data source) and a price matrix aligned to that timeline. Then, for each timestamp, it injects the current price row into the strategy, advances every data source whose next timestamp equals the current one, runs the strategy logic, sends any pending orders to the simulator, and records the NAV.

Source code in src/quantex/engine.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def run(self) -> None:
    """Runs the simulation over the pre-computed timeline.

    The global timeline and the aligned price matrix are built first. Then,
    for every timestamp in order, the method:

    1. sets ``strategy.timestamp`` and injects the current price row,
    2. advances each data source whose next timestamp equals the current
       one,
    3. runs the strategy logic,
    4. pops the strategy's pending orders and sends those with a known
       symbol to the simulator at the current row's price,
    5. records the portfolio NAV and the timestamp.

    If no price data is available the method returns without processing
    any event.
    """
    self._precompute_timeline()
    self._precompute_price_data()

    # An empty frame means no price matrix was built, so there is nothing
    # to index into below.
    if self._price_df is None or self._price_df.empty:
        return  # No data to process

    for row_idx, ts in enumerate(self._timeline):
        self.strategy.timestamp = ts

        price_row = self._price_array[row_idx]

        # Inject current market snapshot into strategy (very low overhead)
        self.strategy._update_market_data(
            price_row, self._symbols, self._symbol_idx
        )

        # Advance every data source whose next timestamp is the current one.
        # NOTE(review): a source whose next timestamp is *not* on the
        # timeline is never advanced here, so it looks like it would stay
        # at that position for the rest of the run - confirm against
        # BacktestingDataSource.
        for ds in self.data_sources.values():
            if ds.peek_timestamp() == ts:
                ds._increment_index()

        # Run strategy logic
        self.strategy.run()

        # Execute orders
        new_orders = self.strategy._pop_pending_orders()
        self.orders.extend(new_orders)
        for order in new_orders:
            # Guard: an order for a symbol without a price column is
            # recorded in ``self.orders`` above but never executed.
            idx = self._symbol_idx.get(order.symbol)
            if idx is None:
                continue
            execution_price = float(price_row[idx])
            fill = self.simulator.execute(order, execution_price, ts)
            self.fills.append(fill)

        # Record NAV straight from the price row and symbol -> column map.
        nav = self.strategy.portfolio.net_asset_value_array(
            price_row, self._symbol_idx
        )
        self.nav.append(nav)
        self.timestamps.append(ts)

        # Advance strategy book-keeping pointer
        self.strategy._increment_index()