Primary Components
In our example, all of the files to be analyzed are on a single computer (node) of the cluster. We know which computer holds the files, but not where on that computer they reside. So the utility has to search the entire computer for the files that fit our search criteria. Considering that any one node on the cluster has several hundred gigabytes of file storage, locating the files that meet our search criteria requires work. Since all of the files are on a single machine, and the nodes don’t share a filesystem, the rest of the nodes on the cluster are not involved in the search.
Table 1 describes the nine basic components of the text analysis utility.
Table 1 Basic components of the text analysis utility.
Component | Description |
A | Locates files that match the initial search criteria. |
B | Parses located files into containers. |
C | Distributes work to nodes of the cluster. |
D | Performs text analysis. |
E | Receives results from nodes of the cluster. |
F | list<string> ListFiles contains the filenames of the files to be searched. |
G | list<string> Document contains the tokens for a given file. |
H | The analysis object contains the result of a single text-file analysis. |
I | list<analysis> AnalysisResult contains a list of analysis objects. |
Components A and E are multithreaded. That is, the work that components A and E do is divided among a series of threads that can be executed in parallel. In component A, we use multithreading to speed up the search for the files that meet our initial search criteria. In component E, the manager node assigns a listener thread to each worker node. As each worker node completes an analysis, it sends the results back to the manager node, which routes the results to the appropriate listener thread.
Keep in mind that threads are lightweight processes. Compared to a process, a thread requires less operating system overhead. Threads are easier than processes to create, manipulate, and maintain. In this utility, multithreading is accomplished through the use of the Pthread Library, which supplies an API to create and manage the threads in an application. The Pthread Library is based on a standardized programming interface for the creation and maintenance of threads. The thread interface has been specified by the IEEE standards committee in the POSIX 1003.1c standard.
The Pthread Library contains more than 60 functions, which can be divided into three categories. Table 2 shows the categories and functional breakdown.
Table 2 Three categories of the Pthread Library functions.
Category | Functional breakdown |
Thread Management Functions | Thread configuration; thread cancellation; thread scheduling; thread-specific data; signals; thread attribute functions (thread attribute configuration, thread attribute stack configuration, thread attribute scheduling configuration) |
Mutex Functions | Mutex configuration; priority management; mutex attribute functions (mutex attribute configuration, mutex attribute protocol, mutex attribute priority management) |
Condition Variable Functions | Condition variable configuration; condition variable attribute functions (condition variable attribute configuration, condition variable sharing functions) |
Component A of the utility searches the root directory for other directories. If 5 directories are found, 5 threads are created; if 10,000 directories are found, 10,000 threads are created; and so on. Obviously, the maximum number of threads possible is dictated by the available operating system resources. The sysconf() function can be called to determine the maximum number of threads that a process may create, using the following syntax:
#include <unistd.h>

