Echo Server with Stream Semantics
This example demonstrates a simple echo server and client using stream semantics where data is received in arbitrary chunks.
Source Code
// SPDX-FileCopyrightText: 2025 Ben Jarvis
//
// SPDX-License-Identifier: LGPL-2.1-only
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <evpl/evpl.h>
/* This example implements a echo server that uses stream semantics. */
#define TRANSFER_SIZE (1024 * 1024) /* 1MB */
struct client_state {
int bytes_sent;
int bytes_received;
};
struct server_state {
int ready;
int run;
pthread_mutex_t mutex;
pthread_cond_t cond;
};
/* Server callback - echoes received data back to client */
void
server_callback(
struct evpl *evpl,
struct evpl_bind *bind,
struct evpl_notify *notify,
void *private_data)
{
struct evpl_iovec iovecs[16];
int niov, length;
struct server_state *state = private_data;
switch (notify->notify_type) {
case EVPL_NOTIFY_RECV_DATA:
/* We received some amount of data. We will get another callback only when we receive more,
* so we should consume as much as we can now
*/
while ((niov = evpl_recvv(evpl, bind, iovecs, 16, TRANSFER_SIZE, &length))) {
/* Echo it back (zero-copy).
* evpl_sendv takes ownership of the iovecs, so we do NOT
* release them here. The buffers will be freed automatically
* after the send completes.
*/
evpl_sendv(evpl, bind, iovecs, niov, length, EVPL_SEND_FLAG_TAKE_REF);
printf("[Server] Echoed %d bytes\n", length);
}
break;
case EVPL_NOTIFY_DISCONNECTED:
state->run = 0; /* Stop the server */
break;
} /* switch */
} /* server_callback */
/* Accept callback - sets up the server callback for new connections */
void
accept_callback(
struct evpl *evpl,
struct evpl_bind *bind,
evpl_notify_callback_t *notify_callback,
evpl_segment_callback_t *segment_callback,
void **conn_private_data,
void *private_data)
{
*notify_callback = server_callback;
*segment_callback = NULL; /* not needed for stream semantics */
*conn_private_data = private_data; /* Pass server state */
} /* accept_callback */
/* Server thread - runs event loop and accepts connections */
void *
server_thread(void *arg)
{
struct server_state *state = arg;
struct evpl *evpl;
struct evpl_listener *listener;
struct evpl_listener_binding *binding;
struct evpl_endpoint *endpoint;
/* Create evpl context for the server thread */
evpl = evpl_create(NULL);
/* Create endpoint (localhost, port 8000) */
endpoint = evpl_endpoint_create("127.0.0.1", 8000);
/* Create and configure listener */
listener = evpl_listener_create();
/* Attach listener to the evpl context */
binding = evpl_listener_attach(evpl, listener, accept_callback, state);
/* Start listening for incoming connections */
evpl_listen(listener, EVPL_STREAM_SOCKET_TCP, endpoint);
printf("[Server] Listening on port 8000\n");
/* Notify main thread we are ready so client thread can be started */
pthread_mutex_lock(&state->mutex);
state->ready = 1;
pthread_cond_signal(&state->cond);
pthread_mutex_unlock(&state->mutex);
/* Run event loop until stopped by the main thread */
while (state->run) {
evpl_continue(evpl);
}
printf("[Server] Shutting down\n");
/* Cleanup */
evpl_listener_detach(evpl, binding);
evpl_listener_destroy(listener);
evpl_endpoint_close(endpoint);
evpl_destroy(evpl);
return NULL;
} /* server_thread */
/* Client callback - sends 1MB and receives echo in chunks */
void
client_callback(
struct evpl *evpl,
struct evpl_bind *bind,
struct evpl_notify *notify,
void *private_data)
{
struct client_state *state = private_data;
struct evpl_iovec iov[8];
int niov, chunk_size;
switch (notify->notify_type) {
case EVPL_NOTIFY_CONNECTED:
printf("[Client] Connected! Sending %d bytes\n", TRANSFER_SIZE);
niov = evpl_iovec_alloc(evpl, TRANSFER_SIZE, 0, 8, iov);
for (int i = 0; i < niov; i++) {
memset(evpl_iovec_data(&iov[i]), 0xbeef, evpl_iovec_length(&iov[i]));
}
state->bytes_sent = TRANSFER_SIZE;
/* evpl owns iovecs after send */
evpl_sendv(evpl, bind, iov, niov, TRANSFER_SIZE, EVPL_SEND_FLAG_TAKE_REF);
break;
case EVPL_NOTIFY_RECV_DATA:
/* We received some amount of data. We will get another callback only when we receive more,
* so we should consume as much as we can now
*/
while ((niov = evpl_recvv(evpl, bind, iov, 8, TRANSFER_SIZE, &chunk_size))) {
state->bytes_received += chunk_size;
/* Release buffers - we're done with them */
for (int i = 0; i < niov; i++) {
evpl_iovec_release(&iov[i]);
}
printf("[Client] Received %d bytes now %d bytes total\n", chunk_size, state->bytes_received);
/* Check if we've received everything */
if (state->bytes_received >= TRANSFER_SIZE) {
evpl_close(evpl, bind);
}
}
break;
case EVPL_NOTIFY_DISCONNECTED:
/* Client is done */
break;
} /* switch */
} /* client_callback */
/* Client thread - connects and transfers data */
void *
client_thread(void *arg)
{
struct client_state *state = arg;
struct evpl *evpl;
struct evpl_endpoint *endpoint;
struct evpl_bind *bind;
/* Create evpl context for the client thread */
evpl = evpl_create(NULL);
/* Connect to server */
endpoint = evpl_endpoint_create("127.0.0.1", 8000);
bind = evpl_connect(evpl, EVPL_STREAM_SOCKET_TCP,
NULL, endpoint, client_callback, NULL, state);
if (!bind) {
fprintf(stderr, "[Client] Failed to connect\n");
evpl_destroy(evpl);
return NULL;
}
/* Run event loop until transfer completes */
while (state->bytes_received < TRANSFER_SIZE) {
evpl_continue(evpl);
}
printf("[Client] Transfer complete\n");
/* Explicitly closing endpoints is optional, otherwise they are cleaned up on process exit */
evpl_endpoint_close(endpoint);
/* Destroy the evpl context before the thread exits */
evpl_destroy(evpl);
return NULL;
} /* client_thread */
int
main(
int argc,
char *argv[])
{
struct server_state server_state = { .run = 1, .ready = 0 };
struct client_state client_state = { 0 };
pthread_t server_tid;
pthread_t client_tid;
pthread_mutex_init(&server_state.mutex, NULL);
pthread_cond_init(&server_state.cond, NULL);
/* Initialize libevpl */
evpl_init(NULL);
/* Start server thread */
printf("Starting server thread\n");
pthread_create(&server_tid, NULL, server_thread, &server_state);
pthread_mutex_lock(&server_state.mutex);
while (!server_state.ready) {
pthread_cond_wait(&server_state.cond, &server_state.mutex);
}
pthread_mutex_unlock(&server_state.mutex);
/* Start client thread */
printf("Starting client thread\n");
pthread_create(&client_tid, NULL, client_thread, &client_state);
/* Wait for client to finish */
pthread_join(client_tid, NULL);
/* Stop server */
server_state.run = 0;
pthread_join(server_tid, NULL);
return 0;
} /* main */See Also
- Event Loop API - Event loop management
- Binds API - Network connections and I/O
- Echo Server (Message) - Message-based alternative