all repos — mgba @ b2d406a411b31acce5bbf0246af32a80c22ca834

mGBA Game Boy Advance Emulator

src/third-party/lzma/MtDec.c (view raw)

   1/* MtDec.c -- Multi-thread Decoder
   22019-02-02 : Igor Pavlov : Public domain */
   3
   4#include "Precomp.h"
   5
   6// #define SHOW_DEBUG_INFO
   7
   8// #include <stdio.h>
   9
  10#ifdef SHOW_DEBUG_INFO
  11#include <stdio.h>
  12#endif
  13
  14#ifdef SHOW_DEBUG_INFO
  15#define PRF(x) x
  16#else
  17#define PRF(x)
  18#endif
  19
  20#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
  21
  22#include "MtDec.h"
  23
  24#ifndef _7ZIP_ST
  25
  26void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
  27{
  28  p->progress = progress;
  29  p->res = SZ_OK;
  30  p->totalInSize = 0;
  31  p->totalOutSize = 0;
  32}
  33
  34
  35SRes MtProgress_Progress_ST(CMtProgress *p)
  36{
  37  if (p->res == SZ_OK && p->progress)
  38    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
  39      p->res = SZ_ERROR_PROGRESS;
  40  return p->res;
  41}
  42
  43
  44SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
  45{
  46  SRes res;
  47  CriticalSection_Enter(&p->cs);
  48  
  49  p->totalInSize += inSize;
  50  p->totalOutSize += outSize;
  51  if (p->res == SZ_OK && p->progress)
  52    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
  53      p->res = SZ_ERROR_PROGRESS;
  54  res = p->res;
  55  
  56  CriticalSection_Leave(&p->cs);
  57  return res;
  58}
  59
  60
  61SRes MtProgress_GetError(CMtProgress *p)
  62{
  63  SRes res;
  64  CriticalSection_Enter(&p->cs);
  65  res = p->res;
  66  CriticalSection_Leave(&p->cs);
  67  return res;
  68}
  69
  70
  71void MtProgress_SetError(CMtProgress *p, SRes res)
  72{
  73  CriticalSection_Enter(&p->cs);
  74  if (p->res == SZ_OK)
  75    p->res = res;
  76  CriticalSection_Leave(&p->cs);
  77}
  78
  79
  80#define RINOK_THREAD(x) RINOK(x)
  81
  82
  83static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
  84{
  85  if (Event_IsCreated(p))
  86    return Event_Reset(p);
  87  return AutoResetEvent_CreateNotSignaled(p);
  88}
  89
  90
/*
  Header placed at the start of every input buffer allocation.
  Buffers of one thread form a singly linked list through (next).
  Each allocation is (MTDEC__LINK_DATA_OFFSET + inBufSize) bytes:
  this header followed by the data bytes.
*/
struct __CMtDecBufLink
{
  struct __CMtDecBufLink *next;  // next buffer in the chain, or NULL for the last one
  void *pad[3];                  // NOTE(review): presumably padding that keeps the header 4 pointers wide - confirm
};

typedef struct __CMtDecBufLink CMtDecBufLink;

