NCBI C++ ToolKit
queue_database.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: queue_database.cpp 71715 2016-03-24 14:13:07Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Anatoliy Kuznetsov, Victor Joukov
27  *
28  * File Description: NetSchedule queue collection and database management.
29  *
30  */
31 #include <ncbi_pch.hpp>
32 
33 #include <corelib/ncbi_system.hpp> // SleepMilliSec
34 #include <corelib/ncbireg.hpp>
35 
37 #include <connect/ncbi_socket.hpp>
38 
39 #include <db.h>
40 #include <db/bdb/bdb_trans.hpp>
41 #include <db/bdb/bdb_cursor.hpp>
42 #include <db/bdb/bdb_util.hpp>
43 
44 #include <util/time_line.hpp>
45 
46 #include "queue_database.hpp"
47 #include "ns_util.hpp"
48 #include "netschedule_version.hpp"
49 #include "ns_server.hpp"
50 #include "ns_handler.hpp"
51 #include "ns_db_dump.hpp"
52 #include "ns_types.hpp"
53 
54 
56 
57 
// Shorthand readers for the optional "netschedule" driver parameters.
//
// Every macro expects a CConfig object called `bdb_conf` to be visible at
// the point of use. The value is requested with CConfig::eErr_NoThrow and
// `dflt` is handed over to CConfig as the default value.
//   GetUIntNoErr - GetInt() result converted to unsigned
//   GetSizeNoErr - GetDataSize() result converted to unsigned
//   GetBoolNoErr - GetBool() result as is
#define GetUIntNoErr(name, dflt)                                        \
    static_cast<unsigned>(                                              \
        bdb_conf.GetInt("netschedule", name,                            \
                        CConfig::eErr_NoThrow, dflt))

#define GetSizeNoErr(name, dflt)                                        \
    static_cast<unsigned>(                                              \
        bdb_conf.GetDataSize("netschedule", name,                       \
                             CConfig::eErr_NoThrow, dflt))

#define GetBoolNoErr(name, dflt)                                        \
    bdb_conf.GetBool("netschedule", name,                               \
                     CConfig::eErr_NoThrow, dflt)
