- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 361
// Progress reporters used inside the parallel loop below.
// NOTE(review): presumably assigned from the Progress<T> instances created at the end of this snippet - the assignment is not shown.
private readonly IProgress<int> _recordCountProgress;
private readonly IProgress<string> _fileNameProgress;
...
// Enumerates every file under path (all sub-directories included) and handles the file names in parallel.
Parallel.ForEach(Directory.EnumerateFiles(path, "*.*", SearchOption.AllDirectories), options, fileName =>
{
    // Atomically increment the shared counter and store the new value in RecordCount.
    RecordCount = Interlocked.Increment(ref _recordCount);
    // Report only when the count is a multiple of progressStep.
    if (RecordCount % progressStep == 0)
    {
        _fileNameProgress.Report(fileName);
        _recordCountProgress.Report(RecordCount);
    }
});
...
// Wrap the two handler methods in Progress<T> instances.
IProgress<int> recordCountProgress = new Progress<int>(NumberOfFilesProcessedIProgress);
IProgress<string> fileNameProgress = new Progress<string>(FileProcessedIProgress);
notice:
if (RecordCount % progressStep == 0)
If there are only a few files, _recordCount will always be 0, because it is too fast; if there are too many files, without this check the UI would be blocked. Second, with SynchronizationContext:
// Enumerates every file under path (all sub-directories included) and handles the file names in parallel.
Parallel.ForEach(Directory.EnumerateFiles(path, "*.*", SearchOption.AllDirectories), options, fileName =>
{
    // Atomically increment the shared counter and store the new value in RecordCount.
    RecordCount = Interlocked.Increment(ref _recordCount);
    // Raise the events only when the count is a multiple of progressStep.
    if (RecordCount % progressStep == 0)
    {
        // Post the event-raising callback to _syncContext; nothing happens when _syncContext is null.
        _syncContext?.Post(_ =>
        {
            FileProcessed?.Invoke(this, new FilesProcessedSynchronizationContextEventArgs(fileName));
            // NOTE(review): this reads the shared _recordCount field when the callback runs, not the value
            // that passed the modulo check above - the reported count may already be higher; confirm intended.
            NumberOfFilesProcessed?.Invoke(this, new RecordCountSynchronizationContextEventArgs(_recordCount));
        }, null);
    }
});
Same as previous, problem with the line:
if (RecordCount % progressStep == 0)Third with System.Threading.Timer:
CancellationTokenSource = cancellationTokenSource;

// Poll the queue every 100 ms, starting immediately; each tick handles at most one file name.
_timer = new System.Threading.Timer(_ =>
{
    if (!_queue.TryDequeue(out string? dequeuedFileName))
    {
        // Nothing queued on this tick.
        return;
    }

    // Single definition of the event-raising code, shared by both dispatch paths below.
    Action raiseEvents = () =>
    {
        FileProcessed?.Invoke(this, new FilesProcessedEventArgs(dequeuedFileName));
        NumberOfFilesProcessed?.Invoke(this, new RecordCountEventArgs(_recordCount));
    };

    if (form.InvokeRequired)
    {
        // Hand the work over to the form asynchronously.
        form.BeginInvoke(raiseEvents);
    }
    else
    {
        raiseEvents();
    }
}, null, 0, 100);
Example download from here
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 358
// Disposal happens in reverse declaration order: CSV reader first, then the bulk copy, then the connection.
await using var sqlConnection = new SqlConnection(connectionString);
using var sqlBulkCopy = new SqlBulkCopy(sqlConnection) { DestinationTableName = "GpsInfo" };
using var csvDataReader = new CsvDataReader(csvPath);

await sqlConnection.OpenAsync();

// Stream the CSV rows exposed by the IDataReader into the destination table.
await sqlBulkCopy.WriteToServerAsync(csvDataReader);
Now IDataReader
using System.Data;

namespace SaveCsvFileToSqServerWithBulkCopy;

/// <summary>
/// Minimal forward-only <see cref="IDataReader"/> over a semicolon-separated text file.
/// The first line of the file is the header row; every following line is one record,
/// and every field is exposed as a <see cref="string"/>.
/// Typed getters (GetInt32, GetDateTime, ...) are intentionally not implemented.
/// </summary>
public class CsvDataReader : IDataReader
{
    private const char Separator = ';';

    private readonly StreamReader _reader;
    private readonly string[] _headers;
    private string[]? _currentRow;
    private bool _isClosed;

    /// <summary>Opens <paramref name="filePath"/> and reads its header line.</summary>
    /// <param name="filePath">Path of the CSV file to read.</param>
    public CsvDataReader(string filePath)
    {
        _reader = new StreamReader(filePath);
        // An empty file has no header line; treat it as zero columns instead of leaving the headers null.
        _headers = _reader.ReadLine()?.Split(Separator) ?? Array.Empty<string>();
    }

    /// <summary>Fields of the current record.</summary>
    /// <exception cref="InvalidOperationException">No record has been read yet.</exception>
    private string[] CurrentRow =>
        _currentRow ?? throw new InvalidOperationException("No current row. Call Read() first.");

