
Bulk Patient Import over WebSockets

This module implements a stateful, authenticated WebSocket protocol to import many patients (and their data entries) into a cohort in a controlled, auditable way.

It is designed for connector-style data ingestion where an external system streams patient batches to the server, and the server responds with per-batch reports plus final run statistics.


Where this lives

Main entrypoint (WebSocket endpoint):

  • bio.cosy.feddb.local.api.cohort.patient.bulkimport.BulkImportWebsocketService

Session + persistence orchestration:

  • bio.cosy.feddb.local.api.cohort.patient.bulkimport.BulkImportSessionBO

Implemented import mode:

  • bio.cosy.feddb.local.api.cohort.patient.bulkimport.modes.BulkImportComprehensiveModeBO

Other modes exist conceptually but are not implemented yet (see Import modes).


High-level flow

The protocol is intentionally simple and strict:

  1. Client connects to the WebSocket (/ws/bulkimport)
  2. Client sends START_TRANSFER
  3. Server persists a new import session and returns START_TRANSFER_RESPONSE with the canonical TransferIdentificationDTO
  4. Client sends one or more PATIENT_DATA batches (chunked by batchId)
  5. Server responds to each batch with a PATIENT_REPORT (per-patient error logs)
  6. Client sends STOP_TRANSFER
  7. Server finalizes and returns RUN_STATISTICS
  8. Connection may be closed by either side

If something goes wrong, the server sends CRITICAL_ERROR and closes the connection.
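The lifecycle above can be sketched as a small state machine. This is a hedged illustration in plain Java, not production code: the names `ProtocolStateSketch`, `ProtocolState`, and `advance` are hypothetical and do not exist in the codebase.

```java
import java.util.List;

// Hypothetical sketch of the client-visible protocol lifecycle.
public class ProtocolStateSketch {

    enum ProtocolState { CONNECTED, RUNNING, FINISHED, ERROR }

    // Allowed client message per state; anything else would be answered
    // with CRITICAL_ERROR in the real service.
    static ProtocolState advance(ProtocolState state, String messageType) {
        switch (state) {
            case CONNECTED:
                if (messageType.equals("START_TRANSFER")) return ProtocolState.RUNNING;
                break;
            case RUNNING:
                if (messageType.equals("PATIENT_DATA")) return ProtocolState.RUNNING;
                if (messageType.equals("STOP_TRANSFER")) return ProtocolState.FINISHED;
                break;
            default:
                break;
        }
        return ProtocolState.ERROR; // protocol violation -> CRITICAL_ERROR
    }

    public static void main(String[] args) {
        ProtocolState s = ProtocolState.CONNECTED;
        for (String msg : List.of("START_TRANSFER", "PATIENT_DATA", "PATIENT_DATA", "STOP_TRANSFER")) {
            s = advance(s, msg);
        }
        System.out.println(s); // FINISHED
    }
}
```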


WebSocket endpoint

Path

The WebSocket endpoint is exposed at:

  • BulkImportWebsocketService.PATH = /ws/bulkimport
@WebSocket(path = BulkImportWebsocketService.PATH)
@Authenticated
@RequestScoped
public class BulkImportWebsocketService { ... }

Authentication

The endpoint is annotated with @Authenticated, so the connection is established only for authenticated users.

The service uses UserIdentity to resolve the caller:

  • userIdentity.getKeycloakIdOrDefault()

This keycloakId becomes part of the import session metadata and is also used for audit context propagation.


Connection state (in-memory)

WebSockets are long-lived; you need some server-side state per connection. This implementation uses Quarkus WebSockets UserData storage with typed keys:

  • SESSION_KEY: BulkImportSessionDTO
  • PATIENTS_KEY: List<PatientEntity>
public static final TypedKey<BulkImportSessionDTO> SESSION_KEY = new TypedKey<>("session");
public static final TypedKey<List<PatientEntity>> PATIENTS_KEY = new TypedKey<>("patients");
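For readers unfamiliar with typed keys: the pattern is a type-safe heterogeneous container, where the key carries its value type so `get` needs no cast at the call site. The following is a self-contained sketch of the idea (Quarkus provides the real `UserData`; the names `UserDataSketch` and `UserData` here are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of typed per-connection storage; illustrates the pattern only.
public class UserDataSketch {

    // A key that carries its value type as a type parameter.
    record TypedKey<T>(String name) {}

    static class UserData {
        private final Map<TypedKey<?>, Object> store = new HashMap<>();

        <T> void put(TypedKey<T> key, T value) { store.put(key, value); }

        @SuppressWarnings("unchecked") // safe: put() enforces key/value type agreement
        <T> T get(TypedKey<T> key) { return (T) store.get(key); }
    }

    static final TypedKey<String> SESSION_KEY = new TypedKey<>("session");

    public static void main(String[] args) {
        UserData data = new UserData();
        data.put(SESSION_KEY, "session-42");
        System.out.println(data.get(SESSION_KEY)); // no cast needed at the call site
    }
}
```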

OnOpen

On connection open:

  • resolve keycloakId
  • create a BulkImportSessionDTO(keycloakId) in INIT state (in-memory only)
  • attach an empty patient buffer list
@OnOpen
public void onOpen(WebSocketConnection connection) {
    String keycloakId = userIdentity.getKeycloakIdOrDefault();
    BulkImportSessionDTO session = new BulkImportSessionDTO(keycloakId);

    connection.userData().put(SESSION_KEY, session);
    connection.userData().put(PATIENTS_KEY, new ArrayList<>());
}

Important: the session created here is not persisted. Persistence only happens once the client sends START_TRANSFER.

OnClose

On close, the service does not auto-finalize.

This is a deliberate design choice:

  • closing a WebSocket can happen for many reasons (client crash, network reset, proxy timeout)
  • you do not want an implicit flush or a partial commit unless the connector explicitly finishes the run

So the rule is:

Only STOP_TRANSFER finalizes and persists the import result. Closing the connection alone is not treated as a successful run.

OnError

@OnError updates a persisted run to ERROR, but only if the session has already started:

  • if the session is null or still INIT, nothing is persisted
  • otherwise:
    • session.status = ERROR
    • session.errorMessage = throwable.getMessage()
    • bulkImportBO.update(session)

@OnError
@Transactional
public synchronized void onError(WebSocketConnection connection, Throwable throwable) {
    BulkImportSessionDTO session = connection.userData().get(SESSION_KEY);
    if (session == null || session.getStatus() == ImportStatusEnum.INIT) return;

    session.setErrorMessage(throwable.getMessage());
    session.setStatus(ImportStatusEnum.ERROR);
    bulkImportBO.update(session);
}

Message model

All inbound and outbound messages are wrapped in:

  • MessageWrapper<T>
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageWrapper<T> {
    @NotNull
    private MessageTypeEnum messageType;
    private Integer status;
    private T message;
}

Message types

Client → Server:

  • START_TRANSFER
  • PATIENT_DATA
  • STOP_TRANSFER

Server → Client:

  • START_TRANSFER_RESPONSE
  • PATIENT_REPORT
  • RUN_STATISTICS
  • CRITICAL_ERROR (terminal, closes connection)
public enum MessageTypeEnum {
    START_TRANSFER,
    STOP_TRANSFER,
    PATIENT_DATA,

    START_TRANSFER_RESPONSE,
    CRITICAL_ERROR,
    PATIENT_REPORT,
    RUN_STATISTICS
}

Protocol details

1) START_TRANSFER

Client sends:

  • MessageWrapper<StartTransferConnectorDTO> with messageType = START_TRANSFER

StartTransferConnectorDTO:

  • cohortId (target cohort)
  • connectorId (source connector identifier)
  • importerPID (process identifier on connector side)
  • mode (DEFAULT or COMPREHENSIVE)
  • elements (expected number of patient entities)
  • dry (optional, default false)
@Data
public class StartTransferConnectorDTO {
    @NotNull private Long cohortId;
    @NotNull private Long connectorId;
    @NotNull private Long importerPID;
    @NotNull private ImportModeEnum mode;
    @NotNull private Long elements;
    private Boolean dry = false;
}

Server behavior:

  • validates that the connection session is still INIT
  • persists a new session via bulkImportBO.initNewSession(startDto)
  • stores the persisted session back into UserData
  • replies with:
  • MessageTypeEnum.START_TRANSFER_RESPONSE
  • TransferIdentificationDTO(id, cohortId, connectorId)
public synchronized MessageWrapper<TransferIdentificationDTO> handleStartTransfer(...) {
    BulkImportSessionDTO currentSession = connection.userData().get(SESSION_KEY);
    if (currentSession.getStatus() != ImportStatusEnum.INIT) { ... }

    BulkImportSessionDTO startSession = bulkImportBO.initNewSession(startDto);
    connection.userData().put(SESSION_KEY, startSession);

    TransferIdentificationDTO response = new TransferIdentificationDTO(
        startSession.getId(), startSession.getCohortId(), startSession.getConnectorId()
    );

    return new MessageWrapper<>(MessageTypeEnum.START_TRANSFER_RESPONSE, 200, response);
}

2) PATIENT_DATA (batched)

Client sends:

  • MessageWrapper<BulkPatientDataMessage> with messageType = PATIENT_DATA

BulkPatientDataMessage:

  • batchId (chunk identifier; client-chosen)
  • transferIdentification (must match session)
  • patientDataMessages (list of patient payloads)
@Data
public class BulkPatientDataMessage {
    @NotNull private Long batchId;
    @NotNull private TransferIdentificationDTO transferIdentification;
    @NotNull private List<PatientDataMessage> patientDataMessages;
}

PatientDataMessage contains:

  • externalPatientId
  • dataEntries in a 3D list structure: List<List<List<PatientDataEntryDTO>>>
@Data
public class PatientDataMessage {
    @NotNull private String externalPatientId;
    @NotNull private List<List<List<PatientDataEntryDTO>>> dataEntries;
}

How to read dataEntries:

  • outer list: multiple time series blocks (or “frames”)
  • middle list: rows within that block (each row corresponds to one timestamp/visit/observation group)
  • inner list: columns/parameters of that row (each is one PatientDataEntryDTO)

This structure is used to reconstruct grouped long-format observations while keeping row-level grouping stable.
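To make the nesting concrete, here is a minimal sketch of walking the 3D structure while preserving row grouping. The names `DataEntriesSketch` and `Entry` are illustrative stand-ins (the real type is `PatientDataEntryDTO`):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: walking the 3D dataEntries structure while keeping row grouping.
public class DataEntriesSketch {

    // Simplified stand-in for PatientDataEntryDTO.
    record Entry(long schemaNodeId, double value) {}

    // Produces one label per row, e.g. "block0:row1 (2 entries)".
    static List<String> rowKeys(List<List<List<Entry>>> dataEntries) {
        List<String> keys = new ArrayList<>();
        for (int block = 0; block < dataEntries.size(); block++) {   // outer: time series blocks
            List<List<Entry>> rows = dataEntries.get(block);
            for (int row = 0; row < rows.size(); row++) {            // middle: rows (timepoints)
                keys.add("block" + block + ":row" + row
                        + " (" + rows.get(row).size() + " entries)"); // inner: columns/parameters
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<List<List<Entry>>> dataEntries = List.of(
            List.of(
                List.of(new Entry(101, 12.3), new Entry(102, 77)),
                List.of(new Entry(101, 11.9), new Entry(102, 80))
            )
        );
        rowKeys(dataEntries).forEach(System.out::println);
    }
}
```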

Server behavior:

  • requires an active RUNNING session
  • routes by mode:
    • COMPREHENSIVE → BulkImportComprehensiveModeBO.importPatients(...)
    • DEFAULT → batch handling is implemented in a separate BO, but its finalization (STOP_TRANSFER) is not implemented yet (see below)
  • responds with PATIENT_REPORT containing:
    • importId
    • batchId
    • per-patient error logs
public MessageWrapper<BulkPatientReportDTO> handlePatientData(...) {
    BulkImportSessionDTO session = connection.userData().get(SESSION_KEY);
    if (session == null || session.getStatus() != ImportStatusEnum.RUNNING) { ... }

    BulkPatientReportDTO report = comprehensiveModeBO.importPatients(...);

    return new MessageWrapper<>(MessageTypeEnum.PATIENT_REPORT, 200, report);
}

Per-patient error logs

Every batch returns a report listing errors per patient:

@Data
@AllArgsConstructor
public class BulkPatientReportDTO {
    @NotNull private Long importId;
    @NotNull private Long batchId;
    @NotNull private List<PatientErrorLogsDTO> errorLogs;
}

PatientErrorLogsDTO:

  • message (patient-level message)
  • externalPatientId
  • updated flag (true if changes were applied / entries added)
  • errorFields (field-level errors: schemaNodeId + message)
@Data
public class PatientErrorLogsDTO {
    private String message;
    private String externalPatientId;
    private boolean updated = false;
    private List<RunErrorFieldLogDTO> errorFields = new ArrayList<>();
}

This is intentionally verbose: you can feed the logs directly into connector diagnostics or UI.


3) STOP_TRANSFER

Client sends:

  • MessageWrapper<TransferIdentificationDTO> with messageType = STOP_TRANSFER
@Data
@AllArgsConstructor
public class TransferIdentificationDTO {
    private Long id; // primary key of the bulk import session in the local learning system
    private Long cohortId;
    private Long connectorId;
}

Server behavior:

  • validates transferIdentification matches the session (importId, cohortId, connectorId)
  • requires session state RUNNING
  • mode-specific finalization:

Comprehensive mode finalization: BulkImportComprehensiveModeBO.finishTransfer(session, patientsToCommit) does the final reconciliation:

  1. Build lookup map from imported patients by externalPatientId
  2. Load all existing cohort patients
  3. For each existing patient:
    • if present in import → replace its data entries (update)
    • if missing in import → delete patient (comprehensive sync semantics)
  4. Any remaining imported patients not matched → create new
  5. Update session counters + mark FINISHED
  6. Return BulkImportSessionDTO (run statistics)
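The reconciliation steps above can be sketched with plain sets of external patient IDs. This is a hedged illustration only: the real BO operates on entities and persists its results, and the names `ReconcileSketch`, `Result`, and `reconcile` are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hedged sketch of comprehensive-mode reconciliation over external IDs only.
public class ReconcileSketch {

    record Result(List<String> created, List<String> updated, List<String> deleted) {}

    static Result reconcile(Set<String> existingIds, Set<String> importedIds) {
        List<String> created = new ArrayList<>();
        List<String> updated = new ArrayList<>();
        List<String> deleted = new ArrayList<>();
        for (String id : existingIds) {
            if (importedIds.contains(id)) updated.add(id); // present in import -> replace data entries
            else deleted.add(id);                          // missing in import -> delete (snapshot semantics)
        }
        for (String id : importedIds) {
            if (!existingIds.contains(id)) created.add(id); // unmatched imports -> create new
        }
        return new Result(created, updated, deleted);
    }

    public static void main(String[] args) {
        Result r = reconcile(Set.of("EXT-001", "EXT-002"), Set.of("EXT-001", "EXT-003"));
        System.out.println("created=" + r.created() + " updated=" + r.updated() + " deleted=" + r.deleted());
    }
}
```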

Response message type:

  • MessageTypeEnum.RUN_STATISTICS
public synchronized MessageWrapper<BulkImportSessionDTO> handleStopTransfer(...) {
    if (session.getMode() == ImportModeEnum.COMPREHENSIVE) {
        BulkImportSessionDTO stats = comprehensiveModeBO.finishTransfer(session, patients);
        return new MessageWrapper<>(MessageTypeEnum.RUN_STATISTICS, 200, stats);
    }
    ...
}

Default mode finalization (not implemented): DEFAULT currently returns:

  • CRITICAL_ERROR with HTTP-like status 501
  • connection closes

sendCriticalErrorandCleanup(connection, "Default mode not implemented yet", 501);


Import modes

The bulk import protocol supports different run modes that define how the incoming data set is interpreted relative to previously imported data. Each run mode expresses a contract between the connector and the platform about data completeness and deletion semantics.

The semantics are intentionally strict to guarantee reproducibility, auditability, and deterministic outcomes.

INSERT mode

Semantics: Incremental insert without deletion.

  • New entities present in the run are created
  • Existing entities: all data entries from the import are always inserted; there is no duplicate check and no test whether similar entries already exist
  • No entities are deleted, even if they are missing from the current run

INSERT mode is suited for connectors that:

  • ingest data incrementally,
  • cannot guarantee that each run contains a full snapshot of the source system,
  • or explicitly want to avoid destructive operations.

In the current implementation:

  • Patient entities are created or updated based on the incoming batches
    • if the patient exists, only its new data entries are added (table patient_data)
    • if the patient is new, both the patient and its data entries (patient_data) are created
  • Missing patients from previous runs are left untouched
  • STOP_TRANSFER finalization performs no reconciliation-based deletion

INSERT mode assumes that absence of data does not imply deletion.
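Since INSERT mode is not implemented yet, the following is only a sketch of its intended merge semantics under the assumptions stated above: entries are always appended, nothing is deleted. A `Map<patientId, List<entry>>` stands in for the patient and patient_data tables; all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch of INSERT-mode semantics: append-only, no deletion.
public class InsertModeSketch {

    static void insertRun(Map<String, List<String>> cohort, Map<String, List<String>> run) {
        run.forEach((patientId, entries) ->
            cohort.computeIfAbsent(patientId, k -> new ArrayList<>()) // new patient -> create
                  .addAll(entries));                                  // existing -> append, no dedup
        // Patients absent from the run are deliberately left untouched.
    }

    public static void main(String[] args) {
        Map<String, List<String>> cohort = new HashMap<>();
        cohort.put("EXT-001", new ArrayList<>(List.of("hb=12.3")));
        insertRun(cohort, Map.of("EXT-001", List.of("hb=12.3"), "EXT-002", List.of("hb=9.1")));
        System.out.println(cohort.get("EXT-001").size()); // 2: the duplicate entry was appended
        System.out.println(cohort.size());                // 2: EXT-002 was created
    }
}
```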


COMPREHENSIVE mode

Semantics: Snapshot-based synchronization (source-of-truth mode).

In COMPREHENSIVE mode, each run is treated as a complete representation of the cohort state as provided by the connector.

  • New entities present in the run are created
  • Existing entities are updated (or remain unchanged) based solely on the current run
  • Any entities that were created by previous runs of the same connector, but are not present in the current run, are deleted

This mode is designed for connectors that:

  • can reliably deliver the entire dataset on every run,
  • treat the source system as authoritative,
  • and want the target cohort to exactly mirror the source state.

Failure semantics (important)

If a connector running in COMPREHENSIVE mode encounters a critical error at any point in the run lifecycle (including during batch processing or finalization):

  • the entire run fails
  • no partial results are committed
  • the import session is marked as ERROR
  • the cohort state remains unchanged

This ensures atomic snapshot semantics and prevents partially applied destructive updates.

In other words: COMPREHENSIVE mode is all-or-nothing.


DELETION mode (planned / not yet implemented)

Semantics: Explicit deletion-only run.

In DELETION mode, the connector run is interpreted as a deletion instruction set.

  • No entities are created
  • No entities are updated
  • Every entity whose identifier appears in the run is deleted

This mode is intended for advanced connector workflows where:

  • deletions are pushed explicitly (e.g. event-driven source systems),
  • each run represents only entities to be removed,
  • and the run is not a comprehensive dataset.

Typical use cases include:

  • warning-based cleanup runs,
  • corrective deletion after misconfigured imports,
  • or explicit "tombstone" synchronization from upstream systems.

DELETION mode is destructive by design and should be used with care.


Choosing the right run mode

Run mode      | Assumes full dataset | Creates | Updates (Replace) | Updates (Inserts) | Deletes | Typical use case
INSERT        | ❌ No                | ✅ Yes  | ❌ No             | ✅ Yes            | ❌ No   | Incremental ingestion
COMPREHENSIVE | ✅ Yes               | ✅ Yes  | ✅ Yes            | ✅ Yes            | ✅ Yes  | Snapshot synchronization
DELETION      | ❌ No                | ❌ No   | ❌ No             | ❌ No             | ✅ Yes  | Explicit deletion runs

Mapping to protocol behavior

  • The run mode is fixed when START_TRANSFER is issued
  • The selected mode governs:
    • batch processing semantics
    • STOP_TRANSFER finalization behavior
    • error handling and rollback guarantees
  • Mode-specific logic is encapsulated in dedicated Business Objects (e.g. BulkImportComprehensiveModeBO)

Statistics

Statistics are tracked with BulkImportStatisticsTypeEnum:

public enum BulkImportStatisticsTypeEnum {
    NEW_ENTITIES,
    UPDATED_ENTITIES,
    DELETED_ENTITIES,
    FAILED_ENTITIES,
    UNCHANGED_ENTITIES,
    RECEIVED_ENTITIES,
    PROCCESSED_ENTITIES,
    NEW_DATA_ENTRIES,
    FAILED_DATA_ENTRIES;
}

In comprehensive mode, the importer uses these counters while processing batches and while finalizing. They are returned at the end as part of:

  • BulkImportSessionDTO
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
public class BulkImportSessionDTO extends BaseAuthDTO {

    private Long cohortId;
    private Long connectorId;
    private Long importerPID;

    private ImportStatusEnum status = ImportStatusEnum.INIT;
    private ImportModeEnum mode;

    private Boolean dryRun;

    private Long newEntities;
    private Long updatedEntities;
    private Long deletedEntities;
    private Long failedEntities;
    private Long unchangedEntities;
    private Long receivedEntities;
    private Long newDataEntries;
    private Long failedDataEntries;
    private Long expectedElements;
    private Long processedEntities;

    private String errorMessage;

    public BulkImportSessionDTO(String keycloakId) {
        super();
        this.setKeycloakId(keycloakId);
    }
}

Practical meaning:

  • receivedEntities: how many patients were received from the connector (sum over batches)
  • processedEntities: how many were processed into in-memory entities
  • newDataEntries / failedDataEntries: data-entry level success/failure
  • newEntities / updatedEntities / deletedEntities: cohort-level reconciliation results during STOP_TRANSFER
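One idiomatic way to keep such counters during a run is an `EnumMap` keyed by the statistics enum. This is a sketch only, with the enum redeclared locally (and abbreviated) so the example is self-contained; `StatsSketch` is a hypothetical name:

```java
import java.util.EnumMap;

// Sketch of counter tracking keyed by a statistics enum.
public class StatsSketch {

    // Abbreviated local copy of the statistics enum, for illustration.
    enum Stat { NEW_ENTITIES, UPDATED_ENTITIES, DELETED_ENTITIES, FAILED_ENTITIES,
                RECEIVED_ENTITIES, NEW_DATA_ENTRIES }

    private final EnumMap<Stat, Long> counters = new EnumMap<>(Stat.class);

    void increment(Stat stat, long by) { counters.merge(stat, by, Long::sum); }

    long get(Stat stat) { return counters.getOrDefault(stat, 0L); }

    public static void main(String[] args) {
        StatsSketch stats = new StatsSketch();
        stats.increment(Stat.RECEIVED_ENTITIES, 130);
        stats.increment(Stat.NEW_DATA_ENTRIES, 5000);
        stats.increment(Stat.NEW_DATA_ENTRIES, 3);
        System.out.println(stats.get(Stat.NEW_DATA_ENTRIES)); // 5003
    }
}
```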

Audit / traceability

WebSocket requests do not pass through the normal HTTP filter chain, so the audit context is set manually.

For every incoming message, BulkImportWebsocketService.onMessage(...) ensures:

  • AuditContext.keycloakId
  • AuditContext.connectorId
  • AuditContext.runId (= import session id)
private void setAuditContext(String keycloakId, Long connectorId, Long runId) {
    auditContext.setKeycloakId(keycloakId);
    auditContext.setConnectorId(connectorId);
    auditContext.setRunId(runId);
}

This is critical for auditing systems like a UserRevisionListener (Hibernate Envers style), where the listener depends on request-scoped context.


Data-entry grouping and importSchemaGroupId

While building a new PatientEntity from PatientDataMessage, the server assigns an importSchemaGroupId to each PatientDataEntryEntity.

The identifier is constructed as:

  • parentSchemaGroupId + ":" + connectorId + sessionId + ":" + rowIndex

In code:

String importSchemaGroupIdSuffix = connectorId + sessionId + ":" + rowIndex;
Long relevantSchemaGroupId = dataEntryEntity.getSchemaNode().getParent().getId();
dataEntryEntity.setImportSchemaGroupId(relevantSchemaGroupId + ":" + importSchemaGroupIdSuffix);

Why this exists:

  • Rows represent grouped observations (think: multiple parameters measured at the same timepoint)
  • When reconstructing wide-format or time series semantics later, you need a stable join key for “these entries belong together”
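A minimal sketch of building the key and using it as a join key follows. `GroupIdSketch` and `DataEntry` are hypothetical simplifications of the real entities; the concatenation mirrors the code shown above:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the importSchemaGroupId join key: entries sharing a key
// belong to the same row (e.g. the same timepoint).
public class GroupIdSketch {

    record DataEntry(long parentSchemaGroupId, long schemaNodeId, double value) {}

    // parentSchemaGroupId + ":" + connectorId + sessionId + ":" + rowIndex
    static String groupId(DataEntry e, long connectorId, long sessionId, int rowIndex) {
        return e.parentSchemaGroupId() + ":" + connectorId + sessionId + ":" + rowIndex;
    }

    public static void main(String[] args) {
        List<DataEntry> row0 = List.of(new DataEntry(10, 101, 12.3), new DataEntry(10, 102, 77));
        Map<String, List<DataEntry>> grouped = new HashMap<>();
        int rowIndex = 0;
        for (DataEntry e : row0) {
            grouped.computeIfAbsent(groupId(e, 7, 991, rowIndex), k -> new ArrayList<>()).add(e);
        }
        System.out.println(grouped.keySet()); // [10:7991:0] -> both entries share one row key
    }
}
```

Note that connectorId and sessionId are concatenated without a separator, so the middle segment is only unique as a pair, not individually parseable.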

There is also a commented grouping validation block (currently disabled). Expect it to be re-enabled once the edge cases are fixed.


Error handling and cleanup

CRITICAL_ERROR is terminal

Any critical protocol violation or runtime error that should stop the run uses:

  • sendCriticalErrorandCleanup(connection, message, statusCode)

Behavior:

  1. Send a MessageWrapper<MessageWrapperError> with messageType = CRITICAL_ERROR
  2. Mark session ERROR and persist it (if already started)
  3. Close the WebSocket connection
@Transactional(REQUIRES_NEW)
public void sendCriticalErrorandCleanup(WebSocketConnection connection, String errorMessage, int statusCode) {
    MessageWrapperError payload = new MessageWrapperError(errorMessage);
    MessageWrapper<MessageWrapperError> msg = new MessageWrapper<>(MessageTypeEnum.CRITICAL_ERROR, statusCode, payload);
    connection.sendTextAndAwait(msg);

    BulkImportSessionDTO session = connection.userData().get(SESSION_KEY);
    if (session != null && session.getStatus() != ImportStatusEnum.INIT) {
        session.setStatus(ImportStatusEnum.ERROR);
        session.setErrorMessage(errorMessage);
        bulkImportBO.update(session);
    }
    connection.close().await();
}

Transfer identification mismatch

STOP_TRANSFER validates that the request matches the active session:

  • importId
  • connectorId
  • cohortId

Mismatch throws IllegalArgumentException and is treated as a critical error by the surrounding logic.
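The tuple check can be sketched with a record, whose generated `equals` compares all three fields at once. This is an illustration only; `TransferIdCheckSketch`, `TransferId`, and `validate` are hypothetical names:

```java
// Sketch of the tuple validation STOP_TRANSFER performs.
public class TransferIdCheckSketch {

    record TransferId(long importId, long cohortId, long connectorId) {}

    static void validate(TransferId session, TransferId request) {
        // Record equals() compares importId, cohortId, and connectorId together.
        if (!session.equals(request)) {
            throw new IllegalArgumentException(
                "Transfer identification mismatch: expected " + session + " but got " + request);
        }
    }

    public static void main(String[] args) {
        TransferId session = new TransferId(991, 12, 7);
        validate(session, new TransferId(991, 12, 7)); // matches, no exception
        try {
            validate(session, new TransferId(991, 12, 8)); // wrong connectorId
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```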


Thread-safety and transactions

Some WebSocket handlers are marked synchronized and @Transactional.

Why:

  • A single WebSocket connection can receive messages concurrently depending on client/proxy behavior.
  • Session state and persistence updates must remain consistent (e.g., starting twice, stopping while a batch is being processed).

Notable methods:

  • handleStartTransfer(...) → @Transactional + synchronized
  • handleStopTransfer(...) → @Transactional + synchronized
  • onError(...) → @Transactional + synchronized

Batch processing (handlePatientData) is transactional but not synchronized; comprehensive mode generally treats each message as an atomic operation.


Example message exchange

START_TRANSFER

Client → Server:

{
  "messageType": "START_TRANSFER",
  "status": 200,
  "message": {
    "cohortId": 12,
    "connectorId": 7,
    "importerPID": 4242,
    "mode": "COMPREHENSIVE",
    "elements": 2000,
    "dry": false
  }
}

Server → Client:

{
  "messageType": "START_TRANSFER_RESPONSE",
  "status": 200,
  "message": {
    "importId": 991,
    "cohortId": 12,
    "connectorId": 7
  }
}

PATIENT_DATA (batch)

Client → Server:

{
  "messageType": "PATIENT_DATA",
  "status": 200,
  "message": {
    "batchId": 1,
    "transferIdentification": {
      "importId": 991,
      "cohortId": 12,
      "connectorId": 7
    },
    "patientDataMessages": [
      {
        "externalPatientId": "EXT-001",
        "dataEntries": [
          [
            [
              { "schemaNodeId": 101, "value": 12.3 },
              { "schemaNodeId": 102, "value": 77 }
            ],
            [
              { "schemaNodeId": 101, "value": 11.9 },
              { "schemaNodeId": 102, "value": 80 }
            ]
          ]
        ]
      }
    ]
  }
}

Server → Client (report):

{
  "messageType": "PATIENT_REPORT",
  "status": 200,
  "message": {
    "importId": 991,
    "batchId": 1,
    "errorLogs": [
      {
        "message": null,
        "externalPatientId": "EXT-001",
        "updated": true,
        "errorFields": []
      }
    ]
  }
}

STOP_TRANSFER

Client → Server:

{
  "messageType": "STOP_TRANSFER",
  "status": 200,
  "message": {
    "importId": 991,
    "cohortId": 12,
    "connectorId": 7
  }
}

Server → Client (final stats):

{
  "messageType": "RUN_STATISTICS",
  "status": 200,
  "message": {
    "id": 991,
    "cohortId": 12,
    "connectorId": 7,
    "newEntities": 10,
    "updatedEntities": 120,
    "deletedEntities": 5,
    "failedEntities": 0,
    "unchangedEntities": 0,
    "receivedEntities": 130,
    "newDataEntries": 5000,
    "failedDataEntries": 3
  }
}

Implementation notes

  • Do not auto-commit on close. If you need “best-effort” persistence on disconnect, implement it explicitly and consider partial-run semantics.
  • Default mode is incomplete. Right now STOP_TRANSFER in DEFAULT results in 501 and a closed connection.
  • 3D data structure is intentional. Do not flatten it on the connector side unless you also preserve row-level grouping for timeseries joins.
  • AuditContext must be set on every message. If you add new message handlers, ensure setAuditContext(...) still runs before business logic.
  • Error paths should persist session status. If you introduce non-critical errors, decide whether they should:
    • continue the run and only log per-patient errors, or
    • terminate the run via CRITICAL_ERROR

Extending the protocol

If you add new message types:

  1. Extend MessageTypeEnum
  2. Add a DTO
  3. Add a case in onMessage(...)
  4. Decide:
    • does it require RUNNING session?
    • is it transactional?
    • should it be synchronized?

A good rule of thumb:

  • actions that mutate session lifecycle (START_TRANSFER, STOP_TRANSFER) should remain synchronized
  • batch processing can stay parallel as long as you don’t mutate shared per-connection state unsafely

Appendix: DTO reference

  • StartTransferConnectorDTO - starts a session
  • TransferIdentificationDTO - identifies a session in subsequent calls
  • BulkPatientDataMessage - a batch of patients
  • PatientDataMessage - one patient + data entries
  • BulkPatientReportDTO - per-batch report
  • PatientErrorLogsDTO / RunErrorFieldLogDTO - error reporting
  • BulkImportSessionDTO - final run statistics
  • ImportModeEnum - DEFAULT, COMPREHENSIVE
  • ImportStatusEnum - INIT, RUNNING, FINISHED, ERROR
  • BulkImportStatisticsTypeEnum - counter keys
  • MessageWrapper<T> - common envelope

Appendix: ID reference

This section summarizes the identifiers used throughout the bulk import protocol and clarifies where they are generated, what they uniquely identify, and their lifecycle scope.

  • cohortId: Primary key of the target cohort in the local learning system.

    • Type: Long (auto-increment)
    • Generated by: Cohort service
    • Scope: Stable identifier for a cohort across the local clinic
  • connectorId: Primary key of a connector in the data importer subsystem.

    • Type: Long (auto-increment)
    • Generated by: Connector service
    • Scope: Identifies the connector configuration
  • importerPID: Identifier of a single import process execution on the connector side.

    • Type: Long (auto-increment or process-generated)
    • Generated by: Importer when starting a new run via the frontend
    • Scope: Connector-side run identifier, used for cross-system correlation and debugging
  • id: Primary key of an import session in the local learning system.

    • Type: Long (auto-increment)
    • Generated by: Local learning backend during START_TRANSFER
    • Scope: Canonical server-side run identifier used for:
      • audit logging
      • error attribution
      • statistics aggregation
      • transfer identification in subsequent messages

Relationship overview

  • A connector (connectorId) can execute many runs (importerPID)
  • Each run results in exactly one import session (id) on the local-learning-api
  • Each import session operates on exactly one cohort (cohortId)
  • The tuple (id, cohortId, connectorId) uniquely identifies a transfer and is validated on every state-changing message