基于 Haskell 与 WebSocket 实现一个带状态机前端的二阶段提交协调器


需要在一个原子操作中同时更新 S3 上的文件和 PostgreSQL 中的元数据记录。这是一个典型的分布式事务场景。最初的方案是先上传文件,成功后再写入数据库,失败则尝试删除文件。但这套补偿逻辑脆弱不堪,网络抖动或服务重启都可能导致状态不一致——要么是文件存在但元数据缺失(孤儿文件),要么是元数据存在但文件上传失败。业务要求强一致性,任何中间状态的失败都必须让整个操作回滚到初始状态。

这直接将我们引向了分布式事务协议。Saga 模式虽然流行,但其最终一致性的本质无法满足此处的原子性要求。在权衡了实现的复杂性和业务的强一致性需求后,我们决定采用一个“经典”但常常被诟病的协议:二阶段提交(Two-Phase Commit, 2PC)。我知道它的弊端:同步阻塞、协调者单点故障、性能问题。但对于这个特定的、低频但一致性要求极高的场景,它是一个可以接受的权衡。

挑战在于如何实现它。我们需要一个健壮的协调器(Coordinator)和两个参与者(Participants):S3 操作服务和数据库服务。更棘手的是,这个操作由前端用户发起,用户需要实时、清晰地了解这个漫长事务的每一个阶段。简单地显示一个加载动画,然后在几秒甚至几十秒后告诉用户“成功”或“失败”,是完全无法接受的。

这个需求最终的技术栈选型显得有些非主流,但每项决策都经过了深思熟虑:

  1. 协调器后端:Haskell。 对于协调器这种状态和并发逻辑极其复杂的组件,正确性是第一要务。Haskell 强悍的类型系统、纯函数特性以及优秀的并发原语(如 MVar 和 STM)能最大限度地在编译期捕获逻辑错误,构建出高度可靠的服务。
  2. 通信协议:WebSockets。 2PC 是一个有状态的、多阶段的“对话”过程。使用无状态的 RESTful API 来回轮询会非常笨拙且低效。一个长连接的 WebSocket 非常适合这种场景,服务器可以主动向客户端推送事务状态的实时变更。
  3. 前端状态管理:XState。 2PC 事务的生命周期是一个完美的有限状态机(FSM)。PREPARING, PREPARED, COMMITTING, ABORTING… 这些状态之间的转换必须被严格管理。XState 让我们能以一种声明式、可预测且可视化的方式来管理这种复杂性。
  4. 辅助控制平面:RESTful API。 对于查询历史事务状态、健康检查等非实时、幂等的操作,传统的 RESTful API 依然是最佳选择。

我们的目标是构建一个系统:用户在前端点击“发布”,前端 XState 状态机开始运转,通过 WebSocket 连接到 Haskell 协调器,协调器通过内部 RPC 指挥两个参与者服务完成 2PC 流程,并将每一步状态变化实时推送回前端。

数据结构与协调器核心

首先在 Haskell 中定义核心数据类型。类型的明确性是 Haskell 可靠性的基石。

-- file: src/Types.hs
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Types where

import Data.Aeson (FromJSON, ToJSON)
import Data.Text (Text)
import Data.UUID (UUID)
import GHC.Generics (Generic)

-- 事务的唯一标识
newtype TransactionId = TransactionId { unTID :: UUID }
  deriving (Show, Eq, Ord, Generic)
instance ToJSON TransactionId
instance FromJSON TransactionId

-- 参与者的标识
data Participant = S3Service | DBService
  deriving (Show, Eq, Ord, Bounded, Enum, Generic)
instance ToJSON Participant
instance FromJSON Participant

-- 事务的宏观状态
data TransactionState
  = Preparing
  | Prepared
  | Committing
  | Committed
  | Aborting
  | Aborted
  deriving (Show, Eq, Generic)
instance ToJSON TransactionState
instance FromJSON TransactionState

