diff -Nuar lzop-1.01/config.hin lzop-1.01-parallel/config.hin
--- lzop-1.01/config.hin	Sun Apr 27 17:00:02 2003
+++ lzop-1.01-parallel/config.hin	Mon Jul 14 15:50:09 2003
@@ -105,6 +105,9 @@
 /* Define to 1 if you have the `mktime' function. */
 #undef HAVE_MKTIME
 
+/* Define to 1 if you have the `pthread_join' function. */
+#undef HAVE_PTHREAD_JOIN
+
 /* Define to 1 if you have the <share.h> header file. */
 #undef HAVE_SHARE_H
 
diff -Nuar lzop-1.01/configure lzop-1.01-parallel/configure
--- lzop-1.01/configure	Sun Apr 27 17:00:02 2003
+++ lzop-1.01-parallel/configure	Mon Jul 14 15:44:41 2003
@@ -13815,6 +13815,205 @@
 done
 
 
+# Begin PTHREADS mods by James Lemley, 2003
+echo "$as_me:$LINENO: checking for library containing pthread_join" >&5
+echo $ECHO_N "checking for library containing pthread_join... $ECHO_C" >&6
+if test "${ac_cv_search_pthread_join+set}" = set; then
+  echo $ECHO_N "(cached) $ECHO_C" >&6
+else
+  ac_func_search_save_LIBS=$LIBS
+ac_cv_search_pthread_join=no
+cat >conftest.$ac_ext <<_ACEOF
+#line $LINENO "configure"
+/* confdefs.h.  */
+_ACEOF
+cat confdefs.h >>conftest.$ac_ext
+cat >>conftest.$ac_ext <<_ACEOF
+/* end confdefs.h.  */
+
+/* Override any gcc2 internal prototype to avoid an error.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+/* We use char because int might match the return type of a gcc2
+   builtin and then its argument prototype would still apply.  */
+char pthread_join ();
+int
+main ()
+{
+pthread_join ();
+  ;
+  return 0;
+}
+_ACEOF
+rm -f conftest.$ac_objext conftest$ac_exeext
+if { (eval echo "$as_me:$LINENO: \"$ac_link\"") >&5
+  (eval $ac_link) 2>&5
+  ac_status=$?
+  echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); } &&
+	 { ac_try='test -s conftest$ac_exeext'
+  { (eval echo "$as_me:$LINENO: \"$ac_try\"") >&5
+  (eval $ac_try) 2>&5
+  ac_status=$?
+  echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); }; }; then
+  ac_cv_search_pthread_join="none required"
+else
+  echo "$as_me: failed program was:" >&5
+sed 's/^/| /' conftest.$ac_ext >&5
+
+fi
+rm -f conftest.$ac_objext conftest$ac_exeext conftest.$ac_ext
+if test "$ac_cv_search_pthread_join" = no; then
+  for ac_lib in pthread pthreads; do
+    LIBS="-l$ac_lib  $ac_func_search_save_LIBS"
+    cat >conftest.$ac_ext <<_ACEOF
+#line $LINENO "configure"
+/* confdefs.h.  */
+_ACEOF
+cat confdefs.h >>conftest.$ac_ext
+cat >>conftest.$ac_ext <<_ACEOF
+/* end confdefs.h.  */
+
+/* Override any gcc2 internal prototype to avoid an error.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+/* We use char because int might match the return type of a gcc2
+   builtin and then its argument prototype would still apply.  */
+char pthread_join ();
+int
+main ()
+{
+pthread_join ();
+  ;
+  return 0;
+}
+_ACEOF
+rm -f conftest.$ac_objext conftest$ac_exeext
+if { (eval echo "$as_me:$LINENO: \"$ac_link\"") >&5
+  (eval $ac_link) 2>&5
+  ac_status=$?
+  echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); } &&
+	 { ac_try='test -s conftest$ac_exeext'
+  { (eval echo "$as_me:$LINENO: \"$ac_try\"") >&5
+  (eval $ac_try) 2>&5
+  ac_status=$?
+  echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); }; }; then
+  ac_cv_search_pthread_join="-l$ac_lib"
+break
+else
+  echo "$as_me: failed program was:" >&5
+sed 's/^/| /' conftest.$ac_ext >&5
+
+fi
+rm -f conftest.$ac_objext conftest$ac_exeext conftest.$ac_ext
+  done
+fi
+LIBS=$ac_func_search_save_LIBS
+fi
+echo "$as_me:$LINENO: result: $ac_cv_search_pthread_join" >&5
+echo "${ECHO_T}$ac_cv_search_pthread_join" >&6
+if test "$ac_cv_search_pthread_join" != no; then
+  test "$ac_cv_search_pthread_join" = "none required" || LIBS="$ac_cv_search_pthread_join $LIBS"
+
+else
+
+  echo "Can't find a POSIX threads library.  If one is actually installed,"
+  echo "please fix configure.ac and submit a patch back to the maintainer."
+  echo "Thanks!"
+  exit 1
+
+fi
+
+
+for ac_func in pthread_join
+do
+as_ac_var=`echo "ac_cv_func_$ac_func" | $as_tr_sh`
+echo "$as_me:$LINENO: checking for $ac_func" >&5
+echo $ECHO_N "checking for $ac_func... $ECHO_C" >&6
+if eval "test \"\${$as_ac_var+set}\" = set"; then
+  echo $ECHO_N "(cached) $ECHO_C" >&6
+else
+  cat >conftest.$ac_ext <<_ACEOF
+#line $LINENO "configure"
+/* confdefs.h.  */
+_ACEOF
+cat confdefs.h >>conftest.$ac_ext
+cat >>conftest.$ac_ext <<_ACEOF
+/* end confdefs.h.  */
+/* System header to define __stub macros and hopefully few prototypes,
+    which can conflict with char $ac_func (); below.
+    Prefer <limits.h> to <assert.h> if __STDC__ is defined, since
+    <limits.h> exists even on freestanding compilers.  */
+#ifdef __STDC__
+# include <limits.h>
+#else
+# include <assert.h>
+#endif
+/* Override any gcc2 internal prototype to avoid an error.  */
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+/* We use char because int might match the return type of a gcc2
+   builtin and then its argument prototype would still apply.  */
+char $ac_func ();
+/* The GNU C library defines this for functions which it implements
+    to always fail with ENOSYS.  Some functions are actually named
+    something starting with __ and the normal name is an alias.  */
+#if defined (__stub_$ac_func) || defined (__stub___$ac_func)
+choke me
+#else
+char (*f) () = $ac_func;
+#endif
+#ifdef __cplusplus
+}
+#endif
+
+int
+main ()
+{
+return f != $ac_func;
+  ;
+  return 0;
+}
+_ACEOF
+rm -f conftest.$ac_objext conftest$ac_exeext
+if { (eval echo "$as_me:$LINENO: \"$ac_link\"") >&5
+  (eval $ac_link) 2>&5
+  ac_status=$?
+  echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); } &&
+	 { ac_try='test -s conftest$ac_exeext'
+  { (eval echo "$as_me:$LINENO: \"$ac_try\"") >&5
+  (eval $ac_try) 2>&5
+  ac_status=$?
+  echo "$as_me:$LINENO: \$? = $ac_status" >&5
+  (exit $ac_status); }; }; then
+  eval "$as_ac_var=yes"
+else
+  echo "$as_me: failed program was:" >&5
+sed 's/^/| /' conftest.$ac_ext >&5
+
+eval "$as_ac_var=no"
+fi
+rm -f conftest.$ac_objext conftest$ac_exeext conftest.$ac_ext
+fi
+echo "$as_me:$LINENO: result: `eval echo '${'$as_ac_var'}'`" >&5
+echo "${ECHO_T}`eval echo '${'$as_ac_var'}'`" >&6
+if test `eval echo '${'$as_ac_var'}'` = yes; then
+  cat >>confdefs.h <<_ACEOF
+#define `echo "HAVE_$ac_func" | $as_tr_cpp` 1
+_ACEOF
+
+fi
+done
+
+# End PTHREADS mods by James Lemley, 2003
 
 # /***********************************************************************
 # // Write output files
