# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
###
### Example structure for structuring an algorithm with indicator and consolidator data for many tickers.
###
###
###
###
###
class MultipleSymbolConsolidationAlgorithm(QCAlgorithm):
# Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.
def initialize(self) -> None:
# This is the period of bars we'll be creating
bar_period = TimeSpan.from_minutes(10)
# This is the period of our sma indicators
sma_period = 10
# This is the number of consolidated bars we'll hold in symbol data for reference
rolling_window_size = 10
# Holds all of our data keyed by each symbol
self._data = {}
# Contains all of our equity symbols
equity_symbols = ["AAPL","SPY","IBM"]
# Contains all of our forex symbols
forex_symbols = ["EURUSD", "USDJPY", "EURGBP", "EURCHF", "USDCAD", "USDCHF", "AUDUSD","NZDUSD"]
self.set_start_date(2014, 12, 1)
self.set_end_date(2015, 2, 1)
# initialize our equity data
for symbol in equity_symbols:
equity = self.add_equity(symbol)
self._data[symbol] = SymbolData(equity.symbol, bar_period, rolling_window_size)
# initialize our forex data
for symbol in forex_symbols:
forex = self.add_forex(symbol)
self._data[symbol] = SymbolData(forex.symbol, bar_period, rolling_window_size)
# loop through all our symbols and request data subscriptions and initialize indicator
for symbol, symbol_data in self._data.items():
# define the indicator
symbol_data.sma = SimpleMovingAverage(self.create_indicator_name(symbol, "sma" + str(sma_period), Resolution.MINUTE), sma_period)
# define a consolidator to consolidate data for this symbol on the requested period
if symbol_data._symbol.security_type == SecurityType.EQUITY:
tb_consolidator = TradeBarConsolidator(bar_period)
tb_consolidator.data_consolidated += self.on_trade_bar_consolidated
# we need to add this consolidator so it gets auto updates
self.subscription_manager.add_consolidator(symbol_data._symbol, tb_consolidator)
else:
qb_consolidator = QuoteBarConsolidator(bar_period)
qb_consolidator.data_consolidated += self.on_quote_bar_consolidated
# we need to add this consolidator so it gets auto updates
self.subscription_manager.add_consolidator(symbol_data._symbol, qb_consolidator)
def on_trade_bar_consolidated(self, sender: object, bar: TradeBar) -> None:
self._on_data_consolidated(sender, bar)
def on_quote_bar_consolidated(self, sender: object, bar: QuoteBar) -> None:
self._on_data_consolidated(sender, bar)
def _on_data_consolidated(self, sender: object, bar: TradeBar | QuoteBar) -> None:
self._data[bar.symbol.value].sma.update(bar.time, bar.close)
self._data[bar.symbol.value].bars.add(bar)
# OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
# Argument "data": Slice object, dictionary object with your stock data
def on_data(self, data: Slice) -> None:
# loop through each symbol in our structure
for symbol in self._data.keys():
symbol_data = self._data[symbol]
# this check proves that this symbol was JUST updated prior to this OnData function being called
if symbol_data.is_ready() and symbol_data.was_just_updated(self.time):
if not self.portfolio[symbol].invested:
self.market_order(symbol, 1)
# End of a trading day event handler. This method is called at the end of the algorithm day (or multiple times if trading multiple assets).
# Method is called 10 minutes before closing to allow user to close out position.
def on_end_of_day(self, symbol: Symbol) -> None:
i = 0
for symbol_key in sorted(self._data.keys()):
symbol_data = self._data[symbol_key]
# we have too many symbols to plot them all, so plot every other
i += 1
if symbol_data.is_ready() and i%2 == 0:
self.plot(symbol_key, symbol_key, symbol_data.sma.current.value)
class SymbolData(object):
def __init__(self, symbol: Symbol, bar_period: timedelta, window_size: int) -> None:
self._symbol = symbol
# The period used when population the Bars rolling window
self.bar_period = bar_period
# A rolling window of data, data needs to be pumped into Bars by using Bars.update( trade_bar ) and can be accessed like:
# my_symbol_data.bars[0] - most first recent piece of data
# my_symbol_data.bars[5] - the sixth most recent piece of data (zero based indexing)
self.bars = RollingWindow(window_size)
# The simple moving average indicator for our symbol
self.sma = None
# Returns true if all the data in this instance is ready (indicators, rolling windows, ect...)
def is_ready(self) -> bool:
return self.bars.is_ready and self.sma.is_ready
# Returns true if the most recent trade bar time matches the current time minus the bar's period, this
# indicates that update was just called on this instance
def was_just_updated(self, current: datetime) -> bool:
return self.bars.count > 0 and self.bars[0].time == current - self.bar_period