BLOG ON CAMLCITY.ORG: Ocamlnet 3
What's new in Ocamlnet 3: Synchronization primitives in Netplex - by Gerd Stolpmann, 2009-11-22
Netplex is a framework for creating network services. In this model,
the most important role of processes is to serve as containers for
protocol interpreters that accept and deal with network connections
("worker processes"). In the traditional multi-processing world (like
the inetd master server in Unix), this is taken to the
extent that each connection is served by its own process. Netplex,
fortunately, relaxes this 1:1 relationship - a process can handle more
than one connection, either serially or even concurrently. Netplex was
introduced in Ocamlnet 2, and has received a lot of attention since then. What
remained unanswered, however, is the question of how to deal with tasks
that are not immediately network-driven, but nevertheless have to run
in parallel with the existing processes. In this article, I present
a way to control a global timer for a network service, so one can
run a certain task periodically - as an example of how to use the new
synchronization primitives.
Before going into detail, let me clarify one important thing. These synchronization primitives are not designed to be fast. They are designed to be always available, and to behave well even under abnormal operating circumstances (like crashing processes). Usually, the operating system provides better implementations, but often these implementations require certain arrangements (like knowing global names, or certain process relationships). For many uses, it is more important to have an immediately available mechanism than to have the fastest one. Also, such a basic mechanism can be helpful for negotiating faster ones between processes.
In the Netplex process model one of the processes has a special role:
The master process is the first process, and the one that serves as
parent of all the other processes. The master process is not involved
in CPU-intensive tasks, and is usually very responsive to incoming
requests. Because of this, it is the ideal instance to perform helper
tasks. Ocamlnet 3 adds the concept of plugins to the controller object
that runs in the master process. Plugins are little RPC servers that
can be attached to the main server the controller object provides.
The synchronization primitives are implemented as such plugins. Before
a plugin can be used, it must be added to the controller,
as in

    ctrl # add_plugin Netplex_semaphore.plugin

This is best done early in the program. Fortunately, a good moment for doing so is provided as a service hook by Netplex. All these hooks are bundled together as a hook object, and this object configures some aspects of a Netplex network service:

    class type processor_hook =
    object
      method post_add_hook : socket_service -> controller -> unit
      method post_rm_hook : socket_service -> controller -> unit
      method pre_start_hook : socket_service -> controller -> container_id -> unit
      method post_start_hook : container -> unit
      method pre_finish_hook : container -> unit
      method post_finish_hook : socket_service -> controller -> container_id -> unit
      method receive_message : container -> string -> string array -> unit
      method receive_admin_message : container -> string -> string array -> unit
      method system_shutdown : unit -> unit
      method shutdown : unit -> unit
      method global_exception_handler : exn -> bool
    end

Such a hook object can always be passed as an additional parameter to
the protocol interpreter implementation (no matter which: http, the
various CGI variants, Sun RPC, ICE as provided by Hydro). The
add_plugin call is best done in the
post_add_hook hook which is executed once after the
socket service is configured in the controller. We will also use
other hooks for our timer - more radically, we will use only hooks
to attach the timer to the network service. This means that we don't
assume much about the service itself, and because of this the solution
is quite generic.
A semaphore is a resource counter. The counter can be atomically increased and decreased, but can never fall below zero. An attempt to do so either fails immediately, or the execution of the process blocks until the counter is positive again. The idea behind that can be illustrated by a queue supplying a number of processes with tasks. The counter reflects the length of the queue. When a process needs another task, it checks the counter, and tries to decrease it. Either this works immediately, which means that the process gets the task, and has the right to fetch it from the queue. Or the queue is empty, and the process has to wait until a new task is again posted to the queue. Of course, the latter is the interesting case, because it can happen that several processes want to get new tasks from the empty queue. The semaphore then works as an arbitration mechanism, as it selects which process will be considered first.
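To make the counter semantics concrete, here is a minimal single-process sketch in plain OCaml. It is only a toy model and not the Netplex implementation: the names sem, make, increment and try_decrement are made up for this illustration, and the "fails immediately" case is signalled by returning -1 while the counter stays at 0.

```ocaml
(* Toy model of a counting semaphore. All names are hypothetical;
   this is not how Netplex implements it. *)
type sem = { mutable value : int64 }

let make initial = { value = initial }

(* Increase the counter and return the new value. *)
let increment s =
  s.value <- Int64.succ s.value;
  s.value

(* Non-blocking decrement: returns the new value, or -1L if the
   counter was already 0 (the counter itself then stays at 0). *)
let try_decrement s =
  if s.value > 0L then begin
    s.value <- Int64.pred s.value;
    s.value
  end else
    (-1L)

let () =
  (* The counter models a queue holding two tasks. *)
  let queue_len = make 2L in
  Printf.printf "%Ld\n" (try_decrement queue_len);  (* a task was taken *)
  Printf.printf "%Ld\n" (try_decrement queue_len);  (* the last task was taken *)
  Printf.printf "%Ld\n" (try_decrement queue_len)   (* queue empty: refused *)
```

A blocking variant would suspend the caller in the last case instead of returning; which waiting caller is served first once the counter becomes positive is the arbitration decision mentioned above.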
In our example, the resource is the timer, or better the possibility of running the timer. We have a number of processes, but the timer should only run in one of them ("it is owned by one process only"). The counter value 1 means that the timer is not (!) running (i.e. 1 = the timer can be started once more), and 0 means that the timer is already running (0 = the timer cannot be started for an additional time).
This function tries to get the ownership of the timer, and returns true if that could be achieved:

    let sem_name = "some_name"
    let own_timer = ref false

    let acquire_timer() =
      (* precondition: the semaphore value can be 1 or 0 *)
      !own_timer || (
        let v = Netplex_semaphore.decrement sem_name in
        (* By convention, v=-1 means that the value was already 0. It is
           not further decreased, however. v=0 means that it was decreased
           from 1 to 0, and we own the semaphore now. Semaphore values are
           int64 numbers, hence the literal 0L.
         *)
        own_timer := (v = 0L);
        !own_timer
      )

Note that we don't pass
~wait:true to the
decrement call. Without it, the operation returns -1 when the
counter is already 0 rather than waiting for it to become positive. Also,
semaphores have Netplex-wide global names. Instead of "some_name" one
should pass something more intelligent, e.g. a name derived from the
service name ("service.semaphore0").
Although semaphores are automatically created at the time of the first use, the automatic initial value (a counter value of 0) is often the wrong one. In our example, the initial value must be 1 - meaning that the timer is unacquired:

    let create_sem() =
      (* The success flag returned by create is not needed here *)
      let _success =
        Netplex_semaphore.create ~protected:true sem_name 1L in
      ()

This function creates the semaphore if it does not exist, and sets the initial value to 1. Also, it is a so-called protected semaphore: If the process terminates in an abnormal way, Netplex ensures that the increment and decrement operations called by the process are reverted, i.e. if the process happens to own the timer, and the counter is 0, it will be automatically set back to 1, so another process has the chance to take over the ownership.
If every process calls
create_sem when it starts up, the semaphore
is guaranteed to exist.
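The idea of a protected semaphore can be sketched with a small model in plain OCaml. This is only an illustration under stated assumptions, not the Netplex code: the type psem, the integer process ids and the function revert_on_crash are hypothetical, and clamping the counter at 0 during the revert is my assumption for the sketch. The model remembers, per process, the net effect of its increments and decrements, so that the effect can be undone when the process dies abnormally.

```ocaml
(* Toy model of a protected semaphore. All names are hypothetical. *)
type psem = {
  mutable value : int;
  deltas : (int, int) Hashtbl.t;  (* process id -> net change so far *)
}

let create_psem initial = { value = initial; deltas = Hashtbl.create 7 }

(* Net change contributed by process [pid], 0 if it never touched [s]. *)
let net s pid = try Hashtbl.find s.deltas pid with Not_found -> 0

(* Non-blocking decrement on behalf of [pid]; -1 means "already 0". *)
let decrement s pid =
  if s.value > 0 then begin
    s.value <- s.value - 1;
    Hashtbl.replace s.deltas pid (net s pid - 1);
    s.value
  end else
    (-1)

let increment s pid =
  s.value <- s.value + 1;
  Hashtbl.replace s.deltas pid (net s pid + 1);
  s.value

(* Undo everything [pid] did to the counter (clamped at 0, an
   assumption of this sketch). *)
let revert_on_crash s pid =
  s.value <- max 0 (s.value - net s pid);
  Hashtbl.remove s.deltas pid

let () =
  let s = create_psem 1 in
  ignore (decrement s 101);      (* process 101 now owns the timer *)
  ignore (decrement s 102);      (* process 102 is refused *)
  revert_on_crash s 101;         (* 101 crashes without releasing *)
  Printf.printf "counter after crash: %d\n" s.value
```

After the revert the counter is 1 again, so a later decrement by another process succeeds. Note that the refused decrement of process 102 leaves no trace in the bookkeeping, because it did not change the counter.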
The timer itself is started like this:

    let timer_running = ref None

    let start_timer() =
      timer_running :=
        Some(Netplex_cenv.create_timer
               (fun timer -> ...; true)
               60.0
            )

The function body is run every 60 seconds (the result value of true means that the timer is restarted after each timeout). Such timers are entered into the main event loop of the Netplex process container. This means it is not guaranteed that they are run as often as demanded - if some regular computation takes too long and the event loop cannot run, then the timer activation will be deferred. For many timers, especially when running only infrequently, this way of activation is reliable enough. Timers are automatically stopped at shutdown time. We can, however, also enforce an earlier stop:

    let stop_timer() =
      match !timer_running with
        | None -> ()
        | Some timer ->
            Netplex_cenv.cancel_timer timer;
            timer_running := None

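The behaviour of such event-loop timers can be illustrated with a toy model that uses a virtual clock. This is a sketch in plain OCaml and does not use Netplex at all: the types timer and loop and the functions make_loop, poll and busy are hypothetical, and re-arming a late timer relative to the current time is an assumption of the model. It shows the two points made above: the callback's boolean result decides whether the timer is re-armed, and a blocked event loop defers activations instead of catching up on them.

```ocaml
(* Toy event loop with a virtual clock. All names are hypothetical. *)
type timer = {
  period : float;
  callback : unit -> bool;       (* true = re-arm, false = stop *)
  mutable due : float;
  mutable active : bool;
}

type loop = { mutable now : float; mutable timers : timer list }

let make_loop () = { now = 0.0; timers = [] }

let create_timer loop callback period =
  let t = { period; callback; due = loop.now +. period; active = true } in
  loop.timers <- t :: loop.timers;
  t

let cancel_timer t = t.active <- false

(* One pass of the event loop: run every active timer that is due.
   A late timer fires once and is re-armed from the current time. *)
let poll loop =
  List.iter
    (fun t ->
       if t.active && t.due <= loop.now then begin
         if t.callback () then t.due <- loop.now +. t.period
         else t.active <- false
       end)
    loop.timers

(* A computation that keeps the loop from running for [secs] seconds. *)
let busy loop secs = loop.now <- loop.now +. secs

let () =
  let loop = make_loop () in
  let fired = ref 0 in
  let t = create_timer loop (fun () -> incr fired; true) 60.0 in
  busy loop 60.0; poll loop;     (* fires on time *)
  busy loop 150.0; poll loop;    (* loop was blocked: one late activation *)
  cancel_timer t;
  busy loop 60.0; poll loop;     (* cancelled: no further activation *)
  Printf.printf "activations: %d\n" !fired
```

In this model 270 virtual seconds pass, but the callback runs only twice, because the long computation swallowed one period.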
Finally, there is the case that the process terminates and has to give up the ownership of the timer. This is easily done by incrementing the counter. However, we have to do more in this example, because the decrement operation does not wait: We have to notify other processes so that they again try to acquire the now free semaphore. Netplex has a simple built-in messaging system that we can use here:

    let release_timer() =
      if !own_timer then (
        (* The new counter value is not needed here *)
        let _v = Netplex_semaphore.increment sem_name in
        let cont = Netplex_cenv.self_cont() in
        cont # send_message "*" "release_semaphore" [| sem_name |]
      )

(The destination address of "*" means that the message is broadcast to all receivers. This could be optimized by sending only to the processes of the same service.)
Now, let's put everything together: When the Netplex system starts up, we add the plugin. When a process starts, we first try to create the semaphore, and then try to acquire it. If this is successful, we start the timer. For getting the shutdown right, we stop the timer and release the ownership before the process exits. Also, we listen for messages, and if the right one arrives, we check whether the semaphore is orphaned:

    class our_hooks() =
    object(self)
      inherit Netplex_kit.empty_processor_hooks()

      method post_add_hook _ ctrl =
        ctrl # add_plugin Netplex_semaphore.plugin

      method post_start_hook cont =
        create_sem();
        if acquire_timer() then
          start_timer()

      method pre_finish_hook cont =
        stop_timer();
        release_timer()

      method receive_message cont msg_name msg_args =
        (* The message name must match the one sent by release_timer *)
        if msg_name = "release_semaphore" && msg_args = [| sem_name |] then (
          if acquire_timer() then
            start_timer()
        )
    end

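The interplay of acquiring, releasing and notifying can be played through in a small simulation. The following sketch is plain OCaml without Netplex: worker processes are modelled as records, the semaphore as an integer counter, and the broadcast message as a loop over all live workers. All names (worker, system, try_acquire, finish, owners) are made up for this illustration.

```ocaml
(* Toy simulation of the timer ownership protocol. All names are
   hypothetical; processes and messages are only modelled. *)
type worker = { id : int; mutable alive : bool; mutable own_timer : bool }

type system = { mutable counter : int; mutable workers : worker list }

(* counter = 1 means the timer is free, 0 means it is owned. *)
let make_system n =
  { counter = 1;
    workers =
      List.init n (fun i -> { id = i; alive = true; own_timer = false }) }

(* Same logic as acquire_timer: a non-blocking decrement. *)
let try_acquire sys w =
  w.own_timer || begin
    if sys.counter > 0 then begin
      sys.counter <- sys.counter - 1;
      w.own_timer <- true
    end;
    w.own_timer
  end

(* All workers start up and try to get the timer. *)
let start sys = List.iter (fun w -> ignore (try_acquire sys w)) sys.workers

(* Models the broadcast: every live worker retries the acquisition. *)
let broadcast_release sys =
  List.iter (fun w -> if w.alive then ignore (try_acquire sys w)) sys.workers

(* Regular termination: give up the ownership, then notify the others. *)
let finish sys w =
  w.alive <- false;
  if w.own_timer then begin
    w.own_timer <- false;
    sys.counter <- sys.counter + 1;
    broadcast_release sys
  end

let owners sys =
  List.filter (fun w -> w.alive && w.own_timer) sys.workers
  |> List.map (fun w -> w.id)

let () =
  let sys = make_system 3 in
  start sys;
  Printf.printf "owners after start: %d\n" (List.length (owners sys));
  finish sys (List.hd sys.workers);
  Printf.printf "owners after handover: %d\n" (List.length (owners sys))
```

In both situations exactly one live worker owns the timer. Without the broadcast in finish, the counter would go back to 1 but nobody would notice, which is why the non-waiting decrement needs the extra notification.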
There is still a possible improvement. As pointed out, a protected
semaphore automatically adjusts the counter value when the process
crashes. But this is only the first action of
release_timer. The second is to notify the other
processes of the vacant timer ownership. This could be done from the
master process (which is complicated), or by simply starting another
timer that continuously tries to get the semaphore:

      method post_start_hook cont =
        create_sem();
        if acquire_timer() then
          start_timer();
        ignore(Netplex_cenv.create_timer
                 (fun _ ->
                    if acquire_timer() then start_timer();
                    true
                 )
                 60.0)

Netplex_sharedvar allows the processes to access variables in the master process. This is a very limited mechanism, though: Accesses are quite expensive, and the variables should not become big (because they would have to be copied at
fork time). Nevertheless, this is a useful mechanism to spread dynamic information in a Netplex process system.
Netplex_mutex can be used to protect critical code sections (e.g. when accessing shared variables). Again, this mechanism is slow.
lookup method of containers. This can be used to establish a fast IPC channel between processes.
This list is going to be extended (e.g. message queues are on my
list). Also, I will try to speed up the mechanisms when Netplex finds
out that the OS has native support for these primitives (e.g. use
POSIX semaphores for implementing the
plugin if available).
Some final words on why multi-processing is an interesting option for implementing network servers. This is important to explain because there are a number of myths about multi-processing (e.g. it is slow, hard to master, not that flexible), and especially non-tech people see it as a technology of the past. There are also big companies who are interested in spreading these myths, e.g. Microsoft, because their OS is a bad choice when it comes to multi-processing, or Sun, because Java is heavily optimized for multi-threading and JIT is not really compatible with multi-processing. The truth is that the big players simply did not invest in this technology. Actually, there are a number of advantages, and some make multi-processing superior:
First, there is the stability of the resulting programs. Even if a process crashes, the remaining processes of the system can continue to run. Netplex is designed with crash resilience in mind (crashed processes are restarted).
Second, multi-processing systems can more easily benefit from multicores because shared memory only plays a subordinate role for them which leads to more effective caching of RAM. Multi-processing systems often use uni- and bidirectional messaging instead. For programs written in Ocaml this is even more the case (but not for other languages) because Ocaml does not allow multi-threaded programs to profit from multicores at all.
Third, multi-processing systems can be more easily extended to clusters (i.e. they are running on several computers). The messaging mechanisms can often be easily implemented as network protocols.
Finally, many synchronization primitives are also available for the multi-processing case (as demonstrated). More or less, it is only shared memory which is really harder to master - e.g. Ocaml does not provide any form of memory management for explicitly allocated shared memory that would allow one to put Ocaml values directly into such memory blocks for easy and quick access. (However, I'm working on partial solutions for this.)
Alternately, one can also check out
the Subversion repository (use the
on this URL, or click on it to view it with your web browser - most
of the discussed code lives in