# Example of use of tqdm with logging and multiprocessing. # Written by Hildo Guillardi JĂșnior. # License MIT. import tqdm import logging from multiprocessing import Pool, Manager, Lock import time def main(): TOTAL=10 # Test ONE print('\n\n') progress = tqdm.tqdm(desc='P%', total=TOTAL, unit='unity', miniters=1) # Change the logging print channel to `tqdm` to keep the process bar to the end of terminal. class TqdmLoggingHandler(logging.Handler): '''Overload the class to write the logging through the `tqdm`.''' def __init__(self, level = logging.NOTSET): super(self.__class__, self).__init__(level) def emit(self, record): try: msg = self.format(record) tqdm.tqdm.write(msg) self.flush() except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) logger.addHandler(TqdmLoggingHandler()) logger.warning('Starting test of sequencial process...') for i in range(TOTAL): progress.update(1) logger.warning('Test {}'.format(i)) time.sleep(.2) logger.removeHandler(TqdmLoggingHandler()) del progress logger.warning('Finished test of sequencial process!') logger.warning('Doing other stuffs of my software not related with the process bar.') # Test TWO print('\n\n') progress = tqdm.tqdm(desc='P%', total=TOTAL, unit='unity', miniters=1) class TqdmLoggingHandler2(logging.Handler): '''Overload the class to write the logging through the `tqdm`.''' def __init__(self, level = logging.NOTSET): super(self.__class__, self).__init__(level) def emit(self, record): try: msg = self.format(record) tqdm.tqdm.write(msg) self.flush() except (KeyboardInterrupt, SystemExit): raise except: self.handleError(record) logger.addHandler(TqdmLoggingHandler2()) logger.warning('Starting test of parallel process...') p_thread = Pool(2)#int(round(TOTAL/2))) def update(x): progress.update(1) return x def my_thread(n): #print(n) logger.warning('My tread {}'.format(n)) return n**n #results = [p_thread.apply_async(my_thread, [i]) for i in range(TOTAL)] results = [p_thread.apply_async(my_thread, [i], callback=update) for i in range(TOTAL)] # Wait for all the processes finish. for r in results: while(not r.ready()): pass logger.warning('All parallels process finished with success.') p_thread.close() p_thread.join() #for r in results: # print(r.get()) # Print the results of `my_thread()`. logger.removeHandler(TqdmLoggingHandler2()) del progress logger.warning('Doing other stuffs of my software not related with the process bar.') if __name__ == '__main__': logger = logging.getLogger('my') main()