上一个
下一个

不完全信息博弈:Khun牌与CFR及其简单变体

0.前言

决定从今天起简单记录下研究生阶段学习的知识。

考虑一个简单的双人博弈,石头剪刀布。我们不知道对手会出什么,对手当然也不会告诉我们(废话)。那么这就是一个最简单的不完全信息博弈。

参与人的集合,行动顺序,可以采取什么行动,当参与人行动时他们知道什么,以及参与人的收益函数,当这些都是参与人的共同知识时,博弈被称为是完全信息的。至少有一名参与者对于以上内容不完全了解,则是不完全信息博弈

我们的直觉告诉我们,在石头剪刀布的时候,不能多出某一招,否则会很容易被人利用。例如:如果我们知道一个对手以(0.2,0.3,0.5)的概率出(剪刀,石头,布),我们很自然会想到我们可以多出剪刀来取得胜利。当两个玩家都察觉到这一点时,两个玩家的 最佳策略都变成了(1/3,1/3,1/3),此时两个玩家都无法通过变招来多获得一点优势——我们称这种状态为纳什均衡

这种简单的问题下我们不需要什么算法,甚至可以通过一眼丁真的方法,观察对手会多出什么,但是当问题复杂起来就不是那么简单了。

这篇文章从一个简单的不完全信息博弈游戏kuhun牌开始,介绍CFR算法MCCFR算法以及OOS算法,并附上代码实现。

我在学习的时候苦于这方面开源的python代码不是很多,希望能做出自己的一点贡献。

1.Kuhn牌简介

Kuhn Poker(库恩扑克)是由 Harold E. Kuhn 设计的一种简单的三张牌扑克游戏。 两个玩家每人先下1个筹码(即盲注1个筹码到赌注池中)。 三张标有数字1、2和3的牌被洗牌后,每个玩家发一张牌作为私人信息。 游戏从玩家1开始轮流进行。在一轮中,玩家可以选择过牌或下注。下注的玩家将额外的一个筹码放入赌注池中。 当一个玩家在对手下注后过牌,对手将拿走赌注池中的所有筹码。 当连续两次过牌或连续两次下注时,两个玩家都亮出他们的牌,拥有更高数字牌的玩家拿走赌注池中的所有筹码。

有一些策略是显然的,拿到“3”牌时,因为足够大,所以我们应该选择下注,但是如果直接选择下注,对面的玩家可能会选择过牌,从而只能拿到1点收益;如果选择过牌,引诱对面可能拿到“2”的玩家下注,自己再下注就能拿到2点收益……层层博弈下应该以什么样的策略能拿到最大收益呢?换言之这个游戏的纳什平衡策略是什么?

我们使用CFR算法来解决这个问题。

2.CFR算法简介

在这里我不想长篇大论CFR的原理,因为我的水平也不是很能说清楚理论。推荐下这位博主大佬的https://zhuanlan.zhihu.com/p/139696555。我只简单地用直观的方式来说明为什么CFR可以有效。

如果时光可以倒流的话,我们一定会去试图弥补自己的遗憾;或者说,吃一堑长一智。面对一个场景我们能做出很多选择,但往往事后我们才能知道我们选择的收益大小。根据收益,我们就知道下次遇到类似的情况应该怎么做了。

但是结果往往是蝴蝶效应的叠加导致的,如果很久以前的动作,我们怎么知道这个动作的具体收益呢?CFR提出了反事实值这个概念。通过计算某一动作的反事实值减去策略的反事实值,我们就能知道这个动作是不是比较好的动作了。我们使用这个差值就可以更新这一动作的遗憾值,从而更新策略了。所以我们就会尽量选择相比之下比较好的动作。

对强化学习有了解的朋友会发现这里有很多概念似乎很熟悉。比如payoff这个概念很像reward,反事实值那一块又有点Q函数的意思……确实有点像,但是还是有点区别的,虽然我也搞不太清楚。

两者也有结合,如果有空,我也会写成博客。

 

