read queue

This commit is contained in:
Thomas Peetz
2025-07-10 22:04:31 +02:00
parent d2d4deb350
commit c6fd80408b
4 changed files with 47 additions and 6 deletions
+6 -4
View File
@@ -4,7 +4,6 @@ read file with URLs and store in DB
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
import logging
import json
import stomp
from proton import Message, Event
from proton.handlers import MessagingHandler
from proton.reactor import Container
@@ -41,16 +40,19 @@ class AddLinkMessage(MessagingHandler):
def on_connection_error(self, event: Event) -> None:
self.log.info(f"error: {event}")
def on_disconnected(self, event: Event) -> None:
self.log.debug(f"disconnected: {repr(event)}")
def on_sendable(self, event: Event):
self.log.info("send message")
message = Message(body=self.url, address=self.address, content_type="application/json", durable=True)
delivery = event.sender.send(message)
self.log.info(f"Delivery {delivery} sent")
event.sender.close()
event.connection.close()
def on_accepted(self, event: Event) -> None:
self.log.info(f"accepted Delivery: {event.delivery}")
self.log.info(f"accepted Delivery: {event.delivery.remote_state}")
event.connection.close()
def on_rejected(self, event: Event) -> None:
@@ -0,0 +1,15 @@
package de.thpeetz.kontor.integration.routes;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class AddLinkFromQueue extends RouteBuilder {
@Override
public void configure() throws Exception {
from("jms:queue:add_link_file")
.routeId("read-queue-add-link_file")
.log("${body}")
.to("bean:addLinkService?method=fromQueue");
}
}
@@ -1,4 +1,4 @@
package de.thpeetz.kontor.integration;
package de.thpeetz.kontor.integration.routes;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@@ -9,6 +9,8 @@ public class ReadQueueRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("jms:queue:KontorMediaFile")
.log("${body}");
.routeId("add-link-from-queue")
.trace(true)
.log(">>> ${body}");
}
}
@@ -0,0 +1,22 @@
package de.thpeetz.kontor.integration.services;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.HashMap;
@Service
@Slf4j
public class AddLinkService {
public void fromQueue(String messageBody) throws JsonProcessingException {
log.info("get body: {}", messageBody);
ObjectMapper objectMapper = new ObjectMapper();
HashMap<String,String> myMap = objectMapper.readValue(messageBody, new TypeReference<HashMap<String,String>>() {});
String url = myMap.get("url");
log.info("found url: {}", url);
}
}