• 周三. 4月 17th, 2024

5G编程聚合网

5G时代下一个聚合的编程学习网

热门标签

Python用法速查@进程-线程-协程

admin

11月 28, 2021

进程

进程是正在运行程序的实例,一个程序可以有多个进程。

pid=os.fork()

作用:拷贝一份父进程(代码段,堆栈段,数据段)的数据来创建一个子进程。
返回:父进程返回子进程进程号,子进程返回0。
特点:子进程父进程相互独立,不在共享任何数据。

#以下代码在Linux下执行,window下没有fork函数
import os

pid = os.fork()

if pid < 0:
    print 'Fail to create process'
elif pid == 0:
    print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid()) #os.getpid()获得当前(子)进程号,os.getppid()获得父进程号
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

#结果
I (86645) just created a child process (86646).
I am child process (86646) and my parent is (86645).

下验证了子进程从pid=os.fork()之后开始执行


[root@VM-0-15-centos ~]# cat tt.py
import os

print('start fork...')
pid = os.fork()
print('fork finish..')


if pid < 0:
    print('create False.')
elif pid == 0:
    print('I am child_proc [%d]' % (os.getpid()))
else:
    print('I am father_proc [%d]' % (os.getpid()))




[root@VM-0-15-centos ~]#
[root@VM-0-15-centos ~]# python3 tt.py
start fork...
fork finish..
I am father_proc [25188]
fork finish..
I am child_proc [25189]
[root@VM-0-15-centos ~]#

多进程

p=Process(target=child_proc, args=(x,))

import os
from multiprocessing import Process

# 子进程要执行的代码
def child_proc(name):
    print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__ == '__main__':
    print 'Parent process %s.' % os.getpid()
    p = Process(target=child_proc, args=('test',))
    print 'Process will start.'
    p.start()
    p.join() #阻塞所有其他进程(包括父进程)
    print 'Process end.'
#结果
Parent process 7170.
Process will start.
Run child process test (10075)...
Process end.

from multiprocessing import Process’

multiprocessing与平台有关,同一段代码:

import random
import os
from multiprocessing import Process

num = random.randint(0, 100)

def show_num():
    print("pid:{}, num is {}".format(os.getpid(), num))

if __name__ == "__main__":
    print("pid:{}, num is {}".format(os.getpid(), num))
    p = Process(target=show_num) #创建新进程
    p.start()
    p.join()

windows

pid:6504, num is 25
pid:6880, num is 6

Linux

pid:11747, num is 13
pid:11748, num is 13

我们发现window下num结果竟然不一样???
个人理解:
windows没有fork(),就不能像Linux那样让子进程从被创建的哪段代码之后开始执行。

spawn/fork/forkserver

这是因为根据系统平台的不同multiprocessing提供了3种启动进程的方式

spawn

父进程会启动一个新的解释器,子进程只会继承run()所需的资源。 不必要的文件描述符和句柄(一种指针)不会被继承。 该方法和fork,forkserver相比,启动进程较慢。

可在Unix和Windows上使用。 Windows上的默认设置。

fork

父进程使用 os.fork() 来产生 Python 解释器分叉。 子进程在开始时实际上与父进程相同,并且会继承父进程的所有资源。 多线程的安全是有问题的。

只能在Unix上使用。Unix上的默认设置。

forkserver

程序启动并选择forkserver启动方法时,将启动一个服务器进程。 之后每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。 分叉服务器进程是单线程的,因此使用 os.fork() 是安全的。 没有不必要的资源被继承。

可在Unix平台上使用,并支持通过Unix管道传递文件描述符。

进程池

可以使用进程池来创建多个进程

p=Pool(n)

p.apply_async(chile_func, args=(a,))

import os, time
from multiprocessing import Pool

def foo(x):
    print 'Run task %s (pid:%s)...' % (x, os.getpid())
    time.sleep(2)
    print 'Task %s result is: %s' % (x, x * x)

if __name__ == '__main__':
    print 'Parent process %s.' % os.getpid()
    p = Pool(4)         # 设置进程数
    for i in range(5):
        p.apply_async(foo, args=(i,))    # 设置每个进程要执行的函数和参数
    print 'Waiting for all subprocesses done...'
    p.close() #关闭进程池
    p.join()  #等待所有子进程执行完毕
    print 'All subprocesses done.'

