In the consumer-producer pattern, I want to have a single consumer under which I can define multiple optional tasks. I will store these tasks in a dictionary and execute them later within the consumer. To implement this example I will use the architecture from Steven van Deursen and channels.

In short, my consumer looks like this:

/// <summary>
/// Single consumer: reads every item from the command's channel reader and runs each
/// registered reader task against that item, one after the other.
/// </summary>
/// <param name="listOfTasksToExecuteInReadAllAsync">
/// Reader tasks paired with the command object each one is executed with.
/// The command's <c>LatLngFileName</c> is overwritten for every item read from the channel.
/// </param>
public class ReadAllHandlerAsync(
    Dictionary<IListOfTasksToExecuteInReader, IListOfTasksToExecuteInReaderCommand> listOfTasksToExecuteInReadAllAsync)
    : IListOfTasksToExecute
{
    /// <summary>
    /// Drains the channel reader of <paramref name="command"/> until the channel is completed.
    /// Returns immediately when no reader was handed over.
    /// </summary>
    /// <param name="command">Carries the channel reader to consume from.</param>
    public async Task Execute(IListOfTasksToExecuteCommand command)
    {
        // The interface exposes the reader as GpsInfoChannelReader.
        var reader = command.GpsInfoChannelReader;
        if (reader is null)
        {
            return;
        }

        await foreach (var latLngFileName in reader.ReadAllAsync())
        {
            foreach (var (handler, handlerCommand) in listOfTasksToExecuteInReadAllAsync)
            {
                try
                {
                    handlerCommand.LatLngFileName = latLngFileName;
                    await handler.Execute(handlerCommand);
                }
                // Only swallow the failure when there is a queue to record it in. A hard cast here
                // would throw InvalidCastException from inside the catch and hide the real error;
                // with the filter, the original exception propagates untouched instead.
                catch (Exception ex) when (command is ReadAllAsyncHandlerCommand readAllCommand)
                {
                    readAllCommand.Exceptions.Enqueue(ex);
                }
            }
        }
    }
}
Notice the constructor parameter:
Dictionary<IListOfTasksToExecuteInReader, IListOfTasksToExecuteInReaderCommand> listOfTasksToExecuteInReadAllAsync
Then I created a wrapper around my Parallel.ForEachAsync. Since, theoretically speaking, I could have more than one consumer, the wrapper also holds a list of tasks that it starts before Parallel.ForEachAsync runs, and this is where I hand over the channel:
/// <summary>
/// Starts the registered consumer tasks, then runs the producer (<c>MyParallelForEachAsync</c>)
/// and finally waits for the consumers to finish.
/// </summary>
/// <remarks>
/// NOTE(review): this snippet reads the channel from the wrapper command as both
/// <c>MyInfoChannel</c> and <c>MyChannel</c>; <c>MyParallelForEachAsyncWrapperCommand</c> is not shown.
/// Confirm both refer to the same channel, otherwise the consumers never receive any item.
/// </remarks>
/// <param name="myParallelForEachAsync">The producer that writes into the channel.</param>
/// <param name="listOfTasksToExecuteBeforeStartForEach">
/// Consumers paired with the command each one is executed with.
/// </param>
public class MyParallelForEachAsyncWrapper(MyParallelForEachAsync myParallelForEachAsync
    , Dictionary<IListOfTasksToExecute, IListOfTasksToExecuteCommand> listOfTasksToExecuteBeforeStartForEach) : ICommandHandlerAsync<MyParallelForEachAsyncWrapperCommand>
{
    /// <summary>
    /// Runs consumers and producer. A failure while starting the consumers or running the producer
    /// is recorded in <c>command.Exceptions</c>; a faulted consumer task surfaces through the final await.
    /// </summary>
    /// <param name="command">Carries the folder name, the channel and the exception queue.</param>
    public async Task Execute(MyParallelForEachAsyncWrapperCommand command)
    {
        var consumerTasks = new List<Task>(listOfTasksToExecuteBeforeStartForEach.Count);
        try
        {
            foreach (var (consumer, consumerCommand) in listOfTasksToExecuteBeforeStartForEach)
            {
                if (command.MyInfoChannel is not null)
                {
                    // The consumer command interface exposes the reader as GpsInfoChannelReader.
                    consumerCommand.GpsInfoChannelReader = command.MyInfoChannel;
                }

                // Started but not awaited: the consumers must run while the producer writes.
                consumerTasks.Add(consumer.Execute(consumerCommand));
            }

            var producerCommand = new MyParallelForEachAsyncCommand
            {
                FolderName = command.FolderName
                // The producer reads the channel from its command as GpsInfoChannel.
                , GpsInfoChannel = command.MyChannel
            };
            await myParallelForEachAsync.Execute(producerCommand);
        }
        catch (Exception e)
        {
            command.Exceptions.Enqueue(e);
        }
        finally
        {
            // If the producer failed before completing the channel (or never started), the consumers
            // would wait in ReadAllAsync forever and the await below would never return.
            // TryComplete is a no-op when the channel has already been completed.
            command.MyChannel?.Writer.TryComplete();

            await Task.WhenAll(consumerTasks);
        }
    }
}
Notice how I am handing over the channel:
// Hand the read side of the channel to the consumer; the command interface names it GpsInfoChannelReader.
if (command.MyInfoChannel != null)
{
	taskToExecuteBeforeStartForEach.Value.GpsInfoChannelReader = command.MyInfoChannel;
}
and
// Hand the channel to the producer; MyParallelForEachAsync reads it as command.GpsInfoChannel.
var myParallelForEachAsyncCommand = new MyParallelForEachAsyncCommand
{
	FolderName = command.FolderName
	, GpsInfoChannel = command.MyChannel
};
await myParallelForEachAsync.Execute(myParallelForEachAsyncCommand);
That's why my interface looks like this:
/// <summary>
/// Command passed to <c>IListOfTasksToExecute.Execute</c>.
/// </summary>
public interface IListOfTasksToExecuteCommand
{
    /// <summary>
    /// Read side of the channel that carries <c>LatLngFileNameModel</c> items.
    /// Declared nullable, so it may be <c>null</c> when no channel was handed over.
    /// </summary>
    ChannelReader<LatLngFileNameModel>? GpsInfoChannelReader { get; set; }
}
Finally, my Parallel.ForEachAsync handler looks like this:
/// <summary>
/// Producer: enumerates the files of a folder in parallel and, for every image file,
/// writes the extracted GPS information to the command's channel.
/// </summary>
/// <remarks>
/// <c>EnumerateFilesSafe</c> and <c>ExtractGpsInfoFromImage</c> are called here but are not shown in this snippet.
/// </remarks>
public class MyParallelForEachAsync : ICommandHandlerAsync<MyParallelForEachAsyncCommand>
{
    // The comparer is case-insensitive, so extensions never need to be lower-cased before the lookup.
    private static readonly HashSet<string> ImageExtensions = new(StringComparer.OrdinalIgnoreCase)
    {
        ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp"
    };

