# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from Portfolio.RiskParityPortfolioOptimizer import RiskParityPortfolioOptimizer
###
### Risk Parity Portfolio Construction Model
###
### Spinu, F. (2013). An algorithm for computing risk parity weights. Available at SSRN 2297383.
### Available at https://papers.ssrn.com/sol3/Papers.cfm?abstract_id=2297383
class RiskParityPortfolioConstructionModel(PortfolioConstructionModel):
def __init__(self,
rebalance = Resolution.DAILY,
portfolio_bias = PortfolioBias.LONG_SHORT,
lookback = 1,
period = 252,
resolution = Resolution.DAILY,
optimizer = None):
"""Initialize the model
Args:
rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
If None will be ignored.
The function returns the next expected rebalance time for a given algorithm UTC DateTime.
The function returns null if unknown, in which case the function will be called again in the
next loop. Returning current time will trigger rebalance.
portfolio_bias: Specifies the bias of the portfolio (Short, Long/Short, Long)
lookback(int): Historical return lookback period
period(int): The time interval of history price to calculate the weight
resolution: The resolution of the history price
optimizer(class): Method used to compute the portfolio weights"""
super().__init__()
if portfolio_bias == PortfolioBias.SHORT:
raise ArgumentException("Long position must be allowed in RiskParityPortfolioConstructionModel.")
self.lookback = lookback
self.period = period
self.resolution = resolution
self.sign = lambda x: -1 if x < 0 else (1 if x > 0 else 0)
self.optimizer = RiskParityPortfolioOptimizer() if optimizer is None else optimizer
self._symbol_data_by_symbol = {}
# If the argument is an instance of Resolution or Timedelta
# Redefine rebalancing_func
rebalancing_func = rebalance
if isinstance(rebalance, int):
rebalance = Extensions.to_time_span(rebalance)
if isinstance(rebalance, timedelta):
rebalancing_func = lambda dt: dt + rebalance
if rebalancing_func:
self.set_rebalancing_func(rebalancing_func)
def determine_target_percent(self, active_insights):
"""Will determine the target percent for each insight
Args:
active_insights: list of active insights
Returns:
dictionary of insight and respective target weight
"""
targets = {}
# If we have no insights just return an empty target list
if len(active_insights) == 0:
return targets
symbols = [insight.symbol for insight in active_insights]
# Create a dictionary keyed by the symbols in the insights with an pandas.series as value to create a data frame
returns = { str(symbol) : data.return_ for symbol, data in self._symbol_data_by_symbol.items() if symbol in symbols }
returns = pd.DataFrame(returns)
# The portfolio optimizer finds the optional weights for the given data
weights = self.optimizer.optimize(returns)
weights = pd.Series(weights, index = returns.columns)
# Create portfolio targets from the specified insights
for insight in active_insights:
targets[insight] = weights[str(insight.symbol)]
return targets
def on_securities_changed(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
# clean up data for removed securities
super().on_securities_changed(algorithm, changes)
for removed in changes.removed_securities:
symbol_data = self._symbol_data_by_symbol.pop(removed.symbol, None)
symbol_data.reset()
algorithm.unregister_indicator(symbol_data.roc)
# initialize data for added securities
symbols = [ x.symbol for x in changes.added_securities ]
history = algorithm.history(symbols, self.lookback * self.period, self.resolution)
if history.empty: return
tickers = history.index.levels[0]
for ticker in tickers:
symbol = SymbolCache.get_symbol(ticker)
if symbol not in self._symbol_data_by_symbol:
symbol_data = self.RiskParitySymbolData(symbol, self.lookback, self.period)
symbol_data.warm_up_indicators(history.loc[ticker])
self._symbol_data_by_symbol[symbol] = symbol_data
algorithm.register_indicator(symbol, symbol_data.roc, self.resolution)
class RiskParitySymbolData:
'''Contains data specific to a symbol required by this model'''
def __init__(self, symbol, lookback, period):
self._symbol = symbol
self.roc = RateOfChange(f'{symbol}.roc({lookback})', lookback)
self.roc.updated += self.on_rate_of_change_updated
self.window = RollingWindow(period)
def reset(self):
self.roc.updated -= self.on_rate_of_change_updated
self.roc.reset()
self.window.reset()
def warm_up_indicators(self, history):
for tuple in history.itertuples():
self.roc.update(tuple.Index, tuple.close)
def on_rate_of_change_updated(self, roc, value):
if roc.is_ready:
self.window.add(value)
def add(self, time, value):
item = IndicatorDataPoint(self._symbol, time, value)
self.window.add(item)
@property
def return_(self):
return pd.Series(
data = [x.value for x in self.window],
index = [x.end_time for x in self.window])
@property
def is_ready(self):
return self.window.is_ready
def __str__(self, **kwargs):
return '{}: {:.2%}'.format(self.roc.name, self.window[0])