王森涛
发布于 2026-05-30 / 2 阅读
0
0

当AI学会"场记":多模态大模型如何用叙事逻辑接管链上数据索引

当AI学会"场记":多模态大模型如何用叙事逻辑接管链上数据索引

作者:王森涛 | 广播电视编导专业 · 区块链独立研究者

"每一笔交易都是一个镜头,每一条链上日志都是一段未被剪辑的素材。当AI拿起场记板,链上世界终于拥有了自己的叙事导演。"


引子:那块被敲响了六十年的场记板

我至今记得大三那年在片场实习的某个深夜。北京大兴的一处废弃厂房里,剧组已经连续拍摄了十六个小时,灯光把每个人的脸照得惨白。副导演的嗓子哑了,摄影师的眼睛布满血丝,所有人都机械地重复着动作,唯独场记小姑娘——我们叫她小林——始终保持着一种近乎偏执的清醒。每拍一条,她都会举起场记板,清脆地"咔"一声,然后在密密麻麻的场记单上记录:场景编号、镜头号、条次、演员走位、穿帮位置、甚至连导演说"再来一条"时的语气都做了标注。

后来剪辑师告诉我,如果没有小林那些近乎强迫症的场记记录,后期面对几百个小时的素材,他们根本无法理出头绪。场记板是电影工业的"元数据索引"——它不创造价值,却让一切价值变得可追溯、可检索、可复用。

AI与场记板的交汇

区块链世界此刻正在经历的,恰恰就是一场没有场记的大型拍摄。每天数十亿笔交易、数百万条智能合约调用、不可计数的链上事件——它们像散落在剪辑台上的原始胶片,等待着一个能把它们组织成完整叙事的力量。而当多模态大模型(Multimodal Large Language Models)开始介入链上数据的索引与解读,我忽然意识到:AI正在成为区块链世界的"小林",而我们要讨论的,不仅仅是一个技术问题,更是一场关于叙事逻辑的范式转移。

在这篇文章中,我将以一名广播电视编导专业毕业生的视角,用影视制作的叙事框架来拆解多模态AI如何重构链上数据索引。从场记板的起源,到智能合约的蒙太奇,再到隐私计算的转场艺术——请跟随我的镜头语言,一帧一帧地看清这场变革。


第一幕:场记板——电影工业的"数据索引协议"

一个容易被忽视的事实

在电影行业,场记板(Clapperboard)的存在已有近百年历史。它最初的形状是一块可以开合的木板,上面写着制作信息,拍板的声音用于同步画面与声音。但随着电影工业的复杂化,场记板承载的信息量远远超出了声画同步的需求。一个现代数字场记板上可能记录的信息包括:制作编号(Production)、卷号(Roll)、场景号(Scene)、条次(Take)、导演姓名、摄影师姓名、日期、时间码(Timecode)、胶片规格或数字格式、帧率、滤镜信息、场景备注等等。

如果我们把电影拍摄视为一个"数据生产流水线",那么场记板就是这个流水线上最核心的元数据注入点。每一条素材(take)在被记录的同时,也被赋予了一整套可索引、可关联的标签。当后期剪辑师面对数千小时的素材时,他们无需逐帧回看,只需查阅场记单上的元数据,就能快速定位到所需的镜头:哪一场、哪一条、是否有穿帮、导演是否在旁标注了"通过"。

场记单与区块链浏览器:两种"溯源"的对话

有趣的是,区块链浏览器(如 Etherscan、BaseScan)所做的事情与场记单有着惊人的结构同构性。Etherscan 展示交易哈希、区块高度、时间戳、发送地址、接收地址、Gas 费用、调用数据——这些都是交易的"元数据"。但问题是,这些元数据的颗粒度和语义丰富度远远不及场记单。场记单会告诉你"这场戏拍了第七条,导演觉得情绪对了但走位有问题,建议保留第三条和第五条做参考",而区块链浏览器只会冷冰冰地显示一笔交易"成功"或"失败"。

这就引出了区块链世界的一个核心痛点:我们拥有了海量的链上数据,却缺少一套足够丰富的"场记系统"来索引和诠释这些数据。链上数据的"叙事性"被淹没在了无穷无尽的哈希值之中。正如一部电影如果只有胶片而没有场记单,它依然只是一堆未被组织的影像碎片。

在我看来,场记板的真正价值不在于它记录了什么信息,而在于它定义了"以什么维度去组织信息"。这是一种叙事架构(Narrative Architecture),它把离散的事件编织成了可被理解的故事。而在区块链领域,我们正需要这样一套叙事架构。


第二幕:链上数据的"未剪辑"困境

一条交易的"素材"有多杂?

以以太坊为例,一个简单的 ERC-20 代币转账就涉及:原始交易数据(tx hash, from, to, value)、内部合约调用(internal transactions)、事件日志(event logs,如 Transfer 事件)、状态变更(storage slot changes)、Gas 消耗细节。当这笔交易发生在 Uniswap V3 这样的去中心化交易所时,复杂度会进一步爆炸式增长——一笔 swap 可能触发多个 Pool 的流动性变化、价格预言机更新、LP 头寸调整,所有这些事件都散布在不同的合约地址和日志条目中。

这就像是一个没有场记的剧组在同时用十二台摄影机拍摄,每台机器都在记录,但没有任何一个中心节点负责"打板"。最终结果是,素材量极大,但无法高效检索、关联和理解。

链上数据的三重断裂

如果用影视剪辑的术语来描述,链上数据的困境可以概括为"三重断裂":

