- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 1364
/// <summary>
/// Reads every item from the channel reader carried by the command and, for each
/// item, runs all configured reader tasks one after another.
/// </summary>
public class ReadAllHandlerAsync(
    Dictionary<IListOfTasksToExecuteInReader, IListOfTasksToExecuteInReaderCommand> listOfTasksToExecuteInReadAllAsync)
    : IListOfTasksToExecute
{
    /// <summary>
    /// Iterates the command's channel reader via <c>ReadAllAsync</c>; does nothing when
    /// the command carries no reader.
    /// </summary>
    /// <param name="command">
    /// Command that supplies the reader. When a task fails it is cast to
    /// <c>ReadAllAsyncHandlerCommand</c> so the exception can be queued on it.
    /// </param>
    public async Task Execute(IListOfTasksToExecuteCommand command)
    {
        var channelReader = command.MyChannelReader;
        if (channelReader is null)
        {
            return;
        }

        await foreach (var item in channelReader.ReadAllAsync())
        {
            foreach (var entry in listOfTasksToExecuteInReadAllAsync)
            {
                var handler = entry.Key;
                var handlerCommand = entry.Value;
                try
                {
                    // The same command instance is reused for every item; only the payload changes.
                    handlerCommand.LatLngFileName = item;
                    await handler.Execute(handlerCommand);
                }
                catch (Exception ex)
                {
                    // A failing task must not stop the remaining tasks or items: record and continue.
                    var failureSink = (ReadAllAsyncHandlerCommand)command;
                    failureSink.Exceptions.Enqueue(ex);
                }
            }
        }
    }
}
Notice the method signature:
Dictionary<IListOfTasksToExecuteInReader, IListOfTasksToExecuteInReaderCommand> listOfTasksToExecuteInReadAllAsync
Then I created a wrapper around my Parallel.ForEachAsync. Since, theoretically speaking, I could have more than one consumer, the wrapper also holds a list of tasks that it starts before Parallel.ForEachAsync, and this is where I hand over the channel:
/// <summary>
/// Starts the configured "before for-each" tasks, then runs
/// <see cref="MyParallelForEachAsync"/> and finally waits for the started tasks.
/// </summary>
public class MyParallelForEachAsyncWrapper(MyParallelForEachAsync myParallelForEachAsync
    , Dictionary<IListOfTasksToExecute, IListOfTasksToExecuteCommand> listOfTasksToExecuteBeforeStartForEach) : ICommandHandlerAsync<MyParallelForEachAsyncWrapperCommand>
{
    /// <summary>
    /// Hands the channel over to every pre-start task, launches those tasks without
    /// awaiting them, and then executes the parallel for-each.
    /// </summary>
    /// <param name="command">
    /// Supplies the folder name and the channels; an exception caught while starting
    /// the tasks or running the for-each is queued on <c>command.Exceptions</c>.
    /// </param>
    public async Task Execute(MyParallelForEachAsyncWrapperCommand command)
    {
        var startedTasks = new List<Task>();
        try
        {
            foreach (var entry in listOfTasksToExecuteBeforeStartForEach)
            {
                var handler = entry.Key;
                var handlerCommand = entry.Value;

                if (command.MyInfoChannel != null)
                {
                    handlerCommand.MyInfoChannelReader = command.MyInfoChannel;
                }

                // Started but deliberately not awaited here; the tasks are awaited in the finally block.
                startedTasks.Add(handler.Execute(handlerCommand));
            }

            await myParallelForEachAsync.Execute(new MyParallelForEachAsyncCommand
            {
                FolderName = command.FolderName
                , MyChannel = command.MyChannel
            });
        }
        catch (Exception e)
        {
            command.Exceptions.Enqueue(e);
        }
        finally
        {
            // Runs on both the success and the failure path.
            await Task.WhenAll(startedTasks);
        }
    }
}
Notice how I am handing over the channel:
// Excerpt: give the pre-start task's command the wrapper command's info channel, but only when one was supplied.
if (command.MyInfoChannel != null)
{
taskToExecuteBeforeStartForEach.Value.MyInfoChannelReader = command.MyInfoChannel;
}
and
// Excerpt: build the for-each command from the wrapper command (folder name and channel) and run it.
var myParallelForEachAsyncCommand = new MyParallelForEachAsyncCommand
{
FolderName = command.FolderName
, MyChannel = command.MyChannel
};
await myParallelForEachAsync.Execute(myParallelForEachAsyncCommand);
That's why my interface looks like this:
/// <summary>
/// Command contract that exposes an optional channel reader of
/// <see cref="LatLngFileNameModel"/> items.
/// </summary>
public interface IListOfTasksToExecuteCommand
{
/// <summary>Channel reader of <see cref="LatLngFileNameModel"/> items; declared nullable, so it may be unset.</summary>
ChannelReader<LatLngFileNameModel>? GpsInfoChannelReader { get; set; }
}
At the end, my Parallel.ForEachAsync method will look like this:
/// <summary>
/// Enumerates the files of a folder in parallel and, for every image file, writes the
/// result of <c>ExtractGpsInfoFromImage</c> to the channel supplied on the command.
/// </summary>
/// <remarks>
/// <c>EnumerateFilesSafe</c> and <c>ExtractGpsInfoFromImage</c> are members that are not
/// part of this excerpt.
/// </remarks>
public class MyParallelForEachAsync : ICommandHandlerAsync<MyParallelForEachAsyncCommand>
{
    // The comparer already ignores case, so extensions are looked up as-is. Lower-casing
    // them first is redundant and culture-sensitive (e.g. ".GIF" under a Turkish culture).
    private static readonly HashSet<string> ImageExtensions = new(StringComparer.OrdinalIgnoreCase)
    {
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"
    };

