PubSub

The PubSub scraper subscribes to message queues and pub/sub systems to consume messages and create configuration items from them. This enables real-time configuration tracking based on events and messages published to various messaging systems.

pubsub-scraper.yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check
spec:
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id
      transform:
        expr: "[config].toJSON()"
Field | Description | Scheme | Required
schedule | Specify the interval to scrape in cron format. Defaults to every 60 minutes. | Cron |
retention | Settings for retaining changes, analysis and scraped items. | Retention |
pubsub | Specifies the list of PubSub configurations to scrape. | []PubSub | true
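
For instance, a minimal sketch that sets an explicit schedule alongside the required pubsub list; the cron string is illustrative:

apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check
spec:
  schedule: "*/30 * * * *" # illustrative: scrape every 30 minutes instead of the default 60
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id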

PubSub

Mapping

Custom scrapers require you to define the id and type for each scraped item. For example, when you scrape a file containing a JSON array, where each array element represents a config item, you must specify the id and type for those items. You can achieve this by using mappings in your custom scraper configuration.

Field | Description | Scheme
id*

A static value or JSONPath expression to use as the ID for the resource.

string or JSONPath

name*

A static value or JSONPath expression to use as the name for the resource.

string or JSONPath

type*

A static value or JSONPath expression to use as the type for the resource.

string or JSONPath

class

A static value or JSONPath expression to use as the class for the resource.

string or JSONPath

createFields

A list of JSONPath expressions used to identify the created time of the config. If multiple fields are specified, the first non-empty value will be used.

[]jsonpath

deleteFields

A list of JSONPath expressions used to identify the deleted time of the config. If multiple fields are specified, the first non-empty value will be used.

[]jsonpath

description

A static value or JSONPath expression to use as the description for the resource.

string or JSONPath

format

Format of the config item. Defaults to JSON; available options are JSON and properties. See Formats.

string

health

A static value or JSONPath expression to use as the health of the config item.

string or JSONPath

items

A JSONPath expression used to extract individual items from the resource. Items are extracted first, and then the ID, name, type, and transformations are applied to each item.

JSONPath

status

A static value or JSONPath expression to use as the status of the config item.

string or JSONPath

timestampFormat

A Go time format string used to parse timestamps in createFields and deleteFields. (Default: RFC3339)

string
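
As a sketch of how these mapping fields work together, the hypothetical file scraper below reads a JSON array of servers and maps each element to a config item; the file path, JSONPath expressions, and type value are illustrative:

apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: server-inventory
spec:
  file:
    - paths:
        - fixtures/data/servers.json # hypothetical file containing a JSON array
      items: $[*] # extract each array element as its own config item
      id: $.serverId # JSONPath into each element
      name: $.hostname
      type: Inventory::Server # static value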

Formats

JSON

The scraper stores config items as jsonb fields in PostgreSQL.

Resource providers typically return JSON natively, e.g. kubectl get -o json or aws --output=json.

When you display the config, the UI automatically converts the JSON data to YAML for improved readability.

XML / Properties

The scraper stores non-JSON files as JSON using:

{ "format": "xml", "content": "<root>..</root>" }

You can still access non-JSON content in scripts using config.content.

The UI formats and renders XML appropriately.
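
As a rough sketch (not a documented recipe), a transform expression modeled on the [config].toJSON() pattern above could read the raw payload via config.content; the wrapper shape below is illustrative:

transform:
  expr: |
    // config.content holds the raw XML/properties text for non-JSON items
    [{'format': 'xml', 'content': config.content}].toJSON()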

Extracting Changes & Access Logs

Custom scrapers ingest changes & access logs from external systems when you enable the full option.

Every config is expected to have these three top-level fields:

  • config
  • changes
  • access_logs
info

A config may include additional fields or omit some of these; only these three fields are extracted.

Consider a file that contains the following JSON data:

{
  "reg_no": "A123",
  "config": {
    "meta": "this is the actual config that'll be stored."
  },
  "changes": [
    {
      "action": "drive",
      "summary": "car color changed to blue",
      "unrelated_stuff": 123
    }
  ],
  "access_logs": [
    {
      "config_id": "99024949-9118-4dcb-a3a0-b8f1536bebd0",
      "external_user_id": "a3542241-4750-11f0-8000-e0146ce375e6",
      "created_at": "2025-01-01"
    },
    {
      "config_id": "9d9e51a7-6956-413e-a07e-a6aeb3f4877f",
      "external_user_id": "a5c2e8e3-4750-11f0-8000-f4eaacabd632",
      "created_at": "2025-01-02"
    }
  ]
}

A regular scraper saves the entire JSON as a single config. With the full option, however, the scraper extracts the config, changes, and access logs separately.

apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: file-scraper
spec:
  full: true
  file:
    - type: Car
      id: $.reg_no
      paths:
        - fixtures/data/car_changes.json

The resulting config is:

{
  "meta": "this is the actual config that'll be stored."
}

and the scraper records the following new config change on that config:

{
  "action": "drive",
  "summary": "car color changed to blue",
  "unrelated_stuff": 123
}

and the access logs are saved as:

[
  {
    "config_id": "99024949-9118-4dcb-a3a0-b8f1536bebd0",
    "external_user_id": "a3542241-4750-11f0-8000-e0146ce375e6",
    "created_at": "2025-01-01"
  },
  {
    "config_id": "9d9e51a7-6956-413e-a07e-a6aeb3f4877f",
    "external_user_id": "a5c2e8e3-4750-11f0-8000-f4eaacabd632",
    "created_at": "2025-01-02"
  }
]

QueueConfig

The PubSub scraper supports various message queue systems. Currently, GCP Pub/Sub is the primary supported system.

GCP Pub/Sub Configuration
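
The queue settings used throughout this page are summarized in the sketch below; treat it as an illustrative composite of the fields shown in the examples (project, subscription, optional credentials, and maxMessages), not an exhaustive reference:

pubsub:
  - pubsub:
      project_id: my-gcp-project # GCP project that owns the subscription (illustrative value)
      subscription: my-subscription # subscription to pull messages from
      credentials: # optional: service account key loaded from a secret
        valueFrom:
          secretKeyRef:
            name: gcp-credentials
            key: service-account.json
      maxMessages: 100 # optional: controls throughput, as in the examples below
    type: PubItem
    id: $.msg_id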


Use Cases

  • Event-Driven Configuration: React to configuration changes published to message queues
  • Microservices Communication: Track service state changes communicated via pub/sub
  • Alert Processing: Convert alert notifications into configuration changes
  • Real-time Monitoring: Process streaming configuration data from various sources
  • Integration Hub: Consume configuration events from multiple systems through a unified queue

Configuration Examples

GCP Pub/Sub Integration

pubsub-gcp.yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check
spec:
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id
      transform:
        expr: "[config].toJSON()"

Multi-Message Processing

apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-deployment-events
spec:
  pubsub:
    - pubsub:
        project_id: devops-project
        subscription: deployment-events
        credentials:
          valueFrom:
            secretKeyRef:
              name: gcp-credentials
              key: service-account.json
        maxMessages: 50
      type: DeploymentEvent
      id: $.deployment_id
      transform:
        expr: |
          dyn(config).map(msg, {
            "name": msg.service_name + "-" + msg.version,
            "type": "Service::Deployment",
            "config": msg,
            "changes": [{
              "change_type": msg.event_type,
              "external_id": msg.deployment_id,
              "summary": "Deployed " + msg.service_name + " version " + msg.version,
              "severity": msg.event_type == "deployment_failed" ? "high" : "info",
              "created_at": msg.timestamp
            }]
          })

Message Filtering and Processing

apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-config-changes
spec:
  pubsub:
    - pubsub:
        project_id: config-management
        subscription: config-change-notifications
        maxMessages: 200
      type: ConfigurationChange
      id: $.change_id
      transform:
        expr: |
          dyn(config).
            filter(msg, msg.event_type == "configuration_updated").
            map(msg, {
              "name": msg.component_name,
              "type": "Configuration",
              "config": {
                "component": msg.component_name,
                "environment": msg.environment,
                "old_config": msg.previous_config,
                "new_config": msg.current_config
              },
              "changes": [{
                "change_type": "ConfigurationUpdate",
                "external_id": msg.change_id,
                "summary": "Configuration updated for " + msg.component_name,
                "severity": msg.impact_level,
                "created_at": msg.timestamp,
                "diff": msg.config_diff
              }]
            })

Best Practices

  • Message Acknowledgment: Messages are automatically acknowledged after successful processing
  • Error Handling: Failed message processing will be retried based on the Pub/Sub subscription settings
  • Batch Processing: Use maxMessages to control throughput and resource usage
  • Transform Expressions: Use CEL expressions to filter and transform messages into the desired configuration format