/[svn]/linuxsampler/trunk/src/engines/gig/DiskThread.cpp
ViewVC logotype

Contents of /linuxsampler/trunk/src/engines/gig/DiskThread.cpp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 554 - (show annotations) (download)
Thu May 19 19:25:14 2005 UTC (18 years, 11 months ago) by schoenebeck
File size: 15852 byte(s)
* All compile time options are now centrally alterable as arguments to the
  ./configure script. All options are C Macros beginning with CONFIG_
  prefix and will be placed into auto generated config.h file.

1 /***************************************************************************
2 * *
3 * LinuxSampler - modular, streaming capable sampler *
4 * *
5 * Copyright (C) 2003, 2004 by Benno Senoner and Christian Schoenebeck *
6 * Copyright (C) 2005 Christian Schoenebeck *
7 * *
8 * This program is free software; you can redistribute it and/or modify *
9 * it under the terms of the GNU General Public License as published by *
10 * the Free Software Foundation; either version 2 of the License, or *
11 * (at your option) any later version. *
12 * *
13 * This program is distributed in the hope that it will be useful, *
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
16 * GNU General Public License for more details. *
17 * *
18 * You should have received a copy of the GNU General Public License *
19 * along with this program; if not, write to the Free Software *
20 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
21 * MA 02111-1307 USA *
22 ***************************************************************************/
23
24 #include <sstream>
25
26 #include "DiskThread.h"
27
28 namespace LinuxSampler { namespace gig {
29
30 // *********** DiskThread **************
31 // *
32
33
34 // just a placeholder to mark a cell in the pickup array as 'reserved'
35 Stream* DiskThread::SLOT_RESERVED = (Stream*) &SLOT_RESERVED;
36
37
38 // #########################################################################
39 // # Foreign Thread Section
40 // # (following code intended to be interface for audio thread)
41
42
43 /**
44 * Suspend disk thread, kill all active streams, clear all queues and the
45 * pickup array and reset all streams. Call this method to bring everything
46 * in the disk thread to day one. If the disk thread was running, it will be
47 * respawned right after everything was reset.
48 */
49 void DiskThread::Reset() {
50 bool running = this->IsRunning();
51 if (running) this->StopThread();
52 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
53 pStreams[i]->Kill();
54 }
55 for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) {
56 pCreatedStreams[i] = NULL;
57 }
58 GhostQueue->init();
59 CreationQueue->init();
60 DeletionQueue->init();
61 ActiveStreamCount = 0;
62 ActiveStreamCountMax = 0;
63 if (running) this->StartThread(); // start thread only if it was running before
64 }
65
66 String DiskThread::GetBufferFillBytes() {
67 bool activestreams = false;
68 std::stringstream ss;
69 for (uint i = 0; i < this->Streams; i++) {
70 if (pStreams[i]->GetState() == Stream::state_unused) continue;
71 uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
72 uint streamid = (uint) pStreams[i]->GetHandle();
73 if (!streamid) continue;
74
75 if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
76 else {
77 ss << '[' << streamid << ']' << bufferfill;
78 activestreams = true;
79 }
80 }
81 return ss.str();
82 }
83
84 String DiskThread::GetBufferFillPercentage() {
85 bool activestreams = false;
86 std::stringstream ss;
87 for (uint i = 0; i < this->Streams; i++) {
88 if (pStreams[i]->GetState() == Stream::state_unused) continue;
89 uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) CONFIG_STREAM_BUFFER_SIZE * 100);
90 uint streamid = (uint) pStreams[i]->GetHandle();
91 if (!streamid) continue;
92
93 if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
94 else {
95 ss << '[' << streamid << ']' << bufferfill;
96 activestreams = true;
97 }
98 }
99 return ss.str();
100 }
101
102 /**
103 * Returns -1 if command queue or pickup pool is full, 0 on success (will be
104 * called by audio thread within the voice class).
105 */
106 int DiskThread::OrderNewStream(Stream::reference_t* pStreamRef, ::gig::Sample* pSample, unsigned long SampleOffset, bool DoLoop) {
107 dmsg(4,("Disk Thread: new stream ordered\n"));
108 if (CreationQueue->write_space() < 1) {
109 dmsg(1,("DiskThread: Order queue full!\n"));
110 return -1;
111 }
112
113 const Stream::OrderID_t newOrder = CreateOrderID();
114 if (!newOrder) {
115 dmsg(1,("DiskThread: there was no free slot\n"));
116 return -1; // there was no free slot
117 }
118
119 pStreamRef->State = Stream::state_active;
120 pStreamRef->OrderID = newOrder;
121 pStreamRef->hStream = CreateHandle();
122 pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
123
124 create_command_t cmd;
125 cmd.OrderID = pStreamRef->OrderID;
126 cmd.hStream = pStreamRef->hStream;
127 cmd.pStreamRef = pStreamRef;
128 cmd.pSample = pSample;
129 cmd.SampleOffset = SampleOffset;
130 cmd.DoLoop = DoLoop;
131
132 CreationQueue->push(&cmd);
133 return 0;
134 }
135
136 /**
137 * Returns -1 if command queue is full, 0 on success (will be called by audio
138 * thread within the voice class).
139 */
140 int DiskThread::OrderDeletionOfStream(Stream::reference_t* pStreamRef) {
141 dmsg(4,("Disk Thread: stream deletion ordered\n"));
142 if (DeletionQueue->write_space() < 1) {
143 dmsg(1,("DiskThread: Deletion queue full!\n"));
144 return -1;
145 }
146
147 delete_command_t cmd;
148 cmd.pStream = pStreamRef->pStream;
149 cmd.hStream = pStreamRef->hStream;
150 cmd.OrderID = pStreamRef->OrderID;
151
152 DeletionQueue->push(&cmd);
153 return 0;
154 }
155
156 /**
157 * Returns the pointer to a disk stream if the ordered disk stream
158 * represented by the \a StreamOrderID was already activated by the disk
159 * thread, returns NULL otherwise. If the call was successful, thus if it
160 * returned a valid stream pointer, the caller has to the store that pointer
161 * by himself, because it's not possible to call this method again with the
162 * same used order ID; this method is just intended for picking up an ordered
163 * disk stream. This method will usually be called by the voice class (within
164 * the audio thread).
165 *
166 * @param StreamOrderID - ID previously returned by OrderNewStream()
167 * @returns pointer to created stream object, NULL otherwise
168 */
169 Stream* DiskThread::AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
170 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
171 Stream* pStream = pCreatedStreams[StreamOrderID];
172 if (pStream && pStream != SLOT_RESERVED) {
173 dmsg(4,("(yes created)\n"));
174 pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
175 return pStream;
176 }
177 dmsg(4,("(no not yet created)\n"));
178 return NULL;
179 }
180
181
182
183 // #########################################################################
184 // # Disk Thread Only Section
185 // # (following code should only be executed by the disk thread)
186
187
188 DiskThread::DiskThread(uint BufferWrapElements) : Thread(true, false, 1, -2) {
189 DecompressionBuffer = ::gig::Sample::CreateDecompressionBuffer(CONFIG_STREAM_MAX_REFILL_SIZE);
190 CreationQueue = new RingBuffer<create_command_t>(1024);
191 DeletionQueue = new RingBuffer<delete_command_t>(1024);
192 GhostQueue = new RingBuffer<Stream::Handle>(CONFIG_MAX_STREAMS);
193 Streams = CONFIG_MAX_STREAMS;
194 RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN;
195 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
196 pStreams[i] = new Stream(&DecompressionBuffer, CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words
197 }
198 for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) {
199 pCreatedStreams[i] = NULL;
200 }
201 }
202
203 DiskThread::~DiskThread() {
204 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
205 if (pStreams[i]) delete pStreams[i];
206 }
207 if (CreationQueue) delete CreationQueue;
208 if (DeletionQueue) delete DeletionQueue;
209 if (GhostQueue) delete GhostQueue;
210 ::gig::Sample::DestroyDecompressionBuffer(DecompressionBuffer);
211 }
212
213 int DiskThread::Main() {
214 dmsg(3,("Disk thread running\n"));
215 while (true) {
216 pthread_testcancel(); // mandatory for OSX
217 IsIdle = true; // will be set to false if a stream got filled
218
219 // if there are ghost streams, delete them
220 for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
221 Stream::Handle hGhostStream;
222 GhostQueue->pop(&hGhostStream);
223 bool found = false;
224 for (int i = 0; i < this->Streams; i++) {
225 if (pStreams[i]->GetHandle() == hGhostStream) {
226 pStreams[i]->Kill();
227 found = true;
228 break;
229 }
230 }
231 if (!found) GhostQueue->push(&hGhostStream); // put ghost stream handle back to the queue
232 }
233
234 // if there are creation commands, create new streams
235 while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
236 create_command_t command;
237 CreationQueue->pop(&command);
238 CreateStream(command);
239 }
240
241 // if there are deletion commands, delete those streams
242 while (Stream::UnusedStreams < Stream::TotalStreams && DeletionQueue->read_space() > 0) {
243 delete_command_t command;
244 DeletionQueue->pop(&command);
245 DeleteStream(command);
246 }
247
248 RefillStreams(); // refill the most empty streams
249
250 // if nothing was done during this iteration (eg no streambuffer
251 // filled with data) then sleep for 50ms
252 if (IsIdle) usleep(30000);
253
254 int streamsInUsage = 0;
255 for (int i = Streams - 1; i >= 0; i--) {
256 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
257 }
258 ActiveStreamCount = streamsInUsage;
259 if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
260 }
261
262 return EXIT_FAILURE;
263 }
264
265 void DiskThread::CreateStream(create_command_t& Command) {
266 // search for unused stream
267 Stream* newstream = NULL;
268 for (int i = Streams - 1; i >= 0; i--) {
269 if (pStreams[i]->GetState() == Stream::state_unused) {
270 newstream = pStreams[i];
271 break;
272 }
273 }
274 if (!newstream) {
275 std::cerr << "No unused stream found (OrderID:" << Command.OrderID << ") - report if this happens, this is a bug!\n" << std::flush;
276 return;
277 }
278 newstream->Launch(Command.hStream, Command.pStreamRef, Command.pSample, Command.SampleOffset, Command.DoLoop);
279 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
280 if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
281 std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
282 newstream->Kill();
283 return;
284 }
285 pCreatedStreams[Command.OrderID] = newstream;
286 }
287
288 void DiskThread::DeleteStream(delete_command_t& Command) {
289 if (Command.pStream) Command.pStream->Kill();
290 else { // the stream wasn't created by disk thread or picked up by audio thread yet
291
292 // if stream was created but not picked up yet
293 Stream* pStream = pCreatedStreams[Command.OrderID];
294 if (pStream && pStream != SLOT_RESERVED) {
295 pStream->Kill();
296 pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
297 return;
298 }
299
300 // the stream was not created yet
301 if (GhostQueue->write_space() > 0) {
302 GhostQueue->push(&Command.hStream);
303 }
304 else dmsg(1,("DiskThread: GhostQueue full!\n"));
305 }
306 }
307
308 void DiskThread::RefillStreams() {
309 // sort the streams by most empty stream
310 qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
311
312 // refill the most empty streams
313 for (uint i = 0; i < RefillStreamsPerRun; i++) {
314 if (pStreams[i]->GetState() == Stream::state_active) {
315
316 //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
317 //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
318
319 int writespace = pStreams[i]->GetWriteSpaceToEnd();
320 if (writespace == 0) break;
321
322 int capped_writespace = writespace;
323 // if there is too much buffer space available then cut the read/write
324 // size to CONFIG_STREAM_MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
325 if (writespace > CONFIG_STREAM_MAX_REFILL_SIZE) capped_writespace = CONFIG_STREAM_MAX_REFILL_SIZE;
326
327 // adjust the amount to read in order to ensure that the buffer wraps correctly
328 int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
329 // if we wasn't able to refill one of the stream buffers by more than
330 // CONFIG_STREAM_MIN_REFILL_SIZE we'll send the disk thread to sleep later
331 if (pStreams[i]->ReadAhead(read_amount) > CONFIG_STREAM_MIN_REFILL_SIZE) this->IsIdle = false;
332 }
333 }
334 }
335
336 /// Handle Generator
337 Stream::Handle DiskThread::CreateHandle() {
338 static uint32_t counter = 0;
339 if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
340 else counter++;
341 return counter;
342 }
343
344 /// order ID Generator
345 Stream::OrderID_t DiskThread::CreateOrderID() {
346 static Stream::OrderID_t counter(0);
347 for (int i = 0; i < CONFIG_MAX_STREAMS; i++) {
348 if (counter == CONFIG_MAX_STREAMS) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
349 else counter++;
350 if (!pCreatedStreams[counter]) {
351 pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
352 return counter; // found empty slot
353 }
354 }
355 return 0; // no free slot
356 }
357
358
359
360 // *********** C functions **************
361 // *
362
363 /**
364 * This is the comparison function the qsort algo uses to determine if a value is
365 * bigger than another one or special in our case; if the writespace of a stream
366 * is bigger than another one.
367 */
368 int CompareStreamWriteSpace(const void* A, const void* B) {
369 Stream* a = *(Stream**) A;
370 Stream* b = *(Stream**) B;
371 return b->GetWriteSpace() - a->GetWriteSpace();
372 }
373
374 }} // namespace LinuxSampler::gig

  ViewVC Help
Powered by ViewVC