- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 634
// Receives the current record count whenever the worker reports progress.
private readonly IProgress<int> _recordCountProgress;
// Receives the name of the file being processed whenever the worker reports progress.
private readonly IProgress<string> _fileNameProgress;
...
// Visit every file below `path` in parallel and report progress on each progressStep-th file.
Parallel.ForEach(Directory.EnumerateFiles(path, "*.*", SearchOption.AllDirectories), options, fileName =>
{
    // Keep the incremented value in a local. RecordCount is shared by all workers, so
    // re-reading it after the assignment could observe a newer value written by another
    // thread and skip or duplicate a progress report.
    var currentCount = Interlocked.Increment(ref _recordCount);
    RecordCount = currentCount;
    if (currentCount % progressStep == 0)
    {
        _fileNameProgress.Report(fileName);
        _recordCountProgress.Report(currentCount);
    }
});
...
// Wrap the handler methods in Progress<T> so the worker can report through IProgress<T>.
IProgress<int> recordCountProgress = new Progress<int>(NumberOfFilesProcessedIProgress);
IProgress<string> fileNameProgress = new Progress<string>(FileProcessedIProgress);
Notice the line:
if (RecordCount % progressStep == 0)
If there are only a few files, _recordCount will always be 0 because processing is too fast; if there are too many files, the UI would be blocked without this check. The second approach uses SynchronizationContext:
// Visit every file below `path` in parallel and raise the progress events through the
// captured synchronization context on each progressStep-th file.
Parallel.ForEach(Directory.EnumerateFiles(path, "*.*", SearchOption.AllDirectories), options, fileName =>
{
    // Keep the incremented value in a local. RecordCount is shared by all workers, so
    // re-reading it after the assignment could observe a newer value written by another
    // thread and skip or duplicate a progress notification.
    var currentCount = Interlocked.Increment(ref _recordCount);
    RecordCount = currentCount;
    if (currentCount % progressStep == 0)
    {
        _syncContext?.Post(_ =>
        {
            FileProcessed?.Invoke(this,
                new FilesProcessedSynchronizationContextEventArgs(fileName));
            // Report the count that belongs to this file; reading _recordCount here would
            // return whatever the counter has reached by the time the callback runs.
            NumberOfFilesProcessed?.Invoke(this,
                new RecordCountSynchronizationContextEventArgs(currentCount));
        }, null);
    }
});
Same as in the previous example, the problem is with the line:
if (RecordCount % progressStep == 0)
The third approach uses System.Threading.Timer:
CancellationTokenSource = cancellationTokenSource;

// Timer callback (due time 0, period 100): take at most one file name off the queue per tick
// and raise the progress events, going through form.BeginInvoke when the form requires it.
_timer = new System.Threading.Timer(_ =>
{
    if (!_queue.TryDequeue(out string? dequeuedFile))
    {
        // Nothing queued on this tick.
        return;
    }

    if (form.InvokeRequired)
    {
        form.BeginInvoke(() => RaiseProgressEvents(dequeuedFile));
        return;
    }

    RaiseProgressEvents(dequeuedFile);

    // Raises both progress events for one dequeued file name.
    void RaiseProgressEvents(string processedFile)
    {
        FileProcessed?.Invoke(this, new FilesProcessedEventArgs(processedFile));
        NumberOfFilesProcessed?.Invoke(this, new RecordCountEventArgs(_recordCount));
    }
}, null, 0, 100);
Example download from here
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 635
// Stream the CSV file into the "GpsInfo" table through SqlBulkCopy.
await using SqlConnection sqlConnection = new(connectionString);
using SqlBulkCopy sqlBulkCopy = new(sqlConnection) { DestinationTableName = "GpsInfo" };
using CsvDataReader csvDataReader = new(csvPath);

await sqlConnection.OpenAsync();
await sqlBulkCopy.WriteToServerAsync(csvDataReader);
Now IDataReader
using System.Data;
namespace SaveCsvFileToSqServerWithBulkCopy;
/// <summary>
/// Minimal forward-only <see cref="IDataReader"/> over a text file whose fields are separated
/// by ';'. The first line is treated as the header row and only determines
/// <see cref="FieldCount"/>; every following line is split on ';' and exposed as strings
/// through <see cref="GetValue"/>. Quoting and escaping are not handled.
/// Only <see cref="Read"/>, <see cref="GetValue"/>, <see cref="FieldCount"/>,
/// <see cref="Close"/>, <see cref="IsClosed"/> and <see cref="Dispose"/> are implemented;
/// the remaining members throw <see cref="NotImplementedException"/>.
/// </summary>
public class CsvDataReader: IDataReader
{
    private const char Separator = ';';

    private readonly StreamReader _reader;
    private readonly string[] _headers;
    private string[]? _currentRow;