diff -Nuar lzop-1.01/configure.ac lzop-1.01-parallel/configure.ac
--- lzop-1.01/configure.ac	Sun Apr 27 17:00:02 2003
+++ lzop-1.01-parallel/configure.ac	Mon Jul 14 15:44:22 2003
@@ -130,6 +130,15 @@
 AC_CHECK_FUNCS(access alloca atoi atol chmod chown ctime difftime fstat gettimeofday gmtime localtime lstat memcmp memcpy memmove memset mktime snprintf strchr strdup strerror strftime strrchr umask utime vsnprintf)
 AC_CHECK_FUNCS(fchmod stricmp strnicmp)
 
+# Begin PTHREADS mods by James Lemley, 2003
+AC_SEARCH_LIBS(pthread_join,pthread pthreads,,[
+  echo "Can't find a POSIX threads library.  If one is actually installed,"
+  echo "please fix configure.ac and submit a patch back to the maintainer."
+  echo "Thanks!"
+  exit 1
+])
+AC_CHECK_FUNCS(pthread_join)
+# End PTHREADS mods by James Lemley, 2003
 
 # /***********************************************************************
 # // Write output files
diff -Nuar lzop-1.01/src/conf.h lzop-1.01-parallel/src/conf.h
--- lzop-1.01/src/conf.h	Sun Apr 27 17:00:02 2003
+++ lzop-1.01-parallel/src/conf.h	Mon Jul 14 15:54:33 2003
@@ -79,6 +79,14 @@
 #  error
 #endif
 
