Skip to content

Latest commit

 

History

History
915 lines (781 loc) · 29.5 KB

File metadata and controls

915 lines (781 loc) · 29.5 KB

数据开发模块技术方案

1. 概述

数据开发模块是 DataJump 平台的核心功能模块,为数据开发工程师提供高效的开发环境。主要包含三个子模块:

  • SQL IDE:专业的 SQL 开发环境,支持智能补全、语法高亮、执行计划可视化
  • DAG 可视化编排:拖拽式任务编排,直观展示任务依赖关系
  • 发布流程:多环境发布管理,支持审批流程

2. SQL IDE

2.1 功能概览

┌─────────────────────────────────────────────────────────────────────────────┐
│  文件树  │                    SQL Editor                      │  结果面板   │
│  ┌─────┐ │  ┌─────────────────────────────────────────────┐  │  ┌───────┐  │
│  │项目A │ │  │ SELECT                                      │  │  │ 表格  │  │
│  │ ├─任务│ │  │   user_id,                                  │  │  │ 视图  │  │
│  │ ├─任务│ │  │   user_name,                                │  │  │       │  │
│  │项目B │ │  │   SUM(order_amount) as total                 │  │  ├───────┤  │
│  │ ├─任务│ │  │ FROM orders                                 │  │  │ 图表  │  │
│  │       │ │  │ GROUP BY user_id, user_name                 │  │  │ 视图  │  │
│  └─────┘ │  └─────────────────────────────────────────────┘  │  └───────┘  │
├──────────┼───────────────────────────────────────────────────┼─────────────┤
│          │  ┌─ 执行 ─┐ ┌─ 格式化 ─┐ ┌─ 执行计划 ─┐          │             │
│  元数据  │  │   ▶    │ │    {}    │ │     📊     │          │   历史      │
│  浏览器  │  └────────┘ └──────────┘ └────────────┘          │   记录      │
└──────────┴───────────────────────────────────────────────────┴─────────────┘

2.2 核心功能

功能 说明
语法高亮 SQL 关键字、函数、表名、字段名不同颜色
智能补全 关键字、表名、字段名、函数自动补全
错误提示 实时语法检查,错误行标红
格式化 SQL 自动格式化美化
执行计划 可视化展示执行计划树
多 Tab 支持多个 SQL 文件同时编辑
快捷键 Ctrl+Enter 执行,Ctrl+S 保存等

2.3 Monaco Editor 集成

// SQL 语言配置
const sqlLanguageConfig: monaco.languages.LanguageConfiguration = {
  comments: {
    lineComment: '--',
    blockComment: ['/*', '*/'],
  },
  brackets: [
    ['(', ')'],
    ['[', ']'],
  ],
  autoClosingPairs: [
    { open: '(', close: ')' },
    { open: '[', close: ']' },
    { open: '"', close: '"' },
    { open: "'", close: "'" },
  ],
};

// 自动补全提供器
class SQLCompletionProvider implements monaco.languages.CompletionItemProvider {
  private metadataService: MetadataService;

  async provideCompletionItems(
    model: monaco.editor.ITextModel,
    position: monaco.Position
  ): Promise<monaco.languages.CompletionList> {
    const word = model.getWordUntilPosition(position);
    const range = {
      startLineNumber: position.lineNumber,
      endLineNumber: position.lineNumber,
      startColumn: word.startColumn,
      endColumn: word.endColumn,
    };

    const suggestions: monaco.languages.CompletionItem[] = [];

    // SQL 关键字补全
    suggestions.push(...this.getKeywordSuggestions(range));

    // 表名补全
    const tables = await this.metadataService.getTables();
    suggestions.push(
      ...tables.map((table) => ({
        label: table.name,
        kind: monaco.languages.CompletionItemKind.Class,
        insertText: table.name,
        detail: table.comment,
        range,
      }))
    );

    // 字段补全(根据上下文)
    const contextTable = this.parseContextTable(model, position);
    if (contextTable) {
      const columns = await this.metadataService.getColumns(contextTable);
      suggestions.push(
        ...columns.map((col) => ({
          label: col.name,
          kind: monaco.languages.CompletionItemKind.Field,
          insertText: col.name,
          detail: `${col.type} - ${col.comment}`,
          range,
        }))
      );
    }

    // 函数补全
    suggestions.push(...this.getFunctionSuggestions(range));

    return { suggestions };
  }
}

