0.前言
决定从今天起简单记录下研究生阶段学习的知识。
考虑一个简单的双人博弈,石头剪刀布。我们不知道对手会出什么,对手当然也不会告诉我们(废话)。那么这就是一个最简单的不完全信息博弈。
参与人的集合,行动顺序,可以采取什么行动,当参与人行动时他们知道什么,以及参与人的收益函数,当这些都是参与人的共同知识时,博弈被称为是完全信息的。至少有一名参与者对于以上内容不完全了解,则是不完全信息博弈。
我们的直觉告诉我们,在石头剪刀布的时候,不能多出某一招,否则会很容易被人利用。例如:如果我们知道一个对手以(0.2,0.3,0.5)的概率出(剪刀,石头,布),我们很自然会想到我们可以多出剪刀来取得胜利。当两个玩家都察觉到这一点时,两个玩家的 最佳策略都变成了(1/3,1/3,1/3),此时两个玩家都无法通过变招来多获得一点优势——我们称这种状态为纳什均衡。
这种简单的问题下我们不需要什么算法,甚至可以通过一眼丁真的方法,观察对手会多出什么,但是当问题复杂起来就不是那么简单了。
这篇文章从一个简单的不完全信息博弈游戏kuhun牌开始,介绍CFR算法、MCCFR算法以及OOS算法,并附上代码实现。
我在学习的时候苦于这方面开源的python代码不是很多,希望能做出自己的一点贡献。
1.Kuhn牌简介
Kuhn Poker(库恩扑克)是由 Harold E. Kuhn 设计的一种简单的三张牌扑克游戏。 两个玩家每人先下1个筹码(即盲注1个筹码到赌注池中)。 三张标有数字1、2和3的牌被洗牌后,每个玩家发一张牌作为私人信息。 游戏从玩家1开始轮流进行。在一轮中,玩家可以选择过牌或下注。下注的玩家将额外的一个筹码放入赌注池中。 当一个玩家在对手下注后过牌,对手将拿走赌注池中的所有筹码。 当连续两次过牌或连续两次下注时,两个玩家都亮出他们的牌,拥有更高数字牌的玩家拿走赌注池中的所有筹码。
有一些策略是显然的,拿到“3”牌时,因为足够大,所以我们应该选择下注,但是如果直接选择下注,对面的玩家可能会选择过牌,从而只能拿到1点收益;如果选择过牌,引诱对面可能拿到“2”的玩家下注,自己再下注就能拿到2点收益……层层博弈下应该以什么样的策略能拿到最大收益呢?换言之这个游戏的纳什平衡策略是什么?
我们使用CFR算法来解决这个问题。
2.CFR算法简介
在这里我不想长篇大论CFR的原理,因为我的水平也不是很能说清楚理论。推荐下这位博主大佬的https://zhuanlan.zhihu.com/p/139696555。我只简单地用直观的方式来说明为什么CFR可以有效。
如果时光可以倒流的话,我们一定会去试图弥补自己的遗憾;或者说,吃一堑长一智。面对一个场景我们能做出很多选择,但往往事后我们才能知道我们选择的收益大小。根据收益,我们就知道下次遇到类似的情况应该怎么做了。
但是结果往往是蝴蝶效应的叠加导致的,如果很久以前的动作,我们怎么知道这个动作的具体收益呢?CFR提出了反事实值这个概念。通过计算某一动作的反事实值减去策略的反事实值,我们就能知道这个动作是不是比较好的动作了。我们使用这个差值就可以更新这一动作的遗憾值,从而更新策略了。所以我们就会尽量选择相比之下比较好的动作。
对强化学习有了解的朋友会发现这里有很多概念似乎很熟悉。比如payoff这个概念很像reward,反事实值那一块又有点Q函数的意思……确实有点像,但是还是有点区别的,虽然我也搞不太清楚。
两者也有结合,如果有空,我也会写成博客。
3.代码1-kuhn环境实现:kuhn.py
注释比较详细(虽然是用gpt写的),就不另外说明了。
import copy
import random
from collections import defaultdict
"""
Kuhn Poker(库恩扑克)是由 Harold E. Kuhn 设计的一种简单的三张牌扑克游戏。
两个玩家每人先下1个筹码(即盲注1个筹码到赌注池中)。
三张标有数字1、2和3的牌被洗牌后,每个玩家发一张牌作为私人信息。
游戏从玩家1开始轮流进行。在一轮中,玩家可以选择过牌或下注。下注的玩家将额外的一个筹码放入赌注池中。
当一个玩家在对手下注后过牌,对手将拿走赌注池中的所有筹码。
当连续两次过牌或连续两次下注时,两个玩家都亮出他们的牌,拥有更高数字牌的玩家拿走赌注池中的所有筹码。
"""
random.seed()
# 定义游戏中的动作类,包括过牌和下注两种动作
class Action:
# 过牌动作,用0表示
Check = 0
# 下注动作,用1表示
Bet = 1
# 随机生成一个动作
@staticmethod
def random():
# 生成一个0或1的随机整数作为动作ID
action_id = random.randint(0, 1)
return Action.from_int(action_id)
# 根据给定的整数ID生成对应的动作对象
@staticmethod
def from_int(action_id):
if action_id == 0:
return Action.Check
elif action_id == 1:
return Action.Bet
else:
# 如果传入的ID不合法,抛出异常
raise ValueError(f"Unknown action id {action_id}")
# 返回动作的总数(这里是2,即过牌和下注)
@staticmethod
def num():
return 2
# 用于存储游戏动作历史的类
class ActionHistory:
def __init__(self, raw):
# 存储动作历史的列表
self.raw = raw
# 创建一个新的ActionHistory实例
@staticmethod
def new(raw):
return ActionHistory(raw)
# 使ActionHistory实例可迭代,返回其内部存储的动作列表的迭代器
def __iter__(self):
return iter(self.raw)
# 返回动作历史的长度(即存储的动作数量)
def __len__(self):
return len(self.raw)
# 创建当前ActionHistory实例的副本
def clone(self):
return ActionHistory(copy.deepcopy(self.raw))
# Kuhn扑克游戏类
class KuhnGame:
def __init__(self):
# 游戏中的三张牌,牌面数字为1、2、3
self.cards = [1, 2, 3]
# 存储游戏动作历史的实例
self.action_history = ActionHistory([])
# 存储玩家的列表
self.players = []
# 添加玩家到游戏中,每个游戏最多两个玩家
def add_player(self, player):
# 确保玩家数量小于2
assert len(self.players) < 2
# 调用玩家的注册方法,传入玩家ID(0或1)
player.on_register(len(self.players))
self.players.append(player)
# 开始游戏,进行指定轮数的游戏
def start(self, total_rounds):
score = [0,0]
for round in range(total_rounds):
# 每轮开始时洗牌
random.shuffle(self.cards)
# 清空动作历史列表
self.action_history.raw.clear()
# print(f"Round {round}, cards {self.cards}")
# 给每个玩家发牌,并调用玩家的开始游戏方法
for player_id, player in enumerate(self.players):
player.on_start(self.cards[player_id])
# 获取当前游戏状态下的收益,如果还没有结束则为None
maybe_payoff = get_payoff(self.action_history, self.cards)
while maybe_payoff is None:
for player in self.players:
# 玩家决定动作
action = player.decide_action(self.action_history)
# 将动作添加到动作历史列表
self.action_history.raw.append(action)
# 再次获取收益,检查游戏是否结束
maybe_payoff = get_payoff(self.action_history, self.cards)
if maybe_payoff is not None:
break
payoff = maybe_payoff
# 根据玩家顺序确定每个玩家的收益
payoffs = [payoff, -payoff]
score[0] += payoffs[0]
score[1] += payoffs[1]
for player_id, player in enumerate(self.players):
player.handle_result(self.action_history, payoffs[player_id])
# for player_id, player in enumerate(self.players):
# print(f"Player {player_id}: {player.money}")
# 根据动作历史和牌面情况计算收益
def get_payoff(action_history, cards):
# 如果动作历史少于2个动作,游戏未结束,返回None
if len(action_history.raw) < 2:
return None
prev_action = action_history.raw[-1]
prev_prev_action = action_history.raw[-2]
if prev_action == Action.Check:
if prev_prev_action == Action.Check:
# 如果连续两次过牌,比较玩家牌面大小决定收益
return 1 if cards[0] > cards[1] else -1
# 根据下注玩家的ID确定收益(交替下注)过牌-下注-过牌
bet_player_id = len(action_history.raw) % 2
return 1 if bet_player_id == 0 else -1
if prev_action == Action.Bet and prev_prev_action == Action.Bet:
# 如果连续两次下注,比较玩家牌面大小决定收益(筹码翻倍)
return 2 if cards[0] > cards[1] else -2
return None
4.代码2-cfr主体部分:cfr.py
import random
from collections import defaultdict
import kuhn
import time
import pickle
# 信息集类,用于表示玩家在游戏中的特定状态(包括动作历史和手牌)
class InformationSet:
def __init__(self, action_history, hand_card):
# 存储动作历史
self.action_history = action_history
# 存储玩家手牌
self.hand_card = hand_card
# 重写哈希函数,根据动作历史(转换为元组)和手牌计算哈希值
def __hash__(self):
return hash((tuple(self.action_history.raw), self.hand_card))
# 重写相等比较函数,比较两个信息集的动作历史和手牌是否相等
def __eq__(self, other):
return (self.action_history.raw, self.hand_card) == (other.action_history.raw, other.hand_card)
# 重写对象的字符串表示形式,便于打印和调试
def __repr__(self):
return f"{self.hand_card} {self.action_history.raw} "
# CFR(Counterfactual Regret Minimization)算法中的节点类
class CfrNode:
def __init__(self, epsilon):
# 累计遗憾值列表,用于每个可能的动作(这里是库恩扑克中的过牌和下注)
self.cum_regrets = [0.0] * kuhn.Action.num()
# 累计策略列表,用于每个可能的动作
self.cum_strategy = [0.0] * kuhn.Action.num()
# 探索率,用于平衡探索新策略和利用已有策略
self.epsilon = epsilon
# 根据给定的概率分布采样一个动作索引
def sample(self, probs):
return random.choices(range(len(probs)), weights=probs, k=1)[0]
# 根据累计遗憾值选择一个动作 这里没有被运行?
def get_action(self):
# 如果所有累积遗憾值都为零或负数,则随机选择一个动作
if all(regret <= 0 for regret in self.cum_regrets):
return kuhn.Action.random()
# 将负遗憾值转换为0,仅保留正遗憾值作为权重
weights = [max(0, regret) for regret in self.cum_regrets]
# 使用随机权重采样
chosen_index = random.choices(range(len(weights)), weights=weights, k=1)[0]
return kuhn.Action.from_int(chosen_index)
# 获取每个动作的概率,考虑了遗憾值和探索率 对于CFR,就是0.5 0.5的平均探索
def get_action_probability(self):
regret_sum = sum(max(0, regret) for regret in self.cum_regrets)
# 如果总遗憾值为0,则每个动作概率均等
if regret_sum > 0:
result = [(max(0, regret) / regret_sum) for regret in self.cum_regrets]
else:
result = [1.0 / kuhn.Action.num()] * kuhn.Action.num()
explore_prob = self.epsilon / kuhn.Action.num()
# 将探索概率和策略概率融合
result = [explore_prob + prob * (1 - self.epsilon) for prob in result]
return result
# 重写对象的字符串表示形式,显示过牌和下注的比例
def __repr__(self):
check_ratio = self.cum_strategy[0] / (self.cum_strategy[0] + self.cum_strategy[1])
bet_ratio = 1.0 - check_ratio
return f"[Check: {check_ratio}, Bet: {bet_ratio}]"
# 使用CFR算法的玩家类
class CfrPlayer:
def __init__(self,mode="Random"):
# 玩家ID,初始化为 -1
self.player_id = -1
# 玩家手牌,初始化为 -1
self.hand_card = -1
# 玩家的资金
self.money = 0
# 存储信息集到CFR节点的映射 Information-set:CFR-Node
self.cfr_info = {}
self.mode = mode
# 玩家注册方法,设置玩家ID
def on_register(self, player_id):
self.player_id = player_id
# 游戏开始时,设置玩家手牌
def on_start(self, card):
self.hand_card = card
# 根据动作历史决定玩家的行动
def decide_action(self, action_history):
if self.mode == "Random":
return kuhn.Action.random()
info_set = InformationSet(action_history, self.hand_card)
if info_set not in self.cfr_info.keys():
# print(info_set,"------118-----")
self.cfr_info[info_set] = CfrNode(0.0)
return kuhn.Action.random()
# print(self.cfr_info[info_set].cum_regrets)
return self.cfr_info[info_set].get_action()
# 处理游戏结果,更新玩家资金并打印相关信息
def handle_result(self, action_history, payoff):
self.money += payoff
info_set = InformationSet(action_history, self.hand_card)
# print(f"player {self.player_id} get payoff {payoff} with info set {str(info_set)}")
# 训练玩家策略,进行多次迭代
def train(self, iteration):
total_payoff = 0.0
episode_payoff = 0.0
# start = time.time()
for round in range(iteration):
cards = [1, 2, 3]
action_history = kuhn.ActionHistory([])
random.shuffle(cards)
history_probs = {0: 1.0, 1: 1.0}
round_payoff = 0.0
if self.mode == "CFR":
round_payoff = self.cfr(action_history, cards, history_probs)
elif self.mode == "MCCFR":
round_payoff = self.mccfr(action_history, cards, history_probs)
elif self.mode == "OOS":
_, _, round_payoff = self.oos(action_history, cards,{0: 1.0, 1: 1.0}, {0: 1.0, 1: 1.0})
total_payoff += round_payoff
episode_payoff += round_payoff
if (round+1)%1000==0:
print(f"round {round}, round payoff {round_payoff},1000-average payoff {episode_payoff / 1000},average payoff {total_payoff / round}")
episode_payoff = 0.0
# end = time.time()
# if end - start > 60:
# print(f"round {round}, average payoff {total_payoff / iteration}")
# break
# CFR算法的核心递归方法,计算玩家在当前状态下的价值
def cfr(self, history, cards, reach_probs):
player_id = len(history) % 2
# 检查游戏是否结束,如果结束则返回相应的收益
maybe_payoff = kuhn.get_payoff(history, cards)
if maybe_payoff is not None:
payoff = maybe_payoff if player_id == 0 else -maybe_payoff
return float(payoff)
# 处理信息集,如果信息集不存在则创建新的CFR节点
info_set = InformationSet(history, cards[player_id])
if info_set not in self.cfr_info.keys():
self.cfr_info[info_set] = CfrNode(0.0)
# 获取当前信息集下的行动概率
action_probs = self.cfr_info[info_set].get_action_probability()
# 存储每个行动的收益
action_payoffs = []
node_value = 0.0
# 遍历每个可能的行动
for action_id, action_prob in enumerate(action_probs):
# 创建下一个动作历史的副本
next_history = history.clone()
# 将当前行动添加到动作历史中
next_history.raw.append(kuhn.Action.from_int(action_id))
# 复制到达当前状态的概率
next_reach_probs = reach_probs.copy()
# 更新到达下一个状态的概率
next_reach_probs[player_id] *= action_prob
# 递归调用cfr方法计算下一个状态的收益,并取负(因为是对手的视角)
action_payoffs.append(-self.cfr(next_history, cards, next_reach_probs))
node_value += action_prob * action_payoffs[action_id]
assert len(action_payoffs) == 2
# 根据计算得到的收益更新节点的累计遗憾值
node = self.cfr_info[info_set]
# print(node.cum_strategy) #0.0,0.0
for action_id, payoff in enumerate(action_payoffs):
regret = payoff - node_value #动作的反事实值 - 策略的反事实值
opponent = 1 - player_id
node.cum_regrets[action_id] += reach_probs[opponent] * regret #计算这个节点的遗憾值
# 根据行动概率更新节点的累计策略,这个变量用于打印策略信息,不影响
for action_id, action_prob in enumerate(action_probs):
node.cum_strategy[action_id] += reach_probs[player_id] * action_prob
return node_value
5.main.py
from kuhn import KuhnGame
from cfr import CfrPlayer
import time
def main():
game = KuhnGame()
player1 = CfrPlayer("CFR")
player2 = CfrPlayer("Random")
start_time = time.time()
player1.train(10001)
end_time = time.time()
print("CFR takes",end_time - start_time) #CFR 0.2558259963989258
player1.print_info()
game.add_player(player1)
game.add_player(player2)
game.start(1000)
game.players.clear()
game.add_player(player2)
game.add_player(player1)
game.start(1000)
print(player1.money, player2.money)
if __name__ == "__main__":
main()
6.结语
这是策略表格:
—————先手—————
1 [] : [Check: 0.8613605693431835, Bet: 0.1386394306568165]
1 [0, 1] : [Check: 0.9999912741717137, Bet: 8.725828286260118e-06]
2 [] : [Check: 0.9992974156106671, Bet: 0.0007025843893329053]
2 [0, 1] : [Check: 0.5164528591157757, Bet: 0.48354714088422435]
3 [] : [Check: 0.585470226795728, Bet: 0.414529773204272]
3 [0, 1] : [Check: 1.2746102987836917e-05, Bet: 0.9999872538970122]
—————后手—————
1 [0] : [Check: 0.6683707431378955, Bet: 0.33162925686210454]
1 [1] : [Check: 0.9999850227653966, Bet: 1.4977234603419376e-05]
2 [0] : [Check: 0.9998948822681403, Bet: 0.00010511773185972739]
2 [1] : [Check: 0.6714624487539822, Bet: 0.3285375512460178]
3 [0] : [Check: 1.5005552054260076e-05, Bet: 0.9999849944479458]
3 [1] : [Check: 1.5005552054260076e-05, Bet: 0.9999849944479458]
——————————–
发现其实随机策略的玩家玩的也很好?确实是这样的,我的猜测是因为这里动作太少了,而且我们可以看到许多情况下过牌和下注概率差的也不大。所以会导致这样的情况吧。
之后有空会写MCCFR和手上正在看的论文的算法。
orz