/[svn]/linuxsampler/trunk/src/engines/common/DiskThreadBase.h
ViewVC logotype

Contents of /linuxsampler/trunk/src/engines/common/DiskThreadBase.h

Parent Directory Parent Directory | Revision Log Revision Log


Revision 3766 - (show annotations) (download) (as text)
Mon Apr 6 12:41:49 2020 UTC (4 years ago) by schoenebeck
File MIME type: text/x-c++hdr
File size: 28370 byte(s)
Fixed deadlocks (e.g. when restarting engines).

* Individual thread implementations (e.g. disk thread, etc.):
  Disable thread cancellation on critical sections, e.g. when holding
  mutex locks, to prevent deadlocks if thread is stopped and/or
  restarted.

* Added TestCancel() calls to thread implementations if missing.

* No need to wrap Thread::TestCancel() calls into
  CONFIG_PTHREAD_TESTCANCEL macro conditions (since TestCancel() is
  already a stub on systems which don't have pthread_testcancel()
  available).

* If compiled for debugging: give each thread a human readable name
  to simplify debugging of multi-threading issues.

* DiskThreadBase: TestCancel() and pthread_testcancel() calls are
  per-se redundant, so only call TestCancel().

* Added missing override keywords to silence compiler warnings.

* Bumped version (2.1.1.svn54).

