#!/usr/bin/perl -w

###############################################################################
#                                                                             #
#  TakTuk, a middleware for adaptive large scale parallel remote executions   #
#  deployment. Perl implementation, copyright(C) 2006 Guillaume Huard.        #
#                                                                             #
#  This program is free software; you can redistribute it and/or modify       #
#  it under the terms of the GNU General Public License as published by       #
#  the Free Software Foundation; either version 2 of the License, or          #
#  (at your option) any later version.                                        #
#                                                                             #
#  This program is distributed in the hope that it will be useful,            #
#  but WITHOUT ANY WARRANTY; without even the implied warranty of             #
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              #
#  GNU General Public License for more details.                               #
#                                                                             #
#  You should have received a copy of the GNU General Public License          #
#  along with this program; if not, write to the Free Software                #
#  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA #
#                                                                             #
#  Contact: Guillaume.Huard@imag.fr                                           #
#           ENSIMAG - Laboratoire ID                                          #
#           51 avenue Jean Kuntzmann                                          #
#           38330 Montbonnot Saint Martin                                     #
#                                                                             #
###############################################################################

############################################################################
###                          THIS IS TAKTUK                              ###
### In-code documentation is mostly made of comments that go along       ###
### with the following prototypes                                        ###
### THE ORDER OF PRESENTATION IS ONLY LEXICOGRAPHIC, NO OTHER LOGIC      ###
### (except for the package settings which is placed first)              ###
### If you are looking for a good starting point, reading the code of    ###
### the package main seems a proper idea                                 ###
############################################################################

############################################################################
### CAUTION                                                              ###
### Even if the mechanics of taktuk looks able to work using a directed  ###
### acyclic graph as logical network, synchronization and messages       ###
### heavily rely on the fact that it is only a tree                      ###
############################################################################

# Some default settings for readline (otherwise painful to set)
$ENV{PERL_RL}=" o=0";

###############################################
### SETTINGS                                ###
### simple constants definition, no         ###
### implementation.                         ###
###############################################

package settings;

use constant FD => 0;
use constant SOCKET => 1;

###############################################
### PROTOTYPES                              ###
###############################################

###############################################
### ARGUMENTS                               ###
### argument line options parsing           ###
###############################################

package arguments;

# Returns TakTuk configuration as a flat string for the command line and
# an array of other TakTuk options
sub get_config();
# Creates a terminal object for using readline
sub initialize_terminal();
# Adds arguments to the internal command line
# Only fetched arguments are parsed by this package
# the values fetched may be opened filehandles to files that contain
# arguments
sub fetch_arguments(@);
# Undo the last issued call to get_next_stuff, use only this to undo
# arguments getting
sub restore_last_argument();
# Sets the variable $current_command to the value of the next command
# $arguments_ended is set to 1 if there are no more commands
sub get_next_command();
# Sets the variable $current_argument to the value of the next argument
# $arguments_ended is set to 1 if there are no more arguments
sub get_next_argument();
# Read new data from a given file descriptor into the associated buffer
sub read_buffer_data($);
# Removes a buffer that is no longer used
sub delete_buffer($);
# Returns a pointer to the next relevant data (either an argument or a pointer
# to a buffer filled with file data)
sub get_data_buffer();
# Returns true if the argument is a pointer to the first elements of the array
# @arguments (but do not spring $arguments[0] to existence)
sub is_first_argument($);
# Set the variable $current_stuff to the value of the next stuff in arguments
# The parameter is a word separator used when reading arguments for a
# file
sub get_next_stuff($);
# Returns the value of the next parameters
# $arguments_ended is set to 1 if there are no more arguments
sub get_parameters();
# Finds and skip the next command separator(s)
sub skip_command_separator();
# Closes any opened filehandle in the internal arguments list and sets
# $arguments_ended to 1
sub terminate_arguments_parsing();
# Adds a new option to the list of recognized options
# arguments are : short name, long name, type and ref
# Types are the following
# empty string means that the option is a boolean flag
# i means integer argument value
# f means floating point argument value
# s means string argument value
# ref is either a reference to a scalar (which is assigned the value of the
# option argument) or a reference to a function (which is called when the
# option is encountered and take the option value as argument if the option
# type is not boolean
# Optionally these arguments might be followed by a default value and a
# transfer mode
sub register_option($$$$@);
# These are routines that are registered with some of taktuk options
# Adds a file (which name is given) containing arguments to fetched arguments
sub args_file($);
# Adds machine names contained in a file (which name is given) as new remote
# peers to deploy
sub machines_file($);
# Adds a new remote peer which name is given. Also parses -[ and ]- imbrication
sub machine($);
# Changes the localhost name (as viewed by taktuk)
sub localhost($);
# Sets $general::root to 0
sub not_root();
# Prints default taktuk settings
sub print_defaults();
# Prints the taktuk package exported by taktuk_perl and exits
sub print_package($);
# Prints TakTuk version and exits
sub print_version();
# Performs registering of options accepted by taktuk and reading of environment
sub init();
# Parses and handle all arguments that are options
# I don't use GetOpt::Long because processing order is not ensured
sub parse_options();
# Checks if separators sets overlap or include characters that might break
# TakTuk command line parser
sub check_separators_integrity();
# Changes the value of some option
# parameters are the option name, the ref to destination and the value
sub set_option($$$);

###############################################
### COMMAND                                 ###
### command forking with pipe redirections  ###
###############################################

package command;

# Creates and bless a new command
# Arguments might include hash keys : line, read, write, error, type
# read, write and error are communication channels (fds) as viewed by the
# command : in other words, taktuk itself writes into read and reads from write
# and error
# type might either be settings::FD or settings::SOCKET depending on the nature
# of read and write
sub new (%);
# Forks a new process to execute the command line and redirects I/Os to the
# command communication channels
sub run ();
# Closes all opened communication channels for the command, waits for its
# completion and return its exit status
sub cleanup ();
# Does a shutdown and checks the return value
sub my_shutdown($$);
# Closes one of the communication channels (read, write or error);
sub close($);
# Propagate command output upward to the deployment tree root
# Arguments are : output type and message
sub output($$);

###############################################
### COMMUNICATOR                            ###
### communication server                    ###
###############################################

package communicator;

# Initializes data structures for the communicator before it can be used
sub init();
# Returns the requested set of connections ('sources', 'sinks',
# 'local_commands',...)
sub get_connections($);
# Returns the root connector (first source)
sub get_root();
# Returns the control connector (outside world side)
sub get_control();
# Returns the control connector (within communicator side)
sub get_outgoing();
# Read pending data on a descriptor (second argument) of a connector (first
# argument) and process all complete messages in the connector
sub process_messages($$);
# Calls the handler for the given coded message (second argument) coming from a
# given connector (first argument)
sub process_message($$);
# Prints output produced by a locally spawned command (root) or propagates it
# to the root (other nodes)
sub process_command_output($$);
# Enters into the main communicator loop
sub run ();
# Signify the communicator our wish to terminate the main loop
sub terminate ();
# Adds descriptors from a given connector or command communication channels to
# descriptors that the communicator listen to
sub add_descriptors($);
# Removes a descriptor (second argument) belonging to a connector or command
# (first argument) from the descriptors listened by the communicator
sub remove_descriptor($$);
# Returns true if no connectors are uninitialized
sub no_pending_connectors();
# Adds a connector (initialized) to one of the connectors sets (sources, ...)
sub add_connector($$);
# This function should be called when a connector already added to the
# communicator completes its initialization
sub connector_initialized($);
# Used internally by both remove_connector and remove_local_command
sub remove_from_set($$);
# This function should be called to remove a connector from the communicator
# whatever its state (initialized or not)
sub remove_connector($);
# Adds an already spawned local command to the communicator
sub add_local_command($);
# Removes a previously added local command
sub remove_local_command($);

###############################################
### CONNECTOR                               ###
### external connector code (no self        ###
### duplication)                            ###
###############################################

package connector;

# Creates an uninitialized connector
# Arguments : might include fields used by command (especially 'line' that
# will be used for most connector which are set up using an external command
# such as "ssh taktuk").
# Extra fields specific to connectors include : arguments
# This is a ref to an array of arguments that will be communicated to the
# forked taktuk engine
sub new (%);
# Reads data from a given file descriptor that should be one of the connector
# communication channels and store it to the connector buffer
# If the connector is not initialized, this function also negotiates the
# initialization with the remote peer
# If the descriptor match the error channel, the text is propagated to the root
sub read_data ($);
# Returns the next available message or the empty string if none available
sub get_message ();
# Send a message to a remote peer
# The argument is the unpacked coded message
sub send_message($);
# Returns a scalar packed form of the connector
sub pack();
# Unpack a previously packed connector
sub unpack($);

###############################################
### DIAGNOSTIC                              ###
### debug and error messages                ###
###############################################

package diagnostic;

# Prints misc information for error diagnostic and debugging (root)
# propagate the information upward (other nodes)
# arguments : level, error type and additional message
sub print_info($$$);
# Various functions built on top of print_info
# A system call just failed
sub system ();
# Debugging information
sub debug ($);
# Error
sub error ($);
# Warning
sub warning ($);

###############################################
### GENERAL                                 ###
### Taktuk-wide settings                    ###
### This package contains both global       ###
### settings and some functions that do not ###
### really fit elsewhere                    ###
###############################################

package general;

# Initializes the value of some global settings
sub init();
# Opens a file and return a type glob to its descriptor
sub open_file($);
# Loads stuff from a file and return it
sub load_file($);
# Tries to figure out what's the filename of the taktuk executable and loads it
# into $general::taktuk_code
sub load_taktuk_file();
# Extract the taktuk package from the taktuk code and stores it into
# $general::taktuk_package
sub load_taktuk_package($);
# Prints taktuk help and exit from the program
sub print_help();
# Adds a new remote host to deploy
# Arguments are : the name of the host and the arguments to the taktuk command
sub add_peer(@);
# Unbuffered print
sub print($);
# Prints in a pretty form a tree-like structure, arguments : prefix, tree
sub print_tree($$);

###############################################
### HANDLERS                                ###
### messages handlers definition            ###
###############################################

package handlers;

# Associate a callback function (second argument) to a message code (first
# argument)
sub register_handler($$);
# Replaces the callback function previously associated to a message code
sub replace_handler($$);
# Returns the callback function associated to a message code
sub get_handler($);

# All these handlers are callback functions for incoming messages
# they all take three arguments : the message code, the connector from which
# the message comes and the message body

# Handler to use for message that need to wait for a particular event before
# being processed (e.g. the ready state for broadcast).
# Actually it should only be used by 'block_until_event'
sub handler_blocked($$$);
# Handler for reception of arguments for the taktuk command
sub arguments($$$);
# Handler for broadcasting a message contained in the body
# (the message is processed by all the nodes except the local one)
sub broadcast($$$);
# Closes all read channel of local commands
sub eof($$$);
# Execute a command on the localnode redirecting all I/Os
sub execute($$$);
# Propagates the message to the root
sub forward_up($$$);
# Input to be delivered to each local command
sub input($$$);
# Sends a message to another node
sub message($$$);
# Compute the logical number of each remote peer given the number of the
# local node passed as argument. The total count is also passed as argument
sub numbering($$$);
# Output to be printed. Should only be processed on the root node (use
# forward_up if necessary)
sub output($$$);
# Sends a pong back to incoming connector
sub ping($$$);
# Prints pong as a debug message
sub pong($$$);
# Ask for termination of the local node
sub quit($$$);
# Inform local node of readiness of the remote peer (used by father usually)
sub ready($$$);
# Handler for a timeout on recv call
sub recv_timeout($);
# This type of reduce counts the number of deployed taktuk instances and calls
# the numbering sub once the count is completed
sub reduce_count($$$);
# This type of reduce gather the tree structure of the deployed taktuk network
# and calls print_tree on the root
sub reduce_tree($$$);
# General purpose reduce mechanics. Additional handlers are defined for each
# type of reduce
sub reduce($$$);
# Checks whether some reduces have completed or not. Should be called when a
# connection is lost : no answer will come back from the connector but it might
# have been the last waited
sub check_ongoing_reduces();
# Sends the encapsulated message to the specified host(s) (logical number)
sub send_to($$$);
# Handler for broadcasting a message contained in the body
# (the message is processed by all the nodes including the local one)
sub spread($$$);
# Steal request
sub steal($$$);
# Function that does nothing (process a given message) but is expected to be
# strongly synchronized (as exec or send_to for instance)
sub synchronize($$$);
# Handler for reception of the taktuk code
sub taktuk_code($$$);
# Executes a perl interpreter already fetched with the taktuk package
sub taktuk_perl($$$);
# Sent by recv to signal that a message is waited and possibly timeouted
sub wait_message($$$);
# Work in answer to a steal request
sub work($$$);
# register all the handlers
sub init();

###############################################
### MAIN                                    ###
### main program                            ###
###############################################

package main;

# Translates a taktuk command line into internal message
# returns a message or a connector (if the message has to wait)
sub translate();
# Forks the process into
# - a loop that process command line read from stdin
# - a process connected via stdin/out to the other process and that will
# eventually run the taktuk engine at the end of this function
sub fork_taktuk_interpreter();
# Handle a taktuk command depending on local fork or not
sub handle_message($);
# Processes batch commands remaining as arguments after options parsing
# including initial TakTuk configuration commands
sub process_commands();

# This is the only package that contains raw code that will be the main
# program for taktuk

###############################################
### MY_SELECT                               ###
### my own implementation of select object  ###
###############################################

package my_select;

# Creates an empty select object
sub new();
# Adds a descriptor to the set listened by the select
sub add($$);
# Removes a descriptor from the select set
# Returns false if the descriptor is not in the set
sub remove($$);
# select call
# as in IO::Select, returns an array of three references to arrays
sub select($$$$);
# returns the array of registered handles
sub handles();

###############################################
### TAKTUK                                  ###
### base protocol encoding                  ###
### This is also the exported package for   ###
### external communication (which explains  ###
### the name)                               ###
###############################################

package taktuk;

# Turns autoflush on for the given filehandle
sub no_flush($);
# Unpacks stuff in a buffer beginning with something packed
# Returns a couple : the unpacked stuff and the remaining of the buffer
sub unpack($);
# Packs some string so that it can be unpacked when its beginning is found in
# a stream of bytes
sub pack($);
# Decode a previously coded message
sub decode($);
# Encode a message code and a message body into a single decodable string
sub encode($$);
# Same as CORE::syswrite with some error coping code
sub syswrite($$);
# Reads data from a given file descriptor and bufferizes it into a buffer
# managed by the taktuk package
sub read_data($);
# Returns the next available message for the given descriptor or the empty
# string if none available
sub get_message($);
# Finds the next occurrence of the required regexp (second argument) in the
# buffer associated to descriptor (first argument) and returns it
# Returns an empty string if the regexp does not match
# \n is treated as a delimiter by this function and cannot be part of the
# sequence
sub find_sequence($$);
# Returns the content of the buffer associated to a descriptor and empty it
sub flush_buffer($);
# Returns a textual description of any error that occurred using send or recv
sub error_msg($);
# Sends a message to another node arguments are the node number and the message
sub send(%);
# Receive a message from another node under the form ($to, $from, $message)
sub recv(%);
# Decode a string specifying a set of intervals and transforms it to an array
sub decode_set($);
# Encode an array of intervals into a compact string specification
sub encode_set(@);

# Version number of TakTuk
our $VERSION=30195;
# syscalls default granularity : $read_size is the number of bytes requested
# at each sysread call (presumably $write_size plays the same role for
# syswrite - to be confirmed against the syswrite implementation)
our $read_size = 32768;
our $write_size = 32768;
# NOTE(review): presumably holds the code of the last error encountered by
# send or recv (see error_msg) - to be confirmed
our $error = undef;

# These are codes for available messages
# Each message type is identified by a single character. Most of these names
# match a handler of the same name declared in the package handlers
our $eof="D";
our $taktuk_perl="E";
our $timeout="O";
our $pong="P";
our $reduce="R";
our $spread="S";
our $taktuk_code="T";
our $wait_message="W";
our $arguments="a";
our $broadcast="b";
our $downcast="d";
our $execute="e";
our $input="i";
our $message="m";
our $numbering="n";
our $output="o";
our $ping="p";
our $quit="q";
our $ready="r";
our $steal="s";
our $send_to="t";
our $forward_up="u";
our $work="w";
our $synchronize="x";

# Reduce types
# Single character codes as well; the names match the handlers reduce_count
# and reduce_tree declared in the package handlers
our $reduce_count = 'c';
our $reduce_tree = 't';

###############################################
### SCHEDULER                               ###
### dispatching of connectors execution     ###
###############################################

package scheduler;

# Starts the deployment of a connector and add it to the communicator
# the arguments are the connector and a ref to an array of its arguments
sub deploy_connector($);
# Depending on the current deployment state, deploys the connector, adds it to
# the static list of pending connectors or adds it to the dynamic list
# arguments are the connector command and a reference to an array containing
# its arguments
sub add_connector($);
# Should be called upon completion of a connector initialization
sub connector_initialized($);
# Should be called upon failure of a connector initialization
sub connector_failed($);
# Depending on the current deployment state, deploys pending connectors, send
# steal requests or does nothing
sub schedule();
# Returns true if the scheduler has nothing to do
sub is_idle();
# Sends any sendable work (connectors deployment) to peers having asked for it
# returns true if some work has been sent
sub send_work();
# Dispatches a received stolen connector (propagates downward last)
sub dispatch_work($);
# Handles the arrival of a new steal request from a given connector (first
# argument) using given parameters (second argument)
sub theft_handler($$);

###############################################
### SYNCHRONIZER                            ###
### enforcement of causal constraints       ###
###############################################

package synchronizer;

# Checks if the current taktuk instance is ready
# ready means : father is ready and I've nothing to do
# this function also replace temporary handlers and flushes pending messages
# when changing state from not ready to ready
sub check_ready_state();
# Should be initially called (as ready state will be propagated upon completion
# of the deployment)
# Should also be called when a new connector has just been added in the system
# (either as pending work, or as already forked connector)
# Called by scheduler::add_connector
sub set_not_ready();
# Should be called when a connector becomes initialized
sub initialization_complete($);
# Should be called when a connector fails to initialize
sub initialization_failed($);
# Should be called when a connector timeouts
sub initialization_timeout($);
# Should be called to block engine until nodes are numbered
sub set_not_numbering();
# Blocks the set of messages (array argument) as long as the required event
# (first argument) has not been dispatched
sub block_until_event($@);
# Dispatch an event
sub dispatch_event($);
# Adds a new message to the appropriate pending queue
# Should only be used when its usual handler has been previously blocked by
# 'block_until_event'
# The message will be processed as usual at the next according 'dispatch_event'
sub add_pending_message($$$);

###############################################
### TIMER                                   ###
### time accounting & timeouts management   ###
###############################################

package timer;

# Returns current system time in seconds since Jan. 1, 1970
# Might be fractional and has the precision of gettimeofday (milliseconds)
sub current_time();
# Adds a new callback function (second argument) to be called in the given
# number of seconds (first argument, possibly fractional)
sub register($$);
# Checks if some registered timers have expired and, in such a case, calls the
# handler and removes the timer
sub check_timeouts();
# Removes an unexpired timer
sub unregister();
# Get the current lifetime of some timer
sub gettime($);
# Prints the internal info of some timer
sub print($);

###############################################
### END OF PROTOTYPES                       ###
###############################################

###############################################
### GENERAL                                 ###
###############################################

package general;
use strict; use bytes;

use File::Basename;
use Sys::Hostname;

# Global settings shared by the whole program
# NOTE(review): the variables without further comment are not assigned in
# this part of the file; they are presumably set during options parsing - to
# be confirmed
our $connector_command;
our $login_name;
# Name of the local host, set by init()
our $host;
# True by default. The prototype comment of arguments::not_root states that
# it sets this variable to 0
our $root=1;
our $taktuk_command;
# Tested by load_taktuk_package to warn when the code has not been loaded yet
our $self_propagate;
# Whole text of the taktuk executable, filled by load_taktuk_file
our $taktuk_code = undef;
# Text of a single package extracted from $taktuk_code, filled by
# load_taktuk_package
our $taktuk_package = undef;
our $template;
our $redirect;

# All initialized to -1
# NOTE(review): presumably -1 means "not computed yet" and the actual values
# come from the numbering of nodes - to be confirmed
our $rank = -1;
our $count = -1;
our $child_min = -1;
our $child_max = -1;

# Records the name of the local host in $host and publishes it in the
# environment variable TAKTUK_HOSTNAME
sub init()
  {
    # chomp removes a possible trailing newline from the reported name
    chomp($host = hostname);
    $ENV{TAKTUK_HOSTNAME} = $host;
  }

