通过 Pingora 构建应用网关
上一篇文章介绍了 CloudFlare
开源的网络框架 Pingora
, 此篇文章我们使用该框架从零到一实作一个应用网关. 目前大规模应用的网关主要分为三大类, 一类是以传统 C
语言开发的 Nginx
为基础并通过 Lua
进行扩展的网关, 诸如 OpenResty
, Kong
等便是如此; 一类则在诞生之初便完全为云原生提供解决方案的代理, 诸如 Envoy
, Linkerd2
和 Cilium
(严格说来 Cilium
划分在网关代理类并不合适); 最后一类便是和应用深度绑定的应用网关, 如 Spring Cloud Gateway
. 从今天的视角看 Spring Cloud
微服务的模式与云原生存在较大的竞合, 同时 微服务
, 注册中心
等概念被 Kubernetes
的不同 Service
通过类 DNS
模式一一种优雅的方式解决, 这也导致了 Spring Cloud Gateway
模块在 Kubernetes
集群内尴尬的境地.
开发中经常面对一种场景是微服务部署到 Kubernetes
内本地无法调试的问题和多应用集群内应用间相互调用的问题. 我们这里通过 Pingora
构建的应用网关主要便是为解决此特定场景下请求转发与应用鉴权的工作, 同时在此基础上可做进一步扩展.
首先声明一个 Gateway
结构体作为网关的主体, 后续有需要时可通过扩展该结构体以绑定资源, 实现一些高级功能.
#[derive(Debug, Default)]
pub struct Gateway {
}
在定义完 Gateway
后, 考虑到 TCP
连接特性我们再定义一个 Host
元组结构体以反映不同服务对应的 IP
和 Port
:
#[derive(Debug, Default)]
pub struct Host(String, u32);
同时应用网关一个核心职责便是路由映射, 即将 URL
A
映射到 B
上, 这里的 B
不一定是原服务也不一定是本集群内的服务.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct URL {
pub service_name: String,
pub origin_url: String,
pub mapping_url: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Auth {
#[serde(default)]
pub client_id: String,
#[serde(default)]
pub client_secret: String,
#[serde(default)]
pub client_credentials: String,
#[serde(default)]
pub urls: Vec<URL>,
}
这里分别定义了 URL
和 Auth
结构体, 其中 URL
主要功能是完成 URI
的一对一转换, Auth
主要是为了在应用网关上进行细粒度的权限控制.
另外在上一篇文章中已经介绍了 Pingora
提供的代理能力主要通过 ProxyHttp
这个 Trait
提供, 具体的 ProxyHttp
包含了多个方法以在 HTTP
请求不同阶段进行拦截以便达到 短路请求
, 获取节点
, 修改请求正文
(修改请求正文头
) 和 修改响应正文
(修改响应正文头
) 等操作. 详细的处理流程可参考 Pingora Phase Chart
我们首先为 Gateway
实现 new_ctx
生成一个 Context
以便在后续报文交互过程中获取上下文环境:
type CTX = Arc<Mutex<HashMap<String, Auth>>>;
fn new_ctx(&self) -> Self::CTX {
APPS.clone()
}
此处 new_ctx
方法主要是返回一个通过 lazy_static
声明的 Arc
单例以便在不同线程间可进行状态共享, 更一般地说此处的 APPS
主要目的在于根据 ServiceName
和 URL
进行精细化地权限控制. APPS
具体的定义为:
lazy_static! {
pub static ref APPS: Arc<Mutex<HashMap<String, Auth>>> = {
let mut m: HashMap<String, Auth> = HashMap::new();
Arc::new(Mutex::new(m))
};
}
除上下文环境外网关的首要步骤便是获取下游服务节点, 根据前文引用的流程图可以知道此时可通过 upstream_peer
方法来实现(CloudFlare
使用的场景中任务节点是 上游
, 而在本文构建的应用网关场景下则认为称作 下游
, 所服务对象不同称呼不同而已, 不脱离具体语境下不混淆即可):
async fn upstream_peer(
&self,
session: &mut Session,
ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>>;
upstream_peer
会返回一个 Result
类型, 如果获取服务失败或是其他原因通过 Err
返回即可.
为简化此处应用网关逻辑首先需要约定一个关于路由的共识, 假设所有经过 Gateway
的 HTTP
请求均需满足 /service-a[:port]/service-a/x/y/z
形式. 其中第一个 service-a[:port]
中的 service-a
为网关需转发的 Service
, 也即 Kubernetes
中的不同服务, 而 [:port]
为可选的端口参数, 已解决采用 ClusterIP
类型的服务使用非 80
端口; 第二个 service-a
为对应 Service
下接收 HTTP
请求的公共前缀, 以便于不同网络请求进行区分. 如果是 Spring Boot
应用可通过 server.servlet.context-path
进行统一配置; /x/y/z
为可选的应用实际响应请求的路径.
upstream_peer
方法入参包含一个 可变
的 Session
对象和 CTX
对象, 因此可通过 Session
首先获得请求头 和 URI
:
let header = session.req_header();
let mut uri = header.uri.clone();
由于上面的 req_header()
获取到的是 RequestHeader
对象的引用, 而 RequestHeader
为 http::request::Parts
类型实现了 Deref
, 因此可直接通过上面的 header
对象获取 HTTP
Method
, 这样便可直接根据 GET
, POST
, PUT
以及其他的 HTTP
方法进行进一步处理.
接下来便需要将 ServiceName
转换为 IP
形式以便后续通过 `` 建立 TCP
连接:
let service_names = uris[1].split(":").collect::<Vec<_>>();
let service_name = if service_names.len() == 2 {
Host(service_names[0].to_string(), service_names[1].parse::<u32>().unwrap())
} else if service_names.len() == 1{
Host(service_names[0].to_string(), 80)
} else {
// TODO:
panic!("service_names is valid");
};
let ips = lookup_host(&service_name.0).unwrap();
let ips = ips.iter().filter(|&&ip| ip.is_ipv4()).last().unwrap();
这里在倒数第二行的地方通过调用 lookup_host
进行 DNS
解析以便获得对应 Service
的真实 ClusterIP
. 有一个常识就是 Kubernetes
通过采用 DNS
域名解析的形式进行服务发现与路由. 另外值得一提的时这里虽然定义了两次 ips
变量, 但是 Rust
采用了 同名覆盖
语法因此这里不会有问题.
在解析完 Service
另一个需要进行的操作就是修改原 HTTP
请求的 URL
为真实的 URL
:
let mut uri_builder = Uri::builder();
if let Some(a) = uri.authority() {
uri_builder = uri_builder.authority(a.to_string());
}
if let Some(&ref a) = uri.scheme() {
uri_builder = uri_builder.scheme(a.to_string().as_str());
}
if let Some(a) = uri.path_and_query() {
let s = a.to_string();
let path_and_query = format!("/{}", s.split("/").collect::<Vec<_>>()[2..].join("/"));
println!("path_and_query: {}", path_and_query);
uri_builder = uri_builder.path_and_query(path_and_query);
}
let uri = uri_builder.build().unwrap();
session.req_header_mut().set_uri(uri);
let _ = session.req_header_mut().insert_header("Host", service_name.0.clone());
由于在此处进行修改 URL
所以原 HTTP
请求中附带的 Authority
和 Scheme
等信息需要保留, 同时为满足 RFC 7231
中关于 HTTP/1.1
的定义与要求, 这里 Host
请求头也需要针对此前转换后端 ServiceName
进行修改. 如果不修改的话 Host
中存储的信息仍然是指向 Gateway
网关的, 真实的 Service
有理由拒绝响应.
在 upstream_peer
方法的最后根据处理结果返回一个 Result
类型即可, 按照上述逻辑如无以为则返回一个 Ok(Box<HttpPeer>)
对象即可:
let peer = HttpPeer::new(peer, false, "".to_string());
Ok(Box::new(peer))
在网关进行请求转发的过程中进行权限校验和 Token
验证是必不可少的步骤, 依据 Pingora
中关于请求的处理流程可以在 upstream_request_filter
方法中进行处理, 该方法会返回一个 Result
对象, 如果是 Some(())
的话则链路会继续向下流转否则则提前退出并向 HTTP
客户端返回具体的 Err
数据. 这里仅提供一个参考实现.
async fn upstream_request_filter(
&self,
_session: &mut Session,
upstream_request: &mut RequestHeader,
_ctx: &mut Self::CTX,
) -> Result<()> {
let auth = _ctx.lock().unwrap();
println!("Header: {:?}", upstream_request.headers);
let authorization = upstream_request.headers.get("Authorization");
match authorization {
Some(authorization) => {
let token = authorization.to_str().unwrap();
let claims = verify(token);
let state = match claims {
Some(claims) => {
let a = auth.get::<String>(&claims.sub);
match a {
Some(v) => {
if v.urls.iter().any(|u| u.mapping_url.eq(upstream_request.uri.path())) {
Some(())
} else {
None
}
},
None => None,
}
},
None => None,
};
if let Some(_) = state {
return Ok(());
}
Err(Box::new(Error {
etype: ErrorType::InternalError,
esource: ErrorSource::Internal,
retry: RetryType::ReusedOnly,
cause: None,
context: None,
}))
},
None => {
Ok(())
},
}
}
上面 upstream_request_filter
方法的核心逻辑在于获取 HTTP
请求头中的 Authorization
, 然后通过 verify
方法验证并解析 JWT
信息以便获得特定的信息. JWT
中存放了 Claims
信息 - 合法请求的话, 非法请求姑且不论, 而 Claims
结构体则包含了具体的 `` 等信息:
pub struct Claims {
pub iss: String, // Issuer of the JWT
pub sub: String, // Subject of the JWT (the user)
pub exp: u64, // Time after which the JWT expires
pub nbf: u64, // Time before which the JWT must not be accepted for processing
pub iat: u64, // Time at which the JWT was issued; can be used to determine age of the JWT
pub jti: Option<String>, // Unique identifier; can be used to prevent the JWT from being replayed (allows a token to be used only once)
}
在解析得到 client_id
后便可根据此前定义的 CTX
上下文对象进一步获取关于权限和 URL
的定义以进一步进行权限控制.
补充⌗
上文中通过实现 new_ctx
, upstream_peer
和 upstream_request_filter
三个方法实现了一个初版的 Gateway
, 但未涉及到关于权限信息的处理, 这里可通过订阅 Kafka
消息来实时更新 APPS
数据.
fn consumer_topic(brokers: &str, topics: &[&str], consumer_id: &str) {
println!("KAFAK Consumer config: {brokers:?} {topics:?} {consumer_id:?}");
let consumer: BaseConsumer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", consumer_id)
.create()
.unwrap();
consumer.subscribe(topics)
.unwrap();
for msg in consumer.iter() {
match msg {
Ok(m) => {
let payload = match m.payload_view::<str>() {
None => None,
Some(Ok(v)) => Some(v),
Some(Err(err)) => None,
};
if payload.is_some() {
println!("Message: {:?}", payload.unwrap());
let auth = serde_json::from_str::<Auth>(&payload.unwrap());
match auth {
Ok(auth) => {
// TODO: partial update auth
println!("Auth: {:?}", auth);
let mut m = APPS.lock().unwrap();
println!("ClientId: {:?} Auth: {:?}", auth.client_id, m.get(&auth.client_id));
m.insert(auth.client_id.clone(), auth);
},
Err(err) => {
println!("Error: {:?}", err);
}
}
}
},
Err(err) => {
},
}
}
}