构建一个集成Jotai前端、Ruby后端与云端SciPy计算的异步任务平台


我们团队的数据分析师最近遇到了一个瓶颈。他们依赖一组复杂的Python脚本进行蒙特卡洛模拟,这些脚本大量使用SciPy和NumPy。问题在于,这些计算任务通常需要运行5到20分钟,完全锁死他们的本地机器,而且结果分享极为低效——通常是截图或者传来传去的CSV文件。为了解决这个问题,我着手构建一个内部Web平台,目标是实现一个非阻塞、可追踪、结果可视化的异步计算任务流。

技术选型是个有趣的挑战,因为它横跨了多个技术栈:团队的API主力是Ruby on Rails,数据科学的核心是Python/SciPy,而我希望提供一个现代化的、响应迅速的前端体验。最终的架构决策是将这些异构部分通过云服务粘合起来。

sequenceDiagram
    participant FE as Jotai Frontend
    participant API as Ruby on Rails API
    participant MQ as Message Queue (AWS SQS)
    participant Worker as Cloud Function (AWS Lambda + SciPy)
    participant DS as Data Store (AWS S3)

    FE->>+API: POST /api/v1/simulations (params)
    API->>+MQ: Enqueue Job (job_id, params)
    API-->>-FE: { "job_id": "uuid-1234", "status": "queued" }
    
    Note right of MQ: Worker polls the queue
    Worker->>+MQ: Dequeue Job Message
    Worker->>Worker: Run SciPy Calculation...
    
    Note over Worker,DS: Calculation result (e.g., plot.png) is generated
    Worker->>+DS: Upload result to S3
    
    Worker->>+API: PATCH /api/v1/simulations/:id (status, result_url)
    API->>API: Update Job Status in DB
    API-->>-Worker: ACK
    
    Note over FE,API: Real-time update via WebSocket
    API->>FE: Broadcast "status_update" (job_id, status, result_url)
    
    FE->>FE: Update UI State (Jotai Atom)

这个流程的核心是将耗时计算从同步的HTTP请求-响应周期中剥离。用户提交任务后,前端立即获得一个任务ID,然后通过WebSocket接收后续的状态更新。实际的计算工作由云端的、按需启动的Worker来完成。

第一阶段:Ruby on Rails API 与任务模型

Rails作为胶水层,其核心职责是接收任务、将其推送到消息队列,并维护任务状态。我们使用Rails API模式来保持轻量。

首先是任务模型 SimulationJob,它记录了每个计算任务的元数据。

# app/models/simulation_job.rb

class SimulationJob < ApplicationRecord
  # status: queued, processing, completed, failed
  enum status: { queued: 0, processing: 1, completed: 2, failed: 3 }, _default: :queued

  validates :params_json, presence: true
  validates :status, presence: true

  # 在任务创建后,将其推送到消息队列
  after_create_commit :enqueue_job

  private

  def enqueue_job
    # 使用 Rails 内置的 Active Job,后端可以配置为 Sidekiq, Shoryuken 等
    # 这里我们直接调用一个服务类来与云服务商的SDK交互,以保持明确
    QueueingService.new.enqueue(self)
  rescue StandardError => e
    # 如果入队失败,必须将任务状态标记为失败并记录错误
    update(status: :failed, error_message: "Failed to enqueue job: #{e.message}")
    Rails.logger.error "[SimulationJob] Failed to enqueue job #{id}: #{e.message}"
  end
end

控制器 SimulationsController 负责处理前端的请求。它只做三件事:创建记录、触发入队、返回任务ID。

# app/controllers/api/v1/simulations_controller.rb

module Api
  module V1
    class SimulationsController < ApplicationController
      # 省略了错误处理和参数校验的 boilerplate
      
      def create
        job = SimulationJob.create!(simulation_params)
        render json: { job_id: job.id, status: job.status }, status: :accepted
      end

      def show
        job = SimulationJob.find(params[:id])
        render json: {
          job_id: job.id,
          status: job.status,
          result_url: job.result_url,
          error_message: job.error_message,
          created_at: job.created_at
        }
      end

      private

      def simulation_params
        params.require(:simulation).permit(params_json: {})
      end
    end
  end
end

真正的队列操作被封装在 QueueingService 中。在真实项目中,这里会使用AWS SDK for Ruby (aws-sdk-sqs)。

# app/services/queueing_service.rb
require 'aws-sdk-sqs'

