/[svn]/linuxsampler/trunk/src/engines/common/DiskThreadBase.h
ViewVC logotype

Contents of /linuxsampler/trunk/src/engines/common/DiskThreadBase.h

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2506 - (show annotations) (download) (as text)
Sun Jan 12 11:27:05 2014 UTC (10 years, 3 months ago) by schoenebeck
File MIME type: text/x-c++hdr
File size: 27427 byte(s)
* Bugfix: only process the latest MIDI program change event.

1 /***************************************************************************
2 * *
3 * LinuxSampler - modular, streaming capable sampler *
4 * *
5 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 * Copyright (C) 2005 - 2008 Christian Schoenebeck *
7 * Copyright (C) 2009 - 2012 Christian Schoenebeck and Grigor Iliev *
8 * Copyright (C) 2013 - 2014 Christian Schoenebeck and Andreas Persson *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the Free Software *
22 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
23 * MA 02111-1307 USA *
24 ***************************************************************************/
25
26 #ifndef __LS_DISKTHREADBASE_H__
27 #define __LS_DISKTHREADBASE_H__
28
29 #include "StreamBase.h"
30 #include "../EngineChannel.h"
31 #include "../InstrumentManagerBase.h"
32
33 #include "../../common/global_private.h"
34
35 #include "../../common/Thread.h"
36 #include "../../common/RingBuffer.h"
37 #include "../../common/atomic.h"
38
39 namespace LinuxSampler {
40
41 int CompareStreamWriteSpace(const void* A, const void* B);
42
43 /** @brief Disk Reader Thread
44 *
45 * The disk reader thread is responsible for periodically refilling
46 * disk streams in parallel to the audio thread's rendering process.
47 *
48 * There is also a function for releasing parts of instruments not
49 * in use anymore (as this is not real time safe, the audio thread
50 * cannot do it directly).
51 */
52 template <class R /* Resource */, class IM /* Instrument Manager */>
53 class DiskThreadBase : public Thread {
54 private:
55 // Private Types
56 struct create_command_t {
57 Stream::OrderID_t OrderID;
58 Stream::Handle hStream;
59 Stream::reference_t* pStreamRef;
60 R* pRegion;
61 unsigned long SampleOffset;
62 bool DoLoop;
63 };
64 struct delete_command_t {
65 Stream* pStream;
66 Stream::Handle hStream;
67 Stream::OrderID_t OrderID;
68 bool bNotify;
69 };
70 struct program_change_command_t {
71 uint32_t Program;
72 EngineChannel* pEngineChannel;
73 };
74 // Attributes
75 bool IsIdle;
76 uint Streams;
77 RingBuffer<create_command_t,false>* CreationQueue; ///< Contains commands to create streams
78 RingBuffer<delete_command_t,false>* DeletionQueue; ///< Contains commands to delete streams
79 RingBuffer<delete_command_t,false>* GhostQueue; ///< Contains handles to streams that are not used anymore and weren't deletable immediately
80 RingBuffer<Stream::Handle,false> DeletionNotificationQueue; ///< In case the original sender requested a notification for its stream deletion order, this queue will receive the handle of the respective stream once actually be deleted by the disk thread.
81 RingBuffer<R*,false>* DeleteRegionQueue; ///< Contains dimension regions that are not used anymore and should be handed back to the instrument resource manager
82 RingBuffer<program_change_command_t,false> ProgramChangeQueue; ///< Contains requests for MIDI program change
83 unsigned int RefillStreamsPerRun; ///< How many streams should be refilled in each loop run
84 Stream** pStreams; ///< Contains all disk streams (whether used or unused)
85 Stream** pCreatedStreams; ///< This is where the voice (audio thread) picks up it's meanwhile hopefully created disk stream.
86 static Stream* SLOT_RESERVED; ///< This value is used to mark an entry in pCreatedStreams[] as reserved.
87
88 // Methods
89
90 void CreateStream(create_command_t& Command) {
91 // search for unused stream
92 Stream* newstream = NULL;
93 for (int i = Streams - 1; i >= 0; i--) {
94 if (pStreams[i]->GetState() == Stream::state_unused) {
95 newstream = pStreams[i];
96 break;
97 }
98 }
99 if (!newstream) {
100 std::cerr << "No unused stream found (OrderID:" << Command.OrderID;
101 std::cerr << ") - report if this happens, this is a bug!\n" << std::flush;
102 return;
103 }
104 LaunchStream(newstream, Command.hStream, Command.pStreamRef, Command.pRegion, Command.SampleOffset, Command.DoLoop);
105 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
106 if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
107 std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
108 newstream->Kill();
109 return;
110 }
111 pCreatedStreams[Command.OrderID] = newstream;
112 }
113
114 void DeleteStream(delete_command_t& Command) {
115 if (Command.pStream) {
116 Command.pStream->Kill();
117 if (Command.bNotify) DeletionNotificationQueue.push(&Command.hStream);
118 }
119 else { // the stream wasn't created by disk thread or picked up by audio thread yet
120
121 // if stream was created but not picked up yet
122 Stream* pStream = pCreatedStreams[Command.OrderID];
123 if (pStream && pStream != SLOT_RESERVED) {
124 pStream->Kill();
125 pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
126 // if original sender requested a notification, let him know now
127 if (Command.bNotify)
128 DeletionNotificationQueue.push(&Command.hStream);
129 return;
130 }
131
132 // the stream was not created yet
133 if (GhostQueue->write_space() > 0) {
134 GhostQueue->push(&Command);
135 } else { // error, queue full
136 if (Command.bNotify) {
137 dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n"));
138 } else {
139 dmsg(1,("DiskThread: GhostQueue full!\n"));
140 }
141 }
142 }
143 }
144
145 void RefillStreams() {
146 // sort the streams by most empty stream
147 qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
148
149 // refill the most empty streams
150 for (uint i = 0; i < RefillStreamsPerRun; i++) {
151 if (pStreams[i]->GetState() == Stream::state_active) {
152
153 //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
154 //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
155
156 int writespace = pStreams[i]->GetWriteSpaceToEnd();
157 if (writespace == 0) break;
158
159 int capped_writespace = writespace;
160 // if there is too much buffer space available then cut the read/write
161 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
162 if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
163
164 // adjust the amount to read in order to ensure that the buffer wraps correctly
165 int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
166 // if we wasn't able to refill one of the stream buffers by more than
167 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
168 if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
169 }
170 }
171 }
172
173 Stream::Handle CreateHandle() {
174 static uint32_t counter = 0;
175 if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
176 else counter++;
177 return counter;
178 }
179
180 Stream::OrderID_t CreateOrderID() {
181 static Stream::OrderID_t counter(0);
182 for (int i = 0; i < Streams; i++) {
183 if (counter == Streams) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
184 else counter++;
185 if (!pCreatedStreams[counter]) {
186 pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
187 return counter; // found empty slot
188 }
189 }
190 return 0; // no free slot
191 }
192
193 atomic_t ActiveStreamCount;
194 public:
195 // Methods
196 DiskThreadBase(int MaxStreams, uint BufferWrapElements, IM* pInstruments) :
197 Thread(true, false, 1, -2),
198 pInstruments(pInstruments),
199 DeletionNotificationQueue(4*MaxStreams),
200 ProgramChangeQueue(512)
201 {
202 CreationQueue = new RingBuffer<create_command_t,false>(4*MaxStreams);
203 DeletionQueue = new RingBuffer<delete_command_t,false>(4*MaxStreams);
204 GhostQueue = new RingBuffer<delete_command_t,false>(MaxStreams);
205 DeleteRegionQueue = new RingBuffer<R*,false>(4*MaxStreams);
206 pStreams = new Stream*[MaxStreams];
207 pCreatedStreams = new Stream*[MaxStreams + 1];
208 Streams = MaxStreams;
209 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
210
211 for (int i = 1; i <= MaxStreams; i++) {
212 pCreatedStreams[i] = NULL;
213 }
214 ActiveStreamCountMax = 0;
215 }
216
217 virtual ~DiskThreadBase() {
218 for (int i = 0; i < Streams; i++) {
219 if (pStreams[i]) delete pStreams[i];
220 }
221 if (CreationQueue) delete CreationQueue;
222 if (DeletionQueue) delete DeletionQueue;
223 if (GhostQueue) delete GhostQueue;
224 if (DeleteRegionQueue) delete DeleteRegionQueue;
225 if (pStreams) delete[] pStreams;
226 if (pCreatedStreams) delete[] pCreatedStreams;
227 }
228
229
230 // #########################################################################
231 // # Foreign Thread Section
232 // # (following code intended to be interface for audio thread)
233
234 /**
235 * Suspend disk thread, kill all active streams, clear all queues and the
236 * pickup array and reset all streams. Call this method to bring everything
237 * in the disk thread to day one. If the disk thread was running, it will be
238 * respawned right after everything was reset.
239 */
240 void Reset() {
241 bool running = this->IsRunning();
242 if (running) this->StopThread();
243 for (int i = 0; i < Streams; i++) {
244 pStreams[i]->Kill();
245 }
246 for (int i = 1; i <= Streams; i++) {
247 pCreatedStreams[i] = NULL;
248 }
249 GhostQueue->init();
250 CreationQueue->init();
251 DeletionQueue->init();
252 DeletionNotificationQueue.init();
253
254 // make sure that all DimensionRegions are released
255 while (DeleteRegionQueue->read_space() > 0) {
256 R* pRgn;
257 DeleteRegionQueue->pop(&pRgn);
258 pInstruments->HandBackRegion(pRgn);
259 }
260 DeleteRegionQueue->init();
261 SetActiveStreamCount(0);
262 ActiveStreamCountMax = 0;
263 if (running) this->StartThread(); // start thread only if it was running before
264 }
265
266 String GetBufferFillBytes() {
267 bool activestreams = false;
268 std::stringstream ss;
269 for (uint i = 0; i < this->Streams; i++) {
270 if (pStreams[i]->GetState() == Stream::state_unused) continue;
271 uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
272 uint streamid = (uint) pStreams[i]->GetHandle();
273 if (!streamid) continue;
274
275 if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
276 else {
277 ss << '[' << streamid << ']' << bufferfill;
278 activestreams = true;
279 }
280 }
281 return ss.str();
282 }
283
284 String GetBufferFillPercentage() {
285 bool activestreams = false;
286 std::stringstream ss;
287 for (uint i = 0; i < this->Streams; i++) {
288 if (pStreams[i]->GetState() == Stream::state_unused) continue;
289 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
290 uint streamid = (uint) pStreams[i]->GetHandle();
291 if (!streamid) continue;
292
293 if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
294 else {
295 ss << '[' << streamid << ']' << bufferfill;
296 activestreams = true;
297 }
298 }
299 return ss.str();
300 }
301
302 /**
303 * Returns -1 if command queue or pickup pool is full, 0 on success (will be
304 * called by audio thread within the voice class).
305 */
306 int OrderNewStream(Stream::reference_t* pStreamRef, R* pRegion, unsigned long SampleOffset, bool DoLoop) {
307 dmsg(4,("Disk Thread: new stream ordered\n"));
308 if (CreationQueue->write_space() < 1) {
309 dmsg(1,("DiskThread: Order queue full!\n"));
310 return -1;
311 }
312
313 const Stream::OrderID_t newOrder = CreateOrderID();
314 if (!newOrder) {
315 dmsg(1,("DiskThread: there was no free slot\n"));
316 return -1; // there was no free slot
317 }
318
319 pStreamRef->State = Stream::state_active;
320 pStreamRef->OrderID = newOrder;
321 pStreamRef->hStream = CreateHandle();
322 pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
323
324 create_command_t cmd;
325 cmd.OrderID = pStreamRef->OrderID;
326 cmd.hStream = pStreamRef->hStream;
327 cmd.pStreamRef = pStreamRef;
328 cmd.pRegion = pRegion;
329 cmd.SampleOffset = SampleOffset;
330 cmd.DoLoop = DoLoop;
331
332 CreationQueue->push(&cmd);
333 return 0;
334 }
335
336 /**
337 * Request the disk thread to delete the given disk stream. This method
 * will return immediately, thus it won't block until the respective stream
 * was actually deleted. (Called by audio thread within the Voice class).
340 *
341 * @param pStreamRef - stream that shall be deleted
342 * @param bRequestNotification - set to true in case you want to receive a
343 * notification once the stream has actually
344 * been deleted
345 * @returns 0 on success, -1 if command queue is full
346 * @see AskForDeletedStream()
347 */
348 int OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification = false) {
349 dmsg(4,("Disk Thread: stream deletion ordered\n"));
350 if (DeletionQueue->write_space() < 1) {
351 dmsg(1,("DiskThread: Deletion queue full!\n"));
352 return -1;
353 }
354
355 delete_command_t cmd;
356 cmd.pStream = pStreamRef->pStream;
357 cmd.hStream = pStreamRef->hStream;
358 cmd.OrderID = pStreamRef->OrderID;
359 cmd.bNotify = bRequestNotification;
360
361 DeletionQueue->push(&cmd);
362 return 0;
363 }
364
365 /**
366 * Tell the disk thread to release a dimension region that belongs
367 * to an instrument which isn't loaded anymore. The disk thread
368 * will hand back the dimension region to the instrument resource
 * manager. (OrderDeletionOfRegion() is called from the audio thread
 * when a voice dies.)
371 */
372 int OrderDeletionOfRegion(R* pReg) {
373 dmsg(4,("Disk Thread: dimreg deletion ordered\n"));
374 if (DeleteRegionQueue->write_space() < 1) {
375 dmsg(1,("DiskThread: DeleteRegion queue full!\n"));
376 return -1;
377 }
378 DeleteRegionQueue->push(&pReg);
379 return 0;
380 }
381
382 /**
383 * Tell the disk thread to do a program change on the specified
384 * EngineChannel.
385 */
386 int OrderProgramChange(uint32_t Program, EngineChannel* pEngineChannel) {
387 program_change_command_t cmd;
388 cmd.Program = Program;
389 cmd.pEngineChannel = pEngineChannel;
390
391 dmsg(4,("Disk Thread: program change ordered\n"));
392 if (ProgramChangeQueue.write_space() < 1) {
393 dmsg(1,("DiskThread: ProgramChange queue full!\n"));
394 return -1;
395 }
396 ProgramChangeQueue.push(&cmd);
397 return 0;
398 }
399
400 /**
401 * Returns the pointer to a disk stream if the ordered disk stream
402 * represented by the \a StreamOrderID was already activated by the disk
403 * thread, returns NULL otherwise. If the call was successful, thus if it
 * returned a valid stream pointer, the caller has to store that pointer
405 * by himself, because it's not possible to call this method again with the
406 * same used order ID; this method is just intended for picking up an ordered
407 * disk stream. This method will usually be called by the voice class (within
408 * the audio thread).
409 *
410 * @param StreamOrderID - ID previously returned by OrderNewStream()
411 * @returns pointer to created stream object, NULL otherwise
412 */
413 Stream* AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
414 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
415 Stream* pStream = pCreatedStreams[StreamOrderID];
416 if (pStream && pStream != SLOT_RESERVED) {
417 dmsg(4,("(yes created)\n"));
418 pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
419 return pStream;
420 }
421 dmsg(4,("(no not yet created)\n"));
422 return NULL;
423 }
424
425 /**
426 * In case the original sender requested a notification with his stream
427 * deletion order, he can use this method to poll if the respective stream
428 * has actually been deleted.
429 *
430 * @returns handle / identifier of the deleted stream, or
431 * Stream::INVALID_HANDLE if no notification present
432 */
433 Stream::Handle AskForDeletedStream() {
434 if (DeletionNotificationQueue.read_space()) {
435 Stream::Handle hStream;
436 DeletionNotificationQueue.pop(&hStream);
437 return hStream;
438 } else return Stream::INVALID_HANDLE; // no notification received yet
439 }
440
// the number of streams currently in use
// (printed on the console by the main thread, along with the active voice count)
443 uint GetActiveStreamCount() { return atomic_read(&ActiveStreamCount); }
444 void SetActiveStreamCount(uint Streams) { atomic_set(&ActiveStreamCount, Streams); }
445 int ActiveStreamCountMax;
446
447 protected:
448 IM* pInstruments; ///< The instrument resource manager of the engine that is using this disk thread. Used by the dimension region deletion feature.
449
450 // #########################################################################
451 // # Disk Thread Only Section
452 // # (following code should only be executed by the disk thread)
453
454 // Implementation of virtual method from class Thread
455 int Main() {
456 dmsg(3,("Disk thread running\n"));
457 while (true) {
458 #if !defined(WIN32)
459 pthread_testcancel(); // mandatory for OSX
460 #endif
461 #if CONFIG_PTHREAD_TESTCANCEL
462 TestCancel();
463 #endif
464 IsIdle = true; // will be set to false if a stream got filled
465
466 // if there are ghost streams, delete them
467 for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
468 delete_command_t ghostStream;
469 GhostQueue->pop(&ghostStream);
470 bool found = false;
471 for (int i = 0; i < this->Streams; i++) {
472 if (pStreams[i]->GetHandle() == ghostStream.hStream) {
473 pStreams[i]->Kill();
474 found = true;
475 // if original sender requested a notification, let him know now
476 if (ghostStream.bNotify)
477 DeletionNotificationQueue.push(&ghostStream.hStream);
478 break;
479 }
480 }
481 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue
482 }
483
484 // if there are creation commands, create new streams
485 while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
486 create_command_t command;
487 CreationQueue->pop(&command);
488 CreateStream(command);
489 }
490
491 // if there are deletion commands, delete those streams
492 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
493 delete_command_t command;
494 DeletionQueue->pop(&command);
495 DeleteStream(command);
496 }
497
498 // release DimensionRegions that belong to instruments
499 // that are no longer loaded
500 while (DeleteRegionQueue->read_space() > 0) {
501 R* pRgn;
502 DeleteRegionQueue->pop(&pRgn);
503 pInstruments->HandBackRegion(pRgn);
504 }
505
506 // perform MIDI program change commands
507 if (ProgramChangeQueue.read_space() > 0) {
508 program_change_command_t cmd;
509 // skip all old ones, only process the latest one
510 do {
511 ProgramChangeQueue.pop(&cmd);
512 } while (ProgramChangeQueue.read_space() > 0);
513 cmd.pEngineChannel->ExecuteProgramChange(cmd.Program);
514 }
515
516 RefillStreams(); // refill the most empty streams
517
518 // if nothing was done during this iteration (eg no streambuffer
519 // filled with data) then sleep for 30ms
520 if (IsIdle) usleep(30000);
521
522 int streamsInUsage = 0;
523 for (int i = Streams - 1; i >= 0; i--) {
524 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
525 }
526 SetActiveStreamCount(streamsInUsage);
527 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
528 }
529
530 return EXIT_FAILURE;
531 }
532
533 virtual Stream* CreateStream(long BufferSize, uint BufferWrapElements) = 0;
534
535 void CreateAllStreams(int MaxStreams, uint BufferWrapElements) {
536 for (int i = 0; i < MaxStreams; i++) {
537 pStreams[i] = CreateStream(CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements);
538 }
539 }
540
541 virtual void LaunchStream (
542 Stream* pStream,
543 Stream::Handle hStream,
544 Stream::reference_t* pExportReference,
545 R* pRgn,
546 unsigned long SampleOffset,
547 bool DoLoop
548 ) = 0;
549
550 friend class Stream;
551 };
552 } // namespace LinuxSampler
553
554 #endif // __LS_DISKTHREADBASE_H__

  ViewVC Help
Powered by ViewVC