Core event loop orchestrating data flow between sources, strategy and simulator.
The EventBus
pre-computes a global timeline, vectorises price data into
NumPy arrays for speed, injects market snapshots into the running
quantex.strategy.Strategy
and records NAV, orders and fills.
Google-style docstrings are used throughout for consistency.
EventBus
Lightweight dispatcher that coordinates data, strategy, and execution.
This class is the central coordinator of the backtesting engine. It fetches
data from data sources, passes it to the strategy for processing, and
sends any generated orders to the execution simulator.
Source code in src/quantex/engine.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 | class EventBus:
"""Lightweight dispatcher that coordinates data, strategy, and execution.
This class is the central coordinator of the backtesting engine. It fetches
data from data sources, passes it to the strategy for processing, and
sends any generated orders to the execution simulator.
"""
def __init__(
self,
strategy: Strategy,
data_sources: Mapping[str, BacktestingDataSource],
simulator: ImmediateFillSimulator,
) -> None:
"""Initializes the EventBus.
Args:
strategy: The trading strategy to be executed.
data_sources: A dictionary of data sources.
simulator: The execution simulator.
"""
self.strategy = strategy
self.data_sources = data_sources
self.simulator = simulator
setattr(self.strategy, "event_bus", self)
self.orders: list[Order] = []
self.fills: list[Fill] = []
self.nav: list[float] = []
self.timestamps: list[datetime] = []
# Pre-computed event timeline
self._timeline: list[datetime] = []
self._price_df: pd.DataFrame | None = None
def _precompute_timeline(self) -> None:
"""Computes a *synchronised* global timeline.
The timeline now contains **only** those timestamps that are present in
*all* data sources (i.e. the set intersection). This guarantees that
every bar processed by the engine has a corresponding observation for
every symbol, removing the need for forward-filling or other alignment
work-arounds downstream.
"""
timeline: pd.Index | None = None
for ds in self.data_sources.values():
idx = ds.get_raw_data().index
timeline = idx if timeline is None else timeline.intersection(idx)
# No common timestamps → empty timeline
if timeline is None:
self._timeline = []
else:
# ``intersection`` preserves order of the left operand, but we
# explicitly sort to ensure monotonically increasing timestamps.
self._timeline = timeline.sort_values().to_list()
def _precompute_price_data(self) -> None:
"""Builds a price matrix strictly aligned to the global timeline."""
# Gather close price series for each symbol
price_series: dict[str, pd.Series] = {}
for _, ds in self.data_sources.items():
raw_data = ds.get_raw_data()
if "close" in raw_data.columns and ds.symbol:
price_series[ds.symbol] = raw_data["close"]
# Assemble into a single DataFrame and forward-fill within each column
price_df = pd.DataFrame(price_series).sort_index().ffill()
# Restrict to *only* the timestamps present in ``self._timeline`` so the
# row index aligns 1-to-1 with the event loop.
if self._timeline:
price_df = price_df.loc[self._timeline]
self._price_df = price_df
if not self._price_df.empty:
self._price_array = self._price_df.to_numpy(dtype=float)
self._symbols = list(self._price_df.columns)
self._symbol_idx = {sym: i for i, sym in enumerate(self._symbols)}
def run(self) -> None:
"""Runs the simulation until all data is exhausted.
This method orchestrates the event loop, which proceeds in timestamp
order. At each step, it determines the earliest timestamp among all
data sources, processes the data for that moment, executes strategy
logic, and advances the data sources that produced the event. This
ensures that data from multiple sources is handled chronologically.
"""
self._precompute_timeline()
self._precompute_price_data()
if self._price_df is None:
return # No data to process
for row_idx, ts in enumerate(self._timeline):
self.strategy.timestamp = ts
price_row = self._price_array[row_idx]
# Inject current market snapshot into strategy (very low overhead)
self.strategy._update_market_data(
price_row, self._symbols, self._symbol_idx
)
# Align each data source to the global timeline & advance pointer
for ds in self.data_sources.values():
if ds.peek_timestamp() == ts:
ds._increment_index()
# Run strategy logic
self.strategy.run()
# Execute orders
new_orders = self.strategy._pop_pending_orders()
self.orders.extend(new_orders)
for order in new_orders:
# Guard: if symbol not in price mapping, skip execution
idx = self._symbol_idx.get(order.symbol)
if idx is None:
continue
execution_price = float(price_row[idx])
fill = self.simulator.execute(order, execution_price, ts)
self.fills.append(fill)
# Record NAV (vectorised prices dict → float conversion once)
nav = self.strategy.portfolio.net_asset_value_array(
price_row, self._symbol_idx
)
self.nav.append(nav)
self.timestamps.append(ts)
# Advance strategy book-keeping pointer
self.strategy._increment_index()
|
__init__(strategy, data_sources, simulator)
Initializes the EventBus.
Parameters:
Source code in src/quantex/engine.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 | def __init__(
self,
strategy: Strategy,
data_sources: Mapping[str, BacktestingDataSource],
simulator: ImmediateFillSimulator,
) -> None:
"""Initializes the EventBus.
Args:
strategy: The trading strategy to be executed.
data_sources: A dictionary of data sources.
simulator: The execution simulator.
"""
self.strategy = strategy
self.data_sources = data_sources
self.simulator = simulator
setattr(self.strategy, "event_bus", self)
self.orders: list[Order] = []
self.fills: list[Fill] = []
self.nav: list[float] = []
self.timestamps: list[datetime] = []
# Pre-computed event timeline
self._timeline: list[datetime] = []
self._price_df: pd.DataFrame | None = None
|
run()
Runs the simulation until all data is exhausted.
This method orchestrates the event loop, which proceeds in timestamp
order. At each step, it determines the earliest timestamp among all
data sources, processes the data for that moment, executes strategy
logic, and advances the data sources that produced the event. This
ensures that data from multiple sources is handled chronologically.
Source code in src/quantex/engine.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158 | def run(self) -> None:
"""Runs the simulation until all data is exhausted.
This method orchestrates the event loop, which proceeds in timestamp
order. At each step, it determines the earliest timestamp among all
data sources, processes the data for that moment, executes strategy
logic, and advances the data sources that produced the event. This
ensures that data from multiple sources is handled chronologically.
"""
self._precompute_timeline()
self._precompute_price_data()
if self._price_df is None:
return # No data to process
for row_idx, ts in enumerate(self._timeline):
self.strategy.timestamp = ts
price_row = self._price_array[row_idx]
# Inject current market snapshot into strategy (very low overhead)
self.strategy._update_market_data(
price_row, self._symbols, self._symbol_idx
)
# Align each data source to the global timeline & advance pointer
for ds in self.data_sources.values():
if ds.peek_timestamp() == ts:
ds._increment_index()
# Run strategy logic
self.strategy.run()
# Execute orders
new_orders = self.strategy._pop_pending_orders()
self.orders.extend(new_orders)
for order in new_orders:
# Guard: if symbol not in price mapping, skip execution
idx = self._symbol_idx.get(order.symbol)
if idx is None:
continue
execution_price = float(price_row[idx])
fill = self.simulator.execute(order, execution_price, ts)
self.fills.append(fill)
# Record NAV (vectorised prices dict → float conversion once)
nav = self.strategy.portfolio.net_asset_value_array(
price_row, self._symbol_idx
)
self.nav.append(nav)
self.timestamps.append(ts)
# Advance strategy book-keeping pointer
self.strategy._increment_index()
|