12.13
Last week I spent some time trying out the message-based concurrency library Retlang in an effort to speed up a portion of an application I’ve been working on. When picking up Retlang I found very few examples or explanations that were relevant to the current release (0.4.3.0).
In the beginning
Retlang is built around channels that messages can be published to. A channel has fibers subscribed to it, each with a delegate to a function that accepts a message as a parameter. So create a channel, subscribe a fiber to it and publish your messages to the channel, and the fiber will process them one after another, leaving your program free to do other things. Simple, yeah? Not one thread, mutex, monitor or lock in sight. Don’t believe me? How about an example?
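A minimal sketch of that flow might look like this. The class name BasicExample and the ManualResetEvent used to wait for the fiber are my own additions for illustration, and the namespaces are the ones I know from Retlang (Retlang.Channels and Retlang.Fibers), so check them against your version.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Retlang.Channels;
using Retlang.Fibers;

// Hypothetical helper class, not part of Retlang.
public static class BasicExample
{
    // Publishes two strings and returns them in the order the fiber received them.
    public static List<string> Run()
    {
        var received = new List<string>();
        using (var fiber = new ThreadFiber())
        using (var done = new ManualResetEvent(false))
        {
            fiber.Start();
            var channel = new Channel<string>();

            // The delegate runs on the fiber, one message at a time.
            channel.Subscribe(fiber, message =>
            {
                received.Add(message);
                if (received.Count == 2) done.Set();
            });

            // Publish returns straight away; the fiber does the work.
            channel.Publish("hello");
            channel.Publish("world");

            // Wait (up to 5 seconds) so the fiber is not disposed before it has run.
            done.WaitOne(5000);
        }
        return received;
    }
}
```

The wait at the end is only there so the sketch can return something; in a long-running application the fiber would simply stay alive and keep processing.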
Talk to me!
OK, so you might have noticed that these published messages are asynchronous: you get nothing back. There are two solutions. The first is to use the RequestReplyChannel instead of Channel, which will process your message and wait for a response with each call. That’s going to slow things down though, so instead we’ll publish the results on to another channel and pick them up afterwards. Here goes…
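// Needs: using System.Collections.Generic; using Retlang.Channels; using Retlang.Fibers;
using (var requestFiber = new ThreadFiber())
using (var responseFiber = new ThreadFiber())
{
    requestFiber.Start();
    responseFiber.Start();

    var requestChannel = new Channel<KeyValuePair<int, string>>();
    var responseChannel = new Channel<KeyValuePair<int, bool>>();

    // Request fiber: check each string and publish the answer.
    requestChannel.Subscribe(
        requestFiber,
        value => {
            responseChannel.Publish(
                new KeyValuePair<int, bool>(
                    value.Key, value.Value.Contains("world")
                )
            );
        }
    );

    // Response fiber: collate the answers.
    var results = new List<KeyValuePair<int, bool>>();
    responseChannel.Subscribe(
        responseFiber,
        value => {
            results.Add(value);
        }
    );

    requestChannel.Publish(new KeyValuePair<int, string>(1, "hello london"));
    requestChannel.Publish(new KeyValuePair<int, string>(2, "hello world"));

    // Leaving the using block disposes both fibers, so in real code
    // wait for the results before this point.
}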
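For comparison, the RequestReplyChannel route mentioned above might look roughly like this. It is a sketch, not a tested recipe: the member names (SendRequest, Request, SendReply, Receive) are as I recall them from the Retlang API and may differ in your release, and the RequestReplyExample class and the five second timeout are made up for illustration.

```csharp
using System;
using Retlang.Channels;
using Retlang.Fibers;

// Hypothetical helper class, not part of Retlang.
public static class RequestReplyExample
{
    // Asks a fiber whether the text contains "world" and blocks until it answers.
    public static bool ContainsWorld(string text)
    {
        using (var fiber = new ThreadFiber())
        {
            fiber.Start();
            var channel = new RequestReplyChannel<string, bool>();

            // The subscriber sees the request and sends its reply back to the caller.
            channel.Subscribe(fiber, request =>
                request.SendReply(request.Request.Contains("world")));

            // The caller blocks here, which is the cost the text warns about.
            var reply = channel.SendRequest(text);
            bool result;
            if (!reply.Receive(5000, out result))
                throw new TimeoutException("no reply within 5 seconds");
            return result;
        }
    }
}
```

Every call waits for its own answer, so this suits occasional lookups better than a stream of messages.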
You said concurrent
OK, so the previous example used one fiber to find strings that contained the word “world” and another to collate the results afterwards. It’s hardly concurrent and still feels a bit sequential. We can’t really add any concurrency to the collation of those responses, at least not without locking the results list, and that’s a very simple, fast operation anyway. What we can do is speed up the search for the word “world”, because there is no shared state. A Channel will let you subscribe as many fibers to it as you wish; the only problem is that every fiber will be called for each message. That isn’t what we want, as we’d end up with many results for each message. What we really want is for the first free fiber to process the next published message. Step in the QueueChannel, which does just that. So, the same example but with more “fiber”!
// Needs: using System.Collections.Generic; using Retlang.Channels; using Retlang.Fibers;
using (var requestFiber1 = new ThreadFiber())
using (var requestFiber2 = new ThreadFiber())
using (var responseFiber = new ThreadFiber())
{
    requestFiber1.Start();
    requestFiber2.Start();
    responseFiber.Start();

    // QueueChannel hands each message to only one of the subscribed fibers.
    var requestChannel = new QueueChannel<KeyValuePair<int, string>>();
    var responseChannel = new Channel<KeyValuePair<int, bool>>();

    // First worker fiber.
    requestChannel.Subscribe(
        requestFiber1,
        value => {
            responseChannel.Publish(
                new KeyValuePair<int, bool>(
                    value.Key, value.Value.Contains("world")
                )
            );
        }
    );

    // Second worker fiber, same handler.
    requestChannel.Subscribe(
        requestFiber2,
        value => {
            responseChannel.Publish(
                new KeyValuePair<int, bool>(
                    value.Key, value.Value.Contains("world")
                )
            );
        }
    );

    // Response fiber: collate the answers.
    var results = new List<KeyValuePair<int, bool>>();
    responseChannel.Subscribe(
        responseFiber,
        value => {
            results.Add(value);
        }
    );

    requestChannel.Publish(new KeyValuePair<int, string>(1, "hello london"));
    requestChannel.Publish(new KeyValuePair<int, string>(2, "hello world"));

    // Leaving the using block disposes the fibers, so in real code
    // wait for the results before this point.
}
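Since both request fibers run the same handler, the pattern stretches to any number of workers. Here is a sketch of that, assuming the Subscribe and Publish calls behave as in the example above. The WorkerPoolExample class, the workerCount parameter and the ManualResetEvent used to wait for all the results are my own additions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Retlang.Channels;
using Retlang.Fibers;

// Hypothetical helper class, not part of Retlang.
public static class WorkerPoolExample
{
    // Checks every input for "world" using workerCount fibers on one QueueChannel.
    // The key of each result is the index of the input it belongs to.
    public static List<KeyValuePair<int, bool>> Run(IList<string> inputs, int workerCount)
    {
        var results = new List<KeyValuePair<int, bool>>();
        if (inputs.Count == 0) return results;

        var requestChannel = new QueueChannel<KeyValuePair<int, string>>();
        var responseChannel = new Channel<KeyValuePair<int, bool>>();
        var workers = new List<IFiber>();

        using (var responseFiber = new ThreadFiber())
        using (var done = new ManualResetEvent(false))
        {
            try
            {
                // A single fiber collates, so the list needs no lock.
                responseFiber.Start();
                responseChannel.Subscribe(responseFiber, value =>
                {
                    results.Add(value);
                    if (results.Count == inputs.Count) done.Set();
                });

                // Each worker subscribes the same handler to the queue.
                for (var i = 0; i < workerCount; i++)
                {
                    var worker = new ThreadFiber();
                    workers.Add(worker);
                    worker.Start();
                    requestChannel.Subscribe(worker, value =>
                        responseChannel.Publish(new KeyValuePair<int, bool>(
                            value.Key, value.Value.Contains("world"))));
                }

                for (var i = 0; i < inputs.Count; i++)
                    requestChannel.Publish(new KeyValuePair<int, string>(i, inputs[i]));

                // Wait (up to 5 seconds) for every result before tearing down.
                done.WaitOne(5000);
            }
            finally
            {
                foreach (var worker in workers) worker.Dispose();
            }
        }
        return results;
    }
}
```

The results can arrive in any order because whichever fiber is free takes the next message, which is why each result carries its key.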
Those threads are a bit heavy
At the moment I’m using the ThreadFiber, which maps down to a Thread. So for every ThreadFiber we create a new thread, which is expensive. Instead we might want to use the PoolFiber, which uses the .NET ThreadPool to manage creation and reuse. It’s a very simple change: just replace the ThreadFibers with PoolFibers. The only difference between the two is that the PoolFiber will batch up a few messages to be processed on a single thread.
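As a sketch of that swap, here is the request/response example with PoolFibers in place of the ThreadFibers. The PoolFiberExample class and the ManualResetEvent that waits for both results are my own additions; the Retlang calls are the same ones used earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using Retlang.Channels;
using Retlang.Fibers;

// Hypothetical helper class, not part of Retlang.
public static class PoolFiberExample
{
    public static List<KeyValuePair<int, bool>> Run()
    {
        var results = new List<KeyValuePair<int, bool>>();

        // The only change from the ThreadFiber version is the fiber type.
        using (var requestFiber = new PoolFiber())
        using (var responseFiber = new PoolFiber())
        using (var done = new ManualResetEvent(false))
        {
            requestFiber.Start();
            responseFiber.Start();

            var requestChannel = new Channel<KeyValuePair<int, string>>();
            var responseChannel = new Channel<KeyValuePair<int, bool>>();

            requestChannel.Subscribe(requestFiber, value =>
                responseChannel.Publish(new KeyValuePair<int, bool>(
                    value.Key, value.Value.Contains("world"))));

            responseChannel.Subscribe(responseFiber, value =>
            {
                results.Add(value);
                if (results.Count == 2) done.Set();
            });

            requestChannel.Publish(new KeyValuePair<int, string>(1, "hello london"));
            requestChannel.Publish(new KeyValuePair<int, string>(2, "hello world"));

            // Wait (up to 5 seconds) for both results before the fibers are disposed.
            done.WaitOne(5000);
        }
        return results;
    }
}
```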
Pause for breath
This seems like a good place to stop for now. There are a few more things to cover in more detail like batch execution and some other things like customising execution but those can wait till another day.
Updated
I’ve updated the last two examples to fix a type typo.
Your example produces this compiler error:
error CS1503: Argument '1': cannot convert from 'System.Collections.Generic.KeyValuePair<int,bool>' to 'System.Collections.Generic.KeyValuePair<int,string>'
for:
results.Add(value);
Good spot, the results KeyValuePair should have been bool, not string. Typo shame…