
Commit b587f4d

Add Python module to implement Amazon Security Lake integration (#186)
* Migrate from #147

* Update amazon-security-lake integration

  - Improved documentation.
  - Python code has been moved to `wazuh-indexer/integrations/amazon-security-lake/src`.
  - Development environment now uses OpenSearch 2.12.0.
  - The `wazuh.integration.security.lake` container now displays logs, by watching logstash's log file.
  - [**NEEDS FIX**] As a temporary solution, the `INDEXER_USERNAME` and `INDEXER_PASSWORD` values have been added as environment variables to the `wazuh.integration.security.lake` container. These values should be set at Dockerfile level, but that isn't working, probably due to permission denied on invocation of the `setup.sh` script.
  - [**NEEDS FIX**] As a temporary solution, the output file of the `indexer-to-file` pipeline has been moved to `/var/log/logstash/indexer-to-file`. The previous path, `/usr/share/logstash/pipeline/indexer-to-file.json`, results in permission denied.
  - [**NEEDS FIX**] As a temporary solution, the input.opensearch.query has been replaced with `match_all`, as the previous one does not return any data, probably due to the use of time filters (`gt: now-1m`).
  - Standard output enabled for `/usr/share/logstash/pipeline/indexer-to-file.json`.
  - [**NEEDS FIX**] ECS compatibility disabled: `echo "pipeline.ecs_compatibility: disabled" >> /etc/logstash/logstash.yml` -- to be included automatically.
  - Python3 environment path added to the `indexer-to-integrator` pipeline.

* Disable ECS compatibility (auto)

  - Adds `pipeline.ecs_compatibility: disabled` at Dockerfile level.
  - Removes `INDEXER_USERNAME` and `INDEXER_PASSWORD` as environment variables on the `wazuh.integration.security.lake` container.

* Add @timestamp field to sample alerts

* Fix Logstash pipelines

* Add working indexer-to-s3 pipeline

* Add working Python script up to S3 upload

* Add latest changes

* Remove duplicated line
1 parent 0377e44 commit b587f4d

20 files changed (+1225, -1146 lines)

integrations/README.md (+24, -3)

@@ -22,11 +22,32 @@ docker compose -f ./docker/amazon-security-lake.yml up -d
 
 This docker compose project will bring a *wazuh-indexer* node, a *wazuh-dashboard* node,
 a *logstash* node and our event generator. On the one hand, the event generator will push events
-constantly to the indexer. On the other hand, logstash will constantly query for new data and
-deliver it to the integration Python program, also present in that node. Finally, the integration
-module will prepare and send the data to the Amazon Security Lake's S3 bucket.
+constantly to the indexer, on the `wazuh-alerts-4.x-sample` index by default (refer to the [events
+generator](./tools/events-generator/README.md) documentation for customization options).
+On the other hand, logstash will constantly query for new data and deliver it to the integration
+Python program, also present in that node. Finally, the integration module will prepare and send the
+data to the Amazon Security Lake's S3 bucket.
 <!-- TODO continue with S3 credentials setup -->
 
+Attach a terminal to the container and start the integration by starting logstash, as follows:
+
+```console
+/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-integrator.conf --path.settings /etc/logstash
+```
+
+Unprocessed data can be sent to a file or to an S3 bucket.
+```console
+/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-file.conf --path.settings /etc/logstash
+/usr/share/logstash/bin/logstash -f /usr/share/logstash/pipeline/indexer-to-s3.conf --path.settings /etc/logstash
+```
+
+All three pipelines are configured to fetch the latest data from the *wazuh-indexer* every minute. In
+the case of `indexer-to-file`, the data is written at the same pace, whereas with `indexer-to-s3` the data
+is uploaded every 5 minutes.
+
+For development or debugging purposes, you may want to enable hot-reload, testing or debugging on these files,
+by using the `--config.reload.automatic`, `--config.test_and_exit` or `--debug` flags, respectively.
+
 For production usage, follow the instructions in our documentation page about this matter.
 (_when-its-done_)
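
For reference, before starting any of the pipelines it can help to confirm that the events generator is actually writing to the `wazuh-alerts-4.x-sample` index. The snippet below is a minimal sketch (not part of this commit) using the `opensearch-py` client; the host, port and `admin`/`admin` credentials are assumptions about the development compose environment and may need adjusting.

```python
# Minimal sketch: count documents in the sample index to confirm that the
# events generator is feeding the wazuh-indexer.
# Assumptions: indexer reachable on localhost:9200 with the default dev
# credentials (admin/admin) and a self-signed certificate.
from opensearchpy import OpenSearch

client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=("admin", "admin"),
    use_ssl=True,
    verify_certs=False,
)

response = client.count(index="wazuh-alerts-4.x-sample")
print(f"Documents in wazuh-alerts-4.x-sample: {response['count']}")
```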

integrations/amazon-security-lake/Dockerfile (+7, -2)

@@ -17,7 +17,7 @@ RUN pip install -r /app/requirements.txt
 FROM python:3.9
 ENV LOGSTASH_KEYSTORE_PASS="SecretPassword"
 # Add the application source code.
-COPY --chown=logstash:logstash . /home/app
+COPY --chown=logstash:logstash ./src /home/app
 # Add execution persmissions.
 RUN chmod a+x /home/app/run.py
 # Copy the application's dependencies.

@@ -38,4 +38,9 @@ RUN chown --recursive logstash:logstash /usr/share/logstash /etc/logstash /var/l
 USER logstash
 # Copy and run the setup.sh script to create and configure a keystore for Logstash.
 COPY --chown=logstash:logstash logstash/setup.sh /usr/share/logstash/bin/setup.sh
-RUN bash /usr/share/logstash/bin/setup.sh
+RUN bash /usr/share/logstash/bin/setup.sh
+
+# Disable ECS compatibility
+RUN `echo "pipeline.ecs_compatibility: disabled" >> /etc/logstash/logstash.yml`
+
+WORKDIR /home/app

integrations/amazon-security-lake/logstash/pipeline/indexer-to-file.conf (+8, -2)

@@ -15,14 +15,20 @@ input {
        }
      }
    }'
-    target => "_source"
     schedule => "* * * * *"
   }
 }


 output {
+  stdout {
+    id => "output.stdout"
+    codec => json_lines
+  }
   file {
-    path => "/usr/share/logstash/pipeline/indexer-to-file.json"
+    id => "output.file"
+    path => "/var/log/logstash/indexer-to-file-%{+YYYY-MM-dd-HH}.log"
+    file_mode => 0644
+    codec => json_lines
   }
 }
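
Because the `file` output now uses the `json_lines` codec, the `indexer-to-file` pipeline writes one JSON document per line and rotates the output file every hour (`%{+YYYY-MM-dd-HH}`). A minimal sketch (not part of this commit) of how such a file could be inspected; the concrete file name below is hypothetical:

```python
# Minimal sketch: parse the NDJSON written by the indexer-to-file pipeline.
# Each line is an independent JSON document (json_lines codec).
import json

path = "/var/log/logstash/indexer-to-file-2024-03-14-12.log"  # hypothetical hourly file

with open(path) as fd:
    for line in fd:
        event = json.loads(line)
        # Print a couple of fields to confirm events are flowing.
        print(event.get("@timestamp"), event.get("rule", {}).get("description"))
```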

integrations/amazon-security-lake/logstash/pipeline/indexer-to-integrator.conf (+10, -7)

@@ -15,16 +15,19 @@ input {
        }
      }
    }'
-    target => "_source"
     schedule => "* * * * *"
   }
 }

 output {
-  pipe {
-    id => "securityLake"
-    message_format => "%{_source}"
-    ttl => "10"
-    command => "/usr/bin/env python3 /usr/local/bin/run.py -d"
-  }
+  stdout {
+    id => "output.stdout"
+    codec => json_lines
+  }
+  pipe {
+    id => "output.integrator"
+    ttl => "10"
+    command => "/env/bin/python3 /usr/share/logstash/amazon-security-lake/run.py"
+    # command => "/usr/share/logstash/amazon-security-lake/run.py --pushinterval 300 --maxlength 2000 --linebuffer 100 --sleeptime 1 --bucketname securitylake --s3endpoint s3.ninja:9000 --s3profile default"
+  }
 }
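
The `pipe` output hands each event to the Python module on standard input, one JSON document per line. A minimal sketch (not part of this commit) that emulates this locally by feeding the bundled sample event to `run.py`; it assumes the sample file is in the current directory and that the `AWS_*` variables used by the script are exported, otherwise the final S3 upload step will fail.

```python
# Minimal sketch: emulate the Logstash pipe output by piping the sample event
# into run.py, the same way the indexer-to-integrator pipeline does.
import subprocess

with open("wazuh-event.sample.json", "rb") as sample:  # assumed to be in the CWD
    subprocess.run(
        ["/env/bin/python3", "/usr/share/logstash/amazon-security-lake/run.py"],
        stdin=sample,  # events arrive on stdin, one JSON document per line
        check=True,
    )
```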

integrations/amazon-security-lake/logstash/pipeline/indexer-to-s3.conf (+19, -9)

@@ -15,21 +15,31 @@ input {
        }
      }
    }'
-    target => "_source"
-    schedule => "* * * * *"
+    schedule => "5/* * * * *"
   }
 }

 output {
-  stdout { codec => rubydebug }
+  stdout {
+    id => "output.stdout"
+    codec => json_lines
+  }
   s3 {
-    access_key_id => "<aws-access-key>"
-    secret_access_key => "<aws-secret-key>"
-    region => "<your-region>"
+    id => "output.s3"
+    access_key_id => "${AWS_KEY}"
+    secret_access_key => "${AWS_SECRET}"
+    region => "${AWS_REGION}"
+    endpoint => "http://s3.ninja:9000"
+    bucket => "${AWS_BUCKET}"
+    codec => "json"
+    retry_count => 0
+    validate_credentials_on_root_bucket => false
+    prefix => "%{+YYYY}/%{+MM}/%{+dd}"
     server_side_encryption => true
     server_side_encryption_algorithm => "AES256"
-    bucket => "wazuh-indexer-amazon-security-lake-bucket"
-    canned_acl => "bucket-owner-full-control"
-    codec => "json"
+    additional_settings => {
+      "force_path_style" => true
+    }
+    time_file => 5
   }
 }
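
The S3 output now reads its credentials, region and bucket from variables (`${AWS_KEY}`, `${AWS_SECRET}`, `${AWS_REGION}`, `${AWS_BUCKET}`), targets the local s3.ninja endpoint with path-style addressing, and flushes objects every 5 minutes under a `%{+YYYY}/%{+MM}/%{+dd}` prefix. A minimal sketch (not part of this commit) to confirm that objects are landing in the bucket; it reuses the credential environment variables the way `run.py` does:

```python
# Minimal sketch: list the objects Logstash has uploaded to the S3-compatible
# (s3.ninja) bucket. Credentials come from the same environment variables used
# elsewhere in this integration.
import os
import boto3

client = boto3.client(
    service_name="s3",
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    region_name=os.environ["AWS_REGION"],
    endpoint_url="http://s3.ninja:9000",
)

# Objects are written under a YYYY/MM/dd prefix, e.g. "2024/03/14".
response = client.list_objects_v2(Bucket=os.environ["AWS_BUCKET"], Prefix="2024/")
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])
```
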
requirements.txt (+2, -1)

@@ -1,3 +1,4 @@
 pyarrow>=10.0.1
 parquet-tools>=0.2.15
-pydantic==2.6.1
+pydantic==2.6.1
+boto3==1.34.46

integrations/amazon-security-lake/run.py (-26)

This file was deleted.
New file (+1)

@@ -0,0 +1 @@
+import parquet.parquet
New file (+122)

@@ -0,0 +1,122 @@
+#!/env/bin/python3
+# vim: bkc=yes bk wb
+
+import sys
+import os
+import datetime
+import transform
+import pyarrow as pa
+import pyarrow.parquet as pq
+import logging
+import boto3
+from botocore.exceptions import ClientError
+
+# NOTE work in progress
+def upload_file(table, file_name, bucket, object_name=None):
+    """Upload a file to an S3 bucket
+
+    :param table: PyArrow table with events data
+    :param file_name: File to upload
+    :param bucket: Bucket to upload to
+    :param object_name: S3 object name. If not specified then file_name is used
+    :return: True if file was uploaded, else False
+    """
+
+    client = boto3.client(
+        service_name='s3',
+        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
+        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
+        region_name=os.environ['AWS_REGION'],
+        endpoint_url='http://s3.ninja:9000',
+    )
+
+    # If S3 object_name was not specified, use file_name
+    if object_name is None:
+        object_name = os.path.basename(file_name)
+
+    # Upload the file
+    try:
+        client.put_object(Bucket=bucket, Key=file_name, Body=open(file_name, 'rb'))
+    except ClientError as e:
+        logging.error(e)
+        return False
+    return True
+
+
+def main():
+    '''Main function'''
+    # Get current timestamp
+    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
+
+    # Generate filenames
+    filename_raw = f"/tmp/integrator-raw-{timestamp}.json"
+    filename_ocsf = f"/tmp/integrator-ocsf-{timestamp}.json"
+    filename_parquet = f"/tmp/integrator-ocsf-{timestamp}.parquet"
+
+    # 1. Extract data
+    # ================
+    raw_data = []
+    for line in sys.stdin:
+        raw_data.append(line)
+
+        # Echo piped data
+        with open(filename_raw, "a") as fd:
+            fd.write(line)
+
+    # 2. Transform data
+    # ================
+    # a. Transform to OCSF
+    ocsf_data = []
+    for line in raw_data:
+        try:
+            event = transform.converter.from_json(line)
+            ocsf_event = transform.converter.to_detection_finding(event)
+            ocsf_data.append(ocsf_event.model_dump())
+
+            # Temporal disk storage
+            with open(filename_ocsf, "a") as fd:
+                fd.write(str(ocsf_event) + "\n")
+        except AttributeError as e:
+            print("Error transforming line to OCSF")
+            print(event)
+            print(e)
+
+    # b. Encode as Parquet
+    try:
+        table = pa.Table.from_pylist(ocsf_data)
+        pq.write_table(table, filename_parquet)
+    except AttributeError as e:
+        print("Error encoding data to parquet")
+        print(e)
+
+    # 3. Load data (upload to S3)
+    # ================
+    if upload_file(table, filename_parquet, os.environ['AWS_BUCKET']):
+        # Remove /tmp files
+        pass
+
+
+def _test():
+    ocsf_event = {}
+    with open("./wazuh-event.sample.json", "r") as fd:
+        # Load from file descriptor
+        for raw_event in fd:
+            try:
+                event = transform.converter.from_json(raw_event)
+                print("")
+                print("-- Wazuh event Pydantic model")
+                print("")
+                print(event.model_dump())
+                ocsf_event = transform.converter.to_detection_finding(event)
+                print("")
+                print("-- Converted to OCSF")
+                print("")
+                print(ocsf_event.model_dump())
+
+            except KeyError as e:
+                raise (e)
+
+
+if __name__ == '__main__':
+    main()
+    # _test()
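
The script stages its intermediate artifacts in `/tmp` before uploading: the raw input, the OCSF-mapped events, and the Parquet file. A minimal sketch (not part of this commit) to read one of those Parquet files back and check the OCSF encoding; the timestamped file name is hypothetical:

```python
# Minimal sketch: read back a Parquet file produced by run.py to verify the
# OCSF-encoded events before they are uploaded to S3.
import pyarrow.parquet as pq

table = pq.read_table("/tmp/integrator-ocsf-2024-03-14T12:57:05+00:00.parquet")  # hypothetical name
print(table.num_rows)
print(table.schema)
```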

integrations/amazon-security-lake/transform/converter.py → integrations/amazon-security-lake/src/transform/converter.py (+10, -3)

@@ -90,9 +90,16 @@ def to_detection_finding(event: models.wazuh.Event) -> models.ocsf.DetectionFind
     )


-def from_json(event: dict) -> models.wazuh.Event:
+# def from_json(event: dict) -> models.wazuh.Event:
+#     # Needs to a string, bytes or bytearray
+#     try:
+#         return models.wazuh.Event.model_validate_json(json.dumps(event))
+#     except pydantic.ValidationError as e:
+#         print(e)
+
+def from_json(event: str) -> models.wazuh.Event:
     # Needs to a string, bytes or bytearray
     try:
-        return models.wazuh.Event.model_validate_json(json.dumps(event))
+        return models.wazuh.Event.model_validate_json(event)
     except pydantic.ValidationError as e:
-        print(e)
+        print(e)
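
With this change, `from_json` validates the raw JSON string coming from Logstash directly, instead of round-tripping a dict through `json.dumps`. A minimal usage sketch (not part of this commit), mirroring what `run.py`'s `_test()` does with the bundled sample event:

```python
# Minimal sketch: feed a raw JSON line (as produced by the json_lines codec)
# straight into the updated from_json().
import transform

with open("wazuh-event.sample.json") as fd:
    raw_line = fd.readline()

event = transform.converter.from_json(raw_line)  # pydantic model, or None on validation error
finding = transform.converter.to_detection_finding(event)
print(finding.model_dump())
```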

integrations/amazon-security-lake/transform/models/wazuh.py → integrations/amazon-security-lake/src/transform/models/wazuh.py (+10, -10)

@@ -6,27 +6,27 @@


 class Mitre(pydantic.BaseModel):
-    technique: typing.List[str] = []
-    id: typing.List[str] = ""
-    tactic: typing.List[str] = []
+    technique: typing.List[str] = ["N/A"]
+    id: typing.List[str] = ["N/A"]
+    tactic: typing.List[str] = ["N/A"]


 class Rule(pydantic.BaseModel):
     firedtimes: int = 0
-    description: str = ""
+    description: str = "N/A"
     groups: typing.List[str] = []
-    id: str = ""
+    id: str = "N/A"
     mitre: Mitre = Mitre()
     level: int = 0
     nist_800_53: typing.List[str] = []


 class Decoder(pydantic.BaseModel):
-    name: str
+    name: str = "N/A"


 class Input(pydantic.BaseModel):
-    type: str
+    type: str = "N/A"


 class Agent(pydantic.BaseModel):

@@ -39,9 +39,9 @@ class Manager(pydantic.BaseModel):


 class Event(pydantic.BaseModel):
-    rule: Rule = {}
-    decoder: Decoder = {}
-    input: Input = {}
+    rule: Rule = Rule()
+    decoder: Decoder = Decoder()
+    input: Input = Input()
     id: str = ""
     full_log: str = ""
     agent: Agent = {}
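
The model defaults now instantiate the nested models (and fall back to `"N/A"` values), so partially populated alerts still validate and always expose the nested attributes. A minimal sketch (not part of this commit) of the behaviour this enables, assuming the models are importable as `transform.models.wazuh` and using the bundled sample event:

```python
# Minimal sketch: sections that are empty or absent in the source document now
# fall back to their "N/A" defaults instead of failing validation.
from transform.models import wazuh

with open("wazuh-event.sample.json") as fd:
    event = wazuh.Event.model_validate_json(fd.read())

print(event.rule.description)  # "Audit: Command: /usr/sbin/ssh"
print(event.decoder.name)      # "N/A" (the sample event has an empty decoder)
print(event.input.type)        # "N/A" (the sample event has no input section)
```
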
New file (+1)

@@ -0,0 +1 @@
+{"cluster":{"name":"wazuh-cluster","node":"wazuh-manager"},"agent":{"id":"003","ip":"10.0.0.180","name":"ip-10-0-0-180.us-west-1.compute.internal"},"@timestamp":"2024-03-14T12:57:05.730Z","data":{"audit":{"exe":"/usr/sbin/sshd","type":"NORMAL","cwd":"/home/wazuh","file":{"name":"/var/sample"},"success":"yes","command":"ssh"}},"@version":"1","manager":{"name":"wazuh-manager"},"location":"","decoder":{},"id":"1580123327.49031","predecoder":{},"timestamp":"2024-03-14T12:57:05.730+0000","rule":{"description":"Audit: Command: /usr/sbin/ssh","firedtimes":3,"level":3,"id":"80791","mail":false,"groups":["audit","audit_command"],"gdpr":["IV_30.1.g"]}}