66 
67 
68 
69 /////////////////////////////////////////////////////////////////////////////
70 // SNSDBEnvironmentParams implementation
71 
72 bool SNSDBEnvironmentParams::Read(const IRegistry& reg, const string& sname)
73 {
74  CConfig conf(reg);
75  const CConfig::TParamTree* param_tree = conf.GetTree();
76  const TPluginManagerParamTree* bdb_tree = param_tree->FindSubNode(sname);
77 
78  if (!bdb_tree)
79  return false;
80 
81  CConfig bdb_conf((CConfig::TParamTree*)bdb_tree, eNoOwnership);
82 
83  db_path = bdb_conf.GetString("netschedule", "path", CConfig::eErr_Throw);
84  db_log_path = ""; // doesn't work yet
85  // bdb_conf.GetString("netschedule", "transaction_log_path",
86  // CConfig::eErr_NoThrow, "");
87 
88  max_queues = GetUIntNoErr("max_queues", 50);
89 
90  cache_ram_size = GetSizeNoErr("mem_size", 0);
91  mutex_max = GetUIntNoErr("mutex_max", 0);
92  max_locks = GetUIntNoErr("max_locks", 0);
93  max_lockers = GetUIntNoErr("max_lockers", 0);
94  max_lockobjects = GetUIntNoErr("max_lockobjects", 0);
95  log_mem_size = GetSizeNoErr("log_mem_size", 0);
96  // max_trans is derivative, so we do not read it here
97 
98  checkpoint_kb = GetUIntNoErr("checkpoint_kb", 5000);
99  checkpoint_min = GetUIntNoErr("checkpoint_min", 5);
100 
101  sync_transactions = GetBoolNoErr("sync_transactions", false);
102  direct_db = GetBoolNoErr("direct_db", false);
103  direct_log = GetBoolNoErr("direct_log", false);
104  private_env = GetBoolNoErr("private_env", false);
105  return true;
106 }
107 
108 
109 
110 /////////////////////////////////////////////////////////////////////////////
111 // CQueueDataBase implementation
112 
114  const SNSDBEnvironmentParams & params,
115  bool reinit)
116 : m_Host(server->GetBackgroundHost()),
117  m_Executor(server->GetRequestExecutor()),
118  m_Env(NULL),
119  m_StopPurge(false),
120  m_FreeStatusMemCnt(0),
121  m_LastFreeMem(time(0)),
122  m_Server(server),
123  m_PurgeQueue(""),
124  m_PurgeStatusIndex(0),
125  m_PurgeJobScanned(0)
126 {
130 
131  // First, load the previous session start job IDs if file existed
133 
134  // Old instance bdb files are not needed (even if they survived)
136 
137  // Creates the queues from the ini file and loads jobs from the dump
138  x_Open(params, reinit);
139 }
140 
141 
143 {
// NOTE(review): the signature line of this definition is absent from this
// listing; presumably this is the CQueueDataBase destructor - confirm.
// Calls Close() and swallows whatever it throws: the catch-all handler is
// empty, so no exception leaves this body.
144  try {
145  Close();
146  } catch (...) {}
147 }
148 
149 
151  bool reinit)
152 {
153  // Checks preconditions and provides the final reinit value
154  // It sets alerts and throws exceptions if needed.
155  reinit = x_CheckOpenPreconditions(reinit);
156 
157  CDir data_dir(m_DataPath);
158  if (reinit)
159  data_dir.Remove();
160  if (!data_dir.Exists())
161  data_dir.Create();
162 
163  // The initialization must be done before the queues are created but after
164  // the directory is possibly re-created
166 
167  m_Env = x_CreateBDBEnvironment(params);
168 
169  // Detect what queues need to be loaded. It depends on the configuration
170  // file and on the dumped queues. It might be that the saved queues +
171  // the config file queues exceed the configured max number of queues.
172  set<string, PNocase> dump_static_queues;
173  map<string, string,
174  PNocase> dump_dynamic_queues; // qname -> qclass
175  TQueueParams dump_queue_classes;
176  x_ReadDumpQueueDesrc(dump_static_queues, dump_dynamic_queues,
177  dump_queue_classes);
178  set<string, PNocase> config_static_queues = x_GetConfigQueues();
179  string last_queue_load_error;
180  size_t queue_load_error_count = 0;
181 
182  // The total number of queues will be the static queues from the config
183  // plus the dumped dynamic queues
184  size_t final_dynamic_count = 0;
186  k = dump_dynamic_queues.begin();
187  k != dump_dynamic_queues.end(); ++k)
188  if (config_static_queues.find(k->first) == config_static_queues.end())
189  ++final_dynamic_count;
190 
191  size_t total_queues = final_dynamic_count +
192  config_static_queues.size();
193  size_t queues_limit = total_queues;
194  if (total_queues > params.max_queues) {
195  string msg = "The initial number of queues on the server exceeds the "
196  "configured max number of queues. Configured: " +
197  NStr::NumericToString(params.max_queues) + ". Real: " +
198  NStr::NumericToString(total_queues) + ". The limit will "
199  "be extended to accomodate all the queues.";
200  LOG_POST(Note << msg);
202  } else {
203  // The configuration file limit has not been exceeded
204  queues_limit = params.max_queues;
205  }
206 
207  // Allocate SQueueDbBlock's here, open/create corresponding databases
208  m_QueueDbBlockArray.Init(*m_Env, m_DataPath, queues_limit);
209 
210  try {
211  // Here: we can start restoring what was saved. The first step is
212  // to get the linked sections. Linked sections have two sources:
213  // - config file
214  // - dumped sections
215  // The config file sections must override the dumped ones
216  CJsonNode unused_diff = CJsonNode::NewObjectNode();
218  unused_diff);
220 
221  // Read the queue classes from the config file and append those which
222  // come from the dump
224  CNcbiApplication::Instance()->GetConfig());
225  for (TQueueParams::const_iterator k = dump_queue_classes.begin();
226  k != dump_queue_classes.end(); ++k) {
227  if (m_QueueClasses.find(k->first) == m_QueueClasses.end())
228  m_QueueClasses[k->first] = k->second;
229  }
230 
231  // Read the queues from the config file
232  TQueueParams queues_from_ini =
234  CNcbiApplication::Instance()->GetConfig(),
236  x_ConfigureQueues(queues_from_ini, unused_diff);
237 
238  // Add the queues from the dump
240  k = dump_dynamic_queues.begin();
241  k != dump_dynamic_queues.end(); ++k) {
242  string qname = k->first;
243  if (config_static_queues.find(qname) != config_static_queues.end())
244  continue;
245 
246  // Here: the dumped queue has not been changed from a dynamic one
247  // to a static. So, it needs to be added.
248  string qclass = k->second;
249  int new_position = m_QueueDbBlockArray.Allocate();
250 
251  SQueueParameters params = m_QueueClasses[qclass];
252 
253  params.kind = CQueue::eKindDynamic;
254  params.position = new_position;
255  params.delete_request = false;
256  params.qclass = qclass;
257 
258  // Lost parameter: description. The only dynamic queue classes are
259  // dumped so the description is lost.
260  // params.description = ...
261 
262  x_CreateAndMountQueue(qname, params,
263  m_QueueDbBlockArray.Get(new_position));
264  }
265 
266  // All the structures are ready to upload the jobs from the dump
268  k != m_Queues.end(); ++k) {
269  try {
270  unsigned int records =
271  k->second.second->LoadFromDump(m_DumpPath);
273  .Print("_type", "startup")
274  .Print("_queue", k->first)
275  .Print("info", "load_from_dump")
276  .Print("records", records);
277  } catch (const exception & ex) {
278  ERR_POST(ex.what());
279  last_queue_load_error = ex.what();
280  ++queue_load_error_count;
281  } catch (...) {
282  last_queue_load_error = "Unknown error loading queue " +
283  k->first + " from dump";
284  ERR_POST(last_queue_load_error);
285  ++queue_load_error_count;
286  }
287  }
288  } catch (const exception & ex) {
289  ERR_POST(ex.what());
290  last_queue_load_error = ex.what();
291  ++queue_load_error_count;
292  } catch (...) {
293  last_queue_load_error = "Unknown error loading queues from dump";
294  ERR_POST(last_queue_load_error);
295  ++queue_load_error_count;
296  }
297 
301 
302  // Serialize the start job IDs file if it was deleted during the database
303  // initialization. Even if it is overwriting an existing file there is no
304  // performance issue at this point (it's done once anyway).
306 
307  if (queue_load_error_count > 0) {
309  "There were error(s) loading the previous "
310  "instance dump. Number of errors: " +
311  NStr::NumericToString(queue_load_error_count) +
312  ". See log for all the loading errors. "
313  "Last error: " + last_queue_load_error);
314  x_BackupDump();
315  } else {
316  x_RemoveDump();
317  }
318 
320 }
321 
322 
325 {
// NOTE(review): the signature lines of this definition are absent from this
// listing; `reg` is presumably the registry parameter - confirm.
//
// Collects the queue classes described in the registry. A section
// contributes an entry when all of the following hold:
// - its name splits at "_" into a prefix and a class name
// - the prefix is "qclass" (compared case insensitively)
// - the class name is not empty and is at most kMaxQueueNameSize - 1
//   characters long
// - SQueueParameters::ReadQueueClass() returns true for the section
// The result maps the class name (the part after the prefix) to the
// parameters read from the section.
326  TQueueParams queue_classes;
327  list<string> sections;
328 
329  reg.EnumerateSections(&sections);
330 
331  ITERATE(list<string>, it, sections) {
332  string queue_class;
333  string prefix;
334  const string & section_name = *it;
335 
// Sections which are not named like "qclass_<name>", or whose <name> is
// empty or too long, are silently skipped
336  NStr::SplitInTwo(section_name, "_", prefix, queue_class);
337  if (NStr::CompareNocase(prefix, "qclass") != 0)
338  continue;
339  if (queue_class.empty())
340  continue;
341  if (queue_class.size() > kMaxQueueNameSize - 1)
342  continue;
343 
344  // Warnings are ignored here. At this point they are not of interest
345  // because they have already been collected at the startup (allowed)
346  // or at RECO - a file with warnings is not allowed
347  SQueueParameters params;
348  vector<string> warnings;
349  if (params.ReadQueueClass(reg, section_name, warnings)) {
350  // false => problems with linked sections; see CXX-2617
351  // The same sections cannot appear twice
352  queue_classes[queue_class] = params;
353  }
354  }
355 
356  return queue_classes;
357 }
358 
359 
360 // Reads the queues from the ini file, passing the already known queue
361 // classes to the reader so that the class parameters can be inherited.
// NOTE(review): the first signature lines of this definition are absent from
// this listing; `reg` is presumably the registry parameter - confirm.
//
// A section contributes an entry when its name splits at "_" into the prefix
// "queue" (compared case insensitively) and a non-empty queue name of at
// most kMaxQueueNameSize - 1 characters, and SQueueParameters::ReadQueue()
// returns true for it. The result maps the queue name to its parameters.
364  const TQueueParams & classes)
365 {
366  TQueueParams queues;
367  list<string> sections;
368 
369  reg.EnumerateSections(&sections);
370  ITERATE(list<string>, it, sections) {
371  string queue_name;
372  string prefix;
373  const string & section_name = *it;
374 
// Sections which are not named like "queue_<name>", or whose <name> is
// empty or too long, are silently skipped
375  NStr::SplitInTwo(section_name, "_", prefix, queue_name);
376  if (NStr::CompareNocase(prefix, "queue") != 0)
377  continue;
378  if (queue_name.empty())
379  continue;
380  if (queue_name.size() > kMaxQueueNameSize - 1)
381  continue;
382 
383  // Warnings are ignored here. At this point they are not of interest
384  // because they have already been collected at the startup (allowed)
385  // or at RECO - a file with warnings is not allowed
386  SQueueParameters params;
387  vector<string> warnings;
388 
389  if (params.ReadQueue(reg, section_name, classes, warnings)) {
390  // false => problems with linked sections; see CXX-2617
391  queues[queue_name] = params;
392  }
393  }
394 
395  return queues;
396 }
397 
398 
400  CJsonNode & diff)
401 {
402  // Read the new content
403  typedef map< string, map< string, string > > section_container;
404  section_container new_values;
405  list<string> sections;
406  reg.EnumerateSections(&sections);
407 
408  ITERATE(list<string>, it, sections) {
409  string queue_or_class;
410  string prefix;
411  const string & section_name = *it;
412 
413  NStr::SplitInTwo(section_name, "_", prefix, queue_or_class);
414  if (queue_or_class.empty())
415  continue;
416  if (NStr::CompareNocase(prefix, "qclass") != 0 &&
417  NStr::CompareNocase(prefix, "queue") != 0)
418  continue;
419  if (queue_or_class.size() > kMaxQueueNameSize - 1)
420  continue;
421 
422  list<string> entries;
423  reg.EnumerateEntries(section_name, &entries);
424 
425  ITERATE(list<string>, k, entries) {
426  const string & entry = *k;
427  string ref_section = reg.GetString(section_name,
428  entry, kEmptyStr);
429 
430  if (!NStr::StartsWith(entry, "linked_section_", NStr::eCase))
431  continue;
432 
433  if (entry == "linked_section_")
434  continue; // Malformed values prefix
435 
436  if (ref_section.empty())
437  continue; // Malformed section name
438 
439  if (find(sections.begin(), sections.end(), ref_section) ==
440  sections.end())
441  continue; // Non-existing section
442 
443  if (new_values.find(ref_section) != new_values.end())
444  continue; // Has already been read
445 
446  // Read the linked section values
447  list<string> linked_section_entries;
448  reg.EnumerateEntries(ref_section, &linked_section_entries);
449  map<string, string> values;
450  for (list<string>::const_iterator j = linked_section_entries.begin();
451  j != linked_section_entries.end(); ++j)
452  values[*j] = reg.GetString(ref_section, *j, kEmptyStr);
453 
454  new_values[ref_section] = values;
455  }
456  }
457 
459 
460  // Identify those sections which were deleted
461  vector<string> deleted;
462  for (section_container::const_iterator k(m_LinkedSections.begin());
463  k != m_LinkedSections.end(); ++k)
464  if (new_values.find(k->first) == new_values.end())
465  deleted.push_back(k->first);
466 
467  if (!deleted.empty()) {
468  CJsonNode deletedSections = CJsonNode::NewArrayNode();
469  for (vector<string>::const_iterator k(deleted.begin());
470  k != deleted.end(); ++k)
471  deletedSections.AppendString( *k );
472  diff.SetByKey( "linked_section_deleted", deletedSections );
473  }
474 
475  // Identify those sections which were added
476  vector<string> added;
477  for (section_container::const_iterator k(new_values.begin());
478  k != new_values.end(); ++k)
479  if (m_LinkedSections.find(k->first) == m_LinkedSections.end())
480  added.push_back(k->first);
481 
482  if (!added.empty()) {
483  CJsonNode addedSections = CJsonNode::NewArrayNode();
484  for (vector<string>::const_iterator k(added.begin());
485  k != added.end(); ++k)
486  addedSections.AppendString( *k );
487  diff.SetByKey( "linked_section_added", addedSections );
488  }
489 
490  // Deal with changed sections: what was added/deleted/modified
491  vector<string> changed;
492  for (section_container::const_iterator k(new_values.begin());
493  k != new_values.end(); ++k) {
494  if (find(added.begin(), added.end(), k->first) != added.end())
495  continue;
496  if (new_values[k->first] == m_LinkedSections[k->first])
497  continue;
498  changed.push_back(k->first);
499  }
500 
501  if (!changed.empty()) {
502  CJsonNode changedSections = CJsonNode::NewObjectNode();
503  for (vector<string>::const_iterator k(changed.begin());
504  k != changed.end(); ++k)
505  changedSections.SetByKey( *k,
507  new_values[*k]) );
508  diff.SetByKey( "linked_section_changed", changedSections );
509  }
510 
511  // Finally, save the new configuration
512  m_LinkedSections = new_values;
513 }
514 
515 
516 CJsonNode
518  const map<string, string> & old_values,
519  const map<string, string> & new_values)
520 {
522 
523  // Deal with deleted items
524  vector<string> deleted;
525  for (map<string, string>::const_iterator k(old_values.begin());
526  k != old_values.end(); ++k)
527  if (new_values.find(k->first) == new_values.end())
528  deleted.push_back(k->first);
529  if (!deleted.empty()) {
530  CJsonNode deletedValues = CJsonNode::NewArrayNode();
531  for (vector<string>::const_iterator k(deleted.begin());
532  k != deleted.end(); ++k)
533  deletedValues.AppendString( *k );
534  diff.SetByKey( "deleted", deletedValues );
535  }
536 
537  // Deal with added items
538  vector<string> added;
539  for (map<string, string>::const_iterator k(new_values.begin());
540  k != new_values.end(); ++k)
541  if (old_values.find(k->first) == old_values.end())
542  added.push_back(k->first);
543  if (!added.empty()) {
544  CJsonNode addedValues = CJsonNode::NewArrayNode();
545  for (vector<string>::const_iterator k(added.begin());
546  k != added.end(); ++k)
547  addedValues.AppendString( *k );
548  diff.SetByKey( "added", addedValues );
549  }
550 
551  // Deal with changed values
552  vector<string> changed;
553  for (map<string, string>::const_iterator k(new_values.begin());
554  k != new_values.end(); ++k) {
555  if (old_values.find(k->first) == old_values.end())
556  continue;
557  if (old_values.find(k->first)->second ==
558  new_values.find(k->first)->second)
559  continue;
560  changed.push_back(k->first);
561  }
562  if (!changed.empty()) {
563  CJsonNode changedValues = CJsonNode::NewObjectNode();
564  for (vector<string>::const_iterator k(changed.begin());
565  k != changed.end(); ++k) {
567  values.AppendString( old_values.find(*k)->second );
568  values.AppendString( new_values.find(*k)->second );
569  changedValues.SetByKey( *k, values );
570  }
571  diff.SetByKey( "changed", changedValues );
572  }
573 
574  return diff;
575 }
576 
577 
578 // Validates the queues read from an ini file against the current state:
579 // - a static queue must not redefine an existing dynamic queue
// Throws CNetScheduleException (eInvalidParameter) for the first queue from
// the ini file whose name is already present in m_Queues as a dynamic queue.
// Nothing is modified; queue names which are not in m_Queues are accepted.
// NOTE(review): the line with the function name is absent from this listing;
// the name is presumably x_ValidateConfiguration - confirm.
580 void
582  const TQueueParams & queues_from_ini) const
583 {
584  // Check that static queues do not mess with existing dynamic queues
585  for (TQueueParams::const_iterator k = queues_from_ini.begin();
586  k != queues_from_ini.end(); ++k) {
587  TQueueInfo::const_iterator existing = m_Queues.find(k->first);
588 
// A name the server does not know yet cannot clash with anything
589  if (existing == m_Queues.end())
590  continue;
591  if (existing->second.first.kind == CQueue::eKindDynamic)
592  NCBI_THROW(CNetScheduleException, eInvalidParameter,
593  "Configuration error. The queue '" + k->first +
594  "' clashes with a currently existing "
595  "dynamic queue of the same name.");
596  }
597 
598  // Config file is OK for the current configuration
599 }
600 
601 
602 unsigned int
603 CQueueDataBase::x_CountQueuesToAdd(const TQueueParams & queues_from_ini) const
604 {
605  unsigned int add_count = 0;
606 
607  for (TQueueParams::const_iterator k = queues_from_ini.begin();
608  k != queues_from_ini.end(); ++k) {
609 
610  if (m_Queues.find(k->first) == m_Queues.end())
611  ++add_count;
612  }
613 
614  return add_count;
615 }
616 
617 
618 // Updates what is stored in memory.
619 // Forms the diff string. Tells if there were changes.
620 bool
622  CJsonNode & diff)
623 {
624  bool has_changes = false;
625  vector<string> classes; // Used to store added and deleted classes
626 
627  // Delete from the existing classes what was not found in the new classes
629  k != m_QueueClasses.end(); ++k) {
630  string old_queue_class = k->first;
631 
632  if (classes_from_ini.find(old_queue_class) != classes_from_ini.end())
633  continue;
634 
635  // The queue class is not in the configuration any more however it
636  // still could be in use for a dynamic queue. Leave it for the GC
637  // to check if the class has no reference to it and delete it
638  // accordingly.
639  // So, just mark it as for removal.
640 
641  if (k->second.delete_request)
642  continue; // Has already been marked for deletion
643 
644  k->second.delete_request = true;
645  classes.push_back(old_queue_class);
646  }
647 
648  if (!classes.empty()) {
649  has_changes = true;
650  CJsonNode deleted_classes = CJsonNode::NewArrayNode();
651  for (vector<string>::const_iterator k = classes.begin();
652  k != classes.end(); ++k)
653  deleted_classes.AppendString( *k );
654  diff.SetByKey( "deleted_queue_classes", deleted_classes );
655  }
656 
657 
658  // Check the updates in the classes
659  classes.clear();
660 
661  CJsonNode class_changes = CJsonNode::NewObjectNode();
663  k != m_QueueClasses.end(); ++k) {
664 
665  string queue_class = k->first;
666  TQueueParams::const_iterator new_class =
667  classes_from_ini.find(queue_class);
668 
669  if (new_class == classes_from_ini.end())
670  continue; // It is a candidate for deletion, so no diff
671 
672  // The same class found in the new configuration
673  if (k->second.delete_request) {
674  // The class was restored before GC deleted it. Update the flag
675  // and parameters
676  k->second = new_class->second;
677  classes.push_back(queue_class);
678  continue;
679  }
680 
681  // That's the same class which possibly was updated
682  // Do not compare class name here, this is a class itself
683  // Description should be compared
684  CJsonNode class_diff = k->second.Diff(new_class->second,
685  false, true);
686 
687  if (class_diff.GetSize() > 0) {
688  // There is a difference, update the class info
689  k->second = new_class->second;
690 
691  class_changes.SetByKey(queue_class, class_diff);
692  has_changes = true;
693  }
694  }
695  if (class_changes.GetSize() > 0)
696  diff.SetByKey("queue_class_changes", class_changes);
697 
698  // Check what was added
699  for (TQueueParams::const_iterator k = classes_from_ini.begin();
700  k != classes_from_ini.end(); ++k) {
701  string new_queue_class = k->first;
702 
703  if (m_QueueClasses.find(new_queue_class) == m_QueueClasses.end()) {
704  m_QueueClasses[new_queue_class] = k->second;
705  classes.push_back(new_queue_class);
706  }
707  }
708 
709  if (!classes.empty()) {
710  has_changes = true;
711  CJsonNode added_classes = CJsonNode::NewArrayNode();
712  for (vector<string>::const_iterator k = classes.begin();
713  k != classes.end(); ++k)
714  added_classes.AppendString(*k);
715  diff.SetByKey("added_queue_classes", added_classes);
716  }
717 
718  return has_changes;
719 }
720 
721 
722 // Updates the queue info in memory and creates/marks for deletion
723 // queues if necessary.
724 bool
726  CJsonNode & diff)
727 {
728  bool has_changes = false;
729  vector<string> deleted_queues;
730 
731  // Mark for deletion what disappeared
733  k != m_Queues.end(); ++k) {
734  if (k->second.first.kind == CQueue::eKindDynamic)
735  continue; // It's not the config business to deal
736  // with dynamic queues
737 
738  string old_queue = k->first;
739  if (queues_from_ini.find(old_queue) != queues_from_ini.end())
740  continue;
741 
742  // The queue is not in the configuration any more. It could
743  // still be in use or jobs could be still there. So mark it
744  // for deletion and forbid submits for the queue.
745  // GC will later delete it.
746 
747  if (k->second.first.delete_request)
748  continue; // Has already been marked for deletion
749 
750  CRef<CQueue> queue = k->second.second;
751  queue->SetRefuseSubmits(true);
752 
753  k->second.first.delete_request = true;
754  deleted_queues.push_back(k->first);
755  }
756 
757  if (!deleted_queues.empty()) {
758  has_changes = true;
760  for (vector<string>::const_iterator k = deleted_queues.begin();
761  k != deleted_queues.end(); ++k)
762  deleted.AppendString(*k);
763  diff.SetByKey("deleted_queues", deleted);
764  }
765 
766 
767  // Check the updates in the queue parameters
768  vector< pair<string,
769  string> > added_queues;
770  CJsonNode section_changes = CJsonNode::NewObjectNode();
771 
773  k != m_Queues.end(); ++k) {
774 
775  // The server configuration which affects the performance logging for
776  // queues could have been changed, so let the queues update the flag
777  k->second.second->UpdatePerfLoggingSettings(k->second.first.qclass);
778 
779 
780  if (k->second.first.kind == CQueue::eKindDynamic)
781  continue; // It's not the config business to deal
782  // with dynamic queues
783 
784  string queue_name = k->first;
785  TQueueParams::const_iterator new_queue =
786  queues_from_ini.find(queue_name);
787 
788  if (new_queue == queues_from_ini.end())
789  continue; // It is a candidate for deletion, or a dynamic queue;
790  // So no diff
791 
792  // The same queue is in the new configuration
793  if (k->second.first.delete_request) {
794  // The queue was restored before GC deleted it. Update the flag and
795  // the parameters, and allow submits again.
796  CRef<CQueue> queue = k->second.second;
797  queue->SetParameters(new_queue->second);
798  queue->SetRefuseSubmits(false);
799 
800  // The queue position must be preserved.
801  // The queue kind could not be changed here.
802  // The delete request is just checked.
803  int pos = k->second.first.position;
804 
805  k->second.first = new_queue->second;
806  k->second.first.position = pos;
807  added_queues.push_back(make_pair(queue_name,
808  k->second.first.qclass));
809  continue;
810  }
811 
812 
813  // That's the same queue which possibly was updated
814  // Class name should also be compared here
815  // Description should be compared here
816  CJsonNode queue_diff = k->second.first.Diff(new_queue->second,
817  true, true);
818 
819  if (queue_diff.GetSize() > 0) {
820  // There is a difference, update the queue info and the queue
821  CRef<CQueue> queue = k->second.second;
822  queue->SetParameters(new_queue->second);
823 
824  // The queue position must be preserved.
825  // The queue kind could not be changed here.
826  // The queue delete request could not be changed here.
827  int pos = k->second.first.position;
828 
829  k->second.first = new_queue->second;
830  k->second.first.position = pos;
831 
832  section_changes.SetByKey(queue_name, queue_diff);
833  has_changes = true;
834  }
835  }
836 
837  // Check dynamic queues classes. They might be updated.
839  k != m_Queues.end(); ++k) {
840 
841  if (k->second.first.kind != CQueue::eKindDynamic)
842  continue;
843  if (k->second.first.delete_request == true)
844  continue;
845 
846  // OK, this is dynamic queue, alive and not set for deletion
847  // Check if its class parameters have been updated.
848  TQueueParams::const_iterator queue_class =
849  m_QueueClasses.find(k->second.first.qclass);
850  if (queue_class == m_QueueClasses.end()) {
851  ERR_POST("Cannot find class '" + k->second.first.qclass +
852  "' for dynamic queue '" + k->first +
853  "'. Unexpected internal data inconsistency.");
854  continue;
855  }
856 
857  // Do not compare classes
858  // Do not compare description
859  // They do not make sense for dynamic queues because they are created
860  // with their own descriptions and the class does not have the 'class'
861  // field
862  CJsonNode class_diff = k->second.first.Diff(queue_class->second,
863  false, false);
864  if (class_diff.GetSize() > 0) {
865  // There is a difference in the queue class - update the
866  // parameters.
867  string old_class = k->second.first.qclass;
868  int old_pos = k->second.first.position;
869  string old_description = k->second.first.description;
870 
871  CRef<CQueue> queue = k->second.second;
872  queue->SetParameters(queue_class->second);
873 
874  k->second.first = queue_class->second;
875  k->second.first.qclass = old_class;
876  k->second.first.position = old_pos;
877  k->second.first.description = old_description;
878  k->second.first.kind = CQueue::eKindDynamic;
879 
880  section_changes.SetByKey(k->first, class_diff);
881  has_changes = true;
882  }
883  }
884 
885  if (section_changes.GetSize() > 0)
886  diff.SetByKey("queue_changes", section_changes);
887 
888 
889  // Check what was added
890  for (TQueueParams::const_iterator k = queues_from_ini.begin();
891  k != queues_from_ini.end(); ++k) {
892  string new_queue_name = k->first;
893 
894  if (m_Queues.find(new_queue_name) == m_Queues.end()) {
895  // No need to check the allocation success here. It was checked
896  // before that the server has enough resources for the new
897  // configuration.
898  int new_position = m_QueueDbBlockArray.Allocate();
899 
900  x_CreateAndMountQueue(new_queue_name, k->second,
901  m_QueueDbBlockArray.Get(new_position));
902 
903  m_Queues[new_queue_name].first.position = new_position;
904 
905  added_queues.push_back(make_pair(new_queue_name, k->second.qclass));
906  }
907  }
908 
909  if (!added_queues.empty()) {
910  has_changes = true;
912  for (vector< pair<string, string> >::const_iterator
913  k = added_queues.begin();
914  k != added_queues.end(); ++k)
915  added.SetByKey(k->first, CJsonNode::NewStringNode(k->second));
916  diff.SetByKey("added_queues", added);
917  }
918 
919  return has_changes;
920 }
921 
922 
924  CJsonNode & diff)
925 {
927 
928  // Read the configured queues and classes from the ini file
929  TQueueParams classes_from_ini =
931  TQueueParams queues_from_ini =
933  classes_from_ini);
934 
935  // Validate basic consistency of the incoming configuration
936  x_ValidateConfiguration(queues_from_ini);
937 
938  x_ReadLinkedSections(reg, diff);
939 
940  // Check that there are enough slots for the new queues if so
941  // configured
942  unsigned int to_add_count = x_CountQueuesToAdd(queues_from_ini);
943  unsigned int available_count = m_QueueDbBlockArray.CountAvailable();
944 
945  if (to_add_count > available_count)
946  NCBI_THROW(CNetScheduleException, eInvalidParameter,
947  "New configuration slots requirement: " +
948  NStr::NumericToString(to_add_count) +
949  ". Number of available slots: " +
950  NStr::NumericToString(available_count) + ".");
951 
952  // Here: validation is finished. There are enough resources for the new
953  // configuration.
954  x_ConfigureQueueClasses(classes_from_ini, diff);
955  x_ConfigureQueues(queues_from_ini, diff);
956  return CalculateRuntimePrecision();
957 }
958 
959 
961 {
962  // Calculate the new min_run_timeout: required at the time of loading
963  // NetSchedule and not used while reconfiguring on the fly
964  CNSPreciseTime min_precision = kTimeNever;
966  k != m_QueueClasses.end(); ++k)
967  min_precision = std::min(min_precision,
968  k->second.CalculateRuntimePrecision());
970  k != m_Queues.end(); ++k)
971  min_precision = std::min(min_precision,
972  k->second.first.CalculateRuntimePrecision());
973  return min_precision;
974 }
975 
976 
977 unsigned int CQueueDataBase::CountActiveJobs(void) const
978 {
979  unsigned int cnt = 0;
981 
983  k != m_Queues.end(); ++k)
984  cnt += k->second.second->CountActiveJobs();
985 
986  return cnt;
987 }
988 
989 
990 unsigned int CQueueDataBase::CountAllJobs(void) const
991 {
992  unsigned int cnt = 0;
994 
996  k != m_Queues.end(); ++k)
997  cnt += k->second.second->CountAllJobs();
998 
999  return cnt;
1000 }
1001 
1002 
1003 bool CQueueDataBase::AnyJobs(void) const
1004 {
1006 
1008  k != m_Queues.end(); ++k)
1009  if (k->second.second->AnyJobs())
1010  return true;
1011 
1012  return false;
1013 }
1014 
1015 
1017 {
1020 
1021  if (found == m_Queues.end())
1022  NCBI_THROW(CNetScheduleException, eUnknownQueue,
1023  "Queue '" + name + "' is not found.");
1024 
1025  return found->second.second;
1026 }
1027 
1028 
// Creates a CQueue object for the queue `qname`, attaches the given database
// block to it, applies `params` and registers the (params, queue) pair in
// m_Queues under `qname`. Finally a diagnostics record with info "mount" is
// produced.
//   params          parameters the queue is created with; params.kind is
//                   passed to the CQueue constructor
//   queue_db_block  database block passed to CQueue::Attach()
// NOTE(review): the line with the function name and the first parameter is
// absent from this listing; presumably it declares
// x_CreateAndMountQueue(qname, ...) - confirm.
1029 void
1031  const SQueueParameters & params,
1032  SQueueDbBlock * queue_db_block)
1033 {
// The queue is held by auto_ptr until it is stored in m_Queues.
// NOTE(review): std::auto_ptr is deprecated since C++11 and removed in C++17.
1034  auto_ptr<CQueue> q(new CQueue(m_Executor, qname,
1035  params.kind, m_Server, *this));
1036 
1037  q->Attach(queue_db_block);
1038  q->SetParameters(params);
1039 
// release() hands the raw pointer over to the pair stored in m_Queues; an
// entry with the same name, if any, is overwritten by the assignment
1040  m_Queues[qname] = make_pair(params, q.release());
1041 
1042  GetDiagContext().Extra()
1043  .Print("_type", "startup")
1044  .Print("_queue", qname)
1045  .Print("qclass", params.qclass)
1046  .Print("info", "mount");
1047 }
1048 
1049 
// Tells whether m_Queues has an entry with the given name.
// NOTE(review): the first body statement is not visible in this listing.
1050 bool CQueueDataBase::QueueExists(const string & qname) const
1051 {
1053  return m_Queues.find(qname) != m_Queues.end();
1054 }
1055 
1056 
// Creates a dynamic queue 'qname' of the class 'qclass'.
// Checks performed before anything is allocated:
//  - the queue name is not in use (eDuplicateName)
//  - the queue class exists and is not marked for deletion
//    (eUnknownQueueClass)
//  - a slot is available (reported as eUnknownQueue with a
//    "max_queues limit reached" message)
//  - for a non admin client: the class submitter hosts and program name
//    restrictions, if set, are satisfied (eAccessDenied; an eAccess alert
//    is registered with the server as well)
// After that a DB block is allocated, the class parameters are copied and
// adjusted for the new queue, and the queue is created and mounted.
// NOTE(review): the line with the function name and the 'client' parameter,
// the first body statement, the slot availability condition, the 'acl'
// declarations and the call which fills 'auth_prog_info' are not visible in
// this listing (presumably CQueueDataBase::CreateDynamicQueue) - confirm.
1058  const string & qname,
1059  const string & qclass,
1060  const string & description)
1061 {
1063 
1064  // Queue name clashes
1065  if (m_Queues.find(qname) != m_Queues.end())
1066  NCBI_THROW(CNetScheduleException, eDuplicateName,
1067  "Queue '" + qname + "' already exists.");
1068 
1069  // Queue class existence
1070  TQueueParams::const_iterator queue_class = m_QueueClasses.find(qclass);
1071  if (queue_class == m_QueueClasses.end())
1072  NCBI_THROW(CNetScheduleException, eUnknownQueueClass,
1073  "Queue class '" + qclass +
1074  "' for queue '" + qname + "' is not found.");
1075 
1076  // And class is not marked for deletion
1077  if (queue_class->second.delete_request)
1078  NCBI_THROW(CNetScheduleException, eUnknownQueueClass,
1079  "Queue class '" + qclass +
1080  "' for queue '" + qname + "' is marked for deletion.");
1081 
1082  // Slot availability
1084  NCBI_THROW(CNetScheduleException, eUnknownQueue,
1085  "Cannot allocate queue '" + qname +
1086  "'. max_queues limit reached.");
1087 
1088  // submitter and program restrictions must be checked
1089  // for the class
1090  if (!client.IsAdmin()) {
1091  if (!queue_class->second.subm_hosts.empty()) {
1093  acl.SetHosts(queue_class->second.subm_hosts);
1094  if (!acl.IsAllowed(client.GetAddress())) {
1095  m_Server->RegisterAlert(eAccess, "submitter privileges required"
1096  " to create a dynamic queue");
1097  NCBI_THROW(CNetScheduleException, eAccessDenied,
1098  "Access denied: submitter privileges required");
1099  }
1100  }
1101  if (!queue_class->second.program_name.empty()) {
1103  bool ok = false;
1104 
1105  acl.AddClientInfo(queue_class->second.program_name);
1106  try {
1107  CQueueClientInfo auth_prog_info;
1109  &auth_prog_info.client_name,
1110  &auth_prog_info.version_info);
1111  ok = acl.IsMatchingClient(auth_prog_info);
1112  } catch (...) {
1113  // Parsing errors
    // Any exception here is treated as "the program is not allowed"
1114  ok = false;
1115  }
1116 
1117  if (!ok) {
1118  m_Server->RegisterAlert(eAccess, "program privileges required "
1119  "to create a dynamic queue");
1120  NCBI_THROW(CNetScheduleException, eAccessDenied,
1121  "Access denied: program privileges required");
1122  }
1123  }
1124  }
1125 
1126 
1127  // All the preconditions are met. Create the queue
1128  int new_position = m_QueueDbBlockArray.Allocate();
1129 
    // Start from a copy of the class parameters and override the fields
    // which are specific to the new queue
