/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2024 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using NodaTime; using QuantConnect.Util; using QuantConnect.Securities; using QuantConnect.Data.Market; using QuantConnect.Data.Consolidators; namespace QuantConnect.Data.Common { /// /// Consolidator for open markets bar only, extended hours bar are not consolidated. /// public class MarketHourAwareConsolidator : IDataConsolidator { private readonly bool _dailyStrictEndTimeEnabled; private readonly bool _extendedMarketHours; private bool _useStrictEndTime; /// /// The consolidation period requested /// protected TimeSpan Period { get; } /// /// The consolidator instance /// protected IDataConsolidator Consolidator { get; } /// /// The associated security exchange hours instance /// protected SecurityExchangeHours ExchangeHours { get; set; } /// /// The associated data time zone /// protected DateTimeZone DataTimeZone { get; set; } /// /// Gets the most recently consolidated piece of data. This will be null if this consolidator /// has not produced any data yet. /// public IBaseData Consolidated => Consolidator.Consolidated; /// /// Gets the type consumed by this consolidator /// public Type InputType => Consolidator.InputType; /// /// Gets a clone of the data being currently consolidated /// public IBaseData WorkingData => Consolidator.WorkingData; /// /// Gets the type produced by this consolidator /// public Type OutputType => Consolidator.OutputType; /// /// Initializes a new instance of the class. /// /// The resolution. /// The target data type /// The target tick type /// True if extended market hours should be consolidated public MarketHourAwareConsolidator(bool dailyStrictEndTimeEnabled, Resolution resolution, Type dataType, TickType tickType, bool extendedMarketHours) { _dailyStrictEndTimeEnabled = dailyStrictEndTimeEnabled; Period = resolution.ToTimeSpan(); _extendedMarketHours = extendedMarketHours; if (dataType == typeof(Tick)) { if (tickType == TickType.Trade) { Consolidator = resolution == Resolution.Daily ? new TickConsolidator(DailyStrictEndTime) : new TickConsolidator(Period); } else { Consolidator = resolution == Resolution.Daily ? new TickQuoteBarConsolidator(DailyStrictEndTime) : new TickQuoteBarConsolidator(Period); } } else if (dataType == typeof(TradeBar)) { Consolidator = resolution == Resolution.Daily ? new TradeBarConsolidator(DailyStrictEndTime) : new TradeBarConsolidator(Period); } else if (dataType == typeof(QuoteBar)) { Consolidator = resolution == Resolution.Daily ? new QuoteBarConsolidator(DailyStrictEndTime) : new QuoteBarConsolidator(Period); } else { throw new ArgumentNullException(nameof(dataType), $"{dataType.Name} not supported"); } Consolidator.DataConsolidated += ForwardConsolidatedBar; } /// /// Event handler that fires when a new piece of data is produced /// public event DataConsolidatedHandler DataConsolidated; /// /// Updates this consolidator with the specified data /// /// The new data for the consolidator public virtual void Update(IBaseData data) { Initialize(data); // US equity hour data from the database starts at 9am but the exchange opens at 9:30am. Thus, we need to handle // this case specifically to avoid skipping the first hourly bar. To avoid this, we assert the period is daily, // the data resolution is hour and the exchange opens at any point in time over the data.Time to data.EndTime interval if (_extendedMarketHours || ExchangeHours.IsOpen(data.Time, false) || (Period == Time.OneDay && (data.EndTime - data.Time == Time.OneHour) && ExchangeHours.IsOpen(data.Time, data.EndTime, false))) { Consolidator.Update(data); } } /// /// Scans this consolidator to see if it should emit a bar due to time passing /// /// The current time in the local time zone (same as ) public void Scan(DateTime currentLocalTime) { Consolidator.Scan(currentLocalTime); } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public void Dispose() { Consolidator.DataConsolidated -= ForwardConsolidatedBar; Consolidator.Dispose(); } /// /// Resets the consolidator /// public void Reset() { _useStrictEndTime = false; ExchangeHours = null; DataTimeZone = null; Consolidator.Reset(); } /// /// Perform late initialization based on the datas symbol /// protected void Initialize(IBaseData data) { if (ExchangeHours == null) { var symbol = data.Symbol; var marketHoursDatabase = MarketHoursDatabase.FromDataFolder(); ExchangeHours = marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType); DataTimeZone = marketHoursDatabase.GetDataTimeZone(symbol.ID.Market, symbol, symbol.SecurityType); _useStrictEndTime = UseStrictEndTime(data.Symbol); } } /// /// Determines a bar start time and period /// protected virtual CalendarInfo DailyStrictEndTime(DateTime dateTime) { if (!_useStrictEndTime) { return new (Period > Time.OneDay ? dateTime : dateTime.RoundDown(Period), Period); } return LeanData.GetDailyCalendar(dateTime, ExchangeHours, _extendedMarketHours); } /// /// Useful for testing /// protected virtual bool UseStrictEndTime(Symbol symbol) { return LeanData.UseStrictEndTime(_dailyStrictEndTimeEnabled, symbol, Period, ExchangeHours); } /// /// Will forward the underlying consolidated bar to consumers on this object /// protected virtual void ForwardConsolidatedBar(object sender, IBaseData consolidated) { DataConsolidated?.Invoke(this, consolidated); } } }