#!/usr/bin/perl
# INTRODUCTION:
# cluster_schedule is a free (in every sense) perl script with no warranty or
# support.
#
# SYNOPSIS:
# cluster_schedule [-n node01,node02,...] schedule_file
#
# -n distribute the jobs in schedule_file on these nodes.
#
# DESCRIPTION:
# The script cluster_schedule distributes jobs defined in the schedule file on
# a number of nodes. Each job is a list of commands which cluster_schedule will
# run in order on one node. If one of the commands in a job fails (its exit code
# is not zero), no further commands in the job are executed and the job is
# considered failed. If all commands in a job complete successfully (exit codes
# are zero), the job is a success.
#
# The nodes the jobs are run on can be defined on the command line or in the
# schedule file. Nodes given on the command line replace all nodes defined in
# the schedule_file.
#
# Each job is run on one node and each command is executed on the node using
# ssh. Therefore, to use cluster_schedule make sure that all nodes are set up
# to use automatic ssh authentication.
#
# SCHEDULE_FILE:
# In schedule_file (see schedule_example file for an example) variables, nodes,
# locks and jobs are defined. Variables are defined using the following syntax
# and can only be defined in the outer scope:
#
# VARIABLE_NAME=VALUE
#
# The nodes which the jobs are run on can be set using the following syntax:
#
# <nodes>
# node01
# node02
# ...
# </nodes>
#
# where the nodes listed in the <nodes> scope are the IP addresses or DNS
# names of the nodes. If a node is listed x times, up to x jobs can run on
# this node at the same time (one worker per listed entry).
#
# A job is a collection of commands defined in a <job> scope:
#
# <job job_name>
# command_1
# command_2
# command_3
# ...
# </job>
#
# where job_name is the (optional) name of the job. All commands in a job will
# be run in order on the same node (selected from the list of nodes given).
# Variables given by $VARIABLE_NAME or ${VARIABLE_NAME} are substituted by
# their value. To write a dollar sign use \$. Note that everything after a '#'
# on a line is treated as a comment, even inside a command.
#
# Locks can only be defined in the <job> scope. The purpose of locks is to
# ensure that commands in lock scopes with the same name are not executed
# simultaneously, even by jobs running on different nodes:
#
# <job job_name>
# <lock lock_name>
# command_1
# command_2
# ...
# </lock>
# </job>
#
# where lock_name is the name of the lock.
use strict;
use threads;
use threads::shared;
# Process-wide state. Everything except $ssh is declared ": shared"
# (threads::shared) so the worker threads all see a single copy.
# semaphore used when printing; log_print locks it so lines written by
# different threads do not interleave
my $sem_print : shared;
# semaphore used when popping and pushing jobs; shift_job also uses it as
# the condition variable it waits on
my $sem_job : shared;
# semaphore used to make tasks exclusive; guards %task_locks, and
# lock_task waits on it
my $sem_task : shared;
# ssh command (-tt since we want to force pseudo-tty allocation); not shared,
# each thread gets its own copy
my $ssh = '/usr/bin/ssh -tt';
# job queue: indices into the job list, consumed by shift_job
my @job_queue : shared;
# task locks: a key is present while some worker holds the lock of that name
my %task_locks : shared;
# job counters; $jobs_left is set by the main program before the workers start
my $jobs_left : shared;
my $jobs_successfull : shared = 0;
my $jobs_failed : shared = 0;
# Write one line to STDOUT, prefixed with the local time as
# "YYYYMMDD HH:MM:SS ". Holding $sem_print keeps lines from concurrent
# worker threads from being interleaved.
sub log_print {
    my ($string_to_log) = @_;
    lock($sem_print);
    my @now = localtime(time());
    my ($sec, $min, $hour, $mday, $mon, $year) = @now[0 .. 5];
    printf "%4d%02d%02d %02d:%02d:%02d %s\n",
        $year + 1900, $mon + 1, $mday, $hour, $min, $sec, $string_to_log;
}
# Expand schedule-file variables in one command line.
#
# $vars_ref   - hash ref whose keys are the literal tokens to replace
#               ('$NAME', '${NAME}' and the escape '\$') and whose values
#               are their replacement text.
# $string     - the command line to expand.
# $line_count - schedule-file line number, used only in the error message.
#
# Returns the expanded string. Dies with "script error: unknown variable ..."
# naming the first token that has no entry in $vars_ref.
sub replace_vars {
    my ($vars_ref, $string, $line_count) = @_;
    my $unknown;
    # Alternatives are tried left to right: the escape "\$", then "$NAME",
    # then "${NAME}". The braces are escaped because a literal unescaped "{"
    # in a regex is deprecated in modern perls.
    $string =~ s/(\\\$|\$\w+|\$\{\w+\})/
        exists($vars_ref->{$1})
            ? $vars_ref->{$1}
            : do { $unknown = $1 unless defined($unknown); $1 }/ge;
    if (defined($unknown)) {
        die "script error: unknown variable ".$unknown.", line ".$line_count."\n";
    }
    return $string;
}
# Parse a schedule file into jobs (and, optionally, nodes).
#
# $script_name - path of the schedule file.
# $jobs_ref    - array ref; every job is appended as [ $job_name, \@tasks ],
#                where $job_name is undef for an unnamed <job> and every task
#                is [ $lock_name, $line_number, $expanded_command ]
#                ($lock_name is undef outside a <lock> scope).
# $nodes_ref   - array ref receiving each line of a <nodes> scope, or undef
#                to ignore <nodes> scopes (nodes were given with -n).
#
# Dies with a "script error: ..." message on a syntax error, on a scope left
# open at end of file, or when the file defines no jobs.
sub read_script {
    my ($script_name, $jobs_ref, $nodes_ref) = @_;
    my $job_name = undef;
    my $lock_name = undef;
    my @tasks = ();
    # "\$" in a command stands for a literal dollar sign.
    my %vars = ('\$' => '$');
    # Current scope: 0 = main, 1 = <nodes>, 2 = <job>, 3 = <lock> in <job>.
    my $token_id = 0;
    my %open_tag_of = (1 => '<nodes>', 2 => '<job>', 3 => '<lock>');
    open(my $script_fh, '<', $script_name)
        or die 'script error: unable to open script "'.$script_name.'": '.$!."\n";
    my $line_count = 0;
    while (my $line = <$script_fh>) {
        $line_count++;
        chomp($line);
        $line =~ s/#.*//;    # remove comments (also a '#' inside a command)
        $line =~ s/^\s+//;   # remove leading blanks
        $line =~ s/\s+$//;   # remove trailing blanks
        next if $line eq '';
        if ($line =~ /^<\s*nodes\s*>$/) {
            if ($token_id != 0) {
                die "script error: <nodes> only valid in main scope, line ".$line_count."\n";
            }
            $token_id = 1;
        } elsif ($line =~ /^<\s*\/\s*nodes\s*>$/) {
            if ($token_id != 1) {
                die "script error: unmatched </nodes>, line ".$line_count."\n";
            }
            $token_id = 0;
        } elsif ($line =~ /^<\s*job\s*(\w*)\s*>$/) {
            # Anchored with ^ like every other tag, so a command that merely
            # ends in "<job>" is not taken for a scope opener.
            my $tmp = $1;
            if ($token_id != 0) {
                die "script error: <job> only valid in main scope, line ".$line_count."\n";
            }
            $job_name = $tmp eq '' ? undef : $tmp;
            $token_id = 2;
        } elsif ($line =~ /^<\s*\/\s*job\s*>$/) {
            if ($token_id != 2) {
                die "script error: unmatched </job>, line ".$line_count."\n";
            }
            if (!@tasks) {
                die "script error: job has no tasks, line ".$line_count."\n";
            }
            push(@{$jobs_ref}, [ $job_name, [ @tasks ] ]);
            @tasks = ();
            $job_name = undef;
            $token_id = 0;
        } elsif ($line =~ /^<\s*lock\s*(\w*)\s*>$/) {
            # "\s*" before ">" so "<lock name >" is accepted like the others.
            $lock_name = $1;
            if ($token_id != 2) {
                die "script error: <lock> only valid in <job> scope, line ".$line_count."\n";
            }
            if ($lock_name eq '') {
                die "script error: lock has no name, line ".$line_count."\n";
            }
            $token_id = 3;
        } elsif ($line =~ /^<\s*\/\s*lock\s*>$/) {
            if ($token_id != 3) {
                die "script error: unmatched </lock>, line ".$line_count."\n";
            }
            $lock_name = undef;
            $token_id = 2;
        } elsif ($line =~ /^<[^>]*>$/) {
            die "script error: unknown token, line ".$line_count."\n";
        } elsif ($token_id == 0) {
            # Main scope: only VARIABLE=VALUE assignments are allowed.
            if ($line =~ /^(\w+)\s*=\s*(.+)$/) {
                $vars{'$'.$1} = $2;
                $vars{'${'.$1.'}'} = $2;
            } else {
                die "script error: parse error, line ".$line_count."\n";
            }
        } elsif ($token_id == 1) {
            # <nodes> scope: one node per line; ignored when -n was given.
            if (defined($nodes_ref)) {
                push(@{$nodes_ref}, $line);
            }
        } elsif ($token_id == 2 or $token_id == 3) {
            # <job> or <lock> scope: a command, with variables expanded now.
            push(@tasks, [ $lock_name, $line_count, replace_vars(\%vars, $line, $line_count) ]);
        } else {
            die "internal program error\n";
        }
    }
    close($script_fh);
    # A scope still open here would otherwise silently drop its job.
    if ($token_id != 0) {
        die "script error: unclosed ".$open_tag_of{$token_id}." at end of file, line ".$line_count."\n";
    }
    if (!@{$jobs_ref}) {
        die "script error: no jobs found\n";
    }
}
# Run one command on a node through ssh and report how it ended.
#
# $command - command line, handed verbatim to the remote shell.
# $node    - node name or address, placed on the ssh command line as given.
#
# Returns ($exitcode, $error):
#   $exitcode - exit status reported by ssh; -1 if the command could not be
#               started at all; 128 + N if the local process was killed by
#               signal N.
#   $error    - $! when $exitcode is -1, undef otherwise.
sub run_command_on_node {
    my ($command, $node) = @_;
    my $error;
    # Single-quote the command for the local /bin/sh, rewriting each embedded
    # ' as '\'' so e.g. awk '{print $1}' reaches the remote shell unchanged
    # instead of being split and expanded locally.
    (my $quoted_command = $command) =~ s/'/'\\''/g;
    # stdin comes from /dev/null so ssh does not read the scheduler's own
    # stdin (several workers run ssh concurrently).
    system($ssh.' '.$node." '".$quoted_command."' < /dev/null");
    my $status = $?;
    my $exitcode;
    if ($status == -1) {
        $exitcode = -1;
        $error = $!;
    } elsif ($status & 127) {
        # Killed by a signal: the high byte is 0 and would read as success.
        $exitcode = 128 + ($status & 127);
    } else {
        $exitcode = $status >> 8;
    }
    return ($exitcode, $error);
}
# Take the next job index off the shared queue.
# Sleeps on $sem_job while the queue is empty but jobs are still
# outstanding; returns undef once the queue is empty and nothing is left.
sub shift_job {
    lock($sem_job);
    cond_wait($sem_job) while (!@job_queue && $jobs_left);
    return @job_queue ? shift(@job_queue) : undef;
}
# Append a job index to the shared queue. shift_job only sleeps while the
# queue is empty, so waiters are woken only when it goes from empty to
# holding one element.
sub push_job {
    my ($job_number) = @_;
    lock($sem_job);
    push(@job_queue, $job_number);
    cond_broadcast($sem_job) if (@job_queue == 1);
}
# Count one job as finished successfully. When it was the last outstanding
# job, wake every thread blocked in shift_job so it can exit.
sub done_job_successfull {
    lock($sem_job);
    $jobs_successfull++;
    $jobs_left--;
    cond_broadcast($sem_job) unless $jobs_left;
}
# Count one job as finished with a failure. When it was the last outstanding
# job, wake every thread blocked in shift_job so it can exit.
sub done_job_failed {
    lock($sem_job);
    $jobs_failed++;
    $jobs_left--;
    cond_broadcast($sem_job) unless $jobs_left;
}
# Probe a node by running "echo Hello" on it through ssh.
# Logs the outcome; returns 0 if the probe exited with status 0, else 1.
sub node_check {
    my ($node_name) = @_;
    my ($status) = run_command_on_node("echo Hello", $node_name);
    my $reachable = ($status == 0);
    log_print(' checking slave '.$node_name.'... '.($reachable ? 'success' : 'failed'));
    return $reachable ? 0 : 1;
}
# Acquire the named task lock, sleeping on $sem_task until no other worker
# holds it.
sub lock_task {
    my ($name) = @_;
    lock($sem_task);
    cond_wait($sem_task) while (defined($task_locks{$name}));
    $task_locks{$name} = 1;
}
# Release the named task lock and wake all workers waiting in lock_task.
sub unlock_task {
    my ($name) = @_;
    lock($sem_task);
    delete $task_locks{$name};
    cond_broadcast($sem_task);
}
# Worker thread body: keep taking job indices from the shared queue and run
# each job's tasks, in order, on $node_name until shift_job returns undef.
#
# $node_name - node on which every task of every job taken here is run.
# $jobs_ref  - array ref of [ $job_name, \@tasks ] as built by read_script;
#              each task is [ $lock_name, $line_number, $command ].
#
# Consecutive tasks sharing a lock name run while this worker holds that
# named lock. The lock is released when a task with a different (or no)
# lock follows, when a task fails, or when the job ends. A failed job is
# counted, not re-queued.
sub node_worker {
    my ($node_name, $jobs_ref) = @_;
    while (defined(my $job_number = shift_job())) {
        my ($job_name, $tasks_ref) = @{$jobs_ref->[$job_number]};
        my $task_count = scalar(@{$tasks_ref});
        # Lock currently held by this worker. It is scoped to one job, so a
        # name left over from the previous job can never make us skip
        # lock_task() or unlock a lock that another thread now owns.
        my $held_lock = undef;
        my $success = 1;
        my $i = 1;
        $job_name = 'number '.$job_number unless defined($job_name);
        log_print('job '.$job_name.' started on node '.$node_name);
        for my $task (@{$tasks_ref}) {
            my ($lock_name, $line, $cmd) = @{$task};
            # Drop the held lock if this task needs another one or none.
            if (defined($held_lock)
                && (!defined($lock_name) || $held_lock ne $lock_name)) {
                unlock_task($held_lock);
                $held_lock = undef;
            }
            # Take the task's lock unless we already hold it.
            if (defined($lock_name) && !defined($held_lock)) {
                lock_task($lock_name);
                $held_lock = $lock_name;
            }
            log_print('job '.$job_name.' task '.$i.' of '.$task_count.' started on node '.$node_name);
            my ($exitcode, $error) = run_command_on_node($cmd, $node_name);
            if ($exitcode != 0) {
                log_print('job '.$job_name.' task '.$i.' of '.$task_count.' (line '.$line.') on node '.$node_name.' failed!');
                log_print('job '.$job_name.' on node '.$node_name.' failed!');
                $success = 0;
                last;
            }
            log_print('job '.$job_name.' task '.$i.' of '.$task_count.' success');
            $i++;
        }
        if (defined($held_lock)) {
            unlock_task($held_lock);
        }
        if ($success) {
            log_print('job '.$job_name.' on node '.$node_name.' success');
            done_job_successfull();
        } else {
            done_job_failed();
        }
    }
}
# Probe every node in @$nodes_ref with node_check and return, in the
# original order, those that answered successfully.
sub check_nodes {
    my ($nodes_ref) = @_;
    return grep { node_check($_) == 0 } @{$nodes_ref};
}
# Start one node_worker thread per entry of @$nodes_ref (a node listed
# several times gets several workers) and append each thread object to
# @$threads_ref.
sub start_node_workers {
    my ($nodes_ref, $threads_ref, $jobs_ref) = @_;
    for my $node (@{$nodes_ref}) {
        push(@{$threads_ref}, threads->create(\&node_worker, $node, $jobs_ref));
    }
}
# Block until every thread in @$threads_ref has finished; their return
# values are discarded.
sub join_node_workers {
    my ($threads_ref) = @_;
    $_->join() for @{$threads_ref};
}
# Parse the command line, consuming @ARGV.
#
# Accepts at most one "-n node1,node2,..." option and exactly one schedule
# file name. The comma-separated list given with -n replaces the contents of
# @$nodes_ref. Any other argument is taken as the file name, which is
# returned. Dies with a usage message on a repeated -n, a -n without a
# value, or a missing or repeated file name.
sub parse_args {
    my ($nodes_ref) = @_;
    my $usage_n = "syntax: cluster_schedule [-n node1[,node02]*] script_name\n";
    my $usage = "syntax: cluster_schedule [-n node1,node2,...] script_name\n";
    my $have_nodes = 0;
    my $schedule;
    ARG:
    while (defined(my $arg = shift(@ARGV))) {
        if ($arg ne '-n') {
            die $usage if defined($schedule);
            $schedule = $arg;
            next ARG;
        }
        die "error, second -n argument found\n".$usage_n if $have_nodes;
        $have_nodes = 1;
        my $node_list = shift(@ARGV);
        die "error, no nodes found after -n\n".$usage_n if !defined($node_list);
        @{$nodes_ref} = split(/,/, $node_list);
    }
    die $usage if !defined($schedule);
    return $schedule;
}
# Format a duration in whole seconds as e.g. "1d 2h 3m 4s", leaving out
# zero fields; a zero duration is shown as "0s".
sub _format_duration {
    my ($seconds) = @_;
    my @parts;
    for my $unit ([ 'd', 24 * 3600 ], [ 'h', 3600 ], [ 'm', 60 ], [ 's', 1 ]) {
        my ($suffix, $size) = @{$unit};
        my $count = int($seconds / $size);
        $seconds -= $count * $size;
        push(@parts, $count.$suffix) if $count;
    }
    return @parts ? join(' ', @parts) : '0s';
}