    /// <summary>Opens <paramref name="filePath"/> and consumes its header line.</summary>
    /// <param name="filePath">Path of the file to read.</param>
    public CsvDataReader(string filePath)
    {
        _reader = new StreamReader(filePath);
        // An empty file has no header line; treat it as zero columns instead of leaving
        // the header array null (which made FieldCount throw NullReferenceException).
        _headers = _reader.ReadLine()?.Split(Separator) ?? Array.Empty<string>();
    }

    /// <summary>Advances to the next line of the file.</summary>
    /// <returns><c>true</c> when a line was read; <c>false</c> at the end of the file.</returns>
    public bool Read()
    {
        string? line = _reader.ReadLine();
        if (line is null)
        {
            return false;
        }

        _currentRow = line.Split(Separator);
        return true;
    }

    /// <summary>Returns field <paramref name="i"/> of the current row as a string.</summary>
    /// <exception cref="InvalidOperationException">No row has been read yet.</exception>
    public object GetValue(int i)
    {
        if (_currentRow is null)
        {
            throw new InvalidOperationException("No current row; call Read() first.");
        }

        return _currentRow[i];
    }

    /// <summary>Number of columns in the header line (0 for an empty file).</summary>
    public int FieldCount => _headers.Length;

    /// <summary>Closes the underlying file; safe to call more than once.</summary>
    public void Close()
    {
        _reader.Dispose();
        IsClosed = true;
    }

    /// <summary>Releases the underlying file.</summary>
    public void Dispose() => Close();

    /// <summary><c>true</c> once <see cref="Close"/> or <see cref="Dispose"/> has been called.</summary>
    public bool IsClosed { get; private set; }

    public int Depth { get; }
    public int RecordsAffected { get; }