-- 每个参与者的投票结果
data Vote = VoteCommit | VoteAbort
  deriving (Show, Eq, Generic)
instance ToJSON Vote
instance FromJSON Vote

-- 协调器需要维护的完整事务信息
data Transaction = Transaction
  { tId :: TransactionId
  , tState :: TransactionState
  , tVotes :: Map Participant Vote
  } deriving (Show, Generic)
instance ToJSON Transaction

协调器的核心是管理所有活动事务的状态。在真实项目中,这需要持久化到数据库(如 PostgreSQL 或 FoundationDB)以防协调器崩溃。为简化演示,我们先用一个并发安全的内存状态 TVar (Software Transactional Memory) 来存储。

-- file: src/Coordinator.hs
module Coordinator where

import Control.Concurrent.STM
import qualified Data.Map.Strict as Map
import Data.UUID.V4 (nextRandom)
import Types

-- 使用 STM TVar 保证并发访问的原子性和一致性
type TransactionStore = TVar (Map.Map TransactionId Transaction)

-- 创建一个新的事务存储
newTransactionStore :: IO TransactionStore
newTransactionStore = newTVarIO Map.empty

-- 初始化一个新事务
-- 这是一个关键的入口点,当客户端请求发起事务时调用
initTransaction :: TransactionStore -> IO Transaction
initTransaction store = do
  uuid <- nextRandom
  let tid = TransactionId uuid
      -- 初始状态:Preparing,没有任何投票
      newTx = Transaction
        { tId = tid
        , tState = Preparing
        , tVotes = Map.empty
        }
  -- 原子地将新事务写入存储
  atomically $ modifyTVar' store (Map.insert tid newTx)
  -- 返回创建的事务,其 ID 将被发送给客户端
  return newTx

WebSocket 通信协议与实现

客户端与协调器之间的通信协议必须被精确定义。我们使用 JSON 格式,并通过 type 字段区分消息类型。

// 定义客户端与服务端之间的消息契约
// Client -> Server Messages
interface InitiateTxMsg {
  type: 'INITIATE_TRANSACTION';
  payload: { fileContent: string; metadata: object; };
}

// Server -> Client Messages
interface TxStateChangeMsg {
  type: 'TRANSACTION_STATE_CHANGE';
  payload: {
    transactionId: string;
    newState: 'PREPARING' | 'PREPARED' | 'COMMITTING' | 'ABORTING' | 'COMMITTED' | 'ABORTED';
  };
}

interface TxErrorMsg {
  type: 'TRANSACTION_ERROR';
  payload: {
    transactionId: string;
    error: string;
  };
}

在 Haskell 端,我们需要为这些消息创建对应的 ADT (Algebraic Data Type),并提供 Aeson 的 ToJSONFromJSON 实例。

-- file: src/Protocol.hs
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}

module Protocol where

import Data.Aeson
import Data.Text (Text)
import GHC.Generics (Generic)
import Types (TransactionId, TransactionState)

-- C2S: Client to Server
data ClientMessage
  = InitiateTransaction
      { fileContent :: Text
      , metadata :: Value -- 使用 Aeson Value 来表示任意 JSON 对象
      }
  deriving (Show, Generic)

instance FromJSON ClientMessage where
  parseJSON = withObject "ClientMessage" $ \v -> do
    tag <- v .: "type"
    case (tag :: Text) of
      "INITIATE_TRANSACTION" -> do
        payload <- v .: "payload"
        InitiateTransaction
          <$> payload .: "fileContent"
          <*> payload .: "metadata"
      _ -> fail "Unknown client message type"

-- S2C: Server to Client
data ServerMessage
  = TransactionStateChange
      { transactionId :: TransactionId
      , newState :: TransactionState
      }
  | TransactionError
      { transactionId :: TransactionId
      , error :: Text
      }
  deriving (Show, Generic)

