/[svn]/linuxsampler/trunk/src/diskthread.cpp
ViewVC logotype

Annotation of /linuxsampler/trunk/src/diskthread.cpp

Parent Directory Parent Directory | Revision Log Revision Log


Revision 35 - (hide annotations) (download)
Fri Mar 5 13:46:15 2004 UTC (20 years, 2 months ago) by schoenebeck
File size: 14047 byte(s)
* implemented parser for the LinuxSampler control protocol (LSCP) by using
  flex / bison (where src/network/lscp.l is the input file for lex / flex
  and src/network/lscp.y is the input file for yacc / bison), parser and
  scanner can be regenerated by 'make parser'
* implemented LSCP network server (only single threaded so far), LSCP
  server will be launched if LinuxSampler was started with "--server" flag,
  implemented the following LSCP commands so far: "LOAD INSTRUMENT", "GET
  CHANNEL VOICE_COUNT", "GET CHANNEL STREAM_COUNT", "GET CHANNEL
  BUFFER_FILL", "SET CHANNEL VOLUME" and "RESET CHANNEL"
* disk thread now started within the engine

1 schoenebeck 9 /***************************************************************************
2     * *
3     * LinuxSampler - modular, streaming capable sampler *
4     * *
5     * Copyright (C) 2003 by Benno Senoner *
6     * *
7     * This program is free software; you can redistribute it and/or modify *
8     * it under the terms of the GNU General Public License as published by *
9     * the Free Software Foundation; either version 2 of the License, or *
10     * (at your option) any later version. *
11     * *
12     * This program is distributed in the hope that it will be useful, *
13     * but WITHOUT ANY WARRANTY; without even the implied warranty of *
14     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
15     * GNU General Public License for more details. *
16     * *
17     * You should have received a copy of the GNU General Public License *
18     * along with this program; if not, write to the Free Software *
19     * Foundation, Inc., 59 Temple Place, Suite 330, Boston, *
20     * MA 02111-1307 USA *
21     ***************************************************************************/
22    
23     #include "diskthread.h"
24    
25    
26     // *********** DiskThread **************
27     // *
28    
29    
30 schoenebeck 18 // just a placeholder to mark a cell in the pickup array as 'reserved'
31     Stream* DiskThread::SLOT_RESERVED = (Stream*) &SLOT_RESERVED;
32    
33    
34 schoenebeck 9 // #########################################################################
35     // # Foreign Thread Section
36     // # (following code intended to be interface for audio thread)
37    
38    
39     /**
40 schoenebeck 35 * Suspend disk thread, kill all active streams, clear all queues and the
41     * pickup array and reset all streams. Call this method to bring everything
42     * in the disk thread to day one. If the disk thread was running, it will be
43     * respawned right after everything was reset.
44     */
45     void DiskThread::Reset() {
46     bool running = this->IsRunning();
47     if (running) this->StopThread();
48     for (int i = 0; i < MAX_INPUT_STREAMS; i++) {
49     pStreams[i]->Kill();
50     }
51     for (int i = 1; i <= MAX_INPUT_STREAMS; i++) {
52     pCreatedStreams[i] = NULL;
53     }
54     GhostQueue->init();
55     CreationQueue->init();
56     DeletionQueue->init();
57     ActiveStreamCount = 0;
58     ActiveStreamCountMax = 0;
59     if (running) this->StartThread(); // start thread only if it was running before
60     }
61    
62     String DiskThread::GetBufferFillBytes() {
63     bool activestreams = false;
64     std::stringstream ss;
65     for (uint i = 0; i < this->Streams; i++) {
66     if (pStreams[i]->GetState() == Stream::state_unused) continue;
67     uint bufferfill = pStreams[i]->GetReadSpace() * sizeof(sample_t);
68     uint streamid = (uint) pStreams[i]->GetHandle();
69     if (!streamid) continue;
70    
71     if (activestreams) ss << ",[" << streamid << ']' << bufferfill;
72     else {
73     ss << '[' << streamid << ']' << bufferfill;
74     activestreams = true;
75     }
76     }
77     return ss.str();
78     }
79    
80     String DiskThread::GetBufferFillPercentage() {
81     bool activestreams = false;
82     std::stringstream ss;
83     for (uint i = 0; i < this->Streams; i++) {
84     if (pStreams[i]->GetState() == Stream::state_unused) continue;
85     uint bufferfill = (uint) ((float) pStreams[i]->GetReadSpace() / (float) STREAM_BUFFER_SIZE * 100);
86     uint streamid = (uint) pStreams[i]->GetHandle();
87     if (!streamid) continue;
88    
89     if (activestreams) ss << ",[" << streamid << ']' << bufferfill << '%';
90     else {
91     ss << '[' << streamid << ']' << bufferfill;
92     activestreams = true;
93     }
94     }
95     return ss.str();
96     }
97    
98     /**
99 schoenebeck 18 * Returns -1 if command queue or pickup pool is full, 0 on success (will be
100     * called by audio thread within the voice class).
101 schoenebeck 9 */
102 schoenebeck 26 int DiskThread::OrderNewStream(Stream::reference_t* pStreamRef, gig::Sample* pSample, unsigned long SampleOffset, bool DoLoop) {
103 schoenebeck 12 dmsg(4,("Disk Thread: new stream ordered\n"));
104 schoenebeck 18 if (CreationQueue->write_space() < 1) {
105     dmsg(1,("DiskThread: Order queue full!\n"));
106     return -1;
107     }
108 schoenebeck 9
109     pStreamRef->State = Stream::state_active;
110     pStreamRef->OrderID = CreateOrderID();
111     pStreamRef->hStream = CreateHandle();
112     pStreamRef->pStream = NULL; // a stream has to be activated by the disk thread first
113    
114 schoenebeck 18 if (!pStreamRef->OrderID) return -1; // there was no free slot
115    
116 schoenebeck 9 create_command_t cmd;
117 schoenebeck 18 cmd.OrderID = pStreamRef->OrderID;
118     cmd.hStream = pStreamRef->hStream;
119 schoenebeck 9 cmd.pStreamRef = pStreamRef;
120     cmd.pSample = pSample;
121     cmd.SampleOffset = SampleOffset;
122 schoenebeck 26 cmd.DoLoop = DoLoop;
123 schoenebeck 9
124 schoenebeck 18 CreationQueue->push(&cmd);
125 schoenebeck 9 return 0;
126     }
127    
128     /**
129     * Returns -1 if command queue is full, 0 on success (will be called by audio
130     * thread within the voice class).
131     */
132     int DiskThread::OrderDeletionOfStream(Stream::reference_t* pStreamRef) {
133 schoenebeck 12 dmsg(4,("Disk Thread: stream deletion ordered\n"));
134 schoenebeck 18 if (DeletionQueue->write_space() < 1) {
135     dmsg(1,("DiskThread: Deletion queue full!\n"));
136     return -1;
137     }
138 schoenebeck 9
139     delete_command_t cmd;
140     cmd.pStream = pStreamRef->pStream;
141     cmd.hStream = pStreamRef->hStream;
142     cmd.OrderID = pStreamRef->OrderID;
143    
144 schoenebeck 18 DeletionQueue->push(&cmd);
145 schoenebeck 9 return 0;
146     }
147    
148     /**
149     * Returns the pointer to a disk stream if the ordered disk stream
150     * represented by the \a StreamOrderID was already activated by the disk
151     * thread, returns NULL otherwise. If the call was successful, thus if it
152     * returned a valid stream pointer, the caller has to the store that pointer
153     * by himself, because it's not possible to call this method again with the
154     * same used order ID; this method is just intended for picking up an ordered
155     * disk stream. This method will usually be called by the voice class (within
156     * the audio thread).
157     *
158     * @param StreamOrderID - ID previously returned by OrderNewStream()
159     * @returns pointer to created stream object, NULL otherwise
160     */
161     Stream* DiskThread::AskForCreatedStream(Stream::OrderID_t StreamOrderID) {
162 schoenebeck 12 dmsg(4,("Disk Thread: been asked if stream already created, OrderID=%x ", StreamOrderID));
163 schoenebeck 9 Stream* pStream = pCreatedStreams[StreamOrderID];
164 schoenebeck 18 if (pStream && pStream != SLOT_RESERVED) {
165     dmsg(4,("(yes created)\n"));
166     pCreatedStreams[StreamOrderID] = NULL; // free the slot for a new order
167     }
168     else dmsg(4,("(no not yet created)\n"));
169 schoenebeck 9 return pStream;
170     }
171    
172    
173    
174     // #########################################################################
175     // # Disk Thread Only Section
176     // # (following code should only be executed by the disk thread)
177    
178    
179     DiskThread::DiskThread(uint BufferWrapElements) : Thread(false, 1, -2) {
180     CreationQueue = new RingBuffer<create_command_t>(1024);
181     DeletionQueue = new RingBuffer<delete_command_t>(1024);
182 schoenebeck 18 GhostQueue = new RingBuffer<Stream::Handle>(MAX_INPUT_STREAMS);
183 schoenebeck 9 Streams = MAX_INPUT_STREAMS;
184 schoenebeck 18 RefillStreamsPerRun = REFILL_STREAMS_PER_RUN;
185 schoenebeck 9 for (int i = 0; i < MAX_INPUT_STREAMS; i++) {
186 schoenebeck 18 pStreams[i] = new Stream(STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words
187 schoenebeck 9 }
188     for (int i = 1; i <= MAX_INPUT_STREAMS; i++) {
189     pCreatedStreams[i] = NULL;
190     }
191     }
192    
193     DiskThread::~DiskThread() {
194     for (int i = 0; i < MAX_INPUT_STREAMS; i++) {
195     if (pStreams[i]) delete pStreams[i];
196     }
197     if (CreationQueue) delete CreationQueue;
198     if (DeletionQueue) delete DeletionQueue;
199 schoenebeck 18 if (GhostQueue) delete GhostQueue;
200 schoenebeck 9 }
201    
202     int DiskThread::Main() {
203 schoenebeck 12 dmsg(3,("Disk thread running\n"));
204 schoenebeck 9 while (true) {
205     IsIdle = true; // will be set to false if a stream got filled
206    
207 schoenebeck 18 // if there are ghost streams, delete them
208     for (int i = 0; i < GhostQueue->read_space(); i++) { //FIXME: unefficient
209     Stream::Handle hGhostStream;
210     GhostQueue->pop(&hGhostStream);
211     bool found = false;
212     for (int i = 0; i < this->Streams; i++) {
213     if (pStreams[i]->GetHandle() == hGhostStream) {
214     pStreams[i]->Kill();
215     found = true;
216     break;
217     }
218     }
219     if (!found) GhostQueue->push(&hGhostStream); // put ghost stream handle back to the queue
220     }
221    
222 schoenebeck 9 // if there are creation commands, create new streams
223     while (Stream::UnusedStreams > 0 && CreationQueue->read_space() > 0) {
224     create_command_t command;
225 schoenebeck 18 CreationQueue->pop(&command);
226 schoenebeck 9 CreateStream(command);
227     }
228    
229     // if there are deletion commands, delete those streams
230     while (Stream::UnusedStreams < Streams && DeletionQueue->read_space() > 0) {
231     delete_command_t command;
232 schoenebeck 18 DeletionQueue->pop(&command);
233 schoenebeck 9 DeleteStream(command);
234     }
235    
236     RefillStreams(); // refill the most empty streams
237    
238     // if nothing was done during this iteration (eg no streambuffer
239     // filled with data) then sleep for 50ms
240     if (IsIdle) usleep(30000);
241 senoner 10
242 schoenebeck 13 int streamsInUsage = 0;
243 senoner 10 for (int i = Streams - 1; i >= 0; i--) {
244 schoenebeck 13 if (pStreams[i]->GetState() != Stream::state_unused) streamsInUsage++;
245 senoner 10 }
246 schoenebeck 13 ActiveStreamCount = streamsInUsage;
247     if (streamsInUsage > ActiveStreamCountMax) ActiveStreamCountMax = streamsInUsage;
248 schoenebeck 9 }
249    
250     return EXIT_FAILURE;
251     }
252    
253     void DiskThread::CreateStream(create_command_t& Command) {
254     // search for unused stream
255     Stream* newstream = NULL;
256     for (int i = Streams - 1; i >= 0; i--) {
257     if (pStreams[i]->GetState() == Stream::state_unused) {
258     newstream = pStreams[i];
259     break;
260     }
261     }
262     if (!newstream) {
263 schoenebeck 18 std::cerr << "No unused stream found (OrderID:" << Command.OrderID << ") - report if this happens, this is a bug!\n" << std::flush;
264 schoenebeck 9 return;
265     }
266 schoenebeck 26 newstream->Launch(Command.hStream, Command.pStreamRef, Command.pSample, Command.SampleOffset, Command.DoLoop);
267 schoenebeck 18 dmsg(4,("new Stream launched by disk thread (OrderID:%d,StreamHandle:%d)\n", Command.OrderID, Command.hStream));
268     if (pCreatedStreams[Command.OrderID] != SLOT_RESERVED) {
269     std::cerr << "DiskThread: Slot " << Command.OrderID << " already occupied! Please report this!\n" << std::flush;
270     newstream->Kill();
271     return;
272     }
273     pCreatedStreams[Command.OrderID] = newstream;
274 schoenebeck 9 }
275    
276     void DiskThread::DeleteStream(delete_command_t& Command) {
277     if (Command.pStream) Command.pStream->Kill();
278     else { // the stream wasn't created by disk thread or picked up by audio thread yet
279    
280 schoenebeck 18 // if stream was created but not picked up yet
281 schoenebeck 9 Stream* pStream = pCreatedStreams[Command.OrderID];
282 schoenebeck 18 if (pStream && pStream != SLOT_RESERVED) {
283 schoenebeck 9 pStream->Kill();
284     pCreatedStreams[Command.OrderID] = NULL; // free slot for new order
285     return;
286     }
287    
288 schoenebeck 18 // the stream was not created yet
289     if (GhostQueue->write_space() > 0) {
290     GhostQueue->push(&Command.hStream);
291 schoenebeck 9 }
292 schoenebeck 18 else dmsg(1,("DiskThread: GhostQueue full!\n"));
293 schoenebeck 9 }
294     }
295    
296     void DiskThread::RefillStreams() {
297     // sort the streams by most empty stream
298     qsort(pStreams, Streams, sizeof(Stream*), CompareStreamWriteSpace);
299    
300     // refill the most empty streams
301     for (uint i = 0; i < RefillStreamsPerRun; i++) {
302     if (pStreams[i]->GetState() == Stream::state_active) {
303    
304     //float filledpercentage = (float) pStreams[i]->GetReadSpace() / 131072.0 * 100.0;
305     //dmsg(("\nbuffer fill: %.1f%\n", filledpercentage));
306    
307     int writespace = pStreams[i]->GetWriteSpaceToEnd();
308     if (writespace == 0) break;
309    
310     int capped_writespace = writespace;
311     // if there is too much buffer space available then cut the read/write
312     // size to MAX_REFILL_SIZE which is by default 65536 samples = 256KBytes
313     if (writespace > MAX_REFILL_SIZE) capped_writespace = MAX_REFILL_SIZE;
314    
315     // adjust the amount to read in order to ensure that the buffer wraps correctly
316     int read_amount = pStreams[i]->AdjustWriteSpaceToAvoidBoundary(writespace, capped_writespace);
317     // if we wasn't able to refill one of the stream buffers by more than
318     // MIN_REFILL_SIZE we'll send the disk thread to sleep later
319     if (pStreams[i]->ReadAhead(read_amount) > MIN_REFILL_SIZE) this->IsIdle = false;
320     }
321     }
322     }
323    
324     /// Handle Generator
325     Stream::Handle DiskThread::CreateHandle() {
326     static uint32_t counter = 0;
327     if (counter == 0xffffffff) counter = 1; // we use '0' as 'invalid handle' only, so we skip 0
328     else counter++;
329     return counter;
330     }
331    
332     /// order ID Generator
333     Stream::OrderID_t DiskThread::CreateOrderID() {
334     static uint32_t counter = 0;
335 schoenebeck 18 for (int i = 0; i < MAX_INPUT_STREAMS; i++) {
336 schoenebeck 9 if (counter == MAX_INPUT_STREAMS) counter = 1; // we use '0' as 'invalid order' only, so we skip 0
337     else counter++;
338 schoenebeck 18 if (!pCreatedStreams[counter]) {
339     pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved
340     return counter; // found empty slot
341     }
342     }
343     return 0; // no free slot
344 schoenebeck 9 }
345    
346    
347    
348     // *********** C functions **************
349     // *
350    
351     /**
352     * This is the comparison function the qsort algo uses to determine if a value is
353     * bigger than another one or special in our case; if the writespace of a stream
354     * is bigger than another one.
355     */
356     int CompareStreamWriteSpace(const void* A, const void* B) {
357     Stream* a = *(Stream**) A;
358     Stream* b = *(Stream**) B;
359     return b->GetWriteSpace() - a->GetWriteSpace();
360     }

  ViewVC Help
Powered by ViewVC