    /// <summary>Advances to the next line of the file.</summary>
    /// <returns><c>true</c> when a record was read; <c>false</c> at the end of the file.</returns>
    public bool Read()
    {
        string? line = _reader.ReadLine();
        if (line is null)
        {
            // End of file: the previously read row (if any) is left in place.
            return false;
        }

        _currentRow = line.Split(Separator);
        return true;
    }

    /// <summary>Returns field <paramref name="i"/> of the current record as a string.</summary>
    public object GetValue(int i) => CurrentRow[i];

    /// <summary>Number of columns in the header row (0 for an empty file).</summary>
    public int FieldCount => _headers.Length;

    /// <summary>Returns the header text of column <paramref name="i"/>.</summary>
    public string GetName(int i) => _headers[i];

    /// <summary>
    /// Returns the index of the column called <paramref name="name"/>; an exact match is preferred,
    /// then a case-insensitive one.
    /// </summary>
    /// <exception cref="IndexOutOfRangeException">No column has that name.</exception>
    public int GetOrdinal(string name)
    {
        int ordinal = Array.FindIndex(_headers, header => string.Equals(header, name, StringComparison.Ordinal));
        if (ordinal < 0)
        {
            ordinal = Array.FindIndex(_headers, header => string.Equals(header, name, StringComparison.OrdinalIgnoreCase));
        }

        if (ordinal < 0)
        {
            // IndexOutOfRangeException is the exception IDataRecord.GetOrdinal documents for an unknown name.
            throw new IndexOutOfRangeException($"Column '{name}' was not found.");
        }

        return ordinal;
    }

    /// <summary>Returns field <paramref name="i"/> of the current record.</summary>
    public string GetString(int i) => CurrentRow[i];

    /// <summary>Copies as many fields of the current record as fit into <paramref name="values"/>.</summary>
    /// <returns>The number of fields copied.</returns>
    public int GetValues(object[] values)
    {
        string[] row = CurrentRow;
        int count = Math.Min(values.Length, row.Length);
        Array.Copy(row, values, count);
        return count;
    }

    public object this[int i] => GetValue(i);

    public object this[string name] => GetValue(GetOrdinal(name));

    /// <summary>Closes the underlying file. Safe to call more than once.</summary>
    public void Close()
    {
        _isClosed = true;
        _reader.Dispose();
    }

    /// <summary>Releases the underlying file.</summary>
    public void Dispose() => Close();

    /// <summary>A CSV file holds a single result set, so there is never a next one.</summary>
    public bool NextResult() => false;

    public int Depth { get; }

    /// <summary><c>true</c> once <see cref="Close"/> or <see cref="Dispose"/> has been called.</summary>
    public bool IsClosed => _isClosed;

    public int RecordsAffected { get; }

    // Members below are not needed for string-only CSV access and remain unimplemented.

    public bool GetBoolean(int i) => throw new NotImplementedException();

    public byte GetByte(int i) => throw new NotImplementedException();

    public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length) => throw new NotImplementedException();

    public char GetChar(int i) => throw new NotImplementedException();

    public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length) => throw new NotImplementedException();

    public IDataReader GetData(int i) => throw new NotImplementedException();

    public string GetDataTypeName(int i) => throw new NotImplementedException();

    public DateTime GetDateTime(int i) => throw new NotImplementedException();

    public decimal GetDecimal(int i) => throw new NotImplementedException();

    public double GetDouble(int i) => throw new NotImplementedException();

    public Type GetFieldType(int i) => throw new NotImplementedException();

    public float GetFloat(int i) => throw new NotImplementedException();

    public Guid GetGuid(int i) => throw new NotImplementedException();

    public short GetInt16(int i) => throw new NotImplementedException();

    public int GetInt32(int i) => throw new NotImplementedException();

    public long GetInt64(int i) => throw new NotImplementedException();

    public bool IsDBNull(int i) => throw new NotImplementedException();

    public DataTable? GetSchemaTable() => throw new NotImplementedException();
}
Full example download from here
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 370
/// <summary>
/// Producer: walks <paramref name="path"/> recursively and adds every file name found to
/// <paramref name="fileQueue"/>.
/// </summary>
/// <param name="fileQueue">Queue that receives the file names.</param>
/// <param name="path">Root directory to search.</param>
// NOTE(review): the queue is not marked complete here - presumably the caller does that; verify.
private void SearchForAllFilesAndPutThemInQueue(BlockingCollection<string> fileQueue, string path)
{
    var discoveredFiles = Directory.EnumerateFiles(path, "*.*", SearchOption.AllDirectories);

    foreach (string discoveredFile in discoveredFiles)
    {
        fileQueue.Add(discoveredFile);
    }
}
Then I will read file names from queue and do something with (consumer):
/// <summary>
/// Consumer: takes file names from <paramref name="fileQueue"/>, runs the GPS extraction command
/// on each file and, when a position was found, adds it together with the file name to
/// <paramref name="gpsInfoQueue"/>.
/// </summary>
/// <param name="fileQueue">Queue of file names to process.</param>
/// <param name="gpsInfoQueue">Queue that receives (position, file name) pairs.</param>
/// <exception cref="ArgumentNullException">Either queue is null.</exception>
private void ReadFileNamesFromQueue(
    BlockingCollection<string>? fileQueue,
    BlockingCollection<(LatLngModel, string)>? gpsInfoQueue)
{
    if (fileQueue is null)
    {
        throw new ArgumentNullException(nameof(fileQueue));
    }

    if (gpsInfoQueue is null)
    {
        throw new ArgumentNullException(nameof(gpsInfoQueue));
    }

    foreach (string file in fileQueue.GetConsumingEnumerable())
    {
        try
        {
            var extractGpsInfoFromImageCommand = new ExtractGpsInfoFromImageCommand
            {
                ImageFileNameToReadGpsFrom = file
            };
            extractGpsInfoFromImage.Execute(extractGpsInfoFromImageCommand);

            if (extractGpsInfoFromImageCommand.LatLngModel is not null)
            {
                gpsInfoQueue.Add((extractGpsInfoFromImageCommand.LatLngModel, file));
            }
        }
        catch (Exception ex)
        {
            // Best effort: a file that fails is logged and skipped so the rest of the queue is still processed.
            // Exception.Message is a property, not a method.
            Debug.WriteLine(ex.Message);
        }
    }
}
Here you can download my example, little bit complicated, where I get list of file in one queue, in second I try to extract GPS info, and from third queue I am saving to CSV file file name and longitude / latitude.
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 369
A task returned by Task.Run() really is saying "I want you to execute this code separately"; the exact thread on which that code executes depends on a number of factors. So, here is my first method, with async. First install Microsoft.Data.SqlClient from here; notice that the old System.Data.SqlClient is deprecated. Now the method:
return Task.Run(() => ThreadReadFromDatabaseAsync(connectionString, sql));

