# Worker RPC Functions
These functions are called by stateless workers during job processing. They follow a standardized error handling pattern to ensure consistent behavior across all workers.
## Response Format
All worker RPCs return JSONB with a standardized structure for consistent error handling:
### For Job Processing Functions
Functions that process and complete jobs return:
```json
{
  "success": boolean,
  "error": string | null,
  "retry": boolean
}
```

### For Resource Creation Functions
Functions that create resources (like documents) return:
```json
{
  "document_id": string | null, // or other resource identifier
  "error": string | null,
  "retry": boolean
}
```

## Retry Logic
The `retry` field indicates whether the worker should attempt the operation again:
- `retry: true`: transient errors (deadlocks, lock conflicts), safe to retry automatically
- `retry: false`: permanent errors (validation failures, data errors, business logic violations), requires investigation
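As a sketch of how a worker might act on this contract (the helper name and the returned action shape are illustrative assumptions, not part of the API):

```javascript
// Illustrative helper: map a standardized job-processing RPC response
// to a worker action. Name and return shape are assumptions.
function interpretJobResponse(res) {
  if (res.success) return { action: 'done' };
  if (res.retry) return { action: 'retry', reason: res.error }; // transient
  return { action: 'fail', reason: res.error };                 // permanent
}

console.log(interpretJobResponse({ success: false, error: 'Deadlock detected', retry: true }));
// { action: 'retry', reason: 'Deadlock detected' }
```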
## Exception Handling
All worker functions have comprehensive exception handling:
```sql
EXCEPTION
    WHEN deadlock_detected THEN
        -- Transient database conflict - safe to retry
        RETURN jsonb_build_object('success', false, 'error', 'Deadlock detected: ...', 'retry', true);
    WHEN OTHERS THEN
        -- All other errors - include SQLSTATE for debugging
        RETURN jsonb_build_object('success', false, 'error', SQLERRM || ' (SQLSTATE: ' || SQLSTATE || ')', 'retry', false);
```

The SQLSTATE code helps debug unknown errors. Common states include:
- `23502`: NOT NULL violation
- `23503`: foreign key violation
- `23505`: unique violation
- `23514`: check constraint violation
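Since the `WHEN OTHERS` handler embeds the SQLSTATE in the error string, a worker can recover it for log triage. A minimal sketch (the helper name and description table are assumptions for this example):

```javascript
// Illustrative: extract the SQLSTATE from an error message shaped like
// "... (SQLSTATE: 23505)" as produced by the WHEN OTHERS handler above.
const SQLSTATE_DESCRIPTIONS = {
  '23502': 'NOT NULL violation',
  '23503': 'Foreign key violation',
  '23505': 'Unique violation',
  '23514': 'Check constraint violation',
};

function describeSqlstate(errorMessage) {
  const match = /\(SQLSTATE: (\w{5})\)/.exec(errorMessage ?? '');
  if (!match) return null;
  return {
    code: match[1],
    description: SQLSTATE_DESCRIPTIONS[match[1]] ?? 'Unknown SQLSTATE',
  };
}

console.log(describeSqlstate('duplicate key value (SQLSTATE: 23505)'));
// { code: '23505', description: 'Unique violation' }
```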
## Function Reference
### `store_discovery_results`
**Summary:** Receives the results of a basic discovery search and writes the outcome to the database.

**Context:** This is the primary entry point for the basic discovery worker after it has queried the Handelsregister API. It handles all business logic for different outcomes (too many results, few results, etc.) and ensures atomic updates to the searches, processing_jobs, and entities tables.
**Signature:**

```sql
store_discovery_results(
  p_job_id uuid,
  p_search_id uuid,
  p_result_count integer,
  p_results jsonb
) RETURNS JSONB
```

**Parameters:**
| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | uuid | Yes | The ID of the processing_jobs entry being processed |
| p_search_id | uuid | Yes | The ID of the original search from public.searches |
| p_result_count | integer | Yes | Total number of results found |
| p_results | jsonb | Yes | JSON array of companies found (structure varies by path) |
**p_results JSON Structure:**
All results must include these fields:
- `name`, `legal_form`, `register_court`, `register_number`
- `address`, `seat`, `state`, `raw`
- `register_type`, `registration_status`
- `si_document_xml` (only for Path A: ≤3 results)
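A worker can fail fast on malformed payloads by checking these fields before issuing the RPC. A minimal pre-flight sketch (the helper name is an assumption, not part of the API):

```javascript
// Illustrative pre-flight check: report which required fields are absent
// from a result object before calling store_discovery_results.
const REQUIRED_RESULT_FIELDS = [
  'name', 'legal_form', 'register_court', 'register_number',
  'address', 'seat', 'state', 'raw',
  'register_type', 'registration_status',
];

function missingResultFields(result) {
  return REQUIRED_RESULT_FIELDS.filter((field) => !(field in result));
}

console.log(missingResultFields({ name: 'Example GmbH', legal_form: 'GmbH' }));
// lists the eight remaining required fields
```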
**Core Logic & Behavior:**

The function routes to different paths based on `p_result_count`:
**Path C (>20 results): Too many results**
- Sets search status to `result_count_exceeds_limit`
- Completes the job

**Path B (4-20 results): User selection required**
- Creates/updates entities with status `discovered`
- Links entities to search
- Completes the job

**Path A (1-3 results): Auto-download**
- Creates/updates entities with XML included
- Sets entity status to `xml_ready`
- Triggers automatic XML parsing (via database trigger)
- Completes the job

**No results (0):**
- Marks search as completed with 0 results
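The routing rule above can be sketched as a pure function (the function and path names are illustrative, not identifiers used by the database):

```javascript
// Sketch of the p_result_count routing described above.
function discoveryPath(resultCount) {
  if (resultCount === 0) return 'no_results';
  if (resultCount <= 3) return 'path_a';  // auto-download
  if (resultCount <= 20) return 'path_b'; // user selection required
  return 'path_c';                        // too many results
}

console.log(discoveryPath(2));  // path_a
console.log(discoveryPath(25)); // path_c
```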
**Return Value:**
```json
{
  "success": true,
  "error": null,
  "retry": false
}
```

**Error Handling:**

- Returns `retry: false` for validation errors (job not found, wrong status)
- Returns `retry: true` for deadlocks
- Returns `retry: false` with SQLSTATE for unknown errors
### `store_xml_parsing_results`
**Summary:** Stores parsed XML data after LLM extraction is complete.

**Context:** Called by the XML parsing worker after extracting structured data from the `si_document_xml` field. Marks the basic discovery phase as complete.
**Signature:**

```sql
store_xml_parsing_results(
  p_job_id uuid,
  p_entity_id uuid,
  p_results jsonb,
  p_address text DEFAULT NULL
) RETURNS JSONB
```

**Parameters:**
| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | uuid | Yes | The ID of the xml_parsing job |
| p_entity_id | uuid | Yes | The entity whose XML was parsed |
| p_results | jsonb | Yes | Structured data extracted from XML |
| p_address | text | No | Parsed address (updates entity if provided) |
**p_results JSON Structure:**
```json
{
  "company_details": {
    "name": "Example Company GmbH",
    "legal_form": "GmbH"
  },
  "registration": {
    "court": "Amtsgericht Charlottenburg",
    "number": "12345 B"
  },
  "capital": "25000 EUR"
}
```

**Core Logic & Behavior:**
- **Validation:** Verifies job is `xml_parsing` type in `running` status and entity is `xml_parsing_running`
- **Entity Update:** Sets `si_document_json` to parsed results, updates address if provided, sets status to `basic_discovery_complete`
- **Job Completion:** Marks job as completed with success message
**Return Value:**

```json
{
  "success": true,
  "error": null,
  "retry": false
}
```

### `store_shareholder_download_results`
**Summary:** Records successful download of a shareholder list document and queues it for parsing.

**Context:** Called after downloading a shareholder document from Handelsregister. Separating download from parsing ensures the expensive file download only happens once, even if LLM extraction fails.
**Signature:**

```sql
store_shareholder_download_results(
  p_job_id UUID,
  p_entity_id UUID,
  p_document_id UUID
) RETURNS JSONB
```

**Parameters:**
| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | UUID | Yes | The ID of the shareholder_download job |
| p_entity_id | UUID | Yes | The entity whose document was downloaded |
| p_document_id | UUID | Yes | The stored document ID from content.documents |
**Core Logic & Behavior:**
- **Validation:** Verifies job is `shareholder_download` in `running` status and entity is `downloading`
- **Entity Update:** Sets `shareholder_research_status` to `downloaded`
- **Download Job Completion:** Marks download job as completed
- **Parsing Job Creation:** Creates new `shareholder_parsing` job with the document_id in its data field
**Return Value:**

```json
{
  "success": true,
  "error": null,
  "retry": false
}
```

### `store_shareholder_parsing_results`
**Summary:** Stores extracted shareholder data and completes the shareholder research process.

**Context:** Called after the LLM has processed the shareholder document and extracted structured shareholder information.
**Signature:**

```sql
store_shareholder_parsing_results(
  p_job_id UUID,
  p_entity_id UUID,
  p_source_document_id UUID,
  p_shareholders JSONB
) RETURNS JSONB
```

**Parameters:**
| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | UUID | Yes | The ID of the shareholder_parsing job |
| p_entity_id | UUID | Yes | The entity whose shareholders are being stored |
| p_source_document_id | UUID | Yes | The source document from which data was extracted |
| p_shareholders | JSONB | Yes | Array of shareholder objects |
**p_shareholders JSON Structure:**

Array of objects with fields depending on `shareholder_type`:
**Natural person:**

```json
{
  "shareholder_type": "natural_person",
  "first_name": "Max",
  "last_name": "Mustermann",
  "date_of_birth": "1980-05-15",
  "residence": "Berlin",
  "share_nominal_amount": 25000,
  "share_percentage": 50.0,
  "sequence_number": 1
}
```

**Organization:**
```json
{
  "shareholder_type": "organization",
  "company_name": "Example Holding GmbH",
  "register_court": "Amtsgericht München",
  "register_type": "HRB",
  "register_number": "12345",
  "seat": "München",
  "foreign_entity": false,
  "share_nominal_amount": 25000,
  "share_percentage": 50.0,
  "sequence_number": 2
}
```

**Core Logic & Behavior:**
- **Validation:** Verifies job is `shareholder_parsing` in `running` status and entity is `parsing`
- **Shareholder Insertion:** Loops through the shareholders array and inserts each into `content.entity_shareholders`
- **Entity Update:** Sets `shareholder_research_status` to `complete`
- **Job Completion:** Marks parsing job as completed
**Return Value:**

```json
{
  "success": true,
  "error": null,
  "retry": false
}
```

**Error Scenarios:**
Common errors that return `retry: false`:
- Missing required field (e.g., `shareholder_type` is NULL) → NOT NULL violation
- Invalid foreign key (e.g., bad `entity_id`) → foreign key violation
- Malformed JSONB structure → type conversion error
### `record_file_and_document`

**Summary:** Registers a successfully uploaded file in the database, creating records in both `content.files` and `content.documents`.
**Context:** Primary entry point after a file has been uploaded to the storage bucket. Handles file deduplication via hash checking.
**Signature:**

```sql
record_file_and_document(
  p_file_hash TEXT,
  p_storage_path TEXT,
  p_file_size_bytes BIGINT,
  p_mime_type TEXT,
  p_entity_id UUID,
  p_original_filename TEXT,
  p_parent_document_id UUID DEFAULT NULL,
  p_display_name TEXT DEFAULT NULL,
  p_hr_document_path TEXT DEFAULT NULL,
  p_document_date DATE DEFAULT NULL,
  p_received_on DATE DEFAULT NULL,
  p_published_on DATE DEFAULT NULL,
  p_created_by TEXT DEFAULT NULL,
  p_type_of_document TEXT DEFAULT NULL,
  p_language_identifier TEXT DEFAULT NULL
) RETURNS JSONB
```

**Parameters:**
| Name | Type | Required | Description |
|---|---|---|---|
| p_file_hash | TEXT | Yes | SHA256 hash of file content (for deduplication) |
| p_storage_path | TEXT | Yes | Full path in Supabase storage bucket |
| p_file_size_bytes | BIGINT | Yes | File size in bytes |
| p_mime_type | TEXT | Yes | MIME type (e.g., 'application/pdf') |
| p_entity_id | UUID | Yes | The entity this document belongs to |
| p_original_filename | TEXT | Yes | Original filename from Handelsregister |
| p_parent_document_id | UUID | No | Parent document if extracted from archive |
| p_display_name | TEXT | No | Human-readable document name |
| p_hr_document_path | TEXT | No | Handelsregister navigation path (e.g., "VÖ/1/2") |
| p_document_date | DATE | No | Date printed on document |
| p_received_on | DATE | No | Date received by register |
| p_published_on | DATE | No | Date officially published |
| p_created_by | TEXT | No | The source/creator named in Handelsregister |
| p_type_of_document | TEXT | No | Specific document type (e.g., 'Gesellschafterliste', 'Jahresabschluss') |
| p_language_identifier | TEXT | No | Language of the document in Handelsregister |
**Core Logic & Behavior:**
- **File Deduplication:** Checks `content.files` for an existing hash
  - If found: reuses the existing file ID
  - If not found: creates a new file record
- **Document Creation:** Inserts a new document record linking to file and entity
**Return Value:**
```json
{
  "document_id": "uuid-of-created-document",
  "error": null,
  "retry": false
}
```

**Error Scenarios:**
- Invalid `entity_id` → foreign key violation, `retry: false`
- Missing required parameter → NOT NULL violation, `retry: false`
### `record_job_error`
**Summary:** Centralized function for workers to report job failures with structured error data and optional retry delays.

**Context:** Single entry point for handling job errors. Implements automatic retry logic with configurable delays. The function checks `retry_count` against `max_retries` to determine whether to retry or mark the job as permanently failed.
**Signature:**

```sql
record_job_error(
  p_job_id uuid,
  p_error_source text,
  p_error_type text,
  p_error_sub_type text,
  p_error_severity integer,
  p_error_message text,
  p_retry_delay_seconds integer DEFAULT NULL
) RETURNS JSONB
```

**Parameters:**
| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | uuid | Yes | The ID of the failed job |
| p_error_source | text | Yes | Origin of error (e.g., worker, rpc_function) |
| p_error_type | text | Yes | Broad category (e.g., api_error, validation_error) |
| p_error_sub_type | text | Yes | Specific error code (e.g., api_timeout, job_not_found) |
| p_error_severity | integer | Yes | Impact level (1=low to 5=critical) |
| p_error_message | text | Yes | Human-readable error description |
| p_retry_delay_seconds | integer | No | Seconds to delay before retry (default: immediate) |
**Core Logic & Behavior:**
- **Job Validation:** Locks the job row, validates it exists and is in `queued` or `running` status
- **Retry Check:** Compares `retry_count` against `max_retries` to determine if retry is possible
- **Retry Path** (if `retry_count < max_retries`):
  - Increments `retry_count`
  - Sets status to `queued`
  - Sets `run_at` to `now() + delay` (allows scheduled retries)
  - Clears `worker_id` and `started_at` for a clean retry
  - Stores error data in the `result` field
- **Failure Path** (if max retries exceeded):
  - Sets status to `failed`
  - Sets `completed_at` timestamp
  - Stores error data in the `result` field
**Note:** This function accepts jobs in both `queued` and `running` status to handle errors that occur:
- Before a worker picks up the job (validation errors, business logic violations)
- During job processing (worker errors, API failures)
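The function leaves the choice of `p_retry_delay_seconds` entirely to the caller. One common policy a worker might use is capped exponential backoff; this is a suggested sketch, not something `record_job_error` prescribes:

```javascript
// Suggested (not prescribed) policy for choosing p_retry_delay_seconds:
// exponential backoff based on the job's current retry_count, with a cap.
function retryDelaySeconds(retryCount, baseSeconds = 30, capSeconds = 3600) {
  return Math.min(baseSeconds * 2 ** retryCount, capSeconds);
}

console.log(retryDelaySeconds(0)); // 30
console.log(retryDelaySeconds(3)); // 240
```

Passing the result as `p_retry_delay_seconds` lets `run_at` spread retries out instead of hammering a failing upstream API.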
**Structured Error Format:**
The function stores errors in this format:
```json
{
  "error": {
    "source": "worker",
    "type": "api_error",
    "sub_type": "api_timeout",
    "severity": 3,
    "message": "Handelsregister API request timed out after 30s"
  }
}
```

**Return Value:**
Retry scheduled:

```json
{
  "success": true,
  "error": null,
  "retry": true,
  "retry_at": "2025-11-24T19:22:37.010377Z"
}
```

Max retries exceeded (final failure):

```json
{
  "success": true,
  "error": null,
  "retry": false,
  "retry_at": null
}
```

Validation error:

```json
{
  "success": false,
  "error": "Job with ID ... not found",
  "retry": false,
  "retry_at": null
}
```

**Retry Strategy Examples:**
```javascript
// Immediate retry (network glitch)
await supabase.rpc('record_job_error', {
  p_job_id: jobId,
  p_error_source: 'worker',
  p_error_type: 'network_error',
  p_error_sub_type: 'connection_reset',
  p_error_severity: 2,
  p_error_message: 'Connection reset by peer'
  // No delay - retry immediately
});

// Delayed retry (rate limiting)
await supabase.rpc('record_job_error', {
  p_job_id: jobId,
  p_error_source: 'worker',
  p_error_type: 'api_error',
  p_error_sub_type: 'rate_limit',
  p_error_severity: 2,
  p_error_message: 'Rate limit exceeded',
  p_retry_delay_seconds: 300 // Wait 5 minutes
});
```

The `idx_processing_jobs_status_run_at` index optimizes the query workers use to pick up due jobs by `status` and `run_at`.
**Permissions:** Uses the default `SECURITY INVOKER`. Intended for `service_role` clients.