Skip to main content

No project description provided

Project description

::: {md-component="skip"} Skip to content{.md-skip} :::

::: {md-component="announce"} :::

::::::::::::: {.md-header .md-header--shadow md-component="header"} {.md-header__button .md-logo aria-label="lbxtoolkit" md-component="logo"}

:::::: {.md-header__title md-component="header-title"} ::::: md-header__ellipsis ::: md-header__topic [ lbxtoolkit ]{.md-ellipsis} :::

::: {.md-header__topic md-component="header-topic"} [ Home ]{.md-ellipsis} ::: ::::: ::::::

:::::::: {.md-search md-component="search" role="dialog"} ::::::: {.md-search__inner role="search"}

:::::: md-search__output ::::: {.md-search__scrollwrap tabindex="0" md-scrollfix=""} :::: {.md-search-result md-component="search-result"} ::: md-search-result__meta Initializing search ::: :::: ::::: :::::: ::::::: :::::::: :::::::::::::

:::::::::::::::::::::::::::::::::::::::::::::::::::::::::: {.md-container md-component="container"} :::::::::::::::::::::::::::::::::::::::::::::::::::::: {.md-main role="main" md-component="main"} ::::::::::::::::::::::::::::::::::::::::::::::::::::: {.md-main__inner .md-grid} ::::: {.md-sidebar .md-sidebar--primary md-component="sidebar" md-type="navigation"} :::: md-sidebar__scrollwrap ::: md-sidebar__inner {.md-nav__button .md-logo aria-label="lbxtoolkit" md-component="logo"} lbxtoolkit

[ Home ]{.md-ellipsis} []{.md-nav__icon .md-icon} [ Home ]{.md-ellipsis}{.md-nav__link .md-nav__link--active}

[]{.md-nav__icon .md-icon} Table of contents

::::: {.md-sidebar .md-sidebar--secondary md-component="sidebar" md-type="toc"} :::: md-sidebar__scrollwrap ::: md-sidebar__inner []{.md-nav__icon .md-icon} Table of contents

:::::::::::::::::::::::::::::::::::::::::::::: {.md-content md-component="content"}

Home

