/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Linq;
using QuantConnect.Data;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Data.Fundamental;
using QuantConnect.Data.UniverseSelection;
namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// Provides a subscription data source reader that uses the configured <see cref="BaseData"/> type's
    /// <c>Reader</c> method to read data from a <see cref="SubscriptionDataSource"/>, either line by line or,
    /// when the data type implements a stream reader, directly from the underlying stream.
    /// </summary>
    /// <remarks>
    /// For non-custom data at hour resolution or coarser (excluding fundamental and universe collection types, and only
    /// when the data cache provider is not ephemeral), data points read from local files are stored in a process-wide,
    /// size-bounded cache keyed by source path and data type, so later reads of the same file reuse the parsed points.
    /// </remarks>
    public class TextSubscriptionDataSourceReader : BaseSubscriptionDataSourceReader
    {
        private readonly bool _implementsStreamReader;
        private readonly DateTime _date;
        // whether this configuration is eligible for caching at all; the transport medium is checked per read
        private readonly bool _shouldCacheDataPoints;
        private BaseData _factory;

        // maximum number of cached source files before eviction is triggered
        private static int CacheSize = 100;

        // copy-on-write cache: writers build a new dictionary under the CacheKeys lock and swap the reference,
        // so lookups never need to take a lock
        private static volatile Dictionary<string, List<BaseData>> BaseDataSourceCache = new Dictionary<string, List<BaseData>>(100);

        // insertion order of the cached keys (first in, first evicted); also the lock guarding all cache writes
        private static readonly Queue<string> CacheKeys = new Queue<string>(100);

        /// <summary>
        /// The requested subscription configuration
        /// </summary>
        protected SubscriptionDataConfig Config { get; set; }

        /// <summary>
        /// Event fired when an exception is thrown while the data type's <c>Reader</c> method parses the source
        /// </summary>
        public event EventHandler<ReaderErrorEventArgs> ReaderError;

        /// <summary>
        /// Initializes a new instance of the <see cref="TextSubscriptionDataSourceReader"/> class
        /// </summary>
        /// <param name="dataCacheProvider">This provider caches files if needed</param>
        /// <param name="config">The subscription's configuration</param>
        /// <param name="date">The date this factory was produced to read data for</param>
        /// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
        /// <param name="objectStore">The object storage for data persistence.</param>
        public TextSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date, bool isLiveMode,
            IObjectStore objectStore)
            : base(dataCacheProvider, isLiveMode, objectStore)
        {
            _date = date;
            Config = config;
            _shouldCacheDataPoints = !Config.IsCustomData && Config.Resolution >= Resolution.Hour
                && Config.Type != typeof(FineFundamental) && Config.Type != typeof(CoarseFundamental) && Config.Type != typeof(Fundamental)
                // don't cache universe data, doesn't make much sense and we don't want to change the symbol of the clone
                && !Config.Type.IsAssignableTo(typeof(BaseDataCollection))
                && !DataCacheProvider.IsDataEphemeral;
            _implementsStreamReader = Config.Type.ImplementsStreamReader();
        }

        /// <summary>
        /// Reads the specified <paramref name="source"/>
        /// </summary>
        /// <param name="source">The source to be read</param>
        /// <returns>
        /// The data points parsed from the source. When caching applies, the points are clones of the cached
        /// instances with their symbol set to this configuration's symbol, starting shortly before the requested date.
        /// </returns>
        public override IEnumerable<BaseData> Read(SubscriptionDataSource source)
        {
            // only cache local files; computed per call (instead of mutating the field) so that reading a
            // non-local source does not permanently disable caching for this reader
            var shouldCache = _shouldCacheDataPoints && source.TransportMedium == SubscriptionTransportMedium.LocalFile;

            List<BaseData> cache = null;
            string cacheKey = null;
            if (shouldCache)
            {
                cacheKey = source.Source + Config.Type;
                BaseDataSourceCache.TryGetValue(cacheKey, out cache);
            }

            if (cache == null)
            {
                cache = shouldCache ? new List<BaseData>(30000) : null;
                using (var reader = CreateStreamReader(source))
                {
                    if (reader == null)
                    {
                        // if the reader doesn't have data then we're done with this subscription
                        yield break;
                    }

                    // only create a factory if the stream isn't null
                    _factory ??= Config.GetBaseDataInstance();

                    // while the reader has data
                    while (!reader.EndOfStream)
                    {
                        BaseData instance = null;
                        string line = null;
                        try
                        {
                            if (reader.StreamReader != null && _implementsStreamReader)
                            {
                                instance = _factory.Reader(Config, reader.StreamReader, _date, IsLiveMode);
                            }
                            else
                            {
                                // read a line and pass it to the base data factory
                                line = reader.ReadLine();
                                instance = _factory.Reader(Config, line, _date, IsLiveMode);
                            }
                        }
                        catch (Exception err)
                        {
                            // parsing errors are reported to subscribers and the point is skipped
                            OnReaderError(line ?? "StreamReader", err);
                        }

                        if (instance != null && instance.EndTime != default)
                        {
                            if (shouldCache)
                            {
                                cache.Add(instance);
                            }
                            else
                            {
                                yield return instance;
                            }
                        }
                        else if (reader.ShouldBeRateLimited)
                        {
                            // NOTE(review): yields the (possibly null) instance for rate-limited readers,
                            // presumably so the consumer regains control between reads — confirm with callers
                            yield return instance;
                        }
                    }
                }

                if (!shouldCache)
                {
                    yield break;
                }

                AddToCache(cacheKey, cache);
            }

            if (cache == null)
            {
                throw new InvalidOperationException($"Cache should not be null. Key: {cacheKey}");
            }

            // Find the first data point 10 days (just in case) before the desired date
            // and subtract one item (just in case there was a time gap and data.Time is after _date).
            // If no point is after the frontier, the whole cached file is replayed.
            var frontier = _date.AddDays(-10);
            var index = cache.FindIndex(data => data.Time > frontier);
            index = index > 0 ? (index - 1) : 0;
            for (var i = index; i < cache.Count; i++)
            {
                // cached instances are shared between readers, so hand out clones carrying this subscription's symbol
                var clone = cache[i].Clone();
                clone.Symbol = Config.Symbol;
                yield return clone;
            }
        }

        /// <summary>
        /// Publishes freshly parsed data points into the shared cache, evicting the oldest entries
        /// once the cache holds more than <see cref="CacheSize"/> entries
        /// </summary>
        /// <param name="cacheKey">The cache key (source path plus data type)</param>
        /// <param name="dataPoints">The parsed data points; not modified after being cached</param>
        private static void AddToCache(string cacheKey, List<BaseData> dataPoints)
        {
            lock (CacheKeys)
            {
                if (BaseDataSourceCache.ContainsKey(cacheKey))
                {
                    // another reader cached this source concurrently; enqueueing the key again would leave a stale
                    // duplicate that later evicts nothing or drops a re-added entry prematurely
                    return;
                }

                CacheKeys.Enqueue(cacheKey);
                // we create a new dictionary, so we don't have to take locks when reading, and add our new item
                var newCache = new Dictionary<string, List<BaseData>>(BaseDataSourceCache) { [cacheKey] = dataPoints };
                if (newCache.Count > CacheSize)
                {
                    // evict the oldest quarter of the entries, but always at least one so small cache sizes stay bounded
                    var toEvict = Math.Max(1, CacheSize / 4);
                    while (toEvict-- > 0 && CacheKeys.Count > 0)
                    {
                        newCache.Remove(CacheKeys.Dequeue());
                    }
                }

                // update the cache instance
                BaseDataSourceCache = newCache;
            }
        }

        /// <summary>
        /// Event invocator for the <see cref="ReaderError"/> event
        /// </summary>
        /// <param name="line">The line that caused the exception</param>
        /// <param name="exception">The exception that was caught</param>
        private void OnReaderError(string line, Exception exception)
        {
            ReaderError?.Invoke(this, new ReaderErrorEventArgs(line, exception));
        }

        /// <summary>
        /// Set the cache size to use
        /// </summary>
        /// <param name="megaBytesToUse">
        /// How to size this cache: take the worst case scenario, BTCUSD hour, 60k QuoteBar entries of roughly
        /// 200 bytes each, rounded to ~12 MB per cached entry. Zero leaves the current size unchanged;
        /// negative values are treated as zero.
        /// </param>
        public static void SetCacheSize(int megaBytesToUse)
        {
            if (megaBytesToUse != 0)
            {
                // we take worst case scenario, each entry is 12 MB
                CacheSize = Math.Max(0, megaBytesToUse / 12);
                Log.Trace($"TextSubscriptionDataSourceReader.SetCacheSize(): Setting cache size to {CacheSize} items");
            }
        }

        /// <summary>
        /// Will clear the data cache.
        /// Used for testing different time zones for the same data set and allow a clean fresh start for each backtest
        /// </summary>
        public static void ClearCache()
        {
            lock (CacheKeys)
            {
                // reset the eviction queue too, otherwise its stale keys would later evict freshly cached entries
                CacheKeys.Clear();
                BaseDataSourceCache = new();
            }
        }
    }
}