PubSub
The PubSub scraper subscribes to message queues and pub/sub systems to consume messages and create configuration items from them. This enables real-time configuration tracking based on events and messages published to various messaging systems.
pubsub-scraper.yaml

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check
spec:
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id
      transform:
        expr: "[config].toJSON()"
```
| Field | Description | Scheme | Required |
|---|---|---|---|
| `schedule` | Specify the interval to scrape in cron format. Defaults to every 60 minutes. | Cron | |
| `retention` | Settings for retaining changes, analysis and scraped items. | Retention | |
| `pubsub` | Specifies the list of PubSub configurations to scrape. | `[]PubSub` | `true` |
Mapping
Custom scrapers require you to define the `id` and `type` for each scraped item. For example, when you scrape a file containing a JSON array, where each array element represents a config item, you must specify the `id` and `type` for those items. You can achieve this by using mappings in your custom scraper configuration.
| Field | Description | Scheme |
|---|---|---|
| `id`* | A static value or JSONPath expression to use as the ID for the resource. | |
| `name`* | A static value or JSONPath expression to use as the name for the resource. | |
| `type`* | A static value or JSONPath expression to use as the type for the resource. | |
| `class` | A static value or JSONPath expression to use as the class for the resource. | |
| `createFields` | A list of JSONPath expressions used to identify the created time of the config. If multiple fields are specified, the first non-empty value is used. | `[]jsonpath` |
| `deleteFields` | A list of JSONPath expressions used to identify the deleted time of the config. If multiple fields are specified, the first non-empty value is used. | `[]jsonpath` |
| `description` | A static value or JSONPath expression to use as the description for the resource. | |
| `format` | Format of the config item. Defaults to `JSON`; available options are `JSON` and `properties`. See Formats. | |
| `health` | A static value or JSONPath expression to use as the health of the config item. | |
| `items` | A JSONPath expression used to extract individual items from the resource. Items are extracted first, and then the ID, name, type, and transformations are applied for each item. | |
| `status` | A static value or JSONPath expression to use as the status of the config item. | |
| `timestampFormat` | A Go time format string used to parse timestamps in `createFields` and `deleteFields`. Defaults to RFC3339. | |
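As a sketch of how these mapping fields combine (the field names and file path below are hypothetical, chosen only for illustration), a file scraper that extracts items from a JSON array and parses custom timestamps might look like:

```yaml
file:
  - type: Device                      # static type assigned to every extracted item
    id: $.device_id                   # JSONPath: unique ID per item
    name: $.hostname                  # JSONPath: display name per item
    items: $.devices                  # extract each element of the "devices" array first
    createFields:
      - $.first_seen                  # the first non-empty value is used
      - $.registered_at
    deleteFields:
      - $.decommissioned_at
    timestampFormat: "2006-01-02 15:04:05"  # Go time layout for the fields above
    paths:
      - fixtures/data/devices.json    # hypothetical path
```

Because `items` is applied first, the `id`, `name`, and timestamp expressions are evaluated against each array element rather than the file as a whole.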
Formats
JSON
The scraper stores config items as `jsonb` fields in PostgreSQL.
Resource providers typically return JSON, e.g. `kubectl get -o json` or `aws --output=json`.
When you display the config, the UI automatically converts the JSON data to YAML for improved readability.
XML / Properties
The scraper stores non-JSON files as JSON using:

```json
{ "format": "xml", "content": "<root>..</root>" }
```

You can still access non-JSON content in scripts using `config.content`.
The UI formats and renders XML appropriately.
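For example, a transform could inspect the raw file body via `config.content`. This is only a sketch, assuming the transform's CEL environment exposes the scraped item as `config` (as in the `[config].toJSON()` expression used elsewhere in this document); the file path is hypothetical:

```yaml
file:
  - type: LegacyConfig                 # hypothetical type
    paths:
      - fixtures/data/server.xml       # hypothetical non-JSON file
    transform:
      expr: |
        // config.content holds the raw body for non-JSON formats
        config.content.contains("<root>") ? [config].toJSON() : "[]"
```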
Extracting Changes & Access Logs
Custom scrapers ingest changes and access logs from external systems when you enable the `full` option.
Every config is expected to have these 3 top-level fields:

- `config`
- `changes`
- `access_logs`

A config may contain additional fields, or omit some of these; only these three fields are extracted.
Consider a file that contains the following JSON data:
```json
{
  "reg_no": "A123",
  "config": {
    "meta": "this is the actual config that'll be stored."
  },
  "changes": [
    {
      "action": "drive",
      "summary": "car color changed to blue",
      "unrelated_stuff": 123
    }
  ],
  "access_logs": [
    {
      "config_id": "99024949-9118-4dcb-a3a0-b8f1536bebd0",
      "external_user_id": "a3542241-4750-11f0-8000-e0146ce375e6",
      "created_at": "2025-01-01"
    },
    {
      "config_id": "9d9e51a7-6956-413e-a07e-a6aeb3f4877f",
      "external_user_id": "a5c2e8e3-4750-11f0-8000-f4eaacabd632",
      "created_at": "2025-01-02"
    }
  ]
}
```
A regular scraper saves the entire JSON as a config.
With the `full` option, however, the scraper extracts the config, changes, and access logs separately.
```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: file-scraper
spec:
  full: true
  file:
    - type: Car
      id: $.reg_no
      paths:
        - fixtures/data/car_changes.json
```
The resulting config is:
```json
{
  "meta": "this is the actual config that'll be stored."
}
```
and the scraper records the following new config change on that config:
```json
{
  "action": "drive",
  "summary": "car color changed to blue",
  "unrelated_stuff": 123
}
```
and the scraper saves the following access logs:
```json
[
  {
    "config_id": "99024949-9118-4dcb-a3a0-b8f1536bebd0",
    "external_user_id": "a3542241-4750-11f0-8000-e0146ce375e6",
    "created_at": "2025-01-01"
  },
  {
    "config_id": "9d9e51a7-6956-413e-a07e-a6aeb3f4877f",
    "external_user_id": "a5c2e8e3-4750-11f0-8000-f4eaacabd632",
    "created_at": "2025-01-02"
  }
]
```
QueueConfig
The PubSub scraper supports various message queue systems. Currently, GCP Pub/Sub is the primary supported system.
GCP Pub/Sub Configuration
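The fields below appear in the examples in this document; the descriptions are inferred from those examples and may not be exhaustive: `project_id` (the GCP project that owns the subscription), `subscription` (the Pub/Sub subscription to pull messages from), `credentials` (service-account credentials, e.g. referenced from a Kubernetes secret), and `maxMessages` (an upper bound on messages pulled per scrape). A minimal sketch:

```yaml
pubsub:
  - pubsub:
      project_id: my-gcp-project      # assumption: your GCP project ID
      subscription: my-subscription   # assumption: an existing pull subscription
    type: QueueMessage                # static type assigned to each message
    id: $.msg_id                      # JSONPath into the message payload
```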
Use Cases
- Event-Driven Configuration: React to configuration changes published to message queues
- Microservices Communication: Track service state changes communicated via pub/sub
- Alert Processing: Convert alert notifications into configuration changes
- Real-time Monitoring: Process streaming configuration data from various sources
- Integration Hub: Consume configuration events from multiple systems through a unified queue
Configuration Examples
GCP Pub/Sub Integration
pubsub-gcp.yaml

```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-check
spec:
  pubsub:
    - pubsub:
        project_id: flanksource-sandbox
        subscription: incident-alerts-sub
      type: PubItem
      id: $.msg_id
      transform:
        expr: "[config].toJSON()"
```
Multi-Message Processing
```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-deployment-events
spec:
  pubsub:
    - pubsub:
        project_id: devops-project
        subscription: deployment-events
        credentials:
          valueFrom:
            secretKeyRef:
              name: gcp-credentials
              key: service-account.json
        maxMessages: 50
      type: DeploymentEvent
      id: $.deployment_id
      transform:
        expr: |
          dyn(config).map(msg, {
            "name": msg.service_name + "-" + msg.version,
            "type": "Service::Deployment",
            "config": msg,
            "changes": [{
              "change_type": msg.event_type,
              "external_id": msg.deployment_id,
              "summary": "Deployed " + msg.service_name + " version " + msg.version,
              "severity": msg.event_type == "deployment_failed" ? "high" : "info",
              "created_at": msg.timestamp
            }]
          })
```
Message Filtering and Processing
```yaml
apiVersion: configs.flanksource.com/v1
kind: ScrapeConfig
metadata:
  name: pubsub-config-changes
spec:
  pubsub:
    - pubsub:
        project_id: config-management
        subscription: config-change-notifications
        maxMessages: 200
      type: ConfigurationChange
      id: $.change_id
      transform:
        expr: |
          dyn(config).
            filter(msg, msg.event_type == "configuration_updated").
            map(msg, {
              "name": msg.component_name,
              "type": "Configuration",
              "config": {
                "component": msg.component_name,
                "environment": msg.environment,
                "old_config": msg.previous_config,
                "new_config": msg.current_config
              },
              "changes": [{
                "change_type": "ConfigurationUpdate",
                "external_id": msg.change_id,
                "summary": "Configuration updated for " + msg.component_name,
                "severity": msg.impact_level,
                "created_at": msg.timestamp,
                "diff": msg.config_diff
              }]
            })
```
Best Practices
- Message Acknowledgment: Messages are automatically acknowledged after successful processing
- Error Handling: Failed message processing is retried based on the Pub/Sub subscription settings
- Batch Processing: Use `maxMessages` to control throughput and resource usage
- Transform Expressions: Use CEL expressions to filter and transform messages into the desired configuration format