# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
### <summary>
### This algorithm demonstrates the various ways you can call the History function,
### what it returns, and what you can do with the returned values.
### </summary>
class HistoryAlgorithm(QCAlgorithm):
    '''Demonstrates the different ways of calling the history API and of consuming its results.

    Every history request issued in initialize is checked with assert_history_count,
    so the algorithm fails fast when a request returns an unexpected number of rows.
    '''

    def initialize(self):
        '''Configure the backtest, subscribe to the data and exercise the history API.'''
        self.set_start_date(2013, 10, 8)   # Set Start Date
        self.set_end_date(2013, 10, 11)    # Set End Date
        self.set_cash(100000)              # Set Strategy Cash

        # Find more symbols here: http://quantconnect.com/data
        self.add_equity("SPY", Resolution.DAILY)
        ibm = self.add_data(CustomDataEquity, "IBM", Resolution.DAILY)
        # specifying the exchange will allow the history methods that accept a number of bars to return to work properly
        ibm.Exchange = EquityExchange()

        # we can get history in initialize to set up indicators and such
        self.daily_sma = SimpleMovingAverage(14)

        # get the last calendar year's worth of SPY data at the configured resolution (daily)
        trade_bar_history = self.history([self.securities["SPY"].symbol], timedelta(365))
        self.assert_history_count("History([\"SPY\"], timedelta(365))", trade_bar_history, 250)

        # get the last calendar day's worth of SPY data at the specified resolution
        trade_bar_history = self.history(["SPY"], timedelta(1), Resolution.MINUTE)
        self.assert_history_count("History([\"SPY\"], timedelta(1), Resolution.MINUTE)", trade_bar_history, 390)

        # get the last 14 bars of SPY at the configured resolution (daily)
        trade_bar_history = self.history(["SPY"], 14)
        self.assert_history_count("History([\"SPY\"], 14)", trade_bar_history, 14)

        # get the last 14 minute bars of SPY
        trade_bar_history = self.history(["SPY"], 14, Resolution.MINUTE)
        self.assert_history_count("History([\"SPY\"], 14, Resolution.MINUTE)", trade_bar_history, 14)

        # get the minute data from one calendar day ago up to the current algorithm time
        # with both the Fill Forward and Extended Market options enabled
        interval_bar_history = self.history(["SPY"], self.time - timedelta(1), self.time, Resolution.MINUTE, True, True)
        self.assert_history_count("History([\"SPY\"], self.time - timedelta(1), self.time, Resolution.MINUTE, True, True)", interval_bar_history, 960)

        # same interval, Extended Market option only
        interval_bar_history = self.history(["SPY"], self.time - timedelta(1), self.time, Resolution.MINUTE, False, True)
        self.assert_history_count("History([\"SPY\"], self.time - timedelta(1), self.time, Resolution.MINUTE, False, True)", interval_bar_history, 919)

        # same interval, Fill Forward option only
        interval_bar_history = self.history(["SPY"], self.time - timedelta(1), self.time, Resolution.MINUTE, True, False)
        self.assert_history_count("History([\"SPY\"], self.time - timedelta(1), self.time, Resolution.MINUTE, True, False)", interval_bar_history, 390)

        # same interval, neither option
        interval_bar_history = self.history(["SPY"], self.time - timedelta(1), self.time, Resolution.MINUTE, False, False)
        self.assert_history_count("History([\"SPY\"], self.time - timedelta(1), self.time, Resolution.MINUTE, False, False)", interval_bar_history, 390)

        # we can loop over the rows returned by these functions and use them
        # to initialize indicators or perform other math
        # (trade_bar_history still holds the last 14 minute bars requested above)
        for index, trade_bar in trade_bar_history.loc["SPY"].iterrows():
            self.daily_sma.update(index, trade_bar["close"])

        # get the last calendar year's worth of custom_data data at the configured resolution (daily)
        custom_data_history = self.history(CustomDataEquity, "IBM", timedelta(365))
        self.assert_history_count("History(CustomDataEquity, \"IBM\", timedelta(365))", custom_data_history, 250)

        # get the last 14 bars of IBM at the configured resolution (daily)
        custom_data_history = self.history(CustomDataEquity, "IBM", 14)
        self.assert_history_count("History(CustomDataEquity, \"IBM\", 14)", custom_data_history, 14)

        # we can loop over the return values from these functions and we'll get Custom data
        # this can be used in much the same way as the trade_bar_history above
        self.daily_sma.reset()
        for index, custom_data in custom_data_history.loc["IBM"].iterrows():
            self.daily_sma.update(index, custom_data["value"])

        # get the last 14 bars worth of Custom data for the specified symbols at the configured resolution (daily)
        all_custom_data = self.history(CustomDataEquity, self.securities.keys(), 14)
        self.assert_history_count("History(CustomDataEquity, self.securities.keys(), 14)", all_custom_data, 14 * 2)

        # NOTE: Using different resolutions require that they are properly implemented in your data type. If your
        # custom data source has different resolutions, it would need to be implemented in the GetSource and
        # Reader methods properly.
        #custom_data_history = self.history(CustomDataEquity, "IBM", timedelta(7), Resolution.MINUTE)
        #custom_data_history = self.history(CustomDataEquity, "IBM", 14, Resolution.MINUTE)
        #all_custom_data = self.history(CustomDataEquity, timedelta(365), Resolution.MINUTE)
        #all_custom_data = self.history(CustomDataEquity, self.securities.keys(), 14, Resolution.MINUTE)
        #all_custom_data = self.history(CustomDataEquity, self.securities.keys(), timedelta(1), Resolution.MINUTE)

        # get the last calendar year's worth of all custom_data data
        all_custom_data = self.history(CustomDataEquity, self.securities.keys(), timedelta(365))
        self.assert_history_count("History(CustomDataEquity, self.securities.keys(), timedelta(365))", all_custom_data, 250 * 2)

        # we can also access the return value from the multiple symbol functions to request a single
        # symbol and then loop over it
        single_symbol_custom = all_custom_data.loc["IBM"]
        self.assert_history_count("all_custom_data.loc[\"IBM\"]", single_symbol_custom, 250)
        # iterrows() is required to walk the rows: iterating a DataFrame directly
        # only yields its column labels
        for _time, custom_data in single_symbol_custom.iterrows():
            # do something with 'IBM.custom_data_equity' custom_data data
            pass

        ibm_values = all_custom_data.loc["IBM"]["value"]
        self.assert_history_count("all_custom_data.loc[\"IBM\"][\"value\"]", ibm_values, 250)
        for value in ibm_values:
            # do something with 'IBM.custom_data_equity' value data
            pass

    def on_data(self, data):
        '''on_data event is the primary entry point for your algorithm. Each new data point will be pumped in here.

        Arguments:
            data: Slice object keyed by symbol containing the stock data
        '''
        if not self.portfolio.invested:
            self.set_holdings("SPY", 1)

    def assert_history_count(self, method_call, trade_bar_history, expected):
        '''Verify that a history result holds exactly the expected number of rows.

        Arguments:
            method_call: Text describing the request, used in the failure message
            trade_bar_history: History result to check; only its index is inspected
            expected: Number of rows the result must contain

        Raises:
            AssertionError: when the number of rows differs from expected
        '''
        count = len(trade_bar_history.index)
        if count != expected:
            raise AssertionError(f"{method_call} expected {expected}, but received {count}")
