#!/usr/bin/env python3
# vim: bkc=yes bk wb
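"""Prototype stdin-to-S3 pipeline (work in progress).

Reads Wazuh alert JSON from stdin, converts each event to an OCSF detection
finding via the local `transform` module, encodes the batch as Parquet and
uploads the result to an S3-compatible bucket.
"""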
import sys
import os
import datetime
import logging

import boto3
from botocore.exceptions import ClientError
import pyarrow as pa
import pyarrow.parquet as pq

import transform

# NOTE work in progress
def upload_file(table, file_name, bucket, object_name=None):
    """Upload a file to an S3 bucket

    :param table: PyArrow table with events data (currently unused)
    :param file_name: File to upload
    :param bucket: Bucket to upload to
    :param object_name: S3 object name. If not specified then file_name is used
    :return: True if file was uploaded, else False
    """
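    # Credentials and region are read from the environment (AWS_ACCESS_KEY_ID,
    # AWS_SECRET_ACCESS_KEY, AWS_REGION); the endpoint is hard-coded to what
    # appears to be a local S3 Ninja test instance.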

    client = boto3.client(
        service_name='s3',
        aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
        aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
        region_name=os.environ['AWS_REGION'],
        endpoint_url='http://s3.ninja:9000',
    )

    # If S3 object_name was not specified, use file_name
    if object_name is None:
        object_name = os.path.basename(file_name)

    # Upload the file
    try:
        # Use object_name as the destination key and close the file handle deterministically
        with open(file_name, 'rb') as data:
            client.put_object(Bucket=bucket, Key=object_name, Body=data)
    except ClientError as e:
        logging.error(e)
        return False
    return True


def main():
    '''Extract events from stdin, transform them to OCSF, encode as Parquet and load into S3'''
    # Get current timestamp
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

    # Generate filenames
    filename_raw = f"/tmp/integrator-raw-{timestamp}.json"
    filename_ocsf = f"/tmp/integrator-ocsf-{timestamp}.json"
    filename_parquet = f"/tmp/integrator-ocsf-{timestamp}.parquet"
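    # NOTE: isoformat() keeps colons and the UTC offset in the timestamp, so the
    # generated /tmp paths contain ':' characters.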

    # 1. Extract data
    # ================
    raw_data = []
    with open(filename_raw, "a") as fd:
        for line in sys.stdin:
            raw_data.append(line)
            # Echo piped data so every line is persisted, not just the last one
            fd.write(line)

    # 2. Transform data
    # ================
    # a. Transform to OCSF
    ocsf_data = []
    for line in raw_data:
        try:
            event = transform.converter.from_json(line)
            ocsf_event = transform.converter.to_detection_finding(event)
            ocsf_data.append(ocsf_event.model_dump())

            # Temporal disk storage
            with open(filename_ocsf, "a") as fd:
                fd.write(str(ocsf_event) + "\n")
        except AttributeError as e:
            # Print the raw line: `event` may not be bound if from_json() failed
            print("Error transforming line to OCSF")
            print(line)
            print(e)

    # b. Encode as Parquet
    # NOTE: from_pylist() infers the Arrow schema from the event dicts
    table = None
    try:
        table = pa.Table.from_pylist(ocsf_data)
        pq.write_table(table, filename_parquet)
    except AttributeError as e:
        print("Error encoding data to parquet")
        print(e)

    # 3. Load data (upload to S3)
    # ================
    if table is not None and upload_file(table, filename_parquet, os.environ['AWS_BUCKET']):
        # Remove /tmp files
        pass
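        # Possible cleanup once the upload path settles (sketch only, not enabled):
        #     for path in (filename_raw, filename_ocsf, filename_parquet):
        #         os.remove(path)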


def _test():
    ocsf_event = {}
    with open("./wazuh-event.sample.json", "r") as fd:
        # Load from file descriptor
        for raw_event in fd:
            try:
                event = transform.converter.from_json(raw_event)
                print("")
                print("-- Wazuh event Pydantic model")
                print("")
                print(event.model_dump())
                ocsf_event = transform.converter.to_detection_finding(event)
                print("")
                print("-- Converted to OCSF")
                print("")
                print(ocsf_event.model_dump())

            except KeyError as e:
                raise e


if __name__ == '__main__':
    main()
    # _test()
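
# Example invocation (the script name "integrator.py" is illustrative):
#     cat wazuh-event.sample.json | python3 integrator.py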