Python多段程和多过程(五) 多段程管理方法&m

摘要:Q1:最先,为何必须进程池?由于大家期待可以维持一定总数的高并发进程处在实行情况,让处在实行情况的进程数不会太少都不对于过多,提升每日任务实行高效率。Q2:数据信号量...

Q1:最先,为何必须进程池?
由于大家期待可以维持一定总数的高并发进程处在实行情况,让处在实行情况的进程数不会太少都不对于过多,提升每日任务实行高效率。

Q2:数据信号量 semaphore 还可以完成维持一定总数的高并发进程,为何也要用进程池?
1. 由于进程池能够监管每个进程和每日任务的实行情况,能够获得某一每日任务的回到值,而数据信号量不好。

2. 进程池中的每一个进程进行一个每日任务后不容易消毁只是会再次实行下一个每日任务,做到进程重复使用的实际效果,而数据信号量不好。要了解进程是珍贵的資源,随便的建立和完毕进程是对資源的一种消耗,经常的建立和关掉进程也会减少高效率。
因此,进程池才算是多段程的最好实践活动方法!

 

进程池的应用方式:

from concurrent.future import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers)  # 建立进程池目标,max_workers特定能够同时处在运作情况的进程数
task = executor.submit(fn,args)  # 将一个每日任务放进每日任务序列,而且start()一个进程,并让这一进程从序列取下和进行fn这一每日任务,args是fn每日任务涵数的主要参数;回到的task是一个Future目标
task.done()     # 分辨每日任务是不是进行
task.cancel()   # 撤销每日任务,仅有每日任务还未刚开始实行才能够撤销
task.result()   # 获得每日任务的結果(回到值),这一方式是堵塞的

 

下边剖析一下 ThreadPoolExecutor类和Future类 的源代码Future源代码:
 

