Data API¶
Synthetic Generators¶
synthetic
¶
Synthetic data generators: GBM, Heston, Merton jump-diffusion.
gbm_paths(s0, mu, sigma, T, n_steps, n_paths, seed=None)
¶
Generate Geometric Brownian Motion sample paths.
Parameters¶
s0 : float
Initial price.
mu : float
Drift (annualized).
sigma : float
Volatility (annualized).
T : float
Time horizon in years.
n_steps : int
Number of time steps.
n_paths : int
Number of simulated paths.
seed : int | None
Random seed.
Returns¶
NDArray of shape (n_paths, n_steps + 1)
Simulated price paths including the initial price.
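The signature and return shape above can be illustrated with a dependency-free sketch. It assumes the exact log-normal update for GBM and returns a list of lists rather than an NDArray; the name `gbm_paths_sketch` is hypothetical and this is not the library's implementation.

```python
import math
import random


def gbm_paths_sketch(s0, mu, sigma, T, n_steps, n_paths, seed=None):
    """Simulate GBM paths with the exact log-normal step (pure-Python sketch)."""
    rng = random.Random(seed)
    dt = T / n_steps
    drift = (mu - 0.5 * sigma**2) * dt  # Ito-corrected log drift per step
    vol = sigma * math.sqrt(dt)         # std of the log-return per step
    paths = []
    for _ in range(n_paths):
        path = [s0]
        for _ in range(n_steps):
            z = rng.gauss(0.0, 1.0)
            path.append(path[-1] * math.exp(drift + vol * z))
        paths.append(path)
    return paths  # n_paths rows, each holding n_steps + 1 prices


paths = gbm_paths_sketch(100.0, 0.05, 0.2, T=1.0, n_steps=252, n_paths=4, seed=42)
print(len(paths), len(paths[0]))
```

Each row starts at `s0`, so a path has `n_steps + 1` entries, matching the documented shape.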
heston_paths(s0, v0, kappa, theta, xi, rho, mu, T, n_steps, n_paths, seed=None)
¶
Generate Heston stochastic-volatility model paths.
Uses Euler-Maruyama discretization with full truncation for the variance process to avoid negative variances.
Parameters¶
s0, v0 : float
Initial price and variance.
kappa : float
Mean-reversion speed.
theta : float
Long-run variance.
xi : float
Volatility of variance (vol-of-vol).
rho : float
Correlation between price and variance Brownian motions.
mu : float
Drift.
T : float
Time horizon.
n_steps, n_paths : int
Discretization and sample size.
seed : int | None
Random seed.
Returns¶
(prices, variances) : tuple of NDArray, each shape (n_paths, n_steps + 1)
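As a rough illustration of Euler-Maruyama with full truncation, the sketch below uses the positive part of the variance wherever it enters the drift or diffusion terms. Two choices are assumptions of this sketch, not confirmed library behavior: the price is stepped in log space, and the stored variance is floored at zero. The name `heston_paths_sketch` is hypothetical.

```python
import math
import random


def heston_paths_sketch(s0, v0, kappa, theta, xi, rho, mu, T,
                        n_steps, n_paths, seed=None):
    """Euler-Maruyama Heston paths with full truncation (pure-Python sketch)."""
    rng = random.Random(seed)
    dt = T / n_steps
    sqrt_dt = math.sqrt(dt)
    rho_perp = math.sqrt(1.0 - rho * rho)
    prices, variances = [], []
    for _ in range(n_paths):
        s, v = s0, v0
        s_path, v_path = [s], [max(v, 0.0)]
        for _ in range(n_steps):
            z_s = rng.gauss(0.0, 1.0)
            # Build a second normal with correlation rho to the first.
            z_v = rho * z_s + rho_perp * rng.gauss(0.0, 1.0)
            v_pos = max(v, 0.0)  # full truncation: only v+ drives the dynamics
            s *= math.exp((mu - 0.5 * v_pos) * dt + math.sqrt(v_pos) * sqrt_dt * z_s)
            v += kappa * (theta - v_pos) * dt + xi * math.sqrt(v_pos) * sqrt_dt * z_v
            s_path.append(s)
            v_path.append(max(v, 0.0))  # assumption: report the floored variance
        prices.append(s_path)
        variances.append(v_path)
    return prices, variances


prices, variances = heston_paths_sketch(
    s0=100.0, v0=0.04, kappa=1.0, theta=0.04, xi=1.0, rho=-0.7,
    mu=0.05, T=1.0, n_steps=252, n_paths=3, seed=7,
)
```

The example parameters deliberately violate the Feller condition (2 * kappa * theta < xi^2), which is the regime where truncation matters.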
merton_jump_paths(s0, mu, sigma, lam, jump_mean, jump_std, T, n_steps, n_paths, seed=None)
¶
Generate Merton jump-diffusion model paths.
Parameters¶
s0 : float
Initial price.
mu, sigma : float
Drift and volatility of the diffusion component.
lam : float
Jump intensity (expected jumps per year).
jump_mean, jump_std : float
Mean and std of log-jump size (normal distribution).
T : float
Time horizon.
n_steps, n_paths : int
Discretization and sample size.
seed : int | None
Random seed.
Returns¶
NDArray of shape (n_paths, n_steps + 1)
Macro Data (FRED)¶
macro
¶
Macro data provider using FRED API.
Fetches risk-free rates, yield curves, and macro indicators from the Federal Reserve Economic Data (FRED) API via the fredapi package.
Requires: pip install fredapi
Requires a FRED API key, supplied through the FRED_API_KEY environment variable or passed directly.
References¶
FRED: https://fred.stlouisfed.org/
FREDProvider
¶
Fetch macroeconomic data from FRED.
Parameters¶
api_key : str | None
FRED API key. If None, reads from the FRED_API_KEY environment variable.
get_series(series_id, start=None, end=None)
¶
get_risk_free_rate(maturity='3m', start=None, end=None)
¶
Fetch Treasury yield as a risk-free rate proxy.
Returns annualized rate as a decimal (e.g., 0.05 for 5%).
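FRED publishes Treasury yields in percent, so a decimal return value implies a division by 100. The sketch below shows that conversion on plain dictionaries. The maturity labels and the helper name are hypothetical; the series IDs are FRED constant-maturity Treasury series, but which ones `get_risk_free_rate` actually uses is not stated here.

```python
# Hypothetical maturity label -> FRED series ID mapping (labels are illustrative).
TREASURY_SERIES = {"3m": "DGS3MO", "1y": "DGS1", "2y": "DGS2", "10y": "DGS10"}


def percent_to_decimal(observations):
    """Convert {date: yield_in_percent} to {date: decimal_rate}, keeping gaps as None."""
    return {
        day: (None if value is None else value / 100.0)
        for day, value in observations.items()
    }


raw = {"2024-01-02": 5.0, "2024-01-03": None, "2024-01-04": 5.25}
rates = percent_to_decimal(raw)
```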
get_yield_curve(date)
¶
Fetch the Treasury yield curve for a specific date.
Returns a Series indexed by maturity label with annualized yields.
get_vix(start=None, end=None)
¶
Fetch VIX (implied volatility index).
Bloomberg¶
bloomberg
¶
Bloomberg data provider using the blpapi SDK.
Provides access to Bloomberg Terminal data including:
- Historical end-of-day data for equities, fixed income, and FX
- Real-time streaming for live portfolio monitoring
- Corporate actions: dividends, splits, mergers
Requires: Bloomberg Terminal license + pip install blpapi
References¶
Bloomberg Open API: https://www.bloomberg.com/professional/support/api-library/
BloombergDataSource
¶
Fetch market data from Bloomberg Terminal via blpapi.
Parameters¶
config : BloombergConfig | None
Session parameters. Defaults to localhost:8194.
field_map : dict[str, str] | None
Bloomberg -> qufin field mapping override.
cache : bool
Whether to cache downloaded data to parquet via the qufin cache layer.
Examples¶
from qufin.data.bloomberg import BloombergDataSource
bbg = BloombergDataSource()
prices = bbg.get_historical(
    ["AAPL US Equity", "MSFT US Equity"],
    start="2024-01-01", end="2024-06-30",
)
close()
¶
Stop the session and clean up subscriptions.
get_historical(tickers, start, end, fields=None, frequency='DAILY', currency=None)
¶
Fetch historical end-of-day data.
Parameters¶
tickers : list[str]
Bloomberg tickers (e.g. ["AAPL US Equity"]).
start, end : str
Date strings YYYY-MM-DD.
fields : list[str] | None
Bloomberg fields to request. Defaults to ["PX_LAST"].
frequency : str
DAILY, WEEKLY, or MONTHLY.
currency : str | None
Override currency for cross-asset comparison (e.g. "USD").
Returns¶
pd.DataFrame
Multi-indexed by (date, ticker), or pivoted with tickers as columns when a single field is requested.
Raises¶
ValueError
If the tickers list is empty.
ConnectionError
If the Bloomberg session cannot be established.
get_prices(tickers, start, end, frequency='DAILY')
¶
Fetch closing prices — convenience wrapper around get_historical.
Returns¶
pd.DataFrame
Columns are tickers, index is datetime.
get_returns(tickers, start, end, frequency='DAILY')
¶
get_reference(tickers, fields)
¶
get_dividends(tickers, start, end)
¶
get_splits(tickers, start, end)
¶
subscribe(tickers, fields=None, callback=None, interval=0.0)
¶
Subscribe to real-time market data.
Parameters¶
tickers : list[str]
Bloomberg tickers to subscribe to.
fields : list[str] | None
Fields to stream. Defaults to ["LAST_PRICE", "BID", "ASK"].
callback : callable | None
Called with a StreamTick for each incoming update. If None,
ticks are logged at DEBUG level.
interval : float
Minimum interval in seconds between updates (0 = every tick).
Raises¶
ValueError
If the tickers list is empty.
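The effect of a minimum update interval can be sketched with a small wrapper around a callback. This is an illustration only: it assumes throttling is applied per ticker and drops updates that arrive too early, neither of which is confirmed above. `ThrottledCallback` and its injected `clock` are hypothetical names.

```python
import time


class ThrottledCallback:
    """Forward a tick only if `interval` seconds have passed for that ticker."""

    def __init__(self, callback, interval=0.0, clock=time.monotonic):
        self._callback = callback
        self._interval = interval
        self._clock = clock
        self._last = {}  # ticker -> time of the last forwarded update

    def __call__(self, ticker, tick):
        now = self._clock()
        last = self._last.get(ticker)
        if self._interval > 0 and last is not None and now - last < self._interval:
            return False  # too soon: drop this update
        self._last[ticker] = now
        self._callback(tick)
        return True


# Deterministic demo with a fake clock instead of real time.
fake_now = [0.0]
received = []
throttled = ThrottledCallback(received.append, interval=1.0, clock=lambda: fake_now[0])
throttled("AAPL US Equity", {"LAST_PRICE": 190.0})
fake_now[0] = 0.5
throttled("AAPL US Equity", {"LAST_PRICE": 191.0})  # dropped, only 0.5 s elapsed
fake_now[0] = 1.5
throttled("AAPL US Equity", {"LAST_PRICE": 192.0})
```

With `interval=0.0` every tick is forwarded, which matches the documented meaning of 0.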
unsubscribe_all()
¶
Cancel all active real-time subscriptions.
BloombergConfig
dataclass
¶
Configuration for Bloomberg session.
Parameters¶
host : str
Bloomberg server host.
port : int
Bloomberg server port.
timeout_ms : int
Session start timeout in milliseconds.
max_pending : int
Maximum pending requests.
BloombergSession
¶
Manages a blpapi session lifecycle.
Parameters¶
config : BloombergConfig | None
Session configuration. Uses defaults if None.
normalize_fields(raw, field_map=None)
¶
Refinitiv / LSEG¶
refinitiv
¶
Refinitiv/LSEG data provider using the Eikon Data API.
Fetches equities, fixed income, and derivatives data from the Refinitiv Eikon / LSEG Workspace platform. An alternative to Bloomberg for firms in the LSEG ecosystem.
Requires: pip install eikon (or refinitiv-data)
Requires an active Eikon / Workspace session or an API proxy.
References¶
Eikon Data API: https://developers.lseg.com/en/api-catalog/eikon/eikon-data-api
RefinitivDataSource
¶
Fetch market data from Refinitiv Eikon / LSEG Workspace.
Parameters¶
config : RefinitivConfig | None
Connection configuration. When None, a default config is used
and the app_key must be supplied separately via set_app_key.
set_app_key(app_key)
¶
Set (or reset) the Eikon application key at runtime.
get_equity_prices(rics, start, end, interval='daily')
¶
get_equity_ohlcv(rics, start, end, interval='daily')
¶
get_equity_returns(rics, start, end, frequency='D')
¶
get_bond_data(rics, fields=None)
¶
get_yield_curve(currency='USD', rics=None)
¶
get_option_chain(underlying_ric, fields=None)
¶
get_futures_data(rics, fields=None)
¶
get_data(rics, fields)
¶
get_timeseries(rics, fields, start, end, interval='daily')
¶
RefinitivConfig
dataclass
¶
Configuration for the Refinitiv Eikon connection.
Parameters¶
app_key : str
Eikon Data API application key (aka app-key / api-key).
timeout : int
Request timeout in seconds. Default 30.
cache : bool
Whether to cache downloaded data to parquet via qufin caching.
Real-Time Streaming¶
streaming
¶
Real-time WebSocket streaming for live market data.
Supports Alpaca, Polygon, and IEX providers. Provides event-driven rebalancing triggers (threshold drift, signal-based regime change) and latency monitoring with heartbeat.
Requires websockets (optional dependency).
PriceStream
¶
Async WebSocket price stream with buffering, rebalancing, and monitoring.
Parameters¶
config : StreamConfig
Connection and subscription settings.
rebalance_config : RebalanceConfig | None
If provided, enables automatic rebalance triggers.
target_weights : dict[str, float] | None
Target portfolio weights for drift detection.
holdings : dict[str, float] | None
Current share holdings for portfolio tracking.
on_tick : Callable | None
Callback invoked on each parsed tick: on_tick(tick_dict).
on_rebalance : Callable | None
Callback invoked when a rebalance fires: on_rebalance(current_weights).
StreamConfig
dataclass
¶
Configuration for a price stream.
Parameters¶
provider : Provider
Which WebSocket data provider to use.
api_key : str
API key / token for the provider.
tickers : list[str]
Symbols to subscribe to.
buffer_size : int
Maximum number of price ticks to buffer per ticker.
heartbeat_interval : float
Seconds between heartbeat pings.
url_override : str | None
Override the default WebSocket URL (useful for testing).
PriceBuffer
¶
Thread-safe (asyncio-safe) ring buffer for price ticks per ticker.
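A per-ticker ring buffer can be sketched with `collections.deque(maxlen=...)`, which discards the oldest entry once the buffer is full. The class below is a hypothetical illustration, not the `PriceBuffer` API, and it omits any locking.

```python
from collections import defaultdict, deque


class RingPriceBuffer:
    """Keep at most `buffer_size` recent prices per ticker (no locking)."""

    def __init__(self, buffer_size):
        self._ticks = defaultdict(lambda: deque(maxlen=buffer_size))

    def append(self, ticker, price):
        self._ticks[ticker].append(price)  # oldest price falls off when full

    def latest(self, ticker):
        ticks = self._ticks.get(ticker)
        return ticks[-1] if ticks else None

    def history(self, ticker):
        return list(self._ticks.get(ticker, ()))


buf = RingPriceBuffer(buffer_size=3)
for price in (10.0, 10.5, 11.0, 11.5):
    buf.append("AAPL", price)
```

After four appends with `buffer_size=3`, only the three most recent prices remain.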
LatencyMonitor
dataclass
¶
Tracks message latency and heartbeat health.
is_healthy(heartbeat_interval)
¶
True if we received a heartbeat within 2x the expected interval.
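The 2x rule can be expressed in a few lines. This sketch assumes timestamps come from a monotonic clock and treats a gap of exactly twice the interval as still healthy; `HeartbeatMonitor` and the optional `now` argument are hypothetical.

```python
import time
from dataclasses import dataclass, field


@dataclass
class HeartbeatMonitor:
    """Track the last heartbeat time and judge health against 2x the interval."""

    last_heartbeat: float = field(default_factory=time.monotonic)

    def record_heartbeat(self, now=None):
        self.last_heartbeat = time.monotonic() if now is None else now

    def is_healthy(self, heartbeat_interval, now=None):
        now = time.monotonic() if now is None else now
        return (now - self.last_heartbeat) <= 2.0 * heartbeat_interval


monitor = HeartbeatMonitor(last_heartbeat=100.0)
recent = monitor.is_healthy(10.0, now=115.0)  # 15 s since last beat, limit is 20 s
stale = monitor.is_healthy(10.0, now=125.0)   # 25 s since last beat
```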
PortfolioTracker
¶
Track live portfolio value from streaming prices.
Parameters¶
holdings : dict[str, float]
Number of shares held per ticker.
RebalanceTrigger
¶
Evaluates whether a rebalance should fire.
Parameters¶
config : RebalanceConfig
Threshold and signal configuration.
target_weights : dict[str, float]
Target portfolio weights keyed by ticker.
RebalanceConfig
dataclass
¶
Configuration for rebalance triggers.
Parameters¶
drift_threshold : float
Trigger rebalance when any asset drifts more than this fraction from its target weight (e.g. 0.05 = 5%).
signal_callback : Callable | None
Optional callback that returns True when a regime-change signal fires (e.g. from a VQC classifier).
cooldown : float
Minimum seconds between consecutive rebalances.
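How the three settings might interact is sketched below. It assumes drift is measured as the absolute difference between current and target weight, and that the cooldown suppresses both drift and signal triggers; the text above confirms neither. `DriftTriggerSketch` and `current_weights` are hypothetical helpers.

```python
def current_weights(holdings, prices):
    """Market-value weights from share counts and latest prices."""
    values = {t: shares * prices[t] for t, shares in holdings.items()}
    total = sum(values.values())
    return {t: value / total for t, value in values.items()}


class DriftTriggerSketch:
    def __init__(self, target_weights, drift_threshold=0.05,
                 signal_callback=None, cooldown=0.0):
        self.target_weights = target_weights
        self.drift_threshold = drift_threshold
        self.signal_callback = signal_callback
        self.cooldown = cooldown
        self._last_fired = None

    def should_rebalance(self, weights, now):
        if self._last_fired is not None and now - self._last_fired < self.cooldown:
            return False  # still cooling down
        drifted = any(
            abs(weights.get(t, 0.0) - target) > self.drift_threshold
            for t, target in self.target_weights.items()
        )
        signalled = self.signal_callback is not None and bool(self.signal_callback())
        if drifted or signalled:
            self._last_fired = now
            return True
        return False


trigger = DriftTriggerSketch({"AAA": 0.5, "BBB": 0.5}, drift_threshold=0.05, cooldown=60.0)
holdings = {"AAA": 10.0, "BBB": 10.0}
balanced = current_weights(holdings, {"AAA": 100.0, "BBB": 100.0})
drifted = current_weights(holdings, {"AAA": 130.0, "BBB": 100.0})
events = [
    trigger.should_rebalance(balanced, now=0.0),  # weights on target
    trigger.should_rebalance(drifted, now=1.0),   # AAA weight is about 0.565
    trigger.should_rebalance(drifted, now=30.0),  # inside the cooldown window
    trigger.should_rebalance(drifted, now=62.0),  # cooldown has elapsed
]
```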
Parquet Data Warehouse¶
warehouse
¶
Parquet data warehouse with partitioned storage.
Stores market data in an asset_class/ticker/year.parquet hierarchy.
Supports predicate pushdown via PyArrow, automatic compaction, and
deduplication on (ticker, date) composite key.
Integrates with qufin.data.cache to check the warehouse before
making API calls.
Requires pyarrow (optional dependency).
ParquetWarehouse
¶
Partitioned Parquet data warehouse.
Data is stored as:
root_dir/
    asset_class/
        ticker/
            2023.parquet
            2024.parquet
Parameters¶
config : WarehouseConfig | None
Warehouse configuration. Defaults to WarehouseConfig().
write(df, asset_class, ticker)
¶
Write a DataFrame partitioned by year.
The DataFrame must have a column matching config.date_column.
Data is deduplicated on (ticker, date) before writing.
Parameters¶
df : pd.DataFrame
Market data to store.
asset_class : str
Asset class partition key (e.g. "equity", "option").
ticker : str
Ticker symbol.
Returns¶
list[Path]
Paths of written parquet files.
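The partitioning and deduplication described for write can be sketched without pandas or pyarrow. The function below only plans which rows would go to which file; it assumes rows are dictionaries with a `date` key and that the last duplicate wins, both of which are illustrative choices. `plan_partitions` is a hypothetical name.

```python
from datetime import date
from pathlib import Path


def plan_partitions(rows, root_dir, asset_class, ticker, date_key="date"):
    """Group rows by year, dedupe on (ticker, date), and map each year to its file."""
    by_year = {}
    for row in rows:
        day = row[date_key]
        # A later row with the same (ticker, date) key overwrites the earlier one.
        by_year.setdefault(day.year, {})[(ticker, day)] = row
    base = Path(root_dir) / asset_class / ticker
    return {
        base / f"{year}.parquet": sorted(bucket.values(), key=lambda r: r[date_key])
        for year, bucket in sorted(by_year.items())
    }


rows = [
    {"date": date(2023, 12, 29), "close": 192.5},
    {"date": date(2024, 1, 2), "close": 185.6},
    {"date": date(2024, 1, 2), "close": 185.7},  # duplicate date, replaces the row above
]
plan = plan_partitions(rows, "warehouse", "equity", "AAPL")
```

The keys of `plan` follow the root_dir/asset_class/ticker/year.parquet layout, one file per calendar year.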
read(asset_class, ticker, start_date=None, end_date=None, columns=None)
¶
has_data(asset_class, ticker, year=None)
¶
Check whether data exists for a given partition.
list_tickers(asset_class)
¶
List all tickers stored under an asset class.
list_years(asset_class, ticker)
¶
List available years for a ticker.
compact(asset_class, ticker)
¶
Merge small fragment files within a partition.
If a year's directory contains multiple parquet fragment files (from incremental writes stored outside the year-naming scheme), this method consolidates them.
Returns the number of files merged.
auto_compact(asset_class=None)
¶
WarehouseConfig
dataclass
¶
Configuration for the parquet data warehouse.
Parameters¶
root_dir : Path
Root directory for partitioned storage.
compaction_threshold : int
Merge small files when a partition has more than this many files.
dedup_keys : list[str]
Columns used for deduplication.
date_column : str
Name of the date/datetime column in DataFrames.
warehouse_or_fetch(warehouse, asset_class, ticker, start_date, end_date, fetch_fn)
¶
Check warehouse first; if data missing, call fetch_fn and store.
Parameters¶
warehouse : ParquetWarehouse
Warehouse instance.
asset_class, ticker, start_date, end_date : str
Query parameters.
fetch_fn : callable
fetch_fn(ticker, start_date, end_date) -> pd.DataFrame
Returns¶
pd.DataFrame
Market data.
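The check-then-fetch pattern can be sketched against a dictionary-backed stand-in for the warehouse. The sketch treats any cached rows in the requested range as a hit, which is an assumption; how the real function handles partial coverage is not stated above. `InMemoryWarehouse`, `warehouse_or_fetch_sketch`, and `fake_fetch` are hypothetical, and rows are plain dictionaries with ISO date strings.

```python
class InMemoryWarehouse:
    """Dict-backed stand-in for ParquetWarehouse (illustration only)."""

    def __init__(self):
        self._rows = {}

    def read(self, asset_class, ticker, start_date=None, end_date=None):
        rows = self._rows.get((asset_class, ticker), [])
        return [
            r for r in rows
            if (start_date is None or r["date"] >= start_date)
            and (end_date is None or r["date"] <= end_date)
        ]

    def write(self, rows, asset_class, ticker):
        merged = {r["date"]: r for r in self._rows.get((asset_class, ticker), [])}
        merged.update({r["date"]: r for r in rows})
        self._rows[(asset_class, ticker)] = [merged[d] for d in sorted(merged)]


def warehouse_or_fetch_sketch(warehouse, asset_class, ticker,
                              start_date, end_date, fetch_fn):
    cached = warehouse.read(asset_class, ticker, start_date, end_date)
    if cached:
        return cached  # warehouse hit: no API call
    fresh = fetch_fn(ticker, start_date, end_date)
    warehouse.write(fresh, asset_class, ticker)
    return fresh


calls = []


def fake_fetch(ticker, start_date, end_date):
    calls.append(ticker)  # record each simulated API call
    return [{"date": "2024-01-02", "close": 185.6},
            {"date": "2024-01-03", "close": 184.3}]


wh = InMemoryWarehouse()
first = warehouse_or_fetch_sketch(wh, "equity", "AAPL", "2024-01-01", "2024-01-31", fake_fetch)
second = warehouse_or_fetch_sketch(wh, "equity", "AAPL", "2024-01-01", "2024-01-31", fake_fetch)
```

The second call is served from the stand-in warehouse, so `fake_fetch` runs only once.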
Data Quality¶
quality
¶
Data quality framework: gap detection, outlier flagging, corporate action adjustment, data lineage tracking, and per-ticker quality scoring.
All computations use only numpy and pandas (no extra dependencies).
DataLineage
dataclass
¶
Provenance record for a price series.
Parameters¶
ticker : str
Ticker symbol.
source : str
Data provider name.
fetch_timestamp : datetime.datetime
When the raw data was obtained (UTC).
transformations : list[TransformationStep]
Ordered list of transformations applied.
add_step(name, description, parameters=None)
¶
Append a transformation step.
GapReport
dataclass
¶
Result of gap detection.
Attributes¶
missing_dates : list[datetime.date]
Trading days with no data.
total_expected : int
Number of expected trading days.
total_present : int
Number of trading days actually present.
gap_fraction : float
Fraction of expected days that are missing.
OutlierReport
dataclass
¶
Result of outlier detection.
Attributes¶
outlier_dates : list[datetime.date]
Dates where daily return exceeded the sigma threshold.
outlier_returns : list[float]
Corresponding return values.
sigma_threshold : float
Threshold used (number of standard deviations).
mean : float
Mean of daily returns.
std : float
Standard deviation of daily returns.
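The fields above suggest a sigma rule on daily returns. The sketch below assumes simple returns, the sample standard deviation, and a test of the form |r - mean| > sigma_threshold * std; the exact rule the library applies is not stated here. `detect_outliers_sketch` is a hypothetical name and works on a plain list of prices.

```python
import statistics


def detect_outliers_sketch(prices, sigma_threshold=5.0):
    """Return (flagged, mean, std), where flagged holds (price_index, return) pairs."""
    returns = [(b / a) - 1.0 for a, b in zip(prices, prices[1:])]
    mean = statistics.fmean(returns)
    std = statistics.stdev(returns)
    flagged = [
        (i + 1, r)  # i + 1 is the index of the price that closed the return
        for i, r in enumerate(returns)
        if std > 0 and abs(r - mean) > sigma_threshold * std
    ]
    return flagged, mean, std


# 50 small alternating moves followed by one 50% jump.
prices = [100.0]
for i in range(50):
    prices.append(prices[-1] * (1.001 if i % 2 == 0 else 0.999))
prices.append(prices[-1] * 1.5)

flagged, mean, std = detect_outliers_sketch(prices, sigma_threshold=5.0)
```

Because the outlier itself inflates the standard deviation, a single extreme return can only exceed a high threshold when the series is long enough, which is why the demo uses about fifty observations.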
QualityScore
dataclass
¶
Per-ticker quality assessment.
Each sub-score is in [0, 1]; overall is their weighted average.
Attributes¶
ticker : str
Ticker symbol.
completeness : float
1 - gap_fraction.
freshness : float
Decays with age of most recent observation.
consistency : float
1 - (outlier fraction).
overall : float
Weighted combination.
detect_gaps(prices, trading_calendar=None)
¶
Detect missing trading days in a price series.
Parameters¶
prices : pd.Series or pd.DataFrame
Price data with a DatetimeIndex. If DataFrame, the index is
used (gaps are detected at the row level).
trading_calendar : pd.DatetimeIndex, optional
Expected trading days. If None, pandas.bdate_range is used.
Returns¶
GapReport
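Gap detection against a business-day calendar can be sketched with the standard library. Here weekdays (Monday to Friday) stand in for `pandas.bdate_range`, holidays are ignored, and the default calendar spans the first to the last observed date, which is an assumption. `GapReportSketch` and `detect_gaps_sketch` are hypothetical names.

```python
from dataclasses import dataclass
from datetime import date, timedelta


@dataclass
class GapReportSketch:
    missing_dates: list
    total_expected: int
    total_present: int
    gap_fraction: float


def business_days(start, end):
    """Weekdays between start and end, inclusive (holidays are not removed)."""
    days, day = [], start
    while day <= end:
        if day.weekday() < 5:
            days.append(day)
        day += timedelta(days=1)
    return days


def detect_gaps_sketch(observed_dates, trading_calendar=None):
    observed = set(observed_dates)
    if trading_calendar is None:
        trading_calendar = business_days(min(observed), max(observed))
    missing = [d for d in trading_calendar if d not in observed]
    expected = len(trading_calendar)
    fraction = len(missing) / expected if expected else 0.0
    return GapReportSketch(missing, expected, expected - len(missing), fraction)


# Monday to Friday with Wednesday missing.
observed = [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 4), date(2024, 1, 5)]
report = detect_gaps_sketch(observed)
```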
detect_outliers(prices, sigma_threshold=5.0)
¶
adjust_for_splits(prices, splits, lineage=None)
¶
Adjust historical prices for stock splits.
Prices before the split date are divided by the cumulative split ratio so that the series is continuous.
Parameters¶
prices : pd.Series
Raw price series (DatetimeIndex).
splits : sequence of SplitEvent
Split events to apply, in any order.
lineage : DataLineage, optional
If provided, a transformation step is recorded.
Returns¶
pd.Series
Adjusted price series.
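The cumulative-ratio rule can be sketched on a plain dictionary of prices. Each split is represented as a hypothetical `(split_date, ratio)` tuple, since the fields of SplitEvent are not shown above; a 2-for-1 split has ratio 2.0. `adjust_for_splits_sketch` is an illustrative name.

```python
from datetime import date


def adjust_for_splits_sketch(prices, splits):
    """Divide each price by the product of ratios of all splits dated after it."""
    adjusted = {}
    for day, price in prices.items():
        factor = 1.0
        for split_date, ratio in splits:
            if day < split_date:  # only prices before the split are rescaled
                factor *= ratio
        adjusted[day] = price / factor
    return adjusted


raw = {
    date(2024, 1, 2): 600.0,
    date(2024, 1, 3): 300.0,  # 2-for-1 split takes effect
    date(2024, 1, 5): 100.0,  # 3-for-1 split takes effect
}
splits = [(date(2024, 1, 5), 3.0), (date(2024, 1, 3), 2.0)]  # order does not matter
adjusted = adjust_for_splits_sketch(raw, splits)
```

The earliest price is divided by 2 * 3 = 6, so the adjusted series is flat at 100 across both splits.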
adjust_for_dividends(prices, dividends, lineage=None)
¶
Adjust historical prices for cash dividends.
Prices before the ex-dividend date are scaled down by the ratio
(close - dividend) / close on the ex-date, so that the series is
continuous.
Parameters¶
prices : pd.Series
Raw price series (DatetimeIndex).
dividends : sequence of DividendEvent
Dividend events to apply.
lineage : DataLineage, optional
If provided, a transformation step is recorded.
Returns¶
pd.Series
Adjusted price series.
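A sketch of the ratio adjustment on a plain dictionary of closes follows. It reads the description literally and takes `close` as the raw close on the ex-date; many data vendors use the prior session's close instead, so treat this choice as an assumption. Dividends are hypothetical `(ex_date, cash_amount)` tuples because the fields of DividendEvent are not shown above.

```python
from datetime import date


def adjust_for_dividends_sketch(prices, dividends):
    """Scale prices before each ex-date by (close - dividend) / close."""
    adjusted = dict(prices)
    for ex_date, amount in dividends:
        close = prices[ex_date]  # raw close on the ex-date (assumed reference price)
        factor = (close - amount) / close
        for day in adjusted:
            if day < ex_date:
                adjusted[day] *= factor
    return adjusted


raw = {date(2024, 1, 2): 100.0, date(2024, 1, 3): 98.0}
adjusted = adjust_for_dividends_sketch(raw, [(date(2024, 1, 3), 2.0)])
```

Prices on and after the ex-date are left unchanged; only earlier history is rescaled.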
compute_quality_score(prices, ticker, reference_date=None, freshness_halflife_days=30, trading_calendar=None, sigma_threshold=5.0, weights=(0.4, 0.3, 0.3))
¶
Compute a composite quality score for a single ticker.
Parameters¶
prices : pd.Series
Price series with DatetimeIndex.
ticker : str
Ticker symbol.
reference_date : datetime.date, optional
"Today" for freshness calculation. Defaults to date.today().
freshness_halflife_days : int
Half-life in days for exponential freshness decay.
trading_calendar : pd.DatetimeIndex, optional
Expected trading days for gap detection.
sigma_threshold : float
Sigma threshold for outlier detection.
weights : tuple of 3 floats
Weights for (completeness, freshness, consistency).
Returns¶
QualityScore
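The composite can be sketched from the three sub-scores defined under QualityScore. The sketch assumes freshness is 0.5 raised to (age / half-life) and that the weights are normalized by their sum; both are plausible readings of the parameters above rather than confirmed behavior. `quality_score_sketch` is a hypothetical name that takes the gap and outlier fractions as inputs.

```python
def quality_score_sketch(gap_fraction, outlier_fraction, age_days,
                         freshness_halflife_days=30, weights=(0.4, 0.3, 0.3)):
    """Combine completeness, freshness, and consistency into one score in [0, 1]."""
    completeness = 1.0 - gap_fraction
    freshness = 0.5 ** (age_days / freshness_halflife_days)  # halves every half-life
    consistency = 1.0 - outlier_fraction
    w_comp, w_fresh, w_cons = weights
    overall = (
        w_comp * completeness + w_fresh * freshness + w_cons * consistency
    ) / (w_comp + w_fresh + w_cons)
    return {
        "completeness": completeness,
        "freshness": freshness,
        "consistency": consistency,
        "overall": overall,
    }


# No gaps, no outliers, last observation exactly one half-life old.
score = quality_score_sketch(gap_fraction=0.0, outlier_fraction=0.0, age_days=30)
```

With the default weights this gives 0.4 * 1 + 0.3 * 0.5 + 0.3 * 1 = 0.85.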