/[svn]/linuxsampler/trunk/src/engines/common/DiskThreadBase.h
ViewVC logotype

Contents of /linuxsampler/trunk/src/engines/common/DiskThreadBase.h

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2277 - (show annotations) (download) (as text)
Sat Oct 1 08:23:02 2011 UTC (12 years, 6 months ago) by persson
File MIME type: text/x-c++hdr
File size: 27174 byte(s)
* fixed handling of rapid bank select and program change messages sent
  to the same sampler channel (patch from the Open Octave project,
  slightly adjusted)

1 /***************************************************************************
2 * *
3 * LinuxSampler - modular, streaming capable sampler *
4 * *
5 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 * Copyright (C) 2005 - 2008 Christian Schoenebeck *
7 * Copyright (C) 2009 - 2011 Christian Schoenebeck and Grigor Iliev *
8 * *
9 * This program is free software; you can redistribute it and/or modify *
10 * it under the terms of the GNU General Public License as published by *
11 * the Free Software Foundation; either version 2 of the License, or *
12 * (at your option) any later version. *
13 * *
14 * This program is distributed in the hope that it will be useful, *
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
17 * GNU General Public License for more details. *
18 * *
19 * You should have received a copy of the GNU General Public License *
20 * along with this program; if not, write to the Free Software *
21 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
22 * MA 02111-1307 USA *
23 ***************************************************************************/
24
25 #ifndef __LS_DISKTHREADBASE_H__
26 #define __LS_DISKTHREADBASE_H__
27
28 #include "StreamBase.h"
29 #include "../EngineChannel.h"
30 #include "../InstrumentManagerBase.h"
31
32 #include "../../common/global_private.h"
33
34 #include "../../common/Thread.h"
35 #include "../../common/RingBuffer.h"
36 #include "../../common/atomic.h"
37
38 namespace LinuxSampler {
39
40 int CompareStreamWriteSpace(const void* A, const void* B);
41
42 /** @brief Disk Reader Thread
43 *
44 * The disk reader thread is responsible for periodically refilling
45 * disk streams in parallel to the audio thread's rendering process.
46 *
47 * There is also a function for releasing parts of instruments not
48 * in use anymore (as this is not real time safe, the audio thread
49 * cannot do it directly).
50 */
51 template <class R /* Resource */, class IM /* Instrument Manager */>
52 class DiskThreadBase : public Thread {
53 private:
54 // Private Types
55 struct create_command_t {
56 Stream::OrderID_t OrderID;
57 Stream::Handle hStream;
58 Stream::reference_t* pStreamRef;
59 R* pRegion;
60 unsigned long SampleOffset;
61 bool DoLoop;
62 };
63 struct delete_command_t {
64 Stream* pStream;
65 Stream::Handle hStream;
66 Stream::OrderID_t OrderID;
67 bool bNotify;
68 };
69 struct program_change_command_t {
70 uint32_t Program;
71 EngineChannel* pEngineChannel;
72 };
73 // Attributes
74 bool IsIdle;
75 uint Streams;
76 RingBuffer<create_command_t,false>* CreationQueue; ///< Contains commands to create streams
77 RingBuffer<delete_command_t,false>* DeletionQueue; ///< Contains commands to delete streams
78 RingBuffer<delete_command_t,false>* GhostQueue; ///< Contains handles to streams that are not used anymore and weren't deletable immediately
79 RingBuffer<Stream::Handle,false> DeletionNotificationQueue; ///< In case the original sender requested a notification for its stream deletion order, this queue will receive the handle of the respective stream once actually be deleted by the disk thread.
80 RingBuffer<R*,false>* DeleteRegionQueue; ///< Contains dimension regions that are not used anymore and should be handed back to the instrument resource manager
81 RingBuffer<program_change_command_t,false> ProgramChangeQueue; ///< Contains requests for MIDI program change
82 unsigned int RefillStreamsPerRun; ///< How many streams should be refilled in each loop run
83 Stream** pStreams; ///< Contains all disk streams (whether used or unused)
84 Stream** pCreatedStreams; ///< This is where the voice (audio thread) picks up it's meanwhile hopefully created disk stream.
85 static Stream* SLOT_RESERVED; ///< This value is used to mark an entry in pCreatedStreams[] as reserved.
86
87 // Methods
88
89 void CreateStream(create_command_t& Command) {
90 // search for unused stream
91 Stream* newstream = NULL;
92 for (int i = Streams - 1; i >= 0; i--) {
93 if (pStreams[i]->GetState() == Stream::state_unused) {
94 newstream = pStreams[i];
95 break;
96 }
97 }
98 if (!newstream) {
99 std::cerr << "No unused stream found (OrderID:" << Command.OrderID;
100 std::cerr << ") - report if this happens, this is a bug!\n" << std::flush;
101 return;
102 }
103 LaunchStream(newstream, Command.hStream, Command.pStreamRef, Command.pRegion, Command.SampleOffset, Command.DoLoop);
104 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
105 if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
106 std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
107 newstream->Kill();
108 return;
109 }
110 pCreatedStreams[Command.OrderID] = newstream;
111 }
112
113 void DeleteStream(delete_command_t& Command) {
114 if (Command.pStream) {
115 Command.pStream->Kill();
116 if (Command.bNotify) DeletionNotificationQueue.push(&Command.hStream);
117 }
118 else { // the stream wasn't created by disk thread or picked up by audio thread yet
119
120 // if stream was created but not picked up yet
121 Stream* pStream = pCreatedStreams[Command.OrderID];
122 if (pStream && pStream != SLOT_RESERVED) {
123 pStream->Kill();
124 pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
125 // if original sender requested a notification, let him know now
126 if (Command.bNotify)
127 DeletionNotificationQueue.push(&Command.hStream);
128 return;
129 }
130
131 // the stream was not created yet
132 if (GhostQueue->write_space() > 0) {
133 GhostQueue->push(&Command);
134 } else { // error, queue full
135 if (Command.bNotify) {
136 dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n"));
137 } else {
138 dmsg(1,("DiskThread: GhostQueue full!\n"));
139 }
140 }
141 }
142 }
143
144 void RefillStreams() {
145 // sort the streams by most empty stream
146 qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
147
148 // refill the most empty streams
149 for (uint i = 0; i < RefillStreamsPerRun; i++) {
150 if (pStreams[i]->GetState() == Stream::state_active) {
151
152 //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
153 //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
154
155 int writespace = pStreams[i]->GetWriteSpaceToEnd();
156 if (writespace == 0) break;
157
158 int capped_writespace = writespace;
159 // if there is too much buffer space available then cut the read/write
160 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
161 if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
162
163 // adjust the amount to read in order to ensure that the buffer wraps correctly
164 int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
165 // if we wasn't able to refill one of the stream buffers by more than
166 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
167 if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
168 }
169 }
170 }
171
172 Stream::Handle CreateHandle() {
173 static uint32_t counter = 0;
174 if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
175 else counter++;
176 return counter;
177 }
178
179 Stream::OrderID_t CreateOrderID() {
180 static Stream::OrderID_t counter(0);
181 for (int i = 0; i < Streams; i++) {
182 if (counter == Streams) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
183 else counter++;
184 if (!pCreatedStreams[counter]) {
185 pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
186 return counter; // found empty slot
187 }
188 }
189 return 0; // no free slot
190 }
191
192 atomic_t ActiveStreamCount;
193 public:
194 // Methods
195 DiskThreadBase(int MaxStreams, uint BufferWrapElements, IM* pInstruments) :
196 Thread(true, false, 1, -2),
197 pInstruments(pInstruments),
198 DeletionNotificationQueue(4*MaxStreams),
199 ProgramChangeQueue(512)
200 {
201 CreationQueue = new RingBuffer<create_command_t,false>(4*MaxStreams);
202 DeletionQueue = new RingBuffer<delete_command_t,false>(4*MaxStreams);
203 GhostQueue = new RingBuffer<delete_command_t,false>(MaxStreams);
204 DeleteRegionQueue = new RingBuffer<R*,false>(4*MaxStreams);
205 pStreams = new Stream*[MaxStreams];
206 pCreatedStreams = new Stream*[MaxStreams + 1];
207 Streams = MaxStreams;
208 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
209
210 for (int i = 1; i <= MaxStreams; i++) {
211 pCreatedStreams[i] = NULL;
212 }
213 ActiveStreamCountMax = 0;
214 }
215
216 virtual ~DiskThreadBase() {
217 for (int i = 0; i < Streams; i++) {
218 if (pStreams[i]) delete pStreams[i];
219 }
220 if (CreationQueue) delete CreationQueue;
221 if (DeletionQueue) delete DeletionQueue;
222 if (GhostQueue) delete GhostQueue;
223 if (DeleteRegionQueue) delete DeleteRegionQueue;
224 if (pStreams) delete[] pStreams;
225 if (pCreatedStreams) delete[] pCreatedStreams;
226 }
227
228
229 // #########################################################################
230 // # Foreign Thread Section
231 // # (following code intended to be interface for audio thread)
232
233 /**
234 * Suspend disk thread, kill all active streams, clear all queues and the
235 * pickup array and reset all streams. Call this method to bring everything
236 * in the disk thread to day one. If the disk thread was running, it will be
237 * respawned right after everything was reset.
238 */
239 void Reset() {
240 bool running = this->IsRunning();
241 if (running) this->StopThread();
242 for (int i = 0; i < Streams; i++) {
243 pStreams[i]->Kill();
244 }
245 for (int i = 1; i <= Streams; i++) {
246 pCreatedStreams[i] = NULL;
247 }
248 GhostQueue->init();
249 CreationQueue->init();
250 DeletionQueue->init();
251 DeletionNotificationQueue.init();
252
253 // make sure that all DimensionRegions are released
254 while (DeleteRegionQueue->read_space() > 0) {
255 R* pRgn;
256 DeleteRegionQueue->pop(&pRgn);
257 pInstruments->HandBackRegion(pRgn);
258 }
259 DeleteRegionQueue->init();
260 SetActiveStreamCount(0);
261 ActiveStreamCountMax = 0;
262 if (running) this->StartThread(); // start thread only if it was running before
263 }
264
265 String GetBufferFillBytes() {
266 bool activestreams = false;
267 std::stringstream ss;
268 for (uint i = 0; i < this->Streams; i++) {
269 if (pStreams[i]->GetState() == Stream::state_unused) continue;
270 uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
271 uint streamid = (uint) pStreams[i]->GetHandle();
272 if (!streamid) continue;
273
274 if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
275 else {
276 ss << '[' << streamid << ']' << bufferfill;
277 activestreams = true;
278 }
279 }
280 return ss.str();
281 }
282
283 String GetBufferFillPercentage() {
284 bool activestreams = false;
285 std::stringstream ss;
286 for (uint i = 0; i < this->Streams; i++) {
287 if (pStreams[i]->GetState() == Stream::state_unused) continue;
288 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
289 uint streamid = (uint) pStreams[i]->GetHandle();
290 if (!streamid) continue;
291
292 if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
293 else {
294 ss << '[' << streamid << ']' << bufferfill;
295 activestreams = true;
296 }
297 }
298 return ss.str();
299 }
300
301 /**
302 * Returns -1 if command queue or pickup pool is full, 0 on success (will be
303 * called by audio thread within the voice class).
304 */
305 int OrderNewStream(Stream::reference_t* pStreamRef, R* pRegion, unsigned long SampleOffset, bool DoLoop) {
306 dmsg(4,("Disk Thread: new stream ordered\n"));
307 if (CreationQueue->write_space() < 1) {
308 dmsg(1,("DiskThread: Order queue full!\n"));
309 return -1;
310 }
311
312 const Stream::OrderID_t newOrder = CreateOrderID();
313 if (!newOrder) {
314 dmsg(1,("DiskThread: there was no free slot\n"));
315 return -1; // there was no free slot
316 }
317
318 pStreamRef->State = Stream::state_active;
319 pStreamRef->OrderID = newOrder;
320 pStreamRef->hStream = CreateHandle();
321 pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
322
323 create_command_t cmd;
324 cmd.OrderID = pStreamRef->OrderID;
325 cmd.hStream = pStreamRef->hStream;
326 cmd.pStreamRef = pStreamRef;
327 cmd.pRegion = pRegion;
328 cmd.SampleOffset = SampleOffset;
329 cmd.DoLoop = DoLoop;
330
331 CreationQueue->push(&cmd);
332 return 0;
333 }
334
335 /**
336 * Request the disk thread to delete the given disk stream. This method
337 * will return immediately, thus it won't block until the respective voice
338 * was actually deleted. (Called by audio thread within the Voice class).
339 *
340 * @param pStreamRef - stream that shall be deleted
341 * @param bRequestNotification - set to true in case you want to receive a
342 * notification once the stream has actually
343 * been deleted
344 * @returns 0 on success, -1 if command queue is full
345 * @see AskForDeletedStream()
346 */
347 int OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification = false) {
348 dmsg(4,("Disk Thread: stream deletion ordered\n"));
349 if (DeletionQueue->write_space() < 1) {
350 dmsg(1,("DiskThread: Deletion queue full!\n"));
351 return -1;
352 }
353
354 delete_command_t cmd;
355 cmd.pStream = pStreamRef->pStream;
356 cmd.hStream = pStreamRef->hStream;
357 cmd.OrderID = pStreamRef->OrderID;
358 cmd.bNotify = bRequestNotification;
359
360 DeletionQueue->push(&cmd);
361 return 0;
362 }
363
364 /**
365 * Tell the disk thread to release a dimension region that belongs
366 * to an instrument which isn't loaded anymore. The disk thread
367 * will hand back the dimension region to the instrument resource
368 * manager. (OrderDeletionOfDimreg is called from the audio thread
369 * when a voice dies.)
370 */
371 int OrderDeletionOfRegion(R* pReg) {
372 dmsg(4,("Disk Thread: dimreg deletion ordered\n"));
373 if (DeleteRegionQueue->write_space() < 1) {
374 dmsg(1,("DiskThread: DeleteRegion queue full!\n"));
375 return -1;
376 }
377 DeleteRegionQueue->push(&pReg);
378 return 0;
379 }
380
381 /**
382 * Tell the disk thread to do a program change on the specified
383 * EngineChannel.
384 */
385 int OrderProgramChange(uint32_t Program, EngineChannel* pEngineChannel) {
386 program_change_command_t cmd;
387 cmd.Program = Program;
388 cmd.pEngineChannel = pEngineChannel;
389
390 dmsg(4,("Disk Thread: program change ordered\n"));
391 if (ProgramChangeQueue.write_space() < 1) {
392 dmsg(1,("DiskThread: ProgramChange queue full!\n"));
393 return -1;
394 }
395 ProgramChangeQueue.push(&cmd);
396 return 0;
397 }
398
399 /**
400 * Returns the pointer to a disk stream if the ordered disk stream
401 * represented by the \a StreamOrderID was already activated by the disk
402 * thread, returns NULL otherwise. If the call was successful, thus if it
403 * returned a valid stream pointer, the caller has to the store that pointer
404 * by himself, because it's not possible to call this method again with the
405 * same used order ID; this method is just intended for picking up an ordered
406 * disk stream. This method will usually be called by the voice class (within
407 * the audio thread).
408 *
409 * @param StreamOrderID - ID previously returned by OrderNewStream()
410 * @returns pointer to created stream object, NULL otherwise
411 */
412 Stream* AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
413 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
414 Stream* pStream = pCreatedStreams[StreamOrderID];
415 if (pStream && pStream != SLOT_RESERVED) {
416 dmsg(4,("(yes created)\n"));
417 pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
418 return pStream;
419 }
420 dmsg(4,("(no not yet created)\n"));
421 return NULL;
422 }
423
424 /**
425 * In case the original sender requested a notification with his stream
426 * deletion order, he can use this method to poll if the respective stream
427 * has actually been deleted.
428 *
429 * @returns handle / identifier of the deleted stream, or
430 * Stream::INVALID_HANDLE if no notification present
431 */
432 Stream::Handle AskForDeletedStream() {
433 if (DeletionNotificationQueue.read_space()) {
434 Stream::Handle hStream;
435 DeletionNotificationQueue.pop(&hStream);
436 return hStream;
437 } else return Stream::INVALID_HANDLE; // no notification received yet
438 }
439
440 // the number of streams currently in usage
441 // printed on the console the main thread (along with the active voice count)
442 uint GetActiveStreamCount() { return atomic_read(&ActiveStreamCount); }
443 void SetActiveStreamCount(uint Streams) { atomic_set(&ActiveStreamCount, Streams); }
444 int ActiveStreamCountMax;
445
446 protected:
447 IM* pInstruments; ///< The instrument resource manager of the engine that is using this disk thread. Used by the dimension region deletion feature.
448
449 // #########################################################################
450 // # Disk Thread Only Section
451 // # (following code should only be executed by the disk thread)
452
453 // Implementation of virtual method from class Thread
454 int Main() {
455 dmsg(3,("Disk thread running\n"));
456 while (true) {
457 #if !defined(WIN32)
458 pthread_testcancel(); // mandatory for OSX
459 #endif
460 #if CONFIG_PTHREAD_TESTCANCEL
461 TestCancel();
462 #endif
463 IsIdle = true; // will be set to false if a stream got filled
464
465 // if there are ghost streams, delete them
466 for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
467 delete_command_t ghostStream;
468 GhostQueue->pop(&ghostStream);
469 bool found = false;
470 for (int i = 0; i < this->Streams; i++) {
471 if (pStreams[i]->GetHandle() == ghostStream.hStream) {
472 pStreams[i]->Kill();
473 found = true;
474 // if original sender requested a notification, let him know now
475 if (ghostStream.bNotify)
476 DeletionNotificationQueue.push(&ghostStream.hStream);
477 break;
478 }
479 }
480 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue
481 }
482
483 // if there are creation commands, create new streams
484 while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
485 create_command_t command;
486 CreationQueue->pop(&command);
487 CreateStream(command);
488 }
489
490 // if there are deletion commands, delete those streams
491 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
492 delete_command_t command;
493 DeletionQueue->pop(&command);
494 DeleteStream(command);
495 }
496
497 // release DimensionRegions that belong to instruments
498 // that are no longer loaded
499 while (DeleteRegionQueue->read_space() > 0) {
500 R* pRgn;
501 DeleteRegionQueue->pop(&pRgn);
502 pInstruments->HandBackRegion(pRgn);
503 }
504
505 // perform MIDI program change commands
506 while (ProgramChangeQueue.read_space() > 0) {
507 program_change_command_t cmd;
508 ProgramChangeQueue.pop(&cmd);
509 cmd.pEngineChannel->ExecuteProgramChange(cmd.Program);
510 }
511 RefillStreams(); // refill the most empty streams
512
513 // if nothing was done during this iteration (eg no streambuffer
514 // filled with data) then sleep for 30ms
515 if (IsIdle) usleep(30000);
516
517 int streamsInUsage = 0;
518 for (int i = Streams - 1; i >= 0; i--) {
519 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
520 }
521 SetActiveStreamCount(streamsInUsage);
522 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
523 }
524
525 return EXIT_FAILURE;
526 }
527
528 virtual Stream* CreateStream(long BufferSize, uint BufferWrapElements) = 0;
529
530 void CreateAllStreams(int MaxStreams, uint BufferWrapElements) {
531 for (int i = 0; i < MaxStreams; i++) {
532 pStreams[i] = CreateStream(CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements);
533 }
534 }
535
536 virtual void LaunchStream (
537 Stream* pStream,
538 Stream::Handle hStream,
539 Stream::reference_t* pExportReference,
540 R* pRgn,
541 unsigned long SampleOffset,
542 bool DoLoop
543 ) = 0;
544
545 friend class Stream;
546 };
547 } // namespace LinuxSampler
548
549 #endif // __LS_DISKTHREADBASE_H__

  ViewVC Help
Powered by ViewVC