上一篇文章介绍了 CloudFlare 开源的网络框架 Pingora, 此篇文章我们使用该框架从零到一实作一个应用网关. 目前大规模应用的网关主要分为三大类, 一类是以传统 C 语言开发的 Nginx 为基础并通过 Lua 进行扩展的网关, 诸如 OpenResty, Kong 等便是如此; 一类则在诞生之初便完全为云原生提供解决方案的代理, 诸如 Envoy, Linkerd2Cilium (严格说来 Cilium 划分在网关代理类并不合适); 最后一类便是和应用深度绑定的应用网关, 如 Spring Cloud Gateway. 从今天的视角看 Spring Cloud 微服务的模式与云原生存在较大的竞合, 同时 微服务, 注册中心 等概念被 Kubernetes 的不同 Service 通过类 DNS 模式一一种优雅的方式解决, 这也导致了 Spring Cloud Gateway 模块在 Kubernetes 集群内尴尬的境地.

开发中经常面对一种场景是微服务部署到 Kubernetes 内本地无法调试的问题和多应用集群内应用间相互调用的问题. 我们这里通过 Pingora 构建的应用网关主要便是为解决此特定场景下请求转发与应用鉴权的工作, 同时在此基础上可做进一步扩展.

首先声明一个 Gateway 结构体作为网关的主体, 后续有需要时可通过扩展该结构体以绑定资源, 实现一些高级功能.

#[derive(Debug, Default)]
pub struct Gateway {
}

在定义完 Gateway 后, 考虑到 TCP 连接特性我们再定义一个 Host 元组结构体以反映不同服务对应的 IPPort:

#[derive(Debug, Default)]
pub struct Host(String, u32);

