/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; namespace QuantConnect.Util.RateLimit { /// /// Provides an implementation of that implements the leaky bucket algorithm /// See: https://en.wikipedia.org/wiki/Leaky_bucket /// public class LeakyBucket : ITokenBucket { private readonly object _sync = new object(); private long _available; private readonly ISleepStrategy _sleep; private readonly IRefillStrategy _refill; private readonly ITimeProvider _timeProvider; /// /// Gets the maximum capacity of tokens this bucket can hold. /// public long Capacity { get; } /// /// Gets the total number of currently available tokens for consumption /// public long AvailableTokens { // synchronized read w/ the modification of available tokens in TryConsume get { lock (_sync) return _available; } } /// /// Initializes a new instance of the class. /// This constructor initializes the bucket using the with a 1 millisecond /// sleep to prevent being CPU intensive and uses the to refill bucket /// tokens according to the and parameters. /// /// The maximum number of tokens this bucket can hold /// The number of tokens to add to the bucket each /// The interval which after passing more tokens are added to the bucket public LeakyBucket(long capacity, long refillAmount, TimeSpan refillInterval) : this(capacity, ThreadSleepStrategy.Sleeping(1), new FixedIntervalRefillStrategy(RealTimeProvider.Instance, refillAmount, refillInterval) ) { } /// /// Initializes a new instance of the class /// /// The maximum number of tokens this bucket can hold /// Defines the used when is invoked /// but the bucket does not have enough tokens yet /// Defines the that computes how many tokens to add /// back to the bucket each time consumption is attempted /// Defines the used to enforce timeouts when /// invoking public LeakyBucket(long capacity, ISleepStrategy sleep, IRefillStrategy refill, ITimeProvider timeProvider = null) { _sleep = sleep; _refill = refill; Capacity = capacity; _available = capacity; _timeProvider = timeProvider ?? RealTimeProvider.Instance; } /// /// Blocks until the specified number of tokens are available for consumption /// and then consumes that number of tokens. /// /// The number of tokens to consume /// The maximum amount of time, in milliseconds, to block. An exception is /// throw in the event it takes longer than the stated timeout to consume the requested number /// of tokens public void Consume(long tokens, long timeout = Timeout.Infinite) { if (timeout < Timeout.Infinite) { throw new ArgumentOutOfRangeException(nameof(timeout), "Invalid timeout. Use -1 for no timeout, 0 for immediate timeout and a positive number " + "of milliseconds to indicate a timeout. All other values are out of range." ); } var startTime = _timeProvider.GetUtcNow(); while (true) { if (TryConsume(tokens)) { break; } if (timeout != Timeout.Infinite) { // determine if the requested timeout has elapsed var currentTime = _timeProvider.GetUtcNow(); var elapsedMilliseconds = (currentTime - startTime).TotalMilliseconds; if (elapsedMilliseconds > timeout) { throw new TimeoutException("The operation timed out while waiting for the rate limit to be lifted."); } } _sleep.Sleep(); } } /// /// Attempts to consume the specified number of tokens from the bucket. If the /// requested number of tokens are not immediately available, then this method /// will return false to indicate that zero tokens have been consumed. /// public bool TryConsume(long tokens) { if (tokens <= 0) { throw new ArgumentOutOfRangeException(nameof(tokens), "Number of tokens to consume must be positive" ); } if (tokens > Capacity) { throw new ArgumentOutOfRangeException(nameof(tokens), "Number of tokens to consume must be less than or equal to the capacity" ); } lock (_sync) { // determine how many units have become available since last invocation var refilled = Math.Max(0, _refill.Refill()); // the number of tokens to add, the max of which is the difference between capacity and currently available var deltaTokens = Math.Min(Capacity - _available, refilled); // update the available number of units with the new tokens _available += deltaTokens; if (tokens > _available) { // we don't have enough tokens yet Logging.Log.Trace($"LeakyBucket.TryConsume({tokens}): Failed to consumed tokens. Available: {_available}"); return false; } // subtract the number of tokens consumed _available = _available - tokens; Logging.Log.Trace($"LeakyBucket.TryConsume({tokens}): Successfully consumed tokens. Available: {_available}"); return true; } } } }