/* * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals. * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.IO; using Ionic.Zip; using Ionic.Zlib; using System.Linq; using System.Threading; using QuantConnect.Util; using QuantConnect.Logging; using QuantConnect.Interfaces; using System.Collections.Generic; using System.Collections.Concurrent; using QuantConnect.Configuration; namespace QuantConnect.Lean.Engine.DataFeeds { /// /// File provider implements optimized zip archives caching facility. Cache is thread safe. /// public class ZipDataCacheProvider : IDataCacheProvider { private readonly double _cacheSeconds; // ZipArchive cache used by the class private readonly ConcurrentDictionary _zipFileCache = new ConcurrentDictionary(); private readonly IDataProvider _dataProvider; private readonly Timer _cacheCleaner; /// /// Property indicating the data is temporary in nature and should not be cached. /// public bool IsDataEphemeral { get; } /// /// Constructor that sets the used to retrieve data /// public ZipDataCacheProvider(IDataProvider dataProvider, bool isDataEphemeral = true, double cacheTimer = double.NaN) { IsDataEphemeral = isDataEphemeral; _cacheSeconds = double.IsNaN(cacheTimer) ? Config.GetDouble("zip-data-cache-provider", 10) : cacheTimer; _dataProvider = dataProvider; _cacheCleaner = new Timer(state => CleanCache(), null, TimeSpan.FromSeconds(_cacheSeconds), Timeout.InfiniteTimeSpan); } /// /// Does not attempt to retrieve any data /// public Stream Fetch(string key) { LeanData.ParseKey(key, out var filename, out var entryName); // handles zip files if (filename.EndsWith(".zip", StringComparison.InvariantCulture)) { Stream stream = null; try { CachedZipFile existingZip; if (!_zipFileCache.TryGetValue(filename, out existingZip)) { stream = CacheAndCreateEntryStream(filename, entryName); } else { try { lock (existingZip) { if (existingZip.Disposed) { // bad luck, thread race condition // it was disposed and removed after we got it // so lets create it again and add it stream = CacheAndCreateEntryStream(filename, entryName); } else { existingZip.Refresh(); stream = CreateEntryStream(existingZip, entryName, filename); } } } catch (Exception exception) { if (exception is ZipException || exception is ZlibException) { Log.Error("ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename + "#" + entryName + " Error: " + exception); } else throw; } } return stream; } catch (Exception err) { Log.Error(err, "Inner try/catch"); stream?.DisposeSafely(); return null; } } else { // handles text files return _dataProvider.Fetch(filename); } } /// /// Store the data in the cache. /// /// The source of the data, used as a key to retrieve data in the cache /// The data as a byte array public void Store(string key, byte[] data) { LeanData.ParseKey(key, out var fileName, out var entryName); // We only support writing to zips with this provider, we also need an entryName to write // Return silently because RemoteFileSubscriptionStreamReader depends on this function not throwing. if (!fileName.EndsWith(".zip", StringComparison.InvariantCulture) || entryName == null) { return; } // Only allow one thread at a time to modify our cache lock (_zipFileCache) { // If its not in the cache, and can not be cached we need to create it if (!_zipFileCache.TryGetValue(fileName, out var cachedZip) && !Cache(fileName, out cachedZip)) { // Create the zip, if successful, cache it for later use if (Compression.ZipCreateAppendData(fileName, entryName, data)) { Cache(fileName, out _); } else { throw new InvalidOperationException($"Failed to store data {fileName}#{entryName}"); } return; } lock (cachedZip) { if (cachedZip.Disposed) { // if disposed and we have the lock means it's not in the dictionary anymore, let's assert it // but there is a window for another thread to add a **new/different** instance which is okay // we will pick it up on the store call bellow if (_zipFileCache.TryGetValue(fileName, out var existing) && ReferenceEquals(existing, cachedZip)) { Log.Error($"ZipDataCacheProvider.Store(): unexpected cache state for {fileName}"); throw new InvalidOperationException( "LEAN entered an unexpected state. Please contact support@quantconnect.com so we may debug this further."); } Store(key, data); } else { cachedZip.WriteEntry(entryName, data); } } } } /// /// Returns a list of zip entries in a provided zip file /// public List GetZipEntries(string zipFile) { if (!_zipFileCache.TryGetValue(zipFile, out var cachedZip)) { if (!Cache(zipFile, out cachedZip)) { throw new ArgumentException($"Failed to get zip entries from {zipFile}"); } } lock (cachedZip) { cachedZip.Refresh(); return cachedZip.EntryCache.Keys.ToList(); } } /// /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. /// /// 2 public void Dispose() { // stop the cache cleaner timer _cacheCleaner.DisposeSafely(); CachedZipFile zip; foreach (var zipFile in _zipFileCache) { if (_zipFileCache.TryRemove(zipFile.Key, out zip)) { zip.DisposeSafely(); } } } /// /// Remove items in the cache that are older than the cutoff date /// private void CleanCache() { var utcNow = DateTime.UtcNow; try { var clearCacheIfOlderThan = utcNow.AddSeconds(-_cacheSeconds); // clean all items that that are older than CacheSeconds than the current date foreach (var zip in _zipFileCache) { if (zip.Value.Uncache(clearCacheIfOlderThan)) { // only clear items if they are not being used if (Monitor.TryEnter(zip.Value)) { try { // we first dispose it since if written it will refresh the file on disk and we don't // want anyone reading it directly which should be covered by the entry being in the cache // and us holding the instance lock zip.Value.Dispose(); // removing it from the cache _zipFileCache.TryRemove(zip.Key, out _); } finally { Monitor.Exit(zip.Value); } } } } } finally { try { var nextDueTime = Time.GetSecondUnevenWait((int)Math.Ceiling(_cacheSeconds * 1000)); _cacheCleaner.Change(nextDueTime, Timeout.Infinite); } catch (ObjectDisposedException) { // ignored disposed } } } private Stream CacheAndCreateEntryStream(string filename, string entryName) { Stream stream = null; var dataStream = _dataProvider.Fetch(filename); if (dataStream != null) { try { var newItem = new CachedZipFile(dataStream, DateTime.UtcNow, filename); // here we don't need to lock over the cache item // because it was still not added in the cache stream = CreateEntryStream(newItem, entryName, filename); if (!_zipFileCache.TryAdd(filename, newItem)) { // some other thread could of added it already, lets dispose ours newItem.Dispose(); } } catch (Exception exception) { // don't leak the file stream! dataStream.DisposeSafely(); if (exception is ZipException || exception is ZlibException) { Log.Error("ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename + "#" + entryName + " Error: " + exception); } else throw; } } return stream; } /// /// Create a stream of a specific ZipEntry /// /// The zipFile containing the zipEntry /// The name of the entry /// The name of the zip file on disk /// A of the appropriate zip entry private Stream CreateEntryStream(CachedZipFile zipFile, string entryName, string fileName) { ZipEntryCache entryCache; if (entryName == null) { entryCache = zipFile.EntryCache.FirstOrDefault().Value; } else { zipFile.EntryCache.TryGetValue(entryName, out entryCache); } if (entryCache is { Modified: true }) { // we want to read an entry in the zip that has be edited, we need to start over // because of the zip library else it blows up, we need to call 'Save' zipFile.Dispose(); _zipFileCache.Remove(fileName, out _); return CacheAndCreateEntryStream(fileName, entryName); } var entry = entryCache?.Entry; if (entry != null) { var stream = new MemoryStream(); try { stream.SetLength(entry.UncompressedSize); } catch (ArgumentOutOfRangeException) { // The needed size of the MemoryStream is longer than allowed. // just read the data directly from the file. // Note that we cannot use entry.OpenReader() because only one OpenReader // can be open at a time without causing corruption. // We must use fileName instead of zipFile.Name, // because zipFile is initialized from a stream and not a file. var zipStream = new ZipInputStream(fileName); var zipEntry = zipStream.GetNextEntry(); // The zip file was empty! if (zipEntry == null) { return null; } // Null entry name, return the first. if (entryName == null) { return zipStream; } // Non-default entry name, return matching one if it exists, otherwise null. while (zipEntry != null) { if (string.Compare(zipEntry.FileName, entryName, StringComparison.OrdinalIgnoreCase) == 0) { return zipStream; } zipEntry = zipStream.GetNextEntry(); } } // extract directly into the stream entry.Extract(stream); stream.Position = 0; return stream; } return null; } /// /// Cache a Zip /// /// Zip to cache /// The resulting CachedZipFile /// private bool Cache(string filename, out CachedZipFile cachedZip) { cachedZip = null; var dataStream = _dataProvider.Fetch(filename); if (dataStream != null) { try { cachedZip = new CachedZipFile(dataStream, DateTime.UtcNow, filename); if (!_zipFileCache.TryAdd(filename, cachedZip)) { // some other thread could of added it already, lets dispose ours cachedZip.Dispose(); return _zipFileCache.TryGetValue(filename, out cachedZip); } return true; } catch (Exception exception) { if (exception is ZipException || exception is ZlibException) { Log.Error("ZipDataCacheProvider.Fetch(): Corrupt zip file/entry: " + filename + " Error: " + exception); } else throw; } dataStream.Dispose(); } return false; } /// /// Type for storing zipfile in cache /// private class CachedZipFile : IDisposable { private ReferenceWrapper _dateCached; private readonly Stream _dataStream; private readonly string _filePath; private long _disposed; private long _modified; /// /// The ZipFile this object represents /// private readonly ZipFile _zipFile; /// /// Contains all entries of the zip file by filename /// public readonly Dictionary EntryCache = new (StringComparer.OrdinalIgnoreCase); /// /// Returns if this cached zip file is disposed /// public bool Disposed => Interlocked.Read(ref _disposed) != 0; /// /// Initializes a new instance of the /// /// Stream containing the zip file /// Current utc time /// Path of the zip file public CachedZipFile(Stream dataStream, DateTime utcNow, string filePath) { _dataStream = dataStream; _zipFile = ZipFile.Read(dataStream); _zipFile.UseZip64WhenSaving = Zip64Option.Always; foreach (var entry in _zipFile.Entries) { EntryCache[entry.FileName] = new ZipEntryCache{ Entry = entry }; } _dateCached = new ReferenceWrapper(utcNow); _filePath = filePath; } /// /// Method used to check if this object was created before a certain time /// /// DateTime which is compared to the DateTime this object was created /// Bool indicating whether this object is older than the specified time public bool Uncache(DateTime date) { return _dateCached.Value < date; } /// /// Write to this entry, will be updated on disk when uncached /// Meaning either when timer finishes or on dispose /// /// Entry to write this as /// Content of the entry public void WriteEntry(string entryName, byte[] content) { Interlocked.Increment(ref _modified); Refresh(); // If the entry already exists remove it if (_zipFile.ContainsEntry(entryName)) { _zipFile.RemoveEntry(entryName); EntryCache.Remove(entryName); } // Write this entry to zip file var newEntry = _zipFile.AddEntry(entryName, content); EntryCache.Add(entryName, new ZipEntryCache { Entry = newEntry, Modified = true }); } /// /// We refresh our cache time when used to avoid it being clean up /// public void Refresh() { _dateCached = new ReferenceWrapper(DateTime.UtcNow); } /// /// Dispose of the ZipFile /// public void Dispose() { if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 1) { // compare will return the original value, if it's already 1 means already being disposed off return; } // If we changed this zip we need to save string tempFileName = null; var modified = Interlocked.Read(ref _modified) != 0; if (modified) { // Write our changes to disk as temp tempFileName = Path.GetTempFileName(); _zipFile.Save(tempFileName); } _zipFile?.DisposeSafely(); _dataStream?.DisposeSafely(); //After disposal we will move it to the final location if (modified && tempFileName != null) { File.Move(tempFileName, _filePath, true); } } } /// /// ZipEntry wrapper which handles flagging a modified entry /// private class ZipEntryCache { public ZipEntry Entry { get; set; } public bool Modified { get; set; } } } }