分类目录归档:python

python入门与提高::::::演道网python专栏提供一线python研发人员在学习工作中的经验,减少大家走的弯路,大量源码可以直接使用。

埃拉托斯特尼筛法求素数

算法

要得到自然数n以内的全部素数,必须把不大于 的所有素数的倍数剔除,剩下的就是素数。 [1]
给出要筛数值的范围n,找出以内的素数。先用2去筛,即把2留下,把2的倍数剔除掉;再用下一个质数,也就是3筛,把3留下,把3的倍数剔除掉;接下去用下一个质数5筛,把5留下,把5的倍数剔除掉;不断重复下去……。

python实现

def print_prime(N):
    ret = list()
    for i in range(2, N):
        ret.append(i)
    idx = 0
    def remove_multiple(n):
        for v in ret:
            if v%n==0 and v//n > 1:
                ret.remove(v)
    while ret[idx]*ret[idx] <= N-1:
        remove_multiple(ret[idx])
        idx+=1
    print(ret)


if __name__ == '__main__':
    print_prime(10000)

参考

算法说明

安装python+flask的博客系统blog_mini

安装python+flask的博客系统blog_mini

根据指引安装出了点问题。我的环境是mac+python3.5。参考博客
1. 报错,name reload not defined。

这个好解决。增加from importlib import reload(其实没有必要)

2.报错.sys has no attribute setdefaultencodeing。

这个是因为python3.5默认是utf-8了。这个函数干掉了。。所以直接把这行代码干掉。并且之前的reload也是python2.x中为了setdefaultencodeing服务的。所以from importlib import reload 和 reload(sys)也可以干掉了。

3.报错 ImportError: No module named ‘MySQLdb’。

这个其实也没有必要了。python3.中用pymysql替代了。
解决办法时,修改DATABASE_URL=mysql+pymysql://root@127.0.0.1/blog_mini。
增加’+pymysql’后就会用pymysql的api。不过你得安装pymysql,pip install pymysql。
这个比MySQLdb容易安装多了。那个不能通过pip安装。

我fork了一个。修改后的版本:https://github.com/bjmayor/Blog_mini
因为安装了两个项目。把DATABASE_URL改成BLOG_DATABASE_URL了。
另外我搞了个fab发布。可以参考修改下。fab主要是参考的廖雪峰的python教程。

我安装完成的网站: python.go2live.cn

感觉主要是参考的Flask Web开发 基于Python的Web应用开发实战
我会根据这本书的代码对这个代码再做些优化,譬如加上用户权限,加上测试用例,去掉bootstrap的文件,直接引用flask_bootstrap。

同时也会参考wordpress的数据库结构做些优化,再加上图片上传什么的。

慢慢优化吧。懒得从头搞起,就以这个blog_mini为起点了。

  1. 个人喜欢用markdonw语法。。所以第一个要把这个改掉。得支持markdown。

Python模块学习:tempfile

应用程序经常要保存一些临时的信息,这些信息不是特别重要,没有必要写在配置文件里,但又不能没有,这时候就可以把这些信息写到临时文件里。其实很多程序在运行的时候,都会产生一大堆临时文件,有些用于保存日志,有些用于保存一些临时数据,还有一些保存一些无关紧要的设置。在windows操作系统中,临时文件一般被保存在这个文件夹下:C:/Documents and Settings/User/Local Settings/Temp。其实我们最常用的IE浏览器在浏览网页的时候,会产生大量的临时文件,这些临时文件一般是我们浏览过的网页的本地副本。Python提供了一个tempfile模块,用来对临时数据进行操作。查阅Python手册,里面介绍了如下常用的方法:

tempfile.mkstemp([suffix=”[, prefix=’tmp'[, dir=None[, text=False]]]])

mkstemp方法用于创建一个临时文件。该方法仅仅用于创建临时文件,调用tempfile.mkstemp函数后,返回包含两个元素的元组,第一个元素指示操作该临时文件的安全级别,第二个元素指示该临时文件的路径。参数suffix和prefix分别表示临时文件名称的后缀和前缀;dir指定了临时文件所在的目录,如果没有指定目录,将根据系统环境变量TMPDIRTEMP或者TMP的设置来保存临时文件;参数text指定了是否以文本的形式来操作文件,默认为False,表示以二进制的形式来操作文件。

tempfile.mkdtemp([suffix=”[, prefix=’tmp'[, dir=None]]])

该函数用于创建一个临时文件夹。参数的意思与tempfile.mkdtemp一样。它返回临时文件夹的绝对路径。

tempfile.mktemp([suffix=”[, prefix=’tmp'[, dir=None]]])

mktemp用于返回一个临时文件的路径,但并不创建该临时文件。

tempfile.tempdir

该属性用于指定创建的临时文件(夹)所在的默认文件夹。如果没有设置该属性或者将其设为None,Python将返回以下环境变量TMPDIR, TEMP, TEMP指定的目录,如果没有定义这些环境变量,临时文件将被创建在当前工作目录。

tempfile.gettempdir()

gettempdir()则用于返回保存临时文件的文件夹路径。

tempfile.TemporaryFile([mode=’w+b'[, bufsize=-1[, suffix=”[, prefix=’tmp'[, dir=None]]]]])

该函数返回一个 类文件 对象(file-like)用于临时数据保存(实际上对应磁盘上的一个临时文件)。当文件对象被close或者被del的时候,临时文件将从磁盘上删除。mode、bufsize参数的单方与open()函数一样;suffix和prefix指定了临时文件名的后缀和前缀;dir用于设置临时文件默认的保存路径。返回的类文件对象有一个file属性,它指向真正操作的底层的file对象。

tempfile.NamedTemporaryFile([mode=’w+b'[, bufsize=-1[, suffix=”[, prefix=’tmp'[, dir=None[, delete=True]]]]]])

tempfile.NamedTemporaryFile函数的行为与tempfile.TemporaryFile类似,只不过它多了一个delete参数,用于指定类文件对象close或者被del之后,是否也一同删除磁盘上的临时文件(当delete = True的时候,行为与TemporaryFile一样)。

tempfile.SpooledTemporaryFile([max_size=0[, mode=’w+b'[, bufsize=-1[, suffix=”[, prefix=’tmp'[, dir=None]]]]]])

tempfile.SpooledTemporaryFile函数的行为与tempfile.TemporaryFile类似。不同的是向类文件对象写数据的时候,数据长度只有到达参数max_size指定大小时,或者调用类文件对象的fileno()方法,数据才会真正写入到磁盘的临时文件中。 蛮简单、实用的一个模块,不是吗?

python自动化工具从0到invork&ansible

原文出处: classam   译文出处:ictar

很久以前,当我第一次读到“程序员修炼(The Pragmatic Programmer)”时,我读了一些让我真真难以忘怀的建议。

“不要使用手工流程(Don’t Use Manual Procedures)”。

这在Ubiquitous Automation一节。总之,他们希望你将所有的事情都自动化。

麻烦的是,我对于如何真正的让任何东东都自动化并没有太多的想法。当时,我仍困于这样一个世界,其中所有的程序都是一个使用CLI的庞大的Java程序,而我的印象是,命令行界面无非是过去时代的一个垂死残余。

我的一些假装自动化之旅充满了陷阱和自我无能的尴尬故事,而我希望与大家分享这些故事。

bash脚本编程

与命令行系统交互的陷阱之一是,每一个复杂的交互使用一个同样复杂的命令 —— 而对于那些我可能会一次又一次运行的命令,每次我都将不得不记得精确的调用。

大多数时候,这个可以工作,虽然每次都要花不少的时间上谷歌或StackOverflow搜索。

关于幻想小说的一个持久的事情是,你总能看到追随者使用魔杖,但没有魔法书。这是因为他们都是愚蠢的。奇才携带魔法书。

所以,我开始写下我的命令,这样我就不会忘记它们。而不是将这些命令写到便利贴,或笔记本上,我将它们写到我的home目录下。

Python

1
2
3
$ cat work
ssh classam@workcomputer.blorf -P 8888 -p=hunter2

起初,我只想cat这个文件,然后将该命令复制回命令行。

Python

1
2
3
4
5
$ cat work
ssh classam@workcomputer.blorf -P 8888 -p=hunter2
$ ssh classam@workcomputer.blorf -P 8888 -p=hunter2
Welcome to Work Computer. Blorf.

我和你说了,我会分享我尴尬无能的故事。

我很快意识到,没有真的有打算,我写的是bash脚本 – 我只是没有明智地执行它们。如果我只是告诉系统我要像执行程序一样执行这个文件,那么它应该足够明智地工作。

Python

1
2
3
4
$ chmod u+x work
$ ./work
Welcome to Work Computer. Blorf.

 

Hashbang

仅凭这一点是不够好到让这个脚本正常工作的,但我们没有做很多的事来帮助系统弄清楚到底如何运行这个我们传给它的神秘的脚本。

让我们想象下我们创建了一个文件:

Python

1
2
print(“hello”)

这在Python 3中是有效的,但在Python 2中无效。如果我们将这个文件保存为’hello’,那么我们基本不知道怎样运行它。而如果我们尝试运行,计算机将会进行猜测,然后它会猜错。

Python

1
2
3
$ ./hello
idfk, man, that doesn’t look right to me

如果我们将文件命名为hello.py,给我们自己一个方便的线索以便提示自己这个是个怎么样的文件,那么我们可以使用Python解释器来执行它。

Python

1
2
3
4
5
# python hello.py
that’s python 3, man, I’m python 2. Whateva.
# python3 hello.py
hello

