Python SDK

Ice 规则引擎的 Python 实现,与 Java 和 Go SDK 功能完全一致。

安装

pip install ice-rules

要求: Python >= 3.11

快速开始

1. 定义叶子节点

使用 @ice.leaf 装饰器注册叶子节点,支持 IceField 字段描述和 alias 别名:

import ice
from ice import Roam, IceField, IceIgnore
from typing import Annotated

@ice.leaf(
    "com.example.ScoreFlow",
    name="分数判断",
    desc="检查分数是否达标",
    alias=["score_flow"]  # 别名,用于多语言兼容
)
class ScoreFlow:
    """检查分数是否达标"""
    # 使用 Annotated + IceField 添加字段描述(推荐)
    threshold: Annotated[int, IceField(name="分数阈值", desc="判断的分数值")] = 0
    key: Annotated[str, IceField(name="取值键", desc="从roam取值的键名")] = "score"
    
    def do_roam_flow(self, roam: Roam) -> bool:
        return roam.get_int(self.key, 0) >= self.threshold


@ice.leaf("com.example.AmountResult", name="金额计算", desc="根据分数计算金额")
class AmountResult:
    """计算奖励金额"""
    multiplier: Annotated[float, IceField(name="倍率", desc="计算倍率")] = 1.0
    
    def do_roam_result(self, roam: Roam) -> bool:
        score = roam.get_int("score", 0)
        roam.put("amount", score * self.multiplier)
        return True


@ice.leaf("com.example.LogNone")
class LogNone:
    """日志记录"""
    def do_roam_none(self, roam: Roam) -> None:
        print(f"Processing: {roam}")

2. 启动客户端(同步方式)

import ice
from ice import Pack

# 导入包含叶子节点定义的模块(确保装饰器被执行)
from my_flows import ScoreFlow, AmountResult

# 创建并启动客户端
client = ice.FileClient(app=1, storage_path="./ice-data")
client.start()

# 等待启动完成
client.wait_started()

# 执行规则
pack = Pack(ice_id=1)
pack.roam.put("score", 85)

results = ice.sync_process(pack)

# 获取结果
for ctx in results:
    print(f"Amount: {ctx.pack.roam.get('amount')}")
    print(f"Process: {ctx.get_process_info()}")

# 关闭客户端
client.destroy()

3. 异步方式

import asyncio
import ice
from ice import Pack

async def main():
    # 创建异步客户端
    client = ice.AsyncFileClient(app=1, storage_path="./ice-data")
    await client.start()
    
    # 执行规则
    pack = Pack(ice_id=1)
    pack.roam.put("score", 85)
    
    results = await ice.async_process(pack)
    
    for ctx in results:
        print(f"Amount: {ctx.pack.roam.get('amount')}")
    
    await client.destroy()

asyncio.run(main())

叶子节点接口

Python SDK 支持 9 种叶子节点接口,根据实现的方法自动适配:

Flow 类型(返回 True/False)

@ice.leaf("com.example.Flow1")
class ContextFlow:
    def do_flow(self, ctx: ice.Context) -> bool:
        return ctx.pack.roam.get_int("score") > 60

@ice.leaf("com.example.Flow2")
class PackFlow:
    def do_pack_flow(self, pack: ice.Pack) -> bool:
        return pack.roam.get_int("score") > 60

@ice.leaf("com.example.Flow3")
class RoamFlow:
    def do_roam_flow(self, roam: ice.Roam) -> bool:
        return roam.get_int("score") > 60

Result 类型(返回 True/False)

@ice.leaf("com.example.Result1")
class ContextResult:
    def do_result(self, ctx: ice.Context) -> bool:
        ctx.pack.roam.put("result", "done")
        return True

@ice.leaf("com.example.Result2")
class PackResult:
    def do_pack_result(self, pack: ice.Pack) -> bool:
        pack.roam.put("result", "done")
        return True

@ice.leaf("com.example.Result3")
class RoamResult:
    def do_roam_result(self, roam: ice.Roam) -> bool:
        roam.put("result", "done")
        return True

None 类型(无返回值)

@ice.leaf("com.example.None1")
class ContextNone:
    def do_none(self, ctx: ice.Context) -> None:
        print(f"Processing {ctx.ice_id}")

@ice.leaf("com.example.None2")
class PackNone:
    def do_pack_none(self, pack: ice.Pack) -> None:
        print(f"TraceId: {pack.trace_id}")

@ice.leaf("com.example.None3")
class RoamNone:
    def do_roam_none(self, roam: ice.Roam) -> None:
        print(f"Score: {roam.get('score')}")

Roam 数据结构

Roam 是线程安全的字典,用于存储业务数据:

from ice import Roam

roam = Roam()