3.代码1-kuhn环境实现:kuhn.py

注释比较详细(虽然是用gpt写的),就不另外说明了。

				
					import copy
import random
from collections import defaultdict

"""
Kuhn Poker(库恩扑克)是由 Harold E. Kuhn 设计的一种简单的三张牌扑克游戏。
两个玩家每人先下1个筹码(即盲注1个筹码到赌注池中)。
三张标有数字1、2和3的牌被洗牌后,每个玩家发一张牌作为私人信息。
游戏从玩家1开始轮流进行。在一轮中,玩家可以选择过牌或下注。下注的玩家将额外的一个筹码放入赌注池中。
当一个玩家在对手下注后过牌,对手将拿走赌注池中的所有筹码。
当连续两次过牌或连续两次下注时,两个玩家都亮出他们的牌,拥有更高数字牌的玩家拿走赌注池中的所有筹码。
"""

random.seed()

# 定义游戏中的动作类,包括过牌和下注两种动作
class Action:
    # 过牌动作,用0表示
    Check = 0
    # 下注动作,用1表示
    Bet = 1

    # 随机生成一个动作
    @staticmethod
    def random():
        # 生成一个0或1的随机整数作为动作ID
        action_id = random.randint(0, 1)
        return Action.from_int(action_id)

    # 根据给定的整数ID生成对应的动作对象
    @staticmethod
    def from_int(action_id):
        if action_id == 0:
            return Action.Check
        elif action_id == 1:
            return Action.Bet
        else:
            # 如果传入的ID不合法,抛出异常
            raise ValueError(f"Unknown action id {action_id}")

    # 返回动作的总数(这里是2,即过牌和下注)
    @staticmethod
    def num():
        return 2

# 用于存储游戏动作历史的类
class ActionHistory:
    def __init__(self, raw):
        # 存储动作历史的列表
        self.raw = raw

    # 创建一个新的ActionHistory实例
    @staticmethod
    def new(raw):
        return ActionHistory(raw)

    # 使ActionHistory实例可迭代,返回其内部存储的动作列表的迭代器
    def __iter__(self):
        return iter(self.raw)

    # 返回动作历史的长度(即存储的动作数量)
    def __len__(self):
        return len(self.raw)

    # 创建当前ActionHistory实例的副本
    def clone(self):
        return ActionHistory(copy.deepcopy(self.raw))

# Kuhn扑克游戏类
class KuhnGame:
    def __init__(self):
        # 游戏中的三张牌,牌面数字为1、2、3
        self.cards = [1, 2, 3]
        # 存储游戏动作历史的实例
        self.action_history = ActionHistory([])
        # 存储玩家的列表
        self.players = []

    # 添加玩家到游戏中,每个游戏最多两个玩家
    def add_player(self, player):
        # 确保玩家数量小于2
        assert len(self.players) < 2
        # 调用玩家的注册方法,传入玩家ID(0或1)
        player.on_register(len(self.players))
        self.players.append(player)

    # 开始游戏,进行指定轮数的游戏
    def start(self, total_rounds):
        score = [0,0]
        for round in range(total_rounds):
            # 每轮开始时洗牌
            random.shuffle(self.cards)
            # 清空动作历史列表
            self.action_history.raw.clear()
            # print(f"Round {round}, cards {self.cards}")

            # 给每个玩家发牌,并调用玩家的开始游戏方法
            for player_id, player in enumerate(self.players):
                player.on_start(self.cards[player_id])

            # 获取当前游戏状态下的收益,如果还没有结束则为None
            maybe_payoff = get_payoff(self.action_history, self.cards)
            while maybe_payoff is None:
                for player in self.players:
                    # 玩家决定动作
                    action = player.decide_action(self.action_history)
                    # 将动作添加到动作历史列表
                    self.action_history.raw.append(action)
                    # 再次获取收益,检查游戏是否结束
                    maybe_payoff = get_payoff(self.action_history, self.cards)
                    if maybe_payoff is not None:
                        break
            payoff = maybe_payoff
            # 根据玩家顺序确定每个玩家的收益
            payoffs = [payoff, -payoff]
            score[0] += payoffs[0]
            score[1] += payoffs[1]
            for player_id, player in enumerate(self.players):
                player.handle_result(self.action_history, payoffs[player_id])
        # for player_id, player in enumerate(self.players):
        #     print(f"Player {player_id}: {player.money}")