好吧,它工作了,但对于程序调用来说压力山大。七十年代的一个Unix创新可以消除这个问题:hashbang。在我们脚本的开头,我们可以准确的声明想使用哪个解释器。

Python

1
2
3
4
5
6
#!python3
print(“hello”)
#!bash
echo “hello”

这里唯一的规则是,编译器必须存在于系统的$PATH变量中。当你在本地运行该脚本时,是挺棒的,但如果你远程运行脚本而不使用一个伪终端会话集时,你或许并没有一个$PATH变量,这将导致你的hashbang声明失败。

解决方法?hashbang可以包含你打算用于程序的解释器的完整路径。

Python

1
2
3
4
5
6
#!/bin/python3
print(“hello”)
#!/bin/bash
echo “hello”

在很多现代编程中,你没有看到这个令人兴奋的技术,因为它并不是引人注目的可便携的 —— 你用来执行一个python程序的解释器的位置,甚至是名字,往往系统与系统之间并不相同。

虽然,bash通常位于/bin/bash,这就是为嘛sun下的每个bash程序都用#!/bin/bash打开。

Bashy Bash

所以,不久以后,我的home目录到处都开始充斥着名字诸如work.shtest.sh这样的有用的小的bash文件。

Alias

如果你在Django中编程,你可能还记得,你可以在Django环境中调用的每个命令都是通过直接引用程序manage.py开始的。

我也许像这样启动Django开发服务器:

Python

1
2
3
$ cd ~/code/django_project
$ ./manage.py runserver 0:8000

这很简单,但是,一天内我必须启动那个Django开发服务器无数次!

所有一切仅需对我的.bashrc文件稍微做个调整:

Python

1
2
alias dj=”/home/classam/code/django_project/manage.py”

突然之间,无论我当前的工作目录是什么,都可以用下面这个简单的命令启动服务器

Python

1
2
$ dj runserver 0:8000

这是一种全新的令人兴奋的感觉。alias工具会工作良好。

Make

随着我构建了越来越多微小的自动化步骤到我的编程环境,为每个我想要执行的命令使用一个单独的文件开始变得不方便。

来到Make工具。

现在,这完全不是什么化妆工具 – Make是一个构建工具!它像下面这样将一个项目的所有自动化全部安排在了一个make文件中,从而满足了我的需求:

Python

1
2
3
4
5
6
7
8
9
run:
    manage.py runserver 0:8000
shell:
    manage.py shell
test:
    nosetests /home/vagrant/code/things
lint:
    pyflakes /home/vagrant/code/things –no-bitching-about-long-lines

然后,我可以运行我那一群杂七杂八的任务,无需非得与一组分离的shell文件交互,像这样:

Python

1
2
$ make lint

归根结底,这不比shell文件稳定或便利得多,并且它为一堆的项目引入了一个奇怪的Make依赖,而一个Make依赖并不具备任何实际意义。

但它确实将我引入了我的旅途中的下一站:尝试使用一个task-runner来取代使用脚本。

Fabric

为嘛不用一个也存在于Python世界中的依赖来取代一个Make依赖呢?欢迎来到Fabric,一个优秀的Python命令运行器。如果比之Python,你对Ruby更加了解,那么你可能会记得Fabric邪恶的对手,Capistrano。

Fabric允许我像这样结构化我的测试运行:

Python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from fabric.api import run
def go():
    “””
    Documentation documentation documentation.
    “””
    run(‘manage.py runserver 0:8000’)
def test():
    run(‘nosetests /home/vagrant/code/things’)
def lint():
    run(‘pyflakes /home/vagrant/code/things –no-bitching-about-long-lines’)

然后,我可以这样调用:

Python

1
2
$ fab go

我们开始吧,我们所有的工具脚本都在同一个便利的地方。

这相当漂亮,而我开始使用它来做任何事。甚至是部署远程服务器这种棘手的问题 —— Fabric支持得相当好。

只要告诉它SSH到一个远程服务器,或者一堆远程服务器一次,然后fabric能够在所有它们上运行我的脚本。

我开始用Fabric来搭建服务器。新的服务器会需要PostgreSQL和nginx,而让脚本为我设置这些,会比必须手动处理所有繁琐的安装步骤棒得多。

Ansible

Fabric很棒,但有几件事它做的没那么漂亮。

在调试Fabric任务时,我在同一个服务器一遍又一遍的运行任务,而每一次它都会从头开始,运行整个脚本。如果有一个步骤很慢,那么它将会拖慢我的调试过程。

这意味着,当我使用它们时,我只会注释掉脚本的大块内容,测试脚本“头部”的同时留下尾部注释,这样我就可以推进我的自动化了。

有很多操作,第一次运行的时候会通过,但后续就全失败了,像“创建一个数据库”或者“创建一个目录”,这些操作第二次运行会失败,因为在我第二次运行的时候,该数据库或者目录已经存在了。

问题是啥?大多数的Fabric操作缺少”幂等性” – 一个五美元词,表示”每次运行同个操作总会产生相同的结果。”

另外一个问题是大量的配置文件。我发现自己经常要么编写丑陋的Bash操作来对大的配置文件进行小手术,要么使用字符串工具在python中构建配置文件,然后将它们放入到适当的位置。

最后一个问题?证书。为了让系统正常工作,证书需要放在哪里这个问题从来就还没搞得很清楚。我最终决定采取环境变量,但这意味着我建立的每个新系统,我都要马上立即提交三十几个变量到一个.bashrc文件中。

这仍然工作得很好,但我们可以做的更好。幸运的是,那里有一个框架,可以带这些属性运行操作,并且还让我们牢牢地处在Python的范畴之内。Ansible。

Ansible提供了一种YAML格式,它可以用来描述一个工作中的系统,然后它将通过一个SSH连接构建那个系统。一旦我熬过了YAML中固有的讨人厌的“编码”,这就很棒!除其他事项外,Ansible还包含一个全功能的模板系统,用于创建配置文件,以及带双向加密的密码存储,用以隐匿重要的配置凭据。

Ansible解决了很多问题。

Invoke

但是Ansible没有解决的一个问题是啥?它不是一个很好的任务运行器。

事实上,Ansible命令可以彻头彻尾的神秘运行。

在一组服务器上运行一个playbook看起来可能像这样:

Python

1
2
ansible-playbook -i inventory.yml mongo -u root –vault-password-file ~/.vaultpw mongo.yml

我不了解你,但这不是那种我可以轻松交给肌肉记忆的事情。

所以,再一次,我们需要一个任务运行器。我们可以回到Fabric,但是Fabric的维护者基本上已经放弃了它,转而支持一个正显示出一些第二系统效应的明确迹象的非常野心勃勃的2.0版本 —— 最明显的征兆是,它已经开发4年了,但还没有看到光明的一天。

所以Fabric现在不予考虑 —— 但同时Fabric 2.0在夹缝中生存,Fabric 2.0一个半完成的块已经出现了。虽然在功能上有所限制,它,额……比Fabric得多。

它称为Invoke,并且它只提供Fabric一半的”任务运行器(task runner)”,而不提供任何一点”ssh”或者”deployment”。但这就是我们想要的!如此完美!

所以我们可以像这样封装我们的ansible部署:

Python

1
2
3
4
5
6
from invoke import task, run
@task
def provision_mongo(ctx):
    ctx.run(‘ansible-playbook -i inventory.yml mongo -u root –vault-password-file ~/.vaultpw mongo.yml’)

然后像这样运行它:

Python

1
2
$ inv provision_mongo

我们可以用我们用来运行剩余应用的实用脚本来包含它。

Python

1
2
3
4
5
6
7
8
@task
def go(ctx):
    ctx.run(‘manage.py runserver 0:8000’)
@task
def test(ctx):
    ctx.run(‘nosetests /home/vagrant/code/things’)

Invoke具有更多的功能,但是涵盖所有将会超出这个已经过长的博文的范围了。

结论

显然的,我的可重复、有用的项目自动化之旅还有很长的路要走,但是我已经在invokeansible的交界处明确地发现了一些非常有用工具。

我们获得了Python所有的可组合性,Ansible所有的实用性,以及一个task runner所有的便利性。

P.S.

一个最后的牢骚:Invoke漂亮地支持Python 3,但是Ansible仍然绑在了Python 2的Python黑暗时代,所以为了同时运行Invoke和Python,我们必须接受一个次等的Python。

该死。

P.P.S.

据我所知,Javascript没有像这样的东东。我能找到最接近的东西是gulp。难道我将不得不满足于gulp吗?Yegh.

转载自演道,想查看更及时的互联网产品技术热点文章请点击http://go2live.cn

Python 中的枚举类型

枚举类型可以看作是一种标签或是一系列常量的集合,通常用于表示某些特定的有限集合,例如星期、月份、状态等。Python 的原生类型(Built-in types)里并没有专门的枚举类型,但是我们可以通过很多方法来实现它,例如字典、类等:

WEEKDAY = {
    'MON': 1,
    'TUS': 2,
    'WEN': 3,
    'THU': 4,
    'FRI': 5
}
class Color:
    RED   = 0
    GREEN = 1
    BLUE  = 2

上面两种方法可以看做是简单的枚举类型的实现,如果只在局部范围内用到了这样的枚举变量是没有问题的,但问题在于它们都是可变的(mutable),也就是说可以在其它地方被修改从而影响其正常使用:

WEEKDAY['MON'] = WEEKDAY['FRI']
print(WEEKDAY)

{‘FRI’: 5, ‘TUS’: 2, ‘MON’: 5, ‘WEN’: 3, ‘THU’: 4}

通过类定义的枚举甚至可以实例化,变得不伦不类:

