-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathrelease_monitor.py
More file actions
261 lines (205 loc) · 8.6 KB
/
release_monitor.py
File metadata and controls
261 lines (205 loc) · 8.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
#!/usr/bin/env python3
"""The release monitor project."""
# std imports:
import logging
import os
import sys
from time import sleep
from abc import ABC, abstractmethod
from typing import Set, Union
# Set and Union are only used in type comments; reference them so CI linters do not flag the imports as unused.
assert Set
assert Union
# 3rd party imports:
import feedparser
import requests
from f8a_worker.setup_celery import init_celery, init_selinon
from f8a_worker.utils import normalize_package_name
from selinon import run_flow
import sentry_sdk
# local imports:
from release_monitor.defaults import NPM_URL, PYPI_URL, ENABLE_SCHEDULING, SLEEP_INTERVAL
def set_up_logger():
    # type: () -> logging.Logger
    """Set up logging.

    Configure the 'release_monitor' logger to write to stdout, using the level named by
    the LOGLEVEL environment variable (INFO when the variable is not set).

    The function is idempotent: calling it again refreshes the level but does not attach
    a second stdout handler, so log records are never emitted twice.

    :return: the configured 'release_monitor' logger
    :raises ValueError: if LOGLEVEL does not name a known logging level
    """
    handler_name = 'release_monitor_stdout'
    loglevel = os.environ.get('LOGLEVEL', 'INFO').upper()

    logger = logging.getLogger('release_monitor')
    logger.setLevel(loglevel)

    # logging.getLogger() hands back the same object for a given name, so a handler added
    # by a previous call is still attached; only add ours when it is not there yet.
    if all(existing.get_name() != handler_name for existing in logger.handlers):
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler = logging.StreamHandler(sys.stdout)
        handler.set_name(handler_name)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger
# Module-wide logger, configured once when the module is imported.
logger = set_up_logger()
# Initialise the Sentry SDK; the DSN is read from the SENTRY_DSN environment variable
# and is None when that variable is not set.
sentry_sdk.init(os.getenv("SENTRY_DSN"))
class Package:
    """Encapsulate name and version.

    Instances compare and hash by their (name, version) pair, which makes them usable as
    set members and in set arithmetic.
    """

    def __init__(self, name, version):
        # type: (str, str) -> None
        """Create new package.

        :param name: package name
        :param version: package version
        """
        self.name = name
        self.version = version

    def __eq__(self, other):
        """Test equality element by element.

        :param other: object to compare with
        :return: True/False for another Package; NotImplemented for any other object, so
                 that ``package == something_else`` evaluates to False instead of raising
                 AttributeError
        """
        if not isinstance(other, Package):
            return NotImplemented
        return (self.name, self.version) == (other.name, other.version)

    def __hash__(self):
        """For usage in sets; consistent with __eq__."""
        return hash((self.name, self.version))

    def __repr__(self):
        """Return a readable representation for logs and debugging."""
        return '{cls}({name!r}, {version!r})'.format(cls=type(self).__name__,
                                                     name=self.name,
                                                     version=self.version)
class AbstractMonitor(ABC):
    """
    Abstract monitoring component for any feed (XML, JSON ...).

    This class implements common logic, like set comparison. Implement your own child class
    for each specific feed.
    """

    def __init__(self):
        # type: () -> None
        """
        Create new monitor with sets of old and new packages.

        Both sets start out as the current content of the feed, so the packages already
        present in the feed are not reported by the first call to get_updated_packages().

        Don't immediately schedule all packages in the feed.
        TODO: decide on this ^
        """
        self.old_set = self.fetch_feed()  # type: Set[Package]
        self.new_set = self.old_set  # type: Set[Package]

    @abstractmethod
    def fetch_feed(self):
        # type: () -> Set[Package]
        """
        Implement your feed update logic here (e.g. load new XML file from the Internet).

        Return the new feed as a set of packages.
        """
        pass

    def get_updated_packages(self):
        # type: () -> Set[Package]
        """Run this function in an infinite loop to get incremental updates for your feed.

        :return: packages present in the freshly fetched feed but not in the previous
                 snapshot; an empty set when nothing new was fetched
        """
        self.old_set = self.new_set
        fetched = self.fetch_feed()
        # fetch_feed() may come back empty when the feed could not be read. Replacing the
        # snapshot with an empty set would make the next successful fetch report every
        # package in the feed as new, so keep the last non-empty snapshot as the baseline.
        if fetched:
            self.new_set = fetched
        return self.new_set - self.old_set
class PypiMonitor(AbstractMonitor):
    """Monitor Python Package Index."""

    def __init__(self, url=None):
        """Store some PyPi specific data.

        :param url: base URL of the index; PYPI_URL is used when it is not given.
                    "rss/updates.xml" is appended to it verbatim when fetching.
        """
        self.pypi_url = url or PYPI_URL
        super(PypiMonitor, self).__init__()

    def fetch_feed(self):
        # type: () -> Set[Package]
        """Fetch PyPi RSS updates.

        Each feed entry title is split on single spaces; the first part is taken as the
        package name and the second as the version. Entries without a title, or whose
        title has fewer than two parts, are logged and skipped so that one malformed
        entry does not discard the rest of the feed.

        :return: set of packages found in the updates feed (possibly empty)
        """
        entries = feedparser.parse(self.pypi_url + "rss/updates.xml").entries

        updated_packages = set()
        for entry in entries:
            try:
                title_parts = entry['title'].split(' ')
                updated_packages.add(Package(title_parts[0], title_parts[1]))
            except KeyError as e:
                # the "title" does not exist in this entry
                logger.error('Titles does not exist : %s', e)
            except IndexError as e:
                # the "title" does not contain both name and version
                logger.error('name and version does not exist : %s', e)
        return updated_packages