2.4 SQL 执行引擎

public interface SQLExecutor {
    /**
     * 执行 SQL 查询
     */
    QueryResult executeQuery(ExecuteRequest request);

    /**
     * 获取执行计划
     */
    ExecutionPlan explainQuery(String sql, String engine);

    /**
     * 取消执行
     */
    void cancelQuery(String queryId);

    /**
     * 获取执行状态
     */
    QueryStatus getQueryStatus(String queryId);
}

public class ExecuteRequest {
    private String sql;
    private String engine;         // HIVE, SPARK, PRESTO
    private Long datasourceId;
    private Map<String, Object> params;
    private Integer limit;
    private Integer timeout;
}

public class QueryResult {
    private String queryId;
    private QueryStatus status;
    private List<ColumnMeta> columns;
    private List<List<Object>> data;
    private Long totalRows;
    private Long executionTime;
    private String errorMessage;
}

2.5 执行计划可视化

interface ExecutionPlanNode {
  id: string;
  operator: string;        // TableScan, Filter, Aggregate, Join, etc.
  table?: string;
  columns?: string[];
  predicate?: string;
  cost?: number;
  rows?: number;
  children: ExecutionPlanNode[];
}

const ExecutionPlanTree: React.FC<{ plan: ExecutionPlanNode }> = ({ plan }) => {
  return (
    <div className="plan-node">
      <div className="node-header">
        <span className="operator">{plan.operator}</span>
        {plan.cost && <span className="cost">Cost: {plan.cost}</span>}
        {plan.rows && <span className="rows">Rows: {plan.rows}</span>}
      </div>
      {plan.table && <div className="detail">Table: {plan.table}</div>}
      {plan.predicate && <div className="detail">Filter: {plan.predicate}</div>}
      {plan.children.length > 0 && (
        <div className="children">
          {plan.children.map((child) => (
            <ExecutionPlanTree key={child.id} plan={child} />
          ))}
        </div>
      )}
    </div>
  );
};

2.6 查询结果处理

interface QueryResultProps {
  result: QueryResult;
  onExport: (format: 'csv' | 'excel' | 'json') => void;
}

const QueryResultPanel: React.FC<QueryResultProps> = ({ result, onExport }) => {
  const [viewMode, setViewMode] = useState<'table' | 'chart'>('table');
  const [chartConfig, setChartConfig] = useState<ChartConfig>();

  return (
    <div className="result-panel">
      <div className="toolbar">
        <Radio.Group value={viewMode} onChange={(e) => setViewMode(e.target.value)}>
          <Radio.Button value="table">表格</Radio.Button>
          <Radio.Button value="chart">图表</Radio.Button>
        </Radio.Group>
        <Dropdown menu={{ items: exportMenuItems }}>
          <Button icon={<ExportOutlined />}></Button>
        </Dropdown>
        <span className="stats">
          {result.totalRows}  | {result.executionTime}ms
        </span>
      </div>

      {viewMode === 'table' ? (
        <Table
          columns={result.columns.map((col) => ({
            title: col.name,
            dataIndex: col.name,
            ellipsis: true,
          }))}
          dataSource={result.data}
          pagination={{ pageSize: 100 }}
          scroll={{ x: 'max-content', y: 400 }}
          virtual
        />
      ) : (
        <ChartBuilder data={result} config={chartConfig} onConfigChange={setChartConfig} />
      )}
    </div>
  );
};

3. DAG 可视化编排