+/* -- James Lemley -- adding support for POSIX threads via autoconf. */
+#ifdef HAVE_PTHREAD_JOIN
+#define USE_PTHREADS
+#endif
+
+#ifdef USE_PTHREADS
+#include <pthread.h>
+#endif
 
 /*************************************************************************
 //
@@ -596,6 +604,26 @@
 /*************************************************************************
 // mblock.c
 **************************************************************************/
+#ifdef USE_PTHREADS
+
+/* define the states buffers can be in for parallel operation: */
+/* input buffers: empty -> busy (reading) -> filled -> busy (compressing) -> empty */
+/* output buffers: empty -> busy (compressing) -> filled -> busy (writing) -> empty */
+/* workers exit once an input buffer carries EOF and no filled input buffers remain */
+
+#define BUFFER_EMPTY	0x00000100L
+#define BUFFER_FILLED	0x00000200L
+#define BUFFER_BUSY	0x00000400L
+#define BUFFER_EOF	0x00000800L
+
+/* More defines for parallel operation: */
+#ifndef MAX_THREADS
+#define MAX_THREADS 128
+#endif
+
+#define DEFAULT_THREADS 8
+
+#endif /* USE_PTHREADS */
 
 typedef struct
 {
@@ -605,6 +633,14 @@
     lzo_uint32 size;
     lzo_uint32 align;
     lzo_uint32 flags;
+#ifdef USE_PTHREADS
+    /* flags bit 0 ("memory allocated") is set by mblock.c; the parallel */
+    /* code keeps its BUFFER_* status in bits 8-11 and a block sequence  */
+    /* number in bits 16-31 (see p_lzo.c). */
+    lzo_uint32 adler32;     /* checksum of this buffer's data, set by a worker thread */
+    lzo_uint32 crc32;       /* checksum of this buffer's data, set by a worker thread */
+    lzo_uint32 len;         /* number of valid bytes in this buffer */
+#endif /* USE_PTHREADS */
 } memblock_t;
 
 #define memblock_p memblock_t *
diff -Nuar lzop-1.01/src/help.c lzop-1.01-parallel/src/help.c
--- lzop-1.01/src/help.c	Sun Apr 27 17:00:02 2003
+++ lzop-1.01-parallel/src/help.c	Mon Jul 14 15:28:03 2003
@@ -50,6 +50,10 @@
 "                          Laetor Zonas Obteri\n"
 "   Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003\n"
 "lzop v%-11s  Markus Franz Xaver Johannes Oberhumer      %20s\n"
+#ifdef USE_PTHREADS
+"\n##POSIX thread enabled patch 05 contributed by James Lemley, 2002, 2003##\n"
+"For information on this version only, see http://lemley.net/lzop_patches/\n"
+#endif
 "\n", LZOP_VERSION_STRING, LZOP_VERSION_DATE);
 
     fg = con_fg(f,fg);
@@ -129,8 +133,14 @@
 "  -S.suf   use suffix .suf on compressed files\n"
 "  -F       do *not* store or verify checksum of files (faster)\n"
 "  -U       delete input files after successful operation (like gzip and bzip2)\n"
+#ifdef USE_PTHREADS
+"  -TN      use N parallel compression threads (default %d max %d)\n"
+"  file..   files to (de)compress. If none given, try standard input.\n"
+, dn, dN, DEFAULT_THREADS, MAX_THREADS);
+#else
 "  file..   files to (de)compress. If none given, try standard input.\n"
 , dn, dN);
+#endif /* USE_PTHREADS */
 }
 
 
diff -Nuar lzop-1.01/src/lzop.c lzop-1.01-parallel/src/lzop.c
--- lzop-1.01/src/lzop.c	Sun Apr 27 17:00:02 2003
+++ lzop-1.01-parallel/src/lzop.c	Mon Jul 14 15:28:03 2003
@@ -103,6 +103,9 @@
 static unsigned long total_bytes_written = 0;
 static unsigned long total_bytes_read = 0;
 
+#ifdef USE_PTHREADS
+extern int num_threads;
+#endif
 
 /*************************************************************************
 // exit handlers
@@ -2549,6 +2552,21 @@
     case 't':
         set_cmd(CMD_TEST);
         break;
+#ifdef USE_PTHREADS
+    case 'T':   /* set number of compression threads for parallel compression */
+        if (mfx_optarg && isdigit((unsigned char) mfx_optarg[0]))
+        {
+            /* strtol saturates on overflow where atoi has undefined behaviour */
+            long n = strtol(mfx_optarg, NULL, 10);
+            if (n > MAX_THREADS)
+            {
+                fprintf(stderr, "%s: %ld threads requested; using maximum of %d\n", argv0, n, MAX_THREADS);
+                n = MAX_THREADS;
+            }
+            num_threads = (n < 1) ? 1 : (int) n;
+        }
+        break;
+#endif
     case 'U':
         opt_unlink = 1;
         break;