class QueueingService
  # 使用环境变量或Rails凭证管理来配置
  SQS_QUEUE_URL = ENV.fetch('AWS_SQS_QUEUE_URL')
  
  def initialize
    # 在生产环境中,IAM角色是最佳实践,而不是硬编码的密钥
    @sqs_client = Aws::SQS::Client.new(region: ENV.fetch('AWS_REGION'))
  end

  def enqueue(job)
    message_body = {
      job_id: job.id,
      params: job.params_json
    }.to_json

    @sqs_client.send_message(
      queue_url: SQS_QUEUE_URL,
      message_body: message_body,
      message_group_id: "simulation-jobs" # 对于FIFO队列是必需的
    )
    
    Rails.logger.info "[QueueingService] Successfully enqueued job #{job.id}"
  end
end

这个后端结构清晰地将职责分离开来:Controller负责HTTP接口,Model负责数据持久化和生命周期回调,Service负责与外部系统(消息队列)的交互。

第二阶段:云端SciPy Worker

计算Worker是整个系统的核心引擎。我们选择使用AWS Lambda,并为其配置一个自定义的Docker容器镜像,因为SciPy和NumPy这类库超出了Lambda默认的层大小限制。

首先是Dockerfile,它定义了我们的运行时环境。

# 使用官方提供的Python基础镜像
FROM public.ecr.aws/lambda/python:3.9

# 安装编译依赖,这些在构建后可以清理掉
RUN yum install -y gcc-c++ && yum clean all

# 将依赖项文件复制到工作目录
COPY requirements.txt ${LAMBDA_TASK_ROOT}/

# 使用pip安装Python库
# --no-cache-dir 减小镜像大小
RUN pip install --no-cache-dir -r ${LAMBDA_TASK_ROOT}/requirements.txt

# 复制我们的处理程序代码
COPY app.py ${LAMBDA_TASK_ROOT}/

# 设置Lambda调用的命令
CMD [ "app.handler" ]

requirements.txt 内容很简单:

scipy
numpy
requests
boto3
matplotlib

我们加入了matplotlib用于生成结果图表,requests用于回调Rails API更新状态。

核心的Python处理逻辑在app.py中。这个函数由Lambda服务在收到SQS消息时触发。

# app.py
import os
import json
import logging
import boto3
import requests
import numpy as np
from scipy.optimize import minimize
import matplotlib.pyplot as plt
import io

# 配置
# 从环境变量获取配置信息
RAILS_API_ENDPOINT = os.environ.get('RAILS_API_ENDPOINT') # e.g., https://api.example.com/api/v1/simulations
RAILS_API_TOKEN = os.environ.get('RAILS_API_TOKEN') # 用于认证的秘密令牌
S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME')

# 设置日志
logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3')

def perform_heavy_computation(params):
    """
    一个示例性的、有意义的计算任务。
    这里我们使用SciPy的minimize函数来寻找一个复杂函数的最小值。
    这是一个CPU密集型操作。
    """
    logger.info(f"Starting computation with params: {params}")

    # Rosenbrock函数,一个经典的优化测试问题
    def rosen(x):
        return sum(100.0 * (x[1:] - x[:-1]**2.0)**2.0 + (1 - x[:-1])**2.0)

    # 从输入参数获取初始点,并增加一些随机性
    initial_guess_base = params.get('initial_guess', [0, 0, 0, 0])
    x0 = np.array(initial_guess_base) + np.random.rand(len(initial_guess_base)) * 0.1

    # 执行优化
    res = minimize(rosen, x0, method='nelder-mead', options={'xatol': 1e-8, 'disp': False})
    
    logger.info(f"Computation finished. Success: {res.success}, Iterations: {res.nit}")

    # 生成一个可视化结果图表
    fig, ax = plt.subplots()
    ax.plot(res.x, 'o-')
    ax.set_title(f'Optimization Result (iterations: {res.nit})')
    ax.set_xlabel('Dimension')
    ax.set_ylabel('Value')
    
    # 将图表保存到内存中的二进制缓冲区
    buf = io.BytesIO()
    plt.savefig(buf, format='png')
    buf.seek(0)
    
    return {"result": res.x.tolist(), "success": res.success}, buf.getvalue()


def update_job_status(job_id, status, data=None):
    """通过API回调更新Rails数据库中的任务状态"""
    url = f"{RAILS_API_ENDPOINT}/{job_id}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {RAILS_API_TOKEN}"
    }
    payload = {"status": status}
    if data:
        payload.update(data)
    
    try:
        response = requests.patch(url, json=payload, headers=headers, timeout=5)
        response.raise_for_status()
        logger.info(f"Successfully updated job {job_id} to status {status}")
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to update job {job_id}: {e}")
        # 在真实场景中,这里应该有重试逻辑或死信队列处理
        raise


