In the consumer-producer pattern, I want to have a single consumer under which I can define multiple optional tasks. I will store these tasks in a dictionary and execute them later within the consumer. To implement this example, I will use the command/handler architecture described by Steven van Deursen together with channels (System.Threading.Channels).
In short, my consumer looks like this:
/// <summary>
/// Single channel consumer: for every item read from the command's channel reader it runs
/// each configured task handler with the command object paired to that handler.
/// </summary>
/// <param name="listOfTasksToExecuteInReadAllAsync">
/// Task handlers to run for each item, each mapped to the command instance passed to it.
/// </param>
public class ReadAllHandlerAsync(
    Dictionary<IListOfTasksToExecuteInReader, IListOfTasksToExecuteInReaderCommand> listOfTasksToExecuteInReadAllAsync)
    : IListOfTasksToExecute
{
    /// <summary>
    /// Reads items from the channel until it completes and executes every configured task per item.
    /// Does nothing when the command carries no reader.
    /// </summary>
    /// <param name="command">Command that supplies the channel reader to consume.</param>
    /// <remarks>
    /// When <paramref name="command"/> is a <c>ReadAllAsyncHandlerCommand</c>, a failing task is
    /// recorded in its <c>Exceptions</c> queue and processing continues with the next task.
    /// For any other command type the task's exception propagates to the caller unchanged
    /// (previously a hard cast replaced it with an <see cref="InvalidCastException"/>).
    /// </remarks>
    public async Task Execute(IListOfTasksToExecuteCommand command)
    {
        var reader = command.MyChannelReader;
        if (reader is null)
        {
            return;
        }

        await foreach (var latLngFileName in reader.ReadAllAsync())
        {
            foreach (var (task, taskCommand) in listOfTasksToExecuteInReadAllAsync)
            {
                try
                {
                    // The same command instance is reused for every item; only its payload changes.
                    taskCommand.LatLngFileName = latLngFileName;
                    await task.Execute(taskCommand);
                }
                // The filter only catches when there is a queue to record into, so the
                // original exception is never masked by a failed cast.
                catch (Exception ex) when (command is ReadAllAsyncHandlerCommand handlerCommand)
                {
                    handlerCommand.Exceptions.Enqueue(ex);
                }
            }
        }
    }
}
Notice the constructor parameter:
Dictionary<IListOfTasksToExecuteInReader, IListOfTasksToExecuteInReaderCommand> listOfTasksToExecuteInReadAllAsync
Then I created a wrapper around my Parallel.ForEachAsync. Since, theoretically speaking, I could have more than one consumer, the wrapper also holds a list of tasks that it starts before Parallel.ForEachAsync begins, and this is where I hand over the channel:
/// <summary>
/// Starts the configured tasks, then runs the wrapped <c>MyParallelForEachAsync</c> handler,
/// and finally waits for all started tasks to finish.
/// </summary>
/// <param name="myParallelForEachAsync">Handler that is executed after the tasks have been started.</param>
/// <param name="listOfTasksToExecuteBeforeStartForEach">
/// Tasks to start up front, each mapped to the command instance passed to it.
/// </param>
public class MyParallelForEachAsyncWrapper(MyParallelForEachAsync myParallelForEachAsync
    , Dictionary<IListOfTasksToExecute, IListOfTasksToExecuteCommand> listOfTasksToExecuteBeforeStartForEach) : ICommandHandlerAsync<MyParallelForEachAsyncWrapperCommand>
{
    /// <summary>
    /// Hands the info channel to every configured task, starts them without awaiting,
    /// runs the wrapped handler, and awaits the started tasks in all cases.
    /// </summary>
    /// <param name="command">Supplies the folder name, the channels and the exception queue.</param>
    /// <remarks>
    /// Exceptions from starting the tasks or from the wrapped handler are enqueued on
    /// <c>command.Exceptions</c>. Exceptions surfaced by the final <c>Task.WhenAll</c> are not
    /// caught here. NOTE(review): if a started task never completes, the final await never returns.
    /// </remarks>
    public async Task Execute(MyParallelForEachAsyncWrapperCommand command)
    {
        var startedTasks = new List<Task>();
        try
        {
            foreach (var (handler, handlerCommand) in listOfTasksToExecuteBeforeStartForEach)
            {
                if (command.MyInfoChannel is not null)
                {
                    handlerCommand.MyInfoChannelReader = command.MyInfoChannel;
                }

                // Started but deliberately not awaited yet; awaited together in the finally block.
                startedTasks.Add(handler.Execute(handlerCommand));
            }

            await myParallelForEachAsync.Execute(new MyParallelForEachAsyncCommand
            {
                FolderName = command.FolderName,
                MyChannel = command.MyChannel
            });
        }
        catch (Exception failure)
        {
            command.Exceptions.Enqueue(failure);
        }
        finally
        {
            await Task.WhenAll(startedTasks);
        }
    }
}
Notice how I am handing over the channel:
if (command.MyInfoChannel != null)
{
taskToExecuteBeforeStartForEach.Value.MyInfoChannelReader = command.MyInfoChannel;
}
and
var myParallelForEachAsyncCommand = new MyParallelForEachAsyncCommand
{
FolderName = command.FolderName
, MyChannel = command.MyChannel
};
await myParallelForEachAsync.Execute(myParallelForEachAsyncCommand);
That's why my interface looks like this:
/// <summary>
/// Command contract for a task that consumes <c>LatLngFileNameModel</c> items from a channel.
/// </summary>
public interface IListOfTasksToExecuteCommand
{
    /// <summary>
    /// Reader side of the channel to consume from; <c>null</c> when no channel has been assigned.
    /// </summary>
    ChannelReader<LatLngFileNameModel>? GpsInfoChannelReader { get; set; }
}
At the end, my Parallel.ForEachAsync method will look like this:
/// <summary>
/// Producer: enumerates the files of a folder in parallel and writes the GPS information
/// extracted from every image file to the command's channel.
/// </summary>
public class MyParallelForEachAsync : ICommandHandlerAsync<MyParallelForEachAsyncCommand>
{
    // Never filled in the code shown here; presumably populated by helpers declared elsewhere — verify.
    private readonly ConcurrentQueue<Exception> _exceptions = new();

