src/third-party/lzma/MtCoder.c (view raw)
/* MtCoder.c -- Multi-thread Coder
2010-09-24 : Igor Pavlov : Public domain */
3
4#include "Precomp.h"
5
6#include <stdio.h>
7
8#include "MtCoder.h"
9
10void LoopThread_Construct(CLoopThread *p)
11{
12 Thread_Construct(&p->thread);
13 Event_Construct(&p->startEvent);
14 Event_Construct(&p->finishedEvent);
15}
16
17void LoopThread_Close(CLoopThread *p)
18{
19 Thread_Close(&p->thread);
20 Event_Close(&p->startEvent);
21 Event_Close(&p->finishedEvent);
22}
23
24static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)
25{
26 CLoopThread *p = (CLoopThread *)pp;
27 for (;;)
28 {
29 if (Event_Wait(&p->startEvent) != 0)
30 return SZ_ERROR_THREAD;
31 if (p->stop)
32 return 0;
33 p->res = p->func(p->param);
34 if (Event_Set(&p->finishedEvent) != 0)
35 return SZ_ERROR_THREAD;
36 }
37}
38
39WRes LoopThread_Create(CLoopThread *p)
40{
41 p->stop = 0;
42 RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));
43 RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));
44 return Thread_Create(&p->thread, LoopThreadFunc, p);
45}
46
47WRes LoopThread_StopAndWait(CLoopThread *p)
48{
49 p->stop = 1;
50 if (Event_Set(&p->startEvent) != 0)
51 return SZ_ERROR_THREAD;
52 return Thread_Wait(&p->thread);
53}
54
55WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }
56WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }
57
58static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)
59{
60 return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;
61}
62
63static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
64{
65 unsigned i;
66 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
67 p->inSizes[i] = p->outSizes[i] = 0;
68 p->totalInSize = p->totalOutSize = 0;
69 p->progress = progress;
70 p->res = SZ_OK;
71}
72
73static void MtProgress_Reinit(CMtProgress *p, unsigned index)
74{
75 p->inSizes[index] = 0;
76 p->outSizes[index] = 0;
77}
78
/* Folds a newly reported per-thread size into a running total:
   unless `size` is (UInt64)(Int64)-1, which leaves everything unchanged,
   `total` grows by the difference to the previously reported value `prev`,
   and `prev` is updated to `size`.
   Arguments are parenthesized and the expansion is wrapped in braces so the
   macro is a single compound statement (no dangling-else surprises); it is
   used without a trailing semicolon. `size` and `prev` are evaluated more
   than once, so pass expressions without side effects. */
#define UPDATE_PROGRESS(size, prev, total) \
  { if ((size) != (UInt64)(Int64)-1) { (total) += (size) - (prev); (prev) = (size); } }
81
82SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)
83{
84 SRes res;
85 CriticalSection_Enter(&p->cs);
86 UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)
87 UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)
88 if (p->res == SZ_OK)
89 p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);
90 res = p->res;
91 CriticalSection_Leave(&p->cs);
92 return res;
93}
94
95static void MtProgress_SetError(CMtProgress *p, SRes res)
96{
97 CriticalSection_Enter(&p->cs);
98 if (p->res == SZ_OK)
99 p->res = res;
100 CriticalSection_Leave(&p->cs);
101}
102
103static void MtCoder_SetError(CMtCoder* p, SRes res)
104{
105 CriticalSection_Enter(&p->cs);
106 if (p->res == SZ_OK)
107 p->res = res;
108 CriticalSection_Leave(&p->cs);
109}
110
/* ---------- MtThread ---------- */
112
113void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)
114{
115 p->mtCoder = mtCoder;
116 p->outBuf = 0;
117 p->inBuf = 0;
118 Event_Construct(&p->canRead);
119 Event_Construct(&p->canWrite);
120 LoopThread_Construct(&p->thread);
121}
122
/* Makes the enclosing function return SZ_ERROR_THREAD when x is non-zero.
   x is evaluated exactly once. */
#define RINOK_THREAD(x) \
  { \
    if ((x) != 0) \
      return SZ_ERROR_THREAD; \
  }
124
125static void CMtThread_CloseEvents(CMtThread *p)
126{
127 Event_Close(&p->canRead);
128 Event_Close(&p->canWrite);
129}
130
131static void CMtThread_Destruct(CMtThread *p)
132{
133 CMtThread_CloseEvents(p);
134
135 if (Thread_WasCreated(&p->thread.thread))
136 {
137 LoopThread_StopAndWait(&p->thread);
138 LoopThread_Close(&p->thread);
139 }
140
141 if (p->mtCoder->alloc)
142 IAlloc_Free(p->mtCoder->alloc, p->outBuf);
143 p->outBuf = 0;
144
145 if (p->mtCoder->alloc)
146 IAlloc_Free(p->mtCoder->alloc, p->inBuf);
147 p->inBuf = 0;
148}
149
/* Ensures `buf` is a buffer of `newSize` bytes. When `buf` is 0 or the
   recorded `size` differs, the old buffer is freed, `size` is updated and a
   new buffer is allocated via the coder's allocator; the enclosing function
   returns SZ_ERROR_MEM if that allocation fails.
   Relies on a variable `p` (with p->mtCoder->alloc) in the calling scope and
   is used without a trailing semicolon. */
