/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using Python.Runtime;
using QuantConnect.Data;
using QuantConnect.Data.Fundamental;
using QuantConnect.Data.Market;
using QuantConnect.Util;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;
namespace QuantConnect.Python
{
///
/// Organizes a list of data to create pandas.DataFrames
///
public partial class PandasData
{
// we keep these so we don't need to ask for them each time
private static PyString _empty;
private static PyObject _pandas;
private static PyObject _pandasColumn;
private static PyObject _seriesFactory;
private static PyObject _dataFrameFactory;
private static PyObject _multiIndexFactory;
private static PyObject _multiIndex;
private static PyObject _indexFactory;
private static PyList _defaultNames;
private static PyList _level1Names;
private static PyList _level2Names;
private static PyList _level3Names;
private readonly static Dictionary> _membersCache = new();
private readonly static MemberInfo _tickLastPriceMember = typeof(Tick).GetProperty(nameof(Tick.LastPrice));
private readonly static MemberInfo _openInterestLastPriceMember = typeof(OpenInterest).GetProperty(nameof(Tick.LastPrice));
private static readonly string[] _nonLeanDataTypeForcedMemberNames = new[] { nameof(BaseData.Value) };
private readonly static string[] _quoteTickOnlyPropertes = new[] {
nameof(Tick.AskPrice),
nameof(Tick.AskSize),
nameof(Tick.BidPrice),
nameof(Tick.BidSize)
};
private static readonly Type PandasNonExpandableAttribute = typeof(PandasNonExpandableAttribute);
private static readonly Type PandasIgnoreAttribute = typeof(PandasIgnoreAttribute);
private static readonly Type PandasIgnoreMembersAttribute = typeof(PandasIgnoreMembersAttribute);
private static readonly IReadOnlyCollection EmptySeriesTimesKey = new List();
private static readonly List EmptyDataTypeMembers = new List();
private readonly Symbol _symbol;
private readonly bool _isFundamentalType;
private readonly bool _isBaseData;
private readonly bool _timeAsColumn;
private readonly Dictionary _series;
private readonly Dictionary> _members = new();
///
/// Gets true if this is a custom data request, false for normal QC data
///
public bool IsCustomData { get; }
///
/// Implied levels of a multi index pandas.Series (depends on the security type)
///
public int Levels { get; } = 2;
///
/// Initializes the static members of the class
///
static PandasData()
{
using (Py.GIL())
{
// Use our PandasMapper class that modifies pandas indexing to support tickers, symbols and SIDs
_pandas = Py.Import("PandasMapper");
_pandasColumn = _pandas.GetAttr("PandasColumn");
_seriesFactory = _pandas.GetAttr("Series");
_dataFrameFactory = _pandas.GetAttr("DataFrame");
_multiIndex = _pandas.GetAttr("MultiIndex");
_multiIndexFactory = _multiIndex.GetAttr("from_tuples");
_indexFactory = _pandas.GetAttr("Index");
_empty = new PyString(string.Empty);
var time = new PyString("time");
var symbol = new PyString("symbol");
var expiry = new PyString("expiry");
_defaultNames = new PyList(new PyObject[] { expiry, new PyString("strike"), new PyString("type"), symbol, time });
_level1Names = new PyList(new PyObject[] { symbol });
_level2Names = new PyList(new PyObject[] { symbol, time });
_level3Names = new PyList(new PyObject[] { expiry, symbol, time });
}
}
///
/// Initializes an instance of
///
public PandasData(object data, bool timeAsColumn = false)
{
_series = new();
var baseData = data as IBaseData;
// in the case we get a list/collection of data we take the first data point to determine the type
// but it's also possible to get a data which supports enumerating we don't care about those cases
if (baseData == null && data is IEnumerable enumerable)
{
foreach (var item in enumerable)
{
data = item;
baseData = data as IBaseData;
break;
}
}
var type = data.GetType();
_isFundamentalType = type == typeof(Fundamental);
_isBaseData = baseData != null;
_timeAsColumn = timeAsColumn && _isBaseData;
_symbol = _isBaseData ? baseData.Symbol : ((ISymbolProvider)data).Symbol;
IsCustomData = Extensions.IsCustomDataType(_symbol, type);
if (baseData == null)
{
Levels = 1;
}
else if (_symbol.SecurityType == SecurityType.Future)
{
Levels = 3;
}
else if (_symbol.SecurityType.IsOption())
{
Levels = 5;
}
}
///
/// Adds security data object to the end of the lists
///
/// object that contains security data
public void Add(object data)
{
Add(data, false);
}
private void Add(object data, bool overrideValues)
{
if (data == null)
{
return;
}
var typeMembers = GetInstanceDataTypeMembers(data);
var endTime = default(DateTime);
if (_isBaseData)
{
endTime = ((IBaseData)data).EndTime;
if (_timeAsColumn)
{
AddToSeries("time", endTime, endTime, overrideValues);
}
}
AddMembersData(data, typeMembers, endTime, overrideValues);
if (data is DynamicData dynamicData)
{
var storage = dynamicData.GetStorageDictionary();
var value = dynamicData.Value;
AddToSeries("value", endTime, value, overrideValues);
foreach (var kvp in storage.Where(x => x.Key != "value"
// if this is a PythonData instance we add in '__typename' which we don't want into the data frame
&& !x.Key.StartsWith("__", StringComparison.InvariantCulture)))
{
AddToSeries(kvp.Key, endTime, kvp.Value, overrideValues);
}
}
}
private void AddMemberToSeries(object instance, DateTime endTime, DataTypeMember member, bool overrideValues)
{
var baseName = (string)null;
var tick = member.IsTickProperty ? instance as Tick : null;
if (tick != null && member.IsTickLastPrice && tick.TickType == TickType.OpenInterest)
{
baseName = "OpenInterest";
}
// TODO field/property.GetValue is expensive
var key = member.GetMemberName(baseName);
var value = member.GetValue(instance);
var memberType = member.GetMemberType();
// For DataDictionary instances, we only want to add the values
if (MemberIsDataDictionary(memberType))
{
value = memberType.GetProperty("Values").GetValue(value);
}
else if (member.IsProperty)
{
if (_isFundamentalType && value is FundamentalTimeDependentProperty timeDependentProperty)
{
value = timeDependentProperty.Clone(new FixedTimeProvider(endTime));
}
else if (member.IsTickProperty && tick != null)
{
if (tick.TickType != TickType.Quote && _quoteTickOnlyPropertes.Contains(member.Member.Name))
{
value = null;
}
else if (member.IsTickLastPrice)
{
var nullValueKey = tick.TickType != TickType.OpenInterest
? member.GetMemberName("OpenInterest")
: member.GetMemberName();
AddToSeries(nullValueKey, endTime, null, overrideValues);
}
}
}
AddToSeries(key, endTime, value, overrideValues);
}
///
/// Adds Lean data objects to the end of the lists
///
/// object that contains trade bar information of the security
/// object that contains quote bar information of the security
public void Add(TradeBar tradeBar, QuoteBar quoteBar)
{
// Quote bar first, so if there is a trade bar, OHLC will be overwritten
Add(quoteBar);
Add(tradeBar, overrideValues: true);
}
///
/// Get the pandas.DataFrame of the current state
///
/// Number of levels of the multi index
/// If false, make sure columns with "missing" values only are still added to the dataframe
/// pandas.DataFrame object
public PyObject ToPandasDataFrame(int levels = 2, bool filterMissingValueColumns = true)
{
using var _ = Py.GIL();
PyObject[] indexTemplate;
// Create the index labels
var names = _defaultNames;
if (levels == 1)
{
names = _level1Names;
indexTemplate = GetIndexTemplate(_symbol);
}
else if (levels == 2)
{
// symbol, time
names = _level2Names;
indexTemplate = GetIndexTemplate(_symbol, null);
}
else if (levels == 3)
{
// expiry, symbol, time
names = _level3Names;
indexTemplate = GetIndexTemplate(_symbol.ID.Date, _symbol, null);
}
else
{
if (_symbol.SecurityType == SecurityType.Future)
{
indexTemplate = GetIndexTemplate(_symbol.ID.Date, null, null, _symbol, null);
}
else if (_symbol.SecurityType.IsOption())
{
indexTemplate = GetIndexTemplate(_symbol.ID.Date, _symbol.ID.StrikePrice, _symbol.ID.OptionRight, _symbol, null);
}
else
{
indexTemplate = GetIndexTemplate(null, null, null, _symbol, null);
}
}
names = new PyList(names.SkipLast(names.Count() > 1 && _timeAsColumn ? 1 : 0).ToArray());
// creating the pandas MultiIndex is expensive so we keep a cash
var indexCache = new Dictionary, PyObject>(new ListComparer());
// Returns a dictionary keyed by column name where values are pandas.Series objects
using var pyDict = new PyDict();
foreach (var (seriesName, serie) in _series)
{
if (filterMissingValueColumns && serie.ShouldFilter) continue;
var key = serie.Times ?? EmptySeriesTimesKey;
if (!indexCache.TryGetValue(key, out var index))
{
PyList indexSource;
if (_timeAsColumn)
{
indexSource = serie.Values.Select(_ => CreateIndexSourceValue(DateTime.MinValue, indexTemplate)).ToPyListUnSafe();
}
else
{
indexSource = serie.Times.Select(time => CreateIndexSourceValue(time, indexTemplate)).ToPyListUnSafe();
}
if (indexTemplate.Length == 1)
{
using var nameDic = Py.kw("name", names[0]);
index = _indexFactory.Invoke(new[] { indexSource }, nameDic);
}
else
{
using var namesDic = Py.kw("names", names);
index = _multiIndexFactory.Invoke(new[] { indexSource }, namesDic);
}
indexCache[key] = index;
foreach (var pyObject in indexSource)
{
pyObject.Dispose();
}
indexSource.Dispose();
}
// Adds pandas.Series value keyed by the column name
using var pyvalues = new PyList();
for (var i = 0; i < serie.Values.Count; i++)
{
using var pyObject = serie.Values[i].ToPython();
pyvalues.Append(pyObject);
}
using var series = _seriesFactory.Invoke(pyvalues, index);
using var pyStrKey = seriesName.ToPython();
using var pyKey = _pandasColumn.Invoke(pyStrKey);
pyDict.SetItem(pyKey, series);
}
_series.Clear();
foreach (var kvp in indexCache)
{
kvp.Value.Dispose();
}
for (var i = 0; i < indexTemplate.Length; i++)
{
DisposeIfNotEmpty(indexTemplate[i]);
}
names.Dispose();
// Create the DataFrame
var result = _dataFrameFactory.Invoke(pyDict);
foreach (var item in pyDict)
{
item.Dispose();
}
return result;
}
///
/// Helper method to create a single pandas data frame indexed by symbol
///
/// Will add a single point per pandas data series (symbol)
public static PyObject ToPandasDataFrame(IEnumerable pandasDatas, bool skipTimesColumn = false)
{
using var _ = Py.GIL();
using var list = pandasDatas.Select(x => x._symbol).ToPyListUnSafe();
using var namesDic = Py.kw("name", _level1Names[0]);
using var index = _indexFactory.Invoke(new[] { list }, namesDic);
var valuesPerSeries = new Dictionary();
var seriesToSkip = new Dictionary();
foreach (var pandasData in pandasDatas)
{
foreach (var kvp in pandasData._series)
{
if (skipTimesColumn && kvp.Key == "time")
{
continue;
}
if (seriesToSkip.ContainsKey(kvp.Key))
{
seriesToSkip[kvp.Key] &= kvp.Value.ShouldFilter;
}
else
{
seriesToSkip[kvp.Key] = kvp.Value.ShouldFilter;
}
if (!valuesPerSeries.TryGetValue(kvp.Key, out PyList value))
{
// Adds pandas.Series value keyed by the column name
value = valuesPerSeries[kvp.Key] = new PyList();
}
if (kvp.Value.Values.Count > 0)
{
// taking only 1 value per symbol
using var valueOfSymbol = kvp.Value.Values[0].ToPython();
value.Append(valueOfSymbol);
}
else
{
value.Append(PyObject.None);
}
}
}
using var pyDict = new PyDict();
foreach (var kvp in valuesPerSeries)
{
if (seriesToSkip.TryGetValue(kvp.Key, out var skip) && skip)
{
continue;
}
using var series = _seriesFactory.Invoke(kvp.Value, index);
using var pyStrKey = kvp.Key.ToPython();
using var pyKey = _pandasColumn.Invoke(pyStrKey);
pyDict.SetItem(pyKey, series);
kvp.Value.Dispose();
}
var result = _dataFrameFactory.Invoke(pyDict);
// Drop columns with only NaN or None values
using var dropnaKwargs = Py.kw("axis", 1, "inplace", true, "how", "all");
result.GetAttr("dropna").Invoke(Array.Empty(), dropnaKwargs);
return result;
}
private List GetInstanceDataTypeMembers(object data)
{
var type = data.GetType();
if (!_members.TryGetValue(type, out var members))
{
HashSet columnNames;
if (data is DynamicData dynamicData)
{
columnNames = (data as DynamicData)?.GetStorageDictionary()
// if this is a PythonData instance we add in '__typename' which we don't want into the data frame
.Where(x => !x.Key.StartsWith("__", StringComparison.InvariantCulture)).ToHashSet(x => x.Key);
columnNames.Add("value");
members = EmptyDataTypeMembers;
}
else
{
members = GetTypeMembers(type);
columnNames = members.SelectMany(x => x.GetMemberNames()).ToHashSet();
// We add openinterest key so the series is created: open interest tick LastPrice is renamed to OpenInterest
if (data is Tick)
{
columnNames.Add("openinterest");
}
}
_members[type] = members;
if (_timeAsColumn)
{
columnNames.Add("time");
}
foreach (var columnName in columnNames)
{
_series.TryAdd(columnName, new Serie(withTimeIndex: !_timeAsColumn));
}
}
return members;
}
///
/// Gets or create/adds the instances corresponding to the members of the given type,
/// and returns the names of the members.
///
private List GetTypeMembers(Type type)
{
List typeMembers;
lock (_membersCache)
{
if (!_membersCache.TryGetValue(type, out typeMembers))
{
var forcedInclusionMembers = LeanData.IsCommonLeanDataType(type)
? Array.Empty()
: _nonLeanDataTypeForcedMemberNames;
typeMembers = GetDataTypeMembers(type, forcedInclusionMembers).ToList();
_membersCache[type] = typeMembers;
}
}
_members[type] = typeMembers;
return typeMembers;
}
///
/// Gets the instances corresponding to the members of the given type.
/// It will try to unwrap properties which types are classes unless they are marked either to be ignored or to be added as a whole
///
private static IEnumerable GetDataTypeMembers(Type type, string[] forcedInclusionMembers)
{
var members = type
.GetMembers(BindingFlags.Instance | BindingFlags.Public)
.Where(x => x.MemberType == MemberTypes.Field || x.MemberType == MemberTypes.Property)
.Where(x => forcedInclusionMembers.Contains(x.Name)
|| (!x.IsDefined(PandasIgnoreAttribute) && !x.DeclaringType.IsDefined(PandasIgnoreMembersAttribute)));
return members
.Select(member =>
{
var dataTypeMember = CreateDataTypeMember(member);
var memberType = dataTypeMember.GetMemberType();
// Should we unpack its properties into columns?
if (memberType.IsClass
&& (memberType.Namespace == null
// We only expand members of types in the QuantConnect namespace,
// else we might be expanding types like System.String, NodaTime.DateTimeZone or any other external types
|| (memberType.Namespace.StartsWith("QuantConnect.", StringComparison.InvariantCulture)
&& !memberType.IsDefined(PandasNonExpandableAttribute)
&& !member.IsDefined(PandasNonExpandableAttribute))))
{
dataTypeMember = CreateDataTypeMember(member, GetDataTypeMembers(memberType, forcedInclusionMembers).ToArray());
}
return (memberType, dataTypeMember);
})
// Check if there are multiple properties/fields of the same type,
// in which case we add the property/field name as prefix for the inner members to avoid name conflicts
.GroupBy(x => x.memberType, x => x.dataTypeMember)
.SelectMany(grouping =>
{
var typeProperties = grouping.ToList();
if (typeProperties.Count > 1)
{
var propertiesToExpand = typeProperties.Where(x => x.ShouldBeUnwrapped).ToList();
if (propertiesToExpand.Count > 1)
{
foreach (var property in propertiesToExpand)
{
property.SetPrefix();
}
}
}
return typeProperties;
});
}
///
/// Adds the member value to the corresponding series, making sure unwrapped values a properly added
/// by checking the children members and adding their values to their own series
///
private void AddMembersData(object instance, IEnumerable members, DateTime endTime, bool overrideValues)
{
foreach (var member in members)
{
if (!member.ShouldBeUnwrapped)
{
AddMemberToSeries(instance, endTime, member, overrideValues);
}
else
{
var memberValue = member.GetValue(instance);
if (memberValue != null)
{
AddMembersData(memberValue, member.Children, endTime, overrideValues);
}
}
}
}
///
/// Only dipose of the PyObject if it was set to something different than empty
///
private static void DisposeIfNotEmpty(PyObject pyObject)
{
if (!ReferenceEquals(pyObject, _empty))
{
pyObject.Dispose();
}
}
private static bool MemberIsDataDictionary(Type memberType)
{
while (memberType != null && !memberType.IsValueType)
{
if (memberType.IsGenericType && memberType.GetGenericTypeDefinition() == typeof(DataDictionary<>))
{
return true;
}
memberType = memberType.BaseType;
}
return false;
}
private PyObject[] GetIndexTemplate(params object[] args)
{
return args.SkipLast(args.Length > 1 && _timeAsColumn ? 1 : 0).Select(x => x?.ToPython() ?? _empty).ToArray();
}
///
/// Create a new tuple index
///
private PyObject CreateIndexSourceValue(DateTime index, PyObject[] list)
{
if (!_timeAsColumn && list.Length > 1)
{
DisposeIfNotEmpty(list[^1]);
list[^1] = index.ToPython();
}
if (list.Length > 1)
{
return new PyTuple(list.ToArray());
}
return list[0].ToPython();
}
///
/// Adds data to dictionary
///
/// The key of the value to get
/// object to add to the value associated with the specific key
/// to add to the value associated with the specific key. Can be null.
private void AddToSeries(string key, DateTime time, object input, bool overrideValues)
{
if (!_series.TryGetValue(key, out var serie))
{
throw new ArgumentException($"PandasData.AddToSeries(): {Messages.PandasData.KeyNotFoundInSeries(key)}");
}
serie.Add(time, input, overrideValues);
}
private class Serie
{
private static readonly IFormatProvider InvariantCulture = CultureInfo.InvariantCulture;
public bool ShouldFilter { get; private set; }
public List Times { get; }
public List