package fetcher

// This file downloads the CVM open-data company registry and IPE filing
// archives over HTTP and stores their rows through the db package.

import (
	"archive/zip"
	"bytes"
	"encoding/csv"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"time"

	"golang.org/x/text/encoding/charmap"
	"golang.org/x/text/transform"

	"github.com/sentinela-go/internal/db"
)

const (
	// cvmCompaniesURL is the CSV with the CVM company registry.
	cvmCompaniesURL = "https://dados.cvm.gov.br/dados/CIA_ABERTA/CAD/DADOS/cad_cia_aberta.csv"
	// cvmFilingsURLFormat is the per-year IPE filings archive; %d is the year.
	cvmFilingsURLFormat = "https://dados.cvm.gov.br/dados/CIA_ABERTA/DOC/IPE/DADOS/ipe_cia_aberta_%d.zip"
	// cvmHTTPTimeout bounds a whole download (connect, headers and body) so a
	// stalled connection cannot hang a sync forever.
	cvmHTTPTimeout = 5 * time.Minute
)

// cvmHTTPClient is shared by all CVM downloads so connections are reused and
// every request carries a timeout.
var cvmHTTPClient = &http.Client{Timeout: cvmHTTPTimeout}

// cvmGet issues a GET request and returns the response only when the server
// answered 200 OK. On success the caller must close resp.Body.
func cvmGet(url string) (*http.Response, error) {
	resp, err := cvmHTTPClient.Get(url)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("unexpected status %s", resp.Status)
	}
	return resp, nil
}

// cvmTable reads a semicolon-separated, ISO-8859-1 encoded CSV whose first
// row is a header, and gives access to fields by column name.
type cvmTable struct {
	reader    *csv.Reader
	cols      map[string]int // trimmed header name -> column index
	malformed int            // rows dropped because they could not be parsed
}

// openCVMTable wraps r in a decoding CSV reader and consumes the header row.
func openCVMTable(r io.Reader) (*cvmTable, error) {
	cr := csv.NewReader(transform.NewReader(r, charmap.ISO8859_1.NewDecoder()))
	cr.Comma = ';'
	cr.LazyQuotes = true

	header, err := cr.Read()
	if err != nil {
		return nil, err
	}
	cols := make(map[string]int, len(header))
	for i, h := range header {
		cols[strings.TrimSpace(h)] = i
	}
	return &cvmTable{reader: cr, cols: cols}, nil
}

// next returns the next well-formed record. Rows the CSV parser rejects are
// counted in t.malformed and skipped. It returns io.EOF at end of input; any
// other error comes from the underlying reader and is not retried, because
// such errors can repeat on every call.
func (t *cvmTable) next() ([]string, error) {
	for {
		record, err := t.reader.Read()
		if err == nil {
			return record, nil
		}
		var parseErr *csv.ParseError
		if errors.As(err, &parseErr) {
			t.malformed++
			continue
		}
		return nil, err
	}
}

// col returns the trimmed value of the named column in record, or "" when the
// column is not in the header or the record is too short.
func (t *cvmTable) col(record []string, name string) string {
	idx, ok := t.cols[name]
	if !ok || idx >= len(record) {
		return ""
	}
	return strings.TrimSpace(record[idx])
}

// FetchCVMCompanies downloads the CVM company registry CSV and upserts every
// row that has both a CNPJ and a name, then rebuilds the companies FTS index.
//
// Rows that cannot be parsed or stored are skipped and reported in a warning.
// It returns an error when the download fails, the server does not answer
// 200 OK, the header cannot be read, or the stream breaks mid-read; in the
// last case rows loaded so far are kept and the index is still rebuilt.
func FetchCVMCompanies(database *db.DB) error {
	slog.Info("fetching CVM company registry")

	resp, err := cvmGet(cvmCompaniesURL)
	if err != nil {
		return fmt.Errorf("fetch cvm companies: %w", err)
	}
	defer resp.Body.Close()

	table, err := openCVMTable(resp.Body)
	if err != nil {
		return fmt.Errorf("read header: %w", err)
	}

	var readErr error
	count, failed := 0, 0
	for {
		record, err := table.next()
		if err != nil {
			if !errors.Is(err, io.EOF) {
				readErr = err
			}
			break
		}

		c := &db.Company{
			Name:    table.col(record, "DENOM_SOCIAL"),
			CNPJ:    table.col(record, "CNPJ_CIA"),
			CVMCode: table.col(record, "CD_CVM"),
			Status:  table.col(record, "SIT"),
			Sector:  table.col(record, "SETOR_ATIV"),
		}
		if c.CNPJ == "" || c.Name == "" {
			continue
		}
		if err := database.UpsertCompany(c); err != nil {
			failed++
			continue
		}
		count++
	}

	database.RebuildCompaniesFTS()

	if table.malformed > 0 || failed > 0 {
		slog.Warn("CVM company rows skipped", "malformed", table.malformed, "upsert_failed", failed)
	}
	slog.Info("CVM companies loaded", "count", count)

	if readErr != nil {
		return fmt.Errorf("read cvm companies: %w", readErr)
	}
	return nil
}

