/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
using System;
using QuantConnect.Data;
using QuantConnect.Util;
using QuantConnect.Interfaces;
using System.Collections.Generic;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// This implementation supports
/// the and types.
/// Handles the layer of indirection for the index data source and forwards
/// the target source to the corresponding
///
public class IndexSubscriptionDataSourceReader : BaseSubscriptionDataSourceReader
{
private readonly SubscriptionDataConfig _config;
private readonly DateTime _date;
private IDataProvider _dataProvider;
private readonly IndexedBaseData _factory;
///
/// Creates a new instance of this
///
public IndexSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider,
SubscriptionDataConfig config,
DateTime date,
bool isLiveMode,
IDataProvider dataProvider,
IObjectStore objectStore)
: base(dataCacheProvider, isLiveMode, objectStore)
{
_config = config;
_date = date;
_dataProvider = dataProvider;
_factory = config.Type.GetBaseDataInstance() as IndexedBaseData;
if (_factory == null)
{
throw new ArgumentException($"{nameof(IndexSubscriptionDataSourceReader)} should be used" +
$"with a data type which implements {nameof(IndexedBaseData)}");
}
}
///
/// Reads the specified
///
/// The source to be read
/// An that contains the data in the source
public override IEnumerable Read(SubscriptionDataSource source)
{
// handles zip or text files
using (var reader = CreateStreamReader(source))
{
// if the reader doesn't have data then we're done with this subscription
if (reader == null || reader.EndOfStream)
{
OnInvalidSource(source, new Exception($"The reader was empty for source: ${source.Source}"));
yield break;
}
// while the reader has data
while (!reader.EndOfStream)
{
// read a line and pass it to the base data factory
var line = reader.ReadLine();
if (line.IsNullOrEmpty())
{
continue;
}
SubscriptionDataSource dataSource;
try
{
dataSource = _factory.GetSourceForAnIndex(_config, _date, line, IsLiveMode);
}
catch
{
OnInvalidSource(source, new Exception("Factory.GetSourceForAnIndex() failed to return a valid source"));
yield break;
}
if (dataSource != null)
{
var dataReader = SubscriptionDataSourceReader.ForSource(
dataSource,
DataCacheProvider,
_config,
_date,
IsLiveMode,
_factory,
_dataProvider,
ObjectStore);
var enumerator = dataReader.Read(dataSource).GetEnumerator();
while (enumerator.MoveNext())
{
yield return enumerator.Current;
}
enumerator.DisposeSafely();
}
}
}
}
}
}