#!/usr/bin/perl
$ENV{PERL_RL}=" o=0";
package settings;
use constant FD => 0;
use constant SOCKET => 1;
package arguments;
sub get_config();
sub initialize_terminal();
sub fetch_arguments(@);
sub restore_last_argument();
sub get_next_command();
sub get_next_argument();
sub read_buffer_data($);
sub delete_buffer($);
sub get_data_buffer();
sub is_first_argument($);
sub get_next_stuff($);
sub get_parameters();
sub skip_command_separator();
sub terminate_arguments_parsing();
sub register_option($$$$@);
sub args_file($);
sub machines_file($);
sub machine($);
sub localhost($);
sub not_root();
sub print_defaults();
sub print_package($);
sub print_version();
sub init();
sub parse_options();
sub check_separators_integrity();
sub set_option($$$);
package command;
sub new (%);
sub run ();
sub cleanup ();
sub my_shutdown($$);
sub close($);
sub output($$);
package communicator;
sub init();
sub get_connections($);
sub get_root();
sub get_control();
sub get_outgoing();
sub process_messages($$);
sub process_message($$);
sub process_command_output($$);
sub run ();
sub terminate ();
sub add_descriptors($);
sub remove_descriptor($$);
sub no_pending_connectors();
sub add_connector($$);
sub connector_initialized($);
sub remove_from_set($$);
sub remove_connector($);
sub add_local_command($);
sub remove_local_command($);
package connector;
sub new (%);
sub read_data ($);
sub get_message ();
sub send_message($);
sub pack();
sub unpack($);
package diagnostic;
sub print_info($$$);
sub system ();
sub debug ($);
sub error ($);
sub warning ($);
package general;
sub init();
sub open_file($);
sub load_file($);
sub load_taktuk_file();
sub load_taktuk_package($);
sub print_help();
sub add_peer(@);
sub print($);
sub print_tree($$);
package handlers;
sub register_handler($$);
sub replace_handler($$);
sub get_handler($);
sub handler_blocked($$$);
sub arguments($$$);
sub broadcast($$$);
sub eof($$$);
sub execute($$$);
sub forward_up($$$);
sub input($$$);
sub message($$$);
sub numbering($$$);
sub output($$$);
sub ping($$$);
sub pong($$$);
sub quit($$$);
sub ready($$$);
sub recv_timeout($);
sub reduce_count($$$);
sub reduce_tree($$$);
sub reduce($$$);
sub check_ongoing_reduces();
sub send_to($$$);
sub spread($$$);
sub steal($$$);
sub synchronize($$$);
sub taktuk_code($$$);
sub taktuk_perl($$$);
sub wait_message($$$);
sub work($$$);
sub init();
package main;
sub translate();
sub fork_taktuk_interpreter();
sub handle_message($);
sub process_commands();
package my_select;
sub new();
sub add($$);
sub remove($$);
sub select($$$$);
sub handles();
package taktuk;
sub no_flush($);
sub unpack($);
sub pack($);
sub decode($);
sub encode($$);
sub syswrite($$);
sub read_data($);
sub get_message($);
sub find_sequence($$);
sub flush_buffer($);
sub error_msg($);
sub send(%);
sub recv(%);
sub decode_set($);
sub encode_set(@);
# Package-level settings of the taktuk package.
# $VERSION encodes version and release in one integer: print_version()
# reports int(VERSION/1000)/10 as the version and VERSION%1000 as the release.
our $VERSION=30195;
# Number of bytes requested per sysread call / written per syswrite call.
our $read_size = 32768;
our $write_size = 32768;
our $error = undef;
# One-character codes. Most names mirror the subs declared in package
# handlers, so these are presumably message type tags -- TODO confirm
# against the message dispatcher (not visible here).
our $eof="D";
our $taktuk_perl="E";
our $timeout="O";
our $pong="P";
our $reduce="R";
our $spread="S";
our $taktuk_code="T";
our $wait_message="W";
our $arguments="a";
our $broadcast="b";
our $downcast="d";
our $execute="e";
our $input="i";
our $message="m";
our $numbering="n";
our $output="o";
our $ping="p";
our $quit="q";
our $ready="r";
our $steal="s";
our $send_to="t";
our $forward_up="u";
our $work="w";
our $synchronize="x";
# NOTE(review): $reduce_tree has the same value ('t') as $send_to above;
# presumably these two live in a separate namespace (reduce sub-types) --
# confirm before relying on uniqueness.
our $reduce_count = 'c';
our $reduce_tree = 't';
package scheduler;
sub deploy_connector($);
sub add_connector($);
sub connector_initialized($);
sub connector_failed($);
sub schedule();
sub is_idle();
sub send_work();
sub dispatch_work($);
sub theft_handler($$);
package synchronizer;
sub check_ready_state();
sub set_not_ready();
sub initialization_complete($);
sub initialization_failed($);
sub initialization_timeout($);
sub set_not_numbering();
sub block_until_event($@);
sub dispatch_event($);
sub add_pending_message($$$);
package timer;
sub current_time();
sub register($$);
sub check_timeouts();
sub unregister();
sub gettime($);
sub print($);
package general;
use strict; use bytes;
use File::Basename;
use Sys::Hostname;
our $connector_command;
our $login_name;
our $host;
our $root=1;
our $taktuk_command;
our $self_propagate;
our $taktuk_code = undef;
our $taktuk_package = undef;
our $template;
our $redirect;
our $rank = -1;
our $count = -1;
our $child_min = -1;
our $child_max = -1;
# Records the local host name (as returned by Sys::Hostname::hostname) in
# $general::host and exports it to child processes as TAKTUK_HOSTNAME.
sub init()
{
    my $name = hostname;
    chomp($name);
    $host = $name;
    $ENV{TAKTUK_HOSTNAME} = $host;
}
# Opens a file for reading and returns its handle.
#   $filename : path to open; an empty value or "-" selects STDIN.
# Returns the handle, or undef after reporting an error when the file cannot
# be opened or when STDIN is requested in interactive mode or on a non root
# node.
sub open_file($)
{
    my $path = shift;
    my $handle = undef;

    if (not $path or ($path eq "-"))
    {
        # STDIN was requested: only allowed on a non interactive root node
        if ($main::interactive)
        {
            diagnostic::error("Cannot load STDIN in interactive mode");
        }
        elsif (not $general::root)
        {
            diagnostic::error("Cannot load STDIN on a non root node");
        }
        else
        {
            $handle = \*STDIN;
        }
        return $handle;
    }

    if (not open $handle, "<", $path)
    {
        diagnostic::error("Trying to open $path");
        diagnostic::system();
        $handle = undef;
    }
    return $handle;
}
# Reads the whole content of a file and returns it as a string.
#   $filename : path to read; an empty value or "-" selects STDIN.
# Returns "" when the source cannot be opened (the error is reported by
# open_file). A read error is reported through diagnostic::system and ends
# the loading; data read so far is returned.
sub load_file($)
{
    my $filename = shift;
    my $content = "";
    my $chunk;

    # Same opening rules (and same diagnostics) as every other file loader
    my $fd = open_file($filename);
    return "" if not defined($fd);

    while (1)
    {
        my $result = sysread($fd, $chunk, $taktuk::read_size);
        if (not defined($result))
        {
            # Explicitly stop on error rather than comparing undef to 0
            diagnostic::system();
            last;
        }
        last if not $result;
        $content .= $chunk;
    }
    # STDIN is left open: it may still be needed by the caller
    if ($fd != \*STDIN)
    {
        close $fd or diagnostic::system();
    }
    return $content;
}
# Loads the source code of the running script into $taktuk_code.
# The script is looked up as $0 and, when $0 is not a readable path, through
# the external `which` command. Returns "" after reporting an error when the
# script cannot be located or opened.
sub load_taktuk_file()
{
    my $filename = $0;
    my $fd = undef;
    my $new_data;

    if (not -r $filename)
    {
        $filename = `which $0`;
        $filename = "" if not defined($filename);
        # `which` terminates its answer with a newline; three-argument open
        # takes the name literally, so the newline has to be removed
        chomp($filename);
    }
    if (not $filename)
    {
        diagnostic::error("Cannot find own taktuk executable");
        return "";
    }
    if (not open $fd, "<", $filename)
    {
        diagnostic::error("Trying to open $filename");
        diagnostic::system();
        return "";
    }

    $taktuk_code = "";
    while (1)
    {
        my $result = sysread($fd, $new_data, $taktuk::read_size);
        if (not defined($result))
        {
            diagnostic::system();
            last;
        }
        last if not $result;
        $taktuk_code .= $new_data;
    }
    close $fd or diagnostic::system();
}
# Extracts from $taktuk_code the text of every "package NAME;" section whose
# name equals $package_name and stores it, followed by "1;\n", in
# $taktuk_package. Does nothing when $taktuk_package is already defined.
# The script code is loaded first when it is not available yet.
sub load_taktuk_package($)
{
    my $package_name = shift;

    if (not defined($taktuk_package))
    {
        if (not defined($taktuk_code))
        {
            diagnostic::warning("Taktuk code should have been loaded...")
                if ($self_propagate);
            load_taktuk_file();
        }

        my $selected = "";
        my $copying = 0;      # true while inside a section of the wanted package
        my $chunk_start = 0;  # offset of the last package statement seen
        # The lookahead matches with zero length, so pos() is the offset of the
        # "package" keyword itself
        while ($taktuk_code =~ /(?=package\s+([^;\s]*)\s*;\s*$)/gmi)
        {
            my $found_name = $1;
            my $chunk_end = pos($taktuk_code);
            $selected .= substr($taktuk_code, $chunk_start,
                                $chunk_end - $chunk_start) if $copying;
            $copying = ($found_name eq $package_name) ? 1 : 0;
            $chunk_start = $chunk_end;
        }
        $selected .= substr($taktuk_code, $chunk_start) if $copying;
        $taktuk_package = $selected."1;\n";
    }
}
# Sends the usage text below to the root connector on the 'info' stream and
# then sets $main::terminate.
sub print_help()
{
communicator::get_root->output('info', <<END_HELP);
Usage :
taktuk [ options ] [ commands ]
If one or more commands are given, TakTuk will execute them sequentially.
Otherwise, TakTuk enter interactive mode, waiting for commands.
Commands are (braces can be replaced by any delimiter):
set command : sends the command to all the peers of the set
(see TakTuk manual for set specification syntax)
broadcast command : broadcasts TakTuk command to all the remote peers
not including the node initiating the broadcast
close : closes stdin of all commands being executed locally
downcast command : spreads TakTuk command to all the children of the
node initiating the command (not including itself)
exec [command] : executes the given shell command on the local node
file_input [filename] : sends the content of a file as input to all commands
being executed locally (might also be broadcasted).
Reads file from STDIN if filename is '-' or void
input [data] : sends input to all commands being executed locally
line_input [data] : same as input but adds a newline at the end of data
print_tree : prints the TakTuk deployment tree
synchronize command : forces the following command to wait for deployment,
numbering and preceding commands before executing
taktuk_perl [args] : forks a perl interpreter on the local node aware of
the taktuk package and communication routines (see
manual for a description of the taktuk package which
contains point-to-point communication routines).
Similar in principle to 'exec perl args'.
WARNING: due to limited parser, you have to give
args (even if empty) and to use '--' to explicitely
terminate switches if any.
quit : quit TakTuk and shutdown its logical network
General options may contain :
-c, --connector command : defines the connector commands used for following
remote machines.
-d, --dynamic limit : turns dynamic mode on (work stealing) for all the
following remote machines specifications.
Uses limit as a maximal arity (0 = no limit).
A negative value for limit turns dynamic mode off.
Warning, currently it is a bad idea to use several
-d options on the same command line.
-f, --machines-file name : name is the name of a file that contains remote
machines name (equivalent to several -m opions)
-g, --time-granularity n : sets to n the maximal interval between timeouts
checks (usually checks are made more often).
-h, --help : prints this help.
-i, --interactive : forces interactive mode afer command line parsing.
-l, --login name : sets the login name for the following hosts.
-m, --machine name -[ -] : name is the name of a remote host to be deployed
with its optional arguments between -[ and -].
-n, --no-numbering : disable taktuk nodes numbering, speed up deployment
-o, --output-template : set an output template specification for one of the
name=specification output streams (see man for details about
templates). When giving only a name (without
specification) this disables the stream.
-s, --self-propagate : propagate the TakTuk executable through connectors
(eliminates the need for a TakTuk installation on
remote machines).
-t, --timeout time : sets the timeout for connectors (0 = no timeout).
-v, --version : prints TakTuk version and exits
-w, --window number : sets initial window size to number (pipeline width).
-C, --command-separator : changes the set of characters considered as command
separators set separators.
-F, --args-file name : name is the name of a file that contains options.
-L, --localhost name : sets the name of localhost as viewed by TakTuk.
-O, --option-separator : changes the set of characters considered as option
separators set separators.
-P, --print-defaults : prints default settings
-R, --output-redirect : sets an output redirection for one of the output
name=number streams (see man for details about output streams)
-T, --taktuk-command com : com is the name of the TakTuk command that should
be used for the following hosts.
-W, --static-window : turns off the dynamic adjustment of window size
(no effect, dynamic window mode not implemented).
Environment variables defined by TakTuk for commands execution (see man for
details about variables that change TakTuk settings):
TAKTUK_CONTROL : file descriptor used by taktuk module
(usually not useful for most users)
TAKTUK_COUNT : total number of successfully deployed nodes
TAKTUK_HOSTNAME : hostname of the local node as given to TakTuk
TAKTUK_PIDS : pids list of commands executed by the local TakTuk
TAKTUK_RANK : logical number of local node (in [1..TAKTUK_COUNT])
END_HELP
# Printing the help requests termination of the program
$main::terminate = 1;
}
# Creates a connector toward a remote peer and hands it to the scheduler.
#   first argument      : name of the peer
#   remaining arguments : peer specific arguments, appended (packed) to the
#                         message arguments of the current configuration
# The remote command either launches $taktuk_command or, in self propagation
# mode, a perl interpreter reading its code from its standard input.
sub add_peer(@)
{
    my ($peer_name, @peer_options) = @_;
    my ($command_args, $message_args) = arguments::get_config();

    my $launcher;
    if ($self_propagate)
    {
        $launcher = "echo $connector::connector_functional_string\\;".
                    "exec perl -- - -r ";
    }
    else
    {
        $launcher = "exec $taktuk_command -r ";
    }
    my $actual_command = $launcher.$command_args." "."-L $peer_name ";

    foreach my $option (@peer_options)
    {
        $message_args .= taktuk::pack($option);
    }

    my $connection_command = $connector_command;
    $connection_command .= " -l $login_name" if defined($login_name);

    my $new_connector = connector::new(command   => $connection_command,
                                       peer      => $peer_name,
                                       taktuk    => $actual_command,
                                       arguments => $message_args);
    diagnostic::debug("New external connector command : $new_connector->{line}");
    scheduler::add_connector($new_connector);
}
# Writes a message to STDOUT through taktuk::syswrite; a failed write is
# reported with diagnostic::system.
sub print($)
{
    my ($text) = @_;
    my $status = taktuk::syswrite(\*STDOUT, $text);
    $status = diagnostic::system() if not $status;
    return $status;
}
# Recursively prints a tree on the 'info' stream of the root connector.
#   $prefix : string printed before the label of the node
#   $data   : array reference; its first element is the node label (removed
#             from the array by this call), the others are subtrees
# Each nesting level adds one space to the prefix.
sub print_tree($$)
{
    my ($indent, $node) = @_;
    my $label = shift @$node;
    communicator::get_root()->output('info', $indent.$label."\n");
    foreach my $child (@$node)
    {
        print_tree($indent." ", $child);
    }
}
package my_select;
use strict; use bytes;
# Builds an empty descriptor set: three empty bit vectors (rin, win, ein) and
# an empty list of handles, blessed into the current package.
sub new ()
{
    my %state = (rin => '', win => '', ein => '', handles => []);
    return bless(\%state);
}
# Adds a handle to the set: its file number is marked in the three bit
# vectors and the handle is appended to the handles list. A warning is
# issued when the descriptor is already marked in the read vector.
sub add ($$)
{
    my ($self, $handle) = @_;
    my $fd = fileno($handle);

    if (vec($self->{rin}, $fd, 1) == 0)
    {
        foreach my $mask (qw(rin win ein))
        {
            vec($self->{$mask}, $fd, 1) = 1;
        }
        push @{$self->{handles}}, $handle;
    }
    else
    {
        diagnostic::warning("Descriptor already in set");
    }
}
# Removes a handle from the set.
#   $self   : the descriptor set
#   $handle : the handle to remove
# Returns 1 when the handle was found in the handles list and removed, 0 when
# its bit was set but the handle was not in the list.
# NOTE(review): when the descriptor bit is not set, there is no explicit
# return: the value is whatever diagnostic::warning returns.
sub remove ($$)
{
my $self = shift;
my $handle = shift;
my $i=0;
my $handles_list = $self->{handles};
if (vec($self->{rin}, fileno($handle), 1) == 1)
{
# Clear the descriptor in the three bit vectors
vec($self->{rin}, fileno($handle), 1) = 0;
vec($self->{win}, fileno($handle), 1) = 0;
vec($self->{ein}, fileno($handle), 1) = 0;
# Linear search of the handle (compared by reference address)
while (($i <= $#$handles_list) and ($handles_list->[$i] != $handle))
{
$i++;
}
if ($i <= $#$handles_list)
{
splice @$handles_list, $i, 1;
return 1;
}
else
{
diagnostic::warning("Didn't found descriptor to remove");
return 0;
}
}
else
{
diagnostic::warning("Descriptor not in set");
}
}
# Wrapper around the four-argument CORE::select working on descriptor sets.
#   arguments : read set, write set, exception set (each may be undef) and
#               the timeout passed as is to CORE::select
# Returns an empty list when CORE::select reports -1, three empty array
# references when nothing is ready, otherwise three array references listing
# the ready handles of the read, write and exception sets.
sub select ($$$$)
{
    my ($read_select, $write_select, $except_select, $timeout) = @_;
    my $rin = defined($read_select)   ? $read_select->{rin}   : undef;
    my $win = defined($write_select)  ? $write_select->{win}  : undef;
    my $ein = defined($except_select) ? $except_select->{ein} : undef;
    my ($rout, $wout, $eout);

    # The vectors are copied because CORE::select modifies its arguments
    my ($nfound, $timeleft) =
        CORE::select($rout=$rin, $wout=$win, $eout=$ein, $timeout);

    return () if ($nfound == -1);
    return ([],[],[]) if ($nfound == 0);

    my ($read_set, $write_set, $except_set) = ([], [], []);
    foreach my $handle (@{$read_select->{handles}})
    {
        push(@$read_set, $handle) if (vec($rout,fileno($handle),1) == 1);
    }
    foreach my $handle (@{$write_select->{handles}})
    {
        push(@$write_set, $handle) if (vec($wout,fileno($handle),1) == 1);
    }
    foreach my $handle (@{$except_select->{handles}})
    {
        push(@$except_set, $handle) if (vec($eout,fileno($handle),1) == 1);
    }
    return ($read_set, $write_set, $except_set);
}
# Returns the list of handles currently stored in the set given as first
# argument.
sub handles ()
{
    my ($self) = @_;
    my $list = $self->{handles};
    return @$list;
}
package arguments;
use strict; use bytes;
our $has_readline = eval("use Term::ReadLine;1")?1:0;
our $terminal = undef;
our $command_separator;
our $option_separator;
use constant COMMAND_LINE => 0;
use constant MESSAGE => 1;
our %handler;
our %long_name;
our %short_name;
our %type;
our %transmission_mode;
our %current_value;
our @current_config;
our $current_config_outdated = 1;
our $arguments_ended = 1;
our $again=0;
our @arguments = ();
our $current_argument = undef;
our $current_stuff = undef;
our $current_separators = undef;
our $options_ended=0;
# Returns the current configuration as a two element list:
#   - a string of command line switches (options transmitted in
#     COMMAND_LINE mode),
#   - a string of packed arguments (options transmitted in MESSAGE mode).
# Only options present in %current_value and having a transmission mode are
# included. The result is cached in @current_config and rebuilt only when
# $current_config_outdated is set.
sub get_config()
{
    if ($current_config_outdated)
    {
        my $cmdline = "";
        my $packed = "";
        foreach my $option (keys(%current_value))
        {
            next if not exists($transmission_mode{$option});
            my $mode = $transmission_mode{$option};
            my $value = $current_value{$option};
            my $is_hash = UNIVERSAL::isa($value, 'HASH');

            if ($mode == COMMAND_LINE)
            {
                if ($is_hash)
                {
                    # One switch per field: -<option><field>[=<value>]
                    foreach my $field (keys(%$value))
                    {
                        $cmdline .= " -$option$field";
                        $cmdline .= "=$value->{$field}"
                            if defined($value->{$field});
                    }
                }
                else
                {
                    $cmdline .= " -$option";
                    $cmdline .= "$value" if ($type{$option});
                }
            }
            elsif ($mode == MESSAGE)
            {
                if ($is_hash)
                {
                    foreach my $field (keys(%$value))
                    {
                        my $setting = defined($value->{$field})
                            ? "$field"."=$value->{$field}"
                            : "$field";
                        $packed .= taktuk::pack("-$option");
                        $packed .= taktuk::pack($setting);
                    }
                }
                else
                {
                    $packed .= taktuk::pack("-$option");
                    $packed .= taktuk::pack($value) if ($type{$option});
                }
            }
            else
            {
                diagnostic::warning("Internal bug in get_config");
            }
        }
        diagnostic::debug("Config : $cmdline | ".$packed."\n");
        @current_config = ($cmdline, $packed);
        $current_config_outdated = 0;
    }
    return @current_config;
}
# Creates the Term::ReadLine terminal (bound to STDIN/STDOUT) used for
# interactive input, only when the module could be loaded ($has_readline).
sub initialize_terminal()
{
    $terminal = Term::ReadLine->new("TakTuk", \*STDIN, \*STDOUT)
        if ($has_readline);
}
# Puts new arguments (strings or file handles) at the front of the arguments
# queue and re-enables arguments and options parsing. Does nothing when
# called without argument.
sub fetch_arguments(@)
{
    if (scalar(@_))
    {
        my @incoming = @_;
        unshift(@arguments, @incoming);
        $arguments_ended = 0;
        $options_ended = 0;
    }
}
# Marks the last fetched argument as not consumed ($again), so that the next
# fetch delivers it again. Only one argument can be restored: a warning is
# issued when an argument is already pending.
sub restore_last_argument()
{
    if (not $again)
    {
        $again = 1;
    }
    else
    {
        diagnostic::warning("Cannot restore an ungeted argument");
    }
}
# Fetches the next command name from the arguments stream and returns it.
# The remaining part of the fetched token, if any, is left in $current_stuff
# and $again is set so that the next fetch delivers it.
sub get_next_command()
{
my $command;
get_next_stuff($option_separator);
# If the separators that ended the token include a command separator, keep
# it at the end of the token (it is looked for below)
if ($current_separators =~ m/([$command_separator])/)
{
$current_stuff .= $1;
$current_separators = "";
}
# Case 1: the token contains option separators, the command is what precedes
# the first run of them
if ($current_stuff =~ m/^([^$option_separator]+)[$option_separator]+(.*)$/)
{
$command = $1;
$current_stuff = $2;
$again = 1;
}
# Case 2: the command is directly followed by a command separator, which is
# kept in the remaining part
elsif ($current_stuff =~
m/^([^$command_separator]+)([$command_separator].*)$/)
{
$command = $1;
$current_stuff = $2;
$again = 1;
}
# Case 3: the whole token is the command
else
{
$command = $current_stuff;
$current_stuff = "";
}
diagnostic::debug("Command found : [$command]");
return $command;
}
# Fetches the next token delimited by option separators, stores it in
# $current_argument and returns it.
sub get_next_argument()
{
    get_next_stuff($option_separator);
    my $found = $current_stuff;
    $current_argument = $found;
    diagnostic::debug("Argument found : [$found]");
    return $current_stuff;
}
our %buffer;
our %end;
# Reads more data from a file handle into its entry of %buffer.
#   $argument : the handle to read from
# When a readline terminal exists and the handle is STDIN, one line is read
# through the terminal (a newline is appended to it); otherwise sysread is
# used. The match position of the buffer is preserved across the append.
# When nothing could be read, the handle is marked as ended in %end and is
# closed unless it is STDIN. Returns the number of bytes obtained.
sub read_buffer_data($)
{
    my $source = shift;
    my ($chunk, $count);

    if (defined($terminal) and ($source == \*STDIN))
    {
        $chunk = $terminal->readline("");
        if (defined($chunk))
        {
            $chunk .= "\n";
            $count = length($chunk);
        }
        else
        {
            $count = 0;
        }
    }
    else
    {
        $count = sysread($source, $chunk, $taktuk::read_size);
    }

    my $saved_pos = pos($buffer{$source});
    diagnostic::system() if not defined($count);
    if ($count)
    {
        # Appending resets pos(), so it is restored afterwards
        $buffer{$source} .= $chunk;
        pos($buffer{$source}) = $saved_pos;
    }
    else
    {
        $end{$source} = 1;
        CORE::close $source if ($source != \*STDIN);
    }
    return $count;
}
# Forgets the buffer and the end marker associated to a handle. A warning is
# issued when the handle was not marked as ended.
sub delete_buffer($)
{
    my $source = shift;
    if (not exists($end{$source}))
    {
        diagnostic::warning("Bug");
    }
    delete $buffer{$source};
    delete $end{$source};
}
# Returns a reference to the data that should be parsed next, or undef when
# the arguments queue is exhausted.
# For a plain argument, this is a reference to the first element of
# @arguments. For a file handle, this is a reference to its buffer, filled by
# a read when empty; ended handles are removed from the queue.
sub get_data_buffer()
{
    my $data = undef;

    while (not defined($data) and scalar(@arguments))
    {
        my $head = $arguments[0];
        if (not UNIVERSAL::isa($head,'GLOB'))
        {
            $data = \$arguments[0];
        }
        elsif ($end{$head})
        {
            # Nothing more to expect from this handle, go to next argument
            delete_buffer($head);
            shift @arguments;
        }
        else
        {
            $buffer{$head} = "" if not exists($buffer{$head});
            read_buffer_data($head) if not length($buffer{$head});
            $data = \$buffer{$head} if (length($buffer{$head}));
        }
    }
    return $data;
}
# Tells whether a reference designates the first element of @arguments.
# Returns 1 if so, 0 otherwise (including when @arguments is empty).
sub is_first_argument($)
{
    my $candidate = shift;
    # The existence test avoids creating the element when taking its reference
    return 0 if not exists($arguments[0]);
    return ($candidate == \$arguments[0]) ? 1 : 0;
}
# Fetches the next token into $current_stuff.
#   $separator : characters (inserted in a regexp character class) that
#                delimit tokens in data read from file handles
# The separators that ended the token are stored in $current_separators.
# Sets $arguments_ended when the resulting token is empty.
sub get_next_stuff($)
{
my $separator = shift;
my $done = 0;
my $data = 1;
if ($again)
{
# A token has been restored: deliver $current_stuff again, unchanged
$again = 0;
}
else
{
$current_stuff = "";
$current_separators = "";
while (not $done and defined($data))
{
$data = get_data_buffer;
if (defined($data))
{
if (is_first_argument($data))
{
# Plain argument of the queue: taken as a whole, without splitting
$current_stuff = $$data;
shift @arguments;
$done = 1;
}
else
{
# Buffer of a file handle: cut at the first run of separators.
# The /g match in scalar context sets pos() just after the separators
if ($$data =~ m/([$separator]+)/g)
{
$current_separators = $1;
$current_stuff .= substr $$data, 0,
pos($$data)-length($current_separators);
$$data = substr $$data, pos($$data);
# Leading separators give an empty token: keep looking in that case
if (length($current_stuff))
{
$done = 1;
}
}
else
{
# No separator yet: consume the whole buffer and continue with more data
$current_stuff .= $$data;
$$data = "";
}
}
}
}
$arguments_ended = 1 if not length($current_stuff);
}
}
# Reads delimited parameters from the arguments stream and returns the text
# found between the delimiters.
# The first character read is the opening delimiter. If it is one of ( { [
# the closing delimiter is the matching ) } ] and nested pairs are counted;
# otherwise the same character closes the parameters.
# A warning is issued and $arguments_ended is set when the stream ends
# before the delimiters are balanced.
sub get_parameters()
{
my $left_brace = undef;
my $right_brace;
my $balance=1;
my $parameters = "";
my $data=1;
# Something left in the current token is unexpected here: nothing is read
if (length($current_stuff))
{
diagnostic::warning("Warning invalid [$current_stuff] before ".
"parameters");
$balance = 0;
}
while ($balance and defined($data))
{
$data = get_data_buffer;
if (defined($data))
{
# Successive plain arguments are joined with a space
if (defined($left_brace) and is_first_argument($data))
{
$$data = " ".$$data;
}
if (not defined($left_brace))
{
# Take the opening delimiter off the data
$$data =~ s/^(.)//;
$left_brace = $1;
if ($left_brace =~ m/[({[]/)
{
$right_brace = $left_brace;
$right_brace =~ tr/({[/)}]/;
}
else
{
$right_brace = "";
}
}
# Scan delimiters until balanced; pos() is left after the last one matched
while ($balance and
($$data =~ m/.*?([$right_brace$left_brace])/g))
{
my $brace = $1;
if ($right_brace and ($brace =~ m/[$left_brace]/))
{
$balance++;
}
else
{
$balance--;
}
}
if (not $balance)
{
# Balanced: keep what precedes the closing delimiter, leave what follows
$parameters .= substr $$data, 0, pos($$data)-1;
$$data = substr $$data, pos($$data);
}
else
{
# Not balanced yet: everything belongs to the parameters
$parameters .= $$data;
shift @arguments if is_first_argument($data);
$$data = "";
}
}
}
if ($balance)
{
$arguments_ended = 1;
diagnostic::warning("Bad $left_brace$right_brace balance");
}
diagnostic::debug("Parameters found : [$parameters]");
return $parameters;
}
# Consumes the command separator (possibly surrounded by option separators)
# expected at the current position of the arguments stream.
# Data made only of option separators is skipped. A warning is issued when
# something else than a command separator is found. Sets $arguments_ended
# when the stream is exhausted.
sub skip_command_separator()
{
    my $done = 0;
    my $data;

    # A restored token takes precedence over the arguments queue
    if ($again)
    {
        $data = \$current_stuff;
    }
    else
    {
        $data = get_data_buffer();
    }
    while (not $done and not $arguments_ended)
    {
        if (defined($data))
        {
            if ($$data =~ s/^[$option_separator]*[$command_separator]+[$option_separator]*//)
            {
                $done = 1;
                shift @arguments if (not length($$data) and
                                     is_first_argument($data));
            }
            else
            {
                if ($$data =~ m/^[$option_separator]*$/)
                {
                    $$data = "";
                    # is_first_argument checks that the element exists before
                    # taking its reference: a bare \$arguments[0] would create
                    # an undefined element in an empty queue
                    shift @arguments if is_first_argument($data);
                    $data = get_data_buffer();
                }
                else
                {
                    diagnostic::warning("Missing command separator");
                    $done = 1;
                }
            }
        }
        else
        {
            $arguments_ended = 1;
        }
    }
    $again = 0 if not length($current_stuff);
}
# Drops every pending argument, closing those that are file handles, and
# marks the arguments as ended.
sub terminate_arguments_parsing()
{
    # splice without bounds empties the queue and returns its former content
    foreach my $pending (splice(@arguments))
    {
        close $pending if UNIVERSAL::isa($pending,'GLOB');
    }
    $arguments_ended = 1;
}
# Declares an option.
#   $short       : one letter name
#   $long        : long name
#   $opt_type    : type string of the argument ("" for options without one)
#   $opt_handler : code reference called with the value, or reference to the
#                  variable receiving the value
#   optional     : default value (immediately given to the handler), then
#                  transmission mode of the option
sub register_option($$$$@)
{
    my ($short, $long, $opt_type, $opt_handler, @extra) = @_;

    $long_name{$short} = $long;
    $short_name{$long} = $short;
    $handler{$short} = $opt_handler;
    $type{$short} = $opt_type;

    if (scalar(@extra))
    {
        my $default = shift @extra;
        if (UNIVERSAL::isa($opt_handler, 'CODE'))
        {
            &$opt_handler($default);
        }
        else
        {
            $$opt_handler = $default;
        }
        if (scalar(@extra))
        {
            $transmission_mode{$short} = shift @extra;
        }
    }
}
# Handler of the args-file option: queues the content of a file as
# arguments.
#   $file : name of the file (opened through general::open_file)
# When the file cannot be opened an error is reported and nothing is queued
# (queuing the undefined handle would insert a bogus empty argument and
# re-enable options parsing).
sub args_file($)
{
    my $file = shift;
    my $file_handle = general::open_file($file);

    if (not defined($file_handle))
    {
        diagnostic::error("Can't open [$file]");
        return;
    }
    fetch_arguments($file_handle);
}
# Handler of the machines-file option: adds a peer for each whitespace
# separated name found in a file.
#   $file : name of the file (opened through general::open_file)
# When the file cannot be opened an error is reported and the function
# returns without trying to read or close the undefined handle.
sub machines_file($)
{
    my $file = shift;
    my $file_handle = general::open_file($file);

    if (not defined($file_handle))
    {
        diagnostic::error("Can't open [$file]");
        return;
    }
    while (my $line = <$file_handle>)
    {
        # split ' ' ignores leading whitespace, whereas split /\s+/ would
        # produce an empty peer name for lines starting with blanks
        foreach my $peer_name (split ' ', $line)
        {
            general::add_peer($peer_name);
        }
    }
    close $file_handle if $file_handle != \*STDIN;
}
# Handler of the machine option: adds a peer, with its optional arguments.
#   $peer : name of the remote host
# The peer arguments are the tokens found between "-[" and the matching
# "-]" (pairs can be nested). When the token following the name is not "-["
# it is given back to the parser. Unbalanced brackets are fatal (exit 1).
sub machine($)
{
    my $peer = shift;
    my @peer_arguments = ();

    get_next_argument();
    if ($current_argument ne "-[")
    {
        restore_last_argument();
    }
    else
    {
        my $depth = 1;
        while ($depth and !$arguments_ended)
        {
            get_next_argument();
            if ($current_argument eq "-]")
            {
                $depth--;
            }
            elsif ($current_argument eq "-[")
            {
                $depth++;
            }
            push @peer_arguments, $current_argument;
        }
        if ($depth)
        {
            diagnostic::error("Invalid -[ and -] imbrication");
            exit(1);
        }
        # The closing "-]" has been pushed with the arguments: remove it
        pop @peer_arguments;
    }
    general::add_peer($peer,@peer_arguments);
}
# Handler of the localhost option: overrides the name of the local host,
# both in $general::host and in the TAKTUK_HOSTNAME environment variable.
sub localhost($)
{
    my ($name) = @_;
    $general::host = $name;
    $ENV{TAKTUK_HOSTNAME} = $name;
}
# Handler of the not-root option: the first time it is called, creates a
# connector on the standard streams, registers it among the 'sources' of the
# communicator and clears $general::root.
# NOTE(review): 'write' is bound to STDIN and 'read' to STDOUT; presumably
# the naming follows the point of view of the connector code -- confirm.
sub not_root()
{
    if ($general::root)
    {
        my @endpoints = ('write' =>\*STDIN,
                         'read' =>\*STDOUT,
                         'type' =>settings::FD);
        my $parent_link = connector::new(@endpoints);
        communicator::add_connector($parent_link, 'sources');
        $general::root = 0;
    }
}
# Handler of the print-defaults option: prints, on the 'info' stream, the
# current value of every option stored in a variable (options handled by
# code are skipped) as NAME=value lines, then requests termination.
# NAME is TAKTUK_<long name>, followed by _<field> for hash options, in upper
# case and with dashes replaced by underscores.
sub print_defaults()
{
    foreach my $short (keys(%long_name))
    {
        my $target = $handler{$short};
        next if UNIVERSAL::isa($target, 'CODE');

        my $name = $long_name{$short};
        if ($type{$short} =~ m/^h.$/)
        {
            foreach my $field (keys(%{$$target}))
            {
                my $label = uc("TAKTUK_".$name."_".$field);
                my $value = defined($$target->{$field}) ?
                            $$target->{$field} : "";
                $label =~ tr/-/_/;
                communicator::get_root()->output('info',
                                                 $label."=".$value."\n");
            }
        }
        else
        {
            my $label = uc("TAKTUK_".$name);
            my $value = defined($$target) ? $$target : "";
            $label =~ tr/-/_/;
            communicator::get_root()->output('info', $label."=".$value."\n");
        }
    }
    $main::terminate = 1;
}
# Handler of the print-package option: prints on the 'info' stream the code
# extracted by general::load_taktuk_package for the given package name, then
# requests termination.
sub print_package($)
{
    my ($name) = @_;
    general::load_taktuk_package($name);
    my $root = communicator::get_root();
    $root->output('info', $general::taktuk_package);
    $main::terminate = 1;
}
# Handler of the version option: prints version and release decoded from
# $taktuk::VERSION (thousands give the version in tenths, the remainder is
# the release), then requests termination.
sub print_version()
{
    my $version = int($taktuk::VERSION / 1000) / 10.0;
    my $release = $taktuk::VERSION % 1000;
    my $banner = "TakTuk version $version release $release\n";
    communicator::get_root()->output('info', $banner);
    $main::terminate = 1;
}
# Initializes options handling: registers every option (short name, long
# name, type, handler, and when given default value and transmission mode),
# then overrides settings from the TAKTUK_* environment variables.
sub init()
{
register_option("C","command-separator","s",
\$arguments::command_separator,',;\n',MESSAGE);
register_option("D","debug","hi",\$diagnostic::package_level,
{ default=>2 },COMMAND_LINE);
register_option("F","args-file","s",\&args_file);
register_option("L","localhost","s",\&localhost);
register_option("O","option-separator","s",
\$arguments::option_separator,'\s',MESSAGE);
register_option("P","print-defaults","",\&print_defaults);
register_option("R","output-redirect","hi",\$general::redirect,
{ "default" => 1, "taktuk" => 2 }, MESSAGE);
register_option("T","taktuk-command","s",\$general::taktuk_command,
$0,MESSAGE);
register_option("W","static-window","",\$scheduler::static_window,
0,MESSAGE);
register_option("c","connector","s",\$general::connector_command,
"ssh -o StrictHostKeyChecking=no -o BatchMode=yes", MESSAGE);
register_option("d","dynamic","i",\$scheduler::dynamic_limit, 0, MESSAGE);
register_option("f","machines-file","s",\&machines_file);
register_option("g","time-granularity","f",\$communicator::select_timeout,
1, MESSAGE);
register_option("h","help","",\&general::print_help);
register_option("i","interactive","",\$main::forced_interactive);
register_option("l","login","s",\$general::login_name, undef, MESSAGE);
register_option("m","machine","s",\&machine);
register_option("n","no-numbering","",\$main::no_numbering,0,COMMAND_LINE);
register_option("o","output-template","hs",\$general::template,
{
"default" => '"$host-$rank: $command ($pid): $type > $line\n"',
"connector" => '"$host: $peer ($pid): $type > $line\n"',
"info" => '"$line$eol"',
"status" => '"$host-$rank: $command ($pid): $type > ".
(WIFEXITED($line)?"Exited with status ".WEXITSTATUS($line)
:(WIFSIGNALED($line)?"Signaled with signal ".WTERMSIG($line)
:(WIFSTOPPED($line)?"Stopped with signal ".WSTOPSIG($line)
:"")))."\n"',
"taktuk" => '"[ TAKTUK $level_name ] $host (PID $pid) Line ".
"$line_number ($package)\n$line\n"'
}, MESSAGE);
register_option("p","print-package","s",\&print_package);
register_option("r","not-root","",\&not_root);
register_option("s","self-propagate","",\$general::self_propagate,
0,MESSAGE);
register_option("t","timeout","f",\$scheduler::connector_timeout,0,MESSAGE);
register_option("v","version","",\&print_version);
register_option("w","window","i",\$scheduler::window, 10, MESSAGE);
# Environment overrides: TAKTUK_<LONG_NAME> sets a scalar option and
# TAKTUK_<LONG_NAME>_<FIELD> sets one field of a hash option
foreach my $full_name (keys(%ENV))
{
my $variable;
if ($full_name =~ m/^TAKTUK_(.+)$/)
{
$variable = $1;
}
else
{
undef($variable);
}
if (defined($variable))
{
my $prefix;
my $qualifier;
$variable = lc($variable);
# Split on the last underscore: the last part is tried as a field name
$variable =~ m/^(.*?)(?:_([^_]+))?$/;
($prefix, $qualifier) = ($1, $2);
# Long option names use dashes where variables use underscores
$prefix =~ tr/_/-/;
if (defined($qualifier))
{
if (exists($short_name{$prefix}))
{
my $short = $short_name{$prefix};
if ($type{$short} =~ m/^h.$/)
{
my $reference = ${$handler{$short}};
$reference->{$qualifier} = $ENV{$full_name};
}
else
{
diagnostic::warning("Hash specification given where ".
"scalar expected in $full_name");
}
}
else
{
# The prefix is not an option: the last part belongs to the option name
$prefix .= "-$qualifier";
$qualifier = undef;
}
}
if (not defined($qualifier))
{
if (exists($short_name{$prefix}))
{
my $short = $short_name{$prefix};
if ($type{$short} !~ m/^h.$/)
{
my $reference = $handler{$short};
$$reference = $ENV{$full_name};
}
else
{
diagnostic::warning("Scalar given where hash ".
"specification expected in $full_name");
}
}
else
{
diagnostic::warning("Unknown setting $full_name");
}
}
}
}
}
# Parses options from the arguments stream until a "--" token, a non option
# token or the end of the arguments is met, then checks the separators.
# Short options can be grouped in one token; the value of an option is
# either the rest of the token or the next token.
sub parse_options()
{
get_next_argument();
while (not $options_ended)
{
if ($current_argument =~ s/^-//o)
{
my @names;
# Long form (--name): translated to the short name; "--" alone ends options
if ($current_argument =~ s/^-(.*)$//o)
{
my $name = $1;
if ($name)
{
my $short = $short_name{$name};
if (not defined($short))
{
diagnostic::warning("Unknown long option $name");
general::print_help if ($general::root);
}
else
{
$current_argument = $short;
}
}
else
{
$options_ended = 1;
}
}
# Process the letters of the token one at a time
while ($current_argument)
{
$current_argument =~ s/^(.)//o;
my $name = $1;
if (exists($type{$name}))
{
my $destination = $handler{$name};
if ($type{$name})
{
# The option takes a value: rest of the token if any, else next token
get_next_argument if not length $current_argument;
if ($arguments_ended)
{
diagnostic::error("Missing argument")
}
else
{
set_option($name, $destination, $current_argument);
}
$current_argument = "";
}
else
{
# Option without value: it is set to 1
set_option($name, $destination, 1);
}
}
else
{
diagnostic::warning("Unknown short option $name");
general::print_help if ($general::root);
}
} 
get_next_argument;
}
else
{
# Not an option: blank tokens are skipped, anything else ends the options
# and is given back to the parser
if (($current_argument =~ m/^\s*$/) and not $arguments_ended)
{
get_next_argument;
}
else
{
$options_ended=1;
restore_last_argument if ($current_argument !~ m/^\s*$/);
}
}
}
check_separators_integrity();
}
# Issues warnings about unsafe separators settings: when one separator set
# (used as a character class) matches the text of the other, and when one of
# them matches a dash, a slash, a digit or a letter.
sub check_separators_integrity()
{
    my $overlap = (($option_separator =~ m/[$command_separator]/) or
                   ($command_separator =~ m/[$option_separator]/));
    diagnostic::warning("Options and command separators intersect")
        if $overlap;

    my $chaine =
        "-/0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    my $clash = (($chaine =~ m/[$command_separator]/) or
                 ($chaine =~ m/[$option_separator]/));
    diagnostic::warning("Options or command separators contain one of :".
                        "-/1-9a-zA-Z")
        if $clash;
}
# Sets the value of an option after checking it against the option type.
#   $name        : short name of the option
#   $destination : code reference or reference to the variable to set
#   $value       : the value; for hash options ("h" followed by the type of
#                  the values) it has the form key or key=value
# Integer ("i") and floating point ("f") values that do not match get a
# warning and are ignored. Options having a transmission mode are also
# recorded in %current_value and invalidate the cached configuration.
sub set_option($$$)
{
my $name = shift;
my $destination = shift;
my $value = shift;
my $key = undef;
my $current_type;
if ($type{$name} =~ m/^h(.)$/)
{
# Hash option: the actual type is the second letter, the value is a pair
$current_type = $1;
if ($value =~ m/^([^=]+)(?:=(.+))?$/)
{
$key = $1;
$value = $2;
}
else
{
diagnostic::warning("Option $name needs a well formed key=value ".
"pair ($value doesn't match)");
$value = "";
}
}
else
{
$current_type = $type{$name};
}
if (($current_type eq "i") and ($value !~ /^-?[0-9]+$/o))
{
diagnostic::warning("Option $name needs a numeric argument ".
"($value doesn't match)");
}
elsif (($current_type eq "f") and ($value !~
/^(?:[0-9]+(?:\.[0-9]*)?|\.[0-9]+)$/o))
{
diagnostic::warning("Option $name needs a floating point argument ".
"($value doesn't match)");
}
else
{
if (defined($key))
{
# Hash option: only the field $key is changed
if (exists($transmission_mode{$name}))
{
$current_value{$name} = {} if not exists $current_value{$name};
$current_value{$name}->{$key} = $value;
}
if (UNIVERSAL::isa($destination,'CODE'))
{
diagnostic::warning("NOT EXPECTED: Found option $name with ".
"code handler and hash value (key=$key)");
&$destination($key, $value);
}
else
{
diagnostic::debug("Found option $name with scalar ".
"handler and hash value (key=$key)");
$$destination->{$key} = $value;
}
}
else
{
# Scalar option
$current_value{$name}=$value if exists($transmission_mode{$name});
if (UNIVERSAL::isa($destination,'CODE'))
{
diagnostic::debug("Found option $name with code handler and ".
"scalar value ". $value);
&$destination($value);
}
else
{
diagnostic::debug("Found option $name with scalar ".
"handler and scalar value ". $value);
$$destination = $value;
}
}
# The configuration given to peers has to be rebuilt
$current_config_outdated = 1 if exists($transmission_mode{$name});
}
}
package diagnostic;
use strict; use bytes;
use POSIX;
our $package_level;
our $depth = 0;
our $level="";
our $level_name="";
our $package="";
our $filename="";
our $line_number="";
# Common part of the diagnostic functions.
#   arguments : numeric level, level name and text of the message
# The level, its name and the location (package, file, line) of the code that
# called the diagnostic function are stored in package variables. The text is
# sent on the 'taktuk' stream of the root connector when the level reaches
# the threshold of the calling package (or the default threshold) and when
# diagnostics are not nested too deeply ($depth guard).
sub print_info($$$)
{
    my $text;
    ($level, $level_name, $text) = @_;
    # caller(1) is the frame that called debug/warning/error/system
    ($package, $filename, $line_number) = caller(1);

    my $threshold = defined($package_level->{$package}) ?
                    $package_level->{$package} :
                    $package_level->{default};
    my $level_ok = ($threshold <= $level);

    $depth++;
    communicator::get_root()->output('taktuk', $text)
        if ($level_ok and ($depth < 3));
    $depth--;
}
# Reports the current system error ($!) at level 3.
sub system ()
{
    my $reason = "Error $!";
    print_info(3, "ERROR : SYSTEM", $reason);
}
# Reports a debug message (level 1).
sub debug ($)
{
    my ($text) = @_;
    print_info(1, "DEBUG", $text);
}
# Reports an error message (level 3).
sub error ($)
{
    my ($text) = @_;
    print_info(3, "ERROR", $text);
}
# Reports a warning message (level 2).
sub warning ($)
{
    my ($text) = @_;
    print_info(2, "WARNING", $text);
}
package taktuk;
use strict; use bytes;
# Data read but not yet consumed, keyed by descriptor (filled by read_data,
# drained by get_message, find_sequence and flush_buffer).
our %buffer;
# Puts the given handle in binary mode and turns autoflush ($|) on for it,
# leaving the currently selected output handle unchanged.
sub no_flush($)
{
    my $handle = shift;
    binmode($handle);
    # Select the handle, set $| on it, then select the previous handle back.
    select((select($handle), $| = 1)[0]);
}
# Extracts one length-prefixed message from a buffer.
# The buffer starts with a 4 byte big-endian length ("N") followed by that
# many bytes of payload.
# Returns (payload, remaining buffer) when a whole message is available,
# (undef, unchanged buffer) otherwise.
sub unpack($)
{
    my $data = shift;
    # Not even a complete length header yet.
    return (undef, $data) if length($data) < 4;
    my ($payload_length) = CORE::unpack("N", $data);
    # Header present but the payload is still incomplete.
    return (undef, $data) if length($data) < $payload_length + 4;
    return (substr($data, 4, $payload_length),
            substr($data, $payload_length + 4));
}
# Prefixes a message with its length encoded as a 4 byte big-endian integer.
sub pack($)
{
    my $payload = shift;
    my $header = CORE::pack("N", length($payload));
    return $header.$payload;
}
# Splits a message into its one character code and the body that follows.
# Returns (code, body).
sub decode($)
{
    my $message = shift;
    return (substr($message, 0, 1), substr($message, 1));
}
# Builds a message by prepending its code to its body.
sub encode($$)
{
    my ($code, $body) = @_;
    return $code.$body;
}
# Writes a whole message to a descriptor, in chunks of at most $write_size
# bytes, until everything has been written.
#   arg 1: descriptor to write to
#   arg 2: data to write
# Returns 1 on success, undef when the write failed with an error that is
# neither EINTR nor EAGAIN ($! then holds the system error).
sub syswrite ($$)
{
    my ($write_fd, $full_message) = @_;
    my $total_expected = length($full_message);
    my $call_expected = $write_size;
    my $offset = 0;
    while ($total_expected)
    {
        $call_expected = $total_expected if $call_expected > $total_expected;
        my $result =
            CORE::syswrite($write_fd, $full_message, $call_expected, $offset);
        if ($result)
        {
            $total_expected -= $result;
            $offset += $result;
        }
        elsif (not defined($result) and $!{EINTR})
        {
            # The call was interrupted by a signal before writing anything:
            # this is not a failure of the channel, just try again.
            next;
        }
        elsif ($!{EAGAIN})
        {
            # The descriptor cannot accept data right now, wait a bit.
            print STDERR "Delayed write ...";
            sleep 1;
        }
        else
        {
            return undef;
        }
    }
    return 1;
}
# Reads at most $read_size bytes from a descriptor into its entry of %buffer.
# Returns the sysread result: undef on error, 0 at end of file, otherwise the
# number of bytes read.
sub read_data ($)
{
    my $descriptor = shift;
    my $chunk;
    my $count = sysread($descriptor, $chunk, $read_size);
    return undef unless defined($count);
    my $append = ($count and exists($buffer{$descriptor}));
    if ($append)
    {
        $buffer{$descriptor} .= $chunk;
    }
    else
    {
        # First data for this descriptor, or nothing was read: the entry is
        # (re)set to what was just read.
        $buffer{$descriptor} = $chunk;
    }
    return $count;
}
# Takes the next complete message out of the buffer of a descriptor.
# Returns the message, or "" when no complete message is buffered.
sub get_message ($)
{
    my $descriptor = shift;
    return "" unless exists($buffer{$descriptor});
    my ($message, $remaining) = taktuk::unpack($buffer{$descriptor});
    if (defined($remaining))
    {
        $buffer{$descriptor} = $remaining;
    }
    else
    {
        delete($buffer{$descriptor});
    }
    return defined($message) ? $message : "";
}
# Consumes the buffer of a descriptor line by line, looking for a line that
# matches the given pattern.
#   arg 1: descriptor
#   arg 2: pattern (interpolated in a regular expression)
# Returns the matched text, or "" when no complete line matches. Every line
# examined, including the matching one, is removed from the buffer.
sub find_sequence($$)
{
    my ($descriptor, $sequence) = @_;
    return "" unless exists($buffer{$descriptor});
    while ((my $newline = index($buffer{$descriptor}, "\n")) >= 0)
    {
        my $candidate = substr($buffer{$descriptor}, 0, $newline);
        $buffer{$descriptor} = substr($buffer{$descriptor}, $newline + 1);
        if ($candidate =~ m/($sequence)/)
        {
            my $hit = $1;
            return $hit;
        }
    }
    return "";
}
# Removes and returns everything buffered for a descriptor ("" if nothing).
sub flush_buffer($)
{
    my $descriptor = shift;
    return "" unless exists($buffer{$descriptor});
    # delete returns the value that was stored under the key.
    return delete($buffer{$descriptor});
}
# Handle on the control channel, opened on the file descriptor number given
# in the TAKTUK_CONTROL environment variable when it is set.
our $control_channel;
if ($ENV{TAKTUK_CONTROL})
{
    unless (open($control_channel, "+<&=", $ENV{TAKTUK_CONTROL}))
    {
        print("Error opening taktuk control channel : $!\n");
    }
    no_flush($control_channel);
}
# Error codes stored in $error by send and recv; error_msg turns them into
# text.
use constant ESWRIT=>0; # taktuk::syswrite failed
use constant EFCLSD=>1; # communication channel closed by the engine
use constant ESREAD=>2; # sysread failed
use constant EARGTO=>3; # field 'to' missing
use constant EARGBD=>4; # field 'body' missing
use constant ETMOUT=>5; # timeout
use constant EINVST=>6; # invalid destination set specification
# Returns the text describing one of the error codes above, or
# "Unknown error" for any other value.
sub error_msg($)
{
    my $error = shift;
    # Codes are compared numerically, in this order.
    my @descriptions = (
        [ ESWRIT, "taktuk::syswrite failed, system message : $!" ],
        [ EFCLSD, "TakTuk engine closed the communication channel" ],
        [ ESREAD, "sysread error, system message : $!" ],
        [ EARGTO, "field 'to' not defined" ],
        [ EARGBD, "field 'body' not defined" ],
        [ ETMOUT, "timeouted" ],
        [ EINVST, "invalid destination set specification" ],
    );
    foreach my $entry (@descriptions)
    {
        return $entry->[1] if $error == $entry->[0];
    }
    return "Unknown error";
}
# Sends a message to a set of peers through the control channel.
# Arguments (as a hash):
#   to   => destination set specification (see decode_set)
#   body => body of the message
# Returns a true value on success. On failure returns undef and stores the
# reason in $error: EARGTO / EARGBD when a field is missing, EINVST when the
# destination set is invalid, ESWRIT when the write failed.
sub send(%)
{
    my %argument = @_;
    my $from = $ENV{TAKTUK_RANK};
    # The error code has to be assigned in its own statement: written as
    # "$error=CODE && return undef" the return is executed before the
    # assignment and $error is never set.
    if (not exists($argument{to}))
    {
        $error = EARGTO;
        return undef;
    }
    my $to = $argument{to};
    if (not exists($argument{body}))
    {
        $error = EARGBD;
        return undef;
    }
    my $body = $argument{body};
    my @send_set = taktuk::decode_set($to);
    if (scalar(@send_set))
    {
        # Normalized form of the destination set.
        $to = taktuk::encode_set(@send_set);
        my $full_message = taktuk::encode($taktuk::send_to,
                               taktuk::pack($to).
                               taktuk::encode($taktuk::message,
                                   taktuk::pack($to).
                                   taktuk::pack($from).
                                   $body));
        my $result = taktuk::syswrite($control_channel,
                                      taktuk::pack($full_message));
        $error=ESWRIT if not $result;
        return $result?$result:undef;
    }
    else
    {
        $error=EINVST;
        return undef;
    }
}
# Waits for a message on the control channel.
# Arguments (as a hash):
#   timeout => optional value appended to the wait request
# Returns (to, from, body) on success. On failure returns the empty list and
# stores the reason in $error: ESWRIT, ETMOUT, EFCLSD or ESREAD.
sub recv(%)
{
    my %argument = @_;
    my $request = $taktuk::wait_message;
    $request = taktuk::encode($taktuk::wait_message, $argument{timeout})
        if exists($argument{timeout});
    my $status = taktuk::syswrite($control_channel, taktuk::pack($request));
    if (not $status)
    {
        $error=ESWRIT;
        return ();
    }
    # Read until a complete message is available or reading stops working.
    my $reply = get_message($control_channel);
    while ($status and not $reply)
    {
        $status = read_data($control_channel);
        $reply = get_message($control_channel);
    }
    if (not $status)
    {
        # 0 is an end of file, undef a read error.
        if (defined($status))
        {
            $error=EFCLSD;
        }
        else
        {
            $error=ESREAD;
        }
        return ();
    }
    my ($message_code, $payload) = taktuk::decode($reply);
    if ($message_code eq $taktuk::timeout)
    {
        $error=ETMOUT;
        return ();
    }
    my ($to, $from);
    ($to, $payload) = taktuk::unpack($payload);
    ($from, $payload) = taktuk::unpack($payload);
    return ($to, $from, $payload);
}
# Parses a set specification made of intervals separated by "/", each
# interval being either "min-max" or a single number.
# Returns a flat list of bounds (min1, max1, min2, max2, ...) kept in
# increasing order, overlapping intervals being merged as they are inserted
# and contiguous ones being merged at the end.
# Returns the empty list as soon as an interval is not made of non negative
# integers or has min > max.
sub decode_set($)
{
my $string = shift;
my @result = ();
my $interval;
my $i;
($interval,$string) = split /\//,$string,2;
# NOTE: the loop stops on any false interval, i.e. also on "" and "0".
while ($interval)
{
my ($min, $max) = split /-/,$interval,2;
my @replacement = ();
# $state is 1 when the position reached by the backward scan lies inside an
# interval already present in @result, 0 when it lies outside.
my $state = 0;
my $length = 0;
$i = $#result;
# A single number (or a false upper bound) stands for the interval min-min.
$max = $min if not $max;
return () if ($min !~ /^[0-9]+$/o) or ($max !~ /^[0-9]+$/o) or
($min > $max);
# Skip, from the end, the bounds greater than the new upper bound.
while (($i > -1) and ($max < $result[$i]))
{
$i--;
$state = 1 - $state;
}
# The new upper bound is kept only if it does not fall inside an interval.
unshift(@replacement, $max) if not $state;
# Count the existing bounds covered by the new interval: they are replaced.
while (($i > -1) and ($min <= $result[$i]))
{
$i--;
$state = 1 - $state;
$length++;
}
# Same as above for the new lower bound.
unshift(@replacement, $min) if not $state;
$i++;
splice @result, $i, $length, @replacement;
if (defined($string))
{
($interval,$string) = split /\//,$string,2;
}
else
{
$interval = 0;
}
}
# Merge intervals that touch each other (upper bound + 1 == next lower bound).
for ($i=1; $i<$#result; $i+=2)
{
splice @result, $i, 2
while (($i<$#result) and ($result[$i]+1 == $result[$i+1]));
}
return @result;
}
# Turns a flat list of bounds (min1, max1, min2, max2, ...) into its textual
# form: intervals separated by "/", written "min-max" or "min" when both
# bounds are equal.
sub encode_set(@)
{
    my @bounds = @_;
    my $encoded = "";
    while (@bounds)
    {
        my ($low, $high) = splice(@bounds, 0, 2);
        $encoded .= "/" if ($encoded);
        $encoded .= ($low == $high) ? "$low" : "$low-$high";
    }
    return $encoded;
}
package command;
use strict; use bytes;
use Socket;
use Fcntl;
use POSIX;
# Builds a command object from a list of key/value pairs.
# A command is described either by a command line ('line') or by already
# opened descriptors ('read', 'write', 'error', 'type'): giving both is
# reported as an error, but the object is built anyway.
sub new (%)
{
    my %fields = @_;
    my $uses_descriptors =
        grep { exists($fields{$_}) } ('read', 'write', 'error', 'type');
    diagnostic::error("Command built from both a command line and fds")
        if (exists($fields{line}) and $uses_descriptors);
    my $self = \%fields;
    bless($self);
    return $self;
}
# Forks a child process that executes $self->{line}.
# The child is connected to its father by a socket pair (standard input and
# output of the child) and by a pipe (standard error of the child).
# Returns $self in the father once the child has been created, 0 when the
# socket pair, the pipe or the fork failed. Does not return in the child.
sub run ()
{
my $self=shift;
my ($father_read,$father_write,$father_error);
my ($child_read,$child_write,$child_error);
undef($father_read);
undef($child_read);
if (not socketpair($father_read, $child_read, AF_UNIX, SOCK_STREAM,
PF_UNSPEC))
{
diagnostic::system;
return 0;
}
# The socket is bidirectional: the same handle is used in both directions.
$father_write = $father_read;
$child_write = $child_read;
undef($father_error);
undef($child_error);
if (not pipe($father_error,$child_error))
{
diagnostic::system;
return 0;
}
my $pid;
$pid = fork;
if (not defined($pid))
{
diagnostic::system;
return 0;
}
$self->{pid} = $pid;
if ($pid)
{
# Father: keep its own ends and record them in the command.
diagnostic::debug("New child with pid $pid created".
" to execute $self->{line}");
CORE::close($child_read) or diagnostic::system;
CORE::close($child_error) or diagnostic::system;
$self->{read} = $father_read;
$self->{write} = $father_write;
$self->{error} = $father_error;
$self->{type} = settings::SOCKET;
return $self;
}
else
{
# Child: plug its ends on the standard descriptors, then execute the line.
CORE::close($father_read) or die $!;
CORE::close($father_error) or die $!;
open(STDIN,"<&",$child_read) or die $!;
open(STDOUT,">&",$child_write) or die $!;
open(STDERR,">&",$child_error) or die $!;
CORE::close($child_read) or die $!;
CORE::close($child_error) or die $!;
# Descriptor flags are cleared (no close-on-exec) on the control connector,
# so that it remains open in the executed command.
fcntl(communicator::get_control()->{read}, F_SETFD, 0);
exec($self->{line}) or die "Exec failed : $!";
}
}
# Closes the parts of the command that are still defined then, for a command
# line that is not a connector, waits for the child (unless a status is
# already recorded) and outputs its exit status.
sub cleanup ()
{
    my $self = shift;
    foreach my $part ('read','write','error')
    {
        next unless defined $self->{$part};
        $self->close($part);
    }
    return unless exists($self->{pid});
    diagnostic::debug("Cleaning up $self by waiting pid $self->{pid}");
    return if ($self->isa('connector') or not exists($self->{line}));
    unless (exists($self->{status}))
    {
        waitpid($self->{pid},0);
        $self->{status} = $?;
    }
    $self->output('status', $self->{status});
}
# Calls shutdown on a socket and reports failures as system errors, except
# when the socket is simply not connected (ENOTCONN).
sub my_shutdown($$)
{
    my ($fd, $how) = @_;
    unless (shutdown($fd, $how))
    {
        diagnostic::system() unless $!{ENOTCONN};
    }
}
# Closes one part ('read', 'write' or 'error') of the command.
# The error part is always closed as a plain descriptor. The other parts are
# closed according to the type of the command: plain close for settings::FD,
# half shutdown of the socket for settings::SOCKET. Data buffered for a part
# that was read from ('write' and 'error') is dropped.
sub close($)
{
    my ($self, $part) = @_;
    if ($part eq 'error')
    {
        if (not exists($self->{error}))
        {
            diagnostic::debug("Cannot close error : does not exists");
        }
        else
        {
            taktuk::flush_buffer($self->{error});
            CORE::close($self->{error}) or diagnostic::system;
            delete $self->{error};
        }
    }
    elsif (not (exists($self->{type}) and exists($self->{$part})))
    {
        # Closing an already closed 'read' part of a typed command is silent.
        diagnostic::warning("Closing a non existant command type or part")
            unless (exists($self->{type}) and ($part eq 'read'));
    }
    elsif ($self->{type} == settings::FD)
    {
        taktuk::flush_buffer($self->{write}) if $part eq 'write';
        CORE::close($self->{$part}) or diagnostic::system;
        delete $self->{$part};
    }
    elsif ($self->{type} == settings::SOCKET)
    {
        # Second argument of shutdown for each part.
        my %how = ('read' => 1, 'write' => 0);
        if (exists($how{$part}))
        {
            taktuk::flush_buffer($self->{write}) if $part eq 'write';
            my_shutdown($self->{$part}, $how{$part});
            delete $self->{$part};
        }
        else
        {
            diagnostic::warning("Invalid part for socket close");
        }
    }
    else
    {
        diagnostic::warning("Unknown command type : BUG");
    }
}
# Not referenced elsewhere in this part of the file; presumably scratch
# storage available to user templates - TODO confirm.
our $user_scalar=undef;
# Formats a message with the template registered for the given type and
# forwards the result upward as an output message.
#   arg 1 (invocant): command the output comes from
#   arg 2: type of output, selects the template and the redirection in
#          $general::template and $general::redirect (falling back to their
#          'default' entries)
#   arg 3: text, processed line by line
# Nothing is sent when no template is defined or when the formatted result
# is empty.
# NOTE(review): the template is evaluated as Perl code inside this sub, so it
# can refer to any lexical declared below ($host, $rank, $line, ...). Do not
# rename or remove these variables even though they look unused.
sub output ($$)
{
my $self = shift;
my $type = shift;
my $message = shift;
my $template;
my $fd;
my $rank = $general::rank;
if (exists($general::template->{$type}))
{
$template = $general::template->{$type};
}
else
{
$template = $general::template->{default};
}
if (exists($general::redirect->{$type}))
{
$fd = $general::redirect->{$type};
}
else
{
$fd = $general::redirect->{default};
}
if (defined($template))
{
my $host = $general::host;
my $count = $general::count;
my $peer = exists($self->{peer})?$self->{peer}:"";
my $command = exists($self->{line})?$self->{line}:"";
my $pid = exists($self->{pid})?$self->{pid}:$$;
# Description of the last diagnostic, as left by diagnostic::print_info.
my $level = $diagnostic::level;
my $level_name = $diagnostic::level_name;
my $package = $diagnostic::package;
my $filename = $diagnostic::filename;
my $line_number = $diagnostic::line_number;
my $line;
my $eol;
my $to_be_sent = "";
pos($message) = 0;
# Each iteration takes one line ($line) and its terminator ($eol, "\n" or
# empty at the end of the text).
while ($message =~ /\G([^\n]*)(\n|$)/go)
{
$line = $1;
$eol = $2;
# Skips the empty match found at the very end of the text.
if (length($line) or length($eol))
{
my $result = eval("$template");
$to_be_sent .= $result if defined($result) and length($result);
}
}
if (length($to_be_sent))
{
communicator::process_message($self,
taktuk::encode($taktuk::forward_up,
taktuk::encode($taktuk::output,
taktuk::pack($fd).$to_be_sent)));
}
}
}
package connector;
use strict; use bytes;
# A connector is a command.
our @ISA=qw(command);
# Texts looked for in (or announced on) the output of a remote connector
# during its initialization.
our $init_string="Taktuk initialization, version ";
our $connector_functional_string="Taktuk connector functional";
our $ready_to_load_string="Taktuk ready to load";
# States of a connector. read_data promotes a connector by adding 1 to its
# state: $pre_load -> $simple_connector -> $initialized.
our $pre_load=1;
# Not referenced in this part of the file.
our $first_load=0;
our $simple_connector=2;
our $initialized=3;
# Builds a connector: a command whose data is processed as TakTuk messages.
# When a 'peer' is given the connector still has to be initialized and its
# command line is built from the 'command', 'peer' and 'taktuk' fields;
# otherwise it is considered initialized from the start.
# Returns the connector, or 0 when the underlying command cannot be built.
sub new (%)
{
    my $self=command::new(@_,
                 data_handler=>\&communicator::process_messages,
                 remove_handler=>\&communicator::remove_connector);
    return 0 if not $self;
    $self->{last_given} = 0;
    $self->{peers_count} = undef;
    if (not exists($self->{peer}))
    {
        $self->{state} = $initialized;
    }
    else
    {
        $self->{state} =
            $general::self_propagate ? $pre_load : $simple_connector;
        $self->{line} = "$self->{command} $self->{peer} $self->{taktuk}";
    }
    bless($self);
    return $self;
}
# Reads the data available on one descriptor of the connector.
#   arg 1 (invocant): connector
#   arg 2: descriptor ready for reading, either the 'write' or the 'error'
#          part of the connector
# While the connector is not initialized, data coming from its 'write' part
# is searched for the text expected in the current state and the connector
# is promoted to the next state when it is found. Data coming from its
# 'error' part is output with the 'connector' type.
# Returns the result of taktuk::read_data.
sub read_data ($)
{
my $self = shift;
my $descriptor = shift;
my $result = taktuk::read_data($descriptor);
diagnostic::system if not defined($result);
my $write = undef;
my $error = undef;
$write = $self->{write} if exists $self->{write};
$error = $self->{error} if exists $self->{error};
if ($write and ($descriptor == $write))
{
if ($result >0)
{
if ($self->{state} < $initialized)
{
# Remains 0 when the expected text has not been found yet.
my $argument = 0;
if ($self->{state} == $pre_load)
{
if (length(taktuk::find_sequence($descriptor,
".*$connector_functional_string")))
{
$argument = 1;
}
}
elsif ($self->{state} == $simple_connector)
{
my $sequence = taktuk::find_sequence($descriptor,
".*$init_string"."[0-9]+[.0-9]*");
if (length($sequence))
{
# Keeps what follows the last space: the version announced by the peer.
$argument = $sequence;
$argument =~ s/.* (.*)$/$1/o;
}
}
else
{
diagnostic::warning("Wrong connector state : ".
$self->{state});
}
if ($argument > 0)
{
my $connector_input = $self->{read};
if ($self->{state} < $simple_connector)
{
# Pre-load state: the TakTuk code is written to the peer.
taktuk::syswrite($connector_input,
$general::taktuk_code) or diagnostic::system;
}
else
{
diagnostic::warning("Protocol versions do not match")
if $argument != $taktuk::VERSION;
}
$self->{state} += 1;
diagnostic::debug("Connector $self->{line} promoted to ".
"state ".$self->{state});
synchronizer::initialization_complete($self)
if ($self->{state} == $initialized);
}
}
}
}
elsif ($error and ($descriptor == $error))
{
if ($result >0)
{
my $new_data = taktuk::flush_buffer($descriptor);
$self->output('connector', $new_data);
} 
}
else
{
diagnostic::warning("Unknown descriptor");
}
return $result;
}
# Returns the next complete message received from the connector, or "" when
# there is none, when the connector is not initialized yet or when it has no
# 'write' part anymore.
sub get_message ()
{
    my $self = shift;
    my $usable = (($self->{state} >= $initialized)
                  and exists($self->{write}));
    return $usable ? taktuk::get_message($self->{write}) : "";
}
# Sends a message to the connector: the message is length-prefixed and
# written on the 'read' part of the connector (the end the peer reads from).
# A write failure is reported as an error along with a dump of the connector.
sub send_message($)
{
    my $self = shift;
    my $write_fd = $self->{read};
    my $full_message = taktuk::pack(shift);
    if (not taktuk::syswrite($write_fd, $full_message))
    {
        # Dumper is not imported in this package: load the module when it is
        # needed and call it by its full name, so that the failure is
        # reported instead of dying on an undefined subroutine.
        require Data::Dumper;
        diagnostic::error("Error in send message, connector dumps into ".
                          Data::Dumper::Dumper($self));
    }
}
# Serializes the connector as the concatenation of its 'command', 'peer',
# 'taktuk' and 'arguments' fields, each of them length-prefixed.
sub pack()
{
    my $self = shift;
    return join('', map { taktuk::pack($self->{$_}) }
                        ('command', 'peer', 'taktuk', 'arguments'));
}
# Rebuilds a connector from the beginning of a buffer produced by pack.
# Returns (connector, remaining buffer).
sub unpack($)
{
    my $buffer = shift;
    my @names = ('command', 'peer', 'taktuk', 'arguments');
    my %field;
    # Fields are stored in this order, each of them length-prefixed.
    foreach my $name (@names)
    {
        ($field{$name}, $buffer) = taktuk::unpack($buffer);
    }
    my $connector = connector::new(map { $_ => $field{$_} } @names);
    return ($connector, $buffer);
}
package communicator;
use strict; use bytes;
use Socket;
use POSIX;
use Errno;
# Number of connectors in the 'sinks' set and, among them, number of those
# that completed their initialization.
our $sinks_number = 0;
our $initialized_sinks_number = 0;
# Object used to wait for readable descriptors, and the timeout given to it.
our $select;
our $select_timeout;
# Sets of connections, by name: sources, sinks, control, local_commands.
our $connections;
# Connector on the command side of the control channel (see init).
our $control_connector = 0;
# Set by terminate to ask for the end of the main loop.
our $end = 0;
# Maps a descriptor to the command or connector it belongs to.
our %c_from_fd;
# Root used by get_root when there is no source: writes to STDOUT.
our $default_root = connector::new('read' =>\*STDOUT,
'type' =>settings::FD);
# Initializes the communicator: creates the select object and the empty sets
# of connections, then the control channel. The control channel is a socket
# pair: one end is registered in the 'control' set, the other one is
# advertised to commands through the TAKTUK_CONTROL environment variable.
sub init()
{
    $select = my_select::new() or diagnostic::error("Select creation");
    $connections = {
        sources        => [],
        sinks          => [],
        control        => [],
        local_commands => [],
    };
    my ($communicator_side, $control_side);
    unless (socketpair($communicator_side, $control_side,
                       AF_UNIX, SOCK_STREAM, PF_UNSPEC))
    {
        diagnostic::system();
        diagnostic::error("Degradated mode (no control channel)");
    }
    taktuk::no_flush($communicator_side);
    taktuk::no_flush($control_side);
    # End of the channel handled by the engine.
    my $communicator_connector =
        connector::new('read'  => $communicator_side,
                       'write' => $communicator_side,
                       'type'  => settings::SOCKET);
    $communicator_connector->{pending_messages} = 0;
    add_connector($communicator_connector, 'control');
    # End of the channel used by commands.
    $ENV{TAKTUK_CONTROL} = fileno($control_side);
    $control_connector =
        connector::new('read'  => $control_side,
                       'write' => $control_side,
                       'type'  => settings::SOCKET);
}
# Returns the list of connections stored in the set of the given name.
sub get_connections($)
{
    my ($set_name) = @_;
    my $members = $connections->{$set_name};
    return @{$members};
}
# Returns the first source when there is one, the default root otherwise.
sub get_root()
{
    my $sources = $connections->{sources};
    return $sources->[0] if (defined($sources) and scalar(@$sources));
    return $default_root;
}
# Returns the connector built by init on the command side of the control
# channel (0 before init).
sub get_control()
{
    my $control = $control_connector;
    return $control;
}
# Returns the first connector of the 'control' set, or 0 when it is empty.
sub get_outgoing()
{
    my $controls = $connections->{control};
    return scalar(@$controls) ? $controls->[0] : 0;
}
# Decodes a message and gives it to the handler registered for its code.
#   arg 1: connector the message comes from
#   arg 2: encoded message
# The handler is called with (code, connector, body).
sub process_message($$)
{
    my ($connector, $encoded) = @_;
    my ($message, $body) = taktuk::decode($encoded);
    my $function = handlers::get_handler($message);
    if (not defined($function))
    {
        diagnostic::warning("Unknown message : $message (body: $body)");
    }
    else
    {
        $function->($message, $connector, $body);
    }
}
# Data handler of connectors: reads what is available on the descriptor then
# processes every complete message received so far.
# Returns the result of the read.
sub process_messages($$)
{
    my ($connector, $descriptor) = @_;
    my $result = $connector->read_data($descriptor);
    while (my $message = $connector->get_message)
    {
        process_message($connector, $message);
    }
    return $result;
}
# Reads what a command wrote on one of its descriptors and outputs it with
# the "output" type when it comes from the 'write' part of the command, with
# the "error" type otherwise.
# Returns the result of the read.
sub process_command_output($$)
{
    my ($command, $descriptor) = @_;
    my $chunk;
    my $count = sysread($descriptor, $chunk, $taktuk::read_size);
    diagnostic::system() unless defined($count);
    if ($count > 0)
    {
        my $from_output = (exists($command->{write})
                           and ($descriptor == $command->{write}));
        my $type = $from_output ? "output" : "error";
        $command->output($type, $chunk);
    }
    return $count;
}
# Main loop of the communicator.
# Runs until termination has been requested (see terminate) and there is no
# sink nor local command left. Each iteration handles the expired timers,
# waits for readable descriptors and gives each of them to the data handler
# of the command or connector it belongs to.
sub run ()
{
my @select_result;
my $sinks = $connections->{sinks};
my $local_commands = $connections->{local_commands};
while ((not $end) or scalar(@$sinks) or scalar(@$local_commands))
{
timer::check_timeouts;
# The registered descriptors are watched for reading and for exceptions.
@select_result = 
my_select::select($select,undef,$select,$select_timeout);
if (scalar(@select_result))
{
my ($read_set, $write_set, $exception_set) = @select_result;
while (scalar(@$read_set))
{
my $descriptor = shift @$read_set;
my $cobidule = $c_from_fd{$descriptor};
if (exists($cobidule->{data_handler}))
{
my $handler = $cobidule->{data_handler};
my $result = &$handler($cobidule, $descriptor);
# A false result (end of file or read error) removes the descriptor.
remove_descriptor($cobidule, $descriptor) if (not $result);
}
else
{
diagnostic::error("Bug : connector has no data handler");
}
}
if (scalar(@$write_set))
{
diagnostic::debug("Unexpected ready to write fds : ".
join(' ', @$write_set));
}
if (scalar(@$exception_set))
{
diagnostic::debug("Unexpected exceptional fds : ".
join(' ', @$exception_set));
}
}
else
{
# Empty result: either an error (see $!) or, presumably, the timeout.
if ($!{EBADF})
{
my $error_msg="Selected an invalid descriptor ($!)... Very ".
"bad\nRegistered handles = ";
foreach my $handle ($select->handles)
{
$error_msg.="$handle (".fileno($handle).") ";
}
diagnostic::error($error_msg);
exit 1;
}
elsif ($!{EINTR})
{
diagnostic::warning("Select exited because of a signal");
}
elsif ($!{EINVAL})
{
diagnostic::error("Invalid time limit for select");
}
else
{
}
}
}
}
# Asks the main loop to stop once no sink nor local command is left.
sub terminate ()
{
    # Read by the condition of the loop in run.
    $end = 1;
}
# Registers the 'write' and 'error' parts of a command (those that exist) in
# the select object and in the descriptor to command map.
sub add_descriptors($)
{
    my $command = shift;
    diagnostic::debug("Adding $command to hash & select");
    foreach my $part ('write', 'error')
    {
        next unless exists($command->{$part});
        my $handle = $command->{$part};
        $select->add($handle);
        $c_from_fd{$handle} = $command;
    }
}
# Stops watching a descriptor of a command and closes the matching part.
#   arg 1: command or connector owning the descriptor
#   arg 2: descriptor, either the 'write' or the 'error' part of the command
# When the other part of the command is not watched either, the remove
# handler of the command (if any) is called.
sub remove_descriptor($$)
{
    my ($cobidule, $descriptor) = @_;
    my $write = exists $cobidule->{write} ? $cobidule->{write} : undef;
    my $error = exists $cobidule->{error} ? $cobidule->{error} : undef;
    unless (defined($descriptor))
    {
        diagnostic::warning("Should not be called with undefined descriptor");
        return;
    }
    # Part the descriptor stands for, and descriptor of the other part.
    my ($type, $other);
    if (defined($write) and ($write == $descriptor))
    {
        ($type, $other) = ('write', $error);
    }
    elsif (defined($error) and ($error == $descriptor))
    {
        ($type, $other) = ('error', $write);
    }
    else
    {
        diagnostic::error("Invalid descriptor, serious BUG !");
    }
    unless (exists $c_from_fd{$descriptor})
    {
        diagnostic::warning("Descriptor not present in hash");
        return;
    }
    $select->remove($descriptor);
    delete $c_from_fd{$descriptor};
    $cobidule->close($type);
    my $other_watched = (defined($other) and exists $c_from_fd{$other});
    if (not $other_watched and exists($cobidule->{remove_handler}))
    {
        $cobidule->{remove_handler}->($cobidule);
    }
}
# Tells whether every sink has completed its initialization.
# Returns true when the number of initialized sinks equals the number of
# sinks. Having more initialized sinks than sinks is reported as a warning.
sub no_pending_connectors()
{
    diagnostic::debug("Connectors situation, sinks_number : $sinks_number, ".
                      "initialized : $initialized_sinks_number");
    # The package name used to be misspelled here, which made the call die
    # on an undefined subroutine instead of emitting the warning.
    diagnostic::warning("More initialized than sinks")
        if ($sinks_number<$initialized_sinks_number);
    return $sinks_number == $initialized_sinks_number;
}
# Adds a connector to the set of the given name and starts watching its
# descriptors. Connectors added to 'sinks' are counted in $sinks_number.
sub add_connector($$)
{
    my ($connector, $set_name) = @_;
    my $members = $connections->{$set_name};
    taktuk::no_flush($connector->{read});
    add_descriptors($connector);
    push(@$members, $connector);
    $sinks_number++ if ($set_name eq 'sinks');
    if (defined($connector->{line}))
    {
        diagnostic::debug("Connector $connector->{line} added");
    }
}
# Records that one more sink has completed its initialization.
sub connector_initialized($)
{
    # The connector given as argument is not used, only the count changes.
    $initialized_sinks_number++;
}
# Removes the first occurrence of a command from the set of the given name.
# Returns 1 when the command was found and removed, 0 otherwise.
sub remove_from_set($$)
{
    my ($cobidule, $set_name) = @_;
    my $set = $connections->{$set_name};
    foreach my $i (0 .. $#$set)
    {
        next if ($set->[$i] != $cobidule);
        splice @$set, $i, 1;
        diagnostic::debug("Cobidule $cobidule, $cobidule->{line}, ".
                          "$cobidule->{pid} deleted from set ".
                          "$set_name at position $i");
        return 1;
    }
    return 0;
}
# Remove handler of connectors: takes the connector out of the set it
# belongs to ('sinks' or 'sources') and cleans it up.
# Losing the last source ends the process. Losing a sink updates the
# counters and either the count of initialized sinks or the synchronizer,
# depending on whether the sink was initialized.
sub remove_connector($)
{
    my $connector = shift;
    my $found_in;
    foreach my $set_name ('sinks', 'sources')
    {
        if (remove_from_set($connector, $set_name))
        {
            $found_in = $set_name;
            last;
        }
    }
    if (not defined($found_in))
    {
        diagnostic::warning("Connector to remove not found !");
    }
    else
    {
        if ($found_in eq 'sources')
        {
            if (not scalar(@{$connections->{sources}}))
            {
                general::print("Exiting after loosing my sources ($$)\n");
                exit 1;
            }
        }
        elsif ($found_in eq 'sinks')
        {
            $sinks_number--;
            if ($connector->{state} >= $connector::initialized)
            {
                $initialized_sinks_number--;
            }
            else
            {
                synchronizer::initialization_failed($connector);
            }
            handlers::check_ongoing_reduces();
        }
        $connector->cleanup;
    }
}
# Adds a command to the 'local_commands' set and starts watching its
# descriptors.
sub add_local_command($)
{
    my ($command) = @_;
    my $members = $connections->{local_commands};
    push(@$members, $command);
    add_descriptors($command);
    diagnostic::debug("Command $command added");
}
# Takes a command out of the 'local_commands' set and cleans it up. A
# command that is not in the set is reported as a warning.
sub remove_local_command($)
{
    my ($command) = @_;
    if (not remove_from_set($command, 'local_commands'))
    {
        diagnostic::warning("Command to remove not found !");
    }
    else
    {
        $command->cleanup;
    }
}
package scheduler;
use strict; use bytes;
# Limit on $arity: negative makes waiting connectors static (see
# add_connector), 0 means no limit, a positive value is a maximum.
our $dynamic_limit;
# Maximal value of $current_window.
our $window;
# Not referenced in this part of the file.
our $static_window;
# When true, delay given to the timer registered for each deployed connector.
our $connector_timeout;
# Connectors waiting to be deployed, and connectors waiting for work.
our @static_connectors = ();
our @dynamic_connectors = ();
our @waiting_thieves = ();
# Number of connectors deployed and not yet initialized or failed.
our $current_window = 0;
# Number of connectors deployed and not failed.
our $arity = 0;
# True while a steal request sent to the root has not been answered.
our $steal_request_sent = 0;
# The three values below that are sent in steal requests are only read in
# this part of the file; the others are not referenced here.
our $sum_connection_times = 0;
our $sum_user_times = 0;
our $num_connections = 0;
our $local_idleness = 0;
our $average_connection_time = 0;
our $average_user_time = 0;
our $min_connection_time = 0;
# Starts a connector. On success the connector is added to the sinks (with
# an initialization timer when $connector_timeout is set) and counted in
# the arity and in the current window. On failure a warning is emitted.
sub deploy_connector($)
{
    my $connector = shift;
    if (not $connector->run())
    {
        diagnostic::warning("Giving up connection to $connector->{peer}");
    }
    else
    {
        if ($connector_timeout)
        {
            # The timer and the connector know each other.
            my $timer = timer->register($connector_timeout,
                            \&synchronizer::initialization_timeout);
            $connector->{timer} = $timer;
            $timer->{connector} = $connector;
        }
        communicator::add_connector($connector, 'sinks');
        diagnostic::debug("Connector added : ".$connector->{line});
        $arity++;
        $current_window++;
    }
}
# Takes a new connector in charge: it is deployed at once when the window
# and the arity limit allow it, otherwise it is queued in the static list
# (negative $dynamic_limit) or in the dynamic list.
# In every case the node is marked as not ready.
sub add_connector($)
{
    my $connector = shift;
    my $window_open = ($current_window < $window);
    my $arity_open = (($dynamic_limit <= 0) or ($arity < $dynamic_limit));
    if ($window_open and $arity_open)
    {
        deploy_connector($connector);
    }
    elsif ($dynamic_limit < 0)
    {
        push @static_connectors, $connector;
    }
    else
    {
        push @dynamic_connectors, $connector;
    }
    synchronizer::set_not_ready();
}
# Called when a connector completed its initialization: frees its slot in
# the window and schedules the waiting work.
sub connector_initialized($)
{
    $current_window -= 1;
    schedule();
}
# Called when a connector failed to initialize: frees its slot in the
# window, takes it out of the arity and schedules the waiting work.
sub connector_failed($)
{
    $current_window -= 1;
    $arity -= 1;
    schedule();
}
# Deploys as many waiting connectors as the window (and, for dynamic
# connectors, the arity limit) allows: static connectors first, then dynamic
# ones. When there is still room afterwards, a node that is not the root
# sends a steal request to its root unless one is already pending.
sub schedule()
{
    # Room for one more dynamic connector ?
    my $can_grow = sub {
        return (($current_window < $window)
                and (($dynamic_limit == 0) or ($arity < $dynamic_limit)));
    };
    while (scalar(@static_connectors) and ($current_window < $window))
    {
        deploy_connector(shift @static_connectors);
    }
    while (scalar(@dynamic_connectors) and $can_grow->())
    {
        deploy_connector(shift @dynamic_connectors);
    }
    if (not $general::root and $can_grow->() and not $steal_request_sent)
    {
        my $parameters =
            "$average_connection_time $average_user_time $local_idleness";
        communicator::get_root()->send_message(
            taktuk::encode($taktuk::steal, $parameters));
        diagnostic::debug("Steal request sent with parameters :".
                          $parameters);
        $steal_request_sent = 1;
    }
    else
    {
        diagnostic::debug("Nothing done in scheduler, static work : "
                          .scalar(@static_connectors).", dynamic work : "
                          .scalar(@dynamic_connectors));
    }
}
# True when no connector, static or dynamic, is waiting to be deployed.
sub is_idle()
{
    return !(scalar(@static_connectors) || scalar(@dynamic_connectors));
}
# Gives dynamic connectors to the waiting thieves, as long as there are both
# connectors to give and thieves to serve. Each served thief receives a
# 'work' message made of packed connectors and leaves the waiting list.
# Returns a true value when there was something to give on entry.
sub send_work()
{
my $work_to_give = scalar(@dynamic_connectors) && scalar(@waiting_thieves);
my $work_given = $work_to_give;
diagnostic::debug("Theft, dynamic connectors : ".scalar(@dynamic_connectors)
.", waiting thieves : ".scalar(@waiting_thieves)
.", work to give : $work_to_give"); 
while ($work_to_give)
{
my $thief = shift @waiting_thieves;
my $last_given = $thief->{last_given};
my $to_give;
my $given=0;
my $workpack="";
if ($last_given)
{
# Twice what was given last time, capped by a fraction of what is available.
# NOTE(review): $available is computed from the last index (count - 1) and
# the divisions are not integer divisions, so the cap can be fractional or
# 0 - confirm the intended formula. last_given is set to 0 by connector::new
# and is not updated in this sub.
my $available = $#dynamic_connectors;
$available /= 2;
$available = $available/2 + $available%2;
$to_give = $last_given * 2;
$to_give = $available if $to_give > $available;
}
else
{
# First request of this thief: a single connector.
$to_give = 1;
}
for ($given=0; $given < $to_give; $given++)
{
my $connector_to_deploy = shift @dynamic_connectors;
$workpack = $workpack.$connector_to_deploy->pack();
}
$thief->send_message(taktuk::encode($taktuk::work,
$workpack));
diagnostic::debug("Work sent : ".$workpack);
$work_to_give = scalar(@dynamic_connectors) && scalar(@waiting_thieves);
}
return $work_given;
} 
# Handles the answer to a steal request: the connectors packed in the given
# buffer are queued as dynamic connectors, then the work is scheduled and
# shared with the waiting thieves.
sub dispatch_work($)
{
    my $workpack = shift;
    while ($workpack)
    {
        (my $connector, $workpack) = connector::unpack($workpack);
        diagnostic::debug("Unpacked connector: $connector");
        push(@dynamic_connectors, $connector);
    }
    # The pending request has been answered.
    $steal_request_sent = 0;
    schedule();
    send_work();
}
# Handles a steal request coming from a connector.
#   arg 1: connector asking for work
#   arg 2: parameters of the request, forwarded untouched
# The thief is served at once when dynamic work is available. If thieves are
# still waiting afterwards, the request is forwarded to the root unless this
# node is the root or a request is already pending.
sub theft_handler($$)
{
    my ($connector, $parameters) = @_;
    push(@waiting_thieves, $connector);
    send_work();
    if (scalar(@waiting_thieves))
    {
        if (not $general::root and not $steal_request_sent)
        {
            communicator::get_root()->send_message(
                taktuk::encode($taktuk::steal, $parameters));
            diagnostic::debug("Forwarded steal request, parameters : $parameters");
            $steal_request_sent = 1;
        }
        elsif ($general::root)
        {
            diagnostic::debug("Cannot satisfy steal request : no more dynamic ".
                              "work");
        }
        elsif ($steal_request_sent)
        {
            diagnostic::debug("Request already sent");
        }
    }
}
package timer;
use strict; use bytes;
use Time::HiRes;
# Timers with a timeout, kept in increasing order of expiration time.
our @registered_timers;
# Returns the current time in seconds, microseconds included as a fraction.
sub current_time()
{
    my @now = Time::HiRes::gettimeofday;
    return $now[0] + $now[1]/1000000;
}
# Creates a timer (class method).
#   arg 1 (invocant): class the timer is blessed into
#   arg 2: delay in seconds before the timer expires, false for a timer that
#          never expires
#   arg 3: handler called with the timer when it expires
# A timer with a delay is inserted in the list of registered timers, which
# is kept sorted by expiration time.
# Returns the timer.
sub register($$)
{
    my ($type, $timeout, $handler) = @_;
    my $now = current_time();
    my $timer = bless({ 'handler'=>$handler, 'birth'=>$now }, $type);
    if (not $timeout)
    {
        $timer->{timeout} = 0;
    }
    else
    {
        $timer->{timeout} = $now+$timeout;
        # Position just after the last timer that does not expire later.
        my $slot = scalar(@registered_timers);
        $slot-- while (($slot > 0) and
            ($registered_timers[$slot-1]->{timeout} > $timer->{timeout}));
        splice @registered_timers, $slot, 0, $timer;
    }
    diagnostic::debug("Registered new timer $timer, ".$timer->print().
                      ", list = @registered_timers");
    return $timer;
}
# Calls the handler of every registered timer that has expired, taking each
# of them out of the list first.
sub check_timeouts()
{
    my $now = current_time();
    # The list is sorted: expired timers are at its beginning.
    while (@registered_timers and
           ($registered_timers[0]->{timeout} <= $now))
    {
        my $expired = shift @registered_timers;
        $expired->{handler}->($expired);
        diagnostic::debug("Timeout handled for $expired, ".$expired->print());
    }
}
# Takes a timer out of the list of registered timers. Nothing is done for a
# timer without timeout; a timer that is not in the list is reported as a
# warning.
sub unregister()
{
    my $timer = shift;
    return unless $timer->{timeout};
    my ($position) = grep { $registered_timers[$_] == $timer }
                          (0 .. $#registered_timers);
    if (defined($position))
    {
        splice @registered_timers, $position, 1;
        diagnostic::debug("Unregistered timer $timer");
    }
    else
    {
        diagnostic::warning("Unregistering didn't found timer $timer");
    }
}
# Returns the time elapsed, in seconds, since the timer was created.
sub gettime($)
{
    my $now = current_time();
    my ($timer) = @_;
    return $now - $timer->{birth};
}
# Returns a one line description of the timer: creation time, expiration
# time and current time.
sub print($)
{
    my $now = current_time();
    my ($timer) = @_;
    my $description = "Timer created at $timer->{birth} timeouting at ".
                      "$timer->{timeout} current is $now";
    return $description;
}
package synchronizer;
use strict; use bytes;
# Current value of each state; messages are blocked while a state is 0.
our %states = (ready=>1, numbering=>1);
# Read by check_ready_state; not modified in this part of the file.
our $father_is_ready=0;
# True until the 'ready' message has been sent to the sinks once.
our $first_time=1;
# Not referenced in this part of the file.
our @ready_handlers = ();
our @pending_messages = ();
# For each awaited event: saved handlers and pending messages.
our %blocked;
# For each blocked message code: the event it is waiting for.
our %event_waited;
# Makes the node ready again if it is not ready and nothing prevents it
# anymore: the father is ready, the scheduler has no waiting work and every
# sink is initialized. The first time this happens the sinks are told so.
sub check_ready_state()
{
    return if $states{ready};
    if ($father_is_ready && scheduler::is_idle() &&
        communicator::no_pending_connectors())
    {
        if ($first_time)
        {
            $_->send_message($taktuk::ready)
                foreach (communicator::get_connections('sinks'));
            $first_time = 0;
        }
        dispatch_event('ready');
        diagnostic::debug("I'm ready");
    }
    else
    {
        diagnostic::debug("I'm not ready : father_is_ready = ".
                          "$father_is_ready, is_idle = ".
                          scheduler::is_idle().", no_pending_connector = ".
                          communicator::no_pending_connectors());
    }
}
# Leaves the ready state (if the node is in it): broadcast, downcast and
# spread messages are blocked until the 'ready' event is dispatched.
sub set_not_ready()
{
    return unless $states{ready};
    $states{ready} = 0;
    my @blocked_messages = ($taktuk::broadcast,
                            $taktuk::downcast,
                            $taktuk::spread);
    block_until_event('ready', @blocked_messages);
}
# Called when a connector reaches the initialized state: its timer (if any)
# is unregistered, the TakTuk code (when self propagating) and the arguments
# are sent to it, the communicator and the scheduler are notified and the
# ready state is checked.
sub initialization_complete($)
{
    my $connector = shift;
    if (exists($connector->{timer}))
    {
        $connector->{timer}->unregister;
    }
    if ($general::self_propagate)
    {
        my $code_message = taktuk::encode($taktuk::taktuk_code,
                                          $general::taktuk_code);
        $connector->send_message($code_message);
    }
    my $arguments = $connector->{arguments};
    diagnostic::debug("Arguments : ".$arguments);
    $connector->send_message(taktuk::encode($taktuk::arguments, $arguments));
    communicator::connector_initialized($connector);
    scheduler::connector_initialized($connector);
    diagnostic::debug("Connector $connector->{line} initialized");
    check_ready_state();
}
# Called when a connector is lost before being initialized: the failure is
# output, the scheduler is notified and the ready state is checked.
sub initialization_failed($)
{
    my ($connector) = @_;
    $connector->output('connector', "initialization failed");
    scheduler::connector_failed($connector);
    check_ready_state();
}
# Handler of the initialization timers: a connector that is still not
# initialized when its timer expires is killed (signal 9) and its
# descriptors are removed from the communicator.
sub initialization_timeout($)
{
    my $timer = shift;
    my $connector = $timer->{connector};
    if ($connector->{state} < $connector::initialized)
    {
        $connector->output('connector', "timeouted");
        kill 9, $connector->{pid};
        # Each part is looked at after the previous one has been removed.
        foreach my $part ('write', 'error')
        {
            communicator::remove_descriptor($connector, $connector->{$part})
                if (defined($connector->{$part}));
        }
    }
    else
    {
        diagnostic::warning("Bug, timeouted an initialized connector");
    }
}
# Leaves the numbering state (if the node is in it): the messages listed
# below are blocked until the 'numbering' event is dispatched.
sub set_not_numbering()
{
    return unless $states{numbering};
    $states{numbering} = 0;
    my @blocked_messages = ($taktuk::execute,
                            $taktuk::eof,
                            $taktuk::input,
                            $taktuk::message,
                            $taktuk::send_to,
                            $taktuk::synchronize,
                            $taktuk::quit,
                            $taktuk::taktuk_perl);
    block_until_event('numbering', @blocked_messages);
}
# Blocks a list of message codes until an event is dispatched.
#   arg 1: name of the event
#   others: message codes to block
# The handler of each code is saved and replaced by handlers::handler_blocked.
# A code that is already blocked is left untouched and reported as a warning.
sub block_until_event($@)
{
    my ($event, @messages) = @_;
    my %saved;
    foreach my $message (@messages)
    {
        if (exists($event_waited{$message}))
        {
            diagnostic::warning("Multiple blocking for $message");
            next;
        }
        $saved{$message} = handlers::get_handler($message);
        handlers::replace_handler($message, \&handlers::handler_blocked);
        $event_waited{$message} = $event;
    }
    $blocked{$event} = { handlers=>\%saved, pending=>[] };
}
# Signals an event: the matching state becomes 1, the handlers saved by
# block_until_event are put back, then the messages that were received in
# the meantime are processed in their order of arrival.
sub dispatch_event($)
{
    my $event = shift;
    my $handlers = $blocked{$event}->{handlers};
    my $pending_list = $blocked{$event}->{pending};
    $states{$event} = 1;
    foreach my $message (keys(%$handlers))
    {
        handlers::replace_handler($message, $handlers->{$message});
        delete $event_waited{$message};
    }
    # Pending messages are stored as consecutive (code, connector, body).
    while (scalar(@$pending_list))
    {
        my ($message, $connector, $body) = splice(@$pending_list, 0, 3);
        my $function = handlers::get_handler($message);
        $function->($message, $connector, $body);
        diagnostic::debug("Processing $message with body $body");
    }
    delete $blocked{$event};
}
# Stores a blocked message (code, connector, body) in the pending list of
# the event its code is waiting for.
sub add_pending_message($$$)
{
    my ($message, $connector, $body) = @_;
    my $event = $event_waited{$message};
    my $pending_list = $blocked{$event}->{pending};
    push(@$pending_list, $message, $connector, $body);
}
# Package handlers: one function per message code, plus the table used to
# look them up.
package handlers;
use strict; use bytes;
# Maps a message code to the code reference currently handling it
our %handlers;
# Installs the handler of a message code that has none yet.
#   $message : message code
#   second argument : code reference handling that message
# Registering a code that already has a handler is reported as an error and
# leaves the existing handler in place.
sub register_handler($$)
{
    my ($message, $handler) = @_;
    if (not defined($handlers{$message}))
    {
        $handlers{$message} = $handler;
    }
    else
    {
        diagnostic::error("Handler already defined for $message");
    }
}
# Substitutes the handler of a message code that already has one.
#   $message : message code
#   second argument : new code reference handling that message
# Replacing the handler of an unregistered code is reported as an error and
# installs nothing.
sub replace_handler($$)
{
    my ($message, $handler) = @_;
    if (not defined($handlers{$message}))
    {
        diagnostic::error("Handler not defined for $message");
    }
    else
    {
        $handlers{$message} = $handler;
    }
}
# Returns the code reference currently handling $message (undef when the
# message code has no registered handler).
sub get_handler($)
{
    my ($message) = @_;
    return $handlers{$message};
}
# Stand-in handler installed while a message kind is blocked: instead of
# handling the message it hands it over to the synchronizer's pending queue.
sub handler_blocked($$$)
{
    my ($message, $connector, $body) = @_;
    synchronizer::add_pending_message($message, $connector, $body);
}
# Handler for 'arguments' messages.
# The body is a sequence of packed elements: they are unpacked one by one,
# fed to the arguments parser, then options are parsed, commands processed
# and the scheduler run.
sub arguments($$$)
{
    my ($message, $connector, $body) = @_;
    my @arguments;
    while ($body)
    {
        # Each unpack yields the next element and what is left of the body
        my ($element, $rest) = taktuk::unpack($body);
        push @arguments, $element;
        $body = $rest;
    }
    arguments::fetch_arguments(@arguments);
    arguments::parse_options;
    main::process_commands;
    scheduler::schedule;
}
# Handler for 'broadcast' messages.
# Sends the body, wrapped in a 'spread' message, to every source and sink
# connection except the connector the message came from.
sub broadcast($$$)
{
    my ($message, $connector, $body) = @_;
    my @peers = (communicator::get_connections('sources'),
                 communicator::get_connections('sinks'));
    foreach my $peer (@peers)
    {
        next if ($peer == $connector);
        diagnostic::debug("Spreading message to $peer");
        $peer->send_message(taktuk::encode($taktuk::spread, $body));
    }
}
# Handler for 'downcast' messages.
# Same as broadcast but restricted to sinks: the body, wrapped in a 'spread'
# message, goes to every sink connection except the originating connector.
sub downcast($$$)
{
    my ($message, $connector, $body) = @_;
    my @peers = communicator::get_connections('sinks');
    foreach my $peer (@peers)
    {
        next if ($peer == $connector);
        diagnostic::debug("Spreading message to $peer");
        $peer->send_message(taktuk::encode($taktuk::spread, $body));
    }
}
# Handler for 'eof' messages: closes the 'read' side of every local command.
sub eof($$$)
{
    my ($message, $connector, $body) = @_;
    my @commands = communicator::get_connections('local_commands');
    for my $command (@commands)
    {
        $command->close('read');
        diagnostic::debug("Closed inputs for command $command");
    }
}
# Handler for 'execute' messages: runs the command line found in the body.
# Returns 0 without doing anything on the root node; otherwise returns the
# result of the command's run(), which is false when the command could not
# be started (a warning is then emitted).
# TAKTUK_PIDS is set, before the run, to the space separated pids of the
# local commands already registered.
sub execute($$$)
{
    my ($message, $connector, $body) = @_;
    return 0 if ($general::root);

    my $command = command::new(line=>$body,
        data_handler=>\&communicator::process_command_output,
        remove_handler=>\&communicator::remove_local_command);
    my @pids;
    foreach my $running (communicator::get_connections('local_commands'))
    {
        push @pids, "$running->{pid}";
    }
    $ENV{TAKTUK_PIDS} = join(' ', @pids);

    my $result = $command->run();
    if (not $result)
    {
        diagnostic::warning("Giving up command $body");
    }
    else
    {
        communicator::add_local_command($command);
    }
    return $result;
}
# Handler for 'forward_up' messages.
# On the root node the body is processed as a message of its own; on any
# other node it is re-sent, still wrapped as 'forward_up', to every source.
sub forward_up($$$)
{
    my ($message, $connector, $body) = @_;
    if (not $general::root)
    {
        foreach my $upper (communicator::get_connections('sources'))
        {
            my $forwarded = taktuk::encode($taktuk::forward_up, $body);
            $upper->send_message($forwarded);
        }
    }
    else
    {
        communicator::process_message($connector, $body);
    }
}
# Handler for 'input' messages: writes the body to the {read} handle of
# every local command, reporting a system diagnostic for each failed write.
sub input($$$)
{
    my ($message, $connector, $body) = @_;
    my @commands = communicator::get_connections('local_commands');
    foreach my $command (@commands)
    {
        unless (taktuk::syswrite($command->{read}, $body))
        {
            diagnostic::system;
        }
    }
}
# Handler for 'message' messages.
# Passes the message on to the outgoing connector and increments its count
# of pending messages. When that count becomes exactly zero and a timer is
# attached to the connector, the timer is unregistered and dropped.
sub message($$$)
{
    my ($message, $connector, $body) = @_;
    my $control = communicator::get_outgoing;
    $control->send_message(taktuk::encode($message, $body));
    $control->{pending_messages}++;
    return unless ($control->{pending_messages} == 0);
    return unless (exists($control->{timer}));
    $control->{timer}->unregister;
    delete $control->{timer};
}
# Handler for 'numbering' messages.
# The body is "<rank> <count>": this node's rank and the total node count.
# Both are stored in the general package and exported as TAKTUK_RANK and
# TAKTUK_COUNT. Each sink then receives its own numbering message and is
# assigned the rank interval [{min}, {max}], whose width is the value stored
# on the sink under the reduce_count key. Finally the 'numbering' event is
# dispatched, releasing the messages blocked on it.
sub numbering($$$)
{
    my ($message, $connector, $body) = @_;
    my ($rank, $count) = split / /,$body;
    diagnostic::debug("I'm $rank among $count");
    ($general::rank, $general::count) = ($rank, $count);
    @ENV{qw(TAKTUK_RANK TAKTUK_COUNT)} = ($rank, $count);

    # First rank not attributed yet: children start right after this node
    my $next = $rank+1;
    $general::child_min = $next;
    foreach my $sink (communicator::get_connections('sinks'))
    {
        if ($sink == $connector)
        {
            diagnostic::warning("Bug : numbering coming from a sink");
            next;
        }
        $sink->send_message(taktuk::encode($taktuk::numbering,
                                           "$next $count"));
        $sink->{min} = $next;
        $next += $sink->{$taktuk::reduce_count};
        $sink->{max} = $next-1;
    }
    $general::child_max = $next-1;
    synchronizer::dispatch_event('numbering');
}
# Filehandles opened by output(), indexed by file descriptor number
our %filehandles;
# Handler for 'output' messages.
# On the root node the body holds a packed file descriptor number followed
# by the data to write: a handle on that descriptor is opened on first use,
# cached in %filehandles, and the data is written to it. Any other node only
# emits a warning.
sub output($$$)
{
    my ($message, $connector, $body) = @_;
    if (not $general::root)
    {
        diagnostic::warning("Output message received on non root node");
    }
    else
    {
        my ($fd, $remaining) = taktuk::unpack($body);
        unless (exists($filehandles{$fd}))
        {
            open ($filehandles{$fd}, ">&=", $fd) or diagnostic::system;
        }
        taktuk::syswrite($filehandles{$fd}, $remaining) or diagnostic::system;
    }
}
# Handler for 'ping' messages: answers with a 'pong' on the same connector.
sub ping($$$)
{
    my ($message, $connector, $body) = @_;
    $connector->send_message($taktuk::pong);
}
# Handler for 'pong' messages: only traces the reception.
sub pong($$$)
{
    my ($message, $connector, $body) = @_;
    diagnostic::debug("pong");
}
# Handler for 'quit' messages: asks the communicator to terminate.
sub quit($$$)
{
    my ($message, $connector, $body) = @_;
    communicator::terminate;
}
# Handler for 'ready' messages: records that the father is ready and lets
# the synchronizer re-evaluate the ready state.
sub ready($$$)
{
    my ($message, $connector, $body) = @_;
    diagnostic::debug("Ready received");
    $synchronizer::father_is_ready = 1;
    synchronizer::check_ready_state;
}
# Timer callback registered by wait_message: sends a 'timeout' message on
# the connector attached to the timer and counts it as one more pending
# message on that connector.
sub recv_timeout($)
{
    my ($timer) = @_;
    my $waiting = $timer->{connector};
    $waiting->send_message($taktuk::timeout);
    $waiting->{pending_messages}++;
}
# State of the reduce operations, all indexed by reduce type.
# Value accumulated so far for each reduce
our %reduce_data;
# Index of the last sink whose value has been folded in (-1 when none);
# a key exists only while the corresponding reduce is in progress
our %reduce_rank_complete;
# Function implementing each reduce type. It is called as
# (rank, new_value, data): rank -1 asks for the initial value, rank -2 asks
# for the final value, any other rank folds new_value into data
our %reduce_handler = ($taktuk::reduce_count => \&reduce_count,
$taktuk::reduce_tree => \&reduce_tree);
# Reduce function counting nodes.
#   $rank      : -1 to get the initial value, -2 to finalize, otherwise the
#                index of the sink being folded in
#   $new_value : value reported by that sink (fold step only)
#   $data      : value accumulated so far
# The initial value is 1 and folding adds $new_value to $data. At
# finalization a non root node returns $data unchanged, whereas the root
# decrements it, starts the numbering with "0 <data>" and returns undef.
sub reduce_count ($$$)
{
    my ($rank, $new_value, $data) = @_;
    return 1 if ($rank == -1);
    return $data + $new_value if ($rank != -2);
    return $data unless ($general::root);
    $data--;
    numbering($taktuk::numbering, communicator::get_root, "0 $data");
    return undef;
}
# Reduce function building a textual description of the deployment tree.
#   $rank      : -1 to get the initial value, -2 to finalize, otherwise the
#                index of the sink being folded in
#   $new_value : value reported by that sink (fold step only)
#   $data      : text accumulated so far
# The accumulated text has the form of a Perl array literal: the initial
# value opens it with this node's description and finalization closes it.
# After the initial and fold steps, the request is passed on to the next
# initialized sink, one sink at a time.
sub reduce_tree($$$)
{
my $rank = shift;
my $new_value = shift;
my $data = shift;
my $result;
if ($rank == -1)
{
# Initial value: opening bracket and description of this node
$result = "['$general::host ($general::rank, ".
"$synchronizer::states{ready})'";
}
elsif ($rank == -2)
{
if ($general::root)
{
# NOTE(review): string eval of text assembled from values received from
# other nodes; safe only as long as every peer is trusted — confirm
eval('$new_value = '.$data."]");
general::print_tree("",$new_value);
return undef;
}
else
{
return $data."]";
}
}
else
{
# Fold step: append the subtree reported by the sink
$result = $data.",".$new_value; 
}
my @sinks = communicator::get_connections('sinks');
$rank++;
# Sinks that are not initialized yet are not queried: they are listed as
# 'connecting' and skipped
while (($rank <= $#sinks) and
($sinks[$rank]->{state} < $connector::initialized))
{
$result .= ", ['connecting $sinks[$rank]->{peer}']";
$rank++;
}
# Everything before $rank is now accounted for
$reduce_rank_complete{$taktuk::reduce_tree} = $rank-1;
# Ask the next initialized sink, if any, for its own subtree
$sinks[$rank]->send_message(taktuk::encode($taktuk::reduce,
$taktuk::reduce_tree))
if ($rank <= $#sinks);
return $result;
}
# Handler for 'reduce' messages: folds, in sink order, the values reported
# for one reduce type.
#   $message   : message code (unused)
#   $connector : connector the value comes from
#   $body      : encoded pair (reduce type, value)
# The first message of a given type starts the reduce: the slot of every
# sink is cleared and the accumulated data is set to the initial value given
# by the type's function (rank -1). The received value is stored on
# $connector, then the sinks are folded in order for as long as the next one
# has a defined value. Once every sink is folded, the function is called
# with rank -2 to finalize; a defined result is sent as a reduce message on
# the connector returned by communicator::get_root, and the reduce is closed.
# A type without reduce function is reported and the message is dropped.
sub reduce($$$)
{
    my $message = shift;
    my $connector = shift;
    my $body = shift;
    my @sinks = communicator::get_connections('sinks');
    my $i;
    my ($type,$value) = taktuk::decode($body);
    if (not exists($reduce_handler{$type}))
    {
        # diagnostic::error returns to its caller: going on would
        # dereference the missing function below, which is fatal
        diagnostic::error("Handler not defined for reduce $type");
        return;
    }
    if (not exists($reduce_rank_complete{$type}))
    {
        foreach my $sink (@sinks)
        {
            $sink->{$type} = undef;
        }
        $reduce_rank_complete{$type} = -1;
        $reduce_data{$type} = &{$reduce_handler{$type}}(-1, undef, undef);
    }
    $connector->{$type} = $value;
    $i = $reduce_rank_complete{$type} + 1;
    # The reduce function may itself advance $reduce_rank_complete{$type},
    # hence $i is recomputed from it after each fold
    while (($i <= $#sinks) and defined($sinks[$i]->{$type}))
    {
        $reduce_rank_complete{$type}++;
        $reduce_data{$type} = &{$reduce_handler{$type}}($i,
            $sinks[$i]->{$type}, $reduce_data{$type});
        $i = $reduce_rank_complete{$type} + 1;
    }
    if ($i > $#sinks)
    {
        $reduce_data{$type} = &{$reduce_handler{$type}}(-2, undef,
            $reduce_data{$type});
        if (defined($reduce_data{$type}))
        {
            my $reply_connector = communicator::get_root;
            $reply_connector->send_message(taktuk::encode( $taktuk::reduce,
                taktuk::encode($type, $reduce_data{$type})));
        }
        delete $reduce_rank_complete{$type};
    }
}
# Re-runs the reduce handler once for every reduce currently in progress,
# as if the connector returned by communicator::get_root had reported a
# value. The body passed is the reduce type immediately followed by "0".
sub check_ongoing_reduces()
{
    my @ongoing = keys(%reduce_rank_complete);
    foreach my $reduce_type (@ongoing)
    {
        reduce($taktuk::reduce, communicator::get_root, $reduce_type . 0);
    }
}
# Handler for 'send_to' messages: routes a message towards a set of ranks.
#   $message   : message code, reused for the messages sent further
#   $connector : connector the message comes from (unused)
#   $body      : packed destination set followed by the message to deliver
# The decoded destination set is consumed as consecutive (min, max) rank
# pairs, processed in increasing order. Ranks below this node's rank, or
# above $general::child_max, are sent on the connector returned by
# communicator::get_root; ranks falling in a sink's [{min}, {max}] interval
# are sent to that sink; this node's own rank triggers a local delivery.
sub send_to($$$)
{
my $message = shift;
my $connector = shift;
my $body = shift;
my ($to, $remaining) = taktuk::unpack($body);
diagnostic::debug("Message to $to and I'm $general::rank");
my $new_message;
my @destination_list = taktuk::decode_set($to);
# Intervals to route upward / to the sink currently being filled
my @upward = ();
my @downward = ();
my $delivery = 0;
my $min = shift @destination_list;
my $max = shift @destination_list;
# Step 1: everything below this node's rank goes upward. An interval that
# straddles the rank is split, its upper part being put back in the list
while (defined($min) and ($min < $general::rank))
{
if ($max >= $general::rank)
{
push @upward, $min, $general::rank-1;
unshift @destination_list, $general::rank, $max;
}
else
{
push @upward, $min, $max;
}
$min = shift @destination_list;
$max = shift @destination_list;
}
# Step 2: this node itself is a destination; the rest of the interval, if
# any, starts right after its rank
if (defined($min) and ($min == $general::rank))
{
$delivery = 1;
if ($max == $general::rank)
{
$min = shift @destination_list;
$max = shift @destination_list;
} 
else
{
$min = $general::rank+1;
}
}
# Step 3: distribute the intervals covered by the children among the sinks,
# $i being the index of the sink currently considered
my $i=0;
my @sinks = communicator::get_connections('sinks');
while (defined($min) and ($min <= $general::child_max))
{
# The part beyond the last child does not belong to this subtree
if ($max > $general::child_max)
{
push @upward, $general::child_max+1, $max;
$max = $general::child_max;
}
# Move to the sink whose interval may contain $min, sending to each sink
# left behind what was accumulated for it
while (($i <= $#sinks) and ($min > $sinks[$i]->{max}))
{
if (scalar(@downward))
{
$new_message = taktuk::encode($message,
taktuk::pack(taktuk::encode_set(@downward)).$remaining);
$sinks[$i]->send_message($new_message);
@downward = ();
}
$i++;
}
# The interval begins before the current sink's first rank: keep only that
# uncovered part here and put the covered part back in the list
if (($i <= $#sinks) and ($min < $sinks[$i]->{min}) and
($max >= $sinks[$i]->{min}))
{
unshift @destination_list, $sinks[$i]->{min}, $max;
$max = $sinks[$i]->{min}-1;
}
# Ranks that no sink covers cannot be reached
if (($i > $#sinks) or ($min < $sinks[$i]->{min}))
{
diagnostic::warning("Send problem, ".
(($min != $max)?"$min-$max":"$min")." not available anymore");
}
else
{
# Keep only the part held by the current sink, the rest is put back
if ($max > $sinks[$i]->{max})
{
unshift @destination_list, $sinks[$i]->{max}+1, $max;
$max = $sinks[$i]->{max};
}
push @downward, $min, $max;
}
$min = shift @destination_list;
$max = shift @destination_list;
}
# Send what was accumulated for the last sink considered
if (scalar(@downward))
{
$new_message = taktuk::encode($message,
taktuk::pack(taktuk::encode_set(@downward)).$remaining);
$sinks[$i]->send_message($new_message);
}
# Step 4: whatever remains is beyond the children and goes upward
while (defined($min))
{
push @upward, $min, $max;
$min = shift @destination_list;
$max = shift @destination_list;
}
if (scalar(@upward))
{
# The root has nobody above it: such destinations cannot exist
if ($general::root)
{
diagnostic::warning("Send problem, ".taktuk::encode_set(@upward).
" is(are) invalid destination(s)");
}
else
{
$new_message = taktuk::encode($message,
taktuk::pack(taktuk::encode_set(@upward)).$remaining);
communicator::get_root->send_message($new_message);
}
}
# Step 5: local delivery, done last, through the control connector
if ($delivery)
{
diagnostic::debug("Delivered message");
communicator::process_message(communicator::get_control, $remaining);
}
}
# Handler for 'spread' messages: first propagates the body to the other
# connections (see broadcast), then processes it locally as a message.
sub spread($$$)
{
    my ($message, $connector, $body) = @_;
    broadcast($message, $connector, $body);
    diagnostic::debug("Handling message locally");
    communicator::process_message($connector, $body);
}
# Handler for 'steal' messages: hands the request over to the scheduler's
# theft handler, then lets the synchronizer re-evaluate the ready state.
sub steal($$$)
{
    my ($message, $connector, $body) = @_;
    diagnostic::debug("Steal request : $body, connector : $connector");
    scheduler::theft_handler($connector, $body);
    synchronizer::check_ready_state;
}
# Handler for 'synchronize' messages: the body is itself a message, which is
# processed as if it had been received on the same connector.
sub synchronize($$$)
{
    my ($message, $connector, $body) = @_;
    communicator::process_message($connector, $body);
}
# Handler for 'taktuk_code' messages: stores the body in
# $general::taktuk_code.
sub taktuk_code($$$)
{
    my ($message, $connector, $body) = @_;
    diagnostic::debug("Taktuk code received");
    $general::taktuk_code = $body;
}
# Handler for 'taktuk_perl' messages: starts a perl interpreter fed with the
# taktuk package and, optionally, with the content of a script file.
#   $message   : message code (unused)
#   $connector : connector the message comes from
#   $body      : "[options --] [filename [arguments]]"
# The interpreter is started through execute() and reads its program from
# its standard input ("-").
sub taktuk_perl($$$)
{
my $message = shift;
my $connector = shift;
my $body = shift;
my ($options, $arguments, $filename);
# A standalone "--" separates the perl options from the rest; without it
# the whole body is taken as filename and arguments
if ($body =~ /(?:^|\s)--(?:$|\s)/)
{
($options, $arguments) = split /(?:^|\s)--(?:$|\s)/,$body,2;
$arguments = "" if not defined($arguments);
}
else
{
($options, $arguments) = ("", $body);
}
# First word is the script filename, the remainder its arguments
($filename, $arguments) = split /\s/, $arguments, 2;
$filename = "" if not defined($filename);
$arguments = "" if not defined($arguments);
diagnostic::debug("Taktuk perl execution, options [$options], filename ".
"[$filename], arguments [$arguments]");
my $command = execute($taktuk::execute, $connector,
"perl $options -- - $arguments");
# execute() returns a false value when nothing was started
if ($command)
{
# Replace the displayed command line by the original request
$command->{line} = "taktuk_perl";
$command->{line} .= " $body" if $body;
general::load_taktuk_package('taktuk');
# The program sent to the interpreter starts with the taktuk package,
# followed by a switch back to package main
taktuk::syswrite($command->{read}, $general::taktuk_package)
or diagnostic::system;
taktuk::syswrite($command->{read}, "\npackage main;\n")
or diagnostic::system;
# With no filename, or "-", the input is left open for the caller to feed
if ($filename and ($filename ne "-"))
{
# NOTE(review): the filename goes through a shell, presumably to expand
# variables and the like; a name containing quotes, backquotes or $(...)
# is interpreted by that shell — confirm this is acceptable
$filename = qx{echo "$filename"};
chomp($filename);
taktuk::syswrite($command->{read}, general::load_file($filename))
or diagnostic::system;
$command->close('read');
}
}
}
# Handler for 'wait_message' messages.
# Decrements the connector's count of pending messages. When the body holds
# a positive timeout and the count has become negative, a timer calling
# recv_timeout is registered and linked both ways with the connector.
sub wait_message($$$)
{
    my ($message, $connector, $body) = @_;
    $connector->{pending_messages}--;
    return unless ($body);
    return unless (($connector->{pending_messages} < 0) and ($body > 0));
    my $timer = timer->register($body,\&recv_timeout);
    $timer->{connector} = $connector;
    $connector->{timer} = $timer;
}
# Handler for 'work' messages: hands the body over to the scheduler, then
# lets the synchronizer re-evaluate the ready state.
sub work($$$)
{
    my ($message, $connector, $body) = @_;
    diagnostic::debug("Work received : $body");
    scheduler::dispatch_work($body);
    synchronizer::check_ready_state;
}
# Registers the handler of every message code known to this package.
sub init()
{
    # (message code, handler) pairs, registered in this order
    my @table = (
        $taktuk::arguments    => \&arguments,
        $taktuk::broadcast    => \&broadcast,
        $taktuk::downcast     => \&downcast,
        $taktuk::eof          => \&eof,
        $taktuk::execute      => \&execute,
        $taktuk::forward_up   => \&forward_up,
        $taktuk::input        => \&input,
        $taktuk::message      => \&message,
        $taktuk::output       => \&output,
        $taktuk::ping         => \&ping,
        $taktuk::pong         => \&pong,
        $taktuk::quit         => \&quit,
        $taktuk::ready        => \&ready,
        $taktuk::reduce       => \&reduce,
        $taktuk::numbering    => \&numbering,
        $taktuk::send_to      => \&send_to,
        $taktuk::spread       => \&spread,
        $taktuk::steal        => \&steal,
        $taktuk::synchronize  => \&synchronize,
        $taktuk::taktuk_code  => \&taktuk_code,
        $taktuk::taktuk_perl  => \&taktuk_perl,
        $taktuk::wait_message => \&wait_message,
        $taktuk::work         => \&work,
    );
    while (scalar(@table))
    {
        my $code = shift @table;
        my $handler = shift @table;
        handlers::register_handler($code, $handler);
    }
}
# Package main: translation of user commands into messages and start-up.
package main;
use strict; use bytes;
# Result of the fork done by fork_taktuk_interpreter; stays undefined as
# long as no fork took place
our $taktuk_interpreter = undef;
# True when commands must still be read from standard input once the
# current arguments are exhausted
our $interactive = 0;
# Requests the interactive mode even when arguments remain; not assigned in
# this part of the file — presumably set by an option, confirm
our $forced_interactive = 0;
# True to skip the numbering of the nodes; not assigned in this part of the
# file — presumably set by an option, confirm
our $no_numbering = 0;
# Set by the 'quit' command to stop the processing of commands
our $terminate = 0;
# Reads the next command from the arguments and translates it into a
# message.
# Returns the list ($message, $data):
#   $message : the encoded message, or "" when there is nothing to send
#              (end of arguments, empty command, 'quit', error)
#   $data    : defined only for 'file_input', the value returned by
#              general::open_file, whose content is to be sent as input
# Commands taking another command as argument (a destination set,
# 'broadcast', 'downcast', 'synchronize') are handled by recursion.
sub translate()
{
my $message = undef;
my $data = undef;
my @commands = qw(broadcast close downcast exec file_input input
line_input print_tree synchronize taktuk_perl quit);
my $found="";
my $number_found=0;
my $command;
# When true, the command separator is not skipped before returning
my $dont_skip = 0;
$command = arguments::get_next_command;
# NOTE(review): $arguments_ended is used unqualified here although it is
# written $arguments::arguments_ended elsewhere; presumably an 'our'
# declaration earlier in the file makes both the same variable — confirm
# A command made of lowercase letters may be abbreviated: it is expanded
# when it is the prefix of exactly one known command
if (not $arguments_ended and ($command =~ /^[a-z]+$/))
{
foreach my $fullname (@commands)
{
if ($fullname =~ m/^$command/)
{
$found = $fullname;
$number_found++;
}
}
$command = $found if ($number_found == 1);
}
if ($arguments_ended or $command =~ m/^\s*$/o)
{
$message = "";
$dont_skip = 1 if $arguments_ended;
}
elsif ($command =~ /^[0-9]/)
{
# A command starting with a digit is a set of destination ranks: the
# command that follows is wrapped in a send_to message
my @send_set = taktuk::decode_set($command);
if (scalar(@send_set))
{
# Separator handling is left to the nested call
$dont_skip = 1;
($message, $data) = translate();
if ($message)
{
$message = taktuk::encode($taktuk::send_to,
taktuk::pack(taktuk::encode_set(@send_set)).
$message);
}
}
else
{
diagnostic::error("Invalid set specification : $command");
$message = "";
}
}
elsif ($command eq "broadcast")
{
$dont_skip = 1;
($message, $data) = translate();
if ($message)
{
$message = taktuk::encode($taktuk::broadcast, $message);
}
}
elsif ($command eq "downcast")
{
$dont_skip = 1;
($message, $data) = translate();
if ($message)
{
$message = taktuk::encode($taktuk::downcast, $message);
}
}
elsif ($command eq "close")
{
$message = $taktuk::eof;
}
elsif ($command eq "exec")
{
my $parameters = arguments::get_parameters;
$message = taktuk::encode($taktuk::execute, $parameters);
}
elsif ($command eq "file_input")
{
# The message is only the input code: the caller sends the content read
# from $data
my $parameters = arguments::get_parameters;
$data = general::open_file($parameters);
$message = $taktuk::input if $data;
}
elsif ($command eq "input")
{
my $parameters = arguments::get_parameters;
$message = taktuk::encode($taktuk::input, $parameters);
}
elsif ($command eq "line_input")
{
# Same as 'input' with a newline appended
my $parameters = arguments::get_parameters;
$message = taktuk::encode($taktuk::input, $parameters."\n");
}
elsif ($command eq "print_tree")
{
$message = taktuk::encode($taktuk::reduce, $taktuk::reduce_tree);
}
elsif ($command eq "synchronize")
{
$dont_skip = 1;
($message, $data) = translate();
if ($message)
{
$message = taktuk::encode($taktuk::synchronize, $message);
}
}
elsif ($command eq "taktuk_perl")
{
my $parameters = arguments::get_parameters;
$message = taktuk::encode($taktuk::taktuk_perl, $parameters);
}
elsif ($command eq "quit")
{
# Nothing is sent: the flag stops the loop of process_commands
$terminate = 1;
$message = "";
}
else
{
# Also reached by an abbreviation matching several commands
diagnostic::error("Unknown command : $command");
$message = "";
}
arguments::skip_command_separator unless $dont_skip;
return ($message, $data);
}
# Forks the process in two.
# The parent initializes the terminal, processes the commands, waits for the
# child and exits with status 0: it never returns from this function. The
# child returns to the caller. A failed fork is fatal (exit status 1).
sub fork_taktuk_interpreter()
{
    $taktuk_interpreter = fork();
    unless (defined($taktuk_interpreter))
    {
        diagnostic::system;
        diagnostic::error("FATAL : cannot continue without forked interpreter");
        exit 1;
    }
    # fork returns 0 in the child, which simply goes on
    return unless ($taktuk_interpreter);
    arguments::initialize_terminal;
    process_commands();
    waitpid $taktuk_interpreter,0;
    exit 0;
}
# Submits a message through the control connector: it is processed directly
# as long as no interpreter has been forked, and sent on the connector
# otherwise.
sub handle_message($)
{
    my ($message) = @_;
    my $control = communicator::get_control;
    if (not defined($taktuk_interpreter))
    {
        communicator::process_message($control, $message);
    }
    else
    {
        $control->send_message($message);
    }
}
# Translates the pending commands into messages and submits them.
# On the root node, unless numbering is disabled, a node count reduce is
# first spread. Commands are then translated one by one until the arguments
# are exhausted or 'quit' is met; when a command comes with a data handle,
# its content is sent as a series of messages. In interactive mode the
# arguments are refilled once from standard input. Finally the root node
# submits a synchronized end of input followed by a synchronized quit.
sub process_commands()
{
    my ($message, $data);
    if ($general::root and !$no_numbering)
    {
        my $count_request = taktuk::encode($taktuk::reduce,
                                           $taktuk::reduce_count);
        handle_message(taktuk::encode($taktuk::spread, $count_request));
    }
    until ($arguments::arguments_ended or $terminate)
    {
        ($message, $data) = translate();
        if ($message)
        {
            if (not $data)
            {
                diagnostic::debug("Message to send : $message");
                handle_message($message);
            }
            else
            {
                my ($result, $buffer);
                # One message per chunk read, until end of file or error
                while ($result = sysread($data, $buffer,
                                         $taktuk::read_size/2))
                {
                    my $partial_message = taktuk::encode($message, $buffer);
                    diagnostic::debug("Message to send : $partial_message");
                    handle_message($partial_message);
                }
                # sysread returns undef on error and 0 at end of file
                diagnostic::system if not defined($result);
                CORE::close($data) if ($data != \*STDIN);
            }
        }
        if ($arguments::arguments_ended and $interactive)
        {
            arguments::fetch_arguments(\*STDIN);
            $interactive = 0;
        }
    }
    if ($general::root)
    {
        my $end_of_input = taktuk::encode($taktuk::broadcast, $taktuk::eof);
        handle_message(taktuk::encode($taktuk::synchronize, $end_of_input));
        my $shutdown = taktuk::encode($taktuk::spread,$taktuk::quit);
        handle_message(taktuk::encode($taktuk::synchronize, $shutdown));
    }
}
# Program start-up: initialization of the packages, parsing of the command
# line, then the main communication loop.
taktuk::no_flush(\*STDOUT);
general::init;
arguments::init;
communicator::init;
handlers::init;
synchronizer::set_not_ready;
arguments::fetch_arguments(@ARGV);
arguments::parse_options;
# Must come after handlers::init: blocking replaces registered handlers
synchronizer::set_not_numbering unless $no_numbering;
if ($general::root)
{
# The root has no father to wait for
$synchronizer::father_is_ready = 1;
general::load_taktuk_file if $general::self_propagate;
if ($arguments::arguments_ended or $forced_interactive)
{
diagnostic::debug("Interactive mode");
# With no command left, commands are read from standard input right away;
# otherwise standard input is only read once the arguments are exhausted
if ($arguments::arguments_ended)
{
arguments::fetch_arguments(\*STDIN);
}
else
{
$interactive = 1;
}
}
# Only the child of this fork continues below: the parent processes the
# commands and exits inside fork_taktuk_interpreter
fork_taktuk_interpreter;
}
else
{
# A non root node announces itself with the init string and its version
general::print($connector::init_string.$taktuk::VERSION."\n");
}
synchronizer::check_ready_state;
scheduler::schedule;
communicator::run;
diagnostic::debug("End of the taktuk code");
__END__