1 /***************************************************************************
2 * *
3 * LinuxSampler - modular, streaming capable sampler *
4 * *
5 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 * Copyright (C) 2005 - 2020 Christian Schoenebeck *
7 * Copyright (C) 2009 - 2012 Grigor Iliev *
8 * Copyright (C) 2013 - 2016 Andreas Persson *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the Free Software *
22 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
23 * MA 02111-1307 USA *
24 ***************************************************************************/
25
26 #ifndef __LS_DISKTHREADBASE_H__
27 #define __LS_DISKTHREADBASE_H__
28
29 #include <map>
30
31 #include "StreamBase.h"
32 #include "../EngineChannel.h"
33 #include "../InstrumentManagerBase.h"
34
35 #include "../../common/global_private.h"
36
37 #include "../../common/Thread.h"
38 #include "../../common/RingBuffer.h"
39 #include "../../common/atomic.h"
40
41 namespace LinuxSampler {
42
43 int CompareStreamWriteSpace(const void* A, const void* B);
44
45 /** @brief Disk Reader Thread
46 *
47 * The disk reader thread is responsible for periodically refilling
48 * disk streams in parallel to the audio thread's rendering process.
49 *
50 * There is also a function for releasing parts of instruments not
51 * in use anymore (as this is not real time safe, the audio thread
52 * cannot do it directly).
53 */
54 template <class R /* Resource */, class IM /* Instrument Manager */>
55 class DiskThreadBase : public Thread {
56 private:
57 // Private Types
58 struct create_command_t {
59 Stream::OrderID_t OrderID;
60 Stream::Handle hStream;
61 Stream::reference_t* pStreamRef;
62 R* pRegion;
63 unsigned long SampleOffset;
64 bool DoLoop;
65 };
66 struct delete_command_t {
67 Stream* pStream;
68 Stream::Handle hStream;
69 Stream::OrderID_t OrderID;
70 bool bNotify;
71 };
72 struct program_change_command_t {
73 uint32_t Program;
74 EngineChannel* pEngineChannel;
75 };
76 // Attributes
77 bool IsIdle;
78 uint Streams;
79 RingBuffer<create_command_t,false>* CreationQueue; ///< Contains commands to create streams
80 RingBuffer<delete_command_t,false>* DeletionQueue; ///< Contains commands to delete streams
81 RingBuffer<delete_command_t,false>* GhostQueue; ///< Contains handles to streams that are not used anymore and weren't deletable immediately
82 RingBuffer<Stream::Handle,false> DeletionNotificationQueue; ///< In case the original sender requested a notification for its stream deletion order, this queue will receive the handle of the respective stream once it has actually been deleted by the disk thread.
83 RingBuffer<R*,false>* DeleteRegionQueue; ///< Contains dimension regions that are not used anymore and should be handed back to the instrument resource manager
84 RingBuffer<program_change_command_t,false> ProgramChangeQueue; ///< Contains requests for MIDI program change
85 unsigned int RefillStreamsPerRun; ///< How many streams should be refilled in each loop run
86 Stream** pStreams; ///< Contains all disk streams (whether used or unused)
87 Stream** pCreatedStreams; ///< This is where the voice (audio thread) picks up its meanwhile hopefully created disk stream.
88 static Stream* SLOT_RESERVED; ///< This value is used to mark an entry in pCreatedStreams[] as reserved.
89
90 // Methods
91
92 void CreateStream(create_command_t& Command) {
93 // search for unused stream
94 Stream* newstream = NULL;
95 for (int i = Streams - 1; i >= 0; i--) {
96 if (pStreams[i]->GetState() == Stream::state_unused) {
97 newstream = pStreams[i];
98 break;
99 }
100 }
101 if (!newstream) {
102 std::cerr << "No unused stream found (OrderID:" << Command.OrderID;
103 std::cerr << ") - report if this happens, this is a bug!\n" << std::flush;
104 return;
105 }
106 LaunchStream(newstream, Command.hStream, Command.pStreamRef, Command.pRegion, Command.SampleOffset, Command.DoLoop);
107 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
108 if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
109 std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
110 newstream->Kill();
111 return;
112 }
113 pCreatedStreams[Command.OrderID] = newstream;
114 }
115
116 void DeleteStream(delete_command_t& Command) {
117 if (Command.pStream) {
118 Command.pStream->Kill();
119 if (Command.bNotify) DeletionNotificationQueue.push(&Command.hStream);
120 }
121 else { // the stream wasn't created by disk thread or picked up by audio thread yet
122
123 // if stream was created but not picked up yet
124 Stream* pStream = pCreatedStreams[Command.OrderID];
125 if (pStream && pStream != SLOT_RESERVED) {
126 pStream->Kill();
127 pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
128 // if original sender requested a notification, let him know now
129 if (Command.bNotify)
130 DeletionNotificationQueue.push(&Command.hStream);
131 return;
132 }
133
134 // the stream was not created yet
135 if (GhostQueue->write_space() > 0) {
136 GhostQueue->push(&Command);
137 } else { // error, queue full
138 if (Command.bNotify) {
139 dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n"));
140 } else {
141 dmsg(1,("DiskThread: GhostQueue full!\n"));
142 }
143 }
144 }
145 }
146
147 void RefillStreams() {
148 // sort the streams by most empty stream
149 qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
150
151 // refill the most empty streams
152 for (uint i = 0; i < RefillStreamsPerRun; i++) {
153 if (pStreams[i]->GetState() == Stream::state_active) {
154
155 //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
156 //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
157
158 int writespace = pStreams[i]->GetWriteSpaceToEnd();
159 if (writespace == 0) break;
160
161 int capped_writespace = writespace;
162 // if there is too much buffer space available then cut the read/write
163 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
164 if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
165
166 // adjust the amount to read in order to ensure that the buffer wraps correctly
167 int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
168 // if we wasn't able to refill one of the stream buffers by more than
169 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
170 if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
171 }
172 }
173 }
174
175 Stream::Handle CreateHandle() {
176 static uint32_t counter = 0;
177 if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
178 else counter++;
179 return counter;
180 }
181
182 Stream::OrderID_t CreateOrderID() {
183 static Stream::OrderID_t counter(0);
184 for (int i = 0; i < Streams; i++) {
185 if (counter == Streams) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
186 else counter++;
187 if (!pCreatedStreams[counter]) {
188 pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
189 return counter; // found empty slot
190 }
191 }
192 return 0; // no free slot
193 }
194
195 atomic_t ActiveStreamCount;
196 public:
197 // Methods
198 DiskThreadBase(int MaxStreams, uint BufferWrapElements, IM* pInstruments) :
199 Thread(true, false, 1, -2),
200 DeletionNotificationQueue(4*MaxStreams),
201 ProgramChangeQueue(512),
202 pInstruments(pInstruments)
203 {
204 CreationQueue = new RingBuffer<create_command_t,false>(4*MaxStreams);
205 DeletionQueue = new RingBuffer<delete_command_t,false>(4*MaxStreams);
206 GhostQueue = new RingBuffer<delete_command_t,false>(MaxStreams);
207 DeleteRegionQueue = new RingBuffer<R*,false>(4*MaxStreams);
208 pStreams = new Stream*[MaxStreams];
209 pCreatedStreams = new Stream*[MaxStreams + 1];
210 Streams = MaxStreams;
211 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
212
213 for (int i = 1; i <= MaxStreams; i++) {
214 pCreatedStreams[i] = NULL;
215 }
216 ActiveStreamCountMax = 0;
217 }
218
219 virtual ~DiskThreadBase() {
220 for (int i = 0; i < Streams; i++) {
221 if (pStreams[i]) delete pStreams[i];
222 }
223 if (CreationQueue) delete CreationQueue;
224 if (DeletionQueue) delete DeletionQueue;
225 if (GhostQueue) delete GhostQueue;
226 if (DeleteRegionQueue) delete DeleteRegionQueue;
227 if (pStreams) delete[] pStreams;
228 if (pCreatedStreams) delete[] pCreatedStreams;
229 }
230
231
232 // #########################################################################
233 // # Foreign Thread Section
234 // # (following code intended to be interface for audio thread)
235
236 /**
237 * Suspend disk thread, kill all active streams, clear all queues and the
238 * pickup array and reset all streams. Call this method to bring everything
239 * in the disk thread to day one. If the disk thread was running, it will be
240 * respawned right after everything was reset.
241 */
242 void Reset() {
243 bool running = this->IsRunning();
244 if (running) this->StopThread();
245 for (int i = 0; i < Streams; i++) {
246 pStreams[i]->Kill();
247 }
248 for (int i = 1; i <= Streams; i++) {
249 pCreatedStreams[i] = NULL;
250 }
251 GhostQueue->init();
252 CreationQueue->init();
253 DeletionQueue->init();
254 DeletionNotificationQueue.init();
255
256 // make sure that all DimensionRegions are released
257 while (DeleteRegionQueue->read_space() > 0) {
258 R* pRgn;
259 DeleteRegionQueue->pop(&pRgn);
260 pInstruments->HandBackRegion(pRgn);
261 }
262 DeleteRegionQueue->init();
263 SetActiveStreamCount(0);
264 ActiveStreamCountMax = 0;
265 if (running) this->StartThread(); // start thread only if it was running before
266 }
267
268 String GetBufferFillBytes() {
269 bool activestreams = false;
270 std::stringstream ss;
271 for (uint i = 0; i < this->Streams; i++) {
272 if (pStreams[i]->GetState() == Stream::state_unused) continue;
273 uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
274 uint streamid = (uint) pStreams[i]->GetHandle();
275 if (!streamid) continue;
276
277 if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
278 else {
279 ss << '[' << streamid << ']' << bufferfill;
280 activestreams = true;
281 }
282 }
283 return ss.str();
284 }
285
286 String GetBufferFillPercentage() {
287 bool activestreams = false;
288 std::stringstream ss;
289 for (uint i = 0; i < this->Streams; i++) {
290 if (pStreams[i]->GetState() == Stream::state_unused) continue;
291 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
292 uint streamid = (uint) pStreams[i]->GetHandle();
293 if (!streamid) continue;
294
295 if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
296 else {
297 ss << '[' << streamid << ']' << bufferfill;
298 activestreams = true;
299 }
300 }
301 return ss.str();
302 }
303
304 /**
305 * Returns -1 if command queue or pickup pool is full, 0 on success (will be
306 * called by audio thread within the voice class).
307 */
308 int OrderNewStream(Stream::reference_t* pStreamRef, R* pRegion, unsigned long SampleOffset, bool DoLoop) {
309 dmsg(4,("Disk Thread: new stream ordered\n"));
310 if (CreationQueue->write_space() < 1) {
311 dmsg(1,("DiskThread: Order queue full!\n"));
312 return -1;
313 }
314
315 const Stream::OrderID_t newOrder = CreateOrderID();
316 if (!newOrder) {
317 dmsg(1,("DiskThread: there was no free slot\n"));
318 return -1; // there was no free slot
319 }
320
321 pStreamRef->State = Stream::state_active;
322 pStreamRef->OrderID = newOrder;
323 pStreamRef->hStream = CreateHandle();
324 pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
325
326 create_command_t cmd;
327 cmd.OrderID = pStreamRef->OrderID;
328 cmd.hStream = pStreamRef->hStream;
329 cmd.pStreamRef = pStreamRef;
330 cmd.pRegion = pRegion;
331 cmd.SampleOffset = SampleOffset;
332 cmd.DoLoop = DoLoop;
333
334 CreationQueue->push(&cmd);
335 return 0;
336 }
337
338 /**
339 * Request the disk thread to delete the given disk stream. This method
340 * will return immediately, thus it won't block until the respective stream
341 * was actually deleted. (Called by audio thread within the Voice class).
342 *
343 * @param pStreamRef - stream that shall be deleted
344 * @param bRequestNotification - set to true in case you want to receive a
345 * notification once the stream has actually
346 * been deleted
347 * @returns 0 on success, -1 if command queue is full
348 * @see AskForDeletedStream()
349 */
350 int OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification = false) {
351 dmsg(4,("Disk Thread: stream deletion ordered\n"));
352 if (DeletionQueue->write_space() < 1) {
353 dmsg(1,("DiskThread: Deletion queue full!\n"));
354 return -1;
355 }
356
357 delete_command_t cmd;
358 cmd.pStream = pStreamRef->pStream;
359 cmd.hStream = pStreamRef->hStream;
360 cmd.OrderID = pStreamRef->OrderID;
361 cmd.bNotify = bRequestNotification;
362
363 DeletionQueue->push(&cmd);
364 return 0;
365 }
366
367 /**
368 * Tell the disk thread to release a dimension region that belongs
369 * to an instrument which isn't loaded anymore. The disk thread
370 * will hand back the dimension region to the instrument resource
371 * manager. (OrderDeletionOfRegion() is called from the audio thread
372 * when a voice dies.)
373 */
374 int OrderDeletionOfRegion(R* pReg) {
375 dmsg(4,("Disk Thread: dimreg deletion ordered\n"));
376 if (DeleteRegionQueue->write_space() < 1) {
377 dmsg(1,("DiskThread: DeleteRegion queue full!\n"));
378 return -1;
379 }
380 DeleteRegionQueue->push(&pReg);
381 return 0;
382 }
383
384 /**
385 * Tell the disk thread to do a program change on the specified
386 * EngineChannel.
387 */
388 int OrderProgramChange(uint32_t Program, EngineChannel* pEngineChannel) {
389 program_change_command_t cmd;
390 cmd.Program = Program;
391 cmd.pEngineChannel = pEngineChannel;
392
393 dmsg(4,("Disk Thread: program change ordered\n"));
394 if (ProgramChangeQueue.write_space() < 1) {
395 dmsg(1,("DiskThread: ProgramChange queue full!\n"));
396 return -1;
397 }
398 ProgramChangeQueue.push(&cmd);
399 return 0;
400 }
401
402 /**
403 * Returns the pointer to a disk stream if the ordered disk stream
404 * represented by the \a StreamOrderID was already activated by the disk
405 * thread, returns NULL otherwise. If the call was successful, thus if it
406 * returned a valid stream pointer, the caller has to store that pointer
407 * itself, because it's not possible to call this method again with the
408 * same used order ID; this method is just intended for picking up an ordered
409 * disk stream. This method will usually be called by the voice class (within
410 * the audio thread).
411 *
412 * @param StreamOrderID - ID previously returned by OrderNewStream()
413 * @returns pointer to created stream object, NULL otherwise
414 */
415 Stream* AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
416 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
417 Stream* pStream = pCreatedStreams[StreamOrderID];
418 if (pStream && pStream != SLOT_RESERVED) {
419 dmsg(4,("(yes created)\n"));
420 pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
421 return pStream;
422 }
423 dmsg(4,("(no not yet created)\n"));
424 return NULL;
425 }
426
427 /**
428 * In case the original sender requested a notification with his stream
429 * deletion order, he can use this method to poll if the respective stream
430 * has actually been deleted.
431 *
432 * @returns handle / identifier of the deleted stream, or
433 * Stream::INVALID_HANDLE if no notification present
434 */
435 Stream::Handle AskForDeletedStream() {
436 if (DeletionNotificationQueue.read_space()) {
437 Stream::Handle hStream;
438 DeletionNotificationQueue.pop(&hStream);
439 return hStream;
440 } else return Stream::INVALID_HANDLE; // no notification received yet
441 }
442
443 // the number of streams currently in usage
444 // printed on the console by the main thread (along with the active voice count)
445 uint GetActiveStreamCount() { return atomic_read(&ActiveStreamCount); }
446 void SetActiveStreamCount(uint Streams) { atomic_set(&ActiveStreamCount, Streams); }
447 int ActiveStreamCountMax;
448
449 protected:
450 IM* pInstruments; ///< The instrument resource manager of the engine that is using this disk thread. Used by the dimension region deletion feature.
451
452 // #########################################################################
453 // # Disk Thread Only Section
454 // # (following code should only be executed by the disk thread)
455
456 // Implementation of virtual method from class Thread
457 int Main() {
458 dmsg(3,("Disk thread running\n"));
459
460 #if DEBUG
461 Thread::setNameOfCaller("DiskIO");
462 #endif
463
464 while (true) {
465
466 TestCancel();
467
468 IsIdle = true; // will be set to false if a stream got filled
469
470 // prevent disk thread from being cancelled
471 // (e.g. to prevent deadlocks while holding mutex lock(s))
472 pushCancelable(false);
473
474 // if there are ghost streams, delete them
475 for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
476 delete_command_t ghostStream;
477 GhostQueue->pop(&ghostStream);
478 bool found = false;
479 for (int i = 0; i < this->Streams; i++) {
480 if (pStreams[i]->GetHandle() == ghostStream.hStream) {
481 pStreams[i]->Kill();
482 found = true;
483 // if original sender requested a notification, let him know now
484 if (ghostStream.bNotify)
485 DeletionNotificationQueue.push(&ghostStream.hStream);
486 break;
487 }
488 }
489 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue
490 }
491
492 // if there are creation commands, create new streams
493 while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
494 create_command_t command;
495 CreationQueue->pop(&command);
496 CreateStream(command);
497 }
498
499 // if there are deletion commands, delete those streams
500 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
501 delete_command_t command;
502 DeletionQueue->pop(&command);
503 DeleteStream(command);
504 }
505
506 // release DimensionRegions that belong to instruments
507 // that are no longer loaded
508 while (DeleteRegionQueue->read_space() > 0) {
509 R* pRgn;
510 DeleteRegionQueue->pop(&pRgn);
511 pInstruments->HandBackRegion(pRgn);
512 }
513
514 // perform MIDI program change commands
515 if (ProgramChangeQueue.read_space() > 0) {
516 program_change_command_t cmd;
517 std::map<EngineChannel*,uint32_t> cmds;
518 // skip all old ones, only process the latest program
519 // change command on each engine channel
520 do {
521 ProgramChangeQueue.pop(&cmd);
522 cmds[cmd.pEngineChannel] = cmd.Program;
523 } while (ProgramChangeQueue.read_space() > 0);
524 // now execute those latest program change commands on
525 // their respective engine channel
526 for (std::map<EngineChannel*,uint32_t>::const_iterator it = cmds.begin();
527 it != cmds.end(); ++it)
528 {
529 it->first->ExecuteProgramChange(it->second);
530 }
531 }
532
533 RefillStreams(); // refill the most empty streams
534
535 int streamsInUsage = 0;
536 for (int i = Streams - 1; i >= 0; i--) {
537 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
538 }
539 SetActiveStreamCount(streamsInUsage);
540 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
541
542 // now allow disk thread being cancelled again
543 // (since all mutexes are now unlocked and data structures
544 // are at consistent states)
545 popCancelable();
546
547 // if nothing was done during this iteration (i.e. no
548 // stream buffer filled with data) then sleep for 30ms
549 if (IsIdle)
550 usleep(30000); //NOTE: defined as cancellation point by POSIX
551 }
552
553 return EXIT_FAILURE;
554 }
555
556 virtual Stream* CreateStream(long BufferSize, uint BufferWrapElements) = 0;
557
558 void CreateAllStreams(int MaxStreams, uint BufferWrapElements) {
559 for (int i = 0; i < MaxStreams; i++) {
560 pStreams[i] = CreateStream(CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements);
561 }
562 }
563
564 virtual void LaunchStream (
565 Stream* pStream,
566 Stream::Handle hStream,
567 Stream::reference_t* pExportReference,
568 R* pRgn,
569 unsigned long SampleOffset,
570 bool DoLoop
571 ) = 0;
572
573 friend class Stream;
574 };
575 } // namespace LinuxSampler
576
577 #endif // __LS_DISKTHREADBASE_H__

  ViewVC Help
Powered by ViewVC