在构建一个分布式的多租户SaaS平台时,我们面临一个棘手的安全挑战:如何在一个由多个Phoenix节点组成的集群中,即时且可靠地处理OAuth 2.0 Refresh Token的撤销。当一个租户的用户通过其账户管理界面撤销了对某个第三方应用的授权时,这个动作可能落在集群中的任何一个节点上。然而,几乎在同一时刻,该第三方应用可能正在使用旧的Refresh Token,向我们集群中的另一个节点请求新的Access Token。
传统的解决方案,例如使用Redis的Pub/Sub机制广播撤销事件,存在明显的缺陷。消息传递是“即发即弃”的,不保证送达。在网络抖动或节点短暂失联的情况下,某些节点可能永远不会收到撤销通知,从而留下一个严重的安全窗口。另一种方法是每次刷新令牌时都去中心化数据库(如PostgreSQL)中查询一个“撤销列表”,但这会为每个令牌刷新操作增加一次数据库往返,在高并发场景下,这对性能和延迟是不可接受的。
问题的核心在于状态的一致性。我们需要一个机制,能保证一旦一个令牌被标记为“已撤销”,这个状态变更就能原子性地、线性化地同步到集群的所有节点。任何后续的、针对该令牌的请求,无论落在哪个节点上,都必须能看到这个最新的状态。这正是分布式一致性协议的用武之地。经过评估,我们决定不引入如etcd或Consul这样的外部协调服务来增加运维复杂性,而是利用Elixir/OTP生态的优势,在应用层内部实现一个基于Raft协议的分布式状态机,专门用于管理令牌撤销列表。
我们的技术选型最终落在Phoenix框架、Elixir语言以及horde库上。horde提供了一个强大的分布式进程注册表和监督工具集,其内部利用高效的CRDT和Gossip协议进行状态同步,并能集成Raft来保证关键操作的强一致性。我们将构建一个分布式的注册表,用它来维护一个全局一致的、已撤销的Refresh Token JTI(JWT ID)集合。
初始设定:环境与依赖
首先,我们在一个标准的Phoenix 1.7项目中引入horde。
mix.exs:
def deps do
[
# ... phoenix deps
{:horde, "~> 0.9.1"}
]
end
为了让Phoenix节点能够组成集群,我们还需要一个节点发现策略。在开发环境中,使用Horde.Cluster.Gossip配合静态节点列表是可行的。在生产环境中,通常会使用libcluster配合Kubernetes、DNS A记录或云厂商的API来自动发现节点。
config/dev.exs:
import Config
config :horde,
debug: true, # 在开发时开启,便于观察集群事件
cluster_name: "revocation_cluster"
config :libcluster,
topologies: [
revocation_cluster: [
strategy: Cluster.Strategy.Gossip,
config: [
# 在开发时,我们手动指定种子节点
# 假设我们启动两个节点:
# iex --sname [email protected] -S mix phx.server
# iex --sname [email protected] -S mix phx.server
hosts: [:"[email protected]", :"[email protected]"]
]
]
]
核心组件:分布式撤销列表注册表
我们将创建一个专门的GenServer,由Horde.Registry进行管理。这个注册表将作为我们全局统一的撤销令牌存储。它的状态就是一个简单的MapSet,键是JTI,值是撤销的时间戳。
lib/my_app/auth/revocation_registry.ex:
defmodule MyApp.Auth.RevocationRegistry do
use GenServer
require Logger
# JTI -> revocation_timestamp (unix)
@type state :: %{String.t() => integer()}
# --------------------------------
# Public API
# --------------------------------
@doc """
将JTI添加到分布式撤销列表。
这是一个同步的、具有强一致性保证的操作。
`Horde.Registry.put_meta/3` 在Raft模式下会确保操作被提交到Raft日志。
"""
def revoke(jti, ttl_seconds) do
expiry = DateTime.to_unix(DateTime.utc_now()) + ttl_seconds
key = jti_to_key(jti)
# 使用put_meta而不是注册进程,是因为我们只需要存储元数据。
# 这里的value可以是任意值,关键是key和meta。
:ok = Horde.Registry.put_meta(registry_name(), key, :revoked, %{expiry: expiry})
end
@doc """
检查JTI是否已被撤销。
这是一个本地读操作,速度极快。Horde保证了状态的最终一致性同步。
对于我们的场景,Raft提交后的状态会很快通过Gossip传播开。
"""
def is_revoked?(jti) do
key = jti_to_key(jti)
case Horde.Registry.get_meta(registry_name(), key) do
%{expiry: expiry} ->
# 检查是否过期,防止列表无限增长
if expiry > DateTime.to_unix(DateTime.utc_now()) do
true
else
# JTI已过期,可以视为未撤销(因为它也无法再使用)
# 也可以在这里触发一个异步的清理任务
false
end
nil ->
false
end
end
# --------------------------------
# GenServer Callbacks
# --------------------------------
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
end
@impl true
def init(:ok) do
Logger.info("RevocationRegistry starting...")
# 定期清理过期的JTI
:timer.send_interval(3_600_000, self(), :cleanup) # 每小时清理一次
{:ok, %{}}
end
@impl true
def handle_info(:cleanup, state) do
Logger.info("Running cleanup task for revoked JTIs...")
now = DateTime.to_unix(DateTime.utc_now())
# 清理任务只在leader节点执行,防止集群重复工作
if Horde.Leader.is_leader(supervisor_name()) do
revoked_jtis = Horde.Registry.members(registry_name())
for {key, _pid, %{expiry: expiry}} <- revoked_jtis do
if expiry <= now do
Logger.debug("Cleaning up expired JTI: #{key}")
Horde.Registry.delete(registry_name(), key)
end
end
end
{:noreply, state}
end
# --------------------------------
# Private Helpers
# --------------------------------
defp registry_name, do: __MODULE__
defp supervisor_name, do: MyApp.Auth.RevocationSupervisor
defp jti_to_key(jti), do: "jti:#{jti}"
end
这个模块的设计有几个关键点:
-
revoke/2: 使用Horde.Registry.put_meta/3。在配置为使用Raft后端时,这个调用会阻塞,直到状态变更被Raft集群的大多数节点确认。这为我们提供了写操作的线性化保证。 -
is_revoked?/1: 使用Horde.Registry.get_meta/2。这是一个本地读取操作,直接访问当前节点内存中的状态副本,因此延迟极低。Horde通过内部的delta-CRDTs和Gossip协议,确保Raft提交的状态变更被高效地传播到所有节点。 - 状态清理: 为了防止撤销列表无限增长,我们为每个JTI设置了过期时间。一个定时任务会由集群的领导者节点(由
Horde.Leader选举产生)执行,清理掉已经过期的记录。
监督树与集群启动
现在,我们需要将Horde的服务集成到我们应用的监督树中。这包括Horde.Registry和Horde.Supervisor。
lib/my_app/auth/revocation_supervisor.ex:
defmodule MyApp.Auth.RevocationSupervisor do
use Horde.Supervisor
def start_link(init_arg) do
Horde.Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end
@impl true
def init(init_arg) do
children = [
{Horde.Registry,
name: MyApp.Auth.RevocationRegistry,
# 使用Raft保证元数据写入的强一致性
# distribution_strategy: Horde.Distribution.Strategy.Raft,
# raft_storage_dir: "./_build/dev/raft_storage",
members: [
{MyApp.Auth.RevocationRegistry, name: :singleton_process}
]}
]
# 我们需要一个集群监督器
cluster_supervisor_opts = [
strategy: :one_for_one,
name: MyApp.ClusterSupervisor
]
Horde.Supervisor.init(children,
distribution_strategy: Horde.Distribution.Strategy.Raft,
cluster_supervisor_opts: cluster_supervisor_opts,
init_arg: init_arg
)
end
end
接着,将这个RevocationSupervisor添加到主应用的监督树中。
lib/my_app/application.ex:
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# Start the Ecto repository
MyApp.Repo,
# Start the Telemetry supervisor
MyAppWeb.Telemetry,
# Start the PubSub system
{Phoenix.PubSub, name: MyApp.PubSub},
# Start the Web server
MyAppWeb.Endpoint,
# Start our distributed revocation list supervisor
MyApp.Auth.RevocationSupervisor
]
# ...
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
# ...
end
注意: Horde.Distribution.Strategy.Raft 在较新版本中配置方式有所改变,上述init/1中的配置需要依据具体Horde版本进行调整。核心思想是为Horde.Registry指定Raft分发策略。
集成到OAuth 2.0流程
万事俱备,只欠东风。现在我们将这个分布式撤销服务集成到我们的OAuth 2.0授权服务器逻辑中。我们使用ueberauth和oauth2库来处理标准的OAuth流程。
1. 令牌刷新端点
在处理Refresh Token请求的控制器或Plug中,我们需要在验证完令牌签名和有效期之后,立即检查它是否已被撤销。
lib/my_app_web/controllers/oauth_controller.ex:
defmodule MyAppWeb.OAuthController do
use MyAppWeb, :controller
alias MyApp.Auth.RevocationRegistry
def refresh(conn, %{"refresh_token" => refresh_token}) do
case MyApp.Auth.Token.verify(refresh_token, :refresh) do
{:ok, claims} ->
# 核心检查点
if RevocationRegistry.is_revoked?(claims["jti"]) do
conn
|> put_status(:bad_request)
|> json(%{error: "invalid_grant", error_description: "Token has been revoked."})
else
# 令牌有效且未被撤销,签发新的Access Token
{access_token, _new_refresh_token} = MyApp.Auth.Token.generate_access_token(claims["sub"])
json(conn, %{
access_token: access_token,
token_type: "Bearer",
expires_in: 3600
# 可选:实现Refresh Token Rotation
# refresh_token: new_refresh_token
})
end
{:error, _reason} ->
conn
|> put_status(:bad_request)
|> json(%{error: "invalid_grant", error_description: "Invalid refresh token."})
end
end
end
2. 令牌撤销端点
我们需要提供一个符合RFC 7009规范的令牌撤销端点。当用户或客户端主动撤销令牌时,会调用此接口。
lib/my_app_web/controllers/oauth_controller.ex:
defmodule MyAppWeb.OAuthController do
# ... (previous code)
# Endpoint for POST /oauth/revoke
def revoke(conn, %{"token" => token, "token_type_hint" => "refresh_token"}) do
case MyApp.Auth.Token.verify(token, :refresh) do
{:ok, claims} ->
jti = claims["jti"]
# Refresh Token的有效期通常较长(数月或数年),我们设置一个合理的TTL
# 比如30天,确保即使令牌本身未过期,撤销记录也不会永久存在
ttl_seconds = 30 * 24 * 3600
:ok = RevocationRegistry.revoke(jti, ttl_seconds)
# RFC要求即使令牌无效,也返回200 OK
send_resp(conn, :ok, "")
{:error, _reason} ->
# 令牌无效,我们什么都不做,直接返回成功
send_resp(conn, :ok, "")
end
end
def revoke(conn, _params) do
# 其他情况或不带hint的情况,可能需要更复杂的逻辑来判断令牌类型
send_resp(conn, :ok, "")
end
end
现在,整个流程闭环了。当用户通过前端界面撤销授权时,前端会调用后端的业务API,该API再调用/oauth/revoke端点,将对应的Refresh Token JTI加入到分布式的、强一致的撤销列表中。
流程可视化与一致性分析
我们可以用Mermaid图来清晰地展示这个流程:
sequenceDiagram
participant User
participant Browser
participant API_Node_A
participant API_Node_B
participant Horde_Raft as Horde (Raft Quorum)
participant Third_Party_App as App
User->>Browser: 点击“撤销授权”
Browser->>API_Node_A: POST /api/applications/revoke_access
API_Node_A->>API_Node_A: 调用内部OAuth服务 revoke(refresh_token)
API_Node_A->>Horde_Raft: RevocationRegistry.revoke(JTI)
Note right of Horde_Raft: Raft协议开始工作
1. Leader接收请求
2. 写入本地Log
3. 复制Log到Followers
4. 等待多数派确认
Horde_Raft-->>API_Node_A: :ok (写入已提交)
Note right of Horde_Raft: 状态变更已持久化
并开始通过Gossip传播
API_Node_A-->>Browser: 授权已撤销
Browser-->>User: 显示成功消息
par 并发请求
App->>API_Node_B: POST /oauth/refresh (使用相同的refresh_token)
and
Note over API_Node_B: Horde通过Gossip
接收到JTI已撤销的状态更新
end
API_Node_B->>API_Node_B: RevocationRegistry.is_revoked?(JTI)
Note right of API_Node_B: 本地内存读取, 命中撤销列表
API_Node_B-->>App: HTTP 400 (invalid_grant)
这个架构的优势在于,revoke操作的强一致性保证了只要API_Node_A的调用返回成功,JTI就已经被安全地提交到了Raft集群的多数派中。任何后续的is_revoked?查询,即使由于Gossip的短暂延迟,在最坏情况下可能读取到旧状态,但这个窗口期极小。对于要求更严格的场景,可以让is_revoked?也执行一次Raft的线性化读(Quorum Read),但这会牺牲读操作的低延迟特性。在实践中,对于令牌撤销这种场景,写操作的强一致性加上读操作的快速最终一致性,已经能满足绝大多数安全要求。
局限性与未来迭代路径
当前方案并非没有权衡。首先,它将令牌撤销状态存储在应用集群的内存中。如果集群中的所有节点同时重启,且Raft日志没有持久化到磁盘(horde支持配置持久化存储),那么状态将会丢失。在生产环境中,必须为Raft配置持久化存储目录,并确保其有备份和恢复策略。
其次,这个方案的扩展性受限于Raft协议。Raft集群的性能会随着节点数量的增加而下降,通常建议集群规模保持在3、5或7个节点。对于超大规模的部署(数百个节点),可能需要将应用集群分片,每个分片维护自己的Raft集群,或者回归到使用专门的外部协调服务,如TiKV或FoundationDB。
最后,内存占用是需要监控的一个指标。虽然我们实现了TTL和定期清理,但在撤销操作异常频繁的时期,内存占用可能会瞬时增高。可以考虑引入更紧凑的数据结构,例如在集群领导者节点上维护一个布隆过滤器(Bloom Filter),并将其序列化后通过Raft进行分发。这会引入一定的假阳性率(一个未被撤销的令牌可能被误判为已撤销),但绝不会有假阴性,这在安全上是可接受的,可以通过业务逻辑的重试来弥补。