/[svn]/linuxsampler/trunk/src/engines/gig/DiskThread.cpp
ViewVC logotype

Contents of /linuxsampler/trunk/src/engines/gig/DiskThread.cpp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1924 - (show annotations) (download)
Sun Jun 28 16:43:38 2009 UTC (14 years, 10 months ago) by persson
File size: 20709 byte(s)
* made program change handling in MIDI thread real-time safe by moving
  the logic to a non-RT thread

1 /***************************************************************************
2 * *
3 * LinuxSampler - modular, streaming capable sampler *
4 * *
5 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 * Copyright (C) 2005 - 2009 Christian Schoenebeck *
7 * *
8 * This program is free software; you can redistribute it and/or modify *
9 * it under the terms of the GNU General Public License as published by *
10 * the Free Software Foundation; either version 2 of the License, or *
11 * (at your option) any later version. *
12 * *
13 * This program is distributed in the hope that it will be useful, *
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
16 * GNU General Public License for more details. *
17 * *
18 * You should have received a copy of the GNU General Public License *
19 * along with this program; if not, write to the Free Software *
20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
21 * MA 02111-1307 USA *
22 ***************************************************************************/
23
24 #include <sstream>
25
26 #include "DiskThread.h"
27
28 namespace LinuxSampler { namespace gig {
29
30 // *********** DiskThread **************
31 // *
32
33
34 // just a placeholder to mark a cell in the pickup array as 'reserved'
35 Stream* DiskThread::SLOT_RESERVED = (Stream*) &SLOT_RESERVED;
36
37
38 // #########################################################################
39 // # Foreign Thread Section
40 // # (following code intended to be interface for audio thread)
41
42
43 /**
44 * Suspend disk thread, kill all active streams, clear all queues and the
45 * pickup array and reset all streams. Call this method to bring everything
46 * in the disk thread to day one. If the disk thread was running, it will be
47 * respawned right after everything was reset.
48 */
49 void DiskThread::Reset() {
50 bool running = this->IsRunning();
51 if (running) this->StopThread();
52 for (int i = 0; i < Streams; i++) {
53 pStreams[i]->Kill();
54 }
55 for (int i = 1; i <= Streams; i++) {
56 pCreatedStreams[i] = NULL;
57 }
58 GhostQueue->init();
59 CreationQueue->init();
60 DeletionQueue->init();
61 DeletionNotificationQueue.init();
62
63 // make sure that all DimensionRegions are released
64 while (DeleteDimregQueue->read_space() > 0) {
65 ::gig::DimensionRegion* dimreg;
66 DeleteDimregQueue->pop(&dimreg);
67 pInstruments->HandBackDimReg(dimreg);
68 }
69 DeleteDimregQueue->init();
70 SetActiveStreamCount(0);
71 ActiveStreamCountMax = 0;
72 if (running) this->StartThread(); // start thread only if it was running before
73 }
74
75 String DiskThread::GetBufferFillBytes() {
76 bool activestreams = false;
77 std::stringstream ss;
78 for (uint i = 0; i < this->Streams; i++) {
79 if (pStreams[i]->GetState() == Stream::state_unused) continue;
80 uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
81 uint streamid = (uint) pStreams[i]->GetHandle();
82 if (!streamid) continue;
83
84 if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
85 else {
86 ss << '[' << streamid << ']' << bufferfill;
87 activestreams = true;
88 }
89 }
90 return ss.str();
91 }
92
93 String DiskThread::GetBufferFillPercentage() {
94 bool activestreams = false;
95 std::stringstream ss;
96 for (uint i = 0; i < this->Streams; i++) {
97 if (pStreams[i]->GetState() == Stream::state_unused) continue;
98 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
99 uint streamid = (uint) pStreams[i]->GetHandle();
100 if (!streamid) continue;
101
102 if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
103 else {
104 ss << '[' << streamid << ']' << bufferfill;
105 activestreams = true;
106 }
107 }
108 return ss.str();
109 }
110
111 /**
112 * Returns -1 if command queue or pickup pool is full, 0 on success (will be
113 * called by audio thread within the voice class).
114 */
115 int DiskThread::OrderNewStream(Stream::reference_t* pStreamRef, ::gig::DimensionRegion* pDimRgn, unsigned long SampleOffset, bool DoLoop) {
116 dmsg(4,("Disk Thread: new stream ordered\n"));
117 if (CreationQueue->write_space() < 1) {
118 dmsg(1,("DiskThread: Order queue full!\n"));
119 return -1;
120 }
121
122 const Stream::OrderID_t newOrder = CreateOrderID();
123 if (!newOrder) {
124 dmsg(1,("DiskThread: there was no free slot\n"));
125 return -1; // there was no free slot
126 }
127
128 pStreamRef->State = Stream::state_active;
129 pStreamRef->OrderID = newOrder;
130 pStreamRef->hStream = CreateHandle();
131 pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
132
133 create_command_t cmd;
134 cmd.OrderID = pStreamRef->OrderID;
135 cmd.hStream = pStreamRef->hStream;
136 cmd.pStreamRef = pStreamRef;
137 cmd.pDimRgn = pDimRgn;
138 cmd.SampleOffset = SampleOffset;
139 cmd.DoLoop = DoLoop;
140
141 CreationQueue->push(&cmd);
142 return 0;
143 }
144
145 /**
146 * Request the disk thread to delete the given disk stream. This method
147 * will return immediately, thus it won't block until the respective voice
148 * was actually deleted. (Called by audio thread within the Voice class).
149 *
150 * @param pStreamRef - stream that shall be deleted
151 * @param bRequestNotification - set to true in case you want to receive a
152 * notification once the stream has actually
153 * been deleted
154 * @returns 0 on success, -1 if command queue is full
155 * @see AskForDeletedStream()
156 */
157 int DiskThread::OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification) {
158 dmsg(4,("Disk Thread: stream deletion ordered\n"));
159 if (DeletionQueue->write_space() < 1) {
160 dmsg(1,("DiskThread: Deletion queue full!\n"));
161 return -1;
162 }
163
164 delete_command_t cmd;
165 cmd.pStream = pStreamRef->pStream;
166 cmd.hStream = pStreamRef->hStream;
167 cmd.OrderID = pStreamRef->OrderID;
168 cmd.bNotify = bRequestNotification;
169
170 DeletionQueue->push(&cmd);
171 return 0;
172 }
173
174 /**
175 * Tell the disk thread to release a dimension region that belongs
176 * to an instrument which isn't loaded anymore. The disk thread
177 * will hand back the dimension region to the instrument resource
178 * manager. (OrderDeletionOfDimreg is called from the audio thread
179 * when a voice dies.)
180 */
181 int DiskThread::OrderDeletionOfDimreg(::gig::DimensionRegion* dimreg) {
182 dmsg(4,("Disk Thread: dimreg deletion ordered\n"));
183 if (DeleteDimregQueue->write_space() < 1) {
184 dmsg(1,("DiskThread: DeleteDimreg queue full!\n"));
185 return -1;
186 }
187 DeleteDimregQueue->push(&dimreg);
188 return 0;
189 }
190
191 /**
192 * Tell the disk thread to do a program change on the specified
193 * EngineChannel.
194 */
195 int DiskThread::OrderProgramChange(uint8_t Program, EngineChannel* pEngineChannel) {
196 program_change_command_t cmd;
197 cmd.Program = Program;
198 cmd.pEngineChannel = pEngineChannel;
199
200 dmsg(4,("Disk Thread: program change ordered\n"));
201 if (ProgramChangeQueue.write_space() < 1) {
202 dmsg(1,("DiskThread: ProgramChange queue full!\n"));
203 return -1;
204 }
205 ProgramChangeQueue.push(&cmd);
206 return 0;
207 }
208
209 /**
210 * Returns the pointer to a disk stream if the ordered disk stream
211 * represented by the \a StreamOrderID was already activated by the disk
212 * thread, returns NULL otherwise. If the call was successful, thus if it
213 * returned a valid stream pointer, the caller has to the store that pointer
214 * by himself, because it's not possible to call this method again with the
215 * same used order ID; this method is just intended for picking up an ordered
216 * disk stream. This method will usually be called by the voice class (within
217 * the audio thread).
218 *
219 * @param StreamOrderID - ID previously returned by OrderNewStream()
220 * @returns pointer to created stream object, NULL otherwise
221 */
222 Stream* DiskThread::AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
223 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
224 Stream* pStream = pCreatedStreams[StreamOrderID];
225 if (pStream && pStream != SLOT_RESERVED) {
226 dmsg(4,("(yes created)\n"));
227 pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
228 return pStream;
229 }
230 dmsg(4,("(no not yet created)\n"));
231 return NULL;
232 }
233
234 /**
235 * In case the original sender requested a notification with his stream
236 * deletion order, he can use this method to poll if the respective stream
237 * has actually been deleted.
238 *
239 * @returns handle / identifier of the deleted stream, or
240 * Stream::INVALID_HANDLE if no notification present
241 */
242 Stream::Handle DiskThread::AskForDeletedStream() {
243 if (DeletionNotificationQueue.read_space()) {
244 Stream::Handle hStream;
245 DeletionNotificationQueue.pop(&hStream);
246 return hStream;
247 } else return Stream::INVALID_HANDLE; // no notification received yet
248 }
249
250
251
252 // #########################################################################
253 // # Disk Thread Only Section
254 // # (following code should only be executed by the disk thread)
255
256
257 DiskThread::DiskThread(int MaxStreams, uint BufferWrapElements, InstrumentResourceManager* pInstruments) :
258 Thread(true, false, 1, -2),
259 pInstruments(pInstruments),
260 DeletionNotificationQueue(4*MaxStreams),
261 ProgramChangeQueue(100)
262 {
263 DecompressionBuffer = ::gig::Sample::CreateDecompressionBuffer(CONFIG_STREAM_MAX_REFILL_SIZE);
264 CreationQueue = new RingBuffer<create_command_t,false>(4*MaxStreams);
265 DeletionQueue = new RingBuffer<delete_command_t,false>(4*MaxStreams);
266 GhostQueue = new RingBuffer<delete_command_t,false>(MaxStreams);
267 DeleteDimregQueue = new RingBuffer< ::gig::DimensionRegion*,false>(4*MaxStreams);
268 pStreams = new Stream*[MaxStreams];
269 pCreatedStreams = new Stream*[MaxStreams + 1];
270 Streams = MaxStreams;
271 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
272 for (int i = 0; i < MaxStreams; i++) {
273 pStreams[i] = new Stream(&DecompressionBuffer, CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words
274 }
275 for (int i = 1; i <= MaxStreams; i++) {
276 pCreatedStreams[i] = NULL;
277 }
278 ActiveStreamCountMax = 0;
279 }
280
281 DiskThread::~DiskThread() {
282 for (int i = 0; i < Streams; i++) {
283 if (pStreams[i]) delete pStreams[i];
284 }
285 if (CreationQueue) delete CreationQueue;
286 if (DeletionQueue) delete DeletionQueue;
287 if (GhostQueue) delete GhostQueue;
288 if (DeleteDimregQueue) delete DeleteDimregQueue;
289 if (pStreams) delete[] pStreams;
290 if (pCreatedStreams) delete[] pCreatedStreams;
291
292 ::gig::Sample::DestroyDecompressionBuffer(DecompressionBuffer);
293 }
294
295 int DiskThread::Main() {
296 dmsg(3,("Disk thread running\n"));
297 while (true) {
298 #if !defined(WIN32)
299 pthread_testcancel(); // mandatory for OSX
300 #endif
301 #if CONFIG_PTHREAD_TESTCANCEL
302 TestCancel();
303 #endif
304 IsIdle = true; // will be set to false if a stream got filled
305
306 // if there are ghost streams, delete them
307 for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
308 delete_command_t ghostStream;
309 GhostQueue->pop(&ghostStream);
310 bool found = false;
311 for (int i = 0; i < this->Streams; i++) {
312 if (pStreams[i]->GetHandle() == ghostStream.hStream) {
313 pStreams[i]->Kill();
314 found = true;
315 // if original sender requested a notification, let him know now
316 if (ghostStream.bNotify)
317 DeletionNotificationQueue.push(&ghostStream.hStream);
318 break;
319 }
320 }
321 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue
322 }
323
324 // if there are creation commands, create new streams
325 while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
326 create_command_t command;
327 CreationQueue->pop(&command);
328 CreateStream(command);
329 }
330
331 // if there are deletion commands, delete those streams
332 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
333 delete_command_t command;
334 DeletionQueue->pop(&command);
335 DeleteStream(command);
336 }
337
338 // release DimensionRegions that belong to instruments
339 // that are no longer loaded
340 while (DeleteDimregQueue->read_space() > 0) {
341 ::gig::DimensionRegion* dimreg;
342 DeleteDimregQueue->pop(&dimreg);
343 pInstruments->HandBackDimReg(dimreg);
344 }
345
346 // perform MIDI program change commands
347 while (ProgramChangeQueue.read_space() > 0) {
348 program_change_command_t cmd;
349 ProgramChangeQueue.pop(&cmd);
350 cmd.pEngineChannel->ExecuteProgramChange(cmd.Program);
351 }
352 RefillStreams(); // refill the most empty streams
353
354 // if nothing was done during this iteration (eg no streambuffer
355 // filled with data) then sleep for 30ms
356 if (IsIdle) usleep(30000);
357
358 int streamsInUsage = 0;
359 for (int i = Streams - 1; i >= 0; i--) {
360 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
361 }
362 SetActiveStreamCount(streamsInUsage);
363 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
364 }
365
366 return EXIT_FAILURE;
367 }
368
369 void DiskThread::CreateStream(create_command_t& Command) {
370 // search for unused stream
371 Stream* newstream = NULL;
372 for (int i = Streams - 1; i >= 0; i--) {
373 if (pStreams[i]->GetState() == Stream::state_unused) {
374 newstream = pStreams[i];
375 break;
376 }
377 }
378 if (!newstream) {
379 std::cerr << "No unused stream found (OrderID:" << Command.OrderID << ") - report if this happens, this is a bug!\n" << std::flush;
380 return;
381 }
382 newstream->Launch(Command.hStream, Command.pStreamRef, Command.pDimRgn, Command.SampleOffset, Command.DoLoop);
383 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
384 if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
385 std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
386 newstream->Kill();
387 return;
388 }
389 pCreatedStreams[Command.OrderID] = newstream;
390 }
391
392 void DiskThread::DeleteStream(delete_command_t& Command) {
393 if (Command.pStream) {
394 Command.pStream->Kill();
395 if (Command.bNotify) DeletionNotificationQueue.push(&Command.hStream);
396 }
397 else { // the stream wasn't created by disk thread or picked up by audio thread yet
398
399 // if stream was created but not picked up yet
400 Stream* pStream = pCreatedStreams[Command.OrderID];
401 if (pStream && pStream != SLOT_RESERVED) {
402 pStream->Kill();
403 pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
404 // if original sender requested a notification, let him know now
405 if (Command.bNotify)
406 DeletionNotificationQueue.push(&Command.hStream);
407 return;
408 }
409
410 // the stream was not created yet
411 if (GhostQueue->write_space() > 0) {
412 GhostQueue->push(&Command);
413 } else { // error, queue full
414 if (Command.bNotify) {
415 dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n"));
416 } else {
417 dmsg(1,("DiskThread: GhostQueue full!\n"));
418 }
419 }
420 }
421 }
422
423 void DiskThread::RefillStreams() {
424 // sort the streams by most empty stream
425 qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
426
427 // refill the most empty streams
428 for (uint i = 0; i < RefillStreamsPerRun; i++) {
429 if (pStreams[i]->GetState() == Stream::state_active) {
430
431 //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
432 //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
433
434 int writespace = pStreams[i]->GetWriteSpaceToEnd();
435 if (writespace == 0) break;
436
437 int capped_writespace = writespace;
438 // if there is too much buffer space available then cut the read/write
439 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
440 if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
441
442 // adjust the amount to read in order to ensure that the buffer wraps correctly
443 int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
444 // if we wasn't able to refill one of the stream buffers by more than
445 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
446 if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
447 }
448 }
449 }
450
451 /// Handle Generator
452 Stream::Handle DiskThread::CreateHandle() {
453 static uint32_t counter = 0;
454 if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
455 else counter++;
456 return counter;
457 }
458
459 /// order ID Generator
460 Stream::OrderID_t DiskThread::CreateOrderID() {
461 static Stream::OrderID_t counter(0);
462 for (int i = 0; i < Streams; i++) {
463 if (counter == Streams) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
464 else counter++;
465 if (!pCreatedStreams[counter]) {
466 pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
467 return counter; // found empty slot
468 }
469 }
470 return 0; // no free slot
471 }
472
473
474
475 // *********** C functions **************
476 // *
477
478 /**
479 * This is the comparison function the qsort algo uses to determine if a value is
480 * bigger than another one or special in our case; if the writespace of a stream
481 * is bigger than another one.
482 */
483 int CompareStreamWriteSpace(const void* A, const void* B) {
484 Stream* a = *(Stream**) A;
485 Stream* b = *(Stream**) B;
486 return b->GetWriteSpace() - a->GetWriteSpace();
487 }
488
489 }} // namespace LinuxSampler::gig

  ViewVC Help
Powered by ViewVC