    // NOTE(review): nothing in the visible code enqueues into this queue; presumably a helper that is
    // not shown does. It is an instance field, so entries survive across Execute calls — confirm intended.
    private readonly ConcurrentQueue<Exception> _exceptions = new();

    /// <summary>
    /// Processes all image files below <c>command.FolderName</c> and completes the channel afterwards,
    /// whether the run succeeded or failed.
    /// </summary>
    /// <param name="command">Carries the folder name and the (optional) channel to write to.</param>
    /// <exception cref="DirectoryNotFoundException">The folder does not exist.</exception>
    /// <exception cref="AggregateException">Exceptions were collected during the run.</exception>
    public async Task Execute(MyParallelForEachAsyncCommand command)
    {
        try
        {
            if (!Directory.Exists(command.FolderName))
            {
                throw new DirectoryNotFoundException($"Directory {command.FolderName} not found.");
            }

            await Parallel.ForEachAsync(
                EnumerateFilesSafe(command.FolderName), async (imageFileName, ct) =>
                {
                    // No ToLower(): it is culture-sensitive and the set already ignores case.
                    if (!ImageExtensions.Contains(Path.GetExtension(imageFileName)))
                    {
                        return;
                    }

                    var extractGpsInfoFromImageCommand = new ExtractGpsInfoFromImageCommand
                    {
                        ImageFileNameToReadGpsFrom = imageFileName
                    };

                    if (command.GpsInfoChannel is not null)
                    {
                        await command.GpsInfoChannel.Writer.WriteAsync(
                            ExtractGpsInfoFromImage(extractGpsInfoFromImageCommand), ct);
                    }
                });

            if (!_exceptions.IsEmpty)
            {
                throw new AggregateException("Error in der Parallel.ForEachAsync", _exceptions);
            }
        }
        finally
        {
            // Always signal "no more items". Without this, a missing directory or a failing iteration
            // leaves the channel open and every consumer stuck in ReadAllAsync forever.
            // TryComplete (rather than Complete) does not throw if the channel is already completed.
            command.GpsInfoChannel?.Writer.TryComplete();
        }
    }
}