NCBI C Toolkit Cross Reference

C/algo/blast/api/hspstream_queue.c


  1 /*  $Id: hspstream_queue.c,v 1.9 2007/07/27 18:03:11 papadopo Exp $
  2  * ===========================================================================
  3  *
  4  *                            PUBLIC DOMAIN NOTICE
  5  *               National Center for Biotechnology Information
  6  *
  7  *  This software/database is a "United States Government Work" under the
  8  *  terms of the United States Copyright Act.  It was written as part of
  9  *  the author's official duties as a United States Government employee and
 10  *  thus cannot be copyrighted.  This software/database is freely available
 11  *  to the public for use. The National Library of Medicine and the U.S.
 12  *  Government have not placed any restriction on its use or reproduction.
 13  *
 14  *  Although all reasonable efforts have been taken to ensure the accuracy
 15  *  and reliability of the software and data, the NLM and the U.S.
 16  *  Government do not and cannot warrant the performance or results that
 17  *  may be obtained by using this software or data. The NLM and the U.S.
 18  *  Government disclaim all warranties, express or implied, including
 19  *  warranties of performance, merchantability or fitness for any particular
 20  *  purpose.
 21  *
 22  *  Please cite the author in any work or product based on this material.
 23  *
 24  * ===========================================================================
 25  *
 26  * Author:  Ilya Dondoshansky
 27  *
 28  */
 29 
 30 /** @file hspstream_queue.c
 31  * Implementation of the BlastHSPStream interface for producing BLAST results
 32  * on the fly.
 33  */
 34 
 35 #ifndef SKIP_DOXYGEN_PROCESSING
 36 static char const rcsid[] = 
 37     "$Id: hspstream_queue.c,v 1.9 2007/07/27 18:03:11 papadopo Exp $";
 38 #endif /* SKIP_DOXYGEN_PROCESSING */
 39 
 40 
 41 #include <algo/blast/core/blast_hits.h>
 42 #include <algo/blast/api/hspstream_queue.h>
 43 #include <ncbithr.h>
 44 
 45 /** @addtogroup CToolkitAlgoBlast
 46  *
 47  * @{
 48  */
 49 
 50 /** Default hit saving stream methods */
 51 
 52 /** Deallocate memory for the BlastHSPStream with an HSP list queue data 
 53  * structure. 
 54  * @param hsp_stream HSP stream to free [in]
 55  * @return NULL
 56  */
 57 static BlastHSPStream* 
 58 BlastHSPListQueueFree(BlastHSPStream* hsp_stream) 
 59 {
 60    BlastHSPListQueueData* stream_data = 
 61       (BlastHSPListQueueData*) GetData(hsp_stream);
 62    ListNode* node;
 63 
 64    NlmSemaDestroy(stream_data->m_resultsSema);
 65    NlmMutexDestroy(stream_data->m_resultsMutex);
 66 
 67    for (node = stream_data->m_queueStart; node; node = node->next) {
 68       node->ptr = (void*) Blast_HSPListFree((BlastHSPList*)node->ptr);
 69    }
 70    stream_data->m_queueStart = ListNodeFree(stream_data->m_queueStart);
 71    sfree(stream_data);
 72    sfree(hsp_stream);
 73    return NULL;
 74 }
 75 
 76 /** Read one HSP list from a queue of HSP lists. If the queue is empty, this 
 77  * function waits for more results to be written, unless results queue is 
 78  * already closed for writing.
 79  * @param hsp_stream HSP list stream to read from [in]
 80  * @param hsp_list_out The read HSP list. NULL, if there is nothing left 
 81  *                     in the queue to read.
 82  * @return Status: success, error or end of reading.
 83  */
 84 static int 
 85 BlastHSPListQueueRead(BlastHSPStream* hsp_stream, 
 86                       BlastHSPList** hsp_list_out) 
 87 {
 88    BlastHSPListQueueData* stream_data = 
 89       (BlastHSPListQueueData*) GetData(hsp_stream);
 90    int status = kBlastHSPStream_Error;
 91 
 92    /* Lock the mutex */
 93    NlmMutexLockEx(&stream_data->m_resultsMutex);
 94 
 95    if (!stream_data->m_writingDone) {
 96       while (!stream_data->m_writingDone && !stream_data->m_queueStart) {
 97          /* Decrement the semaphore count to 0, then wait for it to be 
 98           * incremented. Note that mutex must be locked whenever the 
 99           * contents of the stream are checked, but it must be unlocked
100           * for the semaphore wait. */
101          NlmMutexUnlock(stream_data->m_resultsMutex);
102          NlmSemaWait(stream_data->m_resultsSema);
103          NlmMutexLockEx(&stream_data->m_resultsMutex);
104       }
105    }
106 
107    if (!stream_data->m_queueStart) {
108       /* Nothing in the queue, but no more writing to the queue is expected. */
109       *hsp_list_out = NULL;
110       status =  kBlastHSPStream_Eof;
111    } else {
112       ListNode* start_node = stream_data->m_queueStart;
113 
114       *hsp_list_out = (BlastHSPList*) start_node->ptr;
115 
116       stream_data->m_queueStart = start_node->next;
117       start_node->next = NULL;
118       ListNodeFree(start_node);
119       if (!stream_data->m_queueStart)
120          stream_data->m_queueEnd = NULL;
121       status = kBlastHSPStream_Success;
122    }
123 
124    NlmMutexUnlock(stream_data->m_resultsMutex);
125 
126    return status;
127 }
128 
129 /** Write an HSP list to the results queue.
130  * @param hsp_stream BlastHSPStream to write to. [in]
131  * @param hsp_list Pointer to an HSP list to save in the queue. The HSP stream
132  *                 takes ownership of the HSP list and sets the dereferenced
133  *                 pointer to NULL [in]
134  * @return Status: success or error, if queue is already closed for writing.
135  */
136 static int 
137 BlastHSPListQueueWrite(BlastHSPStream* hsp_stream, 
138                        BlastHSPList** hsp_list)
139 {
140    BlastHSPListQueueData* stream_data = 
141       (BlastHSPListQueueData*) GetData(hsp_stream);
142 
143    /* If input is Null, don't do anything, but return success */
144    if (*hsp_list == NULL)
145       return kBlastHSPStream_Success;
146 
147     /* If input HSP list is empty, free it and return success */
148     if ((*hsp_list)->hspcnt == 0) {
149         *hsp_list = Blast_HSPListFree(*hsp_list);
150         return kBlastHSPStream_Success;
151     }    
152 
153    /* If stream is closed for writing, return error */
154    if (stream_data->m_writingDone)
155       return kBlastHSPStream_Error;
156 
157    NlmMutexLockEx(&stream_data->m_resultsMutex);
158    stream_data->m_queueEnd = 
159       ListNodeAddPointer(&stream_data->m_queueEnd, 0, (void*)(*hsp_list));
160    if (!stream_data->m_queueStart)
161       stream_data->m_queueStart = stream_data->m_queueEnd;
162    /* Free the caller from this pointer's ownership. */
163    *hsp_list = NULL;
164    /* Increment the semaphore count. */
165    NlmSemaPost(stream_data->m_resultsSema);
166    NlmMutexUnlock(stream_data->m_resultsMutex);
167 
168    return kBlastHSPStream_Success;
169 }
170 
171 /** Prohibit any future writing to the HSP list queue. Also increment the 
172  * read semaphore, to allow exit out of the wait state in the reading function.
173  * @param hsp_stream The BlastHSPStream pointer [in] [out]
174  */
175 static void 
176 BlastHSPListQueueClose(BlastHSPStream* hsp_stream)
177 {
178    BlastHSPListQueueData* stream_data = 
179       (BlastHSPListQueueData*) GetData(hsp_stream);
180    NlmMutexLockEx(&stream_data->m_resultsMutex);
181    stream_data->m_writingDone = TRUE;
182    /* Increment the semaphore count so the reading thread can get out of 
183     * the waiting state and check the m_writingDone variable. */
184    NlmSemaPost(stream_data->m_resultsSema);
185    NlmMutexUnlock(stream_data->m_resultsMutex);
186 }
187 
188 /** Set functions pointers and data structure pointer in a new BlastHSPStream 
189  * with an HSP list queue data structure.
190  * @param hsp_stream The BlastHSPStream to initialize [in] [out]
191  * @param args Pointer to the HSP list queue data structure [in]
192  */
193 static BlastHSPStream* 
194 BlastHSPListQueueNew(BlastHSPStream* hsp_stream, void* args) 
195 {
196     BlastHSPStreamFunctionPointerTypes fnptr;
197 
198     fnptr.dtor = &BlastHSPListQueueFree;
199     SetMethod(hsp_stream, eDestructor, fnptr);
200     fnptr.method = &BlastHSPListQueueRead;
201     SetMethod(hsp_stream, eRead, fnptr);
202     fnptr.method = &BlastHSPListQueueWrite;
203     SetMethod(hsp_stream, eWrite, fnptr);
204     fnptr.closeFn = &BlastHSPListQueueClose;
205     SetMethod(hsp_stream, eClose, fnptr);
206     fnptr.batch_read = NULL;
207     SetMethod(hsp_stream, eBatchRead, fnptr);
208     fnptr.mergeFn = NULL;
209     SetMethod(hsp_stream, eMerge, fnptr);
210 
211     SetData(hsp_stream, args);
212     return hsp_stream;
213 }
214 
215 /* Create a new BlastHSPStream with an HSP list queue data structure. */
216 BlastHSPStream* Blast_HSPListQueueInit()
217 {
218     BlastHSPListQueueData* stream_data = 
219        (BlastHSPListQueueData*) calloc(1, sizeof(BlastHSPListQueueData));
220     BlastHSPStreamNewInfo info;
221 
222     /* At the start of the search there is nothing in the results queue, so
223      * initialize the semaphore count with 0. */
224     stream_data->m_resultsSema = NlmSemaInit(0);
225     info.constructor = &BlastHSPListQueueNew;
226     info.ctor_argument = (void*)stream_data;
227 
228     return BlastHSPStreamNew(&info);
229 }
230 
231 /* @} */
232 
233 

source navigation ]   [ diff markup ]   [ identifier search ]   [ freetext search ]   [ file search ]  

This page was automatically generated by the LXR engine.
Visit the LXR main site for more information.