Data API

Synthetic Generators

synthetic

Synthetic data generators: GBM, Heston, Merton jump-diffusion.

gbm_paths(s0, mu, sigma, T, n_steps, n_paths, seed=None)

Generate Geometric Brownian Motion sample paths.

Parameters

s0 : float
    Initial price.
mu : float
    Drift (annualized).
sigma : float
    Volatility (annualized).
T : float
    Time horizon in years.
n_steps : int
    Number of time steps.
n_paths : int
    Number of simulated paths.
seed : int | None
    Random seed.

Returns

NDArray of shape (n_paths, n_steps + 1)
    Simulated price paths including the initial price.
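The GBM SDE dS = mu S dt + sigma S dW has the closed-form solution S_t = s0 exp((mu - sigma^2/2) t + sigma W_t). This means paths can be sampled exactly on a time grid by cumulating independent normal log-increments. Below is a minimal NumPy sketch of that exact log-space scheme. `gbm_paths_sketch` is a hypothetical name, and qufin's `gbm_paths` may use a different discretization.

```python
from __future__ import annotations

import numpy as np
from numpy.typing import NDArray


def gbm_paths_sketch(
    s0: float,
    mu: float,
    sigma: float,
    T: float,
    n_steps: int,
    n_paths: int,
    seed: int | None = None,
) -> NDArray[np.float64]:
    """Exact log-space GBM simulation (illustrative, not qufin's implementation)."""
    rng = np.random.default_rng(seed)
    dt = T / n_steps
    # Each log-increment is N((mu - sigma^2 / 2) dt, sigma^2 dt), independent across steps
    z = rng.standard_normal((n_paths, n_steps))
    log_inc = (mu - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * z
    # A leading zero column makes column 0 equal to s0 after exponentiation
    log_paths = np.concatenate([np.zeros((n_paths, 1)), np.cumsum(log_inc, axis=1)], axis=1)
    return s0 * np.exp(log_paths)
```

Because the scheme is exact, the step count changes only how finely each path is sampled, not its terminal distribution.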

heston_paths(s0, v0, kappa, theta, xi, rho, mu, T, n_steps, n_paths, seed=None)

Generate Heston stochastic-volatility model paths.

Uses Euler-Maruyama discretization with full truncation for the variance process to avoid negative variances.

Parameters

s0, v0 : float
    Initial price and initial variance.
kappa : float
    Mean-reversion speed.
theta : float
    Long-run variance.
xi : float
    Volatility of variance (vol-of-vol).
rho : float
    Correlation between the price and variance Brownian motions.
mu : float
    Drift.
T : float
    Time horizon.
n_steps, n_paths : int
    Number of time steps and number of simulated paths.
seed : int | None
    Random seed.

Returns

(prices, variances) : tuple of NDArray
    Both arrays have shape (n_paths, n_steps + 1).
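In the full-truncation Euler scheme, the raw variance state may become negative, but only its positive part max(v, 0) enters the drift and diffusion terms. The sketch below follows the documented parameter names and illustrates that idea. It makes two assumptions that are not stated above: it uses a log-Euler step for the price, and it returns the truncated (non-negative) variance. `heston_paths_sketch` is hypothetical.

```python
from __future__ import annotations

import numpy as np


def heston_paths_sketch(s0, v0, kappa, theta, xi, rho, mu, T, n_steps, n_paths, seed=None):
    """Euler scheme with full truncation (illustrative, not qufin's code)."""
    rng = np.random.default_rng(seed)
    dt = T / n_steps
    s = np.empty((n_paths, n_steps + 1))
    v = np.empty((n_paths, n_steps + 1))
    s[:, 0], v[:, 0] = s0, v0
    for t in range(n_steps):
        z1 = rng.standard_normal(n_paths)
        z2 = rng.standard_normal(n_paths)
        # Correlated shocks: corr(w_s, w_v) = rho
        w_s = z1
        w_v = rho * z1 + np.sqrt(1.0 - rho**2) * z2
        # Full truncation: only the positive part of the variance drives the dynamics
        v_pos = np.maximum(v[:, t], 0.0)
        v[:, t + 1] = v[:, t] + kappa * (theta - v_pos) * dt + xi * np.sqrt(v_pos * dt) * w_v
        # Log-Euler price step (an assumption) keeps prices strictly positive
        s[:, t + 1] = s[:, t] * np.exp((mu - 0.5 * v_pos) * dt + np.sqrt(v_pos * dt) * w_s)
    # Report the truncated, non-negative variance
    return s, np.maximum(v, 0.0)
```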

merton_jump_paths(s0, mu, sigma, lam, jump_mean, jump_std, T, n_steps, n_paths, seed=None)

Generate Merton jump-diffusion model paths.

Parameters

s0 : float
    Initial price.
mu, sigma : float
    Drift and volatility of the diffusion component.
lam : float
    Jump intensity (expected number of jumps per year).
jump_mean, jump_std : float
    Mean and standard deviation of the log-jump size (normally distributed).
T : float
    Time horizon.
n_steps, n_paths : int
    Number of time steps and number of simulated paths.
seed : int | None
    Random seed.

Returns

NDArray of shape (n_paths, n_steps + 1)
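A common way to simulate Merton's model on a grid is to draw a Poisson number of jumps in each step and add their normal log-sizes to a GBM log-increment. The sketch assumes the drift is compensated by lam * k, with k = E[e^J] - 1, so that E[S_T] = s0 e^(mu T). Whether qufin's `merton_jump_paths` applies this compensator is not stated above. `merton_jump_paths_sketch` is hypothetical.

```python
from __future__ import annotations

import numpy as np


def merton_jump_paths_sketch(s0, mu, sigma, lam, jump_mean, jump_std, T, n_steps, n_paths, seed=None):
    """Merton jump-diffusion on a time grid (illustrative, not qufin's code)."""
    rng = np.random.default_rng(seed)
    dt = T / n_steps
    # Jump compensator k = E[exp(J)] - 1 for J ~ N(jump_mean, jump_std^2)
    k = np.exp(jump_mean + 0.5 * jump_std**2) - 1.0
    z = rng.standard_normal((n_paths, n_steps))
    n_jumps = rng.poisson(lam * dt, size=(n_paths, n_steps))
    # The sum of n i.i.d. N(m, s^2) log-jumps is N(n * m, n * s^2)
    jump_sum = n_jumps * jump_mean + np.sqrt(n_jumps) * jump_std * rng.standard_normal((n_paths, n_steps))
    log_inc = (mu - lam * k - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * z + jump_sum
    log_paths = np.concatenate([np.zeros((n_paths, 1)), np.cumsum(log_inc, axis=1)], axis=1)
    return s0 * np.exp(log_paths)
```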

Macro Data (FRED)

macro

Macro data provider using FRED API.

Fetches risk-free rates, yield curves, and macro indicators from the Federal Reserve Economic Data (FRED) API via the fredapi package.

Requires: pip install fredapi. A FRED API key must also be available, either in the FRED_API_KEY environment variable or passed directly to the provider.

References

FRED: https://fred.stlouisfed.org/

FREDProvider

Fetch macroeconomic data from FRED.

Parameters

api_key : str | None
    FRED API key. If None, it is read from the FRED_API_KEY environment variable.

get_series(series_id, start=None, end=None)

Fetch a single FRED series.

Parameters

series_id : str
    FRED series ID (e.g., "DGS10") or a shorthand key from the SERIES dict.
start, end : str | None
    Date strings (e.g., "2020-01-01").

Returns

pd.Series with datetime index.

get_risk_free_rate(maturity='3m', start=None, end=None)

Fetch a Treasury yield as a risk-free rate proxy.

Returns the annualized rate as a decimal (e.g., 0.05 for 5%).
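FRED quotes Treasury yields in percent, so returning a decimal rate involves dividing by 100. The sketch below shows that conversion together with a maturity-label lookup. The label-to-ID mapping is an assumption: the IDs are FRED constant-maturity Treasury series, but qufin's own SERIES dict may use different keys or series.

```python
import pandas as pd

# Hypothetical maturity-label -> FRED series mapping; qufin's SERIES dict may differ
TREASURY_SERIES = {
    "1m": "DGS1MO",
    "3m": "DGS3MO",
    "6m": "DGS6MO",
    "1y": "DGS1",
    "2y": "DGS2",
    "5y": "DGS5",
    "10y": "DGS10",
    "30y": "DGS30",
}


def percent_to_decimal(raw: pd.Series) -> pd.Series:
    """Convert a FRED yield series quoted in percent to a decimal rate."""
    # Drop missing observations (NaN), then rescale, e.g. 5.0 -> 0.05
    return raw.dropna() / 100.0
```

With fredapi installed, the raw input would typically come from `Fred(api_key=...).get_series(TREASURY_SERIES["3m"], observation_start="2024-01-01")`.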

get_yield_curve(date)

Fetch the Treasury yield curve for a specific date.

Returns a Series indexed by maturity label with annualized yields.

get_vix(start=None, end=None)

Fetch the VIX (the CBOE Volatility Index, a measure of implied volatility).

Bloomberg

bloomberg

Bloomberg data provider using the blpapi SDK.

Provides access to Bloomberg Terminal data, including:

- Historical end-of-day data for equities, fixed income, and FX
- Real-time streaming for live portfolio monitoring
- Corporate actions: dividends, splits, mergers

Requires: Bloomberg Terminal license + pip install blpapi

References

Bloomberg Open API: https://www.bloomberg.com/professional/support/api-library/

BloombergDataSource

Fetch market data from Bloomberg Terminal via blpapi.

Parameters

config : BloombergConfig | None
    Session parameters. Defaults to localhost:8194.
field_map : dict[str, str] | None
    Override for the Bloomberg -> qufin field mapping.
cache : bool
    Whether to cache downloaded data to parquet via the qufin cache layer.

Examples

>>> from qufin.data.bloomberg import BloombergDataSource
>>> bbg = BloombergDataSource()
>>> prices = bbg.get_historical(
...     ["AAPL US Equity", "MSFT US Equity"],
...     start="2024-01-01", end="2024-06-30",
... )

close()

Stop the session and clean up subscriptions.

get_historical(tickers, start, end, fields=None, frequency='DAILY', currency=None)

Fetch historical end-of-day data.

Parameters

tickers : list[str]
    Bloomberg tickers (e.g. ["AAPL US Equity"]).
start, end : str
    Date strings in YYYY-MM-DD format.
fields : list[str] | None
    Bloomberg fields to request. Defaults to ["PX_LAST"].
frequency : str
    "DAILY", "WEEKLY", or "MONTHLY".
currency : str | None
    Override currency for cross-asset comparison (e.g. "USD").

Returns

pd.DataFrame
    Multi-indexed by (date, ticker), or pivoted with tickers as columns when a single field is requested.

Raises

ValueError
    If the tickers list is empty.
ConnectionError
    If the Bloomberg session cannot be established.
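The two documented return layouts are related by a single pandas operation: unstacking the ticker level turns a long frame indexed by (date, ticker) into a wide frame with one column per ticker. The sketch below assumes the index levels are named "date" and "ticker" and uses the raw field name PX_LAST. The column names that get_historical actually returns may be normalized. `to_wide` is a hypothetical helper.

```python
import pandas as pd


def to_wide(long_df: pd.DataFrame, field: str) -> pd.DataFrame:
    """Pivot a (date, ticker)-indexed frame to one column per ticker."""
    # unstack moves the 'ticker' index level into the columns
    return long_df[field].unstack("ticker").sort_index()
```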

get_prices(tickers, start, end, frequency='DAILY')

Fetch closing prices; a convenience wrapper around get_historical.

Returns

pd.DataFrame
    Columns are tickers; the index is a DatetimeIndex.

get_returns(tickers, start, end, frequency='DAILY')

Fetch log returns.

Returns

pd.DataFrame
    Log returns, one column per ticker.

get_reference(tickers, fields)

Fetch current reference (snapshot) data.

Parameters

tickers : list[str]
    Bloomberg tickers.
fields : list[str]
    Bloomberg fields (e.g. ["PX_LAST", "CUR_MKT_CAP"]).

Returns

pd.DataFrame
    One row per ticker; columns are normalized field names.

get_dividends(tickers, start, end)

Fetch dividend history for the given tickers.

Parameters

tickers : list[str]
    Bloomberg tickers.
start, end : str
    Date range in YYYY-MM-DD format.

Returns

list[CorporateAction]

get_splits(tickers, start, end)

Fetch stock split history.

Parameters

tickers : list[str]
    Bloomberg tickers.
start, end : str
    Date range in YYYY-MM-DD format.

Returns

list[CorporateAction]

subscribe(tickers, fields=None, callback=None, interval=0.0)

Subscribe to real-time market data.

Parameters

tickers : list[str]
    Bloomberg tickers to subscribe to.
fields : list[str] | None
    Fields to stream. Defaults to ["LAST_PRICE", "BID", "ASK"].
callback : callable | None
    Called with a StreamTick for each incoming update. If None, ticks are logged at DEBUG level.
interval : float
    Minimum interval in seconds between updates (0 = every tick).

Raises

ValueError
    If the tickers list is empty.

unsubscribe_all()

Cancel all active real-time subscriptions.

poll_events(timeout_ms=500, max_events=100)

Poll for streaming events and return collected ticks.

Parameters

timeout_ms : int
    Timeout in milliseconds for each event poll.
max_events : int
    Maximum number of events to process in this call.

Returns

list[StreamTick]

BloombergConfig dataclass

Configuration for Bloomberg session.

Parameters

host : str
    Bloomberg server host.
port : int
    Bloomberg server port.
timeout_ms : int
    Session start timeout in milliseconds.
max_pending : int
    Maximum number of pending requests.

BloombergSession

Manages a blpapi session lifecycle.

Parameters

config : BloombergConfig | None
    Session configuration. Uses defaults if None.

connected property

Whether the session is currently active.

start()

Start a Bloomberg session and open the reference data service.

Raises

ConnectionError
    If the session cannot be started or the service cannot be opened.

stop()

Close the Bloomberg session.

normalize_fields(raw, field_map=None)

Translate Bloomberg field names to qufin internal names.

Parameters

raw : dict
    Raw Bloomberg key-value pairs.
field_map : dict | None
    Custom mapping; defaults to FIELD_MAP.

Returns

dict with translated keys. Unmapped keys are passed through lower-cased.
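The rename-or-lowercase rule reduces to a single dict comprehension. In the sketch below, `EXAMPLE_FIELD_MAP` is a hypothetical three-entry stand-in for qufin's FIELD_MAP, whose real contents are not listed here.

```python
from __future__ import annotations

# Hypothetical subset of a Bloomberg -> qufin mapping; the real FIELD_MAP may differ
EXAMPLE_FIELD_MAP = {"PX_LAST": "close", "PX_OPEN": "open", "PX_VOLUME": "volume"}


def normalize_fields_sketch(raw: dict, field_map: dict | None = None) -> dict:
    """Rename mapped keys; pass unmapped keys through lower-cased."""
    mapping = EXAMPLE_FIELD_MAP if field_map is None else field_map
    return {mapping.get(key, key.lower()): value for key, value in raw.items()}
```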

normalize_dataframe(df, field_map=None)

Rename DataFrame columns from Bloomberg to qufin names.

Parameters

df : pd.DataFrame
    DataFrame with Bloomberg column names.
field_map : dict | None
    Custom mapping; defaults to FIELD_MAP.

Returns

pd.DataFrame with renamed columns.

Refinitiv / LSEG

refinitiv

Refinitiv/LSEG data provider using the Eikon Data API.

Fetches equities, fixed income, and derivatives data from the Refinitiv Eikon / LSEG Workspace platform. An alternative to Bloomberg for firms in the LSEG ecosystem.

Requires: pip install eikon (or refinitiv-data). An active Eikon / Workspace session or an API proxy must also be available.

References

Eikon Data API: https://developers.lseg.com/en/api-catalog/eikon/eikon-data-api

RefinitivDataSource

Fetch market data from Refinitiv Eikon / LSEG Workspace.

Parameters

config : RefinitivConfig | None
    Connection configuration. When None, a default config is used and the app key must be supplied separately via set_app_key().

set_app_key(app_key)

Set (or reset) the Eikon application key at runtime.

get_equity_prices(rics, start, end, interval='daily')

Fetch historical equity close prices.

Parameters

rics : list[str]
    Reuters Instrument Codes, e.g. ["AAPL.O", "MSFT.O"].
start, end : str
    ISO date strings (e.g. "2023-01-01").
interval : {"daily", "weekly", "monthly"}
    Bar frequency.

Returns

TimeSeriesResult

get_equity_ohlcv(rics, start, end, interval='daily')

Fetch historical OHLCV bars for equities.

Parameters

rics : list[str]
    Reuters Instrument Codes.
start, end : str
    ISO date strings.
interval : {"daily", "weekly", "monthly"}
    Bar frequency.

Returns

TimeSeriesResult

get_equity_returns(rics, start, end, frequency='D')

Compute log returns from equity close prices.

Parameters

rics : list[str]
    Reuters Instrument Codes.
start, end : str
    ISO date strings.
frequency : {"D", "W", "M"}
    Return frequency (daily, weekly, or monthly).

Returns

pd.DataFrame
    Log returns with a DatetimeIndex, one column per RIC.
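Weekly or monthly log returns are usually computed by first keeping the last close in each period and then taking r_t = ln(P_t / P_(t-1)). The sketch below assumes "W" means weeks ending Friday and "M" means calendar month-end. It uses pandas offset objects rather than string aliases because the month-end alias differs across pandas versions. `log_returns_at` is hypothetical and is not necessarily how get_equity_returns resamples.

```python
import numpy as np
import pandas as pd

# Assumed mapping of the documented frequency codes to pandas offsets
_FREQ_OFFSETS = {"W": pd.offsets.Week(weekday=4), "M": pd.offsets.MonthEnd()}


def log_returns_at(close: pd.DataFrame, frequency: str = "D") -> pd.DataFrame:
    """Log returns ln(P_t / P_(t-1)) at daily, weekly, or monthly frequency."""
    if frequency != "D":
        # Keep the last close of each week (ending Friday) or calendar month
        close = close.resample(_FREQ_OFFSETS[frequency]).last()
    # The first row has no predecessor, so it is dropped
    return np.log(close / close.shift(1)).iloc[1:]
```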

get_bond_data(rics, fields=None)

Fetch current bond / fixed-income snapshot data.

Parameters

rics : list[str]
    Bond RICs, e.g. ["US10YT=RR", "DE10YT=RR"].
fields : list[str] | None
    Eikon field names. Defaults to FIXED_INCOME_FIELDS.

Returns

SnapshotResult

get_yield_curve(currency='USD', rics=None)

Fetch government bond yields across maturities.

Parameters

currency : str
    "USD", "EUR", "GBP", etc. Used to pick the default RICs.
rics : list[str] | None
    Explicit RIC list. Overrides the currency defaults.

Returns

pd.DataFrame
    Columns: maturity, bid_yield, ask_yield, mid_yield.

get_option_chain(underlying_ric, fields=None)

Fetch option chain data for a given underlying.

Parameters

underlying_ric : str
    Underlying RIC, e.g. "AAPL.O".
fields : list[str] | None
    Eikon field names. Defaults to DERIVATIVES_FIELDS.

Returns

SnapshotResult

get_futures_data(rics, fields=None)

Fetch futures snapshot data.

Parameters

rics : list[str]
    Futures RICs, e.g. ["CLc1", "ESc1"].
fields : list[str] | None
    Eikon field names. Defaults to settlement price, open interest (OI), and volume.

Returns

SnapshotResult

get_data(rics, fields)

Generic wrapper around ek.get_data.

Parameters

rics : list[str]
    Instrument RICs.
fields : list[str]
    Eikon TR field names.

Returns

SnapshotResult

get_timeseries(rics, fields, start, end, interval='daily')

Generic wrapper around ek.get_timeseries.

Parameters

rics : list[str]
    Instrument RICs.
fields : list[str]
    Timeseries field names (e.g. ["CLOSE", "VOLUME"]).
start, end : str
    ISO date strings.
interval : {"daily", "weekly", "monthly"}
    Bar frequency.

Returns

TimeSeriesResult

search(query, select=None, top=10)

Search for instruments using Eikon search.

Parameters

query : str
    Free-text search query.
select : str | None
    Comma-separated Eikon properties to return.
top : int
    Maximum number of results.

Returns

pd.DataFrame

RefinitivConfig dataclass

Configuration for the Refinitiv Eikon connection.

Parameters

app_key : str
    Eikon Data API application key (also called app-key or api-key).
timeout : int
    Request timeout in seconds. Default 30.
cache : bool
    Whether to cache downloaded data to parquet via qufin caching.

TimeSeriesResult dataclass

Container for time-series query results.

Attributes

data : pd.DataFrame
    The returned price / field data.
rics : list[str]
    RIC codes that were queried.
fields : list[str]
    Eikon field names returned.
metadata : dict[str, Any]
    Extra metadata from the response.

SnapshotResult dataclass

Container for a point-in-time data snapshot.

Attributes

data : pd.DataFrame
    Snapshot data with RICs as rows.
rics : list[str]
    RIC codes that were queried.
fields : list[str]
    Eikon field names returned.

Real-Time Streaming

streaming

Real-time WebSocket streaming for live market data.

Supports Alpaca, Polygon, and IEX providers. Provides event-driven rebalancing triggers (threshold drift, signal-based regime change) and latency monitoring with heartbeat checks.

Requires websockets (optional dependency).

PriceStream

Async WebSocket price stream with buffering, rebalancing, and monitoring.

Parameters

config : StreamConfig
    Connection and subscription settings.
rebalance_config : RebalanceConfig | None
    If provided, enables automatic rebalance triggers.
target_weights : dict[str, float] | None
    Target portfolio weights for drift detection.
holdings : dict[str, float] | None
    Current share holdings for portfolio tracking.
on_tick : Callable | None
    Callback invoked on each parsed tick: on_tick(tick_dict).
on_rebalance : Callable | None
    Callback invoked when a rebalance fires: on_rebalance(current_weights).

connect(max_messages=None) async

Connect to the WebSocket and begin streaming.

Parameters

max_messages : int | None
    If set, disconnect after this many messages (useful for testing).

stop()

Signal the stream to stop.

StreamConfig dataclass

Configuration for a price stream.

Parameters

provider : Provider
    Which WebSocket data provider to use.
api_key : str
    API key / token for the provider.
tickers : list[str]
    Symbols to subscribe to.
buffer_size : int
    Maximum number of price ticks to buffer per ticker.
heartbeat_interval : float
    Seconds between heartbeat pings.
url_override : str | None
    Override the default WebSocket URL (useful for testing).

PriceBuffer

Thread-safe (asyncio-safe) ring buffer for price ticks per ticker.
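A fixed-capacity, per-ticker ring buffer maps naturally onto `collections.deque` with `maxlen`. The sketch below omits locking because it assumes the buffer is only touched from a single asyncio event loop, where each method runs without an `await` and therefore cannot be interleaved. That is a simplification. `TickBufferSketch` is hypothetical, and qufin's PriceBuffer may be implemented differently.

```python
from __future__ import annotations

from collections import deque


class TickBufferSketch:
    """Per-ticker bounded buffer (illustrative; not qufin's PriceBuffer)."""

    def __init__(self, buffer_size: int) -> None:
        self.buffer_size = buffer_size
        self._ticks: dict[str, deque] = {}

    def append(self, ticker: str, price: float) -> None:
        buf = self._ticks.get(ticker)
        if buf is None:
            # deque(maxlen=n) silently evicts the oldest tick once n are stored
            buf = self._ticks[ticker] = deque(maxlen=self.buffer_size)
        buf.append(price)

    def latest(self, ticker: str, n: int) -> list[float]:
        """Return up to the n most recent prices, oldest first."""
        if n <= 0:
            return []
        return list(self._ticks.get(ticker, ()))[-n:]
```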

LatencyMonitor dataclass

Tracks message latency and heartbeat health.

is_healthy(heartbeat_interval)

Return True if a heartbeat was received within twice the expected interval.
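The health rule compares the age of the last heartbeat with twice the expected interval. The sketch below uses `time.monotonic()` so that wall-clock adjustments cannot produce false alarms, and it accepts an optional `now` argument so the check can be tested deterministically. `HeartbeatSketch` is hypothetical and is not qufin's LatencyMonitor.

```python
from __future__ import annotations

import time
from dataclasses import dataclass, field


@dataclass
class HeartbeatSketch:
    """Hypothetical heartbeat tracker; not qufin's LatencyMonitor."""

    last_heartbeat: float = field(default_factory=time.monotonic)

    def beat(self) -> None:
        # Record when the latest heartbeat arrived (monotonic clock)
        self.last_heartbeat = time.monotonic()

    def is_healthy(self, heartbeat_interval: float, now: float | None = None) -> bool:
        now = time.monotonic() if now is None else now
        # Healthy while the last heartbeat is at most 2x the expected interval old
        return now - self.last_heartbeat <= 2.0 * heartbeat_interval
```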

PortfolioTracker

Track live portfolio value from streaming prices.

Parameters

holdings : dict[str, float]
    Number of shares held per ticker.
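Tracking live portfolio value amounts to marking each holding to its latest streamed price. The resulting weights are what a drift check compares against the targets. Below is a minimal sketch that assumes a price is available for every held ticker. `value_and_weights` is a hypothetical helper.

```python
from __future__ import annotations


def value_and_weights(
    holdings: dict[str, float], prices: dict[str, float]
) -> tuple[float, dict[str, float]]:
    """Mark holdings to the latest prices and return (total value, weights)."""
    # Market value per ticker = shares held * latest streamed price
    values = {t: shares * prices[t] for t, shares in holdings.items()}
    total = sum(values.values())
    if total == 0:
        return 0.0, {t: 0.0 for t in values}
    return total, {t: v / total for t, v in values.items()}
```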

RebalanceTrigger

Evaluates whether a rebalance should fire.

Parameters

config : RebalanceConfig
    Threshold and signal configuration.
target_weights : dict[str, float]
    Target portfolio weights keyed by ticker.

check_drift(current_weights)

Return True if any asset has drifted beyond the threshold.

check_signal()

Return True if the configured signal callback fires (returns True).

should_rebalance(current_weights)

Evaluate all triggers; return True if rebalance is warranted.

RebalanceConfig dataclass

Configuration for rebalance triggers.

Parameters

drift_threshold : float
    Trigger a rebalance when any asset drifts more than this fraction from its target weight (e.g. 0.05 = 5%).
signal_callback : Callable | None
    Optional callback that returns True when a regime-change signal fires (e.g. from a VQC classifier).
cooldown : float
    Minimum number of seconds between consecutive rebalances.
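The drift, signal, and cooldown settings combine as follows: during the cooldown window nothing fires; after it, either trigger can fire. The sketch below assumes drift is measured as an absolute difference in weight (0.05 meaning five percentage points). That reading of "fraction" is an assumption. `RebalanceTriggerSketch` is hypothetical, and its optional `now` argument exists only to make the check testable.

```python
from __future__ import annotations

import time
from typing import Callable


class RebalanceTriggerSketch:
    """Drift + signal trigger with a cooldown (illustrative, not qufin's class)."""

    def __init__(
        self,
        target_weights: dict[str, float],
        drift_threshold: float = 0.05,
        cooldown: float = 0.0,
        signal_callback: Callable[[], bool] | None = None,
    ) -> None:
        self.target_weights = target_weights
        self.drift_threshold = drift_threshold
        self.cooldown = cooldown
        self.signal_callback = signal_callback
        self._last_fired = float("-inf")

    def check_drift(self, current_weights: dict[str, float]) -> bool:
        # Absolute weight deviation; a ticker missing from current_weights counts as 0
        return any(
            abs(current_weights.get(t, 0.0) - w) > self.drift_threshold
            for t, w in self.target_weights.items()
        )

    def check_signal(self) -> bool:
        return self.signal_callback is not None and bool(self.signal_callback())

    def should_rebalance(self, current_weights: dict[str, float], now: float | None = None) -> bool:
        now = time.monotonic() if now is None else now
        if now - self._last_fired < self.cooldown:
            return False  # still inside the cooldown window
        fired = self.check_drift(current_weights) or self.check_signal()
        if fired:
            self._last_fired = now
        return fired
```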

Parquet Data Warehouse

warehouse

Parquet data warehouse with partitioned storage.

Stores market data in an asset_class/ticker/year.parquet hierarchy. Supports predicate pushdown via PyArrow, automatic compaction, and deduplication on the (ticker, date) composite key.

Integrates with qufin.data.cache so that the warehouse is checked before any API call is made.

Requires pyarrow (optional dependency).

ParquetWarehouse

Partitioned Parquet data warehouse.

Data is stored as:

root_dir/
  asset_class/
    ticker/
      2023.parquet
      2024.parquet

Parameters

config : WarehouseConfig | None
    Warehouse configuration. Defaults to WarehouseConfig().
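The year-partitioned layout can be planned by deduplicating on (ticker, date), grouping rows by calendar year, and building one path per group. To keep the example free of pyarrow, the sketch only builds the path-to-rows plan and does not write anything. It assumes the date column is named "date" and that the last duplicate row wins. `plan_partitions` is hypothetical.

```python
from __future__ import annotations

from pathlib import Path

import pandas as pd


def plan_partitions(df, root, asset_class, ticker, date_column="date"):
    """Map each target file path to the rows it would hold (illustrative sketch)."""
    out = df.copy()
    out[date_column] = pd.to_datetime(out[date_column])
    out["ticker"] = ticker
    # Deduplicate on the (ticker, date) composite key, keeping the last occurrence
    out = out.drop_duplicates(subset=["ticker", date_column], keep="last")
    plan = {}
    for year, chunk in out.groupby(out[date_column].dt.year):
        path = Path(root) / asset_class / ticker / f"{year}.parquet"
        plan[path] = chunk.sort_values(date_column)
        # Writing would need path.parent.mkdir(parents=True, exist_ok=True)
        # followed by chunk.to_parquet(path), which requires pyarrow
    return plan
```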

write(df, asset_class, ticker)

Write a DataFrame partitioned by year.

The DataFrame must have a column matching config.date_column. Data is deduplicated on (ticker, date) before writing.

Parameters

df : pd.DataFrame
    Market data to store.
asset_class : str
    Asset class partition key (e.g. "equity", "option").
ticker : str
    Ticker symbol.

Returns

list[Path]
    Paths of the written parquet files.

read(asset_class, ticker, start_date=None, end_date=None, columns=None)

Read data with optional date filtering via predicate pushdown.

Parameters

asset_class : str
    Asset class partition.
ticker : str
    Ticker symbol.
start_date, end_date : str | None
    ISO date strings for filtering.
columns : list[str] | None
    Specific columns to read.

Returns

pd.DataFrame
    Filtered data.

has_data(asset_class, ticker, year=None)

Check whether data exists for a given partition.

list_tickers(asset_class)

List all tickers stored under an asset class.

list_years(asset_class, ticker)

List available years for a ticker.

compact(asset_class, ticker)

Merge small fragment files within a partition.

If a year's directory contains multiple parquet fragment files (from incremental writes stored outside the year-naming scheme), this method consolidates them.

Returns the number of files merged.

auto_compact(asset_class=None)

Run compaction on every partition whose file count exceeds config.compaction_threshold.

Parameters

asset_class : str | None
    If given, only compact partitions under this asset class.

Returns

int
    Total number of files merged.

delete(asset_class, ticker, year=None)

Delete stored data.

Parameters

asset_class : str
    Asset class partition.
ticker : str
    Ticker to delete.
year : int | None
    If given, delete only that year; otherwise delete all years.

Returns

int
    Number of files deleted.

WarehouseConfig dataclass

Configuration for the parquet data warehouse.

Parameters

root_dir : Path
    Root directory for partitioned storage.
compaction_threshold : int
    Merge small files when a partition has more than this many files.
dedup_keys : list[str]
    Columns used for deduplication.
date_column : str
    Name of the date/datetime column in DataFrames.

warehouse_or_fetch(warehouse, asset_class, ticker, start_date, end_date, fetch_fn)

Check the warehouse first; if the data is missing, call fetch_fn and store the result.

Parameters

warehouse : ParquetWarehouse
    Warehouse instance.
asset_class, ticker, start_date, end_date : str
    Query parameters.
fetch_fn : callable
    Called as fetch_fn(ticker, start_date, end_date); must return a pd.DataFrame.

Returns

pd.DataFrame
    Market data.
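The check-then-fetch flow is a read-through cache. The minimal sketch below uses a plain dict keyed by (asset_class, ticker) as a hypothetical stand-in for ParquetWarehouse. It also uses a naive coverage test: the cached index must span the requested range. Because of that, a request starting on a weekend always refetches. qufin's actual coverage logic is not described above, and `cache_or_fetch` is a hypothetical name.

```python
from __future__ import annotations

from typing import Callable

import pandas as pd


def cache_or_fetch(
    store: dict,
    asset_class: str,
    ticker: str,
    start_date: str,
    end_date: str,
    fetch_fn: Callable[[str, str, str], pd.DataFrame],
) -> pd.DataFrame:
    """Read-through cache over an in-memory dict standing in for the warehouse."""
    start, end = pd.Timestamp(start_date), pd.Timestamp(end_date)
    cached = store.get((asset_class, ticker))
    # Naive coverage test: the cached index must span the requested range
    if cached is not None and len(cached) and cached.index.min() <= start and cached.index.max() >= end:
        return cached.loc[start:end]
    fresh = fetch_fn(ticker, start_date, end_date)
    merged = fresh if cached is None else pd.concat([cached, fresh])
    # Deduplicate on date, letting freshly fetched rows win
    merged = merged[~merged.index.duplicated(keep="last")].sort_index()
    store[(asset_class, ticker)] = merged
    return merged.loc[start:end]
```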

Data Quality

quality

Data quality framework: gap detection, outlier flagging, corporate action adjustment, data lineage tracking, and per-ticker quality scoring.

All computations use only numpy and pandas (no extra dependencies).

DataLineage dataclass

Provenance record for a price series.

Parameters

ticker : str
    Ticker symbol.
source : str
    Data provider name.
fetch_timestamp : datetime.datetime
    When the raw data was obtained (UTC).
transformations : list[TransformationStep]
    Ordered list of transformations applied.

add_step(name, description, parameters=None)

Append a transformation step.

GapReport dataclass

Result of gap detection.

Attributes

missing_dates : list[datetime.date]
    Trading days with no data.
total_expected : int
    Number of expected trading days.
total_present : int
    Number of trading days actually present.
gap_fraction : float
    Fraction of expected days that are missing.

OutlierReport dataclass

Result of outlier detection.

Attributes

outlier_dates : list[datetime.date]
    Dates where the daily return exceeded the sigma threshold.
outlier_returns : list[float]
    Corresponding return values.
sigma_threshold : float
    Threshold used (number of standard deviations).
mean : float
    Mean of daily returns.
std : float
    Standard deviation of daily returns.

QualityScore dataclass

Per-ticker quality assessment.

Each sub-score is in [0, 1]; overall is their weighted average.

Attributes

ticker : str
    Ticker symbol.
completeness : float
    1 - gap_fraction.
freshness : float
    Decays with the age of the most recent observation.
consistency : float
    1 - outlier fraction.
overall : float
    Weighted combination of the three sub-scores.

detect_gaps(prices, trading_calendar=None)

Detect missing trading days in a price series.

Parameters

prices : pd.Series or pd.DataFrame
    Price data with a DatetimeIndex. If a DataFrame, its index is used (gaps are detected at the row level).
trading_calendar : pd.DatetimeIndex, optional
    Expected trading days. If None, pandas.bdate_range is used.

Returns

GapReport
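Gap detection is a set difference between the expected calendar and the observed index. The sketch below falls back to `pd.bdate_range` between the first and last observation, as described above. Note that this fallback counts exchange holidays as missing days. It returns a plain tuple instead of a GapReport, and `find_gaps` is a hypothetical name.

```python
from __future__ import annotations

import pandas as pd


def find_gaps(prices, trading_calendar=None):
    """Return (missing_dates, total_expected, total_present, gap_fraction)."""
    present = pd.DatetimeIndex(prices.index).normalize()
    if trading_calendar is None:
        # Weekdays between first and last observation; holidays are NOT excluded
        trading_calendar = pd.bdate_range(present.min(), present.max())
    expected = pd.DatetimeIndex(trading_calendar).normalize()
    missing = expected.difference(present)
    total_expected = len(expected)
    total_present = len(expected.intersection(present))
    gap_fraction = len(missing) / total_expected if total_expected else 0.0
    return [d.date() for d in missing], total_expected, total_present, gap_fraction
```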

detect_outliers(prices, sigma_threshold=5.0)

Flag daily returns that exceed sigma_threshold standard deviations.

Parameters

prices : pd.Series
    Price series with a DatetimeIndex.
sigma_threshold : float
    Number of standard deviations beyond which a return is flagged.

Returns

OutlierReport
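The sigma rule standardizes each daily return by the sample mean and standard deviation, then flags returns whose absolute z-score exceeds the threshold. The sketch assumes simple (not log) returns and pandas' default sample standard deviation (ddof=1); qufin may choose differently. It returns a tuple rather than an OutlierReport, and `flag_outliers` is hypothetical.

```python
import pandas as pd


def flag_outliers(prices: pd.Series, sigma_threshold: float = 5.0):
    """Return (flagged returns, mean, std) using a z-score on simple daily returns."""
    returns = (prices / prices.shift(1) - 1.0).iloc[1:]
    mean, std = returns.mean(), returns.std()  # sample std, ddof=1
    z = (returns - mean) / std
    return returns[z.abs() > sigma_threshold], mean, std
```

Because mean and std are estimated from the same sample, one extreme return also inflates std. With n observations, the largest attainable z-score is (n - 1) / sqrt(n), so short series cannot flag anything at a 5-sigma threshold.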

adjust_for_splits(prices, splits, lineage=None)

Adjust historical prices for stock splits.

Prices before the split date are divided by the cumulative split ratio so that the series is continuous.

Parameters

prices : pd.Series
    Raw price series (DatetimeIndex).
splits : sequence of SplitEvent
    Split events to apply, in any order.
lineage : DataLineage, optional
    If provided, a transformation step is recorded.

Returns

pd.Series
    Adjusted price series.
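Dividing every earlier price by each split's ratio produces the cumulative ratio automatically. This is also why the order in which splits are applied does not matter. The sketch below represents splits as hypothetical (date, ratio) tuples, with 4.0 meaning a 4-for-1 split, instead of qufin's SplitEvent. `adjust_for_splits_sketch` is hypothetical.

```python
from __future__ import annotations

import pandas as pd


def adjust_for_splits_sketch(prices: pd.Series, splits: list[tuple[str, float]]) -> pd.Series:
    """Divide pre-split prices by each split ratio (illustrative sketch)."""
    adjusted = prices.astype(float).copy()
    for split_date, ratio in splits:
        # Rows strictly before the split date are divided by the ratio;
        # applying every split in turn yields the cumulative ratio, in any order
        before = adjusted.index < pd.Timestamp(split_date)
        adjusted.loc[before] = adjusted.loc[before] / ratio
    return adjusted
```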

adjust_for_dividends(prices, dividends, lineage=None)

Adjust historical prices for cash dividends.

Prices before the ex-dividend date are multiplied by the adjustment factor (close - dividend) / close, with close taken on the ex-date, so that the series is continuous.

Parameters

prices : pd.Series
    Raw price series (DatetimeIndex).
dividends : sequence of DividendEvent
    Dividend events to apply.
lineage : DataLineage, optional
    If provided, a transformation step is recorded.

Returns

pd.Series
    Adjusted price series.
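The multiplicative adjustment scales every pre-ex-date price by (close - dividend) / close. There is one deliberate deviation from the description above: this sketch takes close as the last raw close before the ex-date, a common convention, whereas the text above says the close is taken on the ex-date. Dividends are hypothetical (ex_date, amount) tuples rather than qufin's DividendEvent, and `adjust_for_dividends_sketch` is hypothetical.

```python
from __future__ import annotations

import pandas as pd


def adjust_for_dividends_sketch(prices: pd.Series, dividends: list[tuple[str, float]]) -> pd.Series:
    """Scale pre-ex-date prices by (close - dividend) / close (illustrative sketch)."""
    adjusted = prices.astype(float).copy()
    for ex_date, amount in dividends:
        before = prices.index < pd.Timestamp(ex_date)
        if not before.any():
            continue  # dividend predates the series; nothing to adjust
        # Assumption: 'close' is the last raw close before the ex-date
        close = float(prices[before].iloc[-1])
        adjusted.loc[before] = adjusted.loc[before] * ((close - amount) / close)
    return adjusted
```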

compute_quality_score(prices, ticker, reference_date=None, freshness_halflife_days=30, trading_calendar=None, sigma_threshold=5.0, weights=(0.4, 0.3, 0.3))

Compute a composite quality score for a single ticker.

Parameters

prices : pd.Series
    Price series with a DatetimeIndex.
ticker : str
    Ticker symbol.
reference_date : datetime.date, optional
    "Today" for the freshness calculation. Defaults to date.today().
freshness_halflife_days : int
    Half-life in days for the exponential freshness decay.
trading_calendar : pd.DatetimeIndex, optional
    Expected trading days for gap detection.
sigma_threshold : float
    Sigma threshold for outlier detection.
weights : tuple of 3 floats
    Weights for (completeness, freshness, consistency).

Returns

QualityScore
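The QualityScore definitions combine into a weighted average: completeness = 1 - gap_fraction, consistency = 1 - outlier fraction, and freshness decays with a half-life. The sketch assumes freshness = 0.5 ** (age_days / half-life), so data exactly one half-life old scores 0.5. It also divides by the sum of the weights. For simplicity, it takes precomputed fractions rather than a price series. `quality_score_sketch` is hypothetical.

```python
from __future__ import annotations

import datetime as dt


def quality_score_sketch(
    gap_fraction: float,
    outlier_fraction: float,
    last_observation: dt.date,
    reference_date: dt.date,
    freshness_halflife_days: int = 30,
    weights: tuple[float, float, float] = (0.4, 0.3, 0.3),
) -> dict[str, float]:
    """Combine the three sub-scores as a weighted average (illustrative sketch)."""
    completeness = 1.0 - gap_fraction
    consistency = 1.0 - outlier_fraction
    # Exponential decay: data exactly one half-life old scores 0.5
    age_days = max((reference_date - last_observation).days, 0)
    freshness = 0.5 ** (age_days / freshness_halflife_days)
    w_c, w_f, w_s = weights
    overall = (w_c * completeness + w_f * freshness + w_s * consistency) / (w_c + w_f + w_s)
    return {
        "completeness": completeness,
        "freshness": freshness,
        "consistency": consistency,
        "overall": overall,
    }
```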

compute_quality_scores(prices, reference_date=None, **kwargs)

Compute quality scores for every column in a DataFrame.

Parameters

prices : pd.DataFrame
    Columns are tickers; the index is a DatetimeIndex.
reference_date : datetime.date, optional
    Reference date for freshness.
**kwargs
    Forwarded to compute_quality_score().

Returns

dict[str, QualityScore]