    private readonly ConcurrentQueue<Exception> _exceptions = new();

    /// <summary>
    /// Processes all image files found under <c>command.FolderName</c>.
    /// </summary>
    /// <param name="command">Supplies the folder name and the optional GPS info channel.</param>
    /// <exception cref="DirectoryNotFoundException">The folder does not exist.</exception>
    /// <exception cref="AggregateException">Exceptions were collected while processing.</exception>
    public async Task Execute(MyParallelForEachAsyncCommand command)
    {
        try
        {
            if (!Directory.Exists(command.FolderName))
            {
                throw new DirectoryNotFoundException($"Directory {command.FolderName} not found.");
            }

            await Parallel.ForEachAsync(
                EnumerateFilesSafe(command.FolderName), async (imageFileName, ct) =>
                {
                    if (!ImageExtensions.Contains(Path.GetExtension(imageFileName)))
                    {
                        return;
                    }

                    var extractGpsInfoFromImageCommand = new ExtractGpsInfoFromImageCommand
                    {
                        ImageFileNameToReadGpsFrom = imageFileName
                    };
                    if (command.GpsInfoChannel != null)
                    {
                        await command.GpsInfoChannel.Writer.WriteAsync(
                            ExtractGpsInfoFromImage(extractGpsInfoFromImageCommand), ct);
                    }
                });

            if (!_exceptions.IsEmpty)
            {
                throw new AggregateException("Error in der Parallel.ForEachAsync", _exceptions);
            }
        }
        finally
        {
            // Complete the writer on every exit path. If it stayed open after a failure,
            // readers iterating the channel would never see the end of the stream.
            // TryComplete is used so an already-completed channel cannot mask the
            // exception that is in flight.
            command.GpsInfoChannel?.Writer.TryComplete();
        }
    }
}
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 928
using System;
using System.Data.SqlClient;
using System.Windows.Forms;
namespace GetDbDataTypes
{
    /// <summary>
    /// Form that displays the schema table of a query result in a grid.
    /// </summary>
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Opens a connection, executes the query and binds the reader's schema table
        /// to <c>dataGridView1</c>.
        /// </summary>
        private void btnStart_Click(object sender, EventArgs e)
        {
            const string connectionString = "Server=myServer;Database=myDb;Integrated Security=True";
            const string query = "SELECT top 1 * FROM [myTable]";

            using (var connection = new SqlConnection(connectionString))
            {
                connection.Open();

                using (var command = new SqlCommand(query, connection) { CommandTimeout = 3600 })
                using (var reader = command.ExecuteReader())
                {
                    // The schema table is fetched while the reader is still open.
                    dataGridView1.DataSource = reader.GetSchemaTable();
                }
            }
        }
    }
}
Example download from here
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 1389
// Start one background task per configured duration, wait for all of them, then notify the user.
_cts = new System.Threading.CancellationTokenSource();
int[] durationsInSeconds = [5, 7, 10, 1, 3];
var runningTasks = new List<Task>();
foreach (var seconds in durationsInSeconds)
{
    // The token is only passed to Task.Run; DoSomethingLong itself receives no token.
    runningTasks.Add(Task.Run(() => DoSomethingLong(seconds), _cts.Token));
}
await Task.WhenAll(runningTasks);
MessageBox.Show("Done!");
/// <summary>
/// Blocks the calling thread for the given number of seconds.
/// </summary>
/// <param name="sleepSeconds">Duration in seconds; converted to milliseconds for <c>Thread.Sleep</c>.</param>
private void DoSomethingLong(int sleepSeconds)
{
    const int millisecondsPerSecond = 1000;
    int sleepMilliseconds = sleepSeconds * millisecondsPerSecond;
    Thread.Sleep(sleepMilliseconds);
}
- Details
- Written by: Stanko Milosev
- Category: C#
- Hits: 1096
/// <summary>
/// Entity consisting of an integer key and an optional file name.
/// </summary>
public class FileNamesTestEntity
{
    /// <summary>Key of the entity (marked with <c>[Key]</c>).</summary>
    [Key]
    public int Id { get; set; }