#结果
Parent process 7170.
Run task 1 (pid:10320)...
Run task 0 (pid:10319)...
Run task 3 (pid:10322)...
Run task 2 (pid:10321)...
Waiting for all subprocesses done...
Task 1 result is: 1
Task 0 result is: 0
Run task 4 (pid:10320)...
Task 3 result is: 9
Task 2 result is: 4
Task 4 result is: 16
All subprocesses done.

进程通信

q=Queue()

p1=Processing(func_1, args=(q,))/q.put(message)

p2=Processing(func_2, args=(q,))/q.get(message)

# -*- coding: utf-8 -*-

from multiprocessing import Process, Queue

# 向队列中写入数据
def write_task(q):
    try:
        n = 1
        while n < 5:
            print "write, %d" % n
            q.put(n)
            time.sleep(1)
            n += 1
    except BaseException:
        print "write_task error"
    finally:
        print "write_task end"

# 从队列读取数据
def read_task(q):
    try:
        n = 1
        while n < 5:
            print "read, %d" % q.get()
            time.sleep(1)
            n += 1
    except BaseException:
        print "read_task error"
    finally:
        print "read_task end"

if __name__ == "__main__":
    q = Queue()  # 父进程创建Queue,并传给各个子进程

    pw = Process(target=write_task, args=(q,))
    pr = Process(target=read_task, args=(q,))

    pw.start()   # 启动子进程 pw,写入
    pr.start()   # 启动子进程 pr,读取
    pw.join()    # 等待 pw 结束
    pr.join()    # 等待 pr 结束
    print "DONE"
	
	
	
#结果
write, 1
read, 1
write, 2
read, 2
write, 3
read, 3
write, 4
read, 4
write_task end
read_task end
DONE

线程

线程是进程的一个实例,一个进程至少有一个线程。
区分
进程(CPU分配资源的基本单位)之间相互独立,不共享数据,一个崩溃不影响其他进程。
多个线程(CPU调度分派的基本单位)共享同一个进程的数据,一个崩溃全部崩溃。

t=Thread(child_thre, args=(a, ), name=’xxx’)

from threading import Thread, current_thread

def hello(message):
    print('%s, I am %s' % (message, current_thread().name))
    print('bye~')

if __name__ == '__main__':
    message='HELLO'
    print('%s, I AM %s' % (message, current_thread().name))
    t = Thread(target=hello, args=('hello',), name='child_thread')
    t.start()
    t.join()
    print('BYE~')
	
	
#结果
D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
HELLO, I AM MainThread
hello, I am child_thread
bye~
BYE~

Process finished with exit code 0

l = Lock()

l.acquire()

l.release()

线程共享进程的数据,就会遇到线程同步的问题

from threading import Thread, current_thread

count = 0


def calc():
    global count
    print('Hello, this is %s. ' % current_thread().name)
    for _ in range(1000000):  #每个线程对共享数据count做一百万次加一操作
        count += 1

    print('thread %s is finish.' % current_thread().name)


if __name__ == '__main__':
    print('HELLO, THIS IS %s' % current_thread().name)
    t_list = []

    for i in range(5):
        t_list.append(Thread(target=calc)) #创建5个线程
        t_list[i].start()

    for i in t_list:
        i.join()

    print('count = %s' % count)
    print('THREAD %s FINISH.' % current_thread().name)

#结果
D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
HELLO, THIS IS MainThread
Hello, this is Thread-1. 
Hello, this is Thread-2. 
Hello, this is Thread-3. 
thread Thread-1 is finish.
thread Thread-2 is finish.
Hello, this is Thread-4.
thread Thread-3 is finish.
Hello, this is Thread-5. 
thread Thread-4 is finish.
thread Thread-5 is finish.
count = 3567886
THREAD MainThread FINISH.

Process finished with exit code 0

结果为什么不是5000000?
count在被一个线程读走还未完成加一操作,又被另一个线程读走,导致读脏数据。
我还发现一个有趣的现象:
在def calc()中打印每次count做加一操作后的值,当循环次数比较小如十万时。数字不是递增1有的会差几十,但最终count的值却是正确的(五十万)

我们可用锁解决这个问题

import time
from threading import Thread, current_thread, Lock
start = time.time()

