
Worker RPC Functions

These functions are called by stateless workers during job processing. They follow a standardized error handling pattern to ensure consistent behavior across all workers.

Response Format

All worker RPCs return JSONB with a standardized structure for consistent error handling:

For Job Processing Functions

Functions that process and complete jobs return:

typescript
{
  "success": boolean,
  "error": string | null,
  "retry": boolean
}

For Resource Creation Functions

Functions that create resources (like documents) return:

typescript
{
  "document_id": string | null,  // or other resource identifier
  "error": string | null,
  "retry": boolean
}

Retry Logic

The retry field indicates whether the worker should attempt the operation again:

  • retry: true - Transient errors (deadlocks, lock conflicts) - safe to retry automatically
  • retry: false - Permanent errors (validation failures, data errors, business logic violations) - requires investigation
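This is a minimal worker-side sketch of how a caller might branch on these fields. The type names and the `nextAction` helper are hypothetical. Treating `error === null` as success is an assumption; it covers both response shapes above.

```typescript
// Hypothetical client-side types for the two documented response shapes
type JobResponse = { success: boolean; error: string | null; retry: boolean };
type ResourceResponse = { document_id: string | null; error: string | null; retry: boolean };
type WorkerRpcResponse = JobResponse | ResourceResponse;

type NextAction = 'done' | 'retry' | 'investigate';

// Map an RPC response to what the worker should do next
function nextAction(res: WorkerRpcResponse): NextAction {
  if (res.error === null) return 'done';       // no error reported
  return res.retry ? 'retry' : 'investigate';  // transient vs. permanent error
}

console.log(nextAction({ success: false, error: 'Deadlock detected: ...', retry: true })); // retry
```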

Exception Handling

All worker functions share this exception-handling pattern:

sql
BEGIN
    -- ... function body ...
EXCEPTION
    WHEN deadlock_detected THEN
        -- Transient database conflict - safe to retry
        RETURN jsonb_build_object('success', false, 'error', 'Deadlock detected: ...', 'retry', true);

    WHEN OTHERS THEN
        -- All other errors - include SQLSTATE for debugging
        RETURN jsonb_build_object('success', false, 'error', SQLERRM || ' (SQLSTATE: ' || SQLSTATE || ')', 'retry', false);
END;

The SQLSTATE code helps identify the cause of otherwise unexpected errors. Common codes include:

  • 23502 - NOT NULL violation
  • 23503 - Foreign key violation
  • 23505 - Unique violation
  • 23514 - Check constraint violation
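The WHEN OTHERS handler appends the code as a ` (SQLSTATE: xxxxx)` suffix, so a worker can recover it from the error string for logging or alerting. This is a sketch. `extractSqlState` and `describeError` are hypothetical helpers, and the regex assumes exactly the suffix format shown in the handler above.

```typescript
// Labels for the SQLSTATE codes listed above
const SQLSTATE_LABELS: Record<string, string> = {
  '23502': 'NOT NULL violation',
  '23503': 'Foreign key violation',
  '23505': 'Unique violation',
  '23514': 'Check constraint violation',
};

// Pull the five-character code out of a trailing "(SQLSTATE: xxxxx)" suffix
function extractSqlState(message: string): string | null {
  const match = /\(SQLSTATE: ([0-9A-Z]{5})\)$/.exec(message);
  return match ? match[1] : null;
}

// Human-readable category for logs; codes not in the table are reported verbatim
function describeError(message: string): string {
  const code = extractSqlState(message);
  if (code === null) return 'no SQLSTATE suffix';
  return SQLSTATE_LABELS[code] ?? `unlisted SQLSTATE ${code}`;
}

console.log(describeError('duplicate key value violates unique constraint (SQLSTATE: 23505)')); // Unique violation
```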

Function Reference

store_discovery_results

Summary: Receives the results of a basic discovery search and writes the outcome to the database.

Context: This is the primary entry point for the basic discovery worker after it has queried the Handelsregister API. It handles all business logic for the different outcomes (too many results, few results, etc.) and ensures atomic updates to the searches, processing_jobs, and entities tables.

Signature:

sql
store_discovery_results(
  p_job_id uuid,
  p_search_id uuid,
  p_result_count integer,
  p_results jsonb
) RETURNS JSONB

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | uuid | Yes | The ID of the processing_jobs entry being processed |
| p_search_id | uuid | Yes | The ID of the original search from public.searches |
| p_result_count | integer | Yes | Total number of results found |
| p_results | jsonb | Yes | JSON array of companies found (structure varies by path) |

p_results JSON Structure:

Each result object must include these fields:

  • name, legal_form, register_court, register_number
  • address, seat, state, raw
  • register_type, registration_status

Path A results (1-3 results) must additionally include si_document_xml.
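Below is a sketch of a pre-flight check a worker could run before calling the RPC. The `DiscoveryResult` field types are assumptions, since the list above gives only names. `checkResults` is a hypothetical helper and not part of the RPC.

```typescript
// Hypothetical shape of one p_results entry; field types are assumptions
interface DiscoveryResult {
  name: string;
  legal_form: string;
  register_court: string;
  register_number: string;
  address: string;
  seat: string;
  state: string;
  raw: unknown;
  register_type: string;
  registration_status: string;
  si_document_xml?: string; // Path A only
}

const REQUIRED_FIELDS: (keyof DiscoveryResult)[] = [
  'name', 'legal_form', 'register_court', 'register_number',
  'address', 'seat', 'state', 'raw',
  'register_type', 'registration_status',
];

// Collect missing-field problems; an empty array means the payload passed these checks
function checkResults(results: Partial<DiscoveryResult>[], resultCount: number): string[] {
  const problems: string[] = [];
  const isPathA = resultCount >= 1 && resultCount <= 3;
  results.forEach((result, i) => {
    for (const field of REQUIRED_FIELDS) {
      if (result[field] === undefined) problems.push(`result ${i}: missing ${field}`);
    }
    if (isPathA && result.si_document_xml === undefined) {
      problems.push(`result ${i}: missing si_document_xml (Path A)`);
    }
  });
  return problems;
}
```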

Core Logic & Behavior:

The function routes to different paths based on p_result_count:

  • Path C (>20 results): Too many results

    • Sets search status to result_count_exceeds_limit
    • Completes the job
  • Path B (4-20 results): User selection required

    • Creates/updates entities with status discovered
    • Links entities to search
    • Completes the job
  • Path A (1-3 results): Auto-download

    • Creates/updates entities with XML included
    • Sets entity status to xml_ready
    • Triggers automatic XML parsing (via database trigger)
    • Completes the job
  • No results (0):

    • Marks search as completed with 0 results
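The routing thresholds can be written as a small pure function. A worker might use it, for example, to decide whether si_document_xml must be fetched. `discoveryPath` is a hypothetical name. The ranges come from the list above, and rejecting negative or non-integer counts is an added assumption.

```typescript
type DiscoveryPath = 'none' | 'A' | 'B' | 'C';

// Route a result count using the thresholds listed above
function discoveryPath(resultCount: number): DiscoveryPath {
  if (!Number.isInteger(resultCount) || resultCount < 0) {
    throw new RangeError(`invalid result count: ${resultCount}`);
  }
  if (resultCount === 0) return 'none';  // search completed with 0 results
  if (resultCount <= 3) return 'A';      // auto-download, entities set to xml_ready
  if (resultCount <= 20) return 'B';     // user selection, entities set to discovered
  return 'C';                            // search set to result_count_exceeds_limit
}

// Only Path A results need si_document_xml attached before the RPC call
console.log(discoveryPath(2) === 'A'); // true
```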

Return Value:

json
{
  "success": true,
  "error": null,
  "retry": false
}

Error Handling:

  • Returns retry: false for validation errors (job not found, wrong status)
  • Returns retry: true for deadlocks
  • Returns retry: false with SQLSTATE for unknown errors

store_xml_parsing_results

Summary: Stores parsed XML data after LLM extraction is complete.

Context: Called by the XML parsing worker after extracting structured data from the si_document_xml field. Marks the basic discovery phase as complete.

Signature:

sql
store_xml_parsing_results(
  p_job_id uuid,
  p_entity_id uuid,
  p_results jsonb,
  p_address text DEFAULT NULL
) RETURNS JSONB

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | uuid | Yes | The ID of the xml_parsing job |
| p_entity_id | uuid | Yes | The entity whose XML was parsed |
| p_results | jsonb | Yes | Structured data extracted from XML |
| p_address | text | No | Parsed address (updates entity if provided) |

p_results JSON Structure:

json
{
  "company_details": {
    "name": "Example Company GmbH",
    "legal_form": "GmbH"
  },
  "registration": {
    "court": "Amtsgericht Charlottenburg",
    "number": "12345 B"
  },
  "capital": "25000 EUR"
}
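The following sketch builds the RPC arguments on the worker side. It assumes the supabase-js convention of passing named parameters as a plain object. The `XmlParsingResults` type mirrors only the example above, and `buildXmlParsingArgs` is hypothetical. p_address is left out unless an address was parsed, so the SQL default of NULL applies. Treating a blank string as "not parsed" is an assumption.

```typescript
// Typing of the p_results example above; real extraction output may carry more fields
interface XmlParsingResults {
  company_details: { name: string; legal_form: string };
  registration: { court: string; number: string };
  capital: string;
}

interface StoreXmlParsingArgs {
  p_job_id: string;
  p_entity_id: string;
  p_results: XmlParsingResults;
  p_address?: string;
}

// Build named RPC parameters; p_address is omitted (SQL DEFAULT NULL) unless a non-blank address was parsed
function buildXmlParsingArgs(
  jobId: string,
  entityId: string,
  results: XmlParsingResults,
  address?: string,
): StoreXmlParsingArgs {
  const args: StoreXmlParsingArgs = { p_job_id: jobId, p_entity_id: entityId, p_results: results };
  if (address !== undefined && address.trim() !== '') {
    args.p_address = address;
  }
  return args;
}
```

With supabase-js, the returned object is what would be passed as the parameters argument of `supabase.rpc('store_xml_parsing_results', args)`.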

Core Logic & Behavior:

  1. Validation: Verifies that the job is of type xml_parsing and in running status, and that the entity's status is xml_parsing_running
  2. Entity Update: Sets si_document_json to parsed results, updates address if provided, sets status to basic_discovery_complete
  3. Job Completion: Marks job as completed with success message

Return Value:

json
{
  "success": true,
  "error": null,
  "retry": false
}

store_shareholder_download_results

Summary: Records successful download of a shareholder list document and queues it for parsing.

Context: Called after downloading a shareholder document from Handelsregister. Separating download from parsing ensures the expensive file download only happens once, even if LLM extraction fails.

Signature:

sql
store_shareholder_download_results(
  p_job_id UUID,
  p_entity_id UUID,
  p_document_id UUID
) RETURNS JSONB

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | UUID | Yes | The ID of the shareholder_download job |
| p_entity_id | UUID | Yes | The entity whose document was downloaded |
| p_document_id | UUID | Yes | The stored document ID from content.documents |

Core Logic & Behavior:

  1. Validation: Verifies that the job is of type shareholder_download and in running status, and that the entity is in downloading status
  2. Entity Update: Sets shareholder_research_status to downloaded
  3. Download Job Completion: Marks download job as completed
  4. Parsing Job Creation: Creates new shareholder_parsing job with document_id in data field

Return Value:

json
{
  "success": true,
  "error": null,
  "retry": false
}

store_shareholder_parsing_results

Summary: Stores extracted shareholder data and completes the shareholder research process.

Context: Called after the LLM has processed the shareholder document and extracted structured shareholder information.

Signature:

sql
store_shareholder_parsing_results(
  p_job_id UUID,
  p_entity_id UUID,
  p_source_document_id UUID,
  p_shareholders JSONB
) RETURNS JSONB

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | UUID | Yes | The ID of the shareholder_parsing job |
| p_entity_id | UUID | Yes | The entity whose shareholders are being stored |
| p_source_document_id | UUID | Yes | The source document from which data was extracted |
| p_shareholders | JSONB | Yes | Array of shareholder objects |

p_shareholders JSON Structure:

Array of objects with fields depending on shareholder_type:

Natural person:

json
{
  "shareholder_type": "natural_person",
  "first_name": "Max",
  "last_name": "Mustermann",
  "date_of_birth": "1980-05-15",
  "residence": "Berlin",
  "share_nominal_amount": 25000,
  "share_percentage": 50.0,
  "sequence_number": 1
}

Organization:

json
{
  "shareholder_type": "organization",
  "company_name": "Example Holding GmbH",
  "register_court": "Amtsgericht München",
  "register_type": "HRB",
  "register_number": "12345",
  "seat": "München",
  "foreign_entity": false,
  "share_nominal_amount": 25000,
  "share_percentage": 50.0,
  "sequence_number": 2
}
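LLM output can omit fields, so a worker may want to catch NOT NULL errors before calling the RPC. This is a minimal sketch. The per-type required fields below are an assumption drawn from the two examples. They are not the actual column constraints of content.entity_shareholders, and all helper names are hypothetical.

```typescript
type ShareholderType = 'natural_person' | 'organization';

// Assumed minimum fields per type, based on the two examples above
const REQUIRED_BY_TYPE: Record<ShareholderType, string[]> = {
  natural_person: ['first_name', 'last_name', 'sequence_number'],
  organization: ['company_name', 'sequence_number'],
};

function isShareholderType(value: unknown): value is ShareholderType {
  return value === 'natural_person' || value === 'organization';
}

// Problems for one entry; an empty array means it passed these checks
function shareholderProblems(entry: Record<string, unknown>, index: number): string[] {
  const type = entry.shareholder_type;
  if (!isShareholderType(type)) {
    return [`shareholder ${index}: missing or unknown shareholder_type`];
  }
  const problems: string[] = [];
  for (const field of REQUIRED_BY_TYPE[type]) {
    if (entry[field] === undefined || entry[field] === null) {
      problems.push(`shareholder ${index}: missing ${field}`);
    }
  }
  return problems;
}

// Validate the whole p_shareholders array before sending it
function validateShareholders(entries: Record<string, unknown>[]): string[] {
  const problems: string[] = [];
  entries.forEach((entry, i) => problems.push(...shareholderProblems(entry, i)));
  return problems;
}
```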

Core Logic & Behavior:

  1. Validation: Verifies that the job is of type shareholder_parsing and in running status, and that the entity is in parsing status
  2. Shareholder Insertion: Loops through shareholders array and inserts each into content.entity_shareholders
  3. Entity Update: Sets shareholder_research_status to complete
  4. Job Completion: Marks parsing job as completed
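The status checks in the two shareholder RPCs can be summarized as a small transition table. This sketch assumes the entity-level status being validated is shareholder_research_status. The move from downloaded to parsing is not described by either RPC, so it is left out. `applyRpc` is a hypothetical helper.

```typescript
type ResearchStatus = 'downloading' | 'downloaded' | 'parsing' | 'complete';

interface RpcRule {
  jobType: string;           // required processing_jobs type
  requires: ResearchStatus;  // status the entity must be in
  sets: ResearchStatus;      // status written on success
}

// Rules taken from the validation and entity-update steps of the two RPCs
const RULES: Record<string, RpcRule> = {
  store_shareholder_download_results: { jobType: 'shareholder_download', requires: 'downloading', sets: 'downloaded' },
  store_shareholder_parsing_results: { jobType: 'shareholder_parsing', requires: 'parsing', sets: 'complete' },
};

type Outcome = { ok: true; next: ResearchStatus } | { ok: false; error: string };

// Check the preconditions an RPC enforces and return the resulting status
function applyRpc(rpc: string, job: { type: string; status: string }, current: ResearchStatus): Outcome {
  const rule = RULES[rpc];
  if (rule === undefined) return { ok: false, error: `unknown RPC ${rpc}` };
  if (job.type !== rule.jobType || job.status !== 'running') {
    return { ok: false, error: `${rpc} expects a running ${rule.jobType} job` };
  }
  if (current !== rule.requires) {
    return { ok: false, error: `${rpc} expects status ${rule.requires}, got ${current}` };
  }
  return { ok: true, next: rule.sets };
}
```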

Return Value:

json
{
  "success": true,
  "error": null,
  "retry": false
}

Error Scenarios:

Common errors that return retry: false:

  • Missing required field (e.g., shareholder_type is NULL) → NOT NULL violation
  • Invalid foreign key (e.g., bad entity_id) → Foreign key violation
  • Malformed JSONB structure → Type conversion error

record_file_and_document

Summary: Registers a successfully uploaded file in the database, creating records in both content.files and content.documents.

Context: Primary entry point after a file has been uploaded to the storage bucket. Handles file deduplication via hash checking.

Signature:

sql
record_file_and_document(
  p_file_hash TEXT,
  p_storage_path TEXT,
  p_file_size_bytes BIGINT,
  p_mime_type TEXT,
  p_entity_id UUID,
  p_original_filename TEXT,
  p_parent_document_id UUID DEFAULT NULL,
  p_display_name TEXT DEFAULT NULL,
  p_hr_document_path TEXT DEFAULT NULL,
  p_document_date DATE DEFAULT NULL,
  p_received_on DATE DEFAULT NULL,
  p_published_on DATE DEFAULT NULL,
  p_created_by TEXT DEFAULT NULL,
  p_type_of_document TEXT DEFAULT NULL,
  p_language_identifier TEXT DEFAULT NULL
) RETURNS JSONB

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| p_file_hash | TEXT | Yes | SHA-256 hash of file content (for deduplication) |
| p_storage_path | TEXT | Yes | Full path in the Supabase storage bucket |
| p_file_size_bytes | BIGINT | Yes | File size in bytes |
| p_mime_type | TEXT | Yes | MIME type (e.g., 'application/pdf') |
| p_entity_id | UUID | Yes | The entity this document belongs to |
| p_original_filename | TEXT | Yes | Original filename from Handelsregister |
| p_parent_document_id | UUID | No | Parent document if extracted from an archive |
| p_display_name | TEXT | No | Human-readable document name |
| p_hr_document_path | TEXT | No | Handelsregister navigation path (e.g., "VÖ/1/2") |
| p_document_date | DATE | No | Date printed on the document |
| p_received_on | DATE | No | Date received by the register |
| p_published_on | DATE | No | Date officially published |
| p_created_by | TEXT | No | The source/creator named in Handelsregister |
| p_type_of_document | TEXT | No | Specific document type (e.g., 'Gesellschafterliste', 'Jahresabschluss') |
| p_language_identifier | TEXT | No | Language of the document in Handelsregister |

Core Logic & Behavior:

  1. File Deduplication: Checks content.files for existing hash
    • If found: Reuses existing file ID
    • If not found: Creates new file record
  2. Document Creation: Inserts new document record linking to file and entity
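Here is a sketch of the worker side of deduplication, using Node's `crypto.createHash`. Hex encoding is an assumption about what p_file_hash expects. `FileRegistry` is a hypothetical in-memory stand-in for the content.files lookup and does not reproduce the SQL implementation.

```typescript
import { createHash } from 'node:crypto';

// Hex-encoded SHA-256 of the file bytes (hex is an assumed encoding for p_file_hash)
function sha256Hex(content: Uint8Array): string {
  return createHash('sha256').update(content).digest('hex');
}

// Hypothetical in-memory stand-in for the content.files lookup
class FileRegistry {
  private filesByHash = new Map<string, string>();
  private nextId = 1;

  // Step 1: reuse the file ID for a known hash, otherwise create a new file record
  resolveFileId(hash: string): { fileId: string; reused: boolean } {
    const existing = this.filesByHash.get(hash);
    if (existing !== undefined) return { fileId: existing, reused: true };
    const fileId = `file-${this.nextId++}`;
    this.filesByHash.set(hash, fileId);
    return { fileId, reused: false };
  }
}

// Step 2 is not modeled: a new document row is always created and points at fileId
const registry = new FileRegistry();
const hash = sha256Hex(new TextEncoder().encode('same bytes'));
console.log(registry.resolveFileId(hash).reused); // false
console.log(registry.resolveFileId(hash).reused); // true
```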

Return Value:

json
{
  "document_id": "uuid-of-created-document",
  "error": null,
  "retry": false
}

Error Scenarios:

  • Invalid entity_id → Foreign key violation, retry: false
  • Missing required parameter → NOT NULL violation, retry: false

record_job_error

Summary: Centralized function for workers to report job failures with structured error data and optional retry delays.

Context: Single entry point for handling job errors. Implements automatic retry logic with configurable delays. The function checks retry_count against max_retries to determine whether to retry or mark the job as permanently failed.

Signature:

sql
record_job_error(
  p_job_id uuid,
  p_error_source text,
  p_error_type text,
  p_error_sub_type text,
  p_error_severity integer,
  p_error_message text,
  p_retry_delay_seconds integer DEFAULT NULL
) RETURNS JSONB

Parameters:

| Name | Type | Required | Description |
|---|---|---|---|
| p_job_id | uuid | Yes | The ID of the failed job |
| p_error_source | text | Yes | Origin of error (e.g., worker, rpc_function) |
| p_error_type | text | Yes | Broad category (e.g., api_error, validation_error) |
| p_error_sub_type | text | Yes | Specific error code (e.g., api_timeout, job_not_found) |
| p_error_severity | integer | Yes | Impact level (1=low to 5=critical) |
| p_error_message | text | Yes | Human-readable error description |
| p_retry_delay_seconds | integer | No | Seconds to delay before retry (default: immediate) |

Core Logic & Behavior:

  1. Job Validation: Locks the job row and validates that it exists and is in queued or running status
  2. Retry Check: Compares retry_count against max_retries to determine if retry is possible
  3. Retry Path (if retry_count < max_retries):
    • Increments retry_count
    • Sets status to queued
    • Sets run_at to now() + delay (allows scheduled retries)
    • Clears worker_id and started_at for clean retry
    • Stores error data in result field
  4. Failure Path (if retry_count >= max_retries):
    • Sets status to failed
    • Sets completed_at timestamp
    • Stores error data in result field
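The decision in steps 1 to 4 can be modeled as a pure function over a job row. This is only a sketch. `JobRow` lists just the columns mentioned above. Row locking is left to the database. `applyJobError` is a hypothetical name, and it throws in the case where the RPC would instead return success: false.

```typescript
type JobStatus = 'queued' | 'running' | 'completed' | 'failed';

// Only the processing_jobs columns mentioned above
interface JobRow {
  status: JobStatus;
  retry_count: number;
  max_retries: number;
  run_at: Date;
  worker_id: string | null;
  started_at: Date | null;
  completed_at: Date | null;
  result: unknown;
}

interface StructuredError {
  source: string;
  type: string;
  sub_type: string;
  severity: number;
  message: string;
}

// Pure model of the retry/failure decision; returns the updated row
function applyJobError(job: JobRow, error: StructuredError, now: Date, retryDelaySeconds: number | null = null): JobRow {
  if (job.status !== 'queued' && job.status !== 'running') {
    throw new Error(`job is ${job.status}, expected queued or running`);
  }
  const result = { error };
  if (job.retry_count < job.max_retries) {
    // Retry path: requeue, optionally in the future, and clear worker ownership
    const delayMs = (retryDelaySeconds ?? 0) * 1000;
    return {
      ...job,
      status: 'queued',
      retry_count: job.retry_count + 1,
      run_at: new Date(now.getTime() + delayMs),
      worker_id: null,
      started_at: null,
      result,
    };
  }
  // Failure path: retries exhausted
  return { ...job, status: 'failed', completed_at: now, result };
}
```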

Note: This function accepts jobs in both queued and running status to handle errors that occur:

  • Before a worker picks up the job (validation errors, business logic violations)
  • During job processing (worker errors, API failures)

Structured Error Format:

The function stores errors in this format:

json
{
  "error": {
    "source": "worker",
    "type": "api_error",
    "sub_type": "api_timeout",
    "severity": 3,
    "message": "Handelsregister API request timed out after 30s"
  }
}

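Return Value:

In this function, success reports whether the error was recorded (not whether the job succeeded), and retry reports whether a retry was scheduled.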

Retry scheduled:

json
{
  "success": true,
  "error": null,
  "retry": true,
  "retry_at": "2025-11-24T19:22:37.010377Z"
}

Max retries exceeded (final failure):

json
{
  "success": true,
  "error": null,
  "retry": false,
  "retry_at": null
}

Validation error:

json
{
  "success": false,
  "error": "Job with ID ... not found",
  "retry": false,
  "retry_at": null
}

Retry Strategy Examples:

typescript
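import { createClient } from '@supabase/supabase-js';

// Service-role client; SUPABASE_URL / SUPABASE_SERVICE_ROLE_KEY are placeholder env var names
const supabase = createClient(process.env.SUPABASE_URL!, process.env.SUPABASE_SERVICE_ROLE_KEY!);

// Immediate retry (network glitch)
async function reportConnectionReset(jobId: string) {
  const { data, error } = await supabase.rpc('record_job_error', {
    p_job_id: jobId,
    p_error_source: 'worker',
    p_error_type: 'network_error',
    p_error_sub_type: 'connection_reset',
    p_error_severity: 2,
    p_error_message: 'Connection reset by peer',
    // p_retry_delay_seconds omitted: defaults to NULL, so the retry is immediate
  });
  if (error) throw error; // transport/PostgREST error, not the recorded job error
  return data; // { success, error, retry, retry_at }
}

// Delayed retry (rate limiting)
async function reportRateLimit(jobId: string) {
  const { data, error } = await supabase.rpc('record_job_error', {
    p_job_id: jobId,
    p_error_source: 'worker',
    p_error_type: 'api_error',
    p_error_sub_type: 'rate_limit',
    p_error_severity: 2,
    p_error_message: 'Rate limit exceeded',
    p_retry_delay_seconds: 300, // wait 5 minutes
  });
  if (error) throw error;
  return data;
}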

The idx_processing_jobs_status_run_at index optimizes the job-pickup query that selects queued jobs whose run_at has passed.

Permissions: Uses default SECURITY INVOKER. Intended for service_role clients.