/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using QuantConnect.Data; using QuantConnect.Data.Market; using QuantConnect.Logging; using System; using System.Collections.Generic; using System.Globalization; using System.IO; using System.IO.Compression; using System.Linq; using ZipEntry = Ionic.Zip.ZipEntry; namespace QuantConnect.ToolBox.KaikoDataConverter { /// /// Decompress single entry from Kaiko crypto raw data. /// public class KaikoDataReader { private Symbol _symbol; private TickType _tickType; /// /// Initializes a new instance of the class. /// /// The symbol. /// Type of the tick. public KaikoDataReader(Symbol symbol, TickType tickType) { _symbol = symbol; _tickType = tickType; } /// /// Gets the ticks from Kaiko file zip entry. /// /// The zip entry. /// public IEnumerable GetTicksFromZipEntry(ZipEntry zipEntry) { var rawData = GetRawDataStreamFromEntry(zipEntry); return _tickType == TickType.Trade ? ParseKaikoTradeFile(rawData) : ParseKaikoQuoteFile(rawData); } /// /// Gets the raw data from entry. /// /// The zip entry. /// IEnumerable with the zip entry content. private IEnumerable GetRawDataStreamFromEntry(ZipEntry zipEntry) { using (var outerStream = new StreamReader(zipEntry.OpenReader())) using (var innerStream = new GZipStream(outerStream.BaseStream, CompressionMode.Decompress)) using (var outputStream = new StreamReader(innerStream)) { string line; while ((line = outputStream.ReadLine()) != null) { yield return line; } } } /// /// Parse order book information for Kaiko data files /// /// The raw data lines. /// /// IEnumerable of ticks representing the Kaiko data /// private IEnumerable ParseKaikoQuoteFile(IEnumerable rawDataLines) { var headerLine = rawDataLines.First(); var headerCsv = headerLine.ToCsv(); var typeColumn = headerCsv.FindIndex(x => x == "type"); var dateColumn = headerCsv.FindIndex(x => x == "date"); var priceColumn = headerCsv.FindIndex(x => x == "price"); var quantityColumn = headerCsv.FindIndex(x => x == "amount"); long currentEpoch = 0; var currentEpochTicks = new List(); foreach (var line in rawDataLines.Skip(1)) { if (line == null || string.IsNullOrEmpty(line)) continue; var lineParts = line.Split(','); var tickEpoch = Parse.Long(lineParts[dateColumn]); decimal quantity; decimal price; try { quantity = ParseScientificNotationToDecimal(lineParts, quantityColumn); price = ParseScientificNotationToDecimal(lineParts, priceColumn); } catch (Exception ex) { Log.Error($"KaikoDataConverter.ParseKaikoQuoteFile(): Raw data corrupted. Line {string.Join(" ", lineParts)}, Exception {ex}"); continue; } var currentTick = new KaikoTick { TickType = TickType.Quote, Time = Time.UnixMillisecondTimeStampToDateTime(tickEpoch), Quantity = quantity, Value = price, OrderDirection = lineParts[typeColumn] }; if (currentEpoch != tickEpoch) { var quoteTick = CreateQuoteTick(Time.UnixMillisecondTimeStampToDateTime(currentEpoch), currentEpochTicks); if (quoteTick != null) yield return quoteTick; currentEpochTicks.Clear(); currentEpoch = tickEpoch; } currentEpochTicks.Add(currentTick); } } /// /// Take a minute snapshot of order book information and make a single Lean quote tick /// /// The data being processed /// The snapshot of bid/ask Kaiko data /// A single Lean quote tick private Tick CreateQuoteTick(DateTime date, List currentEpcohTicks) { // lowest ask var bestAsk = currentEpcohTicks.Where(x => x.OrderDirection == "a") .OrderBy(x => x.Value) .FirstOrDefault(); // highest bid var bestBid = currentEpcohTicks.Where(x => x.OrderDirection == "b") .OrderByDescending(x => x.Value) .FirstOrDefault(); if (bestAsk == null && bestBid == null) { // Did not have enough data to create a tick return null; } var tick = new Tick() { Symbol = _symbol, Time = date, TickType = TickType.Quote }; if (bestBid != null) { tick.BidPrice = bestBid.Price; tick.BidSize = bestBid.Quantity; } if (bestAsk != null) { tick.AskPrice = bestAsk.Price; tick.AskSize = bestAsk.Quantity; } return tick; } /// /// Parse a kaiko trade file /// /// The path to the unzipped file /// Lean Ticks in the Kaiko file private IEnumerable ParseKaikoTradeFile(IEnumerable rawDataLines) { var headerLine = rawDataLines.First(); var headerCsv = headerLine.ToCsv(); var dateColumn = headerCsv.FindIndex(x => x == "date"); var priceColumn = headerCsv.FindIndex(x => x == "price"); var quantityColumn = headerCsv.FindIndex(x => x == "amount"); foreach (var line in rawDataLines.Skip(1)) { if (line == null || string.IsNullOrEmpty(line)) continue; var lineParts = line.Split(','); decimal quantity; decimal price; try { quantity = ParseScientificNotationToDecimal(lineParts, quantityColumn); price = ParseScientificNotationToDecimal(lineParts, priceColumn); } catch (Exception ex) { Log.Error($"KaikoDataConverter.ParseKaikoTradeFile(): Raw data corrupted. Line {string.Join(" ", lineParts)}, Exception {ex}"); continue; } yield return new Tick { Symbol = _symbol, TickType = TickType.Trade, Time = Time.UnixMillisecondTimeStampToDateTime(Parse.Long(lineParts[dateColumn])), Quantity = quantity, Value = price }; } } /// /// Parse the quantity field of the kaiko ticks - can sometimes be expressed in scientific notation /// /// The line from the Kaiko file /// The index of the quantity column /// The quantity as a decimal private static decimal ParseScientificNotationToDecimal(string[] lineParts, int column) { var value = lineParts[column]; if (value.Contains('e', StringComparison.InvariantCulture)) { return Parse.Decimal(value, NumberStyles.Float); } return lineParts[column].ConvertInvariant(); } /// /// Simple class to add order direction to Tick /// used for aggregating Kaiko order book snapshots /// private class KaikoTick : Tick { public string OrderDirection { get; set; } } } }