diff --git a/MODULE/HgFootLocker.pike b/MODULE/HgFootLocker.pike --- a/MODULE/HgFootLocker.pike +++ b/MODULE/HgFootLocker.pike @@ -1,206 +1,220 @@ inherit .FootLocker; protected ADT.Queue add_queue = ADT.Queue(); +//#define SSH_DEBUG 1 + +#ifdef SSH_DEBUG +#define SSH_DEBUG_OPT "-v" +#else +#define SSH_DEBUG_OPT "" +#endif + multiset remote_commands = (<"push", "pull", "incoming", "outgoing">); // TODO // verify presense of hg and ssh void verify_local_repository() { werror("verifying repository for %s\n", dir); - mapping res = run_hg_command("status"); + mapping res; + mixed err; + + if(err = Error.mkerror(catch(res = run_hg_command("status")))) { + werror("error running hg status: " + err->message() ); + throw(Error.Generic("Unable to run mercurial status command. Is mercurial not in PATH?\n")); + } werror("hg status returned status %d, stdout: %O, stderr: %O\n", res->exitcode, res->stdout, res->stderr); - + if(res->exitcode == 255) { // no repository present // ok, first step is to initialize an empty repository here res = run_hg_command("init"); if(res->exitcode) { // if the init failed for some reason, return us to a repository-less situation. Stdio.recursive_rm(get_dir("/.hg")); explain_hg_error(res); } Stdio.write_file(get_dir("/.hg/hgrc"), "[paths]\ndefault=" + configuration->source + "\n\n[extensions]\nrebase=\n\n[ui]\nusername=" + System.get_user() + " <" + System.get_user() + "@" + System.gethostname() + ">\n"); res = run_hg_command("pull", " --rebase -t internal:merge-local " + configuration->source); if(res->exitcode) { // if the pull failed for some reason, return us to a repository-less situation. // we may possibly be left with a partial pull, but at least there will be no data loss. Stdio.recursive_rm(get_dir("/.hg")); explain_hg_error(res); } res = run_hg_command("update"); if(res->exitcode == 1) { res = run_hg_command("merge", "-t internal:merge-local"); } else if(res->exitcode == 0) { // success } else { // if the update failed for some reason, return us to a repository-less situation. // we may possibly be left with a partial pull, but at least there will be no data loss. Stdio.recursive_rm(get_dir("/.hg")); explain_hg_error(res); } } else if(res->exitcode == 0) { // repository present - res = run_hg_command("outgoing " + configuration->source); + res = run_hg_command("outgoing", configuration->source); if(res->exitcode == 0) { pull_n_push_changes(); } } else { throw(Error.Generic("unable to determine state of footlocker repository: " + res->stderr + "\n")); } werror("repository successfully verified for %s\n", dir); } void explain_hg_error(mapping res) { string exp = sprintf("We ran the command %s, and it returned exitcode %d, stdout: %O, stderr: %O\n", res->command, res->exitcode, res->stdout, res->stderr); werror(exp); add_history(HISTORY_TYPE_ERROR, exp); throw(Error.Generic(exp)); } mapping run_hg_command(string command, string|void args) { string cmdstr = "hg " + command; - if(remote_commands[command]) cmdstr += " --ssh \"ssh -oUserKnownHostsFile=/dev/null -oStrictHostKeyChecking=no -i '" + configuration->private_key + "'\""; + if(remote_commands[command]) cmdstr += " --ssh \"ssh " + SSH_DEBUG_OPT + " -oUserKnownHostsFile=/dev/null -oStrictHostKeyChecking=no -i '" + configuration->private_key + "'\""; if(args) cmdstr += (" " + args); werror("%s -> running %s\n", dir, cmdstr); return Process.run(cmdstr, (["cwd": dir])) + (["command": cmdstr]); } int do_add_file(string path, int|void advisory) { add_queue->write(({path, advisory})); return 1; } int do_remove_file(string path) { run_hg_command("rm", path); return 1; } void do_run_commit(array ents) { mixed e; if(sizeof(add_queue)) { ADT.Queue aq = add_queue; add_queue = ADT.Queue(); mapping res = run_hg_command("add", sprintf("%{'%s' %}", (array)aq)); } mapping st = run_hg_command("status", "-m -a -r -d -n"); array files_affected = (st->stdout/"\n"); array files_to_ignore = ents - files_affected; ents = ents & files_affected; if(sizeof(ents)) { set_processing_state(PROCESSING_STATE_RECORDING); mapping resp = run_hg_command("commit", "-m 'QuarterMaster generated commit from " + gethostname() + ".\n\nFiles modified:\n\n" + sprintf("%{%s\n%}", ents) + "' " + sprintf("%{'%s' %}", ents)); add_history(HISTORY_TYPE_INFO, "Recorded changes to " + sizeof(ents) + " files.\n" + sprintf("%{%s\n%}", ents)); set_processing_state(PROCESSING_STATE_IDLE); if(resp->exitcode == 0) { e = catch(pull_n_push_changes()); } } else if (pull_needed) { pull_needed = 0; e = catch(pull_n_push_changes()); } else { set_processing_state(PROCESSING_STATE_IDLE); } if(e) throw(e); } void do_pull_incoming_changes() { set_processing_state(PROCESSING_STATE_RECEIVING); mapping r; r = pull_changes(); if(r->exitcode != 0) { catch(explain_hg_error(r)); } stop_processing_events(); r = update_changes(); if(r->exitcode == 0) { set_repository_state(REPOSITORY_STATE_CLEAN); // TODO include source and files affected. add_history(HISTORY_TYPE_INFO, "Received incoming changes."); } if(r->exitcode != 0) { set_repository_state(REPOSITORY_STATE_ERROR); catch(explain_hg_error(r)); } // TODO we need to have better error handling here. set_processing_state(PROCESSING_STATE_IDLE); start_processing_events(); } void pull_n_push_changes() { set_processing_state(PROCESSING_STATE_RECEIVING); mapping r; // mapping r = run_hg_command("incoming", configuration->source); // if(r->exitcode == 0) { r = pull_changes(); if(r->exitcode != 0) { catch(explain_hg_error(r)); } stop_processing_events(); r = update_changes(); if(r->exitcode != 0) { catch(explain_hg_error(r)); set_repository_state(REPOSITORY_STATE_ERROR); } // } else if(r->exit_code > 1) { // catch(explain_hg_error(r)); // } else { start_processing_events(); set_processing_state(PROCESSING_STATE_SENDING); r = push_changes(); // } // TODO we need to have better error handling here. set_processing_state(PROCESSING_STATE_IDLE); if(r->exitcode == 0) { add_history(HISTORY_TYPE_INFO, "Sent changes."); changes_pushed(); } if(!(<0,1>)[r->exitcode]) // zero is success, one is nothing to push. { explain_hg_error(r); set_repository_state(REPOSITORY_STATE_ERROR); } else { set_repository_state(REPOSITORY_STATE_CLEAN); } } mapping push_changes() { mapping res = run_hg_command("push", configuration->source); return res; } mapping update_changes() { mapping res = run_hg_command("update", "--check"); if(res->exitcode == 1) { res = run_hg_command("merge", "-t internal:merge-local"); } return res; } mapping pull_changes() { mapping res = run_hg_command("pull", "--rebase -t internal:merge-local " + configuration->source); return res; } diff --git a/quartermaster.pike b/quartermaster.pike --- a/quartermaster.pike +++ b/quartermaster.pike @@ -1,231 +1,237 @@ inherit Application.Backgrounder; constant QUARTERMASTER_VERSION = "0.1"; mapping config_defaults = ([ "configfile": "quartermaster.conf", "logfile": "/var/log/quartermaster.log", "daemon": 0, "help": 0, "version": 0 ]); string configfile; string logfile; mapping prefs; int shutting_down = 0; ControlSocket control_socket; mapping footlockers = ([]); protected void create(array(string) args) { ::create(args); } public int main(int argc, array(string) argv) { int daemon_mode=0; array args=Getopt.find_all_options(argv, ({ ({"daemon", Getopt.NO_ARG, ({"-d", "--daemon"}) }), ({"help", Getopt.NO_ARG, ({"-h", "--help"}) }), ({"version", Getopt.NO_ARG, ({"-v", "--version"}) }), ({"configfile", Getopt.HAS_ARG, ({"-f", "--configfile"}) }), ({"logfile", Getopt.HAS_ARG, ({"-l", "--logfile"}) }) }) ); mapping config = ([]); foreach(args, array arg) { switch(arg[0]) { case "daemon": config->daemon = 1; break; case "help": config->help = 1; break; case "version": config->version = 1; break; case "logfile": config->logfile = arg[1]; break; case "configfile": config->configfile = arg[1]; break; } } config = config_defaults + config; // we should get a fully populated config object with this. if(config->help) { display_help(); return 0; } if(config->version) { display_version(); return 0; } if(config->daemon) { daemon_mode=1; catch(System.setsid()); } logfile = config->logfile; configfile = config->configfile; if(!Stdio.is_file(configfile)) { werror("Error: config file %s does not exist\n", configfile); return 1; } if(daemon_mode) { mixed e = catch(Stdio.File(logfile, "cwa")); if(e) { werror("Unable to create log file: %s\n", Error.mkerror(e)->message()); return 2; } } write("Quartermaster %s starting\n", QUARTERMASTER_VERSION); if(enter_background(daemon_mode, logfile)) { return 0; } load_preferences(); call_out(configure_control_socket, 1); call_out(configure_footlockers, 1); signal(signum("SIGINT"), shutdown); return -1; } protected void display_help() { write("usage: quartermaster [-v|--version] [-h|--help] [-f yyy|--configfile=yyyy] [-d|--daemon] [-lxxxx|-logfile=xxxx]\n"); } protected void display_version() { write("quartermaster version %s\n", QUARTERMASTER_VERSION); } protected void load_preferences() { string f=Stdio.read_file(configfile); if(!f) error("Unable to read config file " + configfile + "\n"); else write("Read configuration from " + configfile + "\n"); mapping p=Config.read(f); prefs=p; } protected void configure_control_socket() { string sock_path; if(prefs->global && (sock_path = prefs->global->control_socket)) { werror("creating control socket at %s.\n", sock_path); control_socket = ControlSocket(sock_path, controlsocket_delegate(this)); } } protected void configure_footlockers() { foreach(glob("footlocker_*", indices(prefs));; string config_key) { write("Configuring footlocker from %s\n", config_key); mapping fl_config = prefs[config_key]; switch(fl_config->type) { case "hg": write("creating footlocker for hg source.\n"); FootLocker fl = HgFootLocker(fl_config, footlocker_delegate(this)); - fl->setup(); + mixed err; + if(err = catch(fl->setup())) { + werror("Error creating footlocker %s\n", config_key); + werror("Shutting down.\n"); + call_out(shutdown, 0); + return; + } footlockers[config_key] = fl; fl->start_watch(); break; default: werror("unknown footlocker type " + fl_config->type + ". Not configuring.\n"); } } } mapping status() { mapping fls = ([]); foreach(footlockers; string key; object fl) { fls[key] = fl->status(); } return fls; } mapping pause() { mapping fls = ([]); foreach(footlockers; string key; object fl) { fls[key] = fl->pause(); } return fls; } mapping resume() { mapping fls = ([]); foreach(footlockers; string key; object fl) { fls[key] = fl->resume(); } return fls; } mapping shutdown() { if(shutting_down) { werror("already shutting down\n"); return (["error": "SHUTDOWN_PENDING"]); } return do_shutdown(); } mapping do_shutdown() { shutting_down = 1; werror("shutdown requested\n"); foreach(footlockers; string key; object fl) { werror("requesting shutdown of %s\n", key); fl->shutdown(); } call_out(exit, 1.0, 0); return (["result": "OK"]); } void status_changed(object footlocker, mapping status) { if(control_socket) { string flname = search(footlockers, footlocker); control_socket->status_changed(flname, status); } } class footlocker_delegate(object controller) { void status_changed(object footlocker, mapping status) { controller->status_changed(footlocker, status); } } class controlsocket_delegate(object controller) { mapping status() { return controller->status(); } mapping shutdown() { return controller->shutdown(); } mapping pause() { return controller->pause(); } mapping resume() { return controller->resume(); } }