/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using System.Linq;
using System.Threading;
using QuantConnect.Data.Consolidators;
using QuantConnect.Data.Market;
using QuantConnect.Logging;
using QuantConnect.Util;
namespace QuantConnect.ToolBox.AlgoSeekFuturesConverter
{
///
/// Processor for caching and consolidating ticks;
/// then flushing the ticks in memory to disk when triggered.
///
public class AlgoSeekFuturesProcessor
{
static private int _curFileCount = 0;
private string _zipPath;
private string _entryPath;
private Symbol _symbol;
private TickType _tickType;
private Resolution _resolution;
private LazyStreamWriter _streamWriter;
private string _dataDirectory;
private IDataConsolidator _consolidator;
private DateTime _referenceDate;
private static string[] _windowsRestrictedNames =
{
"con", "prn", "aux", "nul"
};
///
/// Zip entry name for the futures contract
///
public string EntryPath
{
get
{
if (_entryPath == null)
{
_entryPath = SafeName(LeanData.GenerateZipEntryName(_symbol, _referenceDate, _resolution, _tickType));
}
return _entryPath;
}
set { _entryPath = value; }
}
///
/// Zip file path for the futures contract collection
///
public string ZipPath
{
get
{
if (_zipPath == null)
{
_zipPath = Path.Combine(_dataDirectory, SafeName(LeanData.GenerateRelativeZipFilePath(Safe(_symbol), _referenceDate, _resolution, _tickType).Replace(".zip", string.Empty))) + ".zip";
}
return _zipPath;
}
set { _zipPath = value; }
}
///
/// Public access to the processor symbol
///
public Symbol Symbol
{
get { return _symbol; }
}
///
/// Accessor for the final enumerator
///
public Resolution Resolution
{
get { return _resolution; }
}
///
/// Type of this futures processor.
/// ASOP's are grouped trade type for file writing.
///
public TickType TickType
{
get { return _tickType; }
set { _tickType = value; }
}
///
/// If no data has been consolidated, do not write to disk
///
public bool ShouldWriteToDisk()
{
return _consolidator.Consolidated != null;
}
///
/// Create a new AlgoSeekFuturesProcessor for enquing consolidated bars and flushing them to disk
///
/// Symbol for the processor
/// Reference date for the processor
/// TradeBar or QuoteBar to generate
/// Resolution to consolidate
/// Data directory for LEAN
public AlgoSeekFuturesProcessor(Symbol symbol, DateTime date, TickType tickType, Resolution resolution, string dataDirectory)
{
_symbol = Safe(symbol);
_tickType = tickType;
_referenceDate = date;
_resolution = resolution;
_dataDirectory = dataDirectory;
// Setup the consolidator for the requested resolution
if (resolution == Resolution.Tick)
{
_consolidator = new IdentityDataConsolidator();
}
else
{
switch (tickType)
{
case TickType.Trade:
_consolidator = new TickConsolidator(resolution.ToTimeSpan());
break;
case TickType.Quote:
_consolidator = new TickQuoteBarConsolidator(resolution.ToTimeSpan());
break;
case TickType.OpenInterest:
_consolidator = new OpenInterestConsolidator(resolution.ToTimeSpan());
break;
}
}
var path = ZipPath.Replace(".zip", string.Empty);
Directory.CreateDirectory(path);
var file = Path.Combine(path, EntryPath);
try
{
_streamWriter = new LazyStreamWriter(file);
}
catch (Exception err)
{
// we are unable to open new file - it is already opened due to bug in algoseek data
Log.Error("File: {0} Err: {1} Source: {2} Stack: {3}", file, err.Message, err.Source, err.StackTrace);
var newRandomizedName = (file + "-" + Math.Abs(file.GetHashCode()).ToStringInvariant()).Replace(".csv", string.Empty) + ".csv";
// we store the information under different (randomized) name
Log.Trace("Changing name from {0} to {1}", file, newRandomizedName);
_streamWriter = new LazyStreamWriter(newRandomizedName);
}
// On consolidating the bars put the bar into a queue in memory to be written to disk later.
_consolidator.DataConsolidated += (sender, consolidated) =>
{
_streamWriter.WriteLine(LeanData.GenerateLine(consolidated, SecurityType.Future, Resolution));
};
Interlocked.Add(ref _curFileCount, 1);
if (_curFileCount % 1000 == 0)
{
Log.Trace("Opened more files: {0}", _curFileCount);
}
}
///
/// Process the tick; add to the con
///
///
public void Process(Tick data)
{
if (data.TickType != _tickType)
{
return;
}
_consolidator.Update(data);
}
///
/// Write the in memory queues to the disk.
///
/// Current foremost tick time
/// Indicates is this is the final push to disk at the end of the data
public void FlushBuffer(DateTime frontierTime, bool finalFlush)
{
//Force the consolidation if time has past the bar
_consolidator.Scan(frontierTime);
// If this is the final packet dump it to the queue
if (finalFlush)
{
if (_consolidator.WorkingData != null)
{
_streamWriter.WriteLine(LeanData.GenerateLine(_consolidator.WorkingData, SecurityType.Future, Resolution));
}
_streamWriter.Flush();
_streamWriter.Close();
_streamWriter = null;
Interlocked.Add(ref _curFileCount, -1);
if (_curFileCount % 1000 == 0)
{
Log.Trace("Closed some files: {0}", _curFileCount);
}
}
}
///
/// Add filtering to safe check the symbol for windows environments
///
/// Symbol to rename if required
/// Renamed symbol for reserved names
private static Symbol Safe(Symbol symbol)
{
if (OS.IsWindows)
{
if (_windowsRestrictedNames.Contains(symbol.Value.ToLowerInvariant()))
{
symbol = Symbol.CreateFuture(SafeName(symbol.Underlying.Value), symbol.ID.Market, symbol.ID.Date);
}
}
return symbol;
}
private static string SafeName(string fileName)
{
if (OS.IsWindows)
{
foreach (var name in _windowsRestrictedNames)
{
// The 'con' restricted filename will corrupt the 'seCONed' filepath
var restrictedFilePath = Path.DirectorySeparatorChar + name;
var safeFilePath = Path.DirectorySeparatorChar + "_" + name;
fileName = fileName.Replace(restrictedFilePath, safeFilePath);
}
}
return fileName;
}
}
}