c = Color()
print(c.RED)
Color.RED = 2
print(c.RED)

0
2

当然也可以使用不可变类型(immutable),例如元组,但是这样就失去了枚举类型的本意,将标签退化为无意义的变量:

COLOR = ('R', 'G', 'B')
print(COLOR[0], COLOR[1], COLOR[2])

R G B

为了提供更好的解决方案,Python 通过PEP 435 在 3.4 版本中添加了 enum标准库,3.4 之前的版本也可以通过 pip install enum 下载兼容支持的库。enum 提供了 Enum,IntEnum,unique三个工具,用法也非常简单,可以通过继承 Enum/IntEnum 定义枚举类型,其中 IntEnum 限定枚举成员必须为(或可以转化为)整数类型,而 unique 方法可以作为修饰器限定枚举成员的值不可重复:

from enum import Enum, IntEnum, unique
 
try:
    @unique
    class WEEKDAY(Enum):
        MON = 1
        TUS = 2
        WEN = 3
        THU = 4
        FRI = 1
except ValueError as e:
    print(e)

duplicate values found in : FRI -> MON

try:
    class Color(IntEnum):
        RED   = 0
        GREEN = 1
        BLUE  = 'b'
except ValueError as e:
    print(e)

invalid literal for int() with base 10: ‘b’

更有趣的是 Enum的成员均为单例(Singleton),并且不可实例化,不可更改:

class Color(Enum):
    R = 0
    G = 1
    B = 2

try:
    Color.R = 2
except AttributeError as e:
    print(e)

Cannot reassign members.

虽然不可实例化,但可以将枚举成员赋值给变量:

red = Color(0)
green = Color(1)
blue = Color(2)
print(red, green, blue)

Color.R Color.G Color.B

也可以进行比较判断:

print(red is Color.R)
print(red == Color.R)
print(red is blue)
print(green != Color.B)
print(red == 0) # 不等于任何非本枚举类的值

True
True
False
True
False

最后一点,由于枚举成员本身也是枚举类型,因此也可以通过枚举成员找到其它成员:

print(red.B)
print(red.B.G.R)

Color.B
Color.R

但是要谨慎使用这一特性,因为可能与成员原有的命名空间中的名称相冲突:

print(red.name, ':', red.value)
 
class Attr(Enum):
    name  = 'NAME'
    value = 'VALUE'
print(Attr.name.value, Attr.value.name)

R : 0
NAME value

总结
enum 模块的用法很简单,功能也很明确,但是其实现方式却非常值得学习。如果你想更深入了解更多 Python 中关于 ClassMetaclass的黑魔法,又不知道如何入手,那么不妨阅读一下 enum源码,或者关注接下来后面几篇的内容!

Python元类

Python元类

从某种意义上讲,元类只是扩展了装饰器的代码插入模式。
元类主要是针对那些构建API和工具供他人使用的程序员。

Python构建工具:

  1. 内省属性。如__class__、__dict__
  2. 运算符重载方法。如__str__、__add__
  3. 属性拦截方法。如__getattr__、__setattr__、__getattribute__。
  4. 类特性。内置函数property。拦截特定的属性。
  5. 类属性描述符。拦截特定的属性。特定只是定义根据访问自动运行函数的属性描述符的一种简洁方式。
  6. 函数和类装饰器。
  7. 元类。

元类允许我们在在一条class语句的末尾,插入当创建一个类对象的时候自动运行的逻辑。
这个逻辑不会把类名重新绑定到一个装饰器可调用对象,而是把类自身的创建指向特定的逻辑。

和类装饰器不同,它通常是添加实例创建时运行的逻辑,元类在类创建时运行。
同样的,它们都是通常用来管理或扩展类的钩子,而不是管理其实例。

通过声明一个元类,我们告诉Python把类对象的创建路由到我们所提供的另一个类:

由于创建类的时候,Python在class语句的末尾自动调用元类,因此它可以根据需要扩展、注册或管理类。

类也是某物的实例:

  • 在Python3.0中,用户定义的类对象是名为type的对象的实例,type本身是一个类。
  • 在Python2.6中,新式类继承自object,它是type的一个子类;传统类是type的一个实例,并且并不创建自一个类。
    实例创建自类,而类创建自type。
    类是类型,类型也是类
  • 类型由派生自type的类定义。
  • 用户定义的类是类型类的实例。
  • 用户定义的类是产生它们自己的实例的类型。

类根本不是一个独立的概念:它们就是用户定义的类型,并且type自身也是由一个类定义的。

由于类实际上是type类的实例,从type的定制的子类创建类允许我们实现各种定制的类。
在Python3.0中以及在Python2.6的新式类中:

  • type是产生用户定义的类的一个类。
  • 元类是type类的一个子类。
  • 类对象是type类的一个实例,或一个子类。
  • 实例对象产生自一个类。

换句话说,为了控制创建类以及扩展其行为的方式,我们所需要做的只是指定个用户定义的类创建自一个用户定义的元类,而不是常规的type类。

从技术上讲,Python遵从一个标准的协议来使这发生:在一条class语句的末尾,并且在运行了一个命名控件词典中的所有嵌套代码之后,它调用type对象来创建class对象:

class = type(classname, superclasses,attrbuteddict)

type对象反过来定义了一个__call__运算符重载方法,当调用type对象的时候,该方法运行两个其他方法:

type.__new__(typeclass, classname,superclasses,attributeddict)
type.__init__(class,classname,superclasses,attributedict)

__new__方法创建并返回了新的class对象,并且随后__init__方法初始化了新创建的对象。
这是type的元类子类通常用来定制类的钩子。

尽管重新定义type超类的__new____init__方法是元类向类对象创建过程插入逻辑的最常见方法,其他方案也是可能的。
实际上任何可调用对象都可以用作一个元类,只要它接收传递的参数并且返回与目标类兼容的一个对象。

另外,我们也可以重定义元类的__call__, 以拦截创建调用。

making class
In SuperMeta.call:
…Spam
…(<class ‘__main__.Eggs’>,)
…{‘meth’: , ‘data’: 1, ‘__new__’: , ‘__module__’: ‘__main__’, ‘__qualname__’: ‘Spam’, ‘__init__’: }
In SubMeta.new:
…Spam
…(<class ‘__main__.Eggs’>,)
…{‘meth’: , ‘data’: 1, ‘__new__’: , ‘__module__’: ‘__main__’, ‘__qualname__’: ‘Spam’, ‘__init__’: }
In SubMeta init:
…Spam
…(<class ‘__main__.Eggs’>,)
…{‘meth’: , ‘data’: 1, ‘__new__’: , ‘__module__’: ‘__main__’, ‘__qualname__’: ‘Spam’, ‘__init__’: }
…init class object: [‘meth’, ‘data’, ‘__new__’, ‘__doc__’, ‘__module__’, ‘__init__’]
making instance
data: 1

调用顺序是:
1. 调用元类的__call__。只调用一次。
2. 调用元类的__new__。创建一个type实例,只调用一次。
3. 调用元类的__init__。初始化type实例,只调用一次。
4. 调用自定义类的__new__。调用多次,每创建一个自定义实例,调用一次。
5. 调用自定义类的__init__。调用多次,每创建一个自定义实例,调用一次。

实例与继承的关系

  • 元类继承自type类。
  • 元类声明由子类继承。在用户定义的类中,metaclass=M声明由该类的子类继承,因此,对于在超类链中继承了这一声明的每个类的构建,该元类都将运行。
  • 元类属性没有由类实例继承。元类声明指定了一个实例关系,它和继承不同。由于类是元类的实例,所以元类中定义的行为应用于类,而不是类随后的实例。实例从它们的类和超类获取行为,但是,不是从任何元类获取行为。从技术上讲,实例属性查找通过只是搜索实例及期所有类的__dict__字典;元类不包含在实例查找中。

元类与类装饰器在功能上有重合。

  • 在class语句末尾,类装饰器把类名重新绑定到一个函数的结果。
  • 元类通过在一条class语句的末尾把类对象创建过程路由到一个对象来工作。

学习元类可参考Enum源码

  1. _EnumDict通过继承dict,并重写__setitem__,来实现枚举的name不重复。
  2. 通过重写元类EnumMeta的__delattr____setattr__来限制删除和修改枚举成员。

python之logging模块

python之logging模块

介绍

logging模块是用来做日志记录的。
logging.config用来配置。
logging.handlers用来处理日志,可以是屏幕输出,可以是写文件(文件又可以自动切割文件),可以是发邮件。

最简单的用法是:

import logging

logging.debug('This is debug message')
logging.info('This is info message')
logging.warning('This is warning message')
 
屏幕上打印:
WARNING:root:This is warning message

logging.basicConfig(**kwargs)

以默认格式创建一个StreamHandler,然后把它加到root logger,函数参数可以做些配置。
如果root logger没有定义handlers,则当函数debug(),info(), warning(), error() and critical()调用时会自动调用basicConfig()。

如果root logger已经定义好了handlers,则这个函数什么也不干。

import logging

logging.basicConfig(level=logging.DEBUG,
                format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                datefmt='%a, %d %b %Y %H:%M:%S',
                filename='myapp.log',
                filemode='w')
    
logging.debug('This is debug message')
logging.info('This is info message')
logging.warning('This is warning message')
 
./myapp.log文件中内容为:
Sun, 24 May 2009 21:48:54 demo2.py[line:11] DEBUG This is debug message
Sun, 24 May 2009 21:48:54 demo2.py[line:12] INFO This is info message
Sun, 24 May 2009 21:48:54 demo2.py[line:13] WARNING This is warning message