class CustomDataEquity(PythonData):
    '''Custom data type that reads the daily equity files found in the data folder.'''

    def get_source(self, config, date, is_live):
        '''Build the location of the file holding the data for the requested subscription.

        Arguments:
            config: Subscription configuration; its symbol, resolution and tick type name the zip file
            date: Date passed through to the zip file name generator
            is_live: Not used by this implementation

        Returns:
            SubscriptionDataSource pointing at the zip file under "/equity/usa/daily/" in the data folder
        '''
        zip_file_name = LeanData.generate_zip_file_name(config.Symbol, date, config.Resolution, config.TickType)
        source = Globals.data_folder + "/equity/usa/daily/" + zip_file_name
        return SubscriptionDataSource(source)

    def reader(self, config, line, date, is_live):
        '''Parse one comma separated line of the source file into a CustomDataEquity instance.

        Arguments:
            config: Subscription configuration providing the symbol of the new data point
            line: Raw text line; the first field is a '%Y%m%d %H:%M' timestamp and the second the value
            date: Not used by this implementation
            is_live: Not used by this implementation

        Returns:
            The parsed data point, or None when the line is missing or blank
        '''
        # Nothing to parse: without this guard a blank line would fail in strptime below.
        if line is None or not line.strip():
            return None

        # 'fields' rather than 'csv' so the stdlib module name is not shadowed
        fields = line.split(",")
        bar_start = datetime.strptime(fields[0], '%Y%m%d %H:%M')

        custom_data = CustomDataEquity()
        custom_data.symbol = config.symbol
        custom_data.time = bar_start
        # each data point spans one full day starting at its timestamp
        custom_data.end_time = bar_start + timedelta(days=1)
        custom_data.value = float(fields[1])
        return custom_data