/[svn]/linuxsampler/trunk/src/engines/gig/DiskThread.cpp
ViewVC logotype

Annotation of /linuxsampler/trunk/src/engines/gig/DiskThread.cpp

Parent Directory | Revision Log


Revision 1855 - (hide annotations) (download)
Mon Mar 2 15:33:38 2009 UTC (15 years, 1 month ago) by iliev
File size: 19806 byte(s)
* fixed endless loop in Engine::SuspendAll() (bug #120)
* AU plugin, work in progress: minor fixes in the build files

1 schoenebeck 53 /***************************************************************************
2     * *
3     * LinuxSampler - modular, streaming capable sampler *
4     * *
5 schoenebeck 56 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 persson 1644 * Copyright (C) 2005 - 2008 Christian Schoenebeck *
7 schoenebeck 53 * *
8     * This program is free software; you can redistribute it and/or modify *
9     * it under the terms of the GNU General Public License as published by *
10     * the Free Software Foundation; either version 2 of the License, or *
11     * (at your option) any later version. *
12     * *
13     * This program is distributed in the hope that it will be useful, *
14     * but WITHOUT ANY WARRANTY; without even the implied warranty of *
15     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
16     * GNU General Public License for more details. *
17     * *
18     * You should have received a copy of the GNU General Public License *
19     * along with this program; if not, write to the Free Software *
20     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
21     * MA 02111-1307 USA *
22     ***************************************************************************/
23    
24     #include <sstream>
25    
26     #include "DiskThread.h"
27    
28     namespace LinuxSampler { namespace gig {
29    
30     // *********** DiskThread **************
31     // *
32    
33    
34     // just a placeholder to mark a cell in the pickup array as 'reserved'
35     Stream* DiskThread::SLOT_RESERVED = (Stream*) &SLOT_RESERVED;
36    
37    
38     // #########################################################################
39     // # Foreign Thread Section
40     // # (following code intended to be interface for audio thread)
41    
42    
43     /**
44     * Suspend disk thread, kill all active streams, clear all queues and the
45     * pickup array and reset all streams. Call this method to bring everything
46     * in the disk thread to day one. If the disk thread was running, it will be
47     * respawned right after everything was reset.
48     */
49     void DiskThread::Reset() {
50     bool running = this->IsRunning();
51     if (running) this->StopThread();
52 schoenebeck 1800 for (int i = 0; i < Streams; i++) {
53 schoenebeck 53 pStreams[i]->Kill();
54     }
55 schoenebeck 1800 for (int i = 1; i <= Streams; i++) {
56 schoenebeck 53 pCreatedStreams[i] = NULL;
57     }
58     GhostQueue->init();
59     CreationQueue->init();
60     DeletionQueue->init();
61 schoenebeck 1321 DeletionNotificationQueue.init();
62 persson 1644
63     // make sure that all DimensionRegions are released
64     while (DeleteDimregQueue->read_space() > 0) {
65     ::gig::DimensionRegion* dimreg;
66     DeleteDimregQueue->pop(&dimreg);
67     pInstruments->HandBackDimReg(dimreg);
68     }
69 persson 1038 DeleteDimregQueue->init();
70 iliev 1789 SetActiveStreamCount(0);
71 schoenebeck 53 ActiveStreamCountMax = 0;
72     if (running) this->StartThread(); // start thread only if it was running before
73     }
74    
75     String DiskThread::GetBufferFillBytes() {
76     bool activestreams = false;
77     std::stringstream ss;
78     for (uint i = 0; i < this->Streams; i++) {
79     if (pStreams[i]->GetState() == Stream::state_unused) continue;
80     uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
81     uint streamid = (uint) pStreams[i]->GetHandle();
82     if (!streamid) continue;
83    
84     if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
85     else {
86     ss << '[' << streamid << ']' << bufferfill;
87     activestreams = true;
88     }
89     }
90     return ss.str();
91     }
92    
93     String DiskThread::GetBufferFillPercentage() {
94     bool activestreams = false;
95     std::stringstream ss;
96     for (uint i = 0; i < this->Streams; i++) {
97     if (pStreams[i]->GetState() == Stream::state_unused) continue;
98 schoenebeck 554 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
99 schoenebeck 53 uint streamid = (uint) pStreams[i]->GetHandle();
100     if (!streamid) continue;
101    
102     if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
103     else {
104     ss << '[' << streamid << ']' << bufferfill;
105     activestreams = true;
106     }
107     }
108     return ss.str();
109     }
110    
111     /**
112     * Returns -1 if command queue or pickup pool is full, 0 on success (will be
113     * called by audio thread within the voice class).
114     */
115 persson 865 int DiskThread::OrderNewStream(Stream::reference_t* pStreamRef, ::gig::DimensionRegion* pDimRgn, unsigned long SampleOffset, bool DoLoop) {
116 schoenebeck 53 dmsg(4,("Disk Thread: new stream ordered\n"));
117     if (CreationQueue->write_space() < 1) {
118     dmsg(1,("DiskThread: Order queue full!\n"));
119     return -1;
120     }
121    
122 senkov 329 const Stream::OrderID_t newOrder = CreateOrderID();
123     if (!newOrder) {
124     dmsg(1,("DiskThread: there was no free slot\n"));
125     return -1; // there was no free slot
126     }
127    
128 schoenebeck 53 pStreamRef->State = Stream::state_active;
129 senkov 329 pStreamRef->OrderID = newOrder;
130 schoenebeck 53 pStreamRef->hStream = CreateHandle();
131     pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
132    
133     create_command_t cmd;
134     cmd.OrderID = pStreamRef->OrderID;
135     cmd.hStream = pStreamRef->hStream;
136     cmd.pStreamRef = pStreamRef;
137 persson 865 cmd.pDimRgn = pDimRgn;
138 schoenebeck 53 cmd.SampleOffset = SampleOffset;
139     cmd.DoLoop = DoLoop;
140    
141     CreationQueue->push(&cmd);
142     return 0;
143     }
144    
145     /**
146 schoenebeck 1321 * Request the disk thread to delete the given disk stream. This method
147     * will return immediately, thus it won't block until the respective voice
148     * was actually deleted. (Called by audio thread within the Voice class).
149     *
150     * @param pStreamRef - stream that shall be deleted
151     * @param bRequestNotification - set to true in case you want to receive a
152     * notification once the stream has actually
153     * been deleted
154     * @returns 0 on success, -1 if command queue is full
155     * @see AskForDeletedStream()
156 schoenebeck 53 */
157 schoenebeck 1321 int DiskThread::OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification) {
158 schoenebeck 53 dmsg(4,("Disk Thread: stream deletion ordered\n"));
159     if (DeletionQueue->write_space() < 1) {
160     dmsg(1,("DiskThread: Deletion queue full!\n"));
161     return -1;
162     }
163    
164     delete_command_t cmd;
165     cmd.pStream = pStreamRef->pStream;
166     cmd.hStream = pStreamRef->hStream;
167     cmd.OrderID = pStreamRef->OrderID;
168 schoenebeck 1321 cmd.bNotify = bRequestNotification;
169 schoenebeck 53
170     DeletionQueue->push(&cmd);
171     return 0;
172     }
173    
174     /**
175 schoenebeck 1321 * Tell the disk thread to release a dimension region that belongs
176 persson 1038 * to an instrument which isn't loaded anymore. The disk thread
177     * will hand back the dimension region to the instrument resource
178     * manager. (OrderDeletionOfDimreg is called from the audio thread
179     * when a voice dies.)
180     */
181     int DiskThread::OrderDeletionOfDimreg(::gig::DimensionRegion* dimreg) {
182     dmsg(4,("Disk Thread: dimreg deletion ordered\n"));
183     if (DeleteDimregQueue->write_space() < 1) {
184     dmsg(1,("DiskThread: DeleteDimreg queue full!\n"));
185     return -1;
186     }
187     DeleteDimregQueue->push(&dimreg);
188     return 0;
189     }
190    
191     /**
192 schoenebeck 53 * Returns the pointer to a disk stream if the ordered disk stream
193     * represented by the \a StreamOrderID was already activated by the disk
194     * thread, returns NULL otherwise. If the call was successful, thus if it
195     * returned a valid stream pointer, the caller has to the store that pointer
196     * by himself, because it's not possible to call this method again with the
197     * same used order ID; this method is just intended for picking up an ordered
198     * disk stream. This method will usually be called by the voice class (within
199     * the audio thread).
200     *
201     * @param StreamOrderID - ID previously returned by OrderNewStream()
202     * @returns pointer to created stream object, NULL otherwise
203     */
204     Stream* DiskThread::AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
205     dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
206     Stream* pStream = pCreatedStreams[StreamOrderID];
207     if (pStream && pStream != SLOT_RESERVED) {
208     dmsg(4,("(yes created)\n"));
209     pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
210     return pStream;
211     }
212     dmsg(4,("(no not yet created)\n"));
213     return NULL;
214     }
215    
216 schoenebeck 1321 /**
217     * In case the original sender requested a notification with his stream
218     * deletion order, he can use this method to poll if the respective stream
219     * has actually been deleted.
220     *
221     * @returns handle / identifier of the deleted stream, or
222     * Stream::INVALID_HANDLE if no notification present
223     */
224     Stream::Handle DiskThread::AskForDeletedStream() {
225     if (DeletionNotificationQueue.read_space()) {
226     Stream::Handle hStream;
227     DeletionNotificationQueue.pop(&hStream);
228     return hStream;
229     } else return Stream::INVALID_HANDLE; // no notification received yet
230     }
231 schoenebeck 53
232    
233 schoenebeck 1321
234 schoenebeck 53 // #########################################################################
235     // # Disk Thread Only Section
236     // # (following code should only be executed by the disk thread)
237    
238    
239 schoenebeck 1800 DiskThread::DiskThread(int MaxStreams, uint BufferWrapElements, InstrumentResourceManager* pInstruments) :
240 persson 1038 Thread(true, false, 1, -2),
241 schoenebeck 1321 pInstruments(pInstruments),
242 schoenebeck 1800 DeletionNotificationQueue(4*MaxStreams)
243 schoenebeck 1321 {
244 schoenebeck 554 DecompressionBuffer = ::gig::Sample::CreateDecompressionBuffer(CONFIG_STREAM_MAX_REFILL_SIZE);
245 schoenebeck 1800 CreationQueue = new RingBuffer<create_command_t,false>(4*MaxStreams);
246     DeletionQueue = new RingBuffer<delete_command_t,false>(4*MaxStreams);
247     GhostQueue = new RingBuffer<delete_command_t,false>(MaxStreams);
248     DeleteDimregQueue = new RingBuffer< ::gig::DimensionRegion*,false>(4*MaxStreams);
249     pStreams = new Stream*[MaxStreams];
250     pCreatedStreams = new Stream*[MaxStreams + 1];
251     Streams = MaxStreams;
252 schoenebeck 554 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
253 schoenebeck 1800 for (int i = 0; i < MaxStreams; i++) {
254 schoenebeck 554 pStreams[i] = new Stream(&DecompressionBuffer, CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words
255 schoenebeck 53 }
256 schoenebeck 1800 for (int i = 1; i <= MaxStreams; i++) {
257 schoenebeck 53 pCreatedStreams[i] = NULL;
258     }
259 persson 835 ActiveStreamCountMax = 0;
260 schoenebeck 53 }
261    
262     DiskThread::~DiskThread() {
263 schoenebeck 1800 for (int i = 0; i < Streams; i++) {
264 schoenebeck 53 if (pStreams[i]) delete pStreams[i];
265     }
266     if (CreationQueue) delete CreationQueue;
267     if (DeletionQueue) delete DeletionQueue;
268     if (GhostQueue) delete GhostQueue;
269 persson 1038 if (DeleteDimregQueue) delete DeleteDimregQueue;
270 schoenebeck 1800 if (pStreams) delete[] pStreams;
271     if (pCreatedStreams) delete[] pCreatedStreams;
272    
273 schoenebeck 385 ::gig::Sample::DestroyDecompressionBuffer(DecompressionBuffer);
274 schoenebeck 53 }
275    
276     int DiskThread::Main() {
277     dmsg(3,("Disk thread running\n"));
278     while (true) {
279 senoner 1481 #if !defined(WIN32)
280 schoenebeck 361 pthread_testcancel(); // mandatory for OSX
281 senoner 1481 #endif
282 schoenebeck 1800 #if CONFIG_PTHREAD_TESTCANCEL
283     TestCancel();
284     #endif
285 schoenebeck 53 IsIdle = true; // will be set to false if a stream got filled
286    
287     // if there are ghost streams, delete them
288     for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
289 schoenebeck 1321 delete_command_t ghostStream;
290     GhostQueue->pop(&ghostStream);
291 schoenebeck 53 bool found = false;
292     for (int i = 0; i < this->Streams; i++) {
293 schoenebeck 1321 if (pStreams[i]->GetHandle() == ghostStream.hStream) {
294 schoenebeck 53 pStreams[i]->Kill();
295     found = true;
296 schoenebeck 1321 // if original sender requested a notification, let him know now
297     if (ghostStream.bNotify)
298     DeletionNotificationQueue.push(&ghostStream.hStream);
299 schoenebeck 53 break;
300     }
301     }
302 schoenebeck 1321 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue
303 schoenebeck 53 }
304    
305     // if there are creation commands, create new streams
306     while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
307     create_command_t command;
308     CreationQueue->pop(&command);
309     CreateStream(command);
310     }
311    
312     // if there are deletion commands, delete those streams
313 senkov 333 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
314 schoenebeck 53 delete_command_t command;
315     DeletionQueue->pop(&command);
316     DeleteStream(command);
317     }
318    
319 persson 1038 // release DimensionRegions that belong to instruments
320     // that are no longer loaded
321     while (DeleteDimregQueue->read_space() > 0) {
322     ::gig::DimensionRegion* dimreg;
323     DeleteDimregQueue->pop(&dimreg);
324     pInstruments->HandBackDimReg(dimreg);
325     }
326    
327 schoenebeck 53 RefillStreams(); // refill the most empty streams
328    
329     // if nothing was done during this iteration (eg no streambuffer
330 persson 840 // filled with data) then sleep for 30ms
331 schoenebeck 53 if (IsIdle) usleep(30000);
332    
333     int streamsInUsage = 0;
334     for (int i = Streams - 1; i >= 0; i--) {
335     if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
336     }
337 iliev 1789 SetActiveStreamCount(streamsInUsage);
338 schoenebeck 53 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
339     }
340    
341     return EXIT_FAILURE;
342     }
343    
344     void DiskThread::CreateStream(create_command_t& Command) {
345     // search for unused stream
346     Stream* newstream = NULL;
347     for (int i = Streams - 1; i >= 0; i--) {
348     if (pStreams[i]->GetState() == Stream::state_unused) {
349     newstream = pStreams[i];
350     break;
351     }
352     }
353     if (!newstream) {
354     std::cerr << "No unused stream found (OrderID:" << Command.OrderID << ") - report if this happens, this is a bug!\n" << std::flush;
355     return;
356     }
357 persson 865 newstream->Launch(Command.hStream, Command.pStreamRef, Command.pDimRgn, Command.SampleOffset, Command.DoLoop);
358 schoenebeck 53 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
359     if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
360     std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
361     newstream->Kill();
362     return;
363     }
364     pCreatedStreams[Command.OrderID] = newstream;
365     }
366    
367     void DiskThread::DeleteStream(delete_command_t& Command) {
368 iliev 1855 if (Command.pStream) {
369     Command.pStream->Kill();
370     if (Command.bNotify) DeletionNotificationQueue.push(&Command.hStream);
371     }
372 schoenebeck 53 else { // the stream wasn't created by disk thread or picked up by audio thread yet
373    
374     // if stream was created but not picked up yet
375     Stream* pStream = pCreatedStreams[Command.OrderID];
376     if (pStream && pStream != SLOT_RESERVED) {
377     pStream->Kill();
378     pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
379 schoenebeck 1321 // if original sender requested a notification, let him know now
380     if (Command.bNotify)
381     DeletionNotificationQueue.push(&Command.hStream);
382 schoenebeck 53 return;
383     }
384    
385     // the stream was not created yet
386     if (GhostQueue->write_space() > 0) {
387 schoenebeck 1321 GhostQueue->push(&Command);
388     } else { // error, queue full
389     if (Command.bNotify) {
390     dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n"));
391     } else {
392     dmsg(1,("DiskThread: GhostQueue full!\n"));
393     }
394 schoenebeck 53 }
395     }
396     }
397    
398     void DiskThread::RefillStreams() {
399     // sort the streams by most empty stream
400     qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
401    
402     // refill the most empty streams
403     for (uint i = 0; i < RefillStreamsPerRun; i++) {
404     if (pStreams[i]->GetState() == Stream::state_active) {
405    
406     //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
407     //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
408    
409     int writespace = pStreams[i]->GetWriteSpaceToEnd();
410     if (writespace == 0) break;
411    
412     int capped_writespace = writespace;
413     // if there is too much buffer space available then cut the read/write
414 schoenebeck 554 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
415     if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
416 schoenebeck 53
417     // adjust the amount to read in order to ensure that the buffer wraps correctly
418     int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
419     // if we wasn't able to refill one of the stream buffers by more than
420 schoenebeck 554 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
421     if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
422 schoenebeck 53 }
423     }
424     }
425    
426     /// Handle Generator
427     Stream::Handle DiskThread::CreateHandle() {
428     static uint32_t counter = 0;
429     if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
430     else counter++;
431     return counter;
432     }
433    
434     /// order ID Generator
435     Stream::OrderID_t DiskThread::CreateOrderID() {
436 senkov 329 static Stream::OrderID_t counter(0);
437 schoenebeck 1800 for (int i = 0; i < Streams; i++) {
438     if (counter == Streams) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
439     else counter++;
440 schoenebeck 53 if (!pCreatedStreams[counter]) {
441     pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
442     return counter; // found empty slot
443     }
444     }
445     return 0; // no free slot
446     }
447    
448    
449    
450     // *********** C functions **************
451     // *
452    
453     /**
454     * This is the comparison function the qsort algo uses to determine if a value is
455     * bigger than another one or special in our case; if the writespace of a stream
456     * is bigger than another one.
457     */
458     int CompareStreamWriteSpace(const void* A, const void* B) {
459     Stream* a = *(Stream**) A;
460     Stream* b = *(Stream**) B;
461     return b->GetWriteSpace() - a->GetWriteSpace();
462     }
463    
464     }} // namespace LinuxSampler::gig

  ViewVC Help
Powered by ViewVC