// offset of the data bytes from the start of the allocation (size of the link header)
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
// pointer to the data bytes that follow the link header
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
 101
 102
 103
 104static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
 105
 106
 107static WRes MtDecThread_CreateEvents(CMtDecThread *t)
 108{
 109  WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
 110  if (wres == 0)
 111  {
 112    wres = ArEvent_OptCreate_And_Reset(&t->canRead);
 113    if (wres == 0)
 114      return SZ_OK;
 115  }
 116  return wres;
 117}
 118
 119
 120static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
 121{
 122  WRes wres = MtDecThread_CreateEvents(t);
 123  // wres = 17; // for test
 124  if (wres == 0)
 125  {
 126    if (Thread_WasCreated(&t->thread))
 127      return SZ_OK;
 128    wres = Thread_Create(&t->thread, ThreadFunc, t);
 129    if (wres == 0)
 130      return SZ_OK;
 131  }
 132  return MY_SRes_HRESULT_FROM_WRes(wres);
 133}
 134
 135
 136void MtDecThread_FreeInBufs(CMtDecThread *t)
 137{
 138  if (t->inBuf)
 139  {
 140    void *link = t->inBuf;
 141    t->inBuf = NULL;
 142    do
 143    {
 144      void *next = ((CMtDecBufLink *)link)->next;
 145      ISzAlloc_Free(t->mtDec->alloc, link);
 146      link = next;
 147    }
 148    while (link);
 149  }
 150}
 151
 152
 153static void MtDecThread_CloseThread(CMtDecThread *t)
 154{
 155  if (Thread_WasCreated(&t->thread))
 156  {
 157    Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
 158    Event_Set(&t->canRead);
 159    Thread_Wait(&t->thread);
 160    Thread_Close(&t->thread);
 161  }
 162
 163  Event_Close(&t->canRead);
 164  Event_Close(&t->canWrite);
 165}
 166
 167static void MtDec_CloseThreads(CMtDec *p)
 168{
 169  unsigned i;
 170  for (i = 0; i < MTDEC__THREADS_MAX; i++)
 171    MtDecThread_CloseThread(&p->threads[i]);
 172}
 173
/*
  MtDecThread_Destruct() releases everything owned by one thread slot:
  the thread and its events are closed first, then the input buffers are freed.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  MtDecThread_CloseThread(t);
  MtDecThread_FreeInBufs(t);
}
 179
 180
 181
 182static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
 183{
 184  size_t size = *processedSize;
 185  *processedSize = 0;
 186  while (size != 0)
 187  {
 188    size_t cur = size;
 189    SRes res = ISeqInStream_Read(stream, data, &cur);
 190    *processedSize += cur;
 191    data += cur;
 192    size -= cur;
 193    RINOK(res);
 194    if (cur == 0)
 195      return SZ_OK;
 196  }
 197  return SZ_OK;
 198}
 199
 200
 201static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
 202{
 203  SRes res;
 204  CriticalSection_Enter(&p->mtProgress.cs);
 205  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
 206  res = p->mtProgress.res;
 207  CriticalSection_Leave(&p->mtProgress.cs);
 208  return res;
 209}
 210
 211static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
 212{
 213  SRes res;
 214  CriticalSection_Enter(&p->mtProgress.cs);
 215
 216  p->mtProgress.totalInSize += inSize;
 217  p->mtProgress.totalOutSize += outSize;
 218  if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
 219    if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
 220      p->mtProgress.res = SZ_ERROR_PROGRESS;
 221
 222  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
 223  res = p->mtProgress.res;
 224  
 225  CriticalSection_Leave(&p->mtProgress.cs);
 226
 227  return res;
 228}
 229
 230static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
 231{
 232  CriticalSection_Enter(&p->mtProgress.cs);
 233  if (!p->needInterrupt || interruptIndex < p->interruptIndex)
 234  {
 235    p->interruptIndex = interruptIndex;
 236    p->needInterrupt = True;
 237  }
 238  CriticalSection_Leave(&p->mtProgress.cs);
 239}
 240
 241Byte *MtDec_GetCrossBuff(CMtDec *p)
 242{
 243  Byte *cr = p->crossBlock;
 244  if (!cr)
 245  {
 246    cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
 247    if (!cr)
 248      return NULL;
 249    p->crossBlock = cr;
 250  }
 251  return MTDEC__DATA_PTR_FROM_LINK(cr);
 252}
 253
 254
 255/*
 256  ThreadFunc2() returns:
 257  0      - in all normal cases (even for stream error or memory allocation error)
 258  (!= 0) - WRes error return by system threading function
 259*/
 260
 261// #define MTDEC_ProgessStep (1 << 22)
 262#define MTDEC_ProgessStep (1 << 0)
 263