时间线断裂(Timeline Fracture):链上事件按照区块高度线性排列,但用户关心的往往是跨区块、跨合约的因果链。比如一笔 DeFi 清算可能跨越数十个区块才能完成,但区块浏览器只会以高度顺序展示这些事件,无法呈现它们之间的因果关系。这就像剪辑师只能按照胶片拍摄的物理顺序来回放素材,无法按照叙事逻辑重新组织它们。

语义断裂(Semantic Fracture):链上日志只记录结构化数据(地址、数值、布尔值),不记录行为的"意图"或"语境"。一笔转账是"做市商在补仓"还是"巨鲸在出货"?链上数据本身不会告诉你答案。这好比一段胶片只记录了演员说了什么台词,却没有场记标注这场戏的情感基调和戏剧功能。

叙事断裂(Narrative Fracture):链上活动缺乏统一的叙事标签。一个 MEV 机器人可能在一分钟内执行数十笔交易,涉及多个交易所合约和清算合约,但没有任何标签把这些交易组织成一个完整的"攻击故事"。这就像一个剧组拍完了所有素材,却没有剧本把各个场景串起来——观众看到了一堆零碎的画面,却不知道故事在讲什么。

这种困境的根源在于:区块链被设计为一个"忠实记录一切"的系统,却从未被赋予"叙事性地理解"的能力。它的记账逻辑像一台永远在运转的摄影机,记录了一切,却什么也没说。

传统索引方案的局限

目前链上数据索引的主流工具——The Graph、Dune Analytics、Nansen——各自在不同层面尝试解决这个问题。The Graph 用 GraphQL 子图提供了一种结构化的查询方式;Dune 允许用户用 SQL 编写自定义分析;Nansen 用标签系统为地址赋予语义。但这些方案都存在根本性的局限:它们依赖人工编写索引逻辑,无法自动理解链上事件的叙事结构。这就好比给场记配了一台打字机和一本字典——她的工具升级了,但理解能力依然取决于她本人的经验和判断。

我们需要的,是一个能自主理解"叙事"的智能场记。这就是多模态大模型登场的时候了。

链上数据的结构化索引


第三幕:多模态大模型——拥有"蒙太奇之眼"的AI场记

从语言模型到叙事模型:理解力的跃迁

当我们谈论 GPT-4o、Gemini 2.0、Claude 这样的多模态大模型时,业界的主流讨论往往集中在它们"能看图"、"能听音频"、"能生成视频"这些表层能力上。但对于一个受过影视训练的人来说,多模态的真正革命性在于:它赋予了AI一种"蒙太奇式"的理解力。

在电影理论中,蒙太奇(Montage)不仅仅是剪辑技术,更是一种认知方式。苏联导演库里肖夫的经典实验证明:同一个面部表情,当它分别与一碗汤、一口棺材、一个孩子并置时,观众会从中读出完全不同的含义。蒙太奇的本质是"意义的涌现"——当离散的影像被以特定方式组合时,超越个体元素之和的新意义会产生。

多模态大模型在处理链上数据时,展现出了一种类似的能力。它不仅能"看到"单个交易的原始数据(文本、数值),还能"理解"多个交易之间的时序关系、因果链条、行为模式——甚至能"感知"整个市场情绪的"氛围",就像一个经验丰富的场记能感受到一场戏的"气口"(节奏感)是否对。

AI如何"阅读"一条链上事件的叙事

让我们用一个具体的场景来说明。假设链上发生了以下一系列事件:

  1. 地址 0xABC... 向 Uniswap 合约发送了一笔大额 USDC 卖出交易
  2. 同一地址在 Aave 合约中偿还了一笔 WBTC 贷款
  3. 另一地址 0xDEF... 在 Compound 上增加了对 USDC 的借贷
  4. USDC/ETH 的价格预言机发生了偏移

一个传统索引工具会分别记录这四件事,也许会用标签标注 0xABC 为"巨鲸"。但多模态大模型——当它被赋予"导演思维"后——可能会这样解读:这是一个"防守性去杠杆"的叙事弧线。地址 0xABC 正在系统性地降低风险敞口(卖出稳定币、偿还贷款),而地址 0xDEF 则在逆势加杠杆。这两个地址的行为构成了一种"对手戏"——一方的恐惧恰好是另一方的贪婪,就像电影中主角与反派的镜像关系。

这种叙事层面的理解,是传统工具做不到的。它需要一种"蒙太奇之眼"——一种能在离散数据之间建立意义关联的能力。而这恰恰是多模态大模型最擅长的事情。

大模型的"三镜头法"

在电影拍摄中,经典的"三镜头法"包含:全景(建立环境)、中景(展示人物关系)、特写(捕捉情感细节)。多模态大模型在分析链上数据时,同样可以运用这种层次化的"镜头语言":

全景镜头(Macro View):分析整个链上生态的宏观指标——TVL 变化趋势、Gas 价格走势、活跃地址数的波动曲线。这是一种鸟瞰式的视角,用于建立"故事背景"。

中景镜头(Meso View):聚焦于特定协议、特定地址集合的行为模式——例如,分析某个 DeFi 协议在过去一周内所有清算事件的时间分布,判断是否存在系统性的压力。这相当于展示一组角色之间的互动关系。

特写镜头(Micro View):深入分析单笔交易的调用路径、内部状态变化、Gas 消耗分布,以理解具体行为的意图和效果。这是最细腻的读法,如同特写镜头捕捉演员眼角一闪而过的微表情。

当这三个层次的"镜头"被多模态大模型同时处理和融合时,链上数据就不再是一堆冰冷的数字,而变成了一部有着起承转合、高潮与悬念的"连续剧"。


第四幕:智能合约的"场记板"——Solidity 实现