    // The members below are not implemented by this reader.
    public bool GetBoolean(int i) => throw new NotImplementedException();
    public byte GetByte(int i) => throw new NotImplementedException();
    public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length) => throw new NotImplementedException();
    public char GetChar(int i) => throw new NotImplementedException();
    public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length) => throw new NotImplementedException();
    public IDataReader GetData(int i) => throw new NotImplementedException();
    public string GetDataTypeName(int i) => throw new NotImplementedException();
    public DateTime GetDateTime(int i) => throw new NotImplementedException();
    public decimal GetDecimal(int i) => throw new NotImplementedException();
    public double GetDouble(int i) => throw new NotImplementedException();
    public Type GetFieldType(int i) => throw new NotImplementedException();
    public float GetFloat(int i) => throw new NotImplementedException();
    public Guid GetGuid(int i) => throw new NotImplementedException();
    public short GetInt16(int i) => throw new NotImplementedException();
    public int GetInt32(int i) => throw new NotImplementedException();
    public long GetInt64(int i) => throw new NotImplementedException();
    public string GetName(int i) => throw new NotImplementedException();
    public int GetOrdinal(string name) => throw new NotImplementedException();
    public string GetString(int i) => throw new NotImplementedException();
    public int GetValues(object[] values) => throw new NotImplementedException();
    public bool IsDBNull(int i) => throw new NotImplementedException();
    public object this[int i] => throw new NotImplementedException();
    public object this[string name] => throw new NotImplementedException();
    public DataTable? GetSchemaTable() => throw new NotImplementedException();
    public bool NextResult() => throw new NotImplementedException();
}
Full example download from here
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 565
/// <summary>
/// Producer: enumerates every file below <paramref name="path"/> (all subdirectories included)
/// and adds each file name to <paramref name="fileQueue"/>. The method itself does not mark
/// the collection as complete for adding.
/// </summary>
/// <param name="fileQueue">Queue that receives the file names.</param>
/// <param name="path">Root directory to search.</param>
private void SearchForAllFilesAndPutThemInQueue(BlockingCollection<string> fileQueue, string path)
{
    var discoveredFiles = Directory.EnumerateFiles(path, "*.*", SearchOption.AllDirectories);
    using var cursor = discoveredFiles.GetEnumerator();
    while (cursor.MoveNext())
    {
        fileQueue.Add(cursor.Current);
    }
}
Then I read the file names from the queue and do something with them (consumer):
/// <summary>
/// Consumer: takes file names from <paramref name="fileQueue"/> until it is completed, runs the
/// GPS extraction command for each file and, when the command produced a
/// <c>LatLngModel</c>, adds it together with the file name to <paramref name="gpsInfoQueue"/>.
/// </summary>
/// <param name="fileQueue">Queue of file names to process.</param>
/// <param name="gpsInfoQueue">Queue that receives the (GPS info, file name) pairs.</param>
/// <exception cref="ArgumentNullException">Either queue is <c>null</c>.</exception>
private void ReadFileNamesFromQueue
(
    BlockingCollection<string>? fileQueue
    , BlockingCollection<(LatLngModel, string)>? gpsInfoQueue
)
{
    if (fileQueue is null) throw new ArgumentNullException(nameof(fileQueue));
    if (gpsInfoQueue is null) throw new ArgumentNullException(nameof(gpsInfoQueue));
    foreach (string file in fileQueue.GetConsumingEnumerable())
    {
        try
        {
            ExtractGpsInfoFromImageCommand extractGpsInfoFromImageCommand = new ExtractGpsInfoFromImageCommand();
            extractGpsInfoFromImageCommand.ImageFileNameToReadGpsFrom = file;
            extractGpsInfoFromImage.Execute(extractGpsInfoFromImageCommand);
            if (extractGpsInfoFromImageCommand.LatLngModel is not null)
            {
                gpsInfoQueue.Add((extractGpsInfoFromImageCommand.LatLngModel, file));
            }
        }
        catch (Exception ex)
        {
            // Best effort: a failure on one file is logged and the loop moves on to the next.
            // Exception.Message is a property, not a method.
            Debug.WriteLine(ex.Message);
        }
    }
}
Here you can download my example. It is a little bit complicated: in the first queue I collect the list of files, in the second I try to extract the GPS info, and from the third queue I save the file name and longitude / latitude to a CSV file.
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 570
A task returned by Task.Run() really is saying "I want you to execute this code separately"; the exact thread on which that code executes depends on a number of factors. So, here is my first method, with async. First install Microsoft.Data.SqlClient (here); notice that the old System.Data.SqlClient is deprecated. Now the method:
// Start the database read through Task.Run and hand the resulting task back to the caller.
return Task.Run(() => ThreadReadFromDatabaseAsync(connectionString, sql));
/// <summary>
/// Opens a connection, executes <paramref name="sql"/> and reads every row, showing the running
/// record number through <c>UpdateUi</c>. Errors are shown on the error label and a "done"
/// message is always written at the end. Rows are read with the synchronous
/// <c>reader.Read()</c> rather than <c>ReadAsync</c>.
/// </summary>
/// <param name="connectionString">Connection string of the database to read from.</param>
/// <param name="sql">Query to execute.</param>
private async Task ThreadReadFromDatabaseAsync(string connectionString, string sql)
{
    DateTime startedAt = DateTime.Now;
    try
    {
        await using var connection = new SqlConnection(connectionString);
        await connection.OpenAsync();

        await using var command = new SqlCommand(sql, connection) { CommandTimeout = 3600 };
        await using var reader = await command.ExecuteReaderAsync();

        for (int recordIndex = 0; reader.Read(); recordIndex++)
        {
            UpdateUi.Execute($"{startedAt} Reading using tasks record {recordIndex}", LblRecordCount, Form);
            var rowValues = new object[reader.FieldCount];
            reader.GetValues(rowValues);
        }
    }
    catch (Exception error)
    {
        UpdateUi.Execute($"Error in task execution: {error.Message}", LblError, Form);
    }
    finally
    {
        UpdateUi.Execute($"{DateTime.Now} Done with task execution", LblStatus, Form);
    }
}
Notice the line:
while (reader.Read())
I am not using ReadAsync there. The problem is that ReadAsync gets stuck reading from the DB after a few hundred records; this looks like the bug described here. Now another example, with a thread:
// Run the database read on its own named thread, flagged as a background thread.
Thread thread = new Thread(() => ThreadReadFromDatabase(connectionString, sql));
thread.Name = "ThreadReadFromDatabase";
thread.IsBackground = true;
thread.Start();
/// <summary>
/// Synchronous counterpart of the task-based reader: opens a connection, executes
/// <paramref name="sql"/> and reads every row, showing the running record number through
/// <c>UpdateUi</c>. Errors are shown on the error label and a "done" message is always
/// written at the end.
/// </summary>
/// <param name="connectionString">Connection string of the database to read from.</param>
/// <param name="sql">Query to execute.</param>
private void ThreadReadFromDatabase(string connectionString, string sql)
{
    DateTime startedAt = DateTime.Now;
    try
    {
        using var connection = new SqlConnection(connectionString);
        connection.Open();

        using var command = new SqlCommand(sql, connection) { CommandTimeout = 3600 };
        using var reader = command.ExecuteReader();

        for (int recordIndex = 0; reader.Read(); recordIndex++)
        {
            UpdateUi.Execute($"{startedAt} Reading using threads record {recordIndex}", LblRecordCount, Form);
            var rowValues = new object[reader.FieldCount];
            reader.GetValues(rowValues);
        }
    }
    catch (Exception error)
    {
        UpdateUi.Execute($"Error in thread execution: {error.Message}", LblError, Form);
    }
    finally
    {
        UpdateUi.Execute($"{DateTime.Now} Done with thread execution", LblStatus, Form);
    }
}
Full example download from here
UPDATE 2025-03-12: Avoid using Task.Run for long-running work that blocks the thread
Also from the book Concurrency in C# Cookbook by Stephen Cleary:
As soon as you type new Thread(), it’s over; your project already has legacy code
Also check this Stack Overflow answer.