#!/usr/bin/perl
# INTRODUCTION:
#   cluster_schedule is a free (in every sense) perl script with no warranty or
#   support.
#
# SYNOPSIS:
#   cluster_schedule [-n node01,node02,...] schedule_file
#
#     -n  distribute the jobs in schedule_file on these nodes.
#
# DESCRIPTION:
#   The script cluster_schedule distributes jobs defined in the schedule file on
#   a number of nodes. Each job is a list of commands which cluster_schedule will
#   run in order on one node. If one of the commands in a job fails (error code
#   is not zero) no more commands in the job are executed and the job is
#   considered failed. If all commands in a job complete successfully (error
#   codes are zero) the job is a success.
#
#   The nodes the jobs are run on can be defined on the command line or in the
#   schedule file. The nodes defined on the command line replace all nodes
#   defined in the schedule_file.
#
#   Each job is run on one node and each command is executed on the node using
#   ssh. Therefore, to use cluster_schedule make sure that all nodes are set up
#   to use automatic ssh authentication.
#
# SCHEDULE_FILE:
#   In schedule_file (see schedule_example file for an example) variables, nodes,
#   locks and jobs are defined. Variables are defined using the following syntax
#   and can only be defined in the outer scope:
#
#     VARIABLE_NAME=VALUE
#
#   The nodes which the jobs are run on can be set using the following syntax:
#
#     <nodes>
#       node01
#       node02
#       ...
#     </nodes>
#
#   where the nodes defined in the <nodes> scope are the ip addresses or DNS
#   names of the nodes. If a node is defined x times, up to x jobs can be
#   distributed to this node by cluster_schedule.
#
#   A job is a collection of commands defined in a <job> scope:
#
#     <job job_name>
#       command_1
#       command_2
#       command_3
#       ...
#     </job>
#
#   where job_name is the (optional) name of the job. All commands in a job will
#   be run in order on the same node (selected from the list of nodes given).
#   Variables given by $VARIABLE_NAME or ${VARIABLE_NAME} are substituted by
#   their value. To write a dollar sign use \$.
#
#   Locks can only be defined in the <job> scope. The purpose of locks is to
#   ensure that commands in a lock scope with the same name are not executed
#   simultaneously:
#
#     <job job_name>
#       <lock lock_name>
#         command_1
#         command_2
#         ...
#       </lock>
#     </job>
#
#   where lock_name is the name of the lock.

use strict;
use warnings;
use threads;
use threads::shared;

# Parser scopes used by read_script (the numeric values match the token ids
# of the original implementation).
use constant {
    SCOPE_MAIN  => 0,
    SCOPE_NODES => 1,
    SCOPE_JOB   => 2,
    SCOPE_LOCK  => 3,
};

# semaphore used when printing
my $sem_print : shared;
# semaphore used when popping and pushing jobs
my $sem_job : shared;
# semaphore used to make exclusive tasks
my $sem_task : shared;

# ssh command (-tt since we want to force pseudo-tty allocation)
my $ssh = '/usr/bin/ssh -tt';

# jobs are stored here (indices into the job list, shared between workers)
my @job_queue : shared;

# task locks: a key exists while some worker holds the lock of that name
my %task_locks : shared;

# job counters
my $jobs_left : shared        = 0;
my $jobs_successfull : shared = 0;
my $jobs_failed : shared      = 0;

# Print one timestamped line ("YYYYMMDD HH:MM:SS message") on stdout.
# Serialized through $sem_print so lines from different workers never mix.
sub log_print {
    my ($string_to_log) = @_;

    lock($sem_print);
    my ($sec, $min, $hour, $mday, $mon, $year) = localtime(time());
    printf "%4d%02d%02d %02d:%02d:%02d ",
        $year + 1900, $mon + 1, $mday, $hour, $min, $sec;
    print $string_to_log . "\n";
    return;
}

# Substitute \$, $NAME and ${NAME} in $string using the lookup table
# $vars_ref (keys are the literal tokens, e.g. '$NAME' and '${NAME}').
# Dies with the first unknown variable and $line_count in the message.
# Returns the substituted string.
sub replace_vars {
    my ($vars_ref, $string, $line_count) = @_;
    my $failed;

    my $substitute = sub {
        my ($token) = @_;
        return $vars_ref->{$token} if exists $vars_ref->{$token};
        # remember only the first unknown token for the error message
        $failed = $token unless defined $failed;
        return $token;
    };

    # Braces are escaped: an unescaped literal "{" is deprecated/rejected in
    # regular expressions by newer perl versions.
    $string =~ s/(\\\$|\$\w+|\$\{\w+\})/$substitute->($1)/ge;

    if (defined($failed)) {
        die "script error: unknown variable " . $failed
            . ", line " . $line_count . "\n";
    }
    return $string;
}

