/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Linq; using QuantConnect.Data; using QuantConnect.Data.Consolidators; using QuantConnect.Data.Market; namespace QuantConnect.ToolBox { /// /// Specifies a piece of processing that should be performed against a source file /// public interface IDataProcessor : IDisposable { /// /// Invoked for each piece of data from the source file /// /// The data to be processed void Process(IBaseData data); } /// /// Provides methods for creating data processor stacks /// public static class DataProcessor { /// /// Creates a new data processor that will filter in input data before piping it into the specified processor /// public static IDataProcessor FilteredBy(this IDataProcessor processor, Func predicate) { return new FilteredDataProcessor(processor, predicate); } /// /// Creates a data processor that will aggregate and zip the requested resolutions of data /// public static IDataProcessor Zip(string dataDirectory, IEnumerable resolutions, TickType tickType, bool sourceIsTick) { var set = resolutions.ToHashSet(); var root = new PipeDataProcessor(); // only filter tick sources var stack = !sourceIsTick ? root : (IDataProcessor) new FilteredDataProcessor(root, x => ((Tick) x).TickType == tickType); if (set.Contains(Resolution.Tick)) { // tick is filtered via trade/quote var tick = new CsvDataProcessor(dataDirectory, Resolution.Tick, tickType); root.PipeTo(tick); } if (set.Contains(Resolution.Second)) { root = AddResolution(dataDirectory, tickType, root, Resolution.Second, sourceIsTick); sourceIsTick = false; } if (set.Contains(Resolution.Minute)) { root = AddResolution(dataDirectory, tickType, root, Resolution.Minute, sourceIsTick); sourceIsTick = false; } if (set.Contains(Resolution.Hour)) { root = AddResolution(dataDirectory, tickType, root, Resolution.Hour, sourceIsTick); sourceIsTick = false; } if (set.Contains(Resolution.Daily)) { AddResolution(dataDirectory, tickType, root, Resolution.Daily, sourceIsTick); } return stack; } private static PipeDataProcessor AddResolution(string dataDirectory, TickType tickType, PipeDataProcessor root, Resolution resolution, bool sourceIsTick) { var second = new CsvDataProcessor(dataDirectory, resolution, tickType); var secondRoot = new PipeDataProcessor(second); var aggregator = new ConsolidatorDataProcessor(secondRoot, data => CreateConsolidator(resolution, tickType, data, sourceIsTick)); root.PipeTo(aggregator); return secondRoot; } private static IDataConsolidator CreateConsolidator(Resolution resolution, TickType tickType, IBaseData data, bool sourceIsTick) { var securityType = data.Symbol.ID.SecurityType; switch (securityType) { case SecurityType.Base: case SecurityType.Equity: case SecurityType.Cfd: case SecurityType.Forex: return new TickConsolidator(resolution.ToTimeSpan()); case SecurityType.Option: if (tickType == TickType.Trade) { return sourceIsTick ? new TickConsolidator(resolution.ToTimeSpan()) : (IDataConsolidator) new TradeBarConsolidator(resolution.ToTimeSpan()); } if (tickType == TickType.Quote) { return sourceIsTick ? new TickQuoteBarConsolidator(resolution.ToTimeSpan()) : (IDataConsolidator) new QuoteBarConsolidator(resolution.ToTimeSpan()); } break; } throw new NotImplementedException("Consolidator creation is not defined for " + securityType + " " + tickType); } } }