设计思路:让每笔交易自带"场记"

在前面的讨论中,我们已经建立了核心认识:链上数据需要一个类似场记板的"元数据注入层"。现在,让我用一个具体的 Solidity 智能合约来实现这个概念。

这个合约的核心理念是:为每一个链上事件赋予"叙事元数据"(Narrative Metadata)。就像场记板为每一镜头标注场景号、条次、意图一样,这个合约要求每笔交易必须附带结构化的叙事标签——包括事件类型、叙事角色、关联事件 ID、时间线标记等。这些数据将在链上永久存储,为后续的AI索引提供丰富的原始材料。

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.20;

contract ClapperBoard {

    enum NarrativeRole {
        Protagonist,
        Antagonist,
        Supporting,
        Narrator,
        Extra
    }

    enum EventCategory {
        TokenTransfer,
        DeFiSwap,
        Liquidation,
        GovernanceVote,
        BridgeCross,
        NFTMint,
        ContractDeploy,
        Custom
    }

    struct SceneRecord {
        uint256 sceneId;
        uint256 parentId;
        address director;
        address[] cast;
        EventCategory category;
        NarrativeRole role;
        string intent;
        string context;
        uint256 blockTimestamp;
        uint256 gasConsumed;
        bytes32 relatedTxHash;
        bytes32[] crossReferences;
        bool isFinalCut;
    }

    struct NarrativeArc {
        uint256 arcId;
        string title;
        string synopsis;
        uint256[] sceneIds;
        address author;
        uint256 startBlock;
        uint256 endBlock;
        string genre;
        uint8 actCount;
        uint256 createdAt;
    }

    uint256 private _sceneCounter;
    uint256 private _arcCounter;

    mapping(uint256 => SceneRecord) private _scenes;
    mapping(uint256 => NarrativeArc) private _arcs;
    mapping(address => uint256[]) private _directorFilmography;
    mapping(bytes32 => uint256) private _txHashToScene;
    mapping(uint256 => uint256[]) private _arcScenes;
    mapping(string => uint256[]) private _genreIndex;

    event SceneShot(
        uint256 indexed sceneId,
        uint256 indexed parentId,
        address indexed director,
        EventCategory category,
        NarrativeRole role,
        uint256 blockTimestamp
    );

    event NarrativeArcCreated(
        uint256 indexed arcId,
        string title,
        address indexed author,
        string genre
    );

    event SceneLinkedToArc(
        uint256 indexed sceneId,
        uint256 indexed arcId
    );

    event DirectorCutFinalized(
        uint256 indexed sceneId,
        address indexed director
    );

    modifier onlyDirector(uint256 sceneId) {
        require(
            _scenes[sceneId].director == msg.sender,
            "ClapperBoard: caller is not the director of this scene"
        );
        _;
    }

    modifier sceneExists(uint256 sceneId) {
        require(
            _scenes[sceneId].blockTimestamp > 0,
            "ClapperBoard: scene does not exist"
        );
        _;
    }

    function shootScene(
        uint256 parentId,
        address[] memory cast,
        EventCategory category,
        NarrativeRole role,
        string memory intent,
        string memory context,
        bytes32 relatedTxHash,
        bytes32[] memory crossReferences
    ) external returns (uint256) {
        _sceneCounter++;
        uint256 newSceneId = _sceneCounter;

        _scenes[newSceneId] = SceneRecord({
            sceneId: newSceneId,
            parentId: parentId,
            director: msg.sender,
            cast: cast,
            category: category,
            role: role,
            intent: intent,
            context: context,
            blockTimestamp: block.timestamp,
            gasConsumed: gasleft(),
            relatedTxHash: relatedTxHash,
            crossReferences: crossReferences,
            isFinalCut: false
        });

        _directorFilmography[msg.sender].push(newSceneId);

        if (relatedTxHash != bytes32(0)) {
            _txHashToScene[relatedTxHash] = newSceneId;
        }

        emit SceneShot(
            newSceneId,
            parentId,
            msg.sender,
            category,
            role,
            block.timestamp
        );

        return newSceneId;
    }

    function finalizeCut(
        uint256 sceneId
    ) external onlyDirector(sceneId) sceneExists(sceneId) {
        require(
            !_scenes[sceneId].isFinalCut,
            "ClapperBoard: scene already finalized"
        );
        _scenes[sceneId].isFinalCut = true;
        _scenes[sceneId].gasConsumed = gasleft();
        emit DirectorCutFinalized(sceneId, msg.sender);
    }

    function createNarrativeArc(
        string memory title,
        string memory synopsis,
        uint256[] memory sceneIds,
        string memory genre,
        uint8 actCount
    ) external returns (uint256) {
        _arcCounter++;
        uint256 newArcId = _arcCounter;

        uint256 startBlock = type(uint256).max;
        uint256 endBlock = 0;

        for (uint256 i = 0; i < sceneIds.length; i++) {
            require(
                _scenes[sceneIds[i]].blockTimestamp > 0,
                "ClapperBoard: referenced scene does not exist"
            );
            _arcScenes[newArcId].push(sceneIds[i]);
        }

        _arcs[newArcId] = NarrativeArc({
            arcId: newArcId,
            title: title,
            synopsis: synopsis,
            sceneIds: sceneIds,
            author: msg.sender,
            startBlock: startBlock,
            endBlock: endBlock,
            genre: genre,
            actCount: actCount,
            createdAt: block.timestamp
        });

        _genreIndex[genre].push(newArcId);

        emit NarrativeArcCreated(newArcId, title, msg.sender, genre);

        return newArcId;
    }

    function linkSceneToArc(
        uint256 sceneId,
        uint256 arcId
    ) external sceneExists(sceneId) {
        require(
            _arcs[arcId].createdAt > 0,
            "ClapperBoard: narrative arc does not exist"
        );
        _arcScenes[arcId].push(sceneId);
        emit SceneLinkedToArc(sceneId, arcId);
    }

    function getScene(
        uint256 sceneId
    ) external view sceneExists(sceneId) returns (SceneRecord memory) {
        return _scenes[sceneId];
    }

    function getNarrativeArc(
        uint256 arcId
    ) external view returns (NarrativeArc memory) {
        require(
            _arcs[arcId].createdAt > 0,
            "ClapperBoard: narrative arc does not exist"
        );
        return _arcs[arcId];
    }

    function getDirectorFilmography(
        address director
    ) external view returns (uint256[] memory) {
        return _directorFilmography[director];
    }

    function getScenesByTxHash(
        bytes32 txHash
    ) external view returns (uint256) {
        return _txHashToScene[txHash];
    }

    function getArcsByGenre(
        string memory genre
    ) external view returns (uint256[] memory) {
        return _genreIndex[genre];
    }

    function getArcSceneCount(
        uint256 arcId
    ) external view returns (uint256) {
        return _arcScenes[arcId].length;
    }
}

