/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ using System; using NodaTime; using QuantConnect.Data; using System.Collections; using System.Collections.Generic; namespace QuantConnect.Lean.Engine.DataFeeds.Enumerators { /// /// This enumerator will filter out data of the underlying enumerator based on a provided schedule. /// Will respect the schedule above the data, meaning will let older data through if the underlying provides none for the schedule date /// public class ScheduledEnumerator : IEnumerator { private readonly IEnumerator _underlyingEnumerator; private readonly IEnumerator _scheduledTimes; private readonly ITimeProvider _frontierTimeProvider; private readonly DateTimeZone _scheduleTimeZone; private BaseData _underlyingCandidateDataPoint; private bool _scheduledTimesEnded; /// /// The current data point /// public BaseData Current { get; private set; } object IEnumerator.Current => Current; /// /// Creates a new instance /// /// The underlying enumerator to filter /// The scheduled times to emit new data points /// /// /// the underlying request start time public ScheduledEnumerator(IEnumerator underlyingEnumerator, IEnumerable scheduledTimes, ITimeProvider frontierTimeProvider, DateTimeZone scheduleTimeZone, DateTime startTime) { _scheduleTimeZone = scheduleTimeZone; _frontierTimeProvider = frontierTimeProvider; _underlyingEnumerator = underlyingEnumerator; _scheduledTimes = scheduledTimes.GetEnumerator(); // move our schedule enumerator to current start time MoveScheduleForward(startTime); } /// /// Advances the enumerator to the next element of the collection. /// /// True if the enumerator was successfully advanced to the next element; /// false if the enumerator has passed the end of the collection. /// public bool MoveNext() { if (_scheduledTimesEnded) { Current = null; return false; } // lets get our candidate data point to emit if (_underlyingCandidateDataPoint == null) { if (_underlyingEnumerator.Current != null && _underlyingEnumerator.Current.EndTime <= _scheduledTimes.Current) { _underlyingCandidateDataPoint = _underlyingEnumerator.Current; } else if (Current != null) { // we will keep the last data point, even if we already emitted it, there could be a case where the user has a schedule in a // period where there's not new data (or it's far in the future) so let's just FF the previous point _underlyingCandidateDataPoint = Current.Clone(fillForward: true); } } // lets try to get a better candidate if (_underlyingEnumerator.Current == null || _underlyingEnumerator.Current.EndTime < _scheduledTimes.Current) { bool pullAgain; do { pullAgain = false; if (!_underlyingEnumerator.MoveNext()) { if (_underlyingCandidateDataPoint != null) { // if we still have a candidate wait till we emit him before stopping break; } Current = null; return false; } if (_underlyingEnumerator.Current != null) { if (_underlyingEnumerator.Current.EndTime <= _scheduledTimes.Current) { // lets try again pullAgain = true; // we got another data point which is a newer candidate to emit so let use it instead // and drop the previous _underlyingCandidateDataPoint = _underlyingEnumerator.Current; } else if (_underlyingCandidateDataPoint == null) { // this is the first data point we got and it's After our schedule, let's move our schedule forward _underlyingCandidateDataPoint = _underlyingEnumerator.Current; MoveScheduleForward(); } } } while (pullAgain); } if (_underlyingCandidateDataPoint != null // if we are at or past the schedule time we try to emit, in backtest this emits right away, since time is data driven, in live though // we don't emit right away because the underlying might provide us with a newer data point && _scheduledTimes.Current.ConvertToUtc(_scheduleTimeZone) <= GetUtcNow()) { Current = _underlyingCandidateDataPoint; // we align the data endtime with the schedule, we respect the schedule above the data time. In backtesting, // time is driven by the data, so let's make sure we emit at the scheduled time even if the data is older Current.EndTime = _scheduledTimes.Current; if (Current.Time > Current.EndTime) { Current.Time = _scheduledTimes.Current; } MoveScheduleForward(); _underlyingCandidateDataPoint = null; return true; } Current = null; return true; } /// /// Resets the underlying enumerator /// public void Reset() { _underlyingEnumerator.Reset(); } /// /// Disposes of the underlying enumerator /// public void Dispose() { _scheduledTimes.Dispose(); _underlyingEnumerator.Dispose(); } /// /// Available in live trading only, in backtesting frontier is driven and sycned already by the data itself /// so we can't hold data here based on it /// private DateTime GetUtcNow() { if (_frontierTimeProvider != null) { return _frontierTimeProvider.GetUtcNow(); } return DateTime.MaxValue; } private void MoveScheduleForward(DateTime? frontier = null) { do { _scheduledTimesEnded = !_scheduledTimes.MoveNext(); } while (!_scheduledTimesEnded && frontier.HasValue && _scheduledTimes.Current < frontier.Value); } } }