# Parse the schedule file $script_name.
#   $jobs_ref  - array ref; one [ job_name_or_undef, [ tasks ] ] entry is
#                appended per job, each task being
#                [ lock_name_or_undef, line_number, command ].
#   $nodes_ref - array ref receiving the names listed in <nodes> scopes, or
#                undef to ignore the nodes given in the file.
# Dies with a "script error: ..." message on any syntax problem and when the
# file defines no jobs.
sub read_script {
    my ($script_name, $jobs_ref, $nodes_ref) = @_;
    my $job_name;
    my $lock_name;
    my @tasks;
    my %vars  = ('\$' => '$');    # "\$" in a command yields a plain dollar
    my $scope = SCOPE_MAIN;

    open(my $script_fh, '<', $script_name)
        or die 'script error: unable to open script "' . $script_name . "\"\n";

    my $line_count = 0;
    while (my $line = <$script_fh>) {
        $line_count++;
        chomp($line);
        $line =~ s/#.*//;      # remove comments
        $line =~ s/^\s+//;     # remove leading blanks
        $line =~ s/\s+$//;     # remove trailing blanks
        next if $line eq '';

        if ($line =~ /^<\s*nodes\s*>$/) {
            # nodes token
            if ($scope != SCOPE_MAIN) {
                die "script error: <nodes> only valid in main scope, line "
                    . $line_count . "\n";
            }
            $scope = SCOPE_NODES;
        }
        elsif ($line =~ /^<\s*\/\s*nodes\s*>$/) {
            # nodes end token
            if ($scope != SCOPE_NODES) {
                die "script error: unmatched </nodes>, line "
                    . $line_count . "\n";
            }
            $scope = SCOPE_MAIN;
        }
        elsif ($line =~ /^<\s*job\s*(\w*)\s*>$/) {
            # job token (anchored like all other tokens)
            my $name = $1;
            if ($scope != SCOPE_MAIN) {
                die "script error: <job> only valid in main scope, line "
                    . $line_count . "\n";
            }
            $job_name = $name eq '' ? undef : $name;
            $scope    = SCOPE_JOB;
        }
        elsif ($line =~ /^<\s*\/\s*job\s*>$/) {
            # job end token
            if ($scope != SCOPE_JOB) {
                die "script error: unmatched </job>, line "
                    . $line_count . "\n";
            }
            if (scalar(@tasks) == 0) {
                die "script error: job has no tasks, line "
                    . $line_count . "\n";
            }
            push(@{$jobs_ref}, [ $job_name, [ @tasks ] ]);
            @tasks    = ();
            $job_name = undef;
            $scope    = SCOPE_MAIN;
        }
        elsif ($line =~ /^<\s*lock\s*(\w*)\s*>$/) {
            # lock token
            $lock_name = $1;
            if ($scope != SCOPE_JOB) {
                die "script error: <lock> only valid in <job> scope, line "
                    . $line_count . "\n";
            }
            if ($lock_name eq '') {
                die "script error: lock has no name, line "
                    . $line_count . "\n";
            }
            $scope = SCOPE_LOCK;
        }
        elsif ($line =~ /^<\s*\/\s*lock\s*>$/) {
            # lock end token
            if ($scope != SCOPE_LOCK) {
                die "script error: unmatched </lock>, line "
                    . $line_count . "\n";
            }
            $lock_name = undef;
            $scope     = SCOPE_JOB;
        }
        elsif ($line =~ /^<[^>]*>$/) {
            # unknown token
            die "script error: unknown token, line " . $line_count . "\n";
        }
        elsif ($scope == SCOPE_MAIN) {
            # main scope: only variable definitions are allowed
            if ($line =~ /^(\w+)\s*=\s*(.+)$/) {
                $vars{ '$' . $1 }        = $2;
                $vars{ '${' . $1 . '}' } = $2;
            }
            else {
                die "script error: parse error, line " . $line_count . "\n";
            }
        }
        elsif ($scope == SCOPE_NODES) {
            # nodes scope: ignored when the nodes came from the command line
            push(@{$nodes_ref}, $line) if defined($nodes_ref);
        }
        elsif ($scope == SCOPE_JOB or $scope == SCOPE_LOCK) {
            # job or lock scope: every remaining line is a command
            push(@tasks,
                [ $lock_name, $line_count,
                  replace_vars(\%vars, $line, $line_count) ]);
        }
        else {
            die "internal program error\n";
        }
    }
    close($script_fh);

    # An unterminated scope would otherwise silently drop its content.
    if ($scope != SCOPE_MAIN) {
        die "script error: unexpected end of file, missing closing token\n";
    }
    if (scalar(@{$jobs_ref}) == 0) {
        die "script error: no jobs found\n";
    }
    return;
}