1130  SQueueParameters params = queue_class->second;
1131 
1132  params.kind = CQueue::eKindDynamic;
1133  params.position = new_position;
1134  params.delete_request = false;
1135  params.qclass = qclass;
1136  params.description = description;
1137 
1138  x_CreateAndMountQueue(qname, params, m_QueueDbBlockArray.Get(new_position));
1139 }
1140 
1141 
// Marks the dynamic queue 'qname' for deletion. Nothing is erased from
// m_Queues here: the delete_request flag is set in the stored parameters
// and the queue is told to refuse submits.
// Throws CNetScheduleException:
//  - eUnknownQueue if there is no such queue
//  - eInvalidParameter if the queue kind is not CQueue::eKindDynamic
//  - eAccessDenied if a non admin client fails the queue's submitter or
//    program checks
// NOTE(review): the line with the function name and the 'client' parameter
// and the first body statement are not visible in this listing
// (presumably CQueueDataBase::DeleteDynamicQueue) - confirm.
1143  const string & qname)
1144 {
1146  TQueueInfo::iterator found_queue = m_Queues.find(qname);
1147 
1148  if (found_queue == m_Queues.end())
1149  NCBI_THROW(CNetScheduleException, eUnknownQueue,
1150  "Queue '" + qname + "' is not found." );
1151 
1152  if (found_queue->second.first.kind != CQueue::eKindDynamic)
1153  NCBI_THROW(CNetScheduleException, eInvalidParameter,
1154  "Queue '" + qname + "' is static and cannot be deleted.");
1155 
1156  // submitter and program restrictions must be checked
1157  CRef<CQueue> queue = found_queue->second.second;
1158  if (!client.IsAdmin()) {
1159  if (!queue->IsSubmitAllowed(client.GetAddress()))
1160  NCBI_THROW(CNetScheduleException, eAccessDenied,
1161  "Access denied: submitter privileges required");
1162  if (!queue->IsProgramAllowed(client.GetProgramName()))
1163  NCBI_THROW(CNetScheduleException, eAccessDenied,
1164  "Access denied: program privileges required");
1165  }
1166 
1167  found_queue->second.first.delete_request = true;
1168  queue->SetRefuseSubmits(true);
1169 }
1170 
1171 
// Returns the parameters of the queue 'qname' as built by
// x_SingleQueueInfo(). Throws CNetScheduleException(eUnknownQueue) if the
// queue is not found in m_Queues.
// NOTE(review): the first body statement is not visible in this listing.
1172 SQueueParameters CQueueDataBase::QueueInfo(const string & qname) const
1173 {
1175  TQueueInfo::const_iterator found_queue = m_Queues.find(qname);
1176 
1177  if (found_queue == m_Queues.end())
1178  NCBI_THROW(CNetScheduleException, eUnknownQueue,
1179  "Queue '" + qname + "' is not found." );
1180 
1181  return x_SingleQueueInfo(found_queue);
1182 }
1183 
1184 
1185 /* Note: this member must be called under a lock, it's not thread safe */
// Returns a copy of the parameters stored for the queue 'found' points to,
// with the run-time fields overwritten by the values read from the live
// queue object.
// NOTE(review): the signature lines and three statements between the
// visible assignments are not visible in this listing.
1188 {
1189  SQueueParameters params = found->second.first;
1190 
1191  // The fields below are used as a transport.
1192  // Usually used by QINF2 and STAT QUEUES
1193  params.refuse_submits = found->second.second->GetRefuseSubmits();
1194  params.pause_status = found->second.second->GetPauseStatus();
1196  params.aff_slots_used = found->second.second->GetAffSlotsUsed();
1198  params.group_slots_used = found->second.second->GetGroupSlotsUsed();
1200  params.scope_slots_used = found->second.second->GetScopeSlotsUsed();
1201  params.clients = found->second.second->GetClientsCount();
1202  params.groups = found->second.second->GetGroupsCount();
1203  params.gc_backlog = found->second.second->GetGCBacklogCount();
1204  params.notif_count = found->second.second->GetNotifCount();
1205  return params;
1206 }
1207 
1208 
// Returns the names of the queues in m_Queues concatenated into one string.
// Each name, the last one included, is followed by 'sep'.
// NOTE(review): the statement after 'names' and the 'for' header are not
// visible in this listing.
1209 string CQueueDataBase::GetQueueNames(const string & sep) const
1210 {
1211  string names;
1213 
1215  k != m_Queues.end(); ++k)
1216  names += k->first + sep;
1217 
1218  return names;
1219 }
1220 
1221 
// Shuts the database down. Does nothing if m_Env is already null.
// Otherwise: stops the notification and purge threads, prints the
// statistics counters one more time and then takes one of two branches:
//  - logs that a drained shutdown has completed, or
//  - dumps the queues via x_Dump() and clears m_Queues.
// At the end the BDB environment object is deleted and the BDB files are
// removed.
// NOTE(review): the signature, the condition which selects the branch and a
// few other statements are not visible in this listing (presumably
// CQueueDataBase::Close) - confirm.
1223 {
1224  // Check that we're still open
1225  if (!m_Env)
1226  return;
1227 
1228  StopNotifThread();
1229  StopPurgeThread();
1232 
1233  // Print the statistics counters last time
1235  size_t aff_count = 0;
1236 
1237  PrintStatistics(aff_count);
1239  }
1240 
1242  // That was a not interrupted drain shutdown so there is no
1243  // need to dump anything
1244  LOG_POST("Drained shutdown: the DB has been successfully drained");
1246  } else {
1247  // That was either:
1248  // - hard shutdown
1249  // - hard shutdown when drained shutdown is in process
1250  if (m_Server->IsDrainShutdown())
1251  ERR_POST(Warning <<
1252  "Drained shutdown: DB draining has not been completed "
1253  "when a hard shutdown is received. "
1254  "Shutting down immediately.");
1255 
1256  // Dump all the queues/queue classes/queue parameters to flat files
1257  x_Dump();
1258 
1259  // A Dump is created, so we may avoid calling
1260  // - m_Env->ForceTransactionCheckpoint();
1261  // - m_Env->CleanLog();
1262  // which may take very long time to complete
1263 
1265 
1266  // CQueue objects destructors are called from here because the last
1267  // reference to the object has gone
1268  m_Queues.clear();
1269 
1270  // The call below may also cause a very long shutdown. At this point
1271  // we are not interested in proper BDB file closing because the jobs
1272  // have already been dumped. Thus we can omit calling the BDB files
1273  // close and just remove them a few statements later.
1274  // The only caveat is that valgrind will complain on memory leaks. This
1275  // is however minor in comparison to the shutdown time consumption.
1276  // With the close() commented out it only depends on the current number
1277  // of jobs but not on the number of jobs since start.
1278  // Close pre-allocated databases
1279  // m_QueueDbBlockArray.Close();
1280  }
1281 
1282  delete m_Env;
1283  m_Env = 0;
1284 
1285  // BDB files are not needed anymore. They could be safely deleted.
1286  x_RemoveBDBFiles();
1288 }
1289 
1290 
// The visible part calls m_Env->CleanLog() when 'clean_log' is set.
// NOTE(review): the signature and the first body statement are not visible
// in this listing (presumably CQueueDataBase::TransactionCheckPoint and a
// checkpoint call respectively) - confirm.
1292 {
1294  if (clean_log)
1295  m_Env->CleanLog();
1296 }
1297 
1298 
// Builds a report which has, for each queue in m_Queues, an
// "OK:[queue <name>]" line followed by the output of the queue's
// PrintTransitionCounters().
// NOTE(review): the signature and the 'for' header lines are not visible in
// this listing.
1300 {
1301  string result;
1304  k != m_Queues.end(); ++k)
1305  result += "OK:[queue " + k->first + "]\n" +
1306  k->second.second->PrintTransitionCounters();
1307  return result;
1308 }
1309 
1310 
// Builds a report which has, for each queue in m_Queues, an
// "OK:[queue <name>]" line followed by the output of the queue's
// PrintJobsStat(). The 'warnings' vector is passed to the queues but is
// not used in the visible part of this function afterwards.
// NOTE(review): the signature (which declares 'client') and the 'for'
// header lines are not visible in this listing.
1312 {
1313  string result;
1314  vector<string> warnings;
1317  k != m_Queues.end(); ++k)
1318  // Group and affinity tokens make no sense for the server,
1319  // so they are both "".
1320  result += "OK:[queue " + k->first + "]\n" +
1321  k->second.second->PrintJobsStat(client, "", "", warnings);
1322  return result;
1323 }
1324 
1325 
// Builds a printable report of the queue classes. For each entry of
// m_QueueClasses an "OK:[qclass <name>]" header is followed by the
// printable parameters and then by one "OK:<prefix>.<key>: <value>" line
// per value of each linked section of the class. Entries are separated by
// a new line.
// NOTE(review): the signature, the statement after 'output' and the loop
// header lines are not visible in this listing.
1327 {
1328  string output;
1330 
1332  k != m_QueueClasses.end(); ++k) {
1333  if (!output.empty())
1334  output += "\n";
1335 
1336  // false - not to include qclass
1337  // false - not URL encoded format
1338  output += "OK:[qclass " + k->first + "]\n" +
1339  k->second.GetPrintableParameters(false, false);
1340 
1342  j = k->second.linked_sections.begin();
1343  j != k->second.linked_sections.end(); ++j) {
    // The prefix is the key with its first strlen("linked_section_")
    // characters skipped.
    // NOTE(review): assumes every key is at least that long - confirm.
1344  string prefix((j->first).c_str() + strlen("linked_section_"));
1345  string section_name = j->second;
1346 
1347  map<string, string> values = GetLinkedSection(section_name);
1348  for (map<string, string>::const_iterator m = values.begin();
1349  m != values.end(); ++m)
1350  output += "\nOK:" + prefix + "." + m->first + ": " + m->second;
1351  }
1352  }
1353  return output;
1354 }
1355 
1356 
// Builds the configuration text for the queue classes: for each entry of
// m_QueueClasses a "[qclass_<name>]" header followed by
// ConfigSection(true). Entries are separated by a new line.
// NOTE(review): the signature and the 'for' header lines are not visible in
// this listing.
1358 {
1359  string output;
1362  k != m_QueueClasses.end(); ++k) {
1363  if (!output.empty())
1364  output += "\n";
1365  output += "[qclass_" + k->first + "]\n" +
1366  k->second.ConfigSection(true);
1367  }
1368  return output;
1369 }
1370 
1371 
// Builds a printable report of the queues. For each entry of m_Queues an
// "OK:[queue <name>]" header is followed by the printable parameters taken
// from x_SingleQueueInfo() and then by one "OK:<prefix>.<key>: <value>"
// line per value of each linked section of the queue. Entries are
// separated by a new line.
// NOTE(review): the signature, the statement after 'output' and the loop
// header lines are not visible in this listing.
1373 {
1374  string output;
1376 
1378  k != m_Queues.end(); ++k) {
1379  if (!output.empty())
1380  output += "\n";
1381 
1382  // true - include qclass
1383  // false - not URL encoded format
1384  output += "OK:[queue " + k->first + "]\n" +
1385  x_SingleQueueInfo(k).GetPrintableParameters(true, false);
1386 
1388  j = k->second.first.linked_sections.begin();
1389  j != k->second.first.linked_sections.end(); ++j) {
    // The prefix is the key with its first strlen("linked_section_")
    // characters skipped.
    // NOTE(review): assumes every key is at least that long - confirm.
1390  string prefix((j->first).c_str() + strlen("linked_section_"));
1391  string section_name = j->second;
1392 
1393  map<string, string> values = GetLinkedSection(section_name);
1394  for (map<string, string>::const_iterator m = values.begin();
1395  m != values.end(); ++m)
1396  output += "\nOK:" + prefix + "." + m->first + ": " + m->second;
1397  }
1398  }
1399  return output;
1400 }
1401 
// Builds the configuration text for the queues: for each entry of m_Queues
// a "[queue_<name>]" header followed by ConfigSection(false) of the stored
// parameters. Entries are separated by a new line.
// NOTE(review): the signature and the 'for' header lines are not visible in
// this listing.
1403 {
1404  string output;
1407  k != m_Queues.end(); ++k) {
1408  if (!output.empty())
1409  output += "\n";
1410  output += "[queue_" + k->first + "]\n" +
1411  k->second.first.ConfigSection(false);
1412  }
1413  return output;
1414 }
1415 
1416 
// Builds the configuration text for the linked sections: for each entry of
// m_LinkedSections a "[<section name>]" header followed by one
// <key>="<value>" line per stored value. Sections are separated by a new
// line. The values are written as is, no escaping is applied here.
// NOTE(review): the signature and the statement after 'output' are not
// visible in this listing.
1418 {
1419  string output;
1421 
1422  for (map< string, map< string, string > >::const_iterator
1423  k = m_LinkedSections.begin();
1424  k != m_LinkedSections.end(); ++k) {
1425  if (!output.empty())
1426  output += "\n";
1427  output += "[" + k->first + "]\n";
1428  for (map< string, string >::const_iterator j = k->second.begin();
1429  j != k->second.end(); ++j) {
1430  output += j->first + "=\"" + j->second + "\"\n";
1431  }
1432  }
1433  return output;
1434 }
1435 
1436 
// Returns a copy of the values stored in m_LinkedSections for
// 'section_name'; an empty map if there is no such section.
// NOTE(review): the return type line and the first body statement are not
// visible in this listing.
1438 CQueueDataBase::GetLinkedSection(const string & section_name) const
1439 {
1441  map< string, map< string, string > >::const_iterator found =
1442  m_LinkedSections.find(section_name);
1443  if (found == m_LinkedSections.end())
1444  return map< string, string >();
1445  return found->second;
1446 }
1447 
1448 
1450 {
1451  CNSPreciseTime current_time = CNSPreciseTime::Current();
1452  for (unsigned int index = 0; ; ++index) {
1453  CRef<CQueue> queue = x_GetQueueAt(index);
1454  if (queue.IsNull())
1455  break;
1456  queue->NotifyListenersPeriodically(current_time);
1457  }
1458 }
1459 
1460 
// Calls PrintStatistics(aff_count) on every queue in m_Queues; the same
// 'aff_count' reference is handed to each of them.
// NOTE(review): the first body statement and the 'for' header are not
// visible in this listing.
1461 void CQueueDataBase::PrintStatistics(size_t & aff_count)
1462 {
1464 
1466  k != m_Queues.end(); ++k)
1467  k->second.second->PrintStatistics(aff_count);
1468 }
1469 
1470 
// Calls PrintJobCounters() on every queue in m_Queues.
// NOTE(review): the signature, the first body statement and the 'for'
// header are not visible in this listing.
1472 {
1474 
1476  k != m_Queues.end(); ++k)
1477  k->second.second->PrintJobCounters();
1478 }
1479 
1480 
// Calls CheckExecutionTimeout(logging) on every queue. The queues are
// fetched one at a time via x_GetQueueAt() until it returns a null
// reference.
// NOTE(review): the signature line (which declares 'logging') is not
// visible in this listing.
1482 {
1483  for (unsigned int index = 0; ; ++index) {
1484  CRef<CQueue> queue = x_GetQueueAt(index);
1485  if (queue.IsNull())
1486  break;
1487  queue->CheckExecutionTimeout(logging);
1488  }
1489 }
1490 
1491 
1492 // Checks if the queues marked for deletion could be really deleted
1493 // and deletes them if so. Deletes queue classes which are marked
1494 // for deletion if there are no links to them.
// Steps:
//  1. collect (name, queue) pairs for the queues with delete_request set
//  2. drop from that list the queues which are not empty
//  3. mark the remaining queues for truncating and erase them from m_Queues
//  4. collect the queue classes which have delete_request set and are not
//     referred to by any remaining queue
// NOTE(review): the signature, the statements which open the scopes of
// steps 1 and 3, some loop headers and the statement which handles each
// collected class are not visible in this listing.
1496 {
1497  // It's better to avoid querying the queues under a lock so
1498  // let's first build a list of CRefs to the candidate queues.
1499  list< pair< string, CRef< CQueue > > > candidates;
1500 
1501  {{
1504  k != m_Queues.end(); ++k)
1505  if (k->second.first.delete_request)
1506  candidates.push_back(make_pair(k->first, k->second.second));
1507  }}
1508 
1509  // Now the queues are queried if they are empty without a lock
1510  list< pair< string, CRef< CQueue > > >::iterator
1511  k = candidates.begin();
1512  while (k != candidates.end()) {
1513  if (k->second->IsEmpty() == false)
1514  k = candidates.erase(k);
1515  else
1516  ++k;
1517  }
1518 
1519  if (candidates.empty())
1520  return;
1521 
1522  // Here we have a list of the queues which can be deleted
1523  // Take a lock and delete the queues plus check queue classes
1525  for (k = candidates.begin(); k != candidates.end(); ++k) {
1526  // It's only here where a queue can be deleted so it's safe not
1527  // to check the iterator
    // NOTE(review): the result of find() is dereferenced unchecked; this
    // relies on the statement in the comment above - confirm.
1528  TQueueInfo::iterator queue = m_Queues.find(k->first);
1529 
1530  // Deallocation of the DB block will be done later when the queue
1531  // is actually deleted
1532  queue->second.second->MarkForTruncating();
1533  m_Queues.erase(queue);
1534  }
1535 
1536  // Now, while still holding the lock, let's check queue classes
1537  vector< string > classes_to_delete;
1539  j != m_QueueClasses.end(); ++j) {
1540  if (j->second.delete_request) {
1541  bool in_use = false;
1543  m != m_Queues.end(); ++m) {
1544  if (m->second.first.qclass == j->first) {
1545  in_use = true;
1546  break;
1547  }
1548  }
1549  if (in_use == false)
1550  classes_to_delete.push_back(j->first);
1551  }
1552  }
1553 
    // Note: this 'k' is a new variable which shadows the list iterator 'k'
    // declared above
