aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAvatar Ciaran McCreesh <ciaran.mccreesh@googlemail.com> 2009-12-27 00:03:50 +0000
committerAvatar Ciaran McCreesh <ciaran.mccreesh@googlemail.com> 2009-12-28 01:31:00 +0000
commited20e3bc7a0fcef8cfd75f07508318f99cb0d7d5 (patch)
tree3df85b6d39cd6bb8f796b7a7b5c0c67c8c9f7d5e
parente02985867ff9854c7827db3a53436cf05f41fee6 (diff)
downloadpaludis-ed20e3bc7a0fcef8cfd75f07508318f99cb0d7d5.tar.gz
paludis-ed20e3bc7a0fcef8cfd75f07508318f99cb0d7d5.tar.xz
Change how we execute syncs
-rw-r--r--src/clients/cave/cmd_sync.cc501
1 files changed, 291 insertions, 210 deletions
diff --git a/src/clients/cave/cmd_sync.cc b/src/clients/cave/cmd_sync.cc
index 0a76bc1..e8220c0 100644
--- a/src/clients/cave/cmd_sync.cc
+++ b/src/clients/cave/cmd_sync.cc
@@ -29,13 +29,18 @@
#include <paludis/util/make_named_values.hh>
#include <paludis/util/condition_variable.hh>
#include <paludis/util/thread.hh>
+#include <paludis/util/return_literal_function.hh>
#include <paludis/output_manager.hh>
#include <paludis/repository.hh>
#include <paludis/environment.hh>
#include <paludis/hook.hh>
#include <paludis/syncer.hh>
+#include <paludis/metadata_key.hh>
+#include <paludis/create_output_manager_info.hh>
+#include <tr1/functional>
#include <cstdlib>
#include <iostream>
+#include <list>
#include <map>
#include <set>
@@ -46,61 +51,16 @@ using namespace cave;
using std::cout;
using std::endl;
-namespace paludis
-{
- namespace n
- {
- struct output_manager;
- struct success;
- struct summary;
- }
-}
-
namespace
{
- struct TailAndLogOutputManager :
- OutputManager
- {
- TailAndLogOutputManager(const std::string &)
- {
- }
-
- virtual std::ostream & stdout_stream()
- {
- return std::cout;
- }
-
- virtual std::ostream & stderr_stream()
- {
- return std::cerr;
- }
-
- std::tr1::shared_ptr<const Sequence<std::string> > tail(const bool) const
- {
- return make_shared_ptr(new Sequence<std::string>);
- }
-
- void discard_log()
- {
- }
-
- const FSEntry log_file_name() const
- {
- return FSEntry("/dev/null");
- }
-
- void succeeded()
- {
- }
-
- virtual void message(const MessageType, const std::string &)
- {
- }
- };
+ typedef std::set<RepositoryName, RepositoryNameComparator> Repos;
struct SyncCommandLine :
CaveCommandCommandLine
{
+ args::ArgsGroup g_job_options;
+ args::SwitchArg a_sequential;
+
virtual std::string app_name() const
{
return "cave sync";
@@ -117,181 +77,326 @@ namespace
"are synced. Otherwise, all syncable repositories are synced.";
}
- SyncCommandLine()
+ SyncCommandLine() :
+ g_job_options(main_options_section(), "Job Options", "Job options."),
+ a_sequential(&g_job_options, "sequential", '\0', "Only perform one sync at a time.", false)
{
- add_usage_line("[repository ...]");
+ add_usage_line("[ --sequential ] [repository ...]");
}
};
- struct Message
+ std::string get_queue_name_func(
+ const std::tr1::shared_ptr<const Environment> & env,
+ const RepositoryName & r)
{
- NamedValue<n::output_manager, std::tr1::shared_ptr<TailAndLogOutputManager> > output_manager;
- NamedValue<n::success, bool> success;
- NamedValue<n::summary, std::string> summary;
- };
+ const std::tr1::shared_ptr<const Repository> repo(env->package_database()->fetch_repository(r));
+ if (repo->sync_host_key())
+ return repo->sync_host_key()->value();
+ else
+ return "(no host)";
+ }
+
+ struct Executive
+ {
+ virtual std::string queue_name() const = 0;
+ virtual std::string unique_id() const = 0;
+ virtual bool can_run() const = 0;
- typedef std::map<std::string, Message> Messages;
+ virtual void pre_execute_exclusive() = 0;
+ virtual void execute_threaded() = 0;
+ virtual void post_execute_exclusive() = 0;
+ };
- void do_one_sync_notifier(const RepositoryName & r, Mutex & notifier_mutex,
- Mutex & count_mutex, ConditionVariable & notifier_condition, int & np, int & na, int & nd,
- bool & finished, TailAndLogOutputManager & output_manager)
+ class Executor
{
- bool first(true);
- while (true)
- {
+ private:
+ int _pending;
+ int _active;
+ int _done;
+
+ typedef std::multimap<std::string, std::tr1::shared_ptr<Executive> > Queues;
+ typedef std::list<std::tr1::shared_ptr<Executive> > ReadyForPost;
+ Queues _queues;
+ ReadyForPost _ready_for_post;
+ Mutex _mutex;
+ ConditionVariable _condition;
+
+ void _one(const std::tr1::shared_ptr<Executive> executive)
{
- Lock lock(count_mutex);
- if (finished)
- return;
+ executive->execute_threaded();
- if (! first)
- {
- cout << format_general_spad(f::sync_repo_active(), stringify(r), np, na, nd);
- std::tr1::shared_ptr<const Sequence<std::string> > tail(output_manager.tail(true));
- if (tail && tail->begin() != tail->end())
- {
- for (Sequence<std::string>::ConstIterator t(tail->begin()), t_end(tail->end()) ;
- t != t_end ; ++t)
- cout << format_general_s(f::sync_repo_tail(), *t);
- }
- }
+ Lock lock(_mutex);
+ _ready_for_post.push_back(executive);
+ _condition.signal();
}
- Lock lock(notifier_mutex);
- notifier_condition.timed_wait(notifier_mutex, 10);
- first = false;
- }
- }
+ public:
+ Executor() :
+ _pending(0),
+ _active(0),
+ _done(0)
+ {
+ }
- void do_one_sync(const std::tr1::shared_ptr<Environment> & env, const RepositoryName & r, Mutex & mutex,
- Messages & messages, int & retcode, int & np, int & na, int & nd)
- {
- std::tr1::shared_ptr<TailAndLogOutputManager> output_manager(
- new TailAndLogOutputManager("sync-" + stringify(r)));
+ int pending() const
+ {
+ return _pending;
+ }
- bool done_decrement(false);
+ int active() const
+ {
+ return _active;
+ }
- try
- {
+ int done() const
{
- Lock lock(mutex);
- ++na;
- --np;
- cout << format_general_spad(f::sync_repo_starting(), stringify(r), np, na, nd);
- if (0 !=
- env->perform_hook(Hook("sync_pre")
- ("TARGET", stringify(r))
- ("NUMBER_DONE", stringify(nd))
- ("NUMBER_ACTIVE", stringify(na))
- ("NUMBER_PENDING", stringify(np))
- ).max_exit_status())
- throw SyncFailedError("Sync of '" + stringify(r) + "' aborted by hook");
+ return _done;
}
- const std::tr1::shared_ptr<const Repository> repo(env->package_database()->fetch_repository(r));
- bool result(false);
+ void add(const std::tr1::shared_ptr<Executive> & x)
{
- Mutex notifier_mutex;
- ConditionVariable notifier_condition;
- bool finished(false);
- Thread notifier_thread(std::tr1::bind(&do_one_sync_notifier, r,
- std::tr1::ref(notifier_mutex), std::tr1::ref(mutex),
- std::tr1::ref(notifier_condition),
- std::tr1::ref(np), std::tr1::ref(na), std::tr1::ref(nd),
- std::tr1::ref(finished), std::tr1::ref(*output_manager)));
-
- try
- {
- result = repo->sync(output_manager);
+ ++_pending;
+ _queues.insert(std::make_pair(x->queue_name(), x));
+ }
+ void execute()
+ {
+ typedef std::map<std::string, std::tr1::shared_ptr<Thread> > Running;
+ Running running;
+
+ Lock lock(_mutex);
+ while (true)
+ {
+ bool any(false);
+ for (Queues::iterator q(_queues.begin()), q_end(_queues.end()) ;
+ q != q_end ; )
{
- Lock lock(mutex);
- finished = true;
+ if ((running.end() != running.find(q->first)) || ! q->second->can_run())
+ {
+ ++q;
+ continue;
+ }
+
+ ++_active;
+ --_pending;
+ q->second->pre_execute_exclusive();
+ running.insert(std::make_pair(q->first, make_shared_ptr(new Thread(
+ std::tr1::bind(&Executor::_one, this, q->second)))));
+ _queues.erase(q++);
+ any = true;
}
- }
- catch (...)
- {
+
+ if ((! any) && running.empty())
+ break;
+
+ _condition.wait(_mutex);
+
+ for (ReadyForPost::iterator p(_ready_for_post.begin()), p_end(_ready_for_post.end()) ;
+ p != p_end ; ++p)
{
- Lock lock(mutex);
- finished = true;
+ --_active;
+ ++_done;
+ running.erase((*p)->queue_name());
+ (*p)->post_execute_exclusive();
}
- notifier_condition.acquire_then_signal(notifier_mutex);
- throw;
+
+ _ready_for_post.clear();
}
- notifier_condition.acquire_then_signal(notifier_mutex);
}
+ };
+
+ struct SyncExecutive :
+ Executive
+ {
+ bool abort;
+ bool success;
+ bool skipped;
+ std::string error;
+
+ const std::tr1::shared_ptr<Environment> env;
+ const SyncCommandLine & cmdline;
+ const Executor * const executor;
+ const RepositoryName name;
+
+ std::tr1::shared_ptr<OutputManager> output_manager;
+
+ SyncExecutive(
+ const std::tr1::shared_ptr<Environment> & e,
+ const SyncCommandLine & c,
+ const Executor * const x,
+ const RepositoryName & n) :
+ abort(false),
+ success(false),
+ skipped(false),
+ env(e),
+ cmdline(c),
+ executor(x),
+ name(n)
+ {
+ }
+
+ virtual std::string queue_name() const
+ {
+ const std::tr1::shared_ptr<const Repository> r(env->package_database()->fetch_repository(name));
+ if (r->sync_host_key())
+ return r->sync_host_key()->value();
+ else
+ return "";
+ }
+
+ virtual std::string unique_id() const
+ {
+ return stringify(name);
+ }
+
+ virtual bool can_run() const
+ {
+ return true;
+ }
+ virtual void pre_execute_exclusive()
+ {
+ try
{
- Lock lock(mutex);
- ++nd;
- --na;
- done_decrement = true;
-
- if (0 !=
- env->perform_hook(Hook("sync_post")
- ("TARGET", stringify(r))
- ("NUMBER_DONE", stringify(nd))
- ("NUMBER_ACTIVE", stringify(na))
- ("NUMBER_PENDING", stringify(np))
+ cout << format_general_spad(f::sync_repo_starting(), stringify(name), executor->pending(),
+ executor->active(), executor->done());
+
+ if (0 != env->perform_hook(Hook("sync_pre")
+ ("TARGET", stringify(name))
+ ("NUMBER_DONE", stringify(executor->done()))
+ ("NUMBER_ACTIVE", stringify(executor->active()))
+ ("NUMBER_PENDING", stringify(executor->pending()))
).max_exit_status())
- throw SyncFailedError("Sync of '" + stringify(r) + "' aborted by hook");
+ throw SyncFailedError("Sync aborted by hook");
+ }
+ catch (const Exception & e)
+ {
+ abort = true;
+ error = e.message();
+ }
+ catch (...)
+ {
+ abort = true;
+ error = "Caught unknown exception";
}
+ }
+
+ virtual void execute_threaded()
+ {
+ if (abort)
+ return;
- if (result)
+ try
{
- Lock lock(mutex);
- messages.insert(make_pair(stringify(r), make_named_values<Message>(
- value_for<n::output_manager>(output_manager),
- value_for<n::success>(true),
- value_for<n::summary>("success")
- )));
- cout << format_general_spad(f::sync_repo_done_success(), stringify(r), np, na, nd);
+ const std::tr1::shared_ptr<Repository> repo(env->package_database()->fetch_repository(name));
+ output_manager = env->create_output_manager(
+ CreateOutputManagerForRepositorySyncInfo(*repo,
+ cmdline.a_sequential.specified() ? oe_exclusive : oe_with_others));
+
+ if (! repo->sync(output_manager))
+ skipped = true;
+ success = true;
}
- else
+ catch (const SyncFailedError & e)
+ {
+ error = e.message();
+ /* don't abort */
+ }
+ catch (const Exception & e)
+ {
+ error = e.message();
+ abort = true;
+ }
+ catch (...)
+ {
+ error = "Caught unknown exception";
+ abort = true;
+ }
+ }
+
+ virtual void post_execute_exclusive()
+ {
+ if (abort)
+ return;
+
+ try
+ {
+ if (0 != env->perform_hook(Hook(success ? "sync_post" : "sync_fail")
+ ("TARGET", stringify(name))
+ ("NUMBER_DONE", stringify(executor->done()))
+ ("NUMBER_ACTIVE", stringify(executor->active()))
+ ("NUMBER_PENDING", stringify(executor->pending()))
+ ).max_exit_status())
+ throw SyncFailedError("Sync aborted by hook");
+
+ if (skipped)
+ cout << format_general_spad(f::sync_repo_done_no_syncing_required(), stringify(name),
+ executor->pending(), executor->active(), executor->done());
+ else if (! success)
+ cout << format_general_spad(f::sync_repo_done_failure(), stringify(name),
+ executor->pending(), executor->active(), executor->done());
+ else
+ cout << format_general_spad(f::sync_repo_done_success(), stringify(name),
+ executor->pending(), executor->active(), executor->done());
+ }
+ catch (const Exception & e)
+ {
+ error = e.message();
+ abort = true;
+ success = false;
+ }
+ catch (...)
{
- Lock lock(mutex);
- messages.insert(make_pair(stringify(r), make_named_values<Message>(
- value_for<n::output_manager>(output_manager),
- value_for<n::success>(true),
- value_for<n::summary>("no syncing required")
- )));
- cout << format_general_spad(f::sync_repo_done_no_syncing_required(), stringify(r), np, na, nd);
+ error = "Caught unknown exception";
+ abort = true;
+ success = false;
}
}
- catch (const Exception & e)
+ };
+
+ int sync_these(
+ const std::tr1::shared_ptr<Environment> & env,
+ const SyncCommandLine & cmdline,
+ const Repos & repos)
+ {
+ std::list<std::tr1::shared_ptr<SyncExecutive> > executives;
+
{
- Lock lock(mutex);
+ Executor executor;
- if (! done_decrement)
+ for (Repos::const_iterator r(repos.begin()), r_end(repos.end()) ;
+ r != r_end ; ++r)
{
- --na;
- ++nd;
- done_decrement = true;
+ const std::tr1::shared_ptr<SyncExecutive> x(new SyncExecutive(env, cmdline, &executor, *r));
+ executor.add(x);
+ executives.push_back(x);
}
- retcode |= 1;
- messages.insert(make_pair(stringify(r), make_named_values<Message>(
- value_for<n::output_manager>(output_manager),
- value_for<n::success>(false),
- value_for<n::summary>(e.message() + " (" + e.what() + ")")
- )));
+ executor.execute();
+ }
+
+ int retcode(0);
- cout << format_general_spad(f::sync_repo_done_failure(), stringify(r), np, na, nd);
- std::tr1::shared_ptr<const Sequence<std::string> > tail(output_manager->tail(true));
- if (tail && tail->begin() != tail->end())
+ cout << format_general_s(f::sync_heading(), "Sync results");
+ for (std::list<std::tr1::shared_ptr<SyncExecutive> >::const_iterator x(executives.begin()),
+ x_end(executives.end()) ;
+ x != x_end ; ++x)
+ {
+ if (! (*x)->success)
{
- for (Sequence<std::string>::ConstIterator t(tail->begin()), t_end(tail->end()) ;
- t != t_end ; ++t)
- cout << format_general_s(f::sync_repo_tail(), *t);
+ retcode |= 1;
+ cout << format_general_kv(f::sync_message_failure(), stringify((*x)->name), "failed");
+ cout << format_general_kv(f::sync_message_failure_message(), "error", (*x)->error);
}
+ else if ((*x)->skipped)
+ cout << format_general_kv(f::sync_message_success(), stringify((*x)->name), "no syncing required");
+ else
+ cout << format_general_kv(f::sync_message_success(), stringify((*x)->name), "success");
- int PALUDIS_ATTRIBUTE((unused)) ignore(env->perform_hook(Hook("sync_fail")
- ("TARGET", stringify(r))
- ("NUMBER_DONE", stringify(nd))
- ("NUMBER_ACTIVE", stringify(na))
- ("NUMBER_PENDING", stringify(np))
- ).max_exit_status());
+ (*x)->output_manager.reset();
}
+
+ return retcode;
}
}
@@ -317,13 +422,17 @@ SyncCommand::run(
}
int retcode(0);
- Messages messages;
- std::set<RepositoryName, RepositoryNameComparator> repos;
+ Repos repos;
if (cmdline.begin_parameters() != cmdline.end_parameters())
for (SyncCommandLine::ParametersConstIterator p(cmdline.begin_parameters()), p_end(cmdline.end_parameters()) ;
p != p_end ; ++p)
- repos.insert(RepositoryName(*p));
+ {
+ RepositoryName n(*p);
+ if (! env->package_database()->has_repository_named(n))
+ throw NothingMatching(*p);
+ repos.insert(n);
+ }
else
for (PackageDatabase::RepositoryConstIterator p(env->package_database()->begin_repositories()),
p_end(env->package_database()->end_repositories()) ;
@@ -339,17 +448,7 @@ SyncCommand::run(
cout << format_general_s(f::sync_repos_title(), "");
- {
- Mutex mutex;
- int active(0), done(0), pending(repos.size());
-
- ActionQueue actions(5);
- for (std::set<RepositoryName, RepositoryNameComparator>::const_iterator r(repos.begin()), r_end(repos.end()) ;
- r != r_end ; ++r)
- actions.enqueue(std::tr1::bind(&do_one_sync, env, *r, std::tr1::ref(mutex),
- std::tr1::ref(messages), std::tr1::ref(retcode), std::tr1::ref(pending),
- std::tr1::ref(active), std::tr1::ref(done)));
- }
+ retcode |= sync_these(env, cmdline, repos);
for (PackageDatabase::RepositoryConstIterator r(env->package_database()->begin_repositories()),
r_end(env->package_database()->end_repositories()) ; r != r_end ; ++r)
@@ -363,24 +462,6 @@ SyncCommand::run(
)).max_exit_status())
throw SyncFailedError("Sync aborted by hook");
- cout << endl << format_general_s(f::sync_heading(), "Sync results");
-
- for (Messages::const_iterator m(messages.begin()), m_end(messages.end()) ;
- m != m_end ; ++m)
- {
- if (m->second.success())
- {
- cout << format_general_kv(f::sync_message_success(), m->first, m->second.summary());
- m->second.output_manager()->discard_log();
- }
- else
- {
- cout << format_general_kv(f::sync_message_failure(), m->first, m->second.summary());
- cout << format_general_kv(f::sync_message_failure_message(), "Log file", stringify(m->second.output_manager()->log_file_name()));
- }
- }
- cout << endl;
-
return retcode;
}