A scalable implementation of matrix multiplication in O'Caml - by Gerd Stolpmann, 2008-05-12
In order to tackle the problem we need a way to break down the algorithm so that the running processes don't have to synchronize with each other often. This is very easy for matrix multiplication (but may be more complicated for other problems): Every cell of the result matrix is the scalar product of one row of the first input matrix with one column of the second input matrix. Thus the cells can be computed independently, and one only has to take care that all processes have access to the input matrices, and that the processes do not interfere with each other when writing the results.
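To make the decomposition concrete, here is a minimal O'Caml sketch (not taken from the example code) of how a single result cell is computed; the name mat_mul_cell and the float array array representation are just illustrative assumptions:

let mat_mul_cell (a : float array array) (b : float array array) i j =
  (* One cell of the result: the scalar product of row i of [a]
     with column j of [b]. Each cell only reads the inputs, so all
     cells can be computed in parallel without synchronization. *)
  let n = Array.length b in          (* = number of columns of a *)
  let s = ref 0.0 in
  for k = 0 to n - 1 do
    s := !s +. a.(i).(k) *. b.(k).(j)
  done;
  !s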
The Netplex/SunRPC combination is quite powerful. Basically, a SunRPC server accepts TCP connections and executes the incoming RPC calls. With our tools we can choose whether we want a single process that handles all connections, or whether we want to create a new process for every connection. Obviously, the latter is the way to go for the hard computation work, because O'Caml can only take advantage of several cores by running the code in different processes. The single-process option is also useful, namely for controlling the computation, i.e. for synchronization. In this example we'll use both types of process management.
SunRPC is a quite aged standard for doing remote procedure calls. RPC is typed message passing: we can pass not only strings from one process to the other, but values of all the types of the interface definition language. In SunRPC we have ints, floats, strings, arrays, options, records ("structs"), and variants ("unions"). Furthermore, RPCs are usually "two-way" messages: every input message is answered with an output message.
The general idea is that the controller triggers the workers when a new multiplication is to be done, and that the workers are themselves responsible for getting the input data, and for getting the tasks to execute. In the SunRPC IDL this looks like:
program Controller {
    version V1 {
        void ping(void) = 0;

        dim get_dim(which) = 1;
        /* Workers can call this proc to get the dimension of the
           matrix */

        row get_row(which, int) = 2;
        /* Workers can call this proc to get a row of the matrix.
           The int is the row number, 0..rows-1 */

        jobs pull_jobs(int) = 3;
        /* The controller maintains a queue of jobs. This proc pulls
           a list of jobs from this queue. The int is the requested
           number. */

        void put_results(results) = 4;
        /* Put results into the result matrix. */
    } = 1;
} = 2;

program Worker {
    version V1 {
        void ping(void) = 0;

        void run(void) = 1;
        /* The controller calls this proc to initiate the action in
           the worker. When it returns, it is assumed that the worker
           is completely finished with everything. */
    } = 1;
} = 3;

(There is also a program Multiplier with program number 1 we'll show later.)
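To illustrate how a worker might drive this interface, here is a rough O'Caml sketch. It assumes client stubs generated by ocamlrpcgen from the IDL file; the module name Mm_proto_clnt, the helper compute_job, and the batch size of 10 are illustrative assumptions, not the actual example code:

let worker_loop client =
  (* Hypothetical worker main loop against the Controller interface.
     [client] is an Rpc_client.t connected to the controller. *)
  let rec loop () =
    (* Pull a batch of jobs from the controller's queue: *)
    let jobs = Mm_proto_clnt.Controller.V1.pull_jobs client 10 in
    if jobs <> [||] then begin
      Array.iter
        (fun job ->
           (* compute_job fetches the rows it needs via get_row,
              computes the scalar products, and returns the cells: *)
           let results = compute_job client job in
           Mm_proto_clnt.Controller.V1.put_results client results)
        jobs;
      loop ()
    end
    (* An empty batch means the queue is exhausted, and run() returns. *)
  in
  loop ()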
These are the procedures the two components expose. When a multiplication is to be done, the order of actions is as follows:

1. The controller calls run for every worker.
2. Each worker calls get_dim to learn the dimensions of the input matrices, and then repeatedly calls pull_jobs to get jobs from the controller's queue and get_row to fetch the input rows it needs.
3. For every finished job the worker calls put_results to store the computed cells in the result matrix.
4. When pull_jobs returns no more jobs, the worker returns from run, and the controller knows the multiplication is complete.
The Multiplier program is just another interface of the controller. It is the public interface used by the client of the multiplier:
program Multiplier {
    version V1 {
        void ping(void) = 0;

        void test_multiply(int, int, int) = 1;
        /* Creates test matrices with random values and multiplies
           them. Args are: (l_rows, r_cols, l_cols = r_rows) */
    } = 1;
} = 1;
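A client of the public interface then only needs to connect and call test_multiply. The following sketch assumes the create_client function that ocamlrpcgen puts into generated client modules; the module name Mm_proto_clnt and the port number are again illustrative assumptions:

let () =
  (* Hypothetical client: ask the server to square a random
     1000x1000 matrix. *)
  let client =
    Mm_proto_clnt.Multiplier.V1.create_client
      (Rpc_client.Inet ("localhost", 2020))   (* port: an assumption *)
      Rpc.Tcp in
  (* l_rows = 1000, r_cols = 1000, l_cols = r_rows = 1000: *)
  Mm_proto_clnt.Multiplier.V1.test_multiply client (1000, 1000, 1000);
  Rpc_client.shut_down client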
Of course, using a distributed approach creates a lot of coding overhead. Compare it with the direct implementation in simple.ml (also in this directory): there, the multiplication algorithm takes only 9 lines of code. Compared with that we had to develop a lot of additional stuff: 389 lines for the distributed implementation. This is the price we have to pay in terms of coding effort.
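simple.ml itself is not reproduced here, but a direct implementation of that size might look roughly like this (a sketch, assuming a float array array representation; not the actual file):

let mat_mul a b =
  (* Straightforward triple loop: c = a * b, where a is
     l_rows x l_cols and b is l_cols x r_cols. *)
  let l_rows = Array.length a
  and l_cols = Array.length b
  and r_cols = Array.length b.(0) in
  let c = Array.make_matrix l_rows r_cols 0.0 in
  for i = 0 to l_rows - 1 do
    for j = 0 to r_cols - 1 do
      for k = 0 to l_cols - 1 do
        c.(i).(j) <- c.(i).(j) +. a.(i).(k) *. b.(k).(j)
      done
    done
  done;
  c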
Netplex requires a configuration file, here mm_server.cfg. Basically, the file lists the components that are supposed to accept incoming connections. Here, it looks like:
netplex {
    ...
    service {
        name = "mm_controller";
        protocol { ... };
        processor {
            type = "mm_controller";
            worker { host = "localhost"; port = 2022 };
            worker { host = "localhost"; port = 2022 };
        };
        workload_manager { ... };
    };
    service {
        name = "mm_worker";
        protocol { ... };
        processor {
            type = "mm_worker";
            controller_host = "localhost";
            controller_port = 2021;
        };
        workload_manager { ... };
    };
}

We have omitted the boring parts ("..."). We configure the mm_controller so that it connects to two workers, both reachable over TCP at localhost:2022, and the mm_worker so that it talks to the controller over port localhost:2021. By including the same worker port twice in the list of workers, two independent connections are created, and because of the worker configuration two processes are started. (This is configured in the omitted workload_manager block.)
Netplex starts exactly the components it finds in the configuration file, even if more components are implemented in the code. This is quite useful, because the same binary can be used for different setups. In this example, we could run the multiplication server on host X with both controller and workers, and on host Y with only the workers.
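The main program of such a binary just registers all available factories and lets the configuration file decide which ones to start. The following is a rough sketch from memory of the Netplex API of that time; the factory values mm_controller_factory and mm_worker_factory are assumptions standing in for the example's actual factories:

let main () =
  (* Hypothetical main program: both components are compiled in,
     but only the ones mentioned in the config file are started. *)
  let (opt_list, cmdline_cfg) = Netplex_main.args () in
  Arg.parse
    opt_list
    (fun s -> raise (Arg.Bad ("Unknown arg: " ^ s)))
    "usage: mm_server [options]";
  Netplex_main.startup
    (Netplex_mp.mp ())               (* multi-processing parallelizer *)
    Netplex_log.logger_factories
    Netplex_workload.workload_manager_factories
    [ mm_controller_factory; mm_worker_factory ]
    cmdline_cfg

let () = main ()

Such a binary would then typically be started with something like ./mm_server -conf mm_server.cfg, the -conf option being contributed by Netplex_main.args.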
The test is to square an NxN matrix of random numbers, for N=1000, N=2000, and N=3000. This has been done with the non-distributed version of the multiplication (simple.ml), and with the distributed version and W workers, for W=1, W=2, and W=4.
What we expect is that for larger input matrices the relative cost of message passing drops: we have to transfer the input matrices only once for every worker, and the result matrix only once in total, but the computation time for the multiplication is O(N^3), compared to O(N^2) data volume for message passing.
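To put numbers on this: for N = 3000, each input matrix has N^2 = 9,000,000 floating-point entries to transfer per worker, while the multiplication performs about N^3 = 27,000,000,000 multiply-add operations. The ratio of computation to communication therefore grows linearly with N.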
Here are the runtimes, as a table:
Test             |    N | Runtime (seconds)
-----------------|------|------------------
Non-distributed  | 1000 |    38.61
Non-distributed  | 2000 |   359.765
Non-distributed  | 3000 |  1483.759
Distributed, W=1 | 1000 |    45.174
Distributed, W=1 | 2000 |   382.874
Distributed, W=1 | 3000 |  1847.225
Distributed, W=2 | 1000 |    24.407
Distributed, W=2 | 2000 |   206.426
Distributed, W=2 | 3000 |   849.089
Distributed, W=4 | 1000 |    20.842
Distributed, W=4 | 2000 |   144.718
Distributed, W=4 | 3000 |   576.02
Not bad, right? The time spent on message passing seems to be acceptable. If you compare the non-distributed time for N=3000 with the time of the distributed system with one worker, the overhead (1847.2 s vs. 1483.8 s, i.e. about 360 s) is approximately 25% of the non-distributed runtime.
For larger W the overhead for the initial transfer of the input matrices increases, because they have to be copied to every worker. The algorithm could be improved here: the controller process soon becomes the bottleneck, as it has to copy the matrices for every worker. Alternatively, the workers could spread the matrices among themselves in order to get a better degree of parallelization during the transfer phase.
As a last, fun test I started workers on three nodes so that in total 10 cores were available (I couldn't do more because I did not find more idle systems in Wink's cluster). I got a runtime of 349 seconds for N=3000. As the cores were not identical, this does not tell us much, except that we really can do it over the network, and that this multiplier really takes advantage of a cluster.