count = 0
lock = Lock()

def calc():
    global count
    print('Hello, this is %s. ' % current_thread().name)
    for _ in range(1000000):
        lock.acquire() #获得锁
        count += 1
        lock.release() #释放锁

    print('thread %s is finish.' % current_thread().name)


if __name__ == '__main__':
    print('HELLO, THIS IS %s' % current_thread().name)
    t_list = []

    for i in range(5):
        t_list.append(Thread(target=calc))
        t_list[i].start()

    for i in t_list:
        i.join()

    print('count = %s' % count)
    print('THREAD %s FINISH.' % current_thread().name)

end = time.time()
print('Running time:%d' % (end-start))

# 结果
D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
HELLO, THIS IS MainThread
Hello, this is Thread-1. 
Hello, this is Thread-2. 
Hello, this is Thread-3. 
Hello, this is Thread-4. 
Hello, this is Thread-5. 
thread Thread-1 is finish.
thread Thread-2 is finish.
thread Thread-3 is finish.
thread Thread-4 is finish.thread Thread-5 is finish.

count = 5000000
THREAD MainThread FINISH.
Running time:13 #明显感觉到执行时间变长了,13秒!!!,不加锁是0秒(取整)。

Process finished with exit code 0

GIL info

讲到 Python 中的多线程,就不得不面对 GIL 锁,GIL 锁的存在导致 Python 不能有效地使用多线程实现多核任务,因为在同一时间,只能有一个线程在运行。

GIL 全称是 Global Interpreter Lock,译为全局解释锁。早期的 Python 为了支持多线程,引入了 GIL 锁,用于解决多线程之间数据共享和同步的问题。但这种实现方式后来被发现是非常低效的,当大家试图去除 GIL 的时候,却发现大量库代码已重度依赖 GIL,由于各种各样的历史原因,GIL 锁就一直保留到现在。

global_data = local()

我们知道多个线程共享一个数据,如何让线程拥有自己独有的数据?,有两种不完美的方法:
1.在函数中定义局部变量,但不便于修改。

from threading import Thread, current_thread

def echo(num):
    print current_thread().name, num

def calc():
    print 'thread %s is running...' % current_thread().name
    local_num = 0
    for _ in xrange(10000):
        local_num += 1
    echo(local_num)
    print 'thread %s ended.' % current_thread().name

if __name__ == '__main__':
    print 'thread %s is running...' % current_thread().name

    threads = []
    for i in range(5):
        threads.append(Thread(target=calc))
        threads[i].start()
    for i in range(5):
        threads[i].join()

    print 'thread %s ended.' % current_thread().name
# 结果 很显然确实互不影响
thread MainThread is running...
thread Thread-4 is running...
Thread-4 10000
thread Thread-4 ended.
thread Thread-5 is running...
Thread-5 10000
thread Thread-5 ended.
thread Thread-6 is running...
Thread-6 10000
thread Thread-6 ended.
thread Thread-7 is running...
Thread-7 10000
thread Thread-7 ended.
thread Thread-8 is running...
Thread-8 10000
thread Thread-8 ended.
thread MainThread end

2.把线程独有数据存在同一个dict{key=current_thread,value=data}中,却又使得每个线程都可访问该数据,非常不安全。

from threading import Thread, current_thread

global_dict = {}

def echo():
    num = global_dict[current_thread()]    # 线程根据自己的 ID 获取数据
    print current_thread().name, num

def calc():
    print 'thread %s is running...' % current_thread().name
    
    global_dict[current_thread()] = 0
    for _ in xrange(10000):
        global_dict[current_thread()] += 1
    echo()
    
    print 'thread %s ended.' % current_thread().name

if __name__ == '__main__':
    print 'thread %s is running...' % current_thread().name

    threads = []
    for i in range(5):
        threads.append(Thread(target=calc))
        threads[i].start()
    for i in range(5):
        threads[i].join()

    print 'thread %s ended.' % current_thread().name

# 结果
thread MainThread is running...
thread Thread-64 is running...
thread Thread-65 is running...
thread Thread-66 is running...
thread Thread-67 is running...
thread Thread-68 is running...
Thread-67 10000
thread Thread-67 ended.
Thread-65 10000
thread Thread-65 ended.
Thread-68 10000
thread Thread-68 ended.
Thread-66 10000
thread Thread-66 ended.
Thread-64 10000
thread Thread-64 ended.
thread MainThread ended.