这个合约的结构体现了一种"影视化思维"。SceneRecord就是一条场记记录——它包含场景号(sceneId)、父场景号(parentId,用于标记续拍关系)、"导演"地址(发起交易的地址)、"演员表"(cast,参与交易的地址列表)、场景类型(EventCategory)、叙事角色(NarrativeRole)、行为意图(intent)、场景背景(context),以及一个"最终剪辑"标志(isFinalCut),表示此场景的记录已完成,不再修改。

NarrativeArc则是一段"叙事弧线"——它把多个场景串联成一个有标题、有简介、有类型(genre)的完整故事。这就像一部电影由多个场景组成,每个场景都有自己的镜头编号和场记信息,但只有当所有场景被按照叙事逻辑排列后,才能构成一个完整的故事。

这种设计的精妙之处在于,它在链上建立了一套"语义索引层"。当AI后续需要理解链上数据时,它不需要从零开始解析原始日志——它可以直接读取这些结构化的场记数据,快速构建叙事模型。就像剪辑师拿到场记单后可以立即开始粗剪,而不需要从头回看每一秒素材。


第五幕:AI叙事引擎——JavaScript 实现链上事件的"导演剪辑"

从数据管道到叙事管道

有了链上的"场记板"合约,接下来我们需要一个"AI导演"来解读这些场记数据,并将其组织成可理解、可传播的叙事。下面的 JavaScript 代码实现了一个"叙事引擎"——它读取链上事件流,通过调用 AI API(此处以 OpenAI 为例)为每个事件赋予叙事标签,然后按照时间线和因果关系组织成"剧集"结构。

这个过程就像一部纪录片的后期制作:首先收集所有素材(原始事件),然后让导演(AI)为每个素材打上标签和注释,接着按照叙事逻辑进行粗剪(分组和时间排序),最后精剪输出(生成结构化的叙事报告)。

const { ethers } = require("ethers");
const OpenAI = require("openai");
const fs = require("fs").promises;

const CLAPPERBOARD_ABI = [
  "event SceneShot(uint256 indexed sceneId, uint256 indexed parentId, address indexed director, uint8 category, uint8 role, uint256 blockTimestamp)",
  "event NarrativeArcCreated(uint256 indexed arcId, string title, address indexed author, string genre)",
  "event DirectorCutFinalized(uint256 indexed sceneId, address indexed director)",
  "function getScene(uint256 sceneId) view returns (tuple(uint256 sceneId, uint256 parentId, address director, address[] cast, uint8 category, uint8 role, string intent, string context, uint256 blockTimestamp, uint256 gasConsumed, bytes32 relatedTxHash, bytes32[] crossReferences, bool isFinalCut))",
  "function getNarrativeArc(uint256 arcId) view returns (tuple(uint256 arcId, string title, string synopsis, uint256[] sceneIds, address author, uint256 startBlock, uint256 endBlock, string genre, uint8 actCount, uint256 createdAt))"
];

const CATEGORY_LABELS = {
  0: "TokenTransfer", 1: "DeFiSwap", 2: "Liquidation",
  3: "GovernanceVote", 4: "BridgeCross", 5: "NFTMint",
  6: "ContractDeploy", 7: "Custom"
};

const ROLE_LABELS = {
  0: "Protagonist", 1: "Antagonist", 2: "Supporting",
  3: "Narrator", 4: "Extra"
};

class NarrativeEngine {
  constructor(rpcUrl, contractAddress, openaiApiKey) {
    this.provider = new ethers.JsonRpcProvider(rpcUrl);
    this.contract = new ethers.Contract(contractAddress, CLAPPERBOARD_ABI, this.provider);
    this.ai = new OpenAI({ apiKey: openaiApiKey });
    this.sceneBuffer = [];
    this.arcIndex = new Map();
    this.timelineCache = new Map();
  }

  async collectScenes(fromBlock, toBlock) {
    const filter = this.contract.filters.SceneShot();
    const events = await this.contract.queryFilter(filter, fromBlock, toBlock);

    for (const event of events) {
      const sceneId = Number(event.args.sceneId);
      const scene = await this.contract.getScene(sceneId);
      this.sceneBuffer.push({
        sceneId: sceneId,
        parentId: Number(scene.parentId),
        director: scene.director,
        cast: scene.cast,
        category: CATEGORY_LABELS[Number(scene.category)],
        role: ROLE_LABELS[Number(scene.role)],
        intent: scene.intent,
        context: scene.context,
        timestamp: Number(scene.blockTimestamp),
        gasConsumed: scene.gasConsumed.toString(),
        relatedTxHash: scene.relatedTxHash,
        crossReferences: scene.crossReferences,
        blockNumber: event.blockNumber,
        isFinalCut: scene.isFinalCut
      });
    }

    this.sceneBuffer.sort((a, b) => a.timestamp - b.timestamp);
    return this.sceneBuffer.length;
  }