# 基本操作
roam.put("key", "value")
roam.put_multi({"a": 1, "b": 2})
value = roam.get("key")
value = roam.get("key", "default")

# 多级 key 访问
roam.put("user", {"name": "test", "profile": {"age": 25}})
name = roam.get_multi("user.name")  # "test"
age = roam.get_multi("user.profile.age")  # 25

# 引用其他 key
roam.put("score", 100)
roam.put("ref", "@score")
value = roam.get_union(roam.get("ref"))  # 100

# 类型安全获取
roam.get_str("key", "")
roam.get_int("key", 0)
roam.get_float("key", 0.0)
roam.get_bool("key", False)
roam.get_list("key")
roam.get_dict("key")

客户端配置

基础配置

# 最简方式
client = ice.FileClient(app=1, storage_path="./ice-data")

完整配置

client = ice.FileClient(
    app=1,                      # 应用 ID
    storage_path="./ice-data",  # 存储路径(与 ice-server 共享)
    parallelism=-1,             # 并行度(-1 使用 CPU 核数)
    poll_interval=5.0,          # 轮询间隔(秒)
    heartbeat_interval=30.0,    # 心跳间隔(秒)
)

生命周期

# 启动
client.start()

# 等待启动完成(可选)
client.wait_started(timeout=30.0)

# 检查版本
print(f"Loaded version: {client.loaded_version}")

# 关闭
client.destroy()

自定义日志

from ice.log import Logger, set_logger
from typing import Any


class MyLogger(Logger):
    def debug(self, msg: str, **kwargs: Any) -> None:
        print(f"[DEBUG] {msg} {kwargs}")
    
    def info(self, msg: str, **kwargs: Any) -> None:
        print(f"[INFO] {msg} {kwargs}")
    
    def warn(self, msg: str, **kwargs: Any) -> None:
        print(f"[WARN] {msg} {kwargs}")
    
    def error(self, msg: str, **kwargs: Any) -> None:
        print(f"[ERROR] {msg} {kwargs}")


# 设置自定义日志
set_logger(MyLogger())

字段描述与忽略

字段描述 (IceField)

使用 typing.AnnotatedIceField 为字段添加描述:

from typing import Annotated
from ice import IceField

@ice.leaf("com.example.MyNode")
class MyNode:
    # iceField - 在 UI 中显示名称和描述
    score: Annotated[float, IceField(name="分数阈值", desc="判断分数的阈值")] = 0.0
    key: Annotated[str, IceField(name="取值键", desc="从roam取值的键名")] = ""
    
    # hideField - 无 IceField,可配置但隐藏
    internal: str = ""

字段忽略 (IceIgnore)

不想被配置的字段可以忽略:

from typing import Annotated
from ice import IceIgnore

@ice.leaf("com.example.MyNode")
class MyNode:
    # 方式1: _ 前缀 - 私有字段自动忽略
    _cache: dict = None
    
    # 方式2: IceIgnore - 显式忽略
    service: Annotated[Any, IceIgnore()] = None

别名 (Alias)

支持多语言兼容配置:

@ice.leaf(
    "com.example.ScoreFlow",
    alias=["score_flow", "ScoreFlow"]  # 可响应多种类名
)
class ScoreFlow:
    ...

兼容性

  • 可与 Java/Go SDK 共享 ice-data 目录
  • JSON 序列化格式一致
  • 节点类型和关系语义一致

完整示例

import ice
from ice import Roam, Pack

# 定义叶子节点
@ice.leaf("com.example.ScoreCheck")
class ScoreCheck:
    threshold: int = 60
    
    def do_roam_flow(self, roam: Roam) -> bool:
        return roam.get_int("score", 0) >= self.threshold


@ice.leaf("com.example.CalculateReward")
class CalculateReward:
    rate: float = 0.1
    
    def do_roam_result(self, roam: Roam) -> bool:
        score = roam.get_int("score", 0)
        roam.put("reward", score * self.rate)
        return True


def main():
    # 启动客户端
    client = ice.FileClient(app=1, storage_path="./ice-data")
    client.start()
    client.wait_started()
    
    try:
        # 执行规则
        pack = Pack(ice_id=1)
        pack.roam.put("score", 85)
        pack.roam.put("userId", "user123")
        
        results = ice.sync_process(pack)
        
        for ctx in results:
            reward = ctx.pack.roam.get("reward")
            if reward:
                print(f"User reward: {reward}")
            else:
                print("Score too low, no reward")
            
            # 打印执行过程
            print(f"Process: {ctx.get_process_info()}")
    finally:
        client.destroy()


if __name__ == "__main__":
    main()

版本要求

  • Python >= 3.11
  • 无外部依赖(纯标准库)

License

Apache-2.0