all repos — mgba @ b2d406a411b31acce5bbf0246af32a80c22ca834

mGBA Game Boy Advance Emulator

src/third-party/lzma/MtCoder.c (view raw)

  1/* MtCoder.c -- Multi-thread Coder
  22018-07-04 : Igor Pavlov : Public domain */
  3
  4#include "Precomp.h"
  5
  6#include "MtCoder.h"
  7
  8#ifndef _7ZIP_ST
  9
 10SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
 11{
 12  CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
 13  UInt64 inSize2 = 0;
 14  UInt64 outSize2 = 0;
 15  if (inSize != (UInt64)(Int64)-1)
 16  {
 17    inSize2 = inSize - thunk->inSize;
 18    thunk->inSize = inSize;
 19  }
 20  if (outSize != (UInt64)(Int64)-1)
 21  {
 22    outSize2 = outSize - thunk->outSize;
 23    thunk->outSize = outSize;
 24  }
 25  return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
 26}
 27
 28
 29void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
 30{
 31  p->vt.Progress = MtProgressThunk_Progress;
 32}
 33
 34
 35
 36#define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
 37
 38
 39static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
 40{
 41  if (Event_IsCreated(p))
 42    return Event_Reset(p);
 43  return AutoResetEvent_CreateNotSignaled(p);
 44}
 45
 46
 47static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
 48
 49
 50static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
 51{
 52  WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
 53  if (wres == 0)
 54  {
 55    t->stop = False;
 56    if (!Thread_WasCreated(&t->thread))
 57      wres = Thread_Create(&t->thread, ThreadFunc, t);
 58    if (wres == 0)
 59      wres = Event_Set(&t->startEvent);
 60  }
 61  if (wres == 0)
 62    return SZ_OK;
 63  return MY_SRes_HRESULT_FROM_WRes(wres);
 64}
 65
 66
 67static void MtCoderThread_Destruct(CMtCoderThread *t)
 68{
 69  if (Thread_WasCreated(&t->thread))
 70  {
 71    t->stop = 1;
 72    Event_Set(&t->startEvent);
 73    Thread_Wait(&t->thread);
 74    Thread_Close(&t->thread);
 75  }
 76
 77  Event_Close(&t->startEvent);
 78
 79  if (t->inBuf)
 80  {
 81    ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
 82    t->inBuf = NULL;
 83  }
 84}
 85
 86
 87
 88static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
 89{
 90  size_t size = *processedSize;
 91  *processedSize = 0;
 92  while (size != 0)
 93  {
 94    size_t cur = size;
 95    SRes res = ISeqInStream_Read(stream, data, &cur);
 96    *processedSize += cur;
 97    data += cur;
 98    size -= cur;
 99    RINOK(res);
100    if (cur == 0)
101      return SZ_OK;
102  }
103  return SZ_OK;
104}
105
106
107/*
108  ThreadFunc2() returns:
109  SZ_OK           - in all normal cases (even for stream error or memory allocation error)
110  SZ_ERROR_THREAD - in case of failure in system synch function
111*/
112
113static SRes ThreadFunc2(CMtCoderThread *t)
114{
115  CMtCoder *mtc = t->mtCoder;
116
117  for (;;)
118  {
119    unsigned bi;
120    SRes res;
121    SRes res2;
122    BoolInt finished;
123    unsigned bufIndex;
124    size_t size;
125    const Byte *inData;
126    UInt64 readProcessed = 0;
127    
128    RINOK_THREAD(Event_Wait(&mtc->readEvent))
129
130    /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */
131
132    if (mtc->stopReading)
133    {
134      return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
135    }
136
137    res = MtProgress_GetError(&mtc->mtProgress);
138    
139    size = 0;
140    inData = NULL;
141    finished = True;
142
143    if (res == SZ_OK)
144    {
145      size = mtc->blockSize;
146      if (mtc->inStream)
147      {
148        if (!t->inBuf)
149        {
150          t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
151          if (!t->inBuf)
152            res = SZ_ERROR_MEM;
153        }
154        if (res == SZ_OK)
155        {
156          res = FullRead(mtc->inStream, t->inBuf, &size);
157          readProcessed = mtc->readProcessed + size;
158          mtc->readProcessed = readProcessed;
159        }
160        if (res != SZ_OK)
161        {
162          mtc->readRes = res;
163          /* after reading error - we can stop encoding of previous blocks */
164          MtProgress_SetError(&mtc->mtProgress, res);
165        }
166        else
167          finished = (size != mtc->blockSize);
168      }
169      else
170      {
171        size_t rem;
172        readProcessed = mtc->readProcessed;
173        rem = mtc->inDataSize - (size_t)readProcessed;
174        if (size > rem)
175          size = rem;
176        inData = mtc->inData + (size_t)readProcessed;
177        readProcessed += size;
178        mtc->readProcessed = readProcessed;
179        finished = (mtc->inDataSize == (size_t)readProcessed);
180      }
181    }
182
183    /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
184
185    res2 = SZ_OK;
186
187    if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
188    {
189      res2 = SZ_ERROR_THREAD;
190      if (res == SZ_OK)
191      {
192        res = res2;
193        // MtProgress_SetError(&mtc->mtProgress, res);
194      }
195    }
196
197    bi = mtc->blockIndex;
198
199    if (++mtc->blockIndex >= mtc->numBlocksMax)
200      mtc->blockIndex = 0;
201
202    bufIndex = (unsigned)(int)-1;
203
204    if (res == SZ_OK)
205      res = MtProgress_GetError(&mtc->mtProgress);
206
207    if (res != SZ_OK)
208      finished = True;
209
210    if (!finished)
211    {
212      if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
213          && mtc->expectedDataSize != readProcessed)
214      {
215        res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
216        if (res == SZ_OK)
217          mtc->numStartedThreads++;
218        else
219        {
220          MtProgress_SetError(&mtc->mtProgress, res);
221          finished = True;
222        }
223      }
224    }
225
226    if (finished)
227      mtc->stopReading = True;
228
229    RINOK_THREAD(Event_Set(&mtc->readEvent))
230
231    if (res2 != SZ_OK)
232      return res2;
233
234    if (res == SZ_OK)
235    {
236      CriticalSection_Enter(&mtc->cs);
237      bufIndex = mtc->freeBlockHead;
238      mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
239      CriticalSection_Leave(&mtc->cs);
240      
241      res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
242          mtc->inStream ? t->inBuf : inData, size, finished);
243      
244      // MtProgress_Reinit(&mtc->mtProgress, t->index);
245
246      if (res != SZ_OK)
247        MtProgress_SetError(&mtc->mtProgress, res);
248    }
249
250    {
251      CMtCoderBlock *block = &mtc->blocks[bi];
252      block->res = res;
253      block->bufIndex = bufIndex;
254      block->finished = finished;
255    }
256    
257    #ifdef MTCODER__USE_WRITE_THREAD
258      RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
259    #else
260    {
261      unsigned wi;
262      {
263        CriticalSection_Enter(&mtc->cs);
264        wi = mtc->writeIndex;
265        if (wi == bi)
266          mtc->writeIndex = (unsigned)(int)-1;
267        else
268          mtc->ReadyBlocks[bi] = True;
269        CriticalSection_Leave(&mtc->cs);
270      }
271
272      if (wi != bi)
273      {
274        if (res != SZ_OK || finished)
275          return 0;
276        continue;
277      }
278
279      if (mtc->writeRes != SZ_OK)
280        res = mtc->writeRes;
281
282      for (;;)
283      {
284        if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
285        {
286          res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
287          if (res != SZ_OK)
288          {
289            mtc->writeRes = res;
290            MtProgress_SetError(&mtc->mtProgress, res);
291          }
292        }
293
294        if (++wi >= mtc->numBlocksMax)
295          wi = 0;
296        {
297          BoolInt isReady;
298
299          CriticalSection_Enter(&mtc->cs);
300          
301          if (bufIndex != (unsigned)(int)-1)
302          {
303            mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
304            mtc->freeBlockHead = bufIndex;
305          }
306          
307          isReady = mtc->ReadyBlocks[wi];
308          
309          if (isReady)
310            mtc->ReadyBlocks[wi] = False;
311          else
312            mtc->writeIndex = wi;
313          
314          CriticalSection_Leave(&mtc->cs);
315
316          RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
317
318          if (!isReady)
319            break;
320        }
321
322        {
323          CMtCoderBlock *block = &mtc->blocks[wi];
324          if (res == SZ_OK && block->res != SZ_OK)
325            res = block->res;
326          bufIndex = block->bufIndex;
327          finished = block->finished;
328        }
329      }
330    }
331    #endif
332      
333    if (finished || res != SZ_OK)
334      return 0;
335  }
336}
337
338
339static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
340{
341  CMtCoderThread *t = (CMtCoderThread *)pp;
342  for (;;)
343  {
344    if (Event_Wait(&t->startEvent) != 0)
345      return SZ_ERROR_THREAD;
346    if (t->stop)
347      return 0;
348    {
349      SRes res = ThreadFunc2(t);
350      CMtCoder *mtc = t->mtCoder;
351      if (res != SZ_OK)
352      {
353        MtProgress_SetError(&mtc->mtProgress, res);
354      }
355      
356      #ifndef MTCODER__USE_WRITE_THREAD
357      {
358        unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
359        if (numFinished == mtc->numStartedThreads)
360          if (Event_Set(&mtc->finishedEvent) != 0)
361            return SZ_ERROR_THREAD;
362      }
363      #endif
364    }
365  }
366}
367
368
369
370void MtCoder_Construct(CMtCoder *p)
371{
372  unsigned i;
373  
374  p->blockSize = 0;
375  p->numThreadsMax = 0;
376  p->expectedDataSize = (UInt64)(Int64)-1;
377
378  p->inStream = NULL;
379  p->inData = NULL;
380  p->inDataSize = 0;
381
382  p->progress = NULL;
383  p->allocBig = NULL;
384
385  p->mtCallback = NULL;
386  p->mtCallbackObject = NULL;
387
388  p->allocatedBufsSize = 0;
389
390  Event_Construct(&p->readEvent);
391  Semaphore_Construct(&p->blocksSemaphore);
392
393  for (i = 0; i < MTCODER__THREADS_MAX; i++)
394  {
395    CMtCoderThread *t = &p->threads[i];
396    t->mtCoder = p;
397    t->index = i;
398    t->inBuf = NULL;
399    t->stop = False;
400    Event_Construct(&t->startEvent);
401    Thread_Construct(&t->thread);
402  }
403
404  #ifdef MTCODER__USE_WRITE_THREAD
405    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
406      Event_Construct(&p->writeEvents[i]);
407  #else
408    Event_Construct(&p->finishedEvent);
409  #endif
410
411  CriticalSection_Init(&p->cs);
412  CriticalSection_Init(&p->mtProgress.cs);
413}
414
415
416
417
418static void MtCoder_Free(CMtCoder *p)
419{
420  unsigned i;
421
422  /*
423  p->stopReading = True;
424  if (Event_IsCreated(&p->readEvent))
425    Event_Set(&p->readEvent);
426  */
427
428  for (i = 0; i < MTCODER__THREADS_MAX; i++)
429    MtCoderThread_Destruct(&p->threads[i]);
430
431  Event_Close(&p->readEvent);
432  Semaphore_Close(&p->blocksSemaphore);
433
434  #ifdef MTCODER__USE_WRITE_THREAD
435    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
436      Event_Close(&p->writeEvents[i]);
437  #else
438    Event_Close(&p->finishedEvent);
439  #endif
440}
441
442
443void MtCoder_Destruct(CMtCoder *p)
444{
445  MtCoder_Free(p);
446
447  CriticalSection_Delete(&p->cs);
448  CriticalSection_Delete(&p->mtProgress.cs);
449}
450
451
452SRes MtCoder_Code(CMtCoder *p)
453{
454  unsigned numThreads = p->numThreadsMax;
455  unsigned numBlocksMax;
456  unsigned i;
457  SRes res = SZ_OK;
458
459  if (numThreads > MTCODER__THREADS_MAX)
460    numThreads = MTCODER__THREADS_MAX;
461  numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
462  
463  if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
464  if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
465  if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
466
467  if (numBlocksMax > MTCODER__BLOCKS_MAX)
468    numBlocksMax = MTCODER__BLOCKS_MAX;
469
470  if (p->blockSize != p->allocatedBufsSize)
471  {
472    for (i = 0; i < MTCODER__THREADS_MAX; i++)
473    {
474      CMtCoderThread *t = &p->threads[i];
475      if (t->inBuf)
476      {
477        ISzAlloc_Free(p->allocBig, t->inBuf);
478        t->inBuf = NULL;
479      }
480    }
481    p->allocatedBufsSize = p->blockSize;
482  }
483
484  p->readRes = SZ_OK;
485
486  MtProgress_Init(&p->mtProgress, p->progress);
487
488  #ifdef MTCODER__USE_WRITE_THREAD
489    for (i = 0; i < numBlocksMax; i++)
490    {
491      RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
492    }
493  #else
494    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
495  #endif
496
497  {
498    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
499
500    if (Semaphore_IsCreated(&p->blocksSemaphore))
501    {
502      RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
503    }
504    RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
505  }
506
507  for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
508    p->freeBlockList[i] = i + 1;
509  p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
510  p->freeBlockHead = 0;
511
512  p->readProcessed = 0;
513  p->blockIndex = 0;
514  p->numBlocksMax = numBlocksMax;
515  p->stopReading = False;
516
517  #ifndef MTCODER__USE_WRITE_THREAD
518    p->writeIndex = 0;
519    p->writeRes = SZ_OK;
520    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
521      p->ReadyBlocks[i] = False;
522    p->numFinishedThreads = 0;
523  #endif
524
525  p->numStartedThreadsLimit = numThreads;
526  p->numStartedThreads = 0;
527
528  // for (i = 0; i < numThreads; i++)
529  {
530    CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
531    RINOK(MtCoderThread_CreateAndStart(nextThread));
532  }
533
534  RINOK_THREAD(Event_Set(&p->readEvent))
535
536  #ifdef MTCODER__USE_WRITE_THREAD
537  {
538    unsigned bi = 0;
539
540    for (;; bi++)
541    {
542      if (bi >= numBlocksMax)
543        bi = 0;
544
545      RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
546
547      {
548        const CMtCoderBlock *block = &p->blocks[bi];
549        unsigned bufIndex = block->bufIndex;
550        BoolInt finished = block->finished;
551        if (res == SZ_OK && block->res != SZ_OK)
552          res = block->res;
553
554        if (bufIndex != (unsigned)(int)-1)
555        {
556          if (res == SZ_OK)
557          {
558            res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
559            if (res != SZ_OK)
560              MtProgress_SetError(&p->mtProgress, res);
561          }
562          
563          CriticalSection_Enter(&p->cs);
564          {
565            p->freeBlockList[bufIndex] = p->freeBlockHead;
566            p->freeBlockHead = bufIndex;
567          }
568          CriticalSection_Leave(&p->cs);
569        }
570        
571        RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
572
573        if (finished)
574          break;
575      }
576    }
577  }
578  #else
579  {
580    WRes wres = Event_Wait(&p->finishedEvent);
581    res = MY_SRes_HRESULT_FROM_WRes(wres);
582  }
583  #endif
584
585  if (res == SZ_OK)
586    res = p->readRes;
587
588  if (res == SZ_OK)
589    res = p->mtProgress.res;
590
591  #ifndef MTCODER__USE_WRITE_THREAD
592    if (res == SZ_OK)
593      res = p->writeRes;
594  #endif
595
596  if (res != SZ_OK)
597    MtCoder_Free(p);
598  return res;
599}
600
601#endif