logging.basicConfig函数各参数:
filename: 指定日志文件名
filemode: 和file函数意义相同,指定日志文件的打开模式,’w’或’a’
format: 指定输出的格式和内容,format可以输出很多有用信息,如上例所示:
%(levelno)s: 打印日志级别的数值
%(levelname)s: 打印日志级别名称
%(pathname)s: 打印当前执行程序的路径,其实就是sys.argv[0]
%(filename)s: 打印当前执行程序名
%(funcName)s: 打印日志的当前函数
%(lineno)d: 打印日志的当前行号
%(asctime)s: 打印日志的时间
%(thread)d: 打印线程ID
%(threadName)s: 打印线程名称
%(process)d: 打印进程ID
%(message)s: 打印日志信息
datefmt: 指定时间格式,同time.strftime()
level: 设置日志级别,默认为logging.WARNING
stream: 指定将日志的输出流,可以指定输出到sys.stderr,sys.stdout或者文件,默认输出到sys.stderr,当stream和filename同时指定时,stream被忽略

logging.handlers

handlers控制日志的落地。有下面这些方式。

logging.StreamHandler: 日志输出到流,可以是sys.stderr、sys.stdout或者文件
logging.FileHandler: 日志输出到文件
日志回滚方式,实际使用时用RotatingFileHandler和TimedRotatingFileHandler
logging.handlers.BaseRotatingHandler
logging.handlers.RotatingFileHandler
logging.handlers.TimedRotatingFileHandler
logging.handlers.SocketHandler: 远程输出日志到TCP/IP sockets
logging.handlers.DatagramHandler: 远程输出日志到UDP sockets
logging.handlers.SMTPHandler: 远程输出日志到邮件地址
logging.handlers.SysLogHandler: 日志输出到syslog
logging.handlers.NTEventLogHandler: 远程输出日志到Windows NT/2000/XP的事件日志
logging.handlers.MemoryHandler: 日志输出到内存中的制定buffer
logging.handlers.HTTPHandler: 通过”GET”或”POST”远程输出到HTTP服务器

将日志同时输同到文件和屏幕

import logging

logging.basicConfig(level=logging.DEBUG,
                format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                datefmt='%a, %d %b %Y %H:%M:%S',
                filename='myapp.log',
                filemode='w')

#################################################################################################
#定义一个StreamHandler,将INFO级别或更高的日志信息打印到标准错误,并将其添加到当前的日志处理对象#
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
#################################################################################################

logging.debug('This is debug message')
logging.info('This is info message')
logging.warning('This is warning message')
 
屏幕上打印:
root        : INFO     This is info message
root        : WARNING  This is warning message
./myapp.log文件中内容为:
Sun, 24 May 2009 21:48:54 demo2.py[line:11] DEBUG This is debug message
Sun, 24 May 2009 21:48:54 demo2.py[line:12] INFO This is info message
Sun, 24 May 2009 21:48:54 demo2.py[line:13] WARNING This is warning message

自动切割日志

import logging
from logging.handlers import RotatingFileHandler

#################################################################################################
#定义一个RotatingFileHandler,最多备份5个日志文件,每个日志文件最大10M
Rthandler = RotatingFileHandler('myapp.log', maxBytes=10*1024*1024,backupCount=5)
Rthandler.setLevel(logging.INFO)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
Rthandler.setFormatter(formatter)
logging.getLogger('').addHandler(Rthandler)
#######################################################################################

这个用的比较多。留下日志,1是可以跟踪性能信息,也可以跟踪出错信息。

通过文件配置日志

配置文件

#logger.conf
###############################################
[loggers]
keys=root,test01,test02
[logger_root]
level=DEBUG
handlers=hand01,hand02
[logger_test01]
handlers=hand01,hand02
qualname=test01
propagate=0
[logger_test02]
handlers=hand01,hand03
qualname=test02
propagate=0
###############################################
[handlers]
keys=hand01,hand02,hand03
[handler_hand01]
class=StreamHandler
level=INFO
formatter=form02
args=(sys.stderr,)
[handler_hand02]
class=FileHandler
level=DEBUG
formatter=form01
args=('myapp.log', 'a')
[handler_hand03]
class=handlers.RotatingFileHandler
level=INFO
formatter=form02
args=('myapp.log', 'a', 10*1024*1024, 5)
###############################################
[formatters]
keys=form01,form02
[formatter_form01]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s
datefmt=%a, %d %b %Y %H:%M:%S
[formatter_form02]
format=%(name)-12s: %(levelname)-8s %(message)s
datefmt=

代码使用配置文件


import logging
import logging.config

logging.config.fileConfig("logger.conf")
logger = logging.getLogger("test01")

logger.debug('This is debug message')
logger.info('This is info message')
logger.warning('This is warning message')

blog_mini bug修复记录1

blog_mini bug修复记录1

发现有篇文章只显示了一半。编辑之后再发布,还是只有一半。另外发布的时候感觉比较慢。

ab测试结果

ab -n 100 -c 10 http://blog.go2live.cn/
This is ApacheBench, Version 2.3 <$Revision: 1430300 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking blog.go2live.cn (be patient).....done


Server Software:        nginx/1.10.2
Server Hostname:        blog.go2live.cn
Server Port:            80

Document Path:          /
Document Length:        24039 bytes

