-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgateway.py
More file actions
executable file
·117 lines (89 loc) · 3.24 KB
/
gateway.py
File metadata and controls
executable file
·117 lines (89 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import asyncio
import json
import logging
from asyncio import CancelledError

from aiohttp import web, hdrs, ClientSession
from aiopg.sa import create_engine

from middlewares import BasicAuthMiddleware, RequestAuthorizationMiddleware
@web.middleware
async def routing(request, handler):
    """Resolve the upstream address for the requested merchant/channel.

    The merchant comes from the URL path and the channel from the
    ``channel`` query parameter. Routes are looked up in the application's
    ``configuration['routes']`` mapping, most specific key first:
    ``'<merchant>:<channel>'``, then ``':<channel>'``, then ``'<merchant>:'``.
    On a match the upstream WebSocket URL is stored in ``request['address']``
    and the request is passed on to *handler*.

    Returns:
        The handler's response, a 400 response when the ``channel`` query
        parameter is missing, or a 404 response when no route matches.
    """
    merchant = request.match_info.get('merchant')
    channel = request.rel_url.query.get('channel')
    if channel is None:
        # A missing query parameter is a client error; indexing the query
        # mapping directly would raise KeyError and surface as a 500.
        return web.Response(status=web.HTTPBadRequest.status_code)

    # Use the application attached to the request rather than the module
    # global, so the middleware does not depend on definition order.
    routes = request.app['configuration']['routes']

    # Check if there is route for the specified merchant/channel
    for entry in (f'{merchant}:{channel}', f':{channel}', f'{merchant}:'):
        route = routes.get(entry)
        if route is not None:
            request['address'] = f"ws://{route['address']}/{merchant}/products"
            break
    else:
        # The loop finished without a break: no candidate key matched.
        return web.Response(status=web.HTTPNotFound.status_code)

    return await handler(request)
# Middleware instances built from the classes imported from `middlewares`;
# both are passed to web.Application(...) further down in this file.
basic_auth = BasicAuthMiddleware()
request_authz = RequestAuthorizationMiddleware()
async def manage_connection(app):
    """Cleanup context that owns the database engine and the route listener.

    Setup (before ``yield``): create the aiopg engine, store it in
    ``app['db']`` and start :func:`listen` as a background task stored in
    ``app['listener']``.

    Teardown (after ``yield``): cancel the listener, wait for it to finish
    and close the engine. The engine is closed even when awaiting the
    listener raises, e.g. because the listener had already failed.
    """
    # NOTE(review): connection parameters (including the password) are
    # hard-coded here; consider loading them from configuration/environment.
    app['db'] = await create_engine(
        user='registrar',
        database='registry',
        host='localhost',
        port=5432,
        password='123123123',
    )
    # Application.loop is deprecated in aiohttp; schedule the task on the
    # currently running event loop instead.
    app['listener'] = asyncio.ensure_future(listen(app))

    yield

    app['listener'].cancel()
    try:
        await app['listener']
    except CancelledError:
        # Raised when the task was cancelled before it could handle the
        # cancellation itself; shutdown proceeds normally.
        pass
    finally:
        app['db'].close()
        await app['db'].wait_closed()
_log = logging.getLogger(__name__)


def _apply_route_change(routes, payload):
    """Apply one decoded ``routes`` notification to the *routes* mapping.

    *payload* is a dict with an ``'action'`` key and a ``'data'`` key; the
    data carries ``merchant``, ``channel`` and, for non-delete actions,
    ``address`` and ``users``. Any action other than ``'DELETE'`` stores
    (or overwrites) the entry.
    """
    action = payload['action']
    route = payload['data']
    key = f"{route['merchant']}:{route['channel']}"

    if action == 'DELETE':
        # Deleting a route that is not cached is a no-op rather than a
        # KeyError that would terminate the listener.
        routes.pop(key, None)
        return

    users = route['users']
    routes[key] = {
        'address': route['address'],
        'users': [] if users is None else [str(user) for user in users],
    }


async def listen(app):
    """Keep ``app['configuration']['routes']`` in sync with the database.

    Issues ``LISTEN routes`` on a connection acquired from ``app['db']`` and
    applies every JSON notification received on it until the task is
    cancelled. Cancellation is absorbed so that awaiting the task after
    ``cancel()`` returns normally; ``UNLISTEN routes`` is always issued on
    the way out.

    A notification that cannot be decoded or lacks expected keys is logged
    and skipped, so one bad message does not stop further updates.
    """
    async with app['db'].acquire() as conn:
        await conn.execute('LISTEN routes')
        try:
            while True:
                msg = await conn.connection.notifies.get()
                try:
                    _apply_route_change(
                        app['configuration']['routes'],
                        json.loads(msg.payload),
                    )
                except (ValueError, KeyError, TypeError):
                    # json.JSONDecodeError is a ValueError subclass.
                    _log.exception('Ignoring malformed routes notification')
        except CancelledError:
            pass
        finally:
            await conn.execute('UNLISTEN routes')
async def load_configuration(app):
    """Populate ``app['configuration']`` with the routes stored in the database.

    The configuration is reset to an empty ``routes`` mapping, then every row
    of the ``routes`` table is stored under the key ``'<merchant>:<channel>'``
    with its address and its users converted to strings (an empty list when
    the row has no users).
    """
    routes = {}
    app['configuration'] = {'routes': routes}

    # Load routes
    async with app['db'].acquire() as connection:
        async for record in connection.execute('SELECT * FROM routes'):
            if record.users is None:
                allowed = []
            else:
                allowed = [str(user) for user in record.users]
            routes[f'{record.merchant}:{record.channel}'] = {
                'address': record.address,
                'users': allowed,
            }
async def stream(request):
    """Relay messages from the upstream WebSocket to the HTTP client.

    Opens a WebSocket connection to ``request['address']`` (set by the
    ``routing`` middleware) and writes every received message to a streaming
    ``text/html`` response. Text frames are UTF-8 encoded, binary frames are
    forwarded unchanged, an error frame ends the stream, and any other frame
    type is ignored.

    Returns:
        The prepared ``web.StreamResponse``.
    """
    # Start streaming the data as soon as it becomes available
    response = web.StreamResponse(
        status=web.HTTPOk.status_code,
        headers={hdrs.CONTENT_TYPE: 'text/html'})
    await response.prepare(request)

    async with ClientSession() as session:
        async with session.ws_connect(request['address']) as ws:
            async for msg in ws:
                if msg.type == web.WSMsgType.TEXT:
                    await response.write(msg.data.encode('utf-8'))
                elif msg.type == web.WSMsgType.BINARY:
                    # Already bytes; calling .encode() on it would raise.
                    await response.write(msg.data)
                elif msg.type == web.WSMsgType.ERROR:
                    # msg.data holds an exception here, not payload data.
                    break

    return response
# Application wiring. The middlewares are listed in the order basic_auth,
# request_authz, routing.
app = web.Application(middlewares=[
basic_auth,
request_authz,
routing
])
# manage_connection creates app['db'] and starts the route listener;
# load_configuration then reads the routes table through app['db'].
# NOTE(review): this relies on the cleanup context's setup phase running
# before the on_startup handlers — confirm against the aiohttp documentation.
app.cleanup_ctx.append(manage_connection)
app.on_startup.append(load_configuration)
app.add_routes([
web.get('/products/{merchant}', stream),
])
# NOTE(review): the server is started whenever this module is executed or
# imported; there is no `if __name__ == '__main__'` guard.
web.run_app(app)