instance ToJSON ServerMessage where
  toJSON (TransactionStateChange tid state) =
    object [ "type" .= ("TRANSACTION_STATE_CHANGE" :: Text)
           , "payload" .= object ["transactionId" .= tid, "newState" .= state]
           ]
  toJSON (TransactionError tid err) =
    object [ "type" .= ("TRANSACTION_ERROR" :: Text)
           , "payload" .= object ["transactionId" .= tid, "error" .= err]
           ]

WebSocket 服务器的逻辑是整个系统的神经中枢。我们使用 websockets 库来处理连接。

-- file: app/Main.hs
import Control.Concurrent (forkIO)
import Control.Exception (finally)
import Control.Monad (forever)
import qualified Network.WebSockets as WS
import qualified Data.Text as T
import Data.Aeson (decode, encode)

-- ... imports for Coordinator, Types, Protocol

-- 应用程序状态,包含事务存储和与其他服务的连接池等
data AppState = AppState { txStore :: TransactionStore }

main :: IO ()
main = do
  state <- AppState <$> newTransactionStore
  putStrLn "Starting WebSocket server on port 9160"
  WS.runServer "127.0.0.1" 9160 $ application state

-- 当一个新客户端连接时,'application' 会被调用
application :: AppState -> WS.ServerApp
application state pending = do
  conn <- WS.acceptRequest pending
  -- 为每个连接设置一个保活 ping
  WS.withPingThread conn 30 (return ()) $
    -- 主消息循环
    finally (talk conn state) (putStrLn "Client disconnected")

-- 处理单个客户端的消息
talk :: WS.Connection -> AppState -> IO ()
talk conn state = forever $ do
  msgBytes <- WS.receiveData conn
  case decode msgBytes :: Maybe ClientMessage of
    Just (InitiateTransaction content meta) -> do
      -- 当收到事务初始化请求时,启动 2PC 流程
      -- forkIO 确保事务处理不会阻塞主消息接收循环
      _ <- forkIO $ runTransaction conn (txStore state) content meta
      return ()
    Nothing ->
      putStrLn $ "Received invalid message: " ++ show msgBytes

-- 完整的 2PC 流程,这是核心业务逻辑
runTransaction :: WS.Connection -> TransactionStore -> Text -> Value -> IO ()
runTransaction conn store content meta = do
  -- 1. 初始化事务
  tx <- initTransaction store
  let tid = tId tx
  
  -- 推送初始状态
  WS.sendTextData conn $ encode $ TransactionStateChange tid Preparing

  -- 2. PREPARE 阶段
  -- 在真实项目中,这里会是 RPC 调用 S3 和 DB 服务
  -- 为了演示,我们模拟这个过程
  putStrLn $ "[TID: " ++ show tid ++ "] Entering PREPARE phase."
  s3Vote <- mockParticipantCall S3Service "prepare"
  dbVote <- mockParticipantCall DBService "prepare"

  -- 记录投票结果
  let allVotes = Map.fromList [(S3Service, s3Vote), (DBService, dbVote)]
  atomically $ modifyTVar' store (Map.adjust (\t -> t { tVotes = allVotes }) tid)
  
  let decision = if all (== VoteCommit) allVotes
                   then Commit
                   else Abort
  
  -- 3. DECISION 阶段
  case decision of
    Commit -> do
      -- 通知客户端已准备好,等待提交
      atomically $ modifyTVar' store (Map.adjust (\t -> t { tState = Prepared }) tid)
      WS.sendTextData conn $ encode $ TransactionStateChange tid Prepared

      putStrLn $ "[TID: " ++ show tid ++ "] All participants voted COMMIT. Committing..."
      atomically $ modifyTVar' store (Map.adjust (\t -> t { tState = Committing }) tid)
      WS.sendTextData conn $ encode $ TransactionStateChange tid Committing
      
      -- 发送 COMMIT 指令
      _ <- mockParticipantCall S3Service "commit"
      _ <- mockParticipantCall DBService "commit"

      atomically $ modifyTVar' store (Map.adjust (\t -> t { tState = Committed }) tid)
      WS.sendTextData conn $ encode $ TransactionStateChange tid Committed
      putStrLn $ "[TID: " ++ show tid ++ "] Transaction COMMITTED."

    Abort -> do
      putStrLn $ "[TID: " ++ show tid ++ "] At least one participant voted ABORT. Aborting..."
      atomically $ modifyTVar' store (Map.adjust (\t -> t { tState = Aborting }) tid)
      WS.sendTextData conn $ encode $ TransactionStateChange tid Aborting
      
      -- 发送 ABORT 指令给那些可能投了 COMMIT 的服务
      _ <- mockParticipantCall S3Service "abort"
      _ <- mockParticipantCall DBService "abort"
      
      atomically $ modifyTVar' store (Map.adjust (\t -> t { tState = Aborted }) tid)
      WS.sendTextData conn $ encode $ TransactionStateChange tid Aborted
      putStrLn $ "[TID: " ++ show tid ++ "] Transaction ABORTED."

