/*
 * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
 * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

using System;
using QuantConnect.Data;
using System.ComponentModel;
using QuantConnect.Interfaces;
using System.Collections.Generic;
using QuantConnect.Lean.Engine.DataFeeds.Transport;
using QuantConnect.Algorithm;

namespace QuantConnect.Lean.Engine.DataFeeds
{
    /// <summary>
    /// A base class for implementations of the <see cref="ISubscriptionDataSourceReader"/>.
    /// It holds the shared dependencies (data cache provider, object store, live mode flag)
    /// and knows how to open an <see cref="IStreamReader"/> for each supported transport medium.
    /// </summary>
    public abstract class BaseSubscriptionDataSourceReader : ISubscriptionDataSourceReader
    {
        /// <summary>
        /// True if we're in live mode, false for backtesting
        /// </summary>
        protected bool IsLiveMode { get; }

        /// <summary>
        /// The data cache provider to use
        /// </summary>
        protected IDataCacheProvider DataCacheProvider { get; }

        /// <summary>
        /// The object store to use
        /// </summary>
        protected IObjectStore ObjectStore { get; }

        /// <summary>
        /// Event fired when the specified source is considered invalid, this may
        /// be from a missing file or failure to download a remote source
        /// </summary>
        // NOTE(review): the event is raised with InvalidSourceEventArgs; the handler type's generic
        // argument looks like it was lost from this declaration - confirm against the interface.
        public event EventHandler InvalidSource;

        /// <summary>
        /// Creates a new instance
        /// </summary>
        /// <param name="dataCacheProvider">The data cache provider handed to the local and remote file readers</param>
        /// <param name="isLiveMode">True if we're in live mode, false for backtesting</param>
        /// <param name="objectStore">The object store handed to the object store reader</param>
        protected BaseSubscriptionDataSourceReader(IDataCacheProvider dataCacheProvider, bool isLiveMode, IObjectStore objectStore)
        {
            DataCacheProvider = dataCacheProvider;
            IsLiveMode = isLiveMode;
            ObjectStore = objectStore;
        }

        /// <summary>
        /// Reads the specified <paramref name="source"/>
        /// </summary>
        /// <param name="source">The source to be read</param>
        /// <returns>An enumerable that contains the data in the source</returns>
        // NOTE(review): presumably the element type argument of the return type was stripped from this
        // declaration (only System.Collections.Generic is imported) - confirm against the interface.
        public abstract IEnumerable Read(SubscriptionDataSource source);

        /// <summary>
        /// Creates a new <see cref="IStreamReader"/> for the specified <paramref name="subscriptionDataSource"/>
        /// </summary>
        /// <remarks>
        /// This method does not let construction failures escape: any exception raised while creating
        /// the reader, as well as a missing or already exhausted reader, is reported through
        /// <see cref="InvalidSource"/> and results in a null return value.
        /// </remarks>
        /// <param name="subscriptionDataSource">The source to produce an <see cref="IStreamReader"/> for</param>
        /// <returns>A new instance of <see cref="IStreamReader"/> to read the source, or null if there was an error</returns>
        protected IStreamReader CreateStreamReader(SubscriptionDataSource subscriptionDataSource)
        {
            IStreamReader reader = null;
            try
            {
                switch (subscriptionDataSource.TransportMedium)
                {
                    case SubscriptionTransportMedium.LocalFile:
                        reader = new LocalFileSubscriptionStreamReader(DataCacheProvider, subscriptionDataSource.Source);
                        break;

                    case SubscriptionTransportMedium.RemoteFile:
                        reader = HandleRemoteSourceFile(subscriptionDataSource);
                        break;

                    case SubscriptionTransportMedium.Rest:
                        reader = new RestSubscriptionStreamReader(subscriptionDataSource.Source, subscriptionDataSource.Headers, IsLiveMode);
                        break;

                    case SubscriptionTransportMedium.ObjectStore:
                        reader = new ObjectStoreSubscriptionStreamReader(ObjectStore, subscriptionDataSource.Source);
                        break;

                    default:
                        throw new InvalidEnumArgumentException("Unexpected SubscriptionTransportMedium specified: " + subscriptionDataSource.TransportMedium);
                }
            }
            catch (Exception e)
            {
                OnInvalidSource(subscriptionDataSource, e);

                // the assignment is the last step of every case above, so when an exception
                // escapes the switch no reader was ever stored: be explicit about returning null
                return null;
            }

            if (reader == null || reader.EndOfStream)
            {
                // an exhausted reader is never handed to the caller, so release whatever
                // it holds here instead of leaving it for the finalizer
                (reader as IDisposable)?.Dispose();

                OnInvalidSource(subscriptionDataSource, new Exception($"The reader was empty for source: {subscriptionDataSource.Source}"));
                return null;
            }

            return reader;
        }

        /// <summary>
        /// Event invocator for the <see cref="InvalidSource"/> event
        /// </summary>
        /// <param name="source">The <see cref="SubscriptionDataSource"/> that was invalid</param>
        /// <param name="exception">The exception if one was raised, otherwise null</param>
        protected void OnInvalidSource(SubscriptionDataSource source, Exception exception)
        {
            InvalidSource?.Invoke(this, new InvalidSourceEventArgs(source, exception));
        }

        /// <summary>
        /// Opens up an IStreamReader for a remote file source
        /// </summary>
        /// <param name="source">The remote source to open</param>
        /// <returns>The reader for the remote file, or null if creating it raised an exception</returns>
        private IStreamReader HandleRemoteSourceFile(SubscriptionDataSource source)
        {
            SubscriptionDataSourceReader.CheckRemoteFileCache();

            try
            {
                // this will fire up a web client in order to download the 'source' file to the cache
                return new RemoteFileSubscriptionStreamReader(DataCacheProvider, source.Source, Globals.Cache, source.Headers);
            }
            catch (Exception)
            {
                // best effort: the null result is turned into an InvalidSource event by CreateStreamReader.
                // NOTE(review): the original exception is dropped here, so subscribers only see the
                // generic "reader was empty" error - consider forwarding it if the detail is wanted.
                return null;
            }
        }
    }
}