all repos — mgba @ 341ebd04356360cfff3e217bf7a419e43b534ef1

mGBA Game Boy Advance Emulator

src/third-party/lzma/MtCoder.c (view raw)

/* MtCoder.c -- Multi-thread Coder
2010-09-24 : Igor Pavlov : Public domain */
  3
  4#include "Precomp.h"
  5
  6#include <stdio.h>
  7
  8#include "MtCoder.h"
  9
 10void LoopThread_Construct(CLoopThread *p)
 11{
 12  Thread_Construct(&p->thread);
 13  Event_Construct(&p->startEvent);
 14  Event_Construct(&p->finishedEvent);
 15}
 16
 17void LoopThread_Close(CLoopThread *p)
 18{
 19  Thread_Close(&p->thread);
 20  Event_Close(&p->startEvent);
 21  Event_Close(&p->finishedEvent);
 22}
 23
 24static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
 25{
 26  CLoopThread *p = (CLoopThread *)pp;
 27  for (;;)
 28  {
 29    if (Event_Wait(&p->startEvent) != 0)
 30      return SZ_ERROR_THREAD;
 31    if (p->stop)
 32      return 0;
 33    p->res = p->func(p->param);
 34    if (Event_Set(&p->finishedEvent) != 0)
 35      return SZ_ERROR_THREAD;
 36  }
 37}
 38
 39WRes LoopThread_Create(CLoopThread *p)
 40{
 41  p->stop = 0;
 42  RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
 43  RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
 44  return Thread_Create(&p->thread, LoopThreadFunc, p);
 45}
 46
 47WRes LoopThread_StopAndWait(CLoopThread *p)
 48{
 49  p->stop = 1;
 50  if (Event_Set(&p->startEvent) != 0)
 51    return SZ_ERROR_THREAD;
 52  return Thread_Wait(&p->thread);
 53}
 54
 55WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
 56WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
 57
 58static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
 59{
 60  return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
 61}
 62
 63static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
 64{
 65  unsigned i;
 66  for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
 67    p->inSizes[i] = p->outSizes[i] = 0;
 68  p->totalInSize = p->totalOutSize = 0;
 69  p->progress = progress;
 70  p->res = SZ_OK;
 71}
 72
 73static void MtProgress_Reinit(CMtProgress *p, unsigned index)
 74{
 75  p->inSizes[index] = 0;
 76  p->outSizes[index] = 0;
 77}
 78
 79#define UPDATE_PROGRESS(size, prev, total) \
 80  if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }
 81
 82SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
 83{
 84  SRes res;
 85  CriticalSection_Enter(&p->cs);
 86  UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
 87  UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
 88  if (p->res == SZ_OK)
 89    p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
 90  res = p->res;
 91  CriticalSection_Leave(&p->cs);
 92  return res;
 93}
 94
 95static void MtProgress_SetError(CMtProgress *p, SRes res)
 96{
 97  CriticalSection_Enter(&p->cs);
 98  if (p->res == SZ_OK)
 99    p->res = res;
100  CriticalSection_Leave(&p->cs);
101}
102
103static void MtCoder_SetError(CMtCoder* p, SRes res)
104{
105  CriticalSection_Enter(&p->cs);
106  if (p->res == SZ_OK)
107    p->res = res;
108  CriticalSection_Leave(&p->cs);
109}
110
111/* ---------- MtThread ---------- */
112
113void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
114{
115  p->mtCoder = mtCoder;
116  p->outBuf = 0;
117  p->inBuf = 0;
118  Event_Construct(&p->canRead);
119  Event_Construct(&p->canWrite);
120  LoopThread_Construct(&p->thread);
121}
122
/* Returns SZ_ERROR_THREAD from the enclosing function when x is non-zero.
   Wrapped in do { } while (0) so that `RINOK_THREAD(x);` is a single
   statement and stays valid as the body of an if that has an else. */
#define RINOK_THREAD(x) do { if ((x) != 0) return SZ_ERROR_THREAD; } while (0)
124
125static void CMtThread_CloseEvents(CMtThread *p)
126{
127  Event_Close(&p->canRead);
128  Event_Close(&p->canWrite);
129}
130
131static void CMtThread_Destruct(CMtThread *p)
132{
133  CMtThread_CloseEvents(p);
134
135  if (Thread_WasCreated(&p->thread.thread))
136  {
137    LoopThread_StopAndWait(&p->thread);
138    LoopThread_Close(&p->thread);
139  }
140
141  if (p->mtCoder->alloc)
142    IAlloc_Free(p->mtCoder->alloc, p->outBuf);
143  p->outBuf = 0;
144
145  if (p->mtCoder->alloc)
146    IAlloc_Free(p->mtCoder->alloc, p->inBuf);
147  p->inBuf = 0;
148}
149
150#define MY_BUF_ALLOC(buf, size, newSize) \
151  if (buf == 0 || size != newSize) \
152  { IAlloc_Free(p->mtCoder->alloc, buf); \
153    size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
154    if (buf == 0) return SZ_ERROR_MEM; }
155
156static SRes CMtThread_Prepare(CMtThread *p)
157{
158  MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
159  MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
160
161  p->stopReading = False;
162  p->stopWriting = False;
163  RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
164  RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
165
166  return SZ_OK;
167}
168
169static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
170{
171  size_t size = *processedSize;
172  *processedSize = 0;
173  while (size != 0)
174  {
175    size_t curSize = size;
176    SRes res = stream->Read(stream, data, &curSize);
177    *processedSize += curSize;
178    data += curSize;
179    size -= curSize;
180    RINOK(res);
181    if (curSize == 0)
182      return SZ_OK;
183  }
184  return SZ_OK;
185}
186
/* Address of the worker that follows p in the coder's thread array; the
   last worker (index == numThreads - 1) wraps around to slot 0.
   The argument and the whole expansion are parenthesized so the macro can
   be used with any pointer expression and inside larger expressions. */
