There are more types of channels, which can be viewed in terms of two aspects: shared channels and buffered channels.
Consider the following system, in which a Farmer sends work to a Worker, which sends its results on to a Collector:

    One2OneChannel<Work> workChannel;
    One2OneChannel<Results> resultsChannel;

    Run(InParallel
        ( new Farmer( workChannel.writer() ) )
        ( new Worker( workChannel.reader() , resultsChannel.writer() ) )
        ( new Collector( resultsChannel.reader() ) )
    );

This system is functioning as it should -- but you wish to add another Worker or two so that multiple instances of work can be done in parallel. One way to do this would be to re-engineer the existing processes, to have multiple work/results channels, and have a manager process that routes the work appropriately.
However, shared channels provide a simple way of accomplishing your aim:

    One2AnyChannel<Work> workChannel;
    Any2OneChannel<Results> resultsChannel;

    Run(InParallel
        ( new Farmer( workChannel.writer() ) )
        ( new Worker( workChannel.reader() , resultsChannel.writer() ) )
        ( new Worker( workChannel.reader() , resultsChannel.writer() ) )
        ( new Worker( workChannel.reader() , resultsChannel.writer() ) )
        ( new Collector( resultsChannel.reader() ) )
    );

Note that the behaviour of shared channels is still one writer communicating to one reader -- it is just that multiple writers and/or multiple readers can contend on the channel. Specifically, these channels are not broadcast channels.
Imagine that on a csp::Any2AnyChannel, one writer attempts to write the value 1. A second writer then attempts to write the value 2. A third writer tries to write the value 3. All these writers will wait for a reader. When a reader then tries to read from the channel, it will read the value 1 (and the first writer will finish its communication). When a second reader tries to read from the channel, it will read the value 2 (and the second writer will finish its communication). The third writer remains waiting for a reader. The first reader never sees the values 2 or 3, and the second reader never sees the values 1 or 3. Each writer and reader are paired up, and data is never shared between multiple readers.
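The pairing described above can be sketched with a small single-threaded model in standard C++. This is only an illustration of the behaviour, not the C++CSP2 implementation: the class SharedChannelModel and its member names are hypothetical, and the model assumes that waiting writers are served in arrival order, as in the example.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

// Hypothetical single-threaded model of a shared (any-to-any) channel.
// It is NOT the C++CSP2 implementation; it only tracks which waiting
// writer gets paired with which reader.
class SharedChannelModel {
public:
    // A writer arrives and waits with its value until a reader pairs with it.
    void writerArrives(int writerId, int value) {
        waiting_.push_back(std::make_pair(writerId, value));
    }

    // A reader arrives and is paired with the longest-waiting writer, whose
    // communication then completes. Returns false where a real reader would
    // have to wait because no writer is present.
    bool readerArrives(int& writerId, int& value) {
        if (waiting_.empty()) {
            return false;
        }
        writerId = waiting_.front().first;
        value = waiting_.front().second;
        waiting_.pop_front();  // this value is never seen by any other reader
        return true;
    }

    // Number of writers still waiting for a reader.
    std::size_t writersWaiting() const { return waiting_.size(); }

private:
    std::deque<std::pair<int, int> > waiting_;  // (writer id, value), oldest first
};
```

If three writers arrive with the values 1, 2 and 3 and two readers then arrive, the first reader is paired with the writer of 1, the second with the writer of 2, and one writer is left waiting. No value is delivered twice.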
So with reference to the farmer example, the csp::One2AnyChannel<Work> channel allows a packet of work to be sent out by the Farmer, which will be picked up by one of the Worker processes. The Farmer doesn't know which Worker will take it, and it does not care. Once a Worker has finished, it will send its results to the Collector. The results could come from any Worker to the Collector, but again the Collector does not care. So none of the processes have had to be changed -- only the channels.
There is only one complication with using shared channels. Channels with a shared reading-end (csp::One2AnyChannel and csp::Any2AnyChannel) cannot be used with an Alternative. This stems from implementation complexities/inefficiencies. Consider if two processes wanted to alt over a shared channel, while another process wanted to read from the channel. When a writer does write to the channel, which reader should be used? For this reason, alting over shared reading-end channels is disallowed, and hence they return a csp::Chanin from their reader() method, rather than a csp::AltChanin. If you really do want to alt over shared channels, consider attaching a csp::common::ExtId (or csp::common::Id) process to the shared channel, and alting over a One2OneChannel that the ExtId/Id process is writing to.
Buffered channels address a different concern. Consider the following pair of processes:

    One2OneChannel<int> c;

    Run(InParallel
        ( new IncreasingNumbers( c.writer() ) )
        ( new NumberPrinter( c.reader() ) )
    );

Synchronisation between these two processes is not really important -- the first process is trying to send out numbers as fast as it can, and the second process will print them when it receives them. At the moment what is happening between these two processes is that the IncreasingNumbers process will produce one value to write to the channel, and then it will have to wait until the NumberPrinter process has read it. This means that the processor(s) in your system will be continually switching back and forth between the two processes with each channel communication. We can improve this performance using buffers:

    FIFOBuffer<int>::Factory bufferFactory(16);
    BufferedOne2OneChannel<int> c(bufferFactory);

    Run(InParallel
        ( new IncreasingNumbers( c.writer() ) )
        ( new NumberPrinter( c.reader() ) )
    );

The effect of the above code is to introduce a size-16 FIFO buffer between the two processes. This means that instead of the continual switching between processes with each communication, it is likely that the IncreasingNumbers process will write multiple values to the buffer before being switched out, which will allow the NumberPrinter process to read some (or all) of these values before itself being switched out.
If the buffer fills up (i.e. it is holding 16 items) and then the IncreasingNumbers process tries to write to the buffer, it will be forced to wait until the NumberPrinter process has read some values and hence freed space in the buffer. Similarly, if the buffer is empty when the NumberPrinter process attempts to read from it, it will be made to wait until an item has been written to the buffer.
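The full and empty cases can be illustrated with a single-threaded model in standard C++. This is a sketch under stated assumptions rather than how csp::FIFOBuffer is implemented: BoundedFifoModel, tryWrite and tryRead are hypothetical names, and a return value of false stands in for the point at which a real process would be made to wait.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical single-threaded model of a fixed-capacity FIFO buffer.
// It is NOT the C++CSP2 implementation: where a real process would be
// made to wait, these functions simply return false.
class BoundedFifoModel {
public:
    explicit BoundedFifoModel(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);  // a zero-sized buffer could never accept a value
    }

    // Returns false where a real writer would have to wait (buffer full).
    bool tryWrite(int value) {
        if (items_.size() == capacity_) {
            return false;
        }
        items_.push_back(value);
        return true;
    }

    // Returns false where a real reader would have to wait (buffer empty).
    bool tryRead(int& value) {
        if (items_.empty()) {
            return false;
        }
        value = items_.front();  // oldest value is read first
        items_.pop_front();
        return true;
    }

    // Number of values currently held.
    std::size_t size() const { return items_.size(); }

private:
    std::size_t capacity_;
    std::deque<int> items_;
};
```

With a capacity of 16, the first sixteen writes succeed and the seventeenth is refused until one value has been read, after which a write succeeds again. A read on the empty model is refused in the same way.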
Thus, while a csp::FIFOBuffer allows buffering to take place on a channel, it does not entirely remove the synchronisation between the two processes -- when the buffer is full or empty, the writer or reader (respectively) will have to wait. One of these restrictions -- the buffer being full -- can be lifted by using a csp::InfiniteFIFOBuffer. As you might expect, this is a buffer without a maximum size. The writer will never have to wait when writing to such a channel, although of course if the buffer is empty the reader will still have to wait.
The use of infinite buffers is potentially dangerous. In our example, the IncreasingNumbers process is likely to be able to produce values faster than the NumberPrinter process can print them. If these two processes were connected by an infinitely-buffered channel, the buffer would likely grow forever (because more was being put in than taken out), which would eventually lead to all your RAM being used up by this buffer. Be very careful when using infinite buffers to ensure that they cannot grow indefinitely.
C++CSP2 offers a third type of buffer: csp::OverwritingBuffer. As the name suggests, this is a FIFO buffer that overwrites the oldest data when it becomes full. Imagine a process writing increasing integers to a size-four overwriting buffer (without the values being read just yet). The buffer would fill up with 1,2,3,4 (1 being the oldest value). When the process wrote another value, the contents of the buffer would be 2,3,4,5. Then 3,4,5,6. And so on. If a process were now to read from the buffer, it would read the value 3.
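The overwriting behaviour in this example can be reproduced with a short model in standard C++. As before, this is a hedged sketch and not the csp::OverwritingBuffer implementation: OverwritingBufferModel and its member names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical single-threaded model of an overwriting FIFO buffer.
// It is NOT the C++CSP2 implementation; it only mirrors the behaviour
// described in the text.
class OverwritingBufferModel {
public:
    explicit OverwritingBufferModel(std::size_t capacity) : capacity_(capacity) {
        assert(capacity > 0);  // a zero-sized buffer could hold nothing
    }

    // Writing never waits: when the buffer is full, the oldest value is dropped.
    void write(int value) {
        if (items_.size() == capacity_) {
            items_.pop_front();
        }
        items_.push_back(value);
    }

    // Returns false where a real reader would have to wait (buffer empty).
    bool tryRead(int& value) {
        if (items_.empty()) {
            return false;
        }
        value = items_.front();  // oldest surviving value
        items_.pop_front();
        return true;
    }

    // Snapshot of the contents, oldest value first.
    std::vector<int> contents() const {
        return std::vector<int>(items_.begin(), items_.end());
    }

private:
    std::size_t capacity_;
    std::deque<int> items_;
};
```

Writing the values 1 to 6 into a model of capacity four leaves 3,4,5,6 in the buffer, and the next read returns 3, matching the walkthrough above.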
All shared channels can be buffered, and all buffered channels can be shared.
This guide is continued in Guide part 4: Advanced Ways of Running Processes.