use message for adding links
This commit is contained in:
+43
-33
@@ -1,12 +1,13 @@
|
|||||||
"""
|
"""
|
||||||
read file with URLs and store in DB
|
read file with URLs and store in DB
|
||||||
"""
|
"""
|
||||||
import uuid
|
|
||||||
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
|
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
|
||||||
import datetime
|
import logging
|
||||||
|
import json
|
||||||
import mariadb
|
import stomp
|
||||||
from setup import get_database_cursors, get_logger, get_scripts, get_meta_data
|
from proton import Message, Event
|
||||||
|
from proton.handlers import MessagingHandler
|
||||||
|
from proton.reactor import Container
|
||||||
|
|
||||||
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
|
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
|
||||||
parser.add_argument('-f', '--links', help='file with links')
|
parser.add_argument('-f', '--links', help='file with links')
|
||||||
@@ -20,38 +21,47 @@ def read_links_file(links_file):
|
|||||||
return lines
|
return lines
|
||||||
|
|
||||||
|
|
||||||
def add_link_to_db(statement, connection, video_url, log):
|
class AddLinkMessage(MessagingHandler):
|
||||||
entry_id = str(uuid.uuid4())
|
def __init__(self, server, url, log):
|
||||||
current_date_time = datetime.datetime.now()
|
super(AddLinkMessage, self).__init__()
|
||||||
try:
|
log.info("create AddLinkMessage")
|
||||||
cur = connection.cursor()
|
self.server = server
|
||||||
cur.execute(statement, (entry_id, current_date_time, current_date_time, 0, video_url, True, True, None, None, None, None))
|
self.address = "KontorMediaFile::add_link_file"
|
||||||
connection.commit()
|
self.url = url
|
||||||
log.info(f'link {video_url} added to db')
|
self.log = log
|
||||||
except mariadb.Error as insert_error:
|
|
||||||
log.debug("insert failed with %s", insert_error)
|
def on_start(self, event: Event):
|
||||||
entry_id = None
|
self.log.info("Connection...")
|
||||||
return entry_id
|
conn = event.container.connect(self.server, user="artemis", password="artemis")
|
||||||
|
event.container.create_sender(conn, self.address)
|
||||||
|
|
||||||
|
def on_connection_error(self, event: Event) -> None:
|
||||||
|
self.log.info(f"error: {event}")
|
||||||
|
|
||||||
|
def on_sendable(self, event: Event):
|
||||||
|
self.log.info("send message")
|
||||||
|
event.sender.send(Message(body=self.url, address=self.address, content_type="application/json"))
|
||||||
|
event.connection.close()
|
||||||
|
event.sender.close()
|
||||||
|
|
||||||
|
def on_accepted(self, event: Event) -> None:
|
||||||
|
self.log.info(f"accepted: {event}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
logger = get_logger(args.verbose)
|
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
|
||||||
logger.info('kontor.read_list started')
|
logging.info('kontor.read_list started')
|
||||||
s_conn, m_conn = get_database_cursors(logger)
|
conn = stomp.Connection([('127.0.0.1', '61616')])
|
||||||
meta_data_tables = get_meta_data(m_conn)
|
conn.connect('artemis', 'artemis', wait=True)
|
||||||
scripts = get_scripts(meta_data_tables, logger)
|
|
||||||
tables = {}
|
|
||||||
for table_id in scripts:
|
|
||||||
tables[scripts[table_id]['name']] = table_id
|
|
||||||
media_file_id = tables['media_file']
|
|
||||||
insert_statement = scripts[tables['media_file']]['insert_mariadb']
|
|
||||||
if args.links:
|
if args.links:
|
||||||
logger.info("read links from file")
|
logging.info("read links from file")
|
||||||
links = read_links_file(args.links)
|
links = read_links_file(args.links)
|
||||||
for link in links:
|
for link in links:
|
||||||
logger.info("add link to db")
|
data_dict = {'url': link.strip()}
|
||||||
add_link_to_db(insert_statement, m_conn, link.strip(), logger)
|
data = json.dumps(data_dict)
|
||||||
else:
|
logging.info("send link message")
|
||||||
logger.info('script used: {}'.format(insert_statement))
|
Container(AddLinkMessage("amqp://127.0.0.1:5672", data, logging)).run()
|
||||||
logger.info('kontor.read_list finished')
|
# conn.send(body=data, destination='KontorMediaFile::add_link_file', headers={'content-type': 'application/json'})
|
||||||
|
conn.disconnect()
|
||||||
|
logging.info('kontor.read_list finished')
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import stomp
|
import stomp
|
||||||
|
import json
|
||||||
import time
|
import time
|
||||||
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
|
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
|
||||||
from config import get_logger
|
from config import get_logger
|
||||||
@@ -17,16 +18,20 @@ class MyListener(stomp.ConnectionListener):
|
|||||||
|
|
||||||
def on_message(self, frame):
|
def on_message(self, frame):
|
||||||
self.log.info(f"received a message '{frame.body}'")
|
self.log.info(f"received a message '{frame.body}'")
|
||||||
|
data = json.loads(frame.body)
|
||||||
|
url = data['url']
|
||||||
|
self.log.info(f"found link: {url}")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
log = get_logger(args.verbose, args.config)
|
log = get_logger(args.verbose, args.config)
|
||||||
|
log.info("kontor.read_queue started")
|
||||||
host = [('127.0.0.1', 61616)]
|
host = [('127.0.0.1', 61616)]
|
||||||
conn = stomp.Connection(host_and_ports=host)
|
conn = stomp.Connection(host_and_ports=host)
|
||||||
conn.set_listener('', MyListener(log))
|
conn.set_listener('', MyListener(log))
|
||||||
conn.connect(username='artemis', passcode='artemis', wait=True)
|
conn.connect(username='artemis', passcode='artemis', wait=True)
|
||||||
conn.subscribe(destination='KontorMediaFile', id=1, ack='auto', headers={})
|
conn.subscribe(destination='KontorMediaFile::add_link_file', id=1, ack='auto', headers={})
|
||||||
conn.send(body='{ "message": "test message"}', destination="KontorMediaFile")
|
time.sleep(5)
|
||||||
time.sleep(10)
|
|
||||||
conn.disconnect()
|
conn.disconnect()
|
||||||
|
log.info("kontor.read_queue finished")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user