/// <summary>
/// Opens a connection, executes <paramref name="sql"/> and reads every record, updating the
/// record-count label for each row. Errors are shown on the error label; the status label is
/// always updated when the method finishes.
/// </summary>
/// <param name="connectionString">Connection string of the database to read from.</param>
/// <param name="sql">Query to execute.</param>
private async Task ThreadReadFromDatabaseAsync(string connectionString, string sql)
{
    DateTime startedAt = DateTime.Now;
    try
    {
        await using var connection = new SqlConnection(connectionString);
        await connection.OpenAsync();

        await using var command = new SqlCommand(sql, connection) { CommandTimeout = 3600 };
        await using SqlDataReader reader = await command.ExecuteReaderAsync();

        int recordIndex = 0;
        // Synchronous Read() is used here on purpose instead of ReadAsync().
        while (reader.Read())
        {
            UpdateUi.Execute($"{startedAt} Reading using tasks record {recordIndex}", LblRecordCount, Form);
            recordIndex++;

            var rowValues = new object[reader.FieldCount];
            reader.GetValues(rowValues);
        }
    }
    catch (Exception ex)
    {
        UpdateUi.Execute($"Error in task execution: {ex.Message}", LblError, Form);
    }
    finally
    {
        UpdateUi.Execute($"{DateTime.Now} Done with task execution", LblStatus, Form);
    }
}
Here notice line:
while (reader.Read())
Note that I am not using ReadAsync. The problem is that ReadAsync gets stuck reading from the DB after a few hundred records; this looks like the bug described here. Now another example, with a thread:
Thread readerThread = new(() => ThreadReadFromDatabase(connectionString, sql))
{
    Name = "ThreadReadFromDatabase",
    IsBackground = true
};
readerThread.Start();

/// <summary>
/// Thread entry point: opens a connection, executes <paramref name="sql"/> and reads every record,
/// updating the record-count label for each row. Errors are shown on the error label; the status
/// label is always updated when the method finishes.
/// </summary>
/// <param name="connectionString">Connection string of the database to read from.</param>
/// <param name="sql">Query to execute.</param>
private void ThreadReadFromDatabase(string connectionString, string sql)
{
    DateTime startedAt = DateTime.Now;
    try
    {
        using var connection = new SqlConnection(connectionString);
        connection.Open();

        using var command = new SqlCommand(sql, connection) { CommandTimeout = 3600 };
        using SqlDataReader reader = command.ExecuteReader();

        int recordIndex = 0;
        while (reader.Read())
        {
            UpdateUi.Execute($"{startedAt} Reading using threads record {recordIndex}", LblRecordCount, Form);
            recordIndex++;

            var rowValues = new object[reader.FieldCount];
            reader.GetValues(rowValues);
        }
    }
    catch (Exception ex)
    {
        UpdateUi.Execute($"Error in thread execution: {ex.Message}", LblError, Form);
    }
    finally
    {
        UpdateUi.Execute($"{DateTime.Now} Done with thread execution", LblStatus, Form);
    }
}
Full example download from here UPDATE 2025-03-12: Avoid using Task.Run for long-running work that blocks the thread Also from the book Concurrency in C# Cookbook by Stephen Cleary: As soon as you type new Thread(), it’s over; your project already has legacy code Also check this Stack Overflow answer.