-- 模拟对参与者服务的调用,随机成功或失败
mockParticipantCall :: Participant -> String -> IO Vote
mockParticipantCall p action = do
  putStrLn $ "Calling " ++ show p ++ " to " ++ action
  threadDelay (500 * 1000) -- 模拟网络延迟
  -- 模拟 prepare 阶段有 10% 的失败率
  shouldFail <- if action == "prepare" 
                  then (< 0.1) <$> getStdRandom random 
                  else return False
  if shouldFail
    then do
      putStrLn $ "-> " ++ show p ++ " call FAILED."
      return VoteAbort
    else do
      putStrLn $ "-> " ++ show p ++ " call SUCCEEDED."
      return VoteCommit

前端 XState 状态机

前端的复杂度在于,它必须精确地响应并展示后端推送的每一个状态。任何UI状态的错乱都会严重误导用户。XState 在这里的作用无可替代。

首先,我们用 Mermaid 图来可视化这个状态机,这对于理解和沟通非常有帮助。

stateDiagram-v2
    [*] --> idle

    idle --> initiating: INITIATE
    initiating --> preparing: TX_STATE_CHANGE (PREPARING)
    initiating --> failed: ERROR

    preparing --> prepared: TX_STATE_CHANGE (PREPARED)
    preparing --> aborting: TX_STATE_CHANGE (ABORTING)
    preparing --> failed: ERROR

    prepared --> committing: TX_STATE_CHANGE (COMMITTING)
    prepared --> failed: ERROR

    committing --> committed: TX_STATE_CHANGE (COMMITTED)
    committing --> failed: ERROR

    aborting --> aborted: TX_STATE_CHANGE (ABORTED)
    aborting --> failed: ERROR

    committed --> idle: RESET
    aborted --> idle: RESET
    failed --> idle: RESET

接下来是 XState 的 TypeScript/JavaScript 实现。

// file: src/transactionMachine.ts
import { createMachine, assign } from 'xstate';

// 定义状态机的上下文,用于存储事务ID和错误信息
interface TxContext {
  transactionId: string | null;
  error: string | null;
}

// 定义状态机可以接收的事件
type TxEvent =
  | { type: 'INITIATE'; payload: { fileContent: string; metadata: object } }
  | { type: 'TX_STATE_CHANGE'; newState: string; transactionId: string }
  | { type: 'ERROR'; error: string }
  | { type: 'RESET' };