# Run $command on $node through ssh.
# Returns ($exitcode, $error): $exitcode is 0 on success, -1 when ssh could
# not be started (then $error holds $!), 128+signal when the child was killed
# by a signal, otherwise the exit status of the command.
sub run_command_on_node {
    my ($command, $node) = @_;
    my $error;

    # The command is wrapped in single quotes for the local shell, so embedded
    # single quotes must be escaped or they would end the quoting early.
    (my $quoted = $command) =~ s/'/'\\''/g;

    # NOTE(review): the available copy of this file read "' /dev/null" here,
    # which appends /dev/null as an argument of the remote command. Restored as
    # a stdin redirection, presumably the original intent - confirm.
    system($ssh . ' ' . $node . " '" . $quoted . "' </dev/null");

    my $exitcode;
    if ($? == -1) {
        $exitcode = -1;
        $error    = $!;
    }
    elsif ($? & 127) {
        # killed by a signal: "$? >> 8" would be 0 and look like a success
        $exitcode = 128 + ($? & 127);
    }
    else {
        $exitcode = $? >> 8;
    }
    return ($exitcode, $error);
}

# Take the next job number from the queue. Blocks while the queue is empty
# but jobs are still unfinished; returns undef when no work is left.
sub shift_job {
    lock($sem_job);
    while (scalar(@job_queue) == 0 && $jobs_left) {
        cond_wait($sem_job);
    }
    return shift(@job_queue) if scalar(@job_queue);
    return;
}

# Put $job_number back on the queue and wake waiting workers if the queue
# was empty before.
sub push_job {
    my ($job_number) = @_;

    lock($sem_job);
    push(@job_queue, $job_number);
    if (scalar(@job_queue) == 1) {
        cond_broadcast($sem_job);
    }
    return;
}

# Count one job as finished successfully; wakes all waiting workers when it
# was the last unfinished job so they can terminate.
sub done_job_successfull {
    lock($sem_job);
    $jobs_left--;
    $jobs_successfull++;
    cond_broadcast($sem_job) if $jobs_left == 0;
    return;
}

# Count one job as failed; wakes all waiting workers when it was the last
# unfinished job so they can terminate.
sub done_job_failed {
    lock($sem_job);
    $jobs_left--;
    $jobs_failed++;
    cond_broadcast($sem_job) if $jobs_left == 0;
    return;
}

# Check that a trivial command can be run on $node_name.
# Returns 0 when the node works and 1 when it does not.
sub node_check {
    my ($node_name) = @_;

    my ($exitcode) = run_command_on_node("echo Hello", $node_name);
    if ($exitcode != 0) {
        log_print(' checking slave ' . $node_name . '... failed');
        return 1;
    }
    log_print(' checking slave ' . $node_name . '... success');
    return 0;
}

# Acquire the named task lock, waiting until no other worker holds it.
sub lock_task {
    my ($lock_name) = @_;

    lock($sem_task);
    while (defined($task_locks{$lock_name})) {
        cond_wait($sem_task);
    }
    $task_locks{$lock_name} = 1;
    return;
}

# Release the named task lock and wake every worker waiting for a lock.
sub unlock_task {
    my ($lock_name) = @_;

    lock($sem_task);
    delete($task_locks{$lock_name});
    cond_broadcast($sem_task);
    return;
}

# Thread body: fetch jobs until none are left and run their tasks in order on
# $node_name. $jobs_ref is the job list produced by read_script. A job stops
# at its first failing task.
sub node_worker {
    my ($node_name, $jobs_ref) = @_;

    while (defined(my $job_number = shift_job())) {
        my ($job_name, $tasks_ref) = @{ $jobs_ref->[$job_number] };
        $job_name = 'number ' . $job_number unless defined($job_name);

        my $task_total = scalar(@{$tasks_ref});
        my $success    = 1;
        my $i          = 1;

        # Lock currently held by this worker. Scoped per job: the state must
        # not leak into the next job, otherwise a job starting with the same
        # lock name would run unlocked (and later release a lock it does not
        # hold).
        my $held_lock;

        log_print('job ' . $job_name . ' started on node ' . $node_name);

        for my $task (@{$tasks_ref}) {
            my ($lock_name, $line, $cmd) = @{$task};

            # release the held lock when leaving its scope
            if (defined($held_lock)
                && (!defined($lock_name) || $held_lock ne $lock_name))
            {
                unlock_task($held_lock);
                $held_lock = undef;
            }
            # acquire the lock when entering a lock scope
            if (defined($lock_name) && !defined($held_lock)) {
                lock_task($lock_name);
                $held_lock = $lock_name;
            }

            log_print('job ' . $job_name . ' task ' . $i . ' of ' . $task_total
                . ' started on node ' . $node_name);

            my ($exitcode) = run_command_on_node($cmd, $node_name);

            if ($exitcode != 0) {
                log_print('job ' . $job_name . ' task ' . $i . ' of '
                    . $task_total . ' (line ' . $line . ') on node '
                    . $node_name . ' failed!');
                log_print('job ' . $job_name . ' on node ' . $node_name
                    . ' failed!');
                done_job_failed();
                $success = 0;
                last;
            }

            log_print('job ' . $job_name . ' task ' . $i . ' of ' . $task_total
                . ' success');
            $i++;
        }

        unlock_task($held_lock) if defined($held_lock);

        if ($success) {
            log_print('job ' . $job_name . ' on node ' . $node_name
                . ' success');
            done_job_successfull();
        }
    }
    return;
}

