Python函数式编程
纸上得来终觉浅,绝知此事要躬行。

1. 基础知识
函数式编程(functional programming)是一种编程范式。
理解函数式编程的行为
- 函数式编程不依赖于外部变量,而是返回一个新的值给你,所以没有任何副作用。即保证每次输入的值不变,输出的值一定也不会发生改变。
- 非函数式编程中my_inc会修改全局变量中的x变量的值,而函数式编程中my_inc只依赖于我们输出时传入的x值,给出我们计算之后的值,不依赖于外部变量。
# 非函数式编程 In [1]: x = 1 In [2]: def my_inc(): ...: global x ...: x += 1 ...: In [3]: my_inc() In [4]: x Out[4]: 2
# 函数式编程 In [5]: def my_inc(x): ...: return x + 1 ...: In [6]: x = 1 In [7]: my_inc(x) Out[7]: 2
函数式编程的主要特征
- 函数是一等公民
- 所有和数据相关的事情都可以通过函数自身来解决。函数就像变量一样用来使用,函数可以像变量一样被创建、删除和修改,并当成变量一样传递,返回或是在函数中嵌套函数。
- 递归作为主要控制结构
- 函数式编程的循环控制等操作,基本都是使用递归进行完成的。
- 关注列表处理
- 列表处理(lisp)经常被用于子列表的递归来替代循环。
- 避免变量副作用
- 纯函数式编程语言会避免命令式语言中先赋值给一个变量,而另一个相同变量来跟踪程序状态这样的副作用,即所求即所得。
- 完全不允许声明
- 函数式编程不鼓励或完全不允许声明,替而代之的是表达式求值,即函数+参数的方式。通常情况下,一个程序就是一个表达式。
- 关注于计算本身
- 函数式编程担忧的是计算什么,而不是如何计算。所以你的代码变成了在描述你要干什么,而不是怎么去干
- 许多函数式编程利用高阶函数
- 之前我们学习的map、reduce、filter等都是高阶函数。
# Python对原生函数式编程的支持也就如下了,如需扩展就需要使用第三方库了 # 使用普通的方式计算求值 In [8]: (1 + 2) * 3 - 4 Out[8]: 5 # 函数式编程的计算求值 In [9]: from operator import mul, add, sub In [10]: sub(mul(add(1, 2), 3), 4) Out[10]: 5 # 其中operator提供的这样基本操作方式就是为了函数式编程中使用 In [12]: from functools import reduce In [13]: reduce(mul, [1, 2, 3, 4, 5]) Out[13]: 120
参考链接地址
2. 优点缺点
函数式编程基本工作中不太会使用,除非你真的知道你在干什么。
函数式编程的优点
- 代码编写易于理解
- 因为函数式编程语言的特点,更接近于自然语言,易于理解和使用。
- 方便 debug 和单元测试
- 由于函数式编程不依赖、也不会改变外界的状态,只要给定输入参数,返回的结果必定相同,可以把函数当做单元处理,debug和写单元测试比较容易。
- 易于并发编程
- 由于函数式编程不修改变量,也就没有资源竞争的问题,所以不需要加锁保护可变状态,也就没有死锁问题,所以可以很放心地把工作分摊到多个线程,进行并发编程。
函数式编程的缺点
- 编程性能较差
- 函数式编程相对于面向对象和过程式的编程性能要差,比如不断递归有堆栈的消耗,但是Python语言没有做尾递归优化。
- 很多场景不适用
- 函数式编程不适合做IO操作,也不适合写GUI。Python内置很多数据类型,有时候就是需要对这些数据类型做更新,如果非要用函数式会增加时间消耗,产生额外的中间数据。当然也有一些场景非常适合,比如下面讲的 4 个语言特性部分。不要为了用函数式编程而用,而是在适合的地方使用适合的技术,不要无视现实环境。
3. 语言特性
虽然函数式语言很多场景不适用,但是如下四个特性场景就非常适合使用了。
- [1] 闭包 Closure
# maker就是工厂函数,action就是闭包
In [1]: def maker(n):
...: def action(m):
...: return n * m
...: return action
...:
In [2]: f = maker(3)
In [3]: f(2)
Out[3]: 6
In [4]: g = maker(10)
In [5]: g(2)
Out[5]: 20
- [2] 高阶函数 higer-order function
In [1]: list(map(lambda x: x * 2, [1, 3, 4]))
Out[1]: [2, 6, 8]
In [2]: reduce(add, [1, 2, 3])
Out[2]: 6
In [3]: list(filter(None, [1, '', {}, (), False, None, set()]))
Out[3]: [1]
- [3] 柯里化 Currying
- 柯里化可以理解为多次嵌套的偏函数
- 柯里化就是把接收多个参数的函数转化为接收多个单一参数的函数
In [1]: def add(x, y, z): ....: return x + y + z ....: In [2]: def addX(z): ....: defa(y): ....: def_(x): ....: return x + y + z ....: return _ ....: return a ....: In [3]: add(1, 2, 3) Out[3]: Out: 6 In [4]: addX(1)(2)(3) Out[4]: Out: 6
In [5]: from toolz import curry In [6]: @curry ....: def add(x, y, z): ....: return x + y + z ....: In [7]: add(1) Out[7]: <function add at 0x10ab1e620> In [8]: add(1)(2) Out[8]: <function add at 0x10ab1e620> In [9]: add(1)(2)(3) Out[9]: 6 # 装饰器curry做的事情相当于多次partial In [10]: partial(partial(add, 1), 2)(3) # 装饰器curry并不纯粹只是多次partial,其参数可以自由自定使用 In [11]: add(1, 2) Out[11]: <function add at 0x10ab1e620> In [12]: add(1, 2)(3) Out[12]: 6 In [13]: add(1, 2, 3) Out[13]: 6 In [14]: add(1)(2, 3) Out[14]: 6
- [4] 偏应用函数 Partially Applied Function
- 偏应用函数用于给函数固定参数
In [1]: def warn_log(message):
...: return log('WARN', message)
...:
In [2]: warn_log('This is a warn')
Out[2]: [WARN]: This is a warn
# 偏应用函数partial,用于给函数绑定属性
In [3]: from functools import partial
In [4]: def log(level, message):
...: print(f'[{level}]: {message}')
...:
In [5]: warn_log = partial(log, 'WARN')
In [6]: warn_log('This is a warn')
Out[6]: [WARN]: This is a warn
# Python3.4开始添加了partialmethod函数
# partialmethod函数作用类似于partial函数,但仅作用于方法
from functools import partialmethod
class Cell(object):
def__init__(self):
self._alive = False
@property
def alive(self):
return self._alive
def set_state(self, state):
self._alive = bool(state)
set_alive = partialmethod(set_state, True)
set_dead = partialmethod(set_state, False)
# IPython中执行并输出
c = Cell()
c.alive # False
c.set_alive()
c.alive # True
4. 第三方库
如果对原生函数式编程不满足,可使用第三方库提供的语法糖简化代码。
尽管Python并不是一种纯粹的函数式编程(FP)语言,但是它自生的多范式以及十分自由的特性使我们更加容易编写函数式编程的代码。
| 第三方库函数编程库 | Github 地址 | 主要用途 |
|---|---|---|
| fancy | fancy | 注重实际的 FP 方法的集合 |
| fn | fn | 实现了 FP 缺失的特性功能 |
| PyFunctional | PyFunctional | 使用 FP 实现了数据管道链 |
| peewee | peewee | 实现了数据库的 ORM 连接 |
- fancy
# 遍历集合
walk(str.upper, {'a', 'b'}) # {'A', 'B'}
walk(reversed, {'a': 1, 'b': 2}) # {1: 'a', 2: 'b'}
walk_keys(double, {'a': 1, 'b': 2}) # {'aa': 1, 'bb': 2}
walk_values(inc, {'a': 1, 'b': 2}) # {'a': 2, 'b': 3}
# 操作序列
take(4, iterate(double, 1)) # [1, 2, 4, 8]
first(drop(3, count(10))) # 13
remove(even, [1, 2, 3]) # [1, 3]
concat([1, 2], [5, 6]) # [1, 2, 5, 6]
cat(map(range, range(4))) # [0, 0, 1, 0, 1, 2]
mapcat(range, range(4)) # same
flatten(nested_structure) # flat_list
distinct('abacbdd') # list('abcd')
split(odd, range(5)) # ([1, 3], [0, 2, 4])
split_at(2, range(5)) # ([0, 1], [2, 3, 4])
group_by(mod3, range(5)) # {0: [0, 3], 1: [1, 4], 2: [2]}
partition(2, range(5)) # [[0, 1], [2, 3]]
chunks(2, range(5)) # [[0, 1], [2, 3], [4]]
pairwise(range(5)) # iter: [0, 1], [1, 2], ...
# 轻松创建装饰器
@decorator
def log(call):
print call._func.__name__, call._args
return call()
# 易于调试
@print_exits
def some_func(...):
"..."
@log_calls(log.info, errors=False)
@log_errors(log.exception)
def some_suspicious_function(...):
"..."
with print_durations('Creating models'):
Model.objects.create(...)
- fn
# Scala格式的lambda定义 from fn import _ from fn.op import zipwith from itertools import repeat assert list(map(_ * 2, range(5))) == [0,2,4,6,8] assert list(filter(_ < 10, [9,10,11])) == [9] assert list(zipwith(_ + _)([0,1,2], repeat(10))) == [10,11,12]
# 持续的数据结构 >>> from fn.immutable import SkewHeap >>> s1 = SkewHeap(10) >>> s2 = s1.insert(20) >>> s2 <fn.immutable.heap.SkewHeap object at 0x10b14c050> >>> s3 = s2.insert(30) >>> s3 <fn.immutable.heap.SkewHeap object at 0x10b14c158> # <-- other object >>> s3.extract() (10, <fn.immutable.heap.SkewHeap object at 0x10b14c050>) >>> s3.extract() # <-- s3 isn't changed (10, <fn.immutable.heap.SkewHeap object at 0x10b11c052>)
- PyFunctional
In [1]: from functional import seq In [2]: (seq(1, 2, 3, 4) ...: .map(lambda x: x * 2) ...: .filter(lambda x: x > 4) ...: .reduce(lambda x, y: x + y) ...: ) Out [2]: 14 # map => [2, 4, 6, 8] # filter => [6, 8] # reduce => 14
# 过滤账户交易的列表
om collections import namedtuple
Transaction = namedtuple('Transaction', 'reason amount')
transactions = [
Transaction('github', 7),
Transaction('food', 10),
Transaction('coffee', 5),
Transaction('digitalocean', 5),
Transaction('food', 5),
Transaction('riotgames', 25),
Transaction('food', 10),
Transaction('amazon', 200),
Transaction('paycheck', -1000)
]
# Using the Scala/Spark inspired APIs
food_cost = seq(transactions)
.filter(lambda x: x.reason == 'food')
.map(lambda x: x.amount).sum()
# Using the LINQ inspired APIs
food_cost = seq(transactions)
.where(lambda x: x.reason == 'food')
.select(lambda x: x.amount).sum()
# Using PyFunctional with fn
from fn import _
food_cost = seq(transactions).filter(_.reason == 'food').map(_.amount).sum()
- peewee
# ORM的一个SQL语句 In [1]: tweets = (Tweet ...: .select() ...: .where( ...: (Tweet.created_date >= date.today()) & ...: (Tweet.is_published == True)) ...: .count())PyFunctional peewee
from peewee import *
import datetime
db = SqliteDatabase('my_database.db')
class BaseModel(Model):
class Meta:
database = db
class User(BaseModel):
username = CharField(unique=True)
class Tweet(BaseModel):
user = ForeignKeyField(User, backref='tweets')
message = TextField()
created_date = DateTimeField(default=datetime.datetime.now)
is_published = BooleanField(default=True)
# 连接到数据库并创建表 db.connect() db.create_tables([User, Tweet]) # 创建一些行 charlie = User.create(username='charlie') huey = User(username='huey') huey.save() Tweet.create(user=charlie, message='My first tweet')
# 表达式和组合式查询
# A simple query selecting a user.
User.get(User.username == 'charlie')
# Get tweets created by one of several users.
usernames = ['charlie', 'huey', 'mickey']
users = User.select().where(User.username.in_(usernames))
tweets = Tweet.select().where(Tweet.user.in_(users))
# We could accomplish the same using a JOIN:
tweets = (Tweet
.select()
.join(User)
.where(User.username.in_(usernames)))
# How many tweets were published today?
tweets_today = (Tweet
.select()
.where(
(Tweet.created_date >= datetime.date.today()) &
(Tweet.is_published == True))
.count())
# Paginate the user table and show me page 3 (users 41-60).
User.select().order_by(User.username).paginate(3, 20)
# Order users by the number of tweets they've created:
tweet_ct = fn.Count(Tweet.id)
users = (User
.select(User, tweet_ct.alias('ct'))
.join(Tweet, JOIN.LEFT_OUTER)
.group_by(User)
.order_by(tweet_ct.desc()))
# Do an atomic update
Counter.update(count=Counter.count + 1).where(Counter.url == request.url)