#define MY_BUF_ALLOC(buf, size, newSize) \
  if (buf == 0 || size != newSize) \
  { \
    IAlloc_Free(p->mtCoder->alloc, buf); \
    size = newSize; \
    buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \
    if (buf == 0) \
      return SZ_ERROR_MEM; \
  }
155
156static SRes CMtThread_Prepare(CMtThread *p)
157{
158 MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)
159 MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)
160
161 p->stopReading = False;
162 p->stopWriting = False;
163 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));
164 RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));
165
166 return SZ_OK;
167}
168
169static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
170{
171 size_t size = *processedSize;
172 *processedSize = 0;
173 while (size != 0)
174 {
175 size_t curSize = size;
176 SRes res = stream->Read(stream, data, &curSize);
177 *processedSize += curSize;
178 data += curSize;
179 size -= curSize;
180 RINOK(res);
181 if (curSize == 0)
182 return SZ_OK;
183 }
184 return SZ_OK;
185}
186
/* Yields a pointer to the thread slot that follows `p` in the coder's
   `threads` array, wrapping from index numThreads - 1 back to slot 0.
   The parameter and the whole expansion are parenthesized so the macro can
   be used inside larger expressions; `p` is evaluated several times, so pass
   an expression without side effects. */
#define GET_NEXT_THREAD(p) \
  (&(p)->mtCoder->threads[(p)->index == (p)->mtCoder->numThreads - 1 ? 0 : (p)->index + 1])
188
189static SRes MtThread_Process(CMtThread *p, Bool *stop)
190{
191 CMtThread *next;
192 *stop = True;
193 if (Event_Wait(&p->canRead) != 0)
194 return SZ_ERROR_THREAD;
195
196 next = GET_NEXT_THREAD(p);
197
198 if (p->stopReading)
199 {
200 next->stopReading = True;
201 return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;
202 }
203
204 {
205 size_t size = p->mtCoder->blockSize;
206 size_t destSize = p->outBufSize;
207
208 RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));
209 next->stopReading = *stop = (size != p->mtCoder->blockSize);
210 if (Event_Set(&next->canRead) != 0)
211 return SZ_ERROR_THREAD;
212
213 RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,
214 p->outBuf, &destSize, p->inBuf, size, *stop));
215
216 MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);
217
218 if (Event_Wait(&p->canWrite) != 0)
219 return SZ_ERROR_THREAD;
220 if (p->stopWriting)
221 return SZ_ERROR_FAIL;
222 if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)
223 return SZ_ERROR_WRITE;
224 return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;
225 }
226}
227
228static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
229{
230 CMtThread *p = (CMtThread *)pp;
231 for (;;)
232 {
233 Bool stop;
234 CMtThread *next = GET_NEXT_THREAD(p);
235 SRes res = MtThread_Process(p, &stop);
236 if (res != SZ_OK)
237 {
238 MtCoder_SetError(p->mtCoder, res);
239 MtProgress_SetError(&p->mtCoder->mtProgress, res);
240 next->stopReading = True;
241 next->stopWriting = True;
242 Event_Set(&next->canRead);
243 Event_Set(&next->canWrite);
244 return res;
245 }
246 if (stop)
247 return 0;
248 }
249}
250
251void MtCoder_Construct(CMtCoder* p)
252{
253 unsigned i;
254 p->alloc = 0;
255 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
256 {
257 CMtThread *t = &p->threads[i];
258 t->index = i;
259 CMtThread_Construct(t, p);
260 }
261 CriticalSection_Init(&p->cs);
262 CriticalSection_Init(&p->mtProgress.cs);
263}
264
265void MtCoder_Destruct(CMtCoder* p)
266{
267 unsigned i;
268 for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)
269 CMtThread_Destruct(&p->threads[i]);
270 CriticalSection_Delete(&p->cs);
271 CriticalSection_Delete(&p->mtProgress.cs);
272}
273
274SRes MtCoder_Code(CMtCoder *p)
275{
276 unsigned i, numThreads = p->numThreads;
277 SRes res = SZ_OK;
278 p->res = SZ_OK;
279
280 MtProgress_Init(&p->mtProgress, p->progress);
281
282 for (i = 0; i < numThreads; i++)
283 {
284 RINOK(CMtThread_Prepare(&p->threads[i]));
285 }
286
287 for (i = 0; i < numThreads; i++)
288 {
289 CMtThread *t = &p->threads[i];
290 CLoopThread *lt = &t->thread;
291
292 if (!Thread_WasCreated(<->thread))
293 {
294 lt->func = ThreadFunc;
295 lt->param = t;
296
297 if (LoopThread_Create(lt) != SZ_OK)
298 {
299 res = SZ_ERROR_THREAD;
300 break;
301 }
302 }
303 }
304
305 if (res == SZ_OK)
306 {
307 unsigned j;
308 for (i = 0; i < numThreads; i++)
309 {
310 CMtThread *t = &p->threads[i];
311 if (LoopThread_StartSubThread(&t->thread) != SZ_OK)
312 {
313 res = SZ_ERROR_THREAD;
314 p->threads[0].stopReading = True;
315 break;
316 }
317 }
318
319 Event_Set(&p->threads[0].canWrite);
320 Event_Set(&p->threads[0].canRead);
321
322 for (j = 0; j < i; j++)
323 LoopThread_WaitSubThread(&p->threads[j].thread);
324 }
325
326 for (i = 0; i < numThreads; i++)
327 CMtThread_CloseEvents(&p->threads[i]);
328 return (res == SZ_OK) ? p->res : res;
329}