@@ -2749,6 +2767,9 @@
     {"path",             1, 0, 'p'+256},
     {"quiet",            0, 0, 'q'},     /* quiet mode */
     {"silent",           0, 0, 'q'},     /* quiet mode */
+#ifdef USE_PTHREADS
+    {"threads",          1, 0, 'T'},     /* set number of threads for parallel operation */
+#endif
     {"stdout",           0, 0, 'c'},     /* write output on standard output */
     {"suffix",           1, 0, 'S'},     /* use given suffix instead of .lzo */
     {"to-stdout",        0, 0, 'c'},     /* write output on standard output */
@@ -2811,6 +2832,9 @@
     {"no-warn",          0, 0, 525},     /* do not display any warnings */
     {"quiet",            0, 0, 'q'},     /* quiet mode */
     {"silent",           0, 0, 'q'},     /* quiet mode */
+#ifdef USE_PTHREADS
+    {"threads",          1, 0, 'T'},     /* set number of threads for parallel operation */
+#endif
     {"unlink",           0, 0, 'U'},
     {"verbose",          0, 0, 'v'},     /* verbose mode */
 
diff -Nuar lzop-1.01/src/p_lzo.c lzop-1.01-parallel/src/p_lzo.c
--- lzop-1.01/src/p_lzo.c	Sun Apr 27 17:00:02 2003
+++ lzop-1.01-parallel/src/p_lzo.c	Mon Jul 14 15:28:03 2003
@@ -28,6 +28,20 @@
 
 #include "conf.h"
 
+#ifdef USE_PTHREADS
+/* POSIX threads code added by James Lemley, 2002-2003 */
+
+/* define some things for parallel processing */
+#define MAX_BUFFERS (MAX_THREADS * 4)
+int num_threads = DEFAULT_THREADS;
+#define num_buffers (num_threads * 4 )	/* must be at least num_threads * 2, and must be even number */
+static pthread_mutex_t buffer_mutex = PTHREAD_MUTEX_INITIALIZER;	/* guards the flags of every block1[] buffer */
+#define IO_SLEEP_USEC 20	/* how long do I/O threads sleep in microseconds if they can't get a buffer. */
+#define COMP_SLEEP_USEC 10	/* how long do worker threads sleep in microseconds if they can't get a buffer. */
+
+/* debugging print; compiled (so arguments stay type-checked) but never executed */
+#define D(x, y) do { if (0) { fprintf(stderr, x, y); fflush(stderr); } } while (0)
+#endif /* USE_PTHREADS */
 
 #if defined(WITH_LZO)
 
@@ -172,23 +186,53 @@
 #endif
 
 
+#ifdef USE_PTHREADS
+static memblock_t block1[MAX_BUFFERS];	/* input/output buffer pairs: block1[2k] input, block1[2k+1] output */
+#else
 static memblock_t block1;
 static memblock_t block2;
 static memblock_t wrkmem;
+#endif /* USE_PTHREADS */
 
 
 static void free_mem(void)
 {
+#ifdef USE_PTHREADS
+    int i;
+    for (i = 0; i < num_buffers; i++)
+    {
+        mb_free(block1+i);
+    }
+#else
     mb_free(&wrkmem);
     mb_free(&block2);
     mb_free(&block1);
+#endif /* USE_PTHREADS */
 }
 
 
+#ifdef USE_PTHREADS
+static lzo_uint32 W;	/* work memory size; each compressing thread allocates its own block of this size */
+#endif /* USE_PTHREADS */
 static lzo_bool alloc_mem(lzo_uint32 s1, lzo_uint32 s2, lzo_uint32 w)
 {
     lzo_bool r = 1;
 
+#ifdef USE_PTHREADS
+#if defined(USE_MALLOC)
+    int i;
+    /* even slots are input buffers (size s1), odd slots their paired output buffers (size s2) */
+    for (i = 0; r && i < num_buffers; i+=2)
+        r &= mb_alloc(block1+i, s1, ALIGN_SIZE);
+    for (i = 0; r && i < num_buffers; i+=2)
+        r &= mb_alloc(block1+i+1, s2, ALIGN_SIZE);
+
+    /* buffer_mutex is statically initialized, so only the work memory size is saved here */
+    W = w;
+#else
+#error "parallel version requires use of malloc()"
+#endif /* USE_MALLOC */
+#else /* if not defined USE_PTHREADS */
 #if defined(USE_MALLOC)
     r &= mb_alloc(&block1, s1, ALIGN_SIZE);
     r &= mb_alloc(&block2, s2, ALIGN_SIZE);
@@ -197,7 +241,8 @@
     r &= mb_init(&block1, s1, ALIGN_SIZE, heap_block1, sizeof(heap_block1));
     r &= mb_init(&block2, s2, ALIGN_SIZE, heap_block2, sizeof(heap_block2));
     r &= mb_init(&wrkmem, w,  ALIGN_SIZE, heap_wrkmem, sizeof(heap_wrkmem));
-#endif
+#endif /* defined(USE_MALLOC) */
+#endif /* USE_PTHREADS */
     if (!r)
         free_mem();
     return r;
@@ -282,6 +327,347 @@
 }
 
 