local()应运而生!!!

global_data.num = 0

local()提供了一种线程绑定自己独有数据的方式。

from threading import Thread, current_thread, local

global_data = local()

def echo():
    num = global_data.num
    print current_thread().name, num

def calc():
    print 'thread %s is running...' % current_thread().name
    
    global_data.num = 0 #对于不同的线程global_data.num的值是不同的
    for _ in xrange(10000):
        global_data.num += 1
    echo()
    
    print 'thread %s ended.' % current_thread().name

if __name__ == '__main__':
    print 'thread %s is running...' % current_thread().name

    threads = []
    for i in range(5):
        threads.append(Thread(target=calc))
        threads[i].start()
    for i in range(5):
        threads[i].join()

    print 'thread %s ended.' % current_thread().name

# 结果

thread MainThread is running...
thread Thread-94 is running...
thread Thread-95 is running...
thread Thread-96 is running...
thread Thread-97 is running...
thread Thread-98 is running...
Thread-96 10000
thread Thread-96 ended.
Thread-97 10000
thread Thread-97 ended.
Thread-95 10000
thread Thread-95 ended.
Thread-98 10000
thread Thread-98 ended.
Thread-94 10000
thread Thread-94 ended.
thread MainThread ended.


协程

一次函数调用,只有一个入口,一次返回,调用顺序明确。
1.协程有多个入口对函数进行中断、继续执行
2.相对于多线程,它只在一个线程中执行,避免了线程间的切换开销、访问冲突。
可通过创建协程将异步编程同步化
技术支持
python2 yield
python3.5 async/await
第三方库gevent

n = yield data

Python2 可以通过 yield 来实现基本的协程,但不够强大,第三方库 gevent 对协程提供了强大的支持。另外,Python3.5 提供了 async/await 语法来实现对协程的支持。本文只讨论通过 yield 来实现协程。

#生产者消费者问题

import time

def consumer(): #带有yild的函数被叫做生成器generator
    message = ''
    while True:
        n = yield message     # yield 使函数中断
        if not n:
            return
        print '[CONSUMER] Consuming %s...' % n
        time.sleep(2)
        message = '200 OK'

def produce(c):
    c.next()           # 启动生成器 ~~我理解为生成器被中断后next下一个开始执行的位置~~
	               # python3 用 c.__next__()
    n = 0
    while n < 5:
        n = n + 1
        print '[PRODUCER] Producing %s...' % n
        r = c.send(n)  # 通过 send 切换到 consumer 执行
        print '[PRODUCER] Consumer return: %s' % r
    c.close()

if __name__ == '__main__':
    c = consumer()
    produce(c)

# 结果
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK


#声音太小的女演员

def actress():
    message = '[旁白]:actress is 表演中...'
    while True:
        n = yield message  # 中断
        print(n)
        if not n:
            return
        message = '[actress]:Why stout me again ?'


def diractor(a):
    a.__next__()  # 中断后下一步
    message = '[diractor]:You should Shout louder !!!'
    for i in range(5):
        response = a.send(message)
        print(response)

    a.close()


if __name__ == '__main__':
    a = actress()
    diractor(a)

#结果
D:pythonpython.exe E:/PYTHON/Basics/Fun/HelloYoutube.py
[diractor]:You should Shout louder !!!
[actress]:Why stop me again ?
[diractor]:You should Shout louder !!!
[actress]:Why stop me again ?
[diractor]:You should Shout louder !!!
[actress]:Why stop me again ?
[diractor]:You should Shout louder !!!
[actress]:Why stop me again ?
[diractor]:You should Shout louder !!!
[actress]:Why stop me again ?

Process finished with exit code 0

怎么有了tcp的感觉(以下是我的胡扯)?

客户端启动 终端等待服务端回复=> n = yield message
客户端唤醒服务端 启动=> a.__next__()

服务端 发=> send(message)
服务端 收=> response = send(message)

客户端 发=> yield message
客户端 收=> n = yield message

参考

explore-python/Process-Thread-coroutine


________________________________________________________


Every good deed you do will someday come back to you.


Love you,love word !

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注