class Future(object):
    """Represents the result of putation."""
    # 汉语翻译:Future是一个储存着多线程每日任务的实行結果和运作情况的器皿;自然Future不仅仅一个器皿,还承担一个每日任务的結果获得,撤销,分辨是不是进行,出现异常设定等作用。一个Future相匹配着一个每日任务(每日任务涵数)的結果和情况。
    def __init__(self):
        """Initializes the future. Should not be called by clients."""
        # 汉语翻译:Future目标不可以在顾客端脚本制作案例化,只有在ThreadPoolExecutor那样的python內部编码中案例化
        
        self._condition = threading.Condition()     # 建立了一个标准自变量
        self._state = PENDING                       # 每日任务情况默认设置是“已经实行”
        self._result = None                         # 每日任务結果默认设置为空,由于每日任务还没有刚开始实行或是已经实行
        self._exception = None              
        self._waiters = []                          
        self._done_callbacks = []                   
    def _invoke_callbacks(self):
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)
    # 用以撤销实行某一个每日任务
    def cancel(self):
        """Cancel the future if possible.
        Returns True if the future was cancelled, False otherwise. A future
        cannot be cancelled if it is running or pleted.
        """
        # 汉语翻译: 不可以撤销一个已经实行或已实行完的每日任务。只有撤销还未实行的每日任务。什么叫未实行的每日任务?进程池ThreadPoolExecutor限制了能同时处在实行情况的进程数n,但往进程池中加上的每日任务数m假如超出了n,m个每日任务中仅有n个能够同时实行,那麼m-n那一部分每日任务必须等候前边的每日任务实行完才可以刚开始实行,这种便是还未实行的每日任务。
        
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True
            self._state = CANCELLED
            self._condition.notify_all()    # 撤销每日任务的情况下,会唤起 get_result()中标准自变量的wait(),目地是以便告知get_result说:“无须等候已撤销的每日任务的結果了”
        self._invoke_callbacks()
        return True
    # 分辨这一Future每日任务是不是处在“已被撤销”情况
    def cancelled(self):
        """Return True if the future was cancelled."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
    # 分辨这一Future每日任务是不是处在“已经运作”情况
    def running(self):
        """Return True if the future is currently executing."""
        with self._condition:
            return self._state == RUNNING
    # 分辨这一Future每日任务是不是处在“实行结束”情况
    def done(self):
        """Return True of the future was cancelled or finished executing."""
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
    # 获得每日任务涵数的回到值
    def __get_result(self):
        if self._exception:
            raise self._exception
        else:
            return self._result
    # 加上每日任务涵数实行后应实行的涵数
    def add_done_callback(self, fn):
        """Attaches a callable that will be called when the future finishes.
        Args:
            fn: A callable that will be called with this future as its only
                argument when pletes or is cancelled. The callable
                will always be called by a thread in the same process in which
                it was added. If the future pleted or been
                cancelled then the callable will be called immediately. These
                callables are called in the order that they were added.
        """
        # 汉语翻译:该方式用以加上一个涵数,这一涵数会在每日任务涵数进行或是被撤销的情况下实行。涵数会被储放到_done_callbacks这一目录中。因此能够为一个每日任务涵数加上好几个那样的涵数。这种_done_callbacks中的涵数的实行次序是依照存进目录的次序实行的。
        
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        fn(self)

    # 获得每日任务实行的結果,因为应用了_self.condition.wait()方式,因此是个堵塞的方式。     def result(self, timeout=None):         """Return the result of the call that the future represents.         Args:             timeout: The number of seconds to wait for the result if the future                 isn't done. If None, then there is no limit on the wait time.         Returns:             The result of the call that the future represents.         Raises:             CancelledError: If the future was cancelled.             TimeoutError: If the future didn't finish executing before the given                 timeout.             Exception: If the call raised then that exception will be raised.         """         # 汉语翻译:获得Future每日任务涵数的回到值。因为要获得每日任务回到值,因此毫无疑问要等每日任务实行完才可以获得回到值,因此result()方式会堵塞等候Future目标中的每日任务涵数实行完才可以被唤起。                  with self._condition:             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:                 raise CancelledError()      # 假如对已撤销的每日任务获得回到值結果会出错             elif self._state == FINISHED:   # 假如对已实行结束的每日任务获得結果则不用等候,立即回到結果                 return self.__get_result()             self._condition.wait(timeout)   # 假如对已经实行的每日任务或是还未实行的每日任务获得結果,则必须等候,直到每日任务完毕了才可以获得結果             # 这时 wait() 被唤起,可是仍要再分辨一次每日任务的情况,由于这一每日任务将会是还未实行的每日任务被撤销了,撤销每日任务是也会notify()唤起wait()             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:                 raise CancelledError()             elif self._state == FINISHED:                 return self.__get_result()             else:                 raise TimeoutError()     # 用以获得每日任务实行时报的出现异常     def exception(self, timeout=None):         """Return the exception raised by the call that the future represents.         Args:             timeout: The number of seconds to wait for the exception if the                 future isn't done. If None, then there is no limit on the wait                 time.         Returns:             The exception raised by the call that the future represents or None             if pleted without raising.         Raises:             CancelledError: If the future was cancelled.             TimeoutError: If the future didn't finish executing before the given                 timeout.         """         with self._condition:             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:                 raise CancelledError()             elif self._state == FINISHED:                 return self._exception             self._condition.wait(timeout)             if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:                 raise CancelledError()             elif self._state == FINISHED:                 return self._exception             else:                 raise TimeoutError()     # 设定每日任务結果     def set_result(self, result):         """Sets the return value of work associated with the future.         Should only be used by Executor implementations and unit tests.         """         # 这一方式只有在每日任务实行结束时线上程池的进程中启用,而不可以在主进程中启用                  with self._condition:             self._result = result       # 将每日任务結果存到目标特性中             self._state = FINISHED      # 将每日任务情况改成完毕             for waiter in self._waiters:                 waiter.add_result(self)             self._condition.notify_all()    # 唤起和通告 result()中的wait(),说:“每日任务涵数的結果早已取得了,能够把結果回到给主进程了”         self._invoke_callbacks()    # 实行每日任务涵数进行后应实行的涵数     def set_exception(self, exception):         """Sets the result of the future as being the given exception.         Should only be used by Executor implementations and unit tests.         """         with self._condition:             self._exception = exception             self._state = FINISHED             for waiter in self._waiters:                 waiter.add_exception(self)             self._condition.notify_all()         self._invoke_callbacks()         


       
PS:像cancel,done,add_done_callback,get_result,set_result等方式全是在 with self._condition 下实行的,寓意着这种实际操作全是加锁实行的,因此Future是进程安全性的。

小结:
1.Future目标是用以进行:每日任务的結果设定,結果获得,撤销实行,监管每日任务情况的一个目标,而且会将每日任务的情况和結果储存到目标组员自变量中。
2.Future目标中,怎样完成在主进程获得在别的进程中多线程实行的每日任务的結果?答:应用标准自变量完成,在每日任务没实行完的情况下堵塞主进程,在每日任务实行完后唤起主进程并回到結果;自然假如每日任务早已进行的状况下来获得結果则不容易堵塞。
3.Future不储存每日任务涵数自身,在Future的__init__中并沒有组员自变量是用于储存每日任务涵数,都没有看到有每日任务涵数传到__init__中。
4.Future不辜负责实行每日任务涵数
 

_WorkItem源代码:

class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):       # 接纳一个Future目标,fn 每日任务涵数,args 每日任务涵数的主要参数
        self.future = future    # 每日任务涵数的結果和情况的储存器皿
        self.fn = fn            # 每日任务涵数
        self.args = args
        self.kwargs = kwargs
    # 实行每日任务涵数,并将每日任务的結果储存到future目标中。这一run方式是在 _WorkItem 的 _worker方式中启用的
    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return
        try:
            result = self.fn(*self.args, **self.kwargs)     # 实行每日任务涵数
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc'
            self = None
        else:   
            self.future.set_result(result)                  # 将每日任务的結果储存到future目标中
# 用以从每日任务序列中弹出来每日任务器皿 _WorkItem 目标,而且实行每日任务器皿的 run() 方式。 该方式是在 ThreadPoolExecutor 的 _adjust_thread_count 方式中启用
def _worker(executor_reference, work_queue):
    try:
        while True:         # 这儿是一个死循环系统,表明进程池的进程也不是停从每日任务序列取每日任务实行的
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()         # work_item便是 _WorkItem 目标自身,run()功效便是运作每日任务涵数
                # Delete references to object. See issue16284
                del work_item           # 每日任务实行完以后,消毁 每日任务器皿目标
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Notice other workers
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', =True)

 

小结: _WorkItem储放着一个每日任务涵数和这一每日任务相匹配的Future目标,_WorkItem承担实行每日任务涵数,并将每日任务涵数的結果放进Future中。
_WrokItem实质是一个器皿,能够觉得它便是一个每日任务(目标)。


ThreadPoolExecutor 进程池 源代码:
 

class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None, thread_name_prefix=''):
        if max_workers is None:
            max_workers = (os.cpu_count() or 1) * 5     # 默认设置较大的高并发进程是CPU核数的5倍
        if max_workers = 0:
            raise ValueError("max_workers must be greater than 0")
        self._max_workers = max_workers     # 较大高并发进程数
        self._work_queue = queue.Queue()    # 每日任务序列,用以储放每日任务器皿 _WorkItem
        self._threads = set()               # 进程池,用以储放已经运作的进程
        self._shutdown = False              # 标识,用以标识进程池是不是已关掉
        self._shutdown_lock = threading.Lock()  # 互斥锁,用以确保进程加上到 self._threads结合中是进程安全性的
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))    # 用以转化成进程名字的作为前缀
    # 递交一个每日任务到进程池中
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            f = _base.Future()      # 转化成一个Future目标
            w = _WorkItem(f, fn, args, kwargs)  # 将每日任务涵数和Future目标都放进 _WorkItem器皿中
            self._work_queue.put(w)     # 将 _WorkItem 每日任务器皿放进每日任务序列中
            self._adjust_thread_count()     # 为当今每日任务建立一个进程,而且起动进程运作每日任务
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):         # When the executor gets lost, the weakref callback will wake up         # the worker threads.         def weakref_cb(_, q=self._work_queue):             q.put(None)         # TODO(bquinlan): Should avoid creating new threads if there are more         # idle threads than items in the work queue.                  num_threads = len(self._threads)         if num_threads self._max_workers:     # 分辨已建立的进程数是不是超出限定,假如超出限定则不做一切事儿             thread_name = '%s_%d' % (self._thread_name_prefix or self,                                      num_threads)                                                   # 这方面很重要,建立一个进程,而且,进程的总体目标涵数并不是每日任务涵数,只是 _WorkItem中的_worker方式             # _worker方式做的事儿是:不断的从每日任务序列 self._work_queue 中取下每日任务实行             # 因此, ThreadPoolExecutor进程池并不是为一个每日任务转化成一个进程,只是老先生成一定量分析的进程,让这种进程不断的从每日任务序列取每日任务实行,也就是说,这种进程不容易被消毁,只是能够重复使用。             t = threading.Thread(name=thread_name, target=_worker,                                  args=(weakref.ref(self, weakref_cb),                                        self._work_queue))             t.daemon = True     # 将进程设成守卫进程             t.start()           # 运作进程             self._threads.add(t)             _threads_queues[t] = self._work_queue     # 这一方式也太重要:用以关掉一个进程池。他的方式较为独特:当生产制造者已不往每日任务序列转化成商品了,便可以启用这一shutdown方式,这时会将_shutdown标示设成True,而且往每日任务序列中加上一个None,而加上了这一None便是重要。     # 由于当生产制造者转化成结束的情况下,消費者不一定消費结束,而当消費者进程消費结束全部每日任务的情况下,假如每日任务序列为空,消費者进程会被序列堵塞。这一情况下全部进程不可以完毕。可是假如我还在每日任务序列最终put一个None,当消費者进程取得这一None的情况下能够做下分辨,并跳出来while True循环系统进而完毕进程,让进程获得释放出来。     def shutdown(self, wait=True):         with self._shutdown_lock:             self._shutdown = True             self._work_queue.put(None)      # 往每日任务序列中加上一个None,它是关掉进程的重要         if wait:             for t in self._threads:                 t.join()     shutdown.__doc__ = _base.Executor.shutdown.__doc_     


PS:shutdown的启用机会:当生产制造者将全部商品放进每日任务序列,已不会生产制造新产品时启用。
启用这一方式的情况下,每日任务将会还没有消費完,可是每日任务一定早已生产制造完。
这一方式用以当全部每日任务消費结束后,通告进程池完毕全部进程,防止进程处在堵塞情况。
启用shutdown的情况下沒有通告进程池完毕全部进程,只是每日任务消費结束曾任务序列中的None原素会通告进程池完毕全部进程
   


学习培训多段程重要并不是学如何用python的多段程涵数和类,只是掌握其基本原理和设计方案方式,那样自身也可以设计方案出较为好的多段程实体模型。


上边掌握了进程池的设计方案方式和基本原理,接下去大家用进程池来仿真模拟完成一个简易的网络爬虫:

from threading import Semaphore,Thread
import threading
from time import sleep
from concurrent.futures import ThreadPoolExecutor
# 用以获得一页目录页的url
def get_url(current_page=1,per_rows=10):
    sleep(0.5)    # 仿真模拟抓取
    urls = []
    for i in range(1,per_rows+1):
        id = (current_page-1)*per_rows+i
        " % id
        urls.append(url)
    print("%s 抓取目录页 %d 取得成功" % (threading.current_thread().getName(),current_page))
    return urls
# 用以抓取一个
def get_detail(url):
    sleep(0.2)  # 仿真模拟抓取
    print("%s 抓取连接 %s 取得成功" % (threading.current_thread().getName(),url))

    for page in range(1,pages+1):         future = pool.submit(get_url,page)   # 加上到每日任务序列中,由进程取下每日任务并实行每日任务         results.append(future)     # 获得結果     for future in results:         result_urls = future.result()   # 堵塞获得結果         # 获得到url后实行         for url in result_urls:             pool.submit(get_detail,url)  # 因为抓取


結果以下:
ThreadPoolExecutor-0_0 抓取目录页 1 取得成功
ThreadPoolExecutor-0_2 抓取目录页 3 取得成功
ThreadPoolExecutor-0_1 抓取目录页 2 取得成功
ThreadPoolExecutor-0_3 抓取目录页 4 取得成功
ThreadPoolExecutor-0_7 抓取目录页 8 取得成功
ThreadPoolExecutor-0_5 抓取目录页 6 取得成功
ThreadPoolExecutor-0_9 抓取目录页 10 取得成功
ThreadPoolExecutor-0_6 抓取目录页 7 取得成功
ThreadPoolExecutor-0_8 抓取目录页 9 取得成功
ThreadPoolExecutor-0_4 抓取目录页 5 取得成功
取得成功
取得成功
取得成功
取得成功
取得成功
取得成功
取得成功
取得成功

.......

能看出,在程序中,我submit()好多好多次每日任务,可是結果显示信息,建立的进程仅有ThreadPoolExecutor-0_0~9 这10个进程。表明,进程并不是实行完以后就消毁随后又转化成新进程,只是能够重复使用进程实行好几个每日任务,让一个进程不断地从每日任务序列里边取下每日任务实行。
 

可使用map方式简单化上边的启用:

if __name__=="__main__":
    # 建立一个进程池
    pool = ThreadPoolExecutor()
    # 先抓取目录页
    pages = 100
    for result in pool.map(get_url,list(range(pages))):
        # result便是
        for url in result:
            pool.submit(get_detail,url)
            


那样启用和上边的实际效果是一样的。
map()方式是进程池的方式,第一参是每日任务涵数,第二参是每日任务涵数的主要参数,是一个目录,目录有是多少个原素就表明要实行是多少个每日任务涵数。map()內部会实行一个for循环系统启用submit()递交和实行每日任务,随后会对每个submit()回到的future目标启用result()获得結果。map內部应用了yield转化成器。每每一个每日任务回到了断果便会取值给result自变量。

因此 for result in pool.map(get_url,list(range(pages))) 中的 result 立即便是每日任务涵数的回到值,而并不是future目标。下边贴出来 map() 的源代码:

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        if timeout is not None:
            end_time = timeout + time.monotonic()
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        def result_iterator():
            try:
                # reverse to keep finishing order
                fs.reverse()
                while fs:
                    # Careful not to keep a reference to the popped future
                    if timeout is None:
                        yield fs.pop().result()     # 堵塞
                    else:
                        yield fs.pop().result(end_time - time.monotonic())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()


plete() 启用: 

if __name__=="__main__":
    # 建立一个进程池
    pool = ThreadPoolExecutor()
    # 先抓取目录页
    pages = 100
    futures = [pool.submit(get_url,page) for page in range(pages)]
    for future pleted(futures):
        urls=future.result()
        for url in urls:
            pool.submit(get_detail,url)

pleted()pleted()会堵塞直至有某一每日任务进行了,就将这一future赋给for循环系统中,而这时future.result()不容易堵塞。能进到到for编码块中的future全是早已启用过_set_result()的future,全是早已实行结束的每日任务,这时future.result()是不用等候的。

举个案子:我想进行 A B C D E 五个每日任务,五个每日任务各自必须5,4,3,2,一秒。我submit的次序是 A B C D E 
这时,1s后E的future先获得,for循环系统了一次,随后进到堵塞。又已过1s后,获得D的future,for又循环系统了一次。为此类推,进行这一for循环系统要花5秒


上边这种启用方式還是有一个小缺点:
他只有抓取完全部目录页才可以刚开始爬

# 用以抓取一个
def get_detail(url):
    sleep(0.2)  # 仿真模拟抓取
    print("%s 抓取连接 %s 取得成功" % (threading.current_thread().getName(),url))

        id = (current_page-1)*per_rows+i         " % id         pool.submit(get_detail,url)         # urls.append(url)     print("%s 抓取目录页 %d 取得成功" % (threading.current_thread().getName(),current_page)) if __name__=="__main__":      # 建立一个进程池     pool = ThreadPoolExecutor()     # 先抓取目录页     pages = 100     futures=[]     for page in range(1,pages+1):         future = pool.submit(get_url,pool,page)         futures.append(future)     for future in futures:         future.result()      # 这时get_url是沒有回到值的,这儿获得每日任务結果的目地是等候全部get_url()每日任务的进行,不然会因为主进程的完毕而造成别的进程没实行完就完毕(由于别的进程是以守卫进程的方式存有的)。自然这儿只确保了目录页每日任务的进行,不可以确保

 

张柏沛IT技术性blog > Python多段程和多过程(五) 多段程管理方法 进程池

点一下拷贝转截该一篇文章



联系我们

全国服务热线:4000-399-000 公司邮箱:343111187@qq.com

  工作日 9:00-18:00

关注我们

官网公众号

官网公众号

Copyright?2020 广州凡科互联网科技股份有限公司 版权所有 粤ICP备10235580号 客服热线 18720358503

技术支持:凡科小程序