/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text.RegularExpressions; using System.Threading; using QuantConnect.Configuration; using QuantConnect.Interfaces; using QuantConnect.Logging; using QuantConnect.Packets; using QuantConnect.Util; namespace QuantConnect.Lean.Engine.Storage { /// /// A local disk implementation of . /// public class LocalObjectStore : IObjectStore { /// /// No read permissions error message /// protected const string NoReadPermissionsError = "The current user does not have permission to read from the organization Object Store." + " Please contact your organization administrator to request permission."; /// /// No write permissions error message /// protected const string NoWritePermissionsError = "The current user does not have permission to write to the organization Object Store." + " Please contact your organization administrator to request permission."; /// /// No delete permissions error message /// protected const string NoDeletePermissionsError = "The current user does not have permission to delete objects from the organization Object Store." + " Please contact your organization administrator to request permission."; /// /// Event raised each time there's an error /// public event EventHandler ErrorRaised; /// /// Gets the default object store location /// public static string DefaultObjectStore { get; set; } = Path.GetFullPath(Config.Get("object-store-root", "./storage")); /// /// Flag indicating the state of this object storage has changed since the last invocation /// private volatile bool _dirty; private Timer _persistenceTimer; private Regex _pathRegex = new(@"^\.?[a-zA-Z0-9\\/_#\-\$= ]+\.?[a-zA-Z0-9]*$", RegexOptions.Compiled); private readonly ConcurrentDictionary _storage = new(); private readonly object _persistLock = new object(); /// /// Provides access to the controls governing behavior of this instance, such as the persistence interval /// protected Controls Controls { get; private set; } /// /// The root storage folder for the algorithm /// protected string AlgorithmStorageRoot { get; private set; } /// /// The file handler instance to use /// protected FileHandler FileHandler { get; set; } = new(); /// /// Initializes the object store /// /// The user id /// The project id /// The user token /// The job controls instance public virtual void Initialize(int userId, int projectId, string userToken, Controls controls) { AlgorithmStorageRoot = StorageRoot(); // create the root path if it does not exist var directoryInfo = FileHandler.CreateDirectory(AlgorithmStorageRoot); // full name will return a normalized path which is later easier to compare AlgorithmStorageRoot = directoryInfo.FullName; Controls = controls; // if <= 0 we disable periodic persistence and make it synchronous if (Controls.PersistenceIntervalSeconds > 0) { _persistenceTimer = new Timer(_ => Persist(), null, Controls.PersistenceIntervalSeconds * 1000, Timeout.Infinite); } Log.Trace($"LocalObjectStore.Initialize(): Storage Root: {directoryInfo.FullName}. StorageFileCount {controls.StorageFileCount}. StorageLimit {BytesToMb(controls.StorageLimit)}MB. StoragePermissions {Controls.StorageAccess}"); } /// /// Storage root path /// protected virtual string StorageRoot() { return DefaultObjectStore; } /// /// Loads objects from the AlgorithmStorageRoot into the ObjectStore /// private IEnumerable GetObjectStoreEntries(bool loadContent, bool takePersistLock = true) { if (Controls.StorageAccess.Read) { // Acquire the persist lock to avoid yielding twice the same value, just in case lock (takePersistLock ? _persistLock : new object()) { foreach (var kvp in _storage) { if (!loadContent || kvp.Value.Data != null) { // let's first serve what we already have in memory because it might include files which are not on disk yet yield return kvp.Value; } } foreach (var file in FileHandler.EnumerateFiles(AlgorithmStorageRoot, "*", SearchOption.AllDirectories, out var rootFolder)) { var path = NormalizePath(file.FullName.RemoveFromStart(rootFolder)); ObjectStoreEntry objectStoreEntry; if (loadContent) { if (!_storage.TryGetValue(path, out objectStoreEntry) || objectStoreEntry.Data == null) { if (TryCreateObjectStoreEntry(file.FullName, path, out objectStoreEntry)) { // load file if content is null or not present, we prioritize the version we have in memory yield return _storage[path] = objectStoreEntry; } } } else { if (!_storage.ContainsKey(path)) { // we do not read the file contents yet, just the name. We read the contents on demand yield return _storage[path] = new ObjectStoreEntry(path, null); } } } } } } /// /// Returns the file paths present in the object store. This is specially useful not to load the object store into memory /// public ICollection Keys { get { return GetObjectStoreEntries(loadContent: false).Select(objectStoreEntry => objectStoreEntry.Path).ToList(); } } /// /// Will clear the object store state cache. This is useful when the object store is used concurrently by nodes which want to share information /// public void Clear() { // write to disk anything pending first Persist(); _storage.Clear(); } /// /// Determines whether the store contains data for the specified path /// /// The object path /// True if the key was found public bool ContainsKey(string path) { if (path == null) { throw new ArgumentNullException(nameof(path)); } if (!Controls.StorageAccess.Read) { throw new InvalidOperationException($"LocalObjectStore.ContainsKey(): {NoReadPermissionsError}"); } path = NormalizePath(path); if (_storage.ContainsKey(path)) { return true; } // if we don't have the file but it exists, be friendly and register it var filePath = PathForKey(path); if (FileHandler.Exists(filePath)) { _storage[path] = new ObjectStoreEntry(path, null); return true; } return false; } /// /// Returns the object data for the specified path /// /// The object path /// A byte array containing the data public byte[] ReadBytes(string path) { // Ensure we have the key, also takes care of null or improper access if (!ContainsKey(path)) { throw new KeyNotFoundException($"Object with path '{path}' was not found in the current project. " + "Please use ObjectStore.ContainsKey(key) to check if an object exists before attempting to read." ); } path = NormalizePath(path); if (!_storage.TryGetValue(path, out var objectStoreEntry) || objectStoreEntry.Data == null) { var filePath = PathForKey(path); if (TryCreateObjectStoreEntry(filePath, path, out objectStoreEntry)) { // if there is no data in the cache and the file exists on disk let's load it _storage[path] = objectStoreEntry; } } return objectStoreEntry?.Data; } /// /// Saves the object data for the specified path /// /// The object path /// The object data /// True if the save operation was successful public bool SaveBytes(string path, byte[] contents) { if (path == null) { throw new ArgumentNullException(nameof(path)); } else if (!Controls.StorageAccess.Write) { throw new InvalidOperationException($"LocalObjectStore.SaveBytes(): {NoWritePermissionsError}"); } else if (!_pathRegex.IsMatch(path)) { throw new ArgumentException($"LocalObjectStore: path is not supported: '{path}'"); } else if (path.Count(c => c == '/') > 100 || path.Count(c => c == '\\') > 100) { // just in case throw new ArgumentException($"LocalObjectStore: path is not supported: '{path}'"); } // after we check the regex path = NormalizePath(path); if (InternalSaveBytes(path, contents) // only persist if we actually stored some new data, else can skip && contents != null) { _dirty = true; // if <= 0 we disable periodic persistence and make it synchronous if (Controls.PersistenceIntervalSeconds <= 0) { Persist(); } return true; } return false; } /// /// Won't trigger persist nor will check storage write permissions, useful on initialization since it allows read only permissions to load the object store /// protected bool InternalSaveBytes(string path, byte[] contents) { if (!IsWithinStorageLimit(path, contents, takePersistLock: true)) { return false; } // Add the dirty entry var entry = _storage[path] = new ObjectStoreEntry(path, contents); entry.SetDirty(); return true; } /// /// Validates storage limits are respected on a new save operation /// protected virtual bool IsWithinStorageLimit(string path, byte[] contents, bool takePersistLock) { // Before saving confirm we are abiding by the control rules // Start by counting our file and its length var fileCount = 1; var expectedStorageSizeBytes = contents?.Length ?? 0L; foreach (var kvp in GetObjectStoreEntries(loadContent: false, takePersistLock: takePersistLock)) { if (path.Equals(kvp.Path)) { // Skip we have already counted this above // If this key was already in storage it will be replaced. } else { fileCount++; if (kvp.Data != null) { // if the data is in memory use it expectedStorageSizeBytes += kvp.Data.Length; } else { expectedStorageSizeBytes += FileHandler.TryGetFileLength(PathForKey(kvp.Path)); } } } // Verify we are within FileCount limit if (fileCount > Controls.StorageFileCount) { var message = $"LocalObjectStore.InternalSaveBytes(): You have reached the ObjectStore limit for files it can save: {fileCount}. Unable to save the new file: '{path}'"; Log.Error(message); OnErrorRaised(new StorageLimitExceededException(message)); return false; } // Verify we are within Storage limit if (expectedStorageSizeBytes > Controls.StorageLimit) { var message = $"LocalObjectStore.InternalSaveBytes(): at storage capacity: {BytesToMb(expectedStorageSizeBytes)}MB/{BytesToMb(Controls.StorageLimit)}MB. Unable to save: '{path}'"; Log.Error(message); OnErrorRaised(new StorageLimitExceededException(message)); return false; } return true; } /// /// Deletes the object data for the specified path /// /// The object path /// True if the delete operation was successful public bool Delete(string path) { if (path == null) { throw new ArgumentNullException(nameof(path)); } if (!Controls.StorageAccess.Delete) { throw new InvalidOperationException($"LocalObjectStore.Delete(): {NoDeletePermissionsError}"); } path = NormalizePath(path); var wasInCache = _storage.TryRemove(path, out var _); var filePath = PathForKey(path); if (FileHandler.Exists(filePath)) { try { FileHandler.Delete(filePath); return true; } catch { // This try sentence is to prevent a race condition with the Delete within the PersisData() method } } return wasInCache; } /// /// Returns the file path for the specified path /// /// If the key is not already inserted it will just return a path associated with it /// and add the key with null value /// The object path /// The path for the file public virtual string GetFilePath(string path) { // Ensure we have an object for that key if (!ContainsKey(path)) { // Add a key with null value to tell Persist() not to delete the file created in the path associated // with this key and not update it with the value associated with the key(null) SaveBytes(path, null); } else { // Persist to ensure pur files are up to date Persist(); } // Fetch the path to file and return it var normalizedPathKey = PathForKey(path); var parent = Directory.GetParent(normalizedPathKey); if (parent != null && parent.FullName != AlgorithmStorageRoot) { // let's create the parent folder if it's not the root storage and it does not exist if (!FileHandler.DirectoryExists(parent.FullName)) { FileHandler.CreateDirectory(parent.FullName); } } return normalizedPathKey; } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// public virtual void Dispose() { try { if (_persistenceTimer != null) { _persistenceTimer.Change(Timeout.Infinite, Timeout.Infinite); Persist(); _persistenceTimer.DisposeSafely(); } } catch (Exception err) { Log.Error(err, "Error deleting storage directory."); } } /// Returns an enumerator that iterates through the collection. /// A that can be used to iterate through the collection. /// 1 public IEnumerator> GetEnumerator() { return GetObjectStoreEntries(loadContent: true).Select(objectStore => new KeyValuePair(objectStore.Path, objectStore.Data)).GetEnumerator(); } /// Returns an enumerator that iterates through a collection. /// An object that can be used to iterate through the collection. /// 2 IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); } /// /// Get's a file path for a given path. /// Internal use only because it does not guarantee the existence of the file. /// protected string PathForKey(string path) { return Path.Combine(AlgorithmStorageRoot, NormalizePath(path)); } /// /// Invoked periodically to persist the object store's contents /// private void Persist() { // Acquire the persist lock lock (_persistLock) { try { // If there are no changes we are fine if (!_dirty) { return; } if (PersistData()) { _dirty = false; } } catch (Exception err) { Log.Error("LocalObjectStore.Persist()", err); OnErrorRaised(err); } finally { try { if (_persistenceTimer != null) { // restart timer following end of persistence _persistenceTimer.Change(Time.GetSecondUnevenWait(Controls.PersistenceIntervalSeconds * 1000), Timeout.Infinite); } } catch (ObjectDisposedException) { // ignored disposed } } } } /// /// Overridable persistence function /// /// True if persistence was successful, otherwise false protected virtual bool PersistData() { try { // Write our store data to disk // Skip the key associated with null values. They are not linked to a file yet or not loaded // Also skip fails which are not flagged as dirty foreach (var kvp in _storage) { if (kvp.Value.Data != null && kvp.Value.IsDirty) { var filePath = PathForKey(kvp.Key); // directory might not exist for custom prefix var parentDirectory = Path.GetDirectoryName(filePath); if (!FileHandler.DirectoryExists(parentDirectory)) { FileHandler.CreateDirectory(parentDirectory); } FileHandler.WriteAllBytes(filePath, kvp.Value.Data); // clear the dirty flag kvp.Value.SetClean(); // This kvp could have been deleted by the Delete() method if (!_storage.Contains(kvp)) { try { FileHandler.Delete(filePath); } catch { // This try sentence is to prevent a race condition with the Delete() method } } } } return true; } catch (Exception err) { Log.Error(err, "LocalObjectStore.PersistData()"); OnErrorRaised(err); return false; } } /// /// Event invocator for the event /// protected virtual void OnErrorRaised(Exception error) { ErrorRaised?.Invoke(this, new ObjectStoreErrorRaisedEventArgs(error)); } /// /// Converts a number of bytes to megabytes as it's more human legible /// private static double BytesToMb(long bytes) { return bytes / 1024.0 / 1024.0; } private static string NormalizePath(string path) { if (string.IsNullOrEmpty(path)) { return path; } return path.TrimStart('.').TrimStart('/', '\\').Replace('\\', '/'); } private bool TryCreateObjectStoreEntry(string filePath, string path, out ObjectStoreEntry objectStoreEntry) { var count = 0; do { count++; try { if (FileHandler.Exists(filePath)) { objectStoreEntry = new ObjectStoreEntry(path, FileHandler.ReadAllBytes(filePath)); return true; } objectStoreEntry = null; return false; } catch (Exception) { if (count > 3) { throw; } else { // let's be resilient and retry, avoid race conditions, someone updating it or just random io failure Thread.Sleep(250); } } } while (true); } /// /// Helper class to hold the state of an object store file /// private class ObjectStoreEntry { private long _isDirty; public byte[] Data { get; } public string Path { get; } public bool IsDirty => Interlocked.Read(ref _isDirty) != 0; public ObjectStoreEntry(string path, byte[] data) { Path = path; Data = data; } public void SetDirty() { // flag as dirty Interlocked.CompareExchange(ref _isDirty, 1, 0); } public void SetClean() { Interlocked.CompareExchange(ref _isDirty, 0, 1); } } } }