3.1 功能概览

┌─────────────────────────────────────────────────────────────────────────────┐
│  工具栏:保存 | 运行 | 添加节点 | 执行历史 | 调度配置                          │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│                           DAG 画布 (ReactFlow)                              │
│         ┌─────────┐      ┌─────────┐      ┌─────────┐                      │
│         │ [SQL]   ├─────▶│ [SHELL] ├─────▶│ [PYTHON]│                      │
│         │ 数据源   │      │  清洗    │      │  分析   │                      │
│         └─────────┘      └────┬────┘      └─────────┘                      │
│                               │                                             │
│                               ▼                                             │
│                         ┌─────────┐                                         │
│                         │ [SPARK] │                                         │
│                         │  聚合   │                                         │
│                         └─────────┘                                         │
│                                                                             │
├─────────────────────────────────────────────────────────────────────────────┤
│  节点属性(Drawer):名称 | 类型 | 脚本内容 | 参数配置                          │
└─────────────────────────────────────────────────────────────────────────────┘

已实现功能

功能 说明
画布编辑 ReactFlow 画布,拖拽节点和连线
节点类型 支持 SQL、SHELL、SPARK、FLINK、PYTHON 五种任务类型
节点配置 点击节点打开属性 Drawer,配置名称、类型、脚本内容
保存/加载 画布数据通过 API 持久化存储
手动触发 一键触发 DAG 执行
执行历史 查看 DAG 实例列表及任务状态
任务日志 查看任务实例的完整执行日志
状态显示 节点显示任务执行状态(等待/运行/成功/失败/超时)
调度配置 配置 Cron 表达式,启用/禁用定时调度
Cron 验证 实时验证 Cron 表达式有效性,显示可读描述

3.2 ReactFlow 集成

import ReactFlow, {
  Node,
  Edge,
  Controls,
  MiniMap,
  Background,
  useNodesState,
  useEdgesState,
  addEdge,
  Connection,
} from 'reactflow';

interface DAGEditorProps {
  dag: DAG;
  onSave: (dag: DAG) => void;
}

const DAGEditor: React.FC<DAGEditorProps> = ({ dag, onSave }) => {
  const [nodes, setNodes, onNodesChange] = useNodesState(dag.tasks.map(taskToNode));
  const [edges, setEdges, onEdgesChange] = useEdgesState(dag.edges.map(depToEdge));
  const [selectedNode, setSelectedNode] = useState<Node | null>(null);

  const onConnect = useCallback(
    (connection: Connection) => {
      // 检查是否形成循环依赖
      if (wouldCreateCycle(nodes, edges, connection)) {
        message.error('不能创建循环依赖');
        return;
      }
      setEdges((eds) => addEdge(connection, eds));
    },
    [nodes, edges]
  );

  const onDrop = useCallback(
    (event: React.DragEvent) => {
      event.preventDefault();
      const type = event.dataTransfer.getData('application/reactflow');
      const position = reactFlowInstance.project({
        x: event.clientX,
        y: event.clientY,
      });

      const newNode: Node = {
        id: `task_${Date.now()}`,
        type: 'taskNode',
        position,
        data: { label: `新${type}任务`, taskType: type },
      };

      setNodes((nds) => nds.concat(newNode));
    },
    [reactFlowInstance]
  );

  return (
    <div className="dag-editor">
      <NodePanel />
      <div className="canvas" onDrop={onDrop} onDragOver={(e) => e.preventDefault()}>
        <ReactFlow
          nodes={nodes}
          edges={edges}
          onNodesChange={onNodesChange}
          onEdgesChange={onEdgesChange}
          onConnect={onConnect}
          onNodeClick={(_, node) => setSelectedNode(node)}
          nodeTypes={nodeTypes}
          fitView
        >
          <Controls />
          <MiniMap />
          <Background variant="dots" gap={12} size={1} />
        </ReactFlow>
      </div>
      <PropertyPanel node={selectedNode} onChange={handleNodeChange} />
    </div>
  );
};