  async generateNarrativeSummary(scenes) {
    const condensed = scenes.map(s => ({
      id: s.sceneId,
      type: s.category,
      director: s.director.slice(0, 10),
      actors: s.cast.length,
      intent: s.intent,
      time: new Date(s.timestamp * 1000).toISOString(),
      role: s.role,
      context: s.context
    }));

    const response = await this.ai.chat.completions.create({
      model: "gpt-4o",
      messages: [
        {
          role: "system",
          content: "You are a blockchain narrative analyst who interprets on-chain events as cinematic storylines. Output JSON with fields: episodeTitle, synopsis, keyBeats (array of {sceneId, narrativeFunction, emotionalTone}), foreshadowing (array), climax (object with sceneId and description), genreClassification."
        },
        {
          role: "user",
          content: `Analyze the following ${condensed.length} blockchain scenes as a narrative sequence:\n${JSON.stringify(condensed, null, 2)}`
        }
      ],
      response_format: { type: "json_object" },
      temperature: 0.4
    });

    return JSON.parse(response.choices[0].message.content);
  }

  buildTimelineTree(scenes) {
    const roots = [];
    const nodeMap = new Map();

    for (const scene of scenes) {
      nodeMap.set(scene.sceneId, { scene, children: [] });
    }

    for (const scene of scenes) {
      const node = nodeMap.get(scene.sceneId);
      if (scene.parentId === 0) {
        roots.push(node);
      } else if (nodeMap.has(scene.parentId)) {
        nodeMap.get(scene.parentId).children.push(node);
      } else {
        roots.push(node);
      }
    }

    return roots;
  }

  detectRecurringPatterns(scenes) {
    const directorCounts = {};
    const categoryCounts = {};
    const hourlyBuckets = {};

    for (const scene of scenes) {
      const dir = scene.director.toLowerCase();
      directorCounts[dir] = (directorCounts[dir] || 0) + 1;

      categoryCounts[scene.category] = (categoryCounts[scene.category] || 0) + 1;

      const hour = Math.floor(scene.timestamp / 3600);
      if (!hourlyBuckets[hour]) hourlyBuckets[hour] = [];
      hourlyBuckets[hour].push(scene);
    }

    const powerDirectors = Object.entries(directorCounts)
      .filter(([, count]) => count >= 3)
      .sort(([, a], [, b]) => b - a)
      .slice(0, 5);

    const peakHours = Object.entries(hourlyBuckets)
      .filter(([, group]) => group.length >= 5)
      .map(([hour, group]) => ({
        hour: Number(hour),
        count: group.length,
        dominantCategory: this.findDominant(group.map(g => g.category))
      }));

    return { powerDirectors, peakHours, categoryDistribution: categoryCounts };
  }

  findDominant(items) {
    const freq = {};
    items.forEach(i => { freq[i] = (freq[i] || 0) + 1; });
    return Object.entries(freq).sort(([, a], [, b]) => b - a)[0][0];
  }

  async produceEpisode(fromBlock, toBlock) {
    const sceneCount = await this.collectScenes(fromBlock, toBlock);
    if (sceneCount === 0) {
      return { status: "empty", message: "No scenes found in the specified block range." };
    }

    const narrative = await this.generateNarrativeSummary(this.sceneBuffer);
    const timeline = this.buildTimelineTree(this.sceneBuffer);
    const patterns = this.detectRecurringPatterns(this.sceneBuffer);

    const episode = {
      blockRange: { from: fromBlock, to: toBlock },
      totalScenes: sceneCount,
      narrativeSummary: narrative,
      timelineTree: timeline,
      patternAnalysis: patterns,
      generatedAt: new Date().toISOString()
    };

    await fs.writeFile(
      `episode_${fromBlock}_${toBlock}.json`,
      JSON.stringify(episode, null, 2)
    );

    return episode;
  }
}

async function main() {
  const engine = new NarrativeEngine(
    "https://rpc.ankr.com/eth",
    "0x0000000000000000000000000000000000001234",
    process.env.OPENAI_API_KEY
  );

  const latestBlock = await engine.provider.getBlockNumber();
  const episode = await engine.produceEpisode(latestBlock - 1000, latestBlock);

  console.log(`Episode produced: ${episode.totalScenes} scenes analyzed`);
  console.log(`Narrative title: ${episode.narrativeSummary.episodeTitle}`);
}

这段代码的核心架构是一个NarrativeEngine类,它包含四个主要的工作流阶段。素材采集collectScenes)从链上"场记板"合约中读取所有事件记录,按时间排序形成素材表。AI叙事生成generateNarrativeSummary)将浓缩后的场景数据发送给 GPT-4o,要求其以影视叙事的方式解读——输出结果包含"剧集标题"、"剧情梗概"、"关键节拍"(每个场景的叙事功能)、"伏笔"(可能预示未来事件的模式)、和"高潮"(最戏剧性的场景)。时间线构建buildTimelineTree)利用 parentId 关系将场景组织成树形结构,展示事件之间的因果传承关系。模式检测detectRecurringPatterns)则识别"明星导演"(高频活跃地址)、"高峰时段"和"类型分布"——这些都是影视分析中的经典手法。

