/[svn]/linuxsampler/trunk/src/engines/gig/DiskThread.cpp
ViewVC logotype

Annotation of /linuxsampler/trunk/src/engines/gig/DiskThread.cpp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1321 - (hide annotations) (download)
Tue Sep 4 01:12:49 2007 UTC (16 years, 7 months ago) by schoenebeck
File size: 19188 byte(s)
* added highly experimental code for synchronizing instrument editors
  hosted in the sampler's process to safely edit instruments while playing
  without a crash (hopefully) by either suspending single regions wherever
  possible or - if unavoidable - whole engine(s)
* disk thread: queue sizes are now proportional to CONFIG_MAX_STREAMS
  instead of fix values
* removed legacy Makefiles in meanwhile deleted src/lib directory and its
  subdirectories
* bumped version to 0.4.0.7cvs

1 schoenebeck 53 /***************************************************************************
2     * *
3     * LinuxSampler - modular, streaming capable sampler *
4     * *
5 schoenebeck 56 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 schoenebeck 1321 * Copyright (C) 2005 - 2007 Christian Schoenebeck *
7 schoenebeck 53 * *
8     * This program is free software; you can redistribute it and/or modify *
9     * it under the terms of the GNU General Public License as published by *
10     * the Free Software Foundation; either version 2 of the License, or *
11     * (at your option) any later version. *
12     * *
13     * This program is distributed in the hope that it will be useful, *
14     * but WITHOUT ANY WARRANTY; without even the implied warranty of *
15     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
16     * GNU General Public License for more details. *
17     * *
18     * You should have received a copy of the GNU General Public License *
19     * along with this program; if not, write to the Free Software *
20     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
21     * MA 02111-1307 USA *
22     ***************************************************************************/
23    
24     #include <sstream>
25    
26     #include "DiskThread.h"
27    
28     namespace LinuxSampler { namespace gig {
29    
30     // *********** DiskThread **************
31     // *
32    
33    
34     // just a placeholder to mark a cell in the pickup array as 'reserved'
35     Stream* DiskThread::SLOT_RESERVED = (Stream*) &SLOT_RESERVED;
36    
37    
38     // #########################################################################
39     // # Foreign Thread Section
40     // # (following code intended to be interface for audio thread)
41    
42    
43     /**
44     * Suspend disk thread, kill all active streams, clear all queues and the
45     * pickup array and reset all streams. Call this method to bring everything
46     * in the disk thread to day one. If the disk thread was running, it will be
47     * respawned right after everything was reset.
48     */
49     void DiskThread::Reset() {
50     bool running = this->IsRunning();
51     if (running) this->StopThread();
52 schoenebeck 554 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
53 schoenebeck 53 pStreams[i]->Kill();
54     }
55 schoenebeck 554 for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) {
56 schoenebeck 53 pCreatedStreams[i] = NULL;
57     }
58     GhostQueue->init();
59     CreationQueue->init();
60     DeletionQueue->init();
61 schoenebeck 1321 DeletionNotificationQueue.init();
62 persson 1038 DeleteDimregQueue->init();
63 schoenebeck 53 ActiveStreamCount = 0;
64     ActiveStreamCountMax = 0;
65     if (running) this->StartThread(); // start thread only if it was running before
66     }
67    
68     String DiskThread::GetBufferFillBytes() {
69     bool activestreams = false;
70     std::stringstream ss;
71     for (uint i = 0; i < this->Streams; i++) {
72     if (pStreams[i]->GetState() == Stream::state_unused) continue;
73     uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
74     uint streamid = (uint) pStreams[i]->GetHandle();
75     if (!streamid) continue;
76    
77     if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
78     else {
79     ss << '[' << streamid << ']' << bufferfill;
80     activestreams = true;
81     }
82     }
83     return ss.str();
84     }
85    
86     String DiskThread::GetBufferFillPercentage() {
87     bool activestreams = false;
88     std::stringstream ss;
89     for (uint i = 0; i < this->Streams; i++) {
90     if (pStreams[i]->GetState() == Stream::state_unused) continue;
91 schoenebeck 554 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
92 schoenebeck 53 uint streamid = (uint) pStreams[i]->GetHandle();
93     if (!streamid) continue;
94    
95     if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
96     else {
97     ss << '[' << streamid << ']' << bufferfill;
98     activestreams = true;
99     }
100     }
101     return ss.str();
102     }
103    
104     /**
105     * Returns -1 if command queue or pickup pool is full, 0 on success (will be
106     * called by audio thread within the voice class).
107     */
108 persson 865 int DiskThread::OrderNewStream(Stream::reference_t* pStreamRef, ::gig::DimensionRegion* pDimRgn, unsigned long SampleOffset, bool DoLoop) {
109 schoenebeck 53 dmsg(4,("Disk Thread: new stream ordered\n"));
110     if (CreationQueue->write_space() < 1) {
111     dmsg(1,("DiskThread: Order queue full!\n"));
112     return -1;
113     }
114    
115 senkov 329 const Stream::OrderID_t newOrder = CreateOrderID();
116     if (!newOrder) {
117     dmsg(1,("DiskThread: there was no free slot\n"));
118     return -1; // there was no free slot
119     }
120    
121 schoenebeck 53 pStreamRef->State = Stream::state_active;
122 senkov 329 pStreamRef->OrderID = newOrder;
123 schoenebeck 53 pStreamRef->hStream = CreateHandle();
124     pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
125    
126     create_command_t cmd;
127     cmd.OrderID = pStreamRef->OrderID;
128     cmd.hStream = pStreamRef->hStream;
129     cmd.pStreamRef = pStreamRef;
130 persson 865 cmd.pDimRgn = pDimRgn;
131 schoenebeck 53 cmd.SampleOffset = SampleOffset;
132     cmd.DoLoop = DoLoop;
133    
134     CreationQueue->push(&cmd);
135     return 0;
136     }
137    
138     /**
139 schoenebeck 1321 * Request the disk thread to delete the given disk stream. This method
140     * will return immediately, thus it won't block until the respective voice
141     * was actually deleted. (Called by audio thread within the Voice class).
142     *
143     * @param pStreamRef - stream that shall be deleted
144     * @param bRequestNotification - set to true in case you want to receive a
145     * notification once the stream has actually
146     * been deleted
147     * @returns 0 on success, -1 if command queue is full
148     * @see AskForDeletedStream()
149 schoenebeck 53 */
150 schoenebeck 1321 int DiskThread::OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification) {
151 schoenebeck 53 dmsg(4,("Disk Thread: stream deletion ordered\n"));
152     if (DeletionQueue->write_space() < 1) {
153     dmsg(1,("DiskThread: Deletion queue full!\n"));
154     return -1;
155     }
156    
157     delete_command_t cmd;
158     cmd.pStream = pStreamRef->pStream;
159     cmd.hStream = pStreamRef->hStream;
160     cmd.OrderID = pStreamRef->OrderID;
161 schoenebeck 1321 cmd.bNotify = bRequestNotification;
162 schoenebeck 53
163     DeletionQueue->push(&cmd);
164     return 0;
165     }
166    
167     /**
168 schoenebeck 1321 * Tell the disk thread to release a dimension region that belongs
169 persson 1038 * to an instrument which isn't loaded anymore. The disk thread
170     * will hand back the dimension region to the instrument resource
171     * manager. (OrderDeletionOfDimreg is called from the audio thread
172     * when a voice dies.)
173     */
174     int DiskThread::OrderDeletionOfDimreg(::gig::DimensionRegion* dimreg) {
175     dmsg(4,("Disk Thread: dimreg deletion ordered\n"));
176     if (DeleteDimregQueue->write_space() < 1) {
177     dmsg(1,("DiskThread: DeleteDimreg queue full!\n"));
178     return -1;
179     }
180     DeleteDimregQueue->push(&dimreg);
181     return 0;
182     }
183    
184     /**
185 schoenebeck 53 * Returns the pointer to a disk stream if the ordered disk stream
186     * represented by the \a StreamOrderID was already activated by the disk
187     * thread, returns NULL otherwise. If the call was successful, thus if it
188     * returned a valid stream pointer, the caller has to the store that pointer
189     * by himself, because it's not possible to call this method again with the
190     * same used order ID; this method is just intended for picking up an ordered
191     * disk stream. This method will usually be called by the voice class (within
192     * the audio thread).
193     *
194     * @param StreamOrderID - ID previously returned by OrderNewStream()
195     * @returns pointer to created stream object, NULL otherwise
196     */
197     Stream* DiskThread::AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
198     dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
199     Stream* pStream = pCreatedStreams[StreamOrderID];
200     if (pStream && pStream != SLOT_RESERVED) {
201     dmsg(4,("(yes created)\n"));
202     pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
203     return pStream;
204     }
205     dmsg(4,("(no not yet created)\n"));
206     return NULL;
207     }
208    
209 schoenebeck 1321 /**
210     * In case the original sender requested a notification with his stream
211     * deletion order, he can use this method to poll if the respective stream
212     * has actually been deleted.
213     *
214     * @returns handle / identifier of the deleted stream, or
215     * Stream::INVALID_HANDLE if no notification present
216     */
217     Stream::Handle DiskThread::AskForDeletedStream() {
218     if (DeletionNotificationQueue.read_space()) {
219     Stream::Handle hStream;
220     DeletionNotificationQueue.pop(&hStream);
221     return hStream;
222     } else return Stream::INVALID_HANDLE; // no notification received yet
223     }
224 schoenebeck 53
225    
226 schoenebeck 1321
227 schoenebeck 53 // #########################################################################
228     // # Disk Thread Only Section
229     // # (following code should only be executed by the disk thread)
230    
231    
232 persson 1038 DiskThread::DiskThread(uint BufferWrapElements, InstrumentResourceManager* pInstruments) :
233     Thread(true, false, 1, -2),
234 schoenebeck 1321 pInstruments(pInstruments),
235     DeletionNotificationQueue(4*CONFIG_MAX_STREAMS)
236     {
237 schoenebeck 554 DecompressionBuffer = ::gig::Sample::CreateDecompressionBuffer(CONFIG_STREAM_MAX_REFILL_SIZE);
238 schoenebeck 1321 CreationQueue = new RingBuffer<create_command_t,false>(4*CONFIG_MAX_STREAMS);
239     DeletionQueue = new RingBuffer<delete_command_t,false>(4*CONFIG_MAX_STREAMS);
240     GhostQueue = new RingBuffer<delete_command_t,false>(CONFIG_MAX_STREAMS);
241     DeleteDimregQueue = new RingBuffer< ::gig::DimensionRegion*,false>(4*CONFIG_MAX_STREAMS);
242 schoenebeck 554 Streams = CONFIG_MAX_STREAMS;
243     RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
244     for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
245     pStreams[i] = new Stream(&DecompressionBuffer, CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words
246 schoenebeck 53 }
247 schoenebeck 554 for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) {
248 schoenebeck 53 pCreatedStreams[i] = NULL;
249     }
250 persson 835 ActiveStreamCountMax = 0;
251 schoenebeck 53 }
252    
253     DiskThread::~DiskThread() {
254 schoenebeck 554 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
255 schoenebeck 53 if (pStreams[i]) delete pStreams[i];
256     }
257     if (CreationQueue) delete CreationQueue;
258     if (DeletionQueue) delete DeletionQueue;
259     if (GhostQueue) delete GhostQueue;
260 persson 1038 if (DeleteDimregQueue) delete DeleteDimregQueue;
261 schoenebeck 385 ::gig::Sample::DestroyDecompressionBuffer(DecompressionBuffer);
262 schoenebeck 53 }
263    
264     int DiskThread::Main() {
265     dmsg(3,("Disk thread running\n"));
266     while (true) {
267 schoenebeck 361 pthread_testcancel(); // mandatory for OSX
268 schoenebeck 53 IsIdle = true; // will be set to false if a stream got filled
269    
270     // if there are ghost streams, delete them
271     for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
272 schoenebeck 1321 delete_command_t ghostStream;
273     GhostQueue->pop(&ghostStream);
274 schoenebeck 53 bool found = false;
275     for (int i = 0; i < this->Streams; i++) {
276 schoenebeck 1321 if (pStreams[i]->GetHandle() == ghostStream.hStream) {
277 schoenebeck 53 pStreams[i]->Kill();
278     found = true;
279 schoenebeck 1321 // if original sender requested a notification, let him know now
280     if (ghostStream.bNotify)
281     DeletionNotificationQueue.push(&ghostStream.hStream);
282 schoenebeck 53 break;
283     }
284     }
285 schoenebeck 1321 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue
286 schoenebeck 53 }
287    
288     // if there are creation commands, create new streams
289     while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
290     create_command_t command;
291     CreationQueue->pop(&command);
292     CreateStream(command);
293     }
294    
295     // if there are deletion commands, delete those streams
296 senkov 333 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
297 schoenebeck 53 delete_command_t command;
298     DeletionQueue->pop(&command);
299     DeleteStream(command);
300     }
301    
302 persson 1038 // release DimensionRegions that belong to instruments
303     // that are no longer loaded
304     while (DeleteDimregQueue->read_space() > 0) {
305     ::gig::DimensionRegion* dimreg;
306     DeleteDimregQueue->pop(&dimreg);
307     pInstruments->HandBackDimReg(dimreg);
308     }
309    
310 schoenebeck 53 RefillStreams(); // refill the most empty streams
311    
312     // if nothing was done during this iteration (eg no streambuffer
313 persson 840 // filled with data) then sleep for 30ms
314 schoenebeck 53 if (IsIdle) usleep(30000);
315    
316     int streamsInUsage = 0;
317     for (int i = Streams - 1; i >= 0; i--) {
318     if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
319     }
320     ActiveStreamCount = streamsInUsage;
321     if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
322     }
323    
324     return EXIT_FAILURE;
325     }
326    
327     void DiskThread::CreateStream(create_command_t& Command) {
328     // search for unused stream
329     Stream* newstream = NULL;
330     for (int i = Streams - 1; i >= 0; i--) {
331     if (pStreams[i]->GetState() == Stream::state_unused) {
332     newstream = pStreams[i];
333     break;
334     }
335     }
336     if (!newstream) {
337     std::cerr << "No unused stream found (OrderID:" << Command.OrderID << ") - report if this happens, this is a bug!\n" << std::flush;
338     return;
339     }
340 persson 865 newstream->Launch(Command.hStream, Command.pStreamRef, Command.pDimRgn, Command.SampleOffset, Command.DoLoop);
341 schoenebeck 53 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
342     if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
343     std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
344     newstream->Kill();
345     return;
346     }
347     pCreatedStreams[Command.OrderID] = newstream;
348     }
349    
350     void DiskThread::DeleteStream(delete_command_t& Command) {
351     if (Command.pStream) Command.pStream->Kill();
352     else { // the stream wasn't created by disk thread or picked up by audio thread yet
353    
354     // if stream was created but not picked up yet
355     Stream* pStream = pCreatedStreams[Command.OrderID];
356     if (pStream && pStream != SLOT_RESERVED) {
357     pStream->Kill();
358     pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
359 schoenebeck 1321 // if original sender requested a notification, let him know now
360     if (Command.bNotify)
361     DeletionNotificationQueue.push(&Command.hStream);
362 schoenebeck 53 return;
363     }
364    
365     // the stream was not created yet
366     if (GhostQueue->write_space() > 0) {
367 schoenebeck 1321 GhostQueue->push(&Command);
368     } else { // error, queue full
369     if (Command.bNotify) {
370     dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n"));
371     } else {
372     dmsg(1,("DiskThread: GhostQueue full!\n"));
373     }
374 schoenebeck 53 }
375     }
376     }
377    
378     void DiskThread::RefillStreams() {
379     // sort the streams by most empty stream
380     qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
381    
382     // refill the most empty streams
383     for (uint i = 0; i < RefillStreamsPerRun; i++) {
384     if (pStreams[i]->GetState() == Stream::state_active) {
385    
386     //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
387     //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
388    
389     int writespace = pStreams[i]->GetWriteSpaceToEnd();
390     if (writespace == 0) break;
391    
392     int capped_writespace = writespace;
393     // if there is too much buffer space available then cut the read/write
394 schoenebeck 554 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
395     if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
396 schoenebeck 53
397     // adjust the amount to read in order to ensure that the buffer wraps correctly
398     int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
399     // if we wasn't able to refill one of the stream buffers by more than
400 schoenebeck 554 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
401     if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
402 schoenebeck 53 }
403     }
404     }
405    
406     /// Handle Generator
407     Stream::Handle DiskThread::CreateHandle() {
408     static uint32_t counter = 0;
409     if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
410     else counter++;
411     return counter;
412     }
413    
414     /// order ID Generator
415     Stream::OrderID_t DiskThread::CreateOrderID() {
416 senkov 329 static Stream::OrderID_t counter(0);
417 schoenebeck 554 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
418     if (counter == CONFIG_MAX_STREAMS) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
419 schoenebeck 53 else counter++;
420     if (!pCreatedStreams[counter]) {
421     pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
422     return counter; // found empty slot
423     }
424     }
425     return 0; // no free slot
426     }
427    
428    
429    
430     // *********** C functions **************
431     // *
432    
433     /**
434     * This is the comparison function the qsort algo uses to determine if a value is
435     * bigger than another one or special in our case; if the writespace of a stream
436     * is bigger than another one.
437     */
438     int CompareStreamWriteSpace(const void* A, const void* B) {
439     Stream* a = *(Stream**) A;
440     Stream* b = *(Stream**) B;
441     return b->GetWriteSpace() - a->GetWriteSpace();
442     }
443    
444     }} // namespace LinuxSampler::gig

  ViewVC Help
Powered by ViewVC