# Opens a file for reading and returns its filehandle
# Argument : the name of the file to open. An undefined or empty name, as well
# as "-", designates STDIN which is only usable on the root node and outside of
# the interactive mode
# Returns : the opened filehandle (a reference to STDIN when STDIN is
# requested) or undef if the file cannot be opened or if STDIN is not usable.
# In case of failure, the error is reported using the diagnostic package
sub open_file($)
  {
    my $filename = shift;
    my $fd = undef;

    # Definedness and length are tested rather than truth, otherwise a file
    # actually named "0" would be mistaken for a void name (thus for STDIN)
    if (defined($filename) and length($filename) and ($filename ne "-"))
      {
        if (not open $fd, "<", $filename)
          {
            diagnostic::error("Trying to open $filename");
            diagnostic::system;
            # open might have sprung $fd to existence, reset it so that the
            # caller gets undef
            $fd = undef;
          }
      }
    else
      {
        if ($main::interactive)
          {
            diagnostic::error("Cannot load STDIN in interactive mode");
          }
        elsif (not $general::root)
          {
            diagnostic::error("Cannot load STDIN on a non root node");
          }
        else
          {
            $fd = \*STDIN;
          }
      }
    return $fd;
  }

# Loads the whole content of a file and returns it as a single string
# Argument : the name of the file to load. An undefined or empty name, as well
# as "-", designates STDIN which is only usable on the root node and outside of
# the interactive mode
# Returns : the data read, or the empty string if the file cannot be opened or
# if STDIN is not usable. In case of a read error, the error is reported and
# the data read so far is returned
sub load_file($)
  {
    my $filename = shift;
    my $fd = undef;
    my $is_stdin = 0;
    my $new_data;
    my $stuff="";
    my $result;

    # Definedness and length are tested rather than truth, otherwise a file
    # actually named "0" would be mistaken for a void name (thus for STDIN)
    if (defined($filename) and length($filename) and ($filename ne "-"))
      {
        if (not open $fd, "<", $filename)
          {
            diagnostic::error("Trying to open $filename");
            diagnostic::system;
            return "";
          }
      }
    else
      {
        if ($main::interactive)
          {
            diagnostic::error("Cannot load STDIN in interactive mode");
            return "";
          }
        elsif (not $general::root)
          {
            diagnostic::error("Cannot load STDIN on a non root node");
            return "";
          }
        else
          {
            $fd = \*STDIN;
            $is_stdin = 1;
          }
      }
    while (1)
      {
        $result = sysread($fd, $new_data, $taktuk::read_size);
        if (not defined($result))
          {
            # Read error : report it and stop here without comparing the
            # undefined result to a number
            diagnostic::system;
            last;
          }
        # sysread returns 0 at end of file
        last if not $result;
        $stuff .= $new_data;
      }
    # STDIN does not belong to this function, it is left opened
    if (not $is_stdin)
      {
        close $fd or diagnostic::system;
      }
    return $stuff;
  }

# Loads the text of the running taktuk executable into $taktuk_code
# The file named by $0 is used if it is readable, otherwise its location is
# asked to the 'which' command
# Returns the empty string (leaving $taktuk_code untouched) if the executable
# cannot be found or opened, the error being reported using the diagnostic
# package
sub load_taktuk_file()
  {
    my $filename = $0;
    my $fd = undef;
    my $new_data;
    my $result;

    if (not -r $filename)
      {
        $filename = `which $0`;
        # The output of backticks keeps its trailing newline : it has to be
        # removed, otherwise the following open would always fail
        $filename = "" if not defined($filename);
        chomp($filename);
      }
    if ($filename)
      {
        if (not open $fd, "<", $filename)
          {
            diagnostic::error("Trying to open $filename");
            diagnostic::system;
            return "";
          }
      }
    else
      {
        diagnostic::error("Cannot find own taktuk executable");
        return "";
      }
    $taktuk_code = "";
    while (1)
      {
        $result = sysread($fd, $new_data, $taktuk::read_size);
        if (not defined($result))
          {
            # Read error : report it and stop here without comparing the
            # undefined result to a number
            diagnostic::system;
            last;
          }
        # sysread returns 0 at end of file
        last if not $result;
        $taktuk_code .= $new_data;
      }
    close $fd or diagnostic::system;
  }

# Builds $taktuk_package from the text stored in $taktuk_code
# Argument : the name of the package to extract
# The code is cut at each 'package name;' statement ending a line and only the
# pieces that begin with the statement of the required package are kept. A
# final "1;" line is appended to the result
# Does nothing if $taktuk_package is already defined. Calls load_taktuk_file
# first if $taktuk_code is not defined yet
sub load_taktuk_package($)
  {
    my $package_name = shift;

    if (not defined($taktuk_package))
      {
        if (not defined($taktuk_code))
          {
            if ($self_propagate)
              {
                diagnostic::warning("Taktuk code should have been loaded...");
              }
            load_taktuk_file;
          }
        # True while the piece being scanned belongs to the required package
        my $copying = 0;
        # Offset of the beginning of the piece being scanned
        my $chunk_start = 0;
        $taktuk_package = "";
        # The lookahead makes a zero length match : pos is thus the offset of
        # the 'package' keyword itself
        while ($taktuk_code =~ /(?=package\s+([^;\s]*)\s*;\s*$)/gmi)
          {
            my $found_name = $1;
            my $chunk_end = pos($taktuk_code);
            if ($copying)
              {
                $taktuk_package .=
                    substr($taktuk_code, $chunk_start, $chunk_end - $chunk_start);
              }
            $copying = ($found_name eq $package_name) ? 1 : 0;
            $chunk_start = $chunk_end;
          }
        # The last piece runs until the end of the code
        $taktuk_package .= substr($taktuk_code, $chunk_start) if $copying;
        $taktuk_package .= "1;\n";
      }
  }

sub print_help()
  {
    communicator::get_root->output('info', <<END_HELP);
Usage :
taktuk [ options ] [ commands ]

If one or more commands are given, TakTuk will execute them sequentially.
Otherwise, TakTuk enter interactive mode, waiting for commands.

Commands are (braces can be replaced by any delimiter):
set command              : sends the command to all the peers of the set
                           (see TakTuk manual for set specification syntax)
broadcast command        : broadcasts TakTuk command to all the remote peers
                           not including the node initiating the broadcast
close                    : closes stdin of all commands being executed locally
downcast command         : spreads TakTuk command to all the children of the
                           node initiating the command (not including itself)
exec [command]           : executes the given shell command on the local node
file_input [filename]    : sends the content of a file as input to all commands
                           being executed locally (might also be broadcasted).
                           Reads file from STDIN if filename is '-' or void
input [data]             : sends input to all commands being executed locally
line_input [data]        : same as input but adds a newline at the end of data
print_tree               : prints the TakTuk deployment tree
synchronize command      : forces the following command to wait for deployment,
                           numbering and preceding commands before executing
taktuk_perl [args]       : forks a perl interpreter on the local node aware of
                           the taktuk package and communication routines (see
                           manual for a description of the taktuk package which
                           contains point-to-point communication routines).
                           Similar in principle to 'exec perl args'.
                           WARNING: due to limited parser, you have to give
                           args (even if empty) and to use '--' to explicitely
                           terminate switches if any.
quit                     : quit TakTuk and shutdown its logical network

General options may contain :
-c, --connector command  : defines the connector commands used for following
                           remote machines.
-d, --dynamic limit      : turns dynamic mode on (work stealing) for all the
                           following remote machines specifications.
                           Uses limit as a maximal arity (0 = no limit).
                           A negative value for limit turns dynamic mode off.
                           Warning, currently it is a bad idea to use several
                           -d options on the same command line.
-f, --machines-file name : name is the name of a file that contains remote
                           machines name (equivalent to several -m opions)
-g, --time-granularity n : sets to n the maximal interval between timeouts
                           checks (usually checks are made more often).
-h, --help               : prints this help.
-i, --interactive        : forces interactive mode afer command line parsing.
-l, --login name         : sets the login name for the following hosts.
-m, --machine name -[ -] : name is the name of a remote host to be deployed
                           with its optional arguments between -[ and -].
-n, --no-numbering       : disable taktuk nodes numbering, speed up deployment
-o, --output-template    : set an output template specification for one of the
      name=specification   output streams (see man for details about
                           templates). When giving only a name (without
                           specification) this disables the stream.
-s, --self-propagate     : propagate the TakTuk executable through connectors
                           (eliminates the need for a TakTuk installation on
                           remote machines).
-t, --timeout time       : sets the timeout for connectors (0 = no timeout).
-v, --version            : prints TakTuk version and exits
-w, --window number      : sets initial window size to number (pipeline width).
-C, --command-separator  : changes the set of characters considered as command
      separators set       separators.
-F, --args-file name     : name is the name of a file that contains options.
-L, --localhost name     : sets the name of localhost as viewed by TakTuk.
-O, --option-separator   : changes the set of characters considered as option
      separators set       separators.
-P, --print-defaults     : prints default settings
-R, --output-redirect    : sets an output redirection for one of the output
      name=number          streams (see man for details about output streams)
-T, --taktuk-command com : com is the name of the TakTuk command that should
                           be used for the following hosts.
-W, --static-window      : turns off the dynamic adjustment of window size
                           (no effect, dynamic window mode not implemented).

Environment variables defined by TakTuk for commands execution (see man for
details about variables that change TakTuk settings):
TAKTUK_CONTROL           : file descriptor used by taktuk module
                           (usually not useful for most users)
TAKTUK_COUNT             : total number of successfully deployed nodes
TAKTUK_HOSTNAME          : hostname of the local node as given to TakTuk
TAKTUK_PIDS              : pids list of commands executed by the local TakTuk
TAKTUK_RANK              : logical number of local node (in [1..TAKTUK_COUNT])
END_HELP
    $main::terminate = 1;
  }

# Creates the connector in charge of deploying TakTuk on a remote peer and
# registers it with the scheduler.
# Arguments: the peer name, followed by the optional peer specific arguments
# which are packed and appended to the message part of the configuration.
sub add_peer(@)
  {
    my ($peer_name, @peer_arguments) = @_;
    my ($command_args, $message_args) = arguments::get_config();

    # Command executed at the other end of the connector: either the TakTuk
    # code is fed to a perl interpreter (self propagation) or the configured
    # TakTuk executable is used
    my $remote_command = $self_propagate
        ? "echo $connector::connector_functional_string\\;".
          "exec perl -- - -r "
        : "exec $taktuk_command -r ";
    $remote_command .= "$command_args -L $peer_name ";

    foreach my $peer_argument (@peer_arguments)
      {
        $message_args .= taktuk::pack($peer_argument);
      }

    # The login name, when set, is passed to the connector using -l
    my $connection_command = defined($login_name)
        ? "$connector_command -l $login_name"
        : $connector_command;

    my $new_connector = connector::new(command   => $connection_command,
                                       peer      => $peer_name,
                                       taktuk    => $remote_command,
                                       arguments => $message_args);

    diagnostic::debug("New external connector command : ".
                      "$new_connector->{line}");
    scheduler::add_connector($new_connector);
  }

# Writes the given message on STDOUT using taktuk::syswrite and reports a
# system error through the diagnostic package when the write fails.
sub print($)
  {
    my ($text) = @_;

    taktuk::syswrite(\*STDOUT, $text) or diagnostic::system();
  }

# Recursively outputs a deployment tree on the 'info' stream.
# Arguments: the indentation prefix and a reference to an array whose first
# element is the label of the node and whose other elements are subtrees of
# the same form. The label is removed from the array given by the caller.
sub print_tree($$)
  {
    my ($prefix, $data) = @_;
    my $label = shift @$data;

    communicator::get_root->output('info',$prefix.$label."\n");

    # Each child is printed one indentation level deeper
    my $child_prefix = $prefix."    ";
    foreach my $subtree (@$data)
      {
        print_tree($child_prefix,$subtree);
      }
  }

###############################################
### MY_SELECT                               ###
###############################################

package my_select;
use strict; use bytes;

# Builds an empty set of handles: one bit vector per kind of event (read,
# write, exception) and the list of the handles stored in the set.
sub new ()
  {
    return bless({ rin=>'', win=>'', ein=>'', handles=>[] });
  }

# Adds a handle to the set: its descriptor is flagged in the three bit
# vectors and the handle is appended to the list of handles.
# A warning is issued if the descriptor is already flagged in the read vector.
sub add ($$)
  {
    my ($self, $handle) = @_;
    my $descriptor = fileno($handle);

    if (vec($self->{rin}, $descriptor, 1) != 0)
      {
        diagnostic::warning("Descriptor already in set");
      }
    else
      {
        foreach my $vector ('rin', 'win', 'ein')
          {
            vec($self->{$vector}, $descriptor, 1) = 1;
          }
        push @{$self->{handles}}, $handle;
      }
  }