# 根据动作历史和牌面情况计算收益
def get_payoff(action_history, cards):
    # 如果动作历史少于2个动作,游戏未结束,返回None
    if len(action_history.raw) < 2:
        return None

    prev_action = action_history.raw[-1]
    prev_prev_action = action_history.raw[-2]

    if prev_action == Action.Check:
        if prev_prev_action == Action.Check:
            # 如果连续两次过牌,比较玩家牌面大小决定收益
            return 1 if cards[0] > cards[1] else -1
        # 根据下注玩家的ID确定收益(交替下注)过牌-下注-过牌
        bet_player_id = len(action_history.raw) % 2
        return 1 if bet_player_id == 0 else -1

    if prev_action == Action.Bet and prev_prev_action == Action.Bet:
        # 如果连续两次下注,比较玩家牌面大小决定收益(筹码翻倍)
        return 2 if cards[0] > cards[1] else -2
    return None
				
			

4.代码2-cfr主体部分:cfr.py

				
					import random
from collections import defaultdict

import kuhn
import time

import pickle

# 信息集类,用于表示玩家在游戏中的特定状态(包括动作历史和手牌)
class InformationSet:
    def __init__(self, action_history, hand_card):
        # 存储动作历史
        self.action_history = action_history
        # 存储玩家手牌
        self.hand_card = hand_card

    # 重写哈希函数,根据动作历史(转换为元组)和手牌计算哈希值
    def __hash__(self):
        return hash((tuple(self.action_history.raw), self.hand_card))

    # 重写相等比较函数,比较两个信息集的动作历史和手牌是否相等
    def __eq__(self, other):
        return (self.action_history.raw, self.hand_card) == (other.action_history.raw, other.hand_card)

    # 重写对象的字符串表示形式,便于打印和调试
    def __repr__(self):
        return f"{self.hand_card} {self.action_history.raw} "


# CFR(Counterfactual Regret Minimization)算法中的节点类
class CfrNode:
    def __init__(self, epsilon):
        # 累计遗憾值列表,用于每个可能的动作(这里是库恩扑克中的过牌和下注)
        self.cum_regrets = [0.0] * kuhn.Action.num()
        # 累计策略列表,用于每个可能的动作
        self.cum_strategy = [0.0] * kuhn.Action.num()
        # 探索率,用于平衡探索新策略和利用已有策略
        self.epsilon = epsilon

    # 根据给定的概率分布采样一个动作索引
    def sample(self, probs):
        return random.choices(range(len(probs)), weights=probs, k=1)[0]

    # 根据累计遗憾值选择一个动作 这里没有被运行?
    def get_action(self):
        # 如果所有累积遗憾值都为零或负数,则随机选择一个动作
        if all(regret <= 0 for regret in self.cum_regrets):
            return kuhn.Action.random()

        # 将负遗憾值转换为0,仅保留正遗憾值作为权重
        weights = [max(0, regret) for regret in self.cum_regrets]
        # 使用随机权重采样
        chosen_index = random.choices(range(len(weights)), weights=weights, k=1)[0]

        return kuhn.Action.from_int(chosen_index)

    # 获取每个动作的概率,考虑了遗憾值和探索率 对于CFR,就是0.5 0.5的平均探索
    def get_action_probability(self):
        regret_sum = sum(max(0, regret) for regret in self.cum_regrets)
        # 如果总遗憾值为0,则每个动作概率均等
        if regret_sum > 0:
            result = [(max(0, regret) / regret_sum) for regret in self.cum_regrets]
        else:
            result = [1.0 / kuhn.Action.num()] * kuhn.Action.num()

        explore_prob = self.epsilon / kuhn.Action.num()
        # 将探索概率和策略概率融合
        result = [explore_prob + prob * (1 - self.epsilon) for prob in result]

        return result

    # 重写对象的字符串表示形式,显示过牌和下注的比例
    def __repr__(self):
        check_ratio = self.cum_strategy[0] / (self.cum_strategy[0] + self.cum_strategy[1])
        bet_ratio = 1.0 - check_ratio
        return f"[Check: {check_ratio}, Bet: {bet_ratio}]"