Concurrency Level:      10
Time taken for tests:   14.268 seconds
Complete requests:      100
Failed requests:        0
Write errors:           0
Total transferred:      2435000 bytes
HTML transferred:       2403900 bytes
Requests per second:    7.01 [#/sec] (mean)
Time per request:       1426.779 [ms] (mean)
Time per request:       142.678 [ms] (mean, across all concurrent requests)
Transfer rate:          166.66 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       1
Processing:   141 1350 261.7   1420    1497
Waiting:      140 1350 261.7   1420    1497
Total:        141 1350 261.6   1420    1497

Percentage of the requests served within a certain time (ms)
  50%   1420
  66%   1436
  75%   1455
  80%   1460
  90%   1473
  95%   1489
  98%   1492
  99%   1497
 100%   1497 (longest request)

应该说效率比较低。代码还要走读一直,看是直接增加memcache还是可以用插件的形式。

先修复内容只有一半的问题。
1.从相关的models,forms,views都没有看出问题。
2.通过抓包发现提交上去的内容并没有变少。
3.从数据库内容看,确实内容被截断了。

光看代码没有看出来。只有debug跟踪一下了。在python中debug可以用logging模块。

编辑config.py增加文件日志

  Rthandler = RotatingFileHandler(os.path.join(basedir,'../log/myapp.log'), maxBytes=100*1024*1024,backupCount=5)
  Rthandler.setLevel(logging.INFO)
  formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
  Rthandler.setFormatter(formatter)
  app.logger.addHandler(Rthandler)

最后发现居然是数据库的 Text字段不够长。。手动把结构改成了LongText。
找不半天没有发现flask-sqlalchemy的字段里支持LongText。

python多线程相关概念及解释

python多线程相关概念及解释

简介

多线程是为了抢占资源而设计的。同样作用的有多进程,不过因为线程的创建和切换代价远比进程低,一般是选择线程来执行多任务。因为线程会共享进程内的资源,为了处理多个线程处理相同的资源,又多出了锁的概念。
而本身线程的创建其实也是有代价的,为了避免反复的创建和销毁,又有了线程池的概念。

线程的基本使用

直接调用threading.Thread类

#coding=utf-8
import threading
from time import sleep,ctime

def aLongTimeWorkWithIO():
    sleep(5)
    print("aLongTimeWorkWithIO  work  in thread:%s,%s" % (threading.current_thread().name, ctime()))

if __name__ == '__main__':
    print("start thread:", threading.current_thread().name, ctime())
    t = threading.Thread(target=aLongTimeWorkWithIO, args=())
    t.start()
    print("end thread:", threading.current_thread().name, ctime() )

输入结果:

start thread: MainThread Mon Feb 6 22:50:04 2017
end thread: MainThread Mon Feb 6 22:50:04 2017
aLongTimeWorkWithIO work in thread:Thread-1,Mon Feb 6 22:50:09 2017

最基本的代码执行是按顺序,一条一条执行下去的。只有一条路。
线程花费的CPU时间是占满主线程的。

而多线程的引入其实就是增加了代码执行路径。进程花费的CPU时间是在主线程和新线程之间交叉进行的。

继承threading.Thread类

#coding=utf-8
import threading
from time import sleep,ctime
class MyThread(threading.Thread):
    def run(self):
        sleep(5)
        print("aLongTimeWorkWithIO  work  in thread:%s,%s" % (threading.current_thread().name, ctime()))

if __name__ == '__main__':
    print("start thread:", threading.current_thread().name, ctime())
    t = MyThread()
    t.start()
    print("end thread:", threading.current_thread().name, ctime() )

重载下run方法即可。

线程交互

线程合并join

join(timeout)方法将会等待直到线程结束。这将阻塞正在调用的线程,直到被调用join()方法的线程结束。
如果线程是独立变化的挺好。譬如需要下载10张图片。因为这10张图片没有依赖关系。所以可以同时启动10个线程去下载就好了。
但有时候,线程之间是有依赖关系的,如父线程必须得等子线程完成 拿到子线程的运行结果才可以继续作业。

import threading
import time

def target():
    print('the curent threading  %s is running' % threading.current_thread().name)
    time.sleep(1)
    print('the curent threading  %s is ended' % threading.current_thread().name)

print('the curent threading  %s is running' % threading.current_thread().name)
t = threading.Thread(target=target)

#t.setDaemon(True) #1
t.start()
#t.join() #2
print('the curent threading  %s is ended' % threading.current_thread().name)

代码输出:

the curent threading MainThread is running
the curent threading Thread-1 is running
the curent threading MainThread is ended
the curent threading Thread-1 is ended

你可能会觉得奇怪。为啥主线程退出,子线程还会运行。这其实是python的主线程干的好事,他会在即将退出时检查所有的非daemon且alive的线程,一个一个调用join方法。

所以要使主线程退出,子线程也退出,只需要把子线程设置为daemon,如上面代码中的#1,打开注释就可。
而join就是等待该线程执行完成。如上面的#2所示。同时打开#1和#2和都不打开的输出内容差不多,只是顺序不同。

the curent threading MainThread is running
the curent threading Thread-1 is running
the curent threading Thread-1 is ended
the curent threading MainThread is ended

条件变量

条件变量也是处理线程间协作的一种机制,详情见下文。

Event对象

线程可以读取共享的内存,通过内存做一些数据处理。这就是线程通信的一种,python还提供了更加高级的线程通信接口。Event对象可以用来进行线程通信,调用event对象的wait方法,线程则会阻塞等待,直到别的线程set之后,才会被唤醒。

#coding: utf-8
import  threading
import  time


class MyThread(threading.Thread):
    def __init__(self, event):
        super(MyThread, self).__init__()
        self.event = event

    def run(self):
        print("thread {} is ready ".format(self.name))
        self.event.wait()
        print("thread {} run".format(self.name))

signal = threading.Event()

def main():
    start = time.time()
    for i in range(3):
        t = MyThread(signal)
        t.start()
    time.sleep(3)
    print( "after {}s".format(time.time() - start))
    signal.set()




if __name__ == '__main__':
    main()

输出结果:

thread Thread-1 is ready
thread Thread-2 is ready
thread Thread-3 is ready
after 3.00528883934021s
thread Thread-1 run
thread Thread-3 run
thread Thread-2 run

屏障barrier

屏障(barrier)是用户协调多个线程并行工作的同步机制。屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。
屏障允许任意数量的线程等待,直到所有的线程完成处理工作,而线程不需要退出。所有线程达到屏障后可以接着工作。

感觉在分治法中有用。分成的子任务都完成了才能合并。所以都需要等待其它子任务。

下面的代码t1和t2都在wait,直到t5也wait,达到了Barrier的值3,才继续运行。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
from threading import Barrier, Lock
from time import time,sleep
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    name = threading.current_thread().name
    synchronizer.wait()
    now = time()
    with serializer:
        print("thread %s -----> %s" % (name, datetime.fromtimestamp(now)))

def test_without_barrier():
    name = threading.current_thread().name
    now = time()
    print("thread %s -----> %s" % (name, datetime.fromtimestamp(now)))

def worker(dictionary, key, item):
    dictionary[key] = item
    print(key, item)

if __name__ == '__main__':
    synchronizer = Barrier(3)
    serializer = Lock()
    threads = []
    t1 = threading.Thread(target=test_with_barrier,args=(synchronizer, serializer))
    t2 = threading.Thread(target=test_with_barrier,args=(synchronizer, serializer))
    t3 =  threading.Thread(target=test_without_barrier)
    t4 = threading.Thread(target=test_without_barrier)

    t5 = threading.Thread(target=test_with_barrier,args=(synchronizer, serializer))
    threads.append(t1)
    threads.append(t2)
    threads.append(t3)
    threads.append(t4)

    for t in threads:
        t.start()


    sleep(5)
    t5.start()

输出结果:

thread Thread-3 —–> 2017-02-07 11:59:46.336922
thread Thread-4 —–> 2017-02-07 11:59:46.339094
thread Thread-5 —–> 2017-02-07 11:59:51.341117
thread Thread-1 —–> 2017-02-07 11:59:51.341264
thread Thread-2 —–> 2017-02-07 11:59:51.341474

避免共享资源冲突

锁的使用

互斥锁

多个线程可能会用到同一个资源,如果在使用这个资源的时候,不是原子操作。则很可能会产生不可预知的错误。例如线程A使用全局变量i,先读取值(i=1),然后再设置i(i=i+1)。线程B同样是先读取值(i=1),然后再设置i(i=i+1)。
因为[先读取值(i=1),然后再设置i(i=i+1)]并不是一个原子操作,这里面有时间窗口,那么就会有可能引起错误。如在线程A获取到i=1时,这个时候CPU切换到了线程B。
线程B执行的任务是把i设置为2。CPU切换为线程A,线程设置值为2。这里就有问题了,其实最初设计的时候期望值是3(期望线程A和B都能增加1)

为了模拟获取值和设置值之间的时间窗口。特意搞了个sleep。代码如下:

#coding: utf-8
import  threading
import  time

counter = 0
counter_lock = threading.Lock()
class  MyThread(threading.Thread):#使用类定义thread,继承threading.Thread
    def  __init__(self,name):
        threading.Thread.__init__(self)
        self.name = "Thread-" + str(name)

    def run(self):  #run函数必须实现
        global counter,counter_lock #多线程是共享资源的,使用全局变量
       # if counter_lock.acquire(): #当需要独占counter资源时,必须先锁定,这个锁可以是任意的一个锁,可以使用上边定义的3个锁中的任意一个
        tmp = counter
        time.sleep(1)
        counter = tmp+1
        print("I am %s, set counter:%s"  % (self.name,counter))
       #     counter_lock.release() #使用完counter资源必须要将这个锁打开,让其他线程使用


if  __name__ ==  "__main__":
    for i in range(1,11):
        my_thread = MyThread(i)
        my_thread.start()

输出结果:

I am Thread-1, set counter:1
I am Thread-8, set counter:1
I am Thread-4, set counter:1
I am Thread-6, set counter:1
I am Thread-7, set counter:1
I am Thread-2, set counter:1
I am Thread-9, set counter:1
I am Thread-5, set counter:1
I am Thread-3, set counter:1
I am Thread-10, set counter:1

锁的引入就是为了解决这个问题,它把非原子操作变成了原子操作。
在线程没有获取到锁的时候只能等待,使用完共享资源再释放锁,以供其它线程使用。
这里有2个注意点:
1. 是锁的使用必须是一致的。也就是对同样的共享资源,必须使用相同的锁机制。只要有一处没有使用,锁就没意义了。
2. 需要避免死锁。获取锁的顺序是和释放锁的顺序相反的。获取锁A,获取锁B,释放锁B,释放锁A。并且其它地方也必须按同样的顺序获取和释放锁。不这样的话很容易造成死锁。

下面的代码就是锁的简单使用,解决了上面代码中的问题。

#coding: utf-8
import  threading
import  time

counter = 0
counter_lock = threading.Lock()
class  MyThread(threading.Thread):#使用类定义thread,继承threading.Thread
    def  __init__(self,name):
        threading.Thread.__init__(self)
        self.name = "Thread-" + str(name)

    def run(self):  #run函数必须实现
        global counter,counter_lock #多线程是共享资源的,使用全局变量
        if counter_lock.acquire(): #当需要独占counter资源时,必须先锁定,这个锁可以是任意的一个锁,可以使用上边定义的3个锁中的任意一个
            tmp = counter
            time.sleep(1)
            counter = tmp+1
            print("I am %s, set counter:%s"  % (self.name,counter))
            counter_lock.release() #使用完counter资源必须要将这个锁打开,让其他线程使用


if  __name__ ==  "__main__":
    for i in range(1,11):
        my_thread = MyThread(i)
        my_thread.start()

输出结果如下:

I am Thread-1, set counter:1
I am Thread-2, set counter:2
I am Thread-3, set counter:3
I am Thread-4, set counter:4
I am Thread-5, set counter:5
I am Thread-6, set counter:6
I am Thread-7, set counter:7
I am Thread-8, set counter:8
I am Thread-9, set counter:9
I am Thread-10, set counter:10

需要说明的是,锁解决了共享资源冲突问题,同时他又影响了线程的并发度。降低了效率。
合理的线程设计很重要,最好是没有共享资源的使用,如全局变量,共同文件等。

可重入锁

为了支持在同一线程中多次请求同一资源,python提供了可重入锁(RLock)。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

#coding: utf-8
import  threading
import  time


mutex = threading.RLock()

class MyThread(threading.Thread):

    def run(self):
        if mutex.acquire(1):
            print("thread {} get mutex".format(self.name))
            time.sleep(1)
            mutex.acquire()
            mutex.release()
            mutex.release()



def main():
    print("Start main threading")
    for i in range(2):
        MyThread().start()

    print("End Main threading")


if __name__ == '__main__':
    main()

输出结果:

Start main threading
thread Thread-1 get mutex
End Main threading
thread Thread-2 get mutex

条件变量

有了锁,为什么还要有条件变量

条件变量(cond)是在多线程程序中用来实现”等待–》唤醒”逻辑常用的方法。条件变量利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使“条件成立”。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。线程在改变条件状态前必须首先锁住互斥量,函数pthread_cond_wait把自己放到等待条件的线程列表上,然后对互斥锁解锁(这两个操作是原子操作)。在函数返回时,互斥量再次被锁住。

锁的引入已经解决了线程的同步问题,为什么还要用到条件变量呢?
首先,举个例子:在应用程序中有连个线程thread1,thread2,thread3和thread4,有一个int类型的全局变量iCount。iCount初始化为0,thread1和thread2的功能是对iCount的加1,thread3的功能是对iCount的值减1,而thread4的功能是当iCount的值大于等于100时,打印提示信息并重置iCount=0。
如果使用互斥量,线程代码大概应是下面的样子:

       thread1/2:
       while (1)
       {
             pthread_mutex_lock(&mutex);
             iCount++;
             pthread_mutex_unlock(&mutex);
       }
       thread4:
       while(1)
       {
             pthead_mutex_lock(&mutex);
             if (100 <= iCount)
             {
                   printf("iCount >= 100\r\n");
                   iCount = 0;
                   pthread_mutex_unlock(&mutex);
             }
             else
             {
                   pthread_mutex_unlock(&mutex);
             }
       }

在上面代码中由于thread4并不知道什么时候iCount会大于等于100,所以就会一直在循环判断,但是每次判断都要加锁、解锁(即使本次并没有修改iCount)。这就带来了问题一,CPU浪费严重。所以在代码中添加了sleep(),这样让每次判断都休眠一定时间。但这由带来的第二个问题,如果sleep()的时间比较长,导致thread4处理不够及时,等iCount到了很大的值时才重置。对于上面的两个问题,可以使用条件变量来解决。
首先看一下使用条件变量后,线程代码大概的样子:

      thread1/2:
       while(1)
       {
               pthread_mutex_lock(&mutex);
               iCount++;
               pthread_mutex_unlock(&mutex);
               if (iCount >= 100)
               {
                      pthread_cond_signal(&cond);
               }
       }         
       thread4:
       while (1)
       {
              pthread_mutex_lock(&mutex);
              while(iCount < 100)
              {
                     pthread_cond_wait(&cond, &mutex);
              }
              printf("iCount >= 100\r\n");
              iCount = 0;
              pthread_mutex_unlock(&mutex);
       }

从上面的代码可以看出thread4中,当iCount < 100时,会调用pthread_cond_wait。而pthread_cond_wait在上面应经讲到它会释放mutex,然后等待条件变为真返回。当返回时会再次锁住mutex。因为pthread_cond_wait会等待,从而不用一直的轮询,减少CPU的浪费。在thread1和thread2中的函数pthread_cond_signal会唤醒等待cond的线程(即thread4),这样当iCount一到大于等于100就会去唤醒thread4。从而不致出现iCount很大了,thread4才去处理。
需要注意的一点是在thread4中使用的while (iCount < 100),而不是if (iCount < 100)。这是因为在pthread_cond_singal()和pthread_cond_wait()返回之间有时间差,假如在时间差内,thread3又将iCount减到了100以下了,那么thread4就需要在等待条件为真了。

python中的例子

Pythonk中的条件变量是Condition对象。它除了具有acquire和release方法之外,还提供了wait和notify方法。线程首先acquire一个条件变量锁。如果条件不足,则该线程wait,如果满足就执行线程,甚至可以notify其他线程。其他处于wait状态的线程接到通知后会重新判断条件。

条件变量可以看成不同的线程先后acquire获得锁,如果不满足条件,可以理解为被扔到一个(Lock或RLock)的waiting池。直达其他线程notify之后再重新判断条件。该模式常用于生成者-消费者模式:

#coding: utf-8
import  threading
import  time
import random

queue = []

con = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        while True:
            if con.acquire():
                if len(queue) > 10:
                    print("{} Producer waiting".format(threading.current_thread().name))
                    con.wait()
                else:
                    elem = random.randrange(100)
                    queue.append(elem)
                    print("{}, Producer a elem {}, Now size is {}".format( threading.current_thread().name,elem, len(queue)))
                    time.sleep(random.random())
                    con.notify()
                con.release()


class Consumer(threading.Thread):
    def run(self):
        while True:
            if con.acquire():
                if len(queue) < 0:
                    print("{} Consumer waiting".format(threading.current_thread().name))
                    con.wait()
                else:
                    elem = queue.pop()
                    print("{}, Consumer a elem {}. Now size is {}".format(threading.current_thread().name, elem, len(queue)))
                    time.sleep(random.random())
                    con.notify()
                con.release()

def main():
    for i in range(3):
        Producer().start()

    for i in range(2):
        Consumer().start()


if __name__ == '__main__':
    main()

输出结果如下:

Thread-1, Producer a elem 74, Now size is 1
Thread-1, Producer a elem 16, Now size is 2
Thread-1, Producer a elem 26, Now size is 3
Thread-1, Producer a elem 97, Now size is 4
Thread-1, Producer a elem 62, Now size is 5
Thread-1, Producer a elem 76, Now size is 6
Thread-1, Producer a elem 38, Now size is 7
Thread-1, Producer a elem 59, Now size is 8
Thread-1, Producer a elem 72, Now size is 9
Thread-1, Producer a elem 87, Now size is 10
Thread-1, Producer a elem 83, Now size is 11
Thread-1 Producer waiting
Thread-5, Consumer a elem 83. Now size is 10
Thread-5, Consumer a elem 87. Now size is 9
Thread-3, Producer a elem 19, Now size is 10
Thread-3, Producer a elem 15, Now size is 11
Thread-2 Producer waiting
Thread-1 Producer waiting
Thread-4, Consumer a elem 15. Now size is 10
Thread-4, Consumer a elem 19. Now size is 9
Thread-4, Consumer a elem 72. Now size is 8
Thread-4, Consumer a elem 59. Now size is 7
Thread-4, Consumer a elem 38. Now size is 6
Thread-4, Consumer a elem 76. Now size is 5
Thread-4, Consumer a elem 62. Now size is 4
Thread-4, Consumer a elem 97. Now size is 3
Thread-4, Consumer a elem 26. Now size is 2
Thread-4, Consumer a elem 16. Now size is 1
Thread-4, Consumer a elem 74. Now size is 0

信号量Semaphore

信号量是一个计数器,用于为多个线程提供对共享数据对象的访问。
为了获得共享资源,线程需要执行下列操作。

  1. 测试控制该资源的信号量。
  2. 若此信号量的值为正,则线程可以使用该资源。在这种情况下,线程会将信号量减1,表示它使用了一个资源单位。
  3. 否则,若此信号量的值为0,则线程进入休眠状态,直至信号量值大于0.进程被唤醒后,它返回到步骤1.

当线程不再使用由一个信号量控制的共享资源时,该信号量值增1。如果有线程正在休眠等待此信号量,则唤醒它们。

scan2.py

# coding=UTF-8
import optparse
import socket
import threading
screenLock = threading.Semaphore(value=1)
def connScan(tgtHost, tgtPort):
    try:
        connSkt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        connSkt.connect((tgtHost, tgtPort))
        connSkt.send('ViolentPython\r\n')
        results = connSkt.recv(100)
        screenLock.acquire()
        print('[+]%d/tcp open' % tgtPort)
        print('[+]' + str(results))
    except:
        screenLock.acquire()
        print('[-]%d/tcp close' % tgtPort)
    finally:
        screenLock.release()
        connSkt.close()

def portScan(tgtHost, tgtPorts):
    try:
        tgtIp = socket.gethostbyname(tgtHost)
    except Exception as e:
        print("[-] Cannot resolve '%s' " % tgtHost)
        return
    try:
        tgtName = socket.gethostbyaddr(tgtIp)
        print('\n[+] Scan Results for:%s' % tgtName[0])
    except:
        print('\n[+] Scan Results for:%s' % tgtIp)
    socket.setdefaulttimeout(1)
    for tgtPort in tgtPorts:
        print('Scanning port' + str(tgtPort))
        t = threading.Thread(target=connScan, args=(tgtHost, int(tgtPort)))
        t.start()

def main():
    parser = optparse.OptionParser('usage%prog -H <target host> -p <target port>')
    parser.add_option('-H', dest='tgtHost', type='string', help='specify target host')
    parser.add_option('-p', dest = 'tgtPort', type='int', help='specify target port')
    options, args = parser.parse_args()
    if options.tgtHost is None or options.tgtPort is None:
        print(parser.usage)
        exit(0)
    else:
        tgtHost = options.tgtHost
        tgtPort = options.tgtPort
    args.append(tgtPort)
    portScan(tgtHost, args)

if __name__ == '__main__':
    main()

输出示例:

hacker git:(master) ✗ python scan2.py -H www.baidu.com -p 22
[+] Scan Results for:103.235.46.39
Scanning port22
[-]22/tcp close

在这里Semaphore用来控制同一个线程在屏幕上的输出是连续的,不至于线程的输出相互交叉,从而导致不可读。

类似的还有BoundedSemaphore
示例代码:

# coding=UTF-8
from pexpect import pxssh
import optparse
import time
import threading

maxConnections = 5
connection_lock = threading.BoundedSemaphore(value=maxConnections)
Found = False
Fails = 0

def connect(host, user, password, release):
    global Found, Fails
    try:
        s = pxssh.pxssh()
        s.login(host, user, password)
        print('[+] Password Found: ' + password)
        Found = True
    except Exception as e:
        if 'read_nonblocking' in str(e):
            Fails += 1
            time.sleep(5)
            connect(host, user, password, False)
        elif 'synchronize with original prompt' in str(e):
            time.sleep(1)
            connect(host, user, password, False)
    finally:
        if release:
            connection_lock.release()

def main():
    parser = optparse.OptionParser('usage%prog '+'-H <target host> -u <user> -f <password list>')
    parser.add_option('-H', dest='tgtHost', type='string', help='specify target host')
    parser.add_option('-f', dest='passwdFile', type='string', help='specify password file')
    parser.add_option('-u', dest='user', type='string', help='specify the user')
    (options, args) = parser.parse_args()
    host = options.tgtHost
    passwdFile = options.passwdFile
    user = options.user
    if host == None or passwdFile == None or user == None:
        print(parser.usage)
        exit(0 )
    fn = open(passwdFile, 'r')
    for line in fn.readlines():
        if Found:
            print("[*] Exiting: Password Found")
            exit(0)
        if Fails > 5:
            print("[!] Exiting: Too Many Socket Timeouts")
            exit(0)
        connection_lock.acquire()
        password = line.strip('\r').strip('\n')
        print("[-] Testing: " + str(password))
        t = threading.Thread(target=connect, args=(host, user, password, True))
        t.start()

if __name__ == '__main__':
    main()

在这个例子中BoundedSemaphore起到了限制最大并发线程数的作用。

队列

生产消费者模型主要是对队列进程操作,贴心的Python为我们实现了一个队列结构,队列内部实现了锁的相关设置。可以用队列重写生产消费者模型。

queue内部实现了相关的锁,如果queue的为空,则get元素的时候会被阻塞,知道队列里面被其他线程写入数据。同理,当写入数据的时候,如果元素个数大于队列的长度,也会被阻塞。也就是在 put 或 get的时候都会获得Lock。

实现和上面类似功能的代码如下:

#coding: utf-8
import  threading
import  time
import random

import queue

queue = queue.Queue(10)

class Producer(threading.Thread):

    def run(self):
        while True:
            elem = random.randrange(100)
            queue.put(elem)
            print("Producer a elem {}, Now size is {}".format(elem, queue.qsize()))
            time.sleep(random.random())

class Consumer(threading.Thread):

    def run(self):
        while True:
            elem = queue.get()
            queue.task_done()
            print("Consumer a elem {}. Now size is {}".format(elem, queue.qsize()))
            time.sleep(random.random())

def main():

    for i in range(3):
        Producer().start()

    for i in range(2):
        Consumer().start()


if __name__ == '__main__':
    main()

输出结果:

Producer a elem 97, Now size is 1
Producer a elem 56, Now size is 2
Producer a elem 27, Now size is 3
Consumer a elem 97. Now size is 2
Consumer a elem 56. Now size is 1
Consumer a elem 27. Now size is 0
Producer a elem 41, Now size is 1
Producer a elem 75, Now size is 2
Producer a elem 18, Now size is 3
Consumer a elem 41. Now size is 2

ThreadLocal

这个概念在Java中也有。类似于伪私有类属性,在类方法前加__,如__X,Python会自动扩展这样的名称,以包含类的名称,从而使它们变成真正的唯一。这样在当前线程中就是全局变量,但对于整个进程来说,又是局部变量。

使用示例如下:

import threading
local = threading.local()
def func(name):
    print('current thread:%s' % threading.currentThread().name)
    local.name = name
    print("%s in %s" % (local.name,threading.currentThread().name))
t1 = threading.Thread(target=func,args=('haibo',))
t2 = threading.Thread(target=func,args=('lina',))
t1.start()
t2.start()
t1.join()
t2.join()

输出结果:

current thread:Thread-1
haibo in Thread-1
current thread:Thread-2
lina in Thread-2

线程池的使用

threadpool

threadpool需要安装。pip install threadpool

import threadpool
from time import sleep

def work(name):
sleep(1)
print("%s doing job" % name)

arg_list = [(["go2live.cn"],None),
(["blog.go2live.cn"],None)]
pool = threadpool.ThreadPool(5)
requests = threadpool.makeRequests(work, arg_list)
[pool.putRequest(req) for req in requests]
pool.wait()

输出结果:

go2live.cn doing job
blog.go2live.cn doing job

multiprocessing.dummy.Pool

标准库。

from time import sleep

from multiprocessing.dummy import Pool as ThreadPool

def work(name):
    sleep(1)
    print("%s doing job" % name)

pool = ThreadPool()
results = pool.map(work,['go2live.cn','blog.go2live.cn'])
print(results)
pool.close()
pool.join()

print('main ended')

输出结果:

go2live.cn doing job
blog.go2live.cn doing job
[None, None]
main ended

其它线程工具

在debug中可以利用threading模块获取到当前执行的线程情况:

支持的函数:

  • activeCount():返回激活的线程对象的数量
  • currentThread():返回当前cpu执行的线程对象
  • get_ident() 返回当前线程
  • enumerate(): 返回当前激活的线程对象列表
  • main_thread() 返回主 Thread 对象
  • settrace(func) 为所有线程设置一个 trace 函数
  • setprofile(func) 为所有线程设置一个 profile 函数
  • stack_size([size]) 返回新创建线程栈大小;或为后续创建的线程设定栈大小为 size
  • TIMEOUT_MAX Lock.acquire(), RLock.acquire(), Condition.wait() 允许的最大值

threading 可用对象列表:

  • Thread 表示执行线程的对象
  • Lock 锁原语对象
  • RLock 可重入锁对象,使单一进程再次获得已持有的锁(递归锁)
  • Condition 条件变量对象,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值
  • Semaphore 为线程间共享的有限资源提供一个”计数器”,如果没有可用资源会被阻塞
  • Event 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活
  • Timer 与 Thread 相识,不过它要在运行前等待一段时间
  • Barrier 创建一个”阻碍”,必须达到指定数量的线程后才可以继续。

延迟执行新线程Timer

class threading.Timer(interval, function, args=None, kwargs=None)
过interval秒后再执行函数function,参数为后面的args和kwargs。

这东东在GUI编程中有用。把数据处理扔到新线程,从而不影响当前的用户交互。

示例代码:

#coding: utf-8
import  threading
import  time


def test():
    print("{} is running {}".format(threading.current_thread().name, time.ctime()))


def main():
    print("main thread start %s" % time.ctime())
    threading.Timer(3,test).start()
    print("main thread end %s" % time.ctime())


if __name__ == '__main__':
    main()

输出如下所示:

main thread start Tue Feb 7 11:41:56 2017
main thread end Tue Feb 7 11:41:56 2017
Thread-1 is running Tue Feb 7 11:41:59 2017

python的多线程缺陷

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

总结

Python多线程在IO密集型任务中还是很有用处的,而对于计算密集型任务,应该使用Python多进程。

Python多进程相关概念及解释

Python多进程相关概念及解释

介绍

由于GIL的存在,为了利用多核的优势,Python程序员不得不使用多进程。(多线程在PYthon中仅在IO密集型任务中有优势)。
Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
感觉使用方法和threading差不多。

原始方法fork

fork函数被调用一次,但返回两次。对父进程返回子进程ID, 对子进程返回0.
子进程获得父进程数据空间、堆和栈的副本。父子进程并不共享这些存储空间部分,父、子进程共享正文段。

由于在fork之后经常跟随着exec,所以现在的很多实现并不执行一个父进程数据段、堆和栈的完全复制。作为替代,使用了写时复制技术。
这些区域由父、子进程共享,而且内核将它们的访问权限改变为只读的。
如果父、子进程中的任一个试图修改这些区域,则内核只为修改区域的那块内存制作一个副本,通常是虚拟存储器系统中的一“页”。

一般来说,在fork之后是父进程还是子进程执行是不确定的。

在类unix系统中, python的os 模块内置了fork 函数用以创建子进程。

示例代码:

import os

print("Process %s start …" %(os.getpid()))
pid = os.fork()
if pid == 0:
    print("This is child process and my pid is %d, my father process is %d" %(os.getpid(), os.getppid()))
else:
    print("This is Fater process, And Its child pid is %d" %(pid))

输出结果:

Process 872 start …
This is Fater process, And Its child pid is 873
This is child process and my pid is 873, my father process is 872

subprocess

subprocess包主要功能是执行外部的命令和程序。比如说,我需要使用wget下载文件。我在Python中调用wget程序。从这个意义上来说,subprocess的功能与shell类似。

Popen

subprocess模块中只定义了一个类: Popen。
可以使用Popen来创建进程,并与进程进行复杂的交互。它的构造函数如下:

subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)