1554  for (vector< string >::const_iterator k = classes_to_delete.begin();
1555  k != classes_to_delete.end(); ++k) {
1556  // It's only here where a queue class can be deleted so
1557  // it's safe not to check the iterator
1559  }
1560 }
1561 
1562 
1563 /* Data used in CQueueDataBase::Purge() only */
// NOTE(review): the declaration and the items of statuses_to_delete_from
// and the divisor of the kStatusesSize expression are not visible in this
// listing. kStatusesSize is presumably the number of items in the array
// (total size divided by the size of one item) - confirm.
1571 };
1572 const size_t kStatusesSize = sizeof(statuses_to_delete_from) /
1574 
// One purge pass over the queues. The scan is limited by the batch sizes
// taken from the server (scanned jobs and jobs marked for deletion) and it
// resumes from the position saved in m_PurgeQueue, m_PurgeStatusIndex and
// m_PurgeJobScanned:
//  Part I   - from the queue where the previous pass stopped till the last
//             queue
//  Part II  - from the first queue up to the start queue
//  Part III - the start queue again: the statuses before the start status
//             and then the start status for the job ids which were skipped
//  Part IV  - the final steps, including the status matrix optimization
// The function returns at once whenever x_PurgeQueue() returns true.
// NOTE(review): the signature, the statement under "Cleanup the queues and
// classes" and the first statements of Part IV are not visible in this
// listing (presumably CQueueDataBase::Purge) - confirm.
1576 {
1577  size_t max_mark_deleted = m_Server->GetMarkdelBatchSize();
1578  size_t max_scanned = m_Server->GetScanBatchSize();
1579  size_t total_scanned = 0;
1580  size_t total_mark_deleted = 0;
1581  CNSPreciseTime current_time = CNSPreciseTime::Current();
1582  bool limit_reached = false;
1583 
1584  // Cleanup the queues and classes if possible
1586 
1587  // Part I: from the last end till the end of the list
1588  CRef<CQueue> start_queue = x_GetLastPurged();
1589  CRef<CQueue> current_queue = start_queue;
    // Remember the start position: x_PurgeQueue() updates the m_Purge...
    // members as it goes
1590  size_t start_status_index = m_PurgeStatusIndex;
1591  unsigned int start_job_id = m_PurgeJobScanned;
1592 
1593  while (current_queue.IsNull() == false) {
1594  m_PurgeQueue = current_queue->GetQueueName();
1595  if (x_PurgeQueue(current_queue.GetObject(),
1596  m_PurgeStatusIndex, kStatusesSize - 1,
1597  m_PurgeJobScanned, 0,
1598  max_scanned, max_mark_deleted,
1599  current_time,
1600  total_scanned, total_mark_deleted) == true)
1601  return;
1602 
1603  if (total_mark_deleted >= max_mark_deleted ||
1604  total_scanned >= max_scanned) {
1605  limit_reached = true;
1606  break;
1607  }
1608  current_queue = x_GetNext(m_PurgeQueue);
1609  }
1610 
1611 
1612  // Part II: from the beginning of the list till the last end
1613  if (limit_reached == false) {
1614  current_queue = x_GetFirst();
1615  while (current_queue.IsNull() == false) {
    // NOTE(review): start_queue is dereferenced without a null check.
    // It is null when x_GetLastPurged() found no queues; if a queue has
    // been created since then, x_GetFirst() is not null and this line
    // would dereference a null reference - confirm it cannot happen.
1616  if (current_queue->GetQueueName() == start_queue->GetQueueName())
1617  break;
1618 
1619  m_PurgeQueue = current_queue->GetQueueName();
1620  if (x_PurgeQueue(current_queue.GetObject(),
1621  m_PurgeStatusIndex, kStatusesSize - 1,
1622  m_PurgeJobScanned, 0,
1623  max_scanned, max_mark_deleted,
1624  current_time,
1625  total_scanned, total_mark_deleted) == true)
1626  return;
1627 
1628  if (total_mark_deleted >= max_mark_deleted ||
1629  total_scanned >= max_scanned) {
1630  limit_reached = true;
1631  break;
1632  }
1633  current_queue = x_GetNext(m_PurgeQueue);
1634  }
1635  }
1636 
1637  // Part III: it might need to check the statuses in the queue we started
1638  // with if the start status was not the first one.
1639  if (limit_reached == false) {
1640  if (start_queue.IsNull() == false) {
1641  m_PurgeQueue = start_queue->GetQueueName();
1642  if (start_status_index > 0) {
1643  if (x_PurgeQueue(start_queue.GetObject(),
1644  0, start_status_index - 1,
1645  m_PurgeJobScanned, 0,
1646  max_scanned, max_mark_deleted,
1647  current_time,
1648  total_scanned, total_mark_deleted) == true)
1649  return;
1650  }
1651  }
1652  }
1653 
    // The start status of the start queue: job ids from 0 up to the job id
    // the pass started with.
    // NOTE(review): limit_reached is not updated by Part III above, so
    // this step runs even if the limits were hit there - confirm intended.
1654  if (limit_reached == false) {
1655  if (start_queue.IsNull() == false) {
1656  m_PurgeQueue = start_queue->GetQueueName();
1657  if (x_PurgeQueue(start_queue.GetObject(),
1658  start_status_index, start_status_index,
1659  0, start_job_id,
1660  max_scanned, max_mark_deleted,
1661  current_time,
1662  total_scanned, total_mark_deleted) == true)
1663  return;
1664  }
1665  }
1666 
1667 
1668  // Part IV: purge the found candidates and optimize the memory if required
1671 
1672  x_OptimizeStatusMatrix(current_time);
1673 }
1674 
1675 
// Provides the queue the purge should continue from:
//  - no saved queue name: the first queue (a null reference if there are
//    no queues at all)
//  - the saved queue still exists: that queue
//  - otherwise the saved status index and job id are reset and a queue at
//    a pseudo random position is returned
// NOTE(review): the signature and the first body statement are not visible
// in this listing.
1677 {
1679 
1680  if (m_PurgeQueue.empty()) {
1681  if (m_Queues.empty())
1682  return CRef<CQueue>(NULL);
1683  return m_Queues.begin()->second.second;
1684  }
1685 
1686  for (TQueueInfo::iterator it = m_Queues.begin();
1687  it != m_Queues.end(); ++it)
1688  if (it->first == m_PurgeQueue)
1689  return it->second.second;
1690 
1691  // Not found, which means the queue was deleted. Pick a random one
1692  m_PurgeStatusIndex = 0;
1693  m_PurgeJobScanned = 0;
1694 
    // queue_num is in the range [0, m_Queues.size()]; the loop counts the
    // queues from 1 so both 0 and 1 select the first queue
1695  int queue_num = ((rand() * 1.0) / RAND_MAX) * m_Queues.size();
1696  int k = 1;
1697  for (TQueueInfo::iterator it = m_Queues.begin();
1698  it != m_Queues.end(); ++it) {
1699  if (k >= queue_num)
1700  return it->second.second;
1701  ++k;
1702  }
1703 
1704  // Cannot happen, so just be consistent with the return value
    // NOTE(review): if m_Queues is empty at this point (the saved queue was
    // the last one and it has been deleted) the loop above does not run and
    // the statement below dereferences begin() of an empty container -
    // confirm that this is impossible or add a check.
1705  return m_Queues.begin()->second.second;
1706 }
1707 
1708 
// Returns the first queue of m_Queues or a null reference if there are no
// queues.
// NOTE(review): the signature and the first body statement are not visible
// in this listing.
1710 {
1712 
1713  if (m_Queues.empty())
1714  return CRef<CQueue>(NULL);
1715  return m_Queues.begin()->second.second;
1716 }
1717 
1718 
// Returns the queue which follows the one named 'current_name' in
// m_Queues. A null reference is returned if there are no queues, if
// 'current_name' is the last one or if 'current_name' is not found.
// The lookup is a linear walk over m_Queues.
// NOTE(review): the first body statement is not visible in this listing.
1719 CRef<CQueue> CQueueDataBase::x_GetNext(const string & current_name)
1720 {
1722 
1723  if (m_Queues.empty())
1724  return CRef<CQueue>(NULL);
1725 
1726  for (TQueueInfo::iterator it = m_Queues.begin();
1727  it != m_Queues.end(); ++it) {
1728  if (it->first == current_name) {
1729  ++it;
1730  if (it == m_Queues.end())
1731  return CRef<CQueue>(NULL);
1732  return it->second.second;
1733  }
1734  }
1735 
1736  // May not really happen. Let's have just in case.
1737  return CRef<CQueue>(NULL);
1738 }
1739 
1740 
1741 // Purges jobs from a queue starting from the given status.
1742 // Returns true if the purge should be stopped.
1743 // The status argument is a status to start from
// The statuses are indexes in statuses_to_delete_from and the range
// [status, status_to_end] is inclusive. For each status the queue is asked
// to check the jobs expiration within what is left of the scan and mark
// deleted budgets; the running totals are updated via the last two
// arguments. If a budget is exhausted the current status index is saved in
// m_PurgeStatusIndex and false is returned. If all the statuses have been
// processed the saved status index and job id are reset to 0.
// NOTE(review): the line with the function name and the first parameter
// ('queue') is not visible in this listing.
1745  size_t status,
1746  size_t status_to_end,
1747  unsigned int start_job_id,
1748  unsigned int end_job_id,
1749  size_t max_scanned,
1750  size_t max_mark_deleted,
1751  const CNSPreciseTime & current_time,
1752  size_t & total_scanned,
1753  size_t & total_mark_deleted)
1754 {
1755  SPurgeAttributes purge_io;
1756 
1757  for (; status <= status_to_end; ++status) {
    // purge_io is used both ways: the remaining budgets and the start job
    // id go in, the actual counts and the last job id come back
1758  purge_io.scans = max_scanned - total_scanned;
1759  purge_io.deleted = max_mark_deleted - total_mark_deleted;
1760  purge_io.job_id = start_job_id;
1761 
1762  purge_io = queue.CheckJobsExpiry(current_time, purge_io,
1763  end_job_id,
1764  statuses_to_delete_from[status]);
1765  total_scanned += purge_io.scans;
1766  total_mark_deleted += purge_io.deleted;
1767  m_PurgeJobScanned = purge_io.job_id;
1768 
1769  if (x_CheckStopPurge())
1770  return true;
1771 
1772  if (total_mark_deleted >= max_mark_deleted ||
1773  total_scanned >= max_scanned) {
1774  m_PurgeStatusIndex = status;
1775  return false;
1776  }
1777  }
1778  m_PurgeStatusIndex = 0;
1779  m_PurgeJobScanned = 0;
1780  return false;
1781 }
1782 
1783 
// Creates the crash detection file under m_DataPath if it does not exist
// yet. Best effort: any exception is caught and reported via ERR_POST.
// NOTE(review): the signature, the file name argument and the statements
// which open/write the file are not visible in this listing.
1785 {
1786  try {
1787  CFile crash_file(CFile::MakePath(m_DataPath,
1789  if (!crash_file.Exists()) {
1790  CFileIO f;
1794  f.Close();
1795  }
1796  }
1797  catch (...) {
1798  ERR_POST("Error creating crash detection file.");
1799  }
1800 }
1801 
1802 
// Removes the crash detection file under m_DataPath if it exists.
// Best effort: any exception is caught and reported via ERR_POST.
// NOTE(review): the signature and the file name argument are not visible
// in this listing.
1804 {
1805  try {
1806  CFile crash_file(CFile::MakePath(m_DataPath,
1808  if (crash_file.Exists())
1809  crash_file.Remove();
1810  }
1811  catch (...) {
1812  ERR_POST("Error removing crash detection file. When the server "
1813  "restarts it will re-initialize the database.");
1814  }
1815 }
1816 
1817 
// NOTE(review): neither the signature nor the only body statement of this
// function is visible in this listing. Judging by the neighbours it is
// presumably the check for the crash detection file - confirm.
1819 {
1821 }
1822 
1823 
// Creates the dump error detection file under m_DataPath if it does not
// exist yet. Best effort: any exception is caught and reported via
// ERR_POST.
// NOTE(review): the signature, the file name argument and the statements
// which open/write the file are not visible in this listing.
1825 {
1826  try {
1827  CFile crash_file(CFile::MakePath(m_DataPath,
1829  if (!crash_file.Exists()) {
1830  CFileIO f;
1834  f.Close();
1835  }
1836  }
1837  catch (...) {
1838  ERR_POST("Error creating dump error detection file.");
1839  }
1840 }
1841 
1842 
// NOTE(review): neither the signature nor the only body statement of this
// function is visible in this listing. Judging by the neighbours it is
// presumably the check for the dump error detection file - confirm.
1844 {
1846 }
1847 
1848 
// Removes the dump error detection file under m_DataPath if it exists.
// Best effort: any exception is caught and reported via ERR_POST.
// NOTE(review): the signature and the file name argument are not visible
// in this listing.
1850 {
1851  try {
1852  CFile crash_file(CFile::MakePath(m_DataPath,
1854  if (crash_file.Exists())
1855  crash_file.Remove();
1856  }
1857  catch (...) {
1858  ERR_POST("Error removing dump error detection file. When the server "
1859  "restarts it will set an alert.");
1860  }
1861 }
1862 
1863 
// Asks the queues, one after another, to delete a batch of records until
// the budget taken from m_Server->GetDeleteBatchSize() is used up or there
// are no more queues. Each queue is given only what is left of the budget.
// Returns the total number of the deleted records.
// NOTE(review): the signature line (with the opening brace) is not visible
// in this listing.
1865  // Purge unconditional jobs
1866  unsigned int del_rec = 0;
1867  unsigned int max_deleted = m_Server->GetDeleteBatchSize();
1868 
1869 
1870  for (unsigned int index = 0; ; ++index) {
1871  CRef<CQueue> queue = x_GetQueueAt(index);
1872  if (queue.IsNull())
1873  break;
1874  del_rec += queue->DeleteBatch(max_deleted - del_rec);
1875  if (del_rec >= max_deleted)
1876  break;
1877  }
1878  return del_rec;
1879 }
1880 
1881 
// Calls OptimizeMem() on the queues if m_FreeStatusMemCnt has exceeded
// kRecordThreshold or at least kMemFree_Delay seconds have passed since
// m_LastFreeMem. The counter and the timestamp are reset before the
// queues are walked. The walk is interrupted if a purge stop has been
// requested.
// NOTE(review): the line with the function name and the 'current_time'
// parameter is not visible in this listing.
1882 void
1884 {
1885  // optimize memory every 15 min. or after 1 million of deleted records
1886  static const int kMemFree_Delay = 15 * 60;
1887  static const unsigned kRecordThreshold = 1000000;
1888 
1889  if ((m_FreeStatusMemCnt > kRecordThreshold) ||
1890  (m_LastFreeMem + kMemFree_Delay <= current_time)) {
1891  m_FreeStatusMemCnt = 0;
1892  m_LastFreeMem = current_time;
1893 
1894  for (unsigned int index = 0; ; ++index) {
1895  CRef<CQueue> queue = x_GetQueueAt(index);
1896  if (queue.IsNull())
1897  break;
1898  queue->OptimizeMem();
1899  if (x_CheckStopPurge())
1900  break;
1901  }
1902  }
1903 }
1904 
1905 
// Raises the m_StopPurge flag.
// NOTE(review): the signature line is not visible in this listing. The
// "thread safe" statement below depends on the type of m_StopPurge which
// is not visible here either - confirm.
1907 {
1908  // No need to guard, this operation is thread safe
1909  m_StopPurge = true;
1910 }
1911 
1912 
// Returns the current value of m_StopPurge and resets the flag to false,
// i.e. a stop request is reported once.
// NOTE(review): the signature and the first body statement are not visible
// in this listing.
1914 {
1916  bool stop_purge = m_StopPurge;
1917 
1918  m_StopPurge = false;
1919  return stop_purge;
1920 }
1921 
1922 
// Starts the purge thread. The purge timeout taken from the server (a
// double, in seconds) is split into whole seconds and the remainder in
// nanoseconds; both are passed to the thread constructor.
// NOTE(review): the signature and parts of the statement which creates
// m_PurgeThread are not visible in this listing.
1924 {
1925  double purge_timeout = m_Server->GetPurgeTimeout();
1926  unsigned int sec_delay = purge_timeout;
1927  unsigned int nanosec_delay = (purge_timeout - sec_delay)*1000000000;
1928 
1930  m_Host, *this, sec_delay, nanosec_delay,
1932  m_PurgeThread->Run();
1933 }
1934 
1935 
// Stops the purge thread if it has been started: raises the purge stop
// flag, waits for the thread to finish and drops the reference to it.
// NOTE(review): the signature and one statement between StopPurge() and
// Join() are not visible in this listing.
1937 {
1938  if (!m_PurgeThread.Empty()) {
1939  StopPurge();
1941  m_PurgeThread->Join();
1942  m_PurgeThread.Reset(0);
1943  }
1944 }
1945 
1946 
// Calls PurgeAffinities() on the queues one after another. The walk ends
// when there are no more queues or when a purge stop has been requested.
// NOTE(review): the signature line is not visible in this listing.
1948 {
1949  for (unsigned int index = 0; ; ++index) {
1950  CRef<CQueue> queue = x_GetQueueAt(index);
1951  if (queue.IsNull())
1952  break;
1953  queue->PurgeAffinities();
1954  if (x_CheckStopPurge())
1955  break;
1956  }
1957 }
1958 
1959 
// Calls PurgeGroups() on the queues one after another. The walk ends when
// there are no more queues or when a purge stop has been requested.
// NOTE(review): the signature line is not visible in this listing.
1961 {
1962  for (unsigned int index = 0; ; ++index) {
1963  CRef<CQueue> queue = x_GetQueueAt(index);
1964  if (queue.IsNull())
1965  break;
1966  queue->PurgeGroups();
1967  if (x_CheckStopPurge())
1968  break;
1969  }
1970 }
1971 
1972 
// Calls StaleNodes(current_time) on the queues one after another, at most
// once per calendar second. The walk ends when there are no more queues or
// when a purge stop has been requested.
// NOTE(review): the signature line is not visible in this listing.
// 'last_purge' is a function-local static, so the throttling state is
// shared by all the callers; presumably the function is called from one
// thread only - confirm.
1974 {
1975  // Worker nodes have the last access time in seconds since 1970
1976  // so there is no need to purge them more often than once a second
1977  static CNSPreciseTime last_purge(0, 0);
1978  CNSPreciseTime current_time = CNSPreciseTime::Current();
1979 
1980  if (current_time.Sec() == last_purge.Sec())
1981  return;
1982  last_purge = current_time;
1983 
1984  for (unsigned int index = 0; ; ++index) {
1985  CRef<CQueue> queue = x_GetQueueAt(index);
1986  if (queue.IsNull())
1987  break;
1988  queue->StaleNodes(current_time);
1989  if (x_CheckStopPurge())
1990  break;
1991  }
1992 }
1993 
1994 
// Calls PurgeBlacklistedJobs() on the queues one after another, not more
// often than once per 'period'. The walk ends when there are no more
// queues or when a purge stop has been requested.
// NOTE(review): the signature line is not visible in this listing.
// 'last_time' is a function-local static, so the throttling state is
// shared by all the callers; presumably the function is called from one
// thread only - confirm.
1996 {
1997  static CNSPreciseTime period(30, 0);
1998  static CNSPreciseTime last_time(0, 0);
1999  CNSPreciseTime current_time = CNSPreciseTime::Current();
2000 
2001  // Run this check once in thirty seconds (see 'period' above)
2002  if (current_time - last_time < period)
2003  return;
2004 
2005  last_time = current_time;
2006 
2007  for (unsigned int index = 0; ; ++index) {
2008  CRef<CQueue> queue = x_GetQueueAt(index);
2009  if (queue.IsNull())
2010  break;
2011  queue->PurgeBlacklistedJobs();
2012  if (x_CheckStopPurge())
2013  break;
2014  }
2015 }
2016 
2017 
// Calls PurgeClientRegistry(current_time) on the queues one after another,
// not more often than once per 'period'. The walk ends when there are no
// more queues or when a purge stop has been requested.
// NOTE(review): the signature line is not visible in this listing.
// 'last_time' is a function-local static, so the throttling state is
// shared by all the callers; presumably the function is called from one
// thread only - confirm.
2019 {
2020  static CNSPreciseTime period(5, 0);
2021  static CNSPreciseTime last_time(0, 0);
2022  CNSPreciseTime current_time = CNSPreciseTime::Current();
2023 
2024  // Run this check once in five seconds
2025  if (current_time - last_time < period)
2026  return;
2027 
2028  last_time = current_time;
2029 
2030  for (unsigned int index = 0; ; ++index) {
2031  CRef<CQueue> queue = x_GetQueueAt(index);
2032  if (queue.IsNull())
2033  break;
2034  queue->PurgeClientRegistry(current_time);
2035  if (x_CheckStopPurge())
2036  break;
2037  }
2038 }
2039 
2040 
// Calls PurgeJobInfoCache() on the queues one after another. The walk ends
// when there are no more queues or when a purge stop has been requested.
// NOTE(review): the signature line is not visible in this listing.
2042 {
2043  for (unsigned int index = 0; ; ++index) {
2044  CRef<CQueue> queue = x_GetQueueAt(index);
2045  if (queue.IsNull())
2046  break;
2047  queue->PurgeJobInfoCache();
2048  if (x_CheckStopPurge())
2049  break;
2050  }
2051 }
2052 
2053 
2054 // Safely provides a queue at the given index
// The index is a position in the iteration order of m_Queues. A null
// reference is returned if the index is beyond the last queue. Each call
// walks m_Queues from the beginning, so a loop which fetches all the
// queues by index is quadratic in the number of queues.
// NOTE(review): the signature and the statement after 'current_index' are
// not visible in this listing.
2056 {
2057  unsigned int current_index = 0;
2059 
2060  for (TQueueInfo::iterator k = m_Queues.begin();
2061  k != m_Queues.end(); ++k) {
2062  if (current_index == index)
2063  return k->second.second;
2064  ++current_index;
2065  }
2066  return CRef<CQueue>(NULL);
2067 }
2068 
2069 
// Starts the notification thread. The visible constructor arguments
// include 0 and 100000000, presumably the delay as seconds and
// nanoseconds, which matches the "10 times per second" note below -
// confirm.
// NOTE(review): the signature and parts of the statement which creates
// m_NotifThread are not visible in this listing.
2071 {
2072  // 10 times per second
2074  *this, 0, 100000000,
2076  m_NotifThread->Run();
2077 }
2078 
2079 
// Stops the notification thread if it has been started: waits for the
// thread to finish and drops the reference to it.
// NOTE(review): the signature and the statement before Join() are not
// visible in this listing.
2081 {
2082  if (!m_NotifThread.Empty()) {
2084  m_NotifThread->Join();
2085  m_NotifThread.Reset(0);
2086  }
2087 }
2088 
2089 
// Wakes the notification thread up if it has been started.
// NOTE(review): the signature line is not visible in this listing.
2091 {
2092  if (!m_NotifThread.Empty())
2093  m_NotifThread->WakeUp();
2094 }
2095 
2096 
// Calls NotifyExactListeners() on every queue and returns the smallest of
// the values the queues returned; kTimeNever if there are no queues or no
// queue returned a smaller value.
// NOTE(review): the signature lines are not visible in this listing.
2099 {
2100  CNSPreciseTime next = kTimeNever;
2101  CNSPreciseTime from_queue;
2102 
2103  for (unsigned int index = 0; ; ++index) {
2104  CRef<CQueue> queue = x_GetQueueAt(index);
2105  if (queue.IsNull())
2106  break;
2107  from_queue = queue->NotifyExactListeners();
2108  if (from_queue < next )
2109  next = from_queue;
2110  }
2111  return next;
2112 }
2113 
2114 
// Starts the service thread.
// NOTE(review): the signature and most of the statement which creates
// m_ServiceThread (the delay arguments included) are not visible in this
// listing, so the "100 seconds" note below cannot be checked here.
2116 {
2117  // Once in 100 seconds
2119  *m_Server, m_Host, *this,
2123  m_ServiceThread->Run();
2124 }
2125 
2126 
// Stops the service thread if it has been started; the visible part waits
// for the thread to finish.
// NOTE(review): the signature and the statements before and after Join()
// are not visible in this listing.
2128 {
2129  if (!m_ServiceThread.Empty()) {
2131  m_ServiceThread->Join();
2133  }
2134 }
2135 
2136 
// Starts the execution watcher thread; the seconds and nanoseconds parts
// of 'run_delay' are passed to the thread constructor.
// NOTE(review): the signature (which declares 'run_delay') and parts of
// the statement which creates m_ExeWatchThread are not visible in this
// listing.
2138 {
2140  m_Host, *this,
2141  run_delay.Sec(), run_delay.NSec(),
2143  m_ExeWatchThread->Run();
2144 }
2145 
2146 
// Does something only if the execution watcher thread has been started.
// NOTE(review): the signature and all the statements inside the 'if' are
// not visible in this listing; presumably they stop and join the thread
// as the other Stop...Thread functions do - confirm.
2148 {
2149  if (!m_ExeWatchThread.Empty()) {
2153  }
2154 }
2155 
2156 
// Dumps the queues to flat files under m_DumpPath. Steps:
//  1. create the dump directory if it does not exist
//  2. dump every queue except the one named "LBSMDTestQueue" (compared
//     case insensitively); a failure is logged and the next queue is tried
//  3. for the successfully dumped non static queues collect their classes,
//     linked sections and names
//  4. write the collected classes and queues into one file and the linked
//     sections into another one; if that fails both files and the dumps of
//     the collected queues are removed
// No exception derived from 'exception' leaves the dump steps: failures
// are logged and remembered in 'dump_error'.
// NOTE(review): the signature, the statement which removes the space
// reserving file, the dump file name suffixes, some loop headers and the
// statement executed when there were no dump errors are not visible in
// this listing.
2158 {
2159  LOG_POST(Note << "Start dumping jobs");
2160 
2161  // Create the directory if needed
2162  CDir dump_dir(m_DumpPath);
2163  if (!dump_dir.Exists())
2164  dump_dir.Create();
2165 
2166  // Remove the file which reserves the disk space
2168 
2169  // Walk all the queues and dump them
2170  // Note: no need for a lock because it is a shutdown time
2171  bool dump_error = false;
2172  set<string> dumped_queues;
2173  const string lbsm_test_queue("LBSMDTestQueue");
2174  for (TQueueInfo::iterator k = m_Queues.begin();
2175  k != m_Queues.end(); ++k) {
2176  if (NStr::CompareNocase(k->first, lbsm_test_queue) != 0) {
2177  try {
2178  k->second.second->Dump(m_DumpPath);
2179  dumped_queues.insert(k->first);
2180  } catch (const exception & ex) {
2181  dump_error = true;
2182  ERR_POST("Error dumping queue " << k->first << ": " <<
2183  ex.what());
2184  }
2185  }
2186  }
2187 
2188  // Dump the required queue classes. The only classes required are those
2189  // which were used by dynamic queues. The dynamic queue classes may also
2190  // use linked sections
2191  set<string> classes_to_dump;
2192  set<string> linked_sections_to_dump;
2193  set<string> dynamic_queues_to_dump;
2194  for (TQueueInfo::iterator k = m_Queues.begin();
2195  k != m_Queues.end(); ++k) {
2196  if (NStr::CompareNocase(k->first, lbsm_test_queue) == 0)
2197  continue;
2198  if (k->second.second->GetQueueKind() == CQueue::eKindStatic)
2199  continue;
2200  if (dumped_queues.find(k->first) == dumped_queues.end())
2201  continue; // There was a dumping error
2202 
2203  classes_to_dump.insert(k->second.first.qclass);
2205  j = k->second.first.linked_sections.begin();
2206  j != k->second.first.linked_sections.end(); ++j)
2207  linked_sections_to_dump.insert(j->second);
2208  dynamic_queues_to_dump.insert(k->first);
2209  }
2210 
2211 
2212  // Dump classes if so and linked sections if so
2213  if (!classes_to_dump.empty()) {
2214  string qclasses_dump_file_name = m_DumpPath +
2216  string linked_sections_dump_file_name = m_DumpPath +
2218  FILE * qclasses_dump_file = NULL;
2219  FILE * linked_sections_dump_file = NULL;
2220  try {
2221  qclasses_dump_file = fopen(qclasses_dump_file_name.c_str(), "wb");
2222  if (qclasses_dump_file == NULL)
2223  throw runtime_error("Cannot open file " +
2224  qclasses_dump_file_name);
    // Switch the stream to the unbuffered mode
2225  setbuf(qclasses_dump_file, NULL);
2226 
2227  // Dump dynamic queue classes
2228  for (set<string>::const_iterator k = classes_to_dump.begin();
2229  k != classes_to_dump.end(); ++k) {
    // NOTE(review): the result of find() is dereferenced unchecked, i.e.
    // the class of each dumped queue is assumed to be in m_QueueClasses -
    // confirm.
2230  TQueueParams::const_iterator queue_class =
2231  m_QueueClasses.find(*k);
2232  x_DumpQueueOrClass(qclasses_dump_file, "", *k, false,
2233  queue_class->second);
2234  }
2235 
2236  // Dump dynamic queues: qname and its class.
2238  k = dynamic_queues_to_dump.begin();
2239  k != dynamic_queues_to_dump.end(); ++k) {
2241  x_DumpQueueOrClass(qclasses_dump_file, *k,
2242  q->second.first.qclass, true,
2243  q->second.first);
2244  }
2245 
2246  fclose(qclasses_dump_file);
    // Reset the pointer so that the handler below does not close the
    // file for the second time
2247  qclasses_dump_file = NULL;
2248 
2249  // Dump linked sections if so
2250  if (!linked_sections_to_dump.empty()) {
2251  linked_sections_dump_file = fopen(
2252  linked_sections_dump_file_name.c_str(), "wb");
2253  if (linked_sections_dump_file == NULL)
2254  throw runtime_error("Cannot open file " +
2255  linked_sections_dump_file_name);
2256  setbuf(linked_sections_dump_file, NULL);
2257 
2259  k = linked_sections_to_dump.begin();
2260  k != linked_sections_to_dump.end(); ++k) {
    // NOTE(review): the result of find() is dereferenced unchecked, i.e.
    // each referred section is assumed to be in m_LinkedSections -
    // confirm.
2261  map<string, map<string, string> >::const_iterator
2262  j = m_LinkedSections.find(*k);
2263  x_DumpLinkedSection(linked_sections_dump_file, *k,
2264  j->second);
2265  }
2266  fclose(linked_sections_dump_file);
2267  linked_sections_dump_file = NULL;
2268  }
2269  } catch (const exception & ex) {
    // NOTE(review): 'ex' is not used, so the reason of the failure does
    // not get into the log - consider adding ex.what() to the message.
2270  dump_error = true;
2271  ERR_POST("Error dumping dynamic queue classes and "
2272  "their linked sections. Dynamic queue dumps are lost.");
2273  if (qclasses_dump_file != NULL)
2274  fclose(qclasses_dump_file);
2275  if (linked_sections_dump_file != NULL)
2276  fclose(linked_sections_dump_file);
2277 
2278  // Remove the classes and linked sections files
2279  if (access(qclasses_dump_file_name.c_str(), F_OK) != -1)
2280  remove(qclasses_dump_file_name.c_str());
2281  if (access(linked_sections_dump_file_name.c_str(), F_OK) != -1)
2282  remove(linked_sections_dump_file_name.c_str());
2283 
2284  // Remove dynamic queues dumps
2286  k = dynamic_queues_to_dump.begin();
2287  k != dynamic_queues_to_dump.end(); ++k) {
2288  m_Queues[*k].second->RemoveDump(m_DumpPath);
2289  }
2290  }
2291  }
2292 
2293  if (!dump_error)
2295 
2296  LOG_POST(Note << "Dumping jobs finished");
2297 }
2298 
2299 
2301  const string & qname,
2302  const string & qclass,
2303  bool is_queue,
2304  const SQueueParameters & params)
2305 {
      // Fills an SQueueDescriptionDump record and writes it with
      // descr_dump.Write(f).
      // The queue name and the queue class name are always stored. When
      // is_queue is false (the record describes a queue class) the settings
      // from params are stored as well.
      // Throws runtime_error if Write() raises an exception; the message
      // names the queue or the queue class being dumped.
2306  SQueueDescriptionDump descr_dump;
2307 
2308  descr_dump.is_queue = is_queue;
2309  descr_dump.qname_size = qname.size();
      // NOTE(review): none of the memcpy() calls in this function is preceded
      // by a visible check of the source size against the destination
      // array - confirm the sizes are validated before this point.
2310  memcpy(descr_dump.qname, qname.data(), qname.size());
2311  descr_dump.qclass_size = qclass.size();
2312  memcpy(descr_dump.qclass, qclass.data(), qclass.size());
2313 
2314  if (!is_queue) {
2315  // The other parameters are required for the queue classes only
2316  descr_dump.timeout = (double)params.timeout;
2317  descr_dump.notif_hifreq_interval = (double)params.notif_hifreq_interval;
2318  descr_dump.notif_hifreq_period = (double)params.notif_hifreq_period;
2319  descr_dump.notif_lofreq_mult = params.notif_lofreq_mult;
2320  descr_dump.notif_handicap = (double)params.notif_handicap;
2321  descr_dump.dump_buffer_size = params.dump_buffer_size;
2323  descr_dump.dump_aff_buffer_size = params.dump_aff_buffer_size;
2324  descr_dump.dump_group_buffer_size = params.dump_group_buffer_size;
2325  descr_dump.run_timeout = (double)params.run_timeout;
2326  descr_dump.read_timeout = (double)params.read_timeout;
2327  descr_dump.program_name_size = params.program_name.size();
2328  memcpy(descr_dump.program_name, params.program_name.data(),
2329  params.program_name.size());
2330  descr_dump.failed_retries = params.failed_retries;
2331  descr_dump.read_failed_retries = params.read_failed_retries;
2332  descr_dump.blacklist_time = (double)params.blacklist_time;
2333  descr_dump.read_blacklist_time = (double)params.read_blacklist_time;
2334  descr_dump.max_input_size = params.max_input_size;
2335  descr_dump.max_output_size = params.max_output_size;
2336  descr_dump.subm_hosts_size = params.subm_hosts.size();
2337  memcpy(descr_dump.subm_hosts, params.subm_hosts.data(),
2338  params.subm_hosts.size());
2339  descr_dump.wnode_hosts_size = params.wnode_hosts.size();
2340  memcpy(descr_dump.wnode_hosts, params.wnode_hosts.data(),
2341  params.wnode_hosts.size());
2342  descr_dump.reader_hosts_size = params.reader_hosts.size();
2343  memcpy(descr_dump.reader_hosts, params.reader_hosts.data(),
2344  params.reader_hosts.size());
2345  descr_dump.wnode_timeout = (double)params.wnode_timeout;
2346  descr_dump.reader_timeout = (double)params.reader_timeout;
2347  descr_dump.pending_timeout = (double)params.pending_timeout;
2348  descr_dump.max_pending_wait_timeout =
2349  (double)params.max_pending_wait_timeout;
2350  descr_dump.max_pending_read_wait_timeout =
2351  (double)params.max_pending_read_wait_timeout;
2352  descr_dump.description_size = params.description.size();
2353  memcpy(descr_dump.description, params.description.data(),
2354  params.description.size());
2355  descr_dump.scramble_job_keys = params.scramble_job_keys;
2357  (double)params.client_registry_timeout_worker_node;
2360  descr_dump.client_registry_timeout_admin =
2361  (double)params.client_registry_timeout_admin;
2362  descr_dump.client_registry_min_admins =
2365  (double)params.client_registry_timeout_submitter;
2366  descr_dump.client_registry_min_submitters =
2368  descr_dump.client_registry_timeout_reader =
2369  (double)params.client_registry_timeout_reader;
2370  descr_dump.client_registry_min_readers =
2372  descr_dump.client_registry_timeout_unknown =
2373  (double)params.client_registry_timeout_unknown;
2374  descr_dump.client_registry_min_unknowns =
2376 
2377  // Dump the linked sections prefixes and names in the same order
2378  string prefixes;
2379  string names;
2381  k = params.linked_sections.begin();
2382  k != params.linked_sections.end(); ++k) {
      // Both strings are comma separated lists; the separator is added
      // to both at the same time so the items stay aligned by position
2383  if (!prefixes.empty()) {
2384  prefixes += ",";
2385  names += ",";
2386  }
2387  prefixes += k->first;
2388  names += k->second;
2389  }
2390  descr_dump.linked_section_prefixes_size = prefixes.size();
2391  memcpy(descr_dump.linked_section_prefixes, prefixes.data(),
2392  prefixes.size());
2393  descr_dump.linked_section_names_size = names.size();
2394  memcpy(descr_dump.linked_section_names, names.data(), names.size());
2395  }
2396 
      // A write failure is converted into a runtime_error which tells what
      // was being dumped: the queue name or the queue class name
2397  try {
2398  descr_dump.Write(f);
2399  } catch (const exception & ex) {
2400  string msg = "Writing error while dumping queue ";
2401  if (is_queue)
2402  msg += qname;
2403  else
2404  msg += "class " + qclass;
2405  msg += string(": ") + ex.what();
2406  throw runtime_error(msg);
2407  }
2408 }
2409 
2410 
2411 void CQueueDataBase::x_DumpLinkedSection(FILE * f, const string & sname,
2412  const map<string, string> & values)
2413 {
2414  for (map<string, string>::const_iterator k = values.begin();
2415  k != values.end(); ++k) {
2416  SLinkedSectionDump section_dump;
2417 
2418  section_dump.section_size = sname.size();
2419  memcpy(section_dump.section, sname.data(), sname.size());
2420  section_dump.value_name_size = k->first.size();
2421  memcpy(section_dump.value_name, k->first.data(), k->first.size());
2422  section_dump.value_size = k->second.size();
2423  memcpy(section_dump.value, k->second.data(), k->second.size());
2424 
2425  try {
2426  section_dump.Write(f);
2427  } catch (const exception & ex) {
2428  throw runtime_error("Writing error while dumping linked section " +
2429  sname + " values: " + ex.what());
2430  }
2431  }
2432 }
2433 
2434 
2436 {
      // Removes the dump directory (m_DumpPath) if it exists.
      // Exceptions raised while checking or removing the directory are
      // caught here and reported via ERR_POST.
2437  try {
2438  CDir dump_dir(m_DumpPath);
2439  if (dump_dir.Exists())
2440  dump_dir.Remove();
2441  } catch (const exception & ex) {
2442  ERR_POST("Error removing the dump directory: " << ex.what());
2443  } catch (...) {
2444  ERR_POST("Unknown error removing the dump directory");
2445  }
2446 }
2447 
2448 
2450 {
      // Removes files from the data directory (m_DataPath). Directories,
      // links and the service files listed below are left intact.
      // Does nothing if the data directory does not exist.
2451  CDir data_dir(m_DataPath);
2452  if (!data_dir.Exists())
2453  return;
2454 
2455  CDir::TEntries entries = data_dir.GetEntries(
2457  for (CDir::TEntries::const_iterator k = entries.begin();
2458  k != entries.end(); ++k) {
2459  if ((*k)->IsDir())
2460  continue;
2461  if ((*k)->IsLink())
2462  continue;
2463  string entryName = (*k)->GetName();
      // Service files must survive the cleanup
2464  if (entryName == kDBStorageVersionFileName ||
2465  entryName == kNodeIDFileName ||
2466  entryName == kStartJobIDsFileName ||
2467  entryName == kCrashFlagFileName ||
2468  entryName == kDumpErrorFlagFileName)
2469  continue;
2470 
      // NOTE(review): the path is built by plain concatenation, so
      // m_DataPath is presumably stored with a trailing path
      // separator - confirm.
2471  CFile f(m_DataPath + entryName);
      // Best effort: an exception while removing a file is ignored and
      // the loop goes on with the next entry
2472  try {
2473  f.Remove();
2474  } catch (...) {}
2475  }
2476 }
2477 
2478 
2479 // Logs the corresponding message if needed and provides the overall reinit
2480 // status.
2482 {
      // Returns true if x_DoesCrashFlagFileExist() reports true or if
      // 'reinit' is set; in both cases a message is logged and an alert is
      // registered on the server.
      // Otherwise the existing data directory is checked for consistency and
      // false is returned.
      // Throws CNetScheduleException if the dump directory exists while the
      // storage version file does not, or if the version read from the file
      // differs from NETSCHEDULED_STORAGE_VERSION.
2483  if (x_DoesCrashFlagFileExist()) {
2484  ERR_POST("Reinitialization due to the server "
2485  "did not stop gracefully last time. "
2486  << m_DataPath << " removed.");
2487  m_Server->RegisterAlert(eStartAfterCrash, "Database has been "
2488  "reinitialized due to the server did not "
2489  "stop gracefully last time");
2490  return true;
2491  }
2492 
2493  if (reinit) {
2494  LOG_POST(Note << "Reinitialization due to a command line option. "
2495  << m_DataPath << " removed.");
2496  m_Server->RegisterAlert(eReinit, "Database has been reinitialized due "
2497  "to a command line option");
2498  return true;
2499  }
2500 
2501  if (CDir(m_DataPath).Exists()) {
2502  bool ver_file_exists = CFile(m_DataPath +
2504  bool dump_dir_exists = CDir(m_DumpPath).Exists();
2505 
2506  if (dump_dir_exists && !ver_file_exists) {
2507  // Strange. Some service file exist while the storage version
2508  // does not. It might be that the data dir has been altered
2509  // manually. Let's not start.
2510  NCBI_THROW(CNetScheduleException, eInternalError,
2511  "Error detected: Storage version file is not found "
2512  "while the dump directory exists.");
2513  }
2514 
2515  if (dump_dir_exists && ver_file_exists) {
      // Both exist: the stored version must match the version this
      // server was built with. At most sizeof(buf) bytes are read
      // from the file.
2516  CFileIO f;
2517  char buf[32];
2518  size_t read_count = 0;
2519  string storage_ver;
2520 
2523 
2524  read_count = f.Read(buf, sizeof(buf));
2525  storage_ver.append(buf, read_count);
2527  f.Close();
2528 
2529  if (storage_ver != NETSCHEDULED_STORAGE_VERSION)
2530  NCBI_THROW(CNetScheduleException, eInternalError,
2531  "Error detected: Storage version mismatch, "
2532  "required: " NETSCHEDULED_STORAGE_VERSION
2533  ", found: " + storage_ver);
2534  }
2535 
2536  if (!dump_dir_exists && ver_file_exists) {
2537  LOG_POST(Note << "Non-empty data directory exists however the "
2538  << kDumpSubdirName
2539  << " subdirectory is not found");
2541  "Non-empty data directory exists "
2542  "however the " + kDumpSubdirName +
2543  " subdirectory is not found");
2544  }
2545  }
2546 
2548  string msg = "The previous instance of the server had problems with "
2549  "dumping the information on the disk. Some queues may be "
2550  "not restored. "
2551  "See the previous instance log for details.";
2552  LOG_POST(Note << msg);
2554  }
2555 
2556  return false;
2557 }
2558 
2559 
2560 CBDB_Env *
2562 {
      // Allocates a CBDB_Env object, configures it from 'params', opens it
      // with transactions in m_DataPath and returns the pointer to it.
      // NOTE(review): the object is created with a plain 'new' and no
      // cleanup is visible here if one of the following calls throws;
      // the caller presumably takes the ownership of the returned
      // pointer - confirm.
2563  string trailed_log_path =
2565  string err_file = m_DataPath + "errjsqueue.log";
2566  CBDB_Env * env = new CBDB_Env();
2567 
2568  if (!trailed_log_path.empty())
2569  env->SetLogDir(trailed_log_path);
2570 
2571  env->OpenErrFile(err_file.c_str());
2572 
2573  env->SetLogRegionMax(512 * 1024);
      // A non-zero log_mem_size switches the environment to in-memory
      // logging with that buffer size; otherwise the log goes to files
      // with a size limit and automatic removal
2574  if (params.log_mem_size) {
2575  env->SetLogInMemory(true);
2576  env->SetLogBSize(params.log_mem_size);
2577  } else {
2578  env->SetLogFileMax(200 * 1024 * 1024);
2579  env->SetLogAutoRemove(true);
2580  }
2581 
2583  if (params.private_env)
2584  opt |= CBDB_Env::ePrivate;
2585 
      // Each of the limits below is applied only if it is non-zero in
      // 'params'
2586  if (params.cache_ram_size)
2587  env->SetCacheSize(params.cache_ram_size);
2588  if (params.mutex_max)
2589  env->MutexSetMax(params.mutex_max);
2590  if (params.max_locks)
2591  env->SetMaxLocks(params.max_locks);
2592  if (params.max_lockers)
2593  env->SetMaxLockers(params.max_lockers);
2594  if (params.max_lockobjects)
2595  env->SetMaxLockObjects(params.max_lockobjects);
2596  if (params.max_trans)
2597  env->SetTransactionMax(params.max_trans);
2601 
2602  env->OpenWithTrans(m_DataPath.c_str(), opt);
      // Report the effective environment settings via the diagnostics
      // context
2603  GetDiagContext().Extra()
2604  .Print("_type", "startup")
2605  .Print("info", "opened BDB environment")
2606  .Print("private", params.private_env ? "true" : "false")
2607  .Print("max_locks", env->GetMaxLocks())
2608  .Print("transactions",
2610  "syncronous" : "asyncronous")
2611  .Print("max_mutexes", env->MutexGetMax());
2612 
2613  env->SetDirectDB(params.direct_db);
2614  env->SetDirectLog(params.direct_log);
2615 
2616  env->SetCheckPointKB(params.checkpoint_kb);
2617  env->SetCheckPointMin(params.checkpoint_min);
2618 
2619  env->SetLockTimeout(10 * 1000000); // 10 sec
2620 
2621  env->SetTasSpins(5);
2622 
2623  if (env->IsTransactional()) {
2624  env->SetTransactionTimeout(10 * 1000000); // 10 sec
2626  env->CleanLog();
2627  }
2628 
2629  return env;
2630 }
2631 
2632 
2634 {
2635  CFileIO f;
2639  f.Close();
2640 }
2641 
2642 
// Inspects the dump directory (m_DumpPath) to find out what has been dumped.
//
//  dump_static_queues  - receives the queue names taken from the
//                        "db_dump.NAME" files which are not registered as
//                        dynamic queues
//  dump_dynamic_queues - receives dynamic queue name -> queue class name
//  dump_queue_classes  - receives queue class name -> restored parameters
//
// Does nothing if the dump directory does not exist. Throws runtime_error if
// the queue description file exists but cannot be opened. An exception raised
// while the records are read is re-thrown after the file is closed.
2643 void
2644 CQueueDataBase::x_ReadDumpQueueDesrc(set<string, PNocase> & dump_static_queues,
2645  map<string, string,
2646  PNocase> & dump_dynamic_queues,
2647  TQueueParams & dump_queue_classes)
2648 {
2649  CDir dump_dir(m_DumpPath);
2650  if (!dump_dir.Exists())
2651  return;
2652 
2653  // The file contains dynamic queue classes and dynamic queue names
2654  string queue_desrc_file_name = m_DumpPath + kQClassDescriptionFileName;
2655  CFile queue_desrc_file(queue_desrc_file_name);
2656  if (queue_desrc_file.Exists()) {
2657  FILE * f = fopen(queue_desrc_file_name.c_str(), "rb");
2658  if (f == NULL)
2659  throw runtime_error("Cannot open the existing dump file "
2660  "for reading: " + queue_desrc_file_name);
2661  SQueueDescriptionDump dump_struct;
2662  try {
      // Records are read for as long as Read() returns 0
2663  while (dump_struct.Read(f) == 0) {
2664  if (dump_struct.is_queue) {
      // A dynamic queue record: only the queue name and its
      // class name are taken
2665  string qname(dump_struct.qname, dump_struct.qname_size);
2666  string qclass(dump_struct.qclass, dump_struct.qclass_size);
2667  dump_dynamic_queues[qname] = qclass;
2668  } else {
      // A queue class record: restore the class parameters
2669  SQueueParameters p;
2670  p.qclass = "";
2671  p.timeout = CNSPreciseTime(dump_struct.timeout);
2675  CNSPreciseTime(dump_struct.notif_hifreq_period);
2676  p.notif_lofreq_mult = dump_struct.notif_lofreq_mult;
2677  p.notif_handicap =
2678  CNSPreciseTime(dump_struct.notif_handicap);
2679  p.dump_buffer_size = dump_struct.dump_buffer_size;
2681  dump_struct.dump_client_buffer_size;
2684  dump_struct.dump_group_buffer_size;
2685  p.run_timeout = CNSPreciseTime(dump_struct.run_timeout);
2686  p.read_timeout = CNSPreciseTime(dump_struct.read_timeout);
2687  p.program_name = string(dump_struct.program_name,
2688  dump_struct.program_name_size);
2689  p.failed_retries = dump_struct.failed_retries;
2690  p.read_failed_retries = dump_struct.read_failed_retries;
2691  p.blacklist_time =
2692  CNSPreciseTime(dump_struct.blacklist_time);
2694  CNSPreciseTime(dump_struct.read_blacklist_time);
2695  p.max_input_size = dump_struct.max_input_size;
2696  p.max_output_size = dump_struct.max_output_size;
2697  p.subm_hosts = string(dump_struct.subm_hosts,
2698  dump_struct.subm_hosts_size);
2699  p.wnode_hosts = string(dump_struct.wnode_hosts,
2700  dump_struct.wnode_hosts_size);
2701  p.reader_hosts = string(dump_struct.reader_hosts,
2702  dump_struct.reader_hosts_size);
2703  p.wnode_timeout =
2704  CNSPreciseTime(dump_struct.wnode_timeout);
2705  p.reader_timeout =
2706  CNSPreciseTime(dump_struct.reader_timeout);
2707  p.pending_timeout =
2708  CNSPreciseTime(dump_struct.pending_timeout);
2712  CNSPreciseTime(dump_struct.
2713  max_pending_read_wait_timeout);
2714  p.description = string(dump_struct.description,
2715  dump_struct.description_size);
2716  p.scramble_job_keys = dump_struct.scramble_job_keys;
2718  CNSPreciseTime(dump_struct.
2719  client_registry_timeout_worker_node);
2723  CNSPreciseTime(dump_struct.
2724  client_registry_timeout_admin);
2726  dump_struct.client_registry_min_admins;
2728  CNSPreciseTime(dump_struct.
2729  client_registry_timeout_submitter);
2731  dump_struct.client_registry_min_submitters;
2733  CNSPreciseTime(dump_struct.
2734  client_registry_timeout_reader);
2736  dump_struct.client_registry_min_readers;
2738  CNSPreciseTime(dump_struct.
2739  client_registry_timeout_unknown);
2741  dump_struct.client_registry_min_unknowns;;
2742 
2743  // Unpack linked sections
2744  string dump_prefs(dump_struct.
2745  linked_section_prefixes,
2746  dump_struct.
2747  linked_section_prefixes_size);
2748  string dump_names(dump_struct.
2749  linked_section_names,
2750  dump_struct.
2751  linked_section_names_size);
2752  list<string> prefixes;
2753  list<string> names;
2754  NStr::Split(dump_prefs, ",", prefixes,
2757  NStr::Split(dump_names, ",", names,
      // The prefixes and the names are paired by their
      // position; the pairing stops at the end of the
      // shorter list
2760  list<string>::const_iterator pref_it = prefixes.begin();
2761  list<string>::const_iterator names_it = names.begin();
2762  for ( ; pref_it != prefixes.end() &&
2763  names_it != names.end(); ++pref_it, ++names_it)
2764  p.linked_sections[*pref_it] = *names_it;
2765 
2766  string qclass(dump_struct.qclass, dump_struct.qclass_size);
2767  dump_queue_classes[qclass] = p;
2768  }
2769  }
2770  } catch (const exception & ex) {
      // Close the file before the exception goes further up
2771  fclose(f);
2772  throw;
2773  }
2774  fclose(f);
2775  }
2776 
2777 
2778  CDir::TEntries entries = dump_dir.GetEntries(
2780  for (CDir::TEntries::const_iterator k = entries.begin();
2781  k != entries.end(); ++k) {
2782  if ((*k)->IsDir())
2783  continue;
2784  if ((*k)->IsLink())
2785  continue;
2786  string entry_name = (*k)->GetName();
2787  if (!NStr::StartsWith(entry_name, "db_dump."))
2788  continue;
2789 
      // The part of the file name after the "db_dump" prefix is the queue
      // name. A queue which is not registered as a dynamic one is
      // treated as a static queue.
2790  string prefix;
2791  string qname;
2792  NStr::SplitInTwo(entry_name, ".", prefix, qname);
2793  if (dump_dynamic_queues.find(qname) == dump_dynamic_queues.end())
2794  dump_static_queues.insert(qname);
2795  }
2796 }
2797 
2798 
2799 set<string, PNocase> CQueueDataBase::x_GetConfigQueues(void)
2800 {
2801  const CNcbiRegistry & reg = CNcbiApplication::Instance()->GetConfig();
2802  set<string, PNocase> queues;
2803  list<string> sections;
2804 
2805  reg.EnumerateSections(&sections);
2806  for (list<string>::const_iterator k = sections.begin();
2807  k != sections.end(); ++k) {
2808  string queue_name;
2809  string prefix;
2810  const string & section_name = *k;
2811 
2812  NStr::SplitInTwo(section_name, "_", prefix, queue_name);
2813  if (NStr::CompareNocase(prefix, "queue") != 0)
2814  continue;
2815  if (queue_name.empty())
2816  continue;
2817  if (queue_name.size() > kMaxQueueNameSize - 1)
2818  continue;
2819  queues.insert(queue_name);
2820  }
2821 
2822  return queues;
2823 }
2824 
2825 
2827 {
2828  // Here: the m_LinkedSections has already been read from the configuration
2829  // file. Let's append the sections from the dump.
      // The records are read from the linked sections dump file, collected
      // in a temporary map and then inserted into m_LinkedSections.
      // Does nothing if the dump directory or the file does not exist.
      // Throws runtime_error if the existing file cannot be opened. An
      // exception raised while reading is re-thrown after the file is closed.
2830  CDir dump_dir(m_DumpPath);
2831  if (!dump_dir.Exists())
2832  return;
2833 
2834  string linked_sections_file_name = m_DumpPath + kLinkedSectionsFileName;
2835  CFile linked_sections_file(linked_sections_file_name);
2836 
2837  if (linked_sections_file.Exists()) {
2838  FILE * f = fopen(linked_sections_file_name.c_str(), "rb");
2839  if (f == NULL)
2840  throw runtime_error("Cannot open the existing dump file "
2841  "for reading: " + linked_sections_file_name);
2842  SLinkedSectionDump dump_struct;
2843  map<string, map<string, string> > dump_sections;
2844  try {
2845  while (dump_struct.Read(f) == 0) {
      // NOTE(review): the lookup below treats dump_struct.section as
      // a C string while 'sname' further down is built using
      // section_size - confirm the buffer is always NUL-terminated.
2846  if (m_LinkedSections.find(dump_struct.section) !=
2848  continue;
2849  string sname(dump_struct.section, dump_struct.section_size);
2850  string vname(dump_struct.value_name,
2851  dump_struct.value_name_size);
2852  string val(dump_struct.value, dump_struct.value_size);
2853  dump_sections[sname][vname] = val;
2854  }
2855  } catch (const exception & ex) {
2856  fclose(f);
2857  throw;
2858  }
2859  fclose(f);
2860 
      // map::insert() keeps the sections m_LinkedSections already has and
      // adds only those with new names
2861  m_LinkedSections.insert(dump_sections.begin(), dump_sections.end());
2862  }
2863 }
2864 
2865 
2867 {
      // Renames the dump directory (m_DumpPath) to a backup name which ends
      // with ".N" where N is the first number, starting from 0, for which no
      // directory with that name exists yet.
      // Does nothing if the dump directory does not exist. Exceptions raised
      // by the rename are caught here and reported via ERR_POST.
2868  CDir dump_dir(m_DumpPath);
2869  if (!dump_dir.Exists())
2870  return;
2871 
2872  size_t backup_number = 0;
2873  string backup_dir_name;
      // Try the numbers one by one till an unused backup name is found
2874  for ( ; ; ) {
2876  "." +
2877  NStr::NumericToString(backup_number);
2878  if (!CDir(backup_dir_name).Exists())
2879  break;
2880  ++backup_number;
2881  }
2882 
2883  try {
2884  dump_dir.Rename(backup_dir_name);
2885  } catch (const exception & ex) {
2886  ERR_POST("Error renaming the dump directory: " << ex.what());
2887  } catch (...) {
2888  ERR_POST("Unknown error removing the dump directory");
2889  }
2890 }
2891 
2892 
2894 {
2896 }
2897 
2898 
2900 {
      // Removes the file named by x_GetDumpSpaceFileName() if it exists.
      // Returns true if the file does not exist or has been removed and
      // false if the removal raised an exception (which is reported via
      // ERR_POST).
2901  CFile space_file(x_GetDumpSpaceFileName());
2902  if (space_file.Exists()) {
2903  try {
2904  space_file.Remove();
2905  } catch (const exception & ex) {
2906  string msg = "Error removing reserving dump space file: " +
2907  string(ex.what());
2908  ERR_POST(msg);
2910  return false;
2911  }
2912  }
2913  return true;
2914 }
2915 
2916 
2918 {
      // Creates the file named by x_GetDumpSpaceFileName() and fills it with
      // zero bytes. The requested size is m_Server->GetReserveDumpSpace();
      // nothing is done if it is 0. The dump directory is created first if
      // it does not exist.
      // Every failure is reported via ERR_POST and ends the function.
2919  unsigned int space = m_Server->GetReserveDumpSpace();
2920  if (space == 0)
2921  return;
2922 
2923  CDir dump_dir(m_DumpPath);
2924  if (!dump_dir.Exists()) {
2925  try {
2926  dump_dir.Create();
2927  } catch (const exception & ex) {
2928  string msg = "Error creating dump directory: " + string(ex.what());
2929  ERR_POST(msg);
2931  return;
2932  }
2933  }
2934 
2935  // This will truncate the file if it existed
2936  FILE * space_file = fopen(x_GetDumpSpaceFileName().c_str(), "w");
2937  if (space_file == NULL) {
2938  string msg = "Error opening reserving dump space file " +
2940  ERR_POST(msg);
2942  return;
2943  }
2944 
      // One zero-filled buffer is reused for all the write operations
2945  void * buffer = malloc(kDumpReservedSpaceFileBuffer);
2946  if (buffer == NULL) {
2947  fclose(space_file);
2948  string msg = "Error creating a memory buffer to write into the "
2949  "reserving dump space file";
2950  ERR_POST(msg);
2952  return;
2953  }
2954 
2955  memset(buffer, 0, kDumpReservedSpaceFileBuffer);
      // Whole buffers are written while more than one buffer is left
2956  while (space > kDumpReservedSpaceFileBuffer) {
      // errno is reset so that strerror() below reports this very write
2957  errno = 0;
2958  if (fwrite(buffer, kDumpReservedSpaceFileBuffer, 1, space_file) != 1) {
2959  free(buffer);
2960  fclose(space_file);
2961  string msg = "Error writing into the reserving dump space file: " +
2962  string(strerror(errno));
2963  ERR_POST(msg);
2965  return;
2966  }
2968  }
2969 
      // The last write covers the 'space' bytes which are still left
2970  if (space > 0) {
2971  errno = 0;
2972  if (fwrite(buffer, space, 1, space_file) != 1) {
2973  string msg = "Error writing into the reserving dump space file: " +
2974  string(strerror(errno));
2975  ERR_POST(msg);
2977  }
2978  }
2979 
2980  free(buffer);
2981  fclose(space_file);
2982 }
2983 
2984 
2986 
list< AutoPtr< CDirEntry > > TEntries
Define a list of pointers to directory entries.
Definition: ncbifile.hpp:1646
definition of a Culling tree
Definition: ncbi_tree.hpp:88
void x_RemoveDumpErrorFlagFile(void)
void SetLogRegionMax(unsigned size)
Set logging region size.
Definition: bdb_env.cpp:163
CBDB_Transaction::ETransSync GetTransactionSync() const
Get default syncronicity setting.
Definition: bdb_env.hpp:281
void x_BackupDump(void)
string GetQueueInfo(void) const
CNSPreciseTime pending_timeout
bool x_CheckStopPurge(void)
void SetByKey(const string &key, CJsonNode::TInstance value)
For a JSON object node, insert a new element or update an existing element.
iterator_bool insert(const value_type &val)
Definition: map.hpp:165
map< string, string > linked_sections
size_type size() const
Definition: map.hpp:148
char * buf
CRef< CGetJobNotificationThread > m_NotifThread
bool IsNull(void) const THROWS_NONE
Check if pointer is null – same effect as Empty().
Definition: ncbiobj.hpp:718
string GetString(const string &driver_name, const string &param_name, EErrAction on_error, const string &default_value, const list< string > *synonyms=NULL)
Utility function to get an element of parameter tree Throws an exception when mandatory parameter is ...
const string kNodeIDFileName("NODE_ID")
void x_CreateStorageVersionFile(void)
Definition: dbpivot.c:60
void SetTransactionSync(CBDB_Transaction::ETransSync sync)
Set default syncronicity level.
Definition: bdb_env.cpp:136
unsigned int client_registry_min_unknowns
static CJsonNode NewArrayNode()
Create a new JSON array node.
void EnumerateSections(list< string > *sections, TFlags flags=fAllLayers) const
Enumerate section names.
Definition: ncbireg.cpp:492
TQueueInfo m_Queues
void StopPurgeThread(void)
CRef< CQueue > x_GetLastPurged(void)
TQueueParams m_QueueClasses
char program_name[kMaxQueueLimitsSize]
Definition: ns_db_dump.hpp:192
char qclass[kMaxQueueNameSize]
Definition: ns_db_dump.hpp:179
CFastMutex m_LinkedSectionsGuard
void SetLogDir(const string &log_dir)
Path to directory where transaction logs are to be stored By default it is the same directory as envi...
Definition: bdb_env.cpp:470
void PurgeAffinities(void)
const string & GetQueueName() const
Definition: ns_queue.hpp:165
void PurgeBlacklistedJobs(void)
Definition: ns_queue.cpp:3771
void Init(CBDB_Env &env, const string &path, unsigned int count)
unsigned int PurgeAffinities(void)
Definition: ns_queue.cpp:3691
void x_RemoveBDBFiles(void)
bool Run(TRunMode flags=fRunDefault)
Run the thread.
Definition: ncbithr.cpp:649
void SetDirectDB(bool on_off)
Turn off buffering of databases (DB_DIRECT_DB)
Definition: bdb_env.cpp:612
void Write(FILE *f)
Definition: ns_db_dump.cpp:441
CFastMutex m_ConfigureLock
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3344
No ownership is assumed.
Definition: ncbi_types.h:135
Open an existing file, or create a new one.
Definition: ncbifile.hpp:3221
static unsigned cnt[256]
CNetScheduleServer * m_Server
const string kLinkedSectionsFileName("linked_sections.dump")
bool x_CheckOpenPreconditions(bool reinit)
time_t & Sec(void)
void clear()
Definition: map.hpp:169
bool x_PurgeQueue(CQueue &queue, size_t status_to_start, size_t status_to_end, unsigned int start_job_id, unsigned int end_job_id, size_t max_scanned, size_t max_mark_deleted, const CNSPreciseTime &current_time, size_t &total_scanned, size_t &total_mark_deleted)
void ForceTransactionCheckpoint()
Forced checkpoint.
Definition: bdb_env.cpp:658
time_t Configure(const IRegistry &reg, CJsonNode &diff)
const string & GetProgramName(void) const
Definition: ns_clients.hpp:98
bool x_ConfigureQueues(const TQueueParams &queues_from_ini, CJsonNode &diff)
CRef< CServiceThread > m_ServiceThread
void MutexSetMax(unsigned max)
Configure the total number of mutexes.
Definition: bdb_env.cpp:712
corresponds to DB_THREAD
Definition: bdb_env.hpp:64
CNSPreciseTime max_pending_read_wait_timeout
unsigned int GetAddress(void) const
Definition: ns_clients.hpp:84
CNSPreciseTime client_registry_timeout_worker_node
size_t Read(void *buf, size_t count) const
Read file.
Definition: ncbifile.cpp:6312
Final state - read failed.
void SetParameters(const SQueueParameters &params)
Definition: ns_queue.cpp:230
string GetQueueClassesConfig(void) const
container_type::const_iterator const_iterator
Definition: map.hpp:53
void OptimizeMem()
Definition: ns_queue.cpp:2586
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1008
string x_GetDumpSpaceFileName(void) const
void x_DeleteQueuesAndClasses(void)
void SetLogFileMax(unsigned int lg_max)
Set maximum size of LOG files.
Definition: bdb_env.cpp:500
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:746
void SetMaxLockers(unsigned max_lockers)
Set the maximum number of locking entities supported by the Berkeley DB environment.
Definition: bdb_env.cpp:557
void x_Open(const SNSDBEnvironmentParams &params, bool reinit)
IRegistry –.
Definition: ncbireg.hpp:74
CNSPreciseTime notif_hifreq_interval
Throw an exception on error.
Definition: ncbi_config.hpp:97
CJsonNode SetHosts(const string &host_names)
Definition: access_list.cpp:54
string
Definition: cgiapp.hpp:437
CNSPreciseTime wnode_timeout
CNSPreciseTime read_blacklist_time
long & NSec(void)
char wnode_hosts[kMaxQueueLimitsSize]
Definition: ns_db_dump.hpp:202
for(len=0;yy_str[len];++len)
void SetRefuseSubmits(bool val)
Definition: ns_queue.hpp:152
string GetString(const string &section, const string &name, const string &default_value, TFlags flags=0) const
Get the parameter string value.
Definition: ncbireg.cpp:316
void ParseVersionString(const string &vstr, string *program_name, CVersionInfo *ver)
Parse string, extract version info and program name (case insensitive)
Definition: version.cpp:285
void SetMaxLockObjects(unsigned lock_obj_max)
see DB_ENV->set_lk_max_objects for more details
Definition: bdb_env.cpp:548
void NotifyListeners(void)
const CNSPreciseTime kTimeNever
void Attach(SQueueDbBlock *block)
Definition: ns_queue.cpp:165
SNSRegistryParameters GetScopeRegistrySettings(void) const
Definition: ns_server.hpp:116
unsigned GetMarkdelBatchSize(void) const
Definition: ns_server.hpp:92
bool Create(void) const
Create the directory using "dirname" passed in the constructor.
Definition: ncbifile.cpp:3970
double client_registry_timeout_submitter
Definition: ns_db_dump.hpp:217
#define NETSCHEDULED_STORAGE_VERSION
Queue databases.
unsigned int client_registry_min_worker_nodes
void x_ReadLinkedSections(const IRegistry &reg, CJsonNode &diff)
static CNcbiApplication * Instance(void)
Singleton method.
Definition: ncbiapp.cpp:96
static string DeleteTrailingPathSeparator(const string &path)
Delete trailing path separator, if any.
Definition: ncbifile.cpp:451
CNSPreciseTime timeout
unsigned int client_registry_min_admins
#define NULL
Definition: ncbistd.hpp:225
void Open(const string &filename, EOpenMode open_mode, EAccessMode access_mode, EShareMode share_mode=eShare)
Open file.
Definition: ncbifile.cpp:6087
unsigned int PurgeJobInfoCache(void)
Definition: ns_queue.cpp:3795
const string kQClassDescriptionFileName("qclass_descr.dump")
void x_AppendDumpLinkedSections(void)
Uint4 linked_section_prefixes_size
Definition: ns_db_dump.hpp:224
void AddClientInfo(const CQueueClientInfo &cinfo)
Definition: queue_vc.hpp:74
#define kEmptyStr
Definition: ncbistr.hpp:120
Netschedule queue client info.
Definition: queue_vc.hpp:50
Merge adjacent delimiters.
Definition: ncbistr.hpp:2284
CNSPreciseTime read_timeout
unsigned int TEnvOpenFlags
OR-ed combination of EEnvOptions.
Definition: bdb_env.hpp:78
unsigned int m_FreeStatusMemCnt
unsigned int client_registry_min_readers
unsigned int CountAvailable(void) const
void x_RemoveDump(void)
unsigned int client_registry_min_submitters
char value[kLinkedSectionValueSize]
Definition: ns_db_dump.hpp:246
void SetLogInMemory(bool on_off)
Configure environment for non-durable in-memory logging.
Definition: bdb_env.cpp:567
void x_ReadDumpQueueDesrc(set< string, PNocase > &dump_static_queues, map< string, string, PNocase > &dump_dynamic_queues, TQueueParams &dump_queue_classes)
const size_t kDumpReservedSpaceFileBuffer
Definition: ns_types.hpp:80
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:893
unsigned int CountAllJobs(void) const
void RunNotifThread(void)
SPurgeAttributes CheckJobsExpiry(const CNSPreciseTime &current_time, SPurgeAttributes attributes, unsigned int last_job, TJobStatus status)
Definition: ns_queue.cpp:3474
void x_ValidateConfiguration(const TQueueParams &queues_from_ini) const
void SetCacheSize(Uint8 cache_size, int num_caches=1)
Set cache size for the environment.
Definition: bdb_env.cpp:150
CRef< CQueue > OpenQueue(const string &name)
CFastMutex m_PurgeLock
CNSPreciseTime CalculateRuntimePrecision(void) const
unsigned int m_PurgeJobScanned
char qname[kMaxQueueNameSize]
Definition: ns_db_dump.hpp:177
void CheckExecutionTimeout(bool logging)
Definition: ns_queue.cpp:3245
NetSchedule client specs.
void PurgeGroups(void)
string GetQueueNames(const string &sep) const
#define GetBoolNoErr(name, dflt)
Job is ready (computed successfully)
CNSPreciseTime notif_handicap
bool x_ConfigureQueueClasses(const TQueueParams &classes_from_ini, CJsonNode &diff)
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:101
const size_t kStatusesSize
const string kDumpSubdirName("dump")
SNSRegistryParameters GetGroupRegistrySettings(void) const
Definition: ns_server.hpp:114
void LoadJobsStartIDs(void)
Definition: ns_server.hpp:139
SQueueParameters x_SingleQueueInfo(TQueueInfo::const_iterator found) const
virtual bool Exists(void) const
Check if directory "dirname" exists.
Definition: ncbifile.hpp:3837
const_iterator end() const
Definition: map.hpp:152
map< string, string > GetLinkedSection(const string &section_name) const
void CheckExecutionTimeout(bool logging)
bool IsProgramAllowed(const string &program_name) const
Definition: ns_queue.hpp:797
void NotifyListenersPeriodically(const CNSPreciseTime &current_time)
Definition: ns_queue.cpp:3121
char linked_section_prefixes[kLinkedSectionsList]
Definition: ns_db_dump.hpp:225
unsigned int read_failed_retries
void RunPurgeThread(void)
CNSPreciseTime NotifyExactListeners(void)
Definition: ns_queue.cpp:3181
bool IsSubmitAllowed(unsigned host) const
Definition: ns_queue.hpp:782
void SetTransactionMax(unsigned tx_max)
Set number of active transaction see DB_ENV->set_tx_max for more details.
Definition: bdb_env.cpp:516
CRef< CQueue > x_GetFirst(void)
string GetLinkedSectionConfig(void) const
double client_registry_timeout_admin
Definition: ns_db_dump.hpp:215
Create private directory.
Definition: bdb_env.hpp:67
unsigned GetDeleteBatchSize(void) const
Definition: ns_server.hpp:88
unsigned int dump_buffer_size
void OpenErrFile(const string &file_name)
Open error reporting file for the environment.
Definition: bdb_env.cpp:590
Waiting for execution.
const string kStartJobIDsFileName("STARTJOBIDS")
const_iterator find(const key_type &key) const
Definition: map.hpp:153
CFile –.
Definition: ncbifile.hpp:1523
CDiagContext_Extra Extra(void) const
Create a temporary CDiagContext_Extra object.
Definition: ncbidiag.hpp:2111
unsigned GetScanBatchSize(void) const
Definition: ns_server.hpp:94
void SetDirectLog(bool on_off)
Turn off buffering of log files (DB_DIRECT_LOG)
Definition: bdb_env.cpp:622
void Write(FILE *f)
Definition: ns_db_dump.cpp:379
const bool & IsLogCleaningThread() const
Definition: ns_server.hpp:68
void StopExecutionWatcherThread(void)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:185
void x_CreateCrashFlagFile(void)
Explicitly canceled.
void x_RemoveCrashFlagFile(void)
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3515
CDir –.
Definition: ncbifile.hpp:1610
unsigned int DeleteBatch(unsigned int max_deleted)
Definition: ns_queue.cpp:3609
const bool & IsLogNotificationThread() const
Definition: ns_server.hpp:66
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
Definition: ncbiobj.hpp:702
string PrintTransitionCounters(void)
double max_pending_read_wait_timeout
Definition: ns_db_dump.hpp:209
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
void CreateDynamicQueue(const CNSClientId &client, const string &qname, const string &qclass, const string &description="")
static void PrintServerWide(size_t affinities)
virtual bool Remove(TRemoveFlags flags=eRecursive) const
Remove a directory entry.
Definition: ncbifile.cpp:2585
void StopPurge(void)
void SetLockTimeout(unsigned timeout)
Set timeout value for locks in microseconds (1 000 000 microseconds = 1 second)
Definition: bdb_env.cpp:698
const bool & IsLogStatisticsThread() const
Definition: ns_server.hpp:72
void x_DumpLinkedSection(FILE *f, const string &sname, const map< string, string > &values)
void Close(void)
Close file.
Definition: ncbifile.cpp:6290
const unsigned kMaxQueueNameSize
Definition: ns_db.hpp:56
unsigned int dump_client_buffer_size
void RequestStop()
Schedule thread Stop.
char section[kLinkedSectionValueNameSize]
Definition: ns_db_dump.hpp:244
unsigned MutexGetMax()
Get number of mutexes.
Definition: bdb_env.cpp:720
static wxAcceleratorEntry entries[3]
TQueueParams x_ReadIniFileQueueDescriptions(const IRegistry &reg, const TQueueParams &classes)
void DeleteDynamicQueue(const CNSClientId &client, const string &qname)
double client_registry_timeout_reader
Definition: ns_db_dump.hpp:219
unsigned int PurgeGroups(void)
Definition: ns_queue.cpp:3727
const string kDBStorageVersionFileName("DB_STORAGE_VER")
const string kDumpReservedSpaceFileName("space_keeper.dat")
void SetCheckPointKB(unsigned kb)
If the kb parameter is non-zero, a checkpoint will be done if more than kbyte kilobytes of log data h...
Definition: bdb_env.hpp:331
void SetLogBSize(unsigned lg_bsize)
Set the size of the in-memory log buffer, in bytes.
Definition: bdb_env.cpp:169
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5275
virtual bool Exists(void) const
Check existence of file.
Definition: ncbifile.hpp:3810
bool Rename(const string &new_path, TRenameFlags flags=fRF_Default)
Rename entry.
Definition: ncbifile.cpp:2448
double client_registry_timeout_worker_node
Definition: ns_db_dump.hpp:213
bool empty() const
Definition: map.hpp:149
void StopNotifThread(void)
unsigned int dump_aff_buffer_size
unsigned int max_input_size
void SetMaxLocks(unsigned locks)
Set max number of locks in the database.
Definition: bdb_env.cpp:506
File can be read and written.
Definition: ncbifile.hpp:3233
void SetTransactionTimeout(unsigned timeout)
Set timeout value for transactions in microseconds (1 000 000 microseconds = 1 second)
Definition: bdb_env.cpp:705
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:198
CRef< CQueue > x_GetQueueAt(unsigned int index)
CJsonNode x_DetectChangesInLinkedSection(const map< string, string > &old_values, const map< string, string > &new_values)
TQueueParams x_ReadIniFileQueueClassDescriptions(const IRegistry &reg)
CNSPreciseTime SendExactNotifications(void)
#define GetSizeNoErr(name, dflt)
void x_CreateAndMountQueue(const string &qname, const SQueueParameters &params, SQueueDbBlock *queue_db_block)
CRef< CQueue > x_GetNext(const string &current_name)
CNSPreciseTime run_timeout
char * strerror(int n)
Definition: pcregrep.c:511
parent_type::const_iterator const_iterator
Definition: set.hpp:79
void AppendString(const string &value)
For an array node, add a string node at the end of the array.
unsigned GetMaxLocks()
Get max locks.
Definition: bdb_env.cpp:537
static CJsonNode NewObjectNode()
Create a new JSON object node.
void x_CreateDumpErrorFlagFile(void)
Uint4 client_registry_min_unknowns
Definition: ns_db_dump.hpp:222
bool Read(const IRegistry &reg, const string &sname)
size_t GetSize() const
For a container node (that is, either an array or an object), return the number of elements in the co...
CNSPreciseTime client_registry_timeout_reader
Uint4 client_registry_min_submitters
Definition: ns_db_dump.hpp:218
void PurgeJobInfoCache(void)
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2189
bool x_DoesCrashFlagFileExist(void) const
Uint4 client_registry_min_worker_nodes
Definition: ns_db_dump.hpp:214
CRequestExecutor & m_Executor
T min(T x_, T y_)
TEntries GetEntries(const string &mask=kEmptyStr, TGetEntriesFlags flags=0) const
Get directory entries based on the specified "mask".
Definition: ncbifile.cpp:3793
void x_DumpQueueOrClass(FILE *f, const string &qname, const string &qclass, bool is_queue, const SQueueParameters &params)
unsigned int x_CountQueuesToAdd(const TQueueParams &queues_from_ini) const
CNSPreciseTime client_registry_timeout_admin
void TransactionCheckPoint(bool clean_log=false)
Force transaction checkpoint.
SNSRegistryParameters GetAffRegistrySettings(void) const
Definition: ns_server.hpp:112
char value_name[kLinkedSectionValueNameSize]
Definition: ns_db_dump.hpp:245
CQueueDbBlockArray m_QueueDbBlockArray
void RunExecutionWatcherThread(const CNSPreciseTime &run_delay)
bool IsTransactional() const
Return TRUE if environment has been open as transactional.
Definition: bdb_env.cpp:668
Process information in the NCBI Registry, including working with configuration files.
Synchronous transaction.
Definition: bdb_trans.hpp:69
unsigned x_PurgeUnconditional(void)
CNSPreciseTime client_registry_timeout_unknown
const bool & IsLogExecutionWatcherThread() const
Definition: ns_server.hpp:70
bool IsMatchingClient(const CQueueClientInfo &cinfo) const
Definition: queue_vc.hpp:106
#define GetUIntNoErr(name, dflt)
void EnumerateEntries(const string &section, list< string > *entries, TFlags flags=fAllLayers) const
Enumerate parameter names for a specified section.
Definition: ncbireg.cpp:509
CRef –.
Definition: ncbiobj.hpp:616
const string kDumpErrorFlagFileName("DUMP_ERROR_FLAG")
SQueueParameters QueueInfo(const string &qname) const
static int errno
Definition: dblib.c:54
void x_OptimizeStatusMatrix(const CNSPreciseTime &current_time)
Final state - read confirmed.
void InitNodeID(const string &db_path)
Definition: ns_server.cpp:680
void erase(iterator pos)
Definition: map.hpp:167
unsigned int max_output_size
set< string, PNocase > x_GetConfigQueues(void)
static CNSPreciseTime Current(void)
CQueueDataBase(CNetScheduleServer *server, const SNSDBEnvironmentParams &params, bool reinit)
string GetPrintableParameters(bool include_class, bool url_encoded) const
CNSPreciseTime reader_timeout
CBackgroundHost & m_Host
bool IsAllowed(unsigned int ha) const
Definition: access_list.cpp:42
double GetPurgeTimeout(void) const
Definition: ns_server.hpp:96
const unsigned int & GetJobCountersInterval() const
Definition: ns_server.hpp:76
void SetLogAutoRemove(bool on_off)
If set, Berkeley DB will automatically remove log files that are no longer needed.
Definition: bdb_env.cpp:637
bool AnyJobs(void) const
const TTreeType * FindSubNode(const TKeyType &key) const
Non recursive linear scan of all subnodes, with key comparison.
Definition: ncbi_tree.hpp:906
Case sensitive compare.
Definition: ncbistr.hpp:1155
void SerializeJobsStartIDs(void)
Definition: ns_server.hpp:141
const TParamTree * GetTree() const
static void TruncateSpacesInPlace(string &str, ETrunc where=eTrunc_Both)
Truncate spaces in a string (in-place)
Definition: ncbistr.cpp:3072
bool QueueExists(const string &qname) const
void x_CreateSpaceReserveFile(void)
EJobStatus
Job status codes.
else result
Definition: token2.c:20
void SetCheckPointMin(unsigned m)
If the m parameter is non-zero, a checkpoint will be done if more than min minutes have passed since ...
Definition: bdb_env.hpp:335
void SetTasSpins(unsigned tas_spins)
Specify that test-and-set mutexes should spin tas_spins times without blocking.
Definition: bdb_env.cpp:579
CNcbiRegistry –.
Definition: ncbireg.hpp:825
static string MakePath(const string &dir=kEmptyStr, const string &base=kEmptyStr, const string &ext=kEmptyStr)
Assemble a path from basic components.
Definition: ncbifile.cpp:399
unsigned int dump_group_buffer_size
bool IsDrainShutdown(void) const
Definition: ns_server.hpp:118
Failed to run (execution timeout)
Wrapper around Berkeley DB transaction structure.
File can be read.
Definition: ncbifile.hpp:3231
CBDB_Env * x_CreateBDBEnvironment(const SNSDBEnvironmentParams &params)
Create a new file, or truncate an existing one.
Definition: ncbifile.hpp:3217
Class for support low level input/output for files.
Definition: ncbifile.hpp:3270
char subm_hosts[kMaxQueueLimitsSize]
Definition: ns_db_dump.hpp:200
bool x_RemoveSpaceReserveFile(void)
Suppress "self recursive" elements (the directories "." and "..").
Definition: ncbifile.hpp:1651
CRef< CJobQueueExecutionWatcherThread > m_ExeWatchThread
void TransactionCheckpoint()
Flush the underlying memory pools, logs and data bases.
Definition: bdb_env.cpp:649
void PrintStatistics(size_t &aff_count)
void PrintJobCounters(void)
SQueueDbBlock * Get(int pos)
double client_registry_timeout_unknown
Definition: ns_db_dump.hpp:221
void OpenWithTrans(const string &db_home, TEnvOpenFlags opt=0)
Open environment using transaction.
Definition: bdb_env.cpp:239
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string...
Definition: ncbiexpt.hpp:546
unsigned int GetReserveDumpSpace(void) const
Definition: ns_server.hpp:211
void PurgeClientRegistry(void)
BDB environment object: a collection including support for some or all of caching, locking...
Definition: bdb_env.hpp:60
CRef< CJobQueueCleanerThread > m_PurgeThread
TObjectType & GetObject(void)
Get object.
Definition: ncbiobj.hpp:983
static const struct name_t names[]
CVersionInfo version_info
Definition: queue_vc.hpp:53
unsigned int notif_lofreq_mult
Non-durable asynchronous transaction.
Definition: bdb_trans.hpp:70
const string kCrashFlagFileName("CRASH_FLAG")
const_iterator begin() const
Definition: map.hpp:151
All clients registered to connect.
Definition: queue_vc.hpp:68
void RunServiceThread(void)
bool x_DoesDumpErrorFlagFileExist(void) const
unsigned int failed_retries
CNSPreciseTime max_pending_wait_timeout
static const char * prefix[]
Definition: pcregrep.c:251
string GetQueueConfig(void) const
NetScheduler threaded server.
Definition: ns_server.hpp:56
static HENV env
Definition: transaction2.c:41
void PurgeClientRegistry(const CNSPreciseTime &current_time)
Definition: ns_queue.cpp:3778
NetSchedule internal exception.
void StopServiceThread(void)
int Read(FILE *f)
Definition: ns_db_dump.cpp:447
void StaleWNodes(void)
char description[kMaxDescriptionSize]
Definition: ns_db_dump.hpp:211
static CNetScheduleAPI::EJobStatus statuses_to_delete_from[]
map< string, map< string, string > > m_LinkedSections
JSON node abstraction.
static int CompareNocase(const CTempString str, SIZE_TYPE pos, SIZE_TYPE n, const char *pattern)
Case-insensitive compare of a substring with a pattern.
Definition: ncbistr.cpp:170
CNSPreciseTime blacklist_time
Truncate trailing spaces only.
Definition: ncbistr.hpp:2083
ESERV_Flag val
void CleanLog()
Remove all non-active log files.
Definition: bdb_env.cpp:680
void Join(void **exit_data=0)
Wait for the thread termination.
Definition: ncbithr.cpp:781
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:98
static CJsonNode NewStringNode(const string &value)
Create a new JSON string node.
bool WasDBDrained(void) const
Definition: ns_server.hpp:120
bool ReadQueue(const IRegistry &reg, const string &sname, const map< string, SQueueParameters, PNocase > &queue_classes, vector< string > &warnings)
char linked_section_names[kLinkedSectionsList]
Definition: ns_db_dump.hpp:227
bool IsAdmin(void) const
Definition: ns_clients.hpp:112
bool ReadQueueClass(const IRegistry &reg, const string &sname, vector< string > &warnings)
void StaleNodes(const CNSPreciseTime &current_time)
Definition: ns_queue.cpp:3761
static string NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:4277
static string AddTrailingPathSeparator(const string &path)
Add trailing path separator, if needed.
Definition: ncbifile.cpp:441
string GetQueueClassesInfo(void) const
static uschar * buffer
Definition: pcretest.c:187
unsigned int CountActiveJobs(void) const
CNSPreciseTime client_registry_timeout_submitter
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:756
CNSPreciseTime notif_hifreq_period
void RegisterAlert(EAlertType alert_type, const string &message)
Definition: ns_server.cpp:669
void PurgeBlacklistedJobs(void)
const unsigned int & GetStatInterval() const
Definition: ns_server.hpp:74
Berkeley BDB file cursor.
string client_name
Definition: queue_vc.hpp:52
char reader_hosts[kMaxQueueLimitsSize]
Definition: ns_db_dump.hpp:204
string PrintJobsStat(const CNSClientId &client)
void WakeupNotifThread(void)
size_t Write(const void *buf, size_t count) const
Write file.
Definition: ncbifile.cpp:6338
virtual bool Remove(TRemoveFlags flags=eRecursive) const
Delete existing directory.
Definition: ncbifile.cpp:4158
Modified on Sun Jul 24 16:17:11 2016 by modify_doxy.py rev. 506947