::::::::::::::::::::::::::::::::::::::::::::: {.doc .doc-object .doc-module} []{#lbxtoolkit}

:::::::::::::::::::::::::::::::::::::::::::: {.doc .doc-contents .first}

Biblioteca de ferramentas LBX S/A {#lbxtoolkit--biblioteca-de-ferramentas-lbx-sa}

Esta biblioteca possui um ferramentas utilitárias de uso recorrente para aplicações de RPA em python.

Classe e funções {#lbxtoolkit--classe-e-funcoes}

auth_EntraID
Usa o Microsoft Entra ID (antiga Azure AD) para evitar execução não autorizada disclaimer : Mensagem sobre a necessidade de autenticação valida_grupo : Autentica o usuário e aborta se checa não pertencer ao grupo de segurança

postgreSQL Interage com o banco de dados PostgreSQL - .db: Inicia sessão com o banco - .csv_df: Lê arquivo CSV e gera Dataframe (pandas) a partir dele - .db_insert_df: Insere informações de Dataframe em tabela do banco com estrutura equivalente - .db_select: Retorna um cursor a partir de uma query - .db_update: Executa update em tabelas

api_rest Interage com APIs RESTfull, especialmente providas para a plataforma Sienge - .auth_base: Autentica (HTTPBasicAuth) sessão na API - .auth_bearer: Autentica sessão na API pelos métodos: OAuth, JWT, Bearer\

  • .endpoint_json: Realizad chama ao endpoint. Payload em formato json opcional. - .trata_erro_sienge: Retorna a mensagem de erro do Sienge caso código de retorno seja diferente de 200. - .close: Encerra a sessão autenticada

lbx_logger Manipula e formata as mensagens de saída do script para direcioná-las para tela (stdout) e/ou arquivo de log - .add: Adiciona a mensagem a um buffer sem exibir, acumulando até a próxima chamada em algum dos níveis abaixo. - .print: Contorna o manipulador de log e imprime diretamente na tela (stdout), sem formatar a mensagem nem registrar no arquivo - .debug, .info, .aviso, .erro, .critico: Classifica as mensagens por nível de severidade/relevância e rediciona a saída (arquivo, tela, tela+arquivo) conforme a configuração do nível - .stop_logging: Interrompe a manipulação das saídas pelo logger e restaura as saídas padrão (stdout/stderr) para a tela - .filtra: Filtra os eventos do arquivo de log registrados em um intervalo de tempo específico

misc Classe de miscelâneas/diversos - .seleciona_arquivo: Abre um picker do sistema operacionar para selecionar um arquivo e retorna seu path - .seleciona_dir: Abre um picker do sistema operacionar para selecionar um diretório e retorna seu path - .normaliza: Limpa caracteres especiais e espaços de strings e retorna tudo em minúsculo - .get_cmd_window: Captura a referencia da janela atual (cmd.exe) para retornar o foco à ela depois de chamar os pickers - .maximize_console: Maxima a janela do console (cmd.exe)

Instalação e uso: {#lbxtoolkit--instalacao-e-uso}

Instalação {#lbxtoolkit--instalacao}

pip install lbx_toolkit

Uso {#lbxtoolkit--uso}

from lbx_toolkit import auth_EntraID, PostgreSQL, api_rest, lbx_logger

::::::::::::::::::::::::::::::::::::::::::: {.doc .doc-children} :::::: {.doc .doc-object .doc-class}

ConfigManager {#lbxtoolkit.ConfigManager .doc .doc-heading}

::::: {.doc .doc-contents} Como Funciona Singleton Pattern: ConfigManager é um singleton que garante que todas as partes do código usem a mesma instância e, portanto, compartilhem a mesma configuração. Inicialização com Argumentos Dinâmicos: O método initialize usa **kwargs para aceitar qualquer número de pares chave-valor, armazenando-os no dicionário _config da instância. Método Genérico get: O método get aceita uma chave como argumento e retorna o valor correspondente do dicionário _config. Método set: O método set permite adicionar ou atualizar dinamicamente valores no dicionário _config. Método reset: O método reset limpa todas as configurações armazenadas, permitindo uma nova inicialização do ConfigManager com novos valores

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 1336 | | | 1337 | class ConfigManager: # Inicia | | 1338 | liza e recupera variáveis em ambi | | 1339 | ente de intercâmbio entre classes | | 1340 | """ | | 1341 | Como Funciona | | 1342 | | | 1343 | Singleton Pattern: ConfigMan | | 1344 | ager é um singleton que garante q | | 1345 | ue todas as partes do código usem | | 1346 | a mesma instância e, portanto, c | | 1347 | ompartilhem a mesma configuração. | | 1348 | Inicializ | | 1349 | ação com Argumentos Dinâmicos: O | | 1350 | método initialize usa **kwargs pa | | 1351 | ra aceitar qualquer número de par | | 1352 | es chave-valor, armazenando-os no | | 1353 | dicionário _config da instância. | | 1354 | | | 1355 | Método Genérico get: O | | 1356 | método get aceita uma chave como | | 1357 | argumento e retorna o valor corr | | 1358 | espondente do dicionário _config. | | 1359 | M | | 1360 | étodo set: O método set permite a | | 1361 | dicionar ou atualizar dinamicamen | | 1362 | te valores no dicionário _config. | | 1363 | Método rese | | 1364 | t: O método reset limpa todas as | | 1365 | configurações armazenadas, permit | | 1366 | indo uma nova inicialização do Co | | 1367 | nfigManager com novos valores | | 1368 | """ | | 1369 | _instance = None | | 1370 | def new(cls): | | 1371 | | | 1372 | if cls._instance is None: | | ::: | | | | cls._instance = super | | | (ConfigManager, cls).new(cls) | | | | | | cls._instance._config = {} | | | return cls._instance | | | # | | | # | | | @classmethod | | | | | | def initialize(cls, **kwargs): | | | instance = cls() | | | | | | for key, value in kwargs.items(): | | | | | | instance._config[key] = value | | | # | | | # | | | @classmethod | | | def get(cls, key): | | | retu | | | rn cls._instance._config.get(key) | | | # | | | # | | | @classmethod | | | def set(cls, key, value): | | | c | | | ls._instance._config[key] = value | | | # | | | # | | | @classmethod | | | def reset(cls): | | | | | | cls._instance._config = {} | | | | | |
| +-----------------------------------+-----------------------------------+ :::

::: {.doc .doc-children} ::: ::::: ::::::

:::::::::::::::::: {.doc .doc-object .doc-class}

Servicer {#lbxtoolkit.Servicer .doc .doc-heading}

::::::::::::::::: {.doc .doc-contents} Classe base que implementa as rotinas padrão para a criação dameons/serviços do windows. Além de iniciar e interromper o daemon/serviço, a classe implementar os métodos daemon_logs() e cleanup(), dependencias de run() e stop() que criam um arquivo de log do serviço/daemon (não do script em si) e um arquivo PID para o monitor de serviços. Os métodos padrão são init(), run() e stop() e não devem ser redefinidos/sobrecarregados. Para ser funcional, é necessária a criação de uma classe local que herde essa classe e redefina (por sobrecarga) ao menos os métodos on_run, on_start e args_parser(). on_init é opcional. Outros métodos complementares são oferidos para init, run e stop, permitindo injetar código no ínicio (pre) e fim (pós) os respectivos métodos (on_init_pre(), on_init_pos(), on_run_pre(), on_run_pos(), on_stop_pre(), on_stop_pos())

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 1377 | | | 1378 | class Servicer(): # Cria | | 1379 | um daemon para rodar como serviço | | 1380 | """ | | 1381 | | | 1382 | Classe base que implem | | 1383 | enta as rotinas padrão para a cri | | 1384 | ação dameons/serviços do windows. | | 1385 | Além de iniciar e i | | 1386 | nterromper o daemon/serviço, a cl | | 1387 | asse implementar os métodos daemo | | 1388 | n_logs() e cleanup(), dependencia | | 1389 | s de run() e stop() que criam um | | 1390 | arquivo de log do serviço/daemon | | 1391 | (não do script em si) e um arquiv | | 1392 | o PID para o monitor de serviços. | | 1393 | | | 1394 | Os métodos padrão são i | | 1395 | nit(), run() e stop() e não devem | | 1396 | ser redefinidos/sobrecarregados. | | 1397 | Para ser funcional, é | | 1398 | necessária a criação de uma clas | | 1399 | se local que herde essa classe e | | 1400 | redefina (por sobrecarga) ao meno | | 1401 | s os métodos on_run, on_start e a | | 1402 | rgs_parser(). on_init é opcional. | | 1403 | | | 1404 | Outros métodos complementares são | | 1405 | oferidos para init, run e stop, | | 1406 | permitindo injetar código no ínic | | 1407 | io (pre) e fim (pós) os respectiv | | 1408 | os métodos (on_init_pre(), on_ini | | 1409 | t_pos(), on_run_pre(), on_run_pos | | 1410 | (), on_stop_pre(), on_stop_pos()) | | 1411 | """ | | 1412 | def init(se | | 1413 | lf, Log=None, piddir=None):#TODO: | | 1414 | ao criar uma classe padrão usar | | 1415 | args/kwargs para lidar como param | | 1416 | etros variáveis no instanciamento | | 1417 | | | 1418 | # PRE-REQUISITOS/DEPENDÊNCIAS: | | 1419 | se | | 1420 | lf.log = ConfigManager.get('log') | | 1421 | self.kwargs = | | 1422 | ConfigManager.get('argparse_cfg') | | 1423 | | | 1424 | self.kwopts = ConfigManager.ge | | 1425 | t('argparse_opt') | | 1426 | self.ambiente = Confi | | 1427 | gManager.get('ambiente') | | 1428 | | | 1429 | if self.log is None or not | | 1430 | isinstance(self.log, lbx_logger): | | 1431 | rai | | 1432 | se ValueError(f'Argumento "log" é | | 1433 | mandatório e deve ser uma instân | | 1434 | cia de "lbxtoolkit.lbx_logger"') | | 1435 | | | 1436 | if self.kwargs is None: | | 1437 | ra | | 1438 | ise ValueError(f'Argumento "argpa | | 1439 | rse_cfg" é mandatório e deve ser | | 1440 | um dicionário com ao mínimo as ch | | 1441 | aves: [description, usage, usage, | | 1442 | add_help, formatter_class] para | | 1443 | configuração do módulo argpase') | | 1444 | | | 1445 | if self.kwopts is None: | | 1446 | raise V | | 1447 | alueError(f'Argumento "argparse_o | | 1448 | pt" é mandatório e deve ser uma l | | 1449 | ista de dicionários ao mínimo as | | 1450 | chaves: [short, long, action, hel | | 1451 | p] para tratamento dos argumentos | | 1452 | recebidos da linha de comando') | | 1453 | if self.ambient | | 1454 | e is None or self.ambiente not in | | 1455 | ['Linux', 'Windows', 'Serviço']: | | 1456 | r | | 1457 | aise ValueError(f'Argumento "ambi | | 1458 | ente" é mandatório e deve ser uma | | 1459 | string com um dos seguintes valo | | 1460 | res: [Linux, Windows, Serviço]') | | 1461 | | | 1462 | self.on_init_pre( | | 1463 | ) ## método opcional a ser defini | | 1464 | to por sobrecarga na função local | | 1465 | | | 1466 | # Argu | | 1467 | mentos padrão obrigatórios | | 1468 | self. | | 1469 | LogFile = Path('.',os.path.splite | | 1470 | xt(os.path.basename(file))[0] | | 1471 | + '.daemon') if not Log else Log | | 1472 | | | 1473 | self.OS = platform.system() | | 1474 | | | 1475 | self.PID = os.getppid() | | 1476 | self.IP = socket.ge | | 1477 | thostbyname(socket.gethostname()) | | 1478 | | | 1479 | self.Host = socket.gethostname() | | 1480 | | | 1481 | self.Usuario = os.getlogin() i | | 1482 | f self.OS == 'Windows' else os.pa | | 1483 | th.expanduser('~').split(r'/')[1] | | 1484 | se | | 1485 | lf.Me = os.path.abspath(file) | | 1486 | self.PIDDir = Pa | | 1487 | th('.') if not piddir else piddir | | 1488 | self.PIDFile = | | 1489 | Path(self.PIDDir,str(self.PID)) | | 1490 | self.exit = False | | 1491 | | | 1492 | self.mode = '[DAEMON (console)]' | | 1493 | | | 1494 | self.on_init( | | 1495 | ) ## método opcional a ser defini | | 1496 | to por sobrecarga na função local | | 1497 | | | 1498 | | | 1499 | self.on_init_pos() ## método opci | | 1500 | onal a ser definito por sobrecarg | | 1501 | a na função local | | 1502 | # | | 1503 | # | | 1504 | def main(self): | | 1505 | #kwargs = | | 1506 | ConfigManager.get('argparse_cfg') | | 1507 | | | 1508 | #kwopts = ConfigManager.ge | | 1509 | t('argparse_opt') | | 1510 | | | 1511 | #ambiente = ConfigManage | | 1512 | r.get('ambiente') | | 1513 | i | | 1514 | f len(sys.argv) == 1 and self.amb | | 1515 | iente == 'Serviço': ## VEM DAQUI | | 1516 | https://gist.github.com/drmalex07 | | 1517 | /10554232?permalink_comment_id=25 | | 1518 | 55358#gistcomment-2555358 | | 1519 | | | 1520 | servicemanager.Initialize() | | 1521 | servicemanager.Pr | | 1522 | epareToHostSingle(ServicoWindows) | | 1523 | serviceman | | 1524 | ager.StartServiceCtrlDispatcher() | | 1525 | elif len(sys.argv) | | 1526 | > 1 and sys.argv[1] == 'install': | | 1527 | | | 1528 | ServicoWindows.SvcInstall() | | 1529 | elif len(sys.argv) | | 1530 | > 1 and sys.argv[1] == 'remove': | | ::: | Servi | | | coWindows.SvcRemove() | | | else: | | | if len(sys.a | | | rgv) > 1 and sys.argv[1] in ['sta | | | rt', 'stop', 'restart', 'debug']: | | | | | | win32serviceutil. | | | HandleCommandLine(ServicoWindows) | | | else: | | | | | | self.parser = argpa | | | rse.ArgumentParser(**self.kwargs) | | | | | | for opt in self.kwopts: | | | | | | self.parser.add_argument( | | | opt['short'], opt['long'], action | | | =opt['action'], help=opt['help']) | | | self.args = | | | self.parser.parse_args() | | | | | | | | | self.args_paser() ## tratamento d | | | os arguemntos deve ser redefindo | | | por sobrecarga no na função local | | | # | | | # | | | def run(self): | | | """I | | | nicia a execução do do serviço""" | | | | | | self.on_run_pre() ## | | | método opcional a ser definito p | | | or sobrecarga na função local | | | | | | | | | self.daemon_log('START') | | | ## Gera o PIDFile | | | self.log.ad | | | d(f'Iniciando daemon [PID: {self. | | | PID}] para monitorar os processos | | | que rodam como serviço/daemon mo | | | nitorados em: {self.PIDDir}... ') | | | try: | | | | | | with open(self.PIDF | | | ile, 'w', encoding='utf-8') as f: | | | f.write(self. | | | Me + ';' + str(self.LogFile)) | | | | | | except Exception as Err: | | | | | | self.stop('CRASH') | | | | | | self.log.erro(f'Erro [{Err}] ao s | | | alvar PIDFile: {self.PIDFile}') | | | self.log.info(f'Ok!' | | | ) ## trocar para debug em prd ?? | | | | | | | | | self.on_run() # função princ | | | ipal para interreper o daemon/ser | | | viço, definir localmente por sobr | | | ecarga (criar classe que herde es | | | sa classe e defina essa função) | | | | | | self.on_run_pos() ## | | | método opcional a ser definito p | | | or sobrecarga na função local | | | # | | | # | | | | | | def stop(self, evento='STOP'): | | | | | | """Interrompe o daemon/serviço""" | | | | | | | | | self.on_stop_pre() ## mét | | | odo opcional a ser definito por s | | | obrecarga na função local | | | | | | | | | self.daemon_log(evento) | | | | | | self.on_stop() # função pri | | | ncipal para interreper o daemon/s | | | erviço, definir localmente por so | | | brecarga (criar classe que herde | | | essa classe e defina essa função) | | | self.cleanup() | | | self.exit=True | | | | | | | | | self.on_stop_pos() ## mét | | | odo opcional a ser definito por s | | | obrecarga na função local | | | # | | | # | | | def cleanup(self): # | | | # Elimina o arquivo PID do proces | | | so se estiver rodando como daemon | | | """Método auxil | | | iar utilizado no stop() para limp | | | ar o o PID file na interrupção""" | | | | | | | | | self.on_cleanup_pre() ## mét | | | odo opcional a ser definito por s | | | obrecarga na função local | | | | | | | | | if self.PIDFile: ## ver | | | ifica se está rodando como daemon | | | | | | if Path(self.PIDFile).exists(): | | | | | | Path(self.PIDFile).u | | | nlink() ##exclui o pidfile do da | | | emon se o arquivo existir | | | | | | self.PIDFile = None | | | | | | | | | self.on_cleanup_pos() ## mé | | | todo opcional a ser definito por | | | sobrecarga na função local | | | # | | | # | | | def daemon_log | | | (self, evento=None): ## Gerar log | | | de início/interrupção do serviço | | | """Mét | | | odo auxiliar utilizado alimentar | | | log do histórico de inicialização | | | /interrupção do serviço/daemon""" | | | | | | evento = | | | 'CHECK' if not evento else evento | | | | | | evento = evento.upper() | | | | | | TimeStamp = datetime.datetime.no | | | w().strftime('%Y-%m-%d %H:%M:%S') | | | Message = f'{TimeSt | | | amp} - {evento} - {self.OS} - {se | | | lf.Host}/{self.IP} - PID: {self.P | | | ID} - {self.Usuario} - {self.Me}' | | | try: | | | with | | | open(self.LogFile, 'a') as file: | | | | | | file.write(Message + '\n') | | | | | | except Exception as Err: | | | | | | self.log.erro(f'Erro [{Er | | | r}] ao gravar status do daemon em | | | {self.LogFile}') | | | # | | | # | | | def on_init_pre(self): | | | pass | | | def on_init_pos(self): | | | pass | | | def on_init(self): | | | pass | | | def on_cleanup_pre(self): | | | pass | | | def on_cleanup_pos(self): | | | pass | | | def on_cleanup(self): | | | pass | | | def on_run_pre(self): | | | pass | | | def on_run_pos(self): | | | pass | | | def on_run(self): | | | pass | | | def on_stop_pre(self): | | | pass | | | def on_stop_pos(self): | | | pass | | | def on_stop(self): | | | pass | | | def args_paser(self): | | | pass | | | | | |
| +-----------------------------------+-----------------------------------+ :::

::::::::::::::: {.doc .doc-children} ::::: {.doc .doc-object .doc-function}

[cleanup{.highlight .language-python}]{.n}[(){.highlight .language-python}]{.p} {#lbxtoolkit.Servicer.cleanup .doc .doc-heading}

:::: {.doc .doc-contents} Método auxiliar utilizado no stop() para limpar o o PID file na interrupção

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 1478 | | | 1479 | def cleanup(self): # | | 1480 | # Elimina o arquivo PID do proces | | 1481 | so se estiver rodando como daemon | | 1482 | """Método auxil | | 1483 | iar utilizado no stop() para limp | | 1484 | ar o o PID file na interrupção""" | | 1485 | | | 1486 | | | 1487 | self.on_cleanup_pre() ## mét | | 1488 | odo opcional a ser definito por s | | ::: | obrecarga na função local | | | | | | if self.PIDFile: ## ver | | | ifica se está rodando como daemon | | | | | | if Path(self.PIDFile).exists(): | | | | | | Path(self.PIDFile).u | | | nlink() ##exclui o pidfile do da | | | emon se o arquivo existir | | | | | | self.PIDFile = None | | | | | | | | | self.on_cleanup_pos() ## mé | | | todo opcional a ser definito por | | | sobrecarga na função local | | | | | |
| +-----------------------------------+-----------------------------------+ ::: :::: :::::

::::: {.doc .doc-object .doc-function}

[daemon_log{.highlight .language-python}]{.n}[({.highlight .language-python}]{.p}[evento{.highlight .language-python}]{.n}[={.highlight .language-python}]{.o}[None{.highlight .language-python}]{.kc}[){.highlight .language-python}]{.p} {#lbxtoolkit.Servicer.daemon_log .doc .doc-heading}

:::: {.doc .doc-contents} Método auxiliar utilizado alimentar log do histórico de inicialização/interrupção do serviço/daemon

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 1491 | | | 1492 | def daemon_log | | 1493 | (self, evento=None): ## Gerar log | | 1494 | de início/interrupção do serviço | | 1495 | """Mét | | 1496 | odo auxiliar utilizado alimentar | | 1497 | log do histórico de inicialização | | 1498 | /interrupção do serviço/daemon""" | | 1499 | | | 1500 | evento = | | 1501 | 'CHECK' if not evento else evento | | 1502 | evento = evento.upper() | | ::: | | | | TimeStamp = datetime.datetime.no | | | w().strftime('%Y-%m-%d %H:%M:%S') | | | Message = f'{TimeSt | | | amp} - {evento} - {self.OS} - {se | | | lf.Host}/{self.IP} - PID: {self.P | | | ID} - {self.Usuario} - {self.Me}' | | | try: | | | with | | | open(self.LogFile, 'a') as file: | | | | | | file.write(Message + '\n') | | | except Exception as Err: | | | | | | self.log.erro(f'Erro [{Er | | | r}] ao gravar status do daemon em | | | {self.LogFile}') | | | | | |
| +-----------------------------------+-----------------------------------+ ::: :::: :::::

::::: {.doc .doc-object .doc-function}

[run{.highlight .language-python}]{.n}[(){.highlight .language-python}]{.p} {#lbxtoolkit.Servicer.run .doc .doc-heading}

:::: {.doc .doc-contents} Inicia a execução do do serviço

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 1444 | | | 1445 | def run(self): | | 1446 | """I | | 1447 | nicia a execução do do serviço""" | | 1448 | | | 1449 | self.on_run_pre() ## | | 1450 | método opcional a ser definito p | | 1451 | or sobrecarga na função local | | 1452 | | | 1453 | self.daemon_log('START') | | 1454 | ## Gera o PIDFile | | 1455 | self.log.ad | | 1456 | d(f'Iniciando daemon [PID: {self. | | 1457 | PID}] para monitorar os processos | | 1458 | que rodam como serviço/daemon mo | | 1459 | nitorados em: {self.PIDDir}... ') | | 1460 | try: | | 1461 | with open(self.PIDF | | 1462 | ile, 'w', encoding='utf-8') as f: | | ::: | f.write(self. | | | Me + ';' + str(self.LogFile)) | | | except Exception as Err: | | | self.stop('CRASH') | | | | | | self.log.erro(f'Erro [{Err}] ao s | | | alvar PIDFile: {self.PIDFile}') | | | self.log.info(f'Ok!' | | | ) ## trocar para debug em prd ?? | | | | | | | | | self.on_run() # função princ | | | ipal para interreper o daemon/ser | | | viço, definir localmente por sobr | | | ecarga (criar classe que herde es | | | sa classe e defina essa função) | | | | | | self.on_run_pos() ## | | | método opcional a ser definito p | | | or sobrecarga na função local | | | | | |
| +-----------------------------------+-----------------------------------+ ::: :::: :::::

::::: {.doc .doc-object .doc-function}

[stop{.highlight .language-python}]{.n}[({.highlight .language-python}]{.p}[evento{.highlight .language-python}]{.n}[={.highlight .language-python}]{.o}['STOP'{.highlight .language-python}]{.s1}[){.highlight .language-python}]{.p} {#lbxtoolkit.Servicer.stop .doc .doc-heading}

:::: {.doc .doc-contents} Interrompe o daemon/serviço

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 1465 | | | 1466 | | | 1467 | def stop(self, evento='STOP'): | | 1468 | | | 1469 | """Interrompe o daemon/serviço""" | | 1470 | | | 1471 | self.on_stop_pre() ## mét | | 1472 | odo opcional a ser definito por s | | 1473 | obrecarga na função local | | 1474 | | | 1475 | self.daemon_log(evento) | | ::: | | | | self.on_stop() # função pri | | | ncipal para interreper o daemon/s | | | erviço, definir localmente por so | | | brecarga (criar classe que herde | | | essa classe e defina essa função) | | | self.cleanup() | | | self.exit=True | | | | | | self.on_stop_pos() ## mét | | | odo opcional a ser definito por s | | | obrecarga na função local | | | | | |
| +-----------------------------------+-----------------------------------+ ::: :::: ::::: ::::::::::::::: ::::::::::::::::: ::::::::::::::::::

:::::: {.doc .doc-object .doc-class}

api_rest {#lbxtoolkit.api_rest .doc .doc-heading}

::::: {.doc .doc-contents}

Classe api_rest {#lbxtoolkit.api_rest--classe-api_rest}

Destina-se a interatir com APIs RESTfull, em especial as publicadas pela SoftPlan para a Plataforma Sienge.

A classe deve ser instanciada conforme sintaxe abaixo:

api_rest(url, credenciais, cadencia, timeout=6, logger=None, headers={"Content-Type": "application/json"}, verify=True)

São nessários 2 parâmetros posicionais obrigatórios, e 5 parametros nominais facultativos (valor padrão, se omisso, indicado na sintaxe acima): - url: o endereço da URL de autenticação da API - crednciais: Dicionário com credenciais de autenticação. - cadencia Número máximo de chamadas por segudo à API - timeout Tempo máximo (segundos) para aguardar retorno à chamada. Padrão 6s, se omisso. - logger O objeto log handler para lidar com as informações de saída. Se não informado, todas as saídas serão direcionadas para a stdout. - headers Cabeçalhos http para a requisição à API. - verify Verifica a validade do certificado SSL do servidor de destino da requisição.

Quanto às credenciais de autenticação, assim como a classe de interação com o PostgreSQL, elas precisam ser fornecidas na forma de um dicionário. Para o método api_rest.aut_basic(), o formato deve ser:

credenciais = {
                'user': 'USUARIO_API',
                'password': 'TOKEN_USUARIO'
            }

Caso a autenticação seja pelo método api_rest.aut_bearer(), o dicionário deve corresponder ao formato previsto pelo endpoint e seu conteúdo será enviado como um JSON ao endereço indicado no parametro url

A classe possui 3 métodos: - api_rest.auth_basic(): instanciamento da sessão autenticando pelo método HTTPBasicAuth - api_rest.auth_bearer(): instanciamento da sessão autenticando pelos métodos OAuth, JWT, Bearer\

  • api_rest.endpoint_json([endereço], [método], payload=None): para a chamada ao endpoint - close() para encerra a instância/sessão

O consumo é feito pelo método api_rest.endpoint_json que suporta apenas APIs cujo payload (opcional) seja aceito no formato JSON.

Esse método espera 2 parametros posicionais obrigatórios: o endereço do endpoint e o verbo (get, post, patch ou put), tendo parametro opcional o objeto de 'payload' (json). Note que o endereço do endpoint deve ser informado completo. A URL informada no instanciamento da classe corresponde apenas ao endereço de autenticação.

O tempo, em segundos, transcorrido entre a chamada a atual e a chamada anterior ao endpoint pode ser consultado pelo argumento .Intervalo no objeto recebido do retorno à chamada ao método .endpoint_json.

Da mesma forma, o tempo de espera imposto para respeitar a cadência do webservcie também pode ser consultado pelo argumento .Espera.

Exemplo de uso:

from lbx_toolkit import api_rest

UrlBase=r'https://api.sienge.com.br/lbx/public/api/v1'
Credenciais = {
                'user': 'USUARIO_API',
                'password': 'TOKEN_USUARIO'
            }
ApiSienge = api_rest(UrlBase,Credenciais,2.5) # limite de 2 requisições/segundo para cadência de chamada ao endpoint
Auth = ApiSienge.auth_basic()

Nutitulo=input('Numero do título:')
Nuparcela=input('Numero da parcela:')
Vencimento=input('Vencimento [AAAA-MM-DD]:')
Payload = {
                "dueDate": f"{Vencimento}"
            }
EndPoint = f'{UrlBase}/bills/{Nutitulo}/installments/{Nuparcela}'

#chama o endpoint e recebe o retorno no objeto AlteraVcto
AlteraVcto = ApiSienge.endpoint_json(EndPoint, 'patch', Payload)

No exemplo acima não é esperado que o endpoint retorne nenhum dado (patch).

Quando se usa o verbo get e se espera o retorno de algum dado, use o método .json do pacote request para acessar o objeto recebido.

Para uso em APIs com autenticação JWT (JSON Web Token), OAuth, Bearer Token Authentication, a construção é a mesma indicada acima, bastando-se usar .auth_bearer() ao invés de .auth_basic(), e ajustar o dicionário credenciais informado no instanciamento da classe, que deve ser estruturado conforme o padrão fornecido peo mantendor da API e será enviado como payload ao endpoint (json=credenciais).

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 709 | | | 710 | class api_ | | 711 | rest: # Classe para interação com | | 712 | APIs Rest (especialmente Sienge) | | 713 | """ | | 714 | #### Classe api_rest | | 715 | | | 716 | Destina-se | | 717 | a interatir com APIs RESTfull, e | | 718 | m especial as publicadas pela Sof | | 719 | tPlan para a [Plataforma Sienge]( | | 720 | https://api.sienge.com.br/docs/). | | 721 | | | 722 | A classe deve ser ins | | 723 | tanciada conforme sintaxe abaixo: | | 724 | | | 725 | api_rest(url, credenc | | 726 | iais, cadencia, timeout=6, logger | | 727 | =None, headers={"Content-Type": " | | 728 | application/json"}, verify=True) | | 729 | | | 730 | São nessári | | 731 | os 2 parâmetros posicionais obrig | | 732 | atórios, e 5 parametros nominais | | 733 | facultativos (valor padrão, se om | | 734 | isso, indicado na sintaxe acima): | | 735 | - url: o ender | | 736 | eço da URL de autenticação da API | | 737 | | | 738 | - crednciais: Dicionário | | 739 | com credenciais de autenticação. | | 740 | - cadencia Número máxim | | 741 | o de chamadas por segudo à API | | 742 | - timeout Tempo máximo | | 743 | (segundos) para aguardar retorno | | 744 | à chamada. Padrão 6s, se omisso. | | 745 | - logger | | 746 | O objeto log handler para lid | | 747 | ar com as informações de saída. S | | 748 | e não informado, todas as saídas | | 749 | serão direcionadas para a stdout. | | 750 | - headers Cabeçalho | | 751 | s http para a requisição à API. | | 752 | - verify Verifica a | | 753 | validade do certificado SSL do s | | 754 | ervidor de destino da requisição. | | 755 | | | 756 | Quanto às creden | | 757 | ciais de autenticação, assim como | | 758 | a classe de interação com o Post | | 759 | greSQL, elas precisam ser forneci | | 760 | das na forma de um dicionário. | | 761 | Para o método api_rest.a | | 762 | ut_basic(), o formato deve ser: | | 763 | | | 764 | credenciais = { | | 765 | | | 766 | 'user': 'USUARIO_API', | | 767 | | | 768 | 'password': 'TOKEN_USUARIO' | | 769 | } | | 770 | | | 771 | Caso a au | | 772 | tenticação seja pelo método api_ | | 773 | rest.aut_bearer(), o dicionário | | 774 | deve corresponder ao formato prev | | 775 | isto pelo endpoint e seu conteúdo | | 776 | será enviado como um JSON ao end | | 777 | ereço indicado no parametro url | | 778 | | | 779 | | | 780 | | | 781 | A classe possui 3 métodos: | | 782 | - api_rest.auth_basic() | | 783 | : instanciamento da sessão auten | | 784 | ticando pelo método HTTPBasicAuth | | 785 | - | | 786 | api_rest.auth_bearer(): instanci | | 787 | amento da sessão autenticando pel | | 788 | os métodos OAuth, JWT, Bearer | | 789 | - api_rest.endpoint_jso | | 790 | n([endereço], [método], payload=N | | 791 | one): para a chamada ao endpoint | | 792 | - close() | | 793 | para encerra a instância/sessão | | 794 | | | 795 | | | 796 | O consumo é feito pelo método ap | | 797 | i_rest.endpoint_json que suporta | | 798 | apenas APIs cujo payload (opcion | | 799 | al) seja aceito no formato JSON. | | 800 | | | 801 | Esse mét | | 802 | odo espera 2 parametros posiciona | | 803 | is obrigatórios: o endereço do en | | 804 | dpoint e o verbo (get, post, patc | | 805 | h ou put), tendo parametro opcion | | 806 | al o objeto de 'payload' (json). | | 807 | Note que o endereço d | | 808 | o endpoint deve ser informado com | | 809 | pleto. A URL informada no instanc | | 810 | iamento da classe corresponde ape | | 811 | nas ao endereço de autenticação. | | 812 | | | 813 | O tempo, e | | 814 | m segundos, transcorrido entre a | | 815 | chamada a atual e a chamada anter | | 816 | ior ao endpoint pode ser consulta | | 817 | do pelo argumento .Intervalo no | | 818 | objeto recebido do retorno à cha | | 819 | mada ao método .endpoint_json. | | 820 | | | 821 | Da | | 822 | mesma forma, o tempo de espera i | | 823 | mposto para respeitar a cadência | | 824 | do webservcie também pode ser con | | 825 | sultado pelo argumento .Espera. | | 826 | | | 827 | Exemplo de uso: | | 828 | | | 829 | | | 830 | | | 831 | from lbx_toolkit import api_rest | | 832 | | | 833 | UrlBase=r'https://api | | 834 | .sienge.com.br/lbx/public/api/v1' | | 835 | Credenciais = { | | 836 | | | 837 | 'user': 'USUARIO_API', | | 838 | | | 839 | 'password': 'TOKEN_USUARIO' | | 840 | } | | 841 | ApiSienge = api_r | | 842 | est(UrlBase,Credenciais,2.5) # li | | 843 | mite de 2 requisições/segundo par | | 844 | a cadência de chamada ao endpoint | | 845 | | | 846 | Auth = ApiSienge.auth_basic() | | 847 | | | 848 | Nu | | 849 | titulo=input('Numero do título:') | | 850 | Nupa | | 851 | rcela=input('Numero da parcela:') | | 852 | Vencimento= | | 853 | input('Vencimento [AAAA-MM-DD]:') | | 854 | Payload = { | | 855 | | | 856 | "dueDate": f"{Vencimento}" | | 857 | } | | 858 | | | 859 | EndPoint = f'{UrlBase}/bills/{Nu | | 860 | titulo}/installments/{Nuparcela}' | | 861 | | | 862 | #chama o endpoint e rece | | 863 | be o retorno no objeto AlteraVcto | | 864 | | | 865 | AlteraVcto = ApiSienge.endpoint | | 866 | _json(EndPoint, 'patch', Payload) | | 867 | | | 868 | | | 869 | No exemplo | | 870 | acima não é esperado que o endpoi | | 871 | nt retorne nenhum dado (patch). | | 872 | | | 873 | Quand | | 874 | o se usa o verbo get e se esper | | 875 | a o retorno de algum dado, use o | | 876 | método .json do pacote request | | 877 | para acessar o objeto recebido. | | 878 | | | 879 | Para uso e | | 880 | m APIs com autenticação JWT (JSON | | 881 | Web Token), OAuth, Bearer Token | | 882 | Authentication, a construção é a | | 883 | mesma indicada acima, bastando-se | | 884 | usar .auth_bearer() ao invés d | | 885 | e .auth_basic(), e ajustar o di | | 886 | cionário credenciais informado | | 887 | no instanciamento da classe, que | | 888 | deve ser estruturado conforme o p | | 889 | adrão fornecido peo mantendor da | | 890 | API e será enviado como payload a | | 891 | o endpoint (json=credenciais). | | 892 | | | 893 | """ | | 894 | | | 895 | def init(self, url, credencia | | 896 | is, cadencia=3, timeout=6, logger | | 897 | =None, headers={"Content-Type": " | | 898 | application/json"}, verify=True): | | 899 | | | 900 | self.logger = logger if not l | | 901 | ogger is None else lbx_logger(Non | | 902 | e, logging.DEBUG, '%(levelname)s: | | 903 | %(message)s') # se não fornecer | | ::: | o logger, vai tudo para o console | | | | | | | | | if not validators.url(url): | | | self.logger. | | | critico('URL inválida: {url}. Pri | | | meiro parametro precisar uma URL | | | válida. Script abortado', exit=1) | | | if n | | | ot isinstance(credenciais, dict): | | | self. | | | logger.critico('O segundo paramet | | | ro posicional precisa ser um dici | | | onário. Script abortado', exit=1) | | | | | | self.RetEn | | | dPoint = None # Inicializa self. | | | RetEndPoint como None | | | | | | self.Headers = headers | | | | | | self.Verify = verify | | | self.Url = url | | | | | | self.Timeout = timeout | | | | | | self.Credenciais = credenciais | | | self.Cadencia = 1/cad | | | encia ## candencia corresponde a | | | chamadas por segundo, não minuto | | | | | | self.TempoUltReq = None | | | self.I | | | ntervalo = self.Cadencia + 1 | | | # | | | # | | | def controla_cadencia(s | | | elf): ## para controle apenas, nã | | | o deve ser chamada fora da classe | | | | | | # Verificar o tempo atual | | | Agora = time.time() | | | | | | # Cal | | | cular intervalo entre requisições | | | if self.TempoUltReq: | | | self.Int | | | ervalo = Agora - self.TempoUltReq | | | else: | | | | | | self.Intervalo = float('inf') | | | # Primeira requisição não espera | | | | | | | | | # Calcular o tempo de espera n | | | ecessário para respeitar o limite | | | i | | | f self.Intervalo < self.Cadencia: | | | self.Espera | | | = self.Cadencia - self.Intervalo | | | | | | time.sleep(self.Espera) | | | | | | return self.Espera | | | else: | | | self.Espera = 0 | | | r | | | eturn self.Espera, self.Intervalo | | | # | | | # | | | def auth_basic(self): | | | # Autentica e abre sessão na API | | | if | | | not self.Credenciais['user'] or | | | not self.Credenciais['password']: | | | | | | self.logger.critico('Dicioná | | | rio de credenciais não possui cha | | | ves "user" e/ou "password". Scrip | | | t abortado', exit=1) | | | try: | | | | | | self.Sessao = requests.Session() | | | | | | #Sessao.auth = (ApiUser, ApiPass) | | | | | | self.Sessao.auth = HTT | | | PBasicAuth(self.Credenciais['user | | | '], self.Credenciais['password']) | | | Au | | | th = self.Sessao.post(self.Url) | | | #pri | | | nt(f'Status: {Auth.status_code}') | | | | | | #print(f'Retorno: {Auth.text}') | | | | | | return self.Sessao | | | | | | except Exception as Err: | | | self.logger.c | | | ritico(f"Falha ao autenticar API: | | | {Err}. URL: {self.Url}", exit=1) | | | # | | | # | | | def auth_bearer(self): | | | # Autentica e abre sessão na API | | | | | | #self.UrlLogin = UrlLogin if U | | | rlLogin is not None else self.Url | | | try: | | | | | | self.Sessao = requests.Session() | | | Token = self | | | .Sessao.post(self.Url, headers=se | | | lf.Headers, json=self.Credenciais | | | , verify=self.Verify) | | | | | | self.Headers.update({"Authori | | | zation": f"Bearer {Token.text}"}) | | | if | | | 200 <= Token.status_code <= 299: | | | self.Sessa | | | o.status_code = Token.status_code | | | | | | self.Sessao.token = Token.text | | | | | | return self.Sessao | | | else: | | | | | | self.logger.critico(f"Erro a | | | o autenticar API: {Token.status_c | | | ode} - {Token.text}", exit=1) | | | | | | except Exception as Err: | | | self.logger.criti | | | co(f"Falha ao autenticar API: {Er | | | r}. URL: {self.Url}", exit=1) | | | # | | | # | | | def endpoint_js | | | on(self, endpoint, metodo, payloa | | | d=None): # Interage com End Point | | | | | | self.ult_tempo_req = time.time() | | | | | | self.Metodo = metodo.lower() | | | #se | | | lf.EndPoint = self.Url + endpoint | | | | | | self.EndPoint = endpoint | | | | | | self.Payload = payload | | | MetodosAceitos | | | = ['post', 'get', 'patch', 'put'] | | | | | | if not any(element in self.Metod | | | o for element in MetodosAceitos): | | | | | | self.logger.critico | | | (f'Método {self.Metodo} não previ | | | sto. Abortando chamada!', exit=1) | | | else: | | | ChamadaApi = | | | f'self.Sessao.{self.Metodo}(self | | | .EndPoint, timeout=self.Timeout, | | | headers=self.Headers, verify=self | | | .Verify)' if self.Payload is None | | | else f'self.Sessao.{self.Metodo} | | | (self.EndPoint, timeout=self.Time | | | out, headers=self.Headers, verify | | | =self.Verify, json=self.Payload)' | | | | | | self.controla_cadencia() | | | | | | self.TempoUltReq = time.time() | | | try: | | | se | | | lf.RetEndPoint = eval(ChamadaApi) | | | if sel | | | f.RetEndPoint.status_code >= 500: | | | | | | self.logger.critico(f'Erro {se | | | lf.RetEndPoint.status_code} na ch | | | amada do endpoint: {Err}\nEndpoin | | | t: {self.EndPoint}\nResposta: {se | | | lf.RetEndPoint.text}', exit=1) | | | | | | self.RetEndPoint. | | | Espera = self.Espera ## adiona o | | | tempo de espera ao retorno da API | | | | | | self.RetEndPoint.Intervalo = sel | | | f.Intervalo ## adiciona o interva | | | lo entre chamada ao retorno da AP | | | I | | | | | | return self.RetEndPoint | | | except reques | | | ts.exceptions.ReadTimeout as Err: | | | self.lo | | | gger.critico(f'Excedido tempo lim | | | ite {self.Timeout} para retorno d | | | o endpoint: {Err}\nEndpoint: {sel | | | f.EndPoint}', exit=1) | | | | | | except Exception as Err: | | | self.logger | | | .critico(f'Falha na chamada do en | | | dpoint: {Err}\nEndpoint: {self.En | | | dPoint}\nCodigo retorno: {self.Re | | | tEndPoint.status_code}\nResposta: | | | {self.RetEndPoint.text}', exit=1) | | | # | | | # | | | def tr | | | ata_erro_sienge(CodRet, Retorno): | | | if | | | not 200 <= CodRet <= 299: | | | try: | | | DicR | | | etorno = eval(Retorno.replace('nu | | | ll','None').replace(r'\n\t',' ')) | | | if 'c | | | lientMessage' in DicRetorno and D | | | icRetorno['clientMessage'] not in | | | ['None', None, '', ' ', 'null']: | | | MsgE | | | rro = DicRetorno['clientMessage'] | | | elif 'develop | | | erMessage' in DicRetorno and DicR | | | etorno['developerMessage'] not in | | | ['None', None, '', ' ', 'null']: | | | MsgErro | | | = DicRetorno['developerMessage'] | | | | | | elif 'message' in DicRetorno | | | and DicRetorno['message'] not in | | | ['None', None, '', ' ', 'null']: | | | | | | MsgErro = DicRetorno['message'] | | | else: | | | | | | MsgErro = Retorno | | | except: | | | MsgErro = Ret | | | orno.replace(r'\n\t',' ') | | | finally: | | | | | | return MsgErro | | | else: | | | | | | return Retorno | | | # | | | # | | | de | | | f close(self): # Encerra a cessão | | | self. | | | Sessao.close() | | | | | |
| +-----------------------------------+-----------------------------------+ :::

::: {.doc .doc-children} ::: ::::: ::::::

:::::: {.doc .doc-object .doc-class}

auth_EntraID {#lbxtoolkit.auth_EntraID .doc .doc-heading}

::::: {.doc .doc-contents}

Classe auth_EntraID {#lbxtoolkit.auth_EntraID--classe-auth_entraid}

Este recurso tem o propósito de controlar as permissões de execução do script usando as credencias do ambiente AD em nuvem da Microsoft (Azure AD >> Microsoft Entra ID), abortando se a autentição falhar ou o usuário não pertencer ao grupo.

Essa classe possui apenas dois métodos:

  • auth_EntraID.disclaimer(): apenas exibe uma tela de informações/instruções ao usuário.

  • auth_EntraID.valida_grupo([client_id], [client_secret], [tenant_id], timeout=60, log_file='auth_EntraID.log'): efetua a autenticação do usuário e verifica se ele pertence ao grupo informado, abortando a execução caso não pertença ao grupo ou a autenticação não seja validada no tempo estabelecido. Os argumentos timeout e log_file são opcionais e, se omitidos, os valores aqui atribuídos serão adotados como padrão.

É necessário obter parametros da plataforma de identidade da Microsoft (AD Azure, agora Microsoft Entra ID), no Centro de administração do Microsoft Entra. Sugerimos não armazenar estas ou outras informações sensíveis no script. Considere usar o pacote dotenv para isso.

Os argumentos obrigatórios (posicionais) são:

1) tenant_id corresponde ao campo ID do Locatário, que pode ser obtido na página visão geral de identidade do domínio

2) client_id corresponde ao ID do aplicativo (cliente), obtido na secção Identidade > Aplicativos > Registros de Aplicativo. Considere não reaproveitar aplicativos e criar um específico para essa finalidade.

3) secret_id corresponde ao Valor do ID secreto (não ao próprio ID Secreto) do aplicativo. Este token não é passivel de consulta após gerado e para obtê-lo, é necessário criar um novo segredo para o aplicativo na subsecção "Certificados e Segredos", após clicar no nome do aplicativo exibo na indicada no item (2). O token (Valor do segredo) deve ser copiado e anotado no ato da criação, pois não é possível consultá-lo posteriormente.

from lbx_toolkit import auth_EntraID

client_id = 'SEU_CLIENT_ID'
client_secret = 'SEU_CLIENT_SECRET'
tenant_id = 'SEU_TENANT_ID'

# inicializa instância
auth = auth_EntraID(client_id, client_secret, tenant_id, timeout=60, log_file='auth_EntraID.log')  

# exibe a mensagem padrão de aviso
auth.disclaimer()

auth.valida_grupo('Nome do Grupo de Distribuição') 
# se usuário não pertencer a grupo informado, a execução do script é abortada.

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 100 | | | 101 | | | 102 | class auth_EntraID: # Classe d | | 103 | e autenticação de usuários no Mic | | 104 | rosoft Entra ID (antiga Azure AD) | | 105 | """ | | 106 | | | 107 | #### Classe auth_EntraID | | 108 | | | 109 | Este rec | | 110 | urso tem o propósito de controlar | | 111 | as permissões de execução do scr | | 112 | ipt usando as credencias do ambie | | 113 | nte AD em nuvem da Microsoft (Azu | | 114 | re AD >> Microsoft Entra ID), abo | | 115 | rtando se a autentição falhar ou | | 116 | o usuário não pertencer ao grupo. | | 117 | | | 118 | Essa c | | 119 | lasse possui apenas dois métodos: | | 120 | | | 121 | - auth_EntraID.disclaim | | 122 | er(): apenas exibe uma tela de i | | 123 | nformações/instruções ao usuário. | | 124 | | | 125 | | | 126 | - auth_EntraID.valida_grupo | | 127 | ([client_id], [client_secret], [t | | 128 | enant_id], timeout=60, log_file=' | | 129 | auth_EntraID.log'): efetua a aut | | 130 | enticação do usuário e verifica s | | 131 | e ele pertence ao grupo informado | | 132 | , abortando a execução caso não | | 133 | pertença ao grupo ou a autenticaç | | 134 | ão não seja validada no tempo est | | 135 | abelecido. Os argumentos timeout | | 136 | e log_file são opcionais e, s | | 137 | e omitidos, os valores aqui atrib | | 138 | uídos serão adotados como padrão. | | 139 | | | 140 | É necessário obter para | | 141 | metros da plataforma de identidad | | 142 | e da Microsoft (AD Azure, agora M | | 143 | icrosoft Entra ID), no Centro d | | 144 | e administração do Microsoft Entr | | 145 | a. | | 146 | Sugerimos não arm | | 147 | azenar estas ou outras informaçõe | | 148 | s sensíveis no script. Considere | | 149 | usar o pacote dotenv para isso. | | 150 | | | 151 | Os argumento | | 152 | s obrigatórios (posicionais) são: | | 153 | | | 154 | 1) | | 155 | tenant_id corresponde ao campo * | | 156 | ID do Locatário*, que pode ser ob | | 157 | tido na página [visão geral de id | | 158 | entidade do domínio](https://entr | | 159 | a.microsoft.com/#blade/Microsoft_ | | 160 | AAD_IAM/TenantOverview.ReactView) | | 161 | | | 162 | 2) client_id corres | | 163 | ponde ao ID do aplicativo (clien | | 164 | te), obtido na secção [Identida | | 165 | de > Aplicativos > Registros de A | | 166 | plicativo](https://entra.microso | | 167 | ft.com/#view/Microsoft_AAD_Regist | | 168 | eredApps/ApplicationsListBlade/qu | | 169 | ickStartType~/null/sourceType/Mic | | 170 | rosoft_AAD_IAM). Considere não re | | 171 | aproveitar aplicativos e criar um | | 172 | específico para essa finalidade. | | 173 | | | 174 | 3) secret_id cor | | 175 | responde ao Valor do ID secret | | 176 | o (não ao próprio ID Secreto) do | | 177 | aplicativo. Este token não é pas | | 178 | sivel de consulta após gerado e p | | 179 | ara obtê-lo, é necessário criar u | | 180 | m novo segredo para o aplicativo | | 181 | na subsecção "Certificados e Seg | | 182 | redos", após clicar no nome do a | | 183 | plicativo exibo na indicada no it | | 184 | em (2). O token (Valor do segred | | 185 | o) deve ser copiado e anotado no | | 186 | ato da criação, pois não é poss | | 187 | ível consultá-lo posteriormente. | | 188 | | | 189 | | | 190 | | | 191 | fro | | 192 | m lbx_toolkit import auth_EntraID | | 193 | | | 194 | | | 195 | client_id = 'SEU_CLIENT_ID' | | 196 | cl | | 197 | ient_secret = 'SEU_CLIENT_SECRET' | | 198 | | | 199 | tenant_id = 'SEU_TENANT_ID' | | 200 | | | 201 | # inicializa instância | | 202 | | | 203 | auth = auth_EntraID(client_id, cl | | 204 | ient_secret, tenant_id, timeout=6 | | 205 | 0, log_file='auth_EntraID.log') | | 206 | | | 207 | # | | 208 | exibe a mensagem padrão de aviso | | 209 | auth.disclaimer() | | 210 | | | 211 | auth.valida_grupo( | | 212 | 'Nome do Grupo de Distribuição') | | 213 | # se usuário | | 214 | não pertencer a grupo informado, | | 215 | a execução do script é abortada. | | 216 | | | 217 | """ | | 218 | def _ | | 219 | init_(self, client_id, client_s | | 220 | ecret, tenant_id, grupo, timeout= | | 221 | 60, log_file='auth_EntraID.log'): | | 222 | | | 223 | self.client_id = client_id | | 224 | s | | 225 | elf.client_secret = client_secret | | 226 | | | 227 | self.tenant_id = tenant_id | | 228 | | | 229 | self.timeout = timeout | | 230 | self.grupo = grupo | | 231 | self | | 232 | .authority = f"https://login.micr | | 233 | osoftonline.com/{self.tenant_id}" | | 234 | self.scope = ["https | | 235 | ://graph.microsoft.com/.default"] | | 236 | self.redir | | 237 | ect_uri = "http://localhost:8000" | | 238 | self.response = "" | | 239 | self.status_code = 0 | | 240 | self.server = None | | 241 | | | 242 | self.log_file = log_file | | 243 | | | 244 | # Configura o logger | | 245 | | | 246 | logging.basicConfig(filen | | 247 | ame=log_file, level=logging.INFO, | | 248 | | | 249 | format='%(asctime)s - %(name) | | 250 | s - %(levelname)s - %(message)s') | | 251 | self.log | | 252 | ger = logging.getLogger(name) | | 253 | # | | 254 | # | | 255 | | | 256 | def valida_grupo(self): # Valid | | 257 | a se o usuário autenticado perten | | 258 | ce a grupo de segurança informado | | 259 | # Redireciona std | | 260 | out e stderr para arquivos de log | | 261 | | | 262 | original_stdout = sys.stdout | | 263 | | | 264 | original_stderr = sys.stderr | | 265 | sys | | 266 | .stdout = open('stdout.log', 'a') | | 267 | sys | | 268 | .stderr = open('stderr.log', 'a') | | 269 | | | 270 | | | 271 | # Configurações do Selenium | | 272 | | | 273 | chrome_options = Options() | | 274 | chrome_op | | 275 | tions.add_argument("--incognito") | | 276 | | | 277 | | | 278 | # Inicializa a aplicação MSAL | | 279 | try: | | 280 | app = ms | | 281 | al.ConfidentialClientApplication( | | 282 | | | 283 | self.client_id, | | 284 | | | 285 | authority=self.authority, | | 286 | clie | | 287 | nt_credential=self.client_secret, | | 288 | ) | | 289 | | | 290 | except BaseException as err: | | 291 | print(f'Falha a | | 292 | o iniciar aplicação MSAL: {err}') | | 293 | | | 294 | # Restaura saída padrão | | 295 | | | 296 | sys.stdout.close() | | 297 | | | 298 | sys.stderr.close() | | 299 | | | 300 | sys.stdout = original_stdout | | 301 | | | 302 | sys.stderr = original_stderr | | 303 | print(f | | 304 | 'Script abortado por falha aplica | | 305 | ção MSAL. Verifque logs: {self.lo | | 306 | g_file}, stdout.log e sterr.log') | | 307 | os._exit(0) | | 308 | | | 309 | # Inicia | | 310 | o fluxo de código de autorização | | 311 | try: | | 312 | | | 313 | flow = app.initiate | | 314 | _auth_code_flow(scopes=self.scope | | 315 | , redirect_uri=self.redirect_uri) | | 316 | | | 317 | auth_url = flow["auth_uri"] | | 318 | | | 319 | self.response = f"Acessando a URL | | 320 | de autenticação Microsoft Entra | | 321 | ID (antiga Azure AD): {auth_url}" | | 322 | | | 323 | self.logger.info(self.response) | | 324 | | | 325 | except BaseException as err: | | 326 | print(f'Falha no | | 327 | fluxo de autorização Microsoft En | | 328 | tra ID (antiga Azure AD): {err}') | | 329 | | | 330 | # Restaura saída padrão | | 331 | | | 332 | sys.stdout.close() | | 333 | | | 334 | sys.stderr.close() | | 335 | | | 336 | sys.stdout = original_stdout | | 337 | | | 338 | sys.stderr = original_stderr | | 339 | | | 340 | print(f'Script abort | | 341 | ado por falha no fluxo de autoriz | | 342 | ação Microsoft Entra ID (antiga A | | 343 | zure AD). Verifque logs: {self.lo | | 344 | g_file}, stdout.log e sterr.log') | | 345 | | | 346 | os._exit(0) | | 347 | | | 348 | | | 349 | # Inicializa o ChromeDri | | 350 | ver com redirecionamento de saída | | 351 | try: | | 352 | service = Service | | 353 | (ChromeDriverManager().install()) | | 354 | service.start() | | 355 | | | 356 | driver = webdriver.Chrome(service | | 357 | =service, options=chrome_options) | | 358 | | | 359 | driver.get(auth_url) | | 360 | | | 361 | except BaseException as err: | | 362 | print(f'Falha na | | 363 | inicialização do Chrome: {err}') | | 364 | | | 365 | # Restaura saída padrão | | 366 | | | 367 | sys.stdout.close() | | 368 | | | 369 | sys.stderr.close() | | 370 | | | 371 | sys.stdout = original_stdout | | 372 | | | 373 | sys.stderr = original_stderr | | 374 | print(f'S | | 375 | cript abortado na inicialização d | | 376 | o Chrome. Verifque logs: {self.lo | | 377 | g_file}, stdout.log e sterr.log') | | 378 | | | 379 | os.exit(0) | | 380 | # | | 381 | # | | 382 | class Aut | | 383 | hHandler(BaseHTTPRequestHandler): | | 384 | def | | 385 | log_message(self, format, *args): | | 386 | self.server | | 387 | .logger.info("%s - - [%s] %s\n" % | | 388 | | | 389 | | | 390 | (self.client_address[0], | | 391 | | | 392 | | | 393 | self.log_date_time_string(), | | 394 | | | 395 | format % args)) | | 396 | # | | 397 | # | | 398 | def do_GET(self): | | 399 | parsed_pa | | 400 | th = urlparse.urlparse(self.path) | | 401 | | | 402 | query_params = url | | 403 | parse.parse_qs(parsed_path.query) | | 404 | | | 405 | self.send_response(200) | | 406 | self.send_he | | 407 | ader('Content-type', 'text/html') | | 408 | | | 409 | self.end_headers() | | 410 | | | 411 | # Captura o | | 412 | código de autorização e o estado | | 413 | | | 414 | if 'code' in query_pa | | 415 | rams and 'state' in query_params: | | 416 | | | 417 | self.server.au | | 418 | th_code = query_params['code'][0] | | 419 | | | 420 | self.server | | 421 | .state = query_params['state'][0] | | 422 | | | 423 | self.wfile.write(b""" | | 424 | | | 425 | | | 426 | | | 427 | | | 428 | | | 429 | | | 430 | | | 431 | | | 432 | | | 433 | <met | | 434 | a name="viewport" content="width= | | 435 | device-width, initial-scale=1.0"> | | 436 | | | 437 | <style> | | 438 | | | 439 | body { | | 440 | | | 441 | | | 442 | font-family: 'Arial', sans-serif; | | 443 | | | 444 | | | 445 | background-color: #f8f9fa; | | 446 | | | 447 | margin: 0; | | 448 | | | 449 | | | 450 | font-size: 16px; | | 451 | | | 452 | | | 453 | padding: 30px; | | 454 | | | 455 | | | 456 | display: flex; * | | 457 | | | 458 | } | | 459 | | | 460 | | | 461 | | | 462 | .container { | | 463 | | | 464 | | | 465 | width: 100%; | | 466 | | | 467 | | | 468 | margin: auto; | | 469 | | | 470 | | | 471 | background-color: #ffffff; | | 472 | | | 473 | box-sha | | 474 | dow: 0 0 10px rgba(0, 0, 0, 0.1); | | 475 | | | 476 | | | 477 | padding: 16px; | | 478 | | | 479 | | | 480 | text-align: center; | | 481 | | | 482 | | | 483 | font-size: 16px; | | 484 | | | 485 | | | 486 | border-radius: 8px; | | 487 | | | 488 | } | | 489 | | | 490 | | | 491 | h1 { | | ::: | | | | | | | font-size: 18px; | | | | | | | | | text-align: center; | | | | | | | | | color: #007bff; | | | | | | } | | | | | | </style> | | | | | | | | | | | | | | |
| | | | | | | | |

Autenticaç&# | | | 227;o realizada com sucesso!

| | | | | | A | | | guarde que esta página ser&# | | | 225; fechada automaticamente.
| | | | | | | | | Se isto não acontecer | | | , pode fechá-la manualmente. | | | | | |
| | | | | | | | | | | | """) | | | else: | | | | | | self.wfile.write(b""" | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | <met | | | a name="viewport" content="width= | | | device-width, initial-scale=1.0"> | | | | | | <style> | | | | | | body { | | | | | | | | | font-family: 'Arial', sans-serif; | | | | | | | | | background-color: #f8f9fa; | | | | | | margin: 0; | | | | | | | | | font-size: 16px; | | | | | | | | | padding: 30px; | | | | | | | | | display: flex; * | | | | | | } | | | | | | | | | | | | .container { | | | | | | | | | width: 100%; | | | | | | | | | margin: auto; | | | | | | | | | background-color: #ffffff; | | | | | | box-sha | | | dow: 0 0 10px rgba(0, 0, 0, 0.1); | | | | | | | | | padding: 16px; | | | | | | | | | text-align: center; | | | | | | | | | font-size: 16px; | | | | | | | | | border-radius: 8px; | | | | | | } | | | | | | | | | h1 { | | | | | | | | | font-size: 18px; | | | | | | | | | text-align: center; | | | | | | | | | color: red; | | | | | | } | | | | | | </style> | | | | | | | | | | | | | | |
| | | | | |

Falh | | | a na autenticação!

| | | | | | | | | Esta página ser&# | | | 225; fechada automaticamente.
| | | | | | | | | Se isto não acontecer | | | , pode fechá-la manualmente. | | | | | |
| | | | | | | | | | | | """) | | | # | | | # | | | | | | # Inicializa o servidor HTTP | | | | | | self.server = HTTPServer( | | | ('localhost', 8000), AuthHandler) | | | | | | self.server.logger = self.logger | | | # Passa o logger para o servidor | | | | | | # Fun | | | ção para monitorar o tempo limite | | | | | | def monitor_timeout(): | | | | | | time.sleep(self.timeout) | | | if not h | | | asattr(self.server, 'auth_code'): | | | | | | self.response = "tempo limit | | | e para autenticação foi excedido" | | | | | | self.status_code = 490 | | | | | | self.logger.error(self.response) | | | | | | sys.stdout.close() | | | | | | sys.stderr.close() | | | | | | sys.stdout = original_stdout | | | | | | sys.stderr = original_stderr | | | print(f' | | | Código retorno: {self.status_code | | | } ', end='') ## self.status_code | | | = 200, usuário pertence ao grupo | | | informado. self.status_code = 299 | | | , grupo existe mas usuário NÃO pe | | | rtence à ele. Erros retornam 4xx. | | | | | | print(f'Resposta | | | : {self.response}', end='\n\n') | | | | | | print('Falha na a | | | utenticação! Execução abortada!') | | | driver.quit() | | | | | | self.server.server_close() | | | | | | os.exit(0) | | | # | | | # | | | # Inicia a thr | | | ead para monitorar o tempo limite | | | | | | timeout_thread = threadi | | | ng.Thread(target=monitor_timeout) | | | | | | timeout_thread.start() | | | | | | # | | | Espera pelo código de autorização | | | self.response = | | | "Esperando pela autenticação..." | | | | | | self.logger.info(self.response) | | | | | | self.server.handle_request() | | | | | | | | | # Restaura stdout e stderr | | | sys.stdout.close() | | | sys.stderr.close() | | | | | | sys.stdout = original_stdout | | | | | | sys.stderr = original_stderr | | | | | | # Ver | | | ifica se o código de autorização | | | foi obtido dentro do tempo limite | | | if not h | | | asattr(self.server, 'auth_code'): | | | return | | | | | | # Obté | | | m o código de autorização e o est | | | ado capturados pelo servidor HTTP | | | | | | auth_code = self.server.auth_code | | | | | | state = self.server.state | | | | | | # Ad | | | quire o token usando o código de | | | autorização, verificando o estado | | | try: | | | | | | result = app.acquire | | | token_by_auth_code_flow(flow, {"c | | | ode": auth_code, "state": state}) | | | | | | except ValueError as e: | | | | | | self.response = f"Erro | | | ao obter o token de acesso: {e}" | | | | | | self.status_code = 401 | | | | | | self.logger.error(self.response) | | | driver.quit() | | | return | | | | | | | | | if "access_token" in result: | | | acce | | | ss_token = result['access_token'] | | | headers = { | | | 'Authori | | | zation': 'Bearer ' + access_token | | | } | | | | | | # Obt | | | ém o email do usuário autenticado | | | | | | me_response = requests.get( | | | 'http | | | s://graph.microsoft.com/v1.0/me', | | | | | | headers=headers | | | ) | | | self.stat | | | us_code = me_response.status_code | | | i | | | f me_response.status_code == 200: | | | | | | me_data = me_response.json() | | | user_ema | | | il = me_data['userPrincipalName'] | | | | | | self.response = f"Email do u | | | suário autenticado: {user_email}" | | | | | | self.logger.info(self.response) | | | | | | # Verifi | | | ca se o usuário pertence ao grupo | | | | | | group_name = self.grupo | | | | | | | | | # Obtém o ID do usuário | | | | | | user_response = requests.get( | | | | | | f'https://graph.micros | | | oft.com/v1.0/users/{user_email}', | | | | | | headers=headers | | | ) | | | self.status | | | code = user_response.status_code | | | if | | | user_response.status_code == 200: | | | | | | user_data = user_response.json() | | | | | | user_id = user_data['id'] | | | | | | | | | # Pesquisa o grupo pelo nome | | | | | | group_response = requests.get( | | | | | | f"https://graph | | | .microsoft.com/v1.0/groups?$filte | | | r=displayName eq '{group_name}'", | | | | | | headers=headers | | | ) | | | | | | self.status | | | code = group_response.status_code | | | if g | | | roup_response.status_code == 200: | | | g | | | roup_data = group_response.json() | | | | | | if 'value' in group_data | | | and len(group_data['value']) > 0: | | | | | | group
| | | id = group_data['value'][0]['id'] | | | | | | | | | # Ve | | | rifica se o usuário está no grupo | | | | | | members_response = requests.get( | | | | | | f'https://graph.microsoft.com | | | /v1.0/groups/{group_id}/members', | | | | | | headers=headers | | | ) | | | | | | self.status_co | | | de = members_response.status_code | | | | | | if mem | | | bers_response.status_code == 200: | | | | | | membe | | | rs_data = members_response.json() | | | | | | if 'value' in members_data: | | | | | | user_in_group | | | = any(member['id'] == user_id for | | | member in members_data['value']) | | | | | | if user_in_group: | | | | | | self.respo | | | nse = f"O usuário {user_email} li | | | berado para uso desta aplicação." | | | | | | else: | | | | | | sel | | | f.response = f"O usuário {user_em | | | ail} NÃO liberado para uso desta | | | aplicação. Solicite acesso à TI." | | | | | | self.status_code = 299 | | | | | | else: | | | | | | self.r | | | esponse = "Resposta da API de mem | | | bros não contém a chave 'value'." | | | | | | self.status_code = 460 | | | | | | else: | | | | | | self.response = f"E | | | rro na resposta da API de membros | | | : {members_response.status_code}" | | | | | | self.response | | | += f"\n{members_response.json()}" | | | else: | | | | | | self.response = f"Grup | | | o '{group_name}' não encontrado." | | | | | | self.status_code = 470 | | | else: | | | | | | self.response = | | | f"Erro na resposta da API de grup | | | os: {group_response.status_code}" | | | | | | self.respons | | | e += f"\n{group_response.json()}" | | | else: | | | | | | self.response = | | | f"Erro na resposta da API de usuá | | | rio: {user_response.status_code}" | | | | | | self.respon | | | se += f"\n{user_response.json()}" | | | else: | | | | | | self.response = | | | f"Erro ao obter informações do us | | | uário: {me_response.status_code}" | | | self.resp | | | onse += f"\n{me_response.json()}" | | | else: | | | self.re | | | sponse = f"Erro ao obter o token | | | de acesso: {result.get('error')}" | | | | | | self.response += f"\n{ | | | result.get('error_description')}" | | | | | | self.status_code = 480 | | | | | | # Fecha o navegador | | | driver.quit() | | | service.stop() | | | | | | # Define o retorno | | | print(f'\n | | | Código retorno: {self.status_code | | | } ', end='') ## self.status_code | | | = 200, usuário pertence ao grupo | | | informado. self.status_code = 299 | | | , grupo existe mas usuário NÃO pe | | | rtence à ele. Erros retornam 4xx. | | | print(f'Resposta | | | : {self.response}', end='\n\n') | | | | | | if self.status_code == 200: | | | | | | print('Acesso autorizado!') | | | else: | | | print('Permissõe | | | s inválidas! Execução abortada!') | | | | | | os._exit(0) | | | # | | | # | | | def disclaimer(self | | | ): # Mostra o aviso do funcioname | | | nto e necessidade de autenticação | | | input(f""" | | | | | | Para ser utilizado d | | | e forma adequada e segura, este s | | | cript requer autenticação no Micr | | | osoft Entra ID (antiga Azure AD). | | | | | | Também requer que seu u | | | suário pertença a um grupo de seg | | | urança específico. Se você não te | | | m a segurança que tem permissão d | | | e uso, solicite previamente à TI. | | | | | | Para co | | | ntinuar, é necessário fornecer su | | | as credenciais, aquelas que costu | | | meiramente utiliza para acessar o | | | s serviços de e-mail corporativo. | | | | | | Uma janela de navegador será abe | | | rta e você será direcionado à tel | | | a de Logon do Microsoft Entra ID. | | | | | | Faça o Logon fornecendo usuário, | | | senha e validação de duplo fator | | | (no autenticador da Microsoft, i | | | nstalado em seu celular). | | | | | | Após a autenticação, a janela do | | | navegador será fechada e o script | | | iniciará o processo de execução. | | | | | | | | | Você tem {self.timeout} | | | segundos para realizar a autentic | | | ação ou a execução será abortada. | | | | | | | | | Tecle [ENTER] para continuar ... | | | | | | """) | | | | | |
| +-----------------------------------+-----------------------------------+ :::

::: {.doc .doc-children} ::: ::::: ::::::

:::::: {.doc .doc-object .doc-class}

lbx_logger {#lbxtoolkit.lbx_logger .doc .doc-heading}

::::: {.doc .doc-contents}

Classe lbx_logger {#lbxtoolkit.lbx_logger--classe-lbx_logger}

Essa classe requer a importação do módulo logging no script em que for instanciada e tem o propósito de manipular/formatar as mensagens de saída do script, alterando o formato e redirecionando destino padrão (stdout e stderr) para uma combinação de tela e/ou arquivo.

O comportamento padrão é registrar todas as saídas simultaneamente em tela e no arquivo com endereço informado no parâmetro log_file_path. Se este parametro for omisso no instanciamento da classe, as mensagens serão exibidas apenas na tela.

A mensagens devem ser classificadas por grau de severidade/relevância, da menor para a maior, na seguinte ordem: debug, info, warning (aviso), error (erro), critical (critico)

A classificação do nível de serveridade da mensagem se dá pelo método escolhido para invocar a mensagem, correspondente aos níveis de severidade equivalentes.

A classe deve ser instanciada conforme sintaxe abaixo:

lbx_logger(log_file_path=None, log_level=logging.DEBUG, formato_log='%(asctime)s - %(levelname)s - %(message)s', modulo=None, ignore_console=None, ignore_file=None):

Todos os parametros são nominativos e facultativos. Em caso de omissão, os valores padrão são assumidos conforme o exemplo acima.

Os parametros para o instanciamento da classe são:

  • log_file_path Define o caminho e o nome do arquivo de log. Se omisso, as mensagens serão todas direcionadas apenas para a tela.
  • log_level Define o nível mínimo de severidade das mensagens a serem manipuladas pelo logger. Se omisso, será assumido o nível mais baixo (debug). As mensagens com nível abaixo do especificado são descartadas. Os níveis devem ser informados de acordo com a sintaxe acima (prefixados com logging. e com o nome do nível em inglês e maiúsculas). Exemplo:
  • logging.DEBUG para manipular chamadas do método .debug() e acima.
  • logging.INFO para manipular chamadas do método .info() e acima.
  • logging.WARNING para manipular chamadas do método .aviso() e acima.
  • logging.ERROR para manipular chamadas do método .erro() e acima.
  • logging.CRITICAL para manipular chamadas do método .critico() e acima.
  • formato_log Define o formato em que a mensagem será apresentada. Se omisso, o padrá é DATA_HORA - NIVEL - MENSAGEM. Para maiores opções veja: Atributos de log
  • modulo Nome do módulo para o qual os logs serão monitorados. Permite instanciar várias vezes a classe para criar manipuladores diferentes para módulos diferente. Informe o nome do módulo para criar um log específico para ele ou simplesmente omita o parametro para criar um log para o script em geral.
  • ignore_console Lista com os níveis de severidade a serem ignorados para apresentação na tela, registrando apenas no arquivo (quando informado no parametro log_file_path) e obedecendo ao nível mínimo estabelecido no parametro log_level. Note que omitir o parametro log_file_path e incluir um nível na lsita ignore_console implica em ignorar/suprimir esse nível de mensagem de qualquer apresentação.
  • ignore_file Mesma lógica do parametro ignore_console, mas com lógica invertida: suprime o registro do nível do arquivo e demonstra apenas na tela.

1) As mensagem são manipuladas substituindo-se o comando print() pela chamada a um dos 5 métodos acima (.add(), .debug(), .info(), .aviso(), .erro(), .critico()). Exceto o método .add(), qualquer um dos demais métodos pode interromper a execução do script, através da passagem do parâmetro exit. Ao informar esse parametro na chamadada do método, atribua a ele o código de saída desejado (0 para normal, qualquer outro número para saída com erro). Exemplo:

log.erro('Essa mensagem apenas resulta em uma mensagem de nível ERROR')
log.erro('Essa mensagem resulta em uma mensagem de nível ERRO e encerra o script com código de retorno -1', exit=-1)

Qualquer chamada ao comando print(), uma vez instanciado manipulador de log, será registada como uma chamada ao método .info() e registrada com este nível de severidade. Para retornar ao comportamente padrão do comando print, ou interromper o manipulador, faça chamada ao método .stop_logging()

2) O método .add() não exibe/grava imediatamente a mensagem, mas apenas a diciona a buffer. Todas as chamas a .add() irão concatenar a mensagem recebida até a próxima chamada em algum dos níveis .debug(), .info(), .aviso(), .erro(), .critico(). Na primeira chama de um destes níveis após uma (ou mais) chamada(s) ao método .add() o buffer será concatenado à mensagem recebida por um destes métodos e o resultado será manipulado pelo log conforme os parametros definidos no intanciamento da classe e o método chamado. Essa função é útil para tratar mensagens com retorno condicional. Exemplo:

log.add('Mensagem 1# ') ## não será exibida/registrada
log.add('Mensagem 2# ') ## não será exibida/registrada
log.info('Mensagem 3) ## será exibida/registrada como nível "info" e com texto: "Mensagem 1# Mensagem 2# Mensagem 3"

3) Os métodos que exibem as mensagens (.debug(),.info(),.aviso(), .erro(), .critico()) possuem 3 parametros: message, corte=None, exit=None.

  • message: posicional e obrigatório. corresponde à mensagem a ser exibida
  • corte: o tamanho máximo da mensagem a ser exibida. opcional e se omitido, exibe a mensagem inteira. se fornecido, corta a mensagem no comprimento informado
  • exit: opcional. se informado (requer um código de retorno), aborta o script com o código informado. se omisso (padrão) a mensagem apenas é minutada pelo log, sem interferir no funcionamento do script

4) O método .filtra() possui 3 parametros posicionais, todos opcionais: log_file, dh_ini, dh_fim.

Se os 3 forem omitidos, serão exibidas as entradas de log do arquivo corrente, definido no instanciamento da classe lbx_logger, registradas na última hora. Deste modo, o valor padrão para dh_fim é now() e para dh_ini é now() menos 1 hora.

Caso queira filtrar os registro de outro arquivo de log, que não seja o do script corrente, informe o endereço do arquivo no primeiro parametro.

E caso queira alterar alterar o período de filtragem, informe nos parametros 2 e 3 a data/hora de início e fim do período. Estes dois parametros aceitam tanto um objeto do tipo datetime como uma string (que será convertida para datetime), desde que ela esteja no formato dd/mm/aaaa hh:mm:[ss] (segundos são opcionais).

Considerando que os parametros são posicionais, caso queira omitir apenas um dos parametros, preencha a posição do parametro a ser omitido com None.

A saída dessa função retorna um objeto, que pode ser salvo em disco ou impresso na tela.

5) Exemplos de uso:

from lbx_toolkit import lbx_logger 
import logging
import os
from pathlib import Path

DirBase = Path('./')  # diretório corrente do script
BaseName = os.path.splitext(os.path.basename(__file__))[0] # nome do script sem extensão
LogFile = Path(DirBase, BaseName + '.log') # salva logs no diretório corrente, em um arquivo nomeado com nome do script + extensão ".log"

### instancia o manipulador para tratar todas as mensagens (nível DEBUG acima), 
#   mas suprime a apresentação em tela das mensagens de nível "DEBUG" na tela, 
#   apenas registrando-as somente no arquivo
#   e sumprime o registro no arquivo das mensagens de nível "ERROR", 
#   mostrando-as apenas na tela
log = lbx_logger(LogFile, logging.DEBUG, ignore_console=[logging.DEBUG], ignore_file=[logging.ERROR]) 

# Exemplo de mensagens de log
log.debug('Esta é uma mensagem de debug') 
log.info('Esta é uma mensagem informativa')
log.add('Esta mensagem não será exibida agora, mas acumulada no buffer# ')
log.aviso('Esta é uma mensagem de aviso')
log.erro('Esta é uma mensagem de erro')
log.erro('Esta é uma mensagem erro muito comprida e será limitada a 40 caracteres, o restante será cortado e ingorado ao ser manipulado', 40)
log.critico('Esta é uma mensagem crítica')

# Exemplo de função que gera uma exceção
def funcao_com_erro():
    raise ValueError('Este é um erro de exemplo')

# Testando redirecionamento de print e captura de exceção
print('Mensagem de teste via print')
try:
    funcao_com_erro()
except Exception as e:
    print(f'Capturado um erro: {e}')

log.erro('Essa é uma mensagem de erro e abortará a execução do script', exit=1)

log.info('Essa mensagem não será exibida pois o script foi abortado na mensagem anterior')

# obtem os registros de log da última hora (comportamento padrão)
filtra_log = log.search() 

# obtem os registros das últimas 6 horas
ultimas_6h = datetime.datetime.now() - datetime.timedelta(hours=6) ## carimbo de tempo de 6 horas atrás !!! requer>> import datetime
filtra_log = log.search(None, ultimas_6h) # None no 1º parametro impõe o log do arquivo corrente como padrão (definido em 'LogFile' e apontado no instanciamento da classe)

# obtem os registros do dia 14/01/2020 até 3h atrás
ultimas_3h = datetime.datetime.now() - datetime.timedelta(hours=3) ## carimbo de tempo de 6 horas atrás !!! requer>> import datetime
filtra_log = log.search(None, '14/01/2020 00:00', ultimas_3h) # 

# obtem os registros do horário comercial do dia 23/12/2023 do arquivo salvo em C:\temp\outro_arquivo.log
Outro_Log = Path(r'c:\temp\outro_arquivo.log')
filtra_log = log.search(Outro_Log, '23/12/2023 08:00', '23/12/2023 18:00') # 

# salva conteúdo filtrado em um arquivo:
filtrado = 'filtered_log.txt'
with open(filtado, 'w', encoding='ISO-8859-1') as output_file:  # indique o enconding conforme salvo (UTF-8 ou ISO-8859-1)
    output_file.writelines(filta_log)    

# mostra o conteúdo filtrado na tela
print(''.join(filtra_log))

# mostra o conteúdo filtrado na tela, listando apenas as os registros do nível "DEBUG"
for line in filtered_lines:
    if "DEBUG" in line:
        print(line, end='')

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 906 | | | 907 | class lbx_logger: # Class | | 908 | e para gerenciar a saída para log | | 909 | r""" | | 910 | | | 911 | #### Classe lbx_logger | | 912 | | | 913 | Essa | | 914 | classe requer a importação do mód | | 915 | ulo logging no script em que fo | | 916 | r instanciada e tem o propósito d | | 917 | e manipular/formatar as mensagens | | 918 | de saída do script, alterando o | | 919 | formato e redirecionando destino | | 920 | padrão (stdout e stderr) para uma | | 921 | combinação de tela e/ou arquivo. | | 922 | | | 923 | O comportamen | | 924 | to padrão é registrar todas as sa | | 925 | ídas simultaneamente em tela e | | 926 | no arquivo com endereço informado | | 927 | no parâmetro log_file_path. Se | | 928 | este parametro for omisso no ins | | 929 | tanciamento da classe, as mensage | | 930 | ns serão exibidas apenas na tela. | | 931 | | | 932 | A mensagens | | 933 | devem ser classificadas por grau | | 934 | de severidade/relevância, da meno | | 935 | r para a maior, na seguinte ordem | | 936 | : debug, info, warning (aviso), | | 937 | error (erro), critical (critico) | | 938 | | | 939 | | | 940 | A classificação do nível d | | 941 | e serveridade da mensagem se dá p | | 942 | elo método escolhido para invocar | | 943 | a mensagem, correspondente aos n | | 944 | íveis de severidade equivalentes. | | 945 | | | 946 | A classe deve ser ins | | 947 | tanciada conforme sintaxe abaixo: | | 948 | | | 949 | l | | 950 | bx_logger(log_file_path=None, log | | 951 | _level=logging.DEBUG, formato_log | | 952 | ='%(asctime)s - %(levelname)s - % | | 953 | (message)s', modulo=None, ignore_ | | 954 | console=None, ignore_file=None): | | 955 | | | 956 | | | 957 | Todos os parametros são nomina | | 958 | tivos e facultativos. Em caso de | | 959 | omissão, os valores padrão são as | | 960 | sumidos conforme o exemplo acima. | | 961 | | | 962 | Os parametros par | | 963 | a o instanciamento da classe são: | | 964 | | | 965 | | | 966 | - log_file_path Define o camin | | 967 | ho e o nome do arquivo de log. Se | | 968 | omisso, as mensagens serão todas | | 969 | direcionadas apenas para a tela. | | 970 | | | 971 | - log_level Define o nível | | 972 | mínimo de severidade das mensagen | | 973 | s a serem manipuladas pelo logger | | 974 | . Se omisso, será assumido o níve | | 975 | l mais baixo (debug). As mensag | | 976 | ens com nível abaixo do especific | | 977 | ado são descartadas. Os níveis de | | 978 | vem ser informados de acordo com | | 979 | a sintaxe acima (prefixados com _ | | 980 | logging._ e com o nome do nível e | | 981 | m inglês e maiúsculas). Exemplo: | | 982 | - lo | | 983 | gging.DEBUG para manipular chama | | 984 | das do método .debug() e acima. | | 985 | - | | 986 | logging.INFO para manipular cham | | 987 | adas do método .info() e acima. | | 988 | - logg | | 989 | ing.WARNING para manipular chama | | 990 | das do método .aviso() e acima. | | 991 | - l | | 992 | ogging.ERROR para manipular cham | | 993 | adas do método .erro() e acima. | | 994 | - logging.CRITICA | | 995 | L para manipular chamadas do mét | | 996 | odo .critico() e acima. | | 997 | - f | | 998 | ormato_log Define o formato em q | | 999 | ue a mensagem será apresentada. S | | 1000 | e omisso, o padrá é DATA_HORA - | | 1001 | NIVEL - MENSAGEM. Para maiores o | | 1002 | pções veja: [Atributos de log](ht | | 1003 | tps://docs.python.org/3/library/l | | 1004 | ogging.html#logrecord-attributes) | | 1005 | - modu | | 1006 | lo Nome do módulo para o qual os | | 1007 | logs serão monitorados. Permite | | 1008 | instanciar várias vezes a classe | | 1009 | para criar manipuladores diferent | | 1010 | es para módulos diferente. Inform | | 1011 | e o nome do módulo para criar um | | 1012 | log específico para ele ou simple | | 1013 | smente omita o parametro para cri | | 1014 | ar um log para o script em geral. | | 1015 | - ignore_console | | 1016 | Lista com os níveis de severida | | 1017 | de a serem ignorados para aprese | | 1018 | ntação na tela, registrando ape | | 1019 | nas no arquivo (quando informado | | 1020 | no parametro log_file_path) e | | 1021 | obedecendo ao nível mínimo estabe | | 1022 | lecido no parametro log_level. | | 1023 | Note que omitir o parametro log_ | | 1024 | file_path e incluir um nível na | | 1025 | lsita ignore_console implica em | | 1026 | ignorar/suprimir esse nível de m | | 1027 | ensagem de qualquer apresentação. | | 1028 | - ignore_file Mesma | | 1029 | lógica do parametro ignore_conso | | 1030 | le, mas com lógica invertida: su | | 1031 | prime o registro do nível do arqu | | 1032 | ivo e demonstra apenas na tela. | | 1033 | | | 1034 | 1) | | 1035 | As mensagem são manipuladas subs | | 1036 | tituindo-se o comando print() p | | 1037 | ela chamada a um dos 5 métodos ac | | 1038 | ima (.add(), .debug(), .info(), | | 1039 | .aviso(), .erro(), .critico()). | | 1040 | Exceto o método .add(), qualque | | 1041 | r um dos demais métodos pode inte | | 1042 | rromper a execução do script, atr | | 1043 | avés da passagem do parâmetro ex | | 1044 | it. Ao informar esse parametro n | | 1045 | a chamadada do método, atribua a | | 1046 | ele o código de saída desejado (0 | | 1047 | para normal, qualquer outro núme | | 1048 | ro para saída com erro). Exemplo: | | 1049 | | | 1050 | | | 1051 | log.e | | 1052 | rro('Essa mensagem apenas resulta | | 1053 | em uma mensagem de nível ERROR') | | 1054 | log.erro('Essa me | | 1055 | nsagem resulta em uma mensagem de | | 1056 | nível ERRO e encerra o script co | | 1057 | m código de retorno -1', exit=-1) | | 1058 | | | 1059 | | | 1060 | Qualquer | | 1061 | chamada ao comando print(), uma | | 1062 | vez instanciado manipulador de l | | 1063 | og, será registada como uma chama | | 1064 | da ao método .info() e registra | | 1065 | da com este nível de severidade. | | 1066 | | | 1067 | Para retornar ao comportame | | 1068 | nte padrão do comando print, ou i | | 1069 | nterromper o manipulador, faça ch | | 1070 | amada ao método .stop_logging() | | 1071 | | | 1072 | 2) O méto | | 1073 | do .add() não exibe/grava imedi | | 1074 | atamente a mensagem, mas apenas a | | 1075 | diciona a buffer. Todas as cha | | 1076 | mas a .add() irão concatenar a | | 1077 | mensagem recebida até a próxima c | | 1078 | hamada em algum dos níveis .debu | | 1079 | g(), .info(), .aviso(), .erro(), | | 1080 | .critico(). Na primeira chama de | | 1081 | um destes níveis após uma (ou ma | | 1082 | is) chamada(s) ao método .add() | | 1083 | o buffer será concatenado à me | | 1084 | nsagem recebida por um destes mét | | 1085 | odos e o resultado será manipulad | | 1086 | o pelo log conforme os parametros | | 1087 | definidos no intanciamento da cl | | 1088 | asse e o método chamado. Essa fun | | 1089 | ção é útil para tratar mensagens | | 1090 | com retorno condicional. Exemplo: | | 1091 | | | 1092 | | | 1093 | log.add('Mensagem 1# | | 1094 | ') ## não será exibida/registrada | | 1095 | log.add('Mensagem 2# | | 1096 | ') ## não será exibida/registrada | | 1097 | log.info('Mensage | | 1098 | m 3) ## será exibida/registrada c | | 1099 | omo nível "info" e com texto: "Me | | 1100 | nsagem 1# Mensagem 2# Mensagem 3" | | 1101 | | | 1102 | | | 1103 | 3) Os métodos que exibem | | 1104 | as mensagens (.debug(),.info( | | 1105 | ),.aviso(), .erro(), .criti | | 1106 | co()) possuem 3 parametros: mes | | 1107 | sage, corte=None, exit=None. | | 1108 | | | 1109 | - messag | | 1110 | e: posicional e obrigatório. cor | | 1111 | responde à mensagem a ser exibida | | 1112 | | | 1113 | - corte: o tamanho máximo | | 1114 | da mensagem a ser exibida. opcio | | 1115 | nal e se omitido, exibe a mensage | | 1116 | m inteira. se fornecido, corta a | | 1117 | mensagem no comprimento informado | | 1118 | - ex | | 1119 | it: opcional. se informado (requ | | 1120 | er um código de retorno), aborta | | 1121 | o script com o código informado. | | 1122 | se omisso (padrão) a mensagem ape | | 1123 | nas é minutada pelo log, sem inte | | 1124 | rferir no funcionamento do script | | 1125 | | | 1126 | 4) O m | | 1127 | étodo .filtra() possui 3 parame | | 1128 | tros posicionais, todos opcionais | | 1129 | : log_file, dh_ini, dh_fim. | | 1130 | | | 1131 | Se os 3 forem omit | | 1132 | idos, serão exibidas as entradas | | 1133 | de log do arquivo corrente, defin | | 1134 | ido no instanciamento da classe | | 1135 | lbx_logger, registradas na últim | | 1136 | a hora. Deste modo, o valor padrã | | 1137 | o para dh_fim é now() e para | | 1138 | dh_ini é now() menos 1 hora. | | 1139 | | | 1140 | Caso queira | | 1141 | filtrar os registro de outro arqu | | 1142 | ivo de log, que não seja o do scr | | 1143 | ipt corrente, informe o endereço | | 1144 | do arquivo no primeiro parametro. | | 1145 | | | 1146 | E caso queira alterar alt | | 1147 | erar o período de filtragem, info | | 1148 | rme nos parametros 2 e 3 a data/h | | 1149 | ora de início e fim do período. E | | 1150 | stes dois parametros aceitam tant | | 1151 | o um objeto do tipo datetime co | | 1152 | mo uma string (que será convertid | | 1153 | a para datetime), desde que ela e | | 1154 | steja no formato dd/mm/aaaa hh:m | | 1155 | m:[ss] (segundos são opcionais). | | 1156 | | | 1157 | Considerando que o | | 1158 | s parametros são posicionais, cas | | 1159 | o queira omitir apenas um dos par | | 1160 | ametros, preencha a posição do pa | | 1161 | rametro a ser omitido com None. | | 1162 | | | 1163 | A saída dessa função r | | 1164 | etorna um objeto, que pode ser sa | | 1165 | lvo em disco ou impresso na tela. | | 1166 | | | 1167 | | | 1168 | 5) Exemplos de uso: | | 1169 | | | 1170 | | | 1171 | fr | | 1172 | om lbx_toolkit import lbx_logger | | 1173 | import logging | | 1174 | import os | | 1175 | from pathlib import Path | | 1176 | | | 1177 | DirBase = Path('./' | | 1178 | ) # diretório corrente do script | | 1179 | BaseName = os.path.spl | | 1180 | itext(os.path.basename(__file__)) | | 1181 | [0] # nome do script sem extensão | | 1182 | LogFi | | 1183 | le = Path(DirBase, BaseName + '.l | | 1184 | og') # salva logs no diretório co | | 1185 | rrente, em um arquivo nomeado com | | 1186 | nome do script + extensão ".log" | | 1187 | | | 1188 | ### instancia | | 1189 | o manipulador para tratar todas a | | 1190 | s mensagens (nível DEBUG acima), | | 1191 | # mas supri | | 1192 | me a apresentação em tela das men | | 1193 | sagens de nível "DEBUG" na tela, | | 1194 | # apenas | | 1195 | registrando-as somente no arquivo | | 1196 | # | | 1197 | e sumprime o registro no arquivo | | 1198 | das mensagens de nível "ERROR", | | 1199 | | | 1200 | # mostrando-as apenas na tela | | 1201 | log | | 1202 | = lbx_logger(LogFile, logging.DE | | 1203 | BUG, ignore_console=[logging.DEBU | | 1204 | G], ignore_file=[logging.ERROR]) | | 1205 | | | 1206 | | | 1207 | # Exemplo de mensagens de log | | 1208 | log.debug | | 1209 | ('Esta é uma mensagem de debug') | | 1210 | log.info(' | | 1211 | Esta é uma mensagem informativa') | | 1212 | log.add( | | 1213 | 'Esta mensagem não será exibida a | | 1214 | gora, mas acumulada no buffer# ') | | 1215 | log.avis | | 1216 | o('Esta é uma mensagem de aviso') | | 1217 | log.er | | 1218 | ro('Esta é uma mensagem de erro') | | 1219 | log.erro( | | 1220 | 'Esta é uma mensagem erro muito c | | 1221 | omprida e será limitada a 40 cara | | 1222 | cteres, o restante será cortado e | | 1223 | ingorado ao ser manipulado', 40) | | 1224 | log.criti | | 1225 | co('Esta é uma mensagem crítica') | | 1226 | | | 1227 | # Exemp | | 1228 | lo de função que gera uma exceção | | 1229 | def funcao_com_erro(): | | 1230 | raise ValueE | | 1231 | rror('Este é um erro de exemplo') | | 1232 | | | 1233 | # Testando redirecioname | | 1234 | nto de print e captura de exceção | | 1235 | pri | | 1236 | nt('Mensagem de teste via print') | | 1237 | try: | | 1238 | funcao_com_erro() | | 1239 | except Exception as e: | | 1240 | | | 1241 | print(f'Capturado um erro: {e}') | | 1242 | | | 1243 | log.erro('Ess | | 1244 | a é uma mensagem de erro e aborta | | 1245 | rá a execução do script', exit=1) | | 1246 | | | 1247 | log.info('Essa mensagem | | 1248 | não será exibida pois o script fo | | 1249 | i abortado na mensagem anterior') | | 1250 | | | 1251 | | | 1252 | # obtem os registros de log da ú | | 1253 | ltima hora (comportamento padrão) | | 1254 | | | 1255 | filtra_log = log.search() | | 1256 | | | 1257 | # obtem | | 1258 | os registros das últimas 6 horas | | 1259 | | | 1260 | ultimas_6h = datetime.datetime.no | | 1261 | w() - datetime.timedelta(hours=6) | | 1262 | ## carimbo de tempo de 6 horas a | | 1263 | trás !!! requer>> import datetime | | 1264 | filtra | | 1265 | _log = log.search(None, ultimas_6 | | 1266 | h) # None no 1º parametro impõe o | | 1267 | log do arquivo corrente como pad | | ::: | rão (definido em 'LogFile' e apon | | | tado no instanciamento da classe) | | | | | | # obtem os registr | | | os do dia 14/01/2020 até 3h atrás | | | | | | ultimas_3h = datetime.datetime.no | | | w() - datetime.timedelta(hours=3) | | | ## carimbo de tempo de 6 horas a | | | trás !!! requer>> import datetime | | | | | | filtra_log = log.search(None, ' | | | 14/01/2020 00:00', ultimas_3h) # | | | | | | # obte | | | m os registros do horário comerci | | | al do dia 23/12/2023 do arquivo s | | | alvo em C:\temp\outro_arquivo.log | | | Outro_Log = P | | | ath(r'c:\temp\outro_arquivo.log') | | | filtra_log | | | = log.search(Outro_Log, '23/12/20 | | | 23 08:00', '23/12/2023 18:00') # | | | | | | # salva | | | conteúdo filtrado em um arquivo: | | | | | | filtrado = 'filtered_log.txt' | | | with open(filtado, 'w', | | | encoding='ISO-8859-1') as output | | | _file: # indique o enconding con | | | forme salvo (UTF-8 ou ISO-8859-1) | | | outp | | | ut_file.writelines(filta_log) | | | | | | # m | | | ostra o conteúdo filtrado na tela | | | | | | print(''.join(filtra_log)) | | | | | | # mostra o conteúdo | | | filtrado na tela, listando apenas | | | as os registros do nível "DEBUG" | | | | | | for line in filtered_lines: | | | if "DEBUG" in line: | | | | | | print(line, end='') | | | | | | """ | | | class | | | LevelFilter(logging.Filter): | | | def | | | init(self, levels_to_ignore): | | | self.le | | | vels_to_ignore = levels_to_ignore | | | # | | | # | | | | | | def filter(self, record): | | | return record.lev | | | elno not in self.levels_to_ignore | | | # | | | # | | | def in | | | it(self, log_file_path=None, lo | | | g_level=logging.DEBUG, formato_lo | | | g='%(asctime)s - %(levelname)s - | | | %(message)s', modulo=None, ignore | | | console=None, ignore_file=None): | | | se | | | lf.ignore_file = [] if ignore_fil | | | e is None else ignore_file | | | self | | | .ignore_console = [] if ignore_co | | | nsole is None else ignore_console | | | self.modulo = nam | | | e if modulo is None else modulo | | | self.logger | | | = logging.getLogger(self.modulo) | | | | | | self.logger.setLevel(log_level) | | | self.msg = '' | | | s | | | elf.log_file_path = log_file_path | | | | | | if log_file_path: | | | | | | # Criando um handler p | | | ara escrever em um arquivo de log | | | | | | file_handler = loggin | | | g.FileHandler(self.log_file_path) | | | fi | | | le_handler.setLevel(log_level) # | | | Sempre registrar tudo no arquivo | | | | | | # Criando | | | um handler para exibir no console | | | console | | | handler = logging.StreamHandler() | | | console | | | handler.setLevel(log_level) # R | | | egistrar DEBUG e acima no console | | | | | | # Adi | | | cionando filtro para ignorar cert | | | os níveis no console e no arquivo | | | | | | file_handler.addFilter(se | | | lf.LevelFilter(self.ignore_file)) | | | | | | console_handler.addFilter(self. | | | LevelFilter(self.ignore_console)) | | | | | | # Definin | | | do o formato das mensagens de log | | | formatter | | | = logging.Formatter(formato_log) | | | fil | | | e_handler.setFormatter(formatter) | | | consol | | | e_handler.setFormatter(formatter) | | | | | | # | | | Adicionando os handlers ao logger | | | sel | | | f.logger.addHandler(file_handler) | | | self.l | | | ogger.addHandler(console_handler) | | | else: | | | | | | # Tudo direcionado para o console | | | console | | | handler = logging.StreamHandler() | | | | | | console_handler.setLevel(l | | | og_level) # Registrar no console | | | | | | # Adi | | | cionando filtro para ignorar cert | | | os níveis no console e no arquivo | | | consol | | | e_handler.addFilter(self.LevelFil | | | ter(self.ignore_console)) | | | | | | # Definin | | | do o formato das mensagens de log | | | formatter | | | = logging.Formatter(formato_log) | | | consol | | | e_handler.setFormatter(formatter) | | | | | | | | | # Adicionando o handler ao logger | | | self.l | | | ogger.addHandler(console_handler) | | | | | | # Redi | | | recionando exceções para o logger | | | sys.e | | | xcepthook = self.handle_exception | | | | | | | | | # Redirecionando saída padrão | | | | | | self.original_stdout = sys.stdout | | | sys.stdout = self | | | # | | | # | | | | | | def handle_exception(self, exc | | | _type, exc_value, exc_traceback): | | | if issubcl | | | ass(exc_type, KeyboardInterrupt): | | | | | | sys.excepthook(ex | | | c_type, exc_value, exc_traceback) | | | return | | | | | | self.logger.error("Exc | | | eção não prevista", exc_info=(exc | | | _type, exc_value, exc_traceback)) | | | # | | | # | | | | | | def print(self, *args, **kwargs): | | | # Im | | | prime diretamente na saída padrão | | | print(*args, **k | | | wargs, file=self.original_stdout) | | | # | | | # | | | de | | | f add(self, message, corte=None): | | | message = mess | | | age[:corte] if corte else message | | | | | | self.msg = self.msg + message if | | | not message is None else self.msg | | | # | | | # | | | def write(self, message): | | | if message.str | | | ip(): # Ignorar mensagens vazias | | | | | | self.logger.info(message.strip()) | | | # | | | # | | | def flush(self): | | | | | | pass # Método necessário pa | | | ra compatibilidade com sys.stdout | | | # | | | # | | | def debug(self, | | | message, corte=None, exit=None): | | | | | | self.msg = self.msg + message if | | | not message is None else self.msg | | | msg = self.m | | | sg[:corte] if corte else self.msg | | | | | | self.logger.debug(msg) | | | self.msg = '' | | | if exit: | | | os._exit(exit) | | | # | | | # | | | def info(self, | | | message, corte=None, exit=None): | | | | | | self.msg = self.msg + message if | | | not message is None else self.msg | | | msg = self.m | | | sg[:corte] if corte else self.msg | | | self.logger.info(msg) | | | self.msg = '' | | | if exit: | | | | | | os._exit(exit) | | | # | | | # | | | def aviso(self, | | | message, corte=None, exit=None): | | | | | | self.msg = self.msg + message if | | | not message is None else self.msg | | | msg = self.m | | | sg[:corte] if corte else self.msg | | | | | | self.logger.warning(msg) | | | self.msg = '' | | | if exit: | | | os._exit(exit) | | | # | | | # | | | def erro(self, | | | message, corte=None, exit=None): | | | | | | self.msg = self.msg + message if | | | not message is None else self.msg | | | msg = self.m | | | sg[:corte] if corte else self.msg | | | | | | self.logger.error(msg) | | | self.msg = '' | | | if exit: | | | os._exit(exit) | | | # | | | # | | | def critico(self, | | | message, corte=None, exit=None): | | | | | | self.msg = self.msg + message if | | | not message is None else self.msg | | | msg = self.m | | | sg[:corte] if corte else self.msg | | | | | | self.logger.critical(msg) | | | self.msg = '' | | | if exit: | | | os._exit(exit) | | | # | | | # | | | def stop_logging(self): | | | | | | # Restaurar o stdout original | | | | | | sys.stdout = self.original_stdout | | | | | | # Remover handlers do logger | | | h | | | andlers = self.logger.handlers[:] | | | | | | for handler in handlers: | | | handler.close() | | | s | | | elf.logger.removeHandler(handler) | | | # | | | # | | | def filtra | | | (self, log_file, dh_ini, dh_fim): | | | | | | # Validar parametros de entrada | | | if dh_ini: | | | if not isins | | | tance(dh_ini, datetime.datetime): | | | if not re.ful | | | lmatch(r'([0-3][0-9]/[0-1][0-2]/[ | | | 1-2][0-9]{3} [0-2][0-9]:[0-6][0- | | | 9])(:[0-6][0-9]){0,1}', dh_ini): | | | | | | self.logger.error(f'Da | | | ta/Hora início {dh_ini} em format | | | o inválido. Informe um objeto do | | | tipo "datetime" ou uma string no | | | formato "dd/mm/aaaa hh:mm:[ss]"') | | | | | | return None | | | | | | elif len(dh_ini) == | | | 16: # Formato 'dd/mm/yyyy hh:mm' | | | | | | dh_ini += ":00" | | | try: | | | sel | | | f.inicio = datetime.datetime.strp | | | time(dh_ini, '%d/%m/%Y %H:%M:%S') | | | except: | | | | | | self.logger.error(f'Da | | | ta/Hora início {dh_ini} em format | | | o inválido. Informe um objeto do | | | tipo "datetime" ou uma string no | | | formato "dd/mm/aaaa hh:mm:[ss]"') | | | | | | return None | | | else: | | | | | | self.inicio = dh_ini | | | else: | | | | | | self.inicio = date | | | time.datetime.now() - datetime.ti | | | medelta(hours=1) ## assume a últi | | | ma hora como intervalo, se omisso | | | | | | if dh_fim: | | | if not isins | | | tance(dh_fim, datetime.datetime): | | | if not re.ful | | | lmatch(r'([0-3][0-9]/[0-1][0-2]/[ | | | 1-2][0-9]{3} [0-2][0-9]:[0-6][0- | | | 9])(:[0-6][0-9]){0,1}', dh_ini): | | | | | | self.logger.error(f | | | 'Data/Hora fim {dh_fim} em format | | | o inválido. Informe um objeto do | | | tipo "datetime" ou uma string no | | | formato "dd/mm/aaaa hh:mm:[ss]"') | | | | | | return None | | | | | | elif len(dh_fim) == | | | 16: # Formato 'dd/mm/yyyy hh:mm' | | | | | | dh_fim += ":00" | | | try: | | | | | | self.fim = datetime.datetime.strp | | | time(dh_fim, '%d/%m/%Y %H:%M:%S') | | | except: | | | | | | self.logger.error(f | | | 'Data/Hora fim {dh_fim} em format | | | o inválido. Informe um objeto do | | | tipo "datetime" ou uma string no | | | formato "dd/mm/aaaa hh:mm:[ss]"') | | | | | | return None | | | else: | | | | | | self.fim = dh_fim | | | else: | | | | | | self.fim = datetim | | | e.datetime.now() ## assume a últi | | | ma hora como intervalo, se omisso | | | | | | if not log | | | _file and not self.log_file_path: | | | self.logger | | | .critical('Nenhum arquivo de log | | | disponível. Log desta instância c | | | onfigurado apenas para exibição e | | | m tela, sem registro em arquivo') | | | return None | | | elif not | | | log_file and self.log_file_path: | | | l | | | og_file_path = self.log_file_path | | | elif log_file: | | | | | | if Path(log_file).is_file(): | | | | | | log_file_path = log_file | | | else: | | | | | | self.logger.critical(f'Arquiv | | | o de log {log_file} não existe!') | | | return None | | | else: | | | sel | | | f.logger.critical('Erro validação | | | arquivo de entrada. Abortando!') | | | return None | | | | | | # | | | Função para verificar se a linha | | | está dentro do intervalo de tempo | | | | | | def is_within_time_ran | | | ge(timestamp, dh_inicio, dh_fim): | | | return | | | dh_inicio <= timestamp <= dh_fim | | | | | | | | | # Ler e filtrar o arquivo de | | | log com a codificação ISO-8859-1 | | | | | | with open(log_file_path, 'r', en | | | coding='ISO-8859-1') as log_file: | | | | | | log_lines = log_file.readlines() | | | | | | # Variável para ar | | | mazenar o último timestamp válido | | | | | | last_valid_timestamp = None | | | filtered_lines = [] | | | | | | | | | for line in log_lines: | | | try: | | | # | | | Extraia a data e a hora da linha | | | | | | timestamp_str = line.s | | | plit()[0] + " " + line.split()[1] | | | timestamp = | | | datetime.datetime.strptime(times | | | tamp_str, '%Y-%m-%d %H:%M:%S,%f') | | | | | | last_valid_timestamp = timestamp | | | | | | if is_within_time_range(t | | | imestamp, self.inicio, self.fim): | | | | | | filtered_lines.append(line) | | | | | | except Exception as e: | | | # Caso a | | | linha não tenha um carimbo de tem | | | po, use o último timestamp válido | | | | | | if last_valid_timestamp and | | | is_within_time_range(last_valid_t | | | imestamp, self.inicio, self.fim): | | | | | | filtered_lines.append(line) | | | | | | # Retornar o ob | | | jeto contendo as linhas filtradas | | | return filtered_lines | | | | | |
| +-----------------------------------+-----------------------------------+ :::

::: {.doc .doc-children} ::: ::::: ::::::

:::::: {.doc .doc-object .doc-class}

misc {#lbxtoolkit.misc .doc .doc-heading}

::::: {.doc .doc-contents}

Classe misc {#lbxtoolkit.misc--classe-misc}

Classe que reune pequenas funções uteis para agilizar tarefas comuns.

Sintaxe e exemplos de uso. Parametros omissos assume os valores padrão indicados abaixo:

  • Arquivo = seleciona_arquivo(DirBase, TiposArquivo=[('Todos os arquivos', '*.*')], Titulo='Selecionar arquivo')
  • Diretório = seleciona_dir(DirBase=Path(r'./'), Titulo='Selecionar diretório'):
  • NomeLimpo = normaliza('String # SEM Noção!') #>>> string_sem_nocao
  • cmd_window = get_cmd_window()
  • maximize_console()

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 1270 | | | 1271 | cl | | 1272 | ass misc: # Classe de miscelâneas | | 1273 | """ | | 1274 | #### Classe misc | | 1275 | | | 1276 | Cla | | 1277 | sse que reune pequenas funções ut | | 1278 | eis para agilizar tarefas comuns. | | 1279 | | | 1280 | Sintaxe e exemplos de u | | 1281 | so. Parametros omissos assume os | | 1282 | valores padrão indicados abaixo: | | 1283 | | | 1284 | - Arquivo = se | | 1285 | leciona_arquivo(DirBase, TiposArq | | 1286 | uivo=[('Todos os arquivos', '*.*' | | 1287 | )], Titulo='Selecionar arquivo') | | 1288 | - Diretório = s | | 1289 | eleciona_dir(DirBase=Path(r'./'), | | 1290 | Titulo='Selecionar diretório'): | | 1291 | - N | | 1292 | omeLimpo = normaliza('String # SE | | 1293 | M Noção!') #>>> string_sem_nocao | | 1294 | | | 1295 | - cmd_window = get_cmd_window() | | 1296 | | | 1297 | - maximize_console() | | 1298 | """ | | 1299 | def init(self): | | 1300 | pass | | 1301 | # | | 1302 | # | | 1303 | def se | | 1304 | leciona_arquivo(DirBase, TiposArq | | 1305 | uivo=[('Todos os arquivos', '.' | | 1306 | )], Titulo='Selecionar arquivo'): | | 1307 | # Picker para selecionar arquivo | | 1308 | root = tk.Tk() | | 1309 | | | 1310 | root.withdraw() # Esco | | 1311 | nde a janela principal do Tkinter | | 1312 | | | 1313 | Arquivo = filedialog.askopen | | 1314 | filename(initialdir=DirBase, file | | 1315 | types=TiposArquivo, title=Titulo) | | 1316 | | | 1317 | Arquivo = Path(Arquivo) | | 1318 | root.destroy() | | 1319 | return Arquivo | | 1320 | # | | 1321 | # | | 1322 | def se | | 1323 | leciona_dir(DirBase=Path(r'./'), | | 1324 | Titulo='Selecionar diretório'): # | | 1325 | Picker para selecionar diretório | | 1326 | root = tk.Tk | | 1327 | () # objeto picker (Tkinter)para | | 1328 | selecionar arquivos e diretórios | | 1329 | | | 1330 | root.withdraw() # Esco | | 1331 | nde a janela principal do Tkinter | | 1332 | Dir | | 1333 | etorio = filedialog.askdirectory( | | ::: | initialdir=DirBase, title=Titulo) | | | | | | Diretorio = Path(Diretorio) | | | root.destroy() | | | return Diretorio | | | # | | | # | | | def normaliza(Ori | | | ginal): # Limpa e padroniza nomes | | | Lixo = r'/\?%§ªº°` | | | ´^~:|"<>!@#$%¨&()+=-[]{}"' ' | | | Normalizar = nor | | | malize('NFKD', Original).encode(' | | | ASCII', 'ignore').decode('ASCII') | | | R | | | emoverLixo = [c if c not in Lixo | | | else '' for c in Normalizar] | | | | | | Limpo = "".join(RemoverLixo) | | | Limpo = re.su | | | b(r'.(?=.*.)', '', Limpo) # tr | | | oca todos os pontos por underline | | | Li | | | mpo = re.sub(r'+', '_', Limpo) | | | # limpa as reptições do underline | | | return Limpo.lower() | | | # | | | # | | | def get_cmd_wind | | | ow(): # Captura a referencia da j | | | anela atual para retornar o foco | | | à ela depois de chamar os pickers | | | pid = os.getpid() | | | win | | | dows = gw.getWindowsWithTitle("") | | | | | | for window in windows: | | | | | | if window.title and w | | | indow.visible and window.topleft: | | | return window | | | return None | | | # | | | # | | | def maximize_console | | | (): # Ajustar o buffer de console | | | # os.syste | | | m('mode con: cols=500 lines=100') | | | # Ob | | | ter o handle da janela do console | | | ker | | | nel32 = ctypes.WinDLL('kernel32') | | | | | | user32 = ctypes.WinDLL('user32') | | | h | | | Wnd = kernel32.GetConsoleWindow() | | | if hWnd: | | | | | | # Definir as dimensões da tela | | | user32.ShowWin | | | dow(hWnd, 3) # 3 = SW_MAXIMIZE | | | | | |
| +-----------------------------------+-----------------------------------+ :::

::: {.doc .doc-children} ::: ::::: ::::::

:::::: {.doc .doc-object .doc-class}

postgreSQL {#lbxtoolkit.postgreSQL .doc .doc-heading}

::::: {.doc .doc-contents}

Classe postgreSQL {#lbxtoolkit.postgreSQL--classe-postgresql}

Recursos de interação com o banco de dados relacional PostgreSQL

1) O método postgreSQl.db() exige que as credenciais e parametros de acesso sejam fornecidas em um dicionário com, ao mínimo, o seguinte formato:

credenciais = {
                'dbname': 'NOME_BANCO',
                'user': 'USUARIO'',        
                'password': 'SENHA',     
                'host': 'IP_OU_DNS_SERVIDOR',
                'port': 'PORTA_POSTGRESQL',  ## padrão = 5432
            }

conexao = postgreSQL.db(credenciais)

O nome do schema é ser declarado no contexto da query, mas se desejar alterar o schema padrão, adicione 'options' : '-c search_path=[NOME_SCHEMA]', ao dicionário.

Qualquer argumento de conexão previsto no pacote psycopg2 são aceitos como entrada no dicionário acima.

2) O método postgreSQl.csv_df() lê arquivo texto do tipo CSV e o converte para o objeto Dataframe do pandas. A assinatura da função exige que se forneça o caminho do arquivo CSV e, opcionalmente o caracter delimitador. Se o caracter demilitador não for informado, será assumido ;. Considere usar a função Path para tratar o caminho do arquivo de origem.

from pathlib import Path
arquivo_csv = Path('./diretorio/arquivo_exemplo.csv')
dados = postgreSQL.csv_df(arquivo_csv, CsvDelim=',') # usando vírgula como separador. se omisso, assume ";'

3) O método postgreSQl.db_insert_df() insere dados a partir de um Dataframe (pandas) em uma tabela do banco com estrutura de colunas equivalente.

A assinatura da função é postgreSQL.db_insert_df([conexao], [dataframe_origem], [tabela_destino], Schema=None, Colunas=None, OnConflict=None)

É necessário que os nomes das colunas do dataframe coincidam com o nome das colunas da tabela. Não há como traduzir/compatibilizar (de-para) nomes de colunas entre o dataframe e a tabela.

Os três primeiros parametros são posicionais e correspondem, respectivamente, (1) ao objeto da conexão com o banco, (2) ao objeto que contém o dataframe e (3) ao nome da tabela de destino. Assume-se que a tabela pertença ao schema padrão (definido na variável search_path do servidor). Caso a tabela de destino esteja em um schema diferente do padrão, deve-se informar seu nome no parâmetro opcional Schema.

O parametro opcional Colunas espera um objeto do tipo lista que contenha a relação das colunas a serem importadas. As colunas listadas neste objeto precisam existir nas duas pontas (dataframe e tabela). Caso seja omisso, todas as colunas do dataframe serão inseridas na tabela. Neste caso, admite-se que haja colunas na tabela que não exitam no dataframe (serão gravadas como NULL), mas o contrário provocará erro.

O último parametro opcional OnConflict espera uma declaração para tratar o que fazer caso o dado a ser inserido já exista na tabela, baseado na cláusula ON CONFLICT do comando INSERT. A claúsula deve ser declarada explicita e integralmente nessa variável (clausula, target e action) e não há crítica/validação desse argumento, podendo gerar erros se declarado inconforme com o padrão SQL.

Exemplo de uso:

from lbx_toolkit import postgreSQL
from pathlib import Path

credenciais = {
                'dbname': 'NOME_BANCO',
                'user': 'USUARIO'',        
                'password': 'SENHA',     
                'host': 'IP_OU_DNS_SERVIDOR',
                'port': 'PORTA_POSTGRESQL',  ## padrão = 5432
            }

conexao = postgreSQL.db(credenciais)

arquivo_csv = Path('./diretorio/arquivo_exemplo.csv')
dados = postgreSQL.csv_df(arquivo_csv, CsvDelim=',') # usando vírgula como separador. se omisso, assume ";'

postgreSQL.db_insert_df(conexao, dados, 'teste_table', Schema='meu_esquema', OnConflict='on conflict (coluna_chave_primaria) do nothing')

# conexão com o banco precisa ser fechada explicitamente após a chamada do método, caso não seja mais utilizada:
conexao.close()

4) O método postgreSQl.db_select() executa consultas no banco de dados e retorna um cursor com o resultado.

A assinatura da função é postgreSQL.db_select([conexao], [query])

São permitidas apenas instruções de consulta (podendo serem complexas, por exemplo, com uso de CTE). A presença de outras instruções SQL de manipulação de dados e metadados não são permitidas e abortarão a execução da query, se presentes.

O cursor é fechado no contexto do método, antes do retorno, não podendo ser manipulado após recebido como retorno da função.

A função retorna dois objetos, o primeiro contendo os dados do cursor, o segundo, contendo os nomes das respectivas colunas.

Exemplo de uso:

from lbx_toolkit import postgreSQL
from pathlib import Path

credenciais = {
                'dbname': 'NOME_BANCO',
                'user': 'USUARIO'',        
                'password': 'SENHA',     
                'host': 'IP_OU_DNS_SERVIDOR',
                'port': 'PORTA_POSTGRESQL',  ## padrão = 5432
            }

conexao = postgreSQL.db(credenciais)

query = 'select * from meu_esquema.teste_table'

dados, colunas = postgreSQL.db_select(conexao, query)
conexao.close()

5) O método postgreSQl.db_update() executa updates no banco

A assinatura da função é postgreSQL.db_update([conexao], [query])

São permitidas apenas instruções de update. A presença de outras instruções SQL de manipulação de dados e metadados não são permitidas e abortarão a execução da query.

A função retorna a quantidade de linhas alteradas.

Exemplo de uso:

from lbx_toolkit import postgreSQL
from pathlib import Path

credenciais = {
                'dbname': 'NOME_BANCO',
                'user': 'USUARIO'',        
                'password': 'SENHA',     
                'host': 'IP_OU_DNS_SERVIDOR',
                'port': 'PORTA_POSTGRESQL',  ## padrão = 5432
            }

conexao = postgreSQL.db(credenciais)

query = "update meu_esquema.teste_table set coluna='novo_valor' where pk='chave'"

result = postgreSQL.db_update(conexao, query)
conexao.close()

Source code in lbxtoolkit\__init__.py

::: highlight +-----------------------------------+-----------------------------------+ | ::: linenodiv |

| | 494 | | | 495 | cla | | 496 | ss postgreSQL: # Classe de acesso | | 497 | e interação com banco PostgreSQL | | 498 | """ | | 499 | | | 500 | #### Classe postgreSQL | | 501 | | | 502 | | | 503 | Recursos de interação com o ban | | 504 | co de dados relacional PostgreSQL | | 505 | | | 506 | 1) O método post | | 507 | greSQl.db() exige que as credenc | | 508 | iais e parametros de acesso sejam | | 509 | fornecidas em um dicionário co | | 510 | m, ao mínimo, o seguinte formato: | | 511 | | | 512 | | | 513 | credenciais = { | | 514 | | | 515 | 'dbname': 'NOME_BANCO', | | 516 | | | 517 | 'user': 'USUARIO'', | | 518 | | | 519 | 'password': 'SENHA', | | 520 | | | 521 | 'host': 'IP_OU_DNS_SERVIDOR', | | 522 | | | 523 | 'port': 'POR | | 524 | TA_POSTGRESQL', ## padrão = 5432 | | 525 | } | | 526 | | | 527 | con | | 528 | exao = postgreSQL.db(credenciais) | | 529 | | | 530 | | | 531 | O | | 532 | nome do schema é ser declarado n | | 533 | o contexto da query, mas se desej | | 534 | ar alterar o schema padrão, adici | | 535 | one 'options' : '-c search_path | | 536 | =[NOME_SCHEMA]', ao dicionário. | | 537 | | | 538 | Qualqu | | 539 | er argumento de conexão previsto | | 540 | no pacote psycopg2 são aceitos | | 541 | como entrada no dicionário acima. | | 542 | | | 543 | | | 544 | 2) O método postgreSQl.csv_df( | | 545 | ) lê arquivo texto do tipo CSV e | | 546 | o converte para o objeto Datafra | | 547 | me do pandas. A assinatura da f | | 548 | unção exige que se forneça o cami | | 549 | nho do arquivo CSV e, opcionalmen | | 550 | te o caracter delimitador. Se o c | | 551 | aracter demilitador não for infor | | 552 | mado, será assumido ;. Consider | | 553 | e usar a função Path para trata | | 554 | r o caminho do arquivo de origem. | | 555 | | | 556 | | | 557 | from pathlib import Path | | 558 | arquivo_csv = Path(' | | 559 | ./diretorio/arquivo_exemplo.csv') | | 560 | dados = | | 561 | postgreSQL.csv_df(arquivo_csv, Cs | | 562 | vDelim=',') # usando vírgula como | | 563 | separador. se omisso, assume ";' | | 564 | | | 565 | | | 566 | 3) O método po | | 567 | stgreSQl.db_insert_df() insere d | | 568 | ados a partir de um Dataframe (pa | | 569 | ndas) em uma tabela do banco com | | 570 | estrutura de colunas equivalente. | | 571 | | | 572 | A assinatur | | 573 | a da função é postgreSQL.db_inse | | 574 | rt_df([conexao], [dataframe_orige | | 575 | m], [tabela_destino], Schema=None | | 576 | , Colunas=None, OnConflict=None) | | 577 | | | 578 | | | 579 | É necessário que os nomes das | | 580 | colunas do dataframe coincidam c | | 581 | om o nome das colunas da tabela. | | 582 | | | 583 | Não há como traduzir/compa | | 584 | tibilizar (de-para) nomes de colu | | 585 | nas entre o dataframe e a tabela. | | 586 | | | 587 | Os três primeiros param | | 588 | etros são posicionais e correspon | | 589 | dem, respectivamente, (1) ao obje | | 590 | to da conexão com o banco, (2) ao | | 591 | objeto que contém o dataframe e | | 592 | (3) ao nome da tabela de destino. | | 593 | | | 594 | Assume-se que a tabela per | | 595 | tença ao schema padrão (definido | | 596 | na variável search_path do serv | | 597 | idor). Caso a tabela de destino e | | 598 | steja em um schema diferente do | | 599 | padrão, deve-se informar seu nom | | 600 | e no parâmetro opcional Schema. | | 601 | | | 602 | O parametro opcional | | 603 | Colunas espera um objeto do ti | | 604 | po lista que contenha a relação | | 605 | das colunas a serem importadas. | | 606 | As colunas listadas n | | 607 | este objeto precisam existir nas | | 608 | duas pontas (dataframe e tabela). | | 609 | Caso seja omis | | 610 | so, todas as colunas do dataframe | | 611 | serão inseridas na tabela. Neste | | 612 | caso, admite-se que haja colunas | | 613 | na tabela que não exitam no data | | 614 | frame (serão gravadas como NULL), | | 615 | mas o contrário provocará erro. | | 616 | | | 617 | O último | | 618 | parametro opcional OnConflict e | | 619 | spera uma declaração para tratar | | 620 | o que fazer caso o dado a ser ins | | 621 | erido já exista na tabela, basead | | 622 | o na cláusula [ON CONFLICT](htt | | 623 | ps://www.postgresql.org/docs/curr | | 624 | ent/sql-insert.html#SQL-ON-CONFLI | | 625 | CT) do comando INSERT. A claúsula | | 626 | deve ser declarada explicita e i | | 627 | ntegralmente nessa variável (clau | | 628 | sula, target e action) e não | | 629 | há crítica/validação desse argume | | 630 | nto, podendo gerar erros se decla | | 631 | rado inconforme com o padrão SQL. | | 632 | | | 633 | Exemplo de uso: | | 634 | | | 635 | | | 636 | f | | 637 | rom lbx_toolkit import postgreSQL | | 638 | from pathlib import Path | | 639 | | | 640 | credenciais = { | | 641 | | | 642 | 'dbname': 'NOME_BANCO', | | 643 | | | 644 | 'user': 'USUARIO'', | | 645 | | | 646 | 'password': 'SENHA', | | 647 | | | 648 | 'host': 'IP_OU_DNS_SERVIDOR', | | 649 | | | 650 | 'port': 'POR | | 651 | TA_POSTGRESQL', ## padrão = 5432 | | 652 | } | | 653 | | | 654 | con | | 655 | exao = postgreSQL.db(credenciais) | | 656 | | | 657 | arquivo_csv = Path(' | | 658 | ./diretorio/arquivo_exemplo.csv') | | 659 | dados = | | 660 | postgreSQL.csv_df(arquivo_csv, Cs | | 661 | vDelim=',') # usando vírgula como | | 662 | separador. se omisso, assume ";' | | 663 | | | 664 | postg | | 665 | reSQL.db_insert_df(conexao, dados | | 666 | , 'teste_table', Schema='meu_esqu | | 667 | ema', OnConflict='on conflict (co | | 668 | luna_chave_primaria) do nothing') | | 669 | | | 670 | # conexão com | | 671 | o banco precisa ser fechada expl | | 672 | icitamente após a chamada do méto | | 673 | do, caso não seja mais utilizada: | | 674 | conexao.close() | | 675 | | | 676 | | | 677 | 4) O método | | 678 | postgreSQl.db_select() executa | | 679 | consultas no banco de dados e ret | | 680 | orna um cursor com o resultado. | | 681 | | | 682 | A | | 683 | assinatura da função é postgreS | | 684 | QL.db_select([conexao], [query]) | | 685 | | | 686 | Sã | | 687 | o permitidas apenas instruções de | | 688 | consulta (podendo serem complexa | | 689 | s, por exemplo, com uso de [CTE]( | | 690 | https://www.postgresql.org/docs/c | | 691 | urrent/queries-with.html)). A pre | | 692 | sença de outras instruções SQL de | | 693 | manipulação de dados e metadados | | 694 | não são permitidas e abortarão a | | 695 | execução da query, se presentes. | | 696 | | | 697 | | | 698 | O cursor é fechado no conte | | 699 | xto do método, antes do retorno, | | 700 | não podendo ser manipulado após | | 701 | recebido como retorno da função. | | 702 | | | 703 | | | 704 | A função retorna dois obje | | 705 | tos, o primeiro contendo os dado | | 706 | s do cursor, o segundo, contendo | | ::: | os nomes das respectivas colunas. | | | | | | Exemplo de uso: | | | | | | | | | f | | | rom lbx_toolkit import postgreSQL | | | from pathlib import Path | | | | | | credenciais = { | | | | | | 'dbname': 'NOME_BANCO', | | | | | | 'user': 'USUARIO'', | | | | | | 'password': 'SENHA', | | | | | | 'host': 'IP_OU_DNS_SERVIDOR', | | | | | | 'port': 'POR | | | TA_POSTGRESQL', ## padrão = 5432 | | | } | | | | | | con | | | exao = postgreSQL.db(credenciais) | | | | | | query = 'selec | | | t * from meu_esquema.teste_table' | | | | | | dados, colunas = pos | | | tgreSQL.db_select(conexao, query) | | | conexao.close() | | | | | | | | | | | | 5) O método postgreSQl.db_u | | | pdate() executa updates no banco | | | | | | A | | | assinatura da função é postgreS | | | QL.db_update([conexao], [query]) | | | | | | Sã | | | o permitidas apenas instruções de | | | update. A presença de outras ins | | | truções SQL de manipulação de dad | | | os e metadados não são permitidas | | | e abortarão a execução da query. | | | | | | A função retorna a | | | quantidade de linhas alteradas. | | | | | | Exemplo de uso: | | | | | | | | | f | | | rom lbx_toolkit import postgreSQL | | | from pathlib import Path | | | | | | credenciais = { | | | | | | 'dbname': 'NOME_BANCO', | | | | | | 'user': 'USUARIO'', | | | | | | 'password': 'SENHA', | | | | | | 'host': 'IP_OU_DNS_SERVIDOR', | | | | | | 'port': 'POR | | | TA_POSTGRESQL', ## padrão = 5432 | | | } | | | | | | con | | | exao = postgreSQL.db(credenciais) | | | | | | query = "update | | | meu_esquema.teste_table set colu | | | na='novo_valor' where pk='chave'" | | | | | | result = pos | | | tgreSQL.db_update(conexao, query) | | | conexao.close() | | | | | | | | | """ | | | def i | | | nit(self, config, logger=None): | | | | | | self.logger = logger if not l | | | ogger is None else lbx_logger(Non | | | e, logging.DEBUG, '%(levelname)s: | | | %(message)s') # se não fornecer | | | o logger, vai tudo para o console | | | | | | try: | | | | | | self.Conexao = psyco | | | pg2.connect(**config) ## na cham | | | ada de uma função/método, o * exp | | | lode os valores de um dicionário | | | em argumentos posicionais (só val | | | ores) e ** explode discionário em | | | argumentos nominais (nome=valor) | | | | | | except Exception as Err: | | | raise | | | # | | | # | | | def csv_df(self, C | | | svPath, CsvDelim=';'): # Le arqui | | | vo CSV e gera Dataframe do Pandas | | | try: | | | | | | DataFrame = pd.read_cs | | | v(CsvPath, delimiter=CsvDelim) # | | | Verifique se o delimitador é ';' | | | | | | DataFrame.replace({np.nan: | | | None}, inplace=True) ## troca 'N | | | aN' por None (null no postgresql) | | | return DataFrame | | | | | | except Exception as Err: | | | raise | | | # | | | # | | | def db_insert_df(self, Da | | | taFrame, Tabela, Schema=None, Col | | | unas=None, OnConflict=None): # In | | | sere os dados de um dataframe em | | | uma tabela equivalente no banco ( | | | exige mesma estrutura de colunas) | | | # Essa função ex | | | ige que os nomes dos cabeçalhos d | | | as colunas do CSV sejam os mesmos | | | das colunas da tabela de destino | | | | | | Colunas = Colunas or DataFram | | | e.columns.tolist() # Caso não | | | seja fornecida a lista de coluna | | | s, assume as colunas do DataFrame | | | V | | | alores = [tuple(Linha) for Linha | | | in DataFrame[Colunas].values] | | | | | | Schema = Schema or 'public' | | | Query = | | | f'insert into {Schema}.{Tabela} ( | | | {', '.join(Colunas)}) values %s ' | | | | | | if not OnConflict is None: | | | | | | Query = Query + OnConflict | | | | | | try: | | | sel | | | f.Cursor = self.Conexao.cursor() | | | execute_valu | | | es(self.Cursor, Query, Valores) | | | | | | self.Conexao.commit() | | | | | | except Exception as Err: | | | | | | self.Conexao.rollback() | | | raise | | | finally: | | | | | | self.Cursor.close() | | | #Conexao.cl | | | ose() ## conexão precisa ser fech | | | ada explicitamente fora da classe | | | # | | | # | | | def | | | db_select(self, Query): # Retorn | | | a um cursor à partir de um select | | | BlackList | | | = ['INSERT ', 'DELETE ', 'UPDATE | | | ', 'CREATE ', 'DROP ', 'MERGE ', | | | 'REPLACE ', 'CALL ', 'EXECUTE '] | | | | | | if any(element in Query.u | | | pper() for element in BlackList): | | | | | | BlackListed = [eleme | | | nt for element in BlackList if el | | | ement in Query.upper()] | | | self.logger | | | .erro(f'{name}: Este método p | | | ermite apenas consultas. A query | | | informada possui as seguintes pal | | | avras reservadas não aceitas: {Bl | | | ackListed} e não foi executada!') | | | return None | | | else: | | | try: | | | se | | | lf.Cursor = self.Conexao.cursor() | | | | | | self.Cursor.execute(Query) | | | | | | Dados = self.Cursor.fetchall() | | | | | | Colunas = [Col[0] fo | | | r Col in self.Cursor.description] | | | | | | self.Conexao.commit() | | | | | | self.Cursor.close() | | | | | | return Dados, Colunas | | | | | | except Exception as Err: | | | | | | self.Conexao.rollback() | | | raise | | | # | | | # | | | def | | | db_update(self, Query): # Retorn | | | a um cursor à partir de um select | | | UpdRows = 0 | | | BlackList | | | = ['INSERT ', 'DELETE ', 'SELECT | | | ', 'CREATE ', 'DROP ', 'MERGE ', | | | 'REPLACE ', 'CALL ', 'EXECUTE '] | | | | | | if any(element in Query.u | | | pper() for element in BlackList): | | | | | | BlackListed = [eleme | | | nt for element in BlackList if el | | | ement in Query.upper()] | | | self.logg | | | er.erro(f'{name}: Este método | | | permite apenas updates. A query | | | informada possui as seguintes pal | | | avras reservadas não aceitas: {Bl | | | ackListed} e não foi executada!') | | | | | | return None | | | else: | | | try: | | | se | | | lf.Cursor = self.Conexao.cursor() | | | | | | self.Cursor.execute(Query) | | | | | | UpdRows = self.Cursor.rowcount | | | | | | self.Conexao.commit() | | | | | | self.Cursor.close() | | | | | | return UpdRows | | | | | | except Exception as Err: | | | | | | self.Conexao.rollback() | | | raise | | | | | |
| +-----------------------------------+-----------------------------------+ :::

::: {.doc .doc-children} ::: ::::: :::::: ::::::::::::::::::::::::::::::::::::::::::: :::::::::::::::::::::::::::::::::::::::::::: ::::::::::::::::::::::::::::::::::::::::::::: :::::::::::::::::::::::::::::::::::::::::::::: ::::::::::::::::::::::::::::::::::::::::::::::::::::: ::::::::::::::::::::::::::::::::::::::::::::::::::::::

::::: {.md-footer-meta .md-typeset} :::: {.md-footer-meta__inner .md-grid} ::: md-copyright Made with Material for MkDocs{target="_blank" rel="noopener"} ::: :::: ::::: ::::::::::::::::::::::::::::::::::::::::::::::::::::::::::

:::: {.md-dialog md-component="dialog"} ::: {.md-dialog__inner .md-typeset} ::: ::::

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

lbxtoolkit-2.1.1.tar.gz (102.9 kB view details)

Uploaded Source

Built Distribution

lbxtoolkit-2.1.1-py3-none-any.whl (64.7 kB view details)

Uploaded Python 3

File details

Details for the file lbxtoolkit-2.1.1.tar.gz.

File metadata

  • Download URL: lbxtoolkit-2.1.1.tar.gz
  • Upload date:
  • Size: 102.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.2

File hashes

Hashes for lbxtoolkit-2.1.1.tar.gz
Algorithm Hash digest
SHA256 bce25fc3c80462197e88051b072c20c7f70ae23e5f22459a8468342dde532b6e
MD5 f8de988ebb239e5946e85bc96cc0788a
BLAKE2b-256 2233b2992b0ef5343ffa1feecffb0225e5ab604b619136fce12165811136176a

See more details on using hashes here.

File details

Details for the file lbxtoolkit-2.1.1-py3-none-any.whl.

File metadata

  • Download URL: lbxtoolkit-2.1.1-py3-none-any.whl
  • Upload date:
  • Size: 64.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.2

File hashes

Hashes for lbxtoolkit-2.1.1-py3-none-any.whl
Algorithm Hash digest
SHA256 f5a8ff83e086bd7d7f55756033514730f44f2b2d3e30e87aa933fb7540ccd3a5
MD5 cc437a7b49f5e942ec771f9ffa2d942f
BLAKE2b-256 7a8c603a2252fe0b6faa87bbfc96e1044ea32473c704f2e8d37228610e5d7e1f

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page