同时应用网关一个核心职责便是路由映射, 即将 URL A 映射到 B 上, 这里的 B 不一定是原服务也不一定是本集群内的服务.

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct URL {
    pub service_name: String,
    pub origin_url: String,
    pub mapping_url: String,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Auth {
    #[serde(default)]
    pub client_id: String,
    #[serde(default)]
    pub client_secret: String,
    #[serde(default)]
    pub client_credentials: String,
    #[serde(default)]
    pub urls: Vec<URL>,
}

这里分别定义了 URLAuth 结构体, 其中 URL 主要功能是完成 URI 的一对一转换, Auth 主要是为了在应用网关上进行细粒度的权限控制.

另外在上一篇文章中已经介绍了 Pingora 提供的代理能力主要通过 ProxyHttp 这个 Trait 提供, 具体的 ProxyHttp 包含了多个方法以在 HTTP 请求不同阶段进行拦截以便达到 短路请求 , 获取节点, 修改请求正文(修改请求正文头) 和 修改响应正文(修改响应正文头) 等操作. 详细的处理流程可参考 Pingora Phase Chart

我们首先为 Gateway 实现 new_ctx 生成一个 Context 以便在后续报文交互过程中获取上下文环境:

    type CTX = Arc<Mutex<HashMap<String, Auth>>>;

    fn new_ctx(&self) -> Self::CTX {
        APPS.clone()
    }

此处 new_ctx 方法主要是返回一个通过 lazy_static 声明的 Arc 单例以便在不同线程间可进行状态共享, 更一般地说此处的 APPS 主要目的在于根据 ServiceNameURL 进行精细化地权限控制. APPS 具体的定义为:

lazy_static! {
    pub static ref APPS: Arc<Mutex<HashMap<String, Auth>>> = {
        let mut m: HashMap<String, Auth> = HashMap::new();
        Arc::new(Mutex::new(m))
    };
}

除上下文环境外网关的首要步骤便是获取下游服务节点, 根据前文引用的流程图可以知道此时可通过 upstream_peer 方法来实现(CloudFlare 使用的场景中任务节点是 上游, 而在本文构建的应用网关场景下则认为称作 下游, 所服务对象不同称呼不同而已, 不脱离具体语境下不混淆即可):

async fn upstream_peer(
    &self,
    session: &mut Session,
    ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>>;

upstream_peer 会返回一个 Result 类型, 如果获取服务失败或是其他原因通过 Err 返回即可.

为简化此处应用网关逻辑首先需要约定一个关于路由的共识, 假设所有经过 GatewayHTTP 请求均需满足 /service-a[:port]/service-a/x/y/z 形式. 其中第一个 service-a[:port] 中的 service-a 为网关需转发的 Service, 也即 Kubernetes 中的不同服务, 而 [:port] 为可选的端口参数, 已解决采用 ClusterIP 类型的服务使用非 80 端口; 第二个 service-a 为对应 Service 下接收 HTTP 请求的公共前缀, 以便于不同网络请求进行区分. 如果是 Spring Boot 应用可通过 server.servlet.context-path 进行统一配置; /x/y/z 为可选的应用实际响应请求的路径.

upstream_peer 方法入参包含一个 可变Session 对象和 CTX 对象, 因此可通过 Session 首先获得请求头 和 URI:

let header  = session.req_header();
let mut uri = header.uri.clone();

由于上面的 req_header() 获取到的是 RequestHeader 对象的引用, 而 RequestHeaderhttp::request::Parts 类型实现了 Deref, 因此可直接通过上面的 header 对象获取 HTTP Method, 这样便可直接根据 GET, POST, PUT 以及其他的 HTTP 方法进行进一步处理.

接下来便需要将 ServiceName 转换为 IP 形式以便后续通过 `` 建立 TCP 连接:

let service_names = uris[1].split(":").collect::<Vec<_>>();
let service_name = if service_names.len() == 2 {
    Host(service_names[0].to_string(), service_names[1].parse::<u32>().unwrap())
} else if service_names.len() == 1{
    Host(service_names[0].to_string(), 80)
} else {
    // TODO:
    panic!("service_names is valid");
};

let ips = lookup_host(&service_name.0).unwrap();
let ips = ips.iter().filter(|&&ip| ip.is_ipv4()).last().unwrap();

这里在倒数第二行的地方通过调用 lookup_host 进行 DNS 解析以便获得对应 Service 的真实 ClusterIP. 有一个常识就是 Kubernetes 通过采用 DNS 域名解析的形式进行服务发现与路由. 另外值得一提的时这里虽然定义了两次 ips 变量, 但是 Rust 采用了 同名覆盖 语法因此这里不会有问题.

在解析完 Service 另一个需要进行的操作就是修改原 HTTP 请求的 URL 为真实的 URL:

let mut uri_builder = Uri::builder();
if let Some(a) = uri.authority() {
    uri_builder = uri_builder.authority(a.to_string());
}
if let Some(&ref a) = uri.scheme() {
    uri_builder = uri_builder.scheme(a.to_string().as_str());
}
if let Some(a) = uri.path_and_query() {
    let s = a.to_string();
    let path_and_query = format!("/{}", s.split("/").collect::<Vec<_>>()[2..].join("/"));
    println!("path_and_query: {}", path_and_query);
    uri_builder = uri_builder.path_and_query(path_and_query);
}
let uri = uri_builder.build().unwrap();
session.req_header_mut().set_uri(uri);
let _ = session.req_header_mut().insert_header("Host", service_name.0.clone());

由于在此处进行修改 URL 所以原 HTTP 请求中附带的 AuthorityScheme 等信息需要保留, 同时为满足 RFC 7231 中关于 HTTP/1.1 的定义与要求, 这里 Host 请求头也需要针对此前转换后端 ServiceName 进行修改. 如果不修改的话 Host 中存储的信息仍然是指向 Gateway 网关的, 真实的 Service 有理由拒绝响应.

upstream_peer 方法的最后根据处理结果返回一个 Result 类型即可, 按照上述逻辑如无以为则返回一个 Ok(Box<HttpPeer>) 对象即可:

let peer = HttpPeer::new(peer, false, "".to_string());
Ok(Box::new(peer))

在网关进行请求转发的过程中进行权限校验和 Token 验证是必不可少的步骤, 依据 Pingora 中关于请求的处理流程可以在 upstream_request_filter 方法中进行处理, 该方法会返回一个 Result 对象, 如果是 Some(()) 的话则链路会继续向下流转否则则提前退出并向 HTTP 客户端返回具体的 Err 数据. 这里仅提供一个参考实现.

async fn upstream_request_filter(
    &self,
    _session: &mut Session,
    upstream_request: &mut RequestHeader,
    _ctx: &mut Self::CTX,
) -> Result<()> {
    let auth = _ctx.lock().unwrap();
    println!("Header: {:?}", upstream_request.headers);
    let authorization = upstream_request.headers.get("Authorization");
    match authorization {
        Some(authorization) => {
            let token = authorization.to_str().unwrap();
            let claims = verify(token);
            let state = match claims {
                Some(claims) => {
                    let a = auth.get::<String>(&claims.sub);
                    match a {
                        Some(v) => {
                            if v.urls.iter().any(|u| u.mapping_url.eq(upstream_request.uri.path())) {
                                Some(())
                            } else {
                                None
                            }
                        },
                        None => None,
                    }
                },
                None => None,
            };
            if let Some(_) = state {
                return Ok(());
            }
            Err(Box::new(Error {
                etype: ErrorType::InternalError,
                esource: ErrorSource::Internal,
                retry: RetryType::ReusedOnly,
                cause: None,
                context: None,
            }))
        },
        None => {
            Ok(())
        },
    }
}

上面 upstream_request_filter 方法的核心逻辑在于获取 HTTP 请求头中的 Authorization, 然后通过 verify 方法验证并解析 JWT 信息以便获得特定的信息. JWT 中存放了 Claims 信息 - 合法请求的话, 非法请求姑且不论, 而 Claims 结构体则包含了具体的 `` 等信息:

pub struct Claims {
    pub iss: String,            // Issuer of the JWT
    pub sub: String,            // Subject of the JWT (the user)
    pub exp: u64,               // Time after which the JWT expires
    pub nbf: u64,               // Time before which the JWT must not be accepted for processing
    pub iat: u64,               // Time at which the JWT was issued; can be used to determine age of the JWT
    pub jti: Option<String>,    // Unique identifier; can be used to prevent the JWT from being replayed (allows a token to be used only once)
}

在解析得到 client_id 后便可根据此前定义的 CTX 上下文对象进一步获取关于权限和 URL 的定义以进一步进行权限控制.

补充

上文中通过实现 new_ctx, upstream_peerupstream_request_filter 三个方法实现了一个初版的 Gateway, 但未涉及到关于权限信息的处理, 这里可通过订阅 Kafka 消息来实时更新 APPS 数据.

fn consumer_topic(brokers: &str, topics: &[&str], consumer_id: &str) {
    println!("KAFAK Consumer config: {brokers:?} {topics:?} {consumer_id:?}");
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", consumer_id)
        .create()
        .unwrap();
    consumer.subscribe(topics)
        .unwrap();
    for msg in consumer.iter() {
        match msg {
            Ok(m) => {
                let payload = match m.payload_view::<str>() {
                    None => None,
                    Some(Ok(v)) => Some(v),
                    Some(Err(err)) => None,
                };
                if payload.is_some() {
                    println!("Message: {:?}", payload.unwrap());
                    let auth = serde_json::from_str::<Auth>(&payload.unwrap());
                    match auth {
                        Ok(auth) => {
                            // TODO: partial update auth
                            println!("Auth: {:?}", auth);
                            let mut m = APPS.lock().unwrap();
                            println!("ClientId: {:?} Auth: {:?}", auth.client_id, m.get(&auth.client_id));
                            m.insert(auth.client_id.clone(), auth);
                        },
                        Err(err) => {
                            println!("Error: {:?}", err);
                        }
                    }
                }
            },
            Err(err) => {
            },
        }
    }
}

REF

  1. cloudflare/pingora
  2. RFC 7231 - Hypertext Transfer Protocol (HTTP/1.1): Semantics and Content