/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using QuantConnect.Data;
using QuantConnect.Util;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Data.UniverseSelection;
namespace QuantConnect.Lean.Engine.DataFeeds
{
/// <summary>
/// Subscription data source reader for collection sources. Every line of the source is
/// passed to the configured <see cref="BaseData"/> factory, which is expected to hand
/// back a <see cref="BaseDataCollection"/>; outside of live mode that collection is
/// unrolled and yielded to the caller one data point at a time.
/// </summary>
public class CollectionSubscriptionDataSourceReader : BaseSubscriptionDataSourceReader
{
    private readonly DateTime _date;
    private readonly BaseData _factory;
    private readonly SubscriptionDataConfig _config;

    /// <summary>
    /// Initializes a new instance of the <see cref="CollectionSubscriptionDataSourceReader"/> class
    /// </summary>
    /// <param name="dataCacheProvider">Used to cache data requested from the IDataProvider</param>
    /// <param name="config">The subscription's configuration; also used to create the factory instance that parses each line</param>
    /// <param name="date">The date this factory was produced to read data for</param>
    /// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
    /// <param name="objectStore">Object store instance, handed to the base reader unchanged</param>
    public CollectionSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider, SubscriptionDataConfig config, DateTime date, bool isLiveMode, IObjectStore objectStore)
        : base(dataCacheProvider, isLiveMode, objectStore)
    {
        _config = config;
        _date = date;
        _factory = config.GetBaseDataInstance();
    }

    /// <summary>
    /// Event fired when an exception is caught while a line is being read from the source
    /// or parsed by the factory inside <see cref="Read"/>
    /// </summary>
    public event EventHandler ReaderError;

    /// <summary>
    /// Reads the specified <paramref name="source"/> line by line
    /// </summary>
    /// <remarks>
    /// Lines whose parse result is not a <see cref="BaseDataCollection"/> are reported through
    /// OnInvalidSource and skipped, and lines that throw are reported through
    /// <see cref="ReaderError"/> and skipped, unless the stream reader is flagged as rate limited.
    /// In live mode the collection itself is yielded; otherwise each non-null item of the
    /// collection with a non-default end time is yielded.
    /// </remarks>
    /// <param name="source">The source to be read</param>
    /// <returns>An enumerable that contains the data in the source; empty when no stream reader could be created</returns>
    public override IEnumerable Read(SubscriptionDataSource source)
    {
        SubscriptionDataSourceReader.CheckRemoteFileCache();

        IStreamReader stream = null;
        try
        {
            stream = CreateStreamReader(source);
            if (stream == null)
            {
                yield break;
            }

            // kept outside the loop: if ReadLine itself throws, the error event
            // carries the last line that was read successfully
            var line = "";
            while (!stream.EndOfStream)
            {
                BaseDataCollection collection = null;
                var skipLine = false;

                // note: no 'yield' may appear inside this try/catch, results are emitted below
                try
                {
                    line = stream.ReadLine();
                    collection = _factory.Reader(_config, line, _date, IsLiveMode) as BaseDataCollection;
                    if (collection == null && !stream.ShouldBeRateLimited)
                    {
                        OnInvalidSource(source, new Exception("Reader must generate a BaseDataCollection with the FileFormat.Collection"));
                        skipLine = true;
                    }
                }
                catch (Exception err)
                {
                    OnReaderError(line, err);
                    // a rate limited reader still falls through so that something is emitted
                    skipLine = !stream.ShouldBeRateLimited;
                }

                if (skipLine)
                {
                    continue;
                }

                // a null collection coming from a rate limited reader is never unrolled; that
                // shouldn't happen outside of live mode, since the rest reader is the only one
                // to be rate limited, but just in case...
                var unroll = !IsLiveMode && (collection != null || !stream.ShouldBeRateLimited);
                if (unroll)
                {
                    foreach (var point in collection.Data)
                    {
                        if (point == null || point.EndTime == default(DateTime))
                        {
                            continue;
                        }

                        yield return point;
                    }
                }
                else
                {
                    // in live trading these data points will be unrolled at the
                    // 'LiveCustomDataSubscriptionEnumeratorFactory' level
                    yield return collection;
                }
            }
        }
        finally
        {
            stream.DisposeSafely();
        }
    }

    /// <summary>
    /// Event invocator for the <see cref="ReaderError"/> event
    /// </summary>
    /// <param name="line">The line that caused the exception</param>
    /// <param name="exception">The exception that was caught</param>
    private void OnReaderError(string line, Exception exception)
    {
        // take a snapshot of the delegate before the null check and the invocation
        var subscribers = ReaderError;
        if (subscribers == null)
        {
            return;
        }

        subscribers(this, new ReaderErrorEventArgs(line, exception));
    }
}
}