int main(int argc,char *argv[])
{
   ...
   sysconf(_SC_THREAD_THREADS_MAX);
   ...
}
One thread is assigned to search each directory and its subdirectories. The thread searches its directories recursively, looking only for filenames that meet the initial search criteria. So if there were 20 directories under the root to start with, there would be 20 threads executing in parallel, each searching its own subdirectories for files that match the initial search criteria. Once a filename is found, the thread places the complete filename (including path) into a list<string> container.
Listing 1 shows the source code that each thread uses to search the directories.
Listing 1 Code for searching directories.
1
2 #include <dirent.h>
3
4 #include <limits.h>
5 #include <unistd.h>
6 #include <sys/types.h>
7 #include <sys/stat.h>
8 #include <pthread.h>
9 #include "direct-files.h"
10 #include <iostream>
11 #include <string>
12 #include <vector>
13 #include <list>
14 using namespace std;   // needed for the unqualified names below
15
16 pthread_mutex_t ListFileMutex = PTHREAD_MUTEX_INITIALIZER;
17
18 #ifndef PATH_MAX
19 const int PATH_MAX = 300;
20 #endif
21
22
23 extern vector<string> ListDirectories;
24 extern vector<string> ListFiles;
25 int filecount = 0;
26 int dircount = 0;
27
28 int isDirectory(const char *FileName)
29 {
30    struct stat StatBuffer;
31
32    // lstat() returns -1 on failure
33    if(lstat(FileName, &StatBuffer) == -1)
34    {
35       cerr << "could not get stats on file" << endl;
36       return (0);
37    }
38    else{
39       if((StatBuffer.st_mode & S_IFMT) == S_IFDIR){
40          return (1);
41       }
42       else{
43          return (0);
44       }
45    }
46 }
47
48
49 int isRegular(string FileName,uid_t ID,off_t size)
50 {
51    struct stat StatBuffer;
52
53    // lstat() returns -1 on failure
54    if(lstat(FileName.c_str(),&StatBuffer) == -1)
55    {
56       cerr << "could not get stats on file" << endl;
57       return (0);
58    }
59    else{
60       if((StatBuffer.st_mode & S_IFMT) == S_IFREG) {
61          if((StatBuffer.st_uid == ID) && (StatBuffer.st_size <= size)){
62             return (1);
63          }
64       }
65       else{
66          return (0);
67       }
68    }
69    return (0);
70 }
71
72
73 void listFiles(const char *CurrentDir,uid_t ID,off_t Size)
74 {
75    DIR *DirP;
76    string FileName;
77    string Temp;
78    struct dirent *EntryP;
79    char Name[PATH_MAX +1];
80
81    // no chdir() here: the working directory is shared by all threads
82    DirP = opendir(CurrentDir);//open the directory
83
84    if(DirP == NULL){
85       cerr << "could not open Directory: " << endl;
86       cerr << CurrentDir << endl;
87       return;
88    }
89
90    EntryP = readdir(DirP); //read the directory
91
92    while(EntryP != NULL)
93    {
94       Temp.erase();
95       Temp.assign(EntryP->d_name);
96       if((Temp != ".") && (Temp != "..")){
97          FileName.assign(CurrentDir);
98          FileName.append(1,'/');
99          FileName.append(Temp);
100          if(isDirectory(FileName.c_str())){
101             listFiles(FileName.c_str(),ID,Size);
102          }
103          else{
104             if(isRegular(FileName.c_str(),ID,Size)){
105                pthread_mutex_lock(&ListFileMutex);
106                ListFiles.push_back(FileName);
107                filecount++;   // shared counter, updated under the lock
108                pthread_mutex_unlock(&ListFileMutex);
109             }
110          }
111       }
112       EntryP = readdir(DirP); //read the next file
113
114    }
115    closedir(DirP);
116 }
117
118
119
120
121 void listDirectories(const char *CurrentDir)
122 {
123    DIR *DirP;
124    string FileName;
125    string Temp;
126    struct dirent *EntryP;
127    char Name[PATH_MAX +1];
128
129
130    // no chdir() here: full paths are built from CurrentDir below
131    DirP = opendir(CurrentDir);//open the directory
132
133    if(DirP == NULL){
134       cerr << "could not open directory in ListDirectories()" << endl;
135       return;
136    }
137
138    EntryP = readdir(DirP); //read the directory
139
140    while(EntryP != NULL)
141    {
142       Temp.erase();
143       Temp.assign(EntryP->d_name);
144       if((Temp != ".") && (Temp != "..")){
145          FileName.erase();
146          FileName = CurrentDir;
147          FileName.append(1,'/');
148          FileName.append(Temp);
149          if(isDirectory(FileName.c_str())){
150             ListDirectories.push_back(FileName);
151             dircount++;
152          }
153       }
154       EntryP = readdir(DirP); //read the next file
155
156    }
157    closedir(DirP);
158 }
Notice in line 106 of Listing 1 that once a file is located it’s placed in a container named ListFiles.
To have threads execute the listFiles() function in Listing 1, it’s first necessary to create each thread and assign it a thread function. Threads are created and assigned their thread function using the pthread_create() function.
Table 3 shows the arguments and their descriptions for the pthread_create() function.
Table 3 Arguments and descriptions for the pthread_create() function.
Function/Parameters | Description |
int pthread_create(pthread_t *restrict thread, const pthread_attr_t *restrict attr, void *(*start_routine)(void*), void *restrict arg); | Creates a new thread in the address space of a process. If the function creates the thread successfully, it returns 0 and stores the thread ID in the location pointed to by the thread parameter; otherwise, it returns an error number. |
pthread_t *restrict thread | Points to the thread handle or thread ID of the thread that will be created. |
const pthread_attr_t *restrict attr | An attribute object that contains the attributes of the newly created thread. If NULL, the default attributes are used. |
void *(*start_routine)(void*) | The function that will be immediately executed by the newly created thread. |
void *restrict arg | Points to the argument that is passed to the function start_routine. |
The thread function traverse() is shown in Listing 2.
Listing 2 The traverse() function.
...