参数解释:

  • args 可以是字符串或者序列类型(如:list,元组),用于指定进程的可执行文件及其参数。如果是序列类型,第一个元素通常是可执行文件的路径。我们也可以显式的使用executeable参数来指定可执行文件的路径。在windows操作系统上,Popen通过调用CreateProcess()来创建子进程,CreateProcess接收一个字符串参数,如果args是序列类型,系统将会通过list2cmdline()函数将序列类型转换为字符串。
  • bufsize:指定打开stdin/stdout/stderr文件的缓冲类型。0 无缓冲;1 行缓冲; 其它正数 选择合适的缓冲; 负数(默认值),采用系统默认缓冲大小,即io.DEFAULT_BUFFER_SIZE。
  • executable用于指定可执行程序。一般情况下我们通过args参数来设置所要运行的程序。如果将参数shell设为True,executable将指定程序使用的shell。在windows平台下,默认的shell由COMSPEC环境变量来指定。
  • stdin, stdout, stderr分别表示程序的标准输入、输出、错误句柄。他们可以是PIPE,文件描述符或文件对象,也可以设置为None,表示从父进程继承。
  • preexec_fn 只在Unix平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。
  • Close_sfs:在windows平台下,如果close_fds被设置为True,则新创建的子进程将不会继承父进程的输入、输出、错误管道。我们不能将close_fds设置为True同时重定向子进程的标准输入、输出与错误(stdin, stdout, stderr)。
  • 如果参数shell设为true,程序将通过shell来执行。
  • cwd用于设置子进程的当前目录。
  • env是字典类型,用于指定子进程的环境变量。如果env = None,子进程的环境变量将从父进程中继承。
  • Universal_newlines:不同操作系统下,文本的换行符是不一样的。如:windows下用’/r/n’表示换,而Linux下用’/n’。如果将此参数设置为True,Python统一把这些换行符当作’/n’来处理。
  • startupinfo与createionflags只在windows下用效,它们将被传递给底层的CreateProcess()函数,用于设置子进程的一些属性,如:主窗口的外观,进程的优先级等等。

