BLOG ON CAMLCITY.ORG: Ocamlnet 3

Mastering Multi-processing

What's new in Ocamlnet 3: Synchronization primitives in Netplex - by Gerd Stolpmann, 2009-11-22

Ocamlnet is being renovated, and there is already a first testing version of Ocamlnet 3. The author, Gerd Stolpmann, explains in a series of articles what is new, and why Ocamlnet is the best networking platform ever seen. When it comes to parallelism, many people react skeptically to multi-processing - mostly because this is not the style of concurrent programming the "big players" prefer. However, with the help of a good multi-processing framework like Netplex, this style can be easily mastered, and it delivers stable results more quickly than other styles such as multi-threading. Ocamlnet 3 adds important synchronization primitives like semaphores.

Netplex is a framework for creating network services. In this model, the most important role of processes is to serve as containers for protocol interpreters that accept and deal with network connections ("worker processes"). In the traditional multi-processing world (like the inetd master server in Unix) this is taken to the extent that each connection is served by its own process. Netplex, fortunately, relaxes this 1:1 relationship - a process can handle more than one connection, either serially or even concurrently. Netplex was introduced in Ocamlnet 2, and has received a lot of attention since then. What remained unanswered, however, is the question of how to deal with tasks that are not immediately network-driven, but nevertheless have to run in parallel with the existing processes. In this article, I present a way to control a global timer for a network service, so one can run a certain task periodically - as an example of how to use the new synchronization primitives.

Before going into detail, let me clarify one important thing. These synchronization primitives are not designed to be fast. They are designed to be always available, and to behave well even under abnormal operating circumstances (like crashing processes). Usually, the operating system provides better implementations, but often these implementations require certain arrangements (like knowing global names, or certain process relationships). For many uses, it is more important to have an immediately available mechanism than to have the fastest one. Also, such a basic mechanism can be helpful for negotiating faster ones between processes.

The example: Competing for running the timer

In the Netplex process model one of the processes has a special role: The master process is the first process, and the one that serves as parent of all the other processes. The master process is not involved in CPU-intensive tasks, and is usually very responsive to incoming requests. Because of this, it is the ideal instance to perform helper tasks. Ocamlnet 3 adds the concept of plugins to the controller object that runs in the master process. Plugins are little RPC servers that can be attached to the main server the controller object provides. The synchronization primitives are implemented as such plugins. Before a plugin can be used, it needs to be added to the controller ctrl, as in

ctrl # add_plugin Netplex_semaphore.plugin

This is best done early in the program. Fortunately, a good moment for doing so is provided as a service hook by Netplex. All these hooks are bundled together as a hook object, and this object configures some aspects of a Netplex network service:

class type processor_hook =
object
  method post_add_hook : socket_service -> controller -> unit
  method post_rm_hook : socket_service  -> controller -> unit
  method pre_start_hook : socket_service -> controller -> container_id -> unit
  method post_start_hook : container -> unit
  method pre_finish_hook : container -> unit
  method post_finish_hook : socket_service -> controller -> container_id -> unit
  method receive_message :
            container -> string -> string array -> unit
  method receive_admin_message :
            container -> string -> string array -> unit
  method system_shutdown : unit -> unit
  method shutdown : unit -> unit
  method global_exception_handler : exn -> bool
end

Such a hook object can always be passed as an additional parameter to the protocol interpreter implementation (no matter which: http, the various CGI variants, Sun RPC, ICE as provided by Hydro). The add_plugin call is best done in the post_add_hook hook, which is executed once after the socket service is configured in the controller. We will also use other hooks for our timer - going even further, we will use only hooks to attach the timer to the network service. This means that we don't assume much about the service itself, and because of this the solution is quite generic.

A semaphore is a resource counter. The counter can be atomically increased and decreased, but can never fall below zero. An attempt to do so either fails immediately, or the execution of the process blocks until the counter is positive again. The idea behind that can be illustrated by a queue supplying a number of processes with tasks. The counter reflects the length of the queue. When a process needs another task, it checks the counter, and tries to decrease it. Either this works immediately, which means that the process gets the task, and has the right to fetch it from the queue. Or the queue is empty, and the process has to wait until a new task is again posted to the queue. Of course, the latter is the interesting case, because it can happen that several processes want to get new tasks from the empty queue. The semaphore then works as an arbitration mechanism, as it selects which process will be considered first.

In our example, the resource is the timer, or better the possibility of running the timer. We have a number of processes, but the timer should only run in one of them ("it is owned by one process only"). The counter value 1 means that the timer is not (!) running (i.e. 1 = the timer can be started once more), and 0 means that the timer is already running (0 = the timer cannot be started for an additional time).

This function tries to get the ownership of the timer, and returns true if that could be achieved:

let sem_name = "some_name"

let own_timer = ref false

let acquire_timer() =
  (* precondition: the semaphore value can be 1 or 0 *)
  !own_timer || (
    let v =
      Netplex_semaphore.decrement sem_name in
    (* By convention, v=-1 means that the value was already 0. It is not
       further decreased, however. v=0 means that it was decreased from
       1 to 0, and we own the semaphore now.
     *)
    own_timer := (v = 0L);  (* decrement returns an int64 *)
    !own_timer
  )

Note that we don't pass ~wait:true to the decrement call. This causes the operation to return -1 when the counter is already 0 rather than waiting for it to become positive. Also, semaphores have Netplex-wide global names. Instead of "some_name" one should pass something more intelligent, e.g. a name derived from the service name ("service.semaphore0").
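To make the return-value convention concrete, here is a minimal in-process sketch of the counter semantics. It is only a model and not the Netplex_semaphore implementation: the names model_decrement and model_increment are made up for this illustration, and the "workers" are just successive calls in one program.

```ocaml
(* Toy model of the non-waiting decrement convention. Hypothetical
   names; no Netplex calls are made. *)

(* One shared counter; 1L means "the timer is free". *)
let counter = ref 1L

(* Non-waiting decrement: returns the new value, or -1L if the counter
   was already 0 (in which case it is left unchanged). *)
let model_decrement () =
  if !counter = 0L then (-1L)
  else begin
    counter := Int64.pred !counter;
    !counter
  end

(* Increment: returns the new value. *)
let model_increment () =
  counter := Int64.succ !counter;
  !counter

(* Three simulated workers compete one after another. *)
let v1 = model_decrement ()   (* first worker: 1 -> 0, gets 0L *)
let v2 = model_decrement ()   (* second worker: already 0, gets -1L *)
let v3 = model_decrement ()   (* third worker: already 0, gets -1L *)

let () =
  List.iteri
    (fun i v ->
       Printf.printf "worker %d: v=%Ld -> %s\n" (i + 1) v
         (if v = 0L then "owns the timer" else "does not own the timer"))
    [v1; v2; v3]
```

Only the worker that sees the counter go from 1 to 0 becomes the owner; the others get -1 and continue without blocking.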

Although semaphores are automatically created at the time of the first use, they then often start with the wrong value (a counter value of 0). In our example, the initial value must be 1 - meaning that the timer is unacquired:

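let create_sem() =
  (* The boolean result tells whether this call created the semaphore.
     We only need the semaphore to exist, so the result is ignored. *)
  ignore(Netplex_semaphore.create ~protected:true sem_name 1L)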

This function creates the semaphore if it does not exist, and sets the initial value to 1. Also, it is a so-called protected semaphore: If the process terminates in an abnormal way, Netplex ensures that the increment and decrement operations called by the process are reverted, i.e. if the process happens to own the timer, and the counter is 0, it will be automatically set back to 1, so another process has the chance to take over the ownership.
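The protection mechanism can be pictured with a small simulation. The sketch below assumes that the master keeps, per process, the net sum of that process's increments and decrements and undoes it when the process dies. This bookkeeping scheme and all names (deltas, record, on_crash) are assumptions made for illustration, not a description of how Netplex implements it.

```ocaml
(* Toy model of a "protected" counter. Hypothetical names; no Netplex
   calls are made. Requires OCaml 4.08 or later (Option module). *)

let value = ref 1L

(* pid -> net sum of increments (+1) and decrements (-1) by that process *)
let deltas : (int, int64) Hashtbl.t = Hashtbl.create 8

let record pid d =
  let old = Option.value (Hashtbl.find_opt deltas pid) ~default:0L in
  Hashtbl.replace deltas pid (Int64.add old d)

(* Non-waiting decrement on behalf of process pid. *)
let decrement pid =
  if !value = 0L then (-1L)
  else begin
    value := Int64.pred !value;
    record pid (-1L);
    !value
  end

(* Called by the simulated master when it notices that pid crashed:
   undo the net change of that process, never going below zero. *)
let on_crash pid =
  match Hashtbl.find_opt deltas pid with
  | None -> ()
  | Some d ->
      value := max 0L (Int64.sub !value d);
      Hashtbl.remove deltas pid

let v_a = decrement 101     (* process 101 takes the timer: 0L *)
let v_b = decrement 102     (* process 102 fails: -1L *)
let () = on_crash 101       (* 101 dies without releasing *)
let after_crash = !value    (* the counter is back at 1L *)
let v_c = decrement 102     (* now 102 can take over: 0L *)
```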

If every process calls create_sem when it starts up, the semaphore is guaranteed to exist.

The timer itself is started like this:

let timer_running = ref None

let start_timer() =
  timer_running :=
    Some(Netplex_cenv.create_timer
          (fun timer -> ...; true)
          60.0
        )

The function body is run every 60 seconds (the result value of true means that the timer is restarted after each timeout). Such timers are entered into the main event loop of the Netplex process container. This means it is not guaranteed that they are run as often as demanded - if some regular computation takes too long and the event loop cannot run, then the timer activation will be deferred. For many timers, especially when running only infrequently, this way of activation is reliable enough. Timers are automatically stopped at shutdown time. We can, however, also enforce an earlier stop:

let stop_timer() =
  match !timer_running with
    | None -> ()
    | Some timer ->
        Netplex_cenv.cancel_timer timer;
        timer_running := None
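Why an event-loop timer can fire late is easiest to see in a toy loop with a virtual clock. The following sketch is not Netplex code: the timer record, create_timer and run_iteration are hypothetical stand-ins that only mimic the "return true to restart" contract described above.

```ocaml
(* Toy single-threaded event loop with a virtual clock. Hypothetical
   names; no Netplex calls are made. *)

type timer = { period : float; mutable due : float; callback : unit -> bool }

let now = ref 0.0
let timers : timer list ref = ref []

let create_timer callback period =
  let t = { period; due = !now +. period; callback } in
  timers := t :: !timers;
  t

(* One loop iteration: other work takes [busy] seconds, then every timer
   that is due fires once. A timer whose callback returns true is
   re-armed relative to the current time; otherwise it is dropped. *)
let run_iteration ~busy =
  now := !now +. busy;
  timers :=
    List.filter
      (fun t ->
         if t.due <= !now then begin
           let again = t.callback () in
           if again then t.due <- !now +. t.period;
           again
         end else true)
      !timers

let fired_at : float list ref = ref []
let _t = create_timer (fun () -> fired_at := !now :: !fired_at; true) 60.0

let () =
  run_iteration ~busy:30.0;    (* t=30: not due yet *)
  run_iteration ~busy:30.0;    (* t=60: fires on time *)
  run_iteration ~busy:100.0;   (* t=160: was due at t=120, fires 40s late *)
  List.iter (Printf.printf "fired at t=%.0f\n") (List.rev !fired_at)
```

The third iteration shows the deferral: the loop was busy for 100 seconds, so the activation that was due at t=120 only happens at t=160.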

Finally, there is the case that the process terminates and has to give up the ownership of the timer. This is easily done by incrementing the counter. However, we have to do more in this example, because the decrement operation does not wait: We have to notify other processes so that they again try to acquire the now free semaphore. Netplex has a simple built-in messaging system that we can use here:

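let release_timer() =
  if !own_timer then (
    (* give the semaphore back: the counter goes from 0 to 1 *)
    ignore(Netplex_semaphore.increment sem_name);
    (* tell the other processes that they can try to acquire it *)
    let cont = Netplex_cenv.self_cont() in
    cont # send_message "*" "release_semaphore" [| sem_name |]
  )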

(The destination address of "*" means that the message is broadcast to all receivers. This could be optimized by sending only to the processes of the same service.)

Now, let's put everything together: When the Netplex system starts up, we add the plugin. When a process starts, we first try to create the semaphore, and then try to acquire it. If this is successful, we start the timer. To get the shutdown right, we stop the timer and release the ownership before the process exits. Also, we listen for messages, and if the right one arrives, we check whether the semaphore is orphaned:

class our_hooks() =
  object(self)
    inherit Netplex_kit.empty_processor_hooks()

    method post_add_hook _ ctrl =
      ctrl # add_plugin Netplex_semaphore.plugin

    method post_start_hook cont =
      create_sem();
      if acquire_timer() then
        start_timer()

    method pre_finish_hook cont =
      stop_timer();
      release_timer()

    method receive_message cont msg_name msg_args =
      if msg_name = "release_semaphore" && msg_args = [| sem_name |] then (
        if acquire_timer() then
          start_timer()
      )
  end
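The hand-over between processes can be traced with a small simulation. In the sketch below the workers are plain records in one program, the broadcast is a List.iter over the other workers, and all names are hypothetical; it only mirrors the order of steps (increment, notify, retry the non-waiting decrement) and makes no Netplex calls.

```ocaml
(* Toy simulation of the release-and-notify hand-over. Hypothetical
   names; no Netplex calls are made. *)

let counter = ref 1L

let try_decrement () =
  if !counter = 0L then (-1L)
  else (counter := Int64.pred !counter; !counter)

type worker = { name : string; mutable owns : bool; mutable timer_on : bool }

(* Same shape as acquire_timer: true if already owned or newly acquired. *)
let acquire w =
  w.owns || (w.owns <- (try_decrement () = 0L); w.owns)

let start_if_acquired w =
  if acquire w then w.timer_on <- true

(* The owner stops its timer, gives the counter back, and "broadcasts"
   by letting every other worker retry, in list order. *)
let release_and_broadcast owner others =
  if owner.owns then begin
    owner.timer_on <- false;
    owner.owns <- false;
    counter := Int64.succ !counter;
    List.iter start_if_acquired others
  end

let a = { name = "A"; owns = false; timer_on = false }
let b = { name = "B"; owns = false; timer_on = false }
let c = { name = "C"; owns = false; timer_on = false }

let () =
  List.iter start_if_acquired [a; b; c];   (* startup: A wins *)
  release_and_broadcast a [b; c];          (* A shuts down: B takes over *)
  List.iter
    (fun w ->
       Printf.printf "%s: timer %s\n" w.name
         (if w.timer_on then "running" else "off"))
    [a; b; c]
```

In this model the first worker to react to the message wins; in a real system the order in which processes receive the broadcast is not something the sketch can say anything about.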

There is still a possible improvement. As pointed out, a protected semaphore automatically adjusts the counter value when the process crashes. But this is only the first action of release_timer. The second is to notify the other processes of the vacant timer ownership. This could be done from the master process (which is complicated), or by simply starting another timer that continuously tries to get the semaphore:

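    method post_start_hook cont =
      create_sem();
      if acquire_timer() then
        start_timer();
      ignore(Netplex_cenv.create_timer
               (fun _ ->
                  (* acquire_timer() also returns true when we already own
                     the timer, so only poll while we do not own it *)
                  if not !own_timer && acquire_timer() then
                    start_timer();
                  true
               )
               60.0)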
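The effect of the polling timer after a crash can be sketched as follows. This is again a toy model with hypothetical names: worker A is represented only by its initial decrement, the crash and the revert by the master are modeled as a single assignment, and each loop iteration stands for one polling period of worker B.

```ocaml
(* Toy simulation of crash recovery by polling. Hypothetical names; no
   Netplex calls are made. *)

let counter = ref 1L

let try_decrement () =
  if !counter = 0L then (-1L)
  else (counter := Int64.pred !counter; !counter)

let b_owns = ref false

(* B's polling callback: only try while B does not own the timer yet. *)
let b_poll () =
  if not !b_owns && try_decrement () = 0L then b_owns := true

(* (tick, does B own the timer after this tick?) in reverse order *)
let log : (int * bool) list ref = ref []

let () =
  ignore (try_decrement ());          (* A acquires the timer at startup *)
  for tick = 1 to 3 do
    b_poll ();
    log := (tick, !b_owns) :: !log;
    (* After the second poll, A crashes without sending a message;
       the master reverts A's decrement. *)
    if tick = 2 then counter := 1L
  done;
  List.iter
    (fun (tick, owns) -> Printf.printf "tick %d: B owns timer = %b\n" tick owns)
    (List.rev !log)
```

No message is ever sent here, yet B takes over at the first poll after the crash. The price is a gap of up to one polling period during which nobody runs the timer.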

Other IPC primitives

Besides semaphores and message broadcasting, Netplex so far provides the following primitives:
  • Shared variables: The plugin Netplex_sharedvar allows the processes to access variables in the master process. This is a very limited mechanism, though: Accesses are quite expensive, and the variables should not become big (because they would have to be copied at fork time). Nevertheless, this is a useful mechanism to spread dynamic information in a Netplex process system.
  • Mutexes: The plugin Netplex_mutex can be used to protect critical code sections (e.g. when accessing shared variables). Again, this mechanism is slow.
  • Accessing Unix Domain sockets: It is possible to look up the address of Unix Domain sockets that are served by other services in the same Netplex system, using the lookup method of containers. This can be used to establish a fast IPC channel between processes.

This list is going to be extended (e.g. message queues are on my list). Also, I want to speed up the mechanisms when Netplex finds out that the OS has native support for these primitives (e.g. use POSIX semaphores for implementing the Netplex_semaphore plugin if available).

Why use processes?

Some final words on why multi-processing is an interesting option for implementing network servers. This is important to explain because there are a number of myths about multi-processing (e.g. it is slow, hard to master, not that flexible), and especially non-tech people see it as a technology of the past. There are also big companies who are interested in spreading these myths, e.g. Microsoft, because their OS is a bad choice when it comes to multi-processing, or Sun, because Java is heavily optimized for multi-threading and JIT is not really compatible with multi-processing. The truth is that the big players simply did not invest in this technology. Actually, there are a number of advantages, and some make multi-processing superior:

First, there is the stability of the resulting programs. Even if a process crashes, the remaining processes of the system can continue to run. Netplex is designed with crash resilience in mind (crashed processes are restarted).

Second, multi-processing systems can more easily benefit from multicores because shared memory plays only a subordinate role for them, which leads to more effective caching of RAM. Multi-processing systems often use uni- and bidirectional messaging instead. For programs written in Ocaml this is even more the case (but not for other languages) because Ocaml does not allow multi-threaded programs to profit from multicores at all.

Third, multi-processing systems can be more easily extended to clusters (i.e. they are running on several computers). The messaging mechanisms can often be easily implemented as network protocols.

Finally, many synchronization primitives are also available for the multi-processing case (as demonstrated). More or less, it is only shared memory which is really harder to master - e.g. Ocaml does not provide any form of memory management for explicitly allocated shared memory that would allow one to put Ocaml values directly into such memory blocks for easy and quick access. (However, I'm working on partial solutions for this.)

Where to get Ocamlnet 3

There is no final release yet. The current testing version is Ocamlnet-3.0test1. Look at this article for a list of changes.

Alternatively, one can check out the Subversion repository (use the svn command on this URL, or click on it to view it with your web browser - most of the discussed code lives in src/netplex).

Gerd Stolpmann works as an O'Caml consultant. He is accepting new customers!
This web site is published by Informatikbüro Gerd Stolpmann