/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using QuantConnect.Data;
using System.ComponentModel;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Lean.Engine.DataFeeds.Transport;
using QuantConnect.Algorithm;
namespace QuantConnect.Lean.Engine.DataFeeds
{
///
/// A base class for implementations of the
///
public abstract class BaseSubscriptionDataSourceReader : ISubscriptionDataSourceReader
{
///
/// True if we're in live mode, false for backtesting
///
protected bool IsLiveMode { get; }
///
/// The data cache provider to use
///
protected IDataCacheProvider DataCacheProvider { get; }
///
/// The object store to use
///
protected IObjectStore ObjectStore { get; }
///
/// Event fired when the specified source is considered invalid, this may
/// be from a missing file or failure to download a remote source
///
public event EventHandler InvalidSource;
///
/// Creates a new instance
///
protected BaseSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider, bool isLiveMode, IObjectStore objectStore)
{
DataCacheProvider = dataCacheProvider;
IsLiveMode = isLiveMode;
ObjectStore = objectStore;
}
///
/// Reads the specified
///
/// The source to be read
/// An that contains the data in the source
public abstract IEnumerable Read(SubscriptionDataSource source);
///
/// Creates a new for the specified
///
/// The source to produce an for
/// A new instance of to read the source, or null if there was an error
protected IStreamReader CreateStreamReader(SubscriptionDataSource subscriptionDataSource)
{
IStreamReader reader = null;
try
{
switch (subscriptionDataSource.TransportMedium)
{
case SubscriptionTransportMedium.LocalFile:
reader = new LocalFileSubscriptionStreamReader(DataCacheProvider, subscriptionDataSource.Source);
break;
case SubscriptionTransportMedium.RemoteFile:
reader = HandleRemoteSourceFile(subscriptionDataSource);
break;
case SubscriptionTransportMedium.Rest:
reader = new RestSubscriptionStreamReader(subscriptionDataSource.Source, subscriptionDataSource.Headers, IsLiveMode);
break;
case SubscriptionTransportMedium.ObjectStore:
reader = new ObjectStoreSubscriptionStreamReader(ObjectStore, subscriptionDataSource.Source);
break;
default:
throw new InvalidEnumArgumentException("Unexpected SubscriptionTransportMedium specified: " + subscriptionDataSource.TransportMedium);
}
}
catch (Exception e)
{
OnInvalidSource(subscriptionDataSource, e);
return reader;
}
if (reader == null || reader.EndOfStream)
{
OnInvalidSource(subscriptionDataSource, new Exception($"The reader was empty for source: ${subscriptionDataSource.Source}"));
return null;
}
return reader;
}
///
/// Event invocator for the event
///
/// The that was invalid
/// The exception if one was raised, otherwise null
protected void OnInvalidSource(SubscriptionDataSource source, Exception exception)
{
InvalidSource?.Invoke(this, new InvalidSourceEventArgs(source, exception));
}
///
/// Opens up an IStreamReader for a remote file source
///
private IStreamReader HandleRemoteSourceFile(SubscriptionDataSource source)
{
SubscriptionDataSourceReader.CheckRemoteFileCache();
try
{
// this will fire up a web client in order to download the 'source' file to the cache
return new RemoteFileSubscriptionStreamReader(DataCacheProvider, source.Source, Globals.Cache, source.Headers);
}
catch (Exception)
{
return null;
}
}
}
}