3.3 自定义节点组件

interface TaskNodeData {
  label: string;
  taskType: 'SQL' | 'SHELL' | 'SPARK' | 'FLINK' | 'PYTHON';
  status?: 'pending' | 'running' | 'success' | 'failed';
  config?: TaskConfig;
}

const TaskNode: React.FC<NodeProps<TaskNodeData>> = ({ data, selected }) => {
  const statusColor = {
    pending: '#d9d9d9',
    running: '#1890ff',
    success: '#52c41a',
    failed: '#ff4d4f',
  };

  const typeIcon = {
    SQL: <DatabaseOutlined />,
    SHELL: <CodeOutlined />,
    SPARK: <ThunderboltOutlined />,
    FLINK: <ApiOutlined />,
    PYTHON: <PythonOutlined />,
  };

  return (
    <div className={`task-node ${selected ? 'selected' : ''}`}>
      <Handle type="target" position={Position.Top} />
      <div className="node-header">
        <span className="type-icon">{typeIcon[data.taskType]}</span>
        <span className="label">{data.label}</span>
        {data.status && (
          <span
            className="status-dot"
            style={{ backgroundColor: statusColor[data.status] }}
          />
        )}
      </div>
      <Handle type="source" position={Position.Bottom} />
    </div>
  );
};

const nodeTypes = {
  taskNode: TaskNode,
};

3.4 属性配置面板

interface PropertyPanelProps {
  node: Node<TaskNodeData> | null;
  onChange: (nodeId: string, data: Partial<TaskNodeData>) => void;
}

const PropertyPanel: React.FC<PropertyPanelProps> = ({ node, onChange }) => {
  if (!node) {
    return <div className="property-panel empty">请选择一个节点</div>;
  }

  return (
    <div className="property-panel">
      <Form layout="vertical">
        <Form.Item label="节点名称">
          <Input
            value={node.data.label}
            onChange={(e) => onChange(node.id, { label: e.target.value })}
          />
        </Form.Item>

        <Form.Item label="任务类型">
          <Select value={node.data.taskType} disabled>
            <Option value="SQL">SQL</Option>
            <Option value="SHELL">Shell</Option>
            <Option value="SPARK">Spark</Option>
            <Option value="FLINK">Flink</Option>
            <Option value="PYTHON">Python</Option>
          </Select>
        </Form.Item>

        {/* 根据任务类型渲染不同的配置项 */}
        {node.data.taskType === 'SQL' && <SQLTaskConfig node={node} onChange={onChange} />}
        {node.data.taskType === 'SHELL' && <ShellTaskConfig node={node} onChange={onChange} />}
        {node.data.taskType === 'SPARK' && <SparkTaskConfig node={node} onChange={onChange} />}

        <Collapse>
          <Panel header="调度配置" key="schedule">
            <ScheduleConfig node={node} onChange={onChange} />
          </Panel>
          <Panel header="重试策略" key="retry">
            <RetryConfig node={node} onChange={onChange} />
          </Panel>
          <Panel header="告警配置" key="alert">
            <AlertConfig node={node} onChange={onChange} />
          </Panel>
        </Collapse>
      </Form>
    </div>
  );
};

3.5 DAG 校验

interface ValidationResult {
  valid: boolean;
  errors: ValidationError[];
  warnings: ValidationWarning[];
}

interface ValidationError {
  nodeId?: string;
  edgeId?: string;
  type: 'CYCLE' | 'ORPHAN' | 'MISSING_CONFIG' | 'INVALID_CONNECTION';
  message: string;
}