/*
  ThreadFunc2() is the worker loop of one decoder thread.
  Each iteration of the outer loop processes one block in 3 stages:

    READ / PARSE:
      waits for t->canRead, takes the next block index, fills the chain of
      input buffers (t->inBuf) from the cross block or from p->inStream,
      and calls mtCallback->Parse() for each portion.
      Then it can start one more thread and passes canRead to the next thread.

    CODE:
      calls mtCallback->PreCode() and mtCallback->Code() for the parsed data.

    WRITE:
      waits for t->canWrite, calls mtCallback->Write(),
      and passes canWrite to the next thread (or to thread 0, if (finish)).

  RINOK_THREAD() returns from the function with the WRes of a failed event call.
*/
static WRes ThreadFunc2(CMtDecThread *t)
{
  CMtDec *p = t->mtDec;

  PRF_STR_INT("ThreadFunc2", t->index);

  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);

  for (;;)
  {
    SRes res, codeRes;
    BoolInt wasInterrupted, isAllocError, overflow, finish;
    SRes threadingErrorSRes;
    BoolInt needCode, needWrite, needContinue;
    
    size_t inDataSize_Start;  // number of data bytes in the first buffer of the chain
    UInt64 inDataSize;        // total number of data bytes of the block in the chain
    // UInt64 inDataSize_Full;
    
    UInt64 blockIndex;

    UInt64 inPrev = 0;
    UInt64 outPrev = 0;
    UInt64 inCodePos;
    UInt64 outCodePos;
    
    // data that follows the parsed data, if Parse() returned MTDEC_PARSE_END. It's passed to Write()
    Byte *afterEndData = NULL;
    size_t afterEndData_Size = 0;

    BoolInt canCreateNewThread = False;
    // CMtDecCallbackInfo parse;
    CMtDecThread *nextThread;

    PRF_STR_INT("Event_Wait(&t->canRead)", t->index);

    RINOK_THREAD(Event_Wait(&t->canRead));
    if (p->exitThread)
      return 0;

    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);

    // if (t->index == 3) return 19; // for test

    // sequential index of the block that is processed in this iteration
    blockIndex = p->blockIndex++;

    // PRF(printf("\ncanRead\n"))

    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);

    finish = p->readWasFinished;
    needCode = False;
    needWrite = False;
    isAllocError = False;
    overflow = False;

    inDataSize_Start = 0;
    inDataSize = 0;
    // inDataSize_Full = 0;

    // ---------- READ / PARSE ----------

    if (res == SZ_OK && !wasInterrupted)
    {
      // if (p->inStream)
      {
        CMtDecBufLink *prev = NULL;
        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
        // number of unparsed bytes that were left in cross block
        size_t crossSize = p->crossEnd - p->crossStart;

        PRF(printf("\ncrossSize = %d\n", crossSize));

        // each iteration fills and parses one buffer of the chain
        for (;;)
        {
          if (!link)
          {
            // the chain has no more buffers from previous blocks: we allocate new buffer (link header + data)
            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
            if (!link)
            {
              finish = True;
              // p->allocError_for_Read_BlockIndex = blockIndex;
              isAllocError = True;
              break;
            }
            link->next = NULL;
            if (prev)
            {
              // static unsigned g_num = 0;
              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
              prev->next = link;
            }
            else
              t->inBuf = (void *)link;
          }

          {
            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
            Byte *parseData = data;
            size_t size;

            if (crossSize != 0)
            {
              // we parse the data directly from cross block. It's copied to (data) after Parse()
              inDataSize = crossSize;
              // inDataSize_Full = inDataSize;
              inDataSize_Start = crossSize;
              size = crossSize;
              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
              PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
            }
            else
            {
              size = p->inBufSize;
              
              res = FullRead(p->inStream, data, &size);
              
              // size = 10; // test

              inDataSize += size;
              // inDataSize_Full = inDataSize;
              if (!prev)
                inDataSize_Start = size;

              p->readProcessed += size;
              // short read means the end of input stream
              finish = (size != p->inBufSize);
              if (finish)
                p->readWasFinished = True;
              
              // res = E_INVALIDARG; // test

              if (res != SZ_OK)
              {
                // PRF(printf("\nRead error = %d\n", res))
                // we want to decode all data before error
                p->readRes = res;
                // p->readError_BlockIndex = blockIndex;
                p->readWasFinished = True;
                finish = True;
                res = SZ_OK;
                // break;
              }

              if (inDataSize - inPrev >= MTDEC_ProgessStep)
              {
                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
                if (res != SZ_OK || wasInterrupted)
                  break;
                inPrev = inDataSize;
              }
            }

            {
              CMtDecCallbackInfo parse;

              parse.startCall = (prev == NULL);
              parse.src = parseData;
              parse.srcSize = size;
              parse.srcFinished = finish;
              parse.canCreateNewThread = True;

              // PRF(printf("\nParse size = %d\n", (unsigned)size))

              // after the call: parse.srcSize is used as the number of parsed bytes, parse.state is the parse result
              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);

              needWrite = True;
              canCreateNewThread = parse.canCreateNewThread;

              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
              
              if (
                  // parseRes != SZ_OK ||
                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
                  // ||
                  parse.state == MTDEC_PARSE_OVERFLOW
                  // || wasInterrupted
                  )
              {
                // Overflow or Parse error - switch from MT decoding to ST decoding
                finish = True;
                overflow = True;

                {
                  PRF(printf("\n Overflow"));
                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
                }
                
                // we move all cross data to the buffer of thread, and cross block becomes empty
                if (crossSize != 0)
                  memcpy(data, parseData, size);
                p->crossStart = 0;
                p->crossEnd = 0;
                break;
              }

              if (crossSize != 0)
              {
                // we copy the parsed part of cross data to the buffer of thread
                memcpy(data, parseData, parse.srcSize);
                p->crossStart += parse.srcSize;
              }

              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
              {
                // we don't need to parse in current thread anymore

                if (parse.state == MTDEC_PARSE_END)
                  finish = True;

                needCode = True;
                // p->crossFinished = finish;

                if (parse.srcSize == size)
                {
                  // full parsed - no cross transfer
                  p->crossStart = 0;
                  p->crossEnd = 0;
                  break;
                }

                if (parse.state == MTDEC_PARSE_END)
                {
                  p->crossStart = 0;
                  p->crossEnd = 0;

                  if (crossSize != 0)
                    memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
                  afterEndData_Size = size - parse.srcSize;
                  afterEndData = parseData + parse.srcSize;

                  // we reduce data size to required bytes (parsed only)
                  inDataSize -= (size - parse.srcSize);
                  if (!prev)
                    inDataSize_Start = parse.srcSize;
                  break;
                }

                {
                  // partial parsed - need cross transfer
                  if (crossSize != 0)
                    inDataSize = parse.srcSize; // it's only parsed now
                  else
                  {
                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
                    Byte *cr = MtDec_GetCrossBuff(p);
                    if (!cr)
                    {
                      {
                        PRF(printf("\ncross alloc error error\n"));
                        // res = SZ_ERROR_MEM;
                        finish = True;
                        // p->allocError_for_Read_BlockIndex = blockIndex;
                        isAllocError = True;
                        break;
                      }
                    }

                    {
                      // unparsed tail of the buffer is moved to the start of cross block
                      size_t crSize = size - parse.srcSize;
                      inDataSize -= crSize;
                      p->crossEnd = crSize;
                      p->crossStart = 0;
                      memcpy(cr, parseData + parse.srcSize, crSize);
                    }
                  }

                  // inDataSize_Full = inDataSize;
                  if (!prev)
                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)

                  // there is unparsed data in cross block, so next thread must continue
                  finish = False;
                  break;
                }
              }

              // here: (parse.state == MTDEC_PARSE_CONTINUE) and (!finish).
              // In that case the whole portion must be parsed. Otherwise it's an error
              if (parse.srcSize != size)
              {
                res = SZ_ERROR_FAIL;
                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
                break;
              }
            }
          }
          
          // we go to the next buffer of the chain
          prev = link;
          link = link->next;

          if (crossSize != 0)
          {
            // all cross data was parsed. Next portions are read from the stream
            crossSize = 0;
            p->crossStart = 0;
            p->crossEnd = 0;
          }
        }
      }

      if (res == SZ_OK)
        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
    }

    codeRes = SZ_OK;

    if (res == SZ_OK && needCode && !wasInterrupted)
    {
      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
      if (codeRes != SZ_OK)
      {
        needCode = False;
        finish = True;
        // SZ_ERROR_MEM is expected error here.
        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
      }
    }
    
    if (res != SZ_OK || wasInterrupted)
      finish = True;
    
    nextThread = NULL;
    threadingErrorSRes = SZ_OK;

    if (!finish)
    {
      // we start one more thread, if the limit allows it and Parse() didn't forbid it
      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
      {
        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
        if (res2 == SZ_OK)
        {
          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
          p->numStartedThreads++;
        }
        else
        {
          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
          if (p->numStartedThreads == 1)
          {
            // if only one thread is possible, we leave multi-threading code
            finish = True;
            needCode = False;
            threadingErrorSRes = res2;
          }
          else
            p->numStartedThreads_Limit = p->numStartedThreads;
        }
      }
      
      if (!finish)
      {
        // next thread in the ring of started threads. Thread 0 follows the last started thread
        unsigned nextIndex = t->index + 1;
        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
        RINOK_THREAD(Event_Set(&nextThread->canRead))
        // We have started executing for new iteration (with next thread)
        // And that next thread now is responsible for possible exit from decoding (threading_code)
      }
    }

    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
    //   - otherwise we stop decoding and exit from ThreadFunc2()

    // Don't change (finish) variable in the further code


    // ---------- CODE ----------

    inPrev = 0;
    outPrev = 0;
    inCodePos = 0;
    outCodePos = 0;

    if (res == SZ_OK && needCode && codeRes == SZ_OK)
    {
      BoolInt isStartBlock = True;
      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;

      // each iteration passes the data of one buffer of the chain to Code()
      for (;;)
      {
        size_t inSize;
        int stop;

        // first buffer contains (inDataSize_Start) bytes.
        // Each next buffer contains up to (inBufSize) bytes of remaining data
        if (isStartBlock)
          inSize = inDataSize_Start;
        else
        {
          UInt64 rem = inDataSize - inCodePos;
          inSize = p->inBufSize;
          if (inSize > rem)
            inSize = (size_t)rem;
        }

        inCodePos += inSize;
        stop = True;

        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
            (inCodePos == inDataSize), // srcFinished
            &inCodePos, &outCodePos, &stop);
        
        if (codeRes != SZ_OK)
        {
          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
          // we interrupt only later blocks
          MtDec_Interrupt(p, blockIndex);
          break;
        }

        if (stop || inCodePos == inDataSize)
          break;
  
        {
          // we report only the progress that was not reported before
          const UInt64 inDelta = inCodePos - inPrev;
          const UInt64 outDelta = outCodePos - outPrev;
          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
          {
            // Sleep(1);
            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
            if (res != SZ_OK || wasInterrupted)
              break;
            inPrev = inCodePos;
            outPrev = outCodePos;
          }
        }

        link = link->next;
        isStartBlock = False;
      }
    }


    // ---------- WRITE ----------
   
    RINOK_THREAD(Event_Wait(&t->canWrite));

  {
    BoolInt isErrorMode = False;
    BoolInt canRecode = True;
    BoolInt needWriteToStream = needWrite;

    if (p->exitThread) return 0; // it's never executed in normal cases

    if (p->wasInterrupted)
      wasInterrupted = True;
    else
    {
      // this block is the first one that sets p->wasInterrupted, so it stores the reason in (p)
      if (codeRes != SZ_OK) // || !needCode // check it !!!
      {
        p->wasInterrupted = True;
        p->codeRes = codeRes;
        if (codeRes == SZ_ERROR_MEM)
          isAllocError = True;
      }
      
      if (threadingErrorSRes)
      {
        p->wasInterrupted = True;
        p->threadingErrorSRes = threadingErrorSRes;
        needWriteToStream = False;
      }
      if (isAllocError)
      {
        p->wasInterrupted = True;
        p->isAllocError = True;
        needWriteToStream = False;
      }
      if (overflow)
      {
        p->wasInterrupted = True;
        p->overflow = True;
        needWriteToStream = False;
      }
    }

    if (needCode)
    {
      if (wasInterrupted)
      {
        inCodePos = 0;
        outCodePos = 0;
      }
      {
        // the deltas are computed in UInt64, so they can wrap, if the positions were reset to 0 above
        const UInt64 inDelta = inCodePos - inPrev;
        const UInt64 outDelta = outCodePos - outPrev;
        // if (inDelta != 0 || outDelta != 0)
        res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
      }
    }

    needContinue = (!finish);

    // if (res == SZ_OK && needWrite && !wasInterrupted)
    if (needWrite)
    {
      // p->inProcessed += inCodePos;

      // Write() is called even if there is nothing to write to stream.
      // It can change (needContinue) and (canRecode)
      res = p->mtCallback->Write(p->mtCallbackObject, t->index,
          res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
          afterEndData, afterEndData_Size,
          &needContinue,
          &canRecode);
      
      // res= E_INVALIDARG; // for test

      PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
      PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));

      if (res != SZ_OK)
      {
        PRF(printf("\nWrite error = %d\n", res));
        isErrorMode = True;
        p->wasInterrupted = True;
      }
      if (res != SZ_OK
          || (!needContinue && !finish))
      {
        PRF(printf("\nWrite Interrupt error = %x\n", res));
        MtDec_Interrupt(p, blockIndex);
      }
    }

    // we keep the input data of this block in the buffers of thread as "filled" data.
    // MtDec_PrepareRead() / MtDec_Read() give that data to the caller later
    if (canRecode)
    if (!needCode
        || res != SZ_OK
        || p->wasInterrupted
        || codeRes != SZ_OK
        || wasInterrupted
        || p->numFilledThreads != 0
        || isErrorMode)
    {
      if (p->numFilledThreads == 0)
        p->filledThreadStart = t->index;
      if (inDataSize != 0 || !finish)
      {
        t->inDataSize_Start = inDataSize_Start;
        t->inDataSize = inDataSize;
        p->numFilledThreads++;
      }
      PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
      PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
    }

    if (!finish)
    {
      // (nextThread) already got canRead. Now it gets canWrite
      RINOK_THREAD(Event_Set(&nextThread->canWrite));
    }
    else
    {
      if (needContinue)
      {
        // we restore decoding with new iteration
        RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
      }
      else
      {
        // we exit from decoding
        if (t->index == 0)
          return SZ_OK;
        p->exitThread = True;
      }
      // thread 0 is waked up: it starts new iteration, or it exits, if (p->exitThread) was set
      RINOK_THREAD(Event_Set(&p->threads[0].canRead));
    }
  }
  }
}
 824
 825#ifdef _WIN32
 826#define USE_ALLOCA
 827#endif
 828
 829#ifdef USE_ALLOCA
 830#ifdef _WIN32
 831#include <malloc.h>
 832#else
 833#include <stdlib.h>
 834#endif
 835#endif
 836
 837
 838static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
 839{
 840  WRes res;
 841
 842  CMtDecThread *t = (CMtDecThread *)pp;
 843  CMtDec *p;
 844
 845  // fprintf(stdout, "\n%d = %p\n", t->index, &t);
 846
 847  res = ThreadFunc2(t);
 848  p = t->mtDec;
 849  if (res == 0)
 850    return p->exitThreadWRes;
 851  {
 852    // it's unexpected situation for some threading function error
 853    if (p->exitThreadWRes == 0)
 854      p->exitThreadWRes = res;
 855    PRF(printf("\nthread exit error = %d\n", res));
 856    p->exitThread = True;
 857    Event_Set(&p->threads[0].canRead);
 858    Event_Set(&p->threads[0].canWrite);
 859    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
 860  }
 861  return res;
 862}
 863