    /// <summary>
    /// Processes every image file found under <c>command.FolderName</c> and marks the channel
    /// as complete afterwards, whether or not processing succeeded.
    /// </summary>
    /// <param name="command">Supplies the folder to scan and the optional channel to write to.</param>
    /// <exception cref="DirectoryNotFoundException">The folder does not exist.</exception>
    /// <exception cref="AggregateException">Exceptions were collected in the internal queue.</exception>
    public async Task Execute(MyParallelForEachAsyncCommand command)
    {
        try
        {
            if (!Directory.Exists(command.FolderName))
            {
                throw new DirectoryNotFoundException($"Directory {command.FolderName} not found.");
            }

            // Case-insensitive set, so extensions need no lower-casing before the lookup
            // (culture-sensitive ToLower() can even break the match, e.g. ".GIF" under a Turkish culture).
            var imageExtensions = new HashSet<string>(StringComparer.OrdinalIgnoreCase)
            {
                ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"
            };

            await Parallel.ForEachAsync(
                EnumerateFilesSafe(command.FolderName), async (imageFileName, ct) =>
                {
                    if (!imageExtensions.Contains(Path.GetExtension(imageFileName)))
                    {
                        return;
                    }

                    // Without a channel there is nowhere to publish to, so extraction is skipped.
                    if (command.GpsInfoChannel is null)
                    {
                        return;
                    }

                    var extractGpsInfoFromImageCommand = new ExtractGpsInfoFromImageCommand
                    {
                        ImageFileNameToReadGpsFrom = imageFileName
                    };
                    await command.GpsInfoChannel.Writer.WriteAsync(
                        ExtractGpsInfoFromImage(extractGpsInfoFromImageCommand), ct);
                });
        }
        finally
        {
            // Always signal completion: a reader looping over ReadAllAsync() would otherwise wait
            // forever when the folder is missing or the parallel loop fails.
            // TryComplete does not throw if the channel was already completed, so it cannot
            // mask the original exception.
            command.GpsInfoChannel?.Writer.TryComplete();
        }

        if (!_exceptions.IsEmpty)
        {
            throw new AggregateException("Error in der Parallel.ForEachAsync", _exceptions);
        }
    }
}