class DAGValidator {
  validate(nodes: Node[], edges: Edge[]): ValidationResult {
    const errors: ValidationError[] = [];
    const warnings: ValidationWarning[] = [];

    // 1. 检查循环依赖
    if (this.hasCycle(nodes, edges)) {
      errors.push({
        type: 'CYCLE',
        message: 'DAG 中存在循环依赖',
      });
    }

    // 2. 检查孤立节点
    const orphanNodes = this.findOrphanNodes(nodes, edges);
    orphanNodes.forEach((node) => {
      warnings.push({
        nodeId: node.id,
        type: 'ORPHAN',
        message: `节点 "${node.data.label}" 没有上游或下游依赖`,
      });
    });

    // 3. 检查必填配置
    nodes.forEach((node) => {
      const configErrors = this.validateNodeConfig(node);
      errors.push(...configErrors);
    });

    // 4. 检查连接有效性
    edges.forEach((edge) => {
      if (!this.isValidConnection(edge, nodes)) {
        errors.push({
          edgeId: edge.id,
          type: 'INVALID_CONNECTION',
          message: '无效的依赖连接',
        });
      }
    });

    return {
      valid: errors.length === 0,
      errors,
      warnings,
    };
  }

  private hasCycle(nodes: Node[], edges: Edge[]): boolean {
    // Kahn's algorithm 检测循环
    const inDegree = new Map<string, number>();
    const adjacency = new Map<string, string[]>();

    nodes.forEach((node) => {
      inDegree.set(node.id, 0);
      adjacency.set(node.id, []);
    });

    edges.forEach((edge) => {
      adjacency.get(edge.source)!.push(edge.target);
      inDegree.set(edge.target, inDegree.get(edge.target)! + 1);
    });

    const queue: string[] = [];
    inDegree.forEach((degree, nodeId) => {
      if (degree === 0) queue.push(nodeId);
    });

    let visited = 0;
    while (queue.length > 0) {
      const nodeId = queue.shift()!;
      visited++;

      adjacency.get(nodeId)!.forEach((nextId) => {
        const newDegree = inDegree.get(nextId)! - 1;
        inDegree.set(nextId, newDegree);
        if (newDegree === 0) queue.push(nextId);
      });
    }

    return visited !== nodes.length;
  }
}

4. 发布流程(已实现)

4.1 环境管理

采用 DEV → PROD 两层环境模型,单人审批制:

┌─────────────────────────────────────────────────────────────────┐
│                        发布流程                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   ┌─────────────┐         ┌─────────────┐                      │
│   │   开发 DEV   │────────▶│   生产 PROD  │                      │
│   │  (编辑/保存)  │         │  (审批+部署)  │                      │
│   └─────────────┘         └─────────────┘                      │
│        │                        │                               │
│        ▼                        ▼                               │
│   自动创建版本              单人审批 + 一键部署                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

发布对象:DAGSQL_TASK(保存的查询)

4.2 版本管理

基于 JSON 快照的版本管理,自动递增版本号,内容去重(相同内容不创建新版本):

@Service
public class VersionService extends ServiceImpl<ResourceVersionMapper, ResourceVersion> {
    private final DiffService diffService;

    /**
     * 创建版本:自动递增版本号,内容去重
     */
    public ResourceVersion createVersion(String resourceType, Long resourceId,
                                          String contentSnapshot, String changeSummary, Long userId) {
        ResourceVersion latest = getLatestVersion(resourceType, resourceId);
        // 内容未变化则跳过
        if (latest != null && latest.getContentSnapshot().equals(contentSnapshot)) {
            return null;
        }
        int nextVersion = (latest != null) ? latest.getVersionNumber() + 1 : 1;
        // 创建新版本...
    }

    /**
     * 回滚:创建新版本(非破坏性),内容复制自目标版本
     */
    public ResourceVersion rollback(Long versionId, Long userId) {
        ResourceVersion target = getById(versionId);
        String changeSummary = "回滚到版本 v" + target.getVersionNumber();
        return createVersion(target.getResourceType(), target.getResourceId(),
                target.getContentSnapshot(), changeSummary, userId);
    }