#define GET_NEXT_THREAD(p) \
  (&(p)->mtCoder->threads[(p)->index == (p)->mtCoder->numThreads - 1 ? 0 : (p)->index + 1])
188
189static SRes MtThread_Process(CMtThread *p, Bool *stop)
190{
191  CMtThread *next;
192  *stop = True;
193  if (Event_Wait(&p->canRead) != 0)
194    return SZ_ERROR_THREAD;
195  
196  next = GET_NEXT_THREAD(p);
197  
198  if (p->stopReading)
199  {
200    next->stopReading = True;
201    return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
202  }
203
204  {
205    size_t size = p->mtCoder->blockSize;
206    size_t destSize = p->outBufSize;
207
208    RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
209    next->stopReading = *stop = (size != p->mtCoder->blockSize);
210    if (Event_Set(&next->canRead) != 0)
211      return SZ_ERROR_THREAD;
212
213    RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
214        p->outBuf, &destSize, p->inBuf, size, *stop));
215
216    MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
217
218    if (Event_Wait(&p->canWrite) != 0)
219      return SZ_ERROR_THREAD;
220    if (p->stopWriting)
221      return SZ_ERROR_FAIL;
222    if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
223      return SZ_ERROR_WRITE;
224    return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
225  }
226}
227
228static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
229{
230  CMtThread *p = (CMtThread *)pp;
231  for (;;)
232  {
233    Bool stop;
234    CMtThread *next = GET_NEXT_THREAD(p);
235    SRes res = MtThread_Process(p, &stop);
236    if (res != SZ_OK)
237    {
238      MtCoder_SetError(p->mtCoder, res);
239      MtProgress_SetError(&p->mtCoder->mtProgress, res);
240      next->stopReading = True;
241      next->stopWriting = True;
242      Event_Set(&next->canRead);
243      Event_Set(&next->canWrite);
244      return res;
245    }
246    if (stop)
247      return 0;
248  }
249}
250
251void MtCoder_Construct(CMtCoder* p)
252{
253  unsigned i;
254  p->alloc = 0;
255  for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
256  {
257    CMtThread *t = &p->threads[i];
258    t->index = i;
259    CMtThread_Construct(t, p);
260  }
261  CriticalSection_Init(&p->cs);
262  CriticalSection_Init(&p->mtProgress.cs);
263}
264
265void MtCoder_Destruct(CMtCoder* p)
266{
267  unsigned i;
268  for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
269    CMtThread_Destruct(&p->threads[i]);
270  CriticalSection_Delete(&p->cs);
271  CriticalSection_Delete(&p->mtProgress.cs);
272}
273
274SRes MtCoder_Code(CMtCoder *p)
275{
276  unsigned i, numThreads = p->numThreads;
277  SRes res = SZ_OK;
278  p->res = SZ_OK;
279
280  MtProgress_Init(&p->mtProgress, p->progress);
281
282  for (i = 0; i < numThreads; i++)
283  {
284    RINOK(CMtThread_Prepare(&p->threads[i]));
285  }
286
287  for (i = 0; i < numThreads; i++)
288  {
289    CMtThread *t = &p->threads[i];
290    CLoopThread *lt = &t->thread;
291
292    if (!Thread_WasCreated(&lt->thread))
293    {
294      lt->func = ThreadFunc;
295      lt->param = t;
296
297      if (LoopThread_Create(lt) != SZ_OK)
298      {
299        res = SZ_ERROR_THREAD;
300        break;
301      }
302    }
303  }
304
305  if (res == SZ_OK)
306  {
307    unsigned j;
308    for (i = 0; i < numThreads; i++)
309    {
310      CMtThread *t = &p->threads[i];
311      if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
312      {
313        res = SZ_ERROR_THREAD;
314        p->threads[0].stopReading = True;
315        break;
316      }
317    }
318
319    Event_Set(&p->threads[0].canWrite);
320    Event_Set(&p->threads[0].canRead);
321
322    for (j = 0; j < i; j++)
323      LoopThread_WaitSubThread(&p->threads[j].thread);
324  }
325
326  for (i = 0; i < numThreads; i++)
327    CMtThread_CloseEvents(&p->threads[i]);
328  return (res == SZ_OK) ? p->res : res;
329}