Skip to content

Commit

Permalink
Added support for client TLS authentication closes streamnative#197
Browse files Browse the repository at this point in the history
  • Loading branch information
afonsosribeiro committed Mar 16, 2023
1 parent bcfa020 commit 79c8b40
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 2 deletions.
39 changes: 39 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,25 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
self
}

/// add a certificate and private key to authenticate the client in TLS connections
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn with_identity(mut self, certificate: Vec<u8>, private_key: Vec<u8>) -> Self {
match &mut self.tls_options {
Some(tls) => {
tls.certificate = Some(certificate);
tls.private_key = Some(private_key);
}
None => {
self.tls_options = Some(TlsOptions {
certificate: Some(certificate),
private_key: Some(private_key),
..Default::default()
})
}
}
self
}

#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn with_allow_insecure_connection(mut self, allow: bool) -> Self {
match &mut self.tls_options {
Expand Down Expand Up @@ -549,6 +568,26 @@ impl<Exe: Executor> PulsarBuilder<Exe> {
Ok(self.with_certificate_chain(v))
}

/// add a custom certificate chain from a file to authenticate the server in TLS connections
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub fn with_identity_files<P: AsRef<std::path::Path>>(
self,
certificate_path: P,
private_key_path: P,
) -> Result<Self, std::io::Error> {
use std::io::Read;

let mut file = std::fs::File::open(certificate_path)?;
let mut certificate = vec![];
file.read_to_end(&mut certificate)?;

let mut file = std::fs::File::open(private_key_path)?;
let mut private_key = vec![];
file.read_to_end(&mut private_key)?;

Ok(self.with_identity(certificate, private_key))
}

/// creates the Pulsar client and connects it
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn build(self) -> Result<Pulsar<Exe>, Error> {
Expand Down
8 changes: 7 additions & 1 deletion src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::{
task::{Context, Poll},
Future, FutureExt, Sink, SinkExt, Stream, StreamExt,
};
use native_tls::Certificate;
use native_tls::{Certificate, Identity};
use proto::MessageIdData;
use rand::{seq::SliceRandom, thread_rng};
use url::Url;
Expand Down Expand Up @@ -721,6 +721,7 @@ impl<Exe: Executor> Connection<Exe> {
auth_data: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
proxy_to_broker_url: Option<String>,
certificate_chain: &[Certificate],
identity: &Option<Identity>,
allow_insecure_connection: bool,
tls_hostname_verification_enabled: bool,
connection_timeout: Duration,
Expand Down Expand Up @@ -779,6 +780,7 @@ impl<Exe: Executor> Connection<Exe> {
auth_data.clone(),
proxy_to_broker_url.clone(),
certificate_chain,
identity.clone(),
allow_insecure_connection,
tls_hostname_verification_enabled,
executor.clone(),
Expand Down Expand Up @@ -854,6 +856,7 @@ impl<Exe: Executor> Connection<Exe> {
auth: Option<Arc<Mutex<Box<dyn crate::authentication::Authentication>>>>,
proxy_to_broker_url: Option<String>,
certificate_chain: &[Certificate],
identity: Option<Identity>,
allow_insecure_connection: bool,
tls_hostname_verification_enabled: bool,
executor: Arc<Exe>,
Expand All @@ -869,6 +872,9 @@ impl<Exe: Executor> Connection<Exe> {
for certificate in certificate_chain {
builder.add_root_certificate(certificate.clone());
}
if let Some(identity) = identity {
builder.identity(identity);
}
builder.danger_accept_invalid_hostnames(
allow_insecure_connection && !tls_hostname_verification_enabled,
);
Expand Down
20 changes: 19 additions & 1 deletion src/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::{channel::oneshot, lock::Mutex};
use native_tls::Certificate;
use native_tls::{Certificate, Identity};
use rand::Rng;
use url::Url;

Expand Down Expand Up @@ -75,6 +75,12 @@ pub struct TlsOptions {
/// contains a list of PEM encoded certificates
pub certificate_chain: Option<Vec<u8>>,

/// PEM encoded X509 certificates
pub certificate: Option<Vec<u8>>,

/// is a PEM encoded PKCS #8 formatted private key for the leaf certificate
pub private_key: Option<Vec<u8>>,

/// allow insecure TLS connection if set to true
///
/// defaults to *false*
Expand All @@ -91,6 +97,8 @@ impl Default for TlsOptions {
fn default() -> Self {
Self {
certificate_chain: None,
certificate: None,
private_key: None,
allow_insecure_connection: false,
tls_hostname_verification_enabled: true,
}
Expand All @@ -117,6 +125,7 @@ pub struct ConnectionManager<Exe: Executor> {
pub(crate) operation_retry_options: OperationRetryOptions,
tls_options: TlsOptions,
certificate_chain: Vec<Certificate>,
identity: Option<Identity>,
}

impl<Exe: Executor> ConnectionManager<Exe> {
Expand Down Expand Up @@ -162,6 +171,13 @@ impl<Exe: Executor> ConnectionManager<Exe> {
}
};

let identity = match (tls_options.certificate.as_ref(), tls_options.private_key.as_ref()) {
(None, _) | (_, None) => None,
(Some(certificate), Some(privatekey)) => {
Some(native_tls::Identity::from_pkcs8(&certificate, &privatekey)?)
}
};

if let Some(auth) = auth.clone() {
auth.lock().await.initialize().await?;
}
Expand All @@ -175,6 +191,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
operation_retry_options,
tls_options,
certificate_chain,
identity,
};
let broker_address = BrokerAddress {
url: url.clone(),
Expand Down Expand Up @@ -292,6 +309,7 @@ impl<Exe: Executor> ConnectionManager<Exe> {
self.auth.clone(),
proxy_url.clone(),
&self.certificate_chain,
&self.identity,
self.tls_options.allow_insecure_connection,
self.tls_options.tls_hostname_verification_enabled,
self.connection_retry_options.connection_timeout,
Expand Down

0 comments on commit 79c8b40

Please sign in to comment.