Skip to content

Commit ef0ee23

Browse files
committed
Add Prometheus instrumentation for ingest-file workers
1 parent 14118ec commit ef0ee23

File tree

3 files changed

+56 -4 lines changed

ingestors/manager.py

+46-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import magic
22
import logging
3+
from timeit import default_timer
34
from tempfile import mkdtemp
45
from datetime import datetime
56
from pkg_resources import get_distribution
@@ -15,6 +16,7 @@
1516
from sentry_sdk import capture_exception
1617
from followthemoney.helpers import entity_filename
1718
from followthemoney.namespace import Namespace
19+
from prometheus_client import Counter, Histogram
1820

1921
from ingestors.directory import DirectoryIngestor
2022
from ingestors.exc import ProcessingException
@@ -23,6 +25,27 @@
2325

2426
log = logging.getLogger(__name__)
2527

28+
# Prometheus metrics for ingestion. Each metric declares a single
# "ingestor" label so values can be broken down per ingestor.

# Number of ingestions that completed successfully.
INGEST_SUCCEEDED = Counter(
    name="ingest_succeeded_total",
    documentation="Successful ingestions",
    labelnames=["ingestor"],
)

# Number of ingestions that failed.
INGEST_FAILED = Counter(
    name="ingest_failed_total",
    documentation="Failed ingestions",
    labelnames=["ingestor"],
)

# Distribution of ingest durations, observed in seconds.
INGEST_DURATION = Histogram(
    name="ingest_duration_seconds",
    documentation="Ingest duration by ingestor",
    labelnames=["ingestor"],
)

# Running total of ingested bytes.
INGEST_INGESTED_BYTES = Counter(
    name="ingest_ingested_bytes_total",
    documentation="Total number of bytes ingested",
    labelnames=["ingestor"],
)
48+
2649

2750
class Manager(object):
2851
"""Handles the lifecycle of an ingestor. This can be subclassed to embed it
@@ -138,8 +161,10 @@ def ingest_entity(self, entity):
138161
def ingest(self, file_path, entity, **kwargs):
139162
"""Main execution step of an ingestor."""
140163
file_path = ensure_path(file_path)
164+
file_size = None
141165
if file_path.is_file() and not entity.has("fileSize"):
142-
entity.add("fileSize", file_path.stat().st_size)
166+
file_size = file_path.stat().st_size # size in bytes
167+
entity.add("fileSize", file_size)
143168

144169
now = datetime.now()
145170
now_string = now.strftime("%Y-%m-%dT%H:%M:%S.%f")
@@ -148,14 +173,32 @@ def ingest(self, file_path, entity, **kwargs):
148173
entity.set("processingAgent", get_distribution("ingest").version)
149174
entity.set("processedAt", now_string)
150175

176+
ingestor_class = None
177+
ingestor_name = None
178+
151179
try:
152180
ingestor_class = self.auction(file_path, entity)
153-
log.info("Ingestor [%r]: %s", entity, ingestor_class.__name__)
181+
ingestor_name = ingestor_class.__name__
182+
log.info("Ingestor [%r]: %s", entity, ingestor_name)
183+
184+
start_time = default_timer()
154185
self.delegate(ingestor_class, file_path, entity)
186+
duration = max(0, default_timer() - start_time)
187+
188+
INGEST_SUCCEEDED.labels(ingestor_name).inc()
189+
INGEST_DURATION.labels(ingestor_name).observe(duration)
190+
INGEST_INGESTED_BYTES.labels(ingestor_name).inc(file_size)
191+
155192
entity.set("processingStatus", self.STATUS_SUCCESS)
156193
except ProcessingException as pexc:
157-
entity.set("processingError", stringify(pexc))
158194
log.exception("[%r] Failed to process: %s", entity, pexc)
195+
196+
if ingestor_name:
197+
INGEST_FAILED.labels(ingestor_name).inc()
198+
else:
199+
INGEST_FAILED.labels(None).inc()
200+
201+
entity.set("processingError", stringify(pexc))
159202
capture_exception(pexc)
160203
finally:
161204
self.finalize(entity)

ingestors/support/convert.py

+9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import subprocess
55

66
from followthemoney.helpers import entity_filename
7+
from prometheus_client import Counter
78

89
from ingestors.support.cache import CacheSupport
910
from ingestors.support.temp import TempFileSupport
@@ -14,6 +15,12 @@
1415
TIMEOUT = 3600 # seconds
1516
CONVERT_RETRIES = 5
1617

18+
# Counts lookups against the PDF cache; the "status" label distinguishes the
# outcome of each lookup.
# The name carries an explicit "_total" suffix to match the other ingest
# counters. NOTE(review): prometheus_client is expected to normalise counter
# names so the exported series name stays "ingest_pdf_cache_accessed_total"
# either way; confirm against the installed client version.
INGEST_PDF_CACHE_ACCESSED = Counter(
    "ingest_pdf_cache_accessed_total",
    "Number of times the PDF cache has been accessed, by cache status",
    ["status"],
)
23+
1724

1825
class DocumentConvertSupport(CacheSupport, TempFileSupport):
1926
"""Provides helpers for UNO document conversion."""
@@ -25,10 +32,12 @@ def document_to_pdf(self, unique_tmpdir, file_path, entity):
2532
file_name = entity_filename(entity, extension="pdf")
2633
path = self.manager.load(pdf_hash, file_name=file_name)
2734
if path is not None:
35+
INGEST_PDF_CACHE_ACCESSED.labels("hit").inc()
2836
log.info("Using PDF cache: %s", file_name)
2937
entity.set("pdfHash", pdf_hash)
3038
return path
3139

40+
INGEST_PDF_CACHE_ACCESSED.labels("miss").inc()
3241
pdf_file = self._document_to_pdf(unique_tmpdir, file_path, entity)
3342
if pdf_file is not None:
3443
content_hash = self.manager.store(pdf_file)

requirements.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ normality==2.4.0
33
pantomime==0.6.1
44
followthemoney==3.5.2
55
followthemoney-store[postgresql]==3.0.6
6-
servicelayer[google,amazon]==1.21.2
6+
servicelayer[google,amazon]==1.22.0
77
languagecodes==1.1.1
88
countrytagger==0.1.2
99
pyicu==2.11

0 commit comments

Comments
 (0)