void *traverse(void *Arg)
{
   // The creator passes a heap-allocated lookup_key; this thread owns it.
   lookup_key *LookupKey;
   LookupKey = static_cast<lookup_key *>(Arg);
   listFiles(LookupKey->DirectoryName.c_str(),LookupKey->Uid,SearchSize);
   delete LookupKey;
   return(NULL);   // returning from the thread function ends the thread
}

...
The traverse() function calls the listFiles() function, which actually does the recursive directory search.
The primary objects that the utility manipulates are components F through I (refer to Table 1). Our goal is to move these objects between nodes on the cluster in a straightforward and simple fashion. Since components F through I are compound objects, transmitting them between nodes in the cluster requires some work. We used the interface classes discussed in parts 1 and 2 of this series to simplify and streamline the work required. Of particular interest was how the list<string> Document and list<analysis> AnalysisResult objects were transmitted between nodes in the cluster. For example, the analysis object is a compound object:
struct analysis{
   int SensitiveOccurrence;
   string FileName;
   list<string> MonitoredTokens;
   list<string> MissingTokens;
   void reset(void);
};
The analysis object consists of an int, a string object, and two list objects. Using pvm_send() or MPI_Send() routines directly to transmit analysis objects between nodes would be very messy. It would also fog up the logic of components C and E. Component C is implemented by the distributeWork() function shown in Listing 3, and component E is implemented by the getAnalysis() function shown in Listing 4.
Listing 3 The distributeWork() function.
1 void *distributeWork(void *Arg)
2 {
3
4    int N;
5    string FileName;
6    int ReturnCode = 0;
7    int *Value;
8    Value = static_cast<int *>(Arg);
9    opvm_stream Destination[NumProcs];
10    for(N = 0;N <TaskId.size();N++)
11    {
12       Destination[N].messageId(1);
13       Destination[N].taskId(TaskId[N]);
14       Destination[N] << WorkLoad;
15    }
16    int CurrentFile;
17    CurrentFile = 0;
18    if(FileQueue.size() > 0){
19       while(FileQueue.size() >= *Value)
20       {
21          for(N = 0; N < WorkSize && !FileQueue.empty();N++) // never pop an empty queue
22          {
23             FileName.assign(FileQueue.front());
24             FileQueue.pop_front();
25             if((getFile(FileName)) && (Document.size() > 0)){
26                Destination[N] << FileName;
27                Destination[N] << Document;
28                Document.erase(Document.begin(),Document.end());
29             }
30
31          }
32
33       }
34
35    }
36    pthread_exit(NULL);
37 }
Lines 26 and 27 in Listing 3 show how sending the Document object is simplified by overloading the << insertion operator.
Listing 4 The getAnalysis() function.
1 void *getAnalysis(void *Arg)
2 {
3
4    int N;
5    int Nth;
6    ipvm_stream Source;
7    analysis Result;
8    for(N =0;N < ListFiles.size();N++)
9    {
10       for(Nth = 0;Nth < ThreadId.size();Nth++)
11       {
12
13          Source.messageId(1);
14          Source.taskId(ThreadId[Nth]);
15          pthread_mutex_lock(&AnalysisMutex);
16          Source >> Result;
17          AnalysisResult.push_back(Result);
18          pthread_mutex_unlock(&AnalysisMutex);
19
20       }
21
22    }
23    return(NULL);
24 }
Line 16 in Listing 4 receives an analysis object from a worker node. This is another example of the simplification that can take place when interface classes are used properly. It’s important to note that multiple threads can potentially insert analysis objects into the AnalysisResult list container at the same time. This makes access to AnalysisResult one of the critical sections in the multithreaded processing that the utility does. On lines 15 and 18 in Listing 4, we use a mutex to regulate access to that critical section. The distributeWork() function in Listing 3 and the getAnalysis() function in Listing 4 are executed by threads that belong to the manager node. The distributeWork() function gives the worker nodes the actual work to be done, and the getAnalysis() function receives the analysis results from the worker nodes and places those results in a list<analysis> AnalysisResult container.