class NPMMonitor(AbstractMonitor):
    """Monitor for the NPM package registry."""

    # Upper bound (in seconds) for a single registry request. Without a timeout a stalled
    # connection would block the whole monitoring loop indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, url=None):
        """Store some NPM specific data.

        :param url: full URL of the RSS feed; NPM_URL + "-/rss" is used when it is not given
        """
        self.npm_url = url or NPM_URL + "-/rss"
        super(NPMMonitor, self).__init__()

    def fetch_pkg_names_from_feed(self):
        # type: () -> Union[Set[str], None]
        """Contact NPM repository and get a list of updated packages.

        :return: set of entry titles from the feed, or None when an entry has no title
        """
        npm_feed = feedparser.parse(self.npm_url)
        try:
            return {entry['title'] for entry in npm_feed.entries}
        except KeyError as e:
            logger.error('npm package key not found : %s', e)
            return None

    @staticmethod
    def fetch_latest_package_version(package):
        # type: (str) -> Union[str, None]
        """Contact NPM repository and get the latest version for the package.

        Note that the request always goes to NPM_URL, independent of the feed URL passed
        to the constructor.

        :param package: name of the package to look up
        :return: value of the "latest" dist-tag, or None when the request fails, the
                 response status is not 200, the body is not valid JSON or the body has
                 no "latest" key
        """
        package_url = NPM_URL + "-/package/{}/dist-tags".format(package)
        try:
            response = requests.get(package_url,
                                    headers={'content-type': 'application/json'},
                                    timeout=NPMMonitor.REQUEST_TIMEOUT)
        except requests.RequestException as e:
            # Connection problems and timeouts: treat like any other lookup failure
            # instead of letting them terminate the monitoring loop.
            logger.error('request to NPM registry failed : %s', e)
            return None

        if response.status_code != 200:
            return None

        try:
            return response.json()['latest']
        except ValueError as e:
            # The body was not a valid JSON
            logger.error('valid JSON key not found : %s', e)
            return None
        except KeyError as e:
            # The body was a valid JSON, but it did not contain version field
            logger.error('version key not found : %s', e)
            return None

    def fetch_feed(self):
        # type: () -> Set[Package]
        """
        Fetch the NPM feed.

        This one is a bit more tricky as the feed itself does not contain version number. So there
        are multiple possibilities how to solve this:

        a) don't care, fetch the feed, for each entry do additional HTTP request and get the newest
           version as well. (motto: premature optimization is the root of all evil)
        b) create a local cache and try it before performing the request itself
        c) reimplement the logic from abstract base class and just calculate set(pkg_names) -
           - set(old_names)

        I'll go with the first one for now.

        :return: set of packages whose latest version could be determined; empty when the
                 feed itself could not be processed
        """
        pkg_names = self.fetch_pkg_names_from_feed()
        if pkg_names is None:
            return set()

        packages = set()
        for pkg_name in pkg_names:
            version = NPMMonitor.fetch_latest_package_version(pkg_name)
            # Packages without a resolvable version are left out of the result.
            if version is not None:
                packages.add(Package(pkg_name, version))
        return packages
class ReleaseMonitor():
    """Class which check rss feeds for new releases."""

    def __init__(self):
        """Create the PyPI and NPM monitors and, when scheduling is enabled, set up Selinon."""
        logger.info("Starting the monitor service")

        # Create PyPi monitor
        self.pypi_monitor = PypiMonitor()

        # Create NPM monitor
        self.npm_monitor = NPMMonitor()

        # Initialize Selinon if we want to run in production
        if ENABLE_SCHEDULING:
            init_celery(result_backend=False)
            init_selinon()

    def run_package_analysis(self, name, ecosystem, version):
        """Run Selinon flow for analyses.

        :param name: name of the package to analyse
        :param ecosystem: package ecosystem
        :param version: package version
        :return: dispatcher ID serving flow
        """
        node_args = {
            'ecosystem': ecosystem,
            'name': normalize_package_name(ecosystem, name),
            'version': version,
            'force': True,
            'recursive_limit': 0
        }

        logger.info("Scheduling Selinon flow '%s' "
                    "with node_args: '%s'", 'bayesianFlow', node_args)
        return run_flow('bayesianFlow', node_args)

    def _process_updates(self, monitor, ecosystem, label):
        """Log every updated package of one monitor and schedule it when scheduling is enabled.

        :param monitor: monitor whose get_updated_packages() is polled
        :param ecosystem: ecosystem identifier passed on to run_package_analysis()
        :param label: human readable registry name used in the log messages
        """
        for pkg in monitor.get_updated_packages():
            if ENABLE_SCHEDULING:
                logger.info("Scheduling package from %s: '%s':'%s'",
                            label, pkg.name, pkg.version)
                self.run_package_analysis(pkg.name, ecosystem, pkg.version)
            else:
                logger.info("Processing package from %s: '%s':'%s'",
                            label, pkg.name, pkg.version)

    def run(self):
        """Run the monitor.

        Loops forever: handles PyPI updates, then NPM updates, then sleeps for
        60 * SLEEP_INTERVAL seconds. This method does not return.
        """
        logger.info("Sleep interval: {}".format(SLEEP_INTERVAL))
        logger.info("Enabled scheduling: {}".format(ENABLE_SCHEDULING))

        while True:
            self._process_updates(self.pypi_monitor, 'pypi', 'PyPI')
            self._process_updates(self.npm_monitor, 'npm', 'NPM')
            sleep(60 * SLEEP_INTERVAL)
if __name__ == '__main__':
    # Script entry point: build the monitor and enter its endless polling loop.
    ReleaseMonitor().run()