"async, wow, async async async"
3
Steps:
1. Create our socket
2. Bind it to an address
3. Put it in "server" mode (i.e. call listen on it)
4. Create an event queue
5. Add the server socket to the event queue
6. Wait for events in a loop
7. Application logic involving reading/writing/closing the socket
8. Remove socket from queue when done
9. Goto step 6
Creo que no hay nada que le guste mas a los ignorantes en software que hablar de async. Necesitamos una API async! Compañero, necesito que las llamadas sean asyncronas. Estamos migrando los servidores HTTP a async!
Voy a usar mio, que es una abstracción encima de epoll, kqueue, IOCP. La desgracia que es poll me la voy a saltar, bastante tenemos que sufrir con la porqueria de epoll, de lo peor que se ha escrito en al historia de la informática. Y de io_uring, de momento, mejor no hablemos.
La manera en que funciona una cola es muy simple. Simplificando para nuestro caso de uso, el kernel nos avisara mediante la cola cuando podamos leer o escribir sin bloquear. Imaginaros este caso:
- mi socket esta ocupado
- trato de leer del socket, como esta ocupado me bloqueo
Lo que vamos a hacer es:
- registrar el socket, avisame cuando pueda escribir y leer
- si el socket esta ocupado, mi codigo hace otras cosas
- la cola me avisa que el socekt ya esta listo, lo uso y no me bloqueo
Wealth, status, pride,
are their own ruin.
To do good, work well, and lie low
is the way of the blessing.
Para usar la cola, necesitamos definir un tamaño máximo como upper bound. Voy a elegir 1024 clientes. Cada uno de los clientes tendra un Token asignado de 0..1023. Y necesitamos además un Token para nuestro server, voy a elegir el 1025.
const MAX_CLIENTS: usize = 1024;
const LISTENER: Token = Token(MAX_CLIENTS + 1);
Asi todos nuestros sockets iran del 0, 1, 2, 3, 4... 1023
.
Cada vez que hagamos poll, la cola nos devolvera una lista de eventos que se podra escribir o leer.
Oye el socket 1, 3 y 6 puedes leer, el socket 2 puedes escribir.
Si no me llega ningúna notificaciones es que no puedo realizar operaciones sin bloquear.
Veamos como debemos inicializarlo con mio:
Tenemos:
- Un servidor, listener, asignado a nuestro puerto 8080
- Un Poll de mio, abstrae: epoll, kqueue, IOPC
- Una lista de eventos, necesario para estas estructuras de datos, MAX_CLIENTS + 1, que es el +1? Lo acabamos de ver, el servidor.
- Un Slab de Connection, (https://crates.io/crates/slab) un slab es una especie de array que nos sera muy util.
Que queremos? Seguiremos los pasos descritos arriba. El primero es que nuestro servidor acepte conexiónes.
Registro el listener como READABLE, cada vez que se pueda leer del listener nos llegara un evento. Esto NO significa que haya llegado una nueva conexión, no lo sabemos, lo que sabemos es que podremos leer de ese socket y la operación funcionara.
El codigo es muy simple, en un loop hacemos poll, este tiene un timeout que bloquea hasta que el kernel nos notifica que hay un evento. Como le hemos pasado toda la lista de eventos que estamos interesados, el kernel nos devolvera los que podemos leer y escribir, tanto servidor como clientes, como hemos visto en el ejemplo anterior.
Cada evento tiene un token que lo identifica, en este caso tenemos nuestro servidor LISTENER, que es el Token(1025) y token para los clientes que van de Token(0) a Token(1023) como hemos visto.
- Para cada evento de LISTENER vamos a aceptar una conexión y añadirla a nuestros eventos y connections.
- Para cada evento de un cliente, ya sea leer o escribir, haremos lo que toque.
Aqui es donde el codigo se vuelve loco.
El servidor es muy simple, para cada conexión, la añadimos en nuestro slab, y actualizamos nuestro registry como READABLE. Nos interesa leer de ese cliente.
uff que duro este codigo, y no esta ni al 50%.
hacemos exactamente lo que haciamos antes bloqueando, peor ahora tenemos que actualizar el poll y gestionar los errores.
- cliente lee
- si lee n == 0, es que no hay nada que hacer, limpia la conexion del registry y del slab
- si lee n > 0, es que hay algo que escribir, no sabemos si esto es parcial o no, solo sabemos que hemos leido algo, asique marcamos como WRITEABLE
- si se da el caso de que el event es WRITEABLE, escribimos
cuantos bugs tiene este codigo? infinitos. voy a resolver unos cuantos.
Funciona? Hombre, si escribimos a mano con un par de clientes funciona. Pero no esta bien.
En Rust usamos ?
cuando queremos devolver un error, estos errores... no se pueden devolver nunca asi como asi... hay que gestionarlos. En el codigo del cliente tenemos 4 ?, en el del server 2?...
Can you keep your soul in its body,
hold fast to the one,
and so learn to be whole?
Can you center your energy,
be soft, tender,
and so learn to be a baby?
Empecemos con el server:
- Primero que pasa si el
listener.accept()
falla? Lo voy a ignorar.
- Segundo que pasa si el
registry().register()
falla? Lo voy a re-intentar hasta que funcione.
Ahora vamos a seguir con el cliente:
- Primero que pasa si el
connection.read()
falla? Lo voy a ignorar.
- Que pasa si el
registry().deregister()
falla? Lo voy a re-intentar hasta que funcione.
- Segundo que pasa si el
connection.write()
falla? Lo voy a ignorar.
Esta esto bien?
spoiler#[cfg(test)]
mod tests {
use super::*;
use std::{
io::{Read, Write},
net::{SocketAddr, TcpStream},
thread::{self},
};
fn setup_server() -> Result<(Server, SocketAddr)> {
let server_addr = "127.0.0.1:0".parse::<SocketAddr>()?;
let server = Server::bind(server_addr)?;
let local_addr = server.listener.local_addr()?;
Ok((server, local_addr))
}
#[test]
fn test_server_single_message() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || {
server.run().unwrap();
});
let mut stream = TcpStream::connect(local_addr)?;
stream.set_nodelay(true)?;
let test_msg = b"Hello, Server!";
stream.write_all(test_msg)?;
stream.flush()?;
let mut buf = vec![0; test_msg.len()];
stream.read_exact(&mut buf)?;
assert_eq!(&buf, test_msg);
Ok(())
}
#[test]
fn test_server_multiple_writes() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || server.run().unwrap());
let mut stream = TcpStream::connect(local_addr)?;
stream.set_nodelay(true)?;
let test_msg1 = b"First";
let test_msg2 = b"Second";
stream.write_all(test_msg1)?;
stream.write_all(test_msg2)?;
stream.flush()?;
let expected = [&test_msg1[..], &test_msg2[..]].concat();
let mut buf = vec![0; expected.len()];
stream.read_exact(&mut buf)?;
assert_eq!(buf, expected);
Ok(())
}
#[test]
fn test_server_large_message() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || server.run().unwrap());
let mut stream = TcpStream::connect(local_addr)?;
stream.set_nodelay(true)?;
let test_msg = vec![b'X'; 1024];
stream.write_all(&test_msg)?;
stream.flush()?;
let mut buf = vec![0; test_msg.len()];
stream.read_exact(&mut buf)?;
assert_eq!(buf, test_msg);
Ok(())
}
#[test]
fn test_server_message_larger_than_buffer() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || server.run().unwrap());
let mut stream = TcpStream::connect(local_addr)?;
stream.set_nodelay(true)?;
let buffer_size = 1024;
let test_msg = vec![b'X'; buffer_size + 500];
stream.write_all(&test_msg)?;
stream.flush()?;
let mut buf = vec![0; buffer_size];
stream.read_exact(&mut buf)?;
assert_eq!(&buf[..], &test_msg[..buffer_size]);
Ok(())
}
#[test]
fn test_server_concurrent_clients() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || server.run().unwrap());
let mut handles = vec![];
let num_clients = 5;
for i in 0..num_clients {
let addr = local_addr;
handles.push(thread::spawn(move || {
let mut stream = TcpStream::connect(addr).unwrap();
stream.set_nodelay(true).unwrap();
let msg = format!("Client{}", i);
stream.write_all(msg.as_bytes()).unwrap();
stream.flush().unwrap();
let mut buf = vec![0; msg.len()];
stream.read_exact(&mut buf).unwrap();
assert_eq!(buf, msg.as_bytes());
}));
}
for handle in handles {
handle.join().unwrap();
}
Ok(())
}
#[test]
fn test_server_client_disconnect_reconnect() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || server.run().unwrap());
// First connection
let mut stream = TcpStream::connect(local_addr)?;
stream.write_all(b"hello")?;
stream.flush()?;
drop(stream); // Force disconnect
// Small delay to ensure disconnect is processed
thread::sleep(std::time::Duration::from_millis(10));
// Reconnect
let mut stream2 = TcpStream::connect(local_addr)?;
stream2.write_all(b"world")?;
stream2.flush()?;
let mut buf = vec![0; 5];
stream2.read_exact(&mut buf)?;
assert_eq!(&buf, b"world");
Ok(())
}
#[test]
fn test_server_zero_length_message() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || server.run().unwrap());
let mut stream = TcpStream::connect(local_addr)?;
stream.set_nodelay(true)?;
stream.write_all(b"")?;
stream.flush()?;
// Write a real message after empty one
let msg = b"test";
stream.write_all(msg)?;
stream.flush()?;
let mut buf = vec![0; msg.len()];
stream.read_exact(&mut buf)?;
assert_eq!(&buf, msg);
Ok(())
}
#[test]
fn test_server_rapid_connect_disconnect() -> Result<()> {
let (mut server, local_addr) = setup_server()?;
thread::spawn(move || server.run().unwrap());
for _ in 0..10 {
let stream = TcpStream::connect(local_addr)?;
drop(stream);
}
// Final connection to verify server still works
let mut stream = TcpStream::connect(local_addr)?;
stream.write_all(b"test")?;
stream.flush()?;
let mut buf = vec![0; 4];
stream.read_exact(&mut buf)?;
assert_eq!(&buf, b"test");
Ok(())
}
}
Lo suyo es escribir unos tests y bueno, todo funciona con estos, ya hemos testeado a parte que nuestra Connection tenga partial read y partial writes y funcione. El problema aqui esta en los edge cases asyncronos del kernel.
solucionuse std::net::SocketAddr;
use anyhow::Result;
use log::{debug, error};
mod connection;
use connection::Connection;
use mio::{
net::{TcpListener, TcpStream},
Events, Poll, Token,
};
use slab::Slab;
const MAX_CLIENTS: usize = 1024;
const LISTENER: Token = Token(MAX_CLIENTS + 1);
struct Server {
listener: TcpListener,
poll: Poll,
events: Events,
connections: Slab<Connection<TcpStream>>,
}
impl Server {
fn bind(addr: SocketAddr) -> Result<Self> {
let mut listener = TcpListener::bind(addr)?;
debug!("server listening on {:?}", listener.local_addr()?);
let poll = Poll::new()?;
let events = Events::with_capacity(MAX_CLIENTS + 1);
let connections = Slab::with_capacity(MAX_CLIENTS);
poll.registry()
.register(&mut listener, LISTENER, mio::Interest::READABLE)?;
Ok(Self {
listener,
poll,
events,
connections,
})
}
fn run(&mut self) -> Result<()> {
loop {
self.poll.poll(&mut self.events, None)?;
for event in self.events.iter() {
match event.token() {
LISTENER => {
if let Ok((socket, addr)) = self.listener.accept() {
debug!("server: Connection from {:?}", addr);
let connection = Connection::with_capacity(socket, 1024);
let key = self.connections.insert(connection);
while let Err(e) = self.poll.registry().register(
&mut self.connections[key].socket,
Token(key),
mio::Interest::READABLE,
) {
error!("server: Failed to register client: {:?}", e);
}
} else {
debug!("server: Failed to accept connection");
}
}
token => {
if let Some(connection) = self.connections.get_mut(token.0) {
if event.is_readable() {
if let Ok(n) = connection.read() {
if n == 0 {
debug!("Client disconnected");
while let Err(e) =
self.poll.registry().deregister(&mut connection.socket)
{
error!("server: Failed to deregister client: {:?}", e);
}
self.connections.remove(token.0);
continue;
} else {
while let Err(e) = self.poll.registry().reregister(
&mut connection.socket,
token,
mio::Interest::READABLE.add(mio::Interest::WRITABLE),
) {
error!("server: Failed to reregister client: {:?}", e);
}
}
}
} else if event.is_writable() {
if let Err(e) = connection.write() {
error!("server: Failed to write to client: {:?}", e);
}
}
}
}
}
}
}
}
}
fn main() -> Result<()> {
env_logger::init();
let mut server = Server::bind("127.0.0.1:8080".parse()?)?;
server.run()?;
Ok(())
}