在讲完一些有的没的(?)之后,我们继续回到 Pingora 的主线。这一篇我们来看一看 Peer,也就是 pingora-core::upstreams 中的一些结构与实现。
ToC
从 Example 继续
回到 Getting Started 开头的 Example。这次我们的注意力放在 HttpPeer 上:
use async_trait::async_trait;use pingora::prelude::*;use std::sync::Arc;
pub struct LB(Arc<LoadBalancer<RoundRobin>>);
#[async_trait]impl ProxyHttp for LB {7 collapsed lines
/// For this small example, we don't need context storage type CTX = ();
fn new_ctx(&self) -> () { () }
async fn upstream_peer(&self, _session: &mut Session, _ctx: &mut ()) -> Result<Box<HttpPeer>> { let upstream = self .0 .select(b"", 256) // hash doesn't matter for round robin .unwrap();
println!("upstream peer is: {upstream:?}");
// Set SNI to one.one.one.one let peer = Box::new(HttpPeer::new(upstream, true, "one.one.one.one".to_string())); Ok(peer) }25 collapsed lines
async fn upstream_request_filter( &self, _session: &mut Session, upstream_request: &mut RequestHeader, _ctx: &mut Self::CTX, ) -> Result<()> { upstream_request .insert_header("Host", "one.one.one.one") .unwrap(); Ok(()) }}
fn main() { let mut my_server = Server::new(None).unwrap(); my_server.bootstrap();
let upstreams = LoadBalancer::try_from_iter(["1.1.1.1:443", "1.0.0.1:443"]).unwrap(); let mut lb = http_proxy_service(&my_server.configuration, LB(Arc::new(upstreams))); lb.add_tcp("0.0.0.0:6188");
my_server.add_service(lb); my_server.run_forever();}可以看到,在实现 ProxyHttp 的过程中,我们需要返回一个 HttpPeer 用于连接。HttpPeer,以及它实现了的 trait Peer,都是定义在 pingora-core 中的结构。让我们深入看看。
trait Peer
Peer 是 Connector 在连接时用于承载连接参数的结构。它的 Trait 定义如下:
/// [`Peer`] defines the interface to communicate with the [`crate::connectors`] regarding where to/// connect to and how to connect to it.pub trait Peer: Display + Clone { /// The remote address to connect to fn address(&self) -> &SocketAddr; /// If TLS should be used; fn tls(&self) -> bool; /// The SNI to send, if TLS is used fn sni(&self) -> &str; /// To decide whether a [`Peer`] can use the connection established by another [`Peer`]. /// /// The connection to two peers are considered reusable to each other if their reuse hashes are /// the same fn reuse_hash(&self) -> u64;103 collapsed lines
/// Get the proxy setting to connect to the remote server fn get_proxy(&self) -> Option<&Proxy> { None } /// Get the additional options to connect to the peer. /// /// See [`PeerOptions`] for more details fn get_peer_options(&self) -> Option<&PeerOptions> { None } /// Get the additional options for modification. fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> { None } /// Whether the TLS handshake should validate the cert of the server. fn verify_cert(&self) -> bool { match self.get_peer_options() { Some(opt) => opt.verify_cert, None => false, } } /// Whether the TLS handshake should verify that the server cert matches the SNI. fn verify_hostname(&self) -> bool { match self.get_peer_options() { Some(opt) => opt.verify_hostname, None => false, } } /// The alternative common name to use to verify the server cert. /// /// If the server cert doesn't match the SNI, this name will be used to /// verify the cert. fn alternative_cn(&self) -> Option<&String> { match self.get_peer_options() { Some(opt) => opt.alternative_cn.as_ref(), None => None, } } /// Which local source address this connection should be bind to. fn bind_to(&self) -> Option<&InetSocketAddr> { match self.get_peer_options() { Some(opt) => opt.bind_to.as_ref(), None => None, } } /// How long connect() call should be wait before it returns a timeout error. fn connection_timeout(&self) -> Option<Duration> { match self.get_peer_options() { Some(opt) => opt.connection_timeout, None => None, } } /// How long the overall connection establishment should take before a timeout error is returned. fn total_connection_timeout(&self) -> Option<Duration> { match self.get_peer_options() { Some(opt) => opt.total_connection_timeout, None => None, } } /// If the connection can be reused, how long the connection should wait to be reused before it /// shuts down. fn idle_timeout(&self) -> Option<Duration> { self.get_peer_options().and_then(|o| o.idle_timeout) }
/// Get the ALPN preference. fn get_alpn(&self) -> Option<&ALPN> { self.get_peer_options().map(|opt| &opt.alpn) }
/// Get the CA cert to use to validate the server cert. /// /// If not set, the default CAs will be used. fn get_ca(&self) -> Option<&Arc<Box<[X509]>>> { match self.get_peer_options() { Some(opt) => opt.ca.as_ref(), None => None, } }
/// Get the client cert and key for mutual TLS if any fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> { None }
/// The TCP keepalive setting that should be applied to this connection fn tcp_keepalive(&self) -> Option<&TcpKeepalive> { self.get_peer_options() .and_then(|o| o.tcp_keepalive.as_ref()) }
/// The interval H2 pings to send to the server if any fn h2_ping_interval(&self) -> Option<Duration> { self.get_peer_options().and_then(|o| o.h2_ping_interval) }
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool { self.address().check_fd_match(fd) }
fn get_tracer(&self) -> Option<Tracer> { None }}为了节省篇幅,这里将大量拥有默认实现的 trait Method 折叠了。有需要可以自行展开观看。从最核心的角度出发,Peer 必须实现的,也就是连接的过程中必须告知 Connector 的属性有:
| 方法 | 简介 |
|---|---|
address() | 连接的地址。 |
tls() | 是否要以 TLS 连接。 |
sni() | 当使用 TLS 时的 SNI。 |
reuse_hash() | 是否应该复用连接。当这个值相等时,则可以复用。 |
初次之外,还有一些有用的属性,包括:
get_proxy(): 是否使用代理连接verify_cert(): 是否检查证书verify_hostname: 是否检查Hostnameconnection_timeout(): 通过connect建立连接时的超时时间total_connection_timeout(): 连接的总超时时间matches_fd(): 判断某一FD是否和该连接匹配get_tracer(): 获取该连接对应的Tracer
Tracing 与 Tracer
在 trait Peer 中,定义了一种 Tracer 类型。这也是 Peer 用于追踪的手段。Tracer 需要实现 trace Tracing,负责在连接成功或失败时被调用。定义如下:
/// The interface to trace the connectionpub trait Tracing: Send + Sync + std::fmt::Debug { /// This method is called when successfully connected to a remote server fn on_connected(&self); /// This method is called when the connection is disconnected. fn on_disconnected(&self); /// A way to clone itself fn boxed_clone(&self) -> Box<dyn Tracing>;}
/// An object-safe version of Tracing object that can use Clone#[derive(Debug)]pub struct Tracer(pub Box<dyn Tracing>);BasicPeer
在了解完 trait Peer 的定义后,我们来看一个简单的 Peer 实现:BasicPeer。Pingora 在一些简单的场景中会用到 BasicPeer,比如 TcpHealthCheck 和单元测试。这个实现也比较粗糙,目前只支持建立到 address 的非 TLS 连接。源码非常简单,基本就是把 field 填了一下。如下所示:
/// A simple TCP or TLS peer without many complicated settings.#[derive(Debug, Clone)]pub struct BasicPeer { pub _address: SocketAddr, pub sni: String, pub options: PeerOptions,}
impl BasicPeer { /// Create a new [`BasicPeer`] pub fn new(address: &str) -> Self { BasicPeer { _address: SocketAddr::Inet(address.parse().unwrap()), // TODO: check error, add support // for UDS sni: "".to_string(), // TODO: add support for SNI options: PeerOptions::new(), } }}7 collapsed lines
impl Display for BasicPeer { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!(f, "{:?}", self) }}
impl Peer for BasicPeer { fn address(&self) -> &SocketAddr { &self._address }
fn tls(&self) -> bool { !self.sni.is_empty() }
fn bind_to(&self) -> Option<&InetSocketAddr> { None }
fn sni(&self) -> &str { &self.sni }
// TODO: change connection pool to accept u64 instead of String fn reuse_hash(&self) -> u64 { let mut hasher = AHasher::default(); self._address.hash(&mut hasher); hasher.finish() }
fn get_peer_options(&self) -> Option<&PeerOptions> { Some(&self.options) }}HttpPeer
看完了比较简单的,再来看看相对复杂一些的。HttpPeer,也就是我们在 Example 中用到的结构。相比 BasicPeer,它额外支持了 https、SNI、代理、客户端证书,还有诸如 UDS(Unix Domain Socket) 的使用。
/// A peer representing the remote HTTP server to connect to#[derive(Debug, Clone)]pub struct HttpPeer { pub _address: SocketAddr, pub scheme: Scheme, pub sni: String, pub proxy: Option<Proxy>, pub client_cert_key: Option<Arc<CertKey>>, pub options: PeerOptions,}
impl HttpPeer { // These methods are pretty ad-hoc pub fn is_tls(&self) -> bool { match self.scheme { Scheme::HTTP => false, Scheme::HTTPS => true, } }
fn new_from_sockaddr(address: SocketAddr, tls: bool, sni: String) -> Self { HttpPeer { _address: address, scheme: Scheme::from_tls_bool(tls), sni, proxy: None, client_cert_key: None, options: PeerOptions::new(), } }
/// Create a new [`HttpPeer`] with the given socket address and TLS settings. pub fn new<A: ToInetSocketAddrs>(address: A, tls: bool, sni: String) -> Self { let mut addrs_iter = address.to_socket_addrs().unwrap(); //TODO: handle error let addr = addrs_iter.next().unwrap(); Self::new_from_sockaddr(SocketAddr::Inet(addr), tls, sni) }
/// Create a new [`HttpPeer`] with the given path to Unix domain socket and TLS settings. pub fn new_uds(path: &str, tls: bool, sni: String) -> Self { let addr = SocketAddr::Unix(UnixSocketAddr::from_pathname(Path::new(path)).unwrap()); //TODO: handle error Self::new_from_sockaddr(addr, tls, sni) }
/// Create a new [`HttpPeer`] that uses a proxy to connect to the upstream IP and port /// combination. pub fn new_proxy( next_hop: &str, ip_addr: IpAddr, port: u16, tls: bool, sni: &str, headers: BTreeMap<String, Vec<u8>>, ) -> Self { HttpPeer { _address: SocketAddr::Inet(InetSocketAddr::new(ip_addr, port)), scheme: Scheme::from_tls_bool(tls), sni: sni.to_string(), proxy: Some(Proxy { next_hop: PathBuf::from(next_hop).into(), host: ip_addr.to_string(), port, headers, }), client_cert_key: None, options: PeerOptions::new(), } }
fn peer_hash(&self) -> u64 { let mut hasher = AHasher::default(); self.hash(&mut hasher); hasher.finish() }}32 collapsed lines
impl Hash for HttpPeer { fn hash<H: Hasher>(&self, state: &mut H) { self._address.hash(state); self.scheme.hash(state); self.proxy.hash(state); self.sni.hash(state); // client cert serial self.client_cert_key.hash(state); // origin server cert verification self.verify_cert().hash(state); self.verify_hostname().hash(state); self.alternative_cn().hash(state); }}
impl Display for HttpPeer { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { write!(f, "addr: {}, scheme: {},", self._address, self.scheme)?; if !self.sni.is_empty() { write!(f, "sni: {},", self.sni)?; } if let Some(p) = self.proxy.as_ref() { write!(f, "proxy: {p},")?; } if let Some(cert) = &self.client_cert_key { write!(f, "client cert: {},", cert)?; } Ok(()) }}
impl Peer for HttpPeer { fn address(&self) -> &SocketAddr { &self._address }
fn tls(&self) -> bool { self.is_tls() }
fn sni(&self) -> &str { &self.sni }
// TODO: change connection pool to accept u64 instead of String fn reuse_hash(&self) -> u64 { self.peer_hash() }
fn get_peer_options(&self) -> Option<&PeerOptions> { Some(&self.options) }
fn get_mut_peer_options(&mut self) -> Option<&mut PeerOptions> { Some(&mut self.options) }
fn get_proxy(&self) -> Option<&Proxy> { self.proxy.as_ref() }
fn matches_fd<V: AsRawFd>(&self, fd: V) -> bool { if let Some(proxy) = self.get_proxy() { proxy.next_hop.check_fd_match(fd) } else { self.address().check_fd_match(fd) } }
fn get_client_cert_key(&self) -> Option<&Arc<CertKey>> { self.client_cert_key.as_ref() }
fn get_tracer(&self) -> Option<Tracer> { self.options.tracer.clone() }}HttpPeer 支持三种构造形式:
[1]:new(),可以建立到某个SocketAddr的连接。[2]:new_uds(),可以通过Unix Domain Socket的路径建立连接。[3]:new_proxy(),则可以以一个http proxy作为跳板继续建立连接。
Proxy
HttpProxy 中使用的 Proxy 支持连接任意 host:port 或 UDS 路径,并可以指定额外的 HTTP Header,定义如下:
/// The proxy settings to connect to the remote server, CONNECT only for now#[derive(Debug, Hash, Clone)]pub struct Proxy { pub next_hop: Box<Path>, // for now this will be the path to the UDS pub host: String, // the proxied host. Could be either IP addr or hostname. pub port: u16, // the port to proxy to pub headers: BTreeMap<String, Vec<u8>>, // the additional headers to add to CONNECT}12 collapsed lines
impl Display for Proxy { fn fmt(&self, f: &mut Formatter) -> FmtResult { write!( f, "next_hop: {}, host: {}, port: {}", self.next_hop.display(), self.host, self.port ) }}