# ---- main program ----
my @nodes = ();
my @jobs = ();
my $script_name = undef;
my $nodes_found_in_file = 0;
my $start_time = time();
$script_name = parse_args(\@nodes);
if (scalar(@nodes) == 0) {
    # No -n option: take the node list from the schedule file.
    $nodes_found_in_file = 1;
    read_script($script_name, \@jobs, \@nodes);
} else {
    read_script($script_name, \@jobs, undef);
}
if (scalar(@nodes) == 0) {
    die "error, no nodes defined\n";
}
log_print('found '.scalar(@jobs).' jobs in file '.$script_name);
log_print('found '.scalar(@nodes).' nodes '.($nodes_found_in_file ? 'in file '.$script_name : 'on command line').', checking...');
my $node_count = scalar(@nodes);
# Keep only the nodes that answer a trivial ssh command.
@nodes = check_nodes(\@nodes);
if (scalar(@nodes) == 0) {
    log_print('no working nodes, exit');
    exit(1);
}
log_print('using '.scalar(@nodes).' of '.$node_count.' available nodes');
# Queue every job index up front; one worker thread per node entry pulls
# from the queue until it is empty.
$jobs_left = scalar(@jobs);
@job_queue = (0 .. $#jobs);
my @threads;
start_node_workers(\@nodes, \@threads, \@jobs);
join_node_workers(\@threads);
my $elapsed = time() - $start_time;
log_print('summary...');
log_print(' successful jobs : '.$jobs_successfull);
log_print(' failed jobs : '.$jobs_failed);
# log_print appends the newline itself.
log_print(' total running time: '._format_duration($elapsed));