/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using System.Runtime.CompilerServices; using QuantConnect.Data.Market; using Python.Runtime; using QuantConnect.Interfaces; namespace QuantConnect.Data.Consolidators { /// /// Provides a base class for consolidators that emit data based on the passing of a period of time /// or after seeing a max count of data points. /// /// The input type of the consolidator /// The output type of the consolidator public abstract class PeriodCountConsolidatorBase : DataConsolidator where T : IBaseData where TConsolidated : BaseData { // The SecurityIdentifier that we are consolidating for. private SecurityIdentifier _securityIdentifier; private bool _securityIdentifierIsSet; //The number of data updates between creating new bars. private int? _maxCount; // private IPeriodSpecification _periodSpecification; //The minimum timespan between creating new bars. private TimeSpan? _period; //The number of pieces of data we've accumulated since our last emit private int _currentCount; //The working bar used for aggregating the data private TConsolidated _workingBar; //The last time we emitted a consolidated bar private DateTime? _lastEmit; private bool _validateTimeSpan; private PeriodCountConsolidatorBase(IPeriodSpecification periodSpecification) { _periodSpecification = periodSpecification; _period = _periodSpecification.Period; } /// /// Creates a consolidator to produce a new instance representing the period /// /// The minimum span of time before emitting a consolidated bar protected PeriodCountConsolidatorBase(TimeSpan period) : this(new TimeSpanPeriodSpecification(period)) { _period = _periodSpecification.Period; } /// /// Creates a consolidator to produce a new instance representing the last count pieces of data /// /// The number of pieces to accept before emiting a consolidated bar protected PeriodCountConsolidatorBase(int maxCount) : this(new BarCountPeriodSpecification()) { _maxCount = maxCount; } /// /// Creates a consolidator to produce a new instance representing the last count pieces of data or the period, whichever comes first /// /// The number of pieces to accept before emiting a consolidated bar /// The minimum span of time before emitting a consolidated bar protected PeriodCountConsolidatorBase(int maxCount, TimeSpan period) : this(new MixedModePeriodSpecification(period)) { _maxCount = maxCount; _period = _periodSpecification.Period; } /// /// Creates a consolidator to produce a new instance representing the last count pieces of data or the period, whichever comes first /// /// Func that defines the start time of a consolidated data protected PeriodCountConsolidatorBase(Func func) : this(new FuncPeriodSpecification(func)) { _period = Time.OneSecond; } /// /// Creates a consolidator to produce a new instance representing the last count pieces of data or the period, whichever comes first /// /// Python object that defines either a function object that defines the start time of a consolidated data or a timespan protected PeriodCountConsolidatorBase(PyObject pyObject) : this(GetPeriodSpecificationFromPyObject(pyObject)) { } /// /// Gets the type produced by this consolidator /// public override Type OutputType => typeof(TConsolidated); /// /// Gets a clone of the data being currently consolidated /// public override IBaseData WorkingData => _workingBar?.Clone(); /// /// Event handler that fires when a new piece of data is produced. We define this as a 'new' /// event so we can expose it as a instead of a instance /// public new event EventHandler DataConsolidated; /// /// Updates this consolidator with the specified data. This method is /// responsible for raising the DataConsolidated event /// In time span mode, the bar range is closed on the left and open on the right: [T, T+TimeSpan). /// For example, if time span is 1 minute, we have [10:00, 10:01): so data at 10:01 is not /// included in the bar starting at 10:00. /// /// Thrown when multiple symbols are being consolidated. /// The new data for the consolidator public override void Update(T data) { if (!_securityIdentifierIsSet) { _securityIdentifierIsSet = true; _securityIdentifier = data.Symbol.ID; } else if (!data.Symbol.ID.Equals(_securityIdentifier)) { throw new InvalidOperationException($"Consolidators can only be used with a single symbol. The previous consolidated SecurityIdentifier ({_securityIdentifier}) is not the same as in the current data ({data.Symbol.ID})."); } if (!ShouldProcess(data)) { // first allow the base class a chance to filter out data it doesn't want // before we start incrementing counts and what not return; } if (!_validateTimeSpan && _period.HasValue && _periodSpecification is TimeSpanPeriodSpecification) { // only do this check once _validateTimeSpan = true; var dataLength = data.EndTime - data.Time; if (dataLength > _period) { throw new ArgumentException($"For Symbol {data.Symbol} can not consolidate bars of period: {_period}, using data of the same or higher period: {data.EndTime - data.Time}"); } } //Decide to fire the event var fireDataConsolidated = false; // decide to aggregate data before or after firing OnDataConsolidated event // always aggregate before firing in counting mode bool aggregateBeforeFire = _maxCount.HasValue; if (_maxCount.HasValue) { // we're in count mode _currentCount++; if (_currentCount >= _maxCount.Value) { _currentCount = 0; fireDataConsolidated = true; } } if (!_lastEmit.HasValue) { // initialize this value for period computations _lastEmit = IsTimeBased ? DateTime.MinValue : data.Time; } if (_period.HasValue) { // we're in time span mode and initialized if (_workingBar != null && data.Time - _workingBar.Time >= _period.Value && GetRoundedBarTime(data) > _lastEmit) { fireDataConsolidated = true; } // special case: always aggregate before event trigger when TimeSpan is zero if (_period.Value == TimeSpan.Zero) { fireDataConsolidated = true; aggregateBeforeFire = true; } } if (aggregateBeforeFire) { if (data.Time >= _lastEmit) { AggregateBar(ref _workingBar, data); } } //Fire the event if (fireDataConsolidated) { var workingTradeBar = _workingBar as TradeBar; if (workingTradeBar != null) { // we kind of are cheating here... if (_period.HasValue) { workingTradeBar.Period = _period.Value; } // since trade bar has period it aggregates this properly else if (!(data is TradeBar)) { workingTradeBar.Period = data.Time - _lastEmit.Value; } } // Set _lastEmit first because OnDataConsolidated will set _workingBar to null _lastEmit = IsTimeBased && _workingBar != null ? _workingBar.EndTime : data.Time; OnDataConsolidated(_workingBar); } if (!aggregateBeforeFire) { if (data.Time >= _lastEmit) { AggregateBar(ref _workingBar, data); } } } /// /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) public override void Scan(DateTime currentLocalTime) { if (_workingBar != null && _period.HasValue && _period.Value != TimeSpan.Zero && currentLocalTime - _workingBar.Time >= _period.Value && GetRoundedBarTime(currentLocalTime) > _lastEmit) { _lastEmit = _workingBar.EndTime; OnDataConsolidated(_workingBar); } } /// /// Resets the consolidator /// public override void Reset() { base.Reset(); _securityIdentifier = null; _securityIdentifierIsSet = false; _currentCount = 0; _workingBar = null; _lastEmit = null; _validateTimeSpan = false; } /// /// Returns true if this consolidator is time-based, false otherwise /// protected bool IsTimeBased => !_maxCount.HasValue; /// /// Gets the time period for this consolidator /// protected TimeSpan? Period => _period; /// /// Determines whether or not the specified data should be processed /// /// The data to check /// True if the consolidator should process this data, false otherwise protected virtual bool ShouldProcess(T data) => true; /// /// Aggregates the new 'data' into the 'workingBar'. The 'workingBar' will be /// null following the event firing /// /// The bar we're building, null if the event was just fired and we're starting a new consolidated bar /// The new data protected abstract void AggregateBar(ref TConsolidated workingBar, T data); /// /// Gets a rounded-down bar time. Called by AggregateBar in derived classes. /// /// The bar time to be rounded down /// The rounded bar time [MethodImpl(MethodImplOptions.AggressiveInlining)] protected DateTime GetRoundedBarTime(DateTime time) { var startTime = _periodSpecification.GetRoundedBarTime(time); // In the case of a new bar, define the period defined at opening time if (_workingBar == null) { _period = _periodSpecification.Period; } return startTime; } /// /// Gets a rounded-down bar start time. Called by AggregateBar in derived classes. /// /// The input data point /// The rounded bar start time [MethodImpl(MethodImplOptions.AggressiveInlining)] protected DateTime GetRoundedBarTime(IBaseData inputData) { var potentialStartTime = GetRoundedBarTime(inputData.Time); if (_period.HasValue && potentialStartTime + _period < inputData.EndTime) { // US equity hour bars from the database starts at 9am but the exchange opens at 9:30am. Thus, the method // GetRoundedBarTime(inputData.Time) returns the market open of the previous day, which is not consistent // with the given end time. For that reason we need to handle this case specifically, by calling // GetRoundedBarTime(inputData.EndTime) as it will return our expected start time: 9:30am if (inputData.EndTime - inputData.Time == Time.OneHour && potentialStartTime.Date < inputData.Time.Date) { potentialStartTime = GetRoundedBarTime(inputData.EndTime); } else { // whops! the end time we were giving is beyond our potential end time, so let's use the giving bars star time instead potentialStartTime = inputData.Time; } } return potentialStartTime; } /// /// Event invocator for the event /// /// The consolidated data protected virtual void OnDataConsolidated(TConsolidated e) { base.OnDataConsolidated(e); DataConsolidated?.Invoke(this, e); _workingBar = null; } /// /// Gets the period specification from the PyObject that can either represent a function object that defines the start time of a consolidated data or a timespan. /// /// Python object that defines either a function object that defines the start time of a consolidated data or a timespan /// IPeriodSpecification that represents the PyObject private static IPeriodSpecification GetPeriodSpecificationFromPyObject(PyObject pyObject) { Func expiryFunc; if (pyObject.TryConvertToDelegate(out expiryFunc)) { return new FuncPeriodSpecification(expiryFunc); } using (Py.GIL()) { return new TimeSpanPeriodSpecification(pyObject.As()); } } /// /// Distinguishes between the different ways a consolidated data start time can be specified /// private interface IPeriodSpecification { TimeSpan? Period { get; } DateTime GetRoundedBarTime(DateTime time); } /// /// User defined the bars period using a counter /// private class BarCountPeriodSpecification : IPeriodSpecification { public TimeSpan? Period { get; } = null; public DateTime GetRoundedBarTime(DateTime time) => time; } /// /// User defined the bars period using a counter and a period (mixed mode) /// private class MixedModePeriodSpecification : IPeriodSpecification { public TimeSpan? Period { get; } public MixedModePeriodSpecification(TimeSpan period) { Period = period; } public DateTime GetRoundedBarTime(DateTime time) => time; } /// /// User defined the bars period using a time span /// private class TimeSpanPeriodSpecification : IPeriodSpecification { public TimeSpan? Period { get; } public TimeSpanPeriodSpecification(TimeSpan period) { Period = period; } public DateTime GetRoundedBarTime(DateTime time) => Period.Value > Time.OneDay ? time // #4915 For periods larger than a day, don't use a rounding schedule. : time.RoundDown(Period.Value); } /// /// Special case for bars where the open time is defined by a function. /// We assert on construction that the function returns a date time in the past or equal to the given time instant. /// private class FuncPeriodSpecification : IPeriodSpecification { private static readonly DateTime _verificationDate = new DateTime(2022, 01, 03, 10, 10, 10); public TimeSpan? Period { get; private set; } public readonly Func _calendarInfoFunc; public FuncPeriodSpecification(Func expiryFunc) { if (expiryFunc(_verificationDate).Start > _verificationDate) { throw new ArgumentException($"{nameof(FuncPeriodSpecification)}: Please use a function that computes the start of the bar associated with the given date time. Should never return a time later than the one passed in."); } _calendarInfoFunc = expiryFunc; } public DateTime GetRoundedBarTime(DateTime time) { var calendarInfo = _calendarInfoFunc(time); Period = calendarInfo.Period; return calendarInfo.Start; } } } }