    /// <summary>File name; declared nullable, so it may be unset.</summary>
    public string? FileName { get; set; }
}
FileNamesTestDbContext:
/// <summary>
/// Database context that exposes a set of <see cref="FileNamesTestEntity"/>.
/// </summary>
public class FileNamesTestDbContext : DbContext
{
    /// <summary>Creates the context and forwards the options to the base <c>DbContext</c>.</summary>
    /// <param name="options">Options passed through unchanged to the base constructor.</param>
    public FileNamesTestDbContext(DbContextOptions<FileNamesTestDbContext> options)
        : base(options)
    {
    }

    /// <summary>Set of <see cref="FileNamesTestEntity"/> instances.</summary>
    public DbSet<FileNamesTestEntity> FileNamesTest { get; set; }
}
And the code:
using Microsoft.EntityFrameworkCore;
namespace UpdateOrInsertRecordInDbFromParallelForEachAsync
{
    /// <summary>
    /// Form that inserts one database record per file found under a folder, using
    /// <c>Parallel.ForEachAsync</c>, and reports progress to the UI.
    /// </summary>
    public partial class Form1 : Form
    {
        private CancellationTokenSource CancellationTokenSource { get; } = new();

        public Form1()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Enumerates all files below the folder in <c>textBox1</c>, stores each file name
        /// through its own <see cref="FileNamesTestDbContext"/> and reports progress every
        /// <c>progressStep</c> records.
        /// </summary>
        private async void btnStartIProgress_Click(object sender, EventArgs e)
        {
            const int progressStep = 100;
            int recordCount = 0;
            IProgress<int> recordCountProgress = new Progress<int>(NumberOfFilesProcessedIProgress);
            IProgress<string> fileNameProgress = new Progress<string>(FileProcessedIProgress);

            var options = new DbContextOptionsBuilder<FileNamesTestDbContext>()
                .UseSqlServer("Server=myServer;Database=MyDb;User Id=myUser;Password=myPass;Encrypt=True;TrustServerCertificate=True;")
                .Options;

            try
            {
                await Parallel.ForEachAsync(
                    Directory.EnumerateFiles(textBox1.Text, "*.*", SearchOption.AllDirectories),
                    new ParallelOptions
                    {
                        CancellationToken = CancellationTokenSource.Token
                    },
                    async (fileName, ct) =>
                    {
                        // One context per iteration: a context instance is created and disposed per file.
                        await using var dbContext = new FileNamesTestDbContext(options);
                        dbContext.FileNamesTest.Add(new FileNamesTestEntity { FileName = fileName });
                        await dbContext.SaveChangesAsync(ct);

                        // Keep the incremented value in a local. Writing it back to the shared
                        // counter is a second, non-atomic store that can overwrite a newer
                        // value from another iteration and lose counts.
                        int processed = Interlocked.Increment(ref recordCount);
                        if (processed % progressStep == 0)
                        {
                            fileNameProgress.Report(fileName);
                            recordCountProgress.Report(processed);
                        }
                    });
            }
            catch (OperationCanceledException)
            {
                // Raised when the Stop button cancels the token. This is an async void
                // handler, so the exception would otherwise have no caller to observe it.
                MessageBox.Show("Canceled!");
                return;
            }

            MessageBox.Show("Done!");
        }

        /// <summary>Shows the most recently reported file name.</summary>
        private void FileProcessedIProgress(string fileName)
        {
            lblIProgressUiFileName.Text = fileName;
        }

        /// <summary>Shows the most recently reported number of processed files.</summary>
        private void NumberOfFilesProcessedIProgress(int recordCount)
        {
            lblIProgressUiRecordCount.Text = $"Processed files: {recordCount}";
        }

        /// <summary>Requests cancellation of the running import.</summary>
        private void btnStopIProgress_Click(object sender, EventArgs e)
        {
            CancellationTokenSource.Cancel();
        }
    }
}
Example download from here