/[svn]/linuxsampler/trunk/src/engines/gig/DiskThread.cpp
ViewVC logotype

Contents of /linuxsampler/trunk/src/engines/gig/DiskThread.cpp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 1321 - (show annotations) (download)
Tue Sep 4 01:12:49 2007 UTC (16 years, 7 months ago) by schoenebeck
File size: 19188 byte(s)
* added highly experimental code for synchronizing instrument editors
  hosted in the sampler's process to safely edit instruments while playing
  without a crash (hopefully) by either suspending single regions wherever
  possible or - if unavoidable - whole engine(s)
* disk thread: queue sizes are now proportional to CONFIG_MAX_STREAMS
  instead of fix values
* removed legacy Makefiles in meanwhile deleted src/lib directory and its
  subdirectories
* bumped version to 0.4.0.7cvs

1 /***************************************************************************
2 * *
3 * LinuxSampler - modular, streaming capable sampler *
4 * *
5 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 * Copyright (C) 2005 - 2007 Christian Schoenebeck *
7 * *
8 * This program is free software; you can redistribute it and/or modify *
9 * it under the terms of the GNU General Public License as published by *
10 * the Free Software Foundation; either version 2 of the License, or *
11 * (at your option) any later version. *
12 * *
13 * This program is distributed in the hope that it will be useful, *
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
16 * GNU General Public License for more details. *
17 * *
18 * You should have received a copy of the GNU General Public License *
19 * along with this program; if not, write to the Free Software *
20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
21 * MA 02111-1307 USA *
22 ***************************************************************************/
23
24 #include <sstream>
25
26 #include "DiskThread.h"
27
28 namespace LinuxSampler { namespace gig {
29
30 // *********** DiskThread **************
31 // *
32
33
34 // just a placeholder to mark a cell in the pickup array as 'reserved'
35 Stream* DiskThread::SLOT_RESERVED = (Stream*) &SLOT_RESERVED;
36
37
38 // #########################################################################
39 // # Foreign Thread Section
40 // # (following code intended to be interface for audio thread)
41
42
43 /**
44 * Suspend disk thread, kill all active streams, clear all queues and the
45 * pickup array and reset all streams. Call this method to bring everything
46 * in the disk thread to day one. If the disk thread was running, it will be
47 * respawned right after everything was reset.
48 */
49 void DiskThread::Reset() {
50 bool running = this->IsRunning();
51 if (running) this->StopThread();
52 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
53 pStreams[i]->Kill();
54 }
55 for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) {
56 pCreatedStreams[i] = NULL;
57 }
58 GhostQueue->init();
59 CreationQueue->init();
60 DeletionQueue->init();
61 DeletionNotificationQueue.init();
62 DeleteDimregQueue->init();
63 ActiveStreamCount = 0;
64 ActiveStreamCountMax = 0;
65 if (running) this->StartThread(); // start thread only if it was running before
66 }
67
68 String DiskThread::GetBufferFillBytes() {
69 bool activestreams = false;
70 std::stringstream ss;
71 for (uint i = 0; i < this->Streams; i++) {
72 if (pStreams[i]->GetState() == Stream::state_unused) continue;
73 uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
74 uint streamid = (uint) pStreams[i]->GetHandle();
75 if (!streamid) continue;
76
77 if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
78 else {
79 ss << '[' << streamid << ']' << bufferfill;
80 activestreams = true;
81 }
82 }
83 return ss.str();
84 }
85
86 String DiskThread::GetBufferFillPercentage() {
87 bool activestreams = false;
88 std::stringstream ss;
89 for (uint i = 0; i < this->Streams; i++) {
90 if (pStreams[i]->GetState() == Stream::state_unused) continue;
91 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
92 uint streamid = (uint) pStreams[i]->GetHandle();
93 if (!streamid) continue;
94
95 if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
96 else {
97 ss << '[' << streamid << ']' << bufferfill;
98 activestreams = true;
99 }
100 }
101 return ss.str();
102 }
103
104 /**
105 * Returns -1 if command queue or pickup pool is full, 0 on success (will be
106 * called by audio thread within the voice class).
107 */
108 int DiskThread::OrderNewStream(Stream::reference_t* pStreamRef, ::gig::DimensionRegion* pDimRgn, unsigned long SampleOffset, bool DoLoop) {
109 dmsg(4,("Disk Thread: new stream ordered\n"));
110 if (CreationQueue->write_space() < 1) {
111 dmsg(1,("DiskThread: Order queue full!\n"));
112 return -1;
113 }
114
115 const Stream::OrderID_t newOrder = CreateOrderID();
116 if (!newOrder) {
117 dmsg(1,("DiskThread: there was no free slot\n"));
118 return -1; // there was no free slot
119 }
120
121 pStreamRef->State = Stream::state_active;
122 pStreamRef->OrderID = newOrder;
123 pStreamRef->hStream = CreateHandle();
124 pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
125
126 create_command_t cmd;
127 cmd.OrderID = pStreamRef->OrderID;
128 cmd.hStream = pStreamRef->hStream;
129 cmd.pStreamRef = pStreamRef;
130 cmd.pDimRgn = pDimRgn;
131 cmd.SampleOffset = SampleOffset;
132 cmd.DoLoop = DoLoop;
133
134 CreationQueue->push(&cmd);
135 return 0;
136 }
137
138 /**
139 * Request the disk thread to delete the given disk stream. This method
140 * will return immediately, thus it won't block until the respective voice
141 * was actually deleted. (Called by audio thread within the Voice class).
142 *
143 * @param pStreamRef - stream that shall be deleted
144 * @param bRequestNotification - set to true in case you want to receive a
145 * notification once the stream has actually
146 * been deleted
147 * @returns 0 on success, -1 if command queue is full
148 * @see AskForDeletedStream()
149 */
150 int DiskThread::OrderDeletionOfStream(Stream::reference_t* pStreamRef, bool bRequestNotification) {
151 dmsg(4,("Disk Thread: stream deletion ordered\n"));
152 if (DeletionQueue->write_space() < 1) {
153 dmsg(1,("DiskThread: Deletion queue full!\n"));
154 return -1;
155 }
156
157 delete_command_t cmd;
158 cmd.pStream = pStreamRef->pStream;
159 cmd.hStream = pStreamRef->hStream;
160 cmd.OrderID = pStreamRef->OrderID;
161 cmd.bNotify = bRequestNotification;
162
163 DeletionQueue->push(&cmd);
164 return 0;
165 }
166
167 /**
168 * Tell the disk thread to release a dimension region that belongs
169 * to an instrument which isn't loaded anymore. The disk thread
170 * will hand back the dimension region to the instrument resource
171 * manager. (OrderDeletionOfDimreg is called from the audio thread
172 * when a voice dies.)
173 */
174 int DiskThread::OrderDeletionOfDimreg(::gig::DimensionRegion* dimreg) {
175 dmsg(4,("Disk Thread: dimreg deletion ordered\n"));
176 if (DeleteDimregQueue->write_space() < 1) {
177 dmsg(1,("DiskThread: DeleteDimreg queue full!\n"));
178 return -1;
179 }
180 DeleteDimregQueue->push(&dimreg);
181 return 0;
182 }
183
184 /**
185 * Returns the pointer to a disk stream if the ordered disk stream
186 * represented by the \a StreamOrderID was already activated by the disk
187 * thread, returns NULL otherwise. If the call was successful, thus if it
188 * returned a valid stream pointer, the caller has to the store that pointer
189 * by himself, because it's not possible to call this method again with the
190 * same used order ID; this method is just intended for picking up an ordered
191 * disk stream. This method will usually be called by the voice class (within
192 * the audio thread).
193 *
194 * @param StreamOrderID - ID previously returned by OrderNewStream()
195 * @returns pointer to created stream object, NULL otherwise
196 */
197 Stream* DiskThread::AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
198 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
199 Stream* pStream = pCreatedStreams[StreamOrderID];
200 if (pStream && pStream != SLOT_RESERVED) {
201 dmsg(4,("(yes created)\n"));
202 pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
203 return pStream;
204 }
205 dmsg(4,("(no not yet created)\n"));
206 return NULL;
207 }
208
209 /**
210 * In case the original sender requested a notification with his stream
211 * deletion order, he can use this method to poll if the respective stream
212 * has actually been deleted.
213 *
214 * @returns handle / identifier of the deleted stream, or
215 * Stream::INVALID_HANDLE if no notification present
216 */
217 Stream::Handle DiskThread::AskForDeletedStream() {
218 if (DeletionNotificationQueue.read_space()) {
219 Stream::Handle hStream;
220 DeletionNotificationQueue.pop(&hStream);
221 return hStream;
222 } else return Stream::INVALID_HANDLE; // no notification received yet
223 }
224
225
226
227 // #########################################################################
228 // # Disk Thread Only Section
229 // # (following code should only be executed by the disk thread)
230
231
232 DiskThread::DiskThread(uint BufferWrapElements, InstrumentResourceManager* pInstruments) :
233 Thread(true, false, 1, -2),
234 pInstruments(pInstruments),
235 DeletionNotificationQueue(4*CONFIG_MAX_STREAMS)
236 {
237 DecompressionBuffer = ::gig::Sample::CreateDecompressionBuffer(CONFIG_STREAM_MAX_REFILL_SIZE);
238 CreationQueue = new RingBuffer<create_command_t,false>(4*CONFIG_MAX_STREAMS);
239 DeletionQueue = new RingBuffer<delete_command_t,false>(4*CONFIG_MAX_STREAMS);
240 GhostQueue = new RingBuffer<delete_command_t,false>(CONFIG_MAX_STREAMS);
241 DeleteDimregQueue = new RingBuffer< ::gig::DimensionRegion*,false>(4*CONFIG_MAX_STREAMS);
242 Streams = CONFIG_MAX_STREAMS;
243 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
244 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
245 pStreams[i] = new Stream(&DecompressionBuffer, CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words
246 }
247 for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) {
248 pCreatedStreams[i] = NULL;
249 }
250 ActiveStreamCountMax = 0;
251 }
252
253 DiskThread::~DiskThread() {
254 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
255 if (pStreams[i]) delete pStreams[i];
256 }
257 if (CreationQueue) delete CreationQueue;
258 if (DeletionQueue) delete DeletionQueue;
259 if (GhostQueue) delete GhostQueue;
260 if (DeleteDimregQueue) delete DeleteDimregQueue;
261 ::gig::Sample::DestroyDecompressionBuffer(DecompressionBuffer);
262 }
263
264 int DiskThread::Main() {
265 dmsg(3,("Disk thread running\n"));
266 while (true) {
267 pthread_testcancel(); // mandatory for OSX
268 IsIdle = true; // will be set to false if a stream got filled
269
270 // if there are ghost streams, delete them
271 for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
272 delete_command_t ghostStream;
273 GhostQueue->pop(&ghostStream);
274 bool found = false;
275 for (int i = 0; i < this->Streams; i++) {
276 if (pStreams[i]->GetHandle() == ghostStream.hStream) {
277 pStreams[i]->Kill();
278 found = true;
279 // if original sender requested a notification, let him know now
280 if (ghostStream.bNotify)
281 DeletionNotificationQueue.push(&ghostStream.hStream);
282 break;
283 }
284 }
285 if (!found) GhostQueue->push(&ghostStream); // put ghost stream handle back to the queue
286 }
287
288 // if there are creation commands, create new streams
289 while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
290 create_command_t command;
291 CreationQueue->pop(&command);
292 CreateStream(command);
293 }
294
295 // if there are deletion commands, delete those streams
296 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
297 delete_command_t command;
298 DeletionQueue->pop(&command);
299 DeleteStream(command);
300 }
301
302 // release DimensionRegions that belong to instruments
303 // that are no longer loaded
304 while (DeleteDimregQueue->read_space() > 0) {
305 ::gig::DimensionRegion* dimreg;
306 DeleteDimregQueue->pop(&dimreg);
307 pInstruments->HandBackDimReg(dimreg);
308 }
309
310 RefillStreams(); // refill the most empty streams
311
312 // if nothing was done during this iteration (eg no streambuffer
313 // filled with data) then sleep for 30ms
314 if (IsIdle) usleep(30000);
315
316 int streamsInUsage = 0;
317 for (int i = Streams - 1; i >= 0; i--) {
318 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
319 }
320 ActiveStreamCount = streamsInUsage;
321 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
322 }
323
324 return EXIT_FAILURE;
325 }
326
327 void DiskThread::CreateStream(create_command_t& Command) {
328 // search for unused stream
329 Stream* newstream = NULL;
330 for (int i = Streams - 1; i >= 0; i--) {
331 if (pStreams[i]->GetState() == Stream::state_unused) {
332 newstream = pStreams[i];
333 break;
334 }
335 }
336 if (!newstream) {
337 std::cerr << "No unused stream found (OrderID:" << Command.OrderID << ") - report if this happens, this is a bug!\n" << std::flush;
338 return;
339 }
340 newstream->Launch(Command.hStream, Command.pStreamRef, Command.pDimRgn, Command.SampleOffset, Command.DoLoop);
341 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
342 if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
343 std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
344 newstream->Kill();
345 return;
346 }
347 pCreatedStreams[Command.OrderID] = newstream;
348 }
349
350 void DiskThread::DeleteStream(delete_command_t& Command) {
351 if (Command.pStream) Command.pStream->Kill();
352 else { // the stream wasn't created by disk thread or picked up by audio thread yet
353
354 // if stream was created but not picked up yet
355 Stream* pStream = pCreatedStreams[Command.OrderID];
356 if (pStream && pStream != SLOT_RESERVED) {
357 pStream->Kill();
358 pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
359 // if original sender requested a notification, let him know now
360 if (Command.bNotify)
361 DeletionNotificationQueue.push(&Command.hStream);
362 return;
363 }
364
365 // the stream was not created yet
366 if (GhostQueue->write_space() > 0) {
367 GhostQueue->push(&Command);
368 } else { // error, queue full
369 if (Command.bNotify) {
370 dmsg(1,("DiskThread: GhostQueue full! (might lead to dead lock with instrument editor!)\n"));
371 } else {
372 dmsg(1,("DiskThread: GhostQueue full!\n"));
373 }
374 }
375 }
376 }
377
378 void DiskThread::RefillStreams() {
379 // sort the streams by most empty stream
380 qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
381
382 // refill the most empty streams
383 for (uint i = 0; i < RefillStreamsPerRun; i++) {
384 if (pStreams[i]->GetState() == Stream::state_active) {
385
386 //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
387 //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
388
389 int writespace = pStreams[i]->GetWriteSpaceToEnd();
390 if (writespace == 0) break;
391
392 int capped_writespace = writespace;
393 // if there is too much buffer space available then cut the read/write
394 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
395 if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
396
397 // adjust the amount to read in order to ensure that the buffer wraps correctly
398 int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
399 // if we wasn't able to refill one of the stream buffers by more than
400 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
401 if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
402 }
403 }
404 }
405
406 /// Handle Generator
407 Stream::Handle DiskThread::CreateHandle() {
408 static uint32_t counter = 0;
409 if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
410 else counter++;
411 return counter;
412 }
413
414 /// order ID Generator
415 Stream::OrderID_t DiskThread::CreateOrderID() {
416 static Stream::OrderID_t counter(0);
417 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
418 if (counter == CONFIG_MAX_STREAMS) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
419 else counter++;
420 if (!pCreatedStreams[counter]) {
421 pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
422 return counter; // found empty slot
423 }
424 }
425 return 0; // no free slot
426 }
427
428
429
430 // *********** C functions **************
431 // *
432
433 /**
434 * This is the comparison function the qsort algo uses to determine if a value is
435 * bigger than another one or special in our case; if the writespace of a stream
436 * is bigger than another one.
437 */
438 int CompareStreamWriteSpace(const void* A, const void* B) {
439 Stream* a = *(Stream**) A;
440 Stream* b = *(Stream**) B;
441 return b->GetWriteSpace() - a->GetWriteSpace();
442 }
443
444 }} // namespace LinuxSampler::gig

  ViewVC Help
Powered by ViewVC