DYAD Data Transport API
Once the consumer has used the metadata to identify the owner (producer) of the file it is trying to access, and finds that the owner is at a remote location (i.e., the file is not locally visible), it requests that the owner transfer the file. DYAD offers multiple Data Transport Layer (DTL) backends for moving data from producer to consumer: Flux streaming RPC, which is Flux’s native messaging mechanism; UCX with RDMA; and the Margo+Mercury framework.
Functions
-
dyad_rc_t dyad_dtl_init(dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)
Initializes the Data Transport Layer for a DYAD context.
Allocates the dyad_dtl handle inside ctx and delegates to the backend-specific initialization function based on mode:
DYAD_DTL_UCX → dyad_dtl_ucx_init() (only if built with DYAD_ENABLE_UCX_DTL)
DYAD_DTL_MARGO → dyad_dtl_margo_init() (only if built with DYAD_ENABLE_MARGO_DTL)
DYAD_DTL_FLUX_RPC → dyad_dtl_flux_init() (always available)
If mode does not match any enabled backend, returns DYAD_RC_BADDTLMODE without initializing the handle.
See also
dyad_dtl_mode_t, dyad_dtl_comm_mode_t.
- Parameters:
ctx – [inout] DYAD context. On success, ctx->dtl_handle is allocated and initialized.
mode – [in] DTL backend to use.
comm_mode – [in] Communication direction (DYAD_COMM_SEND for producer, DYAD_COMM_RECV for consumer).
debug – [in] If true, enables verbose debug logging in the DTL backend.
- Return values:
DYAD_RC_OK – Initialization succeeded.
DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl handle.
DYAD_RC_BADDTLMODE – mode does not match any enabled backend.
other – Any error code returned by the backend-specific init function.
- Returns:
dyad_rc_t return code.
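The dispatch just described can be modelled in a few lines of C. This is a simplified sketch, not DYAD’s source: sketch_rc_t, sketch_mode_t, sketch_dtl_t and the mock *_init functions are hypothetical stand-ins, and the SKETCH_ENABLE_UCX / SKETCH_ENABLE_MARGO macros play the role of DYAD_ENABLE_UCX_DTL and DYAD_ENABLE_MARGO_DTL.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins: DYAD's real dyad_rc_t, dyad_dtl_mode_t and
 * struct dyad_dtl have more members and different values. */
typedef enum { RC_OK, RC_SYSFAIL, RC_BADDTLMODE } sketch_rc_t;
typedef enum { MODE_UCX, MODE_MARGO, MODE_FLUX_RPC } sketch_mode_t;

typedef struct {
    sketch_mode_t mode;
    const char *backend; /* which mock backend init ran, or NULL */
} sketch_dtl_t;

/* Mock backend initializers. The optional ones exist only when their macro
 * is defined, mirroring the DYAD_ENABLE_*_DTL build options. */
#ifdef SKETCH_ENABLE_UCX
static sketch_rc_t ucx_init(sketch_dtl_t *h) { h->backend = "ucx"; return RC_OK; }
#endif
#ifdef SKETCH_ENABLE_MARGO
static sketch_rc_t margo_init_backend(sketch_dtl_t *h) { h->backend = "margo"; return RC_OK; }
#endif
static sketch_rc_t flux_init(sketch_dtl_t *h) { h->backend = "flux"; return RC_OK; }

sketch_rc_t sketch_dtl_init(sketch_dtl_t **handle, sketch_mode_t mode)
{
    *handle = malloc(sizeof **handle);
    if (*handle == NULL)
        return RC_SYSFAIL;
    (*handle)->mode = mode;
    (*handle)->backend = NULL;
#ifdef SKETCH_ENABLE_UCX
    if (mode == MODE_UCX)
        return ucx_init(*handle);
#endif
#ifdef SKETCH_ENABLE_MARGO
    if (mode == MODE_MARGO)
        return margo_init_backend(*handle);
#endif
    if (mode == MODE_FLUX_RPC)
        return flux_init(*handle); /* always compiled in */
    return RC_BADDTLMODE;          /* allocated, but no backend initialized */
}
```

As in the description, an unmatched mode leaves the handle allocated but not backend-initialized. In this sketch the caller frees it directly; in DYAD, dyad_dtl_finalize() is the function that releases the dyad_dtl handle.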
-
dyad_rc_t dyad_dtl_finalize(dyad_ctx_t *ctx)
Finalizes and frees the Data Transport Layer for a DYAD context.
Delegates to the backend-specific finalization function based on ctx->dtl_handle->mode, then frees the dyad_dtl handle and sets ctx->dtl_handle to NULL.
If ctx->dtl_handle is already NULL, the function is a no-op and returns DYAD_RC_OK. This allows dyad_dtl_finalize() to be called safely on an already-finalized context.
Backend dispatch:
DYAD_DTL_UCX → dyad_dtl_ucx_finalize() (only if built with DYAD_ENABLE_UCX_DTL and the UCX handle is non-NULL)
DYAD_DTL_MARGO → dyad_dtl_margo_finalize() (only if built with DYAD_ENABLE_MARGO_DTL)
DYAD_DTL_FLUX_RPC → dyad_dtl_flux_finalize() (only if the Flux handle is non-NULL)
Note
The dyad_dtl handle is always freed and ctx->dtl_handle set to NULL, whether the backend finalization succeeds or fails. Any error code returned by the backend finalization function is overwritten with DYAD_RC_OK before returning. This is intentional: finalization errors are non-recoverable and should not prevent the caller from continuing teardown. DYAD_RC_BADDTLMODE is not a backend error and is still reported.
- Parameters:
ctx – [inout] DYAD context. On return, ctx->dtl_handle is NULL.
- Return values:
DYAD_RC_OK – Finalization succeeded (including when the backend reported an error), or the handle was already NULL.
DYAD_RC_BADDTLMODE – The mode stored in the handle does not match any enabled backend. The handle is still freed.
- Returns:
dyad_rc_t return code.
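A minimal model of this teardown contract, assuming simplified stand-in types (sketch_ctx_t, sketch_dtl_t) and a hypothetical mock flux_finalize() that can be forced to fail. It exercises the three documented behaviours: a NULL handle is a no-op, backend errors are discarded, and the handle is freed on every path.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins; not DYAD's real types or values. */
typedef enum { RC_OK, RC_BADDTLMODE, RC_BACKEND_ERR } sketch_rc_t;
typedef enum { MODE_FLUX_RPC, MODE_UNKNOWN } sketch_mode_t;

typedef struct {
    sketch_mode_t mode;
    int backend_should_fail; /* lets the checks force a backend error */
} sketch_dtl_t;

typedef struct { sketch_dtl_t *dtl_handle; } sketch_ctx_t;

/* Mock standing in for dyad_dtl_flux_finalize(). */
static sketch_rc_t flux_finalize(sketch_dtl_t *h)
{
    return h->backend_should_fail ? RC_BACKEND_ERR : RC_OK;
}

sketch_rc_t sketch_dtl_finalize(sketch_ctx_t *ctx)
{
    sketch_rc_t rc = RC_OK;
    if (ctx->dtl_handle == NULL)
        return RC_OK;                         /* already finalized: no-op */
    if (ctx->dtl_handle->mode == MODE_FLUX_RPC)
        (void)flux_finalize(ctx->dtl_handle); /* backend error deliberately discarded */
    else
        rc = RC_BADDTLMODE;                   /* unknown mode is still reported */
    free(ctx->dtl_handle);                    /* freed on every path */
    ctx->dtl_handle = NULL;
    return rc;
}
```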
Flux Streaming RPC
The Flux RPC DTL backend uses the built-in RPC mechanism provided by the Flux framework, requiring no additional networking dependencies beyond Flux itself.
Flux is a next-generation resource manager and job scheduler for HPC systems. Beyond job and resource management, Flux provides a distributed messaging infrastructure based on ZeroMQ that allows processes running under the same Flux instance to communicate via lightweight RPCs. Each Flux broker runs on a node and handles message routing between processes across the job allocation.
Flux streaming RPC extends the basic request-response RPC model to
allow the service to send multiple responses to a single request, with
ENODATA signalling end-of-stream. DYAD uses this mechanism on the
producer side — the DYAD service responds to a consumer fetch request
by sending the file data as one or more raw payloads via
flux_respond_raw(), and signals completion by responding with
ENODATA via flux_respond_error(). The consumer reads successive
responses from the same flux_future_t via flux_rpc_get_raw()
until it receives ENODATA.
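To make the ordering contract concrete, the sketch below simulates one streaming RPC in memory. It is only a model: sketch_future_t is a hypothetical FIFO standing in for a flux_future_t, no Flux function is called, and the chunk size is an arbitrary parameter rather than a value DYAD is known to use. The producer enqueues one payload per chunk followed by an ENODATA error, and the consumer keeps reading from the same future until it sees ENODATA.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>

/* In-memory stand-in for one streaming RPC future: a FIFO of responses.
 * Models only the ordering contract (payloads, then ENODATA). */
#define MAX_RESP 16

typedef struct { const char *data; size_t len; int errnum; } sketch_resp_t;
typedef struct { sketch_resp_t q[MAX_RESP]; size_t head, tail; } sketch_future_t;

/* Producer: one response per chunk (the role of flux_respond_raw()), then an
 * ENODATA error response (the role of flux_respond_error()). */
int sketch_produce(sketch_future_t *f, const char *file, size_t size, size_t chunk)
{
    if (chunk == 0)
        return -1;
    for (size_t off = 0; off < size; off += chunk) {
        if (f->tail >= MAX_RESP - 1)
            return -1;                          /* keep one slot for ENODATA */
        size_t n = size - off < chunk ? size - off : chunk;
        f->q[f->tail++] = (sketch_resp_t){ file + off, n, 0 };
    }
    f->q[f->tail++] = (sketch_resp_t){ NULL, 0, ENODATA };
    return 0;
}

/* Consumer: read successive responses from the same future (the role of
 * repeated flux_rpc_get_raw() calls) until ENODATA ends the stream.
 * Returns the number of bytes reassembled into out, or -1 on error. */
long sketch_consume_stream(sketch_future_t *f, char *out, size_t cap)
{
    size_t total = 0;
    while (f->head < f->tail) {
        sketch_resp_t r = f->q[f->head++];
        if (r.errnum == ENODATA)
            return (long)total;                 /* normal end-of-stream */
        if (r.errnum != 0 || r.len > cap - total)
            return -1;
        memcpy(out + total, r.data, r.len);
        total += r.len;
    }
    return -1;                                  /* stream ended without ENODATA */
}
```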
Functions
-
dyad_rc_t dyad_dtl_flux_init(const dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)
Initializes the Flux RPC DTL internal state.
Allocates and populates the dyad_dtl_flux internal state struct, then wires all function pointers in ctx->dtl_handle to their Flux RPC implementations:
rpc_pack → dyad_dtl_flux_rpc_pack
rpc_unpack → dyad_dtl_flux_rpc_unpack
rpc_respond → dyad_dtl_flux_rpc_respond
rpc_recv_response → dyad_dtl_flux_rpc_recv_response
get_buffer → dyad_dtl_flux_get_buffer
return_buffer → dyad_dtl_flux_return_buffer
establish_connection → dyad_dtl_flux_establish_connection
send → dyad_dtl_flux_send
recv → dyad_dtl_flux_recv
close_connection → dyad_dtl_flux_close_connection
The Flux handle is borrowed from ctx->h and stored as a non-owning pointer; it is not closed by dyad_dtl_flux_finalize(). The pending future (f) and message (msg) fields are initialized to NULL and are set during RPC operations.
- Todo:
Remove the mode parameter; it is already stored in ctx->dtl_handle->mode by dyad_dtl_init() before this function is called.
- Parameters:
ctx – [in] DYAD context. ctx->dtl_handle must already be allocated by dyad_dtl_init().
mode – [in] DTL mode (must be DYAD_DTL_FLUX_RPC; see Todo).
comm_mode – [in] Communication direction (DYAD_COMM_SEND for producer, DYAD_COMM_RECV for consumer).
debug – [in] If true, enables verbose debug logging.
- Return values:
DYAD_RC_OK – Initialization succeeded.
DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl_flux struct.
- Returns:
dyad_rc_t return code.
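The effect of the wiring step is that generic DYAD code calls backend operations only through the table. A reduced sketch, assuming a hypothetical three-slot sketch_dtl_ops_t with simplified signatures (the real dyad_dtl has ten slots whose functions take a const dyad_ctx_t *):

```c
#include <assert.h>
#include <stddef.h>

/* Reduced stand-in for the dyad_dtl function-pointer table. */
typedef struct {
    int (*establish_connection)(void *ctx);
    int (*send)(void *ctx, void *buf, size_t buflen);
    int (*close_connection)(void *ctx);
} sketch_dtl_ops_t;

static size_t bytes_sent; /* lets the checks observe that dispatch happened */

/* Mock Flux RPC implementations; the connection hooks are no-ops, as documented. */
static int flux_establish_connection(void *ctx) { (void)ctx; return 0; }
static int flux_close_connection(void *ctx) { (void)ctx; return 0; }
static int flux_send(void *ctx, void *buf, size_t buflen)
{
    (void)ctx;
    (void)buf;
    bytes_sent += buflen; /* the real backend calls flux_respond_raw() here */
    return 0;
}

/* The wiring step performed by dyad_dtl_flux_init(), in miniature. */
void sketch_flux_wire(sketch_dtl_ops_t *ops)
{
    ops->establish_connection = flux_establish_connection;
    ops->send = flux_send;
    ops->close_connection = flux_close_connection;
}

/* Backend-agnostic caller: it never names a backend function directly. */
int sketch_transfer(const sketch_dtl_ops_t *ops, void *ctx, void *buf, size_t buflen)
{
    int rc = ops->establish_connection(ctx);
    if (rc == 0)
        rc = ops->send(ctx, buf, buflen);
    ops->close_connection(ctx);
    return rc;
}
```

Swapping backends then means wiring a different set of functions into the same table, which is why no-op slots such as establish_connection still have to be filled.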
-
dyad_rc_t dyad_dtl_flux_rpc_pack(const dyad_ctx_t *ctx, const char *upath, uint32_t producer_rank, void ***packed_obj)
Packs a file fetch request into a JSON object for a Flux RPC call.
Creates a Jansson JSON object of the form {"upath": "<upath>"} using json_pack(). The producer_rank parameter is accepted for interface consistency with other DTL backends but is not included in the packed object: with Flux RPC, the request is routed to the producer broker by rank at the Flux level rather than via the payload.
The caller is responsible for decrementing the reference count of packed_obj via json_decref() when it is no longer needed.
- Parameters:
ctx – [in] DYAD context.
upath – [in] Relative path of the file to fetch.
producer_rank – [in] Flux rank of the producer broker. Not embedded in the payload for Flux RPC.
packed_obj – [out] Set to the allocated JSON object on success. Undefined on failure.
- Return values:
DYAD_RC_OK – The JSON object was created successfully.
DYAD_RC_BADPACK – json_pack() failed to create the object.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_dtl_flux_rpc_unpack(const dyad_ctx_t *ctx, const void **msg, char **upath)
Unpacks a file fetch request from an incoming Flux RPC message.
Extracts the upath field from the JSON payload of msg using flux_request_unpack(). On success, stores a non-owning pointer to msg in the Flux DTL handle so that dyad_dtl_flux_send() can later address its responses to the same message without the caller passing it again.
Note
The upath string is owned by the Flux message msg and must not be freed by the caller. It remains valid only for the lifetime of msg.
Note
dyad_dtl_flux_send() calls flux_respond_raw() with the stored message to route each response back to the correct consumer. In the Flux streaming RPC protocol, the original request message serves as the reply address; without it, the producer cannot direct the response to the consumer that issued the request. The stored pointer is non-owning: the message is owned by the Flux broker, and the Flux DTL neither frees nor destroys it.
- Parameters:
ctx – [in] DYAD context.
msg – [in] Incoming Flux RPC message containing the JSON payload.
upath – [out] Set to the relative path of the requested file, extracted from the message payload. Valid for the lifetime of msg.
- Return values:
DYAD_RC_OK – Unpacking succeeded and upath is set.
DYAD_RC_BADUNPACK – flux_request_unpack() failed to extract the upath field from the message.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_dtl_flux_rpc_respond(const dyad_ctx_t *ctx, const void **orig_msg)
Sends the initial RPC acknowledgement from the service to the consumer.
For the Flux RPC DTL backend this is a no-op: the Flux streaming RPC protocol does not require an explicit acknowledgement before data transfer begins. The function exists to satisfy the rpc_respond function pointer in dyad_dtl and to keep the interface consistent across all backends.
- Parameters:
ctx – [in] DYAD context.
orig_msg – [in] The incoming Flux RPC message to respond to. Unused by this backend.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_flux_rpc_recv_response(const dyad_ctx_t *ctx, void **f)
Receives the initial RPC response from the service and stores the Flux future for subsequent data transfer.
Stores f in the Flux DTL handle so that subsequent calls to dyad_dtl_flux_recv() can retrieve data from the same streaming RPC future without the caller passing it again.
For the Flux RPC backend, no blocking wait is performed here; the future is stored and consumed lazily by dyad_dtl_flux_recv().
- Parameters:
ctx – [in] DYAD context.
f – [in] Flux future representing the pending streaming RPC response. Stored as a non-owning pointer; the caller retains ownership and must destroy it after the transfer is complete.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_flux_get_buffer(const dyad_ctx_t *ctx, size_t data_size, void **data_buf)
Allocates a page-aligned buffer for Flux RPC data transfer.
Allocates a buffer of data_size bytes aligned to the system page size via posix_memalign(). Page alignment is required for compatibility with direct I/O and with RDMA-capable transports that may be used alongside the Flux RPC backend.
The function validates data_buf before allocation:
If data_buf is NULL, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.
If *data_buf is non-NULL, a buffer is already present and overwriting it would leak memory, so DYAD_RC_BADBUF is returned.
The allocated buffer must be released via dyad_dtl_flux_return_buffer().
Note
A plain malloc() path exists in the source but is disabled (#if 0). The active path always uses posix_memalign().
- Parameters:
ctx – [in] DYAD context.
data_size – [in] Number of bytes to allocate.
data_buf – [out] Must point to a NULL pointer on entry. Set to the allocated page-aligned buffer on success.
- Return values:
DYAD_RC_OK – Buffer allocated successfully.
DYAD_RC_BADBUF – data_buf is NULL or *data_buf is already non-NULL.
DYAD_RC_SYSFAIL – posix_memalign() failed.
- Returns:
dyad_rc_t return code.
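The validation and allocation rules above can be sketched with POSIX sysconf() and posix_memalign(). The ctx parameter is dropped and sketch_rc_t is a hypothetical stand-in for dyad_rc_t; this illustrates the documented behaviour rather than reproducing DYAD’s code.

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L /* for posix_memalign() and sysconf() */
#endif
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical stand-in return codes. */
typedef enum { RC_OK, RC_BADBUF, RC_SYSFAIL } sketch_rc_t;

sketch_rc_t sketch_get_buffer(size_t data_size, void **data_buf)
{
    if (data_buf == NULL || *data_buf != NULL)
        return RC_BADBUF;                 /* invalid out-pointer, or would leak */
    long page = sysconf(_SC_PAGESIZE);
    if (page <= 0)
        return RC_SYSFAIL;
    if (posix_memalign(data_buf, (size_t)page, data_size) != 0) {
        *data_buf = NULL;                 /* defensive: leave the caller's pointer NULL */
        return RC_SYSFAIL;
    }
    return RC_OK;                         /* release later with free() */
}
```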
-
dyad_rc_t dyad_dtl_flux_return_buffer(const dyad_ctx_t *ctx, void **data_buf)
Releases a buffer previously allocated by dyad_dtl_flux_get_buffer().
Frees the buffer pointed to by *data_buf. The function validates data_buf before freeing:
If data_buf is NULL, the caller passed an invalid pointer and DYAD_RC_BADBUF is returned.
If *data_buf is NULL, the buffer has already been freed or was never allocated, and DYAD_RC_BADBUF is returned.
- Todo:
Set *data_buf to NULL after free() to prevent use-after-free. This is a one-line fix:
free (*data_buf); *data_buf = NULL;
Note
As currently implemented, the function does not null the pointer on return, so the caller is responsible for setting *data_buf to NULL after this call.
- Parameters:
ctx – [in] DYAD context.
data_buf – [inout] Pointer to the buffer to free. *data_buf must be non-NULL on entry.
- Return values:
DYAD_RC_OK – Buffer freed successfully.
DYAD_RC_BADBUF – data_buf is NULL or *data_buf is NULL.
- Returns:
dyad_rc_t return code.
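A sketch of the release path with the Todo applied, so the pointer is nulled after free(). This differs from the documented function, which does not null *data_buf; sketch_return_buffer and sketch_rc_t are hypothetical names.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in return codes. */
typedef enum { RC_OK, RC_BADBUF } sketch_rc_t;

sketch_rc_t sketch_return_buffer(void **data_buf)
{
    if (data_buf == NULL || *data_buf == NULL)
        return RC_BADBUF;  /* invalid pointer, or already released */
    free(*data_buf);
    *data_buf = NULL;      /* the Todo fix: no dangling pointer left behind */
    return RC_OK;
}
```

With the pointer nulled, a second release is caught by the *data_buf == NULL check and reported as RC_BADBUF instead of becoming a double free.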
-
dyad_rc_t dyad_dtl_flux_establish_connection(const dyad_ctx_t *ctx)
Establishes the DTL data channel for the Flux RPC backend.
For the Flux RPC backend this is a no-op: data travels over the existing Flux streaming RPC set up during rpc_pack()/rpc_recv_response(), so no additional connection setup is required. The function exists to satisfy the establish_connection function pointer in dyad_dtl and to keep the interface consistent across all backends.
- Parameters:
ctx – [in] DYAD context.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_flux_send(const dyad_ctx_t *ctx, void *buf, size_t buflen)
Sends file data to the consumer via a Flux RPC response.
Sends buf as the raw payload of a Flux RPC response using flux_respond_raw(). The response is addressed to the consumer using the message stored in the Flux DTL handle by dyad_dtl_flux_rpc_unpack(). This is a streaming RPC response: the consumer receives it via dyad_dtl_flux_recv(), which reads successive responses from the same Flux future until ENODATA signals end-of-stream.
- Parameters:
ctx – [in] DYAD context. The Flux handle and the stored request message are read from the Flux DTL handle.
buf – [in] Buffer containing the file data to send.
buflen – [in] Number of bytes in buf.
- Return values:
DYAD_RC_OK – Data sent successfully.
DYAD_RC_FLUXFAIL – flux_respond_raw() failed.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_dtl_flux_recv(const dyad_ctx_t *ctx, void **buf, size_t *buflen)
Receives file data from the producer via a Flux streaming RPC.
Retrieves the raw payload of the next Flux RPC response from the future stored in the Flux DTL handle by dyad_dtl_flux_rpc_recv_response(). The received data is copied into a freshly allocated buffer obtained via ctx->dtl_handle->get_buffer().
After retrieval, flux_future_reset() is called on the stored future regardless of success or failure, so that the same future can yield the next streaming response in the same RPC session.
Error handling:
If the stored future is NULL, the function returns DYAD_RC_FLUXFAIL immediately.
If flux_rpc_get_raw() fails with errno == ENODATA, the producer has signalled end-of-stream and DYAD_RC_RPC_FINISHED is returned. The caller should treat this as a normal termination condition rather than a hard error.
If flux_rpc_get_raw() fails for any other reason, DYAD_RC_BADRPC is returned.
If buffer allocation fails, *buf is set to NULL and *buflen is set to 0.
- Parameters:
ctx – [in] DYAD context. The stored Flux future is read from the Flux DTL handle.
buf – [out] Set to a newly allocated buffer containing the received file data on success. The caller must release it via ctx->dtl_handle->return_buffer(). Set to NULL on failure.
buflen – [out] Set to the number of bytes received on success. Set to 0 on failure.
- Return values:
DYAD_RC_OK – Data received and copied successfully.
DYAD_RC_FLUXFAIL – The stored Flux future is NULL.
DYAD_RC_RPC_FINISHED – flux_rpc_get_raw() returned ENODATA, signalling end-of-stream.
DYAD_RC_BADRPC – flux_rpc_get_raw() failed for a reason other than ENODATA.
other – Any error code returned by get_buffer() on allocation failure.
- Returns:
dyad_rc_t return code.
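Because DYAD_RC_RPC_FINISHED marks normal completion, a caller can loop on recv until it appears. The sketch below shows one way to write such a loop; mock_recv() is a hypothetical in-memory stand-in for ctx->dtl_handle->recv, free() stands in for return_buffer(), and the return codes are stand-ins for dyad_rc_t.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in return codes. */
typedef enum { RC_OK, RC_RPC_FINISHED, RC_BADRPC, RC_SYSFAIL } sketch_rc_t;

/* Mock recv: hands out pieces of a source string in freshly allocated
 * buffers, then reports end-of-stream (as ENODATA maps to RPC_FINISHED). */
typedef struct { const char *src; size_t len, off, piece; } sketch_stream_t;

static sketch_rc_t mock_recv(sketch_stream_t *s, void **buf, size_t *buflen)
{
    *buf = NULL;
    *buflen = 0;
    if (s->piece == 0)
        return RC_BADRPC;
    if (s->off >= s->len)
        return RC_RPC_FINISHED;
    size_t n = s->len - s->off < s->piece ? s->len - s->off : s->piece;
    if ((*buf = malloc(n)) == NULL)
        return RC_SYSFAIL;
    memcpy(*buf, s->src + s->off, n);
    s->off += n;
    *buflen = n;
    return RC_OK;
}

/* Caller loop: RC_RPC_FINISHED is the normal exit; any other non-OK code is
 * a failure. Each buffer is released before the next receive.
 * Returns the number of bytes reassembled into out, or -1. */
long sketch_fetch(sketch_stream_t *s, char *out, size_t cap)
{
    size_t total = 0;
    for (;;) {
        void *buf = NULL;
        size_t len = 0;
        sketch_rc_t rc = mock_recv(s, &buf, &len);
        if (rc == RC_RPC_FINISHED)
            return (long)total;
        if (rc != RC_OK)
            return -1;
        if (len > cap - total) {
            free(buf);
            return -1;
        }
        memcpy(out + total, buf, len);
        total += len;
        free(buf); /* stands in for ctx->dtl_handle->return_buffer() */
    }
}
```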
-
dyad_rc_t dyad_dtl_flux_close_connection(const dyad_ctx_t *ctx)
Closes the Flux RPC DTL data channel.
Clears the stored Flux future (f) and request message (msg) pointers in the Flux DTL handle by setting them to NULL. Neither pointer is destroyed or freed here, because both are non-owning references:
The Flux future (f) is owned by the caller that passed it to dyad_dtl_flux_rpc_recv_response() and must be destroyed by that caller via flux_future_destroy().
The request message (msg) is owned by the Flux broker and must not be freed by DYAD.
After this call the DTL handle can be reused for a new RPC session by calling rpc_pack()/rpc_unpack() again.
- Parameters:
ctx – [in] DYAD context.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_flux_finalize(const dyad_ctx_t *ctx)
Finalizes and frees the Flux RPC DTL internal state.
Clears all pointers in the Flux DTL handle, frees the dyad_dtl_flux struct, and sets the handle pointer to NULL. If ctx->dtl_handle is already NULL, the function is a no-op.
The three stored pointers are cleared as follows before freeing:
h: set to NULL. The Flux handle is non-owning; it is borrowed from ctx->h and must not be closed here.
f: set to NULL. The Flux future is non-owning; the caller that created it is responsible for destroying it via flux_future_destroy().
msg: set to NULL. The request message is non-owning; it is owned by the Flux broker and must not be freed here.
Note
This function only frees the dyad_dtl_flux struct. The outer dyad_dtl handle is freed by dyad_dtl_finalize(), which calls this function as part of the full teardown sequence.
- Parameters:
ctx – [in] DYAD context. ctx->dtl_handle->private_dtl.flux_dtl_handle is set to NULL on return.
- Returns:
Always returns DYAD_RC_OK.
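The ownership rules for close_connection and finalize reduce to "clear borrowed pointers, free only what you allocated". A sketch assuming a simplified, hypothetical sketch_flux_dtl_t whose opaque void * members stand in for flux_t, flux_future_t and flux_msg_t:

```c
#include <assert.h>
#include <stdlib.h>

/* Simplified stand-in for struct dyad_dtl_flux: three borrowed pointers. */
typedef struct { void *h; void *f; const void *msg; } sketch_flux_dtl_t;

/* close_connection: forget per-session references, keep the broker handle. */
int sketch_close_connection(sketch_flux_dtl_t *d)
{
    d->f = NULL;   /* the caller destroys the future itself */
    d->msg = NULL; /* the message is never freed by the DTL */
    return 0;
}

/* finalize: clear every borrowed pointer, free only the struct itself. */
int sketch_flux_finalize(sketch_flux_dtl_t **slot)
{
    if (*slot == NULL)
        return 0;          /* already finalized: no-op */
    (*slot)->h = NULL;     /* borrowed from the context, not closed here */
    (*slot)->f = NULL;
    (*slot)->msg = NULL;
    free(*slot);
    *slot = NULL;
    return 0;
}
```

The usage checks point h, f and msg at stack objects: if either function tried to free them, the result would be undefined behaviour, which is the practical reason these references must stay non-owning.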
Margo, Mercury and Mochi
Margo, Mercury, and libfabric form a layered communication stack used by DYAD’s Margo DTL backend.
Margo is a higher-level RPC library built on top of Mercury and Argobots. It integrates Mercury’s progress loop with Argobots user-level threads (ULTs) and execution streams (ESs), allowing RPC handlers to be written as straightforward blocking functions rather than callback chains.
Mercury is an RPC framework built on top of a pluggable Network
Abstraction (NA) layer. It provides serialization of RPC arguments and
RDMA bulk transfers via hg_bulk_t handles, supporting both push
(HG_BULK_PUSH) and pull (HG_BULK_PULL) transfer modes. Mercury
supports multiple NA plugins including libfabric and UCX, allowing it to
target a wide range of high-performance network fabrics. In DYAD’s Margo
DTL backend, the pull model is adopted — the producer registers its buffer
and notifies the consumer, which pulls the data directly from the producer’s
memory via HG_BULK_PULL.
Margo and Mercury are part of the Mochi project, an HPC ecosystem of composable microservices for data management and storage, developed under the DOE Exascale Computing Project. libfabric, by contrast, comes from outside Mochi.
libfabric (OFI) is a low-level network abstraction library that
provides portable access to high-performance network fabrics such as
InfiniBand (via ofi+verbs) and TCP (via ofi+tcp).
It is maintained by the OpenFabrics Alliance and used by Mercury as its
high-performance network transport layer.
Functions
-
static void data_ready_rpc(void *h)
Mercury/Margo RPC input structure for a data transfer request (margo_rpc_in_t).
The Mercury macro MERCURY_GEN_PROC generates the structure along with its serialization functions. It contains the size of the data to transfer and a bulk handle referencing the producer’s registered memory region for the RDMA pull:
n: number of bytes to transfer.
bulk: Mercury bulk handle to the producer’s send buffer.
Mercury/Margo RPC output structure for a data transfer response.
The Mercury macro MERCURY_GEN_PROC generates the structure along with its serialization functions. It contains a return code sent back to the producer after the RDMA pull completes:
ret: return code; 0 indicates success.
Margo RPC handler that pulls file data from the producer via RDMA.
Registered as the handler for the DYAD Margo data transfer RPC. Invoked on the consumer side when the producer calls margo_forward() to initiate a data transfer. Performs the following steps:
1. Retrieves the Margo instance and producer address from the RPC handle.
2. Looks up the dyad_dtl_margo_t handle registered with the Margo instance via margo_registered_data().
3. Unpacks the input (margo_rpc_in_t) to obtain the transfer size (n) and the producer’s bulk handle.
4. Allocates a receive buffer of n bytes and creates a local Mercury bulk handle with HG_BULK_WRITE_ONLY access.
5. Performs an RDMA pull (HG_BULK_PULL) from the producer’s bulk handle into the local buffer via margo_bulk_transfer().
6. Responds to the producer with out.ret = 0 to signal completion, then frees the input and destroys the RPC handle.
7. Sets margo_handle->recv_ready = 1 to unblock the consumer thread waiting in a busy loop on that flag.
- Todo:
Replace assert() calls with proper error handling that propagates failures back to the caller rather than aborting.
Note
All Mercury/Margo calls are checked with assert(), so any failure aborts the process rather than returning an error code.
Note
The receive buffer allocated in step 4 is stored in margo_handle->recv_buffer and must be freed by the caller after consuming the data.
Note
DEFINE_MARGO_RPC_HANDLER() wraps this function to register it with the Margo runtime as a ULT (user-level thread) handler.
- Parameters:
h – [in] Mercury RPC handle for the incoming request.
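Steps 4 to 7 of the handler can be modelled without Mercury to show the data flow of the pull. In this sketch a "bulk handle" is just a pointer and a length, the RDMA pull is a memcpy(), and errors are returned instead of asserted (the direction the Todo suggests). The field names n, bulk and ret follow the documented input and output structures; every type and function name here is hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>

/* Stand-ins, not Mercury types. */
typedef struct { const void *base; size_t len; } sketch_bulk_t;
typedef struct { size_t n; sketch_bulk_t bulk; } sketch_rpc_in_t;
typedef struct { int ret; } sketch_rpc_out_t;

typedef struct {
    void *recv_buffer;     /* filled by the handler, freed by the consumer */
    size_t recv_len;
    atomic_int recv_ready; /* the consumer busy-waits on this flag */
} sketch_margo_handle_t;

/* Model of steps 4-7: allocate, pull, respond, signal. */
int sketch_data_ready(sketch_margo_handle_t *mh, const sketch_rpc_in_t *in,
                      sketch_rpc_out_t *out)
{
    if (in->n > in->bulk.len)
        return -1;                                 /* malformed request */
    mh->recv_buffer = malloc(in->n);               /* step 4 */
    if (mh->recv_buffer == NULL)
        return -1;
    memcpy(mh->recv_buffer, in->bulk.base, in->n); /* step 5: the "pull" */
    mh->recv_len = in->n;
    out->ret = 0;                                  /* step 6: respond */
    atomic_store(&mh->recv_ready, 1);              /* step 7: release the consumer */
    return 0;
}
```

On the consumer side, the thread that issued the fetch would spin until atomic_load(&mh->recv_ready) returns 1, then take ownership of recv_buffer and free it after use.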
-
dyad_rc_t dyad_dtl_margo_get_buffer(const dyad_ctx_t *ctx, size_t data_size, void **data_buf)
Allocates a buffer for Margo DTL data transfer.
Allocates a buffer of data_size bytes via malloc(). The function validates data_buf before allocation:
If data_buf is NULL, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.
If *data_buf is non-NULL, a buffer is already present and overwriting it would leak memory, so DYAD_RC_BADBUF is returned.
The allocated buffer must be released via dyad_dtl_margo_return_buffer().
Note
A posix_memalign() path exists in the source but is disabled: an #if 1 selects malloc(), leaving posix_memalign() in the #else branch. Unlike the Flux RPC DTL, which uses page-aligned allocation, the Margo DTL currently uses plain malloc(), since Margo manages its own RDMA memory registration separately via margo_bulk_create().
- Parameters:
ctx – [in] DYAD context.
data_size – [in] Number of bytes to allocate.
data_buf – [out] Must point to a NULL pointer on entry. Set to the allocated buffer on success.
- Return values:
DYAD_RC_OK – Buffer allocated successfully.
DYAD_RC_BADBUF – data_buf is NULL or *data_buf is already non-NULL.
DYAD_RC_SYSFAIL – malloc() failed to allocate the buffer.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_dtl_margo_return_buffer(const dyad_ctx_t *ctx, void **data_buf)
Releases a buffer previously allocated by dyad_dtl_margo_get_buffer().
Frees the buffer pointed to by *data_buf. The function validates data_buf before freeing:
If data_buf is NULL, the caller passed an invalid pointer and DYAD_RC_BADBUF is returned.
If *data_buf is NULL, the buffer has already been freed or was never allocated, and DYAD_RC_BADBUF is returned.
- Todo:
Set *data_buf to NULL after free() to prevent use-after-free:
free (*data_buf); *data_buf = NULL;
Note
*data_buf is not set to NULL after freeing. The caller is responsible for nulling the pointer to prevent use-after-free (see Todo).
- Parameters:
ctx – [in] DYAD context.
data_buf – [inout] Pointer to the buffer to free. *data_buf must be non-NULL on entry.
- Return values:
DYAD_RC_OK – Buffer freed successfully.
DYAD_RC_BADBUF – data_buf is NULL or *data_buf is NULL.
- Returns:
dyad_rc_t return code.
-
static dyad_rc_t validate_margo_protocol(const dyad_ctx_t *ctx, const char *protocol)
Validates that a Mercury/libfabric network protocol is available on the current system.
Calls NA_Get_protocol_info() with protocol to check whether it is supported by the installed libfabric providers.
If the protocol is available, logs the class and device names at debug level, frees the protocol info via NA_Free_protocol_info(), and returns DYAD_RC_OK.
If the protocol is not available, logs an error and calls NA_Get_protocol_info() a second time with NULL to enumerate all available protocols. Each available protocol is logged at debug level to help the user identify a valid alternative; the protocol info is then freed and DYAD_RC_MARGO_BAD_PROTO is returned.
If the enumeration call also fails or returns no protocols, logs a message indicating that no NA protocol is available at all and returns DYAD_RC_MARGO_BAD_PROTO.
This function is called during Margo DTL initialization to fail fast with a descriptive error message rather than letting margo_init() fail with a cryptic error.
- Parameters:
ctx – [in] DYAD context.
protocol – [in] Mercury/NA protocol string to validate, e.g. "ofi+tcp" or "ofi+verbs". If NULL, behavior is undefined; callers should always pass a non-NULL string.
- Return values:
DYAD_RC_OK – The protocol is available on this system.
DYAD_RC_MARGO_BAD_PROTO – The protocol is not available, or NA_Get_protocol_info() failed.
- Returns:
dyad_rc_t return code.
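The fail-fast pattern can be illustrated without Mercury. In this sketch a fixed, hypothetical list of protocol strings stands in for what NA_Get_protocol_info() would report, and the log goes to a caller-supplied FILE *; it is not DYAD’s implementation.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in return codes. */
typedef enum { RC_OK, RC_MARGO_BAD_PROTO } sketch_rc_t;

/* Hypothetical stand-in for the protocols an imaginary system supports. */
static const char *const available[] = { "ofi+tcp", "ofi+verbs" };
static const size_t n_available = sizeof available / sizeof available[0];

/* Accept a supported protocol; otherwise list the alternatives so the user
 * can pick one, instead of letting initialization fail later. */
sketch_rc_t sketch_validate_protocol(const char *protocol, FILE *log)
{
    for (size_t i = 0; i < n_available; i++)
        if (strcmp(available[i], protocol) == 0)
            return RC_OK;
    fprintf(log, "NA protocol '%s' is not available\n", protocol);
    for (size_t i = 0; i < n_available; i++)
        fprintf(log, "  available: %s\n", available[i]);
    return RC_MARGO_BAD_PROTO;
}
```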
-
dyad_rc_t dyad_dtl_margo_init(const dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)
Initializes the Margo DTL internal state.
Allocates and populates the dyad_dtl_margo internal state struct, initializes the Margo instance, registers the data_ready_rpc RPC, and wires all function pointers in ctx->dtl_handle to their Margo implementations:
rpc_pack → dyad_dtl_margo_rpc_pack
rpc_unpack → dyad_dtl_margo_rpc_unpack
rpc_respond → dyad_dtl_margo_rpc_respond
rpc_recv_response → dyad_dtl_margo_rpc_recv_response
get_buffer → dyad_dtl_margo_get_buffer
return_buffer → dyad_dtl_margo_return_buffer
establish_connection → dyad_dtl_margo_establish_connection
send → dyad_dtl_margo_send
recv → dyad_dtl_margo_recv
close_connection → dyad_dtl_margo_close_connection
The Mercury/NA protocol is read from the DYAD_MARGO_PROTO environment variable (DYAD_MARGO_PROTO_ENV). If it is not set, the protocol defaults to "ofi+tcp". The protocol is validated via validate_margo_protocol() before margo_init() is called.
Margo is initialized differently depending on comm_mode:
DYAD_COMM_SEND (producer / Flux broker side): Margo is initialized in MARGO_CLIENT_MODE with no dedicated progress ES and no RPC handler ES. The data_ready_rpc RPC is registered without a handler (NULL), since the producer only sends RPCs.
DYAD_COMM_RECV (consumer / client wrapper side): Margo is initialized in MARGO_SERVER_MODE with a dedicated Execution Stream (ES) for the Mercury progress loop (use_progress_thread = 1), and RPC handlers run in that same ES (rpc_thread_count = -1). The RPC named "data_ready_rpc" is registered with data_ready_rpc() as its handler function, and the margo_handle is registered as auxiliary data accessible to the handler via margo_registered_data().
Both modes retrieve their own local Margo address via margo_addr_self() and initialize remote_addr to NULL. On the producer side, remote_addr is later set by dyad_dtl_margo_rpc_unpack().
On any error, dyad_dtl_margo_finalize() is called to clean up partially initialized state before returning.
- Todo:
Remove the mode parameter; it is already stored in ctx->dtl_handle->mode by dyad_dtl_init() before this function is called.
Note
RPC handlers run in the same Argobots Execution Stream (ES) as the Mercury progress loop (rpc_thread_count = -1), so no additional ES is created for handler execution. This is safe because the consumer blocks in a busy-wait loop on margo_handle->recv_ready until data_ready_rpc() signals completion; there is no concurrent work that could be starved by sharing the progress-loop ES with the handler.
- Parameters:
ctx – [in] DYAD context. ctx->dtl_handle must already be allocated by dyad_dtl_init().
mode – [in] DTL mode (must be DYAD_DTL_MARGO). Redundant, since dyad_dtl_init() already stores the mode in ctx->dtl_handle->mode before dispatch (see Todo).
comm_mode – [in] Communication direction. DYAD_COMM_SEND initializes Margo as a client (producer side); DYAD_COMM_RECV initializes Margo as a server (consumer side).
debug – [in] If true, enables verbose debug logging.
- Return values:
DYAD_RC_OK – Initialization succeeded.
DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl_margo struct.
DYAD_RC_MARGO_BAD_PROTO – The configured Mercury/NA protocol is not available on this system.
DYAD_RC_MARGOINIT_FAIL – margo_init() failed, or any other error occurred during initialization.
- Returns:
dyad_rc_t return code.
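The comm_mode-dependent settings can be summarised as plain data. The sketch below assumes the values map onto the arguments of Margo’s margo_init(addr_str, mode, use_progress_thread, rpc_thread_count). The enum names are local stand-ins for MARGO_CLIENT_MODE and MARGO_SERVER_MODE, and the producer-side 0/0 thread values are an assumption read from "no dedicated progress ES and no RPC handler ES".

```c
#ifndef _POSIX_C_SOURCE
#define _POSIX_C_SOURCE 200112L /* for setenv()/unsetenv() in the checks */
#endif
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_PROTO_ENV "DYAD_MARGO_PROTO"
#define SKETCH_PROTO_DEFAULT "ofi+tcp"

typedef enum { COMM_SEND, COMM_RECV } sketch_comm_mode_t;
/* Local stand-ins for MARGO_CLIENT_MODE / MARGO_SERVER_MODE. */
typedef enum { SK_CLIENT_MODE, SK_SERVER_MODE } sketch_margo_mode_t;

typedef struct {
    const char *protocol;
    sketch_margo_mode_t mode;
    int use_progress_thread;
    int rpc_thread_count;
    int register_handler; /* 1: register data_ready_rpc with a handler */
} sketch_margo_cfg_t;

sketch_margo_cfg_t sketch_margo_config(sketch_comm_mode_t comm_mode)
{
    sketch_margo_cfg_t cfg;
    const char *env = getenv(SKETCH_PROTO_ENV);
    cfg.protocol = env != NULL ? env : SKETCH_PROTO_DEFAULT;
    if (comm_mode == COMM_SEND) { /* producer: only forwards RPCs */
        cfg.mode = SK_CLIENT_MODE;
        cfg.use_progress_thread = 0;
        cfg.rpc_thread_count = 0;
        cfg.register_handler = 0;
    } else {                      /* consumer: serves data_ready_rpc */
        cfg.mode = SK_SERVER_MODE;
        cfg.use_progress_thread = 1;
        cfg.rpc_thread_count = -1; /* handlers share the progress ES */
        cfg.register_handler = 1;
    }
    return cfg;
}
```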
-
dyad_rc_t dyad_dtl_margo_rpc_pack(const dyad_ctx_t *ctx, const char *upath, uint32_t producer_rank, void ***packed_obj)
Packs a file fetch request into a JSON object for a Margo RPC call.
Creates a Jansson JSON object containing the fields the producer needs to locate the file and to contact the consumer’s Margo server:
"upath": relative path of the file to fetch.
"tag_prod": Flux rank of the producer broker.
"pid_cons": process ID of the consumer.
"addr": Margo address string of the consumer’s Margo server, obtained via margo_addr_to_string(). The producer uses this address to send a data_ready_rpc to the consumer, whose handler then pulls the file data from the producer via RDMA.
Unlike the Flux RPC backend, which only packs upath, the Margo backend must also include the consumer’s Margo server address: the data transfer is initiated by the producer contacting the consumer’s Margo server, after which the consumer pulls the data from the producer’s registered buffer (RDMA pull model).
The caller is responsible for decrementing the reference count of packed_obj via json_decref() when it is no longer needed.
- Parameters:
ctx – [in] DYAD context.
upath – [in] Relative path of the file to fetch.
producer_rank – [in] Flux rank of the producer broker.
packed_obj – [out] Set to the allocated JSON object on success. Undefined on failure.
- Return values:
DYAD_RC_OK – The JSON object was created successfully.
DYAD_RC_BADPACK – json_pack() failed to create the object.
- Returns:
dyad_rc_t return code.
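To show the shape of the request, the sketch below renders the four documented fields as JSON text with snprintf(). This is an illustration only: DYAD builds a json_t with Jansson’s json_pack(), and key order and spacing in its output may differ. Strings containing quotes, backslashes or control characters are rejected here rather than escaped, and the address in the usage checks is made up.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Accept only strings that can be embedded in JSON without escaping. */
static int json_safe(const char *s)
{
    for (; *s != '\0'; s++) {
        unsigned char c = (unsigned char)*s;
        if (c == '"' || c == '\\' || c < 0x20)
            return 0;
    }
    return 1;
}

/* Hypothetical helper: writes the request payload into out.
 * Returns the JSON length, or -1 on unsafe input or truncation. */
int sketch_margo_pack(char *out, size_t cap, const char *upath,
                      uint32_t producer_rank, long consumer_pid, const char *addr)
{
    if (!json_safe(upath) || !json_safe(addr))
        return -1;
    int n = snprintf(out, cap,
                     "{\"upath\": \"%s\", \"tag_prod\": %u, \"pid_cons\": %ld, \"addr\": \"%s\"}",
                     upath, (unsigned)producer_rank, consumer_pid, addr);
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```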
-
dyad_rc_t dyad_dtl_margo_rpc_unpack(const dyad_ctx_t *ctx, const void **msg, char **upath)
Unpacks a file fetch request from an incoming Flux RPC message and resolves the consumer’s Margo address.
Extracts the following fields from the JSON payload of msg using flux_request_unpack():
"upath": relative path of the requested file.
"tag_prod": Flux rank of the producer broker (extracted but not used on the producer side).
"pid_cons": process ID of the consumer (extracted but not used on the producer side).
"addr": Margo address string of the consumer’s Margo server.
After successful unpacking, resolves the consumer’s Margo address string to an hg_addr_t via margo_addr_lookup() and stores it in margo_handle->remote_addr. dyad_dtl_margo_send() later uses this address to contact the consumer’s Margo server.
Note
upath is owned by the Flux message msg and must not be freed by the caller. It remains valid only for the lifetime of msg.
Note
tag_prod and pid_cons are unpacked for protocol completeness but are not used on the producer side. They may be useful for debugging or future extensions.
Note
Unlike the Flux RPC backend, the Margo backend does not store msg in the DTL handle after unpacking. The Flux RPC backend stores msg because it reuses it across multiple flux_respond_raw() calls on the same streaming RPC. This backend does not use streaming RPC: msg is only needed to extract the initial request fields, and the data transfer proceeds over a separate Margo RDMA channel independently of the Flux message.
- Parameters:
ctx – [in] DYAD context.
msg – [in] Incoming Flux RPC message containing the JSON payload packed by dyad_dtl_margo_rpc_pack().
upath – [out] Set to the relative path of the requested file, extracted from the message payload. Valid for the lifetime of msg.
- Return values:
DYAD_RC_OK – Unpacking and address resolution succeeded.
DYAD_RC_BADUNPACK – flux_request_unpack() failed to extract the required fields from the message.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_dtl_margo_rpc_respond(const dyad_ctx_t *ctx, const void **orig_msg)
Sends the initial RPC acknowledgement from the service to the consumer.
No-op for the Margo DTL. The Margo data transfer is driven by the producer contacting the consumer’s Margo server directly, after which the consumer pulls the data via RDMA, so no explicit acknowledgement over the Flux RPC channel is needed before data transfer begins.
- Parameters:
ctx – [in] DYAD context.
orig_msg – [in] Unused by this backend.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_margo_rpc_recv_response(const dyad_ctx_t *ctx, void **f)
Receives the initial RPC response from the service.
No-op for the Margo DTL. The consumer does not need to process a Flux RPC response before data transfer begins; it waits directly on margo_handle->recv_ready, which is set by data_ready_rpc() after the RDMA pull completes.
- Parameters:
ctx – [in] Unused by this backend.
f – [in] Unused by this backend.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_margo_establish_connection(const dyad_ctx_t *ctx)
Establishes the Margo DTL data channel.
No-op for the Margo DTL. The consumer’s Margo address is resolved to an hg_addr_t during dyad_dtl_margo_rpc_unpack() and stored in margo_handle->remote_addr. The producer connects directly to this address in dyad_dtl_margo_send() via margo_create() and margo_forward(), without a separate connection setup step.
Endpoint caching is not needed because each producer-consumer pair performs a single data transfer per file fetch. The RPC handle is created, used, and destroyed within a single dyad_dtl_margo_send() call.
- Parameters:
ctx – [in] DYAD context.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_margo_send(const dyad_ctx_t *ctx, void *buf, size_t buflen)
Sends file data to the consumer via Margo RDMA.
Registers buf as a read-only Mercury bulk handle via margo_bulk_create(), then sends an RPC to the consumer’s Margo server at margo_handle->remote_addr (resolved during dyad_dtl_margo_rpc_unpack()) via margo_forward(). The RPC payload contains the bulk handle and the buffer size, allowing the consumer’s data_ready_rpc() handler to perform an RDMA pull from the producer’s registered buffer.
margo_forward() blocks until the consumer responds, confirming that the RDMA pull is complete. The producer then frees the RPC output and destroys the handle.
- Todo:
Add error checking for margo_bulk_create(), margo_create(), margo_forward(), and margo_get_output(). All of these calls are currently unchecked.
Note
Unlike the Flux RPC backend, where the producer needs the original request message (flux_msg_t) as a reply address to route flux_respond_raw() back to the correct consumer, this Margo-based backend uses an RDMA pull model. During dyad_dtl_margo_init(), the consumer initializes its Margo instance in MARGO_SERVER_MODE so that the producer can connect back to it. The consumer then embeds its own Margo address as a string in the Flux RPC request payload via dyad_dtl_margo_rpc_pack(). The producer extracts this address and resolves it to an hg_addr_t via margo_addr_lookup() during dyad_dtl_margo_rpc_unpack(), storing it in margo_handle->remote_addr. This address is used here to connect back to the consumer and trigger the RDMA pull, without needing the original flux_msg_t.
- Parameters:
ctx – [in] DYAD context. margo_handle->remote_addr must already be set by dyad_dtl_margo_rpc_unpack().
buf – [in] Buffer containing the file data to send.
buflen – [in] Number of bytes in buf.
- Return values:
DYAD_RC_OK – Always returned. Error handling for Margo calls is not yet implemented (see TODO).
- Returns:
dyad_rc_t return code:
-
dyad_rc_t dyad_dtl_margo_recv(const dyad_ctx_t *ctx, void **buf, size_t *buflen)
Receives file data from the producer via the Margo DTL.
Busy-waits on margo_handle->recv_ready, sleeping 100 microseconds between checks, until data_ready_rpc() sets it to 1 after completing the RDMA pull from the producer. Once the data is ready, allocates a new buffer, copies the received data from margo_handle->recv_buffer, and frees the buffer allocated by data_ready_rpc().
After copying, the Margo handle state is reset for the next transfer:
recv_buffer is freed and set to NULL.
recv_len is reset to 0.
recv_ready is reset to 0.
- Todo:
Replace the busy-wait loop with a more efficient synchronization mechanism such as an Argobots eventual or condition variable.
- Todo:
Add error checking for malloc() failure.
Note
The data flow for Margo receive is inverted compared to the Flux RPC backend. In the Flux RPC backend, the producer pushes data by calling flux_respond_raw(), and the consumer reads it via flux_rpc_get_raw(). In this Margo-based backend, the producer registers its buffer and notifies the consumer’s Margo server, which performs an RDMA pull into margo_handle->recv_buffer via data_ready_rpc(). The consumer then copies from that buffer here. The actual data movement therefore happens in data_ready_rpc(), running on the progress loop ES, not in this function.
Note
The busy-wait with usleep(100) is a simple polling approach. A condition variable or Argobots synchronization primitive would be more efficient but would require additional coordination with the Margo progress loop ES (see TODO).
- Parameters:
ctx – [in] DYAD context.
buf – [out] Set to a newly allocated buffer containing a copy of the received file data. The caller must free this buffer via ctx->dtl_handle->return_buffer().
buflen – [out] Set to the number of bytes received.
- Returns:
Always returns DYAD_RC_OK. Error handling for allocation failure and copy errors is not yet implemented (see TODO).
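The poll, copy, and reset sequence described above can be sketched in portable C. This is a minimal illustration under stated assumptions, not DYAD’s implementation:

- struct recv_state is a hypothetical stand-in for the recv_buffer, recv_len, and recv_ready fields of the Margo handle.
- A POSIX thread plays the role of data_ready_rpc() running on the progress ES.
- The flag is a C11 atomic, so the staged bytes are guaranteed to be visible once the flag reads 1.

The sketch also includes the malloc() check that the TODO above calls for.

```c
#define _DEFAULT_SOURCE /* expose usleep() under strict -std= modes */
#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical stand-in for the recv_* fields of the Margo DTL handle. */
struct recv_state {
    void *recv_buffer;     /* staged by the handler thread */
    size_t recv_len;
    atomic_int recv_ready; /* 0 = not ready, 1 = data staged */
};

struct stage_args {
    struct recv_state *st;
    const char *payload;
    size_t len;
};

/* Hypothetical stand-in for data_ready_rpc(): stage the bytes, then publish. */
static void *fake_data_ready_rpc(void *arg)
{
    struct stage_args *a = arg;
    usleep(1000); /* pretend the RDMA pull takes a while */
    void *staged = malloc(a->len);
    if (staged != NULL)
        memcpy(staged, a->payload, a->len);
    a->st->recv_buffer = staged;
    a->st->recv_len = staged != NULL ? a->len : 0;
    /* Release: the buffer writes above become visible before the flag. */
    atomic_store_explicit(&a->st->recv_ready, 1, memory_order_release);
    return NULL;
}

/* Consumer side: busy-wait, copy out, then reset state for the next transfer. */
static int sketch_margo_recv(struct recv_state *st, void **buf, size_t *buflen)
{
    while (atomic_load_explicit(&st->recv_ready, memory_order_acquire) == 0)
        usleep(100); /* 100 microsecond back-off, as described above */

    int rc = -1;
    *buf = NULL;
    *buflen = 0;
    if (st->recv_buffer != NULL && (*buf = malloc(st->recv_len)) != NULL) {
        memcpy(*buf, st->recv_buffer, st->recv_len);
        *buflen = st->recv_len;
        rc = 0;
    }
    free(st->recv_buffer);
    st->recv_buffer = NULL;
    st->recv_len = 0;
    atomic_store_explicit(&st->recv_ready, 0, memory_order_relaxed);
    return rc;
}
```

The release store in the handler and the acquire load in the poll loop make the hand-off well-defined in C11. An Argobots eventual, as the TODO suggests, could take the place of both the flag and the usleep() back-off.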
-
dyad_rc_t dyad_dtl_margo_close_connection(const dyad_ctx_t *ctx)
Closes the Margo DTL data channel.
No-op for the Margo DTL. The RPC handle created in dyad_dtl_margo_send() is destroyed within that function via margo_destroy() immediately after the transfer completes. No persistent connection state is maintained between transfers, since endpoint caching is not needed for DYAD’s single-transfer-per-file-fetch usage pattern.
- Todo:
The original implementation intended to perform different teardown depending on comm_mode (producer vs. consumer). This was never implemented, and the code was commented out. Revisit whether comm_mode-specific teardown is needed, for example to free margo_handle->remote_addr on the producer side via margo_addr_free().
- Parameters:
ctx – [in] DYAD context.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_margo_finalize(const dyad_ctx_t *ctx)
Finalizes and frees the Margo DTL internal state.
Releases all resources associated with the Margo DTL in the following order:
If margo_handle->mid is a valid Margo instance, frees the local Margo address via margo_addr_free().
If margo_handle->remote_addr is non-NULL, frees the remote address (the consumer’s resolved Margo server address) via margo_addr_free().
Finalizes the Margo instance via margo_finalize(), which shuts down the Mercury progress loop and any associated Argobots ESs.
Frees the dyad_dtl_margo struct and sets the handle pointer to NULL.
If ctx->dtl_handle is NULL or ctx->dtl_handle->private_dtl.margo_dtl_handle is NULL, the function is a no-op and returns DYAD_RC_OK. This allows dyad_dtl_margo_finalize() to be called safely on a partially initialized or already-finalized context, for example when dyad_dtl_margo_init() fails partway through and calls this function to clean up.
Note
This function only frees the dyad_dtl_margo struct. The outer dyad_dtl handle is freed by dyad_dtl_finalize(), which calls this function as part of the full teardown sequence.
- Parameters:
ctx – [in] DYAD context. On return, ctx->dtl_handle->private_dtl.margo_dtl_handle is NULL.
- Returns:
Always returns DYAD_RC_OK.
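The NULL-guarded, idempotent teardown described above is a common C pattern. The following sketch illustrates it under stated assumptions.

- struct fake_margo and struct fake_dtl are hypothetical stand-ins for the Margo instance and its addresses.
- The two counters stand in for margo_addr_free() and margo_finalize().

With these stand-ins, the release order and the safe second call can be checked without linking Margo.

```c
#include <stdlib.h>

/* Hypothetical counters recording which "resources" were released. */
static int addr_frees = 0;
static int instance_finalized = 0;

/* Hypothetical stand-in for the dyad_dtl_margo struct. */
struct fake_margo {
    int has_mid;       /* stands in for a valid margo_instance_id */
    void *local_addr;  /* stands in for the local hg_addr_t */
    void *remote_addr; /* stands in for the consumer's hg_addr_t (may be NULL) */
};

/* Hypothetical stand-in for the outer DTL handle's private pointer. */
struct fake_dtl {
    struct fake_margo *margo_dtl_handle;
};

static void fake_addr_free(void *addr) { (void)addr; addr_frees++; }

/* Safe on NULL, partially initialized, or already-finalized state. */
static int sketch_margo_finalize(struct fake_dtl *dtl)
{
    if (dtl == NULL || dtl->margo_dtl_handle == NULL)
        return 0; /* no-op, mirroring the DYAD_RC_OK early return */

    struct fake_margo *m = dtl->margo_dtl_handle;
    if (m->has_mid)
        fake_addr_free(m->local_addr);
    if (m->remote_addr != NULL)
        fake_addr_free(m->remote_addr);
    if (m->has_mid)
        instance_finalized++; /* margo_finalize() would be called here */

    free(m);
    dtl->margo_dtl_handle = NULL; /* makes a repeated call a no-op */
    return 0;
}
```

Setting the handle pointer to NULL as the last step is what turns a repeated call into a no-op.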
UCX
The UCX (Unified Communication X) DTL backend provides high-performance inter-node data movement using one-sided RDMA. UCX is an open-source communication library that abstracts over multiple high-speed network fabrics including InfiniBand, RoCE, and shared memory, exposing a unified API for remote memory access operations.
In DYAD’s UCX backend, the producer pushes file data directly into the
consumer’s pre-registered memory buffer using ucp_put_nbx(), a
non-blocking one-sided RDMA put operation. Unlike the Margo backend
which uses a pull model (HG_BULK_PULL), the UCX backend uses a push
model — the producer writes directly to the consumer’s memory without
the consumer needing to initiate the transfer.
Before any data transfer can occur, the consumer pre-allocates and
registers an RDMA buffer with UCX via ucp_mem_map() during
initialization. The consumer’s buffer address and the associated remote
key (ucp_rkey_t) are packed into the Flux RPC request payload
(base64-encoded) and sent to the producer. The producer decodes these
fields, unpacks the remote key via ucp_ep_rkey_unpack(), and uses
them to perform the RDMA put directly into the consumer’s buffer without
any intermediate copy.
To avoid the cost of repeated endpoint creation, the UCX backend
maintains a per-producer endpoint cache (ucx_ep_cache_h) that maps
consumer connection keys to ucp_ep_h endpoints. An endpoint is
created on the first transfer to a given consumer and reused for all
subsequent transfers to the same consumer within the same job, amortizing
the connection establishment overhead across multiple file fetches.
The consumer detects the arrival of data by polling the first
sizeof(ssize_t) bytes of its RDMA buffer, where the producer writes
the file size ahead of the payload before initiating the put. This sentinel
value is initialized to zero at the start of each transfer and becomes
non-zero once the producer begins writing, allowing the consumer to busy-
wait without a UCX request handle since one-sided RDMA puts do not
notify the target.
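The endpoint cache described above is, at its core, a get-or-create map from a 64-bit consumer connection key to an endpoint handle. The sketch below is hypothetical and does not reflect the internals of ucx_ep_cache_h, which are not shown in this section. It keeps opaque void * endpoints in a singly linked list and takes a caller-supplied create callback in place of ucp_ep_create(), so the reuse behavior can be exercised without UCX.

```c
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical cache node: connection key -> opaque endpoint handle. */
struct ep_node {
    uint64_t key;
    void *ep;
    struct ep_node *next;
};

struct ep_cache {
    struct ep_node *head;
};

typedef void *(*ep_create_fn)(uint64_t key, void *arg);

/* Return the cached endpoint for key, creating and caching it on a miss.
 * Returns NULL if endpoint creation or node allocation fails. */
static void *ep_cache_get_or_create(struct ep_cache *c, uint64_t key,
                                    ep_create_fn create, void *arg)
{
    for (struct ep_node *n = c->head; n != NULL; n = n->next)
        if (n->key == key)
            return n->ep; /* hit: reuse, no connection setup cost */

    void *ep = create(key, arg);
    if (ep == NULL)
        return NULL;
    struct ep_node *n = malloc(sizeof(*n));
    if (n == NULL)
        return NULL; /* a real cache would also close ep here */
    n->key = key;
    n->ep = ep;
    n->next = c->head;
    c->head = n;
    return ep;
}

/* Free all nodes; a real cache would also close each endpoint. */
static void ep_cache_destroy(struct ep_cache *c)
{
    struct ep_node *n = c->head;
    while (n != NULL) {
        struct ep_node *next = n->next;
        free(n);
        n = next;
    }
    c->head = NULL;
}
```

A linked list keeps the sketch short. A hash table would be the natural choice if a producer serves many consumers.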
Defines
-
UCX_MAX_TRANSFER_SIZE
Maximum data size for a single UCX transfer (4 GiB).
UCX tag send/receive operations are limited to 4 GiB per transfer. Files larger than this limit must be split into multiple transfers.
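Splitting a file larger than UCX_MAX_TRANSFER_SIZE is plain offset arithmetic. The sketch below is an assumption about how such a split could look, not code taken from DYAD. send_chunk_fn is a hypothetical callback standing in for one transfer. The limit is passed as a parameter so the logic can be exercised with small sizes.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-transfer callback; returns 0 on success. */
typedef int (*send_chunk_fn)(const uint8_t *chunk, size_t len, void *arg);

/* Number of transfers needed; max_transfer must be > 0.
 * Written without (total + max - 1) to avoid overflow near SIZE_MAX. */
static size_t chunk_count(size_t total, size_t max_transfer)
{
    return total / max_transfer + (total % max_transfer != 0);
}

/* Send buf as consecutive transfers of at most max_transfer bytes each. */
static int send_in_chunks(const uint8_t *buf, size_t total, size_t max_transfer,
                          send_chunk_fn send_chunk, void *arg)
{
    size_t offset = 0;
    while (offset < total) {
        size_t remaining = total - offset;
        size_t len = remaining < max_transfer ? remaining : max_transfer;
        int rc = send_chunk(buf + offset, len, arg);
        if (rc != 0)
            return rc; /* stop at the first failed transfer */
        offset += len;
    }
    return 0;
}
```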
-
DYAD_UCX_TAG_MASK
Tag mask used for UCX tag send/receive operations.
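Set to UINT64_MAX so that all 64 bits of an incoming tag are compared against the expected tag, i.e., only an exact tag match is accepted. DYAD uses a single tag per transfer, so exact matching is sufficient and no partial-tag filtering is needed.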
Typedefs
-
typedef struct ucx_request dyad_ucx_request_t
Functions
-
static void dyad_ucx_request_init(void *request)
Initializes a dyad_ucx_request_t allocated by UCX.
Registered with UCX via the request_init field of ucp_params_t during dyad_dtl_ucx_init(). Called automatically by UCX each time it allocates a new request object from its internal memory pool. Sets completed to 0 so that dyad_ucx_request_wait() can detect when the operation has finished.
- Parameters:
request – [in] Pointer to the uninitialized request memory allocated by UCX. Cast to dyad_ucx_request_t*.
-
static void dyad_send_callback(void *req, int status, void *ctx)
UCX send completion callback.
Registered as the completion callback for UCX tag send operations and for the RDMA put issued by ucx_send_no_wait(). Called by UCX when a send operation completes, either successfully or with an error. Sets real_req->completed to 1 so that dyad_ucx_request_wait() can detect completion and exit its polling loop.
The function signature differs between UCX API versions:
UCX >= 1.10: receives an additional ctx pointer argument.
UCX < 1.10: receives only req and status.
- Parameters:
req – [in] Pointer to the dyad_ucx_request_t for the completed operation.
status – [in] Final status of the send operation.
ctx – [in] User context pointer (UCX >= 1.10 only, unused).
-
static int dyad_ucx_request_wait(const dyad_ctx_t *ctx, dyad_ucx_request_t *request)
Waits for an async UCX operation to complete.
Handles the three possible return values of a UCX communication operation:
Immediate success (request == UCS_OK): the operation completed synchronously. Returns UCS_OK immediately.
Request handle (UCS_PTR_IS_PTR(request)): the operation is in progress. Spins by calling ucp_worker_progress() and polling ucp_request_check_status() until the status is no longer UCS_INPROGRESS, then frees the request via ucp_request_free() and returns the final status.
Immediate error (UCS_PTR_IS_ERR(request)): the operation failed immediately. Extracts the ucs_status_t error code via UCS_PTR_STATUS() and returns it.
The spin loop is expected to run only a small number of iterations because prior UCX calls in the send/receive path are structured to minimize the size of the worker’s event queue before this wait is called.
- Parameters:
ctx – [in] DYAD context. Used to access the UCX worker via ctx->dtl_handle->private_dtl.ucx_dtl_handle->ucx_worker.
request – [in] Return value of a UCX communication operation (e.g. ucp_tag_send_nbx() or ucp_tag_recv_nbx()). May be UCS_OK, a request handle, or a UCX error pointer.
- Return values:
UCS_OK – The operation completed successfully.
other – A UCX error code indicating failure.
- Returns:
ucs_status_t final status of the operation:
-
static dyad_rc_t ucx_allocate_buffer(const dyad_ctx_t *ctx, dyad_dtl_ucx_t *dtl_handle, dyad_dtl_comm_mode_t comm_mode)
Allocates and registers a UCX memory buffer for RDMA operations.
Allocates a host memory buffer of dtl_handle->max_transfer_size + sizeof(size_t) bytes using ucp_mem_map() with UCP_MEM_MAP_ALLOCATE, which lets UCX allocate and register the memory in one step. The extra sizeof(size_t) bytes hold the file size, placed ahead of the data so the consumer can determine the data boundary without an additional RDMA operation.
The memory protection flags differ by communication direction:
DYAD_COMM_SEND (producer): UCP_MEM_MAP_PROT_LOCAL_READ. The producer only reads from the buffer locally to send data.
DYAD_COMM_RECV (consumer): UCP_MEM_MAP_PROT_REMOTE_WRITE. The buffer is exposed for remote write so the producer can push data into it via RDMA.
After mapping, queries the actual allocated address via ucp_mem_query() and stores it in dtl_handle->net_buf and dtl_handle->cons_buf_ptr. Then packs the remote key for the registered memory region via ucp_rkey_pack(), storing the packed key in dtl_handle->rkey_buf and its size in dtl_handle->rkey_size. The remote key is later sent to the producer (base64-encoded) so it can perform the RDMA push into the consumer’s buffer.
On any failure after a successful ucp_mem_map(), the memory is unmapped via ucp_mem_unmap() before returning.
- Parameters:
ctx – [in] DYAD context. Used for logging.
dtl_handle – [inout] UCX DTL internal state. On success, net_buf, cons_buf_ptr, mem_handle, rkey_buf, and rkey_size are populated.
comm_mode – [in] Communication direction. Controls the memory protection flags. Must not be DYAD_COMM_NONE.
- Return values:
DYAD_RC_OK – Buffer allocated and registered successfully.
DYAD_RC_NOCTX – dtl_handle->ucx_ctx is NULL.
DYAD_RC_BAD_COMM_MODE – dtl_handle->comm_mode is DYAD_COMM_NONE.
DYAD_RC_UCXMMAP_FAIL – ucp_mem_map() or ucp_mem_query() failed.
DYAD_RC_UCXRKEY_PACK_FAILED – ucp_rkey_pack() failed.
DYAD_RC_BADBUF – Default error before any specific check is reached.
- Returns:
dyad_rc_t return code:
-
static dyad_rc_t ucx_free_buffer(const dyad_ctx_t *ctx, void *ucp_ctx, void *mem_handle, void **buf)
Releases a UCX RDMA-registered memory buffer.
Unmaps and deregisters the memory region identified by mem_handle via ucp_mem_unmap(). This both frees the UCX memory registration and releases the underlying memory allocated by ucp_mem_map() with UCP_MEM_MAP_ALLOCATE. Sets *buf to NULL after unmapping.
Validates all three inputs before proceeding:
If ucp_ctx is NULL, returns DYAD_RC_NOCTX.
If mem_handle is NULL, returns DYAD_RC_UCXMMAP_FAIL.
If buf or *buf is NULL, returns DYAD_RC_BADBUF.
Note
Unlike dyad_dtl_flux_return_buffer() and dyad_dtl_margo_return_buffer(), which use free(), this function uses ucp_mem_unmap() because the buffer was allocated and registered by UCX via ucp_mem_map() with UCP_MEM_MAP_ALLOCATE. Calling free() directly on a UCX-managed buffer would bypass UCX’s memory registration and cause undefined behavior.
- Parameters:
ctx – [in] DYAD context. Used for logging.
ucp_ctx – [in] UCX context used to unmap the memory. Must not be NULL.
mem_handle – [in] UCX memory handle returned by ucp_mem_map(). Must not be NULL.
buf – [inout] Pointer to the buffer to release. *buf must be non-NULL on entry. Set to NULL on success.
- Return values:
DYAD_RC_OK – Buffer unmapped and released successfully.
DYAD_RC_NOCTX – ucp_ctx is NULL.
DYAD_RC_UCXMMAP_FAIL – mem_handle is NULL.
DYAD_RC_BADBUF – buf or *buf is NULL.
- Returns:
dyad_rc_t return code:
-
static inline void *ucx_send_no_wait(const dyad_ctx_t *ctx, bool is_warmup, void *buf, size_t buflen)
Initiates a non-blocking UCX RDMA push operation.
Performs a one-sided RDMA put of buf into the consumer’s pre-registered memory buffer using ucp_put_nbx(). The operation proceeds in three steps:
Unpacks the consumer’s remote key from dtl_handle->rkey_buf via ucp_ep_rkey_unpack(), binding it to the endpoint dtl_handle->ep. The unpacked key is stored in dtl_handle->rkey.
Initiates the RDMA put via ucp_put_nbx() with dyad_send_callback registered as the completion callback. The destination address is dtl_handle->cons_buf_ptr, which points to the consumer’s pre-registered RDMA buffer (parsed from the request by dyad_dtl_ucx_rpc_unpack()).
Returns the ucs_status_ptr_t from ucp_put_nbx() without waiting for completion. The caller must pass the returned pointer to dyad_ucx_request_wait() to block until the operation finishes.
If the endpoint is NULL or ucp_ep_rkey_unpack() fails, the function returns a ucs_status_ptr_t encoding UCS_ERR_NOT_CONNECTED without initiating the put.
Note
The is_warmup parameter is accepted for interface consistency with the receive path (ucx_recv_no_wait()) but is not used. No warmup distinction is made on the send path.
Note
This function implements the push RDMA model used by the UCX backend: the producer pushes data directly into the consumer’s pre-registered memory buffer via ucp_put_nbx(). In contrast, the Margo backend uses a pull model, where the consumer pulls from the producer’s buffer via HG_BULK_PULL.
- Parameters:
ctx – [in] DYAD context. The UCX endpoint, remote key buffer, and consumer buffer pointer are read from the UCX DTL internal state.
is_warmup – [in] Unused. Accepted for interface consistency with the receive path.
buf – [in] Local buffer containing the data to send.
buflen – [in] Number of bytes to send.
- Return values:
UCS_OK – The operation completed immediately and successfully (returned as a status, not a pointer).
valid pointer – The operation is in progress. Pass to dyad_ucx_request_wait() to wait for completion.
UCS_ERR_NOT_CONNECTED – The endpoint is NULL, ucp_ep_rkey_unpack() failed, or ucp_put_nbx() returned an error.
- Returns:
ucs_status_ptr_t:
-
static inline void *ucx_recv_no_wait(const dyad_ctx_t *ctx, bool is_warmup, void **buf, size_t *buflen)
Waits for incoming RDMA data by polling the consumer’s pre-registered buffer.
Busy-polls the first sizeof(ssize_t) bytes of dtl_handle->net_buf, where the producer writes the file size ahead of the data before initiating the RDMA push via ucp_put_nbx(). The consumer spins until this value becomes non-zero, indicating that the producer has started writing data into the buffer.
On each iteration, the UCX worker is progressed via ucp_worker_progress() to process incoming network events, and the thread sleeps for 10 microseconds to avoid saturating the CPU.
This approach works because the UCX backend uses a push RDMA model: the producer writes directly into the consumer’s pre-registered memory buffer (dtl_handle->net_buf) via ucp_put_nbx(). The prepended file size acts as a sentinel. Once the producer has initiated the RDMA put, the first bytes of the buffer are non-zero, signalling to the consumer that data is arriving.
- Todo:
Replace the busy-poll with a more efficient notification mechanism. The current approach wastes CPU cycles and adds latency from the 10-microsecond sleep. UCX Active Messages or a lightweight atomic flag could provide lower-latency completion notification.
Note
Unlike dyad_ucx_request_wait(), which polls a UCX request handle for a specific operation, this function polls the buffer contents directly. This is necessary because the consumer has no UCX request handle for the producer’s ucp_put_nbx() call. The put is one-sided, so UCX does not notify the consumer when it completes.
Note
The buf and buflen output parameters are accepted for interface consistency but are not populated by this function. The actual buffer pointer and length are read from dtl_handle->net_buf and the prepended size field by the caller (dyad_dtl_ucx_recv()) after this function returns.
Note
The is_warmup parameter controls whether this is a warmup iteration used to prime the RDMA connection before the actual data transfer. During warmup, the buffer sentinel check still applies, but the received data is discarded by the caller.
- Parameters:
ctx – [in] DYAD context. The UCX worker and network buffer are read from the UCX DTL internal state.
is_warmup – [in] If true, this is a warmup receive used to prime the RDMA connection.
buf – [out] Unused by this function. Populated by the caller after return.
buflen – [out] Unused by this function. Populated by the caller after return.
- Returns:
Always returns NULL. The actual data is read directly from dtl_handle->net_buf by the caller.
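The length-prefix sentinel can be demonstrated without RDMA hardware. The following single-threaded sketch illustrates the idea rather than reproducing DYAD’s code.

- net_buf holds an ssize_t header followed by the payload.
- The hypothetical fake_progress() stands in for ucp_worker_progress() together with the producer’s remote ucp_put_nbx(). On its third call, it copies a length-prefixed message into net_buf.
- The 10-microsecond sleep is omitted to keep the example fast.

```c
#include <stddef.h>
#include <string.h>
#include <sys/types.h> /* ssize_t */

#define SKETCH_CAP 64 /* hypothetical payload capacity for the example */

/* Consumer-side RDMA target: [ssize_t length][payload bytes ...]. */
static unsigned char net_buf[sizeof(ssize_t) + SKETCH_CAP];

/* Consumer: zero the sentinel before sending the request (rpc_pack step). */
static void reset_sentinel(void)
{
    memset(net_buf, 0, sizeof(ssize_t));
}

/* Producer: lay out the length header followed by the data. */
static size_t build_message(unsigned char *msg, const void *data, ssize_t len)
{
    memcpy(msg, &len, sizeof(len));
    memcpy(msg + sizeof(len), data, (size_t)len);
    return sizeof(len) + (size_t)len;
}

/* Hypothetical stand-in for worker progress plus the remote put:
 * the message lands in net_buf on the third call. */
static const char *pending_data = "payload";
static int progress_calls = 0;

static void fake_progress(void)
{
    if (++progress_calls == 3) {
        unsigned char msg[sizeof(ssize_t) + SKETCH_CAP];
        size_t n = build_message(msg, pending_data, (ssize_t)strlen(pending_data));
        memcpy(net_buf, msg, n); /* simulates ucp_put_nbx() into net_buf */
    }
}

/* Consumer: spin until the length header becomes non-zero. */
static ssize_t poll_for_data(void)
{
    ssize_t len = 0;
    for (;;) {
        fake_progress();
        memcpy(&len, net_buf, sizeof(len)); /* read the sentinel */
        if (len != 0)
            return len;
        /* the described loop also sleeps 10 microseconds here */
    }
}
```

On real hardware, the header and payload travel in one put. The single memcpy() in the sketch is therefore only an approximation of how the bytes land in the consumer’s memory.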
-
static dyad_rc_t ucx_warmup(const dyad_ctx_t *ctx)
Performs a UCX connection warmup by sending a 1-byte message to self.
Warms up the UCX RDMA connection by performing a loopback send from the local worker to itself. This primes the UCX connection machinery — endpoint creation, remote key exchange, and RDMA registration — so that the first real data transfer does not pay the full connection establishment cost.
The warmup sequence is:
Allocates a 1-byte UCX-registered send buffer via dyad_dtl_ucx_get_buffer() and a plain 1-byte receive buffer via malloc().
Connects to the local worker’s own address via ucx_connect(), creating a loopback endpoint.
Initiates a non-blocking 1-byte RDMA put to self via ucx_send_no_wait() with is_warmup = true.
Waits for the send to complete via dyad_ucx_request_wait().
Disconnects from self via ucx_disconnect() and frees both buffers.
On any failure, both buffers are freed before the appropriate error code is returned.
Note
The receive buffer is allocated with malloc() rather than dyad_dtl_ucx_get_buffer() because the warmup receive is not a real RDMA operation. The loopback send writes into the UCX-registered send buffer itself, and the plain receive buffer is only allocated to mirror the real transfer path without being used.
Note
The warmup is only performed on the producer side during dyad_dtl_ucx_init(). The consumer side does not need a warmup because it waits passively for the producer to initiate the connection.
- Parameters:
ctx – [in] DYAD context. The UCX worker, local address, and endpoint are read from and written to the UCX DTL internal state.
- Return values:
DYAD_RC_OK – Warmup completed successfully.
DYAD_RC_SYSFAIL – Failed to allocate the plain receive buffer via malloc().
DYAD_RC_UCXCOMM_FAIL – The warmup send operation failed.
other – Any error code returned by dyad_dtl_ucx_get_buffer() or ucx_connect().
- Returns:
dyad_rc_t return code:
-
dyad_rc_t dyad_dtl_ucx_init(const dyad_ctx_t *ctx, dyad_dtl_mode_t mode, dyad_dtl_comm_mode_t comm_mode, bool debug)
Initializes the UCX DTL internal state.
Allocates and populates the dyad_dtl_ucx internal state struct, initializes the UCX context and worker, allocates the RDMA-registered buffer, performs a connection warmup, and wires all function pointers in ctx->dtl_handle to their UCX implementations:
rpc_pack → dyad_dtl_ucx_rpc_pack
rpc_unpack → dyad_dtl_ucx_rpc_unpack
rpc_respond → dyad_dtl_ucx_rpc_respond
rpc_recv_response → dyad_dtl_ucx_rpc_recv_response
get_buffer → dyad_dtl_ucx_get_buffer
return_buffer → dyad_dtl_ucx_return_buffer
establish_connection → dyad_dtl_ucx_establish_connection
send → dyad_dtl_ucx_send
recv → dyad_dtl_ucx_recv
close_connection → dyad_dtl_ucx_close_connection
Initialization proceeds in the following order:
Allocates the dyad_dtl_ucx struct and initializes all fields to safe defaults (NULL, 0, or UCX_MAX_TRANSFER_SIZE). The Flux handle is borrowed from ctx->h as a non-owning pointer.
Reads the UCX configuration via ucp_config_read().
Initializes the UCX context via ucp_init() with the following features enabled:
UCP_FEATURE_RMA: Remote Memory Access for the RDMA push.
UCP_FEATURE_AMO32: 32-bit atomic memory operations.
UCP_FEATURE_TAG: tag-matching send/receive.
The request size is set to sizeof(dyad_ucx_request_t), and dyad_ucx_request_init is registered as the request initializer. If debug is true, the UCX configuration is printed to stderr before the config is released.
Creates a UCX worker via ucp_worker_create() with UCS_THREAD_MODE_SERIALIZED, so UCX calls must be made by only one thread at a time.
Queries the worker’s local address via ucp_worker_get_address(), storing it in dtl_handle->local_address. This address is sent to the remote peer during connection establishment so the peer can create an endpoint back to this worker.
Initializes the endpoint cache via dyad_ucx_ep_cache_init(). The cache stores ucp_ep_h endpoints keyed by remote worker address to avoid recreating endpoints for repeated transfers to the same peer.
Allocates and registers an RDMA buffer of UCX_MAX_TRANSFER_SIZE + sizeof(size_t) bytes via ucx_allocate_buffer(). The extra sizeof(size_t) bytes hold a prepended file-size sentinel used by the consumer to detect when the producer has started the RDMA push.
Wires all DTL function pointers.
Performs a loopback connection warmup via ucx_warmup() to prime the UCX connection machinery before the first real transfer. After warmup, dtl_handle->ep is reset to NULL.
On any error, dyad_dtl_ucx_finalize() is called to clean up partially initialized state before returning.
- Todo:
Remove the mode parameter. It is already stored in ctx->dtl_handle->mode by dyad_dtl_init() before this function is called.
- Parameters:
ctx – [in] DYAD context. ctx->dtl_handle must already be allocated by dyad_dtl_init().
mode – [in] DTL mode (must be DYAD_DTL_UCX). Redundant, since dyad_dtl_init() already stores the mode in ctx->dtl_handle->mode before dispatch (see TODO).
comm_mode – [in] Communication direction. Controls the memory protection flags for the RDMA buffer: DYAD_COMM_SEND (producer) uses UCP_MEM_MAP_PROT_LOCAL_READ; DYAD_COMM_RECV (consumer) uses UCP_MEM_MAP_PROT_REMOTE_WRITE.
debug – [in] If true, prints the UCX configuration to stderr and enables verbose debug logging.
- Return values:
DYAD_RC_OK – Initialization succeeded.
DYAD_RC_SYSFAIL – Failed to allocate the dyad_dtl_ucx struct.
DYAD_RC_UCXINIT_FAIL – Any other initialization step failed, including ucp_config_read(), ucp_init(), ucp_worker_create(), endpoint cache initialization, buffer allocation, or warmup.
- Returns:
dyad_rc_t return code:
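The function-pointer wiring step lets the rest of DYAD call through ctx->dtl_handle without knowing which backend is active. The following reduced sketch is built from hypothetical stand-ins:

- struct dtl_ops carries only three of the ten entries listed above.
- Integer return codes replace dyad_rc_t.
- The ucx_* functions are stubs.

The producer_path() caller and its call order are illustrative only.

```c
#include <stddef.h>

typedef int rc_t; /* hypothetical stand-in for dyad_rc_t; 0 means OK */

/* Hypothetical, reduced DTL vtable (the real handle has ten entries). */
struct dtl_ops {
    rc_t (*rpc_respond)(void *ctx);
    rc_t (*send)(void *ctx, const void *buf, size_t len);
    rc_t (*close_connection)(void *ctx);
};

static size_t bytes_sent = 0;

/* Backend stubs: the no-ops return OK, as the UCX rpc_respond does. */
static rc_t ucx_rpc_respond(void *ctx) { (void)ctx; return 0; }
static rc_t ucx_send(void *ctx, const void *buf, size_t len)
{
    (void)ctx;
    (void)buf;
    bytes_sent += len; /* a real backend would issue the RDMA put here */
    return 0;
}
static rc_t ucx_close_connection(void *ctx) { (void)ctx; return 0; }

/* The wiring step performed during backend initialization. */
static void wire_ucx_ops(struct dtl_ops *ops)
{
    ops->rpc_respond = ucx_rpc_respond;
    ops->send = ucx_send;
    ops->close_connection = ucx_close_connection;
}

/* Illustrative backend-agnostic caller: it only sees the vtable. */
static rc_t producer_path(const struct dtl_ops *ops, void *ctx,
                          const void *buf, size_t len)
{
    rc_t rc = ops->rpc_respond(ctx);
    if (rc == 0)
        rc = ops->send(ctx, buf, len);
    if (rc == 0)
        rc = ops->close_connection(ctx);
    return rc;
}
```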
-
dyad_rc_t dyad_dtl_ucx_rpc_pack(const dyad_ctx_t *ctx, const char *upath, uint32_t producer_rank, void ***packed_obj)
Packs a file fetch request into a JSON object for a UCX RPC call.
Creates a Jansson JSON object containing all information the producer needs to locate the file and perform an RDMA push into the consumer’s pre-registered buffer. Before packing, resets the first sizeof(ssize_t) bytes of dtl_handle->net_buf to zero so that ucx_recv_no_wait() can use them as a sentinel to detect when the producer has started writing.
The packed JSON object contains the following fields:
"upath": relative path of the file to fetch.
"tag_prod": Flux rank of the producer broker.
"cons_buf": consumer’s RDMA buffer address (cons_buf_ptr), encoded as a decimal string. The producer uses this as the remote destination address for ucp_put_nbx().
"pid_cons": process ID of the consumer.
"addr": consumer’s UCX worker address, base64-encoded using RFC 4648. The producer decodes it and passes it as the remote address when creating an endpoint back to the consumer (ucp_ep_create()).
"rkey": consumer’s UCX remote key (rkey_buf), base64-encoded using RFC 4648. The producer calls ucp_ep_rkey_unpack() on it to obtain the ucp_rkey_t needed for ucp_put_nbx().
Both the UCX worker address and the remote key are opaque binary blobs that cannot be embedded directly in JSON. They are base64-encoded (RFC 4648) before packing and decoded by the producer in dyad_dtl_ucx_rpc_unpack(). The encoded buffers are freed after json_pack() copies them into the JSON object.
Note
The UCX backend packs significantly more information than the Flux RPC or Margo backends because RDMA push requires the producer to know the consumer’s exact memory address and access credentials before initiating the transfer.
- Parameters:
ctx – [in] DYAD context.
upath – [in] Relative path of the file to fetch.
producer_rank – [in] Flux rank of the producer broker.
packed_obj – [out] Set to the allocated JSON object on success. Undefined on failure. The caller is responsible for decrementing the reference count via json_decref() when no longer needed.
- Return values:
DYAD_RC_OK – The JSON object was created successfully.
DYAD_RC_BADPACK – dtl_handle->local_address is NULL, base64 encoding of the address or remote key failed, or json_pack() failed.
DYAD_RC_SYSFAIL – Failed to allocate the base64 encoding buffer for the address or remote key.
- Returns:
dyad_rc_t return code:
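Because the UCX worker address and the remote key are arbitrary bytes, they must be converted to text before json_pack() can embed them. The sketch below is a self-contained RFC 4648 base64 encoder (standard alphabet with '=' padding) of the kind this step needs. DYAD’s own encoder is not shown in this section, so b64_encode() and b64_encoded_len() are hypothetical names.

```c
#include <stddef.h>
#include <stdint.h>

static const char B64_ALPHABET[] =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/* Bytes needed to encode n input bytes, including a NUL terminator. */
static size_t b64_encoded_len(size_t n)
{
    return 4 * ((n + 2) / 3) + 1;
}

/* Encode n bytes of src into dst, which must hold b64_encoded_len(n) bytes.
 * Returns the number of characters written, excluding the terminator. */
static size_t b64_encode(const uint8_t *src, size_t n, char *dst)
{
    size_t i = 0, o = 0;
    while (i + 2 < n) { /* each full 3-byte group becomes 4 characters */
        uint32_t v = ((uint32_t)src[i] << 16) | ((uint32_t)src[i + 1] << 8) | src[i + 2];
        dst[o++] = B64_ALPHABET[(v >> 18) & 0x3F];
        dst[o++] = B64_ALPHABET[(v >> 12) & 0x3F];
        dst[o++] = B64_ALPHABET[(v >> 6) & 0x3F];
        dst[o++] = B64_ALPHABET[v & 0x3F];
        i += 3;
    }
    if (i < n) { /* 1 or 2 trailing bytes become a padded final group */
        uint32_t v = (uint32_t)src[i] << 16;
        if (i + 1 < n)
            v |= (uint32_t)src[i + 1] << 8;
        dst[o++] = B64_ALPHABET[(v >> 18) & 0x3F];
        dst[o++] = B64_ALPHABET[(v >> 12) & 0x3F];
        dst[o++] = (i + 1 < n) ? B64_ALPHABET[(v >> 6) & 0x3F] : '=';
        dst[o++] = '=';
    }
    dst[o] = '\0';
    return o;
}
```

The decoding step in dyad_dtl_ucx_rpc_unpack() performs the inverse mapping, turning each group of four characters back into three bytes.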
-
dyad_rc_t dyad_dtl_ucx_rpc_unpack(const dyad_ctx_t *ctx, const void **msg, char **upath)
Unpacks a file fetch request from an incoming Flux RPC message and decodes the consumer’s UCX worker address and remote key.
Extracts the following fields from the JSON payload of msg using flux_request_unpack():
"upath": relative path of the requested file.
"tag_prod": Flux rank of the producer broker.
"cons_buf": consumer’s RDMA buffer address as a decimal string. Parsed via atoll() and stored in dtl_handle->cons_buf_ptr as the remote destination address for ucp_put_nbx().
"pid_cons": process ID of the consumer. Combined with tag_cons to form dtl_handle->consumer_conn_key, which is used as the endpoint cache lookup key.
"addr": base64-encoded (RFC 4648) UCX worker address of the consumer. Decoded into a newly allocated dtl_handle->remote_address buffer.
"rkey": base64-encoded (RFC 4648) UCX remote key. Decoded into a newly allocated dtl_handle->rkey_buf buffer.
After unpacking, both the consumer’s UCX worker address and remote key are base64-decoded. The decoded address is used by dyad_dtl_ucx_establish_connection() to create a ucp_ep_h endpoint to the consumer (or retrieve a cached one). The decoded remote key is used by ucx_send_no_wait() via ucp_ep_rkey_unpack() to obtain the ucp_rkey_t needed for ucp_put_nbx().
The communication tag is computed as:
dtl_handle->comm_tag = (tag_prod << 32) | tag_cons
The endpoint cache key is computed as:
dtl_handle->consumer_conn_key = (pid_cons << 32) | tag_cons
On any decoding failure, the corresponding allocated buffer is freed and set to NULL before returning.
Note
Unlike the Margo backend, which calls margo_addr_lookup() to resolve the consumer’s address, the UCX backend decodes the binary worker address into a raw buffer. That buffer is passed directly to UCX as the remote address when the endpoint is created (ucp_ep_create()) during connection establishment.
Note
upath is owned by the Flux message msg and must not be freed by the caller. It remains valid only for the lifetime of msg.
- Parameters:
ctx – [in] DYAD context.
msg – [in] Incoming Flux RPC message containing the JSON payload packed by dyad_dtl_ucx_rpc_pack().
upath – [out] Set to the relative path of the requested file. Valid for the lifetime of msg.
- Return values:
DYAD_RC_OK – Unpacking and decoding succeeded.
DYAD_RC_BADUNPACK – flux_request_unpack() failed.
DYAD_RC_SYSFAIL – Failed to allocate the buffer for the decoded address or remote key.
DYAD_RC_BAD_B64DECODE – Base64 decoding of the address or remote key failed.
- Returns:
dyad_rc_t return code:
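Both keys above pack two 32-bit quantities into one 64-bit integer, so each operand must be widened before the shift. Shifting a 32-bit value by 32 bits is undefined behavior in C. The following sketch shows that arithmetic together with the decimal round trip used for "cons_buf". The helper names are hypothetical. The sketch uses strtoull() where the description says atoll(); this is an illustrative choice that keeps the parsed address unsigned.

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* (high << 32) | low, widening first so the shift is well-defined.
 * A pid_t such as pid_cons would be cast to uint32_t by the caller. */
static uint64_t make_key(uint32_t high, uint32_t low)
{
    return ((uint64_t)high << 32) | (uint64_t)low;
}

/* Consumer: render a local buffer address as a decimal string. */
static int encode_addr(const void *p, char *out, size_t outlen)
{
    return snprintf(out, outlen, "%" PRIu64, (uint64_t)(uintptr_t)p);
}

/* Producer: parse it back. The value stays a uint64_t because it names
 * memory in another process and is never dereferenced locally. */
static uint64_t decode_addr(const char *s)
{
    return (uint64_t)strtoull(s, NULL, 10);
}
```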
-
dyad_rc_t dyad_dtl_ucx_rpc_respond(const dyad_ctx_t *ctx, const void **orig_msg)
Sends the initial RPC acknowledgement from the service to the consumer.
No-op for the UCX DTL. As with the Margo backend, no explicit acknowledgement over the Flux RPC channel is needed before data transfer begins. The producer connects directly to the consumer’s UCX worker address and pushes data via ucp_put_nbx().
- Parameters:
ctx – [in] DYAD context.
orig_msg – [in] Unused by this backend.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_ucx_rpc_recv_response(const dyad_ctx_t *ctx, void **f)
Receives the initial RPC response from the service.
No-op for the UCX DTL. The consumer does not process a Flux RPC response before data transfer begins. Instead, it waits directly on the sentinel value in dtl_handle->net_buf via ucx_recv_no_wait(), which polls until the producer’s RDMA push has started writing data.
- Parameters:
ctx – [in] Unused by this backend.
f – [in] Unused by this backend.
- Returns:
Always returns DYAD_RC_OK.
-
dyad_rc_t dyad_dtl_ucx_get_buffer(const dyad_ctx_t *ctx, size_t data_size, void **data_buf)
Returns a pointer to the pre-allocated UCX RDMA buffer.
Unlike the Flux RPC and Margo backends, which allocate a new buffer on each call, the UCX backend uses a single pre-allocated RDMA-registered buffer (dtl_handle->net_buf) allocated during dyad_dtl_ucx_init(). This function simply validates data_size and sets *data_buf to point to that pre-allocated buffer.
Reusing a pre-allocated buffer avoids the overhead of repeated ucp_mem_map() registrations, which are expensive because they pin memory and register it with the network hardware.
- Todo:
Investigate and fix the
*data_buf!=NULLvalidation check that incorrectly fires for validNULL-initializedpointers.
Note
The buffer validation check (data_buf == NULL or *data_buf != NULL) is currently disabled. This is due to a known issue where the *data_buf != NULL check incorrectly evaluates to true even when data_buf points to a NULL pointer (see TODO in source).
Note
The caller must not free *data_buf. It points into the UCX-registered memory region managed by dtl_handle and should be released via dyad_dtl_ucx_return_buffer(). For UCX, that call only clears the caller's pointer; the buffer itself is freed only during finalization.
- Parameters:
ctx – [in] DYAD context.
data_size – [in] Number of bytes needed. Must not exceed dtl_handle->max_transfer_size (UCX_MAX_TRANSFER_SIZE).
data_buf – [out] Set to dtl_handle->net_buf on success.
- Return values:
DYAD_RC_OK – *data_buf set to the pre-allocated buffer.
DYAD_RC_BADBUF – data_size exceeds max_transfer_size.
- Returns:
dyad_rc_t return code.
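The lending pattern described above can be sketched in plain C. This is a minimal illustration under stated assumptions, not DYAD's implementation. rc_t, RC_OK, RC_BADBUF, ucx_dtl_sketch_t and sketch_get_buffer() are hypothetical stand-ins for dyad_rc_t and the UCX DTL state, and the buffer is ordinary memory rather than a ucp_mem_map() registration. As in the documented behavior, only data_size is checked, because the *data_buf check is currently disabled.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for dyad_rc_t values and the UCX DTL state. */
typedef enum { RC_OK = 0, RC_BADBUF } rc_t;

typedef struct {
    void  *net_buf;           /* the one pre-allocated buffer (RDMA-registered in DYAD) */
    size_t max_transfer_size; /* capacity of net_buf in bytes */
} ucx_dtl_sketch_t;

/* Lend the pre-allocated buffer instead of allocating a new one:
 * no malloc() and no per-call memory registration. */
rc_t sketch_get_buffer(const ucx_dtl_sketch_t *dtl, size_t data_size,
                       void **data_buf)
{
    if (data_size > dtl->max_transfer_size)
        return RC_BADBUF;     /* request does not fit in the buffer */
    *data_buf = dtl->net_buf; /* caller borrows it and must not free() it */
    return RC_OK;
}
```

In this sketch, every call returns the same pointer. As a result, concurrent transfers on one handle would share a single buffer.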
-
dyad_rc_t dyad_dtl_ucx_return_buffer(const dyad_ctx_t *ctx, void **data_buf)
Releases a reference to the UCX pre-allocated RDMA buffer.
Unlike the Flux RPC and Margo backends, which free() the buffer, the UCX backend only sets *data_buf to NULL. The buffer is pre-allocated and RDMA-registered during dyad_dtl_ucx_init() and must persist for the lifetime of the DTL handle. The actual memory is released by ucx_free_buffer() during finalization.
Validates data_buf before clearing: if data_buf is NULL or *data_buf is NULL, returns DYAD_RC_BADBUF.
- Parameters:
ctx – [in] DYAD context.
data_buf – [inout] Pointer to the buffer pointer to clear. *data_buf is set to NULL on success.
- Return values:
DYAD_RC_OK – *data_buf cleared successfully.
DYAD_RC_BADBUF – data_buf or *data_buf is NULL.
- Returns:
dyad_rc_t return code.
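Here is a minimal sketch of the return side, assuming a hypothetical rc_t stand-in for dyad_rc_t. The function sketch_return_buffer() is also hypothetical. It validates the pointer-to-pointer and clears the caller's copy without calling free(), because the memory stays owned by the DTL handle until finalization.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for the relevant dyad_rc_t values. */
typedef enum { RC_OK = 0, RC_BADBUF } rc_t;

/* Give back a lent pointer: clear the caller's copy but never free() it,
 * because the underlying buffer lives until DTL finalization. */
rc_t sketch_return_buffer(void **data_buf)
{
    if (data_buf == NULL || *data_buf == NULL)
        return RC_BADBUF;  /* nothing valid to release */
    *data_buf = NULL;      /* drop the borrowed reference only */
    return RC_OK;
}
```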
-
dyad_rc_t dyad_dtl_ucx_establish_connection(const dyad_ctx_t *ctx)
Establishes a UCX endpoint connection for data transfer.
Behavior differs by communication direction:
DYAD_COMM_SEND (producer): Looks up the consumer’s endpoint in the endpoint cache via dyad_ucx_ep_cache_find(), passing dtl_handle->remote_address (the lookup itself is keyed by consumer_conn_key). If no cached endpoint exists, creates a new one via dyad_ucx_ep_cache_insert(), which calls ucp_ep_create() through ucx_connect() and stores the result in the cache. The endpoint is stored in dtl_handle->ep for use by ucx_send_no_wait(). If debug is true, prints endpoint information to stderr via ucp_ep_print_info().
DYAD_COMM_RECV (consumer): No-op. The consumer does not need to create an endpoint. It passively waits for the producer to push data into the pre-registered RDMA buffer via ucp_put_nbx().
The endpoint cache avoids recreating ucp_ep_h objects for repeated transfers to the same consumer. This matters because UCX endpoint creation involves connection establishment with the remote worker, which is expensive relative to the data transfer itself.
- Parameters:
ctx – [in] DYAD context. The remote address, endpoint cache, UCX worker, and endpoint pointer are read from and written to the UCX DTL internal state.
- Return values:
DYAD_RC_OK – Connection established or not needed (consumer side).
DYAD_RC_BAD_COMM_MODE – dtl_handle->comm_mode is invalid.
other – Any error code returned by dyad_ucx_ep_cache_insert() if endpoint creation failed.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_dtl_ucx_send(const dyad_ctx_t *ctx, void *buf, size_t buflen)
Sends file data to the consumer via UCX RDMA push.
Initiates a non-blocking RDMA put via ucx_send_no_wait() with is_warmup=false, then blocks until the operation completes by calling dyad_ucx_request_wait(). The data is pushed directly into the consumer’s pre-registered RDMA buffer at dtl_handle->cons_buf_ptr. The push uses the endpoint and remote key established during dyad_dtl_ucx_establish_connection() and dyad_dtl_ucx_rpc_unpack().
Note
The producer prepends the file size to buf before calling this function (in dyad_fetch_request_cb()). The consumer can then use the first sizeof(size_t) bytes as a sentinel in ucx_recv_no_wait() to detect when the push has started.
- Parameters:
ctx – [in] DYAD context.
buf – [in] Buffer containing the file data to send. Must point to the pre-allocated UCX-registered buffer (dtl_handle->net_buf), since ucp_put_nbx() requires the source buffer to be registered with UCX.
buflen – [in] Number of bytes to send.
- Return values:
DYAD_RC_OK – Data sent successfully.
DYAD_RC_UCXCOMM_FAIL – ucp_put_nbx() or dyad_ucx_request_wait() failed.
- Returns:
dyad_rc_t return code.
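The size-prefixed layout described in the Note on dyad_dtl_ucx_send() can be sketched as follows. frame_payload() is a hypothetical helper, not part of DYAD. It assumes a native-endian size_t header written at offset 0 of the send buffer, followed immediately by the file bytes. In this sketch, a zero-length payload is rejected, because a zero header cannot be distinguished from a buffer that has not been written yet.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: lay out [size_t length][payload] in net_buf so the
 * receiver can use the leading length as a "data has arrived" sentinel. */
bool frame_payload(void *net_buf, size_t cap, const void *data, size_t len,
                   size_t *framed_len)
{
    /* Reject a buffer too small for the header, an empty payload (a zero
     * header would look like "nothing written yet"), or an overflow. */
    if (cap < sizeof(size_t) || len == 0 || len > cap - sizeof(size_t))
        return false;
    memcpy(net_buf, &len, sizeof len);               /* size header at offset 0 */
    memcpy((char *)net_buf + sizeof len, data, len); /* file bytes follow */
    *framed_len = sizeof len + len;                  /* total bytes to push */
    return true;
}
```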
-
dyad_rc_t dyad_dtl_ucx_recv(const dyad_ctx_t *ctx, void **buf, size_t *buflen)
Receives file data from the producer via UCX RDMA push.
Calls ucx_recv_no_wait() with is_warmup=false to busy-poll the first sizeof(ssize_t) bytes of dtl_handle->net_buf until they become non-zero, which indicates that the producer has started the RDMA push. After ucx_recv_no_wait() returns, the received data is available directly in dtl_handle->net_buf. This function does not populate buf and buflen. The caller must read the data from the pre-allocated buffer obtained via dyad_dtl_ucx_get_buffer().
- Todo:
Add error handling for cases where the RDMA push fails or times out. The current implementation spins indefinitely if the sentinel never becomes non-zero.
Note
The return value of ucx_recv_no_wait() is intentionally discarded ((void)stat_ptr) because it always returns NULL for the UCX backend. Completion is signalled by the buffer sentinel, not by a UCX request handle.
Note
Unlike the Flux RPC backend, where buf and buflen are populated with a newly allocated copy of the received data, the UCX backend leaves the data in the pre-allocated dtl_handle->net_buf. The caller reads it directly from there without an additional copy.
- Parameters:
ctx – [in] DYAD context.
buf – [out] Not populated by this function. The received data is in dtl_handle->net_buf.
buflen – [out] Not populated by this function.
- Returns:
Always returns DYAD_RC_OK.
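The sentinel wait can be sketched as a bounded spin loop. wait_for_sentinel() and its max_spins parameter are hypothetical. The documented implementation spins without a limit (see the Todo above), and passing max_spins = 0 here reproduces that behavior. The volatile read only forces a fresh load on every iteration. A real RDMA receiver also needs suitable memory-ordering guarantees, which this sketch does not model.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical: spin on the leading size_t of the receive buffer until a
 * producer has written a non-zero length there. max_spins == 0 means
 * "no bound", matching the documented (unbounded) behavior. */
bool wait_for_sentinel(const void *net_buf, unsigned long max_spins,
                       size_t *len_out)
{
    /* volatile forces a fresh load each iteration; net_buf must be
     * aligned for size_t (malloc'd and page-aligned buffers are). */
    const volatile size_t *sentinel = (const volatile size_t *)net_buf;
    for (unsigned long i = 0; max_spins == 0 || i < max_spins; ++i) {
        size_t v = *sentinel;
        if (v != 0) {
            *len_out = v;  /* the producer's size header */
            return true;
        }
    }
    return false;          /* bound reached: the push never started */
}
```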
-
dyad_rc_t dyad_dtl_ucx_close_connection(const dyad_ctx_t *ctx)
Closes the UCX DTL data channel after a transfer completes.
Behavior differs by communication direction:
DYAD_COMM_SEND (producer): Destroys the unpacked remote key via ucp_rkey_destroy(). The remote key is unpacked per transfer in ucx_send_no_wait() and must be destroyed after each send. Clears dtl_handle->ep, dtl_handle->remote_address, dtl_handle->remote_addr_len, and dtl_handle->comm_tag.
The endpoint itself is not disconnected. It is retained in the endpoint cache for reuse in future transfers to the same consumer, which avoids the cost of reconnection. LRU eviction of stale endpoints from the cache is not yet implemented (see TODO).
dtl_handle->remote_address is set to NULL but not freed. The pointer is still referenced by the endpoint cache entry and must remain valid until that entry is evicted (see TODO).
DYAD_COMM_RECV (consumer): No-op beyond resetting dtl_handle->comm_tag to 0. The consumer has no endpoint to close, since it passively receives data via the pre-registered RDMA buffer without creating a ucp_ep_h.
- Todo:
Implement LRU eviction for the endpoint cache to reclaim resources for endpoints that have not been used recently. The commented-out ucx_disconnect() call shows the intended disconnect path that would be used on eviction.
- Todo:
Free dtl_handle->remote_address here once the endpoint cache no longer references it, either by copying the address into the cache entry or by reference-counting it.
- Parameters:
ctx – [in] DYAD context.
- Return values:
DYAD_RC_OK – Connection closed successfully.
DYAD_RC_BAD_COMM_MODE – dtl_handle->comm_mode is invalid.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_dtl_ucx_finalize(const dyad_ctx_t *ctx)
Finalizes and frees the UCX DTL internal state.
Releases all UCX resources in the correct teardown order. Each step is guarded by a NULL check, so partial finalization is safe when called after a failed dyad_dtl_ucx_init():
If dtl_handle->ep is non-NULL, closes the active connection via dyad_dtl_ucx_close_connection() to destroy the unpacked remote key and clear connection state, then sets ep to NULL.
If dtl_handle->ep_cache is non-NULL, finalizes the endpoint cache via dyad_ucx_ep_cache_finalize(), which disconnects and destroys all cached ucp_ep_h endpoints, then sets ep_cache to NULL.
If dtl_handle->local_address is non-NULL, releases the local worker address via ucp_worker_release_address() and sets it to NULL.
If dtl_handle->mem_handle is non-NULL, unmaps and frees the pre-allocated RDMA buffer via ucx_free_buffer(), which calls ucp_mem_unmap() and sets dtl_handle->net_buf to NULL.
If dtl_handle->ucx_worker is non-NULL, destroys the UCX worker via ucp_worker_destroy() and sets it to NULL.
If dtl_handle->ucx_ctx is non-NULL, releases the UCX context via ucp_cleanup() and sets it to NULL.
Sets dtl_handle->h to NULL. The Flux handle is non-owning and must not be closed here; it is managed by the DYAD context.
Frees the dyad_dtl_ucx struct and sets the handle pointer to NULL.
If ctx->dtl_handle is NULL or ctx->dtl_handle->private_dtl.ucx_dtl_handle is NULL, the function is a no-op and returns DYAD_RC_OK.
Note
The endpoint cache must be finalized before the UCX worker is destroyed, since endpoint disconnection requires the worker to be active to flush pending operations.
Note
The RDMA buffer must be unmapped before the UCX context is cleaned up, since ucp_mem_unmap() requires a valid UCX context.
Note
This function only frees the dyad_dtl_ucx struct. The outer dyad_dtl handle is freed by dyad_dtl_finalize(), which calls this function as part of the full teardown sequence.
- Parameters:
ctx – [in] DYAD context. On return, ctx->dtl_handle->private_dtl.ucx_dtl_handle is NULL.
- Returns:
Always returns DYAD_RC_OK.
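The NULL-guarded teardown order can be sketched with ordinary heap pointers in place of UCX handles. ucx_state_sketch_t, release() and sketch_ucx_finalize() are hypothetical. Each free() stands in for the corresponding UCX release call listed above, and the one-letter log exists only to make the order observable.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the handles held by the UCX DTL. Each is a
 * plain heap pointer here; DYAD calls the matching UCX release API. */
typedef struct {
    void *ep, *ep_cache, *local_address, *mem_handle, *worker, *ctx;
} ucx_state_sketch_t;

/* One letter per released resource, so the order can be checked. */
static char teardown_log[16];
static size_t teardown_len;

static void release(void **slot, char tag)
{
    if (*slot == NULL)
        return;                      /* never created, or already released */
    free(*slot);
    *slot = NULL;
    if (teardown_len < sizeof teardown_log - 1)
        teardown_log[teardown_len++] = tag;
}

/* Dependency order: endpoint and cache before the worker, the memory
 * registration before the context. */
void sketch_ucx_finalize(ucx_state_sketch_t **state)
{
    if (state == NULL || *state == NULL)
        return;                      /* NULL handle: no-op */
    ucx_state_sketch_t *s = *state;
    release(&s->ep, 'e');            /* active connection */
    release(&s->ep_cache, 'c');      /* cached endpoints */
    release(&s->local_address, 'a'); /* worker address */
    release(&s->mem_handle, 'm');    /* RDMA buffer registration */
    release(&s->worker, 'w');        /* UCX worker */
    release(&s->ctx, 'x');           /* UCX context */
    free(s);
    *state = NULL;
}
```

Calling the function a second time is a no-op, which matches the documented behavior for a NULL handle.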
Variables
-
const base64_maps_t base64_maps_rfc4648
Base64 encoding map for RFC 4648, used to encode/decode UCX remote keys for transmission over the Flux RPC channel.
-
struct ucx_request
Request struct used to track the completion of async UCX operations.
UCX allocates one instance of this struct per in-flight operation, using dyad_ucx_request_init() as the initializer. The completed flag is set to 1 by the send callback dyad_send_callback() when the operation finishes, and is polled by dyad_ucx_request_wait().
Public Members
-
int completed
Set to 1 when the UCX operation completes.
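The completion-flag protocol can be sketched without UCX by simulating progress. struct ucx_request_sketch, request_init(), send_callback(), fake_worker_progress() and request_wait() are hypothetical analogues of the documented struct, dyad_ucx_request_init(), dyad_send_callback() and dyad_ucx_request_wait(). The real callbacks have UCX-defined signatures with additional parameters, which this sketch omits.

```c
#include <assert.h>

/* Mirrors the documented struct: one flag per in-flight operation. */
struct ucx_request_sketch {
    int completed;
};

/* Analogue of the request initializer: a fresh request is not complete. */
void request_init(void *request)
{
    ((struct ucx_request_sketch *)request)->completed = 0;
}

/* Analogue of the send callback: sets the flag when the op finishes. */
void send_callback(void *request)
{
    ((struct ucx_request_sketch *)request)->completed = 1;
}

/* Hypothetical stand-in for ucp_worker_progress(): completes the
 * pending request after a fixed number of progress calls. */
static int progress_calls_left;
static void fake_worker_progress(struct ucx_request_sketch *req)
{
    if (progress_calls_left > 0 && --progress_calls_left == 0)
        send_callback(req);
}

/* Analogue of the wait loop: drive progress until the flag is set.
 * Returns how many progress calls were needed. */
int request_wait(struct ucx_request_sketch *req)
{
    int spins = 0;
    while (!req->completed) {
        fake_worker_progress(req);
        ++spins;
    }
    return spins;
}
```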
Typedefs
-
using key_type = uint64_t
Key type for the UCX endpoint cache.
A 64-bit integer combining the consumer’s process ID and communication tag (pid << 32 | tag_cons), uniquely identifying a consumer connection within a job.
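Here is a sketch of building and splitting the key, assuming both the process ID and tag_cons fit in 32 bits. make_consumer_conn_key(), key_pid() and key_tag() are hypothetical helpers.

```c
#include <assert.h>
#include <stdint.h>

/* Pack (pid, tag) into one 64-bit key: pid in the high 32 bits, tag in
 * the low 32 bits. Widening to uint64_t before the shift matters, since
 * shifting a 32-bit operand by 32 bits is undefined behavior in C. */
uint64_t make_consumer_conn_key(uint32_t pid, uint32_t tag_cons)
{
    return ((uint64_t)pid << 32) | (uint64_t)tag_cons;
}

/* Recover the two halves, e.g. for logging. */
uint32_t key_pid(uint64_t key) { return (uint32_t)(key >> 32); }
uint32_t key_tag(uint64_t key) { return (uint32_t)(key & 0xFFFFFFFFu); }
```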
-
using cache_type = std::unordered_map<key_type, void*>
UCX endpoint cache type.
An unordered map from key_type to ucp_ep_h, used to cache UCX endpoints by consumer connection key. Lookups and insertions are O(1) on average. Cached endpoints are reused across transfers to the same consumer to avoid the cost of repeated ucp_ep_create() calls.
Functions
-
static void dyad_ucx_ep_err_handler(void *arg, void *ep, int status)
UCX endpoint error handler callback.
Intended to be registered as the error handler for UCX endpoints via the UCP_EP_PARAM_FIELD_ERR_HANDLER field of ucp_ep_params_t. UCX would then call it when an error occurs on a ucp_ep_h, for example when the remote peer disconnects unexpectedly or a network failure is detected.
Currently logs the error via DYAD_LOG_ERROR and returns. No recovery action is taken.
Marked __attribute__((unused)) because error handler registration is not yet wired into ucx_connect(). The handler is defined but not currently passed to ucp_ep_create() (see TODO).
- Todo:
Wire this handler into ucx_connect() by setting UCP_EP_PARAM_FIELD_ERR_HANDLER and params.err_handler in ucp_ep_params_t so that endpoint errors are caught and logged at runtime.
- Parameters:
arg – [in] User argument passed during endpoint creation. Cast to dyad_ctx_t* for logging.
ep – [in] The endpoint on which the error occurred.
status – [in] UCX error status describing the failure.
-
dyad_rc_t ucx_connect(const dyad_ctx_t *ctx, void *worker, const void *addr, void **ep)
Creates a UCX endpoint to a remote worker.
Creates a ucp_ep_h to the remote worker identified by addr via ucp_ep_create(). The endpoint is initialized with only UCP_EP_PARAM_FIELD_REMOTE_ADDRESS set; no error handler is registered (see TODO in dyad_ucx_ep_err_handler()).
Validates the created endpoint: if ucp_ep_create() succeeds but returns a NULL endpoint, the function treats this as a failure and returns DYAD_RC_UCXCOMM_FAIL.
Note
This function is used both for the warmup loopback connection in ucx_warmup() and for real connection establishment in dyad_ucx_ep_cache_insert(). In the normal transfer path, dyad_dtl_ucx_establish_connection() uses the endpoint cache. It calls this function indirectly, via dyad_ucx_ep_cache_insert(), only when no cached endpoint exists for the consumer.
- Parameters:
ctx – [in] DYAD context. Used for logging.
worker – [in] UCX worker on which to create the endpoint.
addr – [in] UCX address of the remote worker to connect to.
ep – [out] Set to the created ucp_ep_h on success.
- Return values:
DYAD_RC_OK – Endpoint created successfully.
DYAD_RC_UCXCOMM_FAIL – ucp_ep_create() failed or returned a NULL endpoint.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t ucx_disconnect(const dyad_ctx_t *ctx, void *worker, void *ep)
Closes a UCX endpoint and waits for the closure to complete.
Initiates a non-blocking endpoint close via ucp_ep_close_nbx() (UCX >= 1.10) or ucp_ep_close_nb() (UCX < 1.10). These use UCP_EP_CLOSE_FLAG_FORCE and UCP_EP_CLOSE_MODE_FORCE respectively to terminate the endpoint forcefully without waiting for in-flight operations to complete. If ep is NULL, the function is a no-op.
Unlike other UCX non-blocking operations, endpoint close does not use dyad_ucx_request_wait(). The close request behaves differently from communication requests: it must be waited on by calling ucp_worker_progress() directly rather than through the standard request polling path. The wait loop is therefore inlined:
If the returned stat_ptr is a request handle (UCS_PTR_IS_PTR), spins calling ucp_worker_progress() and ucp_request_check_status() until the status is no longer UCS_INPROGRESS, then frees the request via ucp_request_free().
If the returned stat_ptr encodes an error (UCS_PTR_IS_ERR), extracts the status code for reporting.
If stat_ptr is NULL, the close completed immediately.
Note
Force-close is used regardless of UCX version. If error handler mode is enabled in the future, the close mode may need to change to UCP_EP_CLOSE_MODE_FLUSH to allow in-flight operations to drain before closing (see TODO in source).
Note
This function is called in two situations. First, it is called through cache_remove_impl() by dyad_ucx_ep_cache_remove() and dyad_ucx_ep_cache_finalize() when endpoints are removed from the cache. Second, ucx_warmup() calls it after the loopback warmup transfer. It is not called during a normal transfer close: dyad_dtl_ucx_close_connection() sets dtl_handle->ep to NULL and relies on the cache to manage the endpoint lifetime.
- Parameters:
ctx – [in] DYAD context. Used for logging.
worker – [in] UCX worker needed to progress the close operation.
ep – [in] UCX endpoint to close. If NULL, the function is a no-op and returns DYAD_RC_OK.
- Return values:
DYAD_RC_OK – Endpoint closed successfully or was NULL.
DYAD_RC_UCXEP_FAIL – The endpoint close operation failed. The endpoint can no longer be used regardless of the return code.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_ucx_ep_cache_init(const dyad_ctx_t *ctx, void **cache)
Allocates and initializes the UCX endpoint cache.
Allocates a new cache_type (std::unordered_map<key_type, ucp_ep_h>) using new (std::nothrow) and stores a reinterpret_cast pointer to it in *cache as an opaque ucx_ep_cache_h handle. Using std::nothrow ensures that an allocation failure returns nullptr rather than throwing std::bad_alloc.
Validates cache before allocation:
If cache is nullptr, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.
If *cache is already non-nullptr, a cache is already present and overwriting it would leak the existing allocation, so DYAD_RC_BADBUF is returned.
- Todo:
Add an option to configure the cache replacement strategy.
- Parameters:
ctx – [in] DYAD context. Used for logging.
cache – [out] Must point to a nullptr on entry. Set to the allocated cache handle on success.
- Return values:
DYAD_RC_OK – Cache allocated successfully.
DYAD_RC_BADBUF – cache is nullptr or *cache is already non-nullptr.
DYAD_RC_SYSFAIL – new (std::nothrow) failed to allocate the cache.
- Returns:
dyad_rc_t return code.
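The validation rules above can be sketched in C, with calloc() playing the role of new (std::nothrow). ep_cache_sketch_t, rc_t and sketch_cache_init() are hypothetical. The real cache is a C++ std::unordered_map behind an opaque handle.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for dyad_rc_t values and the opaque cache. */
typedef enum { RC_OK = 0, RC_BADBUF, RC_SYSFAIL } rc_t;
typedef struct { size_t count; } ep_cache_sketch_t;

rc_t sketch_cache_init(void **cache)
{
    if (cache == NULL)
        return RC_BADBUF;   /* nowhere to store the handle */
    if (*cache != NULL)
        return RC_BADBUF;   /* overwriting would leak the existing cache */
    ep_cache_sketch_t *c = calloc(1, sizeof *c);
    if (c == NULL)
        return RC_SYSFAIL;  /* analogous to new (std::nothrow) returning nullptr */
    *cache = c;
    return RC_OK;
}
```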
-
dyad_rc_t dyad_ucx_ep_cache_find(const dyad_ctx_t *ctx, const void *cache, const void *addr, const size_t addr_size, void **ep)
Looks up a cached UCX endpoint by consumer connection key.
Searches the endpoint cache for an entry matching ctx->dtl_handle->private_dtl.ucx_dtl_handle->consumer_conn_key (pid << 32 | tag_cons). If found, sets *ep to the cached ucp_ep_h and returns DYAD_RC_OK. If not found, sets *ep to nullptr and returns DYAD_RC_NOTFOUND.
Validates ep before searching:
If ep is nullptr, the caller passed an invalid output pointer and DYAD_RC_BADBUF is returned.
If *ep is already non-nullptr, an endpoint is already present and overwriting it would leak the existing handle, so DYAD_RC_BADBUF is returned.
Note
The addr and addr_size parameters are accepted for interface consistency but are not used. The lookup is performed by consumer_conn_key rather than by raw address bytes, since ucp_address_t is an opaque struct that cannot be reliably compared byte by byte.
Note
All cache operations are wrapped in a try/catch(...) block to prevent C++ exceptions from propagating into the C calling code. Any exception is caught and returned as DYAD_RC_SYSFAIL.
- Parameters:
ctx – [in] DYAD context. The consumer_conn_key used as the cache lookup key is read from the UCX DTL internal state.
cache – [in] Endpoint cache handle to search.
addr – [in] Unused. Accepted for interface consistency.
addr_size – [in] Unused. Accepted for interface consistency.
ep – [out] Must point to a nullptr on entry. Set to the cached ucp_ep_h on success, or nullptr if not found.
- Return values:
DYAD_RC_OK – A cached endpoint was found and returned.
DYAD_RC_NOTFOUND – No cached endpoint exists for the current consumer_conn_key.
DYAD_RC_BADBUF – ep is nullptr or *ep is already non-nullptr.
DYAD_RC_SYSFAIL – An unexpected C++ exception was thrown during the cache lookup.
- Returns:
dyad_rc_t return code.
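Here is a sketch of the lookup contract. To stay self-contained, it uses a hypothetical fixed-capacity array instead of std::unordered_map. Only the output-pointer validation and the OK/NOTFOUND/BADBUF outcomes follow the documentation. The types, the names (sketch_cache_find(), ep_cache_sketch_t, rc_t) and the linear search are assumptions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for dyad_rc_t values. */
typedef enum { RC_OK = 0, RC_BADBUF, RC_NOTFOUND } rc_t;

/* Hypothetical fixed-capacity stand-in for the C++ unordered_map
 * (linear search instead of hashing, to keep the sketch short). */
#define CACHE_CAP 8
typedef struct {
    uint64_t keys[CACHE_CAP];
    void    *eps[CACHE_CAP];
    size_t   n;
} ep_cache_sketch_t;

rc_t sketch_cache_find(const ep_cache_sketch_t *cache, uint64_t conn_key,
                       void **ep)
{
    if (ep == NULL || *ep != NULL)
        return RC_BADBUF;  /* invalid output slot, or would overwrite a handle */
    for (size_t i = 0; i < cache->n; ++i) {
        if (cache->keys[i] == conn_key) {
            *ep = cache->eps[i];
            return RC_OK;
        }
    }
    return RC_NOTFOUND;    /* *ep stays NULL */
}
```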
-
dyad_rc_t dyad_ucx_ep_cache_insert(const dyad_ctx_t *ctx, void *cache, const void *addr, const size_t addr_size, void *worker)
Inserts a new UCX endpoint into the cache, creating it if needed.
Looks up consumer_conn_key in the cache. If an entry already exists, returns DYAD_RC_OK immediately without creating a new endpoint. This handles the case where dyad_dtl_ucx_establish_connection() calls dyad_ucx_ep_cache_find() and dyad_ucx_ep_cache_insert() in sequence and the entry was inserted between the two calls.
If no entry exists, creates a new ucp_ep_h to addr via ucx_connect() and inserts it into the cache using insert_or_assign(). The new endpoint is also stored directly in ctx->dtl_handle->private_dtl.ucx_dtl_handle->ep for immediate use by the caller.
Note
All cache operations are wrapped in a try/catch(...) block to prevent C++ exceptions from propagating into the C calling code. Any exception is caught and returned as DYAD_RC_SYSFAIL.
Note
insert_or_assign() is used instead of insert() to handle the race condition where a concurrent insert could have occurred between the find() check and the insert. Since DYAD uses UCS_THREAD_MODE_SERIALIZED, true concurrency is not possible, but insert_or_assign() is safer and has no performance cost in the non-concurrent case.
ctx – [in] DYAD context. The consumer_conn_key and ep fields of the UCX DTL internal state are read and written.
cache – [in] Endpoint cache handle to insert into.
addr – [in] UCX address of the remote worker to connect to. Passed to ucx_connect() if no cached entry exists.
addr_size – [in] Size of addr in bytes. Accepted for interface consistency; ucx_connect() does not take a size argument.
worker – [in] UCX worker used to create the new endpoint via ucx_connect().
- Return values:
DYAD_RC_OK – A cached entry already existed, or a new endpoint was created and inserted successfully.
DYAD_RC_SYSFAIL – An unexpected C++ exception was thrown during the cache operation.
other – Any error code returned by ucx_connect() if endpoint creation failed.
- Returns:
dyad_rc_t return code.
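The find-or-create behavior can be sketched as follows. This illustrates the control flow only and is not DYAD's code. The connect_fn callback is a hypothetical stand-in for ucx_connect(), and the fixed-capacity array replaces the C++ map. sketch_cache_insert(), fake_connect() and the other names are likewise hypothetical. When an entry already exists, this sketch returns early without touching *current_ep.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for dyad_rc_t values and the cache. */
typedef enum { RC_OK = 0, RC_SYSFAIL, RC_UCXCOMM_FAIL } rc_t;

#define CACHE_CAP 8
typedef struct {
    uint64_t keys[CACHE_CAP];
    void    *eps[CACHE_CAP];
    size_t   n;
} ep_cache_sketch_t;

/* Hypothetical connect hook standing in for ucx_connect(). */
typedef rc_t (*connect_fn)(const void *addr, void **ep_out);

/* Create and cache an endpoint for conn_key only if none exists yet.
 * A newly created endpoint is also written to *current_ep. */
rc_t sketch_cache_insert(ep_cache_sketch_t *cache, uint64_t conn_key,
                         const void *addr, connect_fn connect,
                         void **current_ep)
{
    for (size_t i = 0; i < cache->n; ++i)
        if (cache->keys[i] == conn_key)
            return RC_OK;  /* already cached: nothing to create */
    if (cache->n == CACHE_CAP)
        return RC_SYSFAIL; /* sketch-only limit; a real map grows */
    void *ep = NULL;
    rc_t rc = connect(addr, &ep);
    if (rc != RC_OK)
        return rc;         /* propagate connect failure, cache unchanged */
    cache->keys[cache->n] = conn_key;
    cache->eps[cache->n] = ep;
    cache->n++;
    *current_ep = ep;
    return RC_OK;
}

/* Fake connector that counts calls and hands out a dummy endpoint. */
static int connect_calls;
static int dummy_endpoint;
static rc_t fake_connect(const void *addr, void **ep_out)
{
    (void)addr;
    ++connect_calls;
    *ep_out = &dummy_endpoint;
    return RC_OK;
}
```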
-
static inline cache_type::iterator cache_remove_impl(const dyad_ctx_t *ctx, cache_type *cache, cache_type::iterator it, void *worker)
Internal helper that disconnects and removes a single cache entry.
If it is a valid iterator (not cache->end()), disconnects the endpoint via ucx_disconnect(), erases the entry from the cache via cache->erase(), and returns the iterator to the next entry. If it is cache->end(), returns cache->end() immediately as a no-op.
Used both by dyad_ucx_ep_cache_remove() for single-entry removal and by dyad_ucx_ep_cache_finalize() to iterate over and remove all entries.
Note
The comment in the source mentions that the UCP address was allocated with malloc() during RPC unpacking. However, the current implementation does not free the address here. The address was extracted from dtl_handle->remote_address and is cleared in dyad_dtl_ucx_close_connection(). See the TODO in dyad_dtl_ucx_close_connection() regarding ownership of remote_address.
ctx – [in] DYAD context. Used for logging in ucx_disconnect().
cache – [in] The cache from which to remove the entry.
it – [in] Iterator to the entry to remove. If equal to cache->end(), the function is a no-op.
worker – [in] UCX worker passed to ucx_disconnect() to progress the endpoint close operation.
- Returns:
Iterator to the entry following the removed one, or cache->end() if it was already cache->end().
-
dyad_rc_t dyad_ucx_ep_cache_remove(const dyad_ctx_t *ctx, void *cache, const void *addr, const size_t addr_size, void *worker)
Removes a single UCX endpoint from the cache and disconnects it.
Looks up consumer_conn_key in the cache. If an entry is found, delegates to cache_remove_impl() to disconnect the endpoint via ucx_disconnect() and erase the cache entry. If no entry is found for the key, the function is a no-op and returns DYAD_RC_OK.
Note
All cache operations are wrapped in a try/catch(...) block to prevent C++ exceptions from propagating into the C calling code.
Note
addr and addr_size are accepted for interface consistency but are not used. The removal is performed by consumer_conn_key rather than by raw address bytes.
- Parameters:
ctx – [in] DYAD context. The consumer_conn_key used as the cache lookup key is read from the UCX DTL internal state.
cache – [in] Endpoint cache handle to remove from.
addr – [in] Unused. Accepted for interface consistency.
addr_size – [in] Unused. Accepted for interface consistency.
worker – [in] UCX worker passed to ucx_disconnect() to progress the endpoint close operation.
- Return values:
DYAD_RC_OK – Entry removed successfully or key not found.
DYAD_RC_SYSFAIL – An unexpected C++ exception was thrown during the cache operation.
- Returns:
dyad_rc_t return code.
-
dyad_rc_t dyad_ucx_ep_cache_finalize(const dyad_ctx_t *ctx, void **cache, void *worker)
Finalizes and frees the UCX endpoint cache.
Iterates over all entries in the cache, disconnecting and removing each endpoint via cache_remove_impl(). After all entries are removed, deletes the cache_type object and sets *cache to nullptr.
The loop advances using the iterator returned by cache_remove_impl() rather than incrementing the iterator manually, because erase() invalidates the current iterator. cache_remove_impl() returns the iterator to the next valid entry after the erased one, so the entire cache can be drained safely.
If cache is nullptr or *cache is nullptr, the function is a no-op and returns DYAD_RC_OK. This allows safe calls on a partially initialized or already-finalized cache.
- Todo:
Add a try/catch block around the iteration and delete to prevent C++ exceptions from propagating into the C calling code in dyad_dtl_ucx_finalize().
Note
This function is called by dyad_dtl_ucx_finalize() as part of the full UCX DTL teardown sequence. The endpoint cache must be finalized before the UCX worker is destroyed, since ucx_disconnect() requires the worker to be active to progress the endpoint close operations.
Note
Unlike dyad_ucx_ep_cache_remove(), which wraps operations in a try/catch block, this function does not. Any C++ exception thrown during iteration or deletion will propagate to the caller. Since this is called only from C++ translation units this is acceptable, but a try/catch wrapper could be added for robustness (see TODO).
- Parameters:
ctx – [in] DYAD context. Used for logging in cache_remove_impl() → ucx_disconnect().
cache – [inout] Pointer to the cache handle to finalize. *cache is set to nullptr on return. If cache is nullptr or *cache is nullptr, the function is a no-op.
worker – [in] UCX worker passed to ucx_disconnect() for each endpoint in the cache.
- Returns:
Always returns DYAD_RC_OK.
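The drain-while-erasing pattern can be sketched in C with a singly linked list. remove_at() plays the role of cache_remove_impl(). It unlinks the current node before freeing it and returns the position of the successor, so the loop never touches freed memory. entry_t, ep_cache_sketch_t, remove_at() and sketch_cache_finalize() are all hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical list-based stand-in for the endpoint cache. */
typedef struct entry {
    struct entry *next;
    int *disconnected;  /* set to 1 when the "endpoint" is closed */
} entry_t;

typedef struct {
    entry_t *head;
} ep_cache_sketch_t;

/* Analogue of cache_remove_impl(): close and free the entry that *link
 * points to, and return the position of the following entry. After the
 * splice, *link already refers to the successor. */
static entry_t **remove_at(entry_t **link)
{
    entry_t *victim = *link;
    if (victim == NULL)
        return link;            /* at the end: no-op */
    *victim->disconnected = 1;  /* stand-in for ucx_disconnect() */
    *link = victim->next;       /* unlink before freeing */
    free(victim);
    return link;
}

/* Analogue of dyad_ucx_ep_cache_finalize(): drain, then free the cache. */
void sketch_cache_finalize(ep_cache_sketch_t **cache)
{
    if (cache == NULL || *cache == NULL)
        return;                 /* nothing to finalize */
    entry_t **it = &(*cache)->head;
    while (*it != NULL)
        it = remove_at(it);     /* never dereferences a freed node */
    free(*cache);
    *cache = NULL;
}
```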