+#ifdef USE_PTHREADS
+/*************************************************************************
+// compress a file using POSIX threads
+**************************************************************************/
+
+/* Buffer layout: block1[] holds num_buffers memblocks used in pairs.
+ * block1[i] (i even) is an input buffer and block1[i+1] is the output
+ * buffer its compressed data goes to.  Bits 16-31 of each buffer's flags
+ * carry a block sequence number (mod 0x1000) so that the writer can emit
+ * blocks in input order although workers may finish out of order.
+ *
+ * Threads: one reader fills input buffers, num_threads workers compress
+ * them, and the calling thread writes the results.  Every flag change is
+ * made while holding buffer_mutex; a thread that finds nothing to do
+ * releases the mutex and sleeps for a few microseconds. */
+
+lzo_bool lzo_compress_worker(const header_t *h);
+lzo_bool lzo_compress_reader(file_t *fip);
+
+/* pthread_create() needs a start routine of type void *(*)(void *).
+ * Calling the lzo_bool functions above through a cast function pointer
+ * is undefined behaviour, so these adapters are passed instead.  They
+ * return their (non-NULL) argument on success and NULL on failure. */
+static void *compress_worker_thread(void *arg)
+{
+    return lzo_compress_worker((const header_t *) arg) ? arg : NULL;
+}
+
+static void *compress_reader_thread(void *arg)
+{
+    return lzo_compress_reader((file_t *) arg) ? arg : NULL;
+}
+
+/* Compress fip to fop using one reader thread and num_threads worker
+ * threads while this thread writes the compressed blocks in order.
+ * Returns 1 if every thread reported success, 0 otherwise; thread
+ * creation failure and compression errors terminate the program. */
+lzo_bool lzo_compress_parallel(file_t *fip, file_t *fop, const header_t *h)
+{
+    int i;
+    int err;
+    lzo_bool r = 1;
+    lzo_uint32 output_sequence = 0;
+    pthread_t worker_threads[MAX_THREADS];
+    pthread_t reader_thread;
+    void *thread_ret;
+
+    /* set initial state to BUFFER_EMPTY on all input and output buffers. */
+    for (i = 0; i < num_buffers; i++)
+    {
+        block1[i].flags &= ~(BUFFER_BUSY | BUFFER_FILLED | BUFFER_EOF);
+        block1[i].flags |= BUFFER_EMPTY;
+    }
+
+    /* start dedicated reader thread */
+    err = pthread_create(&reader_thread, NULL, compress_reader_thread, (void *) fip);
+    if (err != 0)
+    {
+        fprintf(stderr, "Unable to create reader thread: %s\n", strerror(err));
+        exit(1);
+    }
+
+    /* start worker threads */
+    for (i = 0; i < num_threads; i++)
+    {
+        err = pthread_create(&worker_threads[i], NULL, compress_worker_thread, (void *) h);
+        if (err != 0)
+        {
+            fprintf(stderr, "Unable to create worker thread %d: %s\n", i, strerror(err));
+            exit(1);
+        }
+    }
+
+    for (;;)
+    {
+        /* troll for THE ready output buffer - identified by output_sequence and BUFFER_FILLED */
+D("lzo_compress: %s\n", "trolling for finished output buffer...");
+        pthread_mutex_lock(&buffer_mutex);
+        for (i = 0; i < num_buffers; i+=2)
+        {
+            if ((block1[i+1].flags & BUFFER_FILLED) &&          /* an output buffer that is ready */
+                (block1[i+1].flags >> 16) == output_sequence)   /* and it is the one I am waiting on */
+                break;
+        }
+        if (i == num_buffers)   /* no output buffer found ready to go */
+        {
+D("lzo_compress: %s\n", "No output ready.  Sleeping.");
+            pthread_mutex_unlock(&buffer_mutex);
+            usleep(IO_SLEEP_USEC);  /* give up my CPU */
+            continue;
+        }
+
+D("lzo_compress: %s\n", "found it!  writing block.");
+        block1[i+1].flags &= ~BUFFER_FILLED;    /* mark output buffer as busy */
+        block1[i+1].flags |= BUFFER_BUSY;
+        pthread_mutex_unlock(&buffer_mutex);    /* unlock buffers for file write that may block */
+
+        /* write uncompressed block size */
+        write32(fop,block1[i].len);
+D("lzo_compress: uncompressed size: %lu\n", (unsigned long) block1[i].len);
+D("lzo_compress: compressed size: %lu\n", (unsigned long) block1[i+1].len);
+
+        /* a zero uncompressed size is the end-of-file marker: stop here */
+        if (block1[i].len == 0)
+            break;
+
+        /* write compressed block size (the raw size if compression did not help) */
+        if (block1[i+1].len < block1[i].len)
+            write32(fop,block1[i+1].len);
+        else
+            write32(fop,block1[i].len);
+
+        /* write checksum of uncompressed block */
+        if (h->flags & F_ADLER32_D)
+            write32(fop,block1[i].adler32);
+        if (h->flags & F_CRC32_D)
+            write32(fop,block1[i].crc32);
+
+        /* write checksum of compressed block */
+        if (block1[i+1].len < block1[i].len && (h->flags & F_ADLER32_C))
+            write32(fop,block1[i+1].adler32);
+        if (block1[i+1].len < block1[i].len && (h->flags & F_CRC32_C))
+            write32(fop,block1[i+1].crc32);
+
+        /* write compressed block data (or the raw data if compression did not help) */
+        if (block1[i+1].len < block1[i].len)
+            write_buf(fop,block1[i+1].mem,block1[i+1].len);
+        else
+            write_buf(fop,block1[i].mem,block1[i].len);
+
+        /* mark both input and output buffers as free */
+        pthread_mutex_lock(&buffer_mutex);
+        block1[i+1].flags &= ~BUFFER_BUSY;
+        block1[i+1].flags |= BUFFER_EMPTY;
+        block1[i].flags &= ~BUFFER_BUSY;    /* clear busy flag that the worker left set on the input buffer */
+        block1[i].flags |= BUFFER_EMPTY;
+        pthread_mutex_unlock(&buffer_mutex);
+
+        /* must wrap exactly like input_sequence in lzo_compress_reader() */
+        output_sequence++;
+        if (output_sequence > 0x0FFF)
+            output_sequence = 0;
+    }
+
+D("lzo_compress: %s\n", "Waiting on reader to join... ");
+    if (pthread_join(reader_thread, &thread_ret) != 0 || thread_ret == NULL)
+        r = 0;
+D("lzo_compress: %s\n", "Waiting on workers to join... ");
+    for (i = 0; i < num_threads; i++)
+    {
+        if (pthread_join(worker_threads[i], &thread_ret) != 0 || thread_ret == NULL)
+            r = 0;
+    }
+D("lzo_compress: %s\n", "Workers joined.  Exiting... ");
+
+    return r;
+}
+
+/* Worker thread body: repeatedly claims a filled input buffer and its
+ * paired output buffer, checksums and compresses the input, and marks
+ * the output filled for the writer.  Exits once the EOF block has been
+ * seen and no filled input buffers remain.  Compression errors are
+ * fatal(); otherwise returns 1. */
+lzo_bool lzo_compress_worker(const header_t *h)
+{
+    int i;
+    int r = LZO_E_OK;
+    lzo_bool input_eof = 0;
+    memblock_t wrkmem;
+
+    /* each thread needs its own compressor work memory */
+    memset(&wrkmem, 0, sizeof(wrkmem));
+    if (!mb_alloc(&wrkmem, W, ALIGN_SIZE))
+        fatal(NULL,"Out of memory");
+D(" lzo_compress_worker: %s\n", "startup!");
+
+    for (;;)
+    {
+        /* troll for full input buffer to compress */
+D(" lzo_compress_worker: %s\n", "trolling for filled input buffer...");
+        pthread_mutex_lock(&buffer_mutex);
+        for (i = 0; i < num_buffers; i+=2)
+        {
+            if (block1[i].flags & BUFFER_EOF)
+                input_eof = 1;
+            if (block1[i].flags & BUFFER_FILLED)
+                break;
+        }
+        if (i == num_buffers)
+        {
+            /* no work for me... */
+            pthread_mutex_unlock(&buffer_mutex);
+            if (input_eof)
+            {
+D(" lzo_compress_worker: %s\n", "Found EOF in input buffer and there are no other filled inputs. ");
+                break;
+            }
+D(" lzo_compress_worker: %s\n", "sleeping.");
+            usleep(COMP_SLEEP_USEC);    /* give up my CPU... */
+            continue;                   /* go back and try again... */
+        }
+
+D(" lzo_compress_worker: %s\n", "Got input buffer. ");
+        block1[i].flags &= ~BUFFER_FILLED;  /* clear buffer full flag so no other worker grabs this one too */
+        block1[i].flags |= BUFFER_BUSY;     /* stays busy until the writer has written this block */
+        /* the paired output buffer was freed together with this input buffer: claim it */
+D(" lzo_compress_worker: %s\n", "Got output buffer. Compressing... ");
+        block1[i+1].flags &= ~BUFFER_EMPTY;
+        block1[i+1].flags |= BUFFER_BUSY;
+        block1[i+1].flags &= 0x0000FFFF;                        /* clear any old sequence number */
+        block1[i+1].flags |= (block1[i].flags & 0xFFFF0000);    /* copy input sequence to output buffer */
+        if (block1[i].flags & BUFFER_EOF)   /* if this is the EOF buffer, don't really compress. */
+        {
+D(" lzo_compress_worker: %s\n", "I've got the EOF buffer.");
+            /* hand the writer a zero-length output block as the end marker */
+            block1[i+1].flags &= ~BUFFER_BUSY;
+            block1[i+1].flags |= BUFFER_FILLED;
+            block1[i+1].len = 0;
+            pthread_mutex_unlock(&buffer_mutex);
+            /* go back and make sure there isn't more work to do... */
+            continue;
+        }
+        pthread_mutex_unlock(&buffer_mutex);    /* unlock buffers now that I have the ones I want */
+
+        /* compute checksum of uncompressed block */
+        if (h->flags & F_ADLER32_D)
+            block1[i].adler32 = lzo_adler32(ADLER32_INIT_VALUE,block1[i].mem,block1[i].len);
+        if (h->flags & F_CRC32_D)
+            block1[i].crc32 = lzo_crc32(CRC32_INIT_VALUE,block1[i].mem,block1[i].len);
+
+        x_filter(block1[i].mem,block1[i].len,h);
+
+        /* compress */
+        if (h->method == M_LZO1X_1)
+            r = lzo1x_1_compress(block1[i].mem, block1[i].len,
+                                 block1[i+1].mem, &block1[i+1].len, wrkmem.mem);
+#if defined(USE_LZO1X_1_15)
+        else if (h->method == M_LZO1X_1_15)
+            r = lzo1x_1_15_compress(block1[i].mem, block1[i].len,
+                                    block1[i+1].mem, &block1[i+1].len, wrkmem.mem);
+#endif
+#if defined(USE_LZO1X_999)
+        else if (h->method == M_LZO1X_999)
+            r = lzo1x_999_compress_level(block1[i].mem, block1[i].len,
+                                         block1[i+1].mem, &block1[i+1].len, wrkmem.mem,
+                                         NULL, 0, 0, h->level);
+#endif
+        else
+            fatal(NULL,"Internal error");
+
+        assert(block1[i+1].len <= block1[i+1].size);
+        if (r != LZO_E_OK)
+            fatal(NULL,"Internal error - compression failed");
+
+        /* optimize */
+        if (opt_optimize && block1[i+1].len < block1[i].len)
+        {
+            lzo_uint32 new_len = block1[i].len;
+            r = lzo1x_optimize(block1[i+1].mem, block1[i+1].len, block1[i].mem, &new_len, NULL);
+            if (r != LZO_E_OK || new_len != block1[i].len)
+                fatal(NULL,"Internal error - optimization failed");
+        }
+
+        /* compute checksum of compressed block */
+        if (block1[i+1].len < block1[i].len && (h->flags & F_ADLER32_C))
+            block1[i+1].adler32 = lzo_adler32(ADLER32_INIT_VALUE,block1[i+1].mem,block1[i+1].len);
+        if (block1[i+1].len < block1[i].len && (h->flags & F_CRC32_C))
+            block1[i+1].crc32 = lzo_crc32(CRC32_INIT_VALUE,block1[i+1].mem,block1[i+1].len);
+
+D(" lzo_compress_worker: %s\n", "setting output buffer filled... ");
+        /* hand the output to the writer, which also frees the input buffer */
+        pthread_mutex_lock(&buffer_mutex);
+        block1[i+1].flags &= ~BUFFER_BUSY;
+        block1[i+1].flags |= BUFFER_FILLED;
+        pthread_mutex_unlock(&buffer_mutex);
+    }
+
+    mb_free(&wrkmem);
+D(" *****lzo_compress_worker: %s\n", "exiting!");
+    return 1;
+}
+
+/* Reader thread body: fills free input buffers from fip in order, tagging
+ * each with the next sequence number, and stops after queueing the
+ * zero-length block that marks end of input.  Always returns 1. */
+lzo_bool lzo_compress_reader(file_t *fip)
+{
+    int i;
+    lzo_uint32 input_sequence = 0;
+    lzo_int l;
+    lzo_bool at_eof = 0;
+
+    while (!at_eof)
+    {
+        /* troll for an empty input buffer */
+D("  lzo_compress_reader: %s\n", "trolling for empty input buffer...");
+        pthread_mutex_lock(&buffer_mutex);
+        for (i = 0; i < num_buffers; i+=2)
+            if (block1[i].flags & BUFFER_EMPTY)
+                break;
+        if (i == num_buffers)
+        {
+            pthread_mutex_unlock(&buffer_mutex);
+D("  lzo_compress_reader: %s\n", " no empty input buffers - sleeping.");
+            usleep(IO_SLEEP_USEC);
+            continue;
+        }
+
+D("  lzo_compress_reader: %s\n", "Got one.  Reading block.");
+        block1[i].flags &= ~BUFFER_EMPTY;
+        block1[i].flags |= BUFFER_BUSY;
+        /* save buffer sequence so we can keep proper order on output */
+        block1[i].flags &= 0x0000FFFF;              /* clear old buffer sequence */
+        block1[i].flags |= (input_sequence << 16);  /* store new buffer sequence */
+        /* must wrap exactly like output_sequence in lzo_compress_parallel() */
+        input_sequence++;
+        if (input_sequence > 0x0FFF)
+            input_sequence = 0;
+        pthread_mutex_unlock(&buffer_mutex);    /* unlock buffers for the read - if we block, others can work */
+
+        /* read a block */
+        l = read_buf(fip, block1[i].mem, block_size);
+        block1[i].len = (l > 0 ? l : 0);
+        at_eof = (block1[i].len == 0);
+
+        pthread_mutex_lock(&buffer_mutex);
+        block1[i].flags &= ~BUFFER_BUSY;
+        block1[i].flags |= BUFFER_FILLED;
+        if (at_eof)
+        {
+D("  lzo_compress_reader: %s\n", "EOF.");
+            block1[i].flags |= BUFFER_EOF;  /* signal all waiting threads to give up */
+        }
+        pthread_mutex_unlock(&buffer_mutex);
+    }
+D("  lzo_compress_reader: %s\n", " bailing..");
+    return 1;
+}
+
+#endif /* ifdef USE_PTHREADS */
+
 /*************************************************************************
 // compress a file
 **************************************************************************/