# Return the entries of @{$nodes_ref} that pass node_check, in order.
sub check_nodes {
    my ($nodes_ref) = @_;

    return grep { node_check($_) == 0 } @{$nodes_ref};
}

# Start one worker thread per entry of @{$nodes_ref} and append the thread
# objects to @{$threads_ref}.
sub start_node_workers {
    my ($nodes_ref, $threads_ref, $jobs_ref) = @_;

    foreach my $node_name (@{$nodes_ref}) {
        push(@{$threads_ref},
            threads->new(\&node_worker, $node_name, $jobs_ref));
    }
    return;
}

# Wait for every worker thread in @{$threads_ref} to finish.
sub join_node_workers {
    my ($threads_ref) = @_;

    $_->join() foreach @{$threads_ref};
    return;
}

# Parse @ARGV. Nodes given with -n are stored in @{$nodes_ref}.
# Returns the schedule file name; dies with a syntax message on bad usage.
sub parse_args {
    my ($nodes_ref) = @_;
    my $node_arg_seen = 0;
    my $script_name;

    while (defined(my $arg = shift @ARGV)) {
        if ($arg eq '-n') {
            if ($node_arg_seen) {
                die "error, second -n argument found\n"
                    . "syntax: cluster_schedule [-n node1[,node02]*] script_name\n";
            }
            $node_arg_seen = 1;

            my $nodes = shift @ARGV;
            if (!defined($nodes)) {
                die "error, no nodes found after -n\n"
                    . "syntax: cluster_schedule [-n node1[,node02]*] script_name\n";
            }
            @{$nodes_ref} = split(',', $nodes);
        }
        else {
            if (defined($script_name)) {
                die "syntax: cluster_schedule [-n node1,node2,...] script_name\n";
            }
            $script_name = $arg;
        }
    }

    if (!defined($script_name)) {
        die "syntax: cluster_schedule [-n node1,node2,...] script_name\n";
    }
    return $script_name;
}

# Format a number of seconds as " 1d 2h 3m 4s"; units that are zero are left
# out, and a zero duration is shown as " 0s". Every part has a leading blank.
sub format_duration {
    my ($seconds) = @_;
    my $text = '';

    foreach my $unit ([ 'd', 24 * 3600 ], [ 'h', 3600 ], [ 'm', 60 ], [ 's', 1 ]) {
        my ($suffix, $size) = @{$unit};
        my $count = int($seconds / $size);
        $seconds -= $count * $size;
        $text .= ' ' . $count . $suffix if $count;
    }
    return $text eq '' ? ' 0s' : $text;
}

# Program entry point: read the schedule, check the nodes, run all jobs and
# print a summary.
sub main {
    my @nodes;
    my @jobs;
    my $start_time = time();

    my $script_name = parse_args(\@nodes);

    # nodes on the command line replace the nodes defined in the file
    my $nodes_found_in_file = scalar(@nodes) == 0 ? 1 : 0;
    read_script($script_name, \@jobs, $nodes_found_in_file ? \@nodes : undef);

    if (scalar(@nodes) == 0) {
        die "error, no nodes defined\n";
    }

    log_print('found ' . scalar(@jobs) . ' jobs in file ' . $script_name);
    log_print('found ' . scalar(@nodes) . ' nodes '
        . ($nodes_found_in_file ? 'in file ' . $script_name : 'on command line')
        . ', checking...');

    my $node_count = scalar(@nodes);

    # checking nodes
    @nodes = check_nodes(\@nodes);
    if (scalar(@nodes) == 0) {
        log_print('no working nodes, exit');
        exit(1);
    }
    log_print('using ' . scalar(@nodes) . ' of ' . $node_count
        . ' available nodes');

    $jobs_left = scalar(@jobs);
    @job_queue = (0 .. $#jobs);

    my @threads;
    start_node_workers(\@nodes, \@threads, \@jobs);
    join_node_workers(\@threads);

    log_print('summary...');
    log_print(' successful jobs : ' . $jobs_successfull);
    log_print(' failed jobs : ' . $jobs_failed);
    log_print(' total running time:'
        . format_duration(time() - $start_time) . "\n");
    return;
}

# Run only when executed as a script, so the file can also be loaded with
# require/do (e.g. for testing) without starting the scheduler.
main() unless caller();