值得注意的是,generateNarrativeSummary方法中的 system prompt 被精心设计为要求AI用影视分析的词汇来描述链上事件。它不说"地址 A 向地址 B 转了一笔钱",而是说"Protagonist 在第三幕的高潮中执行了一次关键的资源转移,为下一幕的反转埋下了伏笔"。这种语言框架的转换看似只是修辞游戏,但实际上它改变了人们理解数据的方式——从"发生了什么"变成了"这个故事在讲什么"。


第六幕:Python 脚本——交易模式的"叙事弧线分析"

把K线图变成"故事线"

在前两幕中,我们在链上建立了场记数据结构,在链下用 JavaScript 构建了AI叙事引擎。现在,让我们用 Python 来完成最后一步:分析交易模式,识别其中隐藏的"叙事弧线"(Narrative Arcs)。

在戏剧理论中,经典的叙事弧线包含五个阶段:铺垫(Exposition)、上升(Rising Action)、高潮(Climax)、下降(Falling Action)和解局(Resolution)。任何一段有意义的链上活动序列,往往也能被映射到这种弧线结构上。例如,一次 DeFi 闪电贷攻击的"叙事弧线"可以这样理解:铺垫(攻击者发现漏洞)、上升(逐步建立头寸)、高潮(执行攻击交易)、下降(提取利润、清理现场)、解局(社区响应、协议修复)。

下面的 Python 脚本实现了一个"叙事弧线检测器"——它输入一组交易数据,输出其中识别到的叙事弧线及其阶段标注。

import json
import math
from datetime import datetime
from collections import defaultdict, Counter
from dataclasses import dataclass, field, asdict
from typing import Optional


@dataclass
class Transaction:
    tx_hash: str
    block_number: int
    timestamp: int
    from_address: str
    to_address: str
    value: float
    gas_used: int
    category: str
    intent: str
    context: str


@dataclass
class NarrativeBeat:
    phase: str
    start_index: int
    end_index: int
    transactions: list
    intensity: float
    description: str
    dominant_category: str
    key_addresses: list


@dataclass
class NarrativeArc:
    arc_id: str
    title: str
    beats: list = field(default_factory=list)
    arc_type: str = "Standard"
    total_duration_seconds: int = 0
    climax_intensity: float = 0.0
    resolution_quality: str = "Open-ended"
    genre: str = "Drama"


