Quick guide to using the plugin-ipc library from C, Rust, and Go.
For full specifications, see the other docs in this directory:
codec.md, level1-*.md, level2-typed-api.md, level3-snapshot-api.md.
The library has four layers:

- Codec (`protocol/`): wire format encode/decode. Platform-independent.
- Level 1 (`transport/`): connection lifecycle, send/receive, chunking, handshake. Treats payloads as opaque bytes.
- Level 2 (`service/`): typed client context with retry, managed server with multi-client worker dispatch. Composes L1 + Codec.
- Level 3 (`service/`): snapshot cache with O(1) hash lookup, periodic refresh, cache preservation on failure. Built on L2.
Transports: UDS + SHM (POSIX/Linux), Named Pipe + Win SHM (Windows).
If SHM is selected during the handshake, a successful HELLO_ACK guarantees
that the server has already prepared SHM for that session. If the client
still cannot attach the SHM locally, it closes that session and reconnects
without SHM. This remains transparent to L2/L3 callers.
The Level 2 examples below are schematic. They show the intended public API shape from the specifications:

- Clients use typed calls only
- Servers expose one service kind only
- Request/response scratch buffers remain internal to the library
- Typed callers do not provide `max_request_payload_bytes`
Clients connect to services, not plugins:

- the client knows `service_namespace` + `service_name`
- the client does not know which plugin/process implements that service
- one service endpoint serves one request kind only

Examples: `cgroups-snapshot`, `ip-to-asn`, `pid-traffic`.
Runtime expectations:

- plugins may start in any order
- a client may start before the service exists
- a provider may restart or disappear
- optional enrichments must not break the caller
- clients should call `Refresh()` periodically and tolerate temporary absence
```c
#include "netipc/netipc_service.h"

nipc_client_config_t cfg = {
    .supported_profiles = NIPC_PROFILE_BASELINE,
    .max_request_batch_items = 1,
    .max_response_payload_bytes = 65536,
    .auth_token = 0xDEADBEEFCAFEBABE,
};

nipc_client_ctx_t client;
nipc_client_init(&client, "/run/netdata", "cgroups-snapshot", &cfg);

nipc_client_refresh(&client); /* attempt connect */
if (nipc_client_ready(&client)) {
    nipc_cgroups_resp_view_t view;
    nipc_error_t err = nipc_client_call_cgroups_snapshot(&client, &view);
    if (err == NIPC_OK) {
        /* view.item_count, view.generation, etc. */
        for (uint32_t i = 0; i < view.item_count; i++) {
            nipc_cgroups_item_view_t item;
            nipc_cgroups_resp_item(&view, i, &item);
            /* item.hash, item.name.ptr/len, item.path.ptr/len */
        }
        /* view is valid until the next typed call on this client */
    }
}
nipc_client_close(&client);
```

```rust
use netipc::service::cgroups::{CgroupsClient, ClientConfig};
use netipc::protocol::PROFILE_BASELINE;

let config = ClientConfig {
    supported_profiles: PROFILE_BASELINE,
    max_request_batch_items: 1,
    max_response_payload_bytes: 65536,
    auth_token: 0xDEADBEEFCAFEBABE,
    ..ClientConfig::default()
};

let mut client = CgroupsClient::new("/run/netdata", "cgroups-snapshot", config);

client.refresh(); // attempt connect
if client.ready() {
    match client.call_snapshot() {
        Ok(view) => {
            for i in 0..view.item_count {
                let item = view.item(i).unwrap();
                // item.hash, item.name.as_str(), item.path.as_str()
            }
            // view is valid until the next typed call on this client
        }
        Err(e) => eprintln!("call failed: {:?}", e),
    }
}
client.close();
```

```go
import "github.com/netdata/plugin-ipc/go/pkg/netipc/service/cgroups"
import "github.com/netdata/plugin-ipc/go/pkg/netipc/protocol"

config := cgroups.ClientConfig{
	SupportedProfiles:       protocol.ProfileBaseline,
	MaxRequestBatchItems:    1,
	MaxResponsePayloadBytes: 65536,
	AuthToken:               0xDEADBEEFCAFEBABE,
}

client := cgroups.NewClient("/run/netdata", "cgroups-snapshot", config)
defer client.Close()

client.Refresh() // attempt connect
if client.Ready() {
	view, err := client.CallSnapshot()
	if err == nil {
		for i := uint32(0); i < view.ItemCount; i++ {
			item, _ := view.Item(i)
			// item.Hash, item.Name, item.Path
		}
		// view is valid until the next typed call on this client
	}
}
```

The managed server receives wire messages, but that is internal to the library. The public L2 server contract is typed:
- You expose one service kind only
- Callbacks receive decoded request data
- Callbacks return typed response data or fill typed builders
- User code never switches on plugin identity, `method_code`, raw payload bytes, or raw response buffers
```c
#include "netipc/netipc_service.h"

bool on_cgroups(void *user, const nipc_cgroups_req_t *req,
                nipc_cgroups_builder_t *builder) {
    /* Fill builder with cgroup items */
    return true;
}

/* Public L2 shape: one service kind, one typed handler surface */
nipc_cgroups_service_handler_t service_handler = {
    .handle = on_cgroups,
    .user = NULL,
};

nipc_server_config_t scfg = {
    .supported_profiles = NIPC_PROFILE_BASELINE,
    .max_request_batch_items = 1,
    .max_response_payload_bytes = 65536,
    .auth_token = 0xDEADBEEFCAFEBABE,
};

nipc_managed_server_t server;
nipc_server_init_typed(&server, "/run/netdata", "cgroups-snapshot", &scfg,
                       4, &service_handler);

/* Blocking: accepts clients, performs typed dispatch internally */
nipc_server_run(&server);

/* From another thread: */
nipc_server_stop(&server);
/* Or graceful drain (waits up to 5s for in-flight requests): */
nipc_server_drain(&server, 5000);

nipc_server_destroy(&server);
```

```rust
use netipc::service::cgroups::{Handler, ManagedServer, ServerConfig};

let handler = Handler {
    handle: Some(Box::new(|req, builder| {
        // Fill builder with cgroup items
        true
    })),
    ..Default::default()
};

let config = ServerConfig { /* ... */ };
let mut server = ManagedServer::new(
    "/run/netdata", "cgroups-snapshot", config, handler);

server.run().unwrap();
server.stop();
```

```go
handler := cgroups.Handler{
	Handle: func(req protocol.CgroupsRequestView, builder *protocol.CgroupsBuilder) bool {
		// Fill builder with cgroup items
		return true
	},
}

config := cgroups.ServerConfig{ /* ... */ }
server := cgroups.NewServerWithWorkers(
	"/run/netdata", "cgroups-snapshot", config, handler, 4)

go server.Run()
server.Stop()
```

The L3 cache wraps an L2 client, maintains a local owned copy of the latest snapshot, and provides O(1) lookup by hash + name.
```c
nipc_cgroups_cache_t cache;
nipc_client_config_t ccfg = { /* ... */ };
nipc_cgroups_cache_init(&cache, "/run/netdata", "cgroups-snapshot", &ccfg);

/* Call periodically from your loop */
if (nipc_cgroups_cache_refresh(&cache)) {
    printf("cache updated, %u items\n", cache.item_count);
}

/* O(1) lookup (hash table, no I/O) */
const nipc_cgroups_cache_item_t *item =
    nipc_cgroups_cache_lookup(&cache, hash, "docker-abc123");
if (item) {
    /* item->name, item->path, item->enabled, item->options */
}

nipc_cgroups_cache_close(&cache);
```

```rust
use netipc::service::cgroups::CgroupsCache;

let mut cache = CgroupsCache::new("/run/netdata", "cgroups-snapshot", config);

// Call periodically
if cache.refresh() {
    println!("cache updated");
}

// O(1) lookup
if let Some(item) = cache.lookup(hash, "docker-abc123") {
    println!("{}: {}", item.name, item.path);
}

cache.close();
```

```go
cache := cgroups.NewCache("/run/netdata", "cgroups-snapshot", config)
defer cache.Close()

// Call periodically
if cache.Refresh() {
	fmt.Println("cache updated")
}

// O(1) lookup
if item, found := cache.Lookup(hash, "docker-abc123"); found {
	fmt.Printf("%s: %s\n", item.Name, item.Path)
}
```

Design notes:

- Caller-driven: no hidden threads, no timers. You call `Refresh()` from your own loop at your own cadence.
- Service-oriented: the client binds to a service kind, not to a plugin.
- One endpoint, one request kind: a `cgroups-snapshot` server serves snapshot requests only.
- Asynchronous startup: providers may appear late or disappear. Callers must tolerate absence and reconnect through their normal loop.
- At-least-once retry: if a call fails and the client was READY, it automatically disconnects, reconnects (full handshake), and retries once before returning an error.
- Overflow recovery is a fallback: typed callers do not size request payloads manually. The library computes an internal initial request payload proposal and reconnects with a larger one only if that estimate proved too small.
- Cache preservation: on refresh failure, the previous cache is preserved. The cache is empty only if no refresh has ever succeeded.
- Transport negotiation: if SHM is selected during the handshake, a successful handshake means the server has already prepared SHM for that session; if client-side attach still fails, recovery is a new baseline session, not same-session fallback.
- Cross-language: all three implementations produce identical wire bytes and pass cross-language interop tests.
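The at-least-once retry rule above can be sketched in a few lines of Go. Everything here (`conn`, `callOnce`, `reconnect`, `callWithRetry`) is an illustrative stand-in invented for this sketch, not the library's API; the point is only the shape of the rule: on failure, tear the session down, reconnect, and retry exactly once before surfacing the error.

```go
package main

import (
	"errors"
	"fmt"
)

// conn models a session to a provider that may have restarted.
type conn struct{ healthy bool }

func (c *conn) callOnce() error {
	if !c.healthy {
		return errors.New("broken pipe")
	}
	return nil
}

// reconnect stands in for the full disconnect + handshake the library performs.
func (c *conn) reconnect() { c.healthy = true }

// callWithRetry mirrors the at-least-once contract: on failure while READY,
// disconnect, reconnect, and retry exactly once.
func callWithRetry(c *conn) error {
	if err := c.callOnce(); err == nil {
		return nil
	}
	c.reconnect()
	return c.callOnce()
}

func main() {
	// Provider restarted underneath us: the first raw call fails,
	// but the retry path recovers transparently.
	stale := &conn{healthy: false}
	fmt.Println("raw call:", stale.callOnce() != nil)              // true (failed)
	fmt.Println("with retry:", callWithRetry(&conn{}) == nil)      // true (recovered)
}
```

Because the retry repeats the request after a reconnect, handlers should treat requests as idempotent; this is why the design note says "at-least-once" rather than "exactly-once".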
```sh
# Full build + tests
mkdir build && cd build
cmake ..
cmake --build .
ctest

# Rust
cd src/crates/netipc && cargo test

# Go
cd src/go && go test ./...
```