TPL Dataflow
Asynchronous Producer Consumer With A Bounded BufferBlock
Section titled “Asynchronous Producer Consumer With A Bounded BufferBlock”
// A bounded block applies back-pressure: once 1000 items are buffered,
// SendAsync does not complete until the consumer has made room.
var bufferBlock = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 1000 });

// The token is cancelled automatically after 10 seconds, which ends production.
var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;

var producerTask = Task.Run(async () =>
{
    var random = new Random();
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var value = random.Next();
            await bufferBlock.SendAsync(value, cancellationToken);
        }
    }
    catch (OperationCanceledException)
    {
        // The token fired while SendAsync was waiting for free capacity.
        // The loop condition already treats cancellation as a normal stop, so do the same here.
    }
    finally
    {
        // Mark the block as finished. Without this the consumer's OutputAvailableAsync
        // never returns false and Task.WhenAll below would wait forever.
        bufferBlock.Complete();
    }
});

var consumerTask = Task.Run(async () =>
{
    // OutputAvailableAsync yields false only after the block is completed and drained.
    while (await bufferBlock.OutputAvailableAsync())
    {
        var value = bufferBlock.Receive();
        Console.WriteLine(value);
    }
});

await Task.WhenAll(producerTask, consumerTask);
Posting to an ActionBlock and waiting for completion
Section titled “Posting to an ActionBlock and waiting for completion”
// Build a block whose action is asynchronous: resolve the host name and print its first address
var block = new ActionBlock<string>(async hostName =>
{
    IPAddress[] resolved = await Dns.GetHostAddressesAsync(hostName);
    Console.WriteLine(resolved[0]);
});

// Post each host name to the block's input queue for processing
foreach (var host in new[] { "google.com", "reddit.com", "stackoverflow.com" })
{
    block.Post(host);
}

// Tell the block to complete and stop accepting new items
block.Complete();

// Asynchronously wait until all posted items have finished processing
await block.Completion;
Linking blocks to create a pipeline
Section titled “Linking blocks to create a pipeline”
var httpClient = new HttpClient();

// First stage: accepts a URI and returns its contents as a string
var downloaderBlock = new TransformBlock<string, string>(async uri =>
{
    string body = await httpClient.GetStringAsync(uri);
    return body;
});

// Second stage: accepts the contents and prints them to the console
var printerBlock = new ActionBlock<string>(contents =>
{
    Console.WriteLine(contents);
});

// Link the stages into a pipeline; PropagateCompletion makes downloaderBlock
// complete printerBlock when it is itself completed
downloaderBlock.LinkTo(printerBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Post URLs to the first block, which passes their contents on to the second one
foreach (var url in new[] { "http://youtube.com", "http://github.com", "http://twitter.com" })
{
    downloaderBlock.Post(url);
}

// Completion will propagate to printerBlock
downloaderBlock.Complete();

// Only the last block in the pipeline needs to be awaited
await printerBlock.Completion;
Synchronous Producer/Consumer with BufferBlock
Section titled “Synchronous Producer/Consumer with BufferBlock”
public class Producer
{
    // One generator shared by every Producer instance. The parameterless constructor
    // chooses its own seed, so no explicit tick-based seed (and no narrowing cast) is needed.
    private static readonly Random random = new Random();

    /// <summary>Produces the value that will be posted to the buffer block.</summary>
    /// <returns>The generated value, which is also written to the console.</returns>
    public double Produce()
    {
        var value = random.NextDouble();
        Console.WriteLine($"Producing value: {value}");
        return value;
    }
}
public class Consumer
{
    /// <summary>Consumes a value received from the buffer block by writing it to the console.</summary>
    /// <param name="value">The value to consume.</param>
    public void Consume(double value)
    {
        Console.WriteLine($"Consuming value: {value}");
    }
}
class Program
{
    // Shared unbounded buffer that connects the producer task to the consumer task.
    private static BufferBlock<double> buffer = new BufferBlock<double>();

    static void Main(string[] args)
    {
        // Producer side: once per second, post a freshly produced value to the buffer block.
        var producerTask = Task.Run(async () =>
        {
            var source = new Producer();
            for (;;)
            {
                double produced = source.Produce();
                buffer.Post(produced);
                await Task.Delay(1000);
            }
        });

        // Consumer side: receive values from the buffer block and consume them.
        var consumerTask = Task.Run(() =>
        {
            var sink = new Consumer();
            for (;;)
            {
                double received = buffer.Receive();
                sink.Consume(received);
            }
        });

        // Both loops run indefinitely, so this blocks the main thread for the life of the program.
        Task.WaitAll(producerTask, consumerTask);
    }
}
Remarks
Section titled “Remarks”
Libraries Used in Examples
Section titled “Libraries Used in Examples”
System.Threading.Tasks.Dataflow
System.Threading.Tasks
System.Net.Http
System.Net
Difference between Post and SendAsync
Section titled “Difference between Post and SendAsync”
To add items to a block you can use either Post or SendAsync.
Post tries to add the item synchronously and returns a bool indicating whether it succeeded. It may fail when, for example, a block has reached its BoundedCapacity and has no room for new items yet. SendAsync, on the other hand, returns a Task<bool> that you can await. If the block cannot take the item yet, that task stays uncompleted and completes later: with a true result once the block has cleared room in its internal queue and accepted the item, or with a false result if the block is declining permanently (e.g. because the block has been completed or cancelled).