@@ -289,14 +675,30 @@
 lzo_bool lzo_compress(file_t *fip, file_t *fop, const header_t *h)
 {
     int r = LZO_E_OK;
+#ifdef USE_PTHREADS
+    /* the serial path uses the first input/output buffer pair of block1[] */
+    lzo_byte * const b1 = block1[0].mem;
+    lzo_byte * const b2 = block1[1].mem;
+    memblock_t wrkmem;
+#else
     lzo_byte * const b1 = block1.mem;
     lzo_byte * const b2 = block2.mem;
+#endif /* USE_PTHREADS */
     lzo_uint32 src_len = 0, dst_len = 0;
     lzo_uint32 c_adler32 = ADLER32_INIT_VALUE, d_adler32 = ADLER32_INIT_VALUE;
     lzo_uint32 c_crc32 = CRC32_INIT_VALUE, d_crc32 = CRC32_INIT_VALUE;
     lzo_int l;
     lzo_bool ok = 1;
 
+#ifdef USE_PTHREADS
+    /* if parallelism is requested, call lzo_compress_parallel, else fall through to original code */
+    if (num_threads > 1)
+        return lzo_compress_parallel(fip, fop, h);
+    /* the serial path allocates its own work memory (size W, saved by alloc_mem) */
+    memset(&wrkmem, 0, sizeof(wrkmem));
+    if (!mb_alloc(&wrkmem, W, ALIGN_SIZE))
+        fatal(fip,"Out of memory");
+#endif
     for (;;)
     {
         /* read a block */
@@ -338,7 +740,11 @@
 #if 0
         fprintf(stderr, "%ld %ld %ld\n", (long)src_len, (long)dst_len, (long)block2.size);
 #endif
+#ifdef USE_PTHREADS
+        assert(dst_len <= block1[1].size);
+#else
         assert(dst_len <= block2.size);
+#endif /* USE_PTHREADS */
         if (r != LZO_E_OK)
             fatal(fip,"Internal error - compression failed");
 
@@ -382,6 +788,9 @@
             write_buf(fop,b1,src_len);
     }
 
+#ifdef USE_PTHREADS
+    mb_free(&wrkmem);
+#endif
     return ok;
 }
 
@@ -399,7 +808,13 @@
     lzo_uint32 c_crc32 = CRC32_INIT_VALUE, d_crc32 = CRC32_INIT_VALUE;
     lzo_bool ok = 1;
     lzo_bool use_seek;
+#ifdef USE_PTHREADS
+    /* the below was &block2 (an output block) but now block1 is an array */
+    /* of memblock_t, allocated in input-output-input-output... order. */
+    memblock_t * const block = block1+1;
+#else
     memblock_t * const block = &block2;
+#endif /* USE_PTHREADS */
     lzo_byte * b1;
     lzo_byte * const b2 = block->mem;
 