    /**
     * Diff:比较两个版本的 JSON 快照差异
     */
    public List<DiffEntry> diffVersions(Long fromVersionId, Long toVersionId) {
        return diffService.diff(
            getById(fromVersionId).getContentSnapshot(),
            getById(toVersionId).getContentSnapshot());
    }
}

DAG 快照内容示例:

{
  "name": "示例ETL任务",
  "cronExpression": "0 30 2 * * *",
  "scheduleEnabled": true,
  "status": 1,
  "tasks": [
    {"id": 1, "name": "task1", "type": "SQL", "script": "SELECT * FROM users"},
    {"id": 2, "name": "task2", "type": "SHELL", "script": "echo done"}
  ]
}

4.3 JSON Tree Diff 引擎

递归比较 JSON 树结构,支持 id-based 数组元素匹配:

@Service
public class DiffService {
    /**
     * 对比两个 JSON 字符串,返回变更列表
     * - 对象:逐字段递归比较
     * - 数组(含 id 字段):按 id 匹配元素,检测增删改
     * - 数组(无 id 字段):按索引逐项比较
     * - 叶子节点:值比较
     */
    public List<DiffEntry> diff(String oldJson, String newJson);
}

DiffEntry 输出格式:

[
  {"path": "cronExpression", "type": "MODIFIED", "oldValue": "0 0 * * *", "newValue": "0 30 2 * * *"},
  {"path": "tasks[id=1].script", "type": "MODIFIED", "oldValue": "SELECT 1", "newValue": "SELECT * FROM users"},
  {"path": "tasks[id=2]", "type": "ADDED", "oldValue": null, "newValue": "{...}"}
]

4.4 发布流程状态机

┌──────────────┐
│   PENDING    │  提交发布(选择审批人)
└──────┬───────┘
       │
    ┌──┴──┐
    │     │
approve  reject
    │     │
    ▼     ▼
┌───────┐ ┌──────────────┐
│APPROVED│ │   REJECTED   │
└───┬───┘ └──────────────┘
    │ deploy
    ▼
┌──────────────┐
│   DEPLOYED   │  已上线(更新 DeployedVersion 记录)
└──────────────┘

4.5 发布服务

@Service
public class ReleaseService extends ServiceImpl<ReleaseMapper, Release> {
    private final DeployedVersionMapper deployedVersionMapper;

    /** 提交发布 */
    public Release submit(String resourceType, Long resourceId, Long versionId,
                          Long submitUserId, Long approverUserId, String comment);

    /** 审批通过 - 仅 PENDING 状态可操作 */
    public void approve(Long releaseId, String comment);

    /** 审批驳回 - 仅 PENDING 状态可操作 */
    public void reject(Long releaseId, String comment);

    /** 部署上线 - 仅 APPROVED 状态可操作,更新 DeployedVersion */
    public void deploy(Long releaseId, Long deployUserId);

    /** 查询当前线上版本 */
    public DeployedVersion getDeployedVersion(String resourceType, Long resourceId);
}

4.6 前端组件

组件 文件 说明
VersionHistory pages/DataDevelop/VersionHistory.tsx 版本列表 Drawer,支持选择两个版本对比、回滚、线上版本标记
DiffViewer pages/DataDevelop/DiffViewer.tsx Diff 展示 Modal,结构变更表格 + Monaco DiffEditor
ReleaseManagement pages/DataDevelop/ReleaseManagement.tsx 发布管理页,三个 Tab:待我审批/我的发布/发布记录

DAG 编辑器集成:

  • 工具栏:「版本」按钮打开 VersionHistory、「发布」按钮提交发布请求
  • 状态栏:显示当前线上版本标签("线上: v{N}" 或 "未发布")
  • 保存画布时自动创建版本快照

SQL IDE 集成:

  • 收藏查询保存时自动创建 SQL_TASK 版本
  • 收藏列表中每条查询有「发布」按钮

5. 数据库设计