# Removes a handle from the set: its descriptor is cleared from the three bit
# vectors and the handle is removed from the list of handles.
# Returns 1 when the handle has been removed and 0 otherwise (descriptor not
# flagged in the set or handle not found in the list); a warning is issued in
# both failure cases.
sub remove ($$)
  {
    my $self = shift;
    my $handle = shift;
    my $descriptor = fileno($handle);
    my $handles_list = $self->{handles};

    if (vec($self->{rin}, $descriptor, 1) != 1)
      {
        diagnostic::warning("Descriptor not in set");
        # explicit failure status: without it the sub would return the
        # value of the warning call
        return 0;
      }

    foreach my $vector ('rin', 'win', 'ein')
      {
        vec($self->{$vector}, $descriptor, 1) = 0;
      }
    # handles are compared by reference (numeric comparison of references)
    foreach my $i (0 .. $#$handles_list)
      {
        if ($handles_list->[$i] == $handle)
          {
            splice @$handles_list, $i, 1;
            return 1;
          }
      }
    diagnostic::warning("Didn't found descriptor to remove");
    return 0;
  }

# Wrapper around the four arguments CORE::select working on my_select sets.
# Arguments: the sets to check for reading, writing and exceptions (each of
# them may be undef) and the timeout given to CORE::select.
# Returns an empty list if CORE::select reports -1, three references to empty
# arrays if no descriptor is ready, and otherwise three references to arrays
# holding the handles ready for reading, writing and exceptions.
sub select ($$$$)
  {
    my ($read_select, $write_select, $except_select, $timeout) = @_;

    # CORE::select overwrites its bit vectors, so it is given copies
    my $rout = defined($read_select)?$read_select->{rin}:undef;
    my $wout = defined($write_select)?$write_select->{win}:undef;
    my $eout = defined($except_select)?$except_select->{ein}:undef;
    my ($nfound) = CORE::select($rout, $wout, $eout, $timeout);

    return () if ($nfound == -1);
    return ([],[],[]) if ($nfound == 0);

    my @read_ready = ();
    my @write_ready = ();
    my @except_ready = ();

    foreach my $candidate (@{$read_select->{handles}})
      {
        if (vec($rout,fileno($candidate),1) == 1)
          {
            push(@read_ready, $candidate);
          }
      }
    foreach my $candidate (@{$write_select->{handles}})
      {
        if (vec($wout,fileno($candidate),1) == 1)
          {
            push(@write_ready, $candidate);
          }
      }
    foreach my $candidate (@{$except_select->{handles}})
      {
        if (vec($eout,fileno($candidate),1) == 1)
          {
            push(@except_ready, $candidate);
          }
      }
    return (\@read_ready, \@write_ready, \@except_ready);
  }

# Returns the list of the handles currently stored in the set.
sub handles ()
  {
    my ($self) = @_;
    my $stored = $self->{handles};

    return @$stored;
  }

###############################################
### ARGUMENTS                               ###
###############################################

package arguments;
use strict; use bytes;

# 1 if the Term::ReadLine module could be loaded, 0 otherwise
our $has_readline = eval("use Term::ReadLine;1")?1:0;
# Term::ReadLine object created by initialize_terminal (undef until then)
our $terminal = undef;

# Sets of characters interpolated into regexp character classes by the
# parsing routines of this package (destinations of the -C and -O options)
our $command_separator;
our $option_separator;

# Transmission modes of an option (see register_option and get_config)
use constant COMMAND_LINE => 0;
use constant MESSAGE => 1;

# Options tables filled by register_option: all of them are indexed by the
# short name of the option except %short_name which is indexed by long name
our %handler;
our %long_name;
our %short_name;
our %type;
our %transmission_mode;
# Values set by set_option for options that have a transmission mode
our %current_value;
# Cached result of get_config, recomputed when $current_config_outdated is set
our @current_config;
our $current_config_outdated = 1;

# Parser state shared by the routines below
# $arguments_ended : set when no more input is available
# $again           : when set, the next read reuses $current_stuff
# @arguments       : pending arguments (strings or GLOB references)
# $current_argument: last argument returned by get_next_argument
# $current_stuff   : last chunk extracted by get_next_stuff
# $current_separators : separators found after $current_stuff
# $options_ended   : set when parse_options found the end of the options
our $arguments_ended = 1;
our $again=0;
our @arguments = ();
our $current_argument = undef;
our $current_stuff = undef;
our $current_separators = undef;
our $options_ended=0;

# Returns the current configuration as a pair of strings:
# - the options to append to the command line of a remote TakTuk
# - the options to send as packed messages
# Only options that have both a current value and a transmission mode are
# included. The pair is cached and recomputed only when the configuration
# has been flagged as outdated.
sub get_config()
  {
    return @current_config if not $current_config_outdated;

    my $command_args = "";
    my $message_args = "";

    foreach my $key (keys(%current_value))
      {
        next if not exists($transmission_mode{$key});

        my $mode = $transmission_mode{$key};
        my $value = $current_value{$key};
        my $is_hash = UNIVERSAL::isa($value, 'HASH');

        if ($mode == COMMAND_LINE)
          {
            if ($is_hash)
              {
                # one "-<option><field>[=<value>]" per field
                foreach my $field (keys(%$value))
                  {
                    my $setting = $value->{$field};
                    $command_args.=" -$key$field";
                    $command_args.="=$setting" if defined($setting);
                  }
              }
            else
              {
                # the value is glued to the option when the option takes one
                $command_args.=" -$key";
                $command_args.="$value" if ($type{$key});
              }
          }
        elsif ($mode == MESSAGE)
          {
            if ($is_hash)
              {
                # the option and its "<field>[=<value>]" are packed apart
                foreach my $field (keys(%$value))
                  {
                    my $setting = $value->{$field};
                    $message_args.=taktuk::pack("-$key");
                    $message_args.=taktuk::pack("$field".
                                     (defined($setting)?"=$setting":""));
                  }
              }
            else
              {
                $message_args.=taktuk::pack("-$key");
                $message_args.=taktuk::pack($value) if ($type{$key});
              }
          }
        else
          {
            diagnostic::warning("Internal bug in get_config");
          }
      }
    diagnostic::debug("Config : $command_args | ".$message_args."\n");
    @current_config = ($command_args, $message_args);
    $current_config_outdated = 0;
    return @current_config;
  }

# Creates the Term::ReadLine terminal bound to STDIN and STDOUT when the
# module is available.
sub initialize_terminal()
  {
    # The call to $terminal->ornaments(0) is left out on purpose: it issues
    # a warning in instances without terminal
    $terminal = Term::ReadLine->new("TakTuk", \*STDIN, \*STDOUT)
        if ($has_readline);
  }

# Inserts the given arguments (strings or handles) in front of the pending
# ones and restarts arguments and options parsing. Does nothing when called
# without argument.
sub fetch_arguments(@)
  {
    my @new_arguments = @_;

    if (@new_arguments)
      {
        unshift @arguments, @new_arguments;
        ($arguments_ended, $options_ended) = (0, 0);
      }
  }

# Pushes back the last extracted chunk so that the next read returns it
# again. Only one level of restoration is available: a warning is issued if
# a chunk is already pending.
sub restore_last_argument()
  {
    if (not $again)
      {
        $again = 1;
      }
    else
      {
        diagnostic::warning("Cannot restore an ungeted argument");
      }
  }

# Extracts and returns the next command name from the input.
# Whatever follows the name in the extracted chunk (parameters or a command
# separator) is kept in $current_stuff and flagged for reuse by the next read.
sub get_next_command()
  {
    my $command;

    get_next_stuff($option_separator);
    # put back the command separator in case the option separators overlap it
    if ($current_separators =~ m/([$command_separator])/)
      {
        $current_stuff .= $1;
        $current_separators = "";
      }
    # The name ends either at the first option separator (which is dropped)
    # or at the first command separator (which is kept in the remaining part)
    if (($current_stuff =~
                m/^([^$option_separator]+)[$option_separator]+(.*)$/) or
        ($current_stuff =~
                m/^([^$command_separator]+)([$command_separator].*)$/))
      {
        ($command, $current_stuff) = ($1, $2);
        $again = 1;
      }
    else
      {
        ($command, $current_stuff) = ($current_stuff, "");
      }
    diagnostic::debug("Command found : [$command]");
    return $command;
  }

# Extracts the next chunk delimited by option separators, stores it in
# $current_argument and returns it.
sub get_next_argument()
  {
    get_next_stuff($option_separator);
    $current_argument = $current_stuff;
    diagnostic::debug("Argument found : [$current_stuff]");
    return $current_argument;
  }

# Per handle input state, indexed by the (stringified) handle reference:
# %buffer holds the data read but not consumed yet, %end is set by
# read_buffer_data once nothing more can be read from the handle
our %buffer;
our %end;

# Reads more data from the given handle and appends it to its buffer.
# STDIN is read line by line through the readline terminal when there is
# one; other cases use sysread. The match position of the buffer is kept
# across the append. When nothing could be read the handle is flagged as
# ended and closed (except STDIN).
# Returns the result of the read (number of bytes, 0 or undef).
sub read_buffer_data($)
  {
    my $argument = shift;
    my $new_data;
    my $result;

    if (defined($terminal) and ($argument == \*STDIN))
      {
        # readline strips the end of line, so it is added back
        $new_data = $terminal->readline("");
        $result = defined($new_data)?length($new_data .= "\n"):0;
      }
    else
      {
        $result = sysread($argument,$new_data,$taktuk::read_size);
      }

    my $old_pos = pos($buffer{$argument});
    diagnostic::system if not defined($result);
    if (not $result)
      {
        $end{$argument} = 1;
        CORE::close $argument if ($argument != \*STDIN);
      }
    else
      {
        # appending resets the match position, hence the restoration
        $buffer{$argument} .= $new_data;
        pos($buffer{$argument}) = $old_pos;
      }
    return $result;
  }

# Forgets the buffer and the end flag of the given handle. A warning is
# issued if the handle has not been flagged as ended.
sub delete_buffer($)
  {
    my ($argument) = @_;

    exists($end{$argument}) or diagnostic::warning("Bug");
    delete $buffer{$argument};
    delete $end{$argument};
  }
    
# Returns a reference to the next available piece of input or undef if
# there is none left:
# - a reference to the first pending argument when it is a plain string
# - a reference to the buffer of the first pending handle otherwise (the
#   buffer is filled if empty, ended handles are discarded)
sub get_data_buffer()
  {
    my $data = undef;

    while (not defined($data) and scalar(@arguments))
      {
        my $argument = $arguments[0];

        if (not UNIVERSAL::isa($argument,'GLOB'))
          {
            $data = \$arguments[0];
          }
        elsif ($end{$argument})
          {
            # nothing more to expect from this handle: go to next argument
            delete_buffer($argument);
            shift @arguments;
          }
        else
          {
            $buffer{$argument} = "" if not exists($buffer{$argument});
            read_buffer_data($argument) if not length($buffer{$argument});
            $data = \$buffer{$argument} if length($buffer{$argument});
          }
      }
    return $data;
  }

# Returns 1 if the given reference points to the first pending argument
# and 0 otherwise (including when there is no pending argument).
sub is_first_argument($)
  {
    my ($data) = @_;

    return 0 if not exists($arguments[0]);
    return ($data == \$arguments[0])?1:0;
  }

# Extracts the next chunk of input into $current_stuff.
# Argument: the set of characters (interpolated into a regexp character
# class) that delimit chunks when the input comes from a buffer.
# If a restoration is pending ($again) the previous chunk is kept as is.
# Sets $current_separators to the separators found after the chunk and
# $arguments_ended if no chunk could be extracted.
sub get_next_stuff($)
  {
    my $separator = shift;
    my $done = 0;
    my $data = 1;

    if ($again)
      {
        # the previous chunk has been restored: just consume the flag
        $again = 0;
      }
    else
      {
        $current_stuff = "";
        $current_separators = "";
        while (not $done and defined($data))
          {
            $data = get_data_buffer;
            if (defined($data))
              {
                if (is_first_argument($data))
                  {
                    # plain argument: taken as a whole, without splitting
                    $current_stuff = $$data;
                    shift @arguments;
                    $done = 1;
                  }
                else
                  {
                    # buffered data: cut at the first run of separators
                    if ($$data =~ m/([$separator]+)/g)
                      {
                        $current_separators = $1;
                        # pos is the offset just after the separators
                        $current_stuff .= substr $$data, 0,
                             pos($$data)-length($current_separators);
                        $$data = substr $$data, pos($$data);
                        # leading separators only give an empty chunk: in
                        # this case the search goes on
                        if (length($current_stuff))
                          {
                            $done = 1;
                          }
                      }
                    else
                      {
                        # no separator yet: take everything and read more
                        $current_stuff .= $$data;
                        $$data = "";
                      }
                  }
              }
          }
        $arguments_ended = 1 if not length($current_stuff);
      }
  }

# Extracts and returns the parameters of a command, i.e. the text enclosed
# between delimiters. The first character of the input is the opening
# delimiter: if it is one of ( { [ the closing delimiter is the matching
# brace and nested pairs are balanced, otherwise the same character closes
# the parameters. Delimiters are not part of the returned string.
# A warning is issued (and $arguments_ended set) if the input ends before
# the closing delimiter.
sub get_parameters()
  {
    my $left_brace = undef;
    my $right_brace;
    my $balance=1;
    my $parameters = "";
    my $data=1;

    # should not have $current_stuff not empty (probably a syntax error) :
    if (length($current_stuff))
      {
        diagnostic::warning("Warning invalid [$current_stuff] before ".
                                                                "parameters");
        # nothing is extracted in this case
        $balance = 0;
      }
    while ($balance and defined($data))
      {
        $data = get_data_buffer;
        if (defined($data))
          {
            # parameters spread over several plain arguments are joined
            # with a space
            if (defined($left_brace) and is_first_argument($data))
              {
                $$data = " ".$$data;
              }
            if (not defined($left_brace))
              {
                # the first character is the opening delimiter
                $$data =~ s/^(.)//;
                $left_brace = $1;
                if ($left_brace =~ m/[({[]/)
                  {
                    $right_brace = $left_brace;
                    $right_brace =~ tr/({[/)}]/;
                  }
                else
                  {
                    # any other delimiter closes itself
                    $right_brace = "";
                  }
              }
            # NOTE(review): the delimiters are interpolated unescaped into a
            # character class - confirm the behavior with delimiters such
            # as ^ or \
            while ($balance and
                   ($$data =~ m/.*?([$right_brace$left_brace])/g))
              {
                my $brace = $1;
                if ($right_brace and ($brace =~ m/[$left_brace]/))
                  {
                    $balance++;
                  }
                else
                  {
                    $balance--;
                  }
              }
            if (not $balance)
              {
                # pos is just after the closing delimiter, which is dropped
                $parameters .= substr $$data, 0, pos($$data)-1;
                $$data = substr $$data, pos($$data);
              }
            else
              {
                # not closed yet: take everything and go on with next data
                $parameters .= $$data;
                shift @arguments if is_first_argument($data);
                $$data = "";
              }
          }
      }
    if ($balance)
      {
        $arguments_ended = 1;
        diagnostic::warning("Bad $left_brace$right_brace balance");
      }
    diagnostic::debug("Parameters found : [$parameters]");
    return $parameters;
  }

# Consumes the command separator expected at the current position of the
# input, along with the option separators that surround it.
# The search starts in the restored chunk if there is one ($again) and in the
# next piece of input otherwise. Pieces made only of option separators are
# skipped. A warning is issued if something else than a command separator is
# found and $arguments_ended is set if the input ends first.
sub skip_command_separator()
  {
    my $done = 0;
    my $data;

    if ($again)
      {
        $data = \$current_stuff;
      }
    else
      {
        $data = get_data_buffer();
      }
    while (not $done and not $arguments_ended)
      {
        if (not defined($data))
          {
            $arguments_ended = 1;
          }
        elsif ($$data =~ s/^[$option_separator]*[$command_separator]+[$option_separator]*//)
          {
            $done = 1;
            shift @arguments if (not length($$data) and
                                                is_first_argument($data));
          }
        elsif ($$data =~ m/^[$option_separator]*$/)
          {
            $$data = "";
            # is_first_argument is used rather than a direct comparison to
            # \$arguments[0]: taking this reference when @arguments is empty
            # would create an undefined first argument
            shift @arguments if is_first_argument($data);
            $data = get_data_buffer();
          }
        else
          {
            diagnostic::warning("Missing command separator");
            $done = 1;
          }
      }
    # an emptied restored chunk does not need to be read again
    $again = 0 if not length($current_stuff);
  }

# Discards all the pending arguments, closing those that are handles, and
# flags the arguments as ended.
sub terminate_arguments_parsing()
  {
    foreach my $argument (splice(@arguments))
      {
        if (UNIVERSAL::isa($argument,'GLOB'))
          {
            close $argument;
          }
      }
    $arguments_ended = 1;
  }

# Registers an option.
# Arguments: short name, long name, type, handler (either a code reference
# or a reference to the variable holding the value), then optionally an
# initial value and a transmission mode (COMMAND_LINE or MESSAGE).
# The initial value, when given, is either passed to the code handler or
# stored in the variable.
sub register_option($$$$@)
  {
    my ($short, $long, $opt_type, $opt_handler, @optional) = @_;

    $long_name{$short} = $long;
    $short_name{$long} = $short;
    $handler{$short} = $opt_handler;
    $type{$short} = $opt_type;

    if (@optional)
      {
        my $initial_value = shift @optional;

        if (UNIVERSAL::isa($opt_handler, 'CODE'))
          {
            $opt_handler->($initial_value);
          }
        else
          {
            $$opt_handler = $initial_value;
          }
        $transmission_mode{$short} = shift @optional if (@optional);
      }
  }

# Handler of the args-file option: opens the given file and inserts its
# handle in front of the pending arguments so that options are read from it.
# If the file cannot be opened an error is reported and the pending
# arguments are left untouched.
sub args_file($)
  {
    my $file = shift;
    # assumes general::open_file returns a false value on failure, as the
    # error handling of the original code suggests - TODO confirm
    my $file_handle = general::open_file($file);

    if (not $file_handle)
      {
        diagnostic::error("Can't open [$file]");
        # do not queue an invalid handle as an argument
        return;
      }
    fetch_arguments($file_handle);
  }

# Handler of the machines-file option: adds as a peer each whitespace
# separated name found in the given file.
# If the file cannot be opened an error is reported and no peer is added.
sub machines_file($)
  {
    my $file = shift;
    # assumes general::open_file returns a false value on failure, as the
    # error handling of the original code suggests - TODO confirm
    my $file_handle = general::open_file($file);

    if (not $file_handle)
      {
        diagnostic::error("Can't open [$file]");
        return;
      }
    while (my $line = <$file_handle>)
      {
        # split ' ' skips leading whitespace: splitting on /\s+/ would
        # produce an empty peer name for indented lines
        foreach my $peer_name (split ' ', $line)
          {
            general::add_peer($peer_name);
          }
      }
    close $file_handle if $file_handle != \*STDIN;
  }

# Handler of the machine option: adds the given peer along with its
# optional arguments, i.e. the arguments enclosed between -[ and -] right
# after the peer name (nested -[ -] pairs are kept as arguments).
# Exits with status 1 if the -[ and -] do not match.
sub machine($)
  {
    my $peer = shift;
    my @peer_arguments=();

    get_next_argument();
    if ($current_argument ne "-[")
      {
        # no peer specific arguments: the argument belongs to someone else
        restore_last_argument();
      }
    else
      {
        my $nesting = 1;

        while ($nesting and !$arguments_ended)
          {
            get_next_argument();
            if ($current_argument eq "-]")
              {
                $nesting--;
              }
            elsif ($current_argument eq "-[")
              {
                $nesting++;
              }
            push @peer_arguments, $current_argument;
          }
        if ($nesting)
          {
            diagnostic::error("Invalid -[ and -] imbrication");
            exit(1);
          }
        # the closing -] is not an argument of the peer
        pop @peer_arguments;
      }
    general::add_peer($peer,@peer_arguments);
  }

# Handler of the localhost option: records the given name as the name of
# the local host and exports it in the TAKTUK_HOSTNAME environment variable.
sub localhost($)
  {
    my ($hostname) = @_;

    $ENV{TAKTUK_HOSTNAME} = $general::host = $hostname;
  }

# Handler of the not-root option: turns this instance into a non root one.
# A connector is built upon STDIN and STDOUT and added to the 'sources' of
# the communicator. Nothing is done if the instance is already not root.
sub not_root()
  {
    return if not $general::root;

    # as soon as this instance knows it is not root, the diagnostic package
    # needs a source connector to work
    my $source = connector::new('write' =>\*STDIN,
                                'read'  =>\*STDOUT,
                                'type'  =>settings::FD);
    # STDERR MUST NOT BE CLOSED HERE.
    # Some errors (such as the errors produced by connectors) use STDERR to
    # print a diagnostic message. If STDERR were closed, its descriptor
    # would likely be reused by some call to socketpair when creating a
    # connector, and the connectors forked afterwards would redirect what
    # they know as STDERR to it. The connector would then be polluted by
    # messages that do not conform to the protocol, which used to freeze
    # some parts of the deployment.
    communicator::add_connector($source, 'sources');
    $general::root = 0;
  }

# Handler of the print-defaults option: outputs on the 'info' stream the
# current value of each option stored in a variable (options handled by code
# are skipped), one TAKTUK_<NAME>=<value> line per option and one
# TAKTUK_<NAME>_<FIELD>=<value> line per field of hash options.
# Requests the termination of the program afterwards.
sub print_defaults()
  {
    foreach my $key (keys(%long_name))
      {
        my $target = $handler{$key};
        next if UNIVERSAL::isa($target, 'CODE');

        my $name = $long_name{$key};
        if ($type{$key} =~ m/^h.$/)
          {
            # hash option: the variable holds a reference to the fields
            my $fields = $$target;
            foreach my $field (keys(%$fields))
              {
                my $value = $fields->{$field};
                my $line = uc("TAKTUK_".$name."_".$field);

                $value = "" if not defined($value);
                $line =~ tr/-/_/;
                communicator::get_root->output('info',
                              $line."=".$value."\n");
              }
          }
        else
          {
            my $value = $$target;
            my $line = uc("TAKTUK_".$name);

            $value = "" if not defined($value);
            $line =~ tr/-/_/;
            communicator::get_root->output('info', $line."=".$value."\n");
          }
      }
    $main::terminate = 1;
  }

# Handler of the print-package option: loads the given package using
# general::load_taktuk_package, outputs $general::taktuk_package on the
# 'info' stream and requests the termination of the program.
sub print_package($)
  {
    my ($requested_package) = @_;

    general::load_taktuk_package($requested_package);
    communicator::get_root->output('info',$general::taktuk_package);
    $main::terminate = 1;
  }

# Handler of the version option: outputs the version and release numbers
# computed from $taktuk::VERSION on the 'info' stream and requests the
# termination of the program.
sub print_version()
  {
    # the last three digits are the release, the other ones the version
    # expressed in tenths
    my $release = $taktuk::VERSION%1000;
    my $version = int($taktuk::VERSION/1000)/10.0;
    my $banner = "TakTuk version $version release $release\n";

    communicator::get_root->output('info', $banner);
    $main::terminate = 1;
  }

# Registers all the options of TakTuk along with their default value and
# transmission mode, then overrides the defaults with the settings found in
# the TAKTUK_* environment variables.
# A variable TAKTUK_<LONG_NAME> sets a scalar option and a variable
# TAKTUK_<LONG_NAME>_<FIELD> sets one field of a hash option (underscores
# in the long name stand for dashes).
sub init()
  {
    # Arguments of register_option: short name, long name, type, handler
    # and optionally initial value and transmission mode.
    # Types used here: "" (no argument), "s", "i", "f" and hash types "hi"
    # and "hs" (see set_option for the checks applied to each type)
    register_option("C","command-separator","s",
                    \$arguments::command_separator,',;\n',MESSAGE);
    register_option("D","debug","hi",\$diagnostic::package_level,
                    { default=>2 },COMMAND_LINE);
    register_option("F","args-file","s",\&args_file);
    register_option("L","localhost","s",\&localhost);
    register_option("O","option-separator","s",
                    \$arguments::option_separator,'\s',MESSAGE);
    register_option("P","print-defaults","",\&print_defaults);
    register_option("R","output-redirect","hi",\$general::redirect,
      { "default" => 1, "taktuk" => 2 }, MESSAGE);
    register_option("T","taktuk-command","s",\$general::taktuk_command,
                    $0,MESSAGE);
    register_option("W","static-window","",\$scheduler::static_window,
                    0,MESSAGE);
    register_option("c","connector","s",\$general::connector_command,
                  "ssh -o StrictHostKeyChecking=no -o BatchMode=yes", MESSAGE);
    register_option("d","dynamic","i",\$scheduler::dynamic_limit, 0, MESSAGE);
    register_option("f","machines-file","s",\&machines_file);
    register_option("g","time-granularity","f",\$communicator::select_timeout,
                    1, MESSAGE);
    register_option("h","help","",\&general::print_help);
    register_option("i","interactive","",\$main::forced_interactive);
    register_option("l","login","s",\$general::login_name, undef, MESSAGE);
    register_option("m","machine","s",\&machine);
    register_option("n","no-numbering","",\$main::no_numbering,0,COMMAND_LINE);
    # the templates are single quoted: they are stored as text here
    # (presumably evaluated when output is produced - verify in the code
    # that uses $general::template)
    register_option("o","output-template","hs",\$general::template,
      {
        "default"   => '"$host-$rank: $command ($pid): $type > $line\n"',
        "connector" => '"$host: $peer ($pid): $type > $line\n"',
        "info"      => '"$line$eol"',
        "status"    => '"$host-$rank: $command ($pid): $type > ".
            (WIFEXITED($line)?"Exited with status ".WEXITSTATUS($line)
           :(WIFSIGNALED($line)?"Signaled with signal ".WTERMSIG($line)
           :(WIFSTOPPED($line)?"Stopped with signal ".WSTOPSIG($line)
           :"")))."\n"',
        "taktuk"    => '"[ TAKTUK $level_name ] $host (PID $pid) Line ".
            "$line_number ($package)\n$line\n"'
      }, MESSAGE);
    register_option("p","print-package","s",\&print_package);
    register_option("r","not-root","",\&not_root);
    register_option("s","self-propagate","",\$general::self_propagate,
                    0,MESSAGE);
    register_option("t","timeout","f",\$scheduler::connector_timeout,0,MESSAGE);
    register_option("v","version","",\&print_version);
    register_option("w","window","i",\$scheduler::window, 10, MESSAGE);

    # Settings given by environment variables
    foreach my $full_name (keys(%ENV))
      {
        my $variable;
        if ($full_name =~ m/^TAKTUK_(.+)$/)
          {
            $variable = $1;
          }
        else
          {
            undef($variable);
          }
        if (defined($variable))
          {
            my $prefix;
            my $qualifier;

            # the part after the last underscore is first considered as the
            # field of a hash option (qualifier)
            $variable = lc($variable);
            $variable =~ m/^(.*?)(?:_([^_]+))?$/;
            ($prefix, $qualifier) = ($1, $2);
            $prefix =~ tr/_/-/;
            if (defined($qualifier))
              {
                if (exists($short_name{$prefix}))
                  {
                    my $short = $short_name{$prefix};
                    if ($type{$short} =~ m/^h.$/)
                      {
                        my $reference = ${$handler{$short}};
                        $reference->{$qualifier} = $ENV{$full_name};
                      }
                    else
                      {
                        diagnostic::warning("Hash specification given where ".
                                            "scalar expected in $full_name");
                      }
                  }
                else
                  {
                    # the prefix is not an option: the qualifier is
                    # actually the last word of a scalar option name
                    $prefix .= "-$qualifier";
                    $qualifier = undef;
                  }
              }
            if (not defined($qualifier))
              {
                if (exists($short_name{$prefix}))
                  {
                    my $short = $short_name{$prefix};
                    if ($type{$short} !~ m/^h.$/)
                      {
                        # NOTE(review): the handler is dereferenced as a
                        # scalar reference whereas some options registered
                        # above have a code handler - confirm that such
                        # variables cannot reach this point
                        my $reference = $handler{$short};
                        $$reference = $ENV{$full_name};
                      }
                    else
                      {
                        diagnostic::warning("Scalar given where hash ".
                                       "specification expected in $full_name");
                      }
                  }
                else
                  {
                    diagnostic::warning("Unknown setting $full_name");
                  }
              }
          }
      }
  }
                    
# Parses the options found at the current position of the input and applies
# them using set_option.
# Long options (--name) are translated to their short name, short options
# may be grouped (-abc) and the value of an option is either glued to it or
# given as the next argument. Parsing stops at -- or at the first argument
# that is not an option, which is restored for the next read.
# Ends with a check of the separators integrity.
sub parse_options()
  {
    get_next_argument();

    while (not $options_ended)
      {
        if ($current_argument =~ s/^-//o)
          {
            my @names;

            # a second dash introduces a long option (or ends the options
            # if nothing follows it)
            if ($current_argument =~ s/^-(.*)$//o)
              {
                my $name = $1;
                if ($name)
                  {
                    my $short = $short_name{$name};
                    if (not defined($short))
                      {
                        diagnostic::warning("Unknown long option $name");
                        general::print_help if ($general::root);
                      }
                    else
                      {
                        # handled below as the equivalent short option
                        $current_argument = $short;
                      }
                  }
                else
                  {
                    $options_ended = 1;
                  }
              }
      
            # each remaining character is a short option name
            while ($current_argument)
              {
                $current_argument =~ s/^(.)//o;
                my $name = $1;
      
                if (exists($type{$name}))
                  {
                    my $destination = $handler{$name};
                    if ($type{$name})
                      {
                        # the option takes a value: either the remaining of
                        # the current argument or the next argument
                        get_next_argument if not length $current_argument;
                        if ($arguments_ended)
                          {
                            diagnostic::error("Missing argument")
                          }
                        else
                          {
                            set_option($name, $destination, $current_argument);
                          }
                        $current_argument = "";
                      }
                    else
                      {
                        # option without value: used as a flag set to 1
                        set_option($name, $destination, 1);
                      }
                  }
                else
                  {
                    diagnostic::warning("Unknown short option $name");
                    general::print_help if ($general::root);
                  }
              } 
            get_next_argument;
          }
        else
          {
            # As a special case, the empty option might appear when using
            # custom option separator characters
            if (($current_argument =~ m/^\s*$/) and not $arguments_ended)
              {
                get_next_argument;
              }
            else
              {
                $options_ended=1;
                restore_last_argument if ($current_argument !~ m/^\s*$/);
              }
          }
      }
    check_separators_integrity();
  }

# Issues a warning if the sets of option separators and command separators
# have a character in common or if one of them contains a character among
# dash, slash, digits and ASCII letters.
sub check_separators_integrity()
  {
    diagnostic::warning("Options and command separators intersect")
        if ($option_separator =~ m/[$command_separator]/) or
           ($command_separator =~ m/[$option_separator]/);
    # characters that should not be used as separators
    my $reserved_characters =
        "-/0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    # the message lists 0-9 as the digit 0 is part of the checked characters
    diagnostic::warning("Options or command separators contain one of :".
                        "-/0-9a-zA-Z")
        if ($reserved_characters =~ m/[$command_separator]/) or
           ($reserved_characters =~ m/[$option_separator]/);
  }

# Checks and applies the value given for an option.
# Arguments: the short name of the option, its handler (code reference or
# reference to the variable holding the value) and the value.
# For hash options (type "h" followed by the type of the fields) the value
# has the form key[=value]. Values of type "i" and "f" must be well formed
# numbers. When a check fails a warning is issued and nothing is changed.
# Options that have a transmission mode are also recorded in %current_value
# and flag the cached configuration as outdated.
sub set_option($$$)
  {
    my $name = shift;
    my $destination = shift;
    my $value = shift;
    my $key = undef;
    my $current_type;

    # Hash values with type for value following
    if ($type{$name} =~ m/^h(.)$/)
      {
        $current_type = $1;
        if ($value =~ m/^([^=]+)(?:=(.+))?$/)
          {
            $key = $1;
            $value = $2;
          }
        else
          {
            diagnostic::warning("Option $name needs a well formed key=value ".
                                "pair ($value doesn't match)");
            # stop here: going on would handle the option as a scalar one
            # and replace the whole hash of the option by an empty string
            return;
          }
      }
    else
      {
        $current_type = $type{$name};
      }

    # Check if value is well formed (a hash key given without value has an
    # undefined value which is checked as an empty string)
    my $checked_value = defined($value)?$value:"";
    if (($current_type eq "i") and ($checked_value !~ /^-?[0-9]+$/o))
      {
        diagnostic::warning("Option $name needs a numeric argument ".
                            "($checked_value doesn't match)");
      }
    elsif (($current_type eq "f") and ($checked_value !~
                                /^(?:[0-9]+(?:\.[0-9]*)?|\.[0-9]+)$/o))
      {
        diagnostic::warning("Option $name needs a floating point argument ".
                            "($checked_value doesn't match)");
      }
    else
      {
        if (defined($key))
          {
            if (exists($transmission_mode{$name}))
              {
                $current_value{$name} = {} if not exists $current_value{$name};
                $current_value{$name}->{$key} = $value;
              }
            if (UNIVERSAL::isa($destination,'CODE'))
              {
                diagnostic::warning("NOT EXPECTED: Found option $name with ".
                                    "code handler and hash value (key=$key)");
                &$destination($key, $value);
              }
            else
              {
                diagnostic::debug("Found option $name with scalar ".
                                  "handler and hash value (key=$key)");
                $$destination->{$key} = $value;
              }
          }
        else
          {
            $current_value{$name}=$value if exists($transmission_mode{$name});
            if (UNIVERSAL::isa($destination,'CODE'))
              {
                diagnostic::debug("Found option $name with code handler and ".
                                  "scalar value ". $value);
                &$destination($value);
              }
            else
              {
                diagnostic::debug("Found option $name with scalar ".
                                  "handler and scalar value ". $value);
                $$destination = $value;
              }
          }
        $current_config_outdated = 1 if exists($transmission_mode{$name});
      }
  }

###############################################
### DIAGNOSTIC                              ###
###############################################

package diagnostic;
use strict; use bytes;
use POSIX;

# Shared state of the diagnostic package.
# $package_level is used by print_info as a hash reference mapping a package
# name (or the key 'default') to the minimal level a message must reach to be
# printed.
our $package_level;
# Nesting depth of print_info calls (recursion guard, see print_info)
our $depth = 0;
# Description of the last diagnostic handled by print_info: its level, the
# name of this level and the location returned by caller(1). These values are
# read back by command::output.
our $level="";
our $level_name="";
our $package="";
our $filename="";
our $line_number="";

# Records a diagnostic (level, level name, origin) in the package variables
# and outputs its text through the root connector when the level is high
# enough for the package it originates from.
# Arguments : numeric level, level name, message text.
sub print_info($$$)
  {
    my $message;

    ($level, $level_name, $message) = @_;
    # caller(1) : the code that called debug/warning/error/system
    ($package, $filename, $line_number) = caller(1);

    # Per package threshold when there is one, default threshold otherwise
    my $threshold = defined($package_level->{$package})?
                    $package_level->{$package}:
                    $package_level->{default};
    my $level_ok = ($threshold <= $level);

    # The depth counter prevents infinite recursion when the code executed
    # below emits diagnostics itself (e.g. when debugging send_message)
    $depth++;
    communicator::get_root->output('taktuk', $message)
        if $level_ok and ($depth < 3);
    $depth--;
  }

# Reports the current system error ($!) as a level 3 diagnostic.
sub system ()
  {
    my $message = "Error $!";
    print_info(3, "ERROR : SYSTEM", $message);
  }

# Emits the given text as a level 1 (DEBUG) diagnostic.
sub debug ($)
  {
    my ($message) = @_;
    print_info(1, "DEBUG", $message);
  }

# Emits the given text as a level 3 (ERROR) diagnostic.
sub error ($)
  {
    my ($message) = @_;
    print_info(3, "ERROR", $message);
  }

# Emits the given text as a level 2 (WARNING) diagnostic.
sub warning ($)
  {
    my ($message) = @_;
    print_info(2, "WARNING", $message);
  }

###############################################
### TAKTUK                                  ###
###############################################

package taktuk;
use strict; use bytes;

# Data read but not yet consumed, indexed by descriptor. Filled by read_data,
# consumed by get_message, find_sequence and flush_buffer.
our %buffer;

# Puts the given handle in binary mode and turns on autoflush ($| = 1) for
# it. The currently selected output handle is restored before returning.
sub no_flush($)
  {
    my ($handle) = @_;

    binmode($handle);
    # $| applies to the currently selected handle : select ours temporarily
    my $previous = select($handle);
    $| = 1;
    select($previous);
  }

# Extracts one length-prefixed field from the beginning of a buffer.
# The field is made of a 4 bytes big-endian size ("N") followed by that many
# bytes of data.
# Returns (field, remaining buffer) when the field is complete and
# (undef, unchanged buffer) otherwise.
sub unpack($)
  {
    my ($data) = @_;

    # Size header not complete yet
    return (undef, $data) if length($data) < 4;

    my ($size) = CORE::unpack("N", $data);
    # Payload not complete yet
    return (undef, $data) if length($data) < $size+4;

    return (substr($data, 4, $size), substr($data, $size+4));
  }

# Prefixes a message with its length encoded as a 4 bytes big-endian integer
# ("N"), the format expected by unpack.
sub pack($)
  {
    my ($payload) = @_;
    return CORE::pack("N", length($payload)) . $payload;
  }

# Splits a message into its one character code and its body.
# Returns the list (code, body).
sub decode($)
  {
    my ($encoded) = @_;
    return (substr($encoded, 0, 1), substr($encoded, 1));
  }

# Builds a message by concatenating a message code and a body (the reverse
# of decode).
sub encode($$)
  {
    my ($code, $content) = @_;
    return $code . $content;
  }

# Writes a whole message to a descriptor, by chunks of at most $write_size
# bytes, until everything has been written.
# Arguments : the descriptor and the data to write.
# Returns 1 on success and undef as soon as a write fails for a reason other
# than EAGAIN.
sub syswrite ($$)
  {
    my ($handle, $data) = @_;
    my $remaining = length($data);
    my $chunk = $write_size;
    my $written = 0;

    while ($remaining)
      {
        $chunk = $remaining if $chunk > $remaining;
        my $count = CORE::syswrite($handle, $data, $chunk, $written);
        if ($count)
          {
            $remaining -= $count;
            $written += $count;
            next;
          }
        # Anything but EAGAIN is considered unrecoverable. We should probably
        # end up even more badly by killing any child connector (to avoid
        # partially deployed crashed instances)
        return undef unless $!{EAGAIN};

        # The resource is temporarily unavailable. This happens on a heavily
        # loaded system in which too many writes are pending. Two options :
        # 1) sleep for some time, losing potential write opportunities
        # 2) try again right now, potentially overloading the system
        # In any case the message server is blocked meanwhile and it would be
        # better to find another solution
        print STDERR "Delayed write ...";
        sleep 1;
      }
    return 1;
  }

# Reads at most $read_size bytes from a descriptor into its buffer.
# When something has been read and a buffer already exists for the
# descriptor, the new data is appended to it; in any other case the buffer is
# replaced by what has just been read.
# Returns the result of sysread (undef on error, 0 at end of file).
sub read_data ($)
  {
    my ($descriptor) = @_;
    my $chunk;

    my $count = sysread($descriptor, $chunk, $read_size);
    return undef unless defined($count);

    $buffer{$descriptor} = ($count and exists($buffer{$descriptor}))?
                           $buffer{$descriptor}.$chunk:
                           $chunk;
    return $count;
  }

# Extracts the next complete length-prefixed message from the buffer of a
# descriptor.
# Returns the message, or the empty string when the descriptor has no buffer
# or when the buffered message is not complete yet.
sub get_message ($)
  {
    my ($descriptor) = @_;

    return "" unless exists($buffer{$descriptor});

    my ($message, $remainder) = taktuk::unpack($buffer{$descriptor});
    if (defined($remainder))
      {
        $buffer{$descriptor} = $remainder;
      }
    else
      {
        delete($buffer{$descriptor});
      }
    return defined($message)?$message:"";
  }

# Searches the complete lines buffered for a descriptor for a pattern.
# Lines are removed from the buffer one by one (newline included) until one
# of them matches; a trailing incomplete line is left in the buffer.
# Arguments : the descriptor and the pattern (used as a regular expression).
# Returns the matched text, or the empty string if no line matches.
sub find_sequence($$)
  {
    my ($descriptor, $sequence) = @_;

    return "" unless exists($buffer{$descriptor});

    while ((my $eol = index($buffer{$descriptor},"\n")) >= 0)
      {
        my $candidate = substr($buffer{$descriptor}, 0, $eol);

        $buffer{$descriptor} = substr($buffer{$descriptor}, $eol+1);
        return $1 if $candidate =~ m/($sequence)/;
      }
    return "";
  }

# Removes the buffer of a descriptor and returns its content (the empty
# string when the descriptor has no buffer).
sub flush_buffer($)
  {
    my ($descriptor) = @_;

    # delete returns the value that was stored in the removed entry
    return exists($buffer{$descriptor})?delete($buffer{$descriptor}):"";
  }

# Handle on the control channel used by send and recv
our $control_channel;

# When TAKTUK_CONTROL is set in the environment, it holds the number of an
# already opened file descriptor (see communicator::init, which stores a
# fileno in it) : a read/write handle is attached to this descriptor
# ("+<&=" does not duplicate it).
# NOTE(review): when open fails, the message goes to standard output and
# no_flush is still called on the handle.
if ($ENV{TAKTUK_CONTROL})
  {
    open($control_channel, "+<&=", $ENV{TAKTUK_CONTROL})
        or print("Error opening taktuk control channel : $!\n");
    no_flush($control_channel);
  }

# Error codes stored in $error by send and recv (texts given by error_msg)
# taktuk::syswrite failed
use constant ESWRIT=>0;
# the TakTuk engine closed the communication channel
use constant EFCLSD=>1;
# sysread error
use constant ESREAD=>2;
# field 'to' not defined
use constant EARGTO=>3;
# field 'body' not defined
use constant EARGBD=>4;
# timeout expired
use constant ETMOUT=>5;
# invalid destination set specification
use constant EINVST=>6;

# Returns the text describing an error code (one of the E* constants).
# The texts of ESWRIT and ESREAD include the current system message ($!).
# Any other value gives "Unknown error".
sub error_msg($)
  {
    my ($code) = @_;

    # Texts are built lazily so that $! is read only for the matching code
    my @texts =
      (
        [ESWRIT, sub { "taktuk::syswrite failed, system message : $!" }],
        [EFCLSD, sub { "TakTuk engine closed the communication channel" }],
        [ESREAD, sub { "sysread error, system message : $!" }],
        [EARGTO, sub { "field 'to' not defined" }],
        [EARGBD, sub { "field 'body' not defined" }],
        [ETMOUT, sub { "timeouted" }],
        [EINVST, sub { "invalid destination set specification" }],
      );

    foreach my $entry (@texts)
      {
        my ($known, $text) = @$entry;
        return $text->() if $code == $known;
      }
    return "Unknown error";
  }

# Sends a message to a set of destinations through the control channel.
# Arguments (given as a hash) :
#   to   => destination set specification (see decode_set)
#   body => content of the message
# Returns a true value on success. On failure, returns undef and stores the
# reason in $error : EARGTO, EARGBD, EINVST or ESWRIT (see error_msg).
sub send(%)
  {
    my %argument = @_;
    my $from = $ENV{TAKTUK_RANK};

    # The error code has to be assigned in its own statement : written as
    # "$error=CODE && return undef", the return is evaluated before the
    # assignment (&& binds tighter than =) and $error is never updated
    if (not exists($argument{to}))
      {
        $error = EARGTO;
        return undef;
      }
    my $to = $argument{to};
    if (not exists($argument{body}))
      {
        $error = EARGBD;
        return undef;
      }
    my $body = $argument{body};

    my @send_set = taktuk::decode_set($to);
    if (scalar(@send_set))
      {
        # Normalized form of the destination set
        $to = taktuk::encode_set(@send_set);
        # A send_to message for the set, carrying the actual message which
        # itself contains the destination, the sender and the body
        my $full_message = taktuk::encode($taktuk::send_to,
                                      taktuk::pack($to).
                                      taktuk::encode($taktuk::message,
                                      taktuk::pack($to).
                                      taktuk::pack($from).
                                      $body));
        my $result = taktuk::syswrite($control_channel,
                                      taktuk::pack($full_message));
        $error=ESWRIT if not $result;
        return $result?$result:undef;
      }
    else
      {
        # decode_set returns an empty list for an invalid specification
        $error=EINVST;
        return undef;
      }
  }

# Waits for a message on the control channel.
# Arguments (given as a hash) : timeout => optional value sent along with the
# wait request.
# Returns the list (to, from, message) on success. On failure, returns the
# empty list and stores the reason in $error : ESWRIT, ETMOUT, EFCLSD or
# ESREAD (see error_msg).
sub recv(%)
  {
    my %argument = @_;

    # Notification of the recv to the server
    # Necessary in all cases as a timer should not be created if a message is
    # already there (we have to count waiters internally)
    my $request = exists($argument{timeout})?
                  taktuk::encode($taktuk::wait_message, $argument{timeout}):
                  $taktuk::wait_message;
    my $status = taktuk::syswrite($control_channel,taktuk::pack($request));
    unless ($status)
      {
        $error=ESWRIT;
        return ();
      }

    # Now we actually get the message : read until a complete message is
    # buffered or until a read fails or reaches the end of file
    my $incoming = get_message($control_channel);
    until ($incoming or not $status)
      {
        $status = read_data($control_channel);
        $incoming = get_message($control_channel);
      }

    unless ($status)
      {
        # read_data gives 0 at end of file and undef on error
        $error = defined($status)?EFCLSD:ESREAD;
        return ();
      }

    my ($message_code, $content) = taktuk::decode($incoming);
    if ($message_code eq $taktuk::timeout)
      {
        $error=ETMOUT;
        return ();
      }

    my ($to, $from);
    ($to, $content) = taktuk::unpack($content);
    ($from, $content) = taktuk::unpack($content);
    return ($to, $from, $content);
  }

# Parses a set specification into a flat list of interval bounds
# (min1, max1, min2, max2, ...).
# The specification is a '/' separated list of intervals, each of them being
# either a single number or "min-max". Each interval is merged into the
# intervals already parsed, and intervals that end up adjacent are joined.
# Returns the empty list as soon as an interval is malformed (non numeric
# bound or min > max).
# NOTE(review): parsing stops at the first interval that evaluates to false
# (empty string or "0"), and a false max (e.g. "5-0") is replaced by min.
sub decode_set($)
  {
    my $string = shift;
    my @result = ();
    my $interval;
    my $i;

    # Takes the first interval, keeps the rest of the specification in $string
    ($interval,$string) = split /\//,$string,2;
    while ($interval)
      {
        my ($min, $max) = split /-/,$interval,2;
        # Bounds that will replace the part of @result covered by the interval
        my @replacement = ();
        # Toggled each time a bound is crossed while scanning @result
        # backward : 0 when outside of the existing intervals, 1 when inside
        my $state = 0;
        # Number of existing bounds absorbed by the new interval
        my $length = 0;

        $i = $#result;
        # Single number : the interval is reduced to one value
        $max = $min if not $max;
        return () if ($min !~ /^[0-9]+$/o) or ($max !~ /^[0-9]+$/o) or
                     ($min > $max);
        # Skips the bounds greater than max
        while (($i > -1) and ($max < $result[$i]))
          {
            $i--;
            $state = 1 - $state;
          }
        # max becomes a bound only if it does not fall into an existing
        # interval
        unshift(@replacement, $max) if not $state;
        # Counts the bounds covered by [min, max], they will be removed
        while (($i > -1) and ($min <= $result[$i]))
          {
            $i--;
            $state = 1 - $state;
            $length++;
          }
        # Same thing for min
        unshift(@replacement, $min) if not $state;
        $i++;
        #print "splice @result, $i, $length, @replacement;\n";
        splice @result, $i, $length, @replacement;

        # Goes on with the next interval, if any
        if (defined($string))
          {
            ($interval,$string) = split /\//,$string,2;
          }
        else
          {
            $interval = 0;
          }
      }
    # Joins the intervals that touch each other (max+1 == next min) by
    # removing the two bounds that separate them
    for ($i=1; $i<$#result; $i+=2)
      {
        splice @result, $i, 2
            while (($i<$#result) and ($result[$i]+1 == $result[$i+1]));
      }
    return @result;
  }

# Builds the textual specification of a set from a flat list of interval
# bounds (min1, max1, min2, max2, ...), as returned by decode_set.
# Each interval is written "min" when both bounds are equal and "min-max"
# otherwise; intervals are separated by '/'.
# Returns the empty string for an empty list.
sub encode_set(@)
  {
    my @set = @_;
    my @intervals = ();

    while (scalar(@set))
      {
        my ($min, $max) = splice(@set, 0, 2);

        push(@intervals, ($min == $max)?"$min":"$min-$max");
      }
    # join puts a separator between all the intervals, including after a
    # leading "0" (testing the truth of the text built so far would not)
    return join("/", @intervals);
  }

###############################################
### COMMAND                                 ###
###############################################

package command;
use strict; use bytes;
use Socket;
use Fcntl;
use POSIX;

# Creates a command from a list of key => value pairs.
# A command is described either by a command line ('line') or by already
# opened descriptors ('read', 'write', 'error', 'type') : an error diagnostic
# is emitted when both are given.
# Returns the new object, blessed into this package.
sub new (%)
  {
    my $self = { @_ };
    my $has_fds = grep { exists($self->{$_}) } qw(read write error type);

    if (exists($self->{line}) and $has_fds)
      {
        diagnostic::error("Command built from both a command line and fds");
      }
    return bless($self, __PACKAGE__);
  }

# Executes the command line of this command ($self->{line}) in a new process.
# The child process gets one end of a socketpair as standard input and output
# and the writing end of a pipe as standard error.
# In the father, returns the command itself with 'pid', 'read', 'write',
# 'error' and 'type' (settings::SOCKET) filled, or 0 if the socketpair, the
# pipe or the fork failed. The child never returns : it either executes the
# command or dies.
sub run ()
  {
    my $self=shift;
    my ($father_read,$father_write,$father_error);
    my ($child_read,$child_write,$child_error);

    # Creates communication channel
    undef($father_read);
    undef($child_read);
    if (not socketpair($father_read, $child_read, AF_UNIX, SOCK_STREAM,
                       PF_UNSPEC))
      {
        diagnostic::system;
        return 0;
      }
    # The socket is bidirectional : the same handle is used for both reads
    # and writes
    $father_write = $father_read;
    $child_write = $child_read;
    undef($father_error);
    undef($child_error);
    if (not pipe($father_error,$child_error))
      {
        diagnostic::system;
        return 0;
      }

    # Fork a new process to execute the connector command
    my $pid;
    $pid = fork;
    if (not defined($pid))
      {
         diagnostic::system;
         return 0;
      }
    $self->{pid} = $pid;
    if ($pid)
      {
        # father
        diagnostic::debug("New child with pid $pid created".
                     " to execute $self->{line}");
        # The child's ends are of no use in the father
        CORE::close($child_read) or diagnostic::system;
        CORE::close($child_error) or diagnostic::system;
        $self->{read} = $father_read;
        $self->{write} = $father_write;
        $self->{error} = $father_error;
        $self->{type} = settings::SOCKET;
        return $self;
      }
    else
      {
        # child
        # This part has to be rewritten : when errors occur
        # the connector seems to remain in some blocked state...
        CORE::close($father_read) or die $!;
        CORE::close($father_error) or die $!;
        # Standard descriptors are replaced by duplicates of the child's ends
        open(STDIN,"<&",$child_read) or die $!;
        open(STDOUT,">&",$child_write) or die $!;
        open(STDERR,">&",$child_error) or die $!;
        CORE::close($child_read) or die $!;
        CORE::close($child_error) or die $!;
        # Clears the descriptor flags (among which close-on-exec) of the
        # control channel, presumably so that the executed command inherits
        # it -- TODO confirm
        fcntl(communicator::get_control()->{read}, F_SETFD, 0);
        exec($self->{line}) or die "Exec failed : $!";
      }
  }

# Releases the resources of a command : closes its remaining channels and,
# for a command (not a connector) built from a command line, waits for the
# termination of its process (unless a status is already recorded) and
# outputs the status.
sub cleanup ()
  {
    my ($self) = @_;

    foreach my $channel (qw(read write error))
      {
        next unless defined $self->{$channel};
        $self->close($channel);
      }
    return unless exists($self->{pid});

    diagnostic::debug("Cleaning up $self by waiting pid $self->{pid}");
    return if $self->isa('connector') or not exists($self->{line});

    unless (exists($self->{status}))
      {
        waitpid($self->{pid},0);
        $self->{status} = $?;
      }
    $self->output('status', $self->{status});
  }

# Shuts down one direction of a socket (same arguments as the shutdown
# builtin). A failure is reported as a system error, except ENOTCONN.
sub my_shutdown($$)
  {
    my ($socket, $direction) = @_;

    unless (shutdown($socket,$direction))
      {
        # Socket was already disconnected ? could be worse
        diagnostic::system unless $!{ENOTCONN};
      }
  }

# Closes one channel of a command.
# Argument : the part to close, 'read', 'write' or 'error'.
# The error channel is always a plain descriptor. The way 'read' and 'write'
# are closed depends on the type of the command : plain descriptors
# (settings::FD) are closed, sockets (settings::SOCKET) are shut down in the
# relevant direction. In all cases the buffer possibly attached to the
# descriptor is dropped and the part is removed from the command.
sub close($)
  {
    my $self=shift;
    my $part=shift;

    if ($part eq 'error')
      {
        if (exists($self->{error}))
          {
            # Drops the data buffered for this descriptor
            taktuk::flush_buffer($self->{error});
            CORE::close($self->{error}) or diagnostic::system;
            delete $self->{error};
          }
        else
          {
            diagnostic::debug("Cannot close error : does not exists");
          }
      }
    else
      {
        if (exists($self->{type}) and exists($self->{$part}))
          {
            if ($self->{type} == settings::FD)
              {
                taktuk::flush_buffer($self->{write}) if $part eq 'write';
                # here we close a simple fd (e.g. read or write from stdin/out)
                CORE::close($self->{$part}) or diagnostic::system;
                delete $self->{$part};
              }
            elsif ($self->{type} == settings::SOCKET)
              {
                if ($part eq 'read')
                  {
                    # finished writing to read channel of child command
                    # (second argument of shutdown : 1 stops writes)
                    my_shutdown($self->{read},1);
                    delete $self->{read};
                  }
                elsif ($part eq 'write')
                  {
                    taktuk::flush_buffer($self->{write});
                    # finished reading from write channel of child command
                    # (second argument of shutdown : 0 stops reads)
                    my_shutdown($self->{write},0);
                    delete $self->{write};
                  }
                else
                  {
                    diagnostic::warning("Invalid part for socket close");
                  }
              }
            else
              {
                diagnostic::warning("Unknown command type : BUG");
              }
          }
        else
          {
            # We do not want a warning if things come from several
            # successive eof
            diagnostic::warning("Closing a non existant command type or part")
                unless (exists($self->{type}) and ($part eq 'read'));
          }  
      }
  }

# NOTE(review): not referenced in the visible code. Presumably a scalar left
# at the disposal of the templates evaluated in output -- TODO confirm
our $user_scalar=undef;

# Formats a message according to the template of its type and forwards the
# result upward as an output message.
# Arguments : the type of output (key in $general::template and
# $general::redirect, 'default' being used for unknown types) and the
# message.
# The template is Perl code evaluated (string eval) once for each line of the
# message; the results are concatenated. Nothing is sent when no template is
# defined or when the result is empty.
# WARNING : as the template is evaluated in this scope, it has access to the
# lexical variables declared below; they are presumably referenced by name in
# the templates and should not be renamed or removed even if they look
# unused.
sub output ($$)
  {
    my $self = shift;
    my $type = shift;
    my $message = shift;
    my $template;
    my $fd;
    my $rank = $general::rank;

    if (exists($general::template->{$type}))
      {
        $template = $general::template->{$type};
      }
    else
      {
        $template = $general::template->{default};
      }
    if (exists($general::redirect->{$type}))
      {
        $fd = $general::redirect->{$type};
      }
    else
      {
        $fd = $general::redirect->{default};
      }
    if (defined($template))
      {
        # Values made available to the template
        my $host = $general::host;
        my $count = $general::count;
        my $peer = exists($self->{peer})?$self->{peer}:"";
        my $command = exists($self->{line})?$self->{line}:"";
        # pid of the command if any, pid of the current process otherwise
        my $pid = exists($self->{pid})?$self->{pid}:$$;
        # Description of the last diagnostic (set by diagnostic::print_info)
        my $level = $diagnostic::level;
        my $level_name = $diagnostic::level_name;
        my $package = $diagnostic::package;
        my $filename = $diagnostic::filename;
        my $line_number = $diagnostic::line_number;

        my $line;
        my $eol;
        my $to_be_sent = "";

        # Iterates over the lines of the message : $line is the content of
        # the line and $eol its terminator ("\n", or "" for a last line
        # without newline)
        pos($message) = 0;
        while ($message =~ /\G([^\n]*)(\n|$)/go)
          {
            $line = $1;
            $eol = $2;
            # Skips the empty match found at the end of the message
            if (length($line) or length($eol))
              {
                my $result = eval("$template");
                $to_be_sent .= $result if defined($result) and length($result);
              }
          }
        if (length($to_be_sent))
          {
            # The output message carries the target descriptor followed by
            # the formatted text and is wrapped into a forward_up message
            communicator::process_message($self,
                taktuk::encode($taktuk::forward_up,
                    taktuk::encode($taktuk::output,
                        taktuk::pack($fd).$to_be_sent)));
          }
      }
  }

###############################################
### CONNECTOR                               ###
###############################################

package connector;
use strict; use bytes;

# A connector is a command
our @ISA=qw(command);
# Strings searched for in the output of a connector being initialized (see
# read_data). $ready_to_load_string is not referenced in the visible code.
our $init_string="Taktuk initialization, version ";
our $connector_functional_string="Taktuk connector functional";
our $ready_to_load_string="Taktuk ready to load";
# States of a connector.
# It is important to keep the temporal total order on state
# as it will be used later (read_data compares states with < and promotes a
# connector by adding 1 to its state)
our $pre_load=1;
our $first_load=0;
our $simple_connector=2;
our $initialized=3;

# Creates a connector from a list of key => value pairs (see command::new).
# The handlers used by the communicator (data_handler and remove_handler) are
# added to the given pairs.
# A connector with a 'peer' still has to be initialized : its state is
# $pre_load when TakTuk propagates itself and $simple_connector otherwise,
# and its command line is built from 'command', 'peer' and 'taktuk'.
# A connector without peer is directly in the $initialized state.
# Returns the new connector, or 0 if the command could not be created.
sub new (%)
  {
    my $self=command::new(@_,
             data_handler=>\&communicator::process_messages,
             remove_handler=>\&communicator::remove_connector);
    return 0 unless $self;

    $self->{last_given} = 0;
    $self->{peers_count} = undef;
    if (not exists($self->{peer}))
      {
        $self->{state} = $initialized;
      }
    else
      {
        $self->{state} = $general::self_propagate?
                         $pre_load:
                         $simple_connector;
        $self->{line} = "$self->{command} $self->{peer} $self->{taktuk}";
      }
    # command::new blessed the object into command : make it a connector
    return bless($self, __PACKAGE__);
  }

# Reads the data available on one descriptor of the connector and handles it
# according to the descriptor and to the state of the connector.
# Argument : the descriptor to read, which should be either the 'write' or
# the 'error' part of the connector.
# - data coming from 'write' while the connector is not initialized is
#   searched for the string expected in the current state; when found, the
#   connector is promoted to the next state
# - data coming from 'error' is output with the type 'connector'
# Returns the result of taktuk::read_data (undef on error, 0 at end of file).
sub read_data ($)
  {
    my $self = shift;
    my $descriptor = shift;

    my $result = taktuk::read_data($descriptor);
    diagnostic::system if not defined($result);

    my $write = undef;
    my $error = undef;
    $write = $self->{write} if exists $self->{write};
    $error = $self->{error} if exists $self->{error};

    if ($write and ($descriptor == $write))
      {
        if ($result >0)
          {
            if ($self->{state} < $initialized)
              {
                # Becomes positive when the expected string has been found
                my $argument = 0;

                if ($self->{state} == $pre_load)
                  {
                    # Waiting for the connector to be functional
                    if (length(taktuk::find_sequence($descriptor,
                                   ".*$connector_functional_string")))
                      {
                        $argument = 1;
                      }
                  }
                elsif ($self->{state} == $simple_connector)
                  {
                    # Waiting for the initialization string of the remote
                    # TakTuk, which ends with its version number
                    my $sequence = taktuk::find_sequence($descriptor,
                                       ".*$init_string"."[0-9]+[.0-9]*");
                    if (length($sequence))
                      {
                        # Keeps only what follows the last space : the version
                        $argument = $sequence;
                        $argument =~ s/.* (.*)$/$1/o;
                      }
                  }
                else
                  {
                    diagnostic::warning("Wrong connector state : ".
                                        $self->{state});
                  }
                    
                if ($argument > 0)
                  {
                    my $connector_input = $self->{read};

                    if ($self->{state} < $simple_connector)
                      {
                        # The connector is functional : sends the TakTuk
                        # code through it
                        taktuk::syswrite($connector_input,
                            $general::taktuk_code) or diagnostic::system;
                      }
                    else
                      {
                        # Here $argument is the version announced by the
                        # remote TakTuk
                        diagnostic::warning("Protocol versions do not match")
                            if $argument != $taktuk::VERSION;
                      }

                    $self->{state} += 1;
                    diagnostic::debug("Connector $self->{line} promoted to ".
                                      "state ".$self->{state});
                    synchronizer::initialization_complete($self)
                        if ($self->{state} == $initialized);
                  }
              }
          }
      }
    elsif ($error and ($descriptor == $error))
      {
        if ($result >0)
          {
            # Everything read on the error channel is output at once
            my $new_data = taktuk::flush_buffer($descriptor);
            $self->output('connector', $new_data);
          } 
      }
    else
      {
        diagnostic::warning("Unknown descriptor");
      }
    return $result;
  }

# Returns the next complete message received from the connector.
# Gives the empty string when no complete message is available, when the
# connector is not initialized yet or when it has no 'write' part anymore.
sub get_message ()
  {
    my ($self) = @_;

    return "" if $self->{state} < $initialized;
    return "" unless exists($self->{write});
    return taktuk::get_message($self->{write});
  }

# Sends a message through the connector : the message is length-prefixed
# (taktuk::pack) and written to the 'read' part of the connector.
# Argument : the message to send.
# A failed write is reported as an error diagnostic that includes a dump of
# the connector.
sub send_message($)
  {
    my $self = shift;
    my $write_fd = $self->{read};
    my $full_message = taktuk::pack(shift);

    if (not taktuk::syswrite($write_fd, $full_message))
      {
        # Data::Dumper is loaded on demand and called by its full name so
        # that the error path does not depend on Dumper being imported into
        # this package
        require Data::Dumper;
        diagnostic::error("Error in send message, connector dumps into ".
                          Data::Dumper::Dumper($self));
      }
  }

# Serializes the description of the connector : the fields 'command', 'peer',
# 'taktuk' and 'arguments', each of them length-prefixed, in this order.
sub pack()
  {
    my ($self) = @_;

    return join('', map { taktuk::pack($self->{$_}) }
                        qw(command peer taktuk arguments));
  }

# Builds a connector from a description serialized by pack.
# Argument : a buffer beginning with the four length-prefixed fields
# 'command', 'peer', 'taktuk' and 'arguments'.
# Returns the list (new connector, remaining buffer).
sub unpack($)
  {
    my ($data) = @_;
    my @fields = qw(command peer taktuk arguments);
    my %value;

    foreach my $field (@fields)
      {
        ($value{$field}, $data) = taktuk::unpack($data);
      }

    my $connector = connector::new(map { $_ => $value{$_} } @fields);
    return ($connector, $data);
  }

###############################################
### COMMUNICATOR                            ###
###############################################

package communicator;
use strict; use bytes;
use Socket;
use POSIX;
use Errno;

# Number of connectors added to the 'sinks' set (see add_connector) and
# number of connectors reported as initialized (see connector_initialized)
our $sinks_number = 0;
our $initialized_sinks_number = 0;

# Select object created by init and timeout given to select by run
our $select;
our $select_timeout;

# Reference to the hash of connection sets (see init)
our $connections;
# External side of the control channel, 0 until init has been called
our $control_connector = 0;
# Set by terminate to request the end of the main loop
our $end = 0;
# Associates each selected descriptor to the command or connector owning it
our %c_from_fd;

# default connector to which it is only possible to write
# (used by get_root as a last resort, useful for diagnostic)
our $default_root = connector::new('read' =>\*STDOUT,
                                   'type' =>settings::FD);

# Initializes the communicator : creates the select object, the (empty) sets
# of connections and the control channel.
# The control channel is a socketpair. One side is listened to by the
# communicator (connector added to the 'control' set); the number of the
# descriptor of the other side is exported in the TAKTUK_CONTROL environment
# variable and kept in $control_connector.
sub init()
  {
    $select = my_select::new() or diagnostic::error("Select creation");
    $connections = {};
    $connections->{sources} = [];
    $connections->{sinks} = [];
    $connections->{control} = [];
    $connections->{local_commands} = [];

    # Now I create the control channel
    my $communicator_side = undef;
    my $control_side = undef;
    # Be sure to store the control side in a persistent place, otherwise
    # Perl frees it and closes the socketpair !!!
    if (not socketpair($communicator_side, $control_side,
                       AF_UNIX, SOCK_STREAM, PF_UNSPEC))
      {
        diagnostic::system;
        # Annoying, without control channel half of the functionalities
        # are missing...
        # NOTE(review): the initialization nevertheless goes on with the
        # handles of the failed socketpair
        diagnostic::error("Degradated mode (no control channel)");
      }
    # Probably redundant with what's done in add_connector, but doesn't harm
    taktuk::no_flush($communicator_side);
    taktuk::no_flush($control_side);
    # The socket is bidirectional : same handle for 'read' and 'write'
    my $communicator_connector = connector::new('read'  =>$communicator_side,
                                                'write' =>$communicator_side,
                                                'type'  =>settings::SOCKET);
    $communicator_connector->{pending_messages} = 0;
    add_connector($communicator_connector, 'control');
    $ENV{TAKTUK_CONTROL} = fileno($control_side);
    # A connector for external use only (the communicator listens to the
    # other side); it also keeps the control side alive
    $control_connector = connector::new('read'  =>$control_side,
                                        'write' =>$control_side,
                                        'type'  =>settings::SOCKET);
  }

# Returns the list of the connections that belong to the given set
# (e.g. 'sources', 'sinks', 'control' or 'local_commands').
sub get_connections($)
  {
    my ($set_name) = @_;
    my $members = $connections->{$set_name};

    return @$members;
  }

# Returns the connector toward the root : the first connector of the
# 'sources' set, or the default root (write only) when there is none.
sub get_root()
  {
    my $sources = $connections->{sources};

    return $sources->[0] if defined($sources) and scalar(@$sources);
    return $default_root;
  }

# Returns the connector attached to the external side of the control channel
# (created by init; the value is 0 before init has been called).
sub get_control()
  {
    return $control_connector;
  }

# Returns the first connector of the 'control' set, or 0 when this set is
# empty.
sub get_outgoing()
  {
    my $control = $connections->{control};

    return scalar(@$control)?$control->[0]:0;
  }

# Dispatches one message to the handler registered for its code.
# Arguments : the connector the message comes from and the message itself.
# The handler is called with the message code, the connector and the body of
# the message. A warning is emitted when no handler is known for the code.
sub process_message($$)
  {
    my ($connector, $raw) = @_;
    my ($code, $body) = taktuk::decode($raw);
    my $handler = handlers::get_handler($code);

    if (not defined($handler))
      {
        diagnostic::warning("Unknown message : $code (body: $body)");
      }
    else
      {
        $handler->($code, $connector, $body);
      }
  }

# Data handler of connectors : reads the data available on the descriptor,
# then processes all the complete messages the connector can deliver.
# Arguments : the connector and the descriptor ready for reading.
# Returns the result of the read (false at end of file or on error).
sub process_messages($$)
{
  my ($connector, $descriptor) = @_;

  my $read_result = $connector->read_data($descriptor);

  while (my $message = $connector->get_message)
    {
      process_message($connector, $message);
    }
  return $read_result;
}

# Reads the data available on a descriptor of a command and outputs it.
# Arguments : the command and the descriptor ready for reading.
# The data is output with the type "output" when the descriptor is the
# 'write' part of the command and with the type "error" otherwise.
# Returns the result of sysread (undef on error, 0 at end of file).
sub process_command_output($$)
  {
    my ($command, $descriptor) = @_;
    my $data;

    my $count = sysread($descriptor, $data, $taktuk::read_size);
    diagnostic::system if not defined($count);
    if ($count > 0)
      {
        my $from_write = (exists($command->{write}) and
                          ($descriptor == $command->{write}));

        $command->output($from_write?"output":"error", $data);
      }
    return $count;
  }

# Main loop of the communicator.
# Loops until termination has been requested (see terminate) and both the
# 'sinks' and 'local_commands' sets are empty. Each iteration checks the
# timers, then waits in select and calls the data handler of the owner of
# each descriptor ready for reading. A descriptor whose handler returns a
# false value (end of file or error) is removed.
sub run ()
  {
    my @select_result;
    my $sinks = $connections->{sinks};
    my $local_commands = $connections->{local_commands};

    while ((not $end) or scalar(@$sinks) or scalar(@$local_commands))
      {
        timer::check_timeouts;
        # The registered descriptors are watched for reads and exceptions
        @select_result = 
            my_select::select($select,undef,$select,$select_timeout);
        if (scalar(@select_result))
          {
            my ($read_set, $write_set, $exception_set) = @select_result;
            while (scalar(@$read_set))
              {
                my $descriptor = shift @$read_set;
                # Command or connector owning the descriptor
                my $cobidule = $c_from_fd{$descriptor};

                if (exists($cobidule->{data_handler}))
                  {
                    my $handler = $cobidule->{data_handler};
                    my $result = &$handler($cobidule, $descriptor);
                    remove_descriptor($cobidule, $descriptor) if (not $result);
                  }
                else
                  {
                    diagnostic::error("Bug : connector has no data handler");
                  }
              }
            if (scalar(@$write_set))
              {
                diagnostic::debug("Unexpected ready to write fds : ".
                    join(' ', @$write_set));
              }
            if (scalar(@$exception_set))
              {
                diagnostic::debug("Unexpected exceptional fds : ".
                    join(' ', @$exception_set));
              }
          }
        else
          {
            # Here we might have been thrown out of select by a timeout
            # but we have to test for error conditions (just in case)
            # NOTE(review): $! is tested without knowing whether select
            # actually failed, it might hold the error of a previous call
            if ($!{EBADF})
              {
                my $error_msg="Selected an invalid descriptor ($!)... Very ".
                              "bad\nRegistered handles = ";
                foreach my $handle ($select->handles)
                  {
                    $error_msg.="$handle (".fileno($handle).") ";
                  }
                diagnostic::error($error_msg);
                exit 1;
              }
            elsif ($!{EINTR})
              {
                diagnostic::warning("Select exited because of a signal");
              }
            elsif ($!{EINVAL})
              {
                diagnostic::error("Invalid time limit for select");
              }
            else
              {
                #diagnostic::debug("Select timeout in communicator loop : ".
                #    "end $end, sinks @sinks, sources @sources , commands ".
                #    "@local_commands");
              }
          }
      }
  }

# Requests the termination of the main loop : sets $end, which is tested by
# run (the loop still goes on while sinks or local commands remain).
sub terminate ()
  {
    $end = 1;
  }

# Registers the descriptors of a command : its 'write' and 'error' parts,
# when they exist, are added to the select object and associated to the
# command in %c_from_fd.
sub add_descriptors($)
  {
    my ($command) = @_;

    diagnostic::debug("Adding $command to hash & select");
    foreach my $part (qw(write error))
      {
        next unless exists($command->{$part});
        $select->add($command->{$part});
        $c_from_fd{$command->{$part}} = $command;
      }
  }

# Stops listening to a descriptor of a command and closes it.
# Arguments : the command (or connector) and one of its descriptors, which
# should be either its 'write' or its 'error' part.
# The descriptor is removed from the select object and from %c_from_fd, then
# the matching part of the command is closed. When the other descriptor of
# the command is not registered (anymore), the remove handler of the command
# is called, if it has one.
sub remove_descriptor($$)
  {
    my $cobidule = shift;
    my $descriptor = shift;
    # Part of the command matching the descriptor ('write' or 'error')
    my $type;
    # The other descriptor of the command, if any
    my $other;

    my $write = undef;
    my $error = undef;
    $write = $cobidule->{write} if exists $cobidule->{write};
    $error = $cobidule->{error} if exists $cobidule->{error};

    if (defined($descriptor))
      {
        if (defined($write) and ($write == $descriptor))
          {
            $other = $error;
            $type = 'write';
          }
        elsif (defined($error) and ($error == $descriptor))
          {
            $other = $write;
            $type = 'error';
          }
        else
          {
            # NOTE(review): $type remains undefined in this case and the
            # removal below is attempted anyway
            diagnostic::error("Invalid descriptor, serious BUG !");
          }
        if (exists $c_from_fd{$descriptor})
          {
            $select->remove($descriptor);
            delete $c_from_fd{$descriptor};
            $cobidule->close($type);
            # Was it the last registered descriptor of the command ?
            if ((not defined($other)) or (not exists $c_from_fd{$other}))
              {
                # the remove handler is not mandatory (e.g. file reads)
                if (exists($cobidule->{remove_handler}))
                  {
                    my $remove_handler = $cobidule->{remove_handler};
                    &$remove_handler($cobidule);
                  }
              }
          }
        else
          {
            diagnostic::warning("Descriptor not present in hash");
          }
      }
    else
      {
        diagnostic::warning("Should not be called with undefined descriptor");
      }
  }

# Tells whether every registered sink has been initialized, i.e. whether
# $sinks_number equals $initialized_sinks_number.
# Emits a debug trace of both counters and a warning when more sinks are
# counted as initialized than are registered.
# Returns the result of the numeric equality test.
sub no_pending_connectors()
  {
    diagnostic::debug("Connectors situation, sinks_number : $sinks_number, ".
                      "initialized : $initialized_sinks_number");
    # The package name used to be misspelled ("dagnostic"), which turned this
    # sanity check into an "Undefined subroutine" runtime error
    diagnostic::warning("More initialized than sinks")
        if ($sinks_number<$initialized_sinks_number);
    return $sinks_number == $initialized_sinks_number;
  }

# Adds $connector to the connection set named $set_name.
# Its 'read' end is passed to taktuk::no_flush, its descriptors are registered
# through add_descriptors and $sinks_number is incremented for the 'sinks' set.
sub add_connector($$)
  {
    my ($connector, $set_name) = @_;
    my $members = $connections->{$set_name};

    taktuk::no_flush($connector->{read});
    add_descriptors($connector);
    push(@$members, $connector);
    if ($set_name eq 'sinks')
      {
        $sinks_number++;
      }
    if (defined($connector->{line}))
      {
        diagnostic::debug("Connector $connector->{line} added");
      }
  }

# Accounts for one more initialized sink. The connector passed as argument is
# not used, only $initialized_sinks_number is incremented.
sub connector_initialized($)
  {
    $initialized_sinks_number++;
  }

# Removes the first occurrence of $cobidule (numeric comparison of the
# elements) from the connection set named $set_name.
# Returns 1 when an element has been removed, 0 when none matched.
sub remove_from_set($$)
  {
    my ($cobidule, $set_name) = @_;
    my $set = $connections->{$set_name};

    foreach my $position (0 .. $#$set)
      {
        next if $set->[$position] != $cobidule;
        splice @$set, $position, 1;
        diagnostic::debug("Cobidule $cobidule, $cobidule->{line}, ".
                            "$cobidule->{pid} deleted from set ".
                            "$set_name at position $position");
        return 1;
      }
    return 0;
  }

# Removes $connector from the first of the 'sinks' / 'sources' sets containing
# it, updates the bookkeeping attached to that set, then calls its cleanup.
# A warning is issued when the connector belongs to neither set.
sub remove_connector($)
  {
    my $connector = shift;
    my $owner;

    foreach my $candidate ('sinks', 'sources')
      {
        next if not remove_from_set($connector, $candidate);
        $owner = $candidate;
        last;
      }
    return diagnostic::warning("Connector to remove not found !")
        if not defined($owner);

    if ($owner eq 'sources')
      {
        if (not scalar(@{$connections->{sources}}))
          {
            # No source is left: nothing can be done cleanly anymore as the
            # connection is lost. This might change if some running commands
            # have side effects, but even then it is better handled through
            # the usual synchronization because of the resulting zombies
            general::print("Exiting after loosing my sources ($$)\n");
            exit 1;
          }
      }
    elsif ($owner eq 'sinks')
      {
        # The counter is decremented first because
        # synchronizer::initialization_failed checks for ready state
        $sinks_number--;
        if ($connector->{state} >= $connector::initialized)
          {
            $initialized_sinks_number--;
          }
        else
          {
            synchronizer::initialization_failed($connector);
          }
        # In any case ongoing reduces have to be checked to avoid blocking
        # the current instance (e.g. in the case of the initial reduce_count).
        # For now reduces only propagate upward, so only sinks might change
        # a reduce result.
        handlers::check_ongoing_reduces();
      }
    $connector->cleanup;
  }

# Appends $command to the 'local_commands' connection set and registers its
# descriptors through add_descriptors.
sub add_local_command($)
  {
    my ($command) = @_;
    my $running = $connections->{local_commands};

    push(@$running, $command);
    add_descriptors($command);
    diagnostic::debug("Command $command added");
  }

# Removes $command from the 'local_commands' set and calls its cleanup;
# only warns when the command is not part of the set.
sub remove_local_command($)
  {
    my ($command) = @_;

    return $command->cleanup
        if remove_from_set($command, 'local_commands');
    diagnostic::warning("Command to remove not found !");
  }

###############################################
### SCHEDULER                               ###
###############################################

package scheduler;
use strict; use bytes;

# Compared with $arity before a deployment: in add_connector a negative value
# queues the waiting connectors as static work, and a value <= 0 disables the
# arity test (schedule tests == 0 for the dynamic queue)
our $dynamic_limit;
# Upper bound for $current_window
our $window;
# Not referenced in this part of the file
our $static_window;
# When true, deploy_connector registers a timer with this delay for each
# connector it starts
our $connector_timeout;

# undeployed connectors
our @static_connectors = ();
our @dynamic_connectors = ();

# in progress
# connectors that sent a steal request and are waiting for work (theft_handler)
our @waiting_thieves = ();
# incremented by deploy_connector, decremented by connector_initialized and
# connector_failed
our $current_window = 0;
# incremented by deploy_connector, decremented by connector_failed only
our $arity = 0;
# set once a steal request has been sent, reset by dispatch_work
our $steal_request_sent = 0;

# stats
# NOTE(review): none of these is updated in this part of the file; the two
# averages and $local_idleness are sent as parameters of steal requests
our $sum_connection_times = 0;
our $sum_user_times = 0;
our $num_connections = 0;
our $local_idleness = 0;

our $average_connection_time = 0;
our $average_user_time = 0;
our $min_connection_time = 0;

# Starts $connector and, on success, hands it to the communicator as a sink.
# A timer calling synchronizer::initialization_timeout is attached to the
# connector when $connector_timeout is set. $arity and $current_window are
# incremented for each successful start. A failed start is only reported.
sub deploy_connector($)
  {
    my $connector = shift;

    if (not $connector->run())
      {
        # Ok, there's probably too many forks

        # Old code that tried to guess the proper fix; it actually ended up
        # in deployment failures when the system is highly loaded (in this
        # case the test succeeds), so a more basic solution is used instead
        #if ($current_window < 1)
        #  {
        #    # We're in a big trouble, I have to deploy but I cannot ...
        #    # In this case I give up
        #    diagnostic::error("Cannot deploy connector due to fork ".
        #                      "unavailability");
        #  }
        #else
        #  {
        #    diagnostic::warning("Delaying connector due to fork error");
        #    $window=$current_window;
        #    scheduler::add_connector($command_line, $arguments);
        #  }
        #diagnostic::warning("Delaying connector due to fork error");
        #sleep 1;
        #scheduler::add_connector($connector);

        # A better long term solution would probably be to send the
        # connector to children when failure repeats on the local node
        return diagnostic::warning("Giving up connection to ".
                                   "$connector->{peer}");
      }

    if ($connector_timeout)
      {
        $connector->{timer} = timer->register($connector_timeout,
                                     \&synchronizer::initialization_timeout);
        $connector->{timer}->{connector} = $connector;
      }
    communicator::add_connector($connector, 'sinks');
    diagnostic::debug("Connector added : ".$connector->{line});
    $arity++;
    $current_window++;
  }

# Deploys $connector immediately when the window (and the arity limit, if
# any) allows it, otherwise queues it: as static work when $dynamic_limit is
# negative, as dynamic work in the other cases.
# The synchronizer is told that the node is not ready in all cases.
sub add_connector($)
  {
    my ($connector) = @_;

    if (($current_window < $window) and
        (($dynamic_limit <= 0) or ($arity < $dynamic_limit)))
      {
        deploy_connector($connector);
      }
    elsif ($dynamic_limit < 0)
      {
        push(@static_connectors, $connector);
      }
    else
      {
        push(@dynamic_connectors, $connector);
      }
    # Should only be necessary when new deployments are initiated after the
    # very first setup and the initial propagation of ready state
    # (At the time this comment is written, no command allows the creation of
    # a new connector and thus the following line has no importance)
    synchronizer::set_not_ready();
  }

# Frees one slot of the deployment window (the connector given as argument is
# not used) and runs the scheduler again.
sub connector_initialized($)
  {
    $current_window--;
    schedule();
  }

# Releases both the window slot and the arity unit taken by a connector that
# failed, then runs the scheduler again. The connector itself is not used.
sub connector_failed($)
  {
    my ($connector) = @_;

    $current_window--;
    $arity--;
    schedule();
  }

# Deploys queued connectors while the window allows it: static ones first,
# then dynamic ones (the latter also subject to the arity limit).
# Afterwards a non root node that still has room sends a steal request to its
# root connector, unless one has already been sent.
sub schedule()
  {
    deploy_connector(shift @static_connectors)
        while (scalar(@static_connectors) and ($current_window < $window));
    deploy_connector(shift @dynamic_connectors)
        while (scalar(@dynamic_connectors) and ($current_window < $window)
               and (($dynamic_limit == 0) or ($arity < $dynamic_limit)));

    my $may_steal = (not $general::root and ($current_window < $window)
        and (($dynamic_limit == 0) or ($arity < $dynamic_limit)) and
        not $steal_request_sent);

    return diagnostic::debug("Nothing done in scheduler, static work : "
                             .scalar(@static_connectors).", dynamic work : "
                             .scalar(@dynamic_connectors))
        if not $may_steal;

    my $parameters =
        "$average_connection_time $average_user_time $local_idleness";
    communicator::get_root->send_message(
      taktuk::encode($taktuk::steal, $parameters));
    diagnostic::debug("Steal request sent with parameters :".$parameters);
    $steal_request_sent = 1;
  }

# True when neither static nor dynamic connectors are waiting for deployment.
sub is_idle()
  {
    return !(scalar(@static_connectors) || scalar(@dynamic_connectors));
  }

# Gives queued dynamic connectors to the thieves waiting for work.
# Loops as long as both @dynamic_connectors and @waiting_thieves are non
# empty; each iteration removes one thief from the waiting list and sends it
# a single 'work' message made of the packed connectors given to it.
# Returns the value of the "is there work to give" test computed on entry,
# not the amount of work actually sent.
sub send_work()
  {
    my $work_to_give = scalar(@dynamic_connectors) && scalar(@waiting_thieves);
    my $work_given = $work_to_give;

    diagnostic::debug("Theft, dynamic connectors : ".scalar(@dynamic_connectors)
                     .", waiting thieves : ".scalar(@waiting_thieves)
                     .", work to give : $work_to_give"); 
    while ($work_to_give)
      {
        my $thief = shift @waiting_thieves;
        # NOTE(review): 'last_given' is only read here, it is never written in
        # this part of the file - confirm where (if anywhere) it is set
        my $last_given = $thief->{last_given};
        my $to_give;
        my $given=0;
        my $workpack="";

        if ($last_given)
          {
            # The share is twice what was given last time, capped by a
            # fraction of the queue. The cap is computed with non integer
            # divisions starting from the last index (size - 1), so it can be
            # fractional or even 0, in which case an empty workpack is sent.
            # NOTE(review): presumably meant as "half of the remaining work"
            # - confirm the intended formula
            my $available = $#dynamic_connectors;
            $available /= 2;
            $available = $available/2 + $available%2;
            $to_give = $last_given * 2;
            $to_give = $available if $to_give > $available;
          }
        else
          {
            $to_give = 1;
          }
        for ($given=0; $given < $to_give; $given++)
          {
            my $connector_to_deploy = shift @dynamic_connectors;

            $workpack = $workpack.$connector_to_deploy->pack();
          }

        $thief->send_message(taktuk::encode($taktuk::work,
                                              $workpack));
        diagnostic::debug("Work sent : ".$workpack);
        # same test as on entry, recomputed with the updated queues
        $work_to_give = scalar(@dynamic_connectors) && scalar(@waiting_thieves);
      }
    return $work_given;
  } 

# Handles a received workpack: every connector it contains is unpacked and
# appended to the dynamic queue, the steal request flag is cleared, then the
# local scheduler runs before leftover work is offered to waiting thieves.
sub dispatch_work($)
  {
    my ($workpack) = @_;

    while ($workpack)
      {
        (my $connector, $workpack) = connector::unpack($workpack);
        diagnostic::debug("Unpacked connector: $connector");
        push(@dynamic_connectors, $connector);
      }
    # reset steal information
    $steal_request_sent = 0;
    # priority to local schedule (ASAP)
    schedule();
    send_work();
  }

# Handles a steal request coming from $connector: the thief is queued and
# served through send_work. If thieves remain unserved, a non root node that
# has no steal request in flight forwards the request (with the received
# $parameters) to its root connector; otherwise only a debug trace is emitted.
sub theft_handler($$)
  {
    my ($connector, $parameters) = @_;

    push(@waiting_thieves, $connector);
    send_work();
    if (scalar(@waiting_thieves))
      {
        if ($general::root)
          {
            diagnostic::debug("Cannot satisfy steal request : no more dynamic ".
                              "work");
          }
        elsif ($steal_request_sent)
          {
            diagnostic::debug("Request already sent");
          }
        else
          {
            communicator::get_root->send_message(
              taktuk::encode($taktuk::steal, $parameters));
            diagnostic::debug("Forwarded steal request, parameters : ".
                              "$parameters");
            $steal_request_sent = 1;
          }
      }
  }

###############################################
### TIMER                                   ###
###############################################

package timer;
use strict; use bytes;
use Time::HiRes;

# Timers registered with a non zero timeout, kept ordered by increasing
# 'timeout' value (register inserts in order, check_timeouts reads the head)
our @registered_timers;

# Returns the current time in seconds, microseconds included as a fraction.
sub current_time()
  {
    my @now = Time::HiRes::gettimeofday;
    return $now[0] + $now[1]/1000000;
  }

# Creates a timer object blessed into $type.
#   $type    : class name (this sub is called as a class method, e.g.
#              timer->register(...), so the prototype is not enforced)
#   $timeout : delay in seconds; when false the timer never expires and is
#              not stored in @registered_timers
#   $handler : code reference kept in the 'handler' field
# A timer with a timeout is inserted in @registered_timers after every timer
# whose expiry date is not later than its own. Returns the new timer.
sub register($$)
  {
    my ($type, $timeout, $handler) = @_;
    my $now = current_time();
    my $timer = bless({ 'handler'=>$handler, 'birth'=>$now }, $type);

    if ($timeout)
      {
        my $deadline = $now+$timeout;
        my $slot = scalar(@registered_timers);

        $slot-- while (($slot > 0) and
                       ($registered_timers[$slot-1]->{timeout} > $deadline));
        $timer->{timeout} = $deadline;
        splice @registered_timers, $slot, 0, $timer;
      }
    else
      {
        $timer->{timeout} = 0;
      }
    diagnostic::debug("Registered new timer $timer, ".$timer->print.
                      ", list = @registered_timers");
    return $timer;
  }

# Removes from @registered_timers every timer whose expiry date is not later
# than the time sampled on entry and calls its handler with the timer as
# argument.
sub check_timeouts()
  {
    my $now = current_time();

    while (scalar(@registered_timers))
      {
        last if not ($registered_timers[0]->{timeout} <= $now);
        my $expired = shift @registered_timers;
        $expired->{handler}->($expired);
        diagnostic::debug("Timeout handled for $expired, ".$expired->print);
      }
  }

# Removes the timer given as first argument (the invocant of the method
# call) from @registered_timers. Timers without timeout are not stored and are
# therefore ignored; a stored timer that cannot be found triggers a warning.
sub unregister()
  {
    my $timer = shift;

    return if not $timer->{timeout};
    foreach my $index (0 .. $#registered_timers)
      {
        next if $registered_timers[$index] != $timer;
        splice @registered_timers, $index, 1;
        return diagnostic::debug("Unregistered timer $timer");
      }
    diagnostic::warning("Unregistering didn't found timer $timer");
  }

# Returns the number of seconds elapsed since the creation ('birth') of the
# timer given as argument.
sub gettime($)
  {
    my $now = current_time();
    my ($timer) = @_;

    return $now - $timer->{birth};
  }

# Returns a one line description of the timer given as argument: its creation
# date, its expiry date and the current time.
sub print($)
  {
    my $now = current_time();
    my ($timer) = @_;

    return join('', "Timer created at $timer->{birth} timeouting at ",
                    "$timer->{timeout} current is $now");
  }

###############################################
### SYNCHRONIZER                            ###
###############################################

package synchronizer;
use strict; use bytes;

# states
# One flag per event name; a flag is cleared by set_not_ready /
# set_not_numbering and set back to 1 by dispatch_event
our %states = (ready=>1, numbering=>1);

# ready state help variables
# set to 1 by handlers::ready
our $father_is_ready=0;
# cleared by check_ready_state once the ready message has been sent to sinks
our $first_time=1;

# pending stuff waiting ready state
# NOTE(review): these two arrays are not referenced in this part of the file
our @ready_handlers = ();
our @pending_messages = ();

# event name => { handlers => saved handler of each blocked message,
#                 pending  => flat list of (message, connector, body) }
our %blocked;
# blocked message => name of the event it is waiting for
our %event_waited;

# Switches to the ready state when it is not reached yet and when the father
# is ready, the scheduler is idle and no connector is pending.
# The first time this happens the ready message is also sent to every sink.
sub check_ready_state()
  {
    return if $states{ready};

    return diagnostic::debug("I'm not ready : father_is_ready = ".
                             "$father_is_ready, is_idle = ".
                             scheduler::is_idle().", no_pending_connector = ".
                             communicator::no_pending_connectors())
        if not ($father_is_ready && scheduler::is_idle() &&
                communicator::no_pending_connectors());

    # The first network setup enforces complete synchronisation (all
    # commands are initially blocked until its completion), it must be
    # propagated.
    # Afterward, nothing can be ensured as conflicting operations might
    # be concurrently started (e.g. a new connector might be added on root
    # while a broadcast is initiated in a leaf)
    if ($first_time)
      {
        $_->send_message($taktuk::ready)
            foreach communicator::get_connections('sinks');
        $first_time = 0;
      }
    dispatch_event('ready');
    diagnostic::debug("I'm ready");
  }

# Leaves the ready state (if currently in it): broadcast, downcast and spread
# messages get blocked until the 'ready' event is dispatched.
sub set_not_ready()
  {
    return if not $states{ready};

    $states{ready} = 0;
    block_until_event('ready', $taktuk::broadcast,
                               $taktuk::downcast,
                               $taktuk::spread);
  }

# Called for a connector whose initialization succeeded: its timer (if any)
# is unregistered, the TakTuk code (when $general::self_propagate is set) and
# the connector arguments are sent to it, then communicator, scheduler and
# ready state are updated.
sub initialization_complete($)
  {
    my ($connector) = @_;

    if (exists($connector->{timer}))
      {
        $connector->{timer}->unregister;
      }
    if ($general::self_propagate)
      {
        $connector->send_message(taktuk::encode($taktuk::taktuk_code,
                                           $general::taktuk_code));
      }
    diagnostic::debug("Arguments : ".$connector->{arguments});
    $connector->send_message(taktuk::encode($taktuk::arguments,
                                            $connector->{arguments}));

    communicator::connector_initialized($connector);
    scheduler::connector_initialized($connector);
    diagnostic::debug("Connector $connector->{line} initialized");
    # must be the last operation as it depends on ongoing deployments
    check_ready_state();
  }

# Called for a connector whose initialization failed: the failure is output
# on the connector, the scheduler is notified and the ready state rechecked.
sub initialization_failed($)
  {
    my ($connector) = @_;

    $connector->output('connector', "initialization failed");
    # Nothing to be done for communicator as this function should be called
    # from communicator::remove_connector
    scheduler::connector_failed($connector);
    # Must be the last operation as it depends on ongoing deployments.
    # Checking ongoing reduces is useless here as no reduce could have been
    # initiated before the ready state
    check_ready_state();
  }

# Timer handler fired when a connector did not initialize in time.
# The connector process is killed (signal 9) and its 'write' then 'error'
# descriptors, when defined, are removed from the communicator.
sub initialization_timeout($)
  {
    my ($timer) = @_;
    my $connector = $timer->{connector};

    return diagnostic::warning("Bug, timeouted an initialized connector")
        if $connector->{state} >= $connector::initialized;

    $connector->output('connector', "timeouted");
    kill 9, $connector->{pid};
    # The shutdown of a connector is more complicated than a simple call to
    # remove_connector: remove_descriptor eliminates a descriptor from the
    # select list and calls remove_connector when necessary.
    # remove_connector should also call synchronizer::initialization_failed
    foreach my $channel ('write', 'error')
      {
        communicator::remove_descriptor($connector, $connector->{$channel})
            if (defined($connector->{$channel}));
      }
  }

# Leaves the numbering state (if currently in it): the messages listed below
# get blocked until the 'numbering' event is dispatched.
sub set_not_numbering()
  {
    return if not $states{numbering};

    $states{numbering} = 0;
    # message should not be necessary, but there is no harm in including it
    my @to_block = ($taktuk::execute,
                    $taktuk::eof,
                    $taktuk::input,
                    $taktuk::message,
                    $taktuk::send_to,
                    $taktuk::synchronize,
                    $taktuk::quit,
                    $taktuk::taktuk_perl);
    block_until_event('numbering', @to_block);
  }

# Blocks the given messages until $event is dispatched.
#   $event : name of the event
#   @_     : remaining arguments are the messages to block
# The current handler of each message is saved and replaced by
# handlers::handler_blocked; a message already waiting for an event is left
# untouched with a warning. The record stored in %blocked for $event holds
# the saved handlers and an empty pending list.
sub block_until_event($@)
  {
    my ($event, @messages) = @_;
    my %saved;

    foreach my $message (@messages)
      {
        if (exists($event_waited{$message}))
          {
            diagnostic::warning("Multiple blocking for $message");
            next;
          }
        $saved{$message} = handlers::get_handler($message);
        handlers::replace_handler($message, \&handlers::handler_blocked);
        $event_waited{$message} = $event;
      }
    $blocked{$event} = { handlers=>\%saved, pending=>[] };
  }

# Marks $event as reached and releases everything that was waiting for it.
#   $event : name of the event (key of %states and %blocked)
# The handlers saved by block_until_event are restored, then each pending
# (message, connector, body) triple is passed to the handler of its message,
# in arrival order. The record of the event is finally dropped from %blocked.
sub dispatch_event($)
  {
    my $event = shift;
    # The event might be dispatched while nothing has been blocked on it
    # (e.g. dispatched twice in a row). Empty defaults are used in this case:
    # under strict refs, dereferencing the missing pending list would
    # otherwise be a fatal error.
    my $record = exists($blocked{$event}) ? $blocked{$event} : {};
    my $handlers = $record->{handlers} || {};
    my $pending_list = $record->{pending} || [];

    $states{$event} = 1;
    foreach my $message (keys(%$handlers))
      {
        handlers::replace_handler($message, $handlers->{$message});
        delete $event_waited{$message};
      }
    while (scalar(@$pending_list))
      {
        my $message = shift @$pending_list;
        my $connector = shift @$pending_list;
        my $body = shift @$pending_list;

        my $function = handlers::get_handler($message);
        &$function($message, $connector, $body);
        diagnostic::debug("Processing $message with body $body");
      }
    delete $blocked{$event};
  }

# Stores a blocked message: ($message, $connector, $body) is appended to the
# pending list of the event that $message is waiting for.
sub add_pending_message($$$)
  {
    my ($message, $connector, $body) = @_;
    my $queue = $blocked{$event_waited{$message}}->{pending};

    push(@$queue, $message, $connector, $body);
  }

###############################################
### HANDLERS                                ###
###############################################

package handlers;
use strict; use bytes;

# Dispatch table: message => code reference of the handler registered for it
# (filled by register_handler, updated by replace_handler)
our %handlers;

# Registers a handler (second argument) for $message (first argument).
# An error is reported if a handler is already defined for this message, in
# which case the table is left unchanged.
sub register_handler($$)
  {
    my ($message, $handler) = @_;

    return diagnostic::error("Handler already defined for $message")
        if defined($handlers{$message});
    $handlers{$message} = $handler;
  }

# Replaces the handler of $message (first argument) by the second argument.
# An error is reported if no handler is defined yet for this message, in
# which case the table is left unchanged.
sub replace_handler($$)
  {
    my ($message, $handler) = @_;

    return diagnostic::error("Handler not defined for $message")
        if not defined($handlers{$message});
    $handlers{$message} = $handler;
  }

# Returns the handler currently registered for the message given as argument
# (undef when there is none).
sub get_handler($)
  {
    my ($message) = @_;

    return $handlers{$message};
  }


# Handler installed for blocked messages: the message, its connector and its
# body are handed to the synchronizer to be queued.
sub handler_blocked($$$)
  {
    my ($message, $connector, $body) = @_;

    synchronizer::add_pending_message($message, $connector, $body);
  }
    
# Handler of the 'arguments' message: the body is unpacked element by element
# into an argument list which is then processed like a command line, and the
# scheduler is run.
sub arguments($$$)
  {
    my ($message, $connector, $body) = @_;
    my @arguments;

    while ($body)
      {
        (my $element, $body) = taktuk::unpack($body);
        push(@arguments, $element);
      }

    # Now I've to handle all the stuff (just as in main)
    arguments::fetch_arguments(@arguments);
    arguments::parse_options();

    main::process_commands();
    # Useful especially for stealing if I've nothing to do
    scheduler::schedule();
  }

# Handler of the 'broadcast' message: the body is sent as a 'spread' message
# to every source and sink except the connector the message came from.
sub broadcast($$$)
  {
    my ($message, $connector, $body) = @_;
    my @peers = (communicator::get_connections('sources'),
                 communicator::get_connections('sinks'));

    foreach my $peer (@peers)
      {
        next if not ($peer != $connector);
        diagnostic::debug("Spreading message to $peer");
        $peer->send_message(taktuk::encode($taktuk::spread, $body));
      }
    # this message should not be interpreted on the originating node
    # this is the behavior of spread which is sent to others
    # communicator::process_message($connector, $body);
  }

# Handler of the 'downcast' message: the body is sent as a 'spread' message
# to every sink except the connector the message came from.
sub downcast($$$)
  {
    my ($message, $connector, $body) = @_;

    foreach my $sink (communicator::get_connections('sinks'))
      {
        next if not ($sink != $connector);
        diagnostic::debug("Spreading message to $sink");
        $sink->send_message(taktuk::encode($taktuk::spread, $body));
      }
    # this message should not be interpreted on the originating node
    # this is the behavior of spread which is sent to others
    # communicator::process_message($connector, $body);
  }

# Handler of the 'eof' message: closes the 'read' end of every local command.
sub eof($$$)
  {
    my ($message, $connector, $body) = @_;
    my @running = communicator::get_connections('local_commands');

    foreach my $command (@running)
      {
        $command->close('read');
        diagnostic::debug("Closed inputs for command $command");
      }
  }

# Handler of the 'execute' message: runs the body as a local command.
# Nothing is executed on the root node (0 is returned). Elsewhere the
# TAKTUK_PIDS environment variable is set to the pids of the local commands
# already registered before the new command is started, and the result of its
# run method is returned.
sub execute($$$)
  {
    my ($message, $connector, $body) = @_;

    # Warning the user here proved more annoying than helpful:
    # diagnostic::warning("Execution has no effect on the root node, you ".
    #  "probably forgot 'broadcast' when typing 'broadcast exec command'");
    return 0 if $general::root;

    my $command = command::new(line=>$body,
           data_handler=>\&communicator::process_command_output,
           remove_handler=>\&communicator::remove_local_command);

    $ENV{TAKTUK_PIDS} = join(' ', map("$_->{pid}",
                        communicator::get_connections('local_commands')));
    my $result = $command->run();
    if ($result)
      {
        communicator::add_local_command($command);
      }
    else
      {
        diagnostic::warning("Giving up command $body");
      }
    return $result;
  }

# Handler of the 'forward_up' message: the root processes the body as a
# message, any other node forwards it unchanged to each of its sources.
sub forward_up($$$)
  {
    my ($message, $connector, $body) = @_;

    return communicator::process_message($connector, $body)
        if $general::root;

    foreach my $source (communicator::get_connections('sources'))
      {
        $source->send_message(taktuk::encode($taktuk::forward_up, $body));
      }
  }

# Handler of the 'input' message: writes the body to the 'read' end of every
# local command, reporting a system error for each failed write.
sub input($$$)
  {
    my ($message, $connector, $body) = @_;

    foreach my $command (communicator::get_connections('local_commands'))
      {
        diagnostic::system()
            unless taktuk::syswrite($command->{read}, $body);
      }
  }

# Handler of the 'message' message: the message is re-encoded and sent to the
# outgoing connector, whose pending messages counter is incremented. When the
# counter reaches 0 and a timer is attached to that connector, the timer is
# unregistered and dropped.
sub message($$$)
  {
    my ($message, $connector, $body) = @_;
    my $control = communicator::get_outgoing;

    $control->send_message(taktuk::encode($message, $body));
    $control->{pending_messages}++;
    return if not (($control->{pending_messages} == 0) and
                   exists($control->{timer}));
    $control->{timer}->unregister;
    delete $control->{timer};
  }

# Handler of the 'numbering' message, whose body is "<rank> <count>".
# The local rank and count are recorded (also in the TAKTUK_RANK and
# TAKTUK_COUNT environment variables), then consecutive rank intervals are
# assigned to the sinks, each interval being as wide as the reduce_count
# value stored on the sink. The 'numbering' event is finally dispatched.
sub numbering($$$)
  {
    my ($message, $connector, $body) = @_;
    my ($rank, $count) = split / /,$body;

    diagnostic::debug("I'm $rank among $count");

    $general::rank = $rank;
    $general::count = $count;
    $ENV{TAKTUK_RANK} = $rank;
    $ENV{TAKTUK_COUNT} = $count;

    my $next_free = $rank+1;
    $general::child_min = $next_free;
    foreach my $sink (communicator::get_connections('sinks'))
      {
        if (not ($sink != $connector))
          {
            diagnostic::warning("Bug : numbering coming from a sink");
            next;
          }
        $sink->send_message(taktuk::encode($taktuk::numbering,
                                           "$next_free $count"));
        $sink->{min} = $next_free;
        $next_free += $sink->{$taktuk::reduce_count};
        $sink->{max} = $next_free-1;
      }
    $general::child_max = $next_free-1;
    synchronizer::dispatch_event('numbering');
  }

# Filehandles already opened by output, indexed by file descriptor number
our %filehandles;

# Handler of the 'output' message. On the root node the body is unpacked into
# a file descriptor number and a payload, and the payload is written to that
# descriptor (the filehandle is opened on first use and kept for later
# messages). Other nodes only issue a warning.
sub output($$$)
  {
    my ($message, $connector, $body) = @_;

    return diagnostic::warning("Output message received on non root node")
        if not $general::root;

    my ($fd, $remaining) = taktuk::unpack($body);

    if (not exists($filehandles{$fd}))
      {
        open ($filehandles{$fd}, ">&=", $fd) or diagnostic::system();
      }
    taktuk::syswrite($filehandles{$fd}, $remaining) or diagnostic::system();
  }

# Handler of the 'ping' message: answers with a pong on the same connector.
sub ping($$$)
  {
    my ($message, $connector, $body) = @_;

    $connector->send_message($taktuk::pong);
  }

# Handler of the 'pong' message: only emits a debug trace.
sub pong($$$)
  {
    my ($message, $connector, $body) = @_;

    diagnostic::debug("pong");
  }

# Handler of the 'quit' message: asks the communicator to terminate.
sub quit($$$)
  {
    my ($message, $connector, $body) = @_;

    communicator::terminate();
  }

# Handler of the 'ready' message: records that the father is ready and asks
# the synchronizer to re-evaluate the local ready state.
sub ready($$$)
  {
    my ($message, $connector, $body) = @_;

    diagnostic::debug("Ready received");
    $synchronizer::father_is_ready = 1;
    synchronizer::check_ready_state();
  }

# Timer handler: sends the timeout message to the connector attached to the
# timer and increments the pending messages counter of that connector.
sub recv_timeout($)
  {
    my ($timer) = @_;
    my $target = $timer->{connector};

    $target->send_message($taktuk::timeout);
    $target->{pending_messages}++;
  }

# reduce type => value accumulated so far for the ongoing reduce of this type
our %reduce_data;
# reduce type => index of the last sink whose value has been accumulated
# (-1 when none); an entry exists only while a reduce of this type is ongoing
our %reduce_rank_complete;
# reduce type => code reference called as handler(rank, new_value, data)
our %reduce_handler = ($taktuk::reduce_count => \&reduce_count,
                       $taktuk::reduce_tree  => \&reduce_tree);

# Reduce handler counting nodes.
#   $rank      : -1 to get the initial value, -2 to finalize, any other value
#                to accumulate a new value
#   $new_value : value to accumulate (ignored for ranks -1 and -2)
#   $data      : value accumulated so far
# Returns 1 for the initial step and $data + $new_value for an accumulation.
# On finalization a non root node returns $data; the root decrements it,
# starts the numbering with "0 <count>" and returns undef.
sub reduce_count ($$$)
  {
    my ($rank, $new_value, $data) = @_;

    return 1 if $rank == -1;
    return $data + $new_value if $rank != -2;
    return $data if not $general::root;

    # the root, numbered 0, is not counted as deployed
    $data--;
    numbering($taktuk::numbering, communicator::get_root, "0 $data");
    return undef;
  }

# Reduce handler building a textual description of the deployment tree.
#   $rank      : -1 to start the description of this node, -2 to close it,
#                any other value is the index of the sink whose description
#                ($new_value) is appended to $data
#   $new_value : description received from a sink (ranks other than -1, -2)
#   $data      : description built so far
# Returns the updated description, except on the root at finalization where
# the tree is printed and undef is returned.
sub reduce_tree($$$)
  {
    my $rank = shift;
    my $new_value = shift;
    my $data = shift;
    my $result;

    if ($rank == -1)
      {
        # opening of the bracketed list describing this node
        $result =  "['$general::host ($general::rank, ".
                   "$synchronizer::states{ready})'";
      }
    elsif ($rank == -2)
      {
        if ($general::root)
          {
            # NOTE(review): string eval of a text assembled from values
            # received from other nodes (and from host names). Anything able
            # to inject content in these descriptions gets Perl code executed
            # here - consider a real parser. The result of the eval ($@) is
            # not checked either.
            eval('$new_value = '.$data."]");
            general::print_tree("",$new_value);
            return undef;
          }
        else
          {
            # closes the bracket opened at rank -1
            return $data."]";
          }
      }
    else
      {
        $result =  $data.",".$new_value; 
      }

    # This reduce is special and not synchronized with connectors...
    # so we have to bypass the usual reduce synchronization...
    my @sinks = communicator::get_connections('sinks');

    # sinks following the current rank that are not initialized yet are
    # reported as 'connecting' and skipped
    $rank++;
    while (($rank <= $#sinks) and
           ($sinks[$rank]->{state} < $connector::initialized))
      {
        $result .= ", ['connecting $sinks[$rank]->{peer}']";
        $rank++;
      }
    # index of the last sink handled (accumulated or skipped)
    $reduce_rank_complete{$taktuk::reduce_tree} = $rank-1;

    # ... and we have to propagate the reduce by ourselves
    $sinks[$rank]->send_message(taktuk::encode($taktuk::reduce,
                                               $taktuk::reduce_tree))
        if ($rank <= $#sinks);
    return $result;
  }

# Handler of the 'reduce' message.
#   $message   : the message identifier (unused here)
#   $connector : connector the value comes from
#   $body      : encoded pair (reduce type, value)
# The value is stored on the connector under the reduce type, then the values
# of the sinks are accumulated in sink order through the handler of the type
# (see %reduce_handler), stopping at the first sink that has no value yet.
# When all sinks have been accumulated, the handler is called with rank -2 and
# a defined result is sent, as a reduce message, to the connector returned by
# communicator::get_root.
sub reduce($$$)
  {
    my $message = shift;
    my $connector = shift;
    my $body = shift;
    my @sinks = communicator::get_connections('sinks');
    my $i;

    my ($type,$value) = taktuk::decode($body);

    # first message of a reduce of this type: reset the per-sink values and
    # ask the handler (rank -1) for the initial data
    if (not exists($reduce_rank_complete{$type}))
      {
        diagnostic::error("Handler not defined for reduce $type")
            if not exists($reduce_handler{$type});
        foreach my $sink (@sinks)
          {
            $sink->{$type} = undef;
          }
        $reduce_rank_complete{$type} = -1;
        $reduce_data{$type} = &{$reduce_handler{$type}}(-1, undef, undef);
      }
    $connector->{$type} = $value;
    # $i is re-read from %reduce_rank_complete at each step because a handler
    # may update it by itself (reduce_tree does)
    $i = $reduce_rank_complete{$type} + 1;
    while (($i <= $#sinks) and defined($sinks[$i]->{$type}))
      {
        $reduce_rank_complete{$type}++;
        $reduce_data{$type} = &{$reduce_handler{$type}}($i,
                                    $sinks[$i]->{$type}, $reduce_data{$type});
        
        $i = $reduce_rank_complete{$type} + 1;
      }
    # every sink has been taken into account: finalize (rank -2)
    if ($i > $#sinks)
      {
        $reduce_data{$type} = &{$reduce_handler{$type}}(-2, undef,
                                                         $reduce_data{$type});
        # an undefined result means there is nothing to send
        if (defined($reduce_data{$type}))
          {
            my $reply_connector = communicator::get_root;
      
            $reply_connector->send_message(taktuk::encode( $taktuk::reduce,
                                  taktuk::encode($type, $reduce_data{$type})));
          }
        delete $reduce_rank_complete{$type};
      }
  }

# Re-evaluates every ongoing reduce (one per key of %reduce_rank_complete) by
# calling reduce with a body made of the reduce type followed by 0.
sub check_ongoing_reduces()
  {
    my @ongoing = keys(%reduce_rank_complete);

    foreach my $reduce_type (@ongoing)
      {
        reduce($taktuk::reduce, communicator::get_root, $reduce_type . 0);
      }
  }

# Handler for send_to messages: routes a message toward a set of logical
# ranks.
# Arguments: the message code, the connector the message came from (unused
# here) and the body. The body is unpacked into a destination set and the
# remaining payload. The destination set is handled as a flat list of
# (min, max) interval bounds which is consumed in order and split into:
#   - @upward: intervals sent through the connector returned by
#     communicator::get_root (ranks below ours or above $general::child_max)
#   - local delivery when $general::rank belongs to the set
#   - @downward: intervals sent to the sink whose {min}..{max} range
#     contains them
# The interval list and the sinks are walked in parallel with a single
# cursor each, so the code presumably relies on both being sorted by
# increasing rank -- TODO confirm against taktuk::decode_set and
# communicator::get_connections.
sub send_to($$$)
  {
    my $message = shift;
    my $connector = shift;
    my $body = shift;

    my ($to, $remaining) = taktuk::unpack($body);
    diagnostic::debug("Message to $to and I'm $general::rank");

    my $new_message;
    my @destination_list = taktuk::decode_set($to);
    my @upward = ();
    my @downward = ();
    my $delivery = 0;
    # ($min, $max) is the interval currently being routed
    my $min = shift @destination_list;
    my $max = shift @destination_list;

    # propagation upward of low numbered destinations
    while (defined($min) and ($min < $general::rank))
      {
        if ($max >= $general::rank)
          {
            # The interval straddles our rank: the lower part goes upward,
            # the rest is put back at the head of the list for later steps
            push @upward, $min, $general::rank-1;
            unshift @destination_list, $general::rank, $max;
          }
        else
          {
            push @upward, $min, $max;
          }
        $min = shift @destination_list;
        $max = shift @destination_list;
      }
    # local delivering
    if (defined($min) and ($min == $general::rank))
      {
        # The actual delivery is postponed to the end of the function, after
        # all the forwarding has been done
        $delivery = 1;
        if ($max == $general::rank)
          {
            $min = shift @destination_list;
            $max = shift @destination_list;
          } 
        else
          {
            # Keep routing the part of the interval located after our rank
            $min = $general::rank+1;
          }
      }
    # propagation downward of destination within children interval
    # $i is the index of the sink currently collecting intervals in @downward
    my $i=0;
    my @sinks = communicator::get_connections('sinks');
    while (defined($min) and ($min <= $general::child_max))
      {
        if ($max > $general::child_max)
          {
            # The part located beyond our children goes upward
            push @upward, $general::child_max+1, $max;
            $max = $general::child_max;
          }
        # Skip the sinks whose range ends before $min. Intervals collected
        # for a sink are flushed to it before moving to the next one
        while (($i <= $#sinks) and ($min > $sinks[$i]->{max}))
          {
            if (scalar(@downward))
              {
                $new_message = taktuk::encode($message,
                    taktuk::pack(taktuk::encode_set(@downward)).$remaining);
                $sinks[$i]->send_message($new_message);
                @downward = ();
              }
            $i++;
          }
        # The interval starts in a gap before the current sink but reaches
        # it: isolate the part located in the gap, the rest is requeued
        if (($i <= $#sinks) and ($min < $sinks[$i]->{min}) and
            ($max >= $sinks[$i]->{min}))
          {
            unshift @destination_list, $sinks[$i]->{min}, $max;
            $max = $sinks[$i]->{min}-1;
          }
        if (($i > $#sinks) or ($min < $sinks[$i]->{min}))
          {
            # No sink covers these destinations: they are dropped with a
            # warning
            diagnostic::warning("Send problem, ".
                (($min != $max)?"$min-$max":"$min")." not available anymore");
          }
        else
          {
            if ($max > $sinks[$i]->{max})
              {
                # The interval goes beyond the current sink: keep the part
                # it covers and requeue the remainder
                unshift @destination_list, $sinks[$i]->{max}+1, $max;
                $max = $sinks[$i]->{max};
              }
            push @downward, $min, $max;
          }
        $min = shift @destination_list;
        $max = shift @destination_list;
      }
    # Flush the intervals collected for the last sink used. @downward is
    # only filled while $i is a valid sink index
    if (scalar(@downward))
      {
        $new_message = taktuk::encode($message,
            taktuk::pack(taktuk::encode_set(@downward)).$remaining);
        $sinks[$i]->send_message($new_message);
      }
    # propagation upward of high numbered destinations
    while (defined($min))
      {
        push @upward, $min, $max;
        $min = shift @destination_list;
        $max = shift @destination_list;
      }
    if (scalar(@upward))
      {
        if ($general::root)
          {
            # On the root there is nowhere to forward these destinations to
            diagnostic::warning("Send problem, ".taktuk::encode_set(@upward).
                                " is(are) invalid destination(s)");
          }
        else
          {
            $new_message = taktuk::encode($message,
                taktuk::pack(taktuk::encode_set(@upward)).$remaining);
            communicator::get_root->send_message($new_message);
          }
      }
    # actual local delivery
    if ($delivery)
      {
        diagnostic::debug("Delivered message");
        # The payload is processed using the control connector, not the
        # connector the message arrived on
        communicator::process_message(communicator::get_control, $remaining);
      }
  }

# Handler for spread messages: hands the message over to the broadcast
# handler, then processes the body on this node as well.
sub spread($$$)
  {
    my ($message, $connector, $body) = @_;

    # Forwarding comes first, local processing second
    broadcast($message, $connector, $body);

    diagnostic::debug("Handling message locally");
    communicator::process_message($connector, $body);
  }

# Handler for steal messages: passes the request to the scheduler theft
# handler, then re-checks the ready state.
sub steal($$$)
  {
    my ($message, $connector, $body) = @_;

    diagnostic::debug("Steal request : $body, connector : $connector");
    scheduler::theft_handler($connector, $body);
    synchronizer::check_ready_state;
  }

# Handler for synchronize messages: as registered here, it simply processes
# the encapsulated message (the body) on the connector it came from.
sub synchronize($$$)
  {
    my ($message, $connector, $body) = @_;

    communicator::process_message($connector, $body);
  }

# Handler for taktuk_code messages: stores the received body in
# $general::taktuk_code.
sub taktuk_code($$$)
  {
    my ($message, $connector, $body) = @_;

    diagnostic::debug("Taktuk code received");
    $general::taktuk_code = $body;
  }

# Handler for taktuk_perl messages: executes a perl interpreter preloaded
# with the 'taktuk' package.
# The body has the form "[options --] [filename [arguments]]":
#   - options are passed to the perl interpreter
#   - filename is the script to load; when it is empty or "-" no file is
#     written and the input of the command is left open
#   - arguments are passed to the script
# The command is started through the execute handler as
# "perl <options> -- - <arguments>", so the interpreter reads its program
# from its input, on which are written in turn the taktuk package,
# a "package main;" line and the content of the script file.
sub taktuk_perl($$$)
  {
    my $message = shift;
    my $connector = shift;
    my $body = shift;
    my ($options, $arguments, $filename);
    # A standalone "--" separates interpreter options from the rest
    my $separator = qr/(?:^|\s)--(?:$|\s)/;

    if ($body =~ $separator)
      {
        ($options, $arguments) = split $separator, $body, 2;
        $arguments = "" if not defined($arguments);
      }
    else
      {
        ($options, $arguments) = ("", $body);
      }
    ($filename, $arguments) = split /\s/, $arguments, 2;
    $filename = "" if not defined($filename);
    $arguments = "" if not defined($arguments);
    diagnostic::debug("Taktuk perl execution, options [$options], filename ".
                      "[$filename], arguments [$arguments]");
    my $command = execute($taktuk::execute, $connector,
                          "perl $options -- - $arguments");
    if ($command)
      {
        $command->{line} = "taktuk_perl";
        # Length test rather than truth test: a body equal to "0" is a valid
        # non empty body
        $command->{line} .= " $body" if length($body);
        general::load_taktuk_package('taktuk');
        taktuk::syswrite($command->{read}, $general::taktuk_package)
            or diagnostic::system;
        taktuk::syswrite($command->{read}, "\npackage main;\n")
            or diagnostic::system;
        # Length test rather than truth test: "0" is a valid file name
        if (length($filename) and ($filename ne "-"))
          {
            # The shell is used to expand the file name (variables, ...)
            # NOTE(review): the file name is interpolated unescaped in a
            # shell command, so any shell construct it contains is executed.
            # Kept as is because the expansion is the purpose of this call
            $filename = qx{echo "$filename"};
            chomp($filename);
            taktuk::syswrite($command->{read}, general::load_file($filename))
                or diagnostic::system;
            # The whole program has been written: close the input so that
            # the interpreter sees the end of the script
            $command->close('read');
          }
      }
  }

# Handler for wait_message messages: accounts for one more awaited message
# on the connector and, when the body holds a positive timeout and the
# pending messages counter has become negative, registers a timer calling
# recv_timeout and links it with the connector.
sub wait_message($$$)
  {
    my ($message, $connector, $body) = @_;

    $connector->{pending_messages}--;
    # Same evaluation order as nested tests: body set, counter negative,
    # then positive timeout
    if ($body and ($connector->{pending_messages} < 0) and ($body > 0))
      {
        my $timeout_timer = timer->register($body,\&recv_timeout);

        $timeout_timer->{connector} = $connector;
        $connector->{timer} = $timeout_timer;
      }
  }

# Handler for work messages: gives the received work to the scheduler for
# dispatching, then re-checks the ready state.
sub work($$$)
  {
    my ($message, $connector, $body) = @_;

    diagnostic::debug("Work received : $body");
    scheduler::dispatch_work($body);
    synchronizer::check_ready_state;
  }

# Registers the handler of each message code defined in this package.
sub init()
  {
    # Flat list of (message code, handler) pairs, in registration order
    my @registrations =
      (
        $taktuk::arguments,    \&arguments,
        $taktuk::broadcast,    \&broadcast,
        $taktuk::downcast,     \&downcast,
        $taktuk::eof,          \&eof,
        $taktuk::execute,      \&execute,
        $taktuk::forward_up,   \&forward_up,
        $taktuk::input,        \&input,
        $taktuk::message,      \&message,
        $taktuk::output,       \&output,
        $taktuk::ping,         \&ping,
        $taktuk::pong,         \&pong,
        $taktuk::quit,         \&quit,
        $taktuk::ready,        \&ready,
        $taktuk::reduce,       \&reduce,
        $taktuk::numbering,    \&numbering,
        $taktuk::send_to,      \&send_to,
        $taktuk::spread,       \&spread,
        $taktuk::steal,        \&steal,
        $taktuk::synchronize,  \&synchronize,
        $taktuk::taktuk_code,  \&taktuk_code,
        $taktuk::taktuk_perl,  \&taktuk_perl,
        $taktuk::wait_message, \&wait_message,
        $taktuk::work,         \&work,
      );

    while (scalar(@registrations))
      {
        # Explicit scalars so that the call is valid whatever the prototype
        # of register_handler is
        my ($code, $handler) = splice(@registrations, 0, 2);

        handlers::register_handler($code, $handler);
      }
  }

###############################################
### MAIN                                    ###
###############################################

package main;
use strict; use bytes;

# Result of the fork() done in fork_taktuk_interpreter: pid of the forked
# interpreter in the process that reads commands, 0 in the interpreter
# itself. Left undefined as long as no fork has been done, in which case
# handle_message processes messages locally instead of sending them
our $taktuk_interpreter = undef;
# When set, process_commands fetches further commands from STDIN (once) as
# soon as the current arguments are exhausted
our $interactive = 0;
# On the root node, asks for interactive mode even when some commands remain
# in the arguments (not set in this part of the file, presumably by options
# parsing)
our $forced_interactive = 0;
# When set, the root does not send the initial reduce_count message and
# synchronizer::set_not_numbering is not called at startup
our $no_numbering = 0;
# Set by the "quit" command to end the loop of process_commands
our $terminate = 0;

# Translates the next command found in the arguments into a TakTuk message.
# Command names may be abbreviated to any unambiguous prefix.
# Returns a list (message, data):
#   - message is the encoded message to handle, or a false value when there
#     is nothing to send (empty command, end of arguments, error, quit)
#   - data is a filehandle only for the file_input command: its content has
#     to be sent by the caller as bodies of the returned message
# Commands that encapsulate another one (destination set, broadcast,
# downcast, synchronize) call translate recursively and wrap its result.
sub translate()
  {
    my ($message, $data);
    # The command separator is skipped only by the innermost command
    my $skip_separator = 1;
    my $command = arguments::get_next_command;

    # Expansion of abbreviated command names
    if (not $arguments_ended and ($command =~ /^[a-z]+$/))
      {
        my @candidates = grep { $_ =~ m/^$command/ }
                         qw(broadcast close downcast exec file_input input
                            line_input print_tree synchronize taktuk_perl
                            quit);

        $command = $candidates[0] if (scalar(@candidates) == 1);
      }

    # Commands wrapping the message produced by the following command
    my %wrapper_code = ("broadcast"   => $taktuk::broadcast,
                        "downcast"    => $taktuk::downcast,
                        "synchronize" => $taktuk::synchronize);
    # Commands sending their parameters unchanged as message body
    my %parameters_code = ("exec"        => $taktuk::execute,
                           "input"       => $taktuk::input,
                           "taktuk_perl" => $taktuk::taktuk_perl);

    if ($arguments_ended or $command =~ m/^\s*$/o)
      {
        $message = "";
        $skip_separator = 0 if $arguments_ended;
      }
    elsif ($command =~ /^[0-9]/)
      {
        # Destination set: the next command is sent to these ranks only
        my @send_set = taktuk::decode_set($command);

        if (not scalar(@send_set))
          {
            diagnostic::error("Invalid set specification : $command");
            $message = "";
          }
        else
          {
            $skip_separator = 0;
            ($message, $data) = translate();
            $message = taktuk::encode($taktuk::send_to,
                           taktuk::pack(taktuk::encode_set(@send_set)).
                           $message) if $message;
          }
      }
    elsif (exists($wrapper_code{$command}))
      {
        $skip_separator = 0;
        ($message, $data) = translate();
        $message = taktuk::encode($wrapper_code{$command}, $message)
            if $message;
      }
    elsif (exists($parameters_code{$command}))
      {
        my $parameters = arguments::get_parameters;

        $message = taktuk::encode($parameters_code{$command}, $parameters);
      }
    elsif ($command eq "line_input")
      {
        # Same as input with a newline appended
        my $parameters = arguments::get_parameters;

        $message = taktuk::encode($taktuk::input, $parameters."\n");
      }
    elsif ($command eq "file_input")
      {
        my $parameters = arguments::get_parameters;

        # The message stays undefined if the file cannot be opened
        $data = general::open_file($parameters);
        $message = $taktuk::input if $data;
      }
    elsif ($command eq "close")
      {
        $message = $taktuk::eof;
      }
    elsif ($command eq "print_tree")
      {
        $message = taktuk::encode($taktuk::reduce, $taktuk::reduce_tree);
      }
    elsif ($command eq "quit")
      {
        # Nothing to send, the caller stops reading commands
        $terminate = 1;
        $message = "";
      }
    else
      {
        diagnostic::error("Unknown command : $command");
        $message = "";
      }

    arguments::skip_command_separator if $skip_separator;
    return ($message, $data);
  }

# Forks the process in two. The parent initializes the terminal, processes
# the commands, waits for its child and exits with status 0. The child
# simply returns to the caller. Exits with status 1 if the fork fails.
sub fork_taktuk_interpreter()
  {
    my $pid = fork();

    # Recorded in both processes: handle_message tests its definedness
    $taktuk_interpreter = $pid;
    if (not defined($pid))
      {
        diagnostic::system;
        # The forked interpreter is mandatory in the root node, there is no
        # way to go on without it
        diagnostic::error("FATAL : cannot continue without forked interpreter");
        exit 1;
      }
    if ($pid)
      {
        # Parent side: commands reader, never returns
        arguments::initialize_terminal;
        process_commands();
        waitpid $pid,0;
        exit 0;
      }
  }

# Handles an encoded message using the control connector: the message is
# processed in this process when no interpreter has been forked, otherwise
# it is sent through the connector.
sub handle_message($)
  {
    my ($message) = @_;
    my $control = communicator::get_control;

    if (not defined($taktuk_interpreter))
      {
        communicator::process_message($control, $message);
      }
    else
      {
        $control->send_message($message);
      }
  }

# Main loop of the commands reader: translates the commands found in the
# arguments one after the other and handles the resulting messages until
# the arguments are exhausted or the quit command is met.
# On the root node, a spread reduce_count message is handled first (unless
# numbering is disabled) and, once the loop is over, a synchronized
# broadcast of eof followed by a synchronized spread of quit.
sub process_commands()
  {
    my ($message, $data);

    if (($general::root) and !$no_numbering)
      {
        handle_message(taktuk::encode($taktuk::spread,
                       taktuk::encode($taktuk::reduce, $taktuk::reduce_count)));
      }

    until ($arguments::arguments_ended or $terminate)
      {
        ($message, $data) = translate();
        if ($message and $data)
          {
            # The message comes with a filehandle: its content is sent as
            # a sequence of messages, one for each chunk read
            my ($buffer, $count);

            # Chunks are half the size of $taktuk::read_size to minimize
            # the chances of having fragmented taktuk packets
            while ($count = sysread($data, $buffer, $taktuk::read_size/2))
              {
                my $partial_message = taktuk::encode($message, $buffer);

                diagnostic::debug("Message to send : $partial_message");
                handle_message($partial_message);
              }
            # undef means read error, 0 means end of file
            diagnostic::system if not defined($count);
            CORE::close($data) if ($data != \*STDIN);
          }
        elsif ($message)
          {
            diagnostic::debug("Message to send : $message");
            handle_message($message);
          }

        # Command line exhausted in interactive mode: go on with STDIN
        if ($arguments::arguments_ended and $interactive)
          {
            arguments::fetch_arguments(\*STDIN);
            $interactive = 0;
          }
      }

    if ($general::root)
      {
        handle_message(taktuk::encode($taktuk::synchronize,
                           taktuk::encode($taktuk::broadcast, $taktuk::eof)));
        handle_message(taktuk::encode($taktuk::synchronize,
                           taktuk::encode($taktuk::spread,$taktuk::quit)));
      }
  }

# Startup sequence of the program. The order of the following calls matters,
# see the comments attached to each of them.
taktuk::no_flush(\*STDOUT);
# Has to be called first because most other initializations copy values
# from the settings initialized here
general::init;
arguments::init;
communicator::init;
handlers::init;
# Must come after handlers::init as it replaces some handlers
# Must come before the ready state check (of course)
# This call synchronizes all broadcast-like operations with the deployment
synchronizer::set_not_ready;
arguments::fetch_arguments(@ARGV);
arguments::parse_options;
# This call synchronizes all executions (even local ones) with the logical
# nodes numbering (has to be done after options parsing, which may disable
# numbering)
synchronizer::set_not_numbering unless $no_numbering;

if ($general::root)
  {
    # The root has no father to wait for
    $synchronizer::father_is_ready = 1;
    general::load_taktuk_file if $general::self_propagate;
    if ($arguments::arguments_ended or $forced_interactive)
      {
        diagnostic::debug("Interactive mode");
        if ($arguments::arguments_ended)
          {
            # No command on the command line: read them from STDIN now
            arguments::fetch_arguments(\*STDIN);
          }
        else
          {
            # STDIN will be read once the command line is exhausted
            $interactive = 1;
          }
      }
    # The interpreter is forked in all cases, so that a file_input from
    # STDIN does not block the TakTuk engine.
    # Other solutions, such as an asynchronous file load using the
    # communicator, are too complicated because of messages encapsulation
    # (broadcast, sendto, ...).
    # This is not that bad because it only concerns the root node
    # (file_input from STDIN in a non root node makes no sense)
    fork_taktuk_interpreter;
  }
else
  {
    # Non root node: output the connector init string followed by the
    # version
    general::print($connector::init_string.$taktuk::VERSION."\n");
  }

# Has to be called once after arguments parsing and 'father_is_ready'
# setting, for the case in which there are no connectors to deploy.
# Other checks are made upon completion of connectors initialization or
# upon connection errors
synchronizer::check_ready_state;

# Useful especially for stealing if I've nothing to do
scheduler::schedule;
communicator::run;
diagnostic::debug("End of the taktuk code");
# WARNING : avoid putting anything below the following line if you want the
# autoload feature to run smoothly
__END__