# 使用CFR算法的玩家类
class CfrPlayer:
    def __init__(self,mode="Random"):
        # 玩家ID,初始化为 -1
        self.player_id = -1
        # 玩家手牌,初始化为 -1
        self.hand_card = -1
        # 玩家的资金
        self.money = 0
        # 存储信息集到CFR节点的映射 Information-set:CFR-Node
        self.cfr_info = {}

        self.mode = mode

    # 玩家注册方法,设置玩家ID
    def on_register(self, player_id):
        self.player_id = player_id

    # 游戏开始时,设置玩家手牌
    def on_start(self, card):
        self.hand_card = card

    # 根据动作历史决定玩家的行动
    def decide_action(self, action_history):
        if self.mode == "Random":
            return kuhn.Action.random()
        info_set = InformationSet(action_history, self.hand_card)
        if info_set not in self.cfr_info.keys():
            # print(info_set,"------118-----")
            self.cfr_info[info_set] = CfrNode(0.0)
            return kuhn.Action.random()
        # print(self.cfr_info[info_set].cum_regrets)
        return self.cfr_info[info_set].get_action()

    # 处理游戏结果,更新玩家资金并打印相关信息
    def handle_result(self, action_history, payoff):
        self.money += payoff
        info_set = InformationSet(action_history, self.hand_card)
        # print(f"player {self.player_id} get payoff {payoff} with info set {str(info_set)}")

    # 训练玩家策略,进行多次迭代
    def train(self, iteration):
        total_payoff = 0.0
        episode_payoff = 0.0
        # start = time.time()
        for round in range(iteration):
            cards = [1, 2, 3]
            action_history = kuhn.ActionHistory([])
            random.shuffle(cards)
            history_probs = {0: 1.0, 1: 1.0}
            round_payoff = 0.0
            if self.mode == "CFR":
                round_payoff = self.cfr(action_history, cards, history_probs)
            elif self.mode == "MCCFR":
                round_payoff = self.mccfr(action_history, cards, history_probs)
            elif self.mode == "OOS":
                _, _, round_payoff = self.oos(action_history, cards,{0: 1.0, 1: 1.0}, {0: 1.0, 1: 1.0})
            total_payoff += round_payoff
            episode_payoff += round_payoff
            if (round+1)%1000==0:
                print(f"round {round}, round payoff {round_payoff},1000-average payoff {episode_payoff / 1000},average payoff {total_payoff / round}")
                episode_payoff = 0.0
            # end = time.time()
            # if end - start > 60:
            #     print(f"round {round}, average payoff {total_payoff / iteration}")
            #     break

    # CFR算法的核心递归方法,计算玩家在当前状态下的价值
    def cfr(self, history, cards, reach_probs):
        player_id = len(history) % 2

        # 检查游戏是否结束,如果结束则返回相应的收益
        maybe_payoff = kuhn.get_payoff(history, cards)
        if maybe_payoff is not None:
            payoff = maybe_payoff if player_id == 0 else -maybe_payoff
            return float(payoff)

        # 处理信息集,如果信息集不存在则创建新的CFR节点
        info_set = InformationSet(history, cards[player_id])
        if info_set not in self.cfr_info.keys():
            self.cfr_info[info_set] = CfrNode(0.0)

        # 获取当前信息集下的行动概率
        action_probs = self.cfr_info[info_set].get_action_probability()
        # 存储每个行动的收益
        action_payoffs = []
        node_value = 0.0

        # 遍历每个可能的行动
        for action_id, action_prob in enumerate(action_probs):
            # 创建下一个动作历史的副本
            next_history = history.clone()
            # 将当前行动添加到动作历史中
            next_history.raw.append(kuhn.Action.from_int(action_id))
            # 复制到达当前状态的概率
            next_reach_probs = reach_probs.copy()
            # 更新到达下一个状态的概率
            next_reach_probs[player_id] *= action_prob
            # 递归调用cfr方法计算下一个状态的收益,并取负(因为是对手的视角)
            action_payoffs.append(-self.cfr(next_history, cards, next_reach_probs))
            node_value += action_prob * action_payoffs[action_id]

        assert len(action_payoffs) == 2
        # 根据计算得到的收益更新节点的累计遗憾值
        node = self.cfr_info[info_set]
        # print(node.cum_strategy) #0.0,0.0
        for action_id, payoff in enumerate(action_payoffs):
            regret = payoff - node_value #动作的反事实值 - 策略的反事实值
            opponent = 1 - player_id
            node.cum_regrets[action_id] += reach_probs[opponent] * regret #计算这个节点的遗憾值

        # 根据行动概率更新节点的累计策略,这个变量用于打印策略信息,不影响
        for action_id, action_prob in enumerate(action_probs):
            node.cum_strategy[action_id] += reach_probs[player_id] * action_prob


        return node_value
				
			

