Bulk Patient Import over WebSockets
This module implements a stateful, authenticated WebSocket protocol to import many patients (and their data entries) into a cohort in a controlled, auditable way.
It is designed for connector-style data ingestion where an external system streams patient batches to the server, and the server responds with per-batch reports plus final run statistics.
Where this lives
Main entrypoint (WebSocket endpoint):
bio.cosy.feddb.local.api.cohort.patient.bulkimport.BulkImportWebsocketService
Session + persistence orchestration:
bio.cosy.feddb.local.api.cohort.patient.bulkimport.BulkImportSessionBO
Implemented import mode:
bio.cosy.feddb.local.api.cohort.patient.bulkimport.modes.BulkImportComprehensiveModeBO
Other modes exist conceptually but are not implemented yet (see Import modes).
High-level flow
The protocol is intentionally simple and strict:
- Client connects to the WebSocket (/ws/bulkimport)
- Client sends START_TRANSFER
- Server persists a new import session and returns START_TRANSFER_RESPONSE with the canonical TransferIdentificationDTO
- Client sends one or more PATIENT_DATA batches (chunked by batchId)
- Server responds to each batch with a PATIENT_REPORT (per-patient error logs)
- Client sends STOP_TRANSFER
- Server finalizes and returns RUN_STATISTICS
- Connection may be closed by either side
If something goes wrong, the server sends CRITICAL_ERROR and closes the connection.
WebSocket endpoint
Path
The WebSocket endpoint is exposed at:
BulkImportWebsocketService.PATH = /ws/bulkimport
@WebSocket(path = BulkImportWebsocketService.PATH)
@Authenticated
@RequestScoped
public class BulkImportWebsocketService { ... }
Authentication
The endpoint is annotated with @Authenticated, so the connection is established only for authenticated users.
The service uses UserIdentity to resolve the caller:
userIdentity.getKeycloakIdOrDefault()
This keycloakId becomes part of the import session metadata and is also used for audit context propagation.
Connection state (in-memory)
WebSockets are long-lived; you need some server-side state per connection. This implementation uses Quarkus WebSockets UserData storage with typed keys:
- SESSION_KEY: BulkImportSessionDTO
- PATIENTS_KEY: List<PatientEntity>
public static final TypedKey<BulkImportSessionDTO> SESSION_KEY = new TypedKey<>("session");
public static final TypedKey<List<PatientEntity>> PATIENTS_KEY = new TypedKey<>("patients");
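For intuition, here is a minimal, self-contained sketch of how typed-key per-connection storage works. TypedKey and UserDataSketch below are illustrative stand-ins, not the real Quarkus WebSockets API:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of a typed-key store, mimicking Quarkus WebSockets UserData.
// Keys use identity equality, so each TypedKey instance is its own slot.
public class UserDataSketch {
    public static final class TypedKey<T> {
        private final String name;
        public TypedKey(String name) { this.name = name; }
        public String name() { return name; }
    }

    private final Map<TypedKey<?>, Object> data = new HashMap<>();

    // The generic signature ties the stored value's type to the key's type.
    public <T> void put(TypedKey<T> key, T value) { data.put(key, value); }

    @SuppressWarnings("unchecked")
    public <T> T get(TypedKey<T> key) { return (T) data.get(key); }
}
```

The payoff is that `get(SESSION_KEY)` returns a `BulkImportSessionDTO` without casts at the call site.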
OnOpen
On connection open:
- resolve keycloakId
- create a BulkImportSessionDTO(keycloakId) in INIT state (in-memory only)
- attach an empty patient buffer list
@OnOpen
public void onOpen(WebSocketConnection connection) {
String keycloakId = userIdentity.getKeycloakIdOrDefault();
BulkImportSessionDTO session = new BulkImportSessionDTO(keycloakId);
connection.userData().put(SESSION_KEY, session);
connection.userData().put(PATIENTS_KEY, new ArrayList<>());
}
Important: the session created here is not persisted. Persistence only happens once the client sends START_TRANSFER.
OnClose
On close, the service does not auto-finalize.
This is a deliberate design choice:
- closing a WebSocket can happen for many reasons (client crash, network reset, proxy timeout)
- you do not want an implicit flush or a partial commit unless the connector explicitly finishes the run
So the rule is:
Only STOP_TRANSFER finalizes and persists the import result.
Closing the connection alone is not treated as a successful run.
OnError
@OnError updates a persisted run to ERROR, but only if the session has already started:
- if the session is null or still INIT, nothing is persisted
- otherwise:
  - session.status = ERROR
  - session.errorMessage = throwable.getMessage()
  - bulkImportBO.update(session)
@OnError
@Transactional
public synchronized void onError(WebSocketConnection connection, Throwable throwable) {
BulkImportSessionDTO session = connection.userData().get(SESSION_KEY);
if (session == null || session.getStatus() == ImportStatusEnum.INIT) return;
session.setErrorMessage(throwable.getMessage());
session.setStatus(ImportStatusEnum.ERROR);
bulkImportBO.update(session);
}
Message model
All inbound and outbound messages are wrapped in:
MessageWrapper<T>
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageWrapper<T> {
@NotNull
private MessageTypeEnum messageType;
private Integer status;
private T message;
}
Message types
Client → Server:
- START_TRANSFER
- PATIENT_DATA
- STOP_TRANSFER
Server → Client:
- START_TRANSFER_RESPONSE
- PATIENT_REPORT
- RUN_STATISTICS
- CRITICAL_ERROR (terminal, closes connection)
public enum MessageTypeEnum {
START_TRANSFER,
STOP_TRANSFER,
PATIENT_DATA,
START_TRANSFER_RESPONSE,
CRITICAL_ERROR,
PATIENT_REPORT,
RUN_STATISTICS
}
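A sketch of how an onMessage dispatcher might route by message type. DispatchSketch is illustrative, not the real BulkImportWebsocketService; the handler names mirror the methods described below:

```java
// Illustrative routing of client -> server message types to handler names.
public class DispatchSketch {
    public enum MessageType { START_TRANSFER, STOP_TRANSFER, PATIENT_DATA }

    public static String route(MessageType type) {
        switch (type) {
            case START_TRANSFER: return "handleStartTransfer";
            case PATIENT_DATA:   return "handlePatientData";
            case STOP_TRANSFER:  return "handleStopTransfer";
            default:             return "criticalError"; // unknown type -> terminal error
        }
    }
}
```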
Protocol details
1) START_TRANSFER
Client sends:
MessageWrapper<StartTransferConnectorDTO> with messageType = START_TRANSFER
StartTransferConnectorDTO:
- cohortId (target cohort)
- connectorId (source connector identifier)
- importerPID (process identifier on connector side)
- mode (DEFAULT or COMPREHENSIVE)
- elements (expected number of patient entities)
- dry (optional, default false)
@Data
public class StartTransferConnectorDTO {
@NotNull private Long cohortId;
@NotNull private Long connectorId;
@NotNull private Long importerPID;
@NotNull private ImportModeEnum mode;
@NotNull private Long elements;
private Boolean dry = false;
}
Server behavior:
- validates that the connection session is still INIT
- persists a new session via bulkImportBO.initNewSession(startDto)
- stores the persisted session back into UserData
- replies with:
- MessageTypeEnum.START_TRANSFER_RESPONSE
- TransferIdentificationDTO(id, cohortId, connectorId)
public synchronized MessageWrapper<TransferIdentificationDTO> handleStartTransfer(...) {
BulkImportSessionDTO currentSession = connection.userData().get(SESSION_KEY);
if (currentSession.getStatus() != ImportStatusEnum.INIT) { ... }
BulkImportSessionDTO startSession = bulkImportBO.initNewSession(startDto);
connection.userData().put(SESSION_KEY, startSession);
TransferIdentificationDTO response = new TransferIdentificationDTO(
startSession.getId(), startSession.getCohortId(), startSession.getConnectorId()
);
return new MessageWrapper<>(MessageTypeEnum.START_TRANSFER_RESPONSE, 200, response);
}
2) PATIENT_DATA (batched)
Client sends:
MessageWrapper<BulkPatientDataMessage> with messageType = PATIENT_DATA
BulkPatientDataMessage:
- batchId (chunk identifier; client-chosen)
- transferIdentification (must match session)
- patientDataMessages (list of patient payloads)
@Data
public class BulkPatientDataMessage {
@NotNull private Long batchId;
@NotNull private TransferIdentificationDTO transferIdentification;
@NotNull private List<PatientDataMessage> patientDataMessages;
}
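A connector-side sketch of how a patient list might be chunked into batches. Batcher is a hypothetical helper; batchId is client-chosen, so any batching scheme works as long as ids are usable for correlation:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: chunk a patient list into PATIENT_DATA batches.
// The caller would wrap each chunk in a BulkPatientDataMessage with an
// incrementing batchId (1, 2, 3, ...).
public class Batcher {
    public static <T> List<List<T>> chunk(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            // Copy the sublist so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
        }
        return batches;
    }
}
```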
PatientDataMessage contains:
- externalPatientId
- dataEntries in a 3D list structure:
List<List<List<PatientDataEntryDTO>>>
@Data
public class PatientDataMessage {
@NotNull private String externalPatientId;
@NotNull private List<List<List<PatientDataEntryDTO>>> dataEntries;
}
How to read dataEntries:
- outer list: multiple time series blocks (or “frames”)
- middle list: rows within that block (each row corresponds to one timestamp/visit/observation group)
- inner list: columns/parameters of that row (each is one PatientDataEntryDTO)
This structure is used to reconstruct grouped long-format observations while keeping row-level grouping stable.
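A minimal sketch of walking this nesting. Entry is a hypothetical stand-in for PatientDataEntryDTO; only the loop structure mirrors the documented semantics:

```java
import java.util.List;

// Sketch of the 3D dataEntries structure:
// blocks -> rows (one timestamp/visit/observation group each) -> columns (one entry each).
public class DataEntriesWalker {
    public record Entry(long schemaNodeId, double value) {}

    public static int countEntries(List<List<List<Entry>>> dataEntries) {
        int count = 0;
        for (List<List<Entry>> block : dataEntries) {   // outer: time series block
            for (List<Entry> row : block) {             // middle: one row = one timepoint
                count += row.size();                    // inner: columns/parameters of that row
            }
        }
        return count;
    }
}
```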
Server behavior:
- requires an active RUNNING session
- routes by mode:
- COMPREHENSIVE → BulkImportComprehensiveModeBO.importPatients(...)
- DEFAULT → batch handling lives in a separate BO, but its STOP_TRANSFER finalization is not implemented (see below)
- responds with PATIENT_REPORT containing:
- importId
- batchId
- per-patient error logs
public MessageWrapper<BulkPatientReportDTO> handlePatientData(...) {
BulkImportSessionDTO session = connection.userData().get(SESSION_KEY);
if (session == null || session.getStatus() != ImportStatusEnum.RUNNING) { ... }
BulkPatientReportDTO report = comprehensiveModeBO.importPatients(...);
return new MessageWrapper<>(MessageTypeEnum.PATIENT_REPORT, 200, report);
}
Per-patient error logs
Every batch returns a report listing errors per patient:
@Data
@AllArgsConstructor
public class BulkPatientReportDTO {
@NotNull private Long importId;
@NotNull private Long batchId;
@NotNull private List<PatientErrorLogsDTO> errorLogs;
}
PatientErrorLogsDTO:
- message (patient-level message)
- externalPatientId
- updated flag (true if changes were applied / entries added)
- errorFields (field-level errors: schemaNodeId + message)
@Data
public class PatientErrorLogsDTO {
private String message;
private String externalPatientId;
private boolean updated = false;
private List<RunErrorFieldLogDTO> errorFields = new ArrayList<>();
}
This is intentionally verbose: you can feed the logs directly into connector diagnostics or UI.
3) STOP_TRANSFER
Client sends:
MessageWrapper<TransferIdentificationDTO> with messageType = STOP_TRANSFER
@Data
@AllArgsConstructor
public class TransferIdentificationDTO {
private Long id; // primary key of the bulk import session in the local learning system
private Long cohortId;
private Long connectorId;
}
Server behavior:
- validates transferIdentification matches the session (importId, cohortId, connectorId)
- requires session state RUNNING
- mode-specific finalization

Comprehensive mode finalization: BulkImportComprehensiveModeBO.finishTransfer(session, patientsToCommit) performs the final reconciliation:
- Build lookup map from imported patients by externalPatientId
- Load all existing cohort patients
- For each existing patient:
- if present in import → replace its data entries (update)
- if missing in import → delete patient (comprehensive sync semantics)
- Any remaining imported patients not matched → create new
- Update session counters + mark FINISHED
- Return BulkImportSessionDTO (run statistics)
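The reconciliation steps above can be sketched as follows. ReconcileSketch is illustrative and works on bare external ids rather than the real entities, but the update/delete/create split matches the documented semantics:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of comprehensive-mode reconciliation (not the real BO):
// imported patients keyed by externalPatientId are matched against existing
// cohort patients; matches are updated, misses deleted, leftovers created.
public class ReconcileSketch {
    public record Result(List<String> updated, List<String> deleted, List<String> created) {}

    public static Result reconcile(Set<String> existing, Set<String> imported) {
        List<String> updated = new ArrayList<>();
        List<String> deleted = new ArrayList<>();
        List<String> created = new ArrayList<>();
        Set<String> remaining = new HashSet<>(imported);
        for (String id : existing) {
            if (remaining.remove(id)) updated.add(id);  // present in import -> replace data entries
            else deleted.add(id);                       // missing from import -> comprehensive delete
        }
        created.addAll(remaining);                      // unmatched imports -> create new patients
        return new Result(updated, deleted, created);
    }
}
```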
Response message type:
- MessageTypeEnum.RUN_STATISTICS
public synchronized MessageWrapper<BulkImportSessionDTO> handleStopTransfer(...) {
if (session.getMode() == ImportModeEnum.COMPREHENSIVE) {
BulkImportSessionDTO stats = comprehensiveModeBO.finishTransfer(session, patients);
return new MessageWrapper<>(MessageTypeEnum.RUN_STATISTICS, 200, stats);
}
...
}
Default mode finalization (not implemented)

DEFAULT currently returns:
- CRITICAL_ERROR with HTTP-like status 501
- connection closes
sendCriticalErrorandCleanup(connection, "Default mode not implemented yet", 501);
Import modes
The bulk import protocol supports different run modes that define how the incoming data set is interpreted relative to previously imported data. Each run mode expresses a contract between the connector and the platform about data completeness and deletion semantics.
The semantics are intentionally strict to guarantee reproducibility, auditability, and deterministic outcomes.
INSERT mode
Semantics: Incremental insert without deletion.
- New entities present in the run are created
- Existing entities: all data entries from the import are inserted as-is; there is no check for duplicates or whether similar entries already exist
- No entities are deleted, even if they are missing from the current run
INSERT mode is suited for connectors that:
- ingest data incrementally,
- cannot guarantee that each run contains a full snapshot of the source system,
- or explicitly want to avoid destructive operations.
In the current implementation:
- Patient entities are created or updated based on the incoming batches
- If the patient exists, only its data entries are added (table: patient_data)
- If the patient is new, both the patient and its data entries (patient_data) are created
- Missing patients from previous runs are left untouched
- STOP_TRANSFER finalization performs no reconciliation-based deletion
INSERT mode assumes that absence of data does not imply deletion.
COMPREHENSIVE mode
Semantics: Snapshot-based synchronization (source-of-truth mode).
In COMPREHENSIVE mode, each run is treated as a complete representation of the cohort state as provided by the connector.
- New entities present in the run are created
- Existing entities are updated (or remain unchanged) based solely on the current run
- Any entities that were created by previous runs of the same connector, but are not present in the current run, are deleted
This mode is designed for connectors that:
- can reliably deliver the entire dataset on every run,
- treat the source system as authoritative,
- and want the target cohort to exactly mirror the source state.
Failure semantics (important)
If a connector running in COMPREHENSIVE mode encounters a critical error at any point in the run lifecycle (including during batch processing or finalization):
- the entire run fails
- no partial results are committed
- the import session is marked as ERROR
- the cohort state remains unchanged
This ensures atomic snapshot semantics and prevents partially applied destructive updates.
In other words: COMPREHENSIVE mode is all-or-nothing.
DELETION mode (planned / not yet implemented)
Semantics: Explicit deletion-only run.
In DELETION mode, the connector run is interpreted as a deletion instruction set.
- No entities are created
- No entities are updated
- Every entity whose identifier appears in the run is deleted
This mode is intended for advanced connector workflows where:
- deletions are pushed explicitly (e.g. event-driven source systems),
- each run represents only entities to be removed,
- and the run is not a comprehensive dataset.
Typical use cases include:
- warning-based cleanup runs,
- corrective deletion after misconfigured imports,
- or explicit "tombstone" synchronization from upstream systems.
DELETION mode is destructive by design and should be used with care.
Choosing the right run mode
| Run mode | Assumes full dataset | Creates | Updates (Replace) | Updates (Inserts) | Deletes | Typical use case |
|---|---|---|---|---|---|---|
| INSERT | ❌ No | ✅ Yes | ❌ No | ✅ Yes | ❌ No | Incremental ingestion |
| COMPREHENSIVE | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes | ✅ Yes | Snapshot synchronization |
| DELETION | ❌ No | ❌ No | ❌ No | ❌ No | ✅ Yes | Explicit deletion runs |
Mapping to protocol behavior
- The run mode is fixed when START_TRANSFER is issued
- The selected mode governs:
- batch processing semantics
- STOP_TRANSFER finalization behavior
- error handling and rollback guarantees
- Mode-specific logic is encapsulated in dedicated Business Objects (e.g. BulkImportComprehensiveModeBO)
Statistics
Statistics are tracked with BulkImportStatisticsTypeEnum:
public enum BulkImportStatisticsTypeEnum {
NEW_ENTITIES,
UPDATED_ENTITIES,
DELETED_ENTITIES,
FAILED_ENTITIES,
UNCHANGED_ENTITIES,
RECEIVED_ENTITIES,
PROCCESSED_ENTITIES,
NEW_DATA_ENTRIES,
FAILED_DATA_ENTRIES;
}
In comprehensive mode, the importer uses these counters while processing batches and while finalizing. They are returned at the end as part of:
- BulkImportSessionDTO
@EqualsAndHashCode(callSuper = true)
@Data
@NoArgsConstructor
public class BulkImportSessionDTO extends BaseAuthDTO {
private Long cohortId;
private Long connectorId;
private Long importerPID;
private ImportStatusEnum status = ImportStatusEnum.INIT;
private ImportModeEnum mode;
private Boolean dryRun;
private Long newEntities;
private Long updatedEntities;
private Long deletedEntities;
private Long failedEntities;
private Long unchangedEntities;
private Long receivedEntities;
private Long newDataEntries;
private Long failedDataEntries;
private Long expectedElements;
private Long processedEntities;
private String errorMessage;
public BulkImportSessionDTO(String keycloakId) {
super();
this.setKeycloakId(keycloakId);
}
}
Practical meaning:
- receivedEntities: how many patients were received from the connector (sum over batches)
- processedEntities: how many were processed into in-memory entities
- newDataEntries / failedDataEntries: data-entry level success/failure
- newEntities / updatedEntities / deletedEntities: cohort-level reconciliation results during STOP_TRANSFER
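A sketch of how enum-keyed counters can back these statistics. StatsSketch is illustrative (the real implementation updates the session DTO fields); the enum constants mirror a subset of BulkImportStatisticsTypeEnum:

```java
import java.util.EnumMap;

// Illustrative counter bookkeeping for run statistics.
public class StatsSketch {
    public enum Stat { NEW_ENTITIES, UPDATED_ENTITIES, DELETED_ENTITIES, RECEIVED_ENTITIES }

    private final EnumMap<Stat, Long> counters = new EnumMap<>(Stat.class);

    // Accumulate a delta, e.g. the patient count of each incoming batch.
    public void increment(Stat stat, long delta) { counters.merge(stat, delta, Long::sum); }

    public long get(Stat stat) { return counters.getOrDefault(stat, 0L); }
}
```

For example, receivedEntities would be incremented by the batch size on every PATIENT_DATA message and reported once at STOP_TRANSFER.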
Audit / traceability
WebSocket requests do not pass through the normal HTTP filter chain, so the audit context is set manually.
For every incoming message, BulkImportWebsocketService.onMessage(...) ensures:
- AuditContext.keycloakId
- AuditContext.connectorId
- AuditContext.runId (= import session id)
private void setAuditContext(String keycloakId, Long connectorId, Long runId) {
auditContext.setKeycloakId(keycloakId);
auditContext.setConnectorId(connectorId);
auditContext.setRunId(runId);
}
This is critical for auditing systems like a UserRevisionListener (Hibernate Envers style), where the listener depends on request-scoped context.
Data-entry grouping and importSchemaGroupId
While building a new PatientEntity from PatientDataMessage, the server assigns an importSchemaGroupId to each PatientDataEntryEntity.
The identifier is constructed as:
- parentSchemaGroupId + ":" + connectorId + sessionId + ":" + rowIndex
In code:
String importSchemaGroupIdSuffix = connectorId + sessionId + ":" + rowIndex;
Long relevantSchemaGroupId = dataEntryEntity.getSchemaNode().getParent().getId();
dataEntryEntity.setImportSchemaGroupId(relevantSchemaGroupId + ":" + importSchemaGroupIdSuffix);
Why this exists:
- Rows represent grouped observations (think: multiple parameters measured at the same timepoint)
- When reconstructing wide-format or time series semantics later, you need a stable join key for “these entries belong together”
There is also a commented grouping validation block (currently disabled). Expect it to be re-enabled once the edge cases are fixed.
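The id construction above can be reproduced as a small, testable sketch (GroupIdSketch is a hypothetical helper). Note that connectorId and sessionId are concatenated without a separator, exactly as in the snippet above:

```java
// Sketch of the importSchemaGroupId construction:
// parentSchemaGroupId + ":" + connectorId + sessionId + ":" + rowIndex
public class GroupIdSketch {
    public static String buildImportSchemaGroupId(long parentSchemaGroupId,
                                                  long connectorId,
                                                  long sessionId,
                                                  int rowIndex) {
        // "" forces string concatenation of the two ids, mirroring the original code.
        String suffix = "" + connectorId + sessionId + ":" + rowIndex;
        return parentSchemaGroupId + ":" + suffix;
    }
}
```

All entries sharing the same rowIndex within a session thus carry the same key and can be joined back into one row later.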
Error handling and cleanup
CRITICAL_ERROR is terminal
Any critical protocol violation or runtime error that should stop the run uses:
- sendCriticalErrorandCleanup(connection, message, statusCode)
Behavior:
- Send a MessageWrapper<MessageWrapperError> with messageType = CRITICAL_ERROR
- Mark the session ERROR and persist it (if already started)
- Close the WebSocket connection
@Transactional(REQUIRES_NEW)
public void sendCriticalErrorandCleanup(WebSocketConnection connection, String errorMessage, int statusCode) {
MessageWrapperError payload = new MessageWrapperError(errorMessage);
MessageWrapper<MessageWrapperError> msg = new MessageWrapper<>(MessageTypeEnum.CRITICAL_ERROR, statusCode, payload);
connection.sendTextAndAwait(msg);
BulkImportSessionDTO session = connection.userData().get(SESSION_KEY);
if (session != null && session.getStatus() != ImportStatusEnum.INIT) {
session.setStatus(ImportStatusEnum.ERROR);
session.setErrorMessage(errorMessage);
bulkImportBO.update(session);
}
connection.close().await();
}
Transfer identification mismatch
STOP_TRANSFER validates that the request matches the active session:
- importId
- connectorId
- cohortId
Mismatch throws IllegalArgumentException and is treated as a critical error by the surrounding logic.
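A sketch of the tuple check. TransferCheckSketch is a hypothetical helper working on bare ids; the real code compares the incoming TransferIdentificationDTO against the active session:

```java
// Illustrative STOP_TRANSFER identification check: the request tuple
// (importId, cohortId, connectorId) must match the active session exactly.
public class TransferCheckSketch {
    public static void validate(long sessionImportId, long sessionCohortId, long sessionConnectorId,
                                long reqImportId, long reqCohortId, long reqConnectorId) {
        if (sessionImportId != reqImportId
                || sessionCohortId != reqCohortId
                || sessionConnectorId != reqConnectorId) {
            // Treated as a critical error by the surrounding protocol logic.
            throw new IllegalArgumentException("Transfer identification mismatch");
        }
    }
}
```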
Thread-safety and transactions
Some WebSocket handlers are marked synchronized and @Transactional.
Why:
- A single WebSocket connection can receive messages concurrently depending on client/proxy behavior.
- Session state and persistence updates must remain consistent (e.g., starting twice, stopping while a batch is being processed).
Notable methods:
- handleStartTransfer(...) → @Transactional + synchronized
- handleStopTransfer(...) → @Transactional + synchronized
- onError(...) → @Transactional + synchronized
Batch processing (handlePatientData) is transactional but not synchronized; comprehensive mode generally treats each message as an atomic operation.
Example message exchange
START_TRANSFER
Client → Server:
{
"messageType": "START_TRANSFER",
"status": 200,
"message": {
"cohortId": 12,
"connectorId": 7,
"importerPID": 4242,
"mode": "COMPREHENSIVE",
"elements": 2000,
"dry": false
}
}
Server → Client:
{
"messageType": "START_TRANSFER_RESPONSE",
"status": 200,
"message": {
"importId": 991,
"cohortId": 12,
"connectorId": 7
}
}
PATIENT_DATA (batch)
Client → Server:
{
"messageType": "PATIENT_DATA",
"status": 200,
"message": {
"batchId": 1,
"transferIdentification": {
"importId": 991,
"cohortId": 12,
"connectorId": 7
},
"patientDataMessages": [
{
"externalPatientId": "EXT-001",
"dataEntries": [
[
[
{ "schemaNodeId": 101, "value": 12.3 },
{ "schemaNodeId": 102, "value": 77 }
],
[
{ "schemaNodeId": 101, "value": 11.9 },
{ "schemaNodeId": 102, "value": 80 }
]
]
]
}
]
}
}
Server → Client (report):
{
"messageType": "PATIENT_REPORT",
"status": 200,
"message": {
"importId": 991,
"batchId": 1,
"errorLogs": [
{
"message": null,
"externalPatientId": "EXT-001",
"updated": true,
"errorFields": []
}
]
}
}
STOP_TRANSFER
Client → Server:
{
"messageType": "STOP_TRANSFER",
"status": 200,
"message": {
"importId": 991,
"cohortId": 12,
"connectorId": 7
}
}
Server → Client (final stats):
{
"messageType": "RUN_STATISTICS",
"status": 200,
"message": {
"id": 991,
"cohortId": 12,
"connectorId": 7,
"newEntities": 10,
"updatedEntities": 120,
"deletedEntities": 5,
"failedEntities": 0,
"unchangedEntities": 0,
"receivedEntities": 130,
"newDataEntries": 5000,
"failedDataEntries": 3
}
}
Implementation notes
- Do not auto-commit on close. If you need “best-effort” persistence on disconnect, implement it explicitly and consider partial-run semantics.
- Default mode is incomplete. Right now STOP_TRANSFER in DEFAULT results in 501 and a closed connection.
- 3D data structure is intentional. Do not flatten it on the connector side unless you also preserve row-level grouping for timeseries joins.
- AuditContext must be set on every message. If you add new message handlers, ensure setAuditContext(...) still runs before business logic.
- Error paths should persist session status. If you introduce non-critical errors, decide whether they should:
- continue the run and only log per-patient errors, or
- terminate the run via CRITICAL_ERROR
Extending the protocol
If you add new message types:
- Extend MessageTypeEnum
- Add a DTO
- Add a case in onMessage(...)
- Decide:
- does it require RUNNING session?
- is it transactional?
- should it be synchronized?
A good rule of thumb:
- actions that mutate session lifecycle (START_TRANSFER, STOP_TRANSFER) should remain synchronized
- batch processing can stay parallel as long as you don’t mutate shared per-connection state unsafely
Appendix: DTO reference
- StartTransferConnectorDTO: starts a session
- TransferIdentificationDTO: identifies a session in subsequent calls
- BulkPatientDataMessage: a batch of patients
- PatientDataMessage: one patient + data entries
- BulkPatientReportDTO: per-batch report
- PatientErrorLogsDTO / RunErrorFieldLogDTO: error reporting
- BulkImportSessionDTO: final run statistics
- ImportModeEnum: DEFAULT, COMPREHENSIVE
- ImportStatusEnum: INIT, RUNNING, FINISHED, ERROR
- BulkImportStatisticsTypeEnum: counter keys
- MessageWrapper<T>: common envelope
Appendix: ID reference
This section summarizes the identifiers used throughout the bulk import protocol and clarifies where they are generated, what they uniquely identify, and their lifecycle scope.
- cohortId: primary key of the target cohort in the local learning system.
  - Type: Long (auto-increment)
  - Generated by: Cohort service
  - Scope: stable identifier for a cohort across the local clinic
- connectorId: primary key of a connector in the data importer subsystem.
  - Type: Long (auto-increment)
  - Generated by: Connector service
  - Scope: identifies the connector configuration
- importerPID: identifier of a single import process execution on the connector side.
  - Type: Long (auto-increment or process-generated)
  - Generated by: the importer when starting a new run via the frontend
  - Scope: connector-side run identifier, used for cross-system correlation and debugging
- id: primary key of an import session in the local learning system.
  - Type: Long (auto-increment)
  - Generated by: the local learning backend during START_TRANSFER
  - Scope: canonical server-side run identifier used for:
    - audit logging
    - error attribution
    - statistics aggregation
    - transfer identification in subsequent messages
Relationship overview
- A connector (connectorId) can execute many runs (importerPID)
- Each run results in exactly one import session (id) on the local-learning-api
- Each import session operates on exactly one cohort (cohortId)
- The tuple (id, cohortId, connectorId) uniquely identifies a transfer and is validated on every state-changing message