From 1c2c2f38a47aca99ce0b0f8f54d96c9de0c74cea Mon Sep 17 00:00:00 2001 From: Thomas Peetz Date: Mon, 7 Jul 2025 15:49:33 +0200 Subject: [PATCH] use message for adding links --- kontor-scripts/read_list.py | 76 ++++++++++++++++++++---------------- kontor-scripts/read_queue.py | 11 ++++-- 2 files changed, 51 insertions(+), 36 deletions(-) diff --git a/kontor-scripts/read_list.py b/kontor-scripts/read_list.py index cc1c4e8..17630f8 100644 --- a/kontor-scripts/read_list.py +++ b/kontor-scripts/read_list.py @@ -1,12 +1,13 @@ """ read file with URLs and store in DB """ -import uuid from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter -import datetime - -import mariadb -from setup import get_database_cursors, get_logger, get_scripts, get_meta_data +import logging +import json +import stomp +from proton import Message, Event +from proton.handlers import MessagingHandler +from proton.reactor import Container parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter) parser.add_argument('-f', '--links', help='file with links') @@ -20,38 +21,47 @@ def read_links_file(links_file): return lines -def add_link_to_db(statement, connection, video_url, log): - entry_id = str(uuid.uuid4()) - current_date_time = datetime.datetime.now() - try: - cur = connection.cursor() - cur.execute(statement, (entry_id, current_date_time, current_date_time, 0, video_url, True, True, None, None, None, None)) - connection.commit() - log.info(f'link {video_url} added to db') - except mariadb.Error as insert_error: - log.debug("insert failed with %s", insert_error) - entry_id = None - return entry_id +class AddLinkMessage(MessagingHandler): + def __init__(self, server, url, log): + super(AddLinkMessage, self).__init__() + log.info("create AddLinkMessage") + self.server = server + self.address = "KontorMediaFile::add_link_file" + self.url = url + self.log = log + + def on_start(self, event: Event): + self.log.info("Connection...") + conn = event.container.connect(self.server, user="artemis", password="artemis") + event.container.create_sender(conn, self.address) + + def on_connection_error(self, event: Event) -> None: + self.log.info(f"error: {event}") + + def on_sendable(self, event: Event): + self.log.info("send message") + event.sender.send(Message(body=self.url, address=self.address, content_type="application/json")) + event.connection.close() + event.sender.close() + + def on_accepted(self, event: Event) -> None: + self.log.info(f"accepted: {event}") if __name__ == '__main__': - logger = get_logger(args.verbose) - logger.info('kontor.read_list started') - s_conn, m_conn = get_database_cursors(logger) - meta_data_tables = get_meta_data(m_conn) - scripts = get_scripts(meta_data_tables, logger) - tables = {} - for table_id in scripts: - tables[scripts[table_id]['name']] = table_id - media_file_id = tables['media_file'] - insert_statement = scripts[tables['media_file']]['insert_mariadb'] + logging.basicConfig(level=logging.INFO, format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s') + logging.info('kontor.read_list started') + conn = stomp.Connection([('127.0.0.1', '61616')]) + conn.connect('artemis', 'artemis', wait=True) if args.links: - logger.info("read links from file") + logging.info("read links from file") links = read_links_file(args.links) for link in links: - logger.info("add link to db") - add_link_to_db(insert_statement, m_conn, link.strip(), logger) - else: - logger.info('script used: {}'.format(insert_statement)) - logger.info('kontor.read_list finished') + data_dict = {'url': link.strip()} + data = json.dumps(data_dict) + logging.info("send link message") + Container(AddLinkMessage("amqp://127.0.0.1:5672", data, logging)).run() + # conn.send(body=data, destination='KontorMediaFile::add_link_file', headers={'content-type': 'application/json'}) + conn.disconnect() + logging.info('kontor.read_list finished') diff --git a/kontor-scripts/read_queue.py b/kontor-scripts/read_queue.py index 5d2562b..22dc8f6 100644 --- a/kontor-scripts/read_queue.py +++ b/kontor-scripts/read_queue.py @@ -1,4 +1,5 @@ import stomp +import json import time from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter from config import get_logger @@ -17,16 +18,20 @@ class MyListener(stomp.ConnectionListener): def on_message(self, frame): self.log.info(f"received a message '{frame.body}'") + data = json.loads(frame.body) + url = data['url'] + self.log.info(f"found link: {url}") if __name__ == '__main__': log = get_logger(args.verbose, args.config) + log.info("kontor.read_queue started") host = [('127.0.0.1', 61616)] conn = stomp.Connection(host_and_ports=host) conn.set_listener('', MyListener(log)) conn.connect(username='artemis', passcode='artemis', wait=True) - conn.subscribe(destination='KontorMediaFile', id=1, ack='auto', headers={}) - conn.send(body='{ "message": "test message"}', destination="KontorMediaFile") - time.sleep(10) + conn.subscribe(destination='KontorMediaFile::add_link_file', id=1, ack='auto', headers={}) + time.sleep(5) conn.disconnect() + log.info("kontor.read_queue finished")