// FetchCVMFilings downloads the IPE filings archive for year, loads every
// .csv entry in it and rebuilds the filings FTS index.
//
// The archive is buffered in memory because zip.NewReader needs random
// access. Problems inside a single archive entry are logged and that entry is
// abandoned; they do not fail the call. It returns an error when the download
// fails, the server does not answer 200 OK, or the archive cannot be opened.
func FetchCVMFilings(database *db.DB, year int) error {
	slog.Info("fetching CVM IPE filings", "year", year)

	resp, err := cvmGet(fmt.Sprintf(cvmFilingsURLFormat, year))
	if err != nil {
		return fmt.Errorf("fetch ipe %d: %w", year, err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("read ipe %d: %w", year, err)
	}
	zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
	if err != nil {
		return fmt.Errorf("open zip: %w", err)
	}

	// Filings repeat the same CNPJ many times; remember lookups for this call
	// so each company is queried at most once.
	companyIDs := make(map[string]*int64)

	count := 0
	for _, f := range zipReader.File {
		if !strings.HasSuffix(f.Name, ".csv") {
			continue
		}
		loaded, skipped, err := loadCVMFilingsCSV(database, f, companyIDs)
		count += loaded
		if skipped > 0 {
			slog.Warn("CVM filing rows skipped", "file", f.Name, "skipped", skipped)
		}
		if err != nil {
			slog.Warn("CVM filings file not fully loaded", "file", f.Name, "error", err)
		}
	}

	database.RebuildFilingsFTS()
	slog.Info("CVM filings loaded", "year", year, "count", count)
	return nil
}

// loadCVMFilingsCSV upserts the filings found in one CSV entry of the archive.
// companyIDs caches CNPJ -> company ID lookups across entries. It returns the
// number of rows stored, the number of rows skipped because they were
// malformed or failed to upsert, and the error that stopped reading, if any.
func loadCVMFilingsCSV(database *db.DB, f *zip.File, companyIDs map[string]*int64) (loaded, skipped int, err error) {
	rc, err := f.Open()
	if err != nil {
		return 0, 0, fmt.Errorf("open entry: %w", err)
	}
	defer rc.Close()

	table, err := openCVMTable(rc)
	if err != nil {
		return 0, 0, fmt.Errorf("read header: %w", err)
	}

	failed := 0
	for {
		record, err := table.next()
		if err != nil {
			if errors.Is(err, io.EOF) {
				err = nil
			} else {
				err = fmt.Errorf("read record: %w", err)
			}
			return loaded, table.malformed + failed, err
		}

		deliveryDate := table.col(record, "DT_ENTREGA")
		if deliveryDate == "" {
			// Rows without a delivery date are never stored, so skip them
			// before spending a company lookup on them.
			continue
		}

		cnpj := table.col(record, "CNPJ_CIA")
		protocol := table.col(record, "NUM_PROTOCOLO")
		extID := table.col(record, "NUM_SEQ")
		if extID == "" {
			// No sequence number: synthesize an ID from the other fields.
			extID = fmt.Sprintf("%s-%s-%s", cnpj, deliveryDate, protocol)
		}

		// Link the filing to a known company when one exists for the CNPJ.
		var companyID *int64
		if cnpj != "" {
			id, cached := companyIDs[cnpj]
			if !cached {
				// Only successful lookups are cached, so a failed query is
				// retried on the next row with the same CNPJ.
				if c, err := database.GetCompanyByCNPJ(cnpj); err == nil {
					if c != nil {
						id = &c.ID
					}
					companyIDs[cnpj] = id
				}
			}
			companyID = id
		}

		filing := &db.Filing{
			ExternalID:    extID,
			CompanyID:     companyID,
			CNPJ:          cnpj,
			Category:      table.col(record, "CATEG_DOC"),
			Type:          table.col(record, "TP_DOC"),
			Species:       table.col(record, "ESPECIE"),
			Subject:       table.col(record, "ASSUNTO"),
			ReferenceDate: table.col(record, "DT_REFER"),
			DeliveryDate:  deliveryDate,
			Protocol:      protocol,
			Version:       table.col(record, "VERSAO"),
			DownloadURL:   table.col(record, "LINK_DOC"),
		}
		if err := database.UpsertFiling(filing); err != nil {
			failed++
			continue
		}
		loaded++
	}
}

// FetchAllCVM refreshes the company registry and then the filings for the
// current and the previous calendar year.
//
// A failure to load companies is returned. Filings are best-effort: a failed
// year is logged and the sync continues, so the function still returns nil.
func FetchAllCVM(database *db.DB) error {
	start := time.Now()

	if err := FetchCVMCompanies(database); err != nil {
		return err
	}

	currentYear := time.Now().Year()
	for _, year := range []int{currentYear, currentYear - 1} {
		if err := FetchCVMFilings(database, year); err != nil {
			slog.Warn("CVM filings sync failed", "year", year, "error", err)
		}
	}

	slog.Info("CVM sync complete", "duration", time.Since(start))
	return nil
}