-- 资源版本表(通用,支持 DAG/SQL_TASK 等多种资源类型)
CREATE TABLE t_resource_version (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    resource_type VARCHAR(50) NOT NULL COMMENT 'DAG, SQL_TASK',
    resource_id BIGINT NOT NULL,
    version_number INT NOT NULL,
    content_snapshot LONGTEXT NOT NULL COMMENT 'JSON 快照',
    change_summary VARCHAR(500),
    created_by BIGINT,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_resource (resource_type, resource_id),
    INDEX idx_version (resource_type, resource_id, version_number)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 发布记录表
CREATE TABLE t_release (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    resource_type VARCHAR(50) NOT NULL COMMENT 'DAG, SQL_TASK',
    resource_id BIGINT NOT NULL,
    version_id BIGINT NOT NULL COMMENT '关联 t_resource_version.id',
    status VARCHAR(20) NOT NULL COMMENT 'PENDING/APPROVED/REJECTED/DEPLOYED',
    submit_user_id BIGINT NOT NULL,
    approver_user_id BIGINT NOT NULL,
    submit_comment TEXT,
    approve_comment TEXT,
    approved_at DATETIME,
    deployed_at DATETIME,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_resource (resource_type, resource_id),
    INDEX idx_status (status),
    INDEX idx_approver (approver_user_id, status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 已部署版本表(每个资源仅一条记录,表示当前线上版本)
CREATE TABLE t_deployed_version (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    resource_type VARCHAR(50) NOT NULL,
    resource_id BIGINT NOT NULL,
    version_id BIGINT NOT NULL COMMENT '关联 t_resource_version.id',
    deployed_at DATETIME NOT NULL,
    deployed_by BIGINT,
    UNIQUE KEY uk_resource (resource_type, resource_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

6. API 设计

6.1 SQL IDE API

# 执行 SQL
POST /api/v1/sql/execute
Request:
  sql: string
  engine: string
  datasourceId: number
  params: object
  limit: number

# 获取执行计划
POST /api/v1/sql/explain
Request:
  sql: string
  engine: string

# 取消执行
POST /api/v1/sql/cancel/{queryId}

# 获取执行历史
GET /api/v1/sql/history
Query:
  taskId: number
  page: number
  size: number

# 格式化 SQL
POST /api/v1/sql/format
Request:
  sql: string

6.2 DAG 编排 API

# 保存 DAG
PUT /api/v1/dags/{dagId}
Request:
  tasks: Task[]
  edges: Edge[]

# 校验 DAG
POST /api/v1/dags/{dagId}/validate

# 自动布局
POST /api/v1/dags/{dagId}/auto-layout

6.3 版本管理 API

# 创建版本
POST /api/v1/versions
Request:
  resourceType: string       # DAG, SQL_TASK
  resourceId: number
  contentSnapshot: string    # JSON 快照
  changeSummary: string
  userId: number

# 查询版本列表
GET /api/v1/versions?resourceType=DAG&resourceId=1

# 获取版本详情
GET /api/v1/versions/{id}

# 版本 Diff
GET /api/v1/versions/diff?from=1&to=2

# 回滚版本
POST /api/v1/versions/{id}/rollback
Request:
  userId: number

6.4 发布流程 API

# 提交发布
POST /api/v1/releases
Request:
  resourceType: string       # DAG, SQL_TASK
  resourceId: number
  versionId: number
  submitUserId: number
  approverUserId: number
  comment: string

# 查询发布列表
GET /api/v1/releases?status=PENDING&approverId=1

# 获取发布详情
GET /api/v1/releases/{id}

# 审批通过
POST /api/v1/releases/{id}/approve
Request:
  comment: string

# 审批驳回
POST /api/v1/releases/{id}/reject
Request:
  comment: string

# 部署上线
POST /api/v1/releases/{id}/deploy
Request:
  userId: number

# 查询线上版本
GET /api/v1/releases/deployed/{resourceType}/{resourceId}