def handler(event, context):
    """Lambda的主处理函数"""
    for record in event['Records']:
        message_body = json.loads(record['body'])
        job_id = message_body['job_id']
        params = message_body['params']

        logger.info(f"Processing job {job_id}...")

        try:
            # 1. 更新状态为 'processing'
            update_job_status(job_id, 'processing')

            # 2. 执行核心计算
            result_data, plot_bytes = perform_heavy_computation(params)
            
            # 3. 上传结果到S3
            result_key = f"results/{job_id}/result_plot.png"
            s3_client.put_object(
                Bucket=S3_BUCKET_NAME,
                Key=result_key,
                Body=plot_bytes,
                ContentType='image/png'
            )
            # 生成一个预签名的URL,以便前端可以安全地访问
            result_url = s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': S3_BUCKET_NAME, 'Key': result_key},
                ExpiresIn=3600  # 1小时有效
            )
            
            # 4. 更新状态为 'completed' 并附上结果URL
            update_job_status(job_id, 'completed', {
                'result_url': result_url,
                'result_data_json': result_data # 也可以将数值结果存回DB
            })

        except Exception as e:
            logger.exception(f"Error processing job {job_id}")
            update_job_status(job_id, 'failed', {'error_message': str(e)})

    return {'statusCode': 200, 'body': json.dumps('Processing complete')}

这个Worker的设计考虑了生产环境的几个关键点:

  1. 配置分离:所有敏感信息和环境配置都通过环境变量注入。
  2. 状态回调:Worker通过API回调来更新状态,这比让Worker直接访问数据库更安全、更解耦。
  3. 原子性操作:任务状态的更新(processing -> completed/failed)是独立的API调用,确保了任务流转的可追踪性。
  4. 结果存储:大的结果对象(如图表、原始数据文件)存储在S3这类对象存储中,数据库只保存元数据和URL,避免了数据库膨胀。

第三阶段:Jotai驱动的响应式前端

前端的挑战在于如何优雅地管理异步任务的状态,并给用户提供即时反馈。这正是Jotai的用武之地。它的原子化状态管理模型非常适合这种场景,我们可以为任务的各个方面(参数、状态、结果)创建独立的原子,避免了复杂的Reducer和全局状态树。

首先,定义我们的核心原子。

// src/state/simulationAtoms.js
import { atom } from 'jotai';

// 存储用户输入的模拟参数
export const simulationParamsAtom = atom({
  initial_guess: [2.5, 1.5, 3.0, 2.0],
  // 其他参数...
});

// 存储当前任务的ID和状态
// atom的初始值可以是null或一个代表“无任务”的对象
export const currentJobAtom = atom({
  id: null,
  status: 'idle', // idle, queued, processing, completed, failed
  resultUrl: null,
  errorMessage: null,
});

// 一个派生原子,只读,用于判断当前是否有任务在运行
export const isRunningAtom = atom(
  (get) => {
    const status = get(currentJobAtom).status;
    return status === 'queued' || status === 'processing';
  }
);

接下来是UI组件。我们有一个表单用于提交任务,以及一个状态展示区域。

// src/components/SimulationRunner.jsx
import { useAtom } from 'jotai';
import { simulationParamsAtom, currentJobAtom, isRunningAtom } from '../state/simulationAtoms';
import { useActionCable } from '../hooks/useActionCable'; // 自定义hook

const API_BASE_URL = 'http://localhost:3000/api/v1';

export function SimulationRunner() {
  const [params, setParams] = useAtom(simulationParamsAtom);
  const [job, setJob] = useAtom(currentJobAtom);
  const [isRunning] = useAtom(isRunningAtom);

  // 使用自定义Hook来处理WebSocket连接和消息
  const handleJobUpdate = (data) => {
    if (data.job_id === job.id) {
      console.log('Received job update via WebSocket:', data);
      setJob((prev) => ({
        ...prev,
        status: data.status,
        resultUrl: data.result_url,
        errorMessage: data.error_message,
      }));
    }
  };
  
  // 只有在job.id存在时才订阅频道
  useActionCable({ channel: 'SimulationsChannel', id: job.id }, handleJobUpdate);

  const handleSubmit = async (e) => {
    e.preventDefault();
    if (isRunning) return;

    try {
      setJob({ id: null, status: 'queued', resultUrl: null, errorMessage: null });
      
      const response = await fetch(`${API_BASE_URL}/simulations`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ simulation: { params_json: params } }),
      });

      if (!response.ok) {
        throw new Error('Failed to submit job');
      }

      const data = await response.json();
      setJob((prev) => ({ ...prev, id: data.job_id, status: data.status }));

    } catch (error) {
      setJob({ id: null, status: 'failed', resultUrl: null, errorMessage: error.message });
    }
  };

  // ... 表单输入字段的渲染逻辑,此处省略 ...

  return (
    <div>
      <form onSubmit={handleSubmit}>
        {/* ... form inputs for params ... */}
        <button type="submit" disabled={isRunning}>
          {isRunning ? 'Processing...' : 'Run Simulation'}
        </button>
      </form>

      <div className="status-panel">
        <h3>Job Status</h3>
        {job.id ? (
          <>
            <p>Job ID: {job.id}</p>
            <p>Status: <span className={`status-${job.status}`}>{job.status}</span></p>
            {job.status === 'completed' && job.resultUrl && (
              <div>
                <h4>Result:</h4>
                <img src={job.resultUrl} alt="Simulation Result Plot" style={{ maxWidth: '100%' }} />
              </div>
            )}
            {job.status === 'failed' && <p className="error">Error: {job.errorMessage}</p>}
          </>
        ) : (
          <p>No job submitted yet.</p>
        )}
      </div>
    </div>
  );
}