方法解释:

  • Popen.poll() 用于检查子进程是否已经结束。设置并返回returncode属性。
  • Popen.wait() 等待子进程结束。设置并返回returncode属性。
  • Popen.communicate(input=None) 与子进程进行交互。向stdin发送数据,或从stdout和stderr中读取数据。可选参数input指定发送到子进程的参数。Communicate()返回一个元组:(stdoutdata, stderrdata)。注意:如果希望通过进程的stdin向其发送数据,在创建Popen对象的时候,参数stdin必须被设置为PIPE。同样,如果希望从stdout和stderr获取数据,必须将stdout和stderr设置为PIPE。
  • Popen.send_signal(signal) 向子进程发送信号。
  • Popen.terminate() 停止(stop)子进程。在windows平台下,该方法将调用Windows API TerminateProcess()来结束子进程。
  • Popen.kill() 杀死子进程。

简单示例如下:

import subprocess
child = subprocess.Popen(["ping","-c","5","www.google.com"])
print("parent process")
child.wait()

输出结果:

parent process
PING www.google.com (93.46.8.89): 56 data bytes
Request timeout for icmp_seq 0
Request timeout for icmp_seq 1
Request timeout for icmp_seq 2
Request timeout for icmp_seq 3

www.google.com ping statistics —
5 packets transmitted, 0 packets received, 100.0% packet loss

