Go to:
Gentoo Home
Documentation
Forums
Lists
Bugs
Planet
Store
Wiki
Get Gentoo!
Gentoo's Bugzilla – Attachment 128306 Details for
Bug 189130
app-arch/lzop-1.01 threading support ebuild patch
Home
|
New
–
[Ex]
|
Browse
|
Search
|
Privacy Policy
|
[?]
|
Reports
|
Requests
|
Help
|
New Account
|
Log In
[x]
|
Forgot Password
Login:
[x]
[patch]
lzop parallel source code patch
lzop_parallel_patch_05 (text/plain), 29.69 KB, created by
Vladimir Brik
on 2007-08-16 16:48:10 UTC
(
hide
)
Description:
lzop parallel source code patch
Filename:
MIME Type:
Creator:
Vladimir Brik
Created:
2007-08-16 16:48:10 UTC
Size:
29.69 KB
patch
obsolete
>diff -Nuar lzop-1.01/config.hin lzop-1.01-parallel/config.hin >--- lzop-1.01/config.hin Sun Apr 27 17:00:02 2003 >+++ lzop-1.01-parallel/config.hin Mon Jul 14 15:50:09 2003 >@@ -105,6 +105,9 @@ > /* Define to 1 if you have the `mktime' function. */ > #undef HAVE_MKTIME > >+/* Define to 1 if you have the `pthread_join' function. */ >+#undef HAVE_PTHREAD_JOIN >+ > /* Define to 1 if you have the <share.h> header file. */ > #undef HAVE_SHARE_H > >diff -Nuar lzop-1.01/configure lzop-1.01-parallel/configure >--- lzop-1.01/configure Sun Apr 27 17:00:02 2003 >+++ lzop-1.01-parallel/configure Mon Jul 14 15:44:41 2003 >@@ -13815,6 +13815,205 @@ > done > > >+# Begin PTHREADS mods by James Lemley, 2003 >+echo "$as_me:$LINENO: checking for library containing pthread_join" >&5 >+echo $ECHO_N "checking for library containing pthread_join... $ECHO_C" >&6 >+if test "${ac_cv_search_pthread_join+set}" = set; then >+ echo $ECHO_N "(cached) $ECHO_C" >&6 >+else >+ ac_func_search_save_LIBS=$LIBS >+ac_cv_search_pthread_join=no >+cat >conftest.$ac_ext <<_ACEOF >+#line $LINENO "configure" >+/* confdefs.h. */ >+_ACEOF >+cat confdefs.h >>conftest.$ac_ext >+cat >>conftest.$ac_ext <<_ACEOF >+/* end confdefs.h. */ >+ >+/* Override any gcc2 internal prototype to avoid an error. */ >+#ifdef __cplusplus >+extern "C" >+#endif >+/* We use char because int might match the return type of a gcc2 >+ builtin and then its argument prototype would still apply. */ >+char pthread_join (); >+int >+main () >+{ >+pthread_join (); >+ ; >+ return 0; >+} >+_ACEOF >+rm -f conftest.$ac_objext conftest$ac_exeext >+if { (eval echo "$as_me:$LINENO: \"$ac_link\"") >&5 >+ (eval $ac_link) 2>&5 >+ ac_status=$? >+ echo "$as_me:$LINENO: \$? = $ac_status" >&5 >+ (exit $ac_status); } && >+ { ac_try='test -s conftest$ac_exeext' >+ { (eval echo "$as_me:$LINENO: \"$ac_try\"") >&5 >+ (eval $ac_try) 2>&5 >+ ac_status=$? >+ echo "$as_me:$LINENO: \$? = $ac_status" >&5 >+ (exit $ac_status); }; }; then >+ ac_cv_search_pthread_join="none required" >+else >+ echo "$as_me: failed program was:" >&5 >+sed 's/^/| /' conftest.$ac_ext >&5 >+ >+fi >+rm -f conftest.$ac_objext conftest$ac_exeext conftest.$ac_ext >+if test "$ac_cv_search_pthread_join" = no; then >+ for ac_lib in pthread pthreads; do >+ LIBS="-l$ac_lib $ac_func_search_save_LIBS" >+ cat >conftest.$ac_ext <<_ACEOF >+#line $LINENO "configure" >+/* confdefs.h. */ >+_ACEOF >+cat confdefs.h >>conftest.$ac_ext >+cat >>conftest.$ac_ext <<_ACEOF >+/* end confdefs.h. */ >+ >+/* Override any gcc2 internal prototype to avoid an error. */ >+#ifdef __cplusplus >+extern "C" >+#endif >+/* We use char because int might match the return type of a gcc2 >+ builtin and then its argument prototype would still apply. */ >+char pthread_join (); >+int >+main () >+{ >+pthread_join (); >+ ; >+ return 0; >+} >+_ACEOF >+rm -f conftest.$ac_objext conftest$ac_exeext >+if { (eval echo "$as_me:$LINENO: \"$ac_link\"") >&5 >+ (eval $ac_link) 2>&5 >+ ac_status=$? >+ echo "$as_me:$LINENO: \$? = $ac_status" >&5 >+ (exit $ac_status); } && >+ { ac_try='test -s conftest$ac_exeext' >+ { (eval echo "$as_me:$LINENO: \"$ac_try\"") >&5 >+ (eval $ac_try) 2>&5 >+ ac_status=$? >+ echo "$as_me:$LINENO: \$? = $ac_status" >&5 >+ (exit $ac_status); }; }; then >+ ac_cv_search_pthread_join="-l$ac_lib" >+break >+else >+ echo "$as_me: failed program was:" >&5 >+sed 's/^/| /' conftest.$ac_ext >&5 >+ >+fi >+rm -f conftest.$ac_objext conftest$ac_exeext conftest.$ac_ext >+ done >+fi >+LIBS=$ac_func_search_save_LIBS >+fi >+echo "$as_me:$LINENO: result: $ac_cv_search_pthread_join" >&5 >+echo "${ECHO_T}$ac_cv_search_pthread_join" >&6 >+if test "$ac_cv_search_pthread_join" != no; then >+ test "$ac_cv_search_pthread_join" = "none required" || LIBS="$ac_cv_search_pthread_join $LIBS" >+ >+else >+ >+ echo "Can't find a POSIX threads library. If one is actually installed," >+ echo "please fix configure.in and submit a patch back to the maintainer." >+ echo "Thanks!" >+ exit 1 >+ >+fi >+ >+ >+for ac_func in pthread_join >+do >+as_ac_var=`echo "ac_cv_func_$ac_func" | $as_tr_sh` >+echo "$as_me:$LINENO: checking for $ac_func" >&5 >+echo $ECHO_N "checking for $ac_func... $ECHO_C" >&6 >+if eval "test \"\${$as_ac_var+set}\" = set"; then >+ echo $ECHO_N "(cached) $ECHO_C" >&6 >+else >+ cat >conftest.$ac_ext <<_ACEOF >+#line $LINENO "configure" >+/* confdefs.h. */ >+_ACEOF >+cat confdefs.h >>conftest.$ac_ext >+cat >>conftest.$ac_ext <<_ACEOF >+/* end confdefs.h. */ >+/* System header to define __stub macros and hopefully few prototypes, >+ which can conflict with char $ac_func (); below. >+ Prefer <limits.h> to <assert.h> if __STDC__ is defined, since >+ <limits.h> exists even on freestanding compilers. */ >+#ifdef __STDC__ >+# include <limits.h> >+#else >+# include <assert.h> >+#endif >+/* Override any gcc2 internal prototype to avoid an error. */ >+#ifdef __cplusplus >+extern "C" >+{ >+#endif >+/* We use char because int might match the return type of a gcc2 >+ builtin and then its argument prototype would still apply. */ >+char $ac_func (); >+/* The GNU C library defines this for functions which it implements >+ to always fail with ENOSYS. Some functions are actually named >+ something starting with __ and the normal name is an alias. */ >+#if defined (__stub_$ac_func) || defined (__stub___$ac_func) >+choke me >+#else >+char (*f) () = $ac_func; >+#endif >+#ifdef __cplusplus >+} >+#endif >+ >+int >+main () >+{ >+return f != $ac_func; >+ ; >+ return 0; >+} >+_ACEOF >+rm -f conftest.$ac_objext conftest$ac_exeext >+if { (eval echo "$as_me:$LINENO: \"$ac_link\"") >&5 >+ (eval $ac_link) 2>&5 >+ ac_status=$? >+ echo "$as_me:$LINENO: \$? = $ac_status" >&5 >+ (exit $ac_status); } && >+ { ac_try='test -s conftest$ac_exeext' >+ { (eval echo "$as_me:$LINENO: \"$ac_try\"") >&5 >+ (eval $ac_try) 2>&5 >+ ac_status=$? >+ echo "$as_me:$LINENO: \$? = $ac_status" >&5 >+ (exit $ac_status); }; }; then >+ eval "$as_ac_var=yes" >+else >+ echo "$as_me: failed program was:" >&5 >+sed 's/^/| /' conftest.$ac_ext >&5 >+ >+eval "$as_ac_var=no" >+fi >+rm -f conftest.$ac_objext conftest$ac_exeext conftest.$ac_ext >+fi >+echo "$as_me:$LINENO: result: `eval echo '${'$as_ac_var'}'`" >&5 >+echo "${ECHO_T}`eval echo '${'$as_ac_var'}'`" >&6 >+if test `eval echo '${'$as_ac_var'}'` = yes; then >+ cat >>confdefs.h <<_ACEOF >+#define `echo "HAVE_$ac_func" | $as_tr_cpp` 1 >+_ACEOF >+ >+fi >+done >+ >+# End PTHREADS mods by James Lemley, 2003 > > # /*********************************************************************** > # // Write output files >diff -Nuar lzop-1.01/configure.ac lzop-1.01-parallel/configure.ac >--- lzop-1.01/configure.ac Sun Apr 27 17:00:02 2003 >+++ lzop-1.01-parallel/configure.ac Mon Jul 14 15:44:22 2003 >@@ -130,6 +130,15 @@ > AC_CHECK_FUNCS(access alloca atoi atol chmod chown ctime difftime fstat gettimeofday gmtime localtime lstat memcmp memcpy memmove memset mktime snprintf strchr strdup strerror strftime strrchr umask utime vsnprintf) > AC_CHECK_FUNCS(fchmod stricmp strnicmp) > >+# Begin PTHREADS mods by James Lemley, 2003 >+AC_SEARCH_LIBS(pthread_join,pthread pthreads,,[ >+ echo "Can't find a POSIX threads library. If one is actually installed," >+ echo "please fix configure.in and submit a patch back to the maintainer." >+ echo "Thanks!" >+ exit 1 >+]) >+AC_CHECK_FUNCS(pthread_join) >+# End PTHREADS mods by James Lemley, 2003 > > # /*********************************************************************** > # // Write output files >diff -Nuar lzop-1.01/src/conf.h lzop-1.01-parallel/src/conf.h >--- lzop-1.01/src/conf.h Sun Apr 27 17:00:02 2003 >+++ lzop-1.01-parallel/src/conf.h Mon Jul 14 15:54:33 2003 >@@ -79,6 +79,14 @@ > # error > #endif > >+/* -- James Lemley -- adding support for POSIX threads via autoconf. */ >+#ifdef HAVE_PTHREAD_JOIN >+#define USE_PTHREADS >+#endif >+ >+#ifdef USE_PTHREADS >+#include <pthread.h> >+#endif > > /************************************************************************* > // >@@ -596,6 +604,26 @@ > /************************************************************************* > // mblock.c > **************************************************************************/ >+#ifdef USE_PTHREADS >+ >+/* define the states buffers can be in for parallel operation: */ >+/* input buffers go from empty to filled to busy to empty again */ >+/* output buffers go from empty to busy to filled to empty again */ >+/* worker threads shut down when they find an EOF flag on an input buffer */ >+ >+#define BUFFER_EMPTY 0x00000100L >+#define BUFFER_FILLED 0x00000200L >+#define BUFFER_BUSY 0x00000400L >+#define BUFFER_EOF 0x00000800L >+ >+/* More defines for parallel operation: */ >+#ifndef MAX_THREADS >+#define MAX_THREADS 128 >+#endif >+ >+#define DEFAULT_THREADS 8 >+ >+#endif /* USE_PTHREADS */ > > typedef struct > { >@@ -605,6 +633,14 @@ > lzo_uint32 size; > lzo_uint32 align; > lzo_uint32 flags; >+#ifdef USE_PTHREADS >+ /* flags is initially set to 0x00000001 in mblock.c */ >+ /* 0x00000001 means "memory allocated"; otherwise unused. */ >+ /* I will use this field for parallel buffer status. */ >+ lzo_uint32 adler32; /* moved from local storage in lzo_compress to buffer areas */ >+ lzo_uint32 crc32; /* for parallel processing */ >+ lzo_uint32 len; /* for parallel processing */ >+#endif /* USE_PTHREADS */ > } > memblock_t; > #define memblock_p memblock_t * >diff -Nuar lzop-1.01/src/help.c lzop-1.01-parallel/src/help.c >--- lzop-1.01/src/help.c Sun Apr 27 17:00:02 2003 >+++ lzop-1.01-parallel/src/help.c Mon Jul 14 15:28:03 2003 >@@ -50,6 +50,10 @@ > " Laetor Zonas Obteri\n" > " Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2002, 2003\n" > "lzop v%-11s Markus Franz Xaver Johannes Oberhumer %20s\n" >+#ifdef USE_PTHREADS >+ "\n##POSIX thread enabled patch 05 contributed by James Lemley, 2002, 2003##\n" >+ "For information on this version only, see http://lemley.net/lzop_patches/\n" >+#endif > "\n", > LZOP_VERSION_STRING, LZOP_VERSION_DATE); > fg = con_fg(f,fg); >@@ -129,8 +133,14 @@ > " -S.suf use suffix .suf on compressed files\n" > " -F do *not* store or verify checksum of files (faster)\n" > " -U delete input files after successful operation (like gzip and bzip2)\n" >+#ifdef USE_PTHREADS >+" -TN use N parallel compression threads (default %d max %d)\n" >+" file.. files to (de)compress. If none given, try standard input.\n" >+, dn, dN, DEFAULT_THREADS, MAX_THREADS); >+#else > " file.. files to (de)compress. If none given, try standard input.\n" > , dn, dN); >+#endif /* USE_PTHREADS */ > } > > >diff -Nuar lzop-1.01/src/lzop.c lzop-1.01-parallel/src/lzop.c >--- lzop-1.01/src/lzop.c Sun Apr 27 17:00:02 2003 >+++ lzop-1.01-parallel/src/lzop.c Mon Jul 14 15:28:03 2003 >@@ -103,6 +103,9 @@ > static unsigned long total_bytes_written = 0; > static unsigned long total_bytes_read = 0; > >+#ifdef USE_PTHREADS >+extern int num_threads; >+#endif > > /************************************************************************* > // exit handlers >@@ -2549,6 +2552,21 @@ > case 't': > set_cmd(CMD_TEST); > break; >+#ifdef USE_PTHREADS >+ case 'T': /* set number of compression threads for parallel compression */ >+ if (mfx_optarg && isdigit(mfx_optarg[0])) >+ { >+ num_threads = atoi(mfx_optarg); >+ if (num_threads > MAX_THREADS) >+ { >+ fprintf(stderr, "%s: %d threads requested; using maximum of %d\n", argv0, num_threads, MAX_THREADS); >+ num_threads = MAX_THREADS; >+ } >+ if (num_threads < 1) >+ num_threads = 1; >+ } >+ break; >+#endif > case 'U': > opt_unlink = 1; > break; >@@ -2749,6 +2767,9 @@ > {"path", 1, 0, 'p'+256}, > {"quiet", 0, 0, 'q'}, /* quiet mode */ > {"silent", 0, 0, 'q'}, /* quiet mode */ >+#ifdef USE_PTHREADS >+ {"threads", 1, 0, 'T'}, /* set number of threads for parallel operation */ >+#endif > {"stdout", 0, 0, 'c'}, /* write output on standard output */ > {"suffix", 1, 0, 'S'}, /* use given suffix instead of .lzo */ > {"to-stdout", 0, 0, 'c'}, /* write output on standard output */ >@@ -2811,6 +2832,9 @@ > {"no-warn", 0, 0, 525}, /* do not display any warnings */ > {"quiet", 0, 0, 'q'}, /* quiet mode */ > {"silent", 0, 0, 'q'}, /* quiet mode */ >+#ifdef USE_PTHREADS >+ {"threads", 1, 0, 'T'}, /* set number of threads for parallel operation */ >+#endif > {"unlink", 0, 0, 'U'}, > {"verbose", 0, 0, 'v'}, /* verbose mode */ > >diff -Nuar lzop-1.01/src/p_lzo.c lzop-1.01-parallel/src/p_lzo.c >--- lzop-1.01/src/p_lzo.c Sun Apr 27 17:00:02 2003 >+++ lzop-1.01-parallel/src/p_lzo.c Mon Jul 14 15:28:03 2003 >@@ -28,6 +28,20 @@ > > #include "conf.h" > >+#ifdef USE_PTHREADS >+/* POSIX threads code added by James Lemley, 2002-2003 */ >+ >+/* define some things for parallel processing */ >+#define MAX_BUFFERS (MAX_THREADS * 4) >+int num_threads = DEFAULT_THREADS; >+#define num_buffers (num_threads * 4 ) /* must be at least num_threads * 2, and must be even number */ >+pthread_mutex_t buffer_mutex; /* to lock buffer arrays for checking && setting flags */ >+#define IO_SLEEP_USEC 20 /* how long do I/O threads sleep in microseconds if they can't get a buffer. */ >+#define COMP_SLEEP_USEC 10 /* how long do worker threads sleep in microseconds if they can't get a buffer. */ >+ >+/* define a macro for easy debugging print */ >+#define D(x, y) if(0) {fprintf(stderr, x, y); fflush(stderr);} >+#endif /* USE_PTHREADS */ > > #if defined(WITH_LZO) > >@@ -172,23 +186,57 @@ > #endif > > >+#ifdef USE_PTHREADS >+static memblock_t block1[MAX_BUFFERS]; /* define buffers in array instead of discreetly */ >+#else > static memblock_t block1; > static memblock_t block2; > static memblock_t wrkmem; >+#endif /* USE_PTHREADS */ > > > static void free_mem(void) > { >+#ifdef USE_PTHREADS >+ int i; >+ for (i = 0; i < num_buffers; i++) >+ { >+ mb_free(block1+i); >+ } >+#else > mb_free(&wrkmem); > mb_free(&block2); > mb_free(&block1); >+#endif /* USE_PTHREADS */ > } > >+#ifdef USE_PTHREADS >+static lzo_uint32 W; /* holds work_len for worker memory size, in each thread. */ >+#endif /* USE_PTHREADS */ > > static lzo_bool alloc_mem(lzo_uint32 s1, lzo_uint32 s2, lzo_uint32 w) > { > lzo_bool r = 1; > >+#ifdef USE_PTHREADS >+#if defined(USE_MALLOC) >+ int i; >+ for (i = 0; r && i < num_buffers; i+=2) >+ r &= mb_alloc(block1+i, s1, ALIGN_SIZE); >+ for (i = 0; r && i < num_buffers; i+=2) >+ r &= mb_alloc(block1+i+1, s2, ALIGN_SIZE); >+ >+ if (pthread_mutex_init(&buffer_mutex, NULL)) >+ { >+ fprintf(stderr, "pthread_mutex_init failed\n"); >+ r = 0; >+ } >+ /* save W for allocating workmem in each worker thread */ >+ W = w; >+#else >+#error "parallel version requires use of malloc()" >+#endif /* USE_MALLOC */ >+#else /* if not defined USE_PTHREADS */ > #if defined(USE_MALLOC) > r &= mb_alloc(&block1, s1, ALIGN_SIZE); > r &= mb_alloc(&block2, s2, ALIGN_SIZE); >@@ -197,7 +245,8 @@ > r &= mb_init(&block1, s1, ALIGN_SIZE, heap_block1, sizeof(heap_block1)); > r &= mb_init(&block2, s2, ALIGN_SIZE, heap_block2, sizeof(heap_block2)); > r &= mb_init(&wrkmem, w, ALIGN_SIZE, heap_wrkmem, sizeof(heap_wrkmem)); >-#endif >+#endif /* defined(USE_MALLOC) */ >+#endif /* USE_PTHREADS */ > if (!r) > free_mem(); > return r; >@@ -282,6 +331,321 @@ > } > > >+#ifdef USE_PTHREADS >+/************************************************************************* >+// compress a file using POSIX threads >+**************************************************************************/ >+ >+lzo_bool lzo_compress_worker(const header_t *h); >+lzo_bool lzo_compress_reader(file_t *fip); >+ >+lzo_bool lzo_compress_parallel(file_t *fip, file_t *fop, const header_t *h) >+{ >+ int i; >+ int r = 1; /* changed 2002-12-23 */ >+ lzo_uint32 output_sequence = 0; >+ pthread_t worker_threads[MAX_THREADS]; >+ pthread_t reader_thread; >+ void *thread_ret; >+ >+ /* set initial state to BUFFER_EMPTY on all input and output buffers. */ >+ for (i = 0; i < num_buffers; i++) >+ { >+ block1[i].flags &= ~(BUFFER_BUSY | BUFFER_FILLED | BUFFER_EOF); >+ block1[i].flags |= BUFFER_EMPTY; >+ } >+ >+ /* start dedicated reader thread */ >+ if (pthread_create(&reader_thread, NULL, (void *)(void *)lzo_compress_reader, (void *) fip)) >+ { >+ fprintf(stderr, "Unable to create reader thread\n"); >+ /* print errno here too */ >+ exit(1); >+ } >+ >+ /* start worker threads */ >+ for (i = 0; i < num_threads; i++) >+ { >+ if (pthread_create(&worker_threads[i], NULL, (void *)(void *)lzo_compress_worker, (void *) h)) >+ { >+ fprintf(stderr, "Unable to create worker thread %d\n", i); >+ /* print errno here too */ >+ exit(1); >+ } >+ } >+ >+ for (;;) >+ { >+ /* troll for THE ready output buffer - identified by output_sequence and BUFFER_FILLED */ >+ >+D("lzo_compress: %s\n", "trolling for finished output buffer..."); >+ pthread_mutex_lock(&buffer_mutex); >+ for (i = 0; i < num_buffers; i+=2) >+ { >+ if (block1[i+1].flags & BUFFER_FILLED) /* an output buffer that is ready */ >+ if ((block1[i+1].flags >> 16) == output_sequence) /* and it is the one I am waiting on */ >+ break; >+ } >+ if (i != num_buffers) /* found work */ >+ { >+D("lzo_compress: %s\n", "found it! writing block."); >+ block1[i+1].flags &= ~BUFFER_FILLED; /* mark output buffer as busy */ >+ block1[i+1].flags |= BUFFER_BUSY; >+ >+ pthread_mutex_unlock(&buffer_mutex); /* unlock buffers for file write that may block */ >+ >+ /* write uncompressed block size */ >+ write32(fop,block1[i].len); >+D("lzo_compress: uncompressed size: %d\n", block1[i].len); >+D("lzo_compress: compressed size: %d\n", block1[i+1].len); >+ >+ /* exit if last block */ >+ if (block1[i].len == 0) >+ break; >+ >+ /* write compressed block size */ >+ if (block1[i+1].len < block1[i].len) >+ write32(fop,block1[i+1].len); >+ else >+ write32(fop,block1[i].len); >+ >+ /* write checksum of uncompressed block */ >+ if (h->flags & F_ADLER32_D) >+ write32(fop,block1[i].adler32); >+ if (h->flags & F_CRC32_D) >+ write32(fop,block1[i].crc32); >+ >+ /* write checksum of compressed block */ >+ if (block1[i+1].len < block1[i].len && (h->flags & F_ADLER32_C)) >+ { >+ write32(fop,block1[i+1].adler32); >+ } >+ if (block1[i+1].len < block1[i].len && (h->flags & F_CRC32_C)) >+ { >+ write32(fop,block1[i+1].crc32); >+ } >+ >+ /* write compressed block data */ >+ if (block1[i+1].len < block1[i].len) >+ write_buf(fop,block1[i+1].mem,block1[i+1].len); /* this line had an error */ >+ else >+ write_buf(fop,block1[i].mem,block1[i].len); >+ >+ /* mark both input and output buffers as free */ >+ pthread_mutex_lock(&buffer_mutex); >+ block1[i+1].flags &= ~BUFFER_BUSY; >+ block1[i+1].flags |= BUFFER_EMPTY; >+ block1[i].flags &= ~BUFFER_BUSY; /* clear busy flag that the worker left set on the input buffer */ >+ block1[i].flags |= BUFFER_EMPTY; >+ pthread_mutex_unlock(&buffer_mutex); /* unlock buffers */ >+ >+ output_sequence++; >+ if (output_sequence > 0x0FFF) >+ output_sequence = 0; /* do same with input_sequence on input to keep in sync */ >+ } >+ else /* no output buffer found ready to go */ >+ { >+D("lzo_compress: %s\n", "No ouput ready. Sleeping."); >+ pthread_mutex_unlock(&buffer_mutex); /* unlock buffers */ >+ usleep (IO_SLEEP_USEC); /* give up my CPU */ >+ } >+ } >+ >+D("lzo_compress: %s\n", "Waiting on reader to join... "); >+ pthread_join(reader_thread, &thread_ret); >+D("lzo_compress: %s\n", "Waiting on workers to join... "); >+ /* wait on workers */ >+ for (i = 0; i < num_threads; i++) >+ { >+ pthread_join(worker_threads[i], &thread_ret); >+ /*FIXME -- dumps core. r &= *((lzo_bool *) thread_ret); */ >+ } >+D("lzo_compress: %s\n", "Workers joined. Exiting... "); >+ >+ return r; >+} >+ >+lzo_bool lzo_compress_worker(const header_t *h) >+{ >+ int i; >+ int r = LZO_E_OK; >+ lzo_bool input_eof = 0; >+ memblock_t wrkmem; >+ r &= mb_alloc(&wrkmem, W, ALIGN_SIZE); /* allocate work memory on a per-thread basis */ >+D(" lzo_compress_worker: %s\n", "startup!"); >+ >+ while (1) >+ { >+ /* troll for full input buffer to compress */ >+D(" lzo_compress_worker: %s\n", "trolling for filled input buffer..."); >+ pthread_mutex_lock(&buffer_mutex); >+ for (i = 0; i < num_buffers; i+=2) >+ { >+ if (block1[i].flags & BUFFER_EOF) >+ input_eof = 1; >+ if (block1[i].flags & BUFFER_FILLED) >+ break; >+ } >+ if (i == num_buffers) >+ { >+ pthread_mutex_unlock(&buffer_mutex); >+ /* no work for me... */ >+ if (input_eof) >+ { >+D(" lzo_compress_worker: %s\n", "Found EOF in input buffer and there are no other filled inputs. "); >+ break; >+ } >+ >+D(" lzo_compress_worker: %s\n", "sleeping."); >+ usleep(COMP_SLEEP_USEC); /* give up my CPU... */ >+ continue; /* go back and try again... */ >+ } >+ >+D(" lzo_compress_worker: %s\n", "Got input buffer. "); >+ block1[i].flags &= ~BUFFER_FILLED; /* clear buffer full flag so no other worker grabs this one too */ >+ block1[i].flags |= BUFFER_BUSY; /* set busy flag */ >+ /* work for this thread to do! now to find a free output buffer... */ >+D(" lzo_compress_worker: %s\n", "Got output buffer. Compressing... "); >+ block1[i+1].flags &= ~BUFFER_EMPTY; /* unset output buffer free flag */ >+ block1[i+1].flags |= BUFFER_BUSY; /* set output buffer busy flag */ >+ block1[i+1].flags &= 0x0000FFFF; /* clear any old sequence number */ >+ block1[i+1].flags |= (block1[i].flags & 0xFFFF0000); /*copy input sequence to output buffer */ >+ if (block1[i].flags & BUFFER_EOF) /* if this is the EOF buffer, don't really compress. */ >+ { >+D(" lzo_compress_worker: %s\n", "I've got the EOF buffer."); >+ /* but first set EOF flag on ouptut buffer, and zero length. */ >+ block1[i+1].flags &= ~BUFFER_BUSY; /* set output buffer busy flag */ >+ block1[i+1].flags |= BUFFER_FILLED; >+ block1[i+1].len = 0; >+ pthread_mutex_unlock(&buffer_mutex); >+ /* go back and make sure there isn't more work to do... */ >+ continue; >+ } >+ >+ pthread_mutex_unlock(&buffer_mutex); /* unlock buffers now that I have the ones I want */ >+ >+ /* compute checksum of uncompressed block */ >+ >+ if (h->flags & F_ADLER32_D) >+ block1[i].adler32 = lzo_adler32(ADLER32_INIT_VALUE,block1[i].mem,block1[i].len); >+ if (h->flags & F_CRC32_D) >+ block1[i].crc32 = lzo_crc32(CRC32_INIT_VALUE,block1[i].mem,block1[i].len); >+ >+ x_filter(block1[i].mem,block1[i].len,h); >+ >+ /* compress */ >+ if (h->method == M_LZO1X_1) >+ r = lzo1x_1_compress(block1[i].mem, block1[i].len, block1[i+1].mem, &block1[i+1].len, wrkmem.mem); >+#if defined(USE_LZO1X_1_15) >+ else if (h->method == M_LZO1X_1_15) >+ r = lzo1x_1_15_compress(block1[i].mem, block1[i].len, >+ block1[i+1].mem, &block1[i+1].len, wrkmem.mem); >+#endif >+#if defined(USE_LZO1X_999) >+ else if (h->method == M_LZO1X_999) >+ r = lzo1x_999_compress_level(block1[i].mem, block1[i].len, >+ block1[i+1].mem, &block1[i+1].len, wrkmem.mem, >+ NULL, 0, 0, h->level); >+#endif >+ else >+ fatal(NULL,"Internal error"); >+ >+#if 0 >+ fprintf(stderr, "%ld %ld %ld\n", (long)src_len, (long)dst_len, (long)block2.size); >+#endif >+ assert(block1[i+1].len <= block1[i+1].size); >+ if (r != LZO_E_OK) >+ fatal(NULL,"Internal error - compression failed"); >+ >+ /* optimize */ >+ if (opt_optimize && block1[i+1].len < block1[i].len) >+ { >+ lzo_uint32 new_len = block1[i].len; >+ r = lzo1x_optimize(block1[i+1].mem, block1[i+1].len, block1[i].mem, &new_len, NULL); >+ if (r != LZO_E_OK || new_len != block1[i].len) >+ fatal(NULL,"Internal error - optimization failed"); >+ } >+ /* compute checksum of compressed block */ >+ if (block1[i+1].len < block1[i].len && (h->flags & F_ADLER32_C)) >+ { >+ block1[i+1].adler32 = lzo_adler32(ADLER32_INIT_VALUE,block1[i+1].mem,block1[i+1].len); >+ } >+ if (block1[i+1].len < block1[i].len && (h->flags & F_CRC32_C)) >+ { >+ block1[i+1].crc32 = lzo_crc32(CRC32_INIT_VALUE,block1[i+1].mem,block1[i+1].len); >+ } >+D(" lzo_compress_worker: %s\n", "setting ouput buffer filled... "); >+ pthread_mutex_lock(&buffer_mutex); >+ /* set flags on output buffer to full - I/O thread will reset flags on input buffer */ >+ >+ block1[i+1].flags &= ~BUFFER_BUSY; >+ block1[i+1].flags |= BUFFER_FILLED; >+ >+ pthread_mutex_unlock(&buffer_mutex); >+ } >+ mb_free(&wrkmem); >+D(" *****lzo_compress_worker: %s\n", "exiting!"); >+ return r; >+} >+ >+lzo_bool lzo_compress_reader(file_t *fip) >+{ >+ int i; >+ lzo_uint32 input_sequence = 0; >+ lzo_int l; >+ >+ for (;;) >+ { >+ /* troll for an empty input buffer */ >+D(" lzo_compress_reader: %s\n", "trolling for empty input buffer..."); >+ pthread_mutex_lock(&buffer_mutex); >+ for (i = 0; i < num_buffers; i+=2) >+ if (block1[i].flags & BUFFER_EMPTY) >+ break; >+ if (i != num_buffers) /* got one */ >+ { >+D(" lzo_compress_reader: %s\n", "Got one. Reading block."); >+ block1[i].flags &= ~BUFFER_EMPTY; >+ block1[i].flags |= BUFFER_BUSY; >+ /* save buffer sequence so we can keep proper order on output */ >+ block1[i].flags ^= (block1[i].flags & 0xFFFF0000); /* clear old buffer sequence */ >+ block1[i].flags |= (input_sequence << 16); /* store new buffer sequence */ >+ input_sequence++; >+ if (input_sequence > 0x0FFF) >+ input_sequence = 0; /* do same with output_sequence on output to keep in sync */ >+ >+ pthread_mutex_unlock(&buffer_mutex); /* unlock buffers for the read - if we block, others can work */ >+ >+ /* read a block */ >+ l = read_buf(fip, block1[i].mem, block_size); >+ block1[i].len = (l > 0 ? l : 0); >+ >+ pthread_mutex_lock(&buffer_mutex); >+ block1[i].flags &= ~BUFFER_BUSY; >+ block1[i].flags |= BUFFER_FILLED; >+ if (block1[i].len == 0) >+ { >+D(" lzo_compress_reader: %s\n", "EOF."); >+ block1[i].flags |= BUFFER_EOF; /* signal all waiting threads to give up */ >+ } >+ } >+ pthread_mutex_unlock(&buffer_mutex); >+ if (i == num_buffers) >+ { >+D(" lzo_compress_reader: %s\n", " no empty input buffers - sleeping."); >+ usleep(IO_SLEEP_USEC); >+ } >+ else if (block1[i].len == 0) >+ { >+D(" lzo_compress_reader: %s\n", " bailing.."); >+ break; >+ } >+ } >+ return 1; >+} >+ >+#endif /* ifdef USE_PTHREADS */ >+ > /************************************************************************* > // compress a file > **************************************************************************/ >@@ -289,14 +653,27 @@ > lzo_bool lzo_compress(file_t *fip, file_t *fop, const header_t *h) > { > int r = LZO_E_OK; >+#ifdef USE_PTHREADS >+ /* this still needs to work for calling the thread enabled version with only 1 thread */ >+ lzo_byte * const b1 = block1[0].mem; >+ lzo_byte * const b2 = block1[1].mem; >+ memblock_t wrkmem; >+#else > lzo_byte * const b1 = block1.mem; > lzo_byte * const b2 = block2.mem; >+#endif /* USE_PTHREADS */ > lzo_uint32 src_len = 0, dst_len = 0; > lzo_uint32 c_adler32 = ADLER32_INIT_VALUE, d_adler32 = ADLER32_INIT_VALUE; > lzo_uint32 c_crc32 = CRC32_INIT_VALUE, d_crc32 = CRC32_INIT_VALUE; > lzo_int l; > lzo_bool ok = 1; > >+#ifdef USE_PTHREADS >+ /* if parallelism is requested, call lzo_compress_parallel, else fall through to original code */ >+ if (num_threads > 1) >+ return lzo_compress_parallel(fip, fop, h); >+ r &= mb_alloc(&wrkmem, W, ALIGN_SIZE); /* allocate work memory on a per-thread basis */ >+#endif > for (;;) > { > /* read a block */ >@@ -338,7 +715,11 @@ > #if 0 > fprintf(stderr, "%ld %ld %ld\n", (long)src_len, (long)dst_len, (long)block2.size); > #endif >+#ifdef USE_PTHREADS >+ assert(dst_len <= block1[1].size); >+#else > assert(dst_len <= block2.size); >+#endif /* USE_PTHREADS */ > if (r != LZO_E_OK) > fatal(fip,"Internal error - compression failed"); > >@@ -382,6 +763,9 @@ > write_buf(fop,b1,src_len); > } > >+#ifdef USE_PTHREADS >+ mb_free(&wrkmem); >+#endif > return ok; > } > >@@ -399,7 +783,13 @@ > lzo_uint32 c_crc32 = CRC32_INIT_VALUE, d_crc32 = CRC32_INIT_VALUE; > lzo_bool ok = 1; > lzo_bool use_seek; >+#ifdef USE_PTHREADS >+ /* the below was &block2 (an output block) but now block1 is an array */ >+ /* of memblock_t, allocated in input-output-input-output... order. */ >+ memblock_t * const block = block1+1; >+#else > memblock_t * const block = &block2; >+#endif /* USE_PTHREADS */ > lzo_byte * b1; > lzo_byte * const b2 = block->mem; >
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Diff
View Attachment As Raw
Actions:
View
|
Diff
Attachments on
bug 189130
:
128304
| 128306