export const transactionMachine = createMachine<TxContext, TxEvent>({
  id: 'distributedTransaction',
  initial: 'idle',
  context: {
    transactionId: null,
    error: null,
  },
  states: {
    idle: {
      on: {
        INITIATE: 'initiating',
      },
      // 进入 idle 状态时清空上下文
      entry: assign({ transactionId: null, error: null }),
    },
    initiating: {
      // 这里的 'invoke' 可以用来封装 WebSocket 的发送逻辑
      // 为简化,我们假设发送逻辑在组件层面处理
      on: {
        TX_STATE_CHANGE: [
          {
            target: 'preparing',
            cond: (_, event) => event.newState === 'Preparing',
            actions: assign({ transactionId: (_, event) => event.transactionId }),
          },
        ],
        ERROR: {
          target: 'failed',
          actions: assign({ error: (_, event) => event.error }),
        },
      },
    },
    preparing: {
      on: {
        TX_STATE_CHANGE: [
          { target: 'prepared', cond: (_, event) => event.newState === 'Prepared' },
          { target: 'aborting', cond: (_, event) => event.newState === 'Aborting' },
        ],
        ERROR: {
          target: 'failed',
          actions: assign({ error: (_, event) => event.error }),
        },
      },
    },
    prepared: {
      on: {
        TX_STATE_CHANGE: [
          { target: 'committing', cond: (_, event) => event.newState === 'Committing' },
        ],
        ERROR: {
          target: 'failed',
          actions: assign({ error: (_, event) => event.error }),
        },
      },
    },
    committing: {
      on: {
        TX_STATE_CHANGE: [
          { target: 'committed', cond: (_, event) => event.newState === 'Committed' },
        ],
        ERROR: {
          target: 'failed',
          actions: assign({ error: (_, event) => event.error }),
        },
      },
    },
    aborting: {
      on: {
        TX_STATE_CHANGE: [
          { target: 'aborted', cond: (_, event) => event.newState === 'Aborted' },
        ],
        ERROR: {
          target: 'failed',
          actions: assign({ error: (_, event) => event.error }),
        },
      },
    },
    committed: {
      // 成功状态,可以停留一段时间显示成功信息
      on: { RESET: 'idle' },
      after: {
        5000: { target: 'idle' } // 5秒后自动重置
      }
    },
    aborted: {
      on: { RESET: 'idle' },
      after: {
        5000: { target: 'idle' }
      }
    },
    failed: {
      on: { RESET: 'idle' },
    },
  },
});

在 React 组件中集成这个状态机。

// file: src/components/Uploader.tsx
import React, { useEffect, useRef } from 'react';
import { useMachine } from '@xstate/react';
import { transactionMachine } from '../transactionMachine';

const WEBSOCKET_URL = 'ws://127.0.0.1:9160';

export const Uploader = () => {
  const [state, send] = useMachine(transactionMachine);
  const socketRef = useRef<WebSocket | null>(null);

  useEffect(() => {
    // 组件挂载时建立 WebSocket 连接
    const socket = new WebSocket(WEBSOCKET_URL);
    socketRef.current = socket;

    socket.onopen = () => console.log('WebSocket Connected');
    socket.onclose = () => console.log('WebSocket Disconnected');
    socket.onerror = (err) => {
      console.error('WebSocket Error:', err);
      send({ type: 'ERROR', error: 'Connection failed' });
    };

    socket.onmessage = (event) => {
      try {
        const serverMsg = JSON.parse(event.data);
        if (serverMsg.type === 'TRANSACTION_STATE_CHANGE') {
          send({
            type: 'TX_STATE_CHANGE',
            newState: serverMsg.payload.newState,
            transactionId: serverMsg.payload.transactionId,
          });
        } else if (serverMsg.type === 'TRANSACTION_ERROR') {
          send({ type: 'ERROR', error: serverMsg.payload.error });
        }
      } catch (e) {
        console.error('Failed to parse server message:', e);
      }
    };

    return () => {
      socket.close(); // 组件卸载时关闭连接
    };
  }, [send]);

  const handleInitiate = () => {
    if (socketRef.current?.readyState === WebSocket.OPEN) {
      const msg = {
        type: 'INITIATE_TRANSACTION',
        payload: {
          fileContent: 'large-file-content-string',
          metadata: { name: 'document.pdf', author: 'user123' },
        },
      };
      socketRef.current.send(JSON.stringify(msg));
      send({ type: 'INITIATE', payload: msg.payload });
    } else {
      alert('WebSocket is not connected.');
    }
  };

  return (
    <div>
      <h2>Distributed Uploader</h2>
      <p>Current State: <strong>{state.value}</strong></p>
      {state.context.transactionId && <p>Transaction ID: {state.context.transactionId}</p>}
      
      {state.matches('idle') && <button onClick={handleInitiate}>Upload File</button>}
      
      {state.matches('failed') && (
        <div>
          <p style={{ color: 'red' }}>Error: {state.context.error}</p>
          <button onClick={() => send('RESET')}>Try Again</button>
        </div>
      )}

      {state.matches('committed') && <p style={{ color: 'green' }}>Upload successful!</p>}
      {state.matches('aborted') && <p style={{ color: 'orange' }}>Upload was cancelled and rolled back.</p>}
    </div>
  );
};