class NarrativeArcDetector:

    PHASES = ["Exposition", "RisingAction", "Climax", "FallingAction", "Resolution"]
    CLIMAX_KEYWORDS = ["liquidation", "flash_loan", "exploit", "governance_attack", "mass_transfer"]
    EXPOSITION_KEYWORDS = ["deploy", "approve", "init", "config", "oracle_update"]
    RESOLUTION_KEYWORDS = ["refund", "governance_proposal", "patch_deploy", "compensation"]

    def __init__(self, window_size: int = 50, intensity_threshold: float = 0.7):
        self.window_size = window_size
        self.intensity_threshold = intensity_threshold
        self.arc_counter = 0
        self.rolling_values = []
        self.value_momentum = []

    def compute_intensity(self, tx: Transaction) -> float:
        value_score = math.log1p(tx.value) / 20.0
        gas_score = math.log1p(tx.gas_used) / 30.0
        category_weight = 1.5 if any(kw in tx.category.lower() for kw in self.CLIMAX_KEYWORDS) else 1.0
        raw = (value_score * 0.4 + gas_score * 0.3 + 0.3) * category_weight
        return min(raw, 1.0)

    def compute_momentum(self, transactions: list) -> list:
        intensities = [self.compute_intensity(tx) for tx in transactions]
        momentum = [0.0] * len(intensities)
        for i in range(1, len(intensities)):
            window = intensities[max(0, i - 5):i + 1]
            momentum[i] = (window[-1] - window[0]) / len(window)
        return intensities, momentum

    def detect_climax(self, intensities: list) -> int:
        peak_idx = intensities.index(max(intensities))
        return peak_idx

    def segment_phases(self, intensities: list, climax_idx: int) -> dict:
        n = len(intensities)
        rising_end = climax_idx
        falling_start = climax_idx + 1

        exposition_end = 0
        for i in range(climax_idx):
            if intensities[i] > self.intensity_threshold * 0.3:
                exposition_end = i
                break
        else:
            exposition_end = max(1, climax_idx // 5)

        resolution_start = n
        for i in range(climax_idx + 1, n):
            if i < n - 1 and intensities[i] < 0.2:
                consecutive_low = 0
                for j in range(i, min(i + 5, n)):
                    if intensities[j] < 0.2:
                        consecutive_low += 1
                if consecutive_low >= 3:
                    resolution_start = i
                    break

        falling_end = min(resolution_start, n)

        return {
            "Exposition": (0, exposition_end),
            "RisingAction": (exposition_end, rising_end),
            "Climax": (climax_idx, climax_idx + 1),
            "FallingAction": (falling_start, falling_end),
            "Resolution": (resolution_start, n)
        }

    def classify_arc_type(self, intensities: list, climax_idx: int) -> str:
        n = len(intensities)
        if climax_idx < n * 0.2:
            return "ColdOpen"
        elif climax_idx > n * 0.8:
            return "SlowBurn"
        elif n - climax_idx > n * 0.5:
            return "Anti-Climax"
        else:
            return "Standard"

    def extract_key_addresses(self, transactions: list, phase_range: tuple) -> list:
        addr_counter = Counter()
        for tx in transactions[phase_range[0]:phase_range[1]]:
            addr_counter[tx.from_address] += 1
            addr_counter[tx.to_address] += 1
        return [addr for addr, _ in addr_counter.most_common(5)]

    def detect_arcs(self, transactions: list) -> list:
        if len(transactions) < 10:
            return []

        intensities, momentum = self.compute_momentum(transactions)
        climax_idx = self.detect_climax(intensities)
        phases = self.segment_phases(intensities, climax_idx)
        arc_type = self.classify_arc_type(intensities, climax_idx)

        self.arc_counter += 1
        arc = NarrativeArc(
            arc_id=f"ARC-{self.arc_counter:05d}",
            title=f"Chain Narrative Arc #{self.arc_counter}",
            arc_type=arc_type,
            total_duration_seconds=transactions[-1].timestamp - transactions[0].timestamp,
            climax_intensity=intensities[climax_idx],
            resolution_quality=self.assess_resolution(intensities, phases)
        )

        for phase_name, (start, end) in phases.items():
            if end <= start or start >= len(transactions):
                continue
            phase_txs = transactions[start:end]
            phase_intensities = intensities[start:end]
            key_addrs = self.extract_key_addresses(transactions, (start, end))
            dominant = self.find_dominant_category(phase_txs)

            beat = NarrativeBeat(
                phase=phase_name,
                start_index=start,
                end_index=end,
                transactions=[tx.tx_hash for tx in phase_txs],
                intensity=sum(phase_intensities) / len(phase_intensities) if phase_intensities else 0,
                description=self.generate_phase_description(phase_name, phase_txs),
                dominant_category=dominant,
                key_addresses=key_addrs
            )
            arc.beats.append(beat)

        return [arc]

    def assess_resolution(self, intensities: list, phases: dict) -> str:
        resolution_range = phases.get("Resolution", (len(intensities), len(intensities)))
        if resolution_range[0] >= len(intensities):
            return "Cliffhanger"
        resolution_intensities = intensities[resolution_range[0]:resolution_range[1]]
        if not resolution_intensities:
            return "Ambiguous"
        avg = sum(resolution_intensities) / len(resolution_intensities)
        if avg < 0.15:
            return "CleanResolution"
        elif avg < 0.4:
            return "PartialResolution"
        else:
            return "OngoingTension"

    def find_dominant_category(self, transactions: list) -> str:
        categories = [tx.category for tx in transactions]
        if not categories:
            return "Unknown"
        return Counter(categories).most_common(1)[0][0]

    def generate_phase_description(self, phase: str, txs: list) -> str:
        if not txs:
            return f"Empty {phase} phase"
        n = len(txs)
        avg_val = sum(t.value for t in txs) / n
        return f"{phase} phase with {n} transactions, average value {avg_val:.4f}"

    def export_arcs(self, arcs: list, filepath: str):
        output = []
        for arc in arcs:
            arc_dict = {
                "arc_id": arc.arc_id,
                "title": arc.title,
                "arc_type": arc.arc_type,
                "duration": arc.total_duration_seconds,
                "climax_intensity": round(arc.climax_intensity, 4),
                "resolution": arc.resolution_quality,
                "genre": arc.genre,
                "beats": [
                    {
                        "phase": b.phase,
                        "intensity": round(b.intensity, 4),
                        "tx_count": len(b.transactions),
                        "dominant_category": b.dominant_category,
                        "key_addresses": b.key_addresses
                    }
                    for b in arc.beats
                ]
            }
            output.append(arc_dict)
        with open(filepath, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)


def load_transactions(filepath: str) -> list:
    with open(filepath, "r", encoding="utf-8") as f:
        raw = json.load(f)
    return [
        Transaction(
            tx_hash=tx["tx_hash"],
            block_number=tx["block_number"],
            timestamp=tx["timestamp"],
            from_address=tx["from_address"],
            to_address=tx["to_address"],
            value=float(tx["value"]),
            gas_used=int(tx["gas_used"]),
            category=tx["category"],
            intent=tx.get("intent", ""),
            context=tx.get("context", "")
        )
        for tx in raw
    ]


if __name__ == "__main__":
    transactions = load_transactions("onchain_events.json")
    detector = NarrativeArcDetector(window_size=50, intensity_threshold=0.6)
    detected_arcs = detector.detect_arcs(transactions)
    detector.export_arcs(detected_arcs, "narrative_arcs_output.json")

    for arc in detected_arcs:
        print(f"Arc: {arc.arc_id} | Type: {arc.arc_type}")
        for beat in arc.beats:
            print(f"  {beat.phase}: intensity={beat.intensity:.3f}, txs={len(beat.transactions)}, category={beat.dominant_category}")

这个脚本的精髓在于 compute_intensitysegment_phases 两个方法。前者为每笔交易计算一个"强度分数"——交易金额越大、Gas 消耗越高、事件类型越"激烈"(如涉及闪电贷、清算等),强度分数越高。后者则通过分析强度分数的分布,自动将交易序列划分为五个叙事阶段:铺垫期(强度低、行为多为部署和授权)、上升期(强度逐渐攀升、大额交易出现)、高潮点(强度峰值所在的位置)、下降期(强度回落、攻击者提取利润或做善后)和解局(强度趋于平稳、社区做出响应)。

classify_arc_type方法引入了四种弧线类型的分类——"冷开场"(Cold Open,高潮出现在开头)、"慢热"(Slow Burn,高潮出现在末尾)、"反高潮"(Anti-Climax,高潮之后还有大量活动)和"标准"(Standard,高潮出现在中段)。这种分类直接借用了好莱坞编剧的类型学概念。

智能合约的剪辑逻辑


第七幕:隐私的"转场"与行业的"续集"展望

零知识证明:区块链世界的"转场艺术"

在电影剪辑中,"转场"(Transition)是连接两个场景的桥梁——它可以是淡入淡出、叠化、划变、或者更具创意的匹配剪辑。好的转场不着痕迹,让观众自然而然地从一个场景过渡到另一个场景。而区块链中的零知识证明(Zero-Knowledge Proofs, ZKPs)恰恰扮演着"转场"的角色:它让数据在完成验证的同时隐藏其细节——就像一个转场镜头,让你知道"故事从 A 到达了 B",却不需要展示 A 和 B 之间的所有细节。

当AI作为"场记"介入链上数据索引时,隐私问题变得尤为尖锐。一个能理解链上叙事结构的AI,本质上拥有了巨大的信息优势——它能识别"巨鲸"的行为模式、预判市场的集体情绪、追踪 DeFi 协议的风险积累。如果这些信息被不当使用,就相当于AI场记把剧组的"内部消息"泄露给了竞争对手。

zk-SNARKs 和 zk-STARKs 在这个框架中可以被称为"加密转场"。一个理想的设计是:AI叙事引擎在分析链上数据时,不需要直接访问原始交易数据,而是通过零知识证明验证交易的某些属性(如"这笔交易的金额大于某个阈值"、"这条交易的发起者在过去一周内有过至少三次同类行为"),从而在不暴露隐私的前提下完成叙事推理。

这就好比一部纪录片的导演不能直接把拍摄对象的家庭住址打上屏幕,但可以通过间接的叙事手法让观众理解"这个人生活在一个怎样的环境中"。转场的美学和隐私保护的美学,在某种意义上是同构的——它们都在"展示"与"隐藏"之间寻找平衡。

"续集"展望:未来的可能性

如果我们把"AI+链上数据索引"看作一部正在制作的系列电影,那么目前我们只完成了"第一部"的拍摄。以下是我对"续集"的几个展望:

**第二季:实时叙事引擎。**当前的方案大多是批处理式的——收集一段时间的链上数据后统一交给AI分析。未来的方向是实现实时叙事:AI像一个永远在线的场记,实时监听链上事件,实时生成叙事注解。这需要更高效的推理能力和更流式化的数据管道。想象一下:当一笔闪电贷攻击正在发生时,AI场记能实时标注"第二幕开始,攻击者正在建立头寸",而不是事后才复盘。

**第三季:跨链蒙太奇。**随着多链生态的发展,一个完整的链上叙事往往跨越以太坊、Arbitrum、Solana 等多条链。AI需要具备"交叉剪辑"(Cross-cutting)的能力——像诺兰的电影一样,在多个叙事线之间来回切换,最终在某个高潮点将它们汇合。这要求索引系统能理解不同链之间的时间锚点和语义映射关系。

**第四季:社区共创叙事。**目前的索引是单方向的——AI理解数据,然后呈现给用户。未来的方向是让社区参与到叙事共建中:用户可以补充AI遗漏的上下文、纠正AI错误的叙事判断、甚至为链上事件添加自己的"导演评论"。这就像 DVD 时代的"导演评论音轨"——在正片之外增加一层主观的、人类视角的叙事。

伦理的"穿帮镜头"

任何技术的应用都不能回避伦理维度。当AI拥有了理解链上叙事的能力,它也可能被用于市场操纵——提前识别"巨鲸"的行为模式并进行抢跑交易(front-running);或者被用于地址画像和行为追踪,损害用户的匿名性。这些风险就像电影制作中的"穿帮镜头"——它们不应该出现在最终成片中,但在实际制作过程中却总是难以完全避免。

我认为,技术伦理的底线应该是:AI的叙事能力应该用于"解释"而非"预测"——它应该帮助人们理解"发生了什么",而不是告诉他们"即将发生什么"并据此套利。这是一个"纪录片"和"预言片"的区别——我们需要的是一部忠实的纪录片,而不是一部操纵市场的预言片。

在监管层面,链上叙事索引工具可能需要面临类似金融投研报告的合规要求:AI生成的叙事分析如果被用于投资决策,那么其生成逻辑、数据来源、潜在偏见都需要被透明地披露。这就像电影分级制度——不同的叙事输出可能需要不同级别的"标注"。


尾声:当镜头对准区块链

写到这里,我忽然想起了大卫·波德维尔在《电影叙事学》中的一句话:"叙事的本质不是记录发生了什么,而是赋予事件以结构。"这句话放在区块链的语境下,恰如其分。

区块链从不缺乏数据——它每分钟都在生产数以万计的交易记录。它缺乏的是"叙事"——一种将离散数据编织成可理解故事的能力。当多模态大模型介入这个过程,区块链世界终于拥有了一双"蒙太奇之眼":它能在全景中看见趋势,在中景中看见关系,在特写中看见意图。

从技术上说,这篇文章讨论的是一套"AI驱动的链上数据索引系统"——用智能合约建立场记层,用 JavaScript 引擎生成叙事,用 Python 脚本分析叙事弧线。但从更深的层面看,它讨论的是两种文化——影视叙事与计算机科学的交汇。作为一名广播电视编导专业的毕业生,我深知镜头语言的力量:同一个事件,用不同的镜头呈现,可以讲述完全不同的故事。而在区块链领域,数据呈现的方式同样深刻地影响着我们理解和使用技术的方式。

在这个万物皆可 Token 化的时代,技术的迭代往往比镜头切换更快。作为一名广播电视编导专业的毕业生,我始终尝试在流动的影像与加密的算法之间寻找平衡。感谢阅读,我是王森涛,让我们在区块链的视听宇宙中保持清醒,持续探索。


评论