5.main.py

				
					from kuhn import KuhnGame
from cfr import CfrPlayer
import time


def main():
    game = KuhnGame()

    player1 = CfrPlayer("CFR")
    player2 = CfrPlayer("Random")

    start_time = time.time()
    player1.train(10001)
    end_time = time.time()
    print("CFR takes",end_time - start_time) #CFR 0.2558259963989258
    player1.print_info()

    game.add_player(player1)
    game.add_player(player2)
    game.start(1000)
    game.players.clear()
    game.add_player(player2)
    game.add_player(player1)
    game.start(1000)

    print(player1.money, player2.money)


if __name__ == "__main__":
    main()

				
			

6.结语

这是策略表格:
—————先手—————
1 [] : [Check: 0.8613605693431835, Bet: 0.1386394306568165]
1 [0, 1] : [Check: 0.9999912741717137, Bet: 8.725828286260118e-06]
2 [] : [Check: 0.9992974156106671, Bet: 0.0007025843893329053]
2 [0, 1] : [Check: 0.5164528591157757, Bet: 0.48354714088422435]
3 [] : [Check: 0.585470226795728, Bet: 0.414529773204272]
3 [0, 1] : [Check: 1.2746102987836917e-05, Bet: 0.9999872538970122]
—————后手—————
1 [0] : [Check: 0.6683707431378955, Bet: 0.33162925686210454]
1 [1] : [Check: 0.9999850227653966, Bet: 1.4977234603419376e-05]
2 [0] : [Check: 0.9998948822681403, Bet: 0.00010511773185972739]
2 [1] : [Check: 0.6714624487539822, Bet: 0.3285375512460178]
3 [0] : [Check: 1.5005552054260076e-05, Bet: 0.9999849944479458]
3 [1] : [Check: 1.5005552054260076e-05, Bet: 0.9999849944479458]
——————————–

发现其实随机策略的玩家玩的也很好?确实是这样的,我的猜测是因为这里动作太少了,而且我们可以看到许多情况下过牌和下注概率差的也不大。所以会导致这样的情况吧。

之后有空会写MCCFR和手上正在看的论文的算法。

订阅评论
提醒
1 评论
最旧
最新 最多投票
内联反馈
查看所有评论
Fenxiang
4 月 前

orz

《不完全信息博弈:Khun牌与CFR及其简单变体》

1
0
希望看到您的想法,请您发表评论x