控制平面 RESTful API

最后,为了运维和排错,我们需要一个 RESTful API 来查询事务的最终状态。使用 servantscotty 可以轻松实现。这里用 scotty 举例。

-- in app/Main.hs, add scotty logic
import Web.Scotty
import Data.Text.Lazy (fromStrict)
import Data.UUID (fromText)

-- ...

main :: IO ()
main = do
  state <- AppState <$> newTransactionStore
  -- 并行运行 WebSocket 和 REST API
  forkIO $ WS.runServer "127.0.0.1" 9160 $ application state
  
  putStrLn "Starting REST API server on port 3000"
  scotty 3000 $ do
    get "/transactions/:tid" $ do
      tidText <- param "tid"
      case fromText (fromStrict tidText) of
        Nothing -> do
          status status400
          json ("Invalid TID format" :: Text)
        Just uuid -> do
          let tid = TransactionId uuid
          txMap <- liftIO $ readTVarIO (txStore state)
          case Map.lookup tid txMap of
            Nothing -> do
              status status404
              json ("Transaction not found" :: Text)
            Just tx -> json tx

现在,我们可以通过 GET http://localhost:3000/transactions/some-uuid 来查询任何一个事务的详细信息,这对于调试和审计至关重要。

局限与展望

这套实现方案解决了最初的原子性问题,但它远非完美。2PC 的同步阻塞特性是其固有的阿喀琉斯之踵。在 PREPARE 阶段,如果任何一个参与者无响应,所有资源都会被锁定,直到协调器超时。这在参与者数量多、网络不稳定的情况下会成为严重的性能瓶颈。

此外,协调器本身是单点故障。尽管我们用了健壮的 Haskell,但如果承载它的机器宕机,所有进行中的事务都会陷入不确定状态。生产环境的协调器必须是集群化的、高可用的,并且其状态必须持久化到支持事务的数据库中(如PostgreSQL),通过 Raft 或 Paxos 协议保证集群一致性。

未来的优化路径可能包括:

  1. 实现参与者端的超时和恢复日志: 参与者在 PREPARED 状态后应将事务信息写入本地日志。当协调器恢复后,可以通过查询参与者的日志来决定事务的最终走向。
  2. 协调器集群化: 使用 Raft 协议在多个协调器实例间同步事务状态,实现高可用。
  3. 协议替换探索: 对于可以容忍最终一致性的场景,切换到基于消息队列的 Saga 模式,能极大地提高系统的吞吐量和可用性。

尽管有这些局限,但通过 Haskell 的严谨性、WebSocket 的实时性以及 XState 的确定性,我们构建了一个在特定场景下高度可靠且用户体验透明的分布式事务解决方案。它清晰地展示了如何将多种看似不相关的技术栈有机地结合起来,以应对一个复杂的工程挑战。


  目录