/*
  ThreadFunc() is the entry point of decoder threads. It calls ThreadFunc1().
  MtDec_Code() also calls it directly for thread 0.
  If USE_ALLOCA is defined, it reserves (t->index * 128) bytes in the stack
  before the call and stores the pointer to t->allocaPtr.
  NOTE(review): presumably that shifts the stack frames of different threads
  relative to each other - confirm the purpose.
*/
static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
{
  CMtDecThread *t = (CMtDecThread *)pp;

  // fprintf(stderr, "\n%d = %p - before", t->index, &t);
  #ifdef USE_ALLOCA
  // the size depends on thread index. It's 0 for thread 0
  t->allocaPtr = alloca(t->index * 128);
  #endif
  return ThreadFunc1(pp);
}
 874
 875
 876int MtDec_PrepareRead(CMtDec *p)
 877{
 878  if (p->crossBlock && p->crossStart == p->crossEnd)
 879  {
 880    ISzAlloc_Free(p->alloc, p->crossBlock);
 881    p->crossBlock = NULL;
 882  }
 883    
 884  {
 885    unsigned i;
 886    for (i = 0; i < MTDEC__THREADS_MAX; i++)
 887      if (i > p->numStartedThreads
 888          || p->numFilledThreads <=
 889            (i >= p->filledThreadStart ?
 890              i - p->filledThreadStart :
 891              i + p->numStartedThreads - p->filledThreadStart))
 892        MtDecThread_FreeInBufs(&p->threads[i]);
 893  }
 894
 895  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
 896}
 897
 898    
 899const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
 900{
 901  while (p->numFilledThreads != 0)
 902  {
 903    CMtDecThread *t = &p->threads[p->filledThreadStart];
 904    
 905    if (*inLim != 0)
 906    {
 907      {
 908        void *link = t->inBuf;
 909        void *next = ((CMtDecBufLink *)link)->next;
 910        ISzAlloc_Free(p->alloc, link);
 911        t->inBuf = next;
 912      }
 913      
 914      if (t->inDataSize == 0)
 915      {
 916        MtDecThread_FreeInBufs(t);
 917        if (--p->numFilledThreads == 0)
 918          break;
 919        if (++p->filledThreadStart == p->numStartedThreads)
 920          p->filledThreadStart = 0;
 921        t = &p->threads[p->filledThreadStart];
 922      }
 923    }
 924    
 925    {
 926      size_t lim = t->inDataSize_Start;
 927      if (lim != 0)
 928        t->inDataSize_Start = 0;
 929      else
 930      {
 931        UInt64 rem = t->inDataSize;
 932        lim = p->inBufSize;
 933        if (lim > rem)
 934          lim = (size_t)rem;
 935      }
 936      t->inDataSize -= lim;
 937      *inLim = lim;
 938      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
 939    }
 940  }
 941
 942  {
 943    size_t crossSize = p->crossEnd - p->crossStart;
 944    if (crossSize != 0)
 945    {
 946      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
 947      *inLim = crossSize;
 948      p->crossStart = 0;
 949      p->crossEnd = 0;
 950      return data;
 951    }
 952    *inLim = 0;
 953    if (p->crossBlock)
 954    {
 955      ISzAlloc_Free(p->alloc, p->crossBlock);
 956      p->crossBlock = NULL;
 957    }
 958    return NULL;
 959  }
 960}
 961
 962
 963void MtDec_Construct(CMtDec *p)
 964{
 965  unsigned i;
 966  
 967  p->inBufSize = (size_t)1 << 18;
 968
 969  p->numThreadsMax = 0;
 970
 971  p->inStream = NULL;
 972  
 973  // p->inData = NULL;
 974  // p->inDataSize = 0;
 975
 976  p->crossBlock = NULL;
 977  p->crossStart = 0;
 978  p->crossEnd = 0;
 979
 980  p->numFilledThreads = 0;
 981
 982  p->progress = NULL;
 983  p->alloc = NULL;
 984
 985  p->mtCallback = NULL;
 986  p->mtCallbackObject = NULL;
 987
 988  p->allocatedBufsSize = 0;
 989
 990  for (i = 0; i < MTDEC__THREADS_MAX; i++)
 991  {
 992    CMtDecThread *t = &p->threads[i];
 993    t->mtDec = p;
 994    t->index = i;
 995    t->inBuf = NULL;
 996    Event_Construct(&t->canRead);
 997    Event_Construct(&t->canWrite);
 998    Thread_Construct(&t->thread);
 999  }
1000
1001  // Event_Construct(&p->finishedEvent);
1002
1003  CriticalSection_Init(&p->mtProgress.cs);
1004}
1005
1006
1007static void MtDec_Free(CMtDec *p)
1008{
1009  unsigned i;
1010
1011  p->exitThread = True;
1012
1013  for (i = 0; i < MTDEC__THREADS_MAX; i++)
1014    MtDecThread_Destruct(&p->threads[i]);
1015
1016  // Event_Close(&p->finishedEvent);
1017
1018  if (p->crossBlock)
1019  {
1020    ISzAlloc_Free(p->alloc, p->crossBlock);
1021    p->crossBlock = NULL;
1022  }
1023}
1024
1025
/*
  MtDec_Destruct() releases all resources of the object:
  threads, events and buffers are released by MtDec_Free(),
  then the critical section of progress object is deleted.
*/
void MtDec_Destruct(CMtDec *p)
{
  MtDec_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
}
1032
1033
1034SRes MtDec_Code(CMtDec *p)
1035{
1036  unsigned i;
1037
1038  p->inProcessed = 0;
1039
1040  p->blockIndex = 1; // it must be larger than not_defined index (0)
1041  p->isAllocError = False;
1042  p->overflow = False;
1043  p->threadingErrorSRes = SZ_OK;
1044
1045  p->needContinue = True;
1046
1047  p->readWasFinished = False;
1048  p->needInterrupt = False;
1049  p->interruptIndex = (UInt64)(Int64)-1;
1050
1051  p->readProcessed = 0;
1052  p->readRes = SZ_OK;
1053  p->codeRes = SZ_OK;
1054  p->wasInterrupted = False;
1055
1056  p->crossStart = 0;
1057  p->crossEnd = 0;
1058
1059  p->filledThreadStart = 0;
1060  p->numFilledThreads = 0;
1061
1062  {
1063    unsigned numThreads = p->numThreadsMax;
1064    if (numThreads > MTDEC__THREADS_MAX)
1065      numThreads = MTDEC__THREADS_MAX;
1066    p->numStartedThreads_Limit = numThreads;
1067    p->numStartedThreads = 0;
1068  }
1069
1070  if (p->inBufSize != p->allocatedBufsSize)
1071  {
1072    for (i = 0; i < MTDEC__THREADS_MAX; i++)
1073    {
1074      CMtDecThread *t = &p->threads[i];
1075      if (t->inBuf)
1076        MtDecThread_FreeInBufs(t);
1077    }
1078    if (p->crossBlock)
1079    {
1080      ISzAlloc_Free(p->alloc, p->crossBlock);
1081      p->crossBlock = NULL;
1082    }
1083
1084    p->allocatedBufsSize = p->inBufSize;
1085  }
1086
1087  MtProgress_Init(&p->mtProgress, p->progress);
1088
1089  // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
1090  p->exitThread = False;
1091  p->exitThreadWRes = 0;
1092
1093  {
1094    WRes wres;
1095    WRes sres;
1096    CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
1097    // wres = MtDecThread_CreateAndStart(nextThread);
1098    wres = MtDecThread_CreateEvents(nextThread);
1099    if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
1100    if (wres == 0) { wres = Event_Set(&nextThread->canRead);
1101    if (wres == 0) { wres = ThreadFunc(nextThread);
1102    if (wres != 0)
1103    {
1104      p->needContinue = False;
1105      MtDec_CloseThreads(p);
1106    }}}}
1107
1108    // wres = 17; // for test
1109    // wres = Event_Wait(&p->finishedEvent);
1110
1111    sres = MY_SRes_HRESULT_FROM_WRes(wres);
1112
1113    if (sres != 0)
1114      p->threadingErrorSRes = sres;
1115
1116    if (
1117        // wres == 0
1118        // wres != 0
1119        // || p->mtc.codeRes == SZ_ERROR_MEM
1120        p->isAllocError
1121        || p->threadingErrorSRes != SZ_OK
1122        || p->overflow)
1123    {
1124      // p->needContinue = True;
1125    }
1126    else
1127      p->needContinue = False;
1128    
1129    if (p->needContinue)
1130      return SZ_OK;
1131
1132    // if (sres != SZ_OK)
1133      return sres;
1134    // return E_FAIL;
1135  }
1136}
1137
1138#endif