DYAD Data Transport API

Once a consumer identifies the owner of the file it is trying to access via metadata and finds that the owner is at a remote location (i.e., the file is not locally visible), it requests that the owner transfer the file. DYAD offers multiple backend implementations for transferring data from producer to consumer: streaming RPC, which is the native mechanism of Flux; UCX with RDMA; and the Margo+Mercury framework.

Functions

dyad_rc_t dyad_dtl_init(dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)

Initializes the Data Transport Layer for a DYAD context.

Allocates the dyad_dtl handle inside ctx and delegates to the backend-specific initialization function selected by mode.

If mode does not match any enabled backend, returns DYAD_RC_BADDTLMODE without initializing the handle.

See also

dyad_dtl_mode_t, dyad_dtl_comm_mode_t.

Parameters:
  • ctx[inout] DYAD context. On success, ctx->dtl_handle is allocated and initialized.

  • mode[in] DTL backend to use.

  • comm_mode[in] Communication direction (DYAD_COMM_SEND for producer, DYAD_COMM_RECV for consumer).

  • debug[in] If true, enables verbose debug logging in the DTL backend.

Return values:
  • DYAD_RC_OK – Initialization succeeded.

  • DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl handle.

  • DYAD_RC_BADDTLMODE – mode does not match any enabled backend.

  • other – Any error code returned by the backend-specific init function.

Returns:

dyad_rc_t return code:
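The dispatch on mode can be pictured with a small standalone model. This is only a sketch: the mini_ types, the enum values, and the stub backend are hypothetical stand-ins rather than DYAD's definitions, and only one backend is treated as enabled. What happens to the allocation when the mode is rejected is not stated above; freeing it here is a modeling choice.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for dyad_rc_t and dyad_dtl_mode_t. */
typedef enum { MINI_RC_OK = 0, MINI_RC_SYSFAIL, MINI_RC_BADDTLMODE } mini_rc_t;
typedef enum { MINI_DTL_FLUX_RPC, MINI_DTL_UCX, MINI_DTL_MARGO } mini_mode_t;

typedef struct {
    mini_mode_t mode;
    bool debug;
} mini_dtl_t;

typedef struct {
    mini_dtl_t *dtl_handle;
} mini_ctx_t;

/* Stub backend init: the only backend "enabled" in this model. */
static mini_rc_t mini_flux_init(mini_ctx_t *ctx)
{
    (void)ctx;
    return MINI_RC_OK;
}

mini_rc_t mini_dtl_init(mini_ctx_t *ctx, mini_mode_t mode, bool debug)
{
    mini_dtl_t *h = malloc(sizeof *h);
    if (h == NULL)
        return MINI_RC_SYSFAIL;
    h->mode = mode;
    h->debug = debug;
    ctx->dtl_handle = h;

    switch (mode) {
    case MINI_DTL_FLUX_RPC:
        return mini_flux_init(ctx);
    default:
        /* Mode not enabled in this model: leave no handle behind
         * (modeling choice, see the lead-in). */
        free(h);
        ctx->dtl_handle = NULL;
        return MINI_RC_BADDTLMODE;
    }
}
```

A caller would check the return code before touching ctx->dtl_handle, since the handle is only usable after a successful init.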

dyad_rc_t dyad_dtl_finalize(dyad_ctx_t *ctx)

Finalizes and frees the Data Transport Layer for a DYAD context.

Delegates to the backend-specific finalization function based on ctx->dtl_handle->mode, then frees the dyad_dtl handle and sets ctx->dtl_handle to NULL.

If ctx->dtl_handle is already NULL, the function is a no-op and returns DYAD_RC_OK. This allows dyad_dtl_finalize() to be called safely on an already-finalized context without error.

Backend dispatch:

Note

The dyad_dtl handle is always freed and set to NULL regardless of whether the backend finalization succeeds or fails. Any error code returned by the backend is overwritten with DYAD_RC_OK before returning — this is intentional, as finalization errors are non-recoverable and should not prevent the caller from continuing teardown.

Parameters:

ctx[inout] DYAD context. On return, ctx->dtl_handle is NULL.

Return values:
  • DYAD_RC_OK – Finalization succeeded, or the handle was already NULL.

  • DYAD_RC_BADDTLMODE – The mode stored in the handle does not match any enabled backend. The handle is still freed.

Returns:

dyad_rc_t return code:
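The teardown semantics described in the Note (idempotent, handle always freed, backend errors masked) can be modeled in a few lines. The types and the failing stub backend below are hypothetical and exist only to make the behavior observable; this is a sketch, not DYAD's source.

```c
#include <assert.h>
#include <stdlib.h>

typedef enum { MINI_RC_OK = 0, MINI_RC_BACKEND_ERR } mini_rc_t;
typedef struct { int mode; } mini_dtl_t;
typedef struct { mini_dtl_t *dtl_handle; } mini_ctx_t;

/* Stub backend finalizer that always fails, to show the error is masked. */
static mini_rc_t mini_backend_finalize(mini_ctx_t *ctx)
{
    (void)ctx;
    return MINI_RC_BACKEND_ERR;
}

mini_rc_t mini_dtl_finalize(mini_ctx_t *ctx)
{
    if (ctx->dtl_handle == NULL)
        return MINI_RC_OK;            /* already finalized: no-op */
    (void)mini_backend_finalize(ctx); /* result intentionally discarded */
    free(ctx->dtl_handle);
    ctx->dtl_handle = NULL;
    return MINI_RC_OK;
}
```

Calling mini_dtl_finalize() twice on the same context is safe in this model: the second call sees a NULL handle and returns immediately.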

Flux Streaming RPC

The Flux RPC DTL backend uses the built-in RPC mechanism provided by the Flux framework, requiring no additional networking dependencies beyond Flux itself.

Flux is a next-generation resource manager and job scheduler for HPC systems. Beyond job and resource management, Flux provides a distributed messaging infrastructure based on ZeroMQ that allows processes running under the same Flux instance to communicate via lightweight RPCs. Each Flux broker runs on a node and handles message routing between processes across the job allocation.

Flux streaming RPC extends the basic request-response RPC model to allow the service to send multiple responses to a single request, with ENODATA signalling end-of-stream. DYAD uses this mechanism on the producer side — the DYAD service responds to a consumer fetch request by sending the file data as one or more raw payloads via flux_respond_raw(), and signals completion by responding with ENODATA via flux_respond_error(). The consumer reads successive responses from the same flux_future_t via flux_rpc_get_raw() until it receives ENODATA.
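The consumer side of this protocol is a loop that keeps reading responses until ENODATA arrives. The sketch below models that loop against an in-memory stand-in so it runs without Flux; mock_stream_t, mock_stream_get(), and mock_consume() are hypothetical names, and mock_stream_get() only imitates the "get next response" step that flux_rpc_get_raw() performs in the real backend.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical in-memory stand-in for a streaming RPC: a fixed list of
 * payloads followed by an implicit ENODATA end-of-stream marker. */
typedef struct {
    const char *const *chunks;
    size_t n_chunks;
    size_t next;
} mock_stream_t;

/* Returns 0 and a payload, or -1 with errno = ENODATA once every chunk
 * has been delivered. */
static int mock_stream_get(mock_stream_t *s, const char **data, size_t *len)
{
    if (s->next >= s->n_chunks) {
        errno = ENODATA;
        return -1;
    }
    *data = s->chunks[s->next++];
    *len = strlen(*data);
    return 0;
}

/* Consumer loop: append responses to out until the stream ends.
 * Returns the total bytes received, or -1 on overflow or on an error
 * other than ENODATA. */
long mock_consume(mock_stream_t *s, char *out, size_t cap)
{
    size_t total = 0;
    const char *data = NULL;
    size_t len = 0;

    while (mock_stream_get(s, &data, &len) == 0) {
        if (total + len > cap)
            return -1;
        memcpy(out + total, data, len);
        total += len;
    }
    return errno == ENODATA ? (long)total : -1;
}
```

The key design point is that ENODATA is treated as normal termination, while any other failure is reported as an error.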

Functions

dyad_rc_t dyad_dtl_flux_init(const dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)

Initializes the Flux RPC DTL internal state.

Allocates and populates the dyad_dtl_flux internal state struct, then wires all function pointers in ctx->dtl_handle to their Flux RPC implementations:

  • rpc_pack -> dyad_dtl_flux_rpc_pack

  • rpc_unpack -> dyad_dtl_flux_rpc_unpack

  • rpc_respond -> dyad_dtl_flux_rpc_respond

  • rpc_recv_response -> dyad_dtl_flux_rpc_recv_response

  • get_buffer -> dyad_dtl_flux_get_buffer

  • return_buffer -> dyad_dtl_flux_return_buffer

  • establish_connection -> dyad_dtl_flux_establish_connection

  • send -> dyad_dtl_flux_send

  • recv -> dyad_dtl_flux_recv

  • close_connection -> dyad_dtl_flux_close_connection

The Flux handle is borrowed from ctx->h and stored as a non-owning pointer — it is not closed by dyad_dtl_flux_finalize(). The pending future (f) and message (msg) fields are initialized to NULL and are set during RPC operations.
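The function-pointer wiring listed above follows a common C pattern: the handle carries the hooks, the backend init assigns them, and callers invoke the hooks through the handle without knowing which backend is active. A reduced sketch with two of the ten hooks follows; all names are hypothetical and the bytes_sent counter exists only so the model has something to observe.

```c
#include <assert.h>
#include <stddef.h>

typedef enum { MINI_RC_OK = 0 } mini_rc_t;

/* Hypothetical, reduced DTL handle with two hooks. */
typedef struct mini_dtl {
    mini_rc_t (*establish_connection)(struct mini_dtl *self);
    mini_rc_t (*send)(struct mini_dtl *self, const void *buf, size_t len);
    size_t bytes_sent; /* bookkeeping for this model only */
} mini_dtl_t;

/* No-op hook, as described for the Flux RPC backend. */
static mini_rc_t flux_like_establish(mini_dtl_t *self)
{
    (void)self;
    return MINI_RC_OK;
}

/* Pretends to send by counting bytes. */
static mini_rc_t flux_like_send(mini_dtl_t *self, const void *buf, size_t len)
{
    (void)buf;
    self->bytes_sent += len;
    return MINI_RC_OK;
}

/* Backend init: wires the hooks into the handle. */
void mini_flux_like_init(mini_dtl_t *h)
{
    h->establish_connection = flux_like_establish;
    h->send = flux_like_send;
    h->bytes_sent = 0;
}
```

After initialization, a call such as h.send(&h, buf, len) reaches the backend implementation through the pointer stored in the handle.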

Todo:

Remove mode parameter — it is already stored in ctx->dtl_handle->mode by dyad_dtl_init() before this function is called.

Parameters:
  • ctx[in] DYAD context. ctx->dtl_handle must already be allocated by dyad_dtl_init().

  • mode[in] DTL mode (must be DYAD_DTL_FLUX_RPC; see Todo).

  • comm_mode[in] Communication direction (DYAD_COMM_SEND for producer, DYAD_COMM_RECV for consumer).

  • debug[in] If true, enables verbose debug logging.

Return values:
  • DYAD_RC_OK – Initialization succeeded.

  • DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl_flux struct.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_flux_rpc_pack(const dyad_ctx_t *ctx, const char *upath, uint32_t producer_rank, void ***packed_obj)

Packs a file fetch request into a JSON object for a Flux RPC call.

Creates a Jansson JSON object of the form {"upath": "<upath>"} using json_pack(). The producer_rank parameter is accepted for interface consistency with other DTL backends but is not included in the packed object: the Flux RPC is routed to the producer broker by rank at the Flux level, so the rank is not embedded in the payload.

The caller is responsible for decrementing the reference count of packed_obj via json_decref() when it is no longer needed.

Parameters:
  • ctx[in] DYAD context.

  • upath[in] Relative path of the file to fetch.

  • producer_rank[in] Flux rank of the producer broker. Not embedded in the payload for Flux RPC.

  • packed_obj[out] Set to the allocated JSON object on success. Undefined on failure.

Return values:
  • DYAD_RC_OK – The JSON object was created successfully.

  • DYAD_RC_BADPACK – json_pack() failed to create the object.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_flux_rpc_unpack(const dyad_ctx_t *ctx, const void **msg, char **upath)

Unpacks a file fetch request from an incoming Flux RPC message.

Extracts the upath field from the JSON payload of msg using flux_request_unpack(). On success, stores a non-owning pointer to msg in the Flux DTL handle so that dyad_dtl_flux_send() can respond to the same message later without the caller needing to pass it again.

Note

The upath string is owned by the Flux message msg and must not be freed by the caller. It remains valid only for the lifetime of msg.

Note

msg is stored as a non-owning pointer in the Flux DTL handle for use by dyad_dtl_flux_send(), which calls flux_respond_raw() with the stored message to route the response back to the correct consumer. In the Flux streaming RPC protocol, the original request message serves as the reply address; without it the producer cannot direct the response to the consumer that issued the request. The message is owned by the Flux broker, so DYAD must not free or destroy it.

Parameters:
  • ctx[in] DYAD context.

  • msg[in] Incoming Flux RPC message containing the JSON payload.

  • upath[out] Set to the relative path of the requested file, extracted from the message payload. Valid for the lifetime of msg.

Return values:
  • DYAD_RC_OK – Unpacking succeeded and upath is set.

  • DYAD_RC_BADUNPACK – flux_request_unpack() failed to extract the upath field from the message.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_flux_rpc_respond(const dyad_ctx_t *ctx, const void **orig_msg)

Sends the initial RPC acknowledgement from the service to the consumer.

For the Flux RPC DTL backend this is a no-op — the Flux streaming RPC protocol does not require an explicit acknowledgement before data transfer begins. The function exists to satisfy the rpc_respond function pointer in dyad_dtl and to maintain a consistent interface across all backends.

Parameters:
  • ctx[in] DYAD context.

  • orig_msg[in] The incoming Flux RPC message to respond to. Unused by this backend.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_flux_rpc_recv_response(const dyad_ctx_t *ctx, void **f)

Receives the initial RPC response from the service and stores the Flux future for subsequent data transfer.

Stores f in the Flux DTL handle so that subsequent calls to dyad_dtl_flux_recv() can retrieve data from the same streaming RPC future without the caller needing to pass it again.

For the Flux RPC backend, no blocking wait is performed here — the future is stored and consumed lazily by dyad_dtl_flux_recv().

Parameters:
  • ctx[in] DYAD context.

  • f[in] Flux future representing the pending streaming RPC response. Stored as a non-owning pointer; the caller retains ownership and must destroy it after the transfer is complete.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_flux_get_buffer(const dyad_ctx_t *ctx, size_t data_size, void **data_buf)

Allocates a page-aligned buffer for Flux RPC data transfer.

Allocates a buffer of data_size bytes aligned to the system page size via posix_memalign(). Page alignment is required for compatibility with direct I/O and RDMA-capable transports that may be used alongside the Flux RPC backend.

The function validates data_buf before allocation:

  • If data_buf is NULL, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.

  • If *data_buf is non-NULL, a buffer is already present and overwriting it would cause a memory leak, so DYAD_RC_BADBUF is returned.

The allocated buffer must be released via dyad_dtl_flux_return_buffer().

Note

A plain malloc() path exists in the source but is disabled (#if 0). The active path always uses posix_memalign().

Parameters:
  • ctx[in] DYAD context.

  • data_size[in] Number of bytes to allocate.

  • data_buf[out] Must point to a NULL pointer on entry. Set to the allocated page-aligned buffer on success.

Return values:
  • DYAD_RC_OK – Buffer allocated successfully.

  • DYAD_RC_BADBUF – data_buf is NULL or *data_buf is already non-NULL.

  • DYAD_RC_SYSFAIL – posix_memalign() failed.

Returns:

dyad_rc_t return code:
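The validation and page-aligned allocation described above can be sketched as a standalone function. The name mini_get_buffer() and the mini_rc_t codes are hypothetical stand-ins, and obtaining the page size via sysconf(_SC_PAGESIZE) is an assumption about how it might be queried; only the posix_memalign() call and the two validation rules come from the description.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L
#endif
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

typedef enum { MINI_RC_OK = 0, MINI_RC_SYSFAIL, MINI_RC_BADBUF } mini_rc_t;

mini_rc_t mini_get_buffer(size_t data_size, void **data_buf)
{
    /* Reject an invalid output pointer or an already-populated slot. */
    if (data_buf == NULL || *data_buf != NULL)
        return MINI_RC_BADBUF;

    long page = sysconf(_SC_PAGESIZE);
    if (page <= 0)
        return MINI_RC_SYSFAIL;

    /* posix_memalign() returns 0 on success, an error number otherwise. */
    if (posix_memalign(data_buf, (size_t)page, data_size) != 0) {
        *data_buf = NULL;
        return MINI_RC_SYSFAIL;
    }
    return MINI_RC_OK;
}
```

Memory obtained from posix_memalign() is released with free(), which is consistent with the release path being a plain free() of the buffer.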

dyad_rc_t dyad_dtl_flux_return_buffer(const dyad_ctx_t *ctx, void **data_buf)

Releases a buffer previously allocated by dyad_dtl_flux_get_buffer().

Frees the buffer pointed to by *data_buf. The function validates data_buf before freeing:

  • If data_buf is NULL, the caller passed an invalid pointer and DYAD_RC_BADBUF is returned.

  • If *data_buf is NULL, the buffer has already been freed or was never allocated, and DYAD_RC_BADBUF is returned.

Todo:

Set *data_buf to NULL after free() to prevent use-after-free. This is a one-line fix:

free (*data_buf);
*data_buf = NULL;

Note

The caller is responsible for setting *data_buf to NULL after this call. Unlike some other buffer management functions in DYAD, this function does not null the pointer on return.

Parameters:
  • ctx[in] DYAD context.

  • data_buf[inout] Pointer to the buffer to free. *data_buf must be non-NULL on entry.

Return values:
  • DYAD_RC_OK – Buffer freed successfully.

  • DYAD_RC_BADBUF – data_buf is NULL or *data_buf is NULL.

Returns:

dyad_rc_t return code:
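Because the pointer is not nulled by the release function, a caller may want a thin wrapper that does it. The sketch below models the documented release behavior with a hypothetical mini_return_buffer() and adds a hypothetical caller-side helper, mini_release(), that nulls the pointer on success so a repeated release is rejected instead of becoming a double free.

```c
#include <assert.h>
#include <stdlib.h>

typedef enum { MINI_RC_OK = 0, MINI_RC_BADBUF } mini_rc_t;

/* Models the documented behavior: validate, free, leave *data_buf as-is. */
static mini_rc_t mini_return_buffer(void **data_buf)
{
    if (data_buf == NULL || *data_buf == NULL)
        return MINI_RC_BADBUF;
    free(*data_buf);
    return MINI_RC_OK;
}

/* Hypothetical caller-side wrapper: null the pointer after a successful
 * release. */
mini_rc_t mini_release(void **data_buf)
{
    mini_rc_t rc = mini_return_buffer(data_buf);
    if (rc == MINI_RC_OK)
        *data_buf = NULL;
    return rc;
}
```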

dyad_rc_t dyad_dtl_flux_establish_connection(const dyad_ctx_t *ctx)

Establishes the DTL data channel for the Flux RPC backend.

For the Flux RPC backend this is a no-op — data is transferred directly over the existing Flux streaming RPC connection established during rpc_pack() / rpc_recv_response(), so no additional connection setup is required. The function exists to satisfy the establish_connection function pointer in dyad_dtl and to maintain a consistent interface across all backends.

Parameters:

ctx[in] DYAD context.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_flux_send(const dyad_ctx_t *ctx, void *buf, size_t buflen)

Sends file data to the consumer via a Flux RPC response.

Sends buf as the raw payload of a Flux RPC response using flux_respond_raw(). The response is addressed to the consumer using the message stored in the Flux DTL handle by dyad_dtl_flux_rpc_unpack(). This is a streaming RPC response — the consumer receives it via dyad_dtl_flux_recv() which reads successive responses from the same Flux future until ENODATA signals end-of-stream.

Parameters:
  • ctx[in] DYAD context. The Flux handle and the stored request message are read from the Flux DTL handle.

  • buf[in] Buffer containing the file data to send.

  • buflen[in] Number of bytes in buf.

Return values:
  • DYAD_RC_OK – Data sent successfully.

  • DYAD_RC_FLUXFAIL – flux_respond_raw() failed.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_flux_recv(const dyad_ctx_t *ctx, void **buf, size_t *buflen)

Receives file data from the producer via a Flux streaming RPC.

Retrieves the raw payload of the next Flux RPC response from the future stored in the Flux DTL handle by dyad_dtl_flux_rpc_recv_response(). The received data is copied into a freshly allocated buffer obtained via ctx->dtl_handle->get_buffer().

After retrieval, flux_future_reset() is called on the stored future regardless of success or failure, allowing the future to be reused for subsequent streaming responses in the same RPC session.

Error handling:

  • If the stored future is NULL, the function returns DYAD_RC_FLUXFAIL immediately.

  • If flux_rpc_get_raw() fails with errno == ENODATA, the producer has signalled end-of-stream and DYAD_RC_RPC_FINISHED is returned. The caller should treat this as a normal termination condition rather than a hard error.

  • If flux_rpc_get_raw() fails for any other reason, DYAD_RC_BADRPC is returned.

  • If buffer allocation fails, *buf is set to NULL and *buflen is set to 0.

Parameters:
  • ctx[in] DYAD context. The stored Flux future is read from the Flux DTL handle.

  • buf[out] Set to a newly allocated buffer containing the received file data on success. The caller must release it via ctx->dtl_handle->return_buffer(). Set to NULL on failure.

  • buflen[out] Set to the number of bytes received on success. Set to 0 on failure.

Return values:
  • DYAD_RC_OK – Data received and copied successfully.

  • DYAD_RC_FLUXFAIL – The stored Flux future is NULL.

  • DYAD_RC_RPC_FINISHED – flux_rpc_get_raw() returned ENODATA, signalling end-of-stream.

  • DYAD_RC_BADRPC – flux_rpc_get_raw() failed for a reason other than ENODATA.

  • other – Any error code returned by get_buffer() on allocation failure.

Returns:

dyad_rc_t return code:
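The error-handling rules above amount to a small decision table. The following sketch encodes it as a pure function so the mapping can be read at a glance; mini_classify_recv() and the mini_rc_t codes are hypothetical, and get_rc / get_errno stand for the result and errno of the flux_rpc_get_raw() call made by the real backend.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

typedef enum {
    MINI_RC_OK = 0,
    MINI_RC_FLUXFAIL,
    MINI_RC_RPC_FINISHED,
    MINI_RC_BADRPC
} mini_rc_t;

/* future:    the stored future (NULL if none was stored)
 * get_rc:    result of the "get next response" call (negative on failure)
 * get_errno: errno observed after that call */
mini_rc_t mini_classify_recv(const void *future, int get_rc, int get_errno)
{
    if (future == NULL)
        return MINI_RC_FLUXFAIL;
    if (get_rc >= 0)
        return MINI_RC_OK;
    return get_errno == ENODATA ? MINI_RC_RPC_FINISHED : MINI_RC_BADRPC;
}
```

A caller would typically loop while the result is the OK code, stop cleanly on the finished code, and treat anything else as a failure.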

dyad_rc_t dyad_dtl_flux_close_connection(const dyad_ctx_t *ctx)

Closes the Flux RPC DTL data channel.

Clears the stored Flux future (f) and request message (msg) pointers in the Flux DTL handle by setting them to NULL. Neither pointer is destroyed or freed here — both are non-owning references:

  • The Flux future (f) is owned by the caller that passed it to dyad_dtl_flux_rpc_recv_response() and must be destroyed by that caller via flux_future_destroy().

  • The request message (msg) is owned by the Flux broker and must not be freed by DYAD.

After this call the DTL handle is ready to be reused for a new RPC session by calling rpc_pack() / rpc_unpack() again.

Parameters:

ctx[in] DYAD context.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_flux_finalize(const dyad_ctx_t *ctx)

Finalizes and frees the Flux RPC DTL internal state.

Clears all pointers in the Flux DTL handle, frees the dyad_dtl_flux struct, and sets the handle pointer to NULL. If ctx->dtl_handle is already NULL the function is a no-op.

The three stored pointers are cleared as follows before freeing:

  • h — set to NULL. The Flux handle is non-owning; it is borrowed from ctx->h and must not be closed here.

  • f — set to NULL. The Flux future is non-owning; the caller that created it is responsible for destroying it via flux_future_destroy().

  • msg — set to NULL. The request message is non-owning; it is owned by the Flux broker and must not be freed here.

Note

This function only frees the dyad_dtl_flux struct. The outer dyad_dtl handle is freed by dyad_dtl_finalize(), which calls this function as part of the full teardown sequence.

Parameters:

ctx[in] DYAD context. ctx->dtl_handle->private_dtl.flux_dtl_handle is set to NULL on return.

Returns:

Always returns DYAD_RC_OK.

Margo, Mercury and Mochi

Margo, Mercury, and libfabric form a layered communication stack used by DYAD’s Margo DTL backend.

Margo is a higher-level RPC library built on top of Mercury and Argobots. It integrates Mercury’s progress loop with Argobots user-level threads (ULTs) and execution streams (ESs), allowing RPC handlers to be written as straightforward blocking functions rather than callback chains.

Mercury is an RPC framework built on top of a pluggable Network Abstraction (NA) layer. It provides serialization of RPC arguments and RDMA bulk transfers via hg_bulk_t handles, supporting both push (HG_BULK_PUSH) and pull (HG_BULK_PULL) transfer modes. Mercury supports multiple NA plugins including libfabric and UCX, allowing it to target a wide range of high-performance network fabrics. In DYAD’s Margo DTL backend, the pull model is adopted — the producer registers its buffer and notifies the consumer, which pulls the data directly from the producer’s memory via HG_BULK_PULL.

Margo and Mercury are part of the Mochi project, an HPC ecosystem of composable microservices for data management and storage, developed under the DOE Exascale Computing Project.

libfabric (OFI) is a low-level network abstraction library that provides portable access to high-performance network fabrics such as InfiniBand (via ofi+verbs) and TCP (via ofi+tcp). It is maintained by the OpenFabrics Alliance and used by Mercury as its high-performance network transport layer.

Functions

static void data_ready_rpc(void *h)

Mercury/Margo RPC input structure (margo_rpc_in_t) for a data transfer request.

The Mercury macro MERCURY_GEN_PROC generates the structure along with the serialization functions. Contains the size of the data to transfer and a bulk handle referencing the producer’s registered memory region for the RDMA pull.

  • n — number of bytes to transfer.

  • bulk — Mercury bulk handle to the producer’s send buffer.

Mercury/Margo RPC output structure for a data transfer response.

The Mercury macro MERCURY_GEN_PROC generates the structure along with the serialization functions. Contains a return code sent back to the producer after the RDMA pull completes.

  • ret — return code; 0 indicates success.

Margo RPC handler that pulls file data from the producer via RDMA.

Registered as the handler for the DYAD Margo data transfer RPC. Invoked on the consumer side when the producer calls margo_forward() to initiate a data transfer. Performs the following steps:

  1. Retrieves the Margo instance and producer address from the RPC handle.

  2. Looks up the dyad_dtl_margo_t handle registered with the Margo instance via margo_registered_data().

  3. Unpacks the input (margo_rpc_in_t) to obtain the transfer size (n) and the producer’s bulk handle.

  4. Allocates a receive buffer of n bytes and creates a local Mercury bulk handle with HG_BULK_WRITE_ONLY access.

  5. Performs an RDMA pull (HG_BULK_PULL) from the producer’s bulk handle into the local buffer via margo_bulk_transfer().

  6. Responds to the producer with out.ret = 0 to signal completion, then frees the input and destroys the RPC handle.

  7. Sets margo_handle->recv_ready = 1 to unblock the consumer thread waiting in a busy loop on that flag.

Todo:

Replace assert() calls with proper error handling that propagates failures back to the caller rather than aborting.

Note

All Mercury/Margo calls are checked with assert(). This means any failure aborts the process rather than returning an error code. Error handling should be improved in a future revision.

Note

The receive buffer allocated in step 4 is stored in margo_handle->recv_buffer and must be freed by the caller after consuming the data.

Note

DEFINE_MARGO_RPC_HANDLER() wraps this function to register it with the Margo runtime as a ULT (user-level thread) handler.

Parameters:

h[in] Mercury RPC handle for the incoming request.
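Steps 3 to 7 of the handler can be followed in a single-process model in which the "bulk handle" is just a pointer into the producer's memory and the "RDMA pull" is a memcpy(). Everything named mock_ below is hypothetical; no Mercury or Margo call is made, so this only illustrates the data flow and the recv_ready handshake, not the real API. Unlike the assert()-based handler described above, the model returns an error value.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for the bulk handle, RPC input, and DTL handle. */
typedef struct { const void *base; size_t n; } mock_bulk_t;
typedef struct { size_t n; mock_bulk_t bulk; } mock_rpc_in_t;
typedef struct {
    void *recv_buffer;
    size_t recv_len;
    volatile int recv_ready;
} mock_margo_handle_t;

/* Reads n and the bulk handle, allocates, "pulls", then raises the flag.
 * Returns the value that would be sent back as out.ret (0 on success). */
int mock_data_ready(mock_margo_handle_t *mh, const mock_rpc_in_t *in)
{
    if (in->n == 0 || in->n > in->bulk.n)
        return -1;
    void *buf = malloc(in->n);
    if (buf == NULL)
        return -1;
    memcpy(buf, in->bulk.base, in->n); /* stands in for the RDMA pull */
    mh->recv_buffer = buf;
    mh->recv_len = in->n;
    mh->recv_ready = 1;                /* unblocks the waiting consumer */
    return 0;
}
```

As in the Note above, the buffer stored in recv_buffer belongs to the consumer afterwards and must be freed once the data has been used.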

dyad_rc_t dyad_dtl_margo_get_buffer(const dyad_ctx_t *ctx, size_t data_size, void **data_buf)

Allocates a buffer for Margo DTL data transfer.

Allocates a buffer of data_size bytes via malloc(). The function validates data_buf before allocation:

  • If data_buf is NULL, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.

  • If *data_buf is non-NULL, a buffer is already present and overwriting it would cause a memory leak, so DYAD_RC_BADBUF is returned.

The allocated buffer must be released via dyad_dtl_margo_return_buffer().

Note

A posix_memalign() path exists in the source but is disabled (#else branch, #if 1 selects malloc()). Unlike the Flux RPC DTL which uses page-aligned allocation, the Margo DTL currently uses plain malloc() since Margo manages its own RDMA memory registration separately via margo_bulk_create().

Parameters:
  • ctx[in] DYAD context.

  • data_size[in] Number of bytes to allocate.

  • data_buf[out] Must point to a NULL pointer on entry. Set to the allocated buffer on success.

Return values:
  • DYAD_RC_OK – Buffer allocated successfully.

  • DYAD_RC_BADBUF – data_buf is NULL or *data_buf is already non-NULL.

  • DYAD_RC_SYSFAIL – malloc() failed to allocate the buffer.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_margo_return_buffer(const dyad_ctx_t *ctx, void **data_buf)

Releases a buffer previously allocated by dyad_dtl_margo_get_buffer().

Frees the buffer pointed to by *data_buf. The function validates data_buf before freeing:

  • If data_buf is NULL, the caller passed an invalid pointer and DYAD_RC_BADBUF is returned.

  • If *data_buf is NULL, the buffer has already been freed or was never allocated, and DYAD_RC_BADBUF is returned.

Todo:

Set *data_buf to NULL after free() to prevent use-after-free:

free (*data_buf);
*data_buf = NULL;

Note

*data_buf is not set to NULL after freeing. The caller is responsible for nulling the pointer to prevent use-after-free (see TODO).

Parameters:
  • ctx[in] DYAD context.

  • data_buf[inout] Pointer to the buffer to free. *data_buf must be non-NULL on entry.

Return values:
  • DYAD_RC_OK – Buffer freed successfully.

  • DYAD_RC_BADBUF – data_buf is NULL or *data_buf is NULL.

Returns:

dyad_rc_t return code:

static dyad_rc_t validate_margo_protocol(const dyad_ctx_t *ctx, const char *protocol)

Validates that a Mercury/libfabric network protocol is available on the current system.

Calls NA_Get_protocol_info() with protocol to check whether it is supported by the installed libfabric providers.

If the protocol is available, logs the class and device names at debug level, frees the protocol info via NA_Free_protocol_info(), and returns DYAD_RC_OK.

If the protocol is not available, logs an error and attempts a second call to NA_Get_protocol_info() with NULL to enumerate all available protocols. Each available protocol is logged at debug level to help the user identify a valid alternative. The protocol info is freed and DYAD_RC_MARGO_BAD_PROTO is returned regardless of whether the enumeration succeeds.

If the enumeration call also fails or returns no protocols, logs a message indicating that no NA protocol is available at all and returns DYAD_RC_MARGO_BAD_PROTO.

This function is called during Margo DTL initialization to fail fast with a descriptive error message rather than letting margo_init() fail with a cryptic error.

Parameters:
  • ctx[in] DYAD context.

  • protocol[in] Mercury/NA protocol string to validate, e.g. "ofi+tcp" or "ofi+verbs". If NULL, behavior is undefined — callers should always pass a non-NULL string.

Return values:
  • DYAD_RC_OK – The protocol is available on this system.

  • DYAD_RC_MARGO_BAD_PROTO – The protocol is not available, or NA_Get_protocol_info() failed.

Returns:

dyad_rc_t return code:
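The control flow of the validation (accept if available, otherwise report the failure and list alternatives) can be sketched without the NA layer by passing the available protocols in as a list. mini_validate_protocol() and its avail parameter are hypothetical; in the real function that list comes from NA_Get_protocol_info().

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

typedef enum { MINI_RC_OK = 0, MINI_RC_MARGO_BAD_PROTO } mini_rc_t;

/* avail: NULL-terminated list standing in for what the NA layer reports. */
mini_rc_t mini_validate_protocol(const char *protocol,
                                 const char *const *avail)
{
    for (size_t i = 0; avail[i] != NULL; i++) {
        if (strcmp(avail[i], protocol) == 0)
            return MINI_RC_OK;
    }

    /* Not found: report it, then help the user pick an alternative. */
    fprintf(stderr, "protocol '%s' is not available\n", protocol);
    if (avail[0] == NULL)
        fprintf(stderr, "no NA protocol is available\n");
    for (size_t i = 0; avail[i] != NULL; i++)
        fprintf(stderr, "  available: %s\n", avail[i]);

    return MINI_RC_MARGO_BAD_PROTO;
}
```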

dyad_rc_t dyad_dtl_margo_init(const dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)

Initializes the Margo DTL internal state.

Allocates and populates the dyad_dtl_margo internal state struct, initializes the Margo instance, registers the data_ready_rpc RPC, and wires all function pointers in ctx->dtl_handle to their Margo implementations:

  • rpc_pack -> dyad_dtl_margo_rpc_pack

  • rpc_unpack -> dyad_dtl_margo_rpc_unpack

  • rpc_respond -> dyad_dtl_margo_rpc_respond

  • rpc_recv_response -> dyad_dtl_margo_rpc_recv_response

  • get_buffer -> dyad_dtl_margo_get_buffer

  • return_buffer -> dyad_dtl_margo_return_buffer

  • establish_connection -> dyad_dtl_margo_establish_connection

  • send -> dyad_dtl_margo_send

  • recv -> dyad_dtl_margo_recv

  • close_connection -> dyad_dtl_margo_close_connection

The Mercury/NA protocol is read from the DYAD_MARGO_PROTO environment variable (DYAD_MARGO_PROTO_ENV). If not set, it defaults to "ofi+tcp". The protocol is validated via validate_margo_protocol() before margo_init() is called.
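Reading the protocol with a default is a one-liner around getenv(). The helper name mini_margo_protocol() is hypothetical, and treating an empty string as "set" is an assumption, since the text above only describes the unset case.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L
#endif
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Returns the value of DYAD_MARGO_PROTO, or "ofi+tcp" when it is unset.
 * The returned string must not be freed by the caller. */
const char *mini_margo_protocol(void)
{
    const char *proto = getenv("DYAD_MARGO_PROTO");
    return (proto != NULL) ? proto : "ofi+tcp";
}
```

For example, exporting DYAD_MARGO_PROTO=ofi+verbs before launching would select the verbs provider, provided it passes validation.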

Margo is initialized differently depending on comm_mode:

  • DYAD_COMM_SEND (producer / Flux broker side): Margo is initialized in MARGO_CLIENT_MODE with no dedicated progress ES and no RPC handler ES. The data_ready_rpc RPC is registered without a handler (NULL), since the producer only sends RPCs.

  • DYAD_COMM_RECV (consumer / client wrapper side): Margo is initialized in MARGO_SERVER_MODE with a dedicated Execution Stream (ES) for the Mercury progress loop (use_progress_thread=1) and RPC handlers running in that same ES (rpc_thread_count=-1). The RPC named "data_ready_rpc" is registered with data_ready_rpc() as its handler function, and the margo_handle is registered as auxiliary data accessible to the handler via margo_registered_data().

Both modes retrieve their own local Margo address via margo_addr_self() and initialize remote_addr to NULL (it is set later, when dyad_dtl_margo_rpc_unpack() resolves the consumer’s address).

On any error, dyad_dtl_margo_finalize() is called to clean up partially initialized state before returning.

Todo:

Remove mode parameter — it is already stored in ctx->dtl_handle->mode by dyad_dtl_init() before this function is called.

Note

RPC handlers run in the same Argobots Execution Stream (ES) as the Mercury progress loop (rpc_thread_count=-1), meaning no additional ES is created for handler execution. This is safe because the consumer blocks in a busy-wait loop on margo_handle->recv_ready until data_ready_rpc() signals completion — there is no concurrent work that could be starved by sharing the progress loop ES with the handler.

Parameters:
  • ctx[in] DYAD context. ctx->dtl_handle must already be allocated by dyad_dtl_init().

  • mode[in] DTL mode (must be DYAD_DTL_MARGO). Redundant since dyad_dtl_init() already stores the mode in ctx->dtl_handle->mode before dispatch (see TODO).

  • comm_mode[in] Communication direction. DYAD_COMM_SEND initializes Margo as a client (producer side); DYAD_COMM_RECV initializes Margo as a server (consumer side).

  • debug[in] If true, enables verbose debug logging.

Return values:
  • DYAD_RC_OK – Initialization succeeded.

  • DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl_margo struct.

  • DYAD_RC_MARGO_BAD_PROTO – The configured Mercury/NA protocol is not available on this system.

  • DYAD_RC_MARGOINIT_FAIL – margo_init() failed, or any other error occurred during initialization.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_margo_rpc_pack(const dyad_ctx_t *ctx, const char *upath, uint32_t producer_rank, void ***packed_obj)

Packs a file fetch request into a JSON object for a Margo RPC call.

Creates a Jansson JSON object containing the fields needed by the producer to locate the file and establish a Margo connection back to the consumer:

  • "upath" — relative path of the file to fetch.

  • "tag_prod" — Flux rank of the producer broker, used to route the Flux RPC to the correct broker.

  • "pid_cons" — process ID of the consumer, used by the producer to identify the requesting process.

  • "addr" – Margo address string of the consumer’s Margo server, obtained via margo_addr_to_string(). The producer uses this address to send the data_ready_rpc RPC back to the consumer, whose handler then pulls the file data from the producer via RDMA.

Unlike the Flux RPC backend, which only packs upath, the Margo backend must also include the consumer’s Margo server address: the transfer is initiated by the producer sending an RPC back to the consumer’s Margo server, after which the consumer pulls the data from the producer’s registered buffer (HG_BULK_PULL).

The caller is responsible for decrementing the reference count of packed_obj via json_decref() when it is no longer needed.

Parameters:
  • ctx[in] DYAD context.

  • upath[in] Relative path of the file to fetch.

  • producer_rank[in] Flux rank of the producer broker.

  • packed_obj[out] Set to the allocated JSON object on success. Undefined on failure.

Return values:
  • DYAD_RC_OK – The JSON object was created successfully.

  • DYAD_RC_BADPACK – json_pack() failed to create the object.

Returns:

dyad_rc_t return code:
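To make the shape of the payload concrete, the sketch below builds the four-field object as text with snprintf(). This is a dependency-free stand-in, not how the backend does it: the function described above uses Jansson's json_pack(). The helper name mini_margo_payload() is hypothetical, and the sketch assumes upath and addr contain no characters that would need JSON escaping.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Writes the payload into out. Returns 0 on success, -1 if it does not fit.
 * Assumes upath and addr need no JSON escaping (illustration only). */
int mini_margo_payload(char *out, size_t cap, const char *upath,
                       uint32_t producer_rank, long consumer_pid,
                       const char *addr)
{
    int n = snprintf(out, cap,
                     "{\"upath\": \"%s\", \"tag_prod\": %u, "
                     "\"pid_cons\": %ld, \"addr\": \"%s\"}",
                     upath, (unsigned)producer_rank, consumer_pid, addr);
    return (n >= 0 && (size_t)n < cap) ? 0 : -1;
}
```

The field names match the list above; the values passed in by a caller (path, rank, pid, address string) are whatever the consumer has at hand when it issues the fetch request.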

dyad_rc_t dyad_dtl_margo_rpc_unpack(const dyad_ctx_t *ctx, const void **msg, char **upath)

Unpacks a file fetch request from an incoming Flux RPC message and resolves the consumer’s Margo address.

Extracts the following fields from the JSON payload of msg using flux_request_unpack():

  • "upath" — relative path of the requested file.

  • "tag_prod" — Flux rank of the producer broker (extracted but not used on the producer side).

  • "pid_cons" — process ID of the consumer (extracted but not used on the producer side).

  • "addr" — Margo address string of the consumer’s Margo server.

After successful unpacking, resolves the consumer’s Margo address string to a hg_addr_t via margo_addr_lookup() and stores it in margo_handle->remote_addr. This address is used by dyad_dtl_margo_send() to establish the RDMA connection back to the consumer.

Note

upath is owned by the Flux message msg and must not be freed by the caller. It remains valid only for the lifetime of msg.

Note

tag_prod and pid_cons are unpacked for protocol completeness but are not used on the producer side. They may be useful for debugging or future extensions.

Note

Unlike the Flux RPC backend, the Margo backend does not store msg in the DTL handle after unpacking. In the Flux RPC backend, msg is stored because it is reused across multiple flux_respond_raw() calls on the same streaming RPC. This backend does not use streaming RPC — msg is only needed to extract the initial request fields, and the data transfer proceeds over a separate Margo RDMA channel independently of the Flux message.

Parameters:
  • ctx[in] DYAD context.

  • msg[in] Incoming Flux RPC message containing the JSON payload packed by dyad_dtl_margo_rpc_pack().

  • upath[out] Set to the relative path of the requested file, extracted from the message payload. Valid for the lifetime of msg.

Return values:
  • DYAD_RC_OK – Unpacking and address resolution succeeded.

  • DYAD_RC_BADUNPACK – flux_request_unpack() failed to extract the required fields from the message.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_margo_rpc_respond(const dyad_ctx_t *ctx, const void **orig_msg)

Sends the initial RPC acknowledgement from the service to the consumer.

No-op for the Margo DTL. Unlike the Flux RPC backend, the Margo data transfer is driven by the producer connecting back to the consumer’s Margo server directly via RDMA, so no explicit acknowledgement over the Flux RPC channel is needed before data transfer begins.

Parameters:
  • ctx[in] DYAD context.

  • orig_msg[in] Unused by this backend.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_margo_rpc_recv_response(const dyad_ctx_t *ctx, void **f)

Receives the initial RPC response from the service.

No-op for the Margo DTL. The consumer does not need to process a Flux RPC response before data transfer begins — it waits directly on margo_handle->recv_ready, which is set by data_ready_rpc() after the RDMA pull completes.

Parameters:
  • ctx[in] Unused by this backend.

  • f[in] Unused by this backend.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_margo_establish_connection(const dyad_ctx_t *ctx)

Establishes the Margo DTL data channel.

No-op for the Margo DTL. The consumer’s Margo address is resolved to an hg_addr_t during dyad_dtl_margo_rpc_unpack() and stored in margo_handle->remote_addr. The producer connects directly to this address in dyad_dtl_margo_send() via margo_create() and margo_forward() without a separate connection setup step.

Endpoint caching is not needed since each producer-consumer pair performs a single data transfer per file fetch — the connection is created, used, and destroyed within a single dyad_dtl_margo_send() call.

Parameters:

ctx[in] DYAD context.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_margo_send(const dyad_ctx_t *ctx, void *buf, size_t buflen)

Sends file data to the consumer via Margo RDMA.

Registers buf as a read-only Mercury bulk handle via margo_bulk_create(), then sends an RPC to the consumer’s Margo server at margo_handle->remote_addr (resolved during dyad_dtl_margo_rpc_unpack()) via margo_forward(). The RPC payload contains the bulk handle and the buffer size, allowing the consumer’s data_ready_rpc() handler to perform an RDMA pull from the producer’s registered buffer.

margo_forward() blocks until the consumer responds, confirming that the RDMA pull is complete. The producer then frees the RPC output and destroys the handle.

Todo:

Add error checking for margo_bulk_create(), margo_create(), margo_forward(), and margo_get_output() — all currently unchecked.

Note

Unlike the Flux RPC backend where the producer needs the original request message (flux_msg_t) as a reply address to route flux_respond_raw() back to the correct consumer, this Margo-based backend uses an RDMA pull model. During dyad_dtl_margo_init(), the consumer initializes its Margo instance in MARGO_SERVER_MODE so that the producer can connect back to it. The consumer then embeds its own Margo address as a string in the Flux RPC request payload via dyad_dtl_margo_rpc_pack(). The producer extracts and resolves this address to an hg_addr_t via margo_addr_lookup() during dyad_dtl_margo_rpc_unpack(), storing it in margo_handle->remote_addr. This address is used here to connect back to the consumer and trigger the RDMA pull, without needing the original flux_msg_t.

Parameters:
  • ctx[in] DYAD context. margo_handle->remote_addr must already be set by dyad_dtl_margo_rpc_unpack().

  • buf[in] Buffer containing the file data to send.

  • buflen[in] Number of bytes in buf.

Return values:

DYAD_RC_OK – Always returned. Error handling for Margo calls is not yet implemented (see TODO).

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_margo_recv(const dyad_ctx_t *ctx, void **buf, size_t *buflen)

Receives file data from the producer via the Margo DTL.

Busy-waits on margo_handle->recv_ready, sleeping 100 microseconds between checks, until data_ready_rpc() sets it to 1 after completing the RDMA pull from the producer. Once the data is ready, allocates a new buffer, copies the received data from margo_handle->recv_buffer, and frees the buffer allocated by data_ready_rpc().

After copying, the Margo handle state is reset for the next transfer:

  • recv_buffer is freed and set to NULL.

  • recv_len is reset to 0.

  • recv_ready is reset to 0.

Todo:

Replace the busy-wait loop with a more efficient synchronization mechanism such as an Argobots eventual or condition variable.

Todo:

Add error checking for malloc() failure.

Note

The data flow for Margo receive is inverted compared to the Flux RPC backend. In the Flux RPC backend the producer pushes data by calling flux_respond_raw(), and the consumer reads it via flux_rpc_get_raw(). In this Margo-based backend the producer registers its buffer and notifies the consumer’s Margo server, which performs an RDMA pull into margo_handle->recv_buffer via data_ready_rpc(). The consumer then copies from that buffer here. The actual data movement therefore happens in data_ready_rpc() running on the progress loop ES, not in this function.

Note

The busy-wait with usleep(100) is a simple polling approach. A condition variable or Argobots synchronization primitive would be more efficient but would require additional coordination with the Margo progress loop ES (see TODO).

Parameters:
  • ctx[in] DYAD context.

  • buf[out] Set to a newly allocated buffer containing a copy of the received file data. The caller must free this buffer via ctx->dtl_handle->return_buffer().

  • buflen[out] Set to the number of bytes received.

Returns:

Always returns DYAD_RC_OK. Error handling for allocation failure and copy errors is not yet implemented (see TODO).
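The receive flow described above (poll a ready flag, copy the data out, reset the handle state) can be illustrated with a minimal single-process sketch. It is not the DYAD implementation: sketch_margo_state_t, sketch_data_ready(), and sketch_recv() are hypothetical names, a memcpy() stands in for the RDMA pull, and nanosleep() stands in for usleep(100). The sketch also adds the malloc() check that the TODO asks for.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

/* Hypothetical stand-in for the receive-side fields of the Margo handle. */
typedef struct {
    void *recv_buffer;     /* filled by the handler */
    size_t recv_len;       /* number of valid bytes in recv_buffer */
    atomic_int recv_ready; /* 0 = pending, 1 = data available */
} sketch_margo_state_t;

/* Stand-in for data_ready_rpc(): memcpy() models the RDMA pull. */
static int sketch_data_ready(sketch_margo_state_t *st, const void *src, size_t len)
{
    st->recv_buffer = malloc(len ? len : 1);
    if (st->recv_buffer == NULL)
        return -1;
    memcpy(st->recv_buffer, src, len);
    st->recv_len = len;
    atomic_store(&st->recv_ready, 1); /* publish only after the copy */
    return 0;
}

/* Stand-in for the receive path: poll, copy out, reset the state. */
static int sketch_recv(sketch_margo_state_t *st, void **buf, size_t *buflen)
{
    assert(st != NULL && buf != NULL && buflen != NULL);

    /* Sleep 100 microseconds between checks of the ready flag. */
    const struct timespec pause = {0, 100 * 1000};
    while (atomic_load(&st->recv_ready) == 0)
        nanosleep(&pause, NULL);

    void *copy = malloc(st->recv_len ? st->recv_len : 1);
    if (copy == NULL)
        return -1; /* allocation check, absent in the documented code */
    memcpy(copy, st->recv_buffer, st->recv_len);
    *buf = copy;
    *buflen = st->recv_len;

    /* Reset the state for the next transfer. */
    free(st->recv_buffer);
    st->recv_buffer = NULL;
    st->recv_len = 0;
    atomic_store(&st->recv_ready, 0);
    return 0;
}
```

In this model the caller owns the buffer returned through buf and releases it with free(), which corresponds to the return_buffer() obligation described for the real function.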

dyad_rc_t dyad_dtl_margo_close_connection(const dyad_ctx_t *ctx)

Closes the Margo DTL data channel.

No-op for the Margo DTL. The RPC handle created in dyad_dtl_margo_send() is destroyed within that function via margo_destroy() immediately after the transfer completes. No persistent connection state is maintained between transfers, since DYAD performs a single transfer per file fetch and therefore does not need endpoint caching.

Todo:

The original implementation intended to perform different teardown depending on comm_mode (producer vs consumer). This was never implemented and the code was commented out. Revisit whether comm_mode-specific teardown is needed, for example to free margo_handle->remote_addr on the producer side via margo_addr_free().

Parameters:

ctx[in] DYAD context.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_margo_finalize(const dyad_ctx_t *ctx)

Finalizes and frees the Margo DTL internal state.

Releases all resources associated with the Margo DTL in the following order:

  1. If margo_handle->mid is a valid Margo instance, frees the local Margo address via margo_addr_free().

  2. If margo_handle->remote_addr is non-NULL, frees the remote address (the consumer’s resolved Margo server address) via margo_addr_free().

  3. Finalizes the Margo instance via margo_finalize(), which shuts down the Mercury progress loop and any associated Argobots ESs.

  4. Frees the dyad_dtl_margo struct and sets the handle pointer to NULL.

If ctx->dtl_handle is NULL or ctx->dtl_handle->private_dtl.margo_dtl_handle is NULL, the function is a no-op and returns DYAD_RC_OK. This allows dyad_dtl_margo_finalize() to be called safely on a partially initialized or already-finalized context, for example when dyad_dtl_margo_init() fails partway through and calls this function to clean up.

Note

This function only frees the dyad_dtl_margo struct. The outer dyad_dtl handle is freed by dyad_dtl_finalize(), which calls this function as part of the full teardown sequence.

Parameters:

ctx[in] DYAD context. On return, ctx->dtl_handle->private_dtl.margo_dtl_handle is NULL.

Returns:

Always returns DYAD_RC_OK.

UCX

The UCX (Unified Communication X) DTL backend provides high-performance inter-node data movement using one-sided RDMA. UCX is an open-source communication library that abstracts over multiple high-speed network fabrics including InfiniBand, RoCE, and shared memory, exposing a unified API for remote memory access operations.

In DYAD’s UCX backend, the producer pushes file data directly into the consumer’s pre-registered memory buffer using ucp_put_nbx(), a non-blocking one-sided RDMA put operation. Unlike the Margo backend which uses a pull model (HG_BULK_PULL), the UCX backend uses a push model — the producer writes directly to the consumer’s memory without the consumer needing to initiate the transfer.

Before any data transfer can occur, the consumer pre-allocates and registers an RDMA buffer with UCX via ucp_mem_map() during initialization. The consumer’s buffer address and the associated remote key (ucp_rkey_t) are packed into the Flux RPC request payload (base64-encoded) and sent to the producer. The producer decodes these fields, unpacks the remote key via ucp_ep_rkey_unpack(), and uses them to perform the RDMA put directly into the consumer’s buffer without any intermediate copy.

To avoid the cost of repeated endpoint creation, the UCX backend maintains a per-producer endpoint cache (ucx_ep_cache_h) that maps consumer connection keys to ucp_ep_h endpoints. An endpoint is created on the first transfer to a given consumer and reused for all subsequent transfers to the same consumer within the same job, amortizing the connection establishment overhead across multiple file fetches.
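The create-on-first-use behaviour of the endpoint cache can be sketched with a small fixed-size table. This is only an illustration under stated assumptions: the real ucx_ep_cache_h is not shown in this document, so sketch_ep_cache_t, sketch_cache_init(), and sketch_cache_get() are hypothetical, an int stands in for ucp_ep_h, and a counter models endpoint creation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define SKETCH_CACHE_SLOTS 64 /* hypothetical fixed capacity */

typedef struct {
    uint64_t key; /* consumer connection key */
    int endpoint; /* stand-in for ucp_ep_h */
    int used;     /* non-zero once the slot holds an entry */
} sketch_slot_t;

typedef struct {
    sketch_slot_t slots[SKETCH_CACHE_SLOTS];
    int created; /* number of endpoints created so far */
} sketch_ep_cache_t;

static void sketch_cache_init(sketch_ep_cache_t *c)
{
    memset(c, 0, sizeof *c);
}

/* Returns the endpoint for key, creating it on first use.
 * Returns -1 if the table is full. Uses linear probing. */
static int sketch_cache_get(sketch_ep_cache_t *c, uint64_t key)
{
    assert(c != NULL);
    size_t start = (size_t)(key % SKETCH_CACHE_SLOTS);
    for (size_t i = 0; i < SKETCH_CACHE_SLOTS; i++) {
        sketch_slot_t *s = &c->slots[(start + i) % SKETCH_CACHE_SLOTS];
        if (s->used && s->key == key)
            return s->endpoint; /* hit: reuse the existing endpoint */
        if (!s->used) {
            s->used = 1;
            s->key = key;
            s->endpoint = c->created++; /* models endpoint creation */
            return s->endpoint;
        }
    }
    return -1;
}
```

Calling sketch_cache_get() twice with the same key returns the same endpoint and increments created only once, which is the amortization the paragraph above describes.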

The consumer detects the arrival of data by polling the first sizeof(ssize_t) bytes of its RDMA buffer, which the producer prepends with the file size before initiating the put. This sentinel value is initialized to zero at the start of each transfer and becomes non-zero once the producer begins writing, allowing the consumer to busy-wait without a UCX request handle since one-sided RDMA puts do not notify the target.
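The size-prefix sentinel can be modelled in a single process as follows. This is a sketch, not DYAD code: the function names are hypothetical and memcpy() stands in for the RDMA put. The sketch writes the payload before the header so that a non-zero header implies the payload is present in this model; it makes no claim about the write ordering of a real RDMA put.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h> /* ssize_t */

/* Consumer side: clear the size header before issuing a request. */
static void sketch_reset_sentinel(void *net_buf)
{
    memset(net_buf, 0, sizeof(ssize_t));
}

/* Producer side (stand-in for the RDMA put): the buffer layout is
 * [ssize_t length][payload bytes]. */
static void sketch_put(void *net_buf, const void *data, ssize_t len)
{
    assert(len > 0);
    memcpy((char *)net_buf + sizeof(ssize_t), data, (size_t)len);
    memcpy(net_buf, &len, sizeof len);
}

/* Consumer side: one polling step. Returns the payload length, or 0
 * when the header is still zero (nothing has arrived yet). */
static ssize_t sketch_poll(const void *net_buf, const void **payload)
{
    ssize_t len;
    memcpy(&len, net_buf, sizeof len);
    if (len == 0)
        return 0;
    *payload = (const char *)net_buf + sizeof(ssize_t);
    return len;
}
```

One consequence visible in the sketch is that a length of zero is indistinguishable from "no data yet", so the scheme as modelled here can only signal non-empty payloads.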

Defines

UCX_MAX_TRANSFER_SIZE

Maximum data size for a single UCX transfer (4 GiB).

UCX tag send/receive operations are limited to 4 GiB per transfer. Files larger than this limit must be split into multiple transfers.
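The arithmetic behind splitting a large file into transfers of at most 4 GiB can be sketched as below. The helper names and SKETCH_MAX_TRANSFER are hypothetical, and this document does not state whether DYAD performs such splitting itself or leaves it to the caller; the sketch shows only the size calculation.

```c
#include <assert.h>
#include <stdint.h>

/* 4 GiB, mirroring the documented limit (hypothetical name). */
#define SKETCH_MAX_TRANSFER (UINT64_C(4) << 30)

/* Number of transfers needed for file_size bytes (ceiling division). */
static uint64_t sketch_num_transfers(uint64_t file_size)
{
    if (file_size == 0)
        return 0;
    return (file_size - 1) / SKETCH_MAX_TRANSFER + 1;
}

/* Length in bytes of the transfer with the given zero-based index. */
static uint64_t sketch_chunk_len(uint64_t file_size, uint64_t index)
{
    assert(index < sketch_num_transfers(file_size));
    uint64_t offset = index * SKETCH_MAX_TRANSFER;
    uint64_t remaining = file_size - offset;
    return remaining < SKETCH_MAX_TRANSFER ? remaining : SKETCH_MAX_TRANSFER;
}
```

A file of exactly 4 GiB fits in one transfer, while one extra byte requires a second transfer of length 1.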

DYAD_UCX_TAG_MASK

Tag mask used for UCX tag send/receive operations.

Set to UINT64_MAX to match any tag, effectively disabling tag filtering. DYAD uses a single tag per transfer so no filtering is needed.

Typedefs

typedef struct ucx_request dyad_ucx_request_t

Functions

static void dyad_ucx_request_init(void *request)

Initializes a dyad_ucx_request_t allocated by UCX.

Registered with UCX via the request_init field of ucp_params_t during dyad_dtl_ucx_init(). Called automatically by UCX each time it allocates a new request object from its internal memory pool. Sets completed to 0 so that dyad_ucx_request_wait() can detect when the operation has finished.

Parameters:

request[in] Pointer to the uninitialized request memory allocated by UCX. Cast to dyad_ucx_request_t*.

static void dyad_send_callback(void *req, int status, void *ctx)

UCX send completion callback.

Registered as the completion callback for UCX tag send operations. Called by UCX when a send operation completes, either successfully or with an error. Sets real_req->completed to 1 so that dyad_ucx_request_wait() can detect completion and exit its polling loop.

The function signature differs between UCX API versions:

  • UCX >= 1.10: receives an additional ctx pointer argument.

  • UCX < 1.10: receives only req and status.

Parameters:
  • req[in] Pointer to the dyad_ucx_request_t for the completed operation.

  • status[in] Final status of the send operation.

  • ctx[in] User context pointer (UCX >= 1.10 only, unused).

static int dyad_ucx_request_wait(const dyad_ctx_t *ctx, dyad_ucx_request_t *request)

Waits for an async UCX operation to complete.

Handles the three possible return values of a UCX communication operation:

  • Immediate success (request == UCS_OK): the operation completed synchronously. Returns UCS_OK immediately.

  • Request handle (UCS_PTR_IS_PTR(request)): the operation is in progress. Spins by calling ucp_worker_progress() and polling ucp_request_check_status() until the status is no longer UCS_INPROGRESS, then frees the request via ucp_request_free() and returns the final status.

  • Immediate error (UCS_PTR_IS_ERR(request)): the operation failed immediately. Extracts the ucs_status_t error code via UCS_PTR_STATUS() and returns it.

The spin loop is expected to run only a small number of iterations because prior UCX calls in the send/receive path are structured to minimize the size of the worker’s event queue before this wait is called.
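The three-way dispatch on the returned value can be illustrated with a simplified model of a status-or-pointer return. The sk_ names below are hypothetical stand-ins, not the UCX definitions: NULL models immediate success, small negative codes cast to pointers model immediate errors, and any other pointer is treated as an in-flight request. A countdown replaces real worker progress.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical status codes; errors are negative, as in the model above. */
typedef enum {
    SK_OK = 0,
    SK_INPROGRESS = 1,
    SK_ERR_NOT_CONNECTED = -1,
    SK_ERR_LAST = -100
} sk_status_t;

/* Hypothetical request object: final status plus remaining polls. */
typedef struct {
    sk_status_t status;
    int polls_left;
} sk_request_t;

/* Error codes occupy the top of the address range when cast to pointers. */
static int sk_ptr_is_err(const void *p)
{
    return (uintptr_t)p >= (uintptr_t)(intptr_t)SK_ERR_LAST;
}

static int sk_ptr_is_ptr(const void *p)
{
    return p != NULL && !sk_ptr_is_err(p);
}

static void *sk_status_ptr(sk_status_t s)
{
    return (void *)(intptr_t)s;
}

/* Models one progress-and-check step on a request. */
static sk_status_t sk_check(sk_request_t *r)
{
    if (r->polls_left > 0) {
        r->polls_left--;
        return SK_INPROGRESS;
    }
    return r->status;
}

/* Mirrors the three cases: immediate success, immediate error, in flight. */
static sk_status_t sk_wait(void *request)
{
    if (request == NULL)
        return SK_OK;
    if (sk_ptr_is_err(request))
        return (sk_status_t)(intptr_t)request;
    assert(sk_ptr_is_ptr(request));
    sk_request_t *r = request;
    sk_status_t s;
    do {
        s = sk_check(r);
    } while (s == SK_INPROGRESS);
    return s;
}
```

Encoding the status in the pointer lets a single return value cover all three outcomes without an extra output parameter, which is why the wait function has to classify the value before dereferencing it.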

Parameters:
  • ctx[in] DYAD context. Used to access the UCX worker via ctx->dtl_handle->private_dtl.ucx_dtl_handle->ucx_worker.

  • request[in] Return value of a UCX communication operation (e.g. ucp_tag_send_nbx() or ucp_tag_recv_nbx()). May be UCS_OK, a request handle, or a UCX error pointer.

Return values:
  • UCS_OK – The operation completed successfully.

  • other – A UCX error code indicating failure.

Returns:

ucs_status_t final status of the operation:

static dyad_rc_t ucx_allocate_buffer(const dyad_ctx_t *ctx, dyad_dtl_ucx_t *dtl_handle, dyad_dtl_comm_mode_t comm_mode)

Allocates and registers a UCX memory buffer for RDMA operations.

Allocates a host memory buffer of dtl_handle->max_transfer_size + sizeof(size_t) bytes using ucp_mem_map() with UCP_MEM_MAP_ALLOCATE, which lets UCX allocate and register the memory in one step. The extra sizeof(size_t) bytes at the start of the buffer hold the file size, so the consumer can determine the data boundary without an additional RDMA operation.

The memory protection flags differ by communication direction:

  • DYAD_COMM_SEND (producer): UCP_MEM_MAP_PROT_LOCAL_READ — the producer only reads from the buffer locally to send data.

  • DYAD_COMM_RECV (consumer): UCP_MEM_MAP_PROT_REMOTE_WRITE — the buffer is exposed for remote write so the producer can push data into it via RDMA.

After mapping, queries the actual allocated address via ucp_mem_query() and stores it in dtl_handle->net_buf and dtl_handle->cons_buf_ptr. Then packs the remote key for the registered memory region via ucp_rkey_pack(), storing the packed key in dtl_handle->rkey_buf and its size in dtl_handle->rkey_size. The remote key is later sent to the producer (encoded in base64) so it can perform the RDMA push into the consumer’s buffer.

On any failure after a successful ucp_mem_map(), the memory is unmapped via ucp_mem_unmap() before returning.

Parameters:
  • ctx[in] DYAD context. Used for logging.

  • dtl_handle[inout] UCX DTL internal state. On success, net_buf, cons_buf_ptr, mem_handle, rkey_buf, and rkey_size are populated.

  • comm_mode[in] Communication direction. Controls the memory protection flags. Must not be DYAD_COMM_NONE.

Return values:
  • DYAD_RC_OK – Buffer allocated and registered successfully.

  • DYAD_RC_NOCTX – dtl_handle->ucx_ctx is NULL.

  • DYAD_RC_BAD_COMM_MODE – dtl_handle->comm_mode is DYAD_COMM_NONE.

  • DYAD_RC_UCXMMAP_FAIL – ucp_mem_map() or ucp_mem_query() failed.

  • DYAD_RC_UCXRKEY_PACK_FAILED – ucp_rkey_pack() failed.

  • DYAD_RC_BADBUF – Default error before any specific check is reached.

Returns:

dyad_rc_t return code:

static dyad_rc_t ucx_free_buffer(const dyad_ctx_t *ctx, void *ucp_ctx, void *mem_handle, void **buf)

Releases a UCX RDMA-registered memory buffer.

Unmaps and deregisters the memory region identified by mem_handle via ucp_mem_unmap(), which both frees the UCX memory registration and releases the underlying memory allocated by ucp_mem_map() with UCP_MEM_MAP_ALLOCATE. Sets *buf to NULL after unmapping.

Validates all three inputs before proceeding:

  • If ucp_ctx is NULL, returns DYAD_RC_NOCTX.

  • If mem_handle is NULL, returns DYAD_RC_UCXMMAP_FAIL.

  • If buf or *buf is NULL, returns DYAD_RC_BADBUF.

Note

Unlike dyad_dtl_flux_return_buffer() and dyad_dtl_margo_return_buffer() which use free(), this function uses ucp_mem_unmap() because the buffer was allocated and registered by UCX via ucp_mem_map() with UCP_MEM_MAP_ALLOCATE. Calling free() directly on a UCX-managed buffer would bypass UCX’s memory registration and cause undefined behavior.

Parameters:
  • ctx[in] DYAD context. Used for logging.

  • ucp_ctx[in] UCX context used to unmap the memory. Must not be NULL.

  • mem_handle[in] UCX memory handle returned by ucp_mem_map(). Must not be NULL.

  • buf[inout] Pointer to the buffer to release. *buf must be non-NULL on entry. Set to NULL on success.

Return values:
  • DYAD_RC_OK – Buffer unmapped and released successfully.

  • DYAD_RC_NOCTX – ucp_ctx is NULL.

  • DYAD_RC_UCXMMAP_FAIL – mem_handle is NULL.

  • DYAD_RC_BADBUF – buf or *buf is NULL.

Returns:

dyad_rc_t return code:

static inline void *ucx_send_no_wait(const dyad_ctx_t *ctx, bool is_warmup, void *buf, size_t buflen)

Initiates a non-blocking UCX RDMA push operation.

Performs a one-sided RDMA put of buf into the consumer’s pre-registered memory buffer using ucp_put_nbx(). The operation proceeds in three steps:

  1. Unpacks the consumer’s remote key from dtl_handle->rkey_buf via ucp_ep_rkey_unpack(), binding it to the endpoint dtl_handle->ep. The unpacked key is stored in dtl_handle->rkey.

  2. Initiates the RDMA put via ucp_put_nbx() with dyad_send_callback registered as the completion callback. The destination address is dtl_handle->cons_buf_ptr, which points to the consumer’s pre-registered RDMA buffer (set during connection establishment).

  3. Returns the ucs_status_ptr_t from ucp_put_nbx() without waiting for completion. The caller must pass the returned pointer to dyad_ucx_request_wait() to block until the operation finishes.

If the endpoint is NULL or ucp_ep_rkey_unpack() fails, the function returns a ucs_status_ptr_t encoding UCS_ERR_NOT_CONNECTED without initiating the put.

Note

The is_warmup parameter is accepted for interface consistency with the receive path (ucx_recv_no_wait()) but is not used — no warmup distinction is made on the send path.

Note

This function confirms the push RDMA model used by the UCX backend: the producer pushes data directly into the consumer’s pre-registered memory buffer via ucp_put_nbx(), in contrast to the Margo backend which uses a pull model where the consumer pulls from the producer’s buffer via HG_BULK_PULL.

Parameters:
  • ctx[in] DYAD context. The UCX endpoint, remote key buffer, and consumer buffer pointer are read from the UCX DTL internal state.

  • is_warmup[in] Unused. Accepted for interface consistency with the receive path.

  • buf[in] Local buffer containing the data to send.

  • buflen[in] Number of bytes to send.

Return values:
  • UCS_OK – The operation completed immediately and successfully (returned as a status, not a pointer).

  • valid pointer – The operation is in progress. Pass to dyad_ucx_request_wait() to wait for completion.

  • UCS_ERR_NOT_CONNECTED – The endpoint is NULL, ucp_ep_rkey_unpack() failed, or ucp_put_nbx() returned an error.

Returns:

ucs_status_ptr_t:

static inline void *ucx_recv_no_wait(const dyad_ctx_t *ctx, bool is_warmup, void **buf, size_t *buflen)

Waits for incoming RDMA data by polling the consumer’s pre-registered buffer.

Busy-polls the first sizeof(ssize_t) bytes of dtl_handle->net_buf, which the producer prepends with the file size before initiating the RDMA push via ucp_put_nbx(). The consumer spins until this value becomes non-zero, indicating that the producer has started writing data into the buffer.

On each iteration the UCX worker is progressed via ucp_worker_progress() to process incoming network events, and the thread sleeps for 10 microseconds to avoid saturating the CPU.

This approach works because the UCX backend uses a push RDMA model — the producer writes directly into the consumer’s pre-registered memory buffer (dtl_handle->net_buf) via ucp_put_nbx(). The prepended file size acts as a sentinel: once the producer has initiated the RDMA put, the first bytes of the buffer are non-zero, signalling to the consumer that data is arriving.

Todo:

Replace the busy-poll with a more efficient notification mechanism. The current approach wastes CPU cycles and adds latency from the 10-microsecond sleep. UCX Active Messages or a lightweight atomic flag could provide lower-latency completion notification.

Note

Unlike dyad_ucx_request_wait() which polls a UCX request handle for a specific operation, this function polls the buffer contents directly. This is necessary because the consumer has no UCX request handle for the producer’s ucp_put_nbx() call — the put is one-sided and the consumer is not notified by UCX when it completes.

Note

The buf and buflen output parameters are accepted for interface consistency but are not populated by this function. The actual buffer pointer and length are read from dtl_handle->net_buf and the prepended size field by the caller (dyad_dtl_ucx_recv()) after this function returns.

Note

The is_warmup parameter controls whether this is a warmup iteration used to prime the RDMA connection before the actual data transfer. During warmup the buffer sentinel check still applies but the received data is discarded by the caller.

Parameters:
  • ctx[in] DYAD context. The UCX worker and network buffer are read from the UCX DTL internal state.

  • is_warmup[in] If true, this is a warmup receive used to prime the RDMA connection.

  • buf[out] Unused by this function. Populated by the caller after return.

  • buflen[out] Unused by this function. Populated by the caller after return.

Returns:

Always returns NULL. The actual data is read directly from dtl_handle->net_buf by the caller.

static dyad_rc_t ucx_warmup(const dyad_ctx_t *ctx)

Performs a UCX connection warmup by sending a 1-byte message to self.

Warms up the UCX RDMA connection by performing a loopback send from the local worker to itself. This primes the UCX connection machinery — endpoint creation, remote key exchange, and RDMA registration — so that the first real data transfer does not pay the full connection establishment cost.

The warmup sequence is:

  1. Allocates a 1-byte UCX-registered send buffer via dyad_dtl_ucx_get_buffer() and a plain 1-byte receive buffer via malloc().

  2. Connects to the local worker’s own address via ucx_connect(), creating a loopback endpoint.

  3. Initiates a non-blocking 1-byte RDMA put to self via ucx_send_no_wait() with is_warmup=true.

  4. Waits for the send to complete via dyad_ucx_request_wait().

  5. Disconnects from self via ucx_disconnect() and frees both buffers.

On any failure, both buffers are freed before the corresponding error code is returned.

Note

The receive buffer is allocated with malloc() rather than dyad_dtl_ucx_get_buffer() because the warmup receive is not a real RDMA operation — the loopback send writes into the UCX-registered send buffer itself, and the plain receive buffer is only allocated to mirror the real transfer path without being used.

Note

The warmup is only performed on the producer side during dyad_dtl_ucx_init(). The consumer side does not need a warmup because it waits passively for the producer to initiate the connection.

Parameters:

ctx[in] DYAD context. The UCX worker, local address, and endpoint are read from and written to the UCX DTL internal state.

Return values:
  • DYAD_RC_OK – Warmup completed successfully.

  • DYAD_RC_SYSFAIL – Failed to allocate the plain receive buffer via malloc().

  • DYAD_RC_UCXCOMM_FAIL – The warmup send operation failed.

  • other – Any error code returned by dyad_dtl_ucx_get_buffer() or ucx_connect().

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_init(const dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)

Initializes the UCX DTL internal state.

Allocates and populates the dyad_dtl_ucx internal state struct, initializes the UCX context and worker, allocates the RDMA-registered buffer, performs a connection warmup, and wires all function pointers in ctx->dtl_handle to their UCX implementations:

  • rpc_pack → dyad_dtl_ucx_rpc_pack

  • rpc_unpack → dyad_dtl_ucx_rpc_unpack

  • rpc_respond → dyad_dtl_ucx_rpc_respond

  • rpc_recv_response → dyad_dtl_ucx_rpc_recv_response

  • get_buffer → dyad_dtl_ucx_get_buffer

  • return_buffer → dyad_dtl_ucx_return_buffer

  • establish_connection → dyad_dtl_ucx_establish_connection

  • send → dyad_dtl_ucx_send

  • recv → dyad_dtl_ucx_recv

  • close_connection → dyad_dtl_ucx_close_connection

Initialization proceeds in the following order:

  1. Allocates the dyad_dtl_ucx struct and initializes all fields to safe defaults (NULL, 0, or UCX_MAX_TRANSFER_SIZE). The Flux handle is borrowed from ctx->h as a non-owning pointer.

  2. Reads the UCX configuration via ucp_config_read().

  3. Initializes the UCX context via ucp_init() with the following features enabled:

    • UCP_FEATURE_RMA — Remote Memory Access for RDMA push.

    • UCP_FEATURE_AMO32 — 32-bit atomic memory operations.

    • UCP_FEATURE_TAG — Tag-matching send/receive.

    The request size is set to sizeof(dyad_ucx_request_t) and dyad_ucx_request_init is registered as the request initializer. If debug is true, the UCX configuration is printed to stderr before the config is released.

  4. Creates a UCX worker via ucp_worker_create() with UCS_THREAD_MODE_SERIALIZED — all UCX calls must be made from a single thread at a time.

  5. Queries the worker’s local address via ucp_worker_get_address(), storing it in dtl_handle->local_address. This address is sent to the remote peer during connection establishment so the peer can create an endpoint back to this worker.

  6. Initializes the endpoint cache via dyad_ucx_ep_cache_init(). The cache stores ucp_ep_h endpoints keyed by remote worker address to avoid recreating endpoints for repeated transfers to the same peer.

  7. Allocates and registers an RDMA buffer of UCX_MAX_TRANSFER_SIZE + sizeof(size_t) bytes via ucx_allocate_buffer(). The extra sizeof(size_t) bytes hold a prepended file size sentinel used by the consumer to detect when the producer has started the RDMA push.

  8. Wires all DTL function pointers.

  9. Performs a loopback connection warmup via ucx_warmup() to prime the UCX connection machinery before the first real transfer. After warmup, dtl_handle->ep is reset to NULL.

On any error, dyad_dtl_ucx_finalize() is called to clean up partially initialized state before returning.

Todo:

Remove mode parameter — it is already stored in ctx->dtl_handle->mode by dyad_dtl_init() before this function is called.

Parameters:
  • ctx[in] DYAD context. ctx->dtl_handle must already be allocated by dyad_dtl_init().

  • mode[in] DTL mode (must be DYAD_DTL_UCX). Redundant since dyad_dtl_init() already stores the mode in ctx->dtl_handle->mode before dispatch (see TODO).

  • comm_mode[in] Communication direction. Controls the memory protection flags for the RDMA buffer: DYAD_COMM_SEND (producer) uses UCP_MEM_MAP_PROT_LOCAL_READ; DYAD_COMM_RECV (consumer) uses UCP_MEM_MAP_PROT_REMOTE_WRITE.

  • debug[in] If true, prints the UCX configuration to stderr and enables verbose debug logging.

Return values:
  • DYAD_RC_OK – Initialization succeeded.

  • DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl_ucx struct.

  • DYAD_RC_UCXINIT_FAIL – Any other initialization step failed, including ucp_config_read(), ucp_init(), ucp_worker_create(), endpoint cache initialization, buffer allocation, or warmup.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_rpc_pack(const dyad_ctx_t *ctx, const char *upath, uint32_t producer_rank, void ***packed_obj)

Packs a file fetch request into a JSON object for a UCX RPC call.

Creates a Jansson JSON object containing all information the producer needs to locate the file and perform an RDMA push into the consumer’s pre-registered buffer. Before packing, resets the first sizeof(ssize_t) bytes of dtl_handle->net_buf to zero so that ucx_recv_no_wait() can use this as a sentinel to detect when the producer has started writing.

The packed JSON object contains the following fields:

  • "upath" — relative path of the file to fetch.

  • "tag_prod" — Flux rank of the producer broker.

  • "cons_buf" — consumer’s RDMA buffer address (cons_buf_ptr) encoded as a decimal string. The producer uses this as the remote destination address for ucp_put_nbx().

  • "pid_cons" — process ID of the consumer.

  • "addr" — consumer’s UCX worker address, base64-encoded using RFC 4648. The producer decodes this address and uses it to create an endpoint back to the consumer (ucp_ep_create() in the UCX API).

  • "rkey" — consumer’s UCX remote key (rkey_buf), base64-encoded using RFC 4648. The producer calls ucp_ep_rkey_unpack() on this to obtain the ucp_rkey_t needed for ucp_put_nbx().

Both the UCX worker address and the remote key are opaque binary blobs that cannot be embedded directly in JSON. They are base64-encoded (RFC 4648) before packing and decoded by the producer in dyad_dtl_ucx_rpc_unpack(). The encoded buffers are freed after json_pack() copies them into the JSON object.
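To make the encoding step concrete, here is a minimal sketch of RFC 4648 base64 encoding (standard alphabet, with padding). It is not the encoder DYAD uses; sketch_b64_encode() is a hypothetical helper written for illustration only.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Encodes len bytes of src as a NUL-terminated base64 string.
 * The caller frees the result. Returns NULL on allocation failure. */
static char *sketch_b64_encode(const unsigned char *src, size_t len)
{
    static const char tbl[] =
        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    assert(src != NULL || len == 0);

    size_t out_len = 4 * ((len + 2) / 3);
    char *out = malloc(out_len + 1);
    if (out == NULL)
        return NULL;

    size_t i = 0, o = 0;
    /* Full 3-byte groups become 4 output characters. */
    while (i + 2 < len) {
        unsigned v = ((unsigned)src[i] << 16) | ((unsigned)src[i + 1] << 8)
                     | (unsigned)src[i + 2];
        out[o++] = tbl[(v >> 18) & 63];
        out[o++] = tbl[(v >> 12) & 63];
        out[o++] = tbl[(v >> 6) & 63];
        out[o++] = tbl[v & 63];
        i += 3;
    }
    /* A trailing group of 1 or 2 bytes is padded with '='. */
    if (i < len) {
        unsigned v = (unsigned)src[i] << 16;
        if (i + 1 < len)
            v |= (unsigned)src[i + 1] << 8;
        out[o++] = tbl[(v >> 18) & 63];
        out[o++] = tbl[(v >> 12) & 63];
        out[o++] = (i + 1 < len) ? tbl[(v >> 6) & 63] : '=';
        out[o++] = '=';
    }
    out[o] = '\0';
    return out;
}
```

Because the output uses only printable ASCII characters and is NUL-terminated, it can be placed in a JSON string field, which is what makes the binary address and remote key transportable in the request payload.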

Note

The UCX backend packs significantly more information than the Flux RPC or Margo backends because RDMA push requires the producer to know the consumer’s exact memory address and access credentials before initiating the transfer.

Parameters:
  • ctx[in] DYAD context.

  • upath[in] Relative path of the file to fetch.

  • producer_rank[in] Flux rank of the producer broker.

  • packed_obj[out] Set to the allocated JSON object on success. Undefined on failure. The caller is responsible for decrementing the reference count via json_decref() when no longer needed.

Return values:
  • DYAD_RC_OK – The JSON object was created successfully.

  • DYAD_RC_BADPACKdtl_handle->local_address is NULL, base64 encoding of the address or remote key failed, or json_pack() failed.

  • DYAD_RC_SYSFAIL – Failed to allocate the base64 encoding buffer for the address or remote key.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_rpc_unpack(const dyad_ctx_t *ctx, const void **msg, char **upath)

Unpacks a file fetch request from an incoming Flux RPC message and decodes the consumer’s UCX worker address and remote key.

Extracts the following fields from the JSON payload of msg using flux_request_unpack():

  • "upath" — relative path of the requested file.

  • "tag_prod" — Flux rank of the producer broker.

  • "cons_buf" — consumer’s RDMA buffer address as a decimal string. Parsed via atoll() and stored in dtl_handle->cons_buf_ptr as the remote destination address for ucp_put_nbx().

  • "pid_cons" — process ID of the consumer. Combined with tag_cons to form dtl_handle->consumer_conn_key, used as the endpoint cache lookup key.

  • "addr" — base64-encoded (RFC 4648) UCX worker address of the consumer. Decoded into a newly allocated dtl_handle->remote_address buffer.

  • "rkey" — base64-encoded (RFC 4648) UCX remote key. Decoded into a newly allocated dtl_handle->rkey_buf buffer.

After unpacking, both the consumer’s UCX worker address and remote key are base64-decoded from RFC 4648 encoding. The decoded address is used by dyad_dtl_ucx_establish_connection() to create a ucp_ep_h endpoint to the consumer (or retrieve a cached one). The decoded remote key is used by ucx_send_no_wait() via ucp_ep_rkey_unpack() to obtain the ucp_rkey_t needed for ucp_put_nbx().
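The "cons_buf" field carries a memory address as a decimal string. The round trip can be sketched as below, with two caveats: the helper names are hypothetical, and the sketch parses with strtoull() rather than the atoll() named above, because strtoull() is unsigned and reports malformed input. That substitution is a choice made for this illustration, not a description of the DYAD source.

```c
#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Formats addr as a decimal string. Returns 0 on success, -1 if out
 * is too small. */
static int sketch_addr_to_str(const void *addr, char *out, size_t out_len)
{
    int n = snprintf(out, out_len, "%" PRIuPTR, (uintptr_t)addr);
    return (n > 0 && (size_t)n < out_len) ? 0 : -1;
}

/* Parses a decimal string back into an address value. Returns 0 on
 * success, -1 on empty input, trailing garbage, or overflow. */
static int sketch_str_to_addr(const char *s, uintptr_t *addr)
{
    assert(s != NULL && addr != NULL);
    char *end = NULL;
    errno = 0;
    unsigned long long v = strtoull(s, &end, 10);
    if (errno != 0 || end == s || *end != '\0' || v > UINTPTR_MAX)
        return -1;
    *addr = (uintptr_t)v;
    return 0;
}
```

The parsed value is only meaningful as a remote address for an RDMA operation; the producer never dereferences it locally.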

The communication tag is computed as: dtl_handle->comm_tag = tag_prod << 32 | tag_cons

The endpoint cache key is computed as: dtl_handle->consumer_conn_key = pid << 32 | tag_cons
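Both formulas pack two values into one 64-bit word. A minimal sketch follows, assuming 32-bit inputs (the actual field widths are not stated here) and using hypothetical helper names. The cast to uint64_t before shifting matters: shifting a 32-bit value left by 32 bits is undefined behaviour in C.

```c
#include <assert.h>
#include <stdint.h>

/* Packs hi into the upper 32 bits and lo into the lower 32 bits. */
static uint64_t sketch_pack_key(uint32_t hi, uint32_t lo)
{
    /* Widen before shifting so the shift happens in 64 bits. */
    return ((uint64_t)hi << 32) | (uint64_t)lo;
}

/* Recovers the upper half of a packed key. */
static uint32_t sketch_key_hi(uint64_t key)
{
    return (uint32_t)(key >> 32);
}

/* Recovers the lower half of a packed key. */
static uint32_t sketch_key_lo(uint64_t key)
{
    return (uint32_t)(key & UINT32_MAX);
}
```

For example, sketch_pack_key(1, 2) yields 0x0000000100000002, from which both halves can be recovered independently.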

On any decoding failure the corresponding allocated buffer is freed and set to NULL before returning.

Note

Unlike the Margo backend, which calls margo_addr_lookup() to resolve the consumer’s address, the UCX backend allocates a raw buffer for the decoded binary address and passes it directly to ucp_ep_create() during connection establishment.

Note

upath is owned by the Flux message msg and must not be freed by the caller. It remains valid only for the lifetime of msg.

Parameters:
  • ctx[in] DYAD context.

  • msg[in] Incoming Flux RPC message containing the JSON payload packed by dyad_dtl_ucx_rpc_pack().

  • upath[out] Set to the relative path of the requested file. Valid for the lifetime of msg.

Return values:
  • DYAD_RC_OK – Unpacking and decoding succeeded.

  • DYAD_RC_BADUNPACKflux_request_unpack() failed.

  • DYAD_RC_SYSFAIL – Failed to allocate the buffer for the decoded address or remote key.

  • DYAD_RC_BAD_B64DECODE – Base64 decoding of the address or remote key failed.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_rpc_respond(const dyad_ctx_t *ctx, const void **orig_msg)

Sends the initial RPC acknowledgement from the service to the consumer.

No-op for the UCX DTL. As with the Margo backend, no explicit acknowledgement over the Flux RPC channel is needed before data transfer begins — the producer connects directly to the consumer’s UCX worker address and pushes data via ucp_put_nbx().

Parameters:
  • ctx[in] DYAD context.

  • orig_msg[in] Unused by this backend.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_ucx_rpc_recv_response(const dyad_ctx_t *ctx, void **f)

Receives the initial RPC response from the service.

No-op for the UCX DTL. The consumer does not process a Flux RPC response before data transfer begins — it waits directly on the sentinel value in dtl_handle->net_buf via ucx_recv_no_wait(), which polls until the producer’s RDMA push has started writing data.

Parameters:
  • ctx[in] Unused by this backend.

  • f[in] Unused by this backend.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_ucx_get_buffer(const dyad_ctx_t *ctx, size_t data_size, void **data_buf)

Returns a pointer to the pre-allocated UCX RDMA buffer.

Unlike the Flux RPC and Margo backends, which allocate a new buffer on each call, the UCX backend uses a single pre-allocated, RDMA-registered buffer (dtl_handle->net_buf) allocated during dyad_dtl_ucx_init(). This function simply validates data_size and sets *data_buf to point to that pre-allocated buffer.

Reusing a pre-allocated buffer avoids the overhead of repeated ucp_mem_map() registrations, which are expensive because they pin memory and register it with the network hardware.
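
The size check and pointer hand-off described above might look roughly like this. The struct, enum, and function names are stand-ins invented for the sketch; the real handle carries many more fields.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in return codes and handle; DYAD's real types have more fields.
enum mock_rc { MOCK_RC_OK = 0, MOCK_RC_BADBUF = 1 };

struct mock_ucx_handle {
    void *net_buf;                 // pre-allocated, registered once at init
    std::size_t max_transfer_size; // capacity of net_buf in bytes
};

// Hand out the single pre-allocated buffer if the request fits.
mock_rc mock_get_buffer(const mock_ucx_handle *h, std::size_t data_size, void **data_buf)
{
    if (data_size > h->max_transfer_size)
        return MOCK_RC_BADBUF;
    *data_buf = h->net_buf; // no allocation and no new registration
    return MOCK_RC_OK;
}
```

Every successful call returns the same address, so two callers holding the buffer at once would alias each other's data.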

Todo:

Investigate and fix the *data_buf != NULL validation check that incorrectly fires for valid NULL-initialized pointers.

Note

The buffer validation check (data_buf == NULL or *data_buf != NULL) is currently disabled due to a known issue where the *data_buf != NULL check incorrectly evaluates to true even when data_buf points to a NULL pointer (see TODO in source).

Note

The caller must not free *data_buf. It points into the UCX-registered memory region managed by dtl_handle and must be released via dyad_dtl_ucx_return_buffer(), which only clears the caller’s pointer and frees nothing (the buffer itself is freed only during finalization).

Parameters:
  • ctx[in] DYAD context.

  • data_size[in] Number of bytes needed. Must not exceed dtl_handle->max_transfer_size (UCX_MAX_TRANSFER_SIZE).

  • data_buf[out] Set to dtl_handle->net_buf on success.

Return values:
  • DYAD_RC_OK – *data_buf set to the pre-allocated buffer.

  • DYAD_RC_BADBUF – data_size exceeds max_transfer_size.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_return_buffer(const dyad_ctx_t *ctx, void **data_buf)

Releases a reference to the UCX pre-allocated RDMA buffer.

Unlike the Flux RPC and Margo backends which free() the buffer, the UCX backend only sets *data_buf to NULL since the buffer is pre-allocated and RDMA-registered during dyad_dtl_ucx_init() and must persist for the lifetime of the DTL handle. The actual memory is released by ucx_free_buffer() during finalization.

Validates data_buf before clearing:

  • If data_buf is NULL or *data_buf is NULL, returns DYAD_RC_BADBUF.
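
As a hedged illustration of the validate-then-clear behavior (names are hypothetical stand-ins, not the DYAD implementation):

```cpp
#include <cassert>

enum mock_rc { MOCK_RC_OK = 0, MOCK_RC_BADBUF = 1 };

// Drop the caller's reference without freeing the underlying memory.
mock_rc mock_return_buffer(void **data_buf)
{
    if (data_buf == nullptr || *data_buf == nullptr)
        return MOCK_RC_BADBUF;
    *data_buf = nullptr; // the buffer itself lives until finalization
    return MOCK_RC_OK;
}
```

Returning the same pointer twice fails the second time, because the first call already cleared it.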

Parameters:
  • ctx[inout] DYAD context.

  • data_buf[inout] Pointer to the buffer pointer to clear. *data_buf is set to NULL on success.

Return values:
  • DYAD_RC_OK – *data_buf cleared successfully.

  • DYAD_RC_BADBUF – data_buf or *data_buf is NULL.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_establish_connection(const dyad_ctx_t *ctx)

Establishes a UCX endpoint connection for data transfer.

Behavior differs by communication direction:

  • DYAD_COMM_SEND (producer): Looks up the consumer’s endpoint in the endpoint cache via dyad_ucx_ep_cache_find(), keyed by dtl_handle->consumer_conn_key (dtl_handle->remote_address is passed along but is not used for the lookup). If no cached endpoint exists, creates a new one via dyad_ucx_ep_cache_insert(), which calls ucx_connect() (and through it ucp_ep_create()) and stores the result in the cache. The endpoint is stored in dtl_handle->ep for use by ucx_send_no_wait(). If debug is true, prints endpoint information to stderr via ucp_ep_print_info().

  • DYAD_COMM_RECV (consumer): No-op. The consumer does not need to create an endpoint — it passively waits for the producer to push data into the pre-registered RDMA buffer via ucp_put_nbx().

The endpoint cache avoids recreating ucp_ep_h objects for repeated transfers to the same consumer, which is significant because UCX endpoint creation involves connection establishment with the remote worker and is expensive relative to the data transfer itself.

Parameters:

ctx[in] DYAD context. The remote address, endpoint cache, UCX worker, and endpoint pointer are read from and written to the UCX DTL internal state.

Return values:
  • DYAD_RC_OK – Connection established or not needed (consumer side).

  • DYAD_RC_BAD_COMM_MODE – dtl_handle->comm_mode is invalid.

  • other – Any error code returned by dyad_ucx_ep_cache_insert() if endpoint creation failed.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_send(const dyad_ctx_t *ctx, void *buf, size_t buflen)

Sends file data to the consumer via UCX RDMA push.

Initiates a non-blocking RDMA put via ucx_send_no_wait() with is_warmup=false, then blocks until the operation completes by calling dyad_ucx_request_wait(). The data is pushed directly into the consumer’s pre-registered RDMA buffer at dtl_handle->cons_buf_ptr using the endpoint and remote key established during dyad_dtl_ucx_establish_connection() and dyad_dtl_ucx_rpc_unpack().

Note

The producer prepends the file size to buf before calling this function (in dyad_fetch_request_cb()), so the consumer can use the first sizeof(size_t) bytes as a sentinel in ucx_recv_no_wait() to detect when the push has started.
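
The size-prefixed layout mentioned in the note can be sketched as below. This is an illustration under assumptions: the helper names are invented, and a std::vector stands in for the registered buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Build a buffer laid out as [file size][file bytes]. A non-empty file
// makes the leading size field non-zero, which is what lets the
// receiver treat it as a "data has arrived" sentinel.
std::vector<unsigned char> frame_payload(const void *data, std::size_t len)
{
    std::vector<unsigned char> out(sizeof(std::size_t) + len);
    std::memcpy(out.data(), &len, sizeof(len));
    if (len > 0)
        std::memcpy(out.data() + sizeof(len), data, len);
    return out;
}

// Read the size header back out of a framed buffer.
std::size_t framed_size(const unsigned char *buf)
{
    std::size_t len = 0;
    std::memcpy(&len, buf, sizeof(len));
    return len;
}
```

In this sketch an empty file would leave the header at zero, so a receiver that waits for a non-zero header could not distinguish it from "nothing has arrived yet".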

Parameters:
  • ctx[in] DYAD context.

  • buf[in] Buffer containing the file data to send. Must point to the pre-allocated UCX-registered buffer (dtl_handle->net_buf) since ucp_put_nbx() requires the source buffer to be registered with UCX.

  • buflen[in] Number of bytes to send.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_recv(const dyad_ctx_t *ctx, void **buf, size_t *buflen)

Receives file data from the producer via UCX RDMA push.

Calls ucx_recv_no_wait() with is_warmup=false to busy-poll the first sizeof(ssize_t) bytes of dtl_handle->net_buf until they become non-zero, indicating that the producer has started the RDMA push. After ucx_recv_no_wait() returns, the received data is available directly in dtl_handle->net_buf. This function does not populate buf or buflen; the caller must read the data from the pre-allocated buffer obtained via dyad_dtl_ucx_get_buffer().

Todo:

Add error handling for cases where the RDMA push fails or times out. The current implementation spins indefinitely if the sentinel never becomes non-zero.
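
One possible shape for a bounded version of the sentinel poll is sketched below. It is not the current implementation: the spin limit and the progress callback are assumptions added for illustration, and std::int64_t stands in for ssize_t, which is not a standard C++ type.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>

// Spin until *sentinel becomes non-zero or max_spins polls have elapsed.
// `progress` stands in for whatever drives the transport forward between
// polls. Returns true if the sentinel changed, false on timeout.
bool poll_sentinel(const volatile std::int64_t *sentinel,
                   std::size_t max_spins,
                   const std::function<void()> &progress)
{
    for (std::size_t i = 0; i < max_spins; ++i) {
        if (*sentinel != 0)
            return true;
        progress();
    }
    return *sentinel != 0; // one last look after the final progress call
}
```

A non-zero sentinel only shows that the header has landed; whether the rest of the payload is already visible depends on the ordering guarantees of the transport, which this sketch does not model.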

Note

The return value of ucx_recv_no_wait() is intentionally discarded ((void)stat_ptr) since it always returns NULL for the UCX backend — the actual completion is signalled by the buffer sentinel, not by a UCX request handle.

Note

Unlike the Flux RPC backend where buf and buflen are populated with a newly allocated copy of the received data, the UCX backend leaves data in the pre-allocated dtl_handle->net_buf. The caller reads it directly from there without an additional copy.

Parameters:
  • ctx[in] DYAD context.

  • buf[out] Not populated by this function. The received data is in dtl_handle->net_buf.

  • buflen[out] Not populated by this function.

Returns:

Always returns DYAD_RC_OK.

dyad_rc_t dyad_dtl_ucx_close_connection(const dyad_ctx_t *ctx)

Closes the UCX DTL data channel after a transfer completes.

Behavior differs by communication direction:

  • DYAD_COMM_SEND (producer): Destroys the unpacked remote key via ucp_rkey_destroy() — the remote key is unpacked per-transfer in ucx_send_no_wait() and must be destroyed after each send. Clears dtl_handle->ep, dtl_handle->remote_address, dtl_handle->remote_addr_len, and dtl_handle->comm_tag.

    The endpoint itself is not disconnected — it is retained in the endpoint cache for reuse in future transfers to the same consumer, avoiding the cost of reconnection. LRU eviction of stale endpoints from the cache is not yet implemented (see TODO).

    dtl_handle->remote_address is set to NULL but not freed — the pointer is still referenced by the endpoint cache entry and must remain valid until the cache entry is evicted (see TODO).

  • DYAD_COMM_RECV (consumer): No-op beyond resetting dtl_handle->comm_tag to 0. The consumer has no endpoint to close since it passively receives data via the pre-registered RDMA buffer without creating a ucp_ep_h.

Todo:

Implement LRU eviction for the endpoint cache to reclaim resources for endpoints that have not been used recently. The commented-out ucx_disconnect() call shows the intended disconnect path that would be used on eviction.

Todo:

Free dtl_handle->remote_address here once the endpoint cache no longer references it, either by copying the address into the cache entry or by reference-counting it.

Parameters:

ctx[in] DYAD context.

Return values:
  • DYAD_RC_OK – Connection closed successfully.

  • DYAD_RC_BAD_COMM_MODE – dtl_handle->comm_mode is invalid.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_dtl_ucx_finalize(const dyad_ctx_t *ctx)

Finalizes and frees the UCX DTL internal state.

Releases all UCX resources in the correct teardown order, guarding each step with a NULL check to allow safe partial finalization when called after a failed dyad_dtl_ucx_init():

  1. If dtl_handle->ep is non-NULL, closes the active connection via dyad_dtl_ucx_close_connection() to destroy the unpacked remote key and clear connection state, then sets ep to NULL.

  2. If dtl_handle->ep_cache is non-NULL, finalizes the endpoint cache via dyad_ucx_ep_cache_finalize(), which disconnects and destroys all cached ucp_ep_h endpoints, then sets ep_cache to NULL.

  3. If dtl_handle->local_address is non-NULL, releases the local worker address via ucp_worker_release_address() and sets it to NULL.

  4. If dtl_handle->mem_handle is non-NULL, unmaps and frees the pre-allocated RDMA buffer via ucx_free_buffer(), which calls ucp_mem_unmap() and sets dtl_handle->net_buf to NULL.

  5. If dtl_handle->ucx_worker is non-NULL, destroys the UCX worker via ucp_worker_destroy() and sets it to NULL.

  6. If dtl_handle->ucx_ctx is non-NULL, releases the UCX context via ucp_cleanup() and sets it to NULL.

  7. Sets dtl_handle->h to NULL. The Flux handle is non-owning and must not be closed here — it is managed by the DYAD context.

  8. Frees the dyad_dtl_ucx struct and sets the handle pointer to NULL.

If ctx->dtl_handle is NULL or ctx->dtl_handle->private_dtl.ucx_dtl_handle is NULL, the function is a no-op and returns DYAD_RC_OK.
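
The guarded, ordered teardown above can be modeled with a small mock. Boolean flags stand in for the real UCX handles, and all names are invented for the sketch; only the ordering and the skip-if-absent logic are meant to carry over.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Stand-in for the UCX DTL state: each flag says whether that resource
// was successfully created during init.
struct mock_state {
    bool ep_cache = false;
    bool local_address = false;
    bool mem_handle = false;
    bool worker = false;
    bool context = false;
};

// Release resources in dependency order, skipping any that were never
// created. Returns the names of the steps that actually ran.
std::vector<std::string> mock_finalize(mock_state &s)
{
    std::vector<std::string> log;
    auto release = [&log](bool &held, const char *name) {
        if (held) {
            log.push_back(name);
            held = false; // mirrors "sets it to NULL"
        }
    };
    release(s.ep_cache, "ep_cache");           // needs a live worker
    release(s.local_address, "local_address"); // released through the worker
    release(s.mem_handle, "mem_handle");       // needs a live context
    release(s.worker, "worker");
    release(s.context, "context");
    return log;
}
```

Calling the mock a second time does nothing, and calling it on a partially initialized state releases only what exists, which is the property that makes finalization safe after a failed init.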

Note

The endpoint cache must be finalized before the UCX worker is destroyed, since endpoint disconnection requires the worker to be active to flush pending operations.

Note

The RDMA buffer must be unmapped before the UCX context is cleaned up, since ucp_mem_unmap() requires a valid UCX context.

Note

This function only frees the dyad_dtl_ucx struct. The outer dyad_dtl handle is freed by dyad_dtl_finalize(), which calls this function as part of the full teardown sequence.

Parameters:

ctx[in] DYAD context. On return, ctx->dtl_handle->private_dtl.ucx_dtl_handle is NULL.

Returns:

Always returns DYAD_RC_OK.

Variables

const base64_maps_t base64_maps_rfc4648

Base64 encoding map for RFC 4648, used to encode/decode UCX remote keys for transmission over the Flux RPC channel.

struct ucx_request

Request struct used to track the completion of async UCX operations.

UCX allocates one instance of this struct per in-flight operation using dyad_ucx_request_init() as the initializer. The completed flag is set to 1 by the send callback dyad_send_callback() when the operation finishes, and polled by dyad_ucx_request_wait().

Public Members

int completed

Set to 1 when the UCX operation completes.
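
The init / callback / wait life cycle of the completed flag can be sketched without UCX as follows. The mock_ names and the simplified callback signature are assumptions; real UCX completion callbacks also receive a status and user data.

```cpp
#include <cassert>
#include <functional>

// Same shape as the struct described above.
struct mock_request {
    int completed;
};

// Initializer: a fresh request starts out incomplete.
void mock_request_init(void *request)
{
    static_cast<mock_request *>(request)->completed = 0;
}

// Completion callback: flips the flag when the operation finishes.
void mock_send_callback(void *request)
{
    static_cast<mock_request *>(request)->completed = 1;
}

// Wait loop: drive progress until the flag is set.
// Returns the number of progress calls that were needed.
int mock_request_wait(mock_request *req, const std::function<void()> &progress)
{
    int spins = 0;
    while (req->completed == 0) {
        progress();
        ++spins;
    }
    return spins;
}
```

In the real backend the progress step would be a call that advances the UCX worker, and the callback would be invoked from inside that call.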

Typedefs

using key_type = uint64_t

Key type for the UCX endpoint cache.

A 64-bit integer combining the consumer’s process ID and communication tag (pid << 32 | tag_cons), uniquely identifying a consumer connection within a job.

using cache_type = std::unordered_map<key_type, void*>

UCX endpoint cache type.

An unordered map from key_type to ucp_ep_h, used to cache UCX endpoints keyed by consumer connection key. Lookups and insertions are O(1) on average. Cached endpoints are reused across transfers to the same consumer to avoid the cost of repeated ucp_ep_create() calls.

Functions

static void dyad_ucx_ep_err_handler(void *arg, void *ep, int status)

UCX endpoint error handler callback.

Registered as the error handler for UCX endpoints via the UCP_EP_PARAM_FIELD_ERR_HANDLER field of ucp_ep_params_t. Called by UCX when an error occurs on a ucp_ep_h, for example when the remote peer disconnects unexpectedly or a network failure is detected.

Currently logs the error via DYAD_LOG_ERROR and returns. No recovery action is taken.

Marked __attribute__((unused)) because error handler registration is not yet wired into ucx_connect(): the handler is defined but not currently passed to ucp_ep_create() (see TODO).

Todo:

Wire this handler into ucx_connect() by setting UCP_EP_PARAM_FIELD_ERR_HANDLER and params.err_handler in ucp_ep_params_t so that endpoint errors are caught and logged at runtime.

Parameters:
  • arg[in] User argument passed during endpoint creation. Cast to dyad_ctx_t* for logging.

  • ep[in] The endpoint on which the error occurred.

  • status[in] UCX error status describing the failure.

dyad_rc_t ucx_connect(const dyad_ctx_t *ctx, void *worker, const void *addr, void **ep)

Creates a UCX endpoint to a remote worker.

Creates a ucp_ep_h to the remote worker identified by addr via ucp_ep_create(). The endpoint is initialized with only UCP_EP_PARAM_FIELD_REMOTE_ADDRESS set — no error handler is registered (see TODO in dyad_ucx_ep_err_handler()).

Validates the created endpoint — if ucp_ep_create() succeeds but returns a NULL endpoint, the function treats this as a failure and returns DYAD_RC_UCXCOMM_FAIL.

Note

This function is used both during the warmup loopback connection in ucx_warmup() and during real connection establishment in dyad_ucx_ep_cache_insert(). In the normal transfer path, dyad_dtl_ucx_establish_connection() uses the endpoint cache and calls this function indirectly via dyad_ucx_ep_cache_insert() only when no cached endpoint exists for the consumer.

Parameters:
  • ctx[in] DYAD context. Used for logging.

  • worker[in] UCX worker on which to create the endpoint.

  • addr[in] UCX address of the remote worker to connect to.

  • ep[out] Set to the created ucp_ep_h on success.

Return values:
  • DYAD_RC_OK – Endpoint created successfully.

  • DYAD_RC_UCXCOMM_FAIL – ucp_ep_create() failed or returned a NULL endpoint.

Returns:

dyad_rc_t return code:

dyad_rc_t ucx_disconnect(const dyad_ctx_t *ctx, void *worker, void *ep)

Closes a UCX endpoint and waits for the closure to complete.

Initiates a non-blocking endpoint close via ucp_ep_close_nbx() with UCP_EP_CLOSE_FLAG_FORCE (UCX >= 1.10) or ucp_ep_close_nb() with UCP_EP_CLOSE_MODE_FORCE (UCX < 1.10). Both forcefully terminate the endpoint without waiting for in-flight operations to complete. If ep is NULL the function is a no-op.

Unlike other UCX non-blocking operations, endpoint close does not use dyad_ucx_request_wait() because the close request behaves differently from communication requests — it must be waited on by calling ucp_worker_progress() directly rather than through the standard request polling path. The wait loop is therefore inlined:

  • If the returned stat_ptr is a request handle (UCS_PTR_IS_PTR), spins calling ucp_worker_progress() and ucp_request_check_status() until the status is no longer UCS_INPROGRESS, then frees the request via ucp_request_free().

  • If the returned stat_ptr encodes an error (UCS_PTR_IS_ERR), extracts the status code for reporting.

  • If stat_ptr is NULL, the close completed immediately.

Note

Force-close is used regardless of UCX version. If error handler mode is enabled in the future, the close mode may need to change to UCP_EP_CLOSE_MODE_FLUSH to allow in-flight operations to drain before closing (see TODO in source).

Note

This function is called by dyad_ucx_ep_cache_finalize() when evicting endpoints from the cache, and by ucx_warmup() after the loopback warmup transfer. It is not called during normal transfer close — dyad_dtl_ucx_close_connection() sets dtl_handle->ep to NULL and relies on the cache to manage the endpoint lifetime.

Parameters:
  • ctx[in] DYAD context. Used for logging.

  • worker[in] UCX worker needed to progress the close operation.

  • ep[in] UCX endpoint to close. If NULL, the function is a no-op and returns DYAD_RC_OK.

Return values:
  • DYAD_RC_OK – Endpoint closed successfully or was NULL.

  • DYAD_RC_UCXEP_FAIL – The endpoint close operation failed. The endpoint can no longer be used regardless of the return code.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_ucx_ep_cache_init(const dyad_ctx_t *ctx, void **cache)

Allocates and initializes the UCX endpoint cache.

Allocates a new cache_type (std::unordered_map<key_type, ucp_ep_h>) using new(std::nothrow) and stores a reinterpret_cast pointer to it in *cache as an opaque ucx_ep_cache_h handle. Using std::nothrow ensures that allocation failure returns nullptr rather than throwing std::bad_alloc.

Validates cache before allocation:

  • If cache is nullptr, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.

  • If *cache is already non-nullptr, a cache is already present and overwriting it would leak the existing allocation, so DYAD_RC_BADBUF is returned.
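
A minimal sketch of the validate-then-allocate pattern, using stand-in return codes and a void* handle. static_cast is enough for the conversion to void* here; the matching destroy helper is included only so the example cleans up after itself.

```cpp
#include <cassert>
#include <cstdint>
#include <new>
#include <unordered_map>

enum mock_rc { MOCK_RC_OK = 0, MOCK_RC_SYSFAIL = 1, MOCK_RC_BADBUF = 2 };

using key_type = std::uint64_t;
using cache_type = std::unordered_map<key_type, void *>;

// Allocate the map and hand it back through an opaque void* handle.
mock_rc mock_cache_init(void **cache)
{
    if (cache == nullptr || *cache != nullptr)
        return MOCK_RC_BADBUF; // bad output pointer, or would leak a cache
    cache_type *c = new (std::nothrow) cache_type();
    if (c == nullptr)
        return MOCK_RC_SYSFAIL; // allocation failed without throwing
    *cache = static_cast<void *>(c);
    return MOCK_RC_OK;
}

// Matching teardown for the sketch.
void mock_cache_destroy(void **cache)
{
    if (cache == nullptr || *cache == nullptr)
        return;
    delete static_cast<cache_type *>(*cache);
    *cache = nullptr;
}
```

Requiring *cache to be nullptr on entry means callers must zero-initialize the handle, which also makes a double init show up as an error instead of a silent leak.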

Todo:

Add an option to configure the cache replacement strategy.

Parameters:
  • ctx[in] DYAD context. Used for logging.

  • cache[out] Must point to a nullptr on entry. Set to the allocated cache handle on success.

Return values:
  • DYAD_RC_OK – Cache allocated successfully.

  • DYAD_RC_BADBUF – cache is nullptr or *cache is already non-nullptr.

  • DYAD_RC_SYSFAIL – new(std::nothrow) failed to allocate the cache.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_ucx_ep_cache_find(const dyad_ctx_t *ctx, const void *cache, const void *addr, const size_t addr_size, void **ep)

Looks up a cached UCX endpoint by consumer connection key.

Searches the endpoint cache for an entry matching ctx->dtl_handle->private_dtl.ucx_dtl_handle->consumer_conn_key (pid << 32 | tag_cons). If found, sets *ep to the cached ucp_ep_h and returns DYAD_RC_OK. If not found, sets *ep to nullptr and returns DYAD_RC_NOTFOUND.

Validates ep before searching:

  • If ep is nullptr, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.

  • If *ep is already non-nullptr, an endpoint is already present and overwriting it would leak the existing handle, so DYAD_RC_BADBUF is returned.

Note

The addr and addr_size parameters are accepted for interface consistency but are not used — the lookup is performed by consumer_conn_key rather than by raw address bytes, since ucp_address_t is an opaque struct that cannot be reliably compared byte-by-byte.

Note

All cache operations are wrapped in a try / catch(...) block to prevent C++ exceptions from propagating into the C calling code. Any exception is caught and returned as DYAD_RC_SYSFAIL.
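
The lookup and its exception barrier might be sketched like this. Return codes and the function name are stand-ins, and the key is passed explicitly here, whereas the documented function reads it from the DTL state.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

enum mock_rc { MOCK_RC_OK = 0, MOCK_RC_SYSFAIL, MOCK_RC_BADBUF, MOCK_RC_NOTFOUND };

using key_type = std::uint64_t;
using cache_type = std::unordered_map<key_type, void *>;

// Look up `key` and report the result through *ep. noexcept documents
// that nothing may escape to a C caller; the catch-all enforces it.
mock_rc mock_cache_find(const cache_type &cache, key_type key, void **ep) noexcept
{
    if (ep == nullptr || *ep != nullptr)
        return MOCK_RC_BADBUF;
    try {
        auto it = cache.find(key);
        if (it == cache.end())
            return MOCK_RC_NOTFOUND; // *ep is still nullptr here
        *ep = it->second;
        return MOCK_RC_OK;
    } catch (...) {
        return MOCK_RC_SYSFAIL;
    }
}
```

Converting every exception into a return code at the boundary keeps the C side free of any need to know that the cache is implemented in C++.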

Parameters:
  • ctx[in] DYAD context. The consumer_conn_key used as the cache lookup key is read from the UCX DTL internal state.

  • cache[in] Endpoint cache handle to search.

  • addr[in] Unused. Accepted for interface consistency.

  • addr_size[in] Unused. Accepted for interface consistency.

  • ep[out] Must point to a nullptr on entry. Set to the cached ucp_ep_h on success, or nullptr if not found.

Return values:
  • DYAD_RC_OK – A cached endpoint was found and returned.

  • DYAD_RC_NOTFOUND – No cached endpoint exists for the current consumer_conn_key.

  • DYAD_RC_BADBUF – ep is nullptr or *ep is already non-nullptr.

  • DYAD_RC_SYSFAIL – An unexpected C++ exception was thrown during the cache lookup.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_ucx_ep_cache_insert(const dyad_ctx_t *ctx, void *cache, const void *addr, const size_t addr_size, void *worker)

Inserts a new UCX endpoint into the cache, creating it if needed.

Looks up consumer_conn_key in the cache. If an entry already exists, returns DYAD_RC_OK immediately without creating a new endpoint — this handles the case where dyad_dtl_ucx_establish_connection() calls dyad_ucx_ep_cache_find() and dyad_ucx_ep_cache_insert() in sequence and the entry was inserted between the two calls.

If no entry exists, creates a new ucp_ep_h to addr via ucx_connect() and inserts it into the cache using insert_or_assign(). The new endpoint is also stored directly in ctx->dtl_handle->private_dtl.ucx_dtl_handle->ep for immediate use by the caller.
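
A rough model of the check-then-connect-then-insert flow follows. The connect callback is a hypothetical stand-in for endpoint creation, the return codes are invented, and insert_or_assign() requires C++17.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <unordered_map>

enum mock_rc { MOCK_RC_OK = 0, MOCK_RC_SYSFAIL, MOCK_RC_CONNFAIL };

using key_type = std::uint64_t;
using cache_type = std::unordered_map<key_type, void *>;

// `connect` stands in for endpoint creation; it returns nullptr on failure.
mock_rc mock_cache_insert(cache_type &cache, key_type key,
                          const std::function<void *()> &connect, void **ep_out)
{
    try {
        if (cache.find(key) != cache.end())
            return MOCK_RC_OK; // already cached: do not connect again
        void *ep = connect();
        if (ep == nullptr)
            return MOCK_RC_CONNFAIL;
        cache.insert_or_assign(key, ep); // C++17
        *ep_out = ep;                    // hand the new endpoint to the caller
        return MOCK_RC_OK;
    } catch (...) {
        return MOCK_RC_SYSFAIL;
    }
}
```

Note that on a cache hit this sketch leaves *ep_out untouched, so a caller that needs the endpoint in that case has to obtain it through a separate lookup.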

Note

All cache operations are wrapped in a try / catch(...) block to prevent C++ exceptions from propagating into the C calling code. Any exception is caught and returned as DYAD_RC_SYSFAIL.

Note

insert_or_assign() is used instead of insert() to handle the race condition where a concurrent insert could have occurred between the find() check and the insert. Since DYAD uses UCS_THREAD_MODE_SERIALIZED, true concurrency is not possible, but insert_or_assign() is safer and has no performance cost in the non-concurrent case.

Parameters:
  • ctx[in] DYAD context. The consumer_conn_key and ep fields of the UCX DTL internal state are read and written.

  • cache[in] Endpoint cache handle to insert into.

  • addr[in] UCX address of the remote worker to connect to. Passed to ucx_connect() if no cached entry exists.

  • addr_size[in] Size of addr in bytes. Not forwarded to ucx_connect(), which takes no size argument; accepted for interface consistency.

  • worker[in] UCX worker used to create the new endpoint via ucx_connect().

Return values:
  • DYAD_RC_OK – A cached entry already existed, or a new endpoint was created and inserted successfully.

  • DYAD_RC_SYSFAIL – An unexpected C++ exception was thrown during the cache operation.

  • other – Any error code returned by ucx_connect() if endpoint creation failed.

Returns:

dyad_rc_t return code:

static inline cache_type::iterator cache_remove_impl(const dyad_ctx_t *ctx, cache_type *cache, cache_type::iterator it, void *worker)

Internal helper that disconnects and removes a single cache entry.

If it is a valid iterator (not cache->end()), disconnects the endpoint via ucx_disconnect(), erases the entry from the cache via cache->erase(), and returns the iterator to the next entry. If it is cache->end(), returns cache->end() immediately as a no-op.

Used by both dyad_ucx_ep_cache_remove() for single-entry removal and dyad_ucx_ep_cache_finalize() to iterate over and remove all entries.

Note

The comment in the source mentions that the UCP address was allocated with malloc() during RPC unpacking. However the current implementation does not free the address here — it was extracted from dtl_handle->remote_address and is cleared in dyad_dtl_ucx_close_connection(). See the TODO in dyad_dtl_ucx_close_connection() regarding ownership of remote_address.

Parameters:
  • ctx[in] DYAD context. Used for logging in ucx_disconnect().

  • cache[in] The cache from which to remove the entry.

  • it[in] Iterator to the entry to remove. If equal to cache->end(), the function is a no-op.

  • worker[in] UCX worker passed to ucx_disconnect() to progress the endpoint close operation.

Returns:

Iterator to the entry following the removed one, or cache->end() if it was already cache->end().

dyad_rc_t dyad_ucx_ep_cache_remove(const dyad_ctx_t *ctx, void *cache, const void *addr, const size_t addr_size, void *worker)

Removes a single UCX endpoint from the cache and disconnects it.

Looks up consumer_conn_key in the cache and, if found, delegates to cache_remove_impl() to disconnect the endpoint via ucx_disconnect() and erase the cache entry. If no entry is found for the key, the function is effectively a no-op and returns DYAD_RC_OK.

Note

All cache operations are wrapped in a try / catch(...) block to prevent C++ exceptions from propagating into the C calling code.

Note

addr and addr_size are accepted for interface consistency but are not used — the removal is performed by consumer_conn_key rather than by raw address bytes.

Parameters:
  • ctx[in] DYAD context. The consumer_conn_key used as the cache lookup key is read from the UCX DTL internal state.

  • cache[in] Endpoint cache handle to remove from.

  • addr[in] Unused. Accepted for interface consistency.

  • addr_size[in] Unused. Accepted for interface consistency.

  • worker[in] UCX worker passed to ucx_disconnect() to progress the endpoint close operation.

Return values:
  • DYAD_RC_OK – Entry removed successfully or key not found.

  • DYAD_RC_SYSFAIL – An unexpected C++ exception was thrown during the cache operation.

Returns:

dyad_rc_t return code:

dyad_rc_t dyad_ucx_ep_cache_finalize(const dyad_ctx_t *ctx, void **cache, void *worker)

Finalizes and frees the UCX endpoint cache.

Iterates over all entries in the cache, disconnecting and removing each endpoint via cache_remove_impl(). After all entries are removed, deletes the cache_type object and sets *cache to nullptr.

The iteration uses the iterator returned by cache_remove_impl() rather than incrementing the iterator manually, since erase() invalidates the current iterator. cache_remove_impl() returns the iterator to the next valid entry after the erased one, making this a safe and correct way to drain the entire cache.
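
The erase-and-advance idiom described above can be sketched as follows. The helper names are illustrative, and a vector of "closed" endpoints stands in for the actual disconnect call.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

using key_type = std::uint64_t;
using cache_type = std::unordered_map<key_type, void *>;

// Erase one entry and return the iterator that follows it. `closed`
// records which endpoints were "disconnected" in this sketch.
cache_type::iterator remove_one(cache_type &cache, cache_type::iterator it,
                                std::vector<void *> &closed)
{
    if (it == cache.end())
        return cache.end();
    closed.push_back(it->second); // stand-in for disconnecting the endpoint
    return cache.erase(it);       // erase() yields the next valid iterator
}

// Drain the whole cache without ever touching an invalidated iterator.
void drain(cache_type &cache, std::vector<void *> &closed)
{
    for (auto it = cache.begin(); it != cache.end();)
        it = remove_one(cache, it, closed); // no ++it
}
```

The loop has no increment expression on purpose: the only way the iterator advances is through the value returned by the removal helper.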

If cache is nullptr or *cache is nullptr, the function is a no-op and returns DYAD_RC_OK. This allows safe calls on a partially initialized or already-finalized cache.

Todo:

Add a try / catch block around the iteration and delete to prevent C++ exceptions from propagating into the C calling code in dyad_dtl_ucx_finalize().

Note

This function is called by dyad_dtl_ucx_finalize() as part of the full UCX DTL teardown sequence. The endpoint cache must be finalized before the UCX worker is destroyed, since ucx_disconnect() requires the worker to be active to progress the endpoint close operations.

Note

Unlike dyad_ucx_ep_cache_remove() which wraps operations in a try / catch block, this function does not — any C++ exception thrown during iteration or deletion will propagate to the caller. Since this is called only from C++ translation units this is acceptable, but a try / catch wrapper could be added for robustness (see TODO).

Parameters:
  • ctx[in] DYAD context. Used for logging in cache_remove_impl() and ucx_disconnect().

  • cache[inout] Pointer to the cache handle to finalize. *cache is set to nullptr on return. If nullptr or *cache is nullptr, the function is a no-op.

  • worker[in] UCX worker passed to ucx_disconnect() for each endpoint in the cache.

Returns:

Always returns DYAD_RC_OK.