Presentación

  • Jorge Aguilera

  • Programador backend

Objetivo

  • Consumir OpenData de Madrid sobre tráfico

    • Microservicios Micronaut

    • Kakfa como broker

    • Base de datos (PostgreSQL)

    • Accesible Internet

Okteto

"Instantly spin up pre-configured environments in the cloud and start developing within seconds"

  • kubernetes

  • entornos de desarrollo

  • depuración de aplicaciones k8s

  • stacks …​

OpenData del Ayuntamiento

14K estaciones reportan cada 5' la intensidad de trafico en cada punto geoposicionado

Vamos a consumir esta información y crear una alarma cuando un punto esté saturado durante los últimos 15 minutos

Arquitectura

architecture

Proyecto principal

gradle init

MicroServicios

mn create-app --lang groovy --features jib,kafka,kubernetes extractor

mn create-app --lang groovy --features jib,kafka,kubernetes,data-jdbc,postgres analizer

mn create-app --lang groovy --features jib,kafka,kubernetes,data-jdbc,postgres stations

Deploy

okteto-stack.yml
name: trafico-madrid

services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3-debian-10
    ...
  kafka:
    image: docker.io/bitnami/kafka:3.0.1-debian-10-r36
    ...
  db_alarmas:
    image: postgres:9.4
    ...
  db_estaciones:
    image: postgres:9.4
    ...
  extractor:
    image: dev/extractor
    build:
       context: extractor/build/docker/main
       dockerfile: Dockerfile
  stations:
    image: dev/stations
    build:
      context: stations/build/docker/main
      dockerfile: Dockerfile
  analizer:
    image: dev/analizer
    build:
       context: analizer/build/docker/main
       dockerfile: Dockerfile

Extractor

(incluir en settings.gradle del principal)

EstadoUrbano.groovy
import java.time.Instant
class EstadoUrbano {
    String idelem
    Instant fechahora
    String descripcion
    Integer accesoAsociado
    Integer intensidad
    Integer ocupacion
    Integer carga
    Integer nivelServicio
    Integer intensidadSat
    String error
    Integer subarea
    String st_x
    String st_y
}
ExtractorService.groovy
    void execute(){
        String url = "https://informo.madrid.es/informo/tmadrid/pm.xml";
        def xml = new XmlParser().parse(url)
        def list = []
        xml.pm.each{ item ->
            def add = new EstadoUrbano(
                     idelem: item.idelem.text(),
                     ...
            )
            list.add add
            if( list.size() > 100 ){
                traficoProducer.sendStatusUrbano(list)
                list.clear()
            }
        }
        traficoProducer.sendStatusUrbano(list)
    }
TraficoProducer.groovy
@KafkaClient(batch = true)
interface TraficoProducer {
    @Topic("urbano")
    void sendStatusUrbano(List<EstadoUrbano> list)
}

Stations

EstacionEntity y EstacionRepository para persistir en Postgre las estaciones

EstadoUrbano.groovy reutilizamos el de extractor por comodidad

EstacionListener.groovy
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class EstacionListener {
@Topic("urbano")
    void receiveUrbano( @MessageBody EstadoUrbano bean){
        ....
    }
}

Alarms

`