前端交互的关键在于实时更新。轮询API (/api/v1/simulations/:id)是一个简单但低效的方案。更优雅的实现是使用WebSocket。Rails的Action Cable为此提供了完美的解决方案。

首先在Rails端定义一个Channel:

# app/channels/simulations_channel.rb
class SimulationsChannel < ApplicationCable::Channel
  def subscribed
    # stream_from "simulations_channel" # 广播给所有人
    # 更精细的控制,只让特定用户接收到自己任务的更新
    job = SimulationJob.find(params[:id])
    stream_for job
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
  end
end

然后,在SimulationJob模型中,当状态更新时,我们广播消息:

# app/models/simulation_job.rb

# ... existing code ...
after_update_commit :broadcast_status_update

private

def broadcast_status_update
  SimulationsChannel.broadcast_to(
    self,
    {
      job_id: self.id,
      status: self.status,
      result_url: self.result_url,
      error_message: self.error_message
    }
  )
end

最后,是前端的useActionCable自定义Hook,它封装了与Action Cable客户端的交互逻辑:

// src/hooks/useActionCable.js
import { useEffect, useRef } from 'react';
import { createConsumer } from '@rails/actioncable';

const consumer = createConsumer('ws://localhost:3000/cable'); // 配置WebSocket URL

export function useActionCable({ channel, id }, onReceived) {
  const subscriptionRef = useRef(null);

  useEffect(() => {
    // 仅当有有效的ID时才进行订阅
    if (id) {
      subscriptionRef.current = consumer.subscriptions.create(
        { channel: channel, id: id },
        {
          received: (data) => {
            onReceived(data);
          },
        }
      );
    }

    // 组件卸载或ID变化时,取消订阅
    return () => {
      if (subscriptionRef.current) {
        consumer.subscriptions.remove(subscriptionRef.current);
        subscriptionRef.current = null;
      }
    };
  }, [channel, id, onReceived]); // 依赖项数组确保只在必要时重新订阅
}

这个前端架构实现了真正的响应式。状态由Jotai的原子管理,UI与原子绑定。当WebSocket消息抵达时,useActionCable Hook调用回调函数,该函数setcurrentJobAtom的值,Jotai会自动、高效地只重渲染依赖该原子的组件。

架构的局限性与未来迭代

graph TD
    subgraph "Current Limitations"
        A[Lambda 15min Timeout] --> B{For >15min jobs, use AWS Batch or Fargate};
        C[Simple SQS Queue] --> D{Complex workflows need AWS Step Functions};
        E[Basic Error Handling] --> F{Implement exponential backoff & dead-letter queues};
    end
    
    subgraph "Future Enhancements"
        G[Interactive Visualization] --> H[Use Plotly.js/D3 to render results from S3 data];
        I[User Authentication] --> J[Integrate Devise/OAuth2 for multi-user support];
        K[Template Library] --> L[Allow users to save and reuse simulation parameters];
    end

这套架构虽然解决了最初的问题,但在生产环境中依然有其边界。AWS Lambda有15分钟的执行超时限制,对于更长的计算任务,需要迁移到AWS Batch或ECS Fargate这类更适合长时间运行的服务。目前的队列机制是先进先出,对于需要多步骤、有依赖关系的复杂计算流,引入AWS Step Functions进行工作流编排会是更稳健的选择。

未来的迭代方向很明确:前端可以引入更强大的可视化库直接渲染从S3获取的原始数据,而不仅仅是显示一张图片;后端则需要加入完善的用户认证与权限体系,让平台能够服务于整个组织,并允许用户管理自己的历史任务。同时,构建一个可复用的计算参数模板库,将极大地提升分析师的工作效率。


  目录