49 |
void DiskThread::Reset() { |
void DiskThread::Reset() { |
50 |
bool running = this->IsRunning(); |
bool running = this->IsRunning(); |
51 |
if (running) this->StopThread(); |
if (running) this->StopThread(); |
52 |
for (int i = 0; i < CONFIG_MAX_STREAMS; i++) { |
for (int i = 0; i < Streams; i++) { |
53 |
pStreams[i]->Kill(); |
pStreams[i]->Kill(); |
54 |
} |
} |
55 |
for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) { |
for (int i = 1; i <= Streams; i++) { |
56 |
pCreatedStreams[i] = NULL; |
pCreatedStreams[i] = NULL; |
57 |
} |
} |
58 |
GhostQueue->init(); |
GhostQueue->init(); |
236 |
// # (following code should only be executed by the disk thread) |
// # (following code should only be executed by the disk thread) |
237 |
|
|
238 |
|
|
239 |
DiskThread::DiskThread(uint BufferWrapElements, InstrumentResourceManager* pInstruments) : |
DiskThread::DiskThread(int MaxStreams, uint BufferWrapElements, InstrumentResourceManager* pInstruments) : |
240 |
Thread(true, false, 1, -2), |
Thread(true, false, 1, -2), |
241 |
pInstruments(pInstruments), |
pInstruments(pInstruments), |
242 |
DeletionNotificationQueue(4*CONFIG_MAX_STREAMS) |
DeletionNotificationQueue(4*MaxStreams) |
243 |
{ |
{ |
244 |
DecompressionBuffer = ::gig::Sample::CreateDecompressionBuffer(CONFIG_STREAM_MAX_REFILL_SIZE); |
DecompressionBuffer = ::gig::Sample::CreateDecompressionBuffer(CONFIG_STREAM_MAX_REFILL_SIZE); |
245 |
CreationQueue = new RingBuffer<create_command_t,false>(4*CONFIG_MAX_STREAMS); |
CreationQueue = new RingBuffer<create_command_t,false>(4*MaxStreams); |
246 |
DeletionQueue = new RingBuffer<delete_command_t,false>(4*CONFIG_MAX_STREAMS); |
DeletionQueue = new RingBuffer<delete_command_t,false>(4*MaxStreams); |
247 |
GhostQueue = new RingBuffer<delete_command_t,false>(CONFIG_MAX_STREAMS); |
GhostQueue = new RingBuffer<delete_command_t,false>(MaxStreams); |
248 |
DeleteDimregQueue = new RingBuffer< ::gig::DimensionRegion*,false>(4*CONFIG_MAX_STREAMS); |
DeleteDimregQueue = new RingBuffer< ::gig::DimensionRegion*,false>(4*MaxStreams); |
249 |
Streams = CONFIG_MAX_STREAMS; |
pStreams = new Stream*[MaxStreams]; |
250 |
|
pCreatedStreams = new Stream*[MaxStreams + 1]; |
251 |
|
Streams = MaxStreams; |
252 |
RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN; |
RefillStreamsPerRun = CONFIG_REFILL_STREAMS_PER_RUN; |
253 |
for (int i = 0; i < CONFIG_MAX_STREAMS; i++) { |
for (int i = 0; i < MaxStreams; i++) { |
254 |
pStreams[i] = new Stream(&DecompressionBuffer, CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words |
pStreams[i] = new Stream(&DecompressionBuffer, CONFIG_STREAM_BUFFER_SIZE, BufferWrapElements); // 131072 sample words |
255 |
} |
} |
256 |
for (int i = 1; i <= CONFIG_MAX_STREAMS; i++) { |
for (int i = 1; i <= MaxStreams; i++) { |
257 |
pCreatedStreams[i] = NULL; |
pCreatedStreams[i] = NULL; |
258 |
} |
} |
259 |
ActiveStreamCountMax = 0; |
ActiveStreamCountMax = 0; |
260 |
} |
} |
261 |
|
|
262 |
DiskThread::~DiskThread() { |
DiskThread::~DiskThread() { |
263 |
for (int i = 0; i < CONFIG_MAX_STREAMS; i++) { |
for (int i = 0; i < Streams; i++) { |
264 |
if (pStreams[i]) delete pStreams[i]; |
if (pStreams[i]) delete pStreams[i]; |
265 |
} |
} |
266 |
if (CreationQueue) delete CreationQueue; |
if (CreationQueue) delete CreationQueue; |
267 |
if (DeletionQueue) delete DeletionQueue; |
if (DeletionQueue) delete DeletionQueue; |
268 |
if (GhostQueue) delete GhostQueue; |
if (GhostQueue) delete GhostQueue; |
269 |
if (DeleteDimregQueue) delete DeleteDimregQueue; |
if (DeleteDimregQueue) delete DeleteDimregQueue; |
270 |
|
if (pStreams) delete[] pStreams; |
271 |
|
if (pCreatedStreams) delete[] pCreatedStreams; |
272 |
|
|
273 |
::gig::Sample::DestroyDecompressionBuffer(DecompressionBuffer); |
::gig::Sample::DestroyDecompressionBuffer(DecompressionBuffer); |
274 |
} |
} |
275 |
|
|
279 |
#if !defined(WIN32) |
#if !defined(WIN32) |
280 |
pthread_testcancel(); // mandatory for OSX |
pthread_testcancel(); // mandatory for OSX |
281 |
#endif |
#endif |
282 |
#if CONFIG_PTHREAD_TESTCANCEL |
#if CONFIG_PTHREAD_TESTCANCEL |
283 |
TestCancel(); |
TestCancel(); |
284 |
#endif |
#endif |
285 |
IsIdle = true; // will be set to false if a stream got filled |
IsIdle = true; // will be set to false if a stream got filled |
286 |
|
|
287 |
// if there are ghost streams, delete them |
// if there are ghost streams, delete them |
431 |
/// order ID Generator |
/// order ID Generator |
432 |
Stream::OrderID_t DiskThread::CreateOrderID() { |
Stream::OrderID_t DiskThread::CreateOrderID() { |
433 |
static Stream::OrderID_t counter(0); |
static Stream::OrderID_t counter(0); |
434 |
for (int i = 0; i < CONFIG_MAX_STREAMS; i++) { |
for (int i = 0; i < Streams; i++) { |
435 |
if (counter == CONFIG_MAX_STREAMS) counter = 1; // we use '0' as 'invalid order' only, so we skip 0 |
if (counter == Streams) counter = 1; // we use '0' as 'invalid order' only, so we skip 0 |
436 |
else counter++; |
else counter++; |
437 |
if (!pCreatedStreams[counter]) { |
if (!pCreatedStreams[counter]) { |
438 |
pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved |
pCreatedStreams[counter] = SLOT_RESERVED; // mark this slot as reserved |
439 |
return counter; // found empty slot |
return counter; // found empty slot |