# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. # Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from AlgorithmImports import * ### ### In this algorithm, we fetch a list of tickers with corresponding dates from a file on Dropbox. ### We then create a fine fundamental universe which contains those symbols on their respective dates.### ### ### ### ### class DropboxCoarseFineAlgorithm(QCAlgorithm): def initialize(self): self.set_start_date(2019, 9, 23) # Set Start Date self.set_end_date(2019, 9, 30) # Set End Date self.set_cash(100000) # Set Strategy Cash self.add_universe(self.select_coarse, self.select_fine) self.universe_data = None self.next_update = datetime(1, 1, 1) # Minimum datetime self.url = "https://www.dropbox.com/s/x2sb9gaiicc6hm3/tickers_with_dates.csv?dl=1" def on_end_of_day(self, symbol: Symbol) -> None: self.debug(f"{self.time.date()} {symbol.value} with Market Cap: ${self.securities[symbol].fundamentals.market_cap}") def select_coarse(self, coarse): return self.get_symbols() def select_fine(self, fine): symbols = self.get_symbols() # Return symbols from our list which have a market capitalization of at least 10B return [f.symbol for f in fine if f.market_cap > 1e10 and f.symbol in symbols] def get_symbols(self): # In live trading update every 12 hours if self.live_mode: if self.time < self.next_update: # Return today's row return self.universe_data[self.time.date()] # When updating set the new reset time. self.next_update = self.time + timedelta(hours=12) self.universe_data = self.parse(self.url) # In backtest load once if not set, then just use the dates. if not self.universe_data: self.universe_data = self.parse(self.url) # Check if contains the row we need if self.time.date() not in self.universe_data: return Universe.UNCHANGED return self.universe_data[self.time.date()] def parse(self, url): # Download file from url as string file = self.download(url).split("\n") # # Remove formatting characters data = [x.replace("\r", "").replace(" ", "") for x in file] # # Split data by date and symbol split_data = [x.split(",") for x in data] # Dictionary to hold list of active symbols for each date, keyed by date symbols_by_date = {} # Parse data into dictionary for arr in split_data: date = datetime.strptime(arr[0], "%Y%m%d").date() symbols = [Symbol.create(ticker, SecurityType.EQUITY, Market.USA) for ticker in arr[1:]] symbols_by_date[date] = symbols return symbols_by_date def on_securities_changed(self, changes): self.log(f"Added Securities: {[security.symbol.value for security in changes.added_securities]}")