可以见到子进程的输出直接打印在了屏幕上,因为子进程默认继承了父进程的stdin/stdout/stderr。

Popen的再封装

subprocess.call()
父进程等待子进程完成
返回退出信息(returncode)

subprocess.check_call()
父进程等待子进程完成。
正常执行完返回0,否者抛出subprocess.CalledProcessError,其中包含returncode

subprocess.check_output()

父进程等待子进程完成。
返回子进程向标准输出的输出结果。

事实上这个三个都是利用subprocess.Popen实现的。

简单实现的check_output示例如下:

import subprocess

def my_check_output(args,
                    stdin=None,
                    stderr=None,
                    shell=False,
                    encoding=None,
                    errors=None,
                    universal_newlines=False,
                    timeout=None):

    child = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE,stderr=stderr,shell=shell,universal_newlines=universal_newlines)
    stdout,stderr = child.communicate(input=stdin, timeout=timeout)
    returncode = child.returncode
    if returncode != 0:
        raise subprocess.CalledProcessError(returncode=returncode)
    else:
        return stdout

try:
    ret = my_check_output("pwd",shell=True)
    print(ret)
except subprocess.CalledProcessError as e:
    print(e)

输出结果:

b’/Users/maynard/data/workbase/python/study/Blog_mini/tests\n’

标准模块multiprocessing

fork 方式是仅在linux 下才有的接口, 在windows下并没有, 那么在windows下如何实现多进程呢, 这就用到了multiprocessing

Process创建进程

直接调用

multiprocessing 模块的Process 对象表示的是一个进程对象, 可以创建子进程并执行指定的函数。
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典参数。name为别名。group实质上不使用。

方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

属性:authkey、daemon、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。
其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

from multiprocessing import Process
import os

def pro_do(name, func):
    print("This is child process %d from parent process %d, and name is  %s which is used for %s"
          %(os.getpid(), os.getppid(), name, func))

if __name__ == "__main__":
    print("Parent process id %d" %(os.getpid()))
    #process 对象指定子进程将要执行的操作方法(pro_do), 以及该函数的对象列表args(必须是tuple格式, 且元素与pro_do的参数一一对应)
    pro = Process(target=pro_do, args=("test", "dev"))
    print("start child process")
    #启动子进程
    pro.start()
    #是否阻塞方式执行, 如果有, 则阻塞方式, 否则非阻塞
    pro.join() #if has this, it’s synchronous operation or asynchronous operation
    print("Process end")

输出结果:

Parent process id 921
start child process
This is child process 922 from parent process 921, and name is test which is used for dev
Process end

子类化Process

自定义类,继承自Process, 实现run方法。

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

输出结果:

the time is Wed Feb 8 16:40:55 2017
the time is Wed Feb 8 16:40:58 2017
the time is Wed Feb 8 16:41:01 2017
the time is Wed Feb 8 16:41:04 2017
the time is Wed Feb 8 16:41:07 2017

Pool进程池

multiprocessing.Pool提供进程池功能。
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
  • close() 关闭pool,使其不在接受新的任务。
  • terminate() 结束工作进程,不再处理未完成的任务。
  • join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
from multiprocessing import Pool
import os, time

def pro_do(process_num):
    print("child process id is %d" %(os.getpid()))
    time.sleep(6-process_num)
    print("this is process %d" %(process_num))

if __name__ == "__main__":
    print("Current process is %d" %(os.getpid()))
    p = Pool()#默认启动CPU个数的进程,可以传参指定。
    for i in range(5):
        p.apply_async(pro_do, (i,))  #增加新的进程
    p.close() # 禁止在增加新的进程
    p.join()#等待所有子进程结束
    print("pool process done")

输出结果:

Current process is 960
child process id is 961
child process id is 963
child process id is 962
child process id is 964
this is process 3
child process id is 964
this is process 2
this is process 1
this is process 4
this is process 0
pool process done

进程间的通信

父进程可以指定子进程执行的方法及其参数, 达到父进程向子进程传递消息的单向通信的目的, 那子进程之间或子进程怎么向父进程通信呢

Queue

Queue是通过Pipe和锁、信号量实现,在进程间共享的一个通信机制。
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

示例如下:

from multiprocessing import Process, Queue
import os, time

def write_queue(q):
    for name in ["Yi_Zhi_Yu", "Tony" ,"San"]:
        print("put name %s to queue" %(name))
        q.put(name)
        time.sleep(2)
    print ("write data finished")

def read_queue(q):
    print( "begin to read data")
    while True:
        name = q.get()
        print ("get name %s from queue" %(name))

if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write_queue, args=(q,))
    pr = Process(target=read_queue,args=(q,))

    pw.start()
    pr.start()
    pw.join() #这个表示是否阻塞方式启动进程, 如果要立即读取的话, 两个进程的启动就应该是非阻塞式的, 所以pw在start后不能立即使用pw.join(), 要等pr start后方可
    pr.terminate() #服务进程,强制停止

输出结果:

put name Yi_Zhi_Yu to queue
begin to read data
get name Yi_Zhi_Yu from queue
put name Tony to queue
get name Tony from queue
put name San to queue
get name San from queue
write data finished

Manager

Queue不能用在父进程和Pool产生的子进程中。这个时候需要用到Manager。
其它如Lock等,也不能用multiprocessing.Lock,而是得用Manager中的Lock。

代码示例如下:

from multiprocessing import Process, Queue,Pool,Manager
import os, time

def write_queue(q):
    for name in ["Yi_Zhi_Yu", "Tony" ,"San"]:
        print("put name %s to queue" %(name))
        q.put(name)
        time.sleep(2)
    print ("write data finished")

def read_queue(q):
    print( "begin to read data")
    while True:
        name = q.get()
        print ("get name %s from queue" %(name))

if __name__ == "__main__":
    manager = Manager()
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write_queue,args=(q,))
    pr = p.apply_async(read_queue,args=(q,))
    p.close()
    p.join()
    print("main end")

输出结果:

put name Yi_Zhi_Yu to queue
begin to read data
get name Yi_Zhi_Yu from queue
put name Tony to queue
get name Tony from queue
put name San to queue
get name San from queue
write data finished

Pipe

管道是UNIX系统IPC的最古老的形式,所有UNIX系统都提供此种通信机制。管道有以下两种局限性。

  1. 历史上,它们是半双工的(即数据只能在一个方向上流动)。现在,某些系统提供全双工管道,但是为了最佳的可移植性,我们决不应预先假定系统支持全双工管道。
  2. 管道只能在具有公共祖先的两个进程之间使用。通常,一个管道由一个进程创建,在进程调用fork之后,这个管道就能在父进程和子进程之间使用了。

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

代码示例:

#!/usr/bin/env python
#encoding=utf-8

from multiprocessing import Process,Pipe
import os,time,sys

def send_pipe(p):
    names = ["Yi_Zhi_Yu", "Tony", "San"]
    for name in names:
        print("put name %s to Pipe" %(name))
        p.send(name)
        time.sleep(1)
def recv_pipe(p):
    print("Try to read data in pipe")
    while True:
            name = p.recv()
            print("get name %s from pipe" %(name))

if __name__ == "__main__":
   #pipe, one for send, one for read
   ps_pipe, pr_pipe = Pipe()# return  (conn1, conn2). 有个参数来决定是否双向管理.True为双向管道,False为单向,conn1发送,conn2接收.默认True
   #process
   ps = Process(target=send_pipe, args=(ps_pipe,))
   pr = Process(target=recv_pipe, args=(pr_pipe,))
   pr.start()
   ps.start()
   ps.join()
   pr.terminate()

输出结果:

Try to read data in pipe
put name Yi_Zhi_Yu to Pipe
get name Yi_Zhi_Yu from pipe
put name Tony to Pipe
get name Tony from pipe
put name San to Pipe
get name San from pipe

Event

Event用来实现进程间同步通信。

示例代码:

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

输出结果:

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

解决共享资源

Lock

有with用法和明确的acquire和release用法。见代码示例:

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

file.txt文件内容:

Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

Semaphore

信号量(Semaphore)用来控制对共享资源的访问数量,例如池的最大连接数。

代码示例:

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

输出结果:

Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-5acquire
Process-4release

Process-5release