/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using QuantConnect.Interfaces;
using QuantConnect.Data.Consolidators;
namespace QuantConnect.Data
{
///
/// Helper class to wrap a consolidator and keep track of the next scan time we should trigger
///
internal class ConsolidatorWrapper : IDisposable
{
// helps us guarantee a deterministic ordering by addition/creation
private static long _counter;
private readonly IDataConsolidator _consolidator;
private readonly LocalTimeKeeper _localTimeKeeper;
private readonly TimeSpan _minimumIncrement;
private readonly ITimeKeeper _timeKeeper;
private readonly long _id;
private TimeSpan? _barSpan;
///
/// True if this consolidator has been removed
///
public bool Disposed { get; private set; }
///
/// The next utc scan time
///
public DateTime UtcScanTime { get; private set; }
///
/// Get enqueue time
///
public ConsolidatorScanPriority Priority => new (UtcScanTime, _id);
///
/// Creates a new instance
///
public ConsolidatorWrapper(IDataConsolidator consolidator, TimeSpan configIncrement, ITimeKeeper timeKeeper, LocalTimeKeeper localTimeKeeper)
{
_id = Interlocked.Increment(ref _counter);
_timeKeeper = timeKeeper;
_consolidator = consolidator;
_localTimeKeeper = localTimeKeeper;
_minimumIncrement = configIncrement < Time.OneSecond ? Time.OneSecond : configIncrement;
_consolidator.DataConsolidated += AdvanceScanTime;
AdvanceScanTime();
}
///
/// Scans the current consolidator
///
public void Scan()
{
_consolidator.Scan(_localTimeKeeper.LocalTime);
// it might not of emitted at all, could happen if we got no data or it's not expected to emit like in a weekend
// but we still need to advance the next scan time
AdvanceScanTime();
}
public void Dispose()
{
Disposed = true;
_consolidator.DataConsolidated -= AdvanceScanTime;
}
///
/// Helper method to set the next scan time
///
private void AdvanceScanTime(object _ = null, IBaseData consolidated = null)
{
if (consolidated == null && UtcScanTime > _timeKeeper.UtcTime)
{
// already set
return;
}
if (_barSpan.HasValue)
{
var reference = _timeKeeper.UtcTime;
if (consolidated != null)
{
reference = consolidated.EndTime.ConvertToUtc(_localTimeKeeper.TimeZone);
}
UtcScanTime = reference + _barSpan.Value;
}
else
{
if (consolidated != null)
{
_barSpan = consolidated.EndTime - consolidated.Time;
if (_barSpan < _minimumIncrement)
{
_barSpan = _minimumIncrement;
}
UtcScanTime = consolidated.EndTime.ConvertToUtc(_localTimeKeeper.TimeZone) + _barSpan.Value;
}
else if (_consolidator.WorkingData == null)
{
// we have no reference
UtcScanTime = _timeKeeper.UtcTime + _minimumIncrement;
}
else
{
var pontetialEndTime = _consolidator.WorkingData.EndTime.ConvertToUtc(_localTimeKeeper.TimeZone);
if (pontetialEndTime > _timeKeeper.UtcTime)
{
UtcScanTime = pontetialEndTime;
}
else
{
UtcScanTime = _timeKeeper.UtcTime + _minimumIncrement;
}
}
}
}
}
internal class ConsolidatorScanPriority
{
private sealed class UtcScanTimeIdRelationalComparer : IComparer
{
public int Compare(ConsolidatorScanPriority? x, ConsolidatorScanPriority? y)
{
if (ReferenceEquals(x, y)) return 0;
if (y is null) return 1;
if (x is null) return -1;
var utcScanTimeComparison = x.UtcScanTime.CompareTo(y.UtcScanTime);
if (utcScanTimeComparison != 0) return utcScanTimeComparison;
return x.Id.CompareTo(y.Id);
}
}
public static IComparer Comparer { get; } =
new UtcScanTimeIdRelationalComparer();
///
/// The next utc scan time
///
public DateTime UtcScanTime { get; }
///
/// Unique Id of the associated consolidator
///
public long Id { get; }
public ConsolidatorScanPriority(DateTime utcScanTime, long id)
{
Id = id;
UtcScanTime = utcScanTime;
}
}
}